diff --git a/core/__init__.py b/core/__init__.py index e69de29..945c9b4 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -0,0 +1 @@ +. \ No newline at end of file diff --git a/core/llm_manager.py b/core/llm_manager.py index 9b2adae..d5d2a1f 100644 --- a/core/llm_manager.py +++ b/core/llm_manager.py @@ -7,11 +7,17 @@ Supports: Claude, GPT, Gemini, Ollama, and custom models import os import json import subprocess +import time from typing import Dict, List, Optional, Any import logging import requests -from pathlib import Path # Added for Path -import re # Added for regex operations +from pathlib import Path +import re + +# Retry configuration +MAX_RETRIES = 3 +RETRY_DELAY = 1.0 # seconds +RETRY_MULTIPLIER = 2.0 logger = logging.getLogger(__name__) @@ -58,12 +64,12 @@ class LLMManager: return api_key_config def _load_all_prompts(self) -> Dict: - """Load prompts from both JSON library and Markdown library files.""" + """Load prompts from JSON library and Markdown files (both prompts/ and prompts/md_library/).""" all_prompts = { "json_prompts": {}, "md_prompts": {} } - + # Load from JSON library if self.json_prompts_file_path.exists(): try: @@ -75,32 +81,45 @@ class LLMManager: else: logger.warning(f"JSON prompts file not found at {self.json_prompts_file_path}. 
Some AI functionalities might be limited.") - # Load from Markdown library - if self.md_prompts_dir_path.is_dir(): - for md_file in self.md_prompts_dir_path.glob("*.md"): - try: - content = md_file.read_text() - prompt_name = md_file.stem # Use filename as prompt name + # Load from both prompts/ root and prompts/md_library/ + prompts_root = Path("prompts") + md_dirs = [prompts_root, self.md_prompts_dir_path] - user_prompt_match = re.search(r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)", content, re.DOTALL) - system_prompt_match = re.search(r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)", content, re.DOTALL) + for md_dir in md_dirs: + if md_dir.is_dir(): + for md_file in md_dir.glob("*.md"): + try: + content = md_file.read_text() + prompt_name = md_file.stem # Use filename as prompt name - user_prompt = user_prompt_match.group(1).strip() if user_prompt_match else "" - system_prompt = system_prompt_match.group(1).strip() if system_prompt_match else "" + # Skip if already loaded (md_library has priority) + if prompt_name in all_prompts["md_prompts"]: + continue - if user_prompt or system_prompt: - all_prompts["md_prompts"][prompt_name] = { - "user_prompt": user_prompt, - "system_prompt": system_prompt - } - else: - logger.warning(f"No valid User or System Prompt found in {md_file.name}. Skipping.") + # Try structured format first (## User Prompt / ## System Prompt) + user_prompt_match = re.search(r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)", content, re.DOTALL) + system_prompt_match = re.search(r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)", content, re.DOTALL) - except Exception as e: - logger.error(f"Error loading prompt from {md_file.name}: {e}") - logger.info(f"Loaded {len(all_prompts['md_prompts'])} prompts from Markdown library.") - else: - logger.warning(f"Markdown prompts directory not found at {self.md_prompts_dir_path}. 
Some AI functionalities might be limited.") + user_prompt = user_prompt_match.group(1).strip() if user_prompt_match else "" + system_prompt = system_prompt_match.group(1).strip() if system_prompt_match else "" + + # If no structured format, use entire content as system_prompt + if not user_prompt and not system_prompt: + system_prompt = content.strip() + user_prompt = "" # Will be filled with user input at runtime + logger.debug(f"Loaded {md_file.name} as full-content prompt") + + if user_prompt or system_prompt: + all_prompts["md_prompts"][prompt_name] = { + "user_prompt": user_prompt, + "system_prompt": system_prompt + } + logger.debug(f"Loaded prompt: {prompt_name}") + + except Exception as e: + logger.error(f"Error loading prompt from {md_file.name}: {e}") + + logger.info(f"Loaded {len(all_prompts['md_prompts'])} prompts from Markdown files.") return all_prompts @@ -233,63 +252,267 @@ Identify any potential hallucinations, inconsistencies, or areas where the respo self.hallucination_mitigation_strategy = original_mitigation_state # Restore original state def _generate_claude(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using Claude API""" - import anthropic - - client = anthropic.Anthropic(api_key=self.api_key) - - messages = [{"role": "user", "content": prompt}] - - response = client.messages.create( - model=self.model, - max_tokens=self.max_tokens, - temperature=self.temperature, - system=system_prompt or "", - messages=messages - ) - - return response.content[0].text + """Generate using Claude API with requests (bypasses httpx/SSL issues on macOS)""" + if not self.api_key: + raise ValueError("ANTHROPIC_API_KEY not set. 
Please set the environment variable or configure in config.yaml") + + url = "https://api.anthropic.com/v1/messages" + headers = { + "x-api-key": self.api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json" + } + + data = { + "model": self.model, + "max_tokens": self.max_tokens, + "temperature": self.temperature, + "messages": [{"role": "user", "content": prompt}] + } + + if system_prompt: + data["system"] = system_prompt + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"Claude API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["content"][0]["text"] + + elif response.status_code == 401: + logger.error("Claude API authentication failed. Check your ANTHROPIC_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"Claude API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. 
Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"Claude API server error (attempt {attempt + 1}/{MAX_RETRIES}): {response.status_code}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"Claude API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"Claude API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"Claude API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"Claude API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to Claude API after {MAX_RETRIES} attempts: {last_error}") def _generate_gpt(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using OpenAI GPT API""" - import openai - - client = openai.OpenAI(api_key=self.api_key) - + """Generate using OpenAI GPT API with requests (bypasses SDK issues)""" + if not self.api_key: + raise ValueError("OPENAI_API_KEY not set. 
Please set the environment variable or configure in config.yaml") + + url = "https://api.openai.com/v1/chat/completions" + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) - - response = client.chat.completions.create( - model=self.model, - messages=messages, - temperature=self.temperature, - max_tokens=self.max_tokens - ) - - return response.choices[0].message.content + + data = { + "model": self.model, + "messages": messages, + "temperature": self.temperature, + "max_tokens": self.max_tokens + } + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"OpenAI API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["choices"][0]["message"]["content"] + + elif response.status_code == 401: + logger.error("OpenAI API authentication failed. Check your OPENAI_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"OpenAI API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. 
Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"OpenAI API server error (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"OpenAI API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"OpenAI API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"OpenAI API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"OpenAI API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to OpenAI API after {MAX_RETRIES} attempts: {last_error}") def _generate_gemini(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using Google Gemini API""" - import google.generativeai as genai - - genai.configure(api_key=self.api_key) - model = genai.GenerativeModel(self.model) - + """Generate using Google Gemini API with requests (bypasses SDK issues)""" + if not self.api_key: 
+ raise ValueError("GOOGLE_API_KEY not set. Please set the environment variable or configure in config.yaml") + + # Use v1beta for generateContent endpoint + url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model}:generateContent?key={self.api_key}" + headers = { + "Content-Type": "application/json" + } + full_prompt = prompt if system_prompt: full_prompt = f"{system_prompt}\n\n{prompt}" - - response = model.generate_content( - full_prompt, - generation_config={ - 'temperature': self.temperature, - 'max_output_tokens': self.max_tokens, + + data = { + "contents": [{"parts": [{"text": full_prompt}]}], + "generationConfig": { + "temperature": self.temperature, + "maxOutputTokens": self.max_tokens } - ) - - return response.text + } + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"Gemini API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["candidates"][0]["content"]["parts"][0]["text"] + + elif response.status_code == 401 or response.status_code == 403: + logger.error("Gemini API authentication failed. Check your GOOGLE_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"Gemini API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. 
Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"Gemini API server error (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"Gemini API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"Gemini API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"Gemini API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"Gemini API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to Gemini API after {MAX_RETRIES} attempts: {last_error}") def _generate_gemini_cli(self, prompt: str, system_prompt: Optional[str] = None) -> str: """Generate using Gemini CLI""" diff --git a/core/pentest_executor.py b/core/pentest_executor.py new file mode 100644 index 0000000..c1f078d --- /dev/null +++ b/core/pentest_executor.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 +""" +Pentest Executor - Executes 
real pentest tools and captures outputs for PoC generation +""" + +import subprocess +import shutil +import json +import re +import os +import logging +import socket +import urllib.parse +from typing import Dict, List, Optional, Any +from datetime import datetime +from dataclasses import dataclass, field, asdict + +logger = logging.getLogger(__name__) + + +@dataclass +class Vulnerability: + """Represents a discovered vulnerability with PoC""" + title: str + severity: str # Critical, High, Medium, Low, Info + cvss_score: float + cvss_vector: str + description: str + affected_endpoint: str + impact: str + poc_request: str + poc_response: str + poc_payload: str + remediation: str + references: List[str] = field(default_factory=list) + cwe_id: str = "" + tool_output: str = "" + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class ScanResult: + """Contains all scan results and findings""" + target: str + scan_started: str + scan_completed: str = "" + tools_executed: List[Dict] = field(default_factory=list) + vulnerabilities: List[Vulnerability] = field(default_factory=list) + open_ports: List[Dict] = field(default_factory=list) + technologies: List[str] = field(default_factory=list) + raw_outputs: Dict[str, str] = field(default_factory=dict) + + +class PentestExecutor: + """Executes real pentest tools and captures outputs""" + + def __init__(self, target: str, config: Dict = None): + self.target = self._normalize_target(target) + self.config = config or {} + self.scan_result = ScanResult( + target=self.target, + scan_started=datetime.now().isoformat() + ) + self.timeout = 300 # 5 minutes default timeout + + def _normalize_target(self, target: str) -> str: + """Normalize target URL/IP""" + target = target.strip() + if not target.startswith(('http://', 'https://')): + # Check if it's an IP + try: + socket.inet_aton(target.split('/')[0].split(':')[0]) + return target # It's an IP + except socket.error: + # Assume it's a domain + 
return f"https://{target}" + return target + + def _get_domain(self) -> str: + """Extract domain from target""" + parsed = urllib.parse.urlparse(self.target) + return parsed.netloc or parsed.path.split('/')[0] + + def _get_ip(self) -> Optional[str]: + """Resolve target to IP""" + try: + domain = self._get_domain() + return socket.gethostbyname(domain.split(':')[0]) + except socket.error: + return None + + def _run_command(self, cmd: List[str], timeout: int = None) -> Dict: + """Run a command and capture output""" + timeout = timeout or self.timeout + tool_name = cmd[0] if cmd else "unknown" + + result = { + "tool": tool_name, + "command": " ".join(cmd), + "success": False, + "stdout": "", + "stderr": "", + "exit_code": -1, + "timestamp": datetime.now().isoformat() + } + + # Check if tool exists + if not shutil.which(cmd[0]): + result["stderr"] = f"Tool '{cmd[0]}' not found. Please install it using 'install_tools' command." + logger.warning(f"Tool not found: {cmd[0]}") + return result + + try: + print(f"[*] Executing: {' '.join(cmd)}") + logger.info(f"Executing: {' '.join(cmd)}") + + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + + result["stdout"] = proc.stdout + result["stderr"] = proc.stderr + result["exit_code"] = proc.returncode + result["success"] = proc.returncode == 0 + + except subprocess.TimeoutExpired: + result["stderr"] = f"Command timed out after {timeout} seconds" + logger.warning(f"Timeout: {' '.join(cmd)}") + except Exception as e: + result["stderr"] = str(e) + logger.error(f"Error executing {cmd[0]}: {e}") + + self.scan_result.tools_executed.append(result) + self.scan_result.raw_outputs[tool_name] = result["stdout"] + return result + + def run_nmap_scan(self, ports: str = "1-1000", extra_args: List[str] = None) -> Dict: + """Run nmap port scan""" + domain = self._get_domain() + cmd = ["nmap", "-sV", "-sC", "-p", ports, "--open", domain] + if extra_args: + cmd.extend(extra_args) + + result = 
self._run_command(cmd) + + if result["success"]: + self._parse_nmap_output(result["stdout"]) + + return result + + def _parse_nmap_output(self, output: str): + """Parse nmap output for open ports""" + port_pattern = r"(\d+)/(\w+)\s+open\s+(\S+)\s*(.*)" + for match in re.finditer(port_pattern, output): + port_info = { + "port": int(match.group(1)), + "protocol": match.group(2), + "service": match.group(3), + "version": match.group(4).strip() + } + self.scan_result.open_ports.append(port_info) + print(f" [+] Found: {port_info['port']}/{port_info['protocol']} - {port_info['service']} {port_info['version']}") + + def run_nikto_scan(self) -> Dict: + """Run nikto web vulnerability scan""" + cmd = ["nikto", "-h", self.target, "-Format", "txt", "-nointeractive"] + result = self._run_command(cmd, timeout=600) + + if result["success"] or result["stdout"]: + self._parse_nikto_output(result["stdout"]) + + return result + + def _parse_nikto_output(self, output: str): + """Parse nikto output for vulnerabilities""" + vuln_patterns = [ + (r"OSVDB-\d+:.*", "Medium"), + (r"\+ (/[^\s]+).*SQL injection", "High"), + (r"\+ (/[^\s]+).*XSS", "High"), + (r"\+ The X-XSS-Protection header", "Low"), + (r"\+ The X-Content-Type-Options header", "Low"), + (r"\+ Server leaks", "Medium"), + (r"\+ Retrieved x-powered-by header", "Info"), + ] + + for line in output.split('\n'): + for pattern, severity in vuln_patterns: + if re.search(pattern, line, re.IGNORECASE): + vuln = Vulnerability( + title=line.strip()[:100], + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector="", + description=line.strip(), + affected_endpoint=self.target, + impact=f"{severity} severity finding detected by Nikto", + poc_request=f"GET {self.target} HTTP/1.1", + poc_response="See tool output", + poc_payload="N/A - Passive scan", + remediation="Review and fix the identified issue", + tool_output=line + ) + self.scan_result.vulnerabilities.append(vuln) + + def run_nuclei_scan(self, templates: str = 
None) -> Dict: + """Run nuclei vulnerability scan""" + cmd = ["nuclei", "-u", self.target, "-silent", "-nc", "-j"] + if templates: + cmd.extend(["-t", templates]) + + result = self._run_command(cmd, timeout=600) + + if result["stdout"]: + self._parse_nuclei_output(result["stdout"]) + + return result + + def _parse_nuclei_output(self, output: str): + """Parse nuclei JSON output for vulnerabilities""" + for line in output.strip().split('\n'): + if not line.strip(): + continue + try: + finding = json.loads(line) + severity = finding.get("info", {}).get("severity", "unknown").capitalize() + + vuln = Vulnerability( + title=finding.get("info", {}).get("name", "Unknown"), + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector=finding.get("info", {}).get("classification", {}).get("cvss-metrics", ""), + description=finding.get("info", {}).get("description", ""), + affected_endpoint=finding.get("matched-at", self.target), + impact=finding.get("info", {}).get("impact", f"{severity} severity vulnerability"), + poc_request=finding.get("curl-command", f"curl -X GET '{finding.get('matched-at', self.target)}'"), + poc_response=finding.get("response", "")[:500] if finding.get("response") else "See tool output", + poc_payload=finding.get("matcher-name", "Template-based detection"), + remediation=finding.get("info", {}).get("remediation", "Apply vendor patches"), + references=finding.get("info", {}).get("reference", []), + cwe_id=str(finding.get("info", {}).get("classification", {}).get("cwe-id", "")), + tool_output=json.dumps(finding, indent=2) + ) + self.scan_result.vulnerabilities.append(vuln) + print(f" [!] 
{severity}: {vuln.title} at {vuln.affected_endpoint}") + + except json.JSONDecodeError: + continue + + def run_sqlmap_scan(self, param: str = None) -> Dict: + """Run sqlmap SQL injection scan""" + cmd = ["sqlmap", "-u", self.target, "--batch", "--level=2", "--risk=2", + "--random-agent", "--threads=5", "--output-dir=/tmp/sqlmap_output"] + + if param: + cmd.extend(["--param", param]) + + result = self._run_command(cmd, timeout=600) + + if result["stdout"]: + self._parse_sqlmap_output(result["stdout"]) + + return result + + def _parse_sqlmap_output(self, output: str): + """Parse sqlmap output for SQL injection vulnerabilities""" + if "is vulnerable" in output.lower() or "injection" in output.lower(): + # Extract injection details + vuln_type = "Blind" if "blind" in output.lower() else "Error-based" + if "union" in output.lower(): + vuln_type = "UNION-based" + elif "time-based" in output.lower(): + vuln_type = "Time-based blind" + + # Extract payload + payload_match = re.search(r"Payload: (.+)", output) + payload = payload_match.group(1) if payload_match else "See tool output" + + vuln = Vulnerability( + title=f"SQL Injection ({vuln_type})", + severity="Critical", + cvss_score=9.8, + cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + description=f"SQL Injection vulnerability detected. Type: {vuln_type}. This allows an attacker to manipulate database queries.", + affected_endpoint=self.target, + impact="Complete database compromise. Attacker can read, modify, or delete data. Potential for remote code execution.", + poc_request=f"GET {self.target}?param={payload} HTTP/1.1\nHost: {self._get_domain()}\nUser-Agent: Mozilla/5.0", + poc_response="Database error or data disclosure in response", + poc_payload=payload, + remediation="Use parameterized queries/prepared statements. Implement input validation. 
Apply least privilege to database accounts.", + cwe_id="CWE-89", + references=["https://owasp.org/www-community/attacks/SQL_Injection"], + tool_output=output[:2000] + ) + self.scan_result.vulnerabilities.append(vuln) + print(f" [!!!] CRITICAL: SQL Injection found!") + + def run_ffuf_scan(self, wordlist: str = "/usr/share/wordlists/dirb/common.txt") -> Dict: + """Run ffuf directory/file bruteforce""" + target_url = self.target.rstrip('/') + "/FUZZ" + cmd = ["ffuf", "-u", target_url, "-w", wordlist, "-mc", "200,301,302,403", + "-o", "/tmp/ffuf_output.json", "-of", "json", "-t", "50"] + + result = self._run_command(cmd, timeout=300) + + # Parse output file if exists + if os.path.exists("/tmp/ffuf_output.json"): + try: + with open("/tmp/ffuf_output.json", "r") as f: + ffuf_data = json.load(f) + for res in ffuf_data.get("results", []): + print(f" [+] Found: {res.get('url')} (Status: {res.get('status')})") + except: + pass + + return result + + def run_curl_test(self, method: str = "GET", path: str = "/", headers: Dict = None, data: str = None) -> Dict: + """Run curl request and capture full request/response""" + url = self.target.rstrip('/') + path + cmd = ["curl", "-v", "-s", "-k", "-X", method, url] + + if headers: + for k, v in headers.items(): + cmd.extend(["-H", f"{k}: {v}"]) + + if data: + cmd.extend(["-d", data]) + + result = self._run_command(cmd) + return result + + def run_http_security_check(self) -> Dict: + """Check HTTP security headers""" + cmd = ["curl", "-s", "-I", "-k", self.target] + result = self._run_command(cmd) + + if result["success"]: + self._parse_security_headers(result["stdout"]) + + return result + + def _parse_security_headers(self, headers: str): + """Parse response headers for security issues""" + required_headers = { + "X-Frame-Options": ("Missing X-Frame-Options", "Medium", "Clickjacking protection"), + "X-Content-Type-Options": ("Missing X-Content-Type-Options", "Low", "MIME type sniffing protection"), + "X-XSS-Protection": ("Missing 
X-XSS-Protection", "Low", "XSS filter"), + "Strict-Transport-Security": ("Missing HSTS Header", "Medium", "HTTPS enforcement"), + "Content-Security-Policy": ("Missing Content-Security-Policy", "Medium", "XSS/injection protection"), + } + + headers_lower = headers.lower() + + for header, (title, severity, desc) in required_headers.items(): + if header.lower() not in headers_lower: + vuln = Vulnerability( + title=title, + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector="", + description=f"The {header} header is not set. This header provides {desc}.", + affected_endpoint=self.target, + impact=f"Missing {desc} could lead to attacks", + poc_request=f"curl -I {self.target}", + poc_response=headers[:500], + poc_payload="N/A - Header check", + remediation=f"Add the {header} header to all HTTP responses", + cwe_id="CWE-693" + ) + self.scan_result.vulnerabilities.append(vuln) + + def run_whatweb_scan(self) -> Dict: + """Run whatweb technology detection""" + cmd = ["whatweb", "-a", "3", "--color=never", self.target] + result = self._run_command(cmd) + + if result["stdout"]: + # Extract technologies + techs = re.findall(r'\[([^\]]+)\]', result["stdout"]) + self.scan_result.technologies.extend(techs[:20]) + print(f" [+] Technologies: {', '.join(techs[:10])}") + + return result + + def _severity_to_cvss(self, severity: str) -> float: + """Convert severity to CVSS score""" + mapping = { + "critical": 9.5, + "high": 7.5, + "medium": 5.5, + "low": 3.0, + "info": 0.0, + "unknown": 0.0 + } + return mapping.get(severity.lower(), 0.0) + + def run_full_scan(self) -> ScanResult: + """Run a complete pentest scan""" + print(f"\n{'='*60}") + print(f"[*] Starting Full Pentest Scan on: {self.target}") + print(f"{'='*60}\n") + + # Phase 1: Reconnaissance + print("[Phase 1] Reconnaissance") + print("-" * 40) + + print("[*] Running port scan...") + self.run_nmap_scan() + + print("\n[*] Running technology detection...") + self.run_whatweb_scan() + + print("\n[*] 
Checking security headers...") + self.run_http_security_check() + + # Phase 2: Vulnerability Scanning + print(f"\n[Phase 2] Vulnerability Scanning") + print("-" * 40) + + print("[*] Running Nuclei scan...") + self.run_nuclei_scan() + + print("\n[*] Running Nikto scan...") + self.run_nikto_scan() + + # Phase 3: Specific Tests + print(f"\n[Phase 3] Specific Vulnerability Tests") + print("-" * 40) + + print("[*] Testing for SQL Injection...") + self.run_sqlmap_scan() + + print("\n[*] Running directory enumeration...") + self.run_ffuf_scan() + + # Complete scan + self.scan_result.scan_completed = datetime.now().isoformat() + + print(f"\n{'='*60}") + print(f"[*] Scan Complete!") + print(f" - Tools Executed: {len(self.scan_result.tools_executed)}") + print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}") + print(f" - Open Ports: {len(self.scan_result.open_ports)}") + print(f"{'='*60}\n") + + return self.scan_result + + def run_quick_scan(self) -> ScanResult: + """Run a quick scan with essential tools only""" + print(f"\n{'='*60}") + print(f"[*] Starting Quick Scan on: {self.target}") + print(f"{'='*60}\n") + + print("[*] Running port scan (top 100 ports)...") + self.run_nmap_scan(ports="1-100") + + print("\n[*] Checking security headers...") + self.run_http_security_check() + + print("\n[*] Running Nuclei scan...") + self.run_nuclei_scan() + + self.scan_result.scan_completed = datetime.now().isoformat() + + print(f"\n{'='*60}") + print(f"[*] Quick Scan Complete!") + print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}") + print(f"{'='*60}\n") + + return self.scan_result + + def get_findings_summary(self) -> Dict: + """Get summary of findings""" + severity_count = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Info": 0} + + for vuln in self.scan_result.vulnerabilities: + sev = vuln.severity.capitalize() + if sev in severity_count: + severity_count[sev] += 1 + + return { + "target": self.target, + "total_vulnerabilities": 
len(self.scan_result.vulnerabilities), + "severity_breakdown": severity_count, + "open_ports": len(self.scan_result.open_ports), + "technologies": self.scan_result.technologies, + "tools_executed": len(self.scan_result.tools_executed) + } + + def to_dict(self) -> Dict: + """Convert scan results to dictionary""" + return { + "target": self.scan_result.target, + "scan_started": self.scan_result.scan_started, + "scan_completed": self.scan_result.scan_completed, + "tools_executed": self.scan_result.tools_executed, + "vulnerabilities": [asdict(v) for v in self.scan_result.vulnerabilities], + "open_ports": self.scan_result.open_ports, + "technologies": self.scan_result.technologies, + "summary": self.get_findings_summary() + } diff --git a/core/report_generator.py b/core/report_generator.py new file mode 100644 index 0000000..4c2c3c1 --- /dev/null +++ b/core/report_generator.py @@ -0,0 +1,639 @@ +#!/usr/bin/env python3 +""" +Professional Pentest Report Generator +Generates detailed reports with PoCs, CVSS scores, requests/responses +""" + +import json +import os +from datetime import datetime +from typing import Dict, List, Any +import html +import logging + +logger = logging.getLogger(__name__) + + +class ReportGenerator: + """Generates professional penetration testing reports""" + + def __init__(self, scan_results: Dict, llm_analysis: str = ""): + self.scan_results = scan_results + self.llm_analysis = llm_analysis + self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + def _get_severity_color(self, severity: str) -> str: + """Get color for severity level""" + colors = { + "critical": "#dc3545", + "high": "#fd7e14", + "medium": "#ffc107", + "low": "#17a2b8", + "info": "#6c757d" + } + return colors.get(severity.lower(), "#6c757d") + + def _get_severity_badge(self, severity: str) -> str: + """Get HTML badge for severity""" + color = self._get_severity_color(severity) + return f'{severity.upper()}' + + def _escape_html(self, text: str) -> str: + """Escape HTML 
characters"""
        if not text:
            return ""
        return html.escape(str(text))

    def _format_code_block(self, code: str, language: str = "") -> str:
        """Format code block with syntax highlighting"""
        escaped = self._escape_html(code)
        # NOTE(review): the wrapping markup (presumably <pre>/<code> tags
        # keyed on `language`) was stripped by extraction in this copy;
        # restore the tag structure around {escaped} from the original file.
        return f'{escaped}'

    def generate_executive_summary(self) -> str:
        """Generate executive summary section.

        Reads the precomputed summary dict (see get_findings_summary) and
        derives an overall risk level from the highest non-empty severity
        bucket.
        """
        summary = self.scan_results.get("summary", {})
        severity = summary.get("severity_breakdown", {})

        total = summary.get("total_vulnerabilities", 0)
        critical = severity.get("Critical", 0)
        high = severity.get("High", 0)
        medium = severity.get("Medium", 0)
        low = severity.get("Low", 0)

        # Highest populated bucket wins; "Low" is the floor even with zero findings.
        risk_level = "Critical" if critical > 0 else "High" if high > 0 else "Medium" if medium > 0 else "Low"

        return f"""
+
+

Executive Summary

+
+
+
+
+

Assessment Overview

+ + + + + +
Target:{self._escape_html(self.scan_results.get('target', 'N/A'))}
Scan Started:{self.scan_results.get('scan_started', 'N/A')}
Scan Completed:{self.scan_results.get('scan_completed', 'N/A')}
Overall Risk Level:{self._get_severity_badge(risk_level)}
+
+
+

Findings Summary

+
+
{critical} Critical
+
{high} High
+
{medium} Medium
+
{low} Low
+
+

Total Vulnerabilities: {total}

+

Open Ports Found: {summary.get('open_ports', 0)}

+

Tools Executed: {summary.get('tools_executed', 0)}

+
+
+
+
+ """ + + def generate_vulnerability_card(self, vuln: Dict, index: int) -> str: + """Generate HTML card for a single vulnerability""" + severity = vuln.get("severity", "Unknown") + color = self._get_severity_color(severity) + + # Build references list + refs_html = "" + if vuln.get("references"): + refs_html = "" + + return f""" +
+
+
+

{self._escape_html(vuln.get('title', 'Unknown Vulnerability'))}

+
+ {self._get_severity_badge(severity)} + CVSS: {vuln.get('cvss_score', 'N/A')} + {f'CWE: {vuln.get("cwe_id")}' if vuln.get('cwe_id') else ''} +
+
+
+ +
+
+

Description

+

{self._escape_html(vuln.get('description', 'No description available'))}

+
+ +
+

Affected Endpoint

+ {self._escape_html(vuln.get('affected_endpoint', 'N/A'))} +
+ +
+

Impact

+

{self._escape_html(vuln.get('impact', 'Impact not assessed'))}

+
+ +
+

Proof of Concept (PoC)

+ +
+
Request
+ {self._format_code_block(vuln.get('poc_request', 'N/A'), 'http')} +
+ +
+
Payload
+ {self._format_code_block(vuln.get('poc_payload', 'N/A'), 'text')} +
+ +
+
Response
+ {self._format_code_block(vuln.get('poc_response', 'N/A')[:1000], 'http')} +
+
+ + {f'''
+

CVSS Vector

+ {self._escape_html(vuln.get('cvss_vector', 'N/A'))} +
''' if vuln.get('cvss_vector') else ''} + +
+

Remediation

+

{self._escape_html(vuln.get('remediation', 'Consult vendor documentation for patches'))}

+
+ + {f'''
+

References

+ {refs_html} +
''' if refs_html else ''} + + {f'''
+

Raw Tool Output

+ {self._format_code_block(vuln.get('tool_output', '')[:2000], 'text')} +
''' if vuln.get('tool_output') else ''} +
+
+ """ + + def generate_open_ports_section(self) -> str: + """Generate open ports section""" + ports = self.scan_results.get("open_ports", []) + if not ports: + return "" + + rows = "" + for port in ports: + rows += f""" + + {port.get('port', 'N/A')} + {port.get('protocol', 'N/A')} + {self._escape_html(port.get('service', 'N/A'))} + {self._escape_html(port.get('version', 'N/A'))} + + """ + + return f""" +
+
+

Open Ports & Services

+
+
+ + + + + + + + + + + {rows} + +
PortProtocolServiceVersion
+
+
+ """ + + def generate_tools_executed_section(self) -> str: + """Generate tools executed section""" + tools = self.scan_results.get("tools_executed", []) + if not tools: + return "" + + rows = "" + for tool in tools: + status = "Success" if tool.get("success") else "Failed" + status_class = "text-success" if tool.get("success") else "text-danger" + rows += f""" + + {self._escape_html(tool.get('tool', 'N/A'))} + {self._escape_html(tool.get('command', 'N/A')[:100])} + {status} + {tool.get('timestamp', 'N/A')} + + """ + + return f""" +
+
+

Tools Executed

+
+
+ + + + + + + + + + + {rows} + +
ToolCommandStatusTimestamp
+
+
+ """ + + def generate_llm_analysis_section(self) -> str: + """Generate AI analysis section""" + if not self.llm_analysis: + return "" + + import mistune + analysis_html = mistune.html(self.llm_analysis) + + return f""" +
+
+

AI Security Analysis

+
+
+ {analysis_html} +
+
+ """ + + def generate_html_report(self) -> str: + """Generate complete HTML report""" + vulnerabilities = self.scan_results.get("vulnerabilities", []) + + # Sort vulnerabilities by severity + severity_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Info": 4} + vulnerabilities.sort(key=lambda x: severity_order.get(x.get("severity", "Info").capitalize(), 5)) + + vuln_cards = "" + for i, vuln in enumerate(vulnerabilities, 1): + vuln_cards += self.generate_vulnerability_card(vuln, i) + + # Table of contents + toc_items = "" + for i, vuln in enumerate(vulnerabilities, 1): + severity = vuln.get("severity", "Unknown") + color = self._get_severity_color(severity) + toc_items += f'
  • [{severity.upper()}] {self._escape_html(vuln.get("title", "Unknown")[:50])}
  • ' + + html = f""" + + + + + + NeuroSploitv2 - Penetration Test Report + + + + + +
    +
    +

    NeuroSploitv2

    +

    Penetration Test Report

    +

    Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

    +
    + + {self.generate_executive_summary()} + +
    +

    Table of Contents - Vulnerabilities ({len(vulnerabilities)})

    + +
    + + {self.generate_open_ports_section()} + + {self.generate_tools_executed_section()} + +
    +
    +

    Vulnerability Details

    +
    +
    + {vuln_cards if vuln_cards else '

    No vulnerabilities found during the assessment.

    '} +
    +
    + + {self.generate_llm_analysis_section()} + + +
        """

        # NOTE(review): the local `html` (assembled report string, assigned
        # above as `html = f"""...`) shadows the imported `html` module inside
        # generate_html_report; rename the local if the module is ever needed
        # after the assignment.
        return html

    def save_report(self, output_dir: str = "reports") -> str:
        """Save HTML report to file.

        Creates output_dir if needed and writes a timestamped
        pentest_report_<ts>.html. Returns the file path.
        """
        os.makedirs(output_dir, exist_ok=True)

        filename = f"pentest_report_{self.timestamp}.html"
        filepath = os.path.join(output_dir, filename)

        html_content = self.generate_html_report()

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(html_content)

        logger.info(f"Report saved to: {filepath}")
        return filepath

    def save_json_report(self, output_dir: str = "results") -> str:
        """Save JSON report to file.

        default=str stringifies any non-JSON-serializable values (dates etc.)
        rather than raising.
        """
        os.makedirs(output_dir, exist_ok=True)

        filename = f"pentest_results_{self.timestamp}.json"
        filepath = os.path.join(output_dir, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.scan_results, f, indent=2, default=str)

        logger.info(f"JSON results saved to: {filepath}")
        return filepath
diff --git a/core/tool_installer.py b/core/tool_installer.py
new file mode 100644
index 0000000..1f19d7b
--- /dev/null
+++ b/core/tool_installer.py
@@ -0,0 +1,376 @@
#!/usr/bin/env python3
"""
Tool Installer - Installs required pentest tools for NeuroSploitv2
"""

import subprocess
import shutil
import os
import sys
import logging
from typing import Dict, List, Tuple

logger = logging.getLogger(__name__)

# Tool definitions with installation commands for different package managers.
# Each entry: description, a version/check command, per-package-manager
# install commands, and the binary name looked up on PATH.
PENTEST_TOOLS = {
    "nmap": {
        "description": "Network scanner and port mapper",
        "check_cmd": "nmap --version",
        "install": {
            "apt": "sudo apt-get install -y nmap",
            "yum": "sudo yum install -y nmap",
            "dnf": "sudo dnf install -y nmap",
            "brew": "brew install nmap",
            "pacman": "sudo pacman -S --noconfirm nmap"
        },
        "binary": "nmap"
    },
    "sqlmap": {
        "description": "SQL injection detection and exploitation",
        "check_cmd": "sqlmap --version",
        "install": {
            "apt": "sudo apt-get install -y sqlmap",
            "yum": "sudo pip3 install sqlmap",
            "dnf": "sudo dnf install -y sqlmap",
            "brew": "brew install sqlmap",
            "pacman": "sudo pacman -S --noconfirm sqlmap",
            "pip": "pip3 install sqlmap"
        },
        "binary": "sqlmap"
    },
    "nikto": {
        "description": "Web server vulnerability scanner",
        "check_cmd": "nikto -Version",
        "install": {
            "apt": "sudo apt-get install -y nikto",
            "yum": "sudo yum install -y nikto",
            "dnf": "sudo dnf install -y nikto",
            "brew": "brew install nikto",
            "pacman": "sudo pacman -S --noconfirm nikto"
        },
        "binary": "nikto"
    },
    "gobuster": {
        "description": "Directory/file & DNS busting tool",
        "check_cmd": "gobuster version",
        "install": {
            "apt": "sudo apt-get install -y gobuster",
            "brew": "brew install gobuster",
            "go": "go install github.com/OJ/gobuster/v3@latest"
        },
        "binary": "gobuster"
    },
    "nuclei": {
        "description": "Fast vulnerability scanner based on templates",
        "check_cmd": "nuclei -version",
        "install": {
            "go": "go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest",
            "brew": "brew install nuclei"
        },
        "binary": "nuclei"
    },
    "subfinder": {
        "description": "Subdomain discovery tool",
        "check_cmd": "subfinder -version",
        "install": {
            "go": "go install -v github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest",
            "brew": "brew install subfinder"
        },
        "binary": "subfinder"
    },
    "httpx": {
        "description": "HTTP toolkit for probing",
        "check_cmd": "httpx -version",
        "install": {
            "go": "go install -v github.com/projectdiscovery/httpx/cmd/httpx@latest",
            "brew": "brew install httpx"
        },
        "binary": "httpx"
    },
    "ffuf": {
        "description": "Fast web fuzzer",
        "check_cmd": "ffuf -V",
        "install": {
            "apt": "sudo apt-get install -y ffuf",
            "go": "go install github.com/ffuf/ffuf/v2@latest",
            "brew": "brew install ffuf"
        },
        "binary": "ffuf"
    },
    "hydra": {
        "description": "Network login cracker",
        "check_cmd": "hydra -h",
        "install": {
            "apt": "sudo apt-get install -y hydra",
            "yum": "sudo yum install -y hydra",
            "dnf": "sudo dnf install -y hydra",
            "brew": "brew install hydra",
            "pacman": "sudo pacman -S --noconfirm hydra"
        },
        "binary": "hydra"
    },
    "whatweb": {
        "description": "Web technology identifier",
        "check_cmd": "whatweb --version",
        "install": {
            "apt": "sudo apt-get install -y whatweb",
            "brew": "brew install whatweb",
            "gem": "sudo gem install whatweb"
        },
        "binary": "whatweb"
    },
    "wpscan": {
        "description": "WordPress vulnerability scanner",
        "check_cmd": "wpscan --version",
        "install": {
            "apt": "sudo apt-get install -y wpscan",
            "brew": "brew install wpscan",
            "gem": "sudo gem install wpscan"
        },
        "binary": "wpscan"
    },
    "curl": {
        "description": "HTTP client for requests",
        "check_cmd": "curl --version",
        "install": {
            "apt": "sudo apt-get install -y curl",
            "yum": "sudo yum install -y curl",
            "dnf": "sudo dnf install -y curl",
            "brew": "brew install curl",
            "pacman": "sudo pacman -S --noconfirm curl"
        },
        "binary": "curl"
    },
    "jq": {
        "description": "JSON processor for parsing outputs",
        "check_cmd": "jq --version",
        "install": {
            "apt": "sudo apt-get install -y jq",
            "yum": "sudo yum install -y jq",
            "dnf": "sudo dnf install -y jq",
            "brew": "brew install jq",
            "pacman": "sudo pacman -S --noconfirm jq"
        },
        "binary": "jq"
    },
    "dirsearch": {
        "description": "Web path discovery tool",
        "check_cmd": "dirsearch --version",
        "install": {
            "pip": "pip3 install dirsearch"
        },
        "binary": "dirsearch"
    },
    "wafw00f": {
        "description": "Web Application Firewall detection",
        "check_cmd": "wafw00f -h",
        "install": {
            "pip": "pip3 install wafw00f"
        },
        "binary": "wafw00f"
    }
}


class ToolInstaller:
    """Manages installation of pentest tools"""

    def __init__(self):
        self.package_manager = self._detect_package_manager()

    def _detect_package_manager(self) -> str:
        """Detect the system's package manager (first match wins)."""
        managers = [
            ("apt-get", "apt"),
            ("dnf", "dnf"),
            ("yum", "yum"),
            ("pacman", "pacman"),
            ("brew", "brew")
        ]

        for cmd, name in managers:
            if shutil.which(cmd):
                return name

        # Fallback to pip for Python tools
        return "pip"

    def check_tool_installed(self, tool_name: str) -> Tuple[bool, str]:
        """Check if a tool is installed and return (installed, path)."""
        tool_info = PENTEST_TOOLS.get(tool_name)
        if not tool_info:
            return False, ""

        binary = tool_info.get("binary", tool_name)
        path = shutil.which(binary)

        if path:
            return True, path

        # Check common paths not necessarily on PATH (go bin, snap, /opt).
        common_paths = [
            f"/usr/bin/{binary}",
            f"/usr/local/bin/{binary}",
            f"/opt/{binary}/{binary}",
            os.path.expanduser(f"~/go/bin/{binary}"),
            f"/snap/bin/{binary}"
        ]

        for p in common_paths:
            if os.path.isfile(p) and os.access(p, os.X_OK):
                return True, p

        return False, ""

    def get_tools_status(self) -> Dict[str, Dict]:
        """Get installation status of all tools."""
        status = {}
        for tool_name, tool_info in PENTEST_TOOLS.items():
            installed, path = self.check_tool_installed(tool_name)
            status[tool_name] = {
                "installed": installed,
                "path": path,
                "description": tool_info["description"]
            }
        return status

    def install_tool(self, tool_name: str) -> Tuple[bool, str]:
        """Install a specific tool; returns (success, message).

        Prefers the detected package manager, then pip, go, gem fallbacks.
        """
        if tool_name not in PENTEST_TOOLS:
            return False, f"Unknown tool: {tool_name}"

        tool_info = PENTEST_TOOLS[tool_name]
        install_cmds = tool_info.get("install", {})

        # Try package manager first
        if self.package_manager in install_cmds:
            cmd = install_cmds[self.package_manager]
        elif "pip" in install_cmds:
            cmd = install_cmds["pip"]
        elif "go" in install_cmds and shutil.which("go"):
            cmd = install_cmds["go"]
        elif "gem" in install_cmds and shutil.which("gem"):
            cmd = install_cmds["gem"]
        else:
            return False, f"No installation method available for {tool_name} on this system"

        print(f"[*] Installing {tool_name}...")
        print(f"    Command: {cmd}")

        try:
            # shell=True is deliberate here: commands are hard-coded constants
            # from PENTEST_TOOLS, never user input.
            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                text=True,
                timeout=300
            )

            if result.returncode == 0:
                # Verify installation
                installed, path = self.check_tool_installed(tool_name)
                if installed:
                    return True, f"Successfully installed {tool_name} at {path}"
                else:
                    return True, f"Installation completed but binary not found in PATH"
            else:
                return False, f"Installation failed: {result.stderr}"

        except subprocess.TimeoutExpired:
            return False, "Installation timed out"
        except Exception as e:
            return False, f"Installation error: {str(e)}"

    def install_all_tools(self) -> Dict[str, Tuple[bool, str]]:
        """Install all pentest tools (skips those already present)."""
        results = {}
        for tool_name in PENTEST_TOOLS:
            installed, path = self.check_tool_installed(tool_name)
            if installed:
                results[tool_name] = (True, f"Already installed at {path}")
            else:
                results[tool_name] = self.install_tool(tool_name)
        return results

    def install_essential_tools(self) -> Dict[str, Tuple[bool, str]]:
        """Install only essential tools for basic pentesting."""
        essential = ["nmap", "sqlmap", "nikto", "nuclei", "curl", "jq", "httpx", "ffuf"]
        results = {}
        for tool_name in essential:
            installed, path = self.check_tool_installed(tool_name)
            if installed:
                results[tool_name] = (True, f"Already installed at {path}")
            else:
                results[tool_name] = self.install_tool(tool_name)
        return results


def print_tools_menu():
    """Print the tools installation menu; returns (installer, tool name list)."""
    installer = ToolInstaller()
    status = installer.get_tools_status()

    print("\n" + "="*70)
    print(" PENTEST TOOLS INSTALLATION MANAGER")
    print("="*70)
    print(f"\nDetected Package Manager: {installer.package_manager}")
    print("\nAvailable Tools:")
    print("-"*70)

    for i, (tool_name, info) in enumerate(status.items(), 1):
        status_icon = "[+]" if info["installed"] else "[-]"
        # NOTE(review): status_text is currently unused in the printed row.
        status_text = "Installed" if info["installed"] else "Not Installed"
        print(f" {i:2}. {status_icon} {tool_name:15} - {info['description'][:40]}")

    print("-"*70)
    print("\nOptions:")
    print("  A   - Install ALL tools")
    print("  E   - Install ESSENTIAL tools only (nmap, sqlmap, nikto, nuclei, etc.)")
    print("  1-N - Install specific tool by number")
    print("  Q   - Return to main menu")
    print("-"*70)

    return installer, list(status.keys())


def run_installer_menu():
    """Run the interactive installer menu (loops until 'Q')."""
    while True:
        installer, tool_list = print_tools_menu()

        choice = input("\nSelect option: ").strip().upper()

        if choice == 'Q':
            break
        elif choice == 'A':
            print("\n[*] Installing all tools...")
            results = installer.install_all_tools()
            for tool, (success, msg) in results.items():
                icon = "[+]" if success else "[!]"
                print(f"  {icon} {tool}: {msg}")
            input("\nPress Enter to continue...")
        elif choice == 'E':
            print("\n[*] Installing essential tools...")
            results = installer.install_essential_tools()
            for tool, (success, msg) in results.items():
                icon = "[+]" if success else "[!]"
                print(f"  {icon} {tool}: {msg}")
            input("\nPress Enter to continue...")
        else:
            try:
                idx = int(choice) - 1
                if 0 <= idx < len(tool_list):
                    tool_name = tool_list[idx]
                    success, msg = installer.install_tool(tool_name)
                    icon = "[+]" if success else "[!]"
                    print(f"\n  {icon} {msg}")
                    input("\nPress Enter to continue...")
                else:
                    print("[!] Invalid selection")
            except ValueError:
                print("[!] Invalid input")


if __name__ == "__main__":
    run_installer_menu()