From 5b35007d46e6f153aca206cf2f23a0f95fae0500 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 5 May 2026 12:52:35 -0600 Subject: [PATCH 1/5] fix: reject disallowed type promotions in native_datafusion scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When COMET_SCHEMA_EVOLUTION_ENABLED is false, the native_datafusion scan path now rejects reading Parquet INT32 as INT64, FLOAT as DOUBLE, and INT32 as DOUBLE — matching the existing validation in native_iceberg_compat. The allow_type_promotion flag is passed from the JVM via protobuf and checked in replace_with_spark_cast() before allowing widening casts. Closes #3720 Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/planner.rs | 1 + native/core/src/parquet/mod.rs | 1 + native/core/src/parquet/parquet_exec.rs | 4 +++ native/core/src/parquet/parquet_support.rs | 5 ++++ native/core/src/parquet/schema_adapter.rs | 26 +++++++++++++++++++ native/core/src/parquet/util/mod.rs | 1 + native/proto/src/proto/operator.proto | 5 ++++ .../serde/operator/CometNativeScan.scala | 3 +++ .../comet/parquet/ParquetReadSuite.scala | 7 +---- 9 files changed, 47 insertions(+), 6 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 844cc07c69..9d9ec1506a 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1324,6 +1324,7 @@ impl PhysicalPlanner { common.session_timezone.as_str(), common.case_sensitive, common.return_null_struct_if_all_fields_missing, + common.allow_type_promotion, self.session_ctx(), common.encryption_enabled, )?; diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 3d61251447..3405782cd8 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -513,6 +513,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatch session_timezone.as_str(), case_sensitive != JNI_FALSE, return_null_struct_if_all_fields_missing != JNI_FALSE, + true, // allow_type_promotion: JVM side already validated via TypeUtil.checkParquetType session_ctx, encryption_enabled, )?; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index e67700e629..918abaf5af 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -72,6 +72,7 @@ pub(crate) fn init_datasource_exec( session_timezone: &str, case_sensitive: bool, return_null_struct_if_all_fields_missing: bool, + allow_type_promotion: bool, session_ctx: &Arc<SessionContext>, encryption_enabled: bool, ) -> Result<Arc<DataSourceExec>, ExecutionError> { @@ -79,6 +80,7 @@ ( session_timezone, case_sensitive, return_null_struct_if_all_fields_missing, + allow_type_promotion, &object_store_url, encryption_enabled, ); @@ -188,6 +190,7 @@ fn get_options( session_timezone: &str, case_sensitive: bool, return_null_struct_if_all_fields_missing: bool, + allow_type_promotion: bool, object_store_url: &ObjectStoreUrl, encryption_enabled: bool, ) -> (TableParquetOptions, SparkParquetOptions) { @@ -201,6 +204,7 @@ spark_parquet_options.case_sensitive = case_sensitive; spark_parquet_options.return_null_struct_if_all_fields_missing = return_null_struct_if_all_fields_missing; + spark_parquet_options.allow_type_promotion = allow_type_promotion; if encryption_enabled { table_parquet_options.crypto.configure_factory( diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index
1e0c64ea4b..2eba9830aa 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -84,6 +84,9 @@ pub struct SparkParquetOptions { /// legacy behavior); false preserves the parent struct's nullness from the file /// so non-null parents return a struct of all-null fields. pub return_null_struct_if_all_fields_missing: bool, + /// Whether type promotion (schema evolution) is allowed, e.g. INT32 -> INT64, + /// FLOAT -> DOUBLE. Mirrors spark.comet.schemaEvolution.enabled. pub allow_type_promotion: bool, } impl SparkParquetOptions { @@ -97,6 +100,7 @@ use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, return_null_struct_if_all_fields_missing: true, + allow_type_promotion: false, } } @@ -110,6 +114,7 @@ use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, return_null_struct_if_all_fields_missing: true, + allow_type_promotion: false, } } } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 4e68902585..3da2c5f247 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -457,6 +457,32 @@ impl SparkPhysicalExprAdapter { } } + // Type promotion (widening) check. + // When allow_type_promotion is false, reject numeric type widening + // (INT32→INT64, FLOAT→DOUBLE, INT32→DOUBLE) to match Spark 3.x behavior where + // reading a column with a promoted type throws + // SchemaColumnConvertNotSupportedException. When allow_type_promotion + // is true (Spark 4.0+ default), these promotions are allowed. + // This mirrors TypeUtil.checkParquetType in the JVM code. + if !self.parquet_options.allow_type_promotion { + let is_disallowed_promotion = matches!( + (physical_type, target_type), + (DataType::Int32, DataType::Int64) + | (DataType::Float32, DataType::Float64) + | (DataType::Int32, DataType::Float64) + ); + if is_disallowed_promotion { + return Err(DataFusionError::External(Box::new( + SparkError::ParquetSchemaConvert { + file_path: String::new(), + column: cast.input_field().name().to_string(), + physical_type: physical_type.to_string(), + spark_type: target_type.to_string(), + }, + ))); + } + } + // For complex nested types (Struct, List, Map), Timestamp timezone // mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr // with spark_parquet_convert which handles field-name-based selection, diff --git a/native/core/src/parquet/util/mod.rs b/native/core/src/parquet/util/mod.rs index e71a6e2c78..72a0b4a194 100644 --- a/native/core/src/parquet/util/mod.rs +++ b/native/core/src/parquet/util/mod.rs @@ -22,4 +22,5 @@ pub mod memory; mod buffer; pub use buffer::*; +#[cfg(test)] pub mod test_common; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 9afb26470c..3d58c849d5 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -112,6 +112,11 @@ message NativeScanCommon { // pre-4.1 behavior); false preserves the parent struct's nullness from the file // so non-null parents return a struct of all-null fields. bool return_null_struct_if_all_fields_missing = 14; + // Whether schema evolution (type promotion) is allowed, e.g. INT32 -> INT64, + // FLOAT -> DOUBLE. Mirrors spark.comet.schemaEvolution.enabled. When false, + // reading a column with an incompatible promoted type throws an error matching + // Spark's SchemaColumnConvertNotSupportedException behavior.
+ bool allow_type_promotion = 15; } message NativeScan { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 066b770bbb..5437066538 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -200,6 +200,9 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { commonBuilder.setReturnNullStructIfAllFieldsMissing( scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean) + commonBuilder.setAllowTypePromotion( + CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get()) + // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState .newHadoopConfWithOptions(scan.relation.options) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 922a4255c4..3f4e1b79f3 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -981,12 +981,7 @@ abstract class ParquetReadSuite extends CometTestBase { Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false))) withParquetDataFrame(data, schema = Some(readSchema)) { df => - val scan = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) - val isNativeDataFusionScan = - scan == CometConf.SCAN_NATIVE_DATAFUSION || scan == CometConf.SCAN_AUTO - if (enableSchemaEvolution || isNativeDataFusionScan) { - // native_datafusion has more permissive schema evolution - // https://github.com/apache/datafusion-comet/issues/3720 + if (enableSchemaEvolution) { checkAnswer(df, data.map(Row.fromTuple)) } else { assertThrows[SparkException](df.collect()) From 35da54dd439059f6e38770ff4d942118fd999771 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 5 May 2026 12:56:55 -0600 Subject: [PATCH 2/5] format --- .../org/apache/comet/serde/operator/CometNativeScan.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 5437066538..02f419cf3f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -200,8 +200,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { commonBuilder.setReturnNullStructIfAllFieldsMissing( scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean) - commonBuilder.setAllowTypePromotion( - CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get()) + commonBuilder.setAllowTypePromotion(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get()) // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState From 715b64b1c188d03d6b5cec770d4d16105499435c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 5 May 2026 13:27:28 -0600 Subject: [PATCH 3/5] fix: format type-promotion error message to match Spark Format the SchemaColumnConvertNotSupportedException message produced by the type-promotion check so it matches Spark's vectorized reader output: column rendered as [name], expected as Spark catalog string (bigint), found as Parquet primitive name (INT32). 
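For reference, a minimal sketch of the message shape being matched. The
render helper below is hypothetical, not code from this patch, and assumes
only the "Column: [...], Expected: ..., Found: ..." rendering described
above:

    // Hypothetical illustration of the target message shape; the real values
    // flow through SparkError::ParquetSchemaConvert in this patch.
    fn render(column: &str, physical: &str, spark: &str) -> String {
        format!("Column: [{column}], Expected: {spark}, Found: {physical}")
    }

    fn main() {
        // INT32 data read through a bigint (Int64) read schema:
        assert_eq!(
            render("_1", "INT32", "bigint"),
            "Column: [_1], Expected: bigint, Found: INT32"
        );
    }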
This lets the SPARK-35640 and "row group skipping doesn't overflow" tests pass, and updates 3.4.3.diff to remove their IgnoreCometNativeDataFusion tags. The TimestampLTZ to TimestampNTZ case (SPARK-36182) and decimal precision/scale case (SPARK-34212) remain ignored, tracked under #4219 and #3720 respectively. Also reverts the cfg(test) gate on parquet/util/test_common so the parquet_read benchmark builds. --- dev/diffs/3.4.3.diff | 38 +++-------------------- native/core/src/parquet/schema_adapter.rs | 35 +++++++++++++++++++-- native/core/src/parquet/util/mod.rs | 1 - 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 8a939f2028..3a19878fe1 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2111,28 +2111,10 @@ index 104b4e416cd..b8af360fa14 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8670d95c65e..9411af57a26 100644 +index 8670d95c65e..b624c3811dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -41,6 +41,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} - - import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils} - import org.apache.spark.sql._ -+import org.apache.spark.sql.IgnoreCometNativeDataFusion - import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} - import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} - import org.apache.spark.sql.catalyst.util.DateTimeUtils -@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1335,7 +1337,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2143,7 +2125,7 @@ index 8670d95c65e..9411af57a26 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index 29cb224c878..ee5a87fa200 100644 +index 29cb224c878..1658c3c9750 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2160,7 +2142,7 @@ index 29cb224c878..ee5a87fa200 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2204,17 +2186,7 @@ index 29cb224c878..ee5a87fa200 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1113,7 +1119,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce different exceptions, we don't need -@@ -1128,7 +1135,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1128,7 +1134,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 3da2c5f247..ecd7a8b816 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -94,6 +94,35 @@ fn remap_physical_schema_names( Arc::new(Schema::new(remapped_fields)) } +/// Format an Arrow `DataType` as Spark's catalog string (e.g. `Int64` -> `bigint`). +/// Used so the SchemaColumnConvertNotSupportedException error message matches +/// the format produced by Spark's own vectorized Parquet reader. +fn spark_catalog_name(dt: &DataType) -> &'static str { + match dt { + DataType::Boolean => "boolean", + DataType::Int8 => "tinyint", + DataType::Int16 => "smallint", + DataType::Int32 => "int", + DataType::Int64 => "bigint", + DataType::Float32 => "float", + DataType::Float64 => "double", + _ => "unknown", + } +} + +/// Format an Arrow `DataType` as the Parquet primitive type name +/// (e.g. `Int64` -> `INT64`, matching `PrimitiveTypeName.toString()` in parquet-mr). +fn parquet_primitive_name(dt: &DataType) -> &'static str { + match dt { + DataType::Boolean => "BOOLEAN", + DataType::Int8 | DataType::Int16 | DataType::Int32 => "INT32", + DataType::Int64 => "INT64", + DataType::Float32 => "FLOAT", + DataType::Float64 => "DOUBLE", + _ => "UNKNOWN", + } +} + /// Check if a specific column name has duplicate matches in the physical schema /// (case-insensitive). Returns the error info if so. 
fn check_column_duplicate(col_name: &str, physical_schema: &SchemaRef) -> Option<(String, String)> { @@ -475,9 +504,9 @@ impl SparkPhysicalExprAdapter { return Err(DataFusionError::External(Box::new( SparkError::ParquetSchemaConvert { file_path: String::new(), - column: cast.input_field().name().to_string(), - physical_type: physical_type.to_string(), - spark_type: target_type.to_string(), + column: format!("[{}]", cast.input_field().name()), + physical_type: parquet_primitive_name(physical_type).to_string(), + spark_type: spark_catalog_name(target_type).to_string(), }, ))); } diff --git a/native/core/src/parquet/util/mod.rs b/native/core/src/parquet/util/mod.rs index 72a0b4a194..e71a6e2c78 100644 --- a/native/core/src/parquet/util/mod.rs +++ b/native/core/src/parquet/util/mod.rs @@ -22,5 +22,4 @@ pub mod memory; mod buffer; pub use buffer::*; -#[cfg(test)] pub mod test_common; From ce4adc43d0dd46d8d333a3590efa80ee69217caa Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 5 May 2026 13:59:40 -0600 Subject: [PATCH 4/5] fix: unignore passing 3720 tests across all spark version diffs Run the 3720-tagged tests in dev/diffs/3.5.8.diff, 4.0.2.diff, and 4.1.1.diff against patched Spark trees with the type-promotion fix applied, then drop the IgnoreCometNativeDataFusion tag for tests that now pass and keep it on tests that still fail. 3.5.8: Drop tags for SPARK-35640 (int as long) and "row group skipping doesn't overflow", repoint SPARK-36182 at issue #4219. Same scope as 3.4.3, since the test source matches. 4.0.2 and 4.1.1: Drop tags for SPARK-47447 (TimestampLTZ as TimestampNTZ) and "row group skipping". 4.1.1 also drops the tag for SPARK-45604 (timestamp_ntz to array). Tests for SPARK-35640 (binary as timestamp), SPARK-34212 (decimal precision/scale), the schema-mismatch vectorized-reader test, and the parameterized ParquetTypeWideningSuite cases (unsupported parquet conversion, unsupported parquet timestamp conversion, parquet decimal precision change, parquet decimal precision and scale change) still fail and remain ignored under #3720. 
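For orientation before the diffs, a condensed restatement of the gate these
tests exercise. This is a sketch only, assuming the arrow_schema crate for
DataType; the real check lives in SparkPhysicalExprAdapter (patch 1) and is
consulted only when allow_type_promotion is false:

    use arrow_schema::DataType;

    // Only these three physical-to-read-schema widenings are gated; decimal
    // and timestamp conversions take other code paths, which is why the
    // SPARK-34212 and ParquetTypeWideningSuite decimal cases above remain
    // ignored under #3720.
    fn is_disallowed_promotion(physical: &DataType, target: &DataType) -> bool {
        matches!(
            (physical, target),
            (DataType::Int32, DataType::Int64)
                | (DataType::Float32, DataType::Float64)
                | (DataType::Int32, DataType::Float64)
        )
    }

    fn main() {
        assert!(is_disallowed_promotion(&DataType::Int32, &DataType::Int64));
        assert!(!is_disallowed_promotion(&DataType::Int64, &DataType::Int64));
    }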
--- dev/diffs/3.5.8.diff | 30 +++++--------------------- dev/diffs/4.0.2.diff | 39 +++++++++------------------------ dev/diffs/4.1.1.diff | 51 ++++++++++---------------------------------- 3 files changed, 26 insertions(+), 94 deletions(-) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index d3a5c617dc..080663bac4 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2094,20 +2094,10 @@ index 8e88049f51e..20d7ef7b1bc 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..71e22972a47 100644 +index 8ed9ef1630e..eed2a6f5ad5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1075,7 +1075,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1345,7 +1346,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2118,7 +2108,7 @@ index 8ed9ef1630e..71e22972a47 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..5ea2d938664 100644 +index f6472ba3d9d..68ec528b10e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2127,7 +2117,7 @@ index f6472ba3d9d..5ea2d938664 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) @@ -2171,17 +2161,7 @@ index f6472ba3d9d..5ea2d938664 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce different exceptions, we don't need -@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1148,7 +1153,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 5948292d7b..ad67c3e952 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -2729,7 +2729,7 @@ index 4474ec1fd42..05fa0257c82 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..35247c13ad9 100644 +index bba71f1c48d..f3a203caa85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2740,17 +2740,7 @@ index bba71f1c48d..35247c13ad9 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2763,7 +2753,7 @@ index bba71f1c48d..35247c13ad9 100644 } } } -@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -2773,7 +2763,7 @@ index bba71f1c48d..35247c13ad9 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2783,7 +2773,7 @@ index bba71f1c48d..35247c13ad9 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2793,16 +2783,6 @@ index bba71f1c48d..35247c13ad9 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ 
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -2933,7 +2913,7 @@ index 0acb21f3e6f..8d60dfb686d 100644 val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..5cd856ff7b6 100644 +index 09ed6955a51..a49e9a6fe41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter @@ -3015,16 +2995,17 @@ index 09ed6955a51..5cd856ff7b6 100644 checkAllParquetReaders( values = Seq("1.23", "10.34"), fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite +@@ -322,8 +329,8 @@ class ParquetTypeWideningSuite Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) } test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + - s"Decimal($toPrecision, $toScale)" +- ) { + s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720") - ) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { checkAllParquetReaders( values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index ca5341f7f6..736fb952d9 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -2986,7 +2986,7 @@ index 3072657a095..6b5b9103363 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index c530dc0d3df..418d5ea4b4d 100644 +index c530dc0d3df..3eac4aff201 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2997,17 +2997,7 @@ index c530dc0d3df..418d5ea4b4d 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -3020,7 +3010,7 @@ index c530dc0d3df..418d5ea4b4d 100644 } } } -@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -3030,7 +3020,7 @@ index c530dc0d3df..418d5ea4b4d 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -3040,7 +3030,7 @@ index c530dc0d3df..418d5ea4b4d 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -3050,16 +3040,6 @@ index c530dc0d3df..418d5ea4b4d 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ 
IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - withAllParquetReaders { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala index 30503af0fab..1491f4bc2d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRebaseDatetimeSuite.scala @@ -3157,7 +3137,7 @@ index 5c0b7def039..151184bc98c 100644 assert(fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size, s"Found ${fileSourceScanSchemata.size} file sources in dataframe, " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala -index 56076175d60..78c4a9755c0 100644 +index 56076175d60..e04d7aaadc6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -27,7 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -3189,18 +3169,8 @@ index 56076175d60..78c4a9755c0 100644 withTempPath { dir => val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true) assert(e.getCause.isInstanceOf[SchemaColumnConvertNotSupportedException]) -@@ -1079,7 +1081,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { - } - } - -- test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array") { -+ test("SPARK-45604: schema mismatch failure error on timestamp_ntz to array", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - import testImplicits._ - - withTempPath { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala -index 09ed6955a51..5cd856ff7b6 100644 +index 09ed6955a51..a49e9a6fe41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypeWideningSuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter @@ -3282,16 +3252,17 @@ index 09ed6955a51..5cd856ff7b6 100644 checkAllParquetReaders( values = Seq("1.23", "10.34"), fromType = DecimalType(fromPrecision, 2), -@@ -322,7 +329,8 @@ class ParquetTypeWideningSuite +@@ -322,8 +329,8 @@ class ParquetTypeWideningSuite Seq((5, 2) -> (6, 4), (10, 4) -> (12, 7), (20, 5) -> (22, 8)) } test(s"parquet decimal precision and scale change Decimal($fromPrecision, $fromScale) -> " + - s"Decimal($toPrecision, $toScale)" +- ) { + s"Decimal($toPrecision, $toScale)", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720") - ) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { checkAllParquetReaders( values = Seq("1.23", "10.34"), + fromType = DecimalType(fromPrecision, fromScale), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala index 1cc6d3afbee..8275727fbb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala From 60f2db18b4e9d3132642b7c97c614697e3572aa9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 5 May 2026 14:50:17 -0600 Subject: [PATCH 5/5] fix: ignore SPARK-26709 under native_datafusion in 3.x diffs The test reads a partitioned dataset where one partition is an empty parquet file written with INT32 schema and the other has 10 rows of INT64. Spark's vectorized reader silently skips the type check for the empty file because no row groups are scanned. The native_datafusion adapter rejects the INT32 to INT64 promotion at plan time regardless of file row count, so the test now fails when allow_type_promotion is false (Spark 3.x default). Tag the test with IgnoreCometNativeDataFusion under the existing 3720 umbrella in 3.4.3.diff and 3.5.8.diff. Spark 4.x defaults allow_type_promotion to true so its diffs are unaffected. --- dev/diffs/3.4.3.diff | 12 +++++++++++- dev/diffs/3.5.8.diff | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 3a19878fe1..0b57ca5839 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -918,7 +918,7 @@ index b5b34922694..a72403780c4 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 525d97e4998..8a3e7457618 100644 +index 525d97e4998..3c06f3f9261 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -931,6 +931,16 @@ index 525d97e4998..8a3e7457618 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -3107,7 +3108,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + } + } + +- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { ++ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 48ad10992c5..51d1ee65422 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 080663bac4..4e0dfaeb3c 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -937,7 +937,7 @@ index c26757c9cff..d55775f09d7 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 
3cf2bfd17ab..b1c1e41e6a9 100644 +index 3cf2bfd17ab..44ac7b5369c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -950,6 +950,16 @@ index 3cf2bfd17ab..b1c1e41e6a9 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -3127,7 +3128,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + } + } + +- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { ++ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 8b4ac474f87..3f79f20822f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
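Net effect of the series, as a tiny illustrative sketch (not Comet API; the
version split follows the patch 5 message, which states that Spark 3.x
defaults the flag to false while Spark 4.x defaults it to true):

    // Illustrative only: effective default of allow_type_promotion by Spark
    // line, per the commit messages in this series.
    fn default_allow_type_promotion(spark_major_version: u32) -> bool {
        spark_major_version >= 4
    }

    fn main() {
        assert!(!default_allow_type_promotion(3)); // 3.x: INT32 -> INT64 rejected
        assert!(default_allow_type_promotion(4)); // 4.x: widening allowed
    }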