Files
CyberStrikeAI/tools/http-framework-test.yaml
2025-11-24 00:37:59 +08:00

615 lines
22 KiB
YAML
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
name: "http-framework-test"
command: "python3"
args:
- "-c"
- |
import argparse
import json
import re
import shlex
import subprocess
import sys
import time
import urllib.parse
# Optional dependencies for response-encoding detection.  Both degrade to
# None when not installed; callers must check for None before use.
try:
    from charset_normalizer import from_bytes as charset_from_bytes
except ImportError:
    charset_from_bytes = None
try:
    import chardet
except ImportError:
    chardet = None
# Sentinel emitted via curl -w so the metrics trailer can be split off the body.
METRIC_MARKER = "__CYBERSTRIKE_HTTP_METRICS__"
# Field order must match the "|"-separated -w template built further below.
METRIC_KEYS = [
    "dns_lookup",
    "tcp_connect",
    "tls_handshake",
    "pretransfer",
    "ttfb",
    "total",
    "speed_download",
    "size_download",
    "http_code",
    "redirects",
]
def parse_headers(raw: str):
    """Parse a header specification into a list of "Name: Value" strings.

    Three input shapes are accepted: a JSON object (keys become header
    names), a JSON array of header strings, or a plain string whose
    headers are separated by semicolons and/or newlines.
    Returns [] for empty input.
    """
    if not raw:
        return []
    raw = raw.strip()
    if not raw:
        return []
    # First attempt: interpret the whole input as JSON.
    try:
        parsed = json.loads(raw)
        if isinstance(parsed, dict):
            return [f"{name}: {value}" for name, value in parsed.items()]
        if isinstance(parsed, list):
            collected = [
                entry.strip()
                for entry in parsed
                if isinstance(entry, str) and entry.strip()
            ]
            if collected:
                return collected
    except json.JSONDecodeError:
        pass
    # Fallback: treat ";" as a line break and keep every non-empty line.
    return [
        piece.strip()
        for piece in raw.replace(";", "\n").splitlines()
        if piece.strip()
    ]
def parse_additional(raw: str):
    """Tokenize extra curl arguments shell-style.

    Falls back to plain whitespace splitting when the string is not
    valid shell syntax (e.g. an unterminated quote).  Returns [] for
    empty input.
    """
    if not raw:
        return []
    try:
        tokens = shlex.split(raw)
    except ValueError:
        tokens = [piece for piece in raw.split() if piece]
    return tokens
def smart_encode_url(url: str, safe_path="/:@&=%+,$-~", safe_query="/:@&=%+,$-~"):
    """Percent-encode the path, query and fragment of *url*.

    Common URL punctuation (and "%", so existing escapes survive) is
    left untouched via the *safe_path*/*safe_query* sets.  An empty path
    is normalized to "/".  If the URL cannot be split at all, it is
    returned unchanged.
    """
    try:
        scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
    except ValueError:
        return url
    encoded_parts = (
        scheme,
        netloc,
        urllib.parse.quote(path or "/", safe=safe_path),
        urllib.parse.quote(query, safe=safe_query),
        urllib.parse.quote(fragment, safe=safe_query),
    )
    return urllib.parse.urlunsplit(encoded_parts)
def sanitize_cmd(cmd):
    """Render an argv list as a single, copy-pasteable shell command line."""
    return " ".join(map(shlex.quote, cmd))
def extract_metrics(output: bytes):
    """Split raw curl stdout into (body_bytes, stats_dict).

    The curl invocation appends a trailer of the form
    ``__CYBERSTRIKE_HTTP_METRICS__:v1|v2|...`` via ``-w``; this strips
    the trailer off the raw output and maps its "|"-separated fields
    onto METRIC_KEYS.  When the marker is absent (e.g. curl failed
    before writing it), the output is returned untouched with {}.
    """
    marker = (METRIC_MARKER + ":").encode("ascii")
    if marker not in output:
        return output, {}
    head, tail = output.rsplit(marker, 1)
    # errors="ignore" means this decode can never raise, so no try/except.
    tail_text = tail.decode("utf-8", errors="ignore")
    values = tail_text.strip().split("|")
    # zip() tolerates a short metrics line: missing fields are simply absent.
    stats = {key: value.strip() for key, value in zip(METRIC_KEYS, values)}
    return head, stats
