"""
|
|
NeuroSploit v3 - Access Control Learning Engine
|
|
|
|
Adaptive learning system for BOLA/BFLA/IDOR and other access control testing.
|
|
Records test outcomes and response patterns to improve future evaluations.
|
|
|
|
Key insight: HTTP status codes are unreliable for access control testing.
|
|
This module learns from actual response DATA patterns to distinguish:
|
|
- True positives (cross-user data access)
|
|
- False positives (error messages, login pages, empty responses with 200 status)
|
|
|
|
Usage:
|
|
learner = AccessControlLearner()
|
|
# Record a test outcome
|
|
learner.record_test(vuln_type, url, response_body, is_true_positive, pattern_notes)
|
|
# Get learned patterns for a target
|
|
patterns = learner.get_patterns_for_target(domain)
|
|
# Get learning context for AI prompts
|
|
context = learner.get_learning_context(vuln_type)
|
|
"""
|
|
|
|
import json
import logging
import re
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

DATA_DIR = Path(__file__).parent.parent.parent / "data"
LEARNING_FILE = DATA_DIR / "access_control_learning.json"
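
# Shape of the persisted learning file (written by _save(), read by _load() below).
# Sketch for reference only, assuming the on-disk format stays as currently implemented:
#   {"records":  [TestRecord dicts, most recent MAX_RECORDS],
#    "patterns": [ResponsePattern dicts, most recent MAX_PATTERNS],
#    "metadata": {"total_records": ..., "total_patterns": ..., "last_updated": "<ISO 8601>"}}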
@dataclass
class ResponsePattern:
    """A learned response pattern from access control testing."""
    pattern_type: str        # "denial", "empty", "login_page", "data_leak", "public_data"
    indicators: List[str]    # Strings/patterns that identify this response type
    is_false_positive: bool  # True if this pattern indicates a false positive
    confidence: float        # 0.0-1.0 how reliable this pattern is
    example_body: str        # Truncated example response body
    vuln_type: str           # bola, bfla, idor, etc.
    target_domain: str       # Domain this was learned from
    timestamp: str           # When this was learned


@dataclass
class TestRecord:
    """Record of an access control test outcome."""
    vuln_type: str
    target_url: str
    status_code: int
    response_length: int
    is_true_positive: bool
    pattern_type: str          # What pattern was identified
    key_indicators: List[str]  # What strings/patterns were decisive
    notes: str                 # Human or AI notes about why this was TP/FP
    timestamp: str

class AccessControlLearner:
    """Adaptive learning engine for access control vulnerability testing.

    Learns from test outcomes to identify response patterns that indicate
    true vs false positives for BOLA, BFLA, IDOR, and related vuln types.
    """

    MAX_RECORDS = 500
    MAX_PATTERNS = 200

    # Pre-seeded patterns from known false positive scenarios
    DEFAULT_PATTERNS: List[Dict] = [
        {
            "pattern_type": "denial_200",
            "indicators": ["unauthorized", "forbidden", "access denied", "not authorized",
                           "permission denied", "insufficient privileges"],
            "is_false_positive": True,
            "confidence": 0.9,
            "description": "Server returns 200 OK but body contains access denial message",
        },
        {
            "pattern_type": "empty_200",
            "indicators": ["[]", "{}", '""', "null", ""],
            "is_false_positive": True,
            "confidence": 0.85,
            "description": "Server returns 200 OK with empty/null response body",
        },
        {
            "pattern_type": "login_redirect",
            "indicators": ["type=\"password\"", "sign in", "log in", "login",
                           "authentication required"],
            "is_false_positive": True,
            "confidence": 0.95,
            "description": "Server returns 200 OK but body is a login page",
        },
        {
            "pattern_type": "error_json",
            "indicators": ['"error":', '"status":"error"', '"success":false',
                           '"message":"not found"', '"code":401', '"code":403'],
            "is_false_positive": True,
            "confidence": 0.9,
            "description": "Server returns 200 OK but JSON body indicates error",
        },
        {
            "pattern_type": "own_data",
            "indicators": [],
            "is_false_positive": True,
            "confidence": 0.8,
            "description": "Server returns authenticated user's own data regardless of requested ID",
        },
        {
            "pattern_type": "public_data",
            "indicators": [],
            "is_false_positive": True,
            "confidence": 0.7,
            "description": "Response contains only public profile fields (username, bio) not private data",
        },
        {
            "pattern_type": "cross_user_data",
            "indicators": ['"email":', '"phone":', '"address":', '"ssn":',
                           '"credit_card":', '"password":', '"secret":'],
            "is_false_positive": False,
            "confidence": 0.9,
            "description": "Response contains another user's private data fields",
        },
        {
            "pattern_type": "admin_data_leak",
            "indicators": ['"role":"admin"', '"is_admin":true', '"users":[',
                           '"audit_log":', '"system_config":'],
            "is_false_positive": False,
            "confidence": 0.9,
            "description": "Response contains admin-level data accessible to non-admin user",
        },
        {
            "pattern_type": "state_change",
            "indicators": ['"updated":', '"deleted":', '"created":', '"modified":',
                           '"success":true'],
            "is_false_positive": False,
            "confidence": 0.85,
            "description": "Write operation succeeded on another user's resource",
        },
    ]

    # Known application patterns that cause false positives
    KNOWN_FP_PATTERNS: Dict[str, List[str]] = {
        "wso2": ["wso2", "carbon", "identity server", "api manager"],
        "keycloak": ["keycloak", "red hat sso"],
        "spring_security": ["spring security", "whitelabel error"],
        "oauth2_proxy": ["oauth2-proxy", "sign in with"],
        "cloudflare": ["cloudflare", "cf-ray", "attention required"],
        "aws_waf": ["aws-waf", "request blocked"],
    }

    def __init__(self, data_dir: Optional[Path] = None):
        self.data_dir = data_dir or DATA_DIR
        self.learning_file = self.data_dir / "access_control_learning.json"
        self.records: List[TestRecord] = []
        self.custom_patterns: List[ResponsePattern] = []
        self._load()

    def _load(self):
        """Load learning data from disk."""
        try:
            if self.learning_file.exists():
                with open(self.learning_file, "r") as f:
                    data = json.load(f)
                self.records = [
                    TestRecord(**r) for r in data.get("records", [])
                ]
                self.custom_patterns = [
                    ResponsePattern(**p) for p in data.get("patterns", [])
                ]
                logger.debug(f"Loaded {len(self.records)} records, {len(self.custom_patterns)} patterns")
        except Exception as e:
            logger.debug(f"Failed to load learning data: {e}")

    def _save(self):
        """Save learning data to disk."""
        try:
            self.data_dir.mkdir(parents=True, exist_ok=True)
            data = {
                "records": [asdict(r) for r in self.records[-self.MAX_RECORDS:]],
                "patterns": [asdict(p) for p in self.custom_patterns[-self.MAX_PATTERNS:]],
                "metadata": {
                    "total_records": len(self.records),
                    "total_patterns": len(self.custom_patterns),
                    "last_updated": datetime.now().isoformat(),
                },
            }
            with open(self.learning_file, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            logger.debug(f"Failed to save learning data: {e}")

    def record_test(
        self,
        vuln_type: str,
        target_url: str,
        status_code: int,
        response_body: str,
        is_true_positive: bool,
        pattern_notes: str = "",
    ):
        """Record an access control test outcome for learning.

        Called after the validation judge makes a decision, with the
        verified outcome (true positive or false positive).
        """
        # Identify response pattern
        pattern_type = self._classify_response(response_body, status_code)
        key_indicators = self._extract_key_indicators(response_body)

        record = TestRecord(
            vuln_type=vuln_type,
            target_url=target_url,
            status_code=status_code,
            response_length=len(response_body),
            is_true_positive=is_true_positive,
            pattern_type=pattern_type,
            key_indicators=key_indicators[:10],
            notes=pattern_notes[:500],
            timestamp=datetime.now().isoformat(),
        )
        self.records.append(record)

        # Learn new pattern if we have enough data
        self._maybe_learn_pattern(record, response_body)

        # Auto-save periodically
        if len(self.records) % 10 == 0:
            self._save()

    def _classify_response(self, body: str, status: int) -> str:
        """Classify the response into a pattern type."""
        body_lower = body.lower().strip()

        if len(body_lower) < 10:
            return "empty_200"

        # Check for denial indicators
        denial = ["unauthorized", "forbidden", "access denied", "not authorized",
                  "permission denied", '"error":', '"success":false']
        if sum(1 for d in denial if d in body_lower) >= 2:
            return "denial_200"

        # Check for login page
        login = ["type=\"password\"", "sign in", "log in", "<form"]
        if sum(1 for indicator in login if indicator in body_lower) >= 2:
            return "login_redirect"

        # Check for data fields
        data = ['"email":', '"name":', '"phone":', '"address":',
                '"role":', '"password":', '"token":']
        if sum(1 for d in data if d in body_lower) >= 2:
            return "cross_user_data" if status == 200 else "blocked_data"

        return "unknown"

    def _extract_key_indicators(self, body: str) -> List[str]:
        """Extract key string indicators from the response."""
        indicators = []
        body_lower = body.lower()

        # Check for JSON keys
        json_keys = re.findall(r'"(\w+)":', body[:2000])
        indicators.extend(json_keys[:10])

        # Check for specific patterns
        patterns = {
            "has_email": '"email":' in body_lower,
            "has_name": '"name":' in body_lower,
            "has_error": '"error":' in body_lower,
            "has_success_false": '"success":false' in body_lower or '"success": false' in body_lower,
            "has_login_form": 'type="password"' in body_lower,
            "is_empty_array": body.strip() in ("[]", "{}"),
            "has_html_form": "<form" in body_lower,
        }
        for key, present in patterns.items():
            if present:
                indicators.append(key)

        return indicators

    def _maybe_learn_pattern(self, record: TestRecord, body: str):
        """Learn a new pattern from a test record if it provides new insight."""
        from urllib.parse import urlparse

        domain = urlparse(record.target_url).netloc
        body_excerpt = body[:500]

        # Check if we already know this pattern for this domain
        known = any(
            p.target_domain == domain
            and p.pattern_type == record.pattern_type
            and p.vuln_type == record.vuln_type
            for p in self.custom_patterns
        )
        if known:
            return

        # Learn new domain-specific pattern
        pattern = ResponsePattern(
            pattern_type=record.pattern_type,
            indicators=record.key_indicators,
            is_false_positive=not record.is_true_positive,
            confidence=0.7,  # Start with moderate confidence
            example_body=body_excerpt,
            vuln_type=record.vuln_type,
            target_domain=domain,
            timestamp=record.timestamp,
        )
        self.custom_patterns.append(pattern)

    def get_patterns_for_target(self, domain: str) -> List[ResponsePattern]:
        """Get learned patterns for a specific target domain."""
        return [
            p for p in self.custom_patterns
            if p.target_domain == domain
        ]

    def get_false_positive_rate(self, vuln_type: str) -> float:
        """Get the false positive rate for a specific vuln type from historical data."""
        type_records = [r for r in self.records if r.vuln_type == vuln_type]
        if not type_records:
            return 0.5  # No data → assume 50%
        fp_count = sum(1 for r in type_records if not r.is_true_positive)
        return fp_count / len(type_records)

def get_learning_context(self, vuln_type: str, domain: str = "") -> str:
|
|
"""Generate learning context for AI prompts.
|
|
|
|
Returns a formatted string with learned patterns and statistics
|
|
that can be injected into LLM prompts to improve access control testing.
|
|
"""
|
|
parts = []
|
|
|
|
# Historical stats
|
|
type_records = [r for r in self.records if r.vuln_type == vuln_type]
|
|
if type_records:
|
|
total = len(type_records)
|
|
tp = sum(1 for r in type_records if r.is_true_positive)
|
|
fp = total - tp
|
|
parts.append(
|
|
f"Historical {vuln_type} testing: {total} tests, "
|
|
f"{tp} true positives ({100*tp/total:.0f}%), "
|
|
f"{fp} false positives ({100*fp/total:.0f}%)"
|
|
)
|
|
|
|
# Most common FP patterns
|
|
fp_patterns = [r.pattern_type for r in type_records if not r.is_true_positive]
|
|
if fp_patterns:
|
|
from collections import Counter
|
|
common = Counter(fp_patterns).most_common(3)
|
|
pattern_str = ", ".join(f"{p} ({c}x)" for p, c in common)
|
|
parts.append(f"Common false positive patterns: {pattern_str}")
|
|
|
|
# Domain-specific patterns
|
|
if domain:
|
|
domain_patterns = self.get_patterns_for_target(domain)
|
|
if domain_patterns:
|
|
for p in domain_patterns[:5]:
|
|
status = "FALSE POSITIVE" if p.is_false_positive else "TRUE POSITIVE"
|
|
parts.append(
|
|
f"Known pattern for {domain}: {p.pattern_type} = {status} "
|
|
f"(confidence: {p.confidence:.0%})"
|
|
)
|
|
|
|
# Known application FP patterns
|
|
if domain:
|
|
for app_name, indicators in self.KNOWN_FP_PATTERNS.items():
|
|
if any(i in domain.lower() for i in indicators):
|
|
parts.append(
|
|
f"WARNING: Target appears to use {app_name} — "
|
|
f"known for producing false positive access control findings"
|
|
)
|
|
|
|
if not parts:
|
|
return ""
|
|
|
|
return "## Learned Access Control Patterns\n" + "\n".join(f"- {p}" for p in parts)
|
|
|
|
    def get_evaluation_hints(self, vuln_type: str, response_body: str, status: int) -> Dict:
        """Get evaluation hints for a specific response.

        Returns hints that can help the validation judge or AI make better decisions.
        """
        pattern_type = self._classify_response(response_body, status)
        indicators = self._extract_key_indicators(response_body)

        # Check against default patterns
        matching_default = [
            p for p in self.DEFAULT_PATTERNS
            if any(i.lower() in response_body.lower() for i in p["indicators"] if i)
        ]

        # Check against learned patterns
        matching_learned = [
            p for p in self.custom_patterns
            if p.vuln_type == vuln_type and p.pattern_type == pattern_type
        ]

        fp_signals = sum(
            1 for p in matching_default if p["is_false_positive"]
        ) + sum(
            1 for p in matching_learned if p.is_false_positive
        )

        tp_signals = sum(
            1 for p in matching_default if not p["is_false_positive"]
        ) + sum(
            1 for p in matching_learned if not p.is_false_positive
        )

        return {
            "pattern_type": pattern_type,
            "indicators": indicators,
            "fp_signals": fp_signals,
            "tp_signals": tp_signals,
            "likely_false_positive": fp_signals > tp_signals,
            "matching_patterns": len(matching_default) + len(matching_learned),
        }
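

# Minimal usage sketch (illustrative only; not part of the library API). The target URL,
# response bodies, and the /tmp data directory below are made-up values for demonstration.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    learner = AccessControlLearner(data_dir=Path("/tmp/neurosploit_learning"))

    # Record a verified false positive: 200 OK whose body is actually a denial message.
    learner.record_test(
        vuln_type="bola",
        target_url="https://api.example.com/v1/users/1234",
        status_code=200,
        response_body='{"error": "access denied", "success": false}',
        is_true_positive=False,
        pattern_notes="200 OK but JSON body denies access",
    )

    # Ask for hints on a new response before the validation judge decides.
    hints = learner.get_evaluation_hints(
        vuln_type="bola",
        response_body='{"id": 1234, "email": "victim@example.com", "phone": "+1-555-0100"}',
        status=200,
    )
    print(json.dumps(hints, indent=2))

    # Context block that can be injected into an LLM prompt.
    print(learner.get_learning_context("bola", domain="api.example.com"))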