[SPARK-56972][SS] Persist sink name in V3 commit log via MicroBatchExecution #56020

Open: ericm-db wants to merge 6 commits into apache:master from ericm-db:sink-evolution-mbe-wiring
@ericm-db
Contributor

What changes were proposed in this pull request?

Wire the sink name through MicroBatchExecution so that, when sink evolution is enabled, each committed batch writes a CommitMetadataV3 whose sinkMetadataMap records the current sink as the active entry alongside any sinks that were active in earlier batches:

  • Add a per-execution sinkMetadataMap that is hydrated from the latest CommitMetadataV3 in populateStartOffsets.
  • When spark.sql.streaming.queryEvolution.enableSinkEvolution is true, the commit-log write in runBatch produces CommitMetadataV3 with every prior entry marked isActive = false and the current (sinkName, sink.getClass.getName) entered as isActive = true.
  • When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged.
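
The map update described in the second bullet can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `SinkMetadataInfo` here is a hypothetical stand-in whose field names follow the commit description, while the field types, the `SinkMapSketch` object, and the `recordActiveSink` helper are assumptions made for the example.

```scala
// Hypothetical stand-in for the PR's SinkMetadataInfo. Field names follow the
// commit description; the types are assumptions for this sketch.
final case class SinkMetadataInfo(
    sinkName: String,
    commitOffset: Option[String],
    providerName: String,
    isActive: Boolean)

object SinkMapSketch {
  // Demote every previously recorded sink to inactive, then record the
  // current sink as the single active entry (replacing any entry that
  // already uses the same name).
  def recordActiveSink(
      prior: Map[String, SinkMetadataInfo],
      sinkName: String,
      providerName: String,
      commitOffset: Option[String] = None): Map[String, SinkMetadataInfo] = {
    val demoted = prior.map { case (name, info) =>
      name -> info.copy(isActive = false)
    }
    demoted + (sinkName ->
      SinkMetadataInfo(sinkName, commitOffset, providerName, isActive = true))
  }
}
```

Under these assumptions, renaming the sink across a restart leaves the old name in the map with `isActive = false`, and exactly one entry is active after each commit.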

This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred.

This PR is built on top of #56019 (SPARK-56971), which itself sits on #56018 (SPARK-56970). It currently shows the upstream commits in its diff; that will resolve as the predecessors merge.

Why are the changes needed?

SPARK-56719 introduced the DataStreamWriter.name() API and the in-memory sinkName plumbing inside MicroBatchExecution, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable.

Does this PR introduce any user-facing change?

Behavior change only when enableSinkEvolution is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off.

How was this patch tested?

Added four new tests in StreamingSinkEvolutionSuite:

  • V3 commit log records the active sink for a named query.
  • Renaming the sink across a restart retains the previous sink as isActive = false and marks the new one active.
  • With sink evolution disabled, the commit log remains V1/V2.
  • Enabling sink evolution on a checkpoint that previously used V1/V2 transparently upgrades to V3 on the next commit.

Existing StreamingSinkEvolutionSuite, CommitLogSuite, MicroBatchExecutionSuite, and AsyncProgressTrackingMicroBatchExecutionSuite all pass.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

ericm-db added 6 commits May 4, 2026 12:34
…ception for sink evolution

Adds the MiMa `ReversedMissingMethodProblem` exclusion for the newly added
`DataStreamWriter.name()` API, and registers the new
`spark.sql.streaming.queryEvolution.enableSinkEvolution` SQL config in the
binding-policy exceptions file (consistent with its `enableSourceEvolution`
sibling).

Co-authored-by: Isaac
…2 case classes

Refactor `CommitLog` so that the commit log metadata is dispatched through a
`CommitMetadataBase` trait with concrete `CommitMetadata` (V1, watermark only)
and `CommitMetadataV2` (watermark + `stateUniqueIds`) case classes. The
deserializer now reads the wire-format version from the file header and
constructs the matching subclass.

This is preparation for `CommitMetadataV3` (which adds sink metadata for
streaming sink evolution) in a follow-up PR.

Notable changes:
- Add `CommitMetadataBase` trait and `CommitMetadataV2` case class.
- `CommitMetadata` becomes V1 (no `stateUniqueIds` field).
- Add `CommitLog.createMetadata` factory that dispatches by version and
  defaults to the configured `STATE_STORE_CHECKPOINT_FORMAT_VERSION`.
- `CommitLog.readCommitMetadata` reads the version line and constructs the
  matching subclass.
- `MicroBatchExecution`, `OfflineStateRepartitionRunner`, and the existing
  tests are updated to use the new types / factory.
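
The version dispatch described above might look roughly like the sketch below. It is an illustration under stated assumptions, not the refactored `CommitLog`: the case classes are simplified stand-ins (in particular the `stateUniqueIds` type is simplified), JSON parsing is omitted, and `CommitLogDispatchSketch`, `parseVersion`, and `fromFields` are hypothetical names.

```scala
// Simplified stand-ins for the types named in this commit; the field types
// are assumptions for this sketch.
sealed trait CommitMetadataBase {
  def nextBatchWatermarkMs: Long
}

// V1: watermark only.
final case class CommitMetadata(nextBatchWatermarkMs: Long)
  extends CommitMetadataBase

// V2: watermark plus state unique ids.
final case class CommitMetadataV2(
    nextBatchWatermarkMs: Long,
    stateUniqueIds: Map[Long, Seq[String]]) extends CommitMetadataBase

object CommitLogDispatchSketch {
  private val Header = "v(\\d+)".r

  // Parse a header line such as "v1" or "v2" into its numeric version.
  def parseVersion(header: String): Int = header.trim match {
    case Header(n) => n.toInt
    case other =>
      throw new IllegalStateException(s"Malformed commit log header: $other")
  }

  // Construct the subclass that matches the version found in the file
  // header, rather than the version configured in the session.
  def fromFields(
      header: String,
      watermarkMs: Long,
      stateUniqueIds: Map[Long, Seq[String]]): CommitMetadataBase =
    parseVersion(header) match {
      case 1 => CommitMetadata(watermarkMs)
      case 2 => CommitMetadataV2(watermarkMs, stateUniqueIds)
      case v =>
        throw new IllegalStateException(s"Unsupported commit log version: $v")
    }
}
```

Dispatching on the header read from the file is what lets a reader configured for one version still construct the right type for a file written with another.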

The pre-refactor `CommitMetadata` carried both the V1 and V2 wire shape in a
single case class, with `stateUniqueIds` optional. That made it awkward to
add a V3 wire format with additional fields, and forced `serialize` to
take the wire version from `SQLConf` rather than from the metadata itself.

No new public API. The wire format for V1 changes slightly: V1 commit log
files no longer serialize `stateUniqueIds: null`. Old V1 files continue to
be read because the V1 deserializer ignores the (now-unknown) field.

This PR also relaxes the version-exact-match check on read so that a
commit log opened with the V2 conf can deserialize a V1 file. This
incidentally resolves SPARK-50653.

- Existing `CommitLogSuite` (V1, V2, and cross-version) passes; the
  cross-version test now asserts successful V1 deserialization.
- `StreamingSinkEvolutionSuite` (from SPARK-56719) still passes.

Generated-by: Claude Code (claude-opus-4-7)

Co-authored-by: Isaac
…evolution

Add the commit log data structures for streaming sink evolution:

- `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a
  `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in
  addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`).
- `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset`
  (serialized via `OffsetV2.json()`), `providerName`, and an `isActive`
  flag used to distinguish the current sink from historical sinks that
  were used in earlier batches but are no longer in use.
- `CommitMetadataV3.activeSinkMetadataInfoOpt` returns the entry with
  `isActive = true`, if any.
- `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when
  `commitLogFormatVersion = VERSION_3`, requiring a non-empty
  `sinkMetadataMap`.
- `CommitLog.readCommitMetadata` dispatches `v3` files to the new class.
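
As a rough illustration of the structures listed above, the sketch below models the V3 metadata, the active-sink lookup, and the non-empty check. The field types, the `CommitMetadataV3Sketch` object, and its `create` helper are assumptions for the example; the actual signatures in the PR may differ.

```scala
// Hypothetical shapes based on the field names in this commit message; the
// types are assumptions for this sketch.
final case class SinkMetadataInfo(
    sinkName: String,
    commitOffset: Option[String], // assumed to hold the offset's JSON form
    providerName: String,
    isActive: Boolean)

final case class CommitMetadataV3(
    nextBatchWatermarkMs: Long,
    stateUniqueIds: Map[Long, Seq[String]],
    sinkMetadataMap: Map[String, SinkMetadataInfo]) {

  // The entry flagged as active, if any.
  def activeSinkMetadataInfoOpt: Option[SinkMetadataInfo] =
    sinkMetadataMap.values.find(_.isActive)
}

object CommitMetadataV3Sketch {
  // Stand-in for the V3 branch of a createMetadata-style factory: an empty
  // sink map is rejected up front. `require` throws IllegalArgumentException.
  def create(
      nextBatchWatermarkMs: Long,
      stateUniqueIds: Map[Long, Seq[String]],
      sinkMetadataMap: Map[String, SinkMetadataInfo]): CommitMetadataV3 = {
    require(sinkMetadataMap.nonEmpty,
      "V3 commit metadata requires a non-empty sinkMetadataMap")
    CommitMetadataV3(nextBatchWatermarkMs, stateUniqueIds, sinkMetadataMap)
  }
}
```

Keying the map by sink name means historical sinks stay addressable after a rename, while the `isActive` flag identifies the sink in use for the committed batch.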

The V3 metadata is dormant in this PR: no caller produces it yet.
Wiring through `MicroBatchExecution` (so each batch persists its sink
name + offset, and so restarts read the map back and validate the sink
identity) is a follow-up.

SPARK-56719 added `DataStreamWriter.name()` as the API surface for
sink evolution. Without a place in the commit log to durably record the
sink name and offset alongside the rest of a committed batch's metadata,
sink names cannot be observed on restart and the evolution feature
cannot be completed. This PR introduces that storage in a separate,
narrowly scoped change.

No user-facing change. `CommitMetadataV3` is in the internal
`org.apache.spark.sql.execution.streaming.checkpointing` package and is
not produced by any code path yet.

Added unit tests in `CommitLogSuite`:
- V3 SerDe with a single active sink (round-trips through commit log).
- V3 retains historical sinks alongside the active one and
  `activeSinkMetadataInfoOpt` resolves correctly.
- `createMetadata(version = V3, sinkMetadataMap = Map.empty)` fails fast
  with `IllegalArgumentException`.

Generated-by: Claude Code (claude-opus-4-7)

Co-authored-by: Isaac
…ecution

### What changes were proposed in this pull request?

Wire the sink name through `MicroBatchExecution` so that, when sink
evolution is enabled, each committed batch writes a
`CommitMetadataV3` whose `sinkMetadataMap` records the current sink as
the active entry alongside any sinks that were active in earlier
batches:

- Add a per-execution `sinkMetadataMap` that is hydrated from the latest
  `CommitMetadataV3` in `populateStartOffsets`.
- When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true,
  the commit-log write in `runBatch` produces `CommitMetadataV3` with
  every prior entry marked `isActive = false` and the current
  `(sinkName, sink.getClass.getName)` entered as `isActive = true`.
- When sink evolution is disabled, the existing V1/V2 commit-log path
  is preserved unchanged.

This is the minimal write-then-read parity for the sink evolution
feature added in SPARK-56719. Provider-mismatch and sink-reuse
validation are intentionally deferred.

### Why are the changes needed?

SPARK-56719 introduced the `DataStreamWriter.name()` API and the
in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the
sink name was not yet persisted to the checkpoint. Without
persistence, restarts cannot observe historical sink identity and the
feature is not durable.

### Does this PR introduce _any_ user-facing change?

Behavior change only when `enableSinkEvolution` is true (off by
default): the commit log directory now contains V3 commit log files
instead of V1/V2 files. Wire format compatibility is preserved when
the flag is left off.

### How was this patch tested?

Added four new tests in `StreamingSinkEvolutionSuite`:
- V3 commit log records the active sink for a named query.
- Renaming the sink across a restart retains the previous sink as
  `isActive = false` and marks the new one active.
- With sink evolution disabled, the commit log remains V1/V2.
- Enabling sink evolution on a checkpoint that previously used
  V1/V2 transparently upgrades to V3 on the next commit.

Existing `StreamingSinkEvolutionSuite`, `CommitLogSuite`,
`MicroBatchExecutionSuite`, and
`AsyncProgressTrackingMicroBatchExecutionSuite` all pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Co-authored-by: Isaac
ericm-db force-pushed the sink-evolution-mbe-wiring branch from d4e2af8 to d76cfb5 on May 20, 2026 23:04