From f9575479969394cae56e98ab9e768eb291ec554d Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:21:02 -0700 Subject: [PATCH 1/3] Tighten on_task_instance_failed error type to BaseException | None The ``error`` arg on ``on_task_instance_failed`` was typed ``None | str | BaseException``. The string variant only ever appeared on the manual-set FAILED state path on the API server, where the call site passed a hard-coded human-readable message. Listener implementations had to check ``isinstance(error, str)`` to detect that path even though ``str(error)`` worked uniformly across both branches. This change wraps the manual-set message in a ``RuntimeError`` at the call site, tightens the hookspec type to ``BaseException | None``, and updates the example listener and test fixture accordingly. Listeners now always receive an exception type with ``str(error)`` carrying the message; the ``msg`` arg added in #66394 (``msg == "manually_set_to_failed"``) remains the canonical signal for "this came from the API path". Backwards compatibility: listeners relying on ``isinstance(error, str)`` will need to read the message via ``str(error)`` and route on ``msg``. 
--- .../listener-failed-error-type.significant.rst | 11 +++++++++++ .../core_api/services/public/task_instances.py | 4 +++- .../airflow/example_dags/plugins/event_listener.py | 5 +++-- .../core_api/routes/public/test_task_instances.py | 5 +++++ .../tests/unit/listeners/class_listener.py | 4 +++- .../airflow_shared/listeners/spec/taskinstance.py | 14 ++++++++++++-- 6 files changed, 37 insertions(+), 6 deletions(-) create mode 100644 airflow-core/newsfragments/listener-failed-error-type.significant.rst diff --git a/airflow-core/newsfragments/listener-failed-error-type.significant.rst b/airflow-core/newsfragments/listener-failed-error-type.significant.rst new file mode 100644 index 0000000000000..fe27670cff610 --- /dev/null +++ b/airflow-core/newsfragments/listener-failed-error-type.significant.rst @@ -0,0 +1,11 @@ +The ``error`` argument on the ``on_task_instance_failed`` listener hook is now +typed as ``BaseException | None`` (previously ``None | str | BaseException``). +The manual-set FAILED state path on the API server now wraps its human-readable +reason in a ``RuntimeError`` instead of passing a plain string, so listeners +always receive an exception type and ``str(error)`` continues to yield the +human-readable message. + +Listener implementations that rely on ``isinstance(error, str)`` to detect +the manual-set path should switch to the ``msg`` keyword argument introduced +in #66394 (``msg == "manually_set_to_failed"``); the message text remains +accessible via ``str(error)``. 
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 65e8260c2f47a..4de3b43e13ae7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -83,7 +83,9 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta get_listener_manager().hook.on_task_instance_failed( previous_state=None, task_instance=ti, - error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", + error=RuntimeError( + f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`." + ), ) elif new_state == TaskInstanceState.SKIPPED: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) diff --git a/airflow-core/src/airflow/example_dags/plugins/event_listener.py b/airflow-core/src/airflow/example_dags/plugins/event_listener.py index 91af9f5ccc6df..6fc03744a8249 100644 --- a/airflow-core/src/airflow/example_dags/plugins/event_listener.py +++ b/airflow-core/src/airflow/example_dags/plugins/event_listener.py @@ -99,7 +99,7 @@ def on_task_instance_success( def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance, - error: None | str | BaseException, + error: BaseException | None, ): """ Called when task state changes to FAILED. @@ -108,7 +108,8 @@ def on_task_instance_failed( task_instance that has failed, its dag_run, task and dag information. A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered - through the API. In that case, the TaskInstance available on the API server will be provided instead. + through the API. 
In that case, the TaskInstance available on the API server will be provided instead, + and ``error`` is a ``RuntimeError`` carrying the human-readable reason. """ print("Task instance in failure state") diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 505a390f7bf33..f3e0d96496371 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -4411,6 +4411,11 @@ def test_patch_task_instance_notifies_listeners( assert response2.status_code == 200 assert response2.json()["state"] == state assert listener.state == listener_state + if state == "failed": + # The manual-set FAILED path wraps its human-readable reason in a + # RuntimeError so listener authors always receive an exception type. + assert isinstance(listener.last_error, RuntimeError) + assert "manually set to" in str(listener.last_error) @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.set_task_instance_state") def test_should_call_mocked_api(self, mock_set_ti_state, test_client, session): diff --git a/airflow-core/tests/unit/listeners/class_listener.py b/airflow-core/tests/unit/listeners/class_listener.py index 4b9ef3a9f26f1..587a15ee3419f 100644 --- a/airflow-core/tests/unit/listeners/class_listener.py +++ b/airflow-core/tests/unit/listeners/class_listener.py @@ -26,6 +26,7 @@ def __init__(self): self.started_component = None self.stopped_component = None self.state = [] + self.last_error: BaseException | None = None @hookimpl def on_starting(self, component): @@ -46,8 +47,9 @@ def on_task_instance_success(self, previous_state, task_instance): self.state.append(TaskInstanceState.SUCCESS) @hookimpl - def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException): + def on_task_instance_failed(self, 
previous_state, task_instance, error: BaseException | None): self.state.append(TaskInstanceState.FAILED) + self.last_error = error @hookimpl def on_task_instance_skipped(self, previous_state, task_instance): diff --git a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py index d3450d6b05aa7..9407c339cef9c 100644 --- a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py +++ b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py @@ -51,9 +51,19 @@ def on_task_instance_success( def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance, - error: None | str | BaseException, + error: BaseException | None, ): - """Execute when task state changes to FAIL. previous_state can be None.""" + """ + Execute when task state changes to FAIL. previous_state can be None. + + :param previous_state: Previous state of the task instance (can be None) + :param task_instance: The task instance object (RuntimeTaskInstance when called + from task execution context, TaskInstance when called from API server) + :param error: The exception that caused the failure. ``None`` only when the + cause is unavailable. Manual API-driven state transitions wrap their + human-readable reason in ``RuntimeError`` so listeners always receive an + exception type and ``str(error)`` yields the message. 
+ """ @hookspec From 55db8e34af69080b3d65c4b66bb055ea4ab7e42e Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:22:06 -0700 Subject: [PATCH 2/3] Rename newsfragment to match PR number --- ...er-failed-error-type.significant.rst => 66399.significant.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{listener-failed-error-type.significant.rst => 66399.significant.rst} (100%) diff --git a/airflow-core/newsfragments/listener-failed-error-type.significant.rst b/airflow-core/newsfragments/66399.significant.rst similarity index 100% rename from airflow-core/newsfragments/listener-failed-error-type.significant.rst rename to airflow-core/newsfragments/66399.significant.rst From 2083cf49bd1e24d3eaa2fe6b21a8f56e4f4e7ac6 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:20 -0700 Subject: [PATCH 3/3] fix: newsfragment must have empty second line (lint rule) --- airflow-core/newsfragments/66399.significant.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow-core/newsfragments/66399.significant.rst b/airflow-core/newsfragments/66399.significant.rst index fe27670cff610..03286d79bd326 100644 --- a/airflow-core/newsfragments/66399.significant.rst +++ b/airflow-core/newsfragments/66399.significant.rst @@ -1,9 +1,9 @@ -The ``error`` argument on the ``on_task_instance_failed`` listener hook is now -typed as ``BaseException | None`` (previously ``None | str | BaseException``). -The manual-set FAILED state path on the API server now wraps its human-readable -reason in a ``RuntimeError`` instead of passing a plain string, so listeners -always receive an exception type and ``str(error)`` continues to yield the -human-readable message. +Tighten the ``error`` argument on the ``on_task_instance_failed`` listener hook to ``BaseException | None`` (previously ``None | str | BaseException``). 
+ +The manual-set FAILED state path on the API server now wraps its +human-readable reason in a ``RuntimeError`` instead of passing a plain string, +so listeners always receive an exception type and ``str(error)`` continues to +yield the human-readable message. Listener implementations that rely on ``isinstance(error, str)`` to detect the manual-set path should switch to the ``msg`` keyword argument introduced