Files
NeuroSploit/backend/core/vuln_engine/testers/injection.py
2026-01-19 19:21:57 -03:00

373 lines
12 KiB
Python

"""
NeuroSploit v3 - Injection Vulnerability Testers
Testers for XSS, SQL Injection, Command Injection, SSTI, etc.
"""
import re
from typing import Tuple, Dict, Optional
from backend.core.vuln_engine.testers.base_tester import BaseTester
class XSSReflectedTester(BaseTester):
"""Tester for Reflected XSS vulnerabilities"""
def __init__(self):
super().__init__()
self.name = "xss_reflected"
# Unique markers for detection
self.markers = [
"neurosploit",
"xsstest123",
"alert(1)"
]
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check if XSS payload is reflected in response"""
if response_status >= 400:
return False, 0.0, None
# Check if payload is reflected
if payload in response_body:
# Check if it's in a dangerous context
dangerous_patterns = [
rf'<script[^>]*>{re.escape(payload)}',
rf'on\w+\s*=\s*["\']?{re.escape(payload)}',
rf'javascript:\s*{re.escape(payload)}',
rf'<[^>]+{re.escape(payload)}[^>]*>',
]
for pattern in dangerous_patterns:
if re.search(pattern, response_body, re.IGNORECASE):
return True, 0.9, f"XSS payload reflected in dangerous context: {pattern}"
# Payload reflected but possibly encoded
return True, 0.7, "XSS payload reflected in response"
# Check for partial reflection (script tags, etc.)
for marker in self.markers:
if marker in payload and marker in response_body:
return True, 0.6, f"XSS marker '{marker}' found in response"
return False, 0.0, None
class XSSStoredTester(BaseTester):
"""Tester for Stored XSS vulnerabilities"""
def __init__(self):
super().__init__()
self.name = "xss_stored"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for stored XSS - requires subsequent request verification"""
# For stored XSS, we need to check if data was stored
# This is a simplified check - full implementation would verify on retrieval
if response_status in [200, 201, 302]:
if "success" in response_body.lower() or "created" in response_body.lower():
return True, 0.5, "Data possibly stored - verify retrieval for stored XSS"
return False, 0.0, None
class XSSDomTester(BaseTester):
"""Tester for DOM-based XSS vulnerabilities"""
def __init__(self):
super().__init__()
self.name = "xss_dom"
self.dom_sinks = [
"innerHTML", "outerHTML", "document.write", "document.writeln",
"eval(", "setTimeout(", "setInterval(", "location.href",
"location.assign", "location.replace"
]
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for DOM XSS indicators"""
# Look for dangerous DOM sinks in JavaScript
for sink in self.dom_sinks:
pattern = rf'{sink}[^;]*(?:location|document\.URL|document\.referrer|window\.name)'
if re.search(pattern, response_body, re.IGNORECASE):
return True, 0.7, f"Potential DOM XSS sink found: {sink}"
# Check if URL parameters are used in JavaScript
if re.search(r'(?:location\.search|location\.hash|document\.URL)', response_body):
if any(sink in response_body for sink in self.dom_sinks):
return True, 0.6, "URL input flows to DOM sink"
return False, 0.0, None
class SQLiErrorTester(BaseTester):
"""Tester for Error-based SQL Injection"""
def __init__(self):
super().__init__()
self.name = "sqli_error"
self.error_patterns = [
# MySQL
r"SQL syntax.*MySQL", r"Warning.*mysql_", r"MySQLSyntaxErrorException",
r"valid MySQL result", r"check the manual that corresponds to your MySQL",
# PostgreSQL
r"PostgreSQL.*ERROR", r"Warning.*pg_", r"valid PostgreSQL result",
r"Npgsql\.", r"PG::SyntaxError",
# SQL Server
r"Driver.*SQL[\-\_\ ]*Server", r"OLE DB.*SQL Server",
r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_",
r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"Microsoft SQL Native Client error",
# Oracle
r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver",
r"Warning.*oci_", r"Warning.*ora_",
# SQLite
r"SQLite/JDBCDriver", r"SQLite\.Exception", r"System\.Data\.SQLite\.SQLiteException",
r"Warning.*sqlite_", r"Warning.*SQLite3::",
# Generic
r"SQL syntax.*", r"syntax error.*SQL", r"unclosed quotation mark",
r"quoted string not properly terminated"
]
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for SQL error messages in response"""
for pattern in self.error_patterns:
match = re.search(pattern, response_body, re.IGNORECASE)
if match:
return True, 0.9, f"SQL error detected: {match.group(0)[:100]}"
return False, 0.0, None
class SQLiUnionTester(BaseTester):
"""Tester for Union-based SQL Injection"""
def __init__(self):
super().__init__()
self.name = "sqli_union"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for union-based SQLi indicators"""
# Look for injected data markers
union_markers = ["neurosploit", "uniontest", "concat(", "version()"]
for marker in union_markers:
if marker in payload.lower() and marker in response_body.lower():
return True, 0.8, f"Union injection marker '{marker}' found in response"
# Check for database version strings
version_patterns = [
r"MySQL.*\d+\.\d+", r"PostgreSQL.*\d+\.\d+",
r"Microsoft SQL Server.*\d+", r"Oracle.*\d+",
r"\d+\.\d+\.\d+-MariaDB"
]
for pattern in version_patterns:
if re.search(pattern, response_body):
return True, 0.7, "Database version string found - possible union SQLi"
return False, 0.0, None
class SQLiBlindTester(BaseTester):
"""Tester for Boolean-based Blind SQL Injection"""
def __init__(self):
super().__init__()
self.name = "sqli_blind"
self.baseline_length = None
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for boolean-based blind SQLi"""
# This requires comparing responses - simplified check
response_length = len(response_body)
# Check for significant difference in response
if "baseline_length" in context:
diff = abs(response_length - context["baseline_length"])
if diff > 100: # Significant difference
return True, 0.6, f"Response length differs by {diff} bytes - possible blind SQLi"
# Check for conditional responses
if "1=1" in payload and response_status == 200:
return True, 0.5, "True condition returned 200 - possible blind SQLi"
return False, 0.0, None
class SQLiTimeTester(BaseTester):
"""Tester for Time-based Blind SQL Injection"""
def __init__(self):
super().__init__()
self.name = "sqli_time"
def check_timeout_vulnerability(self, vuln_type: str) -> bool:
"""Time-based SQLi is indicated by timeout"""
return True
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Time-based detection relies on timeout"""
# Response time analysis would be done in the engine
return False, 0.0, None
class CommandInjectionTester(BaseTester):
"""Tester for OS Command Injection"""
def __init__(self):
super().__init__()
self.name = "command_injection"
self.command_outputs = [
# Linux
r"root:.*:0:0:", r"bin:.*:1:1:", # /etc/passwd
r"uid=\d+.*gid=\d+", # id command
r"Linux.*\d+\.\d+\.\d+", # uname
r"total \d+.*drwx", # ls -la
# Windows
r"Volume Serial Number",
r"Directory of [A-Z]:\\",
r"Windows.*\[Version",
r"Microsoft Windows"
]
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for command execution evidence"""
for pattern in self.command_outputs:
match = re.search(pattern, response_body, re.IGNORECASE)
if match:
return True, 0.95, f"Command output detected: {match.group(0)[:100]}"
# Check for our marker
if "neurosploit" in payload and "neurosploit" in response_body:
return True, 0.8, "Command injection marker echoed"
return False, 0.0, None
class SSTITester(BaseTester):
"""Tester for Server-Side Template Injection"""
def __init__(self):
super().__init__()
self.name = "ssti"
# Mathematical expressions that prove code execution
self.math_results = {
"{{7*7}}": "49",
"${7*7}": "49",
"#{7*7}": "49",
"<%= 7*7 %>": "49",
"{{7*'7'}}": "7777777", # Jinja2
}
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for SSTI indicators"""
# Check mathematical results
for expr, result in self.math_results.items():
if expr in payload and result in response_body:
return True, 0.95, f"SSTI confirmed: {expr} = {result}"
# Check for template errors
template_errors = [
r"TemplateSyntaxError", r"Jinja2", r"Twig_Error",
r"freemarker\.core\.", r"velocity\.exception",
r"org\.apache\.velocity", r"Smarty"
]
for pattern in template_errors:
if re.search(pattern, response_body):
return True, 0.7, f"Template engine error: {pattern}"
return False, 0.0, None
class NoSQLInjectionTester(BaseTester):
"""Tester for NoSQL Injection"""
def __init__(self):
super().__init__()
self.name = "nosql_injection"
def analyze_response(
self,
payload: str,
response_status: int,
response_headers: Dict,
response_body: str,
context: Dict
) -> Tuple[bool, float, Optional[str]]:
"""Check for NoSQL injection indicators"""
# MongoDB errors
nosql_errors = [
r"MongoError", r"MongoDB", r"bson",
r"\$where", r"\$gt", r"\$ne",
r"SyntaxError.*JSON"
]
for pattern in nosql_errors:
if re.search(pattern, response_body, re.IGNORECASE):
return True, 0.7, f"NoSQL error indicator: {pattern}"
# Check for authentication bypass
if "$ne" in payload or "$gt" in payload:
if response_status == 200 and "success" in response_body.lower():
return True, 0.6, "Possible NoSQL authentication bypass"
return False, 0.0, None