From c5024469bbca7cd9abc8b22807f07b5598ef8017 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:30:52 -0700 Subject: [PATCH 01/16] AIP-96: add CHECKPOINTED state and AirflowTaskCheckpointed exception Foundation for AIP-96 (Resumable Operators). Adds the vocabulary needed to discuss the resumable-operator API shape against a real artifact: - ``CHECKPOINTED`` task instance state (intermediate, lightyellow in UI), added to ``State.unfinished`` so downstream trigger rules treat it as "not yet finished". - ``AirflowTaskCheckpointed`` exception with optional ``checkpoint_data`` payload, mirroring the ``AirflowRescheduleException`` pattern (serializable, accepts a single domain payload). - Worker handler in ``run()`` that catches the exception and reports the ``CHECKPOINTED`` state to the supervisor. - Parametrized test ``TestTaskCheckpointed.test_run_returns_checkpointed_state`` covering ``checkpoint_data`` shapes (``None``, dict, list). Intentionally deferred to follow-ups so the API shape can be discussed against a real artifact before committing to a single policy: - ``checkpoint_data`` persistence and resume wiring. - Scheduler treatment of ``CHECKPOINTED`` (auto-resume after delay vs. manual API-driven resume only). - New listener hook ``on_task_instance_checkpointed`` and downstream observer integration. - Trigger-rule semantics for downstream tasks of a ``CHECKPOINTED`` upstream. Marking draft so the design discussion can happen on the PR. --- ...96-checkpointed-foundation.significant.rst | 25 +++++++++++++++ airflow-core/src/airflow/utils/state.py | 8 +++++ task-sdk/src/airflow/sdk/exceptions.py | 27 ++++++++++++++++ .../airflow/sdk/execution_time/task_runner.py | 17 ++++++++++ .../execution_time/test_task_runner.py | 31 +++++++++++++++++++ 5 files changed, 108 insertions(+) create mode 100644 airflow-core/newsfragments/aip96-checkpointed-foundation.significant.rst diff --git a/airflow-core/newsfragments/aip96-checkpointed-foundation.significant.rst b/airflow-core/newsfragments/aip96-checkpointed-foundation.significant.rst new file mode 100644 index 0000000000000..5569c5b3bfa04 --- /dev/null +++ b/airflow-core/newsfragments/aip96-checkpointed-foundation.significant.rst @@ -0,0 +1,25 @@ +AIP-96 (Resumable Operators) foundation — adds the ``CHECKPOINTED`` task +instance state and the ``AirflowTaskCheckpointed`` exception so an operator +can signal "I have reached a stable checkpoint and intend to pause": + +.. code-block:: python + + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + class ResumablePythonOperator(PythonOperator): + def execute(self, context): + for step in range(self.total_steps): + if self.should_pause(): + raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) + self.do_work(step) + +The worker catches the exception and reports ``CHECKPOINTED`` state. +``CHECKPOINTED`` is an intermediate state and is included in +``State.unfinished``. + +This change intentionally ships only the vocabulary plus the +worker-side state transition. ``checkpoint_data`` persistence, +scheduler auto-resume semantics, the listener hook, and downstream +trigger-rule integration are deferred to follow-up PRs so the API +shape can be discussed against a real artifact before committing to a +single resumption policy. diff --git a/airflow-core/src/airflow/utils/state.py b/airflow-core/src/airflow/utils/state.py index 332efb105533d..83b468c01d68d 100644 --- a/airflow-core/src/airflow/utils/state.py +++ b/airflow-core/src/airflow/utils/state.py @@ -56,6 +56,7 @@ class IntermediateTIState(str, Enum): UP_FOR_RETRY = "up_for_retry" UP_FOR_RESCHEDULE = "up_for_reschedule" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" def __str__(self) -> str: return self.value @@ -87,6 +88,10 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED # One or more upstream deps failed SKIPPED = TerminalTIState.SKIPPED # Skipped by branching or some other mechanism DEFERRED = IntermediateTIState.DEFERRED # Deferrable operator waiting on a trigger + # Operator reached a stable checkpoint and paused; resume requires explicit + # action (manual API call) for now. AIP-96 will define automatic resume + # semantics; this state is the foundation for that discussion. + CHECKPOINTED = IntermediateTIState.CHECKPOINTED def __str__(self) -> str: return self.value @@ -130,6 +135,7 @@ class State: UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED SKIPPED = TaskInstanceState.SKIPPED DEFERRED = TaskInstanceState.DEFERRED + CHECKPOINTED = TaskInstanceState.CHECKPOINTED finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED]) unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING]) @@ -157,6 +163,7 @@ class State: TaskInstanceState.REMOVED: "lightgrey", TaskInstanceState.SCHEDULED: "tan", TaskInstanceState.DEFERRED: "mediumpurple", + TaskInstanceState.CHECKPOINTED: "lightyellow", } @classmethod @@ -200,6 +207,7 @@ def color_fg(cls, state): TaskInstanceState.UP_FOR_RETRY, TaskInstanceState.UP_FOR_RESCHEDULE, TaskInstanceState.DEFERRED, + TaskInstanceState.CHECKPOINTED, ] ) """ diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index 7d42dad5d8502..cbde73db11224 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -165,6 +165,33 @@ class AirflowSkipException(AirflowException): """Raise when the task should be skipped.""" +class AirflowTaskCheckpointed(AirflowException): + """ + Raise when the operator has reached a stable checkpoint and intends to pause. + + The worker reports the task as CHECKPOINTED and persists the optional + ``checkpoint_data`` so a subsequent run can resume from the same point. + + This is the AIP-96 (Resumable Operators) foundation primitive. Auto-resume + semantics, scheduler treatment, and downstream trigger-rule integration are + intentionally deferred to follow-ups so the API shape can be discussed + without committing to a single resumption policy. + + :param checkpoint_data: Arbitrary serializable payload representing the + operator's resume point. Persistence and resume wiring are out of scope + for this foundation; the parameter exists so listeners and operator + authors can iterate against the final shape. + """ + + def __init__(self, checkpoint_data=None): + super().__init__() + self.checkpoint_data = checkpoint_data + + def serialize(self): + cls = self.__class__ + return f"{cls.__module__}.{cls.__name__}", (), {"checkpoint_data": self.checkpoint_data} + + class AirflowTaskTerminated(BaseException): """Raise when the task execution is terminated.""" diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..2bed358ed33dc 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1342,6 +1342,23 @@ def _on_term(signum, frame): rendered_map_index=ti.rendered_map_index, ) state = TaskInstanceState.SKIPPED + except AirflowTaskCheckpointed as checkpoint: + # AIP-96 foundation: the operator reached a stable checkpoint and is + # opting to pause. ``checkpoint_data`` persistence and resume wiring are + # out of scope for this foundation PR; this branch establishes the + # state-transition contract so listeners and operator authors can + # iterate against the final shape on a real artifact. + log.info("Task checkpointed; reporting CHECKPOINTED state.") + msg = TaskState( + state=TaskInstanceState.CHECKPOINTED, + end_date=datetime.now(tz=timezone.utc), + rendered_map_index=ti.rendered_map_index, + ) + state = TaskInstanceState.CHECKPOINTED + # Reference the payload so static analysis flags accidental drops once + # persistence wiring lands; the foundation PR intentionally does not + # forward this to the supervisor yet. + _ = checkpoint.checkpoint_data except AirflowRescheduleException as reschedule: log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE") msg = RescheduleTask( diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 630aff9094ed1..03b290eb05e73 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4758,6 +4758,37 @@ def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only( mock_supervisor_comms.send.assert_any_call(msg) +class TestTaskCheckpointed: + """AIP-96 foundation: AirflowTaskCheckpointed -> CHECKPOINTED state.""" + + @pytest.mark.parametrize( + "checkpoint_data", + [ + pytest.param(None, id="no-payload"), + pytest.param({"step": 3, "iterator_offset": 1024}, id="dict-payload"), + pytest.param([1, 2, 3], id="list-payload"), + ], + ) + def test_run_returns_checkpointed_state( + self, checkpoint_data, create_runtime_ti, mock_supervisor_comms + ): + """``run()`` reports CHECKPOINTED when the operator raises + ``AirflowTaskCheckpointed``. The exception's ``checkpoint_data`` payload + is preserved on the exception object regardless of shape; persistence + and resume wiring are out of scope for the foundation PR.""" + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + def _raise_checkpointed(): + raise AirflowTaskCheckpointed(checkpoint_data=checkpoint_data) + + task = PythonOperator(task_id="checkpointed_task", python_callable=_raise_checkpointed) + ti = create_runtime_ti(task=task) + + state, _msg, _error = run(ti, context=ti.get_template_context(), log=mock.MagicMock()) + + assert state == TaskInstanceState.CHECKPOINTED + + class TestTaskInstanceMetrics: def test_ti_start_metric_emitted(self, create_runtime_ti, mock_supervisor_comms): """Test that ti.start metric is emitted at the beginning of task.""" From b64d9976cd648db5b24db13a86258216fc4a53e0 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:31:44 -0700 Subject: [PATCH 02/16] Rename newsfragment to match PR number --- ...ckpointed-foundation.significant.rst => 66402.significant.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{aip96-checkpointed-foundation.significant.rst => 66402.significant.rst} (100%) diff --git a/airflow-core/newsfragments/aip96-checkpointed-foundation.significant.rst b/airflow-core/newsfragments/66402.significant.rst similarity index 100% rename from airflow-core/newsfragments/aip96-checkpointed-foundation.significant.rst rename to airflow-core/newsfragments/66402.significant.rst From 882780f6cc470b33e533debf27be8a0f92f6c6c4 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 02:57:04 -0700 Subject: [PATCH 03/16] Strip task/AIP-flavored source comments --- airflow-core/src/airflow/utils/state.py | 5 +---- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 10 +--------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/airflow-core/src/airflow/utils/state.py b/airflow-core/src/airflow/utils/state.py index 83b468c01d68d..890043f094656 100644 --- a/airflow-core/src/airflow/utils/state.py +++ b/airflow-core/src/airflow/utils/state.py @@ -88,10 +88,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED # One or more upstream deps failed SKIPPED = TerminalTIState.SKIPPED # Skipped by branching or some other mechanism DEFERRED = IntermediateTIState.DEFERRED # Deferrable operator waiting on a trigger - # Operator reached a stable checkpoint and paused; resume requires explicit - # action (manual API call) for now. AIP-96 will define automatic resume - # semantics; this state is the foundation for that discussion. - CHECKPOINTED = IntermediateTIState.CHECKPOINTED + CHECKPOINTED = IntermediateTIState.CHECKPOINTED # Operator paused at a stable checkpoint def __str__(self) -> str: return self.value diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 2bed358ed33dc..905d4427f3746 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1343,11 +1343,6 @@ def _on_term(signum, frame): ) state = TaskInstanceState.SKIPPED except AirflowTaskCheckpointed as checkpoint: - # AIP-96 foundation: the operator reached a stable checkpoint and is - # opting to pause. ``checkpoint_data`` persistence and resume wiring are - # out of scope for this foundation PR; this branch establishes the - # state-transition contract so listeners and operator authors can - # iterate against the final shape on a real artifact. log.info("Task checkpointed; reporting CHECKPOINTED state.") msg = TaskState( state=TaskInstanceState.CHECKPOINTED, @@ -1355,10 +1350,7 @@ def _on_term(signum, frame): rendered_map_index=ti.rendered_map_index, ) state = TaskInstanceState.CHECKPOINTED - # Reference the payload so static analysis flags accidental drops once - # persistence wiring lands; the foundation PR intentionally does not - # forward this to the supervisor yet. - _ = checkpoint.checkpoint_data + ti._checkpoint_data = checkpoint.checkpoint_data # type: ignore[attr-defined] except AirflowRescheduleException as reschedule: log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE") msg = RescheduleTask( From 485b22e40a9f571d64eb752ee69d83e3c6657ec7 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 03:01:06 -0700 Subject: [PATCH 04/16] Add on_task_instance_checkpointed listener hook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hookspec, finalize() emit, example listener handler, and test that the operator-supplied checkpoint_data is delivered to listeners. Stacks on the CHECKPOINTED state introduced in #66402; merge that one first. The hook fires from finalize() when run() catches AirflowTaskCheckpointed and reports CHECKPOINTED. checkpoint_data is stashed on the runtime ti inside the catch and read inside finalize() — the supervisor message does not carry the payload yet (separate question). --- .../aip96-checkpointed-listener.feature.rst | 9 ++++ .../example_dags/plugins/event_listener.py | 20 +++++++++ .../listeners/spec/taskinstance.py | 22 ++++++++++ .../airflow/sdk/execution_time/task_runner.py | 9 ++++ .../execution_time/test_task_runner.py | 44 ++++++++++++++++--- 5 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 airflow-core/newsfragments/aip96-checkpointed-listener.feature.rst diff --git a/airflow-core/newsfragments/aip96-checkpointed-listener.feature.rst b/airflow-core/newsfragments/aip96-checkpointed-listener.feature.rst new file mode 100644 index 0000000000000..f0160092a28a1 --- /dev/null +++ b/airflow-core/newsfragments/aip96-checkpointed-listener.feature.rst @@ -0,0 +1,9 @@ +Adds the ``on_task_instance_checkpointed`` listener hook so listener +authors observe checkpoint events alongside ``running`` / +``success`` / ``failed`` / ``skipped``. Receives ``previous_state``, +``task_instance``, and the operator-supplied ``checkpoint_data`` +payload. + +The hook fires from the worker's ``finalize()`` when the operator +raises ``AirflowTaskCheckpointed`` and ``run()`` reports the +``CHECKPOINTED`` state. diff --git a/airflow-core/src/airflow/example_dags/plugins/event_listener.py b/airflow-core/src/airflow/example_dags/plugins/event_listener.py index 91af9f5ccc6df..252ce2b65f344 100644 --- a/airflow-core/src/airflow/example_dags/plugins/event_listener.py +++ b/airflow-core/src/airflow/example_dags/plugins/event_listener.py @@ -173,6 +173,26 @@ def on_task_instance_skipped( # [END howto_listen_ti_skipped_task] +# [START howto_listen_ti_checkpointed_task] +@hookimpl +def on_task_instance_checkpointed( + previous_state: TaskInstanceState | None, + task_instance: RuntimeTaskInstance | TaskInstance, + checkpoint_data, +): + """ + Called when a task reaches a stable checkpoint and pauses. + + The operator raised ``AirflowTaskCheckpointed`` from ``execute()`` and the + worker reported ``CHECKPOINTED`` state. ``checkpoint_data`` is the payload + the operator passed when raising; it can be ``None``. + """ + print(f"Task instance reached a checkpoint (data={checkpoint_data!r})") + + +# [END howto_listen_ti_checkpointed_task] + + # [START howto_listen_dagrun_success_task] @hookimpl def on_dag_run_success(dag_run: DagRun, msg: str): diff --git a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py index d3450d6b05aa7..f524b12226169 100644 --- a/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py +++ b/shared/listeners/src/airflow_shared/listeners/spec/taskinstance.py @@ -79,3 +79,25 @@ def on_task_instance_skipped( :param task_instance: The task instance object (RuntimeTaskInstance when called from task execution context, TaskInstance when called from API server) """ + + +@hookspec +def on_task_instance_checkpointed( + previous_state: TaskInstanceState | None, + task_instance: RuntimeTaskInstance | TaskInstance, + checkpoint_data, +): + """ + Execute when a task instance reaches a stable checkpoint and pauses. + + Fires when an operator raises ``AirflowTaskCheckpointed`` and the worker + reports ``CHECKPOINTED`` state. + + :param previous_state: Previous state of the task instance (typically RUNNING) + :param task_instance: The task instance object (RuntimeTaskInstance when + called from task execution context, TaskInstance when called from API + server) + :param checkpoint_data: The serializable payload the operator passed to + ``AirflowTaskCheckpointed(checkpoint_data=...)``, or ``None`` if no + payload was supplied. + """ diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 905d4427f3746..7e87461c53bb0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1941,6 +1941,15 @@ def finalize( log.exception("error calling listener") if error and task.email_on_failure and task.email: _send_error_email_notification(task, ti, context, error, log) + elif state == TaskInstanceState.CHECKPOINTED: + try: + get_listener_manager().hook.on_task_instance_checkpointed( + previous_state=TaskInstanceState.RUNNING, + task_instance=ti, + checkpoint_data=getattr(ti, "_checkpoint_data", None), + ) + except Exception: + log.exception("error calling listener") try: get_listener_manager().hook.before_stopping(component=TaskRunnerMarker()) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 03b290eb05e73..aaae8cc9ba1f0 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4759,8 +4759,6 @@ def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only( class TestTaskCheckpointed: - """AIP-96 foundation: AirflowTaskCheckpointed -> CHECKPOINTED state.""" - @pytest.mark.parametrize( "checkpoint_data", [ @@ -4773,9 +4771,7 @@ def test_run_returns_checkpointed_state( self, checkpoint_data, create_runtime_ti, mock_supervisor_comms ): """``run()`` reports CHECKPOINTED when the operator raises - ``AirflowTaskCheckpointed``. The exception's ``checkpoint_data`` payload - is preserved on the exception object regardless of shape; persistence - and resume wiring are out of scope for the foundation PR.""" + ``AirflowTaskCheckpointed``.""" from airflow.sdk.exceptions import AirflowTaskCheckpointed def _raise_checkpointed(): @@ -4788,6 +4784,44 @@ def _raise_checkpointed(): assert state == TaskInstanceState.CHECKPOINTED + @pytest.mark.parametrize( + "checkpoint_data", + [ + pytest.param(None, id="no-payload"), + pytest.param({"step": 7}, id="dict-payload"), + ], + ) + def test_listener_receives_checkpoint_data( + self, checkpoint_data, create_runtime_ti, mock_supervisor_comms, listener_manager + ): + """``finalize()`` invokes ``on_task_instance_checkpointed`` and forwards + the operator-supplied ``checkpoint_data``.""" + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + received = {"called": 0, "data": ""} + + class CheckpointListener: + @hookimpl + def on_task_instance_checkpointed(self, previous_state, task_instance, checkpoint_data): + received["called"] += 1 + received["data"] = checkpoint_data + + listener_manager(CheckpointListener()) + + def _raise_checkpointed(): + raise AirflowTaskCheckpointed(checkpoint_data=checkpoint_data) + + task = PythonOperator(task_id="checkpointed_task", python_callable=_raise_checkpointed) + ti = create_runtime_ti(task=task) + log = mock.MagicMock() + context = ti.get_template_context() + state, _msg, _error = run(ti, context=context, log=log) + finalize(ti, state, context, log) + + assert state == TaskInstanceState.CHECKPOINTED + assert received["called"] == 1 + assert received["data"] == checkpoint_data + class TestTaskInstanceMetrics: def test_ti_start_metric_emitted(self, create_runtime_ti, mock_supervisor_comms): From ae2e5f5f4d512ff393f84abf513ab68ed4c8b848 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 03:01:36 -0700 Subject: [PATCH 05/16] Rename newsfragment to match PR number --- ...{aip96-checkpointed-listener.feature.rst => 66410.feature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{aip96-checkpointed-listener.feature.rst => 66410.feature.rst} (100%) diff --git a/airflow-core/newsfragments/aip96-checkpointed-listener.feature.rst b/airflow-core/newsfragments/66410.feature.rst similarity index 100% rename from airflow-core/newsfragments/aip96-checkpointed-listener.feature.rst rename to airflow-core/newsfragments/66410.feature.rst From 31e50c785046bc30a3f8c8ddf2122c27996741a5 Mon Sep 17 00:00:00 2001 From: Stefan Wang <21269208+1fanwang@users.noreply.github.com> Date: Tue, 5 May 2026 03:05:13 -0700 Subject: [PATCH 06/16] AIP-96 (draft): add CHECKPOINTED state and AirflowTaskCheckpointed exception MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for AIP-96 (Resumable Operators) — adds vocabulary so the operator-author API surface can be discussed against running code. - CHECKPOINTED added to TaskInstanceState (intermediate state, in State.unfinished, lightyellow in the UI) - AirflowTaskCheckpointed exception with optional checkpoint_data payload, mirroring the AirflowRescheduleException pattern (serializable, single domain payload) - run() catches the exception and reports CHECKPOINTED state to the supervisor Persistence of checkpoint_data, scheduler auto-resume semantics, the listener hook (#66410), and downstream trigger-rule integration are intentionally out of scope here so each can be discussed separately. --- .../newsfragments/66402.significant.rst | 25 +++++++++++++++ airflow-core/src/airflow/utils/state.py | 5 +++ task-sdk/src/airflow/sdk/exceptions.py | 27 ++++++++++++++++ .../airflow/sdk/execution_time/task_runner.py | 9 ++++++ .../execution_time/test_task_runner.py | 31 +++++++++++++++++++ 5 files changed, 97 insertions(+) create mode 100644 airflow-core/newsfragments/66402.significant.rst diff --git a/airflow-core/newsfragments/66402.significant.rst b/airflow-core/newsfragments/66402.significant.rst new file mode 100644 index 0000000000000..5569c5b3bfa04 --- /dev/null +++ b/airflow-core/newsfragments/66402.significant.rst @@ -0,0 +1,25 @@ +AIP-96 (Resumable Operators) foundation — adds the ``CHECKPOINTED`` task +instance state and the ``AirflowTaskCheckpointed`` exception so an operator +can signal "I have reached a stable checkpoint and intend to pause": + +.. code-block:: python + + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + class ResumablePythonOperator(PythonOperator): + def execute(self, context): + for step in range(self.total_steps): + if self.should_pause(): + raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) + self.do_work(step) + +The worker catches the exception and reports ``CHECKPOINTED`` state. +``CHECKPOINTED`` is an intermediate state and is included in +``State.unfinished``. + +This change intentionally ships only the vocabulary plus the +worker-side state transition. ``checkpoint_data`` persistence, +scheduler auto-resume semantics, the listener hook, and downstream +trigger-rule integration are deferred to follow-up PRs so the API +shape can be discussed against a real artifact before committing to a +single resumption policy. diff --git a/airflow-core/src/airflow/utils/state.py b/airflow-core/src/airflow/utils/state.py index 332efb105533d..890043f094656 100644 --- a/airflow-core/src/airflow/utils/state.py +++ b/airflow-core/src/airflow/utils/state.py @@ -56,6 +56,7 @@ class IntermediateTIState(str, Enum): UP_FOR_RETRY = "up_for_retry" UP_FOR_RESCHEDULE = "up_for_reschedule" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" def __str__(self) -> str: return self.value @@ -87,6 +88,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = TerminalTIState.UPSTREAM_FAILED # One or more upstream deps failed SKIPPED = TerminalTIState.SKIPPED # Skipped by branching or some other mechanism DEFERRED = IntermediateTIState.DEFERRED # Deferrable operator waiting on a trigger + CHECKPOINTED = IntermediateTIState.CHECKPOINTED # Operator paused at a stable checkpoint def __str__(self) -> str: return self.value @@ -130,6 +132,7 @@ class State: UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED SKIPPED = TaskInstanceState.SKIPPED DEFERRED = TaskInstanceState.DEFERRED + CHECKPOINTED = TaskInstanceState.CHECKPOINTED finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED]) unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING]) @@ -157,6 +160,7 @@ class State: TaskInstanceState.REMOVED: "lightgrey", TaskInstanceState.SCHEDULED: "tan", TaskInstanceState.DEFERRED: "mediumpurple", + TaskInstanceState.CHECKPOINTED: "lightyellow", } @classmethod @@ -200,6 +204,7 @@ def color_fg(cls, state): TaskInstanceState.UP_FOR_RETRY, TaskInstanceState.UP_FOR_RESCHEDULE, TaskInstanceState.DEFERRED, + TaskInstanceState.CHECKPOINTED, ] ) """ diff --git a/task-sdk/src/airflow/sdk/exceptions.py b/task-sdk/src/airflow/sdk/exceptions.py index 7d42dad5d8502..cbde73db11224 100644 --- a/task-sdk/src/airflow/sdk/exceptions.py +++ b/task-sdk/src/airflow/sdk/exceptions.py @@ -165,6 +165,33 @@ class AirflowSkipException(AirflowException): """Raise when the task should be skipped.""" +class AirflowTaskCheckpointed(AirflowException): + """ + Raise when the operator has reached a stable checkpoint and intends to pause. + + The worker reports the task as CHECKPOINTED and persists the optional + ``checkpoint_data`` so a subsequent run can resume from the same point. + + This is the AIP-96 (Resumable Operators) foundation primitive. Auto-resume + semantics, scheduler treatment, and downstream trigger-rule integration are + intentionally deferred to follow-ups so the API shape can be discussed + without committing to a single resumption policy. + + :param checkpoint_data: Arbitrary serializable payload representing the + operator's resume point. Persistence and resume wiring are out of scope + for this foundation; the parameter exists so listeners and operator + authors can iterate against the final shape. + """ + + def __init__(self, checkpoint_data=None): + super().__init__() + self.checkpoint_data = checkpoint_data + + def serialize(self): + cls = self.__class__ + return f"{cls.__module__}.{cls.__name__}", (), {"checkpoint_data": self.checkpoint_data} + + class AirflowTaskTerminated(BaseException): """Raise when the task execution is terminated.""" diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 56ba8343c648b..905d4427f3746 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1342,6 +1342,15 @@ def _on_term(signum, frame): rendered_map_index=ti.rendered_map_index, ) state = TaskInstanceState.SKIPPED + except AirflowTaskCheckpointed as checkpoint: + log.info("Task checkpointed; reporting CHECKPOINTED state.") + msg = TaskState( + state=TaskInstanceState.CHECKPOINTED, + end_date=datetime.now(tz=timezone.utc), + rendered_map_index=ti.rendered_map_index, + ) + state = TaskInstanceState.CHECKPOINTED + ti._checkpoint_data = checkpoint.checkpoint_data # type: ignore[attr-defined] except AirflowRescheduleException as reschedule: log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE") msg = RescheduleTask( diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 630aff9094ed1..03b290eb05e73 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4758,6 +4758,37 @@ def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only( mock_supervisor_comms.send.assert_any_call(msg) +class TestTaskCheckpointed: + """AIP-96 foundation: AirflowTaskCheckpointed -> CHECKPOINTED state.""" + + @pytest.mark.parametrize( + "checkpoint_data", + [ + pytest.param(None, id="no-payload"), + pytest.param({"step": 3, "iterator_offset": 1024}, id="dict-payload"), + pytest.param([1, 2, 3], id="list-payload"), + ], + ) + def test_run_returns_checkpointed_state( + self, checkpoint_data, create_runtime_ti, mock_supervisor_comms + ): + """``run()`` reports CHECKPOINTED when the operator raises + ``AirflowTaskCheckpointed``. The exception's ``checkpoint_data`` payload + is preserved on the exception object regardless of shape; persistence + and resume wiring are out of scope for the foundation PR.""" + from airflow.sdk.exceptions import AirflowTaskCheckpointed + + def _raise_checkpointed(): + raise AirflowTaskCheckpointed(checkpoint_data=checkpoint_data) + + task = PythonOperator(task_id="checkpointed_task", python_callable=_raise_checkpointed) + ti = create_runtime_ti(task=task) + + state, _msg, _error = run(ti, context=ti.get_template_context(), log=mock.MagicMock()) + + assert state == TaskInstanceState.CHECKPOINTED + + class TestTaskInstanceMetrics: def test_ti_start_metric_emitted(self, create_runtime_ti, mock_supervisor_comms): """Test that ti.start metric is emitted at the beginning of task.""" From 3dd7d637d4c5f4754772c3d02b7442589449316b Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:18:26 -0700 Subject: [PATCH 07/16] fix: import AirflowTaskCheckpointed in run() so the except clause resolves --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 905d4427f3746..bfbf124611fb5 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1248,6 +1248,7 @@ def run( AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, + AirflowTaskCheckpointed, AirflowTaskTerminated, DagRunTriggerException, DownstreamTasksSkipped, From 9c5ffa2e3f54a00c4a78f7a31d79d7933b9b7a60 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:22:39 -0700 Subject: [PATCH 08/16] fix: add CHECKPOINTED to task-sdk datamodel enum (mirrors airflow.utils.state) --- task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index b5b100154c389..4a0cd29b86dd9 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -365,6 +365,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = "upstream_failed" SKIPPED = "skipped" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class TaskStatePutBody(BaseModel): From 7e25b73a4faaf09e99487ad781bbf1970f6ab326 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:24:45 -0700 Subject: [PATCH 09/16] fix: allow CHECKPOINTED in TaskState supervisor message Literal --- task-sdk/src/airflow/sdk/execution_time/comms.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py index 87c7881333ad4..ded750db1840c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/comms.py +++ b/task-sdk/src/airflow/sdk/execution_time/comms.py @@ -754,6 +754,7 @@ class TaskState(BaseModel): TaskInstanceState.FAILED, TaskInstanceState.SKIPPED, TaskInstanceState.REMOVED, + TaskInstanceState.CHECKPOINTED, ] end_date: datetime | None = None type: Literal["TaskState"] = "TaskState" From ea29e2a7d6c02525f1c660da9af4cf2f095f4dc5 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 10:27:43 -0700 Subject: [PATCH 10/16] fix: add CHECKPOINTED to task-sdk IntermediateTIState datamodel --- task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 1 + 1 file changed, 1 insertion(+) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index 4a0cd29b86dd9..306cd386c38ac 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -198,6 +198,7 @@ class IntermediateTIState(str, Enum): UP_FOR_RETRY = "up_for_retry" UP_FOR_RESCHEDULE = "up_for_reschedule" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class PrevSuccessfulDagRunResponse(BaseModel): From 92b31aa0d77f16b32da89df70ef048a50b2090c9 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:12 -0700 Subject: [PATCH 11/16] fix: newsfragment must have empty second line (lint rule) --- .../newsfragments/66402.significant.rst | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/airflow-core/newsfragments/66402.significant.rst b/airflow-core/newsfragments/66402.significant.rst index 5569c5b3bfa04..7829297e3bedb 100644 --- a/airflow-core/newsfragments/66402.significant.rst +++ b/airflow-core/newsfragments/66402.significant.rst @@ -1,6 +1,10 @@ -AIP-96 (Resumable Operators) foundation — adds the ``CHECKPOINTED`` task -instance state and the ``AirflowTaskCheckpointed`` exception so an operator -can signal "I have reached a stable checkpoint and intend to pause": +Add the ``CHECKPOINTED`` task instance state and the ``AirflowTaskCheckpointed`` exception (AIP-96 foundation). + +An operator can raise ``AirflowTaskCheckpointed(checkpoint_data=...)`` from +``execute()`` to signal "I have reached a stable checkpoint and intend to +pause". The worker catches the exception and reports the ``CHECKPOINTED`` +state. ``CHECKPOINTED`` is an intermediate state and is included in +``State.unfinished``. .. code-block:: python @@ -13,13 +17,8 @@ can signal "I have reached a stable checkpoint and intend to pause": raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) self.do_work(step) -The worker catches the exception and reports ``CHECKPOINTED`` state. -``CHECKPOINTED`` is an intermediate state and is included in -``State.unfinished``. - -This change intentionally ships only the vocabulary plus the -worker-side state transition. ``checkpoint_data`` persistence, -scheduler auto-resume semantics, the listener hook, and downstream -trigger-rule integration are deferred to follow-up PRs so the API -shape can be discussed against a real artifact before committing to a -single resumption policy. +This change intentionally ships only the vocabulary plus the worker-side +state transition. ``checkpoint_data`` persistence, scheduler auto-resume +semantics, the listener hook, and downstream trigger-rule integration are +deferred to follow-up PRs so the API shape can be discussed against a real +artifact before committing to a single resumption policy. From ce5a4543ae870988190f36f449abafada8f2ee01 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 13:45:18 -0700 Subject: [PATCH 12/16] fix: newsfragment must have empty second line (lint rule) --- airflow-core/newsfragments/66410.feature.rst | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/airflow-core/newsfragments/66410.feature.rst b/airflow-core/newsfragments/66410.feature.rst index f0160092a28a1..d5b669ebdbcff 100644 --- a/airflow-core/newsfragments/66410.feature.rst +++ b/airflow-core/newsfragments/66410.feature.rst @@ -1,9 +1,6 @@ -Adds the ``on_task_instance_checkpointed`` listener hook so listener -authors observe checkpoint events alongside ``running`` / -``success`` / ``failed`` / ``skipped``. Receives ``previous_state``, -``task_instance``, and the operator-supplied ``checkpoint_data`` -payload. +Add the ``on_task_instance_checkpointed`` listener hook so listener authors observe checkpoint events alongside ``running``, ``success``, ``failed``, ``skipped``. -The hook fires from the worker's ``finalize()`` when the operator -raises ``AirflowTaskCheckpointed`` and ``run()`` reports the +Receives ``previous_state``, ``task_instance``, and the operator-supplied +``checkpoint_data`` payload. The hook fires from the worker's ``finalize()`` +when the operator raises ``AirflowTaskCheckpointed`` and ``run()`` reports the ``CHECKPOINTED`` state. From 83b9ff0b6e042d0bb11c42e8a55418a74a7b1a3e Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:30:14 -0700 Subject: [PATCH 13/16] fix newsfragment: feature type must be single-line --- airflow-core/newsfragments/66410.feature.rst | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/airflow-core/newsfragments/66410.feature.rst b/airflow-core/newsfragments/66410.feature.rst index d5b669ebdbcff..9faebc476ff62 100644 --- a/airflow-core/newsfragments/66410.feature.rst +++ b/airflow-core/newsfragments/66410.feature.rst @@ -1,6 +1 @@ -Add the ``on_task_instance_checkpointed`` listener hook so listener authors observe checkpoint events alongside ``running``, ``success``, ``failed``, ``skipped``. - -Receives ``previous_state``, ``task_instance``, and the operator-supplied -``checkpoint_data`` payload. The hook fires from the worker's ``finalize()`` -when the operator raises ``AirflowTaskCheckpointed`` and ``run()`` reports the -``CHECKPOINTED`` state. +Add the ``on_task_instance_checkpointed`` listener hook so listener authors observe checkpoint events alongside running, success, failed, and skipped. From 624bc6fb9ea92065b59bd5f60a59f0ba70e6d9e4 Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:32:59 -0700 Subject: [PATCH 14/16] fix: ruff format on test_task_runner.py --- task-sdk/tests/task_sdk/execution_time/test_task_runner.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index fd2144694ab85..84894b2bec19b 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -4767,9 +4767,7 @@ class TestTaskCheckpointed: pytest.param([1, 2, 3], id="list-payload"), ], ) - def test_run_returns_checkpointed_state( - self, checkpoint_data, create_runtime_ti, mock_supervisor_comms - ): + def test_run_returns_checkpointed_state(self, checkpoint_data, create_runtime_ti, mock_supervisor_comms): """``run()`` reports CHECKPOINTED when the operator raises ``AirflowTaskCheckpointed``.""" from airflow.sdk.exceptions import AirflowTaskCheckpointed @@ -4822,6 +4820,7 @@ def _raise_checkpointed(): assert received["called"] == 1 assert received["data"] == checkpoint_data + class TestTaskInstanceMetrics: def test_ti_start_metric_emitted(self, create_runtime_ti, mock_supervisor_comms): """Test that ti.start metric is emitted at the beginning of task.""" From c8645008d9c76f9200044ca60721ee4c6c83cd4b Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 18:34:47 -0700 Subject: [PATCH 15/16] remove PR-E's newsfragment (this branch stacks on #66402; that file lives there) --- .../newsfragments/66402.significant.rst | 24 ------------------- 1 file changed, 24 deletions(-) delete mode 100644 airflow-core/newsfragments/66402.significant.rst diff --git a/airflow-core/newsfragments/66402.significant.rst b/airflow-core/newsfragments/66402.significant.rst deleted file mode 100644 index 7829297e3bedb..0000000000000 --- a/airflow-core/newsfragments/66402.significant.rst +++ /dev/null @@ -1,24 +0,0 @@ -Add the ``CHECKPOINTED`` task instance state and the ``AirflowTaskCheckpointed`` exception (AIP-96 foundation). - -An operator can raise ``AirflowTaskCheckpointed(checkpoint_data=...)`` from -``execute()`` to signal "I have reached a stable checkpoint and intend to -pause". The worker catches the exception and reports the ``CHECKPOINTED`` -state. ``CHECKPOINTED`` is an intermediate state and is included in -``State.unfinished``. - -.. code-block:: python - - from airflow.sdk.exceptions import AirflowTaskCheckpointed - - class ResumablePythonOperator(PythonOperator): - def execute(self, context): - for step in range(self.total_steps): - if self.should_pause(): - raise AirflowTaskCheckpointed(checkpoint_data={"step": step}) - self.do_work(step) - -This change intentionally ships only the vocabulary plus the worker-side -state transition. ``checkpoint_data`` persistence, scheduler auto-resume -semantics, the listener hook, and downstream trigger-rule integration are -deferred to follow-up PRs so the API shape can be discussed against a real -artifact before committing to a single resumption policy. From 06e73d977f9f3a70c0a4855094263d796d2aec8c Mon Sep 17 00:00:00 2001 From: 1fanwang <1fannnw@gmail.com> Date: Tue, 5 May 2026 21:51:58 -0700 Subject: [PATCH 16/16] regenerate openapi specs + datamodels for CHECKPOINTED state --- .../src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml | 1 + .../api_fastapi/core_api/openapi/v2-rest-api-generated.yaml | 1 + airflow-ctl/src/airflowctl/api/datamodels/generated.py | 1 + .../airflow/providers/edge3/worker_api/v2-edge-generated.yaml | 1 + 4 files changed, 4 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 2983263bbc59b..bdf69e69be527 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -3546,6 +3546,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5ed96855c24ac..64a120d893c37 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -14445,6 +14445,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in. diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 276c8699de058..d549d78711481 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -841,6 +841,7 @@ class TaskInstanceState(str, Enum): UPSTREAM_FAILED = "upstream_failed" SKIPPED = "skipped" DEFERRED = "deferred" + CHECKPOINTED = "checkpointed" class TaskInstancesBatchBody(BaseModel): diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml index 01c8149d1dad8..e894eda0837dd 100644 --- a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml +++ b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml @@ -1274,6 +1274,7 @@ components: - upstream_failed - skipped - deferred + - checkpointed title: TaskInstanceState description: 'All possible states that a Task Instance can be in.