diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 0d7f2b1d0f3f4..d2def9cf0c247 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -20,19 +20,43 @@ package org.apache.spark.sql.catalyst.plans.logical import java.util.UUID import java.util.concurrent.TimeUnit +import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKeys.CONFIG import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark.updateEventTimeColumn import org.apache.spark.sql.catalyst.trees.TreePattern.{EVENT_TIME_WATERMARK, TreePattern, UPDATE_EVENT_TIME_WATERMARK_COLUMN} import org.apache.spark.sql.catalyst.util.IntervalUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval -object EventTimeWatermark { +object EventTimeWatermark extends Logging { /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */ val delayKey = "spark.watermarkDelayMs" + /** + * The effective delay in milliseconds for `withWatermark`. When the configured delay is 0 and + * [[SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS]] is enabled (the default), it is + * bumped to 1 ms. + * + * Spark's late-event filter and state-eviction predicates compare event times to the watermark + * with `event_time_us <= watermark_ms * 1000`, so with a delay of 0 every record whose event + * time lands on the same millisecond as the current watermark is treated as late. Bumping the + * effective delay to 1 ms makes the comparison strict in practice and avoids that footgun. + */ def getDelayMs(delay: CalendarInterval): Long = { - IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS) + val rawDelayMs = IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS) + if (rawDelayMs == 0 && + SQLConf.get.getConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS)) { + logWarning(log"withWatermark was called with a delay of 0; bumping the internal delay to " + + log"1 ms so that records whose event time equals the current watermark are not dropped " + + log"as late. Set " + + log"${MDC(CONFIG, SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key)}=false " + + log"to disable.") + 1L + } else { + rawDelayMs + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 77ef8bb600f9c..39c9aede751bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3689,6 +3689,18 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS = + buildConf("spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs") + .doc("When true, a zero watermark delay configured via withWatermark is internally " + + "bumped to 1 millisecond, and a warning is logged. Without this bump, the late-event " + + "predicate `event_time_us <= watermark_ms * 1000` drops every record whose event time " + + "lands on the same millisecond as the current watermark; with a delay of 0 seconds " + + "this means all but the first record per millisecond is dropped. Set to false to " + + "restore the pre-Spark 5.0 behavior. Non-zero delays are not affected.") + .version("5.0.0") + .booleanConf + .createWithDefault(true) + val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD = buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold") .internal() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 59ac06ccadc79..03b8dd1fa2d19 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -153,6 +153,47 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche matchPVals = true) } + test("zero-delay watermark keeps records at max event time across batches") { + // When the configured delay is 0 seconds, the late-event predicate `timestamp_us <= + // watermark_ms * 1000` drops every record at exactly the watermark, which means a record + // arriving in a later batch at the previous batch's max event time is dropped. Bumping the + // internal delay to 1 ms preserves such records. + def buildQuery(input: MemoryStream[Int]): Dataset[(Long, Long)] = input.toDF() + .withColumn("eventTime", timestamp_seconds($"value")) + .withWatermark("eventTime", "0 seconds") + .groupBy($"eventTime") + .agg(count("*").as("cnt")) + .select($"eventTime".cast("long").as[Long], $"cnt".as[Long]) + + val input = MemoryStream[Int] + testStream(buildQuery(input), outputMode = Append)( + AddData(input, 10), + // With the bump, watermark = 10s - 1 ms after the batch, so the group for eventTime=10 + // is retained and nothing is emitted yet. + CheckAnswer(), + AddData(input, 10), + // Same timestamp as the previous batch's max: must NOT be dropped as late. + CheckAnswer(), + AddData(input, 11), + // Once the watermark strictly passes 10 s, both records show up in the emitted group. + CheckAnswer((10L, 2L)) + ) + + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { + val input2 = MemoryStream[Int] + testStream(buildQuery(input2), outputMode = Append)( + AddData(input2, 10), + // Legacy: watermark = 10s exactly, so the group is evicted and emitted immediately. + CheckAnswer((10L, 1L)), + AddData(input2, 10), + // The second record at 10 s is dropped by the late-event filter under legacy semantics. + CheckAnswer((10L, 1L)), + AddData(input2, 11), + CheckAnswer((10L, 1L), (11L, 1L)) + ) + } + } + test("withWatermark should work with alias-qualified column name") { // When a DataFrame has an alias, referencing the event time column via // "alias.columnName" should be allowed because it still refers to a top-level column. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala index cd901deae8e14..15b3c5b48d47f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala @@ -143,6 +143,10 @@ class MultiStatefulOperatorsSuite } test("agg -> agg -> agg, append mode") { + // The expected watermark progression and window emission boundaries below are calibrated + // against the legacy 0-delay semantics; disable the auto-bump so the test continues to + // exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { val inputData = MemoryStream[Int] val stream = inputData.toDF() @@ -224,6 +228,7 @@ class MultiStatefulOperatorsSuite assertNumStateRows(Seq(0, 0, 1)), assertNumRowsDroppedByWatermark(Seq(0, 0, 1)) ) + } } test("stream deduplication -> aggregation, append mode") { @@ -269,6 +274,9 @@ class MultiStatefulOperatorsSuite } test("join -> window agg, append mode") { + // The expected watermark progression below assumes the legacy 0-delay boundary; disable the + // auto-bump so the test continues to exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { val input1 = MemoryStream[Int] val inputDF1 = input1.toDF() .withColumnRenamed("value", "value1") @@ -334,9 +342,13 @@ class MultiStatefulOperatorsSuite assertNumStateRows(Seq(1, 0)), assertNumRowsDroppedByWatermark(Seq(0, 0)) ) + } } test("aggregation -> stream deduplication, append mode") { + // The expected watermark progression below assumes the legacy 0-delay boundary; disable the + // auto-bump so the test continues to exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { val inputData = MemoryStream[Int] val aggStream = inputData.toDF() @@ -415,37 +427,42 @@ class MultiStatefulOperatorsSuite assertNumStateRows(Seq(0, 1)), assertNumRowsDroppedByWatermark(Seq(0, 0)) ) + } } test("join with range join on non-time intervals -> window agg, append mode, shouldn't fail") { - val input1 = MemoryStream[Int] - val inputDF1 = input1.toDF() - .withColumnRenamed("value", "value1") - .withColumn("eventTime1", timestamp_seconds($"value1")) - .withColumn("v1", timestamp_seconds($"value1")) - .withWatermark("eventTime1", "0 seconds") - - val input2 = MemoryStream[(Int, Int)] - val inputDF2 = input2.toDS().toDF("start", "end") - .withColumn("eventTime2Start", timestamp_seconds($"start")) - .withColumn("start2", timestamp_seconds($"start")) - .withColumn("end2", timestamp_seconds($"end")) - .withWatermark("eventTime2Start", "0 seconds") - - val stream = inputDF1.join(inputDF2, - expr("v1 >= start2 AND v1 < end2 " + - "AND eventTime1 = start2"), "inner") - .groupBy(window($"eventTime1", "5 seconds") as Symbol("window")) - .agg(count("*") as Symbol("count")) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - - testStream(stream)( - AddData(input1, 1, 2, 3, 4), - AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)), - CheckNewAnswer(), - assertNumStateRows(Seq(1, 0)), - assertNumRowsDroppedByWatermark(Seq(0, 0)) - ) + // The expected watermark progression below assumes the legacy 0-delay boundary; disable the + // auto-bump so the test continues to exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { + val input1 = MemoryStream[Int] + val inputDF1 = input1.toDF() + .withColumnRenamed("value", "value1") + .withColumn("eventTime1", timestamp_seconds($"value1")) + .withColumn("v1", timestamp_seconds($"value1")) + .withWatermark("eventTime1", "0 seconds") + + val input2 = MemoryStream[(Int, Int)] + val inputDF2 = input2.toDS().toDF("start", "end") + .withColumn("eventTime2Start", timestamp_seconds($"start")) + .withColumn("start2", timestamp_seconds($"start")) + .withColumn("end2", timestamp_seconds($"end")) + .withWatermark("eventTime2Start", "0 seconds") + + val stream = inputDF1.join(inputDF2, + expr("v1 >= start2 AND v1 < end2 " + + "AND eventTime1 = start2"), "inner") + .groupBy(window($"eventTime1", "5 seconds") as Symbol("window")) + .agg(count("*") as Symbol("count")) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(stream)( + AddData(input1, 1, 2, 3, 4), + AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)), + CheckNewAnswer(), + assertNumStateRows(Seq(1, 0)), + assertNumRowsDroppedByWatermark(Seq(0, 0)) + ) + } } test("stream-stream time interval left outer join -> aggregation, append mode") { @@ -473,8 +490,11 @@ class MultiStatefulOperatorsSuite .selectExpr("CAST(window.start AS STRING) AS window_start", "CAST(window.end AS STRING) AS window_end", "cnt") - // for ease of verification, we change the session timezone to UTC - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + // for ease of verification, we change the session timezone to UTC. The expected eviction + // watermark below also assumes the legacy 0-delay boundary, so disable the auto-bump. + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", + SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { testStream(agg)( MultiAddData( (input1, Seq( @@ -733,8 +753,11 @@ class MultiStatefulOperatorsSuite .selectExpr("CAST(window.start AS STRING) AS window_start", "CAST(window.end AS STRING) AS window_end", "cnt") - // for ease of verification, we change the session timezone to UTC - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + // for ease of verification, we change the session timezone to UTC. The expected eviction + // watermark below also assumes the legacy 0-delay boundary, so disable the auto-bump. + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", + SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { testStream(agg)( MultiAddData( (input2, Seq(