From 5cc51ab649589f06a6c790f6431c6c8aafd9bace Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Mon, 24 Nov 2025 21:06:09 +0800 Subject: [PATCH] Add files via upload --- tools/http-framework-test.yaml | 961 ++++++++++++++++++++------------- tools/httpx.yaml | 119 ++-- 2 files changed, 641 insertions(+), 439 deletions(-) diff --git a/tools/http-framework-test.yaml b/tools/http-framework-test.yaml index 07fac1b6..c111ac01 100644 --- a/tools/http-framework-test.yaml +++ b/tools/http-framework-test.yaml @@ -5,12 +5,21 @@ args: - | import argparse import json + import os import re import shlex - import subprocess + import socket + import ssl import sys import time import urllib.parse + from typing import Dict, List, Tuple + + try: + import httpx + except ImportError: + print("Missing dependency: httpx. Install it with `pip install httpx`.", file=sys.stderr) + sys.exit(1) try: from charset_normalizer import from_bytes as charset_from_bytes @@ -22,7 +31,6 @@ args: except ImportError: chardet = None - METRIC_MARKER = "__CYBERSTRIKE_HTTP_METRICS__" METRIC_KEYS = [ "dns_lookup", "tcp_connect", @@ -36,41 +44,77 @@ args: "redirects", ] - def parse_headers(raw: str): + + def parse_headers(raw: str) -> List[Tuple[str, str]]: if not raw: return [] raw = raw.strip() if not raw: return [] + headers: List[Tuple[str, str]] = [] + parsed = None try: parsed = json.loads(raw) - headers = [] - if isinstance(parsed, dict): - for k, v in parsed.items(): - headers.append(f"{k}: {v}") - return headers - if isinstance(parsed, list): - for item in parsed: - if isinstance(item, str) and item.strip(): - headers.append(item.strip()) - if headers: - return headers except json.JSONDecodeError: - pass - headers = [] + parsed = None + if isinstance(parsed, dict): + for key, value in parsed.items(): + headers.append((str(key).strip(), str(value).strip())) + return headers + if isinstance(parsed, list): + for item in parsed: + if isinstance(item, str) and ":" in item: + key, value = item.split(":", 1) + headers.append((key.strip(), value.strip())) + if headers: + return headers for line in raw.replace(";", "\n").splitlines(): - stripped = line.strip() - if stripped: - headers.append(stripped) + if ":" not in line: + continue + key, value = line.split(":", 1) + headers.append((key.strip(), value.strip())) return headers - def parse_additional(raw: str): + + def parse_cookies(raw: str) -> Dict[str, str]: + cookies: Dict[str, str] = {} if not raw: - return [] + return cookies + for part in raw.split(";"): + if "=" not in part: + continue + name, value = part.split("=", 1) + name = name.strip() + if not name: + continue + cookies[name] = value.strip() + return cookies + + + def parse_additional_options(raw: str) -> Dict[str, str]: + options: Dict[str, str] = {} + if not raw: + return options try: - return shlex.split(raw) + tokens = shlex.split(raw) except ValueError: - return [arg for arg in raw.split() if arg] + tokens = raw.split() + for token in tokens: + if "=" in token: + key, value = token.split("=", 1) + options[key.strip()] = value.strip() + else: + options[token.strip()] = "true" + return options + + + def str_to_bool(value) -> bool: + if isinstance(value, bool): + return value + if value is None: + return False + return str(value).strip().lower() in {"1", "true", "yes", "on"} + def smart_encode_url(url: str, safe_path="/:@&=%+,$-~", safe_query="/:@&=%+,$-~"): try: @@ -82,25 +126,102 @@ args: fragment = urllib.parse.quote(parts.fragment, safe=safe_query) return urllib.parse.urlunsplit((parts.scheme, parts.netloc, path, query, fragment)) - def sanitize_cmd(cmd): - return " ".join(shlex.quote(part) for part in cmd) - def extract_metrics(output: bytes): - marker = (METRIC_MARKER + ":").encode("ascii") - if marker not in output: - return output, {} - head, tail = output.rsplit(marker, 1) - try: - tail_text = tail.decode("utf-8", errors="ignore") - except UnicodeDecodeError: - tail_text = "" - values = tail_text.strip().split("|") - stats = {} - for key, value in zip(METRIC_KEYS, values): - stats[key] = value.strip() - return head, stats + def encode_form_data(data: str) -> str: + if not data: + return data - def extract_declared_charset(data: bytes): + def find_key_value_pairs(text): + pairs = [] + i = 0 + text_len = len(text) + + while i < text_len: + while i < text_len and text[i] in " \t\n\r": + i += 1 + if i >= text_len: + break + + key_start = i + while i < text_len and text[i] != "=": + i += 1 + + if i >= text_len: + remaining = text[key_start:].strip() + if remaining: + pairs.append((None, remaining)) + break + + key = text[key_start:i].strip() + i += 1 + + if not key: + continue + + value_start = i + value_end = text_len + + j = value_start + while j < text_len: + if text[j] == "&": + k = j + 1 + while k < text_len and text[k] in " \t\n\r": + k += 1 + m = k + while m < text_len and text[m] not in "=&": + m += 1 + if m < text_len and text[m] == "=": + value_end = j + i = j + 1 + break + j += 1 + + value = text[value_start:value_end] + pairs.append((key, value)) + + if value_end < text_len: + i = value_end + 1 + else: + break + + return pairs + + pairs = find_key_value_pairs(data) + parts = [] + + for key, value in pairs: + if key is None: + parts.append(urllib.parse.quote_plus(value, safe="")) + else: + encoded_value = urllib.parse.quote_plus(value, safe="") + parts.append(f"{key}={encoded_value}") + + return "&".join(parts) + + + def should_encode_form(headers: httpx.Headers, data: str) -> bool: + if not data: + return False + if not headers: + return False + content_type = headers.get("Content-Type") + if not content_type: + return False + return "application/x-www-form-urlencoded" in content_type.lower() + + + def extract_charset_from_content_type(content_type: str) -> str: + if not content_type: + return "" + parts = content_type.split(";") + for part in parts[1:]: + lowered = part.strip().lower() + if lowered.startswith("charset="): + return part.split("=", 1)[1].strip().strip('"').strip("'") + return "" + + + def extract_declared_charset_from_body(data: bytes) -> str: if not data: return "" sample = data[:16384] @@ -108,7 +229,6 @@ args: sample_text = sample.decode("iso-8859-1", errors="ignore") except UnicodeDecodeError: return "" - for line in sample_text.splitlines(): lowered = line.lower() if "content-type:" in lowered and "charset=" in lowered: @@ -117,20 +237,22 @@ args: for separator in [";", " ", "\t"]: remainder = remainder.split(separator)[0] return remainder.strip().strip('"').strip("'") - meta_match = re.search(r'charset=["\']?([a-zA-Z0-9_\-.:]+)', sample_text, re.IGNORECASE) if meta_match: return meta_match.group(1) return "" - def decode_body_bytes(data: bytes, user_encoding: str = ""): - declared = extract_declared_charset(data) + + def decode_body_bytes(data: bytes, headers: httpx.Headers, user_encoding: str = ""): attempts = [] if user_encoding: attempts.append(("user", user_encoding)) - if declared: - attempts.append(("declared", declared)) - + header_declared = extract_charset_from_content_type(headers.get("Content-Type", "")) if headers else "" + if header_declared: + attempts.append(("header", header_declared)) + body_declared = extract_declared_charset_from_body(data) if not header_declared else "" + if body_declared: + attempts.append(("body", body_declared)) for source, encoding in attempts: enc = (encoding or "").strip() if not enc: @@ -139,7 +261,6 @@ args: return data.decode(enc), enc, source except (LookupError, UnicodeDecodeError): continue - if charset_from_bytes is not None and data: best = charset_from_bytes(data).best() if best and best.encoding: @@ -147,7 +268,6 @@ args: return data.decode(best.encoding), best.encoding, "detected" except (LookupError, UnicodeDecodeError): pass - if chardet is not None and data: detection = chardet.detect(data) encoding = detection.get("encoding") @@ -156,344 +276,424 @@ args: return data.decode(encoding), encoding, "detected" except (LookupError, UnicodeDecodeError): pass - try: return data.decode("utf-8"), "utf-8", "fallback" except UnicodeDecodeError: return data.decode("utf-8", errors="replace"), "utf-8", "fallback" - def to_float(value): + + def prepare_body(data: str, headers: httpx.Headers, debug: bool = False): + meta = { + "source": "inline", + "mode": "none", + "length": 0, + "charset": None, + "encoded": False, + } + if not data: + return None, meta + if data.startswith("@"): + path = data[1:] + if path == "-": + payload = sys.stdin.buffer.read() + meta["source"] = "stdin" + else: + path = os.path.expanduser(path) + if not os.path.isfile(path): + raise FileNotFoundError(f"Body file not found: {path}") + with open(path, "rb") as fh: + payload = fh.read() + meta["source"] = path + meta["mode"] = "binary" + meta["length"] = len(payload) + return payload, meta + stripped = data.strip() + content_type = headers.get("Content-Type") + if not content_type: + guessed = "" + if stripped.startswith("{") or stripped.startswith("["): + guessed = "application/json" + elif "=" in stripped and "&" in stripped: + guessed = "application/x-www-form-urlencoded" + elif stripped.startswith("<"): + guessed = "application/xml" + if guessed: + headers["Content-Type"] = guessed + content_type = guessed + processed = data + if should_encode_form(headers, data): + processed = encode_form_data(data) + meta["mode"] = "form-urlencoded" + meta["encoded"] = True + else: + normalized = (headers.get("Content-Type") or "").lower() + if normalized.startswith("application/json"): + meta["mode"] = "json" + elif normalized.startswith("multipart/form-data"): + meta["mode"] = "multipart" + elif normalized.startswith("text/") or "xml" in normalized: + meta["mode"] = "text" + else: + meta["mode"] = "raw" + content_type = headers.get("Content-Type") + charset = extract_charset_from_content_type(content_type) if content_type else "" + if not charset and meta["mode"] in ("json", "text", "form-urlencoded"): + charset = "utf-8" + base = (content_type or "text/plain").split(";")[0].strip() + headers["Content-Type"] = f"{base}; charset={charset}" + elif not charset: + charset = "utf-8" + body_bytes = processed.encode(charset, errors="surrogatepass") + meta["charset"] = charset + meta["length"] = len(body_bytes) + if debug: + print("\n===== Debug: Body Encoding =====") + print(f"Mode: {meta['mode']}") + print(f"Charset: {charset}") + print(f"Length: {meta['length']} bytes") + print(f"Source: {meta['source']}") + if meta["encoded"]: + print("Form data was URL-encoded prior to sending.") + return body_bytes, meta + + + def probe_connection(url: str, timeout: float, verify_tls: bool, skip: bool): + metrics = {} + if skip: + return metrics + parsed = urllib.parse.urlsplit(url) + host = parsed.hostname + if not host: + return metrics + port = parsed.port or (443 if parsed.scheme == "https" else 80) + dns_start = time.perf_counter() try: - return float(value) - except (TypeError, ValueError): + addr_info = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM) + except socket.gaierror: + return metrics + dns_time = time.perf_counter() - dns_start + family, socktype, proto, _, sockaddr = addr_info[0] + sock = socket.socket(family, socktype, proto) + sock.settimeout(timeout or 30.0) + try: + connect_start = time.perf_counter() + sock.connect(sockaddr) + connect_time = time.perf_counter() - connect_start + tls_time = 0.0 + if parsed.scheme == "https": + ctx = ssl.create_default_context() + if not verify_tls: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + try: + tls_start = time.perf_counter() + tls_sock = ctx.wrap_socket(sock, server_hostname=host, do_handshake_on_connect=False) + try: + tls_sock.do_handshake() + tls_time = time.perf_counter() - tls_start + finally: + tls_sock.close() + except ssl.SSLError: + tls_time = 0.0 + else: + sock.close() + except OSError: + sock.close() + return {"dns_lookup": dns_time} + metrics["dns_lookup"] = dns_time + metrics["tcp_connect"] = connect_time + metrics["tls_handshake"] = tls_time + metrics["pretransfer"] = dns_time + connect_time + tls_time + return metrics + + + def format_metric_value(key: str, value): + if value is None: + return "n/a" + if key in {"http_code", "redirects", "size_download"}: + return str(int(value)) + if key == "speed_download": + return f"{value:.2f} B/s" + return f"{value:.6f}s" + + + def summarize(values): + if not values: return None + count = len(values) + total = sum(values) + return min(values), total / count, max(values) - def encode_form_data(data: str): - """ - 对application/x-www-form-urlencoded格式的数据进行URL编码 - 解析key=value格式,对值部分进行URL编码,保持键名不变 - 支持多个键值对(用&分隔) - 正确处理值中包含=的情况(使用split('=', 1)只分割第一个=) - 智能处理值中包含&的情况(通过检查&后是否跟着key=模式来判断) - - 注意:假设输入是未编码的原始数据,如果已经是编码的,可能会重复编码 - """ - if not data: - return data - - # 智能分割:找到真正的键值对分隔符& - # &是分隔符的条件:&后面跟着key=模式(即非空白字符后跟=) - def find_key_value_pairs(text): - """找到所有的key=value对,正确处理值中的&和=""" - pairs = [] - i = 0 - text_len = len(text) - - while i < text_len: - # 跳过空白 - while i < text_len and text[i] in ' \t\n\r': - i += 1 - if i >= text_len: - break - - # 查找键名(到第一个=为止) - key_start = i - while i < text_len and text[i] != '=': - i += 1 - - if i >= text_len: - # 没有=,可能是单个值 - remaining = text[key_start:].strip() - if remaining: - pairs.append((None, remaining)) - break - - # 提取键名 - key = text[key_start:i].strip() - i += 1 # 跳过= - - if not key: - continue - - # 查找值的结束位置 - # 值可能包含&,需要判断&是否是分隔符 - value_start = i - value_end = text_len - - # 从值开始位置查找& - j = value_start - while j < text_len: - if text[j] == '&': - # 检查&后面是否跟着key=模式 - k = j + 1 - # 跳过空白 - while k < text_len and text[k] in ' \t\n\r': - k += 1 - # 查找下一个=或& - m = k - while m < text_len and text[m] not in '=&': - m += 1 - # 如果找到=,说明&后面是新的键值对 - if m < text_len and text[m] == '=': - value_end = j - i = j + 1 # 从&后开始下一轮 - break - j += 1 - - # 提取值 - value = text[value_start:value_end] - pairs.append((key, value)) - - if value_end < text_len: - i = value_end + 1 - else: - break - - return pairs - - # 解析所有键值对 - pairs = find_key_value_pairs(data) - parts = [] - - for key, value in pairs: - if key is None: - # 没有键,只有值 - parts.append(urllib.parse.quote_plus(value, safe='')) - else: - # 对值进行URL编码 - encoded_value = urllib.parse.quote_plus(value, safe='') - parts.append(f"{key}={encoded_value}") - - return '&'.join(parts) - def should_encode_data(headers: list, data: str): - """ - 判断是否需要对POST数据进行URL编码 - 如果Content-Type是application/x-www-form-urlencoded,则需要编码 - """ - if not data: - return False - - content_type = None - for header in headers: - if ':' in header: - h_key, h_value = header.split(':', 1) - if h_key.strip().lower() == 'content-type': - content_type = h_value.strip().lower() - break - - if content_type and 'application/x-www-form-urlencoded' in content_type: - return True - return False + def render_request_overview(method: str, url: str, headers: httpx.Headers, body_meta: Dict[str, str]): + items = list(headers.items()) + print("\n===== Prepared Request =====") + print(f"Method: {method}") + print(f"URL: {url}") + print(f"Headers ({len(items)} total):") + for key, value in items: + print(f" {key}: {value}") + if body_meta["length"]: + charset = body_meta.get("charset") or "n/a" + print(f"Body: {body_meta['length']} bytes ({body_meta['mode']}, charset={charset}, source={body_meta['source']})") + else: + print("Body: ") - parser = argparse.ArgumentParser(description="Enhanced HTTP testing helper") - parser.add_argument("--url", required=True) - parser.add_argument("--method", default="GET") - parser.add_argument("--data", default="") - parser.add_argument("--headers", default="") - parser.add_argument("--cookies", default="") - parser.add_argument("--user-agent", dest="user_agent", default="") - parser.add_argument("--proxy", default="") - parser.add_argument("--timeout", default="") - parser.add_argument("--repeat", type=int, default=1) - parser.add_argument("--delay", default="0") - parser.add_argument("--additional-args", dest="additional_args", default="") - parser.add_argument("--action", default="") - parser.add_argument("--include-headers", dest="include_headers", action="store_true") - parser.add_argument("--auto-encode-url", dest="auto_encode_url", action="store_true") - parser.add_argument("--follow-redirects", dest="follow_redirects", action="store_true") - parser.add_argument("--allow-insecure", dest="allow_insecure", action="store_true") - parser.add_argument("--verbose-output", dest="verbose_output", action="store_true") - parser.add_argument("--show-command", dest="show_command", action="store_true") - parser.add_argument("--show-summary", dest="show_summary", action="store_true") - parser.add_argument("--response-encoding", dest="response_encoding", default="") - parser.add_argument("--debug", dest="debug", action="store_true") - args = parser.parse_args() - repeat = max(1, args.repeat) - try: - delay_between = float(args.delay or "0") - if delay_between < 0: + def main(): + parser = argparse.ArgumentParser(description="Pure Python HTTP testing helper powered by httpx") + parser.add_argument("--url", required=True) + parser.add_argument("--method", default="GET") + parser.add_argument("--data", default="") + parser.add_argument("--headers", default="") + parser.add_argument("--cookies", default="") + parser.add_argument("--user-agent", dest="user_agent", default="") + parser.add_argument("--proxy", default="") + parser.add_argument("--timeout", default="") + parser.add_argument("--repeat", type=int, default=1) + parser.add_argument("--delay", default="0") + parser.add_argument("--additional-args", dest="additional_args", default="") + parser.add_argument("--action", default="") + parser.add_argument("--include-headers", dest="include_headers", action="store_true") + parser.add_argument("--no-include-headers", dest="include_headers", action="store_false") + parser.add_argument("--auto-encode-url", dest="auto_encode_url", action="store_true") + parser.add_argument("--no-auto-encode-url", dest="auto_encode_url", action="store_false") + parser.add_argument("--follow-redirects", dest="follow_redirects", action="store_true") + parser.add_argument("--allow-insecure", dest="allow_insecure", action="store_true") + parser.add_argument("--verbose-output", dest="verbose_output", action="store_true") + parser.add_argument("--show-command", dest="show_command", action="store_true") + parser.add_argument("--show-summary", dest="show_summary", action="store_true") + parser.add_argument("--debug", dest="debug", action="store_true") + parser.add_argument("--response-encoding", dest="response_encoding", default="") + parser.set_defaults( + include_headers=False, + auto_encode_url=False, + follow_redirects=False, + allow_insecure=False, + verbose_output=False, + show_command=False, + show_summary=False, + debug=False, + ) + args = parser.parse_args() + + repeat = max(1, args.repeat) + try: + delay_between = float(args.delay or "0") + if delay_between < 0: + delay_between = 0.0 + except ValueError: delay_between = 0.0 - except ValueError: - delay_between = 0.0 - prepared_url = smart_encode_url(args.url) if args.auto_encode_url else args.url - curl_cmd = ["curl", "-sS"] - if args.include_headers: - curl_cmd.append("-i") - if args.verbose_output: - curl_cmd.append("-v") - method = (args.method or "GET").upper() - if method: - curl_cmd.extend(["-X", method]) - if args.cookies: - curl_cmd.extend(["-b", args.cookies]) - if args.user_agent: - curl_cmd.extend(["-A", args.user_agent]) - if args.timeout: - curl_cmd.extend(["--max-time", str(args.timeout)]) - if args.follow_redirects: - curl_cmd.append("-L") - if args.allow_insecure: - curl_cmd.append("-k") - if args.proxy: - curl_cmd.extend(["-x", args.proxy]) - - # 解析headers以便检查Content-Type - parsed_headers = parse_headers(args.headers) - for header in parsed_headers: - curl_cmd.extend(["-H", header]) - - # 处理POST数据:如果是表单数据,需要URL编码 - prepared_data = args.data - if args.debug and args.data: - print("\n===== Debug: POST Data Processing =====") - print(f"Original data: {repr(args.data)}") - print(f"Data length: {len(args.data)} bytes") - # 显示原始数据中的键值对 - if '=' in args.data: - parts = args.data.split('&') - print(f"Detected {len(parts)} key-value pair(s):") - for i, part in enumerate(parts, 1): - if '=' in part: - k, v = part.split('=', 1) - print(f" [{i}] {k} = {repr(v[:50])}{'...' if len(v) > 50 else ''} (length: {len(v)})") - else: - print(f" [{i}] (no key): {repr(part[:50])}{'...' if len(part) > 50 else ''}") - if should_encode_data(parsed_headers, args.data): - print("Content-Type detected: application/x-www-form-urlencoded") - print("Encoding will be applied...") - else: - print("No encoding needed (not form-urlencoded)") - - if args.data and should_encode_data(parsed_headers, args.data): - prepared_data = encode_form_data(args.data) - if args.debug: - print(f"\nEncoded data: {repr(prepared_data)}") - print(f"Encoded length: {len(prepared_data)} bytes") - # 显示编码后的键值对 - if '&' in prepared_data: - encoded_parts = prepared_data.split('&') - print(f"Encoded into {len(encoded_parts)} key-value pair(s):") - for i, part in enumerate(encoded_parts, 1): - if '=' in part: - k, v = part.split('=', 1) - print(f" [{i}] {k} = {repr(v[:80])}{'...' if len(v) > 80 else ''} (length: {len(v)})") - else: - print(f" [{i}] (no key): {repr(part[:80])}{'...' if len(part) > 80 else ''}") - - if prepared_data: - curl_cmd.extend(["--data", prepared_data]) - metrics_template = METRIC_MARKER + ":" + "|".join([ - "%{time_namelookup}", - "%{time_connect}", - "%{time_appconnect}", - "%{time_pretransfer}", - "%{time_starttransfer}", - "%{time_total}", - "%{speed_download}", - "%{size_download}", - "%{http_code}", - "%{num_redirects}", - ]) - curl_cmd.extend(["-w", f"\n{metrics_template}\n"]) - if args.additional_args: - curl_cmd.extend(parse_additional(args.additional_args)) - curl_cmd.append(prepared_url) + prepared_url = smart_encode_url(args.url) if args.auto_encode_url else args.url + method = (args.method or "GET").upper() - aggregate = {key: [] for key in METRIC_KEYS} if args.show_summary else None - if aggregate is not None: - aggregate["wall_time"] = [] - exit_code = 0 + headers = httpx.Headers(parse_headers(args.headers)) + if args.user_agent: + headers["User-Agent"] = args.user_agent - for run_index in range(repeat): - if run_index > 0 and delay_between > 0: - time.sleep(delay_between) + body_bytes = None + body_meta = { + "source": "inline", + "mode": "none", + "length": 0, + "charset": None, + "encoded": False, + } + try: + body_bytes, body_meta = prepare_body(args.data, headers, args.debug) + except FileNotFoundError as exc: + print(f"Body preparation failed: {exc}", file=sys.stderr) + sys.exit(1) - run_cmd = list(curl_cmd) - start = time.perf_counter() - proc = subprocess.run( - run_cmd, - capture_output=True, - ) - elapsed = time.perf_counter() - start - body_bytes, stats = extract_metrics(proc.stdout) - decoded_body, used_encoding, encoding_source = decode_body_bytes( - body_bytes, - user_encoding=args.response_encoding, - ) + cookie_dict = parse_cookies(args.cookies) + cookie_jar = httpx.Cookies() + for name, value in cookie_dict.items(): + cookie_jar.set(name, value) - print(f"\n===== Response #{run_index + 1} =====") - output_body = decoded_body.rstrip() - if output_body: - print(output_body) - else: - print("[no body]") + additional_options = parse_additional_options(args.additional_args) - print(f"\n----- Meta #{run_index + 1} -----") - if args.show_command and run_index == 0: - print("Command:", sanitize_cmd(run_cmd)) + timeout_value = None + if args.timeout: + try: + timeout_value = float(args.timeout) + except ValueError: + timeout_value = None + timeout = httpx.Timeout(timeout_value or 60.0) + + client_kwargs = { + "timeout": timeout, + "verify": not args.allow_insecure, + "follow_redirects": args.follow_redirects, + "cookies": cookie_jar, + } + if args.proxy: + client_kwargs["proxies"] = args.proxy + if "http2" in additional_options: + client_kwargs["http2"] = str_to_bool(additional_options["http2"]) + if "cert" in additional_options: + client_kwargs["cert"] = additional_options["cert"] + if "verify" in additional_options: + value = additional_options["verify"] + lowered = value.strip().lower() + client_kwargs["verify"] = str_to_bool(value) if lowered in {"true", "false", "1", "0", "yes", "no", "on", "off"} else value + if "max_redirects" in additional_options: + try: + client_kwargs["max_redirects"] = int(additional_options["max_redirects"]) + except ValueError: + pass + if "trust_env" in additional_options: + client_kwargs["trust_env"] = str_to_bool(additional_options["trust_env"]) + + if args.show_command: + render_request_overview(method, prepared_url, headers, body_meta) + + aggregate = None if args.show_summary: - if stats: - for key in METRIC_KEYS: + aggregate = {key: [] for key in METRIC_KEYS} + aggregate["wall_time"] = [] + + exit_code = 0 + verify_option = client_kwargs.get("verify", True) + verify_tls = verify_option if isinstance(verify_option, bool) else True + skip_probe = bool(args.proxy) + + client = httpx.Client(**client_kwargs) + try: + for run_index in range(repeat): + if run_index > 0 and delay_between > 0: + time.sleep(delay_between) + + metrics = {key: None for key in METRIC_KEYS} + probe_metrics = probe_connection(prepared_url, timeout_value or 60.0, verify_tls, skip_probe) + metrics.update(probe_metrics) + + start = time.perf_counter() + first_byte_time = None + body_buffer = bytearray() + + try: + stream_kwargs = { + "method": method, + "url": prepared_url, + "headers": headers, + } + if body_bytes is not None: + stream_kwargs["content"] = body_bytes + + with client.stream(**stream_kwargs) as response: + status_code = response.status_code + metrics["http_code"] = status_code + metrics["redirects"] = len(response.history) + http_version = response.http_version or "HTTP/1.1" + + for chunk in response.iter_bytes(): + if first_byte_time is None: + first_byte_time = time.perf_counter() + body_buffer.extend(chunk) + + total_elapsed = time.perf_counter() - start + metrics["total"] = total_elapsed + metrics["ttfb"] = (first_byte_time - start) if first_byte_time else total_elapsed + metrics["size_download"] = len(body_buffer) + metrics["speed_download"] = (len(body_buffer) / total_elapsed) if total_elapsed > 0 else 0.0 + if metrics.get("pretransfer") is None: + metrics["pretransfer"] = (metrics.get("dns_lookup") or 0.0) + (metrics.get("tcp_connect") or 0.0) + (metrics.get("tls_handshake") or 0.0) + + decoded_body, used_encoding, encoding_source = decode_body_bytes(bytes(body_buffer), response.headers, args.response_encoding) + + print(f"\n===== Response #{run_index + 1} =====") + if args.include_headers: + status_line = f"{http_version} {status_code} {response.reason_phrase}" + print(status_line) + for key, value in response.headers.items(): + print(f"{key}: {value}") + print("") + output_body = decoded_body.rstrip() + if output_body: + print(output_body) + else: + print("[no body]") + + print(f"\n----- Meta #{run_index + 1} -----") + if args.show_summary: + for key in METRIC_KEYS: + label = key.replace("_", " ").title() + print(f"{label}: {format_metric_value(key, metrics.get(key))}") + value = metrics.get(key) + if aggregate is not None and isinstance(value, (int, float)): + aggregate[key].append(float(value)) + print(f"Wall Time (client): {total_elapsed:.6f}s") + if aggregate is not None: + aggregate["wall_time"].append(total_elapsed) + else: + print("Summary disabled (--show-summary=false).") + print(f"Wall Time (client): {total_elapsed:.6f}s") + + print(f"Encoding Used: {used_encoding} ({encoding_source})") + + if args.verbose_output: + sent_bytes = len(body_bytes) if body_bytes else 0 + print("\nVerbose diagnostics:") + print(f" Sent body bytes: {sent_bytes}") + if probe_metrics: + for key, value in probe_metrics.items(): + print(f" Probe {key}: {value:.6f}s") + print(f" History length: {len(response.history)}") + + except httpx.HTTPError as exc: + exit_code = 1 + elapsed = time.perf_counter() - start + print(f"\n===== Response #{run_index + 1} =====") + print(f"Request failed after {elapsed:.6f}s: {exc}") + continue + + if aggregate and repeat > 1: + print("\n===== Aggregate Timing =====") + for key, values in aggregate.items(): + summary = summarize(values) + if not summary: + continue label = key.replace("_", " ").title() - print(f"{label}: {stats.get(key, 'n/a')}") - value = to_float(stats.get(key)) - if value is not None and aggregate is not None: - aggregate[key].append(value) - else: - print("Timing data unavailable (curl -w output missing).") - print(f"Wall Time (client): {elapsed:.6f}s") - if aggregate is not None: - aggregate["wall_time"].append(elapsed) - else: - print("Summary disabled (--show-summary=false).") - print(f"Wall Time (client): {elapsed:.6f}s") + min_v, avg_v, max_v = summary + if key == "speed_download": + print(f"{label}: min {min_v:.2f} B/s | avg {avg_v:.2f} B/s | max {max_v:.2f} B/s") + elif key == "size_download": + print(f"{label}: min {min_v:.0f}B | avg {avg_v:.0f}B | max {max_v:.0f}B") + elif key in {"http_code", "redirects"}: + print(f"{label}: min {min_v:.0f} | avg {avg_v:.2f} | max {max_v:.0f}") + else: + print(f"{label}: min {min_v:.6f}s | avg {avg_v:.6f}s | max {max_v:.6f}s") - print(f"Encoding Used: {used_encoding} ({encoding_source})") + finally: + client.close() - stderr_text = proc.stderr.decode("utf-8", errors="replace").strip() - if stderr_text: - print("\nstderr:") - print(stderr_text) - - if proc.returncode != 0: - exit_code = proc.returncode - - if args.show_summary and repeat > 1 and aggregate is not None: - def summarize(values): - if not values: - return None - return (min(values), sum(values)/len(values), max(values)) - - print("\n===== Aggregate Timing =====") - for key, values in aggregate.items(): - summary = summarize(values) - if not summary: - continue - label = key.replace("_", " ").title() - min_v, avg_v, max_v = summary - print(f"{label}: min {min_v:.6f}s | avg {avg_v:.6f}s | max {max_v:.6f}s") - - if exit_code != 0: sys.exit(exit_code) - sys.exit(0) -enabled: true -short_description: "增强的HTTP测试框架(带延时、编码、可观察性)" -description: | - 增强的HTTP测试框架:提供自动URL编码、详细响应/时延输出、重复请求和命令可见性,可用于常规请求、重放、盲注延时观测等场景。 - **能力亮点:** - - 自动URL编码:解决包含空格、引号等字符时curl报错的问题,必要时可手动关闭 - - 时延观测:采集 DNS / TCP / TLS / TTFB / 总耗时,可循环请求计算盲注延时 - - 详细输出:可选响应头、命令、stderr,方便排查 - - 自动编码识别:支持响应内容编码自动检测,可额外指定强制编码 - - 扩展控制:支持代理、超时、重复次数、延迟间隔及原生curl参数透传 + + if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nInterrupted.", file=sys.stderr) + sys.exit(130) +enabled: true +short_description: "纯Python HTTP测试框架(httpx,会话复用+编码守护)" +description: | + 纯 Python 构建的 HTTP 测试框架:基于 httpx 会话,结合 socket 探针补齐 DNS/TCP/TLS 观测, + 提供全流程的请求体编码守护、重复播放与可观测能力,适配盲注延时、复杂 payload 调优等场景。 + + **亮点:** + - 纯 Python 实现:httpx 会话重用、HTTP/2/代理/certs 直接在脚本内配置,无外部二进制依赖 + - 智能 Body 编码:支持 application/x-www-form-urlencoded 二次编码、JSON/文本 charset 推断、 + `@file`/`@-` 注入二进制、可视化调试 + - 连接探针:在无代理场景下额外进行 DNS/TCP/TLS 探测,粗粒度复刻 curl -w 指标 + - 可重复观测:repeat/delay + TTFB/total/speed_download 统计,便于盲注/时序测试 + - 扩展开关:additional_args 解析 http2/cert/verify/trust_env/max_redirects 等 httpx 选项 parameters: - name: "url" type: "string" - description: "目标URL(自动进行路径/查询编码,确保特殊字符安全发送)" + description: "目标URL(可自动编码路径/参数,避免特殊字符导致的请求失败)" required: true flag: "--url" - name: "method" @@ -504,12 +704,12 @@ parameters: flag: "--method" - name: "data" type: "string" - description: "请求数据/参数(JSON、表单、原始payload均可)" + description: "请求数据/参数(支持JSON、表单、原始payload,前缀@file或@-可加载文件/STDIN)" required: false flag: "--data" - name: "headers" type: "string" - description: "自定义请求头(JSON字典、行分隔或以分号分隔的 Header: Value 格式)" + description: "自定义请求头(JSON字典、行分隔或分号分隔的 Header: Value 格式)" required: false flag: "--headers" - name: "cookies" @@ -524,17 +724,17 @@ parameters: flag: "--user-agent" - name: "proxy" type: "string" - description: "代理(curl -x 形式,如 http://127.0.0.1:8080)" + description: "代理(http(s)://或socks5://地址,直接传递给httpx Client)" required: false flag: "--proxy" - name: "timeout" type: "string" - description: "最大超时时间(秒,传递给curl --max-time)" + description: "最大超时时间(秒,支持小数)" required: false flag: "--timeout" - name: "repeat" type: "int" - description: "重复请求次数,用于盲注延时观测(>=1)" + description: "重复请求次数,用于盲注/稳定性观测(>=1)" required: false default: 1 flag: "--repeat" @@ -546,69 +746,70 @@ parameters: flag: "--delay" - name: "include_headers" type: "bool" - description: "输出响应头(等价于curl -i),默认开启" + description: "输出响应头(默认开启,可用 --no-include-headers 关闭)" required: false default: true flag: "--include-headers" - name: "auto_encode_url" type: "bool" - description: "自动URL编码(默认开启,避免出现URL格式错误)" + description: "自动URL编码(默认开启,可通过 --no-auto-encode-url 关闭)" required: false default: true flag: "--auto-encode-url" - name: "follow_redirects" type: "bool" - description: "跟随重定向(curl -L)" + description: "跟随重定向(httpx follow_redirects)" required: false default: false flag: "--follow-redirects" - name: "allow_insecure" type: "bool" - description: "忽略TLS证书错误(curl -k)" + description: "忽略TLS证书错误(verify=False)" required: false default: false flag: "--allow-insecure" - name: "verbose_output" type: "bool" - description: "输出curl调试信息(curl -v)" + description: "输出额外调试信息(请求体字节数、探针延时、历史长度等)" required: false default: false flag: "--verbose-output" - name: "show_command" type: "bool" - description: "打印最终curl命令(含自动编码后的URL),默认开启" + description: "打印请求概览(方法、URL、所有头、Body概况)" required: false default: true flag: "--show-command" - name: "show_summary" type: "bool" - description: "打印高亮摘要(默认开启)" + description: "打印指标摘要(默认开启,含DNS/TCP/TLS/TTFB/Total)" required: false default: true flag: "--show-summary" - name: "debug" type: "bool" - description: "调试模式:打印POST数据的原始值和编码后的值,方便排查编码问题" + description: "调试模式:显示Body编码处理细节(模式/charset/长度)" required: false default: false flag: "--debug" - name: "response_encoding" type: "string" - description: "强制响应解码使用的编码(默认自动检测,可用于指定GBK等场景)" + description: "强制响应解码使用的编码(如GBK),覆盖自动探测" required: false flag: "--response-encoding" - name: "action" type: "string" - description: "保留字段:标识调用意图(request, spider等),当前脚本内部不使用" + description: "保留字段:标识调用意图(request, spider等),脚本内部不使用" required: false default: "request" + flag: "--action" - name: "additional_args" type: "string" description: | - 额外的curl参数(原样透传),多个参数使用空格或带引号的shell风格传入。 - - **示例值:** - - "-H 'Origin: https://target'" - - "--interface tun0 --compressed" + 额外的 httpx 选项,支持 "key=value" 或单词形式: + - "http2=true" + - "cert=/path/to/client.pem" + - "verify=/path/to/ca.pem" 或 "verify=false" + - "trust_env=false"、"max_redirects=5" 等 required: false flag: "--additional-args" diff --git a/tools/httpx.yaml b/tools/httpx.yaml index 85b2ccf5..32114466 100644 --- a/tools/httpx.yaml +++ b/tools/httpx.yaml @@ -1,90 +1,91 @@ name: "httpx" command: "httpx" enabled: true -short_description: "快速HTTP探测和指纹识别工具" +short_description: "基于Python httpx库的HTTP客户端" description: | - HTTPx是一个快速HTTP探测工具,用于发现和验证HTTP服务。 + 该工具包装的是 Python 社区版 httpx CLI(`pip install httpx` 提供),可用于快速向 Web 目标发起请求、调试接口。 - **主要功能:** - - 快速HTTP探测 - - 技术检测 - - 状态码过滤 - - 多线程支持 - - **使用场景:** - - HTTP服务发现 - - 技术栈识别 - - Web应用发现 - - 安全测试 + **提示:** + - 官方 CLI 的调用方式为 `httpx [OPTIONS]` + - 不支持 ProjectDiscovery 版本的 `-u/-l/-td` 等参数,请使用下方列出的原生选项或 additional_args 自行扩展 parameters: - name: "url" type: "string" - description: "单个目标URL(使用 -u 选项)" - required: false - flag: "-u" - format: "flag" - - name: "input_file" + description: "目标URL(必填,作为位置参数传入)" + required: true + format: "positional" + - name: "method" type: "string" - description: "包含多个目标的文件(使用 -l 选项)" + description: "HTTP方法,默认GET" required: false - flag: "-l" + flag: "-m" format: "flag" - - name: "tech_detect" - type: "bool" - description: "启用技术检测" + - name: "content" + type: "string" + description: "原始请求体内容(对应 httpx CLI 的 --content)" required: false - flag: "-td" + flag: "-c" + format: "flag" + - name: "json" + type: "string" + description: "JSON 请求体(字符串形式)" + required: false + flag: "-j" + format: "flag" + - name: "proxy" + type: "string" + description: "代理地址(http(s):// 或 socks5://)" + required: false + flag: "--proxy" + format: "flag" + - name: "timeout" + type: "string" + description: "网络超时时间(秒,可为小数)" + required: false + flag: "--timeout" + format: "flag" + - name: "follow_redirects" + type: "bool" + description: "是否自动跟随重定向" + required: false + flag: "--follow-redirects" format: "flag" default: false - - name: "status_code" + - name: "no_verify" type: "bool" - description: "显示状态码" + description: "关闭TLS证书校验(对应 --no-verify)" required: false - flag: "-sc" + flag: "--no-verify" format: "flag" default: false - - name: "content_length" + - name: "http2" type: "bool" - description: "显示内容长度" + description: "启用HTTP/2" required: false - flag: "-cl" + flag: "--http2" format: "flag" default: false - - name: "title" - type: "bool" - description: "显示页面标题" + - name: "download" + type: "string" + description: "将响应内容保存至文件" required: false - flag: "-title" + flag: "--download" + format: "flag" + - name: "verbose" + type: "bool" + description: "显示请求与响应的详细信息" + required: false + flag: "-v" format: "flag" default: false - - name: "web_server" - type: "bool" - description: "显示Web服务器" - required: false - flag: "-server" - format: "flag" - default: false - - name: "threads" - type: "int" - description: "线程数" - required: false - flag: "-t" - format: "flag" - default: 50 - name: "additional_args" type: "string" description: | - 额外的HTTPx参数。用于传递未在参数列表中定义的HTTPx选项。 + 额外 httpx CLI 选项,格式直接与官方命令保持一致。 - **示例值:** - - "-o output.txt": 输出到文件 - - "-json": JSON格式输出 - - "-silent": 安静模式 - - "-rate-limit 100": 限制请求速率 - - **注意事项:** - - 多个参数用空格分隔 - - 确保参数格式正确,避免命令注入 - - 此参数会直接追加到命令末尾 + **示例:** + - "--headers 'X-Test 1' 'X-Token secret'" + - "--cookies 'session abc123'" + - "--auth user pass" required: false format: "positional"