mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-03-30 16:10:55 +02:00
116 modules | 100 vuln types | 18 API routes | 18 frontend pages Major features: - VulnEngine: 100 vuln types, 526+ payloads, 12 testers, anti-hallucination prompts - Autonomous Agent: 3-stream auto pentest, multi-session (5 concurrent), pause/resume/stop - CLI Agent: Claude Code / Gemini CLI / Codex CLI inside Kali containers - Validation Pipeline: negative controls, proof of execution, confidence scoring, judge - AI Reasoning: ReACT engine, token budget, endpoint classifier, CVE hunter, deep recon - Multi-Agent: 5 specialists + orchestrator + researcher AI + vuln type agents - RAG System: BM25/TF-IDF/ChromaDB vectorstore, few-shot, reasoning templates - Smart Router: 20 providers (8 CLI OAuth + 12 API), tier failover, token refresh - Kali Sandbox: container-per-scan, 56 tools, VPN support, on-demand install - Full IA Testing: methodology-driven comprehensive pentest sessions - Notifications: Discord, Telegram, WhatsApp/Twilio multi-channel alerts - Frontend: React/TypeScript with 18 pages, real-time WebSocket updates
742 lines
31 KiB
Python
742 lines
31 KiB
Python
"""
|
|
NeuroSploit v3 - Researcher AI Agent
|
|
|
|
Dedicated 0-day research agent that uses Kali Linux sandbox for tool execution,
|
|
AI-driven tool selection/installation, and hypothesis-driven vulnerability discovery.
|
|
|
|
Architecture:
|
|
- Reasoning loop: Observe → Hypothesize → Plan Tools → Execute in Sandbox → Analyze → Confirm/Reject
|
|
- AI selects tools from ToolRegistry (56+ tools), installs on demand in Kali container
|
|
- Each hypothesis generates targeted test plans with sandbox-executed tool chains
|
|
- Findings feed through existing ValidationJudge pipeline for confirmation
|
|
- Enabled via ENABLE_RESEARCHER_AI=true + enable_kali_sandbox=true per scan
|
|
|
|
Key difference from standard agent streams:
|
|
- Standard streams use hardcoded payload sets → Researcher uses AI-generated test plans
|
|
- Standard streams test known vuln types → Researcher hypothesizes unknown vulnerabilities
|
|
- Standard streams run tools locally → Researcher executes everything in Kali sandbox
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Optional imports with guards
|
|
try:
|
|
from core.kali_sandbox import KaliSandbox
|
|
from core.tool_registry import ToolRegistry
|
|
HAS_KALI = True
|
|
except ImportError:
|
|
HAS_KALI = False
|
|
|
|
try:
|
|
from core.sandbox_manager import SandboxResult
|
|
HAS_SANDBOX_RESULT = True
|
|
except ImportError:
|
|
HAS_SANDBOX_RESULT = False
|
|
|
|
|
|
# ── Data Classes ──────────────────────────────────────────────────────────
|
|
|
|
@dataclass
class ResearchHypothesis:
    """A hypothesis about a potential vulnerability to test."""
    id: str  # e.g. "H001"; LLM-assigned or auto-generated by the agent
    title: str  # short human-readable summary
    description: str  # detailed statement of the suspected weakness
    target_endpoint: str  # endpoint/URL the hypothesis applies to
    vuln_category: str  # e.g., "logic_flaw", "race_condition", "auth_bypass", "injection"
    confidence: float  # 0.0-1.0 how likely this is exploitable
    tools_needed: List[str] = field(default_factory=list)  # tools the test plan requires
    test_commands: List[str] = field(default_factory=list)  # fallback shell commands if AI planning fails
    expected_indicators: List[str] = field(default_factory=list)  # output strings that would confirm the hypothesis
    status: str = "pending"  # pending, testing, confirmed, rejected
    evidence: str = ""  # summary of supporting/refuting tool output
    reasoning: str = ""  # model's justification for proposing the hypothesis
|
|
|
|
|
|
@dataclass
class ToolExecution:
    """Record of a tool execution in the sandbox."""
    tool: str  # tool name, e.g. "curl", "nuclei"
    command: str  # exact shell command run inside the container
    purpose: str  # human-readable reason the command was run
    exit_code: int = -1  # -1 until the command has actually run
    stdout: str = ""  # captured stdout (truncated by the caller)
    stderr: str = ""  # captured stderr (truncated by the caller)
    duration: float = 0.0  # wall-clock seconds
    findings_extracted: List[Dict] = field(default_factory=list)  # structured findings parsed from output
|
|
|
|
|
|
@dataclass
class ResearchResult:
    """Final result from a research session."""
    hypotheses_tested: int = 0  # count of hypotheses that completed testing
    hypotheses_confirmed: int = 0  # subset whose status became "confirmed"
    tools_used: Set[str] = field(default_factory=set)  # tool names invoked this session
    tools_installed: Set[str] = field(default_factory=set)  # tools installed on demand
    findings: List[Dict] = field(default_factory=list)  # finding dicts from confirmed hypotheses
    tool_executions: List[ToolExecution] = field(default_factory=list)  # full execution log
    total_duration: float = 0.0  # session wall-clock seconds
    token_usage: int = 0  # rough word-count based estimate, not true tokens
|
|
|
|
|
|
# ── System Prompts ────────────────────────────────────────────────────────
|
|
|
|
RESEARCHER_SYSTEM_PROMPT = """You are an elite security researcher focused on discovering 0-day vulnerabilities and novel attack vectors.
|
|
|
|
CRITICAL RULES:
|
|
1. Think like an adversary — look for UNUSUAL behaviors, edge cases, race conditions, logic flaws
|
|
2. Don't just run scanners — REASON about the application architecture and hypothesize weaknesses
|
|
3. Base hypotheses on CONCRETE observations from recon data, not speculation
|
|
4. For each hypothesis, design SPECIFIC tool commands to test it
|
|
5. Analyze tool output carefully — distinguish between false positives and real findings
|
|
6. Chain findings — one weakness may unlock access to deeper vulnerabilities
|
|
7. Use the Kali sandbox tools strategically, not exhaustively
|
|
|
|
TOOL SELECTION STRATEGY:
|
|
- Use nuclei with specific templates for known-CVE testing
|
|
- Use sqlmap for targeted injection points only (not blind scans)
|
|
- Use ffuf/gobuster for hidden endpoint discovery
|
|
- Use nmap -sV -sC for service fingerprinting
|
|
- Use curl for precise manual verification of hypotheses
|
|
- Use custom scripts (python3, bash) for logic flaw testing
|
|
- NEVER run tools with default broad scans — always TARGET specific endpoints/params
|
|
|
|
OUTPUT FORMAT: Always respond in valid JSON."""
|
|
|
|
|
|
HYPOTHESIS_PROMPT = """Based on the following reconnaissance data, generate security research hypotheses.
|
|
|
|
**Target:** {target}
|
|
|
|
**Reconnaissance:**
|
|
- Endpoints ({endpoint_count}): {endpoints}
|
|
- Technologies: {technologies}
|
|
- Parameters: {parameters}
|
|
- Response headers: {headers}
|
|
- Existing findings: {existing_findings}
|
|
|
|
**Already tested hypotheses:** {tested_hypotheses}
|
|
|
|
Generate 3-5 NEW hypotheses about potential vulnerabilities. Focus on:
|
|
1. Logic flaws that automated scanners miss (race conditions, TOCTOU, business logic)
|
|
2. Misconfigurations specific to the detected tech stack
|
|
3. Chained attacks combining multiple weak signals
|
|
4. Known CVEs for detected software versions
|
|
5. Custom code vulnerabilities visible through error messages or behavior
|
|
|
|
Respond in JSON:
|
|
{{
|
|
"hypotheses": [
|
|
{{
|
|
"id": "H001",
|
|
"title": "Race condition in cart checkout",
|
|
"description": "The checkout flow may be vulnerable to TOCTOU if price validation happens before payment processing",
|
|
"target_endpoint": "/checkout/process",
|
|
"vuln_category": "logic_flaw",
|
|
"confidence": 0.6,
|
|
"tools_needed": ["curl", "python3"],
|
|
"test_commands": [
|
|
"curl -X POST '{target}/checkout/process' -d 'item=1&qty=1' -H 'Cookie: session=xxx' &",
|
|
"for i in $(seq 1 10); do curl -X POST '{target}/checkout/process' -d 'item=1&qty=1' -H 'Cookie: session=xxx' & done; wait"
|
|
],
|
|
"expected_indicators": ["duplicate order", "negative balance", "status 500", "inconsistent qty"],
|
|
"reasoning": "Cart endpoint accepts concurrent requests. If no mutex/lock on inventory check, TOCTOU may allow double-spend."
|
|
}}
|
|
]
|
|
}}"""
|
|
|
|
|
|
TOOL_PLAN_PROMPT = """Plan the tool execution for testing this hypothesis in a Kali Linux sandbox.
|
|
|
|
**Hypothesis:** {hypothesis_title}
|
|
**Description:** {hypothesis_desc}
|
|
**Target endpoint:** {target_endpoint}
|
|
**Category:** {vuln_category}
|
|
**Available tools (pre-installed):** nuclei, naabu, httpx, subfinder, katana, ffuf, gobuster, dalfox, nikto, sqlmap, nmap, curl, python3, bash
|
|
**Installable tools:** wpscan, dirb, hydra, testssl, sslscan, dirsearch, wfuzz, arjun, wafw00f, gau, gitleaks, commix, sslyze
|
|
|
|
Design 1-5 targeted tool commands to test this hypothesis. Each command should:
|
|
- Target ONLY the specific endpoint/parameter in question
|
|
- Have clear expected output that would confirm/deny the hypothesis
|
|
- Include proper timeouts and output format flags
|
|
|
|
Respond in JSON:
|
|
{{
|
|
"tools_needed": ["tool1", "tool2"],
|
|
"commands": [
|
|
{{
|
|
"tool": "curl",
|
|
"command": "curl -s -o /dev/null -w '%{{http_code}}' -X POST ...",
|
|
"purpose": "Test if endpoint accepts method override",
|
|
"timeout": 30,
|
|
"success_indicators": ["405", "200 with different response"],
|
|
"failure_indicators": ["403", "404", "identical response"]
|
|
}}
|
|
],
|
|
"analysis_notes": "If command 1 returns 200, proceed with command 2 to extract data"
|
|
}}"""
|
|
|
|
|
|
ANALYZE_RESULTS_PROMPT = """Analyze the results of testing hypothesis: {hypothesis_title}
|
|
|
|
**Hypothesis:** {hypothesis_desc}
|
|
**Expected indicators:** {expected_indicators}
|
|
|
|
**Tool execution results:**
|
|
{tool_results}
|
|
|
|
Based on the actual results:
|
|
1. Was the hypothesis confirmed, partially confirmed, or rejected?
|
|
2. What evidence supports your conclusion?
|
|
3. If confirmed, what is the severity and impact?
|
|
4. Are there follow-up hypotheses to test?
|
|
|
|
Respond in JSON:
|
|
{{
|
|
"verdict": "confirmed|partially_confirmed|rejected",
|
|
"confidence": 0.85,
|
|
"evidence_summary": "The response contained...",
|
|
"severity": "critical|high|medium|low|info",
|
|
"impact": "An attacker could...",
|
|
"follow_up_hypotheses": ["Test if same flaw exists on /api/v2/checkout"],
|
|
"poc_steps": ["Step 1: ...", "Step 2: ..."]
|
|
}}"""
|
|
|
|
|
|
class ResearcherAgent:
    """AI-driven 0-day vulnerability researcher using Kali sandbox.

    The researcher operates in a hypothesis-driven loop:
    1. OBSERVE: Analyze recon data and existing findings
    2. HYPOTHESIZE: Generate targeted hypotheses about potential vulns
    3. PLAN: Design tool execution plans for each hypothesis
    4. EXECUTE: Run tools in Kali sandbox
    5. ANALYZE: Evaluate results and confirm/reject hypotheses
    6. ITERATE: Generate follow-up hypotheses from discoveries

    Unlike the standard 3-stream agent, the researcher:
    - Uses AI reasoning at every step (not just verification)
    - Runs ALL tools in sandboxed Kali containers (no local execution)
    - Focuses on novel/unknown vulns, not just known patterns
    - Chains findings to discover deeper attack paths
    """

    # Hard safety caps so a runaway LLM loop cannot spin forever.
    MAX_HYPOTHESES = 15  # Max hypotheses per research session
    MAX_TOOL_EXECUTIONS = 30  # Max individual tool runs
    MAX_ITERATIONS = 5  # Max hypothesis generation rounds
|
|
|
|
    def __init__(
        self,
        llm: Any,
        scan_id: str,
        target: str,
        log_callback: Optional[Callable] = None,
        progress_callback: Optional[Callable] = None,
        finding_callback: Optional[Callable] = None,
        recon_data: Optional[Dict] = None,
        existing_findings: Optional[List] = None,
        token_budget: Optional[Any] = None,
    ):
        """Create a researcher bound to one scan and target.

        Args:
            llm: Async LLM client exposing ``generate(prompt, system_prompt)``.
            scan_id: Identifier of the parent scan; also names the sandbox container.
            target: Base URL/host under investigation.
            log_callback: Optional async ``(level, message)`` log sink.
            progress_callback: Optional async ``(pct, phase)`` progress sink.
            finding_callback: Optional async callable invoked with each finding dict.
            recon_data: Reconnaissance data (endpoints, technologies, parameters, ...).
            existing_findings: Findings from earlier pipeline stages (dicts or objects).
            token_budget: Token budget tracker; stored but not read in this module —
                TODO confirm whether callers expect it to be enforced here.
        """
        self.llm = llm
        self.scan_id = scan_id
        self.target = target
        self.log_callback = log_callback
        self.progress_callback = progress_callback
        self.finding_callback = finding_callback
        self.recon_data = recon_data or {}
        self.existing_findings = existing_findings or []
        self.token_budget = token_budget

        # State
        self._sandbox: Optional[Any] = None  # KaliSandbox instance once initialize() succeeds
        self._tool_registry = ToolRegistry() if HAS_KALI else None  # None when imports unavailable
        self._hypotheses: List[ResearchHypothesis] = []  # everything ever generated
        self._tested_hypotheses: Set[str] = set()  # ids of hypotheses already run
        self._tool_executions: List[ToolExecution] = []  # full sandbox execution log
        self._findings: List[Dict] = []  # findings from confirmed hypotheses
        self._tools_used: Set[str] = set()
        self._tools_installed: Set[str] = set()
        self._cancelled = False  # set by cancel(); checked cooperatively in loops
        self._token_usage = 0  # rough word-count based usage estimate
|
|
|
|
# ------------------------------------------------------------------
|
|
# Lifecycle
|
|
# ------------------------------------------------------------------
|
|
async def initialize(self) -> Tuple[bool, str]:
|
|
"""Initialize the Kali sandbox for this research session."""
|
|
if not HAS_KALI:
|
|
return False, "Kali sandbox not available (missing core.kali_sandbox)"
|
|
|
|
self._sandbox = KaliSandbox(
|
|
scan_id=f"research-{self.scan_id}",
|
|
image=os.getenv("KALI_SANDBOX_IMAGE", "neurosploit-kali:latest"),
|
|
)
|
|
ok, msg = await self._sandbox.initialize()
|
|
if ok:
|
|
await self._log("success", f"[RESEARCHER] Kali sandbox ready: {msg}")
|
|
else:
|
|
await self._log("warning", f"[RESEARCHER] Sandbox init failed: {msg}")
|
|
return ok, msg
|
|
|
|
async def shutdown(self):
|
|
"""Destroy the sandbox container."""
|
|
if self._sandbox:
|
|
await self._sandbox.stop()
|
|
self._sandbox = None
|
|
|
|
def cancel(self):
|
|
"""Signal cancellation."""
|
|
self._cancelled = True
|
|
|
|
# ------------------------------------------------------------------
|
|
# Main Research Loop
|
|
# ------------------------------------------------------------------
|
|
    async def run(self) -> ResearchResult:
        """Execute the full research pipeline.

        Returns ResearchResult with all findings and metadata.
        """
        start_time = time.time()
        result = ResearchResult()

        # A working sandbox is mandatory: every tool invocation happens inside it.
        if not self._sandbox or not self._sandbox.is_available:
            await self._log("error", "[RESEARCHER] No sandbox available, cannot run")
            return result

        await self._log("info", "=" * 60)
        await self._log("info", " AI RESEARCHER — 0-Day Discovery Mode")
        await self._log("info", f" Target: {self.target}")
        await self._log("info", f" Sandbox: {self._sandbox.container_name}")
        await self._log("info", "=" * 60)

        try:
            # Iteration loop: observe → hypothesize → test → analyze → repeat
            for iteration in range(self.MAX_ITERATIONS):
                # Three independent stop conditions: cancellation and the two hard caps.
                if self._cancelled:
                    break
                if len(self._hypotheses) >= self.MAX_HYPOTHESES:
                    await self._log("info", "[RESEARCHER] Max hypotheses reached")
                    break
                if len(self._tool_executions) >= self.MAX_TOOL_EXECUTIONS:
                    await self._log("info", "[RESEARCHER] Max tool executions reached")
                    break

                # Each iteration owns an equal slice of the 0-100 progress range.
                progress_base = int((iteration / self.MAX_ITERATIONS) * 100)
                await self._progress(progress_base, f"Research iteration {iteration + 1}")

                # 1. Generate hypotheses
                await self._log("info", f"[RESEARCHER] Iteration {iteration + 1}: Generating hypotheses...")
                new_hypotheses = await self._generate_hypotheses()

                if not new_hypotheses:
                    await self._log("info", "[RESEARCHER] No new hypotheses generated, research complete")
                    break

                await self._log("info", f"[RESEARCHER] Generated {len(new_hypotheses)} hypotheses")

                # 2. Test each hypothesis
                for i, hypothesis in enumerate(new_hypotheses):
                    if self._cancelled:
                        break

                    # Interpolate progress within this iteration's slice, capped at 95.
                    sub_progress = progress_base + int(((i + 1) / len(new_hypotheses)) * (100 / self.MAX_ITERATIONS))
                    await self._progress(min(sub_progress, 95), f"Testing: {hypothesis.title[:40]}...")

                    # NOTE(review): hypothesis.id is typically already "Hnnn", so this
                    # log renders as "HH001" — confirm whether the extra H is intended.
                    await self._log("info", f"[RESEARCHER] Testing H{hypothesis.id}: {hypothesis.title}")
                    await self._log("info", f" Category: {hypothesis.vuln_category} | Confidence: {hypothesis.confidence:.0%}")

                    # Plan tools
                    tool_plan = await self._plan_tools(hypothesis)
                    if not tool_plan:
                        hypothesis.status = "rejected"
                        hypothesis.evidence = "Failed to generate tool plan"
                        continue

                    # Execute tools in sandbox
                    tool_results = await self._execute_tool_plan(hypothesis, tool_plan)

                    # Analyze results
                    verdict = await self._analyze_results(hypothesis, tool_results)

                    if verdict.get("verdict") == "confirmed":
                        hypothesis.status = "confirmed"
                        hypothesis.evidence = verdict.get("evidence_summary", "")
                        await self._create_finding(hypothesis, verdict)
                        await self._log("success",
                            f" CONFIRMED: {hypothesis.title} "
                            f"[{verdict.get('severity', 'medium').upper()}]"
                        )
                    elif verdict.get("verdict") == "partially_confirmed":
                        # Partial confirmations still become findings, but are
                        # flagged in the log for manual verification.
                        hypothesis.status = "confirmed"
                        hypothesis.evidence = verdict.get("evidence_summary", "")
                        await self._create_finding(hypothesis, verdict)
                        await self._log("warning",
                            f" PARTIAL: {hypothesis.title} — needs manual verification"
                        )
                    else:
                        hypothesis.status = "rejected"
                        hypothesis.evidence = verdict.get("evidence_summary", "No exploitable behavior observed")
                        await self._log("info", f" Rejected: {hypothesis.title}")

                    self._tested_hypotheses.add(hypothesis.id)

                    # Follow-up hypotheses from analysis
                    follow_ups = verdict.get("follow_up_hypotheses", [])
                    if follow_ups:
                        # NOTE(review): follow-ups are only logged here — presumably the
                        # next _generate_hypotheses round re-derives them; confirm.
                        await self._log("info", f" {len(follow_ups)} follow-up hypotheses queued")

        except Exception as e:
            # One failure aborts the loop but the partial results are still reported.
            await self._log("error", f"[RESEARCHER] Research error: {e}")

        # Finalize
        await self._progress(100, "Research complete")
        result.hypotheses_tested = len(self._tested_hypotheses)
        result.hypotheses_confirmed = sum(1 for h in self._hypotheses if h.status == "confirmed")
        result.tools_used = self._tools_used.copy()
        result.tools_installed = self._tools_installed.copy()
        result.findings = self._findings.copy()
        result.tool_executions = self._tool_executions.copy()
        result.total_duration = time.time() - start_time
        result.token_usage = self._token_usage

        await self._log("info", "=" * 60)
        await self._log("info", " RESEARCH COMPLETE")
        await self._log("info", f" Hypotheses tested: {result.hypotheses_tested}")
        await self._log("info", f" Confirmed: {result.hypotheses_confirmed}")
        await self._log("info", f" Findings: {len(result.findings)}")
        await self._log("info", f" Tools used: {', '.join(sorted(result.tools_used)) or 'none'}")
        await self._log("info", f" Duration: {result.total_duration:.1f}s")
        await self._log("info", "=" * 60)

        return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Step 1: Hypothesis Generation
|
|
# ------------------------------------------------------------------
|
|
    async def _generate_hypotheses(self) -> List[ResearchHypothesis]:
        """Use AI to generate research hypotheses from recon data."""
        endpoints = self.recon_data.get("endpoints", [])
        endpoint_strs = []
        # Endpoints may be dicts ({method, url/path}) or plain strings.
        for ep in endpoints[:20]:
            if isinstance(ep, dict):
                endpoint_strs.append(f"{ep.get('method', 'GET')} {ep.get('url', ep.get('path', ''))}")
            else:
                endpoint_strs.append(str(ep))

        # Parameters may arrive as dict, list, or anything else — normalize to a string.
        params = self.recon_data.get("parameters", {})
        if isinstance(params, dict):
            param_str = json.dumps(dict(list(params.items())[:20]))
        elif isinstance(params, list):
            param_str = ", ".join(str(p) for p in params[:20])
        else:
            param_str = str(params)[:500]

        technologies = self.recon_data.get("technologies", [])
        headers = self.recon_data.get("response_headers", {})

        # Summarize existing findings (dicts or objects) for prompt context.
        existing = []
        for f in self.existing_findings[:10]:
            if isinstance(f, dict):
                existing.append(f"{f.get('vulnerability_type', '?')}: {f.get('title', '?')}")
            else:
                existing.append(f"{getattr(f, 'vulnerability_type', '?')}: {getattr(f, 'title', '?')}")

        # Tell the model what was already tried so it proposes NEW hypotheses.
        tested = [f"{h.id}: {h.title} ({h.status})" for h in self._hypotheses[-10:]]

        prompt = HYPOTHESIS_PROMPT.format(
            target=self.target,
            endpoint_count=len(endpoints),
            endpoints="\n".join(endpoint_strs[:15]),
            technologies=", ".join(technologies[:10]),
            parameters=param_str[:500],
            headers=json.dumps(dict(list(headers.items())[:10]) if isinstance(headers, dict) else {})[:500],
            existing_findings="\n".join(existing) if existing else "None yet",
            tested_hypotheses="\n".join(tested) if tested else "None yet",
        )

        try:
            response = await self.llm.generate(prompt, RESEARCHER_SYSTEM_PROMPT)
            # Rough token accounting: word counts, not true tokenizer counts.
            self._token_usage += len(prompt.split()) + len(response.split())

            # Extract the first {...} blob; models often wrap JSON in prose.
            match = re.search(r'\{.*\}', response, re.DOTALL)
            if match:
                data = json.loads(match.group())
                hypotheses = []
                for h_data in data.get("hypotheses", []):
                    # Fall back to a sequential id when the model omits one.
                    h_id = h_data.get("id", f"H{len(self._hypotheses) + len(hypotheses) + 1:03d}")

                    # Skip already tested
                    if h_id in self._tested_hypotheses:
                        continue

                    hypothesis = ResearchHypothesis(
                        id=h_id,
                        title=h_data.get("title", "Unknown hypothesis"),
                        description=h_data.get("description", ""),
                        target_endpoint=h_data.get("target_endpoint", self.target),
                        vuln_category=h_data.get("vuln_category", "unknown"),
                        # Clamp model-supplied confidence into [0.0, 1.0].
                        confidence=min(1.0, max(0.0, float(h_data.get("confidence", 0.5)))),
                        tools_needed=h_data.get("tools_needed", []),
                        test_commands=h_data.get("test_commands", []),
                        expected_indicators=h_data.get("expected_indicators", []),
                        reasoning=h_data.get("reasoning", ""),
                    )
                    hypotheses.append(hypothesis)
                    self._hypotheses.append(hypothesis)

                return hypotheses
        except Exception as e:
            await self._log("warning", f"[RESEARCHER] Hypothesis generation failed: {e}")

        # Any failure (LLM error, bad JSON) yields an empty batch, which ends run().
        return []
|
|
|
|
# ------------------------------------------------------------------
|
|
# Step 2: Tool Planning
|
|
# ------------------------------------------------------------------
|
|
async def _plan_tools(self, hypothesis: ResearchHypothesis) -> Optional[Dict]:
|
|
"""Use AI to plan specific tool executions for a hypothesis."""
|
|
prompt = TOOL_PLAN_PROMPT.format(
|
|
hypothesis_title=hypothesis.title,
|
|
hypothesis_desc=hypothesis.description,
|
|
target_endpoint=hypothesis.target_endpoint,
|
|
vuln_category=hypothesis.vuln_category,
|
|
)
|
|
|
|
try:
|
|
response = await self.llm.generate(prompt, RESEARCHER_SYSTEM_PROMPT)
|
|
self._token_usage += len(prompt.split()) + len(response.split())
|
|
|
|
match = re.search(r'\{.*\}', response, re.DOTALL)
|
|
if match:
|
|
return json.loads(match.group())
|
|
except Exception as e:
|
|
await self._log("warning", f"[RESEARCHER] Tool planning failed for {hypothesis.id}: {e}")
|
|
|
|
# Fallback: use hypothesis test_commands directly
|
|
if hypothesis.test_commands:
|
|
return {
|
|
"tools_needed": hypothesis.tools_needed,
|
|
"commands": [
|
|
{
|
|
"tool": hypothesis.tools_needed[0] if hypothesis.tools_needed else "curl",
|
|
"command": cmd,
|
|
"purpose": f"Test hypothesis: {hypothesis.title}",
|
|
"timeout": 60,
|
|
"success_indicators": hypothesis.expected_indicators,
|
|
"failure_indicators": [],
|
|
}
|
|
for cmd in hypothesis.test_commands[:5]
|
|
],
|
|
}
|
|
return None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Step 3: Sandbox Execution
|
|
# ------------------------------------------------------------------
|
|
async def _execute_tool_plan(
|
|
self, hypothesis: ResearchHypothesis, plan: Dict
|
|
) -> List[ToolExecution]:
|
|
"""Execute tool plan commands inside Kali sandbox."""
|
|
results = []
|
|
tools_needed = plan.get("tools_needed", [])
|
|
|
|
# Install required tools first
|
|
for tool in tools_needed:
|
|
if tool not in self._tools_installed and self._tool_registry:
|
|
if self._tool_registry.is_known(tool):
|
|
await self._log("info", f" [SANDBOX] Ensuring tool: {tool}")
|
|
ok = await self._sandbox._ensure_tool(tool)
|
|
if ok:
|
|
self._tools_installed.add(tool)
|
|
await self._log("success", f" [SANDBOX] Tool ready: {tool}")
|
|
else:
|
|
await self._log("warning", f" [SANDBOX] Failed to install: {tool}")
|
|
|
|
# Execute commands
|
|
for cmd_spec in plan.get("commands", [])[:5]:
|
|
if self._cancelled:
|
|
break
|
|
|
|
tool_name = cmd_spec.get("tool", "raw")
|
|
command = cmd_spec.get("command", "")
|
|
purpose = cmd_spec.get("purpose", "")
|
|
timeout = min(cmd_spec.get("timeout", 120), 300) # Cap at 5 min
|
|
|
|
if not command:
|
|
continue
|
|
|
|
# Sanitize: replace target placeholder
|
|
command = command.replace("{target}", self.target)
|
|
|
|
await self._log("info", f" [SANDBOX] Running {tool_name}: {purpose[:60]}")
|
|
self._tools_used.add(tool_name)
|
|
|
|
sandbox_result = await self._sandbox.execute_raw(command, timeout=timeout)
|
|
|
|
exec_record = ToolExecution(
|
|
tool=tool_name,
|
|
command=command,
|
|
purpose=purpose,
|
|
exit_code=sandbox_result.exit_code,
|
|
stdout=sandbox_result.stdout[:5000], # Cap output
|
|
stderr=sandbox_result.stderr[:2000],
|
|
duration=sandbox_result.duration_seconds,
|
|
)
|
|
|
|
# Extract structured findings if available
|
|
if sandbox_result.findings:
|
|
exec_record.findings_extracted = sandbox_result.findings
|
|
|
|
results.append(exec_record)
|
|
self._tool_executions.append(exec_record)
|
|
|
|
# Quick check for success indicators
|
|
success_indicators = cmd_spec.get("success_indicators", [])
|
|
for indicator in success_indicators:
|
|
if indicator.lower() in (sandbox_result.stdout or "").lower():
|
|
await self._log("warning",
|
|
f" [SANDBOX] Possible hit: '{indicator}' found in output"
|
|
)
|
|
|
|
return results
|
|
|
|
# ------------------------------------------------------------------
|
|
# Step 4: Result Analysis
|
|
# ------------------------------------------------------------------
|
|
async def _analyze_results(
|
|
self, hypothesis: ResearchHypothesis, tool_results: List[ToolExecution]
|
|
) -> Dict:
|
|
"""Use AI to analyze tool execution results and verdict the hypothesis."""
|
|
if not tool_results:
|
|
return {"verdict": "rejected", "evidence_summary": "No tool output to analyze"}
|
|
|
|
# Format tool results for AI
|
|
results_text = []
|
|
for tr in tool_results:
|
|
output_preview = tr.stdout[:1500] if tr.stdout else "(empty)"
|
|
error_preview = tr.stderr[:500] if tr.stderr else ""
|
|
results_text.append(
|
|
f"**{tr.tool}** ({tr.purpose}):\n"
|
|
f" Command: {tr.command[:200]}\n"
|
|
f" Exit code: {tr.exit_code}\n"
|
|
f" Duration: {tr.duration:.1f}s\n"
|
|
f" Output:\n```\n{output_preview}\n```\n"
|
|
+ (f" Errors: {error_preview}\n" if error_preview else "")
|
|
)
|
|
|
|
prompt = ANALYZE_RESULTS_PROMPT.format(
|
|
hypothesis_title=hypothesis.title,
|
|
hypothesis_desc=hypothesis.description,
|
|
expected_indicators=", ".join(hypothesis.expected_indicators),
|
|
tool_results="\n---\n".join(results_text),
|
|
)
|
|
|
|
try:
|
|
response = await self.llm.generate(prompt, RESEARCHER_SYSTEM_PROMPT)
|
|
self._token_usage += len(prompt.split()) + len(response.split())
|
|
|
|
match = re.search(r'\{.*\}', response, re.DOTALL)
|
|
if match:
|
|
return json.loads(match.group())
|
|
except Exception as e:
|
|
await self._log("warning", f"[RESEARCHER] Analysis failed for {hypothesis.id}: {e}")
|
|
|
|
return {"verdict": "rejected", "evidence_summary": "Analysis failed"}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Step 5: Finding Creation
|
|
# ------------------------------------------------------------------
|
|
async def _create_finding(self, hypothesis: ResearchHypothesis, verdict: Dict):
|
|
"""Create a finding from a confirmed hypothesis."""
|
|
severity = verdict.get("severity", "medium")
|
|
if severity not in ("critical", "high", "medium", "low", "info"):
|
|
severity = "medium"
|
|
|
|
# Build PoC from tool commands
|
|
poc_steps = verdict.get("poc_steps", [])
|
|
tool_cmds = [te.command for te in self._tool_executions
|
|
if any(te.purpose and hypothesis.title[:20] in te.purpose
|
|
for _ in [1])]
|
|
|
|
poc_code = ""
|
|
if poc_steps:
|
|
poc_code = "# PoC Steps (verified in Kali sandbox)\n"
|
|
for i, step in enumerate(poc_steps, 1):
|
|
poc_code += f"# Step {i}: {step}\n"
|
|
elif tool_cmds:
|
|
poc_code = "# Verified tool commands:\n" + "\n".join(tool_cmds[:5])
|
|
|
|
finding = {
|
|
"title": hypothesis.title,
|
|
"severity": severity,
|
|
"vulnerability_type": hypothesis.vuln_category,
|
|
"description": hypothesis.description,
|
|
"affected_endpoint": hypothesis.target_endpoint,
|
|
"evidence": hypothesis.evidence or verdict.get("evidence_summary", ""),
|
|
"impact": verdict.get("impact", ""),
|
|
"poc_code": poc_code,
|
|
"confidence_score": int(verdict.get("confidence", 0.7) * 100),
|
|
"source": "researcher_agent",
|
|
"sandbox_verified": True,
|
|
"reasoning": hypothesis.reasoning,
|
|
"tools_used": list(self._tools_used),
|
|
}
|
|
|
|
self._findings.append(finding)
|
|
|
|
# Notify via callback
|
|
if self.finding_callback:
|
|
try:
|
|
await self.finding_callback(finding)
|
|
except Exception:
|
|
pass
|
|
|
|
# ------------------------------------------------------------------
|
|
# Helpers
|
|
# ------------------------------------------------------------------
|
|
async def _log(self, level: str, message: str):
|
|
"""Send log message."""
|
|
if self.log_callback:
|
|
try:
|
|
await self.log_callback(level, message)
|
|
except Exception:
|
|
pass
|
|
logger.log(
|
|
{"info": logging.INFO, "warning": logging.WARNING, "error": logging.ERROR,
|
|
"success": logging.INFO, "debug": logging.DEBUG}.get(level, logging.INFO),
|
|
message,
|
|
)
|
|
|
|
async def _progress(self, pct: int, phase: str):
|
|
"""Send progress update."""
|
|
if self.progress_callback:
|
|
try:
|
|
await self.progress_callback(min(pct, 100), f"Researcher: {phase}")
|
|
except Exception:
|
|
pass
|
|
|
|
def get_status(self) -> Dict:
|
|
"""Return current research status for dashboard."""
|
|
return {
|
|
"hypotheses_total": len(self._hypotheses),
|
|
"hypotheses_tested": len(self._tested_hypotheses),
|
|
"hypotheses_confirmed": sum(1 for h in self._hypotheses if h.status == "confirmed"),
|
|
"hypotheses_rejected": sum(1 for h in self._hypotheses if h.status == "rejected"),
|
|
"tool_executions": len(self._tool_executions),
|
|
"tools_used": sorted(self._tools_used),
|
|
"tools_installed": sorted(self._tools_installed),
|
|
"findings": len(self._findings),
|
|
"sandbox_available": self._sandbox.is_available if self._sandbox else False,
|
|
"token_usage": self._token_usage,
|
|
}
|