From e5d3a4aa571aa104cd039a19ee97d2e7ad4105ed Mon Sep 17 00:00:00 2001 From: a844810597 Date: Tue, 16 Jun 2026 11:49:30 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix(memos-local):=20L2=20candidate=20policy?= =?UTF-8?q?=20stuck=20in=20candidate=20=E2=80=94=20add=20Step=205=20rechec?= =?UTF-8?q?k?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A candidate policy whose support/gain already satisfy the promotion thresholds (support >= minSupport && gain >= minGain) but is no longer cosine-matched by any trace in subsequent episodes will never enter 'touched' and therefore nextStatus() is never executed. Add Step 5 after Step 4 to scan all candidate policies and promote any whose stored support/gain already meet the thresholds. Fixes #1932 --- apps/memos-local-plugin/core/memory/l2/l2.ts | 43 +++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/apps/memos-local-plugin/core/memory/l2/l2.ts b/apps/memos-local-plugin/core/memory/l2/l2.ts index 9c6685409..903502b32 100644 --- a/apps/memos-local-plugin/core/memory/l2/l2.ts +++ b/apps/memos-local-plugin/core/memory/l2/l2.ts @@ -34,7 +34,7 @@ import { L2_INDUCTION_PROMPT } from "../../llm/prompts/l2-induction.js"; import { associateTraces } from "./associate.js"; import { makeCandidatePool } from "./candidate-pool.js"; import { buildPolicyRow, induceDraft } from "./induce.js"; -import { applyGain, computeGain, smoothGain } from "./gain.js"; +import { applyGain, computeGain, nextStatus, smoothGain } from "./gain.js"; import { signatureOf } from "./signature.js"; import { tracePolicySimilarity } from "./similarity.js"; import type { @@ -455,6 +455,47 @@ export async function runL2( timings.gain = Date.now() - t0; } + // ─── Step 5: Re-evaluate untouched candidates ──────────────────────── + // A candidate policy that was induced in a previous episode but no longer + // matches any trace will never enter `touched` and therefore never have + // `nextStatus()` run 
against it. Without this sweep, it stays `candidate` + // forever even though its stored gain/support already satisfy the + // promotion thresholds. + { + const untouchedCandidates = repos.policies.list({ status: "candidate" }); + for (const policy of untouchedCandidates) { + if (touched.has(policy.id)) continue; // already handled in Step 4 + const next = nextStatus({ + currentStatus: policy.status, + support: policy.support, + gain: policy.gain, + thresholds, + }); + if (next !== policy.status) { + repos.policies.updateStats(policy.id, { + support: policy.support, + gain: policy.gain, + status: next, + updatedAt: input.now ?? Date.now(), + }); + emit(bus, { + kind: "l2.policy.updated", + episodeId: input.episodeId, + policyId: policy.id, + status: next, + support: policy.support, + gain: policy.gain, + }); + log.info("run.recheck_candidate_promoted", { + policyId: policy.id, + status: next, + support: policy.support, + gain: policy.gain, + }); + } + } + } + timings.persist = 0; // reserved for future split const completedAt = Date.now(); timings.total = completedAt - startedAt; From 3d70752c7416c33e00f9e89acb533689d6862fcc Mon Sep 17 00:00:00 2001 From: a844810597 Date: Tue, 16 Jun 2026 17:20:14 +0800 Subject: [PATCH 2/2] fix(memos-provider): prevent bridge subprocess leak via weakref + __del__ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The keepalive thread closure in _start_bridge_keepalive captured self directly, creating a reference cycle that prevented GC from collecting the provider. Combined with the missing __del__, a leaked provider would leave its bridge subprocess (Popen) and keepalive thread alive indefinitely. Two complementary fixes: 1. weakref — break the reference cycle so GC can collect the provider when all external references are dropped. 2. __del__ — safety net that calls shutdown() during GC when the caller forgot to invoke it explicitly (e.g. 
model/routing change paths that assign self.agent = None without
calling shutdown).

Both are required: weakref enables GC collection, __del__ ensures cleanup
happens when collection occurs.

