diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml index ba8a9fb743..0714226b44 100644 --- a/.github/workflows/pr_build_linux.yml +++ b/.github/workflows/pr_build_linux.yml @@ -337,6 +337,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml index 66f0c7698f..4c9130cfca 100644 --- a/.github/workflows/pr_build_macos.yml +++ b/.github/workflows/pr_build_macos.yml @@ -176,6 +176,7 @@ jobs: org.apache.comet.parquet.ParquetReadV1Suite org.apache.comet.parquet.ParquetReadV2Suite org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite + org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuite org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite org.apache.spark.sql.comet.ParquetEncryptionITCase diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 8a939f2028..8d414db205 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -2160,7 +2160,7 @@ index 29cb224c878..ee5a87fa200 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index d3a5c617dc..2db66cb34c 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -2127,7 +2127,7 @@ index f6472ba3d9d..5ea2d938664 100644 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") { + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) { val data = (1 to 1000).map { i => val ts = new java.sql.Timestamp(i) Row(ts) diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 5948292d7b..9202464947 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -2729,7 +2729,7 @@ index 4474ec1fd42..05fa0257c82 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index bba71f1c48d..35247c13ad9 100644 +index bba71f1c48d..5a111a937a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -2740,17 +2740,7 @@ index bba71f1c48d..35247c13ad9 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -2763,7 +2753,7 @@ index bba71f1c48d..35247c13ad9 100644 } } } -@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -2773,7 +2763,7 @@ index bba71f1c48d..35247c13ad9 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -2783,7 +2773,7 @@ index bba71f1c48d..35247c13ad9 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -2793,7 +2783,7 @@ index bba71f1c48d..35247c13ad9 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1131,7 +1139,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 4ffb5377bf..f198e9975b 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -3101,7 +3101,7 @@ index 3072657a095..b2293ccab17 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index c530dc0d3df..abf36a7ab09 100644 +index c530dc0d3df..bd0b5042a24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat @@ -3112,17 +3112,7 @@ index c530dc0d3df..abf36a7ab09 100644 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.util.ArrayData -@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") { -+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) { - val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false))) - - Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType => -@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -318,7 +319,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } @@ -3132,7 +3122,7 @@ index c530dc0d3df..abf36a7ab09 100644 def testIgnoreCorruptFiles(options: Map[String, String]): Unit = { withTempDir { dir => val basePath = dir.getCanonicalPath -@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS Seq(Some("A"), Some("A"), None).toDF().repartition(1) .write.parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) @@ -3145,7 +3135,7 @@ index c530dc0d3df..abf36a7ab09 100644 } } } -@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } @@ -3155,7 +3145,7 @@ index c530dc0d3df..abf36a7ab09 100644 def readParquet(schema: String, path: File): DataFrame = { spark.read.schema(schema).parquet(path.toString) } -@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS checkAnswer(readParquet(schema2, path), df) } @@ -3165,7 +3155,7 @@ index c530dc0d3df..abf36a7ab09 100644 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" checkAnswer(readParquet(schema1, path), df) val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" -@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") df.write.parquet(path.toString) @@ -3175,7 +3165,7 @@ index c530dc0d3df..abf36a7ab09 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) -@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } diff --git a/docs/source/user-guide/latest/compatibility/scans.md b/docs/source/user-guide/latest/compatibility/scans.md index d68c59d562..315b07fd82 100644 --- a/docs/source/user-guide/latest/compatibility/scans.md +++ b/docs/source/user-guide/latest/compatibility/scans.md @@ -81,6 +81,22 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`, matching Spark's behavior. +The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0 +without falling back to Spark: + +- Reading `TimestampLTZ` as `TimestampNTZ`. On Spark 3.x, Spark raises an error per + [SPARK-36182](https://issues.apache.org/jira/browse/SPARK-36182) because LTZ encodes UTC-adjusted instants + that cannot be safely reinterpreted as timezone-free values. Comet does not raise this error and instead + returns the raw UTC instant as a `TimestampNTZ` value. This applies to all LTZ physical encodings (INT96, + TIMESTAMP_MICROS, TIMESTAMP_MILLIS). On Spark 4.0+, this read is permitted + ([SPARK-47447](https://issues.apache.org/jira/browse/SPARK-47447)) and Comet matches Spark's behavior. + See [#4219](https://github.com/apache/datafusion-comet/issues/4219). + +- Unsupported Parquet type conversions. Spark raises schema incompatibility errors for certain conversions + (e.g., reading INT32 as BIGINT, reading BINARY as TIMESTAMP, unsupported decimal precision changes). + The `native_datafusion` scan may not detect these mismatches and could return unexpected values instead + of raising an error. See [#3720](https://github.com/apache/datafusion-comet/issues/3720). + ## `native_iceberg_compat` Limitations The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results diff --git a/docs/source/user-guide/latest/compatibility/spark-versions.md b/docs/source/user-guide/latest/compatibility/spark-versions.md index 115b1595be..ec9764099b 100644 --- a/docs/source/user-guide/latest/compatibility/spark-versions.md +++ b/docs/source/user-guide/latest/compatibility/spark-versions.md @@ -28,10 +28,32 @@ compatibility guide. Spark 3.4.3 is supported with Java 11/17 and Scala 2.12/2.13. +### Known Limitations + +- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.4 raises an error for this operation + (SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + +- **Unsupported Parquet type conversions**: Spark 3.4 raises schema incompatibility errors for + certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's + `native_datafusion` scan may not detect these and could return unexpected values. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + ## Spark 3.5 Spark 3.5.8 is supported with Java 11/17 and Scala 2.12/2.13. +### Known Limitations + +- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.5 raises an error for this operation + (SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + +- **Unsupported Parquet type conversions**: Spark 3.5 raises schema incompatibility errors for + certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's + `native_datafusion` scan may not detect these and could return unexpected values. + See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details. + ## Spark 4.0 Spark 4.0.2 is supported with Java 17 and Scala 2.13. diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala new file mode 100644 index 0000000000..cd7d2a6271 --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetTimestampLtzAsNtzSuite.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.parquet + +import java.sql.Timestamp + +import org.apache.spark.SparkException +import org.apache.spark.sql.CometTestBase +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.CometConf +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus + +/** + * Tests for reading Parquet TimestampLTZ columns as TimestampNTZ. + * + * Prior to Spark 4.0, Spark raises an error (SPARK-36182) when asked to read TimestampLTZ as + * TimestampNTZ. Comet should match this behavior. In Spark 4.0+, this read is permitted + * (SPARK-47447) and Comet should produce matching results. + * + * See https://github.com/apache/datafusion-comet/issues/4219 + */ +class ParquetTimestampLtzAsNtzSuite extends CometTestBase { + import testImplicits._ + + private val tsTypes = Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS") + + tsTypes.foreach { tsType => + test(s"read TimestampLTZ ($tsType) as TimestampNTZ throws pre-Spark 4") { + assume(!isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") + + val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get() + assume( + scanImpl != CometConf.SCAN_AUTO && scanImpl != CometConf.SCAN_NATIVE_DATAFUSION, + s"https://github.com/apache/datafusion-comet/issues/4219 ($scanImpl scan does not " + + "reject TimestampLTZ read as TimestampNTZ)") + + val sessionTz = "America/Los_Angeles" + + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) + + // Spark refuses to read TimestampLTZ as TimestampNTZ (SPARK-36182) + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + intercept[SparkException] { + spark.read.schema("ts timestamp_ntz").parquet(path).collect() + } + } + + // Comet should also refuse + intercept[SparkException] { + spark.read.schema("ts timestamp_ntz").parquet(path).collect() + } + } + } + } + } + + tsTypes.foreach { tsType => + test(s"read TimestampLTZ ($tsType) as TimestampNTZ matches Spark") { + assume(isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ") + val sessionTz = "America/Los_Angeles" + + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz, + SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType, + SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path) + + withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) { + checkSparkAnswerAndOperator(spark.read.schema("ts timestamp_ntz").parquet(path)) + } + } + } + } + } +}