Files
NeuroSploit/agents/privesc_agent.py
2025-12-18 18:18:29 -03:00

306 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Privilege Escalation Agent - System privilege elevation
"""
import json
import logging
from typing import Dict, List
from core.llm_manager import LLMManager
from tools.privesc import (
LinuxPrivEsc,
WindowsPrivEsc,
KernelExploiter,
MisconfigFinder,
CredentialHarvester,
SudoExploiter
)
logger = logging.getLogger(__name__)
class PrivEscAgent:
"""Agent responsible for privilege escalation"""
def __init__(self, config: Dict):
"""Initialize privilege escalation agent"""
self.config = config
self.llm = LLMManager(config)
self.linux_privesc = LinuxPrivEsc(config)
self.windows_privesc = WindowsPrivEsc(config)
self.kernel_exploiter = KernelExploiter(config)
self.misconfig_finder = MisconfigFinder(config)
self.cred_harvester = CredentialHarvester(config)
self.sudo_exploiter = SudoExploiter(config)
logger.info("PrivEscAgent initialized")
def execute(self, target: str, context: Dict) -> Dict:
"""Execute privilege escalation phase"""
logger.info(f"Starting privilege escalation on {target}")
results = {
"target": target,
"status": "running",
"escalation_paths": [],
"successful_escalations": [],
"credentials_harvested": [],
"system_info": {},
"ai_analysis": {}
}
try:
# Get exploitation data from context
exploit_data = context.get("phases", {}).get("exploitation", {})
if not exploit_data.get("successful_exploits"):
logger.warning("No successful exploits found. Limited privilege escalation options.")
results["status"] = "skipped"
results["message"] = "No initial access obtained"
return results
# Phase 1: System Enumeration
logger.info("Phase 1: System enumeration")
results["system_info"] = self._enumerate_system(exploit_data)
# Phase 2: Identify Escalation Paths
logger.info("Phase 2: Identifying escalation paths")
results["escalation_paths"] = self._identify_escalation_paths(
results["system_info"]
)
# Phase 3: AI-Powered Path Selection
logger.info("Phase 3: AI escalation strategy")
strategy = self._ai_escalation_strategy(
results["system_info"],
results["escalation_paths"]
)
results["ai_analysis"] = strategy
# Phase 4: Execute Escalation Attempts
logger.info("Phase 4: Executing escalation attempts")
for path in results["escalation_paths"][:5]:
escalation_result = self._attempt_escalation(path, results["system_info"])
if escalation_result.get("success"):
results["successful_escalations"].append(escalation_result)
logger.info(f"Successful escalation: {path.get('technique')}")
break # Stop after first successful escalation
# Phase 5: Credential Harvesting
if results["successful_escalations"]:
logger.info("Phase 5: Harvesting credentials")
results["credentials_harvested"] = self._harvest_credentials(
results["system_info"]
)
results["status"] = "completed"
logger.info("Privilege escalation phase completed")
except Exception as e:
logger.error(f"Error during privilege escalation: {e}")
results["status"] = "error"
results["error"] = str(e)
return results
def _enumerate_system(self, exploit_data: Dict) -> Dict:
"""Enumerate system for privilege escalation opportunities"""
system_info = {
"os": "unknown",
"kernel_version": "unknown",
"architecture": "unknown",
"users": [],
"groups": [],
"sudo_permissions": [],
"suid_binaries": [],
"writable_paths": [],
"scheduled_tasks": [],
"services": [],
"environment_variables": {}
}
# Determine OS type from exploit data
os_type = self._detect_os_type(exploit_data)
system_info["os"] = os_type
if os_type == "linux":
system_info.update(self.linux_privesc.enumerate())
elif os_type == "windows":
system_info.update(self.windows_privesc.enumerate())
return system_info
def _detect_os_type(self, exploit_data: Dict) -> str:
"""Detect operating system type"""
# Placeholder - would analyze exploit data to determine OS
return "linux" # Default assumption
def _identify_escalation_paths(self, system_info: Dict) -> List[Dict]:
"""Identify possible privilege escalation paths"""
paths = []
os_type = system_info.get("os")
if os_type == "linux":
# SUID exploitation
for binary in system_info.get("suid_binaries", []):
paths.append({
"technique": "suid_exploitation",
"target": binary,
"difficulty": "medium",
"likelihood": 0.6
})
# Sudo exploitation
for permission in system_info.get("sudo_permissions", []):
paths.append({
"technique": "sudo_exploitation",
"target": permission,
"difficulty": "low",
"likelihood": 0.8
})
# Kernel exploitation
if system_info.get("kernel_version"):
paths.append({
"technique": "kernel_exploit",
"target": system_info["kernel_version"],
"difficulty": "high",
"likelihood": 0.4
})
# Writable path exploitation
for path in system_info.get("writable_paths", []):
if "bin" in path or "sbin" in path:
paths.append({
"technique": "path_hijacking",
"target": path,
"difficulty": "medium",
"likelihood": 0.5
})
elif os_type == "windows":
# Service exploitation
for service in system_info.get("services", []):
if service.get("unquoted_path") or service.get("weak_permissions"):
paths.append({
"technique": "service_exploitation",
"target": service,
"difficulty": "medium",
"likelihood": 0.7
})
# AlwaysInstallElevated
if system_info.get("always_install_elevated"):
paths.append({
"technique": "always_install_elevated",
"target": "MSI",
"difficulty": "low",
"likelihood": 0.9
})
# Token impersonation
paths.append({
"technique": "token_impersonation",
"target": "SeImpersonatePrivilege",
"difficulty": "medium",
"likelihood": 0.6
})
# Sort by likelihood
paths.sort(key=lambda x: x.get("likelihood", 0), reverse=True)
return paths
def _ai_escalation_strategy(self, system_info: Dict, escalation_paths: List[Dict]) -> Dict:
"""Use AI to optimize escalation strategy"""
prompt = self.llm.get_prompt(
"privesc",
"ai_escalation_strategy_user",
default=f"""
Analyze the system and recommend optimal privilege escalation strategy:
System Information:
{json.dumps(system_info, indent=2)}
Identified Escalation Paths:
{json.dumps(escalation_paths, indent=2)}
Provide:
1. Recommended escalation path (with justification)
2. Step-by-step execution plan
3. Required tools and commands
4. Detection likelihood and evasion techniques
5. Fallback options
6. Post-escalation actions
Response in JSON format with actionable recommendations.
"""
)
system_prompt = self.llm.get_prompt(
"privesc",
"ai_escalation_strategy_system",
default="""You are an expert in privilege escalation techniques.
Analyze systems and recommend the most effective, stealthy escalation paths.
Consider Windows, Linux, and Active Directory environments.
Prioritize reliability and minimal detection."""
)
try:
formatted_prompt = prompt.format(
system_info_json=json.dumps(system_info, indent=2),
escalation_paths_json=json.dumps(escalation_paths, indent=2)
)
response = self.llm.generate(formatted_prompt, system_prompt)
return json.loads(response)
except Exception as e:
logger.error(f"AI escalation strategy error: {e}")
return {"error": str(e)}
def _attempt_escalation(self, path: Dict, system_info: Dict) -> Dict:
"""Attempt privilege escalation using specified path"""
technique = path.get("technique")
os_type = system_info.get("os")
result = {
"technique": technique,
"success": False,
"details": {}
}
try:
if os_type == "linux":
if technique == "suid_exploitation":
result = self.linux_privesc.exploit_suid(path.get("target"))
elif technique == "sudo_exploitation":
result = self.sudo_exploiter.exploit(path.get("target"))
elif technique == "kernel_exploit":
result = self.kernel_exploiter.exploit_linux(path.get("target"))
elif technique == "path_hijacking":
result = self.linux_privesc.exploit_path_hijacking(path.get("target"))
elif os_type == "windows":
if technique == "service_exploitation":
result = self.windows_privesc.exploit_service(path.get("target"))
elif technique == "always_install_elevated":
result = self.windows_privesc.exploit_msi()
elif technique == "token_impersonation":
result = self.windows_privesc.impersonate_token()
except Exception as e:
logger.error(f"Escalation error for {technique}: {e}")
result["error"] = str(e)
return result
def _harvest_credentials(self, system_info: Dict) -> List[Dict]:
"""Harvest credentials after privilege escalation"""
os_type = system_info.get("os")
if os_type == "linux":
return self.cred_harvester.harvest_linux()
elif os_type == "windows":
return self.cred_harvester.harvest_windows()
return []