Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make intra-process manager thread safe, rename IPMState to IPMImpl #165

Merged
merged 2 commits into from
Dec 3, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/intra_process_manager.cpp
src/rclcpp/intra_process_manager_state.cpp
src/rclcpp/intra_process_manager_impl.cpp
src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp
src/rclcpp/parameter.cpp
Expand Down
14 changes: 7 additions & 7 deletions rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include <set>

#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager_state.hpp"
#include "rclcpp/intra_process_manager_impl.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher.hpp"
Expand Down Expand Up @@ -127,7 +127,7 @@ class IntraProcessManager

RCLCPP_PUBLIC
explicit IntraProcessManager(
IntraProcessManagerStateBase::SharedPtr state = create_default_state());
IntraProcessManagerImplBase::SharedPtr state = create_default_impl());

RCLCPP_PUBLIC
virtual ~IntraProcessManager();
Expand Down Expand Up @@ -189,7 +189,7 @@ class IntraProcessManager
auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT,
typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>::make_shared(
size, publisher->get_allocator());
state_->add_publisher(id, publisher, mrb, size);
impl_->add_publisher(id, publisher, mrb, size);
return id;
}

Expand Down Expand Up @@ -242,7 +242,7 @@ class IntraProcessManager
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id(
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
intra_process_publisher_id, message_seq);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
Expand All @@ -254,7 +254,7 @@ class IntraProcessManager
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.

state_->store_intra_process_message(intra_process_publisher_id, message_seq);
impl_->store_intra_process_message(intra_process_publisher_id, message_seq);

// Return the message sequence which is sent to the subscription.
return message_seq;
Expand Down Expand Up @@ -308,7 +308,7 @@ class IntraProcessManager
message = nullptr;

size_t target_subs_size = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message(
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
intra_process_publisher_id,
message_sequence_number,
requesting_subscriptions_intra_process_id,
Expand Down Expand Up @@ -338,7 +338,7 @@ class IntraProcessManager
static uint64_t
get_next_unique_id();

IntraProcessManagerStateBase::SharedPtr state_;
IntraProcessManagerImplBase::SharedPtr impl_;
};

} // namespace intra_process_manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
#define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_

#include <algorithm>
#include <atomic>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
Expand All @@ -37,13 +38,13 @@ namespace rclcpp
namespace intra_process_manager
{

class IntraProcessManagerStateBase
class IntraProcessManagerImplBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase);
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerImplBase);

IntraProcessManagerStateBase() = default;
~IntraProcessManagerStateBase() = default;
IntraProcessManagerImplBase() = default;
~IntraProcessManagerImplBase() = default;

virtual void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
Expand Down Expand Up @@ -77,15 +78,15 @@ class IntraProcessManagerStateBase
matches_any_publishers(const rmw_gid_t * id) const = 0;

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerStateBase);
RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase);
};

template<typename Allocator = std::allocator<void>>
class IntraProcessManagerState : public IntraProcessManagerStateBase
class IntraProcessManagerImpl : public IntraProcessManagerImplBase
{
public:
IntraProcessManagerState() = default;
~IntraProcessManagerState() = default;
IntraProcessManagerImpl() = default;
~IntraProcessManagerImpl() = default;

void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
Expand Down Expand Up @@ -152,6 +153,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
{
std::lock_guard<std::mutex> lock(runtime_mutex_);
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
Expand Down Expand Up @@ -184,6 +186,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
size_t & size
)
{
std::lock_guard<std::mutex> lock(runtime_mutex_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this lock be moved into the next block, where we iterate over publishers_? Or do we need to hold it all the way through the other two blocks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To implement this more optimally, I could have one mutex for publishers_ and then a mutex for each PublisherInfo entry in publishers_. That way one thread could look up an entry in publishers_ while the other is looking up something in the map owned by an entry in publishers (I believe that would be fine). And yes, that implementation would include moving the mutex for publishers_ into the block where find is invoked one the map.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A mutex per item sounds like overkill; I would leave it as is.

I hadn't dug into the data types being handled, and didn't realize that you still need exclusion after pulling the items out of publishers_.

PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
Expand Down Expand Up @@ -233,7 +236,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
}

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerState);
RCLCPP_DISABLE_COPY(IntraProcessManagerImpl);

template<typename T>
using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
Expand Down Expand Up @@ -270,13 +273,15 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;

PublisherMap publishers_;

std::mutex runtime_mutex_;
};

RCLCPP_PUBLIC
IntraProcessManagerStateBase::SharedPtr
create_default_state();
IntraProcessManagerImplBase::SharedPtr
create_default_impl();

} // namespace intra_process_manager
} // namespace rclcpp

#endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
12 changes: 6 additions & 6 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ namespace intra_process_manager
static std::atomic<uint64_t> _next_unique_id {1};

IntraProcessManager::IntraProcessManager(
rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr state)
: state_(state)
rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr impl)
: impl_(impl)
{}

IntraProcessManager::~IntraProcessManager()
Expand All @@ -34,26 +34,26 @@ IntraProcessManager::add_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription)
{
auto id = IntraProcessManager::get_next_unique_id();
state_->add_subscription(id, subscription);
impl_->add_subscription(id, subscription);
return id;
}

void
IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id)
{
state_->remove_subscription(intra_process_subscription_id);
impl_->remove_subscription(intra_process_subscription_id);
}

void
IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{
state_->remove_publisher(intra_process_publisher_id);
impl_->remove_publisher(intra_process_publisher_id);
}

bool
IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{
return state_->matches_any_publishers(id);
return impl_->matches_any_publishers(id);
}

uint64_t
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rclcpp/intra_process_manager_state.hpp"
#include "rclcpp/intra_process_manager_impl.hpp"

#include <memory>

rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr
rclcpp::intra_process_manager::create_default_state()
rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr
rclcpp::intra_process_manager::create_default_impl()
{
return std::make_shared<IntraProcessManagerState<>>();
return std::make_shared<IntraProcessManagerImpl<>>();
}
2 changes: 1 addition & 1 deletion rclcpp/test/test_intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class SubscriptionBase
#define PublisherBase mock::PublisherBase
#define SubscriptionBase mock::SubscriptionBase
#include "../src/rclcpp/intra_process_manager.cpp"
#include "../src/rclcpp/intra_process_manager_state.cpp"
#include "../src/rclcpp/intra_process_manager_impl.cpp"
#undef SubscriptionBase
#undef Publisher
#undef PublisherBase
Expand Down