Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show subscribers info when running topic info #384

Merged
merged 5 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/gz/transport/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -670,9 +670,18 @@ namespace gz
/// \param[in] _topic Name of the topic.
/// \param[out] _publishers List of publishers on the topic
/// \return False if unable to get topic info
public: bool TopicInfo(const std::string &_topic,
public: bool GZ_DEPRECATED(13) TopicInfo(const std::string &_topic,
std::vector<MessagePublisher> &_publishers) const;

/// \brief Get the information about a topic.
/// \param[in] _topic Name of the topic.
/// \param[out] _publishers List of publishers on the topic.
/// \param[out] _subscribers List of subscribers on the topic.
/// \return False if unable to get topic info.
public: bool TopicInfo(const std::string &_topic,
std::vector<MessagePublisher> &_publishers,
std::vector<MessagePublisher> &_subscribers) const;

/// \brief Get the list of topics currently advertised in the network.
/// Note that this function can block for some time if the
/// discovery is in its initialization phase.
Expand Down
61 changes: 43 additions & 18 deletions src/Node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,18 @@ std::unordered_set<std::string> &Node::SrvsAdvertised() const
bool Node::TopicInfo(const std::string &_topic,
std::vector<MessagePublisher> &_publishers) const
{
this->dataPtr->shared->dataPtr->msgDiscovery->WaitForInit();
std::vector<MessagePublisher> unused;
return this->TopicInfo(_topic, _publishers, unused);
}

//////////////////////////////////////////////////
bool Node::TopicInfo(const std::string &_topic,
std::vector<MessagePublisher> &_publishers,
std::vector<MessagePublisher> &_subscribers) const
{
// We trigger a topic list to update the list of remote subscribers.
std::vector<std::string> allTopics;
this->dataPtr->shared->dataPtr->msgDiscovery->TopicList(allTopics);

// Construct a topic name with the partition and namespace
std::string fullyQualifiedTopic;
Expand All @@ -883,31 +894,45 @@ bool Node::TopicInfo(const std::string &_topic,
return false;
}

// Helper function to do some conversion.
auto convert = [](MsgAddresses_M &_input,
std::vector<MessagePublisher> &_output)
{
_output.clear();

// Copy the publishers.
for (MsgAddresses_M::iterator iter = _input.begin();
iter != _input.end(); ++iter)
{
for (std::vector<MessagePublisher>::iterator pubIter =
iter->second.begin(); pubIter != iter->second.end(); ++pubIter)
{
// Add the publisher if it doesn't already exist.
if (std::find(_output.begin(), _output.end(), *pubIter) ==
_output.end())
{
_output.push_back(*pubIter);
}
}
}
};

std::lock_guard<std::recursive_mutex> lk(this->dataPtr->shared->mutex);

// Get all the publishers on the given topics
// Get all the publishers on the given topics.
MsgAddresses_M pubs;
if (!this->dataPtr->shared->dataPtr->msgDiscovery->Publishers(
if (this->dataPtr->shared->dataPtr->msgDiscovery->Publishers(
fullyQualifiedTopic, pubs))
{
return false;
convert(pubs, _publishers);
}

_publishers.clear();

// Copy the publishers.
for (MsgAddresses_M::iterator iter = pubs.begin(); iter != pubs.end(); ++iter)
// Get all the remote subscribers on the given topics.
MsgAddresses_M subs;
if (this->dataPtr->shared->dataPtr->msgDiscovery->RemoteSubscribers(
fullyQualifiedTopic, subs))
{
for (std::vector<MessagePublisher>::iterator pubIter = iter->second.begin();
pubIter != iter->second.end(); ++pubIter)
{
// Add the publisher if it doesn't already exist.
if (std::find(_publishers.begin(), _publishers.end(), *pubIter) ==
_publishers.end())
{
_publishers.push_back(*pubIter);
}
}
convert(subs, _subscribers);
}

return true;
Expand Down
28 changes: 22 additions & 6 deletions src/cmd/gz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,46 @@ extern "C" void cmdTopicInfo(const char *_topic)
return;
}

Node node;

// Get the publishers on the requested topic
std::vector<MessagePublisher> publishers;
node.TopicInfo(_topic, publishers);
std::vector<MessagePublisher> subscribers;
Node node;
node.TopicInfo(_topic, publishers, subscribers);

if (!publishers.empty())
{
std::cout << "Publishers [Address, Message Type]:\n";

/// List the publishers
// List the publishers
for (std::vector<MessagePublisher>::iterator iter = publishers.begin();
iter != publishers.end(); ++iter)
{
std::cout << " " << (*iter).Addr() << ", "
<< (*iter).MsgTypeName() << std::endl;
<< (*iter).MsgTypeName() << std::endl;
}
}
else
{
std::cout << "No publishers on topic [" << _topic << "]\n";
}

// TODO(anyone): Add subscribers lists
// Get the subscribers on the requested topic
if (!subscribers.empty())
{
std::cout << "Subscribers [Address, Message Type]:\n";

// List the subscribers
for (std::vector<MessagePublisher>::iterator iter = subscribers.begin();
iter != subscribers.end(); ++iter)
{
std::cout << " " << (*iter).Addr() << ", "
<< (*iter).MsgTypeName() << std::endl;
}
}
else
{
std::cout << "No subscribers on topic [" << _topic << "]\n";
}
}

//////////////////////////////////////////////////
Expand Down
46 changes: 41 additions & 5 deletions src/cmd/gz_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,55 @@ TEST(gzTest, TopicInfo)
while (!infoFound && retries++ < 10u)
{
output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion);
infoFound = output.size() > 50u;
bool pubsFound = output.find("No publishers") == std::string::npos;
bool subsFound = output.find("No subscribers") == std::string::npos;
// We should have publishers info but no subscribers.
infoFound = pubsFound && !subsFound;
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}

EXPECT_TRUE(infoFound) << "OUTPUT["
<< output << "] Size[" << output.size()
<< "]. Expected Size=50" << std::endl;
EXPECT_TRUE(infoFound);
EXPECT_TRUE(output.find("gz.msgs.Vector3d") != std::string::npos);

// Wait for the child process to return.
testing::waitAndCleanupFork(pi);
}

