Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/memos-local-plugin/core/config/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ export const DEFAULT_CONFIG: ResolvedConfig = {
// hits before injection.
llmFilterMinCandidates: 2,
llmFilterCandidateBodyChars: 500,
// Default 0 — no time-window bound, keeping the legacy
// brute-force scan behaviour for fresh installs that haven't
// grown past the threshold where the bound starts paying off.
// Operators with >50K traces are expected to flip this on (we
// suggest 86_400_000 = 24h, or 2_592_000_000 = 30 days).
vectorScanMaxAgeMs: 0,
},
},
hub: {
Expand Down
22 changes: 22 additions & 0 deletions apps/memos-local-plugin/core/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,28 @@ const AlgorithmSchema = Type.Object({
* slightly larger window pays for itself).
*/
llmFilterCandidateBodyChars: NumberInRange(500, 120, 2000),
/**
* Tier-2 vector scan time-window bound (ms). When > 0, the
* vector scan path (`scanAndTopK` in `core/storage/vector.ts`)
* only considers traces written within the last
* `vectorScanMaxAgeMs` milliseconds. Set to `0` to disable the
* cap (legacy behaviour: full-table brute-force scan).
*
* Background: at 93K rows × 1536 dims the unbounded scan blocks
* the Node event loop for 5–30 s every `onTurnStart`
* (https://github.com/MemTensor/MemOS/issues/1929). A 24-hour
* window keeps onTurnStart latency under control without
* sacrificing recall for active-session memories. FTS keyword
* channels still cover older traces, so this bound only affects
* the cosine-only path.
*
* Hard cap is one year (31_536_000_000 ms) — anything larger is
* indistinguishable from "unbounded" at the corpus sizes where
* the bound starts to matter, and accepting absurdly large
* values lets misconfigured deployments silently revert to the
* old behaviour.
*/
vectorScanMaxAgeMs: NumberInRange(0, 0, 31_536_000_000),
}, { default: {} }),
}, { default: {} });

