Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ mod qos;
mod service;
mod subscription;
mod time;
mod time_jumps;
mod time_source;
mod timer;
pub mod vendor;
Expand Down Expand Up @@ -224,6 +225,7 @@ pub use rcl_bindings::rmw_request_id_t;
pub use service::*;
pub use subscription::*;
pub use time::*;
pub use time_jumps::*;
use time_source::*;
pub use timer::*;
pub use wait_set::*;
Expand Down
63 changes: 54 additions & 9 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod node_graph_task;
use node_graph_task::*;

use std::{
any::Any,
cmp::PartialEq,
ffi::CStr,
fmt,
Expand All @@ -37,16 +38,17 @@ use crate::{
NodeDynamicSubscriptionCallback,
},
rcl_bindings::*,
time_jumps::ClockTimeJumpCallbackHandle,
ActionClient, ActionClientState, ActionGoalReceiver, ActionServer, ActionServerState,
AnyTimerCallback, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
IntoActionClientOptions, IntoActionServerOptions, IntoAsyncServiceCallback,
IntoAsyncSubscriptionCallback, IntoNodeServiceCallback, IntoNodeSubscriptionCallback,
IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, IntoTimerOptions, LogParams,
Logger, MessageInfo, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters,
Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, Service,
ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState,
TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, WorkerOptions, WorkerState,
ENTITY_LIFECYCLE_MUTEX,
AnyTimerCallback, Client, ClientOptions, ClientState, Clock, ClockTimeJumpCondition,
ContextHandle, ExecutorCommands, IntoActionClientOptions, IntoActionServerOptions,
IntoAsyncServiceCallback, IntoAsyncSubscriptionCallback, IntoNodeServiceCallback,
IntoNodeSubscriptionCallback, IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback,
IntoTimerOptions, LogParams, Logger, MessageInfo, ParameterBuilder, ParameterInterface,
ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError,
RequestedGoal, Service, ServiceOptions, ServiceState, Subscription, SubscriptionOptions,
SubscriptionState, TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker,
WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX,
};

/// A processing unit that can communicate with other nodes. See the API of
Expand Down Expand Up @@ -1332,6 +1334,49 @@ impl NodeState {
node,
)
}
/// Registers time jump callback that is scheduled on the executor.
///
/// # Example
/// ```
/// # use rclrs::*;
/// # use std::time::Duration;
///
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// let jump_handle = node.register_time_jump_callback(
/// ClockTimeJumpConditions::on_min_forward(Duration::from_secs_f32(1.5)),
/// {
/// let node = node.clone();
/// move |delta| {log!(node.info(), "Delta {:?}", delta);}
/// }
///
/// ).unwrap();
/// ```
/// The callback remains active as long as the jump handle is not dropped.
pub fn register_time_jump_callback<T: ClockTimeJumpCondition>(
&self,
jump_condition: T,
callback: impl Fn(T::CallbackParameter) + Send + Sync + 'static,
) -> Result<ClockTimeJumpCallbackHandle, RclrsError> {
let callback = Arc::new(callback);
let commands = Arc::downgrade(self.commands().async_worker_commands());

let schedule_jump_time_callback = move |time_jump: T::CallbackParameter| {
if let Some(commands) = commands.upgrade() {
let callback = Arc::clone(&callback);
let payload_task = move |_: &mut dyn Any| {
callback(time_jump);
};
commands.run_on_payload(Box::new(payload_task));
}
};

ClockTimeJumpCallbackHandle::new(
self.get_clock(),
jump_condition,
Box::new(schedule_jump_time_callback),
)
}

/// Returns the ROS domain ID that the node is using.
///
Expand Down
239 changes: 239 additions & 0 deletions rclrs/src/time_jumps.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
use crate::error::ToResult;
use crate::rcl_bindings::{rcl_clock_change_e, rcl_time_jump_t};
use crate::{rcl_bindings::rcl_jump_threshold_t, ENTITY_LIFECYCLE_MUTEX};
use std::time::Duration;
use std::{
ffi::c_void,
sync::{Arc, Weak},
};

