Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/aionui-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ pub use team::{
RenameAgentRequest, RenameTeamRequest, SendAgentMessageRequest, SendTeamMessageRequest, TeamAgentInput,
TeamAgentRemovedPayload, TeamAgentRenamedPayload, TeamAgentResponse, TeamAgentSpawnedPayload,
TeamAgentStatusPayload, TeamChildTurnPayload, TeamListResponse, TeamMcpPhase, TeamMcpRuntimeConfig,
TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunPayload, TeamRunStatus, TeamRunTargetRole,
TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason,
TeamMcpStatusPayload, TeamResponse, TeamRunAckResponse, TeamRunPayload, TeamRunSource, TeamRunStatus,
TeamRunTargetRole, TeamRuntimeSeed, TeamSendMessageDelivery, TeamSendMessageQueuedResponse, TeamSendMessageReason,
TeamSendMessageStatus, TeamSendMessageTargetQueueState, TeamSessionBinding, TeamSlotRuntimeHealth,
TeamSlotWorkPayload, TeammateMessagePayload,
};
Expand Down
72 changes: 72 additions & 0 deletions crates/aionui-api-types/src/team.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,13 @@ pub enum TeamRunStatus {
Failed,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum TeamRunSource {
UserMessage,
RecoveryDrain,
}

#[derive(Debug, Deserialize)]
pub struct CancelTeamRunRequest {
#[serde(default)]
Expand All @@ -225,6 +232,8 @@ pub struct PauseTeamSlotRequest {
pub struct TeamRunAckResponse {
pub team_run_id: String,
pub team_id: String,
pub source: TeamRunSource,
pub has_user_intervention: bool,
pub target_slot_id: String,
pub target_role: TeamRunTargetRole,
pub accepted_slot_id: String,
Expand Down Expand Up @@ -269,6 +278,8 @@ pub struct TeamSlotWorkPayload {
pub struct TeamRunPayload {
pub team_id: String,
pub team_run_id: String,
pub source: TeamRunSource,
pub has_user_intervention: bool,
pub target_slot_id: String,
pub target_role: TeamRunTargetRole,
pub status: TeamRunStatus,
Expand Down Expand Up @@ -1110,6 +1121,8 @@ mod tests {
let ack = TeamRunAckResponse {
team_run_id: "trun-1".into(),
team_id: "team-1".into(),
source: TeamRunSource::UserMessage,
has_user_intervention: true,
target_slot_id: "lead-1".into(),
target_role: TeamRunTargetRole::Lead,
accepted_slot_id: "lead-1".into(),
Expand All @@ -1129,6 +1142,8 @@ mod tests {
let ack = TeamRunAckResponse {
team_run_id: "trun-1".into(),
team_id: "team-1".into(),
source: TeamRunSource::UserMessage,
has_user_intervention: true,
target_slot_id: "lead-1".into(),
target_role: TeamRunTargetRole::Lead,
accepted_slot_id: "worker-1".into(),
Expand All @@ -1144,11 +1159,64 @@ mod tests {
assert_eq!(value["accepted_role"], "teammate");
}

#[test]
fn team_run_source_serializes_snake_case() {
let user = serde_json::to_value(TeamRunSource::UserMessage).unwrap();
let recovery = serde_json::to_value(TeamRunSource::RecoveryDrain).unwrap();

assert_eq!(user, serde_json::json!("user_message"));
assert_eq!(recovery, serde_json::json!("recovery_drain"));
}

#[test]
fn team_run_payload_serializes_source_metadata() {
let payload = TeamRunPayload {
team_id: "team-1".into(),
team_run_id: "run-1".into(),
source: TeamRunSource::RecoveryDrain,
has_user_intervention: false,
target_slot_id: "lead-1".into(),
target_role: TeamRunTargetRole::Lead,
status: TeamRunStatus::Accepted,
active_child_count: 0,
pending_wake_count: 1,
starting_child_count: 0,
slot_work: vec![],
};

let json = serde_json::to_value(payload).unwrap();
assert_eq!(json["source"], "recovery_drain");
assert_eq!(json["has_user_intervention"], false);
}

#[test]
fn team_run_ack_serializes_source_metadata() {
let ack = TeamRunAckResponse {
team_run_id: "run-1".into(),
team_id: "team-1".into(),
source: TeamRunSource::UserMessage,
has_user_intervention: true,
target_slot_id: "lead-1".into(),
target_role: TeamRunTargetRole::Lead,
accepted_slot_id: "lead-1".into(),
accepted_role: TeamRunTargetRole::Lead,
status: TeamRunStatus::Accepted,
message_id: Some("mailbox-1".into()),
};

let json = serde_json::to_value(ack).unwrap();
assert_eq!(json["source"], "user_message");
assert_eq!(json["has_user_intervention"], true);
assert_eq!(json["message_id"], "mailbox-1");
}

#[test]
fn team_run_payload_omits_sensitive_content() {
let payload = TeamRunPayload {
team_id: "team-1".into(),
team_run_id: "trun-1".into(),
source: TeamRunSource::UserMessage,
has_user_intervention: true,
target_slot_id: "worker-1".into(),
target_role: TeamRunTargetRole::Teammate,
status: TeamRunStatus::Running,
Expand All @@ -1170,6 +1238,8 @@ mod tests {
let payload = TeamRunPayload {
team_id: "team-1".into(),
team_run_id: "trun-1".into(),
source: TeamRunSource::UserMessage,
has_user_intervention: true,
target_slot_id: "lead-1".into(),
target_role: TeamRunTargetRole::Lead,
status: TeamRunStatus::Running,
Expand Down Expand Up @@ -1216,6 +1286,8 @@ mod tests {
let decoded: TeamRunPayload = serde_json::from_value(serde_json::json!({
"team_id": "team-1",
"team_run_id": "trun-1",
"source": "user_message",
"has_user_intervention": true,
"target_slot_id": "lead-1",
"target_role": "lead",
"status": "running",
Expand Down
10 changes: 9 additions & 1 deletion crates/aionui-runtime/src/node_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,14 @@ mod tests {
use super::*;
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, OnceLock};

use std::io::Write;
use tracing::Level;
use tracing_subscriber::fmt;

static TEST_MANAGED_RUNTIME_CACHE_LOCK: OnceLock<tokio::sync::Mutex<()>> = OnceLock::new();

#[derive(Clone)]
struct SharedBuf(Arc<Mutex<Vec<u8>>>);

Expand Down Expand Up @@ -354,6 +356,10 @@ mod tests {
}
}

fn test_managed_runtime_cache_lock() -> &'static tokio::sync::Mutex<()> {
TEST_MANAGED_RUNTIME_CACHE_LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
}

#[test]
fn probe_non_node_command_is_path_only() {
let probe = probe_runtime_command("sh");
Expand Down Expand Up @@ -429,6 +435,7 @@ mod tests {

#[tokio::test]
async fn stale_managed_runtime_cache_is_evicted_when_root_is_deleted() {
let _guard = test_managed_runtime_cache_lock().lock().await;
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().join("node-v24.11.0-test");
let runtime = fake_managed_runtime(&root);
Expand All @@ -453,6 +460,7 @@ mod tests {

#[tokio::test]
async fn cached_managed_runtime_emits_ready_after_validation() {
let _guard = test_managed_runtime_cache_lock().lock().await;
let tmp = tempfile::tempdir().expect("tempdir");
let root = tmp.path().join("node-v24.11.0-test");
let runtime = fake_managed_runtime(&root);
Expand Down
2 changes: 2 additions & 0 deletions crates/aionui-team/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ mod tests {
aionui_api_types::TeamRunPayload {
team_id: "team-1".into(),
team_run_id: "run-1".into(),
source: aionui_api_types::TeamRunSource::UserMessage,
has_user_intervention: true,
target_slot_id: "lead-1".into(),
target_role: aionui_api_types::TeamRunTargetRole::Lead,
status: aionui_api_types::TeamRunStatus::Accepted,
Expand Down
19 changes: 6 additions & 13 deletions crates/aionui-team/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,19 +552,12 @@ impl TeamSessionService {
};
self.sessions.insert(team_id.to_owned(), entry);

// Notify all agents so they drain any pre-existing mailbox messages
// (e.g. from a prior session or backend restart).
for agent in &agents_snapshot {
if session.team_run_manager().active_run_id().await.is_some() {
warn!(
team_id,
slot_id = %agent.slot_id,
wake_policy = "session_restore_drain",
"session restore drain skipped because active team run exists"
);
} else {
session.notify_agent_for_session_restore_drain(&agent.slot_id);
}
if let Err(err) = session.try_start_recovery_drain("ensure_session_ready").await {
warn!(
team_id,
error = %err,
"team recovery scan failed after session ensure"
);
}

let active_count = if skip_leader {
Expand Down
Loading
Loading