name: "http-intruder" command: "python3" args: - "-c" - | import json import sys import time from urllib.parse import urlencode, urlparse, parse_qs, urlunparse import requests if len(sys.argv) < 3: sys.stderr.write("需要至少URL和载荷\n") sys.exit(1) url = sys.argv[1] method = (sys.argv[2] or "GET").upper() location = (sys.argv[3] or "query").lower() params_input = sys.argv[4] if len(sys.argv) > 4 else "{}" payloads_json = sys.argv[5] if len(sys.argv) > 5 else "[]" max_requests = int(sys.argv[6]) if len(sys.argv) > 6 and sys.argv[6] else 0 try: # 框架会将 object 类型序列化为 JSON 字符串传递 # sys.argv 中的参数都是字符串,需要解析 JSON if params_input and params_input.strip(): params_template = json.loads(params_input) if not isinstance(params_template, dict): sys.stderr.write("参数模板必须是字典格式\n") sys.exit(1) else: params_template = {} except json.JSONDecodeError as exc: sys.stderr.write(f"参数模板解析失败(需要 JSON 字典格式): {exc}\n") sys.exit(1) try: # 框架会将 array 类型转换为逗号分隔的字符串(见 formatParamValue) # 但为了兼容性,也支持 JSON 数组格式 if payloads_json and payloads_json.strip(): payloads_str = payloads_json.strip() # 优先尝试解析为 JSON 数组 if payloads_str.startswith("["): try: payloads = json.loads(payloads_str) except json.JSONDecodeError: # JSON 解析失败,尝试逗号分隔格式 payloads = [item.strip() for item in payloads_str.split(",") if item.strip()] else: # 逗号分隔的字符串(框架的 array 类型默认格式) payloads = [item.strip() for item in payloads_str.split(",") if item.strip()] if not isinstance(payloads, list): sys.stderr.write("载荷必须是数组格式\n") sys.exit(1) else: payloads = [] except (json.JSONDecodeError, ValueError) as exc: sys.stderr.write(f"载荷解析失败(需要 JSON 数组或逗号分隔格式): {exc}\n") sys.exit(1) if not isinstance(payloads, list) or not payloads: sys.stderr.write("载荷列表不能为空\n") sys.exit(1) param_names = list(params_template.keys()) if not param_names: sys.stderr.write("参数模板不能为空\n") sys.exit(1) session = requests.Session() sent = 0 def build_query(original_url, data): parsed = urlparse(original_url) existing = {k: v[0] for k, v in parse_qs(parsed.query, keep_blank_values=True).items()} existing.update(data) new_query = urlencode(existing, doseq=True) return urlunparse(parsed._replace(query=new_query)) for param in param_names: for payload in payloads: if max_requests and sent >= max_requests: break payload_str = str(payload) if location == "query": new_url = build_query(url, {param: payload_str}) response = session.request(method, new_url) elif location == "body": body = params_template.copy() body[param] = payload_str response = session.request(method, url, data=body) elif location == "headers": headers = params_template.copy() headers[param] = payload_str response = session.request(method, url, headers=headers) elif location == "cookie": cookies = params_template.copy() cookies[param] = payload_str response = session.request(method, url, cookies=cookies) else: sys.stderr.write(f"不支持的位置: {location}\n") sys.exit(1) sent += 1 length = len(response.content) print(f"[{sent}] {param} = {payload_str} -> {response.status_code} ({length} bytes)") if max_requests and sent >= max_requests: break if sent == 0: sys.stderr.write("未发送任何请求,请检查参数配置。\n") enabled: true short_description: "简单的Intruder(sniper)模糊测试工具" description: | 轻量级HTTP“狙击手”模式模糊器,对每个参数逐一替换载荷并记录响应。 parameters: - name: "url" type: "string" description: "目标URL" required: true position: 0 format: "positional" - name: "method" type: "string" description: "HTTP方法(默认GET)" required: false default: "GET" position: 1 format: "positional" - name: "location" type: "string" description: "载荷位置(query, body, headers, cookie)" required: false default: "query" position: 2 format: "positional" - name: "params" type: "object" description: "参数模板(字典格式),指定要模糊的键及默认值,如 {\"id\": \"1\", \"name\": \"test\"}" required: true position: 3 format: "positional" - name: "payloads" type: "array" item_type: "string" description: "载荷列表(数组格式),如 [\"test1\", \"test2\", \"test3\"]" required: true position: 4 format: "positional" - name: "max_requests" type: "int" description: "最大请求数(0表示全部)" required: false default: 0 position: 5 format: "positional"