From fd6ef4d25879112476a37ca569eb358633d362ad Mon Sep 17 00:00:00 2001 From: Joas A Santos <34966120+CyberSecurityUP@users.noreply.github.com> Date: Fri, 9 Jan 2026 22:50:30 -0300 Subject: [PATCH] Add files via upload --- core/__init__.py | 1 + core/llm_manager.py | 365 +++++++++++++++++----- core/pentest_executor.py | 504 ++++++++++++++++++++++++++++++ core/report_generator.py | 639 +++++++++++++++++++++++++++++++++++++++ core/tool_installer.py | 376 +++++++++++++++++++++++ 5 files changed, 1814 insertions(+), 71 deletions(-) create mode 100644 core/pentest_executor.py create mode 100644 core/report_generator.py create mode 100644 core/tool_installer.py diff --git a/core/__init__.py b/core/__init__.py index e69de29..945c9b4 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -0,0 +1 @@ +. \ No newline at end of file diff --git a/core/llm_manager.py b/core/llm_manager.py index 9b2adae..d5d2a1f 100644 --- a/core/llm_manager.py +++ b/core/llm_manager.py @@ -7,11 +7,17 @@ Supports: Claude, GPT, Gemini, Ollama, and custom models import os import json import subprocess +import time from typing import Dict, List, Optional, Any import logging import requests -from pathlib import Path # Added for Path -import re # Added for regex operations +from pathlib import Path +import re + +# Retry configuration +MAX_RETRIES = 3 +RETRY_DELAY = 1.0 # seconds +RETRY_MULTIPLIER = 2.0 logger = logging.getLogger(__name__) @@ -58,12 +64,12 @@ class LLMManager: return api_key_config def _load_all_prompts(self) -> Dict: - """Load prompts from both JSON library and Markdown library files.""" + """Load prompts from JSON library and Markdown files (both prompts/ and prompts/md_library/).""" all_prompts = { "json_prompts": {}, "md_prompts": {} } - + # Load from JSON library if self.json_prompts_file_path.exists(): try: @@ -75,32 +81,45 @@ class LLMManager: else: logger.warning(f"JSON prompts file not found at {self.json_prompts_file_path}. Some AI functionalities might be limited.") - # Load from Markdown library - if self.md_prompts_dir_path.is_dir(): - for md_file in self.md_prompts_dir_path.glob("*.md"): - try: - content = md_file.read_text() - prompt_name = md_file.stem # Use filename as prompt name + # Load from both prompts/ root and prompts/md_library/ + prompts_root = Path("prompts") + md_dirs = [prompts_root, self.md_prompts_dir_path] - user_prompt_match = re.search(r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)", content, re.DOTALL) - system_prompt_match = re.search(r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)", content, re.DOTALL) + for md_dir in md_dirs: + if md_dir.is_dir(): + for md_file in md_dir.glob("*.md"): + try: + content = md_file.read_text() + prompt_name = md_file.stem # Use filename as prompt name - user_prompt = user_prompt_match.group(1).strip() if user_prompt_match else "" - system_prompt = system_prompt_match.group(1).strip() if system_prompt_match else "" + # Skip if already loaded (md_library has priority) + if prompt_name in all_prompts["md_prompts"]: + continue - if user_prompt or system_prompt: - all_prompts["md_prompts"][prompt_name] = { - "user_prompt": user_prompt, - "system_prompt": system_prompt - } - else: - logger.warning(f"No valid User or System Prompt found in {md_file.name}. 
Skipping.") + # Try structured format first (## User Prompt / ## System Prompt) + user_prompt_match = re.search(r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)", content, re.DOTALL) + system_prompt_match = re.search(r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)", content, re.DOTALL) - except Exception as e: - logger.error(f"Error loading prompt from {md_file.name}: {e}") - logger.info(f"Loaded {len(all_prompts['md_prompts'])} prompts from Markdown library.") - else: - logger.warning(f"Markdown prompts directory not found at {self.md_prompts_dir_path}. Some AI functionalities might be limited.") + user_prompt = user_prompt_match.group(1).strip() if user_prompt_match else "" + system_prompt = system_prompt_match.group(1).strip() if system_prompt_match else "" + + # If no structured format, use entire content as system_prompt + if not user_prompt and not system_prompt: + system_prompt = content.strip() + user_prompt = "" # Will be filled with user input at runtime + logger.debug(f"Loaded {md_file.name} as full-content prompt") + + if user_prompt or system_prompt: + all_prompts["md_prompts"][prompt_name] = { + "user_prompt": user_prompt, + "system_prompt": system_prompt + } + logger.debug(f"Loaded prompt: {prompt_name}") + + except Exception as e: + logger.error(f"Error loading prompt from {md_file.name}: {e}") + + logger.info(f"Loaded {len(all_prompts['md_prompts'])} prompts from Markdown files.") return all_prompts @@ -233,63 +252,267 @@ Identify any potential hallucinations, inconsistencies, or areas where the respo self.hallucination_mitigation_strategy = original_mitigation_state # Restore original state def _generate_claude(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using Claude API""" - import anthropic - - client = anthropic.Anthropic(api_key=self.api_key) - - messages = [{"role": "user", "content": prompt}] - - response = client.messages.create( - model=self.model, - max_tokens=self.max_tokens, - temperature=self.temperature, - system=system_prompt or "", - messages=messages - ) - - return response.content[0].text + """Generate using Claude API with requests (bypasses httpx/SSL issues on macOS)""" + if not self.api_key: + raise ValueError("ANTHROPIC_API_KEY not set. Please set the environment variable or configure in config.yaml") + + url = "https://api.anthropic.com/v1/messages" + headers = { + "x-api-key": self.api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json" + } + + data = { + "model": self.model, + "max_tokens": self.max_tokens, + "temperature": self.temperature, + "messages": [{"role": "user", "content": prompt}] + } + + if system_prompt: + data["system"] = system_prompt + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"Claude API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["content"][0]["text"] + + elif response.status_code == 401: + logger.error("Claude API authentication failed. Check your ANTHROPIC_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"Claude API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. 
Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"Claude API server error (attempt {attempt + 1}/{MAX_RETRIES}): {response.status_code}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"Claude API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"Claude API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"Claude API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"Claude API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to Claude API after {MAX_RETRIES} attempts: {last_error}") def _generate_gpt(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using OpenAI GPT API""" - import openai - - client = openai.OpenAI(api_key=self.api_key) - + """Generate using OpenAI GPT API with requests (bypasses SDK issues)""" + if not self.api_key: + raise ValueError("OPENAI_API_KEY not set. Please set the environment variable or configure in config.yaml") + + url = "https://api.openai.com/v1/chat/completions" + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json" + } + messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) - - response = client.chat.completions.create( - model=self.model, - messages=messages, - temperature=self.temperature, - max_tokens=self.max_tokens - ) - - return response.choices[0].message.content + + data = { + "model": self.model, + "messages": messages, + "temperature": self.temperature, + "max_tokens": self.max_tokens + } + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"OpenAI API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["choices"][0]["message"]["content"] + + elif response.status_code == 401: + logger.error("OpenAI API authentication failed. Check your OPENAI_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"OpenAI API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. 
Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"OpenAI API server error (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"OpenAI API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"OpenAI API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"OpenAI API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"OpenAI API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to OpenAI API after {MAX_RETRIES} attempts: {last_error}") def _generate_gemini(self, prompt: str, system_prompt: Optional[str] = None) -> str: - """Generate using Google Gemini API""" - import google.generativeai as genai - - genai.configure(api_key=self.api_key) - model = genai.GenerativeModel(self.model) - + """Generate using Google Gemini API with requests (bypasses SDK issues)""" + if not self.api_key: + raise ValueError("GOOGLE_API_KEY not set. Please set the environment variable or configure in config.yaml") + + # Use v1beta for generateContent endpoint + url = f"https://generativelanguage.googleapis.com/v1beta/models/{self.model}:generateContent?key={self.api_key}" + headers = { + "Content-Type": "application/json" + } + full_prompt = prompt if system_prompt: full_prompt = f"{system_prompt}\n\n{prompt}" - - response = model.generate_content( - full_prompt, - generation_config={ - 'temperature': self.temperature, - 'max_output_tokens': self.max_tokens, + + data = { + "contents": [{"parts": [{"text": full_prompt}]}], + "generationConfig": { + "temperature": self.temperature, + "maxOutputTokens": self.max_tokens } - ) - - return response.text + } + + last_error = None + for attempt in range(MAX_RETRIES): + try: + logger.debug(f"Gemini API request attempt {attempt + 1}/{MAX_RETRIES}") + response = requests.post( + url, + headers=headers, + json=data, + timeout=120 + ) + + if response.status_code == 200: + result = response.json() + return result["candidates"][0]["content"]["parts"][0]["text"] + + elif response.status_code == 401 or response.status_code == 403: + logger.error("Gemini API authentication failed. 
Check your GOOGLE_API_KEY") + raise ValueError(f"Invalid API key: {response.text}") + + elif response.status_code == 429: + last_error = f"Rate limit: {response.text}" + logger.warning(f"Gemini API rate limit hit (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** (attempt + 1)) + logger.info(f"Rate limited. Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + elif response.status_code >= 500: + last_error = f"Server error {response.status_code}: {response.text}" + logger.warning(f"Gemini API server error (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + else: + logger.error(f"Gemini API error: {response.status_code} - {response.text}") + raise ValueError(f"API error {response.status_code}: {response.text}") + + except requests.exceptions.Timeout as e: + last_error = e + logger.warning(f"Gemini API timeout (attempt {attempt + 1}/{MAX_RETRIES})") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.ConnectionError as e: + last_error = e + logger.warning(f"Gemini API connection error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + except requests.exceptions.RequestException as e: + last_error = e + logger.warning(f"Gemini API request error (attempt {attempt + 1}/{MAX_RETRIES}): {e}") + if attempt < MAX_RETRIES - 1: + sleep_time = RETRY_DELAY * (RETRY_MULTIPLIER ** attempt) + logger.info(f"Retrying in {sleep_time:.1f}s...") + time.sleep(sleep_time) + + raise ConnectionError(f"Failed to connect to Gemini API after {MAX_RETRIES} attempts: {last_error}") def _generate_gemini_cli(self, prompt: str, system_prompt: Optional[str] = None) -> str: """Generate using Gemini CLI""" diff --git a/core/pentest_executor.py b/core/pentest_executor.py new file mode 100644 index 0000000..c1f078d --- /dev/null +++ b/core/pentest_executor.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 +""" +Pentest Executor - Executes real pentest tools and captures outputs for PoC generation +""" + +import subprocess +import shutil +import json +import re +import os +import logging +import socket +import urllib.parse +from typing import Dict, List, Optional, Any +from datetime import datetime +from dataclasses import dataclass, field, asdict + +logger = logging.getLogger(__name__) + + +@dataclass +class Vulnerability: + """Represents a discovered vulnerability with PoC""" + title: str + severity: str # Critical, High, Medium, Low, Info + cvss_score: float + cvss_vector: str + description: str + affected_endpoint: str + impact: str + poc_request: str + poc_response: str + poc_payload: str + remediation: str + references: List[str] = field(default_factory=list) + cwe_id: str = "" + tool_output: str = "" + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +@dataclass +class ScanResult: + """Contains all scan results and findings""" + target: str + scan_started: str + scan_completed: str = "" + tools_executed: List[Dict] = field(default_factory=list) + vulnerabilities: List[Vulnerability] = field(default_factory=list) + open_ports: List[Dict] = 
field(default_factory=list) + technologies: List[str] = field(default_factory=list) + raw_outputs: Dict[str, str] = field(default_factory=dict) + + +class PentestExecutor: + """Executes real pentest tools and captures outputs""" + + def __init__(self, target: str, config: Dict = None): + self.target = self._normalize_target(target) + self.config = config or {} + self.scan_result = ScanResult( + target=self.target, + scan_started=datetime.now().isoformat() + ) + self.timeout = 300 # 5 minutes default timeout + + def _normalize_target(self, target: str) -> str: + """Normalize target URL/IP""" + target = target.strip() + if not target.startswith(('http://', 'https://')): + # Check if it's an IP + try: + socket.inet_aton(target.split('/')[0].split(':')[0]) + return target # It's an IP + except socket.error: + # Assume it's a domain + return f"https://{target}" + return target + + def _get_domain(self) -> str: + """Extract domain from target""" + parsed = urllib.parse.urlparse(self.target) + return parsed.netloc or parsed.path.split('/')[0] + + def _get_ip(self) -> Optional[str]: + """Resolve target to IP""" + try: + domain = self._get_domain() + return socket.gethostbyname(domain.split(':')[0]) + except socket.error: + return None + + def _run_command(self, cmd: List[str], timeout: int = None) -> Dict: + """Run a command and capture output""" + timeout = timeout or self.timeout + tool_name = cmd[0] if cmd else "unknown" + + result = { + "tool": tool_name, + "command": " ".join(cmd), + "success": False, + "stdout": "", + "stderr": "", + "exit_code": -1, + "timestamp": datetime.now().isoformat() + } + + # Check if tool exists + if not shutil.which(cmd[0]): + result["stderr"] = f"Tool '{cmd[0]}' not found. Please install it using 'install_tools' command." + logger.warning(f"Tool not found: {cmd[0]}") + return result + + try: + print(f"[*] Executing: {' '.join(cmd)}") + logger.info(f"Executing: {' '.join(cmd)}") + + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + + result["stdout"] = proc.stdout + result["stderr"] = proc.stderr + result["exit_code"] = proc.returncode + result["success"] = proc.returncode == 0 + + except subprocess.TimeoutExpired: + result["stderr"] = f"Command timed out after {timeout} seconds" + logger.warning(f"Timeout: {' '.join(cmd)}") + except Exception as e: + result["stderr"] = str(e) + logger.error(f"Error executing {cmd[0]}: {e}") + + self.scan_result.tools_executed.append(result) + self.scan_result.raw_outputs[tool_name] = result["stdout"] + return result + + def run_nmap_scan(self, ports: str = "1-1000", extra_args: List[str] = None) -> Dict: + """Run nmap port scan""" + domain = self._get_domain() + cmd = ["nmap", "-sV", "-sC", "-p", ports, "--open", domain] + if extra_args: + cmd.extend(extra_args) + + result = self._run_command(cmd) + + if result["success"]: + self._parse_nmap_output(result["stdout"]) + + return result + + def _parse_nmap_output(self, output: str): + """Parse nmap output for open ports""" + port_pattern = r"(\d+)/(\w+)\s+open\s+(\S+)\s*(.*)" + for match in re.finditer(port_pattern, output): + port_info = { + "port": int(match.group(1)), + "protocol": match.group(2), + "service": match.group(3), + "version": match.group(4).strip() + } + self.scan_result.open_ports.append(port_info) + print(f" [+] Found: {port_info['port']}/{port_info['protocol']} - {port_info['service']} {port_info['version']}") + + def run_nikto_scan(self) -> Dict: + """Run nikto web vulnerability scan""" + cmd = ["nikto", "-h", self.target, 
"-Format", "txt", "-nointeractive"] + result = self._run_command(cmd, timeout=600) + + if result["success"] or result["stdout"]: + self._parse_nikto_output(result["stdout"]) + + return result + + def _parse_nikto_output(self, output: str): + """Parse nikto output for vulnerabilities""" + vuln_patterns = [ + (r"OSVDB-\d+:.*", "Medium"), + (r"\+ (/[^\s]+).*SQL injection", "High"), + (r"\+ (/[^\s]+).*XSS", "High"), + (r"\+ The X-XSS-Protection header", "Low"), + (r"\+ The X-Content-Type-Options header", "Low"), + (r"\+ Server leaks", "Medium"), + (r"\+ Retrieved x-powered-by header", "Info"), + ] + + for line in output.split('\n'): + for pattern, severity in vuln_patterns: + if re.search(pattern, line, re.IGNORECASE): + vuln = Vulnerability( + title=line.strip()[:100], + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector="", + description=line.strip(), + affected_endpoint=self.target, + impact=f"{severity} severity finding detected by Nikto", + poc_request=f"GET {self.target} HTTP/1.1", + poc_response="See tool output", + poc_payload="N/A - Passive scan", + remediation="Review and fix the identified issue", + tool_output=line + ) + self.scan_result.vulnerabilities.append(vuln) + + def run_nuclei_scan(self, templates: str = None) -> Dict: + """Run nuclei vulnerability scan""" + cmd = ["nuclei", "-u", self.target, "-silent", "-nc", "-j"] + if templates: + cmd.extend(["-t", templates]) + + result = self._run_command(cmd, timeout=600) + + if result["stdout"]: + self._parse_nuclei_output(result["stdout"]) + + return result + + def _parse_nuclei_output(self, output: str): + """Parse nuclei JSON output for vulnerabilities""" + for line in output.strip().split('\n'): + if not line.strip(): + continue + try: + finding = json.loads(line) + severity = finding.get("info", {}).get("severity", "unknown").capitalize() + + vuln = Vulnerability( + title=finding.get("info", {}).get("name", "Unknown"), + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector=finding.get("info", {}).get("classification", {}).get("cvss-metrics", ""), + description=finding.get("info", {}).get("description", ""), + affected_endpoint=finding.get("matched-at", self.target), + impact=finding.get("info", {}).get("impact", f"{severity} severity vulnerability"), + poc_request=finding.get("curl-command", f"curl -X GET '{finding.get('matched-at', self.target)}'"), + poc_response=finding.get("response", "")[:500] if finding.get("response") else "See tool output", + poc_payload=finding.get("matcher-name", "Template-based detection"), + remediation=finding.get("info", {}).get("remediation", "Apply vendor patches"), + references=finding.get("info", {}).get("reference", []), + cwe_id=str(finding.get("info", {}).get("classification", {}).get("cwe-id", "")), + tool_output=json.dumps(finding, indent=2) + ) + self.scan_result.vulnerabilities.append(vuln) + print(f" [!] 
{severity}: {vuln.title} at {vuln.affected_endpoint}") + + except json.JSONDecodeError: + continue + + def run_sqlmap_scan(self, param: str = None) -> Dict: + """Run sqlmap SQL injection scan""" + cmd = ["sqlmap", "-u", self.target, "--batch", "--level=2", "--risk=2", + "--random-agent", "--threads=5", "--output-dir=/tmp/sqlmap_output"] + + if param: + cmd.extend(["--param", param]) + + result = self._run_command(cmd, timeout=600) + + if result["stdout"]: + self._parse_sqlmap_output(result["stdout"]) + + return result + + def _parse_sqlmap_output(self, output: str): + """Parse sqlmap output for SQL injection vulnerabilities""" + if "is vulnerable" in output.lower() or "injection" in output.lower(): + # Extract injection details + vuln_type = "Blind" if "blind" in output.lower() else "Error-based" + if "union" in output.lower(): + vuln_type = "UNION-based" + elif "time-based" in output.lower(): + vuln_type = "Time-based blind" + + # Extract payload + payload_match = re.search(r"Payload: (.+)", output) + payload = payload_match.group(1) if payload_match else "See tool output" + + vuln = Vulnerability( + title=f"SQL Injection ({vuln_type})", + severity="Critical", + cvss_score=9.8, + cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", + description=f"SQL Injection vulnerability detected. Type: {vuln_type}. This allows an attacker to manipulate database queries.", + affected_endpoint=self.target, + impact="Complete database compromise. Attacker can read, modify, or delete data. Potential for remote code execution.", + poc_request=f"GET {self.target}?param={payload} HTTP/1.1\nHost: {self._get_domain()}\nUser-Agent: Mozilla/5.0", + poc_response="Database error or data disclosure in response", + poc_payload=payload, + remediation="Use parameterized queries/prepared statements. Implement input validation. Apply least privilege to database accounts.", + cwe_id="CWE-89", + references=["https://owasp.org/www-community/attacks/SQL_Injection"], + tool_output=output[:2000] + ) + self.scan_result.vulnerabilities.append(vuln) + print(f" [!!!] 
CRITICAL: SQL Injection found!") + + def run_ffuf_scan(self, wordlist: str = "/usr/share/wordlists/dirb/common.txt") -> Dict: + """Run ffuf directory/file bruteforce""" + target_url = self.target.rstrip('/') + "/FUZZ" + cmd = ["ffuf", "-u", target_url, "-w", wordlist, "-mc", "200,301,302,403", + "-o", "/tmp/ffuf_output.json", "-of", "json", "-t", "50"] + + result = self._run_command(cmd, timeout=300) + + # Parse output file if exists + if os.path.exists("/tmp/ffuf_output.json"): + try: + with open("/tmp/ffuf_output.json", "r") as f: + ffuf_data = json.load(f) + for res in ffuf_data.get("results", []): + print(f" [+] Found: {res.get('url')} (Status: {res.get('status')})") + except: + pass + + return result + + def run_curl_test(self, method: str = "GET", path: str = "/", headers: Dict = None, data: str = None) -> Dict: + """Run curl request and capture full request/response""" + url = self.target.rstrip('/') + path + cmd = ["curl", "-v", "-s", "-k", "-X", method, url] + + if headers: + for k, v in headers.items(): + cmd.extend(["-H", f"{k}: {v}"]) + + if data: + cmd.extend(["-d", data]) + + result = self._run_command(cmd) + return result + + def run_http_security_check(self) -> Dict: + """Check HTTP security headers""" + cmd = ["curl", "-s", "-I", "-k", self.target] + result = self._run_command(cmd) + + if result["success"]: + self._parse_security_headers(result["stdout"]) + + return result + + def _parse_security_headers(self, headers: str): + """Parse response headers for security issues""" + required_headers = { + "X-Frame-Options": ("Missing X-Frame-Options", "Medium", "Clickjacking protection"), + "X-Content-Type-Options": ("Missing X-Content-Type-Options", "Low", "MIME type sniffing protection"), + "X-XSS-Protection": ("Missing X-XSS-Protection", "Low", "XSS filter"), + "Strict-Transport-Security": ("Missing HSTS Header", "Medium", "HTTPS enforcement"), + "Content-Security-Policy": ("Missing Content-Security-Policy", "Medium", "XSS/injection protection"), + } + + headers_lower = headers.lower() + + for header, (title, severity, desc) in required_headers.items(): + if header.lower() not in headers_lower: + vuln = Vulnerability( + title=title, + severity=severity, + cvss_score=self._severity_to_cvss(severity), + cvss_vector="", + description=f"The {header} header is not set. 
This header provides {desc}.", + affected_endpoint=self.target, + impact=f"Missing {desc} could lead to attacks", + poc_request=f"curl -I {self.target}", + poc_response=headers[:500], + poc_payload="N/A - Header check", + remediation=f"Add the {header} header to all HTTP responses", + cwe_id="CWE-693" + ) + self.scan_result.vulnerabilities.append(vuln) + + def run_whatweb_scan(self) -> Dict: + """Run whatweb technology detection""" + cmd = ["whatweb", "-a", "3", "--color=never", self.target] + result = self._run_command(cmd) + + if result["stdout"]: + # Extract technologies + techs = re.findall(r'\[([^\]]+)\]', result["stdout"]) + self.scan_result.technologies.extend(techs[:20]) + print(f" [+] Technologies: {', '.join(techs[:10])}") + + return result + + def _severity_to_cvss(self, severity: str) -> float: + """Convert severity to CVSS score""" + mapping = { + "critical": 9.5, + "high": 7.5, + "medium": 5.5, + "low": 3.0, + "info": 0.0, + "unknown": 0.0 + } + return mapping.get(severity.lower(), 0.0) + + def run_full_scan(self) -> ScanResult: + """Run a complete pentest scan""" + print(f"\n{'='*60}") + print(f"[*] Starting Full Pentest Scan on: {self.target}") + print(f"{'='*60}\n") + + # Phase 1: Reconnaissance + print("[Phase 1] Reconnaissance") + print("-" * 40) + + print("[*] Running port scan...") + self.run_nmap_scan() + + print("\n[*] Running technology detection...") + self.run_whatweb_scan() + + print("\n[*] Checking security headers...") + self.run_http_security_check() + + # Phase 2: Vulnerability Scanning + print(f"\n[Phase 2] Vulnerability Scanning") + print("-" * 40) + + print("[*] Running Nuclei scan...") + self.run_nuclei_scan() + + print("\n[*] Running Nikto scan...") + self.run_nikto_scan() + + # Phase 3: Specific Tests + print(f"\n[Phase 3] Specific Vulnerability Tests") + print("-" * 40) + + print("[*] Testing for SQL Injection...") + self.run_sqlmap_scan() + + print("\n[*] Running directory enumeration...") + self.run_ffuf_scan() + + # Complete scan + self.scan_result.scan_completed = datetime.now().isoformat() + + print(f"\n{'='*60}") + print(f"[*] Scan Complete!") + print(f" - Tools Executed: {len(self.scan_result.tools_executed)}") + print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}") + print(f" - Open Ports: {len(self.scan_result.open_ports)}") + print(f"{'='*60}\n") + + return self.scan_result + + def run_quick_scan(self) -> ScanResult: + """Run a quick scan with essential tools only""" + print(f"\n{'='*60}") + print(f"[*] Starting Quick Scan on: {self.target}") + print(f"{'='*60}\n") + + print("[*] Running port scan (top 100 ports)...") + self.run_nmap_scan(ports="1-100") + + print("\n[*] Checking security headers...") + self.run_http_security_check() + + print("\n[*] Running Nuclei scan...") + self.run_nuclei_scan() + + self.scan_result.scan_completed = datetime.now().isoformat() + + print(f"\n{'='*60}") + print(f"[*] Quick Scan Complete!") + print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}") + print(f"{'='*60}\n") + + return self.scan_result + + def get_findings_summary(self) -> Dict: + """Get summary of findings""" + severity_count = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Info": 0} + + for vuln in self.scan_result.vulnerabilities: + sev = vuln.severity.capitalize() + if sev in severity_count: + severity_count[sev] += 1 + + return { + "target": self.target, + "total_vulnerabilities": len(self.scan_result.vulnerabilities), + "severity_breakdown": severity_count, + "open_ports": 
len(self.scan_result.open_ports), + "technologies": self.scan_result.technologies, + "tools_executed": len(self.scan_result.tools_executed) + } + + def to_dict(self) -> Dict: + """Convert scan results to dictionary""" + return { + "target": self.scan_result.target, + "scan_started": self.scan_result.scan_started, + "scan_completed": self.scan_result.scan_completed, + "tools_executed": self.scan_result.tools_executed, + "vulnerabilities": [asdict(v) for v in self.scan_result.vulnerabilities], + "open_ports": self.scan_result.open_ports, + "technologies": self.scan_result.technologies, + "summary": self.get_findings_summary() + } diff --git a/core/report_generator.py b/core/report_generator.py new file mode 100644 index 0000000..4c2c3c1 --- /dev/null +++ b/core/report_generator.py @@ -0,0 +1,639 @@ +#!/usr/bin/env python3 +""" +Professional Pentest Report Generator +Generates detailed reports with PoCs, CVSS scores, requests/responses +""" + +import json +import os +from datetime import datetime +from typing import Dict, List, Any +import html +import logging + +logger = logging.getLogger(__name__) + + +class ReportGenerator: + """Generates professional penetration testing reports""" + + def __init__(self, scan_results: Dict, llm_analysis: str = ""): + self.scan_results = scan_results + self.llm_analysis = llm_analysis + self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + def _get_severity_color(self, severity: str) -> str: + """Get color for severity level""" + colors = { + "critical": "#dc3545", + "high": "#fd7e14", + "medium": "#ffc107", + "low": "#17a2b8", + "info": "#6c757d" + } + return colors.get(severity.lower(), "#6c757d") + + def _get_severity_badge(self, severity: str) -> str: + """Get HTML badge for severity""" + color = self._get_severity_color(severity) + return f'{severity.upper()}' + + def _escape_html(self, text: str) -> str: + """Escape HTML characters""" + if not text: + return "" + return html.escape(str(text)) + + def _format_code_block(self, code: str, language: str = "") -> str: + """Format code block with syntax highlighting""" + escaped = self._escape_html(code) + return f'
<pre><code class="{language}">{escaped}</code></pre>'
+
+ def generate_executive_summary(self) -> str:
+ """Generate executive summary section"""
+ summary = self.scan_results.get("summary", {})
+ severity = summary.get("severity_breakdown", {})
+
+ total = summary.get("total_vulnerabilities", 0)
+ critical = severity.get("Critical", 0)
+ high = severity.get("High", 0)
+ medium = severity.get("Medium", 0)
+ low = severity.get("Low", 0)
+
+ risk_level = "Critical" if critical > 0 else "High" if high > 0 else "Medium" if medium > 0 else "Low"
+
+ return f"""
+        <table>
+            <tr><td>Target:</td><td>{self._escape_html(self.scan_results.get('target', 'N/A'))}</td></tr>
+            <tr><td>Scan Started:</td><td>{self.scan_results.get('scan_started', 'N/A')}</td></tr>
+            <tr><td>Scan Completed:</td><td>{self.scan_results.get('scan_completed', 'N/A')}</td></tr>
+            <tr><td>Overall Risk Level:</td><td>{self._get_severity_badge(risk_level)}</td></tr>
+        </table>
+        <p>Total Vulnerabilities: {total}</p>
+        <p>Open Ports Found: {summary.get('open_ports', 0)}</p>
+        <p>Tools Executed: {summary.get('tools_executed', 0)}</p>
+{self._escape_html(vuln.get('description', 'No description available'))}
+{self._escape_html(vuln.get('affected_endpoint', 'N/A'))}
+ {self._escape_html(vuln.get('impact', 'Impact not assessed'))}
+{self._escape_html(vuln.get('cvss_vector', 'N/A'))}
+ {self._escape_html(vuln.get('remediation', 'Consult vendor documentation for patches'))}
+        <tr><th>Port</th><th>Protocol</th><th>Service</th><th>Version</th></tr>
+        <tr><th>Tool</th><th>Command</th><th>Status</th><th>Timestamp</th></tr>
+        <td>{self._escape_html(tool.get('command', 'N/A')[:100])}</td>
+        <h1>Penetration Test Report</h1>
+        <p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
+        <p>No vulnerabilities found during the assessment.</p>
'} +
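
For reference, a minimal usage sketch (not part of the patch) showing how the modules added in this diff could be wired together. It assumes the core/ package layout from the diffstat and uses only constructors and methods visible above (PentestExecutor, run_quick_scan, to_dict, ReportGenerator, generate_executive_summary); the file name and target URL are illustrative placeholders.

# usage_sketch.py -- illustrative only; not part of this patch
from core.pentest_executor import PentestExecutor
from core.report_generator import ReportGenerator

def quick_assessment(target: str) -> str:
    """Run the quick scan and return the executive-summary HTML fragment."""
    executor = PentestExecutor(target)           # target is normalized to a URL or IP
    executor.run_quick_scan()                    # port scan, security headers, nuclei
    results = executor.to_dict()                 # vulnerabilities, open ports, summary
    report = ReportGenerator(results)            # llm_analysis argument is optional
    return report.generate_executive_summary()

if __name__ == "__main__":
    print(quick_assessment("https://example.com"))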