Skip to content
99 changes: 92 additions & 7 deletions claude-code/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
AUDIT_LOG = Path.home() / ".claude" / "hooks" / "agent-audit.log"
ERROR_LOG = Path.home() / ".claude" / "hooks" / "error.log"
LAST_REPORT_FILE = Path.home() / ".claude" / "hooks" / ".last_error_report"
ALLOWED_NON_MCP_HOOK_NAMES = ['Bash', 'Read', 'Write', 'Edit'] # MCP tools (mcp__*) are always checked separately
NATIVE_FILE_TOOLS = {'Read', 'Write', 'Edit'}
ALLOWED_NON_MCP_HOOK_NAMES = ['Bash', 'Read', 'Write', 'Edit', 'MultiEdit', 'NotebookEdit'] # MCP tools (mcp__*) are always checked separately
NATIVE_FILE_TOOLS = {'Read', 'Write', 'Edit', 'MultiEdit', 'NotebookEdit'}
MCP_TOOL_PREFIX = 'mcp__'
CLAUDE_MCP_CONFIG_PATH = Path.home() / ".claude.json"
CLAUDE_PLUGIN_CACHE_DIR = Path.home() / ".claude" / "plugins" / "cache"
Expand Down Expand Up @@ -488,6 +488,13 @@ def _build_user_prompt_payload(recent_user_prompts: List[str]) -> Dict:
}


def _tool_file_path(tool_input: Dict) -> Optional[str]:
"""Target path for a native file tool. NotebookEdit nests it under
notebook_path; Read/Write/Edit/MultiEdit use file_path."""
path = tool_input.get('file_path') or tool_input.get('notebook_path')
return path if isinstance(path, str) and path else None


def extract_command_for_pretool(event: Dict) -> str:
"""Extract command from tool_input based on tool type."""
tool_input = event.get('tool_input') or {}
Expand All @@ -499,9 +506,11 @@ def extract_command_for_pretool(event: Dict) -> str:
# MCP tools: stringify the input
if tool_name.startswith(MCP_TOOL_PREFIX):
return json.dumps(tool_input)
# File tools: file_path
if tool_name in ['Write', 'Edit', 'Read'] and 'file_path' in tool_input:
return tool_input['file_path']
# File tools: file_path / notebook_path
if tool_name in NATIVE_FILE_TOOLS:
path = _tool_file_path(tool_input)
if path:
return path
# Grep: pattern
if tool_name == 'Grep' and 'pattern' in tool_input:
return tool_input['pattern']
Expand Down Expand Up @@ -937,6 +946,77 @@ def _get_device_serial() -> Optional[str]:
return None


_GIT_CONTEXT_CACHE: Dict = {}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
_GIT_CONTEXT_CACHE_MAX = 256


def _cache_git_context(key, value) -> None:
if key not in _GIT_CONTEXT_CACHE and len(_GIT_CONTEXT_CACHE) >= _GIT_CONTEXT_CACHE_MAX:
_GIT_CONTEXT_CACHE.pop(next(iter(_GIT_CONTEXT_CACHE)), None)
_GIT_CONTEXT_CACHE[key] = value


def _strip_git_credentials(url):
try:
if not url or '@' not in url:
return url
scheme = re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', url)
prefix = scheme.group(0) if scheme else ''
rest = url[len(prefix):]
slash = rest.find('/')
authority = rest if slash == -1 else rest[:slash]
tail = '' if slash == -1 else rest[slash:]
at = authority.rfind('@')
if at == -1:
return url
return prefix + authority[at + 1:] + tail
except Exception:
return None


def _get_git_context(session_id: Optional[str], cwd: Optional[str]) -> Optional[str]:
"""Credential-stripped origin remote URL for cwd, or None. Successful and
conclusive-no-repo lookups are cached per (session_id, cwd); transient
failures are not cached. Never raises."""
key = (session_id, cwd)
if key in _GIT_CONTEXT_CACHE:
return _GIT_CONTEXT_CACHE[key]
if not cwd:
return None
try:
out = subprocess.run(
['git', '-C', cwd, 'config', '--get', 'remote.origin.url'],
capture_output=True, text=True, timeout=2,
)
except Exception as exc:
log_error(f"git context lookup failed session={session_id} cwd={cwd}: {exc}", 'git_context')
return None
result = None
if out.returncode == 0:
url = out.stdout.strip()
if url:
result = _strip_git_credentials(url)
_cache_git_context(key, result)
return result
Comment on lines +977 to +1000

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Missing observability metrics for the git context capture flow

