From 97d04e2027b65508855f0d03dd30f45f3268afec Mon Sep 17 00:00:00 2001 From: MohamedAklamaash Date: Tue, 30 Jun 2026 00:01:59 +0530 Subject: [PATCH 1/7] feat(hooks): add git_remote_url to claude-code + copilot payloads (Repo Allowlist client-hook) Capture the credential-stripped origin remote URL for the event cwd via a short-timeout `git config --get remote.origin.url` subprocess, cached per (session_id, cwd), and inject it as `git_remote_url` into both the user_prompt and pre_tool_use bodies. Never raises; null when not a git repo, no origin, git missing/timeout, or no cwd. Copilot UserPromptSubmit cwd is unverified in-repo, so its prompt-submit path reads event.get('cwd') defensively. Co-Authored-By: Claude Opus 4.8 (1M context) --- claude-code/hooks/unbound.py | 32 ++++++++ copilot/hooks/unbound.py | 32 ++++++++ test_repo_allowlist.py | 151 +++++++++++++++++++++++++++++++++++ 3 files changed, 215 insertions(+) create mode 100644 test_repo_allowlist.py diff --git a/claude-code/hooks/unbound.py b/claude-code/hooks/unbound.py index d0d59181..a2c0b182 100644 --- a/claude-code/hooks/unbound.py +++ b/claude-code/hooks/unbound.py @@ -937,6 +937,36 @@ def _get_device_serial() -> Optional[str]: return None +_GIT_CONTEXT_CACHE: Dict = {} + + +def _strip_git_credentials(url: str) -> str: + return re.sub(r'(://)[^/@]+@', r'\1', url, count=1) + + +def _get_git_context(session_id: Optional[str], cwd: Optional[str]) -> Optional[str]: + """Credential-stripped origin remote URL for cwd, or None. Cached per + (session_id, cwd); never raises.""" + key = (session_id, cwd) + if key in _GIT_CONTEXT_CACHE: + return _GIT_CONTEXT_CACHE[key] + result = None + if cwd: + try: + out = subprocess.run( + ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], + capture_output=True, text=True, timeout=2, + ) + if out.returncode == 0: + url = out.stdout.strip() + if url: + result = _strip_git_credentials(url) + except Exception: + result = None + _GIT_CONTEXT_CACHE[key] = result + return result + + def _device_serial(probe: bool = True) -> Optional[str]: """Hardware serial, computed once and cached. Never raises and never blocks the hook. On the latency-critical pre-tool path callers pass probe=False to read the @@ -1060,6 +1090,7 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: 'unbound_app_label': 'claude-code', 'model': model, 'event_name': 'tool_use', + 'git_remote_url': _get_git_context(session_id, event.get('cwd')), 'pre_tool_use_data': { 'command': command, 'tool_name': tool_name, @@ -1155,6 +1186,7 @@ def process_user_prompt_submit(event: Dict, api_key: str) -> Dict: 'unbound_app_label': 'claude-code', 'model': model, 'event_name': 'user_prompt', + 'git_remote_url': _get_git_context(session_id, event.get('cwd')), 'account_identity': build_account_identity(), 'messages': [{'role': 'user', 'content': prompt}] if prompt else [] } diff --git a/copilot/hooks/unbound.py b/copilot/hooks/unbound.py index 8c2469ed..5ecfabc7 100644 --- a/copilot/hooks/unbound.py +++ b/copilot/hooks/unbound.py @@ -919,6 +919,36 @@ def transform_response_for_copilot_prompt(api_response): return {} +_GIT_CONTEXT_CACHE = {} + + +def _strip_git_credentials(url): + return re.sub(r'(://)[^/@]+@', r'\1', url, count=1) + + +def _get_git_context(session_id, cwd): + """Credential-stripped origin remote URL for cwd, or None. Cached per + (session_id, cwd); never raises.""" + key = (session_id, cwd) + if key in _GIT_CONTEXT_CACHE: + return _GIT_CONTEXT_CACHE[key] + result = None + if cwd: + try: + out = subprocess.run( + ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], + capture_output=True, text=True, timeout=2, + ) + if out.returncode == 0: + url = out.stdout.strip() + if url: + result = _strip_git_credentials(url) + except Exception: + result = None + _GIT_CONTEXT_CACHE[key] = result + return result + + def process_pre_tool_use(event, api_key): """Process PreToolUse event - check policy before tool execution.""" raw_tool = event.get('tool_name') or event.get('toolName') or '' @@ -1017,6 +1047,7 @@ def process_pre_tool_use(event, api_key): 'unbound_app_label': 'copilot', 'model': model, 'event_name': 'tool_use', + 'git_remote_url': _get_git_context(session_id, event.get('cwd')), 'pre_tool_use_data': { 'tool_name': canonical, 'command': command, @@ -1110,6 +1141,7 @@ def process_user_prompt_submit(event, api_key): 'unbound_app_label': 'copilot', 'model': model, 'event_name': 'user_prompt', + 'git_remote_url': _get_git_context(session_id, event.get('cwd')), 'messages': [{'role': 'user', 'content': prompt}] if prompt else [] } diff --git a/test_repo_allowlist.py b/test_repo_allowlist.py new file mode 100644 index 00000000..24aa9686 --- /dev/null +++ b/test_repo_allowlist.py @@ -0,0 +1,151 @@ +"""Repo Allowlist client-hook tests: _get_git_context behavior and +git_remote_url payload parity across claude-code and copilot.""" + +import contextlib +import importlib.util +import subprocess +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + +ROOT = Path(__file__).resolve().parent + + +def _load(name, rel): + spec = importlib.util.spec_from_file_location(name, ROOT / rel) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +cc = _load("cc_unbound", "claude-code/hooks/unbound.py") +co = _load("co_unbound", "copilot/hooks/unbound.py") + + +def _git(args, cwd): + subprocess.run(["git", *args], cwd=cwd, check=True, + capture_output=True, text=True) + + +def _make_repo(remote_url): + d = tempfile.mkdtemp() + _git(["init", "-q"], d) + _git(["remote", "add", "origin", remote_url], d) + return d + + +class TestGetGitContext(unittest.TestCase): + def setUp(self): + cc._GIT_CONTEXT_CACHE.clear() + co._GIT_CONTEXT_CACHE.clear() + + def _both(self): + return [("claude-code", cc), ("copilot", co)] + + def test_returns_origin_url(self): + repo = _make_repo("https://github.com/org/repo.git") + for label, mod in self._both(): + with self.subTest(hook=label): + self.assertEqual( + mod._get_git_context("s1", repo), + "https://github.com/org/repo.git", + ) + + def test_strips_credentials(self): + repo = _make_repo("https://user:token@github.com/org/repo.git") + for label, mod in self._both(): + with self.subTest(hook=label): + result = mod._get_git_context("s1", repo) + self.assertNotIn("user:token@", result) + self.assertNotIn("@", result) + self.assertEqual(result, "https://github.com/org/repo.git") + + def test_no_git_repo(self): + plain = tempfile.mkdtemp() + for label, mod in self._both(): + with self.subTest(hook=label): + self.assertIsNone(mod._get_git_context("s1", plain)) + + def test_no_cwd(self): + for label, mod in self._both(): + with self.subTest(hook=label): + self.assertIsNone(mod._get_git_context("s1", None)) + + def test_git_missing_no_raise(self): + for label, mod in self._both(): + with self.subTest(hook=label): + with patch("subprocess.run", side_effect=FileNotFoundError()): + self.assertIsNone(mod._get_git_context("s_missing", "/x")) + + def test_timeout_no_raise(self): + for label, mod in self._both(): + with self.subTest(hook=label): + with patch("subprocess.run", + side_effect=subprocess.TimeoutExpired("git", 2)): + self.assertIsNone(mod._get_git_context("s_timeout", "/x")) + + def test_caches_per_session_cwd(self): + repo = _make_repo("https://github.com/org/repo.git") + for label, mod in self._both(): + with self.subTest(hook=label): + mod._GIT_CONTEXT_CACHE.clear() + real = subprocess.run + with patch("subprocess.run", side_effect=real) as spy: + mod._get_git_context("s_cache", repo) + mod._get_git_context("s_cache", repo) + self.assertEqual(spy.call_count, 1) + + +class TestPayloadParity(unittest.TestCase): + """git_remote_url must appear with the same key/shape in both the + user_prompt and pre_tool_use bodies for claude-code and copilot. + + Copilot's UserPromptSubmit cwd is UNVERIFIED in-repo; the hook reads + event.get('cwd') defensively, so the field is present and null when cwd + is absent (contract-compatible) and populated when cwd is supplied.""" + + def setUp(self): + cc._GIT_CONTEXT_CACHE.clear() + co._GIT_CONTEXT_CACHE.clear() + self.repo = _make_repo("https://github.com/org/repo.git") + self.expected = "https://github.com/org/repo.git" + + def _capture(self, mod, fn, event): + with contextlib.ExitStack() as stack: + send = stack.enter_context( + patch.object(mod, "send_to_hook_api", return_value={"decision": "allow"})) + if hasattr(mod, "build_account_identity"): + stack.enter_context( + patch.object(mod, "build_account_identity", return_value={})) + fn(event, "key") + return send.call_args.args[0] + + def test_all_four_bodies_carry_git_remote_url(self): + prompt_event = {"session_id": "p", "prompt": "hi", "cwd": self.repo} + tool_event = {"session_id": "t", "tool_name": "Bash", + "tool_input": {"command": "ls"}, "cwd": self.repo} + + bodies = { + "cc_prompt": self._capture(cc, cc.process_user_prompt_submit, dict(prompt_event)), + "cc_tool": self._capture(cc, cc.process_pre_tool_use, dict(tool_event)), + "co_prompt": self._capture(co, co.process_user_prompt_submit, dict(prompt_event)), + "co_tool": self._capture(co, co.process_pre_tool_use, dict(tool_event)), + } + + for name, body in bodies.items(): + with self.subTest(body=name): + self.assertIn("git_remote_url", body) + self.assertEqual(body["git_remote_url"], self.expected) + + def test_field_is_null_when_no_cwd(self): + prompt_event = {"session_id": "n", "prompt": "hi"} + for mod in (cc, co): + body = self._capture(mod, mod.process_user_prompt_submit, dict(prompt_event)) + with self.subTest(mod=mod.__name__): + self.assertIn("git_remote_url", body) + self.assertIsNone(body["git_remote_url"]) + + +if __name__ == "__main__": + unittest.main() From df98388a3de78bbc2ba1766bca2e98e08332b602 Mon Sep 17 00:00:00 2001 From: MohamedAklamaash Date: Tue, 30 Jun 2026 00:50:08 +0530 Subject: [PATCH 2/7] fix(hooks): strip git userinfo for scp form and @-in-password _strip_git_credentials only stripped scheme:// URLs via first-@, so scp-form remotes (user:token@host:org/repo) leaked credentials on the wire and an @ in the password left a fragment. Rewrite to strip userinfo using the last @ before the path boundary for both scheme and scp forms; total (never raises). Helper kept byte-for-byte identical across claude-code and copilot. Extends the credential test matrix (scp-with-token, @-in-password, ssh port). Co-Authored-By: Claude Opus 4.8 (1M context) --- claude-code/hooks/unbound.py | 18 +++++++++++++-- copilot/hooks/unbound.py | 16 +++++++++++++- test_repo_allowlist.py | 43 ++++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/claude-code/hooks/unbound.py b/claude-code/hooks/unbound.py index a2c0b182..b436aa26 100644 --- a/claude-code/hooks/unbound.py +++ b/claude-code/hooks/unbound.py @@ -940,8 +940,22 @@ def _get_device_serial() -> Optional[str]: _GIT_CONTEXT_CACHE: Dict = {} -def _strip_git_credentials(url: str) -> str: - return re.sub(r'(://)[^/@]+@', r'\1', url, count=1) +def _strip_git_credentials(url): + try: + if not url or '@' not in url: + return url + scheme = re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', url) + prefix = scheme.group(0) if scheme else '' + rest = url[len(prefix):] + slash = rest.find('/') + authority = rest if slash == -1 else rest[:slash] + tail = '' if slash == -1 else rest[slash:] + at = authority.rfind('@') + if at == -1: + return url + return prefix + authority[at + 1:] + tail + except Exception: + return url def _get_git_context(session_id: Optional[str], cwd: Optional[str]) -> Optional[str]: diff --git a/copilot/hooks/unbound.py b/copilot/hooks/unbound.py index 5ecfabc7..36faf71d 100644 --- a/copilot/hooks/unbound.py +++ b/copilot/hooks/unbound.py @@ -923,7 +923,21 @@ def transform_response_for_copilot_prompt(api_response): def _strip_git_credentials(url): - return re.sub(r'(://)[^/@]+@', r'\1', url, count=1) + try: + if not url or '@' not in url: + return url + scheme = re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', url) + prefix = scheme.group(0) if scheme else '' + rest = url[len(prefix):] + slash = rest.find('/') + authority = rest if slash == -1 else rest[:slash] + tail = '' if slash == -1 else rest[slash:] + at = authority.rfind('@') + if at == -1: + return url + return prefix + authority[at + 1:] + tail + except Exception: + return url def _get_git_context(session_id, cwd): diff --git a/test_repo_allowlist.py b/test_repo_allowlist.py index 24aa9686..40822d1f 100644 --- a/test_repo_allowlist.py +++ b/test_repo_allowlist.py @@ -147,5 +147,48 @@ def test_field_is_null_when_no_cwd(self): self.assertIsNone(body["git_remote_url"]) +class TestStripGitCredentials(unittest.TestCase): + def _both(self): + return [("claude-code", cc), ("copilot", co)] + + def _check(self, url, expected): + for label, mod in self._both(): + with self.subTest(hook=label, url=url): + out = mod._strip_git_credentials(url) + self.assertEqual(out, expected) + self.assertNotIn("@", out) + + def test_scheme_with_token(self): + self._check("https://user:token@github.com/org/repo.git", + "https://github.com/org/repo.git") + + def test_scp_form_with_token(self): + self._check("user:token@github.com:org/repo.git", + "github.com:org/repo.git") + + def test_scp_form_plain_user(self): + self._check("git@github.com:org/repo.git", + "github.com:org/repo.git") + + def test_password_with_at_scheme(self): + self._check("https://user:p@ss@w@rd@github.com/org/repo", + "https://github.com/org/repo") + + def test_password_with_at_scp(self): + self._check("user:p@ss@github.com:org/repo", + "github.com:org/repo") + + def test_ssh_scheme_with_port(self): + self._check("ssh://git@github.com:22/org/repo", + "ssh://github.com:22/org/repo") + + def test_no_userinfo_unchanged(self): + for url in ("https://github.com/org/repo.git", + "github.com:org/repo.git", "", "/local/path/repo"): + for label, mod in self._both(): + with self.subTest(hook=label, url=url): + self.assertEqual(mod._strip_git_credentials(url), url) + + if __name__ == "__main__": unittest.main() From 6b926f1a69f6f5c26cfd8eb5d32db20fb9647572 Mon Sep 17 00:00:00 2001 From: MohamedAklamaash Date: Tue, 30 Jun 2026 09:21:22 +0530 Subject: [PATCH 3/7] feat(repo-allowlist): resolve git context from the edited file's repo Co-Authored-By: Claude Opus 4.8 (1M context) --- claude-code/hooks/unbound.py | 22 ++++- copilot/hooks/unbound.py | 22 ++++- test_repo_allowlist.py | 157 ++++++++++++++++++++++++++--------- 3 files changed, 158 insertions(+), 43 deletions(-) diff --git a/claude-code/hooks/unbound.py b/claude-code/hooks/unbound.py index b436aa26..f7902cef 100644 --- a/claude-code/hooks/unbound.py +++ b/claude-code/hooks/unbound.py @@ -981,6 +981,23 @@ def _get_git_context(session_id: Optional[str], cwd: Optional[str]) -> Optional[ return result +def _repo_context_dir(cwd: Optional[str], file_path) -> Optional[str]: + """Directory whose git repo governs the operation: the nearest existing + ancestor of the target file for file tools, else the session cwd. The + ancestor walk lets a write into a not-yet-created path still resolve to its + enclosing repo.""" + if isinstance(file_path, str) and file_path: + base = file_path if os.path.isabs(file_path) else os.path.join(cwd or '', file_path) + d = os.path.dirname(base) or cwd + while d and not os.path.isdir(d): + parent = os.path.dirname(d) + if parent == d: + break + d = parent + return d or cwd + return cwd + + def _device_serial(probe: bool = True) -> Optional[str]: """Hardware serial, computed once and cached. Never raises and never blocks the hook. On the latency-critical pre-tool path callers pass probe=False to read the @@ -1104,7 +1121,9 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: 'unbound_app_label': 'claude-code', 'model': model, 'event_name': 'tool_use', - 'git_remote_url': _get_git_context(session_id, event.get('cwd')), + 'git_remote_url': _get_git_context( + session_id, _repo_context_dir(event.get('cwd'), tool_input.get('file_path')) + ), 'pre_tool_use_data': { 'command': command, 'tool_name': tool_name, @@ -1200,7 +1219,6 @@ def process_user_prompt_submit(event: Dict, api_key: str) -> Dict: 'unbound_app_label': 'claude-code', 'model': model, 'event_name': 'user_prompt', - 'git_remote_url': _get_git_context(session_id, event.get('cwd')), 'account_identity': build_account_identity(), 'messages': [{'role': 'user', 'content': prompt}] if prompt else [] } diff --git a/copilot/hooks/unbound.py b/copilot/hooks/unbound.py index 36faf71d..1fddb277 100644 --- a/copilot/hooks/unbound.py +++ b/copilot/hooks/unbound.py @@ -963,6 +963,23 @@ def _get_git_context(session_id, cwd): return result +def _repo_context_dir(cwd, file_path): + """Directory whose git repo governs the operation: the nearest existing + ancestor of the target file for file tools, else the session cwd. The + ancestor walk lets a write into a not-yet-created path still resolve to its + enclosing repo.""" + if isinstance(file_path, str) and file_path: + base = file_path if os.path.isabs(file_path) else os.path.join(cwd or '', file_path) + d = os.path.dirname(base) or cwd + while d and not os.path.isdir(d): + parent = os.path.dirname(d) + if parent == d: + break + d = parent + return d or cwd + return cwd + + def process_pre_tool_use(event, api_key): """Process PreToolUse event - check policy before tool execution.""" raw_tool = event.get('tool_name') or event.get('toolName') or '' @@ -1061,7 +1078,9 @@ def process_pre_tool_use(event, api_key): 'unbound_app_label': 'copilot', 'model': model, 'event_name': 'tool_use', - 'git_remote_url': _get_git_context(session_id, event.get('cwd')), + 'git_remote_url': _get_git_context( + session_id, _repo_context_dir(event.get('cwd'), file_path) + ), 'pre_tool_use_data': { 'tool_name': canonical, 'command': command, @@ -1155,7 +1174,6 @@ def process_user_prompt_submit(event, api_key): 'unbound_app_label': 'copilot', 'model': model, 'event_name': 'user_prompt', - 'git_remote_url': _get_git_context(session_id, event.get('cwd')), 'messages': [{'role': 'user', 'content': prompt}] if prompt else [] } diff --git a/test_repo_allowlist.py b/test_repo_allowlist.py index 40822d1f..ac44cd96 100644 --- a/test_repo_allowlist.py +++ b/test_repo_allowlist.py @@ -3,6 +3,7 @@ import contextlib import importlib.util +import os import subprocess import tempfile import unittest @@ -97,55 +98,133 @@ def test_caches_per_session_cwd(self): self.assertEqual(spy.call_count, 1) -class TestPayloadParity(unittest.TestCase): - """git_remote_url must appear with the same key/shape in both the - user_prompt and pre_tool_use bodies for claude-code and copilot. +def _capture(mod, fn, event): + with contextlib.ExitStack() as stack: + send = stack.enter_context( + patch.object(mod, "send_to_hook_api", return_value={"decision": "allow"})) + if hasattr(mod, "build_account_identity"): + stack.enter_context( + patch.object(mod, "build_account_identity", return_value={})) + if hasattr(mod, "load_policy_cache"): + stack.enter_context(patch.object(mod, "load_policy_cache", return_value=None)) + fn(event, "key") + return send.call_args.args[0] + + +class TestRepoContextDir(unittest.TestCase): + """The repo identity follows the operation target: the nearest existing + ancestor of the edited/read file for file tools, the session cwd otherwise.""" + + def test_uses_existing_target_directory(self): + parent = tempfile.mkdtemp() + sub = os.path.join(parent, "src") + os.makedirs(sub) + target = os.path.join(sub, "x.py") + for mod in (cc, co): + with self.subTest(mod=mod.__name__): + self.assertEqual(mod._repo_context_dir(parent, target), sub) + + def test_walks_up_to_nearest_existing_dir(self): + parent = tempfile.mkdtemp() + target = os.path.join(parent, "brand", "new", "deep", "file.py") + for mod in (cc, co): + with self.subTest(mod=mod.__name__): + self.assertEqual(mod._repo_context_dir(parent, target), parent) + + def test_falls_back_to_cwd_without_file_path(self): + parent = tempfile.mkdtemp() + for mod in (cc, co): + with self.subTest(mod=mod.__name__): + self.assertEqual(mod._repo_context_dir(parent, None), parent) + - Copilot's UserPromptSubmit cwd is UNVERIFIED in-repo; the hook reads - event.get('cwd') defensively, so the field is present and null when cwd - is absent (contract-compatible) and populated when cwd is supplied.""" +class TestTargetDirResolution(unittest.TestCase): + """A file tool launched from a non-git parent resolves to the subdir-repo + it actually edits; a file outside any repo resolves to nothing.""" def setUp(self): cc._GIT_CONTEXT_CACHE.clear() co._GIT_CONTEXT_CACHE.clear() - self.repo = _make_repo("https://github.com/org/repo.git") - self.expected = "https://github.com/org/repo.git" - - def _capture(self, mod, fn, event): - with contextlib.ExitStack() as stack: - send = stack.enter_context( - patch.object(mod, "send_to_hook_api", return_value={"decision": "allow"})) - if hasattr(mod, "build_account_identity"): - stack.enter_context( - patch.object(mod, "build_account_identity", return_value={})) - fn(event, "key") - return send.call_args.args[0] - - def test_all_four_bodies_carry_git_remote_url(self): - prompt_event = {"session_id": "p", "prompt": "hi", "cwd": self.repo} - tool_event = {"session_id": "t", "tool_name": "Bash", - "tool_input": {"command": "ls"}, "cwd": self.repo} - bodies = { - "cc_prompt": self._capture(cc, cc.process_user_prompt_submit, dict(prompt_event)), - "cc_tool": self._capture(cc, cc.process_pre_tool_use, dict(tool_event)), - "co_prompt": self._capture(co, co.process_user_prompt_submit, dict(prompt_event)), - "co_tool": self._capture(co, co.process_pre_tool_use, dict(tool_event)), - } + def test_file_in_subdir_repo_resolves_from_parent_cwd(self): + parent = tempfile.mkdtemp() + repo = Path(parent) / "service" + (repo / "src").mkdir(parents=True) + _git(["init", "-q"], str(repo)) + _git(["remote", "add", "origin", "https://github.com/org/service.git"], str(repo)) + target = str(repo / "src" / "x.py") + + cc_body = _capture(cc, cc.process_pre_tool_use, { + "session_id": "t", "tool_name": "Edit", "cwd": parent, + "tool_input": {"file_path": target}}) + co_body = _capture(co, co.process_pre_tool_use, { + "session_id": "t", "tool_name": "Edit", "cwd": parent, + "tool_input": {"filePath": target}}) + + for label, body in (("claude-code", cc_body), ("copilot", co_body)): + with self.subTest(hook=label): + self.assertEqual( + body["git_remote_url"], "https://github.com/org/service.git") - for name, body in bodies.items(): - with self.subTest(body=name): - self.assertIn("git_remote_url", body) - self.assertEqual(body["git_remote_url"], self.expected) + def test_file_outside_any_repo_resolves_null(self): + parent = tempfile.mkdtemp() + target = str(Path(parent) / "notes.py") - def test_field_is_null_when_no_cwd(self): - prompt_event = {"session_id": "n", "prompt": "hi"} - for mod in (cc, co): - body = self._capture(mod, mod.process_user_prompt_submit, dict(prompt_event)) - with self.subTest(mod=mod.__name__): - self.assertIn("git_remote_url", body) + cc_body = _capture(cc, cc.process_pre_tool_use, { + "session_id": "t2", "tool_name": "Edit", "cwd": parent, + "tool_input": {"file_path": target}}) + co_body = _capture(co, co.process_pre_tool_use, { + "session_id": "t2", "tool_name": "Edit", "cwd": parent, + "tool_input": {"filePath": target}}) + + for label, body in (("claude-code", cc_body), ("copilot", co_body)): + with self.subTest(hook=label): self.assertIsNone(body["git_remote_url"]) + def test_warm_cache_still_sends_file_tool_when_listed(self): + repo = _make_repo("https://github.com/org/repo.git") + target = os.path.join(repo, "x.py") + for label, mod, key in (("claude-code", cc, "file_path"), + ("copilot", co, "filePath")): + with self.subTest(hook=label), contextlib.ExitStack() as stack: + stack.enter_context(patch.object( + mod, "load_policy_cache", return_value={"tools_to_check": ["Edit"]})) + stack.enter_context(patch.object(mod, "is_cache_stale", return_value=False)) + send = stack.enter_context(patch.object( + mod, "send_to_hook_api", return_value={"decision": "allow"})) + if hasattr(mod, "build_account_identity"): + stack.enter_context(patch.object(mod, "build_account_identity", return_value={})) + mod.process_pre_tool_use({ + "session_id": "w", "tool_name": "Edit", "cwd": repo, + "tool_input": {key: target}}, "k") + self.assertTrue(send.called) + + +class TestPayloadParity(unittest.TestCase): + """The prompt path is no longer gated, so its body omits git_remote_url; + the tool path carries it for both hooks.""" + + def setUp(self): + cc._GIT_CONTEXT_CACHE.clear() + co._GIT_CONTEXT_CACHE.clear() + self.repo = _make_repo("https://github.com/org/repo.git") + + def test_tool_bodies_carry_git_remote_url(self): + tool_event = {"session_id": "t", "tool_name": "Bash", + "tool_input": {"command": "ls"}, "cwd": self.repo} + for label, mod in (("claude-code", cc), ("copilot", co)): + body = _capture(mod, mod.process_pre_tool_use, dict(tool_event)) + with self.subTest(hook=label): + self.assertEqual( + body["git_remote_url"], "https://github.com/org/repo.git") + + def test_prompt_bodies_omit_git_remote_url(self): + prompt_event = {"session_id": "p", "prompt": "hi", "cwd": self.repo} + for label, mod in (("claude-code", cc), ("copilot", co)): + body = _capture(mod, mod.process_user_prompt_submit, dict(prompt_event)) + with self.subTest(hook=label): + self.assertNotIn("git_remote_url", body) + class TestStripGitCredentials(unittest.TestCase): def _both(self): From 7849e780700dea795541dc5b380f80d0f0f731ff Mon Sep 17 00:00:00 2001 From: MohamedAklamaash Date: Tue, 30 Jun 2026 10:41:21 +0530 Subject: [PATCH 4/7] feat(repo-allowlist): extend file-target git context to the cursor hook Co-Authored-By: Claude Opus 4.8 (1M context) --- cursor/unbound.py | 62 ++++++++++++++++++++++++++++++++++++++++ test_repo_allowlist.py | 65 +++++++++++++++++++++--------------------- 2 files changed, 95 insertions(+), 32 deletions(-) diff --git a/cursor/unbound.py b/cursor/unbound.py index fc8afd84..9c978565 100644 --- a/cursor/unbound.py +++ b/cursor/unbound.py @@ -662,6 +662,65 @@ def build_account_identity(event=None, probe=False): return identity +_GIT_CONTEXT_CACHE = {} + + +def _strip_git_credentials(url): + try: + if not url or '@' not in url: + return url + scheme = re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', url) + prefix = scheme.group(0) if scheme else '' + rest = url[len(prefix):] + slash = rest.find('/') + authority = rest if slash == -1 else rest[:slash] + tail = '' if slash == -1 else rest[slash:] + at = authority.rfind('@') + if at == -1: + return url + return prefix + authority[at + 1:] + tail + except Exception: + return url + + +def _get_git_context(session_id, cwd): + """Credential-stripped origin remote URL for cwd, or None. Cached per + (session_id, cwd); never raises.""" + key = (session_id, cwd) + if key in _GIT_CONTEXT_CACHE: + return _GIT_CONTEXT_CACHE[key] + result = None + if cwd: + try: + out = subprocess.run( + ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], + capture_output=True, text=True, timeout=2, + ) + if out.returncode == 0: + url = out.stdout.strip() + if url: + result = _strip_git_credentials(url) + except Exception: + result = None + _GIT_CONTEXT_CACHE[key] = result + return result + + +def _repo_context_dir(cwd, file_path): + """Directory whose git repo governs the operation: the nearest existing + ancestor of the target file for file tools, else the session cwd.""" + if isinstance(file_path, str) and file_path: + base = file_path if os.path.isabs(file_path) else os.path.join(cwd or '', file_path) + d = os.path.dirname(base) or cwd + while d and not os.path.isdir(d): + parent = os.path.dirname(d) + if parent == d: + break + d = parent + return d or cwd + return cwd + + def process_pre_tool_use(event, api_key): """Process preToolUse event - check policy before tool execution.""" tool_name = event.get('tool_name', '') @@ -697,6 +756,9 @@ def process_pre_tool_use(event, api_key): 'unbound_app_label': 'cursor', 'model': model, 'event_name': 'tool_use', + 'git_remote_url': _get_git_context( + conversation_id, _repo_context_dir(event.get('cwd'), file_path) + ), 'pre_tool_use_data': { 'tool_name': tool_name, 'command': '', diff --git a/test_repo_allowlist.py b/test_repo_allowlist.py index ac44cd96..73d0a0a5 100644 --- a/test_repo_allowlist.py +++ b/test_repo_allowlist.py @@ -22,6 +22,16 @@ def _load(name, rel): cc = _load("cc_unbound", "claude-code/hooks/unbound.py") co = _load("co_unbound", "copilot/hooks/unbound.py") +cur = _load("cur_unbound", "cursor/unbound.py") + +ALL_HOOKS = [("claude-code", cc), ("copilot", co), ("cursor", cur)] + +# (label, module, native file-tool name, session-id key, file_path key) +HOOK_TOOL_CASES = [ + ("claude-code", cc, "Edit", "session_id", "file_path"), + ("copilot", co, "Edit", "session_id", "filePath"), + ("cursor", cur, "Write", "conversation_id", "file_path"), +] def _git(args, cwd): @@ -38,11 +48,11 @@ def _make_repo(remote_url): class TestGetGitContext(unittest.TestCase): def setUp(self): - cc._GIT_CONTEXT_CACHE.clear() - co._GIT_CONTEXT_CACHE.clear() + for _, mod in ALL_HOOKS: + mod._GIT_CONTEXT_CACHE.clear() def _both(self): - return [("claude-code", cc), ("copilot", co)] + return ALL_HOOKS def test_returns_origin_url(self): repo = _make_repo("https://github.com/org/repo.git") @@ -120,20 +130,20 @@ def test_uses_existing_target_directory(self): sub = os.path.join(parent, "src") os.makedirs(sub) target = os.path.join(sub, "x.py") - for mod in (cc, co): + for mod in (cc, co, cur): with self.subTest(mod=mod.__name__): self.assertEqual(mod._repo_context_dir(parent, target), sub) def test_walks_up_to_nearest_existing_dir(self): parent = tempfile.mkdtemp() target = os.path.join(parent, "brand", "new", "deep", "file.py") - for mod in (cc, co): + for mod in (cc, co, cur): with self.subTest(mod=mod.__name__): self.assertEqual(mod._repo_context_dir(parent, target), parent) def test_falls_back_to_cwd_without_file_path(self): parent = tempfile.mkdtemp() - for mod in (cc, co): + for mod in (cc, co, cur): with self.subTest(mod=mod.__name__): self.assertEqual(mod._repo_context_dir(parent, None), parent) @@ -143,8 +153,8 @@ class TestTargetDirResolution(unittest.TestCase): it actually edits; a file outside any repo resolves to nothing.""" def setUp(self): - cc._GIT_CONTEXT_CACHE.clear() - co._GIT_CONTEXT_CACHE.clear() + for _, mod in ALL_HOOKS: + mod._GIT_CONTEXT_CACHE.clear() def test_file_in_subdir_repo_resolves_from_parent_cwd(self): parent = tempfile.mkdtemp() @@ -154,15 +164,11 @@ def test_file_in_subdir_repo_resolves_from_parent_cwd(self): _git(["remote", "add", "origin", "https://github.com/org/service.git"], str(repo)) target = str(repo / "src" / "x.py") - cc_body = _capture(cc, cc.process_pre_tool_use, { - "session_id": "t", "tool_name": "Edit", "cwd": parent, - "tool_input": {"file_path": target}}) - co_body = _capture(co, co.process_pre_tool_use, { - "session_id": "t", "tool_name": "Edit", "cwd": parent, - "tool_input": {"filePath": target}}) - - for label, body in (("claude-code", cc_body), ("copilot", co_body)): + for label, mod, tool, idk, pathk in HOOK_TOOL_CASES: with self.subTest(hook=label): + body = _capture(mod, mod.process_pre_tool_use, { + idk: "t", "tool_name": tool, "cwd": parent, + "tool_input": {pathk: target}}) self.assertEqual( body["git_remote_url"], "https://github.com/org/service.git") @@ -170,33 +176,28 @@ def test_file_outside_any_repo_resolves_null(self): parent = tempfile.mkdtemp() target = str(Path(parent) / "notes.py") - cc_body = _capture(cc, cc.process_pre_tool_use, { - "session_id": "t2", "tool_name": "Edit", "cwd": parent, - "tool_input": {"file_path": target}}) - co_body = _capture(co, co.process_pre_tool_use, { - "session_id": "t2", "tool_name": "Edit", "cwd": parent, - "tool_input": {"filePath": target}}) - - for label, body in (("claude-code", cc_body), ("copilot", co_body)): + for label, mod, tool, idk, pathk in HOOK_TOOL_CASES: with self.subTest(hook=label): + body = _capture(mod, mod.process_pre_tool_use, { + idk: "t2", "tool_name": tool, "cwd": parent, + "tool_input": {pathk: target}}) self.assertIsNone(body["git_remote_url"]) def test_warm_cache_still_sends_file_tool_when_listed(self): repo = _make_repo("https://github.com/org/repo.git") target = os.path.join(repo, "x.py") - for label, mod, key in (("claude-code", cc, "file_path"), - ("copilot", co, "filePath")): + for label, mod, tool, idk, pathk in HOOK_TOOL_CASES: with self.subTest(hook=label), contextlib.ExitStack() as stack: stack.enter_context(patch.object( - mod, "load_policy_cache", return_value={"tools_to_check": ["Edit"]})) + mod, "load_policy_cache", return_value={"tools_to_check": [tool]})) stack.enter_context(patch.object(mod, "is_cache_stale", return_value=False)) send = stack.enter_context(patch.object( mod, "send_to_hook_api", return_value={"decision": "allow"})) if hasattr(mod, "build_account_identity"): stack.enter_context(patch.object(mod, "build_account_identity", return_value={})) mod.process_pre_tool_use({ - "session_id": "w", "tool_name": "Edit", "cwd": repo, - "tool_input": {key: target}}, "k") + idk: "w", "tool_name": tool, "cwd": repo, + "tool_input": {pathk: target}}, "k") self.assertTrue(send.called) @@ -205,8 +206,8 @@ class TestPayloadParity(unittest.TestCase): the tool path carries it for both hooks.""" def setUp(self): - cc._GIT_CONTEXT_CACHE.clear() - co._GIT_CONTEXT_CACHE.clear() + for _, mod in ALL_HOOKS: + mod._GIT_CONTEXT_CACHE.clear() self.repo = _make_repo("https://github.com/org/repo.git") def test_tool_bodies_carry_git_remote_url(self): @@ -228,7 +229,7 @@ def test_prompt_bodies_omit_git_remote_url(self): class TestStripGitCredentials(unittest.TestCase): def _both(self): - return [("claude-code", cc), ("copilot", co)] + return ALL_HOOKS def _check(self, url, expected): for label, mod in self._both(): From a7fce30ec3909283d9e139350f5753444a7f8c93 Mon Sep 17 00:00:00 2001 From: MohamedAklamaash Date: Tue, 30 Jun 2026 11:42:09 +0530 Subject: [PATCH 5/7] feat(repo-allowlist): gate MultiEdit/NotebookEdit in the claude-code hook MultiEdit and NotebookEdit mutate files but were absent from the claude-code native-file-tool set, so edits through them skipped the git_remote_url stamp and never reached the repo-allowlist gate. Add both to ALLOWED_NON_MCP_HOOK_NAMES and NATIVE_FILE_TOOLS, and extract the target path (file_path for MultiEdit, notebook_path for NotebookEdit) so the gate resolves the governing repo exactly like Edit. Co-Authored-By: Claude Opus 4.8 --- claude-code/hooks/unbound.py | 26 ++++++++++++++++++-------- test_repo_allowlist.py | 2 ++ 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/claude-code/hooks/unbound.py b/claude-code/hooks/unbound.py index f7902cef..fe95f18f 100644 --- a/claude-code/hooks/unbound.py +++ b/claude-code/hooks/unbound.py @@ -20,8 +20,8 @@ AUDIT_LOG = Path.home() / ".claude" / "hooks" / "agent-audit.log" ERROR_LOG = Path.home() / ".claude" / "hooks" / "error.log" LAST_REPORT_FILE = Path.home() / ".claude" / "hooks" / ".last_error_report" -ALLOWED_NON_MCP_HOOK_NAMES = ['Bash', 'Read', 'Write', 'Edit'] # MCP tools (mcp__*) are always checked separately -NATIVE_FILE_TOOLS = {'Read', 'Write', 'Edit'} +ALLOWED_NON_MCP_HOOK_NAMES = ['Bash', 'Read', 'Write', 'Edit', 'MultiEdit', 'NotebookEdit'] # MCP tools (mcp__*) are always checked separately +NATIVE_FILE_TOOLS = {'Read', 'Write', 'Edit', 'MultiEdit', 'NotebookEdit'} MCP_TOOL_PREFIX = 'mcp__' CLAUDE_MCP_CONFIG_PATH = Path.home() / ".claude.json" CLAUDE_PLUGIN_CACHE_DIR = Path.home() / ".claude" / "plugins" / "cache" @@ -488,6 +488,13 @@ def _build_user_prompt_payload(recent_user_prompts: List[str]) -> Dict: } +def _tool_file_path(tool_input: Dict) -> Optional[str]: + """Target path for a native file tool. NotebookEdit nests it under + notebook_path; Read/Write/Edit/MultiEdit use file_path.""" + path = tool_input.get('file_path') or tool_input.get('notebook_path') + return path if isinstance(path, str) and path else None + + def extract_command_for_pretool(event: Dict) -> str: """Extract command from tool_input based on tool type.""" tool_input = event.get('tool_input') or {} @@ -499,9 +506,11 @@ def extract_command_for_pretool(event: Dict) -> str: # MCP tools: stringify the input if tool_name.startswith(MCP_TOOL_PREFIX): return json.dumps(tool_input) - # File tools: file_path - if tool_name in ['Write', 'Edit', 'Read'] and 'file_path' in tool_input: - return tool_input['file_path'] + # File tools: file_path / notebook_path + if tool_name in NATIVE_FILE_TOOLS: + path = _tool_file_path(tool_input) + if path: + return path # Grep: pattern if tool_name == 'Grep' and 'pattern' in tool_input: return tool_input['pattern'] @@ -1084,8 +1093,9 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: # Build metadata with the raw event metadata = dict(event) tool_input = event.get('tool_input') or {} - if 'file_path' in tool_input: - metadata['file_path'] = tool_input['file_path'] + file_path = _tool_file_path(tool_input) + if file_path: + metadata['file_path'] = file_path if is_mcp: # Parse mcp____ to extract server and tool for gateway matching @@ -1122,7 +1132,7 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: 'model': model, 'event_name': 'tool_use', 'git_remote_url': _get_git_context( - session_id, _repo_context_dir(event.get('cwd'), tool_input.get('file_path')) + session_id, _repo_context_dir(event.get('cwd'), file_path) ), 'pre_tool_use_data': { 'command': command, diff --git a/test_repo_allowlist.py b/test_repo_allowlist.py index 73d0a0a5..3f61a80e 100644 --- a/test_repo_allowlist.py +++ b/test_repo_allowlist.py @@ -29,6 +29,8 @@ def _load(name, rel): # (label, module, native file-tool name, session-id key, file_path key) HOOK_TOOL_CASES = [ ("claude-code", cc, "Edit", "session_id", "file_path"), + ("claude-code-multiedit", cc, "MultiEdit", "session_id", "file_path"), + ("claude-code-notebookedit", cc, "NotebookEdit", "session_id", "notebook_path"), ("copilot", co, "Edit", "session_id", "filePath"), ("cursor", cur, "Write", "conversation_id", "file_path"), ] From 4a8021e9b0a2b65f24b2e76074035b39881947da Mon Sep 17 00:00:00 2001 From: MohamedAklamaash Date: Tue, 30 Jun 2026 14:33:50 +0530 Subject: [PATCH 6/7] test: cover _tool_file_path negative shapes in repo allowlist Assert _tool_file_path returns None for empty/missing file_path so the file-tool repo resolution never treats a blank path as a target. Co-Authored-By: Claude Opus 4.8 --- test_repo_allowlist.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test_repo_allowlist.py b/test_repo_allowlist.py index 3f61a80e..0a07f071 100644 --- a/test_repo_allowlist.py +++ b/test_repo_allowlist.py @@ -229,6 +229,12 @@ def test_prompt_bodies_omit_git_remote_url(self): self.assertNotIn("git_remote_url", body) +class TestToolFilePath(unittest.TestCase): + def test_empty_or_missing_path_is_none(self): + self.assertIsNone(cc._tool_file_path({})) + self.assertIsNone(cc._tool_file_path({'file_path': ''})) + + class TestStripGitCredentials(unittest.TestCase): def _both(self): return ALL_HOOKS From 122e7ed1e0badb4d569ff0a91cd12289a58ce29b Mon Sep 17 00:00:00 2001 From: MohamedAklamaash Date: Tue, 30 Jun 2026 16:22:22 +0530 Subject: [PATCH 7/7] fix(repo-allowlist): harden git-context capture in client hooks - Fail closed in _strip_git_credentials: return None on parse failure instead of the raw credential-bearing URL (no userinfo leak to gateway). - Do not cache transient git-subprocess failures; log them at debug level with session/cwd. Bound _GIT_CONTEXT_CACHE to 256 entries (FIFO evict). - Skip the git-context lookup on the Slack approval-retry path whose body is never sent. - copilot: resolve apply_patch repo identity from the patch payload path. Mirrored across claude-code, copilot, and cursor hooks. Co-Authored-By: Claude Opus 4.8 --- claude-code/hooks/unbound.py | 49 +++++++++++-------- copilot/hooks/unbound.py | 52 ++++++++++++-------- cursor/unbound.py | 49 +++++++++++-------- test_repo_allowlist.py | 95 ++++++++++++++++++++++++++++++++++++ 4 files changed, 187 insertions(+), 58 deletions(-) diff --git a/claude-code/hooks/unbound.py b/claude-code/hooks/unbound.py index fe95f18f..7e0194bd 100644 --- a/claude-code/hooks/unbound.py +++ b/claude-code/hooks/unbound.py @@ -947,6 +947,13 @@ def _get_device_serial() -> Optional[str]: _GIT_CONTEXT_CACHE: Dict = {} +_GIT_CONTEXT_CACHE_MAX = 256 + + +def _cache_git_context(key, value) -> None: + if key not in _GIT_CONTEXT_CACHE and len(_GIT_CONTEXT_CACHE) >= _GIT_CONTEXT_CACHE_MAX: + _GIT_CONTEXT_CACHE.pop(next(iter(_GIT_CONTEXT_CACHE)), None) + _GIT_CONTEXT_CACHE[key] = value def _strip_git_credentials(url): @@ -964,29 +971,32 @@ def _strip_git_credentials(url): return url return prefix + authority[at + 1:] + tail except Exception: - return url + return None def _get_git_context(session_id: Optional[str], cwd: Optional[str]) -> Optional[str]: - """Credential-stripped origin remote URL for cwd, or None. Cached per - (session_id, cwd); never raises.""" + """Credential-stripped origin remote URL for cwd, or None. Successful and + conclusive-no-repo lookups are cached per (session_id, cwd); transient + failures are not cached. Never raises.""" key = (session_id, cwd) if key in _GIT_CONTEXT_CACHE: return _GIT_CONTEXT_CACHE[key] + if not cwd: + return None + try: + out = subprocess.run( + ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], + capture_output=True, text=True, timeout=2, + ) + except Exception as exc: + log_error(f"git context lookup failed session={session_id} cwd={cwd}: {exc}", 'git_context') + return None result = None - if cwd: - try: - out = subprocess.run( - ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], - capture_output=True, text=True, timeout=2, - ) - if out.returncode == 0: - url = out.stdout.strip() - if url: - result = _strip_git_credentials(url) - except Exception: - result = None - _GIT_CONTEXT_CACHE[key] = result + if out.returncode == 0: + url = out.stdout.strip() + if url: + result = _strip_git_credentials(url) + _cache_git_context(key, result) return result @@ -1131,9 +1141,6 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: 'unbound_app_label': 'claude-code', 'model': model, 'event_name': 'tool_use', - 'git_remote_url': _get_git_context( - session_id, _repo_context_dir(event.get('cwd'), file_path) - ), 'pre_tool_use_data': { 'command': command, 'tool_name': tool_name, @@ -1182,6 +1189,10 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: 'additionalContext': 'This command was blocked by an organization security policy that requires approval. Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. The user must approve via Slack and retry.', }) + request_body['git_remote_url'] = _get_git_context( + session_id, _repo_context_dir(event.get('cwd'), file_path) + ) + if need_pull_policies: request_body['pull_policies'] = True diff --git a/copilot/hooks/unbound.py b/copilot/hooks/unbound.py index 1fddb277..f3c772aa 100644 --- a/copilot/hooks/unbound.py +++ b/copilot/hooks/unbound.py @@ -920,6 +920,13 @@ def transform_response_for_copilot_prompt(api_response): _GIT_CONTEXT_CACHE = {} +_GIT_CONTEXT_CACHE_MAX = 256 + + +def _cache_git_context(key, value): + if key not in _GIT_CONTEXT_CACHE and len(_GIT_CONTEXT_CACHE) >= _GIT_CONTEXT_CACHE_MAX: + _GIT_CONTEXT_CACHE.pop(next(iter(_GIT_CONTEXT_CACHE)), None) + _GIT_CONTEXT_CACHE[key] = value def _strip_git_credentials(url): @@ -937,29 +944,32 @@ def _strip_git_credentials(url): return url return prefix + authority[at + 1:] + tail except Exception: - return url + return None def _get_git_context(session_id, cwd): - """Credential-stripped origin remote URL for cwd, or None. Cached per - (session_id, cwd); never raises.""" + """Credential-stripped origin remote URL for cwd, or None. Successful and + conclusive-no-repo lookups are cached per (session_id, cwd); transient + failures are not cached. Never raises.""" key = (session_id, cwd) if key in _GIT_CONTEXT_CACHE: return _GIT_CONTEXT_CACHE[key] + if not cwd: + return None + try: + out = subprocess.run( + ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], + capture_output=True, text=True, timeout=2, + ) + except Exception as exc: + log_error(f"git context lookup failed session={session_id} cwd={cwd}: {exc}", 'git_context') + return None result = None - if cwd: - try: - out = subprocess.run( - ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], - capture_output=True, text=True, timeout=2, - ) - if out.returncode == 0: - url = out.stdout.strip() - if url: - result = _strip_git_credentials(url) - except Exception: - result = None - _GIT_CONTEXT_CACHE[key] = result + if out.returncode == 0: + url = out.stdout.strip() + if url: + result = _strip_git_credentials(url) + _cache_git_context(key, result) return result @@ -1060,7 +1070,8 @@ def process_pre_tool_use(event, api_key): # Preserve the raw event (raw tool_name + tool_input) inside metadata. metadata = dict(event) - file_path = tool_input.get('filePath') or tool_input.get('path') or tool_input.get('file_path') + file_path = (tool_input.get('filePath') or tool_input.get('path') + or tool_input.get('file_path') or _extract_patch_target_path(tool_input)) if file_path: metadata['file_path'] = file_path @@ -1078,9 +1089,6 @@ def process_pre_tool_use(event, api_key): 'unbound_app_label': 'copilot', 'model': model, 'event_name': 'tool_use', - 'git_remote_url': _get_git_context( - session_id, _repo_context_dir(event.get('cwd'), file_path) - ), 'pre_tool_use_data': { 'tool_name': canonical, 'command': command, @@ -1116,6 +1124,10 @@ def process_pre_tool_use(event, api_key): 'additionalContext': 'This action was blocked by an organization security policy that requires approval. Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. The user must approve via Slack and retry.', }) + request_body['git_remote_url'] = _get_git_context( + session_id, _repo_context_dir(event.get('cwd'), file_path) + ) + if need_pull_policies: request_body['pull_policies'] = True diff --git a/cursor/unbound.py b/cursor/unbound.py index 9c978565..b52c2498 100644 --- a/cursor/unbound.py +++ b/cursor/unbound.py @@ -663,6 +663,13 @@ def build_account_identity(event=None, probe=False): _GIT_CONTEXT_CACHE = {} +_GIT_CONTEXT_CACHE_MAX = 256 + + +def _cache_git_context(key, value): + if key not in _GIT_CONTEXT_CACHE and len(_GIT_CONTEXT_CACHE) >= _GIT_CONTEXT_CACHE_MAX: + _GIT_CONTEXT_CACHE.pop(next(iter(_GIT_CONTEXT_CACHE)), None) + _GIT_CONTEXT_CACHE[key] = value def _strip_git_credentials(url): @@ -680,29 +687,32 @@ def _strip_git_credentials(url): return url return prefix + authority[at + 1:] + tail except Exception: - return url + return None def _get_git_context(session_id, cwd): - """Credential-stripped origin remote URL for cwd, or None. Cached per - (session_id, cwd); never raises.""" + """Credential-stripped origin remote URL for cwd, or None. Successful and + conclusive-no-repo lookups are cached per (session_id, cwd); transient + failures are not cached. Never raises.""" key = (session_id, cwd) if key in _GIT_CONTEXT_CACHE: return _GIT_CONTEXT_CACHE[key] + if not cwd: + return None + try: + out = subprocess.run( + ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], + capture_output=True, text=True, timeout=2, + ) + except Exception as exc: + log_error(f"git context lookup failed session={session_id} cwd={cwd}: {exc}", 'git_context') + return None result = None - if cwd: - try: - out = subprocess.run( - ['git', '-C', cwd, 'config', '--get', 'remote.origin.url'], - capture_output=True, text=True, timeout=2, - ) - if out.returncode == 0: - url = out.stdout.strip() - if url: - result = _strip_git_credentials(url) - except Exception: - result = None - _GIT_CONTEXT_CACHE[key] = result + if out.returncode == 0: + url = out.stdout.strip() + if url: + result = _strip_git_credentials(url) + _cache_git_context(key, result) return result @@ -756,9 +766,6 @@ def process_pre_tool_use(event, api_key): 'unbound_app_label': 'cursor', 'model': model, 'event_name': 'tool_use', - 'git_remote_url': _get_git_context( - conversation_id, _repo_context_dir(event.get('cwd'), file_path) - ), 'pre_tool_use_data': { 'tool_name': tool_name, 'command': '', @@ -804,6 +811,10 @@ def process_pre_tool_use(event, api_key): 'agent_message': 'This action was blocked by an organization security policy that requires approval. Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. The user must approve via Slack and retry.', } + request_body['git_remote_url'] = _get_git_context( + conversation_id, _repo_context_dir(event.get('cwd'), file_path) + ) + if need_pull_policies: request_body['pull_policies'] = True diff --git a/test_repo_allowlist.py b/test_repo_allowlist.py index 0a07f071..0f8dd345 100644 --- a/test_repo_allowlist.py +++ b/test_repo_allowlist.py @@ -277,6 +277,101 @@ def test_no_userinfo_unchanged(self): with self.subTest(hook=label, url=url): self.assertEqual(mod._strip_git_credentials(url), url) + def test_parse_failure_fails_closed(self): + for label, mod in self._both(): + with self.subTest(hook=label): + with patch.object(mod.re, "match", side_effect=ValueError("boom")): + out = mod._strip_git_credentials( + "https://user:token@github.com/org/repo.git") + self.assertIsNone(out) + + +class TestGitContextFailureHandling(unittest.TestCase): + def setUp(self): + for _, mod in ALL_HOOKS: + mod._GIT_CONTEXT_CACHE.clear() + + def test_transient_failure_not_cached(self): + repo = _make_repo("https://github.com/org/repo.git") + for label, mod in ALL_HOOKS: + with self.subTest(hook=label): + mod._GIT_CONTEXT_CACHE.clear() + real = subprocess.run + state = {"n": 0} + + def flaky(*a, **k): + state["n"] += 1 + if state["n"] == 1: + raise subprocess.TimeoutExpired("git", 2) + return real(*a, **k) + + with patch("subprocess.run", side_effect=flaky): + first = mod._get_git_context("s_flaky", repo) + second = mod._get_git_context("s_flaky", repo) + self.assertIsNone(first) + self.assertEqual(second, "https://github.com/org/repo.git") + + def test_cache_is_bounded(self): + for label, mod in ALL_HOOKS: + with self.subTest(hook=label): + mod._GIT_CONTEXT_CACHE.clear() + cap = mod._GIT_CONTEXT_CACHE_MAX + for i in range(cap + 50): + mod._cache_git_context(("s", i), "url") + self.assertLessEqual(len(mod._GIT_CONTEXT_CACHE), cap) + + +class TestApprovalRetrySkipsGit(unittest.TestCase): + def setUp(self): + for _, mod in ALL_HOOKS: + mod._GIT_CONTEXT_CACHE.clear() + + def test_retry_path_skips_git_context(self): + repo = _make_repo("https://github.com/org/repo.git") + cases = [ + (cc, {"session_id": "r", "tool_name": "Bash", + "tool_input": {"command": "ls"}, "cwd": repo}), + (co, {"session_id": "r", "tool_name": "Bash", + "tool_input": {"command": "ls"}, "cwd": repo}), + (cur, {"conversation_id": "r", "tool_name": "Write", + "tool_input": {"file_path": str(Path(repo) / "x.py")}, "cwd": repo}), + ] + for mod, event in cases: + with self.subTest(mod=mod.__name__), contextlib.ExitStack() as stack: + stack.enter_context(patch.object(mod, "_is_approval_retry", return_value=True)) + stack.enter_context(patch.object( + mod, "_get_approval_marker_data", + return_value={"policyIds": [], "applicationId": "", "requestId": ""})) + stack.enter_context(patch.object(mod, "poll_approval_status", return_value="approved")) + stack.enter_context(patch.object(mod, "_clear_approval_marker")) + stack.enter_context(patch.object(mod, "load_policy_cache", return_value=None)) + if hasattr(mod, "build_account_identity"): + stack.enter_context(patch.object(mod, "build_account_identity", return_value={})) + git = stack.enter_context(patch.object(mod, "_get_git_context")) + mod.process_pre_tool_use(event, "k") + git.assert_not_called() + + +class TestApplyPatchRepoContext(unittest.TestCase): + """Copilot canonicalizes apply_patch as Edit; the repo identity must come + from the path inside the patch payload, not the (missing) filePath arg.""" + + def setUp(self): + co._GIT_CONTEXT_CACHE.clear() + + def test_apply_patch_resolves_repo_from_payload(self): + parent = tempfile.mkdtemp() + repo = Path(parent) / "svc" + (repo / "src").mkdir(parents=True) + _git(["init", "-q"], str(repo)) + _git(["remote", "add", "origin", "https://github.com/org/svc.git"], str(repo)) + target = str(repo / "src" / "a.py") + patch_text = f"*** Begin Patch\n*** Update File: {target}\n@@\n-old\n+new\n*** End Patch\n" + body = _capture(co, co.process_pre_tool_use, { + "session_id": "ap", "tool_name": "apply_patch", + "tool_input": {"input": patch_text}, "cwd": parent}) + self.assertEqual(body["git_remote_url"], "https://github.com/org/svc.git") + if __name__ == "__main__": unittest.main()