name: "http-framework-test"
command: "python3"
args:
  - "-c"
  - |
    """Pure-Python HTTP testing helper built on httpx.

    Provides request-body encoding guards, repeatable requests with timing
    metrics, a socket-level DNS/TCP/TLS probe, and charset-aware response
    decoding. Invoked by the surrounding tool framework via CLI flags.
    """
    import argparse
    import json
    import os
    import re
    import shlex
    import socket
    import ssl
    import sys
    import time
    import urllib.parse
    from typing import Dict, List, Tuple

    try:
        import httpx
    except ImportError:
        print("Missing dependency: httpx. Install it with `pip install httpx`.", file=sys.stderr)
        sys.exit(1)

    # Optional charset detectors; either may be absent.
    try:
        from charset_normalizer import from_bytes as charset_from_bytes
    except ImportError:
        charset_from_bytes = None

    try:
        import chardet
    except ImportError:
        chardet = None

    # Metric names reported per request and aggregated across repeats.
    METRIC_KEYS = [
        "dns_lookup",
        "tcp_connect",
        "tls_handshake",
        "pretransfer",
        "ttfb",
        "total",
        "speed_download",
        "size_download",
        "http_code",
        "redirects",
    ]


    def parse_headers(raw: str) -> List[Tuple[str, str]]:
        """Parse headers from a JSON dict/list or a 'Key: Value' string.

        Accepts a JSON object ({"K": "V"}), a JSON array of "K: V" strings,
        or a plain string with ';'- or newline-separated "K: V" entries.
        """
        if not raw:
            return []
        raw = raw.strip()
        if not raw:
            return []
        headers: List[Tuple[str, str]] = []
        parsed = None
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            for key, value in parsed.items():
                headers.append((str(key).strip(), str(value).strip()))
            return headers
        if isinstance(parsed, list):
            for item in parsed:
                if isinstance(item, str) and ":" in item:
                    key, value = item.split(":", 1)
                    headers.append((key.strip(), value.strip()))
            if headers:
                return headers
        # Fallback: treat ';' as a line separator and split "K: V" pairs.
        for line in raw.replace(";", "\n").splitlines():
            if ":" not in line:
                continue
            key, value = line.split(":", 1)
            headers.append((key.strip(), value.strip()))
        return headers


    def parse_cookies(raw: str) -> Dict[str, str]:
        """Parse a 'name1=value1; name2=value2' cookie string into a dict."""
        cookies: Dict[str, str] = {}
        if not raw:
            return cookies
        for part in raw.split(";"):
            if "=" not in part:
                continue
            name, value = part.split("=", 1)
            name = name.strip()
            if not name:
                continue
            cookies[name] = value.strip()
        return cookies


    def parse_additional_options(raw: str) -> Dict[str, str]:
        """Parse shell-style 'key=value' tokens; bare tokens become 'true'."""
        options: Dict[str, str] = {}
        if not raw:
            return options
        try:
            tokens = shlex.split(raw)
        except ValueError:
            # Unbalanced quotes etc. — degrade to naive whitespace split.
            tokens = raw.split()
        for token in tokens:
            if "=" in token:
                key, value = token.split("=", 1)
                options[key.strip()] = value.strip()
            else:
                options[token.strip()] = "true"
        return options


    def str_to_bool(value) -> bool:
        """Interpret common truthy strings ('1', 'true', 'yes', 'on')."""
        if isinstance(value, bool):
            return value
        if value is None:
            return False
        return str(value).strip().lower() in {"1", "true", "yes", "on"}


    def smart_encode_url(url: str, safe_path="/:@&=%+,$-~", safe_query="/:@&=%+,$-~"):
        """Percent-encode the path/query/fragment, keeping URL punctuation safe."""
        try:
            parts = urllib.parse.urlsplit(url)
        except ValueError:
            return url
        path = urllib.parse.quote(parts.path or "/", safe=safe_path)
        query = urllib.parse.quote(parts.query, safe=safe_query)
        fragment = urllib.parse.quote(parts.fragment, safe=safe_query)
        return urllib.parse.urlunsplit((parts.scheme, parts.netloc, path, query, fragment))


    def encode_form_data(data: str) -> str:
        """URL-encode form data, tolerating '&' and '=' inside values.

        A '&' only terminates a value when what follows looks like a new
        'key=' pair, so raw payloads containing '&' survive intact.
        """
        if not data:
            return data

        def find_key_value_pairs(text):
            # Scan for key=value pairs; a value runs until an '&' that is
            # followed by another 'key=' introducer.
            pairs = []
            i = 0
            text_len = len(text)
            while i < text_len:
                while i < text_len and text[i] in " \t\n\r":
                    i += 1
                if i >= text_len:
                    break
                key_start = i
                while i < text_len and text[i] != "=":
                    i += 1
                if i >= text_len:
                    remaining = text[key_start:].strip()
                    if remaining:
                        pairs.append((None, remaining))
                    break
                key = text[key_start:i].strip()
                i += 1
                if not key:
                    continue
                value_start = i
                value_end = text_len
                j = value_start
                while j < text_len:
                    if text[j] == "&":
                        k = j + 1
                        while k < text_len and text[k] in " \t\n\r":
                            k += 1
                        m = k
                        while m < text_len and text[m] not in "=&":
                            m += 1
                        if m < text_len and text[m] == "=":
                            value_end = j
                            i = j + 1
                            break
                    j += 1
                value = text[value_start:value_end]
                pairs.append((key, value))
                if value_end < text_len:
                    i = value_end + 1
                else:
                    break
            return pairs

        pairs = find_key_value_pairs(data)
        parts = []
        for key, value in pairs:
            if key is None:
                parts.append(urllib.parse.quote_plus(value, safe=""))
            else:
                encoded_value = urllib.parse.quote_plus(value, safe="")
                parts.append(f"{key}={encoded_value}")
        return "&".join(parts)


    def should_encode_form(headers: httpx.Headers, data: str) -> bool:
        """Return True when a body is present and Content-Type is form-urlencoded."""
        if not data:
            return False
        if not headers:
            return False
        content_type = headers.get("Content-Type")
        if not content_type:
            return False
        return "application/x-www-form-urlencoded" in content_type.lower()


    def extract_charset_from_content_type(content_type: str) -> str:
        """Pull the charset parameter out of a Content-Type header value."""
        if not content_type:
            return ""
        parts = content_type.split(";")
        for part in parts[1:]:
            lowered = part.strip().lower()
            if lowered.startswith("charset="):
                return part.split("=", 1)[1].strip().strip('"').strip("'")
        return ""


    def extract_declared_charset_from_body(data: bytes) -> str:
        """Look for a charset declared inside the body (meta tag / inline header)."""
        if not data:
            return ""
        sample = data[:16384]
        try:
            sample_text = sample.decode("iso-8859-1", errors="ignore")
        except UnicodeDecodeError:
            return ""
        for line in sample_text.splitlines():
            lowered = line.lower()
            if "content-type:" in lowered and "charset=" in lowered:
                charset_index = lowered.index("charset=") + len("charset=")
                remainder = line[charset_index:]
                for separator in [";", " ", "\t"]:
                    remainder = remainder.split(separator)[0]
                return remainder.strip().strip('"').strip("'")
        meta_match = re.search(r'charset=["\']?([a-zA-Z0-9_\-.:]+)', sample_text, re.IGNORECASE)
        if meta_match:
            return meta_match.group(1)
        return ""


    def decode_body_bytes(data: bytes, headers: httpx.Headers, user_encoding: str = ""):
        """Decode response bytes; returns (text, encoding, source).

        Priority: user override > header charset > body-declared charset >
        charset_normalizer/chardet detection > utf-8 fallback (with replace).
        """
        attempts = []
        if user_encoding:
            attempts.append(("user", user_encoding))
        header_declared = extract_charset_from_content_type(headers.get("Content-Type", "")) if headers else ""
        if header_declared:
            attempts.append(("header", header_declared))
        body_declared = extract_declared_charset_from_body(data) if not header_declared else ""
        if body_declared:
            attempts.append(("body", body_declared))
        for source, encoding in attempts:
            enc = (encoding or "").strip()
            if not enc:
                continue
            try:
                return data.decode(enc), enc, source
            except (LookupError, UnicodeDecodeError):
                continue
        if charset_from_bytes is not None and data:
            best = charset_from_bytes(data).best()
            if best and best.encoding:
                try:
                    return data.decode(best.encoding), best.encoding, "detected"
                except (LookupError, UnicodeDecodeError):
                    pass
        if chardet is not None and data:
            detection = chardet.detect(data)
            encoding = detection.get("encoding")
            if encoding:
                try:
                    return data.decode(encoding), encoding, "detected"
                except (LookupError, UnicodeDecodeError):
                    pass
        try:
            return data.decode("utf-8"), "utf-8", "fallback"
        except UnicodeDecodeError:
            return data.decode("utf-8", errors="replace"), "utf-8", "fallback"


    def prepare_body(data: str, headers: httpx.Headers, debug: bool = False):
        """Build the outgoing body bytes and a metadata dict describing them.

        '@path' loads a file, '@-' reads stdin (both sent as raw binary).
        Otherwise guesses a Content-Type when missing, URL-encodes form
        bodies, and encodes text with the declared (or utf-8) charset.
        Raises FileNotFoundError when an '@path' body file is missing.
        """
        meta = {
            "source": "inline",
            "mode": "none",
            "length": 0,
            "charset": None,
            "encoded": False,
        }
        if not data:
            return None, meta
        if data.startswith("@"):
            path = data[1:]
            if path == "-":
                payload = sys.stdin.buffer.read()
                meta["source"] = "stdin"
            else:
                path = os.path.expanduser(path)
                if not os.path.isfile(path):
                    raise FileNotFoundError(f"Body file not found: {path}")
                with open(path, "rb") as fh:
                    payload = fh.read()
                meta["source"] = path
            meta["mode"] = "binary"
            meta["length"] = len(payload)
            return payload, meta
        stripped = data.strip()
        content_type = headers.get("Content-Type")
        if not content_type:
            # Guess a Content-Type from the payload's shape.
            guessed = ""
            if stripped.startswith("{") or stripped.startswith("["):
                guessed = "application/json"
            elif "=" in stripped and "&" in stripped:
                guessed = "application/x-www-form-urlencoded"
            elif stripped.startswith("<"):
                guessed = "application/xml"
            if guessed:
                headers["Content-Type"] = guessed
                content_type = guessed
        processed = data
        if should_encode_form(headers, data):
            processed = encode_form_data(data)
            meta["mode"] = "form-urlencoded"
            meta["encoded"] = True
        else:
            normalized = (headers.get("Content-Type") or "").lower()
            if normalized.startswith("application/json"):
                meta["mode"] = "json"
            elif normalized.startswith("multipart/form-data"):
                meta["mode"] = "multipart"
            elif normalized.startswith("text/") or "xml" in normalized:
                meta["mode"] = "text"
            else:
                meta["mode"] = "raw"
        content_type = headers.get("Content-Type")
        charset = extract_charset_from_content_type(content_type) if content_type else ""
        if not charset and meta["mode"] in ("json", "text", "form-urlencoded"):
            # Default textual modes to utf-8 and advertise it in the header.
            charset = "utf-8"
            base = (content_type or "text/plain").split(";")[0].strip()
            headers["Content-Type"] = f"{base}; charset={charset}"
        elif not charset:
            charset = "utf-8"
        body_bytes = processed.encode(charset, errors="surrogatepass")
        meta["charset"] = charset
        meta["length"] = len(body_bytes)
        if debug:
            print("\n===== Debug: Body Encoding =====")
            print(f"Mode: {meta['mode']}")
            print(f"Charset: {charset}")
            print(f"Length: {meta['length']} bytes")
            print(f"Source: {meta['source']}")
            if meta["encoded"]:
                print("Form data was URL-encoded prior to sending.")
        return body_bytes, meta


    def probe_connection(url: str, timeout: float, verify_tls: bool, skip: bool):
        """Measure DNS/TCP/TLS timings with a throwaway socket (curl -w style).

        Returns a partial metrics dict; empty when skipped or DNS fails,
        and only dns_lookup when the TCP connect fails.
        """
        metrics = {}
        if skip:
            return metrics
        parsed = urllib.parse.urlsplit(url)
        host = parsed.hostname
        if not host:
            return metrics
        port = parsed.port or (443 if parsed.scheme == "https" else 80)
        dns_start = time.perf_counter()
        try:
            addr_info = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
        except socket.gaierror:
            return metrics
        dns_time = time.perf_counter() - dns_start
        family, socktype, proto, _, sockaddr = addr_info[0]
        sock = socket.socket(family, socktype, proto)
        sock.settimeout(timeout or 30.0)
        try:
            connect_start = time.perf_counter()
            sock.connect(sockaddr)
            connect_time = time.perf_counter() - connect_start
            tls_time = 0.0
            if parsed.scheme == "https":
                ctx = ssl.create_default_context()
                if not verify_tls:
                    ctx.check_hostname = False
                    ctx.verify_mode = ssl.CERT_NONE
                try:
                    tls_start = time.perf_counter()
                    tls_sock = ctx.wrap_socket(sock, server_hostname=host, do_handshake_on_connect=False)
                    try:
                        tls_sock.do_handshake()
                        tls_time = time.perf_counter() - tls_start
                    finally:
                        tls_sock.close()
                except ssl.SSLError:
                    tls_time = 0.0
                    # wrap_socket may fail before tls_sock exists; make sure
                    # the raw socket is released (close() is idempotent).
                    sock.close()
            else:
                sock.close()
        except OSError:
            sock.close()
            return {"dns_lookup": dns_time}
        metrics["dns_lookup"] = dns_time
        metrics["tcp_connect"] = connect_time
        metrics["tls_handshake"] = tls_time
        metrics["pretransfer"] = dns_time + connect_time + tls_time
        return metrics


    def format_metric_value(key: str, value):
        """Render a metric for display; counts as ints, speed in B/s, rest in s."""
        if value is None:
            return "n/a"
        if key in {"http_code", "redirects", "size_download"}:
            return str(int(value))
        if key == "speed_download":
            return f"{value:.2f} B/s"
        return f"{value:.6f}s"


    def summarize(values):
        """Return (min, mean, max) of a numeric list, or None when empty."""
        if not values:
            return None
        count = len(values)
        total = sum(values)
        return min(values), total / count, max(values)


    def render_request_overview(method: str, url: str, headers: httpx.Headers, body_meta: Dict[str, str]):
        """Print the prepared request (method, URL, headers, body summary)."""
        items = list(headers.items())
        print("\n===== Prepared Request =====")
        print(f"Method: {method}")
        print(f"URL: {url}")
        print(f"Headers ({len(items)} total):")
        for key, value in items:
            print(f" {key}: {value}")
        if body_meta["length"]:
            charset = body_meta.get("charset") or "n/a"
            print(f"Body: {body_meta['length']} bytes ({body_meta['mode']}, charset={charset}, source={body_meta['source']})")
        else:
            print("Body: ")


    def main():
        """CLI entry point: parse flags, send request(s), print responses/metrics."""
        parser = argparse.ArgumentParser(description="Pure Python HTTP testing helper powered by httpx")
        parser.add_argument("--url", required=True)
        parser.add_argument("--method", default="GET")
        parser.add_argument("--data", default="")
        parser.add_argument("--headers", default="", type=str)
        parser.add_argument("--cookies", default="")
        parser.add_argument("--user-agent", dest="user_agent", default="")
        parser.add_argument("--proxy", default="")
        parser.add_argument("--timeout", default="")
        parser.add_argument("--repeat", type=int, default=1)
        parser.add_argument("--delay", default="0")
        parser.add_argument("--additional-args", dest="additional_args", default="")
        parser.add_argument("--action", default="")
        parser.add_argument("--include-headers", dest="include_headers", action="store_true")
        parser.add_argument("--no-include-headers", dest="include_headers", action="store_false")
        parser.add_argument("--auto-encode-url", dest="auto_encode_url", action="store_true")
        parser.add_argument("--no-auto-encode-url", dest="auto_encode_url", action="store_false")
        parser.add_argument("--follow-redirects", dest="follow_redirects", action="store_true")
        parser.add_argument("--allow-insecure", dest="allow_insecure", action="store_true")
        parser.add_argument("--verbose-output", dest="verbose_output", action="store_true")
        parser.add_argument("--show-command", dest="show_command", action="store_true")
        parser.add_argument("--show-summary", dest="show_summary", action="store_true")
        parser.add_argument("--debug", dest="debug", action="store_true")
        parser.add_argument("--response-encoding", dest="response_encoding", default="")
        parser.add_argument("--download", dest="download", default="")
        parser.set_defaults(
            include_headers=False,
            auto_encode_url=False,
            follow_redirects=False,
            allow_insecure=False,
            verbose_output=False,
            show_command=False,
            show_summary=False,
            debug=False,
        )
        args = parser.parse_args()

        repeat = max(1, args.repeat)
        try:
            delay_between = float(args.delay or "0")
            if delay_between < 0:
                delay_between = 0.0
        except ValueError:
            delay_between = 0.0

        prepared_url = smart_encode_url(args.url) if args.auto_encode_url else args.url
        method = (args.method or "GET").upper()

        # Headers arrive either as a JSON string (the framework serializes
        # object-typed parameters to JSON) or as a plain "Key: Value" string.
        headers_list = []
        if args.headers:
            headers_str = args.headers.strip()
            if headers_str.startswith("{") or headers_str.startswith("["):
                try:
                    parsed = json.loads(headers_str)
                    if isinstance(parsed, dict):
                        # Dict form: convert directly to (key, value) tuples.
                        headers_list = [(str(k).strip(), str(v).strip()) for k, v in parsed.items()]
                    else:
                        # List (or any other JSON) form: reuse the generic parser.
                        headers_list = parse_headers(headers_str)
                except (json.JSONDecodeError, ValueError):
                    # Invalid JSON: fall back to the plain-string parser.
                    headers_list = parse_headers(headers_str)
            else:
                # Non-JSON input: plain-string parser (backward compatible).
                headers_list = parse_headers(headers_str)
        headers = httpx.Headers(headers_list)
        if args.user_agent:
            headers["User-Agent"] = args.user_agent

        body_bytes = None
        body_meta = {
            "source": "inline",
            "mode": "none",
            "length": 0,
            "charset": None,
            "encoded": False,
        }
        try:
            body_bytes, body_meta = prepare_body(args.data, headers, args.debug)
        except FileNotFoundError as exc:
            print(f"Body preparation failed: {exc}", file=sys.stderr)
            sys.exit(1)

        cookie_dict = parse_cookies(args.cookies)
        cookie_jar = httpx.Cookies()
        for name, value in cookie_dict.items():
            cookie_jar.set(name, value)

        additional_options = parse_additional_options(args.additional_args)

        timeout_value = None
        if args.timeout:
            try:
                timeout_value = float(args.timeout)
            except ValueError:
                timeout_value = None
        timeout = httpx.Timeout(timeout_value or 60.0)

        client_kwargs = {
            "timeout": timeout,
            "verify": not args.allow_insecure,
            "follow_redirects": args.follow_redirects,
            "cookies": cookie_jar,
        }
        if args.proxy:
            # NOTE(review): httpx >= 0.26 renamed `proxies` to `proxy`;
            # confirm the pinned httpx version before changing this.
            client_kwargs["proxies"] = args.proxy
        if "http2" in additional_options:
            client_kwargs["http2"] = str_to_bool(additional_options["http2"])
        if "cert" in additional_options:
            client_kwargs["cert"] = additional_options["cert"]
        if "verify" in additional_options:
            # verify may be a boolean-ish string or a CA bundle path.
            value = additional_options["verify"]
            lowered = value.strip().lower()
            client_kwargs["verify"] = str_to_bool(value) if lowered in {"true", "false", "1", "0", "yes", "no", "on", "off"} else value
        if "max_redirects" in additional_options:
            try:
                client_kwargs["max_redirects"] = int(additional_options["max_redirects"])
            except ValueError:
                pass
        if "trust_env" in additional_options:
            client_kwargs["trust_env"] = str_to_bool(additional_options["trust_env"])

        if args.show_command:
            render_request_overview(method, prepared_url, headers, body_meta)

        aggregate = None
        if args.show_summary:
            aggregate = {key: [] for key in METRIC_KEYS}
            aggregate["wall_time"] = []

        exit_code = 0
        verify_option = client_kwargs.get("verify", True)
        verify_tls = verify_option if isinstance(verify_option, bool) else True
        # The socket probe bypasses the proxy, so skip it when one is set.
        skip_probe = bool(args.proxy)

        client = httpx.Client(**client_kwargs)
        try:
            for run_index in range(repeat):
                if run_index > 0 and delay_between > 0:
                    time.sleep(delay_between)
                metrics = {key: None for key in METRIC_KEYS}
                probe_metrics = probe_connection(prepared_url, timeout_value or 60.0, verify_tls, skip_probe)
                metrics.update(probe_metrics)
                start = time.perf_counter()
                first_byte_time = None
                body_buffer = bytearray()
                try:
                    stream_kwargs = {
                        "method": method,
                        "url": prepared_url,
                        "headers": headers,
                    }
                    if body_bytes is not None:
                        stream_kwargs["content"] = body_bytes
                    with client.stream(**stream_kwargs) as response:
                        status_code = response.status_code
                        metrics["http_code"] = status_code
                        metrics["redirects"] = len(response.history)
                        http_version = response.http_version or "HTTP/1.1"
                        for chunk in response.iter_bytes():
                            if first_byte_time is None:
                                first_byte_time = time.perf_counter()
                            body_buffer.extend(chunk)
                    total_elapsed = time.perf_counter() - start
                    metrics["total"] = total_elapsed
                    # Fix: compare against None, not truthiness of a float.
                    metrics["ttfb"] = (first_byte_time - start) if first_byte_time is not None else total_elapsed
                    metrics["size_download"] = len(body_buffer)
                    metrics["speed_download"] = (len(body_buffer) / total_elapsed) if total_elapsed > 0 else 0.0
                    if metrics.get("pretransfer") is None:
                        metrics["pretransfer"] = (metrics.get("dns_lookup") or 0.0) + (metrics.get("tcp_connect") or 0.0) + (metrics.get("tls_handshake") or 0.0)
                    decoded_body, used_encoding, encoding_source = decode_body_bytes(bytes(body_buffer), response.headers, args.response_encoding)
                    print(f"\n===== Response #{run_index + 1} =====")
                    download_target = (args.download or "").strip()
                    if download_target:
                        # With repeat > 1, the {i} placeholder distinguishes files.
                        if repeat > 1 and "{i}" in download_target:
                            filename = download_target.format(i=run_index + 1)
                        else:
                            filename = download_target
                        try:
                            if os.path.dirname(filename):
                                os.makedirs(os.path.dirname(filename), exist_ok=True)
                            with open(filename, "wb") as fh:
                                fh.write(body_buffer)
                            # Fix: interpolate the actual filename (the f-strings
                            # previously contained no placeholder).
                            print(f"[saved response body to file: {filename}]")
                        except OSError as exc:
                            print(f"[failed to save response body to {filename}: {exc}]", file=sys.stderr)
                    if args.include_headers:
                        status_line = f"{http_version} {status_code} {response.reason_phrase}"
                        print(status_line)
                        for key, value in response.headers.items():
                            print(f"{key}: {value}")
                        print("")
                    output_body = decoded_body.rstrip()
                    if output_body:
                        print(output_body)
                    else:
                        print("[no body]")
                    print(f"\n----- Meta #{run_index + 1} -----")
                    if args.show_summary:
                        for key in METRIC_KEYS:
                            label = key.replace("_", " ").title()
                            print(f"{label}: {format_metric_value(key, metrics.get(key))}")
                            value = metrics.get(key)
                            if aggregate is not None and isinstance(value, (int, float)):
                                aggregate[key].append(float(value))
                        print(f"Wall Time (client): {total_elapsed:.6f}s")
                        if aggregate is not None:
                            aggregate["wall_time"].append(total_elapsed)
                    else:
                        print("Summary disabled (--show-summary=false).")
                        print(f"Wall Time (client): {total_elapsed:.6f}s")
                    print(f"Encoding Used: {used_encoding} ({encoding_source})")
                    if args.verbose_output:
                        sent_bytes = len(body_bytes) if body_bytes else 0
                        print("\nVerbose diagnostics:")
                        print(f" Sent body bytes: {sent_bytes}")
                        if probe_metrics:
                            for key, value in probe_metrics.items():
                                print(f" Probe {key}: {value:.6f}s")
                        print(f" History length: {len(response.history)}")
                except httpx.HTTPError as exc:
                    exit_code = 1
                    elapsed = time.perf_counter() - start
                    print(f"\n===== Response #{run_index + 1} =====")
                    print(f"Request failed after {elapsed:.6f}s: {exc}")
                    continue
            if aggregate and repeat > 1:
                print("\n===== Aggregate Timing =====")
                for key, values in aggregate.items():
                    summary = summarize(values)
                    if not summary:
                        continue
                    label = key.replace("_", " ").title()
                    min_v, avg_v, max_v = summary
                    if key == "speed_download":
                        print(f"{label}: min {min_v:.2f} B/s | avg {avg_v:.2f} B/s | max {max_v:.2f} B/s")
                    elif key == "size_download":
                        print(f"{label}: min {min_v:.0f}B | avg {avg_v:.0f}B | max {max_v:.0f}B")
                    elif key in {"http_code", "redirects"}:
                        print(f"{label}: min {min_v:.0f} | avg {avg_v:.2f} | max {max_v:.0f}")
                    else:
                        print(f"{label}: min {min_v:.6f}s | avg {avg_v:.6f}s | max {max_v:.6f}s")
        finally:
            client.close()
        sys.exit(exit_code)


    if __name__ == "__main__":
        try:
            main()
        except KeyboardInterrupt:
            print("\nInterrupted.", file=sys.stderr)
            sys.exit(130)
