Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove thread and guard condition from Timer #69

Merged
merged 1 commit into from
Aug 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 63 additions & 77 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ class Executor
// Collect the subscriptions and timers to be waited on
bool has_invalid_weak_nodes = false;
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr> subs;
std::vector<rclcpp::timer::TimerBase::SharedPtr> timers;
std::vector<rclcpp::service::ServiceBase::SharedPtr> services;
std::vector<rclcpp::client::ClientBase::SharedPtr> clients;
for (auto & weak_node : weak_nodes_) {
Expand All @@ -271,12 +270,6 @@ class Executor
subs.push_back(subscription);
}
}
for (auto & weak_timer : group->timer_ptrs_) {
auto timer = weak_timer.lock();
if (timer) {
timers.push_back(timer);
}
}
for (auto & service : group->service_ptrs_) {
services.push_back(service);
}
Expand Down Expand Up @@ -349,10 +342,9 @@ class Executor
client_handle_index += 1;
}

// Use the number of guard conditions to allocate memory in the handles
// Add 2 to the number for the ctrl-c guard cond and the executor's
size_t start_of_timer_guard_conds = 2;
size_t number_of_guard_conds = timers.size() + start_of_timer_guard_conds;
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
size_t number_of_guard_conds = 2;
rmw_guard_conditions_t guard_condition_handles;
guard_condition_handles.guard_condition_count = number_of_guard_conds;
guard_condition_handles.guard_conditions =
Expand All @@ -368,21 +360,25 @@ class Executor
// Put the executor's guard condition in
guard_condition_handles.guard_conditions[1] = \
interrupt_guard_condition_->data;
// Then fill the SubscriberHandles with ready subscriptions
size_t guard_cond_handle_index = start_of_timer_guard_conds;
for (auto & timer : timers) {
guard_condition_handles.guard_conditions[guard_cond_handle_index] = \
timer->guard_condition_->data;
guard_cond_handle_index += 1;
}

rmw_time_t * wait_timeout = NULL;
rmw_time_t rmw_timeout;

if (timeout >= std::chrono::duration<int64_t, T>::zero()) {
auto next_timer_duration = get_earliest_timer();
// If the next timer timeout must preempt the requested timeout
// or if the requested timeout blocks forever, and there exists a valid timer,
// replace the requested timeout with the next timeout.
bool has_valid_timer = next_timer_duration >= std::chrono::nanoseconds::zero();
if ((next_timer_duration < timeout ||
timeout < std::chrono::duration<int64_t, T>::zero()) && has_valid_timer)
{
rmw_timeout.sec =
std::chrono::duration_cast<std::chrono::seconds>(next_timer_duration).count();
rmw_timeout.nsec = next_timer_duration.count() % (1000 * 1000 * 1000);
wait_timeout = &rmw_timeout;
} else if (timeout >= std::chrono::duration<int64_t, T>::zero()) {
// Convert timeout representation to rmw_time
auto timeout_sec = std::chrono::duration_cast<std::chrono::seconds>(timeout);
rmw_timeout.sec = timeout_sec.count();
rmw_timeout.sec = std::chrono::duration_cast<std::chrono::seconds>(timeout).count();
rmw_timeout.nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() %
(1000 * 1000 * 1000);
wait_timeout = &rmw_timeout;
Expand Down Expand Up @@ -419,13 +415,6 @@ class Executor
subscriber_handles_.push_back(handle);
}
}
// Then the timers, start with start_of_timer_guard_conds
for (size_t i = start_of_timer_guard_conds; i < number_of_guard_conds; ++i) {
void * handle = guard_condition_handles.guard_conditions[i];
if (handle) {
guard_condition_handles_.push_back(handle);
}
}
// Then the services
for (size_t i = 0; i < number_of_services; ++i) {
void * handle = service_handles.services[i];
Expand Down Expand Up @@ -478,30 +467,6 @@ class Executor
return rclcpp::subscription::SubscriptionBase::SharedPtr();
}

rclcpp::timer::TimerBase::SharedPtr
get_timer_by_handle(void * guard_condition_handle)
{
for (auto weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto weak_group : node->callback_groups_) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto weak_timer : group->timer_ptrs_) {
auto timer = weak_timer.lock();
if (timer && timer->guard_condition_->data == guard_condition_handle) {
return timer;
}
}
}
}
return rclcpp::timer::TimerBase::SharedPtr();
}

rclcpp::service::ServiceBase::SharedPtr
get_service_by_handle(void * service_handle)
{
Expand Down Expand Up @@ -548,7 +513,6 @@ class Executor
return rclcpp::client::ClientBase::SharedPtr();
}


rclcpp::node::Node::SharedPtr
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr & group)
{
Expand Down Expand Up @@ -598,34 +562,58 @@ class Executor
void
get_next_timer(AnyExecutable::SharedPtr & any_exec)
{
auto it = guard_condition_handles_.begin();
while (it != guard_condition_handles_.end()) {
auto timer = get_timer_by_handle(*it);
if (timer) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_timer(timer);
if (!group) {
// Group was not found, meaning the timer is not valid...
// Remove it from the ready list and continue looking
guard_condition_handles_.erase(it++);
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->callback_groups_) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from_.load()) {
continue;
}
if (!group->can_be_taken_from_.load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
for (auto & timer_ref : group->timer_ptrs_) {
auto timer = timer_ref.lock();
if (timer && timer->check_and_trigger()) {
any_exec->timer = timer;
any_exec->callback_group = group;
node = get_node_by_group(group);
return;
}
}
}
}
}

