diff --git a/examples/rclrs_timer_demo/Cargo.toml b/examples/rclrs_timer_demo/Cargo.toml new file mode 100644 index 000000000..772d635be --- /dev/null +++ b/examples/rclrs_timer_demo/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "rclrs_timer_demo" +version = "0.1.0" +edition = "2021" + +[[bin]] +name="rclrs_timer_demo" +path="src/rclrs_timer_demo.rs" + + +[dependencies] +rclrs = "*" diff --git a/examples/rclrs_timer_demo/package.xml b/examples/rclrs_timer_demo/package.xml new file mode 100644 index 000000000..64e673704 --- /dev/null +++ b/examples/rclrs_timer_demo/package.xml @@ -0,0 +1,13 @@ + + rclrs_timer_demo + 0.1.0 + Shows how to implement a timer within a Node using rclrs. + user + TODO: License declaration. + + rclrs + + + ament_cargo + + diff --git a/examples/rclrs_timer_demo/src/rclrs_timer_demo.rs b/examples/rclrs_timer_demo/src/rclrs_timer_demo.rs new file mode 100644 index 000000000..279858be3 --- /dev/null +++ b/examples/rclrs_timer_demo/src/rclrs_timer_demo.rs @@ -0,0 +1,48 @@ +/// Creates a SimpleTimerNode, initializes a node and the timer with a callback +/// that prints the timer callback execution iteration. The callback is executed +/// thanks to the spin, which is in charge of executing the timer's events among +/// other entities' events. +use rclrs::{create_node, Context, Node, RclrsError, Timer}; +use std::{ + env, + sync::{Arc, Mutex}, +}; + +/// Contains both the node and timer. +struct SimpleTimerNode { + node: Arc, + timer: Arc, +} + +impl SimpleTimerNode { + /// Creates a node and a timer with a callback. + /// + /// The callback will simply print to stdout: + /// "Drinking 🧉 for the xth time every p nanoseconds." + /// where x is the iteration callback counter and p is the period of the timer. + fn new(context: &Context, timer_period_ns: i64) -> Result { + let node = create_node(context, "simple_timer_node")?; + let count: Arc> = Arc::new(Mutex::new(0)); + let timer = node.create_timer( + timer_period_ns, + context, + Some(Box::new(move |_| { + let x = *count.lock().unwrap(); + println!( + "Drinking 🧉 for the {}th time every {} nanoseconds.", + x, timer_period_ns + ); + *count.lock().unwrap() = x + 1; + })), + None, + )?; + Ok(Self { node, timer }) + } +} + +fn main() -> Result<(), RclrsError> { + let timer_period: i64 = 1e9 as i64; // 1 seconds. + let context = Context::new(env::args()).unwrap(); + let simple_timer_node = Arc::new(SimpleTimerNode::new(&context, timer_period).unwrap()); + rclrs::spin(simple_timer_node.node.clone()) +} diff --git a/rclrs/src/clock.rs b/rclrs/src/clock.rs index f7c085e14..8182d37ae 100644 --- a/rclrs/src/clock.rs +++ b/rclrs/src/clock.rs @@ -83,6 +83,11 @@ impl Clock { } } + /// Returns the clock's `rcl_clock_t`. + pub(crate) fn get_rcl_clock(&self) -> &Arc> { + &self.rcl_clock + } + /// Returns the clock's `ClockType`. pub fn clock_type(&self) -> ClockType { self.kind diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 37c43a68e..bb795a96e 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -48,7 +48,11 @@ impl SingleThreadedExecutor { }) { let wait_set = WaitSet::new_for_node(&node)?; - let ready_entities = wait_set.wait(timeout)?; + let mut ready_entities = wait_set.wait(timeout)?; + + for ready_timer in ready_entities.timers.iter_mut() { + ready_timer.execute()?; + } for ready_subscription in ready_entities.subscriptions { ready_subscription.execute()?; diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 3a22c6da8..ddb3e9c3f 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -20,6 +20,7 @@ mod service; mod subscription; mod time; mod time_source; +mod timer; mod vendor; mod wait; @@ -49,6 +50,7 @@ pub use service::*; pub use subscription::*; pub use time::*; use time_source::*; +pub use timer::*; pub use wait::*; /// Polls the node for new messages and executes the corresponding callbacks. diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index b51b59817..5df9c057d 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -16,7 +16,7 @@ use crate::{ rcl_bindings::*, Client, ClientBase, Clock, Context, ContextHandle, GuardCondition, LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, - SubscriptionCallback, TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX, + SubscriptionCallback, TimeSource, Timer, TimerCallback, ToLogParams, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -63,6 +63,7 @@ pub struct Node { pub(crate) guard_conditions_mtx: Mutex>>, pub(crate) services_mtx: Mutex>>, pub(crate) subscriptions_mtx: Mutex>>, + pub(crate) timers_mtx: Mutex>>, time_source: TimeSource, parameter: ParameterInterface, pub(crate) handle: Arc, @@ -340,6 +341,30 @@ impl Node { Ok(subscription) } + /// Creates a [`Timer`][1]. + /// + /// [1]: crate::Timer + /// TODO: make timer's lifetime depend on node's lifetime. + pub fn create_timer( + &self, + period_ns: i64, + context: &Context, + callback: Option, + clock: Option, + ) -> Result, RclrsError> { + let clock_used = match clock { + Some(value) => value, + None => self.get_clock(), + }; + let timer = Timer::new(&clock_used, &context, period_ns, callback)?; + let timer = Arc::new(timer); + self.timers_mtx + .lock() + .unwrap() + .push(Arc::downgrade(&timer) as Weak); + Ok(timer) + } + /// Returns the subscriptions that have not been dropped yet. pub(crate) fn live_subscriptions(&self) -> Vec> { { self.subscriptions_mtx.lock().unwrap() } @@ -369,6 +394,13 @@ impl Node { .collect() } + pub(crate) fn live_timers(&self) -> Vec> { + { self.timers_mtx.lock().unwrap() } + .iter() + .filter_map(Weak::upgrade) + .collect() + } + /// Returns the ROS domain ID that the node is using. /// /// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1]. @@ -551,6 +583,21 @@ mod tests { Ok(()) } + #[test] + fn test_create_timer_without_clock_source() -> Result<(), RclrsError> { + let timer_period_ns: i64 = 1e6 as i64; // 1 millisecond. + let context = Context::new([])?; + let dut = NodeBuilder::new(&context, "node_with_timer") + .namespace("test_create_timer") + .build()?; + + let _timer = + dut.create_timer(timer_period_ns, &context, Some(Box::new(move |_| {})), None)?; + assert_eq!(dut.live_timers().len(), 1); + + Ok(()) + } + #[test] fn test_logger_name() -> Result<(), RclrsError> { // Use helper to create 2 nodes for us diff --git a/rclrs/src/node/builder.rs b/rclrs/src/node/builder.rs index 1e7a9fc63..4aec2686d 100644 --- a/rclrs/src/node/builder.rs +++ b/rclrs/src/node/builder.rs @@ -340,6 +340,7 @@ impl NodeBuilder { guard_conditions_mtx: Mutex::new(vec![]), services_mtx: Mutex::new(vec![]), subscriptions_mtx: Mutex::new(vec![]), + timers_mtx: Mutex::new(vec![]), time_source: TimeSource::builder(self.clock_type) .clock_qos(self.clock_qos) .build(), diff --git a/rclrs/src/rcl_bindings.rs b/rclrs/src/rcl_bindings.rs index 90f434009..dbfb5d5b0 100644 --- a/rclrs/src/rcl_bindings.rs +++ b/rclrs/src/rcl_bindings.rs @@ -89,6 +89,10 @@ cfg_if::cfg_if! { #[derive(Debug)] pub struct rcl_wait_set_t; + #[repr(C)] + #[derive(Debug)] + pub struct rcl_timer_t; + #[repr(C)] #[derive(Debug)] pub struct rcutils_string_array_t; diff --git a/rclrs/src/timer.rs b/rclrs/src/timer.rs new file mode 100644 index 000000000..fb5f761a7 --- /dev/null +++ b/rclrs/src/timer.rs @@ -0,0 +1,398 @@ +use crate::{clock::Clock, context::Context, error::RclrsError, rcl_bindings::*, to_rclrs_result}; +// TODO: fix me when the callback type is properly defined. +// use std::fmt::Debug; +use std::sync::{atomic::AtomicBool, Arc, Mutex}; + +/// Type alias for the `Timer` callback. +pub type TimerCallback = Box; + +/// Struct for executing periodic events. +/// +/// The execution of the callbacks is tied to [`spin_once`][1] or [`spin`][2] on the timers's node. +/// +/// Timer can be created via [`Node::create_timer()`][3], this is to ensure that [`Node`][4]s can +/// track all the timers that have been created. However, a user of a `Timer` can also +/// use it standalone. +/// +/// [1]: crate::spin_once +/// [2]: crate::spin +/// [3]: crate::Node::create_timer +/// [4]: crate::Node +// TODO: callback type prevents us from making the Timer implement the Debug trait. +// #[derive(Debug)] +pub struct Timer { + pub(crate) rcl_timer: Arc>, + /// The callback function that runs when the timer is due. + callback: Option, + pub(crate) in_use_by_wait_set: Arc, +} + +impl Timer { + /// Creates a new timer. + pub fn new( + clock: &Clock, + context: &Context, + period: i64, + callback: Option, + ) -> Result { + let mut rcl_timer; + let timer_init_result = unsafe { + // SAFETY: Getting a default value is always safe. + rcl_timer = rcl_get_zero_initialized_timer(); + let mut rcl_clock = clock.get_rcl_clock().lock().unwrap(); + let allocator = rcutils_get_default_allocator(); + let mut rcl_context = context.handle.rcl_context.lock().unwrap(); + // Callbacks will be handled in the WaitSet. + let rcl_timer_callback: rcl_timer_callback_t = None; + // Function will return Err(_) only if there isn't enough memory to allocate a clock + // object. + rcl_timer_init( + &mut rcl_timer, + &mut *rcl_clock, + &mut *rcl_context, + period, + rcl_timer_callback, + allocator, + ) + }; + to_rclrs_result(timer_init_result).map(|_| Timer { + rcl_timer: Arc::new(Mutex::new(rcl_timer)), + callback, + in_use_by_wait_set: Arc::new(AtomicBool::new(false)), + }) + } + + /// Gets the period of the timer in nanoseconds + pub fn get_timer_period_ns(&self) -> Result { + let mut timer_period_ns = 0; + let get_period_result = unsafe { + let rcl_timer = self.rcl_timer.lock().unwrap(); + rcl_timer_get_period(&*rcl_timer, &mut timer_period_ns) + }; + to_rclrs_result(get_period_result).map(|_| timer_period_ns) + } + + /// Cancels the timer, stopping the execution of the callback + pub fn cancel(&self) -> Result<(), RclrsError> { + let mut rcl_timer = self.rcl_timer.lock().unwrap(); + let cancel_result = unsafe { rcl_timer_cancel(&mut *rcl_timer) }; + to_rclrs_result(cancel_result) + } + + /// Checks whether the timer is canceled or not + pub fn is_canceled(&self) -> Result { + let mut is_canceled = false; + let is_canceled_result = unsafe { + let rcl_timer = self.rcl_timer.lock().unwrap(); + rcl_timer_is_canceled(&*rcl_timer, &mut is_canceled) + }; + to_rclrs_result(is_canceled_result).map(|_| is_canceled) + } + + /// Retrieves the time since the last call to the callback + pub fn time_since_last_call(&self) -> Result { + let mut time_value_ns: i64 = 0; + let time_since_last_call_result = unsafe { + let rcl_timer = self.rcl_timer.lock().unwrap(); + rcl_timer_get_time_since_last_call(&*rcl_timer, &mut time_value_ns) + }; + to_rclrs_result(time_since_last_call_result).map(|_| time_value_ns) + } + + /// Retrieves the time until the next call of the callback + pub fn time_until_next_call(&self) -> Result { + let mut time_value_ns: i64 = 0; + let time_until_next_call_result = unsafe { + let rcl_timer = self.rcl_timer.lock().unwrap(); + rcl_timer_get_time_until_next_call(&*rcl_timer, &mut time_value_ns) + }; + to_rclrs_result(time_until_next_call_result).map(|_| time_value_ns) + } + + /// Resets the timer. + pub fn reset(&self) -> Result<(), RclrsError> { + let mut rcl_timer = self.rcl_timer.lock().unwrap(); + to_rclrs_result(unsafe { rcl_timer_reset(&mut *rcl_timer) }) + } + + /// Executes the callback of the timer (this is triggered by the executor or the node directly) + pub fn call(&self) -> Result<(), RclrsError> { + let mut rcl_timer = self.rcl_timer.lock().unwrap(); + to_rclrs_result(unsafe { rcl_timer_call(&mut *rcl_timer) }) + } + + /// Checks if the timer is ready (not canceled) + pub fn is_ready(&self) -> Result { + let (is_ready, is_ready_result) = unsafe { + let mut is_ready: bool = false; + let rcl_timer = self.rcl_timer.lock().unwrap(); + let is_ready_result = rcl_timer_is_ready(&*rcl_timer, &mut is_ready); + (is_ready, is_ready_result) + }; + to_rclrs_result(is_ready_result).map(|_| is_ready) + } + + pub(crate) fn execute(&self) -> Result<(), RclrsError> { + if self.is_ready()? { + let time_since_last_call = self.time_since_last_call()?; + self.call()?; + if let Some(ref callback) = self.callback { + callback(time_since_last_call); + } + } + Ok(()) + } +} + +/// 'Drop' trait implementation to be able to release the resources +impl Drop for rcl_timer_t { + fn drop(&mut self) { + // SAFETY: No preconditions for this function + let rc = unsafe { rcl_timer_fini(&mut *self) }; + if let Err(e) = to_rclrs_result(rc) { + panic!("Unable to release Timer. {:?}", e) + } + } +} + +impl PartialEq for Timer { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.rcl_timer, &other.rcl_timer) + } +} + +// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread +// they are running in. Therefore, this type can be safely sent to another thread. +unsafe impl Send for rcl_timer_t {} + +#[cfg(test)] +mod tests { + use super::*; + use std::{thread, time}; + + fn create_dummy_callback() -> Option { + Some(Box::new(move |_| {})) + } + + #[test] + fn traits() { + use crate::test_helpers::*; + + assert_send::(); + assert_sync::(); + } + + #[test] + fn test_new_with_system_clock() { + let clock = Clock::system(); + let context = Context::new(vec![]).unwrap(); + let period: i64 = 1e6 as i64; // 1 milliseconds. + let dut = Timer::new(&clock, &context, period, create_dummy_callback()); + assert!(dut.is_ok()); + } + + #[test] + fn test_new_with_steady_clock() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period: i64 = 1e6 as i64; // 1 milliseconds. + let dut = Timer::new(&clock, &context, period, create_dummy_callback()); + assert!(dut.is_ok()); + } + + #[ignore = "SIGSEGV when creating the timer with Clock::with_source()."] + #[test] + fn test_new_with_source_clock() { + let (clock, source) = Clock::with_source(); + // No manual time set, it should default to 0 + assert!(clock.now().nsec == 0); + let set_time = 1234i64; + source.set_ros_time_override(set_time); + // Ros time is set, should return the value that was set + assert_eq!(clock.now().nsec, set_time); + let context = Context::new(vec![]).unwrap(); + let period: i64 = 1e6 as i64; // 1 milliseconds.. + let dut = Timer::new(&clock, &context, period, create_dummy_callback()); + assert!(dut.is_ok()); + } + + #[test] + fn test_get_period() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period: i64 = 1e6 as i64; // 1 milliseconds. + let dut = Timer::new(&clock, &context, period, create_dummy_callback()); + assert!(dut.is_ok()); + let dut = dut.unwrap(); + let period_result = dut.get_timer_period_ns(); + assert!(period_result.is_ok()); + let period_result = period_result.unwrap(); + assert_eq!(period_result, 1e6 as i64); + } + + #[test] + fn test_cancel() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period: i64 = 1e6 as i64; // 1 milliseconds. + let dut = Timer::new(&clock, &context, period, create_dummy_callback()); + assert!(dut.is_ok()); + let dut = dut.unwrap(); + assert!(dut.is_canceled().is_ok()); + assert!(!dut.is_canceled().unwrap()); + let cancel_result = dut.cancel(); + assert!(cancel_result.is_ok()); + assert!(dut.is_canceled().is_ok()); + assert!(dut.is_canceled().unwrap()); + } + + #[test] + fn test_time_since_last_call_before_first_event() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 2e6 as i64; // 2 milliseconds. + let sleep_period_ms = time::Duration::from_millis(1); + let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()); + assert!(dut.is_ok()); + let dut = dut.unwrap(); + thread::sleep(sleep_period_ms); + let time_since_last_call = dut.time_since_last_call(); + assert!(time_since_last_call.is_ok()); + let time_since_last_call = time_since_last_call.unwrap(); + assert!( + time_since_last_call > 9e5 as i64, + "time_since_last_call: {}", + time_since_last_call + ); + } + + #[test] + fn test_time_until_next_call_before_first_event() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 2e6 as i64; // 2 milliseconds. + let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()); + assert!(dut.is_ok()); + let dut = dut.unwrap(); + let time_until_next_call = dut.time_until_next_call(); + assert!(time_until_next_call.is_ok()); + let time_until_next_call = time_until_next_call.unwrap(); + assert!( + time_until_next_call < period_ns, + "time_until_next_call: {}", + time_until_next_call + ); + } + + #[test] + fn test_reset() { + let tolerance = 20e4 as i64; + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 2e6 as i64; // 2 milliseconds. + let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap(); + let elapsed = period_ns - dut.time_until_next_call().unwrap(); + assert!(elapsed < tolerance, "elapsed before reset: {}", elapsed); + thread::sleep(time::Duration::from_millis(1)); + assert!(dut.reset().is_ok()); + let elapsed = period_ns - dut.time_until_next_call().unwrap(); + assert!(elapsed < tolerance, "elapsed after reset: {}", elapsed); + } + + #[test] + fn test_call() { + let tolerance = 20e4 as i64; + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 1e6 as i64; // 1 millisecond. + let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap(); + let elapsed = period_ns - dut.time_until_next_call().unwrap(); + assert!(elapsed < tolerance, "elapsed before reset: {}", elapsed); + thread::sleep(time::Duration::from_micros(1500)); + let elapsed = period_ns - dut.time_until_next_call().unwrap(); + assert!( + elapsed > 1500000i64, + "time_until_next_call before call: {}", + elapsed + ); + assert!(dut.call().is_ok()); + let elapsed = dut.time_until_next_call().unwrap(); + assert!( + elapsed < 500000i64, + "time_until_next_call after call: {}", + elapsed + ); + } + + #[test] + fn test_is_ready() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 1e6 as i64; // 1 millisecond. + let dut = Timer::new(&clock, &context, period_ns, create_dummy_callback()).unwrap(); + let is_ready = dut.is_ready(); + assert!(is_ready.is_ok()); + assert!(!is_ready.unwrap()); + thread::sleep(time::Duration::from_micros(1100)); + let is_ready = dut.is_ready(); + assert!(is_ready.is_ok()); + assert!(is_ready.unwrap()); + } + + #[test] + fn test_callback() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 1e6 as i64; // 1 millisecond. + let foo = Arc::new(Mutex::new(0i64)); + let foo_callback = foo.clone(); + let dut = Timer::new( + &clock, + &context, + period_ns, + Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)), + ) + .unwrap(); + dut.callback.unwrap()(123); + assert_eq!(*foo.lock().unwrap(), 123); + } + + #[test] + fn test_execute_when_is_not_ready() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 1e6 as i64; // 1 millisecond. + let foo = Arc::new(Mutex::new(0i64)); + let foo_callback = foo.clone(); + let dut = Timer::new( + &clock, + &context, + period_ns, + Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)), + ) + .unwrap(); + assert!(dut.execute().is_ok()); + assert_eq!(*foo.lock().unwrap(), 0i64); + } + + #[test] + fn test_execute_when_is_ready() { + let clock = Clock::steady(); + let context = Context::new(vec![]).unwrap(); + let period_ns: i64 = 1e6 as i64; // 1 millisecond. + let foo = Arc::new(Mutex::new(0i64)); + let foo_callback = foo.clone(); + let dut = Timer::new( + &clock, + &context, + period_ns, + Some(Box::new(move |x| *foo_callback.lock().unwrap() = x)), + ) + .unwrap(); + thread::sleep(time::Duration::from_micros(1500)); + assert!(dut.execute().is_ok()); + let x = *foo.lock().unwrap(); + assert!(x > 1500000i64); + assert!(x < 1600000i64); + } +} diff --git a/rclrs/src/wait.rs b/rclrs/src/wait.rs index 243c9d857..f19ebb8cb 100644 --- a/rclrs/src/wait.rs +++ b/rclrs/src/wait.rs @@ -20,7 +20,7 @@ use std::{sync::Arc, time::Duration, vec::Vec}; use crate::{ error::{to_rclrs_result, RclReturnCode, RclrsError, ToResult}, rcl_bindings::*, - ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, + ClientBase, Context, ContextHandle, Node, ServiceBase, SubscriptionBase, Timer, }; mod exclusivity_guard; @@ -51,6 +51,7 @@ pub struct WaitSet { guard_conditions: Vec>>, services: Vec>>, handle: WaitSetHandle, + timers: Vec>>, } /// A list of entities that are ready, returned by [`WaitSet::wait`]. @@ -63,6 +64,8 @@ pub struct ReadyEntities { pub guard_conditions: Vec>, /// A list of services that have potentially received requests. pub services: Vec>, + /// A list of timers that are potentially due. + pub timers: Vec>, } impl Drop for rcl_wait_set_t { @@ -127,6 +130,7 @@ impl WaitSet { rcl_wait_set, context_handle: Arc::clone(&context.handle), }, + timers: Vec::new(), }) } @@ -138,13 +142,14 @@ impl WaitSet { let live_clients = node.live_clients(); let live_guard_conditions = node.live_guard_conditions(); let live_services = node.live_services(); + let live_timers = node.live_timers(); let ctx = Context { handle: Arc::clone(&node.handle.context_handle), }; let mut wait_set = WaitSet::new( live_subscriptions.len(), live_guard_conditions.len(), - 0, + live_timers.len(), live_clients.len(), live_services.len(), 0, @@ -166,6 +171,10 @@ impl WaitSet { for live_service in &live_services { wait_set.add_service(live_service.clone())?; } + + for live_timer in &live_timers { + wait_set.add_timer(live_timer.clone())?; + } Ok(wait_set) } @@ -178,6 +187,7 @@ impl WaitSet { self.guard_conditions.clear(); self.clients.clear(); self.services.clear(); + self.timers.clear(); // This cannot fail – the rcl_wait_set_clear function only checks that the input handle is // valid, which it always is in our case. Hence, only debug_assert instead of returning // Result. @@ -311,6 +321,34 @@ impl WaitSet { Ok(()) } + /// Adds a timer to the wait set. + /// + /// # Errors + /// - If the timer was already added to this wait set or another one, + /// [`AlreadyAddedToWaitSet`][1] will be returned + /// - If the number of timer in the wait set is larger than the + /// capacity set in [`WaitSet::new`], [`WaitSetFull`][2] will be returned + /// + /// [1]: crate::RclrsError + /// [2]: crate::RclReturnCode + pub fn add_timer(&mut self, timer: Arc) -> Result<(), RclrsError> { + let exclusive_timer = + ExclusivityGuard::new(Arc::clone(&timer), Arc::clone(&timer.in_use_by_wait_set))?; + unsafe { + // SAFETY: I'm not sure if it's required, but the timer pointer will remain valid + // for as long as the wait set exists, because it's stored in self.timers. + // Passing in a null pointer for the third argument is explicitly allowed. + rcl_wait_set_add_timer( + &mut self.handle.rcl_wait_set, + &*timer.rcl_timer.lock().unwrap() as *const _, + core::ptr::null_mut(), + ) + } + .ok()?; + self.timers.push(exclusive_timer); + Ok(()) + } + /// Blocks until the wait set is ready, or until the timeout has been exceeded. /// /// If the timeout is `None` then this function will block indefinitely until @@ -365,6 +403,7 @@ impl WaitSet { clients: Vec::new(), guard_conditions: Vec::new(), services: Vec::new(), + timers: Vec::new(), }; for (i, subscription) in self.subscriptions.iter().enumerate() { // SAFETY: The `subscriptions` entry is an array of pointers, and this dereferencing is @@ -409,6 +448,16 @@ impl WaitSet { ready_entities.services.push(Arc::clone(&service.waitable)); } } + + for (i, timer) in self.timers.iter().enumerate() { + // SAFETY: The `timers` entry is an array of pointers, and this dereferencing is + // equivalent to + // https://github.com/ros2/rcl/blob/35a31b00a12f259d492bf53c0701003bd7f1745c/rcl/include/rcl/wait.h#L419 + let wait_set_entry = unsafe { *self.handle.rcl_wait_set.timers.add(i) }; + if !wait_set_entry.is_null() { + ready_entities.timers.push(Arc::clone(&timer.waitable)); + } + } Ok(ready_entities) } } @@ -416,6 +465,8 @@ impl WaitSet { #[cfg(test)] mod tests { use super::*; + use crate::clock::Clock; + use crate::timer::TimerCallback; #[test] fn traits() { @@ -440,4 +491,38 @@ mod tests { Ok(()) } + + #[test] + fn timer_in_wait_not_set_readies() -> Result<(), RclrsError> { + let context = Context::new([])?; + let clock = Clock::steady(); + let period: i64 = 1e6 as i64; // 1 millisecond. + let callback: Option = Some(Box::new(move |_| {})); + let timer = Arc::new(Timer::new(&clock, &context, period, callback)?); + + let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?; + wait_set.add_timer(timer.clone())?; + + let readies = wait_set.wait(Some(std::time::Duration::from_micros(0)))?; + assert!(!readies.timers.contains(&timer)); + + Ok(()) + } + + #[test] + fn timer_in_wait_set_readies() -> Result<(), RclrsError> { + let context = Context::new([])?; + let clock = Clock::steady(); + let period: i64 = 1e6 as i64; // 1 millisecond. + let callback: Option = Some(Box::new(move |_| {})); + let timer = Arc::new(Timer::new(&clock, &context, period, callback)?); + + let mut wait_set = WaitSet::new(0, 0, 1, 0, 0, 0, &context)?; + wait_set.add_timer(timer.clone())?; + + let readies = wait_set.wait(Some(std::time::Duration::from_micros(1500)))?; + assert!(readies.timers.contains(&timer)); + + Ok(()) + } }