diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala index 60a2017e6947c..a1c69847c509b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala @@ -151,11 +151,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { rightProjections.forall(_.isInstanceOf[AttributeReference]) && // Cross joins are not supported because they increase the amount of data. condition.isDefined && - // Do not push down join if either side has a pushed sample, because - // the merged scan builder would silently discard it. + // Do not push down join if either side has a pushed sample with + // fraction < 1, because the merged scan builder would silently + // discard it and change the result. At fraction = 1 without + // replacement the sample is a no-op on the result set, so dropping + // it is safe. With replacement (Poisson sampling), even fraction 1 + // can emit each row 0, 1, 2, ... times, so it is not a no-op. // TODO(SPARK-56504): Extend SupportsPushDownJoin to accept pushed // samples so sources supporting both can handle the composition. 
- leftHolder.pushedSample.isEmpty && rightHolder.pushedSample.isEmpty && + leftHolder.pushedSample.forall(s => + !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) && + rightHolder.pushedSample.forall(s => + !s.withReplacement && s.upperBound - s.lowerBound >= 1.0) && lBuilder.isOtherSideCompatibleForJoin(rBuilder) => // Process left and right columns in original order val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala index 76ec2e588eae6..164c098e95e8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableSampleSuite.scala @@ -171,9 +171,11 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase val dfNoSample = sql(s"SELECT * FROM $t1 JOIN $t2 ON $t1.id = $t2.id") checkJoinPushed(dfNoSample) - // With SYSTEM sample on one side: join pushdown should be skipped + // With a SYSTEM sample (fraction < 1) on one side: join pushdown + // should be skipped because the merged scan builder would silently + // discard the sample. 
val dfWithSample = sql( - s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " + + s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (50 PERCENT) " + s"JOIN $t2 ON $t1.id = $t2.id") checkJoinNotPushed(dfWithSample) // The sample should still be pushed down though @@ -185,6 +187,58 @@ class DataSourceV2TableSampleSuite extends DatasourceV2SQLBase } } + test("SPARK-55978: 100% SYSTEM sample does not block join pushdown") { + val joinSampleCatalog = "testjoinsample100" + registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog]) + val t1 = s"$joinSampleCatalog.ns.t1" + val t2 = s"$joinSampleCatalog.ns.t2" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING _") + sql(s"CREATE TABLE $t2 (id bigint, data string) USING _") + try { + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')") + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + // At fraction = 1 the sample is a no-op on the result set, so + // dropping it inside the merged scan builder is safe. The sample + // guard in V2ScanRelationPushDown therefore evaluates to true and + // join pushdown proceeds.
+ val dfWithSample = sql( + s"SELECT * FROM $t1 TABLESAMPLE SYSTEM (100 PERCENT) " + + s"JOIN $t2 ON $t1.id = $t2.id") + checkJoinPushed(dfWithSample) + } + } finally { + sql(s"DROP TABLE IF EXISTS $t1") + sql(s"DROP TABLE IF EXISTS $t2") + } + } + + test("SPARK-55978: with-replacement sample blocks join pushdown even at fraction 1") { + val joinSampleCatalog = "testjoinsamplerepl" + registerCatalog(joinSampleCatalog, classOf[InMemoryTableWithJoinAndSampleCatalog]) + val t1 = s"$joinSampleCatalog.ns.t1" + val t2 = s"$joinSampleCatalog.ns.t2" + sql(s"CREATE TABLE $t1 (id bigint, data string) USING _") + sql(s"CREATE TABLE $t2 (id bigint, data string) USING _") + try { + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + sql(s"INSERT INTO $t2 VALUES (2, 'x'), (3, 'y'), (4, 'z')") + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + // SQL TABLESAMPLE always sets withReplacement=false, so use the + // DataFrame API. Poisson sampling at fraction 1 still emits each + // input row 0, 1, 2, ... times, so the sample is not a no-op and + // join pushdown must remain blocked. + val df = spark.table(t1).sample(withReplacement = true, fraction = 1.0) + .join(spark.table(t2), "id") + checkJoinNotPushed(df) + checkSamplePushed(df, pushed = true) + } + } finally { + sql(s"DROP TABLE IF EXISTS $t1") + sql(s"DROP TABLE IF EXISTS $t2") + } + } + test("SPARK-55978: legacy connector with only 4-arg pushTableSample - BERNOULLI pushes down") { val legacyCatalog = "testlegacysample" registerCatalog(legacyCatalog, classOf[InMemoryTableWithLegacyTableSampleCatalog])