From 31e50c785046bc30a3f8c8ddf2122c27996741a5 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 03:05:13 -0700 Subject: [PATCH 01/12] AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the operator-author API surface can be discussed against running code. - CHECKPOINTED added to TaskInstanceState (intermediate state, in State.unfinished, lightyellow in the UI) - AirflowTaskCheckpointed exception with optional checkpoint_data payload, mirroring the AirflowRescheduleException pattern (serializable, single domain payload) - run() catches the exception and reports CHECKPOINTED state to the supervisor Persistence of checkpoint_data, scheduler auto-resume semantics, the listener hook (#66410), and downstream trigger-rule integration are intentionally out of scope here so each can be discussed separately. --- .../newsfragments/66402.significant.rst | 25 +++++++++++++++ airflow-core/src/airflow/utils/state.py | 5 +++ task-sdk/src/airflow/sdk/exceptions.py | 27 ++++++++++++++++ .../airflow/sdk/execution_time/task_runner.py | 9 ++++++ .../execution_time/test_task_runner.py | 31 +++++++++++++++++++ 5 files changed, 97 insertions(+) create mode 100644 airflow-core/newsfragments/66402.significant.rst diff --git a/airflow-core/newsfragments/66402.significant.rst b/airflow-core/newsfragments/66402.significant.rst new file mode 100644 index 0000000000000..5569c5b3bfa04 --- /dev/null +++ b/airflow-core/newsfragments/66402.significant.rst @@ -0,0 +1,25 @@ +AIP-96 (Resumable Operators) foundation — adds the ``CHECKPOINTED`` task +instance state and the ``AirflowTaskCheckpointed`` exception so an operator +can signal "I have reached a stable checkpoint and intend to pause": + +.. code-block:: python + + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + class ResumablePythonOperator(PythonOperator): + def execute(self, context): + for step in range(self.total_steps): + if self.should_pause(): + raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) + self.do_work(step) + +The worker catches the exception and reports ``CHECKPOINTED`` state. +``CHECKPOINTED`` is an intermediate state and is included in +``State.unfinished``. + +This change intentionally ships only the vocabulary plus the +worker-side state transition. ``checkpoint_data`` persistence, +scheduler auto-resume semantics, the listener hook, and downstream +trigger-rule integration are deferred to follow-up PRs so the API +shape can be discussed against a real artifact before committing to a +single resumption policy. diff --git a/airflow-core/src/airflow/utils/state.py b/airflow-core/src/airflow/utils/state.py index 332efb105533d..890043f094656 100644 --- a/airflow-core/src/airflow/utils/state.py +++ b/airflow-core/src/airflow/utils/state.py @@ -56,6 +56,7 @@ class IntermediateTIState(str, Enum): UP_FOR_RETRY = "up_for_retry" UP_FOR_RESCHEDULE = "up_for_reschedule" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" def __str__(self) -> str: return self.value @@ -87,6 +88,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED # One or more upstream deps failed SKIPPED = TerminalTIState.SKIPPED # Skipped by branching or some other mechanism DEFERRED = IntermediateTIState.DEFERRED # Deferrable operator waiting on a trigger + CHECKPOINTED = IntermediateTIState.CHECKPOINTED # Operator paused at a stable checkpoint def __str__(self) -> str: return self.value @@ -130,6 +132,7 @@ class State: UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED SKIPPED = TaskInstanceState.SKIPPED DEFERRED = TaskInstanceState.DEFERRED + CHECKPOINTED = TaskInstanceState.CHECKPOINTED finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED]) unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING]) @@ -157,6 +160,7 @@ class State: TaskInstanceState.REMOVED: "lightgrey", TaskInstanceState.SCHEDULED: "tan", TaskInstanceState.DEFERRED: "mediumpurple", + TaskInstanceState.CHECKPOINTED: "lightyellow", } @classmethod @@ -200,6 +204,7 @@ def color_fg(cls, state): TaskInstanceState.UP_FOR_RETRY, TaskInstanceState.UP_FOR_RESCHEDULE, TaskInstanceState.DEFERRED, + TaskInstanceState.CHECKPOINTED, ] ) """ diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index 7d42dad5d8502..cbde73db11224 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -165,6 +165,33 @@ class AirflowSkipException(AirflowException): """Raise when the task should be skipped.""" +class AirflowTaskCheckpointed(AirflowException): + """ + Raise when the operator has reached a stable checkpoint and intends to pause. + + The worker reports the task as CHECKPOINTED and persists the optional + ``checkpoint_data`` so a subsequent run can resume from the same point. + + This is the AIP-96 (Resumable Operators) foundation primitive. Auto-resume + semantics, scheduler treatment, and downstream trigger-rule integration are + intentionally deferred to follow-ups so the API shape can be discussed + without committing to a single resumption policy. + + :param checkpoint_data: Arbitrary serializable payload representing the + operator's resume point. Persistence and resume wiring are out of scope + for this foundation; the parameter exists so listeners and operator + authors can iterate against the final shape. + """ + + def __init__(self, checkpoint_data=None): + super().__init__() + self.checkpoint_data = checkpoint_data + + def serialize(self): + cls = self.__class__ + return f"{cls.__module__}.{cls.__name__}", (), {"checkpoint_data": self.checkpoint_data} + + class AirflowTaskTerminated(BaseException): """Raise when the task execution is terminated.""" diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..905d4427f3746 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1342,6 +1342,15 @@ def _on_term(signum, frame): rendered_map_index=ti.rendered_map_index, ) state = TaskInstanceState.SKIPPED + except AirflowTaskCheckpointed as checkpoint: + log.info("Task checkpointed; reporting CHECKPOINTED state.") + msg = TaskState( + state=TaskInstanceState.CHECKPOINTED, + end_date=datetime.now(tz=timezone.utc), + rendered_map_index=ti.rendered_map_index, + ) + state = TaskInstanceState.CHECKPOINTED + ti._checkpoint_data = checkpoint.checkpoint_data # type: ignore[attr-defined] except AirflowRescheduleException as reschedule: log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE") msg = RescheduleTask( diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 630aff9094ed1..03b290eb05e73 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4758,6 +4758,37 @@ def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only( mock_supervisor_comms.send.assert_any_call(msg) +class TestTaskCheckpointed: + """AIP-96 foundation: AirflowTaskCheckpointed -> CHECKPOINTED state.""" + + @pytest.mark.parametrize( + "checkpoint_data", + [ + pytest.param(None, id="no-payload"), + pytest.param({"step": 3, "iterator_offset": 1024}, id="dict-payload"), + pytest.param([1, 2, 3], id="list-payload"), + ], + ) + def test_run_returns_checkpointed_state( + self, checkpoint_data, create_runtime_ti, mock_supervisor_comms + ): + """``run()`` reports CHECKPOINTED when the operator raises + ``AirflowTaskCheckpointed``. The exception's ``checkpoint_data`` payload + is preserved on the exception object regardless of shape; persistence + and resume wiring are out of scope for the foundation PR.""" + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + def _raise_checkpointed(): + raise AirflowTaskCheckpointed(checkpoint_data=checkpoint_data) + + task = PythonOperator(task_id="checkpointed_task", python_callable=_raise_checkpointed) + ti = create_runtime_ti(task=task) + + state, _msg, _error = run(ti, context=ti.get_template_context(), log=mock.MagicMock()) + + assert state == TaskInstanceState.CHECKPOINTED + + class TestTaskInstanceMetrics: def test_ti_start_metric_emitted(self, create_runtime_ti, mock_supervisor_comms): """Test that ti.start metric is emitted at the beginning of task.""" From 3dd7d637d4c5f4754772c3d02b7442589449316b Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:18:26 -0700 Subject: [PATCH 02/12] fix: import AirflowTaskCheckpointed in run() so the except clause resolves --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 905d4427f3746..bfbf124611fb5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1248,6 +1248,7 @@ def run( AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, + AirflowTaskCheckpointed, AirflowTaskTerminated, DagRunTriggerException, DownstreamTasksSkipped, From 9c5ffa2e3f54a00c4a78f7a31d79d7933b9b7a60 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:22:39 -0700 Subject: [PATCH 03/12] fix: add CHECKPOINTED to task-sdk datamodel enum (mirrors airflow.utils.state) --- task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index b5b100154c389..4a0cd29b86dd9 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -365,6 +365,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = "upstream_failed" SKIPPED = "skipped" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class TaskStatePutBody(BaseModel): From 7e25b73a4faaf09e99487ad781bbf1970f6ab326 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:24:45 -0700 Subject: [PATCH 04/12] fix: allow CHECKPOINTED in TaskState supervisor message Literal --- task-sdk/src/airflow/sdk/execution_time/comms.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py index 87c7881333ad4..ded750db1840c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/comms.py +++ b/task-sdk/src/airflow/sdk/execution_time/comms.py @@ -754,6 +754,7 @@ class TaskState(BaseModel): TaskInstanceState.FAILED, TaskInstanceState.SKIPPED, TaskInstanceState.REMOVED, + TaskInstanceState.CHECKPOINTED, ] end_date: datetime | None = None type: Literal["TaskState"] = "TaskState" From ea29e2a7d6c02525f1c660da9af4cf2f095f4dc5 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:27:43 -0700 Subject: [PATCH 05/12] fix: add CHECKPOINTED to task-sdk IntermediateTIState datamodel --- task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 4a0cd29b86dd9..306cd386c38ac 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -198,6 +198,7 @@ class IntermediateTIState(str, Enum): UP_FOR_RETRY = "up_for_retry" UP_FOR_RESCHEDULE = "up_for_reschedule" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class PrevSuccessfulDagRunResponse(BaseModel): From 89c2edb81d94e8e1be786adad63eee0d86c79bf9 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:34:55 -0700 Subject: [PATCH 06/12] =?UTF-8?q?AIP-96:=20wire=20CHECKPOINTED=20through?= =?UTF-8?q?=20supervisor=20=E2=86=92=20API=20server=20=E2=86=92=20DB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CHECKPOINTED state introduced in #66402 reached the message boundary but the DB row never transitioned because the supervisor's STATES_SENT_DIRECTLY list and the API server's state-update endpoint both rejected it. This PR closes the wiring: - Supervisor: CHECKPOINTED is added to STATES_SENT_DIRECTLY so it skips the terminal-state finish() path. The TaskState handler now calls a new client.task_instances.checkpoint() method. - Client: new checkpoint() method PATCHes /task-instances/{id}/state with TITargetStatePayload(state=CHECKPOINTED). - API server: a new branch in the state-update handler accepts TITargetStatePayload (the discriminated-union 'other' tag) and updates the row to checkpointed without additional bookkeeping. Verified end-to-end on airflow standalone — a DAG that raises AirflowTaskCheckpointed now lands the DB row at state='checkpointed' (was 'failed' before). checkpoint_data persistence is intentionally not added; the right shape (XCom-backed, new TaskInstance JSON column, separate metadata table) is the open AIP-96 question this PR deliberately leaves to the AIP discussion. --- .../aip96-supervisor-wiring.feature.rst | 18 ++++++++++++++++++ .../execution_api/routes/task_instances.py | 7 +++++++ task-sdk/src/airflow/sdk/api/client.py | 13 +++++++++++++ .../airflow/sdk/execution_time/supervisor.py | 7 +++++++ task-sdk/tests/task_sdk/api/test_client.py | 13 +++++++++++++ 5 files changed, 58 insertions(+) create mode 100644 airflow-core/newsfragments/aip96-supervisor-wiring.feature.rst diff --git a/airflow-core/newsfragments/aip96-supervisor-wiring.feature.rst b/airflow-core/newsfragments/aip96-supervisor-wiring.feature.rst new file mode 100644 index 0000000000000..d77d085d6a25b --- /dev/null +++ b/airflow-core/newsfragments/aip96-supervisor-wiring.feature.rst @@ -0,0 +1,18 @@ +Wires ``CHECKPOINTED`` (introduced in #66402) through the supervisor → +API server path so the DB row actually transitions to ``checkpointed`` +when an operator raises ``AirflowTaskCheckpointed``. + +- ``CHECKPOINTED`` is added to ``STATES_SENT_DIRECTLY`` so the supervisor + does not route it through the terminal-state ``finish()`` endpoint. +- The supervisor's ``TaskState`` handler calls a new + ``client.task_instances.checkpoint()`` method when the worker reports + ``CHECKPOINTED``. +- The new client method PATCHes ``/task-instances/{id}/state`` with + ``TITargetStatePayload(state=CHECKPOINTED)`` — the same shape + ``DEFERRED`` and ``UP_FOR_RESCHEDULE`` already use. + +``checkpoint_data`` persistence is intentionally not added in this PR. +The right shape (XCom-backed, new TaskInstance JSON column, separate +metadata table) is the open AIP-96 question; this PR ships the +state-transition wiring so the DB lands at ``checkpointed`` while the +persistence question is debated. diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index 0fccd6ad1b61c..bb96e17b0eaa5 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -635,6 +635,13 @@ def _create_ti_state_update_query_and_update_state( # clear the next_method and next_kwargs so that none of the retries pick them up updated_state = TaskInstanceState.UP_FOR_RESCHEDULE query = query.values(state=updated_state, next_method=None, next_kwargs=None) + elif type(ti_patch_payload).__name__ == "TITargetStatePayload": + # Generic target-state transition (e.g. CHECKPOINTED). No additional + # bookkeeping beyond setting the state — persistence of any + # state-specific payload (e.g. checkpoint_data) is the open AIP-96 + # question not addressed here. + updated_state = TaskInstanceState(ti_patch_payload.state.value) + query = query.values(state=updated_state) else: raise ValueError(f"Unexpected Payload Type {type(ti_patch_payload)}") diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index 54927794bf17e..025fdd1594920 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -281,6 +281,19 @@ def reschedule(self, id: uuid.UUID, msg: RescheduleTask): # Create a reschedule state payload from msg self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json()) + def checkpoint( + self, + id: uuid.UUID, + end_date: datetime | None = None, + rendered_map_index: str | None = None, + ) -> None: + """Tell the API server that this TI has reached a stable checkpoint and paused.""" + from airflow.sdk.api.datamodels._generated import TITargetStatePayload + + del end_date, rendered_map_index # reserved for future persistence; current API server does not store these + body = TITargetStatePayload(state=TaskInstanceState.CHECKPOINTED) + self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json()) + def heartbeat(self, id: uuid.UUID, pid: int): body = TIHeartbeatInfo(pid=pid, hostname=get_hostname()) self.client.put(f"task-instances/{id}/heartbeat", content=body.model_dump_json()) diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py index 375c5a9e30b8e..5ef521990d2b0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py @@ -166,6 +166,7 @@ TaskInstanceState.UP_FOR_RESCHEDULE, TaskInstanceState.UP_FOR_RETRY, TaskInstanceState.SUCCESS, + TaskInstanceState.CHECKPOINTED, SERVER_TERMINATED, ] @@ -1397,6 +1398,12 @@ def _handle_request(self, msg: ToSupervisor, log: FilteringBoundLogger, req_id: self._terminal_state = msg.state self._task_end_time_monotonic = time.monotonic() self._rendered_map_index = msg.rendered_map_index + if msg.state == TaskInstanceState.CHECKPOINTED: + self.client.task_instances.checkpoint( + id=self.id, + end_date=msg.end_date, + rendered_map_index=self._rendered_map_index, + ) elif isinstance(msg, SucceedTask): self._terminal_state = msg.state self._task_end_time_monotonic = time.monotonic() diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py index 26e2a7e66bb6e..38c94481cdcd1 100644 --- a/task-sdk/tests/task_sdk/api/test_client.py +++ b/task-sdk/tests/task_sdk/api/test_client.py @@ -482,6 +482,19 @@ def handle_request(request: httpx.Request) -> httpx.Response: ) client.task_instances.reschedule(ti_id, msg) + def test_task_instance_checkpoint(self): + ti_id = uuid6.uuid7() + + def handle_request(request: httpx.Request) -> httpx.Response: + if request.url.path == f"/task-instances/{ti_id}/state": + actual_body = json.loads(request.read()) + assert actual_body["state"] == "checkpointed" + return httpx.Response(status_code=204) + return httpx.Response(status_code=400, json={"detail": "Bad Request"}) + + client = make_client(transport=httpx.MockTransport(handle_request)) + client.task_instances.checkpoint(ti_id) + def test_task_instance_up_for_retry(self): ti_id = uuid6.uuid7() From e0fdd4f3df85e9252398de01f746657e80110a75 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:35:51 -0700 Subject: [PATCH 07/12] Rename newsfragment to match PR number --- .../{aip96-supervisor-wiring.feature.rst => 66445.feature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{aip96-supervisor-wiring.feature.rst => 66445.feature.rst} (100%) diff --git a/airflow-core/newsfragments/aip96-supervisor-wiring.feature.rst b/airflow-core/newsfragments/66445.feature.rst similarity index 100% rename from airflow-core/newsfragments/aip96-supervisor-wiring.feature.rst rename to airflow-core/newsfragments/66445.feature.rst From 11b7d17441e66d5d4d1ed19e906ca4d2ad70891a Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:15 -0700 Subject: [PATCH 08/12] fix: newsfragment must have empty second line (lint rule) --- airflow-core/newsfragments/66445.feature.rst | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/airflow-core/newsfragments/66445.feature.rst b/airflow-core/newsfragments/66445.feature.rst index d77d085d6a25b..6eeafa360664e 100644 --- a/airflow-core/newsfragments/66445.feature.rst +++ b/airflow-core/newsfragments/66445.feature.rst @@ -1,6 +1,4 @@ -Wires ``CHECKPOINTED`` (introduced in #66402) through the supervisor → -API server path so the DB row actually transitions to ``checkpointed`` -when an operator raises ``AirflowTaskCheckpointed``. +Wire ``CHECKPOINTED`` (introduced in #66402) through the supervisor → API server path so the DB row transitions to ``checkpointed`` when an operator raises ``AirflowTaskCheckpointed``. - ``CHECKPOINTED`` is added to ``STATES_SENT_DIRECTLY`` so the supervisor does not route it through the terminal-state ``finish()`` endpoint. @@ -11,8 +9,8 @@ when an operator raises ``AirflowTaskCheckpointed``. ``TITargetStatePayload(state=CHECKPOINTED)`` — the same shape ``DEFERRED`` and ``UP_FOR_RESCHEDULE`` already use. -``checkpoint_data`` persistence is intentionally not added in this PR. -The right shape (XCom-backed, new TaskInstance JSON column, separate -metadata table) is the open AIP-96 question; this PR ships the -state-transition wiring so the DB lands at ``checkpointed`` while the -persistence question is debated. +``checkpoint_data`` persistence is intentionally not added in this PR. The +right shape (XCom-backed, new TaskInstance JSON column, separate metadata +table) is the open AIP-96 question; this PR ships the state-transition +wiring so the DB lands at ``checkpointed`` while the persistence question +is debated. From c3e44909e3adae548af529e0337f8cf82e1fd846 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:30:14 -0700 Subject: [PATCH 09/12] fix newsfragment type: feature -> significant (multi-line bodies require significant type) --- .../newsfragments/{66445.feature.rst => 66445.significant.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{66445.feature.rst => 66445.significant.rst} (100%) diff --git a/airflow-core/newsfragments/66445.feature.rst b/airflow-core/newsfragments/66445.significant.rst similarity index 100% rename from airflow-core/newsfragments/66445.feature.rst rename to airflow-core/newsfragments/66445.significant.rst From 52808e01083b33300be071739ac05feb6df06c0b Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:33:03 -0700 Subject: [PATCH 10/12] fix: ruff format on supervisor client + test_task_runner.py --- task-sdk/src/airflow/sdk/api/client.py | 5 ++++- task-sdk/tests/task_sdk/execution_time/test_task_runner.py | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index 025fdd1594920..2bc98a06c0355 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -290,7 +290,10 @@ def checkpoint( """Tell the API server that this TI has reached a stable checkpoint and paused.""" from airflow.sdk.api.datamodels._generated import TITargetStatePayload - del end_date, rendered_map_index # reserved for future persistence; current API server does not store these + del ( + end_date, + rendered_map_index, + ) # reserved for future persistence; current API server does not store these body = TITargetStatePayload(state=TaskInstanceState.CHECKPOINTED) self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json()) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 03b290eb05e73..4a6a68165fd3c 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4769,9 +4769,7 @@ class TestTaskCheckpointed: pytest.param([1, 2, 3], id="list-payload"), ], ) - def test_run_returns_checkpointed_state( - self, checkpoint_data, create_runtime_ti, mock_supervisor_comms - ): + def test_run_returns_checkpointed_state(self, checkpoint_data, create_runtime_ti, mock_supervisor_comms): """``run()`` reports CHECKPOINTED when the operator raises ``AirflowTaskCheckpointed``. The exception's ``checkpoint_data`` payload is preserved on the exception object regardless of shape; persistence From 828a16252f7efc92b81efd85729ec2e0bd8ba023 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:34:56 -0700 Subject: [PATCH 11/12] remove PR-E's newsfragment (this branch stacks on #66402 + #66410; their newsfragments live there) --- .../newsfragments/66402.significant.rst | 25 ------------------- 1 file changed, 25 deletions(-) delete mode 100644 airflow-core/newsfragments/66402.significant.rst diff --git a/airflow-core/newsfragments/66402.significant.rst b/airflow-core/newsfragments/66402.significant.rst deleted file mode 100644 index 5569c5b3bfa04..0000000000000 --- a/airflow-core/newsfragments/66402.significant.rst +++ /dev/null @@ -1,25 +0,0 @@ -AIP-96 (Resumable Operators) foundation — adds the ``CHECKPOINTED`` task -instance state and the ``AirflowTaskCheckpointed`` exception so an operator -can signal "I have reached a stable checkpoint and intend to pause": - -.. code-block:: python - - from airflow.sdk.exceptions import AirflowTaskCheckpointed - - class ResumablePythonOperator(PythonOperator): - def execute(self, context): - for step in range(self.total_steps): - if self.should_pause(): - raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) - self.do_work(step) - -The worker catches the exception and reports ``CHECKPOINTED`` state. -``CHECKPOINTED`` is an intermediate state and is included in -``State.unfinished``. - -This change intentionally ships only the vocabulary plus the -worker-side state transition. ``checkpoint_data`` persistence, -scheduler auto-resume semantics, the listener hook, and downstream -trigger-rule integration are deferred to follow-up PRs so the API -shape can be discussed against a real artifact before committing to a -single resumption policy. From 0667f297d46d69bd84802939127bebe3184cf6da Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 21:53:44 -0700 Subject: [PATCH 12/12] fix: use IntermediateTIState.CHECKPOINTED + regenerate openapi specs mypy on task-sdk: TITargetStatePayload.state expects IntermediateTIState, not the broader TaskInstanceState. Both enums have CHECKPOINTED with the same string value 'checkpointed'. Also regenerated the openapi specs and airflowctl datamodel to include the CHECKPOINTED state from PR-E (#66402). --- .../src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml | 1 + .../api_fastapi/core_api/openapi/v2-rest-api-generated.yaml | 1 + airflow-ctl/src/airflowctl/api/datamodels/generated.py | 1 + .../airflow/providers/edge3/worker_api/v2-edge-generated.yaml | 1 + task-sdk/src/airflow/sdk/api/client.py | 4 ++-- 5 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 2983263bbc59b..bdf69e69be527 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -3546,6 +3546,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5ed96855c24ac..64a120d893c37 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -14445,6 +14445,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in. diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 276c8699de058..d549d78711481 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -841,6 +841,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = "upstream_failed" SKIPPED = "skipped" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class TaskInstancesBatchBody(BaseModel): diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml index 01c8149d1dad8..e894eda0837dd 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml @@ -1274,6 +1274,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in. diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py index 2bc98a06c0355..e4c8f61e0b779 100644 --- a/task-sdk/src/airflow/sdk/api/client.py +++ b/task-sdk/src/airflow/sdk/api/client.py @@ -288,13 +288,13 @@ def checkpoint( rendered_map_index: str | None = None, ) -> None: """Tell the API server that this TI has reached a stable checkpoint and paused.""" - from airflow.sdk.api.datamodels._generated import TITargetStatePayload + from airflow.sdk.api.datamodels._generated import IntermediateTIState, TITargetStatePayload del ( end_date, rendered_map_index, ) # reserved for future persistence; current API server does not store these - body = TITargetStatePayload(state=TaskInstanceState.CHECKPOINTED) + body = TITargetStatePayload(state=IntermediateTIState.CHECKPOINTED) self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json()) def heartbeat(self, id: uuid.UUID, pid: int):