def compile_response_filter(pattern: str, ignore_case: bool):
    """Compile the user-supplied response filter regex.

    Args:
        pattern: Regular expression source text.
        ignore_case: When true, compile with ``re.IGNORECASE``.

    Returns:
        The compiled pattern object.

    On an invalid pattern the error is reported on stderr and the process
    exits with status 2 (``SystemExit``).
    """
    flags = re.IGNORECASE if ignore_case else 0
    try:
        return re.compile(pattern, flags)
    except re.error as exc:
        print(f"Invalid response_filter regex: {exc}", file=sys.stderr)
        sys.exit(2)


def truncate_utf8(text: str, max_bytes: int) -> Tuple[str, bool]:
    """Cut *text* so that its UTF-8 encoding fits in *max_bytes*.

    Args:
        text: Text to limit.
        max_bytes: Byte budget; ``<= 0`` disables the limit.

    Returns:
        ``(text, truncated)`` where ``truncated`` tells whether anything
        was removed.
    """
    if max_bytes <= 0 or not text:
        return text, False
    encoded = text.encode("utf-8", errors="replace")
    if len(encoded) <= max_bytes:
        return text, False
    # The cut may land inside a multi-byte sequence; "ignore" drops the
    # dangling partial character instead of raising.
    return encoded[:max_bytes].decode("utf-8", errors="ignore"), True


def cap_line_entries(entries: List[Tuple[int, str]], max_lines: int) -> Tuple[List[Tuple[int, str]], bool]:
    """Keep at most *max_lines* leading entries.

    Args:
        entries: Entries to limit.
        max_lines: Maximum number of entries; ``<= 0`` disables the limit.

    Returns:
        ``(entries, capped)`` where ``capped`` tells whether entries were
        dropped.
    """
    if max_lines <= 0 or len(entries) <= max_lines:
        return entries, False
    return entries[:max_lines], True


def expand_line_context(line_numbers: List[int], total_lines: int, context: int) -> List[int]:
    """Widen 1-based line numbers by *context* lines on each side.

    Args:
        line_numbers: 1-based numbers of the selected lines.
        total_lines: Number of lines in the body (upper clamp).
        context: Lines of context to add before and after each hit.

    Returns:
        Sorted, de-duplicated 1-based line numbers clamped to
        ``1..total_lines``.
    """
    if context <= 0:
        return sorted(set(line_numbers))
    included = set()
    for num in line_numbers:
        first = max(1, num - context)
        last = min(total_lines, num + context)
        included.update(range(first, last + 1))
    return sorted(included)


def format_line_entries(lines: List[str], indices: List[int], ellipsis_gaps: bool = True) -> str:
    """Render the selected lines as ``L<n>: <text>`` rows.

    Args:
        lines: All body lines.
        indices: 1-based numbers of the lines to render, in output order.
        ellipsis_gaps: When true, insert a ``...`` row wherever consecutive
            entries are not adjacent in the body.

    Returns:
        The rows joined with newlines, or ``""`` when *indices* is empty.
    """
    if not indices:
        return ""
    rows = []
    prev = None
    for num in indices:
        if ellipsis_gaps and prev is not None and num > prev + 1:
            rows.append(" ...")
        rows.append(f" L{num}: {lines[num - 1]}")
        prev = num
    return "\n".join(rows)


def filter_body_by_lines(
    lines: List[str],
    compiled: "re.Pattern",
    invert: bool,
    context_lines: int,
    max_lines: int,
) -> Tuple[str, Dict[str, object]]:
    """Select body lines that match (or, inverted, do not match) a regex.

    Args:
        lines: Body split into lines.
        compiled: Compiled pattern, applied with ``search`` to each line.
        invert: Select the non-matching lines instead.
        context_lines: Lines of context to show around each selected line.
        max_lines: Cap on displayed lines; ``<= 0`` disables the cap.

    Returns:
        ``(output, meta)``: the formatted rows and a metadata dict that
        always carries ``mode``, ``total_lines``, ``matched_lines``,
        ``invert``, ``truncated``, ``byte_truncated`` and ``display_lines``.
    """
    want_miss = bool(invert)
    matched_nums = [
        idx
        for idx, line in enumerate(lines, start=1)
        if (compiled.search(line) is not None) != want_miss
    ]
    total = len(lines)
    meta: Dict[str, object] = {
        "mode": "line",
        "total_lines": total,
        "matched_lines": len(matched_nums),
        "invert": invert,
        "truncated": False,
        "byte_truncated": False,
        # Present on every path so consumers never have to guess.
        "display_lines": 0,
    }
    if not matched_nums:
        return "", meta
    display_nums = expand_line_context(matched_nums, total, context_lines)
    entries = [(n, lines[n - 1]) for n in display_nums]
    entries, line_capped = cap_line_entries(entries, max_lines)
    meta["truncated"] = line_capped
    meta["display_lines"] = len(entries)
    # Gap markers only make sense when context was requested.
    rendered = format_line_entries(
        lines, [n for n, _ in entries], ellipsis_gaps=context_lines > 0
    )
    return rendered, meta


