AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception#66402
Draft
1fanwang wants to merge 8 commits intoapache:mainfrom
Draft
AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception#664021fanwang wants to merge 8 commits intoapache:mainfrom
1fanwang wants to merge 8 commits intoapache:mainfrom
Conversation
…ception
Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the
operator-author API surface can be discussed against running code.
- CHECKPOINTED added to TaskInstanceState (intermediate state, in
State.unfinished, lightyellow in the UI)
- AirflowTaskCheckpointed exception with optional checkpoint_data
payload, mirroring the AirflowRescheduleException pattern
(serializable, single domain payload)
- run() catches the exception and reports CHECKPOINTED state to the
supervisor
Persistence of checkpoint_data, scheduler auto-resume semantics, the
listener hook (apache#66410), and downstream trigger-rule integration are
intentionally out of scope here so each can be discussed separately.
882780f to
31e50c7
Compare
This was referenced May 5, 2026
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 5, 2026
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 6, 2026
…file lives there)
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 6, 2026
…he#66410; their newsfragments live there)
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 6, 2026
mypy on task-sdk: TITargetStatePayload.state expects IntermediateTIState, not the broader TaskInstanceState. Both enums have CHECKPOINTED with the same string value 'checkpointed'. Also regenerated the openapi specs and airflowctl datamodel to include the CHECKPOINTED state from PR-E (apache#66402).
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 6, 2026
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set (apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring). NOT FOR MERGE. Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of DatabricksSubmitRunOperator that survives worker disruption by: 1. Persisting self.run_id to AIP-103 task_state after submit_run. 2. Reading prior run_id on next attempt and reconnecting (no resubmit). 3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting on_kill cancel the Databricks run. Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is introduced by apache#66402 and not yet on main; this branch is rebased onto main so the diff shows only the new file. The file is self-explanatory as a design illustration; CI on this branch will not pass alone.
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 6, 2026
Layer 1: test_resumable_databricks_demo.py (~145 lines)
- Mock-based pytest (no real Databricks workspace).
- Asserts the resume contract:
- First execute(): submit_run called, run_id persisted.
- Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
- Second execute() (after CHECKPOINTED): submit_run NOT called; prior
run_id reused; task_state cleared on success.
- on_kill on the resumable variant does NOT cancel the Databricks run.
Layer 2: test_aip96_resumable_pattern.py (~155 lines)
- Provider-agnostic, in-process simulator.
- Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
repeat-disruption-cycles preserve external_id.
All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
AIP-96 (Resumable Operators) is currently in DRAFT on the cwiki. Opening
this as the smallest concrete artifact so the API shape can be discussed
against real code rather than only on the wiki — marking the PR draft for
that reason.
The minimum surface: a
CHECKPOINTEDtask instance state (intermediate,included in
State.unfinishedso trigger rules treat it as not-yet-done)and an
AirflowTaskCheckpointedexception that operators raise to signal"I reached a stable point and want to pause":
The worker catches it in
run()and reports the state. Persistence ofcheckpoint_data, scheduler auto-resume semantics, listener notification,and trigger-rule integration for downstream tasks are intentionally deferred
to follow-ups so each can be discussed without committing to a single
resumption policy.
Scheduler hot-path impact
CHECKPOINTEDis added to the existingState.unfinishedfrozenset only — trigger rule evaluation continues to do O(1) membership checks. No new branching in the scheduler loop, no new SQL, no new iteration cost over task candidates.Scheduler treatment of
CHECKPOINTEDDownstream tasks of a
CHECKPOINTEDupstream wait per the existingState.unfinishedsemantics — same shape asDEFERREDorUP_FOR_RETRYupstream. The foundation deliberately does not introduce auto-resume yet; that decision (auto-resume after delay vs manual-resume-only via API) is the single biggest design knob the AIP discussion needs to settle, and is intentionally left open here.A few questions that probably matter more than the implementation details
above:
AirflowTaskCheckpointedbe aBaseException(likeAirflowTaskTerminated/AirflowTaskTimeout) so usertry/except Exceptionblocks don't accidentally swallow it? Inheritedfrom
AirflowExceptionhere for symmetry with the skip/rescheduleexceptions, but happy to flip.
lightyellow) is a placeholder.AirflowTaskCheckpointedmirrorsAirflowTaskTerminated;CHECKPOINTEDmirrorsDEFERRED. Open to a different name.Testing
task-sdk/tests/task_sdk/execution_time/test_task_runner.pyexercises
run()against threecheckpoint_datashapes(
None, dict, list). Each variant raisesAirflowTaskCheckpointedfrom the operator'sexecute, runsthrough
run(), and asserts the returned state isTaskInstanceState.CHECKPOINTED.AirflowSkipException/AirflowRescheduleExceptioncontinue to pass — the new branch isadditive and lives between the existing skip and reschedule
branches in
run()'s exception chain.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
E2E validation
What this doesn't verify (intentionally — those are out of scope here):
checkpoint_data(a follow-up). Reviewers wanting to see the listener path can look at Add on_task_instance_checkpointed listener hook #66410 (stacked PR) which fireson_task_instance_checkpointed(checkpoint_data=...)fromfinalize().E2E validation
What this doesn't verify (intentionally — those are out of scope here):
checkpoint_data(a follow-up). Reviewers wanting to see the listener path can look at Add on_task_instance_checkpointed listener hook #66410 (stacked PR) which fireson_task_instance_checkpointed(checkpoint_data=...)fromfinalize().Integrated mega-branch validation (all 7 PRs composed)
This PR was independently validated, plus all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services —
airflow standalonerunning scheduler + API server + LocalExecutor + Postgres-equivalent (sqlite for the test). A single listener plugin declaring every new hook and parameter was registered, then 5 DAGs covering every state-transition path were triggered + a manual-set-state PATCH via the public API was issued. The listener log is below — every annotation maps a line to the PR that introduced it:What this validates jointly:
msg=...started,success,failed,skipped,up_for_retry,manually_set_to_failed)error: BaseException | NoneRuntimeError(wasstron PR-A alone)AirflowTaskCheckpointedrunning → checkpointedtransition observed at the listener and at the supervisor message boundaryfailure_detailskwargfailure_details=Noneflowing through every failure (no executor populates yet)checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}Repro
Bugs surfaced and fixed during this validation
This step caught 6 bugs that the layer-2 unit-test pass missed — every fix is a separate commit on its respective PR's branch:
AirflowTaskCheckpointedimport inrun()(NameError)_generated.pyTaskInstanceStatemissing CHECKPOINTED (AttributeError)TaskStatesupervisor message Literal rejected CHECKPOINTED (PydanticValidationError)_generated.pyIntermediateTIStatemissing CHECKPOINTEDfailure_details=Nonedefault silently received Noneon_task_instance_failedcall site missingfailure_detailskwarg (HookCallError: hook call must provide argument)Last two would have broken every task failure on apache/airflow
mainif the foundation PRs landed without the call-site fixes. The standalone-against-editable-install harness is a fast catch for this class.Supervisor wiring follow-up — #66445
The original gap noted here (CHECKPOINTED not in
STATES_SENT_DIRECTLY,TITargetStatePayloadnot handled by the API server's state-update endpoint) is now addressed in stacked PR #66445. With #66445 applied, the DB row lands atstate=checkpointedend-to-end through realairflow standaloneservices.checkpoint_datapersistence and scheduler auto-resume semantics remain deliberately open for the AIP discussion.