Add files via upload

This commit is contained in:
公明
2026-04-08 23:43:20 +08:00
committed by GitHub
parent 7bf0697526
commit f658cc6e93
+293
View File
@@ -0,0 +1,293 @@
name: "quake_search"
command: "python3"
args:
- "-c"
- |
import sys
import json
import requests
import os
# ==================== Quake配置 ====================
# 请在此处配置您的Quake API Token
# 您也可以在环境变量中设置:QUAKE_API_KEY
# enable 默认为 false,需开启才能调用该MCP
QUAKE_API_KEY = "" # 请填写您的Quake API Token
# ==================================================
# Quake API基础URL
base_url = "https://quake.360.cn/api/v3/search/quake_service"
# 解析参数(从JSON字符串或命令行参数)
def parse_args():
# 尝试从第一个参数读取JSON配置
if len(sys.argv) > 1:
try:
arg1 = str(sys.argv[1])
config = json.loads(arg1)
if isinstance(config, dict):
return config
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 传统位置参数方式(向后兼容)
# 参数位置:query=1, size=2, start=3, fields=4, latest=5
config = {}
if len(sys.argv) > 1:
config["query"] = str(sys.argv[1])
if len(sys.argv) > 2:
try:
config["size"] = int(sys.argv[2])
except (ValueError, TypeError):
pass
if len(sys.argv) > 3:
try:
config["start"] = int(sys.argv[3])
except (ValueError, TypeError):
pass
if len(sys.argv) > 4:
config["fields"] = str(sys.argv[4])
if len(sys.argv) > 5:
val = sys.argv[5]
if isinstance(val, str):
config["latest"] = val.lower() in ("true", "1", "yes")
else:
config["latest"] = bool(val)
return config
# 标准化 fields 参数:支持字符串和数组
def normalize_fields(fields_value):
if fields_value is None:
return None
if isinstance(fields_value, str):
raw = fields_value.strip()
if not raw:
return None
return [x.strip() for x in raw.split(",") if x.strip()]
if isinstance(fields_value, list):
output = []
for item in fields_value:
text = str(item).strip()
if text:
output.append(text)
return output or None
return None
try:
config = parse_args()
if not isinstance(config, dict):
error_result = {
"status": "error",
"message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}",
"type": "TypeError"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
api_key = os.getenv("QUAKE_API_KEY", QUAKE_API_KEY).strip()
query = str(config.get("query", "")).strip()
if not api_key:
error_result = {
"status": "error",
"message": "缺少Quake配置: api_keyQuake API Token",
"required_config": ["api_key"],
"note": "请在YAML文件的QUAKE_API_KEY配置项中填写Token,或在环境变量QUAKE_API_KEY中设置。Token可在Quake用户中心获取。"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
if not query:
error_result = {
"status": "error",
"message": "缺少必需参数: query(搜索查询语句)",
"required_params": ["query"],
"examples": [
'domain:"example.com"',
'ip:"1.1.1.1"',
'port:443',
'service.name:"http"',
'port:22 AND country_cn:"中国"'
]
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
# 构建请求体
data = {
"query": query
}
# 可选参数 size(通常最大100)
if "size" in config and config["size"] is not None:
try:
size = int(config["size"])
if size > 0:
data["size"] = size
except (ValueError, TypeError):
pass
# 可选参数 start(分页偏移,默认0)
if "start" in config and config["start"] is not None:
try:
start = int(config["start"])
if start >= 0:
data["start"] = start
except (ValueError, TypeError):
pass
# fields 映射到 Quake 的 include 字段
include_fields = normalize_fields(config.get("fields"))
if include_fields:
data["include"] = include_fields
# latest 参数,默认 true(取最新索引结果)
latest_value = config.get("latest", True)
if isinstance(latest_value, bool):
data["latest"] = latest_value
elif isinstance(latest_value, str):
data["latest"] = latest_value.lower() in ("true", "1", "yes")
elif isinstance(latest_value, (int, float)):
data["latest"] = latest_value != 0
else:
data["latest"] = True
headers = {
"X-QuakeToken": api_key,
"Content-Type": "application/json"
}
try:
response = requests.post(base_url, json=data, headers=headers, timeout=30)
response.raise_for_status()
result_data = response.json()
# Quake API code==0 表示成功
if result_data.get("code") != 0:
error_result = {
"status": "error",
"message": f"Quake API错误: {result_data.get('message', '未知错误')}",
"error_code": result_data.get("code", "unknown"),
"suggestion": "请检查API Token、查询语法和账户积分是否正常"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
results = result_data.get("data", [])
meta = result_data.get("meta", {})
pagination = meta.get("pagination", {}) if isinstance(meta, dict) else {}
output = {
"status": "success",
"query": query,
"size": data.get("size", pagination.get("size", len(results))),
"start": data.get("start", pagination.get("page_index", 0)),
"total": result_data.get("total_count", pagination.get("total", 0)),
"results_count": len(results),
"fields": include_fields or "all",
"results": results,
"message": f"成功获取 {len(results)} 条结果"
}
print(json.dumps(output, ensure_ascii=False, indent=2))
except requests.exceptions.RequestException as e:
error_result = {
"status": "error",
"message": f"请求失败: {str(e)}",
"suggestion": "请检查网络连通性或Quake API服务状态"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
except Exception as e:
error_result = {
"status": "error",
"message": f"执行出错: {str(e)}",
"type": type(e).__name__
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
enabled: false
short_description: "Quake网络空间搜索接口,支持自定义query、size、fields"
description: |
Quake(360 网络空间测绘)资产搜索工具,调用 Quake API v3 实时检索互联网资产。
**主要功能:**
- 支持 Quake DSL 查询语法(query
- 支持返回数量控制(size
- 支持字段裁剪(fields,对应 Quake include
- 支持分页偏移(start
**鉴权方式:**
- Header 使用 `X-QuakeToken`
- 可在本文件中填写 `QUAKE_API_KEY`,或通过环境变量 `QUAKE_API_KEY` 注入
**常见查询示例:**
- `domain:"example.com"`
- `ip:"1.1.1.1"`
- `port:443`
- `service.name:"http" AND country_cn:"中国"`
**注意事项:**
- API 调用会消耗积分,请按需控制 `size`
- `fields` 会映射到请求体 `include` 字段,多个字段用英文逗号分隔
- 如遇语法报错,请先在 Quake 控制台验证 DSL
parameters:
- name: "query"
type: "string"
description: |
Quake DSL 查询语句(必需)。
**示例:**
- `domain:"example.com"`
- `ip:"1.1.1.1"`
- `port:443`
- `service.name:"http" AND country_cn:"中国"`
required: true
position: 1
format: "positional"
- name: "size"
type: "int"
description: |
返回结果数量(可选)。
建议范围:1-100(具体受账户权限/接口限制影响)。
required: false
position: 2
format: "positional"
default: 10
- name: "start"
type: "int"
description: |
分页起始偏移(可选),从 0 开始。
required: false
position: 3
format: "positional"
default: 0
- name: "fields"
type: "string"
description: |
返回字段(可选),多个字段用英文逗号分隔。
该参数会映射到 Quake 请求体中的 `include` 字段。
**示例:**
- `ip,port`
- `ip,port,service.name,service.http.title,location.country_cn`
required: false
position: 4
format: "positional"
default: "ip,port"
- name: "latest"
type: "bool"
description: |
是否优先返回最新索引结果(可选)。
默认 `true`。
required: false
position: 5
format: "positional"
default: true