def extract_declared_charset(data: bytes):
    """Best-effort extraction of the charset declared in a response.

    Scans the first 16 KiB: first for a ``Content-Type: ...; charset=...``
    header line, then for any ``charset=`` attribute (e.g. an HTML
    <meta> tag).  Returns "" when nothing is found or *data* is empty.
    """
    if not data:
        return ""
    sample = data[:16384]
    # iso-8859-1 maps every byte value, so this decode cannot fail;
    # the original dead try/except around it has been removed.
    sample_text = sample.decode("iso-8859-1", errors="ignore")
    for line in sample_text.splitlines():
        lowered = line.lower()
        if "content-type:" in lowered and "charset=" in lowered:
            charset_index = lowered.index("charset=") + len("charset=")
            remainder = line[charset_index:]
            # Cut the token at the first parameter/whitespace separator.
            for separator in [";", " ", "\t"]:
                remainder = remainder.split(separator)[0]
            return remainder.strip().strip('"').strip("'")
    # Fallback: any charset= attribute in the sample (HTML meta, XML decl).
    meta_match = re.search(r'charset=["\']?([a-zA-Z0-9_\-.:]+)', sample_text, re.IGNORECASE)
    if meta_match:
        return meta_match.group(1)
    return ""
def decode_body_bytes(data: bytes, user_encoding: str = ""):
    """Decode a response body, trying candidate encodings in priority order.

    Order: caller-supplied encoding, charset declared by the server,
    detection via charset-normalizer / chardet (when installed), and
    finally UTF-8 (with replacement characters as the last resort).

    Returns (text, encoding_used, source) where source is one of
    "user", "declared", "detected" or "fallback".
    """
    declared = extract_declared_charset(data)
    # Explicit candidates first; blank entries are skipped below.
    for source, candidate in (("user", user_encoding), ("declared", declared)):
        name = (candidate or "").strip()
        if not name:
            continue
        try:
            return data.decode(name), name, source
        except (LookupError, UnicodeDecodeError):
            pass
    # Statistical detection, only attempted on non-empty bodies.
    if data and charset_from_bytes is not None:
        best = charset_from_bytes(data).best()
        if best and best.encoding:
            try:
                return data.decode(best.encoding), best.encoding, "detected"
            except (LookupError, UnicodeDecodeError):
                pass
    if data and chardet is not None:
        guess = chardet.detect(data).get("encoding")
        if guess:
            try:
                return data.decode(guess), guess, "detected"
            except (LookupError, UnicodeDecodeError):
                pass
    # Final fallback: UTF-8, replacing undecodable bytes if necessary.
    try:
        return data.decode("utf-8"), "utf-8", "fallback"
    except UnicodeDecodeError:
        return data.decode("utf-8", errors="replace"), "utf-8", "fallback"
def to_float(value):
    """Convert *value* to float; return None when conversion is impossible."""
    try:
        result = float(value)
    except (TypeError, ValueError):
        result = None
    return result
def encode_form_data(data: str):
    """
    URL-encode data in application/x-www-form-urlencoded format.

    Parses key=value pairs and URL-encodes only the value part while
    keeping key names unchanged.  Multiple pairs separated by "&" are
    supported.  A value containing "=" is handled correctly (only the
    first "=" splits key from value), and "&" inside a value is handled
    heuristically: an "&" counts as a pair separator only when it is
    followed by a "key=" pattern.

    Note: the input is assumed to be raw, un-encoded data; already
    encoded input would be double-encoded.
    """
    if not data:
        return data
    # Smart splitting: locate the "&" characters that truly separate pairs.
    # An "&" is a separator only when followed by a "key=" pattern
    # (i.e. some non-"&" characters and then a "=").
    def find_key_value_pairs(text):
        """Find all key=value pairs, handling "&" and "=" inside values."""
        pairs = []
        i = 0
        text_len = len(text)
        while i < text_len:
            # Skip leading whitespace.
            while i < text_len and text[i] in ' \t\n\r':
                i += 1
            if i >= text_len:
                break
            # Scan the key name (up to the first "=").
            key_start = i
            while i < text_len and text[i] != '=':
                i += 1
            if i >= text_len:
                # No "=": treat the remainder as a single bare value.
                remaining = text[key_start:].strip()
                if remaining:
                    pairs.append((None, remaining))
                break
            # Extract the key name.
            key = text[key_start:i].strip()
            i += 1  # skip the "="
            if not key:
                continue
            # Find where the value ends.
            # The value may contain "&"; decide whether each "&" is a separator.
            value_start = i
            value_end = text_len
            # Scan for "&" starting at the value.
            j = value_start
            while j < text_len:
                if text[j] == '&':
                    # Check whether this "&" is followed by a "key=" pattern.
                    k = j + 1
                    # Skip whitespace.
                    while k < text_len and text[k] in ' \t\n\r':
                        k += 1
                    # Look ahead for the next "=" or "&".
                    m = k
                    while m < text_len and text[m] not in '=&':
                        m += 1
                    # A "=" means the "&" starts a new key=value pair.
                    if m < text_len and text[m] == '=':
                        value_end = j
                        i = j + 1  # resume after the "&" on the next round
                        break
                j += 1
            # Extract the value.
            value = text[value_start:value_end]
            pairs.append((key, value))
            if value_end < text_len:
                i = value_end + 1
            else:
                break
        return pairs
    # Parse all pairs, then re-assemble with encoded values.
    pairs = find_key_value_pairs(data)
    parts = []
    for key, value in pairs:
        if key is None:
            # No key: encode the bare value.
            parts.append(urllib.parse.quote_plus(value, safe=''))
        else:
            # URL-encode only the value part.
            encoded_value = urllib.parse.quote_plus(value, safe='')
            parts.append(f"{key}={encoded_value}")
    return '&'.join(parts)
