mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-07-01 01:05:31 +02:00
NeuroSploit v3.3.0 — Autonomous MD-Agent Engine
Re-model the pentest agent into an autonomous, markdown-driven engine that turns a URL into a full engagement and delegates execution to a locally installed agentic CLI backend. Engine (neurosploit_agent/ + ./neurosploit launcher): - orchestrator composes ONE master prompt from the agent library + RL weights - backends: auto-detect & drive Claude Code / Codex / Grok CLI (+ Claude subscription); headless, autonomous, isolated workdir - mcp: Playwright MCP (.mcp.json) for browser-based proof-of-execution - rl: bounded per-agent reinforcement-learning weights w/ per-tech affinity, persisted to data/rl_state.json - models: latest registry incl. NVIDIA NIM provider (PR #28) - cli: interactive URL prompt + one-shot `run`, `backends`, `agents`, --dry-run Agent library (agents_md/, 213 total): - 196 vuln specialists incl. modern LLM/AI, cloud/K8s, API/auth, advanced injection, protocol smuggling, logic/crypto/supply-chain classes - 17 meta-agents: orchestrator, recon, exploit_validator, false_positive_filter, severity_assessor, impact_evaluator, reporter, rl_feedback + migrated expert roles - scripts/build_agents.py data-driven builder; REGISTRY.md index Docs: rewritten README.md, v3.3.0 RELEASE.md, .env.example (NVIDIA NIM, xAI, engine vars). Retire legacy Python orchestration (neurosploit.py + agent classes) to legacy/. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Executable
+305
@@ -0,0 +1,305 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Privilege Escalation Agent - System privilege elevation
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Dict, List
|
||||
from core.llm_manager import LLMManager
|
||||
from tools.privesc import (
|
||||
LinuxPrivEsc,
|
||||
WindowsPrivEsc,
|
||||
KernelExploiter,
|
||||
MisconfigFinder,
|
||||
CredentialHarvester,
|
||||
SudoExploiter
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PrivEscAgent:
|
||||
"""Agent responsible for privilege escalation"""
|
||||
|
||||
def __init__(self, config: Dict):
|
||||
"""Initialize privilege escalation agent"""
|
||||
self.config = config
|
||||
self.llm = LLMManager(config)
|
||||
self.linux_privesc = LinuxPrivEsc(config)
|
||||
self.windows_privesc = WindowsPrivEsc(config)
|
||||
self.kernel_exploiter = KernelExploiter(config)
|
||||
self.misconfig_finder = MisconfigFinder(config)
|
||||
self.cred_harvester = CredentialHarvester(config)
|
||||
self.sudo_exploiter = SudoExploiter(config)
|
||||
|
||||
logger.info("PrivEscAgent initialized")
|
||||
|
||||
def execute(self, target: str, context: Dict) -> Dict:
|
||||
"""Execute privilege escalation phase"""
|
||||
logger.info(f"Starting privilege escalation on {target}")
|
||||
|
||||
results = {
|
||||
"target": target,
|
||||
"status": "running",
|
||||
"escalation_paths": [],
|
||||
"successful_escalations": [],
|
||||
"credentials_harvested": [],
|
||||
"system_info": {},
|
||||
"ai_analysis": {}
|
||||
}
|
||||
|
||||
try:
|
||||
# Get exploitation data from context
|
||||
exploit_data = context.get("phases", {}).get("exploitation", {})
|
||||
|
||||
if not exploit_data.get("successful_exploits"):
|
||||
logger.warning("No successful exploits found. Limited privilege escalation options.")
|
||||
results["status"] = "skipped"
|
||||
results["message"] = "No initial access obtained"
|
||||
return results
|
||||
|
||||
# Phase 1: System Enumeration
|
||||
logger.info("Phase 1: System enumeration")
|
||||
results["system_info"] = self._enumerate_system(exploit_data)
|
||||
|
||||
# Phase 2: Identify Escalation Paths
|
||||
logger.info("Phase 2: Identifying escalation paths")
|
||||
results["escalation_paths"] = self._identify_escalation_paths(
|
||||
results["system_info"]
|
||||
)
|
||||
|
||||
# Phase 3: AI-Powered Path Selection
|
||||
logger.info("Phase 3: AI escalation strategy")
|
||||
strategy = self._ai_escalation_strategy(
|
||||
results["system_info"],
|
||||
results["escalation_paths"]
|
||||
)
|
||||
results["ai_analysis"] = strategy
|
||||
|
||||
# Phase 4: Execute Escalation Attempts
|
||||
logger.info("Phase 4: Executing escalation attempts")
|
||||
for path in results["escalation_paths"][:5]:
|
||||
escalation_result = self._attempt_escalation(path, results["system_info"])
|
||||
|
||||
if escalation_result.get("success"):
|
||||
results["successful_escalations"].append(escalation_result)
|
||||
logger.info(f"Successful escalation: {path.get('technique')}")
|
||||
break # Stop after first successful escalation
|
||||
|
||||
# Phase 5: Credential Harvesting
|
||||
if results["successful_escalations"]:
|
||||
logger.info("Phase 5: Harvesting credentials")
|
||||
results["credentials_harvested"] = self._harvest_credentials(
|
||||
results["system_info"]
|
||||
)
|
||||
|
||||
results["status"] = "completed"
|
||||
logger.info("Privilege escalation phase completed")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during privilege escalation: {e}")
|
||||
results["status"] = "error"
|
||||
results["error"] = str(e)
|
||||
|
||||
return results
|
||||
|
||||
def _enumerate_system(self, exploit_data: Dict) -> Dict:
|
||||
"""Enumerate system for privilege escalation opportunities"""
|
||||
system_info = {
|
||||
"os": "unknown",
|
||||
"kernel_version": "unknown",
|
||||
"architecture": "unknown",
|
||||
"users": [],
|
||||
"groups": [],
|
||||
"sudo_permissions": [],
|
||||
"suid_binaries": [],
|
||||
"writable_paths": [],
|
||||
"scheduled_tasks": [],
|
||||
"services": [],
|
||||
"environment_variables": {}
|
||||
}
|
||||
|
||||
# Determine OS type from exploit data
|
||||
os_type = self._detect_os_type(exploit_data)
|
||||
system_info["os"] = os_type
|
||||
|
||||
if os_type == "linux":
|
||||
system_info.update(self.linux_privesc.enumerate())
|
||||
elif os_type == "windows":
|
||||
system_info.update(self.windows_privesc.enumerate())
|
||||
|
||||
return system_info
|
||||
|
||||
def _detect_os_type(self, exploit_data: Dict) -> str:
|
||||
"""Detect operating system type"""
|
||||
# Placeholder - would analyze exploit data to determine OS
|
||||
return "linux" # Default assumption
|
||||
|
||||
def _identify_escalation_paths(self, system_info: Dict) -> List[Dict]:
|
||||
"""Identify possible privilege escalation paths"""
|
||||
paths = []
|
||||
os_type = system_info.get("os")
|
||||
|
||||
if os_type == "linux":
|
||||
# SUID exploitation
|
||||
for binary in system_info.get("suid_binaries", []):
|
||||
paths.append({
|
||||
"technique": "suid_exploitation",
|
||||
"target": binary,
|
||||
"difficulty": "medium",
|
||||
"likelihood": 0.6
|
||||
})
|
||||
|
||||
# Sudo exploitation
|
||||
for permission in system_info.get("sudo_permissions", []):
|
||||
paths.append({
|
||||
"technique": "sudo_exploitation",
|
||||
"target": permission,
|
||||
"difficulty": "low",
|
||||
"likelihood": 0.8
|
||||
})
|
||||
|
||||
# Kernel exploitation
|
||||
if system_info.get("kernel_version"):
|
||||
paths.append({
|
||||
"technique": "kernel_exploit",
|
||||
"target": system_info["kernel_version"],
|
||||
"difficulty": "high",
|
||||
"likelihood": 0.4
|
||||
})
|
||||
|
||||
# Writable path exploitation
|
||||
for path in system_info.get("writable_paths", []):
|
||||
if "bin" in path or "sbin" in path:
|
||||
paths.append({
|
||||
"technique": "path_hijacking",
|
||||
"target": path,
|
||||
"difficulty": "medium",
|
||||
"likelihood": 0.5
|
||||
})
|
||||
|
||||
elif os_type == "windows":
|
||||
# Service exploitation
|
||||
for service in system_info.get("services", []):
|
||||
if service.get("unquoted_path") or service.get("weak_permissions"):
|
||||
paths.append({
|
||||
"technique": "service_exploitation",
|
||||
"target": service,
|
||||
"difficulty": "medium",
|
||||
"likelihood": 0.7
|
||||
})
|
||||
|
||||
# AlwaysInstallElevated
|
||||
if system_info.get("always_install_elevated"):
|
||||
paths.append({
|
||||
"technique": "always_install_elevated",
|
||||
"target": "MSI",
|
||||
"difficulty": "low",
|
||||
"likelihood": 0.9
|
||||
})
|
||||
|
||||
# Token impersonation
|
||||
paths.append({
|
||||
"technique": "token_impersonation",
|
||||
"target": "SeImpersonatePrivilege",
|
||||
"difficulty": "medium",
|
||||
"likelihood": 0.6
|
||||
})
|
||||
|
||||
# Sort by likelihood
|
||||
paths.sort(key=lambda x: x.get("likelihood", 0), reverse=True)
|
||||
return paths
|
||||
|
||||
def _ai_escalation_strategy(self, system_info: Dict, escalation_paths: List[Dict]) -> Dict:
|
||||
"""Use AI to optimize escalation strategy"""
|
||||
prompt = self.llm.get_prompt(
|
||||
"privesc",
|
||||
"ai_escalation_strategy_user",
|
||||
default=f"""
|
||||
Analyze the system and recommend optimal privilege escalation strategy:
|
||||
|
||||
System Information:
|
||||
{json.dumps(system_info, indent=2)}
|
||||
|
||||
Identified Escalation Paths:
|
||||
{json.dumps(escalation_paths, indent=2)}
|
||||
|
||||
Provide:
|
||||
1. Recommended escalation path (with justification)
|
||||
2. Step-by-step execution plan
|
||||
3. Required tools and commands
|
||||
4. Detection likelihood and evasion techniques
|
||||
5. Fallback options
|
||||
6. Post-escalation actions
|
||||
|
||||
Response in JSON format with actionable recommendations.
|
||||
"""
|
||||
)
|
||||
|
||||
system_prompt = self.llm.get_prompt(
|
||||
"privesc",
|
||||
"ai_escalation_strategy_system",
|
||||
default="""You are an expert in privilege escalation techniques.
|
||||
Analyze systems and recommend the most effective, stealthy escalation paths.
|
||||
Consider Windows, Linux, and Active Directory environments.
|
||||
Prioritize reliability and minimal detection."""
|
||||
)
|
||||
|
||||
try:
|
||||
formatted_prompt = prompt.format(
|
||||
system_info_json=json.dumps(system_info, indent=2),
|
||||
escalation_paths_json=json.dumps(escalation_paths, indent=2)
|
||||
)
|
||||
response = self.llm.generate(formatted_prompt, system_prompt)
|
||||
return json.loads(response)
|
||||
except Exception as e:
|
||||
logger.error(f"AI escalation strategy error: {e}")
|
||||
return {"error": str(e)}
|
||||
|
||||
def _attempt_escalation(self, path: Dict, system_info: Dict) -> Dict:
|
||||
"""Attempt privilege escalation using specified path"""
|
||||
technique = path.get("technique")
|
||||
os_type = system_info.get("os")
|
||||
|
||||
result = {
|
||||
"technique": technique,
|
||||
"success": False,
|
||||
"details": {}
|
||||
}
|
||||
|
||||
try:
|
||||
if os_type == "linux":
|
||||
if technique == "suid_exploitation":
|
||||
result = self.linux_privesc.exploit_suid(path.get("target"))
|
||||
elif technique == "sudo_exploitation":
|
||||
result = self.sudo_exploiter.exploit(path.get("target"))
|
||||
elif technique == "kernel_exploit":
|
||||
result = self.kernel_exploiter.exploit_linux(path.get("target"))
|
||||
elif technique == "path_hijacking":
|
||||
result = self.linux_privesc.exploit_path_hijacking(path.get("target"))
|
||||
|
||||
elif os_type == "windows":
|
||||
if technique == "service_exploitation":
|
||||
result = self.windows_privesc.exploit_service(path.get("target"))
|
||||
elif technique == "always_install_elevated":
|
||||
result = self.windows_privesc.exploit_msi()
|
||||
elif technique == "token_impersonation":
|
||||
result = self.windows_privesc.impersonate_token()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Escalation error for {technique}: {e}")
|
||||
result["error"] = str(e)
|
||||
|
||||
return result
|
||||
|
||||
def _harvest_credentials(self, system_info: Dict) -> List[Dict]:
|
||||
"""Harvest credentials after privilege escalation"""
|
||||
os_type = system_info.get("os")
|
||||
|
||||
if os_type == "linux":
|
||||
return self.cred_harvester.harvest_linux()
|
||||
elif os_type == "windows":
|
||||
return self.cred_harvester.harvest_windows()
|
||||
|
||||
return []
|
||||
Reference in New Issue
Block a user