From f658cc6e939c2d8f536abea37854e70369140cc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 8 Apr 2026 23:43:20 +0800 Subject: [PATCH] Add files via upload --- tools/quake_search.yaml | 293 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 tools/quake_search.yaml diff --git a/tools/quake_search.yaml b/tools/quake_search.yaml new file mode 100644 index 00000000..5aa41145 --- /dev/null +++ b/tools/quake_search.yaml @@ -0,0 +1,293 @@ +name: "quake_search" +command: "python3" +args: + - "-c" + - | + import sys + import json + import requests + import os + + # ==================== Quake配置 ==================== + # 请在此处配置您的Quake API Token + # 您也可以在环境变量中设置:QUAKE_API_KEY + # enable 默认为 false,需开启才能调用该MCP + QUAKE_API_KEY = "" # 请填写您的Quake API Token + # ================================================== + + # Quake API基础URL + base_url = "https://quake.360.cn/api/v3/search/quake_service" + + # 解析参数(从JSON字符串或命令行参数) + def parse_args(): + # 尝试从第一个参数读取JSON配置 + if len(sys.argv) > 1: + try: + arg1 = str(sys.argv[1]) + config = json.loads(arg1) + if isinstance(config, dict): + return config + except (json.JSONDecodeError, TypeError, ValueError): + pass + + # 传统位置参数方式(向后兼容) + # 参数位置:query=1, size=2, start=3, fields=4, latest=5 + config = {} + if len(sys.argv) > 1: + config["query"] = str(sys.argv[1]) + if len(sys.argv) > 2: + try: + config["size"] = int(sys.argv[2]) + except (ValueError, TypeError): + pass + if len(sys.argv) > 3: + try: + config["start"] = int(sys.argv[3]) + except (ValueError, TypeError): + pass + if len(sys.argv) > 4: + config["fields"] = str(sys.argv[4]) + if len(sys.argv) > 5: + val = sys.argv[5] + if isinstance(val, str): + config["latest"] = val.lower() in ("true", "1", "yes") + else: + config["latest"] = bool(val) + return config + + # 标准化 fields 参数:支持字符串和数组 + def normalize_fields(fields_value): + if fields_value is None: + return None + + if isinstance(fields_value, str): + raw = fields_value.strip() + if not raw: + return None + return [x.strip() for x in raw.split(",") if x.strip()] + + if isinstance(fields_value, list): + output = [] + for item in fields_value: + text = str(item).strip() + if text: + output.append(text) + return output or None + + return None + + try: + config = parse_args() + + if not isinstance(config, dict): + error_result = { + "status": "error", + "message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}", + "type": "TypeError" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + api_key = os.getenv("QUAKE_API_KEY", QUAKE_API_KEY).strip() + query = str(config.get("query", "")).strip() + + if not api_key: + error_result = { + "status": "error", + "message": "缺少Quake配置: api_key(Quake API Token)", + "required_config": ["api_key"], + "note": "请在YAML文件的QUAKE_API_KEY配置项中填写Token,或在环境变量QUAKE_API_KEY中设置。Token可在Quake用户中心获取。" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + if not query: + error_result = { + "status": "error", + "message": "缺少必需参数: query(搜索查询语句)", + "required_params": ["query"], + "examples": [ + 'domain:"example.com"', + 'ip:"1.1.1.1"', + 'port:443', + 'service.name:"http"', + 'port:22 AND country_cn:"中国"' + ] + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + # 构建请求体 + data = { + "query": query + } + + # 可选参数 size(通常最大100) + if "size" in config and config["size"] is not None: + try: + size = int(config["size"]) + if size > 0: + data["size"] = size + except (ValueError, TypeError): + pass + + # 可选参数 start(分页偏移,默认0) + if "start" in config and config["start"] is not None: + try: + start = int(config["start"]) + if start >= 0: + data["start"] = start + except (ValueError, TypeError): + pass + + # fields 映射到 Quake 的 include 字段 + include_fields = normalize_fields(config.get("fields")) + if include_fields: + data["include"] = include_fields + + # latest 参数,默认 true(取最新索引结果) + latest_value = config.get("latest", True) + if isinstance(latest_value, bool): + data["latest"] = latest_value + elif isinstance(latest_value, str): + data["latest"] = latest_value.lower() in ("true", "1", "yes") + elif isinstance(latest_value, (int, float)): + data["latest"] = latest_value != 0 + else: + data["latest"] = True + + headers = { + "X-QuakeToken": api_key, + "Content-Type": "application/json" + } + + try: + response = requests.post(base_url, json=data, headers=headers, timeout=30) + response.raise_for_status() + result_data = response.json() + + # Quake API code==0 表示成功 + if result_data.get("code") != 0: + error_result = { + "status": "error", + "message": f"Quake API错误: {result_data.get('message', '未知错误')}", + "error_code": result_data.get("code", "unknown"), + "suggestion": "请检查API Token、查询语法和账户积分是否正常" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + results = result_data.get("data", []) + meta = result_data.get("meta", {}) + pagination = meta.get("pagination", {}) if isinstance(meta, dict) else {} + + output = { + "status": "success", + "query": query, + "size": data.get("size", pagination.get("size", len(results))), + "start": data.get("start", pagination.get("page_index", 0)), + "total": result_data.get("total_count", pagination.get("total", 0)), + "results_count": len(results), + "fields": include_fields or "all", + "results": results, + "message": f"成功获取 {len(results)} 条结果" + } + + print(json.dumps(output, ensure_ascii=False, indent=2)) + + except requests.exceptions.RequestException as e: + error_result = { + "status": "error", + "message": f"请求失败: {str(e)}", + "suggestion": "请检查网络连通性或Quake API服务状态" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + except Exception as e: + error_result = { + "status": "error", + "message": f"执行出错: {str(e)}", + "type": type(e).__name__ + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) +enabled: false +short_description: "Quake网络空间搜索接口,支持自定义query、size、fields" +description: | + Quake(360 网络空间测绘)资产搜索工具,调用 Quake API v3 实时检索互联网资产。 + + **主要功能:** + - 支持 Quake DSL 查询语法(query) + - 支持返回数量控制(size) + - 支持字段裁剪(fields,对应 Quake include) + - 支持分页偏移(start) + + **鉴权方式:** + - Header 使用 `X-QuakeToken` + - 可在本文件中填写 `QUAKE_API_KEY`,或通过环境变量 `QUAKE_API_KEY` 注入 + + **常见查询示例:** + - `domain:"example.com"` + - `ip:"1.1.1.1"` + - `port:443` + - `service.name:"http" AND country_cn:"中国"` + + **注意事项:** + - API 调用会消耗积分,请按需控制 `size` + - `fields` 会映射到请求体 `include` 字段,多个字段用英文逗号分隔 + - 如遇语法报错,请先在 Quake 控制台验证 DSL +parameters: + - name: "query" + type: "string" + description: | + Quake DSL 查询语句(必需)。 + + **示例:** + - `domain:"example.com"` + - `ip:"1.1.1.1"` + - `port:443` + - `service.name:"http" AND country_cn:"中国"` + required: true + position: 1 + format: "positional" + - name: "size" + type: "int" + description: | + 返回结果数量(可选)。 + + 建议范围:1-100(具体受账户权限/接口限制影响)。 + required: false + position: 2 + format: "positional" + default: 10 + - name: "start" + type: "int" + description: | + 分页起始偏移(可选),从 0 开始。 + required: false + position: 3 + format: "positional" + default: 0 + - name: "fields" + type: "string" + description: | + 返回字段(可选),多个字段用英文逗号分隔。 + + 该参数会映射到 Quake 请求体中的 `include` 字段。 + **示例:** + - `ip,port` + - `ip,port,service.name,service.http.title,location.country_cn` + required: false + position: 4 + format: "positional" + default: "ip,port" + - name: "latest" + type: "bool" + description: | + 是否优先返回最新索引结果(可选)。 + 默认 `true`。 + required: false + position: 5 + format: "positional" + default: true