Files
NeuroSploit/backend/core/vuln_engine/testers/infrastructure.py
2026-02-11 10:47:33 -03:00

510 lines
18 KiB
Python

"""
NeuroSploit v3 - Infrastructure Vulnerability Testers
Testers for Security Headers, SSL/TLS, HTTP Methods
"""
import re
from typing import Tuple, Dict, Optional
from backend.core.vuln_engine.testers.base_tester import BaseTester
class SecurityHeadersTester(BaseTester):
    """Tester for Missing Security Headers"""

    def __init__(self):
        super().__init__()
        self.name = "security_headers"
        # Expected response header -> finding text reported when it is absent.
        self.required_headers = {
            "Strict-Transport-Security": "HSTS not configured",
            "X-Content-Type-Options": "X-Content-Type-Options not set",
            "X-Frame-Options": "X-Frame-Options not set",
            "Content-Security-Policy": "CSP not configured",
            "X-XSS-Protection": "X-XSS-Protection not set (legacy but still useful)",
            "Referrer-Policy": "Referrer-Policy not configured"
        }

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Report required security headers that are absent, plus weak CSP directives.

        Header names are compared case-insensitively. Only ``response_headers``
        is inspected; the other arguments are unused here.

        Returns:
            ``(found, confidence, evidence)``. Confidence starts at 0.3, grows
            by 0.1 per finding and is capped at 0.8. The evidence string lists
            at most the first three findings.
        """
        normalized = {name.lower(): value for name, value in response_headers.items()}

        findings = [
            message
            for header, message in self.required_headers.items()
            if header.lower() not in normalized
        ]

        # A CSP that is present can still be weak; each marker below is a
        # plain substring test against the policy value.
        policy = normalized.get("content-security-policy", "")
        if policy:
            csp_markers = (
                ("unsafe-inline", "unsafe-inline"),
                ("unsafe-eval", "unsafe-eval"),
                ("*", "wildcard sources"),
            )
            weaknesses = [label for marker, label in csp_markers if marker in policy]
            if weaknesses:
                findings.append(f"Weak CSP: {', '.join(weaknesses)}")

        if not findings:
            return False, 0.0, None

        confidence = min(0.3 + len(findings) * 0.1, 0.8)
        return True, confidence, f"Missing/weak headers: {'; '.join(findings[:3])}"
class SSLTester(BaseTester):
    """Tester for SSL/TLS Issues"""

    def __init__(self):
        super().__init__()
        self.name = "ssl_issues"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check the HSTS header and look for mixed content on HTTPS pages.

        Args:
            payload: Unused by this tester.
            response_status: Unused by this tester.
            response_headers: Response headers; names are matched
                case-insensitively.
            response_body: Response body, scanned for ``src``/``href``
                attributes that point at ``http://`` URLs.
            context: Read for the ``"url"`` key only; mixed content is checked
                when that URL uses the ``https`` scheme.

        Returns:
            ``(True, 0.6, evidence)`` when at least one issue is found,
            otherwise ``(False, 0.0, None)``.
        """
        issues = []

        # Header names are case-insensitive (and lower-case over HTTP/2), so
        # normalise before looking anything up.
        headers_lower = {k.lower(): v for k, v in (response_headers or {}).items()}
        hsts = headers_lower.get("strict-transport-security", "") or ""

        if not hsts:
            issues.append("HSTS not enabled")
        else:
            # Directive names are case-insensitive and max-age may be given as
            # a quoted string, so tolerate both forms.
            max_age_match = re.search(r'max-age\s*=\s*"?(\d+)', hsts, re.IGNORECASE)
            if max_age_match:
                max_age = int(max_age_match.group(1))
                if max_age < 31536000:  # Less than 1 year
                    issues.append(f"HSTS max-age too short: {max_age}s")
            if "includesubdomains" not in hsts.lower():
                issues.append("HSTS missing includeSubDomains")

        # Only the scheme of the page URL matters; a substring test would also
        # match an https URL embedded in a query string of an http page.
        url = ((context or {}).get("url", "") or "").strip()
        if url.lower().startswith("https://"):
            http_resources = re.findall(
                r'(?:src|href)=["\']http://[^"\']+', response_body or ""
            )
            if http_resources:
                issues.append(f"Mixed content: {len(http_resources)} HTTP resources")

        if issues:
            return True, 0.6, f"SSL/TLS issues: {'; '.join(issues)}"
        return False, 0.0, None
