mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-03-31 08:19:54 +02:00
343 lines
13 KiB
YAML
343 lines
13 KiB
YAML
name: "fofa_search"
|
||
command: "python3"
|
||
args:
|
||
- "-c"
|
||
- |
|
||
import sys
|
||
import json
|
||
import base64
|
||
import requests
|
||
import os
|
||
|
||
# ==================== FOFA配置 ====================
|
||
# 请在此处配置您的FOFA账号信息
|
||
# 您也可以在环境变量中设置:FOFA_EMAIL 和 FOFA_API_KEY
|
||
# enable 默认为 false,需开启才能调用该MCP
|
||
FOFA_EMAIL = "" # 请填写您的FOFA账号邮箱
|
||
FOFA_API_KEY = "" # 请填写您的FOFA API密钥
|
||
# ==================================================
|
||
|
||
# FOFA API基础URL
|
||
base_url = "https://fofa.info/api/v1/search/all"
|
||
|
||
# 解析参数(从JSON字符串或命令行参数)
|
||
def parse_args():
|
||
# 尝试从第一个参数读取JSON配置
|
||
if len(sys.argv) > 1:
|
||
try:
|
||
# 确保 sys.argv[1] 是字符串
|
||
arg1 = str(sys.argv[1])
|
||
# 尝试解析为JSON
|
||
config = json.loads(arg1)
|
||
# 确保返回的是字典类型
|
||
if isinstance(config, dict):
|
||
return config
|
||
except (json.JSONDecodeError, TypeError, ValueError):
|
||
# 如果不是JSON,使用传统的位置参数方式
|
||
pass
|
||
|
||
# 传统位置参数方式(向后兼容)
|
||
# 注意:email 和 api_key 已从参数中移除,现在从配置中读取
|
||
# 参数位置:query=2, size=3, page=4, fields=5, full=6
|
||
# 但在 sys.argv 中,由于 python3 -c "code" 的格式,实际位置需要调整
|
||
# sys.argv[0] 是 '-c',sys.argv[1] 开始是实际参数
|
||
config = {}
|
||
if len(sys.argv) > 1:
|
||
config['query'] = str(sys.argv[1])
|
||
if len(sys.argv) > 2:
|
||
try:
|
||
config['size'] = int(sys.argv[2])
|
||
except (ValueError, TypeError):
|
||
pass
|
||
if len(sys.argv) > 3:
|
||
try:
|
||
config['page'] = int(sys.argv[3])
|
||
except (ValueError, TypeError):
|
||
pass
|
||
if len(sys.argv) > 4:
|
||
config['fields'] = str(sys.argv[4])
|
||
if len(sys.argv) > 5:
|
||
val = sys.argv[5]
|
||
if isinstance(val, str):
|
||
config['full'] = val.lower() in ('true', '1', 'yes')
|
||
else:
|
||
config['full'] = bool(val)
|
||
return config
|
||
|
||
try:
|
||
config = parse_args()
|
||
|
||
# 确保 config 是字典类型
|
||
if not isinstance(config, dict):
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"参数解析错误: 期望字典类型,但得到 {type(config).__name__}",
|
||
"type": "TypeError"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
# 从配置或环境变量获取email和api_key
|
||
email = os.getenv('FOFA_EMAIL', FOFA_EMAIL).strip()
|
||
api_key = os.getenv('FOFA_API_KEY', FOFA_API_KEY).strip()
|
||
query = config.get('query', '').strip()
|
||
|
||
if not email:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": "缺少FOFA配置: email(FOFA账号邮箱)",
|
||
"required_config": ["email", "api_key"],
|
||
"note": "请在YAML文件的FOFA_EMAIL配置项中填写您的FOFA账号邮箱,或在环境变量FOFA_EMAIL中设置"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
if not api_key:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": "缺少FOFA配置: api_key(FOFA API密钥)",
|
||
"required_config": ["email", "api_key"],
|
||
"note": "请在YAML文件的FOFA_API_KEY配置项中填写您的API密钥,或在环境变量FOFA_API_KEY中设置。API密钥可在FOFA个人中心获取: https://fofa.info/userInfo"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
if not query:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": "缺少必需参数: query(搜索查询语句)",
|
||
"required_params": ["query"],
|
||
"examples": [
|
||
'app="Apache"',
|
||
'title="登录"',
|
||
'domain="example.com"',
|
||
'ip="1.1.1.1"',
|
||
'port="80"',
|
||
'country="CN"',
|
||
'city="Beijing"'
|
||
]
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
# 构建请求参数
|
||
params = {
|
||
'email': email,
|
||
'key': api_key,
|
||
'qbase64': base64.b64encode(query.encode('utf-8')).decode('utf-8')
|
||
}
|
||
|
||
# 可选参数
|
||
if 'size' in config and config['size'] is not None:
|
||
try:
|
||
size = int(config['size'])
|
||
if size > 0:
|
||
params['size'] = size
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
if 'page' in config and config['page'] is not None:
|
||
try:
|
||
page = int(config['page'])
|
||
if page > 0:
|
||
params['page'] = page
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
if 'fields' in config and config['fields']:
|
||
params['fields'] = str(config['fields']).strip()
|
||
|
||
if 'full' in config and config['full'] is not None:
|
||
full_val = config['full']
|
||
if isinstance(full_val, bool):
|
||
params['full'] = 'true' if full_val else 'false'
|
||
elif isinstance(full_val, str):
|
||
params['full'] = 'true' if full_val.lower() in ('true', '1', 'yes') else 'false'
|
||
elif isinstance(full_val, (int, float)):
|
||
params['full'] = 'true' if full_val != 0 else 'false'
|
||
|
||
# 发送请求
|
||
try:
|
||
response = requests.get(base_url, params=params, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
result_data = response.json()
|
||
|
||
# 检查FOFA API返回的错误
|
||
if result_data.get('error'):
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"FOFA API错误: {result_data.get('errmsg', '未知错误')}",
|
||
"error_code": result_data.get('error'),
|
||
"suggestion": "请检查API密钥是否正确,或查询语句是否符合FOFA语法"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
# 格式化输出结果
|
||
output = {
|
||
"status": "success",
|
||
"query": query,
|
||
"size": result_data.get('size', 0),
|
||
"page": result_data.get('page', 1),
|
||
"total": result_data.get('total', 0),
|
||
"results_count": len(result_data.get('results', [])),
|
||
"results": result_data.get('results', []),
|
||
"message": f"成功获取 {len(result_data.get('results', []))} 条结果"
|
||
}
|
||
|
||
print(json.dumps(output, ensure_ascii=False, indent=2))
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"请求失败: {str(e)}",
|
||
"suggestion": "请检查网络连接或FOFA API服务状态"
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
|
||
except Exception as e:
|
||
error_result = {
|
||
"status": "error",
|
||
"message": f"执行出错: {str(e)}",
|
||
"type": type(e).__name__
|
||
}
|
||
print(json.dumps(error_result, ensure_ascii=False, indent=2))
|
||
sys.exit(1)
|
||
enabled: false
|
||
short_description: "FOFA网络空间搜索引擎,支持灵活的查询参数配置"
|
||
description: |
|
||
FOFA是一个网络空间测绘搜索引擎,可以通过多种查询条件搜索互联网资产。
|
||
|
||
**主要功能:**
|
||
- 支持多种查询语法(app、title、domain、ip、port、country、city等)
|
||
- 灵活的字段返回配置
|
||
- 分页查询支持
|
||
- 完整数据模式(full参数)
|
||
|
||
**使用场景:**
|
||
- 资产发现和枚举
|
||
- 漏洞影响范围评估
|
||
- 安全态势感知
|
||
- 威胁情报收集
|
||
- Bug bounty信息收集
|
||
|
||
**查询语法说明:**
|
||
|
||
**基础查询:**
|
||
- 直接输入查询语句,将从标题、HTML内容、HTTP头信息、URL字段中搜索
|
||
- 如果查询表达式有多个与或关系,尽量在外面用括号包含起来,例如:`(app="Apache" || app="Nginx") && country="CN"`
|
||
|
||
**逻辑连接符:**
|
||
- `=` - 匹配,=""时,可查询不存在字段或者值为空的情况
|
||
- `==` - 完全匹配,==""时,可查询存在且值为空的情况
|
||
- `&&` - 与(AND)
|
||
- `||` - 或(OR)
|
||
- `!=` - 不匹配,!=""时,可查询值不为空的情况
|
||
- `*=` - 模糊匹配,使用*或者?进行搜索
|
||
- `()` - 确认查询优先级,括号内容优先级最高
|
||
|
||
**常用查询语法分类:**
|
||
|
||
**基础类:** `ip`(支持IPv4/IPv6/C段)、`port`、`domain`、`host`、`os`、`server`、`asn`、`org`、`is_domain`、`is_ipv6`
|
||
|
||
**标记类:** `app`(应用识别)、`fid`(站点指纹)、`product`、`product.version`、`category`、`type`(service/subdomain)、`cloud_name`、`is_cloud`、`is_fraud`、`is_honeypot`
|
||
|
||
**协议类(type=service):** `protocol`、`banner`、`banner_hash`、`banner_fid`、`base_protocol`(tcp/udp)
|
||
|
||
**网站类(type=subdomain):** `title`、`header`、`header_hash`、`body`、`body_hash`、`js_name`、`js_md5`、`cname`、`cname_domain`、`icon_hash`、`status_code`、`icp`、`sdk_hash`
|
||
|
||
**地理位置:** `country`(支持代码/中文)、`region`(支持英文/中文,中文仅限中国)、`city`
|
||
|
||
**证书类:** `cert`、`cert.subject`、`cert.issuer`、`cert.subject.org`、`cert.subject.cn`、`cert.issuer.org`、`cert.issuer.cn`、`cert.domain`、`cert.is_equal`、`cert.is_valid`、`cert.is_match`、`cert.is_expired`、`jarm`、`tls.version`、`tls.ja3s`、`cert.sn`、`cert.not_after.after/before`、`cert.not_before.after/before`
|
||
|
||
**时间类:** `after`(某时间之后更新)、`before`(某时间之前更新)
|
||
|
||
**独立IP语法(不可与其他语法共用):** `port_size`、`port_size_gt`、`port_size_lt`、`ip_ports`、`ip_country`、`ip_region`、`ip_city`、`ip_after`、`ip_before`
|
||
|
||
**常用查询示例:**
|
||
- `app="Apache"` - 搜索Apache应用
|
||
- `title="登录"` - 搜索标题包含"登录"的页面
|
||
- `domain="example.com"` - 搜索特定域名
|
||
- `ip="1.1.1.1"` 或 `ip="220.181.111.1/24"` - 搜索IP或C段
|
||
- `port="80"` - 搜索开放80端口的资产
|
||
- `country="CN"` 或 `country="中国"` - 搜索中国境内的资产
|
||
- `city="Beijing"` - 搜索北京的资产
|
||
- `app="Apache" && country="CN"` - 组合查询
|
||
- `(app="Apache" || app="Nginx") && country="CN"` - 使用括号的复杂查询
|
||
- `title*="登录"` - 模糊匹配标题
|
||
- `after="2023-01-01" && before="2023-12-01"` - 时间范围查询
|
||
|
||
**详细语法文档:** 更多语法说明和组件列表请参考FOFA官方文档
|
||
|
||
|
||
**注意事项:**
|
||
- API调用有频率限制,请合理使用
|
||
- 查询结果数量受账户权限限制
|
||
- full参数需要高级权限
|
||
parameters:
|
||
|
||
- name: "query"
|
||
type: "string"
|
||
description: |
|
||
FOFA查询语句(必需)
|
||
|
||
搜索查询语句,支持FOFA查询语法。
|
||
示例:
|
||
- app="Apache"
|
||
- title="登录"
|
||
- domain="example.com"
|
||
- ip="1.1.1.1"
|
||
- port="80"
|
||
- country="CN"
|
||
- app="Apache" && country="CN"
|
||
required: true
|
||
position: 2
|
||
format: "positional"
|
||
- name: "size"
|
||
type: "int"
|
||
description: |
|
||
返回结果数量(可选)
|
||
|
||
每页返回的结果数量,默认100。
|
||
取值范围:1-10000,受账户权限限制。
|
||
required: false
|
||
position: 3
|
||
format: "positional"
|
||
default: 100
|
||
- name: "page"
|
||
type: "int"
|
||
description: |
|
||
页码(可选)
|
||
|
||
要返回的页码,从1开始,默认1。
|
||
用于分页查询大量结果。
|
||
required: false
|
||
position: 4
|
||
format: "positional"
|
||
default: 1
|
||
- name: "fields"
|
||
type: "string"
|
||
description: |
|
||
返回字段列表(可选)
|
||
|
||
需要返回的字段,多个字段用逗号分隔。
|
||
默认字段:ip,port,domain
|
||
可用字段:host,title,ip,domain,port,protocol,country,province,city,server,icp,header,banner,body,as_number,as_organization
|
||
required: false
|
||
position: 5
|
||
format: "positional"
|
||
default: "ip,port,domain"
|
||
- name: "full"
|
||
type: "bool"
|
||
description: |
|
||
是否返回完整数据(可选)
|
||
|
||
设置为true时返回完整数据,需要高级权限。
|
||
默认false,返回基础数据。
|
||
required: false
|
||
position: 6
|
||
format: "positional"
|
||
default: false
|