diff --git a/agents/base_agent.py b/agents/base_agent.py index 2c62084..ff59fb8 100644 --- a/agents/base_agent.py +++ b/agents/base_agent.py @@ -1,130 +1,678 @@ import json import logging -from typing import Dict, Any, List, Optional import re import subprocess +import shlex +import shutil +import urllib.parse +import os +from typing import Dict, Any, List, Optional, Tuple +from datetime import datetime from core.llm_manager import LLMManager logger = logging.getLogger(__name__) + class BaseAgent: """ - A generic agent class that orchestrates LLM interactions, tool usage, - and adheres to specific agent roles (e.g., Red Team, Blue Team). + Autonomous AI-Powered Security Agent. + + This agent operates like a real pentester: + 1. Discovers attack surface dynamically + 2. Analyzes responses intelligently + 3. Adapts testing based on findings + 4. Intensifies when it finds something interesting + 5. Documents real PoCs """ + def __init__(self, agent_name: str, config: Dict, llm_manager: LLMManager, context_prompts: Dict): self.agent_name = agent_name self.config = config self.llm_manager = llm_manager - self.context_prompts = context_prompts # This will contain user_prompt and system_prompt for this agent role - + self.context_prompts = context_prompts + self.agent_role_config = self.config.get('agent_roles', {}).get(agent_name, {}) self.tools_allowed = self.agent_role_config.get('tools_allowed', []) - self.description = self.agent_role_config.get('description', 'No description provided.') - - logger.info(f"Initialized {self.agent_name} agent. Description: {self.description}") + self.description = self.agent_role_config.get('description', 'Autonomous Security Tester') - def _prepare_prompt(self, user_input: str, additional_context: Dict = None) -> str: - """ - Prepares the user prompt for the LLM, incorporating agent-specific instructions - and dynamic context. - """ - user_prompt_template = self.context_prompts.get("user_prompt", "") - if not user_prompt_template: - logger.warning(f"No user prompt template found for agent {self.agent_name}.") - return user_input # Fallback to raw user input + # Attack surface discovered + self.discovered_endpoints = [] + self.discovered_params = [] + self.discovered_forms = [] + self.tech_stack = {} - # Create a dictionary with all the possible placeholders - format_dict = { - "user_input": user_input, - "target_info_json": user_input, # for bug_bounty_hunter - "recon_data_json": json.dumps(additional_context or {}, indent=2), # for bug_bounty_hunter - "additional_context_json": json.dumps(additional_context or {}, indent=2), - "mission_objectives_json": json.dumps(additional_context or {}, indent=2) # for red_team_agent + # Findings + self.vulnerabilities = [] + self.interesting_findings = [] + self.tool_history = [] + + logger.info(f"Initialized {self.agent_name} - Autonomous Agent") + + def _extract_targets(self, user_input: str) -> List[str]: + """Extract target URLs from input.""" + targets = [] + + if os.path.isfile(user_input.strip()): + with open(user_input.strip(), 'r') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + targets.append(self._normalize_url(line)) + return targets + + url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+' + urls = re.findall(url_pattern, user_input) + if urls: + return [self._normalize_url(u) for u in urls] + + domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' + domains = re.findall(domain_pattern, user_input) + if domains: + return [f"http://{d}" for d in domains] + + return [] + + def _normalize_url(self, url: str) -> str: + url = url.strip() + if not url.startswith(('http://', 'https://')): + url = f"http://{url}" + return url + + def _get_domain(self, url: str) -> str: + parsed = urllib.parse.urlparse(url) + return parsed.netloc or parsed.path.split('/')[0] + + def run_command(self, tool: str, args: str, timeout: int = 60) -> Dict: + """Execute command and capture output.""" + result = { + "tool": tool, + "args": args, + "command": "", + "success": False, + "output": "", + "timestamp": datetime.now().isoformat() } - if additional_context: - for key, value in additional_context.items(): - if isinstance(value, (dict, list)): - format_dict[f"{key}_json"] = json.dumps(value, indent=2) - else: - format_dict[key] = value + tool_path = self.config.get('tools', {}).get(tool) or shutil.which(tool) - # Use a safe way to format, handling missing keys gracefully - class SafeDict(dict): - def __missing__(self, key): - return f"{{{key}}}" # Return the placeholder as-is for missing keys - - formatted_prompt = user_prompt_template.format_map(SafeDict(format_dict)) - - return formatted_prompt + if not tool_path: + result["output"] = f"[!] Tool '{tool}' not found - using alternative" + logger.warning(f"Tool not found: {tool}") + self.tool_history.append(result) + return result + + try: + if tool == "curl": + cmd = f"{tool_path} {args}" + else: + cmd = f"{tool_path} {args}" + + result["command"] = cmd + print(f" [>] {tool}: {args[:80]}{'...' if len(args) > 80 else ''}") + + proc = subprocess.run( + cmd, + shell=True, + capture_output=True, + text=True, + timeout=timeout + ) + + output = proc.stdout or proc.stderr + result["output"] = output[:8000] if output else "[No output]" + result["success"] = proc.returncode == 0 + + except subprocess.TimeoutExpired: + result["output"] = f"[!] Timeout after {timeout}s" + except Exception as e: + result["output"] = f"[!] Error: {str(e)}" + + self.tool_history.append(result) + return result def execute(self, user_input: str, campaign_data: Dict = None) -> Dict: + """Execute autonomous security assessment.""" + targets = self._extract_targets(user_input) + + if not targets: + return { + "error": "No targets found", + "llm_response": "Please provide a URL, domain, IP, or file with targets." + } + + print(f"\n{'='*70}") + print(f" NEUROSPLOIT AUTONOMOUS AGENT - {self.agent_name.upper()}") + print(f"{'='*70}") + print(f" Mode: Adaptive AI-Driven Testing") + print(f" Targets: {len(targets)}") + print(f"{'='*70}\n") + + all_findings = [] + + for idx, target in enumerate(targets, 1): + if len(targets) > 1: + print(f"\n[TARGET {idx}/{len(targets)}] {target}") + print("=" * 60) + + self.tool_history = [] + self.vulnerabilities = [] + self.discovered_endpoints = [] + + findings = self._autonomous_assessment(target) + all_findings.extend(findings) + + final_report = self._generate_final_report(targets, all_findings) + + return { + "agent_name": self.agent_name, + "input": user_input, + "targets": targets, + "targets_count": len(targets), + "tools_executed": len(self.tool_history), + "vulnerabilities_found": len(self.vulnerabilities), + "findings": all_findings, + "llm_response": final_report, + "scan_data": { + "targets": targets, + "tools_executed": len(self.tool_history), + "endpoints_discovered": len(self.discovered_endpoints) + } + } + + def _autonomous_assessment(self, target: str) -> List[Dict]: """ - Executes the agent's task using the LLM and potentially external tools. - `campaign_data` can be used to pass ongoing results or context between agent executions. + Autonomous assessment with AI-driven adaptation. + The AI analyzes each response and decides next steps. """ - logger.info(f"Executing {self.agent_name} agent for input: {user_input[:50]}...") - - system_prompt = self.context_prompts.get("system_prompt", "") - if not system_prompt: - logger.warning(f"No system prompt found for agent {self.agent_name}. Using generic system prompt.") - system_prompt = f"You are an expert {self.agent_name}. Analyze the provided information and generate a response." - # Prepare the user prompt with current input and campaign data - prepared_user_prompt = self._prepare_prompt(user_input, campaign_data) + # Phase 1: Initial Reconnaissance & Discovery + print(f"\n[PHASE 1] Autonomous Discovery - {target}") + print("-" * 50) - # Loop for tool usage - for _ in range(5): # Limit to 5 iterations to prevent infinite loops - llm_response_text = self.llm_manager.generate(prepared_user_prompt, system_prompt) - - tool_name, tool_args = self._parse_llm_response(llm_response_text) + discovery_data = self._discover_attack_surface(target) - if tool_name: - if tool_name in self.config.get('tools', {}): - tool_path = self.config['tools'][tool_name] - tool_output = self._execute_tool(tool_path, tool_args) - prepared_user_prompt += f"\n\n[TOOL_OUTPUT]\n{tool_output}" - else: - if self._ask_for_permission(f"Tool '{tool_name}' not found. Do you want to try to download it?"): - self.download_tool(tool_name) - # We don't execute the tool in this iteration, but the LLM can try again in the next one - prepared_user_prompt += f"\n\n[TOOL_DOWNLOAD] Tool '{tool_name}' downloaded." - else: - prepared_user_prompt += f"\n\n[TOOL_ERROR] Tool '{tool_name}' not found and permission to download was denied." + # Phase 2: AI Analysis of Attack Surface + print(f"\n[PHASE 2] AI Attack Surface Analysis") + print("-" * 50) + + attack_plan = self._ai_analyze_attack_surface(target, discovery_data) + + # Phase 3: Adaptive Exploitation Loop + print(f"\n[PHASE 3] Adaptive Exploitation") + print("-" * 50) + + self._adaptive_exploitation_loop(target, attack_plan) + + # Phase 4: Deep Dive on Findings + print(f"\n[PHASE 4] Deep Exploitation of Findings") + print("-" * 50) + + self._deep_exploitation(target) + + return self.tool_history + + def _discover_attack_surface(self, target: str) -> Dict: + """Dynamically discover all attack vectors.""" + + discovery = { + "base_response": "", + "headers": {}, + "endpoints": [], + "params": [], + "forms": [], + "tech_hints": [], + "interesting_files": [] + } + + # Get base response + result = self.run_command("curl", f'-s -k -L -D - "{target}"') + discovery["base_response"] = result.get("output", "") + + # Extract headers + headers_match = re.findall(r'^([A-Za-z-]+):\s*(.+)$', discovery["base_response"], re.MULTILINE) + discovery["headers"] = dict(headers_match) + + # Get HTML and extract links + html_result = self.run_command("curl", f'-s -k "{target}"') + html = html_result.get("output", "") + + # Extract all links + links = re.findall(r'(?:href|src|action)=["\']([^"\']+)["\']', html, re.IGNORECASE) + for link in links: + if not link.startswith(('http://', 'https://', '//', '#', 'javascript:', 'mailto:')): + full_url = urllib.parse.urljoin(target, link) + if full_url not in discovery["endpoints"]: + discovery["endpoints"].append(full_url) + elif link.startswith('/'): + full_url = urllib.parse.urljoin(target, link) + if full_url not in discovery["endpoints"]: + discovery["endpoints"].append(full_url) + + # Extract forms and inputs + forms = re.findall(r']*action=["\']([^"\']*)["\'][^>]*>(.*?)', html, re.IGNORECASE | re.DOTALL) + for action, form_content in forms: + inputs = re.findall(r']*name=["\']([^"\']+)["\']', form_content, re.IGNORECASE) + discovery["forms"].append({ + "action": urllib.parse.urljoin(target, action) if action else target, + "inputs": inputs + }) + + # Extract URL parameters from links + for endpoint in discovery["endpoints"]: + parsed = urllib.parse.urlparse(endpoint) + params = urllib.parse.parse_qs(parsed.query) + for param in params.keys(): + if param not in discovery["params"]: + discovery["params"].append(param) + + # Check common files + common_files = [ + "robots.txt", "sitemap.xml", ".htaccess", "crossdomain.xml", + "phpinfo.php", "info.php", "test.php", "admin/", "login.php", + "wp-config.php.bak", ".git/config", ".env", "config.php.bak" + ] + + for file in common_files[:8]: + result = self.run_command("curl", f'-s -k -o /dev/null -w "%{{http_code}}" "{target}/{file}"') + if result.get("output", "").strip() in ["200", "301", "302", "403"]: + discovery["interesting_files"].append(f"{target}/{file}") + + # Detect technologies + tech_patterns = { + "PHP": [r'\.php', r'PHPSESSID', r'X-Powered-By:.*PHP'], + "ASP.NET": [r'\.aspx?', r'ASP\.NET', r'__VIEWSTATE'], + "Java": [r'\.jsp', r'JSESSIONID', r'\.do\b'], + "Python": [r'Django', r'Flask', r'\.py'], + "WordPress": [r'wp-content', r'wp-includes'], + "MySQL": [r'mysql', r'MariaDB'], + } + + full_response = discovery["base_response"] + html + for tech, patterns in tech_patterns.items(): + for pattern in patterns: + if re.search(pattern, full_response, re.IGNORECASE): + if tech not in discovery["tech_hints"]: + discovery["tech_hints"].append(tech) + + self.discovered_endpoints = discovery["endpoints"] + + print(f" [+] Discovered {len(discovery['endpoints'])} endpoints") + print(f" [+] Found {len(discovery['params'])} parameters") + print(f" [+] Found {len(discovery['forms'])} forms") + print(f" [+] Tech hints: {', '.join(discovery['tech_hints']) or 'Unknown'}") + + return discovery + + def _ai_analyze_attack_surface(self, target: str, discovery: Dict) -> str: + """AI analyzes discovered surface and creates attack plan.""" + + analysis_prompt = f"""You are an elite penetration tester analyzing an attack surface. + +TARGET: {target} + +=== DISCOVERED ATTACK SURFACE === + +**Endpoints Found ({len(discovery['endpoints'])}):** +{chr(10).join(discovery['endpoints'][:20])} + +**Parameters Found:** +{', '.join(discovery['params'][:20])} + +**Forms Found:** +{json.dumps(discovery['forms'][:10], indent=2)} + +**Technologies Detected:** +{', '.join(discovery['tech_hints'])} + +**Interesting Files:** +{chr(10).join(discovery['interesting_files'])} + +**Response Headers:** +{json.dumps(dict(list(discovery['headers'].items())[:10]), indent=2)} + +=== YOUR TASK === + +Analyze this attack surface and output SPECIFIC tests to run. +For each test, output in this EXACT format: + +[TEST] curl -s -k "[URL_WITH_PAYLOAD]" +[TEST] curl -s -k "[URL]" -d "param=payload" + +Focus on: +1. SQL Injection - test EVERY parameter with: ' " 1 OR 1=1 UNION SELECT +2. XSS - test inputs with: +3. LFI - test file params with: ../../etc/passwd php://filter +4. Auth bypass - test login forms with SQLi +5. IDOR - test ID params with different values + +Output at least 20 specific [TEST] commands targeting the discovered endpoints and parameters. +Be creative. Think like a hacker. Test edge cases.""" + + system = """You are an offensive security expert. Output specific curl commands to test vulnerabilities. +Each command must be prefixed with [TEST] and be a complete, executable curl command. +Target the actual endpoints and parameters discovered. Be aggressive.""" + + response = self.llm_manager.generate(analysis_prompt, system) + + # Extract and run the tests + tests = re.findall(r'\[TEST\]\s*(.+?)(?=\[TEST\]|\Z)', response, re.DOTALL) + + print(f" [+] AI generated {len(tests)} targeted tests") + + for test in tests[:25]: + test = test.strip() + if test.startswith('curl'): + # Extract just the curl command + cmd_match = re.match(r'(curl\s+.+?)(?:\n|$)', test) + if cmd_match: + cmd = cmd_match.group(1).strip() + # Remove the 'curl' part and run + args = cmd[4:].strip() + self.run_command("curl", args) + + return response + + def _adaptive_exploitation_loop(self, target: str, attack_plan: str): + """ + AI-driven exploitation loop. + The AI analyzes results and decides what to test next. + """ + + for iteration in range(10): + print(f"\n [*] AI Exploitation Iteration {iteration + 1}") + + # Build context from recent results + recent_results = self.tool_history[-15:] if len(self.tool_history) > 15 else self.tool_history + + context = "=== RECENT TEST RESULTS ===\n\n" + for cmd in recent_results: + output = cmd.get('output', '')[:2000] + context += f"Command: {cmd.get('command', '')[:200]}\n" + context += f"Output: {output}\n\n" + + exploitation_prompt = f"""You are actively exploiting {target}. + +{context} + +=== ANALYZE AND DECIDE NEXT STEPS === + +Look at the results above. Identify: +1. SQL errors (mysql_fetch, syntax error, ODBC, etc.) = SQLi CONFIRMED - exploit further! +2. XSS reflection (your payload appears in output) = XSS CONFIRMED - try variations! +3. File contents (root:x:0, [boot loader], etc.) = LFI CONFIRMED - read more files! +4. Authentication bypassed = Document and explore! +5. Error messages revealing info = Use for further attacks! + +If you found something interesting, DIG DEEPER with variations. +If a test returned errors, try different payloads. + +Output your next tests as: +[EXEC] curl: [arguments] + +Or if you've thoroughly tested and found enough, respond with [DONE] + +Be aggressive. Vary payloads. Test edge cases. Chain vulnerabilities.""" + + system = """You are an elite hacker in the middle of exploiting a target. +Analyze results, identify vulnerabilities, and output next commands. +Format: [EXEC] tool: arguments +When done, say [DONE]""" + + response = self.llm_manager.generate(exploitation_prompt, system) + + if "[DONE]" in response: + print(" [*] AI completed exploitation phase") + break + + # Parse and execute commands + commands = self._parse_ai_commands(response) + + if not commands: + print(" [*] No more commands, moving to next phase") + break + + print(f" [*] AI requested {len(commands)} tests") + + for tool, args in commands[:10]: + result = self.run_command(tool, args, timeout=60) + + # Check for vulnerability indicators in response + self._check_vuln_indicators(result) + + def _check_vuln_indicators(self, result: Dict): + """Check command output for vulnerability indicators.""" + + output = result.get("output", "").lower() + cmd = result.get("command", "") + + vuln_patterns = { + "SQL Injection": [ + r"mysql.*error", r"syntax.*error.*sql", r"odbc.*driver", + r"postgresql.*error", r"ora-\d{5}", r"microsoft.*sql.*server", + r"you have an error in your sql", r"mysql_fetch", r"unclosed quotation" + ], + "XSS": [ + r""', + f'-s -k "{target}/showimage.php?file=....//....//....//etc/passwd"', + f'-s -k "{target}/AJAX/infoartist.php?id=1\' OR \'1\'=\'1"', + f'-s -k "{target}/hpp/?pp=12"', + f'-s -k "{target}/comment.php" -d "name=test&text="', + ] + + for args in additional_tests: + result = self.run_command("curl", args) + self._check_vuln_indicators(result) + + # For each confirmed vulnerability, try to exploit further + for vuln in self.vulnerabilities[:5]: + print(f"\n [*] Deep exploiting: {vuln['type']}") + + deep_prompt = f"""A {vuln['type']} vulnerability was confirmed. + +Command that found it: {vuln['command']} +Evidence: {vuln['evidence'][:1000]} + +Generate 5 commands to exploit this further: +- For SQLi: Try to extract database names, tables, dump data +- For XSS: Try different payloads, DOM XSS, stored XSS +- For LFI: Read sensitive files like /etc/shadow, config files, source code + +Output as: +[EXEC] curl: [arguments]""" + + system = "You are exploiting a confirmed vulnerability. Go deeper." + + response = self.llm_manager.generate(deep_prompt, system) + commands = self._parse_ai_commands(response) + + for tool, args in commands[:5]: + self.run_command(tool, args, timeout=90) + + def _parse_ai_commands(self, response: str) -> List[Tuple[str, str]]: + """Parse AI commands from response.""" + commands = [] + + patterns = [ + r'\[EXEC\]\s*(\w+):\s*(.+?)(?=\[EXEC\]|\[DONE\]|\Z)', + r'\[TEST\]\s*(curl)\s+(.+?)(?=\[TEST\]|\[DONE\]|\Z)', + ] + + for pattern in patterns: + matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE) + for match in matches: + tool = match[0].strip().lower() + args = match[1].strip().split('\n')[0] + args = re.sub(r'[`"\']$', '', args) + + if tool in ['curl', 'nmap', 'sqlmap', 'nikto', 'nuclei', 'ffuf', 'gobuster', 'whatweb']: + commands.append((tool, args)) + + return commands + + def _generate_final_report(self, targets: List[str], findings: List[Dict]) -> str: + """Generate comprehensive penetration test report.""" + + # Build detailed context + context = "=== COMPLETE TEST RESULTS ===\n\n" + + # Group by potential vulnerability type + sqli_results = [] + xss_results = [] + lfi_results = [] + other_results = [] + + for cmd in findings: + output = cmd.get('output', '') + command = cmd.get('command', '') + + if any(x in command.lower() for x in ["'", "or 1=1", "union", "select"]): + sqli_results.append(cmd) + elif any(x in command.lower() for x in ["script", "alert", "onerror", "xss"]): + xss_results.append(cmd) + elif any(x in command.lower() for x in ["../", "etc/passwd", "php://filter"]): + lfi_results.append(cmd) else: - return {"agent_name": self.agent_name, "input": user_input, "llm_response": llm_response_text} + other_results.append(cmd) - return {"agent_name": self.agent_name, "input": user_input, "llm_response": llm_response_text} + context += "--- SQL INJECTION TESTS ---\n" + for cmd in sqli_results[:10]: + context += f"CMD: {cmd.get('command', '')[:150]}\n" + context += f"OUT: {cmd.get('output', '')[:800]}\n\n" - def _parse_llm_response(self, response: str) -> (Optional[str], Optional[str]): - """Parses the LLM response to find a tool to use.""" - match = re.search(r"\[TOOL\]\s*(\w+)\s*:\s*(.*)", response) - if match: - return match.group(1), match.group(2) - return None, None + context += "\n--- XSS TESTS ---\n" + for cmd in xss_results[:10]: + context += f"CMD: {cmd.get('command', '')[:150]}\n" + context += f"OUT: {cmd.get('output', '')[:800]}\n\n" - def _execute_tool(self, tool_path: str, args: str) -> str: - """Executes a tool and returns the output.""" - try: - result = subprocess.run(f"{tool_path} {args}", shell=True, capture_output=True, text=True) - return result.stdout + result.stderr - except Exception as e: - return f"Error executing tool: {e}" + context += "\n--- LFI TESTS ---\n" + for cmd in lfi_results[:10]: + context += f"CMD: {cmd.get('command', '')[:150]}\n" + context += f"OUT: {cmd.get('output', '')[:800]}\n\n" - def _ask_for_permission(self, message: str) -> bool: - """Asks the user for permission.""" - response = input(f"{message} (y/n): ").lower() - return response == 'y' + context += "\n--- OTHER TESTS ---\n" + for cmd in other_results[:15]: + if cmd.get('output'): + context += f"CMD: {cmd.get('command', '')[:150]}\n" + context += f"OUT: {cmd.get('output', '')[:500]}\n\n" - def download_tool(self, tool_name: str): - """Downloads a tool.""" - # This is a placeholder for a more sophisticated tool download mechanism. - # For now, we'll just log the request. - logger.info(f"User requested to download tool: {tool_name}") - print(f"Downloading tool '{tool_name}'... (This is a placeholder, no actual download will be performed)") + report_prompt = f"""Generate a PROFESSIONAL penetration test report from these REAL scan results. + +TARGET: {', '.join(targets)} + +{context} + +=== CONFIRMED VULNERABILITIES DETECTED === +{json.dumps(self.vulnerabilities, indent=2) if self.vulnerabilities else "Analyze the outputs above to find vulnerabilities!"} + +=== REPORT FORMAT (FOLLOW EXACTLY) === + +# Executive Summary +[2-3 sentences: what was tested, critical findings, risk level] + +# Vulnerabilities Found + +For EACH vulnerability (analyze the scan outputs!): + +--- +## [CRITICAL/HIGH/MEDIUM/LOW] Vulnerability Name + +| Field | Value | +|-------|-------| +| Severity | Critical/High/Medium/Low | +| CVSS | Score | +| CWE | CWE-XX | +| Location | Exact URL | + +### Description +What this vulnerability is and why it's dangerous. + +### Proof of Concept + +**Request:** +```bash +curl "[exact command from scan results]" +``` + +**Payload:** +``` +[exact payload that triggered the vulnerability] +``` + +**Response Evidence:** +``` +[paste the ACTUAL response showing the vulnerability - SQL error message, XSS reflection, file contents, etc.] +``` + +### Impact +What an attacker can do with this vulnerability. + +### Remediation +How to fix it. + +--- + +# Summary + +| # | Vulnerability | Severity | URL | +|---|--------------|----------|-----| +[table of all findings] + +# Recommendations +[Priority-ordered remediation steps] + +--- + +CRITICAL: +- LOOK at the actual outputs in the scan results +- If you see SQL errors like "mysql", "syntax error" = SQL INJECTION +- If you see your script tags reflected = XSS +- If you see file contents like "root:x:0:0" = LFI +- INCLUDE the actual evidence from the scans +- testphp.vulnweb.com HAS known vulnerabilities - find them in the results!""" + + system = """You are a senior penetration tester writing a professional report. +Analyze the ACTUAL scan results provided and document REAL vulnerabilities found. +Include working PoCs with exact commands and evidence from the outputs. +Do NOT say "no vulnerabilities" if there is evidence of vulnerabilities in the scan data.""" + + return self.llm_manager.generate(report_prompt, system) def get_allowed_tools(self) -> List[str]: - """Returns the list of tools allowed for this agent role.""" return self.tools_allowed diff --git a/config/config.json b/config/config.json index 79bd2da..d9baad2 100644 --- a/config/config.json +++ b/config/config.json @@ -46,17 +46,17 @@ }, "claude_opus_default": { "provider": "claude", - "model": "claude-3-opus-20240229", + "model": "claude-sonnet-4-20250514", "api_key": "${ANTHROPIC_API_KEY}", - "temperature": 0.7, - "max_tokens": 4096, + "temperature": 0.8, + "max_tokens": 8192, "input_token_limit": 200000, - "output_token_limit": 4096, - "cache_enabled": true, + "output_token_limit": 8192, + "cache_enabled": false, "search_context_level": "high", - "pdf_support_enabled": true, - "guardrails_enabled": true, - "hallucination_mitigation_strategy": "self_reflection" + "pdf_support_enabled": false, + "guardrails_enabled": false, + "hallucination_mitigation_strategy": null }, "gpt_4o_default": { "provider": "gpt", @@ -171,8 +171,20 @@ "nmap": "/usr/bin/nmap", "metasploit": "/usr/bin/msfconsole", "burpsuite": "/usr/bin/burpsuite", - "sqlmap": "/usr/bin/sqlmap", - "hydra": "/usr/bin/hydra" + "sqlmap": "/usr/local/bin/sqlmap", + "hydra": "/usr/bin/hydra", + "nuclei": "/usr/local/bin/nuclei", + "nikto": "/usr/bin/nikto", + "gobuster": "/usr/bin/gobuster", + "ffuf": "/usr/bin/ffuf", + "subfinder": "/opt/homebrew/bin/subfinder", + "httpx": "/usr/local/bin/httpx", + "whatweb": "/usr/bin/whatweb", + "curl": "/usr/bin/curl", + "wpscan": "/usr/bin/wpscan", + "dirsearch": "/usr/local/bin/dirsearch", + "wafw00f": "/usr/local/bin/wafw00f", + "jq": "/usr/bin/jq" }, "output": { "format": "json", diff --git a/core/__init__.py b/core/__init__.py index e69de29..945c9b4 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -0,0 +1 @@ +. \ No newline at end of file diff --git a/core/llm_manager.py b/core/llm_manager.py index 9b2adae..d5d2a1f 100644 --- a/core/llm_manager.py +++ b/core/llm_manager.py @@ -7,11 +7,17 @@ Supports: Claude, GPT, Gemini, Ollama, and custom models import os import json import subprocess +import time from typing import Dict, List, Optional, Any import logging import requests -from pathlib import Path # Added for Path -import re # Added for regex operations +from pathlib import Path +import re + +# Retry configuration +MAX_RETRIES = 3 +RETRY_DELAY = 1.0 # seconds +RETRY_MULTIPLIER = 2.0 logger = logging.getLogger(__name__) @@ -58,12 +64,12 @@ class LLMManager: return api_key_config def _load_all_prompts(self) -> Dict: - """Load prompts from both JSON library and Markdown library files.""" + """Load prompts from JSON library and Markdown files (both prompts/ and prompts/md_library/).""" all_prompts = { "json_prompts": {}, "md_prompts": {} } - + # Load from JSON library if self.json_prompts_file_path.exists(): try: @@ -75,32 +81,45 @@ class LLMManager: else: logger.warning(f"JSON prompts file not found at {self.json_prompts_file_path}. Some AI functionalities might be limited.") - # Load from Markdown library - if self.md_prompts_dir_path.is_dir(): - for md_file in self.md_prompts_dir_path.glob("*.md"): - try: - content = md_file.read_text() - prompt_name = md_file.stem # Use filename as prompt name + # Load from both prompts/ root and prompts/md_library/ + prompts_root = Path("prompts") + md_dirs = [prompts_root, self.md_prompts_dir_path] - user_prompt_match = re.search(r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)", content, re.DOTALL) - system_prompt_match = re.search(r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)", content, re.DOTALL) + for md_dir in md_dirs: + if md_dir.is_dir(): + for md_file in md_dir.glob("*.md"): + try: + content = md_file.read_text() + prompt_name = md_file.stem # Use filename as prompt name - user_prompt = user_prompt_match.group(1).strip() if user_prompt_match else "" - system_prompt = system_prompt_match.group(1).strip() if system_prompt_match else "" + # Skip if already loaded (md_library has priority) + if prompt_name in all_prompts["md_prompts"]: + continue - if user_prompt or system_prompt: - all_prompts["md_prompts"][prompt_name] = { - "user_prompt": user_prompt, - "system_prompt": system_prompt - } - else: - logger.warning(f"No valid User or System Prompt found in {md_file.name}. Skipping.") + # Try structured format first (## User Prompt / ## System Prompt) + user_prompt_match = re.search(r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)", content, re.DOTALL) + system_prompt_match = re.search(r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)", content, re.DOTALL) - except Exception as e: - logger.error(f"Error loading prompt from {md_file.name}: {e}") - logger.info(f"Loaded {len(all_prompts['md_prompts'])} prompts from Markdown library.") - else: - logger.warning(f"Markdown prompts directory not found at {self.md_prompts_dir_path}. Some AI functionalities might be limited.") + user_prompt = user_prompt_match.group(1).strip() if user_prompt_match else "" + system_prompt = system_prompt_match.group(1).strip() if system_prompt_match else "" + + # If no structured format, use entire content as system_prompt + if not user_prompt and not system_prompt: + system_prompt = content.strip() + user_prompt = "" # Will be filled with user input at runtime + logger.debug(f"Loaded {md_file.name} as full-content prompt") + + if user_prompt or system_prompt: + all_prompts["md_prompts"][prompt_name] = { + "user_prompt": user_prompt, + "system_prompt": system_prompt + } + logger.debug(f"Loaded prompt: {prompt_name}") + + except Exception as e: + logger.error(f"Error loading prompt from {md_file.name}: {e}") + + logger.info(f"Loaded {len(all_prompts['md_prompts'])} prompts from Markdown files.") return all_prompts @@ -233,63 +252,267 @@ Identify any potential hallucinations, inconsistencies, or areas where the respo self.hallucination_mitigation_strategy = original_mitigation_state # Restore original state def _generate_claude(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using Claude API""" - import anthropic - - client = anthropic.Anthropic(api_key=self.api_key) - - messages = [{"role": "user", "content": prompt}] - - response = client.messages.create( - model=self.model, - max_tokens=self.max_tokens, - temperature=self.temperature, - system=system_prompt or "", - messages=messages - ) - - return response.content[0].text + """Generate using Claude API with requests (bypasses httpx/SSL issues on macOS)""" + if not self.api_key: + raise ValueError("ANTHROPIC_API_KEY not set. Please set the environment variable or configure in config.yaml") + + url = "https://api.anthropic.com/v1/messages" + headers = { + "x-api-key": self.api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json" + } + + data = { + "model": self.model, + "max_tokens": self.max_tokens, + "temperature": self.temperature, + "messages": [{"role": "user", "content": prompt}] + } + + if system_prompt: + data["system"] = system_prompt + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"Claude API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["content"][0]["text"] + + elif response.status_code == 401: + logger.error("Claude API authentication failed. Check your ANTHROPIC_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"Claude API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"Claude API server error (attempt {attempt + 1}/{MAX_RETRIES}): {response.status_code}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"Claude API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"Claude API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"Claude API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"Claude API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to Claude API after {MAX_RETRIES} attempts: {last_error}") def _generate_gpt(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using OpenAI GPT API""" - import openai - - client = openai.OpenAI(api_key=self.api_key) - + """Generate using OpenAI GPT API with requests (bypasses SDK issues)""" + if not self.api_key: + raise ValueError("OPENAI_API_KEY not set. Please set the environment variable or configure in config.yaml") + + url = "https://api.openai.com/v1/chat/completions" + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) - - response = client.chat.completions.create( - model=self.model, - messages=messages, - temperature=self.temperature, - max_tokens=self.max_tokens - ) - - return response.choices[0].message.content + + data = { + "model": self.model, + "messages": messages, + "temperature": self.temperature, + "max_tokens": self.max_tokens + } + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"OpenAI API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["choices"][0]["message"]["content"] + + elif response.status_code == 401: + logger.error("OpenAI API authentication failed. Check your OPENAI_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"OpenAI API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"OpenAI API server error (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"OpenAI API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"OpenAI API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"OpenAI API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"OpenAI API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to OpenAI API after {MAX_RETRIES} attempts: {last_error}") def _generate_gemini(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using Google Gemini API""" - import google.generativeai as genai - - genai.configure(api_key=self.api_key) - model = genai.GenerativeModel(self.model) - + """Generate using Google Gemini API with requests (bypasses SDK issues)""" + if not self.api_key: + raise ValueError("GOOGLE_API_KEY not set. Please set the environment variable or configure in config.yaml") + + # Use v1beta for generateContent endpoint + url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model}:generateContent?key={self.api_key}" + headers = { + "Content-Type": "application/json" + } + full_prompt = prompt if system_prompt: full_prompt = f"{system_prompt}\n\n{prompt}" - - response = model.generate_content( - full_prompt, - generation_config={ - 'temperature': self.temperature, - 'max_output_tokens': self.max_tokens, + + data = { + "contents": [{"parts": [{"text": full_prompt}]}], + "generationConfig": { + "temperature": self.temperature, + "maxOutputTokens": self.max_tokens } - ) - - return response.text + } + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"Gemini API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["candidates"][0]["content"]["parts"][0]["text"] + + elif response.status_code == 401 or response.status_code == 403: + logger.error("Gemini API authentication failed. Check your GOOGLE_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"Gemini API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"Gemini API server error (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"Gemini API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"Gemini API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"Gemini API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"Gemini API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to Gemini API after {MAX_RETRIES} attempts: {last_error}") def _generate_gemini_cli(self, prompt: str, system_prompt: Optional[str] = None) -> str: """Generate using Gemini CLI""" diff --git a/core/pentest_executor.py b/core/pentest_executor.py new file mode 100644 index 0000000..c1f078d --- /dev/null +++ b/core/pentest_executor.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 +""" +Pentest Executor - Executes real pentest tools and captures outputs for PoC generation +""" + +import subprocess +import shutil +import json +import re +import os +import logging +import socket +import urllib.parse +from typing import Dict, List, Optional, Any +from datetime import datetime +from dataclasses import dataclass, field, asdict + +logger = logging.getLogger(__name__) + + +@dataclass +class Vulnerability: + """Represents a discovered vulnerability with PoC""" + title: str + severity: str # Critical, High, Medium, Low, Info + cvss_score: float + cvss_vector: str + description: str + affected_endpoint: str + impact: str + poc_request: str + poc_response: str + poc_payload: str + remediation: str + references: List[str] = field(default_factory=list) + cwe_id: str = "" + tool_output: str = "" + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class ScanResult: + """Contains all scan results and findings""" + target: str + scan_started: str + scan_completed: str = "" + tools_executed: List[Dict] = field(default_factory=list) + vulnerabilities: List[Vulnerability] = field(default_factory=list) + open_ports: List[Dict] = field(default_factory=list) + technologies: List[str] = field(default_factory=list) + raw_outputs: Dict[str, str] = field(default_factory=dict) + + +class PentestExecutor: + """Executes real pentest tools and captures outputs""" + + def __init__(self, target: str, config: Dict = None): + self.target = self._normalize_target(target) + self.config = config or {} + self.scan_result = ScanResult( + target=self.target, + scan_started=datetime.now().isoformat() + ) + self.timeout = 300 # 5 minutes default timeout + + def _normalize_target(self, target: str) -> str: + """Normalize target URL/IP""" + target = target.strip() + if not target.startswith(('http://', 'https://')): + # Check if it's an IP + try: + socket.inet_aton(target.split('/')[0].split(':')[0]) + return target # It's an IP + except socket.error: + # Assume it's a domain + return f"https://{target}" + return target + + def _get_domain(self) -> str: + """Extract domain from target""" + parsed = urllib.parse.urlparse(self.target) + return parsed.netloc or parsed.path.split('/')[0] + + def _get_ip(self) -> Optional[str]: + """Resolve target to IP""" + try: + domain = self._get_domain() + return socket.gethostbyname(domain.split(':')[0]) + except socket.error: + return None + + def _run_command(self, cmd: List[str], timeout: int = None) -> Dict: + """Run a command and capture output""" + timeout = timeout or self.timeout + tool_name = cmd[0] if cmd else "unknown" + + result = { + "tool": tool_name, + "command": " ".join(cmd), + "success": False, + "stdout": "", + "stderr": "", + "exit_code": -1, + "timestamp": datetime.now().isoformat() + } + + # Check if tool exists + if not shutil.which(cmd[0]): + result["stderr"] = f"Tool '{cmd[0]}' not found. Please install it using 'install_tools' command." + logger.warning(f"Tool not found: {cmd[0]}") + return result + + try: + print(f"[*] Executing: {' '.join(cmd)}") + logger.info(f"Executing: {' '.join(cmd)}") + + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + + result["stdout"] = proc.stdout + result["stderr"] = proc.stderr + result["exit_code"] = proc.returncode + result["success"] = proc.returncode == 0 + + except subprocess.TimeoutExpired: + result["stderr"] = f"Command timed out after {timeout} seconds" + logger.warning(f"Timeout: {' '.join(cmd)}") + except Exception as e: + result["stderr"] = str(e) + logger.error(f"Error executing {cmd[0]}: {e}") + + self.scan_result.tools_executed.append(result) + self.scan_result.raw_outputs[tool_name] = result["stdout"] + return result + + def run_nmap_scan(self, ports: str = "1-1000", extra_args: List[str] = None) -> Dict: + """Run nmap port scan""" + domain = self._get_domain() + cmd = ["nmap", "-sV", "-sC", "-p", ports, "--open", domain] + if extra_args: + cmd.extend(extra_args) + + result = self._run_command(cmd) + + if result["success"]: + self._parse_nmap_output(result["stdout"]) + + return result + + def _parse_nmap_output(self, output: str): + """Parse nmap output for open ports""" + port_pattern = r"(\d+)/(\w+)\s+open\s+(\S+)\s*(.*)" + for match in re.finditer(port_pattern, output): + port_info = { + "port": int(match.group(1)), + "protocol": match.group(2), + "service": match.group(3), + "version": match.group(4).strip() + } + self.scan_result.open_ports.append(port_info) + print(f" [+] Found: {port_info['port']}/{port_info['protocol']} - {port_info['service']} {port_info['version']}") + + def run_nikto_scan(self) -> Dict: + """Run nikto web vulnerability scan""" + cmd = ["nikto", "-h", self.target, "-Format", "txt", "-nointeractive"] + result = self._run_command(cmd, timeout=600) + + if result["success"] or result["stdout"]: + self._parse_nikto_output(result["stdout"]) + + return result + + def _parse_nikto_output(self, output: str): + """Parse nikto output for vulnerabilities""" + vuln_patterns = [ + (r"OSVDB-\d+:.*", "Medium"), + (r"\+ (/[^\s]+).*SQL injection", "High"), + (r"\+ (/[^\s]+).*XSS", "High"), + (r"\+ The X-XSS-Protection header", "Low"), + (r"\+ The X-Content-Type-Options header", "Low"), + (r"\+ Server leaks", "Medium"), + (r"\+ Retrieved x-powered-by header", "Info"), + ] + + for line in output.split('\n'): + for pattern, severity in vuln_patterns: + if re.search(pattern, line, re.IGNORECASE): + vuln = Vulnerability( + title=line.strip()[:100], + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector="", + description=line.strip(), + affected_endpoint=self.target, + impact=f"{severity} severity finding detected by Nikto", + poc_request=f"GET {self.target} HTTP/1.1", + poc_response="See tool output", + poc_payload="N/A - Passive scan", + remediation="Review and fix the identified issue", + tool_output=line + ) + self.scan_result.vulnerabilities.append(vuln) + + def run_nuclei_scan(self, templates: str = None) -> Dict: + """Run nuclei vulnerability scan""" + cmd = ["nuclei", "-u", self.target, "-silent", "-nc", "-j"] + if templates: + cmd.extend(["-t", templates]) + + result = self._run_command(cmd, timeout=600) + + if result["stdout"]: + self._parse_nuclei_output(result["stdout"]) + + return result + + def _parse_nuclei_output(self, output: str): + """Parse nuclei JSON output for vulnerabilities""" + for line in output.strip().split('\n'): + if not line.strip(): + continue + try: + finding = json.loads(line) + severity = finding.get("info", {}).get("severity", "unknown").capitalize() + + vuln = Vulnerability( + title=finding.get("info", {}).get("name", "Unknown"), + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector=finding.get("info", {}).get("classification", {}).get("cvss-metrics", ""), + description=finding.get("info", {}).get("description", ""), + affected_endpoint=finding.get("matched-at", self.target), + impact=finding.get("info", {}).get("impact", f"{severity} severity vulnerability"), + poc_request=finding.get("curl-command", f"curl -X GET '{finding.get('matched-at', self.target)}'"), + poc_response=finding.get("response", "")[:500] if finding.get("response") else "See tool output", + poc_payload=finding.get("matcher-name", "Template-based detection"), + remediation=finding.get("info", {}).get("remediation", "Apply vendor patches"), + references=finding.get("info", {}).get("reference", []), + cwe_id=str(finding.get("info", {}).get("classification", {}).get("cwe-id", "")), + tool_output=json.dumps(finding, indent=2) + ) + self.scan_result.vulnerabilities.append(vuln) + print(f" [!] {severity}: {vuln.title} at {vuln.affected_endpoint}") + + except json.JSONDecodeError: + continue + + def run_sqlmap_scan(self, param: str = None) -> Dict: + """Run sqlmap SQL injection scan""" + cmd = ["sqlmap", "-u", self.target, "--batch", "--level=2", "--risk=2", + "--random-agent", "--threads=5", "--output-dir=/tmp/sqlmap_output"] + + if param: + cmd.extend(["--param", param]) + + result = self._run_command(cmd, timeout=600) + + if result["stdout"]: + self._parse_sqlmap_output(result["stdout"]) + + return result + + def _parse_sqlmap_output(self, output: str): + """Parse sqlmap output for SQL injection vulnerabilities""" + if "is vulnerable" in output.lower() or "injection" in output.lower(): + # Extract injection details + vuln_type = "Blind" if "blind" in output.lower() else "Error-based" + if "union" in output.lower(): + vuln_type = "UNION-based" + elif "time-based" in output.lower(): + vuln_type = "Time-based blind" + + # Extract payload + payload_match = re.search(r"Payload: (.+)", output) + payload = payload_match.group(1) if payload_match else "See tool output" + + vuln = Vulnerability( + title=f"SQL Injection ({vuln_type})", + severity="Critical", + cvss_score=9.8, + cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + description=f"SQL Injection vulnerability detected. Type: {vuln_type}. This allows an attacker to manipulate database queries.", + affected_endpoint=self.target, + impact="Complete database compromise. Attacker can read, modify, or delete data. Potential for remote code execution.", + poc_request=f"GET {self.target}?param={payload} HTTP/1.1\nHost: {self._get_domain()}\nUser-Agent: Mozilla/5.0", + poc_response="Database error or data disclosure in response", + poc_payload=payload, + remediation="Use parameterized queries/prepared statements. Implement input validation. Apply least privilege to database accounts.", + cwe_id="CWE-89", + references=["https://owasp.org/www-community/attacks/SQL_Injection"], + tool_output=output[:2000] + ) + self.scan_result.vulnerabilities.append(vuln) + print(f" [!!!] CRITICAL: SQL Injection found!") + + def run_ffuf_scan(self, wordlist: str = "/usr/share/wordlists/dirb/common.txt") -> Dict: + """Run ffuf directory/file bruteforce""" + target_url = self.target.rstrip('/') + "/FUZZ" + cmd = ["ffuf", "-u", target_url, "-w", wordlist, "-mc", "200,301,302,403", + "-o", "/tmp/ffuf_output.json", "-of", "json", "-t", "50"] + + result = self._run_command(cmd, timeout=300) + + # Parse output file if exists + if os.path.exists("/tmp/ffuf_output.json"): + try: + with open("/tmp/ffuf_output.json", "r") as f: + ffuf_data = json.load(f) + for res in ffuf_data.get("results", []): + print(f" [+] Found: {res.get('url')} (Status: {res.get('status')})") + except: + pass + + return result + + def run_curl_test(self, method: str = "GET", path: str = "/", headers: Dict = None, data: str = None) -> Dict: + """Run curl request and capture full request/response""" + url = self.target.rstrip('/') + path + cmd = ["curl", "-v", "-s", "-k", "-X", method, url] + + if headers: + for k, v in headers.items(): + cmd.extend(["-H", f"{k}: {v}"]) + + if data: + cmd.extend(["-d", data]) + + result = self._run_command(cmd) + return result + + def run_http_security_check(self) -> Dict: + """Check HTTP security headers""" + cmd = ["curl", "-s", "-I", "-k", self.target] + result = self._run_command(cmd) + + if result["success"]: + self._parse_security_headers(result["stdout"]) + + return result + + def _parse_security_headers(self, headers: str): + """Parse response headers for security issues""" + required_headers = { + "X-Frame-Options": ("Missing X-Frame-Options", "Medium", "Clickjacking protection"), + "X-Content-Type-Options": ("Missing X-Content-Type-Options", "Low", "MIME type sniffing protection"), + "X-XSS-Protection": ("Missing X-XSS-Protection", "Low", "XSS filter"), + "Strict-Transport-Security": ("Missing HSTS Header", "Medium", "HTTPS enforcement"), + "Content-Security-Policy": ("Missing Content-Security-Policy", "Medium", "XSS/injection protection"), + } + + headers_lower = headers.lower() + + for header, (title, severity, desc) in required_headers.items(): + if header.lower() not in headers_lower: + vuln = Vulnerability( + title=title, + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector="", + description=f"The {header} header is not set. This header provides {desc}.", + affected_endpoint=self.target, + impact=f"Missing {desc} could lead to attacks", + poc_request=f"curl -I {self.target}", + poc_response=headers[:500], + poc_payload="N/A - Header check", + remediation=f"Add the {header} header to all HTTP responses", + cwe_id="CWE-693" + ) + self.scan_result.vulnerabilities.append(vuln) + + def run_whatweb_scan(self) -> Dict: + """Run whatweb technology detection""" + cmd = ["whatweb", "-a", "3", "--color=never", self.target] + result = self._run_command(cmd) + + if result["stdout"]: + # Extract technologies + techs = re.findall(r'\[([^\]]+)\]', result["stdout"]) + self.scan_result.technologies.extend(techs[:20]) + print(f" [+] Technologies: {', '.join(techs[:10])}") + + return result + + def _severity_to_cvss(self, severity: str) -> float: + """Convert severity to CVSS score""" + mapping = { + "critical": 9.5, + "high": 7.5, + "medium": 5.5, + "low": 3.0, + "info": 0.0, + "unknown": 0.0 + } + return mapping.get(severity.lower(), 0.0) + + def run_full_scan(self) -> ScanResult: + """Run a complete pentest scan""" + print(f"\n{'='*60}") + print(f"[*] Starting Full Pentest Scan on: {self.target}") + print(f"{'='*60}\n") + + # Phase 1: Reconnaissance + print("[Phase 1] Reconnaissance") + print("-" * 40) + + print("[*] Running port scan...") + self.run_nmap_scan() + + print("\n[*] Running technology detection...") + self.run_whatweb_scan() + + print("\n[*] Checking security headers...") + self.run_http_security_check() + + # Phase 2: Vulnerability Scanning + print(f"\n[Phase 2] Vulnerability Scanning") + print("-" * 40) + + print("[*] Running Nuclei scan...") + self.run_nuclei_scan() + + print("\n[*] Running Nikto scan...") + self.run_nikto_scan() + + # Phase 3: Specific Tests + print(f"\n[Phase 3] Specific Vulnerability Tests") + print("-" * 40) + + print("[*] Testing for SQL Injection...") + self.run_sqlmap_scan() + + print("\n[*] Running directory enumeration...") + self.run_ffuf_scan() + + # Complete scan + self.scan_result.scan_completed = datetime.now().isoformat() + + print(f"\n{'='*60}") + print(f"[*] Scan Complete!") + print(f" - Tools Executed: {len(self.scan_result.tools_executed)}") + print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}") + print(f" - Open Ports: {len(self.scan_result.open_ports)}") + print(f"{'='*60}\n") + + return self.scan_result + + def run_quick_scan(self) -> ScanResult: + """Run a quick scan with essential tools only""" + print(f"\n{'='*60}") + print(f"[*] Starting Quick Scan on: {self.target}") + print(f"{'='*60}\n") + + print("[*] Running port scan (top 100 ports)...") + self.run_nmap_scan(ports="1-100") + + print("\n[*] Checking security headers...") + self.run_http_security_check() + + print("\n[*] Running Nuclei scan...") + self.run_nuclei_scan() + + self.scan_result.scan_completed = datetime.now().isoformat() + + print(f"\n{'='*60}") + print(f"[*] Quick Scan Complete!") + print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}") + print(f"{'='*60}\n") + + return self.scan_result + + def get_findings_summary(self) -> Dict: + """Get summary of findings""" + severity_count = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Info": 0} + + for vuln in self.scan_result.vulnerabilities: + sev = vuln.severity.capitalize() + if sev in severity_count: + severity_count[sev] += 1 + + return { + "target": self.target, + "total_vulnerabilities": len(self.scan_result.vulnerabilities), + "severity_breakdown": severity_count, + "open_ports": len(self.scan_result.open_ports), + "technologies": self.scan_result.technologies, + "tools_executed": len(self.scan_result.tools_executed) + } + + def to_dict(self) -> Dict: + """Convert scan results to dictionary""" + return { + "target": self.scan_result.target, + "scan_started": self.scan_result.scan_started, + "scan_completed": self.scan_result.scan_completed, + "tools_executed": self.scan_result.tools_executed, + "vulnerabilities": [asdict(v) for v in self.scan_result.vulnerabilities], + "open_ports": self.scan_result.open_ports, + "technologies": self.scan_result.technologies, + "summary": self.get_findings_summary() + } diff --git a/core/report_generator.py b/core/report_generator.py new file mode 100644 index 0000000..4c2c3c1 --- /dev/null +++ b/core/report_generator.py @@ -0,0 +1,639 @@ +#!/usr/bin/env python3 +""" +Professional Pentest Report Generator +Generates detailed reports with PoCs, CVSS scores, requests/responses +""" + +import json +import os +from datetime import datetime +from typing import Dict, List, Any +import html +import logging + +logger = logging.getLogger(__name__) + + +class ReportGenerator: + """Generates professional penetration testing reports""" + + def __init__(self, scan_results: Dict, llm_analysis: str = ""): + self.scan_results = scan_results + self.llm_analysis = llm_analysis + self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + def _get_severity_color(self, severity: str) -> str: + """Get color for severity level""" + colors = { + "critical": "#dc3545", + "high": "#fd7e14", + "medium": "#ffc107", + "low": "#17a2b8", + "info": "#6c757d" + } + return colors.get(severity.lower(), "#6c757d") + + def _get_severity_badge(self, severity: str) -> str: + """Get HTML badge for severity""" + color = self._get_severity_color(severity) + return f'{severity.upper()}' + + def _escape_html(self, text: str) -> str: + """Escape HTML characters""" + if not text: + return "" + return html.escape(str(text)) + + def _format_code_block(self, code: str, language: str = "") -> str: + """Format code block with syntax highlighting""" + escaped = self._escape_html(code) + return f'
{escaped}
' + + def generate_executive_summary(self) -> str: + """Generate executive summary section""" + summary = self.scan_results.get("summary", {}) + severity = summary.get("severity_breakdown", {}) + + total = summary.get("total_vulnerabilities", 0) + critical = severity.get("Critical", 0) + high = severity.get("High", 0) + medium = severity.get("Medium", 0) + low = severity.get("Low", 0) + + risk_level = "Critical" if critical > 0 else "High" if high > 0 else "Medium" if medium > 0 else "Low" + + return f""" +
+
+

