"""
NeuroSploit v3 - Full Recon Integration
Integrates 40+ security/recon tools for comprehensive reconnaissance:
- Subdomain Enumeration: subfinder, amass, assetfinder, chaos, cero
- DNS Resolution: dnsx, massdns, puredns
- HTTP Probing: httpx, httprobe
- URL Discovery: gau, waybackurls, katana, gospider, hakrawler, cariddi
- Port Scanning: nmap, naabu, rustscan
- Tech Detection: whatweb, wafw00f
- Fuzzing: ffuf, gobuster, dirb, dirsearch
- Vulnerability Scanning: nuclei, nikto
- Parameter Discovery: arjun, paramspider
"""
import asyncio
import json
import os
import sys
import shutil
from typing import Any, Dict, List
from datetime import datetime, timezone
from pathlib import Path
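# Make the repository root importable so the absolute `backend.*` import
# below resolves when this module is loaded outside the installed package.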
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from backend.api.websocket import manager as ws_manager
class ReconIntegration:
"""
Full reconnaissance integration with 40+ security tools.
Automatically uses available tools and skips missing ones.
"""
def __init__(self, scan_id: str):
self.scan_id = scan_id
self.base_path = Path("/app")
self.results_path = self.base_path / "data" / "recon"
self.results_path.mkdir(parents=True, exist_ok=True)
self.wordlists_path = Path("/opt/wordlists")
# Track available tools
self.available_tools = {}
async def log(self, level: str, message: str):
"""Send log message via WebSocket"""
await ws_manager.broadcast_log(self.scan_id, level, message)
print(f"[{level.upper()}] {message}")
def _tool_exists(self, tool: str) -> bool:
"""Check if a tool is available"""
if tool not in self.available_tools:
self.available_tools[tool] = shutil.which(tool) is not None
return self.available_tools[tool]
async def run_full_recon(self, target: str, depth: str = "medium") -> Dict[str, Any]:
"""
Run full reconnaissance using all available tools.
Args:
target: Target domain or URL
depth: quick, medium, or full
Returns:
Dictionary with all recon results
"""
await self.log("info", f"🚀 Starting FULL reconnaissance on {target}")
await self.log("info", f"📊 Depth level: {depth}")
await ws_manager.broadcast_progress(self.scan_id, 5, "Initializing reconnaissance...")
# Check available tools
await self._check_tools()
results = {
"target": target,
"timestamp": datetime.utcnow().isoformat(),
"depth": depth,
"subdomains": [],
"live_hosts": [],
"urls": [],
"endpoints": [],
"ports": [],
"technologies": [],
"vulnerabilities": [],
"js_files": [],
"parameters": [],
"interesting_paths": [],
"dns_records": [],
"screenshots": [],
"secrets": []
}
# Extract domain from URL
domain = self._extract_domain(target)
base_url = target if target.startswith("http") else f"https://{target}"
# Run recon phases based on depth
phases = self._get_phases(depth)
total_phases = len(phases)
for i, (phase_name, phase_func) in enumerate(phases):
try:
progress = 5 + int((i / total_phases) * 35)
await ws_manager.broadcast_progress(self.scan_id, progress, f"Recon: {phase_name}")
await self.log("info", f"▶ Running {phase_name}...")
phase_results = await phase_func(domain, base_url)
results = self._merge_results(results, phase_results)
# Broadcast discoveries
for endpoint in phase_results.get("endpoints", []):
if isinstance(endpoint, dict):
await ws_manager.broadcast_endpoint_found(self.scan_id, endpoint)
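                # Cap URL broadcasts at 10 per phase to avoid flooding the socket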
for url in phase_results.get("urls", [])[:10]:
await ws_manager.broadcast_url_discovered(self.scan_id, url)
await self.log("info", f"{phase_name} complete")
except Exception as e:
await self.log("warning", f"{phase_name} failed: {str(e)}")
# Summary
await self.log("info", f"═══════════════════════════════════════")
await self.log("info", f"📊 Reconnaissance Summary:")
await self.log("info", f" • Subdomains: {len(results['subdomains'])}")
await self.log("info", f" • Live hosts: {len(results['live_hosts'])}")
await self.log("info", f" • URLs: {len(results['urls'])}")
await self.log("info", f" • Endpoints: {len(results['endpoints'])}")
await self.log("info", f" • Open ports: {len(results['ports'])}")
await self.log("info", f" • JS files: {len(results['js_files'])}")
await self.log("info", f" • Nuclei findings: {len(results['vulnerabilities'])}")
await self.log("info", f"═══════════════════════════════════════")
return results
async def _check_tools(self):
"""Check and report available tools"""
essential_tools = [
"subfinder", "httpx", "nuclei", "nmap", "katana", "gau",
"waybackurls", "ffuf", "gobuster", "amass", "naabu"
]
available = []
missing = []
for tool in essential_tools:
if self._tool_exists(tool):
available.append(tool)
else:
missing.append(tool)
await self.log("info", f"🔧 Tools available: {', '.join(available)}")
if missing:
await self.log("debug", f"Missing tools: {', '.join(missing)}")
def _extract_domain(self, target: str) -> str:
"""Extract domain from URL"""
domain = target.replace("https://", "").replace("http://", "")
domain = domain.split("/")[0]
domain = domain.split(":")[0]
return domain
def _get_phases(self, depth: str) -> List[tuple]:
"""Get recon phases based on depth"""
quick_phases = [
("DNS Resolution", self._dns_resolution),
("HTTP Probing", self._http_probe),
("Basic Path Discovery", self._basic_paths),
]
medium_phases = quick_phases + [
("Subdomain Enumeration", self._subdomain_enum),
("URL Collection", self._url_collection),
("Port Scan (Top 100)", self._port_scan_quick),
("Technology Detection", self._tech_detection),
("Web Crawling", self._web_crawl),
]
full_phases = medium_phases + [
("Full Port Scan", self._port_scan_full),
("Parameter Discovery", self._param_discovery),
("JavaScript Analysis", self._js_analysis),
("Directory Fuzzing", self._directory_fuzz),
("Nuclei Vulnerability Scan", self._nuclei_scan),
("Screenshot Capture", self._screenshot_capture),
]
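        # Depth tiers are cumulative (quick ⊂ medium ⊂ full), so a deeper scan
        # always repeats the cheaper phases before adding the expensive ones;
        # an unrecognized depth value falls back to the medium profile.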
return {
"quick": quick_phases,
"medium": medium_phases,
"full": full_phases
}.get(depth, medium_phases)
    async def _run_command(self, cmd: List[str], timeout: int = 120) -> str:
        """Run an external command asynchronously; return its stdout, or '' on failure"""
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
            return stdout.decode('utf-8', errors='ignore')
        except asyncio.TimeoutError:
            # Kill the hung process and reap it so it is not left as a zombie
            try:
                process.kill()
                await process.wait()
            except ProcessLookupError:
                pass
            return ""
        except Exception:
            return ""
# =========================================================================
# RECON PHASES
# =========================================================================
async def _dns_resolution(self, domain: str, base_url: str) -> Dict:
"""DNS resolution using dnsx, dig"""
results = {"dns_records": [], "subdomains": []}
# Try dnsx
if self._tool_exists("dnsx"):
output = await self._run_command(
["dnsx", "-d", domain, "-a", "-aaaa", "-cname", "-mx", "-ns", "-txt", "-silent"],
timeout=60
)
if output:
for line in output.strip().split("\n"):
if line:
results["dns_records"].append(line)
await self.log("debug", f"DNS: {line}")
# Fallback to dig
if not results["dns_records"]:
for record_type in ["A", "AAAA", "MX", "NS", "TXT", "CNAME"]:
output = await self._run_command(["dig", domain, record_type, "+short"], timeout=10)
if output:
for line in output.strip().split("\n"):
if line:
results["dns_records"].append(f"{record_type}: {line}")
return results
async def _http_probe(self, domain: str, base_url: str) -> Dict:
"""HTTP probing using httpx, httprobe"""
results = {"live_hosts": [], "endpoints": []}
# Try httpx (preferred)
if self._tool_exists("httpx"):
output = await self._run_command(
["httpx", "-u", domain, "-silent", "-status-code", "-title",
"-tech-detect", "-content-length", "-web-server"],
timeout=60
)
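            # httpx -silent prints one line per live host, e.g.
            # "https://example.com [200] [Title] [nginx]"; the first token is
            # the URL and the first bracketed field is the status code.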
if output:
for line in output.strip().split("\n"):
if line:
results["live_hosts"].append(line)
parts = line.split()
url = parts[0] if parts else f"https://{domain}"
results["endpoints"].append({
"url": url,
"method": "GET",
"path": "/",
"status": int(parts[1].strip("[]")) if len(parts) > 1 and parts[1].strip("[]").isdigit() else 200,
"source": "httpx"
})
# Try httprobe
elif self._tool_exists("httprobe"):
process = await asyncio.create_subprocess_exec(
"httprobe",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await asyncio.wait_for(
process.communicate(input=f"{domain}\n".encode()),
timeout=30
)
if stdout:
for line in stdout.decode().strip().split("\n"):
if line:
results["live_hosts"].append(line)
results["endpoints"].append({
"url": line,
"method": "GET",
"path": "/",
"source": "httprobe"
})
# Fallback to curl
if not results["live_hosts"]:
for proto in ["https", "http"]:
url = f"{proto}://{domain}"
output = await self._run_command(
["curl", "-sI", "-m", "10", "-o", "/dev/null", "-w", "%{http_code}", url],
timeout=15
)
if output and output.strip() not in ["000", ""]:
results["live_hosts"].append(f"{url} [{output.strip()}]")
results["endpoints"].append({
"url": url,
"status": int(output.strip()) if output.strip().isdigit() else 0,
"source": "curl"
})
return results
async def _basic_paths(self, domain: str, base_url: str) -> Dict:
"""Check common paths"""
results = {"endpoints": [], "interesting_paths": []}
common_paths = [
"/", "/robots.txt", "/sitemap.xml", "/.git/config", "/.env",
"/api", "/api/v1", "/api/v2", "/graphql", "/swagger", "/api-docs",
"/swagger.json", "/openapi.json", "/.well-known/security.txt",
"/admin", "/administrator", "/login", "/register", "/dashboard",
"/wp-admin", "/wp-login.php", "/wp-content", "/wp-includes",
"/phpmyadmin", "/pma", "/console", "/debug", "/trace",
"/actuator", "/actuator/health", "/actuator/env", "/metrics",
"/server-status", "/server-info", "/.htaccess", "/.htpasswd",
"/backup", "/backup.zip", "/backup.sql", "/db.sql", "/dump.sql",
"/config", "/config.php", "/config.json", "/settings.json",
"/uploads", "/files", "/static", "/assets", "/media",
"/test", "/dev", "/staging", "/temp", "/tmp",
"/.git/HEAD", "/.svn/entries", "/.DS_Store",
"/info.php", "/phpinfo.php", "/test.php",
"/elmah.axd", "/trace.axd", "/web.config"
]
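        # Paths are checked concurrently over a single pooled session; TLS
        # verification is disabled since recon targets often use self-signed certs.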
        try:
            import aiohttp  # local import keeps the module usable without aiohttp
        except ImportError:
            await self.log("warning", "aiohttp not installed; skipping common-path checks")
            return results
        connector = aiohttp.TCPConnector(ssl=False, limit=20)
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
tasks = []
for path in common_paths:
tasks.append(self._check_path(session, base_url, path, results))
await asyncio.gather(*tasks, return_exceptions=True)
return results
async def _check_path(self, session, base_url: str, path: str, results: Dict):
"""Check a single path"""
try:
url = f"{base_url.rstrip('/')}{path}"
async with session.get(url, allow_redirects=False) as response:
if response.status < 404:
endpoint = {
"url": url,
"path": path,
"status": response.status,
"content_type": response.headers.get("Content-Type", ""),
"content_length": response.headers.get("Content-Length", ""),
"source": "path_check"
}
results["endpoints"].append(endpoint)
# Mark interesting paths
sensitive_paths = ["/.git", "/.env", "/debug", "/actuator",
"/backup", "/config", "/.htaccess", "/phpinfo",
"/trace", "/elmah", "/web.config"]
if any(s in path for s in sensitive_paths):
results["interesting_paths"].append({
"path": path,
"status": response.status,
"risk": "high",
"reason": "Potentially sensitive file/endpoint"
})
await self.log("warning", f"🚨 Interesting: {path} [{response.status}]")
else:
await self.log("info", f"Found: {path} [{response.status}]")
        except Exception:
            # Connection errors and timeouts are expected while probing paths
            pass
async def _subdomain_enum(self, domain: str, base_url: str) -> Dict:
"""Subdomain enumeration using multiple tools"""
results = {"subdomains": []}
found_subs = set()
await self.log("info", f"🔍 Enumerating subdomains for {domain}")
# 1. Subfinder (fast and reliable)
if self._tool_exists("subfinder"):
await self.log("debug", "Running subfinder...")
output = await self._run_command(
["subfinder", "-d", domain, "-silent", "-all"],
timeout=180
)
if output:
for sub in output.strip().split("\n"):
if sub and sub not in found_subs:
found_subs.add(sub)
# 2. Amass (comprehensive)
if self._tool_exists("amass"):
await self.log("debug", "Running amass passive...")
output = await self._run_command(
["amass", "enum", "-passive", "-d", domain, "-timeout", "3"],
timeout=240
)
if output:
for sub in output.strip().split("\n"):
if sub and sub not in found_subs:
found_subs.add(sub)
# 3. Assetfinder
if self._tool_exists("assetfinder"):
await self.log("debug", "Running assetfinder...")
output = await self._run_command(
["assetfinder", "--subs-only", domain],
timeout=60
)
if output:
for sub in output.strip().split("\n"):
if sub and sub not in found_subs:
found_subs.add(sub)
# 4. Chaos (if API key available)
if self._tool_exists("chaos") and os.environ.get("CHAOS_KEY"):
await self.log("debug", "Running chaos...")
output = await self._run_command(
["chaos", "-d", domain, "-silent"],
timeout=60
)
if output:
for sub in output.strip().split("\n"):
if sub and sub not in found_subs:
found_subs.add(sub)
# 5. Cero (certificate transparency)
if self._tool_exists("cero"):
await self.log("debug", "Running cero...")
output = await self._run_command(
["cero", domain],
timeout=60
)
if output:
for sub in output.strip().split("\n"):
if sub and domain in sub and sub not in found_subs:
found_subs.add(sub)
results["subdomains"] = list(found_subs)
await self.log("info", f"✓ Found {len(found_subs)} subdomains")
return results
async def _url_collection(self, domain: str, base_url: str) -> Dict:
"""Collect URLs from various sources"""
results = {"urls": [], "parameters": [], "js_files": []}
found_urls = set()
await self.log("info", f"🔗 Collecting URLs for {domain}")
# 1. GAU (GetAllUrls)
if self._tool_exists("gau"):
await self.log("debug", "Running gau...")
output = await self._run_command(
["gau", "--threads", "5", "--subs", domain],
timeout=180
)
if output:
for url in output.strip().split("\n")[:1000]:
if url and url not in found_urls:
found_urls.add(url)
if url.endswith(".js"):
results["js_files"].append(url)
if "?" in url:
results["parameters"].append(url)
# 2. Waybackurls
if self._tool_exists("waybackurls"):
await self.log("debug", "Running waybackurls...")
output = await self._run_command(
["waybackurls", domain],
timeout=120
)
if output:
for url in output.strip().split("\n")[:1000]:
if url and url not in found_urls:
found_urls.add(url)
if url.endswith(".js"):
results["js_files"].append(url)
if "?" in url:
results["parameters"].append(url)
results["urls"] = list(found_urls)
await self.log("info", f"✓ Collected {len(found_urls)} URLs, {len(results['parameters'])} with parameters")
return results
async def _port_scan_quick(self, domain: str, base_url: str) -> Dict:
"""Quick port scan (top 100)"""
results = {"ports": []}
await self.log("info", f"🔌 Port scanning {domain} (top 100)")
# Try naabu (fastest)
if self._tool_exists("naabu"):
await self.log("debug", "Running naabu...")
output = await self._run_command(
["naabu", "-host", domain, "-top-ports", "100", "-silent"],
timeout=120
)
if output:
for line in output.strip().split("\n"):
if line:
results["ports"].append(line)
await self.log("info", f"Port: {line}")
# Fallback to nmap
elif self._tool_exists("nmap"):
await self.log("debug", "Running nmap...")
output = await self._run_command(
["nmap", "-sT", "-T4", "--top-ports", "100", "-oG", "-", domain],
timeout=180
)
if output:
for line in output.split("\n"):
if "Ports:" in line:
ports_part = line.split("Ports:")[1]
for port_info in ports_part.split(","):
if "/open/" in port_info:
port = port_info.strip().split("/")[0]
results["ports"].append(f"{domain}:{port}")
await self.log("info", f"Port: {domain}:{port}")
return results
async def _port_scan_full(self, domain: str, base_url: str) -> Dict:
"""Full port scan"""
results = {"ports": []}
await self.log("info", f"🔌 Full port scan on {domain}")
# Try rustscan (fastest full scan)
if self._tool_exists("rustscan"):
await self.log("debug", "Running rustscan...")
output = await self._run_command(
["rustscan", "-a", domain, "--ulimit", "5000", "-g"],
timeout=300
)
if output:
for line in output.strip().split("\n"):
if line and "->" in line:
results["ports"].append(line)
# Fallback to naabu full
elif self._tool_exists("naabu"):
output = await self._run_command(
["naabu", "-host", domain, "-p", "-", "-silent"],
timeout=600
)
if output:
for line in output.strip().split("\n"):
if line:
results["ports"].append(line)
return results
async def _tech_detection(self, domain: str, base_url: str) -> Dict:
"""Detect technologies"""
results = {"technologies": []}
await self.log("info", f"🔬 Detecting technologies on {base_url}")
# Try whatweb
if self._tool_exists("whatweb"):
await self.log("debug", "Running whatweb...")
output = await self._run_command(
["whatweb", "-q", "-a", "3", "--color=never", base_url],
timeout=60
)
if output:
results["technologies"].append({"source": "whatweb", "data": output.strip()})
await self.log("debug", f"WhatWeb: {output[:200]}...")
# Try wafw00f (WAF detection)
if self._tool_exists("wafw00f"):
await self.log("debug", "Running wafw00f...")
output = await self._run_command(
["wafw00f", base_url, "-o", "-"],
timeout=60
)
if output and "No WAF" not in output:
results["technologies"].append({"source": "wafw00f", "data": output.strip()})
await self.log("warning", f"WAF detected: {output[:100]}")
return results
async def _web_crawl(self, domain: str, base_url: str) -> Dict:
"""Crawl the website for endpoints"""
results = {"endpoints": [], "js_files": [], "urls": []}
await self.log("info", f"🕷 Crawling {base_url}")
# Try katana (modern, fast)
if self._tool_exists("katana"):
await self.log("debug", "Running katana...")
output = await self._run_command(
["katana", "-u", base_url, "-d", "3", "-silent", "-jc", "-kf", "all"],
timeout=180
)
if output:
for url in output.strip().split("\n"):
if url:
if url.endswith(".js"):
results["js_files"].append(url)
results["endpoints"].append({"url": url, "source": "katana"})
results["urls"].append(url)
# Try gospider
if self._tool_exists("gospider"):
await self.log("debug", "Running gospider...")
output = await self._run_command(
["gospider", "-s", base_url, "-d", "2", "-t", "5", "--no-redirect", "-q"],
timeout=180
)
if output:
for line in output.strip().split("\n"):
if "[" in line and "]" in line:
parts = line.split(" - ")
if len(parts) > 1:
url = parts[-1].strip()
if url and url.startswith("http"):
if url not in results["urls"]:
results["urls"].append(url)
results["endpoints"].append({"url": url, "source": "gospider"})
# Try hakrawler
if self._tool_exists("hakrawler") and not results["endpoints"]:
await self.log("debug", "Running hakrawler...")
process = await asyncio.create_subprocess_exec(
"hakrawler", "-d", "2", "-u",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await asyncio.wait_for(
process.communicate(input=f"{base_url}\n".encode()),
timeout=120
)
if stdout:
for url in stdout.decode().strip().split("\n"):
if url and url.startswith("http"):
results["urls"].append(url)
results["endpoints"].append({"url": url, "source": "hakrawler"})
await self.log("info", f"✓ Crawled {len(results['endpoints'])} endpoints, {len(results['js_files'])} JS files")
return results
async def _param_discovery(self, domain: str, base_url: str) -> Dict:
"""Discover parameters"""
results = {"parameters": []}
await self.log("info", f"🔎 Discovering parameters for {domain}")
# Try paramspider
if self._tool_exists("paramspider"):
await self.log("debug", "Running paramspider...")
output = await self._run_command(
["paramspider", "-d", domain, "--quiet"],
timeout=120
)
if output:
for url in output.strip().split("\n"):
if url and "?" in url:
results["parameters"].append(url)
# Try arjun
if self._tool_exists("arjun"):
await self.log("debug", "Running arjun...")
output = await self._run_command(
["arjun", "-u", base_url, "--stable", "-oT", "/dev/stdout"],
timeout=180
)
if output:
for line in output.strip().split("\n"):
if ":" in line and line not in results["parameters"]:
results["parameters"].append(line)
return results
async def _js_analysis(self, domain: str, base_url: str) -> Dict:
"""Analyze JavaScript files for secrets and endpoints"""
results = {"secrets": [], "endpoints": [], "js_files": []}
await self.log("info", f"📜 Analyzing JavaScript files")
# Try getJS
if self._tool_exists("getJS"):
await self.log("debug", "Running getJS...")
output = await self._run_command(
["getJS", "-u", base_url, "--complete"],
timeout=60
)
if output:
for js_url in output.strip().split("\n"):
if js_url and js_url.endswith(".js"):
results["js_files"].append(js_url)
return results
async def _directory_fuzz(self, domain: str, base_url: str) -> Dict:
"""Directory fuzzing"""
results = {"endpoints": []}
wordlist = self.wordlists_path / "common.txt"
if not wordlist.exists():
return results
await self.log("info", f"📂 Fuzzing directories on {base_url}")
# Try ffuf (fastest)
if self._tool_exists("ffuf"):
await self.log("debug", "Running ffuf...")
output = await self._run_command(
["ffuf", "-u", f"{base_url}/FUZZ", "-w", str(wordlist),
"-mc", "200,201,204,301,302,307,401,403,405",
"-t", "50", "-o", "-", "-of", "json"],
timeout=180
)
if output:
try:
data = json.loads(output)
for result in data.get("results", []):
results["endpoints"].append({
"url": result.get("url", ""),
"status": result.get("status", 0),
"length": result.get("length", 0),
"source": "ffuf"
})
            except json.JSONDecodeError:
                pass  # ffuf emitted something other than its JSON report
# Try gobuster
elif self._tool_exists("gobuster"):
await self.log("debug", "Running gobuster...")
output = await self._run_command(
["gobuster", "dir", "-u", base_url, "-w", str(wordlist),
"-t", "50", "-q", "--no-error"],
timeout=180
)
if output:
for line in output.strip().split("\n"):
if line and "(Status:" in line:
parts = line.split()
if parts:
path = parts[0]
results["endpoints"].append({
"url": f"{base_url}{path}",
"path": path,
"source": "gobuster"
})
return results
async def _nuclei_scan(self, domain: str, base_url: str) -> Dict:
"""Run nuclei vulnerability scanner"""
results = {"vulnerabilities": []}
if not self._tool_exists("nuclei"):
return results
await self.log("info", f"☢ Running Nuclei vulnerability scan on {base_url}")
output = await self._run_command(
["nuclei", "-u", base_url, "-severity", "critical,high,medium",
"-silent", "-json", "-c", "25"],
timeout=600
)
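        # nuclei's -json mode emits one finding per line (JSONL), so each line
        # is parsed independently and malformed lines are skipped below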
if output:
for line in output.strip().split("\n"):
if line:
try:
vuln = json.loads(line)
results["vulnerabilities"].append({
"name": vuln.get("info", {}).get("name", "Unknown"),
"severity": vuln.get("info", {}).get("severity", "unknown"),
"url": vuln.get("matched-at", ""),
"template": vuln.get("template-id", ""),
"description": vuln.get("info", {}).get("description", ""),
"matcher_name": vuln.get("matcher-name", "")
})
await ws_manager.broadcast_vulnerability_found(self.scan_id, {
"title": vuln.get("info", {}).get("name", "Unknown"),
"severity": vuln.get("info", {}).get("severity", "unknown"),
"type": "nuclei",
"endpoint": vuln.get("matched-at", "")
})
severity = vuln.get("info", {}).get("severity", "unknown").upper()
await self.log("warning", f"☢ NUCLEI [{severity}]: {vuln.get('info', {}).get('name')}")
                    except json.JSONDecodeError:
                        pass  # skip any non-JSON line in the nuclei stream
await self.log("info", f"✓ Nuclei found {len(results['vulnerabilities'])} issues")
return results
async def _screenshot_capture(self, domain: str, base_url: str) -> Dict:
"""Capture screenshots of web pages"""
results = {"screenshots": []}
if not self._tool_exists("gowitness"):
return results
await self.log("info", f"📸 Capturing screenshots")
screenshot_dir = self.results_path / "screenshots" / self.scan_id
screenshot_dir.mkdir(parents=True, exist_ok=True)
        await self._run_command(
            ["gowitness", "single", base_url, "-P", str(screenshot_dir)],
            timeout=60
        )
# List captured screenshots
if screenshot_dir.exists():
for f in screenshot_dir.glob("*.png"):
results["screenshots"].append(str(f))
return results
def _merge_results(self, base: Dict, new: Dict) -> Dict:
"""Merge two result dictionaries"""
for key, value in new.items():
if key in base:
if isinstance(value, list):
# Deduplicate while merging
existing = set(str(x) for x in base[key])
for item in value:
if str(item) not in existing:
base[key].append(item)
existing.add(str(item))
elif isinstance(value, dict):
base[key].update(value)
else:
base[key] = value
return base
async def check_tools_installed() -> Dict[str, bool]:
"""Check which recon tools are installed"""
tools = [
# Subdomain enumeration
"subfinder", "amass", "assetfinder", "chaos", "cero",
# DNS
"dnsx", "massdns", "puredns",
# HTTP probing
"httpx", "httprobe",
# URL discovery
"gau", "waybackurls", "katana", "gospider", "hakrawler", "cariddi", "getJS",
# Port scanning
"nmap", "naabu", "rustscan",
# Tech detection
"whatweb", "wafw00f",
# Fuzzing
"ffuf", "gobuster", "dirb", "dirsearch", "wfuzz",
# Parameter discovery
"arjun", "paramspider",
# Vulnerability scanning
"nuclei", "nikto", "sqlmap", "dalfox", "crlfuzz",
# Utilities
"gf", "qsreplace", "unfurl", "anew", "jq",
# Screenshot
"gowitness",
# Network
"curl", "wget", "dig", "whois"
]
    return {tool: shutil.which(tool) is not None for tool in tools}
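

# ---------------------------------------------------------------------------
# Minimal standalone sketch (illustrative only): this class is normally driven
# by the scan orchestrator, and a real run assumes the container layout
# (/app, /opt/wordlists), the WebSocket manager, and the external tools on
# PATH. The scan_id below is a made-up placeholder.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        installed = await check_tools_installed()
        print(f"{sum(installed.values())}/{len(installed)} recon tools on PATH")
        recon = ReconIntegration(scan_id="local-demo")
        results = await recon.run_full_recon("example.com", depth="quick")
        print(f"{len(results['endpoints'])} endpoints, {len(results['urls'])} URLs")

    asyncio.run(_demo())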