Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions src/trace/implementations/huffman_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,8 @@ impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
}
}

impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
type Owned = Vec<B>;
type ReadItem<'a> = Wrapped<'a, B>;

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: Self::ReadItem<'_>) {
impl<'a, B: Ord + Clone + 'static> PushInto<Wrapped<'a, B>> for HuffmanContainer<B> {
fn push_into(&mut self, item: Wrapped<'a, B>) {
match item.decode() {
Ok(decoded) => {
for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; }
Expand All @@ -74,21 +69,24 @@ impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
raw.extend(decoded.cloned());
self.offsets.push(raw.len());
}
(Err(symbols), Ok((huffman, bytes))) => {
(Err(symbols), Ok((huffman, bytes))) => {
bytes.extend(huffman.encode(symbols.iter()));
self.offsets.push(bytes.len());
}
(Err(symbols), Err(raw)) => {
(Err(symbols), Err(raw)) => {
raw.extend(symbols.iter().cloned());
self.offsets.push(raw.len());
}
}
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for index in start .. end {
self.copy(other.index(index));
}
}
}

impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
type Owned = Vec<B>;
type ReadItem<'a> = Wrapped<'a, B>;

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn with_capacity(size: usize) -> Self {
let mut offsets = OffsetList::with_capacity(size + 1);
offsets.push(0);
Expand Down
58 changes: 6 additions & 52 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,6 @@ impl BatchContainer for OffsetList {
type Owned = usize;
type ReadItem<'a> = usize;

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.push(item);
}

fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for offset in start..end {
self.push(other.index(offset));
}
}

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn with_capacity(size: usize) -> Self {
Expand Down Expand Up @@ -510,7 +500,7 @@ pub mod containers {
use crate::trace::IntoOwned;

/// A general-purpose container resembling `Vec<T>`.
pub trait BatchContainer: 'static {
pub trait BatchContainer: for<'a> PushInto<Self::ReadItem<'a>> + 'static {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in person: We could opt out of requiring BatchContainer to be PushInto<...> which would permit implementing containers that cannot be pushed at. For the moment, the change as implemented is probably the right thing to do, but in the future we could split the trait into ReadBatchContainer and BatchContainer where the former does not expose any functionality to modify the contents of the container. There's a similar idea in flatcontainer: antiguru/flatcontainer#27

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. I think this is a non-regression in the sense that `copy` also prevented a read-only container; the main change in this PR is that that logic must now go through `PushInto`, which may perhaps be harder to use for the implementation. But as long as it works for our current containers, great — and it seems easy to roll back at any point.

/// An owned instance of `Self::ReadItem<'_>`.
type Owned;

Expand All @@ -521,14 +511,6 @@ pub mod containers {
fn push<D>(&mut self, item: D) where Self: PushInto<D> {
self.push_into(item);
}
/// Inserts a borrowed item.
fn copy(&mut self, item: Self::ReadItem<'_>);
/// Extends from a range of items in another `Self`.
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for index in start .. end {
self.copy(other.index(index));
}
}
/// Creates a new container with sufficient capacity.
fn with_capacity(size: usize) -> Self;
/// Creates a new container with sufficient capacity.
Expand Down Expand Up @@ -606,12 +588,6 @@ pub mod containers {

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: &T) {
self.push(item.clone());
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
self.extend_from_slice(&other[start .. end]);
}
fn with_capacity(size: usize) -> Self {
Vec::with_capacity(size)
}
Expand All @@ -634,16 +610,6 @@ pub mod containers {

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: &T) {
self.copy(item);
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
let slice = &other[start .. end];
self.reserve_items(slice.iter());
for item in slice.iter() {
self.copy(item);
}
}
fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}
Expand Down Expand Up @@ -672,10 +638,6 @@ pub mod containers {
type Owned = R::Owned;
type ReadItem<'a> = R::ReadItem<'a>;

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.copy(item);
}

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}
Expand Down Expand Up @@ -711,13 +673,16 @@ pub mod containers {

impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
fn push_into(&mut self, item: &[B]) {
self.copy(item);
for x in item.iter() {
self.inner.push_into(x);
}
self.offsets.push(self.inner.len());
}
}

impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
fn push_into(&mut self, item: &Vec<B>) {
self.copy(item);
self.push_into(&item[..]);
}
}

