From 8e1060dc0783775c551eb1e7cc20f5abb0a2632f Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:35:15 -0700 Subject: [PATCH 1/8] AIP-97: add FailureDetails primitive for infrastructure-side failure context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for AIP-97 (Infrastructure-Aware Task Execution). Today the on_task_instance_failed listener only sees the worker-side error exception; failure causes that originate outside the worker process — OOMKilled / PodEvicted on Kubernetes, WorkerLost / SoftTimeLimit on Celery, oom-killer SIGKILL on the local executor, preemption / eviction on resource-managed clusters — are visible only to the executor and never reach the listener. This PR adds the executor-agnostic FailureDetails type (executor_kind, infra_reason, infra_metadata) and extends the on_task_instance_failed hookspec to accept it as an optional keyword argument. Per-executor wiring is intentionally deferred to follow-up PRs so each executor's surfacing PR can iterate against a fixed contract. pluggy dispatches by parameter name, so existing hookimpls that don't declare failure_details keep working unchanged. Tests cover the FailureDetails dataclass surface (parametrized over five realistic executor / reason / metadata shapes, default factory empty dict, frozen immutability) and verify both the new listener-receives-failure-details path and back-compat for legacy listeners that don't declare the parameter. --- ...p97-failure-details-foundation.feature.rst | 41 ++++++ airflow-core/src/airflow/listeners/types.py | 33 ++++- .../unit/listeners/test_listener_types.py | 117 ++++++++++++++++++ .../listeners/spec/taskinstance.py | 19 ++- 4 files changed, 208 insertions(+), 2 deletions(-) create mode 100644 airflow-core/newsfragments/aip97-failure-details-foundation.feature.rst create mode 100644 airflow-core/tests/unit/listeners/test_listener_types.py diff --git a/airflow-core/newsfragments/aip97-failure-details-foundation.feature.rst b/airflow-core/newsfragments/aip97-failure-details-foundation.feature.rst new file mode 100644 index 0000000000000..bc219d880a2bb --- /dev/null +++ b/airflow-core/newsfragments/aip97-failure-details-foundation.feature.rst @@ -0,0 +1,41 @@ +AIP-97 (Infrastructure-Aware Task Execution) foundation — adds the +``FailureDetails`` value object and extends the ``on_task_instance_failed`` +listener hook to accept it as an optional ``failure_details`` keyword +argument. + +Today the listener only sees the worker-side ``error`` exception; failure +causes that originate outside the worker process — pod ``OOMKilled`` / +``PodEvicted`` on Kubernetes, ``WorkerLost`` / ``SoftTimeLimit`` on Celery, +oom-killer ``SIGKILL`` on the local executor, preemption / eviction on +resource-managed clusters — are visible only to the executor and never +reach the listener. + +``FailureDetails`` is the executor-agnostic shape every executor populates +when surfacing infrastructure failure causes. The ``executor_kind`` is the +canonical executor name; ``infra_reason`` is the executor's short +categorical token; ``infra_metadata`` carries any structured payload the +executor wants to ship. + +.. code-block:: python + + from airflow.listeners import hookimpl + from airflow.listeners.types import FailureDetails + + class InfraTrackingListener: + @hookimpl + def on_task_instance_failed( + self, + previous_state, + task_instance, + error, + failure_details: FailureDetails | None = None, + ): + if failure_details and failure_details.infra_reason == "OOMKilled": + ... # route to capacity-planning alert + +This change ships only the type and the hookspec extension. Per-executor +wiring (Kubernetes, Celery, local, etc.) is intentionally deferred to +follow-up PRs so each executor's surfacing PR can iterate against a fixed +contract. pluggy dispatches by parameter name, so existing ``hookimpl`` +implementations that don't declare ``failure_details`` keep working +unchanged. diff --git a/airflow-core/src/airflow/listeners/types.py b/airflow-core/src/airflow/listeners/types.py index 120b8ef503a6e..96df51495a2a1 100644 --- a/airflow-core/src/airflow/listeners/types.py +++ b/airflow-core/src/airflow/listeners/types.py @@ -18,7 +18,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import attrs @@ -40,3 +40,34 @@ class AssetEvent: source_map_index: int | None source_aliases: list[SerializedAssetAlias] partition_key: str | None + + +@attrs.define(frozen=True) +class FailureDetails: + """ + Structured infrastructure-side failure context for ``on_task_instance_failed``. + + Foundation for AIP-97 (Infrastructure-Aware Task Execution). Today the + listener only sees the worker-side ``error`` exception; failure causes + that originate outside the worker process — pod ``OOMKilled`` / + ``PodEvicted`` on Kubernetes, ``WorkerLost`` / ``SoftTimeLimit`` on + Celery, oom-killer ``SIGKILL`` on the local executor, preemption / + eviction on resource-managed clusters — are visible only to the + executor and never reach the listener. + + This type is the executor-agnostic shape every executor populates when + surfacing infrastructure failure causes. The ``executor_kind`` is the + canonical executor name (``"kubernetes"``, ``"celery"``, + ``"local_executor"``, etc.); ``infra_reason`` is the executor's + short categorical token; ``infra_metadata`` carries any structured + payload the executor wants to ship (pod status conditions, exit + codes, evicted-by reasons, queue names, etc.). + + Wiring per executor is out of scope for the foundation: this type + only defines the shape so each executor's surfacing PR can iterate + against a fixed contract. + """ + + executor_kind: str + infra_reason: str | None = None + infra_metadata: dict[str, Any] = attrs.field(factory=dict) diff --git a/airflow-core/tests/unit/listeners/test_listener_types.py b/airflow-core/tests/unit/listeners/test_listener_types.py new file mode 100644 index 0000000000000..4cb4e6a5156f2 --- /dev/null +++ b/airflow-core/tests/unit/listeners/test_listener_types.py @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.listeners import hookimpl +from airflow.listeners.types import FailureDetails +from airflow.plugins_manager import get_listener_manager + + +class TestFailureDetails: + """AIP-97 foundation primitive: structured executor-side failure context.""" + + @pytest.mark.parametrize( + ("executor_kind", "infra_reason", "infra_metadata"), + [ + pytest.param("kubernetes", "OOMKilled", {"exit_code": 137}, id="k8s-oom"), + pytest.param("kubernetes", "PodEvicted", {"node": "n-3", "evicted_by": "node-pressure"}, id="k8s-evicted"), + pytest.param("celery", "WorkerLost", {"worker_pid": 4321}, id="celery-worker-lost"), + pytest.param("local_executor", "SIGKILL", {"signal": 9}, id="local-sigkill"), + pytest.param("kubernetes", None, {}, id="unknown-reason"), + ], + ) + def test_construct(self, executor_kind, infra_reason, infra_metadata): + details = FailureDetails( + executor_kind=executor_kind, + infra_reason=infra_reason, + infra_metadata=infra_metadata, + ) + assert details.executor_kind == executor_kind + assert details.infra_reason == infra_reason + assert details.infra_metadata == infra_metadata + + def test_default_metadata_is_empty_dict(self): + details = FailureDetails(executor_kind="kubernetes", infra_reason="OOMKilled") + assert details.infra_metadata == {} + + def test_frozen(self): + """FailureDetails is immutable so listeners can safely cache it.""" + import attrs + + details = FailureDetails(executor_kind="kubernetes") + with pytest.raises(attrs.exceptions.FrozenInstanceError): + details.executor_kind = "celery" # type: ignore[misc] + + +class TestOnTaskInstanceFailedAcceptsFailureDetails: + """The on_task_instance_failed hookspec accepts the optional + ``failure_details`` argument so listener authors can opt in to + receiving infrastructure-side failure context as executors begin + populating it.""" + + def test_listener_with_failure_details_receives_it(self, listener_manager): + received: dict[str, FailureDetails | None] = {"failure_details": None} + + class InfraListener: + @hookimpl + def on_task_instance_failed( + self, + previous_state, + task_instance, + error, + failure_details: FailureDetails | None = None, + ): + received["failure_details"] = failure_details + + listener_manager(InfraListener()) + + details = FailureDetails( + executor_kind="kubernetes", + infra_reason="OOMKilled", + infra_metadata={"exit_code": 137}, + ) + get_listener_manager().hook.on_task_instance_failed( + previous_state=None, + task_instance=None, + error=RuntimeError("boom"), + failure_details=details, + ) + + assert received["failure_details"] == details + + def test_listener_without_failure_details_param_keeps_working(self, listener_manager): + """Pluggy dispatches by parameter name, so existing hookimpls that + don't declare ``failure_details`` continue to work unchanged.""" + called = {"count": 0} + + class LegacyListener: + @hookimpl + def on_task_instance_failed(self, previous_state, task_instance, error): + called["count"] += 1 + + listener_manager(LegacyListener()) + + get_listener_manager().hook.on_task_instance_failed( + previous_state=None, + task_instance=None, + error=RuntimeError("boom"), + failure_details=FailureDetails(executor_kind="kubernetes"), + ) + + assert called["count"] == 1 diff --git a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py index d3450d6b05aa7..3ab2cc7452172 100644 --- a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py +++ b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py @@ -24,6 +24,7 @@ if TYPE_CHECKING: # These imports are for type checking only - no runtime dependency + from airflow.listeners.types import FailureDetails from airflow.models.taskinstance import TaskInstance from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance from airflow.utils.state import TaskInstanceState @@ -52,8 +53,24 @@ def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance, error: None | str | BaseException, + failure_details: FailureDetails | None = None, ): - """Execute when task state changes to FAIL. previous_state can be None.""" + """ + Execute when task state changes to FAIL. previous_state can be None. + + :param previous_state: Previous state of the task instance (can be None) + :param task_instance: The task instance object + :param error: The exception that caused the failure (or human-readable + message string for API-driven manual transitions) + :param failure_details: AIP-97 (Infrastructure-Aware Task Execution) + foundation. Structured infrastructure-side failure context populated + by the executor when the failure cause originates outside the worker + process — e.g. ``OOMKilled`` / ``PodEvicted`` on Kubernetes, + ``WorkerLost`` on Celery. ``None`` when no executor-side cause is + available. Pluggy dispatches by parameter name, so existing + ``hookimpl`` implementations that don't declare ``failure_details`` + keep working unchanged. + """ @hookspec From 8c53810832c0069b2dd5c488ea21a6eb21839115 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:36:15 -0700 Subject: [PATCH 2/8] Rename newsfragment to match PR number --- ...undation.feature.rst => 66405.feature.rst} | 0 .../unit/listeners/test_listener_types.py | 5 +++- .../listeners/spec/taskinstance.py | 26 +++++++++++++------ 3 files changed, 22 insertions(+), 9 deletions(-) rename airflow-core/newsfragments/{aip97-failure-details-foundation.feature.rst => 66405.feature.rst} (100%) diff --git a/airflow-core/newsfragments/aip97-failure-details-foundation.feature.rst b/airflow-core/newsfragments/66405.feature.rst similarity index 100% rename from airflow-core/newsfragments/aip97-failure-details-foundation.feature.rst rename to airflow-core/newsfragments/66405.feature.rst diff --git a/airflow-core/tests/unit/listeners/test_listener_types.py b/airflow-core/tests/unit/listeners/test_listener_types.py index 4cb4e6a5156f2..e38bc714d39c8 100644 --- a/airflow-core/tests/unit/listeners/test_listener_types.py +++ b/airflow-core/tests/unit/listeners/test_listener_types.py @@ -68,6 +68,9 @@ class TestOnTaskInstanceFailedAcceptsFailureDetails: def test_listener_with_failure_details_receives_it(self, listener_manager): received: dict[str, FailureDetails | None] = {"failure_details": None} + # Per the hookspec docstring, listener implementations must declare + # failure_details WITHOUT a default value — pluggy treats the impl + # default as authoritative and silently overrides the caller's value. class InfraListener: @hookimpl def on_task_instance_failed( @@ -75,7 +78,7 @@ def on_task_instance_failed( previous_state, task_instance, error, - failure_details: FailureDetails | None = None, + failure_details, ): received["failure_details"] = failure_details diff --git a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py index 3ab2cc7452172..0b8a4602bf7ce 100644 --- a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py +++ b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py @@ -53,7 +53,7 @@ def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance, error: None | str | BaseException, - failure_details: FailureDetails | None = None, + failure_details: FailureDetails | None, ): """ Execute when task state changes to FAIL. previous_state can be None. @@ -62,14 +62,24 @@ def on_task_instance_failed( :param task_instance: The task instance object :param error: The exception that caused the failure (or human-readable message string for API-driven manual transitions) - :param failure_details: AIP-97 (Infrastructure-Aware Task Execution) - foundation. Structured infrastructure-side failure context populated - by the executor when the failure cause originates outside the worker - process — e.g. ``OOMKilled`` / ``PodEvicted`` on Kubernetes, + :param failure_details: Structured infrastructure-side failure context + populated by the executor when the failure cause originates outside + the worker process — e.g. ``OOMKilled`` / ``PodEvicted`` on Kubernetes, ``WorkerLost`` on Celery. ``None`` when no executor-side cause is - available. Pluggy dispatches by parameter name, so existing - ``hookimpl`` implementations that don't declare ``failure_details`` - keep working unchanged. + available. + + Pluggy dispatches by parameter name, so existing ``hookimpl`` + implementations that don't declare ``failure_details`` keep working + unchanged. Implementations that DO declare it must not assign a + default value (``failure_details=None``) — pluggy treats the + impl-side default as authoritative and silently overrides the value + the caller passed. Declare it without a default:: + + @hookimpl + def on_task_instance_failed( + self, previous_state, task_instance, error, failure_details + ): + ... """ From 9a51cbbe60a756acdeb70f693c46839a919d3ca5 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 11:58:29 -0700 Subject: [PATCH 3/8] fix: pass failure_details=None at every on_task_instance_failed call site Hookspec extension required all callers to provide the kwarg explicitly. Real e2e against airflow standalone surfaced HookCallError 'hook call must provide argument failure_details' on the existing call sites in task_runner.py, taskinstance.py, and the API server's manual-set path. Pass None until each executor's wiring PR populates it. --- .../core_api/services/public/task_instances.py | 1 + airflow-core/src/airflow/models/taskinstance.py | 5 ++++- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 10 ++++++++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 65e8260c2f47a..68bc04414281e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -84,6 +84,7 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta previous_state=None, task_instance=ti, error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", + failure_details=None, ) elif new_state == TaskInstanceState.SKIPPED: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index e5b19f2768da9..e21428f6b0f09 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1773,7 +1773,10 @@ def fetch_handle_failure_context( try: get_listener_manager().hook.on_task_instance_failed( - previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error + previous_state=TaskInstanceState.RUNNING, + task_instance=ti, + error=error, + failure_details=None, ) except Exception: log.exception("error calling listener") diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..5fda9544081f6 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1916,7 +1916,10 @@ def finalize( _run_task_state_change_callbacks(task, "on_retry_callback", context, log) try: get_listener_manager().hook.on_task_instance_failed( - previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error + previous_state=TaskInstanceState.RUNNING, + task_instance=ti, + error=error, + failure_details=None, ) except Exception: log.exception("error calling listener") @@ -1926,7 +1929,10 @@ def finalize( _run_task_state_change_callbacks(task, "on_failure_callback", context, log) try: get_listener_manager().hook.on_task_instance_failed( - previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error + previous_state=TaskInstanceState.RUNNING, + task_instance=ti, + error=error, + failure_details=None, ) except Exception: log.exception("error calling listener") From 98236bda5dca8fa5dd1342604bfdf64578f10ea3 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:25 -0700 Subject: [PATCH 4/8] fix: newsfragment must have empty second line (lint rule) --- airflow-core/newsfragments/66405.feature.rst | 22 +++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/airflow-core/newsfragments/66405.feature.rst b/airflow-core/newsfragments/66405.feature.rst index bc219d880a2bb..278f0b5bbf833 100644 --- a/airflow-core/newsfragments/66405.feature.rst +++ b/airflow-core/newsfragments/66405.feature.rst @@ -1,20 +1,16 @@ -AIP-97 (Infrastructure-Aware Task Execution) foundation — adds the -``FailureDetails`` value object and extends the ``on_task_instance_failed`` -listener hook to accept it as an optional ``failure_details`` keyword -argument. +Add the ``FailureDetails`` value object and extend the ``on_task_instance_failed`` listener hook to accept it as a ``failure_details`` keyword argument (AIP-97 foundation). Today the listener only sees the worker-side ``error`` exception; failure causes that originate outside the worker process — pod ``OOMKilled`` / ``PodEvicted`` on Kubernetes, ``WorkerLost`` / ``SoftTimeLimit`` on Celery, -oom-killer ``SIGKILL`` on the local executor, preemption / eviction on -resource-managed clusters — are visible only to the executor and never -reach the listener. +oom-killer ``SIGKILL`` on the local executor — are visible only to the +executor and never reach the listener. ``FailureDetails`` is the executor-agnostic shape every executor populates when surfacing infrastructure failure causes. The ``executor_kind`` is the -canonical executor name; ``infra_reason`` is the executor's short -categorical token; ``infra_metadata`` carries any structured payload the -executor wants to ship. +canonical executor name; ``infra_reason`` is the executor's short categorical +token; ``infra_metadata`` carries any structured payload the executor wants +to ship. .. code-block:: python @@ -24,11 +20,7 @@ executor wants to ship. class InfraTrackingListener: @hookimpl def on_task_instance_failed( - self, - previous_state, - task_instance, - error, - failure_details: FailureDetails | None = None, + self, previous_state, task_instance, error, failure_details ): if failure_details and failure_details.infra_reason == "OOMKilled": ... # route to capacity-planning alert From 25de4f0ac7f2fdf1eda6f8c167398364efc530ce Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:30:13 -0700 Subject: [PATCH 5/8] fix newsfragment type: feature -> significant (multi-line bodies require significant type) --- .../newsfragments/{66405.feature.rst => 66405.significant.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{66405.feature.rst => 66405.significant.rst} (100%) diff --git a/airflow-core/newsfragments/66405.feature.rst b/airflow-core/newsfragments/66405.significant.rst similarity index 100% rename from airflow-core/newsfragments/66405.feature.rst rename to airflow-core/newsfragments/66405.significant.rst From 2f07815636489397aa6b5cba8dd98453783bda59 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:32:55 -0700 Subject: [PATCH 6/8] fix: ruff format on listener test + spec module --- airflow-core/tests/unit/listeners/test_listener_types.py | 4 +++- .../src/airflow_shared/listeners/spec/taskinstance.py | 5 +---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/airflow-core/tests/unit/listeners/test_listener_types.py b/airflow-core/tests/unit/listeners/test_listener_types.py index e38bc714d39c8..165a4b3420371 100644 --- a/airflow-core/tests/unit/listeners/test_listener_types.py +++ b/airflow-core/tests/unit/listeners/test_listener_types.py @@ -30,7 +30,9 @@ class TestFailureDetails: ("executor_kind", "infra_reason", "infra_metadata"), [ pytest.param("kubernetes", "OOMKilled", {"exit_code": 137}, id="k8s-oom"), - pytest.param("kubernetes", "PodEvicted", {"node": "n-3", "evicted_by": "node-pressure"}, id="k8s-evicted"), + pytest.param( + "kubernetes", "PodEvicted", {"node": "n-3", "evicted_by": "node-pressure"}, id="k8s-evicted" + ), pytest.param("celery", "WorkerLost", {"worker_pid": 4321}, id="celery-worker-lost"), pytest.param("local_executor", "SIGKILL", {"signal": 9}, id="local-sigkill"), pytest.param("kubernetes", None, {}, id="unknown-reason"), diff --git a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py index 0b8a4602bf7ce..4769c69cb78b5 100644 --- a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py +++ b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py @@ -76,10 +76,7 @@ def on_task_instance_failed( the caller passed. Declare it without a default:: @hookimpl - def on_task_instance_failed( - self, previous_state, task_instance, error, failure_details - ): - ... + def on_task_instance_failed(self, previous_state, task_instance, error, failure_details): ... """ From 95cd9c8b911973fb815ca32bab803c3cd32d506b Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:41:17 -0700 Subject: [PATCH 7/8] fix import: get_listener_manager lives in airflow.listeners.listener --- airflow-core/tests/unit/listeners/test_listener_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/listeners/test_listener_types.py b/airflow-core/tests/unit/listeners/test_listener_types.py index 165a4b3420371..1399f18585adb 100644 --- a/airflow-core/tests/unit/listeners/test_listener_types.py +++ b/airflow-core/tests/unit/listeners/test_listener_types.py @@ -19,8 +19,8 @@ import pytest from airflow.listeners import hookimpl +from airflow.listeners.listener import get_listener_manager from airflow.listeners.types import FailureDetails -from airflow.plugins_manager import get_listener_manager class TestFailureDetails: From 57f338ed2c1d729f8f111e7c871aa6fff40f826f Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 21:49:37 -0700 Subject: [PATCH 8/8] fix newsfragment: blacken-docs reflow --- airflow-core/newsfragments/66405.significant.rst | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow-core/newsfragments/66405.significant.rst b/airflow-core/newsfragments/66405.significant.rst index 278f0b5bbf833..16939226f96dd 100644 --- a/airflow-core/newsfragments/66405.significant.rst +++ b/airflow-core/newsfragments/66405.significant.rst @@ -17,11 +17,10 @@ to ship. from airflow.listeners import hookimpl from airflow.listeners.types import FailureDetails + class InfraTrackingListener: @hookimpl - def on_task_instance_failed( - self, previous_state, task_instance, error, failure_details - ): + def on_task_instance_failed(self, previous_state, task_instance, error, failure_details): if failure_details and failure_details.infra_reason == "OOMKilled": ... # route to capacity-planning alert