diff --git a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala index 8bcf99dbd1..241405c0ea 100644 --- a/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala +++ b/common/src/main/scala/org/apache/comet/parquet/CometParquetUtils.scala @@ -29,6 +29,12 @@ object CometParquetUtils { private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled" private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing" + // Field-metadata key arrow-rs writes when it lifts Parquet field IDs into the Arrow schema + // (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). Spark's local key for the same concept is + // `parquet.field.id` (`ParquetUtils.FIELD_ID_METADATA_KEY`). The serde translates at the proto + // boundary so the native side can match the same key it gets from arrow-rs. + val PARQUET_FIELD_ID_META_KEY = "PARQUET:field_id" + // Map of encryption configuration key-value pairs that, if present, are only supported with // these specific values. Generally, these are the default values that won't be present, // but if they are present we want to check them. diff --git a/native/common/src/error.rs b/native/common/src/error.rs index ff9059571a..e43224cb40 100644 --- a/native/common/src/error.rs +++ b/native/common/src/error.rs @@ -187,6 +187,21 @@ pub enum SparkError { matched_fields: String, }, + /// Multiple Parquet fields share the same field id when the read schema requested an + /// id-based lookup. Mirrors Spark's `_LEGACY_ERROR_TEMP_2094` + /// (`foundDuplicateFieldInFieldIdLookupModeError`). + #[error("[_LEGACY_ERROR_TEMP_2094] Found duplicate field(s) by id: id={required_id} matches [{matched_fields}] in id-lookup mode")] + DuplicateFieldByFieldId { + required_id: i32, + matched_fields: String, + }, + + /// The read schema requests Parquet field-id matching but the file carries no field ids. + /// Mirrors the runtime error raised in Spark's `ParquetReadSupport` when + /// `spark.sql.parquet.fieldId.read.ignoreMissing` is false. + #[error("Spark read schema expects field Ids, but Parquet file schema doesn't contain any field Ids. Please remove the field ids from Spark schema or ignore missing ids by setting `spark.sql.parquet.fieldId.read.ignoreMissing = true`")] + ParquetMissingFieldIds, + /// Schema mismatch when reading a Parquet column under a requested schema /// that's incompatible with the physical column type. Translated by the JVM /// shim into Spark's `SchemaColumnConvertNotSupportedException`. The @@ -273,6 +288,8 @@ impl SparkError { SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows", SparkError::FileNotFound { .. } => "FileNotFound", SparkError::DuplicateFieldCaseInsensitive { .. } => "DuplicateFieldCaseInsensitive", + SparkError::DuplicateFieldByFieldId { .. } => "DuplicateFieldByFieldId", + SparkError::ParquetMissingFieldIds => "ParquetMissingFieldIds", SparkError::ParquetSchemaConvert { .. 
} => "ParquetSchemaConvert", SparkError::Arrow(_) => "Arrow", SparkError::Internal(_) => "Internal", @@ -489,6 +506,15 @@ impl SparkError { "matchedOrcFields": matched_fields, }) } + SparkError::DuplicateFieldByFieldId { + required_id, + matched_fields, + } => { + serde_json::json!({ + "requiredId": required_id, + "matchedFields": matched_fields, + }) + } SparkError::ParquetSchemaConvert { file_path, column, @@ -577,6 +603,15 @@ impl SparkError { "org/apache/spark/SparkRuntimeException" } + // DuplicateFieldByFieldId - converted to SparkRuntimeException by the shim + // (Spark's `foundDuplicateFieldInFieldIdLookupModeError` returns SparkRuntimeException) + SparkError::DuplicateFieldByFieldId { .. } => "org/apache/spark/SparkRuntimeException", + + // ParquetMissingFieldIds - converted to a plain RuntimeException by the shim, + // matching the `RuntimeException` Spark's ParquetReadSupport throws when the + // file lacks field ids and `spark.sql.parquet.fieldId.read.ignoreMissing=false`. + SparkError::ParquetMissingFieldIds => "java/lang/RuntimeException", + // ParquetSchemaConvert - converted to SchemaColumnConvertNotSupportedException by the shim SparkError::ParquetSchemaConvert { .. } => { "org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException" @@ -661,6 +696,12 @@ impl SparkError { // Duplicate field in case-insensitive mode SparkError::DuplicateFieldCaseInsensitive { .. } => Some("_LEGACY_ERROR_TEMP_2093"), + // Duplicate field id in id-lookup mode + SparkError::DuplicateFieldByFieldId { .. } => Some("_LEGACY_ERROR_TEMP_2094"), + + // ParquetMissingFieldIds is a plain RuntimeException with no error class. + SparkError::ParquetMissingFieldIds => None, + // Parquet schema mismatch — translated to SchemaColumnConvertNotSupportedException // by the JVM shim. The shim wraps it in the version-appropriate // SparkException error class, so no error class is exposed here. diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 844cc07c69..872a62cb80 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1326,6 +1326,8 @@ impl PhysicalPlanner { common.return_null_struct_if_all_fields_missing, self.session_ctx(), common.encryption_enabled, + common.use_field_id, + common.ignore_missing_field_id, )?; Ok(( vec![], @@ -3018,11 +3020,16 @@ fn convert_spark_types_to_arrow_schema( let arrow_fields = spark_types .iter() .map(|spark_type| { - Field::new( + let field = Field::new( String::clone(&spark_type.name), to_arrow_datatype(spark_type.data_type.as_ref().unwrap()), spark_type.nullable, - ) + ); + if spark_type.metadata.is_empty() { + field + } else { + field.with_metadata(spark_type.metadata.clone()) + } }) .collect_vec(); let arrow_schema: SchemaRef = Arc::new(Schema::new(arrow_fields)); diff --git a/native/core/src/execution/serde.rs b/native/core/src/execution/serde.rs index ae0554ee76..5d60288f68 100644 --- a/native/core/src/execution/serde.rs +++ b/native/core/src/execution/serde.rs @@ -157,11 +157,19 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType { .iter() .enumerate() .map(|(idx, name)| { - Field::new( + let field = Field::new( name, to_arrow_datatype(&info.field_datatypes[idx]), info.field_nullable[idx], - ) + ); + // Attach Spark field metadata (currently parquet.field.id) when present. + // field_metadata is parallel to field_names; either empty or full length. 
+ if let Some(meta) = info.field_metadata.get(idx) { + if !meta.metadata.is_empty() { + return field.with_metadata(meta.metadata.clone()); + } + } + field }) .collect(); ArrowDataType::Struct(fields) diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index 3d61251447..5de14aa610 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -515,6 +515,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat return_null_struct_if_all_fields_missing != JNI_FALSE, session_ctx, encryption_enabled, + // The iceberg-compat path resolves IDs in the JVM via NativeBatchReader, + // so the native side does not need to do field-ID matching here. + false, + false, )?; let partition_index: usize = 0; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index e67700e629..29b792e72d 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -74,14 +74,18 @@ pub(crate) fn init_datasource_exec( return_null_struct_if_all_fields_missing: bool, session_ctx: &Arc, encryption_enabled: bool, + use_field_id: bool, + ignore_missing_field_id: bool, ) -> Result, ExecutionError> { - let (table_parquet_options, spark_parquet_options) = get_options( + let (table_parquet_options, mut spark_parquet_options) = get_options( session_timezone, case_sensitive, return_null_struct_if_all_fields_missing, &object_store_url, encryption_enabled, ); + spark_parquet_options.use_field_id = use_field_id; + spark_parquet_options.ignore_missing_field_id = ignore_missing_field_id; // Determine the schema and projection to use for ParquetSource. // When data_schema is provided, use it as the base schema so DataFusion knows the full diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 1e0c64ea4b..4a48aaca28 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -38,6 +38,7 @@ use datafusion_comet_spark_expr::EvalMode; use log::debug; use object_store::path::Path; use object_store::{parse_url, ObjectStore}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use std::collections::HashMap; use std::sync::OnceLock; use std::time::Duration; @@ -84,6 +85,14 @@ pub struct SparkParquetOptions { /// legacy behavior); false preserves the parent struct's nullness from the file /// so non-null parents return a struct of all-null fields. pub return_null_struct_if_all_fields_missing: bool, + /// When true, resolve fields by parquet.field.id metadata instead of name + /// (mirrors Spark's `spark.sql.parquet.fieldId.read.enabled`). Only takes effect + /// when both physical and logical fields actually carry IDs. + pub use_field_id: bool, + /// When false (Spark's default), reading a file that has no field ids while the + /// requested schema does carry ids raises a runtime error rather than silently + /// producing nulls (mirrors `spark.sql.parquet.fieldId.read.ignoreMissing`). 
+ pub ignore_missing_field_id: bool, } impl SparkParquetOptions { @@ -97,6 +106,8 @@ impl SparkParquetOptions { use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, return_null_struct_if_all_fields_missing: true, + use_field_id: false, + ignore_missing_field_id: false, } } @@ -110,6 +121,8 @@ impl SparkParquetOptions { use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, return_null_struct_if_all_fields_missing: true, + use_field_id: false, + ignore_missing_field_id: false, } } } @@ -240,6 +253,14 @@ fn parquet_convert_array( } } +/// Read the Parquet field id stored under arrow-rs's `PARQUET_FIELD_ID_META_KEY`. +fn field_id(field: &arrow::datatypes::Field) -> Option<i32> { + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|v| v.parse::<i32>().ok()) +} + /// Cast between struct types based on logic in /// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`. fn parquet_convert_struct_to_struct( @@ -250,39 +271,60 @@ ) -> DataFusionResult { match (from_type, to_type) { (DataType::Struct(from_fields), DataType::Struct(to_fields)) => { - // if dest and target schemas has any column in common - let mut field_overlap = false; - // TODO some of this logic may be specific to converting Parquet to Spark - let mut field_name_to_index_map = HashMap::new(); - for (i, field) in from_fields.iter().enumerate() { + // Match `from` (file) fields to `to` (logical) fields. Mirrors Spark's + // `clipParquetGroupFields`: when the logical struct carries Parquet field IDs + // anywhere, ID-bearing logical fields match ONLY by ID; non-ID-bearing fields + // fall back to name match. When no logical field carries an ID, fall back to + // name match across the board. + let should_match_by_id = + parquet_options.use_field_id && to_fields.iter().any(|f| field_id(f).is_some()); + + let from_id_to_index: HashMap<i32, usize> = if should_match_by_id { + let mut map = HashMap::new(); + for (i, field) in from_fields.iter().enumerate() { + if let Some(id) = field_id(field) { + map.entry(id).or_insert(i); + } + } + map + } else { + HashMap::new() + }; + + let normalize_name = |name: &str| -> String { if parquet_options.case_sensitive { - field_name_to_index_map.insert(field.name().clone(), i); + name.to_string() } else { - field_name_to_index_map.insert(field.name().to_lowercase(), i); + name.to_lowercase() } + }; + let mut field_name_to_index_map = HashMap::new(); + for (i, field) in from_fields.iter().enumerate() { + field_name_to_index_map.insert(normalize_name(field.name()), i); } assert_eq!(field_name_to_index_map.len(), from_fields.len()); + + let mut field_overlap = false; let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len()); - for i in 0..to_fields.len() { - // Fields in the to_type schema may not exist in the from_type schema - // i.e. the required schema may have fields that the file does not - // have - let key = if parquet_options.case_sensitive { - to_fields[i].name().clone() - } else { - to_fields[i].name().to_lowercase() + for to_field in to_fields.iter() { + let from_index = match (should_match_by_id, field_id(to_field)) { + // Spark treats a missing ID match as a missing column rather than + // falling back to name match.
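A minimal standalone sketch of the ID-lookup behavior just described (it re-creates the `field_id` helper and the id-to-index map on plain Arrow fields; column names and ids are illustrative, not from the repo):

```rust
use std::collections::HashMap;
use arrow::datatypes::{DataType, Field};

// Mirrors the patch's `field_id` helper (same arrow-rs metadata key).
fn field_id(field: &Field) -> Option<i32> {
    field
        .metadata()
        .get("PARQUET:field_id")
        .and_then(|v| v.parse::<i32>().ok())
}

fn with_id(field: Field, id: i32) -> Field {
    let mut m = HashMap::new();
    m.insert("PARQUET:field_id".to_string(), id.to_string());
    field.with_metadata(m)
}

fn main() {
    // File struct has one column named "x" carrying id 1.
    let from_fields = vec![with_id(Field::new("x", DataType::Int32, true), 1)];
    // Logical struct asks for "a" (id 1) and "b" (id 2); id 2 is absent from the file.
    let to_fields = vec![
        with_id(Field::new("a", DataType::Int32, true), 1),
        with_id(Field::new("b", DataType::Utf8, true), 2),
    ];

    // Same id -> index lookup the patch builds for the file side.
    let mut from_id_to_index: HashMap<i32, usize> = HashMap::new();
    for (i, f) in from_fields.iter().enumerate() {
        if let Some(id) = field_id(f) {
            from_id_to_index.entry(id).or_insert(i);
        }
    }

    // "a" (id 1) resolves to file column 0 ("x"); "b" (id 2) has no ID match and is
    // filled with nulls -- there is deliberately no fallback to a name match.
    assert_eq!(field_id(&to_fields[0]).and_then(|id| from_id_to_index.get(&id)), Some(&0));
    assert_eq!(field_id(&to_fields[1]).and_then(|id| from_id_to_index.get(&id)), None);
}
```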
+ (true, Some(id)) => from_id_to_index.get(&id).copied(), + _ => field_name_to_index_map + .get(&normalize_name(to_field.name())) + .copied(), }; - if field_name_to_index_map.contains_key(&key) { - let from_index = field_name_to_index_map[&key]; - let cast_field = parquet_convert_array( + + if let Some(from_index) = from_index { + cast_fields.push(parquet_convert_array( Arc::clone(array.column(from_index)), - to_fields[i].data_type(), + to_field.data_type(), parquet_options, - )?; - cast_fields.push(cast_field); + )?); field_overlap = true; } else { - cast_fields.push(new_null_array(to_fields[i].data_type(), array.len())); + cast_fields.push(new_null_array(to_field.data_type(), array.len())); } } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 4e68902585..40cea642ac 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -17,7 +17,7 @@ use crate::parquet::cast_column::CometCastColumnExpr; use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion::common::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_expr::expressions::Column; @@ -30,6 +30,7 @@ use datafusion_physical_expr_adapter::{ replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, }; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use std::collections::HashMap; use std::sync::Arc; @@ -59,39 +60,176 @@ impl SparkPhysicalExprAdapterFactory { } } -/// Remap physical schema field names to match logical schema field names using -/// case-insensitive matching. This allows the DefaultPhysicalExprAdapter (which -/// uses exact name matching) to correctly find columns when the parquet file has -/// different casing than the table schema (e.g., file has "a" but table has "A"). -fn remap_physical_schema_names( +/// Read the Parquet field id stored under arrow-rs's `PARQUET_FIELD_ID_META_KEY`. +fn parse_field_id(field: &Field) -> Option<i32> { + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|v| v.parse::<i32>().ok()) +} + +fn schema_has_field_ids(schema: &SchemaRef) -> bool { + schema.fields().iter().any(|f| parse_field_id(f).is_some()) +} + +/// Remap physical schema field names to match logical schema field names. Mirrors Spark's +/// `clipParquetGroupFields`: prefer ID match for any logical field that carries a +/// `PARQUET:field_id`, fall back to case-insensitive name match otherwise. +/// +/// The remap only changes top-level field NAMES so that `DefaultPhysicalExprAdapter`'s +/// exact-name lookup hits. Indices, types, nullability, and metadata stay as in the file. +/// Returns the rewritten schema and a `logical_name -> original_physical_name` map used +/// downstream to restore the original physical names before stream consumption.
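A standalone sketch of the rename this adapter performs for a single matched field, using hypothetical column names ("col_1" in the file, "price" in the table); the real implementation follows below:

```rust
use std::collections::HashMap;
use arrow::datatypes::{DataType, Field, Schema};

const ID_KEY: &str = "PARQUET:field_id";

fn tagged(name: &str, dt: DataType, id: i32) -> Field {
    let mut m = HashMap::new();
    m.insert(ID_KEY.to_string(), id.to_string());
    Field::new(name, dt, true).with_metadata(m)
}

fn main() {
    // File column is named "col_1", table column is named "price"; both carry id 5.
    let physical = Schema::new(vec![tagged("col_1", DataType::Float64, 5)]);
    let logical = Schema::new(vec![tagged("price", DataType::Float64, 5)]);

    // Toy single-field version of the rename: take the logical NAME, keep everything
    // else (type, nullability, metadata) from the file, and remember the original name.
    let mut name_map: HashMap<String, String> = HashMap::new();
    let pf = physical.field(0);
    let lf = logical.field(0); // id 5 matches in this toy example
    name_map.insert(lf.name().clone(), pf.name().clone());
    let remapped = Schema::new(vec![Field::new(
        lf.name().clone(),
        pf.data_type().clone(),
        pf.is_nullable(),
    )
    .with_metadata(pf.metadata().clone())]);

    assert_eq!(remapped.field(0).name(), "price");
    assert_eq!(name_map.get("price"), Some(&"col_1".to_string()));
}
```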
+fn remap_physical_schema( logical_schema: &SchemaRef, physical_schema: &SchemaRef, -) -> SchemaRef { - let remapped_fields: Vec<_> = physical_schema + case_sensitive: bool, + use_field_id: bool, + ignore_missing_field_id: bool, +) -> DataFusionResult<(SchemaRef, HashMap<String, String>)> { + let should_match_by_id = use_field_id && schema_has_field_ids(logical_schema); + + if should_match_by_id && !ignore_missing_field_id && !schema_has_field_ids(physical_schema) { + // Mirrors `ParquetReadSupport.inferSchema`'s eager check (Spark throws a runtime + // error rather than silently returning null columns). + return Err(DataFusionError::External(Box::new( + SparkError::ParquetMissingFieldIds, + ))); + } + + // Build id -> all matching physical field names. We need the full list so we can mirror + // Spark's `_LEGACY_ERROR_TEMP_2094` "Found duplicate field(s)" error when an ID-bearing + // logical field would resolve to more than one physical field. + let mut id_to_phys_names: HashMap<i32, Vec<String>> = HashMap::new(); + if should_match_by_id { + for pf in physical_schema.fields() { + if let Some(id) = parse_field_id(pf) { + id_to_phys_names + .entry(id) + .or_default() + .push(pf.name().clone()); + } + } + for lf in logical_schema.fields() { + if let Some(id) = parse_field_id(lf) { + if let Some(matches) = id_to_phys_names.get(&id) { + if matches.len() > 1 { + return Err(DataFusionError::External(Box::new( + SparkError::DuplicateFieldByFieldId { + required_id: id, + matched_fields: matches.join(", "), + }, + ))); + } + } + } + } + } + + // Pre-build id -> first matching logical field for the per-physical rename pass below. + let id_to_logical: HashMap<i32, &FieldRef> = if should_match_by_id { + let mut map = HashMap::new(); + for lf in logical_schema.fields() { + if let Some(id) = parse_field_id(lf) { + map.entry(id).or_insert(lf); + } + } + map + } else { + HashMap::new() + }; + + // Names of ID-bearing logical fields whose ID is not present in the file. Any physical + // field that shares one of these names must be renamed to something the + // `DefaultPhysicalExprAdapter` cannot name-match, otherwise the read would silently fall + // through to a name match. Spark's `matchIdField` solves the same problem with + // `generateFakeColumnName` (see `ParquetReadSupport.scala`). + let unmatched_id_logical_names: std::collections::HashSet<String> = if should_match_by_id { + logical_schema + .fields() + .iter() + .filter_map(|lf| { + parse_field_id(lf).and_then(|id| { + if id_to_phys_names.contains_key(&id) { + None + } else { + Some(lf.name().clone()) + } + }) + }) + .collect() + } else { + std::collections::HashSet::new() + }; + let mut fake_counter: usize = 0; + + let mut name_map: HashMap<String, String> = HashMap::new(); + let remapped_fields: Vec<FieldRef> = physical_schema .fields() .iter() .map(|field| { - if let Some(logical_field) = logical_schema - .fields() - .iter() - .find(|lf| lf.name().eq_ignore_ascii_case(field.name())) + // ID match first when the logical schema is ID-bearing. + if should_match_by_id { + if let Some(phys_id) = parse_field_id(field) { + if let Some(logical_field) = id_to_logical.get(&phys_id) { + if logical_field.name() != field.name() { + name_map.insert(logical_field.name().clone(), field.name().clone()); + return Arc::new( + Field::new( + logical_field.name(), + field.data_type().clone(), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + ); + } + return Arc::clone(field); + } + } + } + + // Block accidental name match for ID-bearing logical fields whose ID is missing + // from the file.
Mirrors Spark's `generateFakeColumnName` in `matchIdField`. + if should_match_by_id + && unmatched_id_logical_names + .iter() + .any(|name| name.eq_ignore_ascii_case(field.name())) { - if logical_field.name() != field.name() { - Arc::new(Field::new( - logical_field.name(), - field.data_type().clone(), - field.is_nullable(), - )) - } else { - Arc::clone(field) + fake_counter += 1; + let fake_name = format!("__comet_unmatched_field_id_{}", fake_counter); + return Arc::new( + Field::new(fake_name, field.data_type().clone(), field.is_nullable()) + .with_metadata(field.metadata().clone()), + ); + } + + // Name match. Spark's `matchIdField` does not fall through to a name match for + // ID-bearing logical fields, so skip those when the schema is ID-bearing. + if !case_sensitive { + let logical_field = logical_schema.fields().iter().find(|lf| { + let lf_has_id = should_match_by_id && parse_field_id(lf).is_some(); + !lf_has_id && lf.name().eq_ignore_ascii_case(field.name()) + }); + if let Some(logical_field) = logical_field { + if logical_field.name() != field.name() { + name_map.insert(logical_field.name().clone(), field.name().clone()); + return Arc::new( + Field::new( + logical_field.name(), + field.data_type().clone(), + field.is_nullable(), + ) + .with_metadata(field.metadata().clone()), + ); + } } - } else { - Arc::clone(field) } + + Arc::clone(field) }) .collect(); - Arc::new(Schema::new(remapped_fields)) + Ok((Arc::new(Schema::new(remapped_fields)), name_map)) } /// Check if a specific column name has duplicate matches in the physical schema @@ -117,34 +255,28 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { logical_file_schema: SchemaRef, physical_file_schema: SchemaRef, ) -> DataFusionResult> { - // When case-insensitive, remap physical schema field names to match logical - // field names. The DefaultPhysicalExprAdapter uses exact name matching, so - // without this remapping, columns like "a" won't match logical "A" and will - // be filled with nulls. + // Remap physical schema field names to match logical names by Parquet field id + // (when the logical schema carries IDs and `use_field_id` is set) and/or by + // case-insensitive name match. The DefaultPhysicalExprAdapter uses exact name + // matching, so without this remapping, columns whose file names differ from the + // logical names won't match and will be filled with NULLs. // - // We also build a reverse map (logical name -> physical name) so that after - // the default adapter produces expressions, we can remap column names back + // We also keep a reverse map (logical name -> original physical name) so that + // after the default adapter produces expressions, we can remap column names back // to the original physical names. This is necessary because downstream code - // (reassign_expr_columns) looks up columns by name in the actual stream - // schema, which uses the original physical file column names. + // (reassign_expr_columns) looks up columns by name in the actual stream schema, + // which uses the original physical file column names. 
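A standalone sketch of the two failure conditions the remap above checks for, rebuilt on plain Arrow types with illustrative column names (the real checks live inside `remap_physical_schema`):

```rust
use std::collections::HashMap;
use arrow::datatypes::{DataType, Field, Schema};

const ID_KEY: &str = "PARQUET:field_id";

fn field_id(f: &Field) -> Option<i32> {
    f.metadata().get(ID_KEY).and_then(|v| v.parse::<i32>().ok())
}

fn tagged(name: &str, id: i32) -> Field {
    let mut m = HashMap::new();
    m.insert(ID_KEY.to_string(), id.to_string());
    Field::new(name, DataType::Int32, true).with_metadata(m)
}

fn main() {
    // Two file columns both claiming field id 1: the condition reported as
    // DuplicateFieldByFieldId (_LEGACY_ERROR_TEMP_2094 on the JVM side).
    let physical = Schema::new(vec![tagged("a", 1), tagged("rand2", 1)]);
    let mut id_to_names: HashMap<i32, Vec<String>> = HashMap::new();
    for f in physical.fields().iter() {
        if let Some(id) = field_id(f) {
            id_to_names.entry(id).or_default().push(f.name().clone());
        }
    }
    assert_eq!(id_to_names[&1].len(), 2); // would raise "Found duplicate field(s)"

    // A file carrying no ids at all while the read schema requests them is the
    // ParquetMissingFieldIds condition (unless ignoreMissing is set).
    let plain = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
    assert!(!plain.fields().iter().any(|f| field_id(f).is_some()));
}
```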
+ let needs_remap = !self.parquet_options.case_sensitive + || (self.parquet_options.use_field_id && schema_has_field_ids(&logical_file_schema)); let (adapted_physical_schema, logical_to_physical_names, original_physical_schema) = - if !self.parquet_options.case_sensitive { - let logical_to_physical: HashMap = logical_file_schema - .fields() - .iter() - .filter_map(|logical_field| { - physical_file_schema - .fields() - .iter() - .find(|pf| { - pf.name().eq_ignore_ascii_case(logical_field.name()) - && pf.name() != logical_field.name() - }) - .map(|pf| (logical_field.name().clone(), pf.name().clone())) - }) - .collect(); - let remapped = - remap_physical_schema_names(&logical_file_schema, &physical_file_schema); + if needs_remap { + let (remapped, logical_to_physical) = remap_physical_schema( + &logical_file_schema, + &physical_file_schema, + self.parquet_options.case_sensitive, + self.parquet_options.use_field_id, + self.parquet_options.ignore_missing_field_id, + )?; ( remapped, if logical_to_physical.is_empty() { @@ -152,8 +284,13 @@ impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory { } else { Some(logical_to_physical) }, - // Keep original physical schema for per-column duplicate detection - Some(Arc::clone(&physical_file_schema)), + // Keep original physical schema for per-column duplicate detection. + // Only meaningful in case-insensitive mode (matches existing behavior). + if !self.parquet_options.case_sensitive { + Some(Arc::clone(&physical_file_schema)) + } else { + None + }, ) } else { (Arc::clone(&physical_file_schema), None, None) diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 9afb26470c..7cefe06da7 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -74,6 +74,9 @@ message SparkStructField { string name = 1; spark.spark_expression.DataType data_type = 2; bool nullable = 3; + // Spark field metadata. Currently used to carry parquet.field.id through + // to the native side. Empty when not needed. + map metadata = 4; } message Scan { @@ -112,6 +115,12 @@ message NativeScanCommon { // pre-4.1 behavior); false preserves the parent struct's nullness from the file // so non-null parents return a struct of all-null fields. bool return_null_struct_if_all_fields_missing = 14; + // True when spark.sql.parquet.fieldId.read.enabled is set and the requested + // schema actually carries parquet.field.id metadata. When false the native + // scan keeps its existing name-based path with no extra work. + bool use_field_id = 15; + // True when spark.sql.parquet.fieldId.read.ignoreMissing is set. + bool ignore_missing_field_id = 16; } message NativeScan { diff --git a/native/proto/src/proto/types.proto b/native/proto/src/proto/types.proto index 2fd3d59a73..fec972a8f0 100644 --- a/native/proto/src/proto/types.proto +++ b/native/proto/src/proto/types.proto @@ -91,6 +91,14 @@ message DataType { repeated string field_names = 1; repeated DataType field_datatypes = 2; repeated bool field_nullable = 3; + // Parallel to field_names. Each entry holds Spark field metadata for the + // corresponding nested field. Currently used to carry parquet.field.id + // through to the native side. Empty when no fields carry metadata. 
+ repeated FieldMetadata field_metadata = 4; + } + + message FieldMetadata { + map metadata = 1; } DataTypeInfo type_info = 2; diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 1c9ec98a7a..aee11d4ce4 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefa import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SparkPlan, SubqueryAdaptiveBroadcastExec} import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf @@ -237,11 +236,6 @@ case class CometScanRule(session: SparkSession) withInfo(scanExec, "Native DataFusion scan does not support row index generation") return None } - if (session.sessionState.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) && - ParquetUtils.hasFieldIds(scanExec.requiredSchema)) { - withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID matching") - return None - } if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) { return None } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e73cf12f79..e4eae767d8 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -29,12 +29,14 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.comet.DecimalPrecision import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan} +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions._ +import org.apache.comet.parquet.CometParquetUtils import org.apache.comet.serde.ExprOuterClass.{AggExpr, Expr, ScalarFunc} import org.apache.comet.serde.Types.{DataType => ProtoDataType} import org.apache.comet.serde.Types.DataType._ @@ -454,6 +456,21 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim { struct.addAllFieldDatatypes(fieldDatatypes.map(_.get).asJava) struct.addAllFieldNullable(fieldNullable) + val fieldIds = s.fields.map { f => + if (ParquetUtils.hasFieldId(f)) Some(ParquetUtils.getFieldId(f)) else None + } + if (fieldIds.exists(_.isDefined)) { + // Emit one FieldMetadata entry per nested field, parallel to field_names. Entries + // for fields without an ID are empty so the slot index stays aligned. 
+ fieldIds.foreach { idOpt => + val metaBuilder = Types.DataType.FieldMetadata.newBuilder() + idOpt.foreach { id => + metaBuilder.putMetadata(CometParquetUtils.PARQUET_FIELD_ID_META_KEY, id.toString) + } + struct.addFieldMetadata(metaBuilder.build()) + } + } + info.setStruct(struct) builder.setTypeInfo(info.build()).build() case _ => builder.build() diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index 066b770bbb..4c5c8e8fcc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet.{CometNativeExec, CometNativeScanExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, InSubqueryExec, SubqueryAdaptiveBroadcastExec} +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf import org.apache.comet.{CometConf, ConfigEntry} @@ -200,6 +201,16 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { commonBuilder.setReturnNullStructIfAllFieldsMissing( scan.conf.getConfString(returnNullStructConfKey, returnNullStructDefault).toBoolean) + // Field-ID matching: only ask the native side to do extra work when the conf is on AND + // the requested schema actually carries IDs. Spark's ParquetReadSupport applies the same + // gate before invoking matchIdField. + val useFieldId = + scan.conf.getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED) && + ParquetUtils.hasFieldIds(scan.requiredSchema) + commonBuilder.setUseFieldId(useFieldId) + commonBuilder.setIgnoreMissingFieldId( + scan.conf.getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)) + // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState .newHadoopConfWithOptions(scan.relation.options) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala index 7b811d09e7..4bdfc58046 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/package.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/package.scala @@ -21,8 +21,10 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.execution.datasources.FilePartition +import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.comet.parquet.CometParquetUtils import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} package object operator { @@ -33,6 +35,12 @@ package object operator { fieldBuilder.setName(field.name) fieldBuilder.setDataType(serializeDataType(field.dataType).get) fieldBuilder.setNullable(field.nullable) + fieldBuilder.clearMetadata() + if (ParquetUtils.hasFieldId(field)) { + fieldBuilder.putMetadata( + CometParquetUtils.PARQUET_FIELD_ID_META_KEY, + ParquetUtils.getFieldId(field).toString) + } fieldBuilder.build() } } diff --git a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 9810b24be1..14062778d7 
100644 --- a/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.4/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -293,6 +293,26 @@ trait ShimSparkErrorConverter { params("requiredFieldName").toString, params("matchedOrcFields").toString)) + case "DuplicateFieldByFieldId" => + // Mirror Spark's `ParquetReadSupport.matchIdField` which calls + // `foundDuplicateFieldInFieldIdLookupModeError` when more than one Parquet field + // shares an id requested by the read schema. + Some( + QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( + params("requiredId").toString.toInt, + params("matchedFields").toString)) + + case "ParquetMissingFieldIds" => + // Mirror Spark's `ParquetReadSupport.inferSchema`, which throws a plain + // `RuntimeException` (not a SparkException) when the read schema requests field + // ids and the file carries none. + Some( + new RuntimeException( + "Spark read schema expects field Ids, but Parquet file schema doesn't " + + "contain any field Ids. Please remove the field ids from Spark schema or " + + "ignore missing ids by setting " + + "`spark.sql.parquet.fieldId.read.ignoreMissing = true`")) + case "ParquetSchemaConvert" => // Mirror Spark's FileScanRDD: wrap the SchemaColumnConvertNotSupportedException // in a SparkException whose message is "Parquet column cannot be converted in diff --git a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index 6cceb8bfc8..3853064fb2 100644 --- a/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-3.5/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -289,6 +289,26 @@ trait ShimSparkErrorConverter { params("requiredFieldName").toString, params("matchedOrcFields").toString)) + case "DuplicateFieldByFieldId" => + // Mirror Spark's `ParquetReadSupport.matchIdField` which calls + // `foundDuplicateFieldInFieldIdLookupModeError` when more than one Parquet field + // shares an id requested by the read schema. + Some( + QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( + params("requiredId").toString.toInt, + params("matchedFields").toString)) + + case "ParquetMissingFieldIds" => + // Mirror Spark's `ParquetReadSupport.inferSchema`, which throws a plain + // `RuntimeException` (not a SparkException) when the read schema requests field + // ids and the file carries none. + Some( + new RuntimeException( + "Spark read schema expects field Ids, but Parquet file schema doesn't " + + "contain any field Ids. 
Please remove the field ids from Spark schema or " + + "ignore missing ids by setting " + + "`spark.sql.parquet.fieldId.read.ignoreMissing = true`")) + case "ParquetSchemaConvert" => // Mirror Spark's FileScanRDD: wrap the SchemaColumnConvertNotSupportedException // in a SparkException whose message is "Parquet column cannot be converted in diff --git a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala index bdb10fa2f2..851016601b 100644 --- a/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala +++ b/spark/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimSparkErrorConverter.scala @@ -295,6 +295,31 @@ trait ShimSparkErrorConverter { params("requiredFieldName").toString, params("matchedOrcFields").toString)) + case "DuplicateFieldByFieldId" => + // Mirror Spark's `ParquetReadSupport.matchIdField` which calls + // `foundDuplicateFieldInFieldIdLookupModeError`. Wrap in FAILED_READ_FILE.NO_HINT + // so the outer exception is a SparkException (matches what Spark's `FileScanRDD` + // produces for stock parquet-mr at task boundary on 4.x); keeps the underlying + // SparkRuntimeException as `getCause` so tests that assert a RuntimeException + // with "Found duplicate field(s)" still pass. + val dupCause = QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError( + params("requiredId").toString.toInt, + params("matchedFields").toString) + val filePath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(dupCause, filePath)) + + case "ParquetMissingFieldIds" => + // Mirror Spark's `ParquetReadSupport.inferSchema`. Same wrapping rationale as + // `DuplicateFieldByFieldId`: wrap the RuntimeException in FAILED_READ_FILE.NO_HINT + // so the outer is a SparkException. + val missingCause = new RuntimeException( + "Spark read schema expects field Ids, but Parquet file schema doesn't " + + "contain any field Ids. Please remove the field ids from Spark schema or " + + "ignore missing ids by setting " + + "`spark.sql.parquet.fieldId.read.ignoreMissing = true`") + val missingPath = params.get("filePath").map(_.toString).getOrElse("") + Some(QueryExecutionErrors.cannotReadFilesError(missingCause, missingPath)) + case "ParquetSchemaConvert" => // Mirror Spark 4.0's FileDataSourceV2: wrap the // SchemaColumnConvertNotSupportedException in a FAILED_READ_FILE diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 922a4255c4..3a634bb2b0 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1419,6 +1419,177 @@ abstract class ParquetReadSuite extends CometTestBase { } } } + + // Based on Spark ParquetFieldIdIOSuite.test("Parquet reads infer fields using field ids + // correctly"). Forces SCAN_NATIVE_DATAFUSION so we can prove that the gate in CometScanRule + // is removed and that the native_datafusion scan resolves columns by field id rather than by + // name (the read schema names differ from what is in the file). 
+ test("native_datafusion: read by Parquet field id when names differ") { + val writeSchema = StructType( + Seq( + StructField("random", IntegerType, nullable = true, withId(1)), + StructField("name", StringType, nullable = true, withId(0)))) + val readSchema = StructType( + Seq( + StructField("a", StringType, nullable = true, withId(0)), + StructField("b", IntegerType, nullable = true, withId(1)))) + val writeData = Seq(Row(100, "text"), Row(200, "more")) + + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + spark + .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + checkSparkAnswerAndOperator(df) + } + } + } + + // Based on Spark ParquetFieldIdIOSuite.test("SPARK-38094: absence of field ids: reading nested + // schema"). Exercises ID matching at every nesting level (struct, array, map) under + // SCAN_NATIVE_DATAFUSION. Names differ from the file at every level. + test("native_datafusion: read nested types by Parquet field id when names differ") { + val writeSchema = StructType( + Seq(StructField( + "outer", + StructType(Seq( + StructField("inner_a", IntegerType, nullable = true, withId(11)), + StructField( + "inner_arr", + ArrayType(StructType(Seq( + StructField("ea", StringType, nullable = true, withId(21)), + StructField("eb", IntegerType, nullable = true, withId(22))))), + nullable = true, + withId(12)), + StructField( + "inner_map", + MapType(StringType, IntegerType, valueContainsNull = true), + nullable = true, + withId(13)))), + nullable = true, + withId(1)))) + + val readSchema = StructType( + Seq(StructField( + "renamed_outer", + StructType(Seq( + StructField("renamed_a", IntegerType, nullable = true, withId(11)), + StructField( + "renamed_arr", + ArrayType(StructType(Seq( + StructField("renamed_ea", StringType, nullable = true, withId(21)), + StructField("renamed_eb", IntegerType, nullable = true, withId(22))))), + nullable = true, + withId(12)), + StructField( + "renamed_map", + MapType(StringType, IntegerType, valueContainsNull = true), + nullable = true, + withId(13)))), + nullable = true, + withId(1)))) + + val data = Seq( + Row(Row(1, Seq(Row("x", 10), Row("y", 20)), Map("k1" -> 100))), + Row(Row(2, Seq(Row("z", 30)), Map("k2" -> 200, "k3" -> 300)))) + + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + spark + .createDataFrame(spark.sparkContext.parallelize(data), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + val df = spark.read.schema(readSchema).parquet(dir.getCanonicalPath) + checkSparkAnswerAndOperator(df) + } + } + } + + // Verbatim port of Spark `ParquetFieldIdIOSuite.test("multiple id matches")`, pinned to + // `SCAN_NATIVE_DATAFUSION` so the shim error path is exercised on both 3.x and 4.x. + // The stock suite is the CI signal but it requires the Spark test jars and + // `withAllParquetReaders`; keeping a copy here lets us iterate locally. 
+ test("multiple id matches") { + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + val readSchema = + new StructType() + .add("a", IntegerType, true, withId(1)) + + val writeSchema = + new StructType() + .add("a", IntegerType, true, withId(1)) + .add("rand1", StringType, true, withId(2)) + .add("rand2", StringType, true, withId(1)) + + val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr")) + spark + .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + + val cause = intercept[SparkException] { + spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect() + }.getCause + assert( + cause.isInstanceOf[RuntimeException] && + cause.getMessage.contains("Found duplicate field(s)")) + } + } + } + + // Verbatim port of Spark `ParquetFieldIdIOSuite.test("read parquet file without ids")`, + // pinned to `SCAN_NATIVE_DATAFUSION` for the same reason as the duplicate-id test above. + test("read parquet file without ids") { + withSQLConf( + CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION, + SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") { + withTempPath { dir => + val readSchema = + new StructType() + .add("a", IntegerType, true, withId(1)) + + val writeSchema = + new StructType() + .add("a", IntegerType, true) + .add("rand1", StringType, true) + .add("rand2", StringType, true) + + val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr")) + spark + .createDataFrame(spark.sparkContext.parallelize(writeData), writeSchema) + .write + .mode("overwrite") + .parquet(dir.getCanonicalPath) + + Seq(readSchema, readSchema.add("b", StringType, true)).foreach { schema => + val cause = intercept[SparkException] { + spark.read.schema(schema).parquet(dir.getCanonicalPath).collect() + }.getCause + assert( + cause.isInstanceOf[RuntimeException] && + cause.getMessage.contains("Parquet file schema doesn't contain any field Ids")) + val expectedValues = (1 to schema.length).map(_ => null) + withSQLConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key -> "true") { + checkAnswer( + spark.read.schema(schema).parquet(dir.getCanonicalPath), + Row(expectedValues: _*) :: Row(expectedValues: _*) :: Nil) + } + } + } + } + } } class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {