diff --git a/.gitignore b/.gitignore index 9d11d1ef..a88387b0 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ Cargo.lock tests/data/sync/ /tests/data/unsized_secondary_sync/ /tests/data/unsized_primary_sync/ +/tests/data/key/ +/tests/data/uuid/ +/tests/data/concurrent diff --git a/Cargo.toml b/Cargo.toml index e665a16a..01d675d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,14 +27,14 @@ lockfree = { version = "0.5.1" } fastrand = "2.3.0" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } -data_bucket = "0.3.0" +# data_bucket = "0.3.0" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } -# data_bucket = { path = "../DataBucket", version = "0.2.10" } +data_bucket = { path = "../DataBucket", version = "0.3.0" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } # indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] } -# indexset = { package = "wt-indexset", path = "../indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] } -indexset = { package = "wt-indexset", version = "0.12.6", features = ["concurrent", "cdc", "multimap"] } +indexset = { package = "wt-indexset", path = "../indexset", version = "0.12.6", features = ["concurrent", "cdc", "multimap"] } +# indexset = { package = "wt-indexset", version = "0.12.6", features = ["concurrent", "cdc", "multimap"] } convert_case = "0.6.0" ordered-float = "5.0.0" parking_lot = "0.12.3" diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index 89bf166f..5617ff9b 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -77,7 +77,7 @@ impl Generator { quote! { let secondary_keys_events = self.0.indexes.delete_row_cdc(row, link)?; let (_, primary_key_events) = TableIndexCdc::remove_cdc(&self.0.pk_map, pk.clone(), link); - self.0.data.delete(link).map_err(WorkTableError::PagesError)?; + let empty_link_registry_ops = self.0.data.delete(link).map_err(WorkTableError::PagesError)?; let mut op: Operation< <<#pk_ident as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State, #pk_ident, @@ -86,6 +86,7 @@ impl Generator { id: uuid::Uuid::now_v7().into(), secondary_keys_events, primary_key_events, + empty_link_registry_operations: empty_link_registry_ops, link, }); self.2.apply_operation(op); diff --git a/codegen/src/worktable/generator/row.rs b/codegen/src/worktable/generator/row.rs index 5d353687..8827051e 100644 --- a/codegen/src/worktable/generator/row.rs +++ b/codegen/src/worktable/generator/row.rs @@ -85,7 +85,7 @@ impl Generator { }; quote! 
{ - #[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat)] + #[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat, SizeMeasure)] #custom_derives #[rkyv(derive(Debug))] #[repr(C)] diff --git a/codegen/src/worktable/generator/table/mod.rs b/codegen/src/worktable/generator/table/mod.rs index fb096078..8e41b1d4 100644 --- a/codegen/src/worktable/generator/table/mod.rs +++ b/codegen/src/worktable/generator/table/mod.rs @@ -83,6 +83,13 @@ impl Generator { }) .collect::>(); let pk_types_unsized = is_unsized_vec(pk_types); + let row_types = &self + .columns + .columns_map + .values() + .map(|t| t.to_string()) + .collect::>(); + let row_unsized = is_unsized_vec(row_types); let derive = if self.is_persist { if pk_types_unsized { quote! { @@ -108,6 +115,15 @@ impl Generator { Vec> } }; + let empty_links_type = if row_unsized { + quote! { + UnsizedEmptyLinkRegistry + } + } else { + quote! { + SizedEmptyLinkRegistry + } + }; if self.config.as_ref().and_then(|c| c.page_size).is_some() { quote! { @@ -116,6 +132,7 @@ impl Generator { WorkTable< #row_type, #primary_key_type, + #empty_links_type, #avt_type_ident, #avt_index_ident, #index_type, @@ -134,6 +151,7 @@ impl Generator { WorkTable< #row_type, #primary_key_type, + #empty_links_type, #avt_type_ident, #avt_index_ident, #index_type, diff --git a/codegen/src/worktable/generator/wrapper.rs b/codegen/src/worktable/generator/wrapper.rs index b77f0ad6..455d946a 100644 --- a/codegen/src/worktable/generator/wrapper.rs +++ b/codegen/src/worktable/generator/wrapper.rs @@ -24,7 +24,7 @@ impl Generator { let wrapper_ident = name_generator.get_wrapper_type_ident(); quote! { - #[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] + #[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize, SizeMeasure)] #[repr(C)] pub struct #wrapper_ident { inner: #row_ident, diff --git a/src/in_memory/data.rs b/src/in_memory/data.rs index 0e824cbd..427e09d4 100644 --- a/src/in_memory/data.rs +++ b/src/in_memory/data.rs @@ -5,10 +5,10 @@ use std::sync::atomic::{AtomicU32, Ordering}; use data_bucket::page::INNER_PAGE_SIZE; use data_bucket::page::PageId; -use data_bucket::{DataPage, GeneralPage}; +use data_bucket::{ + DataPage, DefaultSizeMeasurable, GeneralPage, SizeMeasurable, SizeMeasure, align, +}; use derive_more::{Display, Error}; -#[cfg(feature = "perf_measurements")] -use performance_measurement_codegen::performance_measurement; use rkyv::{ Archive, Deserialize, Portable, Serialize, api::high::HighDeserializer, @@ -45,6 +45,28 @@ impl DerefMut for AlignedBytes { } } +/// Value that represents size of the row which will be written just after this +/// value. +/// +/// `u64` is used to keep correct alignment +#[derive( + Archive, + Copy, + Clone, + Deserialize, + Debug, + Default, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + Serialize, + SizeMeasure, +)] +#[rkyv(derive(Debug, PartialOrd, PartialEq, Eq, Ord))] +pub struct RowLength(pub u64); + #[derive(Archive, Deserialize, Debug, Serialize)] pub struct Data { /// [`Id`] of the [`General`] page of this [`Data`]. 
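The `RowLength` prefix introduced above changes the on-page layout: every row is now written behind an aligned `u64` length prefix, and the `Link` returned to callers points past that prefix and covers only the row bytes, as the `save_row` and `get_next_lying_row_ref` hunks below implement. A minimal sketch of that layout arithmetic, assuming little-endian bytes and a plain 8-byte prefix in place of the rkyv-serialized `RowLength` the real code uses:

const PREFIX: usize = 8; // stand-in for RowLength::default_aligned_size()

// Writes `row` at `offset` behind its length prefix and returns the
// (offset, length) pair a `Link` would carry: the offset points past
// the prefix, the length excludes it.
fn write_prefixed(page: &mut [u8], offset: usize, row: &[u8]) -> (usize, usize) {
    page[offset..offset + PREFIX].copy_from_slice(&(row.len() as u64).to_le_bytes());
    page[offset + PREFIX..offset + PREFIX + row.len()].copy_from_slice(row);
    (offset + PREFIX, row.len())
}

// Mirrors `get_next_lying_row_ref`: the next row's length prefix sits
// directly at `offset + length` of the previous link.
fn next_link(page: &[u8], (offset, length): (usize, usize)) -> (usize, usize) {
    let mut len = [0u8; PREFIX];
    len.copy_from_slice(&page[offset + length..offset + length + PREFIX]);
    (offset + length + PREFIX, u64::from_le_bytes(len) as usize)
}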
@@ -92,10 +114,6 @@ impl Data { self.id = id; } - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "DataRow") - )] pub fn save_row(&self, row: &Row) -> Result where Row: Archive @@ -105,7 +123,10 @@ impl Data { { let bytes = rkyv::to_bytes::(row) .map_err(|_| ExecutionError::SerializeError)?; - let length = bytes.len(); + let link_length = bytes.len(); + let length_bytes = rkyv::to_bytes::(&RowLength(link_length as u64)) + .map_err(|_| ExecutionError::SerializeError)?; + let length = link_length + RowLength::default_aligned_size(); if length > DATA_LENGTH { return Err(ExecutionError::PageTooSmall { need: length, @@ -113,7 +134,7 @@ impl Data { }); } let length = length as u32; - let offset = self.free_offset.fetch_add(length, Ordering::AcqRel); + let mut offset = self.free_offset.fetch_add(length, Ordering::AcqRel); if offset > DATA_LENGTH as u32 - length { return Err(ExecutionError::PageIsFull { need: length, @@ -122,40 +143,62 @@ impl Data { } let inner_data = unsafe { &mut *self.inner_data.get() }; - inner_data[offset as usize..][..length as usize].copy_from_slice(bytes.as_slice()); + inner_data[offset as usize..][..RowLength::default_aligned_size()] + .copy_from_slice(length_bytes.as_slice()); + offset += RowLength::default_aligned_size() as u32; + inner_data[offset as usize..][..link_length].copy_from_slice(bytes.as_slice()); let link = Link { page_id: self.id, offset, - length, + length: link_length as u32, }; Ok(link) } #[allow(clippy::missing_safety_doc)] - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "DataRow") - )] - pub unsafe fn save_row_by_link(&self, row: &Row, link: Link) -> Result + pub unsafe fn save_row_by_link( + &self, + row: &Row, + mut link: Link, + ) -> Result<(Link, Option), ExecutionError> where Row: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, >, { + // links in this fn should be used from EmptyLinkRegistry (returned after `delete` method) + let bytes = rkyv::to_bytes(row).map_err(|_| ExecutionError::SerializeError)?; let length = bytes.len() as u32; - if length != link.length { + let length_bytes = rkyv::to_bytes::(&RowLength(length as u64)) + .map_err(|_| ExecutionError::SerializeError)?; + + let link_left = if link.length > length + RowLength::default_aligned_size() as u32 { + Some(Link { + page_id: link.page_id, + offset: link.offset + length + RowLength::default_aligned_size() as u32, + length: link.length - (length + RowLength::default_aligned_size() as u32), + }) + } else if link.length == length + RowLength::default_aligned_size() as u32 { + None + } else { return Err(ExecutionError::InvalidLink); - } + }; + + link.offset += RowLength::default_aligned_size() as u32; + link.length = length; let inner_data = unsafe { &mut *self.inner_data.get() }; + inner_data[link.offset as usize - RowLength::default_aligned_size()..] 
+ [..RowLength::default_aligned_size()] + .copy_from_slice(length_bytes.as_slice()); inner_data[link.offset as usize..][..link.length as usize] .copy_from_slice(bytes.as_slice()); - Ok(link) + Ok((link, link_left)) } /// # Safety @@ -179,10 +222,6 @@ impl Data { Ok(unsafe { rkyv::access_unchecked_mut::<::Archived>(&mut bytes[..]) }) } - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "DataRow") - )] pub fn get_row_ref(&self, link: Link) -> Result<&::Archived, ExecutionError> where Row: Archive, @@ -196,10 +235,44 @@ impl Data { Ok(unsafe { rkyv::access_unchecked::<::Archived>(bytes) }) } - //#[cfg_attr( - // feature = "perf_measurements", - // performance_measurement(prefix_name = "DataRow") - //)] + pub fn get_next_lying_row_ref( + &self, + link: Link, + ) -> Result<(&::Archived, Link), ExecutionError> + where + Row: Archive, + ::Archived: Deserialize>, + { + if link.offset > self.free_offset.load(Ordering::Acquire) + || link.offset + link.length + RowLength::default_aligned_size() as u32 + > self.free_offset.load(Ordering::Acquire) + { + return Err(ExecutionError::InvalidLink); + } + + let inner_data = unsafe { &*self.inner_data.get() }; + let length_bytes = &inner_data[(link.offset + link.length) as usize..] + [..RowLength::default_aligned_size()]; + let archived = + unsafe { rkyv::access_unchecked::<::Archived>(length_bytes) }; + let length = rkyv::deserialize::(archived) + .map_err(|_| ExecutionError::DeserializeError)? + .0; + let offset = (link.offset + link.length) as usize + RowLength::default_aligned_size(); + let bytes = &inner_data[offset..][..length as usize]; + + let row_link = Link { + page_id: link.page_id, + offset: link.offset + link.length + RowLength::default_aligned_size() as u32, + length: length as u32, + }; + + Ok(( + unsafe { rkyv::access_unchecked::<::Archived>(bytes) }, + row_link, + )) + } + pub fn get_row(&self, link: Link) -> Result where Row: Archive, @@ -210,6 +283,19 @@ impl Data { .map_err(|_| ExecutionError::DeserializeError) } + pub fn get_next_lying_row(&self, link: Link) -> Result<(Row, Link), ExecutionError> + where + Row: Archive, + ::Archived: Deserialize>, + { + let (row, row_link) = self.get_next_lying_row_ref(link)?; + Ok(( + rkyv::deserialize::<_, rkyv::rancor::Error>(row) + .map_err(|_| ExecutionError::DeserializeError)?, + row_link, + )) + } + pub fn get_raw_row(&self, link: Link) -> Result, ExecutionError> { if link.offset > self.free_offset.load(Ordering::Acquire) { return Err(ExecutionError::DeserializeError); @@ -220,10 +306,23 @@ impl Data { Ok(bytes.to_vec()) } + pub fn delete_row(mut link: Link) -> Link { + // we don't remove data on delete, but we need to append space allocated + // for length bytes to link + link.offset -= RowLength::default_aligned_size() as u32; + link.length += RowLength::default_aligned_size() as u32; + + link + } + pub fn get_bytes(&self) -> [u8; DATA_LENGTH] { let data = unsafe { &*self.inner_data.get() }; data.0 } + + pub(crate) fn set_free_offset(&self, offset: u32) { + self.free_offset.store(offset, Ordering::Relaxed) + } } /// Error that can appear on [`Data`] page operations. 
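Taken together, `delete_row` and `save_row_by_link` implement a simple slot-reuse protocol: `delete_row` widens the link to cover the prefix, and a later overwrite either fills the slot exactly, splits off a usable remainder for the empty-link registry, or rejects the slot. A sketch of that decision, using a hypothetical local `Link` struct with the same fields as the `data_bucket` type:

#[derive(Clone, Copy, Debug, PartialEq)]
struct Link { page_id: u32, offset: u32, length: u32 }

const PREFIX: u32 = 8; // stand-in for RowLength::default_aligned_size()

// An empty slot (offset/length include the prefix, as delete_row leaves it)
// is reused for a row of `row_len` bytes; the remainder, if any, goes back
// to the empty-link registry.
fn split_slot(slot: Link, row_len: u32) -> Result<(Link, Option<Link>), &'static str> {
    let need = row_len + PREFIX;
    let used = Link { page_id: slot.page_id, offset: slot.offset + PREFIX, length: row_len };
    if slot.length > need {
        let left = Link { page_id: slot.page_id, offset: slot.offset + need, length: slot.length - need };
        Ok((used, Some(left)))
    } else if slot.length == need {
        Ok((used, None))
    } else {
        Err("slot too small; the real code returns ExecutionError::InvalidLink")
    }
}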
@@ -253,9 +352,12 @@ mod tests { use std::sync::{Arc, mpsc}; use std::thread; + use data_bucket::{DefaultSizeMeasurable, Link}; use rkyv::{Archive, Deserialize, Serialize}; - use crate::in_memory::data::{Data, ExecutionError, INNER_PAGE_SIZE}; + use crate::in_memory::data::{ + ArchivedRowLength, Data, ExecutionError, INNER_PAGE_SIZE, RowLength, + }; #[derive( Archive, Copy, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, @@ -266,6 +368,15 @@ mod tests { b: u64, } + #[derive( + Archive, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, + )] + #[rkyv(compare(PartialEq), derive(Debug))] + struct UnsizedTestRow { + a: u64, + b: String, + } + #[test] fn data_page_length_valid() { let data = Data::<()>::new(1.into()); @@ -282,13 +393,16 @@ mod tests { let link = page.save_row(&row).unwrap(); assert_eq!(link.page_id, page.id); assert_eq!(link.length, 16); - assert_eq!(link.offset, 0); + assert_eq!(link.offset, RowLength::default_aligned_size() as u32); - assert_eq!(page.free_offset.load(Ordering::Relaxed), link.length); + assert_eq!( + page.free_offset.load(Ordering::Relaxed), + link.length + RowLength::default_aligned_size() as u32 + ); let inner_data = unsafe { &mut *page.inner_data.get() }; - let bytes = &inner_data[link.offset as usize..link.length as usize]; - let archived = unsafe { rkyv::access_unchecked::(bytes) }; + let bytes = &inner_data[link.offset as usize..(link.offset + link.length) as usize]; + let archived = unsafe { rkyv::access_unchecked::(&bytes) }; assert_eq!(archived, &row) } @@ -298,21 +412,64 @@ mod tests { let row = TestRow { a: 10, b: 20 }; let link = page.save_row(&row).unwrap(); + let res_link = Data::::delete_row(link); let new_row = TestRow { a: 20, b: 20 }; - let res = unsafe { page.save_row_by_link(&new_row, link) }.unwrap(); + let res = unsafe { page.save_row_by_link(&new_row, res_link) }.unwrap(); - assert_eq!(res, link); + assert_eq!(res, (link, None)); let inner_data = unsafe { &mut *page.inner_data.get() }; - let bytes = &inner_data[link.offset as usize..link.length as usize]; + let bytes = &inner_data[link.offset as usize..(link.offset + link.length) as usize]; let archived = unsafe { rkyv::access_unchecked::(bytes) }; assert_eq!(archived, &new_row) } + #[test] + fn data_page_overwrite_row_unsized() { + let page = Data::::new(1.into()); + let row = UnsizedTestRow { + a: 10, + b: "SomeString__________2000".to_string(), + }; + + let link = page.save_row(&row).unwrap(); + let res_link = Data::::delete_row(link); + + let new_row = UnsizedTestRow { + a: 20, + b: "SomeString".to_string(), + }; + let (res_link, left_link) = unsafe { page.save_row_by_link(&new_row, res_link) }.unwrap(); + + assert_ne!(res_link, link); + assert_eq!( + left_link, + Some(Link { + page_id: link.page_id, + offset: 40, + length: 8 + }) + ); + assert_eq!(left_link.unwrap().offset, res_link.offset + res_link.length); + assert_eq!(left_link.unwrap().length, link.length - res_link.length); + + let inner_data = unsafe { &mut *page.inner_data.get() }; + let bytes = + &inner_data[res_link.offset as usize..(res_link.offset + res_link.length) as usize]; + let archived = unsafe { rkyv::access_unchecked::(bytes) }; + assert_eq!(archived, &new_row); + + let length_bytes = &inner_data[res_link.offset as usize - RowLength::default_aligned_size() + ..res_link.offset as usize]; + let archived = unsafe { rkyv::access_unchecked::(length_bytes) }; + let length = rkyv::deserialize::(archived).unwrap(); + assert_eq!(length.0 as u32, res_link.length); + } + 
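The numbers asserted in `data_page_overwrite_row_unsized` follow from this arithmetic, assuming the old row archives to 40 bytes and the new one to 32 (an assumption about rkyv's layout for the two `UnsizedTestRow` values, not something stated in the diff):

// Slot after delete_row: prefix at offset 0, so offset = 0, length = 8 + 40 = 48.
// Reinserting the 32-byte row consumes 8 + 32 = 40 bytes of the slot,
// leaving left_link = { offset: 40, length: 8 }, as the test asserts.
fn main() {
    let (slot_offset, slot_len) = (0u32, 48u32);
    let new_len = 32u32;
    assert_eq!((slot_offset + 8, new_len), (8, 32)); // res_link
    assert_eq!(
        (slot_offset + 8 + new_len, slot_len - (8 + new_len)),
        (40, 8) // left_link
    );
}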
#[test] fn data_page_full() { - let page = Data::::new(1.into()); + let page = Data::::new(1.into()); let row = TestRow { a: 10, b: 20 }; let _ = page.save_row(&row).unwrap(); @@ -417,6 +574,19 @@ mod tests { assert_eq!(deserialized, row) } + #[test] + fn data_page_get_next_row() { + let page = Data::::new(1.into()); + let row1 = TestRow { a: 10, b: 20 }; + let link1 = page.save_row(&row1).unwrap(); + let row2 = TestRow { a: 40, b: 50 }; + let link2 = page.save_row(&row2).unwrap(); + + let (deserialized, res_link) = page.get_next_lying_row(link1).unwrap(); + assert_eq!(deserialized, row2); + assert_eq!(res_link, link2) + } + #[test] fn multithread() { let page = Data::::new(1.into()); diff --git a/src/in_memory/empty_links_registry/mod.rs b/src/in_memory/empty_links_registry/mod.rs new file mode 100644 index 00000000..6d00bbd8 --- /dev/null +++ b/src/in_memory/empty_links_registry/mod.rs @@ -0,0 +1,31 @@ +mod sized; +mod unsized_; + +use data_bucket::Link; + +pub use sized::SizedEmptyLinkRegistry; +pub use unsized_::UnsizedEmptyLinkRegistry; + +/// [`EmptyLinksRegistry`] is used for storing [`Link`]s after their release in +/// [`DataPages`]. + /// +/// [`DataPages`]: crate::in_memory::DataPages +pub trait EmptyLinksRegistry { + /// Stores an empty [`Link`] in this registry. + fn add_empty_link(&self, link: Link); + + /// Returns a [`Link`] big enough to fit data of the provided `size`. + fn find_link_with_length(&self, size: u32) -> Option<Link>; + + /// Pops all [`Link`]s from this [`EmptyLinksRegistry`] for further + /// operations. + fn as_vec(&self) -> Vec<Link>; + + /// Returns the length of this [`EmptyLinksRegistry`]. + fn len(&self) -> usize; + + /// Checks if this [`EmptyLinksRegistry`] is empty. + fn is_empty(&self) -> bool { + self.len() == 0 + } +} diff --git a/src/in_memory/empty_links_registry/sized.rs b/src/in_memory/empty_links_registry/sized.rs new file mode 100644 index 00000000..b67cdae9 --- /dev/null +++ b/src/in_memory/empty_links_registry/sized.rs @@ -0,0 +1,40 @@ +use crate::in_memory::empty_links_registry::EmptyLinksRegistry; +use data_bucket::Link; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// [`EmptyLinksRegistry`] that should be used for sized `Row`s. It uses +/// [`lockfree::Stack`] under the hood and simply [`pop`]s a [`Link`] when one is needed, +/// because sized `Row`s all have the same size. + /// +/// [`lockfree::Stack`]: lockfree::stack::Stack +/// [`pop`]: lockfree::stack::Stack::pop +#[derive(Debug, Default)] +pub struct SizedEmptyLinkRegistry { + stack: lockfree::stack::Stack<Link>, + length: AtomicUsize, +} + +impl EmptyLinksRegistry for SizedEmptyLinkRegistry { + fn add_empty_link(&self, link: Link) { + self.stack.push(link); + self.length.fetch_add(1, Ordering::Relaxed); + } + + fn find_link_with_length(&self, _size: u32) -> Option<Link> { + // `size` can be ignored as sized rows all have the same size.
+ if let Some(val) = self.stack.pop() { + self.length.fetch_sub(1, Ordering::Relaxed); + Some(val) + } else { + None + } + } + + fn as_vec(&self) -> Vec<Link> { + self.stack.pop_iter().collect() + } + + fn len(&self) -> usize { + self.length.load(Ordering::Relaxed) + } +} diff --git a/src/in_memory/empty_links_registry/unsized_.rs b/src/in_memory/empty_links_registry/unsized_.rs new file mode 100644 index 00000000..e020b179 --- /dev/null +++ b/src/in_memory/empty_links_registry/unsized_.rs @@ -0,0 +1,35 @@ +use data_bucket::{DefaultSizeMeasurable, Link}; + +use crate::IndexMultiMap; +use crate::in_memory::RowLength; +use crate::in_memory::empty_links_registry::EmptyLinksRegistry; + +#[derive(Debug, Default)] +pub struct UnsizedEmptyLinkRegistry(IndexMultiMap<u32, Link>); + +impl EmptyLinksRegistry for UnsizedEmptyLinkRegistry { + fn add_empty_link(&self, link: Link) { + self.0.insert(link.length, link); + } + + fn find_link_with_length(&self, size: u32) -> Option<Link> { + if let Some(link) = self.0.remove_max().map(|(_, l)| l) { + if link.length < size + RowLength::default_aligned_size() as u32 { + self.0.insert(link.length, link); + None + } else { + Some(link) + } + } else { + None + } + } + + fn as_vec(&self) -> Vec<Link> { + self.0.drain().into_iter().map(|p| p.value).collect() + } + + fn len(&self) -> usize { + self.0.len() + } +} diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index 1a14334e..b299cbb9 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -1,7 +1,11 @@ mod data; +mod empty_links_registry; mod pages; mod row; -pub use data::{DATA_INNER_LENGTH, Data, ExecutionError as DataExecutionError}; -pub use pages::{DataPages, ExecutionError as PagesExecutionError}; +pub use data::{DATA_INNER_LENGTH, Data, ExecutionError as DataExecutionError, RowLength}; +pub use empty_links_registry::{ + EmptyLinksRegistry, SizedEmptyLinkRegistry, UnsizedEmptyLinkRegistry, +}; +pub use pages::{DataPages, ExecutionError as PagesExecutionError, InsertCdcOutput}; pub use row::{GhostWrapper, Query, RowWrapper, StorableRow}; diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 65823b94..bd18eeca 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -1,14 +1,13 @@ use std::{ fmt::Debug, + sync::Arc, sync::atomic::{AtomicU32, AtomicU64, Ordering}, - sync::{Arc, RwLock}, }; +use data_bucket::SizeMeasurable; use data_bucket::page::PageId; use derive_more::{Display, Error, From}; -use lockfree::stack::Stack; -#[cfg(feature = "perf_measurements")] -use performance_measurement_codegen::performance_measurement; +use parking_lot::RwLock; use rkyv::{ Archive, Deserialize, Portable, Serialize, api::high::HighDeserializer, @@ -17,6 +16,8 @@ use rkyv::{ util::AlignedVec, }; +use crate::in_memory::empty_links_registry::EmptyLinksRegistry; +use crate::prelude::EmptyLinkRegistryOperation; use crate::{ in_memory::{ DATA_INNER_LENGTH, Data, DataExecutionError, @@ -30,16 +31,15 @@ fn page_id_mapper(page_id: usize) -> usize { } #[derive(Debug)] -pub struct DataPages +pub struct DataPages where Row: StorableRow, { /// Pages vector. Currently, not lock free. pages: RwLock<Vec<Arc<Data<<Row as StorableRow>::WrappedRow, DATA_LENGTH>>>>, - /// Stack with empty [`Link`]s. It stores [`Link`]s of rows that was deleted. - // TODO: Proper empty links registry + defragmentation - empty_links: Stack<Link>, + /// Registry with empty [`Link`]s. It stores [`Link`]s of rows that were deleted. + empty_links: EmptyLinks, /// Count of saved rows.
row_count: AtomicU64, @@ -49,8 +49,16 @@ where current_page_id: AtomicU32, } -impl Default for DataPages +#[derive(Debug)] +pub struct InsertCdcOutput { + pub link: Link, + pub data_bytes: Vec, + pub empty_link_registry_ops: Vec, +} + +impl Default for DataPages where + EmptyLinks: Default + EmptyLinksRegistry, Row: StorableRow, ::WrappedRow: RowWrapper, { @@ -59,8 +67,9 @@ where } } -impl DataPages +impl DataPages where + EmptyLinks: Default + EmptyLinksRegistry, Row: StorableRow, ::WrappedRow: RowWrapper, { @@ -68,7 +77,7 @@ where Self { // We are starting ID's from `1` because `0`'s page in file is info page. pages: RwLock::new(vec![Arc::new(Data::new(1.into()))]), - empty_links: Stack::new(), + empty_links: EmptyLinks::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(1), current_page_id: AtomicU32::new(1), @@ -83,7 +92,7 @@ where let last_page_id = vec.len(); Self { pages: RwLock::new(vec), - empty_links: Stack::new(), + empty_links: EmptyLinks::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(last_page_id as u32), current_page_id: AtomicU32::new(last_page_id as u32), @@ -98,35 +107,64 @@ where Strategy, Share>, rkyv::rancor::Error>, >, ::WrappedRow: Archive + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + > + SizeMeasurable, + { + self.insert_inner(row).map(|r| r.0) + } + + fn insert_inner( + &self, + row: Row, + ) -> Result<(Link, Vec), ExecutionError> + where + Row: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, >, + ::WrappedRow: Archive + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + > + SizeMeasurable, { let general_row = ::WrappedRow::from_inner(row); - if let Some(link) = self.empty_links.pop() { - let pages = self.pages.read().unwrap(); + if let Some(link) = self + .empty_links + .find_link_with_length(general_row.aligned_size() as u32) + { + let mut empty_link_registry_ops = vec![]; + empty_link_registry_ops.push(EmptyLinkRegistryOperation::Remove(link)); + + let pages = self.pages.read(); let current_page: usize = page_id_mapper(link.page_id.into()); let page = &pages[current_page]; - if let Err(e) = unsafe { page.save_row_by_link(&general_row, link) } { - match e { + match unsafe { page.save_row_by_link(&general_row, link) } { + Ok((link, left_link)) => { + if let Some(link) = left_link { + empty_link_registry_ops.push(EmptyLinkRegistryOperation::Add(link)); + self.empty_links.add_empty_link(link) + } + return Ok((link, empty_link_registry_ops)); + } + Err(e) => match e { DataExecutionError::InvalidLink => { - self.empty_links.push(link); + empty_link_registry_ops.pop(); + self.empty_links.add_empty_link(link); } DataExecutionError::PageIsFull { .. } | DataExecutionError::PageTooSmall { .. } | DataExecutionError::SerializeError | DataExecutionError::DeserializeError => return Err(e.into()), - } - } else { - return Ok(link); - }; + }, + } } loop { let (link, tried_page) = { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let current_page = page_id_mapper(self.current_page_id.load(Ordering::Acquire) as usize); let page = &pages[current_page]; @@ -136,7 +174,7 @@ where match link { Ok(link) => { self.row_count.fetch_add(1, Ordering::Relaxed); - return Ok(link); + return Ok((link, vec![])); } Err(e) => match e { DataExecutionError::PageIsFull { .. 
} => { @@ -155,7 +193,7 @@ where } } - pub fn insert_cdc(&self, row: Row) -> Result<(Link, Vec), ExecutionError> + pub fn insert_cdc(&self, row: Row) -> Result where Row: Archive + for<'a> Serialize< @@ -164,18 +202,22 @@ where ::WrappedRow: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - >, + > + SizeMeasurable, { - let link = self.insert(row.clone())?; + let (link, ops) = self.insert_inner(row.clone())?; let general_row = ::WrappedRow::from_inner(row); let bytes = rkyv::to_bytes(&general_row) .expect("should be ok as insert not failed") .into_vec(); - Ok((link, bytes)) + Ok(InsertCdcOutput { + link, + data_bytes: bytes, + empty_link_registry_ops: ops, + }) } fn add_next_page(&self, tried_page: usize) { - let mut pages = self.pages.write().expect("lock should be not poisoned"); + let mut pages = self.pages.write(); if tried_page == page_id_mapper(self.current_page_id.load(Ordering::Acquire) as usize) { let index = self.last_page_id.fetch_add(1, Ordering::AcqRel) + 1; @@ -184,10 +226,6 @@ where } } - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "DataPages") - )] pub fn select(&self, link: Link) -> Result where Row: Archive @@ -197,9 +235,8 @@ where <::WrappedRow as Archive>::Archived: Portable + Deserialize<::WrappedRow, HighDeserializer>, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages - // - 1 is used because page ids are starting from 1. .get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; let gen_row = page.get_row(link).map_err(ExecutionError::DataPageError)?; @@ -215,9 +252,8 @@ where <::WrappedRow as Archive>::Archived: Portable + Deserialize<::WrappedRow, HighDeserializer>, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages - // - 1 is used because page ids are starting from 1. 
.get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; let gen_row = page.get_row(link).map_err(ExecutionError::DataPageError)?; @@ -227,10 +263,6 @@ where Ok(gen_row.get_inner()) } - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "DataPages") - )] pub fn with_ref(&self, link: Link, op: Op) -> Result where Row: Archive @@ -239,7 +271,7 @@ where >, Op: Fn(&<::WrappedRow as Archive>::Archived) -> Res, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get::(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; @@ -251,10 +283,6 @@ where } #[allow(clippy::missing_safety_doc)] - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "DataPages") - )] pub unsafe fn with_mut_ref( &self, link: Link, @@ -268,7 +296,7 @@ where <::WrappedRow as Archive>::Archived: Portable, Op: FnMut(&mut <::WrappedRow as Archive>::Archived) -> Res, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; @@ -299,24 +327,27 @@ where Strategy, Share>, rkyv::rancor::Error>, >, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; let gen_row = ::WrappedRow::from_inner(row); unsafe { page.save_row_by_link(&gen_row, link) - .map_err(ExecutionError::DataPageError) + .map_err(ExecutionError::DataPageError)?; } + + Ok(link) } - pub fn delete(&self, link: Link) -> Result<(), ExecutionError> { - self.empty_links.push(link); - Ok(()) + pub fn delete(&self, link: Link) -> Result, ExecutionError> { + let res_link = Data::::delete_row(link); + self.empty_links.add_empty_link(res_link); + Ok(vec![EmptyLinkRegistryOperation::Add(res_link)]) } pub fn select_raw(&self, link: Link) -> Result, ExecutionError> { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; @@ -325,7 +356,7 @@ where } pub fn get_bytes(&self) -> Vec<([u8; DATA_LENGTH], u32)> { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); pages .iter() .map(|p| (p.get_bytes(), p.free_offset.load(Ordering::Relaxed))) @@ -333,27 +364,34 @@ where } pub fn get_page_count(&self) -> usize { - self.pages.read().unwrap().len() + self.pages.read().len() } pub fn get_empty_links(&self) -> Vec { - let mut res = vec![]; - for l in self.empty_links.pop_iter() { - res.push(l) - } - - res + self.empty_links.as_vec() } pub fn with_empty_links(mut self, links: Vec) -> Self { - let stack = Stack::new(); + let registry = EmptyLinks::default(); for l in links { - stack.push(l) + registry.add_empty_link(l) } - self.empty_links = stack; + self.empty_links = registry; self } + + pub(crate) fn get_page( + &self, + id: PageId, + ) -> Result::WrappedRow, DATA_LENGTH>>, ExecutionError> { + let pages = self.pages.read(); + let page = pages + .get(page_id_mapper(id.into())) + .ok_or(ExecutionError::PageNotFound(id))?; + + Ok(page.clone()) + } } #[derive(Debug, Display, Error, From, PartialEq)] @@ -370,19 +408,34 @@ pub enum ExecutionError { #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::atomic::Ordering; use std::sync::{Arc, 
RwLock}; use std::thread; use std::time::Instant; - use rkyv::with::{AtomicLoad, Relaxed}; + use data_bucket::DefaultSizeMeasurable; use rkyv::{Archive, Deserialize, Serialize}; + use crate::in_memory::data::RowLength; use crate::in_memory::pages::DataPages; - use crate::in_memory::{PagesExecutionError, RowWrapper, StorableRow}; + use crate::in_memory::{Data, PagesExecutionError, RowWrapper, StorableRow}; + use crate::prelude::{ + EmptyLinksRegistry, SizeMeasurable, SizeMeasure, SizedEmptyLinkRegistry, align, + }; #[derive( - Archive, Copy, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, + Archive, + Copy, + Clone, + Deserialize, + Debug, + Eq, + Hash, + Ord, + PartialEq, + PartialOrd, + Serialize, + SizeMeasure, )] struct TestRow { a: u64, @@ -391,60 +444,58 @@ mod tests { /// General `Row` wrapper that is used to append general data for every `Inner` /// `Row`. - #[derive(Archive, Deserialize, Debug, Serialize)] - pub struct GeneralRow { + #[derive(Archive, Deserialize, Debug, Serialize, SizeMeasure)] + struct GeneralRow { /// Inner generic `Row`. - pub inner: Inner, + pub inner: TestRow, /// Indicator for ghosted rows. - #[rkyv(with = AtomicLoad)] - pub is_ghosted: AtomicBool, + pub is_ghosted: bool, /// Indicator for deleted rows. - #[rkyv(with = AtomicLoad)] - pub deleted: AtomicBool, + pub deleted: bool, } - impl RowWrapper for GeneralRow { - fn get_inner(self) -> Inner { + impl RowWrapper for GeneralRow { + fn get_inner(self) -> TestRow { self.inner } fn is_ghosted(&self) -> bool { - self.is_ghosted.load(Ordering::Relaxed) + self.is_ghosted } /// Creates new [`GeneralRow`] from `Inner`. - fn from_inner(inner: Inner) -> Self { + fn from_inner(inner: TestRow) -> Self { Self { inner, - is_ghosted: AtomicBool::new(true), - deleted: AtomicBool::new(false), + is_ghosted: true, + deleted: false, } } } impl StorableRow for TestRow { - type WrappedRow = GeneralRow; + type WrappedRow = GeneralRow; } #[test] fn insert() { - let pages = DataPages::::new(); + let pages = DataPages::::new(); let row = TestRow { a: 10, b: 20 }; let link = pages.insert(row).unwrap(); assert_eq!(link.page_id, 1.into()); assert_eq!(link.length, 24); - assert_eq!(link.offset, 0); + assert_eq!(link.offset, RowLength::default_aligned_size() as u32); assert_eq!(pages.row_count.load(Ordering::Relaxed), 1); } #[test] fn insert_many() { - let pages = DataPages::::new(); + let pages = DataPages::::new(); for _ in 0..10_000 { let row = TestRow { a: 10, b: 20 }; @@ -457,7 +508,7 @@ mod tests { #[test] fn select() { - let pages = DataPages::::new(); + let pages = DataPages::::new(); let row = TestRow { a: 10, b: 20 }; let link = pages.insert(row).unwrap(); @@ -468,7 +519,7 @@ mod tests { #[test] fn select_non_ghosted() { - let pages = DataPages::::new(); + let pages = DataPages::::new(); let row = TestRow { a: 10, b: 20 }; let link = pages.insert(row).unwrap(); @@ -479,7 +530,7 @@ mod tests { #[test] fn update() { - let pages = DataPages::::new(); + let pages = DataPages::::new(); let row = TestRow { a: 10, b: 20 }; let link = pages.insert(row).unwrap(); @@ -490,14 +541,19 @@ mod tests { #[test] fn delete() { - let pages = DataPages::::new(); + let pages = DataPages::::new(); let row = TestRow { a: 10, b: 20 }; let link = pages.insert(row).unwrap(); pages.delete(link).unwrap(); - assert_eq!(pages.empty_links.pop(), Some(link)); - pages.empty_links.push(link); + assert_eq!( + pages.empty_links.find_link_with_length(link.length), + Some(Data::::delete_row(link)) + ); + pages + .empty_links + 
.add_empty_link(Data::::delete_row(link)); let row = TestRow { a: 20, b: 20 }; let new_link = pages.insert(row).unwrap(); @@ -506,7 +562,7 @@ mod tests { #[test] fn insert_on_empty() { - let pages = DataPages::::new(); + let pages = DataPages::::new(); let row = TestRow { a: 10, b: 20 }; let link = pages.insert(row).unwrap(); @@ -519,7 +575,7 @@ mod tests { //#[test] fn _bench() { - let pages = Arc::new(DataPages::::new()); + let pages = Arc::new(DataPages::::new()); let mut v = Vec::new(); diff --git a/src/index/unsized_node.rs b/src/index/unsized_node.rs index 6b62e8d5..2aa0f079 100644 --- a/src/index/unsized_node.rs +++ b/src/index/unsized_node.rs @@ -213,6 +213,12 @@ where { self.inner.deref().iter() } + + fn drain(&mut self) -> Vec { + // we can not change `length` and other fields as node will be removed + // after drain. + self.inner.drain(..).collect() + } } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index d6699cb3..3d646b1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,15 +21,18 @@ pub use table::*; pub use worktable_codegen::worktable; pub mod prelude { - pub use crate::in_memory::{Data, DataPages, GhostWrapper, Query, RowWrapper, StorableRow}; + pub use crate::in_memory::{ + Data, DataPages, EmptyLinksRegistry, GhostWrapper, Query, RowWrapper, + SizedEmptyLinkRegistry, StorableRow, UnsizedEmptyLinkRegistry, + }; pub use crate::lock::LockMap; pub use crate::lock::{Lock, RowLock}; pub use crate::mem_stat::MemStat; pub use crate::persistence::{ - DeleteOperation, IndexTableOfContents, InsertOperation, Operation, OperationId, - PersistenceConfig, PersistenceEngine, PersistenceEngineOps, PersistenceTask, SpaceData, - SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, SpaceSecondaryIndexOps, - UpdateOperation, map_index_pages_to_toc_and_general, + DeleteOperation, EmptyLinkRegistryOperation, IndexTableOfContents, InsertOperation, + Operation, OperationId, PersistenceConfig, PersistenceEngine, PersistenceEngineOps, + PersistenceTask, SpaceData, SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, + SpaceSecondaryIndexOps, UpdateOperation, map_index_pages_to_toc_and_general, map_unsized_index_pages_to_toc_and_general, validate_events, }; pub use crate::primary_key::{PrimaryKeyGenerator, PrimaryKeyGeneratorState, TablePrimaryKey}; diff --git a/src/persistence/engine.rs b/src/persistence/engine.rs index 135335dc..be3de98b 100644 --- a/src/persistence/engine.rs +++ b/src/persistence/engine.rs @@ -152,8 +152,8 @@ where >, ) -> eyre::Result<()> { let batch_data_op = batch_op.get_batch_data_op()?; - let (pk_evs, secondary_evs) = batch_op.get_indexes_evs()?; + let mut empty_links_ops = batch_op.get_empty_link_registry_ops()?; { let mut futs = FuturesUnordered::new(); futs.push(Either::Left(Either::Right( @@ -167,12 +167,32 @@ where .process_change_event_batch(secondary_evs), )); - while (futs.next().await).is_some() {} + while futs.next().await.is_some() {} + } + + let mut need_to_save = false; + let info = self.data.get_mut_info(); + if !empty_links_ops.is_empty() { + let mut links_to_add = empty_links_ops.add_set.into_iter().collect::>(); + let empty_links = &mut info.inner.empty_links_list; + links_to_add.extend( + empty_links + .iter() + .filter(|l| !empty_links_ops.remove_set.remove(*l)) + .copied() + .collect::>(), + ); + std::mem::swap(&mut info.inner.empty_links_list, &mut links_to_add); + + need_to_save = true; } if let Some(pk_gen_state_update) = batch_op.get_pk_gen_state()? 
{ - let info = self.data.get_mut_info(); info.inner.pk_gen_state = pk_gen_state_update; + need_to_save = true; + } + + if need_to_save { self.data.save_info().await?; } diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index 6c9b8a1c..6e475b29 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -8,8 +8,8 @@ use crate::persistence::operation::BatchOperation; pub use engine::PersistenceEngine; pub use manager::PersistenceConfig; pub use operation::{ - DeleteOperation, InsertOperation, Operation, OperationId, OperationType, UpdateOperation, - validate_events, + DeleteOperation, EmptyLinkRegistryOperation, InsertOperation, Operation, OperationId, + OperationType, UpdateOperation, validate_events, }; pub use space::{ IndexTableOfContents, SpaceData, SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, diff --git a/src/persistence/operation/batch.rs b/src/persistence/operation/batch.rs index 12c118a4..e40681c4 100644 --- a/src/persistence/operation/batch.rs +++ b/src/persistence/operation/batch.rs @@ -363,6 +363,31 @@ where } } + pub fn get_empty_link_registry_ops(&self) -> eyre::Result { + let mut state = EmptyLinkOperationsState::default(); + for row in self + .info_wt + .select_all() + .order_on(BatchInnerRowFields::OperationId, Order::Asc) + .execute()? + { + if row.op_type == OperationType::Update { + continue; + } + let pos = row.pos; + let op = self + .ops + .get(pos) + .expect("should be available as loaded from info table"); + let evs = op + .empty_links_ops() + .expect("should be available as `Update` operations are sorted out"); + state.append_ops(evs.iter()); + } + + Ok(state) + } + pub fn get_batch_data_op(&self) -> eyre::Result { let mut data = HashMap::new(); for link in self.info_wt.iter_links() { @@ -392,3 +417,150 @@ where Ok(data) } } + +#[derive(Debug, Default)] +pub struct EmptyLinkOperationsState { + pub add_set: HashSet, + pub remove_set: HashSet, +} + +impl EmptyLinkOperationsState { + pub fn append_ops<'a>(&mut self, ops: impl Iterator) { + for op in ops { + match op { + EmptyLinkRegistryOperation::Add(link) => { + if !self.remove_set.remove(link) { + self.add_set.insert(*link); + } + } + EmptyLinkRegistryOperation::Remove(link) => { + if !self.add_set.remove(link) { + self.remove_set.insert(*link); + } + } + } + } + } + + #[cfg(test)] + pub fn get_result_ops(self) -> Vec { + self.add_set + .into_iter() + .map(EmptyLinkRegistryOperation::Add) + .chain( + self.remove_set + .into_iter() + .map(EmptyLinkRegistryOperation::Remove), + ) + .collect() + } + + pub fn is_empty(&self) -> bool { + self.remove_set.is_empty() && self.add_set.is_empty() + } +} + +#[cfg(test)] +mod empty_links_tests { + use std::collections::HashSet; + + use data_bucket::Link; + use data_bucket::page::PageId; + + use crate::persistence::operation::batch::EmptyLinkOperationsState; + use crate::prelude::EmptyLinkRegistryOperation; + + fn gen_ops(len: usize, add: bool) -> Vec { + let mut ops = vec![]; + let mut offset = 0; + + for _ in 0..len { + let length = fastrand::u32(10..60); + let link = Link { + page_id: PageId::from(0), + offset, + length, + }; + offset += length; + if add { + ops.push(EmptyLinkRegistryOperation::Add(link)) + } else { + ops.push(EmptyLinkRegistryOperation::Remove(link)) + } + } + + ops + } + + #[test] + fn test_links_collection_basic() { + let ops = [ + EmptyLinkRegistryOperation::Add(Link { + page_id: PageId::from(0), + offset: 0, + length: 20, + }), + EmptyLinkRegistryOperation::Add(Link { + page_id: PageId::from(0), + offset: 20, + length: 60, 
+ }), + EmptyLinkRegistryOperation::Remove(Link { + page_id: PageId::from(0), + offset: 20, + length: 60, + }), + ]; + + let mut state = EmptyLinkOperationsState::default(); + state.append_ops(ops.iter()); + let res = state.get_result_ops(); + + assert_eq!(res.len(), 1); + assert_eq!(res, vec![ops[0]]) + } + + #[test] + fn test_links_collection_random_remove() { + let add_ops = gen_ops(256, true); + let mut remove_ops = HashSet::new(); + for _ in 0..64 { + let op_pos = fastrand::usize(0..256); + let mut op = add_ops[op_pos]; + while !remove_ops.insert(EmptyLinkRegistryOperation::Remove(op.link())) { + let op_pos = fastrand::usize(0..256); + op = add_ops[op_pos]; + } + } + assert_eq!(remove_ops.len(), 64); + + let mut state = EmptyLinkOperationsState::default(); + state.append_ops(add_ops.iter()); + state.append_ops(remove_ops.iter()); + let res = state.get_result_ops(); + + assert_eq!(res.len(), add_ops.len() - remove_ops.len()); + } + + #[test] + fn test_links_collection_random_add() { + let remove_ops = gen_ops(256, false); + let mut add_ops = HashSet::new(); + for _ in 0..64 { + let op_pos = fastrand::usize(0..256); + let mut op = remove_ops[op_pos]; + while !add_ops.insert(EmptyLinkRegistryOperation::Add(op.link())) { + let op_pos = fastrand::usize(0..256); + op = remove_ops[op_pos]; + } + } + assert_eq!(add_ops.len(), 64); + + let mut state = EmptyLinkOperationsState::default(); + state.append_ops(remove_ops.iter()); + state.append_ops(add_ops.iter()); + let res = state.get_result_ops(); + + assert_eq!(res.len(), remove_ops.len() - add_ops.len()); + } +} diff --git a/src/persistence/operation/mod.rs b/src/persistence/operation/mod.rs index cd4e5499..c6c07fc5 100644 --- a/src/persistence/operation/mod.rs +++ b/src/persistence/operation/mod.rs @@ -15,7 +15,9 @@ use uuid::Uuid; use crate::prelude::From; pub use batch::{BatchInnerRow, BatchInnerWorkTable, BatchOperation, PosByOpIdQuery}; -pub use operation::{DeleteOperation, InsertOperation, Operation, UpdateOperation}; +pub use operation::{ + DeleteOperation, EmptyLinkRegistryOperation, InsertOperation, Operation, UpdateOperation, +}; pub use util::validate_events; /// Represents page's identifier. 
Is unique within the table bounds diff --git a/src/persistence/operation/operation.rs b/src/persistence/operation/operation.rs index e00ad976..040dff04 100644 --- a/src/persistence/operation/operation.rs +++ b/src/persistence/operation/operation.rs @@ -105,6 +105,29 @@ impl Operation::Delete(_) => None, } } + + pub fn empty_links_ops(&self) -> Option<&Vec> { + match &self { + Operation::Insert(insert) => Some(&insert.empty_link_registry_operations), + Operation::Delete(delete) => Some(&delete.empty_link_registry_operations), + Operation::Update(_) => None, + } + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Hash, Eq)] +pub enum EmptyLinkRegistryOperation { + Add(Link), + Remove(Link), +} + +impl EmptyLinkRegistryOperation { + pub fn link(&self) -> Link { + match self { + EmptyLinkRegistryOperation::Add(l) => *l, + EmptyLinkRegistryOperation::Remove(l) => *l, + } + } } #[derive(Clone, Debug)] @@ -112,6 +135,7 @@ pub struct InsertOperation { pub id: OperationId, pub primary_key_events: Vec>>, pub secondary_keys_events: SecondaryKeys, + pub empty_link_registry_operations: Vec, pub pk_gen_state: PrimaryKeyGenState, pub bytes: Vec, pub link: Link, @@ -130,5 +154,6 @@ pub struct DeleteOperation { pub id: OperationId, pub primary_key_events: Vec>>, pub secondary_keys_events: SecondaryKeys, + pub empty_link_registry_operations: Vec, pub link: Link, } diff --git a/src/table/defragmentator/mod.rs b/src/table/defragmentator/mod.rs new file mode 100644 index 00000000..1f2a95b5 --- /dev/null +++ b/src/table/defragmentator/mod.rs @@ -0,0 +1,339 @@ +use data_bucket::page::PageId; +use data_bucket::{Link, SizeMeasurable}; +use indexset::core::node::NodeLike; +use indexset::core::pair::Pair; +use rkyv::api::high::HighDeserializer; +use rkyv::rancor::Strategy; +use rkyv::ser::Serializer; +use rkyv::ser::allocator::ArenaHandle; +use rkyv::ser::sharing::Share; +use rkyv::util::AlignedVec; +use rkyv::{Archive, Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use tokio::sync::{Notify, RwLock}; + +use crate::in_memory::{ + DataExecutionError, EmptyLinksRegistry, GhostWrapper, RowWrapper, StorableRow, +}; +use crate::lock::LockMap; +use crate::prelude::{Lock, TablePrimaryKey}; +use crate::{TableRow, TableSecondaryIndex, WorkTable, WorkTableError}; + +pub struct DefragmentatorTask { + /// Shared map for locking pages that are in defragmentation progress. + lock: Arc>, + + /// Shared notifier for waking up [`Defragmentator`] + notify: Arc, +} + +#[derive(Debug)] +pub struct Defragmentator< + Row, + PrimaryKey, + EmptyLinks, + AvailableTypes, + AvailableIndexes, + SecondaryIndexes, + LockType, + PkGen, + NodeType, + const DATA_LENGTH: usize, +> where + PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, + Row: StorableRow + Send + Clone + 'static, + NodeType: NodeLike> + Send + 'static, +{ + /// [`WorkTable`] to work with. + table: Arc< + WorkTable< + Row, + PrimaryKey, + EmptyLinks, + AvailableTypes, + AvailableIndexes, + SecondaryIndexes, + LockType, + PkGen, + NodeType, + DATA_LENGTH, + >, + >, + + /// Map for locking pages that are in defragmentation progress. 
+ lock_map: Arc>, + + /// Notifier for waking up [`Defragmentator`] + notify: Arc, +} + +impl< + Row, + PrimaryKey, + EmptyLinks, + AvailableTypes, + AvailableIndexes, + SecondaryIndexes, + LockType, + PkGen, + NodeType, + const DATA_LENGTH: usize, +> + Defragmentator< + Row, + PrimaryKey, + EmptyLinks, + AvailableTypes, + AvailableIndexes, + SecondaryIndexes, + LockType, + PkGen, + NodeType, + DATA_LENGTH, + > +where + PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, + EmptyLinks: Default + EmptyLinksRegistry, + SecondaryIndexes: Default + TableSecondaryIndex, + PkGen: Default, + AvailableIndexes: Debug, + NodeType: NodeLike> + Send + 'static, + Row: Archive + TableRow + StorableRow + Send + Clone + 'static, + ::WrappedRow: RowWrapper, + Row: Archive + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + >, + <::WrappedRow as Archive>::Archived: + Deserialize<::WrappedRow, HighDeserializer>, + ::WrappedRow: Archive + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + > + SizeMeasurable, + <::WrappedRow as Archive>::Archived: GhostWrapper, +{ + pub fn new( + table: Arc< + WorkTable< + Row, + PrimaryKey, + EmptyLinks, + AvailableTypes, + AvailableIndexes, + SecondaryIndexes, + LockType, + PkGen, + NodeType, + DATA_LENGTH, + >, + >, + ) -> (Self, DefragmentatorTask) { + let s = Self { + table, + lock_map: Arc::new(LockMap::default()), + notify: Arc::new(Notify::new()), + }; + + let t = DefragmentatorTask { + lock: s.lock_map.clone(), + notify: s.notify.clone(), + }; + + (s, t) + } + + fn defragment(&self) -> eyre::Result<()> { + const SINGLE_LINK_RATIO_TRIGGER: f32 = 0.4; + + let empty_links = self.table.data.get_empty_links(); + let empty_links_len = empty_links.len(); + let mapped_links = Self::map_empty_links_by_pages(empty_links); + let single_link_pages = mapped_links.values().filter(|v| v.len() == 1).count(); + + let links_left = + if single_link_pages as f32 / empty_links_len as f32 > SINGLE_LINK_RATIO_TRIGGER { + self.defragment_if_triggered(mapped_links) + } else { + self.defragment_if_not_triggered(mapped_links) + }?; + + Ok(()) + } + + fn defragment_if_triggered( + &self, + mapped_links: HashMap>, + ) -> eyre::Result> { + let mut links_left = vec![]; + Ok(links_left) + } + + fn defragment_if_not_triggered( + &self, + mapped_links: HashMap>, + ) -> eyre::Result> { + let mut links_left = vec![]; + + for (page_id, mut links) in mapped_links { + // sorting `Link`s in ascending order. 
+ let first_link = links + .iter() + .min_by(|l1, l2| l1.offset.cmp(&l2.offset)) + .copied() + .expect("links should not be empty"); + let mut temporary_removed_rows = vec![]; + + let lock_id = self.lock_map.next_id(); + let lock = Arc::new(RwLock::new(Lock::new(lock_id))); + self.lock_map.insert(page_id, lock).expect( + "nothing should be returned as this is single defragmentation thread for table", + ); + + links.remove(&first_link); + let mut link = first_link; + let page = self.table.data.get_page(page_id)?; + + // removing all rows on page after first link + loop { + let res = page.get_next_lying_row(link); + let (wrapped_row, res_link) = match res { + Ok(res) => res, + Err(e) => match e { + DataExecutionError::InvalidLink => { + break; + } + _ => return Err(e.into()), + }, + }; + + if !links.remove(&res_link) { + let row = wrapped_row.get_inner().clone(); + temporary_removed_rows.push(row.clone()); + self.table + .pk_map + .remove(&row.get_primary_key()) + .expect("should exist as current page is blocked"); + self.table + .indexes + .delete_row(row, res_link) + .expect("should be ok as current page is blocked") + } + + link = res_link; + } + + page.set_free_offset(first_link.offset); + + for row in temporary_removed_rows { + let wrapped_row = Row::WrappedRow::from_inner(row.clone()); + let new_link = page.save_row(&wrapped_row)?; + self.table + .pk_map + .insert(row.get_primary_key(), new_link) + .expect("should not exist as current page was blocked"); + self.table + .indexes + .save_row(row, new_link) + .expect("should be ok as current page was blocked"); + unsafe { self.table.data.with_mut_ref(new_link, |r| r.unghost())? } + } + + let current_offset = page.free_offset.load(Ordering::Relaxed); + page.set_free_offset(DATA_LENGTH as u32); + + let link_left = Link { + page_id, + offset: current_offset, + length: DATA_LENGTH as u32 - current_offset, + }; + + links_left.push(link_left) + } + + Ok(links_left) + } + + fn map_empty_links_by_pages(empty_links: Vec) -> HashMap> { + let mut map = HashMap::new(); + for link in empty_links { + map.entry(link.page_id) + .and_modify(|s: &mut HashSet<_>| { + s.insert(link); + }) + .or_insert({ + let mut s = HashSet::new(); + s.insert(link); + s + }); + } + + map + } +} + +#[cfg(test)] +mod tests { + use crate::prelude::*; + use crate::table::defragmentator::Defragmentator; + use std::collections::HashSet; + use std::sync::Arc; + use worktable_codegen::worktable; + + worktable! 
( + name: Test, + columns: { + id: u64 primary_key autoincrement, + test: i64, + another: u64, + exchange: String + }, + indexes: { + test_idx: test unique, + exchnage_idx: exchange, + another_idx: another, + } + queries: { + delete: { + ByAnother() by another, + ByExchange() by exchange, + ByTest() by test, + } + } + ); + + #[tokio::test] + async fn test_defragmentation_logic() { + let table = Arc::new(TestWorkTable::default()); + + let mut pks = HashSet::new(); + + for i in 0..100 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: (i as u64) % 20, + exchange: format!("another_{i}"), + }; + let pk = table.insert(row).unwrap(); + pks.insert(pk.0); + } + + let mut removed_pks = HashSet::new(); + while removed_pks.len() != 30 { + let mut id = fastrand::u64(0..100); + while !removed_pks.insert(id) { + id = fastrand::u64(0..100); + } + } + + for pk in removed_pks { + pks.remove(&pk); + table.delete(pk.into()).await.unwrap(); + } + + let (d, _) = Defragmentator::new(table.clone()); + } +} diff --git a/src/table/mod.rs b/src/table/mod.rs index 0bb06cb7..6c8a517a 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -1,10 +1,13 @@ +mod defragmentator; pub mod select; pub mod system_info; use std::fmt::Debug; use std::marker::PhantomData; -use crate::in_memory::{DataPages, GhostWrapper, RowWrapper, StorableRow}; +use crate::in_memory::{ + DataPages, EmptyLinksRegistry, GhostWrapper, InsertCdcOutput, RowWrapper, StorableRow, +}; use crate::lock::LockMap; use crate::persistence::{InsertOperation, Operation}; use crate::prelude::{OperationId, PrimaryKeyGeneratorState}; @@ -13,12 +16,10 @@ use crate::{ AvailableIndex, IndexError, IndexMap, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, in_memory, }; -use data_bucket::{INNER_PAGE_SIZE, Link}; +use data_bucket::{INNER_PAGE_SIZE, Link, SizeMeasurable}; use derive_more::{Display, Error, From}; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; -#[cfg(feature = "perf_measurements")] -use performance_measurement_codegen::performance_measurement; use rkyv::api::high::HighDeserializer; use rkyv::rancor::Strategy; use rkyv::ser::Serializer; @@ -32,6 +33,7 @@ use uuid::Uuid; pub struct WorkTable< Row, PrimaryKey, + EmptyLinks, AvailableTypes = (), AvailableIndexes = (), SecondaryIndexes = (), @@ -44,7 +46,7 @@ pub struct WorkTable< Row: StorableRow + Send + Clone + 'static, PkNodeType: NodeLike> + Send + 'static, { - pub data: DataPages, + pub data: DataPages, pub pk_map: IndexMap, @@ -65,6 +67,7 @@ pub struct WorkTable< impl< Row, PrimaryKey, + EmptyLinks, AvailableTypes, AvailableIndexes, SecondaryIndexes, @@ -76,6 +79,7 @@ impl< for WorkTable< Row, PrimaryKey, + EmptyLinks, AvailableTypes, AvailableIndexes, SecondaryIndexes, @@ -86,6 +90,7 @@ impl< > where PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, + EmptyLinks: Default + EmptyLinksRegistry, SecondaryIndexes: Default, PkGen: Default, PkNodeType: NodeLike> + Send + 'static, @@ -109,6 +114,7 @@ where impl< Row, PrimaryKey, + EmptyLinks, AvailableTypes, AvailableIndexes, SecondaryIndexes, @@ -120,6 +126,7 @@ impl< WorkTable< Row, PrimaryKey, + EmptyLinks, AvailableTypes, AvailableIndexes, SecondaryIndexes, @@ -131,6 +138,7 @@ impl< where Row: TableRow, PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, + EmptyLinks: Default + EmptyLinksRegistry, PkNodeType: NodeLike> + Send + 'static, Row: StorableRow + Send + Clone + 'static, ::WrappedRow: RowWrapper, @@ -143,10 +151,6 @@ where } /// Selects `Row` 
from table identified with provided primary key. Returns `None` if no value presented. - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "WorkTable") - )] pub fn select(&self, pk: PrimaryKey) -> Option where LockType: 'static, @@ -170,10 +174,6 @@ where } } - #[cfg_attr( - feature = "perf_measurements", - performance_measurement(prefix_name = "WorkTable") - )] pub fn insert(&self, row: Row) -> Result where Row: Archive @@ -184,7 +184,7 @@ where ::WrappedRow: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - >, + > + SizeMeasurable, <::WrappedRow as Archive>::Archived: GhostWrapper, PrimaryKey: Clone, AvailableTypes: 'static, @@ -246,7 +246,7 @@ where ::WrappedRow: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - >, + > + SizeMeasurable, <::WrappedRow as Archive>::Archived: GhostWrapper, PrimaryKey: Clone, SecondaryIndexes: TableSecondaryIndex @@ -255,10 +255,16 @@ where AvailableIndexes: Debug + AvailableIndex, { let pk = row.get_primary_key().clone(); - let (link, _) = self + let output = self .data .insert_cdc(row.clone()) .map_err(WorkTableError::PagesError)?; + let InsertCdcOutput { + link, + data_bytes: _, + empty_link_registry_ops, + } = output; + let primary_key_events = self.pk_map.checked_insert_cdc(pk.clone(), link); if primary_key_events.is_none() { self.data.delete(link).map_err(WorkTableError::PagesError)?; @@ -296,6 +302,7 @@ where pk_gen_state: self.pk_gen.get_state(), primary_key_events: primary_key_events.expect("should be checked before for existence"), secondary_keys_events: indexes_res.expect("was checked before"), + empty_link_registry_operations: empty_link_registry_ops, bytes, link, }); @@ -321,7 +328,7 @@ where ::WrappedRow: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - >, + > + SizeMeasurable, <::WrappedRow as Archive>::Archived: GhostWrapper, PrimaryKey: Clone, AvailableTypes: 'static, @@ -397,7 +404,7 @@ where ::WrappedRow: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - >, + > + SizeMeasurable, <::WrappedRow as Archive>::Archived: GhostWrapper, PrimaryKey: Clone, SecondaryIndexes: TableSecondaryIndex @@ -414,10 +421,15 @@ where .get(&pk) .map(|v| v.get().value) .ok_or(WorkTableError::NotFound)?; - let (new_link, _) = self + let output = self .data .insert_cdc(row_new.clone()) .map_err(WorkTableError::PagesError)?; + let InsertCdcOutput { + link: new_link, + data_bytes: _, + mut empty_link_registry_ops, + } = output; unsafe { self.data .with_mut_ref(new_link, |r| r.unghost()) @@ -446,9 +458,11 @@ where }; } - self.data + let delete_ops = self + .data .delete(old_link) .map_err(WorkTableError::PagesError)?; + empty_link_registry_ops.extend(delete_ops); let bytes = self .data .select_raw(new_link) @@ -459,6 +473,7 @@ where pk_gen_state: self.pk_gen.get_state(), primary_key_events, secondary_keys_events: indexes_res.expect("was checked just before"), + empty_link_registry_operations: empty_link_registry_ops, bytes, link: new_link, }); diff --git a/src/table/system_info.rs b/src/table/system_info.rs index 6575195b..bec128b5 100644 --- a/src/table/system_info.rs +++ b/src/table/system_info.rs @@ -4,7 +4,7 @@ use indexset::core::pair::Pair; use prettytable::{Table, format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR, row}; use std::fmt::{self, Debug, Display, Formatter}; -use crate::in_memory::{RowWrapper, StorableRow}; +use crate::in_memory::{EmptyLinksRegistry, RowWrapper, StorableRow}; use crate::mem_stat::MemStat; use 
diff --git a/src/table/system_info.rs b/src/table/system_info.rs
index 6575195b..bec128b5 100644
--- a/src/table/system_info.rs
+++ b/src/table/system_info.rs
@@ -4,7 +4,7 @@
 use indexset::core::pair::Pair;
 use prettytable::{Table, format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR, row};
 use std::fmt::{self, Debug, Display, Formatter};
 
-use crate::in_memory::{RowWrapper, StorableRow};
+use crate::in_memory::{EmptyLinksRegistry, RowWrapper, StorableRow};
 use crate::mem_stat::MemStat;
 use crate::{TableSecondaryIndexInfo, WorkTable};
@@ -48,6 +48,7 @@ impl Display for IndexKind {
 impl<
     Row,
     PrimaryKey,
+    EmptyLinks,
     AvailableTypes,
     AvailableIndexes,
     SecondaryIndexes,
@@ -59,6 +60,7 @@ impl<
     WorkTable<
         Row,
         PrimaryKey,
+        EmptyLinks,
         AvailableTypes,
         AvailableIndexes,
         SecondaryIndexes,
@@ -69,6 +71,7 @@
     >
 where
     PrimaryKey: Debug + Clone + Ord + Send + 'static + std::hash::Hash,
+    EmptyLinks: Default + EmptyLinksRegistry,
     Row: StorableRow + Send + Clone + 'static,
     ::WrappedRow: RowWrapper,
     NodeType: NodeLike> + Send + 'static,
diff --git a/tests/data/expected/test_persist/.wt.data b/tests/data/expected/test_persist/.wt.data
index cf48e267..23de210f 100644
Binary files a/tests/data/expected/test_persist/.wt.data and b/tests/data/expected/test_persist/.wt.data differ
diff --git a/tests/data/expected/test_persist/another_idx.wt.idx b/tests/data/expected/test_persist/another_idx.wt.idx
index b8849267..6100f911 100644
Binary files a/tests/data/expected/test_persist/another_idx.wt.idx and b/tests/data/expected/test_persist/another_idx.wt.idx differ
diff --git a/tests/data/expected/test_persist/primary.wt.idx b/tests/data/expected/test_persist/primary.wt.idx
index 6924ed7a..af1f5647 100644
Binary files a/tests/data/expected/test_persist/primary.wt.idx and b/tests/data/expected/test_persist/primary.wt.idx differ
diff --git a/tests/data/expected/test_without_secondary_indexes/.wt.data b/tests/data/expected/test_without_secondary_indexes/.wt.data
index 555d8cb0..a9fdec9f 100644
Binary files a/tests/data/expected/test_without_secondary_indexes/.wt.data and b/tests/data/expected/test_without_secondary_indexes/.wt.data differ
diff --git a/tests/data/expected/test_without_secondary_indexes/primary.wt.idx b/tests/data/expected/test_without_secondary_indexes/primary.wt.idx
index c2cce055..af5ecab4 100644
Binary files a/tests/data/expected/test_without_secondary_indexes/primary.wt.idx and b/tests/data/expected/test_without_secondary_indexes/primary.wt.idx differ
diff --git a/tests/data/test_persist/.wt.data b/tests/data/test_persist/.wt.data
index cf48e267..23de210f 100644
Binary files a/tests/data/test_persist/.wt.data and b/tests/data/test_persist/.wt.data differ
diff --git a/tests/data/test_persist/another_idx.wt.idx b/tests/data/test_persist/another_idx.wt.idx
index b8849267..6100f911 100644
Binary files a/tests/data/test_persist/another_idx.wt.idx and b/tests/data/test_persist/another_idx.wt.idx differ
diff --git a/tests/data/test_persist/primary.wt.idx b/tests/data/test_persist/primary.wt.idx
index 6924ed7a..af1f5647 100644
Binary files a/tests/data/test_persist/primary.wt.idx and b/tests/data/test_persist/primary.wt.idx differ
diff --git a/tests/data/test_without_secondary_indexes/.wt.data b/tests/data/test_without_secondary_indexes/.wt.data
index 555d8cb0..a9fdec9f 100644
Binary files a/tests/data/test_without_secondary_indexes/.wt.data and b/tests/data/test_without_secondary_indexes/.wt.data differ
diff --git a/tests/data/test_without_secondary_indexes/primary.wt.idx b/tests/data/test_without_secondary_indexes/primary.wt.idx
index c2cce055..af5ecab4 100644
Binary files a/tests/data/test_without_secondary_indexes/primary.wt.idx and b/tests/data/test_without_secondary_indexes/primary.wt.idx differ
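The constant changes in the test diffs that follow are mutually consistent if every 24-byte archived row is now preceded by an 8-byte length value (this per-row prefix is an inference from the numbers, not quoted from the layout code): offsets start at 8 instead of 0, each step advances by `length + 8`, and 99 rows grow from 2376 to 3168 bytes of payload. A quick self-check of that arithmetic:

```rust
fn main() {
    let rows = 99u64;   // rows written by the persistence test fixtures
    let length = 24u64; // archived row size used in the index tests
    let prefix = 8u64;  // assumed per-row length value

    // First row starts right after its own prefix; each step adds length + 8.
    let offset_of = |i: u64| prefix + i * (length + prefix);
    assert_eq!(offset_of(0), 8); // matches `let mut offset = 8;`
    assert_eq!(offset_of(rows - 1), 3144); // matches the ToC entry for key 99

    // Total payload grows from 99 * 24 = 2376 to 99 * 32 = 3168 bytes.
    assert_eq!(rows * (length + prefix), 3168); // matches `data_length`
}
```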
diff --git a/tests/persistence/read.rs b/tests/persistence/read.rs
index 7b658052..4c2fc05d 100644
--- a/tests/persistence/read.rs
+++ b/tests/persistence/read.rs
@@ -49,7 +49,7 @@ async fn test_primary_index_parse() {
 
     let mut key = 1;
     let length = 24;
-    let mut offset = 0;
+    let mut offset = 8;
     let page_id = 1.into();
 
     for val in &index.inner.index_values[..index.inner.current_length as usize] {
@@ -64,7 +64,7 @@ async fn test_primary_index_parse() {
         );
 
         key += 1;
-        offset += length;
+        offset += length + 8;
     }
 }
 
@@ -86,7 +86,7 @@ async fn test_another_idx_index_parse() {
 
     let mut key = 1;
     let length = 24;
-    let mut offset = 0;
+    let mut offset = 8;
     let page_id = 1.into();
 
     for val in &index.inner.index_values[..index.inner.current_length as usize] {
@@ -101,7 +101,7 @@ async fn test_another_idx_index_parse() {
         );
 
         key += 1;
-        offset += length;
+        offset += length + 8;
     }
 }
 
@@ -121,7 +121,7 @@ async fn test_data_parse() {
     assert_eq!(data.header.previous_id, 0.into());
     assert_eq!(data.header.next_id, 0.into());
     assert_eq!(data.header.page_type, PageType::Data);
-    assert_eq!(data.header.data_length, 2376);
+    assert_eq!(data.header.data_length, 3168);
 }
 
 #[tokio::test]
diff --git a/tests/persistence/sync/mod.rs b/tests/persistence/sync/mod.rs
index ba206960..bd53fd27 100644
--- a/tests/persistence/sync/mod.rs
+++ b/tests/persistence/sync/mod.rs
@@ -1,9 +1,10 @@
-use crate::remove_dir_if_exists;
 use std::time::Duration;
 
 use worktable::prelude::*;
 use worktable::worktable;
 
+use crate::remove_dir_if_exists;
+
 mod string_primary_index;
 mod string_re_read;
 mod string_secondary_index;
@@ -394,3 +395,49 @@ fn test_space_delete_query_sync() {
         }
     });
 }
+
+#[test]
+fn test_space_empty_links_available() {
+    let config =
+        PersistenceConfig::new("tests/data/sync/empty_links", "tests/data/sync/empty_links");
+
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(2)
+        .enable_io()
+        .enable_time()
+        .build()
+        .unwrap();
+
+    runtime.block_on(async {
+        remove_dir_if_exists("tests/data/sync/empty_links".to_string()).await;
+
+        let pk = {
+            let table = TestSyncWorkTable::load_from_file(config.clone())
+                .await
+                .unwrap();
+            let row1 = TestSyncRow {
+                another: 42,
+                non_unique: 0,
+                field: 0.234,
+                id: table.get_next_pk().into(),
+            };
+            table.insert(row1.clone()).unwrap();
+            let row2 = TestSyncRow {
+                another: 43,
+                non_unique: 0,
+                field: 0.235,
+                id: table.get_next_pk().into(),
+            };
+            table.insert(row2.clone()).unwrap();
+            table.delete(row2.id.into()).await.unwrap();
+
+            table.wait_for_ops().await;
+            row1.id
+        };
+        {
+            let table = TestSyncWorkTable::load_from_file(config).await.unwrap();
+            assert!(table.select(pk).is_some());
+            assert_eq!(table.0.data.get_empty_links().len(), 1);
+        }
+    });
+}
diff --git a/tests/persistence/toc/read.rs b/tests/persistence/toc/read.rs
index 1f54a762..2ef11101 100644
--- a/tests/persistence/toc/read.rs
+++ b/tests/persistence/toc/read.rs
@@ -46,7 +46,7 @@ async fn test_index_table_of_contents_read_from_space() {
             99,
             Link {
                 page_id: 1.into(),
-                offset: 2352,
+                offset: 3144,
                 length: 24
             }
         )),
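`test_space_empty_links_available` above pins down the durability contract for the registry: a hole created by a delete must still be reported by `get_empty_links()` after the space is re-read from disk. A standalone model of that round trip (illustrative types only, not the crate's persistence machinery):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct Link {
    offset: u64,
    length: u64,
}

#[derive(Default)]
struct Registry {
    holes: Vec<Link>,
}

impl Registry {
    fn on_delete(&mut self, link: Link) {
        self.holes.push(link);
    }

    /// "Persist" and "reload" are identity here; the point is that holes are
    /// part of the saved state instead of being lost between process runs.
    fn persist(&self) -> Vec<Link> {
        self.holes.clone()
    }

    fn reload(saved: Vec<Link>) -> Self {
        Registry { holes: saved }
    }
}

fn main() {
    let mut registry = Registry::default();
    // Two inserts, then a delete of the second row: one hole remains.
    registry.on_delete(Link { offset: 40, length: 24 });

    let reloaded = Registry::reload(registry.persist());
    assert_eq!(reloaded.holes.len(), 1); // mirrors the test's final assertion
    assert_eq!(reloaded.holes[0], Link { offset: 40, length: 24 });
}
```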
diff --git a/tests/worktable/base.rs b/tests/worktable/base.rs
index dacc0148..45f3ce93 100644
--- a/tests/worktable/base.rs
+++ b/tests/worktable/base.rs
@@ -199,7 +199,10 @@ async fn update_string() {
     let selected_row = table.select(pk).unwrap();
     assert_eq!(selected_row, updated);
 
-    assert_eq!(table.0.data.get_empty_links().first().unwrap(), &first_link);
+    assert_eq!(
+        table.0.data.get_empty_links().first().unwrap(),
+        &Data::::delete_row(first_link)
+    );
 
     assert!(table.select(2).is_none())
 }
@@ -1199,9 +1202,9 @@ async fn test_update_by_pk() {
 fn _bench() {
     let table = TestWorkTable::default();
 
-    let mut v = Vec::with_capacity(10000);
+    let mut v = Vec::with_capacity(100_000);
 
-    for i in 0..10000 {
+    for i in 0..100_000 {
         let row = TestRow {
             id: table.get_next_pk().into(),
             test: i + 1,
diff --git a/tests/worktable/unsized_.rs b/tests/worktable/unsized_.rs
index 834b0de7..3b0c738b 100644
--- a/tests/worktable/unsized_.rs
+++ b/tests/worktable/unsized_.rs
@@ -62,7 +62,10 @@ async fn test_update_string_full_row() {
             exchange: "bigger test to test string update".to_string(),
         }
     );
-    assert_eq!(table.0.data.get_empty_links().first().unwrap(), &first_link)
+    assert_eq!(
+        table.0.data.get_empty_links().first().unwrap(),
+        &Data::::delete_row(first_link)
+    )
 }
 
 #[tokio::test]
@@ -93,7 +96,10 @@ async fn test_update_string_by_unique() {
             exchange: "bigger test to test string update".to_string(),
         }
     );
-    assert_eq!(table.0.data.get_empty_links().first().unwrap(), &first_link)
+    assert_eq!(
+        table.0.data.get_empty_links().first().unwrap(),
+        &Data::::delete_row(first_link)
+    )
 }
 
 #[tokio::test]
@@ -124,7 +130,10 @@ async fn test_update_string_by_pk() {
             exchange: "bigger test to test string update".to_string(),
         }
     );
-    assert_eq!(table.0.data.get_empty_links().first().unwrap(), &first_link)
+    assert_eq!(
+        table.0.data.get_empty_links().first().unwrap(),
+        &Data::::delete_row(first_link)
+    )
 }
 
 #[tokio::test]
@@ -175,8 +184,8 @@ async fn test_update_string_by_non_unique() {
     );
     let empty_links = table.0.data.get_empty_links();
     assert_eq!(empty_links.len(), 2);
-    assert!(empty_links.contains(&first_link));
-    assert!(empty_links.contains(&second_link))
+    assert!(empty_links.contains(&Data::::delete_row(first_link)));
+    assert!(empty_links.contains(&Data::::delete_row(second_link)))
 }
 
 #[tokio::test]
@@ -350,7 +359,10 @@ async fn test_update_many_strings_by_unique() {
             other_srting: "other".to_string(),
         }
     );
-    assert_eq!(table.0.data.get_empty_links().first().unwrap(), &first_link)
+    assert_eq!(
+        table.0.data.get_empty_links().first().unwrap(),
+        &Data::::delete_row(first_link)
+    )
 }
 
 #[tokio::test]
@@ -386,7 +398,10 @@ async fn test_update_many_strings_by_pk() {
             other_srting: "other".to_string(),
         }
     );
-    assert_eq!(table.0.data.get_empty_links().first().unwrap(), &first_link)
+    assert_eq!(
+        table.0.data.get_empty_links().first().unwrap(),
+        &Data::::delete_row(first_link)
+    )
 }
 
 #[tokio::test]
@@ -449,8 +464,8 @@ async fn test_update_many_strings_by_non_unique() {
     );
     let empty_links = table.0.data.get_empty_links();
     assert_eq!(empty_links.len(), 2);
-    assert!(empty_links.contains(&first_link));
-    assert!(empty_links.contains(&second_link))
+    assert!(empty_links.contains(&Data::::delete_row(first_link)));
+    assert!(empty_links.contains(&Data::::delete_row(second_link)))
 }
 
 #[tokio::test]
@@ -513,8 +528,8 @@ async fn test_update_many_strings_by_string() {
     );
     let empty_links = table.0.data.get_empty_links();
     assert_eq!(empty_links.len(), 2);
-    assert!(empty_links.contains(&first_link));
-    assert!(empty_links.contains(&second_link))
+    assert!(empty_links.contains(&Data::::delete_row(first_link)));
+    assert!(empty_links.contains(&Data::::delete_row(second_link)))
 }
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
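Across `base.rs` and `unsized_.rs` above, the assertions change from comparing bare `Link`s to comparing against `Data::…::delete_row(link)`: the registry now stores an entry constructed from the deleted link rather than the raw link itself. A minimal stand-in showing the shape of that comparison (the real `delete_row` signature and entry type are not visible in this diff):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct Link {
    offset: u32,
    length: u32,
}

/// Hypothetical registry entry built from a deleted link; the real type and
/// the real `delete_row` constructor live in the crate, not in this sketch.
#[derive(Debug, PartialEq)]
struct EmptyLinkEntry {
    link: Link,
}

fn delete_row(link: Link) -> EmptyLinkEntry {
    EmptyLinkEntry { link }
}

fn main() {
    let first_link = Link { offset: 0, length: 48 };
    let empty_links = vec![delete_row(first_link)];

    // Mirrors `empty_links.contains(&Data::…::delete_row(first_link))`:
    // equality is checked on the constructed entry, not on the raw link.
    assert!(empty_links.contains(&delete_row(first_link)));
    assert_eq!(empty_links.first().unwrap().link, first_link);
}
```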
diff --git a/tests/worktable/with_enum.rs b/tests/worktable/with_enum.rs
index 9ee99167..89c5f9f2 100644
--- a/tests/worktable/with_enum.rs
+++ b/tests/worktable/with_enum.rs
@@ -2,7 +2,9 @@ use rkyv::{Archive, Deserialize, Serialize};
 use worktable::prelude::*;
 use worktable::worktable;
 
-#[derive(Archive, Clone, Copy, Debug, Deserialize, Serialize, PartialEq, PartialOrd, MemStat)]
+#[derive(
+    Archive, Clone, Copy, Debug, Deserialize, Serialize, PartialEq, PartialOrd, MemStat, SizeMeasure,
+)]
 #[rkyv(compare(PartialEq), derive(Debug))]
 pub enum SomeEnum {
     First,