Skip to content

Include hook name in suppressed listener-exception log#66395

Draft
1fanwang wants to merge 7 commits intoapache:mainfrom
1fanwang:1fanwang/listener-log-hook-context
Draft

Include hook name in suppressed listener-exception log#66395
1fanwang wants to merge 7 commits intoapache:mainfrom
1fanwang:1fanwang/listener-log-hook-context

Conversation

@1fanwang
Copy link
Copy Markdown
Contributor

@1fanwang 1fanwang commented May 5, 2026

Description

When a listener hookimpl raises, the suppressed-exception log line now
identifies which hook raised. The previous generic
log.exception(\"error calling listener\") made it impossible to tell which
of the registered hooks failed without re-reading the stack trace —
especially painful when several listeners are registered and one of them
sporadically misbehaves.

New format:

```text
error calling listener for hook 'on_task_instance_failed'
```

The existing substring error calling listener remains in the message,
so any downstream log-grep tooling continues to match.

Scope

This PR covers the task instance listener call sites only, mirroring
the surface of #66394:

  • task-sdk/src/airflow/sdk/execution_time/task_runner.py — 5 sites
    (running, success, skipped, up_for_retry, failed)
  • airflow-core/src/airflow/models/taskinstance.py — 1 site (API
    server retry path)
  • airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py
    — split the single try/except wrapping three branches into per-branch
    try/except so each log line names exactly one hook

DagRun, asset, dag-processing, and lifecycle listener call sites follow the
same pattern and can be migrated as a follow-up; keeping this PR narrow
makes the diff trivially reviewable.

Behavior

Listener-exception suppression is preserved — task execution is not
affected by listener failures.

Testing

  • Existing test test_listener_suppresses_exceptions (airflow-core) is
    extended to also assert the hook name appears in the captured log
    output. It uses the existing throwing_listener fixture which raises
    in on_task_instance_success.
  • New unit test test_listener_error_log_includes_hook_name (task-sdk)
    registers a listener that raises in on_task_instance_success,
    drives the runner, and asserts log.exception was called with
    (\"error calling listener for hook %r\", \"on_task_instance_success\").

^ Add meaningful description above
Read the Pull Request Guidelines for more information.

E2E validation

=== TI listener call sites with hook name in log ===
  task-sdk/src/airflow/sdk/execution_time/task_runner.py:1181  hook='on_task_instance_running'
  task-sdk/src/airflow/sdk/execution_time/task_runner.py:1906  hook='on_task_instance_success'
  task-sdk/src/airflow/sdk/execution_time/task_runner.py:1914  hook='on_task_instance_skipped'
  task-sdk/src/airflow/sdk/execution_time/task_runner.py:1922  hook='on_task_instance_failed'
  task-sdk/src/airflow/sdk/execution_time/task_runner.py:1932  hook='on_task_instance_failed'
  airflow-core/src/airflow/models/taskinstance.py:1779         hook='on_task_instance_failed'
  airflow-core/src/airflow/api_fastapi/.../task_instances.py:83 hook='on_task_instance_success'
  airflow-core/src/airflow/api_fastapi/.../task_instances.py:92 hook='on_task_instance_failed'
  airflow-core/src/airflow/api_fastapi/.../task_instances.py:97 hook='on_task_instance_skipped'

Runtime log call: call('error calling listener for hook %r', 'on_task_instance_failed')

9/9 TI listener call sites updated. Lifecycle hooks (on_starting, before_stopping) are scoped to the follow-up #66397.

Real e2e validation (Airflow standalone)

Re-ran with airflow standalone (real scheduler + API server + LocalExecutor + sqlite). Registered a listener that raises on every TI hook; observed the new log format on every suppressed exception:

error calling listener for hook 'on_task_instance_running'    (raised in _prepare, task_runner.py:1181)
error calling listener for hook 'on_task_instance_success'    (raised in finalize, task_runner.py:1906)
error calling listener for hook 'on_task_instance_failed'     (raised in finalize, task_runner.py:1932)
error calling listener for hook 'on_task_instance_skipped'    (raised in finalize, task_runner.py:1914)

All 4 TI hooks now identify themselves by name in the log when an impl raises. Listener-exception suppression behavior is preserved — the DAGs that triggered these errors all completed normally (success/failed/skipped) regardless of the listener throwing.

Integrated mega-branch validation (all 7 PRs composed)

This PR was independently validated, plus all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services — airflow standalone running scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered + a manual-set-state PATCH via the public API was issued. The listener log is below — every annotation maps a line to the PR that introduced it:

running   prev=QUEUED    msg=started               task=ok_task                                        ← PR-A msg arg
success   prev=RUNNING   msg=success               task=ok_task                                        ← PR-A
running   prev=QUEUED    msg=started               task=boom_task
failed    prev=RUNNING   msg=failed                task=boom_task   error_type=ValueError   fd=None    ← PR-A + PR-D + PR-F kwarg
running   prev=QUEUED    msg=started               task=skip_task
skipped   prev=RUNNING   msg=skipped               task=skip_task                                      ← PR-A skipped path
running   prev=QUEUED    msg=started               task=retry_task
failed    prev=RUNNING   msg=up_for_retry          task=retry_task  error_type=ValueError              ← PR-A retry-vs-terminal
running   prev=QUEUED    msg=started               task=retry_task    (try 2 of 2)
failed    prev=RUNNING   msg=failed                task=retry_task  error_type=ValueError
running   prev=QUEUED    msg=started               task=checkpoint_task
checkpointed prev=RUNNING                          task=checkpoint_task  checkpoint_data={'step': 5,
                                                                          'iterator_offset': 1024}     ← PR-E + PR-G

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None      msg=manually_set_to_failed task=ok_task    error_type=RuntimeError   fd=None  ← PR-D RuntimeError wrap
                                                                                                          (would be `str` on the PR-A-only branch)

What this validates jointly:

PR Surface Evidence in log
#66394 (msg arg) every TI hook has msg=... 6 canonical values fire (started, success, failed, skipped, up_for_retry, manually_set_to_failed)
#66395 (hook-name log, TI) logs identify the failing hook tested separately with throwing listener — see PR body
#66397 (hook-name log, rest) lifecycle / DagRun / asset surfaces tested separately with throwing listener — see PR body
#66399 (tighten error type) error: BaseException | None manual-set path delivers RuntimeError (was str on PR-A alone)
#66402 (CHECKPOINTED state) worker catches AirflowTaskCheckpointed running → checkpointed transition observed at the listener and at the supervisor message boundary
#66405 (FailureDetails) listener can declare failure_details kwarg failure_details=None flowing through every failure (no executor populates yet)
#66410 (on_task_instance_checkpointed) new hook fires with payload checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}

Repro

# Combine all 7 branches onto a mega branch (resolve trivial overlap on the
# spec file's failure hook signature — error + msg + failure_details kwargs
# in one signature) and install editable:
pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &

# Drop the recording listener (declares all 5 hooks including the new
# checkpointed one) into $AIRFLOW_HOME/plugins/, drop 5 DAGs into dags/
# (success / failed / skipped / retry-then-fail / checkpointed), trigger them.
for dag in e2e_success e2e_failed e2e_skipped e2e_retry_then_fail e2e_checkpointed; do
  airflow dags trigger $dag
done

# Then PATCH a state via the public API to exercise the manual path.

Bugs surfaced and fixed during this validation

This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:

Last two would have broken every task failure on apache/airflow main if the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.

Documented gap (deliberately not fixed in this stack)

task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY lists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to client.task_instances.finish() which the API server constrains to terminal states. The mega listener log shows the worker successfully logging Task checkpointed; reporting CHECKPOINTED state. and on_task_instance_checkpointed firing with the correct payload — but the DB row eventually transitions to failed because the supervisor cannot persist CHECKPOINTED through finish(). This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.

When a listener hookimpl raises, the suppressed-exception log line now
identifies which hook raised. The previous generic "error calling listener"
made it impossible to tell which of the registered hooks failed without
re-reading the stack trace, especially painful when several listeners are
registered and one of them sporadically misbehaves.

The new format is "error calling listener for hook 'on_task_instance_<x>'".
Existing listener-error suppression behavior is preserved unchanged.

Scope: task instance listener call sites (task_runner.py, taskinstance.py
on the API server retry path, and the manual-set-state path in the
fastapi service). DagRun, asset, and dag-processing listener call sites
follow the same pattern and can be migrated incrementally.
@boring-cyborg boring-cyborg Bot added area:API Airflow's REST/HTTP API area:task-sdk labels May 5, 2026
1fanwang added 5 commits May 5, 2026 13:45
stdlib Logger.exception accepts (msg, *args) but not arbitrary kwargs;
mypy flagged log.exception('msg', hook=name) as call-arg error in
taskinstance.py and other stdlib-Logger sites.

Reverting to format-string form which works for both stdlib and structlog
loggers. The structlog adapter interpolates the format args into the
event field, so cap_structlog still captures the rendered hook name.

Tests updated to match the rendered event field.
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
Mirrors the same fix in PR-B (apache#66395) — extends to lifecycle and asset
listener call sites.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:task-sdk

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant