#!/usr/bin/env python3
"""
NeuroSploit Advanced Reconnaissance Module
Deep enumeration with multiple tools and techniques
"""
import subprocess
import json
import re
import socket
import requests
import shutil
import os
import sys
import concurrent.futures
import hashlib
import base64
import tempfile
import time
from typing import Dict, List, Optional, Set, Tuple, Any
from collections import defaultdict
import logging
from urllib.parse import urlparse, parse_qs, urljoin, quote
from pathlib import Path
from dataclasses import dataclass, field
try:
import dns.resolver
except ImportError:
dns = None
logger = logging.getLogger(__name__)
# Disable SSL warnings
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# =============================================================================
# CONFIGURATION
# =============================================================================
SECLISTS_BASE = "/opt/wordlists/SecLists"
WORDLISTS = {
"directories_small": f"{SECLISTS_BASE}/Discovery/Web-Content/directory-list-2.3-small.txt",
"directories_medium": f"{SECLISTS_BASE}/Discovery/Web-Content/raft-medium-directories.txt",
"directories_big": f"{SECLISTS_BASE}/Discovery/Web-Content/directory-list-2.3-big.txt",
"common": f"{SECLISTS_BASE}/Discovery/Web-Content/common.txt",
"subdomains_small": f"{SECLISTS_BASE}/Discovery/DNS/subdomains-top1million-5000.txt",
"subdomains_medium": f"{SECLISTS_BASE}/Discovery/DNS/subdomains-top1million-20000.txt",
"subdomains_big": f"{SECLISTS_BASE}/Discovery/DNS/subdomains-top1million-110000.txt",
"dns_jhaddix": f"{SECLISTS_BASE}/Discovery/DNS/dns-Jhaddix.txt",
"params": f"{SECLISTS_BASE}/Discovery/Web-Content/burp-parameter-names.txt",
"api_endpoints": f"{SECLISTS_BASE}/Discovery/Web-Content/api/api-endpoints.txt",
"backup_files": f"{SECLISTS_BASE}/Discovery/Web-Content/Common-DB-Backups.txt",
}
# Common ports for fast scan
COMMON_PORTS = "21,22,23,25,53,80,110,111,135,139,143,443,445,993,995,1433,1521,2049,3306,3389,5432,5900,6379,8000,8080,8443,8888,9000,9200,27017"
TOP_1000_PORTS = "1-1000"
FULL_PORTS = "1-65535"
# Patterns for sensitive data extraction
SECRET_PATTERNS = {
"aws_key": r"(?:AKIA|A3T|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{16}",
"aws_secret": r"(?i)aws(.{0,20})?(?-i)['\"][0-9a-zA-Z/+]{40}['\"]",
"github_token": r"ghp_[a-zA-Z0-9]{36}|github_pat_[a-zA-Z0-9]{22}_[a-zA-Z0-9]{59}",
"google_api": r"AIza[0-9A-Za-z\\-_]{35}",
"slack_token": r"xox[baprs]-[0-9]{10,13}-[0-9]{10,13}[a-zA-Z0-9-]*",
"jwt": r"eyJ[A-Za-z0-9-_=]+\.eyJ[A-Za-z0-9-_=]+\.?[A-Za-z0-9-_.+/=]*",
"private_key": r"-----BEGIN (?:RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----",
"password_field": r"(?i)(?:password|passwd|pwd|secret|token|api_key|apikey|auth)[\s]*[=:]\s*['\"]?[^\s'\"]+",
"internal_ip": r"(?:10|172\.(?:1[6-9]|2[0-9]|3[01])|192\.168)\.\d{1,3}\.\d{1,3}",
"s3_bucket": r"(?:s3://|s3\.amazonaws\.com/|s3-[\w-]+\.amazonaws\.com/)[\w.-]+",
"firebase": r"https://[\w-]+\.firebaseio\.com",
"bearer_token": r"(?i)bearer\s+[a-zA-Z0-9\-_.~+/]+=*",
}
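# Illustrative sketch of how these patterns are applied (the sample string is
# fabricated; AKIAIOSFODNN7EXAMPLE is AWS's documented example key):
#
#   sample = 'config = {"key": "AKIAIOSFODNN7EXAMPLE"}'
#   for name, pattern in SECRET_PATTERNS.items():
#       for hit in re.findall(pattern, sample):
#           print(f"{name}: {hit}")   # -> aws_key: AKIAIOSFODNN7EXAMPLE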
# CNAME records indicating potential takeover
TAKEOVER_CNAMES = {
"github.io": "GitHub Pages",
"herokuapp.com": "Heroku",
"pantheonsite.io": "Pantheon",
"domains.tumblr.com": "Tumblr",
"wpengine.com": "WP Engine",
"ghost.io": "Ghost",
"myshopify.com": "Shopify",
"surge.sh": "Surge.sh",
"bitbucket.io": "Bitbucket",
"freshdesk.com": "Freshdesk",
"zendesk.com": "Zendesk",
"uservoice.com": "UserVoice",
"teamwork.com": "Teamwork",
"helpjuice.com": "Helpjuice",
"helpscoutdocs.com": "HelpScout",
"feedpress.me": "Feedpress",
"readme.io": "Readme.io",
"statuspage.io": "Statuspage",
"azurewebsites.net": "Azure",
"cloudapp.net": "Azure",
"trafficmanager.net": "Azure",
"blob.core.windows.net": "Azure Blob",
"cloudfront.net": "AWS CloudFront",
"s3.amazonaws.com": "AWS S3",
"elasticbeanstalk.com": "AWS Elastic Beanstalk",
"amazonaws.com": "AWS",
"storage.googleapis.com": "Google Cloud Storage",
"appspot.com": "Google App Engine",
"firebaseapp.com": "Firebase",
"netlify.app": "Netlify",
"vercel.app": "Vercel",
"now.sh": "Vercel",
"fly.dev": "Fly.io",
"render.com": "Render",
}
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def check_tool(tool_name: str) -> Tuple[bool, Optional[str]]:
"""Check if a tool is installed and return its path."""
path = shutil.which(tool_name)
return (path is not None, path)
def run_tool(cmd: List[str], timeout: int = 300, stdin_data: Optional[str] = None) -> Dict:
"""Execute a tool and return structured results."""
result = {
"tool": cmd[0] if cmd else "unknown",
"command": " ".join(cmd),
"success": False,
"stdout": "",
"stderr": "",
"exit_code": -1,
"timed_out": False
}
tool_path = shutil.which(cmd[0])
if not tool_path:
result["stderr"] = f"Tool '{cmd[0]}' not found in PATH"
return result
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
input=stdin_data
)
result["stdout"] = proc.stdout
result["stderr"] = proc.stderr
result["exit_code"] = proc.returncode
result["success"] = proc.returncode == 0
except subprocess.TimeoutExpired:
result["stderr"] = f"Timeout after {timeout}s"
result["timed_out"] = True
except Exception as e:
result["stderr"] = str(e)
return result
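# Usage sketch for the helpers above (illustrative, not executed on import;
# assumes nmap is on PATH, but any CLI tool works the same way):
#
#   ok, path = check_tool("nmap")
#   if ok:
#       out = run_tool(["nmap", "--version"], timeout=30)
#       print(out["success"], out["stdout"].splitlines()[:1])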
def get_wordlist(name: str, fallback: Optional[str] = None) -> Optional[str]:
"""Get wordlist path, checking if it exists."""
path = WORDLISTS.get(name)
if path and os.path.exists(path):
return path
if fallback and os.path.exists(fallback):
return fallback
return None
def extract_domain(target: str) -> str:
"""Extract domain from URL or return as-is."""
if target.startswith(('http://', 'https://')):
return urlparse(target).netloc
return target
def make_url(host: str, scheme: str = "https") -> str:
"""Ensure host has proper URL format."""
if host.startswith(('http://', 'https://')):
return host
return f"{scheme}://{host}"
def print_phase(phase_num: int, title: str):
"""Print phase header."""
print(f"\n{'='*60}")
print(f"[PHASE {phase_num}] {title}")
print(f"{'='*60}")
def print_result(icon: str, msg: str):
"""Print formatted result."""
print(f" {icon} {msg}")
# =============================================================================
# ADVANCED SUBDOMAIN ENUMERATION
# =============================================================================
class AdvancedSubdomainEnum:
"""Deep subdomain enumeration using multiple tools and techniques."""
TOOLS = ['subfinder', 'amass', 'assetfinder', 'findomain', 'puredns', 'shuffledns']
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 300)
def enumerate(self, domain: str, depth: str = "medium") -> Dict:
"""
Enumerate subdomains with multiple tools.
Args:
domain: Target domain
depth: quick, medium, deep
"""
logger.info(f"[*] Subdomain enumeration for: {domain}")
print(f"[*] Starting subdomain enumeration for: {domain}")
print(f" Depth: {depth}")
all_subdomains: Set[str] = set()
results = {
"domain": domain,
"subdomains": [],
"by_tool": {},
"crt_sh": [],
"dns_bruteforce": []
}
# 1. Certificate Transparency (crt.sh) - Always run first (passive)
print_result("[~]", "Querying Certificate Transparency logs (crt.sh)...")
crt_subs = self._crtsh_enum(domain)
results["crt_sh"] = crt_subs
all_subdomains.update(crt_subs)
print_result("[+]", f"crt.sh: {len(crt_subs)} subdomains")
# 2. Run enumeration tools
tools_to_run = self.TOOLS if depth == "deep" else self.TOOLS[:4]
for tool in tools_to_run:
installed, _ = check_tool(tool)
if not installed:
continue
print_result("[~]", f"Running {tool}...")
tool_subs = self._run_tool(tool, domain)
results["by_tool"][tool] = tool_subs
all_subdomains.update(tool_subs)
print_result("[+]", f"{tool}: {len(tool_subs)} subdomains")
# 3. DNS Bruteforce (for deep mode)
if depth == "deep":
wordlist = get_wordlist("subdomains_medium")
if wordlist:
print_result("[~]", "Running DNS bruteforce...")
brute_subs = self._dns_bruteforce(domain, wordlist)
results["dns_bruteforce"] = brute_subs
all_subdomains.update(brute_subs)
print_result("[+]", f"Bruteforce: {len(brute_subs)} subdomains")
# 4. Permutation/mutation (for deep mode)
if depth == "deep" and all_subdomains:
print_result("[~]", "Generating permutations...")
perms = self._generate_permutations(list(all_subdomains)[:100], domain)
valid_perms = self._resolve_subdomains(perms)
all_subdomains.update(valid_perms)
print_result("[+]", f"Permutations: {len(valid_perms)} valid")
results["subdomains"] = sorted(list(all_subdomains))
results["total"] = len(all_subdomains)
print_result("[✓]", f"Total unique subdomains: {len(all_subdomains)}")
return results
def _crtsh_enum(self, domain: str) -> List[str]:
"""Query crt.sh Certificate Transparency logs."""
subdomains = set()
try:
url = f"https://crt.sh/?q=%.{domain}&output=json"
resp = requests.get(url, timeout=30)
if resp.status_code == 200:
data = resp.json()
for entry in data:
name = entry.get("name_value", "")
for sub in name.split("\n"):
sub = sub.strip().lower()
if sub and "*" not in sub and domain in sub:
subdomains.add(sub)
except Exception as e:
logger.warning(f"crt.sh error: {e}")
return list(subdomains)
def _run_tool(self, tool: str, domain: str) -> List[str]:
"""Run specific subdomain enumeration tool."""
subdomains = []
cmd_map = {
"subfinder": ["subfinder", "-d", domain, "-silent", "-all"],
"amass": ["amass", "enum", "-passive", "-d", domain, "-silent"],
"assetfinder": ["assetfinder", "--subs-only", domain],
"findomain": ["findomain", "-t", domain, "-q"],
"puredns": ["puredns", "bruteforce", get_wordlist("subdomains_small") or "", domain, "-q"],
"shuffledns": ["shuffledns", "-d", domain, "-w", get_wordlist("subdomains_small") or "", "-silent"]
}
cmd = cmd_map.get(tool)
if not cmd:
return []
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
sub = line.strip().lower()
                if sub and "*" not in sub and (sub == domain or sub.endswith("." + domain)):
subdomains.append(sub)
return subdomains
def _dns_bruteforce(self, domain: str, wordlist: str) -> List[str]:
"""DNS bruteforce using wordlist."""
found = []
try:
with open(wordlist, 'r') as f:
words = [w.strip() for w in f.readlines()[:5000]]
def check_sub(word):
subdomain = f"{word}.{domain}"
try:
socket.gethostbyname(subdomain)
return subdomain
                except (socket.gaierror, UnicodeError):
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(check_sub, words)
found = [r for r in results if r]
except Exception as e:
logger.warning(f"DNS bruteforce error: {e}")
return found
def _generate_permutations(self, subdomains: List[str], domain: str) -> List[str]:
"""Generate subdomain permutations."""
permutations = set()
prefixes = ['dev', 'staging', 'stage', 'test', 'uat', 'qa', 'prod', 'api', 'admin', 'internal', 'private', 'beta', 'alpha', 'old', 'new', 'v1', 'v2']
suffixes = ['-dev', '-staging', '-test', '-api', '-admin', '-internal', '2', '-v2', '-old', '-new']
for sub in subdomains:
parts = sub.replace(f".{domain}", "").split(".")
if parts:
base = parts[0]
for prefix in prefixes:
permutations.add(f"{prefix}.{sub}")
permutations.add(f"{prefix}-{base}.{domain}")
for suffix in suffixes:
permutations.add(f"{base}{suffix}.{domain}")
return list(permutations)[:1000]
def _resolve_subdomains(self, subdomains: List[str]) -> List[str]:
"""Resolve subdomains to check if they exist."""
valid = []
def resolve(sub):
try:
socket.gethostbyname(sub)
return sub
            except (socket.gaierror, UnicodeError):
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
results = executor.map(resolve, subdomains)
valid = [r for r in results if r]
return valid
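# Usage sketch (illustrative, not executed on import; "example.com" is a
# placeholder, substitute a domain you are authorized to test):
#
#   enum = AdvancedSubdomainEnum({"timeout": 120})
#   subs = enum.enumerate("example.com", depth="quick")
#   print(subs["total"], "subdomains, e.g.", subs["subdomains"][:5])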
# =============================================================================
# HTTP PROBING
# =============================================================================
class HttpProber:
"""Advanced HTTP probing with technology detection."""
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 180)
def probe(self, hosts: List[str]) -> Dict:
"""Probe hosts for HTTP/HTTPS with detailed info."""
logger.info(f"[*] Probing {len(hosts)} hosts...")
print(f"[*] Probing {len(hosts)} hosts for HTTP/HTTPS...")
results = {
"total_input": len(hosts),
"alive": [],
"details": {},
"technologies": {},
"status_codes": {},
"by_status": defaultdict(list),
"redirects": [],
"interesting": []
}
httpx_ok, _ = check_tool("httpx")
if httpx_ok:
results = self._run_httpx(hosts)
else:
results = self._manual_probe(hosts)
# Identify interesting hosts
results["interesting"] = self._identify_interesting(results)
print_result("[+]", f"Alive hosts: {len(results['alive'])}")
print_result("[+]", f"Technologies found: {len(results['technologies'])}")
if results["interesting"]:
print_result("[!]", f"Interesting hosts: {len(results['interesting'])}")
return results
def _run_httpx(self, hosts: List[str]) -> Dict:
"""Run httpx with maximum output."""
results = {
"total_input": len(hosts),
"alive": [],
"details": {},
"technologies": {},
"status_codes": {},
"by_status": defaultdict(list),
"redirects": [],
"interesting": []
}
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write('\n'.join(hosts))
hosts_file = f.name
try:
cmd = [
"httpx", "-l", hosts_file, "-silent",
"-status-code", "-content-length", "-title", "-tech-detect",
"-web-server", "-cdn", "-follow-redirects", "-json",
"-threads", "50", "-timeout", "10"
]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
if not line.strip():
continue
try:
data = json.loads(line)
url = data.get("url", "")
if url:
results["alive"].append(url)
# Store detailed info
results["details"][url] = {
"status": data.get("status_code"),
"title": data.get("title", ""),
"server": data.get("webserver", ""),
"content_length": data.get("content_length", 0),
"technologies": data.get("tech", []),
"cdn": data.get("cdn", False),
"final_url": data.get("final_url", url)
}
# Track redirects
if data.get("final_url") and data["final_url"] != url:
results["redirects"].append({
"from": url,
"to": data["final_url"]
})
# Technologies
for tech in data.get("tech", []):
results["technologies"][tech] = results["technologies"].get(tech, 0) + 1
# Status codes
status = str(data.get("status_code", ""))
if status:
results["status_codes"][status] = results["status_codes"].get(status, 0) + 1
results["by_status"][status].append(url)
except json.JSONDecodeError:
continue
finally:
os.unlink(hosts_file)
return results
def _manual_probe(self, hosts: List[str]) -> Dict:
"""Manual HTTP probing fallback."""
results = {
"total_input": len(hosts),
"alive": [],
"details": {},
"technologies": {},
"status_codes": {},
"by_status": defaultdict(list),
"redirects": [],
"interesting": []
}
def probe_host(host):
for scheme in ['https', 'http']:
url = make_url(host, scheme)
try:
                    resp = requests.get(url, timeout=10, verify=False, allow_redirects=True)
                    title_match = re.search(r'<title>(.*?)</title>', resp.text, re.I | re.S)
                    return {
                        "url": url,
                        "status": resp.status_code,
                        "title": title_match.group(1).strip() if title_match else "",
                        "server": resp.headers.get("Server", ""),
                        "headers": dict(resp.headers)
                    }
except:
continue
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(probe_host, h): h for h in hosts[:500]}
for future in concurrent.futures.as_completed(futures):
try:
data = future.result()
if data:
url = data["url"]
results["alive"].append(url)
results["details"][url] = data
status = str(data["status"])
results["status_codes"][status] = results["status_codes"].get(status, 0) + 1
results["by_status"][status].append(url)
except:
continue
return results
def _identify_interesting(self, results: Dict) -> List[Dict]:
"""Identify potentially interesting hosts."""
interesting = []
for url, details in results.get("details", {}).items():
reasons = []
# Check for interesting status codes
status = details.get("status", 0)
if status in [401, 403, 500, 502, 503]:
reasons.append(f"Status {status}")
# Check for interesting titles
title = details.get("title", "").lower()
interesting_titles = ['admin', 'login', 'dashboard', 'panel', 'jenkins', 'gitlab', 'jira', 'confluence', 'kibana', 'grafana', 'debug', 'staging', 'internal']
for t in interesting_titles:
if t in title:
reasons.append(f"Title contains '{t}'")
break
# Check for interesting technologies
techs = details.get("technologies", [])
risky_techs = ['Apache Tomcat', 'Jenkins', 'GitLab', 'Jira', 'Confluence', 'Elasticsearch', 'Kibana', 'Grafana', 'phpMyAdmin', 'WordPress', 'Drupal']
for tech in techs:
if any(rt.lower() in tech.lower() for rt in risky_techs):
reasons.append(f"Technology: {tech}")
if reasons:
interesting.append({"url": url, "reasons": reasons})
return interesting
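# Usage sketch (hosts below are placeholders; httpx is optional, the class
# falls back to manual probing when it is missing):
#
#   prober = HttpProber({"timeout": 120})
#   probe = prober.probe(["example.com", "www.example.com"])
#   for u in probe["alive"]:
#       print(u, probe["details"][u].get("status"))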
# =============================================================================
# DIRECTORY BRUTEFORCE WITH FEROXBUSTER
# =============================================================================
class DirectoryBruter:
"""Directory/file bruteforcing using feroxbuster or fallbacks."""
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 600)
def bruteforce(self, target: str, wordlist_size: str = "medium", extensions: List[str] = None) -> Dict:
"""
Directory bruteforce using feroxbuster.
Args:
target: Target URL
wordlist_size: small, medium, big
extensions: File extensions to check
"""
logger.info(f"[*] Directory bruteforce on: {target}")
print(f"[*] Starting directory bruteforce on: {target}")
results = {
"target": target,
"directories": [],
"files": [],
"interesting": [],
"status_codes": {},
"total": 0
}
# Get wordlist
wordlist_key = f"directories_{wordlist_size}"
wordlist = get_wordlist(wordlist_key, WORDLISTS.get("common"))
if not wordlist:
print_result("[-]", "No wordlist available")
return results
print_result("[~]", f"Using wordlist: {os.path.basename(wordlist)}")
# Default extensions
if not extensions:
extensions = ["php", "asp", "aspx", "jsp", "html", "js", "json", "xml", "txt", "bak", "old", "conf", "config", "sql", "zip", "tar.gz", "log"]
ferox_ok, _ = check_tool("feroxbuster")
if ferox_ok:
results = self._run_feroxbuster(target, wordlist, extensions)
else:
# Fallback to gobuster or ffuf
gobuster_ok, _ = check_tool("gobuster")
if gobuster_ok:
results = self._run_gobuster(target, wordlist, extensions)
else:
ffuf_ok, _ = check_tool("ffuf")
if ffuf_ok:
results = self._run_ffuf(target, wordlist, extensions)
else:
print_result("[-]", "No directory bruteforce tool available")
return results
# Identify interesting findings
results["interesting"] = self._identify_interesting(results)
print_result("[+]", f"Total found: {results['total']}")
print_result("[+]", f"Directories: {len(results['directories'])}")
print_result("[+]", f"Files: {len(results['files'])}")
if results["interesting"]:
print_result("[!]", f"Interesting: {len(results['interesting'])}")
return results
def _run_feroxbuster(self, target: str, wordlist: str, extensions: List[str]) -> Dict:
"""Run feroxbuster for directory bruteforce."""
results = {
"target": target,
"directories": [],
"files": [],
"interesting": [],
"status_codes": {},
"total": 0
}
ext_str = ",".join(extensions)
cmd = [
"feroxbuster",
"-u", target,
"-w", wordlist,
"-x", ext_str,
"-t", "50",
"-C", "404,400",
"--silent",
"--no-state",
"-o", "-",
"--json"
]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
if not line.strip() or not line.startswith('{'):
continue
try:
data = json.loads(line)
if data.get("type") == "response":
entry = {
"url": data.get("url", ""),
"status": data.get("status", 0),
"size": data.get("content_length", 0),
"words": data.get("word_count", 0),
"lines": data.get("line_count", 0)
}
if entry["url"]:
results["total"] += 1
status = str(entry["status"])
results["status_codes"][status] = results["status_codes"].get(status, 0) + 1
if entry["url"].endswith('/'):
results["directories"].append(entry)
else:
results["files"].append(entry)
except:
continue
return results
def _run_gobuster(self, target: str, wordlist: str, extensions: List[str]) -> Dict:
"""Run gobuster as fallback."""
results = {
"target": target,
"directories": [],
"files": [],
"interesting": [],
"status_codes": {},
"total": 0
}
ext_str = ",".join(extensions)
cmd = [
"gobuster", "dir",
"-u", target,
"-w", wordlist,
"-x", ext_str,
"-t", "50",
"-q",
"--no-error"
]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
pattern = r"(\S+)\s+\(Status:\s*(\d+)\)"
for match in re.finditer(pattern, result["stdout"]):
path, status = match.groups()
entry = {"url": urljoin(target, path), "status": int(status), "size": 0}
results["total"] += 1
results["status_codes"][status] = results["status_codes"].get(status, 0) + 1
if path.endswith('/'):
results["directories"].append(entry)
else:
results["files"].append(entry)
return results
def _run_ffuf(self, target: str, wordlist: str, extensions: List[str]) -> Dict:
"""Run ffuf as fallback."""
results = {
"target": target,
"directories": [],
"files": [],
"interesting": [],
"status_codes": {},
"total": 0
}
fuzz_url = f"{target.rstrip('/')}/FUZZ"
cmd = [
"ffuf",
"-u", fuzz_url,
"-w", wordlist,
"-t", "50",
"-mc", "200,201,204,301,302,307,308,401,403,405,500",
"-o", "-",
"-of", "json",
"-s"
]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
try:
data = json.loads(result["stdout"])
for entry in data.get("results", []):
item = {
"url": entry.get("url", ""),
"status": entry.get("status", 0),
"size": entry.get("length", 0)
}
results["total"] += 1
status = str(item["status"])
results["status_codes"][status] = results["status_codes"].get(status, 0) + 1
if item["url"].endswith('/'):
results["directories"].append(item)
else:
results["files"].append(item)
except:
pass
return results
def _identify_interesting(self, results: Dict) -> List[Dict]:
"""Identify interesting findings."""
interesting = []
interesting_patterns = [
r'\.(?:bak|backup|old|orig|save|swp|tmp)$',
r'\.(?:sql|db|mdb|sqlite)$',
r'\.(?:conf|config|cfg|ini|env)$',
r'\.(?:log|logs)$',
r'(?:admin|login|dashboard|panel|console)',
r'(?:upload|uploads|files|backup)',
r'(?:api|v1|v2|graphql)',
r'(?:\.git|\.svn|\.hg)',
r'(?:phpinfo|info\.php|test\.php)',
r'(?:wp-admin|wp-content|wp-includes)',
r'(?:install|setup|config)',
]
all_items = results["directories"] + results["files"]
for item in all_items:
url = item.get("url", "").lower()
for pattern in interesting_patterns:
if re.search(pattern, url):
interesting.append(item)
break
return interesting
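# Usage sketch (placeholder URL; requires feroxbuster, gobuster, or ffuf on
# PATH plus a SecLists checkout at SECLISTS_BASE):
#
#   bruter = DirectoryBruter({"timeout": 300})
#   found = bruter.bruteforce("https://example.com", wordlist_size="small",
#                             extensions=["php", "bak"])
#   print(found["total"], "hits;", len(found["interesting"]), "interesting")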
# =============================================================================
# PARAMETER SPIDER
# =============================================================================
class ParamSpider:
"""Parameter discovery using paramspider and analysis."""
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 300)
def spider(self, domain: str) -> Dict:
"""Discover parameters from various sources."""
logger.info(f"[*] Parameter discovery for: {domain}")
print(f"[*] Starting parameter discovery for: {domain}")
results = {
"domain": domain,
"urls_with_params": [],
"unique_params": set(),
"by_param": defaultdict(list),
"interesting_params": [],
"total": 0
}
# Try paramspider
paramspider_ok, _ = check_tool("paramspider")
if paramspider_ok:
print_result("[~]", "Running paramspider...")
ps_results = self._run_paramspider(domain)
results["urls_with_params"].extend(ps_results)
else:
print_result("[-]", "paramspider not available, using alternative methods...")
# Also collect from gau/waybackurls
print_result("[~]", "Collecting URLs from archives...")
archive_urls = self._collect_archive_urls(domain)
# Parse parameters
all_urls = results["urls_with_params"] + archive_urls
for url in all_urls:
params = self._extract_params(url)
for param in params:
results["unique_params"].add(param)
results["by_param"][param].append(url)
results["unique_params"] = list(results["unique_params"])
results["total"] = len(all_urls)
# Identify interesting parameters
results["interesting_params"] = self._identify_interesting_params(results["unique_params"])
# Convert defaultdict to regular dict for JSON serialization
results["by_param"] = dict(results["by_param"])
print_result("[+]", f"URLs with params: {len(all_urls)}")
print_result("[+]", f"Unique parameters: {len(results['unique_params'])}")
if results["interesting_params"]:
print_result("[!]", f"Interesting params: {', '.join(results['interesting_params'][:10])}")
return results
def _run_paramspider(self, domain: str) -> List[str]:
"""Run paramspider tool."""
urls = []
with tempfile.TemporaryDirectory() as tmpdir:
cmd = ["paramspider", "-d", domain, "-o", tmpdir, "-s"]
result = run_tool(cmd, self.timeout)
# Read output files
for f in Path(tmpdir).glob("*.txt"):
try:
with open(f, 'r') as file:
urls.extend([line.strip() for line in file if '=' in line])
except:
continue
return urls
def _collect_archive_urls(self, domain: str) -> List[str]:
"""Collect URLs with parameters from archives."""
urls = []
# Try gau
gau_ok, _ = check_tool("gau")
if gau_ok:
result = run_tool(["gau", "--subs", domain], self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
url = line.strip()
if '?' in url and '=' in url:
urls.append(url)
# Try waybackurls
wayback_ok, _ = check_tool("waybackurls")
if wayback_ok:
result = run_tool(["waybackurls", domain], self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
url = line.strip()
if '?' in url and '=' in url:
urls.append(url)
return list(set(urls))
def _extract_params(self, url: str) -> List[str]:
"""Extract parameter names from URL."""
params = []
try:
parsed = urlparse(url)
query = parse_qs(parsed.query)
params = list(query.keys())
except:
pass
return params
def _identify_interesting_params(self, params: List[str]) -> List[str]:
"""Identify potentially interesting/vulnerable parameters."""
interesting = []
sqli_params = ['id', 'pid', 'uid', 'userid', 'user_id', 'item', 'itemid', 'cat', 'category', 'page', 'p', 'q', 'query', 'search', 's', 'keyword', 'order', 'sort', 'filter']
xss_params = ['q', 'query', 'search', 's', 'keyword', 'name', 'username', 'user', 'email', 'message', 'msg', 'comment', 'text', 'content', 'title', 'desc', 'description', 'error', 'err', 'ref', 'callback', 'redirect', 'url', 'return', 'returnUrl', 'return_url', 'next', 'goto', 'dest', 'destination', 'redir']
lfi_params = ['file', 'filename', 'path', 'filepath', 'page', 'include', 'inc', 'dir', 'document', 'doc', 'folder', 'root', 'pg', 'template', 'view']
        ssrf_params = ['url', 'uri', 'link', 'src', 'source', 'dest', 'redirect', 'path', 'continue', 'return', 'page', 'feed', 'host', 'site', 'html', 'domain', 'callback', 'api']
rce_params = ['cmd', 'exec', 'command', 'execute', 'ping', 'query', 'jump', 'code', 'reg', 'do', 'func', 'arg', 'option', 'load', 'process', 'step', 'read', 'function', 'req', 'feature', 'exe', 'module', 'payload', 'run', 'print']
idor_params = ['id', 'user', 'userid', 'user_id', 'account', 'account_id', 'accountid', 'uid', 'pid', 'profile', 'profile_id', 'doc', 'document', 'order', 'order_id', 'orderid', 'invoice', 'invoice_id', 'number', 'no']
all_interesting = set(sqli_params + xss_params + lfi_params + ssrf_params + rce_params + idor_params)
for param in params:
param_lower = param.lower()
if param_lower in all_interesting:
interesting.append(param)
return interesting
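# Usage sketch (placeholder domain; works best with paramspider and/or
# gau/waybackurls installed, otherwise results stay empty):
#
#   spider = ParamSpider()
#   params = spider.spider("example.com")
#   print(sorted(params["unique_params"])[:10], params["interesting_params"][:5])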
# =============================================================================
# URL COLLECTION
# =============================================================================
class URLCollector:
"""Collect URLs using multiple passive sources."""
TOOLS = ['gau', 'waybackurls', 'waymore', 'hakrawler']
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 300)
def collect(self, domain: str) -> Dict:
"""Collect URLs from passive sources."""
logger.info(f"[*] URL collection for: {domain}")
print(f"[*] Collecting URLs for: {domain}")
all_urls: Set[str] = set()
urls_with_params: Set[str] = set()
js_files: Set[str] = set()
api_endpoints: Set[str] = set()
results = {
"domain": domain,
"urls": [],
"urls_with_params": [],
"js_files": [],
"api_endpoints": [],
"by_tool": {},
"by_extension": defaultdict(list),
"total": 0
}
for tool in self.TOOLS:
installed, _ = check_tool(tool)
if not installed:
continue
print_result("[~]", f"Running {tool}...")
tool_urls = self._run_tool(tool, domain)
results["by_tool"][tool] = len(tool_urls)
for url in tool_urls:
all_urls.add(url)
# Categorize
url_lower = url.lower()
if '?' in url and '=' in url:
urls_with_params.add(url)
if '.js' in url_lower:
js_files.add(url)
if any(x in url_lower for x in ['/api/', '/v1/', '/v2/', '/v3/', '/graphql', '/rest/', '/json/']):
api_endpoints.add(url)
# By extension
ext_match = re.search(r'\.(\w{2,5})(?:\?|$)', url_lower)
if ext_match:
ext = ext_match.group(1)
results["by_extension"][ext].append(url)
print_result("[+]", f"{tool}: {len(tool_urls)} URLs")
results["urls"] = list(all_urls)
results["urls_with_params"] = list(urls_with_params)
results["js_files"] = list(js_files)
results["api_endpoints"] = list(api_endpoints)
results["total"] = len(all_urls)
results["by_extension"] = dict(results["by_extension"])
print_result("[✓]", f"Total unique URLs: {len(all_urls)}")
print_result("[+]", f"URLs with params: {len(urls_with_params)}")
print_result("[+]", f"JS files: {len(js_files)}")
print_result("[+]", f"API endpoints: {len(api_endpoints)}")
return results
def _run_tool(self, tool: str, domain: str) -> List[str]:
"""Run URL collection tool."""
urls = []
cmd_map = {
"gau": ["gau", "--subs", domain],
"waybackurls": ["waybackurls", domain],
"waymore": ["waymore", "-i", domain, "-mode", "U", "-oU", "-"],
"hakrawler": ["hakrawler", "-url", f"https://{domain}", "-subs", "-plain"]
}
cmd = cmd_map.get(tool)
if not cmd:
return []
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
url = line.strip()
if url and url.startswith(('http://', 'https://')):
urls.append(url)
return urls
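# Usage sketch (placeholder domain; each missing tool is silently skipped):
#
#   collector = URLCollector({"timeout": 180})
#   urls = collector.collect("example.com")
#   print(urls["total"], "URLs;", len(urls["js_files"]), "JS files")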
# =============================================================================
# WEB CRAWLER
# =============================================================================
class WebCrawler:
"""Web crawling with katana or gospider."""
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 300)
self.depth = self.config.get('crawl_depth', 3)
def crawl(self, target: str) -> Dict:
"""Crawl target to discover URLs, forms, endpoints."""
logger.info(f"[*] Crawling: {target}")
print(f"[*] Crawling: {target}")
results = {
"target": target,
"urls": [],
"forms": [],
"js_files": [],
"api_endpoints": [],
"params": [],
"comments": [],
"total": 0
}
katana_ok, _ = check_tool("katana")
if katana_ok:
results = self._run_katana(target)
else:
gospider_ok, _ = check_tool("gospider")
if gospider_ok:
results = self._run_gospider(target)
print_result("[+]", f"URLs discovered: {len(results.get('urls', []))}")
print_result("[+]", f"JS files: {len(results.get('js_files', []))}")
print_result("[+]", f"API endpoints: {len(results.get('api_endpoints', []))}")
return results
def _run_katana(self, target: str) -> Dict:
"""Run katana for advanced crawling."""
results = {
"target": target,
"urls": [],
"forms": [],
"js_files": [],
"api_endpoints": [],
"params": [],
"comments": [],
"total": 0
}
cmd = [
"katana",
"-u", target,
"-d", str(self.depth),
"-silent",
"-jc", # JavaScript crawling
"-kf", "all", # Known files
"-ef", "css,png,jpg,jpeg,gif,svg,ico,woff,woff2,ttf,eot",
"-ct", "60", # Concurrency
"-timeout", "10",
"-aff" # Automatic form filling
]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
url = line.strip()
if not url:
continue
results["urls"].append(url)
results["total"] += 1
url_lower = url.lower()
if '.js' in url_lower:
results["js_files"].append(url)
if any(x in url_lower for x in ['/api/', '/v1/', '/v2/', '/graphql', '/rest/']):
results["api_endpoints"].append(url)
if '?' in url and '=' in url:
results["params"].append(url)
return results
def _run_gospider(self, target: str) -> Dict:
"""Run gospider as fallback."""
results = {
"target": target,
"urls": [],
"forms": [],
"js_files": [],
"api_endpoints": [],
"params": [],
"comments": [],
"total": 0
}
cmd = [
"gospider",
"-s", target,
"-d", str(self.depth),
"-c", "10",
"-t", "5",
"--js",
"-q"
]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
if ' - ' in line:
url = line.split(' - ')[-1].strip()
else:
url = line.strip()
if url and url.startswith(('http://', 'https://')):
results["urls"].append(url)
results["total"] += 1
url_lower = url.lower()
if '.js' in url_lower:
results["js_files"].append(url)
if '/api/' in url_lower:
results["api_endpoints"].append(url)
if '?' in url:
results["params"].append(url)
return results
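# Usage sketch (placeholder URL; needs katana or gospider on PATH):
#
#   crawler = WebCrawler({"crawl_depth": 2, "timeout": 180})
#   crawl = crawler.crawl("https://example.com")
#   print(len(crawl["urls"]), "URLs,", len(crawl["api_endpoints"]), "API endpoints")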
# =============================================================================
# ADVANCED PORT SCANNING
# =============================================================================
class PortScanner:
"""Advanced port scanning with rustscan, naabu, or nmap."""
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 600)
def scan(self, target: str, scan_type: str = "quick") -> Dict:
"""
Port scan with service detection.
Args:
target: Target host/IP
scan_type: quick (top ports), full (all ports), stealth
"""
logger.info(f"[*] Port scanning: {target}")
print(f"[*] Port scanning: {target} ({scan_type} mode)")
results = {
"target": target,
"open_ports": [],
"services": {},
"by_service": defaultdict(list),
"total": 0
}
# Determine port range based on scan type
if scan_type == "quick":
ports = COMMON_PORTS
elif scan_type == "full":
ports = FULL_PORTS
else:
ports = TOP_1000_PORTS
# Try rustscan first (fastest)
rustscan_ok, _ = check_tool("rustscan")
if rustscan_ok:
print_result("[~]", "Using rustscan (fastest)...")
results = self._run_rustscan(target, ports)
else:
# Try naabu
naabu_ok, _ = check_tool("naabu")
if naabu_ok:
print_result("[~]", "Using naabu...")
results = self._run_naabu(target, ports)
else:
# Fallback to nmap
nmap_ok, _ = check_tool("nmap")
if nmap_ok:
print_result("[~]", "Using nmap...")
results = self._run_nmap(target, ports)
# Service version detection on open ports
if results["open_ports"] and scan_type != "quick":
print_result("[~]", "Running service version detection...")
services = self._detect_services(target, results["open_ports"])
results["services"] = services
for port, service in services.items():
results["by_service"][service].append(port)
results["by_service"] = dict(results["by_service"])
print_result("[+]", f"Open ports: {results['total']}")
if results["services"]:
print_result("[+]", f"Services detected: {len(results['services'])}")
return results
def _run_rustscan(self, target: str, ports: str) -> Dict:
"""Run rustscan for ultra-fast scanning."""
results = {
"target": target,
"open_ports": [],
"services": {},
"by_service": defaultdict(list),
"total": 0
}
cmd = ["rustscan", "-a", target, "-p", ports, "--ulimit", "5000", "-g"]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
# Parse rustscan output format: host -> [ports]
for line in result["stdout"].strip().split('\n'):
if '->' in line:
ports_str = line.split('->')[-1].strip().strip('[]')
for port in ports_str.split(','):
try:
p = int(port.strip())
results["open_ports"].append({"port": p, "protocol": "tcp"})
except:
continue
results["total"] = len(results["open_ports"])
return results
def _run_naabu(self, target: str, ports: str) -> Dict:
"""Run naabu for fast port scanning."""
results = {
"target": target,
"open_ports": [],
"services": {},
"by_service": defaultdict(list),
"total": 0
}
cmd = ["naabu", "-host", target, "-p", ports, "-silent", "-c", "100"]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
line = line.strip()
if ':' in line:
try:
_, port = line.rsplit(':', 1)
results["open_ports"].append({"port": int(port), "protocol": "tcp"})
except:
continue
results["total"] = len(results["open_ports"])
return results
def _run_nmap(self, target: str, ports: str) -> Dict:
"""Run nmap for port scanning."""
results = {
"target": target,
"open_ports": [],
"services": {},
"by_service": defaultdict(list),
"total": 0
}
cmd = ["nmap", "-sS", "-T4", "-p", ports, "--open", "-Pn", target]
result = run_tool(cmd, self.timeout)
if result["stdout"]:
port_pattern = r"(\d+)/(\w+)\s+open\s+(\S+)"
for match in re.finditer(port_pattern, result["stdout"]):
port_info = {
"port": int(match.group(1)),
"protocol": match.group(2),
"service": match.group(3)
}
results["open_ports"].append(port_info)
results["by_service"][match.group(3)].append(int(match.group(1)))
results["total"] = len(results["open_ports"])
return results
def _detect_services(self, target: str, ports: List[Dict]) -> Dict:
"""Detect services on open ports using nmap."""
services = {}
nmap_ok, _ = check_tool("nmap")
if not nmap_ok:
return services
port_list = ",".join([str(p["port"]) for p in ports[:50]]) # Limit to 50 ports
cmd = ["nmap", "-sV", "-p", port_list, "-Pn", target]
result = run_tool(cmd, 300)
if result["stdout"]:
pattern = r"(\d+)/\w+\s+open\s+(\S+)\s+(.*)"
for match in re.finditer(pattern, result["stdout"]):
port = int(match.group(1))
service = f"{match.group(2)} {match.group(3)}".strip()
services[port] = service
return services
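# Usage sketch (placeholder TEST-NET address; rustscan, naabu, and nmap are
# tried in that order, and nmap -sS additionally needs root):
#
#   scanner = PortScanner({"timeout": 300})
#   scan = scanner.scan("203.0.113.10", scan_type="quick")
#   print([p["port"] for p in scan["open_ports"]])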
# =============================================================================
# DNS ENUMERATION
# =============================================================================
class DNSEnumerator:
"""Advanced DNS enumeration."""
def __init__(self, config: Dict = None):
self.config = config or {}
def enumerate(self, domain: str) -> Dict:
"""Complete DNS enumeration."""
logger.info(f"[*] DNS enumeration for: {domain}")
print(f"[*] DNS enumeration for: {domain}")
results = {
"domain": domain,
"A": [],
"AAAA": [],
"MX": [],
"NS": [],
"TXT": [],
"SOA": [],
"CNAME": [],
"SRV": [],
"zone_transfer": [],
"nameservers_info": []
}
record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA', 'CNAME', 'SRV']
if dns:
for rtype in record_types:
try:
answers = dns.resolver.resolve(domain, rtype)
results[rtype] = [str(rdata) for rdata in answers]
print_result("[+]", f"{rtype}: {len(results[rtype])} records")
except dns.resolver.NoAnswer:
pass
except dns.resolver.NXDOMAIN:
print_result("[-]", f"Domain {domain} does not exist")
break
                except Exception:
                    pass
# Try zone transfer on each nameserver
if results["NS"]:
print_result("[~]", "Attempting zone transfer...")
for ns in results["NS"]:
zt_result = self._try_zone_transfer(domain, ns.rstrip('.'))
if zt_result:
results["zone_transfer"].extend(zt_result)
print_result("[!]", f"Zone transfer successful on {ns}!")
else:
# Fallback to dig/nslookup
print_result("[~]", "Using dig/nslookup fallback...")
results = self._dig_fallback(domain)
return results
def _try_zone_transfer(self, domain: str, nameserver: str) -> List[str]:
"""Attempt zone transfer."""
records = []
try:
import dns.zone
import dns.query
z = dns.zone.from_xfr(dns.query.xfr(nameserver, domain, timeout=10))
for name, node in z.nodes.items():
records.append(str(name))
except Exception:
pass
return records
def _dig_fallback(self, domain: str) -> Dict:
"""Fallback using dig command."""
results = {
"domain": domain,
"A": [], "AAAA": [], "MX": [], "NS": [], "TXT": [], "SOA": [], "CNAME": [], "SRV": [],
"zone_transfer": [], "nameservers_info": []
}
dig_ok, _ = check_tool("dig")
if not dig_ok:
return results
for rtype in ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME']:
result = run_tool(["dig", "+short", rtype, domain], 30)
if result["stdout"]:
results[rtype] = [r.strip() for r in result["stdout"].strip().split('\n') if r.strip()]
return results
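# Usage sketch (placeholder domain; uses dnspython when available, else dig):
#
#   dns_enum = DNSEnumerator()
#   records = dns_enum.enumerate("example.com")
#   print(records["A"], records["NS"])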
# =============================================================================
# VULNERABILITY SCANNER
# =============================================================================
class VulnScanner:
"""Vulnerability scanning using nuclei."""
def __init__(self, config: Dict = None):
self.config = config or {}
self.timeout = self.config.get('timeout', 900)
def scan(self, targets: List[str], severity: str = "all", templates: str = None) -> Dict:
"""
Vulnerability scan with nuclei.
Args:
targets: List of URLs to scan
severity: critical, high, medium, low, info, all
templates: Specific template path or tag
"""
logger.info(f"[*] Vulnerability scanning {len(targets)} targets")
print(f"[*] Vulnerability scanning {len(targets)} targets...")
results = {
"total_targets": len(targets),
"vulnerabilities": [],
"by_severity": {"critical": [], "high": [], "medium": [], "low": [], "info": []},
"by_type": defaultdict(list),
"statistics": {}
}
nuclei_ok, _ = check_tool("nuclei")
if not nuclei_ok:
print_result("[-]", "nuclei not installed")
return results
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write('\n'.join(targets))
targets_file = f.name
try:
cmd = ["nuclei", "-l", targets_file, "-silent", "-nc", "-j", "-c", "50", "-bs", "25", "-rl", "150"]
if severity != "all":
cmd.extend(["-s", severity])
if templates:
cmd.extend(["-t", templates])
result = run_tool(cmd, self.timeout)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
if not line.strip():
continue
try:
finding = json.loads(line)
vuln = {
"template": finding.get("template-id", ""),
"name": finding.get("info", {}).get("name", ""),
"severity": finding.get("info", {}).get("severity", "info"),
"url": finding.get("matched-at", ""),
"host": finding.get("host", ""),
"description": finding.get("info", {}).get("description", ""),
"tags": finding.get("info", {}).get("tags", []),
"reference": finding.get("info", {}).get("reference", []),
"curl_command": finding.get("curl-command", ""),
"matcher_name": finding.get("matcher-name", ""),
"extracted": finding.get("extracted-results", [])
}
results["vulnerabilities"].append(vuln)
sev = vuln["severity"].lower()
if sev in results["by_severity"]:
results["by_severity"][sev].append(vuln)
# By type/tag
for tag in vuln["tags"]:
results["by_type"][tag].append(vuln)
# Print finding
sev_icon = {"critical": "[!!]", "high": "[!]", "medium": "[*]", "low": "[+]", "info": "[i]"}.get(sev, "[?]")
print_result(sev_icon, f"[{sev.upper()}] {vuln['name']} - {vuln['url']}")
except json.JSONDecodeError:
continue
finally:
os.unlink(targets_file)
# Statistics
results["statistics"] = {
"total": len(results["vulnerabilities"]),
"critical": len(results["by_severity"]["critical"]),
"high": len(results["by_severity"]["high"]),
"medium": len(results["by_severity"]["medium"]),
"low": len(results["by_severity"]["low"]),
"info": len(results["by_severity"]["info"])
}
results["by_type"] = dict(results["by_type"])
print_result("[✓]", f"Total vulnerabilities: {results['statistics']['total']}")
print_result("[!]", f"Critical: {results['statistics']['critical']} | High: {results['statistics']['high']} | Medium: {results['statistics']['medium']}")
return results
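# Usage sketch (placeholder target; requires nuclei and its templates):
#
#   vuln = VulnScanner({"timeout": 600})
#   report = vuln.scan(["https://example.com"], severity="high")
#   print(report["statistics"])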
# =============================================================================
# WAF DETECTION
# =============================================================================
class WAFDetector:
"""Web Application Firewall detection."""
WAF_SIGNATURES = {
"Cloudflare": ["cf-ray", "cloudflare", "__cfduid", "cf-cache-status"],
"AWS WAF": ["x-amzn-requestid", "x-amz-cf-id"],
"Akamai": ["akamai", "akamai-ghost", "ak_bmsc"],
"Imperva/Incapsula": ["incap_ses", "visid_incap", "x-iinfo", "incapsula"],
"Sucuri": ["sucuri", "x-sucuri-id", "x-sucuri-cache"],
"F5 BIG-IP": ["bigipserver", "x-cnection", "x-wa-info"],
"Barracuda": ["barra_counter_session", "barracuda"],
"Citrix NetScaler": ["ns_af", "citrix_ns_id", "nsc_"],
"Fortinet FortiWeb": ["fortiwafsid", "fgd_icon_hash"],
"ModSecurity": ["mod_security", "modsecurity"],
"DenyAll": ["sessioncookie", "denyall"],
"StackPath": ["x-sp-", "stackpath"],
"Fastly": ["fastly", "x-fastly-request-id"],
"KeyCDN": ["keycdn", "x-edge-location"],
}
def __init__(self, config: Dict = None):
self.config = config or {}
def detect(self, target: str) -> Dict:
"""Detect WAF on target."""
logger.info(f"[*] WAF detection for: {target}")
print(f"[*] Detecting WAF on: {target}")
results = {
"target": target,
"waf_detected": False,
"waf_name": None,
"confidence": "low",
"indicators": [],
"bypass_hints": []
}
# Try wafw00f first
wafw00f_ok, _ = check_tool("wafw00f")
if wafw00f_ok:
print_result("[~]", "Using wafw00f...")
wafw00f_result = self._run_wafw00f(target)
if wafw00f_result.get("waf_detected"):
results.update(wafw00f_result)
print_result("[!]", f"WAF Detected: {results['waf_name']}")
return results
# Manual detection
print_result("[~]", "Running manual WAF detection...")
manual_result = self._manual_detection(target)
results.update(manual_result)
if results["waf_detected"]:
print_result("[!]", f"WAF Detected: {results['waf_name']} (Confidence: {results['confidence']})")
results["bypass_hints"] = self._get_bypass_hints(results["waf_name"])
else:
print_result("[+]", "No WAF detected")
return results
def _run_wafw00f(self, target: str) -> Dict:
"""Run wafw00f for WAF detection."""
result = {
"waf_detected": False,
"waf_name": None,
"confidence": "low",
"indicators": []
}
cmd = ["wafw00f", target, "-o", "-"]
output = run_tool(cmd, 60)
if output["stdout"]:
if "is behind" in output["stdout"]:
match = re.search(r"is behind (.+?)(?:\s|$)", output["stdout"])
if match:
result["waf_detected"] = True
result["waf_name"] = match.group(1).strip()
result["confidence"] = "high"
elif "No WAF" not in output["stdout"]:
result["waf_detected"] = True
result["confidence"] = "medium"
return result
def _manual_detection(self, target: str) -> Dict:
"""Manual WAF detection via headers and behavior."""
result = {
"waf_detected": False,
"waf_name": None,
"confidence": "low",
"indicators": []
}
url = make_url(target)
try:
# Normal request
resp_normal = requests.get(url, timeout=10, verify=False)
headers_normal = {k.lower(): v.lower() for k, v in resp_normal.headers.items()}
cookies_normal = resp_normal.cookies.get_dict()
# Check headers and cookies for WAF signatures
for waf_name, signatures in self.WAF_SIGNATURES.items():
for sig in signatures:
sig_lower = sig.lower()
# Check headers
for header, value in headers_normal.items():
if sig_lower in header or sig_lower in value:
result["waf_detected"] = True
result["waf_name"] = waf_name
result["indicators"].append(f"Header match: {header}")
result["confidence"] = "medium"
break
# Check cookies
for cookie_name in cookies_normal:
if sig_lower in cookie_name.lower():
result["waf_detected"] = True
result["waf_name"] = waf_name
result["indicators"].append(f"Cookie match: {cookie_name}")
result["confidence"] = "medium"
break
if result["waf_detected"]:
break
# Malicious request test (if no WAF detected yet)
if not result["waf_detected"]:
payloads = [
"?id=1' OR '1'='1",
"?q=<script>alert(1)</script>",
"?file=../../../etc/passwd",
"?cmd=;cat /etc/passwd"
]
for payload in payloads:
try:
resp_malicious = requests.get(f"{url}{payload}", timeout=10, verify=False)
# Check for WAF block responses
if resp_malicious.status_code in [403, 406, 429, 503]:
content = resp_malicious.text.lower()
waf_keywords = ['blocked', 'forbidden', 'denied', 'firewall', 'security', 'waf', 'captcha', 'challenge']
if any(kw in content for kw in waf_keywords):
result["waf_detected"] = True
result["confidence"] = "medium"
result["indicators"].append(f"Blocked request: {payload}")
break
except:
continue
except Exception as e:
logger.warning(f"WAF detection error: {e}")
return result
def _get_bypass_hints(self, waf_name: str) -> List[str]:
"""Get WAF bypass hints."""
hints = {
"Cloudflare": [
"Try finding origin IP via DNS history, Shodan, or SecurityTrails",
"Use HTTP/2 specific techniques",
"Try case variation: SeLeCt instead of SELECT",
"URL encode payloads multiple times"
],
"AWS WAF": [
"Try unicode normalization bypass",
"Use JSON-based payloads",
"Chunk transfer encoding"
],
"ModSecurity": [
"Try comments in SQL: SEL/**/ECT",
"Use HPP (HTTP Parameter Pollution)",
"Try alternative encodings"
],
"Akamai": [
"Try cache poisoning techniques",
"Use origin IP if discoverable",
"Header injection techniques"
]
}
return hints.get(waf_name, ["Try common bypass techniques: encoding, case variation, HPP"])
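# Usage sketch (placeholder target; falls back to header/behavior checks when
# wafw00f is not installed):
#
#   waf = WAFDetector()
#   info = waf.detect("https://example.com")
#   print(info["waf_detected"], info["waf_name"], info["bypass_hints"][:2])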
# =============================================================================
# JS FILE ANALYZER
# =============================================================================
class JSAnalyzer:
"""JavaScript file analysis for secrets, endpoints, and sensitive info."""
def __init__(self, config: Dict = None):
self.config = config or {}
def analyze(self, js_urls: List[str]) -> Dict:
"""Analyze JavaScript files for sensitive information."""
logger.info(f"[*] Analyzing {len(js_urls)} JS files")
print(f"[*] Analyzing {len(js_urls)} JavaScript files for secrets...")
results = {
"files_analyzed": 0,
"secrets": [],
"api_endpoints": [],
"domains": [],
"emails": [],
"comments": [],
"by_file": {}
}
for url in js_urls[:50]: # Limit to 50 files
try:
file_results = self._analyze_file(url)
if file_results:
results["by_file"][url] = file_results
results["secrets"].extend(file_results.get("secrets", []))
results["api_endpoints"].extend(file_results.get("endpoints", []))
results["domains"].extend(file_results.get("domains", []))
results["files_analyzed"] += 1
except Exception as e:
logger.warning(f"Error analyzing {url}: {e}")
continue
        # Deduplicate secrets by raw value (note: drops the type/file metadata)
        results["secrets"] = list({s["value"] if isinstance(s, dict) else s for s in results["secrets"]})
results["api_endpoints"] = list(set(results["api_endpoints"]))
results["domains"] = list(set(results["domains"]))
print_result("[+]", f"Files analyzed: {results['files_analyzed']}")
print_result("[!]", f"Secrets found: {len(results['secrets'])}")
print_result("[+]", f"API endpoints: {len(results['api_endpoints'])}")
if results["secrets"]:
for secret in results["secrets"][:5]:
print_result("[!!]", f"Secret: {secret[:50]}...")
return results
def _analyze_file(self, url: str) -> Dict:
"""Analyze single JS file."""
results = {
"secrets": [],
"endpoints": [],
"domains": [],
"comments": []
}
try:
resp = requests.get(url, timeout=15, verify=False)
if resp.status_code != 200:
return results
content = resp.text
# Find secrets
for secret_type, pattern in SECRET_PATTERNS.items():
matches = re.findall(pattern, content)
for match in matches:
results["secrets"].append({
"type": secret_type,
"value": match,
"file": url
})
# Find API endpoints
            endpoint_patterns = [
                r'["\'](/(?:api|v[0-9]+)/[a-zA-Z0-9/_-]+)["\']',
                r'["\'](https?://[^"\']+/api/[^"\']+)["\']',
                r'fetch\(["\']([^"\']+)["\']',
                r'axios\.(?:get|post|put|delete)\(["\']([^"\']+)["\']',
                r'\.ajax\(\{[^}]*url:\s*["\']([^"\']+)["\']'
            ]
            for pattern in endpoint_patterns:
                # Each pattern captures the endpoint itself in group 1,
                # so findall returns plain strings
                for endpoint in re.findall(pattern, content, re.I):
                    if len(endpoint) > 3:
                        results["endpoints"].append(endpoint)
# Find domains/URLs
            # Outer group captures the full hostname (an inner capturing group
            # would make findall return only the last label)
            domain_pattern = r'https?://((?:[a-zA-Z0-9][-a-zA-Z0-9]*\.)+[a-zA-Z]{2,})'
            domains = re.findall(domain_pattern, content)
results["domains"] = list(set(domains))[:20]
except Exception as e:
logger.warning(f"JS analysis error for {url}: {e}")
return results
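# Usage sketch (placeholder URL; fetches each file over HTTPS and applies
# SECRET_PATTERNS plus the endpoint regexes above):
#
#   js = JSAnalyzer()
#   findings = js.analyze(["https://example.com/static/app.js"])
#   print(findings["secrets"], findings["api_endpoints"][:10])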
# =============================================================================
# SUBDOMAIN TAKEOVER DETECTION
# =============================================================================
class TakeoverDetector:
"""Detect potential subdomain takeover vulnerabilities."""
def __init__(self, config: Dict = None):
self.config = config or {}
def detect(self, subdomains: List[str]) -> Dict:
"""Check for subdomain takeover possibilities."""
logger.info(f"[*] Checking {len(subdomains)} subdomains for takeover")
print(f"[*] Checking {len(subdomains)} subdomains for takeover...")
results = {
"checked": 0,
"vulnerable": [],
"potential": [],
"cname_records": {}
}
# Try subjack if available
subjack_ok, _ = check_tool("subjack")
if subjack_ok:
print_result("[~]", "Using subjack...")
subjack_results = self._run_subjack(subdomains)
results["vulnerable"].extend(subjack_results)
# Manual CNAME check
print_result("[~]", "Checking CNAME records...")
for subdomain in subdomains[:100]: # Limit
cname_result = self._check_cname(subdomain)
results["checked"] += 1
if cname_result.get("vulnerable"):
results["vulnerable"].append(cname_result)
print_result("[!!]", f"VULNERABLE: {subdomain} -> {cname_result['cname']} ({cname_result['service']})")
elif cname_result.get("potential"):
results["potential"].append(cname_result)
if cname_result.get("cname"):
results["cname_records"][subdomain] = cname_result["cname"]
print_result("[+]", f"Subdomains checked: {results['checked']}")
print_result("[!]", f"Vulnerable: {len(results['vulnerable'])}")
print_result("[*]", f"Potential: {len(results['potential'])}")
return results
def _run_subjack(self, subdomains: List[str]) -> List[Dict]:
"""Run subjack for takeover detection."""
vulnerable = []
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write('\n'.join(subdomains))
subs_file = f.name
try:
cmd = ["subjack", "-w", subs_file, "-t", "50", "-timeout", "30", "-o", "-", "-ssl"]
result = run_tool(cmd, 300)
if result["stdout"]:
for line in result["stdout"].strip().split('\n'):
if "[Vulnerable]" in line or "vulnerable" in line.lower():
vulnerable.append({"subdomain": line, "source": "subjack"})
finally:
os.unlink(subs_file)
return vulnerable
def _check_cname(self, subdomain: str) -> Dict:
"""Check CNAME record for takeover indicators."""
result = {
"subdomain": subdomain,
"cname": None,
"vulnerable": False,
"potential": False,
"service": None
}
try:
if dns:
answers = dns.resolver.resolve(subdomain, 'CNAME')
for rdata in answers:
cname = str(rdata.target).rstrip('.')
result["cname"] = cname
# Check against known takeover signatures
for pattern, service in TAKEOVER_CNAMES.items():
if pattern in cname.lower():
result["potential"] = True
result["service"] = service
# Try to resolve CNAME - if it fails, likely vulnerable
try:
socket.gethostbyname(cname)
except socket.gaierror:
result["vulnerable"] = True
break
except:
pass
return result
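# Usage sketch (placeholder subdomains; subjack is optional, the CNAME check
# runs either way):
#
#   takeover = TakeoverDetector()
#   tk = takeover.detect(["old.example.com", "assets.example.com"])
#   print(tk["vulnerable"], tk["potential"])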
# =============================================================================
# CORS MISCONFIGURATION CHECKER
# =============================================================================
class CORSChecker:
"""Check for CORS misconfigurations."""
def __init__(self, config: Dict = None):
self.config = config or {}
def check(self, targets: List[str]) -> Dict:
"""Check targets for CORS misconfigurations."""
logger.info(f"[*] CORS check on {len(targets)} targets")
print(f"[*] Checking {len(targets)} targets for CORS misconfigurations...")
results = {
"checked": 0,
"vulnerable": [],
"warnings": [],
"by_type": defaultdict(list)
}
for target in targets[:50]:
url = make_url(target)
cors_result = self._check_cors(url)
results["checked"] += 1
if cors_result.get("vulnerable"):
results["vulnerable"].append(cors_result)
results["by_type"][cors_result["type"]].append(url)
print_result("[!]", f"CORS Vuln ({cors_result['type']}): {url}")
elif cors_result.get("warning"):
results["warnings"].append(cors_result)
results["by_type"] = dict(results["by_type"])
print_result("[+]", f"Checked: {results['checked']}")
print_result("[!]", f"Vulnerable: {len(results['vulnerable'])}")
return results
def _check_cors(self, url: str) -> Dict:
"""Check single URL for CORS misconfiguration."""
result = {
"url": url,
"vulnerable": False,
"warning": False,
"type": None,
"details": None
}
        test_origins = [
            "https://evil.com",
            "null",
            f"https://{urlparse(url).netloc}.evil.com",  # trusted-prefix bypass (victim.com.evil.com)
            urlparse(url).scheme + "://" + urlparse(url).netloc.replace(".", "x"),  # unescaped-dot regex bypass
        ]
try:
for origin in test_origins:
headers = {"Origin": origin}
resp = requests.get(url, headers=headers, timeout=10, verify=False)
acao = resp.headers.get("Access-Control-Allow-Origin", "")
acac = resp.headers.get("Access-Control-Allow-Credentials", "")
# Check for vulnerable configurations
if acao == "*":
result["warning"] = True
result["type"] = "wildcard_origin"
result["details"] = "ACAO: * (wildcard)"
if acac.lower() == "true":
result["vulnerable"] = True
result["type"] = "wildcard_with_credentials"
return result
elif acao == origin:
result["vulnerable"] = True
result["type"] = "origin_reflection"
result["details"] = f"Origin reflected: {origin}"
if acac.lower() == "true":
result["type"] = "origin_reflection_with_credentials"
return result
elif acao == "null" and origin == "null":
result["vulnerable"] = True
result["type"] = "null_origin_allowed"
result["details"] = "null origin allowed"
return result
except Exception as e:
logger.warning(f"CORS check error for {url}: {e}")
return result
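# Usage sketch (placeholder target; sends a handful of crafted Origin headers
# and inspects the ACAO/ACAC response headers):
#
#   cors = CORSChecker()
#   report = cors.check(["https://example.com"])
#   print(report["vulnerable"], report["by_type"])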
# =============================================================================
# SCREENSHOT CAPTURE
# =============================================================================
class ScreenshotCapture:
"""Capture screenshots of web targets."""
def __init__(self, config: Dict = None):
self.config = config or {}
# Read from self.config so a None config doesn't raise AttributeError
self.output_dir = self.config.get('screenshot_dir', '/opt/NeuroSploitv2/results/screenshots')
def capture(self, targets: List[str]) -> Dict:
"""Capture screenshots of targets."""
logger.info(f"[*] Capturing screenshots for {len(targets)} targets")
print(f"[*] Capturing screenshots for {len(targets)} targets...")
results = {
"captured": 0,
"failed": 0,
"screenshots": [],
"output_dir": self.output_dir
}
# Create output directory
os.makedirs(self.output_dir, exist_ok=True)
# Try gowitness
gowitness_ok, _ = check_tool("gowitness")
if gowitness_ok:
print_result("[~]", "Using gowitness...")
results = self._run_gowitness(targets)
else:
# Try eyewitness
eyewitness_ok, _ = check_tool("eyewitness")
if eyewitness_ok:
print_result("[~]", "Using eyewitness...")
results = self._run_eyewitness(targets)
else:
print_result("[-]", "No screenshot tool available (gowitness/eyewitness)")
print_result("[+]", f"Screenshots captured: {results['captured']}")
print_result("[+]", f"Output directory: {results['output_dir']}")
return results
def _run_gowitness(self, targets: List[str]) -> Dict:
"""Run gowitness for screenshots."""
results = {
"captured": 0,
"failed": 0,
"screenshots": [],
"output_dir": self.output_dir
}
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write('\n'.join(targets))
targets_file = f.name
try:
cmd = [
"gowitness", "file",
"-f", targets_file,
"-P", self.output_dir,
"--timeout", "30",
"-t", "10"
]
run_tool(cmd, 600)  # best-effort; screenshots are counted from disk below
# Count screenshots
if os.path.exists(self.output_dir):
screenshots = list(Path(self.output_dir).glob("*.png"))
results["captured"] = len(screenshots)
results["screenshots"] = [str(s) for s in screenshots[:100]]
finally:
os.unlink(targets_file)
return results
def _run_eyewitness(self, targets: List[str]) -> Dict:
"""Run eyewitness for screenshots."""
results = {
"captured": 0,
"failed": 0,
"screenshots": [],
"output_dir": self.output_dir
}
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write('\n'.join(targets))
targets_file = f.name
try:
cmd = [
"eyewitness",
"-f", targets_file,
"-d", self.output_dir,
"--timeout", "30",
"--threads", "10",
"--no-prompt"
]
run_tool(cmd, 600)  # best-effort; screenshots are counted from disk below
if os.path.exists(self.output_dir):
screenshots = list(Path(self.output_dir).glob("**/*.png"))
results["captured"] = len(screenshots)
results["screenshots"] = [str(s) for s in screenshots[:100]]
finally:
os.unlink(targets_file)
return results
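# Example (sketch): capture into a custom directory; requires gowitness or
# eyewitness on PATH, and the path below is a placeholder.
#
#   cap = ScreenshotCapture({"screenshot_dir": "/tmp/recon-shots"})
#   shots = cap.capture(["https://app.example.com"])
#   print(f"{shots['captured']} screenshots in {shots['output_dir']}")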
# =============================================================================
# CLOUD BUCKET ENUMERATION
# =============================================================================
class CloudBucketEnum:
"""Enumerate cloud storage buckets (S3, GCS, Azure)."""
def __init__(self, config: Dict = None):
self.config = config or {}
def enumerate(self, domain: str, keywords: List[str] = None) -> Dict:
"""Enumerate cloud buckets based on domain and keywords."""
logger.info(f"[*] Cloud bucket enumeration for: {domain}")
print(f"[*] Enumerating cloud buckets for: {domain}")
results = {
"domain": domain,
"s3_buckets": [],
"gcs_buckets": [],
"azure_blobs": [],
"accessible": [],
"total": 0
}
# Generate bucket names
base = domain[4:] if domain.startswith("www.") else domain  # drop leading "www."
base_name = base.replace(".", "-")
bucket_names = self._generate_bucket_names(base_name, keywords or [])
print_result("[~]", f"Testing {len(bucket_names)} potential bucket names...")
# Check S3
s3_found = self._check_s3_buckets(bucket_names)
results["s3_buckets"] = s3_found
# Check GCS
gcs_found = self._check_gcs_buckets(bucket_names)
results["gcs_buckets"] = gcs_found
# Check Azure
azure_found = self._check_azure_blobs(bucket_names)
results["azure_blobs"] = azure_found
results["accessible"] = [b for b in s3_found + gcs_found + azure_found if b.get("accessible")]
results["total"] = len(s3_found) + len(gcs_found) + len(azure_found)
print_result("[+]", f"S3 buckets: {len(s3_found)}")
print_result("[+]", f"GCS buckets: {len(gcs_found)}")
print_result("[+]", f"Azure blobs: {len(azure_found)}")
if results["accessible"]:
print_result("[!]", f"Accessible buckets: {len(results['accessible'])}")
return results
def _generate_bucket_names(self, base: str, keywords: List[str]) -> List[str]:
"""Generate potential bucket names."""
names = set()
prefixes = ['', 'dev-', 'staging-', 'prod-', 'test-', 'backup-', 'assets-', 'static-', 'media-', 'uploads-', 'data-', 'files-', 'cdn-', 'img-', 'images-']
suffixes = ['', '-dev', '-staging', '-prod', '-test', '-backup', '-assets', '-static', '-media', '-uploads', '-data', '-files', '-cdn', '-images', '-public', '-private', '-internal']
for prefix in prefixes:
for suffix in suffixes:
name = f"{prefix}{base}{suffix}".strip('-')
if name and 3 <= len(name) <= 63:
names.add(name)
for keyword in keywords:
names.add(f"{base}-{keyword}")
names.add(f"{keyword}-{base}")
return list(names)[:200]
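# e.g. base "acme-corp" yields candidates such as "acme-corp", "dev-acme-corp",
# "acme-corp-backup" and "staging-acme-corp-public", plus "acme-corp-<kw>" /
# "<kw>-acme-corp" for each supplied keyword (capped at 200 names).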
def _check_s3_buckets(self, names: List[str]) -> List[Dict]:
"""Check for S3 buckets."""
found = []
def check_bucket(name):
try:
url = f"https://{name}.s3.amazonaws.com"
resp = requests.head(url, timeout=5)
# 200 = listable, 403 = exists but private, 301/307 = exists in another region
if resp.status_code in [200, 403, 301, 307]:
accessible = resp.status_code == 200
return {"name": name, "url": url, "status": resp.status_code, "accessible": accessible}
except requests.RequestException:
pass
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
results = executor.map(check_bucket, names)
found = [r for r in results if r]
return found
def _check_gcs_buckets(self, names: List[str]) -> List[Dict]:
"""Check for Google Cloud Storage buckets."""
found = []
def check_bucket(name):
try:
url = f"https://storage.googleapis.com/{name}"
resp = requests.head(url, timeout=5)
# 200 = publicly readable, 403 = exists but access denied
if resp.status_code in [200, 403]:
accessible = resp.status_code == 200
return {"name": name, "url": url, "status": resp.status_code, "accessible": accessible}
except requests.RequestException:
pass
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
results = executor.map(check_bucket, names)
found = [r for r in results if r]
return found
def _check_azure_blobs(self, names: List[str]) -> List[Dict]:
"""Check for Azure Blob Storage."""
found = []
def check_blob(name):
try:
url = f"https://{name}.blob.core.windows.net"
resp = requests.head(url, timeout=5)
# Any HTTP response implies the storage account's DNS name exists (400 included)
if resp.status_code in [200, 403, 400]:
accessible = resp.status_code == 200
return {"name": name, "url": url, "status": resp.status_code, "accessible": accessible}
except requests.RequestException:
pass
return None
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
results = executor.map(check_blob, names)
found = [r for r in results if r]
return found
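# Example (sketch): keyword-seeded sweep (domain and keywords are placeholders):
#
#   enum = CloudBucketEnum()
#   buckets = enum.enumerate("example.com", keywords=["payments", "reports"])
#   for b in buckets["accessible"]:
#       print("[!] open bucket:", b["url"])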
# =============================================================================
# TECHNOLOGY FINGERPRINTER
# =============================================================================
class TechFingerprinter:
"""Advanced technology fingerprinting."""
def __init__(self, config: Dict = None):
self.config = config or {}
def fingerprint(self, target: str) -> Dict:
"""Deep technology fingerprinting."""
logger.info(f"[*] Fingerprinting: {target}")
print(f"[*] Technology fingerprinting: {target}")
results = {
"target": target,
"technologies": [],
"cms": None,
"web_server": None,
"programming_language": None,
"frameworks": [],
"js_libraries": [],
"cdn": None,
"analytics": [],
"headers": {},
"meta_tags": {}
}
# Try whatweb first
whatweb_ok, _ = check_tool("whatweb")
if whatweb_ok:
print_result("[~]", "Running whatweb...")
whatweb_results = self._run_whatweb(target)
results.update(whatweb_results)
# Manual fingerprinting
print_result("[~]", "Running manual fingerprinting...")
manual_results = self._manual_fingerprint(target)
# Merge results
results["technologies"] = list(set(results.get("technologies", []) + manual_results.get("technologies", [])))
results["frameworks"] = list(set(results.get("frameworks", []) + manual_results.get("frameworks", [])))
results["js_libraries"] = list(set(results.get("js_libraries", []) + manual_results.get("js_libraries", [])))
if not results["cms"]:
results["cms"] = manual_results.get("cms")
if not results["web_server"]:
results["web_server"] = manual_results.get("web_server")
results["headers"] = manual_results.get("headers", {})
results["meta_tags"] = manual_results.get("meta_tags", {})
print_result("[+]", f"Technologies: {len(results['technologies'])}")
if results["cms"]:
print_result("[+]", f"CMS: {results['cms']}")
if results["web_server"]:
print_result("[+]", f"Web Server: {results['web_server']}")
return results
def _run_whatweb(self, target: str) -> Dict:
"""Run whatweb for fingerprinting."""
results = {"technologies": [], "cms": None, "web_server": None, "frameworks": [], "js_libraries": []}
url = make_url(target)
cmd = ["whatweb", "-a", "3", "--color=never", url]
result = run_tool(cmd, 120)
if result["stdout"]:
# Parse whatweb output
techs = re.findall(r'\[([^\]]+)\]', result["stdout"])
results["technologies"] = list(set(techs))
# Identify specific categories
cms_keywords = ['WordPress', 'Drupal', 'Joomla', 'Magento', 'Shopify', 'PrestaShop', 'OpenCart', 'TYPO3', 'Ghost']
framework_keywords = ['Laravel', 'Django', 'Rails', 'Express', 'Spring', 'ASP.NET', 'Flask', 'FastAPI', 'Next.js', 'Nuxt']
for tech in results["technologies"]:
for cms in cms_keywords:
if cms.lower() in tech.lower():
results["cms"] = cms
for fw in framework_keywords:
if fw.lower() in tech.lower():
results["frameworks"].append(fw)
return results
def _manual_fingerprint(self, target: str) -> Dict:
"""Manual technology fingerprinting."""
results = {
"technologies": [],
"cms": None,
"web_server": None,
"programming_language": None,
"frameworks": [],
"js_libraries": [],
"headers": {},
"meta_tags": {}
}
url = make_url(target)
try:
resp = requests.get(url, timeout=15, verify=False)
# Headers analysis
headers = dict(resp.headers)
results["headers"] = headers
if 'Server' in headers:
results["web_server"] = headers['Server']
results["technologies"].append(f"Server: {headers['Server']}")
if 'X-Powered-By' in headers:
results["programming_language"] = headers['X-Powered-By']
results["technologies"].append(f"X-Powered-By: {headers['X-Powered-By']}")
# Content analysis
content = resp.text.lower()
# CMS detection
cms_signatures = {
'WordPress': ['wp-content', 'wp-includes', 'wordpress'],
'Drupal': ['drupal', 'sites/default/files'],
'Joomla': ['joomla', '/components/com_'],
'Magento': ['magento', 'mage/'],
'Shopify': ['shopify', 'cdn.shopify'],
'Ghost': ['ghost', 'ghost/'],
}
for cms, sigs in cms_signatures.items():
if any(sig in content for sig in sigs):
results["cms"] = cms
results["technologies"].append(cms)
break
# JS Library detection
js_libs = {
'jQuery': ['jquery', 'jquery.min.js'],
'React': ['react', 'react.production.min.js', '__react'],
'Vue.js': ['vue.js', 'vue.min.js', '__vue__'],
'Angular': ['angular', 'ng-app', 'ng-controller'],
'Bootstrap': ['bootstrap', 'bootstrap.min'],
'Tailwind': ['tailwindcss', 'tailwind'],
}
for lib, sigs in js_libs.items():
if any(sig in content for sig in sigs):
results["js_libraries"].append(lib)
results["technologies"].append(lib)
# Meta tags
meta_patterns = {
'generator': r'<meta[^>]*name=["\']generator["\'][^>]*content=["\']([^"\']+)["\']',
'framework': r'<meta[^>]*name=["\']framework["\'][^>]*content=["\']([^"\']+)["\']',
}
for name, pattern in meta_patterns.items():
match = re.search(pattern, content, re.I)
if match:
results["meta_tags"][name] = match.group(1)
results["technologies"].append(match.group(1))
except Exception as e:
logger.warning(f"Manual fingerprint error: {e}")
return results
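# Example (sketch): fingerprint one host and pivot on the result:
#
#   fp = TechFingerprinter()
#   tech = fp.fingerprint("example.com")
#   if tech["cms"] == "WordPress":
#       pass  # e.g. queue a WordPress-specific scan next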
# =============================================================================
# FULL RECON RUNNER - ORCHESTRATOR
# =============================================================================
class FullReconRunner:
"""
Complete reconnaissance orchestrator.
Runs all phases and consolidates results.
"""
def __init__(self, config: Dict = None):
self.config = config or {}
def run(self, target: str, target_type: str = "domain", depth: str = "medium") -> Dict:
"""
Run comprehensive reconnaissance.
Args:
target: Target domain or URL
target_type: domain, url
depth: quick, medium, deep
Returns:
Consolidated recon results
"""
from core.context_builder import ReconContextBuilder
print(f"\n{'='*70}")
print(" NEUROSPLOIT v2 - ADVANCED RECONNAISSANCE ENGINE")
print(f"{'='*70}")
print(f"\n[*] Target: {target}")
print(f"[*] Type: {target_type}")
print(f"[*] Depth: {depth}\n")
# Initialize context builder
ctx = ReconContextBuilder()
ctx.set_target(target, target_type)
# Extract domain
domain = extract_domain(target) if target_type == "url" else target
# ================================================================
# PHASE 1: Subdomain Enumeration
# ================================================================
print_phase(1, "SUBDOMAIN ENUMERATION")
sub_enum = AdvancedSubdomainEnum(self.config)
sub_results = sub_enum.enumerate(domain, depth)
ctx.add_subdomains(sub_results.get("subdomains", []))
ctx.add_tool_result("subdomain_enum", sub_results)
subdomains = sub_results.get("subdomains", [domain])
# ================================================================
# PHASE 2: HTTP Probing
# ================================================================
print_phase(2, "HTTP PROBING & TECHNOLOGY DETECTION")
prober = HttpProber(self.config)
probe_results = prober.probe(subdomains)
ctx.add_live_hosts(probe_results.get("alive", []))
ctx.add_technologies(list(probe_results.get("technologies", {}).keys()))
ctx.add_tool_result("http_probe", probe_results)
alive_hosts = probe_results.get("alive", [])
# ================================================================
# PHASE 3: WAF Detection
# ================================================================
print_phase(3, "WAF DETECTION")
waf_detector = WAFDetector(self.config)
waf_result = waf_detector.detect(target)
ctx.add_tool_result("waf_detection", waf_result)
# ================================================================
# PHASE 4: Port Scanning
# ================================================================
print_phase(4, "PORT SCANNING")
port_scanner = PortScanner(self.config)
scan_type = "quick" if depth == "quick" else ("full" if depth == "deep" else "quick")
port_results = port_scanner.scan(domain, scan_type)
ctx.add_open_ports(port_results.get("open_ports", []))
ctx.add_tool_result("port_scan", port_results)
# ================================================================
# PHASE 5: Directory Bruteforce
# ================================================================
if alive_hosts and depth != "quick":
print_phase(5, "DIRECTORY BRUTEFORCE")
dir_bruter = DirectoryBruter(self.config)
wordlist_size = "medium" if depth == "medium" else "big"
dir_results = dir_bruter.bruteforce(alive_hosts[0], wordlist_size)
ctx.add_interesting_paths([d.get("url", "") for d in dir_results.get("interesting", [])])
ctx.add_tool_result("dir_bruteforce", dir_results)
# ================================================================
# PHASE 6: URL Collection
# ================================================================
print_phase(6, "URL COLLECTION")
url_collector = URLCollector(self.config)
url_results = url_collector.collect(domain)
ctx.add_urls(url_results.get("urls", []))
ctx.add_js_files(url_results.get("js_files", []))
ctx.add_api_endpoints(url_results.get("api_endpoints", []))
ctx.add_tool_result("url_collection", url_results)
# ================================================================
# PHASE 7: Parameter Discovery
# ================================================================
print_phase(7, "PARAMETER DISCOVERY")
param_spider = ParamSpider(self.config)
param_results = param_spider.spider(domain)
ctx.add_tool_result("param_discovery", param_results)
# ================================================================
# PHASE 8: Web Crawling
# ================================================================
if alive_hosts:
print_phase(8, "WEB CRAWLING")
crawler = WebCrawler(self.config)
crawl_results = crawler.crawl(alive_hosts[0])
ctx.add_urls(crawl_results.get("urls", []))
ctx.add_js_files(crawl_results.get("js_files", []))
ctx.add_api_endpoints(crawl_results.get("api_endpoints", []))
ctx.add_tool_result("crawling", crawl_results)
# ================================================================
# PHASE 9: JavaScript Analysis
# ================================================================
js_files = list(ctx.js_files)
if js_files:
print_phase(9, "JAVASCRIPT ANALYSIS")
js_analyzer = JSAnalyzer(self.config)
js_results = js_analyzer.analyze(js_files)
ctx.add_secrets(js_results.get("secrets", []))
ctx.add_api_endpoints(js_results.get("api_endpoints", []))
ctx.add_tool_result("js_analysis", js_results)
# ================================================================
# PHASE 10: DNS Enumeration
# ================================================================
print_phase(10, "DNS ENUMERATION")
dns_enum = DNSEnumerator(self.config)
dns_results = dns_enum.enumerate(domain)
dns_records = []
for rtype, records in dns_results.items():
if rtype != "domain" and records:
for r in records:
dns_records.append(f"[{rtype}] {r}")
ctx.add_dns_records(dns_records)
ctx.add_tool_result("dns_enum", dns_results)
# ================================================================
# PHASE 11: Subdomain Takeover Check
# ================================================================
if depth != "quick" and subdomains:
print_phase(11, "SUBDOMAIN TAKEOVER CHECK")
takeover = TakeoverDetector(self.config)
takeover_results = takeover.detect(subdomains[:100])
ctx.add_tool_result("subdomain_takeover", takeover_results)
if takeover_results.get("vulnerable"):
for v in takeover_results["vulnerable"]:
ctx.add_vulnerabilities([{
"title": "Subdomain Takeover",
"severity": "high",
"affected_endpoint": v.get("subdomain", ""),
"description": f"Potential subdomain takeover via {v.get('service', 'unknown')}"
}])
# ================================================================
# PHASE 12: CORS Misconfiguration Check
# ================================================================
if alive_hosts and depth != "quick":
print_phase(12, "CORS MISCONFIGURATION CHECK")
cors_checker = CORSChecker(self.config)
cors_results = cors_checker.check(alive_hosts[:30])
ctx.add_tool_result("cors_check", cors_results)
for vuln in cors_results.get("vulnerable", []):
ctx.add_vulnerabilities([{
"title": f"CORS Misconfiguration ({vuln.get('type', '')})",
"severity": "medium",
"affected_endpoint": vuln.get("url", ""),
"description": vuln.get("details", "")
}])
# ================================================================
# PHASE 13: Cloud Bucket Enumeration
# ================================================================
if depth == "deep":
print_phase(13, "CLOUD BUCKET ENUMERATION")
cloud_enum = CloudBucketEnum(self.config)
cloud_results = cloud_enum.enumerate(domain)
ctx.add_tool_result("cloud_buckets", cloud_results)
for bucket in cloud_results.get("accessible", []):
ctx.add_vulnerabilities([{
"title": "Accessible Cloud Bucket",
"severity": "high",
"affected_endpoint": bucket.get("url", ""),
"description": f"Publicly accessible cloud storage: {bucket.get('name', '')}"
}])
# ================================================================
# PHASE 14: Technology Fingerprinting
# ================================================================
print_phase(14, "TECHNOLOGY FINGERPRINTING")
fingerprinter = TechFingerprinter(self.config)
tech_results = fingerprinter.fingerprint(target)
ctx.add_technologies(tech_results.get("technologies", []))
ctx.add_tool_result("tech_fingerprint", tech_results)
# ================================================================
# PHASE 15: Vulnerability Scanning
# ================================================================
print_phase(15, "VULNERABILITY SCANNING (NUCLEI)")
vuln_scanner = VulnScanner(self.config)
scan_targets = alive_hosts[:30] if alive_hosts else [target]
severity = "all" if depth == "deep" else "critical,high,medium"
vuln_results = vuln_scanner.scan(scan_targets, severity)
for v in vuln_results.get("vulnerabilities", []):
ctx.add_vulnerabilities([{
"title": v.get("name", ""),
"severity": v.get("severity", "info"),
"affected_endpoint": v.get("url", ""),
"description": v.get("description", ""),
"references": v.get("reference", [])
}])
ctx.add_tool_result("vuln_scan", vuln_results)
# ================================================================
# PHASE 16: Screenshot Capture (optional)
# ================================================================
if depth == "deep" and alive_hosts:
print_phase(16, "SCREENSHOT CAPTURE")
screenshot = ScreenshotCapture(self.config)
screenshot_results = screenshot.capture(alive_hosts[:20])
ctx.add_tool_result("screenshots", screenshot_results)
# ================================================================
# CONSOLIDATION
# ================================================================
print(f"\n{'='*70}")
print("[FINAL] CONSOLIDATING RESULTS")
print(f"{'='*70}")
# Identify interesting paths from all URLs
all_urls = list(ctx.urls)
ctx.add_interesting_paths(all_urls)
# Save context
saved = ctx.save()
# Print summary
print(f"\n{'='*70}")
print("[✓] RECONNAISSANCE COMPLETE!")
print(f"{'='*70}")
print(f"""
SUMMARY:
─────────────────────────────────────────────
Subdomains discovered: {len(ctx.subdomains)}
Live hosts: {len(ctx.live_hosts)}
Open ports: {len(ctx.open_ports)}
URLs collected: {len(ctx.urls)}
URLs with parameters: {len(ctx.urls_with_params)}
JavaScript files: {len(ctx.js_files)}
API endpoints: {len(ctx.api_endpoints)}
Technologies detected: {len(ctx.technologies)}
Vulnerabilities found: {len(ctx.vulnerabilities)}
WAF Detected: {waf_result.get('waf_name', 'None')}
Context saved to: {saved['json']}
─────────────────────────────────────────────
""")
return {
"context": saved["context"],
"context_file": str(saved["json"]),
"context_text_file": str(saved["txt"]),
"context_text": ctx.get_llm_prompt_context(),
"summary": {
"subdomains": len(ctx.subdomains),
"live_hosts": len(ctx.live_hosts),
"open_ports": len(ctx.open_ports),
"urls": len(ctx.urls),
"vulnerabilities": len(ctx.vulnerabilities),
"waf": waf_result.get('waf_name')
}
}
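# Example (sketch): end-to-end run from a script or REPL:
#
#   runner = FullReconRunner()
#   out = runner.run("example.com", target_type="domain", depth="medium")
#   print(out["summary"])
#   llm_context = out["context_text"]  # prompt-ready recon context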
# =============================================================================
# LEGACY CLASSES (Backwards Compatibility)
# =============================================================================
class NetworkScanner(PortScanner):
"""Legacy NetworkScanner - now uses PortScanner."""
pass
class WebRecon:
"""Legacy web reconnaissance - now uses multiple specialized classes."""
def __init__(self, config: Dict = None):
self.config = config or {}
self.fingerprinter = TechFingerprinter(config)
self.waf_detector = WAFDetector(config)
def analyze(self, url: str) -> Dict:
"""Analyze web application."""
results = {
"url": url,
"technologies": [],
"headers": {},
"security_headers": {},
"endpoints": [],
"forms": [],
"vulnerabilities": [],
"waf": None
}
# Technology fingerprinting
tech_results = self.fingerprinter.fingerprint(url)
results["technologies"] = tech_results.get("technologies", [])
results["headers"] = tech_results.get("headers", {})
# WAF detection
waf_results = self.waf_detector.detect(url)
results["waf"] = waf_results.get("waf_name")
# Security headers check
security_headers = ['X-Frame-Options', 'X-Content-Type-Options', 'Strict-Transport-Security',
'Content-Security-Policy', 'X-XSS-Protection', 'Referrer-Policy']
for header in security_headers:
if header in results["headers"]:
results["security_headers"][header] = results["headers"][header]
else:
results["security_headers"][header] = "Missing"
return results
class OSINTCollector:
"""OSINT collection."""
def __init__(self, config: Dict = None):
self.config = config or {}
def collect(self, target: str) -> Dict:
"""Collect OSINT data."""
return {
"target": target,
"emails": [],
"social_media": {},
"data_breaches": [],
"metadata": {}
}
class SubdomainFinder(AdvancedSubdomainEnum):
"""Legacy SubdomainFinder - now uses AdvancedSubdomainEnum."""
def find(self, domain: str) -> List[str]:
"""Find subdomains."""
results = self.enumerate(domain, depth="quick")
return results.get("subdomains", [])