
AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception#66402

Draft
1fanwang wants to merge 8 commits into apache:main from
1fanwang:1fanwang/aip96-checkpointed-state-foundation

Conversation


@1fanwang 1fanwang commented May 5, 2026

AIP-96 (Resumable Operators) is currently in DRAFT on the cwiki. Opening
this as the smallest concrete artifact so the API shape can be discussed
against real code rather than only on the wiki — marking the PR draft for
that reason.

The minimum surface: a CHECKPOINTED task instance state (intermediate,
included in State.unfinished so trigger rules treat it as not-yet-done)
and an AirflowTaskCheckpointed exception that operators raise to signal
"I reached a stable point and want to pause":

from airflow.providers.standard.operators.python import PythonOperator  # import path varies by Airflow version
from airflow.sdk.exceptions import AirflowTaskCheckpointed

class ResumablePythonOperator(PythonOperator):
    def execute(self, context):
        for step in range(self.total_steps):
            if self.should_pause():
                # Stable point reached: hand checkpoint_data to the worker and pause
                raise AirflowTaskCheckpointed(checkpoint_data={"step": step})
            self.do_work(step)

The worker catches it in run() and reports the state. Persistence of
checkpoint_data, scheduler auto-resume semantics, listener notification,
and trigger-rule integration for downstream tasks are intentionally deferred
to follow-ups so each can be discussed without committing to a single
resumption policy.
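For concreteness, a minimal sketch of that run() exception branch. Every name here (the exception class, run(), the state strings) is an illustrative stand-in, not the actual task-sdk code:

```python
# Stand-in sketch of the worker behavior described above, not the
# task-sdk implementation.

class AirflowTaskCheckpointed(Exception):
    """Raised by execute() to signal: stable point reached, pause here."""
    def __init__(self, checkpoint_data=None):
        super().__init__("task checkpointed")
        self.checkpoint_data = checkpoint_data

def run(execute):
    """Minimal stand-in for the worker's run(): map the checkpoint
    exception to the CHECKPOINTED state instead of a failure."""
    try:
        execute()
    except AirflowTaskCheckpointed as exc:
        # The real worker would also hand exc.checkpoint_data to the
        # supervisor; that persistence is deferred in this PR.
        return "checkpointed", exc.checkpoint_data
    return "success", None

def execute():
    raise AirflowTaskCheckpointed(checkpoint_data={"step": 3})

state, data = run(execute)
print(state, data)  # checkpointed {'step': 3}
```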

Scheduler hot-path impact

CHECKPOINTED is added to the existing State.unfinished frozenset only — trigger rule evaluation continues to do O(1) membership checks. No new branching in the scheduler loop, no new SQL, no new iteration cost over task candidates.

Scheduler treatment of CHECKPOINTED

Downstream tasks of a CHECKPOINTED upstream wait per the existing State.unfinished semantics — same shape as DEFERRED or UP_FOR_RETRY upstream. The foundation deliberately does not introduce auto-resume yet; that decision (auto-resume after delay vs manual-resume-only via API) is the single biggest design knob the AIP discussion needs to settle, and is intentionally left open here.

A few questions that probably matter more than the implementation details
above:

  • Should AirflowTaskCheckpointed be a BaseException (like
    AirflowTaskTerminated / AirflowTaskTimeout) so user
    try/except Exception blocks don't accidentally swallow it? Inherited
    from AirflowException here for symmetry with the skip/reschedule
    exceptions, but happy to flip.
  • Color (lightyellow) is a placeholder.
  • Naming: AirflowTaskCheckpointed mirrors AirflowTaskTerminated;
    CHECKPOINTED mirrors DEFERRED. Open to a different name.
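To make the first bullet concrete, a small sketch of the swallowing concern: a broad except Exception in user code catches an Exception-derived control-flow signal but lets a BaseException-derived one propagate. Both classes below are hypothetical stand-ins for the two design options:

```python
# Hypothetical stand-ins for the two inheritance options under discussion.

class CheckpointedAsException(Exception):
    pass

class CheckpointedAsBaseException(BaseException):
    pass

def user_task(exc_cls):
    try:
        raise exc_cls()
    except Exception:
        # A typical "log and continue" block in user operator code.
        return "swallowed"

# Exception-derived: the user's broad handler eats the signal.
print(user_task(CheckpointedAsException))  # swallowed

# BaseException-derived: the signal escapes to the worker.
try:
    user_task(CheckpointedAsBaseException)
except CheckpointedAsBaseException:
    print("propagated to the worker")
```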

Testing

  • New parametrized test in
    task-sdk/tests/task_sdk/execution_time/test_task_runner.py
    exercises run() against three checkpoint_data shapes
    (None, dict, list). Each variant raises
    AirflowTaskCheckpointed from the operator's execute, runs
    through run(), and asserts the returned state is
    TaskInstanceState.CHECKPOINTED.
  • The existing tests for AirflowSkipException /
    AirflowRescheduleException continue to pass — the new branch is
    additive and lives between the existing skip and reschedule
    branches in run()'s exception chain.
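As a shape reference, the parametrization described above can be approximated like this. The real test uses pytest.mark.parametrize against the actual task-sdk run(); this self-contained loop uses a stand-in runner so the three payload shapes are visible:

```python
# Stand-in for the parametrized test shape; not the real test_task_runner.py.

class AirflowTaskCheckpointed(Exception):
    def __init__(self, checkpoint_data=None):
        self.checkpoint_data = checkpoint_data

def run(execute):  # stand-in runner mirroring the branch under test
    try:
        execute()
    except AirflowTaskCheckpointed:
        return "checkpointed"
    return "success"

results = []
for payload in (None, {"step": 5}, ["part-0", "part-1"]):  # the three shapes
    def execute(p=payload):
        raise AirflowTaskCheckpointed(checkpoint_data=p)
    results.append(run(execute))

print(results)  # ['checkpointed', 'checkpointed', 'checkpointed']
```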


E2E validation

=== state ===
  TaskInstanceState.CHECKPOINTED.value = 'checkpointed'
  in State.unfinished: True   (downstream trigger rules wait)
  in State.finished:   False
  color: lightyellow

=== exception ===
  AirflowTaskCheckpointed — extends AirflowException
  exc = AirflowTaskCheckpointed(checkpoint_data={"step": 5, "offset": 1024})
  exc.checkpoint_data == {"step": 5, "offset": 1024}
  serialize() == ('airflow.sdk.exceptions.AirflowTaskCheckpointed', (), {'checkpoint_data': {...}})

=== worker handler ===
  task_runner.run() catches AirflowTaskCheckpointed: True
  reports state=TaskInstanceState.CHECKPOINTED: True

What this doesn't verify (intentionally — those are out of scope here):

  • The supervisor message does not yet carry checkpoint_data (a follow-up). Reviewers wanting to see the listener path can look at the stacked PR #66410 ("Add on_task_instance_checkpointed listener hook"), which fires on_task_instance_checkpointed(checkpoint_data=...) from finalize().
  • Scheduler treatment of CHECKPOINTED downstream is the wait-forever path today since nothing transitions it. Auto-resume vs. manual-only-resume is a separate question for the AIP.
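The serialize() contract shown in the transcript can be illustrated with a minimal stand-alone class. This mirrors the (classpath, args, kwargs) tuple shape but is not the Airflow implementation:

```python
# Minimal illustration of the serialization contract from the transcript;
# the class body is a stand-in, not airflow.sdk.exceptions.

class AirflowTaskCheckpointed(Exception):
    def __init__(self, checkpoint_data=None):
        super().__init__("Task checkpointed")
        self.checkpoint_data = checkpoint_data

    def serialize(self):
        # (fully-qualified class path, positional args, keyword args)
        return (
            "airflow.sdk.exceptions.AirflowTaskCheckpointed",
            (),
            {"checkpoint_data": self.checkpoint_data},
        )

exc = AirflowTaskCheckpointed(checkpoint_data={"step": 5, "offset": 1024})
path, args, kwargs = exc.serialize()
print(kwargs["checkpoint_data"])  # {'step': 5, 'offset': 1024}
```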


Integrated mega-branch validation (all 7 PRs composed)

This PR was validated independently, and additionally all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services: airflow standalone running the scheduler, API server, and LocalExecutor against sqlite (standing in for Postgres). A single listener plugin declaring every new hook and parameter was registered; then 5 DAGs covering every state-transition path were triggered, and a manual set-state PATCH was issued via the public API. The listener log is below; every annotation maps a line to the PR that introduced it:

running   prev=QUEUED    msg=started               task=ok_task                                        ← PR-A msg arg
success   prev=RUNNING   msg=success               task=ok_task                                        ← PR-A
running   prev=QUEUED    msg=started               task=boom_task
failed    prev=RUNNING   msg=failed                task=boom_task   error_type=ValueError   fd=None    ← PR-A + PR-D + PR-F kwarg
running   prev=QUEUED    msg=started               task=skip_task
skipped   prev=RUNNING   msg=skipped               task=skip_task                                      ← PR-A skipped path
running   prev=QUEUED    msg=started               task=retry_task
failed    prev=RUNNING   msg=up_for_retry          task=retry_task  error_type=ValueError              ← PR-A retry-vs-terminal
running   prev=QUEUED    msg=started               task=retry_task    (try 2 of 2)
failed    prev=RUNNING   msg=failed                task=retry_task  error_type=ValueError
running   prev=QUEUED    msg=started               task=checkpoint_task
checkpointed prev=RUNNING                          task=checkpoint_task  checkpoint_data={'step': 5,
                                                                          'iterator_offset': 1024}     ← PR-E + PR-G

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None      msg=manually_set_to_failed task=ok_task    error_type=RuntimeError   fd=None  ← PR-D RuntimeError wrap
                                                                                                          (would be `str` on the PR-A-only branch)

What this validates jointly:

PR / surface / evidence in log:

  • #66394 (msg arg): every TI hook has msg=... Evidence: 6 canonical values fire (started, success, failed, skipped, up_for_retry, manually_set_to_failed)
  • #66395 (hook-name log, TI): logs identify the failing hook. Evidence: tested separately with a throwing listener; see PR body
  • #66397 (hook-name log, rest): lifecycle / DagRun / asset surfaces. Evidence: tested separately with a throwing listener; see PR body
  • #66399 (tighten error type): error: BaseException | None. Evidence: manual-set path delivers RuntimeError (was str on PR-A alone)
  • #66402 (CHECKPOINTED state): worker catches AirflowTaskCheckpointed. Evidence: running → checkpointed transition observed at the listener and at the supervisor message boundary
  • #66405 (FailureDetails): listener can declare the failure_details kwarg. Evidence: failure_details=None flows through every failure (no executor populates it yet)
  • #66410 (on_task_instance_checkpointed): new hook fires with payload. Evidence: checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}

Repro

# Combine all 7 branches onto a mega branch (resolve trivial overlap on the
# spec file's failure hook signature — error + msg + failure_details kwargs
# in one signature) and install editable:
pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &

# Drop the recording listener (declares all 5 hooks including the new
# checkpointed one) into $AIRFLOW_HOME/plugins/, drop 5 DAGs into dags/
# (success / failed / skipped / retry-then-fail / checkpointed), trigger them.
for dag in e2e_success e2e_failed e2e_skipped e2e_retry_then_fail e2e_checkpointed; do
  airflow dags trigger $dag
done

# Then PATCH a state via the public API to exercise the manual path.

Bugs surfaced and fixed during this validation

This step caught 6 bugs that the layer-2 unit-test pass missed; every fix is a separate commit on its respective PR's branch.

The last two would have broken every task failure on apache/airflow main if the foundation PRs had landed without the call-site fixes. The standalone-against-editable-install harness catches this class of regression quickly.

Supervisor wiring follow-up — #66445

The original gap noted here (CHECKPOINTED not in STATES_SENT_DIRECTLY, TITargetStatePayload not handled by the API server's state-update endpoint) is now addressed in stacked PR #66445. With #66445 applied, the DB row lands at state=checkpointed end-to-end through real airflow standalone services. checkpoint_data persistence and scheduler auto-resume semantics remain deliberately open for the AIP discussion.

…ception

Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the
operator-author API surface can be discussed against running code.

  - CHECKPOINTED added to TaskInstanceState (intermediate state, in
    State.unfinished, lightyellow in the UI)
  - AirflowTaskCheckpointed exception with optional checkpoint_data
    payload, mirroring the AirflowRescheduleException pattern
    (serializable, single domain payload)
  - run() catches the exception and reports CHECKPOINTED state to the
    supervisor

Persistence of checkpoint_data, scheduler auto-resume semantics, the
listener hook (apache#66410), and downstream trigger-rule integration are
intentionally out of scope here so each can be discussed separately.
@1fanwang 1fanwang force-pushed the 1fanwang/aip96-checkpointed-state-foundation branch from 882780f to 31e50c7 on May 5, 2026 10:05
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 5, 2026
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
mypy on task-sdk: TITargetStatePayload.state expects IntermediateTIState,
not the broader TaskInstanceState. Both enums have CHECKPOINTED with the
same string value 'checkpointed'.

Also regenerated the openapi specs and airflowctl datamodel to include
the CHECKPOINTED state from PR-E (apache#66402).
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set
(apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring).
NOT FOR MERGE.

Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of
DatabricksSubmitRunOperator that survives worker disruption by:
  1. Persisting self.run_id to AIP-103 task_state after submit_run.
  2. Reading prior run_id on next attempt and reconnecting (no resubmit).
  3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting
     on_kill cancel the Databricks run.

Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is
introduced by apache#66402 and not yet on main; this branch is rebased onto
main so the diff shows only the new file. The file is self-explanatory
as a design illustration; CI on this branch will not pass alone.
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
Layer 1: test_resumable_databricks_demo.py (~145 lines)
  - Mock-based pytest (no real Databricks workspace).
  - Asserts the resume contract:
    - First execute(): submit_run called, run_id persisted.
    - Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
    - Second execute() (after CHECKPOINTED): submit_run NOT called; prior
      run_id reused; task_state cleared on success.
    - on_kill on the resumable variant does NOT cancel the Databricks run.

Layer 2: test_aip96_resumable_pattern.py (~155 lines)
  - Provider-agnostic, in-process simulator.
  - Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
    per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
    repeat-disruption-cycles preserve external_id.

All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.
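The resume contract those tests assert (submit once, persist the external id, reconnect on the next attempt, clear on success) can be sketched provider-agnostically. Every name below (the task_state dict, submit_run, execute) is a stand-in rather than the AIP-103 or Databricks API:

```python
# Provider-agnostic stand-in for the resume contract; none of these names
# are the real AIP-103 task_state or Databricks hook APIs.

class AirflowTaskCheckpointed(Exception):
    def __init__(self, checkpoint_data=None):
        self.checkpoint_data = checkpoint_data

task_state = {}    # stand-in for AIP-103 persisted per-task state
submit_calls = []  # records how many times we submit an external run

def submit_run():
    submit_calls.append(1)
    return "run-123"

def execute(disrupt=False):
    run_id = task_state.get("run_id")
    if run_id is None:
        run_id = submit_run()          # first attempt: submit exactly once
        task_state["run_id"] = run_id  # persist before polling
    if disrupt:
        # Worker disruption: pause instead of cancelling the external run.
        raise AirflowTaskCheckpointed(checkpoint_data={"run_id": run_id})
    task_state.clear()                 # success: clear persisted state
    return run_id

try:
    execute(disrupt=True)              # first attempt is disrupted
except AirflowTaskCheckpointed:
    pass
result = execute()                     # resume: reconnects, no resubmit
print(result, len(submit_calls))       # run-123 1
```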
