""" NeuroSploit v3 - XBOW-Inspired Response Verification Framework Multi-signal verification system that confirms vulnerabilities through 4 independent signals, reducing false positives dramatically. Inspired by XBOW benchmark methodology: - Binary verification (flag-based in CTF, evidence-based here) - Health checks before testing - Baseline diffing for behavioral anomaly detection - Multi-signal confirmation (2+ signals = confirmed without AI) """ import re import hashlib from typing import Dict, List, Optional, Tuple, Any # --------------------------------------------------------------------------- # Error / indicator patterns used across multiple checkers # --------------------------------------------------------------------------- DB_ERROR_PATTERNS = [ r"(?:sql|database|query)\s*(?:error|syntax|exception)", r"mysql_(?:fetch|query|num_rows|connect)", r"mysqli_", r"pg_(?:query|exec|prepare|connect)", r"sqlite3?\.\w+error", r"ora-\d{4,5}", r"mssql_query", r"sqlstate\[", r"odbc\s+driver", r"jdbc\s+exception", r"unclosed\s+quotation", r"you have an error in your sql", r"syntax error.*at line \d+", ] TEMPLATE_ERROR_PATTERNS = [ r"jinja2\.exceptions\.\w+", r"mako\.exceptions\.\w+", r"twig.*error", r"freemarker.*error", r"smarty.*error", r"django\.template\.\w+", r"template syntax error", ] FILE_CONTENT_MARKERS = [ "root:x:0:0:", "daemon:x:1:1:", "bin:x:2:2:", "www-data:", "[boot loader]", "[operating systems]", "[extensions]", ] SSTI_EVALUATIONS = { "7*7": "49", "7*'7'": "7777777", "3*3": "9", } COMMAND_OUTPUT_MARKERS = [ r"uid=\d+\(", r"gid=\d+\(", r"root:\w+:0:0:", r"/bin/(?:ba)?sh", r"Linux\s+\S+\s+\d+\.\d+", r"total\s+\d+\s*\n", ] NOSQL_ERROR_PATTERNS = [ r"MongoError", r"mongo.*(?:syntax|parse|query).*error", r"\$(?:gt|lt|ne|in|nin|regex|where|exists)\b", r"CastError.*ObjectId", r"BSONTypeError", r"operator.*\$(?:gt|lt|ne|regex)", ] LDAP_ERROR_PATTERNS = [ r"javax\.naming\.(?:directory\.)?InvalidSearchFilterException", r"Bad search filter", r"ldap_search.*error", r"invalid.*(?:dn|distinguished name|ldap filter)", r"unbalanced.*parenthes[ei]s", r"NamingException", ] XPATH_ERROR_PATTERNS = [ r"XPathException", r"Invalid XPath", r"xmlXPathEval.*error", r"DOMXPath.*(?:evaluate|query).*error", r"SimpleXMLElement.*xpath", r"unterminated.*(?:string|expression).*xpath", r"XPATH syntax error", ] GRAPHQL_ERROR_PATTERNS = [ r'"errors"\s*:\s*\[', r"Syntax Error.*GraphQL", r"Cannot query field", r"Unknown argument", r"Expected Name", r"graphql.*parse.*error", ] DESERIALIZATION_ERROR_PATTERNS = [ r"java\.io\.(?:InvalidClass|StreamCorrupted)Exception", r"ClassNotFoundException", r"unserialize\(\).*error", r"pickle\.UnpicklingError", r"yaml\.(?:scanner|parser)\.ScannerError", r"__wakeup\(\).*failed", r"ObjectInputStream", r"readObject\(\).*exception", ] EL_INJECTION_PATTERNS = [ r"javax\.el\.ELException", r"org\.springframework\.expression\.spel", r"EL Expression.*error", r"OGNL.*exception", ] # --------------------------------------------------------------------------- # Health checking # --------------------------------------------------------------------------- UNHEALTHY_PATTERNS = [ "502 bad gateway", "503 service unavailable", "service unavailable", "maintenance mode", "under maintenance", "temporarily unavailable", "server is starting", "connection refused", ] class ResponseVerifier: """ Multi-signal verification framework for vulnerability confirmation. 4 independent signals are checked: 1. VulnEngine tester pattern match (structured analyze_response) 2. Baseline diff (status / length / hash change) 3. Payload effect (reflection, evaluation, file content) 4. New error patterns (present in test but absent in baseline) Confidence rules: - 2+ signals → confirmed (skip AI) - 1 signal + confidence >= 0.8 → confirmed - 1 signal + confidence < 0.8 → needs AI confirmation - 0 signals → rejected """ def __init__(self): self._compiled_db_errors = [re.compile(p, re.IGNORECASE) for p in DB_ERROR_PATTERNS] self._compiled_template_errors = [re.compile(p, re.IGNORECASE) for p in TEMPLATE_ERROR_PATTERNS] self._compiled_cmd_markers = [re.compile(p, re.IGNORECASE) for p in COMMAND_OUTPUT_MARKERS] self._compiled_nosql_errors = [re.compile(p, re.IGNORECASE) for p in NOSQL_ERROR_PATTERNS] self._compiled_ldap_errors = [re.compile(p, re.IGNORECASE) for p in LDAP_ERROR_PATTERNS] self._compiled_xpath_errors = [re.compile(p, re.IGNORECASE) for p in XPATH_ERROR_PATTERNS] self._compiled_graphql_errors = [re.compile(p, re.IGNORECASE) for p in GRAPHQL_ERROR_PATTERNS] self._compiled_deser_errors = [re.compile(p, re.IGNORECASE) for p in DESERIALIZATION_ERROR_PATTERNS] self._compiled_el_errors = [re.compile(p, re.IGNORECASE) for p in EL_INJECTION_PATTERNS] # ------------------------------------------------------------------ # Target health check # ------------------------------------------------------------------ async def check_target_health(self, session, url: str) -> Tuple[bool, dict]: """ Verify the target is alive and functional before testing. Returns: (is_healthy, info_dict) """ try: async with session.get(url, timeout=15, allow_redirects=True) as resp: body = await resp.text() status = resp.status headers = dict(resp.headers) info = { "status": status, "content_length": len(body), "content_type": headers.get("Content-Type", ""), "server": headers.get("Server", ""), } # Reject server errors if status >= 500: info["reason"] = f"Server error (HTTP {status})" return False, info # Reject empty/minimal pages if len(body) < 50: info["reason"] = "Response too short (< 50 chars)" return False, info # Check for unhealthy content body_lower = body.lower() for pattern in UNHEALTHY_PATTERNS: if pattern in body_lower: info["reason"] = f"Unhealthy response: '{pattern}'" return False, info info["healthy"] = True return True, info except Exception as e: return False, {"reason": f"Connection error: {str(e)[:200]}"} # ------------------------------------------------------------------ # Baseline diffing # ------------------------------------------------------------------ def compute_response_diff(self, baseline: dict, test_response: dict) -> dict: """ Compare test response against cached baseline. Returns dict with diff metrics. """ baseline_body = baseline.get("body", "") test_body = test_response.get("body", "") baseline_len = len(baseline_body) if isinstance(baseline_body, str) else baseline.get("body_length", 0) test_len = len(test_body) length_diff = abs(test_len - baseline_len) length_pct = (length_diff / max(baseline_len, 1)) * 100 baseline_hash = baseline.get("body_hash") or hashlib.md5( baseline_body.encode("utf-8", errors="replace") ).hexdigest() test_hash = hashlib.md5( test_body.encode("utf-8", errors="replace") ).hexdigest() # Detect new error patterns in test but not baseline baseline_lower = (baseline_body if isinstance(baseline_body, str) else "").lower() test_lower = test_body.lower() new_errors = [] for pat in self._compiled_db_errors: if pat.search(test_lower) and not pat.search(baseline_lower): new_errors.append(pat.pattern) for pat in self._compiled_template_errors: if pat.search(test_lower) and not pat.search(baseline_lower): new_errors.append(pat.pattern) return { "status_changed": baseline.get("status", 0) != test_response.get("status", 0), "baseline_status": baseline.get("status", 0), "test_status": test_response.get("status", 0), "length_diff": length_diff, "length_diff_pct": round(length_pct, 1), "body_hash_changed": baseline_hash != test_hash, "new_error_patterns": new_errors, } # ------------------------------------------------------------------ # Payload effect verification # ------------------------------------------------------------------ def _check_payload_effect(self, vuln_type: str, payload: str, test_body: str, test_status: int, test_headers: dict, baseline_body: str = "", baseline_status: int = 0) -> Tuple[bool, Optional[str]]: """ Check if the payload produced a detectable effect in the response. This is signal #3 in multi-signal verification. Weak checks (NoSQL blind, parameter pollution, type juggling, HTML injection, JWT, blind XSS, mutation XSS) require baseline comparison to eliminate false positives. """ body_lower = test_body.lower() baseline_lower = baseline_body.lower() if baseline_body else "" # ---- XSS ---- if vuln_type in ("xss", "xss_reflected", "xss_stored", "xss_dom"): payload_lower = payload.lower() # Unescaped reflection — use context-aware analysis if payload in test_body or payload_lower in body_lower: from backend.core.xss_context_analyzer import analyze_xss_execution_context ctx = analyze_xss_execution_context(test_body, payload) if ctx["executable"]: return True, f"XSS payload in auto-executing context: {ctx['detail']}" if ctx["interactive"]: return True, f"XSS payload in interactive context: {ctx['detail']}" return False, None # ---- SQLi ---- if vuln_type in ("sqli", "sqli_error", "sqli_union", "sqli_blind", "sqli_time"): for pat in self._compiled_db_errors: m = pat.search(body_lower) if m: return True, f"SQL error induced by payload: {m.group()}" return False, None # ---- SSTI ---- if vuln_type == "ssti": for expr, result in SSTI_EVALUATIONS.items(): if expr in payload and result in test_body: # Confirm the raw expression is NOT present (evaluated) if expr not in test_body: return True, f"Template expression evaluated: {expr}={result}" return False, None # ---- LFI / Path Traversal ---- if vuln_type in ("lfi", "path_traversal"): for marker in FILE_CONTENT_MARKERS: if marker.lower() in body_lower: return True, f"File content detected: {marker}" return False, None # ---- Command Injection / RCE ---- if vuln_type in ("rce", "command_injection"): for pat in self._compiled_cmd_markers: m = pat.search(test_body) if m: return True, f"Command output detected: {m.group()}" return False, None # ---- SSRF ---- if vuln_type in ("ssrf", "ssrf_cloud"): ssrf_markers = ["ami-id", "instance-type", "iam/info", "meta-data", "computeMetadata", "root:x:0:0"] for marker in ssrf_markers: if marker.lower() in body_lower: return True, f"Internal resource content: {marker}" return False, None # ---- Open Redirect ---- if vuln_type == "open_redirect": if test_status in (301, 302, 303, 307, 308): location = test_headers.get("Location", test_headers.get("location", "")) if "evil.com" in location or location.startswith("//"): return True, f"Redirect to external: {location}" return False, None # ---- XXE ---- if vuln_type == "xxe": for marker in FILE_CONTENT_MARKERS: if marker.lower() in body_lower: return True, f"XXE file read: {marker}" return False, None # ---- NoSQL Injection ---- if vuln_type == "nosql_injection": for pat in self._compiled_nosql_errors: m = pat.search(body_lower) if m: return True, f"NoSQL error induced: {m.group()}" # Boolean-based blind NoSQL: require response DIFFERS from baseline if "$gt" in payload or "$ne" in payload or "$regex" in payload: if baseline_body and test_status == 200: len_diff = abs(len(test_body) - len(baseline_body)) len_pct = (len_diff / max(len(baseline_body), 1)) * 100 status_diff = test_status != baseline_status if len_pct > 20 or status_diff: return True, f"NoSQL blind: Response differs from baseline (delta {len_diff} chars, {len_pct:.0f}%)" return False, None # ---- LDAP Injection ---- if vuln_type == "ldap_injection": for pat in self._compiled_ldap_errors: m = pat.search(test_body) if m: return True, f"LDAP error induced: {m.group()}" return False, None # ---- XPath Injection ---- if vuln_type == "xpath_injection": for pat in self._compiled_xpath_errors: m = pat.search(test_body) if m: return True, f"XPath error induced: {m.group()}" return False, None # ---- CRLF Injection ---- if vuln_type == "crlf_injection": # Check if injected header appears in response headers injected_headers = ["X-Injected", "Set-Cookie", "X-CRLF-Test"] for hdr in injected_headers: if hdr.lower() in payload.lower(): header_val = test_headers.get(hdr, test_headers.get(hdr.lower(), "")) if header_val and ("injected" in header_val.lower() or "crlf" in header_val.lower()): return True, f"CRLF: Injected header appeared: {hdr}: {header_val[:100]}" # Check for header splitting in body if "\r\n" in payload and test_status in (200, 302): if "x-injected" in body_lower or "set-cookie" in body_lower: return True, "CRLF: Injected headers visible in response body" return False, None # ---- Header Injection ---- if vuln_type == "header_injection": # Similar to CRLF but broader if "\r\n" in payload or "%0d%0a" in payload.lower(): for hdr_name in ["X-Injected", "X-Custom"]: if test_headers.get(hdr_name) or test_headers.get(hdr_name.lower()): return True, f"Header injection: {hdr_name} injected via payload" return False, None # ---- Expression Language Injection ---- if vuln_type == "expression_language_injection": for pat in self._compiled_el_errors: m = pat.search(test_body) if m: return True, f"EL error induced: {m.group()}" # Check for EL evaluation (similar to SSTI) for expr, result in SSTI_EVALUATIONS.items(): if expr in payload and result in test_body and expr not in test_body: return True, f"EL expression evaluated: {expr}={result}" return False, None # ---- Log Injection ---- if vuln_type == "log_injection": # Check for injected log line content reflected back log_markers = ["INJECTED_LOG_ENTRY", "FAKE_ADMIN_LOGIN", "log-injection-test"] for marker in log_markers: if marker in payload and marker in test_body: return True, f"Log injection: Marker '{marker}' reflected in response" return False, None # ---- HTML Injection ---- if vuln_type == "html_injection": payload_lower = payload.lower() # Check for unescaped HTML tags reflected html_tags = ["", "", "= 2: return True, "Prototype pollution: Injected properties reflected" return False, None # ---- Host Header Injection ---- if vuln_type == "host_header_injection": # Check if injected host is reflected in response evil_hosts = ["evil.com", "attacker.com", "injected.host"] for host in evil_hosts: if host in payload and host in body_lower: return True, f"Host header injection: {host} reflected in response" # Password reset poisoning if "evil.com" in payload: if "reset" in body_lower or "password" in body_lower: if "evil.com" in body_lower: return True, "Host header injection: Evil host in password reset link" return False, None # ---- HTTP Smuggling ---- if vuln_type == "http_smuggling": smuggling_indicators = [ test_status == 400 and "transfer-encoding" in payload.lower(), "unrecognized transfer-coding" in body_lower, "request smuggling" in body_lower, ] if any(smuggling_indicators): return True, "HTTP smuggling: Desync indicators detected" return False, None # ---- Cache Poisoning ---- if vuln_type == "cache_poisoning": # Check if injected value appears in cached response cache_headers = ["X-Cache", "CF-Cache-Status", "Age", "X-Cache-Hit"] is_cached = any( test_headers.get(h, test_headers.get(h.lower(), "")) for h in cache_headers ) if is_cached and payload.lower() in body_lower: return True, "Cache poisoning: Payload reflected in cached response" return False, None # ---- Insecure Deserialization ---- if vuln_type == "insecure_deserialization": for pat in self._compiled_deser_errors: m = pat.search(test_body) if m: return True, f"Deserialization error: {m.group()}" # Check for command execution via deser for pat in self._compiled_cmd_markers: m = pat.search(test_body) if m: return True, f"Deserialization RCE: {m.group()}" return False, None # ---- Parameter Pollution ---- if vuln_type == "parameter_pollution": # HPP only confirmed if response DIFFERS significantly from baseline if "&" in payload and baseline_body: len_diff = abs(len(test_body) - len(baseline_body)) len_pct = (len_diff / max(len(baseline_body), 1)) * 100 status_diff = test_status != baseline_status if len_pct > 20 or status_diff: return True, f"Parameter pollution: Response differs from baseline (delta {len_diff} chars, {len_pct:.0f}%)" return False, None # ---- Type Juggling ---- if vuln_type == "type_juggling": if test_status == 200: if "0" in payload or "true" in payload.lower() or "[]" in payload: auth_markers = ["authenticated", "authorized", "welcome", "admin", "success"] for marker in auth_markers: if marker in body_lower: # Require marker NOT in baseline — otherwise it's normal behavior if baseline_lower and marker in baseline_lower: continue return True, f"Type juggling: Auth bypass ({marker} appears only with juggled type)" return False, None # ---- SOAP Injection ---- if vuln_type == "soap_injection": soap_errors = [ r"soap.*(?:fault|error|exception)", r"xml.*(?:parse|syntax).*error", r"", r"", ] for pat_str in soap_errors: if re.search(pat_str, body_lower): return True, f"SOAP injection: {pat_str}" return False, None # ---- Subdomain Takeover ---- if vuln_type == "subdomain_takeover": takeover_markers = [ "there isn't a github pages site here", "herokucdn.com/error-pages", "the request could not be satisfied", "no such app", "project not found", "this page is parked free", "does not exist in the app platform", "NoSuchBucket", ] for marker in takeover_markers: if marker.lower() in body_lower: return True, f"Subdomain takeover: {marker}" return False, None return False, None # ------------------------------------------------------------------ # Multi-signal verification (core method) # ------------------------------------------------------------------ def multi_signal_verify( self, vuln_type: str, payload: str, test_response: dict, baseline: Optional[dict], tester_result: Tuple[bool, float, Optional[str]], ) -> Tuple[bool, str, int]: """ Combine 4 signals to determine if a vulnerability is confirmed. Args: vuln_type: Vulnerability type (registry key or legacy name) payload: The payload used test_response: The HTTP response from the payload test baseline: Cached baseline response (can be None) tester_result: (is_vuln, confidence, evidence) from VulnEngine tester Returns: (is_confirmed, evidence_summary, signal_count) """ signals: List[str] = [] evidence_parts: List[str] = [] max_confidence = 0.0 test_body = test_response.get("body", "") test_status = test_response.get("status", 0) test_headers = test_response.get("headers", {}) # --- Signal 1: VulnEngine tester pattern match --- tester_vuln, tester_conf, tester_evidence = tester_result if tester_vuln and tester_conf >= 0.7: signals.append("tester_match") evidence_parts.append(tester_evidence or "Pattern match") max_confidence = max(max_confidence, tester_conf) # --- Signal 2: Baseline diff --- if baseline: diff = self.compute_response_diff(baseline, test_response) # Type-specific diff thresholds significant_diff = False if vuln_type in ("sqli", "sqli_error", "sqli_blind"): significant_diff = diff["length_diff"] > 300 and diff["status_changed"] elif vuln_type in ("lfi", "path_traversal", "xxe"): significant_diff = diff["length_diff_pct"] > 50 elif vuln_type in ("ssti", "command_injection", "rce"): significant_diff = diff["body_hash_changed"] and diff["length_diff"] > 100 else: significant_diff = diff["status_changed"] and diff["length_diff"] > 500 if significant_diff: signals.append("baseline_diff") evidence_parts.append( f"Response diff: status {diff['baseline_status']}->{diff['test_status']}, " f"length delta {diff['length_diff']} ({diff['length_diff_pct']}%)" ) # New error patterns is an independent sub-signal if diff["new_error_patterns"]: signals.append("new_errors") evidence_parts.append( f"New error patterns: {', '.join(diff['new_error_patterns'][:3])}" ) # --- Signal 3: Payload effect --- baseline_body = baseline.get("body", "") if baseline else "" baseline_status = baseline.get("status", 0) if baseline else 0 effect_found, effect_evidence = self._check_payload_effect( vuln_type, payload, test_body, test_status, test_headers, baseline_body=baseline_body, baseline_status=baseline_status ) if effect_found: signals.append("payload_effect") evidence_parts.append(effect_evidence) # --- Confidence rules --- signal_count = len(signals) evidence_summary = " | ".join(evidence_parts) if evidence_parts else "" if signal_count >= 2: # 2+ signals → confirmed (skip AI) return True, evidence_summary, signal_count elif signal_count == 1 and max_confidence >= 0.8: # 1 signal + high confidence → confirmed return True, evidence_summary, signal_count elif signal_count == 1: # 1 signal + lower confidence → needs AI confirmation # Return False but with evidence so caller can decide to ask AI return False, evidence_summary, signal_count else: # 0 signals → rejected return False, "", 0