
Extend listener hook-name log to remaining call sites #66397

Draft
1fanwang wants to merge 7 commits into apache:main from 1fanwang:1fanwang/listener-log-hook-context-rest

@1fanwang 1fanwang commented May 5, 2026

Description

Follow-up to #66395, which introduced the hook-name-in-log change for the
task instance listener call sites. This PR applies the identical mechanical
edit to the remaining listener-call surfaces so the suppressed-exception
log message identifies which listener hook raised across the entire codebase.

Surfaces covered

  • DagRun state changes
    airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
    (on_dag_run_success, on_dag_run_failed).
  • DAG import errors
    airflow-core/src/airflow/dag_processing/collection.py
    (on_existing_dag_import_error, on_new_dag_import_error).
  • Asset events
    airflow-core/src/airflow/assets/manager.py
    (on_asset_created, on_asset_alias_created, on_asset_changed,
    on_asset_event_emitted).
  • Component lifecycle
    airflow-core/src/airflow/jobs/job.py and
    task-sdk/src/airflow/sdk/execution_time/task_runner.py
    (on_starting, before_stopping).

After #66395 + this PR land, every listener-call site in airflow-core and
the Task SDK follows the same log format and listener-exception
suppression behavior is preserved.

Independence from #66395

This PR is not stacked: it touches only the call sites that #66395 does
not. The two PRs can land in either order without conflict.

Testing

The change is purely mechanical (single-line literal edit per call site).
Coverage of the new log format on a non-task-instance hook is added as
test_listener_lifecycle_error_log_includes_hook_name in
airflow-core/tests/unit/listeners/test_listeners.py: it registers a
listener that raises in on_starting, instantiates a Job to fire
the hook, and asserts the captured log includes "on_starting".
The existing test_listener_suppresses_exceptions continues to cover
the task-instance path.
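
The shape of that test can be sketched without Airflow as follows. This is an illustration under stated assumptions, not the test added in this PR: `FakeJob` is a hypothetical stand-in for a Job that fires `on_starting` on construction, and the sketch uses `unittest.assertLogs` for log capture, which may differ from what the real test uses.

```python
import logging
import unittest

# Hypothetical logger name, used only in this sketch.
log = logging.getLogger("job_sketch")


class RaisingListener:
    """Hypothetical listener that raises in on_starting."""

    def on_starting(self, component):
        raise RuntimeError("listener bug")


class FakeJob:
    """Hypothetical stand-in for a Job: fires on_starting when constructed."""

    def __init__(self, listeners):
        for listener in listeners:
            try:
                listener.on_starting(component=self)
            except Exception:
                log.exception("error calling listener for hook %r", "on_starting")


class ListenerLifecycleLogTest(unittest.TestCase):
    def test_error_log_includes_hook_name(self):
        with self.assertLogs("job_sketch", level="ERROR") as captured:
            job = FakeJob([RaisingListener()])
        # Suppression: construction completed despite the raising listener.
        self.assertIsInstance(job, FakeJob)
        # The captured log names the hook that raised.
        self.assertTrue(any("on_starting" in line for line in captured.output))
```

Saved as a module, the sketch runs with `python -m unittest`.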



E2E validation

=== non-TI listener call sites with hook name ===
  dag_run.py:227    hook='on_dag_run_success'
  dag_run.py:236    hook='on_dag_run_failed'
  collection.py:399 hook='on_existing_dag_import_error'
  collection.py:414 hook='on_new_dag_import_error'
  job.py:150        hook='on_starting'
  job.py:283        hook='before_stopping'
  manager.py:399    hook='on_asset_created'
  manager.py:407    hook='on_asset_alias_created'
  manager.py:415    hook='on_asset_changed'
  manager.py:423    hook='on_asset_event_emitted'
  task_runner.py:962  hook='on_starting'
  task_runner.py:1939 hook='before_stopping'

Total: 12 non-TI sites across 5 files. TI sites (task_runner.py:1181/1906/1914/1922/1932) untouched here — those are #66395's scope.
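
The totals can be re-derived from the listing with a short tally. The tuples below are copied from the listing; the variable names are only for this sketch.

```python
from collections import Counter

# Call sites copied from the listing: (file, line, hook).
SITES = [
    ("dag_run.py", 227, "on_dag_run_success"),
    ("dag_run.py", 236, "on_dag_run_failed"),
    ("collection.py", 399, "on_existing_dag_import_error"),
    ("collection.py", 414, "on_new_dag_import_error"),
    ("job.py", 150, "on_starting"),
    ("job.py", 283, "before_stopping"),
    ("manager.py", 399, "on_asset_created"),
    ("manager.py", 407, "on_asset_alias_created"),
    ("manager.py", 415, "on_asset_changed"),
    ("manager.py", 423, "on_asset_event_emitted"),
    ("task_runner.py", 962, "on_starting"),
    ("task_runner.py", 1939, "before_stopping"),
]

# Count call sites per file, then summarize.
per_file = Counter(filename for filename, _line, _hook in SITES)
total_sites = len(SITES)
total_files = len(per_file)

print(f"{total_sites} sites across {total_files} files")  # 12 sites across 5 files
```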

Real e2e validation (Airflow standalone)

Re-ran with airflow standalone against the worktree's editable install. Registered a listener that raises in on_starting and before_stopping. Observed the new log format on the component-lifecycle call sites PR-C touches:

scheduler  | error calling listener for hook 'on_starting'      loc=job.py:150
dag-processor | error calling listener for hook 'on_starting'   loc=job.py:150
triggerer  | error calling listener for hook 'on_starting'      loc=job.py:150
worker     | error calling listener for hook 'on_starting'      loc=task_runner.py:962
worker     | error calling listener for hook 'before_stopping'  loc=task_runner.py:1939

Both source paths PR-C touches — airflow-core/src/airflow/jobs/job.py (Job lifecycle on every component) and task-sdk/src/airflow/sdk/execution_time/task_runner.py (worker lifecycle) — emit the hook name. Suppression is preserved: scheduler, dag-processor, triggerer, and the worker all started normally despite the listener throwing.
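
A throwing listener of the kind used for this run might look like the sketch below. It assumes `hookimpl` is importable from `airflow.listeners` and that both lifecycle hooks take a single `component` argument; the fallback decorator exists only so the sketch can be imported without Airflow. The exact listener used in the validation is not shown in this PR, and registering the module through a plugin is omitted here.

```python
try:
    # Assumption: Airflow exposes the pluggy marker at this import path.
    from airflow.listeners import hookimpl
except ImportError:
    def hookimpl(func):
        """No-op fallback so the sketch imports without Airflow installed."""
        return func


@hookimpl
def on_starting(component):
    # Raise deliberately to exercise the suppressed-exception log path.
    raise RuntimeError("e2e: on_starting listener failure")


@hookimpl
def before_stopping(component):
    # Same deliberate failure for the shutdown hook.
    raise RuntimeError("e2e: before_stopping listener failure")
```

With this module registered as a listener, each component should log the hook name and keep running, as the output above shows.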

The on_dag_run_* and asset-listener call sites in this PR (in dag_run.py and assets/manager.py) fire from the API server's PATCH endpoint and the asset event paths respectively — not exercised by a simple worker-triggered DAG, but they share the mechanical edit shape and are already covered by the layer-2 dispatch test attached above.

Integrated mega-branch validation (all 7 PRs composed)

This PR was validated independently. In addition, all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services: airflow standalone running the scheduler, API server, and LocalExecutor, with sqlite standing in for Postgres. A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered and a manual set-state PATCH was issued via the public API. The listener log is below; each annotation maps a line to the PR that introduced it:

running   prev=QUEUED    msg=started               task=ok_task                                        ← PR-A msg arg
success   prev=RUNNING   msg=success               task=ok_task                                        ← PR-A
running   prev=QUEUED    msg=started               task=boom_task
failed    prev=RUNNING   msg=failed                task=boom_task   error_type=ValueError   fd=None    ← PR-A + PR-D + PR-F kwarg
running   prev=QUEUED    msg=started               task=skip_task
skipped   prev=RUNNING   msg=skipped               task=skip_task                                      ← PR-A skipped path
running   prev=QUEUED    msg=started               task=retry_task
failed    prev=RUNNING   msg=up_for_retry          task=retry_task  error_type=ValueError              ← PR-A retry-vs-terminal
running   prev=QUEUED    msg=started               task=retry_task    (try 2 of 2)
failed    prev=RUNNING   msg=failed                task=retry_task  error_type=ValueError
running   prev=QUEUED    msg=started               task=checkpoint_task
checkpointed prev=RUNNING                          task=checkpoint_task  checkpoint_data={'step': 5,
                                                                          'iterator_offset': 1024}     ← PR-E + PR-G

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None      msg=manually_set_to_failed task=ok_task    error_type=RuntimeError   fd=None  ← PR-D RuntimeError wrap
                                                                                                          (would be `str` on the PR-A-only branch)

What this validates jointly:

PR                                      Surface                                     Evidence in log
#66394 (msg arg)                        every TI hook has msg=...                   6 canonical values fire (started, success, failed, skipped, up_for_retry, manually_set_to_failed)
#66395 (hook-name log, TI)              logs identify the failing hook              tested separately with throwing listener; see PR body
#66397 (hook-name log, rest)            lifecycle / DagRun / asset surfaces         tested separately with throwing listener; see PR body
#66399 (tighten error type)             error: BaseException | None                 manual-set path delivers RuntimeError (was str on PR-A alone)
#66402 (CHECKPOINTED state)             worker catches AirflowTaskCheckpointed      running → checkpointed transition observed at the listener and at the supervisor message boundary
#66405 (FailureDetails)                 listener can declare failure_details kwarg  failure_details=None flowing through every failure (no executor populates yet)
#66410 (on_task_instance_checkpointed)  new hook fires with payload                 checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}

Repro

# Combine all 7 branches onto a mega branch (resolve trivial overlap on the
# spec file's failure hook signature — error + msg + failure_details kwargs
# in one signature) and install editable:
pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &

# Drop the recording listener (declares all 5 hooks including the new
# checkpointed one) into $AIRFLOW_HOME/plugins/, drop 5 DAGs into dags/
# (success / failed / skipped / retry-then-fail / checkpointed), trigger them.
for dag in e2e_success e2e_failed e2e_skipped e2e_retry_then_fail e2e_checkpointed; do
  airflow dags trigger $dag
done

# Then PATCH a state via the public API to exercise the manual path.

Bugs surfaced and fixed during this validation

This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:

The last two would have broken every task failure on apache/airflow main if the foundation PRs had landed without the call-site fixes. The standalone-against-editable-install harness is a fast way to catch this class of bug.

Documented gap (deliberately not fixed in this stack)

task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY lists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to client.task_instances.finish() which the API server constrains to terminal states. The mega listener log shows the worker successfully logging Task checkpointed; reporting CHECKPOINTED state. and on_task_instance_checkpointed firing with the correct payload — but the DB row eventually transitions to failed because the supervisor cannot persist CHECKPOINTED through finish(). This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.
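
The routing described above can be illustrated with a toy model. Everything here is hypothetical: the set contents are placeholders rather than the real values in supervisor.py, `finish` is a stand-in for the client call, and the sketch shows only the rejection of a non-terminal state, not the later transition of the DB row to failed.

```python
# All names and set contents are illustrative, not the real supervisor code.
TERMINAL_STATES = {"success", "failed", "skipped"}
STATES_SENT_DIRECTLY = {"deferred", "up_for_reschedule"}  # placeholder contents


def finish(state):
    """Stand-in for finish(), which accepts terminal states only."""
    if state not in TERMINAL_STATES:
        raise ValueError(f"finish() rejects non-terminal state {state!r}")
    return f"finished:{state}"


def report_final_state(state):
    """Route a worker-reported state the way the paragraph describes."""
    if state in STATES_SENT_DIRECTLY:
        # Dedicated direct-send branch.
        return f"direct:{state}"
    # Every other state falls back to finish().
    return finish(state)
```

In this model, `report_final_state("checkpointed")` raises because the state is neither sent directly nor terminal, which is the gap the paragraph documents.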
