Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion binary/src/unbound_hook/setup_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,78 @@ def _atomic_write_text(path: Path, text: str) -> None:
tmp.write_text(text, encoding="utf-8")
os.replace(tmp, path)


def _command_is_unbound(command, owner_path: str) -> bool:
"""True only when a hook command INVOKES the Unbound hook at owner_path —
i.e. the command starts with our path in one of the forms our writers emit:
"<path>" ... (quoted, the binary + Unix python writers)
<path> ... (bare path)
py -3 "<path>" ... (the Windows python writer)
plus the bare Windows-launcher form for tolerance.

Prefix-anchored on purpose (WEB-4814): an earlier substring match would
mis-claim — and then DELETE — a *foreign* hook whose command merely
references our install path mid-string (e.g. a wrapper that execs our
binary as an argument). Trailing args/version drift after the path are
still tolerated, so idempotency is preserved.
"""
if not isinstance(command, str) or not owner_path:
return False
text = command.lstrip()
quoted = f'"{owner_path}"'
# Forms where our path is the program being run, optionally behind a
# Windows python launcher. Bare-path forms require a trailing boundary
# (whitespace or end) so "/a/foo" doesn't match a command for "/a/foobar".
if text.startswith(quoted):
return True
for launcher in ("py -3 ", "py ", "python "):
if text.startswith(launcher + quoted):
return True
if text == owner_path or text.startswith(owner_path + " "):
return True
for launcher in ("py -3 ", "py ", "python "):
rest = launcher + owner_path
if text == rest or text.startswith(rest + " "):
return True
return False


def _entry_is_unbound(entry, owner_substr: str) -> bool:
"""True when a Claude-Code-style hooks-array entry (a {matcher?, hooks:[...]}
group) owns at least one command that INVOKES the Unbound hook binary.

Matching is path-prefix (not exact command equality) so that re-running
setup after a path/version change still recognizes — and replaces — our own
stale entries instead of stacking a duplicate, while refusing to claim a
foreign command that only references our path mid-string (WEB-4814).
"""
if not isinstance(entry, dict):
return False
for hook in entry.get("hooks", []) or []:
if isinstance(hook, dict) and _command_is_unbound(hook.get("command"), owner_substr):
return True
return False


def _merge_hooks(existing_hooks, our_hooks, owner_substr: str) -> dict:
"""Merge Unbound's per-event hook config into whatever hooks the editor's
managed settings already declared (other tools, hand-authored entries).

Per event: drop any prior Unbound-owned entry (idempotency — never stack
duplicates of our own hook), preserve everyone else's entries, then append
the current Unbound entry. Fails safe: if the existing hooks block is not a
dict we start from an empty one rather than crashing the installer.
"""
merged = dict(existing_hooks) if isinstance(existing_hooks, dict) else {}
for event, our_entries in our_hooks.items():
prior = merged.get(event)
if not isinstance(prior, list):
prior = []
kept = [e for e in prior if not _entry_is_unbound(e, owner_substr)]
merged[event] = kept + list(our_entries)
return merged


def _claude_hooks_config():
cmd = lambda ev: hook_command_for_event("claude-code", ev)
return {
Expand Down Expand Up @@ -252,7 +324,13 @@ def _write_claude_managed_settings(m) -> bool:
if not env:
del settings["env"]

settings["hooks"] = _claude_hooks_config()
# Merge our hooks into any pre-existing hooks block instead of
# overwriting it — a blind assignment here clobbers other tools'
# managed hooks (WEB-4814). Idempotent: stale Unbound entries are
# stripped (matched on the binary path) before our current one is
# appended, so re-running setup never stacks duplicates.
settings["hooks"] = _merge_hooks(
settings.get("hooks"), _claude_hooks_config(), str(HOOK_BINARY))
_atomic_write_text(settings_path, json.dumps(settings, indent=2))

gateway_key_helper = managed_dir / "anthropic_key.sh"
Expand Down
127 changes: 127 additions & 0 deletions binary/tests/test_setup_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,133 @@ def test_setup_is_idempotent(env):
assert (env["tmp"] / "managed-claude" / "managed-settings.json").read_text() == first


# ---------------------------------------------------------------------------
# WEB-4814: the managed-settings hooks writer must MERGE, not overwrite — a
# blind assignment clobbered hooks belonging to other tools / prior config.
# ---------------------------------------------------------------------------

def _foreign_pretooluse_entry():
"""A PreToolUse entry owned by some other tool (not the Unbound binary)."""
return {"matcher": "*", "hooks": [
{"type": "command", "command": "/usr/local/bin/other-tool pre", "timeout": 5}]}


def _unbound_entries(settings, event):
return [e for e in settings["hooks"].get(event, [])
if any(BIN in h.get("command", "") for h in e.get("hooks", []))]


def _seed_managed_settings(env, hooks):
managed = env["tmp"] / "managed-claude"
managed.mkdir(parents=True, exist_ok=True)
(managed / "managed-settings.json").write_text(
json.dumps({"model": "opus", "hooks": hooks}, indent=2))


def test_setup_preserves_foreign_hooks_and_adds_unbound_once(env):
"""A pre-existing foreign PreToolUse hook must survive setup, Unbound's own
PreToolUse entry must be present exactly once, and unrelated top-level keys
must be preserved."""
foreign = _foreign_pretooluse_entry()
_seed_managed_settings(env, {"PreToolUse": [foreign]})

assert setup_cmd.run(["--api-key", "admin-key"]) == 0
settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text())

