Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions claude-code/hooks/test_broken_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""
Tests for AI-GATEWAY-3J: a benign BrokenPipeError raised while emitting the
hook response (host closed the read end of stdout) must NOT be self-reported,
while genuine exceptions must still be reported. Also verifies _emit itself
swallows a dead pipe instead of crashing.
"""

import io
import sys
import unittest
from unittest.mock import Mock, patch

import unbound


class TestMainBrokenPipeNotReported(unittest.TestCase):
def test_broken_pipe_on_emit_is_not_reported(self):
# Empty stdin -> main() hits the first _emit; make that _emit raise a
# broken pipe. main() must catch it, NOT log, NOT report to gateway.
with patch.object(unbound, "_emit", side_effect=BrokenPipeError(32, "Broken pipe")), \
patch.object(unbound, "report_error_to_gateway", Mock()) as report, \
patch.object(unbound, "log_error", Mock()) as log, \
patch.object(unbound, "get_api_key", lambda: "K"), \
patch.object(unbound.sys, "stdin", io.StringIO("")):
try:
unbound.main()
except BrokenPipeError:
self.fail("main() let BrokenPipeError escape")
self.assertEqual(report.call_count, 0)
self.assertEqual(log.call_count, 0)


class TestMainRealExceptionStillReported(unittest.TestCase):
def test_real_exception_is_reported(self):
# A Stop event reaches append_to_audit_log; make that collaborator raise
# a genuine error. main() must report it via log_error with category
# 'general' and a message containing the original text.
event = '{"hook_event_name": "Stop", "session_id": "s"}'
with patch.object(unbound, "log_error", Mock()) as log, \
patch.object(unbound, "append_to_audit_log", side_effect=RuntimeError("boom")), \
patch.object(unbound, "get_api_key", lambda: "K"), \
patch.object(unbound.sys, "stdin", io.StringIO(event)), \
patch.object(unbound.sys, "stdout", io.StringIO()):
unbound.main()
self.assertEqual(log.call_count, 1)
args, _ = log.call_args
self.assertIn("Exception in main: boom", args[0])
self.assertEqual(args[1], "general")


class TestEmitDeadPipe(unittest.TestCase):
def setUp(self):
self._real_stdout = sys.stdout

def tearDown(self):
swapped = unbound.sys.stdout
sys.stdout = self._real_stdout
if swapped is not self._real_stdout:
try:
swapped.close()
except Exception:
pass

def test_emit_to_dead_pipe_does_not_raise(self):
class DeadPipe:
def write(self, _):
raise BrokenPipeError(32, "Broken pipe")

def flush(self):
raise BrokenPipeError(32, "Broken pipe")

unbound.sys.stdout = DeadPipe()
try:
unbound._emit("{}")
except Exception as e:
self.fail(f"_emit raised on dead pipe: {e!r}")
# _emit swapped stdout to a working sink, so a second emit is also safe.
self.assertNotIsInstance(unbound.sys.stdout, DeadPipe)
try:
unbound._emit("{}")
except Exception as e:
self.fail(f"second _emit raised after swap: {e!r}")


if __name__ == "__main__":
unittest.main()
37 changes: 29 additions & 8 deletions claude-code/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,20 @@ def log_error(message: str, category: str = 'general'):
report_error_to_gateway(message, category, _cached_api_key)


def _emit(text):
"""Write a hook response line to stdout, treating a closed reader pipe as
a benign no-op. The host may close the read end (timeout, cancel, session
end, blocked approval-poll) before we flush — that is not a hook error."""
try:
sys.stdout.write(text + "\n")
sys.stdout.flush()
except (BrokenPipeError, OSError):
try:
sys.stdout = open(os.devnull, "w")
except Exception:
Comment on lines +155 to +163

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Silent OSError swallow is broader than intended

_emit catches (BrokenPipeError, OSError), but BrokenPipeError is already a subclass of OSError, so the tuple is redundant and the bare OSError catch covers every OS-level I/O error on stdout — not just closed-pipe scenarios. A non-pipe OSError (e.g., ENOSPC on a redirected stdout, a permission error on a named pipe used for IPC) would be silently swallowed, stdout swapped to /dev/null, and no error reported anywhere. The same pattern is duplicated in codex/hooks/unbound.py, copilot/hooks/unbound.py, and cursor/unbound.py.

pass


def _read_policy_cache_raw() -> Optional[Dict]:
"""Read and JSON-parse the policy cache file. Returns None on missing/corrupt."""
try:
Expand Down Expand Up @@ -1659,13 +1673,13 @@ def main():
input_data = sys.stdin.read().strip()

if not input_data:
print('{"suppressOutput": true}', flush=True)
_emit('{"suppressOutput": true}')
return

try:
event = json.loads(input_data)
except json.JSONDecodeError:
print('{"suppressOutput": true}', flush=True)
_emit('{"suppressOutput": true}')
return

hook_event_name = event.get('hook_event_name')
Expand All @@ -1676,15 +1690,15 @@ def main():
_device_serial() # warm the (slow) serial probe + cache once per session
_check_self_update()
_dispatch_discovery()
print("{}")
_emit("{}")
return
session_id = event.get('session_id')

# Handle PreToolUse - return immediately after decision is made
if hook_event_name == 'PreToolUse':
response = process_pre_tool_use(event, api_key)
response["suppressOutput"] = True
print(json.dumps(response), flush=True)
_emit(json.dumps(response))
return

# Handle UserPromptSubmit - check policy before processing
Expand All @@ -1699,7 +1713,7 @@ def main():
'event': event
})
response["suppressOutput"] = True
print(json.dumps(response), flush=True)
_emit(json.dumps(response))
return

# If allowed, continue to log the event (output printed at end)
Expand All @@ -1718,12 +1732,19 @@ def main():

cleanup_old_logs()

print('{"suppressOutput": true}', flush=True)

_emit('{"suppressOutput": true}')

except BrokenPipeError:
# Host closed the read end of our stdout pipe (timeout / cancel /
# session end / blocked approval-poll). Benign — do not self-report.
try:
sys.stdout = open(os.devnull, "w")
except Exception:
pass
Comment on lines +1737 to +1743

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Broken-pipe path is completely dark — no way to confirm the fix is working

The except BrokenPipeError handler redirects stdout to /dev/null and exits silently. Nothing is written to stderr, no counter is incremented, and no gateway call is made. Once deployed, the team can only verify the fix by watching AI-GATEWAY-3J go quiet in Sentry. If broken-pipe events spike again (e.g., a new 4h foreground poll timeout trigger), or if the fix has a regression in an edge case, there is no signal to detect it. Even a single print("broken pipe suppressed", file=sys.stderr) (or a non-Sentry lightweight counter via the existing gateway metrics path) would allow the team to confirm frequency post-deploy. This same gap exists in all four hooks.

except Exception as e:
# Still return empty JSON object to Claude Code to indicate completion
log_error(f"Exception in main: {str(e)}", 'general')
print('{"suppressOutput": true}', flush=True)
_emit('{"suppressOutput": true}')


if __name__ == '__main__':
Expand Down
84 changes: 84 additions & 0 deletions codex/hooks/test_broken_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""
Tests for AI-GATEWAY-3J: a benign BrokenPipeError raised while emitting the
hook response (host closed the read end of stdout) must NOT be self-reported,
while genuine exceptions must still be reported. Also verifies _emit itself
swallows a dead pipe instead of crashing.
"""

