diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
index fd9d0d827..a67392ca5 100644
--- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
+++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
@@ -51,6 +51,7 @@
 import re
 import sys
 import threading
 import time
+import weakref
 
 from pathlib import Path
@@ -1509,6 +1510,35 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor
         with contextlib.suppress(Exception):
             self._bridge.request("session.close", {"sessionId": self._session_id})
 
+    def __del__(self) -> None:
+        """Clean up the bridge and keepalive thread if shutdown() was skipped.
+
+        Safety net for callers that dropped the provider without calling
+        shutdown() (e.g. the Hermes agent routed a model change with
+        self.agent = None).
+        """
+        # __del__ also runs when __init__ raised part-way, so these attributes
+        # may not exist yet; read them defensively.
+        bridge = getattr(self, "_bridge", None)
+        thread = getattr(self, "_bridge_keepalive_thread", None)
+        if bridge is None and (thread is None or not thread.is_alive()):
+            return
+        # Logging can fail during interpreter teardown; that must not prevent
+        # the cleanup below.
+        with contextlib.suppress(Exception):
+            logger.warning(
+                "MemOS: __del__ cleaning up leaked provider "
+                "— shutdown() was never called"
+            )
+        if thread is threading.current_thread():
+            # Finalized on the keepalive thread itself (it released the last
+            # reference). Clear the attribute so shutdown()'s
+            # `if self._bridge_keepalive_thread and ...` guard is false and it
+            # never tries to wait for the thread it is running on.
+            self._bridge_keepalive_thread = None
+        with contextlib.suppress(Exception):
+            self.shutdown()
+
     def shutdown(self) -> None:  # type: ignore[override]
         self._bridge_keepalive_stop.set()
         if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive():
@@ -1790,20 +1820,43 @@ def _start_bridge_keepalive(self) -> None:
             return
 
         self._bridge_keepalive_stop.clear()
+        # The keepalive thread must not keep the provider alive: it captures
+        # only a weak reference plus the stop Event (which holds no reference
+        # back to the provider), so a provider dropped without shutdown() can
+        # still be garbage-collected.
+        _self_ref = weakref.ref(self)
+        _stop = self._bridge_keepalive_stop
+
+        def _probe(provider: Any) -> None:
+            # One keepalive round: ensure the bridge is up, then ping it.
+            if not provider._ensure_bridge(provider._session_id, timeout=10.0):
+                return
+            try:
+                assert provider._bridge is not None
+                provider._bridge.request("core.health", {}, timeout=10.0)
+            except Exception as err:
+                if provider._is_transport_closed(err):
+                    logger.info(
+                        "MemOS: bridge keepalive reconnecting after transport close"
+                    )
+                    with contextlib.suppress(Exception):
+                        provider._reconnect_bridge(provider._session_id, timeout=10.0)
+                else:
+                    logger.debug("MemOS: bridge keepalive failed — %s", err)
+
         def _run() -> None:
-            while not self._bridge_keepalive_stop.wait(5.0):
-                if not self._ensure_bridge(self._session_id, timeout=10.0):
-                    continue
-                try:
-                    assert self._bridge is not None
-                    self._bridge.request("core.health", {}, timeout=10.0)
-                except Exception as err:
-                    if self._is_transport_closed(err):
-                        logger.info("MemOS: bridge keepalive reconnecting after transport close")
-                        with contextlib.suppress(Exception):
-                            self._reconnect_bridge(self._session_id, timeout=10.0)
-                    else:
-                        logger.debug("MemOS: bridge keepalive failed — %s", err)
+            # Wait on the Event alone and re-resolve the weak reference each
+            # round, so no strong reference survives across the sleep.
+            while not _stop.wait(5.0):
+                provider = _self_ref()
+                if provider is None:
+                    # Collected without shutdown(); nothing left to keep alive.
+                    break
+                try:
+                    _probe(provider)
+                finally:
+                    # Drop the strong reference before sleeping again.
+                    del provider
 
         self._bridge_keepalive_thread = threading.Thread(
             target=_run,
diff --git a/apps/memos-local-plugin/core/memory/l2/l2.ts b/apps/memos-local-plugin/core/memory/l2/l2.ts
index 9c6685409..903502b32 100644
--- a/apps/memos-local-plugin/core/memory/l2/l2.ts
+++ b/apps/memos-local-plugin/core/memory/l2/l2.ts
@@ -34,7 +34,7 @@ import { L2_INDUCTION_PROMPT } from "../../llm/prompts/l2-induction.js";
 import { associateTraces } from "./associate.js";
 import { makeCandidatePool } from "./candidate-pool.js";
 import { buildPolicyRow, induceDraft } from "./induce.js";
-import { applyGain, computeGain, smoothGain } from "./gain.js";
+import { applyGain, computeGain, nextStatus, smoothGain } from "./gain.js";
 import { signatureOf } from "./signature.js";
 import { tracePolicySimilarity } from "./similarity.js";
 import type {
@@ -455,6 +455,47 @@ export async function runL2(
     timings.gain = Date.now() - t0;
   }
 
+  // ─── Step 5: Re-evaluate untouched candidates ────────────────────────
+  // A candidate policy that was induced in a previous episode but no longer
+  // matches any trace will never enter `touched` and therefore never have
+  // `nextStatus()` run against it. Without this sweep, it stays `candidate`
+  // forever even though its stored gain/support already satisfy the
+  // promotion thresholds.
+  {
+    const untouchedCandidates = repos.policies.list({ status: "candidate" });
+    for (const policy of untouchedCandidates) {
+      if (touched.has(policy.id)) continue; // already handled in Step 4
+      const next = nextStatus({
+        currentStatus: policy.status,
+        support: policy.support,
+        gain: policy.gain,
+        thresholds,
+      });
+      if (next !== policy.status) {
+        repos.policies.updateStats(policy.id, {
+          support: policy.support,
+          gain: policy.gain,
+          status: next,
+          updatedAt: input.now ?? Date.now(),
+        });
+        emit(bus, {
+          kind: "l2.policy.updated",
+          episodeId: input.episodeId,
+          policyId: policy.id,
+          status: next,
+          support: policy.support,
+          gain: policy.gain,
+        });
+        log.info("run.recheck_candidate_promoted", {
+          policyId: policy.id,
+          status: next,
+          support: policy.support,
+          gain: policy.gain,
+        });
+      }
+    }
+  }
+
   timings.persist = 0; // reserved for future split
   const completedAt = Date.now();
   timings.total = completedAt - startedAt;