From c35df46566e16aefe6ba878928a112388cea0f5d Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:25:53 +0800 Subject: [PATCH 1/8] feat: add team run source metadata --- crates/aionui-api-types/src/lib.rs | 2 +- crates/aionui-api-types/src/team.rs | 72 +++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/crates/aionui-api-types/src/lib.rs b/crates/aionui-api-types/src/lib.rs index 647faac67..adf91585c 100644 --- a/crates/aionui-api-types/src/lib.rs +++ b/crates/aionui-api-types/src/lib.rs @@ -151,7 +151,7 @@ pub use team::{ TeamAgentRemovedPayload, TeamAgentRenamedPayload, TeamAgentResponse, TeamAgentSpawnedPayload, TeamAgentStatusPayload, TeamChildTurnPayload, TeamListResponse, TeamMcpPhase, TeamMcpRuntimeConfig, TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole, - TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason, + TeamRunSource, TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason, TeamSendMessageStatus, TeamSendMessageTargetQueueState, TeamSessionBinding, TeamSlotRuntimeHealth, TeamSlotWorkPayload, TeammateMessagePayload, }; diff --git a/crates/aionui-api-types/src/team.rs b/crates/aionui-api-types/src/team.rs index 4418a2eeb..a59b5686f 100644 --- a/crates/aionui-api-types/src/team.rs +++ b/crates/aionui-api-types/src/team.rs @@ -201,6 +201,13 @@ pub enum TeamRunStatus { Failed, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TeamRunSource { + UserMessage, + RecoveryDrain, +} + #[derive(Debug, Deserialize)] pub struct CancelTeamRunRequest { #[serde(default)] @@ -225,6 +232,8 @@ pub struct PauseTeamSlotRequest { pub struct TeamRunAckResponse { pub team_run_id: String, pub team_id: String, + pub source: TeamRunSource, + pub has_user_intervention: bool, pub target_slot_id: String, pub target_role: TeamRunTargetRole, pub accepted_slot_id: String, @@ -269,6 +278,8 @@ pub struct TeamSlotWorkPayload { pub struct TeamRunPayload { pub team_id: String, pub team_run_id: String, + pub source: TeamRunSource, + pub has_user_intervention: bool, pub target_slot_id: String, pub target_role: TeamRunTargetRole, pub status: TeamRunStatus, @@ -1110,6 +1121,8 @@ mod tests { let ack = TeamRunAckResponse { team_run_id: "trun-1".into(), team_id: "team-1".into(), + source: TeamRunSource::UserMessage, + has_user_intervention: true, target_slot_id: "lead-1".into(), target_role: TeamRunTargetRole::Lead, accepted_slot_id: "lead-1".into(), @@ -1129,6 +1142,8 @@ mod tests { let ack = TeamRunAckResponse { team_run_id: "trun-1".into(), team_id: "team-1".into(), + source: TeamRunSource::UserMessage, + has_user_intervention: true, target_slot_id: "lead-1".into(), target_role: TeamRunTargetRole::Lead, accepted_slot_id: "worker-1".into(), @@ -1144,11 +1159,64 @@ mod tests { assert_eq!(value["accepted_role"], "teammate"); } + #[test] + fn team_run_source_serializes_snake_case() { + let user = serde_json::to_value(TeamRunSource::UserMessage).unwrap(); + let recovery = serde_json::to_value(TeamRunSource::RecoveryDrain).unwrap(); + + assert_eq!(user, serde_json::json!("user_message")); + assert_eq!(recovery, serde_json::json!("recovery_drain")); + } + + #[test] + fn team_run_payload_serializes_source_metadata() { + let payload = TeamRunPayload { + team_id: "team-1".into(), + team_run_id: "run-1".into(), + source: TeamRunSource::RecoveryDrain, + has_user_intervention: false, + target_slot_id: "lead-1".into(), + target_role: TeamRunTargetRole::Lead, + status: TeamRunStatus::Accepted, + active_child_count: 0, + pending_wake_count: 1, + starting_child_count: 0, + slot_work: vec![], + }; + + let json = serde_json::to_value(payload).unwrap(); + assert_eq!(json["source"], "recovery_drain"); + assert_eq!(json["has_user_intervention"], false); + } + + #[test] + fn team_run_ack_serializes_source_metadata() { + let ack = TeamRunAckResponse { + team_run_id: "run-1".into(), + team_id: "team-1".into(), + source: TeamRunSource::UserMessage, + has_user_intervention: true, + target_slot_id: "lead-1".into(), + target_role: TeamRunTargetRole::Lead, + accepted_slot_id: "lead-1".into(), + accepted_role: TeamRunTargetRole::Lead, + status: TeamRunStatus::Accepted, + message_id: Some("mailbox-1".into()), + }; + + let json = serde_json::to_value(ack).unwrap(); + assert_eq!(json["source"], "user_message"); + assert_eq!(json["has_user_intervention"], true); + assert_eq!(json["message_id"], "mailbox-1"); + } + #[test] fn team_run_payload_omits_sensitive_content() { let payload = TeamRunPayload { team_id: "team-1".into(), team_run_id: "trun-1".into(), + source: TeamRunSource::UserMessage, + has_user_intervention: true, target_slot_id: "worker-1".into(), target_role: TeamRunTargetRole::Teammate, status: TeamRunStatus::Running, @@ -1170,6 +1238,8 @@ mod tests { let payload = TeamRunPayload { team_id: "team-1".into(), team_run_id: "trun-1".into(), + source: TeamRunSource::UserMessage, + has_user_intervention: true, target_slot_id: "lead-1".into(), target_role: TeamRunTargetRole::Lead, status: TeamRunStatus::Running, @@ -1216,6 +1286,8 @@ mod tests { let decoded: TeamRunPayload = serde_json::from_value(serde_json::json!({ "team_id": "team-1", "team_run_id": "trun-1", + "source": "user_message", + "has_user_intervention": true, "target_slot_id": "lead-1", "target_role": "lead", "status": "running", From 5e091fd8ecbafb2296396eb746206794471152e4 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:28:38 +0800 Subject: [PATCH 2/8] feat: carry team run source metadata --- crates/aionui-team/src/events.rs | 2 + crates/aionui-team/src/team_run.rs | 111 +++++++++++++++++++---------- 2 files changed, 76 insertions(+), 37 deletions(-) diff --git a/crates/aionui-team/src/events.rs b/crates/aionui-team/src/events.rs index 30dc2cf49..b63e686c0 100644 --- a/crates/aionui-team/src/events.rs +++ b/crates/aionui-team/src/events.rs @@ -272,6 +272,8 @@ mod tests { aionui_api_types::TeamRunPayload { team_id: "team-1".into(), team_run_id: "run-1".into(), + source: aionui_api_types::TeamRunSource::UserMessage, + has_user_intervention: true, target_slot_id: "lead-1".into(), target_role: aionui_api_types::TeamRunTargetRole::Lead, status: aionui_api_types::TeamRunStatus::Accepted, diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index f6072992d..b9121687c 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -2,8 +2,8 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use aionui_api_types::{ - TeamChildTurnPayload, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole, TeamSlotRuntimeHealth, - TeamSlotWorkPayload, + TeamChildTurnPayload, TeamRunAckResponse, TeamRunPayload, TeamRunSource, TeamRunStatus, TeamRunTargetRole, + TeamSlotRuntimeHealth, TeamSlotWorkPayload, }; use aionui_common::{TimestampMs, generate_id, now_ms}; use tokio::sync::Mutex; @@ -141,6 +141,8 @@ pub(crate) struct PendingWakeView { struct TeamRunRecord { team_run_id: String, team_id: String, + source: TeamRunSource, + has_user_intervention: bool, target_slot_id: String, target_role: TeamRunTargetRole, status: TeamRunStatus, @@ -281,6 +283,8 @@ impl TeamRunRecord { TeamRunPayload { team_id: self.team_id.clone(), team_run_id: self.team_run_id.clone(), + source: self.source.clone(), + has_user_intervention: self.has_user_intervention, target_slot_id: self.target_slot_id.clone(), target_role: self.target_role.clone(), status: self.status.clone(), @@ -300,6 +304,8 @@ impl TeamRunRecord { TeamRunAckResponse { team_run_id: self.team_run_id.clone(), team_id: self.team_id.clone(), + source: self.source.clone(), + has_user_intervention: self.has_user_intervention, target_slot_id: self.target_slot_id.clone(), target_role: self.target_role.clone(), accepted_slot_id: accepted_slot_id.to_owned(), @@ -334,6 +340,34 @@ fn new_operation_lease( lease } +fn new_team_run_record( + team_id: String, + target_slot_id: &str, + target_role: TeamRunTargetRole, + source: TeamRunSource, + has_user_intervention: bool, +) -> TeamRunRecord { + TeamRunRecord { + team_run_id: generate_id(), + team_id, + source, + has_user_intervention, + target_slot_id: target_slot_id.to_owned(), + target_role, + status: TeamRunStatus::Accepted, + started_at: None, + completed_at: None, + cancelled_at: None, + cancel_reason: None, + active_child_turns: HashMap::new(), + starting_reservations: HashMap::new(), + pending_wakes: HashMap::new(), + slot_runtime_health: HashMap::new(), + slot_wake_gate: SlotWakeGate::default(), + active_operation_leases: HashMap::new(), + } +} + fn push_pending_wake_locked( run: &mut TeamRunRecord, slot_id: String, @@ -425,7 +459,7 @@ impl TeamRunManager { message_id: Option, ) -> Result { let mut guard = self.state.lock().await; - if let Some(active) = guard.as_ref().filter(|r| r.is_active()) { + if let Some(active) = guard.as_mut().filter(|r| r.is_active()) { if allow_active_intervention { if active.slot_is_busy(target_slot_id) { info!( @@ -446,6 +480,7 @@ impl TeamRunManager { active_target_role = ?active.target_role, "team_run active intervention accepted" ); + active.has_user_intervention = true; return Ok(active.ack(target_slot_id, target_role, message_id)); } return Err(TeamError::InvalidRequest("team run is already active".into())); @@ -457,23 +492,13 @@ impl TeamRunManager { ))); } - let record = TeamRunRecord { - team_run_id: generate_id(), - team_id: self.team_id.clone(), - target_slot_id: target_slot_id.to_owned(), - target_role: target_role.clone(), - status: TeamRunStatus::Accepted, - started_at: None, - completed_at: None, - cancelled_at: None, - cancel_reason: None, - active_child_turns: HashMap::new(), - starting_reservations: HashMap::new(), - pending_wakes: HashMap::new(), - slot_runtime_health: HashMap::new(), - slot_wake_gate: SlotWakeGate::default(), - active_operation_leases: HashMap::new(), - }; + let record = new_team_run_record( + self.team_id.clone(), + target_slot_id, + target_role.clone(), + TeamRunSource::UserMessage, + true, + ); let ack = record.ack(target_slot_id, target_role, message_id); let payload = record.payload(); *guard = Some(record); @@ -511,6 +536,7 @@ impl TeamRunManager { let source = TeamWakeSource::UserIntervention; let _ = run.slot_wake_gate.before_wake(slot_id, source, None); + run.has_user_intervention = true; let lease = new_operation_lease(run, slot_id, role.clone(), source, false); let ack = run.ack(slot_id, role, None); info!( @@ -532,23 +558,13 @@ impl TeamRunManager { ))); } - let mut record = TeamRunRecord { - team_run_id: generate_id(), - team_id: self.team_id.clone(), - target_slot_id: slot_id.to_owned(), - target_role: role.clone(), - status: TeamRunStatus::Accepted, - started_at: None, - completed_at: None, - cancelled_at: None, - cancel_reason: None, - active_child_turns: HashMap::new(), - starting_reservations: HashMap::new(), - pending_wakes: HashMap::new(), - slot_runtime_health: HashMap::new(), - slot_wake_gate: SlotWakeGate::default(), - active_operation_leases: HashMap::new(), - }; + let mut record = new_team_run_record( + self.team_id.clone(), + slot_id, + role.clone(), + TeamRunSource::UserMessage, + true, + ); let lease = new_operation_lease(&mut record, slot_id, role.clone(), TeamWakeSource::UserMessage, true); let ack = record.ack(slot_id, role, None); let payload = record.payload(); @@ -1744,6 +1760,27 @@ mod tests { .expect("slot work must exist") } + #[tokio::test] + async fn user_message_run_payload_has_user_source() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user message should create run"); + + assert_eq!(ack.source, TeamRunSource::UserMessage); + assert!(ack.has_user_intervention); + + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-1".into())) + .await + .expect("commit user wake"); + + let payload = manager.current_payload().await.expect("active payload"); + assert_eq!(payload.source, TeamRunSource::UserMessage); + assert!(payload.has_user_intervention); + } + #[tokio::test] async fn lease_keeps_run_active_until_commit() { let (manager, _bc) = manager(); From e5e85a1c50abb5b83ea8c022fe8bb0d764017568 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:31:09 +0800 Subject: [PATCH 3/8] feat: add team recovery backlog wakes --- crates/aionui-team/src/team_run.rs | 428 +++++++++++++++++++++++++++-- crates/aionui-team/src/wake.rs | 10 +- 2 files changed, 406 insertions(+), 32 deletions(-) diff --git a/crates/aionui-team/src/team_run.rs b/crates/aionui-team/src/team_run.rs index b9121687c..1ec7148f4 100644 --- a/crates/aionui-team/src/team_run.rs +++ b/crates/aionui-team/src/team_run.rs @@ -137,6 +137,21 @@ pub(crate) struct PendingWakeView { pub message_id: Option, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct RecoveryWakeCandidate { + pub slot_id: String, + pub role: TeamRunTargetRole, + pub unread_count: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct RecoveryBacklogResult { + pub team_run_id: String, + pub source: TeamRunSource, + pub recorded_wakes: Vec, + pub pending_wake_count: usize, +} + #[derive(Debug, Clone)] struct TeamRunRecord { team_run_id: String, @@ -375,15 +390,26 @@ fn push_pending_wake_locked( source: TeamWakeSource, message_id: Option, ) { - run.pending_wakes - .entry(slot_id.clone()) - .or_default() - .push_back(PendingWake { - slot_id, - role, - source, - message_id, - }); + let wake = PendingWake { + slot_id: slot_id.clone(), + role, + source, + message_id, + }; + let queue = run.pending_wakes.entry(slot_id).or_default(); + if is_foreground_wake(source) { + let insert_at = queue + .iter() + .position(|pending| !is_foreground_wake(pending.source)) + .unwrap_or(queue.len()); + queue.insert(insert_at, wake); + } else { + queue.push_back(wake); + } +} + +fn is_foreground_wake(source: TeamWakeSource) -> bool { + matches!(source, TeamWakeSource::UserMessage | TeamWakeSource::UserIntervention) } fn acquire_policy( @@ -423,7 +449,8 @@ fn acquire_policy( AcquirePolicyDecision::Suppress("background_notification_deduped") } }, - TeamWakeSource::CrashNotification + TeamWakeSource::RecoveryDrain + | TeamWakeSource::CrashNotification | TeamWakeSource::InactivityTimeout | TeamWakeSource::SpawnAttachFailure | TeamWakeSource::ShutdownRejected => match slot_state { @@ -591,6 +618,120 @@ impl TeamRunManager { Ok((ack, lease)) } + pub(crate) async fn recover_mailbox_backlog( + &self, + candidates: Vec, + ) -> Option { + if candidates.is_empty() { + return None; + } + + let mut guard = self.state.lock().await; + if let Some(run) = guard.as_ref().filter(|r| matches!(r.status, TeamRunStatus::Cancelling)) { + warn!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + source = %TeamWakeSource::RecoveryDrain, + candidate_count = candidates.len(), + reason = "active_run_cancelling", + "team recovery backlog skipped" + ); + return None; + } + + if guard.as_ref().is_none_or(|run| !run.is_active()) { + let first = candidates.first().expect("checked non-empty"); + let mut run = new_team_run_record( + self.team_id.clone(), + &first.slot_id, + first.role.clone(), + TeamRunSource::RecoveryDrain, + false, + ); + for candidate in &candidates { + push_pending_wake_locked( + &mut run, + candidate.slot_id.clone(), + candidate.role.clone(), + TeamWakeSource::RecoveryDrain, + None, + ); + } + let payload = run.payload(); + let result = RecoveryBacklogResult { + team_run_id: run.team_run_id.clone(), + source: run.source.clone(), + recorded_wakes: candidates.iter().map(|candidate| candidate.slot_id.clone()).collect(), + pending_wake_count: payload.pending_wake_count, + }; + *guard = Some(run); + drop(guard); + + info!( + team_id = %self.team_id, + team_run_id = %result.team_run_id, + source = "recovery_drain", + slot_count = result.recorded_wakes.len(), + pending_wake_count = result.pending_wake_count, + reason = "orphan_mailbox_backlog", + "team recovery drain accepted" + ); + self.emitter.broadcast_team_run(TEAM_RUN_ACCEPTED_EVENT, payload); + return Some(result); + } + + let run = guard.as_mut().expect("active checked"); + let mut recorded_wakes = Vec::new(); + for candidate in candidates { + let slot_state = run.slot_run_state(&candidate.slot_id); + if !matches!(slot_state, TeamRunSlotState::Idle) { + debug!( + team_id = %self.team_id, + team_run_id = %run.team_run_id, + slot_id = %candidate.slot_id, + source = "recovery_drain", + unread_count = candidate.unread_count, + slot_state = ?slot_state, + reason = "slot_already_has_run_work", + "team recovery backlog wake skipped" + ); + continue; + } + push_pending_wake_locked( + run, + candidate.slot_id.clone(), + candidate.role, + TeamWakeSource::RecoveryDrain, + None, + ); + recorded_wakes.push(candidate.slot_id); + } + + if recorded_wakes.is_empty() { + return None; + } + + let payload = run.payload(); + let result = RecoveryBacklogResult { + team_run_id: run.team_run_id.clone(), + source: run.source.clone(), + recorded_wakes, + pending_wake_count: payload.pending_wake_count, + }; + info!( + team_id = %self.team_id, + team_run_id = %result.team_run_id, + source = "recovery_drain", + slot_count = result.recorded_wakes.len(), + pending_wake_count = result.pending_wake_count, + reason = "attached_to_active_run", + "team recovery backlog attached to active run" + ); + drop(guard); + self.emitter.broadcast_team_run(TEAM_RUN_UPDATED_EVENT, payload); + Some(result) + } + pub(crate) async fn commit_operation_lease( &self, lease_id: &str, @@ -956,16 +1097,13 @@ impl TeamRunManager { Ok(WakeRecordDecision::Suppressed) } WakeGateDecision::Record { resumed_from_pause } => { - let pending = PendingWake { - slot_id: slot_id.to_owned(), - role: target_role.clone(), - source: wake_source, - message_id: trigger_message_id.clone(), - }; - run.pending_wakes - .entry(slot_id.to_owned()) - .or_default() - .push_back(pending); + push_pending_wake_locked( + run, + slot_id.to_owned(), + target_role.clone(), + wake_source, + trigger_message_id.clone(), + ); let slot_pending_wake_count = run.pending_wake_count_for_slot(slot_id); let payload = run.payload(); if resumed_from_pause { @@ -1588,15 +1726,7 @@ impl TeamRunManager { let mut guard = self.state.lock().await; let run = guard.as_mut().filter(|r| r.is_active())?; let source = run.slot_wake_gate.release_suppressed_if_resumed(slot_id)?; - run.pending_wakes - .entry(slot_id.to_owned()) - .or_default() - .push_back(PendingWake { - slot_id: slot_id.to_owned(), - role: role.clone(), - source, - message_id: None, - }); + push_pending_wake_locked(run, slot_id.to_owned(), role.clone(), source, None); let payload = run.payload(); let slot_work = payload.slot_work.iter().find(|work| work.slot_id == slot_id); info!( @@ -1781,6 +1911,246 @@ mod tests { assert!(payload.has_user_intervention); } + #[tokio::test] + async fn active_recovery_run_records_user_intervention_without_changing_source() { + let (manager, _bc) = manager(); + let result = manager + .recover_mailbox_backlog(vec![RecoveryWakeCandidate { + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + unread_count: 2, + }]) + .await + .expect("recovery scan should succeed"); + + assert_eq!(result.source, TeamRunSource::RecoveryDrain); + assert_eq!( + manager.current_payload().await.unwrap().source, + TeamRunSource::RecoveryDrain + ); + + let (ack, lease) = manager + .acquire_user_message_wake("worker", TeamRunTargetRole::Teammate) + .await + .expect("user intervention should join recovery run"); + assert_eq!(ack.team_run_id, result.team_run_id); + assert_eq!(ack.source, TeamRunSource::RecoveryDrain); + assert!(ack.has_user_intervention); + assert!(!lease.accepted_as_new_run); + + let payload = manager.current_payload().await.expect("active payload"); + assert_eq!(payload.source, TeamRunSource::RecoveryDrain); + assert!(payload.has_user_intervention); + } + + #[tokio::test] + async fn recovery_drain_creates_run_with_pending_wakes() { + let (manager, _bc) = manager(); + let result = manager + .recover_mailbox_backlog(vec![ + RecoveryWakeCandidate { + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + unread_count: 2, + }, + RecoveryWakeCandidate { + slot_id: "worker".into(), + role: TeamRunTargetRole::Teammate, + unread_count: 1, + }, + ]) + .await + .expect("recovery should create run"); + + assert_eq!(result.source, TeamRunSource::RecoveryDrain); + assert_eq!(result.recorded_wakes.len(), 2); + assert_eq!(result.pending_wake_count, 2); + + let lead = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .expect("lead recovery wake should be claimable"); + assert_eq!(lead.team_run_id, result.team_run_id); + assert_eq!(lead.wake_source, TeamWakeSource::RecoveryDrain); + assert!(lead.message_id.is_none()); + } + + #[tokio::test] + async fn recovery_backlog_attaches_to_active_user_run() { + let (manager, _bc) = manager(); + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user run"); + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-user".into())) + .await + .expect("commit user wake"); + + let result = manager + .recover_mailbox_backlog(vec![RecoveryWakeCandidate { + slot_id: "worker".into(), + role: TeamRunTargetRole::Teammate, + unread_count: 3, + }]) + .await + .expect("recovery should attach to active run"); + + assert_eq!(result.team_run_id, ack.team_run_id); + assert_eq!(result.source, TeamRunSource::UserMessage); + assert_eq!(result.recorded_wakes, vec!["worker".to_string()]); + + let worker = manager + .claim_wake_for_turn("worker", TeamRunTargetRole::Teammate, "conv-worker") + .await + .expect("attached recovery wake should be claimable"); + assert_eq!(worker.wake_source, TeamWakeSource::RecoveryDrain); + assert!(worker.message_id.is_none()); + } + + #[tokio::test] + async fn user_intervention_wake_prioritizes_over_recovery_backlog() { + let (manager, _bc) = manager(); + let result = manager + .recover_mailbox_backlog(vec![RecoveryWakeCandidate { + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + unread_count: 2, + }]) + .await + .expect("recovery should create run"); + + let (ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user intervention should join recovery run"); + assert_eq!(ack.team_run_id, result.team_run_id); + assert_eq!(ack.source, TeamRunSource::RecoveryDrain); + assert!(ack.has_user_intervention); + assert!(!lease.accepted_as_new_run); + + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-user".into())) + .await + .expect("commit user intervention wake"); + + let foreground = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead-user") + .await + .expect("foreground user wake should be claimed first"); + assert_eq!(foreground.team_run_id, result.team_run_id); + assert_eq!(foreground.wake_source, TeamWakeSource::UserIntervention); + assert_eq!(foreground.message_id.as_deref(), Some("mailbox-user")); + + let recovery = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead-recovery") + .await + .expect("recovery backlog should remain after foreground wake"); + assert_eq!(recovery.team_run_id, result.team_run_id); + assert_eq!(recovery.wake_source, TeamWakeSource::RecoveryDrain); + assert!(recovery.message_id.is_none()); + } + + #[tokio::test] + async fn recovery_backlog_does_not_duplicate_pending_slot_work() { + let (manager, _bc) = manager(); + let (_ack, lease) = manager + .acquire_user_message_wake("worker", TeamRunTargetRole::Teammate) + .await + .expect("user run"); + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-worker".into())) + .await + .expect("commit pending user wake"); + + let result = manager + .recover_mailbox_backlog(vec![RecoveryWakeCandidate { + slot_id: "worker".into(), + role: TeamRunTargetRole::Teammate, + unread_count: 3, + }]) + .await; + + assert!(result.is_none(), "pending slot work already owns the unread backlog"); + + let user_wake = manager + .claim_wake_for_turn("worker", TeamRunTargetRole::Teammate, "conv-worker") + .await + .expect("original pending user wake should remain"); + assert_eq!(user_wake.wake_source, TeamWakeSource::UserMessage); + assert_eq!(user_wake.message_id.as_deref(), Some("mailbox-worker")); + + assert!( + manager + .claim_wake_for_turn("worker", TeamRunTargetRole::Teammate, "conv-worker-dup") + .await + .is_none(), + "recovery scan must not append a duplicate wake for represented work" + ); + } + + #[tokio::test] + async fn recovery_backlog_does_not_duplicate_paused_gate_work() { + let (manager, _bc) = manager(); + let (_ack, lease) = manager + .acquire_user_message_wake("lead", TeamRunTargetRole::Lead) + .await + .expect("user run"); + manager + .commit_operation_lease(&lease.lease_id, Some("mailbox-lead".into())) + .await + .expect("commit pending user wake"); + + let reservation = manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead") + .await + .expect("claim lead wake"); + let child = ActiveChildTurn { + team_run_id: reservation.team_run_id.clone(), + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + conversation_id: "conv-lead".into(), + turn_id: "turn-lead".into(), + started_at_ms: now_ms(), + last_slow_notified_at_ms: None, + }; + assert_eq!( + manager + .record_child_started(&reservation.reservation_id, child.clone()) + .await, + ChildStartDecision::Accepted + ); + manager + .complete_pause_after_child_cancelled(&child, Some("test_pause".into())) + .await + .expect("pause slot"); + + let result = manager + .recover_mailbox_backlog(vec![RecoveryWakeCandidate { + slot_id: "lead".into(), + role: TeamRunTargetRole::Lead, + unread_count: 3, + }]) + .await; + + assert!(result.is_none(), "paused wake gate already represents retained work"); + assert!( + manager + .claim_wake_for_turn("lead", TeamRunTargetRole::Lead, "conv-lead-dup") + .await + .is_none(), + "recovery scan must not append a duplicate wake for paused gate work" + ); + } + + #[tokio::test] + async fn empty_recovery_backlog_does_not_create_run() { + let (manager, _bc) = manager(); + let result = manager.recover_mailbox_backlog(vec![]).await; + assert!(result.is_none()); + assert!(manager.active_run_id().await.is_none()); + } + #[tokio::test] async fn lease_keeps_run_active_until_commit() { let (manager, _bc) = manager(); diff --git a/crates/aionui-team/src/wake.rs b/crates/aionui-team/src/wake.rs index 675984e12..2db79833c 100644 --- a/crates/aionui-team/src/wake.rs +++ b/crates/aionui-team/src/wake.rs @@ -14,6 +14,7 @@ pub(crate) enum TeamWakeSource { CrashNotification, InactivityTimeout, ShutdownRejected, + RecoveryDrain, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -38,6 +39,7 @@ impl TeamWakeSource { Self::CrashNotification => "crash_notification", Self::InactivityTimeout => "inactivity_timeout", Self::ShutdownRejected => "shutdown_rejected", + Self::RecoveryDrain => "recovery_drain", } } @@ -45,9 +47,11 @@ impl TeamWakeSource { match self { Self::UserMessage | Self::UserIntervention => TeamWakeClass::Foreground, Self::McpSendMessage | Self::IdleNotification | Self::InterruptedNotification => TeamWakeClass::Background, - Self::CrashNotification | Self::InactivityTimeout | Self::SpawnAttachFailure | Self::ShutdownRejected => { - TeamWakeClass::SystemRecovery - } + Self::CrashNotification + | Self::InactivityTimeout + | Self::SpawnAttachFailure + | Self::ShutdownRejected + | Self::RecoveryDrain => TeamWakeClass::SystemRecovery, Self::SpawnWelcome | Self::McpShutdownRequest => TeamWakeClass::Lifecycle, } } From 30b564d1ae7ad3d5e1db0581736a996aeb8b50dc Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:34:02 +0800 Subject: [PATCH 4/8] fix: require TeamRun wake ownership for agent turns --- crates/aionui-team/src/session.rs | 102 +++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 3 deletions(-) diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index fbddeaccd..906383b98 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -248,6 +248,19 @@ impl TeamSession { } else { all_unread.into_iter().filter(|m| m.from_agent_id != slot_id).collect() }; + let active_team_run_id = self.team_run_manager.active_run_id().await; + if !unread.is_empty() && (active_team_run_id.is_none() || next_wake.is_none()) { + warn!( + team_id = %self.team.id, + slot_id, + unread_count = unread.len(), + has_active_team_run = active_team_run_id.is_some(), + has_pending_wake = next_wake.is_some(), + reason = "unowned_mailbox_backlog", + "team wake input skipped because unread mailbox is not owned by a TeamRun wake" + ); + return Ok(None); + } let tasks = self.scheduler.list_tasks().await?; let mut wake_body = build_wake_payload(&agent, &tasks, &unread); @@ -295,7 +308,7 @@ impl TeamSession { let should_send = !unread.is_empty(); Ok(Some(WakeInput { - team_run_id: self.team_run_manager.active_run_id().await, + team_run_id: active_team_run_id, conversation_id: agent.conversation_id, first_message, should_send, @@ -1428,13 +1441,15 @@ fn classify_send_message_queue_state( mod tests { use super::*; use crate::event_loop::AgentLoopContext; - use crate::team_run::ActiveChildTurn; + use crate::team_run::{ActiveChildTurn, RecoveryWakeCandidate}; use crate::test_utils::MockTeamRepo; use crate::types::{Team, TeamAgent, TeammateRole}; use aionui_ai_agent::AgentError; use aionui_ai_agent::agent_task::AgentInstance; use aionui_ai_agent::types::BuildTaskOptions; - use aionui_api_types::{TeamRunStatus, TeamSendMessageDelivery, TeamSendMessageReason, WebSocketMessage}; + use aionui_api_types::{ + TeamRunStatus, TeamRunTargetRole, TeamSendMessageDelivery, TeamSendMessageReason, WebSocketMessage, + }; use aionui_common::{AgentKillReason, TimestampMs, now_ms}; use std::sync::{Arc, Mutex}; @@ -1787,6 +1802,18 @@ mod tests { Arc::new(start_session().await) } + async fn record_recovery_wake(session: &TeamSession, slot_id: &str, role: TeamRunTargetRole, unread_count: usize) { + session + .team_run_manager() + .recover_mailbox_backlog(vec![RecoveryWakeCandidate { + slot_id: slot_id.to_owned(), + role, + unread_count, + }]) + .await + .expect("recovery wake"); + } + fn register_test_event_loop(session: &Arc, slot_id: &str) { session.event_loops().spawn( slot_id, @@ -2854,6 +2881,7 @@ mod tests { .write("t1", "lead-1", "user", MailboxMessageType::Message, "kick off", None) .await .unwrap(); + record_recovery_wake(&session, "lead-1", TeamRunTargetRole::Lead, 1).await; let input = session.compute_wake_input("lead-1").await.unwrap().expect("WakeInput"); @@ -2876,6 +2904,7 @@ mod tests { .write("t1", "worker-1", "user", MailboxMessageType::Message, "do X", None) .await .unwrap(); + record_recovery_wake(&session, "worker-1", TeamRunTargetRole::Teammate, 1).await; let input = session .compute_wake_input("worker-1") @@ -2907,6 +2936,7 @@ mod tests { .write("t1", "lead-1", "user", MailboxMessageType::Message, "follow-up", None) .await .unwrap(); + record_recovery_wake(&session, "lead-1", TeamRunTargetRole::Lead, 1).await; let input = session.compute_wake_input("lead-1").await.unwrap().expect("WakeInput"); @@ -2930,6 +2960,70 @@ mod tests { session.stop(); } + #[tokio::test] + async fn compute_wake_input_refuses_unowned_mailbox_backlog() { + let session = start_session().await; + let lead = session.scheduler().find_lead_slot_id().await.expect("lead"); + session + .mailbox() + .write( + session.team_id(), + &lead, + "worker-1", + MailboxMessageType::Message, + "recover this", + None, + ) + .await + .expect("mailbox write"); + + let input = session + .compute_wake_input(&lead) + .await + .expect("compute should not fail"); + + assert!(input.is_none(), "unowned unread mailbox must not start a turn"); + } + + #[tokio::test] + async fn compute_wake_input_accepts_recovery_pending_wake() { + let session = start_session().await; + let lead = session.scheduler().find_lead_slot_id().await.expect("lead"); + session + .mailbox() + .write( + session.team_id(), + &lead, + "worker-1", + MailboxMessageType::Message, + "recover this", + None, + ) + .await + .expect("mailbox write"); + session + .team_run_manager() + .recover_mailbox_backlog(vec![RecoveryWakeCandidate { + slot_id: lead.clone(), + role: TeamRunTargetRole::Lead, + unread_count: 1, + }]) + .await + .expect("recovery run"); + + let input = session + .compute_wake_input(&lead) + .await + .expect("compute") + .expect("owned wake input"); + + assert!(input.team_run_id.is_some()); + assert!(input.should_send); + assert_eq!(input.wake_source, Some(TeamWakeSource::RecoveryDrain)); + assert!(input.trigger_message_id.is_none()); + assert_eq!(input.unread.len(), 1); + } + #[tokio::test] async fn compute_wake_input_returns_unread_rows_and_role_for_teammate() { let session = start_session().await; @@ -2950,6 +3044,7 @@ mod tests { .write("t1", "worker-1", "user", MailboxMessageType::Message, "from user", None) .await .unwrap(); + record_recovery_wake(&session, "worker-1", TeamRunTargetRole::Teammate, 2).await; let input = session .compute_wake_input("worker-1") @@ -2972,6 +3067,7 @@ mod tests { .write("t1", "lead-1", "user", MailboxMessageType::Message, "hi lead", None) .await .unwrap(); + record_recovery_wake(&session, "lead-1", TeamRunTargetRole::Lead, 1).await; let input = session.compute_wake_input("lead-1").await.unwrap().expect("WakeInput"); From 40c339f11460bc35aa78cfa7b4695951693dea8c Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:35:34 +0800 Subject: [PATCH 5/8] feat: add team recovery drain scan --- crates/aionui-team/src/session.rs | 180 +++++++++++++++++++++++++++++- 1 file changed, 178 insertions(+), 2 deletions(-) diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index 906383b98..921638d1a 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use aionui_ai_agent::IWorkerTaskManager; @@ -27,7 +28,10 @@ use crate::service::TeamSessionService; use crate::task_board::TaskBoard; #[cfg(test)] use crate::team_run::WakeRecordDecision; -use crate::team_run::{ChildCancelTarget, TeamRunManager, TeamRunWakeAcquireOutcome, target_role_for}; +use crate::team_run::{ + ChildCancelTarget, RecoveryBacklogResult, RecoveryWakeCandidate, TeamRunManager, TeamRunWakeAcquireOutcome, + target_role_for, +}; use crate::types::{MailboxMessageType, Team, TeamAgent, TeammateRole, TeammateStatus}; use crate::wake::TeamWakeSource; @@ -98,6 +102,12 @@ pub struct TeamSession { /// Per-agent event loop registry. Each agent has a dedicated tokio task /// that drains its mailbox whenever notified. event_loops: Arc, + /// Set after the session lifecycle performs its system recovery mailbox scan. + /// Written by `try_start_recovery_drain` and read by later scan attempts so + /// ordinary event-loop notifications cannot repeatedly create recovery runs. + /// Reset only by constructing a fresh `TeamSession` during a new restore, + /// reconnect, or explicit re-ensure lifecycle. + recovery_scan_completed: AtomicBool, } impl TeamSession { @@ -163,6 +173,7 @@ impl TeamSession { service, broadcaster, event_loops, + recovery_scan_completed: AtomicBool::new(false), }) } @@ -822,6 +833,92 @@ impl TeamSession { Ok(()) } + pub(crate) async fn try_start_recovery_drain( + &self, + reason: &'static str, + ) -> Result, TeamError> { + if self.recovery_scan_completed.swap(true, Ordering::AcqRel) { + tracing::debug!( + team_id = %self.team.id, + reason, + "team recovery scan skipped because it already ran for this session lifecycle" + ); + return Ok(None); + } + + let agents = self.scheduler.list_agents().await; + let mut candidates = Vec::new(); + let mut unread_total = 0usize; + let mut missing_event_loops = Vec::new(); + + for agent in agents { + if !self.event_loops.has(&agent.slot_id) { + missing_event_loops.push(agent.slot_id.clone()); + continue; + } + + let unread = self.mailbox.peek_unread(&self.team.id, &agent.slot_id).await?; + let recoverable_count = unread + .into_iter() + .filter(|message| message.from_agent_id != agent.slot_id) + .count(); + if recoverable_count == 0 { + continue; + } + unread_total += recoverable_count; + candidates.push(RecoveryWakeCandidate { + slot_id: agent.slot_id, + role: target_role_for(agent.role), + unread_count: recoverable_count, + }); + } + + if !missing_event_loops.is_empty() { + warn!( + team_id = %self.team.id, + reason, + missing_event_loop_count = missing_event_loops.len(), + unread_count = unread_total, + "team recovery scan found slots without event loops; unread work is retained" + ); + } + + if candidates.is_empty() { + tracing::debug!( + team_id = %self.team.id, + reason, + "team recovery scan found no recoverable mailbox backlog" + ); + return Ok(None); + } + + let result = self.team_run_manager.recover_mailbox_backlog(candidates).await; + if let Some(result) = result.as_ref() { + info!( + team_id = %self.team.id, + team_run_id = %result.team_run_id, + source = "recovery_drain", + slot_count = result.recorded_wakes.len(), + pending_wake_count = result.pending_wake_count, + reason, + "team recovery scan recorded TeamRun wakes" + ); + for slot_id in &result.recorded_wakes { + self.event_loops.notify(slot_id); + } + } else { + warn!( + team_id = %self.team.id, + source = "recovery_drain", + unread_count = unread_total, + reason, + "team recovery scan retained unread mailbox backlog without recording wakes" + ); + } + + Ok(result) + } + /// Mirror each non-user mailbox row into the target agent's conversation /// as a left bubble so the UI shows "who said what" when the user opens /// an agent's chat panel. @@ -1448,7 +1545,8 @@ mod tests { use aionui_ai_agent::agent_task::AgentInstance; use aionui_ai_agent::types::BuildTaskOptions; use aionui_api_types::{ - TeamRunStatus, TeamRunTargetRole, TeamSendMessageDelivery, TeamSendMessageReason, WebSocketMessage, + TeamRunSource, TeamRunStatus, TeamRunTargetRole, TeamSendMessageDelivery, TeamSendMessageReason, + WebSocketMessage, }; use aionui_common::{AgentKillReason, TimestampMs, now_ms}; use std::sync::{Arc, Mutex}; @@ -3024,6 +3122,84 @@ mod tests { assert_eq!(input.unread.len(), 1); } + #[tokio::test] + async fn recovery_scan_creates_one_wake_per_slot_with_non_self_unread() { + let session = start_session_arc().await; + let lead = session.scheduler().find_lead_slot_id().await.expect("lead"); + let worker = session + .scheduler() + .list_agents() + .await + .into_iter() + .find(|agent| agent.slot_id != lead) + .expect("worker") + .slot_id; + register_test_event_loop(&session, &lead); + register_test_event_loop(&session, &worker); + + session + .mailbox() + .write(session.team_id(), &lead, "worker-1", MailboxMessageType::Message, "m1", None) + .await + .expect("lead mailbox write 1"); + session + .mailbox() + .write(session.team_id(), &lead, "worker-2", MailboxMessageType::Message, "m2", None) + .await + .expect("lead mailbox write 2"); + session + .mailbox() + .write( + session.team_id(), + &worker, + &worker, + MailboxMessageType::Message, + "self", + None, + ) + .await + .expect("self mailbox write"); + + let result = session + .try_start_recovery_drain("test_restore") + .await + .expect("scan should not fail") + .expect("recovery result"); + + assert_eq!(result.recorded_wakes, vec![lead.clone()]); + assert_eq!(result.source, TeamRunSource::RecoveryDrain); + assert_eq!(result.pending_wake_count, 1); + } + + #[tokio::test] + async fn recovery_scan_is_one_shot_per_session_lifecycle() { + let session = start_session_arc().await; + let lead = session.scheduler().find_lead_slot_id().await.expect("lead"); + register_test_event_loop(&session, &lead); + + let first = session + .try_start_recovery_drain("test_restore_no_work") + .await + .expect("first scan"); + assert!(first.is_none(), "empty first scan must not create recovery work"); + + session + .mailbox() + .write(session.team_id(), &lead, "worker-1", MailboxMessageType::Message, "late", None) + .await + .expect("late mailbox write"); + + let second = session + .try_start_recovery_drain("test_restore_second") + .await + .expect("second scan should not fail"); + + assert!( + second.is_none(), + "ordinary new work must not create a second recovery scan" + ); + } + #[tokio::test] async fn compute_wake_input_returns_unread_rows_and_role_for_teammate() { let session = start_session().await; From 391d8216b37d5f27ae85594f4b2d38e094ed3c56 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:38:51 +0800 Subject: [PATCH 6/8] fix: restore team mailbox backlog through TeamRun --- crates/aionui-team/src/service.rs | 19 +- crates/aionui-team/src/session.rs | 22 --- .../tests/session_service_integration.rs | 164 ++++++++++++++++++ 3 files changed, 170 insertions(+), 35 deletions(-) diff --git a/crates/aionui-team/src/service.rs b/crates/aionui-team/src/service.rs index 8e43dde6f..986309948 100644 --- a/crates/aionui-team/src/service.rs +++ b/crates/aionui-team/src/service.rs @@ -552,19 +552,12 @@ impl TeamSessionService { }; self.sessions.insert(team_id.to_owned(), entry); - // Notify all agents so they drain any pre-existing mailbox messages - // (e.g. from a prior session or backend restart). - for agent in &agents_snapshot { - if session.team_run_manager().active_run_id().await.is_some() { - warn!( - team_id, - slot_id = %agent.slot_id, - wake_policy = "session_restore_drain", - "session restore drain skipped because active team run exists" - ); - } else { - session.notify_agent_for_session_restore_drain(&agent.slot_id); - } + if let Err(err) = session.try_start_recovery_drain("ensure_session_ready").await { + warn!( + team_id, + error = %err, + "team recovery scan failed after session ensure" + ); } let active_count = if skip_leader { diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index 921638d1a..f54cd5420 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -205,10 +205,6 @@ impl TeamSession { &self.team_run_manager } - pub(crate) fn notify_agent_for_session_restore_drain(&self, slot_id: &str) { - self.event_loops.notify(slot_id); - } - pub fn mcp_stdio_config(&self, slot_id: &str) -> TeamMcpStdioConfig { TeamMcpStdioConfig { team_id: self.team.id.clone(), @@ -2573,24 +2569,6 @@ mod tests { session.stop(); } - #[tokio::test] - async fn session_restore_drain_does_not_record_pending_wake_without_active_run() { - let session = start_session().await; - - session.notify_agent_for_session_restore_drain("worker-1"); - - let reservation = session - .team_run_manager() - .claim_wake_for_turn("worker-1", TeamRunTargetRole::Teammate, "c2") - .await; - - assert!( - reservation.is_none(), - "restore drain must not create Team Run reservation" - ); - session.stop(); - } - #[tokio::test] async fn crash_notification_in_active_run_wakes_leader() { let session = start_session().await; diff --git a/crates/aionui-team/tests/session_service_integration.rs b/crates/aionui-team/tests/session_service_integration.rs index e8175f813..e72407734 100644 --- a/crates/aionui-team/tests/session_service_integration.rs +++ b/crates/aionui-team/tests/session_service_integration.rs @@ -218,6 +218,34 @@ impl AgentTurnExecutionPort for NoopTurnPort { } } +#[derive(Default)] +struct RecordingTurnPort { + requests: Mutex>, +} + +#[async_trait::async_trait] +impl AgentTurnExecutionPort for RecordingTurnPort { + async fn run_agent_turn(&self, request: AgentTurnRequest) -> Result { + if let Some(on_started) = request.on_started.as_ref() { + on_started(AgentTurnStarted { + team_run_id: request.team_run_id.clone().expect("team run id"), + slot_id: request.slot_id.clone(), + role: request.role.clone(), + conversation_id: request.conversation_id.clone(), + turn_id: format!("turn-{}", request.slot_id), + }) + .await; + } + self.requests.lock().unwrap().push(request.clone()); + Ok(AgentTurnOutcome { + conversation_id: request.conversation_id, + turn_id: "turn-recorded".into(), + status: AgentTurnStatus::Completed, + runtime: None, + }) + } +} + fn noop_turn_port() -> Arc { Arc::new(NoopTurnPort) } @@ -1131,10 +1159,146 @@ fn setup_with_ports_team_repo_and_conversation_repo( (svc, team_repo, conversation_ports, conv_repo) } +fn setup_with_recording_turn_port() -> ( + Arc, + Arc, + Arc, + Arc, +) { + let team_repo = Arc::new(FullMockTeamRepo::new()); + let team_repo_dyn: Arc = team_repo.clone(); + let conv_repo = Arc::new(MockConversationRepo::new()); + let broadcaster: Arc = Arc::new(NullBroadcaster); + let conversation_ports = Arc::new(FakeConversationPorts::new(conv_repo.clone(), broadcaster.clone())); + let conversation_port: Arc = conversation_ports.clone(); + let projection_store: Arc = conversation_ports.clone(); + let lookup_port: Arc = conversation_ports; + let task_manager: Arc = Arc::new(CountingTaskManager::new(success_factory())); + let turn_port = Arc::new(RecordingTurnPort::default()); + let backend_binary_path = Arc::new(std::path::PathBuf::from("/tmp/aioncore-test")); + let provider_repo: Arc = Arc::new(EmptyProviderRepo); + let svc = TeamSessionService::new( + team_repo_dyn, + Arc::new(StubAgentMetadataRepo::empty()), + provider_repo, + conversation_port, + projection_store, + lookup_port, + broadcaster, + task_manager, + turn_port.clone(), + noop_cancellation_port(), + backend_binary_path, + None, + ); + (svc, team_repo, turn_port, conv_repo) +} + fn setup() -> Arc { setup_with_factory(success_factory()).0 } +#[tokio::test] +async fn ensure_session_recovery_drain_runs_agent_turn_with_team_run_id() { + let (svc, team_repo, turn_port, _conv_repo) = setup_with_recording_turn_port(); + let created = svc + .create_team( + "user1", + CreateTeamRequest { + name: "Recover".into(), + agents: two_agent_input(), + workspace: None, + }, + ) + .await + .expect("create team"); + let lead_slot_id = created.lead_agent_id.clone().expect("lead"); + svc.stop_session("user1", &created.id) + .await + .expect("stop auto-started session"); + + team_repo + .write_message(&aionui_db::models::MailboxMessageRow { + id: "mailbox-orphan-1".into(), + team_id: created.id.clone(), + to_agent_id: lead_slot_id.clone(), + from_agent_id: "worker-or-user".into(), + msg_type: "message".into(), + content: "orphan backlog".into(), + summary: None, + files: None, + read: false, + created_at: aionui_common::now_ms(), + }) + .await + .expect("seed orphan mailbox"); + + svc.ensure_session("user1", &created.id).await.expect("ensure"); + + tokio::time::timeout(std::time::Duration::from_secs(2), async { + loop { + if !turn_port.requests.lock().unwrap().is_empty() { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + } + }) + .await + .expect("recovery turn should run"); + + let requests = turn_port.requests.lock().unwrap(); + assert_eq!(requests.len(), 1); + assert_eq!(requests[0].slot_id, lead_slot_id); + assert!( + requests[0].team_run_id.is_some(), + "recovery turn must be TeamRun-owned" + ); +} + +#[tokio::test] +async fn ensure_session_does_not_run_self_message_only_recovery_turn() { + let (svc, team_repo, turn_port, _conv_repo) = setup_with_recording_turn_port(); + let created = svc + .create_team( + "user1", + CreateTeamRequest { + name: "Self Only".into(), + agents: two_agent_input(), + workspace: None, + }, + ) + .await + .expect("create team"); + let lead_slot_id = created.lead_agent_id.clone().expect("lead"); + svc.stop_session("user1", &created.id) + .await + .expect("stop auto-started session"); + + team_repo + .write_message(&aionui_db::models::MailboxMessageRow { + id: "mailbox-self-1".into(), + team_id: created.id.clone(), + to_agent_id: lead_slot_id.clone(), + from_agent_id: lead_slot_id, + msg_type: "message".into(), + content: "self backlog".into(), + summary: None, + files: None, + read: false, + created_at: aionui_common::now_ms(), + }) + .await + .expect("seed self mailbox"); + + svc.ensure_session("user1", &created.id).await.expect("ensure"); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + assert!( + turn_port.requests.lock().unwrap().is_empty(), + "self-only unread must not start a recovery turn" + ); +} + fn setup_with_recording_broadcaster() -> (Arc, Arc) { let team_repo: Arc = Arc::new(FullMockTeamRepo::new()); let conv_repo = Arc::new(MockConversationRepo::new()); From d606a475d74d0d694ac5ac63a93bfcf8ecae6703 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:43:27 +0800 Subject: [PATCH 7/8] test: update TeamRun wake ownership fixtures --- crates/aionui-api-types/src/lib.rs | 4 +- crates/aionui-team/src/session.rs | 31 ++++++++++-- crates/aionui-team/tests/e2e_team_flow.rs | 47 +++---------------- .../tests/session_service_integration.rs | 5 +- 4 files changed, 37 insertions(+), 50 deletions(-) diff --git a/crates/aionui-api-types/src/lib.rs b/crates/aionui-api-types/src/lib.rs index adf91585c..3c6c0d305 100644 --- a/crates/aionui-api-types/src/lib.rs +++ b/crates/aionui-api-types/src/lib.rs @@ -150,8 +150,8 @@ pub use team::{ RenameAgentRequest, RenameTeamRequest, SendAgentMessageRequest, SendTeamMessageRequest, TeamAgentInput, TeamAgentRemovedPayload, TeamAgentRenamedPayload, TeamAgentResponse, TeamAgentSpawnedPayload, TeamAgentStatusPayload, TeamChildTurnPayload, TeamListResponse, TeamMcpPhase, TeamMcpRuntimeConfig, - TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole, - TeamRunSource, TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason, + TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunPayload, TeamRunSource, TeamRunStatus, + TeamRunTargetRole, TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason, TeamSendMessageStatus, TeamSendMessageTargetQueueState, TeamSessionBinding, TeamSlotRuntimeHealth, TeamSlotWorkPayload, TeammateMessagePayload, }; diff --git a/crates/aionui-team/src/session.rs b/crates/aionui-team/src/session.rs index f54cd5420..cf7a6a74c 100644 --- a/crates/aionui-team/src/session.rs +++ b/crates/aionui-team/src/session.rs @@ -3117,12 +3117,26 @@ mod tests { session .mailbox() - .write(session.team_id(), &lead, "worker-1", MailboxMessageType::Message, "m1", None) + .write( + session.team_id(), + &lead, + "worker-1", + MailboxMessageType::Message, + "m1", + None, + ) .await .expect("lead mailbox write 1"); session .mailbox() - .write(session.team_id(), &lead, "worker-2", MailboxMessageType::Message, "m2", None) + .write( + session.team_id(), + &lead, + "worker-2", + MailboxMessageType::Message, + "m2", + None, + ) .await .expect("lead mailbox write 2"); session @@ -3163,7 +3177,14 @@ mod tests { session .mailbox() - .write(session.team_id(), &lead, "worker-1", MailboxMessageType::Message, "late", None) + .write( + session.team_id(), + &lead, + "worker-1", + MailboxMessageType::Message, + "late", + None, + ) .await .expect("late mailbox write"); @@ -3245,6 +3266,7 @@ mod tests { ) .await .unwrap(); + record_recovery_wake(&session, "lead-1", TeamRunTargetRole::Lead, 1).await; let input = session.compute_wake_input("lead-1").await.unwrap().expect("WakeInput"); @@ -3270,6 +3292,7 @@ mod tests { ) .await .unwrap(); + record_recovery_wake(&session, "lead-1", TeamRunTargetRole::Lead, 1).await; let input = session.compute_wake_input("lead-1").await.unwrap().expect("WakeInput"); assert_eq!( @@ -3303,6 +3326,7 @@ mod tests { ) .await .unwrap(); + record_recovery_wake(&session, "lead-1", TeamRunTargetRole::Lead, 1).await; let input = session.compute_wake_input("lead-1").await.unwrap().expect("WakeInput"); session.mirror_unread_to_conversation(&input).await; @@ -3389,6 +3413,7 @@ mod tests { .write("t1", "worker-1", "lead-1", MailboxMessageType::Message, "do it", None) .await .unwrap(); + record_recovery_wake(&session, "worker-1", TeamRunTargetRole::Teammate, 1).await; let input = session .compute_wake_input("worker-1") diff --git a/crates/aionui-team/tests/e2e_team_flow.rs b/crates/aionui-team/tests/e2e_team_flow.rs index a92cf14da..bc22b8b52 100644 --- a/crates/aionui-team/tests/e2e_team_flow.rs +++ b/crates/aionui-team/tests/e2e_team_flow.rs @@ -1814,20 +1814,8 @@ async fn s11_shutdown_approved_interception() { /// Scenario 12a: Cold-start lead gets role prompt injected into first_message. #[tokio::test] async fn s12a_cold_start_lead_gets_role_prompt() { - let (session, _tm, _repo, _sent) = setup_session().await; - - session - .mailbox() - .write( - "e2e-team", - "lead-1", - "user", - aionui_team::MailboxMessageType::Message, - "kick off", - None, - ) - .await - .unwrap(); + let (session, _tm, _repo, _sent, _turn_requests) = setup_session_with_turn_recorder_without_loops().await; + session.send_message("kick off", None).await.unwrap(); let input = session .compute_wake_input("lead-1") @@ -1853,22 +1841,10 @@ async fn s12a_cold_start_lead_gets_role_prompt() { /// on a first compute_wake_input call, then assert the second call omits it. #[tokio::test] async fn s12b_warm_lead_skips_role_prompt() { - let (session, _tm, _repo, _sent) = setup_session().await; + let (session, _tm, _repo, _sent, _turn_requests) = setup_session_with_turn_recorder_without_loops().await; - // First: consume the cold-start role-prompt flag by calling compute_wake_input once. - // The mailbox will be empty so should_send=false, but the flag is consumed. - session - .mailbox() - .write( - "e2e-team", - "lead-1", - "user", - aionui_team::MailboxMessageType::Message, - "initial kick", - None, - ) - .await - .unwrap(); + // First: consume the cold-start role-prompt flag from a TeamRun-owned wake. + session.send_message("initial kick", None).await.unwrap(); let first_input = session .compute_wake_input("lead-1") .await @@ -1881,18 +1857,7 @@ async fn s12b_warm_lead_skips_role_prompt() { // The flag is now consumed (take_needs_role_prompt returned true, set to false). // Second call: write another message and compute again — no role prompt this time. - session - .mailbox() - .write( - "e2e-team", - "lead-1", - "user", - aionui_team::MailboxMessageType::Message, - "follow-up message", - None, - ) - .await - .unwrap(); + session.send_message("follow-up message", None).await.unwrap(); let input = session .compute_wake_input("lead-1") diff --git a/crates/aionui-team/tests/session_service_integration.rs b/crates/aionui-team/tests/session_service_integration.rs index e72407734..51c9c8481 100644 --- a/crates/aionui-team/tests/session_service_integration.rs +++ b/crates/aionui-team/tests/session_service_integration.rs @@ -1249,10 +1249,7 @@ async fn ensure_session_recovery_drain_runs_agent_turn_with_team_run_id() { let requests = turn_port.requests.lock().unwrap(); assert_eq!(requests.len(), 1); assert_eq!(requests[0].slot_id, lead_slot_id); - assert!( - requests[0].team_run_id.is_some(), - "recovery turn must be TeamRun-owned" - ); + assert!(requests[0].team_run_id.is_some(), "recovery turn must be TeamRun-owned"); } #[tokio::test] From f18b4875f61d664798cea2113ce16b843a2eaaf9 Mon Sep 17 00:00:00 2001 From: zynx <> Date: Wed, 17 Jun 2026 18:52:43 +0800 Subject: [PATCH 8/8] test: serialize managed runtime cache tests --- crates/aionui-runtime/src/node_runtime/mod.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/aionui-runtime/src/node_runtime/mod.rs b/crates/aionui-runtime/src/node_runtime/mod.rs index 72892ce70..d3cd4a392 100644 --- a/crates/aionui-runtime/src/node_runtime/mod.rs +++ b/crates/aionui-runtime/src/node_runtime/mod.rs @@ -286,12 +286,14 @@ mod tests { use super::*; use std::fs; use std::path::PathBuf; - use std::sync::{Arc, Mutex}; + use std::sync::{Arc, Mutex, OnceLock}; use std::io::Write; use tracing::Level; use tracing_subscriber::fmt; + static TEST_MANAGED_RUNTIME_CACHE_LOCK: OnceLock> = OnceLock::new(); + #[derive(Clone)] struct SharedBuf(Arc>>); @@ -354,6 +356,10 @@ mod tests { } } + fn test_managed_runtime_cache_lock() -> &'static tokio::sync::Mutex<()> { + TEST_MANAGED_RUNTIME_CACHE_LOCK.get_or_init(|| tokio::sync::Mutex::new(())) + } + #[test] fn probe_non_node_command_is_path_only() { let probe = probe_runtime_command("sh"); @@ -429,6 +435,7 @@ mod tests { #[tokio::test] async fn stale_managed_runtime_cache_is_evicted_when_root_is_deleted() { + let _guard = test_managed_runtime_cache_lock().lock().await; let tmp = tempfile::tempdir().expect("tempdir"); let root = tmp.path().join("node-v24.11.0-test"); let runtime = fake_managed_runtime(&root); @@ -453,6 +460,7 @@ mod tests { #[tokio::test] async fn cached_managed_runtime_emits_ready_after_validation() { + let _guard = test_managed_runtime_cache_lock().lock().await; let tmp = tempfile::tempdir().expect("tempdir"); let root = tmp.path().join("node-v24.11.0-test"); let runtime = fake_managed_runtime(&root);