Skip to content

[WIP][SS] Stop dropping records with same event time when watermark delay is 0#56043

Draft
eason-yuchen-liu wants to merge 1 commit into
apache:masterfrom
eason-yuchen-liu:SC-206726-shift-watermark-delay
Draft

[WIP][SS] Stop dropping records with same event time when watermark delay is 0#56043
eason-yuchen-liu wants to merge 1 commit into
apache:masterfrom
eason-yuchen-liu:SC-206726-shift-watermark-delay

Conversation

@eason-yuchen-liu
Copy link
Copy Markdown
Contributor

@eason-yuchen-liu eason-yuchen-liu commented May 21, 2026

What changes were proposed in this pull request?

When a user configures withWatermark(col, "0 seconds") (or any other zero-length interval), every record whose event time equals the current watermark is dropped by the late-event filter. This change bumps a configured delay of 0 to 1 ms internally so that records sharing the latest event time across batches are no longer dropped, and logs a one-line warning at the call site so the user is aware. The bump is gated by a new default-on SQL conf spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs; set it to false to restore the pre-Spark 5.0 behavior. Non-zero delays are not affected.

Why are the changes needed?

In microbatch mode the watermark is computed at the end of each batch as max(event_time) - delay and used to filter late events in the next batch with the predicate event_time_us <= watermark_ms * 1000. With a delay of 0 this means the predicate is satisfied (and the row is dropped) whenever an incoming record's event time equals the previous batch's maximum event time — even though that record is no more "late" than the one that produced the maximum in the first place. The effect is that two records with identical event times are treated differently solely based on which batch they happen to land in: the record that produced the max is kept, the record that arrives in the next batch is dropped.

In other words, with a 0-second watermark, rows that share the exact same event time are discriminated against based on batch arrival. This fix removes that asymmetry by ensuring the effective delay is at least one millisecond, which makes the late-event comparison strict in practice (event_time_us < watermark_ms * 1000).

Does this PR introduce any user-facing change?

Yes. When withWatermark(col, "0 seconds") is used (or any other zero-length interval):

  • A warning is logged the first time the delay is resolved.
  • Records whose event time exactly matches the current watermark are no longer dropped as late.
  • State for groups/windows ending exactly at the watermark is retained for one extra microbatch.

The change is gated by spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs, which defaults to true. Set it to false to recover the prior behavior. Non-zero delays are unaffected.

How was this patch tested?

Added EventTimeWatermarkSuite."zero-delay watermark keeps records at max event time across batches" which exercises both the new default (bumpZeroDelayToOneMs=true) and the legacy path (bumpZeroDelayToOneMs=false).

A handful of existing MultiStatefulOperatorsSuite tests encode the legacy 0-delay boundary in their expected watermark/output and have been wrapped in withSQLConf(... -> "false") so they continue to validate that path.

EventTimeWatermarkSuite, MultiStatefulOperatorsSuite, StreamingJoinSuite, FlatMapGroupsWithStateSuite, and StreamingQueryOptimizationCorrectnessSuite all pass locally.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Anthropic, claude-opus-4-7), with human review.

…elay is 0

When `withWatermark(col, "0 seconds")` is used, Spark's late-event predicate
`event_time_us <= watermark_ms * 1000` treats records whose event time equals
the previous batch's max event time as late and drops them. Two records with
the same event time are then handled differently based on which batch carried
them: the one that produced the max event time is kept, the one that arrives
in the next batch is dropped.

This change bumps a configured delay of 0 to 1 ms internally so the comparison
becomes strict in practice and same-event-time records are no longer
discriminated against. A warning is logged when the bump activates. Gated by a
new default-on SQL conf `spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`.
Non-zero delays are unaffected.

Added an `EventTimeWatermarkSuite` test that exercises both the new default
and the legacy path. Existing tests in `MultiStatefulOperatorsSuite` that
encode the legacy 0-delay boundary in their expected output are wrapped in
`withSQLConf(... -> "false")` so they continue to validate that path.
@dongjoon-hyun dongjoon-hyun marked this pull request as draft May 21, 2026 17:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant