Rust Module Consolidation Phase 2: Optimization & Compute Migration #266
All commands now route through ServiceModule implementations.

Removed:
- Legacy Request enum (~430 lines) - never matched after modular routing
- handle_request() fallback function - dead code
- Dual-path dispatch logic - simplified to single route_command_sync() path

File: 1,384 → 875 lines

All commands now route through ServiceModule implementations.

Removed:
- Legacy Request enum (~430 lines) - never matched after modular routing
- handle_request() fallback function - dead code
- Dual-path dispatch logic - simplified to single route_command_sync() path
- Legacy Request enum deserialization tests - obsolete

File: 1,384 → 775 lines
Tests: 491 passed

AUTOMATIC METRICS:
- Every command is automatically timed in Runtime.route_command*()
- No code needed in modules - runtime wrapper handles timing
- Tracks queue_time_ms, execute_time_ms, total_time_ms
- Rolling window of last 1000 timings per command
- p50/p95/p99 latency percentiles

RUNTIME MODULE (new):
- runtime/list: Show all modules with configs and priorities
- runtime/metrics/all: Get stats for all modules
- runtime/metrics/module: Get stats for specific module
- runtime/metrics/slow: List recent slow commands (>50ms)

ARES PATTERN:
- AI can query runtime metrics to identify bottlenecks
- AI can see module priorities and command prefixes
- Foundation for AI-driven scheduling adjustment

13 modules registered, all with automatic timing

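A minimal sketch of how a rolling window with latency percentiles could work, for readers who want the mechanics. Only the 1000-sample window and the p50/p95/p99 percentiles come from the commit message; `RollingWindow`, its methods, and the nearest-rank percentile rule are assumptions for illustration, not the code in runtime.rs.

```rust
use std::collections::VecDeque;

/// Hypothetical rolling window over the most recent command timings (ms).
pub struct RollingWindow {
    capacity: usize,
    samples: VecDeque<u64>,
}

impl RollingWindow {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, samples: VecDeque::with_capacity(capacity) }
    }

    /// Record one timing, evicting the oldest sample once the window is full.
    pub fn record(&mut self, total_time_ms: u64) {
        if self.capacity == 0 {
            return;
        }
        if self.samples.len() == self.capacity {
            self.samples.pop_front();
        }
        self.samples.push_back(total_time_ms);
    }

    /// Nearest-rank percentile for p in 0..=100; None when the window is empty.
    pub fn percentile(&self, p: u32) -> Option<u64> {
        let n = self.samples.len();
        if n == 0 {
            return None;
        }
        let mut sorted: Vec<u64> = self.samples.iter().copied().collect();
        sorted.sort_unstable();
        // rank = ceil(p / 100 * n), computed in integers to avoid float rounding.
        let rank = (p as usize * n + 99) / 100;
        Some(sorted[rank.clamp(1, n) - 1])
    }
}
```

Sorting on every query is fine for a 1000-entry window that is read occasionally; a production tracker might cache the sorted view instead.
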
- Move sockets from /tmp/ to .continuum/sockets/ (proper project directory)
- Add SOCKETS config to shared/config.ts (single source of truth)
- Add getContinuumCoreSocketPath() helper to RustCoreIPC.ts
- Update 15+ TypeScript files to use centralized socket config
- Add EXPECTED_MODULES const in runtime.rs (13 modules)
- Add verify_registration() to Runtime - fails server startup if module missing
- Add runtime/metrics command for AI-driven system management

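The startup check described above could look roughly like the following. This is a sketch under assumptions: the three module names are placeholders (the commit states the real list has 13 entries), and the free-function signature and error format are hypothetical rather than the Runtime method's actual shape.

```rust
use std::collections::HashSet;

/// Illustrative subset only; the real constant is stated to list 13 modules.
pub const EXPECTED_MODULES: &[&str] = &["embedding", "data", "runtime"];

/// Returns Ok(()) when every expected module is registered, otherwise an
/// error naming the missing modules so the caller can abort startup.
pub fn verify_registration(registered: &[&str]) -> Result<(), String> {
    let have: HashSet<&str> = registered.iter().copied().collect();
    let missing: Vec<&str> = EXPECTED_MODULES
        .iter()
        .copied()
        .filter(|name| !have.contains(name))
        .collect();
    if missing.is_empty() {
        Ok(())
    } else {
        Err(format!("Missing modules: {}", missing.join(", ")))
    }
}
```
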
- Add ModuleBackedEmbeddingProvider that delegates to EmbeddingModule's MODEL_CACHE
- PersonaMemoryManager now shares the same fastembed model as EmbeddingModule
- Eliminates duplicate model load (~100ms startup savings)
- Reduces memory by ~200MB (one AllMiniLML6V2 instead of two)
- Model loads lazily on first embed call

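To illustrate the lazy, shared load, here is a small standard-library sketch. It assumes a single global slot; the real MODEL_CACHE may be keyed by model name, and `Model`, `shared_model`, and `load_count` are hypothetical stand-ins for the fastembed-backed types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Stand-in for a loaded embedding model (hypothetical type).
pub struct Model {
    pub name: String,
}

static LOADS: AtomicUsize = AtomicUsize::new(0);
static MODEL_CACHE: OnceLock<Arc<Model>> = OnceLock::new();

/// Returns the shared model, loading it on the first call only.
pub fn shared_model() -> Arc<Model> {
    MODEL_CACHE
        .get_or_init(|| {
            // The expensive load would happen here, exactly once.
            LOADS.fetch_add(1, Ordering::SeqCst);
            Arc::new(Model { name: "allminilml6v2".to_string() })
        })
        .clone()
}

/// Number of times the model was actually loaded.
pub fn load_count() -> usize {
    LOADS.load(Ordering::SeqCst)
}
```

Every caller receives a clone of the same `Arc`, so a second provider adds a reference count rather than a second copy of the weights.
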
Performance optimization: O(n²) pairwise similarity now computed in Rust with Rayon parallelization and SIMD vectorization.

Rust EmbeddingModule additions:
- cosine_similarity(a, b) - SIMD-optimized single pair
- pairwise_similarity_matrix() - Rayon-parallelized matrix
- embedding/similarity command
- embedding/similarity-matrix command (binary f32 response)

TypeScript client (RustCoreIPC.ts):
- embeddingSimilarity(a, b) - single pair via IPC
- embeddingSimilarityMatrix() - batch pairwise via IPC
- indexPairwiseSimilarity() - helper for flat array indexing

MemoryConsolidationWorker:
- computePairwiseSimilaritiesRust() - uses Rust for O(n²) speedup
- Logs "🦀 Rust computed" when using native path

Also fixes ts-rs binding generator to properly handle stderr warnings.

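A sequential sketch of the computation described above, without the Rayon and SIMD parts. The flat upper-triangle layout (pairs ordered (0,1), (0,2), ..., (n-2,n-1)) is an assumption about how the binary f32 response is indexed, and `pair_index` and `pairwise_similarities` are hypothetical names.

```rust
/// Cosine similarity of two equal-length vectors; 0.0 if either has zero norm.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len(), "vectors must have the same length");
    let (mut dot, mut norm_a, mut norm_b) = (0.0f32, 0.0f32, 0.0f32);
    for (x, y) in a.iter().zip(b) {
        dot += x * y;
        norm_a += x * x;
        norm_b += y * y;
    }
    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }
    dot / (norm_a.sqrt() * norm_b.sqrt())
}

/// Flat index of the pair (i, j), with i < j < n, in row-major
/// upper-triangle order (assumed layout).
pub fn pair_index(i: usize, j: usize, n: usize) -> usize {
    debug_assert!(i < j && j < n);
    i * n - i * (i + 1) / 2 + (j - i - 1)
}

/// All n*(n-1)/2 pairwise similarities in the same flat order.
pub fn pairwise_similarities(embeddings: &[Vec<f32>]) -> Vec<f32> {
    let n = embeddings.len();
    let mut out = Vec::with_capacity(n * n.saturating_sub(1) / 2);
    for i in 0..n {
        for j in (i + 1)..n {
            out.push(cosine_similarity(&embeddings[i], &embeddings[j]));
        }
    }
    out
}
```

With this layout a caller reads the similarity of items i and j as `sims[pair_index(i, j, n)]`, which is the role the commit assigns to indexPairwiseSimilarity() on the TypeScript side.
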
…m/CambrianTech/continuum into feature/rust-module-consolidation
Adds complete clustering to Rust EmbeddingModule:
- detect_clusters() - connected components via BFS
- Cluster struct with indices, strength, representative
- embedding/cluster command handler

Updates both memory consolidation workers to use Rust:
- MemoryConsolidationSubprocess now uses Rust similarity matrix
- (MemoryConsolidationWorker was updated in Phase 7a)

TypeScript client additions:
- embeddingCluster() method for full clustering via IPC

All O(n²) clustering operations now in Rust with:
- Rayon parallelization for similarity matrix
- SIMD vectorization for cosine similarity
- Native graph traversal for connected components

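Connected components via BFS can be sketched as below. The signature is hypothetical: it takes a similarity callback instead of the module's matrix, returns plain index lists rather than the Cluster struct, and the `min_size` filter is an assumption.

```rust
use std::collections::VecDeque;

/// Groups item indices 0..n into connected components, linking i and j when
/// `similarity(i, j) >= threshold`. Components smaller than `min_size` are dropped.
pub fn detect_clusters(
    n: usize,
    threshold: f32,
    min_size: usize,
    similarity: impl Fn(usize, usize) -> f32,
) -> Vec<Vec<usize>> {
    let mut visited = vec![false; n];
    let mut clusters = Vec::new();
    for start in 0..n {
        if visited[start] {
            continue;
        }
        visited[start] = true;
        let mut queue = VecDeque::from([start]);
        let mut members = Vec::new();
        // Breadth-first traversal of everything reachable from `start`.
        while let Some(i) = queue.pop_front() {
            members.push(i);
            for j in 0..n {
                if !visited[j] && similarity(i, j) >= threshold {
                    visited[j] = true;
                    queue.push_back(j);
                }
            }
        }
        if members.len() >= min_size {
            members.sort_unstable();
            clusters.push(members);
        }
    }
    clusters
}
```

Note that items 0 and 2 can land in one cluster without being directly similar, as long as a chain of above-threshold links connects them.
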
- Add embedding/top-k command for parallel top-k similarity search
- Add RustCoreIPCClient.getInstance() singleton pattern for shared IPC
- Update LongTermMemoryStore.findSimilar() to use Rust with TS fallback
- Add embeddingTopK() TypeScript client method

Performance: O(n) parallel similarity search via Rayon, replacing sequential TypeScript loops for semantic memory retrieval.

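A sequential sketch of top-k retrieval, assuming cosine similarity as the score. `top_k` and `cosine` are hypothetical names; the sketch scores in a plain loop and then sorts, where the commit describes Rayon-parallel scoring.

```rust
/// Cosine similarity; returns 0.0 when either vector has zero norm.
fn cosine(a: &[f32], b: &[f32]) -> f32 {
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 {
        0.0
    } else {
        dot / (norm_a * norm_b)
    }
}

/// Returns up to `k` (candidate index, similarity) pairs, best match first.
pub fn top_k(query: &[f32], candidates: &[Vec<f32>], k: usize) -> Vec<(usize, f32)> {
    let mut scored: Vec<(usize, f32)> = candidates
        .iter()
        .enumerate()
        .map(|(i, c)| (i, cosine(query, c)))
        .collect();
    // Descending by score; the stable sort keeps index order on ties.
    scored.sort_by(|a, b| b.1.total_cmp(&a.1));
    scored.truncate(k);
    scored
}
```
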
- Use embedding/top-k for parallel similarity computation
- Leverage Rayon parallelization in Rust
- Add TypeScript fallback if Rust unavailable
- Eliminates duplicate cosineSimilarity implementation

Pull request overview
This PR advances the JTAG + continuum-core consolidation by standardizing IPC socket locations under .continuum/sockets, adding runtime performance observability (Rust runtime/* module + runtime/metrics command), and migrating compute-heavy embedding similarity operations from TypeScript to Rust (Rayon-parallelized).
Changes:
- Standardize worker/client socket paths using shared SOCKETS config and .continuum/sockets directory.
- Add Rust runtime metrics instrumentation + RuntimeModule IPC surface and a new runtime/metrics JTAG command.
- Add Rust embedding similarity operations (similarity, similarity-matrix, top-k, cluster) and update TS callers to use Rust for O(n²) similarity work.
Reviewed changes
Copilot reviewed 53 out of 54 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/debug/jtag/workers/workers-config.json | Move worker sockets from /tmp to .continuum/sockets and add socketDir. |
| src/debug/jtag/workers/start-workers.sh | Ensure .continuum/sockets exists before starting workers. |
| src/debug/jtag/workers/continuum-core/src/runtime/runtime.rs | Add expected-module verification and add automatic per-command metrics recording. |
| src/debug/jtag/workers/continuum-core/src/modules/runtime_control.rs | New Rust module exposing runtime metrics and module listing over IPC. |
| src/debug/jtag/workers/continuum-core/src/modules/mod.rs | Register new runtime_control module. |
| src/debug/jtag/workers/continuum-core/src/modules/embedding.rs | Add Rust similarity/similarity-matrix/top-k/cluster compute functions + IPC handlers. |
| src/debug/jtag/workers/continuum-core/src/memory/mod.rs | Re-export new ModuleBackedEmbeddingProvider. |
| src/debug/jtag/workers/continuum-core/src/memory/embedding.rs | Add embedding provider backed by EmbeddingModule’s shared model cache. |
| src/debug/jtag/workers/continuum-core/src/main.rs | Switch Hippocampus memory subsystem to shared module-backed embedding provider. |
| src/debug/jtag/workers/continuum-core/src/ipc/mod.rs | Remove legacy request enum/dispatch; route all commands through runtime; register RuntimeModule + verify registration. |
| src/debug/jtag/workers/continuum-core/bindings/RustCoreIPC.ts | Add socket-path helpers, a singleton client, runtime metrics methods, and embedding similarity IPC methods. |
| src/debug/jtag/system/voice/server/VoiceOrchestratorRustBridge.ts | Use shared continuum-core socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/system/user/server/modules/cognition/memory/MemoryConsolidationWorker.ts | Switch pairwise similarity computation to Rust similarity-matrix IPC. |
| src/debug/jtag/system/user/server/modules/cognition/memory/MemoryConsolidationSubprocess.ts | Switch pairwise similarity computation to Rust similarity-matrix IPC. |
| src/debug/jtag/system/user/server/modules/cognition/memory/LongTermMemoryStore.ts | Use Rust embedding/top-k for similarity search with TS fallback. |
| src/debug/jtag/system/user/server/modules/RustCognitionBridge.ts | Use shared continuum-core socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/system/rag/shared/RAGComposer.ts | Use shared continuum-core socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/system/core/services/RustVectorSearchClient.ts | Resolve continuum-core socket from shared config (relative → absolute). |
| src/debug/jtag/system/core/services/RustEmbeddingClient.ts | Resolve continuum-core socket from shared config (relative → absolute). |
| src/debug/jtag/system/core/logging/Logger.ts | Resolve continuum-core socket from shared config; improve connection-failure logging. |
| src/debug/jtag/shared/version.ts | Bump JTAG version. |
| src/debug/jtag/shared/ipc/logger/LoggerWorkerClient.ts | Update docs to new socket path. |
| src/debug/jtag/shared/generated-command-constants.ts | Add generated command constant for runtime/metrics. |
| src/debug/jtag/shared/config.ts | Introduce SOCKET_DIR and SOCKETS as the single source of truth. |
| src/debug/jtag/server/generated.ts | Register new runtime/metrics server command in generated registry. |
| src/debug/jtag/package.json | Version bump. |
| src/debug/jtag/package-lock.json | Version bump. |
| src/debug/jtag/generator/specs/runtime-metrics.json | Add generator spec for runtime/metrics. |
| src/debug/jtag/generator/specs/logger-daemon-spec.ts | Update lifecycle description to new socket path. |
| src/debug/jtag/generator/generate-rust-bindings.ts | Improve handling of ts-rs warnings vs real failures. |
| src/debug/jtag/generator/generate-logger-daemon.ts | Update generated instructions to new socket path. |
| src/debug/jtag/generator/generate-config.ts | Ensure generated config includes socket constants. |
| src/debug/jtag/generated-command-schemas.json | Regenerate schemas including runtime/metrics. |
| src/debug/jtag/daemons/logger-daemon/shared/LoggerDaemon.ts | Update generated comment to new socket path. |
| src/debug/jtag/daemons/logger-daemon/server/LoggerDaemonServer.ts | Use shared socket config + resolver helper. |
| src/debug/jtag/daemons/data-daemon/server/ORMRustClient.ts | Resolve continuum-core socket from shared config (relative → absolute). |
| src/debug/jtag/daemons/console-daemon/server/ConsoleDaemonServer.ts | Use shared socket config + resolver helper. |
| src/debug/jtag/daemons/code-daemon/server/CodeDaemonServer.ts | Use shared socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/daemons/ai-provider-daemon/server/AIProviderDaemonServer.ts | Use shared socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/commands/voice/synthesize/server/VoiceSynthesizeServerCommand.ts | Use shared socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/commands/search/vector/server/SearchVectorServerCommand.ts | Use shared socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/commands/search/params/server/SearchParamsServerCommand.ts | Use shared socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/commands/search/list/server/SearchListServerCommand.ts | Use shared socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/commands/search/execute/server/SearchExecuteServerCommand.ts | Use shared socket resolution instead of hardcoded /tmp. |
| src/debug/jtag/commands/runtime/metrics/test/unit/RuntimeMetricsCommand.test.ts | Add unit test template for runtime/metrics. |
| src/debug/jtag/commands/runtime/metrics/test/integration/RuntimeMetricsIntegration.test.ts | Add integration test template for runtime/metrics. |
| src/debug/jtag/commands/runtime/metrics/shared/RuntimeMetricsTypes.ts | Add shared types + factories for runtime/metrics. |
| src/debug/jtag/commands/runtime/metrics/server/RuntimeMetricsServerCommand.ts | Add server implementation that routes to Rust runtime metrics IPC. |
| src/debug/jtag/commands/runtime/metrics/package.json | Add command package metadata and scripts. |
| src/debug/jtag/commands/runtime/metrics/browser/RuntimeMetricsBrowserCommand.ts | Add browser command implementation delegating to server. |
| src/debug/jtag/commands/runtime/metrics/README.md | Add documentation for runtime/metrics. |
| src/debug/jtag/commands/runtime/metrics/.npmignore | Add packaging ignores for the new command. |
| src/debug/jtag/browser/generated.ts | Register new runtime/metrics browser command in generated registry. |
Files not reviewed (1)
- src/debug/jtag/package-lock.json: Language not supported
    @@ -108,13 +153,22 @@ impl Runtime {
         // Wait for result from the tokio task - NO TIMEOUT.
         // Voice/TTS commands can run indefinitely for streaming audio.
         // If the task panics, recv() returns Err(RecvError).
    -    match rx.recv() {
    -        Ok(result) => Some(result),
    +    let result = match rx.recv() {
    +        Ok(result) => result,
             Err(_) => {
                 error!("Command handler task panicked or was cancelled: {command}");
    -            Some(Err(format!("Command handler failed: {command}")))
    +            Err(format!("Command handler failed: {command}"))
             }
    +    };
    +
    +    // Record timing (automatic for ALL commands)
    +    if let Some(metrics) = metrics {
    +        let tracker = metrics.start_command(command, queued_at);
    +        let timing = tracker.finish(result.is_ok());
    +        metrics.record(timing);
    +    }
Copilot
AI
Feb 10, 2026
Same issue as the async path: route_command_sync() calls metrics.start_command() only after waiting for rx.recv(). This causes execute_time_ms to measure only the post-execution bookkeeping instead of the handler duration, making runtime metrics inaccurate. Capture the CommandTracker before spawning/dispatching to tokio, then finish()/record once the result is received.
        let metrics = registry
            .get_metrics(module_name)
            .ok_or_else(|| format!("Module '{}' not found", module_name))?;

        Ok(CommandResult::Json(serde_json::to_value(metrics.stats()).unwrap()))
    }
Copilot
AI
Feb 10, 2026
serde_json::to_value(...).unwrap() can panic and crash the server process if serialization ever fails. Since this is an IPC surface, prefer returning an Err with context (e.g., map_err) rather than panicking.
    // Generate all (i,j) pairs where i < j
    let pairs: Vec<(usize, usize)> = (0..n)
        .flat_map(|i| (i+1..n).map(move |j| (i, j)))
        .collect();
Copilot
AI
Feb 10, 2026
pairwise_similarity_matrix() allocates an additional Vec<(usize, usize)> of all index pairs (O(n²) extra memory) before doing the parallel computation. This can become a major memory overhead for large n and is unnecessary since the (i,j) pairs can be generated on-the-fly (e.g., via a parallel iterator over 0..num_pairs with an index→(i,j) mapping, or nested parallel loops).
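One way to realize the index→(i,j) mapping suggested here, as a sketch: `num_pairs` and `pair_from_index` are hypothetical helpers, and the row walk costs O(n) per lookup (a closed-form inverse is possible but omitted). With Rayon, the parallel loop could then run over `(0..num_pairs(n)).into_par_iter()` instead of a pre-built Vec of pairs.

```rust
/// Number of unordered pairs (i, j) with i < j < n.
pub fn num_pairs(n: usize) -> usize {
    n * n.saturating_sub(1) / 2
}

/// Maps a flat index k in 0..num_pairs(n) to its pair (i, j), i < j,
/// in the same order the flat_map in the PR generates them.
pub fn pair_from_index(mut k: usize, n: usize) -> Option<(usize, usize)> {
    for i in 0..n.saturating_sub(1) {
        let row_len = n - 1 - i; // pairs (i, i+1) ..= (i, n-1)
        if k < row_len {
            return Some((i, i + 1 + k));
        }
        k -= row_len;
    }
    None // k was out of range
}
```
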
    impl ModuleBackedEmbeddingProvider {
        /// Create a new provider using the EmbeddingModule's shared model.
        /// Does NOT load the model - that happens on first embed call.
        pub fn new(model_name: &str) -> Self {
            Self {
                model_name: model_name.to_string(),
            }
        }

        /// Create provider for default AllMiniLML6V2 model.
        pub fn default_model() -> Self {
            Self::new("allminilml6v2")
        }
    }

    impl EmbeddingProvider for ModuleBackedEmbeddingProvider {
        fn name(&self) -> &str {
            "module-backed-embedding"
        }

        fn dimensions(&self) -> usize {
            384 // AllMiniLML6V2 dimensions
        }
Copilot
AI
Feb 10, 2026
ModuleBackedEmbeddingProvider::new() accepts an arbitrary model_name, but dimensions() always returns 384. This becomes incorrect for other supported models in EmbeddingModule (e.g., BGE* models typically use 768 dims) and can cause downstream logic that relies on dimensions() to misbehave. Consider either (a) restricting ModuleBackedEmbeddingProvider to the 384-dim model, or (b) deriving dimensions from the selected model (mapping model_name→dims or querying the loaded model).
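Option (b) could be sketched as a name→dimensions lookup. Only "allminilml6v2" appears in the PR; the BGE keys below are hypothetical spellings chosen for illustration, and the real EmbeddingModule may identify models differently. The dimensions are those published for all-MiniLM-L6-v2 and the BGE v1.5 English models.

```rust
/// Embedding width for a model key, or None when the key is unknown.
/// The BGE key strings are assumptions; check them against EmbeddingModule.
pub fn dimensions_for(model_name: &str) -> Option<usize> {
    match model_name.to_ascii_lowercase().as_str() {
        "allminilml6v2" => Some(384),
        "bgesmallenv15" => Some(384),
        "bgebaseenv15" => Some(768),
        "bgelargeenv15" => Some(1024),
        _ => None,
    }
}
```

Returning `Option` forces the caller to decide what an unknown model means, rather than silently reporting 384.
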
    mode: data.mode ?? undefined,
    module: data.module ?? '',
    ...data
Copilot
AI
Feb 10, 2026
createRuntimeMetricsParams() sets module to an empty string when it’s omitted (module: data.module ?? ''). Since module is optional, sending "" can be ambiguous and differs from truly omitting the field. Consider using undefined when not provided (module: data.module ?? undefined) and avoid spreading ...data after setting defaults to prevent accidental overrides.
    - mode: data.mode ?? undefined,
    - module: data.module ?? '',
    - ...data
    + ...data,
    + mode: data.mode ?? undefined,
    + module: data.module ?? undefined,
    // 2. Compute pairwise cosine similarities (Rust: Rayon-parallelized, SIMD-optimized)
    const similarities = await this.computePairwiseSimilaritiesRust(allEmbeddings);
Copilot
AI
Feb 10, 2026
detectPatterns() now always calls computePairwiseSimilaritiesRust() without any fallback. If the Rust IPC call fails (socket not available, connection error, etc.), pattern detection will throw and can break the worker loop even though a TypeScript fallback implementation still exists below. Consider wrapping the Rust call in try/catch and falling back to computePairwiseSimilarities() on failure.
    // Compute pairwise similarities (Rust: Rayon-parallelized, SIMD-optimized)
    const similarities = await this.computePairwiseSimilaritiesRust(allEmbeddings);
Copilot
AI
Feb 10, 2026
detectPatterns() now unconditionally awaits computePairwiseSimilaritiesRust(). If the continuum-core socket is unavailable or the IPC request fails, this method will throw and the subprocess tick will error even though a TypeScript fallback exists. Consider catching IPC errors here and using computePairwiseSimilarities() as a fallback.
    // Get metrics tracker for this module
    let metrics = self.registry.get_metrics(module_name);
    let queued_at = std::time::Instant::now();

    // Execute command
    let result = module.handle_command(&full_cmd, params).await;

    // Record timing (automatic for ALL commands)
    if let Some(metrics) = metrics {
        let tracker = metrics.start_command(command, queued_at);
        let timing = tracker.finish(result.is_ok());
        metrics.record(timing);
    }
Copilot
AI
Feb 10, 2026
In both route_command() and route_command_sync(), metrics.start_command() is called after the module handler finishes. ModuleMetrics::start_command() is documented/implemented as “called by runtime BEFORE dispatching”, and it captures started_at=Instant::now(); calling it after execution will make execute_time_ms near-zero and skew queue/total timings. Start the CommandTracker before invoking handle_command, then call finish()/record after the handler returns.
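The ordering the comment asks for can be sketched as follows. `CommandTracker`, `Timing`, and `timed` here are simplified stand-ins written for illustration, not the types in runtime.rs; the point is only that the tracker is created before the handler runs and finished after it returns.

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for the runtime's tracker (hypothetical shape).
pub struct CommandTracker {
    queued_at: Instant,
    started_at: Instant,
}

pub struct Timing {
    pub queue_time: Duration,
    pub execute_time: Duration,
    pub success: bool,
}

impl CommandTracker {
    /// Captures started_at at the moment of the call, so call it BEFORE dispatch.
    pub fn start(queued_at: Instant) -> Self {
        Self { queued_at, started_at: Instant::now() }
    }

    pub fn finish(self, success: bool) -> Timing {
        let finished_at = Instant::now();
        Timing {
            queue_time: self.started_at.duration_since(self.queued_at),
            execute_time: finished_at.duration_since(self.started_at),
            success,
        }
    }
}

/// Runs `handler` with the tracker started first and finished afterwards.
pub fn timed<T, E>(
    queued_at: Instant,
    handler: impl FnOnce() -> Result<T, E>,
) -> (Result<T, E>, Timing) {
    let tracker = CommandTracker::start(queued_at); // before the handler
    let result = handler();
    let timing = tracker.finish(result.is_ok()); // after the handler
    (result, timing)
}
```

With this ordering, execute_time covers the handler itself; starting the tracker after the handler returns would leave only the bookkeeping between start and finish.
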
EmbeddingModule:
- Add LRU cache for embedding results (10K max entries, 5-min TTL)
- Cache check before generation, store after computation
- Add embedding/cache/stats and embedding/cache/clear commands
- Track hit/miss counts for performance analysis
DataModule:
- Store ModuleContext during initialize() for event bus access
- Publish events on successful CRUD: data:{collection}:{created|updated|deleted}
- Publish data:batch:completed on batch success
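For the EmbeddingModule cache described in this commit, a rough standard-library sketch of an LRU cache with TTL and hit/miss counters. `EmbeddingCache` and its methods are hypothetical, the clock is passed in so expiry is easy to test, and the O(n) recency update is a simplification that a real 10K-entry cache would likely replace with a linked structure.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

pub struct EmbeddingCache {
    max_entries: usize,
    ttl: Duration,
    entries: HashMap<String, (Vec<f32>, Instant)>,
    order: VecDeque<String>, // least recently used key at the front
    pub hits: u64,
    pub misses: u64,
}

impl EmbeddingCache {
    pub fn new(max_entries: usize, ttl: Duration) -> Self {
        Self {
            max_entries,
            ttl,
            entries: HashMap::new(),
            order: VecDeque::new(),
            hits: 0,
            misses: 0,
        }
    }

    /// Moves `key` to the most-recently-used position.
    fn touch(&mut self, key: &str) {
        self.order.retain(|k| k.as_str() != key);
        self.order.push_back(key.to_string());
    }

    /// Cache check before generation: Some on a fresh hit, None otherwise.
    pub fn get(&mut self, key: &str, now: Instant) -> Option<Vec<f32>> {
        let fresh = match self.entries.get(key) {
            Some((_, stored_at)) => now.duration_since(*stored_at) < self.ttl,
            None => false,
        };
        if !fresh {
            // Absent or expired: drop any stale entry and count a miss.
            self.entries.remove(key);
            self.order.retain(|k| k.as_str() != key);
            self.misses += 1;
            return None;
        }
        self.hits += 1;
        self.touch(key);
        self.entries.get(key).map(|(v, _)| v.clone())
    }

    /// Store after computation, evicting the least recently used entry if full.
    pub fn put(&mut self, key: &str, value: Vec<f32>, now: Instant) {
        if self.max_entries == 0 {
            return;
        }
        if !self.entries.contains_key(key) && self.entries.len() >= self.max_entries {
            if let Some(oldest) = self.order.pop_front() {
                self.entries.remove(&oldest);
            }
        }
        self.entries.insert(key.to_string(), (value, now));
        self.touch(key);
    }
}
```
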
ModuleContext:
- Add logger factory: ctx.logger("module_name") returns Arc<ModuleLogger>
- DashMap-backed cache creates loggers on demand
- Per-module log files at .continuum/jtag/logs/system/modules/{name}.log
DataModule:
- Add log_slow_query() helper using module logger
- Log slow operations (>50ms) to data.log
- Simplified timing code (removed unused breakdowns)
- Example: "query took 336ms | collection=user_states"
ToolFormatAdapter:
- Add FunctionStyleToolAdapter for <function=name>{json}</function> format
- Supports Groq, Together, and other models that use this style
- JSON parameter parsing with fallback to key=value
Git config:
- Remove local user.name/user.email override (was "DeepSeek Assistant")
- Now uses global config (joelteply)
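The function-style format handled by FunctionStyleToolAdapter can be illustrated with a small parser. This is a sketch, not the adapter's code: `parse_function_call` is a hypothetical helper that returns the tool name and the raw parameter text, leaving JSON decoding and the key=value fallback to the caller.

```rust
/// Extracts (tool name, raw parameter text) from the first
/// `<function=name>{json}</function>` occurrence in `text`.
pub fn parse_function_call(text: &str) -> Option<(String, String)> {
    let open = "<function=";
    let close = "</function>";
    let start = text.find(open)? + open.len();
    let rest = &text[start..];
    let name_end = rest.find('>')?;
    let name = rest[..name_end].trim();
    if name.is_empty() {
        return None;
    }
    let body_start = name_end + 1;
    let body_end = rest[body_start..].find(close)? + body_start;
    let body = rest[body_start..body_end].trim();
    Some((name.to_string(), body.to_string()))
}
```
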
Supports: code/search {"query": "..."} format (no wrapping tags)
- Recognizes tool prefixes: code/, data/, ai/, collaboration/, etc.
- Parses JSON parameters after tool name
- Fallback key-value extraction if JSON malformed
This matches how models naturally output tool calls without explicit tags.
Summary
This PR completes Phase 2 of the Rust module consolidation, focusing on optimization and moving compute-heavy operations from TypeScript to Rust.
Phase 5: Legacy Code Cleanup
- Removed the legacy Request enum from ipc/mod.rs (55+ unused command variants)
- All commands now route through runtime.route_command_sync()

Phase 6a: Runtime Metrics
- runtime/metrics command for real-time module performance monitoring

Phase 7: Move Compute to Rust (High Impact)
Added 4 new embedding commands with Rayon parallelization:
- embedding/similarity
- embedding/similarity-matrix
- embedding/top-k
- embedding/cluster

TypeScript files updated to use Rust:
- MemoryConsolidationWorker.ts - Uses Rust similarity matrix
- MemoryConsolidationSubprocess.ts - Uses Rust similarity matrix
- LongTermMemoryStore.ts - Uses Rust top-k with TS fallback
- RustCoreIPCClient.ts - Added singleton pattern + 4 new methods

Other Improvements

Test plan
- npm run build:ts - TypeScript compiles
- cargo build --release -p continuum-core - Rust compiles
- npm start - System deploys successfully
- ./jtag ping - Server and browser healthy
- ./jtag runtime/metrics - All 13 modules registered

Stats
🤖 Generated with Claude Code