Files
NeuroSploit/backend/core/md_agent.py
CyberSecurityUP 7563260b2b NeuroSploit v3.2.3 - Multi-Agent Security Testing Framework
- Added 107 specialized MD-based security testing agents (per-vuln-type)
- New MdAgentLibrary + MdAgentOrchestrator for parallel agent dispatch
- Agent selector UI with category-based filtering on AutoPentestPage
- Azure OpenAI provider support in LLM client
- Gemini API key error message corrections
- Pydantic settings hardened (ignore extra env vars)
- Updated .gitignore for runtime data artifacts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 18:59:22 -03:00

875 lines
32 KiB
Python

"""
NeuroSploit v3 - Markdown-Based Agent System
Each .md file in prompts/md_library/ acts as a self-contained agent definition
with its own methodology, system prompt, and output format.
After recon completes, the MdAgentOrchestrator dispatches each selected agent
against the target URL with full recon context. Findings flow through the
normal validation pipeline.
Components:
- MdAgentDefinition: parsed .md agent metadata
- MdAgent(SpecialistAgent): executes a single .md agent via LLM
- MdAgentLibrary: loads & indexes all .md agent definitions
- MdAgentOrchestrator: runs selected agents post-recon
"""
import asyncio
import json
import logging
import re
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from core.agent_base import SpecialistAgent, AgentResult
logger = logging.getLogger(__name__)

# ─── Agent categories ───────────────────────────────────────────────
# Maps an agent name (the .md filename stem) to its category. Only
# "offensive" agents are dispatched during auto-pentest by default; the
# rest are available on explicit selection. Names absent from this map
# default to "offensive" (see MdAgentLibrary._load_all).
AGENT_CATEGORIES: Dict[str, str] = {
    "pentest_generalist": "generalist",
    "red_team_agent": "generalist",
    "bug_bounty_hunter": "generalist",
    "owasp_expert": "generalist",
    "exploit_expert": "generalist",
    "cwe_expert": "generalist",
    "replay_attack_specialist": "generalist",
    "Pentestfull": "methodology",
}

# Agents that must never run as standalone agents (methodology files, dupes).
SKIP_AGENTS = {"Pentestfull"}

# Agents dispatched when none are explicitly selected.
# An empty list means: run every offensive (vuln-type) agent.
DEFAULT_OFFENSIVE_AGENTS: List[str] = []
# ─── Data classes ────────────────────────────────────────────────────
@dataclass
class MdAgentDefinition:
    """A parsed .md agent definition loaded from the prompt library.

    The fields mirror the sections of the source .md file; `placeholders`
    lists the `{var}` names detected in the user prompt template.
    """
    name: str                   # filename stem (e.g. "owasp_expert")
    display_name: str           # human-readable (e.g. "OWASP Expert")
    category: str               # offensive / analysis / defensive / methodology
    user_prompt_template: str   # raw user prompt with {placeholders}
    system_prompt: str          # system prompt
    file_path: str              # absolute path to .md file
    placeholders: List[str] = field(default_factory=list)  # detected {vars}
# ─── MdAgent: executes one .md agent via LLM ────────────────────────
class MdAgent(SpecialistAgent):
    """Executes a single .md-based agent against a target URL.

    The agent fills the .md template with recon context, sends the combined
    system + user prompt to the LLM, then parses structured findings from
    the response. Multiple response formats are supported (FINDING: blocks,
    headed sections, generic bold labels).
    """

    def __init__(
        self,
        definition: MdAgentDefinition,
        llm=None,
        memory=None,
        budget_allocation: float = 0.0,
        budget=None,
        validation_judge=None,
    ):
        super().__init__(
            name=f"md_{definition.name}",
            llm=llm,
            memory=memory,
            budget_allocation=budget_allocation,
            budget=budget,
        )
        self.definition = definition
        self.validation_judge = validation_judge

    async def run(self, context: Dict) -> AgentResult:
        """Execute the .md agent against the target with recon context.

        Args:
            context: Shared context dict (see MdAgentOrchestrator._build_context);
                must contain at least "target".

        Returns:
            AgentResult with parsed findings, or with `error` set on failure
            (missing target, LLM call failure, empty response).
        """
        result = AgentResult(agent_name=self.name)
        target = context.get("target", "")
        if not target:
            result.error = "No target provided"
            return result
        # Build prompts
        user_prompt = self._build_user_prompt(context)
        system_prompt = self.definition.system_prompt
        # LLM call — failures are captured on the result, never raised.
        try:
            response = await self._llm_call(
                f"{system_prompt}\n\n{user_prompt}",
                category="md_agent",
                estimated_tokens=2000,
            )
        except Exception as e:
            result.error = f"LLM call failed: {e}"
            return result
        if not response:
            result.error = "Empty LLM response"
            return result
        # Parse findings from the structured response
        parsed = self._parse_findings(response, target)
        result.findings = parsed
        result.data = {
            "agent_name": self.definition.display_name,
            "agent_category": self.definition.category,
            "findings_count": len(parsed),
            "raw_response_length": len(response),
        }
        self.tasks_completed += 1
        return result

    # ── Prompt building ──────────────────────────────────────────────
    def _build_user_prompt(self, context: Dict) -> str:
        """Fill the .md template placeholders with recon context.

        All known {placeholders} are substituted with JSON blobs built from
        the recon data. If any placeholder survives substitution, a recon
        context appendix is attached so the agent still gets the data.
        """
        target = context.get("target", "")
        endpoints = context.get("endpoints", [])
        technologies = context.get("technologies", [])
        parameters = context.get("parameters", {})
        headers = context.get("headers", {})
        forms = context.get("forms", [])
        waf_info = context.get("waf_info", "")
        existing_findings = context.get("existing_findings", [])
        # Build context objects for the different placeholder patterns
        scope_json = json.dumps({
            "target": target,
            "endpoints_discovered": len(endpoints),
            "technologies": technologies[:15],
            "waf": waf_info or "Not detected",
        }, indent=2)
        initial_info_json = json.dumps({
            "target_url": target,
            "endpoints": [
                ep.get("url", ep) if isinstance(ep, dict) else str(ep)
                for ep in endpoints[:30]
            ],
            "parameters": (
                {k: v for k, v in list(parameters.items())[:20]}
                if isinstance(parameters, dict) else {}
            ),
            "technologies": technologies[:15],
            "headers": {k: v for k, v in list(headers.items())[:10]},
            "forms": [
                {"action": f.get("action", ""), "method": f.get("method", "GET")}
                for f in (forms[:10] if isinstance(forms, list) else [])
            ],
        }, indent=2)
        target_environment_json = json.dumps({
            "target": target,
            "technology_stack": technologies[:10],
            "waf": waf_info or "None detected",
            "endpoints_count": len(endpoints),
            "parameters_count": (
                len(parameters) if isinstance(parameters, dict) else 0
            ),
        }, indent=2)
        existing_findings_summary = ""
        if existing_findings:
            existing_findings_summary = "\n".join(
                # str(... or 'unknown') guards against a None severity attr,
                # which would otherwise crash on .upper().
                f"- [{str(getattr(f, 'severity', 'unknown') or 'unknown').upper()}] "
                f"{getattr(f, 'title', '?')} at {getattr(f, 'affected_endpoint', '?')}"
                for f in existing_findings[:20]
            )
        recon_data_json = json.dumps({
            "target": target,
            "endpoints": [
                ep.get("url", ep) if isinstance(ep, dict) else str(ep)
                for ep in endpoints[:30]
            ],
            "technologies": technologies[:15],
            "parameters": (
                {k: v for k, v in list(parameters.items())[:20]}
                if isinstance(parameters, dict) else {}
            ),
            "existing_findings": existing_findings_summary or "None yet",
        }, indent=2)
        # Replacement map for all known placeholders
        replacements = {
            # New vuln-type agents use these two:
            "{target}": target,
            "{recon_json}": recon_data_json,
            # Legacy generalist agents use these:
            "{scope_json}": scope_json,
            "{initial_info_json}": initial_info_json,
            "{mission_objectives_json}": json.dumps({
                "primary": f"Identify and exploit vulnerabilities on {target}",
                "scope": "Web application only",
                "existing_findings": len(existing_findings),
            }, indent=2),
            "{target_environment_json}": target_environment_json,
            "{user_input}": target,
            "{target_info_json}": initial_info_json,
            "{recon_data_json}": recon_data_json,
            "{vulnerability_details_json}": json.dumps({
                "target": target,
                "known_technologies": technologies[:10],
                "endpoints": [
                    ep.get("url", ep) if isinstance(ep, dict) else str(ep)
                    for ep in endpoints[:15]
                ],
            }, indent=2),
            "{traffic_logs_json}": json.dumps({
                "target": target,
                "note": "Live traffic analysis - test authentication replay on discovered endpoints",
                "endpoints": [
                    ep.get("url", ep) if isinstance(ep, dict) else str(ep)
                    for ep in endpoints[:10]
                ],
            }, indent=2),
            "{code_vulnerability_json}": json.dumps({
                "target": target,
                "technologies": technologies[:10],
                "note": "Analyze target for CWE weaknesses based on observed behavior",
            }, indent=2),
        }
        # Apply replacements
        prompt = self.definition.user_prompt_template
        for placeholder, value in replacements.items():
            prompt = prompt.replace(placeholder, value)
        # Inject recon context appendix only if a real {word} placeholder
        # remains. (Checking for a bare "{" would false-positive on the
        # JSON blobs substituted above, which always contain braces.)
        if re.search(r"\{\w+\}", prompt):
            prompt += f"\n\n**Recon Context:**\n{recon_data_json}"
        return prompt

    # ── Finding parsing ──────────────────────────────────────────────
    def _parse_findings(self, response: str, target: str) -> List[Dict]:
        """Parse structured findings from LLM response.

        Handles multiple output formats from different .md agents:
        - FINDING: key-value blocks (vuln-type agents)
        - Headed sections (## [SEVERITY] Vulnerability: ...)
        - OWASP format (## OWASP A0X: ...)
        - Generic bold-label patterns
        """
        findings = []
        # Pattern 1: FINDING: blocks (used by 100 vuln-type agents)
        finding_blocks = re.split(r"(?:^|\n)FINDING:", response)
        if len(finding_blocks) > 1:
            for block in finding_blocks[1:]:  # skip text before first FINDING:
                parsed = self._parse_finding_block(block, target)
                if parsed:
                    findings.append(parsed)
            if findings:
                return findings
        # Pattern 2: Section-based findings (## [SEVERITY] Vulnerability: Title)
        vuln_sections = re.findall(
            r"##\s*\[?(Critical|High|Medium|Low|Info)\]?\s*(?:Vulnerability|Attack|OWASP\s+A\d+)[\s:]*([^\n]+)",
            response, re.IGNORECASE,
        )
        if vuln_sections:
            parts = re.split(
                r"(?=##\s*\[?(?:Critical|High|Medium|Low|Info)\]?\s*(?:Vulnerability|Attack|OWASP))",
                response, flags=re.IGNORECASE,
            )
            for part in parts:
                finding = self._parse_finding_section(part, target)
                if finding:
                    findings.append(finding)
        else:
            # Pattern 3: Generic vulnerability mentions with evidence
            generic = re.findall(
                r"\*\*(?:Vulnerability|Finding|Issue)[:\s]*\*\*\s*([^\n]+)",
                response, re.IGNORECASE,
            )
            for title in generic:
                findings.append({
                    "title": title.strip(),
                    "severity": "medium",
                    "vulnerability_type": self._infer_vuln_type(title),
                    "description": "",
                    "affected_endpoint": target,
                    "evidence": "",
                    "poc_code": "",
                    "source_agent": self.definition.display_name,
                })
        return findings

    def _parse_finding_block(self, block: str, target: str) -> Optional[Dict]:
        """Parse a FINDING: key-value block from vuln-type agent response.

        Expected format:
            FINDING:
            - Title: SSRF in url parameter at /api/fetch
            - Severity: High
            - CWE: CWE-918
            - Endpoint: https://target.com/api/fetch
            - Evidence: Internal content returned
            - Impact: Internal network access
            - Remediation: Whitelist URLs

        Returns None when the block is empty or has no Title.
        """
        if not block.strip():
            return None
        # Extract key-value pairs (- Key: Value)
        kvs: Dict[str, str] = {}
        for match in re.finditer(r"-\s*([A-Za-z][\w\s/]*?):\s*(.+)", block):
            key = match.group(1).strip().lower().replace(" ", "_")
            kvs[key] = match.group(2).strip()
        title = kvs.get("title", "").strip()
        if not title:
            return None
        # Extract severity (substring match tolerates e.g. "High (7.5)")
        sev_raw = kvs.get("severity", "medium").lower().strip()
        severity = "medium"
        for s in ("critical", "high", "medium", "low", "info"):
            if s in sev_raw:
                severity = s
                break
        # Extract CWE
        cwe = ""
        cwe_raw = kvs.get("cwe", "")
        cwe_match = re.search(r"CWE-(\d+)", cwe_raw)
        if cwe_match:
            cwe = f"CWE-{cwe_match.group(1)}"
        # Use the agent name as the vuln type (vuln-type agents are named
        # after the vulnerability they test for).
        vuln_type = self.definition.name
        if vuln_type.startswith("md_"):
            vuln_type = vuln_type[3:]
        # Extract endpoint
        endpoint = kvs.get("endpoint", kvs.get("url", target)).strip()
        # Extract code blocks as PoC
        poc = ""
        code_blocks = re.findall(r"```(?:\w+)?\n(.*?)```", block, re.DOTALL)
        if code_blocks:
            poc = "\n---\n".join(b.strip() for b in code_blocks[:3])
        return {
            "title": title,
            "severity": severity,
            "vulnerability_type": vuln_type,
            "cvss_score": 0.0,
            "cwe_id": cwe,
            "description": kvs.get("impact", ""),
            "affected_endpoint": endpoint,
            "evidence": kvs.get("evidence", kvs.get("proof", "")),
            "poc_code": poc or kvs.get("poc", kvs.get("payload", "")),
            "impact": kvs.get("impact", ""),
            "remediation": kvs.get("remediation", kvs.get("fix", "")),
            "source_agent": self.definition.display_name,
            "parameter": kvs.get("parameter", kvs.get("param", "")),
        }

    def _parse_finding_section(self, section: str, target: str) -> Optional[Dict]:
        """Parse a single headed finding section from the response.

        Returns None when the section is empty or no title can be found.
        """
        if not section.strip():
            return None
        # Extract title
        title_match = re.search(
            r"##\s*\[?(?:Critical|High|Medium|Low|Info)\]?\s*(?:Vulnerability|Attack|OWASP[^:]*)[:\s]*(.+)",
            section, re.IGNORECASE,
        )
        title = title_match.group(1).strip() if title_match else ""
        if not title:
            return None
        # Extract severity from header or table
        severity = "medium"
        sev_match = re.search(
            r"\*\*Severity\*\*\s*\|?\s*(Critical|High|Medium|Low|Info)",
            section, re.IGNORECASE,
        )
        if sev_match:
            severity = sev_match.group(1).lower()
        else:
            header_sev = re.search(
                r"##\s*\[?(Critical|High|Medium|Low|Info)\]?",
                section, re.IGNORECASE,
            )
            if header_sev:
                severity = header_sev.group(1).lower()
        # Extract CVSS — require an explicit "CVSS" label so arbitrary
        # decimals (version numbers, timings) aren't misread as scores,
        # and clamp to the valid 0.0–10.0 range.
        cvss = 0.0
        cvss_match = re.search(
            r"CVSS[^0-9\n]{0,30}(\d{1,2}(?:\.\d+)?)",
            section, re.IGNORECASE,
        )
        if cvss_match:
            cvss = min(float(cvss_match.group(1)), 10.0)
        # Extract CWE
        cwe_match = re.search(r"CWE-(\d+)", section)
        cwe = f"CWE-{cwe_match.group(1)}" if cwe_match else ""
        # Extract endpoint
        endpoint = target
        ep_match = re.search(
            r"\*\*Endpoint\*\*\s*\|?\s*(https?://[^\s|]+)",
            section, re.IGNORECASE,
        )
        if ep_match:
            endpoint = ep_match.group(1).strip()
        # Extract description
        desc = ""
        desc_match = re.search(
            r"###?\s*Description\s*\n(.*?)(?=\n###?\s|\Z)",
            section, re.DOTALL | re.IGNORECASE,
        )
        if desc_match:
            desc = desc_match.group(1).strip()[:1000]
        # Extract PoC code blocks
        poc = ""
        code_blocks = re.findall(r"```(?:\w+)?\n(.*?)```", section, re.DOTALL)
        if code_blocks:
            poc = "\n---\n".join(block.strip() for block in code_blocks[:3])
        # Extract evidence/proof
        evidence = ""
        ev_match = re.search(
            r"###?\s*(?:Proof|Evidence|Tool (?:Output|Evidence))\s*\n(.*?)(?=\n###?\s|\Z)",
            section, re.DOTALL | re.IGNORECASE,
        )
        if ev_match:
            evidence = ev_match.group(1).strip()[:1000]
        # Extract impact
        impact = ""
        imp_match = re.search(
            r"###?\s*Impact\s*\n(.*?)(?=\n###?\s|\Z)",
            section, re.DOTALL | re.IGNORECASE,
        )
        if imp_match:
            impact = imp_match.group(1).strip()[:500]
        # Extract remediation
        remediation = ""
        rem_match = re.search(
            r"###?\s*(?:Remediation|Mitigations?|Fix)\s*\n(.*?)(?=\n###?\s|\Z)",
            section, re.DOTALL | re.IGNORECASE,
        )
        if rem_match:
            remediation = rem_match.group(1).strip()[:500]
        return {
            "title": title,
            "severity": severity,
            "vulnerability_type": self._infer_vuln_type(title),
            "cvss_score": cvss,
            "cwe_id": cwe,
            "description": desc,
            "affected_endpoint": endpoint,
            "evidence": evidence,
            "poc_code": poc,
            "impact": impact,
            "remediation": remediation,
            "source_agent": self.definition.display_name,
        }

    @staticmethod
    def _infer_vuln_type(title: str) -> str:
        """Infer vulnerability type from finding title.

        First matching keyword wins, so more specific keywords must appear
        before their substrings (e.g. "stored xss" before "xss", otherwise
        the specific entry is unreachable).
        """
        title_lower = title.lower()
        type_map = {
            "sql injection": "sqli_error",
            "sqli": "sqli_error",
            # Specific XSS variants BEFORE the generic "xss" key.
            "stored xss": "xss_stored",
            "dom xss": "xss_dom",
            "xss": "xss_reflected",
            "cross-site scripting": "xss_reflected",
            "command injection": "command_injection",
            "rce": "command_injection",
            "remote code": "command_injection",
            "ssrf": "ssrf",
            "server-side request": "ssrf",
            "csrf": "csrf",
            "cross-site request": "csrf",
            "lfi": "lfi",
            "local file": "lfi",
            "path traversal": "path_traversal",
            "directory traversal": "path_traversal",
            "file upload": "file_upload",
            "xxe": "xxe",
            "xml external": "xxe",
            "ssti": "ssti",
            "template injection": "ssti",
            "open redirect": "open_redirect",
            "redirect": "open_redirect",
            "idor": "idor",
            "insecure direct": "idor",
            "broken access": "bola",
            "access control": "bola",
            "authentication": "auth_bypass",
            "auth bypass": "auth_bypass",
            "brute force": "brute_force",
            "jwt": "jwt_manipulation",
            "session": "session_fixation",
            "clickjacking": "clickjacking",
            "cors": "cors_misconfig",
            "crlf": "crlf_injection",
            "header injection": "header_injection",
            "security header": "security_headers",
            "ssl": "ssl_issues",
            "tls": "ssl_issues",
            "information disclosure": "information_disclosure",
            "sensitive data": "sensitive_data_exposure",
            "directory listing": "directory_listing",
            "debug": "debug_mode",
            "deserialization": "insecure_deserialization",
            "nosql": "nosql_injection",
            "ldap": "ldap_injection",
            "graphql": "graphql_injection",
            "race condition": "race_condition",
            "business logic": "business_logic",
            "rate limit": "rate_limit_bypass",
            "subdomain takeover": "subdomain_takeover",
            "host header": "host_header_injection",
            "prototype pollution": "prototype_pollution",
            "websocket": "websocket_hijacking",
        }
        for keyword, vtype in type_map.items():
            if keyword in title_lower:
                return vtype
        return "unknown"
