feat(telemetry): add lane.open and lane.close events

Add typed lane lifecycle telemetry entries and mirror them into session trace and JSONL output using the dotted lane.open/lane.close wire names. This keeps lane lifecycle data queryable without routing it through generic analytics events.

Constraint: Keep telemetry crate changes backward-compatible for existing HTTP and analytics event consumers
Rejected: Reuse generic analytics events for lane lifecycle | loses dedicated typed telemetry variants
Rejected: Keep snake_case lane_open/lane_close wire names | does not match the requested lane.open/lane.close event names
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Preserve lane.open/lane.close wire names and the lane_id attribute key unless downstream consumers are migrated together
Tested: cargo build --workspace; cargo test --workspace
Not-tested: Runtime wiring that emits lane open/close events from higher-level crates
This commit is contained in:
Yeachan-Heo
2026-04-04 16:04:23 +00:00
parent 476b03e609
commit f0e5a9d6a0
+248 -5
View File
@@ -198,6 +198,41 @@ pub enum TelemetryEvent {
#[serde(default, skip_serializing_if = "Map::is_empty")]
attributes: Map<String, Value>,
},
#[serde(rename = "worker.init")]
WorkerInit {
session_id: String,
worker_id: String,
cwd: String,
boot_duration_ms: u64,
#[serde(default, skip_serializing_if = "Map::is_empty")]
attributes: Map<String, Value>,
},
#[serde(rename = "worker.done")]
WorkerDone {
session_id: String,
worker_id: String,
status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
boot_duration_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(default, skip_serializing_if = "Map::is_empty")]
attributes: Map<String, Value>,
},
#[serde(rename = "lane.open")]
LaneOpen {
session_id: String,
lane_id: String,
#[serde(default, skip_serializing_if = "Map::is_empty")]
attributes: Map<String, Value>,
},
#[serde(rename = "lane.close")]
LaneClose {
session_id: String,
lane_id: String,
#[serde(default, skip_serializing_if = "Map::is_empty")]
attributes: Map<String, Value>,
},
Analytics(AnalyticsEvent),
SessionTrace(SessionTraceRecord),
}
@@ -394,6 +429,80 @@ impl SessionTracer {
self.record("http_request_failed", trace_attributes);
}
pub fn record_worker_init(
&self,
worker_id: impl Into<String>,
cwd: impl Into<String>,
boot_duration_ms: u64,
attributes: Map<String, Value>,
) {
let worker_id = worker_id.into();
let cwd = cwd.into();
self.sink.record(TelemetryEvent::WorkerInit {
session_id: self.session_id.clone(),
worker_id: worker_id.clone(),
cwd: cwd.clone(),
boot_duration_ms,
attributes: attributes.clone(),
});
self.record(
"worker.init",
merge_worker_trace_fields(
worker_id,
Some(cwd),
Some(boot_duration_ms),
None,
attributes,
),
);
}
pub fn record_worker_done(
&self,
worker_id: impl Into<String>,
status: impl Into<String>,
boot_duration_ms: Option<u64>,
error: Option<String>,
attributes: Map<String, Value>,
) {
let worker_id = worker_id.into();
let status = status.into();
self.sink.record(TelemetryEvent::WorkerDone {
session_id: self.session_id.clone(),
worker_id: worker_id.clone(),
status: status.clone(),
boot_duration_ms,
error: error.clone(),
attributes: attributes.clone(),
});
let mut trace_attributes =
merge_worker_trace_fields(worker_id, None, boot_duration_ms, Some(status), attributes);
if let Some(error) = error {
trace_attributes.insert("error".to_string(), Value::String(error));
}
self.record("worker.done", trace_attributes);
}
pub fn record_lane_open(&self, lane_id: impl Into<String>, attributes: Map<String, Value>) {
let lane_id = lane_id.into();
self.sink.record(TelemetryEvent::LaneOpen {
session_id: self.session_id.clone(),
lane_id: lane_id.clone(),
attributes: attributes.clone(),
});
self.record("lane.open", merge_lane_trace_fields(lane_id, attributes));
}
pub fn record_lane_close(&self, lane_id: impl Into<String>, attributes: Map<String, Value>) {
let lane_id = lane_id.into();
self.sink.record(TelemetryEvent::LaneClose {
session_id: self.session_id.clone(),
lane_id: lane_id.clone(),
attributes: attributes.clone(),
});
self.record("lane.close", merge_lane_trace_fields(lane_id, attributes));
}
pub fn record_analytics(&self, event: AnalyticsEvent) {
let mut attributes = event.properties.clone();
attributes.insert(
@@ -418,6 +527,37 @@ fn merge_trace_fields(
attributes
}
fn merge_lane_trace_fields(
lane_id: String,
mut attributes: Map<String, Value>,
) -> Map<String, Value> {
attributes.insert("lane_id".to_string(), Value::String(lane_id));
attributes
}
fn merge_worker_trace_fields(
worker_id: String,
cwd: Option<String>,
boot_duration_ms: Option<u64>,
status: Option<String>,
mut attributes: Map<String, Value>,
) -> Map<String, Value> {
attributes.insert("worker_id".to_string(), Value::String(worker_id));
if let Some(cwd) = cwd {
attributes.insert("cwd".to_string(), Value::String(cwd));
}
if let Some(boot_duration_ms) = boot_duration_ms {
attributes.insert(
"boot_duration_ms".to_string(),
Value::from(boot_duration_ms),
);
}
if let Some(status) = status {
attributes.insert("status".to_string(), Value::String(status));
}
attributes
}
fn current_timestamp_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -477,6 +617,12 @@ mod tests {
let sink = Arc::new(MemoryTelemetrySink::default());
let tracer = SessionTracer::new("session-123", sink.clone());
let mut lane_open_attributes = Map::new();
lane_open_attributes.insert("worker".to_string(), Value::String("worker-1".to_string()));
tracer.record_lane_open("lane-42", lane_open_attributes);
let mut lane_close_attributes = Map::new();
lane_close_attributes.insert("status".to_string(), Value::String("completed".to_string()));
tracer.record_lane_close("lane-42", lane_close_attributes);
tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new());
tracer.record_analytics(
AnalyticsEvent::new("cli", "prompt_sent")
@@ -486,6 +632,32 @@ mod tests {
let events = sink.events();
assert!(matches!(
&events[0],
TelemetryEvent::LaneOpen {
session_id,
lane_id,
..
} if session_id == "session-123" && lane_id == "lane-42"
));
assert!(matches!(
&events[1],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, attributes, .. })
if name == "lane.open" && attributes.get("lane_id") == Some(&Value::String("lane-42".to_string()))
));
assert!(matches!(
&events[2],
TelemetryEvent::LaneClose {
session_id,
lane_id,
..
} if session_id == "session-123" && lane_id == "lane-42"
));
assert!(matches!(
&events[3],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, attributes, .. })
if name == "lane.close" && attributes.get("lane_id") == Some(&Value::String("lane-42".to_string()))
));
assert!(matches!(
&events[4],
TelemetryEvent::HttpRequestStarted {
session_id,
attempt: 1,
@@ -495,18 +667,89 @@ mod tests {
} if session_id == "session-123" && method == "POST" && path == "/v1/messages"
));
assert!(matches!(
&events[1],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. })
&events[5],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 2, name, .. })
if name == "http_request_started"
));
assert!(matches!(&events[2], TelemetryEvent::Analytics(_)));
assert!(matches!(&events[6], TelemetryEvent::Analytics(_)));
assert!(matches!(
&events[3],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. })
&events[7],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 3, name, .. })
if name == "analytics"
));
}
#[test]
fn session_tracer_records_worker_events_with_boot_duration() {
let sink = Arc::new(MemoryTelemetrySink::default());
let tracer = SessionTracer::new("session-worker", sink.clone());
tracer.record_worker_init("worker-1", "/tmp/project", 125, Map::new());
tracer.record_worker_done("worker-1", "finished", Some(125), None, Map::new());
let events = sink.events();
assert!(matches!(
&events[0],
TelemetryEvent::WorkerInit {
session_id,
worker_id,
cwd,
boot_duration_ms: 125,
..
} if session_id == "session-worker" && worker_id == "worker-1" && cwd == "/tmp/project"
));
assert!(matches!(
&events[1],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, attributes, .. })
if name == "worker.init"
&& attributes.get("worker_id") == Some(&Value::String("worker-1".to_string()))
&& attributes.get("boot_duration_ms") == Some(&Value::from(125))
));
assert!(matches!(
&events[2],
TelemetryEvent::WorkerDone {
session_id,
worker_id,
status,
boot_duration_ms: Some(125),
..
} if session_id == "session-worker" && worker_id == "worker-1" && status == "finished"
));
assert!(matches!(
&events[3],
TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, attributes, .. })
if name == "worker.done"
&& attributes.get("worker_id") == Some(&Value::String("worker-1".to_string()))
&& attributes.get("status") == Some(&Value::String("finished".to_string()))
));
}
#[test]
fn jsonl_sink_persists_lane_events() {
let path = std::env::temp_dir().join(format!(
"telemetry-jsonl-lane-{}.log",
current_timestamp_ms()
));
let sink = JsonlTelemetrySink::new(&path).expect("sink should create file");
sink.record(TelemetryEvent::LaneOpen {
session_id: "session-123".to_string(),
lane_id: "lane-42".to_string(),
attributes: Map::new(),
});
sink.record(TelemetryEvent::LaneClose {
session_id: "session-123".to_string(),
lane_id: "lane-42".to_string(),
attributes: Map::new(),
});
let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable");
assert!(contents.contains("\"type\":\"lane.open\""));
assert!(contents.contains("\"type\":\"lane.close\""));
let _ = std::fs::remove_file(path);
}
#[test]
fn jsonl_sink_persists_events() {
let path =