mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-02-12 14:02:45 +00:00
829 lines
30 KiB
Python
829 lines
30 KiB
Python
"""
|
|
NeuroSploit v3 - Docker Tool Executor
|
|
Executes security tools in isolated Docker containers
|
|
"""
|
|
|
|
import asyncio
|
|
import docker
|
|
import json
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ToolStatus(Enum):
|
|
PENDING = "pending"
|
|
RUNNING = "running"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
TIMEOUT = "timeout"
|
|
|
|
|
|
@dataclass
|
|
class ToolResult:
|
|
"""Result from a tool execution"""
|
|
tool: str
|
|
command: str
|
|
status: ToolStatus
|
|
output: str
|
|
error: str = ""
|
|
findings: List[Dict] = field(default_factory=list)
|
|
duration_seconds: float = 0
|
|
started_at: str = ""
|
|
completed_at: str = ""
|
|
|
|
|
|
class SecurityTool:
|
|
"""Definition of a security tool"""
|
|
|
|
TOOLS = {
|
|
"dirb": {
|
|
"name": "Dirb",
|
|
"description": "Web content scanner",
|
|
"command": "dirb {target} /opt/wordlists/common.txt -o /opt/output/dirb.txt -w",
|
|
"output_file": "/opt/output/dirb.txt",
|
|
"parser": "parse_dirb_output"
|
|
},
|
|
"feroxbuster": {
|
|
"name": "Feroxbuster",
|
|
"description": "Fast content discovery tool",
|
|
"command": "feroxbuster -u {target} -w /opt/wordlists/common.txt -o /opt/output/ferox.txt --json -q",
|
|
"output_file": "/opt/output/ferox.txt",
|
|
"parser": "parse_feroxbuster_output"
|
|
},
|
|
"ffuf": {
|
|
"name": "FFUF",
|
|
"description": "Fast web fuzzer",
|
|
"command": "ffuf -u {target}/FUZZ -w /opt/wordlists/common.txt -o /opt/output/ffuf.json -of json -mc 200,204,301,302,307,401,403",
|
|
"output_file": "/opt/output/ffuf.json",
|
|
"parser": "parse_ffuf_output"
|
|
},
|
|
"gobuster": {
|
|
"name": "Gobuster",
|
|
"description": "Directory/file brute-forcer",
|
|
"command": "gobuster dir -u {target} -w /opt/wordlists/common.txt -o /opt/output/gobuster.txt -q",
|
|
"output_file": "/opt/output/gobuster.txt",
|
|
"parser": "parse_gobuster_output"
|
|
},
|
|
"nmap": {
|
|
"name": "Nmap",
|
|
"description": "Network scanner",
|
|
"command": "nmap -sV -sC -oN /opt/output/nmap.txt {host}",
|
|
"output_file": "/opt/output/nmap.txt",
|
|
"parser": "parse_nmap_output"
|
|
},
|
|
"nuclei": {
|
|
"name": "Nuclei",
|
|
"description": "Vulnerability scanner",
|
|
"command": "nuclei -u {target} -o /opt/output/nuclei.txt -jsonl",
|
|
"output_file": "/opt/output/nuclei.txt",
|
|
"parser": "parse_nuclei_output"
|
|
},
|
|
"nikto": {
|
|
"name": "Nikto",
|
|
"description": "Web server scanner",
|
|
"command": "nikto -h {target} -o /opt/output/nikto.txt -Format txt",
|
|
"output_file": "/opt/output/nikto.txt",
|
|
"parser": "parse_nikto_output"
|
|
},
|
|
"sqlmap": {
|
|
"name": "SQLMap",
|
|
"description": "SQL injection scanner",
|
|
"command": "sqlmap -u {target} --batch --output-dir=/opt/output/sqlmap",
|
|
"output_file": "/opt/output/sqlmap",
|
|
"parser": "parse_sqlmap_output"
|
|
},
|
|
"whatweb": {
|
|
"name": "WhatWeb",
|
|
"description": "Web technology fingerprinting",
|
|
"command": "whatweb {target} -a 3 --log-json=/opt/output/whatweb.json",
|
|
"output_file": "/opt/output/whatweb.json",
|
|
"parser": "parse_whatweb_output"
|
|
},
|
|
"httpx": {
|
|
"name": "HTTPX",
|
|
"description": "HTTP toolkit",
|
|
"command": "echo {target} | httpx -silent -json -o /opt/output/httpx.json -title -tech-detect -status-code",
|
|
"output_file": "/opt/output/httpx.json",
|
|
"parser": "parse_httpx_output"
|
|
},
|
|
"katana": {
|
|
"name": "Katana",
|
|
"description": "Web crawler",
|
|
"command": "katana -u {target} -o /opt/output/katana.txt -jc -d 3",
|
|
"output_file": "/opt/output/katana.txt",
|
|
"parser": "parse_katana_output"
|
|
},
|
|
"subfinder": {
|
|
"name": "Subfinder",
|
|
"description": "Subdomain discovery",
|
|
"command": "subfinder -d {domain} -o /opt/output/subfinder.txt -silent",
|
|
"output_file": "/opt/output/subfinder.txt",
|
|
"parser": "parse_subfinder_output"
|
|
},
|
|
"dalfox": {
|
|
"name": "Dalfox",
|
|
"description": "XSS scanner",
|
|
"command": "dalfox url {target} -o /opt/output/dalfox.txt --silence",
|
|
"output_file": "/opt/output/dalfox.txt",
|
|
"parser": "parse_dalfox_output"
|
|
},
|
|
"naabu": {
|
|
"name": "Naabu",
|
|
"description": "Fast port scanner",
|
|
"command": "naabu -host {host} -json -top-ports 1000 -silent -o /opt/output/naabu.json",
|
|
"output_file": "/opt/output/naabu.json",
|
|
"parser": "parse_naabu_output"
|
|
},
|
|
"dnsx": {
|
|
"name": "DNSX",
|
|
"description": "DNS toolkit",
|
|
"command": "echo {domain} | dnsx -silent -a -aaaa -cname -mx -ns -txt -o /opt/output/dnsx.txt",
|
|
"output_file": "/opt/output/dnsx.txt",
|
|
"parser": "parse_dnsx_output"
|
|
}
|
|
}
|
|
|
|
|
|
class DockerToolExecutor:
|
|
"""Execute security tools in Docker containers"""
|
|
|
|
DOCKER_IMAGE = "neurosploit-tools:latest"
|
|
DEFAULT_TIMEOUT = 300 # 5 minutes
|
|
MAX_OUTPUT_SIZE = 1024 * 1024 # 1MB max output
|
|
|
|
def __init__(self):
|
|
self.client = None
|
|
self.active_containers: Dict[str, Any] = {}
|
|
self._initialized = False
|
|
|
|
async def initialize(self) -> Tuple[bool, str]:
|
|
"""Initialize Docker client and ensure image exists"""
|
|
try:
|
|
self.client = docker.from_env()
|
|
self.client.ping()
|
|
|
|
# Check if tools image exists
|
|
try:
|
|
self.client.images.get(self.DOCKER_IMAGE)
|
|
self._initialized = True
|
|
return True, "Docker initialized with tools image"
|
|
except docker.errors.ImageNotFound:
|
|
# Try to build the image
|
|
logger.info("Building security tools Docker image...")
|
|
return await self._build_tools_image()
|
|
|
|
except docker.errors.DockerException as e:
|
|
return False, f"Docker not available: {str(e)}"
|
|
except Exception as e:
|
|
return False, f"Failed to initialize Docker: {str(e)}"
|
|
|
|
async def _build_tools_image(self) -> Tuple[bool, str]:
|
|
"""Build the security tools Docker image"""
|
|
try:
|
|
dockerfile_path = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
|
|
"docker", "Dockerfile.tools"
|
|
)
|
|
|
|
if not os.path.exists(dockerfile_path):
|
|
return False, f"Dockerfile not found at {dockerfile_path}"
|
|
|
|
# Build image
|
|
build_path = os.path.dirname(dockerfile_path)
|
|
image, logs = self.client.images.build(
|
|
path=build_path,
|
|
dockerfile="Dockerfile.tools",
|
|
tag=self.DOCKER_IMAGE,
|
|
rm=True
|
|
)
|
|
|
|
self._initialized = True
|
|
return True, "Tools image built successfully"
|
|
|
|
except Exception as e:
|
|
return False, f"Failed to build tools image: {str(e)}"
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if Docker executor is available"""
|
|
return self._initialized and self.client is not None
|
|
|
|
def get_available_tools(self) -> List[Dict]:
|
|
"""Get list of available security tools"""
|
|
return [
|
|
{
|
|
"id": tool_id,
|
|
"name": tool["name"],
|
|
"description": tool["description"]
|
|
}
|
|
for tool_id, tool in SecurityTool.TOOLS.items()
|
|
]
|
|
|
|
async def execute_tool(
|
|
self,
|
|
tool_name: str,
|
|
target: str,
|
|
options: Optional[Dict] = None,
|
|
timeout: int = None
|
|
) -> ToolResult:
|
|
"""Execute a security tool against a target"""
|
|
|
|
if not self.is_available():
|
|
return ToolResult(
|
|
tool=tool_name,
|
|
command="",
|
|
status=ToolStatus.FAILED,
|
|
output="",
|
|
error="Docker executor not initialized"
|
|
)
|
|
|
|
tool_config = SecurityTool.TOOLS.get(tool_name.lower())
|
|
if not tool_config:
|
|
return ToolResult(
|
|
tool=tool_name,
|
|
command="",
|
|
status=ToolStatus.FAILED,
|
|
output="",
|
|
error=f"Unknown tool: {tool_name}"
|
|
)
|
|
|
|
# Parse target URL
|
|
from urllib.parse import urlparse
|
|
parsed = urlparse(target)
|
|
host = parsed.netloc or parsed.path
|
|
domain = host.split(':')[0]
|
|
|
|
# Build command
|
|
command = tool_config["command"].format(
|
|
target=target,
|
|
host=host,
|
|
domain=domain
|
|
)
|
|
|
|
# Add custom options
|
|
if options:
|
|
for key, value in options.items():
|
|
command += f" {key} {value}"
|
|
|
|
timeout = timeout or self.DEFAULT_TIMEOUT
|
|
started_at = datetime.utcnow()
|
|
|
|
result = ToolResult(
|
|
tool=tool_name,
|
|
command=command,
|
|
status=ToolStatus.RUNNING,
|
|
output="",
|
|
started_at=started_at.isoformat()
|
|
)
|
|
|
|
container = None
|
|
|
|
try:
|
|
# Create and run container
|
|
container = self.client.containers.run(
|
|
self.DOCKER_IMAGE,
|
|
command=command,
|
|
detach=True,
|
|
remove=False,
|
|
network_mode="bridge",
|
|
mem_limit="512m",
|
|
cpu_period=100000,
|
|
cpu_quota=50000, # 50% CPU
|
|
volumes={},
|
|
environment={
|
|
"TERM": "xterm"
|
|
}
|
|
)
|
|
|
|
container_id = container.id[:12]
|
|
self.active_containers[container_id] = container
|
|
|
|
# Wait for container to finish
|
|
try:
|
|
exit_code = container.wait(timeout=timeout)
|
|
|
|
# Get output
|
|
logs = container.logs(stdout=True, stderr=True)
|
|
output = logs.decode('utf-8', errors='replace')
|
|
|
|
# Truncate if too large
|
|
if len(output) > self.MAX_OUTPUT_SIZE:
|
|
output = output[:self.MAX_OUTPUT_SIZE] + "\n... [output truncated]"
|
|
|
|
# Try to get output file
|
|
try:
|
|
output_file = tool_config.get("output_file")
|
|
if output_file:
|
|
bits, stat = container.get_archive(output_file)
|
|
# Extract file content from tar
|
|
import tarfile
|
|
import io
|
|
tar_stream = io.BytesIO()
|
|
for chunk in bits:
|
|
tar_stream.write(chunk)
|
|
tar_stream.seek(0)
|
|
with tarfile.open(fileobj=tar_stream) as tar:
|
|
for member in tar.getmembers():
|
|
if member.isfile():
|
|
f = tar.extractfile(member)
|
|
if f:
|
|
file_content = f.read().decode('utf-8', errors='replace')
|
|
output = file_content
|
|
except Exception:
|
|
pass # Use container logs as output
|
|
|
|
result.output = output
|
|
result.status = ToolStatus.COMPLETED if exit_code.get('StatusCode', 1) == 0 else ToolStatus.FAILED
|
|
|
|
except Exception as e:
|
|
if "timeout" in str(e).lower() or "read timeout" in str(e).lower():
|
|
result.status = ToolStatus.TIMEOUT
|
|
result.error = f"Tool execution timed out after {timeout}s"
|
|
container.kill()
|
|
else:
|
|
raise
|
|
|
|
except Exception as e:
|
|
result.status = ToolStatus.FAILED
|
|
result.error = str(e)
|
|
logger.error(f"Tool execution failed: {e}")
|
|
|
|
finally:
|
|
# Cleanup container
|
|
if container:
|
|
try:
|
|
container.remove(force=True)
|
|
except Exception:
|
|
pass
|
|
self.active_containers.pop(container.id[:12], None)
|
|
|
|
completed_at = datetime.utcnow()
|
|
result.completed_at = completed_at.isoformat()
|
|
result.duration_seconds = (completed_at - started_at).total_seconds()
|
|
|
|
# Parse findings from output
|
|
if result.status == ToolStatus.COMPLETED and result.output:
|
|
parser_name = tool_config.get("parser")
|
|
if parser_name and hasattr(self, parser_name):
|
|
parser = getattr(self, parser_name)
|
|
result.findings = parser(result.output, target)
|
|
|
|
return result
|
|
|
|
async def kill_container(self, container_id: str) -> bool:
|
|
"""Kill a running container"""
|
|
container = self.active_containers.get(container_id)
|
|
if container:
|
|
try:
|
|
container.kill()
|
|
container.remove(force=True)
|
|
del self.active_containers[container_id]
|
|
return True
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
async def cleanup_all(self):
|
|
"""Cleanup all running containers"""
|
|
for container_id in list(self.active_containers.keys()):
|
|
await self.kill_container(container_id)
|
|
|
|
# ==================== Output Parsers ====================
|
|
|
|
def parse_dirb_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse dirb output into findings"""
|
|
findings = []
|
|
|
|
# Match lines like: + http://example.com/admin (CODE:200|SIZE:1234)
|
|
pattern = r'\+ (https?://[^\s]+)\s+\(CODE:(\d+)\|SIZE:(\d+)\)'
|
|
matches = re.findall(pattern, output)
|
|
|
|
for url, code, size in matches:
|
|
severity = "info"
|
|
if "/admin" in url.lower() or "/panel" in url.lower():
|
|
severity = "medium"
|
|
elif ".env" in url or "config" in url.lower() or ".git" in url:
|
|
severity = "high"
|
|
|
|
findings.append({
|
|
"title": f"Directory/File Found: {url.split('/')[-1] or url}",
|
|
"severity": severity,
|
|
"vulnerability_type": "Information Disclosure",
|
|
"description": f"Accessible endpoint discovered at {url}",
|
|
"affected_endpoint": url,
|
|
"evidence": f"HTTP {code}, Size: {size} bytes",
|
|
"remediation": "Review if this endpoint should be publicly accessible"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_feroxbuster_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse feroxbuster JSON output"""
|
|
findings = []
|
|
|
|
for line in output.split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
data = json.loads(line)
|
|
url = data.get('url', '')
|
|
status = data.get('status', 0)
|
|
|
|
if status in [200, 301, 302, 403]:
|
|
severity = "info"
|
|
if "/admin" in url.lower() or status == 403:
|
|
severity = "medium"
|
|
elif ".env" in url or ".git" in url:
|
|
severity = "high"
|
|
|
|
findings.append({
|
|
"title": f"Endpoint: {url.split('/')[-1] or url}",
|
|
"severity": severity,
|
|
"vulnerability_type": "Information Disclosure",
|
|
"description": f"Discovered endpoint: {url}",
|
|
"affected_endpoint": url,
|
|
"evidence": f"HTTP {status}",
|
|
"remediation": "Review endpoint accessibility"
|
|
})
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
return findings
|
|
|
|
def parse_ffuf_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse ffuf JSON output"""
|
|
findings = []
|
|
|
|
try:
|
|
data = json.loads(output)
|
|
results = data.get('results', [])
|
|
|
|
for result in results:
|
|
url = result.get('url', '')
|
|
status = result.get('status', 0)
|
|
length = result.get('length', 0)
|
|
|
|
severity = "info"
|
|
path = url.lower()
|
|
if any(x in path for x in ['/admin', '/panel', '/dashboard']):
|
|
severity = "medium"
|
|
elif any(x in path for x in ['.env', '.git', 'config', 'backup']):
|
|
severity = "high"
|
|
|
|
findings.append({
|
|
"title": f"Found: {url.split('/')[-1]}",
|
|
"severity": severity,
|
|
"vulnerability_type": "Content Discovery",
|
|
"description": f"Discovered: {url}",
|
|
"affected_endpoint": url,
|
|
"evidence": f"HTTP {status}, Length: {length}",
|
|
"remediation": "Review if endpoint should be accessible"
|
|
})
|
|
except json.JSONDecodeError:
|
|
# Fall back to text parsing
|
|
pass
|
|
|
|
return findings
|
|
|
|
def parse_gobuster_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse gobuster output"""
|
|
findings = []
|
|
|
|
for line in output.split('\n'):
|
|
# Match: /admin (Status: 200) [Size: 1234]
|
|
match = re.search(r'(/[^\s]+)\s+\(Status:\s*(\d+)\)', line)
|
|
if match:
|
|
path = match.group(1)
|
|
status = match.group(2)
|
|
url = target.rstrip('/') + path
|
|
|
|
severity = "info"
|
|
if any(x in path.lower() for x in ['/admin', '/panel']):
|
|
severity = "medium"
|
|
elif any(x in path.lower() for x in ['.env', '.git', 'config']):
|
|
severity = "high"
|
|
|
|
findings.append({
|
|
"title": f"Found: {path}",
|
|
"severity": severity,
|
|
"vulnerability_type": "Content Discovery",
|
|
"description": f"Discovered endpoint at {url}",
|
|
"affected_endpoint": url,
|
|
"evidence": f"HTTP {status}",
|
|
"remediation": "Review endpoint accessibility"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_nuclei_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse nuclei JSONL output"""
|
|
findings = []
|
|
|
|
severity_map = {
|
|
"critical": "critical",
|
|
"high": "high",
|
|
"medium": "medium",
|
|
"low": "low",
|
|
"info": "info"
|
|
}
|
|
|
|
for line in output.split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
data = json.loads(line)
|
|
|
|
findings.append({
|
|
"title": data.get('info', {}).get('name', 'Unknown'),
|
|
"severity": severity_map.get(
|
|
data.get('info', {}).get('severity', 'info'),
|
|
'info'
|
|
),
|
|
"vulnerability_type": data.get('info', {}).get('tags', ['vulnerability'])[0] if data.get('info', {}).get('tags') else 'vulnerability',
|
|
"description": data.get('info', {}).get('description', ''),
|
|
"affected_endpoint": data.get('matched-at', target),
|
|
"evidence": data.get('matcher-name', ''),
|
|
"remediation": data.get('info', {}).get('remediation', 'Review and fix the vulnerability'),
|
|
"references": data.get('info', {}).get('reference', [])
|
|
})
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
return findings
|
|
|
|
def parse_nmap_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse nmap output"""
|
|
findings = []
|
|
|
|
# Parse open ports
|
|
port_pattern = r'(\d+)/tcp\s+open\s+(\S+)\s*(.*)?'
|
|
for match in re.finditer(port_pattern, output):
|
|
port = match.group(1)
|
|
service = match.group(2)
|
|
version = match.group(3) or ''
|
|
|
|
severity = "info"
|
|
if service in ['telnet', 'ftp']:
|
|
severity = "medium"
|
|
elif 'vnc' in service.lower() or 'rdp' in service.lower():
|
|
severity = "medium"
|
|
|
|
findings.append({
|
|
"title": f"Open Port: {port}/{service}",
|
|
"severity": severity,
|
|
"vulnerability_type": "Open Port",
|
|
"description": f"Port {port} is open running {service} {version}".strip(),
|
|
"affected_endpoint": f"{target}:{port}",
|
|
"evidence": f"Service: {service}, Version: {version}",
|
|
"remediation": "Review if this port should be exposed"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_nikto_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse nikto output"""
|
|
findings = []
|
|
|
|
# Parse OSVDB entries and other findings
|
|
vuln_pattern = r'\+\s+(\S+):\s+(.+)'
|
|
for match in re.finditer(vuln_pattern, output):
|
|
ref = match.group(1)
|
|
desc = match.group(2)
|
|
|
|
severity = "info"
|
|
if any(x in desc.lower() for x in ['sql', 'injection', 'xss']):
|
|
severity = "high"
|
|
elif any(x in desc.lower() for x in ['outdated', 'vulnerable', 'dangerous']):
|
|
severity = "medium"
|
|
|
|
findings.append({
|
|
"title": f"Nikto: {desc[:50]}...",
|
|
"severity": severity,
|
|
"vulnerability_type": "Web Vulnerability",
|
|
"description": desc,
|
|
"affected_endpoint": target,
|
|
"evidence": ref,
|
|
"remediation": "Review and address the finding"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_sqlmap_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse sqlmap output"""
|
|
findings = []
|
|
|
|
if "is vulnerable" in output.lower() or "sql injection" in output.lower():
|
|
# Extract vulnerable parameter
|
|
param_match = re.search(r"Parameter:\s*(\S+)", output)
|
|
param = param_match.group(1) if param_match else "unknown"
|
|
|
|
findings.append({
|
|
"title": f"SQL Injection: {param}",
|
|
"severity": "critical",
|
|
"vulnerability_type": "SQL Injection",
|
|
"description": f"SQL injection vulnerability found in parameter: {param}",
|
|
"affected_endpoint": target,
|
|
"evidence": "SQLMap confirmed the vulnerability",
|
|
"remediation": "Use parameterized queries and input validation"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_whatweb_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse whatweb JSON output"""
|
|
findings = []
|
|
|
|
try:
|
|
data = json.loads(output)
|
|
if isinstance(data, list) and len(data) > 0:
|
|
result = data[0]
|
|
plugins = result.get('plugins', {})
|
|
|
|
techs = []
|
|
for name, info in plugins.items():
|
|
if name not in ['IP', 'Country']:
|
|
version = info.get('version', [''])[0] if info.get('version') else ''
|
|
techs.append(f"{name} {version}".strip())
|
|
|
|
if techs:
|
|
findings.append({
|
|
"title": "Technology Stack Detected",
|
|
"severity": "info",
|
|
"vulnerability_type": "Information Disclosure",
|
|
"description": f"Detected technologies: {', '.join(techs)}",
|
|
"affected_endpoint": target,
|
|
"evidence": ", ".join(techs),
|
|
"remediation": "Consider hiding version information"
|
|
})
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
return findings
|
|
|
|
def parse_httpx_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse httpx JSON output"""
|
|
findings = []
|
|
|
|
for line in output.split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
data = json.loads(line)
|
|
|
|
techs = data.get('tech', [])
|
|
title = data.get('title', '')
|
|
status = data.get('status_code', 0)
|
|
|
|
if techs:
|
|
findings.append({
|
|
"title": f"Technologies: {', '.join(techs[:3])}",
|
|
"severity": "info",
|
|
"vulnerability_type": "Technology Detection",
|
|
"description": f"Page title: {title}. Technologies: {', '.join(techs)}",
|
|
"affected_endpoint": data.get('url', target),
|
|
"evidence": f"HTTP {status}",
|
|
"remediation": "Review exposed technology information"
|
|
})
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
return findings
|
|
|
|
def parse_katana_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse katana output"""
|
|
findings = []
|
|
endpoints = set()
|
|
|
|
for line in output.split('\n'):
|
|
url = line.strip()
|
|
if url and url.startswith('http'):
|
|
endpoints.add(url)
|
|
|
|
# Group interesting findings
|
|
interesting = [u for u in endpoints if any(x in u.lower() for x in [
|
|
'api', 'admin', 'login', 'upload', 'config', '.php', '.asp'
|
|
])]
|
|
|
|
for url in interesting[:20]: # Limit findings
|
|
findings.append({
|
|
"title": f"Interesting Endpoint: {url.split('/')[-1][:30]}",
|
|
"severity": "info",
|
|
"vulnerability_type": "Endpoint Discovery",
|
|
"description": f"Crawled endpoint: {url}",
|
|
"affected_endpoint": url,
|
|
"evidence": "Discovered via web crawling",
|
|
"remediation": "Review endpoint for security issues"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_subfinder_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse subfinder output"""
|
|
findings = []
|
|
subdomains = [s.strip() for s in output.split('\n') if s.strip()]
|
|
|
|
if subdomains:
|
|
findings.append({
|
|
"title": f"Subdomains Found: {len(subdomains)}",
|
|
"severity": "info",
|
|
"vulnerability_type": "Subdomain Enumeration",
|
|
"description": f"Found {len(subdomains)} subdomains: {', '.join(subdomains[:10])}{'...' if len(subdomains) > 10 else ''}",
|
|
"affected_endpoint": target,
|
|
"evidence": "\n".join(subdomains[:20]),
|
|
"remediation": "Review all subdomains for security"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_dalfox_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse dalfox output"""
|
|
findings = []
|
|
|
|
# Look for XSS findings
|
|
if "POC" in output or "Vulnerable" in output.lower():
|
|
poc_match = re.search(r'POC:\s*(\S+)', output)
|
|
poc = poc_match.group(1) if poc_match else "See output"
|
|
|
|
findings.append({
|
|
"title": "XSS Vulnerability Found",
|
|
"severity": "high",
|
|
"vulnerability_type": "Cross-Site Scripting (XSS)",
|
|
"description": "Dalfox found a potential XSS vulnerability",
|
|
"affected_endpoint": target,
|
|
"evidence": poc,
|
|
"remediation": "Implement proper output encoding and CSP"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_naabu_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse naabu JSON output"""
|
|
findings = []
|
|
ports = []
|
|
|
|
for line in output.split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
data = json.loads(line)
|
|
host = data.get('host', data.get('ip', ''))
|
|
port = data.get('port', 0)
|
|
ports.append(str(port))
|
|
except json.JSONDecodeError:
|
|
# Text mode: host:port
|
|
match = re.match(r'^(.+?):(\d+)$', line.strip())
|
|
if match:
|
|
ports.append(match.group(2))
|
|
|
|
if ports:
|
|
findings.append({
|
|
"title": f"Open Ports Found: {len(ports)}",
|
|
"severity": "info",
|
|
"vulnerability_type": "Port Discovery",
|
|
"description": f"Found {len(ports)} open ports: {', '.join(ports[:20])}",
|
|
"affected_endpoint": target,
|
|
"evidence": f"Ports: {', '.join(ports)}",
|
|
"remediation": "Review exposed services and close unnecessary ports"
|
|
})
|
|
|
|
return findings
|
|
|
|
def parse_dnsx_output(self, output: str, target: str) -> List[Dict]:
|
|
"""Parse dnsx output"""
|
|
findings = []
|
|
records = [line.strip() for line in output.split('\n') if line.strip()]
|
|
|
|
if records:
|
|
findings.append({
|
|
"title": f"DNS Records: {len(records)}",
|
|
"severity": "info",
|
|
"vulnerability_type": "DNS Enumeration",
|
|
"description": f"DNS records found: {', '.join(records[:10])}",
|
|
"affected_endpoint": target,
|
|
"evidence": "\n".join(records[:20]),
|
|
"remediation": "Review DNS records for security issues"
|
|
})
|
|
|
|
return findings
|
|
|
|
|
|
# Global executor instance
|
|
_executor: Optional[DockerToolExecutor] = None
|
|
|
|
|
|
async def get_tool_executor() -> DockerToolExecutor:
|
|
"""Get or create the global tool executor instance"""
|
|
global _executor
|
|
if _executor is None:
|
|
_executor = DockerToolExecutor()
|
|
await _executor.initialize()
|
|
return _executor
|