Files
NeuroSploit/agents/lateral_agent.py
2025-12-18 18:18:29 -03:00

200 lines
6.9 KiB
Python

#!/usr/bin/env python3
"""
Lateral Movement Agent - Move through the network
"""
import json
import logging
from typing import Dict, List
from core.llm_manager import LLMManager
logger = logging.getLogger(__name__)
class LateralMovementAgent:
"""Agent responsible for lateral movement"""
def __init__(self, config: Dict):
"""Initialize lateral movement agent"""
self.config = config
self.llm = LLMManager(config)
logger.info("LateralMovementAgent initialized")
def execute(self, target: str, context: Dict) -> Dict:
"""Execute lateral movement phase"""
logger.info(f"Starting lateral movement from {target}")
results = {
"target": target,
"status": "running",
"discovered_hosts": [],
"compromised_hosts": [],
"credentials_used": [],
"movement_paths": [],
"ai_analysis": {}
}
try:
# Get previous phase data
recon_data = context.get("phases", {}).get("recon", {})
privesc_data = context.get("phases", {}).get("privilege_escalation", {})
# Phase 1: Network Discovery
logger.info("Phase 1: Internal network discovery")
results["discovered_hosts"] = self._discover_internal_network(recon_data)
# Phase 2: AI-Powered Movement Strategy
logger.info("Phase 2: AI lateral movement strategy")
strategy = self._ai_movement_strategy(context, results["discovered_hosts"])
results["ai_analysis"] = strategy
# Phase 3: Credential Reuse
logger.info("Phase 3: Credential reuse attacks")
credentials = privesc_data.get("credentials_harvested", [])
results["credentials_used"] = self._attempt_credential_reuse(
results["discovered_hosts"],
credentials
)
# Phase 4: Pass-the-Hash/Pass-the-Ticket
logger.info("Phase 4: Pass-the-Hash/Ticket attacks")
results["movement_paths"].extend(
self._pass_the_hash_attacks(results["discovered_hosts"])
)
# Phase 5: Exploit Trust Relationships
logger.info("Phase 5: Exploiting trust relationships")
results["movement_paths"].extend(
self._exploit_trust_relationships(results["discovered_hosts"])
)
results["status"] = "completed"
logger.info("Lateral movement phase completed")
except Exception as e:
logger.error(f"Error during lateral movement: {e}")
results["status"] = "error"
results["error"] = str(e)
return results
def _discover_internal_network(self, recon_data: Dict) -> List[Dict]:
"""Discover internal network hosts"""
hosts = []
# Extract hosts from recon data
network_scan = recon_data.get("network_scan", {})
for ip, data in network_scan.get("hosts", {}).items():
hosts.append({
"ip": ip,
"ports": data.get("open_ports", []),
"os": data.get("os", "unknown")
})
# Simulate additional internal discovery
hosts.extend([
{"ip": "192.168.1.10", "role": "domain_controller", "status": "discovered"},
{"ip": "192.168.1.20", "role": "file_server", "status": "discovered"},
{"ip": "192.168.1.30", "role": "workstation", "status": "discovered"}
])
return hosts
def _ai_movement_strategy(self, context: Dict, hosts: List[Dict]) -> Dict:
"""Use AI to plan lateral movement"""
prompt = self.llm.get_prompt(
"lateral_movement",
"ai_movement_strategy_user",
default=f"""
Plan a lateral movement strategy based on the following:
Current Context:
{json.dumps(context, indent=2)}
Discovered Hosts:
{json.dumps(hosts, indent=2)}
Provide:
1. Target prioritization (high-value targets first)
2. Movement techniques for each target
3. Credential strategies
4. Evasion techniques
5. Attack path optimization
6. Fallback options
Response in JSON format with detailed attack paths.
"""
)
system_prompt = self.llm.get_prompt(
"lateral_movement",
"ai_movement_strategy_system",
default="""You are an expert in lateral movement and Active Directory attacks.
Plan sophisticated movement strategies that minimize detection and maximize impact.
Consider Pass-the-Hash, Pass-the-Ticket, RDP, WMI, PSExec, and other techniques.
Prioritize domain controllers and critical infrastructure."""
)
try:
formatted_prompt = prompt.format(
context_json=json.dumps(context, indent=2),
hosts_json=json.dumps(hosts, indent=2)
)
response = self.llm.generate(formatted_prompt, system_prompt)
return json.loads(response)
except Exception as e:
logger.error(f"AI movement strategy error: {e}")
return {"error": str(e)}
def _attempt_credential_reuse(self, hosts: List[Dict], credentials: List[Dict]) -> List[Dict]:
"""Attempt credential reuse across hosts"""
attempts = []
for host in hosts[:5]: # Limit attempts
for cred in credentials[:3]:
attempts.append({
"host": host.get("ip"),
"credential": "***hidden***",
"protocol": "SMB",
"success": False, # Simulated
"status": "simulated"
})
return attempts
def _pass_the_hash_attacks(self, hosts: List[Dict]) -> List[Dict]:
"""Perform Pass-the-Hash attacks"""
attacks = []
for host in hosts:
if host.get("role") in ["domain_controller", "file_server"]:
attacks.append({
"type": "pass_the_hash",
"target": host.get("ip"),
"technique": "SMB relay",
"success": False, # Simulated
"status": "simulated"
})
return attacks
def _exploit_trust_relationships(self, hosts: List[Dict]) -> List[Dict]:
"""Exploit trust relationships"""
exploits = []
# Domain trust exploitation
exploits.append({
"type": "domain_trust",
"description": "Cross-domain exploitation",
"status": "simulated"
})
# Kerberos delegation
exploits.append({
"type": "kerberos_delegation",
"description": "Unconstrained delegation abuse",
"status": "simulated"
})
return exploits