diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index fd9d0d827..306630637 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -304,11 +304,29 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov is deferred to ``_ensure_episode()`` (called from the first ``on_turn_start``), so the actual user message can be passed as the episode's initial text instead of a generic placeholder. + + Idempotency (issue #1910): the host occasionally re-enters + ``initialize()`` on the same provider instance (plugin reload, + session restart). Without the close-before-spawn guard below the + previous ``MemosBridgeClient`` would be replaced by reference + only, orphaning the previous Node subprocess and accumulating + a fresh ``bridge.cjs`` per turn. """ self._session_id = session_id or self._session_id self._hermes_home = str(kwargs.get("hermes_home") or "") self._platform = str(kwargs.get("platform") or "cli") self._agent_identity = str(kwargs.get("agent_identity") or "hermes") + if self._bridge is not None: + prev_pid = getattr(self._bridge, "pid", "?") + logger.info( + "MemOS: initialize() invoked while bridge already exists; " + "closing previous bridge (pid=%s) before respawn", + prev_pid, + ) + old_bridge = self._bridge + self._bridge = None + with contextlib.suppress(Exception): + old_bridge.close() try: ensure_bridge_running() except Exception as err: diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index 5b986ec66..b6f6e0c53 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -32,6 +32,18 @@ HOST_HANDLER_WAIT_SECONDS = 5.0 +# ─── Module-level singleton tracker ───────────────────────────────────── +# Each entry maps a ``(agent, no_viewer)`` key to the most-recent active +# ``MemosBridgeClient`` for that slot. When a new client is constructed +# for an existing key, the previous client is closed synchronously so the +# Node-side ``bridge.cjs`` subprocess does not leak. +# +# This is the Python-side guard against issue #1910 (bridge process leak: +# every turn spawns new bridge.cjs). Defence in depth on the Node side +# lives in ``bridge.cts`` via ``bridge-stdio.pid``. +_ACTIVE_CLIENTS: dict[tuple[str, bool], MemosBridgeClient] = {} +_ACTIVE_CLIENTS_LOCK = threading.Lock() + def _installed_node_binary(plugin_root: Path) -> str | None: marker = plugin_root / ".memos-node-bin" @@ -156,6 +168,40 @@ def __init__( ) self._stderr_reader.start() + # Singleton tracking (issue #1910). Register ourselves as the + # active client for ``(agent, no_viewer)`` and reap any previous + # holder synchronously so its subprocess does not leak. The reap + # happens AFTER our reader threads are running, so the previous + # client's ``close()`` (which closes stdin and waits for exit) + # cannot interfere with our own startup. + self._singleton_agent = agent + self._singleton_no_viewer = bool(no_viewer) + previous = self._register_active() + if previous is not None and previous is not self: + prev_pid = getattr(previous, "pid", "?") + logger.info( + "MemOS: closing previous bridge client (pid=%s) before adopting new one (pid=%s)", + prev_pid, + self.pid, + ) + with contextlib.suppress(Exception): + previous.close() + + def _register_active(self) -> MemosBridgeClient | None: + """Register self as the active singleton; return the displaced client.""" + key = (self._singleton_agent, self._singleton_no_viewer) + with _ACTIVE_CLIENTS_LOCK: + previous = _ACTIVE_CLIENTS.get(key) + _ACTIVE_CLIENTS[key] = self + return previous + + def _unregister_active(self) -> None: + """Remove self from the active registry if we are still the current entry.""" + key = (self._singleton_agent, self._singleton_no_viewer) + with _ACTIVE_CLIENTS_LOCK: + if _ACTIVE_CLIENTS.get(key) is self: + _ACTIVE_CLIENTS.pop(key, None) + @property def pid(self) -> int: """Return the PID of the bridge subprocess.""" @@ -244,6 +290,12 @@ def close(self) -> None: self._closed = True self._host_handlers_cv.notify_all() + # Drop self from the module-level singleton tracker (issue #1910) + # BEFORE the potentially-slow stdin/SIGTERM/SIGKILL dance. We + # only evict the registry slot if we still own it — a newer + # client that displaced us must remain reachable. + self._unregister_active() + pid = self.pid # 1. Close stdin (triggers bridge's graceful exit) diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 81848acf7..13e00d045 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -70,19 +70,22 @@ function parseArgs(argv: readonly string[]): BridgeArgs { // ─── PID file singleton guard ─────────────────────────────────────────── // Prevents bridge process accumulation: each new bridge that wants to // own the viewer port kills the previous holder via its PID file. -// `--no-viewer` (headless) bridges skip this PID file entirely — they don't -// need the port and should coexist with the daemon that owns it. +// `--no-viewer` (headless) bridges use a SEPARATE PID file so they can +// reap their own predecessors without colliding with the viewer daemon +// that owns the port. Without the headless reap, every Hermes turn that +// respawns the Python adapter leaks an old bridge.cjs (issue #1910). const PID_FILENAME = "bridge.pid"; +const STDIO_PID_FILENAME = "bridge-stdio.pid"; -function pidFilePath(agent: string): string { +function pidFilePath(agent: string, filename: string = PID_FILENAME): string { const agentHome = agent === "hermes" ? ".hermes" : ".openclaw"; return path.join( process.env.HOME ?? "/tmp", agentHome, "memos-plugin", "daemon", - PID_FILENAME, + filename, ); } @@ -146,14 +149,24 @@ async function main(): Promise { // ─── Singleton: kill previous bridge that owns the viewer port ─── const pidPath = pidFilePath(args.agent); + const stdioPidPath = pidFilePath(args.agent, STDIO_PID_FILENAME); const ownsViewerPort = args.daemon || !args.noViewer; const removeOwnedPidFile = () => { if (ownsViewerPort) removePidFile(pidPath); + // Headless bridges own a separate PID slot; remove it on exit too. + if (args.noViewer) removePidFile(stdioPidPath); }; if (ownsViewerPort) { killExistingBridge(pidPath); writePidFile(pidPath); } + if (args.noViewer) { + // Reap any previous --no-viewer bridge for this agent. This is the + // headless counterpart to the viewer-port singleton above and the + // Node-side defense against issue #1910 (bridge process leak). + killExistingBridge(stdioPidPath); + writePidFile(stdioPidPath); + } // Lazy-import ESM core. Using dynamic import so this file remains // CommonJS and stays `require`-able. diff --git a/apps/memos-local-plugin/tests/python/test_bridge_client.py b/apps/memos-local-plugin/tests/python/test_bridge_client.py index b47d5660e..2becc4992 100644 --- a/apps/memos-local-plugin/tests/python/test_bridge_client.py +++ b/apps/memos-local-plugin/tests/python/test_bridge_client.py @@ -256,12 +256,16 @@ def _factory(*args, **kwargs): ) self._popen_patch.start() self._which_patch.start() + # Hermetic singleton: clear any tracking left over from prior tests + # so each test starts with a fresh module-level registry. + bridge_client_mod._ACTIVE_CLIENTS.clear() def tearDown(self) -> None: if self._fake is not None: self._fake.stdout._done = True self._popen_patch.stop() self._which_patch.stop() + bridge_client_mod._ACTIVE_CLIENTS.clear() def test_request_returns_result_on_success(self) -> None: client = MemosBridgeClient(bridge_path="/tmp/bridge.cts") @@ -295,6 +299,45 @@ def test_close_is_idempotent(self) -> None: client.close() client.close() # second call must not raise + def test_module_singleton_closes_previous_client_same_agent(self) -> None: + """Constructing a second client with the same agent must reap the first. + + Regression for issue #1910: each turn the Hermes adapter could + spawn a fresh bridge subprocess without closing its predecessor, + accumulating 4+ processes per session. The singleton tracker in + ``MemosBridgeClient`` prevents that by closing any active client + for the same ``(agent, no_viewer)`` slot at construction time. + """ + first = MemosBridgeClient(bridge_path="/tmp/bridge.cts") + self.assertFalse(first._closed) + second = MemosBridgeClient(bridge_path="/tmp/bridge.cts") + # The new constructor must have reaped the previous one. + self.assertTrue(first._closed) + self.assertFalse(second._closed) + second.close() + self.assertTrue(second._closed) + + def test_module_singleton_independent_for_distinct_agents(self) -> None: + """A bridge for a different agent must not reap an unrelated bridge.""" + hermes = MemosBridgeClient(bridge_path="/tmp/bridge.cts", agent="hermes") + openclaw = MemosBridgeClient(bridge_path="/tmp/bridge.cts", agent="openclaw") + self.assertFalse(hermes._closed) + self.assertFalse(openclaw._closed) + hermes.close() + openclaw.close() + + def test_close_unregisters_active_client_only_when_still_current(self) -> None: + """A stale close() must not evict the newer registered client.""" + first = MemosBridgeClient(bridge_path="/tmp/bridge.cts") + second = MemosBridgeClient(bridge_path="/tmp/bridge.cts") + # First was already closed by second's __init__. Closing it again is + # a no-op and must not touch the registry's current entry (second). + first.close() + key = (second._singleton_agent, second._singleton_no_viewer) + self.assertIs(bridge_client_mod._ACTIVE_CLIENTS.get(key), second) + second.close() + self.assertIsNone(bridge_client_mod._ACTIVE_CLIENTS.get(key)) + def test_stdio_bridge_starts_without_viewer_by_default(self) -> None: client = MemosBridgeClient(bridge_path="/tmp/bridge.cts") assert self._fake is not None @@ -396,6 +439,49 @@ def test_handle_tool_call_fails_gracefully_without_bridge(self) -> None: parsed = json.loads(res) self.assertIn("error", parsed) + def test_initialize_closes_pre_existing_bridge(self) -> None: + """Calling initialize twice must reap the previous bridge. + + Regression for #1910: every Hermes turn could call `initialize()` + a second time (re-entry from the host plugin loader), overwriting + `self._bridge` and leaking the previous Node subprocess. + """ + + class TrackedBridge: + def __init__(self) -> None: + self.closed = False + self.pid = 4242 + + def register_host_handler(self, *_a, **_kw) -> None: # pragma: no cover + pass + + def request(self, method, params=None, **_kwargs): + if method == "session.open": + return {"sessionId": (params or {}).get("sessionId", "sess")} + return {} + + def close(self) -> None: + self.closed = True + + p = self._provider_mod.MemTensorProvider() + + first = TrackedBridge() + second = TrackedBridge() + constructed: list[TrackedBridge] = [first, second] + + def _factory(*_a, **_kw) -> TrackedBridge: + return constructed.pop(0) + + with patch("memos_provider.MemosBridgeClient", side_effect=_factory): + p.initialize("sess-A", hermes_home="/tmp/h", platform="cli") + self.assertIs(p._bridge, first) + p.initialize("sess-A", hermes_home="/tmp/h", platform="cli") + # The second initialize must close the first bridge before + # adopting the new one; otherwise the previous subprocess + # leaks (issue #1910). + self.assertTrue(first.closed) + self.assertIs(p._bridge, second) + def test_handle_tool_call_routes_all_exposed_tools(self) -> None: p = self._provider_mod.MemTensorProvider() bridge = RecordingBridge()