The keepalive loop holds a strong reference to the provider only while a
health check is in flight and releases it before every wait on the stop
event; otherwise the thread itself would keep the provider alive and the
weakref could never resolve to None.
---
 .../hermes/memos_provider/__init__.py         | 68 ++++++++++++++++---
 1 file changed, 60 insertions(+), 8 deletions(-)

diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
index fd9d0d827..a67392ca5 100644
--- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
+++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
@@ -51,6 +51,7 @@
 import re
 import sys
 import threading
+import weakref
 import time
 
 from pathlib import Path
@@ -1509,6 +1510,26 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None:  # type: ignor
         with contextlib.suppress(Exception):
             self._bridge.request("session.close", {"sessionId": self._session_id})
 
+    def __del__(self) -> None:
+        # Safety net — if shutdown() was never called (e.g. caller forgot,
+        # Hermes agent routed model change with self.agent = None), clean
+        # up the bridge subprocess and keepalive thread on GC.
+        #
+        # The whole body is suppressed: a finalizer must not raise, and it
+        # also runs on instances whose __init__ failed before the
+        # attributes below were assigned (hence the getattr defaults).
+        with contextlib.suppress(Exception):
+            thread = getattr(self, "_bridge_keepalive_thread", None)
+            thread_alive = thread is not None and thread.is_alive()
+            if getattr(self, "_bridge", None) is None and not thread_alive:
+                return
+            logger.warning(
+                "MemOS: __del__ cleaning up leaked provider "
+                "— shutdown() was never called"
+            )
+            if thread is threading.current_thread():
+                # The keepalive thread dropped the last reference itself.
+                # shutdown() checks this attribute before touching the
+                # thread (presumably to join it — TODO confirm), and a
+                # thread cannot join itself, so detach it first; the stop
+                # event set by shutdown() still ends the loop.
+                self._bridge_keepalive_thread = None
+            self.shutdown()
+
     def shutdown(self) -> None:  # type: ignore[override]
         self._bridge_keepalive_stop.set()
         if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive():
@@ -1790,20 +1817,45 @@ def _start_bridge_keepalive(self) -> None:
             return
         self._bridge_keepalive_stop.clear()
 
+        _self_ref = weakref.ref(self)
+        # Bound directly (not via the provider) so the thread can sleep on
+        # the event without holding a strong reference to the provider.
+        stop_event = self._bridge_keepalive_stop
+
         def _run() -> None:
-            while not self._bridge_keepalive_stop.wait(5.0):
-                if not self._ensure_bridge(self._session_id, timeout=10.0):
+            while True:
+                # Release the previous iteration's strong reference before
+                # sleeping. If it stayed bound across the wait, the thread
+                # would pin the provider for its whole lifetime and the
+                # weakref could never resolve to None.
+                provider = None
+                # Stop signal set (e.g. shutdown called by another thread).
+                if stop_event.wait(5.0):
+                    break
+                # When self is garbage-collected the weakref resolves to None
+                # and we exit gracefully instead of keeping the thread + bridge
+                # subprocess alive forever.
+                provider = _self_ref()
+                if provider is None:
+                    break
+                if not provider._ensure_bridge(provider._session_id, timeout=10.0):
                     continue
                 try:
-                    assert self._bridge is not None
-                    self._bridge.request("core.health", {}, timeout=10.0)
+                    assert provider._bridge is not None
+                    provider._bridge.request("core.health", {}, timeout=10.0)
                 except Exception as err:
-                    if self._is_transport_closed(err):
-                        logger.info("MemOS: bridge keepalive reconnecting after transport close")
+                    if provider._is_transport_closed(err):
+                        logger.info(
+                            "MemOS: bridge keepalive reconnecting after transport close"
+                        )
                         with contextlib.suppress(Exception):
-                            self._reconnect_bridge(self._session_id, timeout=10.0)
+                            provider._reconnect_bridge(
+                                provider._session_id, timeout=10.0
+                            )
                     else:
-                        logger.debug("MemOS: bridge keepalive failed — %s", err)
+                        logger.debug(
+                            "MemOS: bridge keepalive failed — %s", err
+                        )
 
         self._bridge_keepalive_thread = threading.Thread(
             target=_run,