ML-12776: Route failing recovery steps to the error stream instead of poisoning the source#642
Merged
Merged
Conversation
… poisoning the source When a step's recovery step (error handler) raised, `_do_and_recover` invoked it via `recovery_step._do` and let the exception propagate raw. On an explicit-ack source this latched `self._ex`, permanently poisoning the flow: the failing event was committed but all subsequent events were rejected at the emit gate and never committed. Wrap the recovery-step call so a failing error handler is routed through the same error-stream-or-raise handling as any other unrecovered error (extracted into `_handle_unrecovered_error` to avoid duplication). The recovery step is still invoked via `_do` and its failure handled inline rather than re-entering recovery, so a recovery cycle cannot loop infinitely. Add `test_async_offset_commit_with_failing_step`, parametrized over a plain failing step and a failing recovery step, asserting all events are dead-lettered and committed.
royischoss
reviewed
Jun 18, 2026
royischoss
left a comment
Collaborator
There was a problem hiding this comment.
Looks good two small comments
- _ConcurrentJobExecution._worker: a failing recovery step (error handler) is now routed to the error-stream/raise handling instead of propagating raw, which otherwise poisons an explicit-ack source the same way the main path did. Add test_concurrent_execution_failing_recovery_step. - _do_and_recover: narrow the recovery-step catch from BaseException to Exception so asyncio.CancelledError (and KeyboardInterrupt/SystemExit) propagate. - test_async_offset_commit_with_failing_step: drop the now-dead try/except around terminate and the stale "poisoned today" comment.
royischoss
reviewed
Jun 21, 2026
In _ConcurrentJobExecution._worker the dead-letter traceback was formatted after the inner `except Exception` had exited, so traceback.format_exc() returned the original exception's traceback while the payload string used the reassigned recovery exception. Capture the traceback inside the inner except (while recovery_ex is the active exception) and use it for the error-stream payload. Extend test_concurrent_execution_failing_recovery_step to assert the dead-letter message carries the recovery step's traceback.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
When a step has a recovery step (error handler) and that error handler itself raises,
Flow._do_and_recoverinvoked the recovery step viarecovery_step._do(event)and let the exception propagate raw. On an explicit-ack source this latchedself._ex, permanently poisoning the flow: only the failing event's offset was committed, and every subsequent event was rejected at the emit gate and never processed or committed (the stream stalls).Fix
Wrap the recovery-step call so a failing error handler is routed through the same error-stream-or-raise handling as any other unrecovered error, extracted into a shared
_handle_unrecovered_errorhelper (no duplication).The recovery step is still invoked via
_do(not_do_and_recover) and its failure is handled inline rather than re-entering recovery — so a recovery cycle (e.g. a graph-wide handler that is its own recovery step) cannot loop infinitely.Note: failed events are committed only when the context exposes
push_error(the mlrun/nuclio setup).Tests
Added
tests/test_flow.py::test_async_offset_commit_with_failing_step, parametrized over a plain failing step and a failing recovery step, asserting all events are dead-lettered and committed.Jira
ML-12776 (fix version 1.12.0)