Skip to content

Commit b685d5e

Browse files
committed
Back out of MzTupleRegion
Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 3fd02e2 commit b685d5e

8 files changed

Lines changed: 233 additions & 327 deletions

File tree

src/compute/src/extensions/arrange.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ mod flatcontainer {
385385
use differential_dataflow::lattice::Lattice;
386386
use differential_dataflow::operators::arrange::Arranged;
387387
use differential_dataflow::trace::TraceReader;
388-
use mz_ore::flatcontainer::{MzRegion, MzRegionPreference};
388+
use mz_ore::flatcontainer::{MzIndex, MzRegion, MzRegionPreference};
389389
use timely::container::flatcontainer::{IntoOwned, Region};
390390
use timely::dataflow::Scope;
391391
use timely::progress::Timestamp;
@@ -399,10 +399,10 @@ mod flatcontainer {
399399
Self: Clone,
400400
G: Scope<Timestamp = T::Owned>,
401401
G::Timestamp: Lattice + Ord + MzRegionPreference,
402-
K: MzRegion,
403-
V: MzRegion,
404-
T: MzRegion,
405-
R: MzRegion,
402+
K: MzRegion<Index = MzIndex>,
403+
V: MzRegion<Index = MzIndex>,
404+
T: MzRegion<Index = MzIndex>,
405+
R: MzRegion<Index = MzIndex>,
406406
K::Owned: Clone + Ord,
407407
V::Owned: Clone + Ord,
408408
T::Owned: Lattice + for<'a> PartialOrder<<T as Region>::ReadItem<'a>> + Timestamp,

src/compute/src/logging/differential.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pub(super) fn construct<A: Allocate>(
136136
)
137137
.as_collection(move |op, ()| {
138138
packer.pack_slice(&[
139-
Datum::UInt64(u64::cast_from(*op)),
139+
Datum::UInt64(u64::cast_from(op)),
140140
Datum::UInt64(u64::cast_from(worker_id)),
141141
])
142142
})

src/compute/src/logging/initialize.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@ use std::collections::BTreeMap;
1010
use std::rc::Rc;
1111
use std::time::{Duration, Instant};
1212

13+
use crate::arrangement::manager::TraceBundle;
14+
use crate::extensions::arrange::{KeyCollection, MzArrange};
15+
use crate::logging::compute::ComputeEvent;
16+
use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
1317
use differential_dataflow::dynamic::pointstamp::PointStamp;
1418
use differential_dataflow::logging::DifferentialEvent;
1519
use differential_dataflow::Collection;
1620
use mz_compute_client::logging::{LogVariant, LoggingConfig};
17-
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
21+
use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion};
1822
use mz_repr::{Diff, Timestamp};
1923
use mz_storage_operators::persist_source::Subtime;
2024
use mz_storage_types::errors::DataflowError;
@@ -27,11 +31,6 @@ use timely::logging::{Logger, ProgressEventTimestamp, TimelyEvent, WorkerIdentif
2731
use timely::order::Product;
2832
use timely::progress::reachability::logging::TrackerEvent;
2933

30-
use crate::arrangement::manager::TraceBundle;
31-
use crate::extensions::arrange::{KeyCollection, MzArrange};
32-
use crate::logging::compute::ComputeEvent;
33-
use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
34-
3534
/// Initialize logging dataflows.
3635
///
3736
/// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for
@@ -87,11 +86,13 @@ type ReachabilityEventRegionPreference = (
8786
OwnedRegionOpinion<Vec<usize>>,
8887
OwnedRegionOpinion<Vec<(usize, usize, bool, Option<Timestamp>, Diff)>>,
8988
);
90-
pub(super) type ReachabilityEventRegion = <(
91-
Duration,
92-
WorkerIdentifier,
93-
ReachabilityEventRegionPreference,
94-
) as MzRegionPreference>::Region;
89+
pub(super) type ReachabilityEventRegion = ItemRegion<
90+
<(
91+
Duration,
92+
WorkerIdentifier,
93+
ReachabilityEventRegionPreference,
94+
) as MzRegionPreference>::Region,
95+
>;
9596

9697
struct LoggingContext<'a, A: Allocate> {
9798
worker: &'a mut timely::worker::Worker<A>,
@@ -100,7 +101,7 @@ struct LoggingContext<'a, A: Allocate> {
100101
now: Instant,
101102
start_offset: Duration,
102103
t_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, TimelyEvent)>>,
103-
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
104+
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzIndexOptimized>>,
104105
d_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, DifferentialEvent)>>,
105106
c_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, ComputeEvent)>>,
106107
shared_state: Rc<RefCell<SharedLoggingState>>,
@@ -189,7 +190,7 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
189190
fn reachability_logger(&self) -> Logger<TrackerEvent> {
190191
let event_queue = self.r_event_queue.clone();
191192
type CB = PreallocatingCapacityContainerBuilder<
192-
FlatStack<ReachabilityEventRegion, MzOffsetOptimized>,
193+
FlatStack<ReachabilityEventRegion, MzIndexOptimized>,
193194
>;
194195
let mut logger = BatchLogger::<CB, _>::new(event_queue.link, self.interval_ms);
195196
Logger::new(

src/compute/src/logging/reachability.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::rc::Rc;
1616
use mz_compute_client::logging::LoggingConfig;
1717
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
1818
use mz_ore::cast::CastFrom;
19-
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
19+
use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion};
2020
use mz_ore::iter::IteratorExt;
2121
use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp};
2222
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
@@ -39,7 +39,7 @@ use crate::typedefs::{FlatKeyValSpineDefault, RowRowSpine};
3939
pub(super) fn construct<A: Allocate>(
4040
worker: &mut timely::worker::Worker<A>,
4141
config: &LoggingConfig,
42-
event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
42+
event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzIndexOptimized>>,
4343
) -> BTreeMap<LogVariant, LogCollection> {
4444
let interval_ms = std::cmp::max(1, config.interval.as_millis());
4545
let worker_index = worker.index();
@@ -55,10 +55,10 @@ pub(super) fn construct<A: Allocate>(
5555
usize,
5656
Option<Timestamp>,
5757
);
58-
type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region;
58+
type UpdatesRegion =
59+
ItemRegion<<((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region>;
5960

60-
type CB =
61-
PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion, MzOffsetOptimized>>;
61+
type CB = PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion, MzIndexOptimized>>;
6262
let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>(
6363
scope,
6464
"reachability logs",
@@ -103,7 +103,7 @@ pub(super) fn construct<A: Allocate>(
103103
);
104104

105105
let updates =
106-
updates.as_collection(move |(&update_type, addr, &source, &port, ts), _| {
106+
updates.as_collection(move |(update_type, addr, source, port, ts), _| {
107107
let row_arena = RowArena::default();
108108
let update_type = if update_type { "source" } else { "target" };
109109
let binding = SharedRow::get();
@@ -119,7 +119,7 @@ pub(super) fn construct<A: Allocate>(
119119
Datum::UInt64(u64::cast_from(port)),
120120
Datum::UInt64(u64::cast_from(worker_index)),
121121
Datum::String(update_type),
122-
Datum::from(ts.copied()),
122+
Datum::from(ts),
123123
];
124124
row_builder.packer().extend(key.iter().map(|k| datums[*k]));
125125
let key_row = row_builder.clone();

src/compute/src/logging/timely.rs

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@ use std::time::Duration;
1616

1717
use mz_compute_client::logging::LoggingConfig;
1818
use mz_ore::cast::CastFrom;
19-
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
20-
use mz_ore::region::LgAllocVec;
19+
use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion};
2120
use mz_repr::{Datum, Diff, Timestamp};
2221
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
2322
use mz_timely_util::replay::MzReplay;
2423
use serde::{Deserialize, Serialize};
2524
use timely::communication::Allocate;
26-
use timely::container::flatcontainer::FlatStack;
25+
use timely::container::flatcontainer::{FlatStack, IntoOwned, MirrorRegion};
2726
use timely::container::CapacityContainerBuilder;
2827
use timely::dataflow::channels::pact::Pipeline;
2928
use timely::dataflow::channels::pushers::buffer::Session;
@@ -158,7 +157,7 @@ pub(super) fn construct<A: Allocate>(
158157
)
159158
.as_collection(move |id, name| {
160159
packer.pack_slice(&[
161-
Datum::UInt64(u64::cast_from(*id)),
160+
Datum::UInt64(u64::cast_from(id)),
162161
Datum::UInt64(u64::cast_from(worker_id)),
163162
Datum::String(name),
164163
])
@@ -191,7 +190,7 @@ pub(super) fn construct<A: Allocate>(
191190
.as_collection({
192191
move |id, address| {
193192
packer.pack_by_index(|packer, index| match index {
194-
0 => packer.push(Datum::UInt64(u64::cast_from(*id))),
193+
0 => packer.push(Datum::UInt64(u64::cast_from(id))),
195194
1 => packer.push(Datum::UInt64(u64::cast_from(worker_id))),
196195
2 => packer
197196
.push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)))),
@@ -272,7 +271,7 @@ pub(super) fn construct<A: Allocate>(
272271
)
273272
.as_collection(move |operator, _| {
274273
packer.pack_slice(&[
275-
Datum::UInt64(u64::cast_from(*operator)),
274+
Datum::UInt64(u64::cast_from(operator)),
276275
Datum::UInt64(u64::cast_from(worker_id)),
277276
])
278277
});
@@ -362,7 +361,7 @@ struct MessageCount {
362361
}
363362

364363
type FlatStackFor<D> =
365-
FlatStack<<(D, Timestamp, Diff) as MzRegionPreference>::Region, MzOffsetOptimized>;
364+
FlatStack<ItemRegion<<(D, Timestamp, Diff) as MzRegionPreference>::Region>, MzIndexOptimized>;
366365

367366
type Pusher<D> = Counter<Timestamp, FlatStackFor<D>, Tee<Timestamp, FlatStackFor<D>>>;
368367
type OutputSession<'a, D> =
@@ -395,7 +394,23 @@ struct ChannelDatum {
395394

396395
impl MzRegionPreference for ChannelDatum {
397396
type Owned = Self;
398-
type Region = LgAllocVec<Self>;
397+
type Region = MirrorRegion<Self>;
398+
}
399+
400+
impl<'a> IntoOwned<'a> for ChannelDatum {
401+
type Owned = Self;
402+
403+
fn into_owned(self) -> Self::Owned {
404+
self
405+
}
406+
407+
fn clone_onto(self, other: &mut Self::Owned) {
408+
*other = self;
409+
}
410+
411+
fn borrow_as(owned: &'a Self::Owned) -> Self {
412+
*owned
413+
}
399414
}
400415

401416
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
@@ -406,7 +421,23 @@ struct ParkDatum {
406421

407422
impl MzRegionPreference for ParkDatum {
408423
type Owned = Self;
409-
type Region = LgAllocVec<Self>;
424+
type Region = MirrorRegion<Self>;
425+
}
426+
427+
impl<'a> IntoOwned<'a> for ParkDatum {
428+
type Owned = Self;
429+
430+
fn into_owned(self) -> Self::Owned {
431+
self
432+
}
433+
434+
fn clone_onto(self, other: &mut Self::Owned) {
435+
*other = self;
436+
}
437+
438+
fn borrow_as(owned: &'a Self::Owned) -> Self {
439+
*owned
440+
}
410441
}
411442

412443
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
@@ -417,7 +448,23 @@ struct MessageDatum {
417448

418449
impl MzRegionPreference for MessageDatum {
419450
type Owned = Self;
420-
type Region = LgAllocVec<Self>;
451+
type Region = MirrorRegion<Self>;
452+
}
453+
454+
impl<'a> IntoOwned<'a> for MessageDatum {
455+
type Owned = Self;
456+
457+
fn into_owned(self) -> Self::Owned {
458+
self
459+
}
460+
461+
fn clone_onto(self, other: &mut Self::Owned) {
462+
*other = self;
463+
}
464+
465+
fn borrow_as(owned: &'a Self::Owned) -> Self {
466+
*owned
467+
}
421468
}
422469

423470
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
@@ -428,7 +475,23 @@ struct ScheduleHistogramDatum {
428475

429476
impl MzRegionPreference for ScheduleHistogramDatum {
430477
type Owned = Self;
431-
type Region = LgAllocVec<Self>;
478+
type Region = MirrorRegion<Self>;
479+
}
480+
481+
impl<'a> IntoOwned<'a> for ScheduleHistogramDatum {
482+
type Owned = Self;
483+
484+
fn into_owned(self) -> Self::Owned {
485+
self
486+
}
487+
488+
fn clone_onto(self, other: &mut Self::Owned) {
489+
*other = self;
490+
}
491+
492+
fn borrow_as(owned: &'a Self::Owned) -> Self {
493+
*owned
494+
}
432495
}
433496

434497
/// Event handler of the demux operator.

0 commit comments

Comments
 (0)