Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import re
import sys
import threading
import weakref
import time

from pathlib import Path
Expand Down Expand Up @@ -1509,6 +1510,21 @@ def on_session_end(self, messages: list[dict[str, Any]]) -> None: # type: ignor
with contextlib.suppress(Exception):
self._bridge.request("session.close", {"sessionId": self._session_id})

def __del__(self) -> None:
# Safety net — if shutdown() was never called (e.g. caller forgot,
# Hermes agent routed model change with self.agent = None), clean
# up the bridge subprocess and keepalive thread on GC.
if self._bridge is not None or (
self._bridge_keepalive_thread is not None
and self._bridge_keepalive_thread.is_alive()
):
logger.warning(
"MemOS: __del__ cleaning up leaked provider "
"— shutdown() was never called"
)
with contextlib.suppress(Exception):
self.shutdown()

def shutdown(self) -> None: # type: ignore[override]
self._bridge_keepalive_stop.set()
if self._bridge_keepalive_thread and self._bridge_keepalive_thread.is_alive():
Expand Down Expand Up @@ -1790,20 +1806,37 @@ def _start_bridge_keepalive(self) -> None:
return
self._bridge_keepalive_stop.clear()

_self_ref = weakref.ref(self)

def _run() -> None:
while not self._bridge_keepalive_stop.wait(5.0):
if not self._ensure_bridge(self._session_id, timeout=10.0):
while True:
# Stop signal set (e.g. shutdown called by another thread).
# When self is garbage-collected the weakref resolves to None
# and we exit gracefully instead of keeping the thread + bridge
# subprocess alive forever.
provider = _self_ref()
if provider is None:
break
if provider._bridge_keepalive_stop.wait(5.0):
break
if not provider._ensure_bridge(provider._session_id, timeout=10.0):
continue
try:
assert self._bridge is not None
self._bridge.request("core.health", {}, timeout=10.0)
assert provider._bridge is not None
provider._bridge.request("core.health", {}, timeout=10.0)
except Exception as err:
if self._is_transport_closed(err):
logger.info("MemOS: bridge keepalive reconnecting after transport close")
if provider._is_transport_closed(err):
logger.info(
"MemOS: bridge keepalive reconnecting after transport close"
)
with contextlib.suppress(Exception):
self._reconnect_bridge(self._session_id, timeout=10.0)
provider._reconnect_bridge(
provider._session_id, timeout=10.0
)
else:
logger.debug("MemOS: bridge keepalive failed — %s", err)
logger.debug(
"MemOS: bridge keepalive failed — %s", err
)

self._bridge_keepalive_thread = threading.Thread(
target=_run,
Expand Down
43 changes: 42 additions & 1 deletion apps/memos-local-plugin/core/memory/l2/l2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import { L2_INDUCTION_PROMPT } from "../../llm/prompts/l2-induction.js";
import { associateTraces } from "./associate.js";
import { makeCandidatePool } from "./candidate-pool.js";
import { buildPolicyRow, induceDraft } from "./induce.js";
import { applyGain, computeGain, smoothGain } from "./gain.js";
import { applyGain, computeGain, nextStatus, smoothGain } from "./gain.js";
import { signatureOf } from "./signature.js";
import { tracePolicySimilarity } from "./similarity.js";
import type {
Expand Down Expand Up @@ -455,6 +455,47 @@ export async function runL2(
timings.gain = Date.now() - t0;
}

// ─── Step 5: Re-evaluate untouched candidates ────────────────────────
// A candidate policy that was induced in a previous episode but no longer
// matches any trace will never enter `touched` and therefore never have
// `nextStatus()` run against it. Without this sweep, it stays `candidate`
// forever even though its stored gain/support already satisfy the
// promotion thresholds.
{
const untouchedCandidates = repos.policies.list({ status: "candidate" });
for (const policy of untouchedCandidates) {
if (touched.has(policy.id)) continue; // already handled in Step 4
const next = nextStatus({
currentStatus: policy.status,
support: policy.support,
gain: policy.gain,
thresholds,
});
if (next !== policy.status) {
repos.policies.updateStats(policy.id, {
support: policy.support,
gain: policy.gain,
status: next,
updatedAt: input.now ?? Date.now(),
});
emit(bus, {
kind: "l2.policy.updated",
episodeId: input.episodeId,
policyId: policy.id,
status: next,
support: policy.support,
gain: policy.gain,
});
log.info("run.recheck_candidate_promoted", {
policyId: policy.id,
status: next,
support: policy.support,
gain: policy.gain,
});
}
}
}

timings.persist = 0; // reserved for future split
const completedAt = Date.now();
timings.total = completedAt - startedAt;
Expand Down
1 change: 1 addition & 0 deletions apps/memos-local-plugin/core/pipeline/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,7 @@ export function createPipeline(deps: PipelineDeps): PipelineHandle {
await subs.l3.drain();
await nextTick();
await subs.skills.flush();
await subs.skills.lifecycleTick();
await subs.feedback.flush();
await embeddingRetryWorker.flush();
}
Expand Down
16 changes: 16 additions & 0 deletions apps/memos-local-plugin/core/skill/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,22 @@ export function shouldArchiveIdle(
return skill.eta < cfg.minEtaForRetrieval;
}

/**
* Decide whether a non-repair candidate skill should skip trial
* and become active immediately. `eta` already encodes the
* gain / support of the source policies (initialised at
* crystallisation and kept current by reward drift), so we
* don't need a separate gain / support gate.
*/
export function shouldPromoteCandidate(
skill: SkillRow,
cfg: SkillConfig,
): boolean {
if (skill.status !== "candidate") return false;
if (skill.repairOrigin) return false;
return skill.eta >= cfg.minEtaForRetrieval;
}

function clamp01(n: number): number {
if (!Number.isFinite(n)) return 0;
if (n < 0) return 0;
Expand Down
16 changes: 15 additions & 1 deletion apps/memos-local-plugin/core/skill/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,5 +207,19 @@ export function attachSkillSubscriber(
}
}

return { dispose, runOnce, applyFeedback, flush };
/** Periodic lifecycle pass: promote eligible candidate skills to active. */
async function lifecycleTick(): Promise<void> {
const candidates = deps.repos.skills.list({ status: "candidate", limit: 500 });
const minEta = deps.config.minEtaForRetrieval ?? 0.1;
for (const s of candidates) {
if (s.status !== "candidate") continue;
if (s.repairOrigin) continue;
if ((s.eta ?? 0) < minEta) continue;
deps.repos.skills.setStatus(s.id, "active");
log.info("skill.auto_promoted", { skillId: s.id, name: s.name, eta: s.eta });
try { deps.bus?.emit({ kind: "skill.status.changed", skillId: s.id, from: "candidate", to: "active", reason: "auto_lifecycle" }); } catch { /* best-effort */ }
}
}

return { dispose, runOnce, applyFeedback, flush, lifecycleTick };
}