diff --git a/extensions/memory-core/src/memory/manager-sync-ops.ts b/extensions/memory-core/src/memory/manager-sync-ops.ts index 53d3cceb2840..6d456d306dd1 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.ts @@ -297,7 +297,7 @@ export abstract class MemoryManagerSyncOps { return openMemoryDatabaseAtPath(dbPath, this.settings.store.vector.enabled); } - private seedEmbeddingCache(sourceDb: DatabaseSync): void { + private async seedEmbeddingCache(sourceDb: DatabaseSync): Promise<void> { if (!this.cache.enabled) { return; } @@ -306,7 +306,7 @@ export abstract class MemoryManagerSyncOps { .prepare( `SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`, ) - .all() as Array<{ + .iterate() as IterableIterator<{ provider: string; model: string; provider_key: string; @@ -315,9 +315,7 @@ export abstract class MemoryManagerSyncOps { dims: number | null; updated_at: number; }>; - if (!rows.length) { - return; - } + // Note: no early-return on empty iterator; BEGIN/COMMIT over an empty tx is cheap. const insert = this.db.prepare( `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) @@ -327,6 +325,9 @@ export abstract class MemoryManagerSyncOps { updated_at=excluded.updated_at`, ); this.db.exec("BEGIN"); + // Yield to event loop every N rows so HTTP /health probes stay responsive. 
+ const SEED_EMBEDDING_YIELD_EVERY = 1000; + let rowCount = 0; for (const row of rows) { insert.run( row.provider, @@ -337,6 +338,12 @@ export abstract class MemoryManagerSyncOps { row.dims, row.updated_at, ); + rowCount += 1; + if (rowCount % SEED_EMBEDDING_YIELD_EVERY === 0) { + await new Promise((resolve) => { + setImmediate(resolve); + }); + } } this.db.exec("COMMIT"); } catch (err) { @@ -1156,7 +1163,7 @@ export abstract class MemoryManagerSyncOps { targetPath: dbPath, tempPath: tempDbPath, build: async () => { - this.seedEmbeddingCache(originalDb); + await this.seedEmbeddingCache(originalDb); const shouldSyncMemory = this.sources.has("memory"); const shouldSyncSessions = this.shouldSyncSessions( { reason: params.reason, force: params.force }, diff --git a/src/gateway/http-utils.ts b/src/gateway/http-utils.ts index 9efa4db6fda9..57f7b4522fff 100644 --- a/src/gateway/http-utils.ts +++ b/src/gateway/http-utils.ts @@ -367,7 +367,16 @@ export function resolveSessionKey(params: { return explicit; } + // Allow callers to route to the main session by sending x-openclaw-dm-scope: main + // or user: "main". Without this, each unique user field creates a separate session, + // splitting context between iMessage, bridge, and other sources. + const dmScope = getHeader(params.req, "x-openclaw-dm-scope")?.trim(); const user = params.user?.trim(); + + if (dmScope === "main" || user === "main") { + return buildAgentMainSessionKey({ agentId: params.agentId, mainKey: "main" }); + } + const mainKey = user ? 
`${params.prefix}-user:${user}` : `${params.prefix}:${randomUUID()}`; return buildAgentMainSessionKey({ agentId: params.agentId, mainKey }); } diff --git a/src/gateway/openai-http.ts b/src/gateway/openai-http.ts index 0f4b502de5f2..1469c4e2992a 100644 --- a/src/gateway/openai-http.ts +++ b/src/gateway/openai-http.ts @@ -1,6 +1,8 @@ import { randomUUID } from "node:crypto"; import type { IncomingMessage, ServerResponse } from "node:http"; import type { ImageContent } from "../agents/command/types.js"; +import { queueEmbeddedPiMessage } from "../agents/pi-embedded-runner/runs.js"; +import { loadSessionEntryByKey } from "../agents/subagent-announce-delivery.js"; import { hasNonzeroUsage, normalizeUsage, @@ -10,6 +12,7 @@ import { import { createDefaultDeps } from "../cli/deps.js"; import { agentCommandFromIngress } from "../commands/agent.js"; import type { GatewayHttpChatCompletionsConfig } from "../config/types.gateway.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; import { emitAgentEvent, onAgentEvent } from "../infra/agent-events.js"; import { logWarn } from "../logger.js"; import { estimateBase64DecodedBytes } from "../media/base64.js"; @@ -48,6 +51,7 @@ import { normalizeInputHostnameAllowlist } from "./input-allowlist.js"; type OpenAiHttpOptions = { auth: ResolvedGatewayAuth; config?: GatewayHttpChatCompletionsConfig; + runtimeConfig: OpenClawConfig; maxBodyBytes?: number; trustedProxies?: string[]; allowRealIpFallback?: boolean; @@ -608,6 +612,57 @@ export async function handleOpenAiHttpRequest( senderIsOwner, }); + // Steer-backlog: queue into active run if session is busy. 
+ let queuedAsSteer = false; + try { + const queueMode = opts.runtimeConfig.messages?.queue?.mode; + if (queueMode === "steer" || queueMode === "steer-backlog") { + const sessionEntryForQueue = loadSessionEntryByKey(sessionKey); + const sessionIdForQueue = sessionEntryForQueue?.sessionId; + if (sessionIdForQueue) { + queuedAsSteer = queueEmbeddedPiMessage(sessionIdForQueue, prompt.message); + } + } + } catch (err) { + logWarn(`openai-compat: steer-backlog pre-check failed: ${String(err)}`); + } + + const queuedContent = "[queued] Delivered to the agent's next-turn queue."; + + if (queuedAsSteer && !stream) { + res.setHeader("x-openclaw-queued", "next-turn"); + sendJson(res, 200, { + id: runId, + object: "chat.completion", + created: Math.floor(Date.now() / 1000), + model, + choices: [ + { + index: 0, + message: { role: "assistant", content: queuedContent }, + finish_reason: "stop", + }, + ], + usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 }, + }); + return true; + } + + if (queuedAsSteer && stream) { + res.setHeader("x-openclaw-queued", "next-turn"); + setSseHeaders(res); + writeAssistantRoleChunk(res, { runId, model }); + writeAssistantContentChunk(res, { + runId, + model, + content: queuedContent, + finishReason: "stop", + }); + writeDone(res); + res.end(); + return true; + } + if (!stream) { const stopWatchingDisconnect = watchClientDisconnect(req, res, abortController); try { diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index 26f3c6e8eddb..325e3dc9c338 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -1044,6 +1044,7 @@ export function createGatewayHttpServer(opts: { (await getOpenAiHttpModule()).handleOpenAiHttpRequest(req, res, { auth: resolvedAuth, config: openAiChatCompletionsConfig, + runtimeConfig: configSnapshot, trustedProxies, allowRealIpFallback, rateLimiter, diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index 
8fc9ea98f760..7ec8d6931db5 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -1,6 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { ChannelPlugin } from "../channels/plugins/types.plugin.js"; -import { mergeMockedModule } from "../test-utils/vitest-module-mocks.js"; type LoadedSessionEntry = ReturnType; type RecordInboundSessionAndDispatchReplyParams = Parameters< @@ -157,22 +156,19 @@ vi.mock("../utils/delivery-context.shared.js", () => ({ mergeDeliveryContext: mocks.mergeDeliveryContext, })); -vi.mock("../channels/plugins/index.js", async () => { - return await mergeMockedModule( - await vi.importActual( - "../channels/plugins/index.js", +vi.mock("../channels/plugins/index.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getChannelPlugin: mocks.getChannelPlugin, + normalizeChannelId: mocks.normalizeChannelId.mockImplementation( + (channel?: string | null) => + actual.normalizeChannelId(channel) ?? + (typeof channel === "string" && channel.trim().length > 0 + ? channel.trim().toLowerCase() + : null), ), - (actual) => ({ - getChannelPlugin: mocks.getChannelPlugin, - normalizeChannelId: mocks.normalizeChannelId.mockImplementation( - (channel?: string | null) => - actual.normalizeChannelId(channel) ?? - (typeof channel === "string" && channel.trim().length > 0 - ? 
channel.trim().toLowerCase() - : null), - ), - }), - ); + }; }); vi.mock("../infra/outbound/targets.js", () => ({ @@ -197,15 +193,12 @@ vi.mock("../plugin-sdk/inbound-reply-dispatch.js", () => ({ recordInboundSessionAndDispatchReply: mocks.recordInboundSessionAndDispatchReply, })); -vi.mock("../infra/heartbeat-wake.js", async () => { - return await mergeMockedModule( - await vi.importActual( - "../infra/heartbeat-wake.js", - ), - () => ({ - requestHeartbeatNow: mocks.requestHeartbeatNow, - }), - ); +vi.mock("../infra/heartbeat-wake.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + requestHeartbeatNow: mocks.requestHeartbeatNow, + }; }); vi.mock("../logging/subsystem.js", () => ({ diff --git a/src/plugins/bundled-runtime-deps.ts b/src/plugins/bundled-runtime-deps.ts index 6053d2ada9a2..046ed48d7091 100644 --- a/src/plugins/bundled-runtime-deps.ts +++ b/src/plugins/bundled-runtime-deps.ts @@ -1026,8 +1026,8 @@ function shouldIncludeBundledPluginRuntimeDeps(params: { includeConfiguredChannels?: boolean; manifestCache?: BundledPluginRuntimeDepsManifestCache; }): boolean { - if (params.pluginIds && !params.pluginIds.has(params.pluginId)) { - return false; + if (params.pluginIds) { + return params.pluginIds.has(params.pluginId); } if (!params.config) { return true; diff --git a/test/scripts/npm-telegram-live.test.ts b/test/scripts/npm-telegram-live.test.ts index cc6a44b08ec6..e116f4b6a986 100644 --- a/test/scripts/npm-telegram-live.test.ts +++ b/test/scripts/npm-telegram-live.test.ts @@ -30,16 +30,12 @@ describe("npm Telegram live Docker E2E", () => { it("installs the npm package before forwarding runtime secrets", () => { const script = readFileSync(DOCKER_SCRIPT_PATH, "utf8"); - const installRunStart = script.indexOf('echo "Running published npm Telegram live Docker E2E'); - const fallbackInstallRunStart = script.indexOf('echo "Running npm Telegram live Docker E2E'); + const installRunStart = script.indexOf('echo 
"Running package Telegram live Docker E2E'); const installRunEnd = script.indexOf('run_logged docker run --rm \\\n "${docker_env[@]}"'); - const installRun = script.slice( - installRunStart >= 0 ? installRunStart : fallbackInstallRunStart, - installRunEnd, - ); + const installRun = script.slice(installRunStart, installRunEnd); - expect(installRun).toContain('npm install -g "$package_spec" --no-fund --no-audit'); - expect(installRun).toContain('"${PACKAGE_MOUNT_ARGS[@]}"'); + expect(installRun).toContain('npm install -g "$install_source" --no-fund --no-audit'); + expect(installRun).toContain('"${package_mount_args[@]}"'); expect(installRun).not.toContain('"${docker_env[@]}"'); expect(script).toContain('if [ -z "$credential_role" ] && [ -n "${CI:-}" ]'); expect(script).toContain('credential_role="ci"');