diff --git a/rust/crates/telemetry/src/lib.rs b/rust/crates/telemetry/src/lib.rs index 6e369e1..84343f3 100644 --- a/rust/crates/telemetry/src/lib.rs +++ b/rust/crates/telemetry/src/lib.rs @@ -198,6 +198,41 @@ pub enum TelemetryEvent { #[serde(default, skip_serializing_if = "Map::is_empty")] attributes: Map, }, + #[serde(rename = "worker.init")] + WorkerInit { + session_id: String, + worker_id: String, + cwd: String, + boot_duration_ms: u64, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + #[serde(rename = "worker.done")] + WorkerDone { + session_id: String, + worker_id: String, + status: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + boot_duration_ms: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + error: Option, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + #[serde(rename = "lane.open")] + LaneOpen { + session_id: String, + lane_id: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + #[serde(rename = "lane.close")] + LaneClose { + session_id: String, + lane_id: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, Analytics(AnalyticsEvent), SessionTrace(SessionTraceRecord), } @@ -394,6 +429,80 @@ impl SessionTracer { self.record("http_request_failed", trace_attributes); } + pub fn record_worker_init( + &self, + worker_id: impl Into, + cwd: impl Into, + boot_duration_ms: u64, + attributes: Map, + ) { + let worker_id = worker_id.into(); + let cwd = cwd.into(); + self.sink.record(TelemetryEvent::WorkerInit { + session_id: self.session_id.clone(), + worker_id: worker_id.clone(), + cwd: cwd.clone(), + boot_duration_ms, + attributes: attributes.clone(), + }); + self.record( + "worker.init", + merge_worker_trace_fields( + worker_id, + Some(cwd), + Some(boot_duration_ms), + None, + attributes, + ), + ); + } + + pub fn record_worker_done( + &self, + worker_id: impl Into, + status: impl Into, + boot_duration_ms: Option, + error: Option, + attributes: Map, + ) { + let worker_id = worker_id.into(); + let status = status.into(); + self.sink.record(TelemetryEvent::WorkerDone { + session_id: self.session_id.clone(), + worker_id: worker_id.clone(), + status: status.clone(), + boot_duration_ms, + error: error.clone(), + attributes: attributes.clone(), + }); + let mut trace_attributes = + merge_worker_trace_fields(worker_id, None, boot_duration_ms, Some(status), attributes); + if let Some(error) = error { + trace_attributes.insert("error".to_string(), Value::String(error)); + } + self.record("worker.done", trace_attributes); + } + + pub fn record_lane_open(&self, lane_id: impl Into, attributes: Map) { + let lane_id = lane_id.into(); + self.sink.record(TelemetryEvent::LaneOpen { + session_id: self.session_id.clone(), + lane_id: lane_id.clone(), + attributes: attributes.clone(), + }); + self.record("lane.open", merge_lane_trace_fields(lane_id, attributes)); + } + + pub fn record_lane_close(&self, lane_id: impl Into, attributes: Map) { + let lane_id = lane_id.into(); + self.sink.record(TelemetryEvent::LaneClose { + session_id: self.session_id.clone(), + lane_id: lane_id.clone(), + attributes: attributes.clone(), + }); + self.record("lane.close", merge_lane_trace_fields(lane_id, attributes)); + } + pub fn record_analytics(&self, event: AnalyticsEvent) { let mut attributes = event.properties.clone(); attributes.insert( @@ -418,6 +527,37 @@ fn merge_trace_fields( attributes } +fn merge_lane_trace_fields( + lane_id: String, + mut attributes: Map, +) -> Map { + attributes.insert("lane_id".to_string(), Value::String(lane_id)); + attributes +} + +fn merge_worker_trace_fields( + worker_id: String, + cwd: Option, + boot_duration_ms: Option, + status: Option, + mut attributes: Map, +) -> Map { + attributes.insert("worker_id".to_string(), Value::String(worker_id)); + if let Some(cwd) = cwd { + attributes.insert("cwd".to_string(), Value::String(cwd)); + } + if let Some(boot_duration_ms) = boot_duration_ms { + attributes.insert( + "boot_duration_ms".to_string(), + Value::from(boot_duration_ms), + ); + } + if let Some(status) = status { + attributes.insert("status".to_string(), Value::String(status)); + } + attributes +} + fn current_timestamp_ms() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -477,6 +617,12 @@ mod tests { let sink = Arc::new(MemoryTelemetrySink::default()); let tracer = SessionTracer::new("session-123", sink.clone()); + let mut lane_open_attributes = Map::new(); + lane_open_attributes.insert("worker".to_string(), Value::String("worker-1".to_string())); + tracer.record_lane_open("lane-42", lane_open_attributes); + let mut lane_close_attributes = Map::new(); + lane_close_attributes.insert("status".to_string(), Value::String("completed".to_string())); + tracer.record_lane_close("lane-42", lane_close_attributes); tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new()); tracer.record_analytics( AnalyticsEvent::new("cli", "prompt_sent") @@ -486,6 +632,32 @@ mod tests { let events = sink.events(); assert!(matches!( &events[0], + TelemetryEvent::LaneOpen { + session_id, + lane_id, + .. + } if session_id == "session-123" && lane_id == "lane-42" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, attributes, .. }) + if name == "lane.open" && attributes.get("lane_id") == Some(&Value::String("lane-42".to_string())) + )); + assert!(matches!( + &events[2], + TelemetryEvent::LaneClose { + session_id, + lane_id, + .. + } if session_id == "session-123" && lane_id == "lane-42" + )); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, attributes, .. }) + if name == "lane.close" && attributes.get("lane_id") == Some(&Value::String("lane-42".to_string())) + )); + assert!(matches!( + &events[4], TelemetryEvent::HttpRequestStarted { session_id, attempt: 1, @@ -495,18 +667,89 @@ mod tests { } if session_id == "session-123" && method == "POST" && path == "/v1/messages" )); assert!(matches!( - &events[1], - TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. }) + &events[5], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 2, name, .. }) if name == "http_request_started" )); - assert!(matches!(&events[2], TelemetryEvent::Analytics(_))); + assert!(matches!(&events[6], TelemetryEvent::Analytics(_))); assert!(matches!( - &events[3], - TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. }) + &events[7], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 3, name, .. }) if name == "analytics" )); } + #[test] + fn session_tracer_records_worker_events_with_boot_duration() { + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-worker", sink.clone()); + + tracer.record_worker_init("worker-1", "/tmp/project", 125, Map::new()); + tracer.record_worker_done("worker-1", "finished", Some(125), None, Map::new()); + + let events = sink.events(); + assert!(matches!( + &events[0], + TelemetryEvent::WorkerInit { + session_id, + worker_id, + cwd, + boot_duration_ms: 125, + .. + } if session_id == "session-worker" && worker_id == "worker-1" && cwd == "/tmp/project" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, attributes, .. }) + if name == "worker.init" + && attributes.get("worker_id") == Some(&Value::String("worker-1".to_string())) + && attributes.get("boot_duration_ms") == Some(&Value::from(125)) + )); + assert!(matches!( + &events[2], + TelemetryEvent::WorkerDone { + session_id, + worker_id, + status, + boot_duration_ms: Some(125), + .. + } if session_id == "session-worker" && worker_id == "worker-1" && status == "finished" + )); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, attributes, .. }) + if name == "worker.done" + && attributes.get("worker_id") == Some(&Value::String("worker-1".to_string())) + && attributes.get("status") == Some(&Value::String("finished".to_string())) + )); + } + + #[test] + fn jsonl_sink_persists_lane_events() { + let path = std::env::temp_dir().join(format!( + "telemetry-jsonl-lane-{}.log", + current_timestamp_ms() + )); + let sink = JsonlTelemetrySink::new(&path).expect("sink should create file"); + + sink.record(TelemetryEvent::LaneOpen { + session_id: "session-123".to_string(), + lane_id: "lane-42".to_string(), + attributes: Map::new(), + }); + sink.record(TelemetryEvent::LaneClose { + session_id: "session-123".to_string(), + lane_id: "lane-42".to_string(), + attributes: Map::new(), + }); + + let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable"); + assert!(contents.contains("\"type\":\"lane.open\"")); + assert!(contents.contains("\"type\":\"lane.close\"")); + + let _ = std::fs::remove_file(path); + } + #[test] fn jsonl_sink_persists_events() { let path =