1 change: 1 addition & 0 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -936,6 +936,7 @@ mod tests {
project_field_ids: vec![2, 3],
predicate: None,
deletes: vec![pos_del, eq_del],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
3 changes: 3 additions & 0 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -408,6 +408,7 @@ pub(crate) mod tests {
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -423,6 +424,7 @@
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_3],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -478,6 +480,7 @@ pub(crate) mod tests {
partition_spec_id: 0,
equality_ids: None,
}],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
31 changes: 28 additions & 3 deletions crates/iceberg/src/arrow/reader.rs
@@ -206,8 +206,9 @@ impl ArrowReader {
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
) -> Result<ArrowRecordBatchStream> {
- let should_load_page_index =
-     (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
+ let should_load_page_index = (row_selection_enabled && task.predicate.is_some())
+     || !task.deletes.is_empty()
+     || task.limit.is_some();

let delete_filter_rx =
delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
@@ -453,6 +454,10 @@ impl ArrowReader {
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
}

if let Some(limit) = task.limit {
Author: Thanks! I extended the should_load_page_index logic, and ArrowReaderOptions is now initialized with with_page_index(should_load_page_index).
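For reference, a minimal standalone sketch of the two parquet-crate knobs this change wires together, ArrowReaderOptions::with_page_index and the reader builder's with_limit (the helper function and file path are hypothetical, not part of this PR; assumes the parquet crate with its async feature, plus tokio and futures):

```rust
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

// Hypothetical helper, not part of this PR: read at most `limit` rows.
async fn read_first_rows(limit: usize) -> parquet::errors::Result<()> {
    let file = File::open("data.parquet").await?; // placeholder path

    // Load the page index so the reader can skip whole pages once the
    // limit (or a row selection) makes the remaining pages unnecessary.
    let options = ArrowReaderOptions::new().with_page_index(true);

    let mut stream = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
        .await?
        // Stop decoding after `limit` rows (applied after any row selection).
        .with_limit(limit)
        .build()?;

    while let Some(batch) = stream.try_next().await? {
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}
```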

record_batch_stream_builder = record_batch_stream_builder.with_limit(limit);
}

// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let record_batch_stream =
@@ -486,7 +491,9 @@ impl ArrowReader {
.with_preload_page_index(should_load_page_index);

// Create the record batch stream builder, which wraps the parquet file reader
- let options = arrow_reader_options.unwrap_or_default();
+ let options = arrow_reader_options
+     .unwrap_or_default()
+     .with_page_index(should_load_page_index);
let record_batch_stream_builder =
ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
Ok(record_batch_stream_builder)
@@ -2106,6 +2113,7 @@ message schema {
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -2428,6 +2436,7 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -2445,6 +2454,7 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -2573,6 +2583,7 @@ message schema {
project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -2745,6 +2756,7 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -2963,6 +2975,7 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3174,6 +3187,7 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3278,6 +3292,7 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3376,6 +3391,7 @@ message schema {
project_field_ids: vec![1, 3],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3463,6 +3479,7 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3564,6 +3581,7 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3694,6 +3712,7 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3791,6 +3810,7 @@ message schema {
project_field_ids: vec![1, 5, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3901,6 +3921,7 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -3992,6 +4013,7 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -4007,6 +4029,7 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -4022,6 +4045,7 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: None,
partition_spec: None,
name_mapping: None,
@@ -4201,6 +4225,7 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
limit: None,
partition: Some(partition_data),
partition_spec: Some(partition_spec),
name_mapping: None,
7 changes: 7 additions & 0 deletions crates/iceberg/src/scan/context.rs
@@ -42,6 +42,7 @@ pub(crate) struct ManifestFileContext {

field_ids: Arc<Vec<i32>>,
bound_predicates: Option<Arc<BoundPredicates>>,
limit: Option<usize>,
object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
@@ -60,6 +61,7 @@ pub(crate) struct ManifestEntryContext {
pub partition_spec_id: i32,
pub snapshot_schema: SchemaRef,
pub delete_file_index: DeleteFileIndex,
pub limit: Option<usize>,
pub case_sensitive: bool,
}

@@ -76,6 +78,7 @@ impl ManifestFileContext {
mut sender,
expression_evaluator_cache,
delete_file_index,
limit,
..
} = self;

@@ -91,6 +94,7 @@ impl ManifestEntryContext {
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
delete_file_index: delete_file_index.clone(),
limit,
case_sensitive: self.case_sensitive,
};

@@ -132,6 +136,7 @@ impl ManifestEntryContext {

deletes,

limit: self.limit,
// Include partition data and spec from manifest entry
partition: Some(self.manifest_entry.data_file.partition.clone()),
// TODO: Pass actual PartitionSpec through context chain for native flow
@@ -153,6 +158,7 @@ pub(crate) struct PlanContext {
pub snapshot_schema: SchemaRef,
pub case_sensitive: bool,
pub predicate: Option<Arc<Predicate>>,
pub limit: Option<usize>,
pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
pub object_cache: Arc<ObjectCache>,
pub field_ids: Arc<Vec<i32>>,
@@ -276,6 +282,7 @@ impl PlanContext {
manifest_file: manifest_file.clone(),
bound_predicates,
sender,
limit: self.limit,
object_cache: self.object_cache.clone(),
snapshot_schema: self.snapshot_schema.clone(),
field_ids: self.field_ids.clone(),