Files
CyberSecurityUP 55af0d4634 NeuroSploit v3.3.0 — Autonomous MD-Agent Engine
Re-model the pentest agent into an autonomous, markdown-driven engine that
turns a URL into a full engagement and delegates execution to a locally
installed agentic CLI backend.

Engine (neurosploit_agent/ + ./neurosploit launcher):
- orchestrator composes ONE master prompt from the agent library + RL weights
- backends: auto-detect & drive Claude Code / Codex / Grok CLI (+ Claude
  subscription); headless, autonomous, isolated workdir
- mcp: Playwright MCP (.mcp.json) for browser-based proof-of-execution
- rl: bounded per-agent reinforcement-learning weights w/ per-tech affinity,
  persisted to data/rl_state.json
- models: latest registry incl. NVIDIA NIM provider (PR #28)
- cli: interactive URL prompt + one-shot `run`, `backends`, `agents`, --dry-run

Agent library (agents_md/, 213 total):
- 196 vuln specialists incl. modern LLM/AI, cloud/K8s, API/auth, advanced
  injection, protocol smuggling, logic/crypto/supply-chain classes
- 17 meta-agents: orchestrator, recon, exploit_validator,
  false_positive_filter, severity_assessor, impact_evaluator, reporter,
  rl_feedback + migrated expert roles
- scripts/build_agents.py data-driven builder; REGISTRY.md index

Docs: rewritten README.md, v3.3.0 RELEASE.md, .env.example (NVIDIA NIM, xAI,
engine vars).

Retire legacy Python orchestration (neurosploit.py + agent classes) to legacy/.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-14 20:57:38 -03:00

126 lines
4.4 KiB
Python

"""
Reinforcement-learning engine for NeuroSploit v3.3.0.
A lightweight, persisted reward loop that biases agent selection across runs.
It is deliberately model-free and explainable: each specialist agent carries a
weight in [0.05, 1.0] plus per-tech-stack affinity, updated after every run from
validated findings (positive reward) and rejected false positives (negative).
This mirrors `agents_md/meta/rl_feedback.md`: the markdown agent reasons about
rewards qualitatively; this module applies them deterministically so the state
file is reproducible and auditable.
"""
import json
import os
from dataclasses import dataclass
from typing import Dict, List, Optional
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STATE_PATH = os.path.join(ROOT, "data", "rl_state.json")
SEVERITY_REWARD = {"critical": 1.0, "high": 0.7, "medium": 0.4, "low": 0.2, "info": 0.05}
FP_PENALTY = 0.3
IDLE_PENALTY = 0.05 # ran, found nothing, cost budget
ALPHA = 0.3 # learning rate
WMIN, WMAX = 0.05, 1.0
@dataclass
class Outcome:
agent: str
validated: List[str] # severities of validated findings
false_positives: int = 0
ran: bool = True
skipped_correctly: bool = False
tech: Optional[str] = None
def _clamp(x: float) -> float:
return max(WMIN, min(WMAX, x))
class RLEngine:
def __init__(self, path: str = STATE_PATH):
self.path = path
self.state = self._load()
def _load(self) -> dict:
if os.path.exists(self.path):
try:
return json.load(open(self.path, encoding="utf-8"))
except Exception:
pass
return {"version": 1, "agents": {}}
def weights(self) -> Dict[str, float]:
return {name: rec.get("weight", 0.5) for name, rec in self.state.get("agents", {}).items()}
def weight(self, agent: str, tech: Optional[str] = None) -> float:
rec = self.state.get("agents", {}).get(agent)
if not rec:
return 0.5
w = rec.get("weight", 0.5)
if tech:
w = max(w, rec.get("tech_affinity", {}).get(tech, 0.0) or 0.0)
return w
def reward(self, o: Outcome) -> float:
if o.skipped_correctly:
return 0.0
if not o.ran:
return 0.0
r = sum(SEVERITY_REWARD.get(s.lower(), 0.2) for s in o.validated)
r -= FP_PENALTY * o.false_positives
if not o.validated and not o.false_positives:
r -= IDLE_PENALTY
return max(-1.0, min(1.0, r))
def update(self, outcomes: List[Outcome], target: str = "") -> dict:
agents = self.state.setdefault("agents", {})
for o in outcomes:
rec = agents.setdefault(o.agent, {
"weight": 0.5, "runs": 0, "validated_hits": 0,
"false_positives": 0, "reward_last": 0.0, "tech_affinity": {},
})
r = self.reward(o)
old = rec["weight"]
rec["weight"] = _clamp(old + ALPHA * (r - old))
rec["reward_last"] = round(r, 3)
if o.ran and not o.skipped_correctly:
rec["runs"] += 1
rec["validated_hits"] += len(o.validated)
rec["false_positives"] += o.false_positives
if o.tech:
ta = rec.setdefault("tech_affinity", {})
ta[o.tech] = _clamp((ta.get(o.tech, 0.5)) + ALPHA * (r - ta.get(o.tech, 0.5)))
self.state["updated_for"] = target
return self.state
def save(self):
os.makedirs(os.path.dirname(self.path), exist_ok=True)
json.dump(self.state, open(self.path, "w", encoding="utf-8"), indent=2)
def outcomes_from_findings(findings: List[dict], ran_agents: List[str],
tech: Optional[str] = None) -> List[Outcome]:
"""Build per-agent Outcomes from a run's findings + the agents that ran."""
by_agent: Dict[str, Outcome] = {
a: Outcome(agent=a, validated=[], false_positives=0, ran=True, tech=tech)
for a in ran_agents
}
for f in findings:
a = f.get("agent")
if a not in by_agent:
by_agent[a] = Outcome(agent=a, validated=[], false_positives=0, ran=True, tech=tech)
if f.get("validated"):
by_agent[a].validated.append(f.get("severity", "Low"))
elif f.get("verdict") == "false_positive":
by_agent[a].false_positives += 1
return list(by_agent.values())
if __name__ == "__main__":
rl = RLEngine()
print(json.dumps(rl.weights(), indent=2))