class HTTPMethodsTester(BaseTester):
    """Tester for Dangerous HTTP Methods"""

    def __init__(self):
        super().__init__()
        self.name = "http_methods"
        self.dangerous_methods = ["TRACE", "TRACK", "PUT", "DELETE", "CONNECT"]

    def build_request(self, endpoint, payload: str) -> Tuple[str, Dict, Dict, Optional[str]]:
        """Build OPTIONS request to check allowed methods

        Returns ``endpoint.url``, an empty dict, a headers dict carrying only a
        User-Agent, and ``None``.
        NOTE(review): the meaning of the second and fourth tuple slots is
        defined by BaseTester, which is not visible here - confirm.
        """
        headers = {
            "User-Agent": "NeuroSploit/3.0"
        }
        # payload is the HTTP method to test
        return endpoint.url, {}, headers, None

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check the ``Allow`` header and the probe status for dangerous methods.

        Args:
            payload: The HTTP method that was probed.
            response_status: Status code of the probe response.
            response_headers: Response headers; ``Allow`` is looked up
                case-insensitively.
            response_body: Unused by this tester.
            context: Unused by this tester.

        Returns:
            ``(True, 0.7, ...)`` when TRACE or TRACK is advertised,
            ``(True, 0.5, ...)`` for other advertised dangerous methods,
            ``(True, 0.6, ...)`` when the probed dangerous method returned 200,
            otherwise ``(False, 0.0, None)``.
        """
        headers_lower = {k.lower(): v for k, v in (response_headers or {}).items()}
        allow = headers_lower.get("allow", "") or ""

        # Compare whole comma-separated tokens so a method that merely contains
        # a dangerous name (e.g. "COMPUTE" contains "PUT") is not flagged.
        advertised = {token.strip().upper() for token in allow.split(",") if token.strip()}
        dangerous_found = [m for m in self.dangerous_methods if m in advertised]

        # TRACE method enables XST attacks
        if "TRACE" in dangerous_found or "TRACK" in dangerous_found:
            return True, 0.7, f"Dangerous methods enabled: {', '.join(dangerous_found)} (XST risk)"
        if dangerous_found:
            return True, 0.5, f"Potentially dangerous methods: {', '.join(dangerous_found)}"

        # Check if specific method test succeeded
        probed_method = (payload or "").upper()
        if probed_method in self.dangerous_methods and response_status == 200:
            return True, 0.6, f"{payload} method accepted"
        return False, 0.0, None
class DirectoryListingTester(BaseTester):
    """Tester for Directory Listing exposure"""

    def __init__(self):
        super().__init__()
        self.name = "directory_listing"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check a 200 response body for auto-generated directory index markers.

        Only ``response_status`` and ``response_body`` are inspected.

        Returns:
            ``(True, 0.95, ...)`` when an Apache server signature appears
            together with "Index of", ``(True, 0.9, ...)`` when any generic
            listing pattern matches, otherwise ``(False, 0.0, None)``.
        """
        if response_status != 200:
            return False, 0.0, None

        body = response_body or ""

        # The Apache-specific check must run before the generic patterns: the
        # generic "Index of /" pattern also matches Apache listings, so the
        # more specific, higher-confidence finding was previously shadowed.
        if "Index of" in body and re.search(
            r'<address>Apache/[\d.]+ .* Server at', body, re.IGNORECASE
        ):
            return True, 0.95, "Directory listing: Apache directory listing enabled"

        listing_patterns = [
            r"<title>Index of\s*/",
            r"Index of\s*/",
            r"<h1>Index of",
            r"Directory listing for\s*/",
            r"<title>Directory listing",
            r'<a\s+href="\.\./">\.\./</a>',
            r"Parent Directory</a>",
            r'\[DIR\]',
            r'\[TXT\]',
            r"<pre>.*<a href=",
        ]
        for pattern in listing_patterns:
            if re.search(pattern, body, re.IGNORECASE):
                return True, 0.9, "Directory listing: Server directory contents exposed"

        return False, 0.0, None
