diff --git a/Cargo.lock b/Cargo.lock index c85204fe57a34..1913b71fb9440 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1981,7 +1981,7 @@ dependencies = [ [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#7760a903c9c451f7cf039a3978994251bafd8721" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#bb5836a50e5233795cf3efcc042f8945f78b4350" dependencies = [ "abomonation", "abomonation_derive", @@ -2037,7 +2037,7 @@ checksum = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" [[package]] name = "dogsdogsdogs" version = "0.1.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#7760a903c9c451f7cf039a3978994251bafd8721" +source = "git+https://github.com/antiguru/differential-dataflow.git?branch=region_update#bb5836a50e5233795cf3efcc042f8945f78b4350" dependencies = [ "abomonation", "abomonation_derive", @@ -2417,9 +2417,8 @@ dependencies = [ [[package]] name = "flatcontainer" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcaca60d6093f2c5328fe97b9dfafa16a3968577bc5df75ebd6b23ea79b0a0a4" +version = "0.5.0" +source = "git+https://github.com/antiguru/flatcontainer.git#7dc86fadbbaecd6fa3c549e20e1c46b0e448c778" dependencies = [ "cfg-if", "paste", @@ -5438,6 +5437,7 @@ dependencies = [ "criterion", "ctor", "derivative", + "differential-dataflow", "either", "flatcontainer", "futures", @@ -5469,6 +5469,7 @@ dependencies = [ "serde_json", "smallvec", "stacker", + "timely", "tokio", "tokio-native-tls", "tokio-openssl", @@ -9590,7 +9591,7 @@ dependencies = [ [[package]] name = "timely" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" dependencies = [ 
"abomonation", "abomonation_derive", @@ -9607,12 +9608,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" [[package]] name = "timely_communication" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" dependencies = [ "abomonation", "abomonation_derive", @@ -9628,7 +9629,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" dependencies = [ "columnation", "flatcontainer", @@ -9638,7 +9639,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#0c26e5e4198085d6c90db11930f2dba52e9f32cc" +source = "git+https://github.com/antiguru/timely-dataflow.git?branch=flatcontainer_storage#63103bd5f7584fd77b3256580df229189cbcd314" [[package]] name = "tiny-keccak" diff --git a/Cargo.toml b/Cargo.toml index d2bdcceeec326..94359e72ecdba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -263,15 +263,24 @@ debug = 2 # tend to get rewritten or disappear (e.g., because a PR is force pushed or gets # merged), after which point it becomes impossible to build that historical # version of Materialize. +[patch."https://github.com/TimelyDataflow/timely-dataflow"] +# Projects that do not reliably release to crates.io. 
+timely = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_bytes = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_communication = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_container = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_logging = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } [patch.crates-io] # Projects that do not reliably release to crates.io. -timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" } -differential-dataflow = { git = "https://github.com/MaterializeInc/differential-dataflow.git" } -dogsdogsdogs = { git = "https://github.com/MaterializeInc/differential-dataflow.git" } +timely = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_bytes = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_communication = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_container = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +timely_logging = { git = "https://github.com/antiguru/timely-dataflow.git", branch = "flatcontainer_storage" } +differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "region_update" } +dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = 
"region_update" } + +flatcontainer = { git = "https://github.com/antiguru/flatcontainer.git" } # Waiting on https://github.com/sfackler/rust-postgres/pull/752. postgres = { git = "https://github.com/MaterializeInc/rust-postgres" } diff --git a/misc/cargo-vet/audits.toml b/misc/cargo-vet/audits.toml index 35bb6ba16ae2a..085ec34c39777 100644 --- a/misc/cargo-vet/audits.toml +++ b/misc/cargo-vet/audits.toml @@ -1,4 +1,3 @@ - # cargo-vet audits file [criteria.maintained-and-necessary] @@ -281,7 +280,7 @@ version = "23.5.26" [[audits.flatcontainer]] who = "Moritz Hoffmann " criteria = "safe-to-deploy" -version = "0.4.1" +version = "0.5.0" [[audits.fluent-uri]] who = "Nikhil Benesch " diff --git a/src/cluster/src/communication.rs b/src/cluster/src/communication.rs index 01298c9df4b49..8598878371794 100644 --- a/src/cluster/src/communication.rs +++ b/src/cluster/src/communication.rs @@ -36,6 +36,7 @@ use std::any::Any; use std::cmp::Ordering; use std::fmt::Display; +use std::sync::Arc; use std::time::Duration; use anyhow::Context; @@ -109,7 +110,7 @@ where } } - match initialize_networking_from_sockets(sockets, process, workers, Box::new(|_| None)) { + match initialize_networking_from_sockets(sockets, process, workers, Arc::new(|_| None)) { Ok((stuff, guard)) => { info!(process = process, "successfully initialized network"); Ok(( diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 2ae7ed265d57b..689d5ee29410a 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -30,7 +30,7 @@ mz-compute-types = { path = "../compute-types" } mz-dyncfg = { path = "../dyncfg" } mz-dyncfgs = { path = "../dyncfgs" } mz-expr = { path = "../expr" } -mz-ore = { path = "../ore", features = ["async", "flatcontainer", "process", "tracing_"] } +mz-ore = { path = "../ore", features = ["async", "differential", "flatcontainer", "process", "tracing_"] } mz-persist-client = { path = "../persist-client" } mz-persist-types = { path = "../persist-types" } mz-repr = { path = 
"../repr" } diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index cc63887c6cd67..22e46ab88ea5f 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -23,7 +23,7 @@ use timely::progress::Timestamp; use timely::Container; use crate::logging::compute::ComputeEvent; -use crate::typedefs::{KeyAgent, KeyValAgent, RowAgent, RowRowAgent, RowValAgent}; +use crate::typedefs::{KeyAgent, RowAgent, RowRowAgent, RowValAgent}; /// Extension trait to arrange data. pub trait MzArrange: MzArrangeCore @@ -270,36 +270,6 @@ where } } -impl ArrangementSize for Arranged> -where - G: Scope, - G::Timestamp: Lattice + Ord + Columnation, - K: Data + Columnation, - V: Data + Columnation, - T: Lattice + Timestamp, - R: Semigroup + Ord + Columnation + 'static, -{ - fn log_arrangement_size(self) -> Self { - log_arrangement_size_inner(self, |trace| { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let mut callback = |siz, cap| { - size += siz; - capacity += cap; - allocations += usize::from(cap > 0); - }; - trace.map_batches(|batch| { - batch.storage.keys.heap_size(&mut callback); - batch.storage.keys_offs.heap_size(&mut callback); - batch.storage.vals.heap_size(&mut callback); - batch.storage.vals_offs.heap_size(&mut callback); - batch.storage.times.heap_size(&mut callback); - batch.storage.diffs.heap_size(&mut callback); - }); - (size, capacity, allocations) - }) - } -} - impl ArrangementSize for Arranged> where G: Scope, @@ -415,8 +385,8 @@ mod flatcontainer { use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; - use mz_ore::flatcontainer::MzRegionPreference; - use timely::container::flatcontainer::{IntoOwned, Push, Region, ReserveItems}; + use mz_ore::flatcontainer::{MzIndex, MzRegion, MzRegionPreference}; + use timely::container::flatcontainer::{IntoOwned, Region}; use timely::dataflow::Scope; 
use timely::progress::Timestamp; use timely::PartialOrder; @@ -429,31 +399,10 @@ mod flatcontainer { Self: Clone, G: Scope, G::Timestamp: Lattice + Ord + MzRegionPreference, - K: Region - + Clone - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, - V: Region - + Clone - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, - T: Region - + Clone - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, - R: Region - + Clone - + Push<::Owned> - + for<'a> Push<&'a ::Owned> - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>> - + 'static, + K: MzRegion, + V: MzRegion, + T: MzRegion, + R: MzRegion, K::Owned: Clone + Ord, V::Owned: Clone + Ord, T::Owned: Lattice + for<'a> PartialOrder<::ReadItem<'a>> + Timestamp, diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index da9c28872fbce..a92612d35c9ae 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -130,13 +130,13 @@ pub(super) fn construct( let stream_to_collection = |input: Stream<_, ((usize, ()), Timestamp, Diff)>, log, name| { let mut packer = PermutedRowPacker::new(log); input - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, &format!("PreArrange Differential {name}"), ) .as_collection(move |op, ()| { packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(*op)), + Datum::UInt64(u64::cast_from(op)), Datum::UInt64(u64::cast_from(worker_id)), ]) }) diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 8a5a6ab0ccd4d..715d169c7a524 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -10,14 +10,19 @@ use std::collections::BTreeMap; use std::rc::Rc; use std::time::{Duration, Instant}; +use crate::arrangement::manager::TraceBundle; 
+use crate::extensions::arrange::{KeyCollection, MzArrange}; +use crate::logging::compute::ComputeEvent; +use crate::logging::{BatchLogger, EventQueue, SharedLoggingState}; use differential_dataflow::dynamic::pointstamp::PointStamp; use differential_dataflow::logging::DifferentialEvent; use differential_dataflow::Collection; use mz_compute_client::logging::{LogVariant, LoggingConfig}; -use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_repr::{Diff, Timestamp}; use mz_storage_operators::persist_source::Subtime; use mz_storage_types::errors::DataflowError; +use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::flatcontainer::FlatStack; @@ -26,11 +31,6 @@ use timely::logging::{Logger, ProgressEventTimestamp, TimelyEvent, WorkerIdentif use timely::order::Product; use timely::progress::reachability::logging::TrackerEvent; -use crate::arrangement::manager::TraceBundle; -use crate::extensions::arrange::{KeyCollection, MzArrange}; -use crate::logging::compute::ComputeEvent; -use crate::logging::{BatchLogger, EventQueue, SharedLoggingState}; - /// Initialize logging dataflows. 
/// /// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for @@ -86,11 +86,13 @@ type ReachabilityEventRegionPreference = ( OwnedRegionOpinion>, OwnedRegionOpinion, Diff)>>, ); -pub(super) type ReachabilityEventRegion = <( - Duration, - WorkerIdentifier, - ReachabilityEventRegionPreference, -) as MzRegionPreference>::Region; +pub(super) type ReachabilityEventRegion = ItemRegion< + <( + Duration, + WorkerIdentifier, + ReachabilityEventRegionPreference, + ) as MzRegionPreference>::Region, +>; struct LoggingContext<'a, A: Allocate> { worker: &'a mut timely::worker::Worker, @@ -99,7 +101,7 @@ struct LoggingContext<'a, A: Allocate> { now: Instant, start_offset: Duration, t_event_queue: EventQueue>, - r_event_queue: EventQueue>, + r_event_queue: EventQueue>, d_event_queue: EventQueue>, c_event_queue: EventQueue>, shared_state: Rc>, @@ -187,10 +189,10 @@ impl LoggingContext<'_, A> { fn reachability_logger(&self) -> Logger { let event_queue = self.r_event_queue.clone(); - let mut logger = BatchLogger::< - CapacityContainerBuilder>, - _, - >::new(event_queue.link, self.interval_ms); + type CB = PreallocatingCapacityContainerBuilder< + FlatStack, + >; + let mut logger = BatchLogger::::new(event_queue.link, self.interval_ms); Logger::new( self.now, self.start_offset, diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 67988eeea7361..76b83e7c8eeca 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -16,13 +16,13 @@ use std::rc::Rc; use mz_compute_client::logging::LoggingConfig; use mz_expr::{permutation_for_arrangement, MirScalarExpr}; use mz_ore::cast::CastFrom; -use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion}; use mz_ore::iter::IteratorExt; use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp}; +use 
mz_timely_util::containers::PreallocatingCapacityContainerBuilder; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; use timely::container::flatcontainer::FlatStack; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use crate::extensions::arrange::{MzArrange, MzArrangeCore}; @@ -39,7 +39,7 @@ use crate::typedefs::{FlatKeyValSpineDefault, RowRowSpine}; pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, - event_queue: EventQueue>, + event_queue: EventQueue>, ) -> BTreeMap { let interval_ms = std::cmp::max(1, config.interval.as_millis()); let worker_index = worker.index(); @@ -55,9 +55,10 @@ pub(super) fn construct( usize, Option, ); - type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region; + type UpdatesRegion = + ItemRegion<<((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region>; - type CB = CapacityContainerBuilder>; + type CB = PreallocatingCapacityContainerBuilder>; let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>( scope, "reachability logs", @@ -118,7 +119,7 @@ pub(super) fn construct( Datum::UInt64(u64::cast_from(port)), Datum::UInt64(u64::cast_from(worker_index)), Datum::String(update_type), - Datum::from(ts.clone()), + Datum::from(ts), ]; row_builder.packer().extend(key.iter().map(|k| datums[*k])); let key_row = row_builder.clone(); diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index 42c95e01c7c03..8dada0159b7cd 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -14,14 +14,15 @@ use std::collections::BTreeMap; use std::rc::Rc; use std::time::Duration; -use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference, OwnedRegionOpinion}; use 
mz_repr::{Datum, Diff, Timestamp}; +use mz_timely_util::containers::PreallocatingCapacityContainerBuilder; use mz_timely_util::replay::MzReplay; use serde::{Deserialize, Serialize}; use timely::communication::Allocate; -use timely::container::columnation::{Columnation, CopyRegion}; +use timely::container::flatcontainer::{FlatStack, IntoOwned, MirrorRegion}; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::channels::pushers::buffer::Session; @@ -150,17 +151,23 @@ pub(super) fn construct( // updates that reach `Row` encoding. let mut packer = PermutedRowPacker::new(TimelyLog::Operates); let operates = operates - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely operates") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely operates", + ) .as_collection(move |id, name| { packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(*id)), + Datum::UInt64(u64::cast_from(id)), Datum::UInt64(u64::cast_from(worker_id)), Datum::String(name), ]) }); let mut packer = PermutedRowPacker::new(TimelyLog::Channels); let channels = channels - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely operates") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely operates", + ) .as_collection(move |datum, ()| { let (source_node, source_port) = datum.source; let (target_node, target_port) = datum.target; @@ -176,11 +183,14 @@ pub(super) fn construct( let mut packer = PermutedRowPacker::new(TimelyLog::Addresses); let addresses = addresses - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely addresses") + .mz_arrange_core::<_, KeyValSpine>, Timestamp, Diff, _>>( + Pipeline, + "PreArrange Timely addresses", + ) .as_collection({ move |id, address| { packer.pack_by_index(|packer, index| match index { - 0 => packer.push(Datum::UInt64(u64::cast_from(*id))), + 0 => packer.push(Datum::UInt64(u64::cast_from(id))), 1 => 
packer.push(Datum::UInt64(u64::cast_from(worker_id))), 2 => packer .push_list(address.iter().map(|i| Datum::UInt64(u64::cast_from(*i)))), @@ -190,7 +200,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::Parks); let parks = parks - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely parks") + .mz_arrange_core::<_, KeyValSpine>(Pipeline, "PreArrange Timely parks") .as_collection(move |datum, ()| { packer.pack_slice(&[ Datum::UInt64(u64::cast_from(worker_id)), @@ -203,7 +213,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::BatchesSent); let batches_sent = batches_sent - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely batches sent", ) @@ -216,7 +226,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::BatchesReceived); let batches_received = batches_received - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely batches received", ) @@ -229,7 +239,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::MessagesSent); let messages_sent = messages_sent - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely messages sent", ) @@ -242,7 +252,7 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::MessagesReceived); let messages_received = messages_received - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>( + .mz_arrange_core::<_, KeyValSpine>( Pipeline, "PreArrange Timely messages received", ) @@ -255,16 +265,22 @@ pub(super) fn construct( }); let mut packer = PermutedRowPacker::new(TimelyLog::Elapsed); let elapsed = schedules_duration - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely duration") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely duration", + 
) .as_collection(move |operator, _| { packer.pack_slice(&[ - Datum::UInt64(u64::cast_from(*operator)), + Datum::UInt64(u64::cast_from(operator)), Datum::UInt64(u64::cast_from(worker_id)), ]) }); let mut packer = PermutedRowPacker::new(TimelyLog::Histogram); let histogram = schedules_histogram - .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(Pipeline, "PreArrange Timely histogram") + .mz_arrange_core::<_, KeyValSpine>( + Pipeline, + "PreArrange Timely histogram", + ) .as_collection(move |datum, _| { packer.pack_slice(&[ Datum::UInt64(u64::cast_from(datum.operator)), @@ -344,10 +360,12 @@ struct MessageCount { records: i64, } -type Pusher = - Counter, Tee>>; +type FlatStackFor = + FlatStack::Region>, MzIndexOptimized>; + +type Pusher = Counter, Tee>>; type OutputSession<'a, D> = - Session<'a, Timestamp, ConsolidatingContainerBuilder>, Pusher>; + Session<'a, Timestamp, PreallocatingCapacityContainerBuilder>, Pusher>; /// Bundled output buffers used by the demux operator. // @@ -357,7 +375,7 @@ type OutputSession<'a, D> = struct DemuxOutput<'a> { operates: OutputSession<'a, (usize, String)>, channels: OutputSession<'a, (ChannelDatum, ())>, - addresses: OutputSession<'a, (usize, Vec)>, + addresses: OutputSession<'a, (usize, OwnedRegionOpinion>)>, parks: OutputSession<'a, (ParkDatum, ())>, batches_sent: OutputSession<'a, (MessageDatum, ())>, batches_received: OutputSession<'a, (MessageDatum, ())>, @@ -374,8 +392,25 @@ struct ChannelDatum { target: (usize, usize), } -impl Columnation for ChannelDatum { - type InnerRegion = CopyRegion; +impl MzRegionPreference for ChannelDatum { + type Owned = Self; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for ChannelDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] 
@@ -384,8 +419,25 @@ struct ParkDatum { requested_pow: Option, } -impl Columnation for ParkDatum { - type InnerRegion = CopyRegion; +impl MzRegionPreference for ParkDatum { + type Owned = Self; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for ParkDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -394,8 +446,25 @@ struct MessageDatum { worker: usize, } -impl Columnation for MessageDatum { - type InnerRegion = CopyRegion; +impl MzRegionPreference for MessageDatum { + type Owned = Self; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for MessageDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -404,8 +473,25 @@ struct ScheduleHistogramDatum { duration_pow: u128, } -impl Columnation for ScheduleHistogramDatum { - type InnerRegion = CopyRegion; +impl MzRegionPreference for ScheduleHistogramDatum { + type Owned = Self; + type Region = MirrorRegion; +} + +impl<'a> IntoOwned<'a> for ScheduleHistogramDatum { + type Owned = Self; + + fn into_owned(self) -> Self::Owned { + self + } + + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } } /// Event handler of the demux operator. 
diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index 1c4736a948a6c..5076dcce3f067 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -11,6 +11,9 @@ #![allow(dead_code, missing_docs)] +pub use crate::row_spine::{RowRowSpine, RowSpine, RowValSpine}; +use crate::typedefs::spines::MzFlatLayout; +pub use crate::typedefs::spines::{ColKeySpine, ColValSpine}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::trace::implementations::chunker::ColumnationChunker; @@ -19,16 +22,12 @@ use differential_dataflow::trace::implementations::merge_batcher_col::Columnatio use differential_dataflow::trace::implementations::ord_neu::{FlatValSpine, OrdValBatch}; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; -use mz_ore::flatcontainer::MzRegionPreference; +use mz_ore::flatcontainer::{ItemRegion, MzIndexOptimized, MzRegionPreference}; use mz_repr::Diff; use mz_storage_types::errors::DataflowError; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use timely::dataflow::ScopeParent; -pub use crate::row_spine::{RowRowSpine, RowSpine, RowValSpine}; -use crate::typedefs::spines::MzFlatLayout; -pub use crate::typedefs::spines::{ColKeySpine, ColValSpine}; - pub(crate) mod spines { use std::rc::Rc; @@ -40,9 +39,10 @@ pub(crate) mod spines { use differential_dataflow::trace::implementations::spine_fueled::Spine; use differential_dataflow::trace::implementations::{Layout, Update}; use differential_dataflow::trace::rc_blanket_impls::RcBuilder; + use mz_ore::flatcontainer::{MzIndex, MzIndexOptimized, MzRegion}; use mz_timely_util::containers::stack::StackWrapper; use timely::container::columnation::{Columnation, TimelyStack}; - use timely::container::flatcontainer::{FlatStack, Push, Region}; + use timely::container::flatcontainer::FlatStack; 
use timely::progress::Timestamp; use crate::row_spine::OffsetOptimized; @@ -83,63 +83,55 @@ pub(crate) mod spines { } /// A layout based on flat container stacks - pub struct MzFlatLayout { - phantom: std::marker::PhantomData<(K, V, T, R)>, + pub struct MzFlatLayout { + phantom: std::marker::PhantomData<(KR, VR, TR, RR)>, } - impl Update for MzFlatLayout + impl Update for MzFlatLayout where - K: Region, - V: Region, - T: Region, - R: Region, - K::Owned: Ord + Clone + 'static, - V::Owned: Ord + Clone + 'static, - T::Owned: Ord + Clone + Lattice + Timestamp + 'static, - R::Owned: Ord + Semigroup + 'static, + KR: MzRegion, + VR: MzRegion, + TR: MzRegion, + RR: MzRegion, + KR::Owned: Ord + Clone + 'static, + VR::Owned: Ord + Clone + 'static, + TR::Owned: Ord + Clone + Lattice + Timestamp + 'static, + RR::Owned: Ord + Semigroup + 'static, + for<'a> KR::ReadItem<'a>: Copy + Ord, + for<'a> VR::ReadItem<'a>: Copy + Ord, + for<'a> TR::ReadItem<'a>: Copy + Ord, + for<'a> RR::ReadItem<'a>: Copy + Ord, { - type Key = K::Owned; - type Val = V::Owned; - type Time = T::Owned; - type Diff = R::Owned; + type Key = KR::Owned; + type Val = VR::Owned; + type Time = TR::Owned; + type Diff = RR::Owned; } /// Layout implementation for [`MzFlatLayout`]. Mostly equivalent to differential's /// flat layout but with a different opinion for the offset container. Here, we use /// [`OffsetOptimized`] instead of an offset list. If differential should gain access /// to the optimized variant, we might be able to remove this implementation. 
- impl Layout for MzFlatLayout + impl Layout for MzFlatLayout where - K: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - V: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - T: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - R: Region - + Push<::Owned> - + for<'a> Push<::ReadItem<'a>> - + 'static, - K::Owned: Ord + Clone + 'static, - V::Owned: Ord + Clone + 'static, - T::Owned: Ord + Clone + Lattice + Timestamp + 'static, - R::Owned: Ord + Semigroup + 'static, - for<'a> K::ReadItem<'a>: Copy + Ord, - for<'a> V::ReadItem<'a>: Copy + Ord, - for<'a> T::ReadItem<'a>: Copy + Ord, - for<'a> R::ReadItem<'a>: Copy + Ord, + KR: MzRegion, + VR: MzRegion, + TR: MzRegion, + RR: MzRegion, + KR::Owned: Ord + Clone + 'static, + VR::Owned: Ord + Clone + 'static, + TR::Owned: Ord + Clone + Lattice + Timestamp + 'static, + RR::Owned: Ord + Semigroup + 'static, + for<'a> KR::ReadItem<'a>: Copy + Ord, + for<'a> VR::ReadItem<'a>: Copy + Ord, + for<'a> TR::ReadItem<'a>: Copy + Ord, + for<'a> RR::ReadItem<'a>: Copy + Ord, { type Target = Self; - type KeyContainer = FlatStack; - type ValContainer = FlatStack; - type TimeContainer = FlatStack; - type DiffContainer = FlatStack; + type KeyContainer = FlatStack; + type ValContainer = FlatStack; + type TimeContainer = FlatStack; + type DiffContainer = FlatStack; type OffsetContainer = OffsetOptimized; } } @@ -148,10 +140,10 @@ pub(crate) mod spines { // Agents are wrappers around spines that allow shared read access. // Fully generic spines and agents. 
-pub type KeyValSpine = ColValSpine; -pub type KeyValAgent = TraceAgent>; -pub type KeyValEnter = - TraceEnter>, TEnter>; +pub type KeyValSpine = FlatKeyValSpineDefault; +pub type KeyValAgent = TraceAgent>; +pub type KeyValEnter = + TraceEnter>, TEnter>; // Fully generic key-only spines and agents pub type KeySpine = ColKeySpine; @@ -188,13 +180,17 @@ pub type KeyValBatcher = MergeBatcher< >; pub type FlatKeyValBatch = OrdValBatch>; -pub type FlatKeyValSpine = - FlatValSpine, TupleABCRegion, T, R>, C>; +pub type FlatKeyValSpine = FlatValSpine< + MzFlatLayout, + ItemRegion, T, R>>, + C, + MzIndexOptimized, +>; pub type FlatKeyValSpineDefault = FlatKeyValSpine< - ::Region, - ::Region, - ::Region, - ::Region, + ItemRegion<::Region>, + ItemRegion<::Region>, + ItemRegion<::Region>, + ItemRegion<::Region>, C, >; pub type FlatKeyValAgent = TraceAgent>; diff --git a/src/ore/Cargo.toml b/src/ore/Cargo.toml index f13219d524289..49ec1dcf7aa57 100644 --- a/src/ore/Cargo.toml +++ b/src/ore/Cargo.toml @@ -27,8 +27,9 @@ clap = { version = "3.2.24", features = ["env"], optional = true } compact_bytes = { version = "0.1.2", optional = true } ctor = { version = "0.1.26", optional = true } derivative = { version = "2.2.0", optional = true } +differential-dataflow = { version = "0.12.0", optional = true } either = "1.8.0" -flatcontainer = { version = "0.4.1", optional = true } +flatcontainer = { version = "0.5.0", optional = true } futures = { version = "0.3.25", optional = true } hibitset = { version = "0.6.4", optional = true } lgalloc = { version = "0.3", optional = true } @@ -50,6 +51,7 @@ smallvec = { version = "1.10.0", optional = true } stacker = { version = "0.1.15", optional = true } sentry = { version = "0.29.1", optional = true, features = ["debug-images"] } serde = { version = "1.0.152", features = ["derive"] } +timely = { version = "0.12.0", default-features = false, features = ["bincode"], optional = true } tokio = { version = "1.38.0", features = [ "io-util", "net", @@ 
-115,6 +117,8 @@ async = [ "tokio", "tracing", ] +differential = ["differential-dataflow", "flatcontainer_", "timely"] +flatcontainer_ = ["flatcontainer", "region"] bytes_ = ["bytes", "compact_bytes", "smallvec", "smallvec/const_generics", "region", "tracing_"] network = ["async", "bytes", "hyper", "smallvec", "tonic", "tracing"] process = ["libc"] diff --git a/src/ore/src/flatcontainer.rs b/src/ore/src/flatcontainer.rs index d74b84ac082e6..e862381fadd77 100644 --- a/src/ore/src/flatcontainer.rs +++ b/src/ore/src/flatcontainer.rs @@ -15,17 +15,54 @@ //! Flat container utilities -use flatcontainer::{Push, Region, ReserveItems}; +use flatcontainer::impls::deduplicate::ConsecutiveIndexPairs; +use flatcontainer::{OptionRegion, Push, Region, ReserveItems, StringRegion}; +use serde::{Deserialize, Serialize}; + +pub use item::ItemRegion; +pub use offset::MzIndexOptimized; /// Associate a type with a flat container region. pub trait MzRegionPreference: 'static { /// The owned type of the container. type Owned; /// A region that can hold `Self`. 
- type Region: for<'a> Region - + Push - + for<'a> Push<::ReadItem<'a>> - + for<'a> ReserveItems<::ReadItem<'a>>; + type Region: MzRegion; +} + +/// TODO +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct MzIndex(usize); + +impl std::ops::Deref for MzIndex { + type Target = usize; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// TODO +pub trait MzRegion: + Region + + Push<::Owned> + + for<'a> Push<&'a ::Owned> + + for<'a> Push<::ReadItem<'a>> + + for<'a> ReserveItems<::ReadItem<'a>> + + Clone + + 'static +{ +} + +impl MzRegion for R where + R: Region + + Push<::Owned> + + for<'a> Push<&'a ::Owned> + + for<'a> Push<::ReadItem<'a>> + + for<'a> ReserveItems<::ReadItem<'a>> + + Clone + + 'static +{ } /// Opinion indicating that the contents of a collection should be stored in an @@ -41,15 +78,14 @@ mod tuple { use crate::flatcontainer::MzRegionPreference; + /// The macro creates the region implementation for tuples macro_rules! tuple_flatcontainer { - ($($name:ident)+) => ( - paste! { - impl<$($name: MzRegionPreference),*> MzRegionPreference for ($($name,)*) { - type Owned = ($($name::Owned,)*); - type Region = []<$($name::Region,)*>; - } + ($($name:ident)+) => (paste! 
{ + impl<$($name: MzRegionPreference),*> MzRegionPreference for ($($name,)*) { + type Owned = ($($name::Owned,)*); + type Region = []<$($name::Region,)*>; } - ) + }); } tuple_flatcontainer!(A); @@ -104,18 +140,745 @@ mod copy { implement_for!(std::time::Duration); } -mod vec { - use flatcontainer::OwnedRegion; +impl MzRegionPreference for String { + type Owned = String; + type Region = ConsecutiveIndexPairs; +} +mod vec { + use crate::flatcontainer::lgalloc::LgAllocOwnedRegion; use crate::flatcontainer::{MzRegionPreference, OwnedRegionOpinion}; impl MzRegionPreference for OwnedRegionOpinion> { type Owned = Vec; - type Region = OwnedRegion; + type Region = LgAllocOwnedRegion; } } impl MzRegionPreference for Option { - type Owned = as Region>::Owned; - type Region = flatcontainer::OptionRegion; + type Owned = as Region>::Owned; + type Region = OptionRegion; +} + +mod lgalloc { + //! A region that stores slices of clone types in lgalloc + + use crate::flatcontainer::MzIndex; + use crate::region::LgAllocVec; + use flatcontainer::impls::index::{IndexContainer, IndexOptimized}; + use flatcontainer::impls::storage::Storage; + use flatcontainer::{Push, PushIter, Region, ReserveItems}; + + /// A container for owned types. + /// + /// The container can absorb any type, and stores an owned version of the type, similarly to what + /// vectors do. We recommend using this container for copy types, but there is no restriction in + /// the implementation, and in fact it can correctly store owned values, although any data owned + /// by `T` is regular heap-allocated data, and not contained in regions. 
+ /// + /// # Examples + /// + /// ``` + /// use flatcontainer::{Push, OwnedRegion, Region}; + /// let mut r = >::default(); + /// + /// let panagram_en = "The quick fox jumps over the lazy dog"; + /// let panagram_de = "Zwölf Boxkämpfer jagen Viktor quer über den großen Sylter Deich"; + /// + /// let en_index = r.push(panagram_en.as_bytes()); + /// let de_index = r.push(panagram_de.as_bytes()); + /// + /// assert_eq!(panagram_de.as_bytes(), r.index(de_index)); + /// assert_eq!(panagram_en.as_bytes(), r.index(en_index)); + /// ``` + #[derive(Debug)] + pub struct LgAllocOwnedRegion { + slices: LgAllocVec, + offsets: IndexOptimized, + } + + impl Clone for LgAllocOwnedRegion { + #[inline] + fn clone(&self) -> Self { + Self { + slices: self.slices.clone(), + offsets: self.offsets.clone(), + } + } + + #[inline] + fn clone_from(&mut self, source: &Self) { + self.slices.clone_from(&source.slices); + self.offsets.clone_from(&source.offsets); + } + } + + impl Region for LgAllocOwnedRegion + where + [T]: ToOwned, + { + type Owned = <[T] as ToOwned>::Owned; + type ReadItem<'a> = &'a [T] where Self: 'a; + type Index = MzIndex; + + #[inline] + fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self + where + Self: 'a, + { + let mut this = Self { + slices: LgAllocVec::with_capacity(regions.map(|r| r.slices.len()).sum()), + offsets: IndexOptimized::default(), + }; + this.offsets.push(0); + this + } + + #[inline] + fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + let start = self.offsets.index(*index); + let end = self.offsets.index(*index + 1); + &self.slices[start..end] + } + + #[inline] + fn reserve_regions<'a, I>(&mut self, regions: I) + where + Self: 'a, + I: Iterator + Clone, + { + self.slices.reserve(regions.map(|r| r.slices.len()).sum()); + } + + #[inline] + fn clear(&mut self) { + self.slices.clear(); + self.offsets.clear(); + self.offsets.push(0); + } + + #[inline] + fn heap_size(&self, mut callback: F) { + let size_of_t = std::mem::size_of::(); + 
callback( + self.slices.len() * size_of_t, + self.slices.capacity() * size_of_t, + ); + } + + #[inline] + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> + where + Self: 'a, + { + item + } + } + + impl Default for LgAllocOwnedRegion { + #[inline] + fn default() -> Self { + let mut this = Self { + slices: LgAllocVec::default(), + offsets: IndexOptimized::default(), + }; + this.offsets.push(0); + this + } + } + + impl Push<&[T; N]> for LgAllocOwnedRegion { + #[inline] + fn push(&mut self, item: &[T; N]) -> as Region>::Index { + self.slices.extend_from_slice(item); + self.offsets.push(self.slices.len()); + MzIndex(self.offsets.len() - 2) + } + } + + impl Push<&&[T; N]> for LgAllocOwnedRegion { + #[inline] + fn push(&mut self, item: &&[T; N]) -> as Region>::Index { + self.push(*item) + } + } + + impl<'b, T: Clone, const N: usize> ReserveItems<&'b [T; N]> for LgAllocOwnedRegion { + #[inline] + fn reserve_items(&mut self, items: I) + where + I: Iterator + Clone, + { + self.slices.reserve(items.map(|i| i.len()).sum()); + } + } + + impl Push<&[T]> for LgAllocOwnedRegion { + #[inline] + fn push(&mut self, item: &[T]) -> as Region>::Index { + self.slices.extend_from_slice(item); + self.offsets.push(self.slices.len()); + MzIndex(self.offsets.len() - 2) + } + } + + impl Push<&&[T]> for LgAllocOwnedRegion + where + for<'a> Self: Push<&'a [T]>, + { + #[inline] + fn push(&mut self, item: &&[T]) -> as Region>::Index { + self.push(*item) + } + } + + impl<'b, T> ReserveItems<&'b [T]> for LgAllocOwnedRegion + where + [T]: ToOwned, + { + #[inline] + fn reserve_items(&mut self, items: I) + where + I: Iterator + Clone, + { + self.slices.reserve(items.map(<[T]>::len).sum()); + } + } + + impl Push> for LgAllocOwnedRegion + where + [T]: ToOwned, + { + #[inline] + fn push(&mut self, mut item: Vec) -> as Region>::Index { + self.slices.append(&mut item); + self.offsets.push(self.slices.len()); + MzIndex(self.offsets.len() - 2) + } + } + + impl Push<&Vec> for 
LgAllocOwnedRegion { + #[inline] + fn push(&mut self, item: &Vec) -> as Region>::Index { + self.push(item.as_slice()) + } + } + + impl<'a, T> ReserveItems<&'a Vec> for LgAllocOwnedRegion + where + [T]: ToOwned, + { + #[inline] + fn reserve_items(&mut self, items: I) + where + I: Iterator> + Clone, + { + self.reserve_items(items.map(Vec::as_slice)); + } + } + + impl> ReserveItems> for LgAllocOwnedRegion + where + [T]: ToOwned, + { + #[inline] + fn reserve_items(&mut self, items: I) + where + I: Iterator> + Clone, + { + self.slices + .reserve(items.flat_map(|i| i.0.into_iter()).count()); + } + } + + #[cfg(test)] + mod tests { + use flatcontainer::{Push, Region, ReserveItems}; + + use super::*; + + #[crate::test] + fn test_copy_ref_ref_array() { + let mut r = >::default(); + ReserveItems::reserve_items(&mut r, std::iter::once(&[1; 4])); + let index = r.push(&&[1; 4]); + assert_eq!([1, 1, 1, 1], r.index(index)); + } + + #[crate::test] + fn test_copy_vec() { + let mut r = >::default(); + ReserveItems::reserve_items(&mut r, std::iter::once(&vec![1; 4])); + let index = r.push(&vec![1; 4]); + assert_eq!([1, 1, 1, 1], r.index(index)); + let index = r.push(vec![2; 4]); + assert_eq!([2, 2, 2, 2], r.index(index)); + } + } +} + +mod item { + //! A region that stores indexes in lgalloc, converting indexes to [`MzIndex`]. + use flatcontainer::{Push, Region, ReserveItems}; + + use crate::flatcontainer::MzIndex; + use crate::region::LgAllocVec; + + /// A region that stores indexes in lgalloc. 
+ pub struct ItemRegion { + inner: R, + storage: LgAllocVec, + } + + impl std::fmt::Debug for ItemRegion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ItemRegion").finish_non_exhaustive() + } + } + + impl Clone for ItemRegion { + #[inline] + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + storage: self.storage.clone(), + } + } + + #[inline] + fn clone_from(&mut self, source: &Self) { + self.inner.clone_from(&source.inner); + self.storage.clone_from(&source.storage); + } + } + + impl Default for ItemRegion { + #[inline] + fn default() -> Self { + Self { + inner: R::default(), + storage: LgAllocVec::default(), + } + } + } + + impl Region for ItemRegion { + type Owned = R::Owned; + type ReadItem<'a> = R::ReadItem<'a> + where + Self: 'a; + type Index = MzIndex; + + #[inline] + fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self + where + Self: 'a, + { + Self { + inner: R::merge_regions(regions.clone().map(|r| &r.inner)), + storage: LgAllocVec::with_capacity(regions.map(|r| r.storage.len()).sum()), + } + } + + #[inline] + fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + self.inner.index(self.storage[*index]) + } + + #[inline] + fn reserve_regions<'a, I>(&mut self, regions: I) + where + Self: 'a, + I: Iterator + Clone, + { + self.inner + .reserve_regions(regions.clone().map(|r| &r.inner)); + self.storage.reserve(regions.map(|r| r.storage.len()).sum()); + } + + #[inline] + fn clear(&mut self) { + self.inner.clear(); + self.storage.clear(); + } + + #[inline] + fn heap_size(&self, mut callback: F) { + self.inner.heap_size(&mut callback); + self.storage.heap_size(callback); + } + + #[inline] + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> + where + Self: 'a, + { + R::reborrow(item) + } + } + + impl, T> Push for ItemRegion { + #[inline] + fn push(&mut self, item: T) -> Self::Index { + let index = self.inner.push(item); + self.storage.push(index); + 
MzIndex(self.storage.len() - 1) + } + } + + impl, T> ReserveItems for ItemRegion { + #[inline] + fn reserve_items(&mut self, items: I) + where + I: Iterator + Clone, + { + self.inner.reserve_items(items.clone()); + self.storage.reserve(items.count()); + } + } + + #[cfg(feature = "differential")] + mod differential { + use differential_dataflow::trace::implementations::merge_batcher_flat::RegionUpdate; + use differential_dataflow::trace::implementations::Update; + + use crate::flatcontainer::{ItemRegion, MzRegion}; + + impl Update for ItemRegion + where + UR: Update + MzRegion, + UR::Owned: Clone + Ord, + for<'a> UR::ReadItem<'a>: Copy + Ord, + { + type Key = UR::Key; + type Val = UR::Val; + type Time = UR::Time; + type Diff = UR::Diff; + } + + impl RegionUpdate for ItemRegion + where + UR: RegionUpdate + MzRegion, + for<'a> UR::ReadItem<'a>: Copy + Ord, + { + type Key<'a> = UR::Key<'a> where Self: 'a; + type Val<'a> = UR::Val<'a> where Self: 'a; + type Time<'a> = UR::Time<'a> where Self: 'a; + type TimeOwned = UR::TimeOwned; + type Diff<'a> = UR::Diff<'a> where Self: 'a; + type DiffOwned = UR::DiffOwned; + + #[inline] + fn into_parts<'a>( + item: Self::ReadItem<'a>, + ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { + UR::into_parts(item) + } + + #[inline] + fn reborrow_key<'b, 'a: 'b>(item: Self::Key<'a>) -> Self::Key<'b> + where + Self: 'a, + { + UR::reborrow_key(item) + } + + #[inline] + fn reborrow_val<'b, 'a: 'b>(item: Self::Val<'a>) -> Self::Val<'b> + where + Self: 'a, + { + UR::reborrow_val(item) + } + + #[inline] + fn reborrow_time<'b, 'a: 'b>(item: Self::Time<'a>) -> Self::Time<'b> + where + Self: 'a, + { + UR::reborrow_time(item) + } + + #[inline] + fn reborrow_diff<'b, 'a: 'b>(item: Self::Diff<'a>) -> Self::Diff<'b> + where + Self: 'a, + { + UR::reborrow_diff(item) + } + } + } +} + +mod lgallocvec { + //! A vector-like structure that stores its contents in lgalloc. 
+ + use crate::flatcontainer::MzIndex; + use crate::region::LgAllocVec; + use flatcontainer::impls::index::IndexContainer; + use flatcontainer::impls::storage::Storage; + use flatcontainer::{Push, Region, ReserveItems}; + + impl Region for LgAllocVec { + type Owned = T; + type ReadItem<'a> = &'a T where Self: 'a; + type Index = MzIndex; + + #[inline] + fn merge_regions<'a>(regions: impl Iterator + Clone) -> Self + where + Self: 'a, + { + Self::with_capacity(regions.map(LgAllocVec::len).sum()) + } + + #[inline] + fn index(&self, index: Self::Index) -> Self::ReadItem<'_> { + &self[*index] + } + + #[inline] + fn reserve_regions<'a, I>(&mut self, regions: I) + where + Self: 'a, + I: Iterator + Clone, + { + self.reserve(regions.map(LgAllocVec::len).sum()); + } + + #[inline] + fn clear(&mut self) { + self.clear(); + } + + #[inline] + fn heap_size(&self, mut callback: F) { + let size_of_t = std::mem::size_of::(); + callback(self.len() * size_of_t, self.capacity() * size_of_t); + } + + #[inline] + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> + where + Self: 'a, + { + item + } + } + + impl Storage for LgAllocVec { + #[inline] + fn with_capacity(capacity: usize) -> Self { + Self::with_capacity(capacity) + } + + #[inline] + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } + + #[inline] + fn clear(&mut self) { + self.clear(); + } + + #[inline] + fn heap_size(&self, callback: F) { + self.heap_size(callback); + } + + #[inline] + fn len(&self) -> usize { + self.len() + } + + #[inline] + fn is_empty(&self) -> bool { + self.is_empty() + } + } + + impl IndexContainer for LgAllocVec { + type Iter<'a> = std::iter::Copied> + where + Self: 'a; + + #[inline] + fn index(&self, index: usize) -> T { + self[index] + } + + #[inline] + fn push(&mut self, item: T) { + self.push(item); + } + + #[inline] + fn extend>(&mut self, iter: I) + where + I::IntoIter: ExactSizeIterator, + { + for item in iter { + self.push(item); + } + } + + #[inline] + 
fn iter(&self) -> Self::Iter<'_> { + self.iter().copied() + } + } + + impl Push for LgAllocVec { + #[inline] + fn push(&mut self, item: T) -> Self::Index { + self.push(item); + MzIndex(self.len() - 1) + } + } + + impl Push<&T> for LgAllocVec { + #[inline] + fn push(&mut self, item: &T) -> Self::Index { + self.push(item.clone()); + MzIndex(self.len() - 1) + } + } + + impl Push<&&T> for LgAllocVec { + #[inline] + fn push(&mut self, item: &&T) -> Self::Index { + self.push((*item).clone()); + MzIndex(self.len() - 1) + } + } + + impl ReserveItems for LgAllocVec { + #[inline] + fn reserve_items(&mut self, items: I) + where + I: Iterator + Clone, + { + self.reserve(items.count()); + } + } + + #[cfg(test)] + mod tests { + #[crate::test] + fn vec() { + use flatcontainer::{Push, Region, ReserveItems}; + + use crate::region::LgAllocVec; + + let mut region = LgAllocVec::::default(); + let index = <_ as Push<_>>::push(&mut region, 42); + assert_eq!(region.index(index), &42); + + let mut region = LgAllocVec::::default(); + let i0 = <_ as Push<_>>::push(&mut region, 42); + let i1 = <_ as Push<_>>::push(&mut region, 43); + let i2 = <_ as Push<_>>::push(&mut region, 44); + region.reserve_items([1, 2, 3].iter()); + assert_eq!(region.index(i0), &42); + assert_eq!(region.index(i1), &43); + assert_eq!(region.index(i2), &44); + } + } +} + +mod offset { + use crate::flatcontainer::MzIndex; + use flatcontainer::impls::index::{IndexContainer, IndexOptimized}; + use flatcontainer::impls::storage::Storage; + + /// TODO + #[derive(Default, Clone, Debug)] + pub struct MzIndexOptimized(IndexOptimized); + + impl Storage for MzIndexOptimized { + #[inline] + fn with_capacity(capacity: usize) -> Self { + Self(IndexOptimized::with_capacity(capacity)) + } + + #[inline] + fn reserve(&mut self, additional: usize) { + self.0.reserve(additional) + } + + #[inline] + fn clear(&mut self) { + self.0.clear(); + } + + #[inline] + fn heap_size(&self, callback: F) { + self.0.heap_size(callback); + } + + 
#[inline] + fn len(&self) -> usize { + self.0.len() + } + + #[inline] + fn is_empty(&self) -> bool { + self.0.is_empty() + } + } + + impl IndexContainer for MzIndexOptimized { + type Iter<'a> = MzOffsetOptimizedIter<>::Iter<'a>> + where + Self: 'a; + + #[inline] + fn index(&self, index: usize) -> MzIndex { + MzIndex(self.0.index(index)) + } + + #[inline] + fn push(&mut self, item: MzIndex) { + self.0.push(item.0); + } + + #[inline] + fn extend>(&mut self, iter: I) + where + I::IntoIter: ExactSizeIterator, + { + self.0.extend(iter.into_iter().map(|item| item.0)); + } + + #[inline] + fn iter(&self) -> Self::Iter<'_> { + MzOffsetOptimizedIter(self.0.iter()) + } + } + + impl<'a> IntoIterator for &'a MzIndexOptimized { + type Item = MzIndex; + type IntoIter = MzOffsetOptimizedIter<>::Iter<'a>>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } + } + + /// TODO + #[derive(Clone, Copy, Debug)] + pub struct MzOffsetOptimizedIter(I); + + impl Iterator for MzOffsetOptimizedIter + where + I: Iterator, + { + type Item = MzIndex; + + #[inline] + fn next(&mut self) -> Option { + self.0.next().map(MzIndex) + } + } } diff --git a/src/ore/src/lib.rs b/src/ore/src/lib.rs index f771740382898..518cf43b7ef7c 100644 --- a/src/ore/src/lib.rs +++ b/src/ore/src/lib.rs @@ -40,7 +40,7 @@ pub mod codegen; pub mod collections; pub mod env; pub mod error; -#[cfg(feature = "flatcontainer")] +#[cfg(feature = "flatcontainer_")] pub mod flatcontainer; pub mod fmt; #[cfg_attr(nightly_doc_features, doc(cfg(feature = "async")))] diff --git a/src/ore/src/region.rs b/src/ore/src/region.rs index 9fcc974dac9e8..c11ffc18e8f3d 100644 --- a/src/ore/src/region.rs +++ b/src/ore/src/region.rs @@ -19,6 +19,8 @@ use std::fmt::{Debug, Formatter}; use std::mem::ManuallyDrop; use std::ops::{Deref, DerefMut}; +pub use vec::LgAllocVec; + /// A region allocator which holds items at stable memory locations. 
/// /// Items once inserted will not be moved, and their locations in memory @@ -392,3 +394,348 @@ impl Drop for MMapRegion { lgalloc::deallocate(self.handle.take().unwrap()); } } + +mod vec { + use std::fmt::{Debug, Formatter}; + use std::mem::{ManuallyDrop, MaybeUninit}; + use std::ops::Deref; + use std::sync::atomic::AtomicUsize; + + /// Configuration variable to dynamically configure the cut over point from heap to lgalloc. + /// TODO: Wire up a configuration mechanism to set this value. + static LGALLOC_VEC_HEAP_LIMIT_BYTES: AtomicUsize = AtomicUsize::new(64 << 10); + + /// A fixed-length region in memory, which is either allocated from heap or lgalloc. + pub struct LgAllocVec { + /// A handle to lgalloc. None for heap allocations, Some if the memory comes from lgalloc. + handle: Option, + /// Slice representation of the memory. Elements 0..self.length are valid. + elements: ManuallyDrop]>>, + /// The number of valid elements in `elements` + length: usize, + } + + impl LgAllocVec { + /// Create a new [`LgAllocVec`] with the specified capacity. The actual capacity of the returned + /// array is at least as big as the requested capacity. + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + // Allocate memory, fall-back to regular heap allocations if we cannot acquire memory through + // lgalloc. + let bytes = capacity * std::mem::size_of::(); + if bytes <= LGALLOC_VEC_HEAP_LIMIT_BYTES.load(std::sync::atomic::Ordering::Relaxed) { + Self::new_heap(capacity) + } else { + match Self::try_new_lgalloc(capacity) { + Ok(vec) => vec, + Err(_) => Self::new_heap(capacity), + } + } + } + + /// Construct a new instance allocated on the heap. + #[inline] + fn new_heap(capacity: usize) -> Self { + let mut vec = Vec::with_capacity(capacity); + // SAFETY: We treat all elements as uninitialized and track initialized elements + // through `self.length`. 
+ unsafe { + vec.set_len(vec.capacity()); + } + + Self { + handle: None, + elements: ManuallyDrop::new(vec.into_boxed_slice()), + length: 0, + } + } + + /// Construct a new instance allocated through lgalloc, or an error should it fail. + #[inline] + fn try_new_lgalloc(capacity: usize) -> Result { + let (ptr, actual_capacity, handle) = lgalloc::allocate::>(capacity)?; + // We allocated sucessfully through lgalloc. + let handle = Some(handle); + // SAFETY: `ptr` is valid for constructing a slice: + // 1. Valid for reading and writing, and enough capacity. + // 2. Properly initialized (left for writing). + // 3. Not aliased. + // 4. Total size not longer than isize::MAX because lgalloc has a capacity limit. + let slice = unsafe { std::slice::from_raw_parts_mut(ptr.as_ptr(), actual_capacity) }; + // SAFETY: slice is valid, and we deallocate it usinge lgalloc. + let boxed = unsafe { Box::from_raw(slice) }; + let elements = ManuallyDrop::new(boxed); + + Ok(Self { + handle, + elements, + length: 0, + }) + } + + /// Visit contained allocations to determine their size and capacity. + #[inline] + pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) { + let size_of_t = std::mem::size_of::(); + callback(self.len() * size_of_t, self.capacity() * size_of_t) + } + + /// Move an element on the array. Panics if there is no more capacity. + #[inline] + pub fn push(&mut self, item: T) { + if self.len() == self.capacity() { + self.reserve(1); + } + self.elements[self.length].write(item); + self.length += 1; + } + + /// Extend the array from a slice. Increases the capacity if required. 
+ #[inline] + pub fn extend_from_slice(&mut self, slice: &[T]) + where + T: Clone, + { + self.reserve(slice.len()); + let mut iterator = slice.iter().cloned(); + while let Some(element) = iterator.next() { + let len = self.len(); + if len == self.capacity() { + let (lower, _) = iterator.size_hint(); + self.reserve(lower.saturating_add(1)); + } + unsafe { + std::ptr::write( + self.elements.as_mut_ptr().add(len), + MaybeUninit::new(element), + ); + self.set_len(len + 1); + } + } + } + + /// Extend the array from a slice of copyable elements. Increases the capacity if required. + #[inline] + pub fn extend_from_copy_slice(&mut self, slice: &[T]) + where + T: Copy, + { + let count = slice.len(); + self.reserve(count); + let len = self.len(); + unsafe { + #[allow(clippy::as_conversions)] + std::ptr::copy_nonoverlapping( + slice.as_ptr(), + self.elements.as_mut_ptr().add(len) as *const MaybeUninit as *mut T, + count, + ); + self.set_len(len + count); + } + } + + /// Move elements from a vector to the array. Increases the capacity if required. + #[inline] + pub fn append(&mut self, data: &mut Vec) { + let count = data.len(); + self.reserve(count); + let len = self.len(); + unsafe { + data.set_len(0); + #[allow(clippy::as_conversions)] + std::ptr::copy_nonoverlapping( + data.as_ptr(), + self.elements.as_mut_ptr().add(len) as *const MaybeUninit as *mut T, + count, + ); + self.set_len(len + count); + } + } + + /// Update the length. Highly unsafe because it doesn't drop elements when reducing the length, + /// and doesn't initialize elements when increasing the length. + #[inline] + pub unsafe fn set_len(&mut self, length: usize) { + debug_assert!(length <= self.capacity()); + self.length = length; + } + + /// The number of elements in the array. + #[inline] + pub fn len(&self) -> usize { + self.length + } + + /// Returns `true` if the array contains no elements. 
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// The number of elements this array can absorb.
+    #[inline]
+    pub fn capacity(&self) -> usize {
+        self.elements.len()
+    }
+
+    /// Remove all elements. Drops the contents, but leaves the allocation untouched.
+    #[inline]
+    pub fn clear(&mut self) {
+        let elems = &mut self.elements[..self.length];
+        // We are about to run the type's destructor, which may panic. Therefore we set the length
+        // of the array to zero so that if we have to unwind the stack we don't end up re-dropping
+        // in valid memory through the Drop impl of Array itself.
+        self.length = 0;
+        for e in elems {
+            // SAFETY: We know elements up to `length` are initialized.
+            unsafe {
+                e.assume_init_drop();
+            }
+        }
+    }
+
+    /// The minimum capacity for a non-zero array.
+    const MIN_NON_ZERO_CAP: usize = if std::mem::size_of::<T>() == 1 {
+        8
+    } else if std::mem::size_of::<T>() <= 1024 {
+        4
+    } else {
+        1
+    };
+
+    /// Grow the array to at least `new_len` elements. Reallocates the underlying storage.
+    #[cold]
+    fn grow(&mut self, new_len: usize) {
+        let new_capacity = std::cmp::max(self.capacity() * 2, new_len);
+        let new_capacity = std::cmp::max(new_capacity, Self::MIN_NON_ZERO_CAP);
+        let mut new_vec = Self::with_capacity(new_capacity);
+
+        let src_ptr = self.elements.as_ptr();
+        let dst_ptr = new_vec.elements.as_mut_ptr();
+        let len = self.len();
+
+        unsafe {
+            // SAFETY: We forget the current contents momentarily.
+            self.set_len(0);
+            // SAFETY: `src_ptr` and `dst_ptr` are valid pointers to `len` elements.
+            std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, len);
+            // SAFETY: Surface exactly as many elements as we just copied.
+            new_vec.set_len(len);
+        }
+
+        std::mem::swap(&mut new_vec, self);
+    }
+
+    /// Reserve space for at least `additional` elements. The capacity is increased if necessary.
+    #[inline]
+    pub fn reserve(&mut self, additional: usize) {
+        let new_len = self.len() + additional;
+        if new_len > self.capacity() {
+            self.grow(new_len);
+        }
+    }
+
+    /// Iterate over the elements.
+    #[inline]
+    pub fn iter(&self) -> std::slice::Iter<'_, T> {
+        self.deref().iter()
+    }
+}
+
+impl<T: Clone> Clone for LgAllocVec<T> {
+    fn clone(&self) -> Self {
+        let mut new_vec = LgAllocVec::with_capacity(self.len());
+        new_vec.extend_from_slice(self);
+        new_vec
+    }
+
+    fn clone_from(&mut self, source: &Self) {
+        // TODO: Optimize for reuse of existing elements.
+        self.clear();
+        self.extend_from_slice(source);
+    }
+}
+
+impl<T> Default for LgAllocVec<T> {
+    #[inline]
+    fn default() -> Self {
+        Self {
+            handle: None,
+            elements: ManuallyDrop::new(Vec::new().into_boxed_slice()),
+            length: 0,
+        }
+    }
+}
+
+impl<T> Deref for LgAllocVec<T> {
+    type Target = [T];
+
+    #[inline]
+    fn deref(&self) -> &Self::Target {
+        // TODO: Use `slice_assume_init_ref` once stable.
+        // Context: https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.slice_assume_init_ref
+        // The following safety argument is adapted from the source.
+        // SAFETY: casting `elements` to a `*const [T]` is safe since the caller guarantees that
+        // `slice` is initialized, and `MaybeUninit` is guaranteed to have the same layout as `T`.
+        // The pointer obtained is valid since it refers to memory owned by `elements` which is a
+        // reference and thus guaranteed to be valid for reads.
+        #[allow(clippy::as_conversions)]
+        unsafe {
+            &*(&self.elements[..self.length] as *const [MaybeUninit<T>] as *const [T])
+        }
+    }
+}
+
+impl<T> Drop for LgAllocVec<T> {
+    #[inline]
+    fn drop(&mut self) {
+        // Clear the contents, but don't drop the allocation.
+        self.clear();
+
+        if let Some(handle) = self.handle.take() {
+            // Memory allocated through lgalloc, deallocate accordingly.
+            lgalloc::deallocate(handle);
+        } else {
+            // Regular heap allocation
+            // SAFETY: `elements` is a sliced box allocated from the global allocator, drop it.
+            unsafe {
+                ManuallyDrop::drop(&mut self.elements);
+            }
+        }
+    }
+}
+
+impl<T: Debug> Debug for LgAllocVec<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        self.deref().fmt(f)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    use super::*;
+
+    #[crate::test]
+    fn double_drop() {
+        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
+        struct DropGuard;
+
+        impl Drop for DropGuard {
+            fn drop(&mut self) {
+                let drops = DROP_COUNT.fetch_add(1, Ordering::Relaxed);
+                // If this is the first time we're being dropped, panic.
+                if drops == 0 {
+                    panic!();
+                }
+            }
+        }
+
+        let mut array = LgAllocVec::with_capacity(1);
+        array.push(DropGuard);
+        let _ = mz_ore::panic::catch_unwind(move || array.clear());
+        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 1);
+    }
+}
+}
diff --git a/src/repr/Cargo.toml b/src/repr/Cargo.toml
index d9b3f8ebe1df2..feff320a7dab7 100644
--- a/src/repr/Cargo.toml
+++ b/src/repr/Cargo.toml
@@ -40,14 +40,14 @@ differential-dataflow = "0.12.0"
 enum_dispatch = "0.3.11"
 enum-kinds = "0.5.1"
 fast-float = "0.2.0"
-flatcontainer = "0.4.1"
+flatcontainer = "0.5.0"
 hex = "0.4.3"
 itertools = "0.10.5"
 once_cell = "1.16.0"
 mz-lowertest = { path = "../lowertest" }
 mz-ore = { path = "../ore", features = [
   "bytes_",
-  "flatcontainer",
+  "flatcontainer_",
   "id_gen",
   "smallvec",
   "region",
diff --git a/src/timely-util/src/containers.rs b/src/timely-util/src/containers.rs
index 9431900d8edfd..521daf1560e32 100644
--- a/src/timely-util/src/containers.rs
+++ b/src/timely-util/src/containers.rs
@@ -9,5 +9,90 @@
 //! Reusable containers.
 
+use std::collections::VecDeque;
+
+use timely::container::flatcontainer::impls::index::IndexContainer;
+use timely::container::flatcontainer::{FlatStack, Push, Region};
+use timely::container::{CapacityContainer, ContainerBuilder, PushInto};
+use timely::Container;
+
 pub mod array;
 pub mod stack;
+
+/// A container builder that uses length and preferred capacity to chunk data. Preallocates the next
+/// container based on the capacity of the previous one once a container is full.
+///
+/// Ideally, we'd have a `TryPush` trait that would fail if a push would cause a reallocation, but
+/// we aren't there yet.
+///
+/// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not
+/// across [`Self::finish`] to maintain a low memory footprint.
+///
+/// Maintains FIFO order.
+#[derive(Default, Debug)]
+pub struct PreallocatingCapacityContainerBuilder<C> {
+    /// Container that we're writing to.
+    current: Option<C>,
+    /// Empty allocation.
+    empty: Option<C>,
+    /// Completed containers pending to be sent.
+    pending: VecDeque<C>,
+}
+
+impl<T, R, S> PushInto<T> for PreallocatingCapacityContainerBuilder<FlatStack<R, S>>
+where
+    R: Region + Push<T> + Clone + 'static,
+    S: IndexContainer<R::Index> + Clone + 'static,
+    FlatStack<R, S>: CapacityContainer,
+{
+    #[inline]
+    fn push_into(&mut self, item: T) {
+        if self.current.is_none() {
+            let mut empty = self.empty.take().unwrap_or_default();
+            empty.clear();
+            self.current = Some(empty);
+        }
+
+        let current = self.current.as_mut().unwrap();
+
+        // Ensure capacity
+        current.ensure_preferred_capacity();
+        // Push item
+        current.push(item);
+
+        // Maybe flush
+        if current.len() >= FlatStack::<R, S>::preferred_capacity() {
+            let pending = std::mem::take(current);
+            *current = FlatStack::merge_capacity(std::iter::once(&pending));
+            self.pending.push_back(pending);
+        }
+    }
+}
+
+impl<R, S> ContainerBuilder for PreallocatingCapacityContainerBuilder<FlatStack<R, S>>
+where
+    R: Region + Clone + 'static,
+    S: IndexContainer<R::Index> + Clone + 'static,
+    FlatStack<R, S>: CapacityContainer,
+{
+    type Container = FlatStack<R, S>;
+
+    #[inline]
+    fn extract(&mut self) -> Option<&mut Self::Container> {
+        self.empty = Some(self.pending.pop_front()?);
+        self.empty.as_mut()
+    }
+
+    #[inline]
+    fn finish(&mut self) -> Option<&mut Self::Container> {
+        let current = self.current.as_mut();
+        if current.as_ref().map_or(false, |c| !c.is_empty()) {
+            let current = current.unwrap();
+            let pending = std::mem::take(current);
+            *current = FlatStack::merge_capacity(std::iter::once(&pending));
+            self.pending.push_back(pending);
+        }
+        self.empty = self.pending.pop_front();
+        self.empty.as_mut()
+    }
+}