import io
import sys
import unittest
from unittest.mock import Mock, patch

import unbound


class TestMainBrokenPipeNotReported(unittest.TestCase):
def test_broken_pipe_on_emit_is_not_reported(self):
# Empty stdin -> main() hits the first _emit; make that _emit raise a
# broken pipe. main() must catch it, NOT log, NOT report to gateway.
with patch.object(unbound, "_emit", side_effect=BrokenPipeError(32, "Broken pipe")), \
patch.object(unbound, "report_error_to_gateway", Mock()) as report, \
patch.object(unbound, "log_error", Mock()) as log, \
patch.object(unbound.sys, "stdin", io.StringIO("")):
try:
unbound.main()
except BrokenPipeError:
self.fail("main() let BrokenPipeError escape")
self.assertEqual(report.call_count, 0)
self.assertEqual(log.call_count, 0)


class TestMainRealExceptionStillReported(unittest.TestCase):
def test_real_exception_is_reported(self):
# A Stop event reaches append_to_audit_log; make that collaborator raise
# a genuine error. main() must report it via log_error with category
# 'general' and a message containing the original text.
event = '{"hook_event_name": "Stop", "session_id": "s"}'
with patch.object(unbound, "log_error", Mock()) as log, \
patch.object(unbound, "append_to_audit_log", side_effect=RuntimeError("boom")), \
patch.object(unbound.sys, "stdin", io.StringIO(event)), \
patch.object(unbound.sys, "stdout", io.StringIO()):
unbound.main()
self.assertEqual(log.call_count, 1)
args, _ = log.call_args
self.assertIn("Exception in main: boom", args[0])
self.assertEqual(args[1], "general")