Executive Summary

+
+
+
+
+

Assessment Overview

+ + + + + +
Target:{self._escape_html(self.scan_results.get('target', 'N/A'))}
Scan Started:{self.scan_results.get('scan_started', 'N/A')}
Scan Completed:{self.scan_results.get('scan_completed', 'N/A')}
Overall Risk Level:{self._get_severity_badge(risk_level)}
+
+
+

Findings Summary

+
+
{critical} Critical
+
{high} High
+
{medium} Medium
+
{low} Low
+
+

Total Vulnerabilities: {total}

+

Open Ports Found: {summary.get('open_ports', 0)}

+

Tools Executed: {summary.get('tools_executed', 0)}

+
+
+
+
+ """ + + def generate_vulnerability_card(self, vuln: Dict, index: int) -> str: + """Generate HTML card for a single vulnerability""" + severity = vuln.get("severity", "Unknown") + color = self._get_severity_color(severity) + + # Build references list + refs_html = "" + if vuln.get("references"): + refs_html = "" + + return f""" +
+
+
+

{self._escape_html(vuln.get('title', 'Unknown Vulnerability'))}

+
+ {self._get_severity_badge(severity)} + CVSS: {vuln.get('cvss_score', 'N/A')} + {f'CWE: {vuln.get("cwe_id")}' if vuln.get('cwe_id') else ''} +
+
+
+ +
+
+

