From 43058dc789904f43c45f00e6cdbe3eb2ecb7417c Mon Sep 17 00:00:00 2001 From: Yeachan-Heo Date: Sat, 4 Apr 2026 16:07:56 +0000 Subject: [PATCH] feat(telemetry): add worker.init and worker.done events Wire the worker boot control plane into the existing worker lifecycle telemetry so ready and terminal transitions emit structured worker.init/worker.done events. This also records boot duration on the worker state itself, resets the timer on restart, and covers the new telemetry path with a focused registry test. Constraint: Boot duration must measure the current worker boot cycle without changing existing worker state transitions Rejected: Record boot timing only in ad-hoc session trace attributes | loses the worker lifecycle event envelope and shared helper path Confidence: high Scope-risk: narrow Directive: Keep worker boot duration tied to ReadyForPrompt for a single boot cycle and reset it whenever the worker restarts Tested: cargo build --workspace; cargo test --workspace Not-tested: Live CLI-driven worker sessions outside the in-memory registry/test harness --- rust/crates/runtime/src/worker_boot.rs | 208 ++++++++++++++++++++++++- 1 file changed, 205 insertions(+), 3 deletions(-) diff --git a/rust/crates/runtime/src/worker_boot.rs b/rust/crates/runtime/src/worker_boot.rs index 4854e2a..0fc19c9 100644 --- a/rust/crates/runtime/src/worker_boot.rs +++ b/rust/crates/runtime/src/worker_boot.rs @@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use telemetry::SessionTracer; fn now_secs() -> u64 { SystemTime::now() @@ -18,6 +20,15 @@ fn now_secs() -> u64 { .as_secs() } +fn now_millis() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .try_into() + .unwrap_or(u64::MAX) +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum WorkerStatus { @@ -51,6 +62,17 @@ pub enum WorkerFailureKind { Provider, } +impl std::fmt::Display for WorkerFailureKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::TrustGate => write!(f, "trust_gate"), + Self::PromptDelivery => write!(f, "prompt_delivery"), + Self::Protocol => write!(f, "protocol"), + Self::Provider => write!(f, "provider"), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerFailure { pub kind: WorkerFailureKind, @@ -131,12 +153,19 @@ pub struct Worker { pub last_error: Option, pub created_at: u64, pub updated_at: u64, + #[serde(default)] + pub boot_started_at_ms: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub boot_completed_at_ms: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub boot_duration_ms: Option, pub events: Vec, } #[derive(Debug, Clone, Default)] pub struct WorkerRegistry { inner: Arc>, + session_tracer: Option, } #[derive(Debug, Default)] @@ -151,6 +180,12 @@ impl WorkerRegistry { Self::default() } + #[must_use] + pub fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.session_tracer = Some(session_tracer); + self + } + #[must_use] pub fn create( &self, @@ -161,6 +196,7 @@ impl WorkerRegistry { let mut inner = self.inner.lock().expect("worker registry lock poisoned"); inner.counter += 1; let ts = now_secs(); + let boot_started_at_ms = now_millis(); let worker_id = format!("worker_{:08x}_{}", ts, inner.counter); let trust_auto_resolve = trusted_roots .iter() @@ -179,6 +215,9 @@ impl WorkerRegistry { last_error: None, created_at: ts, updated_at: ts, + boot_started_at_ms, + boot_completed_at_ms: None, + boot_duration_ms: None, events: Vec::new(), }; push_event( @@ -205,6 +244,7 @@ impl WorkerRegistry { .get_mut(worker_id) .ok_or_else(|| format!("worker not found: {worker_id}"))?; let lowered = screen_text.to_ascii_lowercase(); + let tracer = self.session_tracer.as_ref(); if !worker.trust_gate_cleared && detect_trust_prompt(&lowered) { worker.status = WorkerStatus::TrustRequired; @@ -257,7 +297,9 @@ impl WorkerRegistry { let prompt_preview = prompt_preview(worker.last_prompt.as_deref().unwrap_or_default()); let message = match observation.target { WorkerPromptTarget::Shell => { - format!("worker prompt landed in shell instead of coding agent: {prompt_preview}") + format!( + "worker prompt landed in shell instead of coding agent: {prompt_preview}" + ) } WorkerPromptTarget::WrongTarget => format!( "worker prompt landed in the wrong target instead of {}: {}", @@ -302,6 +344,7 @@ impl WorkerRegistry { ); } else { worker.status = WorkerStatus::Failed; + record_worker_done(tracer, worker, Map::new()); } return Ok(worker.clone()); } @@ -312,7 +355,9 @@ impl WorkerRegistry { worker.last_error = None; } - if detect_ready_for_prompt(screen_text, &lowered) && worker.status != WorkerStatus::ReadyForPrompt { + if detect_ready_for_prompt(screen_text, &lowered) + && worker.status != WorkerStatus::ReadyForPrompt + { worker.status = WorkerStatus::ReadyForPrompt; worker.prompt_in_flight = false; if matches!( @@ -328,6 +373,7 @@ impl WorkerRegistry { Some("worker is ready for prompt delivery".to_string()), None, ); + record_worker_init(tracer, worker); } Ok(worker.clone()) @@ -412,7 +458,10 @@ impl WorkerRegistry { worker_id: worker.worker_id.clone(), status: worker.status, ready: worker.status == WorkerStatus::ReadyForPrompt, - blocked: matches!(worker.status, WorkerStatus::TrustRequired | WorkerStatus::Failed), + blocked: matches!( + worker.status, + WorkerStatus::TrustRequired | WorkerStatus::Failed + ), replay_prompt_ready: worker.replay_prompt.is_some(), last_error: worker.last_error.clone(), }) @@ -431,6 +480,7 @@ impl WorkerRegistry { worker.last_error = None; worker.prompt_delivery_attempts = 0; worker.prompt_in_flight = false; + reset_worker_boot_clock(worker); push_event( worker, WorkerEventKind::Restarted, @@ -456,6 +506,7 @@ impl WorkerRegistry { Some("worker terminated by control plane".to_string()), None, ); + record_worker_done(self.session_tracer.as_ref(), worker, Map::new()); Ok(worker.clone()) } @@ -512,6 +563,14 @@ impl WorkerRegistry { ); } + let mut attributes = Map::new(); + attributes.insert( + "finish_reason".to_string(), + Value::String(finish_reason.to_string()), + ); + attributes.insert("tokens_output".to_string(), Value::from(tokens_output)); + record_worker_done(self.session_tracer.as_ref(), worker, attributes); + Ok(worker.clone()) } } @@ -556,6 +615,88 @@ fn push_event( }); } +fn reset_worker_boot_clock(worker: &mut Worker) { + worker.boot_started_at_ms = now_millis(); + worker.boot_completed_at_ms = None; + worker.boot_duration_ms = None; +} + +fn ensure_worker_boot_duration_ms(worker: &mut Worker) -> u64 { + if let Some(duration) = worker.boot_duration_ms { + return duration; + } + + let completed_at_ms = now_millis(); + let duration = completed_at_ms.saturating_sub(worker.boot_started_at_ms); + worker.boot_completed_at_ms = Some(completed_at_ms); + worker.boot_duration_ms = Some(duration); + duration +} + +fn worker_done_error(worker: &Worker) -> Option { + worker + .last_error + .as_ref() + .map(|error| format!("{}: {}", error.kind, error.message)) +} + +fn worker_done_boot_duration_ms(worker: &Worker) -> Option { + worker.boot_duration_ms.or_else(|| { + (worker.boot_started_at_ms > 0) + .then(|| now_millis().saturating_sub(worker.boot_started_at_ms)) + }) +} + +fn record_worker_init(tracer: Option<&SessionTracer>, worker: &mut Worker) { + let Some(tracer) = tracer else { + let _ = ensure_worker_boot_duration_ms(worker); + return; + }; + + let boot_duration_ms = ensure_worker_boot_duration_ms(worker); + let mut attributes = Map::new(); + attributes.insert( + "trust_auto_resolve".to_string(), + Value::Bool(worker.trust_auto_resolve), + ); + attributes.insert( + "auto_recover_prompt_misdelivery".to_string(), + Value::Bool(worker.auto_recover_prompt_misdelivery), + ); + attributes.insert( + "prompt_delivery_attempts".to_string(), + Value::from(worker.prompt_delivery_attempts), + ); + tracer.record_worker_init( + worker.worker_id.clone(), + worker.cwd.clone(), + boot_duration_ms, + attributes, + ); +} + +fn record_worker_done( + tracer: Option<&SessionTracer>, + worker: &Worker, + mut attributes: Map, +) { + let Some(tracer) = tracer else { + return; + }; + + attributes.insert( + "prompt_delivery_attempts".to_string(), + Value::from(worker.prompt_delivery_attempts), + ); + tracer.record_worker_done( + worker.worker_id.clone(), + worker.status.to_string(), + worker_done_boot_duration_ms(worker), + worker_done_error(worker), + attributes, + ); +} + fn path_matches_allowlist(cwd: &str, trusted_root: &str) -> bool { let cwd = normalize_path(cwd); let trusted_root = normalize_path(trusted_root); @@ -739,6 +880,8 @@ fn cwd_matches_observed_target(expected_cwd: &str, observed_cwd: &str) -> bool { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; + use telemetry::{MemoryTelemetrySink, SessionTracer, TelemetryEvent}; #[test] fn allowlisted_trust_prompt_auto_resolves_then_reaches_ready_state() { @@ -1019,6 +1162,65 @@ mod tests { .any(|event| event.kind == WorkerEventKind::Finished)); } + #[test] + fn worker_registry_emits_worker_lifecycle_telemetry_with_boot_duration() { + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-worker", sink.clone()); + let registry = WorkerRegistry::new().with_session_tracer(tracer); + let worker = registry.create("/tmp/repo-telemetry", &[], true); + + let ready = registry + .observe(&worker.worker_id, "Ready for input\n>") + .expect("ready observe should succeed"); + assert_eq!(ready.status, WorkerStatus::ReadyForPrompt); + assert!(ready.boot_duration_ms.is_some()); + + registry + .terminate(&worker.worker_id) + .expect("terminate should succeed"); + + let events = sink.events(); + assert!(events.iter().any(|event| { + matches!( + event, + TelemetryEvent::WorkerInit { + session_id, + worker_id, + boot_duration_ms, + .. + } if session_id == "session-worker" + && worker_id == &worker.worker_id + && Some(*boot_duration_ms) == ready.boot_duration_ms + ) + })); + assert!(events.iter().any(|event| { + matches!( + event, + TelemetryEvent::WorkerDone { + session_id, + worker_id, + status, + boot_duration_ms: Some(_), + .. + } if session_id == "session-worker" + && worker_id == &worker.worker_id + && status == "finished" + ) + })); + assert!(events.iter().any(|event| { + matches!( + event, + TelemetryEvent::SessionTrace(trace) if trace.name == "worker.init" + ) + })); + assert!(events.iter().any(|event| { + matches!( + event, + TelemetryEvent::SessionTrace(trace) if trace.name == "worker.done" + ) + })); + } + #[test] fn observe_completion_classifies_provider_failure_on_unknown_finish_zero_tokens() { let registry = WorkerRegistry::new();