From 6b1e73303b8ab99d59be58221fb91b979ddcedf6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 18:15:08 -0600 Subject: [PATCH 1/9] test --- .../ParquetInt96NtzCorrectnessSuite.scala | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala new file mode 100644 index 0000000000..57b0ccd7af --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet + +import java.sql.Timestamp +import java.time.LocalDateTime + +import org.apache.spark.SparkException +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf + +/** + * Demonstrates the correctness issue tracked in + * https://github.com/apache/datafusion-comet/issues/3720: when a Parquet file stores timestamps + * as INT96 (Spark's TimestampType, UTC-adjusted local-time semantics) and the read schema + * requests TimestampNTZ, the `native_datafusion` scan silently returns wall-clock values that + * disagree with what was written. Spark itself raises (SPARK-36182) to prevent the silent + * reinterpretation. + */ +class ParquetInt96NtzCorrectnessSuite extends CometTestBase { + import testImplicits._ + + test("INT96 TimestampType read as TimestampNTZ silently returns wrong values") { + val sessionTz = "America/Los_Angeles" + val written = "2020-01-01 12:00:00" + + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + + // Write "2020-01-01 12:00:00" America/Los_Angeles as INT96. The bits encode + // the UTC instant 2020-01-01 20:00:00; reading back as TimestampType applies + // session-TZ adjustment to recover the original local wall-clock value. + Seq(Timestamp.valueOf(written)).toDF("ts").write.parquet(path) + + // Reference behavior: Spark refuses to read INT96 as TimestampNTZ + // (SPARK-36182) because it cannot safely reinterpret an LTZ instant as NTZ. + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + intercept[SparkException] { + spark.read.schema("ts timestamp_ntz").parquet(path).collect() + } + } + + // native_datafusion does not refuse; it silently returns a value that + // disagrees with the wall-clock value originally written. This is the + // correctness issue the safety-check fallback is intended to prevent. + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { + val rows = spark.read.schema("ts timestamp_ntz").parquet(path).collect() + assert(rows.length == 1) + val actual = rows.head.getAs[LocalDateTime](0) + assert( + actual != LocalDateTime.parse("2020-01-01T12:00:00"), + s"native_datafusion returned the original wall-clock value $actual; " + + "expected a silently-shifted value demonstrating the LTZ->NTZ " + + "correctness divergence (issue #3720 / SPARK-36182).") + } + } + } + } +} From cc54c7b2aa7f1ee3297a19c73bcec51bb3f0708a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 18:48:12 -0600 Subject: [PATCH 2/9] update test --- .../ParquetInt96NtzCorrectnessSuite.scala | 62 +++++++++++-------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala index 57b0ccd7af..f0ef32efbb 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala @@ -20,28 +20,29 @@ package org.apache.comet.parquet import java.sql.Timestamp -import java.time.LocalDateTime import org.apache.spark.SparkException import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.internal.SQLConf import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus /** - * Demonstrates the correctness issue tracked in - * https://github.com/apache/datafusion-comet/issues/3720: when a Parquet file stores timestamps - * as INT96 (Spark's TimestampType, UTC-adjusted local-time semantics) and the read schema - * requests TimestampNTZ, the `native_datafusion` scan silently returns wall-clock values that - * disagree with what was written. Spark itself raises (SPARK-36182) to prevent the silent - * reinterpretation. + * Tests for reading INT96 Parquet timestamps as TimestampNTZ. + * + * Prior to Spark 4.0, Spark raises an error (SPARK-36182) when asked to read INT96 as + * TimestampNTZ. Comet should match this behavior. In Spark 4.0+, this read is permitted and + * Comet should produce matching results. + * + * See https://github.com/apache/datafusion-comet/issues/3720 */ class ParquetInt96NtzCorrectnessSuite extends CometTestBase { import testImplicits._ - test("INT96 TimestampType read as TimestampNTZ silently returns wrong values") { + test("INT96 TimestampType read as TimestampNTZ throws pre-Spark 4") { + assume(!isSpark40Plus, "Spark 4.0+ allows reading INT96 as TimestampNTZ") val sessionTz = "America/Los_Angeles" - val written = "2020-01-01 12:00:00" withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, @@ -49,32 +50,39 @@ class ParquetInt96NtzCorrectnessSuite extends CometTestBase { SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { withTempPath { dir => val path = dir.getCanonicalPath + Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) - // Write "2020-01-01 12:00:00" America/Los_Angeles as INT96. The bits encode - // the UTC instant 2020-01-01 20:00:00; reading back as TimestampType applies - // session-TZ adjustment to recover the original local wall-clock value. - Seq(Timestamp.valueOf(written)).toDF("ts").write.parquet(path) - - // Reference behavior: Spark refuses to read INT96 as TimestampNTZ - // (SPARK-36182) because it cannot safely reinterpret an LTZ instant as NTZ. + // Spark refuses to read INT96 as TimestampNTZ (SPARK-36182) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { intercept[SparkException] { spark.read.schema("ts timestamp_ntz").parquet(path).collect() } } - // native_datafusion does not refuse; it silently returns a value that - // disagrees with the wall-clock value originally written. This is the - // correctness issue the safety-check fallback is intended to prevent. + // Comet should also refuse + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { + intercept[SparkException] { + spark.read.schema("ts timestamp_ntz").parquet(path).collect() + } + } + } + } + } + + test("INT96 TimestampType read as TimestampNTZ matches Spark") { + // assume(isSpark40Plus, "Spark 4.0+ allows reading INT96 as TimestampNTZ") + val sessionTz = "America/Los_Angeles" + + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { - val rows = spark.read.schema("ts timestamp_ntz").parquet(path).collect() - assert(rows.length == 1) - val actual = rows.head.getAs[LocalDateTime](0) - assert( - actual != LocalDateTime.parse("2020-01-01T12:00:00"), - s"native_datafusion returned the original wall-clock value $actual; " + - "expected a silently-shifted value demonstrating the LTZ->NTZ " + - "correctness divergence (issue #3720 / SPARK-36182).") + checkSparkAnswerAndOperator(spark.read.schema("ts timestamp_ntz").parquet(path)) } } } From 3d434a7f69b474cde1dd4a8f08f3ca00162fe2a1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:04:38 -0600 Subject: [PATCH 3/9] =?UTF-8?q?test:=20expect=20exception=20for=20INT96?= =?UTF-8?q?=E2=86=92NTZ=20read=20on=20pre-Spark=204?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spark 3.x raises per SPARK-36182; Comet should match. Skip test for native_datafusion/auto scans until #4219 is fixed. On Spark 4.0+ (SPARK-47447), verify Comet matches Spark instead. --- .../ParquetInt96NtzCorrectnessSuite.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala index f0ef32efbb..3d6577bf39 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala @@ -32,8 +32,8 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus * Tests for reading INT96 Parquet timestamps as TimestampNTZ. * * Prior to Spark 4.0, Spark raises an error (SPARK-36182) when asked to read INT96 as - * TimestampNTZ. Comet should match this behavior. In Spark 4.0+, this read is permitted and - * Comet should produce matching results. + * TimestampNTZ. Comet should match this behavior. In Spark 4.0+, this read is permitted and Comet + * should produce matching results. * * See https://github.com/apache/datafusion-comet/issues/3720 */ @@ -42,6 +42,13 @@ class ParquetInt96NtzCorrectnessSuite extends CometTestBase { test("INT96 TimestampType read as TimestampNTZ throws pre-Spark 4") { assume(!isSpark40Plus, "Spark 4.0+ allows reading INT96 as TimestampNTZ") + + val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get() + assume( + scanImpl != CometConf.SCAN_AUTO && scanImpl != CometConf.SCAN_NATIVE_DATAFUSION, + s"https://github.com/apache/datafusion-comet/issues/4219 ($scanImpl scan does not " + + "reject INT96 read as TimestampNTZ)") + val sessionTz = "America/Los_Angeles" withSQLConf( @@ -60,17 +67,15 @@ class ParquetInt96NtzCorrectnessSuite extends CometTestBase { } // Comet should also refuse - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { - intercept[SparkException] { - spark.read.schema("ts timestamp_ntz").parquet(path).collect() - } + intercept[SparkException] { + spark.read.schema("ts timestamp_ntz").parquet(path).collect() } } } } test("INT96 TimestampType read as TimestampNTZ matches Spark") { - // assume(isSpark40Plus, "Spark 4.0+ allows reading INT96 as TimestampNTZ") + assume(isSpark40Plus, "Spark 4.0+ allows reading INT96 as TimestampNTZ") val sessionTz = "America/Los_Angeles" withSQLConf( From 4f8a2b16422285336012657e16eab9719f61dae7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:10:44 -0600 Subject: [PATCH 4/9] =?UTF-8?q?add=20suite=20to=20CI,=20update=20spark=20d?= =?UTF-8?q?iff=20issue=20links=20for=20INT96=E2=86=92NTZ?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add ParquetInt96NtzCorrectnessSuite to pr_build workflows - Update SPARK-36182 ignore link from #3720 to #4219 in 3.4/3.5 diffs - Remove IgnoreCometNativeDataFusion from SPARK-47447 test in 4.0/4.1 diffs since Comet should match Spark 4 behavior --- .github/workflows/pr_build_linux.yml | 1 + .github/workflows/pr_build_macos.yml | 1 + dev/diffs/3.4.3.diff | 2 +- dev/diffs/3.5.8.diff | 2 +- dev/diffs/4.0.2.diff | 6 ++---- dev/diffs/4.1.1.diff | 6 ++---- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index ba8a9fb743..1062623920 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -337,6 +337,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetInt96NtzCorrectnessSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 66f0c7698f..e182c9bd65 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -176,6 +176,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetInt96NtzCorrectnessSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 8a939f2028..8d414db205 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2160,7 +2160,7 @@ index 29cb224c878..ee5a87fa200 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index d3a5c617dc..2db66cb34c 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2127,7 +2127,7 @@ index f6472ba3d9d..5ea2d938664 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 5948292d7b..9daa97913d 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -2743,10 +2743,8 @@ index bba71f1c48d..35247c13ad9 100644 @@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + + test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 4ffb5377bf..ad058987ba 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -3115,10 +3115,8 @@ index c530dc0d3df..abf36a7ab09 100644 @@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { + + test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => From 5fcdf49791ca4377c7cbeb5c7857dc22bd68539a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:12:59 -0600 Subject: [PATCH 5/9] =?UTF-8?q?docs:=20document=20INT96=E2=86=92NTZ=20limi?= =?UTF-8?q?tation=20in=20compatibility=20guide?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/source/user-guide/latest/compatibility/scans.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index d68c59d562..53a754bb2f 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ b/docs/source/user-guide/latest/compatibility/scans.md @@ -81,6 +81,16 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. +The following `native_datafusion` limitation may produce incorrect results on Spark versions prior to 4.0 +without falling back to Spark: + +- Reading INT96 timestamps as `TimestampNTZ`. On Spark 3.x, Spark raises an error per + [SPARK-36182](https://issues.apache.org/jira/browse/SPARK-36182) because INT96 encodes UTC-adjusted instants + that cannot be safely reinterpreted as timezone-free values. Comet does not raise this error and instead + returns the raw UTC instant as a `TimestampNTZ` value. On Spark 4.0+, this read is permitted + ([SPARK-47447](https://issues.apache.org/jira/browse/SPARK-47447)) and Comet matches Spark's behavior. + See [#4219](https://github.com/apache/datafusion-comet/issues/4219). + ## `native_iceberg_compat` Limitations The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results From e5da30233cf39966e504a1ff2289e576e2af0993 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:18:45 -0600 Subject: [PATCH 6/9] fix: regenerate 4.0/4.1 diffs properly via Spark worktree Remove IgnoreCometNativeDataFusion from SPARK-47447 test by applying diffs to Spark v4.0.2/v4.1.1, editing, and regenerating with git diff. --- dev/diffs/4.0.2.diff | 20 ++++++-------------- dev/diffs/4.1.1.diff | 22 +++++++--------------- 2 files changed, 13 insertions(+), 29 deletions(-) diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 9daa97913d..9202464947 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -2729,7 +2729,7 @@ index 4474ec1fd42..05fa0257c82 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..35247c13ad9 100644 +index bba71f1c48d..5a111a937a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2740,15 +2740,7 @@ index bba71f1c48d..35247c13ad9 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - - test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2761,7 +2753,7 @@ index bba71f1c48d..35247c13ad9 100644 } } } -@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -2771,7 +2763,7 @@ index bba71f1c48d..35247c13ad9 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2781,7 +2773,7 @@ index bba71f1c48d..35247c13ad9 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2791,7 +2783,7 @@ index bba71f1c48d..35247c13ad9 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1131,7 +1139,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index ad058987ba..f198e9975b 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -3101,7 +3101,7 @@ index 3072657a095..b2293ccab17 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index c530dc0d3df..abf36a7ab09 100644 +index c530dc0d3df..bd0b5042a24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -3112,15 +3112,7 @@ index c530dc0d3df..abf36a7ab09 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - - test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -318,7 +319,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -3130,7 +3122,7 @@ index c530dc0d3df..abf36a7ab09 100644 def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath -@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -3143,7 +3135,7 @@ index c530dc0d3df..abf36a7ab09 100644 } } } -@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -3153,7 +3145,7 @@ index c530dc0d3df..abf36a7ab09 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -3163,7 +3155,7 @@ index c530dc0d3df..abf36a7ab09 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -3173,7 +3165,7 @@ index c530dc0d3df..abf36a7ab09 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } From bf6ac924b8b96e7f41af51b1eb784fdb7100bb98 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:19:57 -0600 Subject: [PATCH 7/9] rename suite to ParquetTimestampLtzAsNtzSuite The issue is not specific to INT96; it applies to any TimestampLTZ column read as TimestampNTZ regardless of physical encoding. --- .github/workflows/pr_build_linux.yml | 2 +- .github/workflows/pr_build_macos.yml | 2 +- ...la => ParquetTimestampLtzAsNtzSuite.scala} | 24 +++++++++---------- 3 files changed, 14 insertions(+), 14 deletions(-) rename spark/src/test/scala/org/apache/comet/parquet/{ParquetInt96NtzCorrectnessSuite.scala => ParquetTimestampLtzAsNtzSuite.scala} (79%) diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index 1062623920..0714226b44 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -337,7 +337,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite - org.apache.comet.parquet.ParquetInt96NtzCorrectnessSuite + org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index e182c9bd65..4c9130cfca 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -176,7 +176,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite - org.apache.comet.parquet.ParquetInt96NtzCorrectnessSuite + org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala similarity index 79% rename from spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala rename to spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala index 3d6577bf39..bb70be20dc 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetInt96NtzCorrectnessSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala @@ -29,25 +29,25 @@ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus /** - * Tests for reading INT96 Parquet timestamps as TimestampNTZ. + * Tests for reading Parquet TimestampLTZ columns as TimestampNTZ. * - * Prior to Spark 4.0, Spark raises an error (SPARK-36182) when asked to read INT96 as - * TimestampNTZ. Comet should match this behavior. In Spark 4.0+, this read is permitted and Comet - * should produce matching results. + * Prior to Spark 4.0, Spark raises an error (SPARK-36182) when asked to read TimestampLTZ as + * TimestampNTZ. Comet should match this behavior. In Spark 4.0+, this read is permitted + * (SPARK-47447) and Comet should produce matching results. * - * See https://github.com/apache/datafusion-comet/issues/3720 + * See https://github.com/apache/datafusion-comet/issues/4219 */ -class ParquetInt96NtzCorrectnessSuite extends CometTestBase { +class ParquetTimestampLtzAsNtzSuite extends CometTestBase { import testImplicits._ - test("INT96 TimestampType read as TimestampNTZ throws pre-Spark 4") { - assume(!isSpark40Plus, "Spark 4.0+ allows reading INT96 as TimestampNTZ") + test("read TimestampLTZ as TimestampNTZ throws pre-Spark 4") { + assume(!isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get() assume( scanImpl != CometConf.SCAN_AUTO && scanImpl != CometConf.SCAN_NATIVE_DATAFUSION, s"https://github.com/apache/datafusion-comet/issues/4219 ($scanImpl scan does not " + - "reject INT96 read as TimestampNTZ)") + "reject TimestampLTZ read as TimestampNTZ)") val sessionTz = "America/Los_Angeles" @@ -59,7 +59,7 @@ class ParquetInt96NtzCorrectnessSuite extends CometTestBase { val path = dir.getCanonicalPath Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) - // Spark refuses to read INT96 as TimestampNTZ (SPARK-36182) + // Spark refuses to read TimestampLTZ as TimestampNTZ (SPARK-36182) withSQLConf(CometConf.COMET_ENABLED.key -> "false") { intercept[SparkException] { spark.read.schema("ts timestamp_ntz").parquet(path).collect() @@ -74,8 +74,8 @@ class ParquetInt96NtzCorrectnessSuite extends CometTestBase { } } - test("INT96 TimestampType read as TimestampNTZ matches Spark") { - assume(isSpark40Plus, "Spark 4.0+ allows reading INT96 as TimestampNTZ") + test("read TimestampLTZ as TimestampNTZ matches Spark") { + assume(isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") val sessionTz = "America/Los_Angeles" withSQLConf( From 7f588ee469230e557a2b5b56eaadd86e93ef44c8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:20:58 -0600 Subject: [PATCH 8/9] test all LTZ encoding types (INT96, MICROS, MILLIS) --- .../ParquetTimestampLtzAsNtzSuite.scala | 74 ++++++++++--------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala index bb70be20dc..cd7d2a6271 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala @@ -40,54 +40,60 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class ParquetTimestampLtzAsNtzSuite extends CometTestBase { import testImplicits._ - test("read TimestampLTZ as TimestampNTZ throws pre-Spark 4") { - assume(!isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") + private val tsTypes = Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS") - val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get() - assume( - scanImpl != CometConf.SCAN_AUTO && scanImpl != CometConf.SCAN_NATIVE_DATAFUSION, - s"https://github.com/apache/datafusion-comet/issues/4219 ($scanImpl scan does not " + - "reject TimestampLTZ read as TimestampNTZ)") + tsTypes.foreach { tsType => + test(s"read TimestampLTZ ($tsType) as TimestampNTZ throws pre-Spark 4") { + assume(!isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") - val sessionTz = "America/Los_Angeles" + val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get() + assume( + scanImpl != CometConf.SCAN_AUTO && scanImpl != CometConf.SCAN_NATIVE_DATAFUSION, + s"https://github.com/apache/datafusion-comet/issues/4219 ($scanImpl scan does not " + + "reject TimestampLTZ read as TimestampNTZ)") - withSQLConf( - SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", - SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { - withTempPath { dir => - val path = dir.getCanonicalPath - Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) + val sessionTz = "America/Los_Angeles" - // Spark refuses to read TimestampLTZ as TimestampNTZ (SPARK-36182) - withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) + + // Spark refuses to read TimestampLTZ as TimestampNTZ (SPARK-36182) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + intercept[SparkException] { + spark.read.schema("ts timestamp_ntz").parquet(path).collect() + } + } + + // Comet should also refuse intercept[SparkException] { spark.read.schema("ts timestamp_ntz").parquet(path).collect() } } - - // Comet should also refuse - intercept[SparkException] { - spark.read.schema("ts timestamp_ntz").parquet(path).collect() - } } } } - test("read TimestampLTZ as TimestampNTZ matches Spark") { - assume(isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") - val sessionTz = "America/Los_Angeles" + tsTypes.foreach { tsType => + test(s"read TimestampLTZ ($tsType) as TimestampNTZ matches Spark") { + assume(isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") + val sessionTz = "America/Los_Angeles" - withSQLConf( - SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, - SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96", - SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { - withTempPath { dir => - val path = dir.getCanonicalPath - Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) - withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { - checkSparkAnswerAndOperator(spark.read.schema("ts timestamp_ntz").parquet(path)) + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { + checkSparkAnswerAndOperator(spark.read.schema("ts timestamp_ntz").parquet(path)) + } } } } From adaeeddb1b2101dced6878b48aea324375902a42 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 4 May 2026 19:24:45 -0600 Subject: [PATCH 9/9] docs: add type widening limitations for Spark 3.4/3.5 Broaden scans.md to cover all LTZ encodings and add note about unsupported type conversions (#3720). Add known limitations to Spark 3.4/3.5 sections linking to the scan compat doc. --- .../user-guide/latest/compatibility/scans.md | 14 ++++++++---- .../latest/compatibility/spark-versions.md | 22 +++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index 53a754bb2f..315b07fd82 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ b/docs/source/user-guide/latest/compatibility/scans.md @@ -81,16 +81,22 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. -The following `native_datafusion` limitation may produce incorrect results on Spark versions prior to 4.0 +The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0 without falling back to Spark: -- Reading INT96 timestamps as `TimestampNTZ`. On Spark 3.x, Spark raises an error per - [SPARK-36182](https://issues.apache.org/jira/browse/SPARK-36182) because INT96 encodes UTC-adjusted instants +- Reading `TimestampLTZ` as `TimestampNTZ`. On Spark 3.x, Spark raises an error per + [SPARK-36182](https://issues.apache.org/jira/browse/SPARK-36182) because LTZ encodes UTC-adjusted instants that cannot be safely reinterpreted as timezone-free values. Comet does not raise this error and instead - returns the raw UTC instant as a `TimestampNTZ` value. On Spark 4.0+, this read is permitted + returns the raw UTC instant as a `TimestampNTZ` value. This applies to all LTZ physical encodings (INT96, + TIMESTAMP_MICROS, TIMESTAMP_MILLIS). On Spark 4.0+, this read is permitted ([SPARK-47447](https://issues.apache.org/jira/browse/SPARK-47447)) and Comet matches Spark's behavior. See [#4219](https://github.com/apache/datafusion-comet/issues/4219). +- Unsupported Parquet type conversions. Spark raises schema incompatibility errors for certain conversions + (e.g., reading INT32 as BIGINT, reading BINARY as TIMESTAMP, unsupported decimal precision changes). + The `native_datafusion` scan may not detect these mismatches and could return unexpected values instead + of raising an error. See [#3720](https://github.com/apache/datafusion-comet/issues/3720). + ## `native_iceberg_compat` Limitations The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results diff --git a/docs/source/user-guide/latest/compatibility/spark-versions.md b/docs/source/user-guide/latest/compatibility/spark-versions.md index 115b1595be..ec9764099b 100644 --- a/docs/source/user-guide/latest/compatibility/spark-versions.md +++ b/docs/source/user-guide/latest/compatibility/spark-versions.md @@ -28,10 +28,32 @@ compatibility guide. Spark 3.4.3 is supported with Java 11/17 and Scala 2.12/2.13. +### Known Limitations + +- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.4 raises an error for this operation + (SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + +- **Unsupported Parquet type conversions**: Spark 3.4 raises schema incompatibility errors for + certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's + `native_datafusion` scan may not detect these and could return unexpected values. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + ## Spark 3.5 Spark 3.5.8 is supported with Java 11/17 and Scala 2.12/2.13. +### Known Limitations + +- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.5 raises an error for this operation + (SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + +- **Unsupported Parquet type conversions**: Spark 3.5 raises schema incompatibility errors for + certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's + `native_datafusion` scan may not detect these and could return unexpected values. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + ## Spark 4.0 Spark 4.0.2 is supported with Java 17 and Scala 2.13.