From 1d95038347ffe1c52c4831216ae8e0b22ecdd044 Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Thu, 23 Apr 2026 14:52:19 -0400 Subject: [PATCH 1/2] fix(memory): close triage gaps and add proof tests --- docs/design-docs/working-memory-triage.md | 16 +- src/agent/channel.rs | 123 ++++++++-- src/agent/channel_dispatch.rs | 63 ++++- src/agent/compactor.rs | 33 +++ src/agent/cortex.rs | 85 ++++++- src/api/channels.rs | 122 +++++++--- src/cron/scheduler.rs | 91 +++++++- src/memory/working.rs | 271 +++++++++++++++++++++- src/secrets/scrub.rs | 120 ++++++++++ src/tasks/store.rs | 34 ++- src/tools/spawn_worker.rs | 19 +- src/tools/task_update.rs | 133 ++++++++++- 12 files changed, 1009 insertions(+), 101 deletions(-) diff --git a/docs/design-docs/working-memory-triage.md b/docs/design-docs/working-memory-triage.md index 762dce88e..6a919c2ed 100644 --- a/docs/design-docs/working-memory-triage.md +++ b/docs/design-docs/working-memory-triage.md @@ -17,8 +17,8 @@ Findings from CodeRabbit review + bug reports. Tracking resolution before merge. - [x] **R3 — Don't exclude participant-role facts yet** (`prompts/en/cortex_knowledge_synthesis.md.j2:21`) Exclusion of "The user is the CEO" drops participant context with nowhere else to live until Phase 6 ships. **Fixed in this slice:** knowledge synthesis now preserves concise participant/user role facts when they affect future routing, authority, relationships, or interpretation. -- [ ] **R4 — Raw worker task in working memory** (`src/agent/channel_dispatch.rs:596`) - `task` from user input persisted verbatim; could capture secrets/PII. Truncate and scrub. +- [x] **R4 — Raw worker task in working memory** (`src/agent/channel_dispatch.rs:596`) + `task` from user input persisted verbatim; could capture secrets/PII. **Fixed in this slice:** worker-spawn task text is now redacted and bounded via shared working-memory scrub helpers. - [ ] **R5 — Dirty flag only bumps on merges** (`src/agent/cortex.rs:1958`) Prunes and decays also change the memory set but don't trigger knowledge synthesis re-gen. Add `report.pruned > 0 || report.decayed > 0`. **Partial in PR #570:** prunes and merges now dirty synthesis; decay remains intentionally importance-only and needs a follow-up decision. @@ -41,11 +41,11 @@ Findings from CodeRabbit review + bug reports. Tracking resolution before merge. - [x] **R11 — Unsynthesized yesterday events dropped** (`src/agent/cortex.rs:2916`) Raw events that didn't hit count/time trigger before midnight are lost from daily summary. Roll them into the summary. **Fixed:** daily summary now fetches all raw events, filters to the unsynthesized tail after the last intra-day synthesis, and includes them in the LLM input. -- [ ] **R12 — Silent error swallowing in inspect_prompt** (`src/api/channels.rs:649`) - `unwrap_or_default()` / `.ok()` hides DB/template errors. Log and propagate per coding guidelines. +- [x] **R12 — Silent error swallowing in inspect_prompt** (`src/api/channels.rs:649`) + `unwrap_or_default()` / `.ok()` hides DB/template errors. **Fixed in this slice:** inspect prompt now logs and returns internal errors when DB/template rendering fails. -- [ ] **R13 — Raw error strings in working memory** (`src/cron/scheduler.rs:386`) - Full error text persisted; could contain sensitive internals. Emit redacted summary only. +- [x] **R13 — Raw error strings in working memory** (`src/cron/scheduler.rs:386`) + Full error text persisted; could contain sensitive internals. **Fixed in this slice:** cron error events now persist scrubbed and bounded summaries (including encoded leak fail-closed redaction). - [ ] **R14 — Timezone fallback drops valid `cron_timezone`** (`src/main.rs:2559`) If `user_timezone` is present but unparseable, `cron_timezone` is never tried. Parse each independently. @@ -53,8 +53,8 @@ Findings from CodeRabbit review + bug reports. Tracking resolution before merge. - [x] **R15 — UTF-8 panic on topic truncation** (`src/memory/working.rs:739`) Byte-index slice at 80 can split multibyte chars. **Fixed:** `floor_char_boundary(80)`. -- [ ] **R16 — Task update event always says "status change"** (`src/tools/task_update.rs:246`) - Every update emits `"updated to "` even for title/description edits. Compute actual delta. +- [x] **R16 — Task update event always says "status change"** (`src/tools/task_update.rs:246`) + Every update emits `"updated to "` even for title/description edits. **Fixed in this slice:** working-memory task updates now describe actual field deltas and preserve status-only wording when only status changed. ## Live Observations (from prompt inspect, March 19) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 963fdeab8..440edd96f 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -218,6 +218,31 @@ fn branch_working_memory_event_summary( ) } +fn memory_persistence_trigger_kind( + message_count: usize, + elapsed_secs: u64, + event_count_since_last: Option, + wm_config: &crate::config::WorkingMemoryConfig, +) -> Option<&'static str> { + let message_trigger = message_count >= wm_config.persistence_message_threshold; + let time_trigger = + message_count > 0 && elapsed_secs >= wm_config.persistence_time_threshold_secs; + let density_trigger = !message_trigger + && !time_trigger + && event_count_since_last + .is_some_and(|count| count >= wm_config.persistence_event_density_threshold); + + if message_trigger { + Some("message_count") + } else if time_trigger { + Some("time") + } else if density_trigger { + Some("event_density") + } else { + None + } +} + fn parse_branch_cancellation_reason(conclusion: &str) -> Option<&str> { let trimmed = conclusion.trim(); if let Some(rest) = trimmed.strip_prefix(BRANCH_CANCELLED_PREFIX) { @@ -3706,17 +3731,14 @@ impl Channel { let wm_config = **self.deps.runtime_config.working_memory.load(); let elapsed = self.last_persistence_at.elapsed(); - - // Trigger 1: Message count threshold. - let message_trigger = self.message_count >= wm_config.persistence_message_threshold; - - // Trigger 2: Time-based — only if conversation is active (message_count > 0). - let time_trigger = self.message_count > 0 - && elapsed.as_secs() >= wm_config.persistence_time_threshold_secs; - - // Trigger 3: Event density — working memory events from this channel. - let density_trigger = if !message_trigger && !time_trigger { - // Only check DB if the cheap triggers didn't fire. + let event_count_since_last = if memory_persistence_trigger_kind( + self.message_count, + elapsed.as_secs(), + None, + &wm_config, + ) + .is_none() + { let since = chrono::Utc::now() - chrono::Duration::seconds(elapsed.as_secs() as i64); match self .deps @@ -3724,26 +3746,23 @@ impl Channel { .count_events_since(self.id.as_ref(), since) .await { - Ok(count) => count as usize >= wm_config.persistence_event_density_threshold, + Ok(count) => Some(count as usize), Err(error) => { tracing::debug!(%error, "event density check failed, skipping"); - false + None } } } else { - false + None }; - if !message_trigger && !time_trigger && !density_trigger { + let Some(trigger) = memory_persistence_trigger_kind( + self.message_count, + elapsed.as_secs(), + event_count_since_last, + &wm_config, + ) else { return; - } - - let trigger = if message_trigger { - "message_count" - } else if time_trigger { - "time" - } else { - "event_density" }; // Reset counters before spawning so subsequent messages don't pile up. @@ -3993,9 +4012,11 @@ mod tests { ObserveModeFallbackState, branch_working_memory_event_summary, classify_conversational_event_summary, compute_listen_mode_invocation, decision_user_id, extract_decision_summary_from_reply, format_conversational_event_summary, - is_dm_conversation_id, recv_channel_event, should_process_event_for_channel, - should_send_discord_quiet_mode_ping_ack, should_send_quiet_mode_fallback, + is_dm_conversation_id, memory_persistence_trigger_kind, recv_channel_event, + should_process_event_for_channel, should_send_discord_quiet_mode_ping_ack, + should_send_quiet_mode_fallback, }; + use crate::config::WorkingMemoryConfig; use crate::memory::{MemoryType, WorkingMemoryEventType}; use crate::{AgentId, ChannelId, InboundMessage, MessageContent, ProcessEvent, ProcessId}; use std::collections::HashMap; @@ -4110,6 +4131,58 @@ mod tests { ); } + #[test] + fn memory_persistence_trigger_prefers_message_count() { + let config = WorkingMemoryConfig { + persistence_message_threshold: 20, + persistence_time_threshold_secs: 900, + persistence_event_density_threshold: 5, + ..WorkingMemoryConfig::default() + }; + + let trigger = memory_persistence_trigger_kind(20, 900, Some(10), &config); + assert_eq!(trigger, Some("message_count")); + } + + #[test] + fn memory_persistence_trigger_uses_time_for_active_channels() { + let config = WorkingMemoryConfig { + persistence_message_threshold: 20, + persistence_time_threshold_secs: 900, + persistence_event_density_threshold: 5, + ..WorkingMemoryConfig::default() + }; + + let trigger = memory_persistence_trigger_kind(3, 900, Some(10), &config); + assert_eq!(trigger, Some("time")); + } + + #[test] + fn memory_persistence_trigger_uses_event_density_when_other_triggers_miss() { + let config = WorkingMemoryConfig { + persistence_message_threshold: 20, + persistence_time_threshold_secs: 900, + persistence_event_density_threshold: 5, + ..WorkingMemoryConfig::default() + }; + + let trigger = memory_persistence_trigger_kind(3, 120, Some(5), &config); + assert_eq!(trigger, Some("event_density")); + } + + #[test] + fn memory_persistence_trigger_requires_activity_for_time_fallback() { + let config = WorkingMemoryConfig { + persistence_message_threshold: 20, + persistence_time_threshold_secs: 900, + persistence_event_density_threshold: 5, + ..WorkingMemoryConfig::default() + }; + + let trigger = memory_persistence_trigger_kind(0, 5000, Some(0), &config); + assert_eq!(trigger, None); + } + #[test] fn decision_user_id_skips_retrigger_messages() { let humans = vec![crate::config::HumanDef { diff --git a/src/agent/channel_dispatch.rs b/src/agent/channel_dispatch.rs index 30402c119..5087457fd 100644 --- a/src/agent/channel_dispatch.rs +++ b/src/agent/channel_dispatch.rs @@ -40,6 +40,8 @@ enum WorkerCompletionKind { Failed, } +const WORKING_MEMORY_TASK_MAX_CHARS: usize = 500; + #[derive(Debug, Clone)] pub(crate) enum WorkerCompletionError { Cancelled { reason: String }, @@ -99,6 +101,14 @@ pub(crate) fn map_worker_completion_result( (result_text, notify, success) } +fn sanitize_worker_memory_task(task: &str, tool_secret_pairs: &[(String, String)]) -> String { + crate::secrets::scrub::scrub_working_memory_text( + task, + tool_secret_pairs, + WORKING_MEMORY_TASK_MAX_CHARS, + ) +} + /// Build the worker status text (time + system info) used in worker system prompts. /// /// Centralises the `SystemInfo` + `TemporalContext` assembly so every worker @@ -597,9 +607,9 @@ async fn spawn_worker_inner( let sandbox_write_allowlist = state.deps.sandbox.prompt_write_allowlist(); // Collect tool secret names so the worker template can list available credentials. let secrets_guard = rc.secrets.load(); - let tool_secret_names = match (*secrets_guard).as_ref() { - Some(store) => store.tool_secret_names(), - None => Vec::new(), + let (tool_secret_names, tool_secret_pairs) = match (*secrets_guard).as_ref() { + Some(store) => (store.tool_secret_names(), store.tool_secret_pairs()), + None => (Vec::new(), Vec::new()), }; let browser_config = (**rc.browser_config.load()).clone(); @@ -793,12 +803,13 @@ async fn spawn_worker_inner( }) .ok(); + let memory_task = sanitize_worker_memory_task(task, &tool_secret_pairs); state .deps .working_memory .emit( crate::memory::WorkingMemoryEventType::WorkerSpawned, - format!("Worker spawned: {task}"), + format!("Worker spawned: {memory_task}"), ) .channel(state.channel_id.to_string()) .importance(0.6) @@ -872,6 +883,9 @@ async fn spawn_opencode_worker_inner( let persist_directory = directory.clone(); let oc_secrets_store = state.deps.runtime_config.secrets.load().as_ref().clone(); + let oc_tool_secret_pairs = oc_secrets_store + .as_ref() + .map_or_else(Vec::new, |store| store.tool_secret_pairs()); // Build temporal/status context so OpenCode workers get the same system // info (time, model, context window) as builtin workers. @@ -996,12 +1010,13 @@ async fn spawn_opencode_worker_inner( }) .ok(); + let memory_task = sanitize_worker_memory_task(task, &oc_tool_secret_pairs); state .deps .working_memory .emit( crate::memory::WorkingMemoryEventType::WorkerSpawned, - format!("Worker spawned (opencode): {task}"), + format!("Worker spawned (opencode): {memory_task}"), ) .channel(state.channel_id.to_string()) .importance(0.6) @@ -1396,7 +1411,10 @@ fn expand_tilde(path: &str) -> std::path::PathBuf { #[cfg(test)] mod tests { - use super::{WorkerCompletionError, map_worker_completion_result, spawn_worker_task}; + use super::{ + WORKING_MEMORY_TASK_MAX_CHARS, WorkerCompletionError, map_worker_completion_result, + sanitize_worker_memory_task, spawn_worker_task, + }; use crate::{ProcessEvent, WorkerId}; use std::sync::Arc; use std::time::Duration; @@ -1414,6 +1432,39 @@ mod tests { assert!(!success); } + #[test] + fn worker_spawned_memory_task_redacts_secrets() { + let tool_secret_pairs = vec![("API_KEY".to_string(), "stored-secret".to_string())]; + let task = "use stored-secret and sk-ant-abc123456789012345678"; + let result = sanitize_worker_memory_task(task, &tool_secret_pairs); + + assert!( + !result.contains("stored-secret"), + "stored secret should be redacted in: {result}" + ); + assert!( + !result.contains("sk-ant-"), + "leak pattern should be redacted in: {result}" + ); + assert!( + result.contains("[REDACTED:API_KEY]"), + "stored secret marker missing in: {result}" + ); + assert!( + result.contains("[LEAKED_SECRET_REDACTED]"), + "leak marker missing in: {result}" + ); + } + + #[test] + fn worker_spawned_memory_task_is_bounded() { + let task = "a".repeat(WORKING_MEMORY_TASK_MAX_CHARS + 100); + let result = sanitize_worker_memory_task(&task, &[]); + + assert_eq!(result.chars().count(), WORKING_MEMORY_TASK_MAX_CHARS); + assert!(result.ends_with(" ... [truncated]")); + } + #[tokio::test] async fn spawn_worker_task_emits_cancelled_completion_event() { let (event_tx, mut event_rx) = broadcast::channel(8); diff --git a/src/agent/compactor.rs b/src/agent/compactor.rs index fb44ec86c..c33106eb3 100644 --- a/src/agent/compactor.rs +++ b/src/agent/compactor.rs @@ -440,3 +440,36 @@ pub enum CompactionAction { /// Emergency truncation (no LLM, drop oldest 50%). EmergencyTruncate, } + +#[cfg(test)] +mod tests { + use super::extract_summary_section; + + #[test] + fn extract_summary_section_strips_markdown_header() { + let response = "## Summary\n\nCondensed thread narrative"; + assert_eq!( + extract_summary_section(response), + "Condensed thread narrative".to_string() + ); + } + + #[test] + fn run_compaction_path_remains_toolless_and_one_turn() { + let source = include_str!("compactor.rs"); + let run_compaction_start = source + .find("async fn run_compaction") + .expect("run_compaction should exist"); + let run_compaction_end = source.find("#[cfg(test)]").unwrap_or(source.len()); + let run_compaction_source = &source[run_compaction_start..run_compaction_end]; + + assert!( + !run_compaction_source.contains(".tool_server_handle("), + "compactor summary path must stay toolless" + ); + assert!( + run_compaction_source.contains(".default_max_turns(1)"), + "compactor summary path must stay single-turn" + ); + } +} diff --git a/src/agent/cortex.rs b/src/agent/cortex.rs index f95007bb3..3a5313f13 100644 --- a/src/agent/cortex.rs +++ b/src/agent/cortex.rs @@ -3148,6 +3148,25 @@ async fn gather_sections_from_list( gathered } +fn should_regenerate_knowledge_synthesis_state( + current_version: u64, + last_version: u64, + last_change_unix_secs: i64, + debounce_secs: u64, + now_unix_secs: i64, +) -> bool { + if current_version <= last_version { + return false; + } + + let elapsed = if now_unix_secs <= last_change_unix_secs { + 0 + } else { + (now_unix_secs - last_change_unix_secs) as u64 + }; + elapsed >= debounce_secs +} + /// Check if knowledge synthesis needs regeneration based on dirty flag and debounce. pub fn should_regenerate_knowledge_synthesis(deps: &AgentDeps) -> bool { let current_version = deps @@ -3158,21 +3177,24 @@ pub fn should_regenerate_knowledge_synthesis(deps: &AgentDeps) -> bool { .runtime_config .knowledge_synthesis_last_version .load(std::sync::atomic::Ordering::Acquire); - - if current_version == last_version { - return false; - } - - // Debounce: wait for activity to settle. - let cortex_config = **deps.runtime_config.cortex.load(); let last_change = deps .runtime_config .knowledge_synthesis_last_change .load(std::sync::atomic::Ordering::Acquire); + let debounce_secs = deps + .runtime_config + .cortex + .load() + .knowledge_synthesis_debounce_secs; let now = chrono::Utc::now().timestamp(); - let elapsed = now.saturating_sub(last_change) as u64; - elapsed >= cortex_config.knowledge_synthesis_debounce_secs + should_regenerate_knowledge_synthesis_state( + current_version, + last_version, + last_change, + debounce_secs, + now, + ) } // -- Intra-Day Synthesis + Daily Summaries -- @@ -4635,8 +4657,9 @@ mod tests { mark_knowledge_synthesis_version_complete, maybe_close_bulletin_refresh_circuit, maybe_generate_bulletin_under_lock, maybe_spawn_synthesis_task, parse_structured_success_flag, push_signal_into_buffer, record_bulletin_refresh_failure, - should_execute_warmup, should_generate_bulletin_from_bulletin_loop, signal_from_event, - summarize_signal_text, take_lagged_control_flag, + should_execute_warmup, should_generate_bulletin_from_bulletin_loop, + should_regenerate_knowledge_synthesis_state, signal_from_event, summarize_signal_text, + take_lagged_control_flag, }; use crate::ProcessEvent; use crate::agent::process_control::ControlActionResult; @@ -5036,6 +5059,46 @@ mod tests { assert_eq!(last_version.load(Ordering::Acquire), target_version); } + #[test] + fn knowledge_synthesis_trigger_requires_newer_version() { + assert!(!should_regenerate_knowledge_synthesis_state( + 3, 3, 10, 60, 500 + )); + assert!(!should_regenerate_knowledge_synthesis_state( + 2, 3, 10, 60, 500 + )); + } + + #[test] + fn knowledge_synthesis_trigger_respects_debounce_window() { + let current_version = 4; + let last_version = 3; + let last_change = 1_000; + let debounce_secs = 60; + + assert!(!should_regenerate_knowledge_synthesis_state( + current_version, + last_version, + last_change, + debounce_secs, + 1_050 + )); + assert!(should_regenerate_knowledge_synthesis_state( + current_version, + last_version, + last_change, + debounce_secs, + 1_060 + )); + } + + #[test] + fn knowledge_synthesis_trigger_handles_clock_skew_safely() { + assert!(!should_regenerate_knowledge_synthesis_state( + 5, 4, 2_000, 30, 1_900 + )); + } + #[test] fn gathered_sections_fail_when_any_section_query_failed() { let gathered = GatheredSections { diff --git a/src/api/channels.rs b/src/api/channels.rs index 8d0645d8f..e7f1626ff 100644 --- a/src/api/channels.rs +++ b/src/api/channels.rs @@ -527,7 +527,10 @@ pub(super) async fn inspect_prompt( let skills = rc.skills.load(); let skills_prompt = skills .render_channel_prompt(&prompt_engine) - .unwrap_or_default(); + .map_err(|error| { + tracing::warn!(%error, "failed to render skills prompt for inspect"); + StatusCode::INTERNAL_SERVER_ERROR + })?; let browser_enabled = rc.browser_config.load().enabled; let web_search_enabled = rc.brave_search_key.load().is_some(); @@ -540,7 +543,10 @@ pub(super) async fn inspect_prompt( opencode_enabled, &mcp_tool_names, ) - .unwrap_or_default(); + .map_err(|error| { + tracing::warn!(%error, "failed to render worker capabilities for inspect"); + StatusCode::INTERNAL_SERVER_ERROR + })?; let system_info = crate::agent::status::SystemInfo::from_runtime_config( rc.as_ref(), @@ -563,16 +569,25 @@ pub(super) async fn inspect_prompt( .or_else(|| meta.get("slack_workspace_id")) }) .and_then(|v| v.as_str()); - prompt_engine - .render_conversation_context( - &info.platform, - server_name, - info.display_name.as_deref(), - Some(&info.id), - ) - .ok() + Some( + prompt_engine + .render_conversation_context( + &info.platform, + server_name, + info.display_name.as_deref(), + Some(&info.id), + ) + .map_err(|error| { + tracing::warn!(%error, "failed to render conversation context for inspect"); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ) + } + Ok(None) => None, + Err(error) => { + tracing::warn!(%error, "failed to fetch channel metadata for inspect"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); } - _ => None, }; let sandbox_enabled = channel_state.deps.sandbox.containment_active(); @@ -588,7 +603,10 @@ pub(super) async fn inspect_prompt( wm_timezone, ) .await - .unwrap_or_default(); + .map_err(|error| { + tracing::warn!(%error, "failed to render working memory for prompt inspection"); + StatusCode::INTERNAL_SERVER_ERROR + })?; let channel_activity_map = crate::memory::working::render_channel_activity_map( &channel_state.deps.sqlite_pool, @@ -598,7 +616,10 @@ pub(super) async fn inspect_prompt( wm_timezone, ) .await - .unwrap_or_default(); + .map_err(|error| { + tracing::warn!(%error, "failed to render channel activity map for prompt inspection"); + StatusCode::INTERNAL_SERVER_ERROR + })?; let participant_config = **rc.participant_context.load(); let tracked_participants = { @@ -612,14 +633,14 @@ pub(super) async fn inspect_prompt( &participant_config, ) .await - .unwrap_or_else(|error| { + .map_err(|error| { tracing::warn!( %error, channel_id = %query.channel_id, "failed to render participant context for prompt inspection" ); - String::new() - }); + StatusCode::INTERNAL_SERVER_ERROR + })?; // ── Available channels ── let available_channels = { @@ -627,7 +648,10 @@ pub(super) async fn inspect_prompt( .channel_store .list_active() .await - .unwrap_or_default(); + .map_err(|error| { + tracing::warn!(%error, "failed to list channels for prompt inspection"); + StatusCode::INTERNAL_SERVER_ERROR + })?; let entries: Vec = channels .into_iter() .filter(|channel| { @@ -644,7 +668,10 @@ pub(super) async fn inspect_prompt( if entries.is_empty() { None } else { - prompt_engine.render_available_channels(entries).ok() + Some(prompt_engine.render_available_channels(entries).map_err(|error| { + tracing::warn!(%error, "failed to render available channels for prompt inspection"); + StatusCode::INTERNAL_SERVER_ERROR + })?) } }; @@ -710,13 +737,18 @@ pub(super) async fn inspect_prompt( if superiors.is_empty() && subordinates.is_empty() && peers.is_empty() { None } else { - prompt_engine - .render_org_context(crate::prompts::engine::OrgContext { - superiors, - subordinates, - peers, - }) - .ok() + Some( + prompt_engine + .render_org_context(crate::prompts::engine::OrgContext { + superiors, + subordinates, + peers, + }) + .map_err(|error| { + tracing::warn!(%error, "failed to render org context for inspect"); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ) } } }; @@ -732,17 +764,35 @@ pub(super) async fn inspect_prompt( let projects = store .list_projects(Some(crate::projects::ProjectStatus::Active)) .await - .unwrap_or_default(); + .map_err(|error| { + tracing::warn!(%error, "failed to list active projects for prompt inspection"); + StatusCode::INTERNAL_SERVER_ERROR + })?; if projects.is_empty() { None } else { let mut contexts = Vec::with_capacity(projects.len()); for project in &projects { - let repos = store.list_repos(&project.id).await.unwrap_or_default(); - let worktrees = store - .list_worktrees_with_repos(&project.id) - .await - .unwrap_or_default(); + let repos = store.list_repos(&project.id).await.map_err(|error| { + tracing::warn!( + %error, + project_id = %project.id, + "failed to list project repos for prompt inspection" + ); + StatusCode::INTERNAL_SERVER_ERROR + })?; + let worktrees = + store + .list_worktrees_with_repos(&project.id) + .await + .map_err(|error| { + tracing::warn!( + %error, + project_id = %project.id, + "failed to list project worktrees for prompt inspection" + ); + StatusCode::INTERNAL_SERVER_ERROR + })?; contexts.push(ProjectContext { name: project.name.clone(), root_path: project.root_path.clone(), @@ -776,7 +826,10 @@ pub(super) async fn inspect_prompt( .collect(), }); } - prompt_engine.render_projects_context(contexts).ok() + Some(prompt_engine.render_projects_context(contexts).map_err(|error| { + tracing::warn!(%error, "failed to render projects context for prompt inspection"); + StatusCode::INTERNAL_SERVER_ERROR + })?) } }; @@ -803,7 +856,10 @@ pub(super) async fn inspect_prompt( empty_to_none(participant_context), false, // direct_mode — resolved at runtime by the channel, not available here ) - .unwrap_or_default(); + .map_err(|error| { + tracing::warn!(%error, "failed to render full channel prompt for inspect"); + StatusCode::INTERNAL_SERVER_ERROR + })?; let total_chars = system_prompt.chars().count(); diff --git a/src/cron/scheduler.rs b/src/cron/scheduler.rs index cd4dfb12d..dced1a1ac 100644 --- a/src/cron/scheduler.rs +++ b/src/cron/scheduler.rs @@ -130,6 +130,7 @@ pub struct CronContext { } const MAX_CONSECUTIVE_FAILURES: u32 = 3; +const WORKING_MEMORY_CRON_ERROR_MAX_CHARS: usize = 500; /// RAII guard that clears an `AtomicBool` on drop, ensuring the flag is /// released even if the holding task panics. @@ -153,7 +154,12 @@ fn emit_cron_error( failure_class: &'static str, error: &crate::error::Error, ) { - let message = format!("Cron {failure_class}: {job_id}: {error}"); + let secrets_guard = context.deps.runtime_config.secrets.load(); + let tool_secret_pairs = match (*secrets_guard).as_ref() { + Some(store) => store.tool_secret_pairs(), + None => Vec::new(), + }; + let message = cron_error_memory_message(job_id, failure_class, error, &tool_secret_pairs); // Emit to working memory for agent context awareness context @@ -167,6 +173,19 @@ fn emit_cron_error( tracing::error!(cron_id = %job_id, failure_class, %error, "cron job execution failed"); } +fn cron_error_memory_message( + job_id: &str, + failure_class: &'static str, + error: &crate::error::Error, + tool_secret_pairs: &[(String, String)], +) -> String { + crate::secrets::scrub::scrub_working_memory_text( + &format!("Cron {failure_class}: {job_id}: {error}"), + tool_secret_pairs, + WORKING_MEMORY_CRON_ERROR_MAX_CHARS, + ) +} + #[derive(Debug)] enum CronRunError { Execution(crate::error::Error), @@ -1730,9 +1749,11 @@ fn normalize_cron_delivery_response(response: OutboundResponse) -> Option= token_budget { + return Ok(String::new()); + } + let now = Utc::now(); let mut output = String::with_capacity(512); - writeln!(output, "## Other Channels\n").ok(); + output.push_str(header); + let mut tokens_used = header_tokens; + let mut rendered_channels = 0usize; for row in &rows { let channel_id: String = row.get("id"); @@ -767,7 +776,18 @@ pub async fn render_channel_activity_map( }; write!(line, ": {truncated}").ok(); } - writeln!(output, "{line}").ok(); + let candidate = format!("{line}\n"); + let candidate_tokens = estimate_tokens(&candidate); + if tokens_used + candidate_tokens > token_budget { + break; + } + write!(output, "{candidate}").ok(); + tokens_used += candidate_tokens; + rendered_channels += 1; + } + + if rendered_channels == 0 { + return Ok(String::new()); } Ok(output) @@ -1559,6 +1579,86 @@ mod tests { assert!(!rendered.contains("No activity yet today")); } + #[tokio::test] + async fn test_record_fire_and_forget_event_accumulates() { + let store = setup_test_store().await; + let today = store.today(); + + store + .emit( + WorkingMemoryEventType::WorkerCompleted, + "Worker completed: fire-and-forget write", + ) + .channel("chan-1") + .record(); + + tokio::time::timeout(std::time::Duration::from_secs(2), async { + loop { + let events = store.get_events_for_day(&today).await.unwrap(); + if events.iter().any(|event| { + event.summary == "Worker completed: fire-and-forget write" + && event.channel_id.as_deref() == Some("chan-1") + }) { + break; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + }) + .await + .expect("fire-and-forget event should persist"); + } + + #[tokio::test] + async fn test_render_working_memory_prefixes_other_channel_events() { + let store = setup_test_store().await; + let config = test_config(); + let today = store.today(); + + let other_channel_event = WorkingMemoryEvent { + id: Uuid::new_v4().to_string(), + event_type: WorkingMemoryEventType::WorkerCompleted, + timestamp: Utc::now(), + channel_id: Some("chan-2".to_string()), + user_id: None, + summary: "recompiled dependency cache".to_string(), + detail: None, + importance: 0.6, + day: today, + }; + insert_event(&store.pool, &other_channel_event) + .await + .unwrap(); + + let rendered = render_working_memory(&store, "chan-1", &config, Tz::UTC) + .await + .unwrap(); + + assert!( + rendered.contains("[chan-2] Worker completed: recompiled dependency cache"), + "other-channel events should be prefixed for cross-channel awareness" + ); + } + + #[tokio::test] + async fn test_day_for_timestamp_respects_timezone_rollover() { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let tokyo_timezone: Tz = "Asia/Tokyo".parse().unwrap(); + let tokyo_store = WorkingMemoryStore::new(pool.clone(), tokyo_timezone); + let late_utc = DateTime::parse_from_rfc3339("2026-03-18T23:30:00Z") + .unwrap() + .with_timezone(&Utc); + assert_eq!(tokyo_store.day_for_timestamp(late_utc), "2026-03-19"); + + let la_timezone: Tz = "America/Los_Angeles".parse().unwrap(); + let la_store = WorkingMemoryStore::new(pool, la_timezone); + let early_utc = DateTime::parse_from_rfc3339("2026-03-18T02:30:00Z") + .unwrap() + .with_timezone(&Utc); + assert_eq!(la_store.day_for_timestamp(early_utc), "2026-03-17"); + } + #[tokio::test] async fn test_render_working_memory_with_synthesis_and_tail() { let store = setup_test_store().await; @@ -1697,6 +1797,171 @@ mod tests { assert!(rendered.is_empty(), "should be empty with no channels"); } + #[tokio::test] + async fn test_render_channel_activity_map_formats_and_respects_max_channels() { + let store = setup_test_store().await; + let mut config = test_config(); + config.channel_map_max_channels = 2; + config.channel_map_token_budget = 2_000; + config.channel_map_inactive_hours = 72; + + for (id, name) in [ + ("chan-1", "Current Channel"), + ("chan-2", "Channel Two"), + ("chan-3", "Channel Three"), + ("chan-4", "Channel Four"), + ] { + sqlx::query( + "INSERT INTO channels (id, platform, display_name, is_active, last_activity_at) \ + VALUES (?, 'discord', ?, 1, ?)", + ) + .bind(id) + .bind(name) + .bind(Utc::now()) + .execute(&store.pool) + .await + .unwrap(); + } + + for (id, channel, sender, minutes_ago) in [ + ("msg-1", "chan-2", "Alice", 1_i64), + ("msg-2", "chan-3", "Bob", 2_i64), + ("msg-3", "chan-4", "Carol", 3_i64), + ] { + sqlx::query( + "INSERT INTO conversation_messages \ + (id, channel_id, role, sender_name, sender_id, content, created_at) \ + VALUES (?, ?, 'user', ?, 'sender', 'hello', ?)", + ) + .bind(id) + .bind(channel) + .bind(sender) + .bind(Utc::now() - chrono::Duration::minutes(minutes_ago)) + .execute(&store.pool) + .await + .unwrap(); + } + + let branch_topic = WorkingMemoryEvent { + id: Uuid::new_v4().to_string(), + event_type: WorkingMemoryEventType::BranchCompleted, + timestamp: Utc::now(), + channel_id: Some("chan-2".to_string()), + user_id: None, + summary: "Branch outcome: stabilized memory trigger ordering".to_string(), + detail: None, + importance: 0.8, + day: store.today(), + }; + insert_event(&store.pool, &branch_topic).await.unwrap(); + + let rendered = render_channel_activity_map(&store.pool, &store, "chan-1", &config, Tz::UTC) + .await + .unwrap(); + + assert!(rendered.starts_with("## Other Channels\n")); + let channel_lines: Vec<&str> = rendered + .lines() + .filter(|line| line.contains(" -- ")) + .collect(); + assert_eq!( + channel_lines.len(), + 2, + "should include only max_channels entries" + ); + assert!(rendered.contains("Channel Two --")); + assert!(rendered.contains("Channel Three --")); + assert!(!rendered.contains("Channel Four --")); + assert!( + rendered.contains("Branch outcome: stabilized memory trigger ordering"), + "topic hints should be surfaced for recent branch outcomes" + ); + } + + #[tokio::test] + async fn test_render_channel_activity_map_respects_token_budget() { + let store = setup_test_store().await; + let mut config = test_config(); + config.channel_map_max_channels = 3; + config.channel_map_token_budget = 2_000; + config.channel_map_inactive_hours = 72; + + for (id, name, sender) in [ + ("chan-1", "Current Channel", "Current Sender"), + ( + "chan-2", + "A Very Long Channel Name For Budget Tests", + "A Very Long Sender Name", + ), + ( + "chan-3", + "Another Very Long Channel Name For Budget Tests", + "Another Very Long Sender Name", + ), + ] { + sqlx::query( + "INSERT INTO channels (id, platform, display_name, is_active, last_activity_at) \ + VALUES (?, 'discord', ?, 1, ?)", + ) + .bind(id) + .bind(name) + .bind(Utc::now()) + .execute(&store.pool) + .await + .unwrap(); + + sqlx::query( + "INSERT INTO conversation_messages \ + (id, channel_id, role, sender_name, sender_id, content, created_at) \ + VALUES (?, ?, 'user', ?, 'sender', 'hello', ?)", + ) + .bind(format!("msg-{id}")) + .bind(id) + .bind(sender) + .bind(Utc::now()) + .execute(&store.pool) + .await + .unwrap(); + } + + let full = render_channel_activity_map(&store.pool, &store, "chan-1", &config, Tz::UTC) + .await + .unwrap(); + let mut full_lines = full.lines().filter(|line| line.contains(" -- ")); + let first_line = full_lines + .next() + .expect("should render at least one channel line"); + let second_line = full_lines + .next() + .expect("should render at least two channel lines"); + + let one_line_budget = estimate_tokens(&format!("## Other Channels\n\n{first_line}\n")); + let two_line_tokens = estimate_tokens(&format!( + "## Other Channels\n\n{first_line}\n{second_line}\n" + )); + assert!(two_line_tokens > one_line_budget); + + config.channel_map_token_budget = one_line_budget; + let constrained = + render_channel_activity_map(&store.pool, &store, "chan-1", &config, Tz::UTC) + .await + .unwrap(); + + let constrained_lines: Vec<&str> = constrained + .lines() + .filter(|line| line.contains(" -- ")) + .collect(); + assert_eq!( + constrained_lines.len(), + 1, + "token budget should cap rendered channel lines" + ); + assert!( + estimate_tokens(&constrained) <= one_line_budget, + "rendered map should stay within token budget" + ); + } + #[tokio::test] async fn test_render_participant_context_uses_humans_and_recent_activity() { let store = setup_test_store().await; diff --git a/src/secrets/scrub.rs b/src/secrets/scrub.rs index 7eb2dce3a..abe88848e 100644 --- a/src/secrets/scrub.rs +++ b/src/secrets/scrub.rs @@ -254,6 +254,47 @@ pub fn scrub_leaks(content: &str) -> String { result } +/// Scrub and bound text before storing it in working memory. +/// +/// Working memory is replayed into future LLM context, so it must not persist +/// exact tool secrets, known plaintext leak patterns, encoded leak patterns, +/// or unbounded payloads. +pub fn scrub_working_memory_text( + text: &str, + tool_secrets: &[(String, String)], + max_chars: usize, +) -> String { + let scrubbed = scrub_secrets(text, tool_secrets); + let scrubbed = scrub_leaks(&scrubbed); + let scrubbed = redact_working_memory_encoded_leaks(&scrubbed); + truncate_for_working_memory(&scrubbed, max_chars) +} + +fn redact_working_memory_encoded_leaks(text: &str) -> String { + if scan_for_leaks(text).is_some() { + return "[WORKING_MEMORY_REDACTED:encoded-secret]".to_string(); + } + text.to_string() +} + +fn truncate_for_working_memory(text: &str, max_chars: usize) -> String { + const TRUNCATED_SUFFIX: &str = " ... [truncated]"; + + if text.chars().count() <= max_chars { + return text.to_string(); + } + + let suffix_len = TRUNCATED_SUFFIX.chars().count(); + if max_chars <= suffix_len { + return TRUNCATED_SUFFIX.chars().take(max_chars).collect(); + } + + let kept_chars = max_chars - suffix_len; + let mut result: String = text.chars().take(kept_chars).collect(); + result.push_str(TRUNCATED_SUFFIX); + result +} + #[cfg(test)] mod tests { use super::*; @@ -380,4 +421,83 @@ mod tests { "surrounding text should be preserved in: {result}" ); } + + #[test] + fn scrub_working_memory_text_redacts_exact_and_pattern_secrets() { + let tool_secrets = vec![("API_KEY".to_string(), "stored-secret".to_string())]; + let input = "stored-secret and sk-ant-abc123456789012345678"; + let result = scrub_working_memory_text(input, &tool_secrets, 200); + + assert!( + !result.contains("stored-secret"), + "stored secret should be redacted in: {result}" + ); + assert!( + !result.contains("sk-ant-"), + "leak pattern should be redacted in: {result}" + ); + assert!( + result.contains("[REDACTED:API_KEY]"), + "exact redaction marker missing in: {result}" + ); + assert!( + result.contains("[LEAKED_SECRET_REDACTED]"), + "leak redaction marker missing in: {result}" + ); + } + + #[test] + fn scrub_working_memory_text_truncates_on_character_boundaries() { + let input = "é".repeat(100); + let result = scrub_working_memory_text(&input, &[], 20); + + assert_eq!(result.chars().count(), 20); + assert!(result.ends_with(" ... [truncated]")); + assert!(std::str::from_utf8(result.as_bytes()).is_ok()); + } + + #[test] + fn scrub_working_memory_text_fails_closed_for_url_encoded_secret() { + let input = "worker task has sk%2Dant%2Dabc123456789012345678 and context"; + let result = scrub_working_memory_text(input, &[], 200); + + assert_eq!(result, "[WORKING_MEMORY_REDACTED:encoded-secret]"); + assert!( + !result.contains("sk%2Dant"), + "encoded secret should not appear in: {result}" + ); + assert!(scan_for_leaks(&result).is_none()); + } + + #[test] + fn scrub_working_memory_text_fails_closed_for_base64_encoded_secret() { + use base64::Engine as _; + + let secret = "sk-ant-abc123456789012345678"; + let encoded = base64::engine::general_purpose::STANDARD.encode(secret); + let input = format!("cron error included {encoded}"); + let result = scrub_working_memory_text(&input, &[], 200); + + assert_eq!(result, "[WORKING_MEMORY_REDACTED:encoded-secret]"); + assert!( + !result.contains(&encoded), + "encoded secret should not appear in: {result}" + ); + assert!(scan_for_leaks(&result).is_none()); + } + + #[test] + fn scrub_working_memory_text_fails_closed_for_hex_encoded_secret() { + let secret = "sk-ant-abc123456789012345678"; + let encoded = hex::encode(secret); + let input = format!("cron error included {encoded}"); + let result = scrub_working_memory_text(&input, &[], 200); + + assert_eq!(result, "[WORKING_MEMORY_REDACTED:encoded-secret]"); + assert!( + !result.contains(&encoded), + "encoded secret should not appear in: {result}" + ); + assert!(scan_for_leaks(&result).is_none()); + } } diff --git a/src/tasks/store.rs b/src/tasks/store.rs index 6378da948..15c4380e7 100644 --- a/src/tasks/store.rs +++ b/src/tasks/store.rs @@ -103,7 +103,7 @@ impl std::fmt::Display for TaskPriority { } } -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, utoipa::ToSchema)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, JsonSchema, utoipa::ToSchema)] pub struct TaskSubtask { pub title: String, pub completed: bool, @@ -163,6 +163,7 @@ pub struct UpdateTaskInput { #[derive(Debug, Clone)] pub struct TaskUpdateResult { + pub previous_task: Task, pub previous_status: TaskStatus, pub task: Task, } @@ -403,6 +404,7 @@ impl TaskStore { }; let current = task_from_row(row)?; + let previous_task = current.clone(); let previous_status = current.status; let task = Self::update_current_in_tx(&mut tx, task_number, current, input).await?; @@ -411,6 +413,7 @@ impl TaskStore { .context("failed to commit task update transaction")?; Ok(Some(TaskUpdateResult { + previous_task, previous_status, task, })) @@ -458,6 +461,7 @@ impl TaskStore { }; let current = task_from_row(row)?; + let previous_task = current.clone(); let previous_status = current.status; let task = Self::update_current_in_tx(&mut tx, task_number, current, input).await?; @@ -467,6 +471,7 @@ impl TaskStore { Ok(WorkerTaskUpdateResult::Updated(Box::new( TaskUpdateResult { + previous_task, previous_status, task, }, @@ -1086,6 +1091,33 @@ mod tests { ); } + #[tokio::test] + async fn update_result_returns_applied_snapshot_pair() { + let store = setup_store().await; + let created = store + .create(self_assigned_input("old title", TaskStatus::Backlog)) + .await + .expect("task should be created"); + + let result = store + .update_with_status_transition( + created.task_number, + UpdateTaskInput { + title: Some("new title".to_string()), + priority: Some(TaskPriority::High), + ..Default::default() + }, + ) + .await + .expect("update should succeed") + .expect("task should exist"); + + assert_eq!(result.previous_task.title, "old title"); + assert_eq!(result.previous_task.priority, TaskPriority::Medium); + assert_eq!(result.task.title, "new title"); + assert_eq!(result.task.priority, TaskPriority::High); + } + #[tokio::test] async fn global_task_numbers_are_unique_across_agents() { let store = setup_store().await; diff --git a/src/tools/spawn_worker.rs b/src/tools/spawn_worker.rs index 6e1d14978..7b5cd9fcf 100644 --- a/src/tools/spawn_worker.rs +++ b/src/tools/spawn_worker.rs @@ -44,6 +44,16 @@ fn summarize_duplicate_task(task: &str) -> String { } } +const WORKING_MEMORY_TASK_MAX_CHARS: usize = 500; + +fn sanitize_worker_memory_task(task: &str, tool_secret_pairs: &[(String, String)]) -> String { + crate::secrets::scrub::scrub_working_memory_text( + task, + tool_secret_pairs, + WORKING_MEMORY_TASK_MAX_CHARS, + ) +} + /// Error type for spawn worker tool. #[derive(Debug, thiserror::Error)] #[error("Worker spawn failed: {0}")] @@ -443,9 +453,9 @@ impl Tool for DetachedSpawnWorkerTool { let sandbox_write_allowlist = self.deps.sandbox.prompt_write_allowlist(); let secrets_guard = rc.secrets.load(); - let tool_secret_names = match (*secrets_guard).as_ref() { - Some(store) => store.tool_secret_names(), - None => Vec::new(), + let (tool_secret_names, tool_secret_pairs) = match (*secrets_guard).as_ref() { + Some(store) => (store.tool_secret_names(), store.tool_secret_pairs()), + None => (Vec::new(), Vec::new()), }; let browser_config = (**rc.browser_config.load()).clone(); @@ -512,11 +522,12 @@ impl Tool for DetachedSpawnWorkerTool { directory: None, }); + let memory_task = sanitize_worker_memory_task(&args.task, &tool_secret_pairs); self.deps .working_memory .emit( crate::memory::WorkingMemoryEventType::WorkerSpawned, - format!("Worker spawned (cortex): {}", &args.task), + format!("Worker spawned (cortex): {memory_task}"), ) .importance(0.5) .record(); diff --git a/src/tools/task_update.rs b/src/tools/task_update.rs index f76daa4da..219a423b6 100644 --- a/src/tools/task_update.rs +++ b/src/tools/task_update.rs @@ -1,7 +1,7 @@ //! Task update tool for branch and worker processes. use crate::tasks::{ - TaskPriority, TaskStatus, TaskStore, TaskSubtask, UpdateTaskInput, WorkerTaskUpdateResult, + Task, TaskPriority, TaskStatus, TaskStore, TaskSubtask, UpdateTaskInput, WorkerTaskUpdateResult, }; use crate::{AgentId, WorkerId}; use rig::completion::ToolDefinition; @@ -235,6 +235,7 @@ impl Tool for TaskUpdateTool { } }, }; + let previous_task = update_result.previous_task; let previous_status = update_result.previous_status; let updated = update_result.task; @@ -250,10 +251,7 @@ impl Tool for TaskUpdateTool { } else { ( crate::memory::WorkingMemoryEventType::TaskUpdate, - format!( - "Task #{} updated to {}", - updated.task_number, updated.status - ), + task_update_memory_summary(&previous_task, &updated), 0.4, ) }; @@ -272,6 +270,62 @@ impl Tool for TaskUpdateTool { } } +fn task_update_memory_summary(previous: &Task, updated: &Task) -> String { + let mut changes = Vec::new(); + + if previous.status != updated.status { + changes.push(format!("status {} -> {}", previous.status, updated.status)); + } + if previous.priority != updated.priority { + changes.push(format!( + "priority {} -> {}", + previous.priority, updated.priority + )); + } + if previous.title != updated.title { + changes.push("title".to_string()); + } + if previous.description != updated.description { + changes.push("description".to_string()); + } + if previous.subtasks != updated.subtasks { + changes.push("subtasks".to_string()); + } + if previous.metadata != updated.metadata { + changes.push("metadata".to_string()); + } + if previous.worker_id != updated.worker_id { + changes.push(match (&previous.worker_id, &updated.worker_id) { + (None, Some(_)) => "worker assigned".to_string(), + (Some(_), None) => "worker unassigned".to_string(), + _ => "worker binding".to_string(), + }); + } + if previous.approved_by != updated.approved_by { + changes.push("approval".to_string()); + } + if previous.assigned_agent_id != updated.assigned_agent_id { + changes.push("assignment".to_string()); + } + + if changes.is_empty() { + return format!("Task #{} updated", updated.task_number); + } + + if changes.len() == 1 && previous.status != updated.status { + return format!( + "Task #{} updated to {}", + updated.task_number, updated.status + ); + } + + format!( + "Task #{} updated: {}", + updated.task_number, + changes.join(", ") + ) +} + #[cfg(test)] mod tests { use super::*; @@ -279,6 +333,7 @@ mod tests { use crate::memory::working::WorkingMemoryEvent; use crate::memory::{WorkingMemoryEventType, WorkingMemoryStore}; use crate::tasks::store::setup_test_store; + use crate::tasks::{Task, TaskPriority, TaskStatus, TaskSubtask}; use chrono_tz::Tz; use sqlx::sqlite::SqlitePoolOptions; use std::time::Duration; @@ -313,6 +368,72 @@ mod tests { .expect("timed out waiting for working memory event") } + fn task_fixture() -> Task { + Task { + id: "task-id".to_string(), + task_number: 7, + title: "Original title".to_string(), + description: Some("Original description".to_string()), + status: TaskStatus::Backlog, + priority: TaskPriority::Medium, + owner_agent_id: "agent".to_string(), + assigned_agent_id: "agent".to_string(), + subtasks: Vec::new(), + metadata: serde_json::json!({}), + source_memory_id: None, + worker_id: None, + created_by: "branch".to_string(), + approved_at: None, + approved_by: None, + created_at: "2026-04-19T00:00:00Z".to_string(), + updated_at: "2026-04-19T00:00:00Z".to_string(), + completed_at: None, + } + } + + #[test] + fn task_update_memory_summary_preserves_status_update_wording() { + let previous = task_fixture(); + let mut updated = previous.clone(); + updated.status = TaskStatus::Ready; + + assert_eq!( + task_update_memory_summary(&previous, &updated), + "Task #7 updated to ready" + ); + } + + #[test] + fn task_update_memory_summary_names_non_status_changes() { + let previous = task_fixture(); + let mut updated = previous.clone(); + updated.title = "New title".to_string(); + updated.description = Some("New description".to_string()); + updated.priority = TaskPriority::High; + updated.subtasks = vec![TaskSubtask { + title: "Check output".to_string(), + completed: true, + }]; + updated.metadata = serde_json::json!({"source": "review"}); + updated.worker_id = Some("worker-1".to_string()); + updated.approved_by = Some("victor".to_string()); + + assert_eq!( + task_update_memory_summary(&previous, &updated), + "Task #7 updated: priority medium -> high, title, description, subtasks, metadata, worker assigned, approval" + ); + } + + #[test] + fn task_update_memory_summary_handles_no_actual_delta() { + let previous = task_fixture(); + + assert_eq!( + task_update_memory_summary(&previous, &previous), + "Task #7 updated" + ); + } + #[tokio::test] async fn task_update_emits_outcome_for_done_status() { let task_store = Arc::new(setup_test_store().await); @@ -409,7 +530,7 @@ mod tests { assert_eq!(event.event_type, WorkingMemoryEventType::TaskUpdate); assert_eq!( event.summary, - format!("Task #{} updated to done", created.task_number) + format!("Task #{} updated: title", created.task_number) ); } From eda6807a97b7ad1a6dae43d940483d815dc59ee5 Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Thu, 23 Apr 2026 16:16:10 -0400 Subject: [PATCH 2/2] fix(review): close #579 unresolved comment set --- src/agent/channel.rs | 57 ++++++++++----------- src/agent/channel_dispatch.rs | 84 ++++++++++++++++++++----------- src/agent/compactor.rs | 50 ++++++++++++------- src/agent/cortex.rs | 7 +++ src/api/channels.rs | 23 +++++++-- src/secrets/scrub.rs | 93 +++++++++++++++++++++++++++++++++-- src/tasks/store.rs | 2 + src/tools/spawn_worker.rs | 35 ++++++------- 8 files changed, 249 insertions(+), 102 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 440edd96f..0d65b7618 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -3731,38 +3731,39 @@ impl Channel { let wm_config = **self.deps.runtime_config.working_memory.load(); let elapsed = self.last_persistence_at.elapsed(); - let event_count_since_last = if memory_persistence_trigger_kind( + let elapsed_secs = elapsed.as_secs(); + let trigger = match memory_persistence_trigger_kind( self.message_count, - elapsed.as_secs(), + elapsed_secs, None, &wm_config, - ) - .is_none() - { - let since = chrono::Utc::now() - chrono::Duration::seconds(elapsed.as_secs() as i64); - match self - .deps - .working_memory - .count_events_since(self.id.as_ref(), since) - .await - { - Ok(count) => Some(count as usize), - Err(error) => { - tracing::debug!(%error, "event density check failed, skipping"); - None - } - } - } else { - None - }; + ) { + Some(kind) => kind, + None => { + let since = chrono::Utc::now() - chrono::Duration::seconds(elapsed_secs as i64); + let event_count_since_last = match self + .deps + .working_memory + .count_events_since(self.id.as_ref(), since) + .await + { + Ok(count) => Some(count as usize), + Err(error) => { + tracing::debug!(%error, "event density check failed, skipping"); + None + } + }; - let Some(trigger) = memory_persistence_trigger_kind( - self.message_count, - elapsed.as_secs(), - event_count_since_last, - &wm_config, - ) else { - return; + let Some(kind) = memory_persistence_trigger_kind( + self.message_count, + elapsed_secs, + event_count_since_last, + &wm_config, + ) else { + return; + }; + kind + } }; // Reset counters before spawning so subsequent messages don't pile up. diff --git a/src/agent/channel_dispatch.rs b/src/agent/channel_dispatch.rs index 5087457fd..b2761a0bd 100644 --- a/src/agent/channel_dispatch.rs +++ b/src/agent/channel_dispatch.rs @@ -40,8 +40,6 @@ enum WorkerCompletionKind { Failed, } -const WORKING_MEMORY_TASK_MAX_CHARS: usize = 500; - #[derive(Debug, Clone)] pub(crate) enum WorkerCompletionError { Cancelled { reason: String }, @@ -101,14 +99,6 @@ pub(crate) fn map_worker_completion_result( (result_text, notify, success) } -fn sanitize_worker_memory_task(task: &str, tool_secret_pairs: &[(String, String)]) -> String { - crate::secrets::scrub::scrub_working_memory_text( - task, - tool_secret_pairs, - WORKING_MEMORY_TASK_MAX_CHARS, - ) -} - /// Build the worker status text (time + system info) used in worker system prompts. /// /// Centralises the `SystemInfo` + `TemporalContext` assembly so every worker @@ -765,6 +755,8 @@ async fn spawn_worker_inner( }; let worker_id = worker.id; + let sanitized_task = + crate::secrets::scrub::scrub_worker_task_for_memory(task, &tool_secret_pairs); let worker_span = tracing::info_span!( "worker.run", @@ -786,7 +778,7 @@ async fn spawn_worker_inner( { let mut status = state.status_block.write().await; - status.add_worker(worker_id, task, false, interactive); + status.add_worker(worker_id, &sanitized_task, false, interactive); } state @@ -796,26 +788,30 @@ async fn spawn_worker_inner( agent_id: state.deps.agent_id.clone(), worker_id, channel_id: Some(state.channel_id.clone()), - task: task.to_string(), + task: sanitized_task.clone(), worker_type: "builtin".into(), interactive, directory: None, }) .ok(); - let memory_task = sanitize_worker_memory_task(task, &tool_secret_pairs); state .deps .working_memory .emit( crate::memory::WorkingMemoryEventType::WorkerSpawned, - format!("Worker spawned: {memory_task}"), + format!("Worker spawned: {sanitized_task}"), ) .channel(state.channel_id.to_string()) .importance(0.6) .record(); - tracing::info!(worker_id = %worker_id, task = %task, interactive, "worker spawned"); + tracing::info!( + worker_id = %worker_id, + task = %sanitized_task, + interactive, + "worker spawned" + ); Ok(worker_id) } @@ -936,6 +932,8 @@ async fn spawn_opencode_worker_inner( }; let worker_id = worker.id; + let sanitized_task = + crate::secrets::scrub::scrub_worker_task_for_memory(task, &oc_tool_secret_pairs); let worker_span = tracing::info_span!( "worker.run", @@ -990,7 +988,7 @@ async fn spawn_opencode_worker_inner( state.worker_handles.write().await.insert(worker_id, handle); - let opencode_task = format!("[opencode] {task}"); + let opencode_task = format!("[opencode] {sanitized_task}"); { let mut status = state.status_block.write().await; status.add_worker(worker_id, &opencode_task, false, interactive); @@ -1010,19 +1008,23 @@ async fn spawn_opencode_worker_inner( }) .ok(); - let memory_task = sanitize_worker_memory_task(task, &oc_tool_secret_pairs); state .deps .working_memory .emit( crate::memory::WorkingMemoryEventType::WorkerSpawned, - format!("Worker spawned (opencode): {memory_task}"), + format!("Worker spawned (opencode): {sanitized_task}"), ) .channel(state.channel_id.to_string()) .importance(0.6) .record(); - tracing::info!(worker_id = %worker_id, task = %task, interactive, "OpenCode worker spawned"); + tracing::info!( + worker_id = %worker_id, + task = %sanitized_task, + interactive, + "OpenCode worker spawned" + ); Ok(worker_id) } @@ -1247,7 +1249,15 @@ pub async fn resume_idle_worker_into_state( state.worker_handles.write().await.insert(worker_id, handle); - let opencode_task = format!("[opencode] {}", idle_worker.task); + let task_secret_pairs = match rc.secrets.load().as_ref() { + Some(store) => store.tool_secret_pairs(), + None => Vec::new(), + }; + let sanitized_task = crate::secrets::scrub::scrub_worker_task_for_memory( + &idle_worker.task, + &task_secret_pairs, + ); + let opencode_task = format!("[opencode] {sanitized_task}"); { let mut status = state.status_block.write().await; status.add_worker(worker_id, &opencode_task, false, true); @@ -1267,7 +1277,11 @@ pub async fn resume_idle_worker_into_state( }) .ok(); - tracing::info!(worker_id = %worker_id, task = %idle_worker.task, "OpenCode worker resumed"); + tracing::info!( + worker_id = %worker_id, + task = %sanitized_task, + "OpenCode worker resumed" + ); Ok(worker_id) } _ => { @@ -1366,9 +1380,17 @@ pub async fn resume_idle_worker_into_state( state.worker_handles.write().await.insert(worker_id, handle); + let task_secret_pairs = match rc.secrets.load().as_ref() { + Some(store) => store.tool_secret_pairs(), + None => Vec::new(), + }; + let sanitized_task = crate::secrets::scrub::scrub_worker_task_for_memory( + &idle_worker.task, + &task_secret_pairs, + ); { let mut status = state.status_block.write().await; - status.add_worker(worker_id, &idle_worker.task, false, true); + status.add_worker(worker_id, &sanitized_task, false, true); } state @@ -1378,14 +1400,18 @@ pub async fn resume_idle_worker_into_state( agent_id: state.deps.agent_id.clone(), worker_id, channel_id: Some(state.channel_id.clone()), - task: idle_worker.task.clone(), + task: sanitized_task.clone(), worker_type: "builtin".into(), interactive: true, directory: None, }) .ok(); - tracing::info!(worker_id = %worker_id, task = %idle_worker.task, "builtin worker resumed"); + tracing::info!( + worker_id = %worker_id, + task = %sanitized_task, + "builtin worker resumed" + ); Ok(worker_id) } } @@ -1411,10 +1437,8 @@ fn expand_tilde(path: &str) -> std::path::PathBuf { #[cfg(test)] mod tests { - use super::{ - WORKING_MEMORY_TASK_MAX_CHARS, WorkerCompletionError, map_worker_completion_result, - sanitize_worker_memory_task, spawn_worker_task, - }; + use super::{WorkerCompletionError, map_worker_completion_result, spawn_worker_task}; + use crate::secrets::scrub::{WORKING_MEMORY_TASK_MAX_CHARS, scrub_worker_task_for_memory}; use crate::{ProcessEvent, WorkerId}; use std::sync::Arc; use std::time::Duration; @@ -1436,7 +1460,7 @@ mod tests { fn worker_spawned_memory_task_redacts_secrets() { let tool_secret_pairs = vec![("API_KEY".to_string(), "stored-secret".to_string())]; let task = "use stored-secret and sk-ant-abc123456789012345678"; - let result = sanitize_worker_memory_task(task, &tool_secret_pairs); + let result = scrub_worker_task_for_memory(task, &tool_secret_pairs); assert!( !result.contains("stored-secret"), @@ -1459,7 +1483,7 @@ mod tests { #[test] fn worker_spawned_memory_task_is_bounded() { let task = "a".repeat(WORKING_MEMORY_TASK_MAX_CHARS + 100); - let result = sanitize_worker_memory_task(&task, &[]); + let result = scrub_worker_task_for_memory(&task, &[]); assert_eq!(result.chars().count(), WORKING_MEMORY_TASK_MAX_CHARS); assert!(result.ends_with(" ... [truncated]")); diff --git a/src/agent/compactor.rs b/src/agent/compactor.rs index c33106eb3..94ddb2488 100644 --- a/src/agent/compactor.rs +++ b/src/agent/compactor.rs @@ -16,6 +16,32 @@ use std::sync::Arc; use tokio::sync::RwLock; use uuid::Uuid; +const COMPACTOR_SUMMARY_MAX_TURNS: usize = 1; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct CompactionPromptPolicy { + max_turns: usize, + tool_server_enabled: bool, +} + +const fn compaction_prompt_policy() -> CompactionPromptPolicy { + CompactionPromptPolicy { + max_turns: COMPACTOR_SUMMARY_MAX_TURNS, + tool_server_enabled: false, + } +} + +fn build_compaction_summary_agent( + model: SpacebotModel, + compactor_prompt: &str, +) -> rig::agent::Agent { + let policy = compaction_prompt_policy(); + AgentBuilder::new(model) + .preamble(compactor_prompt) + .default_max_turns(policy.max_turns) + .build() +} + /// Programmatic monitor that watches channel context size and triggers compaction. pub struct Compactor { pub channel_id: ChannelId, @@ -248,13 +274,9 @@ async fn run_compaction( .with_context(&*deps.agent_id, "compactor") .with_routing((**routing).clone()); - // Give the compaction worker memory_save so it can directly persist memories // No tool server — the compactor's sole job is producing a summary. - // Memory extraction is handled by persistence branches (Phase 5a). - let agent = AgentBuilder::new(model) - .preamble(compactor_prompt) - .default_max_turns(1) - .build(); + // Memory extraction is handled by persistence branches. + let agent = build_compaction_summary_agent(model, compactor_prompt); let hook = SpacebotHook::new( deps.agent_id.clone(), @@ -443,7 +465,7 @@ pub enum CompactionAction { #[cfg(test)] mod tests { - use super::extract_summary_section; + use super::{compaction_prompt_policy, extract_summary_section}; #[test] fn extract_summary_section_strips_markdown_header() { @@ -456,19 +478,13 @@ mod tests { #[test] fn run_compaction_path_remains_toolless_and_one_turn() { - let source = include_str!("compactor.rs"); - let run_compaction_start = source - .find("async fn run_compaction") - .expect("run_compaction should exist"); - let run_compaction_end = source.find("#[cfg(test)]").unwrap_or(source.len()); - let run_compaction_source = &source[run_compaction_start..run_compaction_end]; - + let policy = compaction_prompt_policy(); assert!( - !run_compaction_source.contains(".tool_server_handle("), + !policy.tool_server_enabled, "compactor summary path must stay toolless" ); - assert!( - run_compaction_source.contains(".default_max_turns(1)"), + assert_eq!( + policy.max_turns, 1, "compactor summary path must stay single-turn" ); } diff --git a/src/agent/cortex.rs b/src/agent/cortex.rs index 3a5313f13..2877295ae 100644 --- a/src/agent/cortex.rs +++ b/src/agent/cortex.rs @@ -5099,6 +5099,13 @@ mod tests { )); } + #[test] + fn knowledge_synthesis_trigger_allows_zero_debounce_at_change_instant() { + assert!(should_regenerate_knowledge_synthesis_state( + 5, 4, 2_000, 0, 2_000 + )); + } + #[test] fn gathered_sections_fail_when_any_section_query_failed() { let gathered = GatheredSections { diff --git a/src/api/channels.rs b/src/api/channels.rs index e7f1626ff..187c06033 100644 --- a/src/api/channels.rs +++ b/src/api/channels.rs @@ -765,7 +765,11 @@ pub(super) async fn inspect_prompt( .list_projects(Some(crate::projects::ProjectStatus::Active)) .await .map_err(|error| { - tracing::warn!(%error, "failed to list active projects for prompt inspection"); + tracing::warn!( + %error, + projects_count = 0usize, + "failed to list active projects for prompt inspection" + ); StatusCode::INTERNAL_SERVER_ERROR })?; if projects.is_empty() { @@ -826,10 +830,19 @@ pub(super) async fn inspect_prompt( .collect(), }); } - Some(prompt_engine.render_projects_context(contexts).map_err(|error| { - tracing::warn!(%error, "failed to render projects context for prompt inspection"); - StatusCode::INTERNAL_SERVER_ERROR - })?) + let projects_count = contexts.len(); + Some( + prompt_engine + .render_projects_context(contexts) + .map_err(|error| { + tracing::warn!( + %error, + projects_count, + "failed to render projects context for prompt inspection" + ); + StatusCode::INTERNAL_SERVER_ERROR + })?, + ) } }; diff --git a/src/secrets/scrub.rs b/src/secrets/scrub.rs index abe88848e..3de70c9df 100644 --- a/src/secrets/scrub.rs +++ b/src/secrets/scrub.rs @@ -47,6 +47,12 @@ static BASE64_SEGMENT: LazyLock = static HEX_SEGMENT: LazyLock = LazyLock::new(|| Regex::new(r"(?i)(?:0x)?([0-9a-f]{40,})").expect("hardcoded regex")); +/// Default character bound for worker task text written to working memory. +pub const WORKING_MEMORY_TASK_MAX_CHARS: usize = 500; + +/// Extra chars to include in the pre-scrub scan window beyond the persisted bound. +const WORKING_MEMORY_SCAN_MARGIN_CHARS: usize = 256; + /// Check content against known API key patterns (plaintext only). pub fn match_leak_patterns(content: &str) -> Option { for pattern in LEAK_PATTERNS.iter() { @@ -264,23 +270,70 @@ pub fn scrub_working_memory_text( tool_secrets: &[(String, String)], max_chars: usize, ) -> String { - let scrubbed = scrub_secrets(text, tool_secrets); + let bounded_input = limit_working_memory_scan_input(text, max_chars); + let scrubbed = scrub_secrets(bounded_input, tool_secrets); let scrubbed = scrub_leaks(&scrubbed); - let scrubbed = redact_working_memory_encoded_leaks(&scrubbed); + let scrubbed = redact_working_memory_encoded_leaks(&scrubbed, tool_secrets); truncate_for_working_memory(&scrubbed, max_chars) } -fn redact_working_memory_encoded_leaks(text: &str) -> String { - if scan_for_leaks(text).is_some() { +pub fn scrub_worker_task_for_memory(task: &str, tool_secrets: &[(String, String)]) -> String { + scrub_working_memory_text(task, tool_secrets, WORKING_MEMORY_TASK_MAX_CHARS) +} + +fn limit_working_memory_scan_input(text: &str, max_chars: usize) -> &str { + let scan_limit_chars = max_chars.saturating_add(WORKING_MEMORY_SCAN_MARGIN_CHARS); + if let Some((byte_index, _)) = text.char_indices().nth(scan_limit_chars) { + &text[..byte_index] + } else { + text + } +} + +fn redact_working_memory_encoded_leaks(text: &str, tool_secrets: &[(String, String)]) -> String { + if scan_for_leaks(text).is_some() || contains_encoded_tool_secret(text, tool_secrets) { return "[WORKING_MEMORY_REDACTED:encoded-secret]".to_string(); } text.to_string() } +fn contains_encoded_tool_secret(text: &str, tool_secrets: &[(String, String)]) -> bool { + use base64::Engine; + + let text_lower = text.to_ascii_lowercase(); + for (_, secret) in tool_secrets { + if secret.len() < 8 { + continue; + } + + let base64_standard = base64::engine::general_purpose::STANDARD.encode(secret); + if text.contains(&base64_standard) || text.contains(base64_standard.trim_end_matches('=')) { + return true; + } + + let base64_url_safe = base64::engine::general_purpose::URL_SAFE.encode(secret); + if text.contains(&base64_url_safe) || text.contains(base64_url_safe.trim_end_matches('=')) { + return true; + } + + let hex_lower = hex::encode(secret); + if text_lower.contains(&hex_lower) { + return true; + } + + let url_encoded = urlencoding::encode(secret); + if text.contains(url_encoded.as_ref()) { + return true; + } + } + + false +} + fn truncate_for_working_memory(text: &str, max_chars: usize) -> String { const TRUNCATED_SUFFIX: &str = " ... [truncated]"; - if text.chars().count() <= max_chars { + if text.chars().nth(max_chars).is_none() { return text.to_string(); } @@ -500,4 +553,34 @@ mod tests { ); assert!(scan_for_leaks(&result).is_none()); } + + #[test] + fn scrub_working_memory_text_fails_closed_for_encoded_tool_secret_variants() { + use base64::Engine as _; + + let tool_secrets = vec![ + ( + "STORE_SECRET".to_string(), + "my-tool-secret-12345".to_string(), + ), + ("URL_SECRET".to_string(), "url/secret+value".to_string()), + ]; + let base64_encoded = base64::engine::general_purpose::STANDARD.encode(&tool_secrets[0].1); + let hex_encoded = hex::encode(&tool_secrets[0].1); + let url_encoded = urlencoding::encode(&tool_secrets[1].1).to_string(); + + let base64_result = scrub_working_memory_text( + &format!("task payload: {base64_encoded}"), + &tool_secrets, + 200, + ); + let hex_result = + scrub_working_memory_text(&format!("task payload: {hex_encoded}"), &tool_secrets, 200); + let url_result = + scrub_working_memory_text(&format!("task payload: {url_encoded}"), &tool_secrets, 200); + + assert_eq!(base64_result, "[WORKING_MEMORY_REDACTED:encoded-secret]"); + assert_eq!(hex_result, "[WORKING_MEMORY_REDACTED:encoded-secret]"); + assert_eq!(url_result, "[WORKING_MEMORY_REDACTED:encoded-secret]"); + } } diff --git a/src/tasks/store.rs b/src/tasks/store.rs index 15c4380e7..75bc2f04d 100644 --- a/src/tasks/store.rs +++ b/src/tasks/store.rs @@ -1114,6 +1114,8 @@ mod tests { assert_eq!(result.previous_task.title, "old title"); assert_eq!(result.previous_task.priority, TaskPriority::Medium); + assert_eq!(result.previous_task.status, TaskStatus::Backlog); + assert_eq!(result.task.status, TaskStatus::Backlog); assert_eq!(result.task.title, "new title"); assert_eq!(result.task.priority, TaskPriority::High); } diff --git a/src/tools/spawn_worker.rs b/src/tools/spawn_worker.rs index 7b5cd9fcf..7bd8618f1 100644 --- a/src/tools/spawn_worker.rs +++ b/src/tools/spawn_worker.rs @@ -44,16 +44,6 @@ fn summarize_duplicate_task(task: &str) -> String { } } -const WORKING_MEMORY_TASK_MAX_CHARS: usize = 500; - -fn sanitize_worker_memory_task(task: &str, tool_secret_pairs: &[(String, String)]) -> String { - crate::secrets::scrub::scrub_working_memory_text( - task, - tool_secret_pairs, - WORKING_MEMORY_TASK_MAX_CHARS, - ) -} - /// Error type for spawn worker tool. #[derive(Debug, thiserror::Error)] #[error("Worker spawn failed: {0}")] @@ -291,17 +281,23 @@ impl Tool for SpawnWorkerTool { } let worker_type_label = if is_opencode { "OpenCode" } else { "builtin" }; + let tool_secret_pairs = match self.state.deps.runtime_config.secrets.load().as_ref() { + Some(store) => store.tool_secret_pairs(), + None => Vec::new(), + }; + let safe_task = + crate::secrets::scrub::scrub_worker_task_for_memory(&args.task, &tool_secret_pairs); // OpenCode workers are always interactive regardless of args.interactive. let effectively_interactive = args.interactive || is_opencode; let message = if effectively_interactive { format!( "Interactive {worker_type_label} worker {worker_id} spawned for: {}. Route follow-ups with route_to_worker.", - args.task + safe_task ) } else { format!( "{worker_type_label} worker {worker_id} spawned for: {}. It will report back when done.", - args.task + safe_task ) }; let readiness_note = if readiness.ready { @@ -512,17 +508,18 @@ impl Tool for DetachedSpawnWorkerTool { let worker_id = worker.id; // Emit WorkerStarted event so the UI can track it. + let memory_task = + crate::secrets::scrub::scrub_worker_task_for_memory(&args.task, &tool_secret_pairs); let _ = self.deps.event_tx.send(crate::ProcessEvent::WorkerStarted { agent_id: self.deps.agent_id.clone(), worker_id, channel_id: None, - task: args.task.clone(), + task: memory_task.clone(), worker_type: "cortex".into(), interactive: false, directory: None, }); - let memory_task = sanitize_worker_memory_task(&args.task, &tool_secret_pairs); self.deps .working_memory .emit( @@ -538,7 +535,7 @@ impl Tool for DetachedSpawnWorkerTool { run_logger.log_worker_started( None, worker_id, - &args.task, + &memory_task, "cortex", &self.deps.agent_id, false, @@ -578,7 +575,11 @@ impl Tool for DetachedSpawnWorkerTool { } } - tracing::info!(worker_id = %worker_id, task = %args.task, "cortex chat spawned detached worker"); + tracing::info!( + worker_id = %worker_id, + task = %memory_task, + "cortex chat spawned detached worker" + ); Ok(SpawnWorkerOutput { worker_id, @@ -586,7 +587,7 @@ impl Tool for DetachedSpawnWorkerTool { interactive: false, message: format!( "Worker {worker_id} spawned for: {}. It will report back when done.", - args.task + memory_task ), }) }