Add required context messages to all task instance state change notifications#66394
Draft
1fanwang wants to merge 5 commits intoapache:mainfrom
Draft
Add required context messages to all task instance state change notifications#663941fanwang wants to merge 5 commits intoapache:mainfrom
1fanwang wants to merge 5 commits intoapache:mainfrom
Conversation
…ications Mirrors the DagRun listener msg arg pattern (apache#56272) for the four task instance listener hooks: on_task_instance_running, on_task_instance_success, on_task_instance_failed, on_task_instance_skipped. The new msg arg carries short canonical context for the state change so listeners can route or filter events without re-deriving intent from other fields. Canonical values: "started", "success", "skipped", "failed", "up_for_retry" from the worker; "manually_set_to_*" when the state was changed via the API. on_task_instance_failed especially benefits because the msg distinguishes terminal failures from up_for_retry without inspecting error. pluggy dispatches by parameter name, so existing @hookimpl implementations that don't declare msg keep working unchanged.
This was referenced May 5, 2026
This was referenced May 5, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Mirrors the DagRun listener
msgarg pattern (#56272) for the four taskinstance listener hooks:
on_task_instance_running,on_task_instance_success,on_task_instance_failed,on_task_instance_skipped.The new
msgarg carries short canonical context for the state change solisteners can route or filter events without re-deriving intent from other
fields.
on_task_instance_failedespecially benefits — themsgdistinguishes terminal failures from
up_for_retrypaths withoutinspecting
erroror task config.Canonical values
on_task_instance_running"started"on_task_instance_success"success""manually_set_to_success"on_task_instance_failed"failed"(terminal),"up_for_retry"(will retry)"manually_set_to_failed"on_task_instance_skipped"skipped""manually_set_to_skipped"Backwards compatibility
pluggy dispatches by parameter name, so existing
@hookimplimplementations that don't declare
msgkeep working unchanged.Testing
CustomListenertest fixture intask-sdk/tests/task_sdk/execution_time/test_task_runner.pyrecordsthe msg on every hook call and asserts the canonical value on the
existing running/success/failed/skipped tests.
test_task_runner_listener_msg_distinguishes_retry_vs_terminalexercises both branches in
finalize():should_retry=True→(UP_FOR_RETRY, "up_for_retry"),should_retry=False→(FAILED, "failed").^ Add meaningful description above
Read the Pull Request Guidelines for more information.
E2E validation
Direct pluggy dispatch test against the worktree source (no full Airflow boot, just the listener manager + new hookspec + recording listeners). Result:
Confirms: the new
msgarg flows to listeners that declare it, the canonical values match the table above, and pre-existing listeners that don't declaremsgcontinue to work unchanged (pluggy's name-based dispatch).Real E2E validation (Airflow standalone)
Re-ran with
airflow standalone(real scheduler + API server + LocalExecutor worker + sqlite metadata DB) on the worktree's editable install. Recording listener plugin appended every TI hook invocation to a log file. DAGs that exercise success / failed / skipped / retry-then-fail / manual-set paths.All 6 canonical msg values land correctly through the real worker/scheduler/API-server pipeline:
started/success/failed/skipped/up_for_retryfrom the worker (Task SDKtask_runner.run()→finalize())manually_set_to_failedfrom the API server (_emit_state_listener_hookstriggered by the PATCH endpoint)The
error_type=stron the manual-set row is the current behavior on this branch — #66399 wraps that in aRuntimeErrorfor type uniformity.Integrated mega-branch validation (all 7 PRs composed)
This PR was independently validated, plus all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services —
airflow standalonerunning scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered + a manual-set-state PATCH via the public API was issued. The listener log is below — every annotation maps a line to the PR that introduced it:What this validates jointly:
msg=...started,success,failed,skipped,up_for_retry,manually_set_to_failed)error: BaseException | NoneRuntimeError(wasstron PR-A alone)AirflowTaskCheckpointedrunning → checkpointedtransition observed at the listener and at the supervisor message boundaryfailure_detailskwargfailure_details=Noneflowing through every failure (no executor populates yet)checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}Repro
Bugs surfaced and fixed during this validation
This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:
AirflowTaskCheckpointedimport inrun()(NameError)_generated.pyTaskInstanceStatemissing CHECKPOINTED (AttributeError)TaskStatesupervisor message Literal rejected CHECKPOINTED (PydanticValidationError)_generated.pyIntermediateTIStatemissing CHECKPOINTEDfailure_details=Nonedefault silently received Noneon_task_instance_failedcall site missingfailure_detailskwarg (HookCallError: hook call must provide argument)Last two would have broken every task failure on apache/airflow
mainif the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.Documented gap (deliberately not fixed in this stack)
task-sdk/.../supervisor.py:STATES_SENT_DIRECTLYlists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back toclient.task_instances.finish()which the API server constrains to terminal states. The mega listener log shows the worker successfully loggingTask checkpointed; reporting CHECKPOINTED state.andon_task_instance_checkpointedfiring with the correct payload — but the DB row eventually transitions tofailedbecause the supervisor cannot persist CHECKPOINTED throughfinish(). This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.