From e9316ca1fa70aa4fdc0b4278fe855ac45b218e04 Mon Sep 17 00:00:00 2001 From: Nanda Pranesh Date: Sat, 20 Jun 2026 00:35:55 +0530 Subject: [PATCH 1/2] Fix Copilot MCP-server sanctioning silent bypass (identification + gated fail-secure + telemetry) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Copilot emits sanitized `-` tool names with no `mcp__` prefix and no structured server field, so the PreToolUse hook reverse-engineers the server from the MCP config — and when it couldn't resolve it returned {} (ALLOW), silently bypassing sanctioning (which works on Claude Code & Cursor through the same gateway path). All changes in copilot/hooks/unbound.py: - Identification: honor COPILOT_HOME for the CLI config dir; parse the flat top-level server form; fall back to the SessionStart-logged cwd when the PreToolUse event omits it (and log SessionStart so the fallback works); defensive structured-field read. - Fail-secure (gated, DEFAULT-OFF via UNBOUND_COPILOT_MCP_DENY_UNRESOLVED): an unresolved tool on an MCP-configured machine that isn't a known built-in is DENIED instead of allowed. Off by default => inert; distinct from the infra fail-open path. - Telemetry: per-surface resolution categories over a SEPARATE rate-limit budget so success telemetry can't suppress real error/bypass reports. Gateway/backend unchanged (already enforce given a resolved server). New test_pretool_mcp.py (38 tests). SOC 2: touches the policy-enforcement hook path — needs named-approver review. Co-Authored-By: Claude Opus 4.8 (1M context) --- copilot/hooks/test_pretool_mcp.py | 647 ++++++++++++++++++++++++++++++ copilot/hooks/unbound.py | 270 +++++++++++-- 2 files changed, 892 insertions(+), 25 deletions(-) create mode 100644 copilot/hooks/test_pretool_mcp.py diff --git a/copilot/hooks/test_pretool_mcp.py b/copilot/hooks/test_pretool_mcp.py new file mode 100644 index 00000000..3326bdfe --- /dev/null +++ b/copilot/hooks/test_pretool_mcp.py @@ -0,0 +1,647 @@ +""" +Integration tests for Copilot PreToolUse MCP identification + fail-secure. + +Driven through process_pre_tool_use / main() (the outermost layer) — assertions +are on the RETURNED dict, never on internal helpers. No network: send_to_hook_api +and report_error_to_gateway are patched. Mirrors the harness in +claude-code/hooks/test_identity.py (sibling `unbound` module, unittest, tmpdir + +patch.object for LOG_DIR / POLICY_CACHE_FILE / config paths). + +Covers: + - both surfaces (VS Code camelCase + CLI snake_case) resolve identically + - cwd resolution via event cwd AND via the SessionStart-cwd fallback + - COPILOT_HOME override + - flat top-level server form + - fail-secure matrix (flag ON) + inert-when-OFF + - infra fail-open (resolved call, gateway {}) stays distinct from identity deny + - never-crash on garbage/oversized/missing input + - telemetry categories on each path +""" + +import json +import os +import tempfile +import unittest +from datetime import datetime, timezone +from pathlib import Path +from unittest.mock import patch + +import unbound + + +def _fresh_cache(): + """A present, non-stale policy cache so resolved tools reach the API and + native-file tools aren't force-pulled.""" + return { + 'last_synced': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), + 'tools_to_check': ['Read', 'Write', 'Edit', 'Bash'], + 'policy_check_failure_action': 'allow', + } + + +class _Base(unittest.TestCase): + def setUp(self): + self.tmp = Path(tempfile.mkdtemp()) + + # Redirect all hook state into the tmpdir. + self.audit_log = self.tmp / "agent-audit.log" + self.policy_cache = self.tmp / ".policy_cache.json" + self._patchers = [ + patch.object(unbound, "LOG_DIR", self.tmp), + patch.object(unbound, "AUDIT_LOG", self.audit_log), + patch.object(unbound, "ERROR_LOG", self.tmp / "error.log"), + patch.object(unbound, "POLICY_CACHE_FILE", self.policy_cache), + patch.object(unbound, "LAST_REPORT_FILE", self.tmp / ".last_error_report"), + # No network, ever. + patch.object(unbound, "send_to_hook_api", return_value={}), + patch.object(unbound, "report_error_to_gateway"), + ] + for p in self._patchers: + p.start() + self.addCleanup(p.stop) + + self.policy_cache.write_text(json.dumps(_fresh_cache()), encoding="utf-8") + + # MCP config the hook will read. Tests point _copilot_mcp_config_paths + # here unless they specifically exercise path resolution. + self.mcp_config = self.tmp / "mcp-config.json" + + def _point_config_paths_at(self, *paths): + return patch.object(unbound, "_copilot_mcp_config_paths", lambda cwd=None: list(paths)) + + def _write_mcp(self, data): + self.mcp_config.write_text(json.dumps(data), encoding="utf-8") + + @property + def mock_api(self): + return unbound.send_to_hook_api + + @property + def mock_telemetry(self): + return unbound.report_error_to_gateway + + def _telemetry_categories(self): + return [kw.get('category') for _, kw in self.mock_telemetry.call_args_list] + + +class TestResolutionBothSurfaces(_Base): + """A configured `github` server resolves `github-create_issue` identically on + both surfaces, and the resolved MCP tool reaches the gateway as + mcp__github__create_issue.""" + + def setUp(self): + super().setUp() + self._write_mcp({"servers": {"github": {"command": "github-mcp"}}}) + self._cp = self._point_config_paths_at(self.mcp_config) + self._cp.start() + self.addCleanup(self._cp.stop) + + def _run(self, event): + with patch.object(unbound, "send_to_hook_api", return_value={}) as api: + result = unbound.process_pre_tool_use(event, "api-key") + return result, api + + def test_cli_snake_case_resolves(self): + event = { + "hook_event_name": "PreToolUse", + "session_id": "s1", + "tool_name": "github-create_issue", + "tool_input": {"title": "x"}, + } + result, api = self._run(event) + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__github__create_issue') + self.assertEqual(result, {}) # gateway returned {} → allow + + def test_vscode_camel_case_resolves(self): + event = { + "hookEventName": "PreToolUse", + "sessionId": "s1", + "toolName": "github-create_issue", + "toolArgs": {"title": "x"}, + } + result, api = self._run(event) + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__github__create_issue') + self.assertEqual(result, {}) + + def test_both_surfaces_emit_same_canonical(self): + cli = {"tool_name": "github-create_issue", "session_id": "s1"} + vs = {"toolName": "github-create_issue", "sessionId": "s1"} + with patch.object(unbound, "send_to_hook_api", return_value={}) as api: + unbound.process_pre_tool_use(cli, "k") + cli_sent = api.call_args[0][0]['pre_tool_use_data']['tool_name'] + with patch.object(unbound, "send_to_hook_api", return_value={}) as api: + unbound.process_pre_tool_use(vs, "k") + vs_sent = api.call_args[0][0]['pre_tool_use_data']['tool_name'] + self.assertEqual(cli_sent, vs_sent) + self.assertEqual(cli_sent, 'mcp__github__create_issue') + + def test_resolved_emits_resolved_telemetry(self): + self.mock_telemetry.reset_mock() + unbound.process_pre_tool_use( + {"tool_name": "github-create_issue", "session_id": "s1"}, "k" + ) + self.assertIn('copilot_mcp_resolved', self._telemetry_categories()) + + def test_mcp_double_underscore_form_resolves(self): + # The self-delimiting mcp__ form short-circuits before config lookup. + event = {"tool_name": "mcp__github__create_issue", "session_id": "s1"} + with patch.object(unbound, "send_to_hook_api", return_value={}) as api: + unbound.process_pre_tool_use(event, "k") + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__github__create_issue') + + +class TestCwdFallback(_Base): + """Workspace-scoped server resolves via event cwd, and via the SessionStart + cwd fallback when PreToolUse omits cwd.""" + + def setUp(self): + super().setUp() + # Workspace config under a project dir; resolution must find it via cwd. + self.project = self.tmp / "project" + (self.project).mkdir() + (self.project / ".mcp.json").write_text( + json.dumps({"mcpServers": {"ws": {"command": "ws-mcp"}}}), encoding="utf-8" + ) + + def test_resolves_via_event_cwd(self): + event = {"tool_name": "ws-do_thing", "session_id": "s1", "cwd": str(self.project)} + with patch.object(unbound, "send_to_hook_api", return_value={}) as api: + unbound.process_pre_tool_use(event, "k") + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__ws__do_thing') + + def test_resolves_via_session_start_cwd_when_pretool_omits_cwd(self): + # Log a SessionStart carrying cwd (via main()), then a PreToolUse with NO cwd. + with patch("sys.stdin") as stdin: + stdin.read.return_value = json.dumps({ + "hook_event_name": "SessionStart", + "session_id": "s1", + "cwd": str(self.project), + }) + with patch.object(unbound, "_check_self_update"), \ + patch.object(unbound, "_dispatch_discovery"), \ + patch("builtins.print"): + unbound.main() + + # Sanity: the helper recovers it. + self.assertEqual(unbound.get_session_start_cwd("s1"), str(self.project)) + + event = {"tool_name": "ws-do_thing", "session_id": "s1"} # no cwd + with patch.object(unbound, "send_to_hook_api", return_value={}) as api: + unbound.process_pre_tool_use(event, "k") + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__ws__do_thing') + + +class TestCopilotHomeOverride(_Base): + """COPILOT_HOME relocates the CLI config dir; the server there resolves.""" + + def test_copilot_home_override_resolves(self): + reloc = self.tmp / "relocated-copilot" + reloc.mkdir() + (reloc / "mcp-config.json").write_text( + json.dumps({"mcpServers": {"relo": {"command": "relo-mcp"}}}), encoding="utf-8" + ) + # Do NOT patch _copilot_mcp_config_paths — exercise real resolution. + # Pin home to tmp and set COPILOT_HOME so _copilot_cli_config_dir wins. + with patch.object(unbound.Path, "home", staticmethod(lambda: self.tmp)), \ + patch.dict(os.environ, {"COPILOT_HOME": str(reloc)}), \ + patch.object(unbound, "send_to_hook_api", return_value={}) as api: + event = {"tool_name": "relo-go", "session_id": "s1"} + unbound.process_pre_tool_use(event, "k") + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__relo__go') + + +class TestFlatTopLevelForm(_Base): + """Flat (unwrapped) top-level server form resolves.""" + + def test_flat_form_resolves(self): + self._write_mcp({"github": {"command": "github-mcp"}}) + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}) as api: + event = {"tool_name": "github-create_issue", "session_id": "s1"} + unbound.process_pre_tool_use(event, "k") + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__github__create_issue') + + def test_flat_form_ignores_scalar_and_non_server_objects(self): + # `inputs` (VS Code block) and a scalar must NOT be treated as servers, + # so a tool named after them stays unresolved (allow when flag OFF). + self._write_mcp({"inputs": {"foo": "bar"}, "version": "1", "github": {"command": "c"}}) + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}) as api: + # `inputs-x` shares the `inputs` prefix but inputs isn't a server. + unbound.process_pre_tool_use({"tool_name": "inputs-x", "session_id": "s1"}, "k") + # Unresolved → no API call. + self.assertFalse(api.called) + + def test_flat_form_inputs_block_with_command_is_not_a_server(self): + # A VS Code `inputs` metadata block can itself carry a `command` key; it + # must NOT mint a phantom `inputs` server, while a real sibling server + # alongside it still resolves. + self._write_mcp({ + "inputs": {"command": "echo", "args": ["${input:token}"]}, + "$schema": "https://example/schema.json", + "github": {"command": "github-mcp"}, + }) + # The `inputs` block must not appear as a parsed server. + with self._point_config_paths_at(self.mcp_config): + servers = unbound.read_copilot_mcp_servers() + self.assertNotIn("inputs", servers) + self.assertIn("github", servers) + + # A tool named after the phantom `inputs` server stays unresolved (no + # API call); the real `github` server still resolves to canonical form. + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}) as api: + unbound.process_pre_tool_use({"tool_name": "inputs-do_thing", "session_id": "s1"}, "k") + self.assertFalse(api.called) # phantom server not minted → unresolved + + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}) as api: + unbound.process_pre_tool_use({"tool_name": "github-create_issue", "session_id": "s1"}, "k") + sent = api.call_args[0][0] + self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__github__create_issue') + + +class TestFailSecureFlagOn(_Base): + """Fail-secure matrix with UNBOUND_COPILOT_MCP_DENY_UNRESOLVED ON.""" + + def setUp(self): + super().setUp() + self._deny = patch.object(unbound, "DENY_UNRESOLVED_MCP", True) + self._deny.start() + self.addCleanup(self._deny.stop) + + def _run(self, event, config): + self._write_mcp(config) if config is not None else None + paths = (self.mcp_config,) if config is not None else () + with self._point_config_paths_at(*paths), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + return unbound.process_pre_tool_use(event, "k") + + def test_unresolved_with_config_nonnative_denies(self): + result = self._run( + {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, + {"servers": {"github": {"command": "c"}}}, + ) + self.assertEqual(result.get('permissionDecision'), 'deny') + self.assertEqual( + result['hookSpecificOutput']['permissionDecision'], 'deny' + ) + + def test_unresolved_with_config_emits_denied_telemetry(self): + self.mock_telemetry.reset_mock() + self._run( + {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, + {"servers": {"github": {"command": "c"}}}, + ) + self.assertIn('copilot_mcp_denied_unresolved', self._telemetry_categories()) + + def test_unresolved_no_config_allows(self): + # No MCP configured at all → cannot be an MCP call → allow. + result = self._run({"tool_name": "weird-thing", "session_id": "s1"}, None) + self.assertEqual(result, {}) + + def test_known_builtin_never_blocked_even_with_config(self): + # `read_file` is a known built-in; never denied even when MCP configured. + result = self._run( + {"tool_name": "read_file", "session_id": "s1", "tool_input": {"filePath": "/x"}}, + {"servers": {"github": {"command": "c"}}}, + ) + self.assertEqual(result, {}) + + def test_internal_tool_never_blocked(self): + result = self._run( + {"tool_name": "manage_todo_list", "session_id": "s1"}, + {"servers": {"github": {"command": "c"}}}, + ) + self.assertEqual(result, {}) + + def test_terminal_like_tool_never_blocked(self): + result = self._run( + {"tool_name": "grep_search", "session_id": "s1", "tool_input": {"query": "x"}}, + {"servers": {"github": {"command": "c"}}}, + ) + self.assertEqual(result, {}) + + def test_vscode_agent_builtins_never_blocked_even_with_config(self): + # The expanded VS Code agent-mode built-in allow-set (read-only search, + # test running, notebook editing, repo/task meta — both camel & snake + # variants) must never be denied when the deny flag is ON, so flipping + # it on can't false-block routine agent built-ins. + builtins = [ + "fetch", "fetch_webpage", "semantic_search", "codebase", "usages", + "findTestFiles", "runTests", "run_tests", "get_errors", + "test_search", "test_failure", "githubRepo", "runTasks", + "runCommands", "open_simple_browser", "editNotebook", + "runNotebookCell", "edit_notebook", "run_notebook_cell", + ] + for tool in builtins: + result = self._run( + {"tool_name": tool, "session_id": "s1"}, + {"servers": {"github": {"command": "c"}}}, + ) + self.assertEqual(result, {}, f"{tool} should never be blocked") + + def test_vscode_agent_builtin_emits_no_denied_telemetry(self): + # A known built-in must not even register as denied/unresolved-with-config. + self.mock_telemetry.reset_mock() + self._run( + {"tool_name": "semantic_search", "session_id": "s1"}, + {"servers": {"github": {"command": "c"}}}, + ) + cats = self._telemetry_categories() + self.assertNotIn('copilot_mcp_denied_unresolved', cats) + self.assertNotIn('copilot_mcp_unresolved_with_config', cats) + + def test_native_tools_unaffected(self): + # Native Read/Write/Edit/Bash go through the normal path (not the MCP + # branch) → resolved call to the gateway, which returns {} → allow. + for tool, payload in [ + ("Read", {"file_path": "/a"}), + ("Write", {"file_path": "/a"}), + ("Edit", {"file_path": "/a"}), + ("Bash", {"command": "ls"}), + ]: + event = {"tool_name": tool, "session_id": "s1", "tool_input": payload} + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}) as api: + self._write_mcp({"servers": {"github": {"command": "c"}}}) + result = unbound.process_pre_tool_use(event, "k") + self.assertEqual(result, {}, f"{tool} should be allowed") + self.assertTrue(api.called, f"{tool} should reach the gateway") + + +class TestFailSecureFlagOffIsInert(_Base): + """Default (flag OFF): the deny case from the ON matrix returns {} — proving + the change is inert until the kill-switch is flipped.""" + + def test_unresolved_with_config_allows_when_flag_off(self): + self.assertFalse(unbound.DENY_UNRESOLVED_MCP) # default + self._write_mcp({"servers": {"github": {"command": "c"}}}) + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + result = unbound.process_pre_tool_use( + {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, "k" + ) + self.assertEqual(result, {}) + + def test_unresolved_with_config_telemetry_still_emitted_when_flag_off(self): + # Observability ships with component 1 regardless of the deny gate. + self.mock_telemetry.reset_mock() + self._write_mcp({"servers": {"github": {"command": "c"}}}) + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + unbound.process_pre_tool_use( + {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, "k" + ) + cats = self._telemetry_categories() + self.assertIn('copilot_mcp_unresolved_with_config', cats) + self.assertNotIn('copilot_mcp_denied_unresolved', cats) + + +class TestInfraVsIdentity(_Base): + """A RESOLVED MCP call where the gateway is unreachable ({}) must honor + policy_check_failure_action — distinct from the identity deny.""" + + def setUp(self): + super().setUp() + self._write_mcp({"servers": {"github": {"command": "c"}}}) + self._cp = self._point_config_paths_at(self.mcp_config) + self._cp.start() + self.addCleanup(self._cp.stop) + + def test_resolved_gateway_down_block_action_blocks_with_infra_reason(self): + # failure-action = block → infra fail-CLOSED with the infra reason. + self.policy_cache.write_text(json.dumps({ + **_fresh_cache(), 'policy_check_failure_action': 'block', + }), encoding="utf-8") + with patch.object(unbound, "send_to_hook_api", return_value={}): + result = unbound.process_pre_tool_use( + {"tool_name": "github-create_issue", "session_id": "s1"}, "k" + ) + self.assertEqual(result.get('permissionDecision'), 'deny') + self.assertEqual( + result['permissionDecisionReason'], unbound.POLICY_CHECK_FAILURE_BLOCK_REASON + ) + + def test_resolved_gateway_down_allow_action_allows(self): + # failure-action = allow (default) → infra fail-OPEN. + with patch.object(unbound, "send_to_hook_api", return_value={}): + result = unbound.process_pre_tool_use( + {"tool_name": "github-create_issue", "session_id": "s1"}, "k" + ) + self.assertEqual(result, {}) + + +class TestNeverCrash(_Base): + """The hook must never raise on customer machines.""" + + def test_oversized_config_does_not_crash(self): + # Real resolution path (no _copilot_mcp_config_paths patch) over a >1MB file. + big = self.tmp / "huge-mcp-config.json" + big.write_text("{" + '"x":1,' * 200000 + '"github":{"command":"c"}}', encoding="utf-8") + with self._point_config_paths_at(big), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + result = unbound.process_pre_tool_use( + {"tool_name": "github-create_issue", "session_id": "s1"}, "k" + ) + self.assertIsInstance(result, dict) # skipped (too big) → unresolved → {} + + def test_garbage_config_does_not_crash(self): + self.mcp_config.write_text("{ this is not json ::: ", encoding="utf-8") + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + result = unbound.process_pre_tool_use( + {"tool_name": "github-create_issue", "session_id": "s1"}, "k" + ) + self.assertIsInstance(result, dict) + + def test_missing_cwd_and_no_config_does_not_crash(self): + with self._point_config_paths_at(), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + result = unbound.process_pre_tool_use({"tool_name": "whatever"}, "k") + self.assertIsInstance(result, dict) + + def test_malformed_event_does_not_crash(self): + with self._point_config_paths_at(): + result = unbound.process_pre_tool_use({}, "k") + self.assertIsInstance(result, dict) + + def test_main_session_start_garbage_does_not_crash(self): + with patch("sys.stdin") as stdin: + stdin.read.return_value = "not json at all" + with patch("builtins.print") as pr: + unbound.main() + pr.assert_called() # emitted {} and returned + + +class TestTelemetryCategories(_Base): + """Each resolution outcome emits its stable category (names only).""" + + def test_unresolved_no_config_category(self): + self.mock_telemetry.reset_mock() + with self._point_config_paths_at(), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + unbound.process_pre_tool_use({"tool_name": "weird-thing", "session_id": "s1"}, "k") + self.assertIn('copilot_mcp_unresolved_no_config', self._telemetry_categories()) + + def test_unresolved_with_config_category(self): + self.mock_telemetry.reset_mock() + self._write_mcp({"servers": {"github": {"command": "c"}}}) + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + unbound.process_pre_tool_use({"tool_name": "unknown-xyz", "session_id": "s1"}, "k") + self.assertIn('copilot_mcp_unresolved_with_config', self._telemetry_categories()) + + def test_resolved_category(self): + self.mock_telemetry.reset_mock() + self._write_mcp({"servers": {"github": {"command": "c"}}}) + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + unbound.process_pre_tool_use({"tool_name": "github-x", "session_id": "s1"}, "k") + self.assertIn('copilot_mcp_resolved', self._telemetry_categories()) + + def test_telemetry_payload_contains_only_names_not_args(self): + # The redaction discipline: the telemetry message must not carry args. + self.mock_telemetry.reset_mock() + self._write_mcp({"servers": {"github": {"command": "c"}}}) + with self._point_config_paths_at(self.mcp_config), \ + patch.object(unbound, "send_to_hook_api", return_value={}): + unbound.process_pre_tool_use( + {"tool_name": "github-x", "session_id": "s1", + "tool_input": {"secret_token": "sk-shouldnotappear"}}, + "k", + ) + for args, _ in self.mock_telemetry.call_args_list: + self.assertNotIn("sk-shouldnotappear", args[0]) + + +class TestResolutionTelemetryBudgetIsolation(unittest.TestCase): + """Resolution telemetry must use a SEPARATE rate-limit budget so it can never + suppress a genuine error/bypass report. Exercises the REAL + report_error_to_gateway + _should_report against real (tmpdir) throttle + files; only the network POST is stubbed (counted, never sent).""" + + def setUp(self): + self.tmp = Path(tempfile.mkdtemp()) + self._patchers = [ + patch.object(unbound, "LOG_DIR", self.tmp), + patch.object(unbound, "ERROR_LOG", self.tmp / "error.log"), + patch.object(unbound, "LAST_REPORT_FILE", self.tmp / ".last_error_report"), + patch.object(unbound, "LAST_RESOLUTION_REPORT_FILE", self.tmp / ".last_resolution_report"), + # Stub only the actual network POST so we count sends without curl. + # report_error_to_gateway / _should_report run for real. + patch.object(unbound, "_post_error_payload"), + ] + for p in self._patchers: + p.start() + self.addCleanup(p.stop) + + @property + def _post(self): + return unbound._post_error_payload + + def test_resolution_telemetry_does_not_consume_error_budget(self): + # 1) Routine resolution telemetry fires (consuming ONLY its own budget). + unbound._report_mcp_resolution( + "copilot_mcp_resolved", "github-x", {"tool_name": "github-x"}, "k" + ) + self.assertEqual(self._post.call_count, 1) + + # 2) A genuine error/bypass report — issued immediately after, well + # within the 60s window — must STILL go out. If resolution telemetry + # shared the error budget, this would be throttled (regression). + unbound.report_error_to_gateway( + "Hook bypassed_due_to_failure: gateway unreachable", + category="bypassed_due_to_failure", + api_key="k", + ) + self.assertEqual( + self._post.call_count, 2, + "real error report was suppressed by resolution telemetry's budget", + ) + + def test_resolution_budget_self_throttles_independently(self): + # Resolution telemetry still rate-limits itself at 1/60s on its own file. + for _ in range(3): + unbound._report_mcp_resolution( + "copilot_mcp_resolved", "github-x", {"tool_name": "github-x"}, "k" + ) + self.assertEqual(self._post.call_count, 1) + + def test_error_budget_does_not_consume_resolution_budget(self): + # Symmetric: a real error report must not starve resolution telemetry. + unbound.report_error_to_gateway("real error", category="general", api_key="k") + self.assertEqual(self._post.call_count, 1) + unbound._report_mcp_resolution( + "copilot_mcp_resolved", "github-x", {"tool_name": "github-x"}, "k" + ) + self.assertEqual( + self._post.call_count, 2, + "resolution telemetry was suppressed by the error budget", + ) + + +class TestCleanupPreservesSessionStart(unittest.TestCase): + """cleanup_old_logs must preserve the current session's SessionStart record + so the cwd/model fallback survives a very long single session.""" + + def setUp(self): + self.tmp = Path(tempfile.mkdtemp()) + self._patchers = [ + patch.object(unbound, "AUDIT_LOG", self.tmp / "agent-audit.log"), + patch.object(unbound, "ERROR_LOG", self.tmp / "error.log"), + patch.object(unbound, "AUDIT_LOG_TOTAL_LIMIT", 100), + ] + for p in self._patchers: + p.start() + self.addCleanup(p.stop) + + def test_long_single_session_keeps_session_start_cwd_and_model(self): + unbound.append_to_audit_log({"event": { + "hook_event_name": "SessionStart", "session_id": "s1", + "cwd": "/proj", "model": "gpt-x", + }}) + for i in range(150): + unbound.append_to_audit_log({"event": { + "hook_event_name": "PreToolUse", "session_id": "s1", "seq": i, + }}) + + unbound.cleanup_old_logs() + + # The SessionStart-derived fallbacks must still resolve. + self.assertEqual(unbound.get_session_start_cwd("s1"), "/proj") + self.assertEqual(unbound.get_session_start_model("s1"), "gpt-x") + # Still bounded: the tail window plus the preserved SessionStart. + self.assertEqual(len(unbound.load_existing_logs()), 101) + + def test_long_session_with_session_start_in_tail_is_unchanged(self): + # If SessionStart already falls inside the tail window, no duplication. + for i in range(150): + unbound.append_to_audit_log({"event": { + "hook_event_name": "PreToolUse", "session_id": "s1", "seq": i, + }}) + unbound.append_to_audit_log({"event": { + "hook_event_name": "SessionStart", "session_id": "s1", + "cwd": "/late", "model": "m", + }}) + + unbound.cleanup_old_logs() + + logs = unbound.load_existing_logs() + self.assertEqual(len(logs), 100) + starts = [ + l for l in logs + if l.get("event", {}).get("hook_event_name") == "SessionStart" + ] + self.assertEqual(len(starts), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/copilot/hooks/unbound.py b/copilot/hooks/unbound.py index b0dae1a5..286ea402 100644 --- a/copilot/hooks/unbound.py +++ b/copilot/hooks/unbound.py @@ -48,6 +48,10 @@ AUDIT_LOG = LOG_DIR / "agent-audit.log" ERROR_LOG = LOG_DIR / "error.log" LAST_REPORT_FILE = LOG_DIR / ".last_error_report" +# Resolution telemetry has its OWN throttle budget (separate file) so routine +# copilot_mcp_resolved/unresolved telemetry can never consume the error/bypass +# budget guarded by LAST_REPORT_FILE and suppress a genuine error report. +LAST_RESOLUTION_REPORT_FILE = LOG_DIR / ".last_resolution_report" SELF_UPDATE_URL = "https://raw.githubusercontent.com/websentry-ai/setup/refs/heads/main/copilot/hooks/unbound.py" SELF_UPDATE_INTERVAL_SECONDS = 2 * 3600 @@ -119,6 +123,58 @@ } ALLOWED_NON_MCP_HOOK_NAMES = {'Bash', 'Read', 'Write', 'Edit'} # MCP tools (mcp*) are always checked separately NATIVE_FILE_TOOLS = {'Read', 'Write', 'Edit'} + +# Additional VS Code agent-mode built-ins that carry no security-relevant action +# of their own (read-only search/fetch, test running, notebook editing, terminal +# meta) and are NOT in the scoring sets above. Listed here ONLY so the +# fail-secure deny never false-blocks them when the kill-switch is flipped ON; +# they are not mapped/scored as analytics (canonical_tool_name still returns '' +# for them, exactly as before). +# +# This set is INTENTIONALLY CONSERVATIVE — it lists known-benign built-ins by +# name rather than allow-by-default. Flipping UNBOUND_COPILOT_MCP_DENY_UNRESOLVED +# ON is gated on telemetry showing `copilot_mcp_unresolved_with_config` ~= 0: +# the soak surfaces any built-in we missed (as an unresolved-with-config event) +# so this set can be topped up before the deny is ever enabled in prod. +VSCODE_AGENT_BUILTINS = { + # read-only search / context retrieval / web fetch + 'fetch', 'fetch_webpage', 'fetchWebPage', + 'semantic_search', 'semanticSearch', + 'codebase', 'usages', + # test discovery / execution + 'findTestFiles', 'find_test_files', + 'runTests', 'run_tests', + 'get_errors', 'getErrors', + 'test_search', 'testSearch', + 'test_failure', 'testFailure', + # repo / task / command meta + 'githubRepo', 'github_repo', + 'runTasks', 'run_tasks', + 'runCommands', 'run_commands', + 'run_in_terminal', 'runInTerminal', + 'open_simple_browser', 'openSimpleBrowser', + # notebook editing / execution + 'editNotebook', 'edit_notebook', + 'runNotebookCell', 'run_notebook_cell', +} + +# Every Copilot tool name we recognize as a non-MCP built-in (both surfaces). +# Used only by the fail-secure deny: a genuinely-unknown tool on an +# MCP-configured machine is the false-block-risk surface; anything in this union +# is a known built-in and is never denied. TERMINAL_LIKE_TOOLS contributes its +# keys (the values are lambdas). +KNOWN_NON_MCP_BUILTINS = ( + SHELL_TOOLS | READ_TOOLS | WRITE_TOOLS | EDIT_TOOLS + | ALLOWED_NON_MCP_HOOK_NAMES | INTERNAL_TOOLS + | VSCODE_AGENT_BUILTINS + | set(TERMINAL_LIKE_TOOLS) +) + +# Fail-secure kill-switch (default OFF). When OFF, an unresolved bare tool on an +# MCP-configured machine still ALLOWs (return {}) exactly as before. When ON, it +# is DENIED as a potential unidentified MCP call. Read once at module load. +DENY_UNRESOLVED_MCP = os.environ.get('UNBOUND_COPILOT_MCP_DENY_UNRESOLVED', '').strip().lower() in ('1', 'true', 'yes', 'on') + POLICY_CACHE_FILE = LOG_DIR / ".policy_cache.json" CACHE_TTL_SECONDS = 300 POLICY_CHECK_FAILURE_DEFAULT = 'allow' @@ -136,6 +192,7 @@ AUDIT_LOG = LOG_DIR / "agent-audit.log" ERROR_LOG = LOG_DIR / "error.log" LAST_REPORT_FILE = LOG_DIR / ".last_error_report" + LAST_RESOLUTION_REPORT_FILE = LOG_DIR / ".last_resolution_report" POLICY_CACHE_FILE = LOG_DIR / ".policy_cache.json" @@ -143,25 +200,27 @@ _reporting_error = False -def _should_report(): - """Rate limit: max 1 remote error report per 60 seconds. Fails closed.""" +def _should_report(throttle_file=None): + """Rate limit: max 1 remote report per 60 seconds for the given throttle + file. Fails closed. Defaults to the error-report budget (LAST_REPORT_FILE); + resolution telemetry passes its own file so the two budgets never collide.""" + if throttle_file is None: + throttle_file = LAST_REPORT_FILE try: - if LAST_REPORT_FILE.exists(): - mtime = LAST_REPORT_FILE.stat().st_mtime + if throttle_file.exists(): + mtime = throttle_file.stat().st_mtime if (datetime.now().timestamp() - mtime) < 60: return False - LAST_REPORT_FILE.touch() + throttle_file.touch() return True except Exception: return False -def report_error_to_gateway(message, category='general', api_key=None): - """Fire-and-forget error report to gateway. Never blocks, never raises.""" - global _reporting_error - if _reporting_error or not api_key or not _should_report(): - return - _reporting_error = True +def _post_error_payload(message, category, api_key): + """Fire-and-forget POST of a single message to /v1/hooks/errors. No + throttling or reentrancy gating here — callers own their own budget. Never + blocks, never raises.""" try: payload = json.dumps({ 'errors': [{'message': message, 'timestamp': datetime.utcnow().isoformat() + 'Z', 'category': category}], @@ -180,6 +239,19 @@ def report_error_to_gateway(message, category='general', api_key=None): proc.stdin.close() except Exception: pass + + +def report_error_to_gateway(message, category='general', api_key=None, throttle_file=None): + """Fire-and-forget error report to gateway. Never blocks, never raises. + Throttled by the shared error budget (LAST_REPORT_FILE) unless a distinct + throttle_file is supplied — MCP-resolution telemetry passes its own file so + routine telemetry can never consume the error/bypass budget.""" + global _reporting_error + if _reporting_error or not api_key or not _should_report(throttle_file): + return + _reporting_error = True + try: + _post_error_payload(message, category, api_key) finally: _reporting_error = False @@ -333,7 +405,21 @@ def cleanup_old_logs(): ] save_logs(kept_logs) elif len(logs) > AUDIT_LOG_TOTAL_LIMIT: - save_logs(logs[-AUDIT_LOG_TOTAL_LIMIT:]) + # Single session, tail-trim to the last AUDIT_LOG_TOTAL_LIMIT entries — + # but preserve any SessionStart record that would fall outside the tail + # window, so the cwd/model fallback (get_session_start_cwd / + # get_session_start_model) survives a very long single session. + tail = logs[-AUDIT_LOG_TOTAL_LIMIT:] + if not any( + log.get('event', {}).get('hook_event_name') == 'SessionStart' + for log in tail + ): + session_starts = [ + log for log in logs[:-AUDIT_LOG_TOTAL_LIMIT] + if log.get('event', {}).get('hook_event_name') == 'SessionStart' + ] + tail = session_starts + tail + save_logs(tail) def get_recent_user_prompts_for_session(session_id, n): @@ -372,6 +458,28 @@ def get_session_start_model(session_id): return found +def get_session_start_cwd(session_id): + """Return the cwd from the audit-logged SessionStart event for a session. + Copilot includes `cwd` on SessionStart but may omit it on PreToolUse; this + recovers it so MCP workspace configs can still be located. Latest entry wins. + Best-effort: cleanup_old_logs preserves the current session's SessionStart, + but the record can still be absent if the log was rotated/truncated out of + band — callers must tolerate None.""" + if not session_id: + return None + found = None + for log in load_existing_logs(): + event = log.get('event', {}) + if event.get('hook_event_name') != 'SessionStart': + continue + if event.get('session_id') != session_id: + continue + cwd = event.get('cwd') + if cwd: + found = cwd + return found + + def _build_user_prompt_payload(recent_user_prompts): last = recent_user_prompts[-1] if recent_user_prompts else None return { @@ -421,6 +529,20 @@ def _vscode_user_dirs(): return [base / "Code" / "User", base / "Code - Insiders" / "User"] +# Resolve the Copilot CLI config dir, honoring COPILOT_HOME (which — per GitHub's +# docs — replaces the entire ~/.copilot path). Mirrors discovery's +# _resolve_copilot_dir: COPILOT_HOME read from the process env only validly +# applies to the running user. Never raises. +def _copilot_cli_config_dir(): + try: + override = (os.environ.get("COPILOT_HOME") or "").strip() + if override: + return Path(os.path.expanduser(os.path.expandvars(override))) + except Exception: + pass + return Path.home() / ".copilot" + + # All Copilot MCP config locations. Untrusted workspace files (from cwd) come # first; trusted user/global/CLI configs last so they win the last-wins merge in # read_copilot_mcp_servers. @@ -444,7 +566,7 @@ def _copilot_mcp_config_paths(cwd=None): except OSError: pass trusted.append(home / ".config" / "github-copilot" / "intellij" / "mcp.json") - trusted.append(home / ".copilot" / "mcp-config.json") + trusted.append(_copilot_cli_config_dir() / "mcp-config.json") return workspace + trusted @@ -544,6 +666,11 @@ def _sanitize_mcp_server_fields(server): _MCP_CONFIG_MAX_BYTES = 1_000_000 +# Top-level keys that are config metadata, never MCP servers. Excluded from the +# flat top-level server-form fallback so a metadata block carrying a `command`/ +# `url` (e.g. VS Code's `inputs`) can't be misread as a phantom server. +_FLAT_FORM_NON_SERVER_KEYS = {'inputs', 'version', '$schema'} + def read_copilot_mcp_servers(cwd=None): servers = {} @@ -564,6 +691,21 @@ def read_copilot_mcp_servers(cwd=None): nested = config.get('mcp') raw = nested.get('servers') if isinstance(nested, dict) else None if not isinstance(raw, dict): + # Flat top-level form (Claude-style, accepted by the GitHub CLI): + # treat each top-level entry whose value is a dict carrying a + # `command` or `url` as a server. Mirrors discovery's + # _extract_servers_obj fallback; ignores scalar metadata and + # known non-server top-level blocks (e.g. a VS Code `inputs` + # block that itself carries a `command` key) so they can't mint + # a phantom server. + raw = { + name: value + for name, value in config.items() + if name not in _FLAT_FORM_NON_SERVER_KEYS + and isinstance(value, dict) + and ('command' in value or 'url' in value) + } + if not isinstance(raw, dict) or not raw: continue for name, server in raw.items(): fields = _sanitize_mcp_server_fields(server) @@ -576,7 +718,9 @@ def read_copilot_mcp_servers(cwd=None): return servers -# Mirror Copilot's server-name sanitization for tool-name prefixes. +# Mirror Copilot's server-name sanitization for tool-name prefixes. NOTE: parity +# with Copilot's real derivation is unverified — the fail-secure deny (gated by +# DENY_UNRESOLVED_MCP) is the safety net for any case this heuristic misses. def _sanitize_copilot_server_name(name): return re.sub(r'[^a-zA-Z0-9_-]', '-', name.replace('@', '-')) @@ -822,6 +966,34 @@ def transform_response_for_copilot_prompt(api_response): return {} +def _copilot_surface(event): + """`cli` (snake_case keys) vs `vscode` (camelCase keys). Best-effort: keyed + off which casing the event carries; defaults to cli. This is a telemetry + LABELING heuristic only — never authoritative for control flow.""" + if 'toolName' in event or 'hookEventName' in event or 'sessionId' in event: + return 'vscode' + return 'cli' + + +def _report_mcp_resolution(category, raw_tool, event, api_key): + """Fire-and-forget telemetry for the MCP-resolution outcome. NAMES ONLY — + never args or config. Computes a rate, not just a count, by emitting on the + resolved path too. Reuses the report_error_to_gateway plumbing (which never + raises) but on its OWN throttle budget (LAST_RESOLUTION_REPORT_FILE), so this + routine telemetry can never suppress a genuine error/bypass report sharing + the LAST_REPORT_FILE budget.""" + try: + surface = _copilot_surface(event) + report_error_to_gateway( + f"{category} surface={surface} tool={raw_tool}", + category=category, + api_key=api_key, + throttle_file=LAST_RESOLUTION_REPORT_FILE, + ) + except Exception: + pass + + def process_pre_tool_use(event, api_key): """Process PreToolUse event - check policy before tool execution.""" raw_tool = event.get('tool_name') or event.get('toolName') or '' @@ -834,23 +1006,58 @@ def process_pre_tool_use(event, api_key): mcp_server = mcp_tool = mcp_server_config = None if not is_mcp and canonical not in ALLOWED_NON_MCP_HOOK_NAMES: - cwd = event.get('cwd') + # cwd locates workspace MCP configs. Copilot sends it on SessionStart but + # may omit it on PreToolUse, so fall back to the session's logged cwd. + cwd = event.get('cwd') or get_session_start_cwd(session_id) + mcp_servers = read_copilot_mcp_servers(cwd) - mcp_server, mcp_tool, mcp_server_config = detect_mcp_call(raw_tool, mcp_servers) + + # Forward-compat (DEAD today): if Copilot ever emits a structured server + # field, trust it and skip the heuristic. Confirmed: today's Copilot + # PreToolUse emits NEITHER of these — this is future-proofing, not the + # active fix. + structured_server = event.get('mcp_server') or event.get('serverName') or event.get('server') + if structured_server: + mcp_server = structured_server + mcp_tool = event.get('mcp_tool') or event.get('toolName') or event.get('tool_name') or '' + mcp_server_config = mcp_servers.get(structured_server) + else: + mcp_server, mcp_tool, mcp_server_config = detect_mcp_call(raw_tool, mcp_servers) if mcp_server is None: - # A bare (non-mcp__) tool can only be resolved against the MCP config. - # If no config was readable, a genuine MCP call can't be identified - # and would slip the allow-list — surface that distinctly so the - # potential bypass is observable rather than silent. Skip known-benign - # native tools so the log isn't noisy. - if raw_tool and raw_tool not in INTERNAL_TOOLS and raw_tool not in TERMINAL_LIKE_TOOLS: - if not mcp_servers and not raw_tool.startswith('mcp__'): + # A bare (non-mcp__) tool couldn't be resolved to a configured MCP + # server. Two mutually-exclusive cases, surfaced distinctly so the + # false-block-risk RATE (configured-but-unmatched / resolved) is + # observable. Known built-ins are benign and never noisy/blocked. + is_known_builtin = raw_tool in KNOWN_NON_MCP_BUILTINS + if raw_tool and not is_known_builtin: + if mcp_servers: + log_error(f"copilot pre_tool_use unmatched tool={raw_tool}", 'mcp_match') + _report_mcp_resolution('copilot_mcp_unresolved_with_config', raw_tool, event, api_key) + else: log_error( f"copilot mcp UNRESOLVED (no readable MCP config) tool={raw_tool}", 'mcp_config', ) - else: - log_error(f"copilot pre_tool_use unmatched tool={raw_tool}", 'mcp_match') + _report_mcp_resolution('copilot_mcp_unresolved_no_config', raw_tool, event, api_key) + + # Fail-secure (gated, default OFF). When MCP IS configured and this is + # a genuinely-unknown tool (not a known built-in), an unidentified MCP + # call would otherwise slip the allow-list. DENY it — but only when + # the kill-switch is ON; until then behavior is identical to before + # (return {}). This is an IDENTITY/policy deny, distinct from the + # gateway-unreachable infra fail-open below. + if ( + DENY_UNRESOLVED_MCP + and mcp_servers + and raw_tool + and not is_known_builtin + ): + _report_mcp_resolution('copilot_mcp_denied_unresolved', raw_tool, event, api_key) + return transform_response_for_copilot({ + 'decision': 'deny', + 'reason': 'Blocked by organization policy. This tool could not be verified against the configured MCP servers — it may be an unsanctioned MCP tool or an unrecognized tool on an MCP-enabled machine.', + 'additionalContext': 'This action was blocked by an organization security policy because the tool could not be identified against the configured MCP servers (it may be an unsanctioned MCP tool, or an unrecognized tool on this MCP-enabled machine). Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. Inform the user and stop.', + }) return {} is_mcp = True canonical = f"mcp__{mcp_server}__{mcp_tool}" @@ -861,6 +1068,7 @@ def process_pre_tool_use(event, api_key): f"config={'yes' if mcp_server_config else 'no'}", 'mcp_match', ) + _report_mcp_resolution('copilot_mcp_resolved', raw_tool, event, api_key) cache = load_policy_cache() tools_to_check = cache.get('tools_to_check', []) if cache else [] @@ -1592,6 +1800,18 @@ def main(): # SessionStart fires once per session — natural TTL gate for the # debounced discovery scan dispatch. if event_name == 'SessionStart': + # Persist a normalized record (snake_case keys, both surfaces) so + # get_session_start_model / get_session_start_cwd can recover the + # session's model + cwd on later PreToolUse events that omit them. + append_to_audit_log({ + 'timestamp': datetime.now().astimezone().isoformat().replace('+00:00', 'Z'), + 'event': { + 'hook_event_name': 'SessionStart', + 'session_id': event.get('session_id') or event.get('sessionId'), + 'cwd': event.get('cwd'), + 'model': event.get('model'), + }, + }) _check_self_update() _dispatch_discovery() print("{}") From bbf72094c1bcc26c5739529133f740c56149c225 Mon Sep 17 00:00:00 2001 From: Nanda Pranesh Date: Sat, 20 Jun 2026 00:56:40 +0530 Subject: [PATCH 2/2] Reduce scope to identification + telemetry; drop gated fail-secure deny MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review (Greptile P1 + Vignesh security consensus): the gated DENY (component 2) was the source of the P1 (denied-telemetry throttled by the shared budget) and an over-block risk in orgs with no MCP allow-list configured (the hook can't tell sanctioning is even active). Removing it makes this PR purely progressive: - Identification hardening (COPILOT_HOME, flat top-level form, cwd-via-SessionStart fallback, structured-field read) → more Copilot MCP calls resolve and reach the gateway, where they're sanctioned/allowed exactly like Claude Code & Cursor. - Telemetry (resolved / unresolved_with_config / unresolved_no_config) to measure the residual unresolved rate before any deny is built. No enforcement-direction change: an unresolved tool still returns {} (allow) as before. Also fixes the COPILOT_HOME expandvars nit (expanduser on the literal only). The fail-secure DENY moves to a follow-up that ALSO adds a gateway `mcp_sanctioning_active` signal, so it denies only when an allow-list is actually configured (no over-block in non-sanctioning orgs). Co-Authored-By: Claude Opus 4.8 (1M context) --- copilot/hooks/test_pretool_mcp.py | 143 +++++++++--------------------- copilot/hooks/unbound.py | 60 +++++-------- 2 files changed, 62 insertions(+), 141 deletions(-) diff --git a/copilot/hooks/test_pretool_mcp.py b/copilot/hooks/test_pretool_mcp.py index 3326bdfe..ab0b6ea8 100644 --- a/copilot/hooks/test_pretool_mcp.py +++ b/copilot/hooks/test_pretool_mcp.py @@ -1,5 +1,5 @@ """ -Integration tests for Copilot PreToolUse MCP identification + fail-secure. +Integration tests for Copilot PreToolUse MCP identification + telemetry. Driven through process_pre_tool_use / main() (the outermost layer) — assertions are on the RETURNED dict, never on internal helpers. No network: send_to_hook_api @@ -12,10 +12,10 @@ - cwd resolution via event cwd AND via the SessionStart-cwd fallback - COPILOT_HOME override - flat top-level server form - - fail-secure matrix (flag ON) + inert-when-OFF - - infra fail-open (resolved call, gateway {}) stays distinct from identity deny + - unresolved bare tool is allowed (return {}) while still emitting telemetry + - infra fail-open (resolved call, gateway {}) honors policy_check_failure_action - never-crash on garbage/oversized/missing input - - telemetry categories on each path + - telemetry categories on each path + resolution-budget isolation """ import json @@ -268,72 +268,45 @@ def test_flat_form_inputs_block_with_command_is_not_a_server(self): self.assertEqual(sent['pre_tool_use_data']['tool_name'], 'mcp__github__create_issue') -class TestFailSecureFlagOn(_Base): - """Fail-secure matrix with UNBOUND_COPILOT_MCP_DENY_UNRESOLVED ON.""" - - def setUp(self): - super().setUp() - self._deny = patch.object(unbound, "DENY_UNRESOLVED_MCP", True) - self._deny.start() - self.addCleanup(self._deny.stop) +class TestUnresolvedIsAllowed(_Base): + """An unresolved bare tool is ALLOWED (return {}) — identical to pre-PR + behavior. Identification + telemetry are observe-only; no deny is issued. + Known built-ins stay out of the unresolved telemetry (noise reduction).""" def _run(self, event, config): - self._write_mcp(config) if config is not None else None + if config is not None: + self._write_mcp(config) paths = (self.mcp_config,) if config is not None else () with self._point_config_paths_at(*paths), \ - patch.object(unbound, "send_to_hook_api", return_value={}): - return unbound.process_pre_tool_use(event, "k") - - def test_unresolved_with_config_nonnative_denies(self): - result = self._run( - {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, - {"servers": {"github": {"command": "c"}}}, - ) - self.assertEqual(result.get('permissionDecision'), 'deny') - self.assertEqual( - result['hookSpecificOutput']['permissionDecision'], 'deny' - ) + patch.object(unbound, "send_to_hook_api", return_value={}) as api: + return unbound.process_pre_tool_use(event, "k"), api - def test_unresolved_with_config_emits_denied_telemetry(self): - self.mock_telemetry.reset_mock() - self._run( + def test_unresolved_with_config_allows(self): + # A genuinely-unknown tool on an MCP-configured machine is NOT denied. + result, api = self._run( {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, {"servers": {"github": {"command": "c"}}}, ) - self.assertIn('copilot_mcp_denied_unresolved', self._telemetry_categories()) + self.assertEqual(result, {}) + self.assertFalse(api.called) # unresolved → no gateway call def test_unresolved_no_config_allows(self): # No MCP configured at all → cannot be an MCP call → allow. - result = self._run({"tool_name": "weird-thing", "session_id": "s1"}, None) + result, _ = self._run({"tool_name": "weird-thing", "session_id": "s1"}, None) self.assertEqual(result, {}) - def test_known_builtin_never_blocked_even_with_config(self): - # `read_file` is a known built-in; never denied even when MCP configured. - result = self._run( + def test_known_builtin_allows_with_config(self): + # `read_file` is a known built-in; allowed even when MCP configured. + result, _ = self._run( {"tool_name": "read_file", "session_id": "s1", "tool_input": {"filePath": "/x"}}, {"servers": {"github": {"command": "c"}}}, ) self.assertEqual(result, {}) - def test_internal_tool_never_blocked(self): - result = self._run( - {"tool_name": "manage_todo_list", "session_id": "s1"}, - {"servers": {"github": {"command": "c"}}}, - ) - self.assertEqual(result, {}) - - def test_terminal_like_tool_never_blocked(self): - result = self._run( - {"tool_name": "grep_search", "session_id": "s1", "tool_input": {"query": "x"}}, - {"servers": {"github": {"command": "c"}}}, - ) - self.assertEqual(result, {}) - - def test_vscode_agent_builtins_never_blocked_even_with_config(self): - # The expanded VS Code agent-mode built-in allow-set (read-only search, - # test running, notebook editing, repo/task meta — both camel & snake - # variants) must never be denied when the deny flag is ON, so flipping - # it on can't false-block routine agent built-ins. + def test_vscode_agent_builtins_allow_with_config(self): + # The expanded VS Code agent-mode built-in set (read-only search, test + # running, notebook editing, repo/task meta — both camel & snake + # variants) is allowed when MCP is configured. builtins = [ "fetch", "fetch_webpage", "semantic_search", "codebase", "usages", "findTestFiles", "runTests", "run_tests", "get_errors", @@ -342,72 +315,36 @@ def test_vscode_agent_builtins_never_blocked_even_with_config(self): "runNotebookCell", "edit_notebook", "run_notebook_cell", ] for tool in builtins: - result = self._run( + result, _ = self._run( {"tool_name": tool, "session_id": "s1"}, {"servers": {"github": {"command": "c"}}}, ) - self.assertEqual(result, {}, f"{tool} should never be blocked") + self.assertEqual(result, {}, f"{tool} should be allowed") - def test_vscode_agent_builtin_emits_no_denied_telemetry(self): - # A known built-in must not even register as denied/unresolved-with-config. + def test_known_builtin_emits_no_unresolved_telemetry(self): + # A known built-in must not register as unresolved-with-config (noise). self.mock_telemetry.reset_mock() self._run( {"tool_name": "semantic_search", "session_id": "s1"}, {"servers": {"github": {"command": "c"}}}, ) - cats = self._telemetry_categories() - self.assertNotIn('copilot_mcp_denied_unresolved', cats) - self.assertNotIn('copilot_mcp_unresolved_with_config', cats) - - def test_native_tools_unaffected(self): - # Native Read/Write/Edit/Bash go through the normal path (not the MCP - # branch) → resolved call to the gateway, which returns {} → allow. - for tool, payload in [ - ("Read", {"file_path": "/a"}), - ("Write", {"file_path": "/a"}), - ("Edit", {"file_path": "/a"}), - ("Bash", {"command": "ls"}), - ]: - event = {"tool_name": tool, "session_id": "s1", "tool_input": payload} - with self._point_config_paths_at(self.mcp_config), \ - patch.object(unbound, "send_to_hook_api", return_value={}) as api: - self._write_mcp({"servers": {"github": {"command": "c"}}}) - result = unbound.process_pre_tool_use(event, "k") - self.assertEqual(result, {}, f"{tool} should be allowed") - self.assertTrue(api.called, f"{tool} should reach the gateway") - - -class TestFailSecureFlagOffIsInert(_Base): - """Default (flag OFF): the deny case from the ON matrix returns {} — proving - the change is inert until the kill-switch is flipped.""" - - def test_unresolved_with_config_allows_when_flag_off(self): - self.assertFalse(unbound.DENY_UNRESOLVED_MCP) # default - self._write_mcp({"servers": {"github": {"command": "c"}}}) - with self._point_config_paths_at(self.mcp_config), \ - patch.object(unbound, "send_to_hook_api", return_value={}): - result = unbound.process_pre_tool_use( - {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, "k" - ) - self.assertEqual(result, {}) + self.assertNotIn('copilot_mcp_unresolved_with_config', self._telemetry_categories()) - def test_unresolved_with_config_telemetry_still_emitted_when_flag_off(self): - # Observability ships with component 1 regardless of the deny gate. + def test_unresolved_with_config_still_emits_telemetry(self): + # Identification telemetry ships regardless: the unresolved (allowed) + # case still emits copilot_mcp_unresolved_with_config for the soak. self.mock_telemetry.reset_mock() - self._write_mcp({"servers": {"github": {"command": "c"}}}) - with self._point_config_paths_at(self.mcp_config), \ - patch.object(unbound, "send_to_hook_api", return_value={}): - unbound.process_pre_tool_use( - {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, "k" - ) - cats = self._telemetry_categories() - self.assertIn('copilot_mcp_unresolved_with_config', cats) - self.assertNotIn('copilot_mcp_denied_unresolved', cats) + self._run( + {"tool_name": "totally-unknown-mcp-tool", "session_id": "s1"}, + {"servers": {"github": {"command": "c"}}}, + ) + self.assertIn('copilot_mcp_unresolved_with_config', self._telemetry_categories()) class TestInfraVsIdentity(_Base): """A RESOLVED MCP call where the gateway is unreachable ({}) must honor - policy_check_failure_action — distinct from the identity deny.""" + policy_check_failure_action — the infra fail-open/closed path, distinct from + the identification path (an unresolved tool, which simply allows).""" def setUp(self): super().setUp() diff --git a/copilot/hooks/unbound.py b/copilot/hooks/unbound.py index 286ea402..96ccf902 100644 --- a/copilot/hooks/unbound.py +++ b/copilot/hooks/unbound.py @@ -126,16 +126,15 @@ # Additional VS Code agent-mode built-ins that carry no security-relevant action # of their own (read-only search/fetch, test running, notebook editing, terminal -# meta) and are NOT in the scoring sets above. Listed here ONLY so the -# fail-secure deny never false-blocks them when the kill-switch is flipped ON; +# meta) and are NOT in the scoring sets above. Listed here ONLY to keep these +# known-benign built-ins out of the unresolved-MCP telemetry (noise reduction); # they are not mapped/scored as analytics (canonical_tool_name still returns '' # for them, exactly as before). # # This set is INTENTIONALLY CONSERVATIVE — it lists known-benign built-ins by -# name rather than allow-by-default. Flipping UNBOUND_COPILOT_MCP_DENY_UNRESOLVED -# ON is gated on telemetry showing `copilot_mcp_unresolved_with_config` ~= 0: -# the soak surfaces any built-in we missed (as an unresolved-with-config event) -# so this set can be topped up before the deny is ever enabled in prod. +# name rather than allow-by-default. The unresolved-with-config telemetry soak +# surfaces any built-in we missed (as an unresolved-with-config event) so this +# set can be topped up. VSCODE_AGENT_BUILTINS = { # read-only search / context retrieval / web fetch 'fetch', 'fetch_webpage', 'fetchWebPage', @@ -159,10 +158,11 @@ } # Every Copilot tool name we recognize as a non-MCP built-in (both surfaces). -# Used only by the fail-secure deny: a genuinely-unknown tool on an -# MCP-configured machine is the false-block-risk surface; anything in this union -# is a known built-in and is never denied. TERMINAL_LIKE_TOOLS contributes its -# keys (the values are lambdas). +# Used only to keep known-benign tools OUT of the unresolved-MCP telemetry: a +# genuinely-unknown tool on an MCP-configured machine is the signal worth +# surfacing, so anything in this union is treated as a known built-in and never +# emits unresolved telemetry. TERMINAL_LIKE_TOOLS contributes its keys (the +# values are lambdas). KNOWN_NON_MCP_BUILTINS = ( SHELL_TOOLS | READ_TOOLS | WRITE_TOOLS | EDIT_TOOLS | ALLOWED_NON_MCP_HOOK_NAMES | INTERNAL_TOOLS @@ -170,11 +170,6 @@ | set(TERMINAL_LIKE_TOOLS) ) -# Fail-secure kill-switch (default OFF). When OFF, an unresolved bare tool on an -# MCP-configured machine still ALLOWs (return {}) exactly as before. When ON, it -# is DENIED as a potential unidentified MCP call. Read once at module load. -DENY_UNRESOLVED_MCP = os.environ.get('UNBOUND_COPILOT_MCP_DENY_UNRESOLVED', '').strip().lower() in ('1', 'true', 'yes', 'on') - POLICY_CACHE_FILE = LOG_DIR / ".policy_cache.json" CACHE_TTL_SECONDS = 300 POLICY_CHECK_FAILURE_DEFAULT = 'allow' @@ -537,7 +532,7 @@ def _copilot_cli_config_dir(): try: override = (os.environ.get("COPILOT_HOME") or "").strip() if override: - return Path(os.path.expanduser(os.path.expandvars(override))) + return Path(os.path.expanduser(override)) except Exception: pass return Path.home() / ".copilot" @@ -719,8 +714,9 @@ def read_copilot_mcp_servers(cwd=None): # Mirror Copilot's server-name sanitization for tool-name prefixes. NOTE: parity -# with Copilot's real derivation is unverified — the fail-secure deny (gated by -# DENY_UNRESOLVED_MCP) is the safety net for any case this heuristic misses. +# with Copilot's real derivation is unverified — a case this heuristic misses +# falls through as unresolved (allowed, with unresolved-MCP telemetry), exactly +# as a bare tool did before this change. def _sanitize_copilot_server_name(name): return re.sub(r'[^a-zA-Z0-9_-]', '-', name.replace('@', '-')) @@ -1026,8 +1022,9 @@ def process_pre_tool_use(event, api_key): if mcp_server is None: # A bare (non-mcp__) tool couldn't be resolved to a configured MCP # server. Two mutually-exclusive cases, surfaced distinctly so the - # false-block-risk RATE (configured-but-unmatched / resolved) is - # observable. Known built-ins are benign and never noisy/blocked. + # unresolved RATE (configured-but-unmatched vs. resolved) is + # observable. Known built-ins are benign and kept out of the + # telemetry so it isn't noisy. is_known_builtin = raw_tool in KNOWN_NON_MCP_BUILTINS if raw_tool and not is_known_builtin: if mcp_servers: @@ -1040,24 +1037,11 @@ def process_pre_tool_use(event, api_key): ) _report_mcp_resolution('copilot_mcp_unresolved_no_config', raw_tool, event, api_key) - # Fail-secure (gated, default OFF). When MCP IS configured and this is - # a genuinely-unknown tool (not a known built-in), an unidentified MCP - # call would otherwise slip the allow-list. DENY it — but only when - # the kill-switch is ON; until then behavior is identical to before - # (return {}). This is an IDENTITY/policy deny, distinct from the - # gateway-unreachable infra fail-open below. - if ( - DENY_UNRESOLVED_MCP - and mcp_servers - and raw_tool - and not is_known_builtin - ): - _report_mcp_resolution('copilot_mcp_denied_unresolved', raw_tool, event, api_key) - return transform_response_for_copilot({ - 'decision': 'deny', - 'reason': 'Blocked by organization policy. This tool could not be verified against the configured MCP servers — it may be an unsanctioned MCP tool or an unrecognized tool on an MCP-enabled machine.', - 'additionalContext': 'This action was blocked by an organization security policy because the tool could not be identified against the configured MCP servers (it may be an unsanctioned MCP tool, or an unrecognized tool on this MCP-enabled machine). Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. Inform the user and stop.', - }) + # A bare tool that couldn't be resolved to a configured MCP server is + # ALLOWED (return {}) — identical to pre-PR behavior. Identification + + # telemetry above are observe-only; no deny is issued here. (The + # gateway-unreachable infra fail-open for RESOLVED calls is separate, + # handled below.) return {} is_mcp = True canonical = f"mcp__{mcp_server}__{mcp_tool}"