def should_encode_data(headers: list, data: str):
    """Decide whether the POST body should be URL-encoded.

    Returns True only when *data* is non-empty and the first
    Content-Type entry in *headers* declares
    application/x-www-form-urlencoded.
    """
    if not data:
        return False
    for header in headers:
        if ':' not in header:
            continue
        name, _, value = header.partition(':')
        if name.strip().lower() != 'content-type':
            continue
        # First Content-Type found decides the outcome.
        return 'application/x-www-form-urlencoded' in value.strip().lower()
    return False
# ---- CLI definition -------------------------------------------------------
# Options mirror the YAML tool schema below; boolean options use store_true,
# so the caller passes the flag only when the value is true.
parser = argparse.ArgumentParser(description="Enhanced HTTP testing helper")
parser.add_argument("--url", required=True)
parser.add_argument("--method", default="GET")
parser.add_argument("--data", default="")
parser.add_argument("--headers", default="")
parser.add_argument("--cookies", default="")
parser.add_argument("--user-agent", dest="user_agent", default="")
parser.add_argument("--proxy", default="")
parser.add_argument("--timeout", default="")
parser.add_argument("--repeat", type=int, default=1)
parser.add_argument("--delay", default="0")
parser.add_argument("--additional-args", dest="additional_args", default="")
parser.add_argument("--action", default="")
parser.add_argument("--include-headers", dest="include_headers", action="store_true")
parser.add_argument("--auto-encode-url", dest="auto_encode_url", action="store_true")
parser.add_argument("--follow-redirects", dest="follow_redirects", action="store_true")
parser.add_argument("--allow-insecure", dest="allow_insecure", action="store_true")
parser.add_argument("--verbose-output", dest="verbose_output", action="store_true")
parser.add_argument("--show-command", dest="show_command", action="store_true")
parser.add_argument("--show-summary", dest="show_summary", action="store_true")
parser.add_argument("--response-encoding", dest="response_encoding", default="")
parser.add_argument("--debug", dest="debug", action="store_true")
args = parser.parse_args()
# Clamp the repeat count to at least one request.
repeat = max(1, args.repeat)
# Inter-request delay in seconds; invalid or negative input degrades to 0.
try:
    delay_between = float(args.delay or "0")
    if delay_between < 0:
        delay_between = 0.0
except ValueError:
    delay_between = 0.0
# Prepare the target URL; --auto-encode-url percent-encodes unsafe characters.
prepared_url = smart_encode_url(args.url) if args.auto_encode_url else args.url
# Build the curl argv.  -sS: silent, but still report errors on stderr.
curl_cmd = ["curl", "-sS"]
if args.include_headers:
    curl_cmd.append("-i")
if args.verbose_output:
    curl_cmd.append("-v")
method = (args.method or "GET").upper()
if method:
    curl_cmd.extend(["-X", method])
if args.cookies:
    curl_cmd.extend(["-b", args.cookies])
if args.user_agent:
    curl_cmd.extend(["-A", args.user_agent])
if args.timeout:
    curl_cmd.extend(["--max-time", str(args.timeout)])
if args.follow_redirects:
    curl_cmd.append("-L")
if args.allow_insecure:
    curl_cmd.append("-k")
if args.proxy:
    curl_cmd.extend(["-x", args.proxy])
# Parse headers up front so the Content-Type can be inspected below.
parsed_headers = parse_headers(args.headers)
for header in parsed_headers:
    curl_cmd.extend(["-H", header])
