From d4bd6d4877a55d7e15c96e415b65a901701fa868 Mon Sep 17 00:00:00 2001 From: CyberSecurityUP Date: Wed, 24 Jun 2026 21:36:23 -0300 Subject: [PATCH] =?UTF-8?q?v3.5.0:=20per-agent=20attribution=20+=20token/c?= =?UTF-8?q?ost=20telemetry=20+=20graceful=20Ctrl-C=20(stop=20=E2=86=92=20g?= =?UTF-8?q?enerate/discard)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Streamed Claude events now tagged with the agent label (@name) so every command/tool/file is attributable to the agent that ran it. - Token/cost telemetry: parse usage from the stream-json result event; feed shows per-call in/out/cost and a running total in the run summary. - Ctrl-C during a run no longer hard-kills: it cancels cooperatively (no new agents launch, in-flight bounded), then asks "generate report from partial results? [Y/n]" — discard removes the run dir. Second Ctrl-C aborts. - pool: cancel handle + is_cancelled; one()/complete_routed/chat_cli carry a label. Co-Authored-By: Claude Opus 4.8 (1M context) --- neurosploit-rs/app/src/main.rs | 114 +++++++++++++++--- neurosploit-rs/crates/harness/src/models.rs | 18 ++- neurosploit-rs/crates/harness/src/pipeline.rs | 22 ++-- neurosploit-rs/crates/harness/src/pool.rs | 37 ++++-- neurosploit-rs/crates/harness/src/types.rs | 5 + 5 files changed, 157 insertions(+), 39 deletions(-) diff --git a/neurosploit-rs/app/src/main.rs b/neurosploit-rs/app/src/main.rs index d7080b2..1924d08 100644 --- a/neurosploit-rs/app/src/main.rs +++ b/neurosploit-rs/app/src/main.rs @@ -319,6 +319,7 @@ async fn run_mode(base: &Path, mut cfg: RunConfig, mcp: bool, mode: Mode) -> any let refs: Vec = cfg.models.iter().map(|s| ModelRef::parse(s)).collect(); let pool = ModelPool::with_auth(refs, cfg.concurrency, cfg.subscription, mcp_config); + let cancel = pool.cancel_handle(); let (tx, mut rx) = tokio::sync::mpsc::channel::(256); let printer = tokio::spawn(async move { @@ -326,13 +327,44 @@ async fn run_mode(base: &Path, mut cfg: RunConfig, mcp: bool, mode: Mode) -> any render_line(&line); } }); - let out = match mode { - Mode::White => harness::run_whitebox(cfg, &lib, &pool, tx).await, - Mode::Grey => harness::run_greybox(cfg, &lib, &pool, tx).await, - Mode::Black => harness::run(cfg, &lib, &pool, tx).await, + + // Run the engagement as a task so Ctrl-C can stop it gracefully (the AI's + // in-flight CLI/subprocesses are bounded; no new agents launch once cancelled). + let mut task = tokio::spawn(async move { + let out = match mode { + Mode::White => harness::run_whitebox(cfg, &lib, &pool, tx).await, + Mode::Grey => harness::run_greybox(cfg, &lib, &pool, tx).await, + Mode::Black => harness::run(cfg, &lib, &pool, tx).await, + }; + out + }); + + let mut cancelled = false; + let out: RunOutput = tokio::select! { + r = &mut task => r.unwrap_or_default(), + _ = tokio::signal::ctrl_c() => { + cancelled = true; + cancel.store(true, std::sync::atomic::Ordering::Relaxed); + println!("\n \x1b[33m⏸ stopping — finishing in-flight work… (Ctrl-C again to abort now)\x1b[0m"); + tokio::select! { + r = &mut task => r.unwrap_or_default(), + _ = tokio::signal::ctrl_c() => { task.abort(); println!(" \x1b[31m✗ aborted.\x1b[0m"); RunOutput::default() } + } + } }; let _ = printer.await; + // On a graceful stop, ask whether to keep (generate report) or discard. + if cancelled { + let keep = ask_yes_no("Generate a report from partial results? [Y/n]"); + if !keep { + std::fs::remove_dir_all(&workdir).ok(); + write_status(&workdir, "discarded", ""); + println!(" 🗑 discarded run {}", workdir.display()); + return Ok(out); + } + } + // Final report via Typst (PDF if the `typst` binary is present) + HTML/MD already written. match harness::report::typst_report(&out.target, &out.findings, &workdir) { Ok(p) => println!(" [*] report → {}", p.display()), @@ -353,8 +385,12 @@ pub(crate) fn print_findings(out: &RunOutput) { println!("\n \x1b[1mAttack path / kill chain\x1b[0m"); print!("{}", harness::attack_graph::ascii_killchain(&out.findings)); } + let toks = token_summary(); + if !toks.is_empty() { + println!("\n {toks}"); + } if !out.artifacts.is_empty() { - println!("\n artifacts: {}", out.artifacts.join(", ")); + println!(" artifacts: {}", out.artifacts.join(", ")); println!(" (full attack graph rendered in report.html)"); } } @@ -372,6 +408,18 @@ fn now_ts() -> u64 { SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0) } +/// Blocking yes/no prompt (default yes). Used after a graceful Ctrl-C. +fn ask_yes_no(q: &str) -> bool { + use std::io::Write; + print!(" {q} "); + std::io::stdout().flush().ok(); + let mut s = String::new(); + if std::io::stdin().read_line(&mut s).is_err() { + return true; + } + !matches!(s.trim().to_lowercase().as_str(), "n" | "no") +} + // ── Activity-feed renderer ───────────────────────────────────────────────── // Turns the harness's tagged progress stream into a categorized feed: tool/ // command/file events render as compact cards; everything else as a state line @@ -379,24 +427,60 @@ fn now_ts() -> u64 { const RST: &str = "\x1b[0m"; fn render_line(raw: &str) { - let line = raw.trim_end(); + let mut line = raw.trim_end(); + // Optional "@agent " prefix tags which agent produced the event. + let mut who = String::new(); + if let Some(stripped) = line.strip_prefix('@') { + if let Some((label, rest)) = stripped.split_once(' ') { + who = format!("\x1b[2m[{label}]\x1b[0m "); + line = rest; + } + } let (tag, rest) = match line.split_once(": ") { - Some((t, r)) if matches!(t, "exec" | "danger" | "read" | "edit" | "tool" | "net" | "ai" | "plan") => (t, r), + Some((t, r)) if matches!(t, "exec" | "danger" | "read" | "edit" | "tool" | "net" | "ai" | "plan" | "tokens") => (t, r), _ => ("", line), }; match tag { - "exec" => card("⌘ command", rest, "\x1b[33m"), - "danger" => card("⚠ DANGEROUS command", rest, "\x1b[1;31m"), - "read" => state("📄", "reading", rest, "\x1b[34m"), - "edit" => state("✏️", "editing", rest, "\x1b[35m"), - "net" => card("🌐 request", rest, "\x1b[36m"), - "tool" => state("🔧", "tool", rest, "\x1b[35m"), - "ai" => state("💬", "", rest, "\x1b[2m"), - "plan" => state("🧭", "plan", rest, "\x1b[36m"), + "exec" => card(&format!("{who}⌘ command"), rest, "\x1b[33m"), + "danger" => card(&format!("{who}⚠ DANGEROUS command"), rest, "\x1b[1;31m"), + "read" => state("📄", "reading", &format!("{who}{rest}"), "\x1b[34m"), + "edit" => state("✏️", "editing", &format!("{who}{rest}"), "\x1b[35m"), + "net" => card(&format!("{who}🌐 request"), rest, "\x1b[36m"), + "tool" => state("🔧", "tool", &format!("{who}{rest}"), "\x1b[35m"), + "tokens" => { track_tokens(rest); state("🪙", "tokens", &format!("{who}{rest}"), "\x1b[2;33m"); } + "ai" => state("💬", "", &format!("{who}{rest}"), "\x1b[2m"), + "plan" => state("🧭", "plan", &format!("{who}{rest}"), "\x1b[36m"), _ => render_untagged(line), } } +// Running token/cost total across the engagement (shown in the summary). +static TOK_IN: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); +static TOK_OUT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); +static COST_MILLI: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + +fn track_tokens(rest: &str) { + use std::sync::atomic::Ordering::Relaxed; + // parse "in=N out=M cost=$X.XXXX" + for part in rest.split_whitespace() { + if let Some(v) = part.strip_prefix("in=") { TOK_IN.fetch_add(v.parse().unwrap_or(0), Relaxed); } + else if let Some(v) = part.strip_prefix("out=") { TOK_OUT.fetch_add(v.parse().unwrap_or(0), Relaxed); } + else if let Some(v) = part.strip_prefix("cost=$") { + COST_MILLI.fetch_add((v.parse::().unwrap_or(0.0) * 1000.0) as u64, Relaxed); + } + } +} + +/// Render and reset the running token/cost total (called at end of a run). +pub(crate) fn token_summary() -> String { + use std::sync::atomic::Ordering::Relaxed; + let i = TOK_IN.swap(0, Relaxed); + let o = TOK_OUT.swap(0, Relaxed); + let c = COST_MILLI.swap(0, Relaxed) as f64 / 1000.0; + if i == 0 && o == 0 && c == 0.0 { return String::new(); } + format!("🪙 tokens: in={i} out={o} · est. cost ${c:.4}") +} + fn render_untagged(l: &str) { let low = l.to_lowercase(); if l.starts_with("===") { diff --git a/neurosploit-rs/crates/harness/src/models.rs b/neurosploit-rs/crates/harness/src/models.rs index fb9e17d..11caa9a 100644 --- a/neurosploit-rs/crates/harness/src/models.rs +++ b/neurosploit-rs/crates/harness/src/models.rs @@ -134,6 +134,7 @@ impl ChatClient { /// **Playwright** (browse, execute JS, screenshot) during execution. pub async fn chat_cli( &self, + label: &str, provider: &str, model: &str, system: &str, @@ -146,9 +147,9 @@ impl ChatClient { let prompt = format!("{system}\n\n{user}"); // Claude Code can stream structured events (tools, commands, files) which - // we surface live as a categorized activity feed. + // we surface live as a categorized activity feed, attributed to `label`. if bin == "claude" { - return self.chat_claude_stream(model, &prompt, mcp_config, progress).await; + return self.chat_claude_stream(label, model, &prompt, mcp_config, progress).await; } let mut cmd = Command::new(bin); @@ -213,6 +214,7 @@ impl ChatClient { /// Tagged events are sent to `progress`; the final assistant text is returned. async fn chat_claude_stream( &self, + label: &str, model: &str, prompt: &str, mcp_config: Option<&str>, @@ -233,9 +235,11 @@ impl ChatClient { } let stdout = child.stdout.take().ok_or_else(|| anyhow!("no stdout"))?; let mut lines = BufReader::new(stdout).lines(); + // Tag every streamed event with the agent label so the feed is attributable. + let lbl = if label.is_empty() { String::new() } else { format!("@{label} ") }; let emit = |s: String| { if let Some(tx) = &progress { - let _ = tx.try_send(s); + let _ = tx.try_send(format!("{lbl}{s}")); } }; @@ -271,6 +275,14 @@ impl ChatClient { if let Some(r) = v.get("result").and_then(|x| x.as_str()) { result = r.to_string(); } + // Token/cost telemetry from the final result event. + let ti = v.pointer("/usage/input_tokens").and_then(|x| x.as_u64()); + let to = v.pointer("/usage/output_tokens").and_then(|x| x.as_u64()); + let cost = v.get("total_cost_usd").and_then(|x| x.as_f64()); + if ti.is_some() || to.is_some() || cost.is_some() { + emit(format!("tokens: in={} out={} cost=${:.4}", + ti.unwrap_or(0), to.unwrap_or(0), cost.unwrap_or(0.0))); + } if v.get("is_error").and_then(|x| x.as_bool()).unwrap_or(false) { had_err = v.get("result").and_then(|x| x.as_str()).unwrap_or("error").to_string(); } diff --git a/neurosploit-rs/crates/harness/src/pipeline.rs b/neurosploit-rs/crates/harness/src/pipeline.rs index 263d90d..8acccfa 100644 --- a/neurosploit-rs/crates/harness/src/pipeline.rs +++ b/neurosploit-rs/crates/harness/src/pipeline.rs @@ -88,7 +88,7 @@ pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender { let _ = tx.send(format!("recon complete via {}", m.label())).await; if cfg.verbose { @@ -159,6 +159,9 @@ pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender { let f = extract_findings(&text, &ag.name); let _ = txc.send(format!("exploit {} via {} → {} candidate(s)", ag.name, m.label(), f.len())).await; @@ -246,7 +249,7 @@ pub async fn run_whitebox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: S ag.user.replace("{target}", "the provided repository").replace("{recon_json}", "{}"), ctx ); - match pool.complete(&ag.system, &user).await { + match pool.complete_routed(Task::Exploit, &ag.name, &ag.system, &user).await { Ok((m, text)) => { let f = extract_findings(&text, &ag.name); let _ = txc.send(format!("analyze {} via {} → {} candidate(s)", ag.name, m.label(), f.len())).await; @@ -283,7 +286,7 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se let recon = if cfg.offline { "{}".to_string() } else { - match pool.complete_routed(Task::Recon, RECON_SYS, + match pool.complete_routed(Task::Recon, "recon", RECON_SYS, &format!("{}{}Target: {}", operator_directives(&cfg), tool_doctrine(pool.mcp_config.is_some()), cfg.target)).await { Ok((m, t)) => { let _ = tx.send(format!("recon complete via {}", m.label())).await; t } Err(e) => { let _ = tx.send(format!("recon failed ({e})")).await; "{}".to_string() } @@ -310,7 +313,7 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se where endpoint is file:line.", ag.user.replace("{target}", "the repository").replace("{recon_json}", "{}"), ctx ); - match pool.complete_routed(Task::Select, &ag.system, &user).await { + match pool.complete_routed(Task::Select, &ag.name, &ag.system, &user).await { Ok((_, text)) => { let f = extract_findings(&text, &ag.name); let _ = txc.send(format!("review {} → {} lead(s)", ag.name, f.len())).await; f } Err(_) => vec![], @@ -371,6 +374,9 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se let leads = leads_ctx.clone(); let txc = tx.clone(); async move { + if pool.is_cancelled() { + return (ag.name.clone(), String::new(), vec![]); + } if verbose { let _ = txc.send(format!(" ▶ launching agent: {} ({})", ag.name, ag.title.replace(" Agent", ""))).await; } @@ -383,7 +389,7 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se react = REACT_DOCTRINE, doctrine = tool_doctrine(mcp_on), body = ag.user.replace("{target}", &target).replace("{recon_json}", &recon), ); - match pool.complete_routed(Task::Exploit, &ag.system, &user).await { + match pool.complete_routed(Task::Exploit, &ag.name, &ag.system, &user).await { Ok((m, text)) => { let f = extract_findings(&text, &ag.name); let _ = txc.send(format!("exploit {} via {} → {} candidate(s)", ag.name, m.label(), f.len())).await; (ag.name.clone(), text, f) } @@ -431,7 +437,7 @@ async fn chain_round(pool: &ModelPool, target: &str, recon: &str, directives: &s (may be []): {{id,title,severity,cwe,endpoint,payload,evidence,impact,remediation,confidence}}.", react = REACT_DOCTRINE, doctrine = tool_doctrine(pool.mcp_config.is_some()), ); - match pool.complete_routed(Task::Exploit, CHAIN_SYS, &user).await { + match pool.complete_routed(Task::Exploit, "chain", CHAIN_SYS, &user).await { Ok((m, text)) => { let f = extract_findings(&text, "chain"); let _ = tx.send(format!("chain via {} → {} new candidate(s)", m.label(), f.len())).await; @@ -461,7 +467,7 @@ async fn select_agents(pool: &ModelPool, recon: &str, focus: &str, catalog: &[Ag format!("OPERATOR FOCUS (strongly prioritise agents for this): {focus}\n\n") }; let user = format!("{focus_line}RECON:\n{recon_trim}\n\nAGENT CATALOG (name — title [cwe]):\n{list}\n\nReturn a JSON array of agent names to run."); - match pool.complete_routed(Task::Select, SELECT_SYS, &user).await { + match pool.complete_routed(Task::Select, "select", SELECT_SYS, &user).await { Ok((m, text)) => { let names = parse_string_array(&text); if names.is_empty() { diff --git a/neurosploit-rs/crates/harness/src/pool.rs b/neurosploit-rs/crates/harness/src/pool.rs index 459ee89..e7c39d4 100644 --- a/neurosploit-rs/crates/harness/src/pool.rs +++ b/neurosploit-rs/crates/harness/src/pool.rs @@ -34,6 +34,9 @@ pub struct ModelPool { /// Progress channel: when set, the subscription CLI streams structured /// activity (tools called, commands run, files read) here live. progress: std::sync::Mutex>>, + /// Cooperative cancellation: when set, in-flight model calls short-circuit + /// and the pipeline stops launching new agents (graceful stop). + cancel: std::sync::Arc, } impl ModelPool { @@ -61,6 +64,7 @@ impl ModelPool { subscription, mcp_config, progress: std::sync::Mutex::new(None), + cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)), } } @@ -76,21 +80,31 @@ impl ModelPool { self.progress.lock().ok().and_then(|g| g.clone()) } + /// Handle to request graceful cancellation of an in-progress engagement. + pub fn cancel_handle(&self) -> Arc { + self.cancel.clone() + } + pub fn is_cancelled(&self) -> bool { + self.cancel.load(std::sync::atomic::Ordering::Relaxed) + } + /// One completion for a model, via subscription CLI (optionally with MCP) or - /// HTTP API, with a short retry/backoff to ride out transient failures - /// (rate limits, MCP cold-starts, network blips). - async fn one(&self, m: &ModelRef, system: &str, user: &str) -> Result { + /// HTTP API, with a short retry/backoff. `label` (e.g. the agent name) tags + /// the streamed activity so each command/tool is attributable. + async fn one(&self, label: &str, m: &ModelRef, system: &str, user: &str) -> Result { + if self.is_cancelled() { + return Err(anyhow!("cancelled")); + } let use_cli = self.subscription && cli_binary_for(&m.provider).is_some(); let progress = self.progress(); let mut last = anyhow::anyhow!("no attempt"); for attempt in 0..3u64 { if attempt > 0 { - // 1.5s, 4.5s backoff. tokio::time::sleep(std::time::Duration::from_millis(1500 * attempt * attempt.max(1))).await; } let r = if use_cli { self.client - .chat_cli(&m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone()) + .chat_cli(label, &m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone()) .await } else { self.client.chat(m, system, user).await @@ -104,20 +118,17 @@ impl ModelPool { } /// Complete a prompt, trying each candidate model until one succeeds. - /// Returns the model that answered and its text. pub async fn complete(&self, system: &str, user: &str) -> Result<(ModelRef, String)> { - self.complete_routed(Task::Default, system, user).await + self.complete_routed(Task::Default, "", system, user).await } - /// Router-aware completion: reorder the candidate panel by task before the - /// failover loop. Recon/triage prefer a fast/cheap model to save tokens and - /// latency; exploitation prefers the strongest (primary) model. - pub async fn complete_routed(&self, task: Task, system: &str, user: &str) -> Result<(ModelRef, String)> { + /// Router-aware completion. `label` tags streamed activity (agent name). + pub async fn complete_routed(&self, task: Task, label: &str, system: &str, user: &str) -> Result<(ModelRef, String)> { let _permit = self.sem.acquire().await.expect("semaphore closed"); let order = self.route(task); let mut last = anyhow!("no candidate models"); for m in &order { - match self.one(m, system, user).await { + match self.one(label, m, system, user).await { Ok(text) => return Ok((m.clone(), text)), Err(e) => last = e, } @@ -166,7 +177,7 @@ impl ModelPool { Ok(p) => p, Err(_) => break, }; - if let Ok(text) = self.one(m, system, user).await { + if let Ok(text) = self.one("validate", m, system, user).await { total += 1; let t = text.to_lowercase(); if t.contains("\"verdict\": \"confirmed\"") diff --git a/neurosploit-rs/crates/harness/src/types.rs b/neurosploit-rs/crates/harness/src/types.rs index fc8b115..bb2e535 100644 --- a/neurosploit-rs/crates/harness/src/types.rs +++ b/neurosploit-rs/crates/harness/src/types.rs @@ -119,6 +119,10 @@ pub struct RunConfig { /// Greybox: a source repository to review alongside the live `target` URL. #[serde(default)] pub repo: Option, + /// Explicit agent allowlist. When non-empty, the pipeline runs exactly these + /// agents (skipping recon-based selection) — used by the category picker. + #[serde(default)] + pub pinned: Vec, } fn default_vote() -> usize { @@ -144,6 +148,7 @@ impl RunConfig { instructions: None, auth: None, repo: None, + pinned: Vec::new(), } } }