From 31e50c785046bc30a3f8c8ddf2122c27996741a5 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 03:05:13 -0700 Subject: [PATCH 1/8] AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the operator-author API surface can be discussed against running code. - CHECKPOINTED added to TaskInstanceState (intermediate state, in State.unfinished, lightyellow in the UI) - AirflowTaskCheckpointed exception with optional checkpoint_data payload, mirroring the AirflowRescheduleException pattern (serializable, single domain payload) - run() catches the exception and reports CHECKPOINTED state to the supervisor Persistence of checkpoint_data, scheduler auto-resume semantics, the listener hook (#66410), and downstream trigger-rule integration are intentionally out of scope here so each can be discussed separately. --- .../newsfragments/66402.significant.rst | 25 +++++++++++++++ airflow-core/src/airflow/utils/state.py | 5 +++ task-sdk/src/airflow/sdk/exceptions.py | 27 ++++++++++++++++ .../airflow/sdk/execution_time/task_runner.py | 9 ++++++ .../execution_time/test_task_runner.py | 31 +++++++++++++++++++ 5 files changed, 97 insertions(+) create mode 100644 airflow-core/newsfragments/66402.significant.rst diff --git a/airflow-core/newsfragments/66402.significant.rst b/airflow-core/newsfragments/66402.significant.rst new file mode 100644 index 0000000000000..5569c5b3bfa04 --- /dev/null +++ b/airflow-core/newsfragments/66402.significant.rst @@ -0,0 +1,25 @@ +AIP-96 (Resumable Operators) foundation — adds the ``CHECKPOINTED`` task +instance state and the ``AirflowTaskCheckpointed`` exception so an operator +can signal "I have reached a stable checkpoint and intend to pause": + +.. 
code-block:: python + + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + class ResumablePythonOperator(PythonOperator): + def execute(self, context): + for step in range(self.total_steps): + if self.should_pause(): + raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) + self.do_work(step) + +The worker catches the exception and reports ``CHECKPOINTED`` state. +``CHECKPOINTED`` is an intermediate state and is included in +``State.unfinished``. + +This change intentionally ships only the vocabulary plus the +worker-side state transition. ``checkpoint_data`` persistence, +scheduler auto-resume semantics, the listener hook, and downstream +trigger-rule integration are deferred to follow-up PRs so the API +shape can be discussed against a real artifact before committing to a +single resumption policy. diff --git a/airflow-core/src/airflow/utils/state.py b/airflow-core/src/airflow/utils/state.py index 332efb105533d..890043f094656 100644 --- a/airflow-core/src/airflow/utils/state.py +++ b/airflow-core/src/airflow/utils/state.py @@ -56,6 +56,7 @@ class IntermediateTIState(str, Enum): UP_FOR_RETRY = "up_for_retry" UP_FOR_RESCHEDULE = "up_for_reschedule" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" def __str__(self) -> str: return self.value @@ -87,6 +88,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED # One or more upstream deps failed SKIPPED = TerminalTIState.SKIPPED # Skipped by branching or some other mechanism DEFERRED = IntermediateTIState.DEFERRED # Deferrable operator waiting on a trigger + CHECKPOINTED = IntermediateTIState.CHECKPOINTED # Operator paused at a stable checkpoint def __str__(self) -> str: return self.value @@ -130,6 +132,7 @@ class State: UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED SKIPPED = TaskInstanceState.SKIPPED DEFERRED = TaskInstanceState.DEFERRED + CHECKPOINTED = TaskInstanceState.CHECKPOINTED finished_dr_states: frozenset[DagRunState] = 
frozenset([DagRunState.SUCCESS, DagRunState.FAILED]) unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING]) @@ -157,6 +160,7 @@ class State: TaskInstanceState.REMOVED: "lightgrey", TaskInstanceState.SCHEDULED: "tan", TaskInstanceState.DEFERRED: "mediumpurple", + TaskInstanceState.CHECKPOINTED: "lightyellow", } @classmethod @@ -200,6 +204,7 @@ def color_fg(cls, state): TaskInstanceState.UP_FOR_RETRY, TaskInstanceState.UP_FOR_RESCHEDULE, TaskInstanceState.DEFERRED, + TaskInstanceState.CHECKPOINTED, ] ) """ diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index 7d42dad5d8502..cbde73db11224 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -165,6 +165,33 @@ class AirflowSkipException(AirflowException): """Raise when the task should be skipped.""" +class AirflowTaskCheckpointed(AirflowException): + """ + Raise when the operator has reached a stable checkpoint and intends to pause. + + The worker reports the task as CHECKPOINTED and keeps the optional + ``checkpoint_data`` on the in-memory task instance only; it is not yet persisted. + + This is the AIP-96 (Resumable Operators) foundation primitive. Auto-resume + semantics, scheduler treatment, and downstream trigger-rule integration are + intentionally deferred to follow-ups so the API shape can be discussed + without committing to a single resumption policy. + + :param checkpoint_data: Arbitrary serializable payload representing the + operator's resume point. Persistence and resume wiring are out of scope + for this foundation; the parameter exists so listeners and operator + authors can iterate against the final shape. 
+ """ + + def __init__(self, checkpoint_data=None): + super().__init__() + self.checkpoint_data = checkpoint_data + + def serialize(self): + cls = self.__class__ + return f"{cls.__module__}.{cls.__name__}", (), {"checkpoint_data": self.checkpoint_data} + + class AirflowTaskTerminated(BaseException): """Raise when the task execution is terminated.""" diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..905d4427f3746 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1342,6 +1342,15 @@ def _on_term(signum, frame): rendered_map_index=ti.rendered_map_index, ) state = TaskInstanceState.SKIPPED + except AirflowTaskCheckpointed as checkpoint: + log.info("Task checkpointed; reporting CHECKPOINTED state.") + msg = TaskState( + state=TaskInstanceState.CHECKPOINTED, + end_date=datetime.now(tz=timezone.utc), + rendered_map_index=ti.rendered_map_index, + ) + state = TaskInstanceState.CHECKPOINTED + ti._checkpoint_data = checkpoint.checkpoint_data # type: ignore[attr-defined] except AirflowRescheduleException as reschedule: log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE") msg = RescheduleTask( diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 630aff9094ed1..03b290eb05e73 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4758,6 +4758,37 @@ def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only( mock_supervisor_comms.send.assert_any_call(msg) +class TestTaskCheckpointed: + """AIP-96 foundation: AirflowTaskCheckpointed -> CHECKPOINTED state.""" + + @pytest.mark.parametrize( + "checkpoint_data", + [ + pytest.param(None, id="no-payload"), + pytest.param({"step": 3, "iterator_offset": 1024}, 
id="dict-payload"), + pytest.param([1, 2, 3], id="list-payload"), + ], + ) + def test_run_returns_checkpointed_state( + self, checkpoint_data, create_runtime_ti, mock_supervisor_comms + ): + """``run()`` reports CHECKPOINTED when the operator raises + ``AirflowTaskCheckpointed``. The exception's ``checkpoint_data`` payload + is preserved on the exception object regardless of shape; persistence + and resume wiring are out of scope for the foundation PR.""" + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + def _raise_checkpointed(): + raise AirflowTaskCheckpointed(checkpoint_data=checkpoint_data) + + task = PythonOperator(task_id="checkpointed_task", python_callable=_raise_checkpointed) + ti = create_runtime_ti(task=task) + + state, _msg, _error = run(ti, context=ti.get_template_context(), log=mock.MagicMock()) + + assert state == TaskInstanceState.CHECKPOINTED + + class TestTaskInstanceMetrics: def test_ti_start_metric_emitted(self, create_runtime_ti, mock_supervisor_comms): """Test that ti.start metric is emitted at the beginning of task.""" From 3dd7d637d4c5f4754772c3d02b7442589449316b Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:18:26 -0700 Subject: [PATCH 2/8] fix: import AirflowTaskCheckpointed in run() so the except clause resolves --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 905d4427f3746..bfbf124611fb5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1248,6 +1248,7 @@ def run( AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, + AirflowTaskCheckpointed, AirflowTaskTerminated, DagRunTriggerException, DownstreamTasksSkipped, From 9c5ffa2e3f54a00c4a78f7a31d79d7933b9b7a60 Mon Sep 17 00:00:00 2001 From: 1fanwang 
<1fannnw@gmail.com> Date: Tue, 5 May 2026 10:22:39 -0700 Subject: [PATCH 3/8] fix: add CHECKPOINTED to task-sdk datamodel enum (mirrors airflow.utils.state) --- task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index b5b100154c389..4a0cd29b86dd9 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -365,6 +365,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = "upstream_failed" SKIPPED = "skipped" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class TaskStatePutBody(BaseModel): From 7e25b73a4faaf09e99487ad781bbf1970f6ab326 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:24:45 -0700 Subject: [PATCH 4/8] fix: allow CHECKPOINTED in TaskState supervisor message Literal --- task-sdk/src/airflow/sdk/execution_time/comms.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py index 87c7881333ad4..ded750db1840c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/comms.py +++ b/task-sdk/src/airflow/sdk/execution_time/comms.py @@ -754,6 +754,7 @@ class TaskState(BaseModel): TaskInstanceState.FAILED, TaskInstanceState.SKIPPED, TaskInstanceState.REMOVED, + TaskInstanceState.CHECKPOINTED, ] end_date: datetime | None = None type: Literal["TaskState"] = "TaskState" From ea29e2a7d6c02525f1c660da9af4cf2f095f4dc5 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:27:43 -0700 Subject: [PATCH 5/8] fix: add CHECKPOINTED to task-sdk IntermediateTIState datamodel --- task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 4a0cd29b86dd9..306cd386c38ac 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -198,6 +198,7 @@ class IntermediateTIState(str, Enum): UP_FOR_RETRY = "up_for_retry" UP_FOR_RESCHEDULE = "up_for_reschedule" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class PrevSuccessfulDagRunResponse(BaseModel): From 92b31aa0d77f16b32da89df70ef048a50b2090c9 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:12 -0700 Subject: [PATCH 6/8] fix: newsfragment must have empty second line (lint rule) --- .../newsfragments/66402.significant.rst | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/airflow-core/newsfragments/66402.significant.rst b/airflow-core/newsfragments/66402.significant.rst index 5569c5b3bfa04..7829297e3bedb 100644 --- a/airflow-core/newsfragments/66402.significant.rst +++ b/airflow-core/newsfragments/66402.significant.rst @@ -1,6 +1,10 @@ -AIP-96 (Resumable Operators) foundation — adds the ``CHECKPOINTED`` task -instance state and the ``AirflowTaskCheckpointed`` exception so an operator -can signal "I have reached a stable checkpoint and intend to pause": +Add the ``CHECKPOINTED`` task instance state and the ``AirflowTaskCheckpointed`` exception (AIP-96 foundation). + +An operator can raise ``AirflowTaskCheckpointed(checkpoint_data=...)`` from +``execute()`` to signal "I have reached a stable checkpoint and intend to +pause". The worker catches the exception and reports the ``CHECKPOINTED`` +state. ``CHECKPOINTED`` is an intermediate state and is included in +``State.unfinished``. .. code-block:: python @@ -13,13 +17,8 @@ can signal "I have reached a stable checkpoint and intend to pause": raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) self.do_work(step) -The worker catches the exception and reports ``CHECKPOINTED`` state. 
-``CHECKPOINTED`` is an intermediate state and is included in -``State.unfinished``. - -This change intentionally ships only the vocabulary plus the -worker-side state transition. ``checkpoint_data`` persistence, -scheduler auto-resume semantics, the listener hook, and downstream -trigger-rule integration are deferred to follow-up PRs so the API -shape can be discussed against a real artifact before committing to a -single resumption policy. +This change intentionally ships only the vocabulary plus the worker-side +state transition. ``checkpoint_data`` persistence, scheduler auto-resume +semantics, the listener hook, and downstream trigger-rule integration are +deferred to follow-up PRs so the API shape can be discussed against a real +artifact before committing to a single resumption policy. From cf881f268f8e14e98f9eece5f184868a93f9950d Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:32:34 -0700 Subject: [PATCH 7/8] fix: ruff format on test_task_runner.py --- task-sdk/tests/task_sdk/execution_time/test_task_runner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 03b290eb05e73..4a6a68165fd3c 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4769,9 +4769,7 @@ class TestTaskCheckpointed: pytest.param([1, 2, 3], id="list-payload"), ], ) - def test_run_returns_checkpointed_state( - self, checkpoint_data, create_runtime_ti, mock_supervisor_comms - ): + def test_run_returns_checkpointed_state(self, checkpoint_data, create_runtime_ti, mock_supervisor_comms): """``run()`` reports CHECKPOINTED when the operator raises ``AirflowTaskCheckpointed``. 
The exception's ``checkpoint_data`` payload is preserved on the exception object regardless of shape; persistence From 237f6abf44ed03ec8d50598f64a416945cdb62c2 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 21:48:45 -0700 Subject: [PATCH 8/8] regenerate openapi specs + datamodels for CHECKPOINTED state --- .../src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml | 1 + .../api_fastapi/core_api/openapi/v2-rest-api-generated.yaml | 1 + airflow-ctl/src/airflowctl/api/datamodels/generated.py | 1 + .../airflow/providers/edge3/worker_api/v2-edge-generated.yaml | 1 + 4 files changed, 4 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 2983263bbc59b..bdf69e69be527 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -3546,6 +3546,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5ed96855c24ac..64a120d893c37 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -14445,6 +14445,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in. 
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 276c8699de058..d549d78711481 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -841,6 +841,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = "upstream_failed" SKIPPED = "skipped" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class TaskInstancesBatchBody(BaseModel): diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml index 01c8149d1dad8..e894eda0837dd 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml @@ -1274,6 +1274,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in.