# (a) foreign entry survives, byte-for-byte
assert foreign in settings["hooks"]["PreToolUse"]
# (b) Unbound's PreToolUse entry present exactly once
unbound = _unbound_entries(settings, "PreToolUse")
assert len(unbound) == 1
assert unbound[0]["hooks"][0]["command"] == _cmd("claude-code", "PreToolUse")
# unrelated top-level key preserved
assert settings["model"] == "opus"


def test_setup_merge_is_idempotent_on_rerun(env):
"""Re-running setup over a settings file that already has both a foreign
hook and a prior Unbound hook must NOT duplicate the Unbound entry, and
must keep the foreign one."""
foreign = _foreign_pretooluse_entry()
stale_unbound = {"matcher": "*", "hooks": [
{"type": "command", "command": _cmd("claude-code", "PreToolUse"), "timeout": 15000}]}
_seed_managed_settings(env, {"PreToolUse": [foreign, stale_unbound]})

assert setup_cmd.run(["--api-key", "admin-key"]) == 0
settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text())
assert foreign in settings["hooks"]["PreToolUse"]
assert len(_unbound_entries(settings, "PreToolUse")) == 1

# second run is a no-op on the hooks shape
assert setup_cmd.run(["--api-key", "admin-key"]) == 0
settings2 = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text())
assert foreign in settings2["hooks"]["PreToolUse"]
assert len(_unbound_entries(settings2, "PreToolUse")) == 1


def test_setup_strips_stale_unbound_hook_with_changed_command(env):
"""Idempotency must match by binary path, not exact command string: a prior
Unbound entry whose command differs only in trailing args is replaced, not
stacked."""
drifted = {"matcher": "*", "hooks": [
{"type": "command", "command": f'"{BIN}" hook claude-code PreToolUse --old-flag'}]}
_seed_managed_settings(env, {"PreToolUse": [drifted]})

assert setup_cmd.run(["--api-key", "admin-key"]) == 0
settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text())
unbound = _unbound_entries(settings, "PreToolUse")
assert len(unbound) == 1
assert unbound[0]["hooks"][0]["command"] == _cmd("claude-code", "PreToolUse")


def test_setup_keeps_foreign_hook_that_references_our_path_midcommand(env):
"""WEB-4814 ownership tightening: a FOREIGN hook whose command merely
contains our binary path mid-string (a wrapper that execs our binary as an
argument) is NOT Unbound-owned and MUST survive the merge — only commands
that *invoke* our binary (path-prefixed) get stripped-and-replaced.

Fails on the pre-fix substring matcher (it would delete the wrapper); passes
after. Also asserts the genuine, args-drifted Unbound entry is replaced once.
"""
wrapper = {"matcher": "*", "hooks": [
{"type": "command",
"command": f'/usr/local/bin/foo "{BIN}" --as-arg pre', "timeout": 5}]}
drifted_unbound = {"matcher": "*", "hooks": [
{"type": "command", "command": f'"{BIN}" hook claude-code PreToolUse --old'}]}
_seed_managed_settings(env, {"PreToolUse": [wrapper, drifted_unbound]})

assert setup_cmd.run(["--api-key", "admin-key"]) == 0
settings = json.loads((env["tmp"] / "managed-claude" / "managed-settings.json").read_text())

entries = settings["hooks"]["PreToolUse"]
# (a) the foreign wrapper survives byte-for-byte despite referencing our path
assert wrapper in entries
# (b) by the PRODUCTION ownership matcher, exactly one entry is ours and it
# is the freshly-emitted (un-drifted) command — the wrapper is NOT ours
owned = [e for e in entries if setup_cmd._entry_is_unbound(e, BIN)]
assert len(owned) == 1
assert owned[0]["hooks"][0]["command"] == _cmd("claude-code", "PreToolUse")
assert not setup_cmd._entry_is_unbound(wrapper, BIN)
# the drifted form is gone (replaced, not stacked)
cmds = [h["command"] for e in entries for h in e["hooks"]]
assert f'"{BIN}" hook claude-code PreToolUse --old' not in cmds