use crate::{rcl_bindings, Clock, RclrsError};

type ClockTimeJumpCallback = dyn Fn(&rcl_bindings::rcl_time_jump_t) + Send + 'static + Sync;
unsafe extern "C" fn on_post_time_jump(
time_jump: *const rcl_bindings::rcl_time_jump_t,
before_jump: bool,
user_data: *mut c_void,
) {
if before_jump {
return;
}
let weak = &*(user_data as *const Weak<ClockTimeJumpCallback>);
let jump = &*time_jump;
if let Some(cb) = weak.upgrade() {
cb(jump);
}
}

/// Holding a reference to this handle keeps the jump callback alive
pub struct ClockTimeJumpCallbackHandle {
clock: Clock,
user_data: *mut c_void,
_cb: Arc<ClockTimeJumpCallback>,
}
unsafe impl Send for ClockTimeJumpCallbackHandle {}
unsafe impl Sync for ClockTimeJumpCallbackHandle {}

/// Condition for typed clock time jump callbacks.
/// ```
/// # use rclrs::*;
/// # use std::time::Duration;
///
/// ClockTimeJumpConditions::on_min_forward(Duration::from_secs(1));
/// ClockTimeJumpConditions::on_min_backward(Duration::from_secs(1));
/// ClockTimeJumpConditions::on_min_forward_or_backward(Duration::from_secs(1), Duration::from_secs(1));
/// ClockTimeJumpConditions::on_clock_change();
/// ```
/// See [`crate::ClockTimeCondition`]
pub trait ClockTimeJumpCondition: Send + Sync + 'static + std::fmt::Debug {
/// The jump payload depending on the jump event.
type CallbackParameter: Send + 'static + std::fmt::Debug;

/// Converts jump condition into rcl data type
fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t;

/// Converts rcl change payload into callback parameter
fn into_callback_param(data: &rcl_time_jump_t) -> Self::CallbackParameter;
}

#[derive(Debug)]
struct OnMinForwardJump(Duration);
#[derive(Debug)]
struct OnMinBackwardJump(Duration);
#[derive(Debug)]
struct OnMinForwardBackwardJump(Duration, Duration);
#[derive(Debug)]
struct OnClockChange;

impl ClockTimeJumpCondition for OnMinBackwardJump {
type CallbackParameter = i64;

fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t {
rcl_jump_threshold_t {
min_backward: rcl_bindings::rcl_duration_s {
nanoseconds: -(self.0.as_nanos() as i64),
},
min_forward: rcl_bindings::rcl_duration_s { nanoseconds: 0 },
on_clock_change: false,
}
}
fn into_callback_param(data: &rcl_time_jump_t) -> i64 {
data.delta.nanoseconds
}
}
impl ClockTimeJumpCondition for OnMinForwardJump {
type CallbackParameter = i64;

fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t {
rcl_jump_threshold_t {
min_backward: rcl_bindings::rcl_duration_s { nanoseconds: 0 },
min_forward: rcl_bindings::rcl_duration_s {
nanoseconds: self.0.as_nanos() as i64,
},
on_clock_change: false,
}
}
fn into_callback_param(data: &rcl_time_jump_t) -> i64 {
data.delta.nanoseconds
}
}
impl ClockTimeJumpCondition for OnMinForwardBackwardJump {
type CallbackParameter = i64;

fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t {
rcl_jump_threshold_t {
min_backward: rcl_bindings::rcl_duration_s {
nanoseconds: -(self.0.as_nanos() as i64),
},
min_forward: rcl_bindings::rcl_duration_s {
nanoseconds: self.1.as_nanos() as i64,
},
on_clock_change: false,
}
}
fn into_callback_param(data: &rcl_time_jump_t) -> i64 {
data.delta.nanoseconds
}
}
impl ClockTimeJumpCondition for OnClockChange {
type CallbackParameter = ClockChange;

fn into_rcl_jump_threshold(&self) -> rcl_jump_threshold_t {
rcl_jump_threshold_t {
min_backward: rcl_bindings::rcl_duration_s { nanoseconds: 0 },
min_forward: rcl_bindings::rcl_duration_s { nanoseconds: 0 },
on_clock_change: true,
}
}
fn into_callback_param(data: &rcl_time_jump_t) -> ClockChange {
match data.clock_change {
rcl_clock_change_e::RCL_ROS_TIME_ACTIVATED => ClockChange::RclRosTimeActivated,
rcl_clock_change_e::RCL_ROS_TIME_DEACTIVATED => ClockChange::RclRosTimeDeactivated,
_ => {
unreachable!()
}
}
}
}

