From b9b783ba478d1e103d11d226d11583b3efb2f22a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 18 Mar 2026 19:47:06 -0400 Subject: [PATCH 1/5] ColContainer --- differential-dataflow/examples/col-spines.rs | 106 +++++++++++++++ differential-dataflow/src/collection.rs | 122 ++++++++++++++++++ differential-dataflow/src/containers.rs | 120 +++++++++++++++++ differential-dataflow/src/lib.rs | 2 +- .../src/trace/implementations/chunker.rs | 107 +++++++++++++++ .../src/trace/implementations/ord_neu.rs | 9 +- 6 files changed, 463 insertions(+), 3 deletions(-) create mode 100644 differential-dataflow/examples/col-spines.rs diff --git a/differential-dataflow/examples/col-spines.rs b/differential-dataflow/examples/col-spines.rs new file mode 100644 index 000000000..203b08303 --- /dev/null +++ b/differential-dataflow/examples/col-spines.rs @@ -0,0 +1,106 @@ +//! Arrangement experiments starting from columnar containers. +//! +//! This example builds `ColContainer` inputs directly using a +//! `ColContainerBuilder` and `InputHandle`, then feeds them through +//! the arrangement pipeline via `arrange_core` with `ColumnarKeyBatcher`. + +use timely::container::{ContainerBuilder, PushInto}; +use timely::dataflow::InputHandle; +use timely::dataflow::ProbeHandle; + +use differential_dataflow::operators::arrange::arrangement::arrange_core; +use differential_dataflow::containers::ColContainerBuilder; + +fn main() { + + type Update = ((String, ()), u64, isize); + type Builder = ColContainerBuilder; + + let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let size: usize = std::env::args().nth(2).unwrap().parse().unwrap(); + + let timer1 = ::std::time::Instant::now(); + let timer2 = timer1.clone(); + + timely::execute_from_args(std::env::args(), move |worker| { + + let mut data_input = >::new_with_builder(); + let mut keys_input = >::new_with_builder(); + let mut probe = ProbeHandle::new(); + + worker.dataflow::(|scope| { + + let data = data_input.to_stream(scope); + let keys = keys_input.to_stream(scope); + + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::trace::implementations::ord_neu::{ + ColumnarKeyBatcher, RcOrdKeyBuilder, OrdKeySpine, + }; + + let data = arrange_core::<_, _, ColumnarKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>( + data, Pipeline, "Data", + ); + let keys = arrange_core::<_, _, ColumnarKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>( + keys, Pipeline, "Keys", + ); + + keys.join_core(data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + }); + + let mut data_builder = Builder::default(); + let mut keys_builder = Builder::default(); + + // Load up data in batches. + // Note: `format_args!` avoids allocating a String; the columnar + // `Strings` container accepts `fmt::Arguments` directly and writes + // the formatted output straight into its byte buffer. + let mut counter = 0; + while counter < 10 * keys { + let mut i = worker.index(); + let time = *data_input.time(); + while i < size { + let val = (counter + i) % keys; + data_builder.push_into(((format_args!("{:?}", val), ()), time, 1isize)); + i += worker.peers(); + } + while let Some(container) = data_builder.finish() { + data_input.send_batch(container); + } + counter += size; + data_input.advance_to(data_input.time() + 1); + keys_input.advance_to(keys_input.time() + 1); + while probe.less_than(data_input.time()) { + worker.step_or_park(None); + } + } + println!("{:?}\tloading complete", timer1.elapsed()); + + let mut queries = 0; + while queries < 10 * keys { + let mut i = worker.index(); + let time = *data_input.time(); + while i < size { + let val = (queries + i) % keys; + keys_builder.push_into(((format_args!("{:?}", val), ()), time, 1isize)); + i += worker.peers(); + } + while let Some(container) = keys_builder.finish() { + keys_input.send_batch(container); + } + queries += size; + data_input.advance_to(data_input.time() + 1); + keys_input.advance_to(keys_input.time() + 1); + while probe.less_than(data_input.time()) { + worker.step_or_park(None); + } + } + + println!("{:?}\tqueries complete", timer1.elapsed()); + + }).unwrap(); + + println!("{:?}\tshut down", timer2.elapsed()); + +} diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index c13227f2c..5405d6091 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -1241,6 +1241,21 @@ pub mod vec { } } +pub use col::Collection as ColCollection; +/// Specializations of `Collection` that use columnar containers. +pub mod col { + + use timely::dataflow::ScopeParent; + + use crate::containers::ColContainer; + + /// An evolving collection of `(D, T, R)` updates backed by columnar storage. + /// + /// This type is analogous to [`super::vec::Collection`] but stores data in a + /// columnar layout via `<(D, G::Timestamp, R) as Columnar>::Container`. + pub type Collection = super::Collection::Timestamp, R)>>; +} + /// Conversion to a differential dataflow Collection. pub trait AsCollection { /// Converts the type to a differential dataflow collection. @@ -1358,6 +1373,113 @@ pub mod containers { } } + /// Implementations of container traits for the `ColContainer` container. + mod col { + + use columnar::{Columnar, Borrow, Len, Index, Push}; + use timely::progress::{Timestamp, timestamp::Refines}; + use crate::collection::Abelian; + use crate::containers::ColContainer; + + use super::{Negate, Enter, Leave, ResultsIn}; + + // For `(D, T, R): Columnar`, the container is `(D::Container, T::Container, R::Container)`. + // From `Columnar`, this container is `Borrow + Clear + Len + Clone + Default`. + // `Borrow::Borrowed<'a>` implements `Index>` and `Len`. + // `Borrow::Ref<'a>` is `Copy` and equals `(Ref<'a, D>, Ref<'a, T>, Ref<'a, R>)`. + // The container also implements `Push>` and `Push<&'a (D, T, R)>`. + + impl Negate for ColContainer<(D, T, R)> + where + D: Columnar, + T: Columnar, + R: Columnar + Abelian, + { + fn negate(self) -> Self { + let mut result = ColContainer::<(D, T, R)>::default(); + let borrowed = self.container.borrow(); + for i in 0..borrowed.len() { + let (d, t, r) = borrowed.get(i); + let mut r_owned = R::into_owned(r); + r_owned.negate(); + result.container.0.push(d); + result.container.1.push(t); + result.container.2.push(&r_owned); + } + result + } + } + + impl Enter for ColContainer<(D, T1, R)> + where + D: Columnar, + T1: Columnar + Timestamp, + T2: Columnar + Refines, + R: Columnar, + T2::Container: Push, + { + type InnerContainer = ColContainer<(D, T2, R)>; + fn enter(self) -> Self::InnerContainer { + let mut result = ColContainer::<(D, T2, R)>::default(); + let borrowed = self.container.borrow(); + for i in 0..borrowed.len() { + let (d, t, r) = borrowed.get(i); + let t_inner = T2::to_inner(T1::into_owned(t)); + result.container.0.push(d); + result.container.1.push(t_inner); + result.container.2.push(r); + } + result + } + } + + impl Leave for ColContainer<(D, T1, R)> + where + D: Columnar, + T1: Columnar + Refines, + T2: Columnar + Timestamp, + R: Columnar, + T2::Container: Push, + { + type OuterContainer = ColContainer<(D, T2, R)>; + fn leave(self) -> Self::OuterContainer { + let mut result = ColContainer::<(D, T2, R)>::default(); + let borrowed = self.container.borrow(); + for i in 0..borrowed.len() { + let (d, t, r) = borrowed.get(i); + let t_outer = T1::into_owned(t).to_outer(); + result.container.0.push(d); + result.container.1.push(t_outer); + result.container.2.push(r); + } + result + } + } + + impl ResultsIn for ColContainer<(D, T, R)> + where + D: Columnar, + T: Columnar + Timestamp, + R: Columnar, + T::Container: Push, + { + fn results_in(self, step: &T::Summary) -> Self { + use timely::progress::PathSummary; + let mut result = ColContainer::<(D, T, R)>::default(); + let borrowed = self.container.borrow(); + for i in 0..borrowed.len() { + let (d, t, r) = borrowed.get(i); + if let Some(t2) = step.results_in(&T::into_owned(t)) { + result.container.0.push(d); + result.container.1.push(t2); + result.container.2.push(r); + } + } + result + } + } + } + /// Implementations of container traits for the `Rc` container. mod rc { use std::rc::Rc; diff --git a/differential-dataflow/src/containers.rs b/differential-dataflow/src/containers.rs index 4d3f684e5..72668d0c1 100644 --- a/differential-dataflow/src/containers.rs +++ b/differential-dataflow/src/containers.rs @@ -300,3 +300,123 @@ mod container { } } } + +/// A container backed by columnar storage. +/// +/// This type wraps a `::Container` and provides the trait +/// implementations needed to use it as a timely dataflow container. It is +/// an append-only container: elements are pushed in and then read out by +/// index, but not mutated in place. +pub struct ColContainer { + /// The underlying columnar container. + pub container: C::Container, +} + +impl Default for ColContainer { + fn default() -> Self { Self { container: Default::default() } } +} + +impl Clone for ColContainer +where + C::Container: Clone, +{ + fn clone(&self) -> Self { Self { container: self.container.clone() } } +} + +impl ColContainer { + /// The number of elements in the container. + pub fn len(&self) -> usize where C::Container: columnar::Len { + columnar::Len::len(&self.container) + } + /// Whether the container is empty. + pub fn is_empty(&self) -> bool where C::Container: columnar::Len { + self.len() == 0 + } + /// Clears the container. + pub fn clear(&mut self) where C::Container: columnar::Clear { + columnar::Clear::clear(&mut self.container) + } + /// Pushes an element into the container. + pub fn push(&mut self, item: T) where C::Container: columnar::Push { + columnar::Push::push(&mut self.container, item) + } +} + +impl timely::container::Accountable for ColContainer +where + C::Container: columnar::Len, +{ + #[inline] + fn record_count(&self) -> i64 { self.len() as i64 } + #[inline] + fn is_empty(&self) -> bool { self.is_empty() } +} + +/// A `ContainerBuilder` that accumulates elements into `ColContainer`. +/// +/// Elements are pushed into a columnar container, and when it exceeds a +/// size threshold a completed container is made available. +pub struct ColContainerBuilder { + current: ColContainer, + pending: std::collections::VecDeque>, + empty: Option>, +} + +impl Default for ColContainerBuilder { + fn default() -> Self { + Self { + current: Default::default(), + pending: Default::default(), + empty: None, + } + } +} + +impl ColContainerBuilder { + const TARGET_SIZE: usize = 1 << 20; +} + +impl PushInto for ColContainerBuilder +where + C::Container: columnar::Push + columnar::Len + columnar::Clear, +{ + fn push_into(&mut self, item: T) { + columnar::Push::push(&mut self.current.container, item); + if self.current.len() >= Self::TARGET_SIZE { + self.pending.push_back(std::mem::take(&mut self.current)); + } + } +} + +impl timely::container::ContainerBuilder for ColContainerBuilder +where + C::Container: columnar::Len + columnar::Clear + Clone, +{ + type Container = ColContainer; + + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.pending.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.current.is_empty() { + self.pending.push_back(std::mem::take(&mut self.current)); + } + if let Some(ready) = self.pending.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } +} + +impl timely::container::LengthPreservingContainerBuilder for ColContainerBuilder +where + C::Container: columnar::Len + columnar::Clear + Clone, +{ } diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index 615aa5ded..6b86c7bb6 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -76,7 +76,7 @@ use std::fmt::Debug; -pub use collection::{AsCollection, Collection, VecCollection}; +pub use collection::{AsCollection, Collection, VecCollection, ColCollection}; pub use hashable::Hashable; pub use difference::Abelian as Diff; diff --git a/differential-dataflow/src/trace/implementations/chunker.rs b/differential-dataflow/src/trace/implementations/chunker.rs index 71d7aeb41..bd584845f 100644 --- a/differential-dataflow/src/trace/implementations/chunker.rs +++ b/differential-dataflow/src/trace/implementations/chunker.rs @@ -235,6 +235,113 @@ where } } +/// Chunk a stream of `ColContainer` into chains of vectors. +/// +/// Accepts `ColContainer<(D, T, R)>` as input, reads elements into an +/// internal `Vec`, sorts, consolidates, and produces `Vec<(D, T, R)>` chunks. +pub struct ColumnarChunker { + pending: Vec, + ready: VecDeque>, + empty: Option>, +} + +impl Default for ColumnarChunker { + fn default() -> Self { + Self { + pending: Vec::default(), + ready: VecDeque::default(), + empty: None, + } + } +} + +impl ColumnarChunker<(D, T, R)> +where + D: Ord, + T: Ord, + R: Semigroup, +{ + const BUFFER_SIZE_BYTES: usize = 8 << 10; + fn chunk_capacity() -> usize { + let size = ::std::mem::size_of::<(D, T, R)>(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + fn form_chunk(&mut self) { + consolidate_updates(&mut self.pending); + if self.pending.len() >= Self::chunk_capacity() { + while self.pending.len() > Self::chunk_capacity() { + let mut chunk = Vec::with_capacity(Self::chunk_capacity()); + chunk.extend(self.pending.drain(..chunk.capacity())); + self.ready.push_back(chunk); + } + } + } +} + +impl<'a, D, T, R> PushInto<&'a mut crate::containers::ColContainer<(D, T, R)>> for ColumnarChunker<(D, T, R)> +where + D: columnar::Columnar + Ord + Clone, + T: columnar::Columnar + Ord + Clone, + R: columnar::Columnar + Semigroup + Clone, +{ + fn push_into(&mut self, container: &'a mut crate::containers::ColContainer<(D, T, R)>) { + use columnar::{Borrow, Index, Len}; + // Ensure capacity. + if self.pending.capacity() < Self::chunk_capacity() * 2 { + self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len()); + } + + // Read elements from the columnar container into the pending Vec. + let borrowed = container.container.borrow(); + for i in 0..borrowed.len() { + let (d, t, r) = borrowed.get(i); + self.pending.push((D::into_owned(d), T::into_owned(t), R::into_owned(r))); + if self.pending.len() == self.pending.capacity() { + self.form_chunk(); + } + } + container.clear(); + } +} + +impl ContainerBuilder for ColumnarChunker<(D, T, R)> +where + D: Ord + Clone + 'static, + T: Ord + Clone + 'static, + R: Semigroup + Clone + 'static, +{ + type Container = Vec<(D, T, R)>; + + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.pending.is_empty() { + consolidate_updates(&mut self.pending); + while !self.pending.is_empty() { + let mut chunk = Vec::with_capacity(Self::chunk_capacity()); + chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity()))); + self.ready.push_back(chunk); + } + } + self.empty = self.ready.pop_front(); + self.empty.as_mut() + } +} + /// Chunk a stream of containers into chains of vectors. pub struct ContainerChunker { pending: Output, diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index b6f9a198e..3c28d448d 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -10,8 +10,8 @@ use std::rc::Rc; -use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; +use crate::containers::{TimelyStack, ColContainer}; +use crate::trace::implementations::chunker::{ColumnationChunker, ColumnarChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::rc_blanket_impls::RcBuilder; @@ -58,6 +58,11 @@ pub type ColKeyBuilder = RcBuilder, // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; +/// A batcher accepting `ColContainer` input, producing Vec-based key batches. +pub type ColumnarKeyBatcher = MergeBatcher, ColumnarChunker<((K,()), T, R)>, VecMerger<(K,()), T, R>>; +/// A batcher accepting `ColContainer` input, producing Vec-based key-value batches. +pub type ColumnarValBatcher = MergeBatcher, ColumnarChunker<((K,V), T, R)>, VecMerger<(K,V), T, R>>; + pub use layers::{Vals, Upds}; /// Layers are containers of lists of some type. /// From 0dc0f3a6d58178e17db2d9805a227bc0b134f97a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 19 Mar 2026 06:10:11 -0400 Subject: [PATCH 2/5] Columnar container used in chunkers and batchers --- differential-dataflow/examples/col-spines.rs | 6 +- differential-dataflow/src/containers.rs | 50 ++++++ .../src/trace/implementations/chunker.rs | 165 ++++++++++++++++++ .../trace/implementations/merge_batcher.rs | 109 ++++++++++++ .../src/trace/implementations/ord_neu.rs | 56 +++++- 5 files changed, 382 insertions(+), 4 deletions(-) diff --git a/differential-dataflow/examples/col-spines.rs b/differential-dataflow/examples/col-spines.rs index 203b08303..65f1a639e 100644 --- a/differential-dataflow/examples/col-spines.rs +++ b/differential-dataflow/examples/col-spines.rs @@ -35,13 +35,13 @@ fn main() { use timely::dataflow::channels::pact::Pipeline; use differential_dataflow::trace::implementations::ord_neu::{ - ColumnarKeyBatcher, RcOrdKeyBuilder, OrdKeySpine, + ColumnarColKeyBatcher, RcOrdKeyBuilderFromCol, OrdKeySpine, }; - let data = arrange_core::<_, _, ColumnarKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>( + let data = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcOrdKeyBuilderFromCol<_,_,_>, OrdKeySpine<_,_,_>>( data, Pipeline, "Data", ); - let keys = arrange_core::<_, _, ColumnarKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>( + let keys = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcOrdKeyBuilderFromCol<_,_,_>, OrdKeySpine<_,_,_>>( keys, Pipeline, "Keys", ); diff --git a/differential-dataflow/src/containers.rs b/differential-dataflow/src/containers.rs index 72668d0c1..ff235f880 100644 --- a/differential-dataflow/src/containers.rs +++ b/differential-dataflow/src/containers.rs @@ -352,6 +352,56 @@ where fn is_empty(&self) -> bool { self.is_empty() } } +/// An iterator that walks a `ColContainer<(D,T,R)>` by index, yielding columnar refs. +pub struct ColContainerDrain<'a, D: columnar::Columnar, T: columnar::Columnar, R: columnar::Columnar> { + borrowed: <(D::Container, T::Container, R::Container) as columnar::Borrow>::Borrowed<'a>, + index: usize, + len: usize, +} + +impl<'a, D: columnar::Columnar, T: columnar::Columnar, R: columnar::Columnar> Iterator for ColContainerDrain<'a, D, T, R> { + type Item = (columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>); + #[inline] + fn next(&mut self) -> Option { + if self.index < self.len { + let item = columnar::Index::get(&self.borrowed, self.index); + self.index += 1; + Some(item) + } else { + None + } + } +} + +impl timely::container::DrainContainer for ColContainer<(D, T, R)> { + type Item<'a> = (columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>); + type DrainIter<'a> = ColContainerDrain<'a, D, T, R>; + fn drain(&mut self) -> Self::DrainIter<'_> { + let borrowed = columnar::Borrow::borrow(&self.container); + let len = columnar::Len::len(&borrowed); + ColContainerDrain { borrowed, index: 0, len } + } +} + +impl timely::container::SizableContainer for ColContainer<(D, T, R)> { + fn at_capacity(&self) -> bool { + self.len() >= timely::container::buffer::default_capacity::<(D, T, R)>() + } + fn ensure_capacity(&mut self, _stash: &mut Option) { + // Columnar containers grow dynamically; nothing special needed. + } +} + +impl<'a, D: columnar::Columnar, T: columnar::Columnar, R: columnar::Columnar> + PushInto<(columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>)> + for ColContainer<(D, T, R)> +{ + #[inline] + fn push_into(&mut self, item: (columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>)) { + columnar::Push::push(&mut self.container, item); + } +} + /// A `ContainerBuilder` that accumulates elements into `ColContainer`. /// /// Elements are pushed into a columnar container, and when it exceeds a diff --git a/differential-dataflow/src/trace/implementations/chunker.rs b/differential-dataflow/src/trace/implementations/chunker.rs index bd584845f..90047e999 100644 --- a/differential-dataflow/src/trace/implementations/chunker.rs +++ b/differential-dataflow/src/trace/implementations/chunker.rs @@ -342,6 +342,171 @@ where } } +/// Chunk a stream of `ColContainer` into chains of `ColContainer`. +/// +/// Accumulates input into a staging `ColContainer`, then sorts columnar refs +/// and consolidates into output `ColContainer` chunks — staying columnar throughout. +pub struct ColumnarColChunker { + /// Staging area: input gets appended here. + pending: crate::containers::ColContainer<(D, T, R)>, + /// Completed, sorted, consolidated chunks. + ready: VecDeque>, + /// Scratch space for the empty output. + empty: Option>, + /// Reusable index buffer for sorting. + indices: Vec, +} + +impl Default for ColumnarColChunker { + fn default() -> Self { + Self { + pending: Default::default(), + ready: Default::default(), + empty: None, + indices: Vec::new(), + } + } +} + +impl ColumnarColChunker { + const BUFFER_SIZE_BYTES: usize = 8 << 10; + fn chunk_capacity() -> usize { + let size = ::std::mem::size_of::<(D, T, R)>(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } +} + +impl ColumnarColChunker +where + D: columnar::Columnar, + T: columnar::Columnar, + R: columnar::Columnar + Default + Semigroup, + for<'a> columnar::Ref<'a, D>: Ord, + for<'a> columnar::Ref<'a, T>: Ord, + for<'a> (D::Container, T::Container, R::Container): columnar::Push<(columnar::Ref<'a, D>, columnar::Ref<'a, T>, &'a R)>, +{ + /// Sort refs from `pending`, consolidate, push sorted results into `ColContainer` chunks. + fn form_chunks(&mut self) { + use columnar::{Borrow, Index, Len, Push}; + + // Swap pending out so we can borrow it immutably while pushing to ready. + let mut source = std::mem::take(&mut self.pending); + let borrowed = source.container.borrow(); + let len = borrowed.len(); + if len == 0 { + // Swap the (empty) allocation back. + self.pending = source; + return; + } + + // Reuse the indices buffer: fill with 0..len, sort by (data, time). + self.indices.clear(); + Extend::extend(&mut self.indices, 0..len); + self.indices.sort_unstable_by(|&i, &j| { + let (d1, t1, _) = borrowed.get(i); + let (d2, t2, _) = borrowed.get(j); + (d1, t1).cmp(&(d2, t2)) + }); + + // Consolidate: merge adjacent equal (d, t) pairs by summing diffs. + let mut chunk = crate::containers::ColContainer::<(D, T, R)>::default(); + let mut idx = 0; + while idx < self.indices.len() { + let (d, t, r) = borrowed.get(self.indices[idx]); + let mut r_owned = R::into_owned(r); + idx += 1; + while idx < self.indices.len() { + let (d2, t2, r2) = borrowed.get(self.indices[idx]); + if d == d2 && t == t2 { + let r2_owned = R::into_owned(r2); + r_owned.plus_equals(&r2_owned); + idx += 1; + } else { + break; + } + } + if !r_owned.is_zero() { + chunk.container.0.push(d); + chunk.container.1.push(t); + chunk.container.2.push(&r_owned); + if chunk.len() >= Self::chunk_capacity() { + self.ready.push_back(std::mem::take(&mut chunk)); + } + } + } + if !chunk.is_empty() { + self.ready.push_back(chunk); + } + + // Clear and reclaim the source's allocation for next time. + let _ = borrowed; + source.clear(); + self.pending = source; + } +} + +impl<'a, D, T, R> PushInto<&'a mut crate::containers::ColContainer<(D, T, R)>> for ColumnarColChunker +where + D: columnar::Columnar, + T: columnar::Columnar, + R: columnar::Columnar + Default + Semigroup, + for<'b> columnar::Ref<'b, D>: Ord, + for<'b> columnar::Ref<'b, T>: Ord, + for<'b> (D::Container, T::Container, R::Container): columnar::Push<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'b R)>, +{ + fn push_into(&mut self, container: &'a mut crate::containers::ColContainer<(D, T, R)>) { + use columnar::{Borrow, Container, Len}; + // Append input into pending by extending from the source container. + let borrowed = container.container.borrow(); + let len = borrowed.len(); + self.pending.container.extend_from_self(borrowed, 0..len); + container.clear(); + // If pending is large enough, sort and chunk. + if self.pending.len() >= Self::chunk_capacity() * 2 { + self.form_chunks(); + } + } +} + +impl ContainerBuilder for ColumnarColChunker +where + D: columnar::Columnar + 'static, + T: columnar::Columnar + 'static, + R: columnar::Columnar + Default + Semigroup + 'static, + for<'a> columnar::Ref<'a, D>: Ord, + for<'a> columnar::Ref<'a, T>: Ord, + for<'a> (D::Container, T::Container, R::Container): columnar::Push<(columnar::Ref<'a, D>, columnar::Ref<'a, T>, &'a R)>, +{ + type Container = crate::containers::ColContainer<(D, T, R)>; + + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + if !self.pending.is_empty() { + self.form_chunks(); + } + if let Some(ready) = self.ready.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } +} + /// Chunk a stream of containers into chains of vectors. pub struct ContainerChunker { pending: Output, diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 5c932af9f..2d10fef01 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -603,4 +603,113 @@ pub mod container { #[inline] fn clear(&mut self) { TimelyStack::clear(self) } } } + + pub use col_container::ColumnarMerger; + /// Implementations of `ContainerQueue` and `MergerChunk` for `ColContainer` (columnar). + pub mod col_container { + + use columnar::Columnar; + use timely::progress::{Antichain, frontier::AntichainRef}; + use timely::container::PushInto; + + use crate::containers::ColContainer; + use crate::difference::Semigroup; + + use super::{ContainerQueue, MergerChunk}; + + /// A `Merger` implementation backed by `ColContainer` (columnar storage). + pub type ColumnarMerger = super::ContainerMerger, ColContainerQueue>; + + /// A queue that walks a `ColContainer<(D,T,R)>` by index. + pub struct ColContainerQueue { + container: ColContainer<(D, T, R)>, + head: usize, + } + + impl ColContainerQueue { + fn peek(&self) -> (columnar::Ref<'_, D>, columnar::Ref<'_, T>, columnar::Ref<'_, R>) { + let borrowed = columnar::Borrow::borrow(&self.container.container); + columnar::Index::get(&borrowed, self.head) + } + } + + impl ContainerQueue> for ColContainerQueue + where + D: Columnar, + T: Columnar, + R: Columnar, + for<'a> columnar::Ref<'a, D>: Ord, + for<'a> columnar::Ref<'a, T>: Ord, + { + fn next_or_alloc(&mut self) -> Result< as timely::container::DrainContainer>::Item<'_>, ColContainer<(D, T, R)>> { + if self.head >= self.container.len() { + Err(std::mem::take(&mut self.container)) + } else { + let borrowed = columnar::Borrow::borrow(&self.container.container); + let item = columnar::Index::get(&borrowed, self.head); + self.head += 1; + Ok(item) + } + } + fn is_empty(&self) -> bool { + self.head >= self.container.len() + } + fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { + let (d1, t1, _) = self.peek(); + let (d2, t2, _) = other.peek(); + (d1, t1).cmp(&(d2, t2)) + } + fn from(container: ColContainer<(D, T, R)>) -> Self { + ColContainerQueue { container, head: 0 } + } + } + + impl MergerChunk for ColContainer<(D, T, R)> + where + D: Columnar + 'static, + T: Columnar + Ord + timely::PartialOrder + Clone + 'static, + R: Columnar + Default + Semigroup + 'static, + for<'a> columnar::Ref<'a, T>: Ord, + for<'a> ColContainer<(D, T, R)>: PushInto<(columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>)>, + { + type TimeOwned = T; + type DiffOwned = R; + + fn time_kept( + (_, time, _): &::Item<'_>, + upper: &AntichainRef, + frontier: &mut Antichain, + ) -> bool { + let time_owned = T::into_owned(*time); + if upper.less_equal(&time_owned) { + frontier.insert(time_owned); + true + } else { + false + } + } + + fn push_and_add<'a>( + &mut self, + (d1, t1, r1): ::Item<'a>, + (_d2, _t2, r2): ::Item<'a>, + stash: &mut Self::DiffOwned, + ) { + use columnar::Push; + *stash = R::into_owned(r1); + let r2_owned = R::into_owned(r2); + stash.plus_equals(&r2_owned); + if !stash.is_zero() { + self.container.0.push(d1); + self.container.1.push(t1); + self.container.2.push(stash as &R); + } + } + + #[inline] + fn clear(&mut self) { + ColContainer::clear(self); + } + } + } } diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 3c28d448d..a8f338698 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -11,9 +11,10 @@ use std::rc::Rc; use crate::containers::{TimelyStack, ColContainer}; -use crate::trace::implementations::chunker::{ColumnationChunker, ColumnarChunker, VecChunker}; +use crate::trace::implementations::chunker::{ColumnationChunker, ColumnarChunker, ColumnarColChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; +use crate::trace::implementations::merge_batcher::container::col_container::ColumnarMerger; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Layout, Vector, TStack}; @@ -63,6 +64,59 @@ pub type ColumnarKeyBatcher = MergeBatcher /// A batcher accepting `ColContainer` input, producing Vec-based key-value batches. pub type ColumnarValBatcher = MergeBatcher, ColumnarChunker<((K,V), T, R)>, VecMerger<(K,V), T, R>>; +/// A batcher accepting `ColContainer` input, staying columnar throughout (chunker and merger). +pub type ColumnarColKeyBatcher = MergeBatcher, ColumnarColChunker<(K,()), T, R>, ColumnarMerger<(K,()), T, R>>; +/// A batcher accepting `ColContainer` input, staying columnar throughout (key-value variant). +pub type ColumnarColValBatcher = MergeBatcher, ColumnarColChunker<(K,V), T, R>, ColumnarMerger<(K,V), T, R>>; + +/// A builder that accepts `ColContainer` chunks and converts them to `Vec` for an inner builder. +pub struct ColToVecBuilder { + inner: B, +} + +impl crate::trace::Builder for ColToVecBuilder +where + D: columnar::Columnar + Clone + 'static, + T: columnar::Columnar + Clone + timely::progress::Timestamp, + R: columnar::Columnar + Clone + 'static, + B: crate::trace::Builder, Time = T>, +{ + type Input = ColContainer<(D, T, R)>; + type Time = T; + type Output = B::Output; + + fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { + ColToVecBuilder { inner: B::with_capacity(keys, vals, upds) } + } + fn push(&mut self, chunk: &mut Self::Input) { + use columnar::{Borrow, Index, Len}; + let borrowed = chunk.container.borrow(); + let mut vec = Vec::with_capacity(borrowed.len()); + for i in 0..borrowed.len() { + let (d, t, r) = borrowed.get(i); + vec.push((D::into_owned(d), T::into_owned(t), R::into_owned(r))); + } + self.inner.push(&mut vec); + } + fn done(self, description: crate::trace::Description) -> Self::Output { + self.inner.done(description) + } + fn seal(chain: &mut Vec, description: crate::trace::Description) -> Self::Output { + use columnar::{Borrow, Index, Len}; + let mut vec_chain: Vec> = chain.drain(..).map(|col| { + let borrowed = col.container.borrow(); + (0..borrowed.len()).map(|i| { + let (d, t, r) = borrowed.get(i); + (D::into_owned(d), T::into_owned(t), R::into_owned(r)) + }).collect() + }).collect(); + B::seal(&mut vec_chain, description) + } +} + +/// Builder alias: wraps `RcOrdKeyBuilder` to accept `ColContainer` chunks. +pub type RcOrdKeyBuilderFromCol = ColToVecBuilder>; + pub use layers::{Vals, Upds}; /// Layers are containers of lists of some type. /// From d9f681cbccaf8d50804f8946d5641b911b417d45 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 19 Mar 2026 20:57:56 -0400 Subject: [PATCH 3/5] Columnar end-to-end --- differential-dataflow/examples/col-spines.rs | 8 +- differential-dataflow/src/containers.rs | 15 ++- .../src/trace/implementations/chunker.rs | 2 +- .../src/trace/implementations/mod.rs | 123 ++++++++++++++++++ .../src/trace/implementations/ord_neu.rs | 10 ++ 5 files changed, 149 insertions(+), 9 deletions(-) diff --git a/differential-dataflow/examples/col-spines.rs b/differential-dataflow/examples/col-spines.rs index 65f1a639e..178f74005 100644 --- a/differential-dataflow/examples/col-spines.rs +++ b/differential-dataflow/examples/col-spines.rs @@ -35,17 +35,17 @@ fn main() { use timely::dataflow::channels::pact::Pipeline; use differential_dataflow::trace::implementations::ord_neu::{ - ColumnarColKeyBatcher, RcOrdKeyBuilderFromCol, OrdKeySpine, + ColumnarColKeyBatcher, RcColumnarKeyBuilder, ColumnarKeySpine, }; - let data = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcOrdKeyBuilderFromCol<_,_,_>, OrdKeySpine<_,_,_>>( + let data = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>( data, Pipeline, "Data", ); - let keys = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcOrdKeyBuilderFromCol<_,_,_>, OrdKeySpine<_,_,_>>( + let keys = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>( keys, Pipeline, "Keys", ); - keys.join_core(data, |_k, &(), &()| Option::<()>::None) + keys.join_core(data, |_k, (), ()| Option::<()>::None) .probe_with(&mut probe); }); diff --git a/differential-dataflow/src/containers.rs b/differential-dataflow/src/containers.rs index ff235f880..072adbbdd 100644 --- a/differential-dataflow/src/containers.rs +++ b/differential-dataflow/src/containers.rs @@ -301,6 +301,13 @@ mod container { } } +/// A columnar container whose references can be ordered. +/// +/// This trait alias packages the `for<'a> Container: Ord>` bound +/// that is needed in many places (e.g. `BatchContainer`, `Layout`, `BuilderInput`). +pub trait OrdContainer : for<'a> columnar::Container : Ord> { } +impl columnar::Container : Ord>> OrdContainer for C { } + /// A container backed by columnar storage. /// /// This type wraps a `::Container` and provides the trait @@ -392,12 +399,12 @@ impl timely } } -impl<'a, D: columnar::Columnar, T: columnar::Columnar, R: columnar::Columnar> - PushInto<(columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>)> - for ColContainer<(D, T, R)> +impl PushInto for ColContainer +where + C::Container: columnar::Push, { #[inline] - fn push_into(&mut self, item: (columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>)) { + fn push_into(&mut self, item: T) { columnar::Push::push(&mut self.container, item); } } diff --git a/differential-dataflow/src/trace/implementations/chunker.rs b/differential-dataflow/src/trace/implementations/chunker.rs index 90047e999..b32166d61 100644 --- a/differential-dataflow/src/trace/implementations/chunker.rs +++ b/differential-dataflow/src/trace/implementations/chunker.rs @@ -369,7 +369,7 @@ impl Defaul } impl ColumnarColChunker { - const BUFFER_SIZE_BYTES: usize = 8 << 10; + const BUFFER_SIZE_BYTES: usize = 64 << 10; fn chunk_capacity() -> usize { let size = ::std::mem::size_of::<(D, T, R)>(); if size == 0 { diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index d73eb71f4..efce34c4e 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -250,6 +250,30 @@ where type OffsetContainer = OffsetList; } +/// A layout based on columnar containers. +pub struct Columnar { + phantom: std::marker::PhantomData, +} + +impl Layout for Columnar +where + U: Update, + U::Key: columnar::Columnar + Clone + Ord, + U::Val: columnar::Columnar + Clone + Ord, + U::Time: columnar::Columnar + Clone + Ord, + U::Diff: columnar::Columnar + Clone + Ord, + ::Container: crate::containers::OrdContainer, + ::Container: crate::containers::OrdContainer, + ::Container: crate::containers::OrdContainer, + ::Container: crate::containers::OrdContainer, +{ + type KeyContainer = crate::containers::ColContainer; + type ValContainer = crate::containers::ColContainer; + type TimeContainer = crate::containers::ColContainer; + type DiffContainer = crate::containers::ColContainer; + type OffsetContainer = OffsetList; +} + /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)] pub struct OffsetList { @@ -509,6 +533,66 @@ where } } +impl BuilderInput for crate::containers::ColContainer<((K, V), T, R)> +where + K: columnar::Columnar + Clone + Ord, + V: columnar::Columnar + Clone + Ord, + T: columnar::Columnar + Clone + Ord + Timestamp + Lattice, + R: columnar::Columnar + Clone + Ord + Semigroup, + for<'a> columnar::Ref<'a, K>: Ord, + for<'a> columnar::Ref<'a, V>: Ord, + for<'a> columnar::Ref<'a, T>: Ord, + for<'a> columnar::Ref<'a, R>: Ord, + KBC: for<'a> BatchContainer: PartialEq>>, + VBC: for<'a> BatchContainer: PartialEq>>, +{ + type Key<'a> = columnar::Ref<'a, K>; + type Val<'a> = columnar::Ref<'a, V>; + type Time = T; + type Diff = R; + + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + let ((key, val), time, diff) = item; + (key, val, T::into_owned(time), R::into_owned(diff)) + } + + fn key_eq(this: &columnar::Ref<'_, K>, other: KBC::ReadItem<'_>) -> bool { + KBC::reborrow(other) == K::reborrow(*this) + } + + fn val_eq(this: &columnar::Ref<'_, V>, other: VBC::ReadItem<'_>) -> bool { + VBC::reborrow(other) == V::reborrow(*this) + } + + fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { + use columnar::{Borrow, Index, Len}; + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval: Option<(columnar::Ref<'_, K>, columnar::Ref<'_, V>)> = None; + for link in chain.iter() { + let borrowed = link.container.borrow(); + for i in 0..borrowed.len() { + let ((key, val), _, _) = borrowed.get(i); + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; + } + } else { + keys += 1; + vals += 1; + } + upds += 1; + prev_keyval = Some((key, val)); + } + } + (keys, vals, upds) + } +} + pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. @@ -688,6 +772,45 @@ pub mod containers { } } + impl BatchContainer for crate::containers::ColContainer + where + C: columnar::Columnar + Clone + Ord, + C::Container: crate::containers::OrdContainer, + { + type Owned = C; + type ReadItem<'a> = columnar::Ref<'a, C>; + + #[inline(always)] + fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { C::into_owned(item) } + #[inline(always)] + fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { C::copy_from(other, item); } + + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { C::reborrow(item) } + + fn push_ref(&mut self, item: Self::ReadItem<'_>) { columnar::Push::push(&mut self.container, item) } + fn push_own(&mut self, item: &Self::Owned) { columnar::Push::<&C>::push(&mut self.container, item) } + + fn clear(&mut self) { columnar::Clear::clear(&mut self.container) } + + fn with_capacity(_size: usize) -> Self { + Default::default() + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + let borrowed1 = columnar::Borrow::borrow(&cont1.container); + let borrowed2 = columnar::Borrow::borrow(&cont2.container); + let mut new = Self::default(); + columnar::Container::reserve_for(&mut new.container, std::iter::once(borrowed1).chain(std::iter::once(borrowed2))); + new + } + #[inline(always)] + fn index(&self, index: usize) -> Self::ReadItem<'_> { + columnar::Index::get(&columnar::Borrow::borrow(&self.container), index) + } + fn len(&self) -> usize { + columnar::Len::len(&self.container) + } + } + /// A container that accepts slices `[B::Item]`. pub struct SliceContainer { /// Offsets that bound each contained slice. diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index a8f338698..551d471a6 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -117,6 +117,16 @@ where /// Builder alias: wraps `RcOrdKeyBuilder` to accept `ColContainer` chunks. pub type RcOrdKeyBuilderFromCol = ColToVecBuilder>; +/// A spine backed by columnar containers throughout. +pub type ColumnarKeySpine = Spine>>>; +/// A spine backed by columnar containers throughout (key-value variant). +pub type ColumnarValSpine = Spine>>>; + +/// A builder that accepts `ColContainer` chunks directly into a columnar layout. +pub type RcColumnarKeyBuilder = RcBuilder, ColContainer<((K,()),T,R)>>>; +/// A builder that accepts `ColContainer` chunks directly into a columnar layout (key-value variant). +pub type RcColumnarValBuilder = RcBuilder, ColContainer<((K,V),T,R)>>>; + pub use layers::{Vals, Upds}; /// Layers are containers of lists of some type. /// From 3e32cdeafd96970577189ddd0265d0b03b29e04e Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 19 Mar 2026 21:44:30 -0400 Subject: [PATCH 4/5] Internal merger --- differential-dataflow/examples/col-spines.rs | 6 +- .../trace/implementations/merge_batcher.rs | 308 +++++++++++++++++- .../src/trace/implementations/ord_neu.rs | 7 +- 3 files changed, 315 insertions(+), 6 deletions(-) diff --git a/differential-dataflow/examples/col-spines.rs b/differential-dataflow/examples/col-spines.rs index 178f74005..cb2434d94 100644 --- a/differential-dataflow/examples/col-spines.rs +++ b/differential-dataflow/examples/col-spines.rs @@ -35,13 +35,13 @@ fn main() { use timely::dataflow::channels::pact::Pipeline; use differential_dataflow::trace::implementations::ord_neu::{ - ColumnarColKeyBatcher, RcColumnarKeyBuilder, ColumnarKeySpine, + ColumnarInternalKeyBatcher, RcColumnarKeyBuilder, ColumnarKeySpine, }; - let data = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>( + let data = arrange_core::<_, _, ColumnarInternalKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>( data, Pipeline, "Data", ); - let keys = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>( + let keys = arrange_core::<_, _, ColumnarInternalKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>( keys, Pipeline, "Keys", ); diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 2d10fef01..2a567a0c7 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -604,8 +604,191 @@ pub mod container { } } - pub use col_container::ColumnarMerger; - /// Implementations of `ContainerQueue` and `MergerChunk` for `ColContainer` (columnar). + /// A container that can merge two sorted, consolidated instances using internal iteration. + /// + /// Unlike `ContainerQueue` + `MergerChunk` which use external iteration (pulling items + /// one at a time), this trait lets the container borrow both inputs once and merge + /// efficiently using index arithmetic, galloping, and bulk copies. + pub trait InternalMerge: MergerChunk { + /// The number of items in this container. + fn len(&self) -> usize; + + /// The target number of items per chunk. + fn target_size() -> usize; + + /// Merge items from sorted inputs into `self`, advancing positions. + /// Decrements `fuel` by the number of items written. + /// + /// Dispatches based on the number of inputs: + /// - **0**: no-op + /// - **1**: bulk copy (may swap the input into `self`) + /// - **2**: merge two sorted streams + /// + /// Inputs are mutable to allow optimizations like swapping an entire + /// input chunk into the output. + fn merge_from( + &mut self, + others: &mut [Self], + positions: &mut [usize], + fuel: &mut usize, + ); + } + + /// A merger that uses internal iteration via [`InternalMerge`]. + pub struct InternalMerger { + _marker: PhantomData, + } + + impl Default for InternalMerger { + fn default() -> Self { Self { _marker: PhantomData } } + } + + impl InternalMerger where MC: InternalMerge { + #[inline] + fn empty(&self, stash: &mut Vec) -> MC { + stash.pop().unwrap_or_else(|| { + let mut container = MC::default(); + container.ensure_capacity(&mut None); + container + }) + } + #[inline] + fn recycle(&self, mut chunk: MC, stash: &mut Vec) { + chunk.clear(); + stash.push(chunk); + } + /// Drain remaining items from one side into `result`/`output`. + fn drain_side( + &self, + head: &mut MC, + pos: &mut usize, + list: &mut std::vec::IntoIter, + result: &mut MC, + output: &mut Vec, + stash: &mut Vec, + ) { + while *pos < head.len() { + let mut fuel = MC::target_size().saturating_sub(result.len()); + if fuel == 0 { fuel = 1; } // always make progress + result.merge_from( + std::slice::from_mut(head), + std::slice::from_mut(pos), + &mut fuel, + ); + if *pos >= head.len() { + let old = std::mem::replace(head, list.next().unwrap_or_default()); + self.recycle(old, stash); + *pos = 0; + } + if result.at_capacity() { + output.push(std::mem::take(result)); + *result = self.empty(stash); + } + } + } + } + + impl Merger for InternalMerger + where + for<'a> MC: InternalMerge + + Clone + PushInto<::Item<'a>> + 'static, + { + type Time = MC::TimeOwned; + type Chunk = MC; + + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); + + let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()]; + let mut positions = [0usize, 0usize]; + + let mut result = self.empty(stash); + + // Main merge loop: both sides have data. + while positions[0] < heads[0].len() && positions[1] < heads[1].len() { + let mut fuel = MC::target_size().saturating_sub(result.len()); + if fuel == 0 { fuel = 1; } // always make progress + result.merge_from(&mut heads, &mut positions, &mut fuel); + + if positions[0] >= heads[0].len() { + let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default()); + self.recycle(old, stash); + positions[0] = 0; + } + if positions[1] >= heads[1].len() { + let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default()); + self.recycle(old, stash); + positions[1] = 0; + } + if result.at_capacity() { + output.push(std::mem::take(&mut result)); + result = self.empty(stash); + } + } + + // Drain remaining from side 0. + self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash); + if !result.is_empty() { + output.push(std::mem::take(&mut result)); + result = self.empty(stash); + } + output.extend(list1); + + // Drain remaining from side 1. + self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash); + if !result.is_empty() { + output.push(std::mem::take(&mut result)); + } + output.extend(list2); + } + + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + readied: &mut Vec, + kept: &mut Vec, + stash: &mut Vec, + ) { + // Reuse the drain-based approach for extract. + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for mut buffer in merged { + for item in buffer.drain() { + if MC::time_kept(&item, &upper, frontier) { + if keep.at_capacity() && !keep.is_empty() { + kept.push(keep); + keep = self.empty(stash); + } + keep.push_into(item); + } else { + if ready.at_capacity() && !ready.is_empty() { + readied.push(ready); + ready = self.empty(stash); + } + ready.push_into(item); + } + } + self.recycle(buffer, stash); + } + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + readied.push(ready); + } + } + + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { + chunk.account() + } + } + + pub use col_container::{ColumnarMerger, ColumnarInternalMerger}; + /// Implementations of `ContainerQueue`, `MergerChunk`, and `InternalMerge` for `ColContainer` (columnar). pub mod col_container { use columnar::Columnar; @@ -711,5 +894,126 @@ pub mod container { ColContainer::clear(self); } } + + impl super::InternalMerge for ColContainer<(D, T, R)> + where + D: Columnar + 'static, + T: Columnar + Ord + timely::PartialOrder + Clone + 'static, + R: Columnar + Default + Semigroup + 'static, + for<'a> columnar::Ref<'a, D>: Ord, + for<'a> columnar::Ref<'a, T>: Ord, + for<'a> ColContainer<(D, T, R)>: PushInto<(columnar::Ref<'a, D>, columnar::Ref<'a, T>, columnar::Ref<'a, R>)>, + { + fn len(&self) -> usize { + ColContainer::len(self) + } + + fn target_size() -> usize { + let size = std::mem::size_of::<(D, T, R)>(); + let target_bytes = 64 << 10; + if size == 0 { target_bytes } else { target_bytes / size } + } + + fn merge_from( + &mut self, + others: &mut [Self], + positions: &mut [usize], + fuel: &mut usize, + ) { + match others.len() { + 0 => {}, + 1 => { + use columnar::{Borrow, Container, Len}; + let other = &mut others[0]; + let pos = &mut positions[0]; + // If self is empty and the entire input remains, just swap. + if self.len() == 0 && *pos == 0 && other.len() <= *fuel { + std::mem::swap(self, other); + *fuel -= self.len(); + return; + } + let borrowed = other.container.borrow(); + let len = borrowed.len(); + let count = std::cmp::min(len - *pos, *fuel); + if count > 0 { + self.container.extend_from_self(borrowed, *pos .. *pos + count); + *pos += count; + *fuel -= count; + } + }, + 2 => { + use columnar::{Borrow, Container, Index, Len, Push}; + use std::cmp::Ordering; + + let borrowed1 = others[0].container.borrow(); + let borrowed2 = others[1].container.borrow(); + let len1 = borrowed1.len(); + let len2 = borrowed2.len(); + + let mut diff_stash: R; + + while positions[0] < len1 && positions[1] < len2 && *fuel > 0 { + let (d1, t1, _r1) = borrowed1.get(positions[0]); + let (d2, t2, _r2) = borrowed2.get(positions[1]); + match (&d1, &t1).cmp(&(&d2, &t2)) { + Ordering::Less => { + // Scan for the end of the run from side 0. + let run_start = positions[0]; + positions[0] += 1; + while positions[0] < len1 && *fuel > (positions[0] - run_start) { + let (d1, t1, _) = borrowed1.get(positions[0]); + let (d2, t2, _) = borrowed2.get(positions[1]); + if (&d1, &t1) < (&d2, &t2) { + positions[0] += 1; + } else { + break; + } + } + let count = positions[0] - run_start; + self.container.extend_from_self(borrowed1, run_start .. positions[0]); + *fuel -= count; + } + Ordering::Greater => { + // Scan for the end of the run from side 1. + let run_start = positions[1]; + positions[1] += 1; + while positions[1] < len2 && *fuel > (positions[1] - run_start) { + let (d1, t1, _) = borrowed1.get(positions[0]); + let (d2, t2, _) = borrowed2.get(positions[1]); + if (&d2, &t2) < (&d1, &t1) { + positions[1] += 1; + } else { + break; + } + } + let count = positions[1] - run_start; + self.container.extend_from_self(borrowed2, run_start .. positions[1]); + *fuel -= count; + } + Ordering::Equal => { + let (_, _, r1) = borrowed1.get(positions[0]); + let (_, _, r2) = borrowed2.get(positions[1]); + diff_stash = R::into_owned(r1); + let r2_owned = R::into_owned(r2); + diff_stash.plus_equals(&r2_owned); + if !diff_stash.is_zero() { + self.container.0.push(d1); + self.container.1.push(t1); + self.container.2.push(&diff_stash as &R); + *fuel -= 1; + } + positions[0] += 1; + positions[1] += 1; + } + } + } + }, + n => unimplemented!("{n}-way merge not yet supported"), + } + } + } + + /// A `Merger` using internal iteration for `ColContainer`. + pub type ColumnarInternalMerger = super::InternalMerger>; } } diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 551d471a6..c4215e2c0 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -14,7 +14,7 @@ use crate::containers::{TimelyStack, ColContainer}; use crate::trace::implementations::chunker::{ColumnationChunker, ColumnarChunker, ColumnarColChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; -use crate::trace::implementations::merge_batcher::container::col_container::ColumnarMerger; +use crate::trace::implementations::merge_batcher::container::col_container::{ColumnarMerger, ColumnarInternalMerger}; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Layout, Vector, TStack}; @@ -69,6 +69,11 @@ pub type ColumnarColKeyBatcher = MergeBatcher = MergeBatcher, ColumnarColChunker<(K,V), T, R>, ColumnarMerger<(K,V), T, R>>; +/// A batcher using internal iteration for merging (key variant). +pub type ColumnarInternalKeyBatcher = MergeBatcher, ColumnarColChunker<(K,()), T, R>, ColumnarInternalMerger<(K,()), T, R>>; +/// A batcher using internal iteration for merging (key-value variant). +pub type ColumnarInternalValBatcher = MergeBatcher, ColumnarColChunker<(K,V), T, R>, ColumnarInternalMerger<(K,V), T, R>>; + /// A builder that accepts `ColContainer` chunks and converts them to `Vec` for an inner builder. pub struct ColToVecBuilder { inner: B, From e34449584ca3f55aad0c7883eef0207dc9492667 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Mar 2026 04:25:22 -0400 Subject: [PATCH 5/5] Clean-up and premature optimization --- .../trace/implementations/merge_batcher.rs | 40 ++++++++++++++++ .../src/trace/implementations/ord_neu.rs | 48 ------------------- 2 files changed, 40 insertions(+), 48 deletions(-) diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 2a567a0c7..cb5b5a7e5 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -945,6 +945,46 @@ pub mod container { use columnar::{Borrow, Container, Index, Len, Push}; use std::cmp::Ordering; + // Check if one side's remainder is entirely before the other's. + // If so, we can bulk-copy it, or swap it in when possible. + { + let b1 = others[0].container.borrow(); + let b2 = others[1].container.borrow(); + let len1 = b1.len(); + let len2 = b2.len(); + if positions[0] < len1 && positions[1] < len2 { + let (d1_last, t1_last, _) = b1.get(len1 - 1); + let (d2_first, t2_first, _) = b2.get(positions[1]); + if (&d1_last, &t1_last) < (&d2_first, &t2_first) { + // Side 0's remainder is entirely before side 1. + let count = std::cmp::min(len1 - positions[0], *fuel); + if self.len() == 0 && positions[0] == 0 && count == len1 { + // Take the whole container. + std::mem::swap(self, &mut others[0]); + } else { + self.container.extend_from_self(b1, positions[0] .. positions[0] + count); + } + positions[0] += count; + *fuel -= count; + return; + } + let (d1_first, t1_first, _) = b1.get(positions[0]); + let (d2_last, t2_last, _) = b2.get(len2 - 1); + if (&d2_last, &t2_last) < (&d1_first, &t1_first) { + // Side 1's remainder is entirely before side 0. + let count = std::cmp::min(len2 - positions[1], *fuel); + if self.len() == 0 && positions[1] == 0 && count == len2 { + std::mem::swap(self, &mut others[1]); + } else { + self.container.extend_from_self(b2, positions[1] .. positions[1] + count); + } + positions[1] += count; + *fuel -= count; + return; + } + } + } + let borrowed1 = others[0].container.borrow(); let borrowed2 = others[1].container.borrow(); let len1 = borrowed1.len(); diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index c4215e2c0..3987d575e 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -74,54 +74,6 @@ pub type ColumnarInternalKeyBatcher = MergeBatcher = MergeBatcher, ColumnarColChunker<(K,V), T, R>, ColumnarInternalMerger<(K,V), T, R>>; -/// A builder that accepts `ColContainer` chunks and converts them to `Vec` for an inner builder. -pub struct ColToVecBuilder { - inner: B, -} - -impl crate::trace::Builder for ColToVecBuilder -where - D: columnar::Columnar + Clone + 'static, - T: columnar::Columnar + Clone + timely::progress::Timestamp, - R: columnar::Columnar + Clone + 'static, - B: crate::trace::Builder, Time = T>, -{ - type Input = ColContainer<(D, T, R)>; - type Time = T; - type Output = B::Output; - - fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { - ColToVecBuilder { inner: B::with_capacity(keys, vals, upds) } - } - fn push(&mut self, chunk: &mut Self::Input) { - use columnar::{Borrow, Index, Len}; - let borrowed = chunk.container.borrow(); - let mut vec = Vec::with_capacity(borrowed.len()); - for i in 0..borrowed.len() { - let (d, t, r) = borrowed.get(i); - vec.push((D::into_owned(d), T::into_owned(t), R::into_owned(r))); - } - self.inner.push(&mut vec); - } - fn done(self, description: crate::trace::Description) -> Self::Output { - self.inner.done(description) - } - fn seal(chain: &mut Vec, description: crate::trace::Description) -> Self::Output { - use columnar::{Borrow, Index, Len}; - let mut vec_chain: Vec> = chain.drain(..).map(|col| { - let borrowed = col.container.borrow(); - (0..borrowed.len()).map(|i| { - let (d, t, r) = borrowed.get(i); - (D::into_owned(d), T::into_owned(t), R::into_owned(r)) - }).collect() - }).collect(); - B::seal(&mut vec_chain, description) - } -} - -/// Builder alias: wraps `RcOrdKeyBuilder` to accept `ColContainer` chunks. -pub type RcOrdKeyBuilderFromCol = ColToVecBuilder>; - /// A spine backed by columnar containers throughout. pub type ColumnarKeySpine = Spine>>>; /// A spine backed by columnar containers throughout (key-value variant).