From 5a0a3fc39696f2e4273b1180977417ceaf516261 Mon Sep 17 00:00:00 2001 From: MemTensor AutoDev Date: Tue, 16 Jun 2026 01:07:03 +0800 Subject: [PATCH] fix(memos-local-plugin): close previous bridge in initialize() to stop process leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hermes calls `MemTensorProvider.initialize()` on every reconnect / new session. Each call spawned a fresh `MemosBridgeClient` (and therefore a new `--no-viewer` Node subprocess) but never closed the previous one referenced by `self._bridge`. The orphaned bridges accumulated at ~93 MB each, indefinitely — the headless-bridge reaper does not catch them because their parent (Hermes dashboard / gateway) stays alive. Close any pre-existing `self._bridge` at the top of `initialize()`, mirroring the safe pattern already used by `_reconnect_bridge()` (swallowing exceptions so a stuck Node subprocess never blocks re-init). Added 3 regression tests covering: the leak scenario, the no-previous-bridge happy path, and isolation when the old bridge's close() raises. Fixes #1927 --- .../hermes/memos_provider/__init__.py | 19 ++++ .../python/test_hermes_provider_pipeline.py | 99 +++++++++++++++++++ 2 files changed, 118 insertions(+) diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py index fd9d0d827..25c22ea96 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py @@ -305,6 +305,25 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov ``on_turn_start``), so the actual user message can be passed as the episode's initial text instead of a generic placeholder. """ + # Hermes can call `initialize()` multiple times across a single + # parent process (e.g. on reconnect or a new session). Each call + # spawns a fresh `MemosBridgeClient` (and therefore a new + # `--no-viewer` Node subprocess); if we simply overwrite + # `self._bridge` we leak the old subprocess — its parent stays + # alive so `_reap_stale_headless_bridges_locked` never collects + # it. Close the previous bridge first, mirroring the safe pattern + # already used by `_reconnect_bridge()`. See #1927. + previous_bridge = self._bridge + if previous_bridge is not None: + old_pid = getattr(previous_bridge, "pid", "?") + logger.info( + "MemOS: closing previous bridge (pid=%s) before re-init", + old_pid, + ) + with contextlib.suppress(Exception): + previous_bridge.close() + self._bridge = None + self._session_id = session_id or self._session_id self._hermes_home = str(kwargs.get("hermes_home") or "") self._platform = str(kwargs.get("platform") or "cli") diff --git a/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py b/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py index 149cea586..1c8cb6a6c 100644 --- a/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py +++ b/apps/memos-local-plugin/tests/python/test_hermes_provider_pipeline.py @@ -123,6 +123,105 @@ def test_lifecycle_persists_turn_and_closes_real_episode(self) -> None: self.assertTrue(bridge.closed) + def test_initialize_closes_previous_bridge_before_spawning_new_one(self) -> None: + """Regression for #1927: re-calling ``initialize()`` must close the + previously-spawned bridge instead of leaking it. + + Hermes calls ``initialize()`` on every reconnect / new session. + Before the fix, each call replaced ``self._bridge`` with a fresh + ``MemosBridgeClient`` (and thus a new ``--no-viewer`` Node + subprocess) without closing the old one, leaking ~93 MB per call. + """ + first_bridge = FakeBridge() + second_bridge = FakeBridge() + bridge_attempts = [first_bridge, second_bridge] + + with ( + patch("memos_provider.ensure_bridge_running", return_value=True), + patch("memos_provider.ensure_viewer_daemon", return_value=True), + patch( + "memos_provider.MemosBridgeClient", + side_effect=lambda: bridge_attempts.pop(0), + ), + ): + provider = memos_provider.MemTensorProvider() + provider.initialize("session-1") + self.assertIs(provider._bridge, first_bridge) + self.assertFalse(first_bridge.closed) + + # Second initialize (e.g. reconnect / new Hermes session) must + # close the previous bridge before allocating a new one. + provider.initialize("session-2") + self.assertTrue( + first_bridge.closed, + "previous bridge was not closed — leak (#1927)", + ) + self.assertIs(provider._bridge, second_bridge) + self.assertFalse(second_bridge.closed) + + provider.shutdown() + + # And the second bridge must be cleaned up by shutdown(), so we + # know we did not somehow drop the new reference along the way. + self.assertTrue(second_bridge.closed) + + def test_initialize_when_no_previous_bridge_does_not_call_close(self) -> None: + """First-ever ``initialize()`` must not blow up on the missing + previous bridge — it should just spawn one.""" + bridge = FakeBridge() + with ( + patch("memos_provider.ensure_bridge_running", return_value=True), + patch("memos_provider.ensure_viewer_daemon", return_value=True), + patch("memos_provider.MemosBridgeClient", return_value=bridge), + ): + provider = memos_provider.MemTensorProvider() + self.assertIsNone(provider._bridge) + + provider.initialize("fresh-session") + self.assertIs(provider._bridge, bridge) + self.assertFalse(bridge.closed) + + provider.shutdown() + + self.assertTrue(bridge.closed) + + def test_initialize_swallows_exception_from_old_bridge_close(self) -> None: + """If the previous bridge's ``close()`` raises (e.g. stuck Node + subprocess), ``initialize()`` must still allocate the new bridge + and proceed — never leak just because cleanup is flaky.""" + + class StuckCloseBridge(FakeBridge): + def close(self) -> None: + # Mark closed so we can still assert it was attempted, + # but raise to mimic a misbehaving subprocess teardown. + self.closed = True + raise RuntimeError("simulated stuck bridge close") + + stuck_bridge = StuckCloseBridge() + healthy_bridge = FakeBridge() + bridge_attempts = [stuck_bridge, healthy_bridge] + + with ( + patch("memos_provider.ensure_bridge_running", return_value=True), + patch("memos_provider.ensure_viewer_daemon", return_value=True), + patch( + "memos_provider.MemosBridgeClient", + side_effect=lambda: bridge_attempts.pop(0), + ), + ): + provider = memos_provider.MemTensorProvider() + provider.initialize("session-1") + self.assertIs(provider._bridge, stuck_bridge) + + # Must not propagate the close() failure to the caller. + provider.initialize("session-2") + self.assertTrue(stuck_bridge.closed) + self.assertIs(provider._bridge, healthy_bridge) + + provider.shutdown() + + self.assertTrue(healthy_bridge.closed) + def test_sync_turn_recovers_when_initial_bridge_open_timed_out(self) -> None: failed_bridge = FailingSessionOpenBridge() recovered_bridge = FakeBridge()