mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-02-12 22:12:45 +00:00
569 lines
19 KiB
Python
569 lines
19 KiB
Python
"""
|
|
Terminal Agent API - Interactive infrastructure pentesting via AI chat + Docker sandbox.
|
|
|
|
Provides session-based terminal interaction with AI-guided command execution,
|
|
exploitation path tracking, and VPN status monitoring.
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, List, Optional
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from core.llm_manager import LLMManager
|
|
from core.sandbox_manager import get_sandbox
|
|
|
|
router = APIRouter()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# In-memory session store
|
|
# ---------------------------------------------------------------------------
|
|
terminal_sessions: Dict[str, Dict] = {}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pre-built templates
|
|
# ---------------------------------------------------------------------------
|
|
TEMPLATES = {
|
|
"network_scanner": {
|
|
"name": "Network Scanner",
|
|
"description": "Host discovery, port scanning, and service detection",
|
|
"system_prompt": (
|
|
"You are an expert network reconnaissance specialist. You guide the "
|
|
"operator through systematic host discovery, port scanning, and service "
|
|
"fingerprinting. Always suggest nmap flags appropriate for the situation, "
|
|
"explain output, and recommend next steps based on discovered services. "
|
|
"Prioritize stealth when asked and suggest timing/fragmentation options."
|
|
),
|
|
"initial_commands": [
|
|
"nmap -sn {target}",
|
|
"nmap -sV -sC -O -p- {target}",
|
|
"nmap -sU --top-ports 50 {target}",
|
|
],
|
|
},
|
|
"lateral_movement": {
|
|
"name": "Lateral Movement",
|
|
"description": "Pass-the-hash, SMB/WinRM pivoting, and SSH tunneling",
|
|
"system_prompt": (
|
|
"You are a lateral movement specialist. You help the operator pivot "
|
|
"through compromised networks using techniques such as pass-the-hash, "
|
|
"SMB relay, WinRM sessions, SSH tunneling, and SOCKS proxying. Always "
|
|
"verify credentials before attempting pivots, suggest cleanup steps, "
|
|
"and track which hosts have been compromised."
|
|
),
|
|
"initial_commands": [
|
|
"crackmapexec smb {target} -u '' -p ''",
|
|
"crackmapexec smb {target} --shares -u '' -p ''",
|
|
"ssh -D 1080 -N -f user@{target}",
|
|
],
|
|
},
|
|
"privilege_escalation": {
|
|
"name": "Privilege Escalation",
|
|
"description": "SUID binaries, kernel exploits, cron jobs, and writable paths",
|
|
"system_prompt": (
|
|
"You are a privilege escalation expert for Linux and Windows systems. "
|
|
"Guide the operator through enumeration of SUID/SGID binaries, kernel "
|
|
"version checks, misconfigured cron jobs, writable PATH directories, "
|
|
"sudo misconfigurations, and capability abuse. Suggest automated tools "
|
|
"like linpeas/winpeas when appropriate and explain each finding."
|
|
),
|
|
"initial_commands": [
|
|
"id && whoami && uname -a",
|
|
"find / -perm -4000 -type f 2>/dev/null",
|
|
"cat /etc/crontab && ls -la /etc/cron.*",
|
|
"echo $PATH | tr ':' '\\n' | xargs -I {} ls -ld {}",
|
|
],
|
|
},
|
|
"vpn_recon": {
|
|
"name": "VPN Reconnaissance",
|
|
"description": "VPN connection management and internal network discovery",
|
|
"system_prompt": (
|
|
"You are a VPN and internal network reconnaissance specialist. You "
|
|
"help the operator connect to target VPNs, verify tunnel status, "
|
|
"discover internal subnets, and enumerate services behind the VPN. "
|
|
"Always confirm connectivity before proceeding with scans and suggest "
|
|
"appropriate scope for internal reconnaissance."
|
|
),
|
|
"initial_commands": [
|
|
"openvpn --config client.ovpn --daemon",
|
|
"ip addr show tun0",
|
|
"ip route | grep tun",
|
|
"nmap -sn 10.0.0.0/24",
|
|
],
|
|
},
|
|
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pydantic request / response models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class CreateSessionRequest(BaseModel):
|
|
template_id: Optional[str] = None
|
|
target: Optional[str] = ""
|
|
name: Optional[str] = ""
|
|
|
|
|
|
class MessageRequest(BaseModel):
|
|
message: str
|
|
|
|
|
|
class ExecuteCommandRequest(BaseModel):
|
|
command: str
|
|
execution_method: str = "sandbox" # "sandbox" or "direct"
|
|
|
|
|
|
class ExploitationStepRequest(BaseModel):
|
|
description: str
|
|
command: Optional[str] = ""
|
|
result: Optional[str] = ""
|
|
step_type: str = "recon" # recon | exploit | pivot | escalate | action
|
|
|
|
|
|
class SessionSummary(BaseModel):
|
|
session_id: str
|
|
name: str
|
|
target: str
|
|
template_id: Optional[str]
|
|
status: str
|
|
created_at: str
|
|
messages_count: int
|
|
commands_count: int
|
|
|
|
|
|
class MessageResponse(BaseModel):
|
|
role: str
|
|
response: str
|
|
timestamp: str
|
|
suggested_commands: List[str]
|
|
|
|
|
|
class CommandResult(BaseModel):
|
|
command: str
|
|
exit_code: int
|
|
stdout: str
|
|
stderr: str
|
|
duration: float
|
|
execution_method: str
|
|
timestamp: str
|
|
|
|
|
|
class VPNStatus(BaseModel):
|
|
connected: bool
|
|
ip: Optional[str] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
def _build_session(
|
|
session_id: str,
|
|
name: str,
|
|
target: str,
|
|
template_id: Optional[str],
|
|
) -> Dict:
|
|
return {
|
|
"session_id": session_id,
|
|
"name": name,
|
|
"target": target,
|
|
"template_id": template_id,
|
|
"status": "active",
|
|
"created_at": _now_iso(),
|
|
"messages": [],
|
|
"command_history": [],
|
|
"exploitation_path": [],
|
|
"vpn_status": {"connected": False, "ip": None},
|
|
}
|
|
|
|
|
|
def _get_session(session_id: str) -> Dict:
|
|
session = terminal_sessions.get(session_id)
|
|
if not session:
|
|
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
|
|
return session
|
|
|
|
|
|
def _build_context_string(
|
|
messages: List[Dict],
|
|
commands: List[Dict],
|
|
exploitation: List[Dict],
|
|
) -> str:
|
|
parts: List[str] = []
|
|
|
|
if messages:
|
|
parts.append("=== Recent Conversation ===")
|
|
for msg in messages:
|
|
role = msg.get("role", "unknown").upper()
|
|
parts.append(f"[{role}] {msg.get('content', '')}")
|
|
|
|
if commands:
|
|
parts.append("\n=== Recent Command Results ===")
|
|
for cmd in commands:
|
|
parts.append(
|
|
f"$ {cmd['command']}\n"
|
|
f"Exit code: {cmd['exit_code']}\n"
|
|
f"Stdout: {cmd['stdout'][:500]}\n"
|
|
f"Stderr: {cmd['stderr'][:300]}"
|
|
)
|
|
|
|
if exploitation:
|
|
parts.append("\n=== Exploitation Path ===")
|
|
for i, step in enumerate(exploitation, 1):
|
|
parts.append(
|
|
f"Step {i} [{step['step_type']}]: {step['description']}"
|
|
)
|
|
if step.get("command"):
|
|
parts.append(f" Command: {step['command']}")
|
|
if step.get("result"):
|
|
parts.append(f" Result: {step['result'][:300]}")
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
def _extract_suggested_commands(text: str) -> List[str]:
|
|
"""Extract commands from backtick-fenced code blocks."""
|
|
blocks = re.findall(r"```(?:bash|sh|shell)?\n?(.*?)```", text, re.DOTALL)
|
|
commands: List[str] = []
|
|
for block in blocks:
|
|
for line in block.strip().splitlines():
|
|
stripped = line.strip()
|
|
if stripped and not stripped.startswith("#"):
|
|
commands.append(stripped)
|
|
return commands
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Template endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/templates")
|
|
async def list_templates():
|
|
"""List all available session templates."""
|
|
result = []
|
|
for tid, tmpl in TEMPLATES.items():
|
|
result.append({
|
|
"id": tid,
|
|
"name": tmpl["name"],
|
|
"description": tmpl["description"],
|
|
"initial_commands": tmpl["initial_commands"],
|
|
})
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Session CRUD
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/session")
|
|
async def create_session(req: CreateSessionRequest):
|
|
"""Create a new terminal session, optionally from a template."""
|
|
session_id = str(uuid.uuid4())
|
|
target = req.target or ""
|
|
template_id = req.template_id
|
|
|
|
if template_id and template_id not in TEMPLATES:
|
|
raise HTTPException(status_code=400, detail=f"Unknown template: {template_id}")
|
|
|
|
name = req.name or (
|
|
TEMPLATES[template_id]["name"] if template_id else f"Session {session_id[:8]}"
|
|
)
|
|
|
|
session = _build_session(session_id, name, target, template_id)
|
|
|
|
# Seed initial system message from template
|
|
if template_id:
|
|
tmpl = TEMPLATES[template_id]
|
|
session["messages"].append({
|
|
"role": "system",
|
|
"content": tmpl["system_prompt"],
|
|
"timestamp": _now_iso(),
|
|
"metadata": {"template": template_id},
|
|
})
|
|
# Provide initial suggested commands with target interpolated
|
|
initial_cmds = [
|
|
cmd.replace("{target}", target) for cmd in tmpl["initial_commands"]
|
|
]
|
|
session["messages"].append({
|
|
"role": "assistant",
|
|
"content": (
|
|
f"Session initialised with the **{tmpl['name']}** template.\n\n"
|
|
f"Target: `{target or '(not set)'}`\n\n"
|
|
"Suggested starting commands:\n"
|
|
+ "\n".join(f"```\n{c}\n```" for c in initial_cmds)
|
|
),
|
|
"timestamp": _now_iso(),
|
|
"suggested_commands": initial_cmds,
|
|
})
|
|
|
|
terminal_sessions[session_id] = session
|
|
return session
|
|
|
|
|
|
@router.get("/sessions")
|
|
async def list_sessions():
|
|
"""Return lightweight summaries of every session."""
|
|
summaries = []
|
|
for sid, s in terminal_sessions.items():
|
|
summaries.append(
|
|
SessionSummary(
|
|
session_id=sid,
|
|
name=s["name"],
|
|
target=s["target"],
|
|
template_id=s["template_id"],
|
|
status=s["status"],
|
|
created_at=s["created_at"],
|
|
messages_count=len(s["messages"]),
|
|
commands_count=len(s["command_history"]),
|
|
).model_dump()
|
|
)
|
|
return summaries
|
|
|
|
|
|
@router.get("/sessions/{session_id}")
|
|
async def get_session(session_id: str):
|
|
"""Return the full session including messages, commands, and exploitation path."""
|
|
return _get_session(session_id)
|
|
|
|
|
|
@router.delete("/sessions/{session_id}")
|
|
async def delete_session(session_id: str):
|
|
"""Delete a terminal session."""
|
|
if session_id not in terminal_sessions:
|
|
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
|
|
del terminal_sessions[session_id]
|
|
return {"status": "deleted", "session_id": session_id}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AI message interaction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/message")
|
|
async def send_message(session_id: str, req: MessageRequest):
|
|
"""Send a user prompt to the AI and receive a response with suggested commands."""
|
|
session = _get_session(session_id)
|
|
user_message = req.message.strip()
|
|
if not user_message:
|
|
raise HTTPException(status_code=400, detail="Message content cannot be empty")
|
|
|
|
# Record user message
|
|
session["messages"].append({
|
|
"role": "user",
|
|
"content": user_message,
|
|
"timestamp": _now_iso(),
|
|
"metadata": {},
|
|
})
|
|
|
|
# Determine system prompt
|
|
template_id = session.get("template_id")
|
|
if template_id and template_id in TEMPLATES:
|
|
system_prompt = TEMPLATES[template_id]["system_prompt"]
|
|
else:
|
|
system_prompt = (
|
|
"You are an expert infrastructure penetration tester. Help the "
|
|
"operator plan and execute attacks against the target. Suggest "
|
|
"concrete commands, explain their purpose, and interpret output. "
|
|
"Always wrap commands in fenced code blocks so they can be extracted."
|
|
)
|
|
|
|
# Build context window
|
|
context_messages = session["messages"][-20:]
|
|
context_cmds = session["command_history"][-10:]
|
|
exploitation = session["exploitation_path"]
|
|
context = _build_context_string(context_messages, context_cmds, exploitation)
|
|
|
|
# Call LLM
|
|
try:
|
|
llm = LLMManager()
|
|
prompt = f"{context}\n\nUser: {user_message}"
|
|
response = await llm.generate(prompt, system_prompt)
|
|
except Exception as exc:
|
|
raise HTTPException(status_code=502, detail=f"LLM call failed: {exc}")
|
|
|
|
suggested_commands = _extract_suggested_commands(response)
|
|
|
|
# Record assistant response
|
|
session["messages"].append({
|
|
"role": "assistant",
|
|
"content": response,
|
|
"timestamp": _now_iso(),
|
|
"suggested_commands": suggested_commands,
|
|
})
|
|
|
|
return MessageResponse(
|
|
role="assistant",
|
|
response=response,
|
|
timestamp=session["messages"][-1]["timestamp"],
|
|
suggested_commands=suggested_commands,
|
|
).model_dump()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Command execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/execute")
|
|
async def execute_command(session_id: str, req: ExecuteCommandRequest):
|
|
"""Execute a command in the Docker sandbox (fallback: direct shell)."""
|
|
session = _get_session(session_id)
|
|
command = req.command.strip()
|
|
if not command:
|
|
raise HTTPException(status_code=400, detail="Command cannot be empty")
|
|
|
|
start = time.time()
|
|
stdout = ""
|
|
stderr = ""
|
|
exit_code = -1
|
|
execution_method = "direct"
|
|
|
|
# Use requested execution method
|
|
use_sandbox = req.execution_method == "sandbox"
|
|
|
|
if use_sandbox:
|
|
try:
|
|
sandbox = await get_sandbox()
|
|
if sandbox and sandbox.is_available:
|
|
result = await sandbox.execute_raw(command)
|
|
stdout = result.stdout
|
|
stderr = result.stderr
|
|
exit_code = result.exit_code
|
|
execution_method = "sandbox"
|
|
except Exception:
|
|
pass # Fall through to direct execution
|
|
|
|
# Fallback or direct execution requested
|
|
if execution_method != "sandbox":
|
|
try:
|
|
proc = await asyncio.create_subprocess_shell(
|
|
command,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
raw_stdout, raw_stderr = await asyncio.wait_for(
|
|
proc.communicate(), timeout=120
|
|
)
|
|
stdout = raw_stdout.decode(errors="replace")
|
|
stderr = raw_stderr.decode(errors="replace")
|
|
exit_code = proc.returncode or 0
|
|
execution_method = "direct"
|
|
except asyncio.TimeoutError:
|
|
stderr = "Command timed out after 120 seconds"
|
|
exit_code = 124
|
|
except Exception as exc:
|
|
stderr = str(exc)
|
|
exit_code = 1
|
|
|
|
duration = round(time.time() - start, 3)
|
|
|
|
cmd_record = {
|
|
"command": command,
|
|
"exit_code": exit_code,
|
|
"stdout": stdout,
|
|
"stderr": stderr,
|
|
"duration": duration,
|
|
"execution_method": execution_method,
|
|
"timestamp": _now_iso(),
|
|
}
|
|
session["command_history"].append(cmd_record)
|
|
|
|
# Mirror into messages for AI context continuity
|
|
output_preview = stdout[:2000] if stdout else stderr[:2000]
|
|
session["messages"].append({
|
|
"role": "tool",
|
|
"content": f"$ {command}\n[exit {exit_code}] ({execution_method}, {duration}s)\n{output_preview}",
|
|
"timestamp": cmd_record["timestamp"],
|
|
"metadata": {"exit_code": exit_code, "execution_method": execution_method},
|
|
})
|
|
|
|
return CommandResult(**cmd_record).model_dump()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Exploitation path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/exploitation-path")
|
|
async def add_exploitation_step(session_id: str, req: ExploitationStepRequest):
|
|
"""Add a manual step to the exploitation path timeline."""
|
|
session = _get_session(session_id)
|
|
|
|
valid_types = {"recon", "exploit", "pivot", "escalate", "action"}
|
|
if req.step_type not in valid_types:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"step_type must be one of {sorted(valid_types)}",
|
|
)
|
|
|
|
step = {
|
|
"description": req.description,
|
|
"command": req.command or "",
|
|
"result": req.result or "",
|
|
"timestamp": _now_iso(),
|
|
"step_type": req.step_type,
|
|
}
|
|
session["exploitation_path"].append(step)
|
|
return step
|
|
|
|
|
|
@router.get("/sessions/{session_id}/exploitation-path")
|
|
async def get_exploitation_path(session_id: str):
|
|
"""Return the full exploitation path timeline."""
|
|
session = _get_session(session_id)
|
|
return session["exploitation_path"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# VPN status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/sessions/{session_id}/vpn-status")
|
|
async def get_vpn_status(session_id: str):
|
|
"""Check OpenVPN process and tun0 interface status."""
|
|
session = _get_session(session_id)
|
|
|
|
connected = False
|
|
ip_addr: Optional[str] = None
|
|
|
|
# Check for running openvpn process
|
|
try:
|
|
proc = await asyncio.create_subprocess_shell(
|
|
"pgrep -a openvpn",
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
raw_stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
|
|
if proc.returncode == 0 and raw_stdout.strip():
|
|
connected = True
|
|
except Exception:
|
|
pass
|
|
|
|
# Check tun0 interface for IP
|
|
if connected:
|
|
try:
|
|
proc = await asyncio.create_subprocess_shell(
|
|
"ip addr show tun0",
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
raw_stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
|
|
if proc.returncode == 0:
|
|
match = re.search(
|
|
r"inet\s+(\d+\.\d+\.\d+\.\d+)", raw_stdout.decode(errors="replace")
|
|
)
|
|
if match:
|
|
ip_addr = match.group(1)
|
|
except Exception:
|
|
pass
|
|
|
|
vpn = {"connected": connected, "ip": ip_addr}
|
|
session["vpn_status"] = vpn
|
|
return VPNStatus(**vpn).model_dump()
|