# ─── MdAgentLibrary: loads all .md agents ────────────────────────────
class MdAgentLibrary:
    """Loads all .md files from prompts/agents/ and indexes them
    as executable agent definitions (100+ vuln-type agents).

    Each .md file may contain "## User Prompt" and "## System Prompt"
    sections; a file with neither is treated as a pure system prompt.
    """

    def __init__(self, md_dir: str = "prompts/agents"):
        self.md_dir = Path(md_dir)
        self.agents: Dict[str, MdAgentDefinition] = {}  # name -> definition
        self._load_all()

    def _load_all(self):
        """Load all .md files as agent definitions.

        Files listed in SKIP_AGENTS are ignored; a file that fails to parse
        is logged and skipped without aborting the rest of the load.
        """
        if not self.md_dir.is_dir():
            logger.warning(f"MD agent directory not found: {self.md_dir}")
            return
        for md_file in sorted(self.md_dir.glob("*.md")):
            name = md_file.stem
            if name in SKIP_AGENTS:
                continue
            try:
                content = md_file.read_text(encoding="utf-8")
                # Parse structured "## User Prompt" / "## System Prompt" sections
                user_match = re.search(
                    r"## User Prompt\n(.*?)(?=\n## System Prompt|\Z)",
                    content, re.DOTALL,
                )
                system_match = re.search(
                    r"## System Prompt\n(.*?)(?=\n## User Prompt|\Z)",
                    content, re.DOTALL,
                )
                user_prompt = user_match.group(1).strip() if user_match else ""
                system_prompt = system_match.group(1).strip() if system_match else ""
                if not user_prompt and not system_prompt:
                    # Unstructured file: use entire content as system prompt
                    system_prompt = content.strip()
                # Detect placeholders (deduplicated, first-seen order kept)
                placeholders = list(dict.fromkeys(
                    re.findall(r"\{(\w+)\}", user_prompt)
                ))
                # Build display name; prefer the markdown H1 title. \A\s*
                # tolerates leading blank lines before the title (a bare
                # "^" anchor would only match at the very first byte).
                display_name = name.replace("_", " ").title()
                title_match = re.search(r"\A\s*#[ \t]+(.+)", content)
                if title_match:
                    raw_title = title_match.group(1).strip()
                    # Remove suffixes: "Prompt", "Specialist Agent", "Agent"
                    display_name = re.sub(
                        r"\s*(?:Specialist Agent|Agent|Prompt)\s*$",
                        "", raw_title,
                    ).strip()
                category = AGENT_CATEGORIES.get(name, "offensive")
                self.agents[name] = MdAgentDefinition(
                    name=name,
                    display_name=display_name,
                    category=category,
                    user_prompt_template=user_prompt,
                    system_prompt=system_prompt,
                    file_path=str(md_file.resolve()),
                    placeholders=placeholders,
                )
                logger.debug(f"Loaded MD agent: {name} ({category})")
            except Exception as e:
                logger.warning(f"Failed to load MD agent {md_file.name}: {e}")
        logger.info(
            f"MdAgentLibrary: loaded {len(self.agents)} agents from {self.md_dir}"
        )

    def get_agent(self, name: str) -> Optional[MdAgentDefinition]:
        """Return the definition for `name`, or None if unknown."""
        return self.agents.get(name)

    def get_offensive_agents(self) -> List[MdAgentDefinition]:
        """Return all agents in the 'offensive' category."""
        return [a for a in self.agents.values() if a.category == "offensive"]

    def get_by_category(self, category: str) -> List[MdAgentDefinition]:
        """Return all agents in the given category."""
        return [a for a in self.agents.values() if a.category == category]

    def list_agents(self) -> List[Dict]:
        """Return agent metadata list for API/frontend."""
        return [
            {
                "name": a.name,
                "display_name": a.display_name,
                "category": a.category,
                "placeholders": a.placeholders,
            }
            for a in self.agents.values()
        ]
