mirror of
https://github.com/CyberSecurityUP/NeuroSploit.git
synced 2026-06-29 23:05:30 +02:00
feat(repl): pause-on-exhaustion + live findings checkpoint + instant stop
Token/quota exhaustion no longer silently drops agents. When every candidate model is rate-limited / out of quota, the run PARKS (keeping all state) and prints "⏸ token/quota exhausted … PAUSED". The user can:

- wait for renewal and /continue (retry same model), or
- /model <provider:model> (or the /model selector) then /continue to switch.

Implemented via ModelPool: is_exhaustion() detection, park_exhausted() that awaits a resume Notify, and a fallback-model slot tried first on retry. /model queues the chosen models into a paused run's fallback so a plain /continue resumes on them.

Findings now survive a crash/quit: each finding is checkpointed live to .neurosploit/active_run.json; on next launch an interrupted run is recovered into /runs (a raw report is materialized) so /results, /finding and /report keep working.

/stop now actually halts immediately on raw/discard: one() races the in-flight model call against the hard-cancel flag, so the CLI child (kill_on_drop) is terminated at once instead of finishing its whole command sequence. The validate path still soft-stops (lets validation run).

Docs: TUTORIAL documents the 3-way /stop, crash recovery and pause/continue; /help lists /continue and the new behaviors.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
+24
-5
@@ -305,11 +305,27 @@ prompt — you keep typing while it streams live above the prompt. While it runs

 - **`/status`** — live phase, a **progress bar** (agents done / total), elapsed
   time, token/cost and the possible findings so far.
-- **`/stop`** — gracefully stop (a report is still generated from partial results).
+- **`/stop`** — stop with a 3-way choice: **[1]** validate the findings found so
+  far, then report · **[2]** raw report **now** without validating · **[3]**
+  discard. Choices 2 and 3 abort in-flight agents immediately (running commands
+  are killed); choice 1 stops launching new agents but lets validation finish.
 - Findings are color-coded by severity (Critical = red … Info = grey), and a
   confirmed vote shows green ✓.
 - When it finishes you get `◀ run #n done — N validated finding(s) · /results n · /report n`.
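The 3-way `/stop` described in the list above can be pictured as a small decision table. The sketch below is only an illustration under stated assumptions: `StopChoice`, `parse_choice`, `aborts_in_flight` and `runs_validation` are hypothetical names that do not appear in the commit (the real code uses a `StopMode` enum of which only `Run` and `Discard` are visible here).

```rust
/// Hypothetical mirror of the three options offered by `/stop`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StopChoice {
    Validate, // [1] validate what was found so far, then report
    Raw,      // [2] raw report now, without validating
    Discard,  // [3] throw the run away
}

/// Map the typed menu answer to a choice; anything else is rejected.
pub fn parse_choice(input: &str) -> Option<StopChoice> {
    match input.trim() {
        "1" => Some(StopChoice::Validate),
        "2" => Some(StopChoice::Raw),
        "3" => Some(StopChoice::Discard),
        _ => None,
    }
}

/// Choices 2 and 3 abort in-flight agents immediately.
pub fn aborts_in_flight(choice: StopChoice) -> bool {
    matches!(choice, StopChoice::Raw | StopChoice::Discard)
}

/// Only choice 1 lets the validation phase finish.
pub fn runs_validation(choice: StopChoice) -> bool {
    choice == StopChoice::Validate
}
```

Keeping "abort now" and "still validate" as two separate questions matches the split between the hard-cancel and soft-stop flags that the commit message describes.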
+
+**Findings survive a crash/quit.** Every finding is checkpointed live to
+`.neurosploit/active_run.json`. If the REPL is closed (or crashes) mid-run, the
+next launch recovers them into `/runs` automatically (`↻ recovered interrupted
+run …`), so `/results`, `/finding` and `/report` still work.
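The checkpoint-and-recover cycle above boils down to two steps: write a snapshot only when the number of findings changes, and on the next launch turn a leftover snapshot into a new history entry. The following is a minimal in-memory sketch, assuming simplified `Checkpoint` and `RunRecord` types with string findings; the file I/O and JSON serialization that the real code performs are deliberately left out, and the function names are hypothetical.

```rust
#[derive(Clone, Debug, PartialEq)]
pub struct Checkpoint {
    pub target: String,
    pub findings: Vec<String>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct RunRecord {
    pub id: usize,
    pub target: String,
    pub findings: Vec<String>,
}

/// Produce a snapshot only when the finding count changed since the last save,
/// so unrelated progress lines do not trigger a write.
pub fn snapshot_if_changed(
    target: &str,
    findings: &[String],
    last_saved: &mut usize,
) -> Option<Checkpoint> {
    if findings.len() == *last_saved {
        return None;
    }
    *last_saved = findings.len();
    Some(Checkpoint { target: target.to_string(), findings: findings.to_vec() })
}

/// Fold a leftover checkpoint into the run history. Returns the new run id,
/// or `None` when there was nothing worth recovering.
pub fn recover(history: &mut Vec<RunRecord>, leftover: Option<Checkpoint>) -> Option<usize> {
    let cp = leftover?;
    if cp.findings.is_empty() {
        return None;
    }
    let id = history.len() + 1;
    history.push(RunRecord { id, target: cp.target, findings: cp.findings });
    Some(id)
}
```

In this sketch the caller is expected to delete the checkpoint after calling `recover`, whether or not anything was recovered, so that a stale file is not replayed on a later launch.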
+
+**If your tokens/quota run out, the run pauses instead of dying.** When every
+candidate model is rate-limited/out of quota, the run **parks** (keeping all
+state) and prints `⏸ token/quota exhausted … PAUSED`. Then either:
+
+- wait for your quota to renew and type **`/continue`** to retry the same model, or
+- switch model first — **`/model <provider:model>`** (or `/model` for the
+  arrow-select menu) — then **`/continue`** to resume on the new model.
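Switching model before `/continue` works because the queued fallback models are placed ahead of the routed candidates on the next attempt. The sketch below illustrates that ordering rule with std-only types. `ModelRef` here is a stand-in struct, its `parse` is a hypothetical parser (the real `ModelRef::parse` may behave differently), and `with_fallback_first` is a name invented for this example.

```rust
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ModelRef {
    pub provider: String,
    pub model: String,
}

impl ModelRef {
    /// Hypothetical parser: split "provider:model" on the first colon.
    pub fn parse(id: &str) -> ModelRef {
        match id.split_once(':') {
            Some((p, m)) => ModelRef { provider: p.to_string(), model: m.to_string() },
            None => ModelRef { provider: String::new(), model: id.to_string() },
        }
    }
}

/// Put user-supplied fallback models in front of the routed order.
/// Iterating in reverse while inserting at index 0 keeps the fallback
/// models in the order the user queued them. A model that is already
/// among the routed candidates is left where it is.
pub fn with_fallback_first(routed: &[ModelRef], fallback: &[ModelRef]) -> Vec<ModelRef> {
    let mut order = routed.to_vec();
    for m in fallback.iter().rev() {
        if !order.iter().any(|o| o == m) {
            order.insert(0, m.clone());
        }
    }
    order
}
```

One consequence worth knowing: under this rule a fallback that duplicates an already-routed model is not promoted to the front, so queuing the same model that just ran out changes nothing in the order.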

 (When stdin is piped/non-interactive, `/run` falls back to blocking mode.)

 ---
@@ -416,13 +432,16 @@ When you launch the REPL in a project directory, NeuroSploit creates

 ```
 .neurosploit/
-  session.json   # your config (models, target, repo, auth, focus)
-  runs.json      # run history (for /runs, /results, /report, /diff, /retest)
-  history.txt    # command history (↑/↓)
+  session.json      # your config (models, target, repo, auth, focus)
+  runs.json         # run history (for /runs, /results, /report, /diff, /retest)
+  active_run.json   # live checkpoint of an in-flight run (auto-recovered if interrupted)
+  history.txt       # command history (↑/↓)
 ```

 Close and reopen in the same folder → it **resumes** automatically
-(`↻ resumed project session`). No database needed — it's structured state.
+(`↻ resumed project session`). If a run was interrupted mid-flight, its
+checkpointed findings are recovered into `/runs` (`↻ recovered interrupted run`).
+No database needed — it's structured state.

 ---

@@ -360,6 +360,12 @@ pub(crate) struct Spawned {
     pub rx: tokio::sync::mpsc::Receiver<String>,
     pub cancel: std::sync::Arc<std::sync::atomic::AtomicBool>,
     pub soft: std::sync::Arc<std::sync::atomic::AtomicBool>,
+    /// Set when the run is parked on token/quota exhaustion (awaiting /continue).
+    pub paused: std::sync::Arc<std::sync::atomic::AtomicBool>,
+    /// Wakes a parked run when the user runs /continue.
+    pub resume: std::sync::Arc<tokio::sync::Notify>,
+    /// Fallback models pushed by /continue <provider:model> before resuming.
+    pub fallback: std::sync::Arc<std::sync::Mutex<Vec<ModelRef>>>,
     pub workdir: PathBuf,
 }

@@ -410,6 +416,9 @@ pub(crate) fn spawn_engagement(base: &Path, mut cfg: RunConfig, mcp: bool, mode:
     let pool = ModelPool::with_auth(refs, cfg.concurrency, cfg.subscription, mcp_config);
     let cancel = pool.cancel_handle();
     let soft = pool.soft_handle();
+    let paused = pool.pause_handle();
+    let resume = pool.resume_handle();
+    let fallback = pool.fallback_handle();
     let (tx, rx) = tokio::sync::mpsc::channel::<String>(256);
     let task = tokio::spawn(async move {
         match mode {
@@ -419,7 +428,7 @@ pub(crate) fn spawn_engagement(base: &Path, mut cfg: RunConfig, mcp: bool, mode:
             Mode::Black => harness::run(cfg, &lib, &pool, tx).await,
         }
     });
-    Spawned { task, rx, cancel, soft, workdir }
+    Spawned { task, rx, cancel, soft, paused, resume, fallback, workdir }
 }

 /// Absolute file:// URL of a run's report (PDF if present, else HTML).

@@ -6,7 +6,7 @@
 //! models, target/repo/auth/instructions, run history, and reports.

 use dialoguer::{theme::ColorfulTheme, MultiSelect};
-use harness::{agents, types::Finding, types::RunConfig};
+use harness::{agents, models::ModelRef, types::Finding, types::RunConfig};
 use rustyline::completion::{Completer, Pair};
 use rustyline::error::ReadlineError;
 use rustyline::highlight::Highlighter;
@@ -48,7 +48,9 @@ impl RunLive {
     }
     fn ingest(&mut self, line: &str) {
         let low = line.to_lowercase();
-        if low.contains("recon complete") { self.phase = "recon".into(); }
+        if low.contains("token/quota exhausted") || low.contains("run is paused") { self.phase = "paused (quota)".into(); }
+        else if low.contains("resumed — retrying") { self.phase = "exploiting".into(); }
+        else if low.contains("recon complete") { self.phase = "recon".into(); }
         else if low.contains("selected") && low.contains("agent") {
             self.phase = "planning".into();
             if let Some(n) = line.split_whitespace().find_map(|t| t.parse::<usize>().ok()) { self.agents = n; }
@@ -93,13 +95,31 @@ struct ActiveRun {
     soft: Arc<AtomicBool>,
     done: Arc<AtomicBool>,
     choice: Arc<Mutex<StopMode>>,
+    /// Set when the run is parked on token/quota exhaustion (awaiting /continue).
+    paused: Arc<AtomicBool>,
+    /// Wakes the parked run when the user runs /continue.
+    resume: Arc<tokio::sync::Notify>,
+    /// Fallback models to try first, pushed by /continue <provider:model>.
+    fallback: Arc<Mutex<Vec<ModelRef>>>,
 }

+/// On-disk checkpoint of an in-flight run's findings/commands, written live so a
+/// run survives quitting/crashing — recovered into /runs on the next launch.
+#[derive(Serialize, Deserialize, Clone, Default)]
+struct LiveCheckpoint {
+    target: String,
+    mode: String,
+    phase: String,
+    workdir: String,
+    findings: Vec<Finding>,
+    commands: Vec<String>,
+}
+
 /// All slash-commands, for Tab completion.
 const COMMANDS: &[&str] = &[
     "/help", "/show", "/config", "/providers", "/model", "/key", "/sub", "/target",
     "/repo", "/auth", "/creds", "/focus", "/attach", "/context", "/mcp", "/offline",
-    "/votes", "/agents", "/theme", "/clear", "/run", "/stop", "/runs", "/results", "/report",
+    "/votes", "/agents", "/theme", "/clear", "/run", "/stop", "/continue", "/runs", "/results", "/report",
     "/status", "/diff", "/retest", "/quit",
 ];

@@ -294,8 +314,26 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> {
     let history: Arc<Mutex<Vec<RunRecord>>> = Arc::new(Mutex::new(load_runs(base)));
     let past = history.lock().unwrap().len();
     if resumed || past > 0 {
-        println!(" ↻ resumed project session from {} — {} past run(s)\n", proj_dir().display(), past);
+        println!(" ↻ resumed project session from {} — {} past run(s)", proj_dir().display(), past);
     }
+    // Recover an interrupted run (REPL was quit/crashed mid-engagement): its
+    // live findings were checkpointed to disk — fold them into /runs so
+    // /results, /finding and /report still work.
+    if let Some(cp) = load_checkpoint() {
+        if !cp.findings.is_empty() {
+            let wd = std::path::PathBuf::from(&cp.workdir);
+            std::fs::create_dir_all(&wd).ok();
+            crate::report_raw(&cp.target, &cp.findings, &wd); // materialize a report so /report works
+            let mut h = history.lock().unwrap();
+            let id = h.len() + 1;
+            h.push(RunRecord { id, mode: cp.mode.clone(), target: cp.target.clone(), workdir: cp.workdir.clone(), findings: cp.findings.clone() });
+            save_runs(base, &h);
+            println!(" \x1b[1;33m↻ recovered interrupted run on {} — {} finding(s) saved as run #{}\x1b[0m (/results {id} · /report {id})",
+                cp.target, cp.findings.len(), id);
+        }
+        clear_checkpoint();
+    }
+    println!();
     let mut reader = Reader::new(base);
     let mut active: Option<ActiveRun> = None;
     show(&s);
@@ -332,6 +370,15 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> {
                     s.models = arg.split([',', ' ']).filter(|x| !x.is_empty()).map(String::from).collect();
                     println!(" models: {}", s.models.join(", "));
                 }
+                // If a run is paused on exhaustion, queue the newly-chosen models
+                // as its fallback so a plain /continue picks them up.
+                if let Some(a) = &active {
+                    if a.paused.load(Ordering::Relaxed) {
+                        let mut fb = a.fallback.lock().unwrap();
+                        for id in &s.models { fb.push(ModelRef::parse(id)); }
+                        println!(" \x1b[2m↪ queued for the paused run — /continue to resume on these model(s)\x1b[0m");
+                    }
+                }
             }
             "/key" => key_cmd(&mut s, arg, &mut reader),
             "/sub" | "/subscription" => {
@@ -414,6 +461,23 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> {
                     _ => println!(" no active run."),
                 }
             }
+            "/continue" | "/resume" => {
+                match &active {
+                    Some(a) if a.paused.load(Ordering::Relaxed) => {
+                        if !arg.is_empty() {
+                            let m = ModelRef::parse(arg);
+                            println!(" \x1b[1;35m▶ resuming with fallback model\x1b[0m {}:{}", m.provider, m.model);
+                            a.fallback.lock().unwrap().push(m);
+                        } else {
+                            println!(" \x1b[1;35m▶ resuming\x1b[0m — retrying with the current model(s).");
+                        }
+                        a.paused.store(false, Ordering::Relaxed);
+                        a.resume.notify_waiters();
+                    }
+                    Some(a) if !a.done.load(Ordering::Relaxed) => println!(" run is not paused — it's still working. /status to check."),
+                    _ => println!(" no paused run. (a run pauses automatically if your tokens/quota run out)"),
+                }
+            }
             "/runs" | "/history" => list_runs(&history.lock().unwrap()),
             "/diff" | "/changed" => diff_runs(&history.lock().unwrap()),
             "/retest" => {
@@ -478,6 +542,9 @@ pub async fn repl(base: &Path) -> anyhow::Result<()> {
                     let sev = if by.is_empty() { "0".into() } else { by.iter().map(|(k, v)| format!("{k}:{v}")).collect::<Vec<_>>().join(" ") };
                     println!(" \x1b[1m▶ live\x1b[0m {} ({}) · phase {} · {:02}:{:02} · {} possible finding(s) [{}]",
                         l.target, l.mode, l.phase, el / 60, el % 60, l.findings.len(), sev);
+                    if a.paused.load(Ordering::Relaxed) {
+                        println!(" \x1b[1;33m⏸ PAUSED — token/quota exhausted. /continue to resume, or /model <provider:model> then /continue to switch.\x1b[0m");
+                    }
                     if l.agents > 0 { println!(" progress \x1b[36m{}\x1b[0m", l.bar(24)); }
                     for (sv, t) in l.findings.iter().rev().take(5) { println!(" ✦ [{sv}] {t}"); }
                 }
@@ -661,21 +728,40 @@ async fn start_background(base: &Path, s: &Session, reader: &mut Reader,
     }));
     let cancel = sp.cancel.clone();
     let soft = sp.soft.clone();
+    let paused = sp.paused.clone();
+    let resume = sp.resume.clone();
+    let fallback = sp.fallback.clone();
     let done = Arc::new(AtomicBool::new(false));
     let choice = Arc::new(Mutex::new(StopMode::Run));
     let (live2, done2, hist2, choice2) = (live.clone(), done.clone(), history, choice.clone());

     tokio::spawn(async move {
         let crate::Spawned { task, mut rx, workdir, .. } = sp;
+        let mut last_saved = 0usize;
         while let Some(line) = rx.recv().await {
             live2.lock().unwrap().ingest(&line);
             if let Some(out) = crate::render_compact(&line) { let _ = printer.print(out); }
+            // Checkpoint live findings to disk whenever a new one lands, so the
+            // run survives a quit/crash and is recovered on next launch.
+            let snap = {
+                let l = live2.lock().unwrap();
+                if l.full.len() != last_saved {
+                    last_saved = l.full.len();
+                    Some(LiveCheckpoint {
+                        target: l.target.clone(), mode: l.mode.into(), phase: l.phase.clone(),
+                        workdir: workdir.display().to_string(),
+                        findings: l.full.clone(), commands: l.commands.clone(),
+                    })
+                } else { None }
+            };
+            if let Some(c) = snap { save_checkpoint(&c); }
         }
         let task_out = task.await.unwrap_or_default();
         let mode_choice = *choice2.lock().unwrap();

         if mode_choice == StopMode::Discard {
             std::fs::remove_dir_all(&workdir).ok();
+            clear_checkpoint();
             let _ = printer.print(format!("\x1b[33m🗑 run discarded — {}\x1b[0m", workdir.display()));
             done2.store(true, Ordering::Relaxed);
             return;
@@ -698,13 +784,14 @@ async fn start_background(base: &Path, s: &Session, reader: &mut Reader,
             if let Ok(j) = serde_json::to_string_pretty(&*h) { std::fs::write(proj_dir().join("runs.json"), j).ok(); }
             id
         };
+        clear_checkpoint(); // run is now a completed RunRecord
         let _ = printer.print(format!(
             "\x1b[1;32m◀ run #{id} done — {} {} finding(s)\x1b[0m · /results {id} · /finding",
             findings.len(), validated_word));
         let _ = printer.print(format!("\x1b[36m report: {}\x1b[0m", crate::report_url(&workdir)));
         done2.store(true, Ordering::Relaxed);
     });
-    Some(ActiveRun { live, cancel, soft, done, choice })
+    Some(ActiveRun { live, cancel, soft, done, choice, paused, resume, fallback })
 }

 /// Project-local store: `<cwd>/.neurosploit/` so each project keeps its own
@@ -726,6 +813,16 @@ fn save_runs(_base: &Path, history: &[RunRecord]) {
     if let Ok(j) = serde_json::to_string_pretty(history) { std::fs::write(p, j).ok(); }
 }

+/// Live-run checkpoint file (one in-flight run at a time).
+fn checkpoint_path() -> std::path::PathBuf { proj_dir().join("active_run.json") }
+fn save_checkpoint(c: &LiveCheckpoint) {
+    if let Ok(j) = serde_json::to_string_pretty(c) { std::fs::write(checkpoint_path(), j).ok(); }
+}
+fn clear_checkpoint() { std::fs::remove_file(checkpoint_path()).ok(); }
+fn load_checkpoint() -> Option<LiveCheckpoint> {
+    std::fs::read_to_string(checkpoint_path()).ok().and_then(|t| serde_json::from_str(&t).ok())
+}
+
 /// Persistable snapshot of the session config (resume across restarts).
 #[derive(Serialize, Deserialize, Default)]
 struct Snapshot {
@@ -939,7 +1036,8 @@ fn help() {
     println!("\n \x1b[2mRUN & MONITOR\x1b[0m");
     h("/run", "launch (runs in the BACKGROUND — keep typing)");
     h("/status", "live progress + findings while running (or a past run #)");
-    h("/stop", "gracefully stop the active run");
+    h("/stop", "stop: [1] validate+report [2] raw report now [3] discard");
+    h("/continue", "resume a run paused on token/quota (change /model first to switch)");
     h("/runs", "list runs · /results [n] · /report [n]");
     h("/diff /retest [n]", "what changed vs last run · re-verify a past run");

@@ -949,6 +1047,8 @@ fn help() {
     h("/theme color|mono", "/show (config) /clear /quit");

     println!("\n \x1b[2mMODES — black-box: set /target · white-box: set /repo · grey-box: set BOTH /repo + /target · host: /target <ip> + /creds\x1b[0m");
+    println!(" \x1b[2mFindings are checkpointed live to .neurosploit/ — quit/crash mid-run and they're recovered into /runs next launch.\x1b[0m");
+    println!(" \x1b[2mIf tokens/quota run out the run PAUSES (state kept) — /continue to resume, or switch with /model then /continue.\x1b[0m");
     println!(" \x1b[2m↑/↓ history · Tab completes commands & @paths · Ctrl-A/E/K edit · Ctrl-O full cmd · \\ for multiline\x1b[0m\n");
 }

@@ -1,7 +1,24 @@
 use crate::models::{cli_binary_for, ChatClient, ModelRef};
 use anyhow::{anyhow, Result};
-use std::sync::Arc;
-use tokio::sync::Semaphore;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::sync::{Notify, Semaphore};

+/// Does this error look like token/quota/rate-limit exhaustion (as opposed to a
+/// transient network blip)? Used to PAUSE the run instead of silently dropping
+/// the agent, so the user can /continue (wait for renewal) or switch model.
+pub fn is_exhaustion(e: &anyhow::Error) -> bool {
+    let s = format!("{e:#}").to_lowercase();
+    [
+        "rate limit", "rate_limit", "ratelimit", "429", "too many requests",
+        "quota", "insufficient_quota", "insufficient quota", "out of credit",
+        "credit balance", "billing", "exhausted", "overloaded", "capacity",
+        "usage limit", "resource_exhausted", "resource exhausted",
+    ]
+    .iter()
+    .any(|k| s.contains(k))
+}
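The detection in `is_exhaustion` is a plain case-insensitive substring scan over the rendered error chain. A std-only sketch of the same idea, taking a `&str` instead of an `anyhow::Error` so it can be tried in isolation, might look like the block below. `looks_exhausted` and `EXHAUSTION_MARKERS` are names made up for this illustration; the marker list is copied from the function above.

```rust
/// Marker substrings copied from `is_exhaustion` in the diff above.
const EXHAUSTION_MARKERS: &[&str] = &[
    "rate limit", "rate_limit", "ratelimit", "429", "too many requests",
    "quota", "insufficient_quota", "insufficient quota", "out of credit",
    "credit balance", "billing", "exhausted", "overloaded", "capacity",
    "usage limit", "resource_exhausted", "resource exhausted",
];

/// Case-insensitive check: does the message contain any exhaustion marker?
pub fn looks_exhausted(message: &str) -> bool {
    let lowered = message.to_lowercase();
    EXHAUSTION_MARKERS.iter().any(|marker| lowered.contains(*marker))
}
```

Because this is substring matching, short markers such as `429` or `capacity` can also match unrelated text (a port number like `4290`, for instance). In the worst case that pauses a run that could have been retried, which is a recoverable outcome since the user can simply `/continue`.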
+
 /// Task type used by the model router to pick the best model for the step.
 #[derive(Clone, Copy, Debug)]
@@ -39,6 +56,14 @@ pub struct ModelPool {
     /// SOFT stop: stop launching new EXPLOIT agents, but let in-flight finish and
     /// VALIDATION still run — so "stop and validate what was found" works.
     soft: std::sync::Arc<std::sync::atomic::AtomicBool>,
+    /// PAUSE: set when every candidate model is token/quota-exhausted. The run
+    /// parks (keeping all state) until the user runs /continue.
+    paused: Arc<AtomicBool>,
+    /// Wakes the parked task when the user runs /continue.
+    resume: Arc<Notify>,
+    /// Fallback models the user added via `/continue <provider:model>` while
+    /// paused — tried first on the next attempt.
+    fallback: Arc<Mutex<Vec<ModelRef>>>,
 }

 impl ModelPool {
@@ -68,6 +93,9 @@ impl ModelPool {
             progress: std::sync::Mutex::new(None),
             cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
             soft: Arc::new(std::sync::atomic::AtomicBool::new(false)),
+            paused: Arc::new(AtomicBool::new(false)),
+            resume: Arc::new(Notify::new()),
+            fallback: Arc::new(Mutex::new(Vec::new())),
         }
     }

@@ -101,6 +129,52 @@ impl ModelPool {
             || self.soft.load(std::sync::atomic::Ordering::Relaxed)
     }

+    /// Handle to the PAUSE flag (observe whether the run is parked on exhaustion).
+    pub fn pause_handle(&self) -> Arc<AtomicBool> {
+        self.paused.clone()
+    }
+    /// Handle used by the REPL to wake a parked run (`/continue`).
+    pub fn resume_handle(&self) -> Arc<Notify> {
+        self.resume.clone()
+    }
+    /// Slot the REPL pushes a fallback model into before resuming
+    /// (`/continue <provider:model>`).
+    pub fn fallback_handle(&self) -> Arc<Mutex<Vec<ModelRef>>> {
+        self.fallback.clone()
+    }
+    pub fn is_paused(&self) -> bool {
+        self.paused.load(Ordering::Relaxed)
+    }
+
+    /// Park the run on token/quota exhaustion: keep ALL state, emit a notice,
+    /// and wait until the user runs `/continue` (or cancels). Returns when the
+    /// run should retry (pause cleared) or give up (cancelled).
+    async fn park_exhausted(&self, err: &anyhow::Error) {
+        self.paused.store(true, Ordering::Relaxed);
+        if let Some(tx) = self.progress() {
+            let msg = format!("{err:#}");
+            let short = msg.lines().next().unwrap_or(&msg);
+            let _ = tx
+                .send(format!(
+                    "notify: ⏸ token/quota exhausted ({}). Run is PAUSED — type /continue when your quota renews, or switch with /model <provider:model> then /continue.",
+                    short.chars().take(120).collect::<String>()
+                ))
+                .await;
+        }
+        while self.paused.load(Ordering::Relaxed) && !self.is_cancelled() {
+            let notified = self.resume.notified();
+            tokio::select! {
+                _ = notified => {}
+                _ = tokio::time::sleep(Duration::from_millis(500)) => {}
+            }
+        }
+        if !self.is_cancelled() {
+            if let Some(tx) = self.progress() {
+                let _ = tx.send("notify: ▶ resumed — retrying exhausted step.".to_string()).await;
+            }
+        }
+    }
+
     /// One completion for a model, via subscription CLI (optionally with MCP) or
     /// HTTP API, with a short retry/backoff. `label` (e.g. the agent name) tags
     /// the streamed activity so each command/tool is attributable.
@@ -112,18 +186,35 @@ impl ModelPool {
         let progress = self.progress();
         let mut last = anyhow::anyhow!("no attempt");
         for attempt in 0..3u64 {
+            if self.is_cancelled() {
+                return Err(anyhow!("cancelled"));
+            }
             if attempt > 0 {
                 tokio::time::sleep(std::time::Duration::from_millis(1500 * attempt * attempt.max(1))).await;
             }
-            let r = if use_cli {
-                self.client
-                    .chat_cli(label, &m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone())
-                    .await
-            } else {
-                self.client.chat(m, system, user).await
+            let call = async {
+                if use_cli {
+                    self.client
+                        .chat_cli(label, &m.provider, &m.model, system, user, self.mcp_config.as_deref(), progress.clone())
+                        .await
+                } else {
+                    self.client.chat(m, system, user).await
+                }
+            };
+            // Race the in-flight call against a HARD cancel: when the user picks
+            // "report raw" / "discard" on /stop, drop the call future so the
+            // CLI child (spawned with kill_on_drop) is terminated immediately
+            // instead of finishing its whole command sequence.
+            let r = tokio::select! {
+                biased;
+                _ = wait_cancelled(&self.cancel) => return Err(anyhow!("cancelled")),
+                r = call => r,
             };
             match r {
                 Ok(t) => return Ok(t),
+                // Don't burn retries on exhaustion — surface it so the caller
+                // can park and let the user /continue.
+                Err(e) if is_exhaustion(&e) => return Err(e),
                 Err(e) => last = e,
             }
         }
@@ -138,15 +229,44 @@ impl ModelPool {
     /// Router-aware completion. `label` tags streamed activity (agent name).
     pub async fn complete_routed(&self, task: Task, label: &str, system: &str, user: &str) -> Result<(ModelRef, String)> {
         let _permit = self.sem.acquire().await.expect("semaphore closed");
-        let order = self.route(task);
-        let mut last = anyhow!("no candidate models");
-        for m in &order {
-            match self.one(label, m, system, user).await {
-                Ok(text) => return Ok((m.clone(), text)),
-                Err(e) => last = e,
+        loop {
+            if self.is_cancelled() {
+                return Err(anyhow!("cancelled"));
+            }
+            // User-supplied fallback models (via /continue) are tried first.
+            let mut order = self.route(task);
+            if let Ok(fb) = self.fallback.lock() {
+                for m in fb.iter().rev() {
+                    if !order.iter().any(|o| o.provider == m.provider && o.model == m.model) {
+                        order.insert(0, m.clone());
+                    }
+                }
+            }
+            let mut last = anyhow!("no candidate models");
+            let mut exhausted = false;
+            for m in &order {
+                if self.is_cancelled() {
+                    return Err(anyhow!("cancelled"));
+                }
+                match self.one(label, m, system, user).await {
+                    Ok(text) => return Ok((m.clone(), text)),
+                    Err(e) => {
+                        if is_exhaustion(&e) {
+                            exhausted = true;
+                        }
+                        last = e;
+                    }
+                }
             }
+            // Every candidate failed. If it was token/quota exhaustion, park the
+            // run until the user runs /continue, then retry the whole order (now
+            // including any fallback model they added). Otherwise, give up.
+            if exhausted && !self.is_cancelled() {
+                self.park_exhausted(&last).await;
+                continue;
+            }
+            return Err(last);
         }
-        Err(last)
     }

     /// Reorder candidates for a task. With a single-model panel this is a no-op.
@@ -205,3 +325,11 @@ impl ModelPool {
         (confirmed, total)
     }
 }
+
+/// Resolve once the HARD-cancel flag flips. Lets `tokio::select!` race an
+/// in-flight model call against cancellation and drop it on the spot.
+async fn wait_cancelled(flag: &Arc<AtomicBool>) {
+    while !flag.load(Ordering::Relaxed) {
+        tokio::time::sleep(Duration::from_millis(120)).await;
+    }
+}