diff --git a/binary/src/unbound_hook/setup_cmd.py b/binary/src/unbound_hook/setup_cmd.py index f7cb9fd2..fdbb4fa9 100644 --- a/binary/src/unbound_hook/setup_cmd.py +++ b/binary/src/unbound_hook/setup_cmd.py @@ -147,6 +147,78 @@ def _atomic_write_text(path: Path, text: str) -> None: tmp.write_text(text, encoding="utf-8") os.replace(tmp, path) + +def _command_is_unbound(command, owner_path: str) -> bool: + """True only when a hook command INVOKES the Unbound hook at owner_path — + i.e. the command starts with our path in one of the forms our writers emit: + "" ... (quoted, the binary + Unix python writers) + ... (bare path) + py -3 "" ... (the Windows python writer) + plus the bare Windows-launcher form for tolerance. + + Prefix-anchored on purpose (WEB-4814): an earlier substring match would + mis-claim — and then DELETE — a *foreign* hook whose command merely + references our install path mid-string (e.g. a wrapper that execs our + binary as an argument). Trailing args/version drift after the path are + still tolerated, so idempotency is preserved. + """ + if not isinstance(command, str) or not owner_path: + return False + text = command.lstrip() + quoted = f'"{owner_path}"' + # Forms where our path is the program being run, optionally behind a + # Windows python launcher. Bare-path forms require a trailing boundary + # (whitespace or end) so "/a/foo" doesn't match a command for "/a/foobar". + if text.startswith(quoted): + return True + for launcher in ("py -3 ", "py ", "python "): + if text.startswith(launcher + quoted): + return True + if text == owner_path or text.startswith(owner_path + " "): + return True + for launcher in ("py -3 ", "py ", "python "): + rest = launcher + owner_path + if text == rest or text.startswith(rest + " "): + return True + return False + + +def _entry_is_unbound(entry, owner_substr: str) -> bool: + """True when a Claude-Code-style hooks-array entry (a {matcher?, hooks:[...]} + group) owns at least one command that INVOKES the Unbound hook binary. + + Matching is path-prefix (not exact command equality) so that re-running + setup after a path/version change still recognizes — and replaces — our own + stale entries instead of stacking a duplicate, while refusing to claim a + foreign command that only references our path mid-string (WEB-4814). + """ + if not isinstance(entry, dict): + return False + for hook in entry.get("hooks", []) or []: + if isinstance(hook, dict) and _command_is_unbound(hook.get("command"), owner_substr): + return True + return False + + +def _merge_hooks(existing_hooks, our_hooks, owner_substr: str) -> dict: + """Merge Unbound's per-event hook config into whatever hooks the editor's + managed settings already declared (other tools, hand-authored entries). + + Per event: drop any prior Unbound-owned entry (idempotency — never stack + duplicates of our own hook), preserve everyone else's entries, then append + the current Unbound entry. Fails safe: if the existing hooks block is not a + dict we start from an empty one rather than crashing the installer. + """ + merged = dict(existing_hooks) if isinstance(existing_hooks, dict) else {} + for event, our_entries in our_hooks.items(): + prior = merged.get(event) + if not isinstance(prior, list): + prior = [] + kept = [e for e in prior if not _entry_is_unbound(e, owner_substr)] + merged[event] = kept + list(our_entries) + return merged + + def _claude_hooks_config(): cmd = lambda ev: hook_command_for_event("claude-code", ev) return { @@ -252,7 +324,13 @@ def _write_claude_managed_settings(m) -> bool: if not env: del settings["env"] - settings["hooks"] = _claude_hooks_config() + # Merge our hooks into any pre-existing hooks block instead of + # overwriting it — a blind assignment here clobbers other tools' + # managed hooks (WEB-4814). Idempotent: stale Unbound entries are + # stripped (matched on the binary path) before our current one is + # appended, so re-running setup never stacks duplicates. + settings["hooks"] = _merge_hooks( + settings.get("hooks"), _claude_hooks_config(), str(HOOK_BINARY)) _atomic_write_text(settings_path, json.dumps(settings, indent=2)) gateway_key_helper = managed_dir / "anthropic_key.sh" diff --git a/binary/tests/test_setup_migration.py b/binary/tests/test_setup_migration.py index 7e0db684..3638f4c9 100644 --- a/binary/tests/test_setup_migration.py +++ b/binary/tests/test_setup_migration.py @@ -162,6 +162,133 @@ def test_setup_is_idempotent(env): assert (env["tmp"] / "managed-claude" / "managed-settings.json").read_text() == first +# --------------------------------------------------------------------------- +# WEB-4814: the managed-settings hooks writer must MERGE, not overwrite — a +# blind assignment clobbered hooks belonging to other tools / prior config. +# --------------------------------------------------------------------------- + +def _foreign_pretooluse_entry(): + """A PreToolUse entry owned by some other tool (not the Unbound binary).""" + return {"matcher": "*", "hooks": [ + {"type": "command", "command": "/usr/local/bin/other-tool pre", "timeout": 5}]} + + +def _unbound_entries(settings, event): + return [e for e in settings["hooks"].get(event, []) + if any(BIN in h.get("command", "") for h in e.get("hooks", []))] + + +def _seed_managed_settings(env, hooks): + managed = env["tmp"] / "managed-claude" + managed.mkdir(parents=True, exist_ok=True) + (managed / "managed-settings.json").write_text( + json.dumps({"model": "opus", "hooks": hooks}, indent=2)) + + +def test_setup_preserves_foreign_hooks_and_adds_unbound_once(env): + """A pre-existing foreign PreToolUse hook must survive setup, Unbound's own + PreToolUse entry must be present exactly once, and unrelated top-level keys + must be preserved.""" + foreign = _foreign_pretooluse_entry() + _seed_managed_settings(env, {"PreToolUse": [foreign]}) + + assert setup_cmd.run(["--api-key", "admin-key"]) == 0 + settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text()) + + # (a) foreign entry survives, byte-for-byte + assert foreign in settings["hooks"]["PreToolUse"] + # (b) Unbound's PreToolUse entry present exactly once + unbound = _unbound_entries(settings, "PreToolUse") + assert len(unbound) == 1 + assert unbound[0]["hooks"][0]["command"] == _cmd("claude-code", "PreToolUse") + # unrelated top-level key preserved + assert settings["model"] == "opus" + + +def test_setup_merge_is_idempotent_on_rerun(env): + """Re-running setup over a settings file that already has both a foreign + hook and a prior Unbound hook must NOT duplicate the Unbound entry, and + must keep the foreign one.""" + foreign = _foreign_pretooluse_entry() + stale_unbound = {"matcher": "*", "hooks": [ + {"type": "command", "command": _cmd("claude-code", "PreToolUse"), "timeout": 15000}]} + _seed_managed_settings(env, {"PreToolUse": [foreign, stale_unbound]}) + + assert setup_cmd.run(["--api-key", "admin-key"]) == 0 + settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text()) + assert foreign in settings["hooks"]["PreToolUse"] + assert len(_unbound_entries(settings, "PreToolUse")) == 1 + + # second run is a no-op on the hooks shape + assert setup_cmd.run(["--api-key", "admin-key"]) == 0 + settings2 = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text()) + assert foreign in settings2["hooks"]["PreToolUse"] + assert len(_unbound_entries(settings2, "PreToolUse")) == 1 + + +def test_setup_strips_stale_unbound_hook_with_changed_command(env): + """Idempotency must match by binary path, not exact command string: a prior + Unbound entry whose command differs only in trailing args is replaced, not + stacked.""" + drifted = {"matcher": "*", "hooks": [ + {"type": "command", "command": f'"{BIN}" hook claude-code PreToolUse --old-flag'}]} + _seed_managed_settings(env, {"PreToolUse": [drifted]}) + + assert setup_cmd.run(["--api-key", "admin-key"]) == 0 + settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text()) + unbound = _unbound_entries(settings, "PreToolUse") + assert len(unbound) == 1 + assert unbound[0]["hooks"][0]["command"] == _cmd("claude-code", "PreToolUse") + + +def test_setup_keeps_foreign_hook_that_references_our_path_midcommand(env): + """WEB-4814 ownership tightening: a FOREIGN hook whose command merely + contains our binary path mid-string (a wrapper that execs our binary as an + argument) is NOT Unbound-owned and MUST survive the merge — only commands + that *invoke* our binary (path-prefixed) get stripped-and-replaced. + + Fails on the pre-fix substring matcher (it would delete the wrapper); passes + after. Also asserts the genuine, args-drifted Unbound entry is replaced once. + """ + wrapper = {"matcher": "*", "hooks": [ + {"type": "command", + "command": f'/usr/local/bin/foo "{BIN}" --as-arg pre', "timeout": 5}]} + drifted_unbound = {"matcher": "*", "hooks": [ + {"type": "command", "command": f'"{BIN}" hook claude-code PreToolUse --old'}]} + _seed_managed_settings(env, {"PreToolUse": [wrapper, drifted_unbound]}) + + assert setup_cmd.run(["--api-key", "admin-key"]) == 0 + settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text()) + + entries = settings["hooks"]["PreToolUse"] + # (a) the foreign wrapper survives byte-for-byte despite referencing our path + assert wrapper in entries + # (b) by the PRODUCTION ownership matcher, exactly one entry is ours and it + # is the freshly-emitted (un-drifted) command — the wrapper is NOT ours + owned = [e for e in entries if setup_cmd._entry_is_unbound(e, BIN)] + assert len(owned) == 1 + assert owned[0]["hooks"][0]["command"] == _cmd("claude-code", "PreToolUse") + assert not setup_cmd._entry_is_unbound(wrapper, BIN) + # the drifted form is gone (replaced, not stacked) + cmds = [h["command"] for e in entries for h in e["hooks"]] + assert f'"{BIN}" hook claude-code PreToolUse --old' not in cmds + + +def test_setup_survives_malformed_existing_hooks_block(env): + """Fail-open: a hooks key that isn't a dict must not crash the installer — + Unbound's hooks are written, the malformed value is discarded.""" + managed = env["tmp"] / "managed-claude" + managed.mkdir(parents=True, exist_ok=True) + (managed / "managed-settings.json").write_text( + json.dumps({"model": "opus", "hooks": "garbage-not-a-dict"})) + + assert setup_cmd.run(["--api-key", "admin-key"]) == 0 + settings = json.loads((managed / "managed-settings.json").read_text()) + assert isinstance(settings["hooks"], dict) + assert len(_unbound_entries(settings, "PreToolUse")) == 1 + assert settings["model"] == "opus" + + # --------------------------------------------------------------------------- # WEB-4788 migration sweep fixtures: clean, half-installed, previously-binary # --------------------------------------------------------------------------- diff --git a/claude-code/hooks/mdm/setup.py b/claude-code/hooks/mdm/setup.py index 13d2ca46..719ad460 100644 --- a/claude-code/hooks/mdm/setup.py +++ b/claude-code/hooks/mdm/setup.py @@ -712,6 +712,76 @@ def rewrite_gateway_url_in_file(path: Path, gateway_url: str) -> None: debug_print(f"Could not rewrite gateway URL in {path}: {e}") +def _command_is_unbound(command, owner_path: str) -> bool: + """True only when a hook command INVOKES the Unbound hook at owner_path — + i.e. the command starts with our path in one of the forms our writers emit: + "" ... (quoted, the Unix python writer) + ... (bare path) + py -3 "" ... (the Windows python writer) + plus the bare Windows-launcher form for tolerance. + + Prefix-anchored on purpose (WEB-4814): an earlier substring match would + mis-claim — and then DELETE — a *foreign* hook whose command merely + references our install path mid-string. Trailing args/version drift after + the path are still tolerated, so idempotency is preserved.""" + if not isinstance(command, str) or not owner_path: + return False + text = command.lstrip() + quoted = f'"{owner_path}"' + if text.startswith(quoted): + return True + for launcher in ("py -3 ", "py ", "python "): + if text.startswith(launcher + quoted): + return True + if text == owner_path or text.startswith(owner_path + " "): + return True + for launcher in ("py -3 ", "py ", "python "): + rest = launcher + owner_path + if text == rest or text.startswith(rest + " "): + return True + return False + + +def _entry_is_unbound(entry, owner_substr: str) -> bool: + """True when a hooks-array entry ({matcher?, hooks:[...]}) owns at least + one command that INVOKES our managed hook script. Path-prefix match (not + exact equality) so a changed command form still matches our own entry, + while refusing to claim a foreign command that only references our path + mid-string (WEB-4814).""" + if not isinstance(entry, dict): + return False + for hook in entry.get("hooks", []) or []: + if isinstance(hook, dict) and _command_is_unbound(hook.get("command"), owner_substr): + return True + return False + + +def _merge_hooks(existing_hooks, our_hooks: dict, owner_substr: str) -> dict: + """Merge our per-event hook config into the editor's existing hooks block + instead of overwriting it (WEB-4814). Per event: drop prior Unbound-owned + entries (idempotency), keep everyone else's, append the current one. Fails + safe — a non-dict existing block is treated as empty rather than crashing.""" + merged = dict(existing_hooks) if isinstance(existing_hooks, dict) else {} + for event, our_entries in our_hooks.items(): + prior = merged.get(event) + if not isinstance(prior, list): + prior = [] + kept = [e for e in prior if not _entry_is_unbound(e, owner_substr)] + merged[event] = kept + list(our_entries) + return merged + + +def _atomic_write_text(path: Path, text: str) -> None: + """tmp + os.replace so a crash mid-write never leaves Claude Code reading a + truncated managed-settings.json — which it would silently ignore, dropping + the security control (WEB-4814 LOW-1). The tmp file lives in the same dir + so os.replace is an atomic rename on the same filesystem. File mode is left + to the caller's existing chmod step (unchanged behavior).""" + tmp = path.parent / (path.name + "." + str(os.getpid()) + ".tmp") + tmp.write_text(text, encoding="utf-8") + os.replace(str(tmp), str(path)) + + def setup_managed_hooks(gateway_url: str = DEFAULT_GATEWAY_URL) -> bool: """ Set up system-wide managed hooks for Claude Code. @@ -863,8 +933,14 @@ def _hook(entry: dict) -> dict: ] } - settings["hooks"] = hooks_config - settings_path.write_text(json.dumps(settings, indent=2), encoding="utf-8") + # Merge into any pre-existing hooks block rather than overwriting it, + # so other tools' managed hooks survive setup (WEB-4814). Idempotent: + # stale Unbound entries (matched on the script path, which covers both + # the Unix '"path"' and Windows 'py -3 "path"' command forms) are + # stripped before the current one is appended. + settings["hooks"] = _merge_hooks( + settings.get("hooks"), hooks_config, str(script_path)) + _atomic_write_text(settings_path, json.dumps(settings, indent=2)) debug_print(f"Created managed settings: {settings_path}") # Delete the gateway key helper only after the hooks settings are diff --git a/claude-code/hooks/test_setup.py b/claude-code/hooks/test_setup.py index 7e91f9fb..eaffda84 100644 --- a/claude-code/hooks/test_setup.py +++ b/claude-code/hooks/test_setup.py @@ -1,5 +1,6 @@ import unittest from unittest.mock import patch +import json import os import socket import tempfile @@ -297,5 +298,132 @@ def test_capped_home_not_advanced(self): self.assertEqual(writes, [ok_home]) +class TestMdmManagedHooksMerge(unittest.TestCase): + """WEB-4814: setup_managed_hooks must MERGE its hooks into any pre-existing + managed-settings hooks block instead of overwriting it, must add Unbound's + own entry exactly once, and must be idempotent across re-runs.""" + + @staticmethod + def _load_mdm(): + import importlib.util + spec = importlib.util.spec_from_file_location( + "mdm_setup", str(Path(__file__).parent / "mdm" / "setup.py") + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + def _run_setup(self, mdm, managed_dir): + """Run setup_managed_hooks with the network download + path discovery + stubbed, against a sandboxed managed dir. The download stub writes a + stub script so the real post-download steps (gateway rewrite, chmod) + have a file to operate on, exactly as in production.""" + def fake_download(url, dest): + Path(dest).parent.mkdir(parents=True, exist_ok=True) + Path(dest).write_text("# stub hook script\n") + return True + + with patch.object(mdm, "download_file", side_effect=fake_download), \ + patch.object(mdm, "get_managed_settings_dir", return_value=managed_dir): + ok = mdm.setup_managed_hooks() + self.assertTrue(ok) + return json.loads((managed_dir / "managed-settings.json").read_text()) + + @staticmethod + def _unbound_entries(settings, script_path, event): + s = str(script_path) + return [e for e in settings["hooks"].get(event, []) + if any(s in h.get("command", "") for h in e.get("hooks", []))] + + def test_preserves_foreign_hook_and_adds_unbound_once_idempotently(self): + mdm = self._load_mdm() + with tempfile.TemporaryDirectory() as td: + managed = Path(td) / "managed" + managed.mkdir() + script_path = managed / "hooks" / "unbound.py" + foreign = {"matcher": "*", "hooks": [ + {"type": "command", "command": "/usr/local/bin/other pre"}]} + (managed / "managed-settings.json").write_text(json.dumps( + {"model": "opus", "hooks": {"PreToolUse": [foreign]}})) + + settings = self._run_setup(mdm, managed) + # foreign survives, unrelated top-level key preserved + self.assertIn(foreign, settings["hooks"]["PreToolUse"]) + self.assertEqual(settings["model"], "opus") + # Unbound entry present exactly once + self.assertEqual( + len(self._unbound_entries(settings, script_path, "PreToolUse")), 1) + + # re-run: no duplicate, foreign still there + settings2 = self._run_setup(mdm, managed) + self.assertIn(foreign, settings2["hooks"]["PreToolUse"]) + self.assertEqual( + len(self._unbound_entries(settings2, script_path, "PreToolUse")), 1) + + def test_keeps_foreign_hook_that_references_our_path_midcommand(self): + """WEB-4814 ownership tightening: a FOREIGN hook whose command merely + contains our script path mid-string (a wrapper exec'ing it as an arg) + is NOT Unbound-owned and MUST survive the merge — only path-prefixed + commands that invoke our script get stripped-and-replaced. Fails on the + pre-fix substring matcher; passes after.""" + mdm = self._load_mdm() + with tempfile.TemporaryDirectory() as td: + managed = Path(td) / "managed" + managed.mkdir() + script_path = managed / "hooks" / "unbound.py" + sp = str(script_path) + wrapper = {"matcher": "*", "hooks": [ + {"type": "command", "command": f'/usr/local/bin/foo "{sp}" --as-arg pre'}]} + drifted_unbound = {"matcher": "*", "hooks": [ + {"type": "command", "command": f'"{sp}" --old-flag'}]} + (managed / "managed-settings.json").write_text(json.dumps( + {"model": "opus", + "hooks": {"PreToolUse": [wrapper, drifted_unbound]}})) + + settings = self._run_setup(mdm, managed) + + entries = settings["hooks"]["PreToolUse"] + # the foreign wrapper survives despite referencing our path + self.assertIn(wrapper, entries) + # by the PRODUCTION ownership matcher, exactly one entry is ours and + # the wrapper is NOT ours + owned = [e for e in entries if mdm._entry_is_unbound(e, sp)] + self.assertEqual(len(owned), 1) + self.assertFalse(mdm._entry_is_unbound(wrapper, sp)) + # the drifted form is gone (replaced, not stacked) + cmds = [h["command"] for e in entries for h in e["hooks"]] + self.assertNotIn(f'"{sp}" --old-flag', cmds) + + def test_managed_settings_write_is_atomic_and_leaves_no_temp(self): + """WEB-4814 LOW-1: setup_managed_hooks must write managed-settings.json + atomically (tmp + os.replace) so a crash mid-write never leaves a + truncated file that Claude Code silently ignores. Asserts valid JSON and + no stray .tmp left in the managed dir.""" + mdm = self._load_mdm() + with tempfile.TemporaryDirectory() as td: + managed = Path(td) / "managed" + managed.mkdir() + settings = self._run_setup(mdm, managed) + # file is complete, parseable JSON with our hooks + self.assertIsInstance(settings["hooks"], dict) + # no temp artifacts left behind in the managed dir + leftover = list(managed.glob("managed-settings.json.*.tmp")) + self.assertEqual(leftover, [], f"stray temp files: {leftover}") + + def test_malformed_existing_hooks_block_does_not_crash(self): + mdm = self._load_mdm() + with tempfile.TemporaryDirectory() as td: + managed = Path(td) / "managed" + managed.mkdir() + script_path = managed / "hooks" / "unbound.py" + (managed / "managed-settings.json").write_text(json.dumps( + {"model": "opus", "hooks": "garbage"})) + settings = self._run_setup(mdm, managed) + self.assertIsInstance(settings["hooks"], dict) + self.assertEqual( + len(self._unbound_entries(settings, script_path, "PreToolUse")), 1) + self.assertEqual(settings["model"], "opus") + + if __name__ == "__main__": unittest.main()