mirror of https://github.com/CyberSecurityUP/NeuroSploit.git (synced 2026-02-12 14:02:45 +00:00)
884 lines
34 KiB
Python
"""
|
|
NeuroSploit v3 - Full Recon Integration
|
|
|
|
Integrates 40+ security/recon tools for comprehensive reconnaissance:
|
|
- Subdomain Enumeration: subfinder, amass, assetfinder, chaos, cero
|
|
- DNS Resolution: dnsx, massdns, puredns
|
|
- HTTP Probing: httpx, httprobe
|
|
- URL Discovery: gau, waybackurls, katana, gospider, hakrawler, cariddi
|
|
- Port Scanning: nmap, naabu, rustscan
|
|
- Tech Detection: whatweb, wafw00f
|
|
- Fuzzing: ffuf, gobuster, dirb, dirsearch
|
|
- Vulnerability Scanning: nuclei, nikto
|
|
- Parameter Discovery: arjun, paramspider
|
|
"""
|
|

import asyncio
import subprocess
import json
import os
import sys
import shutil
from typing import Optional, Callable, List, Dict, Any
from datetime import datetime
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from backend.api.websocket import manager as ws_manager


class ReconIntegration:
    """
    Full reconnaissance integration with 40+ security tools.
    Automatically uses available tools and skips missing ones.
    """

    def __init__(self, scan_id: str):
        self.scan_id = scan_id
        self.base_path = Path("/app")
        self.results_path = self.base_path / "data" / "recon"
        self.results_path.mkdir(parents=True, exist_ok=True)
        self.wordlists_path = Path("/opt/wordlists")

        # Track available tools
        self.available_tools = {}

    async def log(self, level: str, message: str):
        """Send log message via WebSocket"""
        await ws_manager.broadcast_log(self.scan_id, level, message)
        print(f"[{level.upper()}] {message}")

    def _tool_exists(self, tool: str) -> bool:
        """Check if a tool is available"""
        if tool not in self.available_tools:
            self.available_tools[tool] = shutil.which(tool) is not None
        return self.available_tools[tool]

    async def run_full_recon(self, target: str, depth: str = "medium") -> Dict[str, Any]:
        """
        Run full reconnaissance using all available tools.

        Args:
            target: Target domain or URL
            depth: quick, medium, or full

        Returns:
            Dictionary with all recon results
        """
        await self.log("info", f"🚀 Starting FULL reconnaissance on {target}")
        await self.log("info", f"📊 Depth level: {depth}")
        await ws_manager.broadcast_progress(self.scan_id, 5, "Initializing reconnaissance...")

        # Check available tools
        await self._check_tools()

        results = {
            "target": target,
            "timestamp": datetime.utcnow().isoformat(),
            "depth": depth,
            "subdomains": [],
            "live_hosts": [],
            "urls": [],
            "endpoints": [],
            "ports": [],
            "technologies": [],
            "vulnerabilities": [],
            "js_files": [],
            "parameters": [],
            "interesting_paths": [],
            "dns_records": [],
            "screenshots": [],
            "secrets": []
        }

        # Extract domain from URL
        domain = self._extract_domain(target)
        base_url = target if target.startswith("http") else f"https://{target}"

        # Run recon phases based on depth
        phases = self._get_phases(depth)
        total_phases = len(phases)

        for i, (phase_name, phase_func) in enumerate(phases):
            try:
                progress = 5 + int((i / total_phases) * 35)
                await ws_manager.broadcast_progress(self.scan_id, progress, f"Recon: {phase_name}")
                await self.log("info", f"▶ Running {phase_name}...")

                phase_results = await phase_func(domain, base_url)
                results = self._merge_results(results, phase_results)

                # Broadcast discoveries
                for endpoint in phase_results.get("endpoints", []):
                    if isinstance(endpoint, dict):
                        await ws_manager.broadcast_endpoint_found(self.scan_id, endpoint)

                for url in phase_results.get("urls", [])[:10]:
                    await ws_manager.broadcast_url_discovered(self.scan_id, url)

                await self.log("info", f"✓ {phase_name} complete")
            except Exception as e:
                await self.log("warning", f"⚠ {phase_name} failed: {str(e)}")

        # Summary
        await self.log("info", f"═══════════════════════════════════════")
        await self.log("info", f"📊 Reconnaissance Summary:")
        await self.log("info", f"  • Subdomains: {len(results['subdomains'])}")
        await self.log("info", f"  • Live hosts: {len(results['live_hosts'])}")
        await self.log("info", f"  • URLs: {len(results['urls'])}")
        await self.log("info", f"  • Endpoints: {len(results['endpoints'])}")
        await self.log("info", f"  • Open ports: {len(results['ports'])}")
        await self.log("info", f"  • JS files: {len(results['js_files'])}")
        await self.log("info", f"  • Nuclei findings: {len(results['vulnerabilities'])}")
        await self.log("info", f"═══════════════════════════════════════")

        return results

    async def _check_tools(self):
        """Check and report available tools"""
        essential_tools = [
            "subfinder", "httpx", "nuclei", "nmap", "katana", "gau",
            "waybackurls", "ffuf", "gobuster", "amass", "naabu"
        ]

        available = []
        missing = []

        for tool in essential_tools:
            if self._tool_exists(tool):
                available.append(tool)
            else:
                missing.append(tool)

        await self.log("info", f"🔧 Tools available: {', '.join(available)}")
        if missing:
            await self.log("debug", f"Missing tools: {', '.join(missing)}")

    def _extract_domain(self, target: str) -> str:
        """Extract domain from URL"""
        domain = target.replace("https://", "").replace("http://", "")
        domain = domain.split("/")[0]
        domain = domain.split(":")[0]
        return domain

    def _get_phases(self, depth: str) -> List[tuple]:
        """Get recon phases based on depth"""
        quick_phases = [
            ("DNS Resolution", self._dns_resolution),
            ("HTTP Probing", self._http_probe),
            ("Basic Path Discovery", self._basic_paths),
        ]

        medium_phases = quick_phases + [
            ("Subdomain Enumeration", self._subdomain_enum),
            ("URL Collection", self._url_collection),
            ("Port Scan (Top 100)", self._port_scan_quick),
            ("Technology Detection", self._tech_detection),
            ("Web Crawling", self._web_crawl),
        ]

        full_phases = medium_phases + [
            ("Full Port Scan", self._port_scan_full),
            ("Parameter Discovery", self._param_discovery),
            ("JavaScript Analysis", self._js_analysis),
            ("Directory Fuzzing", self._directory_fuzz),
            ("Nuclei Vulnerability Scan", self._nuclei_scan),
            ("Screenshot Capture", self._screenshot_capture),
        ]

        return {
            "quick": quick_phases,
            "medium": medium_phases,
            "full": full_phases
        }.get(depth, medium_phases)

    async def _run_command(self, cmd: List[str], timeout: int = 120) -> str:
        """Run a shell command asynchronously"""
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
            return stdout.decode('utf-8', errors='ignore')
        except asyncio.TimeoutError:
            try:
                process.kill()
            except Exception:
                pass
            return ""
        except Exception:
            return ""

    # =========================================================================
    # RECON PHASES
    # =========================================================================

    async def _dns_resolution(self, domain: str, base_url: str) -> Dict:
        """DNS resolution using dnsx, dig"""
        results = {"dns_records": [], "subdomains": []}

        # Try dnsx
        if self._tool_exists("dnsx"):
            output = await self._run_command(
                ["dnsx", "-d", domain, "-a", "-aaaa", "-cname", "-mx", "-ns", "-txt", "-silent"],
                timeout=60
            )
            if output:
                for line in output.strip().split("\n"):
                    if line:
                        results["dns_records"].append(line)
                        await self.log("debug", f"DNS: {line}")

        # Fallback to dig
        if not results["dns_records"]:
            for record_type in ["A", "AAAA", "MX", "NS", "TXT", "CNAME"]:
                output = await self._run_command(["dig", domain, record_type, "+short"], timeout=10)
                if output:
                    for line in output.strip().split("\n"):
                        if line:
                            results["dns_records"].append(f"{record_type}: {line}")

        return results

    async def _http_probe(self, domain: str, base_url: str) -> Dict:
        """HTTP probing using httpx, httprobe"""
        results = {"live_hosts": [], "endpoints": []}

        # Try httpx (preferred)
        if self._tool_exists("httpx"):
            output = await self._run_command(
                ["httpx", "-u", domain, "-silent", "-status-code", "-title",
                 "-tech-detect", "-content-length", "-web-server"],
                timeout=60
            )
            if output:
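                # Illustrative note (added, not from the original repo): the loop
                # below assumes httpx plain-text output roughly of the form
                #   https://example.com [200] [Example Domain] [1256] [nginx]
                # i.e. the URL first, then the bracketed status code as the second token.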
                for line in output.strip().split("\n"):
                    if line:
                        results["live_hosts"].append(line)
                        parts = line.split()
                        url = parts[0] if parts else f"https://{domain}"
                        results["endpoints"].append({
                            "url": url,
                            "method": "GET",
                            "path": "/",
                            "status": int(parts[1].strip("[]")) if len(parts) > 1 and parts[1].strip("[]").isdigit() else 200,
                            "source": "httpx"
                        })

        # Try httprobe
        elif self._tool_exists("httprobe"):
            process = await asyncio.create_subprocess_exec(
                "httprobe",
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await asyncio.wait_for(
                process.communicate(input=f"{domain}\n".encode()),
                timeout=30
            )
            if stdout:
                for line in stdout.decode().strip().split("\n"):
                    if line:
                        results["live_hosts"].append(line)
                        results["endpoints"].append({
                            "url": line,
                            "method": "GET",
                            "path": "/",
                            "source": "httprobe"
                        })

        # Fallback to curl
        if not results["live_hosts"]:
            for proto in ["https", "http"]:
                url = f"{proto}://{domain}"
                output = await self._run_command(
                    ["curl", "-sI", "-m", "10", "-o", "/dev/null", "-w", "%{http_code}", url],
                    timeout=15
                )
                if output and output.strip() not in ["000", ""]:
                    results["live_hosts"].append(f"{url} [{output.strip()}]")
                    results["endpoints"].append({
                        "url": url,
                        "status": int(output.strip()) if output.strip().isdigit() else 0,
                        "source": "curl"
                    })

        return results

    async def _basic_paths(self, domain: str, base_url: str) -> Dict:
        """Check common paths"""
        results = {"endpoints": [], "interesting_paths": []}

        common_paths = [
            "/", "/robots.txt", "/sitemap.xml", "/.git/config", "/.env",
            "/api", "/api/v1", "/api/v2", "/graphql", "/swagger", "/api-docs",
            "/swagger.json", "/openapi.json", "/.well-known/security.txt",
            "/admin", "/administrator", "/login", "/register", "/dashboard",
            "/wp-admin", "/wp-login.php", "/wp-content", "/wp-includes",
            "/phpmyadmin", "/pma", "/console", "/debug", "/trace",
            "/actuator", "/actuator/health", "/actuator/env", "/metrics",
            "/server-status", "/server-info", "/.htaccess", "/.htpasswd",
            "/backup", "/backup.zip", "/backup.sql", "/db.sql", "/dump.sql",
            "/config", "/config.php", "/config.json", "/settings.json",
            "/uploads", "/files", "/static", "/assets", "/media",
            "/test", "/dev", "/staging", "/temp", "/tmp",
            "/.git/HEAD", "/.svn/entries", "/.DS_Store",
            "/info.php", "/phpinfo.php", "/test.php",
            "/elmah.axd", "/trace.axd", "/web.config"
        ]

        import aiohttp
        connector = aiohttp.TCPConnector(ssl=False, limit=20)
        timeout = aiohttp.ClientTimeout(total=10)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            tasks = []
            for path in common_paths:
                tasks.append(self._check_path(session, base_url, path, results))

            await asyncio.gather(*tasks, return_exceptions=True)

        return results

    async def _check_path(self, session, base_url: str, path: str, results: Dict):
        """Check a single path"""
        try:
            url = f"{base_url.rstrip('/')}{path}"
            async with session.get(url, allow_redirects=False) as response:
                if response.status < 404:
                    endpoint = {
                        "url": url,
                        "path": path,
                        "status": response.status,
                        "content_type": response.headers.get("Content-Type", ""),
                        "content_length": response.headers.get("Content-Length", ""),
                        "source": "path_check"
                    }
                    results["endpoints"].append(endpoint)

                    # Mark interesting paths
                    sensitive_paths = ["/.git", "/.env", "/debug", "/actuator",
                                       "/backup", "/config", "/.htaccess", "/phpinfo",
                                       "/trace", "/elmah", "/web.config"]
                    if any(s in path for s in sensitive_paths):
                        results["interesting_paths"].append({
                            "path": path,
                            "status": response.status,
                            "risk": "high",
                            "reason": "Potentially sensitive file/endpoint"
                        })
                        await self.log("warning", f"🚨 Interesting: {path} [{response.status}]")
                    else:
                        await self.log("info", f"Found: {path} [{response.status}]")
        except Exception:
            pass

    async def _subdomain_enum(self, domain: str, base_url: str) -> Dict:
        """Subdomain enumeration using multiple tools"""
        results = {"subdomains": []}
        found_subs = set()

        await self.log("info", f"🔍 Enumerating subdomains for {domain}")

        # 1. Subfinder (fast and reliable)
        if self._tool_exists("subfinder"):
            await self.log("debug", "Running subfinder...")
            output = await self._run_command(
                ["subfinder", "-d", domain, "-silent", "-all"],
                timeout=180
            )
            if output:
                for sub in output.strip().split("\n"):
                    if sub and sub not in found_subs:
                        found_subs.add(sub)

        # 2. Amass (comprehensive)
        if self._tool_exists("amass"):
            await self.log("debug", "Running amass passive...")
            output = await self._run_command(
                ["amass", "enum", "-passive", "-d", domain, "-timeout", "3"],
                timeout=240
            )
            if output:
                for sub in output.strip().split("\n"):
                    if sub and sub not in found_subs:
                        found_subs.add(sub)

        # 3. Assetfinder
        if self._tool_exists("assetfinder"):
            await self.log("debug", "Running assetfinder...")
            output = await self._run_command(
                ["assetfinder", "--subs-only", domain],
                timeout=60
            )
            if output:
                for sub in output.strip().split("\n"):
                    if sub and sub not in found_subs:
                        found_subs.add(sub)

        # 4. Chaos (if API key available)
        if self._tool_exists("chaos") and os.environ.get("CHAOS_KEY"):
            await self.log("debug", "Running chaos...")
            output = await self._run_command(
                ["chaos", "-d", domain, "-silent"],
                timeout=60
            )
            if output:
                for sub in output.strip().split("\n"):
                    if sub and sub not in found_subs:
                        found_subs.add(sub)

        # 5. Cero (certificate transparency)
        if self._tool_exists("cero"):
            await self.log("debug", "Running cero...")
            output = await self._run_command(
                ["cero", domain],
                timeout=60
            )
            if output:
                for sub in output.strip().split("\n"):
                    if sub and domain in sub and sub not in found_subs:
                        found_subs.add(sub)

        results["subdomains"] = list(found_subs)
        await self.log("info", f"✓ Found {len(found_subs)} subdomains")

        return results

    async def _url_collection(self, domain: str, base_url: str) -> Dict:
        """Collect URLs from various sources"""
        results = {"urls": [], "parameters": [], "js_files": []}
        found_urls = set()

        await self.log("info", f"🔗 Collecting URLs for {domain}")

        # 1. GAU (GetAllUrls)
        if self._tool_exists("gau"):
            await self.log("debug", "Running gau...")
            output = await self._run_command(
                ["gau", "--threads", "5", "--subs", domain],
                timeout=180
            )
            if output:
                for url in output.strip().split("\n")[:1000]:
                    if url and url not in found_urls:
                        found_urls.add(url)
                        if url.endswith(".js"):
                            results["js_files"].append(url)
                        if "?" in url:
                            results["parameters"].append(url)

        # 2. Waybackurls
        if self._tool_exists("waybackurls"):
            await self.log("debug", "Running waybackurls...")
            output = await self._run_command(
                ["waybackurls", domain],
                timeout=120
            )
            if output:
                for url in output.strip().split("\n")[:1000]:
                    if url and url not in found_urls:
                        found_urls.add(url)
                        if url.endswith(".js"):
                            results["js_files"].append(url)
                        if "?" in url:
                            results["parameters"].append(url)

        results["urls"] = list(found_urls)
        await self.log("info", f"✓ Collected {len(found_urls)} URLs, {len(results['parameters'])} with parameters")

        return results

    async def _port_scan_quick(self, domain: str, base_url: str) -> Dict:
        """Quick port scan (top 100)"""
        results = {"ports": []}

        await self.log("info", f"🔌 Port scanning {domain} (top 100)")

        # Try naabu (fastest)
        if self._tool_exists("naabu"):
            await self.log("debug", "Running naabu...")
            output = await self._run_command(
                ["naabu", "-host", domain, "-top-ports", "100", "-silent"],
                timeout=120
            )
            if output:
                for line in output.strip().split("\n"):
                    if line:
                        results["ports"].append(line)
                        await self.log("info", f"Port: {line}")

        # Fallback to nmap
        elif self._tool_exists("nmap"):
            await self.log("debug", "Running nmap...")
            output = await self._run_command(
                ["nmap", "-sT", "-T4", "--top-ports", "100", "-oG", "-", domain],
                timeout=180
            )
            if output:
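                # Illustrative note (added): the parsing below expects nmap's
                # greppable (-oG) output, where open ports appear on lines such as
                #   Host: 93.184.216.34 (example.com)  Ports: 80/open/tcp//http///, 443/open/tcp//https///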
                for line in output.split("\n"):
                    if "Ports:" in line:
                        ports_part = line.split("Ports:")[1]
                        for port_info in ports_part.split(","):
                            if "/open/" in port_info:
                                port = port_info.strip().split("/")[0]
                                results["ports"].append(f"{domain}:{port}")
                                await self.log("info", f"Port: {domain}:{port}")

        return results

    async def _port_scan_full(self, domain: str, base_url: str) -> Dict:
        """Full port scan"""
        results = {"ports": []}

        await self.log("info", f"🔌 Full port scan on {domain}")

        # Try rustscan (fastest full scan)
        if self._tool_exists("rustscan"):
            await self.log("debug", "Running rustscan...")
            output = await self._run_command(
                ["rustscan", "-a", domain, "--ulimit", "5000", "-g"],
                timeout=300
            )
            if output:
                for line in output.strip().split("\n"):
                    if line and "->" in line:
                        results["ports"].append(line)

        # Fallback to naabu full
        elif self._tool_exists("naabu"):
            output = await self._run_command(
                ["naabu", "-host", domain, "-p", "-", "-silent"],
                timeout=600
            )
            if output:
                for line in output.strip().split("\n"):
                    if line:
                        results["ports"].append(line)

        return results

    async def _tech_detection(self, domain: str, base_url: str) -> Dict:
        """Detect technologies"""
        results = {"technologies": []}

        await self.log("info", f"🔬 Detecting technologies on {base_url}")

        # Try whatweb
        if self._tool_exists("whatweb"):
            await self.log("debug", "Running whatweb...")
            output = await self._run_command(
                ["whatweb", "-q", "-a", "3", "--color=never", base_url],
                timeout=60
            )
            if output:
                results["technologies"].append({"source": "whatweb", "data": output.strip()})
                await self.log("debug", f"WhatWeb: {output[:200]}...")

        # Try wafw00f (WAF detection)
        if self._tool_exists("wafw00f"):
            await self.log("debug", "Running wafw00f...")
            output = await self._run_command(
                ["wafw00f", base_url, "-o", "-"],
                timeout=60
            )
            if output and "No WAF" not in output:
                results["technologies"].append({"source": "wafw00f", "data": output.strip()})
                await self.log("warning", f"WAF detected: {output[:100]}")

        return results

    async def _web_crawl(self, domain: str, base_url: str) -> Dict:
        """Crawl the website for endpoints"""
        results = {"endpoints": [], "js_files": [], "urls": []}

        await self.log("info", f"🕷 Crawling {base_url}")

        # Try katana (modern, fast)
        if self._tool_exists("katana"):
            await self.log("debug", "Running katana...")
            output = await self._run_command(
                ["katana", "-u", base_url, "-d", "3", "-silent", "-jc", "-kf", "all"],
                timeout=180
            )
            if output:
                for url in output.strip().split("\n"):
                    if url:
                        if url.endswith(".js"):
                            results["js_files"].append(url)
                        results["endpoints"].append({"url": url, "source": "katana"})
                        results["urls"].append(url)

        # Try gospider
        if self._tool_exists("gospider"):
            await self.log("debug", "Running gospider...")
            output = await self._run_command(
                ["gospider", "-s", base_url, "-d", "2", "-t", "5", "--no-redirect", "-q"],
                timeout=180
            )
            if output:
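                # Illustrative note (added): gospider lines are assumed to look
                # roughly like "[javascript] - https://example.com/static/app.js";
                # the loop below keeps the URL after the last " - " separator.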
                for line in output.strip().split("\n"):
                    if "[" in line and "]" in line:
                        parts = line.split(" - ")
                        if len(parts) > 1:
                            url = parts[-1].strip()
                            if url and url.startswith("http"):
                                if url not in results["urls"]:
                                    results["urls"].append(url)
                                    results["endpoints"].append({"url": url, "source": "gospider"})

        # Try hakrawler
        if self._tool_exists("hakrawler") and not results["endpoints"]:
            await self.log("debug", "Running hakrawler...")
            process = await asyncio.create_subprocess_exec(
                "hakrawler", "-d", "2", "-u",
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await asyncio.wait_for(
                process.communicate(input=f"{base_url}\n".encode()),
                timeout=120
            )
            if stdout:
                for url in stdout.decode().strip().split("\n"):
                    if url and url.startswith("http"):
                        results["urls"].append(url)
                        results["endpoints"].append({"url": url, "source": "hakrawler"})

        await self.log("info", f"✓ Crawled {len(results['endpoints'])} endpoints, {len(results['js_files'])} JS files")
        return results

    async def _param_discovery(self, domain: str, base_url: str) -> Dict:
        """Discover parameters"""
        results = {"parameters": []}

        await self.log("info", f"🔎 Discovering parameters for {domain}")

        # Try paramspider
        if self._tool_exists("paramspider"):
            await self.log("debug", "Running paramspider...")
            output = await self._run_command(
                ["paramspider", "-d", domain, "--quiet"],
                timeout=120
            )
            if output:
                for url in output.strip().split("\n"):
                    if url and "?" in url:
                        results["parameters"].append(url)

        # Try arjun
        if self._tool_exists("arjun"):
            await self.log("debug", "Running arjun...")
            output = await self._run_command(
                ["arjun", "-u", base_url, "--stable", "-oT", "/dev/stdout"],
                timeout=180
            )
            if output:
                for line in output.strip().split("\n"):
                    if ":" in line and line not in results["parameters"]:
                        results["parameters"].append(line)

        return results

    async def _js_analysis(self, domain: str, base_url: str) -> Dict:
        """Analyze JavaScript files for secrets and endpoints"""
        results = {"secrets": [], "endpoints": [], "js_files": []}

        await self.log("info", f"📜 Analyzing JavaScript files")

        # Try getJS
        if self._tool_exists("getJS"):
            await self.log("debug", "Running getJS...")
            output = await self._run_command(
                ["getJS", "-u", base_url, "--complete"],
                timeout=60
            )
            if output:
                for js_url in output.strip().split("\n"):
                    if js_url and js_url.endswith(".js"):
                        results["js_files"].append(js_url)

        return results

    async def _directory_fuzz(self, domain: str, base_url: str) -> Dict:
        """Directory fuzzing"""
        results = {"endpoints": []}

        wordlist = self.wordlists_path / "common.txt"
        if not wordlist.exists():
            return results

        await self.log("info", f"📂 Fuzzing directories on {base_url}")

        # Try ffuf (fastest)
        if self._tool_exists("ffuf"):
            await self.log("debug", "Running ffuf...")
            output = await self._run_command(
                ["ffuf", "-u", f"{base_url}/FUZZ", "-w", str(wordlist),
                 "-mc", "200,201,204,301,302,307,401,403,405",
                 "-t", "50", "-o", "-", "-of", "json"],
                timeout=180
            )
            if output:
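                # Illustrative note (added): ffuf's JSON report is expected to
                # contain a top-level "results" array with entries roughly like
                #   {"url": "https://example.com/admin", "status": 301, "length": 178}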
                try:
                    data = json.loads(output)
                    for result in data.get("results", []):
                        results["endpoints"].append({
                            "url": result.get("url", ""),
                            "status": result.get("status", 0),
                            "length": result.get("length", 0),
                            "source": "ffuf"
                        })
                except Exception:
                    pass

        # Try gobuster
        elif self._tool_exists("gobuster"):
            await self.log("debug", "Running gobuster...")
            output = await self._run_command(
                ["gobuster", "dir", "-u", base_url, "-w", str(wordlist),
                 "-t", "50", "-q", "--no-error"],
                timeout=180
            )
            if output:
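                # Illustrative note (added): gobuster -q output lines are expected
                # to look roughly like "/admin (Status: 301) [Size: 178]";
                # the first token is treated as the discovered path.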
                for line in output.strip().split("\n"):
                    if line and "(Status:" in line:
                        parts = line.split()
                        if parts:
                            path = parts[0]
                            results["endpoints"].append({
                                "url": f"{base_url}{path}",
                                "path": path,
                                "source": "gobuster"
                            })

        return results

    async def _nuclei_scan(self, domain: str, base_url: str) -> Dict:
        """Run nuclei vulnerability scanner"""
        results = {"vulnerabilities": []}

        if not self._tool_exists("nuclei"):
            return results

        await self.log("info", f"☢ Running Nuclei vulnerability scan on {base_url}")

        output = await self._run_command(
            ["nuclei", "-u", base_url, "-severity", "critical,high,medium",
             "-silent", "-json", "-c", "25"],
            timeout=600
        )

        if output:
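            # Illustrative note (added): each line is expected to be one nuclei
            # JSON finding, roughly of the form
            #   {"template-id": "...", "matched-at": "https://...",
            #    "info": {"name": "...", "severity": "high", "description": "..."}}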
            for line in output.strip().split("\n"):
                if line:
                    try:
                        vuln = json.loads(line)
                        results["vulnerabilities"].append({
                            "name": vuln.get("info", {}).get("name", "Unknown"),
                            "severity": vuln.get("info", {}).get("severity", "unknown"),
                            "url": vuln.get("matched-at", ""),
                            "template": vuln.get("template-id", ""),
                            "description": vuln.get("info", {}).get("description", ""),
                            "matcher_name": vuln.get("matcher-name", "")
                        })

                        await ws_manager.broadcast_vulnerability_found(self.scan_id, {
                            "title": vuln.get("info", {}).get("name", "Unknown"),
                            "severity": vuln.get("info", {}).get("severity", "unknown"),
                            "type": "nuclei",
                            "endpoint": vuln.get("matched-at", "")
                        })

                        severity = vuln.get("info", {}).get("severity", "unknown").upper()
                        await self.log("warning", f"☢ NUCLEI [{severity}]: {vuln.get('info', {}).get('name')}")
                    except Exception:
                        pass

        await self.log("info", f"✓ Nuclei found {len(results['vulnerabilities'])} issues")
        return results

    async def _screenshot_capture(self, domain: str, base_url: str) -> Dict:
        """Capture screenshots of web pages"""
        results = {"screenshots": []}

        if not self._tool_exists("gowitness"):
            return results

        await self.log("info", f"📸 Capturing screenshots")

        screenshot_dir = self.results_path / "screenshots" / self.scan_id
        screenshot_dir.mkdir(parents=True, exist_ok=True)

        output = await self._run_command(
            ["gowitness", "single", base_url, "-P", str(screenshot_dir)],
            timeout=60
        )

        # List captured screenshots
        if screenshot_dir.exists():
            for f in screenshot_dir.glob("*.png"):
                results["screenshots"].append(str(f))

        return results

    def _merge_results(self, base: Dict, new: Dict) -> Dict:
        """Merge two result dictionaries"""
        for key, value in new.items():
            if key in base:
                if isinstance(value, list):
                    # Deduplicate while merging
                    existing = set(str(x) for x in base[key])
                    for item in value:
                        if str(item) not in existing:
                            base[key].append(item)
                            existing.add(str(item))
                elif isinstance(value, dict):
                    base[key].update(value)
            else:
                base[key] = value
        return base


async def check_tools_installed() -> Dict[str, bool]:
    """Check which recon tools are installed"""
    tools = [
        # Subdomain enumeration
        "subfinder", "amass", "assetfinder", "chaos", "cero",
        # DNS
        "dnsx", "massdns", "puredns",
        # HTTP probing
        "httpx", "httprobe",
        # URL discovery
        "gau", "waybackurls", "katana", "gospider", "hakrawler", "cariddi", "getJS",
        # Port scanning
        "nmap", "naabu", "rustscan",
        # Tech detection
        "whatweb", "wafw00f",
        # Fuzzing
        "ffuf", "gobuster", "dirb", "dirsearch", "wfuzz",
        # Parameter discovery
        "arjun", "paramspider",
        # Vulnerability scanning
        "nuclei", "nikto", "sqlmap", "dalfox", "crlfuzz",
        # Utilities
        "gf", "qsreplace", "unfurl", "anew", "jq",
        # Screenshot
        "gowitness",
        # Network
        "curl", "wget", "dig", "whois"
    ]

    results = {}
    for tool in tools:
        results[tool] = shutil.which(tool) is not None

    return results
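

# ---------------------------------------------------------------------------
# Usage sketch (added for illustration, not part of the original module).
# It assumes this file runs inside the app container where the `backend`
# package is importable and a WebSocket manager is available; the scan_id
# "demo-scan" and the target "example.com" are placeholder values.
# ---------------------------------------------------------------------------
async def _demo_recon(target: str = "example.com") -> None:
    # Report which external tools are on PATH, then run a quick recon pass.
    installed = await check_tools_installed()
    print(f"Tools installed: {sum(installed.values())}/{len(installed)}")

    recon = ReconIntegration(scan_id="demo-scan")
    recon_results = await recon.run_full_recon(target, depth="quick")
    print(f"Live hosts: {len(recon_results['live_hosts'])}, "
          f"endpoints: {len(recon_results['endpoints'])}")


if __name__ == "__main__":
    asyncio.run(_demo_recon())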