From fe6fafa80f675b793d6fbd3aae6013b7f28ce101 Mon Sep 17 00:00:00 2001 From: Yuchen Liu <170372783+eason-yuchen-liu@users.noreply.github.com> Date: Thu, 21 May 2026 10:34:58 -0700 Subject: [PATCH] [WIP][SS] Stop dropping records with same event time when watermark delay is 0 When `withWatermark(col, "0 seconds")` is used, Spark's late-event predicate `event_time_us <= watermark_ms * 1000` treats records whose event time equals the previous batch's max event time as late and drops them. Two records with the same event time are then handled differently based on which batch carried them: the one that produced the max event time is kept, the one that arrives in the next batch is dropped. This change bumps a configured delay of 0 to 1 ms internally so the comparison becomes strict in practice and same-event-time records are no longer discriminated against. A warning is logged when the bump activates. Gated by a new default-on SQL conf `spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`. Non-zero delays are unaffected. Added an `EventTimeWatermarkSuite` test that exercises both the new default and the legacy path. Existing tests in `MultiStatefulOperatorsSuite` that encode the legacy 0-delay boundary in their expected output are wrapped in `withSQLConf(... -> "false")` so they continue to validate that path. --- .../plans/logical/EventTimeWatermark.scala | 28 +++++- .../apache/spark/sql/internal/SQLConf.scala | 12 +++ .../streaming/EventTimeWatermarkSuite.scala | 41 +++++++++ .../MultiStatefulOperatorsSuite.scala | 87 ++++++++++++------- 4 files changed, 134 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 0d7f2b1d0f3f4..d2def9cf0c247 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -20,19 +20,43 @@ package org.apache.spark.sql.catalyst.plans.logical import java.util.UUID import java.util.concurrent.TimeUnit +import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKeys.CONFIG import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark.updateEventTimeColumn import org.apache.spark.sql.catalyst.trees.TreePattern.{EVENT_TIME_WATERMARK, TreePattern, UPDATE_EVENT_TIME_WATERMARK_COLUMN} import org.apache.spark.sql.catalyst.util.IntervalUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval -object EventTimeWatermark { +object EventTimeWatermark extends Logging { /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */ val delayKey = "spark.watermarkDelayMs" + /** + * The effective delay in milliseconds for `withWatermark`. When the configured delay is 0 and + * [[SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS]] is enabled (the default), it is + * bumped to 1 ms. + * + * Spark's late-event filter and state-eviction predicates compare event times to the watermark + * with `event_time_us <= watermark_ms * 1000`, so with a delay of 0 every record whose event + * time lands on the same millisecond as the current watermark is treated as late. Bumping the + * effective delay to 1 ms makes the comparison strict in practice and avoids that footgun. + */ def getDelayMs(delay: CalendarInterval): Long = { - IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS) + val rawDelayMs = IntervalUtils.getDuration(delay, TimeUnit.MILLISECONDS) + if (rawDelayMs == 0 && + SQLConf.get.getConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS)) { + logWarning(log"withWatermark was called with a delay of 0; bumping the internal delay to " + + log"1 ms so that records whose event time equals the current watermark are not dropped " + + log"as late. Set " + + log"${MDC(CONFIG, SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key)}=false " + + log"to disable.") + 1L + } else { + rawDelayMs + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 77ef8bb600f9c..39c9aede751bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3689,6 +3689,18 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS = + buildConf("spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs") + .doc("When true, a zero watermark delay configured via withWatermark is internally " + + "bumped to 1 millisecond, and a warning is logged. Without this bump, the late-event " + + "predicate `event_time_us <= watermark_ms * 1000` drops every record whose event time " + + "lands on the same millisecond as the current watermark; with a delay of 0 seconds " + + "this means all but the first record per millisecond is dropped. Set to false to " + + "restore the pre-Spark 5.0 behavior. Non-zero delays are not affected.") + .version("5.0.0") + .booleanConf + .createWithDefault(true) + val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD = buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold") .internal() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 59ac06ccadc79..03b8dd1fa2d19 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -153,6 +153,47 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche matchPVals = true) } + test("zero-delay watermark keeps records at max event time across batches") { + // When the configured delay is 0 seconds, the late-event predicate `timestamp_us <= + // watermark_ms * 1000` drops every record at exactly the watermark, which means a record + // arriving in a later batch at the previous batch's max event time is dropped. Bumping the + // internal delay to 1 ms preserves such records. + def buildQuery(input: MemoryStream[Int]): Dataset[(Long, Long)] = input.toDF() + .withColumn("eventTime", timestamp_seconds($"value")) + .withWatermark("eventTime", "0 seconds") + .groupBy($"eventTime") + .agg(count("*").as("cnt")) + .select($"eventTime".cast("long").as[Long], $"cnt".as[Long]) + + val input = MemoryStream[Int] + testStream(buildQuery(input), outputMode = Append)( + AddData(input, 10), + // With the bump, watermark = 10s - 1 ms after the batch, so the group for eventTime=10 + // is retained and nothing is emitted yet. + CheckAnswer(), + AddData(input, 10), + // Same timestamp as the previous batch's max: must NOT be dropped as late. + CheckAnswer(), + AddData(input, 11), + // Once the watermark strictly passes 10 s, both records show up in the emitted group. + CheckAnswer((10L, 2L)) + ) + + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { + val input2 = MemoryStream[Int] + testStream(buildQuery(input2), outputMode = Append)( + AddData(input2, 10), + // Legacy: watermark = 10s exactly, so the group is evicted and emitted immediately. + CheckAnswer((10L, 1L)), + AddData(input2, 10), + // The second record at 10 s is dropped by the late-event filter under legacy semantics. + CheckAnswer((10L, 1L)), + AddData(input2, 11), + CheckAnswer((10L, 1L), (11L, 1L)) + ) + } + } + test("withWatermark should work with alias-qualified column name") { // When a DataFrame has an alias, referencing the event time column via // "alias.columnName" should be allowed because it still refers to a top-level column. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala index cd901deae8e14..15b3c5b48d47f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala @@ -143,6 +143,10 @@ class MultiStatefulOperatorsSuite } test("agg -> agg -> agg, append mode") { + // The expected watermark progression and window emission boundaries below are calibrated + // against the legacy 0-delay semantics; disable the auto-bump so the test continues to + // exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { val inputData = MemoryStream[Int] val stream = inputData.toDF() @@ -224,6 +228,7 @@ class MultiStatefulOperatorsSuite assertNumStateRows(Seq(0, 0, 1)), assertNumRowsDroppedByWatermark(Seq(0, 0, 1)) ) + } } test("stream deduplication -> aggregation, append mode") { @@ -269,6 +274,9 @@ class MultiStatefulOperatorsSuite } test("join -> window agg, append mode") { + // The expected watermark progression below assumes the legacy 0-delay boundary; disable the + // auto-bump so the test continues to exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { val input1 = MemoryStream[Int] val inputDF1 = input1.toDF() .withColumnRenamed("value", "value1") @@ -334,9 +342,13 @@ class MultiStatefulOperatorsSuite assertNumStateRows(Seq(1, 0)), assertNumRowsDroppedByWatermark(Seq(0, 0)) ) + } } test("aggregation -> stream deduplication, append mode") { + // The expected watermark progression below assumes the legacy 0-delay boundary; disable the + // auto-bump so the test continues to exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { val inputData = MemoryStream[Int] val aggStream = inputData.toDF() @@ -415,37 +427,42 @@ class MultiStatefulOperatorsSuite assertNumStateRows(Seq(0, 1)), assertNumRowsDroppedByWatermark(Seq(0, 0)) ) + } } test("join with range join on non-time intervals -> window agg, append mode, shouldn't fail") { - val input1 = MemoryStream[Int] - val inputDF1 = input1.toDF() - .withColumnRenamed("value", "value1") - .withColumn("eventTime1", timestamp_seconds($"value1")) - .withColumn("v1", timestamp_seconds($"value1")) - .withWatermark("eventTime1", "0 seconds") - - val input2 = MemoryStream[(Int, Int)] - val inputDF2 = input2.toDS().toDF("start", "end") - .withColumn("eventTime2Start", timestamp_seconds($"start")) - .withColumn("start2", timestamp_seconds($"start")) - .withColumn("end2", timestamp_seconds($"end")) - .withWatermark("eventTime2Start", "0 seconds") - - val stream = inputDF1.join(inputDF2, - expr("v1 >= start2 AND v1 < end2 " + - "AND eventTime1 = start2"), "inner") - .groupBy(window($"eventTime1", "5 seconds") as Symbol("window")) - .agg(count("*") as Symbol("count")) - .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) - - testStream(stream)( - AddData(input1, 1, 2, 3, 4), - AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)), - CheckNewAnswer(), - assertNumStateRows(Seq(1, 0)), - assertNumRowsDroppedByWatermark(Seq(0, 0)) - ) + // The expected watermark progression below assumes the legacy 0-delay boundary; disable the + // auto-bump so the test continues to exercise that boundary math. + withSQLConf(SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { + val input1 = MemoryStream[Int] + val inputDF1 = input1.toDF() + .withColumnRenamed("value", "value1") + .withColumn("eventTime1", timestamp_seconds($"value1")) + .withColumn("v1", timestamp_seconds($"value1")) + .withWatermark("eventTime1", "0 seconds") + + val input2 = MemoryStream[(Int, Int)] + val inputDF2 = input2.toDS().toDF("start", "end") + .withColumn("eventTime2Start", timestamp_seconds($"start")) + .withColumn("start2", timestamp_seconds($"start")) + .withColumn("end2", timestamp_seconds($"end")) + .withWatermark("eventTime2Start", "0 seconds") + + val stream = inputDF1.join(inputDF2, + expr("v1 >= start2 AND v1 < end2 " + + "AND eventTime1 = start2"), "inner") + .groupBy(window($"eventTime1", "5 seconds") as Symbol("window")) + .agg(count("*") as Symbol("count")) + .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + + testStream(stream)( + AddData(input1, 1, 2, 3, 4), + AddData(input2, (1, 2), (2, 3), (3, 4), (4, 5)), + CheckNewAnswer(), + assertNumStateRows(Seq(1, 0)), + assertNumRowsDroppedByWatermark(Seq(0, 0)) + ) + } } test("stream-stream time interval left outer join -> aggregation, append mode") { @@ -473,8 +490,11 @@ class MultiStatefulOperatorsSuite .selectExpr("CAST(window.start AS STRING) AS window_start", "CAST(window.end AS STRING) AS window_end", "cnt") - // for ease of verification, we change the session timezone to UTC - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + // for ease of verification, we change the session timezone to UTC. The expected eviction + // watermark below also assumes the legacy 0-delay boundary, so disable the auto-bump. + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", + SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { testStream(agg)( MultiAddData( (input1, Seq( @@ -733,8 +753,11 @@ class MultiStatefulOperatorsSuite .selectExpr("CAST(window.start AS STRING) AS window_start", "CAST(window.end AS STRING) AS window_end", "cnt") - // for ease of verification, we change the session timezone to UTC - withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC") { + // for ease of verification, we change the session timezone to UTC. The expected eviction + // watermark below also assumes the legacy 0-delay boundary, so disable the auto-bump. + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC", + SQLConf.STREAMING_WATERMARK_BUMP_ZERO_DELAY_TO_ONE_MS.key -> "false") { testStream(agg)( MultiAddData( (input2, Seq(