Skip to content

Commit f9107d9

Browse files
Gonzalo de Pedroclalancette
Gonzalo de Pedro
authored andcommitted
Tests fix (#15)
* Tests fix Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar> * Tests fix Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar> * Fixed some tests Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar> * Fix test_intra_process_manager test. Signed-off-by: Chris Lalancette <clalancette@openrobotics.org> * Style fixups. Signed-off-by: Chris Lalancette <clalancette@openrobotics.org> * One more small fix. Signed-off-by: Chris Lalancette <clalancette@openrobotics.org> * Fixed issues with allocators Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar> * Small fix for lint_cmake. Signed-off-by: Chris Lalancette <clalancette@openrobotics.org> Co-authored-by: Chris Lalancette <clalancette@openrobotics.org> Add additional tests for the TypeAdapt conversions (#16) * Implement skipping ROS conversion when using the const ref publish. Signed-off-by: Chris Lalancette <clalancette@openrobotics.org> * Add tests to ensure that type adaptation doesn't unnecessarily convert. Signed-off-by: Chris Lalancette <clalancette@openrobotics.org> fixed allocator allocate usage on intra process manager (#17) Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar> Removed TestPublisher.conversion_exception_is_passed_up test Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar>
1 parent e8dcd09 commit f9107d9

9 files changed

+607
-213
lines changed

rclcpp/include/rclcpp/experimental/intra_process_manager.hpp

+79-39
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,10 @@ class IntraProcessManager
176176
*/
177177
template<
178178
typename MessageT,
179+
typename ROSMessageType,
179180
typename Alloc = std::allocator<void>,
180-
typename Deleter = std::default_delete<MessageT>>
181+
typename Deleter = std::default_delete<MessageT>
182+
>
181183
void
182184
do_intra_process_publish(
183185
uint64_t intra_process_publisher_id,
@@ -203,7 +205,7 @@ class IntraProcessManager
203205
// None of the buffers require ownership, so we promote the pointer
204206
std::shared_ptr<MessageT> msg = std::move(message);
205207

206-
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
208+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
207209
msg, sub_ids.take_shared_subscriptions);
208210
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
209211
sub_ids.take_shared_subscriptions.size() <= 1)
@@ -214,8 +216,7 @@ class IntraProcessManager
214216
concatenated_vector.end(),
215217
sub_ids.take_ownership_subscriptions.begin(),
216218
sub_ids.take_ownership_subscriptions.end());
217-
218-
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
219+
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
219220
std::move(message),
220221
concatenated_vector,
221222
allocator);
@@ -226,17 +227,19 @@ class IntraProcessManager
226227
// for the buffers that do not require ownership
227228
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
228229

229-
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
230+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
230231
shared_msg, sub_ids.take_shared_subscriptions);
231-
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
232+
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
232233
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
233234
}
234235
}
235236

236237
template<
237238
typename MessageT,
239+
typename ROSMessageType,
238240
typename Alloc = std::allocator<void>,
239-
typename Deleter = std::default_delete<MessageT>>
241+
typename Deleter = std::default_delete<MessageT>
242+
>
240243
std::shared_ptr<const MessageT>
241244
do_intra_process_publish_and_return_shared(
242245
uint64_t intra_process_publisher_id,
@@ -262,7 +265,7 @@ class IntraProcessManager
262265
// If there are no owning, just convert to shared.
263266
std::shared_ptr<MessageT> shared_msg = std::move(message);
264267
if (!sub_ids.take_shared_subscriptions.empty()) {
265-
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
268+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
266269
shared_msg, sub_ids.take_shared_subscriptions);
267270
}
268271
return shared_msg;
@@ -272,12 +275,12 @@ class IntraProcessManager
272275
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);
273276