Expand All @@ -739,17 +704,6 @@ pub mod containers {

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: Self::ReadItem<'_>) {
for x in item.iter() {
self.inner.copy(x);
}
self.offsets.push(self.inner.len());
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for index in start .. end {
self.copy(other.index(index));
}
}
fn with_capacity(size: usize) -> Self {
let mut offsets = Vec::with_capacity(size + 1);
offsets.push(0);
Expand Down
60 changes: 30 additions & 30 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ mod val_batch {

// Mark explicit types because type inference fails to resolve it.
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.copy(0);
keys_offs.push(0);
let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
vals_offs.copy(0);
vals_offs.push(0);

OrdValMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -302,16 +302,16 @@ mod val_batch {
while lower < upper {
self.stash_updates_for_val(source, lower);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source.vals.index(lower));
self.result.vals_offs.push(off);
self.result.vals.push(source.vals.index(lower));
}
lower += 1;
}

// If we have pushed any values, copy the key as well.
if self.result.vals.len() > init_vals {
self.result.keys.copy(source.keys.index(cursor));
self.result.keys_offs.copy(self.result.vals.len());
self.result.keys.push(source.keys.index(cursor));
self.result.keys_offs.push(self.result.vals.len());
}
}
/// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
Expand All @@ -330,8 +330,8 @@ mod val_batch {
let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
self.result.keys.copy(source1.keys.index(self.key_cursor1));
self.result.keys_offs.copy(off);
self.result.keys.push(source1.keys.index(self.key_cursor1));
self.result.keys_offs.push(off);
}
// Increment cursors in either case; the keys are merged.
self.key_cursor1 += 1;
Expand Down Expand Up @@ -364,17 +364,17 @@ mod val_batch {
// Extend stash by updates, with logical compaction applied.
self.stash_updates_for_val(source1, lower1);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source1.vals.index(lower1));
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
},
Ordering::Equal => {
self.stash_updates_for_val(source1, lower1);
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source1.vals.index(lower1));
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
lower2 += 1;
Expand All @@ -383,8 +383,8 @@ mod val_batch {
// Extend stash by updates, with logical compaction applied.
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source2.vals.index(lower2));
self.result.vals_offs.push(off);
self.result.vals.push(source2.vals.index(lower2));
}
lower2 += 1;
},
Expand All @@ -394,16 +394,16 @@ mod val_batch {
while lower1 < upper1 {
self.stash_updates_for_val(source1, lower1);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source1.vals.index(lower1));
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
}
while lower2 < upper2 {
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source2.vals.index(lower2));
self.result.vals_offs.push(off);
self.result.vals.push(source2.vals.index(lower2));
}
lower2 += 1;
}
Expand Down Expand Up @@ -616,16 +616,16 @@ mod val_batch {
self.push_update(time, diff);
} else {
// New value; complete representation of prior value.
self.result.vals_offs.copy(self.result.times.len());
self.result.vals_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
self.result.vals.push(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.copy(self.result.times.len());
self.result.vals_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.copy(self.result.vals.len());
self.result.keys_offs.push(self.result.vals.len());
self.push_update(time, diff);
self.result.vals.push(val);
self.result.keys.push(key);
Expand All @@ -636,10 +636,10 @@ mod val_batch {
#[inline(never)]
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdValBatch<L> {
// Record the final offsets
self.result.vals_offs.copy(self.result.times.len());
self.result.vals_offs.push(self.result.times.len());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.copy(self.result.vals.len());
self.result.keys_offs.push(self.result.vals.len());
OrdValBatch {
updates: self.result.times.len() + self.singletons,
storage: self.result,
Expand Down Expand Up @@ -795,7 +795,7 @@ mod key_batch {
};

let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.copy(0);
keys_offs.push(0);

OrdKeyMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -855,8 +855,8 @@ mod key_batch {
fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
self.stash_updates_for_key(source, cursor);
if let Some(off) = self.consolidate_updates() {
self.result.keys_offs.copy(off);
self.result.keys.copy(source.keys.index(cursor));
self.result.keys_offs.push(off);
self.result.keys.push(source.keys.index(cursor));
}
}
/// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
Expand All @@ -875,8 +875,8 @@ mod key_batch {
self.stash_updates_for_key(source1, self.key_cursor1);
self.stash_updates_for_key(source2, self.key_cursor2);
if let Some(off) = self.consolidate_updates() {
self.result.keys_offs.copy(off);
self.result.keys.copy(source1.keys.index(self.key_cursor1));
self.result.keys_offs.push(off);
self.result.keys.push(source1.keys.index(self.key_cursor1));
}
// Increment cursors in either case; the keys are merged.
self.key_cursor1 += 1;
Expand Down Expand Up @@ -1078,7 +1078,7 @@ mod key_batch {
self.push_update(time, diff);
} else {
// New key; complete representation of prior key.
self.result.keys_offs.copy(self.result.times.len());
self.result.keys_offs.push(self.result.times.len());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
Expand All @@ -1090,7 +1090,7 @@ mod key_batch {
#[inline(never)]
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdKeyBatch<L> {
// Record the final offsets
self.result.keys_offs.copy(self.result.times.len());
self.result.keys_offs.push(self.result.times.len());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
OrdKeyBatch {
Expand Down
Loading