Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions augment/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,16 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict:
metadata['file_path'] = tool_input[key]
break

# Attach the target file's contents for pre-tool telemetry. Write-style tools
# carry the new text inline (file may not exist on disk yet); others read disk.
if metadata.get('file_path'):
_attach_file_content(
metadata,
metadata['file_path'],
event.get('cwd'),
tool_input.get('content') if isinstance(tool_input.get('content'), str) else None,
)

if is_mcp:
# mcp_metadata is set only when the matcher has includeMCPMetadata AND the
# surface populates it (the VS Code extension sends null). Prefer it; else
Expand Down Expand Up @@ -1373,6 +1383,13 @@ def _io_response():
path = (tool_input.get('file_path') or tool_input.get('path')
or tool_input.get('filePath') or first_change.get('path') or '')
canon_input = {'file_path': path}
# Reuse post-execution captured content (file_changes / tool_output) as
# inline_content so we don't re-read disk; fall back to disk only if absent.
inline = (first_change.get('content') or tool_input.get('content')
or (tool_output if canonical == 'Read' else None))
if path:
_attach_file_content(canon_input, path, ev.get('cwd'),
inline if isinstance(inline, str) else None)
if canonical == 'Read':
tool_response = {'content': tool_output} if tool_output else {}
else:
Expand Down Expand Up @@ -2006,6 +2023,77 @@ def _resolve_cwd(event: Dict) -> Optional[str]:
return None


_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap


def _cap_file_text(text):
"""Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8."""
encoded = text.encode('utf-8')
if len(encoded) <= _MAX_FILE_CONTENT_BYTES:
return text, False
return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True


def _resolve_and_read_file(file_path, cwd):
"""Read one file as text, resolving a relative path against cwd. Returns
(content, truncated), or None on any problem — missing file, unresolvable
relative path, permission denied, a directory, binary or non-UTF8 content, or
any OS error. Never raises (fail-open): the caller just omits the content."""
try:
if not file_path or not isinstance(file_path, str):
return None
if not os.path.isabs(file_path):
if not cwd:
return None # cannot form an absolute path safely -> skip
file_path = os.path.join(cwd, file_path)
if not os.path.isfile(file_path):
return None # missing file or a directory
with open(file_path, 'rb') as f:
raw = f.read(_MAX_FILE_CONTENT_BYTES + 1)
truncated = len(raw) > _MAX_FILE_CONTENT_BYTES
raw = raw[:_MAX_FILE_CONTENT_BYTES]
if b'\x00' in raw:
return None # binary file
if truncated:
return raw.decode('utf-8', errors='ignore'), True
try:
return raw.decode('utf-8'), False
except UnicodeDecodeError:
return None # non-UTF8 / binary
except Exception:
return None # permission denied, OS error, etc. -> skip


def _make_file_entry(path, cwd, inline_content=None):
"""Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools
pass the new text as inline_content (the file may not exist on disk yet)."""
try:
if not path or not isinstance(path, str):
return None
if isinstance(inline_content, str):
content, truncated = _cap_file_text(inline_content)
else:
res = _resolve_and_read_file(path, cwd)
if res is None:
return None
content, truncated = res
return {'path': path, 'content': content, 'truncated': truncated}
except Exception:
return None


def _attach_file_content(target, file_path, cwd, inline_content=None):
"""Attach target['file_content'] (a list of {path, content, truncated}) for a
single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools.
Best-effort: on any unreadable file the key is simply left absent."""
try:
entry = _make_file_entry(file_path, cwd, inline_content)
if entry is not None:
target['file_content'] = [entry]
except Exception:
return


def main():
global _cached_api_key
api_key = get_api_key()
Expand Down
90 changes: 88 additions & 2 deletions claude-code/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,77 @@ def _read_script_body_b64(command, args, cwd):
return None


_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap


def _cap_file_text(text):
"""Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8."""
encoded = text.encode('utf-8')
if len(encoded) <= _MAX_FILE_CONTENT_BYTES:
return text, False
return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True


def _resolve_and_read_file(file_path, cwd):
"""Read one file as text, resolving a relative path against cwd. Returns
(content, truncated), or None on any problem — missing file, unresolvable
relative path, permission denied, a directory, binary or non-UTF8 content, or
any OS error. Never raises (fail-open): the caller just omits the content."""
try:
if not file_path or not isinstance(file_path, str):
return None
if not os.path.isabs(file_path):
if not cwd:
return None # cannot form an absolute path safely -> skip
file_path = os.path.join(cwd, file_path)
if not os.path.isfile(file_path):
return None # missing file or a directory
with open(file_path, 'rb') as f:
raw = f.read(_MAX_FILE_CONTENT_BYTES + 1)
truncated = len(raw) > _MAX_FILE_CONTENT_BYTES
raw = raw[:_MAX_FILE_CONTENT_BYTES]
if b'\x00' in raw:
return None # binary file
if truncated:
return raw.decode('utf-8', errors='ignore'), True
try:
return raw.decode('utf-8'), False
except UnicodeDecodeError:
return None # non-UTF8 / binary
except Exception:
return None # permission denied, OS error, etc. -> skip


def _make_file_entry(path, cwd, inline_content=None):
"""Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools
pass the new text as inline_content (the file may not exist on disk yet)."""
try:
if not path or not isinstance(path, str):
return None
if isinstance(inline_content, str):
content, truncated = _cap_file_text(inline_content)
else:
res = _resolve_and_read_file(path, cwd)
if res is None:
return None
content, truncated = res
return {'path': path, 'content': content, 'truncated': truncated}
except Exception:
return None


def _attach_file_content(target, file_path, cwd, inline_content=None):
"""Attach target['file_content'] (a list of {path, content, truncated}) for a
single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools.
Best-effort: on any unreadable file the key is simply left absent."""
try:
entry = _make_file_entry(file_path, cwd, inline_content)
if entry is not None:
target['file_content'] = [entry]
except Exception:
return


def _read_mcp_server_config(server_name: str, config_path: Path, cwd: Optional[str] = None) -> Optional[Dict]:
try:
if not config_path.exists():
Expand Down Expand Up @@ -1195,6 +1266,10 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict:
tool_input = event.get('tool_input') or {}
if 'file_path' in tool_input:
metadata['file_path'] = tool_input['file_path']
_attach_file_content(
metadata, tool_input.get('file_path'), event.get('cwd'),
tool_input.get('content'),
)

