[DEMO ONLY] AIP-96: resumable operator example for discussion #66453

Draft

1fanwang wants to merge 2 commits into apache:main from 1fanwang:1fanwang/aip96-resumable-operator-demo

Conversation

@1fanwang (Contributor) commented May 6, 2026

What this is

A discussion-only demo PR — companion to the AIP-96 v2 cwiki page and the AIP-96 PR set (#66402, #66410, #66445). Not for merge. Rebased onto main so the diff is one file: providers/databricks/tests/system/databricks/example_resumable_databricks.py (~170 lines).

The file imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is introduced by #66402 and not yet on main; this branch's CI will not pass standalone. The file is self-explanatory as a design illustration of the v1 integration pattern.

What's added

A ResumableDatabricksSubmitRunOperator subclassing the existing DatabricksSubmitRunOperator. The subclass:

  1. Persists self.run_id to AIP-103 task_state immediately after submit_run.
  2. On the next attempt, reads the prior run_id from task_state and skips submit_run — reconnects to the existing Databricks job.
  3. Installs a SIGTERM handler during execute that raises AirflowTaskCheckpointed instead of letting the default on_kill cancel the Databricks run.
  4. Overrides on_kill to be a no-op so non-SIGTERM kill paths also preserve the run.

import signal

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.sdk import Context
from airflow.sdk.exceptions import AirflowTaskCheckpointed  # introduced by #66402


class ResumableDatabricksSubmitRunOperator(DatabricksSubmitRunOperator):
    RESUME_KEY = "databricks_run_id"

    def execute(self, context: Context):
        # Resume path: a prior attempt already submitted the run.
        prior_run_id = context["task_state"].get(self.RESUME_KEY)
        if prior_run_id is not None:
            self.run_id = prior_run_id  # reconnect, do not resubmit
        else:
            # ... submit (json_normalised is the normalised job spec,
            # as prepared by the parent operator) ...
            self.run_id = self._hook.submit_run(json_normalised)
            context["task_state"].set(self.RESUME_KEY, self.run_id)

        def _on_sigterm(signum, frame):
            # Checkpoint instead of letting on_kill cancel the run.
            raise AirflowTaskCheckpointed(checkpoint_data={"run_id": self.run_id})

        signal.signal(signal.SIGTERM, _on_sigterm)
        # ... run polling, clear task_state on success

The same pattern applies to EMR, Spark-on-K8s, Beam, Dataproc, Flink — anywhere there's a long-running external job referenced by an ID.
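
To illustrate, a hypothetical generic base class (the shape raised in reviewer question 2 below) could reduce each provider integration to a resume key plus two methods. Every name here (ResumableExternalJobOperator, submit_external_job, wait_for_external_job) is invented for this sketch, and the task_state accessors simply mirror the demo's usage rather than a settled AIP-103 API:

import signal

from airflow.sdk import BaseOperator
from airflow.sdk.exceptions import AirflowTaskCheckpointed  # introduced by #66402


class ResumableExternalJobOperator(BaseOperator):
    """Hypothetical base class; not part of any AIP-96 PR."""

    resume_key: str  # per-provider task_state key, e.g. "databricks_run_id"

    def submit_external_job(self, context) -> str:
        """Provider-specific: submit the job and return its external ID."""
        raise NotImplementedError

    def wait_for_external_job(self, external_id: str, context) -> None:
        """Provider-specific: poll the external job to a terminal state."""
        raise NotImplementedError

    def execute(self, context):
        external_id = context["task_state"].get(self.resume_key)
        if external_id is None:
            external_id = self.submit_external_job(context)
            context["task_state"].set(self.resume_key, external_id)

        def _on_sigterm(signum, frame):
            # Checkpoint rather than cancel: the external job keeps running.
            raise AirflowTaskCheckpointed(checkpoint_data={"external_id": external_id})

        signal.signal(signal.SIGTERM, _on_sigterm)
        self.wait_for_external_job(external_id, context)
        # ... clear task_state on success, as in the demo

    def on_kill(self):
        pass  # preserve the external run on non-SIGTERM kill paths

A Databricks subclass would then declare resume_key = "databricks_run_id" and implement only the two provider-specific methods.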

Reviewer questions

  1. Demo location. Currently in providers/databricks/tests/system/databricks/ (matches existing Databricks examples). Better as a real operator in providers/databricks/.../operators/databricks.py shipping alongside the parent? Or a resumable=True flag on the existing operator?
  2. Helper class. AIP-96 v2 ships only the primitive (state + exception + listener). Should v1 also ship a generic ResumableExternalJobOperator base class so per-provider integrations are 3 lines instead of ~50 (roughly the shape sketched above)? The demo deliberately writes the wiring inline to show what the primitive alone needs.
  3. Framework-side SIGTERM handling. Should the framework install the SIGTERM-to-AirflowTaskCheckpointed handler automatically (operator just declares resume_key), or is operator-author control important?
  4. on_kill semantics. Should the framework provide a flag like cancel_external_on_kill: bool = True that operators flip to opt out of cancel-on-kill, instead of overriding on_kill per operator? (A strawman covering questions 3 and 4 follows this list.)
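
On questions 3 and 4, one possible framework-side shape, purely as a strawman. run_task and kill_task are invented stand-ins for the task runner's internals, and resume_key / cancel_external_on_kill are the declarative attributes the questions describe, not code from any PR:

# Strawman only: nothing here exists in the AIP-96 PR set.
import signal

from airflow.sdk.exceptions import AirflowTaskCheckpointed  # introduced by #66402


def run_task(operator, context):
    # Q3: the framework installs the handler whenever the operator
    # declares a resume key, instead of each operator wiring signals.
    resume_key = getattr(operator, "resume_key", None)
    if resume_key is not None:
        def _on_sigterm(signum, frame):
            raise AirflowTaskCheckpointed(checkpoint_data={"resume_key": resume_key})

        signal.signal(signal.SIGTERM, _on_sigterm)
    operator.execute(context)


def kill_task(operator):
    # Q4: one declarative opt-out flag replaces per-operator on_kill
    # overrides; the default True keeps today's cancel-on-kill behaviour.
    if getattr(operator, "cancel_external_on_kill", True):
        operator.on_kill()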

Stacking

Depends on:

    • #66402 (foundation)
    • #66410 (listener hook)
    • #66445 (supervisor wiring)

E2E test plan

Three layers, ordered by cost:

  1. Unit test (in-PR, future commit) — pytest with unittest.mock patching the Databricks hook (a sketch follows this plan). Asserts:

    • First call: _hook.submit_run invoked, task_state["databricks_run_id"] set.
    • SIGTERM during poll → raises AirflowTaskCheckpointed.
    • Second call (after CHECKPOINTED): task_state["databricks_run_id"] read, _hook.submit_run NOT called, polling resumes with prior run_id.
    • Success path: task_state["databricks_run_id"] cleared.
    • No real Databricks workspace required.
  2. Framework-level integration test (no Databricks) — a separate test fixture using an in-process simulator (sleep loop + signal handler) that exercises the full state machine: worker → supervisor → API server → DB row at state=checkpointed → next attempt resumes. Lives in task-sdk/tests/ or similar; reuses the AIP-96 supervisor-wiring code path (#66445). This proves the framework wiring without provider dependencies.

  3. Manual verification (real Databricks) — runbook (not for CI):

    1. airflow standalone with this branch + the AIP-96 PR set composed.
    2. Trigger example_resumable_databricks DAG against a real workspace.
    3. While the task is RUNNING, send SIGTERM to the worker process.
    4. Observe the task transition to CHECKPOINTED in the UI.
    5. Scheduler dispatches next attempt.
    6. Operator reads prior run_id from task_state, reconnects to the still-running Databricks job, completes without resubmission.
    7. Assert in Databricks UI: only one run was submitted (no duplicate).

Layers 1 and 2 are achievable in CI; layer 3 is documentation-only.
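
To make the Layer 1 contract concrete, here is a hypothetical sketch of the first two assertions. FakeTaskState and the plain-dict context are stand-ins for the AIP-103 task_state API and the Task SDK context, and the sketch assumes the demo's elided submit/polling code is filled in so that execute returns once the mocked hook reports completion:

from unittest import mock


class FakeTaskState(dict):
    """Stand-in for AIP-103 task_state: dict reads plus a set() writer."""

    def set(self, key, value):
        self[key] = value


def test_first_attempt_submits_and_persists_run_id():
    op = ResumableDatabricksSubmitRunOperator(task_id="run_now", json={})
    state = FakeTaskState()
    with mock.patch.object(ResumableDatabricksSubmitRunOperator, "_hook") as hook:
        hook.submit_run.return_value = 4242
        op.execute({"task_state": state})
    hook.submit_run.assert_called_once()
    assert state["databricks_run_id"] == 4242  # persisted for the next attempt


def test_resume_reconnects_without_resubmitting():
    op = ResumableDatabricksSubmitRunOperator(task_id="run_now", json={})
    state = FakeTaskState(databricks_run_id=4242)  # left by a prior attempt
    with mock.patch.object(ResumableDatabricksSubmitRunOperator, "_hook") as hook:
        op.execute({"task_state": state})
    hook.submit_run.assert_not_called()  # reconnect, no duplicate run
    assert op.run_id == 4242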

Commits

Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set
(apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring).
NOT FOR MERGE.

Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of
DatabricksSubmitRunOperator that survives worker disruption by:
  1. Persisting self.run_id to AIP-103 task_state after submit_run.
  2. Reading prior run_id on next attempt and reconnecting (no resubmit).
  3. Converting SIGTERM into AirflowTaskCheckpointed instead of letting
     on_kill cancel the Databricks run.

Imports AirflowTaskCheckpointed from airflow.sdk.exceptions, which is
introduced by apache#66402 and not yet on main; this branch is rebased onto
main so the diff shows only the new file. The file is self-explanatory
as a design illustration; CI on this branch will not pass alone.
@1fanwang force-pushed the 1fanwang/aip96-resumable-operator-demo branch from ffffc20 to 55991f5 on May 6, 2026 at 06:13
Layer 1: test_resumable_databricks_demo.py (~145 lines)
  - Mock-based pytest (no real Databricks workspace).
  - Asserts the resume contract:
    - First execute(): submit_run called, run_id persisted.
    - Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
    - Second execute() (after CHECKPOINTED): submit_run NOT called; prior
      run_id reused; task_state cleared on success.
    - on_kill on the resumable variant does NOT cancel the Databricks run.

Layer 2: test_aip96_resumable_pattern.py (~155 lines)
  - Provider-agnostic, in-process simulator.
  - Demonstrates that the AIP-96 + AIP-103 primitives compose correctly
    without per-provider code: submit-once, persist-via-task_state,
    reconnect-on-resume; repeated disruption cycles preserve external_id
    (distilled in the sketch below).

All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.
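
Distilling that Layer 2 idea into a provider-agnostic sketch with no Airflow imports at all: attempt, FakeExternalService, and the plain-dict task_state are invented for illustration and model only the contract (submit once, persist, reconnect, survive repeated disruptions):

class FakeExternalService:
    """Counts submissions so the submit-once contract is checkable."""

    def __init__(self):
        self.submissions = 0

    def submit(self) -> str:
        self.submissions += 1
        return "job-1"


def attempt(service, task_state, disrupt: bool) -> bool:
    """One task attempt; returns True when the attempt completes."""
    external_id = task_state.get("external_id")
    if external_id is None:
        external_id = service.submit()
        task_state["external_id"] = external_id  # persist before polling
    if disrupt:
        return False  # simulated SIGTERM -> CHECKPOINTED; state survives
    task_state.clear()  # success: next run starts fresh
    return True


def test_repeated_disruptions_preserve_external_id():
    service, state = FakeExternalService(), {}
    assert not attempt(service, state, disrupt=True)
    assert not attempt(service, state, disrupt=True)
    assert attempt(service, state, disrupt=False)
    assert service.submissions == 1  # submit-once across all attempts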