@@ -29,6 +29,12 @@ object CometParquetUtils {
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"

// The field-metadata key arrow-rs writes when it lifts Parquet field IDs into the Arrow
// schema (`parquet::arrow::PARQUET_FIELD_ID_META_KEY`). Spark's key for the same concept
// is `parquet.field.id` (`ParquetUtils.FIELD_ID_METADATA_KEY`). The serde layer translates
// at the proto boundary so the native side can match on the same key it gets from arrow-rs.
val PARQUET_FIELD_ID_META_KEY = "PARQUET:field_id"

// Map of encryption configuration key-value pairs that, if present, are only supported with
// these specific values. Generally, these are the default values that won't be present,
// but if they are present we want to check them.
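For reference, arrow-rs exports this exact string as `parquet::arrow::PARQUET_FIELD_ID_META_KEY`, so the two sides agree by construction. A minimal sketch of the translation described above; the `spark_meta` map and the inline rewrite are illustrative, not the actual serde code:

use std::collections::HashMap;

use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

fn main() {
    // The constant arrow-rs pins; the native side matches on this key.
    assert_eq!(PARQUET_FIELD_ID_META_KEY, "PARQUET:field_id");

    // Illustrative proto-boundary rewrite: Spark's `parquet.field.id` key in,
    // arrow-rs's `PARQUET:field_id` key out, value passed through untouched.
    let spark_meta = HashMap::from([("parquet.field.id".to_string(), "1".to_string())]);
    let arrow_meta: HashMap<String, String> = spark_meta
        .into_iter()
        .map(|(k, v)| {
            if k == "parquet.field.id" {
                (PARQUET_FIELD_ID_META_KEY.to_string(), v)
            } else {
                (k, v)
            }
        })
        .collect();
    assert_eq!(arrow_meta[PARQUET_FIELD_ID_META_KEY], "1");
}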
41 changes: 41 additions & 0 deletions native/common/src/error.rs
@@ -187,6 +187,21 @@ pub enum SparkError {
matched_fields: String,
},

/// Multiple Parquet fields share the same field id when the read schema requested an
/// id-based lookup. Mirrors Spark's `_LEGACY_ERROR_TEMP_2094`
/// (`foundDuplicateFieldInFieldIdLookupModeError`).
#[error("[_LEGACY_ERROR_TEMP_2094] Found duplicate field(s) by id: id={required_id} matches [{matched_fields}] in id-lookup mode")]
DuplicateFieldByFieldId {
required_id: i32,
matched_fields: String,
},

/// The read schema requests Parquet field-id matching but the file carries no field ids.
/// Mirrors the runtime error raised in Spark's `ParquetReadSupport` when
/// `spark.sql.parquet.fieldId.read.ignoreMissing` is false.
#[error("Spark read schema expects field Ids, but Parquet file schema doesn't contain any field Ids. Please remove the field ids from Spark schema or ignore missing ids by setting `spark.sql.parquet.fieldId.read.ignoreMissing = true`")]
ParquetMissingFieldIds,

/// Schema mismatch when reading a Parquet column under a requested schema
/// that's incompatible with the physical column type. Translated by the JVM
/// shim into Spark's `SchemaColumnConvertNotSupportedException`. The
@@ -273,6 +288,8 @@ impl SparkError {
SparkError::ScalarSubqueryTooManyRows => "ScalarSubqueryTooManyRows",
SparkError::FileNotFound { .. } => "FileNotFound",
SparkError::DuplicateFieldCaseInsensitive { .. } => "DuplicateFieldCaseInsensitive",
SparkError::DuplicateFieldByFieldId { .. } => "DuplicateFieldByFieldId",
SparkError::ParquetMissingFieldIds => "ParquetMissingFieldIds",
SparkError::ParquetSchemaConvert { .. } => "ParquetSchemaConvert",
SparkError::Arrow(_) => "Arrow",
SparkError::Internal(_) => "Internal",
@@ -489,6 +506,15 @@ impl SparkError {
"matchedOrcFields": matched_fields,
})
}
SparkError::DuplicateFieldByFieldId {
required_id,
matched_fields,
} => {
serde_json::json!({
"requiredId": required_id,
"matchedFields": matched_fields,
})
}
SparkError::ParquetSchemaConvert {
file_path,
column,
@@ -577,6 +603,15 @@ impl SparkError {
"org/apache/spark/SparkRuntimeException"
}

// DuplicateFieldByFieldId - converted to SparkRuntimeException by the shim
// (Spark's `foundDuplicateFieldInFieldIdLookupModeError` returns SparkRuntimeException)
SparkError::DuplicateFieldByFieldId { .. } => "org/apache/spark/SparkRuntimeException",

// ParquetMissingFieldIds - converted to a plain RuntimeException by the shim,
// matching the `RuntimeException` Spark's ParquetReadSupport throws when the
// file lacks field ids and `spark.sql.parquet.fieldId.read.ignoreMissing=false`.
SparkError::ParquetMissingFieldIds => "java/lang/RuntimeException",

// ParquetSchemaConvert - converted to SchemaColumnConvertNotSupportedException by the shim
SparkError::ParquetSchemaConvert { .. } => {
"org/apache/spark/sql/execution/datasources/SchemaColumnConvertNotSupportedException"
@@ -661,6 +696,12 @@ impl SparkError {
// Duplicate field in case-insensitive mode
SparkError::DuplicateFieldCaseInsensitive { .. } => Some("_LEGACY_ERROR_TEMP_2093"),

// Duplicate field id in id-lookup mode
SparkError::DuplicateFieldByFieldId { .. } => Some("_LEGACY_ERROR_TEMP_2094"),

// ParquetMissingFieldIds is a plain RuntimeException with no error class.
SparkError::ParquetMissingFieldIds => None,

// Parquet schema mismatch — translated to SchemaColumnConvertNotSupportedException
// by the JVM shim. The shim wraps it in the version-appropriate
// SparkException error class, so no error class is exposed here.
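To make the new variant's rendering concrete, a minimal sketch (the field values are made up; `thiserror` derives `Display` from the `#[error(...)]` attribute shown above):

let err = SparkError::DuplicateFieldByFieldId {
    required_id: 7,
    matched_fields: "a INT, b INT".to_string(),
};
assert_eq!(
    err.to_string(),
    "[_LEGACY_ERROR_TEMP_2094] Found duplicate field(s) by id: id=7 matches [a INT, b INT] in id-lookup mode"
);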
11 changes: 9 additions & 2 deletions native/core/src/execution/planner.rs
@@ -1326,6 +1326,8 @@ impl PhysicalPlanner {
common.return_null_struct_if_all_fields_missing,
self.session_ctx(),
common.encryption_enabled,
common.use_field_id,
common.ignore_missing_field_id,
)?;
Ok((
vec![],
@@ -3018,11 +3020,16 @@ fn convert_spark_types_to_arrow_schema(
let arrow_fields = spark_types
.iter()
.map(|spark_type| {
let field = Field::new(
    String::clone(&spark_type.name),
    to_arrow_datatype(spark_type.data_type.as_ref().unwrap()),
    spark_type.nullable,
);
if spark_type.metadata.is_empty() {
    field
} else {
    field.with_metadata(spark_type.metadata.clone())
}
})
.collect_vec();
let arrow_schema: SchemaRef = Arc::new(Schema::new(arrow_fields));
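The net effect on the Arrow schema the planner builds, as a small self-contained sketch (field name, key, and value are illustrative):

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

fn main() {
    let md = HashMap::from([("PARQUET:field_id".to_string(), "1".to_string())]);
    let field = Field::new("c1", DataType::Int32, true).with_metadata(md);
    assert_eq!(field.metadata()["PARQUET:field_id"], "1");
}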
12 changes: 10 additions & 2 deletions native/core/src/execution/serde.rs
@@ -157,11 +157,19 @@ pub fn to_arrow_datatype(dt_value: &DataType) -> ArrowDataType {
.iter()
.enumerate()
.map(|(idx, name)| {
let field = Field::new(
    name,
    to_arrow_datatype(&info.field_datatypes[idx]),
    info.field_nullable[idx],
);
// Attach Spark field metadata (currently parquet.field.id) when present.
// field_metadata is parallel to field_names; either empty or full length.
if let Some(meta) = info.field_metadata.get(idx) {
    if !meta.metadata.is_empty() {
        return field.with_metadata(meta.metadata.clone());
    }
}
field
})
.collect();
ArrowDataType::Struct(fields)
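A sketch of the parallel-array invariant the comment above relies on, with hypothetical stand-ins for the generated proto types (the real ones come from the `.proto` definitions):

use std::collections::HashMap;

struct FieldMetadata {
    metadata: HashMap<String, String>,
}

struct StructInfo {
    field_names: Vec<String>,
    field_metadata: Vec<FieldMetadata>, // either empty or parallel to field_names
}

// `get(idx)` returns None when field_metadata is empty, so such fields simply
// carry no metadata; when it is populated, indexes line up with field_names.
fn has_metadata(info: &StructInfo, idx: usize) -> bool {
    info.field_metadata
        .get(idx)
        .is_some_and(|m| !m.metadata.is_empty())
}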
4 changes: 4 additions & 0 deletions native/core/src/parquet/mod.rs
@@ -515,6 +515,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
return_null_struct_if_all_fields_missing != JNI_FALSE,
session_ctx,
encryption_enabled,
// The iceberg-compat path resolves IDs in the JVM via NativeBatchReader,
// so the native side does not need to do field-ID matching here.
false,
false,
)?;

let partition_index: usize = 0;
6 changes: 5 additions & 1 deletion native/core/src/parquet/parquet_exec.rs
@@ -74,14 +74,18 @@ pub(crate) fn init_datasource_exec(
return_null_struct_if_all_fields_missing: bool,
session_ctx: &Arc<SessionContext>,
encryption_enabled: bool,
use_field_id: bool,
ignore_missing_field_id: bool,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
let (table_parquet_options, mut spark_parquet_options) = get_options(
session_timezone,
case_sensitive,
return_null_struct_if_all_fields_missing,
&object_store_url,
encryption_enabled,
);
spark_parquet_options.use_field_id = use_field_id;
spark_parquet_options.ignore_missing_field_id = ignore_missing_field_id;

// Determine the schema and projection to use for ParquetSource.
// When data_schema is provided, use it as the base schema so DataFusion knows the full
86 changes: 64 additions & 22 deletions native/core/src/parquet/parquet_support.rs
@@ -38,6 +38,7 @@ use datafusion_comet_spark_expr::EvalMode;
use log::debug;
use object_store::path::Path;
use object_store::{parse_url, ObjectStore};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::time::Duration;
@@ -84,6 +85,14 @@ pub struct SparkParquetOptions {
/// legacy behavior); false preserves the parent struct's nullness from the file
/// so non-null parents return a struct of all-null fields.
pub return_null_struct_if_all_fields_missing: bool,
/// When true, resolve fields by parquet.field.id metadata instead of name
/// (mirrors Spark's `spark.sql.parquet.fieldId.read.enabled`). Only takes effect
/// when both physical and logical fields actually carry IDs.
pub use_field_id: bool,
/// When false (Spark's default), reading a file that has no field ids while the
/// requested schema does carry ids raises a runtime error rather than silently
/// producing nulls (mirrors `spark.sql.parquet.fieldId.read.ignoreMissing`).
pub ignore_missing_field_id: bool,
}
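A short usage sketch of the two new options (hedged: the `new` constructor's argument list is not shown in this diff, so the call below assumes a plausible signature; the two flag assignments mirror what `init_datasource_exec` does):

let mut opts = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false); // assumed signature
opts.use_field_id = true; // spark.sql.parquet.fieldId.read.enabled
opts.ignore_missing_field_id = false; // spark.sql.parquet.fieldId.read.ignoreMissing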

impl SparkParquetOptions {
Expand All @@ -97,6 +106,8 @@ impl SparkParquetOptions {
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
return_null_struct_if_all_fields_missing: true,
use_field_id: false,
ignore_missing_field_id: false,
}
}

@@ -110,6 +121,8 @@ impl SparkParquetOptions {
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
return_null_struct_if_all_fields_missing: true,
use_field_id: false,
ignore_missing_field_id: false,
}
}
}
@@ -240,6 +253,14 @@ fn parquet_convert_array(
}
}

/// Read the Parquet field id stored under arrow-rs's `PARQUET_FIELD_ID_META_KEY`.
fn field_id(field: &arrow::datatypes::Field) -> Option<i32> {
field
.metadata()
.get(PARQUET_FIELD_ID_META_KEY)
.and_then(|v| v.parse::<i32>().ok())
}
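Note that an absent or non-numeric id parses to None, which the matching logic below treats as "no id" (name matching) rather than an error. Illustrative check:

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

// A malformed id value fails the i32 parse and degrades to a name match.
let md = HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "not-a-number".to_string())]);
let f = Field::new("c1", DataType::Int32, true).with_metadata(md);
assert_eq!(field_id(&f), None);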

/// Cast between struct types based on logic in
/// `org.apache.spark.sql.catalyst.expressions.Cast#castStruct`.
fn parquet_convert_struct_to_struct(
@@ -250,39 +271,60 @@ fn parquet_convert_struct_to_struct(
) -> DataFusionResult<ArrayRef> {
match (from_type, to_type) {
(DataType::Struct(from_fields), DataType::Struct(to_fields)) => {
// TODO some of this logic may be specific to converting Parquet to Spark
// Match `from` (file) fields to `to` (logical) fields. Mirrors Spark's
// `clipParquetGroupFields`: when the logical struct carries Parquet field IDs
// anywhere, ID-bearing logical fields match ONLY by ID; non-ID-bearing fields
// fall back to name match. When no logical field carries an ID, fall back to
// name match across the board.
let should_match_by_id =
    parquet_options.use_field_id && to_fields.iter().any(|f| field_id(f).is_some());

let from_id_to_index: HashMap<i32, usize> = if should_match_by_id {
    let mut map = HashMap::new();
    for (i, field) in from_fields.iter().enumerate() {
        if let Some(id) = field_id(field) {
            map.entry(id).or_insert(i);
        }
    }
    map
} else {
    HashMap::new()
};

let normalize_name = |name: &str| -> String {
    if parquet_options.case_sensitive {
        name.to_string()
    } else {
        name.to_lowercase()
    }
};
let mut field_name_to_index_map = HashMap::new();
for (i, field) in from_fields.iter().enumerate() {
    field_name_to_index_map.insert(normalize_name(field.name()), i);
}
assert_eq!(field_name_to_index_map.len(), from_fields.len());

let mut field_overlap = false;
let mut cast_fields: Vec<ArrayRef> = Vec::with_capacity(to_fields.len());
for to_field in to_fields.iter() {
    let from_index = match (should_match_by_id, field_id(to_field)) {
        // Spark treats a missing ID match as a missing column rather than
        // falling back to name match.
        (true, Some(id)) => from_id_to_index.get(&id).copied(),
        _ => field_name_to_index_map
            .get(&normalize_name(to_field.name()))
            .copied(),
    };

    if let Some(from_index) = from_index {
        cast_fields.push(parquet_convert_array(
            Arc::clone(array.column(from_index)),
            to_field.data_type(),
            parquet_options,
        )?);
        field_overlap = true;
    } else {
        cast_fields.push(new_null_array(to_field.data_type(), array.len()));
    }
}
}
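To make the ID path concrete, a small sketch using `field_id` from this file: the physical field is named "a" but carries id 1, the logical field is named "b" with the same id, so with `use_field_id = true` the lookup resolves by id despite the name mismatch:

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};

let with_id = |name: &str, id: i32| {
    Field::new(name, DataType::Int32, true).with_metadata(HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        id.to_string(),
    )]))
};
let from_field = with_id("a", 1); // physical (file) field
let to_field = with_id("b", 1); // logical (read schema) field
assert_eq!(field_id(&from_field), Some(1));
assert_eq!(field_id(&to_field), Some(1)); // same id, so they resolve to the same column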
