From bfd47780c33d51f9c0c708edb878b53814f9152d Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Wed, 18 Mar 2026 09:06:12 +0100 Subject: [PATCH 1/2] implement serialized subscription --- rclrs/src/dynamic_message.rs | 4 +- rclrs/src/lib.rs | 2 + rclrs/src/node.rs | 30 ++++- rclrs/src/serialized_subscription.rs | 188 +++++++++++++++++++++++++++ 4 files changed, 218 insertions(+), 6 deletions(-) create mode 100644 rclrs/src/serialized_subscription.rs diff --git a/rclrs/src/dynamic_message.rs b/rclrs/src/dynamic_message.rs index aae749d0..9c61e57e 100644 --- a/rclrs/src/dynamic_message.rs +++ b/rclrs/src/dynamic_message.rs @@ -120,7 +120,7 @@ pub struct DynamicMessage { } /// This is an analogue of rclcpp::get_typesupport_library. -fn get_type_support_library( +pub(crate) fn get_type_support_library( package_name: &str, type_support_identifier: &str, ) -> Result, DynamicMessageError> { @@ -162,7 +162,7 @@ fn get_type_support_library( /// /// It is unsafe because it would be theoretically possible to pass in a library that has /// the expected symbol defined, but with an unexpected type. -unsafe fn get_type_support_handle( +pub(crate) unsafe fn get_type_support_handle( type_support_library: &libloading::Library, type_support_identifier: &str, message_type: &MessageTypeName, diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 9db4767f..854bec2c 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -192,6 +192,7 @@ mod node; mod parameter; mod publisher; mod qos; +mod serialized_subscription; mod service; mod subscription; mod time; @@ -221,6 +222,7 @@ pub use parameter::*; pub use publisher::*; pub use qos::*; pub use rcl_bindings::rmw_request_id_t; +pub use serialized_subscription::*; pub use service::*; pub use subscription::*; pub use time::*; diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index ed436df2..8b8ac7cb 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -43,10 +43,10 @@ use crate::{ IntoAsyncSubscriptionCallback, IntoNodeServiceCallback, IntoNodeSubscriptionCallback, IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, IntoTimerOptions, LogParams, Logger, MessageInfo, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, - Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, Service, - ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState, - TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, WorkerOptions, WorkerState, - ENTITY_LIFECYCLE_MUTEX, + Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, + SerializedSubscription, SerializedSubscriptionState, Service, ServiceOptions, ServiceState, + Subscription, SubscriptionOptions, SubscriptionState, TerminatedGoal, TimeSource, Timer, + TimerState, ToLogParams, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX, }; /// A processing unit that can communicate with other nodes. See the API of @@ -944,6 +944,28 @@ impl NodeState { ) } + /// Creates a [`SerializedSubscription`] with an ordinary callback. + /// + /// The message type is determined at runtime, and the callback receives the serialized + /// ROS 2 payload bytes plus [`crate::MessageInfo`]. + pub fn create_serialized_subscription<'a, F>( + &self, + topic_type: MessageTypeName, + options: impl Into>, + callback: F, + ) -> Result + where + F: Fn(Vec, MessageInfo) + Send + Sync + 'static, + { + SerializedSubscriptionState::create( + topic_type, + options, + crate::serialized_subscription::NodeSerializedSubscriptionCallback::new(callback), + &self.handle, + self.commands.async_worker_commands(), + ) + } + /// Creates a [`DynamicSubscription`] with an async callback. /// /// For the behavior and API refer to [`Node::create_async_subscription`][1], except two key diff --git a/rclrs/src/serialized_subscription.rs b/rclrs/src/serialized_subscription.rs new file mode 100644 index 00000000..54fa2823 --- /dev/null +++ b/rclrs/src/serialized_subscription.rs @@ -0,0 +1,188 @@ +use std::{ + any::Any, + ffi::CString, + sync::{Arc, Mutex}, +}; + +use crate::{ + dynamic_message::{get_type_support_handle, get_type_support_library, MessageTypeName}, + rcl_bindings::*, + MessageInfo, NodeHandle, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, + RclrsErrorFilter, ReadyKind, SubscriptionHandle, SubscriptionOptions, ToResult, Waitable, + WaitableLifecycle, WorkerCommands, ENTITY_LIFECYCLE_MUTEX, +}; + +/// Struct for receiving serialized ROS 2 messages as raw bytes. +/// +/// Create a serialized subscription using [`NodeState::create_serialized_subscription`]. +pub type SerializedSubscription = Arc; + +struct SerializedSubscriptionExecutable { + handle: Arc, + callback: Arc>, +} + +struct SerializedMessageBuffer { + inner: rcl_serialized_message_t, +} + +pub(crate) struct NodeSerializedSubscriptionCallback( + Box, MessageInfo) + Send + Sync>, +); + +impl NodeSerializedSubscriptionCallback { + pub(crate) fn new(f: impl Fn(Vec, MessageInfo) + Send + Sync + 'static) -> Self { + Self(Box::new(f)) + } +} + +impl SerializedMessageBuffer { + fn new() -> Self { + Self { + inner: unsafe { rcutils_get_zero_initialized_uint8_array() }, + } + } + + fn take_bytes(&self) -> Vec { + unsafe { std::slice::from_raw_parts(self.inner.buffer, self.inner.buffer_length) }.to_vec() + } +} + +impl Drop for SerializedMessageBuffer { + fn drop(&mut self) { + if self.inner.allocator.allocate.is_some() { + unsafe { + rcutils_uint8_array_fini(&mut self.inner); + } + } + } +} + +impl SerializedSubscriptionExecutable { + fn take(&self) -> Result<(Vec, MessageInfo), RclrsError> { + let mut serialized_message = SerializedMessageBuffer::new(); + let mut message_info = unsafe { rmw_get_zero_initialized_message_info() }; + let rcl_subscription = &mut *self.handle.lock(); + + unsafe { + rcl_take_serialized_message( + rcl_subscription, + &mut serialized_message.inner, + &mut message_info, + std::ptr::null_mut(), + ) + .ok()?; + }; + + Ok(( + serialized_message.take_bytes(), + MessageInfo::from_rmw_message_info(&message_info), + )) + } +} + +impl RclPrimitive for SerializedSubscriptionExecutable { + unsafe fn execute( + &mut self, + ready: ReadyKind, + _payload: &mut dyn Any, + ) -> Result<(), RclrsError> { + ready.for_basic()?; + let evaluate = || { + let (msg, msg_info) = self.take()?; + (self.callback.lock().unwrap().0)(msg, msg_info); + Ok(()) + }; + + evaluate().take_failed_ok() + } + + fn kind(&self) -> RclPrimitiveKind { + RclPrimitiveKind::Subscription + } + + fn handle(&self) -> RclPrimitiveHandle<'_> { + RclPrimitiveHandle::Subscription(self.handle.lock()) + } +} + +/// Inner state of a [`SerializedSubscription`]. +pub struct SerializedSubscriptionState { + handle: Arc, + #[allow(unused)] + callback: Arc>, + #[allow(unused)] + lifecycle: WaitableLifecycle, + #[allow(dead_code)] + type_support_library: Arc, +} + +impl SerializedSubscriptionState { + pub(crate) fn create<'a>( + topic_type: MessageTypeName, + options: impl Into>, + callback: NodeSerializedSubscriptionCallback, + node_handle: &Arc, + commands: &Arc, + ) -> Result, RclrsError> { + let SubscriptionOptions { topic, qos } = options.into(); + let type_support_library = + get_type_support_library(&topic_type.package_name, "rosidl_typesupport_c")?; + let type_support_ptr = unsafe { + get_type_support_handle( + type_support_library.as_ref(), + "rosidl_typesupport_c", + &topic_type, + )? + }; + + let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul { + err, + s: topic.into(), + })?; + + let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() }; + rcl_subscription_options.qos = qos.into(); + let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() }; + { + let rcl_node = node_handle.rcl_node.lock()?; + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock()?; + unsafe { + rcl_subscription_init( + &mut rcl_subscription, + &*rcl_node, + type_support_ptr, + topic_c_string.as_ptr(), + &rcl_subscription_options, + ) + .ok()?; + } + } + + let handle = Arc::new(SubscriptionHandle { + rcl_subscription: Mutex::new(rcl_subscription), + node_handle: Arc::clone(node_handle), + }); + let callback = Arc::new(Mutex::new(callback)); + let (waitable, lifecycle) = Waitable::new( + Box::new(SerializedSubscriptionExecutable { + handle: Arc::clone(&handle), + callback: Arc::clone(&callback), + }), + Some(Arc::clone(commands.get_guard_condition())), + ); + commands.add_to_wait_set(waitable); + + Ok(Arc::new(Self { + handle, + callback, + lifecycle, + type_support_library, + })) + } + + /// Returns the topic name of the subscription. + pub fn topic_name(&self) -> String { + self.handle.topic_name() + } +} From b73073f497d0570e2e9b99206a9f97eefc54714d Mon Sep 17 00:00:00 2001 From: Alexey Timin Date: Wed, 18 Mar 2026 14:58:17 +0100 Subject: [PATCH 2/2] initialize serialized message buffer before take --- rclrs/src/serialized_subscription.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/rclrs/src/serialized_subscription.rs b/rclrs/src/serialized_subscription.rs index 54fa2823..fb435d07 100644 --- a/rclrs/src/serialized_subscription.rs +++ b/rclrs/src/serialized_subscription.rs @@ -37,10 +37,13 @@ impl NodeSerializedSubscriptionCallback { } impl SerializedMessageBuffer { - fn new() -> Self { - Self { - inner: unsafe { rcutils_get_zero_initialized_uint8_array() }, + fn new() -> Result { + let mut inner = unsafe { rcutils_get_zero_initialized_uint8_array() }; + let allocator = unsafe { rcutils_get_default_allocator() }; + unsafe { + rcutils_uint8_array_init(&mut inner, 0, &allocator).ok()?; } + Ok(Self { inner }) } fn take_bytes(&self) -> Vec { @@ -60,7 +63,7 @@ impl Drop for SerializedMessageBuffer { impl SerializedSubscriptionExecutable { fn take(&self) -> Result<(Vec, MessageInfo), RclrsError> { - let mut serialized_message = SerializedMessageBuffer::new(); + let mut serialized_message = SerializedMessageBuffer::new()?; let mut message_info = unsafe { rmw_get_zero_initialized_message_info() }; let rcl_subscription = &mut *self.handle.lock();