From a8662ce6881e1ddd2ab9bc4e09034b838082854c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 31 May 2024 09:08:54 -0400 Subject: [PATCH 1/2] Add PushInto constraint --- .../implementations/huffman_container.rs | 32 +++++++++++++++++++ src/trace/implementations/mod.rs | 2 +- src/trace/implementations/rhh.rs | 2 +- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index a58d6ba2c..7780d3227 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -49,6 +49,38 @@ impl PushInto> for HuffmanContainer { } } +impl<'a, B: Ord + Clone + 'static> PushInto> for HuffmanContainer { + fn push_into(&mut self, item: Wrapped<'a, B>) { + match item.decode() { + Ok(decoded) => { + for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; } + + }, + Err(symbols) => { + for x in symbols.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } + } + } + match (item.decode(), &mut self.inner) { + (Ok(decoded), Ok((huffman, bytes))) => { + bytes.extend(huffman.encode(decoded)); + self.offsets.push(bytes.len()); + } + (Ok(decoded), Err(raw)) => { + raw.extend(decoded.cloned()); + self.offsets.push(raw.len()); + } + (Err(symbols), Ok((huffman, bytes))) => { + bytes.extend(huffman.encode(symbols.iter())); + self.offsets.push(bytes.len()); + } + (Err(symbols), Err(raw)) => { + raw.extend(symbols.iter().cloned()); + self.offsets.push(raw.len()); + } + } + } +} + impl BatchContainer for HuffmanContainer { type Owned = Vec; type ReadItem<'a> = Wrapped<'a, B>; diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index feed2ff81..bb8dfc2b6 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -510,7 +510,7 @@ pub mod containers { use crate::trace::IntoOwned; /// A general-purpose container resembling `Vec`. - pub trait BatchContainer: 'static { + pub trait BatchContainer: for<'a> PushInto> + 'static { /// An owned instance of `Self::ReadItem<'_>`. type Owned; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index a2fb6241a..4069bacaa 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -191,7 +191,7 @@ mod val_batch { while self.keys.len() < desired { // We insert a default (dummy) key and repeat the offset to indicate this. let current_offset = self.keys_offs.index(self.keys.len()); - self.keys.push(Default::default()); + self.keys.push(<::Key as Default>::default()); self.keys_offs.copy(current_offset); } From b4668eb5bab8e4124e12177b22d6d25b23a67d9c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 31 May 2024 09:16:43 -0400 Subject: [PATCH 2/2] Remove BatchContainer::{copy, copy_range} --- .../implementations/huffman_container.rs | 34 ----------- src/trace/implementations/mod.rs | 56 ++--------------- src/trace/implementations/ord_neu.rs | 60 +++++++++---------- src/trace/implementations/rhh.rs | 44 +++++++------- 4 files changed, 57 insertions(+), 137 deletions(-) diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index 7780d3227..288ba8d4c 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -87,40 +87,6 @@ impl BatchContainer for HuffmanContainer { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - fn copy(&mut self, item: Self::ReadItem<'_>) { - match item.decode() { - Ok(decoded) => { - for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; } - - }, - Err(symbols) => { - for x in symbols.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } - } - } - match (item.decode(), &mut self.inner) { - (Ok(decoded), Ok((huffman, bytes))) => { - bytes.extend(huffman.encode(decoded)); - self.offsets.push(bytes.len()); - } - (Ok(decoded), Err(raw)) => { - raw.extend(decoded.cloned()); - self.offsets.push(raw.len()); - } - (Err(symbols), Ok((huffman, bytes))) => { - bytes.extend(huffman.encode(symbols.iter())); - self.offsets.push(bytes.len()); - } - (Err(symbols), Err(raw)) => { - raw.extend(symbols.iter().cloned()); - self.offsets.push(raw.len()); - } - } - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } fn with_capacity(size: usize) -> Self { let mut offsets = OffsetList::with_capacity(size + 1); offsets.push(0); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index bb8dfc2b6..e8d068387 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -300,16 +300,6 @@ impl BatchContainer for OffsetList { type Owned = usize; type ReadItem<'a> = usize; - fn copy(&mut self, item: Self::ReadItem<'_>) { - self.push(item); - } - - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for offset in start..end { - self.push(other.index(offset)); - } - } - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn with_capacity(size: usize) -> Self { @@ -521,14 +511,6 @@ pub mod containers { fn push(&mut self, item: D) where Self: PushInto { self.push_into(item); } - /// Inserts a borrowed item. - fn copy(&mut self, item: Self::ReadItem<'_>); - /// Extends from a range of items in another`Self`. - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } /// Creates a new container with sufficient capacity. fn with_capacity(size: usize) -> Self; /// Creates a new container with sufficient capacity. @@ -606,12 +588,6 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - fn copy(&mut self, item: &T) { - self.push(item.clone()); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - self.extend_from_slice(&other[start .. end]); - } fn with_capacity(size: usize) -> Self { Vec::with_capacity(size) } @@ -634,16 +610,6 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - fn copy(&mut self, item: &T) { - self.copy(item); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - let slice = &other[start .. end]; - self.reserve_items(slice.iter()); - for item in slice.iter() { - self.copy(item); - } - } fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -672,10 +638,6 @@ pub mod containers { type Owned = R::Owned; type ReadItem<'a> = R::ReadItem<'a>; - fn copy(&mut self, item: Self::ReadItem<'_>) { - self.copy(item); - } - fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -711,13 +673,16 @@ pub mod containers { impl PushInto<&[B]> for SliceContainer { fn push_into(&mut self, item: &[B]) { - self.copy(item); + for x in item.iter() { + self.inner.push_into(x); + } + self.offsets.push(self.inner.len()); } } impl PushInto<&Vec> for SliceContainer { fn push_into(&mut self, item: &Vec) { - self.copy(item); + self.push_into(&item[..]); } } @@ -739,17 +704,6 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - fn copy(&mut self, item: Self::ReadItem<'_>) { - for x in item.iter() { - self.inner.copy(x); - } - self.offsets.push(self.inner.len()); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } fn with_capacity(size: usize) -> Self { let mut offsets = Vec::with_capacity(size + 1); offsets.push(0); diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 7009c57f3..e475a0b3d 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -236,9 +236,9 @@ mod val_batch { // Mark explicit types because type inference fails to resolve it. let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.copy(0); + keys_offs.push(0); let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.copy(0); + vals_offs.push(0); OrdValMerger { key_cursor1: 0, @@ -302,16 +302,16 @@ mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source.vals.index(lower)); + self.result.vals_offs.push(off); + self.result.vals.push(source.vals.index(lower)); } lower += 1; } // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { - self.result.keys.copy(source.keys.index(cursor)); - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys.push(source.keys.index(cursor)); + self.result.keys_offs.push(self.result.vals.len()); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -330,8 +330,8 @@ mod val_batch { let (lower1, upper1) = source1.values_for_key(self.key_cursor1); let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - self.result.keys.copy(source1.keys.index(self.key_cursor1)); - self.result.keys_offs.copy(off); + self.result.keys.push(source1.keys.index(self.key_cursor1)); + self.result.keys_offs.push(off); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -364,8 +364,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; }, @@ -373,8 +373,8 @@ mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; lower2 += 1; @@ -383,8 +383,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; }, @@ -394,16 +394,16 @@ mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; } while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; } @@ -616,16 +616,16 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); self.result.keys.push(key); @@ -636,10 +636,10 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { // Record the final offsets - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); OrdValBatch { updates: self.result.times.len() + self.singletons, storage: self.result, @@ -795,7 +795,7 @@ mod key_batch { }; let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.copy(0); + keys_offs.push(0); OrdKeyMerger { key_cursor1: 0, @@ -855,8 +855,8 @@ mod key_batch { fn copy_key(&mut self, source: &OrdKeyStorage, cursor: usize) { self.stash_updates_for_key(source, cursor); if let Some(off) = self.consolidate_updates() { - self.result.keys_offs.copy(off); - self.result.keys.copy(source.keys.index(cursor)); + self.result.keys_offs.push(off); + self.result.keys.push(source.keys.index(cursor)); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -875,8 +875,8 @@ mod key_batch { self.stash_updates_for_key(source1, self.key_cursor1); self.stash_updates_for_key(source2, self.key_cursor2); if let Some(off) = self.consolidate_updates() { - self.result.keys_offs.copy(off); - self.result.keys.copy(source1.keys.index(self.key_cursor1)); + self.result.keys_offs.push(off); + self.result.keys.push(source1.keys.index(self.key_cursor1)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -1078,7 +1078,7 @@ mod key_batch { self.push_update(time, diff); } else { // New key; complete representation of prior key. - self.result.keys_offs.copy(self.result.times.len()); + self.result.keys_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); @@ -1090,7 +1090,7 @@ mod key_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { // Record the final offsets - self.result.keys_offs.copy(self.result.times.len()); + self.result.keys_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } OrdKeyBatch { diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 4069bacaa..1d87d55d2 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -192,14 +192,14 @@ mod val_batch { // We insert a default (dummy) key and repeat the offset to indicate this. let current_offset = self.keys_offs.index(self.keys.len()); self.keys.push(<::Key as Default>::default()); - self.keys_offs.copy(current_offset); + self.keys_offs.push(current_offset); } // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.copy(key); + self.keys.push(key); if let Some(offset) = offset { - self.keys_offs.copy(offset); + self.keys_offs.push(offset); } self.key_count += 1; } @@ -368,9 +368,9 @@ mod val_batch { // Mark explicit types because type inference fails to resolve it. let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.copy(0); + keys_offs.push(0); let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.copy(0); + vals_offs.push(0); RhhValMerger { key_cursor1: 0, @@ -445,8 +445,8 @@ mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source.vals.index(lower)); + self.result.vals_offs.push(off); + self.result.vals.push(source.vals.index(lower)); } lower += 1; } @@ -506,8 +506,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; }, @@ -515,8 +515,8 @@ mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; lower2 += 1; @@ -525,8 +525,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; }, @@ -536,16 +536,16 @@ mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; } while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; } @@ -807,16 +807,16 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); // Insert the key, but with no specified offset. @@ -828,10 +828,10 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> RhhValBatch { // Record the final offsets - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); RhhValBatch { updates: self.result.times.len() + self.singletons, storage: self.result,