v3.5.0: per-agent attribution + token/cost telemetry + graceful Ctrl-C (stop → generate/discard)

- Streamed Claude events now tagged with the agent label (@name) so every
  command/tool/file is attributable to the agent that ran it.
- Token/cost telemetry: parse usage from the stream-json result event; feed shows
  per-call in/out/cost and a running total in the run summary.
- Ctrl-C during a run no longer hard-kills: it cancels cooperatively (no new
  agents launch, in-flight bounded), then asks "generate report from partial
  results? [Y/n]" — discard removes the run dir. Second Ctrl-C aborts.
- pool: cancel handle + is_cancelled; one()/complete_routed/chat_cli carry a label.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
CyberSecurityUP
2026-06-24 21:36:23 -03:00
parent 702f22a87a
commit d4bd6d4877
5 changed files with 157 additions and 39 deletions
+99 -15
View File
@@ -319,6 +319,7 @@ async fn run_mode(base: &Path, mut cfg: RunConfig, mcp: bool, mode: Mode) -> any
let refs: Vec<ModelRef> = cfg.models.iter().map(|s| ModelRef::parse(s)).collect();
let pool = ModelPool::with_auth(refs, cfg.concurrency, cfg.subscription, mcp_config);
let cancel = pool.cancel_handle();
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(256);
let printer = tokio::spawn(async move {
@@ -326,13 +327,44 @@ async fn run_mode(base: &Path, mut cfg: RunConfig, mcp: bool, mode: Mode) -> any
render_line(&line);
}
});
let out = match mode {
Mode::White => harness::run_whitebox(cfg, &lib, &pool, tx).await,
Mode::Grey => harness::run_greybox(cfg, &lib, &pool, tx).await,
Mode::Black => harness::run(cfg, &lib, &pool, tx).await,
// Run the engagement as a task so Ctrl-C can stop it gracefully (the AI's
// in-flight CLI/subprocesses are bounded; no new agents launch once cancelled).
let mut task = tokio::spawn(async move {
let out = match mode {
Mode::White => harness::run_whitebox(cfg, &lib, &pool, tx).await,
Mode::Grey => harness::run_greybox(cfg, &lib, &pool, tx).await,
Mode::Black => harness::run(cfg, &lib, &pool, tx).await,
};
out
});
let mut cancelled = false;
let out: RunOutput = tokio::select! {
r = &mut task => r.unwrap_or_default(),
_ = tokio::signal::ctrl_c() => {
cancelled = true;
cancel.store(true, std::sync::atomic::Ordering::Relaxed);
println!("\n \x1b[33m⏸ stopping — finishing in-flight work… (Ctrl-C again to abort now)\x1b[0m");
tokio::select! {
r = &mut task => r.unwrap_or_default(),
_ = tokio::signal::ctrl_c() => { task.abort(); println!(" \x1b[31m✗ aborted.\x1b[0m"); RunOutput::default() }
}
}
};
let _ = printer.await;
// On a graceful stop, ask whether to keep (generate report) or discard.
if cancelled {
let keep = ask_yes_no("Generate a report from partial results? [Y/n]");
if !keep {
std::fs::remove_dir_all(&workdir).ok();
write_status(&workdir, "discarded", "");
println!(" 🗑 discarded run {}", workdir.display());
return Ok(out);
}
}
// Final report via Typst (PDF if the `typst` binary is present) + HTML/MD already written.
match harness::report::typst_report(&out.target, &out.findings, &workdir) {
Ok(p) => println!(" [*] report → {}", p.display()),
@@ -353,8 +385,12 @@ pub(crate) fn print_findings(out: &RunOutput) {
println!("\n \x1b[1mAttack path / kill chain\x1b[0m");
print!("{}", harness::attack_graph::ascii_killchain(&out.findings));
}
let toks = token_summary();
if !toks.is_empty() {
println!("\n {toks}");
}
if !out.artifacts.is_empty() {
println!("\n artifacts: {}", out.artifacts.join(", "));
println!(" artifacts: {}", out.artifacts.join(", "));
println!(" (full attack graph rendered in report.html)");
}
}
@@ -372,6 +408,18 @@ fn now_ts() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0)
}
/// Print a yes/no question on stdout and block until a line is read from stdin.
///
/// Returns `false` only when the trimmed, lower-cased reply is `n` or `no`.
/// Every other reply (including an empty line) counts as yes, and so does a
/// failed read. Used after a graceful Ctrl-C.
fn ask_yes_no(q: &str) -> bool {
    use std::io::Write;

    print!(" {q} ");
    // Flush so the prompt is visible before blocking on stdin; a flush failure is ignored.
    let _ = std::io::stdout().flush();

    let mut reply = String::new();
    match std::io::stdin().read_line(&mut reply) {
        // Unreadable stdin falls back to the default answer (yes).
        Err(_) => true,
        Ok(_) => {
            let answer = reply.trim().to_lowercase();
            answer != "n" && answer != "no"
        }
    }
}
// ── Activity-feed renderer ─────────────────────────────────────────────────
// Turns the harness's tagged progress stream into a categorized feed: tool/
// command/file events render as compact cards; everything else as a state line
@@ -379,24 +427,60 @@ fn now_ts() -> u64 {
const RST: &str = "\x1b[0m";
fn render_line(raw: &str) {
let line = raw.trim_end();
let mut line = raw.trim_end();
// Optional "@agent " prefix tags which agent produced the event.
let mut who = String::new();
if let Some(stripped) = line.strip_prefix('@') {
if let Some((label, rest)) = stripped.split_once(' ') {
who = format!("\x1b[2m[{label}]\x1b[0m ");
line = rest;
}
}
let (tag, rest) = match line.split_once(": ") {
Some((t, r)) if matches!(t, "exec" | "danger" | "read" | "edit" | "tool" | "net" | "ai" | "plan") => (t, r),
Some((t, r)) if matches!(t, "exec" | "danger" | "read" | "edit" | "tool" | "net" | "ai" | "plan" | "tokens") => (t, r),
_ => ("", line),
};
match tag {
"exec" => card("⌘ command", rest, "\x1b[33m"),
"danger" => card("⚠ DANGEROUS command", rest, "\x1b[1;31m"),
"read" => state("📄", "reading", rest, "\x1b[34m"),
"edit" => state("✏️", "editing", rest, "\x1b[35m"),
"net" => card("🌐 request", rest, "\x1b[36m"),
"tool" => state("🔧", "tool", rest, "\x1b[35m"),
"ai" => state("💬", "", rest, "\x1b[2m"),
"plan" => state("🧭", "plan", rest, "\x1b[36m"),
"exec" => card(&format!("{who}⌘ command"), rest, "\x1b[33m"),
"danger" => card(&format!("{who}⚠ DANGEROUS command"), rest, "\x1b[1;31m"),
"read" => state("📄", "reading", &format!("{who}{rest}"), "\x1b[34m"),
"edit" => state("✏️", "editing", &format!("{who}{rest}"), "\x1b[35m"),
"net" => card(&format!("{who}🌐 request"), rest, "\x1b[36m"),
"tool" => state("🔧", "tool", &format!("{who}{rest}"), "\x1b[35m"),
"tokens" => { track_tokens(rest); state("🪙", "tokens", &format!("{who}{rest}"), "\x1b[2;33m"); }
"ai" => state("💬", "", &format!("{who}{rest}"), "\x1b[2m"),
"plan" => state("🧭", "plan", &format!("{who}{rest}"), "\x1b[36m"),
_ => render_untagged(line),
}
}
// Running token/cost totals across the engagement (shown in the run summary).
// Cost is accumulated as a whole number of micro-dollars (1e-6 USD): the
// per-call events carry four decimal places and the summary prints four, so a
// coarser unit would silently drop the low digits of every call.
static TOK_IN: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
static TOK_OUT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
static COST_MICRO_USD: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Fold one `tokens` event payload into the running totals.
///
/// `rest` is a whitespace-separated list such as `in=N out=M cost=$X.XXXX`.
/// Fields may appear in any order; unrecognised fields are ignored and values
/// that fail to parse count as zero.
fn track_tokens(rest: &str) {
    use std::sync::atomic::Ordering::Relaxed;

    for part in rest.split_whitespace() {
        if let Some(v) = part.strip_prefix("in=") {
            TOK_IN.fetch_add(v.parse().unwrap_or(0), Relaxed);
        } else if let Some(v) = part.strip_prefix("out=") {
            TOK_OUT.fetch_add(v.parse().unwrap_or(0), Relaxed);
        } else if let Some(v) = part.strip_prefix("cost=$") {
            let usd = v.parse::<f64>().unwrap_or(0.0);
            // Skip NaN/inf/negative values so a malformed event cannot poison
            // the total; round (rather than truncate) to the nearest micro-dollar.
            if usd.is_finite() && usd > 0.0 {
                COST_MICRO_USD.fetch_add((usd * 1_000_000.0).round() as u64, Relaxed);
            }
        }
    }
}

/// Render and reset the running token/cost total (called at end of a run).
///
/// Returns an empty string when nothing was recorded since the last call;
/// otherwise a one-line summary with the cost shown to four decimal places.
pub(crate) fn token_summary() -> String {
    use std::sync::atomic::Ordering::Relaxed;

    // `swap(0, ..)` reads and clears each counter in one step.
    let tokens_in = TOK_IN.swap(0, Relaxed);
    let tokens_out = TOK_OUT.swap(0, Relaxed);
    let cost_micro = COST_MICRO_USD.swap(0, Relaxed);
    if tokens_in == 0 && tokens_out == 0 && cost_micro == 0 {
        return String::new();
    }
    let cost_usd = cost_micro as f64 / 1_000_000.0;
    format!("🪙 tokens: in={tokens_in} out={tokens_out} · est. cost ${cost_usd:.4}")
}
fn render_untagged(l: &str) {
let low = l.to_lowercase();
if l.starts_with("===") {
+15 -3
View File
@@ -134,6 +134,7 @@ impl ChatClient {
/// **Playwright** (browse, execute JS, screenshot) during execution.
pub async fn chat_cli(
&self,
label: &str,
provider: &str,
model: &str,
system: &str,
@@ -146,9 +147,9 @@ impl ChatClient {
let prompt = format!("{system}\n\n{user}");
// Claude Code can stream structured events (tools, commands, files) which
// we surface live as a categorized activity feed.
// we surface live as a categorized activity feed, attributed to `label`.
if bin == "claude" {
return self.chat_claude_stream(model, &prompt, mcp_config, progress).await;
return self.chat_claude_stream(label, model, &prompt, mcp_config, progress).await;
}
let mut cmd = Command::new(bin);
@@ -213,6 +214,7 @@ impl ChatClient {
/// Tagged events are sent to `progress`; the final assistant text is returned.
async fn chat_claude_stream(
&self,
label: &str,
model: &str,
prompt: &str,
mcp_config: Option<&str>,
@@ -233,9 +235,11 @@ impl ChatClient {
}
let stdout = child.stdout.take().ok_or_else(|| anyhow!("no stdout"))?;
let mut lines = BufReader::new(stdout).lines();
// Tag every streamed event with the agent label so the feed is attributable.
let lbl = if label.is_empty() { String::new() } else { format!("@{label} ") };
let emit = |s: String| {
if let Some(tx) = &progress {
let _ = tx.try_send(s);
let _ = tx.try_send(format!("{lbl}{s}"));
}
};
@@ -271,6 +275,14 @@ impl ChatClient {
if let Some(r) = v.get("result").and_then(|x| x.as_str()) {
result = r.to_string();
}
// Token/cost telemetry from the final result event.
let ti = v.pointer("/usage/input_tokens").and_then(|x| x.as_u64());
let to = v.pointer("/usage/output_tokens").and_then(|x| x.as_u64());
let cost = v.get("total_cost_usd").and_then(|x| x.as_f64());
if ti.is_some() || to.is_some() || cost.is_some() {
emit(format!("tokens: in={} out={} cost=${:.4}",
ti.unwrap_or(0), to.unwrap_or(0), cost.unwrap_or(0.0)));
}
if v.get("is_error").and_then(|x| x.as_bool()).unwrap_or(false) {
had_err = v.get("result").and_then(|x| x.as_str()).unwrap_or("error").to_string();
}
+14 -8
View File
@@ -88,7 +88,7 @@ pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender<Str
"{}".to_string()
} else {
let recon_user = format!("{}{}Target: {}", operator_directives(&cfg), tool_doctrine(pool.mcp_config.is_some()), cfg.target);
match pool.complete_routed(Task::Recon, RECON_SYS, &recon_user).await {
match pool.complete_routed(Task::Recon, "recon", RECON_SYS, &recon_user).await {
Ok((m, t)) => {
let _ = tx.send(format!("recon complete via {}", m.label())).await;
if cfg.verbose {
@@ -159,6 +159,9 @@ pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender<Str
let directives = directives.clone();
let txc = tx.clone();
async move {
if pool.is_cancelled() {
return (ag.name.clone(), String::new(), vec![]);
}
if verbose {
let _ = txc.send(format!(" ▶ launching agent: {} ({})", ag.name, ag.title.replace(" Agent", ""))).await;
}
@@ -174,7 +177,7 @@ pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender<Str
doctrine = tool_doctrine(mcp_on),
body = ag.user.replace("{target}", &target).replace("{recon_json}", &recon),
);
match pool.complete_routed(Task::Exploit, &ag.system, &user).await {
match pool.complete_routed(Task::Exploit, &ag.name, &ag.system, &user).await {
Ok((m, text)) => {
let f = extract_findings(&text, &ag.name);
let _ = txc.send(format!("exploit {} via {}{} candidate(s)", ag.name, m.label(), f.len())).await;
@@ -246,7 +249,7 @@ pub async fn run_whitebox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: S
ag.user.replace("{target}", "the provided repository").replace("{recon_json}", "{}"),
ctx
);
match pool.complete(&ag.system, &user).await {
match pool.complete_routed(Task::Exploit, &ag.name, &ag.system, &user).await {
Ok((m, text)) => {
let f = extract_findings(&text, &ag.name);
let _ = txc.send(format!("analyze {} via {}{} candidate(s)", ag.name, m.label(), f.len())).await;
@@ -283,7 +286,7 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se
let recon = if cfg.offline {
"{}".to_string()
} else {
match pool.complete_routed(Task::Recon, RECON_SYS,
match pool.complete_routed(Task::Recon, "recon", RECON_SYS,
&format!("{}{}Target: {}", operator_directives(&cfg), tool_doctrine(pool.mcp_config.is_some()), cfg.target)).await {
Ok((m, t)) => { let _ = tx.send(format!("recon complete via {}", m.label())).await; t }
Err(e) => { let _ = tx.send(format!("recon failed ({e})")).await; "{}".to_string() }
@@ -310,7 +313,7 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se
where endpoint is file:line.",
ag.user.replace("{target}", "the repository").replace("{recon_json}", "{}"), ctx
);
match pool.complete_routed(Task::Select, &ag.system, &user).await {
match pool.complete_routed(Task::Select, &ag.name, &ag.system, &user).await {
Ok((_, text)) => { let f = extract_findings(&text, &ag.name);
let _ = txc.send(format!("review {}{} lead(s)", ag.name, f.len())).await; f }
Err(_) => vec![],
@@ -371,6 +374,9 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se
let leads = leads_ctx.clone();
let txc = tx.clone();
async move {
if pool.is_cancelled() {
return (ag.name.clone(), String::new(), vec![]);
}
if verbose {
let _ = txc.send(format!(" ▶ launching agent: {} ({})", ag.name, ag.title.replace(" Agent", ""))).await;
}
@@ -383,7 +389,7 @@ pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Se
react = REACT_DOCTRINE, doctrine = tool_doctrine(mcp_on),
body = ag.user.replace("{target}", &target).replace("{recon_json}", &recon),
);
match pool.complete_routed(Task::Exploit, &ag.system, &user).await {
match pool.complete_routed(Task::Exploit, &ag.name, &ag.system, &user).await {
Ok((m, text)) => { let f = extract_findings(&text, &ag.name);
let _ = txc.send(format!("exploit {} via {}{} candidate(s)", ag.name, m.label(), f.len())).await;
(ag.name.clone(), text, f) }
@@ -431,7 +437,7 @@ async fn chain_round(pool: &ModelPool, target: &str, recon: &str, directives: &s
(may be []): {{id,title,severity,cwe,endpoint,payload,evidence,impact,remediation,confidence}}.",
react = REACT_DOCTRINE, doctrine = tool_doctrine(pool.mcp_config.is_some()),
);
match pool.complete_routed(Task::Exploit, CHAIN_SYS, &user).await {
match pool.complete_routed(Task::Exploit, "chain", CHAIN_SYS, &user).await {
Ok((m, text)) => {
let f = extract_findings(&text, "chain");
let _ = tx.send(format!("chain via {}{} new candidate(s)", m.label(), f.len())).await;
@@ -461,7 +467,7 @@ async fn select_agents(pool: &ModelPool, recon: &str, focus: &str, catalog: &[Ag
format!("OPERATOR FOCUS (strongly prioritise agents for this): {focus}\n\n")
};
let user = format!("{focus_line}RECON:\n{recon_trim}\n\nAGENT CATALOG (name — title [cwe]):\n{list}\n\nReturn a JSON array of agent names to run.");
match pool.complete_routed(Task::Select, SELECT_SYS, &user).await {
match pool.complete_routed(Task::Select, "select", SELECT_SYS, &user).await {
Ok((m, text)) => {
let names = parse_string_array(&text);
if names.is_empty() {
+24 -13
View File
@@ -34,6 +34,9 @@ pub struct ModelPool {
/// Progress channel: when set, the subscription CLI streams structured
/// activity (tools called, commands run, files read) here live.
progress: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<String>>>,
/// Cooperative cancellation: when set, model calls fail fast with a
/// "cancelled" error instead of starting, and the pipeline stops launching
/// new agents (graceful stop).
cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl ModelPool {
@@ -61,6 +64,7 @@ impl ModelPool {
subscription,
mcp_config,
progress: std::sync::Mutex::new(None),
cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
}
}
@@ -76,21 +80,31 @@ impl ModelPool {
self.progress.lock().ok().and_then(|g| g.clone())
}
/// Returns a shared handle to this pool's cancellation flag. Storing `true`
/// through the handle requests a graceful stop of the in-progress engagement.
pub fn cancel_handle(&self) -> Arc<std::sync::atomic::AtomicBool> {
    Arc::clone(&self.cancel)
}
/// Reports whether the pool's cancellation flag is currently set.
pub fn is_cancelled(&self) -> bool {
    use std::sync::atomic::Ordering;
    self.cancel.load(Ordering::Relaxed)
}
/// One completion for a model, via subscription CLI (optionally with MCP) or
/// HTTP API, with a short retry/backoff to ride out transient failures
/// (rate limits, MCP cold-starts, network blips).
async fn one(&self, m: &ModelRef, system: &str, user: &str) -> Result<String> {
/// HTTP API, with a short retry/backoff. `label` (e.g. the agent name) tags
/// the streamed activity so each command/tool is attributable.
async fn one(&self, label: &str, m: &ModelRef, system: &str, user: &str) -> Result<String> {
if self.is_cancelled() {
return Err(anyhow!("cancelled"));
}
let use_cli = self.subscription && cli_binary_for(&m.provider).is_some();
let progress = self.progress();
let mut last = anyhow::anyhow!("no attempt");
for attempt in 0..3u64 {
if attempt > 0 {
// Quadratic backoff (1500 ms × attempt²): 1.5s before the 2nd attempt, 6s before the 3rd.
tokio::time::sleep(std::time::Duration::from_millis(1500 * attempt * attempt.max(1))).await;
}
let r = if use_cli {
self.client
.chat_cli(&m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone())
.chat_cli(label, &m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone())
.await
} else {
self.client.chat(m, system, user).await
@@ -104,20 +118,17 @@ impl ModelPool {
}
/// Complete a prompt, trying each candidate model until one succeeds.
/// Returns the model that answered and its text.
pub async fn complete(&self, system: &str, user: &str) -> Result<(ModelRef, String)> {
self.complete_routed(Task::Default, system, user).await
self.complete_routed(Task::Default, "", system, user).await
}
/// Router-aware completion: reorder the candidate panel by task before the
/// failover loop. Recon/triage prefer a fast/cheap model to save tokens and
/// latency; exploitation prefers the strongest (primary) model.
pub async fn complete_routed(&self, task: Task, system: &str, user: &str) -> Result<(ModelRef, String)> {
/// Router-aware completion. `label` tags streamed activity (agent name).
pub async fn complete_routed(&self, task: Task, label: &str, system: &str, user: &str) -> Result<(ModelRef, String)> {
let _permit = self.sem.acquire().await.expect("semaphore closed");
let order = self.route(task);
let mut last = anyhow!("no candidate models");
for m in &order {
match self.one(m, system, user).await {
match self.one(label, m, system, user).await {
Ok(text) => return Ok((m.clone(), text)),
Err(e) => last = e,
}
@@ -166,7 +177,7 @@ impl ModelPool {
Ok(p) => p,
Err(_) => break,
};
if let Ok(text) = self.one(m, system, user).await {
if let Ok(text) = self.one("validate", m, system, user).await {
total += 1;
let t = text.to_lowercase();
if t.contains("\"verdict\": \"confirmed\"")
@@ -119,6 +119,10 @@ pub struct RunConfig {
/// Greybox: a source repository to review alongside the live `target` URL.
#[serde(default)]
pub repo: Option<String>,
/// Explicit agent allowlist. When non-empty, the pipeline runs exactly these
/// agents (skipping recon-based selection) — used by the category picker.
#[serde(default)]
pub pinned: Vec<String>,
}
fn default_vote() -> usize {
@@ -144,6 +148,7 @@ impl RunConfig {
instructions: None,
auth: None,
repo: None,
pinned: Vec::new(),
}
}
}