Extend listener hook-name log to remaining call sites #66397
Draft
1fanwang wants to merge 7 commits into apache:main
Follow-up to apache#66395, which introduced the hook-name-in-log change for the task instance listener call sites. This PR applies the identical mechanical edit to the remaining listener-call surfaces so suppressed-listener-error logs identify which hook raised across the entire codebase:

- DagRun state changes: on_dag_run_success, on_dag_run_failed
- DAG import errors: on_existing_dag_import_error, on_new_dag_import_error
- Asset events: on_asset_created, on_asset_alias_created, on_asset_changed, on_asset_event_emitted
- Component lifecycle: on_starting, before_stopping (Job + Task SDK runner)

After this lands, every listener-call site in airflow-core and the Task SDK follows the same log format. Listener-exception suppression behavior is preserved.
This was referenced May 5, 2026
Mirrors the same fix in PR-B (apache#66395) — extends to lifecycle and asset listener call sites.
Description
Follow-up to #66395, which introduced the hook-name-in-log change for the
task instance listener call sites. This PR applies the identical mechanical
edit to the remaining listener-call surfaces so the suppressed-exception
log message identifies which listener hook raised across the entire codebase.
Surfaces covered
- `airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py` (`on_dag_run_success`, `on_dag_run_failed`).
- `airflow-core/src/airflow/dag_processing/collection.py` (`on_existing_dag_import_error`, `on_new_dag_import_error`).
- `airflow-core/src/airflow/assets/manager.py` (`on_asset_created`, `on_asset_alias_created`, `on_asset_changed`, `on_asset_event_emitted`).
- `airflow-core/src/airflow/jobs/job.py` and `task-sdk/src/airflow/sdk/execution_time/task_runner.py` (`on_starting`, `before_stopping`).

After #66395 and this PR land, every listener-call site in airflow-core and the Task SDK follows the same log format, and listener-exception suppression behavior is preserved.
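For illustration, the per-call-site edit can be sketched as below. This is a minimal stand-in using only the standard library: `call_listener_hook`, the logger name, and the exact message wording are assumptions made for the sketch, not the code or log text in this PR. It only shows the shape of the change: the hook name is passed into the suppressed-exception log call, and the exception is still swallowed.

```python
import logging

log = logging.getLogger("listener_sketch")  # hypothetical logger name


def call_listener_hook(hook_name, hook, **kwargs):
    """Call a listener hook, logging and suppressing any exception it raises.

    Hypothetical helper for this sketch; the message wording is assumed.
    """
    try:
        hook(**kwargs)
    except Exception:
        # Naming the hook in the message is the point of the change.
        log.exception("error calling listener hook %s", hook_name)
        return False
    return True


def failing_on_starting(component):
    """Example listener hook that always raises."""
    raise RuntimeError("listener blew up")
```

Calling `call_listener_hook("on_starting", failing_on_starting, component="scheduler")` returns `False` instead of raising, and the logged message contains `on_starting`.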
Independence from #66395
This PR is not stacked: it touches only the call sites that #66395 does
not. The two PRs can land in either order without conflict.
Testing
The change is purely mechanical (single-line literal edit per call site). Coverage of the new log format on a non-task-instance hook is added as `test_listener_lifecycle_error_log_includes_hook_name` in `airflow-core/tests/unit/listeners/test_listeners.py`: it registers a listener that raises in `on_starting`, instantiates a `Job` to fire the hook, and asserts the captured log includes `"on_starting"`. The existing `test_listener_suppresses_exceptions` continues to cover the task-instance path.
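The shape of that test can be sketched with the standard library alone. Everything here is a stand-in: `FakeJob`, `RaisingListener`, the logger name, and the message wording are hypothetical and do not reproduce the Airflow test or its fixtures. The sketch only mirrors the three steps described above: register a raising listener, construct the component that fires the hook, and assert on the captured log.

```python
import logging
import unittest

log = logging.getLogger("lifecycle_sketch")  # hypothetical logger name


class RaisingListener:
    """Listener whose on_starting hook always raises."""

    def on_starting(self, component):
        raise RuntimeError("boom")


class FakeJob:
    """Stand-in for a component that fires on_starting when constructed."""

    def __init__(self, listeners):
        for listener in listeners:
            try:
                listener.on_starting(component=self)
            except Exception:
                # Assumed wording; only the presence of the hook name matters.
                log.exception("error calling listener hook %s", "on_starting")


class LifecycleHookNameTest(unittest.TestCase):
    def test_log_includes_hook_name(self):
        with self.assertLogs("lifecycle_sketch", level="ERROR") as captured:
            FakeJob([RaisingListener()])  # must not raise: suppression is preserved
        self.assertTrue(any("on_starting" in line for line in captured.output))
```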
E2E validation
Real e2e validation (Airflow standalone)
Re-ran with `airflow standalone` against the worktree's editable install. Registered a listener that raises on `on_starting` and `before_stopping`. Observed the new log format firing across every PR-C surface.

Both source paths PR-C touches, `airflow-core/src/airflow/jobs/job.py` (Job lifecycle on every component) and `task-sdk/src/airflow/sdk/execution_time/task_runner.py` (worker lifecycle), emit the hook name. Suppression is preserved: scheduler, dag-processor, triggerer, and the worker all started normally despite the listener throwing.

The `on_dag_run_*` and asset-listener call sites in this PR (in `dag_run.py` and `assets/manager.py`) fire from the API server's PATCH endpoint and the asset event paths respectively. A simple worker-triggered DAG does not exercise them, but they share the mechanical edit shape and are already covered by the layer-2 dispatch test attached above.

Integrated mega-branch validation (all 7 PRs composed)
This PR was validated independently. In addition, all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services: `airflow standalone` running scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered and a manual-set-state PATCH was issued via the public API. Every annotation in the listener log maps a line to the PR that introduced it.

What this validates jointly:

- `msg=...` (`started`, `success`, `failed`, `skipped`, `up_for_retry`, `manually_set_to_failed`)
- `error: BaseException | None`: `RuntimeError` (was `str` on PR-A alone)
- `AirflowTaskCheckpointed`: `running → checkpointed` transition observed at the listener and at the supervisor message boundary
- `failure_details` kwarg: `failure_details=None` flowing through every failure (no executor populates yet)
- `checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}`

Repro
Bugs surfaced and fixed during this validation
This step caught 6 bugs that the layer-2 unit-test pass missed; every fix is a separate commit on its respective PR's branch:

1. `AirflowTaskCheckpointed` import in `run()` (NameError)
2. `_generated.py` `TaskInstanceState` missing CHECKPOINTED (AttributeError)
3. `TaskState` supervisor message Literal rejected CHECKPOINTED (Pydantic ValidationError)
4. `_generated.py` `IntermediateTIState` missing CHECKPOINTED
5. `failure_details=None` default silently received None
6. `on_task_instance_failed` call site missing `failure_details` kwarg (HookCallError: hook call must provide argument)

The last two would have broken every task failure on apache/airflow `main` if the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.

Documented gap (deliberately not fixed in this stack)
`task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY` lists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to `client.task_instances.finish()`, which the API server constrains to terminal states. The mega listener log shows the worker successfully logging `Task checkpointed; reporting CHECKPOINTED state.` and `on_task_instance_checkpointed` firing with the correct payload, but the DB row eventually transitions to `failed` because the supervisor cannot persist CHECKPOINTED through `finish()`. This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.
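The fall-through can be pictured with a simplified model. This is not the supervisor code: the `State` enum, the membership of both sets, and the `finish` and `report_final_state` functions are assumptions chosen only to illustrate why a state outside the direct-send list ends up on a path that accepts terminal states only.

```python
from enum import Enum


class State(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"
    DEFERRED = "deferred"
    UP_FOR_RETRY = "up_for_retry"
    CHECKPOINTED = "checkpointed"


# Assumed membership, for illustration only.
STATES_SENT_DIRECTLY = {State.DEFERRED, State.UP_FOR_RETRY}
TERMINAL_STATES = {State.SUCCESS, State.FAILED}


def finish(state):
    """Stand-in for a finish() call that accepts terminal states only."""
    if state not in TERMINAL_STATES:
        raise ValueError(f"finish() only accepts terminal states, got {state.value!r}")
    return state


def report_final_state(state):
    """Return the route a worker-reported state takes in this simplified model."""
    if state in STATES_SENT_DIRECTLY:
        return "sent-directly"
    try:
        finish(state)
    except ValueError:
        # A non-terminal state outside the direct-send list has nowhere to go.
        return "rejected-by-finish"
    return "finished"
```

In this model, `report_final_state(State.CHECKPOINTED)` takes the `finish()` route and is rejected, while `State.DEFERRED` is sent directly. Which of the two branches CHECKPOINTED should eventually use is the open design question noted above.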