Add files via upload

This commit is contained in:
公明
2026-04-09 20:44:17 +08:00
committed by GitHub
parent a273d6d7ba
commit cd30953a84
+403
View File
@@ -0,0 +1,403 @@
name: "shodan_search"
command: "python3"
args:
- "-c"
- |
import sys
import json
import requests
import os
import math
# ==================== Shodan配置 ====================
# 请在此处配置您的Shodan API Key
# 您也可以在环境变量中设置:SHODAN_API_KEY
# enable 默认为 false,需开启才能调用该MCP
SHODAN_API_KEY = "" # 请替换为您自己的Shodan API Key
# ==================================================
# Shodan API基础URL
base_url = "https://api.shodan.io"
# 解析参数(从JSON字符串或命令行参数)
def parse_args():
# 尝试从第一个参数读取JSON配置
if len(sys.argv) > 1:
try:
arg1 = str(sys.argv[1])
config = json.loads(arg1)
if isinstance(config, dict):
return config
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 传统位置参数方式(向后兼容)
# 兼容两种序列:
# 1) query,page,facets,minify,fields,count_only,size
# 2) query,page,minify,fields,count_only,size (facets省略时执行器会压缩参数)
config = {}
if len(sys.argv) > 1:
config["query"] = str(sys.argv[1])
if len(sys.argv) > 2:
try:
config["page"] = int(sys.argv[2])
except (ValueError, TypeError):
pass
def is_bool_like(val):
if isinstance(val, bool):
return True
if not isinstance(val, str):
return False
return val.strip().lower() in ("true", "false", "1", "0", "yes", "no")
remaining = [str(x) for x in sys.argv[3:]]
if remaining:
# facets 省略时,第一个剩余参数通常是 minify(布尔)
first_is_bool = is_bool_like(remaining[0])
idx = 0
if not first_is_bool:
config["facets"] = remaining[idx]
idx += 1
if idx < len(remaining):
val = remaining[idx]
config["minify"] = val.lower() in ("true", "1", "yes")
idx += 1
if idx < len(remaining):
config["fields"] = remaining[idx]
idx += 1
if idx < len(remaining):
val = remaining[idx]
config["count_only"] = val.lower() in ("true", "1", "yes")
idx += 1
if idx < len(remaining):
try:
config["size"] = int(remaining[idx])
except (ValueError, TypeError):
pass
return config
def normalize_bool(value, default_value):
if value is None:
return default_value
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ("true", "1", "yes")
if isinstance(value, (int, float)):
return value != 0
return default_value
try:
config = parse_args()
if not isinstance(config, dict):
error_result = {
"status": "error",
"message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}",
"type": "TypeError"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
api_key = os.getenv("SHODAN_API_KEY", SHODAN_API_KEY).strip()
query = str(config.get("query", "")).strip()
if not api_key:
error_result = {
"status": "error",
"message": "缺少Shodan配置: api_keyShodan API密钥)",
"required_config": ["api_key"],
"note": "请在YAML文件的SHODAN_API_KEY配置项中填写您的API密钥,或在环境变量SHODAN_API_KEY中设置。API密钥可在Shodan账户页面查看: https://account.shodan.io/"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
if not query:
error_result = {
"status": "error",
"message": "缺少必需参数: query(搜索查询语句)",
"required_params": ["query"],
"examples": [
"product:nginx",
"apache country:DE",
"port:22",
"ssl.cert.subject.cn:example.com",
"org:\"Amazon\" port:443"
]
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
count_only = normalize_bool(config.get("count_only"), False)
minify = normalize_bool(config.get("minify"), True)
requested_size = config.get("size", None)
if requested_size is not None:
try:
requested_size = int(requested_size)
if requested_size <= 0:
requested_size = None
else:
# 防止单次请求过大导致额度和响应时间问题
requested_size = min(requested_size, 1000)
except (ValueError, TypeError):
requested_size = None
# 根据 count_only 选择搜索端点
endpoint = "/shodan/host/count" if count_only else "/shodan/host/search"
url = f"{base_url}{endpoint}"
params = {
"key": api_key,
"query": query
}
# 可选参数 facetssearch 和 count 都支持)
if "facets" in config and config["facets"]:
facets_value = str(config["facets"]).strip()
if facets_value:
params["facets"] = facets_value
# search 接口的可选参数
if not count_only:
if "page" in config and config["page"] is not None:
try:
page = int(config["page"])
if page > 0:
params["page"] = page
except (ValueError, TypeError):
pass
minify_effective = minify
if "fields" in config and config["fields"]:
fields_value = str(config["fields"]).strip()
if fields_value:
params["fields"] = fields_value
# Shodan API约束:fields 与 minify=true 互斥
minify_effective = False
params["minify"] = "true" if minify_effective else "false"
try:
if count_only:
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
result_data = response.json()
if isinstance(result_data, dict) and result_data.get("error"):
error_result = {
"status": "error",
"message": f"Shodan API错误: {result_data.get('error', '未知错误')}",
"suggestion": "请检查API密钥、查询语法和账户查询额度"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
output = {
"status": "success",
"mode": "count",
"query": query,
"total": result_data.get("total", 0),
"facets": result_data.get("facets", {}),
"size": requested_size,
"note": "count模式仅返回统计,不返回明细结果",
"message": "统计查询完成(未返回资产明细)"
}
else:
start_page = int(params.get("page", 1))
# Shodan search 每页固定最多100条
# 如果未指定 size,则保持原始行为(单页)
target_size = requested_size if requested_size else 100
pages_needed = 1 if not requested_size else max(1, int(math.ceil(target_size / 100.0)))
all_matches = []
last_result_data = {}
current_page = start_page
pages_fetched = 0
for _ in range(pages_needed):
page_params = dict(params)
page_params["page"] = current_page
response = requests.get(url, params=page_params, timeout=30)
response.raise_for_status()
result_data = response.json()
last_result_data = result_data if isinstance(result_data, dict) else {}
pages_fetched += 1
if isinstance(last_result_data, dict) and last_result_data.get("error"):
error_result = {
"status": "error",
"message": f"Shodan API错误: {last_result_data.get('error', '未知错误')}",
"suggestion": "请检查API密钥、查询语法和账户查询额度"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
page_matches = last_result_data.get("matches", []) if isinstance(last_result_data, dict) else []
if not page_matches:
break
all_matches.extend(page_matches)
if len(all_matches) >= target_size:
break
current_page += 1
matches = all_matches[:target_size]
output = {
"status": "success",
"mode": "search",
"query": query,
"page": start_page,
"size": target_size,
"pages_fetched": pages_fetched,
"total": last_result_data.get("total", 0),
"results_count": len(matches),
"facets": last_result_data.get("facets", {}),
"results": matches,
"message": f"成功获取 {len(matches)} 条结果"
}
print(json.dumps(output, ensure_ascii=False, indent=2))
except requests.exceptions.RequestException as e:
response_body = ""
status_code = None
if hasattr(e, "response") and e.response is not None:
status_code = e.response.status_code
try:
response_body = e.response.text[:500]
except Exception:
response_body = ""
error_result = {
"status": "error",
"message": f"请求失败: {str(e)}",
"status_code": status_code,
"response": response_body,
"suggestion": "请检查网络连接、Shodan API状态、API密钥与查询额度"
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
except Exception as e:
error_result = {
"status": "error",
"message": f"执行出错: {str(e)}",
"type": type(e).__name__
}
print(json.dumps(error_result, ensure_ascii=False, indent=2))
sys.exit(1)
enabled: false
short_description: "Shodan网络空间搜索,支持search与count模式"
description: |
Shodan 资产搜索工具,基于官方 Developer API 实现,支持快速检索和统计分析。
**主要功能:**
- 使用 `/shodan/host/search` 进行资产搜索
- 使用 `/shodan/host/count` 进行无明细统计(节省查询信用)
- 支持按 `size` 控制返回条数(自动翻页聚合)
- 支持分页(page
- 支持分面统计(facets
- 支持结果字段裁剪(fields)
- 支持 `minify` 控制返回数据体积
**鉴权方式:**
- Query 参数使用 `key`
- 可在本文件中填写 `SHODAN_API_KEY`,或通过环境变量 `SHODAN_API_KEY` 注入
**查询语法示例:**
- `product:nginx`
- `apache country:DE`
- `port:22`
- `org:"Amazon" port:443`
- `ssl.cert.subject.cn:example.com`
**注意事项:**
- 带过滤器的查询通常会消耗 query credits
- 翻页(超过第1页)会额外消耗额度
- `size` 大于 100 时会自动请求更多页(每页最多 100)
- `size` 最大限制为 1000(防止过量请求)
- `count_only=true` 使用统计接口,不返回 matches 明细
parameters:
- name: "query"
type: "string"
description: |
Shodan 搜索语句(必需)。
支持 Shodan filter 语法(`filter:value`)与关键字组合。
示例:
- `product:nginx`
- `apache country:DE`
- `port:22`
- `org:"Amazon" port:443`
required: true
position: 1
format: "positional"
- name: "page"
type: "int"
description: |
页码(可选,仅 search 模式生效),从 1 开始,默认 1。
required: false
position: 2
format: "positional"
default: 1
- name: "facets"
type: "string"
description: |
分面统计字段(可选)。
多个字段用英文逗号分隔,也可指定数量:
- `org,os`
- `country:20,org:10`
required: false
position: 3
format: "positional"
- name: "minify"
type: "bool"
description: |
是否精简返回字段(可选,仅 search 模式生效)。
默认 `true`。
required: false
position: 4
format: "positional"
default: true
- name: "fields"
type: "string"
description: |
指定返回字段(可选,仅 search 模式生效)。
多个字段用英文逗号分隔,例如:
- `ip_str,port,org,hostnames,http.title`
- `tags,http.title,http.favicon.hash`
required: false
position: 5
format: "positional"
- name: "count_only"
type: "bool"
description: |
是否仅统计总数(可选)。
- `false`(默认):调用 `/shodan/host/search` 返回明细
- `true`:调用 `/shodan/host/count` 仅返回 total 和 facets
required: false
position: 6
format: "positional"
default: false
- name: "size"
type: "int"
description: |
返回结果数量(可选,仅 search 模式生效)。
- 支持 `10 / 20 / 100 / n`
- Shodan 单页最多 100,超过 100 时会自动翻页拼接
- 为避免额度和时延问题,最大值限制为 1000
- 未传时默认返回单页结果(最多 100 条)
required: false
position: 7
format: "positional"