diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 8a939f2028..0b57ca5839 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -918,7 +918,7 @@ index b5b34922694..a72403780c4 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 525d97e4998..8a3e7457618 100644 +index 525d97e4998..3c06f3f9261 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -931,6 +931,16 @@ index 525d97e4998..8a3e7457618 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -3107,7 +3108,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + } + } + +- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { ++ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 48ad10992c5..51d1ee65422 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -2111,28 +2121,10 @@ index 104b4e416cd..b8af360fa14 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..9411af57a26 100644 +index 8670d95c65e..b624c3811dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} - - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} - import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} - import org.apache.spark.sql.catalyst.util.DateTimeUtils -@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1335,7 +1337,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1335,7 +1335,8 @@ class 
ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2143,7 +2135,7 @@ index 8670d95c65e..9411af57a26 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index 29cb224c878..ee5a87fa200 100644 +index 29cb224c878..1658c3c9750 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2160,7 +2152,7 @@ index 29cb224c878..ee5a87fa200 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2204,17 +2196,7 @@ index 29cb224c878..ee5a87fa200 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce different exceptions, we don't need -@@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1128,7 +1134,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index d3a5c617dc..4e0dfaeb3c 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -937,7 +937,7 @@ index c26757c9cff..d55775f09d7 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 3cf2bfd17ab..b1c1e41e6a9 100644 +index 3cf2bfd17ab..44ac7b5369c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -950,6 +950,16 @@ index 3cf2bfd17ab..b1c1e41e6a9 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -3127,7 +3128,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + } + } + +- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { ++ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty 
records correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 8b4ac474f87..3f79f20822f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -2094,20 +2104,10 @@ index 8e88049f51e..20d7ef7b1bc 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..71e22972a47 100644 +index 8ed9ef1630e..eed2a6f5ad5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1075,7 +1075,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1345,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2118,7 +2118,7 @@ index 8ed9ef1630e..71e22972a47 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..5ea2d938664 100644 +index f6472ba3d9d..68ec528b10e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2127,7 +2127,7 @@ index f6472ba3d9d..5ea2d938664 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2171,17 +2171,7 @@ index f6472ba3d9d..5ea2d938664 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce different exceptions, we don't need -@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1148,7 +1153,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 5948292d7b..ad67c3e952 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -2729,7 +2729,7 @@ index 4474ec1fd42..05fa0257c82 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..35247c13ad9 100644 +index bba71f1c48d..f3a203caa85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2740,17 +2740,7 @@ index bba71f1c48d..35247c13ad9 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2763,7 +2753,7 @@ index bba71f1c48d..35247c13ad9 100644 } } } -@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -2773,7 +2763,7 @@ index bba71f1c48d..35247c13ad9 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2783,7 +2773,7 @@ index bba71f1c48d..35247c13ad9 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2793,16 +2783,6 @@ index bba71f1c48d..35247c13ad9 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ 
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -2933,7 +2913,7 @@ index 0acb21f3e6f..8d60dfb686d 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..5cd856ff7b6 100644 +index 09ed6955a51..a49e9a6fe41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter @@ -3015,16 +2995,17 @@ index 09ed6955a51..5cd856ff7b6 100644 checkAllParquetReaders( values = Seq("1.23", "10.34"), fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite +@@ -322,8 +329,8 @@ class ParquetTypeWideningSuite Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) } test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + - s"Decimal($toPrecision, $toScale)" +- ) { + s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720") - ) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { checkAllParquetReaders( values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index ca5341f7f6..736fb952d9 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -2986,7 +2986,7 @@ index 3072657a095..6b5b9103363 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index c530dc0d3df..418d5ea4b4d 100644 +index c530dc0d3df..3eac4aff201 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2997,17 +2997,7 @@ index c530dc0d3df..418d5ea4b4d 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -3020,7 +3010,7 @@ index c530dc0d3df..418d5ea4b4d 100644 } } } -@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -3030,7 +3020,7 @@ index c530dc0d3df..418d5ea4b4d 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -3040,7 +3030,7 @@ index c530dc0d3df..418d5ea4b4d 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -3050,16 +3040,6 @@ index c530dc0d3df..418d5ea4b4d 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ 
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -3157,7 +3137,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 56076175d60..78c4a9755c0 100644 +index 56076175d60..e04d7aaadc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -3189,18 +3169,8 @@ index 56076175d60..78c4a9755c0 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) -@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..5cd856ff7b6 100644 +index 09ed6955a51..a49e9a6fe41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter @@ -3282,16 +3252,17 @@ index 09ed6955a51..5cd856ff7b6 100644 checkAllParquetReaders( values = Seq("1.23", "10.34"), fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite +@@ -322,8 +329,8 @@ class ParquetTypeWideningSuite Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) } test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + - s"Decimal($toPrecision, $toScale)" +- ) { + s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720") - ) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { checkAllParquetReaders( values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
 index 1cc6d3afbee..8275727fbb4 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 844cc07c69..9d9ec1506a 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1324,6 +1324,7 @@ impl PhysicalPlanner {
             common.session_timezone.as_str(),
             common.case_sensitive,
             common.return_null_struct_if_all_fields_missing,
+            common.allow_type_promotion,
             self.session_ctx(),
             common.encryption_enabled,
         )?;
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 3d61251447..3405782cd8 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -513,6 +513,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         session_timezone.as_str(),
         case_sensitive != JNI_FALSE,
         return_null_struct_if_all_fields_missing != JNI_FALSE,
+        true, // allow_type_promotion: JVM side already validated via TypeUtil.checkParquetType
         session_ctx,
         encryption_enabled,
     )?;
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index e67700e629..918abaf5af 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -72,6 +72,7 @@ pub(crate) fn init_datasource_exec(
     session_timezone: &str,
     case_sensitive: bool,
     return_null_struct_if_all_fields_missing: bool,
+    allow_type_promotion: bool,
     session_ctx: &Arc<SessionContext>,
     encryption_enabled: bool,
 ) -> Result<Arc<DataSourceExec>, ExecutionError> {
@@ -79,6 +80,7 @@
         session_timezone,
         case_sensitive,
         return_null_struct_if_all_fields_missing,
+        allow_type_promotion,
         &object_store_url,
         encryption_enabled,
     );
@@ -188,6 +190,7 @@ fn get_options(
     session_timezone: &str,
     case_sensitive: bool,
     return_null_struct_if_all_fields_missing: bool,
+    allow_type_promotion: bool,
     object_store_url: &ObjectStoreUrl,
     encryption_enabled: bool,
 ) -> (TableParquetOptions, SparkParquetOptions) {
@@ -201,6 +204,7 @@
     spark_parquet_options.case_sensitive = case_sensitive;
     spark_parquet_options.return_null_struct_if_all_fields_missing =
         return_null_struct_if_all_fields_missing;
+    spark_parquet_options.allow_type_promotion = allow_type_promotion;
 
     if encryption_enabled {
         table_parquet_options.crypto.configure_factory(
diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs
index 1e0c64ea4b..2eba9830aa 100644
--- a/native/core/src/parquet/parquet_support.rs
+++ b/native/core/src/parquet/parquet_support.rs
@@ -84,6 +84,9 @@ pub struct SparkParquetOptions {
     /// legacy behavior); false preserves the parent struct's nullness from the file
     /// so non-null parents return a struct of all-null fields.
     pub return_null_struct_if_all_fields_missing: bool,
+    /// Whether type promotion (schema evolution) is allowed, e.g. INT32 -> INT64,
+    /// FLOAT -> DOUBLE. Mirrors spark.comet.schemaEvolution.enabled.
+    pub allow_type_promotion: bool,
 }
 
 impl SparkParquetOptions {
@@ -97,6 +100,7 @@ impl SparkParquetOptions {
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
             return_null_struct_if_all_fields_missing: true,
+            allow_type_promotion: false,
         }
     }
 
@@ -110,6 +114,7 @@ impl SparkParquetOptions {
             use_legacy_date_timestamp_or_ntz: false,
             case_sensitive: false,
             return_null_struct_if_all_fields_missing: true,
+            allow_type_promotion: false,
         }
     }
 }
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index 4e68902585..ecd7a8b816 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -94,6 +94,35 @@ fn remap_physical_schema_names(
     Arc::new(Schema::new(remapped_fields))
 }
 
+/// Format an Arrow `DataType` as Spark's catalog string (e.g. `Int64` -> `bigint`).
+/// Used so the SchemaColumnConvertNotSupportedException error message matches
+/// the format produced by Spark's own vectorized Parquet reader.
+fn spark_catalog_name(dt: &DataType) -> &'static str {
+    match dt {
+        DataType::Boolean => "boolean",
+        DataType::Int8 => "tinyint",
+        DataType::Int16 => "smallint",
+        DataType::Int32 => "int",
+        DataType::Int64 => "bigint",
+        DataType::Float32 => "float",
+        DataType::Float64 => "double",
+        _ => "unknown",
+    }
+}
+
+/// Format an Arrow `DataType` as the Parquet primitive type name
+/// (e.g. `Int64` -> `INT64`, matching `PrimitiveTypeName.toString()` in parquet-mr).
+fn parquet_primitive_name(dt: &DataType) -> &'static str {
+    match dt {
+        DataType::Boolean => "BOOLEAN",
+        DataType::Int8 | DataType::Int16 | DataType::Int32 => "INT32",
+        DataType::Int64 => "INT64",
+        DataType::Float32 => "FLOAT",
+        DataType::Float64 => "DOUBLE",
+        _ => "UNKNOWN",
+    }
+}
+
 /// Check if a specific column name has duplicate matches in the physical schema
 /// (case-insensitive). Returns the error info if so.
 fn check_column_duplicate(col_name: &str, physical_schema: &SchemaRef) -> Option<(String, String)> {
@@ -457,6 +486,32 @@ impl SparkPhysicalExprAdapter {
             }
         }
 
+        // Type promotion (widening) check.
+        // When allow_type_promotion is false, reject numeric type widening
+        // (INT32→INT64, FLOAT→DOUBLE, INT32→DOUBLE) to match Spark 3.x behavior
+        // where reading a column with a promoted type throws
+        // SchemaColumnConvertNotSupportedException. When allow_type_promotion
+        // is true (Spark 4.0+ default), these promotions are allowed.
+        // This mirrors TypeUtil.checkParquetType in the JVM code.
+        if !self.parquet_options.allow_type_promotion {
+            let is_disallowed_promotion = matches!(
+                (physical_type, target_type),
+                (DataType::Int32, DataType::Int64)
+                    | (DataType::Float32, DataType::Float64)
+                    | (DataType::Int32, DataType::Float64)
+            );
+            if is_disallowed_promotion {
+                return Err(DataFusionError::External(Box::new(
+                    SparkError::ParquetSchemaConvert {
+                        file_path: String::new(),
+                        column: format!("[{}]", cast.input_field().name()),
+                        physical_type: parquet_primitive_name(physical_type).to_string(),
+                        spark_type: spark_catalog_name(target_type).to_string(),
+                    },
+                )));
+            }
+        }
+
         // For complex nested types (Struct, List, Map), Timestamp timezone
         // mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
         // with spark_parquet_convert which handles field-name-based selection,
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 9afb26470c..3d58c849d5 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -112,6 +112,11 @@ message NativeScanCommon {
   // pre-4.1 behavior); false preserves the parent struct's nullness from the file
   // so non-null parents return a struct of all-null fields.
   bool return_null_struct_if_all_fields_missing = 14;
+  // Whether schema evolution (type promotion) is allowed, e.g. INT32 -> INT64,
+  // FLOAT -> DOUBLE. Mirrors spark.comet.schemaEvolution.enabled. When false,
+  // reading a column with an incompatible promoted type throws an error matching
+  // Spark's SchemaColumnConvertNotSupportedException behavior.
+  bool allow_type_promotion = 15;
 }
 
 message NativeScan {
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index 066b770bbb..02f419cf3f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -200,6 +200,8 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
     commonBuilder.setReturnNullStructIfAllFieldsMissing(
       scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean)
 
+    commonBuilder.setAllowTypePromotion(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get())
+
     // Collect S3/cloud storage configurations
     val hadoopConf = scan.relation.sparkSession.sessionState
       .newHadoopConfWithOptions(scan.relation.options)
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 922a4255c4..3f4e1b79f3 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -981,12 +981,7 @@ abstract class ParquetReadSuite extends CometTestBase {
       Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false)))
 
     withParquetDataFrame(data, schema = Some(readSchema)) { df =>
-      val scan = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
-      val isNativeDataFusionScan =
-        scan == CometConf.SCAN_NATIVE_DATAFUSION || scan == CometConf.SCAN_AUTO
-      if (enableSchemaEvolution || isNativeDataFusionScan) {
-        // native_datafusion has more permissive schema evolution
-        // https://github.com/apache/datafusion-comet/issues/3720
+      if (enableSchemaEvolution) {
         checkAnswer(df, data.map(Row.fromTuple))
       } else {
         assertThrows[SparkException](df.collect())
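
End-to-end, these changes plumb one boolean from `CometConf.COMET_SCHEMA_EVOLUTION_ENABLED` on the JVM side through the protobuf plan field `allow_type_promotion` into the native schema adapter. A hedged sketch of the user-visible behavior, assuming a Comet-enabled SparkSession and the `spark.comet.schemaEvolution.enabled` key the comments above say the flag mirrors (whether that conf is settable at runtime is also an assumption):

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def demoTypePromotion(spark: SparkSession, path: String): Unit = {
  import spark.implicits._
  // Write INT32 data, then read it back with a widened (bigint) schema.
  Seq(1, 2, 3).toDF("a").write.mode("overwrite").parquet(path)
  val widened = StructType(Seq(StructField("a", LongType)))

  // Schema evolution off (the default): the native scan now rejects the
  // INT32 -> INT64 promotion, matching Spark's
  // SchemaColumnConvertNotSupportedException behavior.
  spark.conf.set("spark.comet.schemaEvolution.enabled", "false")
  try spark.read.schema(widened).parquet(path).collect()
  catch { case e: SparkException => println(s"rejected as expected: ${e.getMessage}") }

  // Schema evolution on: the widened read succeeds.
  spark.conf.set("spark.comet.schemaEvolution.enabled", "true")
  spark.read.schema(widened).parquet(path).show()
}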