
Add on_task_instance_checkpointed listener hook #66410

Draft

1fanwang wants to merge 17 commits into apache:main from 1fanwang:1fanwang/aip96-on-task-instance-checkpointed

Conversation

Contributor

@1fanwang 1fanwang commented May 5, 2026

Stacked on top of #66402 (the CHECKPOINTED state foundation). Adds the listener hook so plugin authors can observe checkpoint events alongside running / success / failed / skipped:

from airflow.listeners import hookimpl

@hookimpl
def on_task_instance_checkpointed(previous_state, task_instance, checkpoint_data):
    ...

The hook fires from finalize() when run() catches AirflowTaskCheckpointed and reports CHECKPOINTED. The operator-supplied checkpoint_data is delivered to the listener — payload-to-supervisor wiring is a separate question I left out of the foundation PR.
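
For context, a hedged sketch of the operator side of that contract. The exception import path follows the commit notes below (airflow.sdk.exceptions, introduced by #66402); the BaseOperator import path, the operator class, and its helper methods are illustrative assumptions, not code from this stack:

from airflow.sdk import BaseOperator  # assumed Airflow 3 import path
from airflow.sdk.exceptions import AirflowTaskCheckpointed  # introduced by #66402, not yet on main


class BatchingOperator(BaseOperator):
    """Illustrative only: processes work in steps and yields the worker mid-run."""

    def _process_step(self, step):
        ...  # hypothetical unit of work

    def _worker_shutting_down(self):
        return False  # hypothetical disruption check (e.g. SIGTERM observed)

    def execute(self, context):
        for step in range(100):
            self._process_step(step)
            if self._worker_shutting_down():
                # Hand back a resume payload instead of failing the task;
                # this payload is what listeners receive as checkpoint_data.
                raise AirflowTaskCheckpointed(
                    checkpoint_data={"step": step, "offset": step * 1024}
                )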

Open to feedback on the checkpoint_data plumbing: the payload is currently stashed on the runtime ti in the catch block and read back in finalize(), which works but is easy to miss. The alternative shapes would be a fourth return value from run() or threading the payload through the supervisor message; happy to switch.
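
For reviewers who want the shape without opening the diff, a hedged sketch of that stash-and-read plumbing; the _checkpoint_data attribute name and the run()/finalize() signatures are simplified stand-ins, not the actual task-sdk code:

from airflow.listeners.listener import get_listener_manager
from airflow.sdk.exceptions import AirflowTaskCheckpointed  # from #66402
from airflow.utils.state import TaskInstanceState


def run(ti, context):
    # Simplified stand-in for the task-sdk runner's run().
    try:
        ti.task.execute(context)
        return TaskInstanceState.SUCCESS
    except AirflowTaskCheckpointed as e:
        # Stash the payload on the runtime ti so finalize() can deliver it;
        # the supervisor message does not carry it yet.
        ti._checkpoint_data = e.checkpoint_data  # hypothetical attribute name
        return TaskInstanceState.CHECKPOINTED


def finalize(ti, state, previous_state):
    # Simplified stand-in for the task-sdk runner's finalize().
    if state == TaskInstanceState.CHECKPOINTED:
        get_listener_manager().hook.on_task_instance_checkpointed(
            previous_state=previous_state,
            task_instance=ti,
            checkpoint_data=getattr(ti, "_checkpoint_data", None),
        )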

Testing

  • New parametrized test test_listener_receives_checkpoint_data verifies the listener is called with the operator-supplied payload (None or dict); a rough sketch of its shape follows this list.
  • The PR-66402 test_run_returns_checkpointed_state continues to cover the upstream state-transition contract.
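
A hedged sketch of roughly what that parametrized test asserts; the CollectingListener class and the run_checkpointing_task fixture are assumptions standing in for the PR's actual harness:

import pytest

from airflow.listeners import hookimpl


class CollectingListener:
    """Records the payload each checkpoint hook call delivers."""

    def __init__(self):
        self.payloads = []

    @hookimpl
    def on_task_instance_checkpointed(self, previous_state, task_instance, checkpoint_data):
        self.payloads.append(checkpoint_data)


@pytest.mark.parametrize("payload", [None, {"step": 5, "offset": 1024}])
def test_listener_receives_checkpoint_data(payload, run_checkpointing_task):
    # run_checkpointing_task is a hypothetical fixture that executes a task
    # raising AirflowTaskCheckpointed(checkpoint_data=payload) end to end
    # with the given listener registered.
    listener = CollectingListener()
    run_checkpointing_task(payload, listener=listener)
    assert listener.payloads == [payload]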

Marking draft because #66402 needs to merge first.

E2E validation

=== hookspec ===
  on_task_instance_checkpointed(previous_state, task_instance, checkpoint_data)

=== runtime ===
  caller: hook.on_task_instance_checkpointed(..., checkpoint_data={"step": 5, "offset": 1024})
  result: ('got', {'step': 5, 'offset': 1024})

Listener author signature must declare checkpoint_data without a default — pluggy treats the impl's default as authoritative and silently overrides the value the caller passed (the same gotcha I documented in #66405).
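
The gotcha is easy to demonstrate with pluggy alone; in this self-contained sketch (nothing Airflow-specific, all names local to the snippet), pluggy binds caller values only to parameters declared without defaults, so the impl that declares a default keeps its default:

import pluggy

hookspec = pluggy.HookspecMarker("demo")
hookimpl = pluggy.HookimplMarker("demo")


class Spec:
    @hookspec
    def on_task_instance_checkpointed(self, previous_state, task_instance, checkpoint_data):
        """Spec mirroring the new hook."""


class GoodListener:
    @hookimpl
    def on_task_instance_checkpointed(self, previous_state, task_instance, checkpoint_data):
        print("good:", checkpoint_data)  # receives the caller's payload


class BrokenListener:
    @hookimpl
    def on_task_instance_checkpointed(self, previous_state, task_instance, checkpoint_data=None):
        print("broken:", checkpoint_data)  # default wins: always prints None


pm = pluggy.PluginManager("demo")
pm.add_hookspecs(Spec)
pm.register(GoodListener())
pm.register(BrokenListener())
pm.hook.on_task_instance_checkpointed(
    previous_state="running", task_instance=object(), checkpoint_data={"step": 5}
)
# Impls run in LIFO registration order, so this prints:
#   broken: None
#   good: {'step': 5}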

Integrated mega-branch validation (all 7 PRs composed)

This PR was validated on its own, and then all seven PRs in this stack (#66394, #66395, #66397, #66399, #66402, #66405, #66410) were merged onto a single branch and exercised end-to-end through real services: airflow standalone running the scheduler, the API server, and LocalExecutor, with sqlite standing in for Postgres. A single listener plugin declaring every new hook and parameter was registered, 5 DAGs covering every state-transition path were triggered, and a manual set-state PATCH was issued via the public API. The listener log follows; every annotation maps a line to the PR that introduced it:

running   prev=QUEUED    msg=started               task=ok_task                                        ← PR-A msg arg
success   prev=RUNNING   msg=success               task=ok_task                                        ← PR-A
running   prev=QUEUED    msg=started               task=boom_task
failed    prev=RUNNING   msg=failed                task=boom_task   error_type=ValueError   fd=None    ← PR-A + PR-D + PR-F kwarg
running   prev=QUEUED    msg=started               task=skip_task
skipped   prev=RUNNING   msg=skipped               task=skip_task                                      ← PR-A skipped path
running   prev=QUEUED    msg=started               task=retry_task
failed    prev=RUNNING   msg=up_for_retry          task=retry_task  error_type=ValueError              ← PR-A retry-vs-terminal
running   prev=QUEUED    msg=started               task=retry_task    (try 2 of 2)
failed    prev=RUNNING   msg=failed                task=retry_task  error_type=ValueError
running   prev=QUEUED    msg=started               task=checkpoint_task
checkpointed prev=RUNNING                          task=checkpoint_task  checkpoint_data={'step': 5,
                                                                          'iterator_offset': 1024}     ← PR-E + PR-G

--- BEGIN MANUAL SET (PATCH /api/v2/.../taskInstances/ok_task new_state=failed) ---
failed    prev=None      msg=manually_set_to_failed task=ok_task    error_type=RuntimeError   fd=None  ← PR-D RuntimeError wrap
                                                                                                          (would be `str` on the PR-A-only branch)

What this validates jointly:

  • #66394 (msg arg): surface is the msg=... parameter on every TI hook. Evidence: all 6 canonical values fire (started, success, failed, skipped, up_for_retry, manually_set_to_failed).
  • #66395 (hook-name log, TI): logs identify the failing hook. Evidence: tested separately with a throwing listener; see that PR body.
  • #66397 (hook-name log, rest): same for the lifecycle / DagRun / asset surfaces. Evidence: tested separately with a throwing listener; see that PR body.
  • #66399 (tighten error type): error: BaseException | None. Evidence: the manual-set path delivers RuntimeError (was str on PR-A alone).
  • #66402 (CHECKPOINTED state): worker catches AirflowTaskCheckpointed. Evidence: the running → checkpointed transition is observed at the listener and at the supervisor message boundary.
  • #66405 (FailureDetails): listeners can declare the failure_details kwarg. Evidence: failure_details=None flows through every failure (no executor populates it yet).
  • #66410 (on_task_instance_checkpointed): the new hook fires with its payload. Evidence: checkpointed task=checkpoint_task checkpoint_data={'step': 5, ...}.

Repro

# Combine all 7 branches onto a mega branch (resolve trivial overlap on the
# spec file's failure hook signature — error + msg + failure_details kwargs
# in one signature) and install editable:
pip install -e shared/listeners -e task-sdk -e airflow-core
AIRFLOW__CORE__EXECUTOR=LocalExecutor airflow standalone &

# Drop the recording listener (declares all 5 hooks including the new
# checkpointed one) into $AIRFLOW_HOME/plugins/, drop 5 DAGs into dags/
# (success / failed / skipped / retry-then-fail / checkpointed), trigger them.
for dag in e2e_success e2e_failed e2e_skipped e2e_retry_then_fail e2e_checkpointed; do
  airflow dags trigger $dag
done

# Then PATCH a state via the public API to exercise the manual path.
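
A hedged sketch of what such a recording listener can look like; only the failed and checkpointed handlers are shown (the other handlers follow the same shape), and the log format merely approximates the output above:

# plugins/recording_listener.py (illustrative; the msg, error, and
# failure_details parameters are the ones this stack introduces)
import logging

from airflow.listeners import hookimpl

log = logging.getLogger("e2e.recording_listener")


@hookimpl
def on_task_instance_failed(previous_state, task_instance, error, msg, failure_details):
    log.info(
        "failed    prev=%s msg=%s task=%s error_type=%s fd=%s",
        previous_state, msg, task_instance.task_id,
        type(error).__name__ if error is not None else None, failure_details,
    )


@hookimpl
def on_task_instance_checkpointed(previous_state, task_instance, checkpoint_data):
    # NOTE: no default on checkpoint_data; see the pluggy gotcha above.
    log.info(
        "checkpointed prev=%s task=%s checkpoint_data=%s",
        previous_state, task_instance.task_id, checkpoint_data,
    )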

Bugs surfaced and fixed during this validation

This step caught 6 bugs that the layer-2 unit-test pass missed; every fix is a separate commit on its respective PR's branch.

The last two would have broken every task failure on apache/airflow main if the foundation PRs had landed without the call-site fixes. The standalone-against-editable-install harness is a fast way to catch this class of bug.

Documented gap (deliberately not fixed in this stack)

task-sdk/.../supervisor.py:STATES_SENT_DIRECTLY lists the states the worker sends to the supervisor through a dedicated direct-send branch. CHECKPOINTED is not in that list, so it falls back to client.task_instances.finish(), which the API server constrains to terminal states. The mega listener log shows the worker successfully logging "Task checkpointed; reporting CHECKPOINTED state." and on_task_instance_checkpointed firing with the correct payload, but the DB row eventually transitions to failed because the supervisor cannot persist CHECKPOINTED through finish(). This is the AIP-96 design knob (auto-resume vs. manual-resume-only) we deliberately want the discussion to settle rather than silently pick. Documented in #66402.
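
A hedged sketch of that control flow; STATES_SENT_DIRECTLY and client.task_instances.finish() are named in the paragraph above, while the function, the client object, and the set members are illustrative:

STATES_SENT_DIRECTLY = {"deferred", "up_for_reschedule"}  # illustrative members; CHECKPOINTED is absent


def report_state(client, ti_id, state, when):
    if state in STATES_SENT_DIRECTLY:
        client.send_direct(ti_id, state)  # hypothetical dedicated direct-send branch
    else:
        # Fallback path: the API server constrains finish() to terminal
        # states, so CHECKPOINTED cannot be persisted here and the DB row
        # eventually lands on failed.
        client.task_instances.finish(id=ti_id, state=state, when=when)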

1fanwang added 4 commits May 5, 2026 02:30
Foundation for AIP-96 (Resumable Operators). Adds the vocabulary needed
to discuss the resumable-operator API shape against a real artifact:

- ``CHECKPOINTED`` task instance state (intermediate, lightyellow in UI),
  added to ``State.unfinished`` so downstream trigger rules treat it as
  "not yet finished".
- ``AirflowTaskCheckpointed`` exception with optional
  ``checkpoint_data`` payload, mirroring the ``AirflowRescheduleException``
  pattern (serializable, accepts a single domain payload).
- Worker handler in ``run()`` that catches the exception and reports the
  ``CHECKPOINTED`` state to the supervisor.
- Parametrized test ``TestTaskCheckpointed.test_run_returns_checkpointed_state``
  covering ``checkpoint_data`` shapes (``None``, dict, list).

Intentionally deferred to follow-ups so the API shape can be discussed
against a real artifact before committing to a single policy:

- ``checkpoint_data`` persistence and resume wiring.
- Scheduler treatment of ``CHECKPOINTED`` (auto-resume after delay vs.
  manual API-driven resume only).
- New listener hook ``on_task_instance_checkpointed`` and downstream
  observer integration.
- Trigger-rule semantics for downstream tasks of a ``CHECKPOINTED``
  upstream.

Marking draft so the design discussion can happen on the PR.
Hookspec, finalize() emit, example listener handler, and test that the
operator-supplied checkpoint_data is delivered to listeners. Stacks on the
CHECKPOINTED state introduced in apache#66402; merge that one first.

The hook fires from finalize() when run() catches AirflowTaskCheckpointed
and reports CHECKPOINTED. checkpoint_data is stashed on the runtime ti
inside the catch and read inside finalize() — the supervisor message does
not carry the payload yet (separate question).
1fanwang added 2 commits May 5, 2026 03:01
…ception

Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the
operator-author API surface can be discussed against running code.

  - CHECKPOINTED added to TaskInstanceState (intermediate state, in
    State.unfinished, lightyellow in the UI)
  - AirflowTaskCheckpointed exception with optional checkpoint_data
    payload, mirroring the AirflowRescheduleException pattern
    (serializable, single domain payload)
  - run() catches the exception and reports CHECKPOINTED state to the
    supervisor

Persistence of checkpoint_data, scheduler auto-resume semantics, the
listener hook (apache#66410), and downstream trigger-rule integration are
intentionally out of scope here so each can be discussed separately.
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set
(apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring).
NOT FOR MERGE.

Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of
DatabricksSubmitRunOperator that survives worker disruption by:
  1. Persisting self.run_id to AIP-103 task_state after submit_run.
  2. Reading prior run_id on next attempt and reconnecting (no resubmit).
  3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting
     on_kill cancel the Databricks run.

Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is
introduced by apache#66402 and not yet on main; this branch is rebased onto
main so the diff shows only the new file. The file is self-explanatory
as a design illustration; CI on this branch will not pass alone.
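
As a hedged illustration of step 3 of that pattern (the helper name is made up; the demo file on the branch is authoritative):

import signal

from airflow.sdk.exceptions import AirflowTaskCheckpointed  # from #66402


def install_checkpoint_on_sigterm(run_id):
    """Convert worker SIGTERM into a checkpoint instead of letting on_kill
    cancel the external Databricks run (illustrative helper; signal handlers
    can only be installed in the main thread)."""

    def _handler(signum, frame):
        raise AirflowTaskCheckpointed(checkpoint_data={"run_id": run_id})

    signal.signal(signal.SIGTERM, _handler)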
1fanwang added a commit to 1fanwang/airflow that referenced this pull request May 6, 2026
Layer 1: test_resumable_databricks_demo.py (~145 lines)
  - Mock-based pytest (no real Databricks workspace).
  - Asserts the resume contract:
    - First execute(): submit_run called, run_id persisted.
    - Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
    - Second execute() (after CHECKPOINTED): submit_run NOT called; prior
      run_id reused; task_state cleared on success.
    - on_kill on the resumable variant does NOT cancel the Databricks run.

Layer 2: test_aip96_resumable_pattern.py (~155 lines)
  - Provider-agnostic, in-process simulator.
  - Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
    per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
    repeat-disruption-cycles preserve external_id.

All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.