Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airflow-core/newsfragments/66399.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Tighten the ``error`` argument on the ``on_task_instance_failed`` listener hook to ``BaseException | None`` (previously ``None | str | BaseException``).

The manual-set FAILED state path on the API server now wraps its
human-readable reason in a ``RuntimeError`` instead of passing a plain string,
so listeners always receive an exception type and ``str(error)`` continues to
yield the human-readable message.

Listener implementations that rely on ``isinstance(error, str)`` to detect
the manual-set path should switch to the ``msg`` keyword argument introduced
in #66394 (``msg == "manually_set_to_failed"``); the message text remains
accessible via ``str(error)``.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta
get_listener_manager().hook.on_task_instance_failed(
previous_state=None,
task_instance=ti,
error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.",
error=RuntimeError(
f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`."
),
)
elif new_state == TaskInstanceState.SKIPPED:
get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def on_task_instance_success(
def on_task_instance_failed(
previous_state: TaskInstanceState | None,
task_instance: RuntimeTaskInstance | TaskInstance,
error: None | str | BaseException,
error: BaseException | None,
):
"""
Called when task state changes to FAILED.
Expand All @@ -108,7 +108,8 @@ def on_task_instance_failed(
task_instance that has failed, its dag_run, task and dag information.

A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
through the API. In that case, the TaskInstance available on the API server will be provided instead.
through the API. In that case, the TaskInstance available on the API server will be provided instead,
and ``error`` is a ``RuntimeError`` carrying the human-readable reason.
"""
print("Task instance in failure state")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4411,6 +4411,11 @@ def test_patch_task_instance_notifies_listeners(
assert response2.status_code == 200
assert response2.json()["state"] == state
assert listener.state == listener_state
if state == "failed":
# The manual-set FAILED path wraps its human-readable reason in a
# RuntimeError so listener authors always receive an exception type.
assert isinstance(listener.last_error, RuntimeError)
assert "manually set to" in str(listener.last_error)

@mock.patch("airflow.serialization.definitions.dag.SerializedDAG.set_task_instance_state")
def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session):
Expand Down
4 changes: 3 additions & 1 deletion airflow-core/tests/unit/listeners/class_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self):
self.started_component = None
self.stopped_component = None
self.state = []
self.last_error: BaseException | None = None

@hookimpl
def on_starting(self, component):
Expand All @@ -46,8 +47,9 @@ def on_task_instance_success(self, previous_state, task_instance):
self.state.append(TaskInstanceState.SUCCESS)

@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException):
def on_task_instance_failed(self, previous_state, task_instance, error: BaseException | None):
self.state.append(TaskInstanceState.FAILED)
self.last_error = error

@hookimpl
def on_task_instance_skipped(self, previous_state, task_instance):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,19 @@ def on_task_instance_success(
def on_task_instance_failed(
previous_state: TaskInstanceState | None,
task_instance: RuntimeTaskInstance | TaskInstance,
error: None | str | BaseException,
error: BaseException | None,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
"""
Execute when task state changes to FAIL. previous_state can be None.

:param previous_state: Previous state of the task instance (can be None)
:param task_instance: The task instance object (RuntimeTaskInstance when called
from task execution context, TaskInstance when called from API server)
:param error: The exception that caused the failure. ``None`` only when the
cause is unavailable. Manual API-driven state transitions wrap their
human-readable reason in ``RuntimeError`` so listeners always receive an
exception type and ``str(error)`` yields the message.
"""


@hookspec
Expand Down
Loading