Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions examples/openclaw-plugin/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ export type RuntimeIdentity = {
agentId: string;
};
export type LocalClientCacheEntry = {
client: OpenVikingClient;
clientsByAgentId: Map<string, OpenVikingClient>;
process: ReturnType<typeof spawn> | null;
};

export type PendingClientEntry = {
promise: Promise<OpenVikingClient>;
resolve: (c: OpenVikingClient) => void;
promise: Promise<LocalClientCacheEntry>;
resolve: (entry: LocalClientCacheEntry) => void;
reject: (err: unknown) => void;
};

Expand Down Expand Up @@ -69,11 +69,7 @@ export class OpenVikingClient {
) {}

/**
* Dynamically switch the agent identity for multi-agent memory isolation.
* When a shared client serves multiple agents (e.g. in OpenClaw multi-agent
* gateway), call this before each agent's recall/capture to route memories
* to the correct agent_space = md5(user_id + agent_id)[:12].
* Clears cached space resolution so the next request re-derives agent_space.
* @deprecated Prefer creating/requesting an agent-scoped client instance.
*/
setAgentId(newAgentId: string): void {
if (newAgentId && newAgentId !== this.agentId) {
Expand Down
16 changes: 5 additions & 11 deletions examples/openclaw-plugin/context-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export function createMemoryOpenVikingContextEngine(params: {
version?: string;
cfg: Required<MemoryOpenVikingConfig>;
logger: Logger;
getClient: () => Promise<OpenVikingClient>;
getClient: (agentId?: string) => Promise<OpenVikingClient>;
resolveAgentId: (sessionId: string) => string;
}): ContextEngine {
const {
Expand All @@ -145,15 +145,9 @@ export function createMemoryOpenVikingContextEngine(params: {
resolveAgentId,
} = params;

const switchClientAgent = async (sessionId: string, phase: "assemble" | "afterTurn") => {
const client = await getClient();
const getAgentScopedClient = async (sessionId: string, _phase: "assemble" | "afterTurn") => {
const resolvedAgentId = resolveAgentId(sessionId);
const before = client.getAgentId();
if (resolvedAgentId && resolvedAgentId !== before) {
client.setAgentId(resolvedAgentId);
logger.info(`openviking: switched to agentId=${resolvedAgentId} for ${phase}`);
}
return client;
return getClient(resolvedAgentId);
};

return {
Expand Down Expand Up @@ -186,7 +180,7 @@ export function createMemoryOpenVikingContextEngine(params: {
}

try {
await switchClientAgent(afterTurnParams.sessionId, "afterTurn");
await getAgentScopedClient(afterTurnParams.sessionId, "afterTurn");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里调用建议删除掉,没有实际作用


const messages = afterTurnParams.messages ?? [];
if (messages.length === 0) {
Expand Down Expand Up @@ -221,7 +215,7 @@ export function createMemoryOpenVikingContextEngine(params: {
return;
}

const client = await getClient();
const client = await getAgentScopedClient(afterTurnParams.sessionId, "afterTurn");
const sessionId = await client.createSession();
try {
await client.addSessionMessage(sessionId, "user", decision.normalizedText);
Expand Down
46 changes: 31 additions & 15 deletions examples/openclaw-plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ const contextEnginePlugin = {
const localCacheKey = `${cfg.mode}:${cfg.baseUrl}:${cfg.configPath}:${cfg.apiKey}`;

let clientPromise: Promise<OpenVikingClient>;
let localEntryPromise: Promise<import("./client.js").LocalClientCacheEntry> | null = null;
let localProcess: ReturnType<typeof spawn> | null = null;
let resolveLocalClient: ((c: OpenVikingClient) => void) | null = null;
let resolveLocalClient: ((entry: import("./client.js").LocalClientCacheEntry) => void) | null = null;
let rejectLocalClient: ((err: unknown) => void) | null = null;
let localUnavailableReason: string | null = null;
const markLocalUnavailable = (reason: string, err?: unknown) => {
Expand All @@ -107,26 +108,44 @@ const contextEnginePlugin = {
const cached = localClientCache.get(localCacheKey);
if (cached) {
localProcess = cached.process;
clientPromise = Promise.resolve(cached.client);
localEntryPromise = Promise.resolve(cached);
clientPromise = Promise.resolve(cached.clientsByAgentId.get(cfg.agentId) ?? new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs));
} else {
const existingPending = localClientPendingPromises.get(localCacheKey);
if (existingPending) {
clientPromise = existingPending.promise;
localEntryPromise = existingPending.promise;
clientPromise = localEntryPromise.then((entry) => entry.clientsByAgentId.get(cfg.agentId) ?? new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs));
} else {
const entry = {} as PendingClientEntry;
entry.promise = new Promise<OpenVikingClient>((resolve, reject) => {
entry.promise = new Promise((resolve, reject) => {
entry.resolve = resolve;
entry.reject = reject;
});
clientPromise = entry.promise;
localEntryPromise = entry.promise;
clientPromise = localEntryPromise.then((entry) => entry.clientsByAgentId.get(cfg.agentId) ?? new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs));
localClientPendingPromises.set(localCacheKey, entry);
}
}
} else {
clientPromise = Promise.resolve(new OpenVikingClient(cfg.baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs));
}

const getClient = (): Promise<OpenVikingClient> => clientPromise;
const getClient = async (agentId?: string): Promise<OpenVikingClient> => {
const effectiveAgentId = agentId ?? cfg.agentId;
if (cfg.mode !== "local") {
if (effectiveAgentId === cfg.agentId) {
return clientPromise;
}
return new OpenVikingClient(cfg.baseUrl, cfg.apiKey, effectiveAgentId, cfg.timeoutMs);
}
const entry = localEntryPromise ? await localEntryPromise : await Promise.resolve(localClientCache.get(localCacheKey)!);
let client = entry.clientsByAgentId.get(effectiveAgentId);
if (!client) {
client = new OpenVikingClient(cfg.baseUrl, cfg.apiKey, effectiveAgentId, cfg.timeoutMs);
entry.clientsByAgentId.set(effectiveAgentId, client);
}
return client;
};

api.registerTool(
{
Expand Down Expand Up @@ -254,7 +273,7 @@ const contextEnginePlugin = {
let sessionId = sessionIdIn;
let createdTempSession = false;
try {
const c = await getClient();
const c = await getClient(cfg.agentId);
if (!sessionId) {
sessionId = await c.createSession();
createdTempSession = true;
Expand Down Expand Up @@ -283,7 +302,7 @@ const contextEnginePlugin = {
throw err;
} finally {
if (createdTempSession && sessionId) {
const c = await getClient().catch(() => null);
const c = await getClient(cfg.agentId).catch(() => null);
if (c) await c.deleteSession(sessionId!).catch(() => {});
}
}
Expand Down Expand Up @@ -427,18 +446,14 @@ const contextEnginePlugin = {
let client: OpenVikingClient;
try {
client = await withTimeout(
getClient(),
getClient(resolvedAgentId),
5000,
"openviking: client initialization timeout (OpenViking service not ready yet)"
);
} catch (err) {
api.logger.warn?.(`openviking: failed to get client: ${String(err)}`);
return;
}
if (resolvedAgentId && client.getAgentId() !== resolvedAgentId) {
client.setAgentId(resolvedAgentId);
api.logger.info(`openviking: switched to agentId=${resolvedAgentId} for before_prompt_build`);
}

const eventObj = (event ?? {}) as { messages?: unknown[]; prompt?: string };
const queryText =
Expand Down Expand Up @@ -677,8 +692,9 @@ const contextEnginePlugin = {
try {
await waitForHealth(baseUrl, timeoutMs, intervalMs);
const client = new OpenVikingClient(baseUrl, cfg.apiKey, cfg.agentId, cfg.timeoutMs);
localClientCache.set(localCacheKey, { client, process: child });
resolveLocalClient!(client);
const entry = { clientsByAgentId: new Map([[cfg.agentId, client]]), process: child };
localClientCache.set(localCacheKey, entry);
resolveLocalClient!(entry);
rejectLocalClient = null;
api.logger.info(
`openviking: local server started (${baseUrl}, config: ${cfg.configPath})`,
Expand Down
Loading