From 6cc5b0849ae175d050b85a49129a831d7c2c0dca Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 21 May 2026 09:21:02 +0000 Subject: [PATCH 1/3] [SPARK-55978][SQL][FOLLOWUP] Don't block V2 join pushdown when pushed Sample has fraction=1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Followup to https://github.com/apache/spark/pull/54972. Narrow the `V2ScanRelationPushDown` join-pushdown guard so it only blocks pushdown when at least one side has a pushed `Sample` with fraction < 1. At fraction = 1 the sample is a no-op on the result set, so dropping it inside the merged scan builder is safe. ### Why are the changes needed? The guard added in SPARK-55978 exists because the merged scan builder for `SupportsPushDownJoin` cannot carry a pushed sample and would silently discard it. The hazard is *silent result change*. At fraction = 1, no rows are excluded, so dropping the sample changes nothing observable. The current guard is therefore stricter than its rationale requires, and unnecessarily skips join pushdown for queries that land at `TABLESAMPLE SYSTEM (100 PERCENT)` (parameterized queries, query generators, environment-tuned fractions). ### Does this PR introduce _any_ user-facing change? No behavior change for queries with fraction < 1. For queries where the pushed sample has fraction = 1, join pushdown now proceeds — same result set, faster plan. ### How was this patch tested? - Existing `"join pushdown is skipped when a side has a pushed sample"` test moved from `100 PERCENT` to `50 PERCENT` so it keeps exercising the (still-active) fraction < 1 branch of the guard. - New `"100% SYSTEM sample does not block join pushdown"` test asserts the new fraction = 1 short-circuit, locking in the contract. ### Was this patch authored or co-authored using generative AI tooling? No. --- .../v2/V2ScanRelationPushDown.scala | 9 ++++-- .../DataSourceV2TableSampleSuite.scala | 32 +++++++++++++++++-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 60a2017e6947c..3383f373c5173 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -151,11 +151,14 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { rightProjections.forall(_.isInstanceOf[AttributeReference]) && // Cross joins are not supported because they increase the amount of data. condition.isDefined && - // Do not push down join if either side has a pushed sample, because - // the merged scan builder would silently discard it. + // Do not push down join if either side has a pushed sample with + // fraction < 1, because the merged scan builder would silently + // discard it and change the result. At fraction = 1 the sample is + // a no-op on the result set, so dropping it is safe. // TODO(SPARK-56504): Extend SupportsPushDownJoin to accept pushed // samples so sources supporting both can handle the composition. - leftHolder.pushedSample.isEmpty && rightHolder.pushedSample.isEmpty && + leftHolder.pushedSample.forall(s => s.upperBound - s.lowerBound >= 1.0) && + rightHolder.pushedSample.forall(s => s.upperBound - s.lowerBound >= 1.0) && lBuilder.isOtherSideCompatibleForJoin(rBuilder) => // Process left and right columns in original order val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala index 76ec2e588eae6..be82c8bf6b0f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala @@ -171,9 +171,11 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase val dfNoSample = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.id = $t2.id") checkJoinPushed(dfNoSample) - // With SYSTEM sample on one side: join pushdown should be skipped + // With a SYSTEM sample (fraction < 1) on one side: join pushdown + // should be skipped because the merged scan builder would silently + // discard the sample. val dfWithSample = sql( - s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " + + s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (50 PERCENT) " + s"JOIN $t2 ON $t1.id = $t2.id") checkJoinNotPushed(dfWithSample) // The sample should still be pushed down though @@ -185,6 +187,32 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase } } + test("SPARK-55978: 100% SYSTEM sample does not block join pushdown") { + val joinSampleCatalog = "testjoinsample100" + registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog]) + val t1 = s"$joinSampleCatalog.ns.t1" + val t2 = s"$joinSampleCatalog.ns.t2" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING _") + sql(s"CREATE TABLE $t2 (id bigint, data string) USING _") + try { + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')") + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + // At fraction = 1 the sample is a no-op on the result set, so + // dropping it inside the merged scan builder is safe. The guard + // in V2ScanRelationPushDown short-circuits and join pushdown + // proceeds. + val dfWithSample = sql( + s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " + + s"JOIN $t2 ON $t1.id = $t2.id") + checkJoinPushed(dfWithSample) + } + } finally { + sql(s"DROP TABLE IF EXISTS $t1") + sql(s"DROP TABLE IF EXISTS $t2") + } + } + test("SPARK-55978: legacy connector with only 4-arg pushTableSample - BERNOULLI pushes down") { val legacyCatalog = "testlegacysample" registerCatalog(legacyCatalog, classOf[InMemoryTableWithLegacyTableSampleCatalog]) From 3ad10ac9526958e5579208ec3fa3422c162720a1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 22 May 2026 09:16:17 +0000 Subject: [PATCH 2/3] Address review: also block withReplacement=1.0 samples A sample with withReplacement=true at fraction 1.0 uses PoissonSampler and is not a no-op (each row can emit 0, 1, 2, ... times). Tighten the guard so only !withReplacement, fraction >= 1.0 samples short-circuit the join-pushdown block. --- .../datasources/v2/V2ScanRelationPushDown.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 3383f373c5173..a1c69847c509b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -153,12 +153,16 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { condition.isDefined && // Do not push down join if either side has a pushed sample with // fraction < 1, because the merged scan builder would silently - // discard it and change the result. At fraction = 1 the sample is - // a no-op on the result set, so dropping it is safe. + // discard it and change the result. At fraction = 1 without + // replacement the sample is a no-op on the result set, so dropping + // it is safe. With replacement (Poisson sampling), even fraction 1 + // can emit each row 0, 1, 2, ... times, so it is not a no-op. // TODO(SPARK-56504): Extend SupportsPushDownJoin to accept pushed // samples so sources supporting both can handle the composition. - leftHolder.pushedSample.forall(s => s.upperBound - s.lowerBound >= 1.0) && - rightHolder.pushedSample.forall(s => s.upperBound - s.lowerBound >= 1.0) && + leftHolder.pushedSample.forall(s => + !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) && + rightHolder.pushedSample.forall(s => + !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) && lBuilder.isOtherSideCompatibleForJoin(rBuilder) => // Process left and right columns in original order val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) = From 1e4dbae927f30c0be8f938956b0216dff9bca8d3 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 22 May 2026 09:25:17 +0000 Subject: [PATCH 3/3] Add test: with-replacement sample at fraction 1 still blocks join pushdown Locks in the new !s.withReplacement guard via the DataFrame.sample API (SQL TABLESAMPLE can't set withReplacement=true). --- .../DataSourceV2TableSampleSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala index be82c8bf6b0f3..164c098e95e8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala @@ -213,6 +213,32 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase } } + test("SPARK-55978: with-replacement sample blocks join pushdown even at fraction 1") { + val joinSampleCatalog = "testjoinsamplerepl" + registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog]) + val t1 = s"$joinSampleCatalog.ns.t1" + val t2 = s"$joinSampleCatalog.ns.t2" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING _") + sql(s"CREATE TABLE $t2 (id bigint, data string) USING _") + try { + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')") + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + // SQL TABLESAMPLE always sets withReplacement=false, so use the + // DataFrame API. Poisson sampling at fraction 1 still emits each + // input row 0, 1, 2, ... times, so the sample is not a no-op and + // join pushdown must remain blocked. + val df = spark.table(t1).sample(withReplacement = true, fraction = 1.0) + .join(spark.table(t2), "id") + checkJoinNotPushed(df) + checkSamplePushed(df, pushed = true) + } + } finally { + sql(s"DROP TABLE IF EXISTS $t1") + sql(s"DROP TABLE IF EXISTS $t2") + } + } + test("SPARK-55978: legacy connector with only 4-arg pushTableSample - BERNOULLI pushes down") { val legacyCatalog = "testlegacysample" registerCatalog(legacyCatalog, classOf[InMemoryTableWithLegacyTableSampleCatalog])