
AIP-97 (draft): add FailureDetails primitive for infrastructure-side failure context#66405

Draft
1fanwang wants to merge 8 commits into apache:main from 1fanwang:1fanwang/aip97-failure-details

Conversation


@1fanwang 1fanwang commented May 5, 2026

AIP-97 (Infrastructure-Aware Task Execution) is currently in DRAFT on the
cwiki. Opening this as the smallest concrete artifact so the listener-side
API shape can be discussed against real code — marking the PR draft for
that reason.

The gap: on_task_instance_failed only sees the worker-side error
exception. Failure causes that originate outside the worker process
(Kubernetes OOMKilled/PodEvicted/ImagePullBackOff, Celery
WorkerLost/SoftTimeLimit, LocalExecutor oom-killer SIGKILL, and so
on) are visible only to the executor and never reach the listener. The
shape is the same across executors — a kind tag, a categorical reason,
structured metadata — and FailureDetails formalises it:

from airflow.listeners import hookimpl
from airflow.listeners.types import FailureDetails

class InfraTrackingListener:
    @hookimpl
    def on_task_instance_failed(
        self,
        previous_state,
        task_instance,
        error,
        failure_details: FailureDetails | None,  # NOTE: do not assign a default — see hookspec docstring
    ):
        if failure_details and failure_details.infra_reason == "OOMKilled":
            ...  # route to capacity-planning alert

FailureDetails is a frozen attrs class with executor_kind,
infra_reason, infra_metadata. The hookspec gains the optional
failure_details keyword arg; pluggy dispatches by name, so existing
hookimpls that don't declare it keep working.
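
The PR describes FailureDetails as a frozen attrs class; a minimal sketch of its shape, using stdlib dataclasses here to stay dependency-free (field names are taken from the description above, the exact definition in the PR may differ):

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class FailureDetails:
    """Executor-agnostic infrastructure failure context (sketch only)."""

    executor_kind: str  # e.g. "kubernetes", "celery", "local"
    infra_reason: str   # categorical reason, e.g. "OOMKilled"
    infra_metadata: dict[str, Any] = field(default_factory=dict)


fd = FailureDetails(
    executor_kind="kubernetes",
    infra_reason="OOMKilled",
    infra_metadata={"exit_code": 137},
)
```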

Per-executor wiring (Kubernetes, Celery, LocalExecutor, and any other
executor that surfaces eviction/preemption) is intentionally deferred so
each can iterate against a fixed contract. Persisting FailureDetails on
the TaskInstance for UI rendering is also a separate question.
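
To make the deferred wiring concrete, here is a hypothetical sketch of how a Kubernetes-side mapper might derive the reason/metadata pair from a terminated-container status. The function name and the simplified dict shape are illustrative, not part of this PR or of the executor's real API:

```python
from typing import Any


def infra_reason_from_container_state(
    terminated: dict[str, Any],
) -> tuple[str, dict[str, Any]]:
    """Map a simplified containerStatuses[].state.terminated block to a
    categorical reason plus structured metadata (illustrative only)."""
    reason = terminated.get("reason") or ""
    exit_code = terminated.get("exitCode")
    # 137 = 128 + SIGKILL, the typical exit code for an OOM-killed container
    if reason == "OOMKilled" or exit_code == 137:
        return "OOMKilled", {"exit_code": exit_code}
    if reason:  # e.g. "Error", "Evicted"
        return reason, {"exit_code": exit_code}
    return "Unknown", {"exit_code": exit_code}
```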

A few questions worth settling on the discussion:

  • infra_reason as free-form string vs constrained enum — free-form here
    for permissiveness, open to switching.
  • Whether a sibling FailureDetails belongs on on_task_instance_success
    for success-with-warnings cases (out of scope here; the failure path is
    the demonstrably-broken one).
  • Naming: FailureDetails vs InfraFailureDetails vs
    ExecutorFailureContext.
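
On the first question, a constrained enum could still stay permissive toward unrecognised executor strings by degrading to an explicit unknown value instead of raising. A hypothetical sketch (names and members invented for illustration):

```python
from enum import Enum


class InfraReason(str, Enum):
    """Hypothetical constrained alternative to the free-form infra_reason string."""

    OOM_KILLED = "OOMKilled"
    POD_EVICTED = "PodEvicted"
    IMAGE_PULL_BACKOFF = "ImagePullBackOff"
    WORKER_LOST = "WorkerLost"
    SOFT_TIME_LIMIT = "SoftTimeLimit"
    SIGKILL = "SIGKILL"
    UNKNOWN = "Unknown"

    @classmethod
    def parse(cls, raw: str) -> "InfraReason":
        # Unrecognised executor strings degrade to UNKNOWN instead of raising,
        # which keeps the permissiveness of the free-form option.
        try:
            return cls(raw)
        except ValueError:
            return cls.UNKNOWN
```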

Testing

  • Parametrized TestFailureDetails.test_construct covers five
    realistic executor / reason / metadata shapes (k8s OOM, k8s
    evicted, celery worker-lost, local SIGKILL, unknown reason).
  • test_default_metadata_is_empty_dict and test_frozen lock
    the frozen-class invariants (empty-metadata default, immutability).
  • TestOnTaskInstanceFailedAcceptsFailureDetails exercises the
    pluggy-dispatch contract: the new listener receives the payload,
    the legacy listener (no failure_details parameter) still gets
    called.


E2E validation

Bug found and fixed during validation

Initial implementation had failure_details: FailureDetails | None = None in the hookspec. End-to-end pluggy dispatch revealed a quirk: when a listener implementation declares the same parameter with a default (failure_details=None), pluggy treats the impl-side default as authoritative and silently overrides the value the caller passed. Removed the default from the hookspec and added an explicit warning to the docstring telling impl authors to declare failure_details without a default.
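
The quirk is reproducible with pluggy alone: an impl parameter that carries a default is classified as optional by pluggy's signature inspection and is simply never passed, so the impl-side default wins regardless of what the caller supplied. A minimal standalone repro (hook and listener names invented for the demo):

```python
import pluggy

hookspec = pluggy.HookspecMarker("demo")
hookimpl = pluggy.HookimplMarker("demo")


class Spec:
    @hookspec
    def on_task_failed(self, error, failure_details):
        """failure_details is required: no default, per the fixed hookspec."""


class NewListener:
    @hookimpl
    def on_task_failed(self, error, failure_details):
        return ("new", failure_details)


class BuggyListener:
    @hookimpl
    def on_task_failed(self, error, failure_details=None):
        # The default makes pluggy treat the parameter as "not requested":
        # it is never passed, so this impl always sees None.
        return ("buggy", failure_details)


class LegacyListener:
    @hookimpl
    def on_task_failed(self, error):
        # Name-based dispatch: omitting the parameter entirely is fine.
        return ("legacy", "still called")


pm = pluggy.PluginManager("demo")
pm.add_hookspecs(Spec)
for plugin in (NewListener(), BuggyListener(), LegacyListener()):
    pm.register(plugin)

results = dict(pm.hook.on_task_failed(
    error=ValueError("boom"), failure_details={"infra_reason": "OOMKilled"}
))
# results["new"] carries the caller's value; results["buggy"] is silently None
```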

After-fix runtime check

class New:
    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, error, failure_details):
        ...

class Legacy:
    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, error):
        ...

caller: hook.on_task_instance_failed(..., failure_details=FailureDetails(executor_kind="kubernetes", infra_reason="OOMKilled", ...))

results:
  ('legacy',)                                                  # legacy listener still fires
  ('new', FailureDetails(executor_kind='kubernetes',
                         infra_reason='OOMKilled',
                         infra_metadata={'exit_code': 137}))    # new listener gets the value

The fix means the impl-side signature must be exactly failure_details (no default). Documented in the hookspec.

Real e2e validation surfaced another bug — fixed in this PR

Re-ran with airflow standalone against the worktree's editable install and dropped a listener that declares failure_details (no default, per the hookspec docstring). Triggered a failing DAG.

First run revealed a follow-on bug from removing the default: every existing on_task_instance_failed call site in the codebase (task_runner.py for the worker, taskinstance.py for the API server retry path, _emit_state_listener_hooks for manual API state changes) didn't pass failure_details, and pluggy raised HookCallError: hook call must provide argument 'failure_details' once the hookspec required it. Every task failure on the upstream branch would have hit this.

Fix: pass failure_details=None at every call site until each executor's wiring PR populates it. Commit 9a51cbbe60 updates the three call sites.
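
This failure mode is also easy to reproduce standalone: once a hookspec parameter has no default, pluggy requires every call site to supply it whenever a registered impl declares it. A minimal sketch of the broken call and the call-site fix (demo names invented):

```python
import pluggy

hookspec = pluggy.HookspecMarker("demo")
hookimpl = pluggy.HookimplMarker("demo")


class Spec:
    @hookspec
    def on_task_failed(self, error, failure_details):
        """No default on failure_details, so call sites must always pass it."""


class Listener:
    @hookimpl
    def on_task_failed(self, error, failure_details):
        return ("received", failure_details)


pm = pluggy.PluginManager("demo")
pm.add_hookspecs(Spec)
pm.register(Listener())

# An un-updated call site that omits the kwarg fails at dispatch time:
try:
    pm.hook.on_task_failed(error=ValueError("boom"))
except pluggy.HookCallError as exc:
    print(exc)  # hook call must provide argument 'failure_details'

# The call-site fix: pass None explicitly until an executor populates it.
results = pm.hook.on_task_failed(error=ValueError("boom"), failure_details=None)
```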

After the fix, the listener receives the call cleanly:

failed   task=boom_task   error_type=ValueError   failure_details=None

failure_details=None is the expected value today — no executor populates this field yet, that's deferred to per-executor follow-up PRs (Kubernetes, Celery, LocalExecutor). The listener interface is wired and back-compat: a listener that doesn't declare failure_details keeps working unchanged (pluggy's name-based dispatch); a listener that declares it (without a default) gets None until an executor populates it.

Repro

pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &
# trigger a DAG that fails
airflow dags trigger e2e_failed
# observe listener log
tail /tmp/listener.log

DAG used:

from airflow.sdk import dag, task  # Airflow 3 task SDK decorators

@dag(...)
def e2e_failed_dag():
    @task(retries=0)
    def boom_task():
        raise ValueError("...")
    boom_task()

e2e_failed_dag()

from airflow.listeners import hookimpl

class FailureDetailsListener:
    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, error, failure_details):
        # failure_details is None until any executor populates it.
        ...

Integrated mega-branch validation (all 7 PRs composed)

Beyond the independent validation above, all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services: airflow standalone running scheduler + API server + LocalExecutor, with sqlite standing in for Postgres. A single listener plugin declaring every new hook and parameter was registered, 5 DAGs covering every state-transition path were triggered, and a manual set-state PATCH was issued via the public API. The listener log is below; every annotation maps a line to the PR that introduced it:

running   prev=QUEUED    msg=started               task=ok_task                                        ← PR-A msg arg
success   prev=RUNNING   msg=success               task=ok_task                                        ← PR-A
running   prev=QUEUED    msg=started               task=boom_task
failed    prev=RUNNING   msg=failed                task=boom_task   error_type=ValueError   fd=None    ← PR-A + PR-D + PR-F kwarg
running   prev=QUEUED    msg=started               task=skip_task
skipped   prev=RUNNING   msg=skipped               task=skip_task                                      ← PR-A skipped path
running   prev=QUEUED    msg=started               task=retry_task
failed    prev=RUNNING   msg=up_for_retry          task=retry_task  error_type=ValueError              ← PR-A retry-vs-terminal
running   prev=QUEUED    msg=started               task=retry_task    (try 2 of 2)
failed    prev=RUNNING   msg=failed                task=retry_task  error_type=ValueError
running   prev=QUEUED    msg=started               task=checkpoint_task
checkpointed prev=RUNNING                          task=checkpoint_task  checkpoint_data={'step': 5,
                                                                          'iterator_offset': 1024}     ← PR-E + PR-G

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None      msg=manually_set_to_failed task=ok_task    error_type=RuntimeError   fd=None  ← PR-D RuntimeError wrap
                                                                                                          (would be `str` on the PR-A-only branch)

What this validates jointly:

| PR | Surface | Evidence in log |
| --- | --- | --- |
| #66394 (msg arg) | every TI hook has `msg=...` | 6 canonical values fire (started, success, failed, skipped, up_for_retry, manually_set_to_failed) |
| #66395 (hook-name log, TI) | logs identify the failing hook | tested separately with throwing listener — see PR body |
| #66397 (hook-name log, rest) | lifecycle / DagRun / asset surfaces | tested separately with throwing listener — see PR body |
| #66399 (tighten error type) | `error: BaseException \| None` | manual-set path delivers RuntimeError (was str on PR-A alone) |
| #66402 (CHECKPOINTED state) | worker catches AirflowTaskCheckpointed | running → checkpointed transition observed at the listener and at the supervisor message boundary |
| #66405 (FailureDetails) | listener can declare `failure_details` kwarg | `failure_details=None` flowing through every failure (no executor populates yet) |
| #66410 (on_task_instance_checkpointed) | new hook fires with payload | `checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}` |

Repro

# Combine all 7 branches onto a mega branch (resolve trivial overlap on the
# spec file's failure hook signature — error + msg + failure_details kwargs
# in one signature) and install editable:
pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &

# Drop the recording listener (declares all 5 hooks including the new
# checkpointed one) into $AIRFLOW_HOME/plugins/, drop 5 DAGs into dags/
# (success / failed / skipped / retry-then-fail / checkpointed), trigger them.
for dag in e2e_success e2e_failed e2e_skipped e2e_retry_then_fail e2e_checkpointed; do
  airflow dags trigger $dag
done

# Then PATCH a state via the public API to exercise the manual path.

Bugs surfaced and fixed during this validation

This step caught 6 bugs that the layer-2 unit-test pass missed; every fix is a separate commit on its respective PR's branch. The last two would have broken every task failure on apache/airflow main had the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class of bug.

Documented gap (deliberately not fixed in this stack)

task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY lists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to client.task_instances.finish() which the API server constrains to terminal states. The mega listener log shows the worker successfully logging Task checkpointed; reporting CHECKPOINTED state. and on_task_instance_checkpointed firing with the correct payload — but the DB row eventually transitions to failed because the supervisor cannot persist CHECKPOINTED through finish(). This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.

1fanwang added 2 commits May 5, 2026 02:35
…context

Foundation for AIP-97 (Infrastructure-Aware Task Execution). Today the
on_task_instance_failed listener only sees the worker-side error
exception; failure causes that originate outside the worker process —
OOMKilled / PodEvicted on Kubernetes, WorkerLost / SoftTimeLimit on
Celery, oom-killer SIGKILL on the local executor, preemption /
eviction on resource-managed clusters — are visible only to the
executor and never reach the listener.

This PR adds the executor-agnostic FailureDetails type
(executor_kind, infra_reason, infra_metadata) and extends the
on_task_instance_failed hookspec to accept it as an optional
keyword argument. Per-executor wiring is intentionally deferred to
follow-up PRs so each executor's surfacing PR can iterate against a
fixed contract.

pluggy dispatches by parameter name, so existing hookimpls that
don't declare failure_details keep working unchanged.

Tests cover the FailureDetails dataclass surface (parametrized over
five realistic executor / reason / metadata shapes, default factory
empty dict, frozen immutability) and verify both the new
listener-receives-failure-details path and back-compat for legacy
listeners that don't declare the parameter.
…site

Hookspec extension required all callers to provide the kwarg explicitly. Real
e2e against airflow standalone surfaced HookCallError 'hook call must provide
argument failure_details' on the existing call sites in task_runner.py,
taskinstance.py, and the API server's manual-set path. Pass None until each
executor's wiring PR populates it.
