Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 259 additions & 35 deletions apps/memos-local-plugin/core/capture/capture.ts

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions apps/memos-local-plugin/core/capture/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ export interface CaptureConfig {
alphaScoring: boolean;
synthReflections: boolean;
llmConcurrency: number;
/**
* Hard cap for LLM calls made by one topic-end reflect pass. This bounds
* startup recovery / dirty-episode replay so a single large episode cannot
* generate unbounded paid requests.
*/
maxReflectLlmCalls: number;
/**
* Startup-recovered episodes are reconstructed from already-persisted
* traces. Any "orphan" during that replay is usually a matching drift, not
* genuinely missing content. Keep inserts disabled by default to avoid
* duplicating historical rows while still allowing operators to opt in.
*/
maxRecoveryOrphanInserts: number;
/**
* V7 §3.2 batched variant. Controls when reflection synthesis + α scoring
* collapse into ONE LLM call per episode instead of N per-step calls.
Expand Down
6 changes: 6 additions & 0 deletions apps/memos-local-plugin/core/config/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ export const DEFAULT_CONFIG: ResolvedConfig = {
// still contribute useful α values.
synthReflections: true,
llmConcurrency: 4,
// Bound topic-end reflect work so dirty startup recovery cannot replay
// a huge historical episode into thousands of paid LLM calls.
maxReflectLlmCalls: 128,
// Recovered episodes are reconstructed from persisted traces; replay
// orphans are usually matching drift, so do not insert duplicate rows.
maxRecoveryOrphanInserts: 0,
// V7 §3.2 batched variant. With "auto" we issue a single LLM call
// per episode for both reflection synth and α scoring as long as
// the episode is short enough — this collapses 2N per-step calls
Expand Down
4 changes: 4 additions & 0 deletions apps/memos-local-plugin/core/config/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ const AlgorithmSchema = Type.Object({
synthReflections: Bool(false),
/** Concurrency for α scoring + synth LLM calls (per_step mode only). */
llmConcurrency: NumberInRange(4, 1, 32),
/** Hard cap for one topic-end reflect pass, including recovery replay. */
maxReflectLlmCalls: NumberInRange(128, 0, 10_000),
/** Max orphan trace inserts allowed during startup-recovered replay. */
maxRecoveryOrphanInserts: NumberInRange(0, 0, 10_000),
/**
* V7 §3.2 batched variant. When/how to fold per-step reflection synth +
* α scoring into one episode-level LLM call:
Expand Down
134 changes: 82 additions & 52 deletions apps/memos-local-plugin/core/llm/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,12 @@ export function createLlmClientWithProvider(
}

function throwBreakerOpen(): never {
throw makeBreakerOpenError();
}

function makeBreakerOpenError(): MemosError {
const until = circuitOpenUntil ?? breakerNow();
throw new MemosError(
return new MemosError(
ERROR_CODES.LLM_UNAVAILABLE,
`circuit_open: ${circuitOpenedReason ?? "terminal provider error"}`,
{
Expand All @@ -177,6 +181,14 @@ export function createLlmClientWithProvider(
);
}

function canUseHostFallback(): boolean {
return (
config.fallbackToHost === true &&
provider.name !== "host" &&
getHostLlmBridge() !== null
);
}

/**
* Mark a successful primary-provider call. We **do not** clear
* `lastError` / `lastFallbackAt` here — the viewer picks the most
Expand Down Expand Up @@ -258,12 +270,18 @@ export function createLlmClientWithProvider(
op: string,
): Promise<{ completion: LlmCompletion }> {
// ── Circuit breaker short-circuit ──
// When the breaker is open we never reach the provider, so no paid
// request is generated. We still emit (coalesced) `circuit_open`
// status rows so the Logs viewer / Overview can surface that
// suppression is happening.
// When the breaker is open we never reach the primary provider, so
// no request is generated against the broken paid API. We still
// emit (coalesced) `circuit_open` status rows so the Logs viewer /
// Overview can surface that suppression is happening.
if (breakerIsOpen()) {
maybeEmitCircuitOpenStatus(opts, op);
if (canUseHostFallback()) {
return callHostFallback(makeBreakerOpenError(), messages, input, opts, op, {
keepBreakerOpen: true,
notifyError: false,
});
}
throwBreakerOpen();
}
requests++;
Expand Down Expand Up @@ -295,62 +313,21 @@ export function createLlmClientWithProvider(
return { completion };
} catch (err) {
if (shouldFallback(err, config, provider.name)) {
const hostProv = new HostLlmProvider();
const primaryTerminal = breakerIsTerminal(err);
if (primaryTerminal) breakerTrip(err);
try {
const res = await hostProv.complete(messages, input, makeCtx(opts, asProviderLog(rootLogger.child({ channel: "llm.host" }))));
hostFallbacks++;
facadeLog.warn("host.fallback", {
from: provider.name,
op,
reason: summarizeErr(err),
});
const completion: LlmCompletion = {
text: res.text,
provider: provider.name,
model: config.model,
finishReason: res.finishReason,
usage: res.usage,
servedBy: "host_fallback",
durationMs: res.durationMs,
};
record(completion, op, messages);
// The primary provider is still broken even though the host
// bridge saved this call. Tag the slot yellow (`lastFallbackAt`)
// and surface the upstream error to the user via the
// system_error log so they can see *why* fallback engaged.
//
// The circuit breaker stays CLOSED here: from the caller's
// perspective the call was rescued, and tripping the breaker
// on host-fallback success would defeat the point of the
// bridge (it exists precisely to keep going when the primary
// is down). The fallback path also already records the
// primary's failure, so the operator still sees the red trail
// in the Logs viewer.
const fallbackAt = markFallback(err);
breakerRecordSuccess();
notifyOnError(err);
notifyStatus({
status: "fallback",
provider: provider.name,
model: config.model,
message: summarizeErrMessage(err),
code: err instanceof MemosError ? err.code : undefined,
at: fallbackAt,
durationMs: completion.durationMs,
fallbackProvider: "host",
op,
episodeId: opts?.episodeId,
phase: opts?.phase,
return await callHostFallback(err, messages, input, opts, op, {
keepBreakerOpen: primaryTerminal,
notifyError: true,
});
return { completion };
} catch (hostErr) {
failures++;
const failAt = markFail(hostErr);
facadeLog.error("host.fallback_failed", {
primary: summarizeErr(err),
host: summarizeErr(hostErr),
});
// Primary AND host bridge both failed terminally. Trip on the
// Primary AND host bridge both failed. Trip on a terminal
// primary error (the one the operator typically needs to fix
// — host bridge failures are usually transient stdio issues).
if (breakerIsTerminal(err)) breakerTrip(err);
Expand Down Expand Up @@ -617,6 +594,59 @@ export function createLlmClientWithProvider(
}
}

async function callHostFallback(
primaryErr: unknown,
messages: LlmMessage[],
input: ProviderCallInput,
opts: LlmCallOptions | undefined,
op: string,
behavior: { keepBreakerOpen: boolean; notifyError: boolean },
): Promise<{ completion: LlmCompletion }> {
const hostProv = new HostLlmProvider();
const res = await hostProv.complete(
messages,
input,
makeCtx(opts, asProviderLog(rootLogger.child({ channel: "llm.host" }))),
);
hostFallbacks++;
facadeLog.warn("host.fallback", {
from: provider.name,
op,
reason: summarizeErr(primaryErr),
});
const completion: LlmCompletion = {
text: res.text,
provider: provider.name,
model: config.model,
finishReason: res.finishReason,
usage: res.usage,
servedBy: "host_fallback",
durationMs: res.durationMs,
};
record(completion, op, messages);
// The primary provider is still broken even though the host bridge
// saved this call. Keep the breaker open for terminal primary
// errors so later calls can go straight to host fallback without
// touching the paid provider again.
const fallbackAt = markFallback(primaryErr);
if (!behavior.keepBreakerOpen) breakerRecordSuccess();
if (behavior.notifyError) notifyOnError(primaryErr);
notifyStatus({
status: "fallback",
provider: provider.name,
model: config.model,
message: summarizeErrMessage(primaryErr),
code: primaryErr instanceof MemosError ? primaryErr.code : undefined,
at: fallbackAt,
durationMs: completion.durationMs,
fallbackProvider: "host",
op,
episodeId: opts?.episodeId,
phase: opts?.phase,
});
return { completion };
}

const client: LlmClient = {
provider: provider.name,
model: config.model,
Expand Down
14 changes: 14 additions & 0 deletions apps/memos-local-plugin/core/storage/repos/traces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,20 @@ export function makeTracesRepo(db: StorageDb) {
return db.prepare<typeof params, RawTraceRow>(sql).all(params).map(mapRow);
},

/**
* Full episode-scoped trace fetch with no pagination cap. Capture
* reconciliation must see every row; the normal `list()` path applies
* viewer pagination and can misclassify rows past the page as missing.
*/
listAllForEpisode(episodeId: EpisodeId): TraceRow[] {
const sql =
`SELECT ${COLUMNS.join(", ")} FROM traces WHERE episode_id = @episode_id ORDER BY ts ASC`;
return db
.prepare<{ episode_id: string }, RawTraceRow>(sql)
.all({ episode_id: episodeId })
.map(mapRow);
},

/**
* Total row count matching the same filter (no limit/offset).
* Used by list endpoints so the viewer can show "Page N of M".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ function baseConfig(overrides: Partial<CaptureConfig> = {}): CaptureConfig {
alphaScoring: true,
synthReflections: true,
llmConcurrency: 2,
maxReflectLlmCalls: 128,
maxRecoveryOrphanInserts: 0,
batchMode: "auto",
batchThreshold: 12,
reflectionContextMode: "none",
Expand Down
Loading