diff --git a/rmw_fastrtps_cpp/CMakeLists.txt b/rmw_fastrtps_cpp/CMakeLists.txt index 2a49fb23d..3e2bfd471 100644 --- a/rmw_fastrtps_cpp/CMakeLists.txt +++ b/rmw_fastrtps_cpp/CMakeLists.txt @@ -34,6 +34,7 @@ find_package(rosidl_generator_cpp REQUIRED) find_package(rosidl_typesupport_introspection_c REQUIRED) find_package(rosidl_typesupport_introspection_cpp REQUIRED) +ament_export_dependencies(rcutils) ament_export_dependencies(rmw) ament_export_dependencies(rosidl_generator_c) ament_export_dependencies(rosidl_generator_cpp) diff --git a/rmw_fastrtps_cpp/package.xml b/rmw_fastrtps_cpp/package.xml index 7a496b564..4c314ea4a 100644 --- a/rmw_fastrtps_cpp/package.xml +++ b/rmw_fastrtps_cpp/package.xml @@ -24,6 +24,7 @@ fastcdr fastrtps + rcutils rmw rosidl_generator_c rosidl_generator_cpp diff --git a/rmw_fastrtps_cpp/src/functions.cpp b/rmw_fastrtps_cpp/src/functions.cpp index bf35dc442..dffd85723 100644 --- a/rmw_fastrtps_cpp/src/functions.cpp +++ b/rmw_fastrtps_cpp/src/functions.cpp @@ -22,7 +22,9 @@ #include #include -#include "rcutils/types/string_array.h" +#include "rcutils/split.h" +#include "rcutils/types.h" + #include "rmw/allocators.h" #include "rmw/rmw.h" #include "rmw/error_handling.h" @@ -63,6 +65,9 @@ #include "rosidl_typesupport_introspection_c/service_introspection.h" #include "rosidl_typesupport_introspection_c/visibility_control.h" +// uncomment the next line to enable debug prints +// #define DEBUG_LOGGING 1 + using MessageTypeSupport_c = rmw_fastrtps_cpp::MessageTypeSupport; using MessageTypeSupport_cpp = @@ -317,6 +322,36 @@ _deserialize_ros_message( return false; } +template +inline +rcutils_ret_t +_assign_partitions_to_attributes( + const char * const topic_name, const char * const prefix, AttributeT * attributes) +{ + rcutils_ret_t ret = RCUTILS_RET_ERROR; + // set topic and partitions + rcutils_string_array_t name_tokens = rcutils_get_zero_initialized_string_array(); + name_tokens = rcutils_split_last(topic_name, '/'); + if (name_tokens.size == 1) { + attributes->qos.m_partition.push_back(prefix); + attributes->topic.topicName = name_tokens.data[0]; + ret = RCUTILS_RET_OK; + } else if (name_tokens.size == 2) { + attributes->qos.m_partition.push_back( + (std::string(prefix) + "/" + name_tokens.data[0]).c_str()); + attributes->topic.topicName = name_tokens.data[1]; + ret = RCUTILS_RET_OK; + } else { + RMW_SET_ERROR_MSG("Malformed topic name"); + ret = RCUTILS_RET_ERROR; + } + if (rcutils_string_array_fini(&name_tokens) != RCUTILS_RET_OK) { + fprintf(stderr, "Failed to destroy the token string array\n"); + ret = RCUTILS_RET_ERROR; + } + return ret; +} + class ClientListener; typedef struct CustomWaitsetInfo @@ -357,6 +392,7 @@ class topicnamesandtypesReaderListener : public ReaderListener void onNewCacheChangeAdded(RTPSReader * reader, const CacheChange_t * const change) { (void)reader; + WriterProxyData proxyData; CDRMessage_t tempMsg; tempMsg.msg_endian = change->serializedPayload.encapsulation == @@ -365,14 +401,23 @@ class topicnamesandtypesReaderListener : public ReaderListener memcpy(tempMsg.buffer, change->serializedPayload.data, tempMsg.length); if (proxyData.readFromCDRMessage(&tempMsg)) { bool mapModified = false; + // TODO(wjwwood): remove this logic and replace with check for + // a single partition which is prefixed with the ROS specific + // prefix. + auto partition_str = std::string(""); + // don't use std::accumulate - schlemiel O(n2) + for (const auto & partition : proxyData.m_qos.m_partition.getNames()) { + partition_str += partition; + } mapmutex.lock(); - auto it = topicNtypes.find(proxyData.topicName()); + auto fqdn = partition_str + "/" + proxyData.topicName(); + auto it = topicNtypes.find(fqdn); if (change->kind == ALIVE) { if ( it == topicNtypes.end() || it->second.find(proxyData.typeName()) == it->second.end()) { - topicNtypes[proxyData.topicName()].insert(proxyData.typeName()); + topicNtypes[fqdn].insert(proxyData.typeName()); mapModified = true; } } else { @@ -380,7 +425,7 @@ class topicnamesandtypesReaderListener : public ReaderListener it != topicNtypes.end() && it->second.find(proxyData.typeName()) != it->second.end()) { - topicNtypes[proxyData.topicName()].erase(proxyData.typeName()); + topicNtypes[fqdn].erase(proxyData.typeName()); mapModified = true; } } @@ -493,7 +538,11 @@ typedef struct CustomParticipantInfo extern "C" { -const char * const eprosima_fastrtps_identifier = "rmw_fastrtps_cpp"; +// static for internal linkage +static const char * const eprosima_fastrtps_identifier = "rmw_fastrtps_cpp"; +static const char * const ros_topic_prefix = "rt"; +static const char * const ros_service_requester_prefix = "rq"; +static const char * const ros_service_response_prefix = "rr"; const char * rmw_get_implementation_identifier() { @@ -811,26 +860,37 @@ rmw_publisher_t * rmw_create_publisher(const rmw_node_t * node, const rosidl_message_type_support_t * type_supports, const char * topic_name, const rmw_qos_profile_t * qos_policies) { - rmw_publisher_t * rmw_publisher = nullptr; - const GUID_t * guid = nullptr; - - assert(node); - assert(type_supports); - assert(topic_name); - assert(qos_policies); + if (!node) { + RMW_SET_ERROR_MSG("node handle is null"); + return NULL; + } if (node->implementation_identifier != eprosima_fastrtps_identifier) { RMW_SET_ERROR_MSG("node handle not from this implementation"); return NULL; } - CustomParticipantInfo * impl = static_cast(node->data); + if (!topic_name || strlen(topic_name) == 0) { + RMW_SET_ERROR_MSG("publisher topic is null or empty string"); + } + + if (!qos_policies) { + RMW_SET_ERROR_MSG("qos_profile is null"); + return NULL; + } + + const CustomParticipantInfo * impl = static_cast(node->data); if (!impl) { RMW_SET_ERROR_MSG("node impl is null"); return NULL; } Participant * participant = impl->participant; + if (!participant) { + RMW_SET_ERROR_MSG("participant handle is null"); + return NULL; + } + const rosidl_message_type_support_t * type_support = get_message_typesupport_handle( type_supports, rosidl_typesupport_introspection_c__identifier); if (!type_support) { @@ -842,11 +902,17 @@ rmw_publisher_t * rmw_create_publisher(const rmw_node_t * node, } } - CustomPublisherInfo * info = new CustomPublisherInfo(); + CustomPublisherInfo * info = nullptr; + rmw_publisher_t * rmw_publisher = nullptr; + PublisherAttributes publisherParam; + const GUID_t * guid = nullptr; + + // TODO(karsten1987) Verify consequences for std::unique_ptr? + info = new CustomPublisherInfo(); info->typesupport_identifier_ = type_support->typesupport_identifier; - std::string type_name = _create_type_name(type_support->data, "msg", - info->typesupport_identifier_); + std::string type_name = _create_type_name( + type_support->data, "msg", info->typesupport_identifier_); if (!Domain::getRegisteredType(participant, type_name.c_str(), reinterpret_cast(&info->type_support_))) { @@ -855,12 +921,16 @@ rmw_publisher_t * rmw_create_publisher(const rmw_node_t * node, _register_type(participant, info->type_support_, info->typesupport_identifier_); } - PublisherAttributes publisherParam; - publisherParam.topic.topicKind = NO_KEY; - publisherParam.topic.topicDataType = type_name; - publisherParam.topic.topicName = topic_name; publisherParam.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE; publisherParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + publisherParam.topic.topicKind = NO_KEY; + publisherParam.topic.topicDataType = type_name; + rcutils_ret_t ret = _assign_partitions_to_attributes( + topic_name, ros_topic_prefix, &publisherParam); + if (ret != RCUTILS_RET_OK) { + // error msg already set + goto fail; + } // 1 Heartbeat every 10ms // publisherParam.times.heartbeatPeriod.seconds = 0; @@ -871,6 +941,7 @@ rmw_publisher_t * rmw_create_publisher(const rmw_node_t * node, // publisherParam.throughputController = throughputController; if (!get_datawriter_qos(*qos_policies, publisherParam)) { + RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } @@ -889,9 +960,17 @@ rmw_publisher_t * rmw_create_publisher(const rmw_node_t * node, memset(info->publisher_gid.data, 0, RMW_GID_STORAGE_SIZE); guid = &info->publisher_->getGuid(); + if (!guid) { + RMW_SET_ERROR_MSG("no guid found for publisher"); + goto fail; + } memcpy(info->publisher_gid.data, guid, sizeof(eprosima::fastrtps::rtps::GUID_t)); - rmw_publisher = new rmw_publisher_t; + rmw_publisher = rmw_publisher_allocate(); + if (!rmw_publisher) { + RMW_SET_ERROR_MSG("failed to allocate publisher"); + goto fail; + } rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier; rmw_publisher->data = info; rmw_publisher->topic_name = reinterpret_cast(new char[strlen(topic_name) + 1]); @@ -899,11 +978,15 @@ rmw_publisher_t * rmw_create_publisher(const rmw_node_t * node, return rmw_publisher; fail: - if (info != nullptr) { + if (info) { _delete_typesupport(info->type_support_, info->typesupport_identifier_); delete info; } + if (rmw_publisher) { + rmw_publisher_free(rmw_publisher); + } + return NULL; } @@ -1069,26 +1152,37 @@ rmw_subscription_t * rmw_create_subscription(const rmw_node_t * node, const rosidl_message_type_support_t * type_supports, const char * topic_name, const rmw_qos_profile_t * qos_policies, bool ignore_local_publications) { - (void)ignore_local_publications; - rmw_subscription_t * subscription = nullptr; - - assert(node); - assert(type_supports); - assert(topic_name); - assert(qos_policies); + if (!node) { + RMW_SET_ERROR_MSG("node handle is null"); + return NULL; + } if (node->implementation_identifier != eprosima_fastrtps_identifier) { RMW_SET_ERROR_MSG("node handle not from this implementation"); return NULL; } - CustomParticipantInfo * impl = static_cast(node->data); + if (!topic_name || strlen(topic_name) == 0) { + RMW_SET_ERROR_MSG("publisher topic is null or empty string"); + } + + if (!qos_policies) { + RMW_SET_ERROR_MSG("qos_profile is null"); + return NULL; + } + + const CustomParticipantInfo * impl = static_cast(node->data); if (!impl) { RMW_SET_ERROR_MSG("node impl is null"); return NULL; } Participant * participant = impl->participant; + if (!participant) { + RMW_SET_ERROR_MSG("participant handle is null"); + return NULL; + } + const rosidl_message_type_support_t * type_support = get_message_typesupport_handle( type_supports, rosidl_typesupport_introspection_c__identifier); if (!type_support) { @@ -1100,12 +1194,16 @@ rmw_subscription_t * rmw_create_subscription(const rmw_node_t * node, } } - CustomSubscriberInfo * info = new CustomSubscriberInfo(); + (void)ignore_local_publications; + CustomSubscriberInfo * info = nullptr; + rmw_subscription_t * rmw_subscription = nullptr; + SubscriberAttributes subscriberParam; + + info = new CustomSubscriberInfo(); info->typesupport_identifier_ = type_support->typesupport_identifier; std::string type_name = _create_type_name( type_support->data, "msg", info->typesupport_identifier_); - if (!Domain::getRegisteredType(participant, type_name.c_str(), reinterpret_cast(&info->type_support_))) { @@ -1114,13 +1212,18 @@ rmw_subscription_t * rmw_create_subscription(const rmw_node_t * node, _register_type(participant, info->type_support_, info->typesupport_identifier_); } - SubscriberAttributes subscriberParam; + subscriberParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; subscriberParam.topic.topicKind = NO_KEY; subscriberParam.topic.topicDataType = type_name; - subscriberParam.topic.topicName = topic_name; - subscriberParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + rcutils_ret_t ret = _assign_partitions_to_attributes( + topic_name, ros_topic_prefix, &subscriberParam); + if (ret != RCUTILS_RET_OK) { + // error msg already set + goto fail; + } if (!get_datareader_qos(*qos_policies, subscriberParam)) { + RMW_SET_ERROR_MSG("failed to get datareader qos"); goto fail; } @@ -1132,13 +1235,17 @@ rmw_subscription_t * rmw_create_subscription(const rmw_node_t * node, goto fail; } - subscription = new rmw_subscription_t; - subscription->implementation_identifier = eprosima_fastrtps_identifier; - subscription->data = info; - subscription->topic_name = reinterpret_cast(new char[strlen(topic_name) + 1]); - memcpy(const_cast(subscription->topic_name), topic_name, strlen(topic_name) + 1); + rmw_subscription = rmw_subscription_allocate(); + if (!rmw_subscription) { + RMW_SET_ERROR_MSG("failed to allocate subscription"); + goto fail; + } + rmw_subscription->implementation_identifier = eprosima_fastrtps_identifier; + rmw_subscription->data = info; + rmw_subscription->topic_name = reinterpret_cast(new char[strlen(topic_name) + 1]); + memcpy(const_cast(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1); + return rmw_subscription; - return subscription; fail: if (info != nullptr) { @@ -1148,6 +1255,10 @@ rmw_subscription_t * rmw_create_subscription(const rmw_node_t * node, delete info; } + if (rmw_subscription) { + rmw_subscription_free(rmw_subscription); + } + return NULL; } @@ -1549,26 +1660,36 @@ rmw_client_t * rmw_create_client(const rmw_node_t * node, const rosidl_service_type_support_t * type_supports, const char * service_name, const rmw_qos_profile_t * qos_policies) { - CustomClientInfo * info = nullptr; - rmw_client_t * client = nullptr; - - assert(node); - assert(type_supports); - assert(service_name); - assert(qos_policies); + if (!node) { + RMW_SET_ERROR_MSG("node handle is null"); + return NULL; + } if (node->implementation_identifier != eprosima_fastrtps_identifier) { RMW_SET_ERROR_MSG("node handle not from this implementation"); return NULL; } - CustomParticipantInfo * impl = static_cast(node->data); + if (!service_name || strlen(service_name) == 0) { + RMW_SET_ERROR_MSG("publisher topic is null or empty string"); + } + + if (!qos_policies) { + RMW_SET_ERROR_MSG("qos_profile is null"); + return NULL; + } + + const CustomParticipantInfo * impl = static_cast(node->data); if (!impl) { RMW_SET_ERROR_MSG("node impl is null"); return NULL; } Participant * participant = impl->participant; + if (!participant) { + RMW_SET_ERROR_MSG("participant handle is null"); + return NULL; + } const rosidl_service_type_support_t * type_support = get_service_typesupport_handle( type_supports, rosidl_typesupport_introspection_c__identifier); @@ -1581,6 +1702,11 @@ rmw_client_t * rmw_create_client(const rmw_node_t * node, } } + CustomClientInfo * info = nullptr; + SubscriberAttributes subscriberParam; + PublisherAttributes publisherParam; + rmw_client_t * rmw_client = nullptr; + info = new CustomClientInfo(); info->participant_ = participant; info->typesupport_identifier_ = type_support->typesupport_identifier; @@ -1614,39 +1740,58 @@ rmw_client_t * rmw_create_client(const rmw_node_t * node, _register_type(participant, info->response_type_support_, info->typesupport_identifier_); } - SubscriberAttributes subscriberParam; - PublisherAttributes publisherParam; - subscriberParam.topic.topicKind = NO_KEY; subscriberParam.topic.topicDataType = response_type_name; - subscriberParam.topic.topicName = std::string(service_name) + "Reply"; subscriberParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + rcutils_ret_t ret = _assign_partitions_to_attributes( + service_name, ros_service_response_prefix, &subscriberParam); + if (ret != RCUTILS_RET_OK) { + // error msg already set + goto fail; + } + subscriberParam.topic.topicName += "Reply"; - if (!get_datareader_qos(*qos_policies, subscriberParam)) { + publisherParam.topic.topicKind = NO_KEY; + publisherParam.topic.topicDataType = request_type_name; + publisherParam.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + ret = _assign_partitions_to_attributes( + service_name, ros_service_requester_prefix, &publisherParam); + if (ret != RCUTILS_RET_OK) { + // error msg already set goto fail; } + publisherParam.topic.topicName += "Request"; - info->listener_ = new ClientListener(info); - info->response_subscriber_ = Domain::createSubscriber(participant, subscriberParam, - info->listener_); +#ifdef DEBUG_LOGGING + fprintf(stderr, "************ Client Details *********\n"); + fprintf(stderr, "Sub Topic %s\n", subscriberParam.topic.topicName.c_str()); + fprintf(stderr, "Sub Partition %s\n", subscriberParam.qos.m_partition.getNames()[0].c_str()); + fprintf(stderr, "Pub Topic %s\n", publisherParam.topic.topicName.c_str()); + fprintf(stderr, "Pub Partition %s\n", publisherParam.qos.m_partition.getNames()[0].c_str()); + fprintf(stderr, "***********\n"); +#endif + // Create Client Subscriber and set QoS + if (!get_datareader_qos(*qos_policies, subscriberParam)) { + RMW_SET_ERROR_MSG("failed to get datareader qos"); + goto fail; + } + info->listener_ = new ClientListener(info); + info->response_subscriber_ = + Domain::createSubscriber(participant, subscriberParam, info->listener_); if (!info->response_subscriber_) { RMW_SET_ERROR_MSG("create_client() could not create subscriber"); goto fail; } - publisherParam.topic.topicKind = NO_KEY; - publisherParam.topic.topicDataType = request_type_name; - publisherParam.topic.topicName = std::string(service_name) + "Request"; - publisherParam.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - + // Create Client Subscriber and set QoS if (!get_datawriter_qos(*qos_policies, publisherParam)) { + RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } - - info->request_publisher_ = Domain::createPublisher(participant, publisherParam, NULL); - + info->request_publisher_ = + Domain::createPublisher(participant, publisherParam, NULL); if (!info->request_publisher_) { RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); goto fail; @@ -1654,17 +1799,18 @@ rmw_client_t * rmw_create_client(const rmw_node_t * node, info->writer_guid_ = info->request_publisher_->getGuid(); - client = new rmw_client_t; - client->implementation_identifier = eprosima_fastrtps_identifier; - client->data = info; - client->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); - if (!client->service_name) { + rmw_client = rmw_client_allocate(); + rmw_client->implementation_identifier = eprosima_fastrtps_identifier; + rmw_client->data = info; + rmw_client->service_name = reinterpret_cast( + rmw_allocate(strlen(service_name) + 1)); + if (!rmw_client->service_name) { RMW_SET_ERROR_MSG("failed to allocate memory for client name"); goto fail; } - memcpy(const_cast(client->service_name), service_name, strlen(service_name) + 1); + memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); - return client; + return rmw_client; fail: @@ -1697,6 +1843,8 @@ rmw_client_t * rmw_create_client(const rmw_node_t * node, delete info; } + rmw_free(rmw_client); + return NULL; } @@ -1863,26 +2011,37 @@ rmw_service_t * rmw_create_service(const rmw_node_t * node, const rosidl_service_type_support_t * type_supports, const char * service_name, const rmw_qos_profile_t * qos_policies) { - CustomServiceInfo * info = nullptr; - rmw_service_t * service = nullptr; - - assert(node); - assert(type_supports); - assert(service_name); - assert(qos_policies); + if (!node) { + RMW_SET_ERROR_MSG("node handle is null"); + return NULL; + } if (node->implementation_identifier != eprosima_fastrtps_identifier) { RMW_SET_ERROR_MSG("node handle not from this implementation"); return NULL; } - CustomParticipantInfo * impl = static_cast(node->data); + if (!service_name || strlen(service_name) == 0) { + RMW_SET_ERROR_MSG("publisher topic is null or empty string"); + } + + if (!qos_policies) { + RMW_SET_ERROR_MSG("qos_profile is null"); + return NULL; + } + + const CustomParticipantInfo * impl = static_cast(node->data); if (!impl) { RMW_SET_ERROR_MSG("node impl is null"); return NULL; } Participant * participant = impl->participant; + if (!participant) { + RMW_SET_ERROR_MSG("participant handle is null"); + return NULL; + } + const rosidl_service_type_support_t * type_support = get_service_typesupport_handle( type_supports, rosidl_typesupport_introspection_c__identifier); if (!type_support) { @@ -1894,6 +2053,11 @@ rmw_service_t * rmw_create_service(const rmw_node_t * node, } } + CustomServiceInfo * info = nullptr; + SubscriberAttributes subscriberParam; + PublisherAttributes publisherParam; + rmw_service_t * rmw_service = nullptr; + info = new CustomServiceInfo(); info->participant_ = participant; info->typesupport_identifier_ = type_support->typesupport_identifier; @@ -1927,83 +2091,104 @@ rmw_service_t * rmw_create_service(const rmw_node_t * node, _register_type(participant, info->response_type_support_, info->typesupport_identifier_); } - SubscriberAttributes subscriberParam; - PublisherAttributes publisherParam; - subscriberParam.topic.topicKind = NO_KEY; - subscriberParam.topic.topicDataType = request_type_name; - subscriberParam.topic.topicName = std::string(service_name) + "Request"; subscriberParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + subscriberParam.topic.topicDataType = request_type_name; + rcutils_ret_t ret = _assign_partitions_to_attributes( + service_name, ros_service_requester_prefix, &subscriberParam); + if (ret != RCUTILS_RET_OK) { + // error msg already set + goto fail; + } + subscriberParam.topic.topicName += "Request"; - if (!get_datareader_qos(*qos_policies, subscriberParam)) { + publisherParam.topic.topicKind = NO_KEY; + publisherParam.topic.topicDataType = response_type_name; + publisherParam.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE; + publisherParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; + ret = _assign_partitions_to_attributes( + service_name, ros_service_response_prefix, &publisherParam); + if (ret != RCUTILS_RET_OK) { + // error msg already set goto fail; } + publisherParam.topic.topicName += "Reply"; +#ifdef DEBUG_LOGGING + fprintf(stderr, "************ Service Details *********\n"); + fprintf(stderr, "Sub Topic %s\n", subscriberParam.topic.topicName.c_str()); + fprintf(stderr, "Sub Partition %s\n", subscriberParam.qos.m_partition.getNames()[0].c_str()); + fprintf(stderr, "Pub Topic %s\n", publisherParam.topic.topicName.c_str()); + fprintf(stderr, "Pub Partition %s\n", publisherParam.qos.m_partition.getNames()[0].c_str()); + fprintf(stderr, "***********\n"); +#endif + + // Create Service Subscriber and set QoS + if (!get_datareader_qos(*qos_policies, subscriberParam)) { + RMW_SET_ERROR_MSG("failed to get datareader qos"); + goto fail; + } info->listener_ = new ServiceListener(info); info->request_subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_); - if (!info->request_subscriber_) { RMW_SET_ERROR_MSG("create_client() could not create subscriber"); goto fail; } - publisherParam.topic.topicKind = NO_KEY; - publisherParam.topic.topicDataType = response_type_name; - publisherParam.topic.topicName = std::string(service_name) + "Reply"; - publisherParam.qos.m_publishMode.kind = ASYNCHRONOUS_PUBLISH_MODE; - publisherParam.historyMemoryPolicy = PREALLOCATED_WITH_REALLOC_MEMORY_MODE; - + // Create Service Publisher and set QoS if (!get_datawriter_qos(*qos_policies, publisherParam)) { + RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } - - info->response_publisher_ = Domain::createPublisher(participant, publisherParam, NULL); - + info->response_publisher_ = + Domain::createPublisher(participant, publisherParam, NULL); if (!info->response_publisher_) { RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); goto fail; } - service = new rmw_service_t; - service->implementation_identifier = eprosima_fastrtps_identifier; - service->data = info; - service->service_name = reinterpret_cast( + rmw_service = rmw_service_allocate(); + rmw_service->implementation_identifier = eprosima_fastrtps_identifier; + rmw_service->data = info; + rmw_service->service_name = reinterpret_cast( rmw_allocate(strlen(service_name) + 1)); - if (!service->service_name) { + if (!rmw_service->service_name) { RMW_SET_ERROR_MSG("failed to allocate memory for service name"); goto fail; } - memcpy(const_cast(service->service_name), service_name, strlen(service_name) + 1); + memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); - return service; + return rmw_service; fail: - if (info != nullptr) { - if (info->response_publisher_ != nullptr) { + if (info) { + if (info->response_publisher_) { Domain::removePublisher(info->response_publisher_); } - if (info->request_subscriber_ != nullptr) { + if (info->request_subscriber_) { Domain::removeSubscriber(info->request_subscriber_); } - if (info->listener_ != nullptr) { + if (info->listener_) { delete info->listener_; } - if (info->request_type_support_ != nullptr) { + if (info->request_type_support_) { _unregister_type(participant, info->request_type_support_, info->typesupport_identifier_); } - if (info->response_type_support_ != nullptr) { + if (info->response_type_support_) { _unregister_type(participant, info->response_type_support_, info->typesupport_identifier_); } delete info; } + rmw_free(rmw_service); + return NULL; } @@ -2494,6 +2679,15 @@ rmw_count_publishers( } else { *count = it->second.size(); } + +#ifdef DEBUG_LOGGING + fprintf(stderr, "looking for subscriber topic: %s\n", topic_name); + for (auto it : unfiltered_topics) { + fprintf(stderr, "available topic: %s\n", it.first.c_str()); + } + fprintf(stderr, "number of matches: %zu\n", *count); +#endif + return RMW_RET_OK; } @@ -2535,6 +2729,14 @@ rmw_count_subscribers( *count = it->second.size(); } +#ifdef DEBUG_LOGGING + fprintf(stderr, "looking for subscriber topic: %s\n", topic_name); + for (auto it : unfiltered_topics) { + fprintf(stderr, "available topic: %s\n", it.first.c_str()); + } + fprintf(stderr, "number of matches: %zu\n", *count); +#endif + return RMW_RET_OK; } @@ -2570,12 +2772,35 @@ rmw_service_server_is_available( return RMW_RET_ERROR; } - *is_available = false; + auto pub_topic_name = + client_info->request_publisher_->getAttributes().topic.getTopicName(); + auto pub_partitions = + client_info->request_publisher_->getAttributes().qos.m_partition.getNames(); + // every rostopic has exactly 1 partition field set + if (pub_partitions.size() != 1) { + fprintf(stderr, "Topic %s is not a ros topic\n", pub_topic_name.c_str()); + RMW_SET_ERROR_MSG((std::string(pub_topic_name) + " is a non-ros topic\n").c_str()); + return RMW_RET_ERROR; + } + auto pub_fqdn = pub_partitions[0] + "/" + pub_topic_name; + auto sub_topic_name = + client_info->response_subscriber_->getAttributes().topic.getTopicName(); + auto sub_partitions = + client_info->response_subscriber_->getAttributes().qos.m_partition.getNames(); + // every rostopic has exactly 1 partition field set + if (sub_partitions.size() != 1) { + fprintf(stderr, "Topic %s is not a ros topic\n", sub_topic_name.c_str()); + RMW_SET_ERROR_MSG((std::string(sub_topic_name) + " is a non-ros topic\n").c_str()); + return RMW_RET_ERROR; + } + auto sub_fqdn = sub_partitions[0] + "/" + sub_topic_name; + + *is_available = false; size_t number_of_request_subscribers = 0; rmw_ret_t ret = rmw_count_subscribers( node, - client_info->request_publisher_->getAttributes().topic.getTopicName().c_str(), + pub_fqdn.c_str(), &number_of_request_subscribers); if (ret != RMW_RET_OK) { // error string already set @@ -2589,7 +2814,7 @@ rmw_service_server_is_available( size_t number_of_response_publishers = 0; ret = rmw_count_publishers( node, - client_info->response_subscriber_->getAttributes().topic.getTopicName().c_str(), + sub_fqdn.c_str(), &number_of_response_publishers); if (ret != RMW_RET_OK) { // error string already set