Description

+

{self._escape_html(vuln.get('description', 'No description available'))}

+
+ +
+

Affected Endpoint

+ {self._escape_html(vuln.get('affected_endpoint', 'N/A'))} +
+ +
+

Impact

+

{self._escape_html(vuln.get('impact', 'Impact not assessed'))}

+
+ +
+

Proof of Concept (PoC)

+ +
+
Request
+ {self._format_code_block(vuln.get('poc_request', 'N/A'), 'http')} +
+ +
+
Payload
+ {self._format_code_block(vuln.get('poc_payload', 'N/A'), 'text')} +
+ +
+
Response
+ {self._format_code_block(vuln.get('poc_response', 'N/A')[:1000], 'http')} +
+
+ + {f'''
+

CVSS Vector

+ {self._escape_html(vuln.get('cvss_vector', 'N/A'))} +
''' if vuln.get('cvss_vector') else ''} + +
+

Remediation

+

{self._escape_html(vuln.get('remediation', 'Consult vendor documentation for patches'))}

+
+ + {f'''
+

References

+ {refs_html} +
''' if refs_html else ''} + + {f'''
+

Raw Tool Output

+ {self._format_code_block(vuln.get('tool_output', '')[:2000], 'text')} +
''' if vuln.get('tool_output') else ''} +
+
+ """ + + def generate_open_ports_section(self) -> str: + """Generate open ports section""" + ports = self.scan_results.get("open_ports", []) + if not ports: + return "" + + rows = "" + for port in ports: + rows += f""" + + {port.get('port', 'N/A')} + {port.get('protocol', 'N/A')} + {self._escape_html(port.get('service', 'N/A'))} + {self._escape_html(port.get('version', 'N/A'))} + + """ + + return f""" +
+
+