//////////////////////////////////////////////////
/// \brief Check 'gz topic -i' running a subscriber on a different process.
TEST(gzTest, TopicInfoSub)
{
transport::Node node;
node.Subscribe("/foo", topicCB);
node.SubscribeRaw("/baz", cbRaw, msgs::StringMsg().GetTypeName());
node.Subscribe("/no", topicCB);
node.Unsubscribe("/no");

// Check the 'gz topic -i' command.
std::string gz = std::string(GZ_PATH);

for (auto topic : {"/foo", "/baz"})
{
unsigned int retries = 0u;
bool infoFound = false;
std::string output;

while (!infoFound && retries++ < 10u)
{
output = custom_exec_str(gz + " topic -i -t " + topic + " "
+ g_gzVersion);
bool pubsFound = output.find("No publishers") == std::string::npos;
bool subsFound = output.find("No subscribers") == std::string::npos;
// We should have subscribers info but no publishers.
infoFound = !pubsFound && subsFound;
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}

EXPECT_TRUE(infoFound);
EXPECT_TRUE(output.find("gz.msgs.") != std::string::npos);
}
}

//////////////////////////////////////////////////
/// \brief Check 'gz service -l' running the advertiser on a different
/// process.
Expand Down Expand Up @@ -307,7 +343,7 @@ TEST(gzTest, TopicInfoSameProc)
while (!infoFound && retries++ < 10u)
{
output = custom_exec_str(gz + " topic -t /foo -i " + g_gzVersion);
infoFound = output.size() > 50u;
infoFound = output.size() > 60u;
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}

Expand Down
5 changes: 4 additions & 1 deletion src/cmd/gz_src_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ TEST(gzTest, cmdTopicInfo)

// A topic without advertisers should show an empty list of publishers.
cmdTopicInfo(g_topic.c_str());
EXPECT_EQ(stdOutBuffer.str(), "No publishers on topic [/topic]\n");
EXPECT_TRUE(stdOutBuffer.str().find("No publishers on topic [/topic]\n") !=
std::string::npos);
EXPECT_TRUE(stdOutBuffer.str().find("No subscribers on topic [/topic]\n") !=
std::string::npos);
clearIOStreams(stdOutBuffer, stdErrBuffer);

restoreIO();
Expand Down
7 changes: 4 additions & 3 deletions test/integration/twoProcsPubSub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,17 +514,18 @@ TEST(twoProcPubSub, TopicInfo)

transport::Node node;
std::vector<transport::MessagePublisher> publishers;
std::vector<transport::MessagePublisher> subscribers;

// We need some time for discovering the other node.
std::this_thread::sleep_for(std::chrono::milliseconds(2500));

EXPECT_FALSE(node.TopicInfo("@", publishers));
EXPECT_FALSE(node.TopicInfo("@", publishers, subscribers));
EXPECT_EQ(publishers.size(), 0u);

EXPECT_FALSE(node.TopicInfo("/bogus", publishers));
EXPECT_TRUE(node.TopicInfo("/bogus", publishers, subscribers));
EXPECT_EQ(publishers.size(), 0u);

EXPECT_TRUE(node.TopicInfo("/foo", publishers));
EXPECT_TRUE(node.TopicInfo("/foo", publishers, subscribers));
EXPECT_EQ(publishers.size(), 1u);
EXPECT_EQ(publishers.front().MsgTypeName(), "gz.msgs.Vector3d");

Expand Down