Files
2026-02-11 10:47:33 -03:00

348 lines
13 KiB
Python

"""
NeuroSploit v3 - Authentication Vulnerability Testers
Testers for Auth Bypass, JWT, Session Fixation
"""
import re
import base64
import json
from typing import Tuple, Dict, Optional
from backend.core.vuln_engine.testers.base_tester import BaseTester
class AuthBypassTester(BaseTester):
"""Tester for Authentication Bypass"""
def __init__(self):
super().__init__()
self.name = "auth_bypass"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for authentication bypass"""
# Check for successful auth indicators after bypass payload
auth_success = [
"welcome", "dashboard", "logged in", "authenticated",
"success", "admin", "profile"
]
if response_status == 200:
body_lower = response_body.lower()
for indicator in auth_success:
if indicator in body_lower:
# Check if this was with a bypass payload
bypass_indicators = ["' or '1'='1", "admin'--", "' or 1=1"]
if any(bp in payload.lower() for bp in bypass_indicators):
return True, 0.8, f"Auth bypass possible: '{indicator}' found after injection"
# Check for redirect to authenticated area
location = response_headers.get("Location", "")
if response_status in [301, 302]:
if "dashboard" in location or "admin" in location or "home" in location:
return True, 0.7, f"Auth bypass: Redirect to {location}"
return False, 0.0, None
class JWTManipulationTester(BaseTester):
"""Tester for JWT Token Manipulation"""
def __init__(self):
super().__init__()
self.name = "jwt_manipulation"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for JWT manipulation vulnerabilities"""
# Check if manipulated JWT was accepted
if response_status == 200:
# Algorithm none attack
if '"alg":"none"' in payload or '"alg": "none"' in payload:
return True, 0.9, "JWT 'none' algorithm accepted"
# Check for elevated privileges response
elevated_indicators = ["admin", "administrator", "role.*admin"]
for pattern in elevated_indicators:
if re.search(pattern, response_body, re.IGNORECASE):
return True, 0.8, "JWT manipulation: Elevated privileges detected"
# Check for JWT-specific errors
jwt_errors = [
r"invalid.*token", r"jwt.*expired", r"signature.*invalid",
r"token.*malformed", r"unauthorized"
]
for pattern in jwt_errors:
if re.search(pattern, response_body, re.IGNORECASE):
# Error means it's checking - note for further testing
return False, 0.0, None
return False, 0.0, None
class SessionFixationTester(BaseTester):
"""Tester for Session Fixation"""
def __init__(self):
super().__init__()
self.name = "session_fixation"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for session fixation vulnerability"""
# Check Set-Cookie header
set_cookie = response_headers.get("Set-Cookie", "")
# If session ID in URL was accepted
if "JSESSIONID=" in payload or "PHPSESSID=" in payload:
if response_status == 200:
# Check if session was NOT regenerated
if not set_cookie or "JSESSIONID" not in set_cookie:
return True, 0.7, "Session ID from URL accepted without regeneration"
# Check for session in URL
if re.search(r'[?&](?:session|sid|PHPSESSID|JSESSIONID)=', response_body):
return True, 0.6, "Session ID exposed in URL"
return False, 0.0, None
class WeakPasswordTester(BaseTester):
"""Tester for Weak Password acceptance"""
def __init__(self):
super().__init__()
self.name = "weak_password"
self.weak_passwords = [
"123456", "password", "12345678", "qwerty", "abc123",
"111111", "123123", "admin", "letmein", "welcome",
"1234", "1", "a"
]
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for successful login/registration with weak passwords"""
body_lower = response_body.lower()
# Check if payload contains a weak password
payload_has_weak = any(wp in payload for wp in self.weak_passwords)
if not payload_has_weak:
return False, 0.0, None
# Check for successful auth with weak password
if response_status in [200, 201, 302]:
success_indicators = [
r'"(?:access_)?token"\s*:', r'"session"\s*:',
r"(?:login|registration|signup)\s+successful",
r'"authenticated"\s*:\s*true', r'"success"\s*:\s*true',
r"welcome", r"dashboard", r"logged\s*in",
]
for pattern in success_indicators:
if re.search(pattern, response_body, re.IGNORECASE):
matched_pw = next((wp for wp in self.weak_passwords if wp in payload), "unknown")
return True, 0.85, f"Weak password accepted: '{matched_pw}' allowed for authentication"
# Redirect to authenticated area
location = response_headers.get("Location", "")
if response_status == 302 and any(x in location.lower() for x in ["dashboard", "home", "profile", "account"]):
matched_pw = next((wp for wp in self.weak_passwords if wp in payload), "unknown")
return True, 0.8, f"Weak password accepted: Redirect to authenticated area with '{matched_pw}'"
return False, 0.0, None
class DefaultCredentialsTester(BaseTester):
"""Tester for Default Credentials acceptance"""
def __init__(self):
super().__init__()
self.name = "default_credentials"
self.default_creds = [
("admin", "admin"), ("admin", "password"), ("admin", "admin123"),
("root", "root"), ("root", "toor"), ("root", "password"),
("administrator", "administrator"), ("admin", "1234"),
("test", "test"), ("guest", "guest"), ("user", "user"),
("admin", "changeme"), ("admin", "default"),
]
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for successful login with default credentials"""
body_lower = response_body.lower()
# Check if payload matches default creds
payload_lower = payload.lower()
matched_cred = None
for username, password in self.default_creds:
if username in payload_lower and password in payload_lower:
matched_cred = f"{username}/{password}"
break
if not matched_cred:
return False, 0.0, None
# Check for successful login
if response_status in [200, 201, 302]:
auth_success = [
r'"(?:access_)?token"\s*:', r'"session"\s*:',
r"(?:login|auth)\s+successful", r'"success"\s*:\s*true',
r'"authenticated"\s*:\s*true', r"welcome",
r"dashboard", r"admin\s*panel",
]
for pattern in auth_success:
if re.search(pattern, response_body, re.IGNORECASE):
return True, 0.9, f"Default credentials accepted: {matched_cred}"
# Redirect to admin/dashboard
location = response_headers.get("Location", "")
if response_status == 302 and any(x in location.lower() for x in ["dashboard", "admin", "home", "panel"]):
return True, 0.85, f"Default credentials accepted: {matched_cred} (redirect to {location})"
return False, 0.0, None
class TwoFactorBypassTester(BaseTester):
"""Tester for Two-Factor Authentication Bypass"""
def __init__(self):
super().__init__()
self.name = "two_factor_bypass"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for authenticated access without completing 2FA"""
body_lower = response_body.lower()
# Check if we reached authenticated content without 2FA
if response_status == 200:
# Authenticated area indicators
auth_area_patterns = [
r"dashboard", r"my\s*account", r"profile",
r"settings", r"admin\s*panel", r'"user"\s*:\s*\{',
r'"email"\s*:\s*"[^"]+"', r'"role"\s*:',
]
# 2FA page indicators (we should NOT see these if bypassed)
twofa_page_patterns = [
r"(?:enter|verify)\s+(?:your\s+)?(?:otp|code|token|2fa)",
r"two.?factor", r"verification\s+code",
r"authenticator", r"sms\s+code",
]
has_auth_content = any(re.search(p, response_body, re.IGNORECASE) for p in auth_area_patterns)
is_twofa_page = any(re.search(p, response_body, re.IGNORECASE) for p in twofa_page_patterns)
if has_auth_content and not is_twofa_page:
# Check if payload suggests 2FA bypass attempt
bypass_indicators = [
"2fa", "otp", "mfa", "verify", "code",
"step2", "second", "challenge",
]
if any(bi in payload.lower() for bi in bypass_indicators):
return True, 0.85, "2FA bypass: Authenticated area accessed without completing 2FA"
# Direct navigation bypass
if context.get("skip_2fa") or context.get("direct_access"):
return True, 0.9, "2FA bypass: Direct navigation to authenticated page bypassed 2FA"
# Redirect skipping 2FA step
if response_status in [301, 302]:
location = response_headers.get("Location", "").lower()
if any(x in location for x in ["dashboard", "home", "account"]):
if "verify" not in location and "2fa" not in location and "otp" not in location:
return True, 0.7, "2FA bypass: Redirect to authenticated area skipping verification"
return False, 0.0, None
class OauthMisconfigTester(BaseTester):
"""Tester for OAuth Misconfiguration"""
def __init__(self):
super().__init__()
self.name = "oauth_misconfig"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for OAuth misconfiguration (open redirect, token leakage)"""
# Check for open redirect in OAuth flow
if response_status in [301, 302, 303, 307]:
location = response_headers.get("Location", "")
# External redirect in OAuth callback
if "redirect_uri" in payload.lower() or "callback" in payload.lower():
# Check if redirecting to attacker-controlled domain
evil_domains = ["evil.com", "attacker.com", "malicious.com"]
if any(domain in location for domain in evil_domains):
return True, 0.9, f"OAuth misconfig: Open redirect in OAuth flow to {location}"
# Check if arbitrary redirect_uri accepted
if payload in location:
return True, 0.85, "OAuth misconfig: Arbitrary redirect_uri accepted"
# Check for token in URL parameters (should be in fragment or POST)
if response_status in [200, 302]:
location = response_headers.get("Location", "")
# Token in query string instead of fragment
token_in_url = re.search(
r'[?&](?:access_token|token|code)=([A-Za-z0-9._-]+)',
location
)
if token_in_url:
return True, 0.8, "OAuth misconfig: Token/code exposed in URL query parameters"
# Token in response body URL
token_in_body = re.search(
r'(?:redirect|callback|return)["\']?\s*[:=]\s*["\']?https?://[^"\'>\s]*[?&]access_token=',
response_body, re.IGNORECASE
)
if token_in_body:
return True, 0.75, "OAuth misconfig: Access token in redirect URL"
# Check for missing state parameter (CSRF in OAuth)
if "state=" not in response_body and "state=" not in response_headers.get("Location", ""):
if re.search(r"(?:authorize|oauth|auth)\?", response_body, re.IGNORECASE):
return True, 0.6, "OAuth misconfig: Missing state parameter (CSRF risk)"
return False, 0.0, None