Skip to content

Commit

Permalink
List subscribed topics when running topic list (#379)
Browse files Browse the repository at this point in the history
* Topic list and topic info

Signed-off-by: Carlos Agüero <caguero@openrobotics.org>
  • Loading branch information
caguero authored Feb 8, 2023
1 parent c89f55e commit 4bbf528
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .github/ci/packages.apt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
libgz-cmake3-dev
libgz-math7-dev
libgz-msgs9-dev
libgz-msgs10-dev
libgz-tools2-dev
libgz-utils2-cli-dev
libprotobuf-dev
Expand Down
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ set(GZ_UTILS_VER ${gz-utils2_VERSION_MAJOR})

#--------------------------------------
# Find gz-msgs
gz_find_package(gz-msgs9 REQUIRED)
set(GZ_MSGS_VER ${gz-msgs9_VERSION_MAJOR})
gz_find_package(gz-msgs10 REQUIRED)
set(GZ_MSGS_VER ${gz-msgs10_VERSION_MAJOR})

#--------------------------------------
# Find ifaddrs
Expand Down
4 changes: 2 additions & 2 deletions docker/gz-transport/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ RUN sudo /bin/sh -c 'echo "deb [trusted=yes] http://packages.osrfoundation.org/g
&& sudo apt-get install -y \
libgz-cmake3-dev \
libgz-math7-dev \
libgz-msgs9-dev \
libgz-msgs10-dev \
libgz-utils2-cli-dev \
&& sudo apt-get clean

# Gazebo transport
RUN git clone https://github.com/gazebosim/gz-transport.git -b gz-transport12 \
RUN git clone https://github.com/gazebosim/gz-transport.git -b gz-transport13 \
&& cd gz-transport \
&& mkdir build \
&& cd build \
Expand Down
82 changes: 80 additions & 2 deletions include/gz/transport/Discovery.hh
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,14 @@ namespace gz
return true;
}

/// \brief Send the response to a SUBSCRIBERS_REQ message.
/// \param[in] _pub Information to send.
public: void SendSubscribersRep(const MessagePublisher &_pub) const
{
this->SendMsg(
DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REP, _pub);
}

/// \brief Register a node from this process as a remote subscriber.
/// \param[in] _pub Contains information about the subscriber.
public: void Register(const MessagePublisher &_pub) const
Expand Down Expand Up @@ -433,6 +441,17 @@ namespace gz
return this->info.Publishers(_topic, _publishers);
}

/// \brief Get all the subscribers' information known for a given topic.
/// \param[in] _topic Topic name.
/// \param[out] _subscribers All remote subscribers for this topic.
/// \return True if the topic is found and there is at least one publisher
public: bool RemoteSubscribers(const std::string &_topic,
Addresses_M<Pub> &_subscribers) const
{
std::lock_guard<std::mutex> lock(this->mutex);
return this->remoteSubscribers.Publishers(_topic, _subscribers);
}

/// \brief Unadvertise a new message. Broadcast a discovery
/// message that will cancel all the discovery information for the topic
/// advertised by a specific node.
Expand Down Expand Up @@ -573,6 +592,15 @@ namespace gz
this->unregistrationCb = _cb;
}

/// \brief Register a callback to receive an event when a node requests
/// the list of remote subscribers.
/// \param[in] _cb Function callback.
public: void SubscribersCb(const std::function<void()> &_cb)
{
std::lock_guard<std::mutex> lock(this->mutex);
this->subscribersCb = _cb;
}

/// \brief Print the current discovery state.
public: void PrintCurrentState() const
{
Expand Down Expand Up @@ -615,13 +643,33 @@ namespace gz
std::cout << "---------------" << std::endl;
}

/// \brief Get the list of topics currently advertised in the network.
/// \brief Get the list of topics currently advertised and subscribed
/// in the network.
/// \param[out] _topics List of advertised topics.
public: void TopicList(std::vector<std::string> &_topics) const
public: void TopicList(std::vector<std::string> &_topics)
{
this->remoteSubscribers.Clear();

// Request the list of subscribers.
Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
this->SendMsg(
DestinationType::ALL, msgs::Discovery::SUBSCRIBERS_REQ, pub);

this->WaitForInit();
std::lock_guard<std::mutex> lock(this->mutex);
this->info.TopicList(_topics);

std::vector<std::string> remoteSubs;
this->remoteSubscribers.TopicList(remoteSubs);

// Add the remote subscribers
for (auto const &t : remoteSubs)
{
if (std::find(_topics.begin(), _topics.end(), t) == _topics.end())
{
_topics.push_back(t);
}
}
}

/// \brief Check if ready/initialized. If not, then wait on the
Expand Down Expand Up @@ -926,13 +974,15 @@ namespace gz
DiscoveryCallback<Pub> disconnectCb;
DiscoveryCallback<Pub> registerCb;
DiscoveryCallback<Pub> unregisterCb;
std::function<void()> subscribersReqCb;
{
std::lock_guard<std::mutex> lock(this->mutex);
this->activity[recvPUuid] = std::chrono::steady_clock::now();
connectCb = this->connectionCb;
disconnectCb = this->disconnectionCb;
registerCb = this->registrationCb;
unregisterCb = this->unregistrationCb;
subscribersReqCb = this->subscribersCb;
}

switch (msg.type())
Expand Down Expand Up @@ -1011,6 +1061,25 @@ namespace gz

