Files
CyberStrikeAI/tools/http-framework-test.yaml
2025-11-17 20:17:37 +08:00

359 lines
12 KiB
YAML
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
name: "http-framework-test"
command: "python3"
args:
- "-c"
- |
import argparse
import json
import shlex
import subprocess
import sys
import time
import urllib.parse
# Sentinel appended via `curl -w` so the timing trailer can be split back
# out of the combined stdout stream.
METRIC_MARKER = "__CYBERSTRIKE_HTTP_METRICS__"
# Field order MUST match the "%{...}" write-out template assembled below;
# extract_metrics() zips these keys against the "|"-separated values.
METRIC_KEYS = [
    "dns_lookup",
    "tcp_connect",
    "tls_handshake",
    "pretransfer",
    "ttfb",
    "total",
    "speed_download",
    "size_download",
    "http_code",
    "redirects",
]
def parse_headers(raw: str):
    """Normalize the --headers argument into a list of "Name: Value" strings.

    Accepts a JSON object, a JSON array of strings, or plain text where
    individual headers are separated by newlines and/or semicolons.
    """
    if not raw:
        return []
    text = raw.strip()
    if not text:
        return []
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        decoded = None
    else:
        if isinstance(decoded, dict):
            # JSON object: each key/value pair becomes one header line.
            return [f"{name}: {value}" for name, value in decoded.items()]
        if isinstance(decoded, list):
            from_list = [
                entry.strip()
                for entry in decoded
                if isinstance(entry, str) and entry.strip()
            ]
            if from_list:
                return from_list
    # Fallback: semicolons act as separators, one header per resulting line.
    return [
        piece.strip()
        for piece in text.replace(";", "\n").splitlines()
        if piece.strip()
    ]
def parse_additional(raw: str):
    """Split extra curl arguments using shell quoting rules.

    Degrades to plain whitespace splitting when shlex rejects the input
    (e.g. an unclosed quote).
    """
    if not raw:
        return []
    try:
        return shlex.split(raw)
    except ValueError:
        return raw.split()
def smart_encode_url(url: str, safe_path="/:@&=%+,$-~", safe_query="/:@&=%+,$-~"):
    """Percent-encode the path, query and fragment of *url*.

    Common URL punctuation (and "%", so existing escapes survive) is kept
    as-is via the safe-character sets. An unsplittable URL is returned
    unchanged; an empty path is normalized to "/".
    """
    try:
        pieces = urllib.parse.urlsplit(url)
    except ValueError:
        return url
    encoded_path = urllib.parse.quote(pieces.path if pieces.path else "/", safe=safe_path)
    encoded_query = urllib.parse.quote(pieces.query, safe=safe_query)
    encoded_fragment = urllib.parse.quote(pieces.fragment, safe=safe_query)
    return urllib.parse.urlunsplit(
        (pieces.scheme, pieces.netloc, encoded_path, encoded_query, encoded_fragment)
    )
def sanitize_cmd(cmd):
    """Render an argv list as one shell-safe, copy-pasteable command string."""
    quoted = [shlex.quote(token) for token in cmd]
    return " ".join(quoted)
def extract_metrics(output: str):
    """Split curl stdout into (body, stats).

    The `-w` template appends ``METRIC_MARKER + ":"`` followed by
    "|"-separated timing values; when that trailer is absent the whole
    output is returned with an empty stats dict.

    Fix: the presence check previously tested for the bare METRIC_MARKER
    while the split used ``METRIC_MARKER + ":"``, so a response body that
    happened to contain the marker without a colon crashed the two-value
    unpack with ValueError. Check for the exact split token instead.
    """
    token = METRIC_MARKER + ":"
    if token not in output:
        return output, {}
    head, tail = output.rsplit(token, 1)
    # zip() pairs only the values actually present; missing trailing
    # fields are simply omitted from stats.
    stats = {
        key: value.strip()
        for key, value in zip(METRIC_KEYS, tail.strip().split("|"))
    }
    return head, stats
def to_float(value):
    """Best-effort float conversion; returns None for None or non-numeric text."""
    try:
        result = float(value)
    except (TypeError, ValueError):
        return None
    return result
