From 188187db38ba31b5575388b4dc4d1958d41b1097 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Wed, 15 Feb 2023 18:14:40 +0100 Subject: [PATCH 1/5] Show subscribers info when calling topic info. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Carlos Agüero --- include/gz/transport/Node.hh | 11 +++++- src/Node.cc | 61 +++++++++++++++++++++--------- src/cmd/gz.cc | 28 +++++++++++--- src/cmd/gz_TEST.cc | 34 +++++++++++++++++ test/integration/twoProcsPubSub.cc | 7 ++-- 5 files changed, 113 insertions(+), 28 deletions(-) diff --git a/include/gz/transport/Node.hh b/include/gz/transport/Node.hh index be3c5e44a..67a3e3e82 100644 --- a/include/gz/transport/Node.hh +++ b/include/gz/transport/Node.hh @@ -670,9 +670,18 @@ namespace gz /// \param[in] _topic Name of the topic. /// \param[out] _publishers List of publishers on the topic /// \return False if unable to get topic info - public: bool TopicInfo(const std::string &_topic, + public: bool GZ_DEPRECATED(13) TopicInfo(const std::string &_topic, std::vector &_publishers) const; + /// \brief Get the information about a topic. + /// \param[in] _topic Name of the topic. + /// \param[out] _publishers List of publishers on the topic. + /// \param[out] _subscribers List of subscribers on the topic. + /// \return False if unable to get topic info. + public: bool TopicInfo(const std::string &_topic, + std::vector &_publishers, + std::vector &_subscribers) const; + /// \brief Get the list of topics currently advertised in the network. /// Note that this function can block for some time if the /// discovery is in its initialization phase. diff --git a/src/Node.cc b/src/Node.cc index 99ef440c3..69be39008 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -873,7 +873,18 @@ std::unordered_set &Node::SrvsAdvertised() const bool Node::TopicInfo(const std::string &_topic, std::vector &_publishers) const { - this->dataPtr->shared->dataPtr->msgDiscovery->WaitForInit(); + std::vector unused; + return this->TopicInfo(_topic, _publishers, unused); +} + +////////////////////////////////////////////////// +bool Node::TopicInfo(const std::string &_topic, + std::vector &_publishers, + std::vector &_subscribers) const +{ + // We trigger a topic list to update the list of remote subscribers. + std::vector allTopics; + this->dataPtr->shared->dataPtr->msgDiscovery->TopicList(allTopics); // Construct a topic name with the partition and namespace std::string fullyQualifiedTopic; @@ -883,31 +894,45 @@ bool Node::TopicInfo(const std::string &_topic, return false; } + // Helper function to do some conversion. + auto convert = [](MsgAddresses_M &_input, + std::vector &_output) + { + _output.clear(); + + // Copy the publishers. + for (MsgAddresses_M::iterator iter = _input.begin(); + iter != _input.end(); ++iter) + { + for (std::vector::iterator pubIter = + iter->second.begin(); pubIter != iter->second.end(); ++pubIter) + { + // Add the publisher if it doesn't already exist. + if (std::find(_output.begin(), _output.end(), *pubIter) == + _output.end()) + { + _output.push_back(*pubIter); + } + } + } + }; + std::lock_guard lk(this->dataPtr->shared->mutex); - // Get all the publishers on the given topics + // Get all the publishers on the given topics. MsgAddresses_M pubs; - if (!this->dataPtr->shared->dataPtr->msgDiscovery->Publishers( + if (this->dataPtr->shared->dataPtr->msgDiscovery->Publishers( fullyQualifiedTopic, pubs)) { - return false; + convert(pubs, _publishers); } - _publishers.clear(); - - // Copy the publishers. - for (MsgAddresses_M::iterator iter = pubs.begin(); iter != pubs.end(); ++iter) + // Get all the remote subscribers on the given topics. + MsgAddresses_M subs; + if (this->dataPtr->shared->dataPtr->msgDiscovery->RemoteSubscribers( + fullyQualifiedTopic, subs)) { - for (std::vector::iterator pubIter = iter->second.begin(); - pubIter != iter->second.end(); ++pubIter) - { - // Add the publisher if it doesn't already exist. - if (std::find(_publishers.begin(), _publishers.end(), *pubIter) == - _publishers.end()) - { - _publishers.push_back(*pubIter); - } - } + convert(subs, _subscribers); } return true; diff --git a/src/cmd/gz.cc b/src/cmd/gz.cc index f251ecbbf..36068f108 100644 --- a/src/cmd/gz.cc +++ b/src/cmd/gz.cc @@ -61,22 +61,22 @@ extern "C" void cmdTopicInfo(const char *_topic) return; } - Node node; - // Get the publishers on the requested topic std::vector publishers; - node.TopicInfo(_topic, publishers); + std::vector subscribers; + Node node; + node.TopicInfo(_topic, publishers, subscribers); if (!publishers.empty()) { std::cout << "Publishers [Address, Message Type]:\n"; - /// List the publishers + // List the publishers for (std::vector::iterator iter = publishers.begin(); iter != publishers.end(); ++iter) { std::cout << " " << (*iter).Addr() << ", " - << (*iter).MsgTypeName() << std::endl; + << (*iter).MsgTypeName() << std::endl; } } else @@ -84,7 +84,23 @@ extern "C" void cmdTopicInfo(const char *_topic) std::cout << "No publishers on topic [" << _topic << "]\n"; } - // TODO(anyone): Add subscribers lists + // Get the subscribers on the requested topic + if (!subscribers.empty()) + { + std::cout << "Subscribers [Address, Message Type]:\n"; + + // List the subscribers + for (std::vector::iterator iter = subscribers.begin(); + iter != subscribers.end(); ++iter) + { + std::cout << " " << (*iter).Addr() << ", " + << (*iter).MsgTypeName() << std::endl; + } + } + else + { + std::cout << "No subscribers on topic [" << _topic << "]\n"; + } } ////////////////////////////////////////////////// diff --git a/src/cmd/gz_TEST.cc b/src/cmd/gz_TEST.cc index fcdee8a7e..f75b3cf19 100644 --- a/src/cmd/gz_TEST.cc +++ b/src/cmd/gz_TEST.cc @@ -186,6 +186,40 @@ TEST(gzTest, TopicInfo) testing::waitAndCleanupFork(pi); } +////////////////////////////////////////////////// +/// \brief Check 'gz topic -i' running a subscriber on a different process. +TEST(gzTest, TopicInfoSub) +{ + transport::Node node; + node.Subscribe("/foo", topicCB); + node.SubscribeRaw("/baz", cbRaw, msgs::StringMsg().GetTypeName()); + node.Subscribe("/no", topicCB); + node.Unsubscribe("/no"); + + // Check the 'gz topic -i' command. + std::string gz = std::string(GZ_PATH); + + for (auto topic : {"/foo", "/baz"}) + { + unsigned int retries = 0u; + bool infoFound = false; + std::string output; + + while (!infoFound && retries++ < 10u) + { + output = custom_exec_str(gz + " topic -i -t " + topic + " " + + g_gzVersion); + infoFound = output.size() > 60u; + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + } + + EXPECT_TRUE(infoFound) << "OUTPUT[" + << output << "] Size[" << output.size() + << "]. Expected Size>60" << std::endl; + EXPECT_TRUE(output.find("gz.msgs.") != std::string::npos); + } +} + ////////////////////////////////////////////////// /// \brief Check 'gz service -l' running the advertiser on a different /// process. diff --git a/test/integration/twoProcsPubSub.cc b/test/integration/twoProcsPubSub.cc index a5a82f20f..788e4a878 100644 --- a/test/integration/twoProcsPubSub.cc +++ b/test/integration/twoProcsPubSub.cc @@ -514,17 +514,18 @@ TEST(twoProcPubSub, TopicInfo) transport::Node node; std::vector publishers; + std::vector subscribers; // We need some time for discovering the other node. std::this_thread::sleep_for(std::chrono::milliseconds(2500)); - EXPECT_FALSE(node.TopicInfo("@", publishers)); + EXPECT_FALSE(node.TopicInfo("@", publishers, subscribers)); EXPECT_EQ(publishers.size(), 0u); - EXPECT_FALSE(node.TopicInfo("/bogus", publishers)); + EXPECT_FALSE(node.TopicInfo("/bogus", publishers, subscribers)); EXPECT_EQ(publishers.size(), 0u); - EXPECT_TRUE(node.TopicInfo("/foo", publishers)); + EXPECT_TRUE(node.TopicInfo("/foo", publishers, subscribers)); EXPECT_EQ(publishers.size(), 1u); EXPECT_EQ(publishers.front().MsgTypeName(), "gz.msgs.Vector3d"); From 14bd383701e2ed6cdced61809405bbb14983f3b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Wed, 15 Feb 2023 19:28:36 +0100 Subject: [PATCH 2/5] Update tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Carlos Agüero --- src/cmd/gz_TEST.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cmd/gz_TEST.cc b/src/cmd/gz_TEST.cc index f75b3cf19..2c536fed7 100644 --- a/src/cmd/gz_TEST.cc +++ b/src/cmd/gz_TEST.cc @@ -173,13 +173,13 @@ TEST(gzTest, TopicInfo) while (!infoFound && retries++ < 10u) { output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion); - infoFound = output.size() > 50u; + infoFound = output.size() > 60u; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } EXPECT_TRUE(infoFound) << "OUTPUT[" << output << "] Size[" << output.size() - << "]. Expected Size=50" << std::endl; + << "]. Expected Size>60" << std::endl; EXPECT_TRUE(output.find("gz.msgs.Vector3d") != std::string::npos); // Wait for the child process to return. @@ -341,7 +341,7 @@ TEST(gzTest, TopicInfoSameProc) while (!infoFound && retries++ < 10u) { output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion); - infoFound = output.size() > 50u; + infoFound = output.size() > 60u; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } From 40ff8b1cc1102c7163c3fdb6b54012deaa23e2dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Wed, 15 Feb 2023 21:41:33 +0100 Subject: [PATCH 3/5] Fix tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Carlos Agüero --- src/cmd/gz_src_TEST.cc | 5 ++++- test/integration/twoProcsPubSub.cc | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cmd/gz_src_TEST.cc b/src/cmd/gz_src_TEST.cc index 296852e3d..a66241fad 100644 --- a/src/cmd/gz_src_TEST.cc +++ b/src/cmd/gz_src_TEST.cc @@ -88,7 +88,10 @@ TEST(gzTest, cmdTopicInfo) // A topic without advertisers should show an empty list of publishers. cmdTopicInfo(g_topic.c_str()); - EXPECT_EQ(stdOutBuffer.str(), "No publishers on topic [/topic]\n"); + EXPECT_TRUE(stdOutBuffer.str().find("No publishers on topic [/topic]\n") != + std::string::npos); + EXPECT_TRUE(stdOutBuffer.str().find("No subscribers on topic [/topic]\n") != + std::string::npos); clearIOStreams(stdOutBuffer, stdErrBuffer); restoreIO(); diff --git a/test/integration/twoProcsPubSub.cc b/test/integration/twoProcsPubSub.cc index 788e4a878..7b5550301 100644 --- a/test/integration/twoProcsPubSub.cc +++ b/test/integration/twoProcsPubSub.cc @@ -522,7 +522,7 @@ TEST(twoProcPubSub, TopicInfo) EXPECT_FALSE(node.TopicInfo("@", publishers, subscribers)); EXPECT_EQ(publishers.size(), 0u); - EXPECT_FALSE(node.TopicInfo("/bogus", publishers, subscribers)); + EXPECT_TRUE(node.TopicInfo("/bogus", publishers, subscribers)); EXPECT_EQ(publishers.size(), 0u); EXPECT_TRUE(node.TopicInfo("/foo", publishers, subscribers)); From dcb5cbf0dcd638405e9f0bf44925191fca8c5440 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Thu, 16 Feb 2023 01:12:15 +0100 Subject: [PATCH 4/5] Test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Carlos Agüero --- src/cmd/gz_TEST.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/cmd/gz_TEST.cc b/src/cmd/gz_TEST.cc index 2c536fed7..25d065cde 100644 --- a/src/cmd/gz_TEST.cc +++ b/src/cmd/gz_TEST.cc @@ -172,15 +172,17 @@ TEST(gzTest, TopicInfo) while (!infoFound && retries++ < 10u) { - output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion); - infoFound = output.size() > 60u; + output = custom_exec_str(gz + " topic -t /foo2 -i " + g_gzVersion); + std::cout << output.size() << std::endl; + infoFound = output.size() > 70u; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } EXPECT_TRUE(infoFound) << "OUTPUT[" << output << "] Size[" << output.size() - << "]. Expected Size>60" << std::endl; + << "]. Expected Size>70" << std::endl; EXPECT_TRUE(output.find("gz.msgs.Vector3d") != std::string::npos); + std::cout << output << std::endl; // Wait for the child process to return. testing::waitAndCleanupFork(pi); From 70e44a3e0825a5b88a8b027ab64f2043248a2a37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ag=C3=BCero?= Date: Fri, 17 Feb 2023 16:13:51 +0100 Subject: [PATCH 5/5] Tweak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Carlos Agüero --- src/cmd/gz_TEST.cc | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/cmd/gz_TEST.cc b/src/cmd/gz_TEST.cc index 25d065cde..1968de211 100644 --- a/src/cmd/gz_TEST.cc +++ b/src/cmd/gz_TEST.cc @@ -172,17 +172,16 @@ TEST(gzTest, TopicInfo) while (!infoFound && retries++ < 10u) { - output = custom_exec_str(gz + " topic -t /foo2 -i " + g_gzVersion); - std::cout << output.size() << std::endl; - infoFound = output.size() > 70u; + output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion); + bool pubsFound = output.find("No publishers") == std::string::npos; + bool subsFound = output.find("No subscribers") == std::string::npos; + // We should have publishers info but no subscribers. + infoFound = pubsFound && !subsFound; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } - EXPECT_TRUE(infoFound) << "OUTPUT[" - << output << "] Size[" << output.size() - << "]. Expected Size>70" << std::endl; + EXPECT_TRUE(infoFound); EXPECT_TRUE(output.find("gz.msgs.Vector3d") != std::string::npos); - std::cout << output << std::endl; // Wait for the child process to return. testing::waitAndCleanupFork(pi); @@ -211,13 +210,14 @@ TEST(gzTest, TopicInfoSub) { output = custom_exec_str(gz + " topic -i -t " + topic + " " + g_gzVersion); - infoFound = output.size() > 60u; + bool pubsFound = output.find("No publishers") == std::string::npos; + bool subsFound = output.find("No subscribers") == std::string::npos; + // We should have subscribers info but no publishers. + infoFound = !pubsFound && subsFound; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } - EXPECT_TRUE(infoFound) << "OUTPUT[" - << output << "] Size[" << output.size() - << "]. Expected Size>60" << std::endl; + EXPECT_TRUE(infoFound); EXPECT_TRUE(output.find("gz.msgs.") != std::string::npos); } }