Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ members = [
# "advent_of_code_2017",
"dogsdogsdogs",
"experiments",
"interactive",
# "interactive", # TODO: Value enum is recursive, needs manual Columnar impl
"server",
"server/dataflows/degr_dist",
"server/dataflows/neighborhood",
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/examples/interpreted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn main() {

fn interpret<G>(edges: VecCollection<G, Edge>, relations: &[(usize, usize)]) -> VecCollection<G, Vec<Node>>
where
G: Scope<Timestamp: Lattice+Hash+Ord>,
G: Scope<Timestamp: Lattice+Hash+Ord+differential_dataflow::Data>,
{

// arrange the edge relation three ways.
Expand All @@ -65,15 +65,15 @@ where
// Both variables are bound, so this is a semijoin.
results
.map(move |vec| ((vec[src], vec[dst]), vec))
.join_core(as_self.clone(), |_key, vec, &()| Some(vec.clone()))
.join_core(as_self.clone(), |_key, vec, _| Some(columnar::Columnar::into_owned(vec)))
}
(true, false) => {
// Only `src` is bound, so we must use `forward` to propose `dst`.
field_present.insert(dst);
results
.map(move |vec| (vec[src], vec))
.join_core(forward.clone(), move |_src_val, vec, &dst_val| {
let mut temp = vec.clone();
let mut temp: Vec<Node> = columnar::Columnar::into_owned(vec);
while temp.len() <= dst { temp.push(0); }
temp[dst] = dst_val;
Some(temp)
Expand All @@ -85,7 +85,7 @@ where
results
.map(move |vec| (vec[dst], vec))
.join_core(reverse.clone(), move |_dst_val, vec, &src_val| {
let mut temp = vec.clone();
let mut temp: Vec<Node> = columnar::Columnar::into_owned(vec);
while temp.len() <= src { temp.push(0); }
temp[src] = src_val;
Some(temp)
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::lattice::Lattice;
/// Returns pairs (node, dist) indicating distance of each node from a root.
pub fn bfs<G, N>(edges: VecCollection<G, (N,N)>, roots: VecCollection<G, N>) -> VecCollection<G, (N,u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+crate::Data>,
N: ExchangeData+Hash,
{
let edges = edges.arrange_by_key();
Expand All @@ -26,7 +26,7 @@ pub fn bfs_arranged<G, N, Tr>(edges: Arranged<G, Tr>, roots: VecCollection<G, N>
where
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand All @@ -37,8 +37,8 @@ where
let edges = edges.enter(&scope);
let nodes = nodes.enter(&scope);

inner.join_core(edges, |_k,l,d| Some((d.clone(), l+1)))
inner.join_core(edges, |_k,l,d| Some((<N as columnar::Columnar>::into_owned(d), l+1)))
.concat(nodes)
.reduce(|_, s, t| t.push((*s[0].0, 1)))
.reduce(|_, s, t| t.push((columnar::Columnar::into_owned(s[0].0), 1)))
})
}
22 changes: 14 additions & 8 deletions differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::operators::iterate::Variable;
/// could be good insurance here.
pub fn bidijkstra<G, N>(edges: VecCollection<G, (N,N)>, goals: VecCollection<G, (N,N)>) -> VecCollection<G, ((N,N), u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+crate::Data>,
N: ExchangeData+Hash,
{
let forward = edges.clone().arrange_by_key();
Expand All @@ -41,7 +41,7 @@ pub fn bidijkstra_arranged<G, N, Tr>(
where
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down Expand Up @@ -72,8 +72,8 @@ where
let reached =
forward
.clone()
.join_map(reverse.clone(), |_, (src,d1), (dst,d2)| ((src.clone(), dst.clone()), *d1 + *d2))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.join_map(reverse.clone(), |_, &(ref src, d1), &(ref dst, d2)| ((src.clone(), dst.clone()), d1 + d2))
.reduce(|_key, s, t| t.push((<u32 as columnar::Columnar>::into_owned(s[0].0), 1)))
.semijoin(goals.clone());

let active =
Expand All @@ -92,10 +92,13 @@ where
.map(|(med, (src, dist))| (src, (med, dist)))
.semijoin(forward_active)
.map(|(src, (med, dist))| (med, (src, dist)))
.join_core(forward_edges, |_med, (src, dist), next| Some((next.clone(), (src.clone(), *dist+1))))
.join_core(forward_edges, |_med, srcdist, next| {
let (src, dist): (N, u32) = columnar::Columnar::into_owned(srcdist);
Some((<N as columnar::Columnar>::into_owned(next), (src, dist+1)))
})
.concat(forward)
.map(|(next, (src, dist))| ((next, src), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.reduce(|_key, s: &[(columnar::Ref<'_, u32>, isize)], t: &mut Vec<(u32, isize)>| t.push((<u32 as columnar::Columnar>::into_owned(s[0].0), 1)))
.map(|((next, src), dist)| (next, (src, dist)));

forward_next.clone().map(|_| ()).consolidate().inspect(|x| println!("forward_next: {:?}", x));
Expand All @@ -110,10 +113,13 @@ where
.map(|(med, (rev, dist))| (rev, (med, dist)))
.semijoin(reverse_active)
.map(|(rev, (med, dist))| (med, (rev, dist)))
.join_core(reverse_edges, |_med, (rev, dist), next| Some((next.clone(), (rev.clone(), *dist+1))))
.join_core(reverse_edges, |_med, revdist, next| {
let (rev, dist): (N, u32) = columnar::Columnar::into_owned(revdist);
Some((<N as columnar::Columnar>::into_owned(next), (rev, dist+1)))
})
.concat(reverse)
.map(|(next, (rev, dist))| ((next, rev), dist))
.reduce(|_key, s, t| t.push((*s[0].0, 1)))
.reduce(|_key, s: &[(columnar::Ref<'_, u32>, isize)], t: &mut Vec<(u32, isize)>| t.push((<u32 as columnar::Columnar>::into_owned(s[0].0), 1)))
.map(|((next,rev), dist)| (next, (rev, dist)));

reverse_next.clone().map(|_| ()).consolidate().inspect(|x| println!("reverse_next: {:?}", x));
Expand Down
12 changes: 6 additions & 6 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::difference::{Abelian, Multiply};
/// method to limit the introduction of labels.
pub fn propagate<G, N, L, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+crate::Data>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -32,7 +32,7 @@ where
/// method to limit the introduction of labels.
pub fn propagate_at<G, N, L, F, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+crate::Data>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -59,7 +59,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Time:Hash, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down Expand Up @@ -92,17 +92,17 @@ where
let labels =
proposals
.concat(nodes)
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((<L as columnar::Columnar>::into_owned(s[0].0), R::from(1_i8))));

let propagate: VecCollection<_, (N, L), R> =
labels
.clone()
.join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
.join_core(edges, |_k, l, d| Some((<N as columnar::Columnar>::into_owned(d), <L as columnar::Columnar>::into_owned(l))));

proposals_bind.set(propagate);

labels
.as_collection(|k,v| (k.clone(), v.clone()))
.as_collection(|k,v| (<N as columnar::Columnar>::into_owned(k), <L as columnar::Columnar>::into_owned(v)))
.leave()
})
}
8 changes: 4 additions & 4 deletions differential-dataflow/src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::propagate::propagate;
/// Iteratively removes nodes with no in-edges.
pub fn trim<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+crate::Data>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -35,7 +35,7 @@ where
/// Returns the subset of edges in the same strongly connected component.
pub fn strongly_connected<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+crate::Data>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -51,7 +51,7 @@ where
fn trim_edges<G, N, R>(cycle: VecCollection<G, (N,N), R>, edges: VecCollection<G, (N,N), R>)
-> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+crate::Data>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -66,7 +66,7 @@ where
let labels = propagate(cycle, nodes);

edges.join_map(labels.clone(), |e1,e2,l1| (e2.clone(),(e1.clone(),l1.clone())))
.join_map(labels, |e2,(e1,l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
.join_map(labels, |e2,&(ref e1, ref l1),l2| ((e1.clone(),e2.clone()),(l1.clone(),l2.clone())))
.filter(|(_,(l1,l2))| l1 == l2)
.map(|((x1,x2),_)| (x2,x1))
}
15 changes: 9 additions & 6 deletions differential-dataflow/src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::hashable::Hashable;

fn _color<G, N>(edges: VecCollection<G, (N,N)>) -> VecCollection<G,(N,Option<u32>)>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+crate::Data>,
N: ExchangeData+Hash,
{
// need some bogus initial values.
Expand Down Expand Up @@ -45,7 +45,7 @@ pub fn sequence<G, N, V, F>(
edges: VecCollection<G, (N,N)>,
logic: F) -> VecCollection<G, (N,Option<V>)>
where
G: Scope<Timestamp: Lattice+Hash+Ord>,
G: Scope<Timestamp: Lattice+Hash+Ord+crate::Data>,
N: ExchangeData+Hashable,
V: ExchangeData,
F: Fn(&N, &[(&V, isize)])->V+'static
Expand All @@ -66,12 +66,12 @@ where
let reverse = edges.filter(|edge| edge.0 > edge.1);

// new state goes along forward edges, old state along reverse edges
let new_messages = new_state.join_map(forward, |_k,v,d| (d.clone(),v.clone()));
let new_messages = new_state.join_map(forward, |_k,v,d| (d.clone(), v.clone()));

let incomplete = new_messages.clone().filter(|x| x.1.is_none()).map(|x| x.0).distinct();
let incomplete = new_messages.clone().filter(|x: &(N, Option<V>)| x.1.is_none()).map(|x: (N, Option<V>)| x.0).distinct();
let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));

let old_messages = old_state.join_map(reverse, |_k,v,d| (d.clone(),v.clone()));
let old_messages = old_state.join_map(reverse, |_k,v,d| (d.clone(), v.clone()));

let messages = new_messages.concat(old_messages).antijoin(incomplete.clone());

Expand All @@ -82,7 +82,10 @@ where
messages
// .concat(old_messages) // /-- possibly too clever: None if any inputs None.
// .antijoin(incomplete)
.reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
.reduce(move |k, vs, t| {
let vs_ref: Vec<(&V, isize)> = vs.iter().map(|(v, r)| (*v, *r)).collect();
t.push((Some(logic(k, &vs_ref)), 1))
})
.concat(incomplete.map(|x| (x, None)))
})
}
24 changes: 12 additions & 12 deletions differential-dataflow/src/algorithms/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

impl<G, D, R> Identifiers<G, D, R> for VecCollection<G, D, R>
where
G: Scope<Timestamp: Lattice>,
G: Scope<Timestamp: Lattice+crate::Data>,
D: ExchangeData + ::std::hash::Hash,
R: ExchangeData + Abelian,
{
Expand Down Expand Up @@ -64,19 +64,19 @@
.map(|pair| (pair.hashed(), pair))
.reduce(|_hash, input, output| {
// keep round-positive records as changes.
let ((round, record), count) = &input[0];
if *round > 0 {
let (&(round, ref record), ref count) = input[0];
if round > 0 {
let mut neg_count = count.clone();
neg_count.negate();
output.push(((0, record.clone()), neg_count));
output.push(((*round, record.clone()), count.clone()));
output.push(((0i32, record.clone()), neg_count));
output.push(((round, record.clone()), count.clone()));
}
// if any losers, increment their rounds.
for ((round, record), count) in input[1..].iter() {
for (&(round, ref record), ref count) in input[1..].iter() {
let mut neg_count = count.clone();
neg_count.negate();
output.push(((0, record.clone()), neg_count));
output.push(((*round+1, record.clone()), count.clone()));
output.push(((0i32, record.clone()), neg_count));
output.push(((round+1, record.clone()), count.clone()));
}
})
.map(|(_hash, pair)| pair)
Expand Down Expand Up @@ -119,13 +119,13 @@
// keep round-positive records as changes.
let ((round, record), count) = &input[0];
if *round > 0 {
output.push(((0, record.clone()), -*count));
output.push(((*round, record.clone()), *count));
output.push(((0, columnar::Columnar::into_owned(record.clone())), -*count));

Check failure on line 122 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`

Check failure on line 122 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.86

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`
output.push(((*round, columnar::Columnar::into_owned(record.clone())), *count));

Check failure on line 123 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`

Check failure on line 123 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.86

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`
}
// if any losers, increment their rounds.
for ((round, record), count) in input[1..].iter() {
output.push(((0, record.clone()), -*count));
output.push(((*round+1, record.clone()), *count));
output.push(((0, columnar::Columnar::into_owned(record.clone())), -*count));

Check failure on line 127 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`

Check failure on line 127 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.86

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`
output.push(((*round+1, columnar::Columnar::into_owned(record.clone())), *count));

Check failure on line 128 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`

Check failure on line 128 in differential-dataflow/src/algorithms/identifiers.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.86

type mismatch resolving `<Vec<i32> as Borrow>::Ref<'_> == i32`
}
})
.inspect(|x| println!("{:?}", x))
Expand Down
17 changes: 10 additions & 7 deletions differential-dataflow/src/algorithms/prefix_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait PrefixSum<G: Scope, K, D> {

impl<G, K, D> PrefixSum<G, K, D> for VecCollection<G, ((usize, K), D)>
where
G: Scope<Timestamp: Lattice>,
G: Scope<Timestamp: Lattice+crate::Data>,
K: ExchangeData + ::std::hash::Hash,
D: ExchangeData + ::std::hash::Hash,
{
Expand All @@ -42,7 +42,7 @@ where
/// Accumulate data in `collection` into all powers-of-two intervals containing them.
pub fn aggregate<G, K, D, F>(collection: VecCollection<G, ((usize, K), D)>, combine: F) -> VecCollection<G, ((usize, usize, K), D)>
where
G: Scope<Timestamp: Lattice>,
G: Scope<Timestamp: Lattice+crate::Data>,
K: ExchangeData + ::std::hash::Hash,
D: ExchangeData + ::std::hash::Hash,
F: Fn(&K,&D,&D)->D + 'static,
Expand All @@ -64,8 +64,10 @@ where
.filter(|&((_pos, log, _), _)| log < 64)
.map(|((pos, log, key), data)| ((pos >> 1, log + 1, key), (pos, data)))
.reduce(move |&(_pos, _log, ref key), input, output| {
let mut result = (input[0].0).1.clone();
if input.len() > 1 { result = combine(key, &result, &(input[1].0).1); }
let mut result: D = (input[0].0).1.clone();
if input.len() > 1 {
result = combine(key, &result, &(input[1].0).1);
}
output.push((result, 1));
})
.concat(unit_ranges)
Expand All @@ -79,7 +81,7 @@ pub fn broadcast<G, K, D, F>(
zero: D,
combine: F) -> VecCollection<G, ((usize, K), D)>
where
G: Scope<Timestamp: Lattice + Ord + ::std::fmt::Debug>,
G: Scope<Timestamp: Lattice + Ord + ::std::fmt::Debug + crate::Data>,
K: ExchangeData + ::std::hash::Hash,
D: ExchangeData + ::std::hash::Hash,
F: Fn(&K,&D,&D)->D + 'static,
Expand Down Expand Up @@ -146,8 +148,9 @@ where
used_ranges
.enter(&scope)
.map(|((pos, log, key), data)| ((pos << log, key), (log, data)))
.join_map(states, move |&(pos, ref key), &(log, ref data), state|
((pos + (1 << log), key.clone()), combine(key, state, data)))
.join_map(states, move |&(pos, ref key), &(log, ref data), state| {
((pos + (1 << log), key.clone()), combine(key, state, data))
})
.concat(init_states)
.distinct()
})
Expand Down
Loading
Loading