#!/usr/bin/env python3
"""
Pentest Executor - Executes real pentest tools and captures outputs for PoC generation
"""

import subprocess
import shutil
import json
import re
import os
import logging
import socket
import urllib.parse
from typing import Dict, List, Optional, Any
from datetime import datetime
from dataclasses import dataclass, field, asdict

logger = logging.getLogger(__name__)

# Ordering used to pick the most severe label when one line matches several rules.
_SEVERITY_RANK = {"Info": 0, "Low": 1, "Medium": 2, "High": 3, "Critical": 4}

# Phrases sqlmap prints only once an injection point has actually been confirmed.
_SQLMAP_CONFIRMED_MARKERS = (
    "is vulnerable",
    "identified the following injection point",
)


@dataclass
class Vulnerability:
    """Represents a discovered vulnerability with PoC"""
    title: str
    severity: str  # Critical, High, Medium, Low, Info
    cvss_score: float
    cvss_vector: str
    description: str
    affected_endpoint: str
    impact: str
    poc_request: str
    poc_response: str
    poc_payload: str
    remediation: str
    references: List[str] = field(default_factory=list)
    cwe_id: str = ""
    tool_output: str = ""
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())


@dataclass
class ScanResult:
    """Contains all scan results and findings"""
    target: str
    scan_started: str
    scan_completed: str = ""
    tools_executed: List[Dict] = field(default_factory=list)
    vulnerabilities: List[Vulnerability] = field(default_factory=list)
    open_ports: List[Dict] = field(default_factory=list)
    technologies: List[str] = field(default_factory=list)
    raw_outputs: Dict[str, str] = field(default_factory=dict)


