mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-02-13 06:22:44 +00:00
458 lines
18 KiB
Python
458 lines
18 KiB
Python
"""
|
|
NeuroSploit v3 - Logic and Protocol Vulnerability Testers
|
|
|
|
Testers for race conditions, business logic, rate limiting, parameter pollution,
|
|
type juggling, timing attacks, host header injection, HTTP smuggling, cache poisoning.
|
|
"""
|
|
import re
|
|
from typing import Tuple, Dict, Optional
|
|
from backend.core.vuln_engine.testers.base_tester import BaseTester
|
|
|
|
|
|
class RaceConditionTester(BaseTester):
    """Tester for Race Condition vulnerabilities.

    Flags responses that suggest a single-use operation succeeded more than
    once, or that a numeric resource value differs from the expected one.
    """

    def __init__(self):
        super().__init__()
        self.name = "race_condition"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check for duplicate operation success indicators.

        Context keys read (all optional):
            concurrent_successes: number of concurrent requests that succeeded.
            request_count: how many times the request has been sent.
            expected_value: numeric value the resource field should hold.

        Returns:
            A ``(is_vulnerable, confidence, evidence)`` tuple; evidence is
            ``None`` when nothing was detected.
        """
        body_lower = response_body.lower()

        # Multiple success responses from concurrent requests
        if context.get("concurrent_successes", 0) > 1:
            return True, 0.85, f"Race condition: {context['concurrent_successes']} concurrent requests succeeded"

        # If we got a success despite expected duplicate check
        if response_status in (200, 201):
            success_words = ("success", "created", "processed", "applied", "completed", "confirmed")
            if any(word in body_lower for word in success_words):
                if context.get("request_count", 0) > 1:
                    return True, 0.7, "Race condition: operation succeeded multiple times"

        # Check for resource count discrepancy. The cheap substring gate lists
        # the same field names the regex extracts, so a body that only carries
        # "amount" is no longer skipped.
        resource_fields = ("balance", "quantity", "count", "amount")
        expected_value = context.get("expected_value")
        if expected_value is not None and any(field in body_lower for field in resource_fields):
            numbers = re.findall(r'"(?:balance|quantity|count|amount)"\s*:\s*(-?\d+\.?\d*)', response_body)
            if numbers:
                try:
                    actual = float(numbers[0])
                    expected = float(expected_value)
                except (ValueError, TypeError):
                    # expected_value is not convertible to a number (bad
                    # string, list, dict, ...): no comparison is possible.
                    pass
                else:
                    if actual != expected:
                        return True, 0.75, f"Race condition: value mismatch (expected {expected}, got {actual})"

        return False, 0.0, None
|
|
|
|
|
|
class BusinessLogicTester(BaseTester):
    """Tester for Business Logic vulnerabilities"""

    def __init__(self):
        super().__init__()
        self.name = "business_logic"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Look for signs that a business rule was bypassed.

        Checks, in order: negative values, zero values, a skipped workflow
        step (``context["skipped_step"]``), excessive discounts and
        privilege-related parameters. Returns
        ``(is_vulnerable, confidence, evidence)``.
        """
        body_lower = response_body.lower()
        status_ok = response_status == 200

        # Negative value acceptance
        if re.search(r"-\d+", payload) and status_ok:
            acceptance_words = ("success", "accepted", "processed", "approved")
            if any(word in body_lower for word in acceptance_words):
                return True, 0.8, "Business logic: negative value accepted"
            # Check for negative pricing
            if re.search(r'"(?:total|price|amount)"\s*:\s*-\d+', response_body):
                return True, 0.9, "Business logic: negative price/amount in response"

        # Zero-value bypass
        is_zero_payload = payload.strip() in ("0", "0.00", "0.0")
        if is_zero_payload and status_ok and "success" in body_lower:
            return True, 0.75, "Business logic: zero value accepted for transaction"

        # Workflow step skip
        skipped_step = context.get("skipped_step")
        if skipped_step and status_ok:
            return True, 0.7, f"Business logic: step '{skipped_step}' was skippable"

        payload_lower = payload.lower()

        # Discount/coupon abuse
        if "coupon" in payload_lower or "discount" in payload_lower:
            if re.search(r'"discount"\s*:\s*(?:100|[1-9]\d{2,})', response_body):
                return True, 0.8, "Business logic: excessive discount applied"

        # Role/privilege escalation via parameter
        privilege_tokens = ("admin", "role=admin", "is_admin=true", "privilege")
        if any(token in payload_lower for token in privilege_tokens):
            if status_ok and "admin" in body_lower:
                return True, 0.6, "Business logic: privilege escalation parameter accepted"

        return False, 0.0, None
