AIP-96: wire CHECKPOINTED through supervisor → API server → DB#66445
Draft
1fanwang wants to merge 12 commits intoapache:mainfrom
Draft
AIP-96: wire CHECKPOINTED through supervisor → API server → DB#664451fanwang wants to merge 12 commits intoapache:mainfrom
1fanwang wants to merge 12 commits intoapache:mainfrom
Conversation
…ception
Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the
operator-author API surface can be discussed against running code.
- CHECKPOINTED added to TaskInstanceState (intermediate state, in
State.unfinished, lightyellow in the UI)
- AirflowTaskCheckpointed exception with optional checkpoint_data
payload, mirroring the AirflowRescheduleException pattern
(serializable, single domain payload)
- run() catches the exception and reports CHECKPOINTED state to the
supervisor
Persistence of checkpoint_data, scheduler auto-resume semantics, the
listener hook (apache#66410), and downstream trigger-rule integration are
intentionally out of scope here so each can be discussed separately.
The CHECKPOINTED state introduced in apache#66402 reached the message boundary but the DB row never transitioned because the supervisor's STATES_SENT_DIRECTLY list and the API server's state-update endpoint both rejected it. This PR closes the wiring: - Supervisor: CHECKPOINTED is added to STATES_SENT_DIRECTLY so it skips the terminal-state finish() path. The TaskState handler now calls a new client.task_instances.checkpoint() method. - Client: new checkpoint() method PATCHes /task-instances/{id}/state with TITargetStatePayload(state=CHECKPOINTED). - API server: a new branch in the state-update handler accepts TITargetStatePayload (the discriminated-union 'other' tag) and updates the row to checkpointed without additional bookkeeping. Verified end-to-end on airflow standalone — a DAG that raises AirflowTaskCheckpointed now lands the DB row at state='checkpointed' (was 'failed' before). checkpoint_data persistence is intentionally not added; the right shape (XCom-backed, new TaskInstance JSON column, separate metadata table) is the open AIP-96 question this PR deliberately leaves to the AIP discussion.
…ire significant type)
…he#66410; their newsfragments live there)
mypy on task-sdk: TITargetStatePayload.state expects IntermediateTIState, not the broader TaskInstanceState. Both enums have CHECKPOINTED with the same string value 'checkpointed'. Also regenerated the openapi specs and airflowctl datamodel to include the CHECKPOINTED state from PR-E (apache#66402).
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 6, 2026
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set (apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring). NOT FOR MERGE. Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of DatabricksSubmitRunOperator that survives worker disruption by: 1. Persisting self.run_id to AIP-103 task_state after submit_run. 2. Reading prior run_id on next attempt and reconnecting (no resubmit). 3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting on_kill cancel the Databricks run. Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is introduced by apache#66402 and not yet on main; this branch is rebased onto main so the diff shows only the new file. The file is self-explanatory as a design illustration; CI on this branch will not pass alone.
1fanwang
added a commit
to 1fanwang/airflow
that referenced
this pull request
May 6, 2026
Layer 1: test_resumable_databricks_demo.py (~145 lines)
- Mock-based pytest (no real Databricks workspace).
- Asserts the resume contract:
- First execute(): submit_run called, run_id persisted.
- Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
- Second execute() (after CHECKPOINTED): submit_run NOT called; prior
run_id reused; task_state cleared on success.
- on_kill on the resumable variant does NOT cancel the Databricks run.
Layer 2: test_aip96_resumable_pattern.py (~155 lines)
- Provider-agnostic, in-process simulator.
- Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
repeat-disruption-cycles preserve external_id.
All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Stacked on #66402 (CHECKPOINTED state +
AirflowTaskCheckpointedexception). The foundation PR reached the message boundary — the worker constructsTaskState(state=CHECKPOINTED)and the supervisor accepts it — but the DB row never transitioned because:STATES_SENT_DIRECTLYlist didn't include CHECKPOINTED, so it fell through toclient.task_instances.finish()which is constrained to terminal states.TITargetStatePayload(the discriminated-union catch-all that CHECKPOINTED routes through).This PR closes both gaps:
STATES_SENT_DIRECTLY. TheTaskStatehandler calls a newclient.task_instances.checkpoint()method when the worker reports CHECKPOINTED.checkpoint()method PATCHes/task-instances/{id}/statewithTITargetStatePayload(state=CHECKPOINTED)— the same shapeDEFERREDandUP_FOR_RESCHEDULEalready use.TITargetStatePayloadand updates the row tocheckpointedwithout additional bookkeeping.checkpoint_datapersistence is intentionally not added in this PR. The right shape (XCom-backed, new TaskInstance JSON column, separate metadata table) is the open AIP-96 question this PR deliberately leaves for the AIP discussion. Opening this draft so the discussion has a concrete state-transition wiring to react to — reviewers can keep this shape, suggest a different one, or reject the entire approach without me having silently picked it.Real e2e validation
Verified end-to-end on
airflow standalone(scheduler + API server + LocalExecutor + sqlite):Before this PR (only #66402 applied), the same DAG ran the worker through the catch block but the DB row landed at
failedbecause the supervisor → API path rejected the state. After this PR, the row lands atcheckpointed.Testing
test_task_instance_checkpointintask-sdk/tests/task_sdk/api/test_client.pyverifies the client's PATCH carriesstate="checkpointed".TestTaskCheckpointed.test_run_returns_checkpointed_statecontinues to cover the worker → state-return contract from AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception #66402.Open questions for the AIP discussion (deferred)
checkpoint_datapersistence. This PR drops the operator-supplied payload. Where should it land — XCom, a new TaskInstance JSON column, a separate metadata table?checkpointed, should the scheduler auto-transition it back toqueuedafter a delay (likeup_for_retry), or only resume on explicit user action?checkpointedupstream wait per existingState.unfinishedsemantics. Shouldall_donetreatcheckpointedas "done" for fan-out scenarios?I deliberately did not pick on any of these — happy to follow up once the AIP discussion converges.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.