Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -6935,6 +6935,11 @@
"Streaming query evolution error:"
],
"subClass" : {
"CANNOT_ENABLE_ON_EXISTING_CHECKPOINT" : {
"message" : [
"Cannot enable streaming source evolution on a checkpoint that was created without it. The existing checkpoint uses offset log format version <existingVersion>, which does not support the named source tracking required by streaming source evolution. To use source evolution, start the query with a fresh checkpoint."
]
},
"DUPLICATE_SOURCE_NAMES" : {
"message" : [
"Duplicate streaming source names detected: <names>. Each streaming source must have a unique name."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2536,6 +2536,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("names" -> duplicateNames.mkString(", ")))
}

/**
 * Builds the error reported when streaming source evolution is enabled for a query whose
 * existing checkpoint was written without it.
 *
 * @param existingVersion offset log format version found in the existing checkpoint; it is
 *                        substituted into the `<existingVersion>` placeholder of the message
 * @return an `AnalysisException` carrying the
 *         `STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT` error class
 */
def cannotEnableSourceEvolutionOnExistingCheckpointError(existingVersion: Int): Throwable = {
  val params = Map("existingVersion" -> existingVersion.toString)
  new AnalysisException(
    errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
    messageParameters = params)
}

def columnNotFoundInExistingColumnsError(
columnType: String, columnName: String, validColumnNames: Seq[String]): Throwable = {
new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.checkpointing

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

/**
 * Internal version descriptor for a streaming checkpoint. Each checkpoint component carries its
 * own version number; at present only the offset log is tracked here, while other components
 * are managed elsewhere via dedicated configs.
 *
 * @param offsetLogVersion format version of the offset log
 */
case class StreamingCheckpointVersion(offsetLogVersion: Int) {
  // Spell out the field name so the component each number belongs to is visible in log output.
  override def toString: String =
    s"StreamingCheckpointVersion(offsetLogVersion: $offsetLogVersion)"
}

/**
 * Identifies which checkpoint log a format-version lookup or update applies to. The trait is
 * sealed, so matches over it are checked for exhaustiveness by the compiler.
 */
sealed trait CheckpointLogType
/** Selects the offset log. */
case object OffsetLogType extends CheckpointLogType

/**
 * Central place for streaming checkpoint versioning. `CheckpointVersionManager` picks the format
 * version of each system-managed log when a streaming query starts, and on restart checks that
 * the features requested on the session are compatible with the checkpoint already on disk.
 *
 * Writer versions are normally consulted only when a new streaming query starts. They are not
 * meant to be exposed directly to users and, once chosen, are not meant to change for the
 * lifetime of the query.
 */
object CheckpointVersionManager extends Logging {

  // Checkpoint writer version 1: the base version, supporting DataFrame-based streaming queries
  // across the standard trigger types.
  private val CHECKPOINT_VERSION_V1 = StreamingCheckpointVersion(OffsetSeqLog.VERSION_1)

  // Version currently in effect. To bump it, introduce a new `StreamingCheckpointVersion` value
  // holding the new per-component version numbers and point this constant at it.
  private val CURRENT_VERSION = CHECKPOINT_VERSION_V1

  /** Returns the checkpoint version currently in effect. */
  def getCurrentVersion(): StreamingCheckpointVersion = CURRENT_VERSION

  /**
   * Computes the offset log format version for a new streaming query as the largest of:
   *  - the current default version,
   *  - the minimum version demanded by enabled features (e.g. streaming source evolution needs
   *    [[OffsetSeqLog.VERSION_2]] for OffsetMap-based named source tracking),
   *  - the version configured through [[SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION]].
   *
   * @param sparkSessionForStream the cloned `SparkSession` for the streaming query
   */
  private def getOffsetLogVersion(sparkSessionForStream: SparkSession): Int = {
    val defaultVersion = getCurrentVersion().offsetLogVersion
    val featureMinimum = getMinRequiredOffsetLogVersion(sparkSessionForStream)
    val fromConfig = sparkSessionForStream.sessionState.conf.streamingOffsetLogFormatVersion
    val result = math.max(defaultVersion, math.max(featureMinimum, fromConfig))
    logInfo(s"Retrieved offset log writer version=$result")
    result
  }

  /**
   * Lowest offset log format version that satisfies the features enabled on this session.
   * Streaming source evolution depends on the OffsetMap (sourceId -> offset) format, which is
   * only available in [[OffsetSeqLog.VERSION_2]]; without it [[OffsetSeqLog.VERSION_1]] suffices.
   */
  private def getMinRequiredOffsetLogVersion(sparkSessionForStream: SparkSession): Int = {
    val sourceEvolutionEnabled =
      sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution
    if (sourceEvolutionEnabled) OffsetSeqLog.VERSION_2 else OffsetSeqLog.VERSION_1
  }

  /**
   * Writes the given offset log format version into the session's runtime config under
   * [[SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION]].
   */
  private def setSparkSessionConfigsForOffsetLog(
      sparkSessionForStream: SparkSession,
      offsetLogFormatVersion: Int): Unit = {
    val confKey = SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key
    sparkSessionForStream.conf.set(confKey, offsetLogFormatVersion)
  }

  /**
   * Looks up the format version for `logType`. Feature-driven minimums are read from the
   * `sparkSessionForStream` config, which must be initialized before calling.
   */
  def getFormatVersionFromSession(
      sparkSessionForStream: SparkSession,
      logType: CheckpointLogType): Int = logType match {
    case OffsetLogType => getOffsetLogVersion(sparkSessionForStream)
  }

  /**
   * Decides the offset log format version for this query run. An existing query keeps the
   * version recorded in its last written offset log entry; a new query takes the version
   * derived from the session config (honoring any feature-driven minimums).
   *
   * The session config is also validated against the existing checkpoint: enabling streaming
   * source evolution on a checkpoint whose offset log is below VERSION_2 is rejected, because
   * the OffsetMap-based named source tracking that source evolution needs is not available in
   * earlier versions.
   *
   * @param sparkSessionForStream the session whose config is consulted
   * @param latestStartedBatch the most recent offset log entry, or `None` for a new query
   */
  def resolveOffsetLogVersion(
      sparkSessionForStream: SparkSession,
      latestStartedBatch: Option[(Long, OffsetSeqBase)]): Int = {
    latestStartedBatch match {
      case None =>
        getFormatVersionFromSession(sparkSessionForStream, OffsetLogType)

      case Some((_, lastOffsets)) =>
        val existingVersion = lastOffsets.version
        val tooOldForEvolution = existingVersion < OffsetSeqLog.VERSION_2
        if (tooOldForEvolution &&
            sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution) {
          throw QueryCompilationErrors.cannotEnableSourceEvolutionOnExistingCheckpointError(
            existingVersion)
        }
        existingVersion
    }
  }

  /** Stores `version` on the session as the format version to use for `logType`. */
  def setFormatVersion(
      sparkSessionForStream: SparkSession,
      logType: CheckpointLogType,
      version: Int): Unit = logType match {
    case OffsetLogType => setSparkSessionConfigsForOffsetLog(sparkSessionForStream, version)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CheckpointVersionManager, CommitMetadata, OffsetLogType, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
Expand Down Expand Up @@ -470,20 +470,21 @@ class MicroBatchExecution(
sourceIdMap
)

// Read the offset log format version from the last written offset log entry. If no entries
// are found, use the set/default value from the config.
val offsetLogFormatVersion = if (latestStartedBatch.isDefined) {
latestStartedBatch.get._2.version
} else {
// For existing queries, the offset log format version is read from the last written offset log
// entry; for new queries, it is resolved from the session config (honoring any feature-driven
// minimums). Restarting with an incompatible feature set on an existing checkpoint is rejected
// inside the manager.
if (latestStartedBatch.isEmpty) {
// If no offset log entries are found, assert that the query does not have any committed
// batches to be extra safe.
assert(lastCommittedBatchId == -1L)
sparkSessionForStream.conf.get(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION)
}
val offsetLogFormatVersion = CheckpointVersionManager.resolveOffsetLogVersion(
sparkSessionForStream, latestStartedBatch)

// Set the offset log format version in the sparkSessionForStream conf
sparkSessionForStream.conf.set(
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, offsetLogFormatVersion)
// Persist the resolved offset log format version on the streaming session.
CheckpointVersionManager.setFormatVersion(
sparkSessionForStream, OffsetLogType, offsetLogFormatVersion)

val execCtx = new MicroBatchExecutionContext(id, runId, name, triggerClock, sources, sink,
progressReporter, -1, sparkSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import java.io.File

import org.scalatest.Tag

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeq, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata}
import org.apache.spark.sql.execution.streaming.runtime.{LongOffset, MemoryStream, SerializedOffset}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -368,6 +370,51 @@ class OffsetSeqLogSuite extends SharedSparkSession {
}
}

test("enabling source evolution on an existing V1 checkpoint is rejected") {
  withTempDir { checkpointDir =>
    withTempDir { outputDir =>
      val inputData = MemoryStream[Int]

      // Both runs use the same sink, output path and checkpoint; only the session conf in
      // effect at start time differs between them.
      def startQuery() = inputData.toDF()
        .writeStream
        .format("parquet")
        .option("path", outputDir.getAbsolutePath)
        .option("checkpointLocation", checkpointDir.getAbsolutePath)
        .start()

      // Start query without source evolution, writing V1 offset log entries.
      val query1 = startQuery()
      try {
        inputData.addData(1, 2)
        query1.processAllAvailable()
      } finally {
        // Always stop, so a failure above cannot leave a query running on the temp checkpoint.
        query1.stop()
      }

      val offsetLog = new OffsetSeqLog(spark, s"${checkpointDir.getAbsolutePath}/offsets")
      val (_, initialOffsets) = offsetLog.getLatest()
        .getOrElse(fail("Offset log should have at least one entry after the first run"))
      assert(initialOffsets.version === 1)
      assert(initialOffsets.isInstanceOf[OffsetSeq])

      // Restart with the source evolution session flag enabled. The existing V1 checkpoint does
      // not support OffsetMap-based named source tracking, so the query must fail loudly rather
      // than silently downgrading the user's session config.
      withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
        val query2 = startQuery()
        try {
          val ex = intercept[StreamingQueryException] {
            inputData.addData(3, 4)
            query2.processAllAvailable()
          }
          checkError(
            exception = ex.cause.asInstanceOf[AnalysisException],
            condition = "STREAMING_QUERY_EVOLUTION_ERROR.CANNOT_ENABLE_ON_EXISTING_CHECKPOINT",
            parameters = Map("existingVersion" -> "1"))
        } finally {
          // If the expected failure does not happen, the query would otherwise stay active
          // after the temp directories are removed.
          query2.stop()
        }
      }
    }
  }
}

test("SPARK-55131: offset log records defaults to merge operator version 2") {
val offsetSeqMetadata = OffsetSeqMetadata.apply(batchWatermarkMs = 0, batchTimestampMs = 0,
spark.conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.mockito.Mockito._
import org.scalatest.Tag

import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeqLog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.Trigger._
Expand Down Expand Up @@ -211,6 +212,33 @@ class StreamingSourceEvolutionSuite extends StreamTest {
// Metadata Path Tests
// =======================

testWithSourceEvolution("offset log uses VERSION_2 when source evolution is enabled") {
  LastOptions.clear()

  val checkpointLocation = new Path(newMetadataDir)

  val df1 = spark.readStream
    .format("org.apache.spark.sql.streaming.test")
    .name("source1")
    .load()

  val q = df1.writeStream
    .format("org.apache.spark.sql.streaming.test")
    .option("checkpointLocation", checkpointLocation.toString)
    .trigger(ProcessingTime(10.seconds))
    .start()
  try {
    q.processAllAvailable()
  } finally {
    // Stop even if processing fails, so the query does not stay active after this test.
    q.stop()
  }

  // Only the session flag for source evolution is set by the test helper; the offset log entry
  // written by the run above is expected to be in the OffsetMap-based version 2 format.
  val offsetLog = new OffsetSeqLog(spark, s"$checkpointLocation/offsets")
  val (_, offsetSeq) = offsetLog.getLatest()
    .getOrElse(fail("Offset log should have at least one entry"))
  assert(offsetSeq.isInstanceOf[OffsetMap],
    s"Expected OffsetMap but got ${offsetSeq.getClass.getSimpleName}")
  assert(offsetSeq.version === 2, s"Expected version 2 but got ${offsetSeq.version}")
}

testWithSourceEvolution("named sources - metadata path uses source name") {
LastOptions.clear()

Expand Down Expand Up @@ -506,13 +534,12 @@ class StreamingSourceEvolutionSuite extends StreamTest {

/**
* Helper method to run tests with source evolution enabled.
* Sets offset log format to V2 (OffsetMap) since named sources require it.
* Enabling source evolution automatically forces offset log format V2 (OffsetMap) for new
* queries, since named sources require it.
*/
def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = {
test(testName, testTags: _*) {
withSQLConf(
SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true",
SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") {
withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to update any other tests that were setting both configs?

testBody
}
}
Expand Down