274277
if (!sub_ids.take_shared_subscriptions.empty()) {
275-
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
278+
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
276279
shared_msg,
277280
sub_ids.take_shared_subscriptions);
278281
}
279282
if (!sub_ids.take_ownership_subscriptions.empty()) {
280-
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
283+
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
281284
std::move(message),
282285
sub_ids.take_ownership_subscriptions,
283286
allocator);
@@ -334,14 +337,22 @@ class IntraProcessManager
334337
template<
335338
typename MessageT,
336339
typename Alloc,
337-
typename Deleter>
340+
typename Deleter,
341+
typename ROSMessageType>
338342
void
339343
add_shared_msg_to_buffers(
340344
std::shared_ptr<const MessageT> message,
341345
std::vector<uint64_t> subscription_ids)
342346
{
343347
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
344-
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
348+
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
349+
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
350+
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
351+
352+
using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
353+
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
354+
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
355+
345356

346357
for (auto id : subscription_ids) {
347358
auto subscription_it = subscriptions_.find(id);
@@ -351,11 +362,13 @@ class IntraProcessManager
351362
auto subscription_base = subscription_it->second.lock();
352363
if (subscription_base) {
353364
auto subscription = std::dynamic_pointer_cast<
354-
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType, Alloc, Deleter>
365+
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
366+
PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
355367
>(subscription_base);
356368
if (nullptr == subscription) {
357369
auto ros_message_subscription = std::dynamic_pointer_cast<
358-
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType, Alloc, Deleter>
370+
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType,
371+
ROSMessageTypeAllocator, ROSMessageTypeDeleter>
359372
>(subscription_base);
360373

361374
if (nullptr == ros_message_subscription) {
@@ -368,9 +381,22 @@ class IntraProcessManager
368381
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
369382
ROSMessageType ros_msg;
370383
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, ros_msg);
371-
ros_message_subscription->provide_intra_process_message(ros_msg);
384+
ros_message_subscription->provide_intra_process_message(
385+
std::make_shared<ROSMessageType>(ros_msg));
372386
} else {
373-
ros_message_subscription->provide_intra_process_message(message);
387+
if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
388+
ros_message_subscription->provide_intra_process_message(message);
389+
} else {
390+
if constexpr (std::is_same<typename rclcpp::TypeAdapter<MessageT,
391+
ROSMessageType>::ros_message_type, ROSMessageType>::value)
392+
{
393+
ROSMessageType ros_msg;
394+
rclcpp::TypeAdapter<MessageT, ROSMessageType>::convert_to_ros_message(
395+
*message, ros_msg);
396+
ros_message_subscription->provide_intra_process_message(
397+
std::make_shared<ROSMessageType>(ros_msg));
398+
}
399+
}
374400
}
375401
}
376402
} else {
@@ -385,7 +411,8 @@ class IntraProcessManager
385411
template<
386412
typename MessageT,
387413
typename Alloc = std::allocator<void>,
388-
typename Deleter = std::default_delete<MessageT>>
414+
typename Deleter = std::default_delete<MessageT>,
415+
typename ROSMessageType = MessageT>
389416
void
390417
add_owned_msg_to_buffers(
391418
std::unique_ptr<MessageT, Deleter> message,
@@ -395,7 +422,14 @@ class IntraProcessManager
395422
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
396423
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
397424
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
398-
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
425+
426+
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
427+
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
428+
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;
429+
430+
using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
431+
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
432+
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;
399433

400434
for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
401435
auto subscription_it = subscriptions_.find(*it);
@@ -405,41 +439,47 @@ class IntraProcessManager
405439
auto subscription_base = subscription_it->second.lock();
406440
if (subscription_base) {
407441
auto subscription = std::dynamic_pointer_cast<
408-
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType, Alloc, Deleter>
442+
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
443+
PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
409444
>(subscription_base);
410445
if (nullptr == subscription) {
411446
auto ros_message_subscription = std::dynamic_pointer_cast<
412-
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType, Alloc, Deleter>
447+
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType,
448+
ROSMessageTypeAllocator, ROSMessageTypeDeleter>
413449
>(subscription_base);
414450

415451
if (nullptr == ros_message_subscription) {
416452
throw std::runtime_error(
417-
"failed to dynamic cast SubscriptionIntraProcessBase to "
453+
"--failed to dynamic cast SubscriptionIntraProcessBase to "
418454
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
419455
"can happen when the publisher and subscription use different "
420456
"allocator types, which is not supported");
421457
} else {
422458
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
423-
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
424-
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
425-
ROSMessageTypeAllocatorTraits::construct(allocator, ptr);
426-
Deleter deleter = message.get_deleter();
427-
auto ros_msg = std::unique_ptr<ROSMessageType, Deleter>(ptr, deleter);
428-
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(message, ros_msg);
459+
ROSMessageTypeAllocator ros_message_alloc(allocator);
460+
auto ptr = ros_message_alloc.allocate(1);
461+
ros_message_alloc.construct(ptr);
462+
ROSMessageTypeDeleter deleter;
463+
allocator::set_allocator_for_deleter(&deleter, &allocator);
464+
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *ptr);
465+
auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
429466
ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
430467
} else {
431-
if (std::next(it) == subscription_ids.end()) {
432-
// If this is the last subscription, give up ownership
433-
ros_message_subscription->provide_intra_process_message(std::move(message));
434-
} else {
435-
// Copy the message since we have additional subscriptions to serve
436-
MessageUniquePtr copy_message;
437-
Deleter deleter = message.get_deleter();
438-
auto ptr = MessageAllocTraits::allocate(allocator, 1);
439-
MessageAllocTraits::construct(allocator, ptr, *message);
440-
copy_message = MessageUniquePtr(ptr, deleter);
441-
442-
ros_message_subscription->provide_intra_process_message(std::move(copy_message));
468+
if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
469+
if (std::next(it) == subscription_ids.end()) {
470+
// If this is the last subscription, give up ownership
471+
ros_message_subscription->provide_intra_process_message(std::move(message));
472+
} else {
473+
// Copy the message since we have additional subscriptions to serve
474+
MessageUniquePtr copy_message;
475+
Deleter deleter = message.get_deleter();
476+
allocator::set_allocator_for_deleter(&deleter, &allocator);
477+
auto ptr = MessageAllocTraits::allocate(allocator, 1);
478+
MessageAllocTraits::construct(allocator, ptr, *message);
479+
copy_message = MessageUniquePtr(ptr, deleter);
480+
481+
ros_message_subscription->provide_intra_process_message(std::move(copy_message));
482+
}
443483
}
444484
}
445485
}

rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp

+13-9
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,24 @@ namespace experimental
4141
template<
4242
typename MessageT,
4343
typename SubscribedType,
44-
typename SubscribedTypeAlloc = std::allocator<void>,
45-
typename SubscribedTypeDeleter = std::default_delete<SubscribedType>
44+
typename SubscribedTypeAlloc = std::allocator<SubscribedType>,
45+
typename SubscribedTypeDeleter = std::default_delete<SubscribedType>,
46+
typename ROSMessageType = SubscribedType,
47+
typename Alloc = std::allocator<void>
4648
>
4749
class SubscriptionIntraProcess
4850
: public SubscriptionIntraProcessBuffer<
4951
SubscribedType,
5052
SubscribedTypeAlloc,
51-
SubscribedTypeDeleter
53+
SubscribedTypeDeleter,
54+
ROSMessageType
5255
>
5356
{
5457
using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer<
5558
SubscribedType,
5659
SubscribedTypeAlloc,
57-
SubscribedTypeDeleter
60+
SubscribedTypeDeleter,
61+
ROSMessageType
5862
>;
5963

6064
public:
@@ -68,15 +72,15 @@ class SubscriptionIntraProcess
6872
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;
6973

7074
SubscriptionIntraProcess(
71-
AnySubscriptionCallback<MessageT, SubscribedTypeAlloc> callback,
72-
std::shared_ptr<SubscribedTypeAlloc> allocator,
75+
AnySubscriptionCallback<MessageT, Alloc> callback,
76+
std::shared_ptr<Alloc> allocator,
7377
rclcpp::Context::SharedPtr context,
7478
const std::string & topic_name,
7579
const rclcpp::QoS & qos_profile,
7680
rclcpp::IntraProcessBufferType buffer_type)
7781
: SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
78-
SubscribedTypeDeleter>(
79-
allocator,
82+
SubscribedTypeDeleter, ROSMessageType>(
83+
std::make_shared<SubscribedTypeAlloc>(*allocator),
8084
context,
8185
topic_name,
8286
qos_profile,
@@ -154,7 +158,7 @@ class SubscriptionIntraProcess
154158
shared_ptr.reset();
155159
}
156160

157-
AnySubscriptionCallback<MessageT, SubscribedTypeAlloc> any_callback_;
161+
AnySubscriptionCallback<MessageT, Alloc> any_callback_;
158162
};
159163

160164
} // namespace experimental

rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp

+9-7
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,16 @@ namespace experimental
3838

3939
template<
4040
typename SubscribedType,
41-
typename Alloc = std::allocator<void>,
41+
typename Alloc = std::allocator<SubscribedType>,
4242
typename Deleter = std::default_delete<SubscribedType>,
4343
/// MessageT::ros_message_type if MessageT is a TypeAdapter,
4444
/// otherwise just MessageT.
45-
typename ROSMessageType = typename rclcpp::TypeAdapter<SubscribedType>::ros_message_type
45+
typename ROSMessageType = SubscribedType
4646
>
47-
class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMessageType, Alloc,
48-
Deleter>
47+
class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMessageType,
48+
typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
49+
allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
50+
ROSMessageType>>
4951
{
5052
public:
5153
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer)
@@ -76,8 +78,8 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
7678
const std::string & topic_name,
7779
const rclcpp::QoS & qos_profile,
7880
rclcpp::IntraProcessBufferType buffer_type)
79-
: ROSMessageIntraProcessBuffer<ROSMessageType, Alloc, Deleter>(context, topic_name,
80-
qos_profile),
81+
: ROSMessageIntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>(
82+
context, topic_name, qos_profile),
8183
subscribed_type_allocator_(*allocator)
8284
{
8385
allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
@@ -87,7 +89,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
8789
SubscribedTypeDeleter>(
8890
buffer_type,
8991
qos_profile,
90-
allocator);
92+
std::make_shared<Alloc>(subscribed_type_allocator_));
9193
}
9294

9395
bool

0 commit comments

Comments
 (0)