#!/usr/bin/env python3
"""
Reconnaissance Tools - Network scanning, web recon, OSINT, DNS enumeration
Advanced recon tools with support for multiple external tools
"""

import subprocess
import json
import re
import socket
import tempfile
import requests
import shutil
import os
import concurrent.futures
from typing import Dict, List, Optional, Set, Tuple
import logging
from urllib.parse import urlparse, parse_qs
from pathlib import Path

import urllib3

# Many checks below intentionally use verify=False against recon targets;
# silence the per-request InsecureRequestWarning noise.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

try:
    import dns.resolver
except ImportError:
    dns = None

logger = logging.getLogger(__name__)


def check_tool(tool_name: str) -> Tuple[bool, Optional[str]]:
    """Check whether a tool is installed; return (installed, path)."""
    path = shutil.which(tool_name)
    return (path is not None, path)


def run_tool(cmd: List[str], timeout: int = 300) -> Dict:
    """Run an external tool and return a structured result."""
    result = {
        "tool": cmd[0] if cmd else "unknown",
        "command": " ".join(cmd),
        "success": False,
        "stdout": "",
        "stderr": "",
        "exit_code": -1
    }

    if not cmd:
        result["stderr"] = "Empty command"
        return result

    if not shutil.which(cmd[0]):
        result["stderr"] = f"Tool '{cmd[0]}' not found"
        return result

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout
        )
        result["stdout"] = proc.stdout
        result["stderr"] = proc.stderr
        result["exit_code"] = proc.returncode
        result["success"] = proc.returncode == 0
    except subprocess.TimeoutExpired:
        result["stderr"] = f"Timeout after {timeout}s"
    except Exception as e:
        result["stderr"] = str(e)

    return result
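
# Example (illustrative): check_tool/run_tool take argv lists, not shell
# strings, so target values are never shell-interpreted:
#
#     installed, path = check_tool("nmap")
#     if installed:
#         out = run_tool(["nmap", "-sV", "scanme.nmap.org"], timeout=120)
#         print(out["exit_code"], out["stdout"][:200])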


class NetworkScanner:
    """Network scanning and port enumeration"""

    def __init__(self, config: Dict):
        self.config = config
        self.nmap_path = config.get('tools', {}).get('nmap', '/usr/bin/nmap')

    def scan(self, target: str) -> Dict:
        """Perform comprehensive network scan"""
        logger.info(f"Scanning target: {target}")

        results = {
            "target": target,
            "hosts": {},
            "summary": {}
        }

        try:
            # Quick scan for open ports
            quick_scan = self._nmap_scan(target, "-sS -T4 -p-")
            results["hosts"].update(quick_scan)

            # Service version detection
            if results["hosts"]:
                version_scan = self._nmap_scan(target, "-sV -sC")
                results["hosts"].update(version_scan)

            # Vulnerability scan
            vuln_scan = self._nmap_vuln_scan(target)
            results["vulnerabilities"] = vuln_scan

            results["summary"] = self._generate_summary(results["hosts"])

        except Exception as e:
            logger.error(f"Network scan error: {e}")
            results["error"] = str(e)

        return results

    def _nmap_scan(self, target: str, options: str) -> Dict:
        """Execute nmap scan"""
        try:
            # Use nmap's normal output: _parse_nmap_output scrapes the
            # human-readable format, so -oX XML output would not match it.
            cmd = f"{self.nmap_path} {options} {target}"
            result = subprocess.run(
                cmd.split(),
                capture_output=True,
                timeout=300,
                text=True
            )

            return self._parse_nmap_output(result.stdout)
        except Exception as e:
            logger.error(f"Nmap scan error: {e}")
            return {}

    def _nmap_vuln_scan(self, target: str) -> List[Dict]:
        """Scan for vulnerabilities using NSE scripts"""
        try:
            cmd = f"{self.nmap_path} --script vuln {target}"
            result = subprocess.run(
                cmd.split(),
                capture_output=True,
                timeout=600,
                text=True
            )

            return self._parse_vuln_output(result.stdout)
        except Exception as e:
            logger.error(f"Vulnerability scan error: {e}")
            return []

    def _parse_nmap_output(self, output: str) -> Dict:
        """Parse nmap's normal (human-readable) output"""
        hosts = {}

        # Simple regex scraping - in production, run nmap with -oX and use
        # a proper XML parser (see the sketch in _parse_nmap_xml below)
        ip_pattern = r'(\d+\.\d+\.\d+\.\d+)'
        port_pattern = r'(\d+)/tcp\s+open\s+(\S+)'

        current_ip = None
        for line in output.split('\n'):
            ip_match = re.search(ip_pattern, line)
            if ip_match and 'Nmap scan report' in line:
                current_ip = ip_match.group(1)
                hosts[current_ip] = {
                    "ip": current_ip,
                    "open_ports": [],
                    "os": "unknown"
                }

            port_match = re.search(port_pattern, line)
            if port_match and current_ip:
                hosts[current_ip]["open_ports"].append({
                    "port": int(port_match.group(1)),
                    "service": port_match.group(2),
                    "version": "unknown"
                })

        return hosts
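
    # Illustrative sketch (not called by scan()): how the regex scraping
    # above could be replaced with a real parse of `nmap -oX -` XML output
    # using the standard library.
    def _parse_nmap_xml(self, xml_output: str) -> Dict:
        """Parse nmap XML (-oX) output with ElementTree."""
        import xml.etree.ElementTree as ET

        hosts = {}
        try:
            root = ET.fromstring(xml_output)
        except ET.ParseError:
            return hosts

        for host in root.findall('host'):
            addr = host.find("address[@addrtype='ipv4']")
            if addr is None:
                continue
            ip = addr.get('addr')
            entry = {"ip": ip, "open_ports": [], "os": "unknown"}
            for port in host.findall('.//port'):
                state = port.find('state')
                if state is None or state.get('state') != 'open':
                    continue
                service = port.find('service')
                entry["open_ports"].append({
                    "port": int(port.get('portid')),
                    "service": service.get('name', 'unknown') if service is not None else "unknown",
                    "version": service.get('version', 'unknown') if service is not None else "unknown"
                })
            hosts[ip] = entry

        return hosts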

    def _parse_vuln_output(self, output: str) -> List[Dict]:
        """Parse vulnerability scan output"""
        vulnerabilities = []

        # Extract CVEs and vulnerability info
        cve_pattern = r'(CVE-\d{4}-\d+)'
        for match in re.finditer(cve_pattern, output):
            vulnerabilities.append({
                "cve": match.group(1),
                "severity": "unknown"
            })

        return vulnerabilities

    def _generate_summary(self, hosts: Dict) -> Dict:
        """Generate scan summary"""
        total_hosts = len(hosts)
        total_ports = sum(len(h.get("open_ports", [])) for h in hosts.values())
        services = set()

        for host in hosts.values():
            for port in host.get("open_ports", []):
                services.add(port.get("service"))

        return {
            "total_hosts": total_hosts,
            "total_open_ports": total_ports,
            "unique_services": list(services)
        }


class WebRecon:
    """Web application reconnaissance"""

    def __init__(self, config: Dict):
        self.config = config

    def analyze(self, url: str) -> Dict:
        """Analyze web application"""
        logger.info(f"Analyzing web application: {url}")

        results = {
            "url": url,
            "technologies": [],
            "headers": {},
            "security_headers": {},
            "endpoints": [],
            "forms": [],
            "vulnerabilities": []
        }

        try:
            # Technology detection
            results["technologies"] = self._detect_technologies(url)

            # Header analysis
            results["headers"], results["security_headers"] = self._analyze_headers(url)

            # Endpoint discovery
            results["endpoints"] = self._discover_endpoints(url)

            # Form detection
            results["forms"] = self._detect_forms(url)

            # Quick vulnerability checks
            results["vulnerabilities"] = self._check_vulnerabilities(url)

        except Exception as e:
            logger.error(f"Web recon error: {e}")
            results["error"] = str(e)

        return results

    def _detect_technologies(self, url: str) -> List[str]:
        """Detect web technologies"""
        technologies = []

        try:
            response = requests.get(url, timeout=10, verify=False)

            # Check headers for technology indicators
            server = response.headers.get('Server', '')
            if server:
                technologies.append(f"Server: {server}")

            x_powered_by = response.headers.get('X-Powered-By', '')
            if x_powered_by:
                technologies.append(f"X-Powered-By: {x_powered_by}")

            # Check content for framework indicators
            content = response.text.lower()
            framework_markers = {
                'wordpress': 'WordPress',
                'joomla': 'Joomla',
                'drupal': 'Drupal',
                'django': 'Django',
                'laravel': 'Laravel'
            }
            for marker, name in framework_markers.items():
                if marker in content:
                    technologies.append(name)

        except Exception as e:
            logger.error(f"Technology detection error: {e}")

        return technologies

    def _analyze_headers(self, url: str) -> Tuple[Dict, Dict]:
        """Analyze HTTP headers"""
        headers = {}
        security_headers = {}

        try:
            response = requests.head(url, timeout=10, verify=False)
            headers = dict(response.headers)

            # Check for security headers
            security_checks = [
                'X-Frame-Options',
                'X-Content-Type-Options',
                'Strict-Transport-Security',
                'Content-Security-Policy',
                'X-XSS-Protection'
            ]

            for header in security_checks:
                if header in headers:
                    security_headers[header] = headers[header]
                else:
                    security_headers[header] = "Missing"

        except Exception as e:
            logger.error(f"Header analysis error: {e}")

        return headers, security_headers

    def _discover_endpoints(self, url: str) -> List[str]:
        """Discover endpoints using common paths"""
        endpoints = []
        common_paths = [
            '/admin', '/login', '/api', '/config', '/backup',
            '/admin.php', '/phpinfo.php', '/info.php',
            '/robots.txt', '/sitemap.xml', '/.git', '/.env'
        ]

        parsed = urlparse(url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        for path in common_paths:
            try:
                response = requests.head(
                    f"{base_url}{path}",
                    timeout=5,
                    verify=False,
                    allow_redirects=False
                )

                if response.status_code < 400:
                    endpoints.append(path)
            except requests.RequestException:
                continue

        return endpoints

    def _detect_forms(self, url: str) -> List[Dict]:
        """Detect forms on webpage"""
        forms = []

        try:
            response = requests.get(url, timeout=10, verify=False)

            # Simple regex-based form detection; extract the matched strings
            # rather than storing raw Match objects
            form_pattern = r'<form[^>]*>(.*?)</form>'
            for match in re.finditer(form_pattern, response.text, re.DOTALL | re.IGNORECASE):
                form_tag = match.group(0)
                action = re.search(r'action=["\']([^"\']+)["\']', form_tag)
                method = re.search(r'method=["\']([^"\']+)["\']', form_tag)
                forms.append({
                    "action": action.group(1) if action else "",
                    "method": method.group(1) if method else "GET"
                })
        except Exception as e:
            logger.error(f"Form detection error: {e}")

        return forms
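
    # Illustrative sketch (not called by analyze()): regexes are fragile on
    # real-world HTML, so a stdlib html.parser based extractor is shown as an
    # alternative to the regex approach in _detect_forms above.
    def _detect_forms_parsed(self, url: str) -> List[Dict]:
        """Detect forms using html.parser instead of regexes."""
        from html.parser import HTMLParser

        class FormCollector(HTMLParser):
            def __init__(self):
                super().__init__()
                self.forms = []

            def handle_starttag(self, tag, attrs):
                # HTMLParser lowercases tag names; attrs is a list of tuples
                if tag == 'form':
                    attr_map = dict(attrs)
                    self.forms.append({
                        "action": attr_map.get('action') or '',
                        "method": (attr_map.get('method') or 'GET').upper()
                    })

        try:
            response = requests.get(url, timeout=10, verify=False)
            collector = FormCollector()
            collector.feed(response.text)
            return collector.forms
        except Exception as e:
            logger.error(f"Form detection error: {e}")
            return []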

    def _check_vulnerabilities(self, url: str) -> List[str]:
        """Quick vulnerability checks"""
        vulnerabilities = []

        try:
            # Naive error-based SQL injection probe
            test_url = f"{url}?id=1'"
            response = requests.get(test_url, timeout=10, verify=False)
            if 'sql' in response.text.lower() or 'mysql' in response.text.lower():
                vulnerabilities.append("Potential SQL Injection")

            # Reflected XSS probe
            test_url = f"{url}?q=<script>alert(1)</script>"
            response = requests.get(test_url, timeout=10, verify=False)
            if '<script>alert(1)</script>' in response.text:
                vulnerabilities.append("Potential XSS")

        except Exception as e:
            logger.error(f"Vulnerability check error: {e}")

        return vulnerabilities


class OSINTCollector:
    """Open Source Intelligence collection"""

    def __init__(self, config: Dict):
        self.config = config

    def collect(self, target: str) -> Dict:
        """Collect OSINT data"""
        logger.info(f"Collecting OSINT for: {target}")

        return {
            "target": target,
            "emails": self._find_emails(target),
            "social_media": self._find_social_media(target),
            "data_breaches": self._check_breaches(target),
            "metadata": self._collect_metadata(target)
        }

    def _find_emails(self, target: str) -> List[str]:
        """Find email addresses"""
        # Placeholder - would use theHarvester or similar
        return []

    def _find_social_media(self, target: str) -> Dict:
        """Find social media profiles"""
        # Placeholder
        return {}

    def _check_breaches(self, target: str) -> List[str]:
        """Check for data breaches"""
        # Placeholder
        return []

    def _collect_metadata(self, target: str) -> Dict:
        """Collect metadata"""
        # Placeholder
        return {}


class DNSEnumerator:
    """DNS enumeration"""

    def __init__(self, config: Dict):
        self.config = config

    def enumerate(self, domain: str) -> Dict:
        """Enumerate DNS records"""
        logger.info(f"Enumerating DNS for: {domain}")

        records = {
            "domain": domain,
            "A": [],
            "AAAA": [],
            "MX": [],
            "NS": [],
            "TXT": [],
            "SOA": []
        }

        # The dns.resolver import at the top of the file is optional
        if dns is None:
            logger.warning("dnspython not installed; skipping DNS enumeration")
            return records

        record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA']

        for rtype in record_types:
            try:
                answers = dns.resolver.resolve(domain, rtype)
                records[rtype] = [str(rdata) for rdata in answers]
            except Exception:
                continue

        return records
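
# Example (illustrative): DNS enumeration requires the optional dnspython
# package (the guarded `import dns.resolver` above); without it,
# enumerate() returns empty record lists:
#
#     records = DNSEnumerator({}).enumerate("example.com")
#     print(records["MX"], records["TXT"])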


class SubdomainFinder:
    """Subdomain discovery"""

    def __init__(self, config: Dict):
        self.config = config

    def find(self, domain: str) -> List[str]:
        """Find subdomains via wordlist DNS resolution"""
        logger.info(f"Finding subdomains for: {domain}")

        subdomains = []
        common_subdomains = [
            'www', 'mail', 'ftp', 'admin', 'test', 'dev',
            'staging', 'api', 'blog', 'shop', 'portal'
        ]

        for sub in common_subdomains:
            subdomain = f"{sub}.{domain}"
            try:
                socket.gethostbyname(subdomain)
                subdomains.append(subdomain)
            except socket.gaierror:
                continue

        return subdomains
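
    # Illustrative sketch (not called elsewhere): resolve candidates in
    # parallel using the concurrent.futures import at the top of the file.
    def find_concurrent(self, domain: str, max_workers: int = 20) -> List[str]:
        """Concurrent variant of find()."""
        candidates = [
            f"{sub}.{domain}" for sub in (
                'www', 'mail', 'ftp', 'admin', 'test', 'dev',
                'staging', 'api', 'blog', 'shop', 'portal'
            )
        ]

        def resolves(name: str) -> bool:
            try:
                socket.gethostbyname(name)
                return True
            except socket.gaierror:
                return False

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            hits = list(pool.map(resolves, candidates))

        return [name for name, ok in zip(candidates, hits) if ok]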


# ============================================================================
# ADVANCED RECON TOOLS
# ============================================================================

class AdvancedSubdomainEnum:
    """Advanced subdomain enumeration using multiple tools."""

    TOOLS = ['subfinder', 'amass', 'assetfinder', 'findomain']

    def __init__(self, config: Dict):
        self.config = config
        self.timeout = config.get('timeout', 300)

    def enumerate(self, domain: str) -> Dict:
        """Enumerate subdomains using all available tools."""
        logger.info(f"[*] Enumerating subdomains for: {domain}")
        print(f"[*] Enumerating subdomains for: {domain}")

        all_subdomains: Set[str] = set()
        results = {"domain": domain, "subdomains": [], "by_tool": {}}

        for tool in self.TOOLS:
            installed, _ = check_tool(tool)
            if not installed:
                logger.warning(f"  [-] {tool} not installed, skipping...")
                continue

            print(f"  [*] Running {tool}...")
            tool_subs = self._run_tool(tool, domain)
            results["by_tool"][tool] = tool_subs
            all_subdomains.update(tool_subs)
            print(f"  [+] {tool}: {len(tool_subs)} subdomains")

        results["subdomains"] = sorted(all_subdomains)
        results["total"] = len(all_subdomains)
        print(f"[+] Total unique subdomains: {len(all_subdomains)}")

        return results

    def _run_tool(self, tool: str, domain: str) -> List[str]:
        """Run a specific enumeration tool."""
        subdomains = []

        if tool == "subfinder":
            result = run_tool(["subfinder", "-d", domain, "-silent"], self.timeout)
        elif tool == "amass":
            result = run_tool(["amass", "enum", "-passive", "-d", domain], self.timeout)
        elif tool == "assetfinder":
            result = run_tool(["assetfinder", "--subs-only", domain], self.timeout)
        elif tool == "findomain":
            result = run_tool(["findomain", "-t", domain, "-q"], self.timeout)
        else:
            return []

        if result["stdout"]:
            for line in result["stdout"].strip().split('\n'):
                sub = line.strip().lower()
                if sub and domain in sub:
                    subdomains.append(sub)

        return subdomains
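
# Example (illustrative): enumerate() skips any tool that check_tool()
# cannot find, so a partial toolchain still produces results:
#
#     enum = AdvancedSubdomainEnum({"timeout": 300})
#     res = enum.enumerate("example.com")
#     print(res["total"], list(res["by_tool"]))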


class HttpProber:
    """Probe for live HTTP hosts using httpx or httprobe."""

    def __init__(self, config: Dict):
        self.config = config
        self.timeout = config.get('timeout', 120)

    def probe(self, hosts: List[str]) -> Dict:
        """Check which hosts are alive via HTTP/HTTPS."""
        logger.info(f"[*] Probing {len(hosts)} hosts...")
        print(f"[*] Probing {len(hosts)} hosts via HTTP...")

        results = {
            "total_input": len(hosts),
            "alive": [],
            "technologies": {},
            "status_codes": {}
        }

        # Try httpx first (richer output)
        httpx_installed, _ = check_tool("httpx")
        if httpx_installed:
            results = self._run_httpx(hosts)
        else:
            # Fall back to httprobe
            httprobe_installed, _ = check_tool("httprobe")
            if httprobe_installed:
                results = self._run_httprobe(hosts)
            else:
                # Last resort: probe manually with requests
                results = self._manual_probe(hosts)

        print(f"[+] Alive hosts: {len(results['alive'])}")
        return results

    def _run_httpx(self, hosts: List[str]) -> Dict:
        """Run httpx for advanced probing."""
        results = {"total_input": len(hosts), "alive": [], "technologies": {}, "status_codes": {}}

        # Write hosts to a temp file for httpx -l
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write('\n'.join(hosts))
            hosts_file = f.name

        try:
            cmd = ["httpx", "-l", hosts_file, "-silent", "-status-code", "-tech-detect", "-json"]
            result = run_tool(cmd, self.timeout)

            if result["stdout"]:
                for line in result["stdout"].strip().split('\n'):
                    if not line.strip():
                        continue
                    try:
                        data = json.loads(line)
                        url = data.get("url", "")
                        if url:
                            results["alive"].append(url)

                        # Technologies
                        techs = data.get("tech", [])
                        for tech in techs:
                            results["technologies"][tech] = results["technologies"].get(tech, 0) + 1

                        # Status codes
                        status = str(data.get("status_code", ""))
                        if status:
                            results["status_codes"][status] = results["status_codes"].get(status, 0) + 1
                    except json.JSONDecodeError:
                        continue
        finally:
            os.unlink(hosts_file)

        return results

    def _run_httprobe(self, hosts: List[str]) -> Dict:
        """Run httprobe for basic probing."""
        results = {"total_input": len(hosts), "alive": [], "technologies": {}, "status_codes": {}}

        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write('\n'.join(hosts))
            hosts_file = f.name

        try:
            with open(hosts_file, 'r') as stdin:
                proc = subprocess.run(
                    ["httprobe"],
                    stdin=stdin,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout
                )
            if proc.stdout:
                results["alive"] = [line.strip() for line in proc.stdout.strip().split('\n') if line.strip()]
        except Exception as e:
            logger.error(f"httprobe error: {e}")
        finally:
            os.unlink(hosts_file)

        return results

    def _manual_probe(self, hosts: List[str]) -> Dict:
        """Manual probing using requests."""
        results = {"total_input": len(hosts), "alive": [], "technologies": {}, "status_codes": {}}

        for host in hosts[:100]:  # Limit to avoid long execution
            # Hosts that already carry a scheme are tried once as-is
            candidates = [host] if host.startswith('http') else [f"https://{host}", f"http://{host}"]
            for url in candidates:
                try:
                    resp = requests.head(url, timeout=5, verify=False, allow_redirects=True)
                    results["alive"].append(url)
                    results["status_codes"][str(resp.status_code)] = results["status_codes"].get(str(resp.status_code), 0) + 1
                    break
                except requests.RequestException:
                    continue

        return results
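
# Example (illustrative): probe() degrades gracefully from httpx to
# httprobe to plain requests, so it works without either tool installed:
#
#     prober = HttpProber({"timeout": 120})
#     res = prober.probe(["example.com", "www.example.com"])
#     print(res["alive"], res["status_codes"])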


class URLCollector:
    """Collect URLs using gau, waybackurls, and waymore."""

    TOOLS = ['gau', 'waybackurls', 'waymore']

    def __init__(self, config: Dict):
        self.config = config
        self.timeout = config.get('timeout', 300)

    def collect(self, domain: str) -> Dict:
        """Collect URLs from passive sources."""
        logger.info(f"[*] Collecting URLs for: {domain}")
        print(f"[*] Collecting historical URLs for: {domain}")

        all_urls: Set[str] = set()
        urls_with_params: Set[str] = set()
        js_files: Set[str] = set()
        results = {"domain": domain, "urls": [], "by_tool": {}}

        for tool in self.TOOLS:
            installed, _ = check_tool(tool)
            if not installed:
                continue

            print(f"  [*] Running {tool}...")
            tool_urls = self._run_tool(tool, domain)
            results["by_tool"][tool] = len(tool_urls)

            for url in tool_urls:
                all_urls.add(url)
                if '?' in url and '=' in url:
                    urls_with_params.add(url)
                if '.js' in url.lower():
                    js_files.add(url)

            print(f"  [+] {tool}: {len(tool_urls)} URLs")

        results["urls"] = list(all_urls)
        results["urls_with_params"] = list(urls_with_params)
        results["js_files"] = list(js_files)
        results["total"] = len(all_urls)
        print(f"[+] Total unique URLs: {len(all_urls)}")
        print(f"[+] URLs with params: {len(urls_with_params)}")
        print(f"[+] JS files: {len(js_files)}")

        return results

    def _run_tool(self, tool: str, domain: str) -> List[str]:
        """Run a specific URL collection tool."""
        urls = []

        if tool == "gau":
            result = run_tool(["gau", domain], self.timeout)
        elif tool == "waybackurls":
            result = run_tool(["waybackurls", domain], self.timeout)
        elif tool == "waymore":
            result = run_tool(["waymore", "-i", domain, "-mode", "U"], self.timeout)
        else:
            return []

        if result["stdout"]:
            for line in result["stdout"].strip().split('\n'):
                url = line.strip()
                if url and url.startswith(('http://', 'https://')):
                    urls.append(url)

        return urls


class WebCrawler:
    """Web crawler using katana or gospider."""

    def __init__(self, config: Dict):
        self.config = config
        self.timeout = config.get('timeout', 300)
        self.depth = config.get('crawl_depth', 3)

    def crawl(self, target: str) -> Dict:
        """Crawl the target to discover URLs, forms, and endpoints."""
        logger.info(f"[*] Crawling: {target}")
        print(f"[*] Starting crawl on: {target}")

        results = {
            "target": target,
            "urls": [],
            "forms": [],
            "js_files": [],
            "api_endpoints": [],
            "params": []
        }

        # Try katana first
        katana_installed, _ = check_tool("katana")
        if katana_installed:
            results = self._run_katana(target)
        else:
            # Fall back to gospider
            gospider_installed, _ = check_tool("gospider")
            if gospider_installed:
                results = self._run_gospider(target)

        print(f"[+] URLs discovered: {len(results.get('urls', []))}")
        return results

    def _run_katana(self, target: str) -> Dict:
        """Run katana for crawling."""
        results = {"target": target, "urls": [], "forms": [], "js_files": [], "api_endpoints": [], "params": []}

        cmd = ["katana", "-u", target, "-d", str(self.depth), "-silent", "-jc", "-kf", "all", "-ef", "css,png,jpg,gif,svg,ico"]
        result = run_tool(cmd, self.timeout)

        if result["stdout"]:
            for line in result["stdout"].strip().split('\n'):
                url = line.strip()
                if not url:
                    continue

                results["urls"].append(url)

                if '.js' in url.lower():
                    results["js_files"].append(url)
                if '/api/' in url.lower() or '/v1/' in url or '/v2/' in url:
                    results["api_endpoints"].append(url)
                if '?' in url and '=' in url:
                    results["params"].append(url)

        return results

    def _run_gospider(self, target: str) -> Dict:
        """Run gospider for crawling."""
        results = {"target": target, "urls": [], "forms": [], "js_files": [], "api_endpoints": [], "params": []}

        cmd = ["gospider", "-s", target, "-d", str(self.depth), "--no-redirect", "-q"]
        result = run_tool(cmd, self.timeout)

        if result["stdout"]:
            for line in result["stdout"].strip().split('\n'):
                # gospider lines look like "[tag] - URL"; keep the URL part
                if ' - ' in line:
                    url = line.split(' - ')[-1].strip()
                else:
                    url = line.strip()

                if url and url.startswith(('http://', 'https://')):
                    results["urls"].append(url)

                    if '.js' in url.lower():
                        results["js_files"].append(url)
                    if '/api/' in url.lower():
                        results["api_endpoints"].append(url)
                    if '?' in url:
                        results["params"].append(url)

        return results
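
# Example (illustrative): crawl depth comes from the config dict; the
# crawler returns empty results if neither katana nor gospider is installed:
#
#     crawler = WebCrawler({"crawl_depth": 2, "timeout": 180})
#     pages = crawler.crawl("https://example.com")
#     print(len(pages["urls"]), len(pages["api_endpoints"]))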


class PortScanner:
    """Port scanner using naabu or nmap."""

    def __init__(self, config: Dict):
        self.config = config
        self.timeout = config.get('timeout', 600)

    def scan(self, target: str, ports: str = "1-10000") -> Dict:
        """Run a port scan against the target."""
        logger.info(f"[*] Scanning ports on: {target}")
        print(f"[*] Scanning ports on: {target}")

        results = {"target": target, "open_ports": [], "by_service": {}}

        # Try naabu first (faster)
        naabu_installed, _ = check_tool("naabu")
        if naabu_installed:
            results = self._run_naabu(target, ports)
        else:
            # Fall back to nmap
            nmap_installed, _ = check_tool("nmap")
            if nmap_installed:
                results = self._run_nmap(target, ports)

        print(f"[+] Open ports: {len(results.get('open_ports', []))}")
        return results

    def _run_naabu(self, target: str, ports: str) -> Dict:
        """Run naabu for a fast port scan."""
        results = {"target": target, "open_ports": [], "by_service": {}}

        cmd = ["naabu", "-host", target, "-p", ports, "-silent"]
        result = run_tool(cmd, self.timeout)

        if result["stdout"]:
            for line in result["stdout"].strip().split('\n'):
                line = line.strip()
                if ':' in line:
                    host, port = line.rsplit(':', 1)
                    try:
                        results["open_ports"].append({
                            "host": host,
                            "port": int(port),
                            "protocol": "tcp"
                        })
                    except ValueError:
                        continue

        return results

    def _run_nmap(self, target: str, ports: str) -> Dict:
        """Run nmap for a detailed port scan."""
        results = {"target": target, "open_ports": [], "by_service": {}}

        cmd = ["nmap", "-sS", "-T4", "-p", ports, "--open", target]
        result = run_tool(cmd, self.timeout)

        if result["stdout"]:
            port_pattern = r"(\d+)/(\w+)\s+open\s+(\S+)"
            for match in re.finditer(port_pattern, result["stdout"]):
                port_info = {
                    "host": target,
                    "port": int(match.group(1)),
                    "protocol": match.group(2),
                    "service": match.group(3)
                }
                results["open_ports"].append(port_info)
                results["by_service"][match.group(3)] = results["by_service"].get(match.group(3), 0) + 1

        return results


class VulnScanner:
    """Vulnerability scanner using nuclei."""

    def __init__(self, config: Dict):
        self.config = config
        self.timeout = config.get('timeout', 600)

    def scan(self, targets: List[str], templates: Optional[str] = None) -> Dict:
        """Run a vulnerability scan against the targets."""
        logger.info(f"[*] Scanning vulnerabilities on {len(targets)} targets")
        print(f"[*] Scanning vulnerabilities on {len(targets)} targets...")

        results = {
            "total_targets": len(targets),
            "vulnerabilities": [],
            "by_severity": {"critical": [], "high": [], "medium": [], "low": [], "info": []}
        }

        nuclei_installed, _ = check_tool("nuclei")
        if not nuclei_installed:
            print("  [-] nuclei not installed")
            return results

        # Write targets to a temp file for nuclei -l
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write('\n'.join(targets))
            targets_file = f.name

        try:
            cmd = ["nuclei", "-l", targets_file, "-silent", "-nc", "-j"]
            if templates:
                cmd.extend(["-t", templates])

            result = run_tool(cmd, self.timeout)

            if result["stdout"]:
                for line in result["stdout"].strip().split('\n'):
                    if not line.strip():
                        continue
                    try:
                        finding = json.loads(line)
                        vuln = {
                            "template": finding.get("template-id", ""),
                            "name": finding.get("info", {}).get("name", ""),
                            "severity": finding.get("info", {}).get("severity", "info"),
                            "url": finding.get("matched-at", ""),
                            "description": finding.get("info", {}).get("description", ""),
                            "curl_command": finding.get("curl-command", "")
                        }
                        results["vulnerabilities"].append(vuln)

                        sev = vuln["severity"].lower()
                        if sev in results["by_severity"]:
                            results["by_severity"][sev].append(vuln)

                        print(f"  [!] [{sev.upper()}] {vuln['name']} - {vuln['url']}")
                    except json.JSONDecodeError:
                        continue
        finally:
            os.unlink(targets_file)

        print(f"[+] Vulnerabilities found: {len(results['vulnerabilities'])}")
        return results
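
# Example (illustrative): restrict nuclei to specific templates via the
# optional `templates` argument (the path shown is hypothetical):
#
#     scanner = VulnScanner({"timeout": 600})
#     findings = scanner.scan(["https://app.example.com"], templates="cves/")
#     print(len(findings["by_severity"]["critical"]))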


class FullReconRunner:
    """Run all recon tools and consolidate the results."""

    def __init__(self, config: Optional[Dict] = None):
        self.config = config or {}

    def run(self, target: str, target_type: str = "domain") -> Dict:
        """
        Run full recon and return a consolidated context.

        Args:
            target: Target domain or URL
            target_type: Target type ("domain" or "url")

        Returns:
            Dict with all consolidated results
        """
        from core.context_builder import ReconContextBuilder

        print(f"\n{'='*70}")
        print("  NEUROSPLOIT - ADVANCED RECON")
        print(f"{'='*70}")
        print(f"\n[*] Target: {target}")
        print(f"[*] Type: {target_type}\n")

        # Initialize context builder
        ctx = ReconContextBuilder()
        ctx.set_target(target, target_type)

        # Extract domain from target
        if target_type == "url":
            parsed = urlparse(target)
            domain = parsed.netloc
        else:
            domain = target

        # 1. Subdomain Enumeration
        print("\n" + "=" * 50)
        print("[PHASE 1] Subdomain Enumeration")
        print("=" * 50)
        sub_enum = AdvancedSubdomainEnum(self.config)
        sub_results = sub_enum.enumerate(domain)
        ctx.add_subdomains(sub_results.get("subdomains", []))
        ctx.add_tool_result("subdomain_enum", sub_results)

        # 2. HTTP Probing
        print("\n" + "=" * 50)
        print("[PHASE 2] HTTP Probing")
        print("=" * 50)
        prober = HttpProber(self.config)
        probe_results = prober.probe(sub_results.get("subdomains", [domain]))
        ctx.add_live_hosts(probe_results.get("alive", []))
        ctx.add_technologies(list(probe_results.get("technologies", {}).keys()))
        ctx.add_tool_result("http_probe", probe_results)

        # 3. URL Collection
        print("\n" + "=" * 50)
        print("[PHASE 3] URL Collection")
        print("=" * 50)
        url_collector = URLCollector(self.config)
        url_results = url_collector.collect(domain)
        ctx.add_urls(url_results.get("urls", []))
        ctx.add_js_files(url_results.get("js_files", []))
        ctx.add_tool_result("url_collection", url_results)

        # 4. Web Crawling
        print("\n" + "=" * 50)
        print("[PHASE 4] Web Crawling")
        print("=" * 50)
        crawler = WebCrawler(self.config)
        alive_hosts = probe_results.get("alive", [])
        if alive_hosts:
            crawl_target = alive_hosts[0]  # Crawl the first alive host
            crawl_results = crawler.crawl(crawl_target)
            ctx.add_urls(crawl_results.get("urls", []))
            ctx.add_js_files(crawl_results.get("js_files", []))
            ctx.add_api_endpoints(crawl_results.get("api_endpoints", []))
            ctx.add_tool_result("crawling", crawl_results)

        # 5. Port Scanning
        print("\n" + "=" * 50)
        print("[PHASE 5] Port Scanning")
        print("=" * 50)
        port_scanner = PortScanner(self.config)
        port_results = port_scanner.scan(domain)
        ctx.add_open_ports(port_results.get("open_ports", []))
        ctx.add_tool_result("port_scan", port_results)

        # 6. DNS Enumeration
        print("\n" + "=" * 50)
        print("[PHASE 6] DNS Enumeration")
        print("=" * 50)
        dns_enum = DNSEnumerator(self.config)
        dns_results = dns_enum.enumerate(domain)
        dns_records = []
        for rtype, records in dns_results.items():
            if rtype != "domain" and records:
                for r in records:
                    dns_records.append(f"[{rtype}] {r}")
        ctx.add_dns_records(dns_records)
        ctx.add_tool_result("dns_enum", dns_results)

        # 7. Vulnerability Scanning
        print("\n" + "=" * 50)
        print("[PHASE 7] Vulnerability Scanning")
        print("=" * 50)
        vuln_scanner = VulnScanner(self.config)
        scan_targets = probe_results.get("alive", [target])[:20]  # Limit to 20
        vuln_results = vuln_scanner.scan(scan_targets)

        vulns = []
        for v in vuln_results.get("vulnerabilities", []):
            vulns.append({
                "title": v.get("name", ""),
                "severity": v.get("severity", "info"),
                "affected_endpoint": v.get("url", ""),
                "description": v.get("description", "")
            })
        ctx.add_vulnerabilities(vulns)
        ctx.add_tool_result("vuln_scan", vuln_results)

        # Identify interesting paths
        all_urls = list(ctx.urls)
        ctx.add_interesting_paths(all_urls)

        # Save consolidated context
        print("\n" + "=" * 50)
        print("[FINAL PHASE] Consolidating Context")
        print("=" * 50)
        saved = ctx.save()

        print(f"\n{'='*70}")
        print("[+] RECON COMPLETE!")
        print(f"    - Subdomains: {len(ctx.subdomains)}")
        print(f"    - Live hosts: {len(ctx.live_hosts)}")
        print(f"    - URLs: {len(ctx.urls)}")
        print(f"    - URLs with params: {len(ctx.urls_with_params)}")
        print(f"    - Open ports: {len(ctx.open_ports)}")
        print(f"    - Vulnerabilities: {len(ctx.vulnerabilities)}")
        print(f"\n[+] Context saved to: {saved['json']}")
        print(f"{'='*70}\n")

        return {
            "context": saved["context"],
            "context_file": str(saved["json"]),
            "context_text_file": str(saved["txt"]),
            "context_text": ctx.get_llm_prompt_context()
        }
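

if __name__ == "__main__":
    # Minimal CLI sketch (not part of the original module): run the full
    # recon pipeline against a target passed on the command line. Assumes
    # the package layout, including core.context_builder, is importable.
    import argparse

    parser = argparse.ArgumentParser(description="NeuroSploit recon tools")
    parser.add_argument("target", help="Target domain or URL")
    parser.add_argument("--type", dest="target_type", default="domain",
                        choices=["domain", "url"], help="Target type")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO)
    FullReconRunner({"timeout": 300}).run(args.target, target_type=args.target_type)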