// ── Activity-feed renderer ─────────────────────────────────────────────────
// Turns the harness's tagged progress stream into a categorized feed: tool/
// command/file events render as compact cards; everything else as a state line
// with an icon, so it's clear what the AI is doing (no "black box").
const RST: &str = "\x1b[0m";

/// Maximum number of characters of body text on one card line.
const CARD_WIDTH: usize = 72;

/// Renders one progress line from the harness to stdout.
///
/// A line of the form `<tag>: <payload>` with a known tag (`exec`, `danger`,
/// `read`, `edit`, `net`, `tool`, `ai`, `plan`) is drawn as a card or a state
/// line; anything else is classified by [`render_untagged`].
fn render_line(raw: &str) {
    let clean = sanitize(raw);
    let line = clean.trim_end();
    // `trim_end` turns an empty payload ("exec: ") into "exec:", so a bare
    // trailing colon is accepted as a tag with an empty payload as well.
    let tagged = line
        .split_once(": ")
        .or_else(|| line.strip_suffix(':').map(|tag| (tag, "")));
    match tagged {
        Some(("exec", rest)) => card("⌘ command", rest, "\x1b[33m"),
        Some(("danger", rest)) => card("⚠ DANGEROUS command", rest, "\x1b[1;31m"),
        Some(("read", rest)) => state("📄", "reading", rest, "\x1b[34m"),
        Some(("edit", rest)) => state("✏️", "editing", rest, "\x1b[35m"),
        Some(("net", rest)) => card("🌐 request", rest, "\x1b[36m"),
        Some(("tool", rest)) => state("🔧", "tool", rest, "\x1b[35m"),
        Some(("ai", rest)) => state("💬", "", rest, "\x1b[2m"),
        Some(("plan", rest)) => state("🧭", "plan", rest, "\x1b[36m"),
        _ => render_untagged(line),
    }
}

/// Replaces control characters (ESC, newlines, …) with spaces.
///
/// Feed lines carry model and tool output verbatim; letting raw escape
/// sequences or line breaks through would corrupt the terminal layout.
fn sanitize(raw: &str) -> String {
    raw.chars().map(|c| if c.is_control() { ' ' } else { c }).collect()
}

/// Prints an untagged progress line, classified by keyword.
fn render_untagged(l: &str) {
    println!("{}", untagged_line(l));
}

/// Builds the text [`render_untagged`] prints for `l`.
///
/// Checks run in order and the first match wins, so e.g. a line starting with
/// `exploit` is shown as "testing" even if it also contains `error`.
fn untagged_line(l: &str) -> String {
    let low = l.to_lowercase();
    if l.starts_with("===") {
        format!("\n\x1b[1;35m▌ {}\x1b[0m", l.trim_matches('=').trim())
    } else if low.contains("✓ complete") || low.contains("validated finding(s)") {
        format!(" \x1b[1;32m✓\x1b[0m {l}")
    } else if low.starts_with("recon") {
        state_line("🔍", "reconning", strip_recon(l), "\x1b[36m")
    } else if low.contains("selected") || low.contains("agent selection") || low.contains("heuristic") {
        state_line("🧭", "planning", l, "\x1b[36m")
    } else if low.starts_with("exploit")
        || low.starts_with("analyze")
        || low.contains("launching agent")
        || low.starts_with("review ")
    {
        state_line("🧪", "testing", l, "\x1b[35m")
    } else if low.starts_with("vote") {
        if low.contains("confirmed") {
            state_line("✓", "validated", l, "\x1b[32m")
        } else {
            state_line("·", "rejected", l, "\x1b[2m")
        }
    } else if low.starts_with("chain") {
        state_line("🔗", "chaining", l, "\x1b[36m")
    } else if low.contains("report") {
        state_line("📄", "report", l, "\x1b[34m")
    } else if low.contains("fail") || low.contains("error") || low.starts_with('✗') {
        format!(" \x1b[31m✗\x1b[0m {l}")
    } else {
        format!(" \x1b[2m·\x1b[0m {l}")
    }
}

