Skip to content
32 changes: 32 additions & 0 deletions airflow-core/newsfragments/66405.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
Add the ``FailureDetails`` value object and extend the ``on_task_instance_failed`` listener hook to accept it as a ``failure_details`` keyword argument (AIP-97 foundation).

Today the listener only sees the worker-side ``error`` exception; failure
causes that originate outside the worker process — pod ``OOMKilled`` /
``PodEvicted`` on Kubernetes, ``WorkerLost`` / ``SoftTimeLimit`` on Celery,
oom-killer ``SIGKILL`` on the local executor — are visible only to the
executor and never reach the listener.

``FailureDetails`` is the executor-agnostic shape every executor populates
when surfacing infrastructure failure causes. The ``executor_kind`` is the
canonical executor name; ``infra_reason`` is the executor's short categorical
token; ``infra_metadata`` carries any structured payload the executor wants
to ship.

.. code-block:: python

from airflow.listeners import hookimpl
from airflow.listeners.types import FailureDetails


class InfraTrackingListener:
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, error, failure_details):
if failure_details and failure_details.infra_reason == "OOMKilled":
... # route to capacity-planning alert

This change ships only the type and the hookspec extension. Per-executor
wiring (Kubernetes, Celery, local, etc.) is intentionally deferred to
follow-up PRs so each executor's surfacing PR can iterate against a fixed
contract. pluggy dispatches by parameter name, so existing ``hookimpl``
implementations that don't declare ``failure_details`` keep working
unchanged.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def _emit_state_listener_hooks(updated_tis: list[TI], new_state: str | TaskInsta
previous_state=None,
task_instance=ti,
error=f"TaskInstance's state was manually set to `{TaskInstanceState.FAILED}`.",
failure_details=None,
)
elif new_state == TaskInstanceState.SKIPPED:
get_listener_manager().hook.on_task_instance_skipped(previous_state=None, task_instance=ti)
Expand Down
33 changes: 32 additions & 1 deletion airflow-core/src/airflow/listeners/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import attrs

Expand All @@ -40,3 +40,34 @@ class AssetEvent:
source_map_index: int | None
source_aliases: list[SerializedAssetAlias]
partition_key: str | None


@attrs.define(frozen=True)
class FailureDetails:
"""
Structured infrastructure-side failure context for ``on_task_instance_failed``.

Foundation for AIP-97 (Infrastructure-Aware Task Execution). Today the
listener only sees the worker-side ``error`` exception; failure causes
that originate outside the worker process — pod ``OOMKilled`` /
``PodEvicted`` on Kubernetes, ``WorkerLost`` / ``SoftTimeLimit`` on
Celery, oom-killer ``SIGKILL`` on the local executor, preemption /
eviction on resource-managed clusters — are visible only to the
executor and never reach the listener.

This type is the executor-agnostic shape every executor populates when
surfacing infrastructure failure causes. The ``executor_kind`` is the
canonical executor name (``"kubernetes"``, ``"celery"``,
``"local_executor"``, etc.); ``infra_reason`` is the executor's
short categorical token; ``infra_metadata`` carries any structured
payload the executor wants to ship (pod status conditions, exit
codes, evicted-by reasons, queue names, etc.).

Wiring per executor is out of scope for the foundation: this type
only defines the shape so each executor's surfacing PR can iterate
against a fixed contract.
"""

executor_kind: str
infra_reason: str | None = None
infra_metadata: dict[str, Any] = attrs.field(factory=dict)
5 changes: 4 additions & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1773,7 +1773,10 @@ def fetch_handle_failure_context(

try:
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
previous_state=TaskInstanceState.RUNNING,
task_instance=ti,
error=error,
failure_details=None,
)
except Exception:
log.exception("error calling listener")
Expand Down
122 changes: 122 additions & 0 deletions airflow-core/tests/unit/listeners/test_listener_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import pytest

from airflow.listeners import hookimpl
from airflow.listeners.listener import get_listener_manager
from airflow.listeners.types import FailureDetails


class TestFailureDetails:
"""AIP-97 foundation primitive: structured executor-side failure context."""

@pytest.mark.parametrize(
("executor_kind", "infra_reason", "infra_metadata"),
[
pytest.param("kubernetes", "OOMKilled", {"exit_code": 137}, id="k8s-oom"),
pytest.param(
"kubernetes", "PodEvicted", {"node": "n-3", "evicted_by": "node-pressure"}, id="k8s-evicted"
),
pytest.param("celery", "WorkerLost", {"worker_pid": 4321}, id="celery-worker-lost"),
pytest.param("local_executor", "SIGKILL", {"signal": 9}, id="local-sigkill"),
pytest.param("kubernetes", None, {}, id="unknown-reason"),
],
)
def test_construct(self, executor_kind, infra_reason, infra_metadata):
details = FailureDetails(
executor_kind=executor_kind,
infra_reason=infra_reason,
infra_metadata=infra_metadata,
)
assert details.executor_kind == executor_kind
assert details.infra_reason == infra_reason
assert details.infra_metadata == infra_metadata

