diff --git a/include/gz/transport/Node.hh b/include/gz/transport/Node.hh index be3c5e44a..67a3e3e82 100644 --- a/include/gz/transport/Node.hh +++ b/include/gz/transport/Node.hh @@ -670,9 +670,18 @@ namespace gz /// \param[in] _topic Name of the topic. /// \param[out] _publishers List of publishers on the topic /// \return False if unable to get topic info - public: bool TopicInfo(const std::string &_topic, + public: bool GZ_DEPRECATED(13) TopicInfo(const std::string &_topic, std::vector &_publishers) const; + /// \brief Get the information about a topic. + /// \param[in] _topic Name of the topic. + /// \param[out] _publishers List of publishers on the topic. + /// \param[out] _subscribers List of subscribers on the topic. + /// \return False if unable to get topic info. + public: bool TopicInfo(const std::string &_topic, + std::vector &_publishers, + std::vector &_subscribers) const; + /// \brief Get the list of topics currently advertised in the network. /// Note that this function can block for some time if the /// discovery is in its initialization phase. diff --git a/src/Node.cc b/src/Node.cc index 99ef440c3..69be39008 100644 --- a/src/Node.cc +++ b/src/Node.cc @@ -873,7 +873,18 @@ std::unordered_set &Node::SrvsAdvertised() const bool Node::TopicInfo(const std::string &_topic, std::vector &_publishers) const { - this->dataPtr->shared->dataPtr->msgDiscovery->WaitForInit(); + std::vector unused; + return this->TopicInfo(_topic, _publishers, unused); +} + +////////////////////////////////////////////////// +bool Node::TopicInfo(const std::string &_topic, + std::vector &_publishers, + std::vector &_subscribers) const +{ + // We trigger a topic list to update the list of remote subscribers. + std::vector allTopics; + this->dataPtr->shared->dataPtr->msgDiscovery->TopicList(allTopics); // Construct a topic name with the partition and namespace std::string fullyQualifiedTopic; @@ -883,31 +894,45 @@ bool Node::TopicInfo(const std::string &_topic, return false; } + // Helper function to do some conversion. + auto convert = [](MsgAddresses_M &_input, + std::vector &_output) + { + _output.clear(); + + // Copy the publishers. + for (MsgAddresses_M::iterator iter = _input.begin(); + iter != _input.end(); ++iter) + { + for (std::vector::iterator pubIter = + iter->second.begin(); pubIter != iter->second.end(); ++pubIter) + { + // Add the publisher if it doesn't already exist. + if (std::find(_output.begin(), _output.end(), *pubIter) == + _output.end()) + { + _output.push_back(*pubIter); + } + } + } + }; + std::lock_guard lk(this->dataPtr->shared->mutex); - // Get all the publishers on the given topics + // Get all the publishers on the given topics. MsgAddresses_M pubs; - if (!this->dataPtr->shared->dataPtr->msgDiscovery->Publishers( + if (this->dataPtr->shared->dataPtr->msgDiscovery->Publishers( fullyQualifiedTopic, pubs)) { - return false; + convert(pubs, _publishers); } - _publishers.clear(); - - // Copy the publishers. - for (MsgAddresses_M::iterator iter = pubs.begin(); iter != pubs.end(); ++iter) + // Get all the remote subscribers on the given topics. + MsgAddresses_M subs; + if (this->dataPtr->shared->dataPtr->msgDiscovery->RemoteSubscribers( + fullyQualifiedTopic, subs)) { - for (std::vector::iterator pubIter = iter->second.begin(); - pubIter != iter->second.end(); ++pubIter) - { - // Add the publisher if it doesn't already exist. - if (std::find(_publishers.begin(), _publishers.end(), *pubIter) == - _publishers.end()) - { - _publishers.push_back(*pubIter); - } - } + convert(subs, _subscribers); } return true; diff --git a/src/cmd/gz.cc b/src/cmd/gz.cc index f251ecbbf..36068f108 100644 --- a/src/cmd/gz.cc +++ b/src/cmd/gz.cc @@ -61,22 +61,22 @@ extern "C" void cmdTopicInfo(const char *_topic) return; } - Node node; - // Get the publishers on the requested topic std::vector publishers; - node.TopicInfo(_topic, publishers); + std::vector subscribers; + Node node; + node.TopicInfo(_topic, publishers, subscribers); if (!publishers.empty()) { std::cout << "Publishers [Address, Message Type]:\n"; - /// List the publishers + // List the publishers for (std::vector::iterator iter = publishers.begin(); iter != publishers.end(); ++iter) { std::cout << " " << (*iter).Addr() << ", " - << (*iter).MsgTypeName() << std::endl; + << (*iter).MsgTypeName() << std::endl; } } else @@ -84,7 +84,23 @@ extern "C" void cmdTopicInfo(const char *_topic) std::cout << "No publishers on topic [" << _topic << "]\n"; } - // TODO(anyone): Add subscribers lists + // Get the subscribers on the requested topic + if (!subscribers.empty()) + { + std::cout << "Subscribers [Address, Message Type]:\n"; + + // List the subscribers + for (std::vector::iterator iter = subscribers.begin(); + iter != subscribers.end(); ++iter) + { + std::cout << " " << (*iter).Addr() << ", " + << (*iter).MsgTypeName() << std::endl; + } + } + else + { + std::cout << "No subscribers on topic [" << _topic << "]\n"; + } } ////////////////////////////////////////////////// diff --git a/src/cmd/gz_TEST.cc b/src/cmd/gz_TEST.cc index fcdee8a7e..1968de211 100644 --- a/src/cmd/gz_TEST.cc +++ b/src/cmd/gz_TEST.cc @@ -173,19 +173,55 @@ TEST(gzTest, TopicInfo) while (!infoFound && retries++ < 10u) { output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion); - infoFound = output.size() > 50u; + bool pubsFound = output.find("No publishers") == std::string::npos; + bool subsFound = output.find("No subscribers") == std::string::npos; + // We should have publishers info but no subscribers. + infoFound = pubsFound && !subsFound; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } - EXPECT_TRUE(infoFound) << "OUTPUT[" - << output << "] Size[" << output.size() - << "]. Expected Size=50" << std::endl; + EXPECT_TRUE(infoFound); EXPECT_TRUE(output.find("gz.msgs.Vector3d") != std::string::npos); // Wait for the child process to return. testing::waitAndCleanupFork(pi); } +////////////////////////////////////////////////// +/// \brief Check 'gz topic -i' running a subscriber on a different process. +TEST(gzTest, TopicInfoSub) +{ + transport::Node node; + node.Subscribe("/foo", topicCB); + node.SubscribeRaw("/baz", cbRaw, msgs::StringMsg().GetTypeName()); + node.Subscribe("/no", topicCB); + node.Unsubscribe("/no"); + + // Check the 'gz topic -i' command. + std::string gz = std::string(GZ_PATH); + + for (auto topic : {"/foo", "/baz"}) + { + unsigned int retries = 0u; + bool infoFound = false; + std::string output; + + while (!infoFound && retries++ < 10u) + { + output = custom_exec_str(gz + " topic -i -t " + topic + " " + + g_gzVersion); + bool pubsFound = output.find("No publishers") == std::string::npos; + bool subsFound = output.find("No subscribers") == std::string::npos; + // We should have subscribers info but no publishers. + infoFound = !pubsFound && subsFound; + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + } + + EXPECT_TRUE(infoFound); + EXPECT_TRUE(output.find("gz.msgs.") != std::string::npos); + } +} + ////////////////////////////////////////////////// /// \brief Check 'gz service -l' running the advertiser on a different /// process. @@ -307,7 +343,7 @@ TEST(gzTest, TopicInfoSameProc) while (!infoFound && retries++ < 10u) { output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion); - infoFound = output.size() > 50u; + infoFound = output.size() > 60u; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } diff --git a/src/cmd/gz_src_TEST.cc b/src/cmd/gz_src_TEST.cc index 296852e3d..a66241fad 100644 --- a/src/cmd/gz_src_TEST.cc +++ b/src/cmd/gz_src_TEST.cc @@ -88,7 +88,10 @@ TEST(gzTest, cmdTopicInfo) // A topic without advertisers should show an empty list of publishers. cmdTopicInfo(g_topic.c_str()); - EXPECT_EQ(stdOutBuffer.str(), "No publishers on topic [/topic]\n"); + EXPECT_TRUE(stdOutBuffer.str().find("No publishers on topic [/topic]\n") != + std::string::npos); + EXPECT_TRUE(stdOutBuffer.str().find("No subscribers on topic [/topic]\n") != + std::string::npos); clearIOStreams(stdOutBuffer, stdErrBuffer); restoreIO(); diff --git a/test/integration/twoProcsPubSub.cc b/test/integration/twoProcsPubSub.cc index a5a82f20f..7b5550301 100644 --- a/test/integration/twoProcsPubSub.cc +++ b/test/integration/twoProcsPubSub.cc @@ -514,17 +514,18 @@ TEST(twoProcPubSub, TopicInfo) transport::Node node; std::vector publishers; + std::vector subscribers; // We need some time for discovering the other node. std::this_thread::sleep_for(std::chrono::milliseconds(2500)); - EXPECT_FALSE(node.TopicInfo("@", publishers)); + EXPECT_FALSE(node.TopicInfo("@", publishers, subscribers)); EXPECT_EQ(publishers.size(), 0u); - EXPECT_FALSE(node.TopicInfo("/bogus", publishers)); + EXPECT_TRUE(node.TopicInfo("/bogus", publishers, subscribers)); EXPECT_EQ(publishers.size(), 0u); - EXPECT_TRUE(node.TopicInfo("/foo", publishers)); + EXPECT_TRUE(node.TopicInfo("/foo", publishers, subscribers)); EXPECT_EQ(publishers.size(), 1u); EXPECT_EQ(publishers.front().MsgTypeName(), "gz.msgs.Vector3d");