From 4bbf528bbdabcfaaf0fdc7e9189d42e00af16e78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Wed, 8 Feb 2023 15:48:09 +0100 Subject: [PATCH] List subscribed topics when running topic list (#379) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Topic list and topic info Signed-off-by: Carlos Agüero --- .github/ci/packages.apt | 2 +- CMakeLists.txt | 4 +- docker/gz-transport/Dockerfile | 4 +- include/gz/transport/Discovery.hh | 82 +++++++++++++++++++++++++- include/gz/transport/HandlerStorage.hh | 9 ++- include/gz/transport/NodeShared.hh | 10 ++++ include/gz/transport/Packet.hh | 66 --------------------- include/gz/transport/TopicStorage.hh | 6 ++ src/HandlerStorage_TEST.cc | 2 + src/NodeShared.cc | 47 +++++++++++++++ src/Publisher.cc | 37 +++++++----- src/TopicStorage_TEST.cc | 7 ++- src/cmd/gz_TEST.cc | 43 ++++++++++++++ tutorials/02_installation.md | 2 +- 14 files changed, 229 insertions(+), 92 deletions(-) delete mode 100644 include/gz/transport/Packet.hh diff --git a/.github/ci/packages.apt b/.github/ci/packages.apt index fc9b97a5d..6e2c82feb 100644 --- a/.github/ci/packages.apt +++ b/.github/ci/packages.apt @@ -1,6 +1,6 @@ libgz-cmake3-dev libgz-math7-dev -libgz-msgs9-dev +libgz-msgs10-dev libgz-tools2-dev libgz-utils2-cli-dev libprotobuf-dev diff --git a/CMakeLists.txt b/CMakeLists.txt index 3127ade5c..94491f582 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,8 +86,8 @@ set(GZ_UTILS_VER ${gz-utils2_VERSION_MAJOR}) #-------------------------------------- # Find gz-msgs -gz_find_package(gz-msgs9 REQUIRED) -set(GZ_MSGS_VER ${gz-msgs9_VERSION_MAJOR}) +gz_find_package(gz-msgs10 REQUIRED) +set(GZ_MSGS_VER ${gz-msgs10_VERSION_MAJOR}) #-------------------------------------- # Find ifaddrs diff --git a/docker/gz-transport/Dockerfile b/docker/gz-transport/Dockerfile index f4af4ece1..3d3fc50c6 100644 --- a/docker/gz-transport/Dockerfile +++ b/docker/gz-transport/Dockerfile @@ -66,12 +66,12 @@ RUN sudo /bin/sh -c 'echo "deb [trusted=yes] http://packages.osrfoundation.org/g && sudo apt-get install -y \ libgz-cmake3-dev \ libgz-math7-dev \ - libgz-msgs9-dev \ + libgz-msgs10-dev \ libgz-utils2-cli-dev \ && sudo apt-get clean # Gazebo transport -RUN git clone https://github.com/gazebosim/gz-transport.git -b gz-transport12 \ +RUN git clone https://github.com/gazebosim/gz-transport.git -b gz-transport13 \ && cd gz-transport \ && mkdir build \ && cd build \ diff --git a/include/gz/transport/Discovery.hh b/include/gz/transport/Discovery.hh index 2226f990b..286a0520e 100644 --- a/include/gz/transport/Discovery.hh +++ b/include/gz/transport/Discovery.hh @@ -398,6 +398,14 @@ namespace gz return true; } + /// \brief Send the response to a SUBSCRIBERS_REQ message. + /// \param[in] _pub Information to send. + public: void SendSubscribersRep(const MessagePublisher &_pub) const + { + this->SendMsg( + DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub); + } + /// \brief Register a node from this process as a remote subscriber. /// \param[in] _pub Contains information about the subscriber. public: void Register(const MessagePublisher &_pub) const @@ -433,6 +441,17 @@ namespace gz return this->info.Publishers(_topic, _publishers); } + /// \brief Get all the subscribers' information known for a given topic. + /// \param[in] _topic Topic name. + /// \param[out] _subscribers All remote subscribers for this topic. + /// \return True if the topic is found and there is at least one publisher + public: bool RemoteSubscribers(const std::string &_topic, + Addresses_M &_subscribers) const + { + std::lock_guard lock(this->mutex); + return this->remoteSubscribers.Publishers(_topic, _subscribers); + } + /// \brief Unadvertise a new message. Broadcast a discovery /// message that will cancel all the discovery information for the topic /// advertised by a specific node. @@ -573,6 +592,15 @@ namespace gz this->unregistrationCb = _cb; } + /// \brief Register a callback to receive an event when a node requests + /// the list of remote subscribers. + /// \param[in] _cb Function callback. + public: void SubscribersCb(const std::function &_cb) + { + std::lock_guard lock(this->mutex); + this->subscribersCb = _cb; + } + /// \brief Print the current discovery state. public: void PrintCurrentState() const { @@ -615,13 +643,33 @@ namespace gz std::cout << "---------------" << std::endl; } - /// \brief Get the list of topics currently advertised in the network. + /// \brief Get the list of topics currently advertised and subscribed + /// in the network. /// \param[out] _topics List of advertised topics. - public: void TopicList(std::vector &_topics) const + public: void TopicList(std::vector &_topics) { + this->remoteSubscribers.Clear(); + + // Request the list of subscribers. + Publisher pub("", "", this->pUuid, "", AdvertiseOptions()); + this->SendMsg( + DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub); + this->WaitForInit(); std::lock_guard lock(this->mutex); this->info.TopicList(_topics); + + std::vector remoteSubs; + this->remoteSubscribers.TopicList(remoteSubs); + + // Add the remote subscribers + for (auto const &t : remoteSubs) + { + if (std::find(_topics.begin(), _topics.end(), t) == _topics.end()) + { + _topics.push_back(t); + } + } } /// \brief Check if ready/initialized. If not, then wait on the @@ -926,6 +974,7 @@ namespace gz DiscoveryCallback disconnectCb; DiscoveryCallback registerCb; DiscoveryCallback unregisterCb; + std::function subscribersReqCb; { std::lock_guard lock(this->mutex); this->activity[recvPUuid] = std::chrono::steady_clock::now(); @@ -933,6 +982,7 @@ namespace gz disconnectCb = this->disconnectionCb; registerCb = this->registrationCb; unregisterCb = this->unregistrationCb; + subscribersReqCb = this->subscribersCb; } switch (msg.type()) @@ -1011,6 +1061,25 @@ namespace gz break; } + case msgs::Discovery::SUBSCRIBERS_REQ: + { + if (subscribersReqCb) + subscribersReqCb(); + + break; + } + case msgs::Discovery::SUBSCRIBERS_REP: + { + // Save the remote subscriber. + Pub publisher; + publisher.SetFromDiscovery(msg); + + { + std::lock_guard lock(this->mutex); + this->remoteSubscribers.AddPublisher(publisher); + } + break; + } case msgs::Discovery::NEW_CONNECTION: { // Read the rest of the fields. @@ -1114,6 +1183,7 @@ namespace gz discoveryMsg.set_version(this->Version()); discoveryMsg.set_type(_type); discoveryMsg.set_process_uuid(this->pUuid); + _pub.FillDiscovery(discoveryMsg); switch (_type) { @@ -1132,6 +1202,8 @@ namespace gz } case msgs::Discovery::HEARTBEAT: case msgs::Discovery::BYE: + case msgs::Discovery::SUBSCRIBERS_REQ: + case msgs::Discovery::SUBSCRIBERS_REP: break; default: std::cerr << "Discovery::SendMsg() error: Unrecognized message" @@ -1450,9 +1522,15 @@ namespace gz /// \brief Callback executed when a new remote subscriber is unregistered. private: DiscoveryCallback unregistrationCb; + /// \brief Callback executed when a SUBSCRIBERS_REQ message is received. + private: std::function subscribersCb; + /// \brief Addressing information. private: TopicStorage info; + /// \brief Remote subscribers. + private: TopicStorage remoteSubscribers; + /// \brief Activity information. Every time there is a message from a /// remote node, its activity information is updated. If we do not hear /// from a node in a while, its entries in 'info' will be invalided. The diff --git a/include/gz/transport/HandlerStorage.hh b/include/gz/transport/HandlerStorage.hh index d00d8056c..37d3f9f7f 100644 --- a/include/gz/transport/HandlerStorage.hh +++ b/include/gz/transport/HandlerStorage.hh @@ -45,7 +45,7 @@ namespace gz using UUIDHandler_M = std::map>; using UUIDHandler_Collection_M = std::map; - /// \brief key is a topic name and value is UUIDHandler_M + /// \brief key is a topic name and value is UUIDHandler_Collection_M using TopicServiceCalls_M = std::map; @@ -159,6 +159,13 @@ namespace gz return true; } + /// \brief Get a reference to all the handlers. + /// \return All the handlers. + public: TopicServiceCalls_M &AllHandlers() + { + return this->data; + } + /// \brief Add a request handler to a topic. A request handler stores /// the callback and types associated to a service call request. /// \param[in] _topic Topic name. diff --git a/include/gz/transport/NodeShared.hh b/include/gz/transport/NodeShared.hh index 788458072..7d97c93ad 100644 --- a/include/gz/transport/NodeShared.hh +++ b/include/gz/transport/NodeShared.hh @@ -218,6 +218,9 @@ namespace gz /// \param[in] _pub Information of the remote subscriber. public: void OnEndRegistration(const MessagePublisher &_pub); + /// \brief Callback executed when a SUBSCRIBERS request is received. + public: void OnSubscribers(); + /// \brief Pass through to bool Publishers(const std::string &_topic, /// Addresses_M &_publishers) const /// \param[in] _topic Service name. @@ -395,6 +398,13 @@ namespace gz const std::string &_fullyQualifiedTopic, const std::string &_nUuid); + /// \brief Convert all the HandlerStorages into a vector of publishers. + /// \param[in] _addr The pub/sub address. + /// \param[in] _pUuid The process UUID. + /// \return The vector of message publishers. + public: std::vector Convert(const std::string &_addr, + const std::string &_pUuid); + /// \brief Normal local subscriptions. public: HandlerStorage normal; diff --git a/include/gz/transport/Packet.hh b/include/gz/transport/Packet.hh deleted file mode 100644 index ff977d5b6..000000000 --- a/include/gz/transport/Packet.hh +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) 2014 Open Source Robotics Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * -*/ - -#ifndef GZ_TRANSPORT_PACKET_HH_ -#define GZ_TRANSPORT_PACKET_HH_ - -#include -#include -#include -#include - -#include "gz/transport/config.hh" -#include "gz/transport/Export.hh" -#include "gz/transport/Publisher.hh" - -// This whole file is deprecated in version 8 of Gazebo Transport. Please -// remove this file in Version 9. - -namespace gz -{ - namespace transport - { - // Inline bracket to help doxygen filtering. - inline namespace GZ_TRANSPORT_VERSION_NAMESPACE { - // - // Message types. - static const uint8_t Uninitialized = 0; - static const uint8_t AdvType = 1; - static const uint8_t SubType = 2; - static const uint8_t UnadvType = 3; - static const uint8_t HeartbeatType = 4; - static const uint8_t ByeType = 5; - static const uint8_t NewConnection = 6; - static const uint8_t EndConnection = 7; - - // Flag set when a discovery message is relayed. - static const uint16_t FlagRelay = 0b000000000000'0001; - // Flag set when we want to avoid to relay a discovery message. - // This is used to avoid loops. - static const uint16_t FlagNoRelay = 0b000000000000'0010; - - /// \brief Used for debugging the message type received/send. - static const std::vector MsgTypesStr = - { - "UNINITIALIZED", "ADVERTISE", "SUBSCRIBE", "UNADVERTISE", "HEARTBEAT", - "BYE", "NEW_CONNECTION", "END_CONNECTION" - }; - } - } -} - -#endif diff --git a/include/gz/transport/TopicStorage.hh b/include/gz/transport/TopicStorage.hh index b1741ebed..8cdfa557e 100644 --- a/include/gz/transport/TopicStorage.hh +++ b/include/gz/transport/TopicStorage.hh @@ -362,6 +362,12 @@ namespace gz } } + /// \brief Clear the content. + public: void Clear() + { + this->data.clear(); + } + /// \brief The keys are topics. The values are another map, where the key /// is the process UUID and the value a vector of publishers. private: std::mapfirst, nUuid1); + EXPECT_EQ(1u, reps.AllHandlers().size()); reset(); diff --git a/src/NodeShared.cc b/src/NodeShared.cc index 33219c95d..196854fab 100644 --- a/src/NodeShared.cc +++ b/src/NodeShared.cc @@ -353,6 +353,9 @@ NodeShared::NodeShared() this->dataPtr->msgDiscovery->UnregistrationsCb( std::bind(&NodeShared::OnEndRegistration, this, std::placeholders::_1)); + this->dataPtr->msgDiscovery->SubscribersCb( + std::bind(&NodeShared::OnSubscribers, this)); + // Set the callback to notify svc discovery updates (new services). this->dataPtr->srvDiscovery->ConnectionsCb( std::bind(&NodeShared::OnNewSrvConnection, this, std::placeholders::_1)); @@ -1391,6 +1394,18 @@ void NodeShared::OnEndRegistration(const MessagePublisher &_pub) this->remoteSubscribers.DelPublisherByNode(topic, procUuid, nodeUuid); } +////////////////////////////////////////////////// +void NodeShared::OnSubscribers() +{ + // Get the list of local subscribers. + std::lock_guard lock(this->mutex); + auto pubs = this->localSubscribers.Convert(this->myAddress, this->pUuid); + + // Reply to the SUBSCRIBERS_REQ with multiple SUBSCRIBERS_REP. + for (auto const &publisher : pubs) + this->dataPtr->msgDiscovery->SendSubscribersRep(publisher); +} + ////////////////////////////////////////////////// bool NodeShared::InitializeSockets() { @@ -1660,6 +1675,38 @@ bool NodeShared::HandlerWrapper::RemoveHandlersForNode( return removed; } +////////////////////////////////////////////////// +std::vector NodeShared::HandlerWrapper::Convert( + const std::string &_addr, const std::string &_pUuid) +{ + std::vector res; + + for (const auto &[topic, handlerCollection] : this->normal.AllHandlers()) + { + for (const auto &[nUuid, handlerNode] : handlerCollection) + { + for (const auto &[hUuid, handler] : handlerNode) + { + res.push_back(MessagePublisher(topic, _addr, "", _pUuid, nUuid, + handler->TypeName(), AdvertiseMessageOptions())); + } + } + } + + for (const auto &[topic, handlerCollection] : this->raw.AllHandlers()) + { + for (const auto &[nUuid, handlerNode] : handlerCollection) + { + for (const auto &[hUuid, handler] : handlerNode) + { + res.push_back(MessagePublisher(topic, _addr, "", _pUuid, nUuid, + handler->TypeName(), AdvertiseMessageOptions())); + } + } + } + + return res; +} ////////////////////////////////////////////////// void NodeSharedPrivate::SecurityOnNewConnection() diff --git a/src/Publisher.cc b/src/Publisher.cc index a2a86fc1c..9292b2397 100644 --- a/src/Publisher.cc +++ b/src/Publisher.cc @@ -128,23 +128,28 @@ void Publisher::FillDiscovery(msgs::Discovery &_msg) const ////////////////////////////////////////////////// void Publisher::SetFromDiscovery(const msgs::Discovery &_msg) { - this->topic = _msg.pub().topic(); - this->addr = _msg.pub().address(); - this->pUuid = _msg.pub().process_uuid(); - this->nUuid = _msg.pub().node_uuid(); - - switch (_msg.pub().scope()) + if (_msg.has_sub()) + this->topic = _msg.sub().topic(); + else if (_msg.has_pub()) { - case msgs::Discovery::Publisher::PROCESS: - this->opts.SetScope(Scope_t::PROCESS); - break; - case msgs::Discovery::Publisher::HOST: - this->opts.SetScope(Scope_t::HOST); - break; - default: - case msgs::Discovery::Publisher::ALL: - this->opts.SetScope(Scope_t::ALL); - break; + this->topic = _msg.pub().topic(); + this->addr = _msg.pub().address(); + this->pUuid = _msg.pub().process_uuid(); + this->nUuid = _msg.pub().node_uuid(); + + switch (_msg.pub().scope()) + { + case msgs::Discovery::Publisher::PROCESS: + this->opts.SetScope(Scope_t::PROCESS); + break; + case msgs::Discovery::Publisher::HOST: + this->opts.SetScope(Scope_t::HOST); + break; + default: + case msgs::Discovery::Publisher::ALL: + this->opts.SetScope(Scope_t::ALL); + break; + } } } diff --git a/src/TopicStorage_TEST.cc b/src/TopicStorage_TEST.cc index 65702836e..579c6d484 100644 --- a/src/TopicStorage_TEST.cc +++ b/src/TopicStorage_TEST.cc @@ -411,11 +411,16 @@ TEST(TopicStorageTest, TopicList) std::vector topics; test.TopicList(topics); - EXPECT_EQ(topics.size(), 2u); + EXPECT_EQ(2u, topics.size()); EXPECT_TRUE(std::find(topics.begin(), topics.end(), g_topic1) != topics.end()); EXPECT_TRUE(std::find(topics.begin(), topics.end(), g_topic2) != topics.end()); + + test.Clear(); + topics.clear(); + test.TopicList(topics); + EXPECT_EQ(0u, topics.size()); } ////////////////////////////////////////////////// diff --git a/src/cmd/gz_TEST.cc b/src/cmd/gz_TEST.cc index 4babd53e0..fcdee8a7e 100644 --- a/src/cmd/gz_TEST.cc +++ b/src/cmd/gz_TEST.cc @@ -77,6 +77,19 @@ void topicCB(const msgs::StringMsg &_msg) g_topicCBStr = _msg.data(); } +////////////////////////////////////////////////// +/// \brief A generic callback. +void genericCb(const transport::ProtoMsg &/*_msg*/) +{ +} + +////////////////////////////////////////////////// +/// \brief A raw callback. +void cbRaw(const char * /*_msgData*/, const size_t /*_size*/, + const transport::MessageInfo &/*_info*/) +{ +} + ////////////////////////////////////////////////// /// \brief Check 'gz topic -l' running the advertiser on a different process. TEST(gzTest, GZ_UTILS_TEST_DISABLED_ON_MAC(TopicList)) @@ -108,6 +121,36 @@ TEST(gzTest, GZ_UTILS_TEST_DISABLED_ON_MAC(TopicList)) testing::waitAndCleanupFork(pi); } +////////////////////////////////////////////////// +/// \brief Check 'gz topic -l' running a subscriber on a different process. +TEST(gzTest, TopicListSub) +{ + transport::Node node; + node.Subscribe("/foo", topicCB); + node.Subscribe("/bar", genericCb); + node.SubscribeRaw("/baz", cbRaw, msgs::StringMsg().GetTypeName()); + node.Subscribe("/no", topicCB); + node.Unsubscribe("/no"); + + // Check the 'gz topic -l' command. + std::string gz = std::string(GZ_PATH); + + unsigned int retries = 0u; + bool topicFound = false; + + while (!topicFound && retries++ < 10u) + { + std::string output = custom_exec_str(gz + " topic -l " + g_gzVersion); + topicFound = output.find("/foo\n") != std::string::npos; + topicFound &= output.find("/bar\n") != std::string::npos; + topicFound &= output.find("/baz\n") != std::string::npos; + topicFound &= output.find("/no\n") == std::string::npos; + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + } + + EXPECT_TRUE(topicFound); +} + ////////////////////////////////////////////////// /// \brief Check 'gz topic -i' running the advertiser on a different process. TEST(gzTest, TopicInfo) diff --git a/tutorials/02_installation.md b/tutorials/02_installation.md index 2158d9e39..6c7502596 100644 --- a/tutorials/02_installation.md +++ b/tutorials/02_installation.md @@ -94,7 +94,7 @@ sudo apt-get remove libgz-transport.*-dev Install prerequisites. A clean Ubuntu system will need: ``` -sudo apt-get install git cmake pkg-config python ruby-ronn libprotoc-dev libprotobuf-dev protobuf-compiler uuid-dev libzmq3-dev libgz-msgs9-dev libgz-utils2-cli-dev +sudo apt-get install git cmake pkg-config python ruby-ronn libprotoc-dev libprotobuf-dev protobuf-compiler uuid-dev libzmq3-dev libgz-msgs10-dev libgz-utils2-cli-dev ``` Clone the repository