/// Removes a leading `recon` word (any ASCII case) and its separator.
///
/// The line is returned unchanged when `recon` is only the start of a longer
/// word such as `reconnaissance`, so the message is never cut mid-word.
fn strip_recon(l: &str) -> &str {
    const PREFIX: &str = "recon";
    match l.get(..PREFIX.len()) {
        Some(head) if head.eq_ignore_ascii_case(PREFIX) => {
            let rest = &l[PREFIX.len()..];
            if matches!(rest.chars().next(), Some(c) if c.is_alphanumeric()) {
                l
            } else {
                rest.trim_start_matches(|c: char| c == ':' || c == ' ')
            }
        }
        _ => l,
    }
}

/// Prints a one-line state entry: icon, optional coloured kind, message.
fn state(icon: &str, kind: &str, msg: &str, color: &str) {
    println!("{}", state_line(icon, kind, msg, color));
}

/// Builds the text [`state`] prints. `color` is applied to `kind` only; an
/// empty `kind` is omitted together with its colour codes.
fn state_line(icon: &str, kind: &str, msg: &str, color: &str) -> String {
    let k = if kind.is_empty() { String::new() } else { format!("{color}{kind}{RST} ") };
    format!(" {icon} {k}{}", msg.trim())
}

/// Compact card for a tool the AI ran (the "tool runner visual").
fn card(title: &str, body: &str, color: &str) {
    println!("{}", card_lines(title, body, color).join("\n"));
}

/// Builds the lines of a card: a titled top rule, the wrapped body, and a
/// bottom rule of the same width.
///
/// Widths are counted in `char`s, not bytes (`─` is three bytes long) and not
/// terminal columns, so wide glyphs such as emoji may still look slightly off.
fn card_lines(title: &str, body: &str, color: &str) -> Vec<String> {
    let chunks = wrap(body.trim(), CARD_WIDTH);
    let title_w = title.chars().count();
    let body_w = chunks.iter().map(|c| c.chars().count()).max().unwrap_or(0);
    // Number of `─` after the bottom corner. The top rule spends
    // `title_w + 3` of that on "─ title " and fills the remainder.
    let inner = (body_w + 2).max(title_w + 3);

    let mut lines = Vec::with_capacity(chunks.len() + 2);
    lines.push(format!(" {color}╭─ {title} {}{RST}", "─".repeat(inner - title_w - 3)));
    lines.extend(chunks.iter().map(|chunk| format!(" {color}│{RST} {chunk}")));
    lines.push(format!(" {color}╰{}{RST}", "─".repeat(inner)));
    lines
}

