mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-04-22 02:36:40 +02:00
404 lines
14 KiB
YAML
404 lines
14 KiB
YAML
name: "shodan_search"
|
||
command: "python3"
|
||
args:
|
||
- "-c"
|
||
- |
|
||
import sys
|
||
import json
|
||
import requests
|
||
import os
|
||
import math
|
||
|
||
# ==================== Shodan配置 ====================
|
||
# 请在此处配置您的Shodan API Key
|
||
# 您也可以在环境变量中设置:SHODAN_API_KEY
|
||
# enable 默认为 false,需开启才能调用该MCP
|
||
SHODAN_API_KEY = "" # 请替换为您自己的Shodan API Key
|
||
# ==================================================
|
||
|
||
# Shodan API基础URL
|
||
base_url = "https://api.shodan.io"
|
||
|
||
# 解析参数(从JSON字符串或命令行参数)
|
||
def parse_args():
|
||
# 尝试从第一个参数读取JSON配置
|
||
if len(sys.argv) > 1:
|
||
try:
|
||
arg1 = str(sys.argv[1])
|
||
config = json.loads(arg1)
|
||
if isinstance(config, dict):
|
||
return config
|
||
except (json.JSONDecodeError, TypeError, ValueError):
|
||
pass
|
||
|
||
# 传统位置参数方式(向后兼容)
|
||
# 兼容两种序列:
|
||
# 1) query,page,facets,minify,fields,count_only,size
|
||
# 2) query,page,minify,fields,count_only,size (facets省略时执行器会压缩参数)
|
||
config = {}
|
||
if len(sys.argv) > 1:
|
||
config["query"] = str(sys.argv[1])
|
||
if len(sys.argv) > 2:
|
||
try:
|
||
config["page"] = int(sys.argv[2])
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
def is_bool_like(val):
|
||
if isinstance(val, bool):
|
||
return True
|
||
if not isinstance(val, str):
|
||
return False
|
||
return val.strip().lower() in ("true", "false", "1", "0", "yes", "no")
|
||
|
||
remaining = [str(x) for x in sys.argv[3:]]
|
||
if remaining:
|
||
# facets 省略时,第一个剩余参数通常是 minify(布尔)
|
||
first_is_bool = is_bool_like(remaining[0])
|
||
idx = 0
|
||
if not first_is_bool:
|
||
config["facets"] = remaining[idx]
|
||
idx += 1
|
||
|
||
if idx < len(remaining):
|
||
val = remaining[idx]
|
||
config["minify"] = val.lower() in ("true", "1", "yes")
|
||
idx += 1
|
||
|
||
if idx < len(remaining):
|
||
config["fields"] = remaining[idx]
|
||
idx += 1
|
||
|
||
if idx < len(remaining):
|
||
val = remaining[idx]
|
||
config["count_only"] = val.lower() in ("true", "1", "yes")
|
||
idx += 1
|
||
|
||
if idx < len(remaining):
|
||
try:
|
||
config["size"] = int(remaining[idx])
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
return config
|
||
|
||
def normalize_bool(value, default_value):
|
||
if value is None:
|
||
return default_value
|
||
if isinstance(value, bool):
|
||
return value
|
||
if isinstance(value, str):
|
||
return value.lower() in ("true", "1", "yes")
|
||
if isinstance(value, (int, float)):
|
||
return value != 0
|
||
return default_value
|
||
|
||
try:
|
||
config = parse_args()
|
||
|
||
if not isinstance(config, dict):
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}",
|
||
"type": "TypeError"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
api_key = os.getenv("SHODAN_API_KEY", SHODAN_API_KEY).strip()
|
||
query = str(config.get("query", "")).strip()
|
||
|
||
if not api_key:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": "缺少Shodan配置: api_key(Shodan API密钥)",
|
||
"required_config": ["api_key"],
|
||
"note": "请在YAML文件的SHODAN_API_KEY配置项中填写您的API密钥,或在环境变量SHODAN_API_KEY中设置。API密钥可在Shodan账户页面查看: https://account.shodan.io/"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
if not query:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": "缺少必需参数: query(搜索查询语句)",
|
||
"required_params": ["query"],
|
||
"examples": [
|
||
"product:nginx",
|
||
"apache country:DE",
|
||
"port:22",
|
||
"ssl.cert.subject.cn:example.com",
|
||
"org:\"Amazon\" port:443"
|
||
]
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
count_only = normalize_bool(config.get("count_only"), False)
|
||
minify = normalize_bool(config.get("minify"), True)
|
||
requested_size = config.get("size", None)
|
||
if requested_size is not None:
|
||
try:
|
||
requested_size = int(requested_size)
|
||
if requested_size <= 0:
|
||
requested_size = None
|
||
else:
|
||
# 防止单次请求过大导致额度和响应时间问题
|
||
requested_size = min(requested_size, 1000)
|
||
except (ValueError, TypeError):
|
||
requested_size = None
|
||
|
||
# 根据 count_only 选择搜索端点
|
||
endpoint = "/shodan/host/count" if count_only else "/shodan/host/search"
|
||
url = f"{base_url}{endpoint}"
|
||
|
||
params = {
|
||
"key": api_key,
|
||
"query": query
|
||
}
|
||
|
||
# 可选参数 facets(search 和 count 都支持)
|
||
if "facets" in config and config["facets"]:
|
||
facets_value = str(config["facets"]).strip()
|
||
if facets_value:
|
||
params["facets"] = facets_value
|
||
|
||
# search 接口的可选参数
|
||
if not count_only:
|
||
if "page" in config and config["page"] is not None:
|
||
try:
|
||
page = int(config["page"])
|
||
if page > 0:
|
||
params["page"] = page
|
||
except (ValueError, TypeError):
|
||
pass
|
||
minify_effective = minify
|
||
|
||
if "fields" in config and config["fields"]:
|
||
fields_value = str(config["fields"]).strip()
|
||
if fields_value:
|
||
params["fields"] = fields_value
|
||
# Shodan API约束:fields 与 minify=true 互斥
|
||
minify_effective = False
|
||
|
||
params["minify"] = "true" if minify_effective else "false"
|
||
|
||
try:
|
||
if count_only:
|
||
response = requests.get(url, params=params, timeout=30)
|
||
response.raise_for_status()
|
||
result_data = response.json()
|
||
|
||
if isinstance(result_data, dict) and result_data.get("error"):
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"Shodan API错误: {result_data.get('error', '未知错误')}",
|
||
"suggestion": "请检查API密钥、查询语法和账户查询额度"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
output = {
|
||
"status": "success",
|
||
"mode": "count",
|
||
"query": query,
|
||
"total": result_data.get("total", 0),
|
||
"facets": result_data.get("facets", {}),
|
||
"size": requested_size,
|
||
"note": "count模式仅返回统计,不返回明细结果",
|
||
"message": "统计查询完成(未返回资产明细)"
|
||
}
|
||
else:
|
||
start_page = int(params.get("page", 1))
|
||
# Shodan search 每页固定最多100条
|
||
# 如果未指定 size,则保持原始行为(单页)
|
||
target_size = requested_size if requested_size else 100
|
||
pages_needed = 1 if not requested_size else max(1, int(math.ceil(target_size / 100.0)))
|
||
|
||
all_matches = []
|
||
last_result_data = {}
|
||
current_page = start_page
|
||
pages_fetched = 0
|
||
|
||
for _ in range(pages_needed):
|
||
page_params = dict(params)
|
||
page_params["page"] = current_page
|
||
|
||
response = requests.get(url, params=page_params, timeout=30)
|
||
response.raise_for_status()
|
||
result_data = response.json()
|
||
last_result_data = result_data if isinstance(result_data, dict) else {}
|
||
pages_fetched += 1
|
||
|
||
if isinstance(last_result_data, dict) and last_result_data.get("error"):
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"Shodan API错误: {last_result_data.get('error', '未知错误')}",
|
||
"suggestion": "请检查API密钥、查询语法和账户查询额度"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
page_matches = last_result_data.get("matches", []) if isinstance(last_result_data, dict) else []
|
||
if not page_matches:
|
||
break
|
||
|
||
all_matches.extend(page_matches)
|
||
if len(all_matches) >= target_size:
|
||
break
|
||
current_page += 1
|
||
|
||
matches = all_matches[:target_size]
|
||
output = {
|
||
"status": "success",
|
||
"mode": "search",
|
||
"query": query,
|
||
"page": start_page,
|
||
"size": target_size,
|
||
"pages_fetched": pages_fetched,
|
||
"total": last_result_data.get("total", 0),
|
||
"results_count": len(matches),
|
||
"facets": last_result_data.get("facets", {}),
|
||
"results": matches,
|
||
"message": f"成功获取 {len(matches)} 条结果"
|
||
}
|
||
|
||
print(json.dumps(output, ensure_ascii=False, indent=2))
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
response_body = ""
|
||
status_code = None
|
||
if hasattr(e, "response") and e.response is not None:
|
||
status_code = e.response.status_code
|
||
try:
|
||
response_body = e.response.text[:500]
|
||
except Exception:
|
||
response_body = ""
|
||
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"请求失败: {str(e)}",
|
||
"status_code": status_code,
|
||
"response": response_body,
|
||
"suggestion": "请检查网络连接、Shodan API状态、API密钥与查询额度"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
except Exception as e:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"执行出错: {str(e)}",
|
||
"type": type(e).__name__
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
enabled: false
|
||
short_description: "Shodan网络空间搜索,支持search与count模式"
|
||
description: |
|
||
Shodan 资产搜索工具,基于官方 Developer API 实现,支持快速检索和统计分析。
|
||
|
||
**主要功能:**
|
||
- 使用 `/shodan/host/search` 进行资产搜索
|
||
- 使用 `/shodan/host/count` 进行无明细统计(节省查询信用)
|
||
- 支持按 `size` 控制返回条数(自动翻页聚合)
|
||
- 支持分页(page)
|
||
- 支持分面统计(facets)
|
||
- 支持结果字段裁剪(fields)
|
||
- 支持 `minify` 控制返回数据体积
|
||
|
||
**鉴权方式:**
|
||
- Query 参数使用 `key`
|
||
- 可在本文件中填写 `SHODAN_API_KEY`,或通过环境变量 `SHODAN_API_KEY` 注入
|
||
|
||
**查询语法示例:**
|
||
- `product:nginx`
|
||
- `apache country:DE`
|
||
- `port:22`
|
||
- `org:"Amazon" port:443`
|
||
- `ssl.cert.subject.cn:example.com`
|
||
|
||
**注意事项:**
|
||
- 带过滤器的查询通常会消耗 query credits
|
||
- 翻页(超过第1页)会额外消耗额度
|
||
- `size` 大于 100 时会自动请求更多页(每页最多 100)
|
||
- `size` 最大限制为 1000(防止过量请求)
|
||
- `count_only=true` 使用统计接口,不返回 matches 明细
|
||
parameters:
|
||
- name: "query"
|
||
type: "string"
|
||
description: |
|
||
Shodan 搜索语句(必需)。
|
||
|
||
支持 Shodan filter 语法(`filter:value`)与关键字组合。
|
||
示例:
|
||
- `product:nginx`
|
||
- `apache country:DE`
|
||
- `port:22`
|
||
- `org:"Amazon" port:443`
|
||
required: true
|
||
position: 1
|
||
format: "positional"
|
||
- name: "page"
|
||
type: "int"
|
||
description: |
|
||
页码(可选,仅 search 模式生效),从 1 开始,默认 1。
|
||
required: false
|
||
position: 2
|
||
format: "positional"
|
||
default: 1
|
||
- name: "facets"
|
||
type: "string"
|
||
description: |
|
||
分面统计字段(可选)。
|
||
|
||
多个字段用英文逗号分隔,也可指定数量:
|
||
- `org,os`
|
||
- `country:20,org:10`
|
||
required: false
|
||
position: 3
|
||
format: "positional"
|
||
- name: "minify"
|
||
type: "bool"
|
||
description: |
|
||
是否精简返回字段(可选,仅 search 模式生效)。
|
||
默认 `true`。
|
||
required: false
|
||
position: 4
|
||
format: "positional"
|
||
default: true
|
||
- name: "fields"
|
||
type: "string"
|
||
description: |
|
||
指定返回字段(可选,仅 search 模式生效)。
|
||
|
||
多个字段用英文逗号分隔,例如:
|
||
- `ip_str,port,org,hostnames,http.title`
|
||
- `tags,http.title,http.favicon.hash`
|
||
required: false
|
||
position: 5
|
||
format: "positional"
|
||
- name: "count_only"
|
||
type: "bool"
|
||
description: |
|
||
是否仅统计总数(可选)。
|
||
|
||
- `false`(默认):调用 `/shodan/host/search` 返回明细
|
||
- `true`:调用 `/shodan/host/count` 仅返回 total 和 facets
|
||
required: false
|
||
position: 6
|
||
format: "positional"
|
||
default: false
|
||
- name: "size"
|
||
type: "int"
|
||
description: |
|
||
返回结果数量(可选,仅 search 模式生效)。
|
||
|
||
- 支持 `10 / 20 / 100 / n`
|
||
- Shodan 单页最多 100,超过 100 时会自动翻页拼接
|
||
- 为避免额度和时延问题,最大值限制为 1000
|
||
- 未传时默认返回单页结果(最多 100 条)
|
||
required: false
|
||
position: 7
|
||
format: "positional"
|