# ─── MdAgentOrchestrator: runs agents post-recon ────────────────────
class MdAgentOrchestrator:
    """Coordinates execution of .md-based agents after recon.

    Flow:
      1. Select agents (explicit list or defaults)
      2. Build shared context from recon data
      3. Run agents in parallel (bounded concurrency)
      4. Collect and merge findings
    """

    MAX_CONCURRENT = 3  # upper bound on concurrently running agents

    def __init__(
        self,
        llm=None,
        memory=None,
        budget=None,
        validation_judge=None,
        log_callback: Optional[Callable] = None,
        progress_callback: Optional[Callable] = None,
    ):
        self.llm = llm
        self.memory = memory
        self.budget = budget
        self.validation_judge = validation_judge
        self.log = log_callback  # async callable(level, message) — TODO confirm callers pass a coroutine fn
        self.progress_callback = progress_callback  # NOTE(review): stored but not used in this class
        self.library = MdAgentLibrary()
        self._cancel_event = asyncio.Event()

    async def _log(self, level: str, message: str):
        """Forward a log line to the callback when one is configured."""
        if self.log:
            await self.log(level, message)

    async def run(
        self,
        target: str,
        recon_data: Any = None,
        existing_findings: Optional[List[Any]] = None,
        selected_agents: Optional[List[str]] = None,
        headers: Optional[Dict] = None,
        waf_info: str = "",
    ) -> Dict:
        """Execute selected .md agents against target.

        Args:
            target: Target URL.
            recon_data: ReconData object from recon phase.
            existing_findings: Findings discovered so far.
            selected_agents: List of agent names to run. None = defaults.
            headers: Auth/custom headers.
            waf_info: WAF detection info.

        Returns:
            Dict with findings, agent_results, total_findings, total_tokens,
            agents_run, and duration.
        """
        start_time = time.time()
        self._cancel_event.clear()
        # Resolve agent selection
        agents_to_run = self._resolve_agents(selected_agents)
        if not agents_to_run:
            await self._log("warning", "[MD-AGENTS] No agents available to run")
            # Same key set as the normal return so callers need no special case.
            return {
                "findings": [],
                "agent_results": {},
                "total_findings": 0,
                "total_tokens": 0,
                "agents_run": 0,
                "duration": 0,
            }
        agent_names = [a.display_name for a in agents_to_run]
        await self._log("info", f"[MD-AGENTS] Dispatching {len(agents_to_run)} agents: "
                        f"{', '.join(agent_names)}")
        # Build shared context
        context = self._build_context(
            target, recon_data, existing_findings, headers, waf_info,
        )
        # Budget per agent: equal share of the whole allocation
        n_agents = len(agents_to_run)
        per_agent_budget = 1.0 / max(n_agents, 1)
        # Create MdAgent instances
        md_agents: List[MdAgent] = []
        for defn in agents_to_run:
            agent = MdAgent(
                definition=defn,
                llm=self.llm,
                memory=self.memory,
                budget_allocation=per_agent_budget,
                budget=self.budget,
                validation_judge=self.validation_judge,
            )
            md_agents.append(agent)
        # Run agents with bounded concurrency
        semaphore = asyncio.Semaphore(self.MAX_CONCURRENT)
        all_results: Dict[str, AgentResult] = {}

        async def _run_one(agent: MdAgent) -> AgentResult:
            async with semaphore:
                # Agents still queued after cancel() return immediately.
                if self._cancel_event.is_set():
                    return AgentResult(
                        agent_name=agent.name, status="cancelled",
                    )
                await self._log("info",
                                f"  [{agent.definition.display_name}] Starting...")
                result = await agent.execute(context)
                await self._log("info",
                                f"  [{agent.definition.display_name}] Done: "
                                f"{len(result.findings)} findings, "
                                f"{result.duration:.1f}s")
                return result

        tasks = [_run_one(a) for a in md_agents]
        # return_exceptions=True: one failing agent must not abort the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Collect results
        all_findings = []
        for agent, res in zip(md_agents, results):
            if isinstance(res, Exception):
                logger.error(f"MD agent {agent.name} error: {res}")
                all_results[agent.name] = AgentResult(
                    agent_name=agent.name, status="failed", error=str(res),
                )
            else:
                all_results[agent.name] = res
                all_findings.extend(res.findings)
        elapsed = time.time() - start_time
        total_tokens = sum(
            r.tokens_used for r in all_results.values()
            if isinstance(r, AgentResult)
        )
        await self._log("info",
                        f"[MD-AGENTS] Complete: {len(all_findings)} findings from "
                        f"{len(agents_to_run)} agents in {elapsed:.1f}s")
        return {
            "findings": all_findings,
            "agent_results": {
                name: {
                    "status": r.status,
                    "findings_count": len(r.findings),
                    "tokens_used": r.tokens_used,
                    "duration": round(r.duration, 1),
                    "error": r.error,
                }
                for name, r in all_results.items()
                if isinstance(r, AgentResult)
            },
            "total_findings": len(all_findings),
            "total_tokens": total_tokens,
            "agents_run": len(agents_to_run),
            "duration": round(elapsed, 1),
        }

    def _resolve_agents(
        self, selected: Optional[List[str]],
    ) -> List[MdAgentDefinition]:
        """Resolve agent selection to definitions.

        When no agents are explicitly selected, dispatches ALL
        offensive (vuln-type) agents — the XBOW-style architecture
        runs one specialist per vulnerability type.
        """
        if selected:
            resolved = []
            for name in selected:
                defn = self.library.get_agent(name)
                if defn:
                    resolved.append(defn)
                else:
                    logger.warning(f"MD agent not found: {name}")
            return resolved
        # Default: all offensive (vuln-type) agents
        return self.library.get_offensive_agents()

    def _build_context(
        self,
        target: str,
        recon_data: Any,
        existing_findings: List[Any],
        headers: Optional[Dict],
        waf_info: str,
    ) -> Dict:
        """Build shared context dict from recon data.

        The headers dict is copied before merging custom headers so the
        recon object's own response_headers are never mutated in place.
        """
        ctx: Dict[str, Any] = {"target": target}
        if recon_data:
            ctx["endpoints"] = getattr(recon_data, "endpoints", [])
            ctx["technologies"] = getattr(recon_data, "technologies", [])
            ctx["parameters"] = getattr(recon_data, "parameters", {})
            ctx["forms"] = getattr(recon_data, "forms", [])
            # Copy: .update(headers) below must not leak into recon_data.
            ctx["headers"] = dict(getattr(recon_data, "response_headers", {}))
        else:
            ctx["endpoints"] = []
            ctx["technologies"] = []
            ctx["parameters"] = {}
            ctx["forms"] = []
            ctx["headers"] = {}
        if headers:
            ctx["headers"].update(headers)
        ctx["existing_findings"] = existing_findings or []
        ctx["waf_info"] = waf_info
        return ctx

    def cancel(self):
        """Request cancellation; queued agents return status 'cancelled'."""
        self._cancel_event.set()

    def list_available_agents(self) -> List[Dict]:
        """Return agent list for API/frontend."""
        return self.library.list_agents()