def apply_body_limits_plain(text: str, max_lines: int) -> Tuple[str, Dict[str, object]]:
    """Apply only the line cap to an unfiltered body.

    Args:
        text: Body text.
        max_lines: Cap on displayed lines; ``<= 0`` disables the cap.

    Returns:
        ``(output, meta)`` with ``mode`` set to ``"plain"``.
    """
    lines = text.splitlines()
    over_cap = max_lines > 0 and len(lines) > max_lines
    meta: Dict[str, object] = {
        "mode": "plain",
        "total_lines": len(lines),
        "matched_lines": len(lines),
        "invert": False,
        "truncated": over_cap,
        "byte_truncated": False,
        "display_lines": max_lines if over_cap else len(lines),
    }
    output = "\n".join(lines[:max_lines]) if over_cap else text
    return output, meta


def filter_body_multiline(
    text: str,
    compiled: "re.Pattern",
    invert: bool,
    max_lines: int,
    dotall: bool,
) -> Tuple[str, Dict[str, object]]:
    """Match a regex against the whole body instead of line by line.

    The pattern is recompiled with ``re.MULTILINE`` (and ``re.DOTALL`` when
    *dotall* is true) on top of the flags it already carries.

    Args:
        text: Body text.
        compiled: Compiled pattern supplying the source and base flags.
        invert: When true, output the body only if nothing matches.
        max_lines: Cap on displayed matches (or body lines when inverted);
            ``<= 0`` disables the cap.
        dotall: Selects ``"full"`` mode (``.`` also matches newlines).

    Returns:
        ``(output, meta)``. Each match is rendered as ``@<line>: <snippet>``
        with newlines inside the snippet shown as a literal ``\\n``.
    """
    extra_flags = re.MULTILINE | (re.DOTALL if dotall else 0)
    pattern = re.compile(compiled.pattern, compiled.flags | extra_flags)
    mode = "full" if dotall else "multiline"

    if invert:
        if pattern.search(text) is not None:
            return "", {
                "mode": mode,
                "total_lines": len(text.splitlines()),
                "matched_lines": 0,
                "invert": True,
                "truncated": False,
                "byte_truncated": False,
                "display_lines": 0,
            }
        # Nothing matched: the whole body passes, subject to the line cap.
        output, plain_meta = apply_body_limits_plain(text, max_lines)
        return output, {**plain_meta, "mode": mode, "matched_lines": 1, "invert": True}

    chunks = []
    # finditer yields matches in non-decreasing start order, so newlines
    # are counted incrementally rather than rescanning from offset 0.
    line_no = 1
    scanned = 0
    for match in pattern.finditer(text):
        line_no += text.count("\n", scanned, match.start())
        scanned = match.start()
        snippet = match.group(0).replace("\n", "\\n")
        chunks.append((line_no, f" @{line_no}: {snippet}"))

    entries, line_capped = cap_line_entries(chunks, max_lines)
    meta: Dict[str, object] = {
        "mode": mode,
        # splitlines() keeps the count consistent with the other modes.
        "total_lines": len(text.splitlines()),
        "matched_lines": len(chunks),
        "invert": False,
        "truncated": line_capped,
        "byte_truncated": False,
        "display_lines": len(entries),
    }
    return "\n".join(row for _, row in entries), meta


def _display_count_after_byte_cap(output: str, previous: int) -> int:
    """Bound the displayed-line count by what survived the byte cap.

    This is an upper bound: gap markers (``...`` rows) also occupy output
    lines, so the true number of surviving entries can be slightly lower.
    """
    return min(previous, len(output.splitlines()))


