From 49dde7c637ed50268722e1f6d8ec7b9658261968 Mon Sep 17 00:00:00 2001 From: CyberSecurityUP Date: Thu, 25 Jun 2026 00:41:22 -0300 Subject: [PATCH] feat(repl): pause-on-exhaustion + live findings checkpoint + instant stop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Token/quota exhaustion no longer silently drops agents. When every candidate model is rate-limited / out of quota, the run PARKS (keeping all state) and prints "⏸ token/quota exhausted … PAUSED". The user can: - wait for renewal and /continue (retry same model), or - /model (or the /model selector) then /continue to switch. Implemented via ModelPool: is_exhaustion() detection, park_exhausted() that awaits a resume Notify, and a fallback-model slot tried first on retry. /model queues the chosen models into a paused run's fallback so a plain /continue resumes on them. Findings now survive a crash/quit: each finding is checkpointed live to .neurosploit/active_run.json; on next launch an interrupted run is recovered into /runs (a raw report is materialized) so /results, /finding and /report keep working. /stop now actually halts immediately on raw/discard: one() races the in-flight model call against the hard-cancel flag, so the CLI child (kill_on_drop) is terminated at once instead of finishing its whole command sequence. The validate path still soft-stops (lets validation run). Docs: TUTORIAL documents the 3-way /stop, crash recovery and pause/continue; /help lists /continue and the new behaviors. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- TUTORIAL.md | 29 +++- neurosploit-rs/app/src/main.rs | 11 +- neurosploit-rs/app/src/repl.rs | 112 ++++++++++++++- neurosploit-rs/crates/harness/src/pool.rs | 158 ++++++++++++++++++++-- 4 files changed, 283 insertions(+), 27 deletions(-) diff --git a/TUTORIAL.md b/TUTORIAL.md index 689e8ce..4989c60 100644 --- a/TUTORIAL.md +++ b/TUTORIAL.md @@ -305,11 +305,27 @@ prompt — you keep typing while it streams live above the prompt. While it runs - **`/status`** — live phase, a **progress bar** (agents done / total), elapsed time, token/cost and the possible findings so far. -- **`/stop`** — gracefully stop (a report is still generated from partial results). +- **`/stop`** — stop with a 3-way choice: **[1]** validate the findings found so + far, then report · **[2]** raw report **now** without validating · **[3]** + discard. Choices 2 and 3 abort in-flight agents immediately (running commands + are killed); choice 1 stops launching new agents but lets validation finish. - Findings are color-coded by severity (Critical = red … Info = grey), and a confirmed vote shows green ✓. - When it finishes you get `◀ run #n done — N validated finding(s) · /results n · /report n`. +**Findings survive a crash/quit.** Every finding is checkpointed live to +`.neurosploit/active_run.json`. If the REPL is closed (or crashes) mid-run, the +next launch recovers them into `/runs` automatically (`↻ recovered interrupted +run …`), so `/results`, `/finding` and `/report` still work. + +**If your tokens/quota run out, the run pauses instead of dying.** When every +candidate model is rate-limited/out of quota, the run **parks** (keeping all +state) and prints `⏸ token/quota exhausted … PAUSED`. Then either: + +- wait for your quota to renew and type **`/continue`** to retry the same model, or +- switch model first — **`/model <provider:model>`** (or `/model` for the + arrow-select menu) — then **`/continue`** to resume on the new model. 
+ (When stdin is piped/non-interactive, `/run` falls back to blocking mode.) --- @@ -416,13 +432,16 @@ When you launch the REPL in a project directory, NeuroSploit creates ``` .neurosploit/ - session.json # your config (models, target, repo, auth, focus) - runs.json # run history (for /runs, /results, /report, /diff, /retest) - history.txt # command history (↑/↓) + session.json # your config (models, target, repo, auth, focus) + runs.json # run history (for /runs, /results, /report, /diff, /retest) + active_run.json # live checkpoint of an in-flight run (auto-recovered if interrupted) + history.txt # command history (↑/↓) ``` Close and reopen in the same folder → it **resumes** automatically -(`↻ resumed project session`). No database needed — it's structured state. +(`↻ resumed project session`). If a run was interrupted mid-flight, its +checkpointed findings are recovered into `/runs` (`↻ recovered interrupted run`). +No database needed — it's structured state. --- diff --git a/neurosploit-rs/app/src/main.rs b/neurosploit-rs/app/src/main.rs index aa8af72..9156de4 100644 --- a/neurosploit-rs/app/src/main.rs +++ b/neurosploit-rs/app/src/main.rs @@ -360,6 +360,12 @@ pub(crate) struct Spawned { pub rx: tokio::sync::mpsc::Receiver, pub cancel: std::sync::Arc, pub soft: std::sync::Arc, + /// Set when the run is parked on token/quota exhaustion (awaiting /continue). + pub paused: std::sync::Arc, + /// Wakes a parked run when the user runs /continue. + pub resume: std::sync::Arc, + /// Fallback models pushed by /continue before resuming. 
+ pub fallback: std::sync::Arc>>, pub workdir: PathBuf, } @@ -410,6 +416,9 @@ pub(crate) fn spawn_engagement(base: &Path, mut cfg: RunConfig, mcp: bool, mode: let pool = ModelPool::with_auth(refs, cfg.concurrency, cfg.subscription, mcp_config); let cancel = pool.cancel_handle(); let soft = pool.soft_handle(); + let paused = pool.pause_handle(); + let resume = pool.resume_handle(); + let fallback = pool.fallback_handle(); let (tx, rx) = tokio::sync::mpsc::channel::(256); let task = tokio::spawn(async move { match mode { @@ -419,7 +428,7 @@ pub(crate) fn spawn_engagement(base: &Path, mut cfg: RunConfig, mcp: bool, mode: Mode::Black => harness::run(cfg, &lib, &pool, tx).await, } }); - Spawned { task, rx, cancel, soft, workdir } + Spawned { task, rx, cancel, soft, paused, resume, fallback, workdir } } /// Absolute file:// URL of a run's report (PDF if present, else HTML). diff --git a/neurosploit-rs/app/src/repl.rs b/neurosploit-rs/app/src/repl.rs index edafad7..6ecc8ef 100644 --- a/neurosploit-rs/app/src/repl.rs +++ b/neurosploit-rs/app/src/repl.rs @@ -6,7 +6,7 @@ //! models, target/repo/auth/instructions, run history, and reports. 
use dialoguer::{theme::ColorfulTheme, MultiSelect}; -use harness::{agents, types::Finding, types::RunConfig}; +use harness::{agents, models::ModelRef, types::Finding, types::RunConfig}; use rustyline::completion::{Completer, Pair}; use rustyline::error::ReadlineError; use rustyline::highlight::Highlighter; @@ -48,7 +48,9 @@ impl RunLive { } fn ingest(&mut self, line: &str) { let low = line.to_lowercase(); - if low.contains("recon complete") { self.phase = "recon".into(); } + if low.contains("token/quota exhausted") || low.contains("run is paused") { self.phase = "paused (quota)".into(); } + else if low.contains("resumed — retrying") { self.phase = "exploiting".into(); } + else if low.contains("recon complete") { self.phase = "recon".into(); } else if low.contains("selected") && low.contains("agent") { self.phase = "planning".into(); if let Some(n) = line.split_whitespace().find_map(|t| t.parse::().ok()) { self.agents = n; } @@ -93,13 +95,31 @@ struct ActiveRun { soft: Arc, done: Arc, choice: Arc>, + /// Set when the run is parked on token/quota exhaustion (awaiting /continue). + paused: Arc, + /// Wakes the parked run when the user runs /continue. + resume: Arc, + /// Fallback models to try first, pushed by `/continue <provider:model>`. + fallback: Arc>>, +} + +/// On-disk checkpoint of an in-flight run's findings/commands, written live so a +/// run survives quitting/crashing — recovered into /runs on the next launch. +#[derive(Serialize, Deserialize, Clone, Default)] +struct LiveCheckpoint { + target: String, + mode: String, + phase: String, + workdir: String, + findings: Vec, + commands: Vec, } /// All slash-commands, for Tab completion. 
const COMMANDS: &[&str] = &[ "/help", "/show", "/config", "/providers", "/model", "/key", "/sub", "/target", "/repo", "/auth", "/creds", "/focus", "/attach", "/context", "/mcp", "/offline", - "/votes", "/agents", "/theme", "/clear", "/run", "/stop", "/runs", "/results", "/report", + "/votes", "/agents", "/theme", "/clear", "/run", "/stop", "/continue", "/runs", "/results", "/report", "/status", "/diff", "/retest", "/quit", ]; @@ -294,8 +314,26 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> { let history: Arc>> = Arc::new(Mutex::new(load_runs(base))); let past = history.lock().unwrap().len(); if resumed || past > 0 { - println!(" ↻ resumed project session from {} — {} past run(s)\n", proj_dir().display(), past); + println!(" ↻ resumed project session from {} — {} past run(s)", proj_dir().display(), past); } + // Recover an interrupted run (REPL was quit/crashed mid-engagement): its + // live findings were checkpointed to disk — fold them into /runs so + // /results, /finding and /report still work. 
+ if let Some(cp) = load_checkpoint() { + if !cp.findings.is_empty() { + let wd = std::path::PathBuf::from(&cp.workdir); + std::fs::create_dir_all(&wd).ok(); + crate::report_raw(&cp.target, &cp.findings, &wd); // materialize a report so /report works + let mut h = history.lock().unwrap(); + let id = h.len() + 1; + h.push(RunRecord { id, mode: cp.mode.clone(), target: cp.target.clone(), workdir: cp.workdir.clone(), findings: cp.findings.clone() }); + save_runs(base, &h); + println!(" \x1b[1;33m↻ recovered interrupted run on {} — {} finding(s) saved as run #{}\x1b[0m (/results {id} · /report {id})", + cp.target, cp.findings.len(), id); + } + clear_checkpoint(); + } + println!(); let mut reader = Reader::new(base); let mut active: Option = None; show(&s); @@ -332,6 +370,15 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> { s.models = arg.split([',', ' ']).filter(|x| !x.is_empty()).map(String::from).collect(); println!(" models: {}", s.models.join(", ")); } + // If a run is paused on exhaustion, queue the newly-chosen models + // as its fallback so a plain /continue picks them up. 
+ if let Some(a) = &active { + if a.paused.load(Ordering::Relaxed) { + let mut fb = a.fallback.lock().unwrap(); + for id in &s.models { fb.push(ModelRef::parse(id)); } + println!(" \x1b[2m↪ queued for the paused run — /continue to resume on these model(s)\x1b[0m"); + } + } } "/key" => key_cmd(&mut s, arg, &mut reader), "/sub" | "/subscription" => { @@ -414,6 +461,23 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> { _ => println!(" no active run."), } } + "/continue" | "/resume" => { + match &active { + Some(a) if a.paused.load(Ordering::Relaxed) => { + if !arg.is_empty() { + let m = ModelRef::parse(arg); + println!(" \x1b[1;35m▶ resuming with fallback model\x1b[0m {}:{}", m.provider, m.model); + a.fallback.lock().unwrap().push(m); + } else { + println!(" \x1b[1;35m▶ resuming\x1b[0m — retrying with the current model(s)."); + } + a.paused.store(false, Ordering::Relaxed); + a.resume.notify_waiters(); + } + Some(a) if !a.done.load(Ordering::Relaxed) => println!(" run is not paused — it's still working. /status to check."), + _ => println!(" no paused run. (a run pauses automatically if your tokens/quota run out)"), + } + } "/runs" | "/history" => list_runs(&history.lock().unwrap()), "/diff" | "/changed" => diff_runs(&history.lock().unwrap()), "/retest" => { @@ -478,6 +542,9 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> { let sev = if by.is_empty() { "0".into() } else { by.iter().map(|(k, v)| format!("{k}:{v}")).collect::>().join(" ") }; println!(" \x1b[1m▶ live\x1b[0m {} ({}) · phase {} · {:02}:{:02} · {} possible finding(s) [{}]", l.target, l.mode, l.phase, el / 60, el % 60, l.findings.len(), sev); + if a.paused.load(Ordering::Relaxed) { + println!(" \x1b[1;33m⏸ PAUSED — token/quota exhausted. 
/continue to resume, or /model then /continue to switch.\x1b[0m"); + } if l.agents > 0 { println!(" progress \x1b[36m{}\x1b[0m", l.bar(24)); } for (sv, t) in l.findings.iter().rev().take(5) { println!(" ✦ [{sv}] {t}"); } } @@ -661,21 +728,40 @@ async fn start_background(base: &Path, s: &Session, reader: &mut Reader, })); let cancel = sp.cancel.clone(); let soft = sp.soft.clone(); + let paused = sp.paused.clone(); + let resume = sp.resume.clone(); + let fallback = sp.fallback.clone(); let done = Arc::new(AtomicBool::new(false)); let choice = Arc::new(Mutex::new(StopMode::Run)); let (live2, done2, hist2, choice2) = (live.clone(), done.clone(), history, choice.clone()); tokio::spawn(async move { let crate::Spawned { task, mut rx, workdir, .. } = sp; + let mut last_saved = 0usize; while let Some(line) = rx.recv().await { live2.lock().unwrap().ingest(&line); if let Some(out) = crate::render_compact(&line) { let _ = printer.print(out); } + // Checkpoint live findings to disk whenever a new one lands, so the + // run survives a quit/crash and is recovered on next launch. 
+ let snap = { + let l = live2.lock().unwrap(); + if l.full.len() != last_saved { + last_saved = l.full.len(); + Some(LiveCheckpoint { + target: l.target.clone(), mode: l.mode.into(), phase: l.phase.clone(), + workdir: workdir.display().to_string(), + findings: l.full.clone(), commands: l.commands.clone(), + }) + } else { None } + }; + if let Some(c) = snap { save_checkpoint(&c); } } let task_out = task.await.unwrap_or_default(); let mode_choice = *choice2.lock().unwrap(); if mode_choice == StopMode::Discard { std::fs::remove_dir_all(&workdir).ok(); + clear_checkpoint(); let _ = printer.print(format!("\x1b[33m🗑 run discarded — {}\x1b[0m", workdir.display())); done2.store(true, Ordering::Relaxed); return; @@ -698,13 +784,14 @@ async fn start_background(base: &Path, s: &Session, reader: &mut Reader, if let Ok(j) = serde_json::to_string_pretty(&*h) { std::fs::write(proj_dir().join("runs.json"), j).ok(); } id }; + clear_checkpoint(); // run is now a completed RunRecord let _ = printer.print(format!( "\x1b[1;32m◀ run #{id} done — {} {} finding(s)\x1b[0m · /results {id} · /finding", findings.len(), validated_word)); let _ = printer.print(format!("\x1b[36m report: {}\x1b[0m", crate::report_url(&workdir))); done2.store(true, Ordering::Relaxed); }); - Some(ActiveRun { live, cancel, soft, done, choice }) + Some(ActiveRun { live, cancel, soft, done, choice, paused, resume, fallback }) } /// Project-local store: `/.neurosploit/` so each project keeps its own @@ -726,6 +813,16 @@ fn save_runs(_base: &Path, history: &[RunRecord]) { if let Ok(j) = serde_json::to_string_pretty(history) { std::fs::write(p, j).ok(); } } +/// Live-run checkpoint file (one in-flight run at a time). 
+fn checkpoint_path() -> std::path::PathBuf { proj_dir().join("active_run.json") } +fn save_checkpoint(c: &LiveCheckpoint) { + if let Ok(j) = serde_json::to_string_pretty(c) { std::fs::write(checkpoint_path(), j).ok(); } +} +fn clear_checkpoint() { std::fs::remove_file(checkpoint_path()).ok(); } +fn load_checkpoint() -> Option { + std::fs::read_to_string(checkpoint_path()).ok().and_then(|t| serde_json::from_str(&t).ok()) +} + /// Persistable snapshot of the session config (resume across restarts). #[derive(Serialize, Deserialize, Default)] struct Snapshot { @@ -939,7 +1036,8 @@ fn help() { println!("\n \x1b[2mRUN & MONITOR\x1b[0m"); h("/run", "launch (runs in the BACKGROUND — keep typing)"); h("/status", "live progress + findings while running (or a past run #)"); - h("/stop", "gracefully stop the active run"); + h("/stop", "stop: [1] validate+report [2] raw report now [3] discard"); + h("/continue", "resume a run paused on token/quota (change /model first to switch)"); h("/runs", "list runs · /results [n] · /report [n]"); h("/diff /retest [n]", "what changed vs last run · re-verify a past run"); @@ -949,6 +1047,8 @@ fn help() { h("/theme color|mono", "/show (config) /clear /quit"); println!("\n \x1b[2mMODES — black-box: set /target · white-box: set /repo · grey-box: set BOTH /repo + /target · host: /target + /creds\x1b[0m"); + println!(" \x1b[2mFindings are checkpointed live to .neurosploit/ — quit/crash mid-run and they're recovered into /runs next launch.\x1b[0m"); + println!(" \x1b[2mIf tokens/quota run out the run PAUSES (state kept) — /continue to resume, or switch with /model then /continue.\x1b[0m"); println!(" \x1b[2m↑/↓ history · Tab completes commands & @paths · Ctrl-A/E/K edit · Ctrl-O full cmd · \\ for multiline\x1b[0m\n"); } diff --git a/neurosploit-rs/crates/harness/src/pool.rs b/neurosploit-rs/crates/harness/src/pool.rs index 469f70f..b553f93 100644 --- a/neurosploit-rs/crates/harness/src/pool.rs +++ b/neurosploit-rs/crates/harness/src/pool.rs @@ 
-1,7 +1,24 @@ use crate::models::{cli_binary_for, ChatClient, ModelRef}; use anyhow::{anyhow, Result}; -use std::sync::Arc; -use tokio::sync::Semaphore; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::sync::{Notify, Semaphore}; + +/// Does this error look like token/quota/rate-limit exhaustion (as opposed to a +/// transient network blip)? Used to PAUSE the run instead of silently dropping +/// the agent, so the user can /continue (wait for renewal) or switch model. +pub fn is_exhaustion(e: &anyhow::Error) -> bool { + let s = format!("{e:#}").to_lowercase(); + [ + "rate limit", "rate_limit", "ratelimit", "429", "too many requests", + "quota", "insufficient_quota", "insufficient quota", "out of credit", + "credit balance", "billing", "exhausted", "overloaded", "capacity", + "usage limit", "resource_exhausted", "resource exhausted", + ] + .iter() + .any(|k| s.contains(k)) +} /// Task type used by the model router to pick the best model for the step. #[derive(Clone, Copy, Debug)] @@ -39,6 +56,14 @@ pub struct ModelPool { /// SOFT stop: stop launching new EXPLOIT agents, but let in-flight finish and /// VALIDATION still run — so "stop and validate what was found" works. soft: std::sync::Arc, + /// PAUSE: set when every candidate model is token/quota-exhausted. The run + /// parks (keeping all state) until the user runs /continue. + paused: Arc, + /// Wakes the parked task when the user runs /continue. + resume: Arc, + /// Fallback models the user added via `/continue <provider:model>` while + /// paused — tried first on the next attempt. 
+ fallback: Arc>>, } impl ModelPool { @@ -68,6 +93,9 @@ impl ModelPool { progress: std::sync::Mutex::new(None), cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)), soft: Arc::new(std::sync::atomic::AtomicBool::new(false)), + paused: Arc::new(AtomicBool::new(false)), + resume: Arc::new(Notify::new()), + fallback: Arc::new(Mutex::new(Vec::new())), } } @@ -101,6 +129,52 @@ impl ModelPool { || self.soft.load(std::sync::atomic::Ordering::Relaxed) } + /// Handle to the PAUSE flag (observe whether the run is parked on exhaustion). + pub fn pause_handle(&self) -> Arc { + self.paused.clone() + } + /// Handle used by the REPL to wake a parked run (`/continue`). + pub fn resume_handle(&self) -> Arc { + self.resume.clone() + } + /// Slot the REPL pushes a fallback model into before resuming + /// (`/continue <provider:model>`). + pub fn fallback_handle(&self) -> Arc>> { + self.fallback.clone() + } + pub fn is_paused(&self) -> bool { + self.paused.load(Ordering::Relaxed) + } + + /// Park the run on token/quota exhaustion: keep ALL state, emit a notice, + /// and wait until the user runs `/continue` (or cancels). Returns when the + /// run should retry (pause cleared) or give up (cancelled). + async fn park_exhausted(&self, err: &anyhow::Error) { + self.paused.store(true, Ordering::Relaxed); + if let Some(tx) = self.progress() { + let msg = format!("{err:#}"); + let short = msg.lines().next().unwrap_or(&msg); + let _ = tx + .send(format!( + "notify: ⏸ token/quota exhausted ({}). Run is PAUSED — type /continue when your quota renews, or switch with /model then /continue.", + short.chars().take(120).collect::() + )) + .await; + } + while self.paused.load(Ordering::Relaxed) && !self.is_cancelled() { + let notified = self.resume.notified(); + tokio::select! 
{ + _ = notified => {} + _ = tokio::time::sleep(Duration::from_millis(500)) => {} + } + } + if !self.is_cancelled() { + if let Some(tx) = self.progress() { + let _ = tx.send("notify: ▶ resumed — retrying exhausted step.".to_string()).await; + } + } + } + /// One completion for a model, via subscription CLI (optionally with MCP) or /// HTTP API, with a short retry/backoff. `label` (e.g. the agent name) tags /// the streamed activity so each command/tool is attributable. @@ -112,18 +186,35 @@ impl ModelPool { let progress = self.progress(); let mut last = anyhow::anyhow!("no attempt"); for attempt in 0..3u64 { + if self.is_cancelled() { + return Err(anyhow!("cancelled")); + } if attempt > 0 { tokio::time::sleep(std::time::Duration::from_millis(1500 * attempt * attempt.max(1))).await; } - let r = if use_cli { - self.client - .chat_cli(label, &m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone()) - .await - } else { - self.client.chat(m, system, user).await + let call = async { + if use_cli { + self.client + .chat_cli(label, &m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone()) + .await + } else { + self.client.chat(m, system, user).await + } + }; + // Race the in-flight call against a HARD cancel: when the user picks + // "report raw" / "discard" on /stop, drop the call future so the + // CLI child (spawned with kill_on_drop) is terminated immediately + // instead of finishing its whole command sequence. + let r = tokio::select! { + biased; + _ = wait_cancelled(&self.cancel) => return Err(anyhow!("cancelled")), + r = call => r, }; match r { Ok(t) => return Ok(t), + // Don't burn retries on exhaustion — surface it so the caller + // can park and let the user /continue. + Err(e) if is_exhaustion(&e) => return Err(e), Err(e) => last = e, } } @@ -138,15 +229,44 @@ impl ModelPool { /// Router-aware completion. `label` tags streamed activity (agent name). 
pub async fn complete_routed(&self, task: Task, label: &str, system: &str, user: &str) -> Result<(ModelRef, String)> { let _permit = self.sem.acquire().await.expect("semaphore closed"); - let order = self.route(task); - let mut last = anyhow!("no candidate models"); - for m in &order { - match self.one(label, m, system, user).await { - Ok(text) => return Ok((m.clone(), text)), - Err(e) => last = e, + loop { + if self.is_cancelled() { + return Err(anyhow!("cancelled")); } + // User-supplied fallback models (via /continue) are tried first. + let mut order = self.route(task); + if let Ok(fb) = self.fallback.lock() { + for m in fb.iter().rev() { + if !order.iter().any(|o| o.provider == m.provider && o.model == m.model) { + order.insert(0, m.clone()); + } + } + } + let mut last = anyhow!("no candidate models"); + let mut exhausted = false; + for m in &order { + if self.is_cancelled() { + return Err(anyhow!("cancelled")); + } + match self.one(label, m, system, user).await { + Ok(text) => return Ok((m.clone(), text)), + Err(e) => { + if is_exhaustion(&e) { + exhausted = true; + } + last = e; + } + } + } + // Every candidate failed. If it was token/quota exhaustion, park the + // run until the user runs /continue, then retry the whole order (now + // including any fallback model they added). Otherwise, give up. + if exhausted && !self.is_cancelled() { + self.park_exhausted(&last).await; + continue; + } + return Err(last); } - Err(last) } /// Reorder candidates for a task. With a single-model panel this is a no-op. @@ -205,3 +325,11 @@ impl ModelPool { (confirmed, total) } } + +/// Resolve once the HARD-cancel flag flips. Lets `tokio::select!` race an +/// in-flight model call against cancellation and drop it on the spot. +async fn wait_cancelled(flag: &Arc) { + while !flag.load(Ordering::Relaxed) { + tokio::time::sleep(Duration::from_millis(120)).await; + } +}