/// Provides constructors for [`crate::ClockTimeJumpCondition`]s.
pub struct ClockTimeJumpConditions {}
impl ClockTimeJumpConditions {
/// Returns [`crate::ClockTimeJumpCondition`] for time jumps further forwards than given duration.
/// The callback accepts the time jump delta.
pub fn on_min_forward(
min_forward: Duration,
) -> impl ClockTimeJumpCondition<CallbackParameter = i64> {
OnMinForwardJump(min_forward)
}
/// Returns [`crate::ClockTimeJumpCondition`] for time jumps further backwards than given duration.
/// The callback accepts the time jump delta.
pub fn on_min_backward(
min_backward: Duration,
) -> impl ClockTimeJumpCondition<CallbackParameter = i64> {
OnMinBackwardJump(min_backward)
}
/// Returns [`crate::ClockTimeJumpCondition`] for time jumps further forwards/backwards than given durations.
/// The callback accepts the time jump delta.
pub fn on_min_forward_or_backward(
min_backward: Duration,
min_forward: Duration,
) -> impl ClockTimeJumpCondition<CallbackParameter = i64> {
OnMinForwardBackwardJump(min_backward, min_forward)
}

/// Returns [`crate::ClockTimeJumpCondition`] for ros time activation/deactivation.
pub fn on_clock_change() -> impl ClockTimeJumpCondition<CallbackParameter = ClockChange> {
OnClockChange {}
}
}

/// Represents a clock type change.
#[derive(Debug)]
pub enum ClockChange {
/// Triggered when RclRosTime is activated.
RclRosTimeActivated,
/// Triggered when RclRosTime is deactivated.
RclRosTimeDeactivated,
}

impl ClockTimeJumpCallbackHandle {
pub(crate) fn new<T: ClockTimeJumpCondition>(
clock: Clock,
jump_condition: T,
cb: impl Fn(T::CallbackParameter) + Send + Sync + 'static,
) -> Result<Self, RclrsError> {
let rcl_jump_condition = jump_condition.into_rcl_jump_threshold();
let cb: Arc<ClockTimeJumpCallback> = Arc::new(move |jump: &rcl_time_jump_t| {
cb(T::into_callback_param(jump));
});

let user_data = Box::into_raw(Box::new(Arc::downgrade(&cb))) as *mut c_void;
{
// register
let mut rcl_clock = clock.get_rcl_clock().lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
let rcl_ret = unsafe {
rcl_bindings::rcl_clock_add_jump_callback(
&mut *rcl_clock,
rcl_jump_condition,
Some(on_post_time_jump),
user_data,
)
.ok()
};
if let Err(err) = rcl_ret {
// Reclaim box
unsafe {
let _ = Box::from_raw(user_data as *mut Weak<ClockTimeJumpCallback>);
}
return Err(err);
}
}
let handle = ClockTimeJumpCallbackHandle {
clock,
user_data,
_cb: cb,
};
Ok(handle)
}
}

impl Drop for ClockTimeJumpCallbackHandle {
fn drop(&mut self) {
let mut rcl_clock = self.clock.get_rcl_clock().lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
// No clue what to do with this here. Logging in Drop is discouraged
// print error to stderr?
let _rcl_ret = unsafe {
let rcl_ret = rcl_bindings::rcl_clock_remove_jump_callback(
&mut *rcl_clock,
Some(on_post_time_jump),
self.user_data,
);
// Reclaim
let _ = Box::from_raw(self.user_data as *mut Weak<ClockTimeJumpCallback>);
rcl_ret.ok()
};
}
}
Loading
Loading