class DebugModeTester(BaseTester):
    """Tester for Debug Mode/Page exposure"""

    def __init__(self):
        super().__init__()
        self.name = "debug_mode"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Scan the response body for framework debug pages and stack traces.

        Signature groups are evaluated in order and the first group with a
        matching pattern decides the result. Only ``response_body`` is read.

        Returns:
            ``(True, confidence, evidence)`` for the first matching group,
            otherwise ``(False, 0.0, None)``.
        """
        # Each entry: (regex patterns, re flags, confidence, evidence text).
        signature_groups = (
            # Werkzeug/Flask debugger
            (
                (
                    r"Werkzeug\s+Debugger",
                    r"werkzeug\.debug",
                    r"<div class=\"debugger\">",
                    r"The debugger caught an exception",
                    r"__debugger__",
                ),
                re.IGNORECASE,
                0.95,
                "Debug mode: Werkzeug interactive debugger exposed (RCE risk)",
            ),
            # Laravel debug
            (
                (
                    r"Whoops!.*Laravel",
                    r"Ignition\s",
                    r"vendor/laravel",
                    r"Laravel.*Exception",
                    r"app/Http/Controllers",
                ),
                re.IGNORECASE,
                0.9,
                "Debug mode: Laravel debug page exposed",
            ),
            # Django debug
            (
                (
                    r"You\'re seeing this error because you have <code>DEBUG = True</code>",
                    r"Django Version:",
                    r"Traceback.*django",
                    r"INSTALLED_APPS",
                    r"settings\.py",
                ),
                re.IGNORECASE,
                0.9,
                "Debug mode: Django debug page exposed",
            ),
            # Generic stack traces with source paths (may span several lines)
            (
                (
                    r"(?:File|at)\s+[\"']?(?:/[a-z]+/|C:\\)[^\s\"']+\.(?:py|php|rb|js|java|go)\b",
                    r"Traceback \(most recent call last\)",
                    r"Stack trace:.*(?:\.php|\.py|\.rb|\.java)",
                    r"(?:Error|Exception)\s+in\s+(?:/[a-z]+/|C:\\)[^\s]+:\d+",
                ),
                re.IGNORECASE | re.DOTALL,
                0.8,
                "Debug mode: Stack trace with source file paths exposed",
            ),
            # ASP.NET detailed errors (case-sensitive match, no flags)
            (
                (r"Server Error in '/' Application",),
                0,
                0.85,
                "Debug mode: ASP.NET detailed error page exposed",
            ),
        )

        for regexes, flags, confidence, evidence in signature_groups:
            if any(re.search(regex, response_body, flags) for regex in regexes):
                return True, confidence, evidence

        return False, 0.0, None
class ExposedAdminPanelTester(BaseTester):
    """Tester for Publicly Accessible Admin Panel"""

    def __init__(self):
        super().__init__()
        self.name = "exposed_admin_panel"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Look for admin-panel fingerprints in the body of a 200 response.

        Signatures are tried in order, case-insensitively; the first match
        names the panel in the evidence string. Non-200 responses are never
        flagged.

        Returns:
            ``(True, 0.75, evidence)`` on the first matching signature,
            otherwise ``(False, 0.0, None)``.
        """
        if response_status != 200:
            return False, 0.0, None

        # (regex, human-readable panel label), checked top to bottom.
        signatures = (
            (r"(?:admin|administrator)\s+(?:login|panel|dashboard|console)", "admin login page"),
            (r"<title>[^<]*(?:admin|dashboard|control\s*panel|cms)[^<]*</title>", "admin title"),
            (r"wp-login\.php", "WordPress login"),
            (r"wp-admin", "WordPress admin"),
            (r"/admin/login", "admin login endpoint"),
            (r"phpmyadmin", "phpMyAdmin"),
            (r"adminer\.php", "Adminer"),
            (r"cPanel", "cPanel"),
            (r"Webmin", "Webmin"),
            (r"Plesk", "Plesk"),
            (r"joomla.*administrator", "Joomla admin"),
            (r"drupal.*user/login", "Drupal admin login"),
        )

        matched_label = next(
            (
                label
                for regex, label in signatures
                if re.search(regex, response_body, re.IGNORECASE)
            ),
            None,
        )
        if matched_label is None:
            return False, 0.0, None
        return True, 0.75, f"Exposed admin panel: {matched_label} accessible publicly"
