""" NeuroSploit v3 - Access Control Learning Engine Adaptive learning system for BOLA/BFLA/IDOR and other access control testing. Records test outcomes and response patterns to improve future evaluations. Key insight: HTTP status codes are unreliable for access control testing. This module learns from actual response DATA patterns to distinguish: - True positives (cross-user data access) - False positives (error messages, login pages, empty responses with 200 status) Usage: learner = AccessControlLearner() # Record a test outcome learner.record_test(vuln_type, url, response_body, is_true_positive, pattern_notes) # Get learned patterns for a target patterns = learner.get_patterns_for_target(domain) # Get learning context for AI prompts context = learner.get_learning_context(vuln_type) """ import json import logging import re from dataclasses import dataclass, field, asdict from datetime import datetime from pathlib import Path from typing import Dict, List, Optional logger = logging.getLogger(__name__) DATA_DIR = Path(__file__).parent.parent.parent / "data" LEARNING_FILE = DATA_DIR / "access_control_learning.json" @dataclass class ResponsePattern: """A learned response pattern from access control testing.""" pattern_type: str # "denial", "empty", "login_page", "data_leak", "public_data" indicators: List[str] # Strings/patterns that identify this response type is_false_positive: bool # True if this pattern indicates a false positive confidence: float # 0.0-1.0 how reliable this pattern is example_body: str # Truncated example response body vuln_type: str # bola, bfla, idor, etc. target_domain: str # Domain this was learned from timestamp: str # When this was learned @dataclass class TestRecord: """Record of an access control test outcome.""" vuln_type: str target_url: str status_code: int response_length: int is_true_positive: bool pattern_type: str # What pattern was identified key_indicators: List[str] # What strings/patterns were decisive notes: str # Human or AI notes about why this was TP/FP timestamp: str class AccessControlLearner: """Adaptive learning engine for access control vulnerability testing. Learns from test outcomes to identify response patterns that indicate true vs false positives for BOLA, BFLA, IDOR, and related vuln types. """ MAX_RECORDS = 500 MAX_PATTERNS = 200 # Pre-seeded patterns from known false positive scenarios DEFAULT_PATTERNS: List[Dict] = [ { "pattern_type": "denial_200", "indicators": ["unauthorized", "forbidden", "access denied", "not authorized", "permission denied", "insufficient privileges"], "is_false_positive": True, "confidence": 0.9, "description": "Server returns 200 OK but body contains access denial message", }, { "pattern_type": "empty_200", "indicators": ["[]", "{}", '""', "null", ""], "is_false_positive": True, "confidence": 0.85, "description": "Server returns 200 OK with empty/null response body", }, { "pattern_type": "login_redirect", "indicators": ["type=\"password\"", "sign in", "log in", "login", "authentication required"], "is_false_positive": True, "confidence": 0.95, "description": "Server returns 200 OK but body is a login page", }, { "pattern_type": "error_json", "indicators": ['"error":', '"status":"error"', '"success":false', '"message":"not found"', '"code":401', '"code":403'], "is_false_positive": True, "confidence": 0.9, "description": "Server returns 200 OK but JSON body indicates error", }, { "pattern_type": "own_data", "indicators": [], "is_false_positive": True, "confidence": 0.8, "description": "Server returns authenticated user's own data regardless of requested ID", }, { "pattern_type": "public_data", "indicators": [], "is_false_positive": True, "confidence": 0.7, "description": "Response contains only public profile fields (username, bio) not private data", }, { "pattern_type": "cross_user_data", "indicators": ['"email":', '"phone":', '"address":', '"ssn":', '"credit_card":', '"password":', '"secret":'], "is_false_positive": False, "confidence": 0.9, "description": "Response contains another user's private data fields", }, { "pattern_type": "admin_data_leak", "indicators": ['"role":"admin"', '"is_admin":true', '"users":[', '"audit_log":', '"system_config":'], "is_false_positive": False, "confidence": 0.9, "description": "Response contains admin-level data accessible to non-admin user", }, { "pattern_type": "state_change", "indicators": ['"updated":', '"deleted":', '"created":', '"modified":', '"success":true'], "is_false_positive": False, "confidence": 0.85, "description": "Write operation succeeded on another user's resource", }, ] # Known application patterns that cause false positives KNOWN_FP_PATTERNS: Dict[str, List[str]] = { "wso2": ["wso2", "carbon", "identity server", "api manager"], "keycloak": ["keycloak", "red hat sso"], "spring_security": ["spring security", "whitelabel error"], "oauth2_proxy": ["oauth2-proxy", "sign in with"], "cloudflare": ["cloudflare", "cf-ray", "attention required"], "aws_waf": ["aws-waf", "request blocked"], } def __init__(self, data_dir: Optional[Path] = None): self.data_dir = data_dir or DATA_DIR self.learning_file = self.data_dir / "access_control_learning.json" self.records: List[TestRecord] = [] self.custom_patterns: List[ResponsePattern] = [] self._load() def _load(self): """Load learning data from disk.""" try: if self.learning_file.exists(): with open(self.learning_file, "r") as f: data = json.load(f) self.records = [ TestRecord(**r) for r in data.get("records", []) ] self.custom_patterns = [ ResponsePattern(**p) for p in data.get("patterns", []) ] logger.debug(f"Loaded {len(self.records)} records, {len(self.custom_patterns)} patterns") except Exception as e: logger.debug(f"Failed to load learning data: {e}") def _save(self): """Save learning data to disk.""" try: self.data_dir.mkdir(parents=True, exist_ok=True) data = { "records": [asdict(r) for r in self.records[-self.MAX_RECORDS:]], "patterns": [asdict(p) for p in self.custom_patterns[-self.MAX_PATTERNS:]], "metadata": { "total_records": len(self.records), "total_patterns": len(self.custom_patterns), "last_updated": datetime.now().isoformat(), }, } with open(self.learning_file, "w") as f: json.dump(data, f, indent=2) except Exception as e: logger.debug(f"Failed to save learning data: {e}") def record_test( self, vuln_type: str, target_url: str, status_code: int, response_body: str, is_true_positive: bool, pattern_notes: str = "", ): """Record an access control test outcome for learning. Called after the validation judge makes a decision, with the verified outcome (true positive or false positive). """ # Identify response pattern pattern_type = self._classify_response(response_body, status_code) key_indicators = self._extract_key_indicators(response_body) record = TestRecord( vuln_type=vuln_type, target_url=target_url, status_code=status_code, response_length=len(response_body), is_true_positive=is_true_positive, pattern_type=pattern_type, key_indicators=key_indicators[:10], notes=pattern_notes[:500], timestamp=datetime.now().isoformat(), ) self.records.append(record) # Learn new pattern if we have enough data self._maybe_learn_pattern(record, response_body) # Auto-save periodically if len(self.records) % 10 == 0: self._save() def _classify_response(self, body: str, status: int) -> str: """Classify the response into a pattern type.""" body_lower = body.lower().strip() if len(body_lower) < 10: return "empty_200" # Check for denial indicators denial = ["unauthorized", "forbidden", "access denied", "not authorized", "permission denied", '"error":', '"success":false'] if sum(1 for d in denial if d in body_lower) >= 2: return "denial_200" # Check for login page login = ["type=\"password\"", "sign in", "log in", "