From 00b9d446b67f0c30c321440e233c65b1342378e0 Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 08:36:02 -0400 Subject: [PATCH 01/10] init --- examples/ipmpsc-receive.rs | 31 ++++- examples/ipmpsc-send.rs | 14 ++- src/lib.rs | 229 +++++++++++++++++++++++++++---------- 3 files changed, 209 insertions(+), 65 deletions(-) diff --git a/examples/ipmpsc-receive.rs b/examples/ipmpsc-receive.rs index 89b6a3d..c32e0ba 100644 --- a/examples/ipmpsc-receive.rs +++ b/examples/ipmpsc-receive.rs @@ -1,7 +1,32 @@ #![deny(warnings)] use clap::{App, Arg}; -use ipmpsc::{Receiver, SharedRingBuffer}; +use ipmpsc::{Receiver, SharedRingBuffer, ShmDeserializer, ShmZeroCopyDeserializer}; +use serde::{Deserialize}; + +#[derive(Debug)] +pub struct BincodeZeroCopyDeserializer(pub T); + +impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer +where + T: Deserialize<'de>, +{ + fn deserialize_from_bytes(bytes: &'de [u8]) -> ipmpsc::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} + +#[derive(Debug)] +pub struct BincodeDeserializer(pub T); + +impl ShmDeserializer for BincodeDeserializer +where T: for<'de> Deserialize<'de> +{ + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> ipmpsc::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} + fn main() -> Result<(), Box> { let matches = App::new("ipmpsc-send") @@ -34,9 +59,9 @@ fn main() -> Result<(), Box> { loop { if zero_copy { - println!("received {:?}", rx.zero_copy_context().recv::<&str>()?); + println!("received {:?}", rx.zero_copy_context().recv::>()?); } else { - println!("received {:?}", rx.recv::()?); + println!("received {:?}", rx.recv::>()?); } } } diff --git a/examples/ipmpsc-send.rs b/examples/ipmpsc-send.rs index 2c85398..2e4121d 100644 --- a/examples/ipmpsc-send.rs +++ b/examples/ipmpsc-send.rs @@ -1,9 +1,19 @@ #![deny(warnings)] use clap::{App, Arg}; -use ipmpsc::{Sender, SharedRingBuffer}; +use ipmpsc::{Sender, SharedRingBuffer, ShmSerializer}; +use serde::Serialize; use std::io::{self, BufRead}; +#[derive(Debug)] +pub struct BincodeSerializer(pub T); + +impl ShmSerializer for BincodeSerializer { + fn serialize(&self) -> ipmpsc::Result> { + Ok(bincode::serialize(&self.0)?) + } +} + fn main() -> Result<(), Box> { let matches = App::new("ipmpsc-send") .about("ipmpsc sender example") @@ -29,7 +39,7 @@ fn main() -> Result<(), Box> { println!("Ready! Enter some lines of text to send them to the receiver."); while handle.read_line(&mut buffer)? > 0 { - tx.send(&buffer)?; + tx.send::>(&BincodeSerializer(&buffer))?; buffer.clear(); } diff --git a/src/lib.rs b/src/lib.rs index c486847..59ec686 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,17 +9,11 @@ use memmap2::MmapMut; use os::{Buffer, Header, View}; -use serde::{Deserialize, Serialize}; use std::{ - cell::UnsafeCell, - ffi::c_void, - fs::{File, OpenOptions}, - mem, - sync::{ + array::TryFromSliceError, cell::UnsafeCell, convert::TryInto, ffi::c_void, fs::{File, OpenOptions}, mem, sync::{ atomic::Ordering::{Acquire, Relaxed, Release}, Arc, - }, - time::{Duration, Instant}, + }, time::{Duration, Instant} }; use tempfile::NamedTempFile; use thiserror::Error as ThisError; @@ -92,6 +86,9 @@ pub enum Error { /// Wrapped bincode error encountered during (de)serialization. #[error(transparent)] Bincode(#[from] bincode::Error), + + #[error(transparent)] + TryFrom(#[from] TryFromSliceError), } /// `ipmpsc`-specific Result type alias @@ -116,6 +113,20 @@ fn map(file: &File) -> Result { } } +pub trait ShmSerializer { + fn serialize(&self) -> Result>; +} + +// pub trait ShmDeserializer { +// fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> Result where Self: Sized; +// } +pub trait ShmZeroCopyDeserializer<'de>: Sized { + fn deserialize_from_bytes(bytes: &'de [u8]) -> Result; +} +pub trait ShmDeserializer: Sized { + fn deserialize_from_bytes(bytes: &[u8]) -> Result; +} + /// Represents a file-backed shared memory ring buffer, suitable for constructing a /// [`Receiver`](struct.Receiver.html) or [`Sender`](struct.Sender.html). /// @@ -222,11 +233,9 @@ impl Receiver { /// Attempt to read a message without blocking. /// /// This will return `Ok(None)` if there are no messages immediately available. - pub fn try_recv(&self) -> Result> - where - T: for<'de> Deserialize<'de>, + pub fn try_recv(&self) -> Result> { - Ok(if let Some((value, position)) = self.try_recv_0()? { + Ok(if let Some((value, position)) = self.try_recv_0::()? { self.seek(position)?; Some(value) @@ -235,7 +244,7 @@ impl Receiver { }) } - fn try_recv_0<'a, T: Deserialize<'a>>(&'a self) -> Result> { + fn try_recv_0<'a, T: ShmDeserializer>(&'a self) -> Result> { let buffer = self.0 .0.buffer(); let map = buffer.map(); @@ -246,11 +255,45 @@ impl Receiver { if write != read { let slice = map.as_ref(); let start = read + 4; - let size = bincode::deserialize::(&slice[read as usize..start as usize])?; + let size = u32::from_le_bytes(slice[read as usize..start as usize].try_into()?); + if size > 0 { let end = start + size; break Some(( - bincode::deserialize(&slice[start as usize..end as usize])?, + T::deserialize_from_bytes(&slice[start as usize..end as usize])?, + end, + )); + } else if write < read { + read = BEGINNING; + let mut lock = buffer.lock()?; + buffer.header().read.store(read, Relaxed); + lock.notify_all()?; + } else { + return Err(Error::Runtime("corrupt ring buffer".into())); + } + } else { + break None; + } + }) + } + + fn try_zc_recv_0<'a, T: ShmZeroCopyDeserializer<'a>>(&'a self) -> Result> { + let buffer = self.0 .0.buffer(); + let map = buffer.map(); + + let mut read = buffer.header().read.load(Relaxed); + let write = buffer.header().write.load(Acquire); + + Ok(loop { + if write != read { + let slice = map.as_ref(); + let start = read + 4; + let size = u32::from_le_bytes(slice[read as usize..start as usize].try_into()?); + + if size > 0 { + let end = start + size; + break Some(( + T::deserialize_from_bytes(&slice[start as usize..end as usize])?, end, )); } else if write < read { @@ -268,11 +311,9 @@ impl Receiver { } /// Attempt to read a message, blocking if necessary until one becomes available. - pub fn recv(&self) -> Result - where - T: for<'de> Deserialize<'de>, + pub fn recv(&self) -> Result { - let (value, position) = self.recv_timeout_0(None)?.unwrap(); + let (value, position) = self.recv_timeout_0::(None)?.unwrap(); self.seek(position)?; @@ -281,12 +322,10 @@ impl Receiver { /// Attempt to read a message, blocking for up to the specified duration if necessary until one becomes /// available. - pub fn recv_timeout(&self, timeout: Duration) -> Result> - where - T: for<'de> Deserialize<'de>, + pub fn recv_timeout(&self, timeout: Duration) -> Result> { Ok( - if let Some((value, position)) = self.recv_timeout_0(Some(timeout))? { + if let Some((value, position)) = self.recv_timeout_0::(Some(timeout))? { self.seek(position)?; Some(value) @@ -320,13 +359,43 @@ impl Receiver { } } - fn recv_timeout_0<'a, T: Deserialize<'a>>( + fn recv_timeout_0<'a, T: ShmDeserializer>( + &'a self, + timeout: Option, + ) -> Result> { + let mut deadline = None; + loop { + if let Some(value_and_position) = self.try_recv_0::()? { + return Ok(Some(value_and_position)); + } + + let buffer = self.0 .0.buffer(); + + let mut now = Instant::now(); + deadline = deadline.or_else(|| timeout.map(|timeout| now + timeout)); + + let read = buffer.header().read.load(Relaxed); + + let mut lock = buffer.lock()?; + while read == buffer.header().write.load(Acquire) { + if deadline.map(|deadline| deadline > now).unwrap_or(true) { + lock.timed_wait(&self.0 .0, deadline.map(|deadline| deadline - now))?; + + now = Instant::now(); + } else { + return Ok(None); + } + } + } + } + + fn recv_zc_timeout_0<'a, T: ShmZeroCopyDeserializer<'a>>( &'a self, timeout: Option, ) -> Result> { let mut deadline = None; loop { - if let Some(value_and_position) = self.try_recv_0()? { + if let Some(value_and_position) = self.try_zc_recv_0::()? { return Ok(Some(value_and_position)); } @@ -371,12 +440,12 @@ impl<'a> ZeroCopyContext<'a> { /// This will return `Ok(None)` if there are no messages immediately available. It will return /// `Err(`[`Error::AlreadyReceived`](enum.Error.html#variant.AlreadyReceived)`))` if this instance has already /// been used to read a message. - pub fn try_recv<'b, T: Deserialize<'b>>(&'b mut self) -> Result> { + pub fn try_recv<'b, T: ShmZeroCopyDeserializer<'b>>(&'b mut self) -> Result> { if self.position.is_some() { Err(Error::AlreadyReceived) } else { Ok( - if let Some((value, position)) = self.receiver.try_recv_0()? { + if let Some((value, position)) = self.receiver.try_zc_recv_0::()? { self.position = Some(position); Some(value) } else { @@ -390,8 +459,8 @@ impl<'a> ZeroCopyContext<'a> { /// /// This will return `Err(`[`Error::AlreadyReceived`](enum.Error.html#variant.AlreadyReceived)`))` if this /// instance has already been used to read a message. - pub fn recv<'b, T: Deserialize<'b>>(&'b mut self) -> Result { - let (value, position) = self.receiver.recv_timeout_0(None)?.unwrap(); + pub fn recv<'b, T: ShmZeroCopyDeserializer<'b>>(&'b mut self) -> Result { + let (value, position) = self.receiver.recv_zc_timeout_0::(None)?.unwrap(); self.position = Some(position); @@ -403,7 +472,7 @@ impl<'a> ZeroCopyContext<'a> { /// /// This will return `Err(`[`Error::AlreadyReceived`](enum.Error.html#variant.AlreadyReceived)`))` if this /// instance has already been used to read a message. - pub fn recv_timeout<'b, T: Deserialize<'b>>( + pub fn recv_timeout<'b, T: ShmZeroCopyDeserializer<'b>>( &'b mut self, timeout: Duration, ) -> Result> { @@ -411,7 +480,7 @@ impl<'a> ZeroCopyContext<'a> { Err(Error::AlreadyReceived) } else { Ok( - if let Some((value, position)) = self.receiver.recv_timeout_0(Some(timeout))? { + if let Some((value, position)) = self.receiver.recv_zc_timeout_0::(Some(timeout))? { self.position = Some(position); Some(value) } else { @@ -448,8 +517,8 @@ impl Sender { /// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the serialized size is /// greater than the ring buffer capacity, this method will return /// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`. - pub fn send(&self, value: &impl Serialize) -> Result<()> { - self.send_timeout_0(value, false, None).map(drop) + pub fn send(&self, value: &T) -> Result<()> { + self.send_timeout_0::(value, false, None).map(drop) } /// Send the specified message, waiting for sufficient contiguous space to become available in the ring buffer @@ -461,8 +530,8 @@ impl Sender { /// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the serialized size is /// greater than the ring buffer capacity, this method will return /// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`. - pub fn send_timeout(&self, value: &impl Serialize, timeout: Duration) -> Result { - self.send_timeout_0(value, false, Some(timeout)) + pub fn send_timeout(&self, value: &T, timeout: Duration) -> Result { + self.send_timeout_0::(value, false, Some(timeout)) } /// Send the specified message, waiting for the ring buffer to become completely empty first. @@ -474,20 +543,25 @@ impl Sender { /// `Err(`[`Error::ZeroSizedMessage`](enum.Error.html#variant.ZeroSizedMessage)`))`. If the serialized size /// is greater than the ring buffer capacity, this method will return /// `Err(`[`Error::MessageTooLarge`](enum.Error.html#variant.MessageTooLarge)`))`. - pub fn send_when_empty(&self, value: &impl Serialize) -> Result<()> { - self.send_timeout_0(value, true, None).map(drop) + pub fn send_when_empty(&self, value: &T) -> Result<()> { + self.send_timeout_0::(value, true, None).map(drop) } - fn send_timeout_0( + fn send_timeout_0( &self, - value: &impl Serialize, + value: &impl ShmSerializer, wait_until_empty: bool, timeout: Option, ) -> Result { let buffer = self.0 .0.buffer(); let map = self.0 .0.map_mut(); - let size = bincode::serialized_size(value)? as u32; + let bytes = value.serialize()?; + + let size = bytes.len() as u32; + + // DELETE + // let size = bincode::serialized_size(value)? as u32; if size == 0 { return Err(Error::ZeroSizedMessage); @@ -512,10 +586,8 @@ impl Sender { } else if read != BEGINNING { assert!(write > BEGINNING); - bincode::serialize_into( - &mut map[write as usize..(write + 4) as usize], - &0_u32, - )?; + map[write as usize..(write + 4) as usize].copy_from_slice(&0_u32.to_le_bytes()); + write = BEGINNING; buffer.header().write.store(write, Release); lock.notify_all()?; @@ -536,10 +608,14 @@ impl Sender { } let start = write + 4; - bincode::serialize_into(&mut map[write as usize..start as usize], &size)?; + map[write as usize..start as usize].copy_from_slice(&size.to_le_bytes()); + // DELETE + //bincode::serialize_into(&mut map[write as usize..start as usize], &size)?; let end = start + size; - bincode::serialize_into(&mut map[start as usize..end as usize], value)?; + map[start as usize..end as usize].copy_from_slice(&bytes); + // DELETE + // bincode::serialize_into(&mut map[start as usize..end as usize], value)?; buffer.header().write.store(end, Release); @@ -554,8 +630,41 @@ mod tests { use super::*; use anyhow::{anyhow, Result}; use proptest::{arbitrary::any, collection::vec, prop_assume, proptest, strategy::Strategy}; + use serde::{Serialize, Deserialize}; use std::thread; + #[derive(Debug)] + pub struct BincodeZeroCopyDeserializer(pub T); + + impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer + where + T: Deserialize<'de>, + { + fn deserialize_from_bytes(bytes: &'de [u8]) -> super::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } + } + + #[derive(Debug)] + pub struct BincodeDeserializer(pub T); + + impl ShmDeserializer for BincodeDeserializer + where T: for<'de> Deserialize<'de> + { + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> super::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } + } + + #[derive(Debug)] + pub struct BincodeSerializer(pub T); + + impl ShmSerializer for BincodeSerializer { + fn serialize(&self) -> super::Result> { + Ok(bincode::serialize(&self.0)?) + } + } + #[derive(Debug)] struct Case { channel_size: u32, @@ -573,8 +682,8 @@ mod tests { let expected = self.data.clone(); thread::spawn(move || -> Result<()> { for item in &expected { - let received = rx.recv::>()?; - assert_eq!(item, &received); + let received = rx.recv::>>()?; + assert_eq!(item, &received.0); } Ok(()) @@ -585,7 +694,7 @@ mod tests { let expected = self.data.len() * self.sender_count as usize; thread::spawn(move || -> Result<()> { for _ in 0..expected { - rx.recv::>()?; + rx.recv::>>()?; } Ok(()) }) @@ -601,7 +710,7 @@ mod tests { let tx = Sender::new(SharedRingBuffer::open(&name)?); for item in data.as_ref() { - tx.send(item)?; + tx.send(&BincodeSerializer(item))?; } Ok(()) @@ -662,17 +771,17 @@ mod tests { let mut rx = Receiver::new(buffer); let tx = Sender::new(SharedRingBuffer::open(&name)?); - tx.send(&sent)?; - tx.send(&42_u32)?; + tx.send(&BincodeSerializer(&sent))?; + tx.send(&BincodeSerializer(42_u32))?; { let mut rx = rx.zero_copy_context(); - let received = rx.recv()?; + let received = rx.recv::>()?; - assert_eq!(sent, received); + assert_eq!(sent, received.0); } - assert_eq!(42_u32, rx.recv()?); + assert_eq!(42_u32, rx.recv::>()?.0); Ok(()) } @@ -685,12 +794,12 @@ mod tests { let sender = os::test::fork(move || { thread::sleep(Duration::from_secs(1)); - tx.send(&42_u32).map_err(anyhow::Error::from) + tx.send(&BincodeSerializer(42_u32)).map_err(anyhow::Error::from) })?; loop { - if let Some(value) = rx.recv_timeout(Duration::from_millis(1))? { - assert_eq!(42_u32, value); + if let Some(value) = rx.recv_timeout::>(Duration::from_millis(1))? { + assert_eq!(42_u32, value.0); break; } } @@ -707,13 +816,13 @@ mod tests { let tx = Sender::new(SharedRingBuffer::open(&name)?); let sender = os::test::fork(move || loop { - if tx.send_timeout(&42_u32, Duration::from_millis(1))? { + if tx.send_timeout(&BincodeSerializer(42_u32), Duration::from_millis(1))? { break Ok(()); } })?; thread::sleep(Duration::from_secs(1)); - assert_eq!(42_u32, rx.recv()?); + assert_eq!(42_u32, rx.recv::>()?.0); sender.join().map_err(|e| anyhow!("{:?}", e))??; From 6258d3ca60a3844ad3166c0b23ad5e8d20527e00 Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 08:41:11 -0400 Subject: [PATCH 02/10] saving --- src/lib.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 59ec686..3095b1a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,9 +117,6 @@ pub trait ShmSerializer { fn serialize(&self) -> Result>; } -// pub trait ShmDeserializer { -// fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> Result where Self: Sized; -// } pub trait ShmZeroCopyDeserializer<'de>: Sized { fn deserialize_from_bytes(bytes: &'de [u8]) -> Result; } @@ -560,9 +557,6 @@ impl Sender { let size = bytes.len() as u32; - // DELETE - // let size = bincode::serialized_size(value)? as u32; - if size == 0 { return Err(Error::ZeroSizedMessage); } @@ -609,13 +603,9 @@ impl Sender { let start = write + 4; map[write as usize..start as usize].copy_from_slice(&size.to_le_bytes()); - // DELETE - //bincode::serialize_into(&mut map[write as usize..start as usize], &size)?; let end = start + size; map[start as usize..end as usize].copy_from_slice(&bytes); - // DELETE - // bincode::serialize_into(&mut map[start as usize..end as usize], value)?; buffer.header().write.store(end, Release); From 5c506ea47dfdf1450ec716facf69164da5b5ebcf Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 08:43:59 -0400 Subject: [PATCH 03/10] saving --- src/lib.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3095b1a..ea75ce9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -87,6 +87,7 @@ pub enum Error { #[error(transparent)] Bincode(#[from] bincode::Error), + /// Errors from converting little endian bytes to u32 will be caught here. #[error(transparent)] TryFrom(#[from] TryFromSliceError), } @@ -113,17 +114,23 @@ fn map(file: &File) -> Result { } } +/// Trait apis to decouple the serialization backend from the mechanical send/recv +/// For a writer to work the payload must implement this trait pub trait ShmSerializer { fn serialize(&self) -> Result>; } -pub trait ShmZeroCopyDeserializer<'de>: Sized { - fn deserialize_from_bytes(bytes: &'de [u8]) -> Result; -} +/// For a reader to work they payload must implement this trait pub trait ShmDeserializer: Sized { fn deserialize_from_bytes(bytes: &[u8]) -> Result; } +/// To use the zero_copy_context the payload must implement this trait allowing for more +/// explict lifetimes +pub trait ShmZeroCopyDeserializer<'de>: Sized { + fn deserialize_from_bytes(bytes: &'de [u8]) -> Result; +} + /// Represents a file-backed shared memory ring buffer, suitable for constructing a /// [`Receiver`](struct.Receiver.html) or [`Sender`](struct.Sender.html). /// From 2cc74bf3292de706ebac45e904caf5717e26181f Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 11:20:54 -0400 Subject: [PATCH 04/10] saving --- ipc-benchmarks/src/lib.rs | 61 +++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/ipc-benchmarks/src/lib.rs b/ipc-benchmarks/src/lib.rs index dcb70da..a151698 100644 --- a/ipc-benchmarks/src/lib.rs +++ b/ipc-benchmarks/src/lib.rs @@ -2,9 +2,8 @@ extern crate test; -use serde_derive::{Deserialize, Serialize}; - -use std::time::Duration; +use ipmpsc::{ShmDeserializer, ShmSerializer, ShmZeroCopyDeserializer}; +use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] pub struct YuvFrameInfo { @@ -37,12 +36,44 @@ pub struct OwnedYuvFrame { pub v_pixels: Vec, } +#[derive(Debug)] +pub struct BincodeZeroCopyDeserializer(pub T); + +impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer +where + T: Deserialize<'de>, +{ + fn deserialize_from_bytes(bytes: &'de [u8]) -> ipmpsc::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} + +#[derive(Debug)] +pub struct BincodeDeserializer(pub T); + +impl ShmDeserializer for BincodeDeserializer +where T: for<'de> Deserialize<'de> +{ + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> ipmpsc::Result { + Ok(Self(bincode::deserialize::(bytes)?)) + } +} + +#[derive(Debug)] +pub struct BincodeSerializer(pub T); + +impl ShmSerializer for BincodeSerializer { + fn serialize(&self) -> ipmpsc::Result> { + Ok(bincode::serialize(&self.0)?) + } +} + #[cfg(test)] mod tests { use super::*; use anyhow::{anyhow, Error, Result}; use ipc_channel::ipc; - use ipmpsc::{Receiver, Sender, SharedRingBuffer}; + use ipmpsc::{Receiver, Sender, SharedRingBuffer, ShmDeserializer, ShmSerializer}; use test::Bencher; const SMALL: (usize, usize) = (3, 2); @@ -97,8 +128,8 @@ mod tests { v_pixels: &v_pixels, }; - while exit_rx.try_recv::()?.is_none() { - tx.send_timeout(&frame, Duration::from_millis(100))?; + while exit_rx.try_recv::>()?.is_none() { + tx.send_timeout(&BincodeSerializer(&frame), Duration::from_millis(100))?; } Ok(()) @@ -107,20 +138,20 @@ mod tests { // wait for first frame to arrive { let mut context = rx.zero_copy_context(); - if let Err(e) = context.recv::() { + if let Err(e) = context.recv::>() { panic!("error receiving: {:?}", e); }; } bencher.iter(|| { let mut context = rx.zero_copy_context(); - match context.recv::() { + match context.recv::>() { Err(e) => panic!("error receiving: {:?}", e), Ok(frame) => test::black_box(&frame), }; }); - exit_tx.send(&1_u8)?; + exit_tx.send(&BincodeSerializer(1_u8))?; sender.join().map_err(|e| anyhow!("{:?}", e))??; @@ -147,7 +178,7 @@ mod tests { let exit_buffer = SharedRingBuffer::open(&exit_name)?; let exit_rx = Receiver::new(exit_buffer); - while exit_rx.try_recv::()?.is_none() { + while exit_rx.try_recv::>()?.is_none() { let y_pixels = vec![128_u8; y_stride(width) * height]; let u_pixels = vec![192_u8; uv_stride(width) * height / 2]; let v_pixels = vec![255_u8; uv_stride(width) * height / 2]; @@ -166,7 +197,7 @@ mod tests { }; if let Err(e) = tx.send(frame) { - if exit_rx.try_recv::()?.is_none() { + if exit_rx.try_recv::>()?.is_none() { return Err(Error::from(e)); } else { break; @@ -187,7 +218,7 @@ mod tests { }; }); - exit_tx.send(&1_u8)?; + exit_tx.send(&BincodeSerializer(1_u8))?; while rx.recv().is_ok() {} @@ -239,10 +270,10 @@ mod tests { let size = bincode::serialized_size(&frame).unwrap() as usize; let mut buffer = vec![0_u8; size]; - while exit_rx.try_recv::()?.is_none() { + while exit_rx.try_recv::>()?.is_none() { bincode::serialize_into(&mut buffer as &mut [u8], &frame).unwrap(); if let Err(e) = tx.send(&buffer) { - if exit_rx.try_recv::()?.is_none() { + if exit_rx.try_recv::>()?.is_none() { return Err(Error::from(e)); } else { break; @@ -263,7 +294,7 @@ mod tests { }; }); - exit_tx.send(&1_u8)?; + exit_tx.send(&BincodeSerializer(1_u8))?; while rx.recv().is_ok() {} From 2041573a78634abc48daf88a2f2acc7efc80f43d Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 11:21:20 -0400 Subject: [PATCH 05/10] saving --- ipc-benchmarks/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ipc-benchmarks/src/lib.rs b/ipc-benchmarks/src/lib.rs index a151698..eace78d 100644 --- a/ipc-benchmarks/src/lib.rs +++ b/ipc-benchmarks/src/lib.rs @@ -70,6 +70,8 @@ impl ShmSerializer for BincodeSerializer { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use anyhow::{anyhow, Error, Result}; use ipc_channel::ipc; From 0a7750b32a3fc7e132e6bac9b9b5dbae8d467d49 Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 11:26:15 -0400 Subject: [PATCH 06/10] saving --- ipc-benchmarks/src/lib.rs | 3 +-- src/lib.rs | 2 +- src/posix.rs | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/ipc-benchmarks/src/lib.rs b/ipc-benchmarks/src/lib.rs index eace78d..4f5d431 100644 --- a/ipc-benchmarks/src/lib.rs +++ b/ipc-benchmarks/src/lib.rs @@ -71,11 +71,10 @@ impl ShmSerializer for BincodeSerializer { #[cfg(test)] mod tests { use std::time::Duration; - use super::*; use anyhow::{anyhow, Error, Result}; use ipc_channel::ipc; - use ipmpsc::{Receiver, Sender, SharedRingBuffer, ShmDeserializer, ShmSerializer}; + use ipmpsc::{Receiver, Sender, SharedRingBuffer}; use test::Bencher; const SMALL: (usize, usize) = (3, 2); diff --git a/src/lib.rs b/src/lib.rs index ea75ce9..c25956a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -356,7 +356,7 @@ impl Receiver { /// 3. A given [`ZeroCopyContext`](struct.ZeroCopyContext.html) can only be used to deserialize a single /// message before it must be discarded since the read pointer is advanced only when the instance is dropped /// (enforced at run time). - pub fn zero_copy_context(&mut self) -> ZeroCopyContext { + pub fn zero_copy_context(&mut self) -> ZeroCopyContext<'_> { ZeroCopyContext { receiver: self, position: None, diff --git a/src/posix.rs b/src/posix.rs index 163b4d3..c94054a 100644 --- a/src/posix.rs +++ b/src/posix.rs @@ -106,7 +106,7 @@ impl Buffer { } } - pub fn lock(&self) -> Result { + pub fn lock(&self) -> Result> { Lock::try_new(self) } @@ -122,7 +122,7 @@ impl Buffer { pub struct Lock<'a>(&'a Buffer); impl<'a> Lock<'a> { - pub fn try_new(buffer: &Buffer) -> Result { + pub fn try_new(buffer: &Buffer) -> Result> { unsafe { nonzero!(libc::pthread_mutex_lock(buffer.header().mutex.get()))?; } From 59054ce5bd6970917645f1a59c8211df4a0d9e5f Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 11:31:45 -0400 Subject: [PATCH 07/10] saving --- ipc-benchmarks/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ipc-benchmarks/src/lib.rs b/ipc-benchmarks/src/lib.rs index 4f5d431..1377b52 100644 --- a/ipc-benchmarks/src/lib.rs +++ b/ipc-benchmarks/src/lib.rs @@ -43,6 +43,7 @@ impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer where T: Deserialize<'de>, { + #[inline(always)] fn deserialize_from_bytes(bytes: &'de [u8]) -> ipmpsc::Result { Ok(Self(bincode::deserialize::(bytes)?)) } @@ -54,6 +55,7 @@ pub struct BincodeDeserializer(pub T); impl ShmDeserializer for BincodeDeserializer where T: for<'de> Deserialize<'de> { + #[inline(always)] fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> ipmpsc::Result { Ok(Self(bincode::deserialize::(bytes)?)) } @@ -63,6 +65,7 @@ where T: for<'de> Deserialize<'de> pub struct BincodeSerializer(pub T); impl ShmSerializer for BincodeSerializer { + #[inline(always)] fn serialize(&self) -> ipmpsc::Result> { Ok(bincode::serialize(&self.0)?) } From 70ee49421343900637091e0d402fd1ef92cf3187 Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 11:59:09 -0400 Subject: [PATCH 08/10] saving --- examples/ipmpsc-receive.rs | 20 +++++-- examples/ipmpsc-send.rs | 4 +- src/lib.rs | 109 ++++++++++++++++++++++++++++--------- 3 files changed, 100 insertions(+), 33 deletions(-) diff --git a/examples/ipmpsc-receive.rs b/examples/ipmpsc-receive.rs index c32e0ba..c4e3e84 100644 --- a/examples/ipmpsc-receive.rs +++ b/examples/ipmpsc-receive.rs @@ -2,7 +2,7 @@ use clap::{App, Arg}; use ipmpsc::{Receiver, SharedRingBuffer, ShmDeserializer, ShmZeroCopyDeserializer}; -use serde::{Deserialize}; +use serde::Deserialize; #[derive(Debug)] pub struct BincodeZeroCopyDeserializer(pub T); @@ -11,7 +11,9 @@ impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer where T: Deserialize<'de>, { - fn deserialize_from_bytes(bytes: &'de [u8]) -> ipmpsc::Result { + type Error = bincode::Error; + + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result { Ok(Self(bincode::deserialize::(bytes)?)) } } @@ -20,14 +22,16 @@ where pub struct BincodeDeserializer(pub T); impl ShmDeserializer for BincodeDeserializer -where T: for<'de> Deserialize<'de> +where + T: for<'de> Deserialize<'de>, { - fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> ipmpsc::Result { + type Error = bincode::Error; + + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result { Ok(Self(bincode::deserialize::(bytes)?)) } } - fn main() -> Result<(), Box> { let matches = App::new("ipmpsc-send") .about("ipmpsc sender example") @@ -59,7 +63,11 @@ fn main() -> Result<(), Box> { loop { if zero_copy { - println!("received {:?}", rx.zero_copy_context().recv::>()?); + println!( + "received {:?}", + rx.zero_copy_context() + .recv::>()? + ); } else { println!("received {:?}", rx.recv::>()?); } diff --git a/examples/ipmpsc-send.rs b/examples/ipmpsc-send.rs index 2e4121d..4449271 100644 --- a/examples/ipmpsc-send.rs +++ b/examples/ipmpsc-send.rs @@ -9,7 +9,9 @@ use std::io::{self, BufRead}; pub struct BincodeSerializer(pub T); impl ShmSerializer for BincodeSerializer { - fn serialize(&self) -> ipmpsc::Result> { + type Error = bincode::Error; + + fn serialize(&self) -> std::result::Result, Self::Error> { Ok(bincode::serialize(&self.0)?) } } diff --git a/src/lib.rs b/src/lib.rs index c25956a..09628dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,10 +10,18 @@ use memmap2::MmapMut; use os::{Buffer, Header, View}; use std::{ - array::TryFromSliceError, cell::UnsafeCell, convert::TryInto, ffi::c_void, fs::{File, OpenOptions}, mem, sync::{ + array::TryFromSliceError, + cell::UnsafeCell, + convert::TryInto, + ffi::c_void, + fmt::{Debug, Display}, + fs::{File, OpenOptions}, + mem, + sync::{ atomic::Ordering::{Acquire, Relaxed, Release}, Arc, - }, time::{Duration, Instant} + }, + time::{Duration, Instant}, }; use tempfile::NamedTempFile; use thiserror::Error as ThisError; @@ -83,9 +91,9 @@ pub enum Error { #[error(transparent)] Io(#[from] std::io::Error), - /// Wrapped bincode error encountered during (de)serialization. + /// Errors from ser/deser operations. #[error(transparent)] - Bincode(#[from] bincode::Error), + Serialize(#[from] SerializeError), /// Errors from converting little endian bytes to u32 will be caught here. #[error(transparent)] @@ -114,21 +122,39 @@ fn map(file: &File) -> Result { } } +/// `ipmpsc`-specific error type +#[derive(ThisError, Debug)] +pub enum SerializeError { + #[error("failed to serialize: {0:?} => {1}@{2}")] + FailedSerialize(String, u32, &'static str), + + #[error("failed to deserialize: {0:?} => {1}@{2}")] + FailedDeserialize(String, u32, &'static str), +} + +pub type SerializeResult = std::result::Result; + /// Trait apis to decouple the serialization backend from the mechanical send/recv /// For a writer to work the payload must implement this trait pub trait ShmSerializer { - fn serialize(&self) -> Result>; + type Error: Display; + + fn serialize(&self) -> std::result::Result, Self::Error>; } /// For a reader to work they payload must implement this trait pub trait ShmDeserializer: Sized { - fn deserialize_from_bytes(bytes: &[u8]) -> Result; + type Error: Display; + + fn deserialize_from_bytes(bytes: &[u8]) -> std::result::Result; } /// To use the zero_copy_context the payload must implement this trait allowing for more /// explict lifetimes pub trait ShmZeroCopyDeserializer<'de>: Sized { - fn deserialize_from_bytes(bytes: &'de [u8]) -> Result; + type Error: Display; + + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result; } /// Represents a file-backed shared memory ring buffer, suitable for constructing a @@ -237,8 +263,7 @@ impl Receiver { /// Attempt to read a message without blocking. /// /// This will return `Ok(None)` if there are no messages immediately available. - pub fn try_recv(&self) -> Result> - { + pub fn try_recv(&self) -> Result> { Ok(if let Some((value, position)) = self.try_recv_0::()? { self.seek(position)?; @@ -264,7 +289,15 @@ impl Receiver { if size > 0 { let end = start + size; break Some(( - T::deserialize_from_bytes(&slice[start as usize..end as usize])?, + T::deserialize_from_bytes(&slice[start as usize..end as usize]).map_err( + |e| { + Error::Serialize(SerializeError::FailedDeserialize( + e.to_string(), + line!(), + file!(), + )) + }, + )?, end, )); } else if write < read { @@ -297,7 +330,15 @@ impl Receiver { if size > 0 { let end = start + size; break Some(( - T::deserialize_from_bytes(&slice[start as usize..end as usize])?, + T::deserialize_from_bytes(&slice[start as usize..end as usize]).map_err( + |e| { + Error::Serialize(SerializeError::FailedDeserialize( + e.to_string(), + line!(), + file!(), + )) + }, + )?, end, )); } else if write < read { @@ -315,8 +356,7 @@ impl Receiver { } /// Attempt to read a message, blocking if necessary until one becomes available. - pub fn recv(&self) -> Result - { + pub fn recv(&self) -> Result { let (value, position) = self.recv_timeout_0::(None)?.unwrap(); self.seek(position)?; @@ -326,8 +366,7 @@ impl Receiver { /// Attempt to read a message, blocking for up to the specified duration if necessary until one becomes /// available. - pub fn recv_timeout(&self, timeout: Duration) -> Result> - { + pub fn recv_timeout(&self, timeout: Duration) -> Result> { Ok( if let Some((value, position)) = self.recv_timeout_0::(Some(timeout))? { self.seek(position)?; @@ -484,7 +523,9 @@ impl<'a> ZeroCopyContext<'a> { Err(Error::AlreadyReceived) } else { Ok( - if let Some((value, position)) = self.receiver.recv_zc_timeout_0::(Some(timeout))? { + if let Some((value, position)) = + self.receiver.recv_zc_timeout_0::(Some(timeout))? + { self.position = Some(position); Some(value) } else { @@ -560,7 +601,13 @@ impl Sender { let buffer = self.0 .0.buffer(); let map = self.0 .0.map_mut(); - let bytes = value.serialize()?; + let bytes = value.serialize().map_err(|e| { + Error::Serialize(SerializeError::FailedSerialize( + e.to_string(), + line!(), + file!(), + )) + })?; let size = bytes.len() as u32; @@ -627,7 +674,7 @@ mod tests { use super::*; use anyhow::{anyhow, Result}; use proptest::{arbitrary::any, collection::vec, prop_assume, proptest, strategy::Strategy}; - use serde::{Serialize, Deserialize}; + use serde::{Deserialize, Serialize}; use std::thread; #[derive(Debug)] @@ -637,7 +684,9 @@ mod tests { where T: Deserialize<'de>, { - fn deserialize_from_bytes(bytes: &'de [u8]) -> super::Result { + type Error = bincode::Error; + + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result { Ok(Self(bincode::deserialize::(bytes)?)) } } @@ -646,9 +695,12 @@ mod tests { pub struct BincodeDeserializer(pub T); impl ShmDeserializer for BincodeDeserializer - where T: for<'de> Deserialize<'de> + where + T: for<'de> Deserialize<'de>, { - fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> super::Result { + type Error = bincode::Error; + + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result { Ok(Self(bincode::deserialize::(bytes)?)) } } @@ -657,7 +709,9 @@ mod tests { pub struct BincodeSerializer(pub T); impl ShmSerializer for BincodeSerializer { - fn serialize(&self) -> super::Result> { + type Error = bincode::Error; + + fn serialize(&self) -> std::result::Result, Self::Error> { Ok(bincode::serialize(&self.0)?) } } @@ -679,7 +733,7 @@ mod tests { let expected = self.data.clone(); thread::spawn(move || -> Result<()> { for item in &expected { - let received = rx.recv::>>()?; + let received = rx.recv::>>()?; assert_eq!(item, &received.0); } @@ -691,7 +745,7 @@ mod tests { let expected = self.data.len() * self.sender_count as usize; thread::spawn(move || -> Result<()> { for _ in 0..expected { - rx.recv::>>()?; + rx.recv::>>()?; } Ok(()) }) @@ -791,11 +845,14 @@ mod tests { let sender = os::test::fork(move || { thread::sleep(Duration::from_secs(1)); - tx.send(&BincodeSerializer(42_u32)).map_err(anyhow::Error::from) + tx.send(&BincodeSerializer(42_u32)) + .map_err(anyhow::Error::from) })?; loop { - if let Some(value) = rx.recv_timeout::>(Duration::from_millis(1))? { + if let Some(value) = + rx.recv_timeout::>(Duration::from_millis(1))? + { assert_eq!(42_u32, value.0); break; } From 7c6595c756eaea99b0d2afe305aa4bfa93d4b8c2 Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 14 Oct 2025 12:00:35 -0400 Subject: [PATCH 09/10] saving --- ipc-benchmarks/src/lib.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ipc-benchmarks/src/lib.rs b/ipc-benchmarks/src/lib.rs index 1377b52..23cb27e 100644 --- a/ipc-benchmarks/src/lib.rs +++ b/ipc-benchmarks/src/lib.rs @@ -43,8 +43,10 @@ impl<'de, T> ShmZeroCopyDeserializer<'de> for BincodeZeroCopyDeserializer where T: Deserialize<'de>, { + type Error = bincode::Error; + #[inline(always)] - fn deserialize_from_bytes(bytes: &'de [u8]) -> ipmpsc::Result { + fn deserialize_from_bytes(bytes: &'de [u8]) -> std::result::Result { Ok(Self(bincode::deserialize::(bytes)?)) } } @@ -55,8 +57,10 @@ pub struct BincodeDeserializer(pub T); impl ShmDeserializer for BincodeDeserializer where T: for<'de> Deserialize<'de> { + type Error = bincode::Error; + #[inline(always)] - fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> ipmpsc::Result { + fn deserialize_from_bytes<'de>(bytes: &'de [u8]) -> std::result::Result { Ok(Self(bincode::deserialize::(bytes)?)) } } @@ -65,8 +69,10 @@ where T: for<'de> Deserialize<'de> pub struct BincodeSerializer(pub T); impl ShmSerializer for BincodeSerializer { + type Error = bincode::Error; + #[inline(always)] - fn serialize(&self) -> ipmpsc::Result> { + fn serialize(&self) -> std::result::Result, Self::Error> { Ok(bincode::serialize(&self.0)?) } } From 8029eab15e5edd98791827cb56f5d7c0e1e29b46 Mon Sep 17 00:00:00 2001 From: andrew-manifold <102973930+andrew-manifold@users.noreply.github.com> Date: Tue, 21 Oct 2025 21:29:00 -0400 Subject: [PATCH 10/10] docs --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 09628dd..557418a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ //! Inter-Process Multiple Producer, Single Consumer Channels for Rust //! //! This library provides a type-safe, high-performance inter-process channel implementation based on a shared -//! memory ring buffer. It uses [bincode](https://github.com/TyOverby/bincode) for (de)serialization, including +//! memory ring buffer. It is agnostic to (de)serialization backends and includes the capability for //! zero-copy deserialization, making it ideal for messages with large `&str` or `&[u8]` fields. And it has a name //! that rolls right off the tongue.