Skip to content

Commit

Permalink
Merge pull request #165 from ros2/intra_process_lock
Browse files Browse the repository at this point in the history
Make intra-process manager thread safe, rename IPMState to IPMImpl
  • Loading branch information
jacquelinekay committed Dec 3, 2015
2 parents f5e5acd + 67151de commit f73ebcb
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 33 deletions.
2 changes: 1 addition & 1 deletion rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/intra_process_manager.cpp
src/rclcpp/intra_process_manager_state.cpp
src/rclcpp/intra_process_manager_impl.cpp
src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp
src/rclcpp/parameter.cpp
Expand Down
14 changes: 7 additions & 7 deletions rclcpp/include/rclcpp/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#include <set>

#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/intra_process_manager_state.hpp"
#include "rclcpp/intra_process_manager_impl.hpp"
#include "rclcpp/mapped_ring_buffer.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/publisher.hpp"
Expand Down Expand Up @@ -127,7 +127,7 @@ class IntraProcessManager

RCLCPP_PUBLIC
explicit IntraProcessManager(
IntraProcessManagerStateBase::SharedPtr state = create_default_state());
IntraProcessManagerImplBase::SharedPtr state = create_default_impl());

RCLCPP_PUBLIC
virtual ~IntraProcessManager();
Expand Down Expand Up @@ -189,7 +189,7 @@ class IntraProcessManager
auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT,
typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>::make_shared(
size, publisher->get_allocator());
state_->add_publisher(id, publisher, mrb, size);
impl_->add_publisher(id, publisher, mrb, size);
return id;
}

Expand Down Expand Up @@ -242,7 +242,7 @@ class IntraProcessManager
using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer<MessageT, MRBMessageAlloc>;
uint64_t message_seq = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id(
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
intra_process_publisher_id, message_seq);
typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
if (!typed_buffer) {
Expand All @@ -254,7 +254,7 @@ class IntraProcessManager
// TODO(wjwwood): do something when a message was displaced. log debug?
(void)did_replace; // Avoid unused variable warning.

state_->store_intra_process_message(intra_process_publisher_id, message_seq);
impl_->store_intra_process_message(intra_process_publisher_id, message_seq);

// Return the message sequence which is sent to the subscription.
return message_seq;
Expand Down Expand Up @@ -308,7 +308,7 @@ class IntraProcessManager
message = nullptr;

size_t target_subs_size = 0;
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message(
mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
intra_process_publisher_id,
message_sequence_number,
requesting_subscriptions_intra_process_id,
Expand Down Expand Up @@ -338,7 +338,7 @@ class IntraProcessManager
static uint64_t
get_next_unique_id();

IntraProcessManagerStateBase::SharedPtr state_;
IntraProcessManagerImplBase::SharedPtr impl_;
};

} // namespace intra_process_manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
#define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_

#include <algorithm>
#include <atomic>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
Expand All @@ -37,13 +38,13 @@ namespace rclcpp
namespace intra_process_manager
{

class IntraProcessManagerStateBase
class IntraProcessManagerImplBase
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase);
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerImplBase);

IntraProcessManagerStateBase() = default;
~IntraProcessManagerStateBase() = default;
IntraProcessManagerImplBase() = default;
~IntraProcessManagerImplBase() = default;

virtual void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
Expand Down Expand Up @@ -77,15 +78,15 @@ class IntraProcessManagerStateBase
matches_any_publishers(const rmw_gid_t * id) const = 0;

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerStateBase);
RCLCPP_DISABLE_COPY(IntraProcessManagerImplBase);
};

template<typename Allocator = std::allocator<void>>
class IntraProcessManagerState : public IntraProcessManagerStateBase
class IntraProcessManagerImpl : public IntraProcessManagerImplBase
{
public:
IntraProcessManagerState() = default;
~IntraProcessManagerState() = default;
IntraProcessManagerImpl() = default;
~IntraProcessManagerImpl() = default;

void
add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
Expand Down Expand Up @@ -152,6 +153,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
void
store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
{
std::lock_guard<std::mutex> lock(runtime_mutex_);
auto it = publishers_.find(intra_process_publisher_id);
if (it == publishers_.end()) {
throw std::runtime_error("store_intra_process_message called with invalid publisher id");
Expand Down Expand Up @@ -184,6 +186,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
size_t & size
)
{
std::lock_guard<std::mutex> lock(runtime_mutex_);
PublisherInfo * info;
{
auto it = publishers_.find(intra_process_publisher_id);
Expand Down Expand Up @@ -233,7 +236,7 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
}

private:
RCLCPP_DISABLE_COPY(IntraProcessManagerState);
RCLCPP_DISABLE_COPY(IntraProcessManagerImpl);

template<typename T>
using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
Expand Down Expand Up @@ -270,13 +273,15 @@ class IntraProcessManagerState : public IntraProcessManagerStateBase
RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;

PublisherMap publishers_;

std::mutex runtime_mutex_;
};

RCLCPP_PUBLIC
IntraProcessManagerStateBase::SharedPtr
create_default_state();
IntraProcessManagerImplBase::SharedPtr
create_default_impl();

} // namespace intra_process_manager
} // namespace rclcpp

#endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_
#endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
12 changes: 6 additions & 6 deletions rclcpp/src/rclcpp/intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ namespace intra_process_manager
static std::atomic<uint64_t> _next_unique_id {1};

IntraProcessManager::IntraProcessManager(
rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr state)
: state_(state)
rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr impl)
: impl_(impl)
{}

IntraProcessManager::~IntraProcessManager()
Expand All @@ -34,26 +34,26 @@ IntraProcessManager::add_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription)
{
auto id = IntraProcessManager::get_next_unique_id();
state_->add_subscription(id, subscription);
impl_->add_subscription(id, subscription);
return id;
}

void
IntraProcessManager::remove_subscription(uint64_t intra_process_subscription_id)
{
state_->remove_subscription(intra_process_subscription_id);
impl_->remove_subscription(intra_process_subscription_id);
}

void
IntraProcessManager::remove_publisher(uint64_t intra_process_publisher_id)
{
state_->remove_publisher(intra_process_publisher_id);
impl_->remove_publisher(intra_process_publisher_id);
}

bool
IntraProcessManager::matches_any_publishers(const rmw_gid_t * id) const
{
return state_->matches_any_publishers(id);
return impl_->matches_any_publishers(id);
}

uint64_t
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rclcpp/intra_process_manager_state.hpp"
#include "rclcpp/intra_process_manager_impl.hpp"

#include <memory>

rclcpp::intra_process_manager::IntraProcessManagerStateBase::SharedPtr
rclcpp::intra_process_manager::create_default_state()
rclcpp::intra_process_manager::IntraProcessManagerImplBase::SharedPtr
rclcpp::intra_process_manager::create_default_impl()
{
return std::make_shared<IntraProcessManagerState<>>();
return std::make_shared<IntraProcessManagerImpl<>>();
}
2 changes: 1 addition & 1 deletion rclcpp/test/test_intra_process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class SubscriptionBase
#define PublisherBase mock::PublisherBase
#define SubscriptionBase mock::SubscriptionBase
#include "../src/rclcpp/intra_process_manager.cpp"
#include "../src/rclcpp/intra_process_manager_state.cpp"
#include "../src/rclcpp/intra_process_manager_impl.cpp"
#undef SubscriptionBase
#undef Publisher
#undef PublisherBase
Expand Down

0 comments on commit f73ebcb

Please sign in to comment.