std::chrono::nanoseconds
get_earliest_timer()
{
std::chrono::nanoseconds latest = std::chrono::nanoseconds::max();
bool timers_empty = true;
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->callback_groups_) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from_.load()) {
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->timer = timer;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
guard_condition_handles_.erase(it++);
return;
for (auto & timer_ref : group->timer_ptrs_) {
timers_empty = false;
// Check the expected trigger time
auto timer = timer_ref.lock();
if (timer && timer->time_until_trigger() < latest) {
latest = timer->time_until_trigger();
}
}
}
// Else, the timer is no longer valid, remove it and continue
guard_condition_handles_.erase(it++);
}
if (timers_empty) {
return std::chrono::nanoseconds(-1);
}
return latest;
}

rclcpp::callback_group::CallbackGroup::SharedPtr
Expand Down Expand Up @@ -867,8 +855,6 @@ class Executor
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
typedef std::list<void *> SubscriberHandles;
SubscriberHandles subscriber_handles_;
typedef std::list<void *> GuardConditionHandles;
GuardConditionHandles guard_condition_handles_;
typedef std::list<void *> ServiceHandles;
ServiceHandles service_handles_;
typedef std::list<void *> ClientHandles;
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/rate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ class GenericRate : public RateBase
last_interval_ = Clock::now();
}

std::chrono::nanoseconds period() const
{
return period_;
}

private:
RCLCPP_DISABLE_COPY(GenericRate);

Expand Down
72 changes: 40 additions & 32 deletions rclcpp/include/rclcpp/timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,12 @@ class TimerBase
TimerBase(std::chrono::nanoseconds period, CallbackType callback)
: period_(period),
callback_(callback),
guard_condition_(rmw_create_guard_condition()),
canceled_(false)
{
if (!guard_condition_) {
// TODO(wjwwood): use custom exception
throw std::runtime_error(
std::string("failed to create guard condition: ") +
rmw_get_error_string_safe()
);
}
}

virtual ~TimerBase()
{
if (guard_condition_) {
if (rmw_destroy_guard_condition(guard_condition_) != RMW_RET_OK) {
std::stringstream ss;
ss << "Error in TimerBase destructor, rmw_destroy_guard_condition failed: " <<
rmw_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
}
}

void
Expand All @@ -83,12 +67,17 @@ class TimerBase
this->canceled_ = true;
}

virtual std::chrono::nanoseconds
time_until_trigger() = 0;

virtual bool is_steady() = 0;

// Interface for externally triggering the timer event
virtual bool check_and_trigger() = 0;

protected:
std::chrono::nanoseconds period_;
CallbackType callback_;
rmw_guard_condition_t * guard_condition_;

bool canceled_;

Expand All @@ -106,29 +95,48 @@ class GenericTimer : public TimerBase
GenericTimer(std::chrono::nanoseconds period, CallbackType callback)
: TimerBase(period, callback), loop_rate_(period)
{
thread_ = std::thread(&GenericTimer<Clock>::run, this);
/* Subtracting the loop rate period ensures that the callback gets triggered
on the first call to check_and_trigger. */
last_triggered_time_ = Clock::now() - period;
}

virtual ~GenericTimer()
{
cancel();
thread_.join();
}

void
run()
// return: true to trigger callback on the next "execute_timer" call in executor
bool
check_and_trigger()
{
if (canceled_) {
return false;
}
if (Clock::now() < last_triggered_time_) {
return false;
}
if (std::chrono::duration_cast<std::chrono::nanoseconds>(Clock::now() - last_triggered_time_) >=
loop_rate_.period())
{
last_triggered_time_ = Clock::now();
return true;
}
return false;
}

std::chrono::nanoseconds
time_until_trigger()
{
while (rclcpp::utilities::ok() && !this->canceled_) {
loop_rate_.sleep();
if (!rclcpp::utilities::ok()) {
return;
}
rmw_ret_t status = rmw_trigger_guard_condition(guard_condition_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe());
}
std::chrono::nanoseconds time_until_trigger;
// Calculate the time between the next trigger and the current time
if (last_triggered_time_ + loop_rate_.period() < Clock::now()) {
// time is overdue, need to trigger immediately
time_until_trigger = std::chrono::nanoseconds::zero();
} else {
time_until_trigger = std::chrono::duration_cast<std::chrono::nanoseconds>(
last_triggered_time_ - Clock::now()) + loop_rate_.period();
}
return time_until_trigger;
}

virtual bool
Expand All @@ -140,8 +148,8 @@ class GenericTimer : public TimerBase
private:
RCLCPP_DISABLE_COPY(GenericTimer);

std::thread thread_;
rclcpp::rate::GenericRate<Clock> loop_rate_;
std::chrono::time_point<Clock> last_triggered_time_;

};

Expand Down