if is_mcp:
# Parse mcp__<server>__<tool> to extract server and tool for gateway matching
Expand Down Expand Up @@ -1371,13 +1446,24 @@ def build_llm_exchange(events: List[Dict], stop_assistant_message: Optional[str]
if tool_response['content'] == tool_input['content']:
tool_response = {k: v for k, v in tool_response.items() if k != 'content'}

assistant_tool_uses.append({
tool_use_obj = {
'type': 'PostToolUse',
'tool_name': tool_name,
'tool_input': tool_input,
'tool_response': tool_response,
'tool_use_id': event.get('tool_use_id')
})
}
if isinstance(tool_input, dict) and 'file_path' in tool_input:
_inline = tool_input.get('content')
if not isinstance(_inline, str) and isinstance(tool_response, dict):
_resp_content = tool_response.get('content')
if isinstance(_resp_content, str):
_inline = _resp_content
_attach_file_content(
tool_use_obj, tool_input.get('file_path'),
event.get('cwd'), _inline,
)
assistant_tool_uses.append(tool_use_obj)

if user_prompt:
messages.append({'role': 'user', 'content': user_prompt})
Expand Down
81 changes: 81 additions & 0 deletions codex/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,77 @@ def _compute_script_hash(command, args, cwd):
return None


_MAX_FILE_CONTENT_BYTES = 64 * 1024 # per-file cap


def _cap_file_text(text):
"""Return (text, truncated) capped to _MAX_FILE_CONTENT_BYTES of UTF-8."""
encoded = text.encode('utf-8')
if len(encoded) <= _MAX_FILE_CONTENT_BYTES:
return text, False
return encoded[:_MAX_FILE_CONTENT_BYTES].decode('utf-8', errors='ignore'), True


def _resolve_and_read_file(file_path, cwd):
"""Read one file as text, resolving a relative path against cwd. Returns
(content, truncated), or None on any problem — missing file, unresolvable
relative path, permission denied, a directory, binary or non-UTF8 content, or
any OS error. Never raises (fail-open): the caller just omits the content."""
try:
if not file_path or not isinstance(file_path, str):
return None
if not os.path.isabs(file_path):
if not cwd:
return None # cannot form an absolute path safely -> skip
file_path = os.path.join(cwd, file_path)
if not os.path.isfile(file_path):
return None # missing file or a directory
with open(file_path, 'rb') as f:
raw = f.read(_MAX_FILE_CONTENT_BYTES + 1)
truncated = len(raw) > _MAX_FILE_CONTENT_BYTES
raw = raw[:_MAX_FILE_CONTENT_BYTES]
if b'\x00' in raw:
return None # binary file
if truncated:
return raw.decode('utf-8', errors='ignore'), True
try:
return raw.decode('utf-8'), False
except UnicodeDecodeError:
return None # non-UTF8 / binary
except Exception:
return None # permission denied, OS error, etc. -> skip


def _make_file_entry(path, cwd, inline_content=None):
"""Build one {'path', 'content', 'truncated'} entry, or None. Write-style tools
pass the new text as inline_content (the file may not exist on disk yet)."""
try:
if not path or not isinstance(path, str):
return None
if isinstance(inline_content, str):
content, truncated = _cap_file_text(inline_content)
else:
res = _resolve_and_read_file(path, cwd)
if res is None:
return None
content, truncated = res
return {'path': path, 'content': content, 'truncated': truncated}
except Exception:
return None


def _attach_file_content(target, file_path, cwd, inline_content=None):
"""Attach target['file_content'] (a list of {path, content, truncated}) for a
single-file tool (Read/Write/Edit/MCP). Uniform key + shape across all tools.
Best-effort: on any unreadable file the key is simply left absent."""
try:
entry = _make_file_entry(file_path, cwd, inline_content)
if entry is not None:
target['file_content'] = [entry]
except Exception:
return


def _augment_script_hash(result, cwd):
"""Add scriptHash to an MCP server config when it runs a local script, so the
gateway can fingerprint it as `script:<hash>`."""
Expand Down Expand Up @@ -855,6 +926,10 @@ def process_pre_tool_use(event: Dict, api_key: str) -> Dict:
# Build metadata with the raw event
metadata = dict(event)

tool_input = event.get('tool_input') or {}
if tool_input.get('file_path'):
_attach_file_content(metadata, tool_input.get('file_path'), event.get('cwd'), tool_input.get('content'))

if is_mcp:
# Parse mcp__<server>__<tool> to extract server and tool for gateway matching
parts = tool_name[len(MCP_TOOL_PREFIX):].split('__', 1)
Expand Down Expand Up @@ -1224,6 +1299,12 @@ def process_stop_event(event: Dict, api_key: str):
# Parse tool uses from Codex transcript (function_call/function_call_output pairs)
assistant_tool_uses = parse_codex_transcript_for_tools(transcript_path, user_prompt_timestamp)

cwd = event.get('cwd')
for tool_use in assistant_tool_uses:
tu_input = tool_use.get('tool_input') or {}
if tu_input.get('file_path'):
_attach_file_content(tool_use, tu_input.get('file_path'), cwd, tu_input.get('content'))

assistant_msg = {
'role': 'assistant',
'content': last_assistant_message or ''
Expand Down
Loading