def format_response_body_output(
    decoded_body: str,
    filter_pattern: str,
    filter_mode: str,
    filter_invert: bool,
    filter_ignore_case: bool,
    max_lines: int,
    max_bytes: int,
    preview_lines: int,
    context_lines: int,
    compiled_filter=None,
) -> Tuple[str, Dict[str, object]]:
    """Produce the stdout rendering of a response body plus its metadata.

    Args:
        decoded_body: Decoded response text; trailing CR/LF is stripped.
        filter_pattern: Regex source; empty means no filtering.
        filter_mode: ``"line"``, ``"multiline"`` or ``"full"``; anything
            else falls back to ``"line"``.
        filter_invert: Invert the filter.
        filter_ignore_case: Used only when the pattern must be compiled here.
        max_lines: Line cap; ``<= 0`` disables it.
        max_bytes: UTF-8 byte cap; ``<= 0`` disables it.
        preview_lines: Leading lines to show when a non-inverted filter
            matches nothing; ``<= 0`` disables the preview.
        context_lines: Context lines for ``"line"`` mode.
        compiled_filter: Optional pre-compiled pattern for *filter_pattern*.

    Returns:
        ``(output, meta)``. ``meta["mode"]`` is ``"empty"`` for a blank
        body, ``"plain"`` without a filter, otherwise the filter mode.
    """
    text = decoded_body.rstrip("\r\n")
    if not text:
        return "", {
            "mode": "empty",
            "total_lines": 0,
            "matched_lines": 0,
            "invert": filter_invert,
            "truncated": False,
            "byte_truncated": False,
            "display_lines": 0,
        }

    lines = text.splitlines()
    mode = (filter_mode or "line").strip().lower()
    if mode not in {"line", "multiline", "full"}:
        mode = "line"

    if filter_pattern:
        if compiled_filter is not None:
            compiled = compiled_filter
        else:
            compiled = compile_response_filter(filter_pattern, filter_ignore_case)
        if mode == "line":
            output, meta = filter_body_by_lines(
                lines, compiled, filter_invert, context_lines, max_lines
            )
        else:
            output, meta = filter_body_multiline(
                text, compiled, filter_invert, max_lines, dotall=(mode == "full")
            )
        meta["filter_pattern"] = filter_pattern
        if not output and not filter_invert:
            # Zero matches: show the head of the body so the caller can
            # tell "no match" apart from "empty response".
            preview = min(max(preview_lines, 0), len(lines))
            if preview <= 0:
                return "", {**meta, "preview": False, "matched_lines": 0, "display_lines": 0}
            preview_text = format_line_entries(
                lines, list(range(1, preview + 1)), ellipsis_gaps=False
            )
            preview_text, byte_truncated = truncate_utf8(preview_text, max_bytes)
            shown = preview
            if byte_truncated:
                shown = _display_count_after_byte_cap(preview_text, preview)
            return preview_text, {
                **meta,
                "preview": True,
                "matched_lines": 0,
                "display_lines": shown,
                "byte_truncated": byte_truncated,
            }
    else:
        output, meta = apply_body_limits_plain(text, max_lines)

    output, byte_truncated = truncate_utf8(output, max_bytes)
    if byte_truncated:
        meta["byte_truncated"] = True
        # Keep the count honest: the byte cap may have removed whole lines.
        meta["display_lines"] = _display_count_after_byte_cap(
            output, meta.get("display_lines") or 0
        )
    return output, meta


def print_response_body_summary(meta: Dict[str, object]):
    """Print a one-line ``[body] ...`` summary built from *meta*.

    Nothing is printed when ``meta["mode"]`` is ``"empty"``.
    """
    mode = meta.get("mode")
    if mode == "empty":
        return
    parts = [f"mode={mode}"]
    if meta.get("filter_pattern"):
        parts.append(f"pattern={meta['filter_pattern']!r}")
    if meta.get("invert"):
        parts.append("invert=true")
    total = meta.get("total_lines")
    matched = meta.get("matched_lines")
    displayed = meta.get("display_lines")
    if total is not None and matched is not None:
        parts.append(f"matched {matched}/{total} lines")
    if displayed is not None:
        parts.append(f"showing {displayed}")
    if meta.get("preview"):
        parts.append("preview on zero match")
    if meta.get("truncated"):
        parts.append("line cap applied")
    if meta.get("byte_truncated"):
        parts.append("byte cap applied")
    print(f"[body] {' | '.join(parts)}")
