diff --git a/tools/shodan_search.yaml b/tools/shodan_search.yaml new file mode 100644 index 00000000..766cdaa0 --- /dev/null +++ b/tools/shodan_search.yaml @@ -0,0 +1,403 @@ +name: "shodan_search" +command: "python3" +args: + - "-c" + - | + import sys + import json + import requests + import os + import math + + # ==================== Shodan配置 ==================== + # 请在此处配置您的Shodan API Key + # 您也可以在环境变量中设置:SHODAN_API_KEY + # enable 默认为 false,需开启才能调用该MCP + SHODAN_API_KEY = "" # 请替换为您自己的Shodan API Key + # ================================================== + + # Shodan API基础URL + base_url = "https://api.shodan.io" + + # 解析参数(从JSON字符串或命令行参数) + def parse_args(): + # 尝试从第一个参数读取JSON配置 + if len(sys.argv) > 1: + try: + arg1 = str(sys.argv[1]) + config = json.loads(arg1) + if isinstance(config, dict): + return config + except (json.JSONDecodeError, TypeError, ValueError): + pass + + # 传统位置参数方式(向后兼容) + # 兼容两种序列: + # 1) query,page,facets,minify,fields,count_only,size + # 2) query,page,minify,fields,count_only,size (facets省略时执行器会压缩参数) + config = {} + if len(sys.argv) > 1: + config["query"] = str(sys.argv[1]) + if len(sys.argv) > 2: + try: + config["page"] = int(sys.argv[2]) + except (ValueError, TypeError): + pass + + def is_bool_like(val): + if isinstance(val, bool): + return True + if not isinstance(val, str): + return False + return val.strip().lower() in ("true", "false", "1", "0", "yes", "no") + + remaining = [str(x) for x in sys.argv[3:]] + if remaining: + # facets 省略时,第一个剩余参数通常是 minify(布尔) + first_is_bool = is_bool_like(remaining[0]) + idx = 0 + if not first_is_bool: + config["facets"] = remaining[idx] + idx += 1 + + if idx < len(remaining): + val = remaining[idx] + config["minify"] = val.lower() in ("true", "1", "yes") + idx += 1 + + if idx < len(remaining): + config["fields"] = remaining[idx] + idx += 1 + + if idx < len(remaining): + val = remaining[idx] + config["count_only"] = val.lower() in ("true", "1", "yes") + idx += 1 + + if idx < len(remaining): + try: + config["size"] = int(remaining[idx]) + except (ValueError, TypeError): + pass + + return config + + def normalize_bool(value, default_value): + if value is None: + return default_value + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.lower() in ("true", "1", "yes") + if isinstance(value, (int, float)): + return value != 0 + return default_value + + try: + config = parse_args() + + if not isinstance(config, dict): + error_result = { + "status": "error", + "message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}", + "type": "TypeError" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + api_key = os.getenv("SHODAN_API_KEY", SHODAN_API_KEY).strip() + query = str(config.get("query", "")).strip() + + if not api_key: + error_result = { + "status": "error", + "message": "缺少Shodan配置: api_key(Shodan API密钥)", + "required_config": ["api_key"], + "note": "请在YAML文件的SHODAN_API_KEY配置项中填写您的API密钥,或在环境变量SHODAN_API_KEY中设置。API密钥可在Shodan账户页面查看: https://account.shodan.io/" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + if not query: + error_result = { + "status": "error", + "message": "缺少必需参数: query(搜索查询语句)", + "required_params": ["query"], + "examples": [ + "product:nginx", + "apache country:DE", + "port:22", + "ssl.cert.subject.cn:example.com", + "org:\"Amazon\" port:443" + ] + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + count_only = normalize_bool(config.get("count_only"), False) + minify = normalize_bool(config.get("minify"), True) + requested_size = config.get("size", None) + if requested_size is not None: + try: + requested_size = int(requested_size) + if requested_size <= 0: + requested_size = None + else: + # 防止单次请求过大导致额度和响应时间问题 + requested_size = min(requested_size, 1000) + except (ValueError, TypeError): + requested_size = None + + # 根据 count_only 选择搜索端点 + endpoint = "/shodan/host/count" if count_only else "/shodan/host/search" + url = f"{base_url}{endpoint}" + + params = { + "key": api_key, + "query": query + } + + # 可选参数 facets(search 和 count 都支持) + if "facets" in config and config["facets"]: + facets_value = str(config["facets"]).strip() + if facets_value: + params["facets"] = facets_value + + # search 接口的可选参数 + if not count_only: + if "page" in config and config["page"] is not None: + try: + page = int(config["page"]) + if page > 0: + params["page"] = page + except (ValueError, TypeError): + pass + minify_effective = minify + + if "fields" in config and config["fields"]: + fields_value = str(config["fields"]).strip() + if fields_value: + params["fields"] = fields_value + # Shodan API约束:fields 与 minify=true 互斥 + minify_effective = False + + params["minify"] = "true" if minify_effective else "false" + + try: + if count_only: + response = requests.get(url, params=params, timeout=30) + response.raise_for_status() + result_data = response.json() + + if isinstance(result_data, dict) and result_data.get("error"): + error_result = { + "status": "error", + "message": f"Shodan API错误: {result_data.get('error', '未知错误')}", + "suggestion": "请检查API密钥、查询语法和账户查询额度" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + output = { + "status": "success", + "mode": "count", + "query": query, + "total": result_data.get("total", 0), + "facets": result_data.get("facets", {}), + "size": requested_size, + "note": "count模式仅返回统计,不返回明细结果", + "message": "统计查询完成(未返回资产明细)" + } + else: + start_page = int(params.get("page", 1)) + # Shodan search 每页固定最多100条 + # 如果未指定 size,则保持原始行为(单页) + target_size = requested_size if requested_size else 100 + pages_needed = 1 if not requested_size else max(1, int(math.ceil(target_size / 100.0))) + + all_matches = [] + last_result_data = {} + current_page = start_page + pages_fetched = 0 + + for _ in range(pages_needed): + page_params = dict(params) + page_params["page"] = current_page + + response = requests.get(url, params=page_params, timeout=30) + response.raise_for_status() + result_data = response.json() + last_result_data = result_data if isinstance(result_data, dict) else {} + pages_fetched += 1 + + if isinstance(last_result_data, dict) and last_result_data.get("error"): + error_result = { + "status": "error", + "message": f"Shodan API错误: {last_result_data.get('error', '未知错误')}", + "suggestion": "请检查API密钥、查询语法和账户查询额度" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + page_matches = last_result_data.get("matches", []) if isinstance(last_result_data, dict) else [] + if not page_matches: + break + + all_matches.extend(page_matches) + if len(all_matches) >= target_size: + break + current_page += 1 + + matches = all_matches[:target_size] + output = { + "status": "success", + "mode": "search", + "query": query, + "page": start_page, + "size": target_size, + "pages_fetched": pages_fetched, + "total": last_result_data.get("total", 0), + "results_count": len(matches), + "facets": last_result_data.get("facets", {}), + "results": matches, + "message": f"成功获取 {len(matches)} 条结果" + } + + print(json.dumps(output, ensure_ascii=False, indent=2)) + + except requests.exceptions.RequestException as e: + response_body = "" + status_code = None + if hasattr(e, "response") and e.response is not None: + status_code = e.response.status_code + try: + response_body = e.response.text[:500] + except Exception: + response_body = "" + + error_result = { + "status": "error", + "message": f"请求失败: {str(e)}", + "status_code": status_code, + "response": response_body, + "suggestion": "请检查网络连接、Shodan API状态、API密钥与查询额度" + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) + + except Exception as e: + error_result = { + "status": "error", + "message": f"执行出错: {str(e)}", + "type": type(e).__name__ + } + print(json.dumps(error_result, ensure_ascii=False, indent=2)) + sys.exit(1) +enabled: false +short_description: "Shodan网络空间搜索,支持search与count模式" +description: | + Shodan 资产搜索工具,基于官方 Developer API 实现,支持快速检索和统计分析。 + + **主要功能:** + - 使用 `/shodan/host/search` 进行资产搜索 + - 使用 `/shodan/host/count` 进行无明细统计(节省查询信用) + - 支持按 `size` 控制返回条数(自动翻页聚合) + - 支持分页(page) + - 支持分面统计(facets) + - 支持结果字段裁剪(fields) + - 支持 `minify` 控制返回数据体积 + + **鉴权方式:** + - Query 参数使用 `key` + - 可在本文件中填写 `SHODAN_API_KEY`,或通过环境变量 `SHODAN_API_KEY` 注入 + + **查询语法示例:** + - `product:nginx` + - `apache country:DE` + - `port:22` + - `org:"Amazon" port:443` + - `ssl.cert.subject.cn:example.com` + + **注意事项:** + - 带过滤器的查询通常会消耗 query credits + - 翻页(超过第1页)会额外消耗额度 + - `size` 大于 100 时会自动请求更多页(每页最多 100) + - `size` 最大限制为 1000(防止过量请求) + - `count_only=true` 使用统计接口,不返回 matches 明细 +parameters: + - name: "query" + type: "string" + description: | + Shodan 搜索语句(必需)。 + + 支持 Shodan filter 语法(`filter:value`)与关键字组合。 + 示例: + - `product:nginx` + - `apache country:DE` + - `port:22` + - `org:"Amazon" port:443` + required: true + position: 1 + format: "positional" + - name: "page" + type: "int" + description: | + 页码(可选,仅 search 模式生效),从 1 开始,默认 1。 + required: false + position: 2 + format: "positional" + default: 1 + - name: "facets" + type: "string" + description: | + 分面统计字段(可选)。 + + 多个字段用英文逗号分隔,也可指定数量: + - `org,os` + - `country:20,org:10` + required: false + position: 3 + format: "positional" + - name: "minify" + type: "bool" + description: | + 是否精简返回字段(可选,仅 search 模式生效)。 + 默认 `true`。 + required: false + position: 4 + format: "positional" + default: true + - name: "fields" + type: "string" + description: | + 指定返回字段(可选,仅 search 模式生效)。 + + 多个字段用英文逗号分隔,例如: + - `ip_str,port,org,hostnames,http.title` + - `tags,http.title,http.favicon.hash` + required: false + position: 5 + format: "positional" + - name: "count_only" + type: "bool" + description: | + 是否仅统计总数(可选)。 + + - `false`(默认):调用 `/shodan/host/search` 返回明细 + - `true`:调用 `/shodan/host/count` 仅返回 total 和 facets + required: false + position: 6 + format: "positional" + default: false + - name: "size" + type: "int" + description: | + 返回结果数量(可选,仅 search 模式生效)。 + + - 支持 `10 / 20 / 100 / n` + - Shodan 单页最多 100,超过 100 时会自动翻页拼接 + - 为避免额度和时延问题,最大值限制为 1000 + - 未传时默认返回单页结果(最多 100 条) + required: false + position: 7 + format: "positional"