v3.5.0: structured activity feed — stream Claude tool/command/file events as a categorized REPL conversation

Harness:
- ModelPool gains a progress channel (set_progress); chat_cli forwards it.
- New chat_claude_stream: drives Claude Code with --output-format stream-json and
  parses the event stream live — assistant text, and tool_use blocks categorized
  into tagged events (exec/danger command, read/edit file, net request/browser,
  grep/glob tool). 900s bound; clear error surfacing.
- Wired set_progress into run / whitebox / greybox.

REPL renderer (render_line):
- Tagged events render as the conversation feed: tool/command/network as compact
  CARDS (tool-runner visual), files/edits/AI text/states as iconized lines.
- Clear "what the AI is doing" states: reconning, planning, testing, validating,
  chaining, report, complete — plus a ⚠ DANGEROUS marker for risky commands.
- Untagged harness lines mapped to the same state vocabulary.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
CyberSecurityUP
2026-06-24 21:04:51 -03:00
parent e8df48af9e
commit d864ea8b8a
4 changed files with 222 additions and 15 deletions
+84 -1
View File
@@ -323,7 +323,7 @@ async fn run_mode(base: &Path, mut cfg: RunConfig, mcp: bool, mode: Mode) -> any
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(256);
let printer = tokio::spawn(async move {
while let Some(line) = rx.recv().await {
println!(" [*] {line}");
render_line(&line);
}
});
let out = match mode {
@@ -364,6 +364,89 @@ fn now_ts() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0)
}
// ── Activity-feed renderer ─────────────────────────────────────────────────
// Turns the harness's tagged progress stream into a categorized feed: tool/
// command/file events render as compact cards; everything else as a state line
// with an icon, so it's clear what the AI is doing (no "black box").
const RST: &str = "\x1b[0m";
fn render_line(raw: &str) {
let line = raw.trim_end();
let (tag, rest) = match line.split_once(": ") {
Some((t, r)) if matches!(t, "exec" | "danger" | "read" | "edit" | "tool" | "net" | "ai" | "plan") => (t, r),
_ => ("", line),
};
match tag {
"exec" => card("⌘ command", rest, "\x1b[33m"),
"danger" => card("⚠ DANGEROUS command", rest, "\x1b[1;31m"),
"read" => state("📄", "reading", rest, "\x1b[34m"),
"edit" => state("✏️", "editing", rest, "\x1b[35m"),
"net" => card("🌐 request", rest, "\x1b[36m"),
"tool" => state("🔧", "tool", rest, "\x1b[35m"),
"ai" => state("💬", "", rest, "\x1b[2m"),
"plan" => state("🧭", "plan", rest, "\x1b[36m"),
_ => render_untagged(line),
}
}
fn render_untagged(l: &str) {
let low = l.to_lowercase();
if l.starts_with("===") {
println!("\n\x1b[1;35m▌ {}\x1b[0m", l.trim_matches('=').trim());
} else if low.contains("✓ complete") || low.contains("validated finding(s)") {
println!(" \x1b[1;32m✓\x1b[0m {l}");
} else if low.starts_with("recon") {
state("🔍", "reconning", l.trim_start_matches("recon").trim_start_matches(' '), "\x1b[36m");
} else if low.contains("selected") || low.contains("agent selection") || low.contains("heuristic") {
state("🧭", "planning", l, "\x1b[36m");
} else if low.starts_with("exploit") || low.starts_with("analyze") || low.contains("launching agent") || low.starts_with("review ") {
state("🧪", "testing", l, "\x1b[35m");
} else if low.starts_with("vote") {
if low.contains("confirmed") { state("", "validated", l, "\x1b[32m"); }
else { state("·", "rejected", l, "\x1b[2m"); }
} else if low.starts_with("chain") {
state("🔗", "chaining", l, "\x1b[36m");
} else if low.contains("report") {
state("📄", "report", l, "\x1b[34m");
} else if low.contains("fail") || low.contains("error") || low.starts_with('✗') {
println!(" \x1b[31m✗\x1b[0m {l}");
} else {
println!(" \x1b[2m·\x1b[0m {l}");
}
}
fn state(icon: &str, kind: &str, msg: &str, color: &str) {
let k = if kind.is_empty() { String::new() } else { format!("{color}{kind}{RST} ") };
println!(" {icon} {k}{}", msg.trim());
}
/// Compact card for a tool the AI ran (the "tool runner visual").
fn card(title: &str, body: &str, color: &str) {
let body = body.trim();
let width = body.chars().count().min(72);
let bar = "".repeat(width.max(title.chars().count()) + 2);
println!(" {color}╭─ {title} {}{RST}", "".repeat(bar.len().saturating_sub(title.chars().count() + 3)));
for chunk in wrap(body, 72) {
println!(" {color}{RST} {chunk}");
}
println!(" {color}{}{RST}", bar);
}
fn wrap(s: &str, w: usize) -> Vec<String> {
let mut out = Vec::new();
let mut cur = String::new();
for word in s.split_whitespace() {
if cur.chars().count() + word.chars().count() + 1 > w && !cur.is_empty() {
out.push(std::mem::take(&mut cur));
}
if !cur.is_empty() { cur.push(' '); }
cur.push_str(word);
}
if !cur.is_empty() { out.push(cur); }
if out.is_empty() { out.push(String::new()); }
out
}
fn write_status(workdir: &Path, state: &str, extra: &str) {
let p = workdir.join("status.json");
let _ = std::fs::write(&p, format!("{{\"state\":\"{state}\",\"ts\":{}{}}}", now_ts(),
+117 -13
View File
@@ -2,7 +2,7 @@ use anyhow::{anyhow, Result};
use serde::Serialize;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
/// A model provider exposing an OpenAI-compatible `/chat/completions` endpoint.
@@ -139,24 +139,20 @@ impl ChatClient {
system: &str,
user: &str,
mcp_config: Option<&str>,
progress: Option<tokio::sync::mpsc::Sender<String>>,
) -> Result<String> {
let bin = cli_binary_for(provider)
.ok_or_else(|| anyhow!("no CLI/subscription backend for provider '{}'", provider))?;
let prompt = format!("{system}\n\n{user}");
// Claude Code can stream structured events (tools, commands, files) which
// we surface live as a categorized activity feed.
if bin == "claude" {
return self.chat_claude_stream(model, &prompt, mcp_config, progress).await;
}
let mut cmd = Command::new(bin);
match bin {
// Claude Code headless print mode (uses the Claude subscription login).
// Tool autonomy is always enabled so the agent can use its built-in
// tools (Bash/curl/etc.) to actually probe the target — Playwright MCP
// is an *optional* add-on, not a requirement.
"claude" => {
cmd.arg("-p").arg("--model").arg(model).arg("--dangerously-skip-permissions");
// Required to allow tool autonomy when running as root.
cmd.env("IS_SANDBOX", "1");
if let Some(mcp) = mcp_config {
cmd.arg("--mcp-config").arg(mcp);
}
}
// Codex non-interactive exec (uses the ChatGPT/Codex login), prompt on stdin.
"codex" => {
cmd.arg("exec").arg("--model").arg(model)
@@ -211,6 +207,114 @@ impl ChatClient {
}
Ok(stdout)
}
/// Drive Claude Code with `--output-format stream-json` and surface its
/// activity as a live, categorized feed (states, tools, commands, files).
/// Tagged events are sent to `progress`; the final assistant text is returned.
async fn chat_claude_stream(
&self,
model: &str,
prompt: &str,
mcp_config: Option<&str>,
progress: Option<tokio::sync::mpsc::Sender<String>>,
) -> Result<String> {
let mut cmd = Command::new("claude");
cmd.arg("-p").arg("--model").arg(model)
.arg("--output-format").arg("stream-json").arg("--verbose")
.arg("--dangerously-skip-permissions")
.env("IS_SANDBOX", "1");
if let Some(mcp) = mcp_config {
cmd.arg("--mcp-config").arg(mcp);
}
cmd.stdin(Stdio::piped()).stdout(Stdio::piped()).stderr(Stdio::piped()).kill_on_drop(true);
let mut child = cmd.spawn().map_err(|e| anyhow!("spawn claude failed: {e}"))?;
if let Some(mut stdin) = child.stdin.take() {
stdin.write_all(prompt.as_bytes()).await?;
}
let stdout = child.stdout.take().ok_or_else(|| anyhow!("no stdout"))?;
let mut lines = BufReader::new(stdout).lines();
let emit = |s: String| {
if let Some(tx) = &progress {
let _ = tx.try_send(s);
}
};
let mut result = String::new();
let mut had_err = String::new();
let read = async {
while let Ok(Some(line)) = lines.next_line().await {
let Ok(v) = serde_json::from_str::<serde_json::Value>(&line) else { continue };
match v.get("type").and_then(|t| t.as_str()) {
Some("assistant") => {
if let Some(content) = v.pointer("/message/content").and_then(|c| c.as_array()) {
for blk in content {
match blk.get("type").and_then(|t| t.as_str()) {
Some("text") => {
if let Some(t) = blk.get("text").and_then(|x| x.as_str()) {
let t = t.trim();
if !t.is_empty() {
emit(format!("ai: {}", truncate(t, 240)));
}
}
}
Some("tool_use") => {
let name = blk.get("name").and_then(|x| x.as_str()).unwrap_or("tool");
let input = blk.get("input");
emit(tool_event(name, input));
}
_ => {}
}
}
}
}
Some("result") => {
if let Some(r) = v.get("result").and_then(|x| x.as_str()) {
result = r.to_string();
}
if v.get("is_error").and_then(|x| x.as_bool()).unwrap_or(false) {
had_err = v.get("result").and_then(|x| x.as_str()).unwrap_or("error").to_string();
}
}
_ => {}
}
}
};
// Bound the whole streamed turn.
if tokio::time::timeout(Duration::from_secs(900), read).await.is_err() {
return Err(anyhow!("claude stream timed out after 900s"));
}
let _ = child.wait().await;
if !had_err.is_empty() && result.is_empty() {
return Err(anyhow!("claude: {}", truncate(&had_err, 240)));
}
if result.is_empty() {
return Err(anyhow!("claude stream produced no result"));
}
Ok(result)
}
}
/// Categorise a Claude tool_use block into a tagged activity-feed event.
fn tool_event(name: &str, input: Option<&serde_json::Value>) -> String {
let s = |k: &str| input.and_then(|i| i.get(k)).and_then(|x| x.as_str()).unwrap_or("");
match name {
"Bash" => {
let c = s("command");
let danger = c.contains("rm -rf") || c.contains("mkfs") || c.contains(":(){")
|| c.contains("dd if=") || c.contains("> /dev/");
format!("{}: {}", if danger { "danger" } else { "exec" }, truncate(c, 200))
}
"Read" => format!("read: {}", s("file_path")),
"Write" | "Edit" => format!("edit: {}", s("file_path")),
"Grep" => format!("tool: grep {}", truncate(s("pattern"), 80)),
"Glob" => format!("tool: glob {}", truncate(s("pattern"), 80)),
"WebFetch" => format!("net: fetch {}", s("url")),
n if n.contains("playwright") || n.contains("browser") => {
let url = s("url");
format!("net: browser {}{}", n.rsplit('_').next().unwrap_or(n), if url.is_empty() { String::new() } else { format!(" {url}") })
}
other => format!("tool: {other}"),
}
}
/// Map a provider to its local agentic CLI binary (subscription backend).
@@ -71,6 +71,7 @@ Base every claim on an actual observed response — never assume. Stop when you'
/// Black-box web engagement: recon → parallel exploit → N-model vote → report.
pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender<String>) -> RunOutput {
pool.set_progress(tx.clone());
let _ = tx
.send(format!(
"Loaded {} agents ({} vuln / {} recon / {} code / {} meta) · models: {} · vote_n={} · concurrency={}{}",
@@ -210,6 +211,7 @@ pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender<Str
/// White-box engagement: analyse a repository's source for vulnerabilities.
pub async fn run_whitebox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender<String>) -> RunOutput {
pool.set_progress(tx.clone());
let _ = tx.send(format!("WHITEBOX · repo: {} · {} code agents · models: {}", cfg.target, lib.code.len(),
pool.candidates.iter().map(|m| m.label()).collect::<Vec<_>>().join(", "))).await;
@@ -272,6 +274,7 @@ pub async fn run_whitebox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: S
/// pipeline — code-review findings become *leads* that guide live exploitation
/// (with credentials/auth so testing is authenticated).
pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender<String>) -> RunOutput {
pool.set_progress(tx.clone());
let repo = cfg.repo.clone().unwrap_or_default();
let _ = tx.send(format!("GREYBOX · live: {} · repo: {} · {} code agents",
cfg.target, repo, lib.code.len())).await;
+18 -1
View File
@@ -31,6 +31,9 @@ pub struct ModelPool {
pub subscription: bool,
/// Path to an `.mcp.json` (Playwright) used on the subscription/CLI path.
pub mcp_config: Option<String>,
/// Progress channel: when set, the subscription CLI streams structured
/// activity (tools called, commands run, files read) here live.
progress: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<String>>>,
}
impl ModelPool {
@@ -57,14 +60,28 @@ impl ModelPool {
},
subscription,
mcp_config,
progress: std::sync::Mutex::new(None),
}
}
/// Attach a progress channel so the subscription CLI streams structured
/// activity (commands run, files read, tools called) live.
pub fn set_progress(&self, tx: tokio::sync::mpsc::Sender<String>) {
if let Ok(mut g) = self.progress.lock() {
*g = Some(tx);
}
}
fn progress(&self) -> Option<tokio::sync::mpsc::Sender<String>> {
self.progress.lock().ok().and_then(|g| g.clone())
}
/// One completion for a model, via subscription CLI (optionally with MCP) or
/// HTTP API, with a short retry/backoff to ride out transient failures
/// (rate limits, MCP cold-starts, network blips).
async fn one(&self, m: &ModelRef, system: &str, user: &str) -> Result<String> {
let use_cli = self.subscription && cli_binary_for(&m.provider).is_some();
let progress = self.progress();
let mut last = anyhow::anyhow!("no attempt");
for attempt in 0..3u64 {
if attempt > 0 {
@@ -73,7 +90,7 @@ impl ModelPool {
}
let r = if use_cli {
self.client
.chat_cli(&m.provider, &m.model, system, user, self.mcp_config.as_deref())
.chat_cli(&m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone())
.await
} else {
self.client.chat(m, system, user).await