|
|
|
|
|
|
class RateLimitBypassTester(BaseTester):
    """Tester for Rate Limit Bypass vulnerabilities"""

    def __init__(self):
        super().__init__()
        self.name = "rate_limit_bypass"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Detect requests that keep succeeding where throttling was expected.

        Reads ``request_count``, ``was_rate_limited`` and ``expected_429``
        from ``context``. Returns ``(is_vulnerable, confidence, evidence)``.
        """
        headers_lower = {name.lower(): value for name, value in response_headers.items()}
        got_200 = response_status == 200

        # After many requests, still getting 200
        request_count = context.get("request_count", 0)
        if request_count > 50 and got_200:
            # First "remaining quota" header present wins, in this order.
            remaining = None
            for header_name in ("x-ratelimit-remaining", "x-rate-limit-remaining", "ratelimit-remaining"):
                if header_name in headers_lower:
                    remaining = headers_lower[header_name]
                    break

            if remaining is None:
                # No rate limit headers at all
                return True, 0.7, f"No rate limiting detected after {request_count} requests"

            try:
                if int(remaining) > 0:
                    return True, 0.6, f"Rate limit not enforced after {request_count} requests (remaining: {remaining})"
            except ValueError:
                # Non-integer header value: fall through to the other checks.
                pass

        # Rate limit bypass via header manipulation
        spoofing_headers = ("x-forwarded-for", "x-real-ip", "x-originating-ip", "x-client-ip")
        if any(name in payload.lower() for name in spoofing_headers):
            if got_200 and context.get("was_rate_limited"):
                return True, 0.85, "Rate limit bypassed via IP spoofing header"

        # Check if 429 was expected but got 200
        if context.get("expected_429") and got_200:
            return True, 0.8, "Expected 429 (rate limited) but received 200"

        return False, 0.0, None
|
|
|
|
|
|
class ParameterPollutionTester(BaseTester):
    """Tester for HTTP Parameter Pollution vulnerabilities"""

    def __init__(self):
        super().__init__()
        self.name = "parameter_pollution"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Compare a duplicate-parameter response against the baseline.

        Reads ``baseline_body``, ``baseline_status``, ``original_value`` and
        ``waf_blocked_original`` from ``context``. Returns
        ``(is_vulnerable, confidence, evidence)``.
        """
        # Compare response with baseline (single param); both keys are required.
        has_baseline = "baseline_body" in context and "baseline_status" in context
        if has_baseline:
            baseline_size = len(context["baseline_body"])
            size_delta = abs(len(response_body) - baseline_size)

            # Significant response difference
            if size_delta > 200:
                return True, 0.7, f"Parameter pollution: response differs by {size_delta} bytes from baseline"

            # Status code difference
            baseline_status = context["baseline_status"]
            if response_status != baseline_status:
                return True, 0.75, f"Parameter pollution: status changed from {baseline_status} to {response_status}"

        # Check if attacker-controlled value was used
        marker = "neurosploit"
        if marker in payload and marker in response_body:
            original_value = context.get("original_value")
            if original_value and original_value not in response_body:
                return True, 0.8, "Parameter pollution: attacker value used instead of original"

        # WAF bypass via duplicate params
        if context.get("waf_blocked_original") and response_status == 200:
            return True, 0.8, "Parameter pollution: WAF bypass - blocked payload succeeded with duplicate params"

        return False, 0.0, None