# Handle POST data: form-urlencoded bodies get their values URL-encoded.
prepared_data = args.data
if args.debug and args.data:
    print("\n===== Debug: POST Data Processing =====")
    print(f"Original data: {repr(args.data)}")
    print(f"Data length: {len(args.data)} bytes")
    # Show the key/value pairs detected in the raw data.
    if '=' in args.data:
        parts = args.data.split('&')
        print(f"Detected {len(parts)} key-value pair(s):")
        for i, part in enumerate(parts, 1):
            if '=' in part:
                k, v = part.split('=', 1)
                print(f" [{i}] {k} = {repr(v[:50])}{'...' if len(v) > 50 else ''} (length: {len(v)})")
            else:
                print(f" [{i}] (no key): {repr(part[:50])}{'...' if len(part) > 50 else ''}")
    if should_encode_data(parsed_headers, args.data):
        print("Content-Type detected: application/x-www-form-urlencoded")
        print("Encoding will be applied...")
    else:
        print("No encoding needed (not form-urlencoded)")
if args.data and should_encode_data(parsed_headers, args.data):
    prepared_data = encode_form_data(args.data)
    if args.debug:
        print(f"\nEncoded data: {repr(prepared_data)}")
        print(f"Encoded length: {len(prepared_data)} bytes")
        # Show the key/value pairs after encoding.
        if '&' in prepared_data:
            encoded_parts = prepared_data.split('&')
            print(f"Encoded into {len(encoded_parts)} key-value pair(s):")
            for i, part in enumerate(encoded_parts, 1):
                if '=' in part:
                    k, v = part.split('=', 1)
                    print(f" [{i}] {k} = {repr(v[:80])}{'...' if len(v) > 80 else ''} (length: {len(v)})")
                else:
                    print(f" [{i}] (no key): {repr(part[:80])}{'...' if len(part) > 80 else ''}")
if prepared_data:
    curl_cmd.extend(["--data", prepared_data])
# curl -w trailer: a marker line of "|"-separated timing metrics appended
# after the body; extract_metrics() strips and parses it.  Field order must
# match METRIC_KEYS.
metrics_template = METRIC_MARKER + ":" + "|".join([
    "%{time_namelookup}",
    "%{time_connect}",
    "%{time_appconnect}",
    "%{time_pretransfer}",
    "%{time_starttransfer}",
    "%{time_total}",
    "%{speed_download}",
    "%{size_download}",
    "%{http_code}",
    "%{num_redirects}",
])
curl_cmd.extend(["-w", f"\n{metrics_template}\n"])
if args.additional_args:
    curl_cmd.extend(parse_additional(args.additional_args))
curl_cmd.append(prepared_url)
# Per-metric sample lists for the aggregate summary (only with --show-summary).
aggregate = {key: [] for key in METRIC_KEYS} if args.show_summary else None
if aggregate is not None:
    aggregate["wall_time"] = []
exit_code = 0
# ---- Request loop ---------------------------------------------------------
for run_index in range(repeat):
    if run_index > 0 and delay_between > 0:
        time.sleep(delay_between)
    run_cmd = list(curl_cmd)
    start = time.perf_counter()
    # Capture raw bytes; decoding happens only after the metrics trailer
    # is stripped, so charset detection sees just the body.
    proc = subprocess.run(
        run_cmd,
        capture_output=True,
    )
    elapsed = time.perf_counter() - start
    body_bytes, stats = extract_metrics(proc.stdout)
    decoded_body, used_encoding, encoding_source = decode_body_bytes(
        body_bytes,
        user_encoding=args.response_encoding,
    )
    print(f"\n===== Response #{run_index + 1} =====")
    output_body = decoded_body.rstrip()
    if output_body:
        print(output_body)
    else:
        print("[no body]")
    print(f"\n----- Meta #{run_index + 1} -----")
    # Print the exact curl invocation once, on the first run only.
    if args.show_command and run_index == 0:
        print("Command:", sanitize_cmd(run_cmd))
    if args.show_summary:
        if stats:
            for key in METRIC_KEYS:
                label = key.replace("_", " ").title()
                print(f"{label}: {stats.get(key, 'n/a')}")
                value = to_float(stats.get(key))
                if value is not None and aggregate is not None:
                    aggregate[key].append(value)
        else:
            print("Timing data unavailable (curl -w output missing).")
        print(f"Wall Time (client): {elapsed:.6f}s")
        if aggregate is not None:
            aggregate["wall_time"].append(elapsed)
    else:
        print("Summary disabled (--show-summary=false).")
        print(f"Wall Time (client): {elapsed:.6f}s")
    print(f"Encoding Used: {used_encoding} ({encoding_source})")
    stderr_text = proc.stderr.decode("utf-8", errors="replace").strip()
    if stderr_text:
        print("\nstderr:")
        print(stderr_text)
    # Remember the last non-zero curl exit code for the final exit status.
    if proc.returncode != 0:
        exit_code = proc.returncode