class PentestExecutor:
    """Executes real pentest tools and captures outputs"""

    def __init__(self, target: str, config: Optional[Dict] = None,
                 recon_context: Optional[Dict] = None):
        """Create an executor for *target*.

        Args:
            target: URL, domain or IP address to scan.
            config: Optional settings dictionary (stored as-is).
            recon_context: Optional consolidated recon context; when given,
                its technologies, open ports and vulnerabilities pre-populate
                the scan result.
        """
        self.target = self._normalize_target(target)
        self.config = config or {}
        self.recon_context = recon_context  # Consolidated recon context
        self.scan_result = ScanResult(
            target=self.target,
            scan_started=datetime.now().isoformat()
        )
        self.timeout = 300  # 5 minutes default timeout

        # If a recon context was supplied, pre-populate the results from it
        if self.recon_context:
            self._load_from_recon_context()

    def _load_from_recon_context(self):
        """Load technologies, open ports and known vulnerabilities from the recon context."""
        if not self.recon_context:
            return

        data = self.recon_context.get('data') or {}

        # Detected technologies (a null entry is treated as an empty list)
        techs = data.get('technologies') or []
        self.scan_result.technologies.extend(techs)

        # Open ports, skipping entries already recorded
        ports = data.get('open_ports') or []
        for port in ports:
            if port not in self.scan_result.open_ports:
                self.scan_result.open_ports.append(port)

        # Vulnerabilities already found during recon
        vulns = (self.recon_context.get('vulnerabilities') or {}).get('all') or []
        for v in vulns:
            # A missing or null severity falls back to "info" instead of crashing
            raw_severity = v.get('severity') or 'info'
            vuln = Vulnerability(
                title=v.get('title', v.get('name', 'Unknown')),
                severity=str(raw_severity).capitalize(),
                cvss_score=self._severity_to_cvss(raw_severity),
                cvss_vector="",
                description=v.get('description', ''),
                affected_endpoint=v.get('affected_endpoint', v.get('url', self.target)),
                impact=f"{raw_severity} severity vulnerability",
                poc_request=v.get('curl_command', ''),
                poc_response="",
                poc_payload="",
                remediation="Apply vendor patches and security best practices"
            )
            self.scan_result.vulnerabilities.append(vuln)

        logger.info(
            "Carregados do recon: %d techs, %d portas, %d vulns",
            len(techs), len(ports), len(vulns)
        )

    @classmethod
    def load_context_from_file(cls, context_file: str) -> Optional[Dict]:
        """Load a recon context from a JSON file.

        Returns:
            The parsed JSON content, or None when the file cannot be read or
            is not valid JSON (the error is logged).
        """
        try:
            with open(context_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError
            logger.error(f"Erro ao carregar contexto: {e}")
            return None

    def get_urls_with_params(self) -> List[str]:
        """Return parameterised URLs from the context, for injection testing."""
        if not self.recon_context:
            return []
        data = self.recon_context.get('data', {})
        urls = data.get('urls', {})
        if isinstance(urls, dict):
            return urls.get('with_params', [])
        return []

    def get_api_endpoints(self) -> List[str]:
        """Return the API endpoints listed in the context."""
        if not self.recon_context:
            return []
        data = self.recon_context.get('data', {})
        return data.get('api_endpoints', [])

    def get_interesting_paths(self) -> List[str]:
        """Return the interesting paths listed in the context."""
        if not self.recon_context:
            return []
        data = self.recon_context.get('data', {})
        return data.get('interesting_paths', [])

    def get_live_hosts(self) -> List[str]:
        """Return the live hosts listed in the context."""
        if not self.recon_context:
            return []
        data = self.recon_context.get('data', {})
        return data.get('live_hosts', [])

    def get_context_for_llm(self) -> str:
        """Return the recon context formatted as text for inclusion in an LLM prompt.

        Returns an empty string when no recon context is loaded. Each section
        is capped (10 technologies, 20 URLs, 10 endpoints, 10 vulnerabilities).
        """
        if not self.recon_context:
            return ""

        lines = [
            "=== CONTEXTO DE RECON CONSOLIDADO ===",
            f"Alvo: {self.recon_context.get('target', {}).get('primary_target', 'N/A')}",
            "",
            "SUPERFICIE DE ATAQUE:",
        ]

        attack_surface = self.recon_context.get('attack_surface', {})
        for key, value in attack_surface.items():
            lines.append(f" - {key}: {value}")

        lines.append("\nTECNOLOGIAS DETECTADAS:")
        for tech in self.scan_result.technologies[:10]:
            lines.append(f" - {tech}")

        lines.append("\nURLs COM PARAMETROS (para testes de injecao):")
        for url in self.get_urls_with_params()[:20]:
            lines.append(f" - {url}")

        lines.append("\nENDPOINTS DE API:")
        for ep in self.get_api_endpoints()[:10]:
            lines.append(f" - {ep}")

        lines.append("\nVULNERABILIDADES JA ENCONTRADAS:")
        for vuln in self.scan_result.vulnerabilities[:10]:
            lines.append(f" - [{vuln.severity}] {vuln.title}")

        return "\n".join(lines)

    def _normalize_target(self, target: str) -> str:
        """Normalize target URL/IP.

        Targets that already carry an http(s) scheme are returned stripped.
        A bare IPv4 address (optionally followed by ``:port`` or ``/path``) is
        returned unchanged; anything else is assumed to be a domain and gets
        an ``https://`` prefix.
        """
        target = target.strip()
        if not target.startswith(('http://', 'https://')):
            # Check if it's an IP
            try:
                socket.inet_aton(target.split('/')[0].split(':')[0])
                return target  # It's an IP
            except socket.error:
                # Assume it's a domain
                return f"https://{target}"
        return target

    def _parse_target(self) -> urllib.parse.ParseResult:
        """Parse the target, treating scheme-less targets as network locations."""
        target = self.target
        if "://" not in target:
            # Without the "//" prefix urlparse may read "1.2.3.4:80" as
            # scheme + path depending on the Python version.
            target = "//" + target
        return urllib.parse.urlparse(target)

    def _get_domain(self) -> str:
        """Extract the network location (host, plus port if any) from the target."""
        parsed = self._parse_target()
        return parsed.netloc or parsed.path.split('/')[0]

    def _get_host(self) -> str:
        """Extract the bare host (no scheme, port or path) from the target."""
        parsed = self._parse_target()
        if parsed.hostname:
            return parsed.hostname
        return self._get_domain().split(':')[0]

    def _get_ip(self) -> Optional[str]:
        """Resolve target to IP; returns None when resolution fails."""
        try:
            domain = self._get_domain()
            return socket.gethostbyname(domain.split(':')[0])
        except socket.error:
            return None

    def _run_command(self, cmd: List[str], timeout: Optional[int] = None) -> Dict:
        """Run a command and capture output.

        Args:
            cmd: Argument vector; ``cmd[0]`` is the tool name.
            timeout: Seconds before the process is killed; defaults to
                ``self.timeout``.

        Returns:
            A dict with keys ``tool``, ``command``, ``success``, ``stdout``,
            ``stderr``, ``exit_code`` and ``timestamp``. Executed commands are
            also appended to ``scan_result.tools_executed`` and their stdout
            stored in ``scan_result.raw_outputs``; an empty command or a
            missing tool is returned without being recorded.
        """
        timeout = timeout or self.timeout
        tool_name = cmd[0] if cmd else "unknown"
        result = {
            "tool": tool_name,
            "command": " ".join(cmd),
            "success": False,
            "stdout": "",
            "stderr": "",
            "exit_code": -1,
            "timestamp": datetime.now().isoformat()
        }

        # An empty argument vector cannot be executed (and has no cmd[0])
        if not cmd:
            result["stderr"] = "No command specified"
            logger.warning("Empty command passed to _run_command")
            return result

        # Check if tool exists
        if not shutil.which(tool_name):
            result["stderr"] = f"Tool '{tool_name}' not found. Please install it using 'install_tools' command."
            logger.warning(f"Tool not found: {tool_name}")
            return result

        try:
            print(f"[*] Executing: {' '.join(cmd)}")
            logger.info(f"Executing: {' '.join(cmd)}")
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            result["stdout"] = proc.stdout
            result["stderr"] = proc.stderr
            result["exit_code"] = proc.returncode
            result["success"] = proc.returncode == 0
        except subprocess.TimeoutExpired:
            result["stderr"] = f"Command timed out after {timeout} seconds"
            logger.warning(f"Timeout: {' '.join(cmd)}")
        except Exception as e:
            # Best-effort boundary: one failing tool must not abort the scan
            result["stderr"] = str(e)
            logger.error(f"Error executing {tool_name}: {e}")

        self.scan_result.tools_executed.append(result)
        self.scan_result.raw_outputs[tool_name] = result["stdout"]
        return result

    def run_nmap_scan(self, ports: str = "1-1000", extra_args: Optional[List[str]] = None) -> Dict:
        """Run nmap port scan against the target host.

        Args:
            ports: Port specification passed to ``-p``.
            extra_args: Additional nmap arguments appended to the command.
        """
        # nmap expects a bare host; a "host:port" netloc is not a valid target
        host = self._get_host()
        cmd = ["nmap", "-sV", "-sC", "-p", ports, "--open", host]
        if extra_args:
            cmd.extend(extra_args)
        result = self._run_command(cmd)
        if result["success"]:
            self._parse_nmap_output(result["stdout"])
        return result

    def _parse_nmap_output(self, output: str):
        """Parse nmap output for open ports and record new ones in the scan result."""
        # Only horizontal whitespace between columns: "\s" would cross the
        # newline after a version-less service and swallow the following line.
        port_pattern = r"(\d+)/(\w+)[ \t]+open[ \t]+(\S+)[ \t]*(.*)"
        for match in re.finditer(port_pattern, output):
            port_info = {
                "port": int(match.group(1)),
                "protocol": match.group(2),
                "service": match.group(3),
                "version": match.group(4).strip()
            }
            # Skip entries already known (e.g. from a previous run)
            if port_info in self.scan_result.open_ports:
                continue
            self.scan_result.open_ports.append(port_info)
            print(f" [+] Found: {port_info['port']}/{port_info['protocol']} - {port_info['service']} {port_info['version']}")

    def run_nikto_scan(self) -> Dict:
        """Run nikto web vulnerability scan"""
        cmd = ["nikto", "-h", self.target, "-Format", "txt", "-nointeractive"]
        result = self._run_command(cmd, timeout=600)
        if result["success"] or result["stdout"]:
            self._parse_nikto_output(result["stdout"])
        return result

    def _parse_nikto_output(self, output: str):
        """Parse nikto output for vulnerabilities.

        Each output line yields at most one finding; when several patterns
        match the same line, the highest severity among them is used.
        """
        vuln_patterns = [
            (r"OSVDB-\d+:.*", "Medium"),
            (r"\+ (/[^\s]+).*SQL injection", "High"),
            (r"\+ (/[^\s]+).*XSS", "High"),
            (r"\+ The X-XSS-Protection header", "Low"),
            (r"\+ The X-Content-Type-Options header", "Low"),
            (r"\+ Server leaks", "Medium"),
            (r"\+ Retrieved x-powered-by header", "Info"),
        ]
        for line in output.split('\n'):
            matched = [
                severity
                for pattern, severity in vuln_patterns
                if re.search(pattern, line, re.IGNORECASE)
            ]
            if not matched:
                continue
            severity = max(matched, key=lambda s: _SEVERITY_RANK.get(s, 0))
            vuln = Vulnerability(
                title=line.strip()[:100],
                severity=severity,
                cvss_score=self._severity_to_cvss(severity),
                cvss_vector="",
                description=line.strip(),
                affected_endpoint=self.target,
                impact=f"{severity} severity finding detected by Nikto",
                poc_request=f"GET {self.target} HTTP/1.1",
                poc_response="See tool output",
                poc_payload="N/A - Passive scan",
                remediation="Review and fix the identified issue",
                tool_output=line
            )
            self.scan_result.vulnerabilities.append(vuln)

    def run_nuclei_scan(self, templates: Optional[str] = None) -> Dict:
        """Run nuclei vulnerability scan.

        Args:
            templates: Optional template path passed to ``-t``.
        """
        cmd = ["nuclei", "-u", self.target, "-silent", "-nc", "-j"]
        if templates:
            cmd.extend(["-t", templates])
        result = self._run_command(cmd, timeout=600)
        if result["stdout"]:
            self._parse_nuclei_output(result["stdout"])
        return result

    def _parse_nuclei_output(self, output: str):
        """Parse nuclei JSON-lines output for vulnerabilities.

        Lines that are not JSON objects are skipped. Null ``info`` fields are
        treated as absent so one incomplete finding cannot abort parsing.
        """
        for line in output.strip().split('\n'):
            if not line.strip():
                continue
            try:
                finding = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(finding, dict):
                continue

            info = finding.get("info") or {}
            classification = info.get("classification") or {}
            severity = str(info.get("severity") or "unknown").capitalize()
            matched_at = finding.get("matched-at", self.target)

            # "reference" may be null, a single string, or a list
            references = info.get("reference") or []
            if isinstance(references, str):
                references = [references]

            # "cwe-id" may be null, a single value, or a list of ids
            cwe = classification.get("cwe-id") or ""
            if isinstance(cwe, (list, tuple)):
                cwe = ", ".join(str(item) for item in cwe)

            response = finding.get("response")

            vuln = Vulnerability(
                title=info.get("name", "Unknown"),
                severity=severity,
                cvss_score=self._severity_to_cvss(severity),
                cvss_vector=classification.get("cvss-metrics") or "",
                description=info.get("description") or "",
                affected_endpoint=matched_at,
                impact=info.get("impact", f"{severity} severity vulnerability"),
                poc_request=finding.get("curl-command", f"curl -X GET '{matched_at}'"),
                poc_response=response[:500] if response else "See tool output",
                poc_payload=finding.get("matcher-name", "Template-based detection"),
                remediation=info.get("remediation", "Apply vendor patches"),
                references=list(references),
                cwe_id=str(cwe),
                tool_output=json.dumps(finding, indent=2)
            )
            self.scan_result.vulnerabilities.append(vuln)
            print(f" [!] {severity}: {vuln.title} at {vuln.affected_endpoint}")

    def run_sqlmap_scan(self, param: Optional[str] = None) -> Dict:
        """Run sqlmap SQL injection scan.

        Args:
            param: Optional name of the parameter to test (sqlmap ``-p``).
        """
        cmd = ["sqlmap", "-u", self.target, "--batch", "--level=2", "--risk=2",
               "--random-agent", "--threads=5", "--output-dir=/tmp/sqlmap_output"]
        if param:
            # "-p" selects the testable parameter; "--param" is not a sqlmap option
            cmd.extend(["-p", param])
        result = self._run_command(cmd, timeout=600)
        if result["stdout"]:
            self._parse_sqlmap_output(result["stdout"])
        return result

    def _parse_sqlmap_output(self, output: str):
        """Parse sqlmap output for SQL injection vulnerabilities.

        A finding is recorded only when sqlmap confirms an injection point;
        the progress log alone (which always mentions "injection") is ignored.
        """
        lowered = output.lower()
        if not any(marker in lowered for marker in _SQLMAP_CONFIRMED_MARKERS):
            return

        # Classify from the reported "Type:" lines when available; the test
        # log mentions every technique tried, not only the one that worked.
        reported_types = re.findall(r"^[ \t]*Type:[ \t]*(.+)$", output, re.MULTILINE)
        evidence = " ".join(reported_types).lower() if reported_types else lowered

        vuln_type = "Blind" if "blind" in evidence else "Error-based"
        if "union" in evidence:
            vuln_type = "UNION-based"
        elif "time-based" in evidence:
            vuln_type = "Time-based blind"

        # Extract payload
        payload_match = re.search(r"Payload: (.+)", output)
        payload = payload_match.group(1) if payload_match else "See tool output"

        vuln = Vulnerability(
            title=f"SQL Injection ({vuln_type})",
            severity="Critical",
            cvss_score=9.8,
            cvss_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
            description=f"SQL Injection vulnerability detected. Type: {vuln_type}. This allows an attacker to manipulate database queries.",
            affected_endpoint=self.target,
            impact="Complete database compromise. Attacker can read, modify, or delete data. Potential for remote code execution.",
            poc_request=f"GET {self.target}?param={payload} HTTP/1.1\nHost: {self._get_domain()}\nUser-Agent: Mozilla/5.0",
            poc_response="Database error or data disclosure in response",
            poc_payload=payload,
            remediation="Use parameterized queries/prepared statements. Implement input validation. Apply least privilege to database accounts.",
            cwe_id="CWE-89",
            references=["https://owasp.org/www-community/attacks/SQL_Injection"],
            tool_output=output[:2000]
        )
        self.scan_result.vulnerabilities.append(vuln)
        print(" [!!!] CRITICAL: SQL Injection found!")

    def run_ffuf_scan(self, wordlist: str = "/usr/share/wordlists/dirb/common.txt") -> Dict:
        """Run ffuf directory/file bruteforce and print the discovered URLs.

        Args:
            wordlist: Path of the wordlist passed to ``-w``.
        """
        output_file = "/tmp/ffuf_output.json"
        target_url = self.target.rstrip('/') + "/FUZZ"
        cmd = ["ffuf", "-u", target_url, "-w", wordlist, "-mc", "200,301,302,403",
               "-o", output_file, "-of", "json", "-t", "50"]

        # Drop output left by an earlier run so stale results are never reported
        try:
            os.remove(output_file)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning("Could not remove stale ffuf output %s: %s", output_file, e)

        result = self._run_command(cmd, timeout=300)

        # Parse output file if exists
        try:
            with open(output_file, "r", encoding="utf-8") as f:
                ffuf_data = json.load(f)
        except FileNotFoundError:
            return result
        except (OSError, ValueError) as e:
            logger.warning("Could not read ffuf output %s: %s", output_file, e)
            return result

        results = ffuf_data.get("results") if isinstance(ffuf_data, dict) else None
        for res in results or []:
            print(f" [+] Found: {res.get('url')} (Status: {res.get('status')})")
        return result

    def run_curl_test(self, method: str = "GET", path: str = "/",
                      headers: Optional[Dict] = None, data: Optional[str] = None) -> Dict:
        """Run curl request and capture full request/response.

        Args:
            method: HTTP method passed to ``-X``.
            path: Path appended to the target.
            headers: Optional header name/value pairs, each sent with ``-H``.
            data: Optional request body sent with ``-d``.
        """
        url = self.target.rstrip('/') + path
        cmd = ["curl", "-v", "-s", "-k", "-X", method, url]
        if headers:
            for k, v in headers.items():
                cmd.extend(["-H", f"{k}: {v}"])
        if data:
            cmd.extend(["-d", data])
        result = self._run_command(cmd)
        return result

    def run_http_security_check(self) -> Dict:
        """Check HTTP security headers"""
        cmd = ["curl", "-s", "-I", "-k", self.target]
        result = self._run_command(cmd)
        if result["success"]:
            self._parse_security_headers(result["stdout"])
        return result

    def _parse_security_headers(self, headers: str):
        """Parse response headers for security issues.

        Records one finding per required security header whose name does not
        appear among the response header names.
        """
        required_headers = {
            "X-Frame-Options": ("Missing X-Frame-Options", "Medium", "Clickjacking protection"),
            "X-Content-Type-Options": ("Missing X-Content-Type-Options", "Low", "MIME type sniffing protection"),
            "X-XSS-Protection": ("Missing X-XSS-Protection", "Low", "XSS filter"),
            "Strict-Transport-Security": ("Missing HSTS Header", "Medium", "HTTPS enforcement"),
            "Content-Security-Policy": ("Missing Content-Security-Policy", "Medium", "XSS/injection protection"),
        }
        # Compare exact header names: a substring search over the raw text
        # would accept e.g. "Content-Security-Policy-Report-Only" or a value
        # that merely mentions the header name.
        present = {
            line.split(':', 1)[0].strip().lower()
            for line in headers.splitlines()
            if ':' in line
        }
        for header, (title, severity, desc) in required_headers.items():
            if header.lower() in present:
                continue
            vuln = Vulnerability(
                title=title,
                severity=severity,
                cvss_score=self._severity_to_cvss(severity),
                cvss_vector="",
                description=f"The {header} header is not set. This header provides {desc}.",
                affected_endpoint=self.target,
                impact=f"Missing {desc} could lead to attacks",
                poc_request=f"curl -I {self.target}",
                poc_response=headers[:500],
                poc_payload="N/A - Header check",
                remediation=f"Add the {header} header to all HTTP responses",
                cwe_id="CWE-693"
            )
            self.scan_result.vulnerabilities.append(vuln)

    def run_whatweb_scan(self) -> Dict:
        """Run whatweb technology detection"""
        cmd = ["whatweb", "-a", "3", "--color=never", self.target]
        result = self._run_command(cmd)
        if result["stdout"]:
            # Extract technologies (bracketed tokens in the whatweb output)
            techs = re.findall(r'\[([^\]]+)\]', result["stdout"])
            self.scan_result.technologies.extend(techs[:20])
            print(f" [+] Technologies: {', '.join(techs[:10])}")
        return result

    def _severity_to_cvss(self, severity: str) -> float:
        """Convert severity label to a representative CVSS score.

        The lookup is case-insensitive; unknown, empty or None labels map to 0.0.
        """
        mapping = {
            "critical": 9.5,
            "high": 7.5,
            "medium": 5.5,
            "low": 3.0,
            "info": 0.0,
            "unknown": 0.0
        }
        return mapping.get(str(severity or "").lower(), 0.0)

    def run_full_scan(self) -> ScanResult:
        """Run a complete pentest scan (recon, vulnerability scanning, specific tests)."""
        print(f"\n{'='*60}")
        print(f"[*] Starting Full Pentest Scan on: {self.target}")
        print(f"{'='*60}\n")

        # Phase 1: Reconnaissance
        print("[Phase 1] Reconnaissance")
        print("-" * 40)
        print("[*] Running port scan...")
        self.run_nmap_scan()
        print("\n[*] Running technology detection...")
        self.run_whatweb_scan()
        print("\n[*] Checking security headers...")
        self.run_http_security_check()

        # Phase 2: Vulnerability Scanning
        print("\n[Phase 2] Vulnerability Scanning")
        print("-" * 40)
        print("[*] Running Nuclei scan...")
        self.run_nuclei_scan()
        print("\n[*] Running Nikto scan...")
        self.run_nikto_scan()

        # Phase 3: Specific Tests
        print("\n[Phase 3] Specific Vulnerability Tests")
        print("-" * 40)
        print("[*] Testing for SQL Injection...")
        self.run_sqlmap_scan()
        print("\n[*] Running directory enumeration...")
        self.run_ffuf_scan()

        # Complete scan
        self.scan_result.scan_completed = datetime.now().isoformat()

        print(f"\n{'='*60}")
        print("[*] Scan Complete!")
        print(f" - Tools Executed: {len(self.scan_result.tools_executed)}")
        print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}")
        print(f" - Open Ports: {len(self.scan_result.open_ports)}")
        print(f"{'='*60}\n")
        return self.scan_result

    def run_quick_scan(self) -> ScanResult:
        """Run a quick scan with essential tools only"""
        print(f"\n{'='*60}")
        print(f"[*] Starting Quick Scan on: {self.target}")
        print(f"{'='*60}\n")

        print("[*] Running port scan (top 100 ports)...")
        self.run_nmap_scan(ports="1-100")
        print("\n[*] Checking security headers...")
        self.run_http_security_check()
        print("\n[*] Running Nuclei scan...")
        self.run_nuclei_scan()

        self.scan_result.scan_completed = datetime.now().isoformat()

        print(f"\n{'='*60}")
        print("[*] Quick Scan Complete!")
        print(f" - Vulnerabilities Found: {len(self.scan_result.vulnerabilities)}")
        print(f"{'='*60}\n")
        return self.scan_result

    def get_findings_summary(self) -> Dict:
        """Get summary of findings.

        Severities outside Critical/High/Medium/Low/Info are counted in the
        total but not in the per-severity breakdown.
        """
        severity_count = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Info": 0}
        for vuln in self.scan_result.vulnerabilities:
            sev = vuln.severity.capitalize()
            if sev in severity_count:
                severity_count[sev] += 1
        return {
            "target": self.target,
            "total_vulnerabilities": len(self.scan_result.vulnerabilities),
            "severity_breakdown": severity_count,
            "open_ports": len(self.scan_result.open_ports),
            "technologies": self.scan_result.technologies,
            "tools_executed": len(self.scan_result.tools_executed)
        }

    def to_dict(self) -> Dict:
        """Convert scan results to dictionary"""
        return {
            "target": self.scan_result.target,
            "scan_started": self.scan_result.scan_started,
            "scan_completed": self.scan_result.scan_completed,
            "tools_executed": self.scan_result.tools_executed,
            "vulnerabilities": [asdict(v) for v in self.scan_result.vulnerabilities],
            "open_ports": self.scan_result.open_ports,
            "technologies": self.scan_result.technologies,
            "summary": self.get_findings_summary()
        }