name: "http-framework-test" command: "python3" args: - "-c" - | import argparse import json import shlex import subprocess import sys import time import urllib.parse METRIC_MARKER = "__CYBERSTRIKE_HTTP_METRICS__" METRIC_KEYS = [ "dns_lookup", "tcp_connect", "tls_handshake", "pretransfer", "ttfb", "total", "speed_download", "size_download", "http_code", "redirects", ] def parse_headers(raw: str): if not raw: return [] raw = raw.strip() if not raw: return [] try: parsed = json.loads(raw) headers = [] if isinstance(parsed, dict): for k, v in parsed.items(): headers.append(f"{k}: {v}") return headers if isinstance(parsed, list): for item in parsed: if isinstance(item, str) and item.strip(): headers.append(item.strip()) if headers: return headers except json.JSONDecodeError: pass headers = [] for line in raw.replace(";", "\n").splitlines(): stripped = line.strip() if stripped: headers.append(stripped) return headers def parse_additional(raw: str): if not raw: return [] try: return shlex.split(raw) except ValueError: return [arg for arg in raw.split() if arg] def smart_encode_url(url: str, safe_path="/:@&=%+,$-~", safe_query="/:@&=%+,$-~"): try: parts = urllib.parse.urlsplit(url) except ValueError: return url path = urllib.parse.quote(parts.path or "/", safe=safe_path) query = urllib.parse.quote(parts.query, safe=safe_query) fragment = urllib.parse.quote(parts.fragment, safe=safe_query) return urllib.parse.urlunsplit((parts.scheme, parts.netloc, path, query, fragment)) def sanitize_cmd(cmd): return " ".join(shlex.quote(part) for part in cmd) def extract_metrics(output: str): if METRIC_MARKER not in output: return output, {} head, tail = output.rsplit(METRIC_MARKER + ":", 1) values = tail.strip().split("|") stats = {} for key, value in zip(METRIC_KEYS, values): stats[key] = value.strip() return head, stats def to_float(value): try: return float(value) except (TypeError, ValueError): return None parser = argparse.ArgumentParser(description="Enhanced HTTP testing helper") parser.add_argument("--url", required=True) parser.add_argument("--method", default="GET") parser.add_argument("--data", default="") parser.add_argument("--headers", default="") parser.add_argument("--cookies", default="") parser.add_argument("--user-agent", dest="user_agent", default="") parser.add_argument("--proxy", default="") parser.add_argument("--timeout", default="") parser.add_argument("--repeat", type=int, default=1) parser.add_argument("--delay", default="0") parser.add_argument("--additional-args", dest="additional_args", default="") parser.add_argument("--action", default="") parser.add_argument("--include-headers", dest="include_headers", action="store_true") parser.add_argument("--auto-encode-url", dest="auto_encode_url", action="store_true") parser.add_argument("--follow-redirects", dest="follow_redirects", action="store_true") parser.add_argument("--allow-insecure", dest="allow_insecure", action="store_true") parser.add_argument("--verbose-output", dest="verbose_output", action="store_true") parser.add_argument("--show-command", dest="show_command", action="store_true") parser.add_argument("--show-summary", dest="show_summary", action="store_true") args = parser.parse_args() repeat = max(1, args.repeat) try: delay_between = float(args.delay or "0") if delay_between < 0: delay_between = 0.0 except ValueError: delay_between = 0.0 prepared_url = smart_encode_url(args.url) if args.auto_encode_url else args.url curl_cmd = ["curl", "-sS"] if args.include_headers: curl_cmd.append("-i") if args.verbose_output: curl_cmd.append("-v") method = (args.method or "GET").upper() if method: curl_cmd.extend(["-X", method]) if args.cookies: curl_cmd.extend(["-b", args.cookies]) if args.user_agent: curl_cmd.extend(["-A", args.user_agent]) if args.timeout: curl_cmd.extend(["--max-time", str(args.timeout)]) if args.follow_redirects: curl_cmd.append("-L") if args.allow_insecure: curl_cmd.append("-k") if args.proxy: curl_cmd.extend(["-x", args.proxy]) for header in parse_headers(args.headers): curl_cmd.extend(["-H", header]) if args.data: curl_cmd.extend(["--data", args.data]) metrics_template = METRIC_MARKER + ":" + "|".join([ "%{time_namelookup}", "%{time_connect}", "%{time_appconnect}", "%{time_pretransfer}", "%{time_starttransfer}", "%{time_total}", "%{speed_download}", "%{size_download}", "%{http_code}", "%{num_redirects}", ]) curl_cmd.extend(["-w", f"\n{metrics_template}\n"]) if args.additional_args: curl_cmd.extend(parse_additional(args.additional_args)) curl_cmd.append(prepared_url) aggregate = {key: [] for key in METRIC_KEYS} if args.show_summary else None if aggregate is not None: aggregate["wall_time"] = [] exit_code = 0 for run_index in range(repeat): if run_index > 0 and delay_between > 0: time.sleep(delay_between) run_cmd = list(curl_cmd) start = time.perf_counter() proc = subprocess.run( run_cmd, capture_output=True, text=True, encoding="utf-8", errors="replace", ) elapsed = time.perf_counter() - start body, stats = extract_metrics(proc.stdout) print(f"\n===== Response #{run_index + 1} =====") output_body = body.rstrip() if output_body: print(output_body) else: print("[no body]") print(f"\n----- Meta #{run_index + 1} -----") if args.show_command and run_index == 0: print("Command:", sanitize_cmd(run_cmd)) if args.show_summary: if stats: for key in METRIC_KEYS: label = key.replace("_", " ").title() print(f"{label}: {stats.get(key, 'n/a')}") value = to_float(stats.get(key)) if value is not None and aggregate is not None: aggregate[key].append(value) else: print("Timing data unavailable (curl -w output missing).") print(f"Wall Time (client): {elapsed:.6f}s") if aggregate is not None: aggregate["wall_time"].append(elapsed) else: print("Summary disabled (--show-summary=false).") print(f"Wall Time (client): {elapsed:.6f}s") if proc.stderr.strip(): print("\nstderr:") print(proc.stderr.strip()) if proc.returncode != 0: exit_code = proc.returncode if args.show_summary and repeat > 1 and aggregate is not None: def summarize(values): if not values: return None return (min(values), sum(values)/len(values), max(values)) print("\n===== Aggregate Timing =====") for key, values in aggregate.items(): summary = summarize(values) if not summary: continue label = key.replace("_", " ").title() min_v, avg_v, max_v = summary print(f"{label}: min {min_v:.6f}s | avg {avg_v:.6f}s | max {max_v:.6f}s") if exit_code != 0: sys.exit(exit_code) sys.exit(0) enabled: true short_description: "增强的HTTP测试框架(带延时、编码、可观察性)" description: | 增强的HTTP测试框架:提供自动URL编码、详细响应/时延输出、重复请求和命令可见性,可用于常规请求、重放、盲注延时观测等场景。 **能力亮点:** - 自动URL编码:解决包含空格、引号等字符时curl报错的问题,必要时可手动关闭 - 时延观测:采集 DNS / TCP / TLS / TTFB / 总耗时,可循环请求计算盲注延时 - 详细输出:可选响应头、命令、stderr,方便排查 - 扩展控制:支持代理、超时、重复次数、延迟间隔及原生curl参数透传 parameters: - name: "url" type: "string" description: "目标URL(自动进行路径/查询编码,确保特殊字符安全发送)" required: true flag: "--url" - name: "method" type: "string" description: "HTTP方法(GET, POST, PUT, DELETE等)" required: false default: "GET" flag: "--method" - name: "data" type: "string" description: "请求数据/参数(JSON、表单、原始payload均可)" required: false flag: "--data" - name: "headers" type: "string" description: "自定义请求头(JSON字典、行分隔或以分号分隔的 Header: Value 格式)" required: false flag: "--headers" - name: "cookies" type: "string" description: "自定义Cookie(格式:name1=value1; name2=value2)" required: false flag: "--cookies" - name: "user_agent" type: "string" description: "自定义User-Agent" required: false flag: "--user-agent" - name: "proxy" type: "string" description: "代理(curl -x 形式,如 http://127.0.0.1:8080)" required: false flag: "--proxy" - name: "timeout" type: "string" description: "最大超时时间(秒,传递给curl --max-time)" required: false flag: "--timeout" - name: "repeat" type: "int" description: "重复请求次数,用于盲注延时观测(>=1)" required: false default: 1 flag: "--repeat" - name: "delay" type: "string" description: "重复请求之间的延迟(秒,可为小数)" required: false default: "0" flag: "--delay" - name: "include_headers" type: "bool" description: "输出响应头(等价于curl -i),默认开启" required: false default: true flag: "--include-headers" - name: "auto_encode_url" type: "bool" description: "自动URL编码(默认开启,避免出现URL格式错误)" required: false default: true flag: "--auto-encode-url" - name: "follow_redirects" type: "bool" description: "跟随重定向(curl -L)" required: false default: false flag: "--follow-redirects" - name: "allow_insecure" type: "bool" description: "忽略TLS证书错误(curl -k)" required: false default: false flag: "--allow-insecure" - name: "verbose_output" type: "bool" description: "输出curl调试信息(curl -v)" required: false default: false flag: "--verbose-output" - name: "show_command" type: "bool" description: "打印最终curl命令(含自动编码后的URL),默认开启" required: false default: true flag: "--show-command" - name: "show_summary" type: "bool" description: "打印高亮摘要(默认开启)" required: false default: true flag: "--show-summary" - name: "action" type: "string" description: "保留字段:标识调用意图(request, spider等),当前脚本内部不使用" required: false default: "request" - name: "additional_args" type: "string" description: | 额外的curl参数(原样透传),多个参数使用空格或带引号的shell风格传入。 **示例值:** - "-H 'Origin: https://target'" - "--interface tun0 --compressed" required: false flag: "--additional-args"