diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index 5d0b1da712..d62e3c019c 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -936,6 +936,7 @@ mod tests {
             project_field_ids: vec![2, 3],
             predicate: None,
             deletes: vec![pos_del, eq_del],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 4af9f6b6ff..753303f3e9 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -408,6 +408,7 @@ pub(crate) mod tests {
             project_field_ids: vec![],
             predicate: None,
             deletes: vec![pos_del_1, pos_del_2.clone()],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -423,6 +424,7 @@ pub(crate) mod tests {
             project_field_ids: vec![],
             predicate: None,
             deletes: vec![pos_del_3],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -478,6 +480,7 @@ pub(crate) mod tests {
                 partition_spec_id: 0,
                 equality_ids: None,
             }],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index aa45a12973..e26f99c0df 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -206,8 +206,9 @@ impl ArrowReader {
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
     ) -> Result<ArrowRecordBatchStream> {
-        let should_load_page_index =
-            (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
+        let should_load_page_index = (row_selection_enabled && task.predicate.is_some())
+            || !task.deletes.is_empty()
+            || task.limit.is_some();
 
         let delete_filter_rx =
             delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
@@ -453,6 +454,10 @@ impl ArrowReader {
                 record_batch_stream_builder.with_row_groups(selected_row_group_indices);
         }
 
+        if let Some(limit) = task.limit {
+            record_batch_stream_builder = record_batch_stream_builder.with_limit(limit);
+        }
+
         // Build the batch stream and send all the RecordBatches that it generates
         // to the requester.
         let record_batch_stream =
@@ -486,7 +491,9 @@ impl ArrowReader {
             .with_preload_page_index(should_load_page_index);
 
         // Create the record batch stream builder, which wraps the parquet file reader
-        let options = arrow_reader_options.unwrap_or_default();
+        let options = arrow_reader_options
+            .unwrap_or_default()
+            .with_page_index(should_load_page_index);
         let record_batch_stream_builder =
             ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
         Ok(record_batch_stream_builder)
@@ -2106,6 +2113,7 @@ message schema {
             project_field_ids: vec![1],
             predicate: Some(predicate.bind(schema, true).unwrap()),
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -2428,6 +2436,7 @@ message schema {
             project_field_ids: vec![1],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -2445,6 +2454,7 @@ message schema {
             project_field_ids: vec![1],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -2573,6 +2583,7 @@ message schema {
             project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -2745,6 +2756,7 @@ message schema {
                 partition_spec_id: 0,
                 equality_ids: None,
             }],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -2963,6 +2975,7 @@ message schema {
                 partition_spec_id: 0,
                 equality_ids: None,
             }],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3174,6 +3187,7 @@ message schema {
                 partition_spec_id: 0,
                 equality_ids: None,
             }],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3278,6 +3292,7 @@ message schema {
             project_field_ids: vec![1, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3376,6 +3391,7 @@ message schema {
             project_field_ids: vec![1, 3],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3463,6 +3479,7 @@ message schema {
             project_field_ids: vec![1, 2, 3],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3564,6 +3581,7 @@ message schema {
             project_field_ids: vec![1, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3694,6 +3712,7 @@ message schema {
             project_field_ids: vec![1, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3791,6 +3810,7 @@ message schema {
             project_field_ids: vec![1, 5, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3901,6 +3921,7 @@ message schema {
             project_field_ids: vec![1, 2, 3],
             predicate: Some(predicate.bind(schema, true).unwrap()),
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -3992,6 +4013,7 @@ message schema {
             project_field_ids: vec![1, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -4007,6 +4029,7 @@ message schema {
             project_field_ids: vec![1, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -4022,6 +4045,7 @@ message schema {
             project_field_ids: vec![1, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -4201,6 +4225,7 @@ message schema {
             project_field_ids: vec![1, 2],
             predicate: None,
             deletes: vec![],
+            limit: None,
             partition: Some(partition_data),
             partition_spec: Some(partition_spec),
             name_mapping: None,
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 169d8e6405..1ea156a6d6 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -42,6 +42,7 @@ pub(crate) struct ManifestFileContext {
 
     field_ids: Arc<Vec<i32>>,
     bound_predicates: Option<Arc<BoundPredicates>>,
+    limit: Option<usize>,
     object_cache: Arc<ObjectCache>,
     snapshot_schema: SchemaRef,
     expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
@@ -60,6 +61,7 @@ pub(crate) struct ManifestEntryContext {
     pub partition_spec_id: i32,
     pub snapshot_schema: SchemaRef,
     pub delete_file_index: DeleteFileIndex,
+    pub limit: Option<usize>,
     pub case_sensitive: bool,
 }
 
@@ -76,6 +78,7 @@ impl ManifestFileContext {
             mut sender,
             expression_evaluator_cache,
             delete_file_index,
+            limit,
             ..
         } = self;
 
@@ -91,6 +94,7 @@ impl ManifestFileContext {
                 bound_predicates: bound_predicates.clone(),
                 snapshot_schema: snapshot_schema.clone(),
                 delete_file_index: delete_file_index.clone(),
+                limit,
                 case_sensitive: self.case_sensitive,
             };
 
@@ -132,6 +136,7 @@ impl ManifestEntryContext {
 
             deletes,
 
+            limit: self.limit,
             // Include partition data and spec from manifest entry
             partition: Some(self.manifest_entry.data_file.partition.clone()),
             // TODO: Pass actual PartitionSpec through context chain for native flow
@@ -153,6 +158,7 @@ pub(crate) struct PlanContext {
     pub snapshot_schema: SchemaRef,
     pub case_sensitive: bool,
     pub predicate: Option<Arc<Predicate>>,
+    pub limit: Option<usize>,
     pub snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
     pub object_cache: Arc<ObjectCache>,
     pub field_ids: Arc<Vec<i32>>,
@@ -276,6 +282,7 @@ impl PlanContext {
                 manifest_file: manifest_file.clone(),
                 bound_predicates,
                 sender,
+                limit: self.limit,
                 object_cache: self.object_cache.clone(),
                 snapshot_schema: self.snapshot_schema.clone(),
                 field_ids: self.field_ids.clone(),
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index c055c12c9a..c627b48126 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -60,6 +60,8 @@ pub struct TableScanBuilder<'a> {
     concurrency_limit_manifest_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+
+    limit: Option<usize>,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -78,9 +80,16 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            limit: None,
         }
     }
 
+    /// Sets the maximum number of records to return
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
@@ -285,6 +294,7 @@ impl<'a> TableScanBuilder<'a> {
             snapshot_schema: schema,
             case_sensitive: self.case_sensitive,
             predicate: self.filter.map(Arc::new),
+            limit: self.limit,
             snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
             object_cache: self.table.object_cache(),
             field_ids: Arc::new(field_ids),
@@ -1508,6 +1518,130 @@ pub mod tests {
         assert_eq!(int64_arr.value(0), 2);
     }
+    #[tokio::test]
+    async fn test_limit() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        let mut builder = fixture.table.scan();
+        builder = builder.with_limit(Some(1));
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 1);
+        assert_eq!(batches[1].num_rows(), 1);
+
+        let col = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col = batches[0].column_by_name("y").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 2);
+
+        let col = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col = batches[0].column_by_name("y").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 2);
+    }
+
+    #[tokio::test]
+    async fn test_limit_with_predicate() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: y > 3
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("y").greater_than(Datum::long(3));
+        builder = builder.with_filter(predicate).with_limit(Some(1));
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 1);
+        assert_eq!(batches[1].num_rows(), 1);
+
+        let col = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col = batches[0].column_by_name("y").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 4);
+    }
+
+    #[tokio::test]
+    async fn test_limit_with_predicate_and_row_selection() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: y > 3
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("y").greater_than(Datum::long(3));
+        builder = builder
+            .with_filter(predicate)
+            .with_limit(Some(1))
+            .with_row_selection_enabled(true);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 1);
+        assert_eq!(batches[1].num_rows(), 1);
+
+        let col = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col = batches[0].column_by_name("y").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 4);
+    }
+
+    #[tokio::test]
+    async fn test_limit_higher_than_total_rows() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: y > 3
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("y").greater_than(Datum::long(3));
+        builder = builder
+            .with_filter(predicate)
+            .with_limit(Some(100_000_000))
+            .with_row_selection_enabled(true);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 312);
+        assert_eq!(batches[1].num_rows(), 312);
+
+        let col = batches[0].column_by_name("x").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 1);
+
+        let col = batches[0].column_by_name("y").unwrap();
+        let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(int64_arr.value(0), 4);
+    }
+
     #[tokio::test]
     async fn test_filter_on_arrow_gt_eq() {
         let mut fixture = TableTestFixture::new();
         fixture.setup_manifest_files().await;
@@ -1882,6 +2016,7 @@ pub mod tests {
             record_count: Some(100),
             data_file_format: DataFileFormat::Parquet,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
@@ -1900,6 +2035,7 @@ pub mod tests {
             record_count: None,
             data_file_format: DataFileFormat::Avro,
             deletes: vec![],
+            limit: None,
             partition: None,
             partition_spec: None,
             name_mapping: None,
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 5349a9bdd2..172889f79c 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -78,6 +78,9 @@ pub struct FileScanTask {
 
     /// The list of delete files that may need to be applied to this data file
     pub deletes: Vec<FileScanTaskDeleteFile>,
+    /// Maximum number of records to return, None means no limit
+    pub limit: Option<usize>,
+
     /// Partition data from the manifest entry, used to identify which columns can use
     /// constant values from partition metadata vs. reading from the data file.
     /// Per the Iceberg spec, only identity-transformed partition fields should use constants.
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index d627b6a63d..42583ddfff 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -51,7 +51,7 @@ pub struct IcebergTableScan {
     projection: Option<Vec<String>>,
     /// Filters to apply to the table scan
     predicates: Option<Predicate>,
-    /// Optional limit on the number of rows to return
+    /// Maximum number of records to return, None means no limit
    limit: Option<usize>,
 }
 
@@ -151,6 +151,7 @@ impl ExecutionPlan for IcebergTableScan {
             self.snapshot_id,
             self.projection.clone(),
             self.predicates.clone(),
+            self.limit,
         );
 
         let stream = futures::stream::once(fut).try_flatten();
@@ -189,13 +190,14 @@ impl DisplayAs for IcebergTableScan {
     ) -> std::fmt::Result {
         write!(
             f,
-            "IcebergTableScan projection:[{}] predicate:[{}]",
+            "IcebergTableScan projection:[{}] predicate:[{}] limit:[{}]",
             self.projection
                 .clone()
                 .map_or(String::new(), |v| v.join(",")),
             self.predicates
                 .clone()
-                .map_or(String::from(""), |p| format!("{p}"))
+                .map_or(String::from(""), |p| format!("{p}")),
+            self.limit.map_or(String::from(""), |l| format!("{l}")),
         )
     }
 }
@@ -210,6 +212,7 @@ async fn get_batch_stream(
     snapshot_id: Option<i64>,
     column_names: Option<Vec<String>>,
     predicates: Option<Predicate>,
+    limit: Option<usize>,
 ) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
     let scan_builder = match snapshot_id {
         Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
@@ -223,6 +226,9 @@ async fn get_batch_stream(
     if let Some(pred) = predicates {
         scan_builder = scan_builder.with_filter(pred);
     }
+
+    scan_builder = scan_builder.with_limit(limit);
+
     let table_scan = scan_builder.build().map_err(to_datafusion_error)?;
 
     let stream = table_scan
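
Usage note (not part of the patch): a minimal sketch of how the new scan-level limit added here can be driven from the public API, following the pattern used in the tests above (scan builder -> with_limit -> build -> to_arrow). The function name and the assumption that a loaded iceberg Table is available are illustrative only. Note that, as the tests show, the limit is applied per file scan task via the Parquet reader's with_limit, so a scan over several data files can still yield up to `n` rows from each task.

    use futures::TryStreamExt;
    use arrow_array::RecordBatch;
    use iceberg::table::Table;

    // Hypothetical helper: read at most `n` rows per file scan task from `table`.
    async fn read_with_limit(table: &Table, n: usize) -> iceberg::Result<Vec<RecordBatch>> {
        let table_scan = table
            .scan()
            .with_limit(Some(n)) // new builder method introduced by this patch
            .build()?;

        // to_arrow() yields a stream of RecordBatch results; collect them eagerly here.
        table_scan.to_arrow().await?.try_collect().await
    }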