[DEMO ONLY] AIP-96: resumable operator example for discussion#66453
Draft
1fanwang wants to merge 2 commits into apache:main from
Conversation
Discussion-only — companion to AIP-96 v2 (cwiki) and the AIP-96 PR set (apache#66402 foundation, apache#66410 listener hook, apache#66445 supervisor wiring). NOT FOR MERGE.

Demonstrates the v1 integration pattern from AIP-96 v2: a subclass of `DatabricksSubmitRunOperator` that survives worker disruption by:

1. Persisting `self.run_id` to AIP-103 `task_state` after `submit_run`.
2. Reading the prior `run_id` on the next attempt and reconnecting (no resubmit).
3. Converting SIGTERM into `AirflowTaskCheckpointed` instead of letting `on_kill` cancel the Databricks run.

Imports `AirflowTaskCheckpointed` from `airflow.sdk.exceptions`, which is introduced by apache#66402 and not yet on main; this branch is rebased onto main so the diff shows only the new file. The file is self-explanatory as a design illustration; CI on this branch will not pass alone.
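The three steps above can be sketched in isolation. This is a minimal, runnable illustration of the control flow only: the hook, the operator class, and `AirflowTaskCheckpointed` are all local stand-ins (the real exception ships with apache#66402), and `task_state` is modeled as a plain dict rather than the AIP-103 store.

```python
class AirflowTaskCheckpointed(Exception):
    """Stand-in for the exception added by apache#66402 (not yet on main)."""

class FakeDatabricksHook:
    """Stand-in for the Databricks hook: canned run_id and terminal state."""
    def __init__(self):
        self.submit_calls = 0

    def submit_run(self, json):
        self.submit_calls += 1
        return 123  # the external run_id Databricks would return

    def get_run_state(self, run_id):
        return "SUCCESS"

class ResumableSubmitRunDemo:
    """Control flow of the resumable subclass, mirroring steps 1-3 above."""
    RESUME_KEY = "databricks_run_id"

    def __init__(self, hook, task_state):
        self.hook = hook
        self.task_state = task_state  # AIP-103 task_state, modeled as a dict
        self.run_id = None

    def execute(self, disrupted=False):
        prior = self.task_state.get(self.RESUME_KEY)
        if prior is not None:
            self.run_id = prior  # step 2: reconnect, no resubmit
        else:
            self.run_id = self.hook.submit_run({"task": "demo"})
            self.task_state[self.RESUME_KEY] = self.run_id  # step 1: persist
        if disrupted:
            # step 3: a disruption becomes CHECKPOINTED; the external run survives
            raise AirflowTaskCheckpointed("worker disrupted mid-poll")
        assert self.hook.get_run_state(self.run_id) == "SUCCESS"
        self.task_state.pop(self.RESUME_KEY, None)  # clear on success
```

Running two attempts against the same `task_state` dict shows the contract: the first (disrupted) attempt persists `run_id` 123, the second reuses it without a second `submit_run` call.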
Force-pushed from ffffc20 to 55991f5.
Layer 1: test_resumable_databricks_demo.py (~145 lines)
- Mock-based pytest (no real Databricks workspace).
- Asserts the resume contract:
- First execute(): submit_run called, run_id persisted.
- Disruption during poll raises AirflowTaskCheckpointed; run_id persists.
- Second execute() (after CHECKPOINTED): submit_run NOT called; prior
run_id reused; task_state cleared on success.
- on_kill on the resumable variant does NOT cancel the Databricks run.
Layer 2: test_aip96_resumable_pattern.py (~155 lines)
- Provider-agnostic, in-process simulator.
- Demonstrates the AIP-96 + AIP-103 primitives compose correctly without
per-provider code: submit-once, persist-via-task_state, reconnect-on-resume,
repeat-disruption-cycles preserve external_id.
All 8 tests pass against the AIP-96 stack code (apache#66402, apache#66410, apache#66445).
On this rebased-onto-main demo branch the tests will fail to import
AirflowTaskCheckpointed; that's expected — the discussion artifact is
the design, the working tests are evidence.
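The Layer 1 contract can be approximated with `unittest.mock` alone. This is a hedged sketch of the test shape, not the actual `test_resumable_databricks_demo.py`: the operator logic and `AirflowTaskCheckpointed` are local stand-ins, and the mock stands in for the Databricks hook.

```python
from unittest import mock

class AirflowTaskCheckpointed(Exception):
    """Local stub; the real class ships with the AIP-96 PR set."""

RESUME_KEY = "databricks_run_id"

def execute(hook, task_state, disrupted=False):
    """Submit-once / persist / reconnect wiring under test (stand-in)."""
    run_id = task_state.get(RESUME_KEY)
    if run_id is None:
        run_id = hook.submit_run({"task": "demo"})
        task_state[RESUME_KEY] = run_id
    if disrupted:
        raise AirflowTaskCheckpointed()
    task_state.pop(RESUME_KEY, None)
    return run_id

def test_resume_contract():
    hook = mock.Mock()
    hook.submit_run.return_value = 42
    task_state = {}

    # First execute(): submit_run called, run_id persisted, then disruption.
    try:
        execute(hook, task_state, disrupted=True)
    except AirflowTaskCheckpointed:
        pass
    assert task_state[RESUME_KEY] == 42

    # Second execute(): prior run_id reused, submit_run NOT called again.
    assert execute(hook, task_state) == 42
    hook.submit_run.assert_called_once()
    assert RESUME_KEY not in task_state  # cleared on success
```

The key assertion is `assert_called_once()` across both attempts: resume must not resubmit.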
What this is
A discussion-only demo PR — companion to the AIP-96 v2 cwiki page and the AIP-96 PR set (#66402, #66410, #66445). Not for merge. Rebased onto `main` so the diff is one file: `providers/databricks/tests/system/databricks/example_resumable_databricks.py` (~170 lines). The file imports `AirflowTaskCheckpointed` from `airflow.sdk.exceptions`, which is introduced by #66402 and not yet on `main`; this branch's CI will not pass standalone. The file is self-explanatory as a design illustration of the v1 integration pattern.
What's added
A `ResumableDatabricksSubmitRunOperator` subclassing the existing `DatabricksSubmitRunOperator`. The subclass:

- Persists `self.run_id` to AIP-103 `task_state` immediately after `submit_run`.
- On the next attempt, reads the prior `run_id` from `task_state` and skips `submit_run` — reconnects to the existing Databricks job.
- Converts SIGTERM into `AirflowTaskCheckpointed` instead of letting the default `on_kill` cancel the Databricks run.
- Overrides `on_kill` to be a no-op so non-SIGTERM kill paths also preserve the run.

The same pattern applies to EMR, Spark-on-K8s, Beam, Dataproc, Flink — anywhere there's a long-running external job referenced by an ID.
Reviewer questions
1. Placement: `providers/databricks/tests/system/databricks/` (matches existing Databricks examples). Better as a real operator in `providers/databricks/.../operators/databricks.py` shipping alongside the parent? Or a `resumable=True` flag on the existing operator?
2. Should there be a `ResumableExternalJobOperator` base class so per-provider integrations are 3 lines instead of ~50? The demo deliberately writes the wiring inline to show what the primitive alone needs.
3. Should the framework install the `AirflowTaskCheckpointed` handler automatically (the operator just declares `resume_key`), or is operator-author control important?
4. `on_kill` semantics. Should the framework provide a flag like `cancel_external_on_kill: bool = True` that operators flip to opt out of cancel-on-kill, instead of overriding `on_kill` per operator?

Stacking
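To make the base-class question concrete, here is a hypothetical sketch of what a shared `ResumableExternalJobOperator` could look like. Every name below is an assumption for discussion, not an existing Airflow API; `task_state` is modeled as a plain dict and the checkpoint exception is a stub.

```python
class AirflowTaskCheckpointed(Exception):
    """Local stub for the exception from the AIP-96 PR set."""

class ResumableExternalJobOperator:
    """Hypothetical shared base: submit-once, persist, reconnect."""
    resume_key: str  # where the external job id lives in task_state

    def __init__(self, task_state):
        self.task_state = task_state  # AIP-103 store, modeled as a dict

    # A provider integration implements only these two methods.
    def _submit(self):
        raise NotImplementedError

    def _wait(self, external_id):
        raise NotImplementedError

    def execute(self):
        external_id = self.task_state.get(self.resume_key)
        if external_id is None:
            external_id = self._submit()  # submit exactly once
            self.task_state[self.resume_key] = external_id
        self._wait(external_id)  # may raise AirflowTaskCheckpointed
        self.task_state.pop(self.resume_key, None)  # clear on success

class DatabricksDemo(ResumableExternalJobOperator):
    """The per-provider part shrinks to the resume_key plus two hooks."""
    resume_key = "databricks_run_id"

    def _submit(self):
        return 123  # would call the Databricks hook's submit_run(...)

    def _wait(self, external_id):
        pass  # would poll the run state until terminal
```

Under this shape, the ~50 lines of inline wiring in the demo file collapse to the three overridden members of `DatabricksDemo`.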
Depends on:
E2e test plan
Three layers, ordered by cost:
1. Unit test (in-PR, future commit) — pytest with `unittest.mock` patching the Databricks hook. Asserts:
   - First execute: `_hook.submit_run` invoked, `task_state["databricks_run_id"]` set.
   - Disruption raises `AirflowTaskCheckpointed`.
   - Resumed execute: `task_state["databricks_run_id"]` read, `_hook.submit_run` NOT called, polling resumes with the prior run_id.
   - Success: `task_state["databricks_run_id"]` cleared.
2. Framework-level integration test (no Databricks) — a separate test fixture using an in-process simulator (sleep loop + signal handler) that exercises the full state machine: worker → supervisor → API server → DB row at `state=checkpointed` → next attempt resumes. Lives in `task-sdk/tests/` or similar; reuses the AIP-96 supervisor-wiring code path (AIP-96: wire CHECKPOINTED through supervisor → API server → DB, #66445). This proves the framework wiring without provider dependencies.
3. Manual verification (real Databricks) — runbook (not for CI): `airflow standalone` with this branch + the AIP-96 PR set composed; run the `example_resumable_databricks` DAG against a real workspace; confirm the resumed attempt reads `run_id` from `task_state`, reconnects to the still-running Databricks job, and completes without resubmission.

Layer 1 + 2 are achievable in CI; layer 3 is documentation-only.
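The Layer 2 simulator idea (sleep loop + signal handler) can be sketched in a few lines. This is POSIX-only and entirely a stand-in: the "DB row" is a dict, the checkpoint exception is a stub, and none of the real worker/supervisor/API-server wiring from #66445 appears here.

```python
import os
import signal
import threading
import time

class AirflowTaskCheckpointed(Exception):
    """Local stub for the exception from the AIP-96 PR set."""

def _to_checkpoint(signum, frame):
    # The handler converts SIGTERM into a checkpoint, not a cancel.
    raise AirflowTaskCheckpointed("SIGTERM converted to checkpoint")

def simulate_disrupted_attempt(db_row):
    """One task attempt: submit once, 'poll', get SIGTERMed, checkpoint."""
    signal.signal(signal.SIGTERM, _to_checkpoint)
    db_row.setdefault("external_id", "run-123")  # submit exactly once
    # Deliver SIGTERM to this process shortly after "polling" begins.
    threading.Timer(0.05, os.kill, args=(os.getpid(), signal.SIGTERM)).start()
    try:
        while True:
            time.sleep(0.01)  # stand-in for the poll loop
    except AirflowTaskCheckpointed:
        db_row["state"] = "checkpointed"  # what the supervisor would persist

db_row = {}
simulate_disrupted_attempt(db_row)
simulate_disrupted_attempt(db_row)  # repeat cycle preserves external_id
```

Running the attempt twice against the same row exercises the repeat-disruption property from Layer 2: `external_id` is set once and survives every cycle.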