Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions differential-dataflow/examples/col-spines.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! Arrangement experiments starting from columnar containers.
//!
//! This example builds `ColContainer` inputs directly using a
//! `ColContainerBuilder` and `InputHandle`, then feeds them through
//! the arrangement pipeline via `arrange_core` with `ColumnarColKeyBatcher`.

use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::InputHandle;
use timely::dataflow::ProbeHandle;

use differential_dataflow::operators::arrange::arrangement::arrange_core;
use differential_dataflow::containers::ColContainerBuilder;

/// Runs the columnar arrangement experiment.
///
/// Expects two positional command-line arguments followed by any timely options:
///
/// * `keys`: the number of distinct keys; `10 * keys` is the record budget of each phase.
/// * `size`: the number of records introduced (across all workers) per round.
///
/// The program first loads data in rounds, then issues queries in rounds, waiting
/// after each round for the probe to catch up, and prints the elapsed time after
/// each phase and after shutdown.
///
/// # Panics
///
/// Panics if either argument is missing or is not an unsigned integer, or if
/// `size` is zero while `keys` is positive (the rounds would never make progress).
fn main() {

    type Update = ((String, ()), u64, isize);
    type Builder = ColContainerBuilder<Update>;

    const USAGE: &str = "usage: col-spines <keys> <size> [timely options]";

    let keys: usize = std::env::args()
        .nth(1)
        .expect(USAGE)
        .parse()
        .expect("<keys> must be an unsigned integer");
    let size: usize = std::env::args()
        .nth(2)
        .expect(USAGE)
        .parse()
        .expect("<size> must be an unsigned integer");

    // Each round advances its counter by `size`; with `size == 0` the loops below
    // would never reach `10 * keys` and the program would spin forever.
    if keys > 0 {
        assert!(size > 0, "<size> must be positive when <keys> is positive");
    }

    // `Instant` is `Copy`: the `move` closure below captures its own copy, so the
    // same timer can still be read after the computation has shut down.
    let timer = ::std::time::Instant::now();

    timely::execute_from_args(std::env::args(), move |worker| {

        let mut data_input = <InputHandle<_, Builder>>::new_with_builder();
        let mut keys_input = <InputHandle<_, Builder>>::new_with_builder();
        let mut probe = ProbeHandle::new();

        worker.dataflow::<u64, _, _>(|scope| {

            let data_stream = data_input.to_stream(scope);
            let keys_stream = keys_input.to_stream(scope);

            use timely::dataflow::channels::pact::Pipeline;
            use differential_dataflow::trace::implementations::ord_neu::{
                ColumnarColKeyBatcher, RcColumnarKeyBuilder, ColumnarKeySpine,
            };

            let data_arranged = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>(
                data_stream, Pipeline, "Data",
            );
            let keys_arranged = arrange_core::<_, _, ColumnarColKeyBatcher<_,_,_>, RcColumnarKeyBuilder<_,_,_>, ColumnarKeySpine<_,_,_>>(
                keys_stream, Pipeline, "Keys",
            );

            // The join produces no output records; it exists only to exercise the
            // arrangements, and the probe tracks its progress.
            keys_arranged
                .join_core(data_arranged, |_k, (), ()| Option::<()>::None)
                .probe_with(&mut probe);
        });

        let mut data_builder = Builder::default();
        let mut keys_builder = Builder::default();

        // Load up data in batches.
        // Note: `format_args!` avoids allocating a String; the columnar
        // `Strings` container accepts `fmt::Arguments` directly and writes
        // the formatted output straight into its byte buffer.
        let mut counter = 0;
        while counter < 10 * keys {
            let time = *data_input.time();
            // Each worker contributes the indices congruent to its own index
            // modulo the number of peers.
            let mut i = worker.index();
            while i < size {
                let val = (counter + i) % keys;
                data_builder.push_into(((format_args!("{:?}", val), ()), time, 1isize));
                i += worker.peers();
            }
            while let Some(container) = data_builder.finish() {
                data_input.send_batch(container);
            }
            counter += size;
            // Both inputs advance together so the probe can pass the new time.
            data_input.advance_to(data_input.time() + 1);
            keys_input.advance_to(keys_input.time() + 1);
            while probe.less_than(data_input.time()) {
                worker.step_or_park(None);
            }
        }
        println!("{:?}\tloading complete", timer.elapsed());

        let mut queries = 0;
        while queries < 10 * keys {
            // Queries are sent on `keys_input`, so they are stamped with that
            // handle's own time (kept in lockstep with `data_input` above and below).
            let time = *keys_input.time();
            let mut i = worker.index();
            while i < size {
                let val = (queries + i) % keys;
                keys_builder.push_into(((format_args!("{:?}", val), ()), time, 1isize));
                i += worker.peers();
            }
            while let Some(container) = keys_builder.finish() {
                keys_input.send_batch(container);
            }
            queries += size;
            data_input.advance_to(data_input.time() + 1);
            keys_input.advance_to(keys_input.time() + 1);
            while probe.less_than(data_input.time()) {
                worker.step_or_park(None);
            }
        }

        println!("{:?}\tqueries complete", timer.elapsed());

    }).unwrap();

    println!("{:?}\tshut down", timer.elapsed());

}
122 changes: 122 additions & 0 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,21 @@ pub mod vec {
}
}