|
|
|
|
|
|
class TypeJugglingTester(BaseTester):
    """Tester for Type Juggling / Type Coercion vulnerabilities"""

    def __init__(self):
        super().__init__()
        self.name = "type_juggling"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Detect success responses obtained with type-coercion payloads.

        Returns ``(is_vulnerable, confidence, evidence)``.
        """
        body_lower = response_body.lower()
        value = payload.strip()
        status_ok = response_status == 200

        # Type juggling payloads
        juggling_values = ("0", "true", "false", "null", "[]", "{}", "0e123", "0e999")

        if value in juggling_values or value.startswith("0e"):
            # Auth bypass
            if status_ok:
                auth_success = (
                    "authenticated", "logged in", "welcome", "dashboard",
                    "token", "session", "success",
                )
                # First indicator (in list order) found in the body, if any.
                indicator = next((word for word in auth_success if word in body_lower), None)
                if indicator is not None:
                    return True, 0.8, f"Type juggling: auth bypass with '{value}' - '{indicator}' in response"

            # JWT/token accepted
            if ("jwt" in body_lower or "bearer" in body_lower) and status_ok:
                return True, 0.7, f"Type juggling: token accepted with value '{value}'"

        # Magic hash comparison bypass (0e strings)
        if re.match(r"0e\d+", value):
            comparison_words = ("match", "equal", "valid", "correct")
            if status_ok and any(word in body_lower for word in comparison_words):
                return True, 0.85, f"Type juggling: magic hash bypass with '{value}'"

        # Array vs string comparison
        if value in ("[]", "Array"):
            if status_ok and "success" in body_lower:
                return True, 0.7, "Type juggling: array comparison bypass"

        return False, 0.0, None
|
|
|
|
|
|
class TimingAttackTester(BaseTester):
    """Tester for Timing Attack vulnerabilities.

    Detection relies on timing measurements supplied through ``context``
    rather than on the response content.
    """

    def __init__(self):
        super().__init__()
        self.name = "timing_attack"

    def check_timeout_vulnerability(self, vuln_type: str) -> bool:
        """Timing attacks are detected via response time differences.

        Always returns ``True``; ``vuln_type`` is not inspected.
        """
        return True

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check response time differences for timing side channels.

        Context keys read (all optional):
            response_time_ms: time taken by this response.
            baseline_time_ms: time taken by the baseline response.
            timing_samples: sequence of repeated timing measurements.

        Returns:
            A ``(is_vulnerable, confidence, evidence)`` tuple; evidence is
            ``None`` when nothing was detected.
        """
        # Primary detection is via response timing (handled by engine)
        response_time = context.get("response_time_ms", 0)
        baseline_time = context.get("baseline_time_ms", 0)

        if response_time > 0 and baseline_time > 0:
            diff = response_time - baseline_time
            # Test the larger threshold first: checking "> 100" first made the
            # "> 500" branch unreachable, so strong signals were always
            # reported with the lower confidence.
            # Very significant (> 500ms)
            if diff > 500:
                return True, 0.9, f"Timing attack: {diff}ms difference strongly indicates character-by-character comparison"
            # Significant timing difference (> 100ms)
            if diff > 100:
                return True, 0.7, f"Timing attack: {diff}ms difference (baseline: {baseline_time}ms, actual: {response_time}ms)"

        # Check for timing via multiple measurements
        if "timing_samples" in context:
            samples = context["timing_samples"]
            if len(samples) >= 2:
                max_diff = max(samples) - min(samples)
                if max_diff > 200:
                    return True, 0.65, f"Timing attack: {max_diff}ms variance across samples"

        return False, 0.0, None
