mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-06-30 16:25:48 +02:00
55af0d4634
Re-model the pentest agent into an autonomous, markdown-driven engine that turns a URL into a full engagement and delegates execution to a locally installed agentic CLI backend. Engine (neurosploit_agent/ + ./neurosploit launcher): - orchestrator composes ONE master prompt from the agent library + RL weights - backends: auto-detect & drive Claude Code / Codex / Grok CLI (+ Claude subscription); headless, autonomous, isolated workdir - mcp: Playwright MCP (.mcp.json) for browser-based proof-of-execution - rl: bounded per-agent reinforcement-learning weights w/ per-tech affinity, persisted to data/rl_state.json - models: latest registry incl. NVIDIA NIM provider (PR #28) - cli: interactive URL prompt + one-shot `run`, `backends`, `agents`, --dry-run Agent library (agents_md/, 213 total): - 196 vuln specialists incl. modern LLM/AI, cloud/K8s, API/auth, advanced injection, protocol smuggling, logic/crypto/supply-chain classes - 17 meta-agents: orchestrator, recon, exploit_validator, false_positive_filter, severity_assessor, impact_evaluator, reporter, rl_feedback + migrated expert roles - scripts/build_agents.py data-driven builder; REGISTRY.md index Docs: rewritten README.md, v3.3.0 RELEASE.md, .env.example (NVIDIA NIM, xAI, engine vars). Retire legacy Python orchestration (neurosploit.py + agent classes) to legacy/. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
149 lines
5.3 KiB
Python
Executable File
149 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Network Reconnaissance Agent - Network-focused information gathering and enumeration
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
from typing import Dict, List
|
|
import logging
|
|
from core.llm_manager import LLMManager
|
|
from tools.recon import (
|
|
NetworkScanner,
|
|
OSINTCollector,
|
|
DNSEnumerator,
|
|
SubdomainFinder
|
|
)
|
|
from urllib.parse import urlparse # Added import
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class NetworkReconAgent:
|
|
"""Agent responsible for network-focused reconnaissance and information gathering"""
|
|
|
|
def __init__(self, config: Dict):
|
|
"""Initialize network reconnaissance agent"""
|
|
self.config = config
|
|
self.llm = LLMManager(config)
|
|
self.network_scanner = NetworkScanner(config)
|
|
self.osint = OSINTCollector(config)
|
|
self.dns_enum = DNSEnumerator(config)
|
|
self.subdomain_finder = SubdomainFinder(config)
|
|
|
|
logger.info("NetworkReconAgent initialized")
|
|
|
|
def execute(self, target: str, context: Dict) -> Dict:
|
|
"""Execute network reconnaissance phase"""
|
|
logger.info(f"Starting network reconnaissance on {target}")
|
|
|
|
results = {
|
|
"target": target,
|
|
"status": "running",
|
|
"findings": [],
|
|
"network_scan": {},
|
|
"osint": {},
|
|
"dns": {},
|
|
"subdomains": [],
|
|
"ai_analysis": {}
|
|
}
|
|
|
|
# Parse target to extract hostname if it's a URL
|
|
parsed_target = urlparse(target)
|
|
target_host = parsed_target.hostname or target # Use hostname if exists, otherwise original target
|
|
logger.info(f"Target for network tools: {target_host}")
|
|
|
|
try:
|
|
# Phase 1: Network Scanning
|
|
logger.info("Phase 1: Network scanning")
|
|
results["network_scan"] = self.network_scanner.scan(target_host) # Use target_host
|
|
|
|
# Phase 2: DNS Enumeration
|
|
logger.info("Phase 2: DNS enumeration")
|
|
results["dns"] = self.dns_enum.enumerate(target_host) # Use target_host
|
|
|
|
# Phase 3: Subdomain Discovery
|
|
logger.info("Phase 3: Subdomain discovery")
|
|
results["subdomains"] = self.subdomain_finder.find(target_host) # Use target_host
|
|
|
|
# Phase 4: OSINT Collection
|
|
logger.info("Phase 4: OSINT collection")
|
|
results["osint"] = self.osint.collect(target_host) # Use target_host
|
|
|
|
# Phase 5: AI Analysis
|
|
logger.info("Phase 5: AI-powered analysis")
|
|
results["ai_analysis"] = self._ai_analysis(results)
|
|
|
|
results["status"] = "completed"
|
|
logger.info("Network reconnaissance phase completed")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during network reconnaissance: {e}")
|
|
results["status"] = "error"
|
|
results["error"] = str(e)
|
|
|
|
return results
|
|
|
|
def _ai_analysis(self, recon_data: Dict) -> Dict:
|
|
"""Use AI to analyze reconnaissance data"""
|
|
prompt = self.llm.get_prompt(
|
|
"network_recon",
|
|
"ai_analysis_user",
|
|
default=f"""
|
|
Analyze the following network reconnaissance data and provide insights:
|
|
|
|
{json.dumps(recon_data, indent=2)}
|
|
|
|
Provide:
|
|
1. Attack surface summary
|
|
2. Prioritized network target list
|
|
3. Identified network vulnerabilities or misconfigurations
|
|
4. Recommended next steps for network exploitation
|
|
5. Network risk assessment
|
|
6. Stealth considerations for network activities
|
|
|
|
Response in JSON format with actionable recommendations.
|
|
"""
|
|
)
|
|
|
|
system_prompt = self.llm.get_prompt(
|
|
"network_recon",
|
|
"ai_analysis_system",
|
|
default="""You are an expert network penetration tester analyzing reconnaissance data.
|
|
Identify network security weaknesses, network attack vectors, and provide strategic recommendations.
|
|
Consider both technical and operational security aspects."""
|
|
)
|
|
|
|
try:
|
|
# Format the user prompt with recon_data
|
|
formatted_prompt = prompt.format(recon_data_json=json.dumps(recon_data, indent=2))
|
|
response = self.llm.generate(formatted_prompt, system_prompt)
|
|
return json.loads(response)
|
|
except Exception as e:
|
|
logger.error(f"AI analysis error: {e}")
|
|
return {"error": str(e), "raw_response": response if 'response' in locals() else None}
|
|
|
|
def passive_recon(self, target: str) -> Dict:
|
|
"""Perform passive reconnaissance only"""
|
|
# Parse target to extract hostname if it's a URL
|
|
parsed_target = urlparse(target)
|
|
target_host = parsed_target.hostname or target
|
|
|
|
return {
|
|
"osint": self.osint.collect(target_host), # Use target_host
|
|
"dns": self.dns_enum.enumerate(target_host), # Use target_host
|
|
"subdomains": self.subdomain_finder.find(target_host) # Use target_host
|
|
}
|
|
|
|
def active_recon(self, target: str) -> Dict:
|
|
"""Perform active reconnaissance"""
|
|
# Parse target to extract hostname if it's a URL
|
|
parsed_target = urlparse(target)
|
|
target_host = parsed_target.hostname or target
|
|
|
|
return {
|
|
"network_scan": self.network_scanner.scan(target_host) # Use target_host
|
|
}
|
|
|