_get_git_context introduces a new subprocess call path with caching but ships zero Prometheus metrics. Per the team's instrumentation requirement, every new flow must be instrumented now regardless of dashboard use. At minimum this flow needs: (1) a latency histogram for the subprocess.run call (labeled with outcome: hit, miss, timeout, error), (2) a success/failure counter, and (3) a cache hit/miss counter. Without these, a regression (e.g., a git binary path change causing silent None for all users) would be completely invisible in production. The same gap exists identically in copilot/hooks/unbound.py.

Context Used: P0 — Critical (must block merge)
Django / Backend ... (source)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional — these are client-side hooks that run as short-lived subprocesses inside the user's editor (Claude Code / Copilot / Cursor), not a long-running server. There is no Prometheus registry, scrape endpoint, or process for a collector to reach, so a histogram/counter would have nothing to expose. The server-side instrumentation requirement doesn't apply to this runtime. Observability for this flow is handled the way the rest of the hook is: _get_git_context now emits a log_error (category git_context, fire-and-forward to the gateway error sink) on the failure path, which is the signal a 'git binary path change → silent None' regression would surface through.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a valid correction — the client-side subprocess runtime makes Prometheus metrics inapplicable here, and the log_error path to the gateway error sink is the right observability primitive for this context. Withdrawing this comment.

Tip: You can customize Greptile's behavior for this repo with .greptile/rules.md and .greptile/config.json.



def _repo_context_dir(cwd: Optional[str], file_path) -> Optional[str]:
"""Directory whose git repo governs the operation: the nearest existing
ancestor of the target file for file tools, else the session cwd. The
ancestor walk lets a write into a not-yet-created path still resolve to its
enclosing repo."""
if isinstance(file_path, str) and file_path:
base = file_path if os.path.isabs(file_path) else os.path.join(cwd or '', file_path)
d = os.path.dirname(base) or cwd
while d and not os.path.isdir(d):
parent = os.path.dirname(d)
if parent == d:
break
d = parent
return d or cwd
return cwd