|
|
|
|
|
|
class HostHeaderInjectionTester(BaseTester):
    """Tester for Host Header Injection vulnerabilities"""

    def __init__(self):
        super().__init__()
        self.name = "host_header_injection"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Detect an injected host name reflected in the response.

        Reads ``is_password_reset`` from ``context``. Returns
        ``(is_vulnerable, confidence, evidence)``.
        """
        body_lower = response_body.lower()
        payload_lower = payload.lower()

        # Injected host appearing in response
        evil_hosts = ["evil.com", "attacker.com", "neurosploit.test", "canary.host"]
        for host in evil_hosts:
            if host not in payload_lower or host not in body_lower:
                continue

            # High confidence if in URL context
            escaped_host = re.escape(host)
            url_contexts = (
                rf'https?://{escaped_host}',
                rf'href\s*=\s*["\'][^"\']*{escaped_host}',
                rf'action\s*=\s*["\'][^"\']*{escaped_host}',
                rf'redirect.*{escaped_host}',
            )
            if any(re.search(pattern, response_body, re.IGNORECASE) for pattern in url_contexts):
                return True, 0.9, f"Host header injection: '{host}' reflected in URL context"

            return True, 0.7, f"Host header injection: '{host}' reflected in response body"

        # X-Forwarded-Host injection
        if "x-forwarded-host" in payload_lower:
            headers_lower = {name.lower(): value for name, value in response_headers.items()}
            location_lower = headers_lower.get("location", "").lower()
            if any(host in location_lower for host in evil_hosts):
                return True, 0.9, "Host header injection: X-Forwarded-Host reflected in redirect"

        # Password reset link poisoning
        if context.get("is_password_reset") and response_status == 200:
            poisoned_host = next((host for host in evil_hosts if host in body_lower), None)
            if poisoned_host is not None:
                return True, 0.95, f"Host header injection in password reset: link points to '{poisoned_host}'"

        return False, 0.0, None
|
|
|
|
|
|
class HttpSmugglingTester(BaseTester):
    """Tester for HTTP Request Smuggling vulnerabilities"""

    def __init__(self):
        super().__init__()
        self.name = "http_smuggling"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Look for request-smuggling indicators in a response.

        Reads ``desync_detected``, ``smuggle_marker``, ``expected_status`` and
        ``second_request_timeout`` from ``context``. Returns
        ``(is_vulnerable, confidence, evidence)``.
        """
        header_names = {name.lower() for name in response_headers}

        # Response splitting - two HTTP responses in one
        if re.search(r"HTTP/\d\.\d\s+\d{3}", response_body) is not None:
            return True, 0.85, "HTTP smuggling: embedded HTTP response in body (response splitting)"

        # Conflicting Content-Length and Transfer-Encoding
        if "content-length" in header_names and "transfer-encoding" in header_names:
            return True, 0.7, "HTTP smuggling: both Content-Length and Transfer-Encoding present"

        # CL.TE or TE.CL desync indicators
        if context.get("desync_detected"):
            return True, 0.9, "HTTP smuggling: request desync confirmed"

        # Unexpected response to smuggled second request
        if "smuggle_marker" in context:
            marker = context["smuggle_marker"]
            if marker in response_body:
                return True, 0.85, f"HTTP smuggling: smuggled request marker '{marker}' in response"

        # Different status than expected (frontend vs backend disagreement)
        expected_status = context.get("expected_status")
        if expected_status and response_status != expected_status:
            if expected_status == 200 and response_status in (400, 403):
                return True, 0.5, f"HTTP smuggling: status mismatch (expected {expected_status}, got {response_status})"

        # Timeout on second request (queued/poisoned)
        if context.get("second_request_timeout"):
            return True, 0.7, "HTTP smuggling: second request timed out (possible queue poisoning)"

        return False, 0.0, None

    def check_timeout_vulnerability(self, vuln_type: str) -> bool:
        """Smuggling can cause timeouts on subsequent requests.

        Always returns ``True``; ``vuln_type`` is not inspected.
        """
        return True
|
|
|
|
|
|
class CachePoisoningTester(BaseTester):
    """Tester for Web Cache Poisoning vulnerabilities"""

    def __init__(self):
        super().__init__()
        self.name = "cache_poisoning"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Detect injected content that is reflected and/or served from cache.

        Reads ``is_cache_deception_test`` from ``context``. Returns
        ``(is_vulnerable, confidence, evidence)``.
        """
        headers_lower = {name.lower(): value for name, value in response_headers.items()}

        # Check for cache hit with injected content
        cache_headers = ("x-cache", "cf-cache-status", "x-varnish", "x-drupal-cache",
                         "x-proxy-cache", "x-cdn-cache")
        cache_hit = any(
            "hit" in headers_lower.get(name, "").lower() for name in cache_headers
        )

        # Age header indicates cached response
        age = headers_lower.get("age")
        if age and age != "0":
            cache_hit = True

        payload_lower = payload.lower()
        body_lower = response_body.lower()

        if cache_hit:
            # Check if our injected content is in the cached response
            for marker in ("neurosploit", "xss", "evil.com", "attacker"):
                if marker in payload_lower and marker in body_lower:
                    return True, 0.9, f"Cache poisoning: injected content '{marker}' served from cache"

        # Unkeyed header reflected in response (potential cache poison vector)
        unkeyed_headers = ("x-forwarded-host", "x-forwarded-scheme", "x-original-url",
                           "x-rewrite-url", "x-forwarded-prefix")
        # The marker test does not depend on the header, so evaluate it once.
        marker_reflected = any(
            marker in payload_lower and marker in body_lower
            for marker in ("evil.com", "neurosploit", "attacker")
        )
        if marker_reflected:
            for header in unkeyed_headers:
                if header in payload_lower:
                    if cache_hit:
                        cache_status, confidence = "cached", 0.85
                    else:
                        cache_status, confidence = "uncached", 0.5
                    return True, confidence, f"Cache poisoning: unkeyed header '{header}' reflected ({cache_status})"

        # Cache deception check
        if context.get("is_cache_deception_test"):
            if cache_hit and ("token" in body_lower or "session" in body_lower):
                return True, 0.8, "Cache deception: sensitive data cached via path confusion"

        return False, 0.0, None
|