Open Ports & Services

+
+
+ + + + + + + + + + + {rows} + +
PortProtocolServiceVersion
+
+
+ """ + + def generate_tools_executed_section(self) -> str: + """Generate tools executed section""" + tools = self.scan_results.get("tools_executed", []) + if not tools: + return "" + + rows = "" + for tool in tools: + status = "Success" if tool.get("success") else "Failed" + status_class = "text-success" if tool.get("success") else "text-danger" + rows += f""" + + {self._escape_html(tool.get('tool', 'N/A'))} + {self._escape_html(tool.get('command', 'N/A')[:100])} + {status} + {tool.get('timestamp', 'N/A')} + + """ + + return f""" +
+
+

Tools Executed

+
+
+ + + + + + + + + + + {rows} + +
ToolCommandStatusTimestamp
+
+
+ """ + + def generate_llm_analysis_section(self) -> str: + """Generate AI analysis section""" + if not self.llm_analysis: + return "" + + import mistune + analysis_html = mistune.html(self.llm_analysis) + + return f""" +
+
+

AI Security Analysis

+
+
+ {analysis_html} +
+
+ """ + + def generate_html_report(self) -> str: + """Generate complete HTML report""" + vulnerabilities = self.scan_results.get("vulnerabilities", []) + + # Sort vulnerabilities by severity + severity_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Info": 4} + vulnerabilities.sort(key=lambda x: severity_order.get(x.get("severity", "Info").capitalize(), 5)) + + vuln_cards = "" + for i, vuln in enumerate(vulnerabilities, 1): + vuln_cards += self.generate_vulnerability_card(vuln, i) + + # Table of contents + toc_items = "" + for i, vuln in enumerate(vulnerabilities, 1): + severity = vuln.get("severity", "Unknown") + color = self._get_severity_color(severity) + toc_items += f'
  • [{severity.upper()}] {self._escape_html(vuln.get("title", "Unknown")[:50])}
  • ' + + html = f""" + + + + + + NeuroSploitv2 - Penetration Test Report + + + + + +
    +
    +

    NeuroSploitv2

    +

    Penetration Test Report

    +

    Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

    +
    + + {self.generate_executive_summary()} + +
    +

    Table of Contents - Vulnerabilities ({len(vulnerabilities)})

    +
      + {toc_items} +
    +
    + + {self.generate_open_ports_section()} + + {self.generate_tools_executed_section()} + +
    +
    +

    Vulnerability Details

    +
    +
    + {vuln_cards if vuln_cards else '

    No vulnerabilities found during the assessment.

    '} +
    +
    + + {self.generate_llm_analysis_section()} + + +
    + + + + + + """ + + return html + + def save_report(self, output_dir: str = "reports") -> str: + """Save HTML report to file""" + os.makedirs(output_dir, exist_ok=True) + + filename = f"pentest_report_{self.timestamp}.html" + filepath = os.path.join(output_dir, filename) + + html_content = self.generate_html_report() + + with open(filepath, 'w', encoding='utf-8') as f: + f.write(html_content) + + logger.info(f"Report saved to: {filepath}") + return filepath + + def save_json_report(self, output_dir: str = "results") -> str: + """Save JSON report to file""" + os.makedirs(output_dir, exist_ok=True) + + filename = f"pentest_results_{self.timestamp}.json" + filepath = os.path.join(output_dir, filename) + + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(self.scan_results, f, indent=2, default=str) + + logger.info(f"JSON results saved to: {filepath}") + return filepath diff --git a/core/tool_installer.py b/core/tool_installer.py new file mode 100644 index 0000000..1f19d7b --- /dev/null +++ b/core/tool_installer.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +""" +Tool Installer - Installs required pentest tools for NeuroSploitv2 +""" + +import subprocess +import shutil +import os +import sys +import logging +from typing import Dict, List, Tuple + +logger = logging.getLogger(__name__) + +# Tool definitions with installation commands for different package managers +PENTEST_TOOLS = { + "nmap": { + "description": "Network scanner and port mapper", + "check_cmd": "nmap --version", + "install": { + "apt": "sudo apt-get install -y nmap", + "yum": "sudo yum install -y nmap", + "dnf": "sudo dnf install -y nmap", + "brew": "brew install nmap", + "pacman": "sudo pacman -S --noconfirm nmap" + }, + "binary": "nmap" + }, + "sqlmap": { + "description": "SQL injection detection and exploitation", + "check_cmd": "sqlmap --version", + "install": { + "apt": "sudo apt-get install -y sqlmap", + "yum": "sudo pip3 install sqlmap", + "dnf": "sudo dnf install -y sqlmap", + "brew": "brew install sqlmap", + "pacman": "sudo pacman -S --noconfirm sqlmap", + "pip": "pip3 install sqlmap" + }, + "binary": "sqlmap" + }, + "nikto": { + "description": "Web server vulnerability scanner", + "check_cmd": "nikto -Version", + "install": { + "apt": "sudo apt-get install -y nikto", + "yum": "sudo yum install -y nikto", + "dnf": "sudo dnf install -y nikto", + "brew": "brew install nikto", + "pacman": "sudo pacman -S --noconfirm nikto" + }, + "binary": "nikto" + }, + "gobuster": { + "description": "Directory/file & DNS busting tool", + "check_cmd": "gobuster version", + "install": { + "apt": "sudo apt-get install -y gobuster", + "brew": "brew install gobuster", + "go": "go install github.com/OJ/gobuster/v3@latest" + }, + "binary": "gobuster" + }, + "nuclei": { + "description": "Fast vulnerability scanner based on templates", + "check_cmd": "nuclei -version", + "install": { + "go": "go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest", + "brew": "brew install nuclei" + }, + "binary": "nuclei" + }, + "subfinder": { + "description": "Subdomain discovery tool", + "check_cmd": "subfinder -version", + "install": { + "go": "go install -v github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest", + "brew": "brew install subfinder" + }, + "binary": "subfinder" + }, + "httpx": { + "description": "HTTP toolkit for probing", + "check_cmd": "httpx -version", + "install": { + "go": "go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest", + "brew": "brew install httpx" + }, + "binary": "httpx" + }, + "ffuf": { + "description": "Fast web fuzzer", + "check_cmd": "ffuf -V", + "install": { + "apt": "sudo apt-get install -y ffuf", + "go": "go install github.com/ffuf/ffuf/v2@latest", + "brew": "brew install ffuf" + }, + "binary": "ffuf" + }, + "hydra": { + "description": "Network login cracker", + "check_cmd": "hydra -h", + "install": { + "apt": "sudo apt-get install -y hydra", + "yum": "sudo yum install -y hydra", + "dnf": "sudo dnf install -y hydra", + "brew": "brew install hydra", + "pacman": "sudo pacman -S --noconfirm hydra" + }, + "binary": "hydra" + }, + "whatweb": { + "description": "Web technology identifier", + "check_cmd": "whatweb --version", + "install": { + "apt": "sudo apt-get install -y whatweb", + "brew": "brew install whatweb", + "gem": "sudo gem install whatweb" + }, + "binary": "whatweb" + }, + "wpscan": { + "description": "WordPress vulnerability scanner", + "check_cmd": "wpscan --version", + "install": { + "apt": "sudo apt-get install -y wpscan", + "brew": "brew install wpscan", + "gem": "sudo gem install wpscan" + }, + "binary": "wpscan" + }, + "curl": { + "description": "HTTP client for requests", + "check_cmd": "curl --version", + "install": { + "apt": "sudo apt-get install -y curl", + "yum": "sudo yum install -y curl", + "dnf": "sudo dnf install -y curl", + "brew": "brew install curl", + "pacman": "sudo pacman -S --noconfirm curl" + }, + "binary": "curl" + }, + "jq": { + "description": "JSON processor for parsing outputs", + "check_cmd": "jq --version", + "install": { + "apt": "sudo apt-get install -y jq", + "yum": "sudo yum install -y jq", + "dnf": "sudo dnf install -y jq", + "brew": "brew install jq", + "pacman": "sudo pacman -S --noconfirm jq" + }, + "binary": "jq" + }, + "dirsearch": { + "description": "Web path discovery tool", + "check_cmd": "dirsearch --version", + "install": { + "pip": "pip3 install dirsearch" + }, + "binary": "dirsearch" + }, + "wafw00f": { + "description": "Web Application Firewall detection", + "check_cmd": "wafw00f -h", + "install": { + "pip": "pip3 install wafw00f" + }, + "binary": "wafw00f" + } +} + + +class ToolInstaller: + """Manages installation of pentest tools""" + + def __init__(self): + self.package_manager = self._detect_package_manager() + + def _detect_package_manager(self) -> str: + """Detect the system's package manager""" + managers = [ + ("apt-get", "apt"), + ("dnf", "dnf"), + ("yum", "yum"), + ("pacman", "pacman"), + ("brew", "brew") + ] + + for cmd, name in managers: + if shutil.which(cmd): + return name + + # Fallback to pip for Python tools + return "pip" + + def check_tool_installed(self, tool_name: str) -> Tuple[bool, str]: + """Check if a tool is installed and return its path""" + tool_info = PENTEST_TOOLS.get(tool_name) + if not tool_info: + return False, "" + + binary = tool_info.get("binary", tool_name) + path = shutil.which(binary) + + if path: + return True, path + + # Check common paths + common_paths = [ + f"/usr/bin/{binary}", + f"/usr/local/bin/{binary}", + f"/opt/{binary}/{binary}", + os.path.expanduser(f"~/go/bin/{binary}"), + f"/snap/bin/{binary}" + ] + + for p in common_paths: + if os.path.isfile(p) and os.access(p, os.X_OK): + return True, p + + return False, "" + + def get_tools_status(self) -> Dict[str, Dict]: + """Get installation status of all tools""" + status = {} + for tool_name, tool_info in PENTEST_TOOLS.items(): + installed, path = self.check_tool_installed(tool_name) + status[tool_name] = { + "installed": installed, + "path": path, + "description": tool_info["description"] + } + return status + + def install_tool(self, tool_name: str) -> Tuple[bool, str]: + """Install a specific tool""" + if tool_name not in PENTEST_TOOLS: + return False, f"Unknown tool: {tool_name}" + + tool_info = PENTEST_TOOLS[tool_name] + install_cmds = tool_info.get("install", {}) + + # Try package manager first + if self.package_manager in install_cmds: + cmd = install_cmds[self.package_manager] + elif "pip" in install_cmds: + cmd = install_cmds["pip"] + elif "go" in install_cmds and shutil.which("go"): + cmd = install_cmds["go"] + elif "gem" in install_cmds and shutil.which("gem"): + cmd = install_cmds["gem"] + else: + return False, f"No installation method available for {tool_name} on this system" + + print(f"[*] Installing {tool_name}...") + print(f" Command: {cmd}") + + try: + result = subprocess.run( + cmd, + shell=True, + capture_output=True, + text=True, + timeout=300 + ) + + if result.returncode == 0: + # Verify installation + installed, path = self.check_tool_installed(tool_name) + if installed: + return True, f"Successfully installed {tool_name} at {path}" + else: + return True, f"Installation completed but binary not found in PATH" + else: + return False, f"Installation failed: {result.stderr}" + + except subprocess.TimeoutExpired: + return False, "Installation timed out" + except Exception as e: + return False, f"Installation error: {str(e)}" + + def install_all_tools(self) -> Dict[str, Tuple[bool, str]]: + """Install all pentest tools""" + results = {} + for tool_name in PENTEST_TOOLS: + installed, path = self.check_tool_installed(tool_name) + if installed: + results[tool_name] = (True, f"Already installed at {path}") + else: + results[tool_name] = self.install_tool(tool_name) + return results + + def install_essential_tools(self) -> Dict[str, Tuple[bool, str]]: + """Install only essential tools for basic pentesting""" + essential = ["nmap", "sqlmap", "nikto", "nuclei", "curl", "jq", "httpx", "ffuf"] + results = {} + for tool_name in essential: + installed, path = self.check_tool_installed(tool_name) + if installed: + results[tool_name] = (True, f"Already installed at {path}") + else: + results[tool_name] = self.install_tool(tool_name) + return results + + +def print_tools_menu(): + """Print the tools installation menu""" + installer = ToolInstaller() + status = installer.get_tools_status() + + print("\n" + "="*70) + print(" PENTEST TOOLS INSTALLATION MANAGER") + print("="*70) + print(f"\nDetected Package Manager: {installer.package_manager}") + print("\nAvailable Tools:") + print("-"*70) + + for i, (tool_name, info) in enumerate(status.items(), 1): + status_icon = "[+]" if info["installed"] else "[-]" + status_text = "Installed" if info["installed"] else "Not Installed" + print(f" {i:2}. {status_icon} {tool_name:15} - {info['description'][:40]}") + + print("-"*70) + print("\nOptions:") + print(" A - Install ALL tools") + print(" E - Install ESSENTIAL tools only (nmap, sqlmap, nikto, nuclei, etc.)") + print(" 1-N - Install specific tool by number") + print(" Q - Return to main menu") + print("-"*70) + + return installer, list(status.keys()) + + +def run_installer_menu(): + """Run the interactive installer menu""" + while True: + installer, tool_list = print_tools_menu() + + choice = input("\nSelect option: ").strip().upper() + + if choice == 'Q': + break + elif choice == 'A': + print("\n[*] Installing all tools...") + results = installer.install_all_tools() + for tool, (success, msg) in results.items(): + icon = "[+]" if success else "[!]" + print(f" {icon} {tool}: {msg}") + input("\nPress Enter to continue...") + elif choice == 'E': + print("\n[*] Installing essential tools...") + results = installer.install_essential_tools() + for tool, (success, msg) in results.items(): + icon = "[+]" if success else "[!]" + print(f" {icon} {tool}: {msg}") + input("\nPress Enter to continue...") + else: + try: + idx = int(choice) - 1 + if 0 <= idx < len(tool_list): + tool_name = tool_list[idx] + success, msg = installer.install_tool(tool_name) + icon = "[+]" if success else "[!]" + print(f"\n {icon} {msg}") + input("\nPress Enter to continue...") + else: + print("[!] Invalid selection") + except ValueError: + print("[!] Invalid input") + + +if __name__ == "__main__": + run_installer_menu()