diff --git a/agents/base_agent.py b/agents/base_agent.py index 2c62084..ff59fb8 100644 --- a/agents/base_agent.py +++ b/agents/base_agent.py @@ -1,130 +1,678 @@ import json import logging -from typing import Dict, Any, List, Optional import re import subprocess +import shlex +import shutil +import urllib.parse +import os +from typing import Dict, Any, List, Optional, Tuple +from datetime import datetime from core.llm_manager import LLMManager logger = logging.getLogger(__name__) + class BaseAgent: """ - A generic agent class that orchestrates LLM interactions, tool usage, - and adheres to specific agent roles (e.g., Red Team, Blue Team). + Autonomous AI-Powered Security Agent. + + This agent operates like a real pentester: + 1. Discovers attack surface dynamically + 2. Analyzes responses intelligently + 3. Adapts testing based on findings + 4. Intensifies when it finds something interesting + 5. Documents real PoCs """ + def __init__(self, agent_name: str, config: Dict, llm_manager: LLMManager, context_prompts: Dict): self.agent_name = agent_name self.config = config self.llm_manager = llm_manager - self.context_prompts = context_prompts # This will contain user_prompt and system_prompt for this agent role - + self.context_prompts = context_prompts + self.agent_role_config = self.config.get('agent_roles', {}).get(agent_name, {}) self.tools_allowed = self.agent_role_config.get('tools_allowed', []) - self.description = self.agent_role_config.get('description', 'No description provided.') - - logger.info(f"Initialized {self.agent_name} agent. Description: {self.description}") + self.description = self.agent_role_config.get('description', 'Autonomous Security Tester') - def _prepare_prompt(self, user_input: str, additional_context: Dict = None) -> str: - """ - Prepares the user prompt for the LLM, incorporating agent-specific instructions - and dynamic context. 
- """ - user_prompt_template = self.context_prompts.get("user_prompt", "") - if not user_prompt_template: - logger.warning(f"No user prompt template found for agent {self.agent_name}.") - return user_input # Fallback to raw user input + # Attack surface discovered + self.discovered_endpoints = [] + self.discovered_params = [] + self.discovered_forms = [] + self.tech_stack = {} - # Create a dictionary with all the possible placeholders - format_dict = { - "user_input": user_input, - "target_info_json": user_input, # for bug_bounty_hunter - "recon_data_json": json.dumps(additional_context or {}, indent=2), # for bug_bounty_hunter - "additional_context_json": json.dumps(additional_context or {}, indent=2), - "mission_objectives_json": json.dumps(additional_context or {}, indent=2) # for red_team_agent + # Findings + self.vulnerabilities = [] + self.interesting_findings = [] + self.tool_history = [] + + logger.info(f"Initialized {self.agent_name} - Autonomous Agent") + + def _extract_targets(self, user_input: str) -> List[str]: + """Extract target URLs from input.""" + targets = [] + + if os.path.isfile(user_input.strip()): + with open(user_input.strip(), 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + targets.append(self._normalize_url(line)) + return targets + + url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+' + urls = re.findall(url_pattern, user_input) + if urls: + return [self._normalize_url(u) for u in urls] + + domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' + domains = re.findall(domain_pattern, user_input) + if domains: + return [f"http://{d}" for d in domains] + + return [] + + def _normalize_url(self, url: str) -> str: + url = url.strip() + if not url.startswith(('http://', 'https://')): + url = f"http://{url}" + return url + + def _get_domain(self, url: str) -> str: + parsed = urllib.parse.urlparse(url) + return parsed.netloc or parsed.path.split('/')[0] + + def 
run_command(self, tool: str, args: str, timeout: int = 60) -> Dict: + """Execute command and capture output.""" + result = { + "tool": tool, + "args": args, + "command": "", + "success": False, + "output": "", + "timestamp": datetime.now().isoformat() } - if additional_context: - for key, value in additional_context.items(): - if isinstance(value, (dict, list)): - format_dict[f"{key}_json"] = json.dumps(value, indent=2) - else: - format_dict[key] = value + tool_path = self.config.get('tools', {}).get(tool) or shutil.which(tool) - # Use a safe way to format, handling missing keys gracefully - class SafeDict(dict): - def __missing__(self, key): - return f"{{{key}}}" # Return the placeholder as-is for missing keys - - formatted_prompt = user_prompt_template.format_map(SafeDict(format_dict)) - - return formatted_prompt + if not tool_path: + result["output"] = f"[!] Tool '{tool}' not found - using alternative" + logger.warning(f"Tool not found: {tool}") + self.tool_history.append(result) + return result + + try: + if tool == "curl": + cmd = f"{tool_path} {args}" + else: + cmd = f"{tool_path} {args}" + + result["command"] = cmd + print(f" [>] {tool}: {args[:80]}{'...' if len(args) > 80 else ''}") + + proc = subprocess.run( + cmd, + shell=True, + capture_output=True, + text=True, + timeout=timeout + ) + + output = proc.stdout or proc.stderr + result["output"] = output[:8000] if output else "[No output]" + result["success"] = proc.returncode == 0 + + except subprocess.TimeoutExpired: + result["output"] = f"[!] Timeout after {timeout}s" + except Exception as e: + result["output"] = f"[!] Error: {str(e)}" + + self.tool_history.append(result) + return result def execute(self, user_input: str, campaign_data: Dict = None) -> Dict: + """Execute autonomous security assessment.""" + targets = self._extract_targets(user_input) + + if not targets: + return { + "error": "No targets found", + "llm_response": "Please provide a URL, domain, IP, or file with targets." 
+ } + + print(f"\n{'='*70}") + print(f" NEUROSPLOIT AUTONOMOUS AGENT - {self.agent_name.upper()}") + print(f"{'='*70}") + print(f" Mode: Adaptive AI-Driven Testing") + print(f" Targets: {len(targets)}") + print(f"{'='*70}\n") + + all_findings = [] + + for idx, target in enumerate(targets, 1): + if len(targets) > 1: + print(f"\n[TARGET {idx}/{len(targets)}] {target}") + print("=" * 60) + + self.tool_history = [] + self.vulnerabilities = [] + self.discovered_endpoints = [] + + findings = self._autonomous_assessment(target) + all_findings.extend(findings) + + final_report = self._generate_final_report(targets, all_findings) + + return { + "agent_name": self.agent_name, + "input": user_input, + "targets": targets, + "targets_count": len(targets), + "tools_executed": len(self.tool_history), + "vulnerabilities_found": len(self.vulnerabilities), + "findings": all_findings, + "llm_response": final_report, + "scan_data": { + "targets": targets, + "tools_executed": len(self.tool_history), + "endpoints_discovered": len(self.discovered_endpoints) + } + } + + def _autonomous_assessment(self, target: str) -> List[Dict]: """ - Executes the agent's task using the LLM and potentially external tools. - `campaign_data` can be used to pass ongoing results or context between agent executions. + Autonomous assessment with AI-driven adaptation. + The AI analyzes each response and decides next steps. """ - logger.info(f"Executing {self.agent_name} agent for input: {user_input[:50]}...") - - system_prompt = self.context_prompts.get("system_prompt", "") - if not system_prompt: - logger.warning(f"No system prompt found for agent {self.agent_name}. Using generic system prompt.") - system_prompt = f"You are an expert {self.agent_name}. Analyze the provided information and generate a response." 
- # Prepare the user prompt with current input and campaign data - prepared_user_prompt = self._prepare_prompt(user_input, campaign_data) + # Phase 1: Initial Reconnaissance & Discovery + print(f"\n[PHASE 1] Autonomous Discovery - {target}") + print("-" * 50) - # Loop for tool usage - for _ in range(5): # Limit to 5 iterations to prevent infinite loops - llm_response_text = self.llm_manager.generate(prepared_user_prompt, system_prompt) - - tool_name, tool_args = self._parse_llm_response(llm_response_text) + discovery_data = self._discover_attack_surface(target) - if tool_name: - if tool_name in self.config.get('tools', {}): - tool_path = self.config['tools'][tool_name] - tool_output = self._execute_tool(tool_path, tool_args) - prepared_user_prompt += f"\n\n[TOOL_OUTPUT]\n{tool_output}" - else: - if self._ask_for_permission(f"Tool '{tool_name}' not found. Do you want to try to download it?"): - self.download_tool(tool_name) - # We don't execute the tool in this iteration, but the LLM can try again in the next one - prepared_user_prompt += f"\n\n[TOOL_DOWNLOAD] Tool '{tool_name}' downloaded." - else: - prepared_user_prompt += f"\n\n[TOOL_ERROR] Tool '{tool_name}' not found and permission to download was denied." 
+ # Phase 2: AI Analysis of Attack Surface + print(f"\n[PHASE 2] AI Attack Surface Analysis") + print("-" * 50) + + attack_plan = self._ai_analyze_attack_surface(target, discovery_data) + + # Phase 3: Adaptive Exploitation Loop + print(f"\n[PHASE 3] Adaptive Exploitation") + print("-" * 50) + + self._adaptive_exploitation_loop(target, attack_plan) + + # Phase 4: Deep Dive on Findings + print(f"\n[PHASE 4] Deep Exploitation of Findings") + print("-" * 50) + + self._deep_exploitation(target) + + return self.tool_history + + def _discover_attack_surface(self, target: str) -> Dict: + """Dynamically discover all attack vectors.""" + + discovery = { + "base_response": "", + "headers": {}, + "endpoints": [], + "params": [], + "forms": [], + "tech_hints": [], + "interesting_files": [] + } + + # Get base response + result = self.run_command("curl", f'-s -k -L -D - "{target}"') + discovery["base_response"] = result.get("output", "") + + # Extract headers + headers_match = re.findall(r'^([A-Za-z-]+):\s*(.+)$', discovery["base_response"], re.MULTILINE) + discovery["headers"] = dict(headers_match) + + # Get HTML and extract links + html_result = self.run_command("curl", f'-s -k "{target}"') + html = html_result.get("output", "") + + # Extract all links + links = re.findall(r'(?:href|src|action)=["\']([^"\']+)["\']', html, re.IGNORECASE) + for link in links: + if not link.startswith(('http://', 'https://', '//', '#', 'javascript:', 'mailto:')): + full_url = urllib.parse.urljoin(target, link) + if full_url not in discovery["endpoints"]: + discovery["endpoints"].append(full_url) + elif link.startswith('/'): + full_url = urllib.parse.urljoin(target, link) + if full_url not in discovery["endpoints"]: + discovery["endpoints"].append(full_url) + + # Extract forms and inputs + forms = re.findall(r'
<form[^>]*action=["\']([^"\']*)["\'][^>]*>(.*?)</form>', html, re.IGNORECASE | re.DOTALL) + for action, form_content in forms: + inputs = re.findall(r'<input[^>]*name=["\']([^"\']+)["\']', form_content, re.IGNORECASE) + discovery["forms"].append({ + "action": urllib.parse.urljoin(target, action) if action else target, + "inputs": inputs + }) + + # Extract URL parameters from links + for endpoint in discovery["endpoints"]: + parsed = urllib.parse.urlparse(endpoint) + params = urllib.parse.parse_qs(parsed.query) + for param in params.keys(): + if param not in discovery["params"]: + discovery["params"].append(param) + + # Check common files + common_files = [ + "robots.txt", "sitemap.xml", ".htaccess", "crossdomain.xml", + "phpinfo.php", "info.php", "test.php", "admin/", "login.php", + "wp-config.php.bak", ".git/config", ".env", "config.php.bak" + ] + + for file in common_files[:8]: + result = self.run_command("curl", f'-s -k -o /dev/null -w "%{{http_code}}" "{target}/{file}"') + if result.get("output", "").strip() in ["200", "301", "302", "403"]: + discovery["interesting_files"].append(f"{target}/{file}") + + # Detect technologies + tech_patterns = { + "PHP": [r'\.php', r'PHPSESSID', r'X-Powered-By:.*PHP'], + "ASP.NET": [r'\.aspx?', r'ASP\.NET', r'__VIEWSTATE'], + "Java": [r'\.jsp', r'JSESSIONID', r'\.do\b'], + "Python": [r'Django', r'Flask', r'\.py'], + "WordPress": [r'wp-content', r'wp-includes'], + "MySQL": [r'mysql', r'MariaDB'], + } + + full_response = discovery["base_response"] + html + for tech, patterns in tech_patterns.items(): + for pattern in patterns: + if re.search(pattern, full_response, re.IGNORECASE): + if tech not in discovery["tech_hints"]: + discovery["tech_hints"].append(tech) + + self.discovered_endpoints = discovery["endpoints"] + + print(f" [+] Discovered {len(discovery['endpoints'])} endpoints") + print(f" [+] Found {len(discovery['params'])} parameters") + print(f" [+] Found {len(discovery['forms'])} forms") + print(f" [+] Tech hints: {', '.join(discovery['tech_hints']) or 'Unknown'}") + + return 
discovery + + def _ai_analyze_attack_surface(self, target: str, discovery: Dict) -> str: + """AI analyzes discovered surface and creates attack plan.""" + + analysis_prompt = f"""You are an elite penetration tester analyzing an attack surface. + +TARGET: {target} + +=== DISCOVERED ATTACK SURFACE === + +**Endpoints Found ({len(discovery['endpoints'])}):** +{chr(10).join(discovery['endpoints'][:20])} + +**Parameters Found:** +{', '.join(discovery['params'][:20])} + +**Forms Found:** +{json.dumps(discovery['forms'][:10], indent=2)} + +**Technologies Detected:** +{', '.join(discovery['tech_hints'])} + +**Interesting Files:** +{chr(10).join(discovery['interesting_files'])} + +**Response Headers:** +{json.dumps(dict(list(discovery['headers'].items())[:10]), indent=2)} + +=== YOUR TASK === + +Analyze this attack surface and output SPECIFIC tests to run. +For each test, output in this EXACT format: + +[TEST] curl -s -k "[URL_WITH_PAYLOAD]" +[TEST] curl -s -k "[URL]" -d "param=payload" + +Focus on: +1. SQL Injection - test EVERY parameter with: ' " 1 OR 1=1 UNION SELECT +2. XSS - test inputs with: