name: "fofa_search" command: "python3" args: - "-c" - | import sys import json import base64 import requests import os # ==================== FOFA配置 ==================== # 请在此处配置您的FOFA账号信息 # 您也可以在环境变量中设置:FOFA_EMAIL 和 FOFA_API_KEY # enable 默认为 false,需开启才能调用该MCP FOFA_EMAIL = "" # 请填写您的FOFA账号邮箱 FOFA_API_KEY = "" # 请填写您的FOFA API密钥 # ================================================== # FOFA API基础URL base_url = "https://fofa.info/api/v1/search/all" # 解析参数(从JSON字符串或命令行参数) def parse_args(): # 尝试从第一个参数读取JSON配置 if len(sys.argv) > 1: try: # 确保 sys.argv[1] 是字符串 arg1 = str(sys.argv[1]) # 尝试解析为JSON config = json.loads(arg1) # 确保返回的是字典类型 if isinstance(config, dict): return config except (json.JSONDecodeError, TypeError, ValueError): # 如果不是JSON,使用传统的位置参数方式 pass # 传统位置参数方式(向后兼容) # 注意:email 和 api_key 已从参数中移除,现在从配置中读取 # 参数位置:query=2, size=3, page=4, fields=5, full=6 # 但在 sys.argv 中,由于 python3 -c "code" 的格式,实际位置需要调整 # sys.argv[0] 是 '-c',sys.argv[1] 开始是实际参数 config = {} if len(sys.argv) > 1: config['query'] = str(sys.argv[1]) if len(sys.argv) > 2: try: config['size'] = int(sys.argv[2]) except (ValueError, TypeError): pass if len(sys.argv) > 3: try: config['page'] = int(sys.argv[3]) except (ValueError, TypeError): pass if len(sys.argv) > 4: config['fields'] = str(sys.argv[4]) if len(sys.argv) > 5: val = sys.argv[5] if isinstance(val, str): config['full'] = val.lower() in ('true', '1', 'yes') else: config['full'] = bool(val) return config try: config = parse_args() # 确保 config 是字典类型 if not isinstance(config, dict): error_result = { "status": "error", "message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}", "type": "TypeError" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) # 从配置或环境变量获取email和api_key email = os.getenv('FOFA_EMAIL', FOFA_EMAIL).strip() api_key = os.getenv('FOFA_API_KEY', FOFA_API_KEY).strip() query = config.get('query', '').strip() if not email: error_result = { "status": "error", "message": "缺少FOFA配置: email(FOFA账号邮箱)", "required_config": ["email", "api_key"], "note": "请在YAML文件的FOFA_EMAIL配置项中填写您的FOFA账号邮箱,或在环境变量FOFA_EMAIL中设置" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) if not api_key: error_result = { "status": "error", "message": "缺少FOFA配置: api_key(FOFA API密钥)", "required_config": ["email", "api_key"], "note": "请在YAML文件的FOFA_API_KEY配置项中填写您的API密钥,或在环境变量FOFA_API_KEY中设置。API密钥可在FOFA个人中心获取: https://fofa.info/userInfo" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) if not query: error_result = { "status": "error", "message": "缺少必需参数: query(搜索查询语句)", "required_params": ["query"], "examples": [ 'app="Apache"', 'title="登录"', 'domain="example.com"', 'ip="1.1.1.1"', 'port="80"', 'country="CN"', 'city="Beijing"' ] } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) # 构建请求参数 params = { 'email': email, 'key': api_key, 'qbase64': base64.b64encode(query.encode('utf-8')).decode('utf-8') } # 可选参数 if 'size' in config and config['size'] is not None: try: size = int(config['size']) if size > 0: params['size'] = size except (ValueError, TypeError): pass if 'page' in config and config['page'] is not None: try: page = int(config['page']) if page > 0: params['page'] = page except (ValueError, TypeError): pass if 'fields' in config and config['fields']: params['fields'] = str(config['fields']).strip() if 'full' in config and config['full'] is not None: full_val = config['full'] if isinstance(full_val, bool): params['full'] = 'true' if full_val else 'false' elif isinstance(full_val, str): params['full'] = 'true' if full_val.lower() in ('true', '1', 'yes') else 'false' elif isinstance(full_val, (int, float)): params['full'] = 'true' if full_val != 0 else 'false' # 发送请求 try: response = requests.get(base_url, params=params, timeout=30) response.raise_for_status() result_data = response.json() # 检查FOFA API返回的错误 if result_data.get('error'): error_result = { "status": "error", "message": f"FOFA API错误: {result_data.get('errmsg', '未知错误')}", "error_code": result_data.get('error'), "suggestion": "请检查API密钥是否正确,或查询语句是否符合FOFA语法" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) # 格式化输出结果 output = { "status": "success", "query": query, "size": result_data.get('size', 0), "page": result_data.get('page', 1), "total": result_data.get('total', 0), "results_count": len(result_data.get('results', [])), "results": result_data.get('results', []), "message": f"成功获取 {len(result_data.get('results', []))} 条结果" } print(json.dumps(output, ensure_ascii=False, indent=2)) except requests.exceptions.RequestException as e: error_result = { "status": "error", "message": f"请求失败: {str(e)}", "suggestion": "请检查网络连接或FOFA API服务状态" } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) except Exception as e: error_result = { "status": "error", "message": f"执行出错: {str(e)}", "type": type(e).__name__ } print(json.dumps(error_result, ensure_ascii=False, indent=2)) sys.exit(1) enabled: false short_description: "FOFA网络空间搜索引擎,支持灵活的查询参数配置" description: | FOFA是一个网络空间测绘搜索引擎,可以通过多种查询条件搜索互联网资产。 **主要功能:** - 支持多种查询语法(app、title、domain、ip、port、country、city等) - 灵活的字段返回配置 - 分页查询支持 - 完整数据模式(full参数) **使用场景:** - 资产发现和枚举 - 漏洞影响范围评估 - 安全态势感知 - 威胁情报收集 - Bug bounty信息收集 **查询语法说明:** **基础查询:** - 直接输入查询语句,将从标题、HTML内容、HTTP头信息、URL字段中搜索 - 如果查询表达式有多个与或关系,尽量在外面用括号包含起来,例如:`(app="Apache" || app="Nginx") && country="CN"` **逻辑连接符:** - `=` - 匹配,=""时,可查询不存在字段或者值为空的情况 - `==` - 完全匹配,==""时,可查询存在且值为空的情况 - `&&` - 与(AND) - `||` - 或(OR) - `!=` - 不匹配,!=""时,可查询值不为空的情况 - `*=` - 模糊匹配,使用*或者?进行搜索 - `()` - 确认查询优先级,括号内容优先级最高 **常用查询语法分类:** **基础类:** `ip`(支持IPv4/IPv6/C段)、`port`、`domain`、`host`、`os`、`server`、`asn`、`org`、`is_domain`、`is_ipv6` **标记类:** `app`(应用识别)、`fid`(站点指纹)、`product`、`product.version`、`category`、`type`(service/subdomain)、`cloud_name`、`is_cloud`、`is_fraud`、`is_honeypot` **协议类(type=service):** `protocol`、`banner`、`banner_hash`、`banner_fid`、`base_protocol`(tcp/udp) **网站类(type=subdomain):** `title`、`header`、`header_hash`、`body`、`body_hash`、`js_name`、`js_md5`、`cname`、`cname_domain`、`icon_hash`、`status_code`、`icp`、`sdk_hash` **地理位置:** `country`(支持代码/中文)、`region`(支持英文/中文,中文仅限中国)、`city` **证书类:** `cert`、`cert.subject`、`cert.issuer`、`cert.subject.org`、`cert.subject.cn`、`cert.issuer.org`、`cert.issuer.cn`、`cert.domain`、`cert.is_equal`、`cert.is_valid`、`cert.is_match`、`cert.is_expired`、`jarm`、`tls.version`、`tls.ja3s`、`cert.sn`、`cert.not_after.after/before`、`cert.not_before.after/before` **时间类:** `after`(某时间之后更新)、`before`(某时间之前更新) **独立IP语法(不可与其他语法共用):** `port_size`、`port_size_gt`、`port_size_lt`、`ip_ports`、`ip_country`、`ip_region`、`ip_city`、`ip_after`、`ip_before` **常用查询示例:** - `app="Apache"` - 搜索Apache应用 - `title="登录"` - 搜索标题包含"登录"的页面 - `domain="example.com"` - 搜索特定域名 - `ip="1.1.1.1"` 或 `ip="220.181.111.1/24"` - 搜索IP或C段 - `port="80"` - 搜索开放80端口的资产 - `country="CN"` 或 `country="中国"` - 搜索中国境内的资产 - `city="Beijing"` - 搜索北京的资产 - `app="Apache" && country="CN"` - 组合查询 - `(app="Apache" || app="Nginx") && country="CN"` - 使用括号的复杂查询 - `title*="登录"` - 模糊匹配标题 - `after="2023-01-01" && before="2023-12-01"` - 时间范围查询 **详细语法文档:** 更多语法说明和组件列表请参考FOFA官方文档 **注意事项:** - API调用有频率限制,请合理使用 - 查询结果数量受账户权限限制 - full参数需要高级权限 parameters: - name: "query" type: "string" description: | FOFA查询语句(必需) 搜索查询语句,支持FOFA查询语法。 示例: - app="Apache" - title="登录" - domain="example.com" - ip="1.1.1.1" - port="80" - country="CN" - app="Apache" && country="CN" required: true position: 2 format: "positional" - name: "size" type: "int" description: | 返回结果数量(可选) 每页返回的结果数量,默认100。 取值范围:1-10000,受账户权限限制。 required: false position: 3 format: "positional" default: 100 - name: "page" type: "int" description: | 页码(可选) 要返回的页码,从1开始,默认1。 用于分页查询大量结果。 required: false position: 4 format: "positional" default: 1 - name: "fields" type: "string" description: | 返回字段列表(可选) 需要返回的字段,多个字段用逗号分隔。 默认字段:ip,port,domain 可用字段:host,title,ip,domain,port,protocol,country,province,city,server,icp,header,banner,body,as_number,as_organization required: false position: 5 format: "positional" default: "ip,port,domain" - name: "full" type: "bool" description: | 是否返回完整数据(可选) 设置为true时返回完整数据,需要高级权限。 默认false,返回基础数据。 required: false position: 6 format: "positional" default: false