import json import logging import re import subprocess import shlex import shutil import urllib.parse import os from typing import Dict, Any, List, Optional, Tuple from datetime import datetime from core.llm_manager import LLMManager logger = logging.getLogger(__name__) class BaseAgent: """ Autonomous AI-Powered Security Agent. This agent operates like a real pentester: 1. Discovers attack surface dynamically 2. Analyzes responses intelligently 3. Adapts testing based on findings 4. Intensifies when it finds something interesting 5. Documents real PoCs """ def __init__(self, agent_name: str, config: Dict, llm_manager: LLMManager, context_prompts: Dict): self.agent_name = agent_name self.config = config self.llm_manager = llm_manager self.context_prompts = context_prompts self.agent_role_config = self.config.get('agent_roles', {}).get(agent_name, {}) self.tools_allowed = self.agent_role_config.get('tools_allowed', []) self.description = self.agent_role_config.get('description', 'Autonomous Security Tester') # Attack surface discovered self.discovered_endpoints = [] self.discovered_params = [] self.discovered_forms = [] self.tech_stack = {} # Findings self.vulnerabilities = [] self.interesting_findings = [] self.tool_history = [] logger.info(f"Initialized {self.agent_name} - Autonomous Agent") def _extract_targets(self, user_input: str) -> List[str]: """Extract target URLs from input.""" targets = [] if os.path.isfile(user_input.strip()): with open(user_input.strip(), 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#'): targets.append(self._normalize_url(line)) return targets url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+' urls = re.findall(url_pattern, user_input) if urls: return [self._normalize_url(u) for u in urls] domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' domains = re.findall(domain_pattern, user_input) if domains: return [f"http://{d}" for d in domains] return [] def _normalize_url(self, url: str) -> str: url = url.strip() if not url.startswith(('http://', 'https://')): url = f"http://{url}" return url def _get_domain(self, url: str) -> str: parsed = urllib.parse.urlparse(url) return parsed.netloc or parsed.path.split('/')[0] def run_command(self, tool: str, args: str, timeout: int = 60) -> Dict: """Execute command and capture output.""" result = { "tool": tool, "args": args, "command": "", "success": False, "output": "", "timestamp": datetime.now().isoformat() } tool_path = self.config.get('tools', {}).get(tool) or shutil.which(tool) if not tool_path: result["output"] = f"[!] Tool '{tool}' not found - using alternative" logger.warning(f"Tool not found: {tool}") self.tool_history.append(result) return result try: if tool == "curl": cmd = f"{tool_path} {args}" else: cmd = f"{tool_path} {args}" result["command"] = cmd print(f" [>] {tool}: {args[:80]}{'...' if len(args) > 80 else ''}") proc = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=timeout ) output = proc.stdout or proc.stderr result["output"] = output[:8000] if output else "[No output]" result["success"] = proc.returncode == 0 except subprocess.TimeoutExpired: result["output"] = f"[!] Timeout after {timeout}s" except Exception as e: result["output"] = f"[!] Error: {str(e)}" self.tool_history.append(result) return result def execute(self, user_input: str, campaign_data: Dict = None) -> Dict: """Execute autonomous security assessment.""" targets = self._extract_targets(user_input) if not targets: return { "error": "No targets found", "llm_response": "Please provide a URL, domain, IP, or file with targets." } print(f"\n{'='*70}") print(f" NEUROSPLOIT AUTONOMOUS AGENT - {self.agent_name.upper()}") print(f"{'='*70}") print(f" Mode: Adaptive AI-Driven Testing") print(f" Targets: {len(targets)}") print(f"{'='*70}\n") all_findings = [] for idx, target in enumerate(targets, 1): if len(targets) > 1: print(f"\n[TARGET {idx}/{len(targets)}] {target}") print("=" * 60) self.tool_history = [] self.vulnerabilities = [] self.discovered_endpoints = [] findings = self._autonomous_assessment(target) all_findings.extend(findings) final_report = self._generate_final_report(targets, all_findings) return { "agent_name": self.agent_name, "input": user_input, "targets": targets, "targets_count": len(targets), "tools_executed": len(self.tool_history), "vulnerabilities_found": len(self.vulnerabilities), "findings": all_findings, "llm_response": final_report, "scan_data": { "targets": targets, "tools_executed": len(self.tool_history), "endpoints_discovered": len(self.discovered_endpoints) } } def _autonomous_assessment(self, target: str) -> List[Dict]: """ Autonomous assessment with AI-driven adaptation. The AI analyzes each response and decides next steps. """ # Phase 1: Initial Reconnaissance & Discovery print(f"\n[PHASE 1] Autonomous Discovery - {target}") print("-" * 50) discovery_data = self._discover_attack_surface(target) # Phase 2: AI Analysis of Attack Surface print(f"\n[PHASE 2] AI Attack Surface Analysis") print("-" * 50) attack_plan = self._ai_analyze_attack_surface(target, discovery_data) # Phase 3: Adaptive Exploitation Loop print(f"\n[PHASE 3] Adaptive Exploitation") print("-" * 50) self._adaptive_exploitation_loop(target, attack_plan) # Phase 4: Deep Dive on Findings print(f"\n[PHASE 4] Deep Exploitation of Findings") print("-" * 50) self._deep_exploitation(target) return self.tool_history def _discover_attack_surface(self, target: str) -> Dict: """Dynamically discover all attack vectors.""" discovery = { "base_response": "", "headers": {}, "endpoints": [], "params": [], "forms": [], "tech_hints": [], "interesting_files": [] } # Get base response result = self.run_command("curl", f'-s -k -L -D - "{target}"') discovery["base_response"] = result.get("output", "") # Extract headers headers_match = re.findall(r'^([A-Za-z-]+):\s*(.+)$', discovery["base_response"], re.MULTILINE) discovery["headers"] = dict(headers_match) # Get HTML and extract links html_result = self.run_command("curl", f'-s -k "{target}"') html = html_result.get("output", "") # Extract all links links = re.findall(r'(?:href|src|action)=["\']([^"\']+)["\']', html, re.IGNORECASE) for link in links: if not link.startswith(('http://', 'https://', '//', '#', 'javascript:', 'mailto:')): full_url = urllib.parse.urljoin(target, link) if full_url not in discovery["endpoints"]: discovery["endpoints"].append(full_url) elif link.startswith('/'): full_url = urllib.parse.urljoin(target, link) if full_url not in discovery["endpoints"]: discovery["endpoints"].append(full_url) # Extract forms and inputs forms = re.findall(r'
', html, re.IGNORECASE | re.DOTALL) for action, form_content in forms: inputs = re.findall(r']*name=["\']([^"\']+)["\']', form_content, re.IGNORECASE) discovery["forms"].append({ "action": urllib.parse.urljoin(target, action) if action else target, "inputs": inputs }) # Extract URL parameters from links for endpoint in discovery["endpoints"]: parsed = urllib.parse.urlparse(endpoint) params = urllib.parse.parse_qs(parsed.query) for param in params.keys(): if param not in discovery["params"]: discovery["params"].append(param) # Check common files common_files = [ "robots.txt", "sitemap.xml", ".htaccess", "crossdomain.xml", "phpinfo.php", "info.php", "test.php", "admin/", "login.php", "wp-config.php.bak", ".git/config", ".env", "config.php.bak" ] for file in common_files[:8]: result = self.run_command("curl", f'-s -k -o /dev/null -w "%{{http_code}}" "{target}/{file}"') if result.get("output", "").strip() in ["200", "301", "302", "403"]: discovery["interesting_files"].append(f"{target}/{file}") # Detect technologies tech_patterns = { "PHP": [r'\.php', r'PHPSESSID', r'X-Powered-By:.*PHP'], "ASP.NET": [r'\.aspx?', r'ASP\.NET', r'__VIEWSTATE'], "Java": [r'\.jsp', r'JSESSIONID', r'\.do\b'], "Python": [r'Django', r'Flask', r'\.py'], "WordPress": [r'wp-content', r'wp-includes'], "MySQL": [r'mysql', r'MariaDB'], } full_response = discovery["base_response"] + html for tech, patterns in tech_patterns.items(): for pattern in patterns: if re.search(pattern, full_response, re.IGNORECASE): if tech not in discovery["tech_hints"]: discovery["tech_hints"].append(tech) self.discovered_endpoints = discovery["endpoints"] print(f" [+] Discovered {len(discovery['endpoints'])} endpoints") print(f" [+] Found {len(discovery['params'])} parameters") print(f" [+] Found {len(discovery['forms'])} forms") print(f" [+] Tech hints: {', '.join(discovery['tech_hints']) or 'Unknown'}") return discovery def _ai_analyze_attack_surface(self, target: str, discovery: Dict) -> str: """AI analyzes discovered surface and creates attack plan.""" analysis_prompt = f"""You are an elite penetration tester analyzing an attack surface. TARGET: {target} === DISCOVERED ATTACK SURFACE === **Endpoints Found ({len(discovery['endpoints'])}):** {chr(10).join(discovery['endpoints'][:20])} **Parameters Found:** {', '.join(discovery['params'][:20])} **Forms Found:** {json.dumps(discovery['forms'][:10], indent=2)} **Technologies Detected:** {', '.join(discovery['tech_hints'])} **Interesting Files:** {chr(10).join(discovery['interesting_files'])} **Response Headers:** {json.dumps(dict(list(discovery['headers'].items())[:10]), indent=2)} === YOUR TASK === Analyze this attack surface and output SPECIFIC tests to run. For each test, output in this EXACT format: [TEST] curl -s -k "[URL_WITH_PAYLOAD]" [TEST] curl -s -k "[URL]" -d "param=payload" Focus on: 1. SQL Injection - test EVERY parameter with: ' " 1 OR 1=1 UNION SELECT 2. XSS - test inputs with: