diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index fd9d0d827..a67392ca5 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -51,6 +51,7 @@ import re import sys import threading +import weakref import time from pathlib import Path @@ -1509,6 +1510,21 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor with contextlib.suppress(Exception): self._bridge.request("session.close", {"sessionId": self._session_id}) + def __del__(self) -> None: + # Safety net — if shutdown() was never called (e.g. caller forgot, + # Hermes agent routed model change with self.agent = None), clean + # up the bridge subprocess and keepalive thread on GC. + if self._bridge is not None or ( + self._bridge_keepalive_thread is not None + and self._bridge_keepalive_thread.is_alive() + ): + logger.warning( + "MemOS: __del__ cleaning up leaked provider " + "— shutdown() was never called" + ) + with contextlib.suppress(Exception): + self.shutdown() + def shutdown(self) -> None: # type: ignore[override] self._bridge_keepalive_stop.set() if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive(): @@ -1790,20 +1806,37 @@ def _start_bridge_keepalive(self) -> None: return self._bridge_keepalive_stop.clear() + _self_ref = weakref.ref(self) + def _run() -> None: - while not self._bridge_keepalive_stop.wait(5.0): - if not self._ensure_bridge(self._session_id, timeout=10.0): + while True: + # Stop signal set (e.g. shutdown called by another thread). + # When self is garbage-collected the weakref resolves to None + # and we exit gracefully instead of keeping the thread + bridge + # subprocess alive forever. + provider = _self_ref() + if provider is None: + break + if provider._bridge_keepalive_stop.wait(5.0): + break + if not provider._ensure_bridge(provider._session_id, timeout=10.0): continue try: - assert self._bridge is not None - self._bridge.request("core.health", {}, timeout=10.0) + assert provider._bridge is not None + provider._bridge.request("core.health", {}, timeout=10.0) except Exception as err: - if self._is_transport_closed(err): - logger.info("MemOS: bridge keepalive reconnecting after transport close") + if provider._is_transport_closed(err): + logger.info( + "MemOS: bridge keepalive reconnecting after transport close" + ) with contextlib.suppress(Exception): - self._reconnect_bridge(self._session_id, timeout=10.0) + provider._reconnect_bridge( + provider._session_id, timeout=10.0 + ) else: - logger.debug("MemOS: bridge keepalive failed — %s", err) + logger.debug( + "MemOS: bridge keepalive failed — %s", err + ) self._bridge_keepalive_thread = threading.Thread( target=_run, diff --git a/apps/memos-local-plugin/core/memory/l2/l2.ts b/apps/memos-local-plugin/core/memory/l2/l2.ts index 9c6685409..903502b32 100644 --- a/apps/memos-local-plugin/core/memory/l2/l2.ts +++ b/apps/memos-local-plugin/core/memory/l2/l2.ts @@ -34,7 +34,7 @@ import { L2_INDUCTION_PROMPT } from "../../llm/prompts/l2-induction.js"; import { associateTraces } from "./associate.js"; import { makeCandidatePool } from "./candidate-pool.js"; import { buildPolicyRow, induceDraft } from "./induce.js"; -import { applyGain, computeGain, smoothGain } from "./gain.js"; +import { applyGain, computeGain, nextStatus, smoothGain } from "./gain.js"; import { signatureOf } from "./signature.js"; import { tracePolicySimilarity } from "./similarity.js"; import type { @@ -455,6 +455,47 @@ export async function runL2( timings.gain = Date.now() - t0; } + // ─── Step 5: Re-evaluate untouched candidates ──────────────────────── + // A candidate policy that was induced in a previous episode but no longer + // matches any trace will never enter `touched` and therefore never have + // `nextStatus()` run against it. Without this sweep, it stays `candidate` + // forever even though its stored gain/support already satisfy the + // promotion thresholds. + { + const untouchedCandidates = repos.policies.list({ status: "candidate" }); + for (const policy of untouchedCandidates) { + if (touched.has(policy.id)) continue; // already handled in Step 4 + const next = nextStatus({ + currentStatus: policy.status, + support: policy.support, + gain: policy.gain, + thresholds, + }); + if (next !== policy.status) { + repos.policies.updateStats(policy.id, { + support: policy.support, + gain: policy.gain, + status: next, + updatedAt: input.now ?? Date.now(), + }); + emit(bus, { + kind: "l2.policy.updated", + episodeId: input.episodeId, + policyId: policy.id, + status: next, + support: policy.support, + gain: policy.gain, + }); + log.info("run.recheck_candidate_promoted", { + policyId: policy.id, + status: next, + support: policy.support, + gain: policy.gain, + }); + } + } + } + timings.persist = 0; // reserved for future split const completedAt = Date.now(); timings.total = completedAt - startedAt; diff --git a/apps/memos-local-plugin/core/pipeline/orchestrator.ts b/apps/memos-local-plugin/core/pipeline/orchestrator.ts index 75dc7e244..1e57b4802 100644 --- a/apps/memos-local-plugin/core/pipeline/orchestrator.ts +++ b/apps/memos-local-plugin/core/pipeline/orchestrator.ts @@ -1367,6 +1367,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle { await subs.l3.drain(); await nextTick(); await subs.skills.flush(); + await subs.skills.lifecycleTick(); await subs.feedback.flush(); await embeddingRetryWorker.flush(); } diff --git a/apps/memos-local-plugin/core/skill/lifecycle.ts b/apps/memos-local-plugin/core/skill/lifecycle.ts index 49f43bb02..8a8e6daa1 100644 --- a/apps/memos-local-plugin/core/skill/lifecycle.ts +++ b/apps/memos-local-plugin/core/skill/lifecycle.ts @@ -201,6 +201,22 @@ export function shouldArchiveIdle( return skill.eta < cfg.minEtaForRetrieval; } +/** + * Decide whether a non-repair candidate skill should skip trial + * and become active immediately. `eta` already encodes the + * gain / support of the source policies (initialised at + * crystallisation and kept current by reward drift), so we + * don't need a separate gain / support gate. + */ +export function shouldPromoteCandidate( + skill: SkillRow, + cfg: SkillConfig, +): boolean { + if (skill.status !== "candidate") return false; + if (skill.repairOrigin) return false; + return skill.eta >= cfg.minEtaForRetrieval; +} + function clamp01(n: number): number { if (!Number.isFinite(n)) return 0; if (n < 0) return 0; diff --git a/apps/memos-local-plugin/core/skill/subscriber.ts b/apps/memos-local-plugin/core/skill/subscriber.ts index 52ae6a4f6..664863f92 100644 --- a/apps/memos-local-plugin/core/skill/subscriber.ts +++ b/apps/memos-local-plugin/core/skill/subscriber.ts @@ -207,5 +207,19 @@ export function attachSkillSubscriber( } } - return { dispose, runOnce, applyFeedback, flush }; + /** Periodic lifecycle pass: promote eligible candidate skills to active. */ + async function lifecycleTick(): Promise { + const candidates = deps.repos.skills.list({ status: "candidate", limit: 500 }); + const minEta = deps.config.minEtaForRetrieval ?? 0.1; + for (const s of candidates) { + if (s.status !== "candidate") continue; + if (s.repairOrigin) continue; + if ((s.eta ?? 0) < minEta) continue; + deps.repos.skills.setStatus(s.id, "active"); + log.info("skill.auto_promoted", { skillId: s.id, name: s.name, eta: s.eta }); + try { deps.bus?.emit({ kind: "skill.status.changed", skillId: s.id, from: "candidate", to: "active", reason: "auto_lifecycle" }); } catch { /* best-effort */ } + } + } + + return { dispose, runOnce, applyFeedback, flush, lifecycleTick }; }