Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,25 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
``on_turn_start``), so the actual user message can be passed as
the episode's initial text instead of a generic placeholder.
"""
# Hermes can call `initialize()` multiple times across a single
# parent process (e.g. on reconnect or a new session). Each call
# spawns a fresh `MemosBridgeClient` (and therefore a new
# `--no-viewer` Node subprocess); if we simply overwrite
# `self._bridge` we leak the old subprocess — its parent stays
# alive so `_reap_stale_headless_bridges_locked` never collects
# it. Close the previous bridge first, mirroring the safe pattern
# already used by `_reconnect_bridge()`. See #1927.
previous_bridge = self._bridge
if previous_bridge is not None:
old_pid = getattr(previous_bridge, "pid", "?")
logger.info(
"MemOS: closing previous bridge (pid=%s) before re-init",
old_pid,
)
with contextlib.suppress(Exception):
previous_bridge.close()
self._bridge = None

self._session_id = session_id or self._session_id
self._hermes_home = str(kwargs.get("hermes_home") or "")
self._platform = str(kwargs.get("platform") or "cli")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,105 @@ def test_lifecycle_persists_turn_and_closes_real_episode(self) -> None:

self.assertTrue(bridge.closed)

def test_initialize_closes_previous_bridge_before_spawning_new_one(self) -> None:
"""Regression for #1927: re-calling ``initialize()`` must close the
previously-spawned bridge instead of leaking it.

Hermes calls ``initialize()`` on every reconnect / new session.
Before the fix, each call replaced ``self._bridge`` with a fresh
``MemosBridgeClient`` (and thus a new ``--no-viewer`` Node
subprocess) without closing the old one, leaking ~93 MB per call.
"""
first_bridge = FakeBridge()
second_bridge = FakeBridge()
bridge_attempts = [first_bridge, second_bridge]

with (
patch("memos_provider.ensure_bridge_running", return_value=True),
patch("memos_provider.ensure_viewer_daemon", return_value=True),
patch(
"memos_provider.MemosBridgeClient",
side_effect=lambda: bridge_attempts.pop(0),
),
):
provider = memos_provider.MemTensorProvider()
provider.initialize("session-1")
self.assertIs(provider._bridge, first_bridge)
self.assertFalse(first_bridge.closed)

# Second initialize (e.g. reconnect / new Hermes session) must
# close the previous bridge before allocating a new one.
provider.initialize("session-2")
self.assertTrue(
first_bridge.closed,
"previous bridge was not closed — leak (#1927)",
)
self.assertIs(provider._bridge, second_bridge)
self.assertFalse(second_bridge.closed)

provider.shutdown()

# And the second bridge must be cleaned up by shutdown(), so we
# know we did not somehow drop the new reference along the way.
self.assertTrue(second_bridge.closed)

def test_initialize_when_no_previous_bridge_does_not_call_close(self) -> None:
"""First-ever ``initialize()`` must not blow up on the missing
previous bridge — it should just spawn one."""
bridge = FakeBridge()
with (
patch("memos_provider.ensure_bridge_running", return_value=True),
patch("memos_provider.ensure_viewer_daemon", return_value=True),
patch("memos_provider.MemosBridgeClient", return_value=bridge),
):
provider = memos_provider.MemTensorProvider()
self.assertIsNone(provider._bridge)

provider.initialize("fresh-session")
self.assertIs(provider._bridge, bridge)
self.assertFalse(bridge.closed)

provider.shutdown()

self.assertTrue(bridge.closed)

def test_initialize_swallows_exception_from_old_bridge_close(self) -> None:
"""If the previous bridge's ``close()`` raises (e.g. stuck Node
subprocess), ``initialize()`` must still allocate the new bridge
and proceed — never leak just because cleanup is flaky."""

class StuckCloseBridge(FakeBridge):
def close(self) -> None:
# Mark closed so we can still assert it was attempted,
# but raise to mimic a misbehaving subprocess teardown.
self.closed = True
raise RuntimeError("simulated stuck bridge close")

stuck_bridge = StuckCloseBridge()
healthy_bridge = FakeBridge()
bridge_attempts = [stuck_bridge, healthy_bridge]

with (
patch("memos_provider.ensure_bridge_running", return_value=True),
patch("memos_provider.ensure_viewer_daemon", return_value=True),
patch(
"memos_provider.MemosBridgeClient",
side_effect=lambda: bridge_attempts.pop(0),
),
):
provider = memos_provider.MemTensorProvider()
provider.initialize("session-1")
self.assertIs(provider._bridge, stuck_bridge)

# Must not propagate the close() failure to the caller.
provider.initialize("session-2")
self.assertTrue(stuck_bridge.closed)
self.assertIs(provider._bridge, healthy_bridge)

provider.shutdown()

self.assertTrue(healthy_bridge.closed)

def test_sync_turn_recovers_when_initial_bridge_open_timed_out(self) -> None:
failed_bridge = FailingSessionOpenBridge()
recovered_bridge = FakeBridge()
Expand Down
Loading