"""
NeuroSploit v3 - Autonomous AI Security Agent
REAL AI-powered penetration testing agent that:
1. Actually calls Claude/OpenAI API for intelligent analysis
2. Performs comprehensive reconnaissance
3. Tests vulnerabilities with proper verification (no false positives)
4. Generates detailed reports with CVSS, PoC, remediation
"""
import asyncio
import aiohttp
import json
import re
import os
import hashlib
from typing import Dict, List, Any, Optional, Callable, Tuple
from dataclasses import dataclass, field, asdict
from datetime import datetime
from urllib.parse import urljoin, urlparse, parse_qs, urlencode
from enum import Enum
from pathlib import Path

# Try to import anthropic for Claude API
try:
    import anthropic
    ANTHROPIC_AVAILABLE = True
except ImportError:
    ANTHROPIC_AVAILABLE = False
    anthropic = None

# Try to import openai
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    openai = None


class OperationMode(Enum):
    """Agent operation modes"""
    RECON_ONLY = "recon_only"
    FULL_AUTO = "full_auto"
    PROMPT_ONLY = "prompt_only"
    ANALYZE_ONLY = "analyze_only"


class FindingSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


@dataclass
class CVSSScore:
    """CVSS 3.1 Score"""
    score: float
    severity: str
    vector: str
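

# Illustrative helper, not part of the original module: maps a CVSS v3.1 base
# score to the standard qualitative severity bands (0.1-3.9 Low, 4.0-6.9
# Medium, 7.0-8.9 High, 9.0-10.0 Critical), showing how a CVSSScore would
# typically be paired with a FindingSeverity.
def _severity_from_cvss(score: float) -> FindingSeverity:
    if score >= 9.0:
        return FindingSeverity.CRITICAL
    if score >= 7.0:
        return FindingSeverity.HIGH
    if score >= 4.0:
        return FindingSeverity.MEDIUM
    if score > 0.0:
        return FindingSeverity.LOW
    return FindingSeverity.INFO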


@dataclass
class Finding:
    """Vulnerability finding with full details"""
    id: str
    title: str
    severity: str
    vulnerability_type: str = ""
    cvss_score: float = 0.0
    cvss_vector: str = ""
    cwe_id: str = ""
    description: str = ""
    affected_endpoint: str = ""
    parameter: str = ""
    payload: str = ""
    evidence: str = ""
    request: str = ""
    response: str = ""
    impact: str = ""
    poc_code: str = ""
    remediation: str = ""
    references: List[str] = field(default_factory=list)
    ai_verified: bool = False
    confidence: str = "high"


@dataclass
class ReconData:
    """Reconnaissance data"""
    subdomains: List[str] = field(default_factory=list)
    live_hosts: List[str] = field(default_factory=list)
    endpoints: List[Dict] = field(default_factory=list)
    parameters: Dict[str, List[str]] = field(default_factory=dict)
    technologies: List[str] = field(default_factory=list)
    forms: List[Dict] = field(default_factory=list)
    js_files: List[str] = field(default_factory=list)
    api_endpoints: List[str] = field(default_factory=list)


def _get_endpoint_url(ep: Any) -> str:
    """Safely get URL from endpoint (handles both str and dict)"""
    if isinstance(ep, str):
        return ep
    elif isinstance(ep, dict):
        return ep.get("url", "")
    return ""


def _get_endpoint_method(ep: Any) -> str:
    """Safely get method from endpoint"""
    if isinstance(ep, dict):
        return ep.get("method", "GET")
    return "GET"


class LLMClient:
    """Unified LLM client for Claude, OpenAI, Gemini, Ollama, and LM Studio"""

    # Ollama and LM Studio endpoints
    OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
    LMSTUDIO_URL = os.getenv("LMSTUDIO_URL", "http://localhost:1234")
    GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta"

    def __init__(self):
        self.anthropic_key = os.getenv("ANTHROPIC_API_KEY", "")
        self.openai_key = os.getenv("OPENAI_API_KEY", "")
        self.google_key = os.getenv("GOOGLE_API_KEY", "")
        self.ollama_model = os.getenv("OLLAMA_MODEL", "llama3.2")
        self.client = None
        self.provider = None
        self.error_message = None
        self.connection_tested = False
        # Treat empty and placeholder values as missing keys
        if self.anthropic_key in ["", "your-anthropic-api-key"]:
            self.anthropic_key = None
        if self.openai_key in ["", "your-openai-api-key"]:
            self.openai_key = None
        if self.google_key in ["", "your-google-api-key"]:
            self.google_key = None
        # Try providers in order of preference
        self._initialize_provider()
    def _initialize_provider(self):
        """Initialize the first available LLM provider"""
        # 1. Try Claude (Anthropic)
        if ANTHROPIC_AVAILABLE and self.anthropic_key:
            try:
                self.client = anthropic.Anthropic(api_key=self.anthropic_key)
                self.provider = "claude"
                print("[LLM] Claude API initialized successfully")
                return
            except Exception as e:
                self.error_message = f"Claude init error: {e}"
                print(f"[LLM] Claude initialization failed: {e}")
        # 2. Try OpenAI
        if OPENAI_AVAILABLE and self.openai_key:
            try:
                self.client = openai.OpenAI(api_key=self.openai_key)
                self.provider = "openai"
                print("[LLM] OpenAI API initialized successfully")
                return
            except Exception as e:
                self.error_message = f"OpenAI init error: {e}"
                print(f"[LLM] OpenAI initialization failed: {e}")
        # 3. Try Google Gemini
        if self.google_key:
            self.client = "gemini"  # Placeholder - uses HTTP requests
            self.provider = "gemini"
            print("[LLM] Gemini API initialized")
            return
        # 4. Try Ollama (local)
        if self._check_ollama():
            self.client = "ollama"  # Placeholder - uses HTTP requests
            self.provider = "ollama"
            print(f"[LLM] Ollama initialized with model: {self.ollama_model}")
            return
        # 5. Try LM Studio (local)
        if self._check_lmstudio():
            self.client = "lmstudio"  # Placeholder - uses HTTP requests
            self.provider = "lmstudio"
            print("[LLM] LM Studio initialized")
            return
        # No provider available
        self._set_no_provider_error()
    def _check_ollama(self) -> bool:
        """Check if Ollama is running locally"""
        try:
            import requests
            response = requests.get(f"{self.OLLAMA_URL}/api/tags", timeout=2)
            return response.status_code == 200
        except Exception:
            return False

    def _check_lmstudio(self) -> bool:
        """Check if LM Studio is running locally"""
        try:
            import requests
            response = requests.get(f"{self.LMSTUDIO_URL}/v1/models", timeout=2)
            return response.status_code == 200
        except Exception:
            return False

    def _set_no_provider_error(self):
        """Set appropriate error message when no provider is available"""
        errors = []
        if not ANTHROPIC_AVAILABLE and not OPENAI_AVAILABLE:
            errors.append("LLM libraries not installed (run: pip install anthropic openai)")
        if not self.anthropic_key and not self.openai_key and not self.google_key:
            errors.append("No API keys configured")
        if not self._check_ollama():
            errors.append("Ollama not running locally")
        if not self._check_lmstudio():
            errors.append("LM Studio not running locally")
        self.error_message = "No LLM provider available. " + "; ".join(errors)
        print(f"[LLM] WARNING: {self.error_message}")

    def is_available(self) -> bool:
        return self.client is not None

    def get_status(self) -> dict:
        """Get LLM status for debugging"""
        return {
            "available": self.is_available(),
            "provider": self.provider,
            "error": self.error_message,
            "anthropic_lib": ANTHROPIC_AVAILABLE,
            "openai_lib": OPENAI_AVAILABLE,
            "ollama_available": self._check_ollama(),
            "lmstudio_available": self._check_lmstudio(),
            "has_google_key": bool(self.google_key)
        }

    async def test_connection(self) -> Tuple[bool, str]:
        """Test if the API connection is working"""
        if not self.client:
            return False, self.error_message or "No LLM client configured"
        try:
            # Simple test prompt
            result = await self.generate("Say 'OK' if you can hear me.", max_tokens=10)
            if result:
                self.connection_tested = True
                return True, f"Connected to {self.provider}"
            return False, f"Empty response from {self.provider}"
        except Exception as e:
            return False, f"Connection test failed for {self.provider}: {str(e)}"
    async def generate(self, prompt: str, system: str = "", max_tokens: int = 4096) -> str:
        """Generate response from LLM"""
        if not self.client:
            raise LLMConnectionError(self.error_message or "No LLM provider available")
        default_system = "You are an expert penetration tester and security researcher. Provide accurate, technical, and actionable security analysis. Be precise and avoid false positives."
        try:
            if self.provider == "claude":
                message = self.client.messages.create(
                    model="claude-sonnet-4-20250514",
                    max_tokens=max_tokens,
                    system=system or default_system,
                    messages=[{"role": "user", "content": prompt}]
                )
                return message.content[0].text
            elif self.provider == "openai":
                response = self.client.chat.completions.create(
                    model="gpt-4-turbo-preview",
                    max_tokens=max_tokens,
                    messages=[
                        {"role": "system", "content": system or default_system},
                        {"role": "user", "content": prompt}
                    ]
                )
                return response.choices[0].message.content
            elif self.provider == "gemini":
                return await self._generate_gemini(prompt, system or default_system, max_tokens)
            elif self.provider == "ollama":
                return await self._generate_ollama(prompt, system or default_system)
            elif self.provider == "lmstudio":
                return await self._generate_lmstudio(prompt, system or default_system, max_tokens)
        except LLMConnectionError:
            raise
        except Exception as e:
            error_msg = str(e)
            print(f"[LLM] Error from {self.provider}: {error_msg}")
            raise LLMConnectionError(f"API call failed ({self.provider}): {error_msg}")
        return ""
    async def _generate_gemini(self, prompt: str, system: str, max_tokens: int) -> str:
        """Generate using Google Gemini API"""
        url = f"{self.GEMINI_URL}/models/gemini-pro:generateContent?key={self.google_key}"
        payload = {
            "contents": [{"parts": [{"text": f"{system}\n\n{prompt}"}]}],
            "generationConfig": {"maxOutputTokens": max_tokens}
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=60)) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise LLMConnectionError(f"Gemini API error ({response.status}): {error_text}")
                data = await response.json()
                return data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "")

    async def _generate_ollama(self, prompt: str, system: str) -> str:
        """Generate using local Ollama"""
        url = f"{self.OLLAMA_URL}/api/generate"
        payload = {
            "model": self.ollama_model,
            "prompt": prompt,
            "system": system,
            "stream": False
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise LLMConnectionError(f"Ollama error ({response.status}): {error_text}")
                data = await response.json()
                return data.get("response", "")

    async def _generate_lmstudio(self, prompt: str, system: str, max_tokens: int) -> str:
        """Generate using LM Studio (OpenAI-compatible)"""
        url = f"{self.LMSTUDIO_URL}/v1/chat/completions"
        payload = {
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt}
            ],
            "max_tokens": max_tokens,
            "stream": False
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=120)) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise LLMConnectionError(f"LM Studio error ({response.status}): {error_text}")
                data = await response.json()
                return data.get("choices", [{}])[0].get("message", {}).get("content", "")

# Defined after LLMClient, but only referenced at call time, so the forward
# references inside LLMClient are safe.
class LLMConnectionError(Exception):
    """Exception raised when LLM connection fails"""
    pass
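

# Illustrative usage sketch, not part of the original module: exercises the
# unified client end to end. Assumes at least one provider is reachable
# (e.g. ANTHROPIC_API_KEY set, or a local Ollama instance running).
async def _demo_llm_client() -> None:
    llm = LLMClient()
    ok, msg = await llm.test_connection()
    print(f"[demo] {msg}")
    if ok:
        print(await llm.generate("Name one common SQL injection defense."))

# Run with: asyncio.run(_demo_llm_client())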


class AutonomousAgent:
    """
    AI-Powered Autonomous Security Agent
    Performs real security testing with AI-powered analysis
    """

    # Comprehensive payload sets for testing
    PAYLOADS = {
        "sqli": [
            "'", "\"", "' OR '1'='1", "\" OR \"1\"=\"1", "' OR 1=1--",
            "admin'--", "1' AND '1'='1", "1 AND 1=1", "' UNION SELECT NULL--",
            "1' AND SLEEP(5)--", "1' WAITFOR DELAY '0:0:5'--",
            "1'; DROP TABLE users--", "' OR ''='", "1' ORDER BY 1--"
        ],
"xss": [
"", "
",
"\">", "'-alert('XSS')-'",
"