class ExposedApiDocsTester(BaseTester):
    """Tester for Publicly Accessible API Documentation"""

    def __init__(self):
        super().__init__()
        self.name = "exposed_api_docs"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check a 200 response for Swagger/OpenAPI/GraphQL documentation markers.

        Args:
            payload: Unused by this tester.
            response_status: Only status 200 is analysed.
            response_headers: Read for ``Content-Type`` (case-insensitive).
            response_body: Body scanned for documentation fingerprints.
            context: Unused by this tester.

        Returns:
            ``(True, 0.85, ...)`` for a JSON response containing an OpenAPI
            ``paths`` object with HTTP verbs, ``(True, 0.8, ...)`` for any
            generic documentation pattern, otherwise ``(False, 0.0, None)``.
        """
        if response_status != 200:
            return False, 0.0, None

        body = response_body or ""
        headers_lower = {k.lower(): v for k, v in (response_headers or {}).items()}
        content_type = headers_lower.get("content-type", "") or ""

        # Check content type for JSON API specs. This must run before the
        # generic patterns: every body matching it also matches the generic
        # '"paths": {' pattern, so this branch could never be reached when it
        # ran after the loop.
        if "json" in content_type.lower() and re.search(
            r'"paths"\s*:\s*\{.*"(?:get|post|put|delete)"', body, re.DOTALL
        ):
            return True, 0.85, "Exposed API docs: OpenAPI/Swagger JSON specification exposed"

        api_docs_patterns = [
            (r"swagger-ui", "Swagger UI"),
            (r'"swagger"\s*:\s*"[0-9.]+"', "Swagger spec"),
            (r'"openapi"\s*:\s*"[0-9.]+"', "OpenAPI spec"),
            (r"swagger-ui-bundle\.js", "Swagger UI bundle"),
            (r"<title>Swagger UI</title>", "Swagger UI page"),
            (r"redoc", "ReDoc API docs"),
            (r"api-docs", "API documentation"),
            (r"graphiql", "GraphiQL interface"),
            (r"GraphQL Playground", "GraphQL Playground"),
            (r'"paths"\s*:\s*\{', "OpenAPI paths object"),
            (r'"info"\s*:\s*\{.*"title"\s*:', "OpenAPI info object"),
        ]
        for pattern, doc_type in api_docs_patterns:
            if re.search(pattern, body, re.IGNORECASE):
                return True, 0.8, f"Exposed API docs: {doc_type} publicly accessible"

        return False, 0.0, None
class InsecureCookieFlagsTester(BaseTester):
    """Tester for Missing Secure/HttpOnly/SameSite Cookie Flags"""

    # Splits a header value in which several cookies were folded together with
    # commas. A comma only separates cookies when it is followed by a
    # "name=" token, which keeps the comma inside an Expires date intact.
    _FOLDED_COOKIE_SPLIT = re.compile(r",\s*(?=[^;,=\s]+=)")

    # Substrings of a cookie name that mark it as session/auth related.
    _SESSION_HINTS = ("session", "sess", "sid", "token", "auth", "jwt", "csrf")

    def __init__(self):
        super().__init__()
        self.name = "insecure_cookie_flags"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check Set-Cookie headers for missing Secure/HttpOnly/SameSite flags.

        Flags are detected from the cookie's attribute list (the ``;``
        separated parts after the name/value pair), not by substring search
        over the whole header. A cookie whose name or value merely contains
        "secure", "httponly" or "samesite" therefore does not hide a missing
        flag.

        Args:
            payload: Unused by this tester.
            response_status: Unused by this tester.
            response_headers: Response headers; ``Set-Cookie`` is matched
                case-insensitively and may hold a string or a list of strings.
            response_body: Unused by this tester.
            context: Unused by this tester.

        Returns:
            ``(True, 0.8, ...)`` when a session-like cookie lacks a flag,
            ``(True, 0.6, ...)`` when only other cookies do, otherwise
            ``(False, 0.0, None)``. The evidence lists at most three cookies.
        """
        # Collect all Set-Cookie headers
        raw_values = []
        for key, value in (response_headers or {}).items():
            if key.lower() == "set-cookie":
                if isinstance(value, list):
                    raw_values.extend(value)
                else:
                    raw_values.append(value)

        cookies = []
        for raw in raw_values:
            if isinstance(raw, str) and raw.strip():
                cookies.extend(self._FOLDED_COOKIE_SPLIT.split(raw))

        if not cookies:
            return False, 0.0, None

        issues = []
        session_cookie_affected = False
        for cookie in cookies:
            name_value, *attributes = cookie.split(";")
            cookie_name = name_value.split("=")[0].strip()
            attribute_names = {
                attr.split("=", 1)[0].strip().lower() for attr in attributes
            }

            # Session cookies are more critical
            is_session = any(hint in cookie_name.lower() for hint in self._SESSION_HINTS)

            missing_flags = [
                flag
                for flag in ("Secure", "HttpOnly", "SameSite")
                if flag.lower() not in attribute_names
            ]
            if missing_flags:
                severity = "session cookie" if is_session else "cookie"
                session_cookie_affected = session_cookie_affected or is_session
                issues.append(f"{cookie_name} ({severity}): missing {', '.join(missing_flags)}")

        if issues:
            confidence = 0.8 if session_cookie_affected else 0.6
            return True, confidence, f"Insecure cookie flags: {'; '.join(issues[:3])}"
        return False, 0.0, None