def test_setup_survives_malformed_existing_hooks_block(env):
"""Fail-open: a hooks key that isn't a dict must not crash the installer —
Unbound's hooks are written, the malformed value is discarded."""
managed = env["tmp"] / "managed-claude"
managed.mkdir(parents=True, exist_ok=True)
(managed / "managed-settings.json").write_text(
json.dumps({"model": "opus", "hooks": "garbage-not-a-dict"}))

assert setup_cmd.run(["--api-key", "admin-key"]) == 0
settings = json.loads((managed / "managed-settings.json").read_text())
assert isinstance(settings["hooks"], dict)
assert len(_unbound_entries(settings, "PreToolUse")) == 1
assert settings["model"] == "opus"


# ---------------------------------------------------------------------------
# WEB-4788 migration sweep fixtures: clean, half-installed, previously-binary
# ---------------------------------------------------------------------------
Expand Down
80 changes: 78 additions & 2 deletions claude-code/hooks/mdm/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,76 @@ def rewrite_gateway_url_in_file(path: Path, gateway_url: str) -> None:
debug_print(f"Could not rewrite gateway URL in {path}: {e}")


def _command_is_unbound(command, owner_path: str) -> bool:
"""True only when a hook command INVOKES the Unbound hook at owner_path —
i.e. the command starts with our path in one of the forms our writers emit:
"<path>" ... (quoted, the Unix python writer)
<path> ... (bare path)
py -3 "<path>" ... (the Windows python writer)
plus the bare Windows-launcher form for tolerance.

Prefix-anchored on purpose (WEB-4814): an earlier substring match would
mis-claim — and then DELETE — a *foreign* hook whose command merely
references our install path mid-string. Trailing args/version drift after
the path are still tolerated, so idempotency is preserved."""
if not isinstance(command, str) or not owner_path:
return False
text = command.lstrip()
quoted = f'"{owner_path}"'
if text.startswith(quoted):
return True
for launcher in ("py -3 ", "py ", "python "):
if text.startswith(launcher + quoted):
return True
if text == owner_path or text.startswith(owner_path + " "):
return True
for launcher in ("py -3 ", "py ", "python "):
rest = launcher + owner_path
if text == rest or text.startswith(rest + " "):
return True
return False


def _entry_is_unbound(entry, owner_substr: str) -> bool:
"""True when a hooks-array entry ({matcher?, hooks:[...]}) owns at least
one command that INVOKES our managed hook script. Path-prefix match (not
exact equality) so a changed command form still matches our own entry,
while refusing to claim a foreign command that only references our path
mid-string (WEB-4814)."""
if not isinstance(entry, dict):
return False
for hook in entry.get("hooks", []) or []:
if isinstance(hook, dict) and _command_is_unbound(hook.get("command"), owner_substr):
return True
return False


def _merge_hooks(existing_hooks, our_hooks: dict, owner_substr: str) -> dict:
"""Merge our per-event hook config into the editor's existing hooks block
instead of overwriting it (WEB-4814). Per event: drop prior Unbound-owned
entries (idempotency), keep everyone else's, append the current one. Fails
safe — a non-dict existing block is treated as empty rather than crashing."""
merged = dict(existing_hooks) if isinstance(existing_hooks, dict) else {}
for event, our_entries in our_hooks.items():
prior = merged.get(event)
if not isinstance(prior, list):
prior = []
kept = [e for e in prior if not _entry_is_unbound(e, owner_substr)]
merged[event] = kept + list(our_entries)
return merged


def _atomic_write_text(path: Path, text: str) -> None:
"""tmp + os.replace so a crash mid-write never leaves Claude Code reading a
truncated managed-settings.json — which it would silently ignore, dropping
the security control (WEB-4814 LOW-1). The tmp file lives in the same dir
so os.replace is an atomic rename on the same filesystem. File mode is left
to the caller's existing chmod step (unchanged behavior)."""
tmp = path.parent / (path.name + "." + str(os.getpid()) + ".tmp")
tmp.write_text(text, encoding="utf-8")
os.replace(str(tmp), str(path))


def setup_managed_hooks(gateway_url: str = DEFAULT_GATEWAY_URL) -> bool:
"""
Set up system-wide managed hooks for Claude Code.
Expand Down Expand Up @@ -863,8 +933,14 @@ def _hook(entry: dict) -> dict:
]
}

settings["hooks"] = hooks_config
settings_path.write_text(json.dumps(settings, indent=2), encoding="utf-8")
# Merge into any pre-existing hooks block rather than overwriting it,
# so other tools' managed hooks survive setup (WEB-4814). Idempotent:
# stale Unbound entries (matched on the script path, which covers both
# the Unix '"path"' and Windows 'py -3 "path"' command forms) are
# stripped before the current one is appended.
settings["hooks"] = _merge_hooks(
settings.get("hooks"), hooks_config, str(script_path))
_atomic_write_text(settings_path, json.dumps(settings, indent=2))
debug_print(f"Created managed settings: {settings_path}")

# Delete the gateway key helper only after the hooks settings are
Expand Down
Loading