Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,29 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
is deferred to ``_ensure_episode()`` (called from the first
``on_turn_start``), so the actual user message can be passed as
the episode's initial text instead of a generic placeholder.

Idempotency (issue #1910): the host occasionally re-enters
``initialize()`` on the same provider instance (plugin reload,
session restart). Without the close-before-spawn guard below the
previous ``MemosBridgeClient`` would be replaced by reference
only, orphaning the previous Node subprocess and accumulating
a fresh ``bridge.cjs`` per turn.
"""
self._session_id = session_id or self._session_id
self._hermes_home = str(kwargs.get("hermes_home") or "")
self._platform = str(kwargs.get("platform") or "cli")
self._agent_identity = str(kwargs.get("agent_identity") or "hermes")
if self._bridge is not None:
prev_pid = getattr(self._bridge, "pid", "?")
logger.info(
"MemOS: initialize() invoked while bridge already exists; "
"closing previous bridge (pid=%s) before respawn",
prev_pid,
)
old_bridge = self._bridge
self._bridge = None
with contextlib.suppress(Exception):
old_bridge.close()
try:
ensure_bridge_running()
except Exception as err:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@

HOST_HANDLER_WAIT_SECONDS = 5.0

# ─── Module-level singleton tracker ─────────────────────────────────────
# Each entry maps a ``(agent, no_viewer)`` key to the most-recent active
# ``MemosBridgeClient`` for that slot. When a new client is constructed
# for an existing key, the previous client is closed synchronously so the
# Node-side ``bridge.cjs`` subprocess does not leak.
#
# This is the Python-side guard against issue #1910 (bridge process leak:
# every turn spawns new bridge.cjs). Defence in depth on the Node side
# lives in ``bridge.cts`` via ``bridge-stdio.pid``.
_ACTIVE_CLIENTS: dict[tuple[str, bool], MemosBridgeClient] = {}
_ACTIVE_CLIENTS_LOCK = threading.Lock()


def _installed_node_binary(plugin_root: Path) -> str | None:
marker = plugin_root / ".memos-node-bin"
Expand Down Expand Up @@ -156,6 +168,40 @@ def __init__(
)
self._stderr_reader.start()

# Singleton tracking (issue #1910). Register ourselves as the
# active client for ``(agent, no_viewer)`` and reap any previous
# holder synchronously so its subprocess does not leak. The reap
# happens AFTER our reader threads are running, so the previous
# client's ``close()`` (which closes stdin and waits for exit)
# cannot interfere with our own startup.
self._singleton_agent = agent
self._singleton_no_viewer = bool(no_viewer)
previous = self._register_active()
if previous is not None and previous is not self:
prev_pid = getattr(previous, "pid", "?")
logger.info(
"MemOS: closing previous bridge client (pid=%s) before adopting new one (pid=%s)",
prev_pid,
self.pid,
)
with contextlib.suppress(Exception):
previous.close()

def _register_active(self) -> MemosBridgeClient | None:
    """Claim this client's registry slot and hand back the prior occupant.

    The slot is keyed by ``(self._singleton_agent, self._singleton_no_viewer)``.
    The lookup and the overwrite happen under ``_ACTIVE_CLIENTS_LOCK`` so the
    read-then-replace is a single step with respect to other lock holders.

    Returns:
        The client previously stored under this key, or ``None`` when the
        slot was empty.
    """
    slot = (self._singleton_agent, self._singleton_no_viewer)
    with _ACTIVE_CLIENTS_LOCK:
        displaced = _ACTIVE_CLIENTS.get(slot)
        _ACTIVE_CLIENTS[slot] = self
    return displaced

def _unregister_active(self) -> None:
    """Release this client's registry slot, but only if it still holds it.

    The slot is keyed by ``(self._singleton_agent, self._singleton_no_viewer)``.
    When another object is stored under that key (or nothing is), the
    registry is left untouched.
    """
    slot = (self._singleton_agent, self._singleton_no_viewer)
    with _ACTIVE_CLIENTS_LOCK:
        if _ACTIVE_CLIENTS.get(slot) is not self:
            # Someone else owns the slot (or it is already empty).
            return
        del _ACTIVE_CLIENTS[slot]

@property
def pid(self) -> int:
"""Return the PID of the bridge subprocess."""
Expand Down Expand Up @@ -244,6 +290,12 @@ def close(self) -> None:
self._closed = True
self._host_handlers_cv.notify_all()

# Drop self from the module-level singleton tracker (issue #1910)
# BEFORE the potentially-slow stdin/SIGTERM/SIGKILL dance. We
# only evict the registry slot if we still own it — a newer
# client that displaced us must remain reachable.
self._unregister_active()

pid = self.pid

# 1. Close stdin (triggers bridge's graceful exit)
Expand Down
21 changes: 17 additions & 4 deletions apps/memos-local-plugin/bridge.cts
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,22 @@ function parseArgs(argv: readonly string[]): BridgeArgs {
// ─── PID file singleton guard ───────────────────────────────────────────
// Prevents bridge process accumulation: each new bridge that wants to
// own the viewer port kills the previous holder via its PID file.
// `--no-viewer` (headless) bridges skip this PID file entirely — they don't
// need the port and should coexist with the daemon that owns it.
// `--no-viewer` (headless) bridges use a SEPARATE PID file so they can
// reap their own predecessors without colliding with the viewer daemon
// that owns the port. Without the headless reap, every Hermes turn that
// respawns the Python adapter leaks an old bridge.cjs (issue #1910).

const PID_FILENAME = "bridge.pid";
const STDIO_PID_FILENAME = "bridge-stdio.pid";

function pidFilePath(agent: string): string {
function pidFilePath(agent: string, filename: string = PID_FILENAME): string {
const agentHome = agent === "hermes" ? ".hermes" : ".openclaw";
return path.join(
process.env.HOME ?? "/tmp",
agentHome,
"memos-plugin",
"daemon",
PID_FILENAME,
filename,
);
}

Expand Down Expand Up @@ -146,14 +149,24 @@ async function main(): Promise<void> {

// ─── Singleton: kill previous bridge that owns the viewer port ───
const pidPath = pidFilePath(args.agent);
const stdioPidPath = pidFilePath(args.agent, STDIO_PID_FILENAME);
const ownsViewerPort = args.daemon || !args.noViewer;
const removeOwnedPidFile = () => {
if (ownsViewerPort) removePidFile(pidPath);
// Headless bridges own a separate PID slot; remove it on exit too.
if (args.noViewer) removePidFile(stdioPidPath);
};
if (ownsViewerPort) {
killExistingBridge(pidPath);
writePidFile(pidPath);
}
if (args.noViewer) {
// Reap any previous --no-viewer bridge for this agent. This is the
// headless counterpart to the viewer-port singleton above and the
// Node-side defense against issue #1910 (bridge process leak).
killExistingBridge(stdioPidPath);
writePidFile(stdioPidPath);
}

// Lazy-import ESM core. Using dynamic import so this file remains
// CommonJS and stays `require`-able.
Expand Down
86 changes: 86 additions & 0 deletions apps/memos-local-plugin/tests/python/test_bridge_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,16 @@ def _factory(*args, **kwargs):
)
self._popen_patch.start()
self._which_patch.start()
# Hermetic singleton: clear any tracking left over from prior tests
# so each test starts with a fresh module-level registry.
bridge_client_mod._ACTIVE_CLIENTS.clear()

def tearDown(self) -> None:
    """Undo per-test patches and reset the module-level client registry."""
    fake = self._fake
    if fake is not None:
        # Flag the fake stdout as done (presumably so any reader blocked on
        # it can finish — confirm against the fake's implementation).
        fake.stdout._done = True
    for active_patch in (self._popen_patch, self._which_patch):
        active_patch.stop()
    # Leave no registered clients behind for the next test.
    bridge_client_mod._ACTIVE_CLIENTS.clear()

def test_request_returns_result_on_success(self) -> None:
client = MemosBridgeClient(bridge_path="/tmp/bridge.cts")
Expand Down Expand Up @@ -295,6 +299,45 @@ def test_close_is_idempotent(self) -> None:
client.close()
client.close() # second call must not raise

def test_module_singleton_closes_previous_client_same_agent(self) -> None:
    """A second client for the same agent closes the first one.

    Regression for issue #1910: a fresh bridge subprocess could be spawned
    without its predecessor being closed. Constructing a new
    ``MemosBridgeClient`` for the same ``(agent, no_viewer)`` slot is
    expected to close whichever client held that slot before.
    """
    bridge_path = "/tmp/bridge.cts"

    older = MemosBridgeClient(bridge_path=bridge_path)
    self.assertFalse(older._closed)

    newer = MemosBridgeClient(bridge_path=bridge_path)
    # Building ``newer`` must have closed ``older`` and left itself open.
    self.assertTrue(older._closed)
    self.assertFalse(newer._closed)

    newer.close()
    self.assertTrue(newer._closed)

def test_module_singleton_independent_for_distinct_agents(self) -> None:
    """Clients for different agents occupy separate slots and coexist."""
    bridge_path = "/tmp/bridge.cts"
    hermes_client = MemosBridgeClient(bridge_path=bridge_path, agent="hermes")
    openclaw_client = MemosBridgeClient(bridge_path=bridge_path, agent="openclaw")

    # Neither construction may have closed the other agent's client.
    self.assertFalse(hermes_client._closed)
    self.assertFalse(openclaw_client._closed)

    hermes_client.close()
    openclaw_client.close()

def test_close_unregisters_active_client_only_when_still_current(self) -> None:
    """Closing a displaced client leaves the newer registry entry intact."""
    bridge_path = "/tmp/bridge.cts"
    stale = MemosBridgeClient(bridge_path=bridge_path)
    current = MemosBridgeClient(bridge_path=bridge_path)
    registry = bridge_client_mod._ACTIVE_CLIENTS
    slot = (current._singleton_agent, current._singleton_no_viewer)

    # ``stale`` was already closed when ``current`` was constructed; closing
    # it again must not disturb the slot now held by ``current``.
    stale.close()
    self.assertIs(registry.get(slot), current)

    # Closing the actual owner empties the slot.
    current.close()
    self.assertIsNone(registry.get(slot))

def test_stdio_bridge_starts_without_viewer_by_default(self) -> None:
client = MemosBridgeClient(bridge_path="/tmp/bridge.cts")
assert self._fake is not None
Expand Down Expand Up @@ -396,6 +439,49 @@ def test_handle_tool_call_fails_gracefully_without_bridge(self) -> None:
parsed = json.loads(res)
self.assertIn("error", parsed)

def test_initialize_closes_pre_existing_bridge(self) -> None:
    """A repeated ``initialize()`` closes the bridge it replaces.

    Regression for #1910: when ``initialize()`` is called a second time on
    the same provider, the bridge stored in ``self._bridge`` must be closed
    before the replacement is adopted, rather than being overwritten.
    """

    # Minimal bridge stand-in that records whether ``close()`` was called.
    class TrackedBridge:
        def __init__(self) -> None:
            self.closed = False
            self.pid = 4242

        def register_host_handler(self, *_a, **_kw) -> None:  # pragma: no cover
            pass

        def request(self, method, params=None, **_kwargs):
            if method != "session.open":
                return {}
            return {"sessionId": (params or {}).get("sessionId", "sess")}

        def close(self) -> None:
            self.closed = True

    provider = self._provider_mod.MemTensorProvider()

    first = TrackedBridge()
    second = TrackedBridge()
    # Handed out in order, one per patched constructor call.
    queued: list[TrackedBridge] = [first, second]

    def _factory(*_a, **_kw) -> TrackedBridge:
        return queued.pop(0)

    with patch("memos_provider.MemosBridgeClient", side_effect=_factory):
        provider.initialize("sess-A", hermes_home="/tmp/h", platform="cli")
        self.assertIs(provider._bridge, first)

        provider.initialize("sess-A", hermes_home="/tmp/h", platform="cli")
        # The first bridge must have been closed before the second was
        # adopted; otherwise the earlier bridge leaks (issue #1910).
        self.assertTrue(first.closed)
        self.assertIs(provider._bridge, second)

def test_handle_tool_call_routes_all_exposed_tools(self) -> None:
p = self._provider_mod.MemTensorProvider()
bridge = RecordingBridge()
Expand Down
Loading