
AIP-96: wire CHECKPOINTED through supervisor → API server → DB#66445

Draft
1fanwang wants to merge 12 commits into apache:main from 1fanwang:1fanwang/aip96-supervisor-wiring

Conversation

Contributor

@1fanwang 1fanwang commented May 5, 2026

Stacked on #66402 (CHECKPOINTED state + AirflowTaskCheckpointed exception). The foundation PR reached the message boundary — the worker constructs TaskState(state=CHECKPOINTED) and the supervisor accepts it — but the DB row never transitioned because:

  1. The supervisor's STATES_SENT_DIRECTLY list didn't include CHECKPOINTED, so it fell through to client.task_instances.finish(), which is constrained to terminal states.
  2. The API server's state-update endpoint had explicit branches for terminal / retry / success / deferred / reschedule payloads but no branch for TITargetStatePayload (the discriminated-union catch-all that CHECKPOINTED routes through).

This PR closes both gaps:

  • Supervisor: CHECKPOINTED is added to STATES_SENT_DIRECTLY. The TaskState handler calls a new client.task_instances.checkpoint() method when the worker reports CHECKPOINTED.
  • Client: new checkpoint() method PATCHes /task-instances/{id}/state with TITargetStatePayload(state=CHECKPOINTED) — the same shape DEFERRED and UP_FOR_RESCHEDULE already use.
  • API server: a new branch in the state-update handler accepts TITargetStatePayload and updates the row to checkpointed without additional bookkeeping.
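The client-side shape can be sketched roughly as follows. This is a minimal illustration of the wiring described above, not the actual Task SDK client: `TaskInstanceOperations`, the `http` object, and the payload serialization are all stand-ins.

```python
from dataclasses import dataclass
from enum import Enum


class TIState(str, Enum):
    DEFERRED = "deferred"
    CHECKPOINTED = "checkpointed"


@dataclass
class TITargetStatePayload:
    # Discriminated-union "other" tag: just a target state, no extra bookkeeping.
    state: TIState


class TaskInstanceOperations:
    def __init__(self, http):
        self.http = http  # anything exposing .patch(path, json=...)

    def checkpoint(self, ti_id: str) -> None:
        # Same shape DEFERRED and UP_FOR_RESCHEDULE already use: PATCH the
        # state endpoint with a plain target-state payload.
        payload = TITargetStatePayload(state=TIState.CHECKPOINTED)
        self.http.patch(
            f"/task-instances/{ti_id}/state",
            json={"state": payload.state.value},
        )
```

On the server side, the corresponding branch would only need to match the payload type and write the state column, which is exactly the "no additional bookkeeping" property noted above.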

checkpoint_data persistence is intentionally not added in this PR. The right shape (XCom-backed, new TaskInstance JSON column, separate metadata table) is the open AIP-96 question this PR deliberately leaves for the AIP discussion. Opening this draft so the discussion has a concrete state-transition wiring to react to — reviewers can keep this shape, suggest a different one, or reject the entire approach without me having silently picked it.

Real e2e validation

Verified end-to-end on airflow standalone (scheduler + API server + LocalExecutor + sqlite):

worker  | Task checkpointed; reporting CHECKPOINTED state.
client  | curl -X PATCH /task-instances/{id}/state
api     | (new branch updates state=checkpointed)
sqlite> SELECT state FROM task_instance WHERE dag_id='e2e_checkpointed';
        state = checkpointed

Before this PR (only #66402 applied), the same DAG ran the worker through the catch block, but the DB row landed at failed because the supervisor → API path rejected the state. After this PR, the row lands at checkpointed.
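The before/after behaviour condenses into a toy routing function. This is purely illustrative, not supervisor code: the STATES_SENT_DIRECTLY dispatch is simplified to a set-membership check, and the terminal-state constraint of finish() is modeled as "anything non-terminal fails".

```python
# States finish() will accept (illustrative subset).
TERMINAL = {"success", "failed", "skipped"}


def route(reported_state: str, states_sent_directly: frozenset) -> str:
    """Toy model of how the supervisor routes a worker-reported state."""
    if reported_state in states_sent_directly:
        return reported_state  # dedicated client call, e.g. checkpoint()
    # finish() path: non-terminal states are rejected and the TI fails.
    return reported_state if reported_state in TERMINAL else "failed"


# Before #66445: CHECKPOINTED missing from the direct list -> row lands at failed.
before = frozenset({"deferred", "up_for_reschedule"})
assert route("checkpointed", before) == "failed"

# After: CHECKPOINTED added -> row lands at checkpointed.
after = before | {"checkpointed"}
assert route("checkpointed", after) == "checkpointed"
```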


Open questions for the AIP discussion (deferred)

  1. checkpoint_data persistence. This PR drops the operator-supplied payload. Where should it land — XCom, a new TaskInstance JSON column, a separate metadata table?
  2. Auto-resume vs manual-resume-only. Once a row is at checkpointed, should the scheduler auto-transition it back to queued after a delay (like up_for_retry), or only resume on explicit user action?
  3. Trigger-rule treatment. Downstream tasks of a checkpointed upstream wait per existing State.unfinished semantics. Should all_done treat checkpointed as "done" for fan-out scenarios?

I deliberately did not pick any of these — happy to follow up once the AIP discussion converges.



1fanwang and others added 6 commits May 5, 2026 03:05
…ception

Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the
operator-author API surface can be discussed against running code.

  - CHECKPOINTED added to TaskInstanceState (intermediate state, in
    State.unfinished, lightyellow in the UI)
  - AirflowTaskCheckpointed exception with optional checkpoint_data
    payload, mirroring the AirflowRescheduleException pattern
    (serializable, single domain payload)
  - run() catches the exception and reports CHECKPOINTED state to the
    supervisor

Persistence of checkpoint_data, scheduler auto-resume semantics, the
listener hook (apache#66410), and downstream trigger-rule integration are
intentionally out of scope here so each can be discussed separately.
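The exception-and-catch shape this commit describes can be sketched in isolation. This is a hedged stand-in, not the real #66402 code: the actual class lives in airflow.sdk.exceptions and the real catch block is in the task runner's run(), both of which may differ in detail.

```python
class AirflowTaskCheckpointed(Exception):
    """Stand-in for the exception #66402 introduces (illustrative only).

    Mirrors the AirflowRescheduleException pattern: a single, JSON-serializable
    domain payload. Persistence of checkpoint_data is out of scope in this PR set.
    """

    def __init__(self, checkpoint_data=None):
        super().__init__("Task checkpointed")
        self.checkpoint_data = checkpoint_data or {}


def run(execute):
    """Toy model of the runner's catch block: convert the exception
    into a CHECKPOINTED state report instead of a failure."""
    try:
        execute()
        return "success"
    except AirflowTaskCheckpointed:
        return "checkpointed"  # reported to the supervisor as TaskState
```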
The CHECKPOINTED state introduced in apache#66402 reached the message boundary
but the DB row never transitioned because the supervisor's
STATES_SENT_DIRECTLY list and the API server's state-update endpoint
both rejected it. This PR closes the wiring:

  - Supervisor: CHECKPOINTED is added to STATES_SENT_DIRECTLY so it
    skips the terminal-state finish() path. The TaskState handler
    now calls a new client.task_instances.checkpoint() method.
  - Client: new checkpoint() method PATCHes /task-instances/{id}/state
    with TITargetStatePayload(state=CHECKPOINTED).
  - API server: a new branch in the state-update handler accepts
    TITargetStatePayload (the discriminated-union 'other' tag) and
    updates the row to checkpointed without additional bookkeeping.

Verified end-to-end on airflow standalone — a DAG that raises
AirflowTaskCheckpointed now lands the DB row at state='checkpointed'
(was 'failed' before).

checkpoint_data persistence is intentionally not added; the right shape
(XCom-backed, new TaskInstance JSON column, separate metadata table) is
the open AIP-96 question this PR deliberately leaves to the AIP discussion.
1fanwang added 5 commits May 5, 2026 13:45
mypy on task-sdk: TITargetStatePayload.state expects IntermediateTIState,
not the broader TaskInstanceState. Both enums have CHECKPOINTED with the
same string value 'checkpointed'.

Also regenerated the openapi specs and airflowctl datamodel to include
the CHECKPOINTED state from PR-E (apache#66402).
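The mypy point can be demonstrated in isolation: two distinct str-mixin enums can each carry a CHECKPOINTED member with the same string value, so narrowing the annotation from the broad enum to the intermediate one is a type-level change with no runtime effect. The enum members shown are an illustrative subset, not the full Airflow enums.

```python
from enum import Enum


class TaskInstanceState(str, Enum):       # broad enum (illustrative subset)
    SUCCESS = "success"
    CHECKPOINTED = "checkpointed"


class IntermediateTIState(str, Enum):     # narrower enum the payload expects
    CHECKPOINTED = "checkpointed"


# Both members serialize to the same wire value, so a payload annotated with
# IntermediateTIState round-trips identically.
assert TaskInstanceState.CHECKPOINTED.value == IntermediateTIState.CHECKPOINTED.value
```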
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set
(apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring).
NOT FOR MERGE.

Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of
DatabricksSubmitRunOperator that survives worker disruption by:
  1. Persisting self.run_id to AIP-103 task_state after submit_run.
  2. Reading prior run_id on next attempt and reconnecting (no resubmit).
  3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting
     on_kill cancel the Databricks run.

Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is
introduced by apache#66402 and not yet on main; this branch is rebased onto
main so the diff shows only the new file. The file is self-explanatory
as a design illustration; CI on this branch will not pass alone.
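The resume contract from that demo can be reduced to a provider-agnostic sketch: submit once, persist the external run id before polling, reconnect on the next attempt, and clear state on success. Every name here is illustrative — `task_state` is a plain dict standing in for the AIP-103 store, and `hook` is any object with submit_run() and get_status(); this is not the DatabricksSubmitRunOperator subclass from the demo branch.

```python
class ResumableRun:
    """Minimal model of the submit-once / reconnect-on-resume pattern."""

    def __init__(self, hook, task_state):
        self.hook = hook              # anything with submit_run() / get_status(run_id)
        self.task_state = task_state  # stand-in for AIP-103 task_state

    def execute(self):
        run_id = self.task_state.get("run_id")
        if run_id is None:
            run_id = self.hook.submit_run()     # first attempt: submit the run
            self.task_state["run_id"] = run_id  # persist BEFORE polling starts
        # Later attempts reach this point with a persisted run_id and
        # reconnect to the same external run instead of resubmitting.
        status = self.hook.get_status(run_id)
        if status == "SUCCESS":
            self.task_state.pop("run_id", None)  # clear state on success
        return status
```

A disruption between attempts (worker killed mid-poll) leaves `run_id` in `task_state`, so the second execute() skips submit_run() entirely — the property the demo's tests assert.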
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
Layer 1: test_resumable_databricks_demo.py (~145 lines)
  - Mock-based pytest (no real Databricks workspace).
  - Asserts the resume contract:
    - First execute(): submit_run called, run_id persisted.
    - Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
    - Second execute() (after CHECKPOINTED): submit_run NOT called; prior
      run_id reused; task_state cleared on success.
    - on_kill on the resumable variant does NOT cancel the Databricks run.

Layer 2: test_aip96_resumable_pattern.py (~155 lines)
  - Provider-agnostic, in-process simulator.
  - Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
    per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
    repeat-disruption-cycles preserve external_id.

All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.