Expand Down
1 change: 1 addition & 0 deletions apps/memos-local-plugin/core/pipeline/deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export function extractAlgorithmConfig(
llmFilterMinCandidates: alg.lightweightMemory.enabled ? 1 : alg.retrieval.llmFilterMinCandidates,
llmFilterCandidateBodyChars: alg.retrieval.llmFilterCandidateBodyChars,
lightweightMemory: alg.lightweightMemory.enabled,
vectorScanMaxAgeMs: alg.retrieval.vectorScanMaxAgeMs,
},
session: {
followUpMode: alg.session.followUpMode,
Expand Down
109 changes: 58 additions & 51 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ import { rootLogger } from "../logger/index.js";
import type { Logger } from "../logger/types.js";
import { openDb } from "../storage/connection.js";
import { runMigrations } from "../storage/migrator.js";
import { makeRepos } from "../storage/repos/index.js";
import {
embeddingMaintenanceCounts,
inferStoredEmbeddingByteLen,
makeRepos,
} from "../storage/repos/index.js";
import { createEmbedder } from "../embedding/embedder.js";
import { createLlmClient } from "../llm/client.js";
import {
Expand Down Expand Up @@ -112,6 +116,14 @@ import type { UserFeedback } from "../reward/types.js";

// Timeout budget in milliseconds (3 s).
// NOTE(review): judging by the name this bounds the final hub LLM filter
// step; the usage site is outside this view — confirm.
const FINAL_HUB_LLM_FILTER_TIMEOUT_MS = 3_000;
// NOTE(review): presumably the number of rows written per batch during an
// import; the usage site is outside this view — confirm.
const IMPORT_WRITE_BATCH_SIZE = 500;
/**
* Float32 byte width. Stored vector BLOBs are little-endian Float32
* arrays produced by `encodeVector(Float32Array)`; their byte length
* is `dimensions * FLOAT32_BYTES_PER_ELEMENT`. The SQL fast path of
* `computeEmbeddingMaintenanceStats` compares stored BLOB byte length
* against `configuredDimension * FLOAT32_BYTES_PER_ELEMENT`.
*/
const FLOAT32_BYTES_PER_ELEMENT = 4;

export interface BootstrapOptions {
agent: AgentKind;
Expand Down Expand Up @@ -4032,24 +4044,37 @@ export function createMemoryCore(
};

function computeEmbeddingMaintenanceStats(): EmbeddingMaintenanceStats {
// SQL-only fast path (issue #1929).
//
// The previous implementation paginated `traces` / `policies` /
// `world_model` / `skills` end-to-end via `repos.<table>.list()`,
// which hydrates the full row — BLOB vector columns included —
// through `mapRow()`. On a production deployment with ~93K rows
// and ~270 MB of vector BLOBs that single call blocked the Node
// event loop for 4+ minutes at 100% CPU.
//
// `embeddingMaintenanceCounts` runs five `SELECT COUNT(*) +
// SUM(CASE WHEN ...)` queries — `LENGTH(blob)` reads only the BLOB
// header, never the payload — so we keep the same per-bucket
// semantics without touching a single vector byte.
const configuredDimension = handle.embedder?.dimensions ?? 0;
const allSlots = collectEmbeddingSlots();
const dimension = configuredDimension > 0 ? configuredDimension : inferStoredEmbeddingDimension(allSlots);
const byKind = emptyEmbeddingStatsByKind();
for (const slot of allSlots) {
const bucket = byKind[slot.kind];
bucket.totalSlots++;
if (!slot.vec) {
bucket.missing++;
} else if (dimension > 0 && slot.vec.length !== dimension) {
bucket.dimMismatch++;
} else {
bucket.ready++;
}
}
for (const bucket of Object.values(byKind)) {
bucket.needsRepair = bucket.missing + bucket.dimMismatch;
}
const inferredByteLen = configuredDimension > 0
? configuredDimension * FLOAT32_BYTES_PER_ELEMENT
: inferStoredEmbeddingByteLen(handle.db);
const dimension = configuredDimension > 0
? configuredDimension
: Math.floor(inferredByteLen / FLOAT32_BYTES_PER_ELEMENT);

const raw = embeddingMaintenanceCounts(handle.db, {
expectedByteLen: inferredByteLen,
});

const byKind: EmbeddingMaintenanceStats["byKind"] = {
trace: addNeedsRepair(raw.trace),
policy: addNeedsRepair(raw.policy),
world_model: addNeedsRepair(raw.world_model),
skill: addNeedsRepair(raw.skill),
};
const totalSlots = sumEmbeddingStats(byKind, "totalSlots");
const ready = sumEmbeddingStats(byKind, "ready");
const missing = sumEmbeddingStats(byKind, "missing");
Expand All @@ -4066,6 +4091,21 @@ export function createMemoryCore(
};
}

/**
 * Turns a raw per-kind count bucket into a full stats bucket by attaching
 * the derived `needsRepair` figure.
 *
 * @param bucket Raw counts for one embedding kind.
 * @returns A new object carrying the four input counts plus
 *   `needsRepair = missing + dimMismatch`. The input is not mutated.
 */
function addNeedsRepair(bucket: {
  totalSlots: number;
  ready: number;
  missing: number;
  dimMismatch: number;
}): EmbeddingMaintenanceStats["byKind"]["trace"] {
  const { totalSlots, ready, missing, dimMismatch } = bucket;
  // A slot needs repair when its vector is absent or has the wrong size.
  const needsRepair = missing + dimMismatch;
  return { totalSlots, ready, missing, dimMismatch, needsRepair };
}

async function ensureEmbeddingDimensionKnown(): Promise<void> {
if (!handle.embedder || handle.embedder.dimensions > 0) return;
try {
Expand All @@ -4080,23 +4120,6 @@ export function createMemoryCore(
}
}

/**
 * Infers the embedding dimension from stored vectors by majority vote.
 *
 * Slots without a vector are ignored. Each remaining slot votes for its
 * `vec.length`, and the length with the most votes wins. On a tie the
 * length that was encountered first wins, because the comparison is
 * strict and `Map` iterates in insertion order.
 *
 * @param slots Embedding slots to inspect.
 * @returns The most common vector length, or `0` when no slot has a vector.
 */
function inferStoredEmbeddingDimension(slots: readonly EmbeddingSlot[]): number {
  const tally = new Map<number, number>();
  slots.forEach((slot) => {
    const vec = slot.vec;
    if (vec) {
      tally.set(vec.length, (tally.get(vec.length) ?? 0) + 1);
    }
  });

  // Seed with [dimension 0, count 0]; only a strictly larger count replaces the leader.
  const [mostCommonDim] = Array.from(tally).reduce<[number, number]>(
    (leader, entry) => (entry[1] > leader[1] ? entry : leader),
    [0, 0],
  );
  return mostCommonDim;
}

function shouldTraceHaveEmbeddings(row: TraceRow): boolean {
// Skip traces where both user and agent text are very short
const userLen = row.userText.trim().length;
Expand Down Expand Up @@ -4206,22 +4229,6 @@ export function createMemoryCore(
return row.tags.includes("lightweight_memory");
}

/**
 * Creates a per-kind stats structure with every counter set to zero.
 *
 * @returns An object with `trace`, `policy`, `world_model` and `skill`
 *   buckets. Each bucket is a separate object, so incrementing one kind
 *   never affects another.
 */
function emptyEmbeddingStatsByKind(): EmbeddingMaintenanceStats["byKind"] {
  // Called once per kind so that no two kinds share a bucket instance.
  function zeroedBucket() {
    return {
      totalSlots: 0,
      ready: 0,
      missing: 0,
      dimMismatch: 0,
      needsRepair: 0,
    };
  }

  const trace = zeroedBucket();
  const policy = zeroedBucket();
  const world_model = zeroedBucket();
  const skill = zeroedBucket();
  return { trace, policy, world_model, skill };
}

function sumEmbeddingStats(
byKind: EmbeddingMaintenanceStats["byKind"],
key: "totalSlots" | "ready" | "missing" | "dimMismatch" | "needsRepair",
Expand Down
48 changes: 42 additions & 6 deletions apps/memos-local-plugin/core/retrieval/tier2-trace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export async function runTier2(deps: Tier2Deps, input: Tier2Input): Promise<Tier
const startedAt = Date.now();
try {
const searchFilters = buildTraceSearchFilters(deps, input);
const vectorFilters = buildVectorSearchFilters(deps, searchFilters);
const vecPoolSize = Math.max(
config.tier2TopK,
Math.ceil(config.tier2TopK * config.candidatePoolFactor),
Expand All @@ -100,8 +101,8 @@ export async function runTier2(deps: Tier2Deps, input: Tier2Input): Promise<Tier
const summaryHits = repos.traces.searchByVector(input.queryVec, vecPoolSize, {
kind: "summary",
anyOfTags: tagsForStorage,
where: searchFilters.where,
params: searchFilters.params,
where: vectorFilters.where,
params: vectorFilters.params,
hardCap: vecPoolSize * 4,
});
mergeChannelHits(blended, summaryHits, "vec_summary", input.queryVec);
Expand All @@ -110,8 +111,8 @@ export async function runTier2(deps: Tier2Deps, input: Tier2Input): Promise<Tier
const actionHits = repos.traces.searchByVector(input.queryVec, vecPoolSize, {
kind: "action",
anyOfTags: tagsForStorage,
where: searchFilters.where,
params: searchFilters.params,
where: vectorFilters.where,
params: vectorFilters.params,
hardCap: vecPoolSize * 4,
});
mergeChannelHits(blended, actionHits, "vec_action", input.queryVec);
Expand All @@ -125,8 +126,8 @@ export async function runTier2(deps: Tier2Deps, input: Tier2Input): Promise<Tier
log.debug("tag_filter_relaxed", { tags: tagsForStorage });
const retry = repos.traces.searchByVector(input.queryVec, vecPoolSize, {
kind: "summary",
where: searchFilters.where,
params: searchFilters.params,
where: vectorFilters.where,
params: vectorFilters.params,
hardCap: vecPoolSize * 4,
});
mergeChannelHits(blended, retry, "vec_summary", input.queryVec);
Expand Down Expand Up @@ -287,6 +288,41 @@ function buildTraceSearchFilters(
return { where: parts.join(" AND "), params };
}

/**
 * Derives the filters for the Tier-2 vector (cosine) searches from the
 * filters produced by {@link buildTraceSearchFilters}.
 *
 * When `deps.config.vectorScanMaxAgeMs` is a finite number greater than
 * zero, a `ts >= @vector_scan_min_ts` clause is AND-ed onto the base
 * `where`, with the cutoff bound to `deps.now() - vectorScanMaxAgeMs`
 * (issue #1929). This keeps the brute-force scan over
 * `traces.vec_summary` / `vec_action` to rows newer than the cutoff.
 * Per the design note on this feature, the keyword channels (FTS /
 * pattern / structural) keep using the base filters, so older traces
 * stay reachable by exact-text recall.
 *
 * The `base` object itself is returned, unchanged, when the setting is
 * missing, not a number, `NaN`, infinite, `0` or negative. That matches
 * the legacy "scan everything" behaviour.
 *
 * @param deps Tier-2 dependencies; only `config.vectorScanMaxAgeMs` and
 *   `now()` are read here.
 * @param base Filters to extend. Never mutated.
 * @returns `base` as-is, or a new `{ where, params }` pair that includes
 *   the time-window bound.
 */
function buildVectorSearchFilters(
  deps: Tier2Deps,
  base: { where?: string; params?: Record<string, unknown> },
): { where?: string; params?: Record<string, unknown> } {
  const windowMs = deps.config.vectorScanMaxAgeMs;
  const windowEnabled =
    typeof windowMs === "number" && Number.isFinite(windowMs) && windowMs > 0;
  if (!windowEnabled) return base;

  const cutoffTs = deps.now() - (windowMs as number);
  const timeClause = "ts >= @vector_scan_min_ts";
  return {
    where: base.where ? `${base.where} AND ${timeClause}` : timeClause,
    params: { ...(base.params ?? {}), vector_scan_min_ts: cutoffTs },
  };
}

function resolveTagFilter(
tags: readonly string[],
config: RetrievalConfig,
Expand Down
10 changes: 10 additions & 0 deletions apps/memos-local-plugin/core/retrieval/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,16 @@ export interface RetrievalConfig {
llmFilterCandidateBodyChars?: number;
/** Low-cost mode: retrieve raw trace memories only. */
lightweightMemory?: boolean;
/**
* Tier-2 vector scan time-window bound (ms). When > 0, the cosine
* scan path only considers `traces` rows whose `ts` is within the
* last `vectorScanMaxAgeMs` milliseconds. Set to `0` to disable
* (legacy full-table brute-force scan). See
* https://github.com/MemTensor/MemOS/issues/1929 for the original
* starvation report and `core/config/schema.ts` for the YAML
* binding + validation rules.
*/
vectorScanMaxAgeMs?: number;
}

/**
Expand Down
Loading
Loading