diff --git a/antigravity/hooks/README.md b/antigravity/hooks/README.md new file mode 100644 index 00000000..7b54a3ca --- /dev/null +++ b/antigravity/hooks/README.md @@ -0,0 +1,21 @@ +# Antigravity Hooks for Unbound Gateway + +Run the command to set up Antigravity hooks with Unbound: + +```bash +python3 <(curl -fsSL https://raw.githubusercontent.com/websentry-ai/setup/refs/heads/main/antigravity/hooks/setup.py) --domain gateway.getunbound.ai +``` + +Optional overrides for tenant deployments: `--backend-url `, `--gateway-url ` (defaults: `https://backend.getunbound.ai`, `https://api.getunbound.ai`). + +If you already have an API key: + +```bash +python3 <(curl -fsSL https://raw.githubusercontent.com/websentry-ai/setup/refs/heads/main/antigravity/hooks/setup.py) --api-key +``` + +Uninstall: + +```bash +python3 <(curl -fsSL https://raw.githubusercontent.com/websentry-ai/setup/refs/heads/main/antigravity/hooks/setup.py) --clear +``` diff --git a/antigravity/hooks/mdm/README.md b/antigravity/hooks/mdm/README.md new file mode 100644 index 00000000..879ed82f --- /dev/null +++ b/antigravity/hooks/mdm/README.md @@ -0,0 +1,17 @@ +# Antigravity Hooks - MDM Setup + +Device-wide installation of Unbound hooks for Antigravity. Requires root. + +```bash +sudo python3 setup.py --api-key +``` + +Optional flags: `--backend-url `, `--gateway-url `, `--app_name `. + +Uninstall: + +```bash +sudo python3 setup.py --clear +``` + +The MDM installer enumerates every user on the device, fetches a per-device API key from the Unbound backend, drops privileges to each user, and writes `~/.gemini/config/hooks.json` plus `~/.unbound/antigravity-hooks/unbound_*.py` for that user. A marker is dropped at `/etc/unbound/antigravity.policy.json` (or `%ProgramFiles%\Unbound\antigravity.policy.json` on Windows) so reruns are idempotent. diff --git a/antigravity/hooks/mdm/setup.py b/antigravity/hooks/mdm/setup.py new file mode 100644 index 00000000..475d8174 --- /dev/null +++ b/antigravity/hooks/mdm/setup.py @@ -0,0 +1,893 @@ +#!/usr/bin/env python3 +"""MDM (device-wide) Unbound hooks installer for Antigravity (agy 1.0.5+). + +Mirrors ``claude-code/hooks/mdm/setup.py``: enumerates user homes, drops +privileges to each user, and runs the same user-level install logic against +``~/.gemini/config/hooks.json``. + + --api-key MDM admin API key, used to fetch a per-device key. + --backend-url Backend host (default https://backend.getunbound.ai). + --gateway-url Unbound gateway base URL (baked into hook scripts). + --app_name Optional MDM application identifier. + --clear Uninstall — surgically remove our entries for every + user, delete our scripts, drop the policy marker. + --backfill No-op for Antigravity. Accepted for CLI parity. + --debug Always on for MDM; flag accepted for parity. + +Drops a marker at ``/etc/unbound/antigravity.policy.json`` (Unix) or +``%ProgramFiles%\\Unbound\\antigravity.policy.json`` (Windows) so reruns are +idempotent. +""" + +import json +import os +import platform +import shutil +import subprocess +import sys +import urllib.error +import urllib.request +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +try: + import pwd +except ImportError: + pwd = None + + +DEFAULT_GATEWAY_URL = "https://api.getunbound.ai" +DEFAULT_BACKEND_URL = "https://backend.getunbound.ai" +UNBOUND_APP_LABEL = "antigravity" + +# Only PreToolUse and PostToolUse actually wire through to user-supplied +# commands in agy 1.0.5. UserPromptSubmit/SessionStart are silently dropped at +# parse time; PreInvocation/PostInvocation/Stop register and log "executing +# command" but never spawn the process. Don't install hooks we know won't fire. +HOOK_EVENT_SCRIPTS: List[Tuple[str, str]] = [ + ("PreToolUse", "unbound_pre_tool_use.py"), + ("PostToolUse", "unbound_post_tool_use.py"), +] + +# Catch-all matcher for both events. ``""`` (empty string) is verified to fire +# on every tool. A regex allowlist here silently bypasses our hook for any +# tool not in the list — server-side filtering (gateway's +# APP_NATIVE_FILE_TOOLS / tools_to_check) is the right gate. +HOOK_EVENT_MATCHERS: Dict[str, Optional[str]] = { + "PreToolUse": "", + "PostToolUse": "", +} + +HOOK_TIMEOUT_SECONDS = 15 +TELEMETRY_TIMEOUT_SECONDS = 60 + +# MDM scripts always run with debug logging on — administrators need full +# diagnostic output for troubleshooting across managed devices. +DEBUG = True + + +def debug_print(message: str) -> None: + if DEBUG: + print(f"[DEBUG] {message}") + + +def normalize_url(value: str) -> str: + value = (value or "").strip() + if not value: + return value + if not (value.startswith("http://") or value.startswith("https://")): + value = f"https://{value}" + return value.rstrip("/") + + +def check_admin_privileges() -> bool: + try: + system = platform.system().lower() + if system in ("darwin", "linux"): + return os.geteuid() == 0 + if system == "windows": + import ctypes + try: + return bool(ctypes.windll.shell32.IsUserAnAdmin()) + except Exception: + return False + return False + except Exception as e: + debug_print(f"Failed to check privileges: {e}") + return False + + +def get_policy_marker_path() -> Path: + """Where we drop the install marker so reruns are idempotent.""" + system = platform.system().lower() + if system == "windows": + program_files = os.environ.get("ProgramFiles", r"C:\Program Files") + return Path(program_files) / "Unbound" / "antigravity.policy.json" + return Path("/etc/unbound/antigravity.policy.json") + + +# ----------------------------------------------------------------------------- +# User enumeration (mirrors claude-code/hooks/mdm/setup.py) +# ----------------------------------------------------------------------------- + +def get_all_user_homes() -> List[Tuple[str, Path]]: + user_homes: List[Tuple[str, Path]] = [] + system = platform.system().lower() + try: + if system == "darwin" and pwd is not None: + for user in pwd.getpwall(): + uid = user.pw_uid + username = user.pw_name + home_dir = Path(user.pw_dir) + if uid >= 500 and home_dir.exists() and home_dir.is_dir(): + if str(home_dir).startswith("/Users/") and username not in ("Shared", "Guest"): + user_homes.append((username, home_dir)) + debug_print(f"Found user: {username} -> {home_dir}") + elif system == "linux" and pwd is not None: + for user in pwd.getpwall(): + uid = user.pw_uid + username = user.pw_name + home_dir = Path(user.pw_dir) + if uid >= 1000 and home_dir.exists() and home_dir.is_dir(): + if str(home_dir).startswith("/home/"): + user_homes.append((username, home_dir)) + debug_print(f"Found user: {username} -> {home_dir}") + elif system == "windows": + system_drive = os.environ.get("SystemDrive", "C:") + users_dir = Path(system_drive + r"\Users") + if users_dir.exists(): + for user_dir in users_dir.iterdir(): + if user_dir.is_dir() and user_dir.name not in ( + "Public", "Default", "Default User", "Administrator", "All Users", + ): + user_homes.append((user_dir.name, user_dir)) + debug_print(f"Found user: {user_dir.name} -> {user_dir}") + return user_homes + except Exception as e: + debug_print(f"Error enumerating users: {e}") + return [] + + +def _run_as_user(username: Optional[str], fn, *args, **kwargs): + """Fork+exec fn as the unprivileged user `username`. Returns whatever fn + returns on success, or None on failure. + + Security-critical: any FS op that touches a user-controlled path must go + through this to avoid symlink-following privilege escalation. Mirrors + claude-code/hooks/mdm/setup.py::_run_as_user.""" + if platform.system().lower() == "windows": + try: + return fn(*args, **kwargs) + except Exception: + return None + if pwd is None or username is None: + return None + try: + info = pwd.getpwnam(username) + except KeyError: + return None + uid, gid = info.pw_uid, info.pw_gid + + r_fd, w_fd = os.pipe() + pid = os.fork() + if pid == 0: + os.close(r_fd) + try: + os.setgroups([]) + os.setgid(gid) + os.setuid(uid) + result = fn(*args, **kwargs) + import pickle + os.write(w_fd, pickle.dumps(result, protocol=pickle.HIGHEST_PROTOCOL)) + os.close(w_fd) + os._exit(0) + except Exception: + try: + os.close(w_fd) + except OSError: + pass + os._exit(1) + else: + os.close(w_fd) + data = b"" + while True: + try: + chunk = os.read(r_fd, 65536) + except OSError: + break + if not chunk: + break + data += chunk + os.close(r_fd) + try: + _, status = os.waitpid(pid, 0) + except OSError: + return None + if os.WEXITSTATUS(status) != 0: + return None + try: + import pickle + return pickle.loads(data) if data else None + except Exception: + return None + + +# ----------------------------------------------------------------------------- +# MDM API key fetch (mirrors claude-code MDM) +# ----------------------------------------------------------------------------- + +def get_device_identifier() -> Optional[str]: + system = platform.system().lower() + try: + if system == "darwin": + result = subprocess.run( + ["system_profiler", "SPHardwareDataType"], + capture_output=True, text=True, timeout=10, + ) + if result.returncode == 0: + for line in result.stdout.split("\n"): + if "Serial Number" in line: + parts = line.split(": ") + if len(parts) >= 2 and parts[1].strip(): + return parts[1].strip() + return None + if system == "linux": + try: + result = subprocess.run( + ["dmidecode", "-s", "system-serial-number"], + capture_output=True, text=True, timeout=10, stderr=subprocess.DEVNULL, + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except Exception: + pass + for p in ("/etc/machine-id", "/var/lib/dbus/machine-id"): + try: + with open(p, "r", encoding="utf-8") as f: + v = f.read().strip() + if v: + return v + except Exception: + continue + try: + import socket + return socket.gethostname() + except Exception: + return None + if system == "windows": + try: + result = subprocess.run( + ["powershell", "-NoProfile", "-Command", + "(Get-CimInstance -ClassName Win32_BIOS).SerialNumber"], + capture_output=True, text=True, timeout=10, + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip() + except Exception: + pass + try: + import winreg + with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, + r"SOFTWARE\Microsoft\Cryptography") as key: + value, _ = winreg.QueryValueEx(key, "MachineGuid") + if value: + return str(value).strip() + except Exception: + pass + try: + import socket + return socket.gethostname() + except Exception: + return None + except Exception as e: + debug_print(f"Failed to get device identifier: {e}") + return None + + +def fetch_api_key_from_mdm( + base_url: str, app_name: Optional[str], auth_api_key: str, device_id: str +) -> Optional[str]: + # Use stdlib urllib instead of shelling out to curl: passing the bearer + # token via curl's argv leaks it through ``ps auxe`` / + # ``/proc//cmdline`` to any other user on the device. urllib sets the + # header inside our own process — argv stays secret-free. + params = f"serial_number={device_id}&app_type={UNBOUND_APP_LABEL}" + if app_name: + params = f"app_name={app_name}&{params}" + url = f"{base_url.rstrip('/')}/api/v1/automations/mdm/get_application_api_key/?{params}" + debug_print(f"Fetching API key from: {url}") + try: + req = urllib.request.Request( + url, + method="GET", + headers={"Authorization": f"Bearer {auth_api_key}"}, + ) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + http_code = resp.getcode() + body = resp.read().decode("utf-8") + except urllib.error.HTTPError as e: + try: + err_body = e.read().decode("utf-8", errors="replace") + except Exception: + err_body = "" + debug_print(f"HTTP {e.code} body: {err_body[:200]}") + print(f"API request failed with status {e.code}") + return None + except urllib.error.URLError as e: + debug_print(f"URL error: {e}") + print("Failed to reach backend") + return None + + if http_code != 200: + print(f"API request failed with status {http_code}") + return None + data = json.loads(body) + api_key = data.get("api_key") + if not api_key: + print("No api_key in response") + return None + return api_key + except (json.JSONDecodeError, ValueError): + print("Invalid JSON response from server") + return None + except Exception as e: + debug_print(f"Request failed: {e}") + return None + + +# ----------------------------------------------------------------------------- +# Per-user install logic — runs inside the privilege-dropped fork. +# ----------------------------------------------------------------------------- + +def _script_source_dir() -> Path: + """Templates live next to this file at install time, two levels up + (``antigravity/hooks/scripts/``).""" + return Path(__file__).resolve().parent.parent / "scripts" + + +SCRIPT_BASE_URL = ( + "https://raw.githubusercontent.com/websentry-ai/setup/refs/heads/main/" + "antigravity/hooks/scripts" +) + + +def _read_script_template(filename: str) -> Optional[bytes]: + """Read a script template either from the local checkout or by fetching + from GitHub. Run as root *before* the privilege drop so we don't need + network/FS access inside the unprivileged child.""" + src = _script_source_dir() / filename + if src.exists(): + try: + return src.read_bytes() + except OSError as e: + print(f"Failed to read {src}: {e}") + return None + url = f"{SCRIPT_BASE_URL}/{filename}" + try: + result = subprocess.run( + ["curl", "-fsSL", url], capture_output=True, timeout=30, + ) + if result.returncode == 0: + return result.stdout + return None + except (subprocess.TimeoutExpired, FileNotFoundError): + return None + + +def _build_hook_command(script_path: Path) -> Tuple[str, bool]: + is_windows = platform.system().lower() == "windows" + if is_windows: + launcher = "py -3" if shutil.which("py") else "python" + return f'{launcher} "{script_path}"', True + return str(script_path), False + + +def _build_event_entry(event: str, script_path: Path) -> Dict: + command, is_windows = _build_hook_command(script_path) + matcher = HOOK_EVENT_MATCHERS.get(event, "") + inner: Dict = { + "type": "command", + "command": command, + "timeout": TELEMETRY_TIMEOUT_SECONDS if event != "PreToolUse" else HOOK_TIMEOUT_SECONDS, + } + if event == "PostToolUse": + inner["async"] = True + if is_windows: + inner["shell"] = "powershell" + return {"matcher": matcher if matcher is not None else "", "hooks": [inner]} + + +def _is_our_hook_command(command: str, install_prefix: str, is_windows: bool) -> bool: + if not command: + return False + if is_windows: + return install_prefix in command and "unbound_" in command + try: + path = Path(command) + return ( + str(path.parent) == install_prefix + and path.name.startswith("unbound_") + and path.name.endswith(".py") + ) + except (ValueError, OSError): + return False + + +def _atomic_write_json(path: Path, data: Dict) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + f.write("\n") + os.replace(tmp, path) + + +def _write_unbound_config_payload(home_dir: Path, api_key: str, gateway_url: str, backend_url: str) -> bool: + """Body of the per-user ~/.unbound/config.json write. Runs inside the + privilege-dropped fork so attacker-planted symlinks in $HOME can't + redirect a root write. Mirrors claude-code/hooks/mdm/setup.py's + write_unbound_config_for_user pattern: tighten perms, atomic write, + O_NOFOLLOW + 0o600. + + This is what the runtime hook scripts read in + ``scripts/_common.py::load_credentials`` to authenticate to the gateway — + without it every PreToolUse silently fail-opens and the org policy is + never enforced.""" + try: + config_dir = home_dir / ".unbound" + config_file = config_dir / "config.json" + config_dir.mkdir(mode=0o700, parents=True, exist_ok=True) + if platform.system().lower() != "windows": + try: + os.chmod(config_dir, 0o700) + except OSError: + pass + + # Preserve unrelated fields a previous tool's setup may have written + # (e.g. claude-code writes the same file). Only api_key/gateway_url/ + # backend_url are owned by us. + config: Dict = {} + if config_file.exists(): + try: + with open(config_file, "r", encoding="utf-8") as f: + config = json.loads(f.read()) + if not isinstance(config, dict): + config = {} + except (json.JSONDecodeError, OSError): + config = {} + + config["api_key"] = api_key + if gateway_url: + config["gateway_url"] = gateway_url + if backend_url: + config["backend_url"] = backend_url + + # Atomic write via tmp + rename. O_NOFOLLOW on both the tmp open AND + # the rename target so a symlink at either path is refused. 0o600 so + # only the target user can read the secret. + flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_NOFOLLOW", 0) + tmp = config_file.with_suffix(config_file.suffix + ".tmp") + if tmp.exists(): + try: + tmp.unlink() + except OSError: + pass + fd = os.open(str(tmp), flags, 0o600) + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(json.dumps(config, indent=2)) + os.replace(str(tmp), str(config_file)) + # os.replace preserves the source's mode, but be defensive in case the + # rename target pre-existed with a wider mode and somehow survives. + if platform.system().lower() != "windows": + try: + os.chmod(config_file, 0o600) + except OSError: + pass + return True + except Exception as e: + debug_print(f"per-user config write failed in {home_dir}: {e}") + return False + + +def install_for_user_payload(home_dir: Path, gateway_url: str, backend_url: str, api_key: str, script_templates: Dict[str, bytes]) -> bool: + """Body of the per-user install. Runs inside the privilege-dropped fork. + Arguments are inherited from the parent via copy-on-write (os.fork), so + script bytes are passed by value (we already read them as root before + dropping). Only the return value is pickled back to the parent over the + write end of the pipe. + + Order matters: write ~/.unbound/config.json BEFORE hooks.json so a + hooks.json failure never leaves the user with an entry-point hook + that can't authenticate (which would fail-open every call).""" + try: + # Hook scripts go under our own namespace (~/.unbound/antigravity-hooks/); + # the agy-readable hooks.json lives at ~/.gemini/config/hooks.json + # (verified empirically — that's the path agy auto-loads). + unbound_dir = home_dir / ".unbound" + hooks_dir = unbound_dir / "antigravity-hooks" + gemini_config_dir = home_dir / ".gemini" / "config" + hooks_json_path = gemini_config_dir / "hooks.json" + + unbound_dir.mkdir(parents=True, exist_ok=True) + hooks_dir.mkdir(parents=True, exist_ok=True) + gemini_config_dir.mkdir(parents=True, exist_ok=True) + + # 0. Write the credentials file FIRST. Hook scripts read this at + # runtime via _common.py::load_credentials; without it the PreToolUse + # gate silently fail-opens on every call. + if not _write_unbound_config_payload(home_dir, api_key, gateway_url, backend_url): + return False + + # 1. Write the shared helper, with the gateway URL baked in. + common_bytes = script_templates["_common.py"] + common_text = common_bytes.decode("utf-8") + if gateway_url and gateway_url != DEFAULT_GATEWAY_URL: + common_text = common_text.replace( + f'"{DEFAULT_GATEWAY_URL}"', f'"{gateway_url}"' + ) + common_dest = hooks_dir / "_common.py" + flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_NOFOLLOW", 0) + fd = os.open(str(common_dest), flags, 0o644) + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(common_text) + + # 2. Write the two event scripts. + for _event, installed_name in HOOK_EVENT_SCRIPTS: + src_name = installed_name.replace("unbound_", "", 1) + script_bytes = script_templates[src_name] + dest = hooks_dir / installed_name + fd = os.open(str(dest), flags, 0o755) + with os.fdopen(fd, "wb") as f: + f.write(script_bytes) + + # 3. Non-destructive hooks.json merge. + if hooks_json_path.exists(): + try: + # O_NOFOLLOW so an attacker-planted symlink at this path can't + # redirect our read (and the subsequent write) to a file outside + # the user's home. Matches the write side and the script-write + # loop above. + read_flags = os.O_RDONLY | getattr(os, "O_NOFOLLOW", 0) + fd = os.open(str(hooks_json_path), read_flags) + with os.fdopen(fd, "r", encoding="utf-8") as f: + settings = json.load(f) + if not isinstance(settings, dict): + return False + except (json.JSONDecodeError, OSError): + return False + else: + settings = {} + + if "hooks" not in settings or not isinstance(settings["hooks"], dict): + settings["hooks"] = {} + + for event, installed_name in HOOK_EVENT_SCRIPTS: + script_path = hooks_dir / installed_name + our_entry = _build_event_entry(event, script_path) + our_command = our_entry["hooks"][0]["command"] + existing = settings["hooks"].get(event) + if not isinstance(existing, list): + settings["hooks"][event] = [our_entry] + continue + already_present = False + for item in existing: + if not isinstance(item, dict): + continue + hooks_list = item.get("hooks", []) + if not isinstance(hooks_list, list): + continue + for h in hooks_list: + if isinstance(h, dict) and h.get("command") == our_command: + already_present = True + break + if already_present: + break + if not already_present: + existing.append(our_entry) + + _atomic_write_json(hooks_json_path, settings) + return True + except Exception as e: + debug_print(f"per-user install failed in {home_dir}: {e}") + return False + + +def clear_for_user_payload(home_dir: Path) -> str: + """Body of the per-user clear. Mirrors install_for_user_payload. Returns + "cleared" | "not_found" | "failed".""" + try: + unbound_dir = home_dir / ".unbound" + hooks_dir = unbound_dir / "antigravity-hooks" + hooks_json_path = home_dir / ".gemini" / "config" / "hooks.json" + install_prefix = str(hooks_dir) + is_windows = platform.system().lower() == "windows" + + any_cleared = False + any_failed = False + + # 1. Remove our hook entries from hooks.json. + if hooks_json_path.exists(): + try: + with open(hooks_json_path, "r", encoding="utf-8") as f: + settings = json.load(f) + if isinstance(settings, dict) and isinstance(settings.get("hooks"), dict): + hooks_block = settings["hooks"] + modified = False + for event in list(hooks_block.keys()): + event_config = hooks_block[event] + if not isinstance(event_config, list): + continue + new_event_config = [] + for item in event_config: + if not isinstance(item, dict): + new_event_config.append(item) + continue + hooks_list = item.get("hooks", []) + if not isinstance(hooks_list, list): + new_event_config.append(item) + continue + new_hooks = [ + h for h in hooks_list + if not ( + isinstance(h, dict) + and _is_our_hook_command( + h.get("command", ""), install_prefix, is_windows, + ) + ) + ] + if len(new_hooks) == len(hooks_list): + new_event_config.append(item) + continue + modified = True + if new_hooks: + item["hooks"] = new_hooks + new_event_config.append(item) + if new_event_config: + hooks_block[event] = new_event_config + else: + del hooks_block[event] + modified = True + if not hooks_block: + del settings["hooks"] + modified = True + if modified: + _atomic_write_json(hooks_json_path, settings) + any_cleared = True + except (json.JSONDecodeError, OSError) as e: + debug_print(f"Failed to clean {hooks_json_path}: {e}") + any_failed = True + + # 2. Delete the installed scripts. + if hooks_dir.exists(): + for _event, installed_name in HOOK_EVENT_SCRIPTS: + p = hooks_dir / installed_name + if p.exists(): + try: + p.unlink() + any_cleared = True + except OSError: + any_failed = True + common = hooks_dir / "_common.py" + if common.exists(): + try: + common.unlink() + any_cleared = True + except OSError: + any_failed = True + # Best-effort: drop the hooks dir if empty. + try: + if not any(hooks_dir.iterdir()): + hooks_dir.rmdir() + except OSError: + pass + + if any_cleared: + return "cleared" + if any_failed: + return "failed" + return "not_found" + except Exception as e: + debug_print(f"per-user clear failed in {home_dir}: {e}") + return "failed" + + +# ----------------------------------------------------------------------------- +# Policy marker (idempotency) +# ----------------------------------------------------------------------------- + +def write_policy_marker(api_key_present: bool, device_id: Optional[str]) -> None: + marker = get_policy_marker_path() + payload = { + "tool_type": UNBOUND_APP_LABEL, + "api_key_set": bool(api_key_present), + "device_id": device_id or "", + } + try: + marker.parent.mkdir(parents=True, exist_ok=True) + _atomic_write_json(marker, payload) + debug_print(f"Wrote policy marker {marker}") + except OSError as e: + debug_print(f"Could not write policy marker {marker}: {e}") + + +def remove_policy_marker() -> None: + marker = get_policy_marker_path() + if marker.exists(): + try: + marker.unlink() + debug_print(f"Removed policy marker {marker}") + except OSError as e: + debug_print(f"Could not remove policy marker {marker}: {e}") + + +def notify_setup_complete(api_key: str, backend_url: str, device_id: Optional[str]) -> None: + # Use stdlib urllib so the API key never appears on the curl argv — + # see fetch_api_key_from_mdm above for the same reason. + try: + url = f"{backend_url.rstrip('/')}/api/v1/setup/complete/" + body: Dict = {"tool_type": UNBOUND_APP_LABEL} + if device_id: + body["serial_number"] = device_id + data = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + url, + data=data, + method="POST", + headers={ + "X-API-KEY": api_key, + "Content-Type": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=10): + pass + except urllib.error.HTTPError as e: + debug_print(f"Setup-complete returned HTTP {e.code}") + debug_print("Setup completion notification sent") + except Exception as e: + debug_print(f"Could not notify backend: {e}") + + +# ----------------------------------------------------------------------------- +# Top-level orchestration +# ----------------------------------------------------------------------------- + +def run_install(api_key: str, gateway_url: str, backend_url: str, device_id: Optional[str]) -> None: + user_homes = get_all_user_homes() + if not user_homes: + print("No user home directories found") + return + + print(f"\nInstalling hooks for {len(user_homes)} user(s)...") + + # Read all script templates once as root, before any privilege drop. + templates: Dict[str, bytes] = {} + needed = ["_common.py"] + [name.replace("unbound_", "", 1) for _e, name in HOOK_EVENT_SCRIPTS] + for filename in needed: + data = _read_script_template(filename) + if data is None: + print(f"Failed to read hook script template {filename}") + return + templates[filename] = data + + success_count = 0 + for username, home_dir in user_homes: + ok = _run_as_user( + username, install_for_user_payload, + home_dir, gateway_url, backend_url, api_key, templates, + ) + if ok: + success_count += 1 + debug_print(f"Installed for {username}") + else: + print(f"Failed to install for {username}") + + if success_count > 0: + print(f"Installed for {success_count} user(s)") + write_policy_marker(api_key_present=True, device_id=device_id) + notify_setup_complete(api_key, backend_url, device_id) + else: + print("Install failed for all users") + + +def run_clear() -> None: + print("\nClearing Antigravity hooks for all users...") + user_homes = get_all_user_homes() + if not user_homes: + print("No user home directories found") + + cleared = 0 + not_found = 0 + failed = 0 + for username, home_dir in user_homes: + status = _run_as_user(username, clear_for_user_payload, home_dir) + if status == "cleared": + cleared += 1 + elif status == "not_found": + not_found += 1 + else: + failed += 1 + + if cleared: + print(f"Cleared for {cleared} user(s)") + if not_found: + print(f"Not installed for {not_found} user(s)") + if failed: + print(f"Failed to clear for {failed} user(s)") + + remove_policy_marker() + + +def _arg_value(name: str, argv: List[str]) -> Optional[str]: + for i, arg in enumerate(argv): + if arg == name and i + 1 < len(argv): + return argv[i + 1] + return None + + +def main() -> None: + argv = sys.argv[1:] + clear_mode = "--clear" in argv + + print("=" * 60) + print("Antigravity Hooks - MDM Setup") + print("=" * 60) + + if not check_admin_privileges(): + if platform.system().lower() == "windows": + sys.exit( + "Error: MDM setup requires an elevated shell on Windows. " + "Right-click PowerShell -> Run as Administrator, then rerun." + ) + print("This script requires administrator/root privileges") + print(" Please re-run with sudo.") + sys.exit(1) + + if clear_mode: + run_clear() + print("\n" + "=" * 60) + print("Clear Complete!") + print("=" * 60) + return + + backend_url = normalize_url(_arg_value("--backend-url", argv) or DEFAULT_BACKEND_URL) + gateway_url = normalize_url(_arg_value("--gateway-url", argv) or DEFAULT_GATEWAY_URL) + app_name = _arg_value("--app_name", argv) + auth_api_key = _arg_value("--api-key", argv) + + if not auth_api_key: + print("\nMissing required argument: --api-key") + print("Usage: sudo python3 setup.py --api-key [--backend-url ] " + "[--gateway-url ] [--app_name ] [--debug] [--backfill]") + print(" Or: sudo python3 setup.py --clear [--debug]") + sys.exit(1) + + print("\nGetting device identifier...") + device_id = get_device_identifier() + if not device_id: + print("Failed to get device identifier") + sys.exit(1) + debug_print(f"Device identifier: {device_id}") + + print("\nFetching API key from MDM...") + api_key = fetch_api_key_from_mdm(backend_url, app_name, auth_api_key, device_id) + if not api_key: + sys.exit(1) + print("API key received") + + run_install(api_key, gateway_url, backend_url, device_id) + + print("\n" + "=" * 60) + print("Setup Complete!") + print("=" * 60) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\nSetup cancelled.") + except Exception as e: + print(f"\nError: {e}") + sys.exit(1) diff --git a/antigravity/hooks/mdm/test_setup.py b/antigravity/hooks/mdm/test_setup.py new file mode 100644 index 00000000..5f19a0af --- /dev/null +++ b/antigravity/hooks/mdm/test_setup.py @@ -0,0 +1,336 @@ +"""Integration tests for antigravity/hooks/mdm/setup.py. + +Run from this directory: + + cd antigravity/hooks/mdm && python3 -m unittest test_setup.py -v + +These tests drive the privilege-dropped install payload against a tmpdir-rooted +fake home so the real ``~/.gemini/config/hooks.json`` is never touched. + +What they cover (and why): + +1. ``_write_unbound_config_payload`` actually writes ``~/.unbound/config.json`` + with the API key, mode 0o600, and the gateway/backend URLs. CRITICAL: the + runtime hook scripts read this file via ``_common.py::load_credentials`` — + without it every PreToolUse silently fail-opens and the org policy is + never enforced. + +2. ``install_for_user_payload`` (called from inside the fork) drops the + config file BEFORE the hooks.json merge, so a hooks.json failure + never strands a half-installed device with a hook entry pointing at a + config that doesn't exist. + +3. The matcher shape uses the catch-all ``""`` (verified to fire on every + tool), not a regex allowlist that hardcoded the tool list. Any future + tool (browser/*, notebook/*, subagent/*) still hits our gate. + +4. Only the two events agy 1.0.5 actually fires (PreToolUse, PostToolUse) + are installed. UserPromptSubmit/SessionStart/PreInvocation/etc. are + non-functional in agy and must not be written. + +5. ``notify_setup_complete`` + ``fetch_api_key_from_mdm`` do NOT shell out + to ``curl`` — passing the API key via curl's argv leaks it to any other + user on the device through ``ps auxe``. +""" + +import json +import os +import shutil +import stat +import sys +import tempfile +import unittest +from pathlib import Path + + +# Make ``setup`` (this directory's module) importable when tests run from here. +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import setup as mdm_setup # noqa: E402 + + +class TestWriteUnboundConfig(unittest.TestCase): + """``_write_unbound_config_payload`` is the fix for CRITICAL #1: MDM was + installing scripts + hooks.json but never the credentials file the + hook scripts read at runtime.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.home = Path(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_writes_config_with_api_key(self): + ok = mdm_setup._write_unbound_config_payload( + self.home, + api_key="dev-key", + gateway_url="https://gw.example.test", + backend_url="https://be.example.test", + ) + self.assertTrue(ok) + + config_file = self.home / ".unbound" / "config.json" + self.assertTrue(config_file.exists()) + with open(config_file, "r", encoding="utf-8") as f: + cfg = json.load(f) + self.assertEqual(cfg["api_key"], "dev-key") + self.assertEqual(cfg["gateway_url"], "https://gw.example.test") + self.assertEqual(cfg["backend_url"], "https://be.example.test") + + @unittest.skipIf(os.name == "nt", "POSIX modes only") + def test_config_file_mode_is_0600(self): + mdm_setup._write_unbound_config_payload( + self.home, api_key="x", gateway_url="https://g.test", backend_url="https://b.test", + ) + config_file = self.home / ".unbound" / "config.json" + mode = stat.S_IMODE(config_file.stat().st_mode) + self.assertEqual(mode, 0o600, f"expected 0o600, got {oct(mode)}") + + @unittest.skipIf(os.name == "nt", "POSIX modes only") + def test_config_dir_mode_is_0700(self): + mdm_setup._write_unbound_config_payload( + self.home, api_key="x", gateway_url="https://g.test", backend_url="https://b.test", + ) + config_dir = self.home / ".unbound" + mode = stat.S_IMODE(config_dir.stat().st_mode) + self.assertEqual(mode, 0o700, f"expected 0o700, got {oct(mode)}") + + def test_preserves_unrelated_existing_fields(self): + """If a previous tool's setup wrote sibling fields into the same + config (e.g. claude-code), we must not clobber them.""" + config_dir = self.home / ".unbound" + config_dir.mkdir() + existing = config_dir / "config.json" + existing.write_text(json.dumps({ + "api_key": "old-key", + "claude_code_specific": "preserve-me", + })) + + mdm_setup._write_unbound_config_payload( + self.home, api_key="new-key", + gateway_url="https://g.test", backend_url="https://b.test", + ) + + with open(existing, "r", encoding="utf-8") as f: + cfg = json.load(f) + self.assertEqual(cfg["api_key"], "new-key") + self.assertEqual(cfg["claude_code_specific"], "preserve-me") + + +class TestInstallForUserPayload(unittest.TestCase): + """The full per-user install body. Drives it directly (without the fork) + against a tmpdir home and asserts the credentials file, scripts, and + hooks.json all land at the agy-verified paths.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.home = Path(self.tmp) + # Read script templates the same way run_install does. + self.templates = {} + for filename in ["_common.py"] + [ + name.replace("unbound_", "", 1) + for _e, name in mdm_setup.HOOK_EVENT_SCRIPTS + ]: + data = mdm_setup._read_script_template(filename) + assert data is not None, f"failed to read template {filename}" + self.templates[filename] = data + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_install_writes_config_scripts_and_hooks_json(self): + ok = mdm_setup.install_for_user_payload( + self.home, + gateway_url="https://gw.example.test", + backend_url="https://be.example.test", + api_key="install-key", + script_templates=self.templates, + ) + self.assertTrue(ok) + + # 1. Credentials file landed (CRITICAL #1). + config_file = self.home / ".unbound" / "config.json" + self.assertTrue(config_file.exists()) + with open(config_file, "r", encoding="utf-8") as f: + cfg = json.load(f) + self.assertEqual(cfg["api_key"], "install-key") + + # 2. Both hook scripts + _common.py exist under our namespace. + hooks_dir = self.home / ".unbound" / "antigravity-hooks" + for _event, installed_name in mdm_setup.HOOK_EVENT_SCRIPTS: + self.assertTrue((hooks_dir / installed_name).exists()) + self.assertTrue((hooks_dir / "_common.py").exists()) + + # 3. hooks.json landed at ~/.gemini/config/ (the path agy auto-loads) + # and lists only the two events agy actually fires. + hooks_json_path = self.home / ".gemini" / "config" / "hooks.json" + self.assertTrue(hooks_json_path.exists()) + with open(hooks_json_path, "r", encoding="utf-8") as f: + settings = json.load(f) + self.assertEqual(set(settings["hooks"].keys()), {"PreToolUse", "PostToolUse"}) + + def test_does_not_write_unsupported_events(self): + """Regression: agy 1.0.5 silently drops UserPromptSubmit/SessionStart + and logs-but-never-runs PreInvocation/PostInvocation/Stop. Don't + install hooks we know won't fire.""" + ok = mdm_setup.install_for_user_payload( + self.home, + gateway_url="https://gw.test", + backend_url="https://be.test", + api_key="k", + script_templates=self.templates, + ) + self.assertTrue(ok) + + hooks_json_path = self.home / ".gemini" / "config" / "hooks.json" + with open(hooks_json_path, "r", encoding="utf-8") as f: + settings = json.load(f) + for unsupported in ( + "UserPromptSubmit", "SessionStart", + "PreInvocation", "PostInvocation", "Stop", + ): + self.assertNotIn(unsupported, settings["hooks"]) + + def test_matchers_are_catch_all(self): + """CRITICAL #3 regression: PreToolUse and PostToolUse matchers must + be ``""`` (verified empirically as catch-all), not a tool-name + allowlist. Any tool not in the list would silently bypass the gate.""" + ok = mdm_setup.install_for_user_payload( + self.home, + gateway_url="https://gw.test", + backend_url="https://be.test", + api_key="k", + script_templates=self.templates, + ) + self.assertTrue(ok) + + hooks_json_path = self.home / ".gemini" / "config" / "hooks.json" + with open(hooks_json_path, "r", encoding="utf-8") as f: + settings = json.load(f) + + for event in ("PreToolUse", "PostToolUse"): + ours = None + for item in settings["hooks"][event]: + for h in item.get("hooks", []): + if "unbound_" in h.get("command", ""): + ours = item + break + if ours: + break + self.assertIsNotNone(ours, f"no Unbound entry in {event}") + self.assertEqual(ours.get("matcher"), "", f"{event} matcher not catch-all") + # Hard regression assertion: no allowlist alternation. + self.assertNotIn("|", ours.get("matcher", "")) + + def test_config_written_before_hooks_json(self): + """Ordering invariant: if hooks.json write fails we MUST still + have written the credentials file. (If the order were reversed, a + crash between steps could leave hooks.json pointing at scripts + that have no creds to read.)""" + original = mdm_setup._atomic_write_json + seen_config_first = {"value": False} + + def boom(*a, **kw): + # By the time we try to write hooks.json, the config file must + # already exist. + cfg = self.home / ".unbound" / "config.json" + seen_config_first["value"] = cfg.exists() + raise OSError("simulated hooks.json failure") + + try: + mdm_setup._atomic_write_json = boom + ok = mdm_setup.install_for_user_payload( + self.home, + gateway_url="https://gw.test", + backend_url="https://be.test", + api_key="k", + script_templates=self.templates, + ) + finally: + mdm_setup._atomic_write_json = original + + # The install should have failed (hooks.json write raised), but the + # config file should have landed first. + self.assertFalse(ok) + self.assertTrue( + seen_config_first["value"], + "config.json was not written before hooks.json — ordering bug", + ) + + +class TestUnknownToolHitsHook(unittest.TestCase): + """CRITICAL #3: an unfamiliar tool_name like ``browser_drag`` must still + match the entry we install. With the old allowlist + ``Bash|bash|Write|Edit|Read|Glob|Grep|Task`` it would not. With ``""`` + it does (verified empirically — empty matcher = catch-all).""" + + def test_unknown_tool_matches_catch_all(self): + entry = mdm_setup._build_event_entry( + "PreToolUse", Path("/tmp/unbound_pre_tool_use.py"), + ) + # The catch-all matcher means: anything matches. + self.assertEqual(entry.get("matcher"), "") + + +class TestNotifySetupCompleteNoCurl(unittest.TestCase): + """CRITICAL #2: ``notify_setup_complete`` MUST NOT shell out to ``curl``. + ``X-API-KEY: `` on argv is visible to every other user on the + device via ``ps auxe``. We assert the move to urllib by ensuring the + curl-shim on PATH never gets invoked.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.home = Path(self.tmp) + bin_dir = self.home / "bin" + bin_dir.mkdir() + self.curl_log = self.home / "curl.log" + fake = bin_dir / "curl" + fake.write_text( + "#!/usr/bin/env python3\n" + "import sys\n" + f"with open({repr(str(self.curl_log))}, 'a') as f:\n" + " f.write(' '.join(sys.argv) + '\\n')\n" + "sys.exit(0)\n" + ) + os.chmod(fake, fake.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + self._old_path = os.environ.get("PATH", "") + os.environ["PATH"] = f"{bin_dir}{os.pathsep}{self._old_path}" + + def tearDown(self): + os.environ["PATH"] = self._old_path + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_notify_setup_complete_does_not_invoke_curl(self): + # Closed port → urllib raises URLError, which notify_setup_complete + # swallows (fail-soft). The relevant assertion is that no curl process + # ever ran with our secret on argv. + mdm_setup.notify_setup_complete( + api_key="super-secret-mdm-key", + backend_url="http://127.0.0.1:1", + device_id="DEVICE-1", + ) + self.assertFalse( + self.curl_log.exists(), + "curl was invoked from notify_setup_complete — API key leaked via argv", + ) + + def test_fetch_api_key_does_not_invoke_curl(self): + # Same shim, but exercise the MDM-key fetch path. URL points at a + # closed port so urllib raises URLError; fetch returns None. Again, + # the assertion is that no curl process ran. + result = mdm_setup.fetch_api_key_from_mdm( + base_url="http://127.0.0.1:1", + app_name=None, + auth_api_key="super-secret-admin-key", + device_id="DEVICE-1", + ) + self.assertIsNone(result) + self.assertFalse( + self.curl_log.exists(), + "curl was invoked from fetch_api_key_from_mdm — bearer token leaked via argv", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/antigravity/hooks/scripts/_common.py b/antigravity/hooks/scripts/_common.py new file mode 100644 index 00000000..5ae02b29 --- /dev/null +++ b/antigravity/hooks/scripts/_common.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +"""Shared helpers for Antigravity hook scripts. + +Both installed hook scripts (`unbound_pre_tool_use.py`, +`unbound_post_tool_use.py`) are deployed side-by-side into +``~/.unbound/antigravity-hooks/`` by ``setup.py`` and import this file +via a ``sys.path`` insert at the top of each script. + +The Antigravity (agy 1.0.5) wire format, verified empirically (see +``AGY-EMPIRICAL-FINDINGS.md``): + +- Stdin: camelCase + ``{"artifactDirectoryPath","conversationId","stepIdx", + "toolCall":{"name","args":{}}, + "transcriptPath","workspacePaths":[...], + "error":""}`` (``error`` and ``toolCall: null`` are PostToolUse-only) +- Env: ``ANTIGRAVITY_CONVERSATION_ID`` mirrors ``conversationId``. +- Stdout (only when overriding the default allow): bare native-proto + ``{"decision": "allow"|"deny"|"ask", "reason": ""}``. + No ``hookSpecificOutput`` wrapper — chop's shape is wrong for agy. +- Tool names are agy-native and lowercase (``run_command``, ``view_file``, + ``edit_file``, ``write_to_file``, ``codebase_search``, ``ask_permission``). + No ``bash`` / ``Bash`` duality. +- Fail-open on any infra error (timeout, non-2xx, JSON parse): print + nothing, exit 0. Never block the agent on our infra. +""" + +import http.client +import json +import os +import socket +import sys +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any, Dict, Optional + + +UNBOUND_APP_LABEL = "antigravity" +GATEWAY_HOOK_PATH = "/hooks/antigravity" +DEFAULT_GATEWAY_URL = "https://api.getunbound.ai" +GATEWAY_TIMEOUT_SECONDS = 3 + +UNBOUND_CONFIG_PATH = Path.home() / ".unbound" / "config.json" + + +def read_stdin_event() -> Optional[Dict[str, Any]]: + """Read the agy hook payload from stdin. Returns None on any error.""" + try: + raw = sys.stdin.read() + except Exception: + return None + if not raw: + return None + try: + event = json.loads(raw) + except (ValueError, TypeError): + return None + return event if isinstance(event, dict) else None + + +def load_credentials() -> Dict[str, str]: + """Resolve api_key and gateway_url. Env vars override ``~/.unbound/config.json``. + + Returns a dict with possibly-empty string values; never raises. + """ + api_key = os.environ.get("UNBOUND_API_KEY", "") or "" + gateway_url = os.environ.get("UNBOUND_GATEWAY_URL", "") or "" + + if not api_key or not gateway_url: + try: + if UNBOUND_CONFIG_PATH.exists(): + with open(UNBOUND_CONFIG_PATH, "r", encoding="utf-8") as f: + config = json.loads(f.read()) + if isinstance(config, dict): + if not api_key: + api_key = (config.get("api_key") or "").strip() + if not gateway_url: + gateway_url = (config.get("gateway_url") or "").strip() + except (OSError, ValueError): + pass + + if not gateway_url: + gateway_url = DEFAULT_GATEWAY_URL + + return {"api_key": api_key, "gateway_url": gateway_url.rstrip("/")} + + +def _coerce_str(value: Any) -> str: + """Stringify a value for the gateway's ``command`` field; never raise.""" + if value is None: + return "" + if isinstance(value, str): + return value + try: + return json.dumps(value, ensure_ascii=False) + except (TypeError, ValueError): + return str(value) + + +def _extract_command_and_metadata(tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]: + """Map agy's per-tool PascalCase args onto the gateway's ``command`` + + ``metadata`` shape. Unknown tools fall through to a JSON-stringified + args blob so we never crash on a tool we haven't taught the map yet.""" + if not isinstance(args, dict): + args = {} + + if tool_name == "run_command": + command = _coerce_str(args.get("CommandLine")) + metadata: Dict[str, Any] = {} + if args.get("Cwd"): + metadata["cwd"] = args["Cwd"] + return {"command": command, "metadata": metadata} + + if tool_name == "view_file": + path = _coerce_str(args.get("AbsolutePath")) + return {"command": path, "metadata": {"file_path": path}} + + if tool_name == "edit_file": + target = _coerce_str(args.get("TargetFile")) + metadata = {"file_path": target} + if args.get("CodeMarkdownLanguage"): + metadata["code_markdown_language"] = args["CodeMarkdownLanguage"] + return { + "command": _coerce_str(args.get("Instruction")), + "metadata": metadata, + } + + if tool_name == "write_to_file": + target = _coerce_str(args.get("TargetFile")) + return {"command": "", "metadata": {"file_path": target}} + + if tool_name == "codebase_search": + query = _coerce_str(args.get("Query")) + metadata = {} + if args.get("TargetDirectories") is not None: + metadata["target_directories"] = args["TargetDirectories"] + return {"command": query, "metadata": metadata} + + if tool_name == "ask_permission": + action = _coerce_str(args.get("Action")) + target = _coerce_str(args.get("Target")) + reason = _coerce_str(args.get("Reason")) + return { + "command": f"{action}: {target}".strip(": "), + "metadata": {"action": action, "target": target, "reason": reason}, + } + + # Fallback: stringify args opaquely so unknown tools don't crash. + return {"command": _coerce_str(args), "metadata": {"args": args}} + + +def build_request_body(event: Dict[str, Any], event_name: str) -> Dict[str, Any]: + """Shape the gateway request body to match ``PretoolRequestBody`` in + ``ai-gateway/src/handlers/preToolUseHandler.ts:86-100``. + + ``event_name`` is the script's identity (PreToolUse vs PostToolUse) — + agy's stdin payload has no ``hook_event_name`` field, so each script + knows its own event from its filename. + """ + tool_call = event.get("toolCall") if isinstance(event, dict) else None + if isinstance(tool_call, dict): + tool_name = tool_call.get("name") or "" + tool_args = tool_call.get("args") or {} + else: + tool_name = "" + tool_args = {} + + mapped = _extract_command_and_metadata(tool_name, tool_args if isinstance(tool_args, dict) else {}) + command = mapped["command"] + metadata: Dict[str, Any] = mapped["metadata"] or {} + + # Always tag the metadata with the hook event and a workspace hint if we + # have one — these are the two breadcrumbs the gateway uses for policy + # context that aren't already in the per-tool field map. + metadata["hook_event_name"] = event_name + workspaces = event.get("workspacePaths") if isinstance(event, dict) else None + if isinstance(workspaces, list) and workspaces: + first = workspaces[0] + if isinstance(first, str) and first: + metadata.setdefault("cwd", first) + metadata["workspace"] = first + if isinstance(event, dict) and event.get("stepIdx") is not None: + metadata["step_idx"] = event["stepIdx"] + if isinstance(event, dict) and event.get("error"): + metadata["error"] = event["error"] + + conversation_id = "" + if isinstance(event, dict): + conversation_id = event.get("conversationId") or "" + if not conversation_id: + conversation_id = os.environ.get("ANTIGRAVITY_CONVERSATION_ID", "") or "" + + # ``event_name: 'tool_use'`` matches claude-code/hooks/unbound.py — the + # gateway treats tool events under a single key regardless of which + # pre/post phase fired, the actual phase is in metadata.hook_event_name. + return { + "conversation_id": conversation_id, + "event_name": "tool_use", + "unbound_app_label": UNBOUND_APP_LABEL, + "model": "auto", + "pre_tool_use_data": { + "tool_name": tool_name, + "command": command, + "metadata": metadata, + }, + "messages": [], + "user_prompts": [], + } + + +class _NoRedirectHandler(urllib.request.HTTPRedirectHandler): + """Block redirects: returning None from ``redirect_request`` makes urllib + surface 3xx as an HTTPError instead of silently following the Location.""" + + def redirect_request(self, req, fp, code, msg, headers, newurl): # noqa: D401 + return None + + +_NO_REDIRECT_OPENER = urllib.request.build_opener(_NoRedirectHandler()) + + +def post_to_gateway( + body: Dict[str, Any], + api_key: str, + gateway_url: str, + timeout: int = GATEWAY_TIMEOUT_SECONDS, +) -> Optional[Dict[str, Any]]: + """POST to ${gateway_url}/hooks/antigravity. Returns parsed JSON dict on + HTTP 2xx with a JSON body, otherwise None. Fail-open by contract: any + exception, timeout, non-2xx, or non-JSON body returns None. + + Uses urllib (not curl) so the Authorization header never appears on argv + — curl's argv is world-readable via ``ps auxe`` / ``/proc//cmdline`` + and would leak the bearer token to any local user. Redirects are blocked + via a custom opener: the gateway is a single hop, so following 3xx would + mask misconfig (unintended HTTP→HTTPS, proxy rewrite) instead of + surfacing it as a hard failure we can debug. + """ + if not api_key or not gateway_url: + return None + url = f"{gateway_url}{GATEWAY_HOOK_PATH}" + try: + req = urllib.request.Request( + url, + data=json.dumps(body).encode("utf-8"), + method="POST", + headers={ + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + }, + ) + with _NO_REDIRECT_OPENER.open(req, timeout=timeout) as resp: + if not (200 <= resp.status < 300): + return None + raw = resp.read() + except (urllib.error.URLError, http.client.HTTPException, OSError, + socket.timeout, ValueError, UnicodeDecodeError): + return None + + if not raw: + return None + try: + parsed = json.loads(raw.decode("utf-8")) + except (ValueError, UnicodeDecodeError): + return None + return parsed if isinstance(parsed, dict) else None + + +def emit_hook_output(decision: str, reason: str = "") -> None: + """Write the agy stdout payload. Bare native-proto shape: + ``{"decision": "...", "reason": "..."}``. No wrapper. Only call this + when overriding the default allow — empty stdout + exit 0 is the + canonical allow.""" + decision = (decision or "").lower() + if decision not in ("allow", "deny", "ask"): + return + payload: Dict[str, Any] = {"decision": decision} + if reason: + payload["reason"] = reason + sys.stdout.write(json.dumps(payload)) + + +def fire_and_forget_telemetry(event: Dict[str, Any], event_name: str) -> None: + """Post-tool-use telemetry. Best-effort, fail-open, exits 0 silently. + + For PostToolUse with ``toolCall: null`` (non-tool turns — agy fires this + on every step), the caller should skip this entirely; we don't have + enough context to make a useful telemetry record. + """ + creds = load_credentials() + if not creds["api_key"]: + return + body = build_request_body(event, event_name) + # Telemetry endpoints don't gate the agent — we don't even need the response. + post_to_gateway(body, creds["api_key"], creds["gateway_url"]) diff --git a/antigravity/hooks/scripts/post_tool_use.py b/antigravity/hooks/scripts/post_tool_use.py new file mode 100644 index 00000000..e9e91483 --- /dev/null +++ b/antigravity/hooks/scripts/post_tool_use.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +"""Antigravity PostToolUse telemetry hook. Fire-and-forget; never blocks. + +agy fires PostToolUse on every step including non-tool turns, where +``toolCall`` is ``null``. We skip the gateway POST in that case — no tool +identity means no policy-relevant telemetry to record. +""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from _common import fire_and_forget_telemetry, read_stdin_event # noqa: E402 + +try: + event = read_stdin_event() + if event is not None and isinstance(event.get("toolCall"), dict): + fire_and_forget_telemetry(event, "PostToolUse") +except Exception: + pass +sys.exit(0) diff --git a/antigravity/hooks/scripts/pre_tool_use.py b/antigravity/hooks/scripts/pre_tool_use.py new file mode 100644 index 00000000..bc11a2d6 --- /dev/null +++ b/antigravity/hooks/scripts/pre_tool_use.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +"""Antigravity PreToolUse hook. + +Reads agy's camelCase stdin payload, POSTs to ``${gateway}/hooks/antigravity``, +and emits the bare native-proto ``{"decision","reason"}`` on stdout when the +gateway returns a non-default decision. Fail-open: any infra error means +silent allow. +""" + +import os +import sys + +# When installed to ~/.unbound/antigravity-hooks/unbound_pre_tool_use.py, +# _common.py sits beside it; make sure we can import either way. +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from _common import ( # noqa: E402 + build_request_body, + emit_hook_output, + load_credentials, + post_to_gateway, + read_stdin_event, +) + + +EVENT_NAME = "PreToolUse" + + +def main() -> int: + event = read_stdin_event() + if event is None: + return 0 # malformed input → fail-open silent allow + + # Defensive: agy always populates toolCall for PreToolUse, but guard + # anyway so a future schema tweak can't break the agent. + if not isinstance(event.get("toolCall"), dict): + return 0 + + creds = load_credentials() + if not creds["api_key"]: + return 0 # not configured → fail-open + + body = build_request_body(event, EVENT_NAME) + api_response = post_to_gateway(body, creds["api_key"], creds["gateway_url"]) + if not api_response: + return 0 # gateway unreachable / non-2xx → fail-open + + decision = (api_response.get("decision") or "allow").lower() + if decision == "allow": + return 0 # silent allow + + reason = api_response.get("reason") or "" + emit_hook_output(decision, reason=reason) + return 0 + + +if __name__ == "__main__": + try: + sys.exit(main()) + except Exception: + # Iron law: never block the agent on our infra. Any unhandled error + # at this layer is a bug in our hook script, not a user-visible event. + sys.exit(0) diff --git a/antigravity/hooks/scripts/test_hooks.py b/antigravity/hooks/scripts/test_hooks.py new file mode 100644 index 00000000..c1ff1ae9 --- /dev/null +++ b/antigravity/hooks/scripts/test_hooks.py @@ -0,0 +1,664 @@ +"""Integration tests for the Antigravity hook scripts. + +Run with: + + cd antigravity/hooks/scripts && python3 -m unittest test_hooks.py -v + +Tests drive ``pre_tool_use.py`` and ``post_tool_use.py`` end-to-end by +spawning a subprocess, piping the agy-actual camelCase stdin payload in, +and asserting on stdout / exit code. The gateway POST is intercepted by +a local HTTP server bound on 127.0.0.1: that records each request +the hook makes — no real network calls. + +Stdin shapes come from AGY-EMPIRICAL-FINDINGS.md (verified against agy 1.0.5). +""" + +import json +import os +import shutil +import socket +import stat +import subprocess +import sys +import tempfile +import threading +import unittest +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path + + +SCRIPT_DIR = Path(__file__).resolve().parent + + +# --- agy-verified stdin payloads (camelCase, PascalCase arg keys) ------------- + +GOLDEN_PRE_TOOL_USE_RUN_COMMAND = { + "artifactDirectoryPath": "/Users/me/.gemini/antigravity-cli/brain/conv-123", + "conversationId": "conv-123", + "stepIdx": 1, + "toolCall": { + "name": "run_command", + "args": { + "CommandLine": "git status", + "Cwd": "/tmp", + "Blocking": True, + "WaitMsBeforeAsync": 1000, + }, + }, + "transcriptPath": "/Users/me/.gemini/antigravity-cli/brain/conv-123/.system_generated/logs/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_PRE_TOOL_USE_VIEW_FILE = { + "artifactDirectoryPath": "/Users/me/.gemini/antigravity-cli/brain/conv-123", + "conversationId": "conv-123", + "stepIdx": 2, + "toolCall": { + "name": "view_file", + "args": {"AbsolutePath": "/etc/passwd"}, + }, + "transcriptPath": "/Users/me/.gemini/antigravity-cli/brain/conv-123/.system_generated/logs/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_PRE_TOOL_USE_EDIT_FILE = { + "conversationId": "conv-123", + "stepIdx": 3, + "toolCall": { + "name": "edit_file", + "args": { + "TargetFile": "/tmp/foo.py", + "Instruction": "Refactor to remove the global", + "CodeMarkdownLanguage": "python", + "Blocking": True, + "CodeEdit": "x = 1", + }, + }, + "transcriptPath": "/tmp/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_PRE_TOOL_USE_WRITE_TO_FILE = { + "conversationId": "conv-123", + "stepIdx": 4, + "toolCall": { + "name": "write_to_file", + "args": {"TargetFile": "/tmp/bar.py", "CodeContent": "print('hi')"}, + }, + "transcriptPath": "/tmp/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_PRE_TOOL_USE_CODEBASE_SEARCH = { + "conversationId": "conv-123", + "stepIdx": 5, + "toolCall": { + "name": "codebase_search", + "args": {"Query": "password handling", "TargetDirectories": ["/tmp"]}, + }, + "transcriptPath": "/tmp/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_PRE_TOOL_USE_ASK_PERMISSION = { + "conversationId": "conv-123", + "stepIdx": 6, + "toolCall": { + "name": "ask_permission", + "args": { + "Action": "execute", + "Target": "rm -rf /tmp/sensitive", + "Reason": "Cleanup before reinstall", + }, + }, + "transcriptPath": "/tmp/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_PRE_TOOL_USE_UNKNOWN = { + "conversationId": "conv-123", + "stepIdx": 7, + "toolCall": { + "name": "browser_drag", + "args": {"Selector": "#draggable", "X": 100, "Y": 200}, + }, + "transcriptPath": "/tmp/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_POST_TOOL_USE_RUN_COMMAND = { + "artifactDirectoryPath": "/Users/me/.gemini/antigravity-cli/brain/conv-123", + "conversationId": "conv-123", + "stepIdx": 1, + "toolCall": { + "name": "run_command", + "args": {"CommandLine": "git status", "Cwd": "/tmp"}, + }, + "error": "", + "transcriptPath": "/tmp/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + +GOLDEN_POST_TOOL_USE_NULL_TOOL = { + # agy fires PostToolUse on every step including non-tool turns — toolCall + # is null when the model didn't invoke a tool that step. + "artifactDirectoryPath": "/Users/me/.gemini/antigravity-cli/brain/conv-123", + "conversationId": "conv-123", + "stepIdx": 1, + "toolCall": None, + "error": "", + "transcriptPath": "/tmp/transcript.jsonl", + "workspacePaths": ["/tmp"], +} + + +class _FakeGateway: + """Local HTTP server stand-in for the Unbound gateway. + + Records every incoming request (path/method/headers/body) into + ``requests`` and replies with the configured ``response_body`` + + ``status``. Bind on ``127.0.0.1:0`` so tests can run in parallel. + """ + + def __init__(self, response_body: dict = None, status: int = 200): + self.response_body = response_body if response_body is not None else {} + self.status = status + self.requests = [] # list of dicts: {path, method, headers, body} + self._server = None + self._thread = None + + def __enter__(self): + outer = self + + class _Handler(BaseHTTPRequestHandler): + def _read_and_record(self, method: str) -> None: + length = int(self.headers.get("Content-Length") or 0) + body = self.rfile.read(length) if length > 0 else b"" + outer.requests.append({ + "path": self.path, + "method": method, + "headers": {k: v for k, v in self.headers.items()}, + "body": body.decode("utf-8", errors="replace"), + }) + + def do_POST(self): # noqa: N802 + self._read_and_record("POST") + payload = json.dumps(outer.response_body).encode("utf-8") + self.send_response(outer.status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def log_message(self, format, *args): # noqa: A002 + return # silence default stderr logging + + self._server = HTTPServer(("127.0.0.1", 0), _Handler) + self._thread = threading.Thread(target=self._server.serve_forever, daemon=True) + self._thread.start() + return self + + def __exit__(self, exc_type, exc, tb): + try: + self._server.shutdown() + self._server.server_close() + except Exception: + pass + + @property + def url(self) -> str: + host, port = self._server.server_address + return f"http://{host}:{port}" + + +def _closed_port_url() -> str: + """Bind a socket to 127.0.0.1:0, read the port, close the socket. + The returned URL points at a port nothing is listening on — connections + will fail fast with ECONNREFUSED.""" + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + return f"http://127.0.0.1:{port}" + + +def _run_hook_script( + script_name: str, + stdin_payload: dict, + gateway_url: str, + api_key: str = "test-api-key", + home: Path = None, + extra_path_dir: Path = None, +): + """Invoke ``scripts/`` as a child Python process with a + sandboxed HOME. The gateway URL is injected via env so the hook posts + to our local test server. Returns the completed subprocess.""" + tmp = home if home else Path(tempfile.mkdtemp()) + cfg_dir = tmp / ".unbound" + cfg_dir.mkdir(parents=True, exist_ok=True) + (cfg_dir / "config.json").write_text( + json.dumps({"api_key": api_key, "gateway_url": gateway_url}) + ) + + env = os.environ.copy() + env["HOME"] = str(tmp) + env["USERPROFILE"] = str(tmp) + if extra_path_dir is not None: + env["PATH"] = f"{extra_path_dir}{os.pathsep}{env.get('PATH', '')}" + # Don't let real env vars override the config-file values during tests. + env.pop("UNBOUND_API_KEY", None) + env.pop("UNBOUND_GATEWAY_URL", None) + env.pop("ANTIGRAVITY_CONVERSATION_ID", None) + + proc = subprocess.run( + [sys.executable, str(SCRIPT_DIR / script_name)], + input=json.dumps(stdin_payload).encode("utf-8"), + capture_output=True, + env=env, + timeout=10, + ) + return proc + + +class TestPreToolUseDecisions(unittest.TestCase): + """The only hook that emits a non-empty stdout: pre_tool_use.py.""" + + def test_allow_emits_silent_stdout(self): + """Gateway returns ``allow`` → we print NOTHING and exit 0.""" + with _FakeGateway(response_body={"decision": "allow"}) as gw: + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND, + gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + + def test_deny_emits_bare_native_proto(self): + """Gateway returns ``deny`` → we emit bare ``{decision, reason}`` — + NO hookSpecificOutput wrapper (that was chop's shape; agy uses the + native proto shape verbatim, verified empirically).""" + with _FakeGateway(response_body={ + "decision": "deny", + "reason": "Blocked by org policy.", + }) as gw: + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND, + gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + out = json.loads(proc.stdout.decode()) + # Native-proto shape: bare keys, no wrapper. + self.assertEqual(out, {"decision": "deny", "reason": "Blocked by org policy."}) + self.assertNotIn("hookSpecificOutput", out) + + def test_ask_emits_bare_native_proto(self): + with _FakeGateway(response_body={"decision": "ask"}) as gw: + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND, + gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + out = json.loads(proc.stdout.decode()) + self.assertEqual(out["decision"], "ask") + self.assertNotIn("hookSpecificOutput", out) + + def test_non_run_command_tool_still_calls_gateway(self): + """Non-run_command tools (view_file, edit_file, etc.) are checked too + — gateway decides whether they're policy-relevant, not the hook script.""" + with _FakeGateway(response_body={"decision": "allow"}) as gw: + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_VIEW_FILE, + gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(len(gw.requests), 1) + + +class TestPreToolUseFailOpen(unittest.TestCase): + """Iron law: never block the agent on our infra. Any infra failure + (HTTP non-2xx, malformed JSON, unreachable gateway) must result in + silent exit 0 (== allow).""" + + def test_gateway_5xx_is_silent_allow(self): + with _FakeGateway(response_body={}, status=500) as gw: + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND, + gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + + def test_gateway_unreachable_is_silent_allow(self): + """Connection refused → fail open, exit 0, no stdout.""" + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND, + gateway_url=_closed_port_url(), + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + + def test_gateway_returns_garbage_is_silent_allow(self): + """Even with HTTP 200, non-JSON body → fail open.""" + + class _GarbageHandler(BaseHTTPRequestHandler): + def do_POST(self): # noqa: N802 + length = int(self.headers.get("Content-Length") or 0) + self.rfile.read(length) + payload = b"not json at all" + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def log_message(self, format, *args): # noqa: A002 + return + + server = HTTPServer(("127.0.0.1", 0), _GarbageHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + try: + host, port = server.server_address + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND, + gateway_url=f"http://{host}:{port}", + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + finally: + server.shutdown() + server.server_close() + + def test_no_api_key_is_silent_allow(self): + """If ~/.unbound/config.json is absent and no env override is set, + we silently allow — never crash, never block.""" + tmp = Path(tempfile.mkdtemp()) + env = os.environ.copy() + env["HOME"] = str(tmp) + env["USERPROFILE"] = str(tmp) + env.pop("UNBOUND_API_KEY", None) + env.pop("UNBOUND_GATEWAY_URL", None) + try: + proc = subprocess.run( + [sys.executable, str(SCRIPT_DIR / "pre_tool_use.py")], + input=json.dumps(GOLDEN_PRE_TOOL_USE_RUN_COMMAND).encode(), + capture_output=True, env=env, timeout=10, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + finally: + shutil.rmtree(tmp, ignore_errors=True) + + def test_malformed_stdin_is_silent_allow(self): + """Garbage on stdin → exit 0 silently. Never raise.""" + tmp = Path(tempfile.mkdtemp()) + env = os.environ.copy() + env["HOME"] = str(tmp) + env["USERPROFILE"] = str(tmp) + try: + proc = subprocess.run( + [sys.executable, str(SCRIPT_DIR / "pre_tool_use.py")], + input=b"not json at all", + capture_output=True, env=env, timeout=10, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + finally: + shutil.rmtree(tmp, ignore_errors=True) + + def test_null_tool_call_is_silent_allow(self): + """Defensive: a PreToolUse event with toolCall: null (shouldn't + happen in practice, but agy proto allows it) → fail-open silently.""" + payload = dict(GOLDEN_PRE_TOOL_USE_RUN_COMMAND) + payload["toolCall"] = None + with _FakeGateway(response_body={"decision": "allow"}) as gw: + proc = _run_hook_script( + "pre_tool_use.py", payload, gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + # And we should NOT have posted to the gateway — no tool identity to gate on. + self.assertEqual(len(gw.requests), 0) + + +class TestPostToolUseTelemetry(unittest.TestCase): + """post_tool_use.py: telemetry only. Exit 0, no stdout, regardless of + gateway response. Skips the POST when toolCall is null.""" + + def test_post_tool_use_with_tool_call_posts(self): + with _FakeGateway(response_body={ + "decision": "deny", "reason": "should be ignored", + }) as gw: + proc = _run_hook_script( + "post_tool_use.py", GOLDEN_POST_TOOL_USE_RUN_COMMAND, + gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + self.assertEqual(len(gw.requests), 1, "post_tool_use must POST telemetry") + + def test_post_tool_use_with_null_tool_call_skips_post(self): + """agy fires PostToolUse on every step including non-tool turns + (toolCall: null). No tool identity = no useful telemetry; skip the + POST entirely so we don't flood the gateway with no-op records.""" + with _FakeGateway(response_body={"decision": "allow"}) as gw: + proc = _run_hook_script( + "post_tool_use.py", GOLDEN_POST_TOOL_USE_NULL_TOOL, + gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(proc.stdout, b"") + self.assertEqual( + len(gw.requests), 0, + "post_tool_use with toolCall: null must not POST", + ) + + +class TestRequestBodyPerTool(unittest.TestCase): + """Verify the POSTed body uses the gateway's snake_case shape, with the + right ``command`` and ``metadata`` extracted per agy tool name.""" + + def _post_and_get_body(self, payload): + with _FakeGateway(response_body={"decision": "allow"}) as gw: + proc = _run_hook_script( + "pre_tool_use.py", payload, gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(len(gw.requests), 1) + return json.loads(gw.requests[0]["body"]) + + def test_request_body_envelope_uses_gateway_field_names(self): + """Outgoing POST body keeps the gateway's snake_case field names — + conversation_id, event_name, unbound_app_label, pre_tool_use_data. + Tied to ai-gateway/src/handlers/preToolUseHandler.ts.""" + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_RUN_COMMAND) + self.assertEqual(body["conversation_id"], "conv-123") + # event_name is 'tool_use' — matches claude-code/hooks/unbound.py:756 + # and the gateway's hook event registry. The agy hook phase + # (PreToolUse vs PostToolUse) goes in metadata.hook_event_name. + self.assertEqual(body["event_name"], "tool_use") + self.assertEqual(body["unbound_app_label"], "antigravity") + self.assertIn("pre_tool_use_data", body) + + def test_run_command_extracts_command_line_and_cwd(self): + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_RUN_COMMAND) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["tool_name"], "run_command") + self.assertEqual(ptud["command"], "git status") + self.assertEqual(ptud["metadata"]["cwd"], "/tmp") + self.assertEqual(ptud["metadata"]["hook_event_name"], "PreToolUse") + + def test_view_file_extracts_absolute_path(self): + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_VIEW_FILE) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["tool_name"], "view_file") + self.assertEqual(ptud["command"], "/etc/passwd") + self.assertEqual(ptud["metadata"]["file_path"], "/etc/passwd") + + def test_edit_file_extracts_instruction_and_target_file(self): + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_EDIT_FILE) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["tool_name"], "edit_file") + self.assertEqual(ptud["command"], "Refactor to remove the global") + self.assertEqual(ptud["metadata"]["file_path"], "/tmp/foo.py") + self.assertEqual(ptud["metadata"]["code_markdown_language"], "python") + + def test_write_to_file_extracts_target_file(self): + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_WRITE_TO_FILE) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["tool_name"], "write_to_file") + self.assertEqual(ptud["metadata"]["file_path"], "/tmp/bar.py") + + def test_codebase_search_extracts_query(self): + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_CODEBASE_SEARCH) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["tool_name"], "codebase_search") + self.assertEqual(ptud["command"], "password handling") + self.assertEqual(ptud["metadata"]["target_directories"], ["/tmp"]) + + def test_ask_permission_extracts_action_and_target(self): + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_ASK_PERMISSION) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["tool_name"], "ask_permission") + self.assertEqual(ptud["command"], "execute: rm -rf /tmp/sensitive") + self.assertEqual(ptud["metadata"]["action"], "execute") + self.assertEqual(ptud["metadata"]["target"], "rm -rf /tmp/sensitive") + self.assertEqual(ptud["metadata"]["reason"], "Cleanup before reinstall") + + def test_unknown_tool_falls_back_to_args_blob(self): + """An unmapped tool name (e.g. browser_drag) must not crash — we + stringify the args opaquely so the gateway still gets *something* + to log.""" + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_UNKNOWN) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["tool_name"], "browser_drag") + # command is the JSON-stringified args. + self.assertIn("Selector", ptud["command"]) + # args blob is preserved verbatim in metadata. + self.assertEqual(ptud["metadata"]["args"], {"Selector": "#draggable", "X": 100, "Y": 200}) + + def test_workspace_path_propagates_to_metadata(self): + body = self._post_and_get_body(GOLDEN_PRE_TOOL_USE_VIEW_FILE) + self.assertEqual(body["pre_tool_use_data"]["metadata"]["workspace"], "/tmp") + + def test_authorization_header_and_path(self): + with _FakeGateway(response_body={"decision": "allow"}) as gw: + proc = _run_hook_script( + "pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND, + gateway_url=gw.url, + api_key="my-secret-key", + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(len(gw.requests), 1) + req = gw.requests[0] + headers_lower = {k.lower(): v for k, v in req["headers"].items()} + self.assertEqual(headers_lower.get("authorization"), "Bearer my-secret-key") + self.assertEqual(headers_lower.get("content-type"), "application/json") + self.assertEqual(req["path"], "/hooks/antigravity") + self.assertEqual(req["method"], "POST") + + def test_conversation_id_env_fallback(self): + """If stdin omits conversationId, the ANTIGRAVITY_CONVERSATION_ID env + var (which agy always sets on the hook process) is the fallback.""" + payload = dict(GOLDEN_PRE_TOOL_USE_RUN_COMMAND) + payload.pop("conversationId", None) + tmp = Path(tempfile.mkdtemp()) + cfg_dir = tmp / ".unbound" + cfg_dir.mkdir(parents=True, exist_ok=True) + try: + with _FakeGateway(response_body={"decision": "allow"}) as gw: + (cfg_dir / "config.json").write_text( + json.dumps({"api_key": "k", "gateway_url": gw.url}) + ) + env = os.environ.copy() + env["HOME"] = str(tmp) + env["USERPROFILE"] = str(tmp) + env["ANTIGRAVITY_CONVERSATION_ID"] = "env-conv-id" + env.pop("UNBOUND_API_KEY", None) + env.pop("UNBOUND_GATEWAY_URL", None) + proc = subprocess.run( + [sys.executable, str(SCRIPT_DIR / "pre_tool_use.py")], + input=json.dumps(payload).encode(), + capture_output=True, env=env, timeout=10, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(len(gw.requests), 1) + body = json.loads(gw.requests[0]["body"]) + self.assertEqual(body["conversation_id"], "env-conv-id") + finally: + shutil.rmtree(tmp, ignore_errors=True) + + +class TestPostToolUseRequestBody(unittest.TestCase): + """PostToolUse telemetry carries the same per-tool extraction as + PreToolUse, plus an ``error`` propagation in metadata when the tool + failed.""" + + def test_post_tool_use_carries_error_in_metadata(self): + payload = dict(GOLDEN_POST_TOOL_USE_RUN_COMMAND) + payload["error"] = "command failed with exit 1" + with _FakeGateway(response_body={}) as gw: + proc = _run_hook_script( + "post_tool_use.py", payload, gateway_url=gw.url, + ) + self.assertEqual(proc.returncode, 0) + self.assertEqual(len(gw.requests), 1) + body = json.loads(gw.requests[0]["body"]) + ptud = body["pre_tool_use_data"] + self.assertEqual(ptud["metadata"]["hook_event_name"], "PostToolUse") + self.assertEqual(ptud["metadata"]["error"], "command failed with exit 1") + + +class TestNoCurlAtRuntime(unittest.TestCase): + """Regression lock-in: ``post_to_gateway`` MUST NOT shell out to ``curl``. + + Passing ``Authorization: Bearer `` on curl's argv leaks the bearer + token to any other user on the device via ``ps auxe`` / + ``/proc//cmdline``. The fix is to use stdlib urllib (headers stay + inside the process). We assert that by putting a fake ``curl`` shim + first on PATH and verifying it never gets invoked end-to-end across + pre_tool_use and post_tool_use. + """ + + def setUp(self): + self.tmp = Path(tempfile.mkdtemp()) + # Fake curl shim that logs every invocation. + self.bin_dir = self.tmp / "bin" + self.bin_dir.mkdir() + self.curl_log = self.tmp / "curl.log" + fake = self.bin_dir / "curl" + fake.write_text( + "#!/usr/bin/env python3\n" + "import sys\n" + f"with open({repr(str(self.curl_log))}, 'a') as f:\n" + " f.write(' '.join(sys.argv) + '\\n')\n" + "sys.exit(0)\n" + ) + os.chmod(fake, fake.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _assert_no_curl(self, script_name: str, payload: dict) -> None: + with _FakeGateway(response_body={"decision": "allow"}) as gw: + proc = _run_hook_script( + script_name, payload, + gateway_url=gw.url, + home=self.tmp, + extra_path_dir=self.bin_dir, + ) + self.assertEqual(proc.returncode, 0) + self.assertFalse( + self.curl_log.exists(), + f"curl was invoked from {script_name} — bearer token leaked via argv", + ) + + def test_pre_tool_use_does_not_invoke_curl(self): + self._assert_no_curl("pre_tool_use.py", GOLDEN_PRE_TOOL_USE_RUN_COMMAND) + + def test_post_tool_use_does_not_invoke_curl(self): + self._assert_no_curl("post_tool_use.py", GOLDEN_POST_TOOL_USE_RUN_COMMAND) + + +if __name__ == "__main__": + unittest.main() diff --git a/antigravity/hooks/setup.py b/antigravity/hooks/setup.py new file mode 100644 index 00000000..8e48cf6a --- /dev/null +++ b/antigravity/hooks/setup.py @@ -0,0 +1,720 @@ +#!/usr/bin/env python3 +"""User-level Unbound hooks installer for Antigravity (agy 1.0.5+). + +Mirrors the flag surface and idioms of ``claude-code/hooks/setup.py``: + + --api-key Skip OAuth callback and use this key directly. + --domain Frontend host that hands off an API key via a + localhost callback (mirrors Claude Code). + --backend-url Backend host for setup-complete notifications. + --gateway-url Unbound gateway base URL (baked into hook scripts). + --clear Surgically remove only our entries from + ~/.gemini/config/hooks.json + delete our scripts. + --backfill No-op for Antigravity (no transcript store yet) — + accepted for CLI compatibility with other tools. + --debug Verbose logging. + +Wire format (verified empirically against agy 1.0.5, see +``AGY-EMPIRICAL-FINDINGS.md``): + Hooks file: ~/.gemini/config/hooks.json + Hooks file keys: hooks.{PreToolUse,PostToolUse} + Stdin payload: camelCase {conversationId,stepIdx,toolCall:{name,args},...} + Stdout payload: bare native-proto {decision,reason} + Tool names: agy-native lowercase (run_command, view_file, edit_file, ...) +""" + +import http.server +import json +import os +import platform +import re +import shutil +import socketserver +import subprocess +import sys +import threading +import urllib.error +import urllib.parse +import urllib.request +import webbrowser +from pathlib import Path +from typing import Dict, List, Optional, Tuple + + +DEFAULT_GATEWAY_URL = "https://api.getunbound.ai" +DEFAULT_BACKEND_URL = "https://backend.getunbound.ai" +UNBOUND_APP_LABEL = "antigravity" + +# agy auto-loads hooks from ~/.gemini/config/hooks.json. Verified empirically; +# the older chop-derived path (~/.antigravity/settings.json) is not read. +GEMINI_CONFIG_DIR = Path.home() / ".gemini" / "config" +HOOKS_JSON_PATH = GEMINI_CONFIG_DIR / "hooks.json" + +# Hook scripts live under our own namespace, not inside agy's config tree — +# keeps Unbound artifacts together with ~/.unbound/config.json so --clear has +# one place to look and so agy upgrades can't surprise-delete our scripts. +UNBOUND_DIR = Path.home() / ".unbound" +HOOKS_INSTALL_DIR = UNBOUND_DIR / "antigravity-hooks" +SENTINEL_PATH = HOOKS_INSTALL_DIR / ".unbound-installed.json" + +# Only PreToolUse and PostToolUse actually wire through to user-supplied +# commands in agy 1.0.5. UserPromptSubmit/SessionStart are silently dropped at +# parse time; PreInvocation/PostInvocation/Stop register and log "executing +# command" but never spawn the process. Don't install hooks we know won't fire. +HOOK_EVENT_SCRIPTS: List[Tuple[str, str]] = [ + ("PreToolUse", "unbound_pre_tool_use.py"), + ("PostToolUse", "unbound_post_tool_use.py"), +] + +# Catch-all matcher for both events. ``""`` (empty string) is verified to fire +# on every tool. Server-side filtering (gateway's APP_NATIVE_FILE_TOOLS / +# tools_to_check) is where we defang; the matcher is the wrong place to +# allowlist — any tool not in the regex would silently bypass our hook. +HOOK_EVENT_MATCHERS: Dict[str, Optional[str]] = { + "PreToolUse": "", + "PostToolUse": "", +} + +HOOK_TIMEOUT_SECONDS = 15 +TELEMETRY_TIMEOUT_SECONDS = 60 + +DEBUG = False + + +def debug_print(message: str) -> None: + if DEBUG: + print(f"[DEBUG] {message}") + + +def normalize_url(value: str) -> str: + value = (value or "").strip() + if not value: + return value + if not (value.startswith("http://") or value.startswith("https://")): + value = f"https://{value}" + return value.rstrip("/") + + +def install_macos_certificates() -> None: + if platform.system().lower() != "darwin": + return + py_version = f"{sys.version_info.major}.{sys.version_info.minor}" + cert_path = f"/Applications/Python {py_version}/Install Certificates.command" + if os.path.exists(cert_path): + subprocess.run([cert_path], capture_output=True) + + +# ----------------------------------------------------------------------------- +# OAuth callback (mirrors claude-code/hooks/setup.py:run_callback_server) +# ----------------------------------------------------------------------------- + +def run_callback_server(frontend_url: str) -> Optional[Dict]: + result: Dict = {"method": None, "path": None, "query": None, "headers": None, "body": None} + done_evt = threading.Event() + + class CallbackHandler(http.server.BaseHTTPRequestHandler): + def _finish(self, code: int = 200, message: bytes = b"Logged in successfully! You can close this tab.") -> None: + try: + self.send_response(code) + self.send_header("Content-Type", "text/plain; charset=utf-8") + self.send_header("Content-Length", str(len(message))) + self.end_headers() + self.wfile.write(message) + except Exception: + pass + + def do_GET(self) -> None: + parsed = urllib.parse.urlparse(self.path) + result["method"] = "GET" + result["path"] = self.path + result["query"] = dict(urllib.parse.parse_qsl(parsed.query)) + result["headers"] = {k: v for k, v in self.headers.items()} + result["body"] = None + query = result["query"] + if "error" in query: + self._finish(code=400, message=f"Setup failed: {query['error'][:200]}\nPlease try again or contact support.".encode()) + else: + self._finish() + done_evt.set() + + def log_message(self, format: str, *args) -> None: + return + + class _CallbackServer(socketserver.TCPServer): + allow_reuse_address = True + + try: + httpd = _CallbackServer(("127.0.0.1", 0), CallbackHandler) + port = httpd.server_address[1] + callback_url = f"http://127.0.0.1:{port}/callback" + + thread = threading.Thread(target=httpd.serve_forever, daemon=True) + thread.start() + + encoded_callback = urllib.parse.quote(callback_url, safe="") + target_url = ( + f"{frontend_url.rstrip('/')}/automations/api-key-callback" + f"?callback_url={encoded_callback}&app_type={UNBOUND_APP_LABEL}" + ) + webbrowser.open(target_url) + print("Opening browser...") + print("If browser doesn't open automatically, open this link:") + print(target_url) + print("Waiting for authentication...") + + try: + if not done_evt.wait(timeout=300): + print("Timed out waiting for authentication (5 minutes). Please re-run setup.") + return None + finally: + try: + httpd.shutdown() + httpd.server_close() + except Exception: + pass + + return result + except Exception as e: + print(f"Failed to run callback server: {e}") + return None + + +# ----------------------------------------------------------------------------- +# Unbound config (~/.unbound/config.json) — shared with unbound-cli + hooks +# ----------------------------------------------------------------------------- + +def write_unbound_config(api_key: str, gateway_url: str) -> bool: + config_dir = Path.home() / ".unbound" + config_file = config_dir / "config.json" + try: + config_dir.mkdir(mode=0o700, parents=True, exist_ok=True) + if platform.system().lower() != "windows": + os.chmod(config_dir, 0o700) + config: Dict = {} + if config_file.exists(): + try: + with open(config_file, "r", encoding="utf-8") as f: + config = json.loads(f.read()) + except (json.JSONDecodeError, OSError): + config = {} + config["api_key"] = api_key + if gateway_url: + config["gateway_url"] = gateway_url + fd = os.open(str(config_file), os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_NOFOLLOW", 0), 0o600) + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(json.dumps(config, indent=2)) + return True + except Exception as e: + print(f"Could not write config: {e}") + return False + + +# ----------------------------------------------------------------------------- +# Hook script installation +# ----------------------------------------------------------------------------- + +def _script_source_dir() -> Path: + """The directory holding our packaged hook script templates. + + When ``setup.py`` is run from a checkout of websentry-ai/setup, the + scripts live in ``./scripts/`` next to this file. When the curl-piped + install pulls just ``setup.py``, the user runs with ``--domain`` and the + scripts get fetched from GitHub on demand (see ``download_script``). + """ + return Path(__file__).resolve().parent / "scripts" + + +SCRIPT_BASE_URL = ( + "https://raw.githubusercontent.com/websentry-ai/setup/refs/heads/main/" + "antigravity/hooks/scripts" +) + + +def download_script(filename: str, dest_path: Path) -> bool: + url = f"{SCRIPT_BASE_URL}/{filename}" + try: + dest_path.parent.mkdir(parents=True, exist_ok=True) + debug_print(f"Downloading {url} to {dest_path}") + result = subprocess.run( + ["curl", "-fsSL", "-o", str(dest_path), url], + capture_output=True, + timeout=30, + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError) as e: + print(f"Failed to download {url}: {e}") + return False + + +def _install_one_script(src_filename: str, dest_path: Path) -> bool: + """Copy from local checkout if present, otherwise fetch from GitHub.""" + src = _script_source_dir() / src_filename + dest_path.parent.mkdir(parents=True, exist_ok=True) + if src.exists(): + try: + shutil.copyfile(src, dest_path) + debug_print(f"Installed {dest_path} from local checkout") + return True + except OSError as e: + print(f"Failed to copy {src} → {dest_path}: {e}") + return False + return download_script(src_filename, dest_path) + + +def install_hook_scripts(gateway_url: str) -> bool: + """Install the two hook scripts plus the _common helper into + ``~/.unbound/antigravity-hooks/``. Bake the gateway_url into _common.py so + tenant deployments don't depend on env vars at runtime.""" + HOOKS_INSTALL_DIR.mkdir(parents=True, exist_ok=True) + + # 1. The shared helper. + common_dest = HOOKS_INSTALL_DIR / "_common.py" + if not _install_one_script("_common.py", common_dest): + return False + rewrite_gateway_url_in_file(common_dest, gateway_url) + + # 2. The two event scripts. Each gets a stable name so --clear can find them. + for _event, installed_name in HOOK_EVENT_SCRIPTS: + # Source filename mirrors event name without the unbound_ prefix. + src = installed_name.replace("unbound_", "", 1) + dest = HOOKS_INSTALL_DIR / installed_name + if not _install_one_script(src, dest): + return False + # Make executable on Unix. + if platform.system().lower() != "windows": + try: + current_mode = dest.stat().st_mode + os.chmod(dest, current_mode | 0o111) + except OSError: + pass + + return True + + +def rewrite_gateway_url_in_file(path: Path, gateway_url: str) -> None: + """Replace the default gateway URL inside _common.py at install time so + tenant deployments don't depend on UNBOUND_GATEWAY_URL being set.""" + if not gateway_url or gateway_url == DEFAULT_GATEWAY_URL: + return + try: + text = path.read_text(encoding="utf-8") + new_text = text.replace(f'"{DEFAULT_GATEWAY_URL}"', f'"{gateway_url}"') + if new_text != text: + path.write_text(new_text, encoding="utf-8") + except Exception as e: + debug_print(f"Could not rewrite gateway URL in {path}: {e}") + + +# ----------------------------------------------------------------------------- +# hooks.json merge / unmerge +# ----------------------------------------------------------------------------- + +def _build_hook_command(script_filename: str) -> Tuple[str, bool]: + """Return (command_string, is_windows). On Windows we wrap with the + Python launcher and quote for spaces. Same trick as + claude-code/hooks/setup.py.""" + script_path = HOOKS_INSTALL_DIR / script_filename + is_windows = platform.system().lower() == "windows" + if is_windows: + launcher = "py -3" if shutil.which("py") else "python" + return f'{launcher} "{script_path}"', True + return str(script_path), False + + +def _build_event_entry(event: str, script_filename: str) -> Dict: + """Construct the matcher+hooks block for a single event. + + Both PreToolUse and PostToolUse use the catch-all ``""`` (verified to + fire on every tool). Server-side filtering, not the matcher, is where + we defang. + """ + command, is_windows = _build_hook_command(script_filename) + matcher = HOOK_EVENT_MATCHERS.get(event, "") + + inner: Dict = { + "type": "command", + "command": command, + "timeout": TELEMETRY_TIMEOUT_SECONDS if event != "PreToolUse" else HOOK_TIMEOUT_SECONDS, + } + # PostToolUse is telemetry — let it run async. + if event == "PostToolUse": + inner["async"] = True + if is_windows: + inner["shell"] = "powershell" + + # Place "matcher" first to mirror the layout chop and Claude Code use. + return {"matcher": matcher if matcher is not None else "", "hooks": [inner]} + + +def _is_our_hook_command(command: str, is_windows: bool) -> bool: + """Identify a hook entry we wrote. On Unix that's an exact match against + ``~/.unbound/antigravity-hooks/unbound_*.py``; on Windows we look for the + install dir substring because the command is wrapped in ``py -3 "..."``.""" + if not command: + return False + install_prefix = str(HOOKS_INSTALL_DIR) + if is_windows: + return install_prefix in command and "unbound_" in command + # Exact path match: our installed file lives inside HOOKS_INSTALL_DIR and + # starts with "unbound_". + try: + path = Path(command) + return ( + str(path.parent) == install_prefix + and path.name.startswith("unbound_") + and path.name.endswith(".py") + ) + except (ValueError, OSError): + return False + + +def _atomic_write_json(path: Path, data: Dict) -> None: + """Write JSON atomically: tmp file + rename. Never leaves a partial file.""" + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + f.write("\n") + os.replace(tmp, path) + + +def configure_antigravity_settings() -> bool: + """Non-destructively merge our hook entries into ~/.gemini/config/hooks.json. + + - Creates the file with ``{}`` if absent. + - Preserves every existing hook entry that we did not write. + - Idempotent: re-running install is a no-op if our entries are already present. + """ + try: + if HOOKS_JSON_PATH.exists(): + try: + with open(HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + except json.JSONDecodeError as e: + print(f"Failed to parse existing hooks.json: {e}") + print(" Please check your hooks.json file for syntax errors") + return False + else: + settings = {} + + if not isinstance(settings, dict): + print(f"Existing hooks.json is not a JSON object; refusing to overwrite.") + return False + + if "hooks" not in settings or not isinstance(settings["hooks"], dict): + settings["hooks"] = {} + + is_windows = platform.system().lower() == "windows" + sentinel_entries: List[Dict[str, str]] = [] + + for event, script_filename in HOOK_EVENT_SCRIPTS: + our_entry = _build_event_entry(event, script_filename) + our_command = our_entry["hooks"][0]["command"] + sentinel_entries.append({"event": event, "script": script_filename}) + + existing = settings["hooks"].get(event) + if not isinstance(existing, list): + settings["hooks"][event] = [our_entry] + continue + + # Is there already an entry pointing at our installed script? + already_present = False + for item in existing: + if not isinstance(item, dict): + continue + hooks_list = item.get("hooks", []) + if not isinstance(hooks_list, list): + continue + for h in hooks_list: + if not isinstance(h, dict): + continue + if h.get("command") == our_command: + already_present = True + break + if already_present: + break + + if not already_present: + existing.append(our_entry) + + _atomic_write_json(HOOKS_JSON_PATH, settings) + + sentinel = {"version": 1, "entries": sentinel_entries} + _atomic_write_json(SENTINEL_PATH, sentinel) + + # Best-effort: tighten perms on the sentinel. + if not is_windows: + try: + os.chmod(SENTINEL_PATH, 0o600) + except OSError: + pass + return True + except Exception as e: + print(f"Failed to configure settings: {e}") + return False + + +def remove_hooks_from_settings() -> str: + """Surgically remove our entries from hooks.json. Returns + "cleared" | "not_found" | "failed". + + Pattern mirrors ``AgusRdz/chop:hooks/antigravity_install.go::antigravityUninstallFrom``: + walk each event's list, drop any hook whose command points at our install + dir, drop the wrapping matcher entry if its hooks list ends empty, drop + the event key if no entries remain, drop ``hooks`` if it ends empty. + """ + if not HOOKS_JSON_PATH.exists(): + return "not_found" + + is_windows = platform.system().lower() == "windows" + + try: + with open(HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + except (json.JSONDecodeError, OSError) as e: + print(f"Failed to read hooks.json: {e}") + return "failed" + + if not isinstance(settings, dict) or not isinstance(settings.get("hooks"), dict): + return "not_found" + + modified = False + hooks_block = settings["hooks"] + + for event in list(hooks_block.keys()): + event_config = hooks_block[event] + if not isinstance(event_config, list): + continue + new_event_config: List[Dict] = [] + for item in event_config: + if not isinstance(item, dict): + new_event_config.append(item) + continue + hooks_list = item.get("hooks") + if not isinstance(hooks_list, list): + new_event_config.append(item) + continue + new_hooks = [ + h for h in hooks_list + if not ( + isinstance(h, dict) + and _is_our_hook_command(h.get("command", ""), is_windows) + ) + ] + if len(new_hooks) == len(hooks_list): + new_event_config.append(item) + continue + modified = True + if new_hooks: + item["hooks"] = new_hooks + new_event_config.append(item) + # else: drop this matcher entry entirely. + if new_event_config: + hooks_block[event] = new_event_config + else: + del hooks_block[event] + modified = True + + if not hooks_block: + del settings["hooks"] + modified = True + + if not modified: + return "not_found" + + try: + _atomic_write_json(HOOKS_JSON_PATH, settings) + except OSError as e: + print(f"Failed to write hooks.json: {e}") + return "failed" + return "cleared" + + +# ----------------------------------------------------------------------------- +# Clear path +# ----------------------------------------------------------------------------- + +def _delete_path(path: Path, label: str) -> str: + if not path.exists(): + return "not_found" + try: + path.unlink() + debug_print(f"Removed {path}") + return "cleared" + except Exception as e: + print(f"Failed to clear {label}: {e}") + return "failed" + + +def clear_setup() -> None: + print("=" * 60) + print("Antigravity Hooks - Clearing Setup") + print("=" * 60) + + any_cleared = False + any_failed = False + + # 1. Surgically remove our entries from hooks.json. + settings_status = remove_hooks_from_settings() + if settings_status == "cleared": + any_cleared = True + elif settings_status == "failed": + any_failed = True + + # 2. Delete each ~/.unbound/antigravity-hooks/unbound_*.py. + if HOOKS_INSTALL_DIR.exists(): + for _event, installed_name in HOOK_EVENT_SCRIPTS: + status = _delete_path(HOOKS_INSTALL_DIR / installed_name, installed_name) + if status == "cleared": + any_cleared = True + elif status == "failed": + any_failed = True + # And the shared helper. + status = _delete_path(HOOKS_INSTALL_DIR / "_common.py", "_common.py") + if status == "cleared": + any_cleared = True + elif status == "failed": + any_failed = True + # 3. Drop the sentinel (lives inside HOOKS_INSTALL_DIR). + sentinel_status = _delete_path(SENTINEL_PATH, "install sentinel") + if sentinel_status == "cleared": + any_cleared = True + elif sentinel_status == "failed": + any_failed = True + # Drop the hooks dir if empty. + try: + if not any(HOOKS_INSTALL_DIR.iterdir()): + HOOKS_INSTALL_DIR.rmdir() + debug_print(f"Removed empty {HOOKS_INSTALL_DIR}") + except OSError: + pass + + if any_cleared: + print("Cleared") + elif not any_failed: + print("Nothing to clear (Unbound hooks were not installed for Antigravity)") + print("\n" + "=" * 60) + print("Clear Complete!") + print("=" * 60) + + +# ----------------------------------------------------------------------------- +# Backend notification (best-effort) +# ----------------------------------------------------------------------------- + +def notify_setup_complete(api_key: str, backend_url: str) -> None: + # Use stdlib urllib instead of shelling out to curl: passing the API key + # via curl's argv leaks it through ``ps auxe`` / ``/proc//cmdline`` + # to any other user on the device. urllib sets the header inside the + # process — argv stays secret-free. + try: + url = f"{backend_url.rstrip('/')}/api/v1/setup/complete/" + body = json.dumps({"tool_type": UNBOUND_APP_LABEL}).encode("utf-8") + req = urllib.request.Request( + url, + data=body, + method="POST", + headers={ + "X-API-KEY": api_key, + "Content-Type": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=10): + pass + except urllib.error.HTTPError as e: + debug_print(f"Setup-complete returned HTTP {e.code}") + debug_print("Setup completion notification sent") + except Exception as e: + debug_print(f"Could not notify backend: {e}") + + +# ----------------------------------------------------------------------------- +# Argument parsing + main +# ----------------------------------------------------------------------------- + +def _arg_value(name: str, argv: List[str]) -> Optional[str]: + for i, arg in enumerate(argv): + if arg == name and i + 1 < len(argv): + return argv[i + 1] + return None + + +def main() -> None: + global DEBUG + + argv = sys.argv[1:] + clear_mode = "--clear" in argv + DEBUG = "--debug" in argv + backfill_mode = "--backfill" in argv + + if clear_mode: + clear_setup() + return + + install_macos_certificates() + + print("=" * 60) + print("Antigravity Hooks Setup for Unbound Gateway") + print("=" * 60) + + domain = _arg_value("--domain", argv) + backend_url = normalize_url(_arg_value("--backend-url", argv) or DEFAULT_BACKEND_URL) + gateway_url = normalize_url(_arg_value("--gateway-url", argv) or DEFAULT_GATEWAY_URL) + api_key = _arg_value("--api-key", argv) + + if not api_key: + if not domain: + print("Missing required argument: --domain or --api-key") + sys.exit(1) + cb_response = run_callback_server(normalize_url(domain)) + if cb_response is None: + print("Failed to receive callback. Exiting.") + sys.exit(1) + try: + api_key = (cb_response.get("query") or {}).get("api_key") + except Exception: + api_key = None + if not api_key: + error_msg = (cb_response.get("query") or {}).get("error") + if error_msg: + safe_error = re.sub(r"[\x00-\x1f\x7f]", "", error_msg)[:200] + print(f"Setup failed: {safe_error}") + else: + print("No API key received. Exiting.") + sys.exit(1) + + debug_print("API key resolved") + + write_unbound_config(api_key, gateway_url) + + debug_print("Installing hook scripts...") + if not install_hook_scripts(gateway_url): + print("Failed to install hook scripts") + sys.exit(1) + + debug_print("Configuring Antigravity hooks.json...") + if not configure_antigravity_settings(): + print("Failed to configure Antigravity hooks.json") + sys.exit(1) + + print("API key verified and added") + print("Setup complete") + print("=" * 60) + + notify_setup_complete(api_key, backend_url) + + if backfill_mode: + # Antigravity has no on-disk transcript store equivalent to + # ~/.claude/projects yet — accept the flag for CLI parity but no-op. + debug_print("--backfill: no-op for Antigravity (no transcript store)") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n\nSetup cancelled.") + except Exception as e: + print(f"\nError: {e}") + sys.exit(1) diff --git a/antigravity/hooks/test_setup.py b/antigravity/hooks/test_setup.py new file mode 100644 index 00000000..619b7faf --- /dev/null +++ b/antigravity/hooks/test_setup.py @@ -0,0 +1,385 @@ +"""Integration tests for antigravity/hooks/setup.py. + +Run from this directory (or anywhere) with: + + cd antigravity/hooks && python3 -m unittest test_setup.py -v + +These tests exercise the actual setup entrypoint — they call into +``setup.configure_antigravity_settings``, ``setup.remove_hooks_from_settings``, +and the top-level ``setup.main`` against an isolated ``HOME`` so the real +``~/.gemini/config/hooks.json`` and ``~/.unbound/`` are never touched. +""" + +import json +import os +import shutil +import stat +import sys +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + + +# Make ``setup`` importable when tests are run from this directory. +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + + +def _reload_setup_with_home(home: Path): + """Re-import setup with HOME pointing at the given temp dir so the + module-level path constants (HOOKS_JSON_PATH, HOOKS_INSTALL_DIR, etc.) all + pick up the test home. Returns the freshly imported module.""" + import importlib + if "setup" in sys.modules: + del sys.modules["setup"] + os.environ["HOME"] = str(home) + # Windows-only fallback; harmless on Unix. + os.environ["USERPROFILE"] = str(home) + import setup as _setup # noqa: E402 + importlib.reload(_setup) + return _setup + + +class TestSettingsMerge(unittest.TestCase): + """Verify the non-destructive merge into ~/.gemini/config/hooks.json.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.home = Path(self.tmp) + self._old_home = os.environ.get("HOME") + self._old_userprofile = os.environ.get("USERPROFILE") + self.setup = _reload_setup_with_home(self.home) + + def tearDown(self): + if self._old_home is not None: + os.environ["HOME"] = self._old_home + if self._old_userprofile is not None: + os.environ["USERPROFILE"] = self._old_userprofile + shutil.rmtree(self.tmp, ignore_errors=True) + + def _seed_third_party_settings(self): + """Pre-seed hooks.json with an unrelated third-party hook so we + can verify our merge doesn't clobber it.""" + self.setup.HOOKS_JSON_PATH.parent.mkdir(parents=True, exist_ok=True) + third_party = { + "hooks": { + "PreToolUse": [ + { + "matcher": "run_command", + "hooks": [ + {"type": "command", "command": "/usr/local/bin/some-other-tool"} + ], + } + ], + "SessionEnd": [ + {"hooks": [{"type": "command", "command": "/opt/foo/session_end"}]} + ], + }, + "someUnrelatedSetting": True, + } + with open(self.setup.HOOKS_JSON_PATH, "w", encoding="utf-8") as f: + json.dump(third_party, f) + return third_party + + def test_install_path_is_gemini_config_hooks_json(self): + """Verified empirically: agy auto-loads ~/.gemini/config/hooks.json. + Regression lock-in — never reintroduce the old chop-derived + ~/.antigravity/settings.json path.""" + expected = self.home / ".gemini" / "config" / "hooks.json" + self.assertEqual(self.setup.HOOKS_JSON_PATH, expected) + # And our scripts land under ~/.unbound/antigravity-hooks/ (Unbound's + # own namespace, not inside agy's tree). + self.assertEqual( + self.setup.HOOKS_INSTALL_DIR, + self.home / ".unbound" / "antigravity-hooks", + ) + + def test_install_creates_settings_when_absent(self): + """install with no pre-existing hooks.json writes a valid file.""" + ok = self.setup.configure_antigravity_settings() + self.assertTrue(ok) + self.assertTrue(self.setup.HOOKS_JSON_PATH.exists()) + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + # Only PreToolUse and PostToolUse — agy doesn't actually fire the + # other event types in 1.0.5. + self.assertEqual(set(settings["hooks"].keys()), {"PreToolUse", "PostToolUse"}) + for event in ("PreToolUse", "PostToolUse"): + self.assertEqual(len(settings["hooks"][event]), 1) + + def test_install_does_not_write_unsupported_events(self): + """Regression: UserPromptSubmit/SessionStart/PreInvocation/etc. must + not be installed — they're either silently dropped (UserPromptSubmit, + SessionStart) or log "executing command" but never spawn the process + (PreInvocation/PostInvocation/Stop) in agy 1.0.5.""" + self.setup.configure_antigravity_settings() + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + for unsupported in ( + "UserPromptSubmit", "SessionStart", + "PreInvocation", "PostInvocation", "Stop", + ): + self.assertNotIn(unsupported, settings["hooks"]) + + def test_install_preserves_third_party_hooks(self): + """Pre-existing third-party hooks must survive our install.""" + original = self._seed_third_party_settings() + ok = self.setup.configure_antigravity_settings() + self.assertTrue(ok) + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + + # Third-party run_command hook must still be present in PreToolUse. + pre = settings["hooks"]["PreToolUse"] + third_party_cmds = [ + h["command"] + for item in pre + for h in item.get("hooks", []) + ] + self.assertIn("/usr/local/bin/some-other-tool", third_party_cmds) + + # SessionEnd was untouched by us, so it must still be there exactly as-is. + self.assertEqual(settings["hooks"]["SessionEnd"], original["hooks"]["SessionEnd"]) + + # Non-hook settings must be preserved. + self.assertTrue(settings["someUnrelatedSetting"]) + + def test_install_is_idempotent(self): + """Running install twice produces the same on-disk state.""" + self.setup.configure_antigravity_settings() + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + first = f.read() + self.setup.configure_antigravity_settings() + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + second = f.read() + self.assertEqual(first, second) + + def test_clear_removes_only_our_entries(self): + """After install + clear, only our entries are removed; third-party + hooks and other settings are intact.""" + original = self._seed_third_party_settings() + self.setup.configure_antigravity_settings() + + status = self.setup.remove_hooks_from_settings() + self.assertEqual(status, "cleared") + + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + + # Third-party hooks remain. + self.assertEqual(settings["hooks"]["SessionEnd"], original["hooks"]["SessionEnd"]) + # PreToolUse still contains the third-party tool but NOT our scripts. + pre = settings["hooks"]["PreToolUse"] + cmds = [h["command"] for item in pre for h in item.get("hooks", [])] + self.assertIn("/usr/local/bin/some-other-tool", cmds) + for cmd in cmds: + self.assertNotIn("unbound_", cmd) + + # Non-hook settings preserved. + self.assertTrue(settings["someUnrelatedSetting"]) + + def test_install_clear_roundtrip_no_third_party(self): + """install then clear on a clean slate returns hooks.json to a state + with no Unbound traces. ``hooks`` should be entirely gone.""" + self.setup.configure_antigravity_settings() + self.setup.remove_hooks_from_settings() + + # The file may still exist but should have no `hooks` key. + if self.setup.HOOKS_JSON_PATH.exists(): + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + self.assertNotIn("hooks", settings) + + def test_clear_when_nothing_installed(self): + """clear with no hooks.json returns not_found and does nothing.""" + status = self.setup.remove_hooks_from_settings() + self.assertEqual(status, "not_found") + + +class TestFullInstallFlow(unittest.TestCase): + """Drive setup.main() end-to-end against an isolated HOME, mocking only + the parts that touch the real network (callback server + backend POST).""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.home = Path(self.tmp) + self._old_home = os.environ.get("HOME") + self.setup = _reload_setup_with_home(self.home) + + def tearDown(self): + if self._old_home is not None: + os.environ["HOME"] = self._old_home + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_main_with_api_key_installs_files_and_settings(self): + """``setup.py --api-key X`` writes hooks.json and both scripts.""" + old_argv = sys.argv + sys.argv = ["setup.py", "--api-key", "test-api-key"] + try: + with patch.object(self.setup, "notify_setup_complete"): + self.setup.main() + finally: + sys.argv = old_argv + + # hooks.json exists and lists only the two events agy actually fires. + self.assertTrue(self.setup.HOOKS_JSON_PATH.exists()) + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + self.assertEqual(set(settings["hooks"].keys()), {"PreToolUse", "PostToolUse"}) + + # Both hook scripts + _common.py exist on disk in ~/.unbound/antigravity-hooks/. + for _event, installed_name in self.setup.HOOK_EVENT_SCRIPTS: + self.assertTrue((self.setup.HOOKS_INSTALL_DIR / installed_name).exists()) + self.assertTrue((self.setup.HOOKS_INSTALL_DIR / "_common.py").exists()) + + # Sentinel written. + self.assertTrue(self.setup.SENTINEL_PATH.exists()) + + # Unbound config got the API key. + cfg_path = self.home / ".unbound" / "config.json" + self.assertTrue(cfg_path.exists()) + with open(cfg_path, "r", encoding="utf-8") as f: + cfg = json.load(f) + self.assertEqual(cfg["api_key"], "test-api-key") + + def test_main_clear_after_install_returns_to_clean_state(self): + """install --> clear should remove every artifact we wrote.""" + old_argv = sys.argv + try: + sys.argv = ["setup.py", "--api-key", "k"] + with patch.object(self.setup, "notify_setup_complete"): + self.setup.main() + sys.argv = ["setup.py", "--clear"] + self.setup.main() + finally: + sys.argv = old_argv + + # All installed scripts gone. + for _event, installed_name in self.setup.HOOK_EVENT_SCRIPTS: + self.assertFalse((self.setup.HOOKS_INSTALL_DIR / installed_name).exists()) + self.assertFalse((self.setup.HOOKS_INSTALL_DIR / "_common.py").exists()) + # Sentinel gone. + self.assertFalse(self.setup.SENTINEL_PATH.exists()) + # hooks.json either gone or empty of hooks. + if self.setup.HOOKS_JSON_PATH.exists(): + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + self.assertNotIn("hooks", settings) + + +class TestMatcherShape(unittest.TestCase): + """Verify the matcher and event-key shape matches the agy wire format + documented in AGY-EMPIRICAL-FINDINGS.md (catch-all matcher, only the two + events agy actually fires).""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.home = Path(self.tmp) + self.setup = _reload_setup_with_home(self.home) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _find_our_entry(self, event_list, expected_script_substr): + """Locate the entry whose hook command points at our installed script.""" + for item in event_list: + for h in item.get("hooks", []): + if expected_script_substr in h.get("command", ""): + return item + return None + + def test_pre_tool_use_matcher_is_catch_all(self): + """PreToolUse must use the empty-string catch-all so unknown tools + (browser/*, notebook/*, subagent/*, future tools) still trigger our + hook. Server-side filtering decides what's policy-relevant.""" + self.setup.configure_antigravity_settings() + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + pre = settings["hooks"]["PreToolUse"] + ours = self._find_our_entry(pre, "unbound_pre_tool_use.py") + self.assertIsNotNone(ours, "our PreToolUse entry should be present") + self.assertEqual(ours.get("matcher"), "") + + def test_post_tool_use_matcher_is_catch_all(self): + self.setup.configure_antigravity_settings() + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + post = settings["hooks"]["PostToolUse"] + ours = self._find_our_entry(post, "unbound_post_tool_use.py") + self.assertIsNotNone(ours, "our PostToolUse entry should be present") + self.assertEqual(ours.get("matcher"), "") + + def test_matcher_does_not_allowlist_specific_tools(self): + """Regression: no entry we write may use a regex allowlist like + ``run_command|view_file|edit_file|...``. Any tool not in that list + would silently bypass the gate.""" + self.setup.configure_antigravity_settings() + with open(self.setup.HOOKS_JSON_PATH, "r", encoding="utf-8") as f: + settings = json.load(f) + for event in ("PreToolUse", "PostToolUse"): + for item in settings["hooks"][event]: + # Only inspect entries that include our hook script. + if not any( + "unbound_" in h.get("command", "") + for h in item.get("hooks", []) + ): + continue + matcher = item.get("matcher", "") + self.assertNotIn( + "|", matcher, + f"{event} matcher contains an allowlist: {matcher!r}", + ) + + +class TestNotifySetupCompleteNoCurl(unittest.TestCase): + """Regression: ``notify_setup_complete`` MUST NOT shell out to ``curl``. + Passing ``X-API-KEY: `` on curl's argv leaks the key to any other + user on the device via ``ps auxe`` / ``/proc//cmdline``. The fix is + to use stdlib urllib (headers stay inside the process). We assert that + by putting a fake ``curl`` shim first on PATH and verifying it never + gets invoked.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp() + self.home = Path(self.tmp) + self.setup = _reload_setup_with_home(self.home) + + # Fake curl shim that logs every invocation. If we still shell to + # curl, this log will contain entries. + bin_dir = self.home / "bin" + bin_dir.mkdir() + self.curl_log = self.home / "curl.log" + fake = bin_dir / "curl" + fake.write_text( + "#!/usr/bin/env python3\n" + "import sys\n" + f"with open({repr(str(self.curl_log))}, 'a') as f:\n" + " f.write(' '.join(sys.argv) + '\\n')\n" + "sys.exit(0)\n" + ) + os.chmod(fake, fake.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) + self._old_path = os.environ.get("PATH", "") + os.environ["PATH"] = f"{bin_dir}{os.pathsep}{self._old_path}" + + def tearDown(self): + os.environ["PATH"] = self._old_path + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_notify_setup_complete_does_not_invoke_curl(self): + """Drive notify_setup_complete() at an unreachable URL and assert + the curl shim was never executed. urllib raising URLError is fine — + the point is that the secret never appeared on any argv.""" + # 127.0.0.1:1 is closed → urllib raises URLError. notify_setup_complete + # is fail-soft (try/except), so the call returns cleanly. + self.setup.notify_setup_complete( + api_key="super-secret-key", + backend_url="http://127.0.0.1:1", + ) + self.assertFalse( + self.curl_log.exists(), + "curl was invoked from notify_setup_complete — API key leaked via argv", + ) + + +if __name__ == "__main__": + unittest.main()