enabled: true
short_description: "纯Python HTTP测试框架(httpx,会话复用+编码守护)"
description: |
  纯 Python 构建的 HTTP 测试框架:基于 httpx 会话,结合 socket 探针补齐 DNS/TCP/TLS 观测,
  提供全流程的请求体编码守护、重复播放与可观测能力,适配盲注延时、复杂 payload 调优等场景。

  **亮点:**
  - 纯 Python 实现:httpx 会话重用、HTTP/2/代理/certs 直接在脚本内配置,无外部二进制依赖
  - 智能 Body 编码:支持 application/x-www-form-urlencoded 二次编码、JSON/文本 charset 推断、
    `@file`/`@-` 注入二进制、可视化调试
  - 连接探针:在无代理场景下额外进行 DNS/TCP/TLS 探测,粗粒度复刻 curl -w 指标
  - 可重复观测:repeat/delay + TTFB/total/speed_download 统计,便于盲注/时序测试
  - 扩展开关:additional_args 解析 http2/cert/verify/trust_env/max_redirects 等 httpx 选项
parameters:
  - name: "url"
    type: "string"
    description: "目标URL(可自动编码路径/参数,避免特殊字符导致的请求失败如:https://www.target.com/s?wd=test)"
    required: true
    flag: "--url"
  - name: "method"
    type: "string"
    description: "HTTP方法(GET, POST, PUT, DELETE等)"
    required: false
    default: "GET"
    flag: "--method"
  - name: "data"
    type: "string"
    description: "请求数据/参数(支持JSON、表单、原始payload,前缀@file或@-可加载文件/STDIN)"
    required: false
    flag: "--data"
  - name: "headers"
    type: "object"
    description: "自定义请求头(字典格式,如 {\"X-Custom\": \"value\"})"
    required: false
    flag: "--headers"
  - name: "cookies"
    type: "string"
    description: "自定义Cookie(格式:name1=value1; name2=value2)"
    required: false
    flag: "--cookies"
  - name: "user_agent"
    type: "string"
    description: "自定义User-Agent"
    required: false
    flag: "--user-agent"
  - name: "proxy"
    type: "string"
    description: "代理(http(s)://或socks5://地址,直接传递给httpx Client)"
    required: false
    flag: "--proxy"
  - name: "timeout"
    type: "string"
    description: "最大超时时间(秒,支持小数)"
    required: false
    flag: "--timeout"
  - name: "repeat"
    type: "int"
    description: "重复请求次数,用于盲注/稳定性观测(>=1)"
    required: false
    default: 1
    flag: "--repeat"
  - name: "delay"
    type: "string"
    description: "重复请求之间的延迟(秒,可为小数)"
    required: false
    default: "0"
    flag: "--delay"
  - name: "include_headers"
    type: "bool"
    description: "输出响应头(默认开启,可用 --no-include-headers 关闭)"
    required: false
    default: true
    flag: "--include-headers"
  - name: "auto_encode_url"
    type: "bool"
    description: "自动URL编码(默认开启,可通过 --no-auto-encode-url 关闭)"
    required: false
    default: true
    flag: "--auto-encode-url"
  - name: "follow_redirects"
    type: "bool"
    description: "跟随重定向(httpx follow_redirects)"
    required: false
    default: false
    flag: "--follow-redirects"
  - name: "allow_insecure"
    type: "bool"
    description: "忽略TLS证书错误(verify=False)"
    required: false
    default: false
    flag: "--allow-insecure"
  - name: "verbose_output"
    type: "bool"
    description: "输出额外调试信息(请求体字节数、探针延时、历史长度等)"
    required: false
    default: false
    flag: "--verbose-output"
  - name: "show_command"
    type: "bool"
    description: "打印请求概览(方法、URL、所有头、Body概况)"
    required: false
    default: true
    flag: "--show-command"
  - name: "show_summary"
    type: "bool"
    description: "打印指标摘要(默认开启,含DNS/TCP/TLS/TTFB/Total)"
    required: false
    default: true
    flag: "--show-summary"
  - name: "debug"
    type: "bool"
    description: "调试模式:显示Body编码处理细节(模式/charset/长度)"
    required: false
    default: false
    flag: "--debug"
  - name: "response_encoding"
    type: "string"
    description: "强制响应解码使用的编码(如GBK),覆盖自动探测"
    required: false
    flag: "--response-encoding"
  - name: "action"
    type: "string"
    description: "保留字段:标识调用意图(request, spider等),脚本内部不使用"
    required: false
    default: "request"
    flag: "--action"
  - name: "download"
    type: "string"
    description: |
      将响应体保存到本地文件,写入原始字节数据:
      - "output.bin":单次请求直接保存到指定文件
      - "outputs/run-{i}.bin":repeat>1 时按序号区分文件({i} 将被替换为 1,2,...)
    required: false
    flag: "--download"
  - name: "additional_args"
    type: "string"
    description: |
      额外的 httpx 选项,需按 httpx.Client 的关键字参数名与类型填写,支持 "key=value" 或单词形式(等价于 key=true):
      - "http2=true"
      - "cert=/path/to/client.pem"
      - "verify=/path/to/ca.pem" 或 "verify=false"
      - "trust_env=false"、"max_redirects=5" 等
    required: false
    flag: "--additional-args"