# Min/avg/max per metric across runs (useful for blind-injection timing).
if args.show_summary and repeat > 1 and aggregate is not None:
    def summarize(values):
        if not values:
            return None
        return (min(values), sum(values)/len(values), max(values))
    print("\n===== Aggregate Timing =====")
    for key, values in aggregate.items():
        summary = summarize(values)
        if not summary:
            continue
        label = key.replace("_", " ").title()
        min_v, avg_v, max_v = summary
        print(f"{label}: min {min_v:.6f}s | avg {avg_v:.6f}s | max {max_v:.6f}s")
if exit_code != 0:
    sys.exit(exit_code)
sys.exit(0)
enabled: true
short_description: "增强的HTTP测试框架带延时、编码、可观察性"
description: |
增强的HTTP测试框架提供自动URL编码、详细响应/时延输出、重复请求和命令可见性,可用于常规请求、重放、盲注延时观测等场景。
**能力亮点:**
- 自动URL编码解决包含空格、引号等字符时curl报错的问题必要时可手动关闭
- 时延观测:采集 DNS / TCP / TLS / TTFB / 总耗时,可循环请求计算盲注延时
- 详细输出可选响应头、命令、stderr方便排查
- 自动编码识别:支持响应内容编码自动检测,可额外指定强制编码
- 扩展控制支持代理、超时、重复次数、延迟间隔及原生curl参数透传
parameters:
- name: "url"
type: "string"
description: "目标URL(自动进行路径/查询编码,确保特殊字符安全发送)"
required: true
flag: "--url"
- name: "method"
type: "string"
description: "HTTP方法GET, POST, PUT, DELETE等"
required: false
default: "GET"
flag: "--method"
- name: "data"
type: "string"
description: "请求数据/参数JSON、表单、原始payload均可"
required: false
flag: "--data"
- name: "headers"
type: "string"
description: "自定义请求头JSON字典、行分隔或以分号分隔的 Header: Value 格式)"
required: false
flag: "--headers"
- name: "cookies"
type: "string"
description: "自定义Cookie格式name1=value1; name2=value2"
required: false
flag: "--cookies"
- name: "user_agent"
type: "string"
description: "自定义User-Agent"
required: false
flag: "--user-agent"
- name: "proxy"
type: "string"
description: "代理curl -x 形式,如 http://127.0.0.1:8080"
required: false
flag: "--proxy"
- name: "timeout"
type: "string"
description: "最大超时时间传递给curl --max-time"
required: false
flag: "--timeout"
- name: "repeat"
type: "int"
description: "重复请求次数,用于盲注延时观测(>=1)"
required: false
default: 1
flag: "--repeat"
- name: "delay"
type: "string"
description: "重复请求之间的延迟(秒,可为小数)"
required: false
default: "0"
flag: "--delay"
- name: "include_headers"
type: "bool"
description: "输出响应头(等价于curl -i,默认开启)"
required: false
default: true
flag: "--include-headers"
- name: "auto_encode_url"
type: "bool"
description: "自动URL编码默认开启避免出现URL格式错误"
required: false
default: true
flag: "--auto-encode-url"
- name: "follow_redirects"
type: "bool"
description: "跟随重定向curl -L"
required: false
default: false
flag: "--follow-redirects"
- name: "allow_insecure"
type: "bool"
description: "忽略TLS证书错误curl -k"
required: false
default: false
flag: "--allow-insecure"
- name: "verbose_output"
type: "bool"
description: "输出curl调试信息curl -v"
required: false
default: false
flag: "--verbose-output"
- name: "show_command"
type: "bool"
description: "打印最终curl命令含自动编码后的URL默认开启"
required: false
default: true
flag: "--show-command"
- name: "show_summary"
type: "bool"
description: "打印高亮摘要(默认开启)"
required: false
default: true
flag: "--show-summary"
- name: "debug"
type: "bool"
description: "调试模式打印POST数据的原始值和编码后的值方便排查编码问题"
required: false
default: false
flag: "--debug"
- name: "response_encoding"
type: "string"
description: "强制响应解码使用的编码默认自动检测可用于指定GBK等场景"
required: false
flag: "--response-encoding"
- name: "action"
type: "string"
description: "保留字段标识调用意图request, spider等当前脚本内部不使用"
required: false
default: "request"
- name: "additional_args"
type: "string"
description: |
额外的curl参数原样透传多个参数使用空格或带引号的shell风格传入。
**示例值:**
- "-H 'Origin: https://target'"
- "--interface tun0 --compressed"
required: false
flag: "--additional-args"