diff --git a/airflow-core/newsfragments/66405.significant.rst b/airflow-core/newsfragments/66405.significant.rst new file mode 100644 index 0000000000000..16939226f96dd --- /dev/null +++ b/airflow-core/newsfragments/66405.significant.rst @@ -0,0 +1,35 @@ +Add the ``FailureDetails`` value object and extend the ``on_task_instance_failed`` listener hook to accept it as a ``failure_details`` keyword argument (AIP-97 foundation). + +Today the listener only sees the worker-side ``error`` exception; failure +causes that originate outside the worker process — pod ``OOMKilled`` / +``PodEvicted`` on Kubernetes, ``WorkerLost`` / ``SoftTimeLimit`` on Celery, +oom-killer ``SIGKILL`` on the local executor — are visible only to the +executor and never reach the listener. + +``FailureDetails`` is the executor-agnostic shape every executor populates +when surfacing infrastructure failure causes. The ``executor_kind`` is the +canonical executor name; ``infra_reason`` is the executor's short categorical +token; ``infra_metadata`` carries any structured payload the executor wants +to ship. + +.. code-block:: python + + from airflow.listeners import hookimpl + from airflow.listeners.types import FailureDetails + + + class InfraTrackingListener: + @hookimpl + def on_task_instance_failed(self, previous_state, task_instance, error, failure_details): + if failure_details and failure_details.infra_reason == "OOMKilled": + ... # route to capacity-planning alert + +This change ships only the type and the hookspec extension. Per-executor +wiring (Kubernetes, Celery, local, etc.) is intentionally deferred to +follow-up PRs so each executor's surfacing PR can iterate against a fixed +contract. pluggy dispatches by parameter name, so existing ``hookimpl`` +implementations that don't declare ``failure_details`` keep working +unchanged. Implementations that do declare it must declare it without a +default value (not ``failure_details=None``): pluggy treats an impl-side +default as authoritative and silently overrides the value the caller +passed. 
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py index 65e8260c2f47a..68bc04414281e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py @@ -84,6 +84,7 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta previous_state=None, task_instance=ti, error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.", + failure_details=None, ) elif new_state == TaskInstanceState.SKIPPED: get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti) diff --git a/airflow-core/src/airflow/listeners/types.py b/airflow-core/src/airflow/listeners/types.py index 120b8ef503a6e..96df51495a2a1 100644 --- a/airflow-core/src/airflow/listeners/types.py +++ b/airflow-core/src/airflow/listeners/types.py @@ -18,7 +18,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import attrs @@ -40,3 +40,34 @@ class AssetEvent: source_map_index: int | None source_aliases: list[SerializedAssetAlias] partition_key: str | None + + +@attrs.define(frozen=True) +class FailureDetails: + """ + Structured infrastructure-side failure context for ``on_task_instance_failed``. + + Foundation for AIP-97 (Infrastructure-Aware Task Execution). Today the + listener only sees the worker-side ``error`` exception; failure causes + that originate outside the worker process — pod ``OOMKilled`` / + ``PodEvicted`` on Kubernetes, ``WorkerLost`` / ``SoftTimeLimit`` on + Celery, oom-killer ``SIGKILL`` on the local executor, preemption / + eviction on resource-managed clusters — are visible only to the + executor and never reach the listener. 
+ + This type is the executor-agnostic shape every executor populates when + surfacing infrastructure failure causes. The ``executor_kind`` is the + canonical executor name (``"kubernetes"``, ``"celery"``, + ``"local_executor"``, etc.); ``infra_reason`` is the executor's + short categorical token; ``infra_metadata`` carries any structured + payload the executor wants to ship (pod status conditions, exit + codes, evicted-by reasons, queue names, etc.). + + Wiring per executor is out of scope for the foundation: this type + only defines the shape so each executor's surfacing PR can iterate + against a fixed contract. + """ + + executor_kind: str + infra_reason: str | None = None + infra_metadata: dict[str, Any] = attrs.field(factory=dict) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index e5b19f2768da9..e21428f6b0f09 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1773,7 +1773,10 @@ def fetch_handle_failure_context( try: get_listener_manager().hook.on_task_instance_failed( - previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error + previous_state=TaskInstanceState.RUNNING, + task_instance=ti, + error=error, + failure_details=None, ) except Exception: log.exception("error calling listener") diff --git a/airflow-core/tests/unit/listeners/test_listener_types.py b/airflow-core/tests/unit/listeners/test_listener_types.py new file mode 100644 index 0000000000000..1399f18585adb --- /dev/null +++ b/airflow-core/tests/unit/listeners/test_listener_types.py @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.listeners import hookimpl +from airflow.listeners.listener import get_listener_manager +from airflow.listeners.types import FailureDetails + + +class TestFailureDetails: + """AIP-97 foundation primitive: structured executor-side failure context.""" + + @pytest.mark.parametrize( + ("executor_kind", "infra_reason", "infra_metadata"), + [ + pytest.param("kubernetes", "OOMKilled", {"exit_code": 137}, id="k8s-oom"), + pytest.param( + "kubernetes", "PodEvicted", {"node": "n-3", "evicted_by": "node-pressure"}, id="k8s-evicted" + ), + pytest.param("celery", "WorkerLost", {"worker_pid": 4321}, id="celery-worker-lost"), + pytest.param("local_executor", "SIGKILL", {"signal": 9}, id="local-sigkill"), + pytest.param("kubernetes", None, {}, id="unknown-reason"), + ], + ) + def test_construct(self, executor_kind, infra_reason, infra_metadata): + details = FailureDetails( + executor_kind=executor_kind, + infra_reason=infra_reason, + infra_metadata=infra_metadata, + ) + assert details.executor_kind == executor_kind + assert details.infra_reason == infra_reason + assert details.infra_metadata == infra_metadata + + def test_default_metadata_is_empty_dict(self): + details = FailureDetails(executor_kind="kubernetes", infra_reason="OOMKilled") + assert details.infra_metadata == {} + + def test_frozen(self): + """FailureDetails is immutable so 
listeners can safely cache it.""" + import attrs + + details = FailureDetails(executor_kind="kubernetes") + with pytest.raises(attrs.exceptions.FrozenInstanceError): + details.executor_kind = "celery" # type: ignore[misc] + + +class TestOnTaskInstanceFailedAcceptsFailureDetails: + """The on_task_instance_failed hookspec accepts the optional + ``failure_details`` argument so listener authors can opt in to + receiving infrastructure-side failure context as executors begin + populating it.""" + + def test_listener_with_failure_details_receives_it(self, listener_manager): + received: dict[str, FailureDetails | None] = {"failure_details": None} + + # Per the hookspec docstring, listener implementations must declare + # failure_details WITHOUT a default value — pluggy treats the impl + # default as authoritative and silently overrides the caller's value. + class InfraListener: + @hookimpl + def on_task_instance_failed( + self, + previous_state, + task_instance, + error, + failure_details, + ): + received["failure_details"] = failure_details + + listener_manager(InfraListener()) + + details = FailureDetails( + executor_kind="kubernetes", + infra_reason="OOMKilled", + infra_metadata={"exit_code": 137}, + ) + get_listener_manager().hook.on_task_instance_failed( + previous_state=None, + task_instance=None, + error=RuntimeError("boom"), + failure_details=details, + ) + + assert received["failure_details"] == details + + def test_listener_without_failure_details_param_keeps_working(self, listener_manager): + """Pluggy dispatches by parameter name, so existing hookimpls that + don't declare ``failure_details`` continue to work unchanged.""" + called = {"count": 0} + + class LegacyListener: + @hookimpl + def on_task_instance_failed(self, previous_state, task_instance, error): + called["count"] += 1 + + listener_manager(LegacyListener()) + + get_listener_manager().hook.on_task_instance_failed( + previous_state=None, + task_instance=None, + error=RuntimeError("boom"), + 
failure_details=FailureDetails(executor_kind="kubernetes"), + ) + + assert called["count"] == 1 diff --git a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py index d3450d6b05aa7..4769c69cb78b5 100644 --- a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py +++ b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py @@ -24,6 +24,7 @@ if TYPE_CHECKING: # These imports are for type checking only - no runtime dependency + from airflow.listeners.types import FailureDetails from airflow.models.taskinstance import TaskInstance from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance from airflow.utils.state import TaskInstanceState @@ -52,8 +53,31 @@ def on_task_instance_failed( previous_state: TaskInstanceState | None, task_instance: RuntimeTaskInstance | TaskInstance, error: None | str | BaseException, + failure_details: FailureDetails | None, ): - """Execute when task state changes to FAIL. previous_state can be None.""" + """ + Execute when task state changes to FAIL. previous_state can be None. + + :param previous_state: Previous state of the task instance (can be None) + :param task_instance: The task instance object + :param error: The exception that caused the failure (or human-readable + message string for API-driven manual transitions) + :param failure_details: Structured infrastructure-side failure context + populated by the executor when the failure cause originates outside + the worker process — e.g. ``OOMKilled`` / ``PodEvicted`` on Kubernetes, + ``WorkerLost`` on Celery. ``None`` when no executor-side cause is + available. + + Pluggy dispatches by parameter name, so existing ``hookimpl`` + implementations that don't declare ``failure_details`` keep working + unchanged. 
Implementations that DO declare it must not assign a + default value (``failure_details=None``) — pluggy treats the + impl-side default as authoritative and silently overrides the value + the caller passed. Declare it without a default:: + + @hookimpl + def on_task_instance_failed(self, previous_state, task_instance, error, failure_details): ... + """ @hookspec diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..5fda9544081f6 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1916,7 +1916,10 @@ def finalize( _run_task_state_change_callbacks(task, "on_retry_callback", context, log) try: get_listener_manager().hook.on_task_instance_failed( - previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error + previous_state=TaskInstanceState.RUNNING, + task_instance=ti, + error=error, + failure_details=None, ) except Exception: log.exception("error calling listener") @@ -1926,7 +1929,10 @@ def finalize( _run_task_state_change_callbacks(task, "on_failure_callback", context, log) try: get_listener_manager().hook.on_task_instance_failed( - previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error + previous_state=TaskInstanceState.RUNNING, + task_instance=ti, + error=error, + failure_details=None, ) except Exception: log.exception("error calling listener")