dest="response_filter_mode", default="line") + parser.add_argument("--response-filter-invert", dest="response_filter_invert", action="store_true") + parser.add_argument("--no-response-filter-invert", dest="response_filter_invert", action="store_false") + parser.add_argument("--response-filter-ignore-case", dest="response_filter_ignore_case", action="store_true") + parser.add_argument("--no-response-filter-ignore-case", dest="response_filter_ignore_case", action="store_false") + parser.add_argument("--response-max-lines", dest="response_max_lines", type=int, default=0) + parser.add_argument("--response-max-bytes", dest="response_max_bytes", type=int, default=0) + parser.add_argument("--response-preview-lines", dest="response_preview_lines", type=int, default=5) + parser.add_argument("--response-context-lines", dest="response_context_lines", type=int, default=0) parser.set_defaults( include_headers=False, auto_encode_url=False, @@ -475,9 +709,22 @@ args: show_command=False, show_summary=False, debug=False, + response_filter_invert=False, + response_filter_ignore_case=False, ) args = parser.parse_args() + response_filter = (args.response_filter or "").strip() + response_max_lines = max(0, args.response_max_lines or 0) + response_max_bytes = max(0, args.response_max_bytes or 0) + response_preview_lines = max(0, args.response_preview_lines if args.response_preview_lines is not None else 5) + response_context_lines = max(0, args.response_context_lines or 0) + compiled_response_filter = None + if response_filter: + compiled_response_filter = compile_response_filter( + response_filter, args.response_filter_ignore_case + ) + repeat = max(1, args.repeat) try: delay_between = float(args.delay or "0") @@ -648,9 +895,37 @@ args: for key, value in response.headers.items(): print(f"{key}: {value}") print("") - output_body = decoded_body.rstrip() + output_body, body_output_meta = format_response_body_output( + decoded_body, + response_filter, + args.response_filter_mode, + 
args.response_filter_invert, + args.response_filter_ignore_case, + response_max_lines, + response_max_bytes, + response_preview_lines, + response_context_lines, + compiled_filter=compiled_response_filter, + ) + has_filter_or_cap = bool( + response_filter or response_max_lines > 0 or response_max_bytes > 0 + ) + if has_filter_or_cap and body_output_meta.get("mode") != "empty": + print_response_body_summary(body_output_meta) + if body_output_meta.get("preview") and not body_output_meta.get("matched_lines"): + print("[body] no regex match; showing preview:") if output_body: print(output_body) + if body_output_meta.get("truncated") or body_output_meta.get("byte_truncated"): + omitted = (body_output_meta.get("total_lines") or 0) - ( + body_output_meta.get("display_lines") or 0 + ) + if omitted > 0: + print(f"[body] ... {omitted} more line(s) omitted (use --download for full body)") + elif body_output_meta.get("mode") == "empty": + print("[no body]") + elif response_filter and not body_output_meta.get("preview"): + print("[body] no lines matched filter") else: print("[no body]") @@ -729,6 +1004,13 @@ description: | - 连接探针:在无代理场景下额外进行 DNS/TCP/TLS 探测,粗粒度复刻 curl -w 指标 - 可重复观测:repeat/delay + TTFB/total/speed_download 统计,便于盲注/时序测试 - 扩展开关:additional_args 解析 http2/cert/verify/trust_env/max_redirects 等 httpx 选项 + - 响应体瘦身:response_filter 按行/块正则提取,配合 max_lines/max_bytes 限制 stdout,降低 Agent token 消耗 + + **响应过滤最佳实践:** + - 大页面/HTML:用 `response_filter` 抓 error|exception|password|token|uid 等关键字行 + - 无 filter 时:设 `response_max_lines=80` 或 `response_max_bytes=8192` 防止整页灌入上下文 + - 0 命中:自动预览前 `response_preview_lines` 行,避免误判「空响应」 + - 完整留存:大 body 用 `download` 落盘,stdout 只保留摘要行 parameters: - name: "url" type: "string" @@ -836,6 +1118,56 @@ parameters: description: "强制响应解码使用的编码(如GBK),覆盖自动探测" required: false flag: "--response-encoding" + - name: "response_filter" + type: "string" + description: | + 响应体正则过滤(仅影响 stdout,不影响 --download 与指标)。 + 默认 line 
模式按行匹配;示例:'(error|exception|SQL|password|token|uid)'。 + 与 response_max_lines/response_max_bytes 配合可显著减少 token 消耗。 + required: false + flag: "--response-filter" + - name: "response_filter_mode" + type: "string" + description: "过滤模式:line(按行,默认)、multiline(跨行块)、full(整段 DOTALL 匹配)" + required: false + default: "line" + flag: "--response-filter-mode" + - name: "response_filter_invert" + type: "bool" + description: "反向过滤:输出不匹配 regex 的行(用于剔除 HTML 噪音)" + required: false + default: false + flag: "--response-filter-invert" + - name: "response_filter_ignore_case" + type: "bool" + description: "正则忽略大小写" + required: false + default: false + flag: "--response-filter-ignore-case" + - name: "response_max_lines" + type: "int" + description: "stdout 最多输出行数(0=不限制);有 filter 时限制命中行数,无 filter 时截断全文" + required: false + default: 0 + flag: "--response-max-lines" + - name: "response_max_bytes" + type: "int" + description: "stdout 响应体 UTF-8 字节上限(0=不限制),超出部分截断" + required: false + default: 0 + flag: "--response-max-bytes" + - name: "response_preview_lines" + type: "int" + description: "filter 零命中时预览的前 N 行(默认 5,0=不预览)" + required: false + default: 5 + flag: "--response-preview-lines" + - name: "response_context_lines" + type: "int" + description: "line 模式下命中行上下各保留 N 行上下文(类似 grep -C)" + required: false + default: 0 + flag: "--response-context-lines" - name: "action" type: "string" description: "保留字段:标识调用意图(request, spider等),脚本内部不使用"