diff --git a/airflow-core/newsfragments/66399.significant.rst b/airflow-core/newsfragments/66399.significant.rst new file mode 100644 index 0000000000000..03286d79bd326 --- /dev/null +++ b/airflow-core/newsfragments/66399.significant.rst @@ -0,0 +1,11 @@ +Tighten the ``error`` argument on the ``on_task_instance_failed`` listener hook to ``BaseException | None`` (previously ``None | str | BaseException``). + +The manual-set FAILED state path on the API server now wraps its +human-readable reason in a ``RuntimeError`` instead of passing a plain string, +so listeners always receive an exception type and ``str(error)`` continues to +yield the human-readable message. + +Listener implementations that rely on ``isinstance(error, str)`` to detect +the manual-set path should switch to the ``msg`` keyword argument introduced +in #66394 (``msg == "manually_set_to_failed"``); the message text remains +accessible via ``str(error)``. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 65e8260c2f47a..4de3b43e13ae7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -83,7 +83,9 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta get_listener_manager().hook.on_task_instance_failed( previous_state=None, task_instance=ti, - error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", + error=RuntimeError( + f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`." 
+ ), ) elif new_state == TaskInstanceState.SKIPPED: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) diff --git a/airflow-core/src/airflow/example_dags/plugins/event_listener.py b/airflow-core/src/airflow/example_dags/plugins/event_listener.py index 91af9f5ccc6df..6fc03744a8249 100644 --- a/airflow-core/src/airflow/example_dags/plugins/event_listener.py +++ b/airflow-core/src/airflow/example_dags/plugins/event_listener.py @@ -99,7 +99,7 @@ def on_task_instance_success( def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance, - error: None | str | BaseException, + error: BaseException | None, ): """ Called when task state changes to FAILED. @@ -108,7 +108,8 @@ def on_task_instance_failed( task_instance that has failed, its dag_run, task and dag information. A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered - through the API. In that case, the TaskInstance available on the API server will be provided instead. + through the API. In that case, the TaskInstance available on the API server will be provided instead, + and ``error`` is a ``RuntimeError`` carrying the human-readable reason. 
""" print("Task instance in failure state") diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 505a390f7bf33..f3e0d96496371 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -4411,6 +4411,11 @@ def test_patch_task_instance_notifies_listeners( assert response2.status_code == 200 assert response2.json()["state"] == state assert listener.state == listener_state + if state == "failed": + # The manual-set FAILED path wraps its human-readable reason in a + # RuntimeError so listener authors always receive an exception type. + assert isinstance(listener.last_error, RuntimeError) + assert "manually set to" in str(listener.last_error) @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.set_task_instance_state") def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): diff --git a/airflow-core/tests/unit/listeners/class_listener.py b/airflow-core/tests/unit/listeners/class_listener.py index 4b9ef3a9f26f1..587a15ee3419f 100644 --- a/airflow-core/tests/unit/listeners/class_listener.py +++ b/airflow-core/tests/unit/listeners/class_listener.py @@ -26,6 +26,7 @@ def __init__(self): self.started_component = None self.stopped_component = None self.state = [] + self.last_error: BaseException | None = None @hookimpl def on_starting(self, component): @@ -46,8 +47,9 @@ def on_task_instance_success(self, previous_state, task_instance): self.state.append(TaskInstanceState.SUCCESS) @hookimpl - def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException): + def on_task_instance_failed(self, previous_state, task_instance, error: BaseException | None): self.state.append(TaskInstanceState.FAILED) + self.last_error = error @hookimpl def 
on_task_instance_skipped(self, previous_state, task_instance): diff --git a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py index d3450d6b05aa7..9407c339cef9c 100644 --- a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py +++ b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py @@ -51,9 +51,19 @@ def on_task_instance_success( def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance, - error: None | str | BaseException, + error: BaseException | None, ): - """Execute when task state changes to FAIL. previous_state can be None.""" + """ + Execute when task state changes to FAILED. previous_state can be None. + + :param previous_state: Previous state of the task instance (can be None) + :param task_instance: The task instance object (RuntimeTaskInstance when called + from task execution context, TaskInstance when called from API server) + :param error: The exception that caused the failure. ``None`` only when the + cause is unavailable. Manual API-driven state transitions wrap their + human-readable reason in ``RuntimeError`` so listeners always receive an + exception type and ``str(error)`` yields the message. + """ @hookspec