Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -6855,6 +6855,11 @@
"Duplicate streaming source names detected: <names>. Each streaming source must have a unique name."
]
},
"INVALID_SINK_NAME" : {
"message" : [
"Invalid streaming sink name: '<sinkName>'. Sink names must only contain ASCII letters ('a'-'z', 'A'-'Z'), digits ('0'-'9'), and underscores ('_')."
]
},
"INVALID_SOURCE_NAME" : {
"message" : [
"Invalid streaming source name '<sourceName>'. Source names must only contain ASCII letters (a-z, A-Z), digits (0-9), and underscores (_)."
Expand All @@ -6865,6 +6870,11 @@
"Streaming source naming is not supported. Source name '<name>' was provided but the feature is disabled. Please enable the feature by setting spark.sql.streaming.queryEvolution.enableSourceEvolution to true."
]
},
"UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT" : {
"message" : [
"Streaming sink must be named when spark.sql.streaming.queryEvolution.enableSinkEvolution is enabled. Use the name() method on DataStreamWriter to assign a name to the streaming sink."
]
},
"UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT" : {
"message" : [
"All streaming sources must be named when spark.sql.streaming.queryEvolution.enableSourceEvolution is enabled. Unnamed sources found: <sourceInfo>. Use the name() method to assign names to all streaming sources."
Expand Down
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.artifact.ArtifactManager.cachedBlockIdList"),

// [SPARK-54323][PYTHON] Change the way to access logs to TVF instead of system view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.TableValuedFunction.python_worker_logs"),

// [SPARK-56719][SS] Add DataStreamWriter.name() API for sink evolution
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.DataStreamWriter.name")
)

// Default exclude rules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package org.apache.spark.sql.streaming

import java.util.concurrent.TimeoutException

import scala.util.matching.Regex

import org.apache.spark.annotation.Evolving
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql.{Dataset, ForeachWriter, WriteConfigMethods}
import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter, WriteConfigMethods}

/**
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
Expand Down Expand Up @@ -90,6 +92,19 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T
*/
def queryName(queryName: String): this.type

/**
 * Assigns a name to this streaming sink for sink evolution capability. When sinks are named,
 * they can be tracked in checkpoint metadata, enabling query evolution.
 *
 * If not specified, sinks are automatically assigned a default name based on their position in
 * the query, which maintains backward compatibility.
 *
 * @param sinkName
 *   the unique name for this sink (ASCII letters, digits and underscores only)
 * @since 4.1.0
 */
private[sql] def name(sinkName: String): this.type

/**
* Specifies the underlying output data source.
*
Expand Down Expand Up @@ -217,6 +232,26 @@ abstract class DataStreamWriter[T] extends WriteConfigMethods[DataStreamWriter[T
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery

/**
 * Ensures `sinkName` is usable as a streaming sink name: it must be non-null, non-empty and
 * consist solely of ASCII letters, digits and underscores.
 *
 * @param sinkName
 *   the candidate sink name
 * @throws IllegalArgumentException
 *   if the sink name is null or empty
 * @throws AnalysisException
 *   if the sink name contains a character outside `[a-zA-Z0-9_]`
 */
private[sql] def validateSinkName(sinkName: String): Unit = {
  require(sinkName != null, "Sink name cannot be null")
  require(sinkName.nonEmpty, "Sink name cannot be empty")

  val allowedNameRegex: Regex = "^[a-zA-Z0-9_]+$".r
  sinkName match {
    // The extractor succeeds only when the whole string matches the pattern.
    case allowedNameRegex() =>
    case _ =>
      throw new AnalysisException(
        errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
        messageParameters = Map("sinkName" -> sinkName))
  }
}

///////////////////////////////////////////////////////////////////////////////////////
// Covariant Overrides
///////////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.sql.streaming.OutputMode
*/
case class WriteToStream(
name: String,
sinkName: Option[String],
resolvedCheckpointLocation: String,
sink: Table,
outputMode: OutputMode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
* rule [[ResolveStreamWrite]].
*
* @param userSpecifiedName Query name optionally specified by the user.
* @param userSpecifiedSinkName Sink name optionally specified by the user for sink evolution.
* @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
* @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user
* has not specified one. If false, then error will be thrown.
Expand All @@ -47,6 +48,7 @@ import org.apache.spark.sql.streaming.{OutputMode, Trigger}
*/
case class WriteToStreamStatement(
userSpecifiedName: Option[String],
userSpecifiedSinkName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
useTempCheckpointLocation: Boolean,
recoverFromCheckpointLocation: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2545,6 +2545,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("sourceName" -> sourceName))
}

/**
 * Builds the [[AnalysisException]] for the
 * `STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME` error class.
 *
 * @param sinkName the offending sink name, substituted for `<sinkName>` in the message
 * @return the exception; it is constructed here but not thrown
 */
def invalidStreamingSinkNameError(sinkName: String): Throwable = {
  val params = Map("sinkName" -> sinkName)
  new AnalysisException(
    errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SINK_NAME",
    messageParameters = params)
}

