From e53bb26f5836ea1c174425d9e44ce7ec4fc6d409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Balthasar=20Sch=C3=BCss?= Date: Thu, 16 Apr 2026 17:25:36 +0200 Subject: [PATCH 1/2] JumpTimeCallbacks and separate worker for simulation time clock --- rclrs/src/lib.rs | 2 + rclrs/src/node.rs | 63 +++++++++-- rclrs/src/time_jumps.rs | 238 ++++++++++++++++++++++++++++++++++++++++ rclrs/src/worker.rs | 62 ++++++++++- 4 files changed, 350 insertions(+), 15 deletions(-) create mode 100644 rclrs/src/time_jumps.rs diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 9db4767f..79461804 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -195,6 +195,7 @@ mod qos; mod service; mod subscription; mod time; +mod time_jumps; mod time_source; mod timer; pub mod vendor; @@ -224,6 +225,7 @@ pub use rcl_bindings::rmw_request_id_t; pub use service::*; pub use subscription::*; pub use time::*; +pub use time_jumps::*; use time_source::*; pub use timer::*; pub use wait_set::*; diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 0583fb9d..8b2a81c1 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -11,6 +11,7 @@ mod node_graph_task; use node_graph_task::*; use std::{ + any::Any, cmp::PartialEq, ffi::CStr, fmt, @@ -37,16 +38,17 @@ use crate::{ NodeDynamicSubscriptionCallback, }, rcl_bindings::*, + time_jumps::ClockTimeJumpCallbackHandle, ActionClient, ActionClientState, ActionGoalReceiver, ActionServer, ActionServerState, - AnyTimerCallback, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands, - IntoActionClientOptions, IntoActionServerOptions, IntoAsyncServiceCallback, - IntoAsyncSubscriptionCallback, IntoNodeServiceCallback, IntoNodeSubscriptionCallback, - IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, IntoTimerOptions, LogParams, - Logger, MessageInfo, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, - Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, Service, - ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState, - TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, WorkerOptions, WorkerState, - ENTITY_LIFECYCLE_MUTEX, + AnyTimerCallback, Client, ClientOptions, ClientState, Clock, ClockTimeJumpCondition, + ContextHandle, ExecutorCommands, IntoActionClientOptions, IntoActionServerOptions, + IntoAsyncServiceCallback, IntoAsyncSubscriptionCallback, IntoNodeServiceCallback, + IntoNodeSubscriptionCallback, IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, + IntoTimerOptions, LogParams, Logger, MessageInfo, ParameterBuilder, ParameterInterface, + ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError, + RequestedGoal, Service, ServiceOptions, ServiceState, Subscription, SubscriptionOptions, + SubscriptionState, TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, + WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX, }; /// A processing unit that can communicate with other nodes. See the API of @@ -1332,6 +1334,49 @@ impl NodeState { node, ) } + /// Registers time jump callback that is scheduled on the executor. + /// + /// # Example + /// ``` + /// # use rclrs::*; + /// + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// # use std::time::Duration; + /// let jump_handle = node.register_time_jump_callback( + /// ClockTimeJumpConditions::on_min_forwards(Duration::from_secs_f32(1.5), + /// { + /// let node = node.clone(); + /// move |delta| {log!(node.info(), "Delta {:?}", delta);} + /// } + /// + /// ).unwrap(); + /// ``` + /// The callback remains active as long as the jump handle is not dropped. + pub fn register_time_jump_callback( + &self, + jump_condition: T, + callback: impl Fn(T::CallbackParameter) + Send + Sync + 'static, + ) -> Result { + let callback = Arc::new(callback); + let commands = Arc::downgrade(self.commands().async_worker_commands()); + + let schedule_jump_time_callback = move |time_jump: T::CallbackParameter| { + if let Some(commands) = commands.upgrade() { + let callback = Arc::clone(&callback); + let payload_task = move |_: &mut dyn Any| { + callback(time_jump); + }; + commands.run_on_payload(Box::new(payload_task)); + } + }; + + ClockTimeJumpCallbackHandle::new( + self.get_clock(), + jump_condition, + Box::new(schedule_jump_time_callback), + ) + } /// Returns the ROS domain ID that the node is using. /// diff --git a/rclrs/src/time_jumps.rs b/rclrs/src/time_jumps.rs new file mode 100644 index 00000000..0151c80b --- /dev/null +++ b/rclrs/src/time_jumps.rs @@ -0,0 +1,238 @@ +use crate::error::ToResult; +use crate::rcl_bindings::{rcl_clock_change_e, rcl_time_jump_t}; +use crate::{rcl_bindings::rcl_jump_threshold_t, ENTITY_LIFECYCLE_MUTEX}; +use std::time::Duration; +use std::{ + ffi::c_void, + sync::{Arc, Weak}, +}; + +use crate::{rcl_bindings, Clock, RclrsError}; + +type ClockTimeJumpCallback = dyn Fn(&rcl_bindings::rcl_time_jump_t) + Send + 'static + Sync; +unsafe extern "C" fn on_post_time_jump( + time_jump: *const rcl_bindings::rcl_time_jump_t, + before_jump: bool, + user_data: *mut c_void, +) { + if before_jump { + return; + } + let weak = &*(user_data as *const Weak); + let jump = &*time_jump; + if let Some(cb) = weak.upgrade() { + cb(jump); + } +} + +/// Holding a reference to this handle keeps the jump callback alive +pub struct ClockTimeJumpCallbackHandle { + clock: Clock, + user_data: *mut c_void, + _cb: Arc, +} +unsafe impl Send for ClockTimeJumpCallbackHandle {} +unsafe impl Sync for ClockTimeJumpCallbackHandle {} + +/// Condition for typed clock time jump callbacks. +/// ``` +/// # use rclrs::*; +/// +/// # ClockTimeJumpConditions::on_min_forward(Duration::as_secs(1)) +/// # ClockTimeJumpConditions::on_min_backward(Duration::as_secs(1)) +/// # ClockTimeJumpConditions::on_min_forward_or_backward(Duration::as_secs(1), Duration::as_secs(1)) +/// # ClockTimeJumpConditions::on_clock_change() +/// ``` +/// See [`crate::ClockTimeCondition`] +pub trait ClockTimeJumpCondition: Send + Sync + 'static + std::fmt::Debug { + /// The jump payload depending on the jump event. + type CallbackParameter: Send + 'static + std::fmt::Debug; + + /// Converts jump condition into rcl data type + fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t; + + /// Converts rcl change payload into callback parameter + fn into_callback_param(data: &rcl_time_jump_t) -> Self::CallbackParameter; +} + +#[derive(Debug)] +struct OnMinForwardJump(Duration); +#[derive(Debug)] +struct OnMinBackwardJump(Duration); +#[derive(Debug)] +struct OnMinForwardBackwardJump(Duration, Duration); +#[derive(Debug)] +struct OnClockChange; + +impl ClockTimeJumpCondition for OnMinBackwardJump { + type CallbackParameter = i64; + + fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t { + rcl_jump_threshold_t { + min_backward: rcl_bindings::rcl_duration_s { + nanoseconds: -(self.0.as_nanos() as i64), + }, + min_forward: rcl_bindings::rcl_duration_s { nanoseconds: 0 }, + on_clock_change: false, + } + } + fn into_callback_param(data: &rcl_time_jump_t) -> i64 { + data.delta.nanoseconds + } +} +impl ClockTimeJumpCondition for OnMinForwardJump { + type CallbackParameter = i64; + + fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t { + rcl_jump_threshold_t { + min_backward: rcl_bindings::rcl_duration_s { nanoseconds: 0 }, + min_forward: rcl_bindings::rcl_duration_s { + nanoseconds: self.0.as_nanos() as i64, + }, + on_clock_change: false, + } + } + fn into_callback_param(data: &rcl_time_jump_t) -> i64 { + data.delta.nanoseconds + } +} +impl ClockTimeJumpCondition for OnMinForwardBackwardJump { + type CallbackParameter = i64; + + fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t { + rcl_jump_threshold_t { + min_backward: rcl_bindings::rcl_duration_s { + nanoseconds: -(self.0.as_nanos() as i64), + }, + min_forward: rcl_bindings::rcl_duration_s { + nanoseconds: self.1.as_nanos() as i64, + }, + on_clock_change: false, + } + } + fn into_callback_param(data: &rcl_time_jump_t) -> i64 { + data.delta.nanoseconds + } +} +impl ClockTimeJumpCondition for OnClockChange { + type CallbackParameter = ClockChange; + + fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t { + rcl_jump_threshold_t { + min_backward: rcl_bindings::rcl_duration_s { nanoseconds: 0 }, + min_forward: rcl_bindings::rcl_duration_s { nanoseconds: 0 }, + on_clock_change: true, + } + } + fn into_callback_param(data: &rcl_time_jump_t) -> ClockChange { + match data.clock_change { + rcl_clock_change_e::RCL_ROS_TIME_ACTIVATED => ClockChange::RclRosTimeActivated, + rcl_clock_change_e::RCL_ROS_TIME_DEACTIVATED => ClockChange::RclRosTimeDeactivated, + _ => { + unreachable!() + } + } + } +} + +/// Provides constructors for [`crate::ClockTimeJumpCondition`]s. +pub struct ClockTimeJumpConditions {} +impl ClockTimeJumpConditions { + /// Returns [`crate::ClockTimeJumpCondition`] for time jumps further forwards than given duration. + /// The callback accepts the time jump delta. + pub fn on_min_forward( + min_forward: Duration, + ) -> impl ClockTimeJumpCondition { + OnMinForwardJump(min_forward) + } + /// Returns [`crate::ClockTimeJumpCondition`] for time jumps further backwards than given duration. + /// The callback accepts the time jump delta. + pub fn on_min_backward( + min_backward: Duration, + ) -> impl ClockTimeJumpCondition { + OnMinBackwardJump(min_backward) + } + /// Returns [`crate::ClockTimeJumpCondition`] for time jumps further forwards/backwards than given durations. + /// The callback accepts the time jump delta. + pub fn on_min_forward_or_backward( + min_backward: Duration, + min_forward: Duration, + ) -> impl ClockTimeJumpCondition { + OnMinForwardBackwardJump(min_backward, min_forward) + } + + /// Returns [`crate::ClockTimeJumpCondition`] for ros time activation/deactivation. + pub fn on_clock_change() -> impl ClockTimeJumpCondition { + OnClockChange {} + } +} + +/// Represents a clock type change. +#[derive(Debug)] +pub enum ClockChange { + /// Triggered when RclRosTime is activated. + RclRosTimeActivated, + /// Triggered when RclRosTime is deactivated. + RclRosTimeDeactivated, +} + +impl ClockTimeJumpCallbackHandle { + pub(crate) fn new( + clock: Clock, + jump_condition: T, + cb: impl Fn(T::CallbackParameter) + Send + Sync + 'static, + ) -> Result { + let rcl_jump_condition = jump_condition.into_rcl_jump_threshold(); + let cb: Arc = Arc::new(move |jump: &rcl_time_jump_t| { + cb(T::into_callback_param(jump)); + }); + + let user_data = Box::into_raw(Box::new(Arc::downgrade(&cb))) as *mut c_void; + { + // register + let mut rcl_clock = clock.get_rcl_clock().lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + let rcl_ret = unsafe { + rcl_bindings::rcl_clock_add_jump_callback( + &mut *rcl_clock, + rcl_jump_condition, + Some(on_post_time_jump), + user_data, + ) + .ok() + }; + if let Err(err) = rcl_ret { + // Reclaim box + unsafe { + let _ = Box::from_raw(user_data as *mut Weak); + } + return Err(err); + } + } + let handle = ClockTimeJumpCallbackHandle { + clock, + user_data, + _cb: cb, + }; + Ok(handle) + } +} + +impl Drop for ClockTimeJumpCallbackHandle { + fn drop(&mut self) { + let mut rcl_clock = self.clock.get_rcl_clock().lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + // No clue what to do with this here. Logging in Drop is discouraged + // print error to stderr? + let _rcl_ret = unsafe { + let rcl_ret = rcl_bindings::rcl_clock_remove_jump_callback( + &mut *rcl_clock, + Some(on_post_time_jump), + self.user_data, + ); + // Reclaim + let _ = Box::from_raw(self.user_data as *mut Weak); + rcl_ret.ok() + }; + } +} diff --git a/rclrs/src/worker.rs b/rclrs/src/worker.rs index 73a3b09d..5daea748 100644 --- a/rclrs/src/worker.rs +++ b/rclrs/src/worker.rs @@ -1,13 +1,15 @@ +use crate::ClockTimeJumpCondition; use crate::{ dynamic_message::{ DynamicMessage, DynamicSubscriptionState, MessageTypeName, WorkerDynamicSubscription, WorkerDynamicSubscriptionCallback, }, - log_fatal, AnyTimerCallback, IntoTimerOptions, IntoWorkerServiceCallback, - IntoWorkerSubscriptionCallback, IntoWorkerTimerOneshotCallback, - IntoWorkerTimerRepeatingCallback, MessageInfo, Node, Promise, RclrsError, ServiceOptions, - ServiceState, SubscriptionOptions, SubscriptionState, TimerState, WorkerCommands, - WorkerService, WorkerSubscription, WorkerTimer, + log_fatal, + time_jumps::ClockTimeJumpCallbackHandle, + AnyTimerCallback, IntoTimerOptions, IntoWorkerServiceCallback, IntoWorkerSubscriptionCallback, + IntoWorkerTimerOneshotCallback, IntoWorkerTimerRepeatingCallback, MessageInfo, Node, Promise, + RclrsError, ServiceOptions, ServiceState, SubscriptionOptions, SubscriptionState, TimerState, + WorkerCommands, WorkerService, WorkerSubscription, WorkerTimer, }; use futures::channel::oneshot; use rosidl_runtime_rs::{Message, Service as ServiceIDL}; @@ -529,7 +531,7 @@ impl WorkerState { /// Since the callback of this timer may block other callbacks from being /// able to run, it is strongly recommended to ensure that the callback /// returns quickly. If the callback needs to trigger long-running behavior - /// then you can condier using `std::thread::spawn`, or for async behaviors + /// then you can consider using `std::thread::spawn`, or for async behaviors /// you can capture an [`ExecutorCommands`][1] in your callback and use /// [`ExecutorCommands::run`][2] to issue a task for the executor to run in /// its async task pool. @@ -627,6 +629,54 @@ impl WorkerState { ) } + /// Registers time jump callback that is scheduled on the executor with synchronized payload access. + /// + /// # Example + /// ``` + /// # use rclrs::*; + /// + /// # let executor = Context::default().create_basic_executor(); + /// # let node = executor.create_node("my_node").unwrap(); + /// # let payload = "my data".to_string(); + /// # let worker = node.create_worker::(payload); + /// # use std::time::Duration; + /// let jump_handle = node.register_time_jump_callback( + /// ClockTimeJumpConditions::on_min_forwards(Duration::from_secs_f32(1.5), + /// { + /// let node = node.clone(); + /// move |payload, time_jump| { + /// payload.push_str(" modified"); + /// log!(node.info(), "Delta: {:?} with payload {:?}", time_jump, payload); + /// } + /// } + /// ).unwrap(); + /// ``` + /// The callback remains active as long as the jump handle is not dropped. + pub fn register_time_jump_callback( + &self, + jump_condition: T, + callback: impl Fn(&mut Payload, T::CallbackParameter) + Send + Sync + 'static, + ) -> Result { + let callback = Arc::new(callback); + let commands = Arc::downgrade(&self.commands); + let schedule_jump_time_callback = move |time_jump: T::CallbackParameter| { + if let Some(commands) = commands.upgrade() { + let callback = Arc::clone(&callback); + let payload_task = move |any_payload: &mut dyn Any| { + // SAFETY: This should be safe, because the WorkerCommands should have the correct Payload. + let payload = any_payload.downcast_mut::().unwrap(); + callback(payload, time_jump); + }; + commands.run_on_payload(Box::new(payload_task)); + } + }; + ClockTimeJumpCallbackHandle::new( + self.node.get_clock(), + jump_condition, + Box::new(schedule_jump_time_callback), + ) + } + /// Used by [`Node`][crate::Node] to create a `WorkerState`. Users should /// call [`Node::create_worker`][crate::NodeState::create_worker] instead of /// this. From 4f2a8b072e7ad97e1c79415a31868d00fd07b2b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Balthasar=20Sch=C3=BCss?= Date: Mon, 20 Apr 2026 10:18:24 +0200 Subject: [PATCH 2/2] Fix doctest --- rclrs/src/node.rs | 4 ++-- rclrs/src/time_jumps.rs | 9 +++++---- rclrs/src/worker.rs | 8 ++++---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 8b2a81c1..ce05b1de 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -1339,12 +1339,12 @@ impl NodeState { /// # Example /// ``` /// # use rclrs::*; + /// # use std::time::Duration; /// /// # let executor = Context::default().create_basic_executor(); /// # let node = executor.create_node("my_node").unwrap(); - /// # use std::time::Duration; /// let jump_handle = node.register_time_jump_callback( - /// ClockTimeJumpConditions::on_min_forwards(Duration::from_secs_f32(1.5), + /// ClockTimeJumpConditions::on_min_forward(Duration::from_secs_f32(1.5)), /// { /// let node = node.clone(); /// move |delta| {log!(node.info(), "Delta {:?}", delta);} diff --git a/rclrs/src/time_jumps.rs b/rclrs/src/time_jumps.rs index 0151c80b..049a306b 100644 --- a/rclrs/src/time_jumps.rs +++ b/rclrs/src/time_jumps.rs @@ -37,11 +37,12 @@ unsafe impl Sync for ClockTimeJumpCallbackHandle {} /// Condition for typed clock time jump callbacks. /// ``` /// # use rclrs::*; +/// # use std::time::Duration; /// -/// # ClockTimeJumpConditions::on_min_forward(Duration::as_secs(1)) -/// # ClockTimeJumpConditions::on_min_backward(Duration::as_secs(1)) -/// # ClockTimeJumpConditions::on_min_forward_or_backward(Duration::as_secs(1), Duration::as_secs(1)) -/// # ClockTimeJumpConditions::on_clock_change() +/// ClockTimeJumpConditions::on_min_forward(Duration::from_secs(1)); +/// ClockTimeJumpConditions::on_min_backward(Duration::from_secs(1)); +/// ClockTimeJumpConditions::on_min_forward_or_backward(Duration::from_secs(1), Duration::from_secs(1)); +/// ClockTimeJumpConditions::on_clock_change(); /// ``` /// See [`crate::ClockTimeCondition`] pub trait ClockTimeJumpCondition: Send + Sync + 'static + std::fmt::Debug { diff --git a/rclrs/src/worker.rs b/rclrs/src/worker.rs index 5daea748..2145516a 100644 --- a/rclrs/src/worker.rs +++ b/rclrs/src/worker.rs @@ -634,14 +634,14 @@ impl WorkerState { /// # Example /// ``` /// # use rclrs::*; + /// # use std::time::Duration; /// /// # let executor = Context::default().create_basic_executor(); /// # let node = executor.create_node("my_node").unwrap(); /// # let payload = "my data".to_string(); - /// # let worker = node.create_worker::(payload); - /// # use std::time::Duration; - /// let jump_handle = node.register_time_jump_callback( - /// ClockTimeJumpConditions::on_min_forwards(Duration::from_secs_f32(1.5), + /// let worker = node.create_worker::(payload); + /// let jump_handle = worker.register_time_jump_callback( + /// ClockTimeJumpConditions::on_min_forward(Duration::from_secs_f32(1.5)), /// { /// let node = node.clone(); /// move |payload, time_jump| {