pub use col::Collection as ColCollection;
/// Specializations of `Collection` that use columnar containers.
///
/// The parent module re-exports [`Collection`] from here under the name
/// `ColCollection`.
pub mod col {

use timely::dataflow::ScopeParent;

use crate::containers::ColContainer;

/// An evolving collection of `(D, T, R)` updates backed by columnar storage.
///
/// This type is analogous to [`super::vec::Collection`] but stores data in a
/// columnar layout via `<(D, G::Timestamp, R) as Columnar>::Container`.
///
/// Type parameters:
/// * `G`: the scope; its [`ScopeParent::Timestamp`] is the time component of each update.
/// * `D`: the data component of each update.
/// * `R`: the difference component of each update; defaults to `isize`.
pub type Collection<G, D, R = isize> = super::Collection<G, ColContainer<(D, <G as ScopeParent>::Timestamp, R)>>;
}

/// Conversion to a differential dataflow Collection.
pub trait AsCollection<G: Scope, C> {
/// Converts the type to a differential dataflow collection.
Expand Down Expand Up @@ -1358,6 +1373,113 @@ pub mod containers {
}
}

/// Implementations of container traits for the `ColContainer` container.
mod col {

use columnar::{Columnar, Borrow, Len, Index, Push};
use timely::progress::{Timestamp, timestamp::Refines};
use crate::collection::Abelian;
use crate::containers::ColContainer;

use super::{Negate, Enter, Leave, ResultsIn};

// For `(D, T, R): Columnar`, the container is `(D::Container, T::Container, R::Container)`.
// From `Columnar`, this container is `Borrow + Clear + Len + Clone + Default`.
// `Borrow::Borrowed<'a>` implements `Index<Ref = Borrow::Ref<'a>>` and `Len`.
// `Borrow::Ref<'a>` is `Copy` and equals `(Ref<'a, D>, Ref<'a, T>, Ref<'a, R>)`.
// The container also implements `Push<Ref<'a>>` and `Push<&'a (D, T, R)>`.

/// Negation for columnar update containers: every difference is negated while
/// data and times are carried over unchanged.
impl<D, T, R> Negate for ColContainer<(D, T, R)>
where
    D: Columnar,
    T: Columnar,
    R: Columnar + Abelian,
{
    /// Consumes the container and returns a freshly built one holding the same
    /// `(data, time)` pairs, in the same order, with each difference negated.
    fn negate(self) -> Self {
        let source = self.container.borrow();
        let mut negated = ColContainer::<(D, T, R)>::default();
        for (data, time, diff) in (0..source.len()).map(|index| source.get(index)) {
            // The difference is only available as a borrowed reference, so it is
            // materialized as an owned `R`, negated in place, and pushed by reference.
            let mut flipped = R::into_owned(diff);
            flipped.negate();
            negated.container.0.push(data);
            negated.container.1.push(time);
            negated.container.2.push(&flipped);
        }
        negated
    }
}

/// Scope entry for columnar update containers: each outer timestamp `T1` is
/// converted to the refining timestamp `T2`; data and differences are copied as-is.
impl<D, T1, T2, R> Enter<T1, T2> for ColContainer<(D, T1, R)>
where
    D: Columnar,
    T1: Columnar + Timestamp,
    T2: Columnar + Refines<T1>,
    R: Columnar,
    T2::Container: Push<T2>,
{
    type InnerContainer = ColContainer<(D, T2, R)>;

    /// Consumes the container and returns a new one with every time mapped
    /// through `T2::to_inner`, preserving the order of updates.
    fn enter(self) -> Self::InnerContainer {
        let outer = self.container.borrow();
        let mut inner = ColContainer::<(D, T2, R)>::default();
        for (data, time, diff) in (0..outer.len()).map(|index| outer.get(index)) {
            // Timestamp conversion works on owned values, so the borrowed time is
            // materialized before being refined.
            let owned_time = <T1 as Columnar>::into_owned(time);
            let inner_time = <T2 as Refines<T1>>::to_inner(owned_time);
            inner.container.0.push(data);
            inner.container.1.push(inner_time);
            inner.container.2.push(diff);
        }
        inner
    }
}

/// Scope exit for columnar update containers: each inner timestamp `T1` is
/// converted to the outer timestamp `T2` it refines; data and differences are
/// copied as-is.
impl<D, T1, T2, R> Leave<T1, T2> for ColContainer<(D, T1, R)>
where
    D: Columnar,
    T1: Columnar + Refines<T2>,
    T2: Columnar + Timestamp,
    R: Columnar,
    T2::Container: Push<T2>,
{
    type OuterContainer = ColContainer<(D, T2, R)>;

    /// Consumes the container and returns a new one with every time mapped
    /// through `to_outer`, preserving the order of updates.
    fn leave(self) -> Self::OuterContainer {
        let inner = self.container.borrow();
        let mut outer = ColContainer::<(D, T2, R)>::default();
        for (data, time, diff) in (0..inner.len()).map(|index| inner.get(index)) {
            // Timestamp conversion works on owned values, so the borrowed time is
            // materialized before being projected outward.
            let owned_time = <T1 as Columnar>::into_owned(time);
            let outer_time = <T1 as Refines<T2>>::to_outer(owned_time);
            outer.container.0.push(data);
            outer.container.1.push(outer_time);
            outer.container.2.push(diff);
        }
        outer
    }
}

/// Timestamp advancement for columnar update containers: each time is advanced
/// by a path summary, and updates whose time has no result are dropped.
impl<D, T, R> ResultsIn<T::Summary> for ColContainer<(D, T, R)>
where
    D: Columnar,
    T: Columnar + Timestamp,
    R: Columnar,
    T::Container: Push<T>,
{
    /// Consumes the container and returns a new one in which every retained
    /// update carries the time produced by applying `step` to its original time.
    ///
    /// Updates for which `step` yields `None` are omitted; the relative order of
    /// the remaining updates is preserved.
    fn results_in(self, step: &T::Summary) -> Self {
        use timely::progress::PathSummary;
        let source = self.container.borrow();
        let mut advanced = ColContainer::<(D, T, R)>::default();
        for (data, time, diff) in (0..source.len()).map(|index| source.get(index)) {
            // The summary operates on owned timestamps.
            let owned_time = <T as Columnar>::into_owned(time);
            match <T::Summary as PathSummary<T>>::results_in(step, &owned_time) {
                Some(new_time) => {
                    advanced.container.0.push(data);
                    advanced.container.1.push(new_time);
                    advanced.container.2.push(diff);
                }
                // No resulting time: the update is dropped.
                None => {}
            }
        }
        advanced
    }
}
}

/// Implementations of container traits for the `Rc` container.
mod rc {
use std::rc::Rc;
Expand Down
Loading
Loading