break;
}
case msgs::Discovery::SUBSCRIBERS_REQ:
{
if (subscribersReqCb)
subscribersReqCb();

break;
}
case msgs::Discovery::SUBSCRIBERS_REP:
{
// Save the remote subscriber.
Pub publisher;
publisher.SetFromDiscovery(msg);

{
std::lock_guard<std::mutex> lock(this->mutex);
this->remoteSubscribers.AddPublisher(publisher);
}
break;
}
case msgs::Discovery::NEW_CONNECTION:
{
// Read the rest of the fields.
Expand Down Expand Up @@ -1114,6 +1183,7 @@ namespace gz
discoveryMsg.set_version(this->Version());
discoveryMsg.set_type(_type);
discoveryMsg.set_process_uuid(this->pUuid);
_pub.FillDiscovery(discoveryMsg);

switch (_type)
{
Expand All @@ -1132,6 +1202,8 @@ namespace gz
}
case msgs::Discovery::HEARTBEAT:
case msgs::Discovery::BYE:
case msgs::Discovery::SUBSCRIBERS_REQ:
case msgs::Discovery::SUBSCRIBERS_REP:
break;
default:
std::cerr << "Discovery::SendMsg() error: Unrecognized message"
Expand Down Expand Up @@ -1450,9 +1522,15 @@ namespace gz
/// \brief Callback executed when a new remote subscriber is unregistered.
private: DiscoveryCallback<Pub> unregistrationCb;

/// \brief Callback executed when a SUBSCRIBERS_REQ message is received.
private: std::function<void()> subscribersCb;

/// \brief Addressing information.
private: TopicStorage<Pub> info;

/// \brief Remote subscribers.
private: TopicStorage<Pub> remoteSubscribers;

/// \brief Activity information. Every time there is a message from a
/// remote node, its activity information is updated. If we do not hear
/// from a node in a while, its entries in 'info' will be invalided. The
Expand Down
9 changes: 8 additions & 1 deletion include/gz/transport/HandlerStorage.hh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace gz
using UUIDHandler_M = std::map<std::string, std::shared_ptr<T>>;
using UUIDHandler_Collection_M = std::map<std::string, UUIDHandler_M>;

/// \brief key is a topic name and value is UUIDHandler_M
/// \brief key is a topic name and value is UUIDHandler_Collection_M
using TopicServiceCalls_M =
std::map<std::string, UUIDHandler_Collection_M>;

Expand Down Expand Up @@ -159,6 +159,13 @@ namespace gz
return true;
}

/// \brief Get a reference to all the handlers.
/// \return All the handlers.
public: TopicServiceCalls_M &AllHandlers()
{
return this->data;
}

/// \brief Add a request handler to a topic. A request handler stores
/// the callback and types associated to a service call request.
/// \param[in] _topic Topic name.
Expand Down
10 changes: 10 additions & 0 deletions include/gz/transport/NodeShared.hh
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ namespace gz
/// \param[in] _pub Information of the remote subscriber.
public: void OnEndRegistration(const MessagePublisher &_pub);

/// \brief Callback executed when a SUBSCRIBERS request is received.
public: void OnSubscribers();

/// \brief Pass through to bool Publishers(const std::string &_topic,
/// Addresses_M<Pub> &_publishers) const
/// \param[in] _topic Service name.
Expand Down Expand Up @@ -395,6 +398,13 @@ namespace gz
const std::string &_fullyQualifiedTopic,
const std::string &_nUuid);

/// \brief Convert all the HandlerStorages into a vector of publishers.
/// \param[in] _addr The pub/sub address.
/// \param[in] _pUuid The process UUID.
/// \return The vector of message publishers.
public: std::vector<MessagePublisher> Convert(const std::string &_addr,
const std::string &_pUuid);

/// \brief Normal local subscriptions.
public: HandlerStorage<ISubscriptionHandler> normal;

Expand Down
66 changes: 0 additions & 66 deletions include/gz/transport/Packet.hh

This file was deleted.

6 changes: 6 additions & 0 deletions include/gz/transport/TopicStorage.hh
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@ namespace gz
}
}

/// \brief Clear the content.
public: void Clear()
{
this->data.clear();
}

/// \brief The keys are topics. The values are another map, where the key
/// is the process UUID and the value a vector of publishers.
private: std::map<std::string,
Expand Down
2 changes: 2 additions & 0 deletions src/HandlerStorage_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ TEST(RepStorageTest, RepStorageAPI)
EXPECT_FALSE(reps.HasHandlersForTopic(topic));
EXPECT_FALSE(reps.RemoveHandlersForNode(topic, nUuid1));
EXPECT_FALSE(reps.HasHandlersForNode(topic, nUuid1));
EXPECT_TRUE(reps.AllHandlers().empty());

// Create a REP handler.
std::shared_ptr<transport::RepHandler<msgs::Vector3d,
Expand All @@ -104,6 +105,7 @@ TEST(RepStorageTest, RepStorageAPI)
EXPECT_TRUE(reps.Handlers(topic, m));
EXPECT_EQ(m.size(), 1u);
EXPECT_EQ(m.begin()->first, nUuid1);
EXPECT_EQ(1u, reps.AllHandlers().size());

reset();

Expand Down
Loading

0 comments on commit 4bbf528

Please sign in to comment.