From 0ee9702105b3d8c478021d711ed1b684071508cb Mon Sep 17 00:00:00 2001 From: Pugazhendhi Date: Wed, 1 Jul 2026 20:41:44 +0530 Subject: [PATCH] WEB-5004: attach referenced file contents to tool telemetry Add a uniform `file_content` field to the PreToolUse and PostToolUse/Stop payloads of all five coding-tool hooks (claude-code, cursor, codex, copilot, augment). For a file tool (Read/Write/Edit/MCP) the hook reads the referenced file and attaches `file_content = [{path, content, truncated}]`. - Shared helpers (_cap_file_text, _resolve_and_read_file, _make_file_entry, _attach_file_content) are byte-identical across all five hooks. - Write reuses the inline content (file may not exist on disk yet); Read/Edit read from disk, resolving relative paths against cwd. - 64KB per-file cap with a per-entry `truncated` flag. - Fail-open: missing/binary/non-UTF8/permission-denied/directory/unresolvable paths are silently skipped; no new imports; never raises or blocks the editor. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01GrSjwjtXsfxcBpANpkqQg4 --- augment/hooks/unbound.py | 88 +++++++++++++++++++++++++++++++++++ claude-code/hooks/unbound.py | 90 +++++++++++++++++++++++++++++++++++- codex/hooks/unbound.py | 81 ++++++++++++++++++++++++++++++++ copilot/hooks/unbound.py | 84 +++++++++++++++++++++++++++++++-- cursor/unbound.py | 84 +++++++++++++++++++++++++++++++-- 5 files changed, 418 insertions(+), 9 deletions(-) diff --git a/augment/hooks/unbound.py b/augment/hooks/unbound.py index bee8f64..e75f270 100644 --- a/augment/hooks/unbound.py +++ b/augment/hooks/unbound.py @@ -1195,6 +1195,16 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: metadata['file_path'] = tool_input[key] break + # Attach the target file's contents for pre-tool telemetry. Write-style tools + # carry the new text inline (file may not exist on disk yet); others read disk. + if metadata.get('file_path'): + _attach_file_content( + metadata, + metadata['file_path'], + event.get('cwd'), + tool_input.get('content') if isinstance(tool_input.get('content'), str) else None, + ) + if is_mcp: # mcp_metadata is set only when the matcher has includeMCPMetadata AND the # surface populates it (the VS Code extension sends null). Prefer it; else @@ -1373,6 +1383,13 @@ def _io_response(): path = (tool_input.get('file_path') or tool_input.get('path') or tool_input.get('filePath') or first_change.get('path') or '') canon_input = {'file_path': path} + # Reuse post-execution captured content (file_changes / tool_output) as + # inline_content so we don't re-read disk; fall back to disk only if absent. + inline = (first_change.get('content') or tool_input.get('content') + or (tool_output if canonical == 'Read' else None)) + if path: + _attach_file_content(canon_input, path, ev.get('cwd'), + inline if isinstance(inline, str) else None) if canonical == 'Read': tool_response = {'content': tool_output} if tool_output else {} else: @@ -2006,6 +2023,77 @@ def _resolve_cwd(event: Dict) -> Optional[str]: return None +_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap + + +def _cap_file_text(text): + """Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8.""" + encoded = text.encode('utf-8') + if len(encoded) <= _MAX_FILE_CONTENT_BYTES: + return text, False + return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True + + +def _resolve_and_read_file(file_path, cwd): + """Read one file as text, resolving a relative path against cwd. Returns + (content, truncated), or None on any problem — missing file, unresolvable + relative path, permission denied, a directory, binary or non-UTF8 content, or + any OS error. Never raises (fail-open): the caller just omits the content.""" + try: + if not file_path or not isinstance(file_path, str): + return None + if not os.path.isabs(file_path): + if not cwd: + return None # cannot form an absolute path safely -> skip + file_path = os.path.join(cwd, file_path) + if not os.path.isfile(file_path): + return None # missing file or a directory + with open(file_path, 'rb') as f: + raw = f.read(_MAX_FILE_CONTENT_BYTES + 1) + truncated = len(raw) > _MAX_FILE_CONTENT_BYTES + raw = raw[:_MAX_FILE_CONTENT_BYTES] + if b'\x00' in raw: + return None # binary file + if truncated: + return raw.decode('utf-8', errors='ignore'), True + try: + return raw.decode('utf-8'), False + except UnicodeDecodeError: + return None # non-UTF8 / binary + except Exception: + return None # permission denied, OS error, etc. -> skip + + +def _make_file_entry(path, cwd, inline_content=None): + """Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools + pass the new text as inline_content (the file may not exist on disk yet).""" + try: + if not path or not isinstance(path, str): + return None + if isinstance(inline_content, str): + content, truncated = _cap_file_text(inline_content) + else: + res = _resolve_and_read_file(path, cwd) + if res is None: + return None + content, truncated = res + return {'path': path, 'content': content, 'truncated': truncated} + except Exception: + return None + + +def _attach_file_content(target, file_path, cwd, inline_content=None): + """Attach target['file_content'] (a list of {path, content, truncated}) for a + single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools. + Best-effort: on any unreadable file the key is simply left absent.""" + try: + entry = _make_file_entry(file_path, cwd, inline_content) + if entry is not None: + target['file_content'] = [entry] + except Exception: + return + + def main(): global _cached_api_key api_key = get_api_key() diff --git a/claude-code/hooks/unbound.py b/claude-code/hooks/unbound.py index 821aaca..9bcb902 100644 --- a/claude-code/hooks/unbound.py +++ b/claude-code/hooks/unbound.py @@ -967,6 +967,77 @@ def _read_script_body_b64(command, args, cwd): return None +_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap + + +def _cap_file_text(text): + """Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8.""" + encoded = text.encode('utf-8') + if len(encoded) <= _MAX_FILE_CONTENT_BYTES: + return text, False + return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True + + +def _resolve_and_read_file(file_path, cwd): + """Read one file as text, resolving a relative path against cwd. Returns + (content, truncated), or None on any problem — missing file, unresolvable + relative path, permission denied, a directory, binary or non-UTF8 content, or + any OS error. Never raises (fail-open): the caller just omits the content.""" + try: + if not file_path or not isinstance(file_path, str): + return None + if not os.path.isabs(file_path): + if not cwd: + return None # cannot form an absolute path safely -> skip + file_path = os.path.join(cwd, file_path) + if not os.path.isfile(file_path): + return None # missing file or a directory + with open(file_path, 'rb') as f: + raw = f.read(_MAX_FILE_CONTENT_BYTES + 1) + truncated = len(raw) > _MAX_FILE_CONTENT_BYTES + raw = raw[:_MAX_FILE_CONTENT_BYTES] + if b'\x00' in raw: + return None # binary file + if truncated: + return raw.decode('utf-8', errors='ignore'), True + try: + return raw.decode('utf-8'), False + except UnicodeDecodeError: + return None # non-UTF8 / binary + except Exception: + return None # permission denied, OS error, etc. -> skip + + +def _make_file_entry(path, cwd, inline_content=None): + """Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools + pass the new text as inline_content (the file may not exist on disk yet).""" + try: + if not path or not isinstance(path, str): + return None + if isinstance(inline_content, str): + content, truncated = _cap_file_text(inline_content) + else: + res = _resolve_and_read_file(path, cwd) + if res is None: + return None + content, truncated = res + return {'path': path, 'content': content, 'truncated': truncated} + except Exception: + return None + + +def _attach_file_content(target, file_path, cwd, inline_content=None): + """Attach target['file_content'] (a list of {path, content, truncated}) for a + single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools. + Best-effort: on any unreadable file the key is simply left absent.""" + try: + entry = _make_file_entry(file_path, cwd, inline_content) + if entry is not None: + target['file_content'] = [entry] + except Exception: + return + + def _read_mcp_server_config(server_name: str, config_path: Path, cwd: Optional[str] = None) -> Optional[Dict]: try: if not config_path.exists(): @@ -1195,6 +1266,10 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: tool_input = event.get('tool_input') or {} if 'file_path' in tool_input: metadata['file_path'] = tool_input['file_path'] + _attach_file_content( + metadata, tool_input.get('file_path'), event.get('cwd'), + tool_input.get('content'), + ) if is_mcp: # Parse mcp____ to extract server and tool for gateway matching @@ -1371,13 +1446,24 @@ def build_llm_exchange(events: List[Dict], stop_assistant_message: Optional[str] if tool_response['content'] == tool_input['content']: tool_response = {k: v for k, v in tool_response.items() if k != 'content'} - assistant_tool_uses.append({ + tool_use_obj = { 'type': 'PostToolUse', 'tool_name': tool_name, 'tool_input': tool_input, 'tool_response': tool_response, 'tool_use_id': event.get('tool_use_id') - }) + } + if isinstance(tool_input, dict) and 'file_path' in tool_input: + _inline = tool_input.get('content') + if not isinstance(_inline, str) and isinstance(tool_response, dict): + _resp_content = tool_response.get('content') + if isinstance(_resp_content, str): + _inline = _resp_content + _attach_file_content( + tool_use_obj, tool_input.get('file_path'), + event.get('cwd'), _inline, + ) + assistant_tool_uses.append(tool_use_obj) if user_prompt: messages.append({'role': 'user', 'content': user_prompt}) diff --git a/codex/hooks/unbound.py b/codex/hooks/unbound.py index 752ba40..760510b 100644 --- a/codex/hooks/unbound.py +++ b/codex/hooks/unbound.py @@ -674,6 +674,77 @@ def _compute_script_hash(command, args, cwd): return None +_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap + + +def _cap_file_text(text): + """Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8.""" + encoded = text.encode('utf-8') + if len(encoded) <= _MAX_FILE_CONTENT_BYTES: + return text, False + return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True + + +def _resolve_and_read_file(file_path, cwd): + """Read one file as text, resolving a relative path against cwd. Returns + (content, truncated), or None on any problem — missing file, unresolvable + relative path, permission denied, a directory, binary or non-UTF8 content, or + any OS error. Never raises (fail-open): the caller just omits the content.""" + try: + if not file_path or not isinstance(file_path, str): + return None + if not os.path.isabs(file_path): + if not cwd: + return None # cannot form an absolute path safely -> skip + file_path = os.path.join(cwd, file_path) + if not os.path.isfile(file_path): + return None # missing file or a directory + with open(file_path, 'rb') as f: + raw = f.read(_MAX_FILE_CONTENT_BYTES + 1) + truncated = len(raw) > _MAX_FILE_CONTENT_BYTES + raw = raw[:_MAX_FILE_CONTENT_BYTES] + if b'\x00' in raw: + return None # binary file + if truncated: + return raw.decode('utf-8', errors='ignore'), True + try: + return raw.decode('utf-8'), False + except UnicodeDecodeError: + return None # non-UTF8 / binary + except Exception: + return None # permission denied, OS error, etc. -> skip + + +def _make_file_entry(path, cwd, inline_content=None): + """Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools + pass the new text as inline_content (the file may not exist on disk yet).""" + try: + if not path or not isinstance(path, str): + return None + if isinstance(inline_content, str): + content, truncated = _cap_file_text(inline_content) + else: + res = _resolve_and_read_file(path, cwd) + if res is None: + return None + content, truncated = res + return {'path': path, 'content': content, 'truncated': truncated} + except Exception: + return None + + +def _attach_file_content(target, file_path, cwd, inline_content=None): + """Attach target['file_content'] (a list of {path, content, truncated}) for a + single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools. + Best-effort: on any unreadable file the key is simply left absent.""" + try: + entry = _make_file_entry(file_path, cwd, inline_content) + if entry is not None: + target['file_content'] = [entry] + except Exception: + return + + def _augment_script_hash(result, cwd): """Add scriptHash to an MCP server config when it runs a local script, so the gateway can fingerprint it as `script:`.""" @@ -855,6 +926,10 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict: # Build metadata with the raw event metadata = dict(event) + tool_input = event.get('tool_input') or {} + if tool_input.get('file_path'): + _attach_file_content(metadata, tool_input.get('file_path'), event.get('cwd'), tool_input.get('content')) + if is_mcp: # Parse mcp____ to extract server and tool for gateway matching parts = tool_name[len(MCP_TOOL_PREFIX):].split('__', 1) @@ -1224,6 +1299,12 @@ def process_stop_event(event: Dict, api_key: str): # Parse tool uses from Codex transcript (function_call/function_call_output pairs) assistant_tool_uses = parse_codex_transcript_for_tools(transcript_path, user_prompt_timestamp) + cwd = event.get('cwd') + for tool_use in assistant_tool_uses: + tu_input = tool_use.get('tool_input') or {} + if tu_input.get('file_path'): + _attach_file_content(tool_use, tu_input.get('file_path'), cwd, tu_input.get('content')) + assistant_msg = { 'role': 'assistant', 'content': last_assistant_message or '' diff --git a/copilot/hooks/unbound.py b/copilot/hooks/unbound.py index 781f4b1..305b508 100644 --- a/copilot/hooks/unbound.py +++ b/copilot/hooks/unbound.py @@ -652,6 +652,77 @@ def _compute_script_hash(command, args, cwd): return None +_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap + + +def _cap_file_text(text): + """Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8.""" + encoded = text.encode('utf-8') + if len(encoded) <= _MAX_FILE_CONTENT_BYTES: + return text, False + return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True + + +def _resolve_and_read_file(file_path, cwd): + """Read one file as text, resolving a relative path against cwd. Returns + (content, truncated), or None on any problem — missing file, unresolvable + relative path, permission denied, a directory, binary or non-UTF8 content, or + any OS error. Never raises (fail-open): the caller just omits the content.""" + try: + if not file_path or not isinstance(file_path, str): + return None + if not os.path.isabs(file_path): + if not cwd: + return None # cannot form an absolute path safely -> skip + file_path = os.path.join(cwd, file_path) + if not os.path.isfile(file_path): + return None # missing file or a directory + with open(file_path, 'rb') as f: + raw = f.read(_MAX_FILE_CONTENT_BYTES + 1) + truncated = len(raw) > _MAX_FILE_CONTENT_BYTES + raw = raw[:_MAX_FILE_CONTENT_BYTES] + if b'\x00' in raw: + return None # binary file + if truncated: + return raw.decode('utf-8', errors='ignore'), True + try: + return raw.decode('utf-8'), False + except UnicodeDecodeError: + return None # non-UTF8 / binary + except Exception: + return None # permission denied, OS error, etc. -> skip + + +def _make_file_entry(path, cwd, inline_content=None): + """Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools + pass the new text as inline_content (the file may not exist on disk yet).""" + try: + if not path or not isinstance(path, str): + return None + if isinstance(inline_content, str): + content, truncated = _cap_file_text(inline_content) + else: + res = _resolve_and_read_file(path, cwd) + if res is None: + return None + content, truncated = res + return {'path': path, 'content': content, 'truncated': truncated} + except Exception: + return None + + +def _attach_file_content(target, file_path, cwd, inline_content=None): + """Attach target['file_content'] (a list of {path, content, truncated}) for a + single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools. + Best-effort: on any unreadable file the key is simply left absent.""" + try: + entry = _make_file_entry(file_path, cwd, inline_content) + if entry is not None: + target['file_content'] = [entry] + except Exception: + return + + def _augment_script_hash(result, cwd): """Add scriptHash to an MCP server config when it runs a local script, so the gateway can fingerprint it as `script:`.""" @@ -1093,6 +1164,7 @@ def process_pre_tool_use(event, api_key): file_path = tool_input.get('filePath') or tool_input.get('path') or tool_input.get('file_path') if file_path: metadata['file_path'] = file_path + _attach_file_content(metadata, file_path, event.get('cwd'), tool_input.get('content')) if mcp_server is not None: metadata['mcp_server'] = mcp_server @@ -1233,7 +1305,7 @@ def _extract_patch_target_path(args): return m.group(1).strip() if m else '' -def map_copilot_tool(name, args, result_content): +def map_copilot_tool(name, args, result_content, cwd=None): """Map a Copilot tool call to a cursor-style tool_use entry. Returns None for internal orchestration tools (intentionally not emitted). @@ -1272,11 +1344,16 @@ def map_copilot_tool(name, args, result_content): 'tool_input': args, 'result_json': result_content or '', } + # Uniform file_content for file-oriented tools; reuse the content already on + # the entry, falling back to a disk read (relative paths resolved via cwd). + file_path = entry.get('file_path') + if file_path: + _attach_file_content(entry, file_path, cwd, entry.get('content') or None) # Drop empty-string values. return {k: v for k, v in entry.items() if v != ''} -def build_exchange_from_transcript(transcript_path, fallback_session_id, session_start_model=None): +def build_exchange_from_transcript(transcript_path, fallback_session_id, session_start_model=None, cwd=None): """Parse a Copilot JSONL transcript into a cursor-style LLM exchange. Reads defensively — blank or unparseable lines are skipped, never raised.""" @@ -1381,7 +1458,7 @@ def _register(call_id): tool_use = [] for call_id in tool_calls: call = tool_data[call_id] - mapped = map_copilot_tool(call['name'], call['arguments'], call['result']) + mapped = map_copilot_tool(call['name'], call['arguments'], call['result'], cwd) # `is not None` (not truthiness): None means a consciously-dropped internal # tool; an empty-but-valid dict should still be appended. if mapped is not None: @@ -1833,6 +1910,7 @@ def main(): exchange = build_exchange_from_transcript( event.get('transcript_path'), session_id, session_start_model=get_session_start_model(session_id), + cwd=event.get('cwd'), ) if exchange: # Turn boundaries from event-fire times diff --git a/cursor/unbound.py b/cursor/unbound.py index db15133..7f08163 100644 --- a/cursor/unbound.py +++ b/cursor/unbound.py @@ -688,6 +688,7 @@ def process_pre_tool_use(event, api_key): file_path = tool_input.get('file_path', '') if file_path: metadata['file_path'] = file_path + _attach_file_content(metadata, file_path, event.get('cwd'), tool_input.get('content')) approval_key = f"{tool_name}:{file_path}" if file_path else tool_name is_retry = _is_approval_retry(approval_key) @@ -879,6 +880,77 @@ def _augment_script_hash(result, cwd): return result +_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap + + +def _cap_file_text(text): + """Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8.""" + encoded = text.encode('utf-8') + if len(encoded) <= _MAX_FILE_CONTENT_BYTES: + return text, False + return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True + + +def _resolve_and_read_file(file_path, cwd): + """Read one file as text, resolving a relative path against cwd. Returns + (content, truncated), or None on any problem — missing file, unresolvable + relative path, permission denied, a directory, binary or non-UTF8 content, or + any OS error. Never raises (fail-open): the caller just omits the content.""" + try: + if not file_path or not isinstance(file_path, str): + return None + if not os.path.isabs(file_path): + if not cwd: + return None # cannot form an absolute path safely -> skip + file_path = os.path.join(cwd, file_path) + if not os.path.isfile(file_path): + return None # missing file or a directory + with open(file_path, 'rb') as f: + raw = f.read(_MAX_FILE_CONTENT_BYTES + 1) + truncated = len(raw) > _MAX_FILE_CONTENT_BYTES + raw = raw[:_MAX_FILE_CONTENT_BYTES] + if b'\x00' in raw: + return None # binary file + if truncated: + return raw.decode('utf-8', errors='ignore'), True + try: + return raw.decode('utf-8'), False + except UnicodeDecodeError: + return None # non-UTF8 / binary + except Exception: + return None # permission denied, OS error, etc. -> skip + + +def _make_file_entry(path, cwd, inline_content=None): + """Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools + pass the new text as inline_content (the file may not exist on disk yet).""" + try: + if not path or not isinstance(path, str): + return None + if isinstance(inline_content, str): + content, truncated = _cap_file_text(inline_content) + else: + res = _resolve_and_read_file(path, cwd) + if res is None: + return None + content, truncated = res + return {'path': path, 'content': content, 'truncated': truncated} + except Exception: + return None + + +def _attach_file_content(target, file_path, cwd, inline_content=None): + """Attach target['file_content'] (a list of {path, content, truncated}) for a + single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools. + Best-effort: on any unreadable file the key is simply left absent.""" + try: + entry = _make_file_entry(file_path, cwd, inline_content) + if entry is not None: + target['file_content'] = [entry] + except Exception: + return + + def _read_mcp_server_config(server_name, config_path): """ Read an MCP server's config (url, command, args) from a config file. @@ -1129,12 +1201,14 @@ def build_llm_exchange(events, api_key=None): usage = _cursor_usage_from_event(event) or usage elif hook_event_name == 'beforeReadFile': - assistant_tool_uses.append({ + tool_use = { 'type': hook_event_name, 'file_path': event.get('file_path'), 'content': event.get('content', ''), 'attachments': event.get('attachments', []) - }) + } + _attach_file_content(tool_use, event.get('file_path'), event.get('cwd'), event.get('content') or None) + assistant_tool_uses.append(tool_use) elif hook_event_name == 'postToolUse': tool_name = event.get('tool_name', '') @@ -1154,11 +1228,13 @@ def build_llm_exchange(events, api_key=None): }) elif hook_event_name == 'afterFileEdit': - assistant_tool_uses.append({ + tool_use = { 'type': hook_event_name, 'file_path': event.get('file_path'), 'edits': event.get('edits', []) - }) + } + _attach_file_content(tool_use, event.get('file_path'), event.get('cwd'), None) + assistant_tool_uses.append(tool_use) elif hook_event_name == 'afterShellExecution': assistant_tool_uses.append({