def duplicateStreamingSourceNamesError(duplicateNames: Seq[String]): Throwable = {
new AnalysisException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3143,6 +3143,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Internal boolean flag for streaming sink evolution; disabled by default.
// NOTE(review): the UNNAMED_STREAMING_SINKS_WITH_ENFORCEMENT error text states that a sink
// *must* be named when this flag is enabled, while the doc string below only says sinks
// "can be named" -- confirm which behaviour is intended and align the wording.
val ENABLE_STREAMING_SINK_EVOLUTION =
buildConf("spark.sql.streaming.queryEvolution.enableSinkEvolution")
.internal()
.doc("When true, streaming sinks can be named using the name() API on DataStreamWriter. " +
"This enables sink evolution capability where sinks can be changed while maintaining " +
"a historical record of all sinks used in the checkpoint.")
.version("4.1.0")
.booleanConf
.createWithDefault(false)

val STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART =
buildConf("spark.sql.streaming.checkUnfinishedRepartitionOnRestart")
.internal()
Expand Down Expand Up @@ -7673,6 +7683,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def enableStreamingSourceEvolution: Boolean = getConf(ENABLE_STREAMING_SOURCE_EVOLUTION)

// Current value of the `ENABLE_STREAMING_SINK_EVOLUTION` conf entry.
def enableStreamingSinkEvolution: Boolean = getConf(ENABLE_STREAMING_SINK_EVOLUTION)

def streamingCheckUnfinishedRepartitionOnRestart: Boolean =
getConf(STREAMING_CHECK_UNFINISHED_REPARTITION_ON_RESTART)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T])
this
}

/** @inheritdoc */
private[sql] def name(sinkName: String): this.type =
  // This writer does not accept sink names: the call always fails, and no state is touched.
  throw new UnsupportedOperationException("Sink naming is not supported in Spark Connect")

/** @inheritdoc */
def format(source: String): this.type = {
sinkBuilder.setFormat(source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
this
}

/** @inheritdoc */
private[sql] def name(sinkName: String): this.type = {
  // Validate before storing so a rejected name never ends up on the writer.
  validateSinkName(sinkName)
  val validatedName: Option[String] = Some(sinkName)
  this.sinkName = validatedName
  this
}

/** @inheritdoc */
def format(source: String): this.type = {
this.source = source
Expand Down Expand Up @@ -312,6 +319,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D

ds.sparkSession.sessionState.streamingQueryManager.startQuery(
newOptions.get("queryName"),
sinkName,
newOptions.get("checkpointLocation"),
ds,
newOptions.originalMap,
Expand Down Expand Up @@ -444,6 +452,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D
private var partitioningColumns: Option[Seq[String]] = None

private var clusteringColumns: Option[Seq[String]] = None

private var sinkName: Option[String] = None
}

object DataStreamWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class StreamingQueryManager private[sql] (
// scalastyle:off argcount
private def createQuery(
userSpecifiedName: Option[String],
userSpecifiedSinkName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: Dataset[_],
extraOptions: Map[String, String],
Expand Down Expand Up @@ -207,6 +208,7 @@ class StreamingQueryManager private[sql] (

val dataStreamWritePlan = WriteToStreamStatement(
userSpecifiedName,
userSpecifiedSinkName,
userSpecifiedCheckpointLocation,
useTempCheckpointLocation,
recoverFromCheckpointLocation,
Expand Down Expand Up @@ -277,6 +279,7 @@ class StreamingQueryManager private[sql] (
@throws[TimeoutException]
private[sql] def startQuery(
userSpecifiedName: Option[String],
userSpecifiedSinkName: Option[String] = None,
userSpecifiedCheckpointLocation: Option[String],
df: Dataset[_],
extraOptions: Map[String, String],
Expand All @@ -290,6 +293,7 @@ class StreamingQueryManager private[sql] (
catalogTable: Option[CatalogTable] = None): StreamingQuery = {
val query = createQuery(
userSpecifiedName,
userSpecifiedSinkName,
userSpecifiedCheckpointLocation,
df,
extraOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService:
* the async write of the batch is completed. Future may also be completed exceptionally
* to indicate some write error.
*/
def addAsync(batchId: Long, metadata: CommitMetadata): CompletableFuture[Long] = {
def addAsync(batchId: Long, metadata: CommitMetadataBase): CompletableFuture[Long] = {
require(metadata != null, "'null' metadata cannot be written to a metadata log")
val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output =>
serialize(metadata, output)
Expand All @@ -72,7 +72,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService:
* @param metadata metadata of batch to write
* @return true if operation is successful otherwise false.
*/
def addInMemory(batchId: Long, metadata: CommitMetadata): Boolean = {
def addInMemory(batchId: Long, metadata: CommitMetadataBase): Boolean = {
if (batchCache.containsKey(batchId)) {
false
} else {
Expand Down
Loading