Files
2026-01-19 19:21:57 -03:00

288 lines
9.7 KiB
Python

"""
NeuroSploit v3 - Dynamic Vulnerability Engine
The core of NeuroSploit v3: prompt-driven vulnerability testing.
Instead of hardcoded tests, this engine dynamically tests based on
what vulnerabilities are extracted from the user's prompt.
"""
import asyncio
import aiohttp
from typing import List, Dict, Optional, Any
from datetime import datetime
from backend.core.vuln_engine.registry import VulnerabilityRegistry
from backend.core.vuln_engine.payload_generator import PayloadGenerator
from backend.models import Endpoint, Vulnerability, VulnerabilityTest
from backend.schemas.prompt import VulnerabilityTypeExtracted
class TestResult:
"""Result of a vulnerability test"""
def __init__(
self,
vuln_type: str,
is_vulnerable: bool,
confidence: float,
payload: str,
request_data: dict,
response_data: dict,
evidence: Optional[str] = None
):
self.vuln_type = vuln_type
self.is_vulnerable = is_vulnerable
self.confidence = confidence
self.payload = payload
self.request_data = request_data
self.response_data = response_data
self.evidence = evidence
class DynamicVulnerabilityEngine:
"""
Prompt-driven vulnerability testing engine.
Key principles:
1. Tests ONLY what the prompt specifies
2. Generates payloads dynamically based on context
3. Uses multiple detection techniques per vulnerability type
4. Adapts based on target responses
"""
def __init__(self, llm_manager=None):
self.llm_manager = llm_manager
self.registry = VulnerabilityRegistry()
self.payload_generator = PayloadGenerator()
self.session: Optional[aiohttp.ClientSession] = None
self.timeout = aiohttp.ClientTimeout(total=30)
async def __aenter__(self):
self.session = aiohttp.ClientSession(timeout=self.timeout)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def test_endpoint(
self,
endpoint: Endpoint,
vuln_types: List[VulnerabilityTypeExtracted],
context: Dict[str, Any],
progress_callback=None
) -> List[TestResult]:
"""
Test an endpoint for specified vulnerability types.
Args:
endpoint: The endpoint to test
vuln_types: List of vulnerability types to test for
context: Additional context (technologies, WAF info, etc.)
progress_callback: Optional callback for progress updates
Returns:
List of test results
"""
results = []
if not self.session:
self.session = aiohttp.ClientSession(timeout=self.timeout)
for vuln in vuln_types:
try:
if progress_callback:
await progress_callback(f"Testing {vuln.type} on {endpoint.url}")
# Get tester for this vulnerability type
tester = self.registry.get_tester(vuln.type)
# Get payloads for this vulnerability and endpoint
payloads = await self.payload_generator.get_payloads(
vuln_type=vuln.type,
endpoint=endpoint,
context=context
)
# Test each payload
for payload in payloads:
result = await self._execute_test(
endpoint=endpoint,
vuln_type=vuln.type,
payload=payload,
tester=tester,
context=context
)
results.append(result)
# If vulnerable, try to get more evidence
if result.is_vulnerable:
deeper_results = await self._deep_test(
endpoint=endpoint,
vuln_type=vuln.type,
initial_result=result,
tester=tester,
context=context
)
results.extend(deeper_results)
break # Found vulnerability, move to next type
except Exception as e:
print(f"Error testing {vuln.type}: {e}")
continue
return results
async def _execute_test(
self,
endpoint: Endpoint,
vuln_type: str,
payload: str,
tester,
context: Dict
) -> TestResult:
"""Execute a single vulnerability test"""
request_data = {
"url": endpoint.url,
"method": endpoint.method,
"payload": payload,
"timestamp": datetime.utcnow().isoformat()
}
try:
# Build the test request
test_url, test_params, test_headers, test_body = tester.build_request(
endpoint=endpoint,
payload=payload
)
# Send the request
async with self.session.request(
method=endpoint.method,
url=test_url,
params=test_params,
headers=test_headers,
data=test_body,
ssl=False,
allow_redirects=False
) as response:
response_text = await response.text()
response_data = {
"status": response.status,
"headers": dict(response.headers),
"body_preview": response_text[:2000] if response_text else "",
"content_length": len(response_text) if response_text else 0
}
# Analyze response for vulnerability
is_vulnerable, confidence, evidence = tester.analyze_response(
payload=payload,
response_status=response.status,
response_headers=dict(response.headers),
response_body=response_text,
context=context
)
return TestResult(
vuln_type=vuln_type,
is_vulnerable=is_vulnerable,
confidence=confidence,
payload=payload,
request_data=request_data,
response_data=response_data,
evidence=evidence
)
except asyncio.TimeoutError:
# Timeout might indicate time-based injection
response_data = {"error": "timeout", "timeout_seconds": self.timeout.total}
is_vulnerable = tester.check_timeout_vulnerability(vuln_type)
return TestResult(
vuln_type=vuln_type,
is_vulnerable=is_vulnerable,
confidence=0.7 if is_vulnerable else 0.0,
payload=payload,
request_data=request_data,
response_data=response_data,
evidence="Request timed out - possible time-based vulnerability" if is_vulnerable else None
)
except Exception as e:
response_data = {"error": str(e)}
return TestResult(
vuln_type=vuln_type,
is_vulnerable=False,
confidence=0.0,
payload=payload,
request_data=request_data,
response_data=response_data,
evidence=None
)
async def _deep_test(
self,
endpoint: Endpoint,
vuln_type: str,
initial_result: TestResult,
tester,
context: Dict
) -> List[TestResult]:
"""
Perform deeper testing after initial vulnerability confirmation.
This helps establish higher confidence and better PoC.
"""
results = []
# Get exploitation payloads
deeper_payloads = await self.payload_generator.get_exploitation_payloads(
vuln_type=vuln_type,
initial_payload=initial_result.payload,
context=context
)
for payload in deeper_payloads[:3]: # Limit to 3 deeper tests
result = await self._execute_test(
endpoint=endpoint,
vuln_type=vuln_type,
payload=payload,
tester=tester,
context=context
)
if result.is_vulnerable:
result.confidence = min(result.confidence + 0.1, 1.0)
results.append(result)
return results
async def create_vulnerability_record(
self,
scan_id: str,
endpoint: Endpoint,
result: TestResult
) -> Vulnerability:
"""Create a vulnerability record from a test result"""
# Get severity based on vulnerability type
severity = self.registry.get_severity(result.vuln_type)
# Get CWE ID
cwe_id = self.registry.get_cwe_id(result.vuln_type)
# Get remediation advice
remediation = self.registry.get_remediation(result.vuln_type)
# Generate title
title = self.registry.get_title(result.vuln_type)
return Vulnerability(
scan_id=scan_id,
title=f"{title} on {endpoint.path or endpoint.url}",
vulnerability_type=result.vuln_type,
severity=severity,
cwe_id=cwe_id,
description=self.registry.get_description(result.vuln_type),
affected_endpoint=endpoint.url,
poc_request=str(result.request_data),
poc_response=str(result.response_data.get("body_preview", ""))[:5000],
poc_payload=result.payload,
impact=self.registry.get_impact(result.vuln_type),
remediation=remediation,
ai_analysis=result.evidence
)