/// Greedy word wrap: splits `s` on whitespace and packs words into lines of at
/// most `w` characters (a width of 0 is treated as 1).
///
/// Words longer than `w` are hard-split so that no line exceeds the width.
/// Always returns at least one (possibly empty) line.
fn wrap(s: &str, w: usize) -> Vec<String> {
    let w = w.max(1);
    let mut out = Vec::new();
    let mut cur = String::new();
    let mut cur_len = 0usize; // length of `cur` in chars
    for word in s.split_whitespace() {
        let mut word = word;
        let mut word_len = word.chars().count();
        if cur_len > 0 && cur_len + 1 + word_len > w {
            out.push(std::mem::take(&mut cur));
            cur_len = 0;
        }
        // An over-long word always reaches this point with `cur` empty,
        // because the check above has just flushed it.
        while word_len > w {
            let cut = word.char_indices().nth(w).map_or(word.len(), |(i, _)| i);
            let (head, tail) = word.split_at(cut);
            out.push(head.to_string());
            word = tail;
            word_len -= w;
        }
        if cur_len > 0 {
            cur.push(' ');
            cur_len += 1;
        }
        cur.push_str(word);
        cur_len += word_len;
    }
    if !cur.is_empty() {
        out.push(cur);
    }
    if out.is_empty() {
        out.push(String::new());
    }
    out
}
+ if bin == "claude" { + return self.chat_claude_stream(model, &prompt, mcp_config, progress).await; + } + let mut cmd = Command::new(bin); match bin { - // Claude Code headless print mode (uses the Claude subscription login). - // Tool autonomy is always enabled so the agent can use its built-in - // tools (Bash/curl/etc.) to actually probe the target — Playwright MCP - // is an *optional* add-on, not a requirement. - "claude" => { - cmd.arg("-p").arg("--model").arg(model).arg("--dangerously-skip-permissions"); - // Required to allow tool autonomy when running as root. - cmd.env("IS_SANDBOX", "1"); - if let Some(mcp) = mcp_config { - cmd.arg("--mcp-config").arg(mcp); - } - } // Codex non-interactive exec (uses the ChatGPT/Codex login), prompt on stdin. "codex" => { cmd.arg("exec").arg("--model").arg(model) @@ -211,6 +207,114 @@ impl ChatClient { } Ok(stdout) } + + /// Drive Claude Code with `--output-format stream-json` and surface its + /// activity as a live, categorized feed (states, tools, commands, files). + /// Tagged events are sent to `progress`; the final assistant text is returned. 
+ async fn chat_claude_stream( + &self, + model: &str, + prompt: &str, + mcp_config: Option<&str>, + progress: Option>, + ) -> Result { + let mut cmd = Command::new("claude"); + cmd.arg("-p").arg("--model").arg(model) + .arg("--output-format").arg("stream-json").arg("--verbose") + .arg("--dangerously-skip-permissions") + .env("IS_SANDBOX", "1"); + if let Some(mcp) = mcp_config { + cmd.arg("--mcp-config").arg(mcp); + } + cmd.stdin(Stdio::piped()).stdout(Stdio::piped()).stderr(Stdio::piped()).kill_on_drop(true); + let mut child = cmd.spawn().map_err(|e| anyhow!("spawn claude failed: {e}"))?; + if let Some(mut stdin) = child.stdin.take() { + stdin.write_all(prompt.as_bytes()).await?; + } + let stdout = child.stdout.take().ok_or_else(|| anyhow!("no stdout"))?; + let mut lines = BufReader::new(stdout).lines(); + let emit = |s: String| { + if let Some(tx) = &progress { + let _ = tx.try_send(s); + } + }; + + let mut result = String::new(); + let mut had_err = String::new(); + let read = async { + while let Ok(Some(line)) = lines.next_line().await { + let Ok(v) = serde_json::from_str::(&line) else { continue }; + match v.get("type").and_then(|t| t.as_str()) { + Some("assistant") => { + if let Some(content) = v.pointer("/message/content").and_then(|c| c.as_array()) { + for blk in content { + match blk.get("type").and_then(|t| t.as_str()) { + Some("text") => { + if let Some(t) = blk.get("text").and_then(|x| x.as_str()) { + let t = t.trim(); + if !t.is_empty() { + emit(format!("ai: {}", truncate(t, 240))); + } + } + } + Some("tool_use") => { + let name = blk.get("name").and_then(|x| x.as_str()).unwrap_or("tool"); + let input = blk.get("input"); + emit(tool_event(name, input)); + } + _ => {} + } + } + } + } + Some("result") => { + if let Some(r) = v.get("result").and_then(|x| x.as_str()) { + result = r.to_string(); + } + if v.get("is_error").and_then(|x| x.as_bool()).unwrap_or(false) { + had_err = v.get("result").and_then(|x| x.as_str()).unwrap_or("error").to_string(); + } + } 
+ _ => {} + } + } + }; + // Bound the whole streamed turn. + if tokio::time::timeout(Duration::from_secs(900), read).await.is_err() { + return Err(anyhow!("claude stream timed out after 900s")); + } + let _ = child.wait().await; + if !had_err.is_empty() && result.is_empty() { + return Err(anyhow!("claude: {}", truncate(&had_err, 240))); + } + if result.is_empty() { + return Err(anyhow!("claude stream produced no result")); + } + Ok(result) + } +} + +/// Categorise a Claude tool_use block into a tagged activity-feed event. +fn tool_event(name: &str, input: Option<&serde_json::Value>) -> String { + let s = |k: &str| input.and_then(|i| i.get(k)).and_then(|x| x.as_str()).unwrap_or(""); + match name { + "Bash" => { + let c = s("command"); + let danger = c.contains("rm -rf") || c.contains("mkfs") || c.contains(":(){") + || c.contains("dd if=") || c.contains("> /dev/"); + format!("{}: {}", if danger { "danger" } else { "exec" }, truncate(c, 200)) + } + "Read" => format!("read: {}", s("file_path")), + "Write" | "Edit" => format!("edit: {}", s("file_path")), + "Grep" => format!("tool: grep {}", truncate(s("pattern"), 80)), + "Glob" => format!("tool: glob {}", truncate(s("pattern"), 80)), + "WebFetch" => format!("net: fetch {}", s("url")), + n if n.contains("playwright") || n.contains("browser") => { + let url = s("url"); + format!("net: browser {}{}", n.rsplit('_').next().unwrap_or(n), if url.is_empty() { String::new() } else { format!(" {url}") }) + } + other => format!("tool: {other}"), + } } /// Map a provider to its local agentic CLI binary (subscription backend). diff --git a/neurosploit-rs/crates/harness/src/pipeline.rs b/neurosploit-rs/crates/harness/src/pipeline.rs index fa6b74a..facd3f9 100644 --- a/neurosploit-rs/crates/harness/src/pipeline.rs +++ b/neurosploit-rs/crates/harness/src/pipeline.rs @@ -71,6 +71,7 @@ Base every claim on an actual observed response — never assume. 
Stop when you' /// Black-box web engagement: recon → parallel exploit → N-model vote → report. pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender) -> RunOutput { + pool.set_progress(tx.clone()); let _ = tx .send(format!( "Loaded {} agents ({} vuln / {} recon / {} code / {} meta) · models: {} · vote_n={} · concurrency={}{}", @@ -210,6 +211,7 @@ pub async fn run(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender) -> RunOutput { + pool.set_progress(tx.clone()); let _ = tx.send(format!("WHITEBOX · repo: {} · {} code agents · models: {}", cfg.target, lib.code.len(), pool.candidates.iter().map(|m| m.label()).collect::>().join(", "))).await; @@ -272,6 +274,7 @@ pub async fn run_whitebox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: S /// pipeline — code-review findings become *leads* that guide live exploitation /// (with credentials/auth so testing is authenticated). pub async fn run_greybox(cfg: RunConfig, lib: &Library, pool: &ModelPool, tx: Sender) -> RunOutput { + pool.set_progress(tx.clone()); let repo = cfg.repo.clone().unwrap_or_default(); let _ = tx.send(format!("GREYBOX · live: {} · repo: {} · {} code agents", cfg.target, repo, lib.code.len())).await; diff --git a/neurosploit-rs/crates/harness/src/pool.rs b/neurosploit-rs/crates/harness/src/pool.rs index 299c4d7..459ee89 100644 --- a/neurosploit-rs/crates/harness/src/pool.rs +++ b/neurosploit-rs/crates/harness/src/pool.rs @@ -31,6 +31,9 @@ pub struct ModelPool { pub subscription: bool, /// Path to an `.mcp.json` (Playwright) used on the subscription/CLI path. pub mcp_config: Option, + /// Progress channel: when set, the subscription CLI streams structured + /// activity (tools called, commands run, files read) here live. 
+ progress: std::sync::Mutex>>, } impl ModelPool { @@ -57,14 +60,28 @@ impl ModelPool { }, subscription, mcp_config, + progress: std::sync::Mutex::new(None), } } + /// Attach a progress channel so the subscription CLI streams structured + /// activity (commands run, files read, tools called) live. + pub fn set_progress(&self, tx: tokio::sync::mpsc::Sender) { + if let Ok(mut g) = self.progress.lock() { + *g = Some(tx); + } + } + + fn progress(&self) -> Option> { + self.progress.lock().ok().and_then(|g| g.clone()) + } + /// One completion for a model, via subscription CLI (optionally with MCP) or /// HTTP API, with a short retry/backoff to ride out transient failures /// (rate limits, MCP cold-starts, network blips). async fn one(&self, m: &ModelRef, system: &str, user: &str) -> Result { let use_cli = self.subscription && cli_binary_for(&m.provider).is_some(); + let progress = self.progress(); let mut last = anyhow::anyhow!("no attempt"); for attempt in 0..3u64 { if attempt > 0 { @@ -73,7 +90,7 @@ impl ModelPool { } let r = if use_cli { self.client - .chat_cli(&m.provider, &m.model, system, user, self.mcp_config.as_deref()) + .chat_cli(&m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone()) .await } else { self.client.chat(m, system, user).await