diff --git a/rmw_fastrtps_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_cpp/src/rmw_publisher.cpp index 1d101719d..5907590fd 100644 --- a/rmw_fastrtps_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_cpp/src/rmw_publisher.cpp @@ -92,7 +92,12 @@ rmw_create_publisher( Domain::getDefaultPublisherAttributes(publisherParam); // TODO(karsten1987) Verify consequences for std::unique_ptr? - info = new CustomPublisherInfo(); + info = new (std::nothrow) CustomPublisherInfo(); + if (!info) { + RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo"); + return nullptr; + } + info->typesupport_identifier_ = type_support->typesupport_identifier; auto callbacks = static_cast(type_support->data); @@ -100,7 +105,11 @@ rmw_create_publisher( if (!Domain::getRegisteredType(participant, type_name.c_str(), reinterpret_cast(&info->type_support_))) { - info->type_support_ = new MessageTypeSupport_cpp(callbacks); + info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks); + if (!info->type_support_) { + RMW_SET_ERROR_MSG("Failed to allocate MessageTypeSupport"); + goto fail; + } _register_type(participant, info->type_support_); } @@ -128,8 +137,13 @@ rmw_create_publisher( goto fail; } - info->publisher_ = Domain::createPublisher(participant, publisherParam, nullptr); + info->listener_ = new (std::nothrow) PubListener(info); + if (!info->listener_) { + RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener"); + goto fail; + } + info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_); if (!info->publisher_) { RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); goto fail; @@ -171,6 +185,9 @@ rmw_create_publisher( if (info->type_support_ != nullptr) { delete info->type_support_; } + if (info->listener_ != nullptr) { + delete info->listener_; + } delete info; } @@ -181,6 +198,15 @@ rmw_create_publisher( return nullptr; } +rmw_ret_t +rmw_publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count) +{ + return rmw_fastrtps_shared_cpp::__rmw_publisher_count_matched_subscriptions( + publisher, subscription_count); +} + rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp index 887d0ba22..998b859f8 100644 --- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp @@ -95,7 +95,12 @@ rmw_create_subscription( // Load default XML profile. Domain::getDefaultSubscriberAttributes(subscriberParam); - info = new CustomSubscriberInfo(); + info = new (std::nothrow) CustomSubscriberInfo(); + if (!info) { + RMW_SET_ERROR_MSG("failed to allocate CustomSubscriberInfo"); + return nullptr; + } + info->typesupport_identifier_ = type_support->typesupport_identifier; auto callbacks = static_cast(type_support->data); @@ -103,7 +108,11 @@ rmw_create_subscription( if (!Domain::getRegisteredType(participant, type_name.c_str(), reinterpret_cast(&info->type_support_))) { - info->type_support_ = new MessageTypeSupport_cpp(callbacks); + info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks); + if (!info->type_support_) { + RMW_SET_ERROR_MSG("failed to allocate MessageTypeSupport_cpp"); + goto fail; + } _register_type(participant, info->type_support_); } @@ -122,9 +131,13 @@ rmw_create_subscription( goto fail; } - info->listener_ = new SubListener(info); - info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_); + info->listener_ = new (std::nothrow) SubListener(info); + if (!info->listener_) { + RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener"); + goto fail; + } + info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_); if (!info->subscriber_) { RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber"); goto fail; @@ -154,6 +167,9 @@ rmw_create_subscription( if (info->type_support_ != nullptr) { delete info->type_support_; } + if (info->listener_ != nullptr) { + delete info->listener_; + } delete info; } @@ -164,6 +180,15 @@ rmw_create_subscription( return nullptr; } +rmw_ret_t +rmw_subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count) +{ + return rmw_fastrtps_shared_cpp::__rmw_subscription_count_matched_publishers( + subscription, publisher_count); +} + rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp index 505d5f710..ae44cfe03 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp @@ -91,7 +91,11 @@ rmw_create_publisher( Domain::getDefaultPublisherAttributes(publisherParam); // TODO(karsten1987) Verify consequences for std::unique_ptr? - info = new CustomPublisherInfo(); + info = new (std::nothrow) CustomPublisherInfo(); + if (!info) { + RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo"); + return nullptr; + } info->typesupport_identifier_ = type_support->typesupport_identifier; std::string type_name = _create_type_name( @@ -128,8 +132,13 @@ rmw_create_publisher( goto fail; } - info->publisher_ = Domain::createPublisher(participant, publisherParam, nullptr); + info->listener_ = new (std::nothrow) PubListener(info); + if (!info->listener_) { + RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener"); + goto fail; + } + info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_); if (!info->publisher_) { RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); goto fail; @@ -171,6 +180,9 @@ rmw_create_publisher( if (info->type_support_ != nullptr) { delete info->type_support_; } + if (info->listener_ != nullptr) { + delete info->listener_; + } delete info; } @@ -181,6 +193,15 @@ rmw_create_publisher( return nullptr; } +rmw_ret_t +rmw_publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count) +{ + return rmw_fastrtps_shared_cpp::__rmw_publisher_count_matched_subscriptions( + publisher, subscription_count); +} + rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher) { diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_serialize.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_serialize.cpp index 6af64af4f..64efe0910 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_serialize.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_serialize.cpp @@ -48,7 +48,8 @@ rmw_serialize( } } - eprosima::fastcdr::FastBuffer buffer(reinterpret_cast(serialized_message->buffer), + eprosima::fastcdr::FastBuffer buffer( + reinterpret_cast(serialized_message->buffer), data_length); eprosima::fastcdr::Cdr ser( buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp index 6253a4598..a421424fe 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp @@ -95,7 +95,11 @@ rmw_create_subscription( // Load default XML profile. Domain::getDefaultSubscriberAttributes(subscriberParam); - info = new CustomSubscriberInfo(); + info = new (std::nothrow) CustomSubscriberInfo(); + if (!info) { + RMW_SET_ERROR_MSG("failed to allocate CustomSubscriberInfo"); + return nullptr; + } info->typesupport_identifier_ = type_support->typesupport_identifier; std::string type_name = _create_type_name( @@ -123,9 +127,13 @@ rmw_create_subscription( goto fail; } - info->listener_ = new SubListener(info); - info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_); + info->listener_ = new (std::nothrow) SubListener(info); + if (!info->listener_) { + RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener"); + goto fail; + } + info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_); if (!info->subscriber_) { RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber"); goto fail; @@ -155,6 +163,9 @@ rmw_create_subscription( if (info->type_support_ != nullptr) { delete info->type_support_; } + if (info->listener_ != nullptr) { + delete info->listener_; + } delete info; } @@ -165,6 +176,15 @@ rmw_create_subscription( return nullptr; } +rmw_ret_t +rmw_subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count) +{ + return rmw_fastrtps_shared_cpp::__rmw_subscription_count_matched_publishers( + subscription, publisher_count); +} + rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) { diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 330977b9d..486f10869 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -15,18 +15,57 @@ #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ +#include +#include + #include "fastrtps/publisher/Publisher.h" +#include "fastrtps/publisher/PublisherListener.h" #include "rmw/rmw.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" +class PubListener; + typedef struct CustomPublisherInfo { eprosima::fastrtps::Publisher * publisher_; + PubListener * listener_; rmw_fastrtps_shared_cpp::TypeSupport * type_support_; rmw_gid_t publisher_gid; const char * typesupport_identifier_; } CustomPublisherInfo; +class PubListener : public eprosima::fastrtps::PublisherListener +{ +public: + explicit PubListener(CustomPublisherInfo * info) + { + (void) info; + } + + void + onPublicationMatched( + eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) + { + (void) pub; + std::lock_guard lock(internalMutex_); + if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) { + subscriptions_.insert(info.remoteEndpointGuid); + } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == info.status) { + subscriptions_.erase(info.remoteEndpointGuid); + } + } + + size_t subscriptionCount() + { + std::lock_guard lock(internalMutex_); + return subscriptions_.size(); + } + +private: + std::mutex internalMutex_; + std::set subscriptions_; +}; + #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index e597d6caf..f3938a146 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include "fastrtps/subscriber/Subscriber.h" @@ -51,7 +52,13 @@ class SubListener : public eprosima::fastrtps::SubscriberListener eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) { (void)sub; - (void)info; + + std::lock_guard lock(internalMutex_); + if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) { + publishers_.insert(info.remoteEndpointGuid); + } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == info.status) { + publishers_.erase(info.remoteEndpointGuid); + } } void @@ -107,11 +114,19 @@ class SubListener : public eprosima::fastrtps::SubscriberListener } } + size_t publisherCount() + { + std::lock_guard lock(internalMutex_); + return publishers_.size(); + } + private: std::mutex internalMutex_; std::atomic_size_t data_; std::mutex * conditionMutex_; std::condition_variable * conditionVariable_; + + std::set publishers_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 44c06a77e..a4d968cb3 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -129,6 +129,12 @@ __rmw_destroy_publisher( rmw_node_t * node, rmw_publisher_t * publisher); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_send_request( @@ -193,6 +199,12 @@ __rmw_destroy_subscription( rmw_node_t * node, rmw_subscription_t * subscription); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_take( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp index 8458e6033..f31301e4d 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp @@ -64,6 +64,9 @@ __rmw_destroy_publisher( if (info->publisher_ != nullptr) { Domain::removePublisher(info->publisher_); } + if (info->listener_ != nullptr) { + delete info->listener_; + } if (info->type_support_ != nullptr) { auto impl = static_cast(node->data); if (!impl) { @@ -82,4 +85,20 @@ __rmw_destroy_publisher( return RMW_RET_OK; } + +rmw_ret_t +__rmw_publisher_count_matched_subscriptions( + const rmw_publisher_t * publisher, + size_t * subscription_count) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription_count, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(publisher->data); + if (info != nullptr) { + *subscription_count = info->listener_->subscriptionCount(); + } + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index 1a52e8dec..c237aa5cf 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -91,4 +91,20 @@ __rmw_destroy_subscription( return RMW_RET_OK; } + +rmw_ret_t +__rmw_subscription_count_matched_publishers( + const rmw_subscription_t * subscription, + size_t * publisher_count) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(publisher_count, RMW_RET_INVALID_ARGUMENT); + + auto info = static_cast(subscription->data); + if (info != nullptr) { + *publisher_count = info->listener_->publisherCount(); + } + return RMW_RET_OK; +} + } // namespace rmw_fastrtps_shared_cpp