Add on_task_instance_checkpointed listener hook #66410
Draft
1fanwang wants to merge 17 commits into apache:main
Conversation
Foundation for AIP-96 (Resumable Operators). Adds the vocabulary needed to discuss the resumable-operator API shape against a real artifact:

- ``CHECKPOINTED`` task instance state (intermediate, lightyellow in UI), added to ``State.unfinished`` so downstream trigger rules treat it as "not yet finished".
- ``AirflowTaskCheckpointed`` exception with optional ``checkpoint_data`` payload, mirroring the ``AirflowRescheduleException`` pattern (serializable, accepts a single domain payload).
- Worker handler in ``run()`` that catches the exception and reports the ``CHECKPOINTED`` state to the supervisor.
- Parametrized test ``TestTaskCheckpointed.test_run_returns_checkpointed_state`` covering ``checkpoint_data`` shapes (``None``, dict, list).

Intentionally deferred to follow-ups so the API shape can be discussed against a real artifact before committing to a single policy:

- ``checkpoint_data`` persistence and resume wiring.
- Scheduler treatment of ``CHECKPOINTED`` (auto-resume after delay vs. manual API-driven resume only).
- New listener hook ``on_task_instance_checkpointed`` and downstream observer integration.
- Trigger-rule semantics for downstream tasks of a ``CHECKPOINTED`` upstream.

Marking draft so the design discussion can happen on the PR.
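The exception-to-state handshake described above can be modeled in a few lines of self-contained Python. This is a sketch of the shape only: the enum subset, the `run()` signature, and the tuple return convention are illustrative, not the real Task SDK code.

```python
from enum import Enum


class TaskInstanceState(str, Enum):
    """Illustrative subset of states; CHECKPOINTED is the new one."""
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CHECKPOINTED = "checkpointed"


# Mirrors the State.unfinished idea: CHECKPOINTED counts as "not yet finished".
UNFINISHED = frozenset({TaskInstanceState.RUNNING, TaskInstanceState.CHECKPOINTED})


class AirflowTaskCheckpointed(Exception):
    """Raised by an operator to pause with an optional checkpoint payload."""

    def __init__(self, checkpoint_data=None):
        super().__init__("Task checkpointed")
        self.checkpoint_data = checkpoint_data


def run(execute):
    """Toy worker handler: catch the exception, report CHECKPOINTED."""
    try:
        execute()
    except AirflowTaskCheckpointed as exc:
        return TaskInstanceState.CHECKPOINTED, exc.checkpoint_data
    return TaskInstanceState.SUCCESS, None


def checkpointing_task():
    # An operator body that decides to pause with a payload.
    raise AirflowTaskCheckpointed(checkpoint_data={"step": 5})


state, payload = run(checkpointing_task)
```

A task that returns normally still yields `SUCCESS`, so the catch is purely additive to the happy path.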
Hookspec, finalize() emit, example listener handler, and test that the operator-supplied checkpoint_data is delivered to listeners. Stacks on the CHECKPOINTED state introduced in apache#66402; merge that one first. The hook fires from finalize() when run() catches AirflowTaskCheckpointed and reports CHECKPOINTED. checkpoint_data is stashed on the runtime ti inside the catch and read inside finalize() — the supervisor message does not carry the payload yet (separate question).
…ception
Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the
operator-author API surface can be discussed against running code.
- CHECKPOINTED added to TaskInstanceState (intermediate state, in
State.unfinished, lightyellow in the UI)
- AirflowTaskCheckpointed exception with optional checkpoint_data
payload, mirroring the AirflowRescheduleException pattern
(serializable, single domain payload)
- run() catches the exception and reports CHECKPOINTED state to the
supervisor
Persistence of checkpoint_data, scheduler auto-resume semantics, the
listener hook (apache#66410), and downstream trigger-rule integration are
intentionally out of scope here so each can be discussed separately.
This was referenced May 5, 2026
…file lives there)
1fanwang added a commit to 1fanwang/airflow that referenced this pull request on May 6, 2026
…he#66410; their newsfragments live there)
1fanwang added a commit to 1fanwang/airflow that referenced this pull request on May 6, 2026
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set (apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring). NOT FOR MERGE.

Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of DatabricksSubmitRunOperator that survives worker disruption by:

1. Persisting self.run_id to AIP-103 task_state after submit_run.
2. Reading prior run_id on next attempt and reconnecting (no resubmit).
3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting on_kill cancel the Databricks run.

Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is introduced by apache#66402 and not yet on main; this branch is rebased onto main so the diff shows only the new file. The file is self-explanatory as a design illustration; CI on this branch will not pass alone.
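The three-step contract above can be sketched provider-agnostically. Everything here (`ResumableSubmitRun`, `FakeClient`, the plain dict standing in for AIP-103 task_state) is an illustrative stand-in, not the real Databricks provider or AIP-103 API.

```python
class AirflowTaskCheckpointed(Exception):
    """Stand-in for the exception introduced by apache#66402."""

    def __init__(self, checkpoint_data=None):
        self.checkpoint_data = checkpoint_data


class FakeClient:
    """Fake external service; optionally disrupts the first poll."""

    def __init__(self, disrupt=False):
        self.submits = self.reconnects = 0
        self.disrupt = disrupt

    def submit_run(self):
        self.submits += 1
        return "run-1"

    def reconnect(self, run_id):
        self.reconnects += 1

    def wait(self, run_id):
        if self.disrupt:
            self.disrupt = False
            raise AirflowTaskCheckpointed({"run_id": run_id})


class ResumableSubmitRun:
    """Sketch of the resume contract: submit once, persist, reconnect."""

    def __init__(self, task_state, client):
        self.task_state = task_state  # dict standing in for AIP-103 task_state
        self.client = client

    def execute(self):
        run_id = self.task_state.get("run_id")
        if run_id is None:
            run_id = self.client.submit_run()   # 1. submit once
            self.task_state["run_id"] = run_id  #    persist before polling
        else:
            self.client.reconnect(run_id)       # 2. resume: no resubmit
        self.client.wait(run_id)
        self.task_state.pop("run_id", None)     # clear on success

    def on_kill(self):
        # 3. SIGTERM becomes a checkpoint instead of cancelling the run.
        raise AirflowTaskCheckpointed({"run_id": self.task_state.get("run_id")})


store = {}
client = FakeClient(disrupt=True)
op = ResumableSubmitRun(store, client)
try:
    op.execute()   # attempt 1: submit, then disrupted mid-poll
except AirflowTaskCheckpointed:
    pass           # worker would report CHECKPOINTED here
op.execute()       # attempt 2: reconnects with the prior run_id, no resubmit
```

After the second attempt the external run was submitted exactly once, reconnected exactly once, and the task_state entry is cleared.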
1fanwang added a commit to 1fanwang/airflow that referenced this pull request on May 6, 2026
Layer 1: test_resumable_databricks_demo.py (~145 lines)
- Mock-based pytest (no real Databricks workspace).
- Asserts the resume contract:
- First execute(): submit_run called, run_id persisted.
- Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
- Second execute() (after CHECKPOINTED): submit_run NOT called; prior
run_id reused; task_state cleared on success.
- on_kill on the resumable variant does NOT cancel the Databricks run.
Layer 2: test_aip96_resumable_pattern.py (~155 lines)
- Provider-agnostic, in-process simulator.
- Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
repeat-disruption-cycles preserve external_id.
All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.
Stacked on top of #66402 (CHECKPOINTED state foundation). Adds the listener hook so plugin authors observe checkpoint events alongside running / success / failed / skipped:
The hook fires from `finalize()` when `run()` catches `AirflowTaskCheckpointed` and reports `CHECKPOINTED`. The operator-supplied `checkpoint_data` is delivered to the listener — payload-to-supervisor wiring is a separate question I left out of the foundation PR.

Open to feedback on the `checkpoint_data` plumbing — currently stashed on the runtime ti in the catch block and read in `finalize()`, which works but is a bit hidden. The other shape would be a fourth return from `run()` or threading it through the supervisor message; happy to switch.

Testing

`test_listener_receives_checkpoint_data` verifies the listener is called with the operator-supplied payload (None or dict). `test_run_returns_checkpointed_state` continues to cover the upstream state-transition contract.

Marking draft because #66402 needs to merge first.
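A minimal sketch of that stash-then-read plumbing, assuming illustrative names (`RuntimeTI`, `_checkpoint_data`, a plain-callable listener list) rather than the real Task SDK or pluggy wiring:

```python
class RuntimeTI:
    """Stand-in for the runtime task instance object."""

    def __init__(self, task_id):
        self.task_id = task_id
        self._checkpoint_data = None  # stashed by the catch block in run()


class AirflowTaskCheckpointed(Exception):
    def __init__(self, checkpoint_data=None):
        self.checkpoint_data = checkpoint_data


def run(ti, execute):
    """Catch block stashes the payload on the runtime ti."""
    try:
        execute()
    except AirflowTaskCheckpointed as exc:
        ti._checkpoint_data = exc.checkpoint_data
        return "checkpointed"
    return "success"


def finalize(ti, state, listeners):
    """Reads the stashed payload and fires the hook; the supervisor
    message does not carry the payload in this sketch either."""
    if state == "checkpointed":
        for hook in listeners:
            hook(task_instance=ti, checkpoint_data=ti._checkpoint_data)


events = []
ti = RuntimeTI("checkpoint_task")


def checkpointing_task():
    raise AirflowTaskCheckpointed({"step": 5})


state = run(ti, checkpointing_task)
finalize(ti, state, [lambda **kw: events.append(kw)])
```

The "a bit hidden" criticism is visible here: nothing in `run()`'s return signals that a payload was stashed, so `finalize()` has to know about the private attribute.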
E2E validation
Listener author signature must declare `checkpoint_data` without a default — pluggy treats the impl's default as authoritative and silently overrides the value the caller passed (the same gotcha I documented in #66405).

Integrated mega-branch validation (all 7 PRs composed)
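The gotcha can be demonstrated without pluggy itself. `call_hookimpl` below is a toy stand-in that mimics the behavior described above (parameters declaring a default are not treated as hook arguments, so the caller's value for them is silently dropped); it is a model of the gotcha, not pluggy's actual implementation.

```python
import inspect


def call_hookimpl(impl, **call_kwargs):
    """Toy model: only parameters WITHOUT a default count as hook arguments;
    everything else keeps the impl's own default, whatever the caller passed."""
    params = inspect.signature(impl).parameters
    accepted = {
        name for name, p in params.items()
        if p.default is inspect.Parameter.empty
    }
    return impl(**{k: v for k, v in call_kwargs.items() if k in accepted})


def good_listener(checkpoint_data):        # no default: receives the payload
    return checkpoint_data


def bad_listener(checkpoint_data=None):    # default: payload silently dropped
    return checkpoint_data


payload = {"step": 5}
```

Calling both with the same payload makes the failure mode obvious: the defaulted signature never sees the caller's value.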
This PR was independently validated, plus all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services — `airflow standalone` running scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered, and a manual-set-state PATCH via the public API was issued. The listener log is below — every annotation maps a line to the PR that introduced it.

What this validates jointly:
- `msg=...` state transitions observed (`started`, `success`, `failed`, `skipped`, `up_for_retry`, `manually_set_to_failed`)
- `error: BaseException | None` carries a `RuntimeError` (was `str` on PR-A alone)
- `AirflowTaskCheckpointed`: the `running → checkpointed` transition observed at the listener and at the supervisor message boundary
- `failure_details` kwarg: `failure_details=None` flowing through every failure (no executor populates yet)
- `checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}` in the listener log

Repro
Bugs surfaced and fixed during this validation
This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:
- `AirflowTaskCheckpointed` import in `run()` (NameError)
- `_generated.py` `TaskInstanceState` missing CHECKPOINTED (AttributeError)
- `TaskState` supervisor message Literal rejected CHECKPOINTED (Pydantic ValidationError)
- `_generated.py` `IntermediateTIState` missing CHECKPOINTED
- `failure_details=None` default silently received None
- `on_task_instance_failed` call site missing `failure_details` kwarg (HookCallError: hook call must provide argument)
Last two would have broken every task failure on apache/airflow `main` if the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.

Documented gap (deliberately not fixed in this stack)
`task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY` lists the states the worker sends to the supervisor with a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to `client.task_instances.finish()`, which the API server constrains to terminal states. The mega listener log shows the worker successfully logging `Task checkpointed; reporting CHECKPOINTED state.` and `on_task_instance_checkpointed` firing with the correct payload — but the DB row eventually transitions to `failed` because the supervisor cannot persist CHECKPOINTED through `finish()`. This is the AIP-96 design knob (auto-resume vs manual-resume-only) we deliberately want the discussion to settle, not silently pick. Documented in #66402.
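The fallback path can be paraphrased as a toy router. The membership of `STATES_SENT_DIRECTLY` below is illustrative only, not the actual supervisor list, and `route_state` stands in for the worker's reporting branch:

```python
# Illustrative sets: real membership lives in task-sdk supervisor.py and
# in the API server's terminal-state constraint on finish().
STATES_SENT_DIRECTLY = {"deferred", "up_for_reschedule", "up_for_retry"}
TERMINAL = {"success", "failed", "skipped"}


def route_state(state):
    """Direct-send states bypass finish(); finish() only accepts terminal
    states, so a non-terminal CHECKPOINTED has nowhere to go."""
    if state in STATES_SENT_DIRECTLY:
        return "direct"
    if state in TERMINAL:
        return "finish"
    raise ValueError(f"finish() cannot persist non-terminal state {state!r}")
```

Under these assumptions, `"checkpointed"` hits neither branch, mirroring the documented gap where the worker logs the state but the DB row cannot hold it.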