Mirror of https://github.com/CyberSecurityUP/NeuroSploit.git — synced 2026-02-12 14:02:45 +00:00 (781 lines, 32 KiB, Python)
"""
|
|
NeuroSploit v3 - XBOW-Inspired Response Verification Framework
|
|
|
|
Multi-signal verification system that confirms vulnerabilities
|
|
through 4 independent signals, reducing false positives dramatically.
|
|
|
|
Inspired by XBOW benchmark methodology:
|
|
- Binary verification (flag-based in CTF, evidence-based here)
|
|
- Health checks before testing
|
|
- Baseline diffing for behavioral anomaly detection
|
|
- Multi-signal confirmation (2+ signals = confirmed without AI)
|
|
"""
|
|
|
|
import re
|
|
import hashlib
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error / indicator patterns used across multiple checkers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Regexes that flag database error messages leaking into a response body.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__ and used both for
# SQLi payload-effect detection and for baseline "new error" diffing.
DB_ERROR_PATTERNS = [
    r"(?:sql|database|query)\s*(?:error|syntax|exception)",
    r"mysql_(?:fetch|query|num_rows|connect)",
    r"mysqli_",
    r"pg_(?:query|exec|prepare|connect)",
    r"sqlite3?\.\w+error",
    r"ora-\d{4,5}",  # Oracle error codes, e.g. ORA-00933
    r"mssql_query",
    r"sqlstate\[",
    r"odbc\s+driver",
    r"jdbc\s+exception",
    r"unclosed\s+quotation",
    r"you have an error in your sql",
    r"syntax error.*at line \d+",
]
|
|
|
|
# Regexes that flag template-engine exceptions (Jinja2, Mako, Twig,
# FreeMarker, Smarty, Django) surfacing in a response. Compiled with
# re.IGNORECASE in ResponseVerifier.__init__; used for baseline diffing.
TEMPLATE_ERROR_PATTERNS = [
    r"jinja2\.exceptions\.\w+",
    r"mako\.exceptions\.\w+",
    r"twig.*error",
    r"freemarker.*error",
    r"smarty.*error",
    r"django\.template\.\w+",
    r"template syntax error",
]
|
|
|
|
# Plain substrings proving that a local file's content was disclosed:
# /etc/passwd entries on Unix, boot.ini / win.ini section headers on Windows.
# Matched case-insensitively (lowercased) against the response body for
# LFI, path traversal, XXE, arbitrary file read and zip-slip checks.
FILE_CONTENT_MARKERS = [
    "root:x:0:0:",
    "daemon:x:1:1:",
    "bin:x:2:2:",
    "www-data:",
    "[boot loader]",
    "[operating systems]",
    "[extensions]",
]
|
|
|
|
# Arithmetic probe -> expected evaluated output for SSTI / EL detection.
# A hit requires the result in the body while the raw expression is absent
# (i.e. the template engine actually evaluated it). The string-repetition
# form "7*'7'" -> "7777777" distinguishes engines with Python semantics.
SSTI_EVALUATIONS = {
    "7*7": "49",
    "7*'7'": "7777777",
    "3*3": "9",
}
|
|
|
|
# Regexes matching typical shell command output (`id`, `/etc/passwd` root
# line, `uname -a`, `ls -l` totals) — evidence of command injection / RCE.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__.
COMMAND_OUTPUT_MARKERS = [
    r"uid=\d+\(",
    r"gid=\d+\(",
    r"root:\w+:0:0:",
    r"/bin/(?:ba)?sh",
    r"Linux\s+\S+\s+\d+\.\d+",
    r"total\s+\d+\s*\n",
]
|
|
|
|
# Regexes flagging MongoDB / Mongoose / BSON errors or reflected NoSQL
# operators ($gt, $ne, $regex, ...) in a response body.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__.
NOSQL_ERROR_PATTERNS = [
    r"MongoError",
    r"mongo.*(?:syntax|parse|query).*error",
    r"\$(?:gt|lt|ne|in|nin|regex|where|exists)\b",
    r"CastError.*ObjectId",
    r"BSONTypeError",
    r"operator.*\$(?:gt|lt|ne|regex)",
]
|
|
|
|
# Regexes flagging LDAP filter/search errors (JNDI exceptions, malformed
# DN/filter messages) induced by injection payloads.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__.
LDAP_ERROR_PATTERNS = [
    r"javax\.naming\.(?:directory\.)?InvalidSearchFilterException",
    r"Bad search filter",
    r"ldap_search.*error",
    r"invalid.*(?:dn|distinguished name|ldap filter)",
    r"unbalanced.*parenthes[ei]s",
    r"NamingException",
]
|
|
|
|
# Regexes flagging XPath evaluation errors (Java, libxml, PHP DOMXPath /
# SimpleXML) caused by injected query syntax.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__.
XPATH_ERROR_PATTERNS = [
    r"XPathException",
    r"Invalid XPath",
    r"xmlXPathEval.*error",
    r"DOMXPath.*(?:evaluate|query).*error",
    r"SimpleXMLElement.*xpath",
    r"unterminated.*(?:string|expression).*xpath",
    r"XPATH syntax error",
]
|
|
|
|
# Regexes flagging GraphQL parse/validation errors in a response, including
# the standard '"errors": [' JSON envelope.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__.
GRAPHQL_ERROR_PATTERNS = [
    r'"errors"\s*:\s*\[',
    r"Syntax Error.*GraphQL",
    r"Cannot query field",
    r"Unknown argument",
    r"Expected Name",
    r"graphql.*parse.*error",
]
|
|
|
|
# Regexes flagging deserialization failures across stacks: Java object
# streams, PHP unserialize/__wakeup, Python pickle, YAML parsing.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__.
DESERIALIZATION_ERROR_PATTERNS = [
    r"java\.io\.(?:InvalidClass|StreamCorrupted)Exception",
    r"ClassNotFoundException",
    r"unserialize\(\).*error",
    r"pickle\.UnpicklingError",
    r"yaml\.(?:scanner|parser)\.ScannerError",
    r"__wakeup\(\).*failed",
    r"ObjectInputStream",
    r"readObject\(\).*exception",
]
|
|
|
|
# Regexes flagging Expression Language / SpEL / OGNL evaluation errors.
# Compiled with re.IGNORECASE in ResponseVerifier.__init__.
EL_INJECTION_PATTERNS = [
    r"javax\.el\.ELException",
    r"org\.springframework\.expression\.spel",
    r"EL Expression.*error",
    r"OGNL.*exception",
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Health checking
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Lowercase substrings that mark a target as down / in maintenance.
# Checked against the lowercased body in check_target_health to avoid
# running payload tests against a broken target.
UNHEALTHY_PATTERNS = [
    "502 bad gateway",
    "503 service unavailable",
    "service unavailable",
    "maintenance mode",
    "under maintenance",
    "temporarily unavailable",
    "server is starting",
    "connection refused",
]
|
|
|
|
|
|
class ResponseVerifier:
|
|
"""
|
|
Multi-signal verification framework for vulnerability confirmation.
|
|
|
|
4 independent signals are checked:
|
|
1. VulnEngine tester pattern match (structured analyze_response)
|
|
2. Baseline diff (status / length / hash change)
|
|
3. Payload effect (reflection, evaluation, file content)
|
|
4. New error patterns (present in test but absent in baseline)
|
|
|
|
Confidence rules:
|
|
- 2+ signals → confirmed (skip AI)
|
|
- 1 signal + confidence >= 0.8 → confirmed
|
|
- 1 signal + confidence < 0.8 → needs AI confirmation
|
|
- 0 signals → rejected
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._compiled_db_errors = [re.compile(p, re.IGNORECASE) for p in DB_ERROR_PATTERNS]
|
|
self._compiled_template_errors = [re.compile(p, re.IGNORECASE) for p in TEMPLATE_ERROR_PATTERNS]
|
|
self._compiled_cmd_markers = [re.compile(p, re.IGNORECASE) for p in COMMAND_OUTPUT_MARKERS]
|
|
self._compiled_nosql_errors = [re.compile(p, re.IGNORECASE) for p in NOSQL_ERROR_PATTERNS]
|
|
self._compiled_ldap_errors = [re.compile(p, re.IGNORECASE) for p in LDAP_ERROR_PATTERNS]
|
|
self._compiled_xpath_errors = [re.compile(p, re.IGNORECASE) for p in XPATH_ERROR_PATTERNS]
|
|
self._compiled_graphql_errors = [re.compile(p, re.IGNORECASE) for p in GRAPHQL_ERROR_PATTERNS]
|
|
self._compiled_deser_errors = [re.compile(p, re.IGNORECASE) for p in DESERIALIZATION_ERROR_PATTERNS]
|
|
self._compiled_el_errors = [re.compile(p, re.IGNORECASE) for p in EL_INJECTION_PATTERNS]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Target health check
|
|
# ------------------------------------------------------------------
|
|
|
|
async def check_target_health(self, session, url: str) -> Tuple[bool, dict]:
|
|
"""
|
|
Verify the target is alive and functional before testing.
|
|
|
|
Returns:
|
|
(is_healthy, info_dict)
|
|
"""
|
|
try:
|
|
async with session.get(url, timeout=15, allow_redirects=True) as resp:
|
|
body = await resp.text()
|
|
status = resp.status
|
|
headers = dict(resp.headers)
|
|
|
|
info = {
|
|
"status": status,
|
|
"content_length": len(body),
|
|
"content_type": headers.get("Content-Type", ""),
|
|
"server": headers.get("Server", ""),
|
|
}
|
|
|
|
# Reject server errors
|
|
if status >= 500:
|
|
info["reason"] = f"Server error (HTTP {status})"
|
|
return False, info
|
|
|
|
# Reject empty/minimal pages
|
|
if len(body) < 50:
|
|
info["reason"] = "Response too short (< 50 chars)"
|
|
return False, info
|
|
|
|
# Check for unhealthy content
|
|
body_lower = body.lower()
|
|
for pattern in UNHEALTHY_PATTERNS:
|
|
if pattern in body_lower:
|
|
info["reason"] = f"Unhealthy response: '{pattern}'"
|
|
return False, info
|
|
|
|
info["healthy"] = True
|
|
return True, info
|
|
|
|
except Exception as e:
|
|
return False, {"reason": f"Connection error: {str(e)[:200]}"}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Baseline diffing
|
|
# ------------------------------------------------------------------
|
|
|
|
def compute_response_diff(self, baseline: dict, test_response: dict) -> dict:
|
|
"""
|
|
Compare test response against cached baseline.
|
|
|
|
Returns dict with diff metrics.
|
|
"""
|
|
baseline_body = baseline.get("body", "")
|
|
test_body = test_response.get("body", "")
|
|
|
|
baseline_len = len(baseline_body) if isinstance(baseline_body, str) else baseline.get("body_length", 0)
|
|
test_len = len(test_body)
|
|
|
|
length_diff = abs(test_len - baseline_len)
|
|
length_pct = (length_diff / max(baseline_len, 1)) * 100
|
|
|
|
baseline_hash = baseline.get("body_hash") or hashlib.md5(
|
|
baseline_body.encode("utf-8", errors="replace")
|
|
).hexdigest()
|
|
test_hash = hashlib.md5(
|
|
test_body.encode("utf-8", errors="replace")
|
|
).hexdigest()
|
|
|
|
# Detect new error patterns in test but not baseline
|
|
baseline_lower = (baseline_body if isinstance(baseline_body, str) else "").lower()
|
|
test_lower = test_body.lower()
|
|
|
|
new_errors = []
|
|
for pat in self._compiled_db_errors:
|
|
if pat.search(test_lower) and not pat.search(baseline_lower):
|
|
new_errors.append(pat.pattern)
|
|
for pat in self._compiled_template_errors:
|
|
if pat.search(test_lower) and not pat.search(baseline_lower):
|
|
new_errors.append(pat.pattern)
|
|
|
|
return {
|
|
"status_changed": baseline.get("status", 0) != test_response.get("status", 0),
|
|
"baseline_status": baseline.get("status", 0),
|
|
"test_status": test_response.get("status", 0),
|
|
"length_diff": length_diff,
|
|
"length_diff_pct": round(length_pct, 1),
|
|
"body_hash_changed": baseline_hash != test_hash,
|
|
"new_error_patterns": new_errors,
|
|
}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Payload effect verification
|
|
# ------------------------------------------------------------------
|
|
|
|
def _check_payload_effect(self, vuln_type: str, payload: str,
|
|
test_body: str, test_status: int,
|
|
test_headers: dict,
|
|
baseline_body: str = "",
|
|
baseline_status: int = 0) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Check if the payload produced a detectable effect in the response.
|
|
|
|
This is signal #3 in multi-signal verification.
|
|
Weak checks (NoSQL blind, parameter pollution, type juggling,
|
|
HTML injection, JWT, blind XSS, mutation XSS) require baseline
|
|
comparison to eliminate false positives.
|
|
"""
|
|
body_lower = test_body.lower()
|
|
baseline_lower = baseline_body.lower() if baseline_body else ""
|
|
|
|
# ---- XSS ----
|
|
if vuln_type in ("xss", "xss_reflected", "xss_stored", "xss_dom"):
|
|
payload_lower = payload.lower()
|
|
# Unescaped reflection — use context-aware analysis
|
|
if payload in test_body or payload_lower in body_lower:
|
|
from backend.core.xss_context_analyzer import analyze_xss_execution_context
|
|
ctx = analyze_xss_execution_context(test_body, payload)
|
|
if ctx["executable"]:
|
|
return True, f"XSS payload in auto-executing context: {ctx['detail']}"
|
|
if ctx["interactive"]:
|
|
return True, f"XSS payload in interactive context: {ctx['detail']}"
|
|
return False, None
|
|
|
|
# ---- SQLi ----
|
|
if vuln_type in ("sqli", "sqli_error", "sqli_union", "sqli_blind", "sqli_time"):
|
|
for pat in self._compiled_db_errors:
|
|
m = pat.search(body_lower)
|
|
if m:
|
|
return True, f"SQL error induced by payload: {m.group()}"
|
|
return False, None
|
|
|
|
# ---- SSTI ----
|
|
if vuln_type == "ssti":
|
|
for expr, result in SSTI_EVALUATIONS.items():
|
|
if expr in payload and result in test_body:
|
|
# Confirm the raw expression is NOT present (evaluated)
|
|
if expr not in test_body:
|
|
return True, f"Template expression evaluated: {expr}={result}"
|
|
return False, None
|
|
|
|
# ---- LFI / Path Traversal ----
|
|
if vuln_type in ("lfi", "path_traversal"):
|
|
for marker in FILE_CONTENT_MARKERS:
|
|
if marker.lower() in body_lower:
|
|
return True, f"File content detected: {marker}"
|
|
return False, None
|
|
|
|
# ---- Command Injection / RCE ----
|
|
if vuln_type in ("rce", "command_injection"):
|
|
for pat in self._compiled_cmd_markers:
|
|
m = pat.search(test_body)
|
|
if m:
|
|
return True, f"Command output detected: {m.group()}"
|
|
return False, None
|
|
|
|
# ---- SSRF ----
|
|
if vuln_type in ("ssrf", "ssrf_cloud"):
|
|
ssrf_markers = ["ami-id", "instance-type", "iam/info", "meta-data",
|
|
"computeMetadata", "root:x:0:0"]
|
|
for marker in ssrf_markers:
|
|
if marker.lower() in body_lower:
|
|
return True, f"Internal resource content: {marker}"
|
|
return False, None
|
|
|
|
# ---- Open Redirect ----
|
|
if vuln_type == "open_redirect":
|
|
if test_status in (301, 302, 303, 307, 308):
|
|
location = test_headers.get("Location", test_headers.get("location", ""))
|
|
if "evil.com" in location or location.startswith("//"):
|
|
return True, f"Redirect to external: {location}"
|
|
return False, None
|
|
|
|
# ---- XXE ----
|
|
if vuln_type == "xxe":
|
|
for marker in FILE_CONTENT_MARKERS:
|
|
if marker.lower() in body_lower:
|
|
return True, f"XXE file read: {marker}"
|
|
return False, None
|
|
|
|
# ---- NoSQL Injection ----
|
|
if vuln_type == "nosql_injection":
|
|
for pat in self._compiled_nosql_errors:
|
|
m = pat.search(body_lower)
|
|
if m:
|
|
return True, f"NoSQL error induced: {m.group()}"
|
|
# Boolean-based blind NoSQL: require response DIFFERS from baseline
|
|
if "$gt" in payload or "$ne" in payload or "$regex" in payload:
|
|
if baseline_body and test_status == 200:
|
|
len_diff = abs(len(test_body) - len(baseline_body))
|
|
len_pct = (len_diff / max(len(baseline_body), 1)) * 100
|
|
status_diff = test_status != baseline_status
|
|
if len_pct > 20 or status_diff:
|
|
return True, f"NoSQL blind: Response differs from baseline (delta {len_diff} chars, {len_pct:.0f}%)"
|
|
return False, None
|
|
|
|
# ---- LDAP Injection ----
|
|
if vuln_type == "ldap_injection":
|
|
for pat in self._compiled_ldap_errors:
|
|
m = pat.search(test_body)
|
|
if m:
|
|
return True, f"LDAP error induced: {m.group()}"
|
|
return False, None
|
|
|
|
# ---- XPath Injection ----
|
|
if vuln_type == "xpath_injection":
|
|
for pat in self._compiled_xpath_errors:
|
|
m = pat.search(test_body)
|
|
if m:
|
|
return True, f"XPath error induced: {m.group()}"
|
|
return False, None
|
|
|
|
# ---- CRLF Injection ----
|
|
if vuln_type == "crlf_injection":
|
|
# Check if injected header appears in response headers
|
|
injected_headers = ["X-Injected", "Set-Cookie", "X-CRLF-Test"]
|
|
for hdr in injected_headers:
|
|
if hdr.lower() in payload.lower():
|
|
header_val = test_headers.get(hdr, test_headers.get(hdr.lower(), ""))
|
|
if header_val and ("injected" in header_val.lower() or "crlf" in header_val.lower()):
|
|
return True, f"CRLF: Injected header appeared: {hdr}: {header_val[:100]}"
|
|
# Check for header splitting in body
|
|
if "\r\n" in payload and test_status in (200, 302):
|
|
if "x-injected" in body_lower or "set-cookie" in body_lower:
|
|
return True, "CRLF: Injected headers visible in response body"
|
|
return False, None
|
|
|
|
# ---- Header Injection ----
|
|
if vuln_type == "header_injection":
|
|
# Similar to CRLF but broader
|
|
if "\r\n" in payload or "%0d%0a" in payload.lower():
|
|
for hdr_name in ["X-Injected", "X-Custom"]:
|
|
if test_headers.get(hdr_name) or test_headers.get(hdr_name.lower()):
|
|
return True, f"Header injection: {hdr_name} injected via payload"
|
|
return False, None
|
|
|
|
# ---- Expression Language Injection ----
|
|
if vuln_type == "expression_language_injection":
|
|
for pat in self._compiled_el_errors:
|
|
m = pat.search(test_body)
|
|
if m:
|
|
return True, f"EL error induced: {m.group()}"
|
|
# Check for EL evaluation (similar to SSTI)
|
|
for expr, result in SSTI_EVALUATIONS.items():
|
|
if expr in payload and result in test_body and expr not in test_body:
|
|
return True, f"EL expression evaluated: {expr}={result}"
|
|
return False, None
|
|
|
|
# ---- Log Injection ----
|
|
if vuln_type == "log_injection":
|
|
# Check for injected log line content reflected back
|
|
log_markers = ["INJECTED_LOG_ENTRY", "FAKE_ADMIN_LOGIN", "log-injection-test"]
|
|
for marker in log_markers:
|
|
if marker in payload and marker in test_body:
|
|
return True, f"Log injection: Marker '{marker}' reflected in response"
|
|
return False, None
|
|
|
|
# ---- HTML Injection ----
|
|
if vuln_type == "html_injection":
|
|
payload_lower = payload.lower()
|
|
# Check for unescaped HTML tags reflected
|
|
html_tags = ["<h1", "<div", "<marquee", "<b>", "<u>", "<font", "<form"]
|
|
for tag in html_tags:
|
|
if tag in payload_lower and tag in body_lower:
|
|
# Verify not HTML-encoded
|
|
escaped = tag.replace("<", "<")
|
|
if escaped not in body_lower:
|
|
# Require tag is NOT already present in baseline (pre-existing)
|
|
if baseline_lower and tag in baseline_lower:
|
|
continue # Tag exists in baseline — not injected
|
|
return True, f"HTML injection: Tag {tag} reflected unescaped (not in baseline)"
|
|
return False, None
|
|
|
|
# ---- CSV Injection ----
|
|
if vuln_type == "csv_injection":
|
|
csv_prefixes = ["=CMD", "=HYPERLINK", "+CMD", "-CMD", "@SUM"]
|
|
content_type = test_headers.get("Content-Type", test_headers.get("content-type", ""))
|
|
if "csv" in content_type.lower() or "spreadsheet" in content_type.lower():
|
|
for prefix in csv_prefixes:
|
|
if prefix in payload and prefix in test_body:
|
|
return True, f"CSV injection: Formula '{prefix}' in CSV output"
|
|
return False, None
|
|
|
|
# ---- GraphQL Injection ----
|
|
if vuln_type == "graphql_injection":
|
|
for pat in self._compiled_graphql_errors:
|
|
m = pat.search(test_body)
|
|
if m:
|
|
return True, f"GraphQL error: {m.group()}"
|
|
return False, None
|
|
|
|
# ---- ORM Injection ----
|
|
if vuln_type == "orm_injection":
|
|
orm_errors = [
|
|
r"hibernate.*exception", r"sequelize.*error", r"typeorm.*error",
|
|
r"ActiveRecord.*(?:Statement)?Invalid", r"django\.db.*error",
|
|
r"prisma.*error", r"sqlalchemy.*error",
|
|
]
|
|
for pat_str in orm_errors:
|
|
if re.search(pat_str, test_body, re.IGNORECASE):
|
|
return True, f"ORM error induced: {pat_str}"
|
|
return False, None
|
|
|
|
# ---- Blind XSS ----
|
|
if vuln_type == "blind_xss":
|
|
# Blind XSS payloads typically use external callbacks
|
|
# We can only detect if the payload was stored (reflected later)
|
|
if payload.lower() in body_lower:
|
|
if "src=" in payload.lower() or "onerror=" in payload.lower():
|
|
# Require payload NOT already in baseline
|
|
if baseline_lower and payload.lower() in baseline_lower:
|
|
return False, None
|
|
return True, "Blind XSS payload stored in response"
|
|
return False, None
|
|
|
|
# ---- Mutation XSS ----
|
|
if vuln_type == "mutation_xss":
|
|
# mXSS exploits browser HTML parsing mutations
|
|
mxss_markers = ["<svg", "<math", "<xmp", "<noembed", "<listing"]
|
|
for marker in mxss_markers:
|
|
if marker in payload.lower() and marker in body_lower:
|
|
# Require element NOT already in baseline
|
|
if baseline_lower and marker in baseline_lower:
|
|
continue
|
|
return True, f"Mutation XSS: Mutatable element {marker} reflected (not in baseline)"
|
|
return False, None
|
|
|
|
# ---- RFI ----
|
|
if vuln_type == "rfi":
|
|
rfi_indicators = ["<?php", "<%", "#!/", "import os"]
|
|
for indicator in rfi_indicators:
|
|
if indicator.lower() in body_lower:
|
|
return True, f"RFI: Remote file content marker: {indicator}"
|
|
return False, None
|
|
|
|
# ---- File Upload ----
|
|
if vuln_type == "file_upload":
|
|
upload_success = [
|
|
r"(?:file|upload).*(?:success|saved|stored|created)",
|
|
r"(?:uploaded|saved) to.*(?:\/|\\)",
|
|
]
|
|
for pat_str in upload_success:
|
|
if re.search(pat_str, body_lower):
|
|
return True, f"File upload succeeded: {pat_str}"
|
|
return False, None
|
|
|
|
# ---- Arbitrary File Read ----
|
|
if vuln_type == "arbitrary_file_read":
|
|
for marker in FILE_CONTENT_MARKERS:
|
|
if marker.lower() in body_lower:
|
|
return True, f"Arbitrary file read: {marker}"
|
|
return False, None
|
|
|
|
# ---- Arbitrary File Delete ----
|
|
if vuln_type == "arbitrary_file_delete":
|
|
delete_indicators = [
|
|
r"(?:file|resource).*(?:deleted|removed|not found after)",
|
|
r"successfully.*(?:deleted|removed)",
|
|
]
|
|
for pat_str in delete_indicators:
|
|
if re.search(pat_str, body_lower):
|
|
return True, f"File delete confirmed: {pat_str}"
|
|
return False, None
|
|
|
|
# ---- Zip Slip ----
|
|
if vuln_type == "zip_slip":
|
|
zip_indicators = [
|
|
r"extracted to.*/\.\./",
|
|
r"path traversal.*(?:zip|archive)",
|
|
]
|
|
for pat_str in zip_indicators:
|
|
if re.search(pat_str, body_lower):
|
|
return True, f"Zip slip: {pat_str}"
|
|
for marker in FILE_CONTENT_MARKERS:
|
|
if marker.lower() in body_lower:
|
|
return True, f"Zip slip - file overwrite evidence: {marker}"
|
|
return False, None
|
|
|
|
# ---- JWT Manipulation ----
|
|
if vuln_type == "jwt_manipulation":
|
|
# Tampered JWT accepted — require auth markers NOT in baseline
|
|
if test_status == 200 and ("alg" in payload.lower() or "none" in payload.lower()):
|
|
jwt_auth_markers = ["authorized", "welcome", "admin"]
|
|
for marker in jwt_auth_markers:
|
|
if marker in body_lower:
|
|
# If baseline also has this marker, it's normal behavior
|
|
if baseline_lower and marker in baseline_lower:
|
|
continue
|
|
return True, f"JWT manipulation: Tampered token granted access ({marker} not in baseline)"
|
|
return False, None
|
|
|
|
# ---- Prototype Pollution ----
|
|
if vuln_type == "prototype_pollution":
|
|
if "__proto__" in payload or "constructor" in payload:
|
|
pollution_markers = ["polluted", "__proto__", "isAdmin", "true"]
|
|
match_count = sum(1 for m in pollution_markers if m.lower() in body_lower)
|
|
if match_count >= 2:
|
|
return True, "Prototype pollution: Injected properties reflected"
|
|
return False, None
|
|
|
|
# ---- Host Header Injection ----
|
|
if vuln_type == "host_header_injection":
|
|
# Check if injected host is reflected in response
|
|
evil_hosts = ["evil.com", "attacker.com", "injected.host"]
|
|
for host in evil_hosts:
|
|
if host in payload and host in body_lower:
|
|
return True, f"Host header injection: {host} reflected in response"
|
|
# Password reset poisoning
|
|
if "evil.com" in payload:
|
|
if "reset" in body_lower or "password" in body_lower:
|
|
if "evil.com" in body_lower:
|
|
return True, "Host header injection: Evil host in password reset link"
|
|
return False, None
|
|
|
|
# ---- HTTP Smuggling ----
|
|
if vuln_type == "http_smuggling":
|
|
smuggling_indicators = [
|
|
test_status == 400 and "transfer-encoding" in payload.lower(),
|
|
"unrecognized transfer-coding" in body_lower,
|
|
"request smuggling" in body_lower,
|
|
]
|
|
if any(smuggling_indicators):
|
|
return True, "HTTP smuggling: Desync indicators detected"
|
|
return False, None
|
|
|
|
# ---- Cache Poisoning ----
|
|
if vuln_type == "cache_poisoning":
|
|
# Check if injected value appears in cached response
|
|
cache_headers = ["X-Cache", "CF-Cache-Status", "Age", "X-Cache-Hit"]
|
|
is_cached = any(
|
|
test_headers.get(h, test_headers.get(h.lower(), ""))
|
|
for h in cache_headers
|
|
)
|
|
if is_cached and payload.lower() in body_lower:
|
|
return True, "Cache poisoning: Payload reflected in cached response"
|
|
return False, None
|
|
|
|
# ---- Insecure Deserialization ----
|
|
if vuln_type == "insecure_deserialization":
|
|
for pat in self._compiled_deser_errors:
|
|
m = pat.search(test_body)
|
|
if m:
|
|
return True, f"Deserialization error: {m.group()}"
|
|
# Check for command execution via deser
|
|
for pat in self._compiled_cmd_markers:
|
|
m = pat.search(test_body)
|
|
if m:
|
|
return True, f"Deserialization RCE: {m.group()}"
|
|
return False, None
|
|
|
|
# ---- Parameter Pollution ----
|
|
if vuln_type == "parameter_pollution":
|
|
# HPP only confirmed if response DIFFERS significantly from baseline
|
|
if "&" in payload and baseline_body:
|
|
len_diff = abs(len(test_body) - len(baseline_body))
|
|
len_pct = (len_diff / max(len(baseline_body), 1)) * 100
|
|
status_diff = test_status != baseline_status
|
|
if len_pct > 20 or status_diff:
|
|
return True, f"Parameter pollution: Response differs from baseline (delta {len_diff} chars, {len_pct:.0f}%)"
|
|
return False, None
|
|
|
|
# ---- Type Juggling ----
|
|
if vuln_type == "type_juggling":
|
|
if test_status == 200:
|
|
if "0" in payload or "true" in payload.lower() or "[]" in payload:
|
|
auth_markers = ["authenticated", "authorized", "welcome", "admin", "success"]
|
|
for marker in auth_markers:
|
|
if marker in body_lower:
|
|
# Require marker NOT in baseline — otherwise it's normal behavior
|
|
if baseline_lower and marker in baseline_lower:
|
|
continue
|
|
return True, f"Type juggling: Auth bypass ({marker} appears only with juggled type)"
|
|
return False, None
|
|
|
|
# ---- SOAP Injection ----
|
|
if vuln_type == "soap_injection":
|
|
soap_errors = [
|
|
r"soap.*(?:fault|error|exception)",
|
|
r"xml.*(?:parse|syntax).*error",
|
|
r"<faultcode>",
|
|
r"<faultstring>",
|
|
]
|
|
for pat_str in soap_errors:
|
|
if re.search(pat_str, body_lower):
|
|
return True, f"SOAP injection: {pat_str}"
|
|
return False, None
|
|
|
|
# ---- Subdomain Takeover ----
|
|
if vuln_type == "subdomain_takeover":
|
|
takeover_markers = [
|
|
"there isn't a github pages site here",
|
|
"herokucdn.com/error-pages",
|
|
"the request could not be satisfied",
|
|
"no such app",
|
|
"project not found",
|
|
"this page is parked free",
|
|
"does not exist in the app platform",
|
|
"NoSuchBucket",
|
|
]
|
|
for marker in takeover_markers:
|
|
if marker.lower() in body_lower:
|
|
return True, f"Subdomain takeover: {marker}"
|
|
return False, None
|
|
|
|
return False, None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Multi-signal verification (core method)
|
|
# ------------------------------------------------------------------
|
|
|
|
def multi_signal_verify(
|
|
self,
|
|
vuln_type: str,
|
|
payload: str,
|
|
test_response: dict,
|
|
baseline: Optional[dict],
|
|
tester_result: Tuple[bool, float, Optional[str]],
|
|
) -> Tuple[bool, str, int]:
|
|
"""
|
|
Combine 4 signals to determine if a vulnerability is confirmed.
|
|
|
|
Args:
|
|
vuln_type: Vulnerability type (registry key or legacy name)
|
|
payload: The payload used
|
|
test_response: The HTTP response from the payload test
|
|
baseline: Cached baseline response (can be None)
|
|
tester_result: (is_vuln, confidence, evidence) from VulnEngine tester
|
|
|
|
Returns:
|
|
(is_confirmed, evidence_summary, signal_count)
|
|
"""
|
|
signals: List[str] = []
|
|
evidence_parts: List[str] = []
|
|
max_confidence = 0.0
|
|
|
|
test_body = test_response.get("body", "")
|
|
test_status = test_response.get("status", 0)
|
|
test_headers = test_response.get("headers", {})
|
|
|
|
# --- Signal 1: VulnEngine tester pattern match ---
|
|
tester_vuln, tester_conf, tester_evidence = tester_result
|
|
if tester_vuln and tester_conf >= 0.7:
|
|
signals.append("tester_match")
|
|
evidence_parts.append(tester_evidence or "Pattern match")
|
|
max_confidence = max(max_confidence, tester_conf)
|
|
|
|
# --- Signal 2: Baseline diff ---
|
|
if baseline:
|
|
diff = self.compute_response_diff(baseline, test_response)
|
|
|
|
# Type-specific diff thresholds
|
|
significant_diff = False
|
|
if vuln_type in ("sqli", "sqli_error", "sqli_blind"):
|
|
significant_diff = diff["length_diff"] > 300 and diff["status_changed"]
|
|
elif vuln_type in ("lfi", "path_traversal", "xxe"):
|
|
significant_diff = diff["length_diff_pct"] > 50
|
|
elif vuln_type in ("ssti", "command_injection", "rce"):
|
|
significant_diff = diff["body_hash_changed"] and diff["length_diff"] > 100
|
|
else:
|
|
significant_diff = diff["status_changed"] and diff["length_diff"] > 500
|
|
|
|
if significant_diff:
|
|
signals.append("baseline_diff")
|
|
evidence_parts.append(
|
|
f"Response diff: status {diff['baseline_status']}->{diff['test_status']}, "
|
|
f"length delta {diff['length_diff']} ({diff['length_diff_pct']}%)"
|
|
)
|
|
|
|
# New error patterns is an independent sub-signal
|
|
if diff["new_error_patterns"]:
|
|
signals.append("new_errors")
|
|
evidence_parts.append(
|
|
f"New error patterns: {', '.join(diff['new_error_patterns'][:3])}"
|
|
)
|
|
|
|
# --- Signal 3: Payload effect ---
|
|
baseline_body = baseline.get("body", "") if baseline else ""
|
|
baseline_status = baseline.get("status", 0) if baseline else 0
|
|
effect_found, effect_evidence = self._check_payload_effect(
|
|
vuln_type, payload, test_body, test_status, test_headers,
|
|
baseline_body=baseline_body, baseline_status=baseline_status
|
|
)
|
|
if effect_found:
|
|
signals.append("payload_effect")
|
|
evidence_parts.append(effect_evidence)
|
|
|
|
# --- Confidence rules ---
|
|
signal_count = len(signals)
|
|
evidence_summary = " | ".join(evidence_parts) if evidence_parts else ""
|
|
|
|
if signal_count >= 2:
|
|
# 2+ signals → confirmed (skip AI)
|
|
return True, evidence_summary, signal_count
|
|
elif signal_count == 1 and max_confidence >= 0.8:
|
|
# 1 signal + high confidence → confirmed
|
|
return True, evidence_summary, signal_count
|
|
elif signal_count == 1:
|
|
# 1 signal + lower confidence → needs AI confirmation
|
|
# Return False but with evidence so caller can decide to ask AI
|
|
return False, evidence_summary, signal_count
|
|
else:
|
|
# 0 signals → rejected
|
|
return False, "", 0
|