From db887dc1e8ddaaa3f647db420aaf6b3a31d8d986 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 4 May 2026 17:14:44 -0400 Subject: [PATCH 1/7] Stash with two passing tests. --- native/core/src/execution/planner.rs | 10 +- native/core/src/execution/serde.rs | 12 +- native/core/src/parquet/mod.rs | 3 + native/core/src/parquet/parquet_exec.rs | 4 +- native/core/src/parquet/parquet_support.rs | 75 ++++++-- native/core/src/parquet/schema_adapter.rs | 167 ++++++++++++------ native/proto/src/proto/operator.proto | 9 + native/proto/src/proto/types.proto | 8 + .../apache/comet/rules/CometScanRule.scala | 6 - .../apache/comet/serde/QueryPlanSerde.scala | 17 ++ .../serde/operator/CometNativeScan.scala | 11 ++ .../apache/comet/serde/operator/package.scala | 13 ++ .../comet/parquet/ParquetReadSuite.scala | 93 ++++++++++ 13 files changed, 353 insertions(+), 75 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 5522e5b90e..609f593897 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1325,6 +1325,7 @@ impl PhysicalPlanner { common.case_sensitive, self.session_ctx(), common.encryption_enabled, + common.use_field_id, )?; Ok(( vec![], @@ -3010,11 +3011,16 @@ fn convert_spark_types_to_arrow_schema( let arrow_fields = spark_types .iter() .map(|spark_type| { - Field::new( + let field = Field::new( String::clone(&spark_type.name), to_arrow_datatype(spark_type.data_type.as_ref().unwrap()), spark_type.nullable, - ) + ); + if spark_type.metadata.is_empty() { + field + } else { + field.with_metadata(spark_type.metadata.clone()) + } }) .collect_vec(); let arrow_schema: SchemaRef = Arc::new(Schema::new(arrow_fields)); diff --git a/native/core/src/execution/serde.rs b/native/core/src/execution/serde.rs index ae0554ee76..5d60288f68 100644 --- a/native/core/src/execution/serde.rs +++ b/native/core/src/execution/serde.rs @@ -157,11 +157,19 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType { .iter() .enumerate() .map(|(idx, name)| { - Field::new( + let field = Field::new( name, to_arrow_datatype(&info.field_datatypes[idx]), info.field_nullable[idx], - ) + ); + // Attach Spark field metadata (currently parquet.field.id) when present. + // field_metadata is parallel to field_names; either empty or full length. + if let Some(meta) = info.field_metadata.get(idx) { + if !meta.metadata.is_empty() { + return field.with_metadata(meta.metadata.clone()); + } + } + field }) .collect(); ArrowDataType::Struct(fields) diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 61ff4fc0db..52d602e72a 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -513,6 +513,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat case_sensitive != JNI_FALSE, session_ctx, encryption_enabled, + // The iceberg-compat path resolves IDs in the JVM via NativeBatchReader, + // so the native side does not need to do field-ID matching here. + false, )?; let partition_index: usize = 0; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index ef4c878b9a..f3057aecf7 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -73,13 +73,15 @@ pub(crate) fn init_datasource_exec( case_sensitive: bool, session_ctx: &Arc, encryption_enabled: bool, + use_field_id: bool, ) -> Result, ExecutionError> { - let (table_parquet_options, spark_parquet_options) = get_options( + let (table_parquet_options, mut spark_parquet_options) = get_options( session_timezone, case_sensitive, &object_store_url, encryption_enabled, ); + spark_parquet_options.use_field_id = use_field_id; // Determine the schema and projection to use for ParquetSource. // When data_schema is provided, use it as the base schema so DataFusion knows the full diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 3418a17c43..c125ba03c3 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -79,6 +79,10 @@ pub struct SparkParquetOptions { pub use_legacy_date_timestamp_or_ntz: bool, // Whether schema field names are case sensitive pub case_sensitive: bool, + /// When true, resolve fields by parquet.field.id metadata instead of name + /// (mirrors Spark's `spark.sql.parquet.fieldId.read.enabled`). Only takes effect + /// when both physical and logical fields actually carry IDs. + pub use_field_id: bool, } impl SparkParquetOptions { @@ -91,6 +95,7 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + use_field_id: false, } } @@ -103,6 +108,7 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + use_field_id: false, } } } @@ -243,9 +249,27 @@ fn parquet_convert_struct_to_struct( ) -> DataFusionResult { match (from_type, to_type) { (DataType::Struct(from_fields), DataType::Struct(to_fields)) => { - // if dest and target schemas has any column in common - let mut field_overlap = false; - // TODO some of this logic may be specific to converting Parquet to Spark + // Match `from` (file) fields to `to` (logical) fields. Mirrors Spark's + // `clipParquetGroupFields`: when the logical struct carries `parquet.field.id` + // metadata anywhere in its fields, prefer ID match for ID-bearing logical + // fields; non-ID-bearing logical fields fall back to name match. When no + // logical field carries an ID, fall back to name match across the board. + let should_match_by_id = + parquet_options.use_field_id && to_fields.iter().any(|f| field_id(f).is_some()); + + // ID -> from-index, built only when ID matching is in play. + let from_id_to_index: HashMap = if should_match_by_id { + let mut map = HashMap::new(); + for (i, field) in from_fields.iter().enumerate() { + if let Some(id) = field_id(field) { + map.entry(id).or_insert(i); + } + } + map + } else { + HashMap::new() + }; + let mut field_name_to_index_map = HashMap::new(); for (i, field) in from_fields.iter().enumerate() { if parquet_options.case_sensitive { @@ -255,27 +279,44 @@ fn parquet_convert_struct_to_struct( } } assert_eq!(field_name_to_index_map.len(), from_fields.len()); + + let mut field_overlap = false; let mut cast_fields: Vec = Vec::with_capacity(to_fields.len()); for i in 0..to_fields.len() { - // Fields in the to_type schema may not exist in the from_type schema - // i.e. the required schema may have fields that the file does not - // have - let key = if parquet_options.case_sensitive { - to_fields[i].name().clone() + let to_field = &to_fields[i]; + let from_index = if should_match_by_id { + if let Some(id) = field_id(to_field) { + // Logical field carries an ID: ID-only match (Spark treats + // missing ID match as a missing column). + from_id_to_index.get(&id).copied() + } else { + // Logical field has no ID: name match. + let key = if parquet_options.case_sensitive { + to_field.name().clone() + } else { + to_field.name().to_lowercase() + }; + field_name_to_index_map.get(&key).copied() + } } else { - to_fields[i].name().to_lowercase() + let key = if parquet_options.case_sensitive { + to_field.name().clone() + } else { + to_field.name().to_lowercase() + }; + field_name_to_index_map.get(&key).copied() }; - if field_name_to_index_map.contains_key(&key) { - let from_index = field_name_to_index_map[&key]; + + if let Some(from_index) = from_index { let cast_field = parquet_convert_array( Arc::clone(array.column(from_index)), - to_fields[i].data_type(), + to_field.data_type(), parquet_options, )?; cast_fields.push(cast_field); field_overlap = true; } else { - cast_fields.push(new_null_array(to_fields[i].data_type(), array.len())); + cast_fields.push(new_null_array(to_field.data_type(), array.len())); } } @@ -297,6 +338,14 @@ fn parquet_convert_struct_to_struct( } } +/// Read the Parquet field id stored under arrow-rs's `PARQUET:field_id` field metadata key. +fn field_id(field: &arrow::datatypes::Field) -> Option { + field + .metadata() + .get("PARQUET:field_id") + .and_then(|v| v.parse::().ok()) +} + /// Cast a map type to another map type. The same as arrow-cast except we recursively call our own /// parquet_convert_array fn parquet_convert_map_to_map( diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 4e68902585..df3947dbed 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -17,7 +17,7 @@ use crate::parquet::cast_column::CometCastColumnExpr; use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion::common::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_expr::expressions::Column; @@ -59,39 +59,106 @@ impl SparkPhysicalExprAdapterFactory { } } -/// Remap physical schema field names to match logical schema field names using -/// case-insensitive matching. This allows the DefaultPhysicalExprAdapter (which -/// uses exact name matching) to correctly find columns when the parquet file has -/// different casing than the table schema (e.g., file has "a" but table has "A"). -fn remap_physical_schema_names( +/// Field metadata key arrow-rs uses to carry Parquet field IDs through to the +/// Arrow schema (`PARQUET:field_id`). Mirrors `ParquetUtils.FIELD_ID_METADATA_KEY` +/// on the JVM side. +const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id"; + +fn parse_field_id(field: &Field) -> Option { + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|v| v.parse::().ok()) +} + +fn schema_has_field_ids(schema: &SchemaRef) -> bool { + schema.fields().iter().any(|f| parse_field_id(f).is_some()) +} + +/// Remap physical schema field names to match logical schema field names. Mirrors Spark's +/// `clipParquetGroupFields`: prefer ID match for any logical field that carries a +/// `PARQUET:field_id`, fall back to case-insensitive name match otherwise. +/// +/// The remap only changes top-level field NAMES so that `DefaultPhysicalExprAdapter`'s +/// exact-name lookup hits. Indices, types, nullability, and metadata stay as in the file. +/// Returns the rewritten schema and a `logical_name -> original_physical_name` map used +/// downstream to restore the original physical names before stream consumption. +fn remap_physical_schema( logical_schema: &SchemaRef, physical_schema: &SchemaRef, -) -> SchemaRef { - let remapped_fields: Vec<_> = physical_schema + case_sensitive: bool, + use_field_id: bool, +) -> (SchemaRef, HashMap) { + let should_match_by_id = use_field_id && schema_has_field_ids(logical_schema); + + // Pre-build a lookup from id -> first matching logical field (Spark surfaces a + // duplicate-ID error during `clipParquetGroupFields`; we follow a similar shape but + // raise it lazily during `rewrite` if a duplicate is actually referenced). + let id_to_logical: HashMap = if should_match_by_id { + let mut map = HashMap::new(); + for lf in logical_schema.fields() { + if let Some(id) = parse_field_id(lf) { + map.entry(id).or_insert(lf); + } + } + map + } else { + HashMap::new() + }; + + let mut name_map: HashMap = HashMap::new(); + let remapped_fields: Vec = physical_schema .fields() .iter() .map(|field| { - if let Some(logical_field) = logical_schema - .fields() - .iter() - .find(|lf| lf.name().eq_ignore_ascii_case(field.name())) - { - if logical_field.name() != field.name() { - Arc::new(Field::new( - logical_field.name(), - field.data_type().clone(), - field.is_nullable(), - )) - } else { - Arc::clone(field) + // ID match first when the logical schema is ID-bearing. + if should_match_by_id { + if let Some(phys_id) = parse_field_id(field) { + if let Some(logical_field) = id_to_logical.get(&phys_id) { + if logical_field.name() != field.name() { + name_map.insert(logical_field.name().clone(), field.name().clone()); + return Arc::new( + Field::new( + logical_field.name(), + field.data_type().clone(), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + ); + } + return Arc::clone(field); + } + } + } + + // Case-insensitive name match. Skip logical fields that have IDs when the + // schema is ID-bearing — Spark's `matchIdField` does not fall through to a + // name match for ID-bearing logical fields. + if !case_sensitive { + let logical_field = logical_schema.fields().iter().find(|lf| { + let lf_has_id = should_match_by_id && parse_field_id(lf).is_some(); + !lf_has_id && lf.name().eq_ignore_ascii_case(field.name()) + }); + if let Some(logical_field) = logical_field { + if logical_field.name() != field.name() { + name_map.insert(logical_field.name().clone(), field.name().clone()); + return Arc::new( + Field::new( + logical_field.name(), + field.data_type().clone(), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + ); + } } - } else { - Arc::clone(field) } + + Arc::clone(field) }) .collect(); - Arc::new(Schema::new(remapped_fields)) + (Arc::new(Schema::new(remapped_fields)), name_map) } /// Check if a specific column name has duplicate matches in the physical schema @@ -117,34 +184,27 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { logical_file_schema: SchemaRef, physical_file_schema: SchemaRef, ) -> DataFusionResult> { - // When case-insensitive, remap physical schema field names to match logical - // field names. The DefaultPhysicalExprAdapter uses exact name matching, so - // without this remapping, columns like "a" won't match logical "A" and will - // be filled with nulls. + // Remap physical schema field names to match logical names by Parquet field id + // (when the logical schema carries IDs and `use_field_id` is set) and/or by + // case-insensitive name match. The DefaultPhysicalExprAdapter uses exact name + // matching, so without this remapping, columns whose file names differ from the + // logical names won't match and will be filled with NULLs. // - // We also build a reverse map (logical name -> physical name) so that after - // the default adapter produces expressions, we can remap column names back + // We also keep a reverse map (logical name -> original physical name) so that + // after the default adapter produces expressions, we can remap column names back // to the original physical names. This is necessary because downstream code - // (reassign_expr_columns) looks up columns by name in the actual stream - // schema, which uses the original physical file column names. + // (reassign_expr_columns) looks up columns by name in the actual stream schema, + // which uses the original physical file column names. + let needs_remap = !self.parquet_options.case_sensitive + || (self.parquet_options.use_field_id && schema_has_field_ids(&logical_file_schema)); let (adapted_physical_schema, logical_to_physical_names, original_physical_schema) = - if !self.parquet_options.case_sensitive { - let logical_to_physical: HashMap = logical_file_schema - .fields() - .iter() - .filter_map(|logical_field| { - physical_file_schema - .fields() - .iter() - .find(|pf| { - pf.name().eq_ignore_ascii_case(logical_field.name()) - && pf.name() != logical_field.name() - }) - .map(|pf| (logical_field.name().clone(), pf.name().clone())) - }) - .collect(); - let remapped = - remap_physical_schema_names(&logical_file_schema, &physical_file_schema); + if needs_remap { + let (remapped, logical_to_physical) = remap_physical_schema( + &logical_file_schema, + &physical_file_schema, + self.parquet_options.case_sensitive, + self.parquet_options.use_field_id, + ); ( remapped, if logical_to_physical.is_empty() { @@ -152,8 +212,13 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { } else { Some(logical_to_physical) }, - // Keep original physical schema for per-column duplicate detection - Some(Arc::clone(&physical_file_schema)), + // Keep original physical schema for per-column duplicate detection. + // Only meaningful in case-insensitive mode (matches existing behavior). + if !self.parquet_options.case_sensitive { + Some(Arc::clone(&physical_file_schema)) + } else { + None + }, ) } else { (Arc::clone(&physical_file_schema), None, None) diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index dd85d47e6e..7205c010dc 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -74,6 +74,9 @@ message SparkStructField { string name = 1; spark.spark_expression.DataType data_type = 2; bool nullable = 3; + // Spark field metadata. Currently used to carry parquet.field.id through + // to the native side. Empty when not needed. + map metadata = 4; } message Scan { @@ -107,6 +110,12 @@ message NativeScanCommon { bool encryption_enabled = 11; string source = 12; repeated spark.spark_expression.DataType fields = 13; + // True when spark.sql.parquet.fieldId.read.enabled is set and the requested + // schema actually carries parquet.field.id metadata. When false the native + // scan keeps its existing name-based path with no extra work. + bool use_field_id = 14; + // True when spark.sql.parquet.fieldId.read.ignoreMissing is set. + bool ignore_missing_field_id = 15; } message NativeScan { diff --git a/native/proto/src/proto/types.proto b/native/proto/src/proto/types.proto index 2fd3d59a73..fec972a8f0 100644 --- a/native/proto/src/proto/types.proto +++ b/native/proto/src/proto/types.proto @@ -91,6 +91,14 @@ message DataType { repeated string field_names = 1; repeated DataType field_datatypes = 2; repeated bool field_nullable = 3; + // Parallel to field_names. Each entry holds Spark field metadata for the + // corresponding nested field. Currently used to carry parquet.field.id + // through to the native side. Empty when no fields carry metadata. + repeated FieldMetadata field_metadata = 4; + } + + message FieldMetadata { + map metadata = 1; } DataTypeInfo type_info = 2; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 1c9ec98a7a..aee11d4ce4 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec} import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf @@ -237,11 +236,6 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support row index generation") return None } - if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) && - ParquetUtils.hasFieldIds(scanExec.requiredSchema)) { - withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") - return None - } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 1cfb58b88b..eef3f35a8c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.comet.DecimalPrecision import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan} +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -454,6 +455,22 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { struct.addAllFieldDatatypes(fieldDatatypes.map(_.get).asJava) struct.addAllFieldNullable(fieldNullable) + val fieldIds = s.fields.map { f => + if (ParquetUtils.hasFieldId(f)) Some(ParquetUtils.getFieldId(f)) else None + } + if (fieldIds.exists(_.isDefined)) { + fieldIds.foreach { idOpt => + val metaBuilder = Types.DataType.FieldMetadata.newBuilder() + // Use arrow-rs's metadata key (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). + // Spark's local key is `parquet.field.id`; we translate at the proto boundary so + // the native side matches the same key it gets from arrow-rs. + idOpt.foreach { id => + metaBuilder.putMetadata("PARQUET:field_id", id.toString) + } + struct.addFieldMetadata(metaBuilder.build()) + } + } + info.setStruct(struct) builder.setTypeInfo(info.build()).build() case _ => builder.build() diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 70f06f5741..9b95e873e2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec} +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import org.apache.comet.{CometConf, ConfigEntry} @@ -189,6 +190,16 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone")) commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE)) + // Field-ID matching: only ask the native side to do extra work when the conf is on AND + // the requested schema actually carries IDs. Spark's ParquetReadSupport applies the same + // gate before invoking matchIdField. + val useFieldId = + scan.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) && + ParquetUtils.hasFieldIds(scan.requiredSchema) + commonBuilder.setUseFieldId(useFieldId) + commonBuilder.setIgnoreMissingFieldId( + scan.conf.getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)) + // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState .newHadoopConfWithOptions(scan.relation.options) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala index 7b811d09e7..35cdce8e5d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala @@ -21,18 +21,31 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.types.{StructField, StructType} import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} package object operator { + // Metadata key arrow-rs writes when it lifts Parquet field IDs into the Arrow schema + // (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). Spark's local key is `parquet.field.id` + // (`ParquetUtils.FIELD_ID_METADATA_KEY`); we translate at the proto boundary so the native + // side matches the same key it gets from arrow-rs. + private val PARQUET_FIELD_ID_META_KEY = "PARQUET:field_id" + def schema2Proto(fields: Array[StructField]): Array[OperatorOuterClass.SparkStructField] = { val fieldBuilder = OperatorOuterClass.SparkStructField.newBuilder() fields.map { field => fieldBuilder.setName(field.name) fieldBuilder.setDataType(serializeDataType(field.dataType).get) fieldBuilder.setNullable(field.nullable) + fieldBuilder.clearMetadata() + if (ParquetUtils.hasFieldId(field)) { + fieldBuilder.putMetadata( + PARQUET_FIELD_ID_META_KEY, + ParquetUtils.getFieldId(field).toString) + } fieldBuilder.build() } } diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 922a4255c4..dc9da34c3d 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1419,6 +1419,99 @@ abstract class ParquetReadSuite extends CometTestBase { } } } + + // Based on Spark ParquetFieldIdIOSuite.test("Parquet reads infer fields using field ids + // correctly"). Forces SCAN_NATIVE_DATAFUSION so we can prove that the gate in CometScanRule + // is removed and that the native_datafusion scan resolves columns by field id rather than by + // name (the read schema names differ from what is in the file). + test("native_datafusion: read by Parquet field id when names differ") { + val writeSchema = StructType( + Seq( + StructField("random", IntegerType, nullable = true, withId(1)), + StructField("name", StringType, nullable = true, withId(0)))) + val readSchema = StructType( + Seq( + StructField("a", StringType, nullable = true, withId(0)), + StructField("b", IntegerType, nullable = true, withId(1)))) + val writeData = Seq(Row(100, "text"), Row(200, "more")) + + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + spark + .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + checkSparkAnswerAndOperator(df) + } + } + } + + // Based on Spark ParquetFieldIdIOSuite.test("SPARK-38094: absence of field ids: reading nested + // schema"). Exercises ID matching at every nesting level (struct, array, map) under + // SCAN_NATIVE_DATAFUSION. Names differ from the file at every level. + test("native_datafusion: read nested types by Parquet field id when names differ") { + val writeSchema = StructType( + Seq(StructField( + "outer", + StructType(Seq( + StructField("inner_a", IntegerType, nullable = true, withId(11)), + StructField( + "inner_arr", + ArrayType(StructType(Seq( + StructField("ea", StringType, nullable = true, withId(21)), + StructField("eb", IntegerType, nullable = true, withId(22))))), + nullable = true, + withId(12)), + StructField( + "inner_map", + MapType(StringType, IntegerType, valueContainsNull = true), + nullable = true, + withId(13)))), + nullable = true, + withId(1)))) + + val readSchema = StructType( + Seq(StructField( + "renamed_outer", + StructType(Seq( + StructField("renamed_a", IntegerType, nullable = true, withId(11)), + StructField( + "renamed_arr", + ArrayType(StructType(Seq( + StructField("renamed_ea", StringType, nullable = true, withId(21)), + StructField("renamed_eb", IntegerType, nullable = true, withId(22))))), + nullable = true, + withId(12)), + StructField( + "renamed_map", + MapType(StringType, IntegerType, valueContainsNull = true), + nullable = true, + withId(13)))), + nullable = true, + withId(1)))) + + val data = Seq( + Row(Row(1, Seq(Row("x", 10), Row("y", 20)), Map("k1" -> 100))), + Row(Row(2, Seq(Row("z", 30)), Map("k2" -> 200, "k3" -> 300)))) + + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + spark + .createDataFrame(spark.sparkContext.parallelize(data), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + checkSparkAnswerAndOperator(df) + } + } + } } class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { From 2e3b4646e6baaabcae283bf56a12fab04582dbe8 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 4 May 2026 17:23:05 -0400 Subject: [PATCH 2/7] Cleanup. --- .../comet/parquet/CometParquetUtils.scala | 6 ++ native/core/src/parquet/parquet_support.rs | 78 ++++++++----------- native/core/src/parquet/schema_adapter.rs | 18 ++--- .../apache/comet/serde/QueryPlanSerde.scala | 8 +- .../apache/comet/serde/operator/package.scala | 9 +-- 5 files changed, 52 insertions(+), 67 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala index 8bcf99dbd1..241405c0ea 100644 --- a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala +++ b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala @@ -29,6 +29,12 @@ object CometParquetUtils { private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled" private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing" + // Field-metadata key arrow-rs writes when it lifts Parquet field IDs into the Arrow schema + // (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). Spark's local key for the same concept is + // `parquet.field.id` (`ParquetUtils.FIELD_ID_METADATA_KEY`). The serde translates at the proto + // boundary so the native side can match the same key it gets from arrow-rs. + val PARQUET_FIELD_ID_META_KEY = "PARQUET:field_id" + // Map of encryption configuration key-value pairs that, if present, are only supported with // these specific values. Generally, these are the default values that won't be present, // but if they are present we want to check them. diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index c125ba03c3..27d72199a4 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -38,6 +38,7 @@ use datafusion_comet_spark_expr::EvalMode; use log::debug; use object_store::path::Path; use object_store::{parse_url, ObjectStore}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use std::collections::HashMap; use std::sync::OnceLock; use std::time::Duration; @@ -239,6 +240,14 @@ fn parquet_convert_array( } } +/// Read the Parquet field id stored under arrow-rs's `PARQUET_FIELD_ID_META_KEY`. +fn field_id(field: &arrow::datatypes::Field) -> Option { + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|v| v.parse::().ok()) +} + /// Cast between struct types based on logic in /// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`. fn parquet_convert_struct_to_struct( @@ -250,14 +259,13 @@ fn parquet_convert_struct_to_struct( match (from_type, to_type) { (DataType::Struct(from_fields), DataType::Struct(to_fields)) => { // Match `from` (file) fields to `to` (logical) fields. Mirrors Spark's - // `clipParquetGroupFields`: when the logical struct carries `parquet.field.id` - // metadata anywhere in its fields, prefer ID match for ID-bearing logical - // fields; non-ID-bearing logical fields fall back to name match. When no - // logical field carries an ID, fall back to name match across the board. + // `clipParquetGroupFields`: when the logical struct carries Parquet field IDs + // anywhere, ID-bearing logical fields match ONLY by ID; non-ID-bearing fields + // fall back to name match. When no logical field carries an ID, fall back to + // name match across the board. let should_match_by_id = parquet_options.use_field_id && to_fields.iter().any(|f| field_id(f).is_some()); - // ID -> from-index, built only when ID matching is in play. let from_id_to_index: HashMap = if should_match_by_id { let mut map = HashMap::new(); for (i, field) in from_fields.iter().enumerate() { @@ -270,58 +278,46 @@ fn parquet_convert_struct_to_struct( HashMap::new() }; - let mut field_name_to_index_map = HashMap::new(); - for (i, field) in from_fields.iter().enumerate() { + let normalize_name = |name: &str| -> String { if parquet_options.case_sensitive { - field_name_to_index_map.insert(field.name().clone(), i); + name.to_string() } else { - field_name_to_index_map.insert(field.name().to_lowercase(), i); + name.to_lowercase() } + }; + let mut field_name_to_index_map = HashMap::new(); + for (i, field) in from_fields.iter().enumerate() { + field_name_to_index_map.insert(normalize_name(field.name()), i); } assert_eq!(field_name_to_index_map.len(), from_fields.len()); let mut field_overlap = false; let mut cast_fields: Vec = Vec::with_capacity(to_fields.len()); - for i in 0..to_fields.len() { - let to_field = &to_fields[i]; - let from_index = if should_match_by_id { - if let Some(id) = field_id(to_field) { - // Logical field carries an ID: ID-only match (Spark treats - // missing ID match as a missing column). - from_id_to_index.get(&id).copied() - } else { - // Logical field has no ID: name match. - let key = if parquet_options.case_sensitive { - to_field.name().clone() - } else { - to_field.name().to_lowercase() - }; - field_name_to_index_map.get(&key).copied() - } - } else { - let key = if parquet_options.case_sensitive { - to_field.name().clone() - } else { - to_field.name().to_lowercase() - }; - field_name_to_index_map.get(&key).copied() + for to_field in to_fields.iter() { + let from_index = match (should_match_by_id, field_id(to_field)) { + // Spark treats a missing ID match as a missing column rather than + // falling back to name match. + (true, Some(id)) => from_id_to_index.get(&id).copied(), + _ => field_name_to_index_map + .get(&normalize_name(to_field.name())) + .copied(), }; if let Some(from_index) = from_index { - let cast_field = parquet_convert_array( + cast_fields.push(parquet_convert_array( Arc::clone(array.column(from_index)), to_field.data_type(), parquet_options, - )?; - cast_fields.push(cast_field); + )?); field_overlap = true; } else { cast_fields.push(new_null_array(to_field.data_type(), array.len())); } } - // If target schema doesn't contain any of the existing fields - // mark such a column in array as NULL + // When no `to` field overlaps with the source struct we treat the whole + // result as null, matching Spark's `castStruct` semantics for fully-disjoint + // schemas. let nulls = if field_overlap { array.nulls().cloned() } else { @@ -338,14 +334,6 @@ fn parquet_convert_struct_to_struct( } } -/// Read the Parquet field id stored under arrow-rs's `PARQUET:field_id` field metadata key. -fn field_id(field: &arrow::datatypes::Field) -> Option { - field - .metadata() - .get("PARQUET:field_id") - .and_then(|v| v.parse::().ok()) -} - /// Cast a map type to another map type. The same as arrow-cast except we recursively call our own /// parquet_convert_array fn parquet_convert_map_to_map( diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index df3947dbed..7f31def0c5 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -30,6 +30,7 @@ use datafusion_physical_expr_adapter::{ replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, }; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use std::collections::HashMap; use std::sync::Arc; @@ -59,11 +60,7 @@ impl SparkPhysicalExprAdapterFactory { } } -/// Field metadata key arrow-rs uses to carry Parquet field IDs through to the -/// Arrow schema (`PARQUET:field_id`). Mirrors `ParquetUtils.FIELD_ID_METADATA_KEY` -/// on the JVM side. -const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id"; - +/// Read the Parquet field id stored under arrow-rs's `PARQUET_FIELD_ID_META_KEY`. fn parse_field_id(field: &Field) -> Option { field .metadata() @@ -91,9 +88,9 @@ fn remap_physical_schema( ) -> (SchemaRef, HashMap) { let should_match_by_id = use_field_id && schema_has_field_ids(logical_schema); - // Pre-build a lookup from id -> first matching logical field (Spark surfaces a - // duplicate-ID error during `clipParquetGroupFields`; we follow a similar shape but - // raise it lazily during `rewrite` if a duplicate is actually referenced). + // Pre-build id -> first matching logical field. Spark surfaces a duplicate-ID error + // during `clipParquetGroupFields`; we follow the same first-wins shape and would raise + // a similar error lazily during `rewrite` if a duplicate is referenced. let id_to_logical: HashMap = if should_match_by_id { let mut map = HashMap::new(); for lf in logical_schema.fields() { @@ -131,9 +128,8 @@ fn remap_physical_schema( } } - // Case-insensitive name match. Skip logical fields that have IDs when the - // schema is ID-bearing — Spark's `matchIdField` does not fall through to a - // name match for ID-bearing logical fields. + // Name match. Spark's `matchIdField` does not fall through to a name match for + // ID-bearing logical fields, so skip those when the schema is ID-bearing. if !case_sensitive { let logical_field = logical_schema.fields().iter().find(|lf| { let lf_has_id = should_match_by_id && parse_field_id(lf).is_some(); diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index eef3f35a8c..1f3227f248 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.types._ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions._ +import org.apache.comet.parquet.CometParquetUtils import org.apache.comet.serde.ExprOuterClass.{AggExpr, Expr, ScalarFunc} import org.apache.comet.serde.Types.{DataType => ProtoDataType} import org.apache.comet.serde.Types.DataType._ @@ -459,13 +460,12 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { if (ParquetUtils.hasFieldId(f)) Some(ParquetUtils.getFieldId(f)) else None } if (fieldIds.exists(_.isDefined)) { + // Emit one FieldMetadata entry per nested field, parallel to field_names. Entries + // for fields without an ID are empty so the slot index stays aligned. fieldIds.foreach { idOpt => val metaBuilder = Types.DataType.FieldMetadata.newBuilder() - // Use arrow-rs's metadata key (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). - // Spark's local key is `parquet.field.id`; we translate at the proto boundary so - // the native side matches the same key it gets from arrow-rs. idOpt.foreach { id => - metaBuilder.putMetadata("PARQUET:field_id", id.toString) + metaBuilder.putMetadata(CometParquetUtils.PARQUET_FIELD_ID_META_KEY, id.toString) } struct.addFieldMetadata(metaBuilder.build()) } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala index 35cdce8e5d..4bdfc58046 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala @@ -24,16 +24,11 @@ import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.comet.parquet.CometParquetUtils import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} package object operator { - // Metadata key arrow-rs writes when it lifts Parquet field IDs into the Arrow schema - // (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). Spark's local key is `parquet.field.id` - // (`ParquetUtils.FIELD_ID_METADATA_KEY`); we translate at the proto boundary so the native - // side matches the same key it gets from arrow-rs. - private val PARQUET_FIELD_ID_META_KEY = "PARQUET:field_id" - def schema2Proto(fields: Array[StructField]): Array[OperatorOuterClass.SparkStructField] = { val fieldBuilder = OperatorOuterClass.SparkStructField.newBuilder() fields.map { field => @@ -43,7 +38,7 @@ package object operator { fieldBuilder.clearMetadata() if (ParquetUtils.hasFieldId(field)) { fieldBuilder.putMetadata( - PARQUET_FIELD_ID_META_KEY, + CometParquetUtils.PARQUET_FIELD_ID_META_KEY, ParquetUtils.getFieldId(field).toString) } fieldBuilder.build() From 9214d7b97155058af7ded3a17066ed2509a33fae Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 4 May 2026 17:50:40 -0400 Subject: [PATCH 3/7] Respect ignoreMissing and generate correct errors. --- native/common/src/error.rs | 41 +++++++++ native/core/src/execution/planner.rs | 1 + native/core/src/parquet/mod.rs | 1 + native/core/src/parquet/parquet_exec.rs | 2 + native/core/src/parquet/parquet_support.rs | 6 ++ native/core/src/parquet/schema_adapter.rs | 88 +++++++++++++++++-- .../comet/shims/ShimSparkErrorConverter.scala | 20 +++++ .../comet/shims/ShimSparkErrorConverter.scala | 20 +++++ .../comet/shims/ShimSparkErrorConverter.scala | 20 +++++ .../comet/parquet/ParquetReadSuite.scala | 63 +++++++++++++ 10 files changed, 256 insertions(+), 6 deletions(-) diff --git a/native/common/src/error.rs b/native/common/src/error.rs index ff9059571a..e43224cb40 100644 --- a/native/common/src/error.rs +++ b/native/common/src/error.rs @@ -187,6 +187,21 @@ pub enum SparkError { matched_fields: String, }, + /// Multiple Parquet fields share the same field id when the read schema requested an + /// id-based lookup. Mirrors Spark's `_LEGACY_ERROR_TEMP_2094` + /// (`foundDuplicateFieldInFieldIdLookupModeError`). + #[error("[_LEGACY_ERROR_TEMP_2094] Found duplicate field(s) by id: id={required_id} matches [{matched_fields}] in id-lookup mode")] + DuplicateFieldByFieldId { + required_id: i32, + matched_fields: String, + }, + + /// The read schema requests Parquet field-id matching but the file carries no field ids. + /// Mirrors the runtime error raised in Spark's `ParquetReadSupport` when + /// `spark.sql.parquet.fieldId.read.ignoreMissing` is false. + #[error("Spark read schema expects field Ids, but Parquet file schema doesn't contain any field Ids. Please remove the field ids from Spark schema or ignore missing ids by setting `spark.sql.parquet.fieldId.read.ignoreMissing = true`")] + ParquetMissingFieldIds, + /// Schema mismatch when reading a Parquet column under a requested schema /// that's incompatible with the physical column type. Translated by the JVM /// shim into Spark's `SchemaColumnConvertNotSupportedException`. The @@ -273,6 +288,8 @@ impl SparkError { SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows", SparkError::FileNotFound { .. } => "FileNotFound", SparkError::DuplicateFieldCaseInsensitive { .. } => "DuplicateFieldCaseInsensitive", + SparkError::DuplicateFieldByFieldId { .. } => "DuplicateFieldByFieldId", + SparkError::ParquetMissingFieldIds => "ParquetMissingFieldIds", SparkError::ParquetSchemaConvert { .. } => "ParquetSchemaConvert", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", @@ -489,6 +506,15 @@ impl SparkError { "matchedOrcFields": matched_fields, }) } + SparkError::DuplicateFieldByFieldId { + required_id, + matched_fields, + } => { + serde_json::json!({ + "requiredId": required_id, + "matchedFields": matched_fields, + }) + } SparkError::ParquetSchemaConvert { file_path, column, @@ -577,6 +603,15 @@ impl SparkError { "org/apache/spark/SparkRuntimeException" } + // DuplicateFieldByFieldId - converted to SparkRuntimeException by the shim + // (Spark's `foundDuplicateFieldInFieldIdLookupModeError` returns SparkRuntimeException) + SparkError::DuplicateFieldByFieldId { .. } => "org/apache/spark/SparkRuntimeException", + + // ParquetMissingFieldIds - converted to a plain RuntimeException by the shim, + // matching the `RuntimeException` Spark's ParquetReadSupport throws when the + // file lacks field ids and `spark.sql.parquet.fieldId.read.ignoreMissing=false`. + SparkError::ParquetMissingFieldIds => "java/lang/RuntimeException", + // ParquetSchemaConvert - converted to SchemaColumnConvertNotSupportedException by the shim SparkError::ParquetSchemaConvert { .. } => { "org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException" @@ -661,6 +696,12 @@ impl SparkError { // Duplicate field in case-insensitive mode SparkError::DuplicateFieldCaseInsensitive { .. } => Some("_LEGACY_ERROR_TEMP_2093"), + // Duplicate field id in id-lookup mode + SparkError::DuplicateFieldByFieldId { .. } => Some("_LEGACY_ERROR_TEMP_2094"), + + // ParquetMissingFieldIds is a plain RuntimeException with no error class. + SparkError::ParquetMissingFieldIds => None, + // Parquet schema mismatch — translated to SchemaColumnConvertNotSupportedException // by the JVM shim. The shim wraps it in the version-appropriate // SparkException error class, so no error class is exposed here. diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 609f593897..d635582639 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1326,6 +1326,7 @@ impl PhysicalPlanner { self.session_ctx(), common.encryption_enabled, common.use_field_id, + common.ignore_missing_field_id, )?; Ok(( vec![], diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 52d602e72a..e73bd607ce 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -516,6 +516,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat // The iceberg-compat path resolves IDs in the JVM via NativeBatchReader, // so the native side does not need to do field-ID matching here. false, + false, )?; let partition_index: usize = 0; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index f3057aecf7..1a92d1a88e 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -74,6 +74,7 @@ pub(crate) fn init_datasource_exec( session_ctx: &Arc, encryption_enabled: bool, use_field_id: bool, + ignore_missing_field_id: bool, ) -> Result, ExecutionError> { let (table_parquet_options, mut spark_parquet_options) = get_options( session_timezone, @@ -82,6 +83,7 @@ pub(crate) fn init_datasource_exec( encryption_enabled, ); spark_parquet_options.use_field_id = use_field_id; + spark_parquet_options.ignore_missing_field_id = ignore_missing_field_id; // Determine the schema and projection to use for ParquetSource. // When data_schema is provided, use it as the base schema so DataFusion knows the full diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 27d72199a4..5f96895ba7 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -84,6 +84,10 @@ pub struct SparkParquetOptions { /// (mirrors Spark's `spark.sql.parquet.fieldId.read.enabled`). Only takes effect /// when both physical and logical fields actually carry IDs. pub use_field_id: bool, + /// When false (Spark's default), reading a file that has no field ids while the + /// requested schema does carry ids raises a runtime error rather than silently + /// producing nulls (mirrors `spark.sql.parquet.fieldId.read.ignoreMissing`). + pub ignore_missing_field_id: bool, } impl SparkParquetOptions { @@ -97,6 +101,7 @@ impl SparkParquetOptions { use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, use_field_id: false, + ignore_missing_field_id: false, } } @@ -110,6 +115,7 @@ impl SparkParquetOptions { use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, use_field_id: false, + ignore_missing_field_id: false, } } } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 7f31def0c5..40cea642ac 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -85,12 +85,48 @@ fn remap_physical_schema( physical_schema: &SchemaRef, case_sensitive: bool, use_field_id: bool, -) -> (SchemaRef, HashMap) { + ignore_missing_field_id: bool, +) -> DataFusionResult<(SchemaRef, HashMap)> { let should_match_by_id = use_field_id && schema_has_field_ids(logical_schema); - // Pre-build id -> first matching logical field. Spark surfaces a duplicate-ID error - // during `clipParquetGroupFields`; we follow the same first-wins shape and would raise - // a similar error lazily during `rewrite` if a duplicate is referenced. + if should_match_by_id && !ignore_missing_field_id && !schema_has_field_ids(physical_schema) { + // Mirrors `ParquetReadSupport.inferSchema`'s eager check (Spark throws a runtime + // error rather than silently returning null columns). + return Err(DataFusionError::External(Box::new( + SparkError::ParquetMissingFieldIds, + ))); + } + + // Build id -> all matching physical field names. We need the full list so we can mirror + // Spark's `_LEGACY_ERROR_TEMP_2094` "Found duplicate field(s)" error when an ID-bearing + // logical field would resolve to more than one physical field. + let mut id_to_phys_names: HashMap> = HashMap::new(); + if should_match_by_id { + for pf in physical_schema.fields() { + if let Some(id) = parse_field_id(pf) { + id_to_phys_names + .entry(id) + .or_default() + .push(pf.name().clone()); + } + } + for lf in logical_schema.fields() { + if let Some(id) = parse_field_id(lf) { + if let Some(matches) = id_to_phys_names.get(&id) { + if matches.len() > 1 { + return Err(DataFusionError::External(Box::new( + SparkError::DuplicateFieldByFieldId { + required_id: id, + matched_fields: matches.join(", "), + }, + ))); + } + } + } + } + } + + // Pre-build id -> first matching logical field for the per-physical rename pass below. let id_to_logical: HashMap = if should_match_by_id { let mut map = HashMap::new(); for lf in logical_schema.fields() { @@ -103,6 +139,30 @@ fn remap_physical_schema( HashMap::new() }; + // Names of ID-bearing logical fields whose ID is not present in the file. Any physical + // field that shares one of these names must be renamed to something the + // `DefaultPhysicalExprAdapter` cannot name-match, otherwise the read would silently fall + // through to a name match. Spark's `matchIdField` solves the same problem with + // `generateFakeColumnName` (see `ParquetReadSupport.scala`). + let unmatched_id_logical_names: std::collections::HashSet = if should_match_by_id { + logical_schema + .fields() + .iter() + .filter_map(|lf| { + parse_field_id(lf).and_then(|id| { + if id_to_phys_names.contains_key(&id) { + None + } else { + Some(lf.name().clone()) + } + }) + }) + .collect() + } else { + std::collections::HashSet::new() + }; + let mut fake_counter: usize = 0; + let mut name_map: HashMap = HashMap::new(); let remapped_fields: Vec = physical_schema .fields() @@ -128,6 +188,21 @@ fn remap_physical_schema( } } + // Block accidental name match for ID-bearing logical fields whose ID is missing + // from the file. Mirrors Spark's `generateFakeColumnName` in `matchIdField`. + if should_match_by_id + && unmatched_id_logical_names + .iter() + .any(|name| name.eq_ignore_ascii_case(field.name())) + { + fake_counter += 1; + let fake_name = format!("__comet_unmatched_field_id_{}", fake_counter); + return Arc::new( + Field::new(fake_name, field.data_type().clone(), field.is_nullable()) + .with_metadata(field.metadata().clone()), + ); + } + // Name match. Spark's `matchIdField` does not fall through to a name match for // ID-bearing logical fields, so skip those when the schema is ID-bearing. if !case_sensitive { @@ -154,7 +229,7 @@ fn remap_physical_schema( }) .collect(); - (Arc::new(Schema::new(remapped_fields)), name_map) + Ok((Arc::new(Schema::new(remapped_fields)), name_map)) } /// Check if a specific column name has duplicate matches in the physical schema @@ -200,7 +275,8 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { &physical_file_schema, self.parquet_options.case_sensitive, self.parquet_options.use_field_id, - ); + self.parquet_options.ignore_missing_field_id, + )?; ( remapped, if logical_to_physical.is_empty() { diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 9810b24be1..14062778d7 100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -293,6 +293,26 @@ trait ShimSparkErrorConverter { params("requiredFieldName").toString, params("matchedOrcFields").toString)) + case "DuplicateFieldByFieldId" => + // Mirror Spark's `ParquetReadSupport.matchIdField` which calls + // `foundDuplicateFieldInFieldIdLookupModeError` when more than one Parquet field + // shares an id requested by the read schema. + Some( + QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( + params("requiredId").toString.toInt, + params("matchedFields").toString)) + + case "ParquetMissingFieldIds" => + // Mirror Spark's `ParquetReadSupport.inferSchema`, which throws a plain + // `RuntimeException` (not a SparkException) when the read schema requests field + // ids and the file carries none. + Some( + new RuntimeException( + "Spark read schema expects field Ids, but Parquet file schema doesn't " + + "contain any field Ids. Please remove the field ids from Spark schema or " + + "ignore missing ids by setting " + + "`spark.sql.parquet.fieldId.read.ignoreMissing = true`")) + case "ParquetSchemaConvert" => // Mirror Spark's FileScanRDD: wrap the SchemaColumnConvertNotSupportedException // in a SparkException whose message is "Parquet column cannot be converted in diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 6cceb8bfc8..3853064fb2 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -289,6 +289,26 @@ trait ShimSparkErrorConverter { params("requiredFieldName").toString, params("matchedOrcFields").toString)) + case "DuplicateFieldByFieldId" => + // Mirror Spark's `ParquetReadSupport.matchIdField` which calls + // `foundDuplicateFieldInFieldIdLookupModeError` when more than one Parquet field + // shares an id requested by the read schema. + Some( + QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( + params("requiredId").toString.toInt, + params("matchedFields").toString)) + + case "ParquetMissingFieldIds" => + // Mirror Spark's `ParquetReadSupport.inferSchema`, which throws a plain + // `RuntimeException` (not a SparkException) when the read schema requests field + // ids and the file carries none. + Some( + new RuntimeException( + "Spark read schema expects field Ids, but Parquet file schema doesn't " + + "contain any field Ids. Please remove the field ids from Spark schema or " + + "ignore missing ids by setting " + + "`spark.sql.parquet.fieldId.read.ignoreMissing = true`")) + case "ParquetSchemaConvert" => // Mirror Spark's FileScanRDD: wrap the SchemaColumnConvertNotSupportedException // in a SparkException whose message is "Parquet column cannot be converted in diff --git a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index bdb10fa2f2..edda9a53c2 100644 --- a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -295,6 +295,26 @@ trait ShimSparkErrorConverter { params("requiredFieldName").toString, params("matchedOrcFields").toString)) + case "DuplicateFieldByFieldId" => + // Mirror Spark's `ParquetReadSupport.matchIdField` which calls + // `foundDuplicateFieldInFieldIdLookupModeError` when more than one Parquet field + // shares an id requested by the read schema. + Some( + QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( + params("requiredId").toString.toInt, + params("matchedFields").toString)) + + case "ParquetMissingFieldIds" => + // Mirror Spark's `ParquetReadSupport.inferSchema`, which throws a plain + // `RuntimeException` (not a SparkException) when the read schema requests field + // ids and the file carries none. + Some( + new RuntimeException( + "Spark read schema expects field Ids, but Parquet file schema doesn't " + + "contain any field Ids. Please remove the field ids from Spark schema or " + + "ignore missing ids by setting " + + "`spark.sql.parquet.fieldId.read.ignoreMissing = true`")) + case "ParquetSchemaConvert" => // Mirror Spark 4.0's FileDataSourceV2: wrap the // SchemaColumnConvertNotSupportedException in a FAILED_READ_FILE diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index dc9da34c3d..1769a017ce 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1512,6 +1512,69 @@ abstract class ParquetReadSuite extends CometTestBase { } } } + + // Mirrors Spark ParquetFieldIdIOSuite.test("multiple id matches"). The native scan must + // reject reads where the file has more than one column matching a requested field id, the + // same way Spark's `matchIdField` raises foundDuplicateFieldInFieldIdLookupModeError. + test("native_datafusion: duplicate Parquet field ids raise a runtime error") { + val writeSchema = StructType( + Seq( + StructField("a", IntegerType, nullable = true, withId(1)), + StructField("rand1", StringType, nullable = true, withId(2)), + StructField("rand2", StringType, nullable = true, withId(1)))) + val readSchema = StructType(Seq(StructField("a", IntegerType, nullable = true, withId(1)))) + val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr")) + + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + spark + .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + val cause = intercept[SparkException] { + spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() + }.getCause + assert(cause.getMessage.contains("Found duplicate field(s)")) + } + } + } + + // Mirrors Spark ParquetFieldIdIOSuite.test("read parquet file without ids"). The native + // scan must raise when the read schema requests ids but the file has none, and must NULL + // (not error) when `spark.sql.parquet.fieldId.read.ignoreMissing` is true. + test("native_datafusion: missing Parquet field ids respects ignoreMissing") { + val writeSchema = StructType( + Seq( + StructField("a", IntegerType, nullable = true), + StructField("rand1", StringType, nullable = true))) + val readSchema = StructType(Seq(StructField("a", IntegerType, nullable = true, withId(1)))) + val writeData = Seq(Row(100, "text"), Row(200, "more")) + + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + spark + .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + + val cause = intercept[SparkException] { + spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() + }.getCause + assert(cause.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + + withSQLConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key -> "true") { + val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + checkSparkAnswerAndOperator(df) + } + } + } + } } class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { From c20b2fa333278ccde79c2b40a14a1159d00979c6 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 4 May 2026 18:05:11 -0400 Subject: [PATCH 4/7] Fix tests for Spark 4.x. --- .../comet/parquet/ParquetReadSuite.scala | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 1769a017ce..51b96be121 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1534,10 +1534,16 @@ abstract class ParquetReadSuite extends CometTestBase { .write .mode("overwrite") .parquet(dir.getCanonicalPath) - val cause = intercept[SparkException] { - spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() - }.getCause - assert(cause.getMessage.contains("Found duplicate field(s)")) + val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + // Spark 3.x wraps the error in SparkException; Spark 4.x throws SparkRuntimeException + // directly. checkSparkAnswerMaybeThrows asserts both sides throw and gives us the raw + // Throwables to message-match against, version-agnostic. + checkSparkAnswerMaybeThrows(df) match { + case (Some(sparkExc), Some(cometExc)) => + assert(sparkExc.getMessage.contains("Found duplicate field(s)")) + assert(cometExc.getMessage.contains("Found duplicate field(s)")) + case other => fail(s"Expected duplicate-field error from both sides, got $other") + } } } } @@ -1563,14 +1569,19 @@ abstract class ParquetReadSuite extends CometTestBase { .mode("overwrite") .parquet(dir.getCanonicalPath) - val cause = intercept[SparkException] { - spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() - }.getCause - assert(cause.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + checkSparkAnswerMaybeThrows(df) match { + case (Some(sparkExc), Some(cometExc)) => + assert( + sparkExc.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + assert( + cometExc.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + case other => fail(s"Expected missing-field-ids error from both sides, got $other") + } withSQLConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key -> "true") { - val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) - checkSparkAnswerAndOperator(df) + val ignoredDf = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + checkSparkAnswerAndOperator(ignoredDf) } } } From 2a3672b0ea5152bede83407d8c8eff586c8d01c9 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 4 May 2026 18:26:18 -0400 Subject: [PATCH 5/7] Fix tests for Spark 4.x. --- .../comet/parquet/ParquetReadSuite.scala | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 51b96be121..cf1194ab0f 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1352,6 +1352,18 @@ abstract class ParquetReadSuite extends CometTestBase { private def withId(id: Int) = new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build() + // Spark 4.x wraps read errors in FAILED_READ_FILE.NO_HINT, so the underlying message + // is only reachable by walking the cause chain. + private def ˚causeChainContains(t: Throwable, needle: String): Boolean = { + var cur: Throwable = t + while (cur != null) { + val msg = cur.getMessage + if (msg != null && msg.contains(needle)) return true + cur = cur.getCause + } + false + } + // Based on Spark ParquetIOSuite.test("vectorized reader: array of nested struct") test("array of nested struct with and without field id") { val nestedSchema = StructType( @@ -1535,13 +1547,12 @@ abstract class ParquetReadSuite extends CometTestBase { .mode("overwrite") .parquet(dir.getCanonicalPath) val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) - // Spark 3.x wraps the error in SparkException; Spark 4.x throws SparkRuntimeException - // directly. checkSparkAnswerMaybeThrows asserts both sides throw and gives us the raw - // Throwables to message-match against, version-agnostic. + // Spark 3.x wraps the underlying error one level deep in SparkException; Spark 4.x + // wraps it in FAILED_READ_FILE.NO_HINT. Walk the cause chain to stay version-agnostic. checkSparkAnswerMaybeThrows(df) match { case (Some(sparkExc), Some(cometExc)) => - assert(sparkExc.getMessage.contains("Found duplicate field(s)")) - assert(cometExc.getMessage.contains("Found duplicate field(s)")) + assert(causeChainContains(sparkExc, "Found duplicate field(s)")) + assert(causeChainContains(cometExc, "Found duplicate field(s)")) case other => fail(s"Expected duplicate-field error from both sides, got $other") } } @@ -1573,9 +1584,9 @@ abstract class ParquetReadSuite extends CometTestBase { checkSparkAnswerMaybeThrows(df) match { case (Some(sparkExc), Some(cometExc)) => assert( - sparkExc.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + causeChainContains(sparkExc, "Parquet file schema doesn't contain any field Ids")) assert( - cometExc.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + causeChainContains(cometExc, "Parquet file schema doesn't contain any field Ids")) case other => fail(s"Expected missing-field-ids error from both sides, got $other") } From 6f56b0ab598f18c6bf7ab8eb1b3b2bacee8ac1ce Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 4 May 2026 18:44:30 -0400 Subject: [PATCH 6/7] Fix tests for Spark 4.x. --- .../test/scala/org/apache/comet/parquet/ParquetReadSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index cf1194ab0f..6ed7340e3d 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1354,7 +1354,7 @@ abstract class ParquetReadSuite extends CometTestBase { // Spark 4.x wraps read errors in FAILED_READ_FILE.NO_HINT, so the underlying message // is only reachable by walking the cause chain. - private def ˚causeChainContains(t: Throwable, needle: String): Boolean = { + private def causeChainContains(t: Throwable, needle: String): Boolean = { var cur: Throwable = t while (cur != null) { val msg = cur.getMessage From ebd164e30d3ee6b15906d52c7db7efbaef0feb30 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 5 May 2026 10:01:22 -0400 Subject: [PATCH 7/7] Adjust 4.x shim. --- .../comet/shims/ShimSparkErrorConverter.scala | 35 +++--- .../comet/parquet/ParquetReadSuite.scala | 107 ++++++++---------- 2 files changed, 70 insertions(+), 72 deletions(-) diff --git a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index edda9a53c2..851016601b 100644 --- a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -297,23 +297,28 @@ trait ShimSparkErrorConverter { case "DuplicateFieldByFieldId" => // Mirror Spark's `ParquetReadSupport.matchIdField` which calls - // `foundDuplicateFieldInFieldIdLookupModeError` when more than one Parquet field - // shares an id requested by the read schema. - Some( - QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( - params("requiredId").toString.toInt, - params("matchedFields").toString)) + // `foundDuplicateFieldInFieldIdLookupModeError`. Wrap in FAILED_READ_FILE.NO_HINT + // so the outer exception is a SparkException (matches what Spark's `FileScanRDD` + // produces for stock parquet-mr at task boundary on 4.x); keeps the underlying + // SparkRuntimeException as `getCause` so tests that assert a RuntimeException + // with "Found duplicate field(s)" still pass. + val dupCause = QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( + params("requiredId").toString.toInt, + params("matchedFields").toString) + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(dupCause, filePath)) case "ParquetMissingFieldIds" => - // Mirror Spark's `ParquetReadSupport.inferSchema`, which throws a plain - // `RuntimeException` (not a SparkException) when the read schema requests field - // ids and the file carries none. - Some( - new RuntimeException( - "Spark read schema expects field Ids, but Parquet file schema doesn't " + - "contain any field Ids. Please remove the field ids from Spark schema or " + - "ignore missing ids by setting " + - "`spark.sql.parquet.fieldId.read.ignoreMissing = true`")) + // Mirror Spark's `ParquetReadSupport.inferSchema`. Same wrapping rationale as + // `DuplicateFieldByFieldId`: wrap the RuntimeException in FAILED_READ_FILE.NO_HINT + // so the outer is a SparkException. + val missingCause = new RuntimeException( + "Spark read schema expects field Ids, but Parquet file schema doesn't " + + "contain any field Ids. Please remove the field ids from Spark schema or " + + "ignore missing ids by setting " + + "`spark.sql.parquet.fieldId.read.ignoreMissing = true`") + val missingPath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(missingCause, missingPath)) case "ParquetSchemaConvert" => // Mirror Spark 4.0's FileDataSourceV2: wrap the diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 6ed7340e3d..3a634bb2b0 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1352,18 +1352,6 @@ abstract class ParquetReadSuite extends CometTestBase { private def withId(id: Int) = new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build() - // Spark 4.x wraps read errors in FAILED_READ_FILE.NO_HINT, so the underlying message - // is only reachable by walking the cause chain. - private def causeChainContains(t: Throwable, needle: String): Boolean = { - var cur: Throwable = t - while (cur != null) { - val msg = cur.getMessage - if (msg != null && msg.contains(needle)) return true - cur = cur.getCause - } - false - } - // Based on Spark ParquetIOSuite.test("vectorized reader: array of nested struct") test("array of nested struct with and without field id") { val nestedSchema = StructType( @@ -1525,74 +1513,79 @@ abstract class ParquetReadSuite extends CometTestBase { } } - // Mirrors Spark ParquetFieldIdIOSuite.test("multiple id matches"). The native scan must - // reject reads where the file has more than one column matching a requested field id, the - // same way Spark's `matchIdField` raises foundDuplicateFieldInFieldIdLookupModeError. - test("native_datafusion: duplicate Parquet field ids raise a runtime error") { - val writeSchema = StructType( - Seq( - StructField("a", IntegerType, nullable = true, withId(1)), - StructField("rand1", StringType, nullable = true, withId(2)), - StructField("rand2", StringType, nullable = true, withId(1)))) - val readSchema = StructType(Seq(StructField("a", IntegerType, nullable = true, withId(1)))) - val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr")) - + // Verbatim port of Spark `ParquetFieldIdIOSuite.test("multiple id matches")`, pinned to + // `SCAN_NATIVE_DATAFUSION` so the shim error path is exercised on both 3.x and 4.x. + // The stock suite is the CI signal but it requires the Spark test jars and + // `withAllParquetReaders`; keeping a copy here lets us iterate locally. + test("multiple id matches") { withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { withTempPath { dir => + val readSchema = + new StructType() + .add("a", IntegerType, true, withId(1)) + + val writeSchema = + new StructType() + .add("a", IntegerType, true, withId(1)) + .add("rand1", StringType, true, withId(2)) + .add("rand2", StringType, true, withId(1)) + + val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr")) spark .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) .write .mode("overwrite") .parquet(dir.getCanonicalPath) - val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) - // Spark 3.x wraps the underlying error one level deep in SparkException; Spark 4.x - // wraps it in FAILED_READ_FILE.NO_HINT. Walk the cause chain to stay version-agnostic. - checkSparkAnswerMaybeThrows(df) match { - case (Some(sparkExc), Some(cometExc)) => - assert(causeChainContains(sparkExc, "Found duplicate field(s)")) - assert(causeChainContains(cometExc, "Found duplicate field(s)")) - case other => fail(s"Expected duplicate-field error from both sides, got $other") - } + + val cause = intercept[SparkException] { + spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() + }.getCause + assert( + cause.isInstanceOf[RuntimeException] && + cause.getMessage.contains("Found duplicate field(s)")) } } } - // Mirrors Spark ParquetFieldIdIOSuite.test("read parquet file without ids"). The native - // scan must raise when the read schema requests ids but the file has none, and must NULL - // (not error) when `spark.sql.parquet.fieldId.read.ignoreMissing` is true. - test("native_datafusion: missing Parquet field ids respects ignoreMissing") { - val writeSchema = StructType( - Seq( - StructField("a", IntegerType, nullable = true), - StructField("rand1", StringType, nullable = true))) - val readSchema = StructType(Seq(StructField("a", IntegerType, nullable = true, withId(1)))) - val writeData = Seq(Row(100, "text"), Row(200, "more")) - + // Verbatim port of Spark `ParquetFieldIdIOSuite.test("read parquet file without ids")`, + // pinned to `SCAN_NATIVE_DATAFUSION` for the same reason as the duplicate-id test above. + test("read parquet file without ids") { withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { withTempPath { dir => + val readSchema = + new StructType() + .add("a", IntegerType, true, withId(1)) + + val writeSchema = + new StructType() + .add("a", IntegerType, true) + .add("rand1", StringType, true) + .add("rand2", StringType, true) + + val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr")) spark .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) .write .mode("overwrite") .parquet(dir.getCanonicalPath) - val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) - checkSparkAnswerMaybeThrows(df) match { - case (Some(sparkExc), Some(cometExc)) => - assert( - causeChainContains(sparkExc, "Parquet file schema doesn't contain any field Ids")) - assert( - causeChainContains(cometExc, "Parquet file schema doesn't contain any field Ids")) - case other => fail(s"Expected missing-field-ids error from both sides, got $other") - } - - withSQLConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key -> "true") { - val ignoredDf = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) - checkSparkAnswerAndOperator(ignoredDf) + Seq(readSchema, readSchema.add("b", StringType, true)).foreach { schema => + val cause = intercept[SparkException] { + spark.read.schema(schema).parquet(dir.getCanonicalPath).collect() + }.getCause + assert( + cause.isInstanceOf[RuntimeException] && + cause.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + val expectedValues = (1 to schema.length).map(_ => null) + withSQLConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key -> "true") { + checkAnswer( + spark.read.schema(schema).parquet(dir.getCanonicalPath), + Row(expectedValues: _*) :: Row(expectedValues: _*) :: Nil) + } } } }