# ---------------------------------------------------------------------------
# CLI: every option maps 1:1 onto a parameter in the surrounding YAML tool
# definition. Boolean flags are store_true, so the YAML-side defaults apply.
# ---------------------------------------------------------------------------
parser = argparse.ArgumentParser(description="Enhanced HTTP testing helper")
parser.add_argument("--url", required=True)
parser.add_argument("--method", default="GET")
parser.add_argument("--data", default="")
parser.add_argument("--headers", default="")
parser.add_argument("--cookies", default="")
parser.add_argument("--user-agent", dest="user_agent", default="")
parser.add_argument("--proxy", default="")
parser.add_argument("--timeout", default="")
parser.add_argument("--repeat", type=int, default=1)
parser.add_argument("--delay", default="0")
parser.add_argument("--additional-args", dest="additional_args", default="")
# Accepted for interface compatibility; never read below.
parser.add_argument("--action", default="")
parser.add_argument("--include-headers", dest="include_headers", action="store_true")
parser.add_argument("--auto-encode-url", dest="auto_encode_url", action="store_true")
parser.add_argument("--follow-redirects", dest="follow_redirects", action="store_true")
parser.add_argument("--allow-insecure", dest="allow_insecure", action="store_true")
parser.add_argument("--verbose-output", dest="verbose_output", action="store_true")
parser.add_argument("--show-command", dest="show_command", action="store_true")
parser.add_argument("--show-summary", dest="show_summary", action="store_true")
args = parser.parse_args()

# At least one request; a non-positive --repeat is clamped to 1.
repeat = max(1, args.repeat)
# Inter-request delay in seconds; invalid or negative input degrades to 0.
try:
    delay_between = float(args.delay or "0")
    if delay_between < 0:
        delay_between = 0.0
except ValueError:
    delay_between = 0.0

# Optionally percent-encode the URL so spaces/quotes don't break curl.
prepared_url = smart_encode_url(args.url) if args.auto_encode_url else args.url

# ---------------------------------------------------------------------------
# Assemble the curl argv. -sS = silent progress but still print errors.
# ---------------------------------------------------------------------------
curl_cmd = ["curl", "-sS"]
if args.include_headers:
    curl_cmd.append("-i")
if args.verbose_output:
    curl_cmd.append("-v")
method = (args.method or "GET").upper()
if method:
    curl_cmd.extend(["-X", method])
if args.cookies:
    curl_cmd.extend(["-b", args.cookies])
if args.user_agent:
    curl_cmd.extend(["-A", args.user_agent])
if args.timeout:
    curl_cmd.extend(["--max-time", str(args.timeout)])
if args.follow_redirects:
    curl_cmd.append("-L")
if args.allow_insecure:
    curl_cmd.append("-k")
if args.proxy:
    curl_cmd.extend(["-x", args.proxy])
for header in parse_headers(args.headers):
    curl_cmd.extend(["-H", header])
if args.data:
    curl_cmd.extend(["--data", args.data])

# curl -w write-out template; field order must match METRIC_KEYS so that
# extract_metrics() can zip keys against values.
metrics_template = METRIC_MARKER + ":" + "|".join([
    "%{time_namelookup}",
    "%{time_connect}",
    "%{time_appconnect}",
    "%{time_pretransfer}",
    "%{time_starttransfer}",
    "%{time_total}",
    "%{speed_download}",
    "%{size_download}",
    "%{http_code}",
    "%{num_redirects}",
])
curl_cmd.extend(["-w", f"\n{metrics_template}\n"])
# Raw pass-through arguments go last, just before the URL.
if args.additional_args:
    curl_cmd.extend(parse_additional(args.additional_args))
curl_cmd.append(prepared_url)

# Per-metric sample lists for the aggregate min/avg/max report.
aggregate = {key: [] for key in METRIC_KEYS} if args.show_summary else None
if aggregate is not None:
    aggregate["wall_time"] = []

