1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
@@ -337,6 +337,7 @@ jobs:
 org.apache.comet.parquet.ParquetReadV1Suite
 org.apache.comet.parquet.ParquetReadV2Suite
 org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
+org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuite
 org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite
 org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite
 org.apache.spark.sql.comet.ParquetEncryptionITCase
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
@@ -176,6 +176,7 @@ jobs:
 org.apache.comet.parquet.ParquetReadV1Suite
 org.apache.comet.parquet.ParquetReadV2Suite
 org.apache.comet.parquet.ParquetReadFromFakeHadoopFsSuite
+org.apache.comet.parquet.ParquetTimestampLtzAsNtzSuite
 org.apache.spark.sql.comet.ParquetDatetimeRebaseV1Suite
 org.apache.spark.sql.comet.ParquetDatetimeRebaseV2Suite
 org.apache.spark.sql.comet.ParquetEncryptionITCase
2 changes: 1 addition & 1 deletion dev/diffs/3.4.3.diff
@@ -2160,7 +2160,7 @@ index 29cb224c878..ee5a87fa200 100644
 
 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
 + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
-+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) {
  val data = (1 to 1000).map { i =>
  val ts = new java.sql.Timestamp(i)
  Row(ts)
2 changes: 1 addition & 1 deletion dev/diffs/3.5.8.diff
@@ -2127,7 +2127,7 @@ index f6472ba3d9d..5ea2d938664 100644
 
 - test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ") {
 + test("SPARK-36182: can't read TimestampLTZ as TimestampNTZ",
-+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/4219")) {
  val data = (1 to 1000).map { i =>
  val ts = new java.sql.Timestamp(i)
  Row(ts)
22 changes: 6 additions & 16 deletions dev/diffs/4.0.2.diff
@@ -2729,7 +2729,7 @@ index 4474ec1fd42..05fa0257c82 100644
 checkAnswer(
 // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index bba71f1c48d..35247c13ad9 100644
+index bba71f1c48d..5a111a937a9 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -2740,17 +2740,7 @@ index bba71f1c48d..35247c13ad9 100644
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.ArrayData
-@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
- }
- }
-
-- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") {
-+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ",
-+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
- val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
-
- Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
-@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -996,7 +997,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 Seq(Some("A"), Some("A"), None).toDF().repartition(1)
 .write.parquet(path.getAbsolutePath)
 val df = spark.read.parquet(path.getAbsolutePath)
@@ -2763,7 +2753,7 @@ index bba71f1c48d..35247c13ad9 100644
 }
 }
 }
-@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1042,7 +1047,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
 }
 
@@ -2773,7 +2763,7 @@ index bba71f1c48d..35247c13ad9 100644
 def readParquet(schema: String, path: File): DataFrame = {
 spark.read.schema(schema).parquet(path.toString)
 }
 
-@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1060,7 +1066,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 checkAnswer(readParquet(schema2, path), df)
 }
 
@@ -2783,7 +2773,7 @@ index bba71f1c48d..35247c13ad9 100644
 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
 checkAnswer(readParquet(schema1, path), df)
 val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1084,7 +1091,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
 df.write.parquet(path.toString)
 
@@ -2793,7 +2783,7 @@ index bba71f1c48d..35247c13ad9 100644
 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
 checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
 checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
-@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1131,7 +1139,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 }
 }
24 changes: 7 additions & 17 deletions dev/diffs/4.1.1.diff
@@ -3101,7 +3101,7 @@ index 3072657a095..b2293ccab17 100644
 checkAnswer(
 // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
-index c530dc0d3df..abf36a7ab09 100644
+index c530dc0d3df..bd0b5042a24 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -3112,17 +3112,7 @@ index c530dc0d3df..abf36a7ab09 100644
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.util.ArrayData
-@@ -185,7 +186,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
- }
- }
-
-- test("SPARK-47447: read TimestampLTZ as TimestampNTZ") {
-+ test("SPARK-47447: read TimestampLTZ as TimestampNTZ",
-+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3720")) {
- val providedSchema = StructType(Seq(StructField("time", TimestampNTZType, false)))
-
- Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS").foreach { tsType =>
-@@ -318,7 +320,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -318,7 +319,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 }
 }
 
@@ -3132,7 +3122,7 @@ index c530dc0d3df..abf36a7ab09 100644
 def testIgnoreCorruptFiles(options: Map[String, String]): Unit = {
 withTempDir { dir =>
 val basePath = dir.getCanonicalPath
-@@ -996,7 +999,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -996,7 +998,11 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 Seq(Some("A"), Some("A"), None).toDF().repartition(1)
 .write.parquet(path.getAbsolutePath)
 val df = spark.read.parquet(path.getAbsolutePath)
@@ -3145,7 +3135,7 @@ index c530dc0d3df..abf36a7ab09 100644
 }
 }
 }
-@@ -1042,7 +1049,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1042,7 +1048,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96")
 }
 
@@ -3155,7 +3145,7 @@ index c530dc0d3df..abf36a7ab09 100644
 def readParquet(schema: String, path: File): DataFrame = {
 spark.read.schema(schema).parquet(path.toString)
 }
 
-@@ -1060,7 +1068,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1060,7 +1067,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 checkAnswer(readParquet(schema2, path), df)
 }
 
@@ -3165,7 +3155,7 @@ index c530dc0d3df..abf36a7ab09 100644
 val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
 checkAnswer(readParquet(schema1, path), df)
 val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-@@ -1084,7 +1093,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1084,7 +1092,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d")
 df.write.parquet(path.toString)
 
@@ -3175,7 +3165,7 @@ index c530dc0d3df..abf36a7ab09 100644
 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
 checkAnswer(readParquet("a DECIMAL(11, 2)", path), sql("SELECT 1.00"))
 checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
-@@ -1131,7 +1141,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
+@@ -1131,7 +1140,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
 }
 }
16 changes: 16 additions & 0 deletions docs/source/user-guide/latest/compatibility/scans.md
@@ -81,6 +81,22 @@ requires `spark.comet.exec.enabled=true` because the scan node must be wrapped b
are detected at read time and raise a `SparkRuntimeException` with error class `_LEGACY_ERROR_TEMP_2093`,
matching Spark's behavior.

The following `native_datafusion` limitations may produce incorrect results on Spark versions prior to 4.0
without falling back to Spark:

- Reading `TimestampLTZ` as `TimestampNTZ`. On Spark 3.x, Spark raises an error per
  [SPARK-36182](https://issues.apache.org/jira/browse/SPARK-36182) because LTZ encodes UTC-adjusted instants
  that cannot be safely reinterpreted as timezone-free values. Comet does not raise this error and instead
  returns the raw UTC instant as a `TimestampNTZ` value. This applies to all LTZ physical encodings (INT96,
  TIMESTAMP_MICROS, TIMESTAMP_MILLIS). On Spark 4.0+, this read is permitted
  ([SPARK-47447](https://issues.apache.org/jira/browse/SPARK-47447)) and Comet matches Spark's behavior.
  See [#4219](https://github.com/apache/datafusion-comet/issues/4219) and the first sketch below.

- Unsupported Parquet type conversions. Spark raises schema incompatibility errors for certain conversions
  (e.g., reading INT32 as BIGINT, reading BINARY as TIMESTAMP, unsupported decimal precision changes).
  The `native_datafusion` scan may not detect these mismatches and could return unexpected values instead
  of raising an error. See [#3720](https://github.com/apache/datafusion-comet/issues/3720) and the second
  sketch below.
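
The first limitation can be reproduced with a minimal sketch adapted from the regression test added for
[#4219](https://github.com/apache/datafusion-comet/issues/4219); the output path, column name, and session
timezone are illustrative choices, and the NTZ result noted in the comment assumes a UTC-8 session zone:

```scala
import java.sql.Timestamp
import spark.implicits._

spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet("/tmp/ltz")

// Vanilla Spark 3.x fails here with a SparkException (SPARK-36182).
// With native_datafusion, the read succeeds and returns the raw UTC instant
// (2020-01-01 20:00:00 for the noon PST value above) as a TimestampNTZ.
spark.read.schema("ts timestamp_ntz").parquet("/tmp/ltz").show()
```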

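A sketch of the second limitation, continuing the same session (again with an illustrative path; the
exact error Spark raises varies by conversion and Spark version):

```scala
// "a" is stored in the Parquet file as INT32.
Seq(1, 2, 3).toDF("a").write.parquet("/tmp/int32")

// Vanilla Spark raises a schema-incompatibility error for this read, while
// native_datafusion may return values without raising one (see #3720).
spark.read.schema("a BIGINT").parquet("/tmp/int32").show()
```
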
## `native_iceberg_compat` Limitations

The `native_iceberg_compat` scan has the following additional limitation that may produce incorrect results
22 changes: 22 additions & 0 deletions docs/source/user-guide/latest/compatibility/spark-versions.md
@@ -28,10 +28,32 @@ compatibility guide.

Spark 3.4.3 is supported with Java 11/17 and Scala 2.12/2.13.

### Known Limitations

- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.4 raises an error for this operation
(SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.

- **Unsupported Parquet type conversions**: Spark 3.4 raises schema incompatibility errors for
certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's
`native_datafusion` scan may not detect these and could return unexpected values.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.

## Spark 3.5

Spark 3.5.8 is supported with Java 11/17 and Scala 2.12/2.13.

### Known Limitations

- **Reading `TimestampLTZ` as `TimestampNTZ`**: Spark 3.5 raises an error for this operation
(SPARK-36182), but Comet's `native_datafusion` scan silently returns the raw UTC value instead.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.

- **Unsupported Parquet type conversions**: Spark 3.5 raises schema incompatibility errors for
certain type mismatches (e.g., reading INT32 as BIGINT, decimal precision changes), but Comet's
`native_datafusion` scan may not detect these and could return unexpected values.
See [Parquet Compatibility](scans.md#native_datafusion-limitations) for details.

## Spark 4.0

Spark 4.0.2 is supported with Java 17 and Scala 2.13.
@@ -0,0 +1,101 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.comet.parquet

import java.sql.Timestamp

import org.apache.spark.SparkException
import org.apache.spark.sql.CometTestBase
import org.apache.spark.sql.internal.SQLConf

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus

/**
 * Tests for reading Parquet TimestampLTZ columns as TimestampNTZ.
 *
 * Prior to Spark 4.0, Spark raises an error (SPARK-36182) when asked to read TimestampLTZ as
 * TimestampNTZ. Comet should match this behavior. In Spark 4.0+, this read is permitted
 * (SPARK-47447) and Comet should produce matching results.
 *
 * See https://github.com/apache/datafusion-comet/issues/4219
 */
class ParquetTimestampLtzAsNtzSuite extends CometTestBase {
  import testImplicits._

  private val tsTypes = Seq("INT96", "TIMESTAMP_MICROS", "TIMESTAMP_MILLIS")

  tsTypes.foreach { tsType =>
    test(s"read TimestampLTZ ($tsType) as TimestampNTZ throws pre-Spark 4") {
      assume(!isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ")

      val scanImpl = CometConf.COMET_NATIVE_SCAN_IMPL.get()
      assume(
        scanImpl != CometConf.SCAN_AUTO && scanImpl != CometConf.SCAN_NATIVE_DATAFUSION,
        s"https://github.com/apache/datafusion-comet/issues/4219 ($scanImpl scan does not " +
          "reject TimestampLTZ read as TimestampNTZ)")

      val sessionTz = "America/Los_Angeles"

      withSQLConf(
        SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz,
        SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType,
        SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
        withTempPath { dir =>
          val path = dir.getCanonicalPath
          Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path)

          // Spark refuses to read TimestampLTZ as TimestampNTZ (SPARK-36182)
          withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
            intercept[SparkException] {
              spark.read.schema("ts timestamp_ntz").parquet(path).collect()
            }
          }

          // Comet should also refuse
          intercept[SparkException] {
            spark.read.schema("ts timestamp_ntz").parquet(path).collect()
          }
        }
      }
    }
  }

  tsTypes.foreach { tsType =>
    test(s"read TimestampLTZ ($tsType) as TimestampNTZ matches Spark") {
      assume(isSpark40Plus, "Spark 4.0+ allows reading TimestampLTZ as TimestampNTZ")
      val sessionTz = "America/Los_Angeles"

      withSQLConf(
        SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz,
        SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsType,
        SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
        withTempPath { dir =>
          val path = dir.getCanonicalPath
          Seq(Timestamp.valueOf("2020-01-01 12:00:00")).toDF("ts").write.parquet(path)

          withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
            checkSparkAnswerAndOperator(spark.read.schema("ts timestamp_ntz").parquet(path))
          }
        }
      }
    }
  }
}