Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace shared pointers in AnyExecutable with raw pointers #164

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions rclcpp/include/rclcpp/any_executable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,22 @@ namespace rclcpp
namespace executor
{

struct AnyExecutable
struct AnyExecutableState
{
// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase * subscription;
rclcpp::subscription::SubscriptionBase * subscription_intra_process;
rclcpp::timer::TimerBase * timer;
rclcpp::service::ServiceBase * service;
rclcpp::client::ClientBase * client;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we ensure that the pointers stored in here are still valid when the executable is actually being processed? The instance might have been deleted by then.

// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup * callback_group;
rclcpp::node::Node * node;
};

class AnyExecutable
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(AnyExecutable);

RCLCPP_PUBLIC
Expand All @@ -36,15 +50,7 @@ struct AnyExecutable
RCLCPP_PUBLIC
virtual ~AnyExecutable();

// Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;
// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
rclcpp::node::Node::SharedPtr node;
AnyExecutableState state;
};

} // namespace executor
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/any_service_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class AnyServiceCallback
void dispatch(
std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<typename ServiceT::Request> request,
std::shared_ptr<typename ServiceT::Response> response)
std::shared_ptr<typename ServiceT::Response> response) const
{
if (shared_ptr_callback_ != nullptr) {
(void)request_header;
Expand Down
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/any_subscription_callback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class AnySubscriptionCallback
}

void dispatch(
std::shared_ptr<MessageT> message, const rmw_message_info_t & message_info)
std::shared_ptr<MessageT> message, const rmw_message_info_t & message_info) const
{
(void)message_info;
if (shared_ptr_callback_) {
Expand All @@ -179,7 +179,7 @@ class AnySubscriptionCallback
}

void dispatch_intra_process(
MessageUniquePtr & message, const rmw_message_info_t & message_info)
MessageUniquePtr & message, const rmw_message_info_t & message_info) const
{
(void)message_info;
if (shared_ptr_callback_) {
Expand Down
5 changes: 4 additions & 1 deletion rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ class CallbackGroup
const std::vector<rclcpp::client::ClientBase::SharedPtr> &
get_client_ptrs() const;

/*
RCLCPP_PUBLIC
std::atomic_bool &
can_be_taken_from();
*/

RCLCPP_PUBLIC
const CallbackGroupType &
Expand Down Expand Up @@ -101,7 +103,8 @@ class CallbackGroup
std::vector<rclcpp::timer::TimerBase::WeakPtr> timer_ptrs_;
std::vector<rclcpp::service::ServiceBase::SharedPtr> service_ptrs_;
std::vector<rclcpp::client::ClientBase::SharedPtr> client_ptrs_;
std::atomic_bool can_be_taken_from_;
public:
std::atomic_bool can_be_taken_from;
};

} // namespace callback_group
Expand Down
8 changes: 4 additions & 4 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ class ClientBase
const rmw_client_t *
get_client_handle() const;

virtual std::shared_ptr<void> create_response() = 0;
virtual std::shared_ptr<void> create_request_header() = 0;
virtual std::shared_ptr<void> create_response() const = 0;
virtual std::shared_ptr<void> create_request_header() const = 0;
virtual void handle_response(
std::shared_ptr<void> & request_header, std::shared_ptr<void> & response) = 0;

Expand Down Expand Up @@ -99,12 +99,12 @@ class Client : public ClientBase
: ClientBase(node_handle, client_handle, service_name)
{}

std::shared_ptr<void> create_response()
std::shared_ptr<void> create_response() const
{
return std::shared_ptr<void>(new typename ServiceT::Response());
}

std::shared_ptr<void> create_request_header()
std::shared_ptr<void> create_request_header() const
{
// TODO(wjwwood): This should probably use rmw_request_id's allocator.
// (since it is a C type)
Expand Down
12 changes: 6 additions & 6 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,29 +207,29 @@ class Executor
*/
RCLCPP_PUBLIC
void
execute_any_executable(AnyExecutable::SharedPtr any_exec);
execute_any_executable(const AnyExecutable & any_exec) const;

RCLCPP_PUBLIC
static void
execute_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription);
const rclcpp::subscription::SubscriptionBase * subscription);

RCLCPP_PUBLIC
static void
execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription);
const rclcpp::subscription::SubscriptionBase * subscription);

RCLCPP_PUBLIC
static void
execute_timer(rclcpp::timer::TimerBase::SharedPtr timer);
execute_timer(rclcpp::timer::TimerBase * timer);

RCLCPP_PUBLIC
static void
execute_service(rclcpp::service::ServiceBase::SharedPtr service);
execute_service(rclcpp::service::ServiceBase * service);

RCLCPP_PUBLIC
static void
execute_client(rclcpp::client::ClientBase::SharedPtr client);
execute_client(rclcpp::client::ClientBase * client);

RCLCPP_PUBLIC
void
Expand Down
18 changes: 9 additions & 9 deletions rclcpp/include/rclcpp/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ class ServiceBase

RCLCPP_PUBLIC
std::string
get_service_name();
get_service_name() const;

RCLCPP_PUBLIC
const rmw_service_t *
get_service_handle();
get_service_handle() const;

virtual std::shared_ptr<void> create_request() = 0;
virtual std::shared_ptr<void> create_request_header() = 0;
virtual std::shared_ptr<void> create_request() const = 0;
virtual std::shared_ptr<void> create_request_header() const = 0;
virtual void handle_request(
std::shared_ptr<void> request_header,
std::shared_ptr<void> request) = 0;
std::shared_ptr<void> request) const = 0;