def test_default_metadata_is_empty_dict(self):
details = FailureDetails(executor_kind="kubernetes", infra_reason="OOMKilled")
assert details.infra_metadata == {}

def test_frozen(self):
"""FailureDetails is immutable so listeners can safely cache it."""
import attrs

details = FailureDetails(executor_kind="kubernetes")
with pytest.raises(attrs.exceptions.FrozenInstanceError):
details.executor_kind = "celery" # type: ignore[misc]


class TestOnTaskInstanceFailedAcceptsFailureDetails:
"""The on_task_instance_failed hookspec accepts the optional
``failure_details`` argument so listener authors can opt in to
receiving infrastructure-side failure context as executors begin
populating it."""

def test_listener_with_failure_details_receives_it(self, listener_manager):
received: dict[str, FailureDetails | None] = {"failure_details": None}

# Per the hookspec docstring, listener implementations must declare
# failure_details WITHOUT a default value — pluggy treats the impl
# default as authoritative and silently overrides the caller's value.
class InfraListener:
@hookimpl
def on_task_instance_failed(
self,
previous_state,
task_instance,
error,
failure_details,
):
received["failure_details"] = failure_details

listener_manager(InfraListener())

details = FailureDetails(
executor_kind="kubernetes",
infra_reason="OOMKilled",
infra_metadata={"exit_code": 137},
)
get_listener_manager().hook.on_task_instance_failed(
previous_state=None,
task_instance=None,
error=RuntimeError("boom"),
failure_details=details,
)

assert received["failure_details"] == details

def test_listener_without_failure_details_param_keeps_working(self, listener_manager):
"""Pluggy dispatches by parameter name, so existing hookimpls that
don't declare ``failure_details`` continue to work unchanged."""
called = {"count": 0}

class LegacyListener:
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, error):
called["count"] += 1

listener_manager(LegacyListener())

get_listener_manager().hook.on_task_instance_failed(
previous_state=None,
task_instance=None,
error=RuntimeError("boom"),
failure_details=FailureDetails(executor_kind="kubernetes"),
)

assert called["count"] == 1
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

if TYPE_CHECKING:
# These imports are for type checking only - no runtime dependency
from airflow.listeners.types import FailureDetails
from airflow.models.taskinstance import TaskInstance
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -52,8 +53,31 @@ def on_task_instance_failed(
previous_state: TaskInstanceState | None,
task_instance: RuntimeTaskInstance | TaskInstance,
error: None | str | BaseException,
failure_details: FailureDetails | None,
):
"""Execute when task state changes to FAIL. previous_state can be None."""
"""
Execute when task state changes to FAIL. previous_state can be None.

:param previous_state: Previous state of the task instance (can be None)
:param task_instance: The task instance object
:param error: The exception that caused the failure (or human-readable
message string for API-driven manual transitions)
:param failure_details: Structured infrastructure-side failure context
populated by the executor when the failure cause originates outside
the worker process — e.g. ``OOMKilled`` / ``PodEvicted`` on Kubernetes,
``WorkerLost`` on Celery. ``None`` when no executor-side cause is
available.

Pluggy dispatches by parameter name, so existing ``hookimpl``
implementations that don't declare ``failure_details`` keep working
unchanged. Implementations that DO declare it must not assign a
default value (``failure_details=None``) — pluggy treats the
impl-side default as authoritative and silently overrides the value
the caller passed. Declare it without a default::

@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, error, failure_details): ...
"""


@hookspec
Expand Down
10 changes: 8 additions & 2 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1916,7 +1916,10 @@ def finalize(
_run_task_state_change_callbacks(task, "on_retry_callback", context, log)
try:
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
previous_state=TaskInstanceState.RUNNING,
task_instance=ti,
error=error,
failure_details=None,
)
except Exception:
log.exception("error calling listener")
Expand All @@ -1926,7 +1929,10 @@ def finalize(
_run_task_state_change_callbacks(task, "on_failure_callback", context, log)
try:
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error
previous_state=TaskInstanceState.RUNNING,
task_instance=ti,
error=error,
failure_details=None,
)
except Exception:
log.exception("error calling listener")
Expand Down
Loading