Skip to content
This repository was archived by the owner on Apr 8, 2026. It is now read-only.

Commit 43058dc

Browse files
committed
feat(telemetry): add worker.init and worker.done events
Wire the worker boot control plane into the existing worker lifecycle telemetry so that ready and terminal transitions emit structured worker.init/worker.done events. This also records the boot duration on the worker state itself, resets the timer on restart, and covers the new telemetry path with a focused registry test.

Constraint: Boot duration must measure the current worker boot cycle without changing existing worker state transitions
Rejected: Record boot timing only in ad-hoc session trace attributes | loses the worker lifecycle event envelope and shared helper path
Confidence: high
Scope-risk: narrow
Directive: Keep worker boot duration tied to ReadyForPrompt for a single boot cycle and reset it whenever the worker restarts
Tested: cargo build --workspace; cargo test --workspace
Not-tested: Live CLI-driven worker sessions outside the in-memory registry/test harness
1 parent c195113 commit 43058dc

1 file changed

Lines changed: 205 additions & 3 deletions

File tree

rust/crates/runtime/src/worker_boot.rs

Lines changed: 205 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use std::sync::{Arc, Mutex};
1010
use std::time::{SystemTime, UNIX_EPOCH};
1111

1212
use serde::{Deserialize, Serialize};
13+
use serde_json::{Map, Value};
14+
use telemetry::SessionTracer;
1315

