name: "quake_search" command: "python3" args: - "-c" - | import sys import json import requests import os # ==================== Quake配置 ==================== # 请在此处配置您的Quake API Token # 您也可以在环境变量中设置:QUAKE_API_KEY # enable 默认为 false,需开启才能调用该MCP QUAKE_API_KEY = "" # 请填写您的Quake API Token # ================================================== # Quake API基础URL base_url = "https://quake.360.cn/api/v3/search/quake_service" # 解析参数(从JSON字符串或命令行参数) def parse_args(): # 尝试从第一个参数读取JSON配置 if len(sys.argv) > 1: try: arg1 = str(sys.argv[1]) config = json.loads(arg1) if isinstance(config, dict): return config except (json.JSONDecodeError, TypeError, ValueError): pass # 传统位置参数方式(向后兼容) # 参数位置:query=1, size=2, start=3, fields=4, latest=5 config = {} if len(sys.argv) > 1: config["query"] = str(sys.argv[1]) if len(sys.argv) > 2: try: config["size"] = int(sys.argv[2]) except (ValueError, TypeError): pass if len(sys.argv) > 3: try: config["start"] = int(sys.argv[3]) except (ValueError, TypeError): pass if len(sys.argv) > 4: config["fields"] = str(sys.argv[4]) if len(sys.argv) > 5: val = sys.argv[5] if isinstance(val, str): config["latest"] = val.lower() in ("true", "1", "yes") else: config["latest"] = bool(val) return config # 标准化 fields 参数:支持字符串和数组 def normalize_fields(fields_value): if fields_value is None: return None if isinstance(fields_value, str): raw = fields_value.strip() if not raw: return None return [x.strip() for x in raw.split(",") if x.strip()] if isinstance(fields_value, list): output = [] for item in fields_value: text = str(item).strip() if text: output.append(text) return output or None return None try: config = parse_args() if not isinstance(config, dict): error_result = { "status": "error", "message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}", "type": "TypeError" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) api_key = os.getenv("QUAKE_API_KEY", QUAKE_API_KEY).strip() query = str(config.get("query", "")).strip() if not api_key: error_result = { "status": "error", "message": "缺少Quake配置: api_key(Quake API Token)", "required_config": ["api_key"], "note": "请在YAML文件的QUAKE_API_KEY配置项中填写Token,或在环境变量QUAKE_API_KEY中设置。Token可在Quake用户中心获取。" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) if not query: error_result = { "status": "error", "message": "缺少必需参数: query(搜索查询语句)", "required_params": ["query"], "examples": [ 'domain:"example.com"', 'ip:"1.1.1.1"', 'port:443', 'service.name:"http"', 'port:22 AND country_cn:"中国"' ] } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) # 构建请求体 data = { "query": query } # 可选参数 size(通常最大100) if "size" in config and config["size"] is not None: try: size = int(config["size"]) if size > 0: data["size"] = size except (ValueError, TypeError): pass # 可选参数 start(分页偏移,默认0) if "start" in config and config["start"] is not None: try: start = int(config["start"]) if start >= 0: data["start"] = start except (ValueError, TypeError): pass # fields 映射到 Quake 的 include 字段 include_fields = normalize_fields(config.get("fields")) if include_fields: data["include"] = include_fields # latest 参数,默认 true(取最新索引结果) latest_value = config.get("latest", True) if isinstance(latest_value, bool): data["latest"] = latest_value elif isinstance(latest_value, str): data["latest"] = latest_value.lower() in ("true", "1", "yes") elif isinstance(latest_value, (int, float)): data["latest"] = latest_value != 0 else: data["latest"] = True headers = { "X-QuakeToken": api_key, "Content-Type": "application/json" } try: response = requests.post(base_url, json=data, headers=headers, timeout=30) response.raise_for_status() result_data = response.json() # Quake API code==0 表示成功 if result_data.get("code") != 0: error_result = { "status": "error", "message": f"Quake API错误: {result_data.get('message', '未知错误')}", "error_code": result_data.get("code", "unknown"), "suggestion": "请检查API Token、查询语法和账户积分是否正常" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) results = result_data.get("data", []) meta = result_data.get("meta", {}) pagination = meta.get("pagination", {}) if isinstance(meta, dict) else {} output = { "status": "success", "query": query, "size": data.get("size", pagination.get("size", len(results))), "start": data.get("start", pagination.get("page_index", 0)), "total": result_data.get("total_count", pagination.get("total", 0)), "results_count": len(results), "fields": include_fields or "all", "results": results, "message": f"成功获取 {len(results)} 条结果" } print(json.dumps(output, ensure_ascii=False, indent=2)) except requests.exceptions.RequestException as e: error_result = { "status": "error", "message": f"请求失败: {str(e)}", "suggestion": "请检查网络连通性或Quake API服务状态" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) except Exception as e: error_result = { "status": "error", "message": f"执行出错: {str(e)}", "type": type(e).__name__ } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) enabled: false short_description: "Quake网络空间搜索接口,支持自定义query、size、fields" description: | Quake(360 网络空间测绘)资产搜索工具,调用 Quake API v3 实时检索互联网资产。 **主要功能:** - 支持 Quake DSL 查询语法(query) - 支持返回数量控制(size) - 支持字段裁剪(fields,对应 Quake include) - 支持分页偏移(start) **鉴权方式:** - Header 使用 `X-QuakeToken` - 可在本文件中填写 `QUAKE_API_KEY`,或通过环境变量 `QUAKE_API_KEY` 注入 **常见查询示例:** - `domain:"example.com"` - `ip:"1.1.1.1"` - `port:443` - `service.name:"http" AND country_cn:"中国"` **注意事项:** - API 调用会消耗积分,请按需控制 `size` - `fields` 会映射到请求体 `include` 字段,多个字段用英文逗号分隔 - 如遇语法报错,请先在 Quake 控制台验证 DSL parameters: - name: "query" type: "string" description: | Quake DSL 查询语句(必需)。 **示例:** - `domain:"example.com"` - `ip:"1.1.1.1"` - `port:443` - `service.name:"http" AND country_cn:"中国"` required: true position: 1 format: "positional" - name: "size" type: "int" description: | 返回结果数量(可选)。 建议范围:1-100(具体受账户权限/接口限制影响)。 required: false position: 2 format: "positional" default: 10 - name: "start" type: "int" description: | 分页起始偏移(可选),从 0 开始。 required: false position: 3 format: "positional" default: 0 - name: "fields" type: "string" description: | 返回字段(可选),多个字段用英文逗号分隔。 该参数会映射到 Quake 请求体中的 `include` 字段。 **示例:** - `ip,port` - `ip,port,service.name,service.http.title,location.country_cn` required: false position: 4 format: "positional" default: "ip,port" - name: "latest" type: "bool" description: | 是否优先返回最新索引结果(可选)。 默认 `true`。 required: false position: 5 format: "positional" default: true