-
Notifications
You must be signed in to change notification settings - Fork 1
fix(hooks): stop self-reporting benign BrokenPipeError to gateway (AI-GATEWAY-3J) #177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| """ | ||
| Tests for AI-GATEWAY-3J: a benign BrokenPipeError raised while emitting the | ||
| hook response (host closed the read end of stdout) must NOT be self-reported, | ||
| while genuine exceptions must still be reported. Also verifies _emit itself | ||
| swallows a dead pipe instead of crashing. | ||
| """ | ||
|
|
||
| import io | ||
| import sys | ||
| import unittest | ||
| from unittest.mock import Mock, patch | ||
|
|
||
| import unbound | ||
|
|
||
|
|
||
| class TestMainBrokenPipeNotReported(unittest.TestCase): | ||
| def test_broken_pipe_on_emit_is_not_reported(self): | ||
| # Empty stdin -> main() hits the first _emit; make that _emit raise a | ||
| # broken pipe. main() must catch it, NOT log, NOT report to gateway. | ||
| with patch.object(unbound, "_emit", side_effect=BrokenPipeError(32, "Broken pipe")), \ | ||
| patch.object(unbound, "report_error_to_gateway", Mock()) as report, \ | ||
| patch.object(unbound, "log_error", Mock()) as log, \ | ||
| patch.object(unbound, "get_api_key", lambda: "K"), \ | ||
| patch.object(unbound.sys, "stdin", io.StringIO("")): | ||
| try: | ||
| unbound.main() | ||
| except BrokenPipeError: | ||
| self.fail("main() let BrokenPipeError escape") | ||
| self.assertEqual(report.call_count, 0) | ||
| self.assertEqual(log.call_count, 0) | ||
|
|
||
|
|
||
| class TestMainRealExceptionStillReported(unittest.TestCase): | ||
| def test_real_exception_is_reported(self): | ||
| # A Stop event reaches append_to_audit_log; make that collaborator raise | ||
| # a genuine error. main() must report it via log_error with category | ||
| # 'general' and a message containing the original text. | ||
| event = '{"hook_event_name": "Stop", "session_id": "s"}' | ||
| with patch.object(unbound, "log_error", Mock()) as log, \ | ||
| patch.object(unbound, "append_to_audit_log", side_effect=RuntimeError("boom")), \ | ||
| patch.object(unbound, "get_api_key", lambda: "K"), \ | ||
| patch.object(unbound.sys, "stdin", io.StringIO(event)), \ | ||
| patch.object(unbound.sys, "stdout", io.StringIO()): | ||
| unbound.main() | ||
| self.assertEqual(log.call_count, 1) | ||
| args, _ = log.call_args | ||
| self.assertIn("Exception in main: boom", args[0]) | ||
| self.assertEqual(args[1], "general") | ||
|
|
||
|
|
||
| class TestEmitDeadPipe(unittest.TestCase): | ||
| def setUp(self): | ||
| self._real_stdout = sys.stdout | ||
|
|
||
| def tearDown(self): | ||
| swapped = unbound.sys.stdout | ||
| sys.stdout = self._real_stdout | ||
| if swapped is not self._real_stdout: | ||
| try: | ||
| swapped.close() | ||
| except Exception: | ||
| pass | ||
|
|
||
| def test_emit_to_dead_pipe_does_not_raise(self): | ||
| class DeadPipe: | ||
| def write(self, _): | ||
| raise BrokenPipeError(32, "Broken pipe") | ||
|
|
||
| def flush(self): | ||
| raise BrokenPipeError(32, "Broken pipe") | ||
|
|
||
| unbound.sys.stdout = DeadPipe() | ||
| try: | ||
| unbound._emit("{}") | ||
| except Exception as e: | ||
| self.fail(f"_emit raised on dead pipe: {e!r}") | ||
| # _emit swapped stdout to a working sink, so a second emit is also safe. | ||
| self.assertNotIsInstance(unbound.sys.stdout, DeadPipe) | ||
| try: | ||
| unbound._emit("{}") | ||
| except Exception as e: | ||
| self.fail(f"second _emit raised after swap: {e!r}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -150,6 +150,20 @@ def log_error(message: str, category: str = 'general'): | |
| report_error_to_gateway(message, category, _cached_api_key) | ||
|
|
||
|
|
||
| def _emit(text): | ||
| """Write a hook response line to stdout, treating a closed reader pipe as | ||
| a benign no-op. The host may close the read end (timeout, cancel, session | ||
| end, blocked approval-poll) before we flush — that is not a hook error.""" | ||
| try: | ||
| sys.stdout.write(text + "\n") | ||
| sys.stdout.flush() | ||
| except (BrokenPipeError, OSError): | ||
| try: | ||
| sys.stdout = open(os.devnull, "w") | ||
| except Exception: | ||
| pass | ||
|
|
||
|
|
||
| def _read_policy_cache_raw() -> Optional[Dict]: | ||
| """Read and JSON-parse the policy cache file. Returns None on missing/corrupt.""" | ||
| try: | ||
|
|
@@ -1659,13 +1673,13 @@ def main(): | |
| input_data = sys.stdin.read().strip() | ||
|
|
||
| if not input_data: | ||
| print('{"suppressOutput": true}', flush=True) | ||
| _emit('{"suppressOutput": true}') | ||
| return | ||
|
|
||
| try: | ||
| event = json.loads(input_data) | ||
| except json.JSONDecodeError: | ||
| print('{"suppressOutput": true}', flush=True) | ||
| _emit('{"suppressOutput": true}') | ||
| return | ||
|
|
||
| hook_event_name = event.get('hook_event_name') | ||
|
|
@@ -1676,15 +1690,15 @@ def main(): | |
| _device_serial() # warm the (slow) serial probe + cache once per session | ||
| _check_self_update() | ||
| _dispatch_discovery() | ||
| print("{}") | ||
| _emit("{}") | ||
| return | ||
| session_id = event.get('session_id') | ||
|
|
||
| # Handle PreToolUse - return immediately after decision is made | ||
| if hook_event_name == 'PreToolUse': | ||
| response = process_pre_tool_use(event, api_key) | ||
| response["suppressOutput"] = True | ||
| print(json.dumps(response), flush=True) | ||
| _emit(json.dumps(response)) | ||
| return | ||
|
|
||
| # Handle UserPromptSubmit - check policy before processing | ||
|
|
@@ -1699,7 +1713,7 @@ def main(): | |
| 'event': event | ||
| }) | ||
| response["suppressOutput"] = True | ||
| print(json.dumps(response), flush=True) | ||
| _emit(json.dumps(response)) | ||
| return | ||
|
|
||
| # If allowed, continue to log the event (output printed at end) | ||
|
|
@@ -1718,12 +1732,19 @@ def main(): | |
|
|
||
| cleanup_old_logs() | ||
|
|
||
| print('{"suppressOutput": true}', flush=True) | ||
|
|
||
| _emit('{"suppressOutput": true}') | ||
|
|
||
| except BrokenPipeError: | ||
| # Host closed the read end of our stdout pipe (timeout / cancel / | ||
| # session end / blocked approval-poll). Benign — do not self-report. | ||
| try: | ||
| sys.stdout = open(os.devnull, "w") | ||
| except Exception: | ||
| pass | ||
|
Comment on lines
+1737
to
+1743
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The |
||
| except Exception as e: | ||
| # Still return empty JSON object to Claude Code to indicate completion | ||
| log_error(f"Exception in main: {str(e)}", 'general') | ||
| print('{"suppressOutput": true}', flush=True) | ||
| _emit('{"suppressOutput": true}') | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| """ | ||
| Tests for AI-GATEWAY-3J: a benign BrokenPipeError raised while emitting the | ||
| hook response (host closed the read end of stdout) must NOT be self-reported, | ||
| while genuine exceptions must still be reported. Also verifies _emit itself | ||
| swallows a dead pipe instead of crashing. | ||
| """ | ||
|
|
||
| import io | ||
| import sys | ||
| import unittest | ||
| from unittest.mock import Mock, patch | ||
|
|
||
| import unbound | ||
|
|
||
|
|
||
| class TestMainBrokenPipeNotReported(unittest.TestCase): | ||
| def test_broken_pipe_on_emit_is_not_reported(self): | ||
| # Empty stdin -> main() hits the first _emit; make that _emit raise a | ||
| # broken pipe. main() must catch it, NOT log, NOT report to gateway. | ||
| with patch.object(unbound, "_emit", side_effect=BrokenPipeError(32, "Broken pipe")), \ | ||
| patch.object(unbound, "report_error_to_gateway", Mock()) as report, \ | ||
| patch.object(unbound, "log_error", Mock()) as log, \ | ||
| patch.object(unbound.sys, "stdin", io.StringIO("")): | ||
| try: | ||
| unbound.main() | ||
| except BrokenPipeError: | ||
| self.fail("main() let BrokenPipeError escape") | ||
| self.assertEqual(report.call_count, 0) | ||
| self.assertEqual(log.call_count, 0) | ||
|
|
||
|
|
||
| class TestMainRealExceptionStillReported(unittest.TestCase): | ||
| def test_real_exception_is_reported(self): | ||
| # A Stop event reaches append_to_audit_log; make that collaborator raise | ||
| # a genuine error. main() must report it via log_error with category | ||
| # 'general' and a message containing the original text. | ||
| event = '{"hook_event_name": "Stop", "session_id": "s"}' | ||
| with patch.object(unbound, "log_error", Mock()) as log, \ | ||
| patch.object(unbound, "append_to_audit_log", side_effect=RuntimeError("boom")), \ | ||
| patch.object(unbound.sys, "stdin", io.StringIO(event)), \ | ||
| patch.object(unbound.sys, "stdout", io.StringIO()): | ||
| unbound.main() | ||
| self.assertEqual(log.call_count, 1) | ||
| args, _ = log.call_args | ||
| self.assertIn("Exception in main: boom", args[0]) | ||
| self.assertEqual(args[1], "general") | ||
|
|
||
|
|
||
| class TestEmitDeadPipe(unittest.TestCase): | ||
| def setUp(self): | ||
| self._real_stdout = sys.stdout | ||
|
|
||
| def tearDown(self): | ||
| swapped = unbound.sys.stdout | ||
| sys.stdout = self._real_stdout | ||
| if swapped is not self._real_stdout: | ||
| try: | ||
| swapped.close() | ||
| except Exception: | ||
| pass | ||
|
|
||
| def test_emit_to_dead_pipe_does_not_raise(self): | ||
| class DeadPipe: | ||
| def write(self, _): | ||
| raise BrokenPipeError(32, "Broken pipe") | ||
|
|
||
| def flush(self): | ||
| raise BrokenPipeError(32, "Broken pipe") | ||
|
|
||
| unbound.sys.stdout = DeadPipe() | ||
| try: | ||
| unbound._emit("{}") | ||
| except Exception as e: | ||
| self.fail(f"_emit raised on dead pipe: {e!r}") | ||
| # _emit swapped stdout to a working sink, so a second emit is also safe. | ||
| self.assertNotIsInstance(unbound.sys.stdout, DeadPipe) | ||
| try: | ||
| unbound._emit("{}") | ||
| except Exception as e: | ||
| self.fail(f"second _emit raised after swap: {e!r}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OSErrorswallow is broader than intended_emitcatches(BrokenPipeError, OSError), butBrokenPipeErroris already a subclass ofOSError, so the tuple is redundant and the bareOSErrorcatch covers every OS-level I/O error on stdout — not just closed-pipe scenarios. A non-pipeOSError(e.g.,ENOSPCon a redirected stdout, a permission error on a named pipe used for IPC) would be silently swallowed, stdout swapped to/dev/null, and no error reported anywhere. The same pattern is duplicated incodex/hooks/unbound.py,copilot/hooks/unbound.py, andcursor/unbound.py.