mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-04-22 02:36:40 +02:00
294 lines
9.8 KiB
YAML
294 lines
9.8 KiB
YAML
name: "quake_search"
|
||
command: "python3"
|
||
args:
|
||
- "-c"
|
||
- |
|
||
import sys
|
||
import json
|
||
import requests
|
||
import os
|
||
|
||
# ==================== Quake配置 ====================
|
||
# 请在此处配置您的Quake API Token
|
||
# 您也可以在环境变量中设置:QUAKE_API_KEY
|
||
# enable 默认为 false,需开启才能调用该MCP
|
||
QUAKE_API_KEY = "" # 请填写您的Quake API Token
|
||
# ==================================================
|
||
|
||
# Quake API基础URL
|
||
base_url = "https://quake.360.cn/api/v3/search/quake_service"
|
||
|
||
# 解析参数(从JSON字符串或命令行参数)
|
||
def parse_args():
|
||
# 尝试从第一个参数读取JSON配置
|
||
if len(sys.argv) > 1:
|
||
try:
|
||
arg1 = str(sys.argv[1])
|
||
config = json.loads(arg1)
|
||
if isinstance(config, dict):
|
||
return config
|
||
except (json.JSONDecodeError, TypeError, ValueError):
|
||
pass
|
||
|
||
# 传统位置参数方式(向后兼容)
|
||
# 参数位置:query=1, size=2, start=3, fields=4, latest=5
|
||
config = {}
|
||
if len(sys.argv) > 1:
|
||
config["query"] = str(sys.argv[1])
|
||
if len(sys.argv) > 2:
|
||
try:
|
||
config["size"] = int(sys.argv[2])
|
||
except (ValueError, TypeError):
|
||
pass
|
||
if len(sys.argv) > 3:
|
||
try:
|
||
config["start"] = int(sys.argv[3])
|
||
except (ValueError, TypeError):
|
||
pass
|
||
if len(sys.argv) > 4:
|
||
config["fields"] = str(sys.argv[4])
|
||
if len(sys.argv) > 5:
|
||
val = sys.argv[5]
|
||
if isinstance(val, str):
|
||
config["latest"] = val.lower() in ("true", "1", "yes")
|
||
else:
|
||
config["latest"] = bool(val)
|
||
return config
|
||
|
||
# 标准化 fields 参数:支持字符串和数组
|
||
def normalize_fields(fields_value):
|
||
if fields_value is None:
|
||
return None
|
||
|
||
if isinstance(fields_value, str):
|
||
raw = fields_value.strip()
|
||
if not raw:
|
||
return None
|
||
return [x.strip() for x in raw.split(",") if x.strip()]
|
||
|
||
if isinstance(fields_value, list):
|
||
output = []
|
||
for item in fields_value:
|
||
text = str(item).strip()
|
||
if text:
|
||
output.append(text)
|
||
return output or None
|
||
|
||
return None
|
||
|
||
try:
|
||
config = parse_args()
|
||
|
||
if not isinstance(config, dict):
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}",
|
||
"type": "TypeError"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
api_key = os.getenv("QUAKE_API_KEY", QUAKE_API_KEY).strip()
|
||
query = str(config.get("query", "")).strip()
|
||
|
||
if not api_key:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": "缺少Quake配置: api_key(Quake API Token)",
|
||
"required_config": ["api_key"],
|
||
"note": "请在YAML文件的QUAKE_API_KEY配置项中填写Token,或在环境变量QUAKE_API_KEY中设置。Token可在Quake用户中心获取。"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
if not query:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": "缺少必需参数: query(搜索查询语句)",
|
||
"required_params": ["query"],
|
||
"examples": [
|
||
'domain:"example.com"',
|
||
'ip:"1.1.1.1"',
|
||
'port:443',
|
||
'service.name:"http"',
|
||
'port:22 AND country_cn:"中国"'
|
||
]
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
# 构建请求体
|
||
data = {
|
||
"query": query
|
||
}
|
||
|
||
# 可选参数 size(通常最大100)
|
||
if "size" in config and config["size"] is not None:
|
||
try:
|
||
size = int(config["size"])
|
||
if size > 0:
|
||
data["size"] = size
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
# 可选参数 start(分页偏移,默认0)
|
||
if "start" in config and config["start"] is not None:
|
||
try:
|
||
start = int(config["start"])
|
||
if start >= 0:
|
||
data["start"] = start
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
# fields 映射到 Quake 的 include 字段
|
||
include_fields = normalize_fields(config.get("fields"))
|
||
if include_fields:
|
||
data["include"] = include_fields
|
||
|
||
# latest 参数,默认 true(取最新索引结果)
|
||
latest_value = config.get("latest", True)
|
||
if isinstance(latest_value, bool):
|
||
data["latest"] = latest_value
|
||
elif isinstance(latest_value, str):
|
||
data["latest"] = latest_value.lower() in ("true", "1", "yes")
|
||
elif isinstance(latest_value, (int, float)):
|
||
data["latest"] = latest_value != 0
|
||
else:
|
||
data["latest"] = True
|
||
|
||
headers = {
|
||
"X-QuakeToken": api_key,
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
try:
|
||
response = requests.post(base_url, json=data, headers=headers, timeout=30)
|
||
response.raise_for_status()
|
||
result_data = response.json()
|
||
|
||
# Quake API code==0 表示成功
|
||
if result_data.get("code") != 0:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"Quake API错误: {result_data.get('message', '未知错误')}",
|
||
"error_code": result_data.get("code", "unknown"),
|
||
"suggestion": "请检查API Token、查询语法和账户积分是否正常"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
results = result_data.get("data", [])
|
||
meta = result_data.get("meta", {})
|
||
pagination = meta.get("pagination", {}) if isinstance(meta, dict) else {}
|
||
|
||
output = {
|
||
"status": "success",
|
||
"query": query,
|
||
"size": data.get("size", pagination.get("size", len(results))),
|
||
"start": data.get("start", pagination.get("page_index", 0)),
|
||
"total": result_data.get("total_count", pagination.get("total", 0)),
|
||
"results_count": len(results),
|
||
"fields": include_fields or "all",
|
||
"results": results,
|
||
"message": f"成功获取 {len(results)} 条结果"
|
||
}
|
||
|
||
print(json.dumps(output, ensure_ascii=False, indent=2))
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"请求失败: {str(e)}",
|
||
"suggestion": "请检查网络连通性或Quake API服务状态"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
except Exception as e:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"执行出错: {str(e)}",
|
||
"type": type(e).__name__
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
enabled: false
|
||
short_description: "Quake网络空间搜索接口,支持自定义query、size、fields"
|
||
description: |
|
||
Quake(360 网络空间测绘)资产搜索工具,调用 Quake API v3 实时检索互联网资产。
|
||
|
||
**主要功能:**
|
||
- 支持 Quake DSL 查询语法(query)
|
||
- 支持返回数量控制(size)
|
||
- 支持字段裁剪(fields,对应 Quake include)
|
||
- 支持分页偏移(start)
|
||
|
||
**鉴权方式:**
|
||
- Header 使用 `X-QuakeToken`
|
||
- 可在本文件中填写 `QUAKE_API_KEY`,或通过环境变量 `QUAKE_API_KEY` 注入
|
||
|
||
**常见查询示例:**
|
||
- `domain:"example.com"`
|
||
- `ip:"1.1.1.1"`
|
||
- `port:443`
|
||
- `service.name:"http" AND country_cn:"中国"`
|
||
|
||
**注意事项:**
|
||
- API 调用会消耗积分,请按需控制 `size`
|
||
- `fields` 会映射到请求体 `include` 字段,多个字段用英文逗号分隔
|
||
- 如遇语法报错,请先在 Quake 控制台验证 DSL
|
||
parameters:
|
||
- name: "query"
|
||
type: "string"
|
||
description: |
|
||
Quake DSL 查询语句(必需)。
|
||
|
||
**示例:**
|
||
- `domain:"example.com"`
|
||
- `ip:"1.1.1.1"`
|
||
- `port:443`
|
||
- `service.name:"http" AND country_cn:"中国"`
|
||
required: true
|
||
position: 1
|
||
format: "positional"
|
||
- name: "size"
|
||
type: "int"
|
||
description: |
|
||
返回结果数量(可选)。
|
||
|
||
建议范围:1-100(具体受账户权限/接口限制影响)。
|
||
required: false
|
||
position: 2
|
||
format: "positional"
|
||
default: 10
|
||
- name: "start"
|
||
type: "int"
|
||
description: |
|
||
分页起始偏移(可选),从 0 开始。
|
||
required: false
|
||
position: 3
|
||
format: "positional"
|
||
default: 0
|
||
- name: "fields"
|
||
type: "string"
|
||
description: |
|
||
返回字段(可选),多个字段用英文逗号分隔。
|
||
|
||
该参数会映射到 Quake 请求体中的 `include` 字段。
|
||
**示例:**
|
||
- `ip,port`
|
||
- `ip,port,service.name,service.http.title,location.country_cn`
|
||
required: false
|
||
position: 4
|
||
format: "positional"
|
||
default: "ip,port"
|
||
- name: "latest"
|
||
type: "bool"
|
||
description: |
|
||
是否优先返回最新索引结果(可选)。
|
||
默认 `true`。
|
||
required: false
|
||
position: 5
|
||
format: "positional"
|
||
default: true
|