private:
RCLCPP_DISABLE_COPY(ServiceBase);
Expand Down Expand Up @@ -97,19 +97,19 @@ class Service : public ServiceBase

Service() = delete;

std::shared_ptr<void> create_request()
std::shared_ptr<void> create_request() const
{
return std::shared_ptr<void>(new typename ServiceT::Request());
}

std::shared_ptr<void> create_request_header()
std::shared_ptr<void> create_request_header() const
{
// TODO(wjwwood): This should probably use rmw_request_id's allocator.
// (since it is a C type)
return std::shared_ptr<void>(new rmw_request_id_t);
}

void handle_request(std::shared_ptr<void> request_header, std::shared_ptr<void> request)
void handle_request(std::shared_ptr<void> request_header, std::shared_ptr<void> request) const
{
auto typed_request = std::static_pointer_cast<typename ServiceT::Request>(request);
auto typed_request_header = std::static_pointer_cast<rmw_request_id_t>(request_header);
Expand All @@ -120,7 +120,7 @@ class Service : public ServiceBase

void send_response(
std::shared_ptr<rmw_request_id_t> req_id,
std::shared_ptr<typename ServiceT::Response> response)
std::shared_ptr<typename ServiceT::Response> response) const
{
rmw_ret_t status = rmw_send_response(get_service_handle(), req_id.get(), response.get());
if (status != RMW_RET_OK) {
Expand Down
28 changes: 14 additions & 14 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from().load()) {
if (!group || !group->can_be_taken_from.load()) {
continue;
}
for (auto & weak_subscription : group->get_subscription_ptrs()) {
Expand Down Expand Up @@ -190,20 +190,20 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
subscriber_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
if (!group->can_be_taken_from.load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
any_exec->state.subscription_intra_process = subscription.get();
} else {
any_exec->subscription = subscription;
any_exec->state.subscription = subscription.get();
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes);
any_exec->state.callback_group = group.get();
any_exec->state.node = get_node_by_group(group, weak_nodes).get();
subscriber_handles_.erase(it);
return;
}
Expand All @@ -228,16 +228,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
service_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
if (!group->can_be_taken_from.load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->service = service;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes);
any_exec->state.service = service.get();
any_exec->state.callback_group = group.get();
any_exec->state.node = get_node_by_group(group, weak_nodes).get();
service_handles_.erase(it);
return;
}
Expand All @@ -261,16 +261,16 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
client_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
if (!group->can_be_taken_from.load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->client = client;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes);
any_exec->state.client = client.get();
any_exec->state.callback_group = group.get();
any_exec->state.node = get_node_by_group(group, weak_nodes).get();
client_handles_.erase(it);
return;
}
Expand Down
17 changes: 9 additions & 8 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,24 @@ class SubscriptionBase
/// Borrow a new message.
// \return Shared pointer to the fresh message.
virtual std::shared_ptr<void>
create_message() = 0;
create_message() const = 0;
/// Check if we need to handle the message, and execute the callback if we do.
/**
* \param[in] message Shared pointer to the message to handle.
* \param[in] message_info Metadata associated with this message.
*/
virtual void
handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) const = 0;

/// Return the message borrowed in create_message.
// \param[in] Shared pointer to the returned message.
virtual void
return_message(std::shared_ptr<void> & message) = 0;
return_message(std::shared_ptr<void> & message) const = 0;

virtual void
handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) = 0;
const rmw_message_info_t & message_info) const = 0;

protected:
rmw_subscription_t * intra_process_subscription_handle_;
Expand Down Expand Up @@ -168,7 +169,7 @@ class Subscription : public SubscriptionBase
{
message_memory_strategy_ = message_memory_strategy;
}
std::shared_ptr<void> create_message()
std::shared_ptr<void> create_message() const
{
/* The default message memory strategy provides a dynamically allocated message on each call to
* create_message, though alternative memory strategies that re-use a preallocated message may be
Expand All @@ -177,7 +178,7 @@ class Subscription : public SubscriptionBase
return message_memory_strategy_->borrow_message();
}

void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) const
{
if (matches_any_intra_process_publishers_) {
if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
Expand All @@ -190,15 +191,15 @@ class Subscription : public SubscriptionBase
any_callback_.dispatch(typed_message, message_info);
}

void return_message(std::shared_ptr<void> & message)
void return_message(std::shared_ptr<void> & message) const
{
auto typed_message = std::static_pointer_cast<MessageT>(message);
message_memory_strategy_->return_message(typed_message);
}

void handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info)
const rmw_message_info_t & message_info) const
{
if (!get_intra_process_message_callback_) {
// throw std::runtime_error(
Expand Down
21 changes: 11 additions & 10 deletions rclcpp/src/rclcpp/any_executable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@
using rclcpp::executor::AnyExecutable;

AnyExecutable::AnyExecutable()
: subscription(nullptr),
subscription_intra_process(nullptr),
timer(nullptr),
service(nullptr),
client(nullptr),
callback_group(nullptr),
node(nullptr)
{}
{
state.subscription = nullptr;
state.subscription_intra_process = nullptr;
state.timer = nullptr;
state.service = nullptr;
state.client = nullptr;
state.callback_group = nullptr;
state.node = nullptr;
}

AnyExecutable::~AnyExecutable()
{
// Make sure that discarded (taken but not executed) AnyExecutable's have
// their callback groups reset. This can happen when an executor is canceled
// between taking an AnyExecutable and executing it.
if (callback_group) {
callback_group->can_be_taken_from().store(true);
if (state.callback_group) {
state.callback_group->can_be_taken_from.store(true);
}
}
Loading