class HttpSmugglingTester(BaseTester):
    """Tester for HTTP Request Smuggling (CL/TE discrepancy)"""

    def __init__(self):
        super().__init__()
        self.name = "http_smuggling"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check for CL/TE discrepancy indicators.

        Args:
            payload: The request payload; inspected for the words
                "transfer-encoding" / "content-length" in the delay check.
            response_status: Status code; ``0`` is treated as a reset/timeout.
            response_headers: Response headers, matched case-insensitively.
            response_body: Body scanned for parser error messages.
            context: Read for ``"response_time_ms"`` and
                ``"connection_reset"``.

        Returns:
            ``(True, confidence, evidence)`` for the first indicator that
            fires (body error text 0.7, both TE and CL headers 0.75, slow
            response with a CL/TE payload 0.6, reset/status 0 0.65), otherwise
            ``(False, 0.0, None)``.
        """
        body = response_body or ""
        payload_lower = (payload or "").lower()
        ctx = context or {}

        # Check for request smuggling indicators
        smuggling_indicators = [
            # Different response than expected
            (r"400 Bad Request.*(?:Content-Length|Transfer-Encoding)", "CL/TE parsing error"),
            (r"(?:invalid|malformed)\s+(?:chunk|transfer.encoding)", "chunked encoding error"),
        ]
        for pattern, desc in smuggling_indicators:
            if re.search(pattern, body, re.IGNORECASE | re.DOTALL):
                return True, 0.7, f"HTTP smuggling indicator: {desc}"

        # Check for dual Transfer-Encoding handling. Header names are
        # case-insensitive, so normalise before the lookup.
        headers_lower = {k.lower(): v for k, v in (response_headers or {}).items()}
        te_header = headers_lower.get("transfer-encoding", "")
        cl_header = headers_lower.get("content-length", "")
        if te_header and cl_header:
            return True, 0.75, "HTTP smuggling: Both Transfer-Encoding and Content-Length in response"

        # Check for timeout-based detection (context). A missing or None
        # timing value counts as 0 instead of raising on comparison.
        response_time_ms = ctx.get("response_time_ms") or 0
        if response_time_ms > 10000:
            if "transfer-encoding" in payload_lower or "content-length" in payload_lower:
                return True, 0.6, "HTTP smuggling: Abnormal response delay with CL/TE payload"

        # Check for response desync indicators
        if response_status == 0 or ctx.get("connection_reset"):
            return True, 0.65, "HTTP smuggling: Connection reset/timeout with smuggling payload"
        return False, 0.0, None
class CachePoisoningTester(BaseTester):
    """Tester for Web Cache Poisoning"""

    def __init__(self):
        super().__init__()
        self.name = "cache_poisoning"

    def analyze_response(
        self,
        payload: str,
        response_status: int,
        response_headers: Dict,
        response_body: str,
        context: Dict
    ) -> Tuple[bool, float, Optional[str]]:
        """Check whether the injected payload is reflected in a cached or cacheable response.

        Args:
            payload: The injected value; it must appear verbatim in the body
                for any finding. An empty payload never produces a finding.
            response_status: Only status 200 is analysed.
            response_headers: Response headers, matched case-insensitively.
                ``X-Cache``, ``CF-Cache-Status``, ``Age``, ``X-Varnish`` and
                ``Cache-Control`` are read.
            response_body: Body searched for the payload.
            context: Unused by this tester.

        Returns:
            ``(True, 0.9, ...)`` when the payload is reflected and the cache
            headers indicate a hit; ``(True, 0.7, ...)`` when it is reflected
            and a ``Cache-Control`` header without ``no-store``/``private`` is
            present; ``(True, 0.6, ...)`` when it is reflected and no
            ``Cache-Control`` header is sent; otherwise ``(False, 0.0, None)``.
        """
        # An empty payload is a substring of every body, so it cannot serve as
        # evidence of reflection.
        if response_status != 200 or not payload:
            return False, 0.0, None
        if payload not in (response_body or ""):
            return False, 0.0, None

        headers_lower = {k.lower(): v for k, v in (response_headers or {}).items()}

        def header(name: str) -> str:
            return str(headers_lower.get(name, "") or "")

        # Check for cache headers indicating response was cached
        is_cached = any(
            marker in header(name).upper()
            for name in ("x-cache", "cf-cache-status", "age", "x-varnish")
            for marker in ("HIT", "STALE")
        )
        if not is_cached:
            # A positive Age means the response was served from a cache.
            # A malformed Age value is ignored rather than raising.
            try:
                is_cached = int(header("age").strip() or 0) > 0
            except ValueError:
                is_cached = False

        if is_cached:
            return True, 0.9, "Cache poisoning: Injected unkeyed input reflected in cached response"

        # Cache-Control directives are case-insensitive.
        cache_control = header("cache-control").lower()
        if "no-store" in cache_control or "private" in cache_control:
            # Explicitly excluded from shared caches: reflection alone is not
            # a cache-poisoning signal.
            return False, 0.0, None
        if cache_control:
            return True, 0.7, "Cache poisoning: Unkeyed input reflected - verify caching"
        return True, 0.6, "Cache poisoning potential: Input reflected in cacheable response"