1416
fn now_secs() -> u64 {
1517
SystemTime::now()
@@ -18,6 +20,15 @@ fn now_secs() -> u64 {
1820
.as_secs()
1921
}
2022

23+
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`, and a millisecond count that
/// does not fit in a `u64` is clamped to `u64::MAX`, so this never panics.
fn now_millis() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // `as_millis` is a `u128`; narrow it explicitly instead of truncating.
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
31+
2132
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
2233
#[serde(rename_all = "snake_case")]
2334
pub enum WorkerStatus {
@@ -51,6 +62,17 @@ pub enum WorkerFailureKind {
5162
Provider,
5263
}
5364

65+
impl std::fmt::Display for WorkerFailureKind {
66+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67+
match self {
68+
Self::TrustGate => write!(f, "trust_gate"),
69+
Self::PromptDelivery => write!(f, "prompt_delivery"),
70+
Self::Protocol => write!(f, "protocol"),
71+
Self::Provider => write!(f, "provider"),
72+
}
73+
}
74+
}
75+
5476
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
5577
pub struct WorkerFailure {
5678
pub kind: WorkerFailureKind,
@@ -131,12 +153,19 @@ pub struct Worker {
131153
pub last_error: Option<WorkerFailure>,
132154
pub created_at: u64,
133155
pub updated_at: u64,
156+
#[serde(default)]
157+
pub boot_started_at_ms: u64,
158+
#[serde(default, skip_serializing_if = "Option::is_none")]
159+
pub boot_completed_at_ms: Option<u64>,
160+
#[serde(default, skip_serializing_if = "Option::is_none")]
161+
pub boot_duration_ms: Option<u64>,
134162
pub events: Vec<WorkerEvent>,
135163
}
136164

137165
/// Handle to the worker registry.
///
/// The registry state lives behind an `Arc<Mutex<..>>`, so handles produced by
/// the derived `Clone` share the same `inner` state. The derived `Default`
/// leaves `session_tracer` as `None`.
#[derive(Debug, Clone, Default)]
138166
pub struct WorkerRegistry {
139167
/// Registry state, guarded by a mutex and shared between clones of this handle.
inner: Arc<Mutex<WorkerRegistryInner>>,
168+
/// Optional telemetry tracer; `None` unless one is attached via `with_session_tracer`.
session_tracer: Option<SessionTracer>,
140169
}
141170

142171
#[derive(Debug, Default)]
@@ -151,6 +180,12 @@ impl WorkerRegistry {
151180
Self::default()
152181
}
153182

183+
#[must_use]
184+
pub fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self {
185+
self.session_tracer = Some(session_tracer);
186+
self
187+
}
188+
154189
#[must_use]
155190
pub fn create(
156191
&self,
@@ -161,6 +196,7 @@ impl WorkerRegistry {
161196
let mut inner = self.inner.lock().expect("worker registry lock poisoned");
162197
inner.counter += 1;
163198
let ts = now_secs();
199+
let boot_started_at_ms = now_millis();
164200
let worker_id = format!("worker_{:08x}_{}", ts, inner.counter);
165201
let trust_auto_resolve = trusted_roots
166202
.iter()
@@ -179,6 +215,9 @@ impl WorkerRegistry {
179215
last_error: None,
180216
created_at: ts,
181217
updated_at: ts,
218+
boot_started_at_ms,
219+
boot_completed_at_ms: None,
220+
boot_duration_ms: None,
182221
events: Vec::new(),
183222
};
184223
push_event(
@@ -205,6 +244,7 @@ impl WorkerRegistry {
205244
.get_mut(worker_id)
206245
.ok_or_else(|| format!("worker not found: {worker_id}"))?;
207246
let lowered = screen_text.to_ascii_lowercase();
247+
let tracer = self.session_tracer.as_ref();
208248

209249
if !worker.trust_gate_cleared && detect_trust_prompt(&lowered) {
210250
worker.status = WorkerStatus::TrustRequired;
@@ -257,7 +297,9 @@ impl WorkerRegistry {
257297
let prompt_preview = prompt_preview(worker.last_prompt.as_deref().unwrap_or_default());
258298
let message = match observation.target {
259299
WorkerPromptTarget::Shell => {
260-
format!("worker prompt landed in shell instead of coding agent: {prompt_preview}")
300+
format!(
301+
"worker prompt landed in shell instead of coding agent: {prompt_preview}"
302+
)
261303
}
262304
WorkerPromptTarget::WrongTarget => format!(
263305
"worker prompt landed in the wrong target instead of {}: {}",
@@ -302,6 +344,7 @@ impl WorkerRegistry {
302344
);
303345
} else {
304346
worker.status = WorkerStatus::Failed;
347+
record_worker_done(tracer, worker, Map::new());
305348
}
306349
return Ok(worker.clone());
307350
}
@@ -312,7 +355,9 @@ impl WorkerRegistry {
312355
worker.last_error = None;
313356
}
314357

315-
if detect_ready_for_prompt(screen_text, &lowered) && worker.status != WorkerStatus::ReadyForPrompt {
358+
if detect_ready_for_prompt(screen_text, &lowered)
359+
&& worker.status != WorkerStatus::ReadyForPrompt
360+
{
316361
worker.status = WorkerStatus::ReadyForPrompt;
317362
worker.prompt_in_flight = false;
318363
if matches!(
@@ -328,6 +373,7 @@ impl WorkerRegistry {
328373
Some("worker is ready for prompt delivery".to_string()),
329374
None,
330375
);
376+
record_worker_init(tracer, worker);
331377
}
332378

333379
Ok(worker.clone())
@@ -412,7 +458,10 @@ impl WorkerRegistry {
412458
worker_id: worker.worker_id.clone(),
413459
status: worker.status,
414460
ready: worker.status == WorkerStatus::ReadyForPrompt,
415-
blocked: matches!(worker.status, WorkerStatus::TrustRequired | WorkerStatus::Failed),
461+
blocked: matches!(
462+
worker.status,
463+
WorkerStatus::TrustRequired | WorkerStatus::Failed
464+
),
416465
replay_prompt_ready: worker.replay_prompt.is_some(),
417466
last_error: worker.last_error.clone(),
418467
})
@@ -431,6 +480,7 @@ impl WorkerRegistry {
431480
worker.last_error = None;
432481
worker.prompt_delivery_attempts = 0;
433482
worker.prompt_in_flight = false;
483+
reset_worker_boot_clock(worker);
434484
push_event(
435485
worker,
436486
WorkerEventKind::Restarted,
@@ -456,6 +506,7 @@ impl WorkerRegistry {
456506
Some("worker terminated by control plane".to_string()),
457507
None,
458508
);
509+
record_worker_done(self.session_tracer.as_ref(), worker, Map::new());
459510
Ok(worker.clone())
460511
}
461512

@@ -512,6 +563,14 @@ impl WorkerRegistry {
512563
);
513564
}
514565

566+
let mut attributes = Map::new();
567+
attributes.insert(
568+
"finish_reason".to_string(),
569+
Value::String(finish_reason.to_string()),
570+
);
571+
attributes.insert("tokens_output".to_string(), Value::from(tokens_output));
572+
record_worker_done(self.session_tracer.as_ref(), worker, attributes);
573+
515574
Ok(worker.clone())
516575
}
517576
}
@@ -556,6 +615,88 @@ fn push_event(
556615
});
557616
}
558617

618+
fn reset_worker_boot_clock(worker: &mut Worker) {
619+
worker.boot_started_at_ms = now_millis();
620+
worker.boot_completed_at_ms = None;
621+
worker.boot_duration_ms = None;
622+
}
623+
624+
fn ensure_worker_boot_duration_ms(worker: &mut Worker) -> u64 {
625+
if let Some(duration) = worker.boot_duration_ms {
626+
return duration;
627+
}
628+
629+
let completed_at_ms = now_millis();
630+
let duration = completed_at_ms.saturating_sub(worker.boot_started_at_ms);
631+
worker.boot_completed_at_ms = Some(completed_at_ms);
632+
worker.boot_duration_ms = Some(duration);
633+
duration
634+
}
635+
636+
fn worker_done_error(worker: &Worker) -> Option<String> {
637+
worker
638+
.last_error
639+
.as_ref()
640+
.map(|error| format!("{}: {}", error.kind, error.message))
641+
}
642+
643+
fn worker_done_boot_duration_ms(worker: &Worker) -> Option<u64> {
644+
worker.boot_duration_ms.or_else(|| {
645+
(worker.boot_started_at_ms > 0)
646+
.then(|| now_millis().saturating_sub(worker.boot_started_at_ms))
647+
})
648+
}
649+
650+
fn record_worker_init(tracer: Option<&SessionTracer>, worker: &mut Worker) {
651+
let Some(tracer) = tracer else {
652+
let _ = ensure_worker_boot_duration_ms(worker);
653+
return;
654+
};
655+
656+
let boot_duration_ms = ensure_worker_boot_duration_ms(worker);
657+
let mut attributes = Map::new();
658+
attributes.insert(
659+
"trust_auto_resolve".to_string(),
660+
Value::Bool(worker.trust_auto_resolve),
661+
);
662+
attributes.insert(
663+
"auto_recover_prompt_misdelivery".to_string(),
664+
Value::Bool(worker.auto_recover_prompt_misdelivery),
665+
);
666+
attributes.insert(
667+
"prompt_delivery_attempts".to_string(),
668+
Value::from(worker.prompt_delivery_attempts),
669+
);
670+
tracer.record_worker_init(
671+
worker.worker_id.clone(),
672+
worker.cwd.clone(),
673+
boot_duration_ms,
674+
attributes,
675+
);
676+
}
677+
678+
fn record_worker_done(
679+
tracer: Option<&SessionTracer>,
680+
worker: &Worker,
681+
mut attributes: Map<String, Value>,
682+
) {
683+
let Some(tracer) = tracer else {
684+
return;
685+
};
686+
687+
attributes.insert(
688+
"prompt_delivery_attempts".to_string(),
689+
Value::from(worker.prompt_delivery_attempts),
690+
);
691+
tracer.record_worker_done(
692+
worker.worker_id.clone(),
693+
worker.status.to_string(),
694+
worker_done_boot_duration_ms(worker),
695+
worker_done_error(worker),
696+
attributes,
697+
);
698+
}
699+
559700
fn path_matches_allowlist(cwd: &str, trusted_root: &str) -> bool {
560701
let cwd = normalize_path(cwd);
561702
let trusted_root = normalize_path(trusted_root);
@@ -739,6 +880,8 @@ fn cwd_matches_observed_target(expected_cwd: &str, observed_cwd: &str) -> bool {
739880
#[cfg(test)]
740881
mod tests {
741882
use super::*;
883+
use std::sync::Arc;
884+
use telemetry::{MemoryTelemetrySink, SessionTracer, TelemetryEvent};
742885

743886
#[test]
744887
fn allowlisted_trust_prompt_auto_resolves_then_reaches_ready_state() {
@@ -1019,6 +1162,65 @@ mod tests {
10191162
.any(|event| event.kind == WorkerEventKind::Finished));
10201163
}
10211164

1165+
/// Boots a worker to the ready state and terminates it, then checks that the
/// in-memory sink saw `WorkerInit` and `WorkerDone` events plus the matching
/// `worker.init` / `worker.done` session traces.
#[test]
fn worker_registry_emits_worker_lifecycle_telemetry_with_boot_duration() {
    // Route every telemetry event into an in-memory sink so it can be inspected.
    let sink = Arc::new(MemoryTelemetrySink::default());
    let tracer = SessionTracer::new("session-worker", sink.clone());
    let registry = WorkerRegistry::new().with_session_tracer(tracer);
    let worker = registry.create("/tmp/repo-telemetry", &[], true);

    let ready = registry
        .observe(&worker.worker_id, "Ready for input\n>")
        .expect("ready observe should succeed");
    assert_eq!(ready.status, WorkerStatus::ReadyForPrompt);
    assert!(ready.boot_duration_ms.is_some());

    registry
        .terminate(&worker.worker_id)
        .expect("terminate should succeed");

    let events = sink.events();

    // worker.init must carry the same duration that was stored on the worker.
    let init_recorded = events.iter().any(|event| match event {
        TelemetryEvent::WorkerInit {
            session_id,
            worker_id,
            boot_duration_ms,
            ..
        } => {
            session_id == "session-worker"
                && worker_id == &worker.worker_id
                && Some(*boot_duration_ms) == ready.boot_duration_ms
        }
        _ => false,
    });
    assert!(init_recorded);

    // worker.done must report the terminal status along with some boot duration.
    let done_recorded = events.iter().any(|event| match event {
        TelemetryEvent::WorkerDone {
            session_id,
            worker_id,
            status,
            boot_duration_ms: Some(_),
            ..
        } => {
            session_id == "session-worker"
                && worker_id == &worker.worker_id
                && status == "finished"
        }
        _ => false,
    });
    assert!(done_recorded);

    // Both lifecycle events must also show up as named session traces.
    let saw_trace = |name: &str| {
        events.iter().any(|event| {
            matches!(
                event,
                TelemetryEvent::SessionTrace(trace) if trace.name == name
            )
        })
    };
    assert!(saw_trace("worker.init"));
    assert!(saw_trace("worker.done"));
}
1223+
10221224
#[test]
10231225
fn observe_completion_classifies_provider_failure_on_unknown_finish_zero_tokens() {
10241226
let registry = WorkerRegistry::new();

0 commit comments

Comments
 (0)