class TestEmitDeadPipe(unittest.TestCase):
def setUp(self):
self._real_stdout = sys.stdout

def tearDown(self):
swapped = unbound.sys.stdout
sys.stdout = self._real_stdout
if swapped is not self._real_stdout:
try:
swapped.close()
except Exception:
pass

def test_emit_to_dead_pipe_does_not_raise(self):
class DeadPipe:
def write(self, _):
raise BrokenPipeError(32, "Broken pipe")

def flush(self):
raise BrokenPipeError(32, "Broken pipe")

unbound.sys.stdout = DeadPipe()
try:
unbound._emit("{}")
except Exception as e:
self.fail(f"_emit raised on dead pipe: {e!r}")
# _emit swapped stdout to a working sink, so a second emit is also safe.
self.assertNotIsInstance(unbound.sys.stdout, DeadPipe)
try:
unbound._emit("{}")
except Exception as e:
self.fail(f"second _emit raised after swap: {e!r}")


if __name__ == "__main__":
unittest.main()
35 changes: 28 additions & 7 deletions codex/hooks/unbound.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ def log_error(message: str, category: str = 'general'):
report_error_to_gateway(message, category, _cached_api_key)


def _emit(text):
"""Write a hook response line to stdout, treating a closed reader pipe as
a benign no-op. The host may close the read end (timeout, cancel, session
end, blocked approval-poll) before we flush — that is not a hook error."""
try:
sys.stdout.write(text + "\n")
sys.stdout.flush()
except (BrokenPipeError, OSError):
try:
sys.stdout = open(os.devnull, "w")
except Exception:
pass


def _read_policy_cache_raw() -> Optional[Dict]:
"""Read and JSON-parse the policy cache file. Returns None on missing/corrupt."""
try:
Expand Down Expand Up @@ -1570,13 +1584,13 @@ def main():
input_data = sys.stdin.read().strip()

if not input_data:
print('{"suppressOutput": true}', flush=True)
_emit('{"suppressOutput": true}')
return

try:
event = json.loads(input_data)
except json.JSONDecodeError:
print('{"suppressOutput": true}', flush=True)
_emit('{"suppressOutput": true}')
return

hook_event_name = event.get('hook_event_name')
Expand All @@ -1586,15 +1600,15 @@ def main():
if hook_event_name == "SessionStart":
_check_self_update()
_dispatch_discovery()
print("{}")
_emit("{}")
return
session_id = event.get('session_id')

# Handle PreToolUse - return immediately after decision is made
# Note: Codex PreToolUse does not support suppressOutput
if hook_event_name == 'PreToolUse':
response = process_pre_tool_use(event, api_key)
print(json.dumps(response), flush=True)
_emit(json.dumps(response))
return

# Handle UserPromptSubmit - check policy before processing
Expand All @@ -1609,7 +1623,7 @@ def main():
'event': event
})
response["suppressOutput"] = True
print(json.dumps(response), flush=True)
_emit(json.dumps(response))
return

# If allowed, continue to log the event (output printed at end)
Expand All @@ -1628,12 +1642,19 @@ def main():

cleanup_old_logs()

print('{"suppressOutput": true}', flush=True)
_emit('{"suppressOutput": true}')

except BrokenPipeError:
# Host closed the read end of our stdout pipe (timeout / cancel /
# session end / blocked approval-poll). Benign — do not self-report.
try:
sys.stdout = open(os.devnull, "w")
except Exception:
pass
except Exception as e:
# Still return empty JSON object to Codex to indicate completion
log_error(f"Exception in main: {str(e)}", 'general')
print('{"suppressOutput": true}', flush=True)
_emit('{"suppressOutput": true}')


if __name__ == '__main__':
Expand Down
Loading