def _device_serial(probe: bool = True) -> Optional[str]:
"""Hardware serial, computed once and cached. Never raises and never blocks the
hook. On the latency-critical pre-tool path callers pass probe=False to read the
Expand Down Expand Up @@ -1023,8 +1103,9 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict:
# Build metadata with the raw event
metadata = dict(event)
tool_input = event.get('tool_input') or {}
if 'file_path' in tool_input:
metadata['file_path'] = tool_input['file_path']
file_path = _tool_file_path(tool_input)
if file_path:
metadata['file_path'] = file_path

if is_mcp:
# Parse mcp__<server>__<tool> to extract server and tool for gateway matching
Expand Down Expand Up @@ -1108,6 +1189,10 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict:
'additionalContext': 'This command was blocked by an organization security policy that requires approval. Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. The user must approve via Slack and retry.',
})

request_body['git_remote_url'] = _get_git_context(
session_id, _repo_context_dir(event.get('cwd'), file_path)
)

if need_pull_policies:
request_body['pull_policies'] = True

Expand Down
78 changes: 77 additions & 1 deletion copilot/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,77 @@ def transform_response_for_copilot_prompt(api_response):
return {}


_GIT_CONTEXT_CACHE = {}
_GIT_CONTEXT_CACHE_MAX = 256


def _cache_git_context(key, value):
if key not in _GIT_CONTEXT_CACHE and len(_GIT_CONTEXT_CACHE) >= _GIT_CONTEXT_CACHE_MAX:
_GIT_CONTEXT_CACHE.pop(next(iter(_GIT_CONTEXT_CACHE)), None)
_GIT_CONTEXT_CACHE[key] = value


def _strip_git_credentials(url):
try:
if not url or '@' not in url:
return url
scheme = re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', url)
prefix = scheme.group(0) if scheme else ''
rest = url[len(prefix):]
slash = rest.find('/')
authority = rest if slash == -1 else rest[:slash]
tail = '' if slash == -1 else rest[slash:]
at = authority.rfind('@')
if at == -1:
return url
return prefix + authority[at + 1:] + tail
except Exception:
return None


def _get_git_context(session_id, cwd):
"""Credential-stripped origin remote URL for cwd, or None. Successful and
conclusive-no-repo lookups are cached per (session_id, cwd); transient
failures are not cached. Never raises."""
key = (session_id, cwd)
if key in _GIT_CONTEXT_CACHE:
return _GIT_CONTEXT_CACHE[key]
if not cwd:
return None
try:
out = subprocess.run(
['git', '-C', cwd, 'config', '--get', 'remote.origin.url'],
capture_output=True, text=True, timeout=2,
)
except Exception as exc:
log_error(f"git context lookup failed session={session_id} cwd={cwd}: {exc}", 'git_context')
return None
result = None
if out.returncode == 0:
url = out.stdout.strip()
if url:
result = _strip_git_credentials(url)
_cache_git_context(key, result)
return result


def _repo_context_dir(cwd, file_path):
"""Directory whose git repo governs the operation: the nearest existing
ancestor of the target file for file tools, else the session cwd. The
ancestor walk lets a write into a not-yet-created path still resolve to its
enclosing repo."""
if isinstance(file_path, str) and file_path:
base = file_path if os.path.isabs(file_path) else os.path.join(cwd or '', file_path)
d = os.path.dirname(base) or cwd
while d and not os.path.isdir(d):
parent = os.path.dirname(d)
if parent == d:
break
d = parent
return d or cwd
return cwd


def process_pre_tool_use(event, api_key):
"""Process PreToolUse event - check policy before tool execution."""
raw_tool = event.get('tool_name') or event.get('toolName') or ''
Expand Down Expand Up @@ -999,7 +1070,8 @@ def process_pre_tool_use(event, api_key):

# Preserve the raw event (raw tool_name + tool_input) inside metadata.
metadata = dict(event)
file_path = tool_input.get('filePath') or tool_input.get('path') or tool_input.get('file_path')
file_path = (tool_input.get('filePath') or tool_input.get('path')
or tool_input.get('file_path') or _extract_patch_target_path(tool_input))
if file_path:
metadata['file_path'] = file_path

Expand Down Expand Up @@ -1052,6 +1124,10 @@ def process_pre_tool_use(event, api_key):
'additionalContext': 'This action was blocked by an organization security policy that requires approval. Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. The user must approve via Slack and retry.',
})

request_body['git_remote_url'] = _get_git_context(
session_id, _repo_context_dir(event.get('cwd'), file_path)
)

if need_pull_policies:
request_body['pull_policies'] = True

Expand Down
73 changes: 73 additions & 0 deletions cursor/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,75 @@ def build_account_identity(event=None, probe=False):
return identity


_GIT_CONTEXT_CACHE = {}
_GIT_CONTEXT_CACHE_MAX = 256


def _cache_git_context(key, value):
if key not in _GIT_CONTEXT_CACHE and len(_GIT_CONTEXT_CACHE) >= _GIT_CONTEXT_CACHE_MAX:
_GIT_CONTEXT_CACHE.pop(next(iter(_GIT_CONTEXT_CACHE)), None)
_GIT_CONTEXT_CACHE[key] = value


def _strip_git_credentials(url):
try:
if not url or '@' not in url:
return url
scheme = re.match(r'^[a-zA-Z][a-zA-Z0-9+.-]*://', url)
prefix = scheme.group(0) if scheme else ''
rest = url[len(prefix):]
slash = rest.find('/')
authority = rest if slash == -1 else rest[:slash]
tail = '' if slash == -1 else rest[slash:]
at = authority.rfind('@')
if at == -1:
return url
return prefix + authority[at + 1:] + tail
except Exception:
return None


def _get_git_context(session_id, cwd):
"""Credential-stripped origin remote URL for cwd, or None. Successful and
conclusive-no-repo lookups are cached per (session_id, cwd); transient
failures are not cached. Never raises."""
key = (session_id, cwd)
if key in _GIT_CONTEXT_CACHE:
return _GIT_CONTEXT_CACHE[key]
if not cwd:
return None
try:
out = subprocess.run(
['git', '-C', cwd, 'config', '--get', 'remote.origin.url'],
capture_output=True, text=True, timeout=2,
)
except Exception as exc:
log_error(f"git context lookup failed session={session_id} cwd={cwd}: {exc}", 'git_context')
return None
result = None
if out.returncode == 0:
url = out.stdout.strip()
if url:
result = _strip_git_credentials(url)
_cache_git_context(key, result)
return result


def _repo_context_dir(cwd, file_path):
"""Directory whose git repo governs the operation: the nearest existing
ancestor of the target file for file tools, else the session cwd."""
if isinstance(file_path, str) and file_path:
base = file_path if os.path.isabs(file_path) else os.path.join(cwd or '', file_path)
d = os.path.dirname(base) or cwd
while d and not os.path.isdir(d):
parent = os.path.dirname(d)
if parent == d:
break
d = parent
return d or cwd
return cwd


def process_pre_tool_use(event, api_key):
"""Process preToolUse event - check policy before tool execution."""
tool_name = event.get('tool_name', '')
Expand Down Expand Up @@ -742,6 +811,10 @@ def process_pre_tool_use(event, api_key):
'agent_message': 'This action was blocked by an organization security policy that requires approval. Do not attempt to achieve the same result using alternative tools, file operations, or workarounds. The user must approve via Slack and retry.',
}

request_body['git_remote_url'] = _get_git_context(
conversation_id, _repo_context_dir(event.get('cwd'), file_path)
)

if need_pull_policies:
request_body['pull_policies'] = True

Expand Down
Loading