exit_code = 0
for run_index in range(repeat):
    # Sleep between repeats, not before the first request.
    if run_index > 0 and delay_between > 0:
        time.sleep(delay_between)
    run_cmd = list(curl_cmd)
    start = time.perf_counter()
    proc = subprocess.run(
        run_cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    elapsed = time.perf_counter() - start
    # Separate the response body from the -w metric trailer.
    body, stats = extract_metrics(proc.stdout)
    print(f"\n===== Response #{run_index + 1} =====")
    output_body = body.rstrip()
    if output_body:
        print(output_body)
    else:
        print("[no body]")
    print(f"\n----- Meta #{run_index + 1} -----")
    # The argv is identical across repeats, so print it only once.
    if args.show_command and run_index == 0:
        print("Command:", sanitize_cmd(run_cmd))
    if args.show_summary:
        if stats:
            for key in METRIC_KEYS:
                label = key.replace("_", " ").title()
                print(f"{label}: {stats.get(key, 'n/a')}")
                value = to_float(stats.get(key))
                if value is not None and aggregate is not None:
                    aggregate[key].append(value)
        else:
            print("Timing data unavailable (curl -w output missing).")
        # Wall time is measured client-side around the subprocess call.
        print(f"Wall Time (client): {elapsed:.6f}s")
        if aggregate is not None:
            aggregate["wall_time"].append(elapsed)
    else:
        print("Summary disabled (--show-summary=false).")
        print(f"Wall Time (client): {elapsed:.6f}s")
    if proc.stderr.strip():
        print("\nstderr:")
        print(proc.stderr.strip())
    # Keep the most recent non-zero curl exit code for the final status.
    if proc.returncode != 0:
        exit_code = proc.returncode

# Aggregate min/avg/max across repeats (only shown when repeat > 1).
if args.show_summary and repeat > 1 and aggregate is not None:
    def summarize(values):
        # (min, mean, max) or None when no samples were collected.
        if not values:
            return None
        return (min(values), sum(values)/len(values), max(values))
    print("\n===== Aggregate Timing =====")
    for key, values in aggregate.items():
        summary = summarize(values)
        if not summary:
            continue
        label = key.replace("_", " ").title()
        min_v, avg_v, max_v = summary
        print(f"{label}: min {min_v:.6f}s | avg {avg_v:.6f}s | max {max_v:.6f}s")

# Propagate the last curl failure; 0 when every request succeeded.
if exit_code != 0:
    sys.exit(exit_code)
sys.exit(0)
enabled: true
short_description: "增强的HTTP测试框架(带延时、编码、可观察性)"
description: |
增强的HTTP测试框架提供自动URL编码、详细响应/时延输出、重复请求和命令可见性,可用于常规请求、重放、盲注延时观测等场景。
**能力亮点:**
- 自动URL编码解决包含空格、引号等字符时curl报错的问题必要时可手动关闭
- 时延观测:采集 DNS / TCP / TLS / TTFB / 总耗时,可循环请求计算盲注延时
- 详细输出可选响应头、命令、stderr方便排查
- 扩展控制支持代理、超时、重复次数、延迟间隔及原生curl参数透传
parameters:
- name: "url"
type: "string"
description: "目标URL(自动进行路径/查询编码,确保特殊字符安全发送)"
required: true
flag: "--url"
- name: "method"
type: "string"
description: "HTTP方法(GET, POST, PUT, DELETE等)"
required: false
default: "GET"
flag: "--method"
- name: "data"
type: "string"
description: "请求数据/参数JSON、表单、原始payload均可"
required: false
flag: "--data"
- name: "headers"
type: "string"
description: "自定义请求头JSON字典、行分隔或以分号分隔的 Header: Value 格式)"
required: false
flag: "--headers"
- name: "cookies"
type: "string"
description: "自定义Cookie(格式:name1=value1; name2=value2)"
required: false
flag: "--cookies"
- name: "user_agent"
type: "string"
description: "自定义User-Agent"
required: false
flag: "--user-agent"
- name: "proxy"
type: "string"
description: "代理(curl -x 形式,如 http://127.0.0.1:8080)"
required: false
flag: "--proxy"
- name: "timeout"
type: "string"
description: "最大超时时间(传递给curl --max-time)"
required: false
flag: "--timeout"
- name: "repeat"
type: "int"
description: "重复请求次数,用于盲注延时观测(>=1)"
required: false
default: 1
flag: "--repeat"
- name: "delay"
type: "string"
description: "重复请求之间的延迟(秒,可为小数)"
required: false
default: "0"
flag: "--delay"
- name: "include_headers"
type: "bool"
description: "输出响应头(等价于curl -i,默认开启)"
required: false
default: true
flag: "--include-headers"
- name: "auto_encode_url"
type: "bool"
description: "自动URL编码(默认开启,避免出现URL格式错误)"
required: false
default: true
flag: "--auto-encode-url"
- name: "follow_redirects"
type: "bool"
description: "跟随重定向(curl -L)"
required: false
default: false
flag: "--follow-redirects"
- name: "allow_insecure"
type: "bool"
description: "忽略TLS证书错误(curl -k)"
required: false
default: false
flag: "--allow-insecure"
- name: "verbose_output"
type: "bool"
description: "输出curl调试信息(curl -v)"
required: false
default: false
flag: "--verbose-output"
- name: "show_command"
type: "bool"
description: "打印最终curl命令(含自动编码后的URL,默认开启)"
required: false
default: true
flag: "--show-command"
- name: "show_summary"
type: "bool"
description: "打印高亮摘要(默认开启)"
required: false
default: true
flag: "--show-summary"
- name: "action"
type: "string"
description: "保留字段,标识调用意图(request, spider等),当前脚本内部不使用"
required: false
default: "request"
- name: "additional_args"
type: "string"
description: |
额外的curl参数原样透传多个参数使用空格或带引号的shell风格传入。
**示例值:**
- "-H 'Origin: https://target'"
- "--interface tun0 --compressed"
required: false
flag: "--additional-args"