mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-03-31 08:19:54 +02:00
359 lines
12 KiB
YAML
359 lines
12 KiB
YAML
name: "http-framework-test"
|
||
command: "python3"
|
||
args:
|
||
- "-c"
|
||
- |
|
||
import argparse
|
||
import json
|
||
import shlex
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
import urllib.parse
|
||
|
||
METRIC_MARKER = "__CYBERSTRIKE_HTTP_METRICS__"
|
||
METRIC_KEYS = [
|
||
"dns_lookup",
|
||
"tcp_connect",
|
||
"tls_handshake",
|
||
"pretransfer",
|
||
"ttfb",
|
||
"total",
|
||
"speed_download",
|
||
"size_download",
|
||
"http_code",
|
||
"redirects",
|
||
]
|
||
|
||
def parse_headers(raw: str):
|
||
if not raw:
|
||
return []
|
||
raw = raw.strip()
|
||
if not raw:
|
||
return []
|
||
try:
|
||
parsed = json.loads(raw)
|
||
headers = []
|
||
if isinstance(parsed, dict):
|
||
for k, v in parsed.items():
|
||
headers.append(f"{k}: {v}")
|
||
return headers
|
||
if isinstance(parsed, list):
|
||
for item in parsed:
|
||
if isinstance(item, str) and item.strip():
|
||
headers.append(item.strip())
|
||
if headers:
|
||
return headers
|
||
except json.JSONDecodeError:
|
||
pass
|
||
headers = []
|
||
for line in raw.replace(";", "\n").splitlines():
|
||
stripped = line.strip()
|
||
if stripped:
|
||
headers.append(stripped)
|
||
return headers
|
||
|
||
def parse_additional(raw: str):
|
||
if not raw:
|
||
return []
|
||
try:
|
||
return shlex.split(raw)
|
||
except ValueError:
|
||
return [arg for arg in raw.split() if arg]
|
||
|
||
def smart_encode_url(url: str, safe_path="/:@&=%+,$-~", safe_query="/:@&=%+,$-~"):
|
||
try:
|
||
parts = urllib.parse.urlsplit(url)
|
||
except ValueError:
|
||
return url
|
||
path = urllib.parse.quote(parts.path or "/", safe=safe_path)
|
||
query = urllib.parse.quote(parts.query, safe=safe_query)
|
||
fragment = urllib.parse.quote(parts.fragment, safe=safe_query)
|
||
return urllib.parse.urlunsplit((parts.scheme, parts.netloc, path, query, fragment))
|
||
|
||
def sanitize_cmd(cmd):
|
||
return " ".join(shlex.quote(part) for part in cmd)
|
||
|
||
def extract_metrics(output: str):
|
||
if METRIC_MARKER not in output:
|
||
return output, {}
|
||
head, tail = output.rsplit(METRIC_MARKER + ":", 1)
|
||
values = tail.strip().split("|")
|
||
stats = {}
|
||
for key, value in zip(METRIC_KEYS, values):
|
||
stats[key] = value.strip()
|
||
return head, stats
|
||
|
||
def to_float(value):
|
||
try:
|
||
return float(value)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
parser = argparse.ArgumentParser(description="Enhanced HTTP testing helper")
|
||
parser.add_argument("--url", required=True)
|
||
parser.add_argument("--method", default="GET")
|
||
parser.add_argument("--data", default="")
|
||
parser.add_argument("--headers", default="")
|
||
parser.add_argument("--cookies", default="")
|
||
parser.add_argument("--user-agent", dest="user_agent", default="")
|
||
parser.add_argument("--proxy", default="")
|
||
parser.add_argument("--timeout", default="")
|
||
parser.add_argument("--repeat", type=int, default=1)
|
||
parser.add_argument("--delay", default="0")
|
||
parser.add_argument("--additional-args", dest="additional_args", default="")
|
||
parser.add_argument("--action", default="")
|
||
parser.add_argument("--include-headers", dest="include_headers", action="store_true")
|
||
parser.add_argument("--auto-encode-url", dest="auto_encode_url", action="store_true")
|
||
parser.add_argument("--follow-redirects", dest="follow_redirects", action="store_true")
|
||
parser.add_argument("--allow-insecure", dest="allow_insecure", action="store_true")
|
||
parser.add_argument("--verbose-output", dest="verbose_output", action="store_true")
|
||
parser.add_argument("--show-command", dest="show_command", action="store_true")
|
||
parser.add_argument("--show-summary", dest="show_summary", action="store_true")
|
||
args = parser.parse_args()
|
||
|
||
repeat = max(1, args.repeat)
|
||
try:
|
||
delay_between = float(args.delay or "0")
|
||
if delay_between < 0:
|
||
delay_between = 0.0
|
||
except ValueError:
|
||
delay_between = 0.0
|
||
|
||
prepared_url = smart_encode_url(args.url) if args.auto_encode_url else args.url
|
||
curl_cmd = ["curl", "-sS"]
|
||
if args.include_headers:
|
||
curl_cmd.append("-i")
|
||
if args.verbose_output:
|
||
curl_cmd.append("-v")
|
||
method = (args.method or "GET").upper()
|
||
if method:
|
||
curl_cmd.extend(["-X", method])
|
||
if args.cookies:
|
||
curl_cmd.extend(["-b", args.cookies])
|
||
if args.user_agent:
|
||
curl_cmd.extend(["-A", args.user_agent])
|
||
if args.timeout:
|
||
curl_cmd.extend(["--max-time", str(args.timeout)])
|
||
if args.follow_redirects:
|
||
curl_cmd.append("-L")
|
||
if args.allow_insecure:
|
||
curl_cmd.append("-k")
|
||
if args.proxy:
|
||
curl_cmd.extend(["-x", args.proxy])
|
||
for header in parse_headers(args.headers):
|
||
curl_cmd.extend(["-H", header])
|
||
if args.data:
|
||
curl_cmd.extend(["--data", args.data])
|
||
metrics_template = METRIC_MARKER + ":" + "|".join([
|
||
"%{time_namelookup}",
|
||
"%{time_connect}",
|
||
"%{time_appconnect}",
|
||
"%{time_pretransfer}",
|
||
"%{time_starttransfer}",
|
||
"%{time_total}",
|
||
"%{speed_download}",
|
||
"%{size_download}",
|
||
"%{http_code}",
|
||
"%{num_redirects}",
|
||
])
|
||
curl_cmd.extend(["-w", f"\n{metrics_template}\n"])
|
||
if args.additional_args:
|
||
curl_cmd.extend(parse_additional(args.additional_args))
|
||
curl_cmd.append(prepared_url)
|
||
|
||
aggregate = {key: [] for key in METRIC_KEYS} if args.show_summary else None
|
||
if aggregate is not None:
|
||
aggregate["wall_time"] = []
|
||
exit_code = 0
|
||
|
||
for run_index in range(repeat):
|
||
if run_index > 0 and delay_between > 0:
|
||
time.sleep(delay_between)
|
||
|
||
run_cmd = list(curl_cmd)
|
||
start = time.perf_counter()
|
||
proc = subprocess.run(
|
||
run_cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
encoding="utf-8",
|
||
errors="replace",
|
||
)
|
||
elapsed = time.perf_counter() - start
|
||
body, stats = extract_metrics(proc.stdout)
|
||
|
||
print(f"\n===== Response #{run_index + 1} =====")
|
||
output_body = body.rstrip()
|
||
if output_body:
|
||
print(output_body)
|
||
else:
|
||
print("[no body]")
|
||
|
||
print(f"\n----- Meta #{run_index + 1} -----")
|
||
if args.show_command and run_index == 0:
|
||
print("Command:", sanitize_cmd(run_cmd))
|
||
if args.show_summary:
|
||
if stats:
|
||
for key in METRIC_KEYS:
|
||
label = key.replace("_", " ").title()
|
||
print(f"{label}: {stats.get(key, 'n/a')}")
|
||
value = to_float(stats.get(key))
|
||
if value is not None and aggregate is not None:
|
||
aggregate[key].append(value)
|
||
else:
|
||
print("Timing data unavailable (curl -w output missing).")
|
||
print(f"Wall Time (client): {elapsed:.6f}s")
|
||
if aggregate is not None:
|
||
aggregate["wall_time"].append(elapsed)
|
||
else:
|
||
print("Summary disabled (--show-summary=false).")
|
||
print(f"Wall Time (client): {elapsed:.6f}s")
|
||
|
||
if proc.stderr.strip():
|
||
print("\nstderr:")
|
||
print(proc.stderr.strip())
|
||
|
||
if proc.returncode != 0:
|
||
exit_code = proc.returncode
|
||
|
||
if args.show_summary and repeat > 1 and aggregate is not None:
|
||
def summarize(values):
|
||
if not values:
|
||
return None
|
||
return (min(values), sum(values)/len(values), max(values))
|
||
|
||
print("\n===== Aggregate Timing =====")
|
||
for key, values in aggregate.items():
|
||
summary = summarize(values)
|
||
if not summary:
|
||
continue
|
||
label = key.replace("_", " ").title()
|
||
min_v, avg_v, max_v = summary
|
||
print(f"{label}: min {min_v:.6f}s | avg {avg_v:.6f}s | max {max_v:.6f}s")
|
||
|
||
if exit_code != 0:
|
||
sys.exit(exit_code)
|
||
sys.exit(0)
|
||
enabled: true
|
||
short_description: "增强的HTTP测试框架(带延时、编码、可观察性)"
|
||
description: |
|
||
增强的HTTP测试框架:提供自动URL编码、详细响应/时延输出、重复请求和命令可见性,可用于常规请求、重放、盲注延时观测等场景。
|
||
|
||
**能力亮点:**
|
||
- 自动URL编码:解决包含空格、引号等字符时curl报错的问题,必要时可手动关闭
|
||
- 时延观测:采集 DNS / TCP / TLS / TTFB / 总耗时,可循环请求计算盲注延时
|
||
- 详细输出:可选响应头、命令、stderr,方便排查
|
||
- 扩展控制:支持代理、超时、重复次数、延迟间隔及原生curl参数透传
|
||
parameters:
|
||
- name: "url"
|
||
type: "string"
|
||
description: "目标URL(自动进行路径/查询编码,确保特殊字符安全发送)"
|
||
required: true
|
||
flag: "--url"
|
||
- name: "method"
|
||
type: "string"
|
||
description: "HTTP方法(GET, POST, PUT, DELETE等)"
|
||
required: false
|
||
default: "GET"
|
||
flag: "--method"
|
||
- name: "data"
|
||
type: "string"
|
||
description: "请求数据/参数(JSON、表单、原始payload均可)"
|
||
required: false
|
||
flag: "--data"
|
||
- name: "headers"
|
||
type: "string"
|
||
description: "自定义请求头(JSON字典、行分隔或以分号分隔的 Header: Value 格式)"
|
||
required: false
|
||
flag: "--headers"
|
||
- name: "cookies"
|
||
type: "string"
|
||
description: "自定义Cookie(格式:name1=value1; name2=value2)"
|
||
required: false
|
||
flag: "--cookies"
|
||
- name: "user_agent"
|
||
type: "string"
|
||
description: "自定义User-Agent"
|
||
required: false
|
||
flag: "--user-agent"
|
||
- name: "proxy"
|
||
type: "string"
|
||
description: "代理(curl -x 形式,如 http://127.0.0.1:8080)"
|
||
required: false
|
||
flag: "--proxy"
|
||
- name: "timeout"
|
||
type: "string"
|
||
description: "最大超时时间(秒,传递给curl --max-time)"
|
||
required: false
|
||
flag: "--timeout"
|
||
- name: "repeat"
|
||
type: "int"
|
||
description: "重复请求次数,用于盲注延时观测(>=1)"
|
||
required: false
|
||
default: 1
|
||
flag: "--repeat"
|
||
- name: "delay"
|
||
type: "string"
|
||
description: "重复请求之间的延迟(秒,可为小数)"
|
||
required: false
|
||
default: "0"
|
||
flag: "--delay"
|
||
- name: "include_headers"
|
||
type: "bool"
|
||
description: "输出响应头(等价于curl -i),默认开启"
|
||
required: false
|
||
default: true
|
||
flag: "--include-headers"
|
||
- name: "auto_encode_url"
|
||
type: "bool"
|
||
description: "自动URL编码(默认开启,避免出现URL格式错误)"
|
||
required: false
|
||
default: true
|
||
flag: "--auto-encode-url"
|
||
- name: "follow_redirects"
|
||
type: "bool"
|
||
description: "跟随重定向(curl -L)"
|
||
required: false
|
||
default: false
|
||
flag: "--follow-redirects"
|
||
- name: "allow_insecure"
|
||
type: "bool"
|
||
description: "忽略TLS证书错误(curl -k)"
|
||
required: false
|
||
default: false
|
||
flag: "--allow-insecure"
|
||
- name: "verbose_output"
|
||
type: "bool"
|
||
description: "输出curl调试信息(curl -v)"
|
||
required: false
|
||
default: false
|
||
flag: "--verbose-output"
|
||
- name: "show_command"
|
||
type: "bool"
|
||
description: "打印最终curl命令(含自动编码后的URL),默认开启"
|
||
required: false
|
||
default: true
|
||
flag: "--show-command"
|
||
- name: "show_summary"
|
||
type: "bool"
|
||
description: "打印高亮摘要(默认开启)"
|
||
required: false
|
||
default: true
|
||
flag: "--show-summary"
|
||
- name: "action"
|
||
type: "string"
|
||
description: "保留字段:标识调用意图(request, spider等),当前脚本内部不使用"
|
||
required: false
|
||
default: "request"
|
||
- name: "additional_args"
|
||
type: "string"
|
||
description: |
|
||
额外的curl参数(原样透传),多个参数使用空格或带引号的shell风格传入。
|
||
|
||
**示例值:**
|
||
- "-H 'Origin: https://target'"
|
||
- "--interface tun0 --compressed"
|
||
required: false
|
||
flag: "--additional-args"
|