""" NeuroSploit v3 - XBOW-Inspired Response Verification Framework Multi-signal verification system that confirms vulnerabilities through 4 independent signals, reducing false positives dramatically. Inspired by XBOW benchmark methodology: - Binary verification (flag-based in CTF, evidence-based here) - Health checks before testing - Baseline diffing for behavioral anomaly detection - Multi-signal confirmation (2+ signals = confirmed without AI) """ import re import hashlib from typing import Dict, List, Optional, Tuple, Any # --------------------------------------------------------------------------- # Error / indicator patterns used across multiple checkers # --------------------------------------------------------------------------- DB_ERROR_PATTERNS = [ r"(?:sql|database|query)\s*(?:error|syntax|exception)", r"mysql_(?:fetch|query|num_rows|connect)", r"mysqli_", r"pg_(?:query|exec|prepare|connect)", r"sqlite3?\.\w+error", r"ora-\d{4,5}", r"mssql_query", r"sqlstate\[", r"odbc\s+driver", r"jdbc\s+exception", r"unclosed\s+quotation", r"you have an error in your sql", r"syntax error.*at line \d+", ] TEMPLATE_ERROR_PATTERNS = [ r"jinja2\.exceptions\.\w+", r"mako\.exceptions\.\w+", r"twig.*error", r"freemarker.*error", r"smarty.*error", r"django\.template\.\w+", r"template syntax error", ] FILE_CONTENT_MARKERS = [ "root:x:0:0:", "daemon:x:1:1:", "bin:x:2:2:", "www-data:", "[boot loader]", "[operating systems]", "[extensions]", ] SSTI_EVALUATIONS = { "7*7": "49", "7*'7'": "7777777", "3*3": "9", } COMMAND_OUTPUT_MARKERS = [ r"uid=\d+\(", r"gid=\d+\(", r"root:\w+:0:0:", r"/bin/(?:ba)?sh", r"Linux\s+\S+\s+\d+\.\d+", r"total\s+\d+\s*\n", ] NOSQL_ERROR_PATTERNS = [ r"MongoError", r"mongo.*(?:syntax|parse|query).*error", r"\$(?:gt|lt|ne|in|nin|regex|where|exists)\b", r"CastError.*ObjectId", r"BSONTypeError", r"operator.*\$(?:gt|lt|ne|regex)", ] LDAP_ERROR_PATTERNS = [ r"javax\.naming\.(?:directory\.)?InvalidSearchFilterException", r"Bad search filter", r"ldap_search.*error", r"invalid.*(?:dn|distinguished name|ldap filter)", r"unbalanced.*parenthes[ei]s", r"NamingException", ] XPATH_ERROR_PATTERNS = [ r"XPathException", r"Invalid XPath", r"xmlXPathEval.*error", r"DOMXPath.*(?:evaluate|query).*error", r"SimpleXMLElement.*xpath", r"unterminated.*(?:string|expression).*xpath", r"XPATH syntax error", ] GRAPHQL_ERROR_PATTERNS = [ r'"errors"\s*:\s*\[', r"Syntax Error.*GraphQL", r"Cannot query field", r"Unknown argument", r"Expected Name", r"graphql.*parse.*error", ] DESERIALIZATION_ERROR_PATTERNS = [ r"java\.io\.(?:InvalidClass|StreamCorrupted)Exception", r"ClassNotFoundException", r"unserialize\(\).*error", r"pickle\.UnpicklingError", r"yaml\.(?:scanner|parser)\.ScannerError", r"__wakeup\(\).*failed", r"ObjectInputStream", r"readObject\(\).*exception", ] EL_INJECTION_PATTERNS = [ r"javax\.el\.ELException", r"org\.springframework\.expression\.spel", r"EL Expression.*error", r"OGNL.*exception", ] # --------------------------------------------------------------------------- # Health checking # --------------------------------------------------------------------------- UNHEALTHY_PATTERNS = [ "502 bad gateway", "503 service unavailable", "service unavailable", "maintenance mode", "under maintenance", "temporarily unavailable", "server is starting", "connection refused", ] class ResponseVerifier: """ Multi-signal verification framework for vulnerability confirmation. 4 independent signals are checked: 1. VulnEngine tester pattern match (structured analyze_response) 2. 


class ResponseVerifier:
    """
    Multi-signal verification framework for vulnerability confirmation.

    4 independent signals are checked:
    1. VulnEngine tester pattern match (structured analyze_response)
    2. Baseline diff (status / length / hash change)
    3. Payload effect (reflection, evaluation, file content)
    4. New error patterns (present in test but absent in baseline)

    Confidence rules:
    - 2+ signals → confirmed (skip AI)
    - 1 signal + confidence >= 0.8 → confirmed
    - 1 signal + confidence < 0.8 → needs AI confirmation
    - 0 signals → rejected
    """

    def __init__(self):
        self._compiled_db_errors = [re.compile(p, re.IGNORECASE) for p in DB_ERROR_PATTERNS]
        self._compiled_template_errors = [re.compile(p, re.IGNORECASE) for p in TEMPLATE_ERROR_PATTERNS]
        self._compiled_cmd_markers = [re.compile(p, re.IGNORECASE) for p in COMMAND_OUTPUT_MARKERS]
        self._compiled_nosql_errors = [re.compile(p, re.IGNORECASE) for p in NOSQL_ERROR_PATTERNS]
        self._compiled_ldap_errors = [re.compile(p, re.IGNORECASE) for p in LDAP_ERROR_PATTERNS]
        self._compiled_xpath_errors = [re.compile(p, re.IGNORECASE) for p in XPATH_ERROR_PATTERNS]
        self._compiled_graphql_errors = [re.compile(p, re.IGNORECASE) for p in GRAPHQL_ERROR_PATTERNS]
        self._compiled_deser_errors = [re.compile(p, re.IGNORECASE) for p in DESERIALIZATION_ERROR_PATTERNS]
        self._compiled_el_errors = [re.compile(p, re.IGNORECASE) for p in EL_INJECTION_PATTERNS]

    # ------------------------------------------------------------------
    # Target health check
    # ------------------------------------------------------------------

    async def check_target_health(self, session, url: str) -> Tuple[bool, dict]:
        """
        Verify the target is alive and functional before testing.

        Returns:
            (is_healthy, info_dict)
        """
        try:
            async with session.get(url, timeout=15, allow_redirects=True) as resp:
                body = await resp.text()
                status = resp.status
                headers = dict(resp.headers)

                info = {
                    "status": status,
                    "content_length": len(body),
                    "content_type": headers.get("Content-Type", ""),
                    "server": headers.get("Server", ""),
                }

                # Reject server errors
                if status >= 500:
                    info["reason"] = f"Server error (HTTP {status})"
                    return False, info

                # Reject empty/minimal pages
                if len(body) < 50:
                    info["reason"] = "Response too short (< 50 chars)"
                    return False, info

                # Check for unhealthy content
                body_lower = body.lower()
                for pattern in UNHEALTHY_PATTERNS:
                    if pattern in body_lower:
                        info["reason"] = f"Unhealthy response: '{pattern}'"
                        return False, info

                info["healthy"] = True
                return True, info

        except Exception as e:
            return False, {"reason": f"Connection error: {str(e)[:200]}"}
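
    # Usage sketch (hypothetical helper, not in the original class): gate testing
    # on the health check above. Any aiohttp-style session whose get() returns an
    # async context manager exposing .text(), .status, and .headers works here.
    async def example_gate_on_health(self, session, url: str) -> bool:
        """Return True only if the target passed the pre-test health check."""
        healthy, info = await self.check_target_health(session, url)
        if not healthy:
            # Surface the skip reason recorded by check_target_health
            print(f"Skipping unhealthy target {url}: {info.get('reason')}")
        return healthy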
""" baseline_body = baseline.get("body", "") test_body = test_response.get("body", "") baseline_len = len(baseline_body) if isinstance(baseline_body, str) else baseline.get("body_length", 0) test_len = len(test_body) length_diff = abs(test_len - baseline_len) length_pct = (length_diff / max(baseline_len, 1)) * 100 baseline_hash = baseline.get("body_hash") or hashlib.md5( baseline_body.encode("utf-8", errors="replace") ).hexdigest() test_hash = hashlib.md5( test_body.encode("utf-8", errors="replace") ).hexdigest() # Detect new error patterns in test but not baseline baseline_lower = (baseline_body if isinstance(baseline_body, str) else "").lower() test_lower = test_body.lower() new_errors = [] for pat in self._compiled_db_errors: if pat.search(test_lower) and not pat.search(baseline_lower): new_errors.append(pat.pattern) for pat in self._compiled_template_errors: if pat.search(test_lower) and not pat.search(baseline_lower): new_errors.append(pat.pattern) return { "status_changed": baseline.get("status", 0) != test_response.get("status", 0), "baseline_status": baseline.get("status", 0), "test_status": test_response.get("status", 0), "length_diff": length_diff, "length_diff_pct": round(length_pct, 1), "body_hash_changed": baseline_hash != test_hash, "new_error_patterns": new_errors, } # ------------------------------------------------------------------ # Payload effect verification # ------------------------------------------------------------------ def _check_payload_effect(self, vuln_type: str, payload: str, test_body: str, test_status: int, test_headers: dict, baseline_body: str = "", baseline_status: int = 0) -> Tuple[bool, Optional[str]]: """ Check if the payload produced a detectable effect in the response. This is signal #3 in multi-signal verification. Weak checks (NoSQL blind, parameter pollution, type juggling, HTML injection, JWT, blind XSS, mutation XSS) require baseline comparison to eliminate false positives. 
""" body_lower = test_body.lower() baseline_lower = baseline_body.lower() if baseline_body else "" # ---- XSS ---- if vuln_type in ("xss", "xss_reflected", "xss_stored", "xss_dom"): payload_lower = payload.lower() # Unescaped reflection — use context-aware analysis if payload in test_body or payload_lower in body_lower: from backend.core.xss_context_analyzer import analyze_xss_execution_context ctx = analyze_xss_execution_context(test_body, payload) if ctx["executable"]: return True, f"XSS payload in auto-executing context: {ctx['detail']}" if ctx["interactive"]: return True, f"XSS payload in interactive context: {ctx['detail']}" return False, None # ---- SQLi ---- if vuln_type in ("sqli", "sqli_error", "sqli_union", "sqli_blind", "sqli_time"): for pat in self._compiled_db_errors: m = pat.search(body_lower) if m: return True, f"SQL error induced by payload: {m.group()}" return False, None # ---- SSTI ---- if vuln_type == "ssti": for expr, result in SSTI_EVALUATIONS.items(): if expr in payload and result in test_body: # Confirm the raw expression is NOT present (evaluated) if expr not in test_body: return True, f"Template expression evaluated: {expr}={result}" return False, None # ---- LFI / Path Traversal ---- if vuln_type in ("lfi", "path_traversal"): for marker in FILE_CONTENT_MARKERS: if marker.lower() in body_lower: return True, f"File content detected: {marker}" return False, None # ---- Command Injection / RCE ---- if vuln_type in ("rce", "command_injection"): for pat in self._compiled_cmd_markers: m = pat.search(test_body) if m: return True, f"Command output detected: {m.group()}" return False, None # ---- SSRF ---- if vuln_type in ("ssrf", "ssrf_cloud"): ssrf_markers = ["ami-id", "instance-type", "iam/info", "meta-data", "computeMetadata", "root:x:0:0"] for marker in ssrf_markers: if marker.lower() in body_lower: return True, f"Internal resource content: {marker}" return False, None # ---- Open Redirect ---- if vuln_type == "open_redirect": if test_status in (301, 302, 303, 307, 308): location = test_headers.get("Location", test_headers.get("location", "")) if "evil.com" in location or location.startswith("//"): return True, f"Redirect to external: {location}" return False, None # ---- XXE ---- if vuln_type == "xxe": for marker in FILE_CONTENT_MARKERS: if marker.lower() in body_lower: return True, f"XXE file read: {marker}" return False, None # ---- NoSQL Injection ---- if vuln_type == "nosql_injection": for pat in self._compiled_nosql_errors: m = pat.search(body_lower) if m: return True, f"NoSQL error induced: {m.group()}" # Boolean-based blind NoSQL: require response DIFFERS from baseline if "$gt" in payload or "$ne" in payload or "$regex" in payload: if baseline_body and test_status == 200: len_diff = abs(len(test_body) - len(baseline_body)) len_pct = (len_diff / max(len(baseline_body), 1)) * 100 status_diff = test_status != baseline_status if len_pct > 20 or status_diff: return True, f"NoSQL blind: Response differs from baseline (delta {len_diff} chars, {len_pct:.0f}%)" return False, None # ---- LDAP Injection ---- if vuln_type == "ldap_injection": for pat in self._compiled_ldap_errors: m = pat.search(test_body) if m: return True, f"LDAP error induced: {m.group()}" return False, None # ---- XPath Injection ---- if vuln_type == "xpath_injection": for pat in self._compiled_xpath_errors: m = pat.search(test_body) if m: return True, f"XPath error induced: {m.group()}" return False, None # ---- CRLF Injection ---- if vuln_type == "crlf_injection": # Check if injected header appears 

        # ---- CRLF Injection ----
        if vuln_type == "crlf_injection":
            # Check if injected header appears in response headers
            injected_headers = ["X-Injected", "Set-Cookie", "X-CRLF-Test"]
            for hdr in injected_headers:
                if hdr.lower() in payload.lower():
                    header_val = test_headers.get(hdr, test_headers.get(hdr.lower(), ""))
                    if header_val and ("injected" in header_val.lower() or "crlf" in header_val.lower()):
                        return True, f"CRLF: Injected header appeared: {hdr}: {header_val[:100]}"
            # Check for header splitting in body
            if "\r\n" in payload and test_status in (200, 302):
                if "x-injected" in body_lower or "set-cookie" in body_lower:
                    return True, "CRLF: Injected headers visible in response body"
            return False, None

        # ---- Header Injection ----
        if vuln_type == "header_injection":
            # Similar to CRLF but broader
            if "\r\n" in payload or "%0d%0a" in payload.lower():
                for hdr_name in ["X-Injected", "X-Custom"]:
                    if test_headers.get(hdr_name) or test_headers.get(hdr_name.lower()):
                        return True, f"Header injection: {hdr_name} injected via payload"
            return False, None

        # ---- Expression Language Injection ----
        if vuln_type == "expression_language_injection":
            for pat in self._compiled_el_errors:
                m = pat.search(test_body)
                if m:
                    return True, f"EL error induced: {m.group()}"
            # Check for EL evaluation (similar to SSTI)
            for expr, result in SSTI_EVALUATIONS.items():
                if expr in payload and result in test_body and expr not in test_body:
                    return True, f"EL expression evaluated: {expr}={result}"
            return False, None

        # ---- Log Injection ----
        if vuln_type == "log_injection":
            # Check for injected log line content reflected back
            log_markers = ["INJECTED_LOG_ENTRY", "FAKE_ADMIN_LOGIN", "log-injection-test"]
            for marker in log_markers:
                if marker in payload and marker in test_body:
                    return True, f"Log injection: Marker '{marker}' reflected in response"
            return False, None

        # ---- HTML Injection ----
        if vuln_type == "html_injection":
            payload_lower = payload.lower()
            # Check for unescaped HTML tags reflected
            html_tags = ["