Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique network flows #1682

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ option(SQLITE3_SUPPORT "Activate SQLITE3 support" ON)
###############################################################################
# SHM as Default transport
###############################################################################
option(SHM_TRANSPORT_DEFAULT "Adds SHM transport to the default transports" ON)
option(SHM_TRANSPORT_DEFAULT "Adds SHM transport to the default transports" OFF)

###############################################################################
# LogConsumer default setup
Expand Down
9 changes: 6 additions & 3 deletions include/fastdds/rtps/RTPSDomain.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,16 @@ class RTPSDomain
* @param payload_pool Shared pointer to the IPayloadPool
* @param hist Pointer to the WriterHistory.
* @param listen Pointer to the WriterListener.
* @param devoted_receiver_resource bool that specifies if the Publisher shares sending port
* @return Pointer to the created RTPSWriter.
*/
RTPS_DllAPI static RTPSWriter* createRTPSWriter(
RTPSParticipant* p,
WriterAttributes& watt,
const std::shared_ptr<IPayloadPool>& payload_pool,
WriterHistory* hist,
WriterListener* listen = nullptr);
WriterListener* listen = nullptr,
bool devoted_receiver_resource = false);

/**
* Remove a RTPSWriter.
Expand Down Expand Up @@ -149,15 +151,16 @@ class RTPSDomain
* @param payload_pool Shared pointer to the IPayloadPool
* @param hist Pointer to the ReaderHistory.
* @param listen Pointer to the ReaderListener.
* @param devoted_receiver_resource bool that specifies if the Subscriber shares listening ports
* @return Pointer to the created RTPSReader.
*/
RTPS_DllAPI static RTPSReader* createRTPSReader(
RTPSParticipant* p,
ReaderAttributes& ratt,
const std::shared_ptr<IPayloadPool>& payload_pool,
ReaderHistory* hist,
ReaderListener* listen = nullptr);

ReaderListener* listen = nullptr,
bool devoted_receiver_resource = false);
/**
* Remove a RTPSReader.
* @param reader Pointer to the reader you want to remove.
Expand Down
2 changes: 2 additions & 0 deletions include/fastdds/rtps/common/Locator.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ class LocatorList_t
{
public:

using value_type = Locator_t;

RTPS_DllAPI LocatorList_t()
{
}
Expand Down
8 changes: 8 additions & 0 deletions include/fastdds/rtps/network/NetworkFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ class NetworkFactory
LocatorList_t& locators,
const RTPSParticipantAttributes& m_att) const;

/**
* Adds locators to devoted port unicast configuration.
* */
bool getDevotedUnicastLocators(
uint32_t domain_id,
LocatorList_t& locators,
const RTPSParticipantAttributes& m_att) const;

/**
* Fills the locator with the default unicast configuration.
* */
Expand Down
11 changes: 10 additions & 1 deletion include/fastdds/rtps/network/SenderResource.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
#include <functional>
#include <vector>
#include <chrono>
#include <fastdds/rtps/common/Locator.h>

namespace eprosima{
namespace fastrtps{
namespace rtps{

class RTPSParticipantImpl;
class MessageReceiver;
class Locator_t;

/**
* RAII object that encapsulates the Send operation over one chanel in an unknown transport.
Expand Down Expand Up @@ -75,10 +75,19 @@ class SenderResource
send_lambda_.swap(rValueResource.send_lambda_);
}

SenderResource& operator=(SenderResource&& rValueResource)
{
clean_up.swap(rValueResource.clean_up);
send_lambda_.swap(rValueResource.send_lambda_);
return *this;
}

virtual ~SenderResource() = default;

int32_t kind() const { return transport_kind_; }

virtual LocatorList_t get_locators() = 0;

protected:

SenderResource(int32_t transport_kind) : transport_kind_(transport_kind) {}
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/transport/TransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static const uint32_t s_minimumSocketBuffer = 65536;
static const std::string s_IPv4AddressAny = "0.0.0.0";
static const std::string s_IPv6AddressAny = "::";

using SendResourceList = std::vector<std::unique_ptr<fastrtps::rtps::SenderResource>>;
using SendResourceList = std::vector<std::shared_ptr<fastrtps::rtps::SenderResource>>;

class ChannelResource;

Expand Down
32 changes: 32 additions & 0 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface

public:

/**
* @brief Returns RTPSWriter sending ports
* @return Returns Endpoint associated LocatorList_t
*/
rtps::LocatorList_t get_locators();

/**
* Associate specific sender resources to the writer
* @param send_resource_list
*/
void set_sender_resources(const fastdds::rtps::SendResourceList& send_resources)
{
send_resource_list = send_resources;
}

/**
* Create a new change based with the provided changeKind.
* @param data Data of the change.
Expand Down Expand Up @@ -409,6 +424,21 @@ class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface
CDRMessage_t* message,
std::chrono::steady_clock::time_point& max_blocking_time_point) const override;

/**
* Sends to a destination locator, through this writer's sender resources
* the previous parameter.
* @param message Pointer to the buffer with the message already serialized.
* @param destination_locators_begin destination endpoint Locators iterator begin.
* @param destination_locators_end destination endpoint Locators iterator end.
* @param max_blocking_time_point If transport supports it then it will use it as maximum blocking time.
* @return Success of the send operation.
*/
bool send(
CDRMessage_t* message,
LocatorsIterator* destination_locators_begin,
LocatorsIterator* destination_locators_end,
std::chrono::steady_clock::time_point& max_blocking_time_point) const;

protected:

//!Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?.
Expand Down Expand Up @@ -469,6 +499,8 @@ class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface


RTPSWriter* next_[2] = { nullptr, nullptr };

fastdds::rtps::SendResourceList send_resource_list;
};

} /* namespace rtps */
Expand Down
8 changes: 6 additions & 2 deletions include/fastrtps/Domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,14 @@ class Domain
* @param part Pointer to the participant where you want to create the Publisher.
* @param att PublisherAttributes.
* @param listen Pointer to the PublisherListener.
* @param devoted_receiver_resource bool that specifies if the Publisher shares sending port
* @return Pointer to the created Publisher (nullptr if not created).
*/
RTPS_DllAPI static Publisher* createPublisher(
Participant* part,
const PublisherAttributes& att,
PublisherListener* listen = nullptr);
PublisherListener* listen = nullptr,
bool devoted_receiver_resource = false);

//!Fills publisher_attributes with the default values.
RTPS_DllAPI static void getDefaultPublisherAttributes(PublisherAttributes& publisher_attributes);
Expand All @@ -127,12 +129,14 @@ class Domain
* @param part Pointer to the participant where you want to create the Publisher.
* @param att SubscriberAttributes.
* @param listen Pointer to the SubscriberListener.
* @param devoted_receiver_resource bool that specifies if the Subscriber shares listening ports
* @return Pointer to the created Subscriber (nullptr if not created).
*/
RTPS_DllAPI static Subscriber* createSubscriber(
Participant* part,
const SubscriberAttributes& att,
SubscriberListener* listen = nullptr);
SubscriberListener* listen = nullptr,
bool devoted_receiver_resource = false);

//!Fills subscriber_attributes with the default values.
RTPS_DllAPI static void getDefaultSubscriberAttributes(SubscriberAttributes& subscriber_attributes);
Expand Down
6 changes: 6 additions & 0 deletions include/fastrtps/publisher/Publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ class RTPS_DllAPI Publisher
void get_liveliness_lost_status(
LivelinessLostStatus& status);

/**
* @brief Retrieves Publisher sending locators
* @return Returns Endpoint associated LocatorList_t
*/
rtps::LocatorList_t get_locators();

private:

PublisherImpl* mp_impl;
Expand Down
7 changes: 7 additions & 0 deletions include/fastrtps/subscriber/Subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/common/Locator.h>
#include <fastrtps/attributes/SubscriberAttributes.h>
#include <fastrtps/qos/DeadlineMissedStatus.h>
#include <fastrtps/qos/LivelinessChangedStatus.h>
Expand Down Expand Up @@ -168,6 +169,12 @@ class RTPS_DllAPI Subscriber
void get_liveliness_changed_status(
LivelinessChangedStatus& status);

/**
* @brief Retrieves Subscriber listening locators
* @return Returns Endpoint associated LocatorList_t
*/
rtps::LocatorList_t get_locators();

private:

SubscriberImpl* mp_impl;
Expand Down
10 changes: 6 additions & 4 deletions src/cpp/fastrtps_deprecated/Domain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,15 @@ Publisher* Domain::createPublisher(
Publisher* Domain::createPublisher(
Participant* part,
const PublisherAttributes& att,
PublisherListener* listen)
PublisherListener* listen,
bool devoted_receiver_resource)
{
std::lock_guard<std::mutex> guard(m_mutex);
for (auto it = m_participants.begin(); it != m_participants.end(); ++it)
{
if (it->second->getGuid() == part->getGuid())
{
return part->mp_impl->createPublisher(att, listen);
return part->mp_impl->createPublisher(att, listen, devoted_receiver_resource);
}
}
//TODO MOSTRAR MENSAJE DE ERROR WARNING y COMPROBAR QUE EL PUNTERO QUE ME PASA NO ES NULL
Expand Down Expand Up @@ -293,14 +294,15 @@ Subscriber* Domain::createSubscriber(
Subscriber* Domain::createSubscriber(
Participant* part,
const SubscriberAttributes& att,
SubscriberListener* listen)
SubscriberListener* listen,
bool devoted_receiver_resource)
{
std::lock_guard<std::mutex> guard(m_mutex);
for (auto it = m_participants.begin(); it != m_participants.end(); ++it)
{
if (it->second->getGuid() == part->getGuid())
{
return part->mp_impl->createSubscriber(att, listen);
return part->mp_impl->createSubscriber(att, listen, devoted_receiver_resource);
}
}
return nullptr;
Expand Down
15 changes: 10 additions & 5 deletions src/cpp/fastrtps_deprecated/participant/ParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ const GUID_t& ParticipantImpl::getGuid() const

Publisher* ParticipantImpl::createPublisher(
const PublisherAttributes& att,
PublisherListener* listen)
PublisherListener* listen,
bool devoted_receiver_resource)
{
logInfo(PARTICIPANT, "CREATING PUBLISHER IN TOPIC: " << att.topic.getTopicName());
//Look for the correct type registration
Expand Down Expand Up @@ -231,7 +232,8 @@ Publisher* ParticipantImpl::createPublisher(
this->mp_rtpsParticipant,
watt, pubimpl->payload_pool(),
(WriterHistory*)&pubimpl->m_history,
(WriterListener*)&pubimpl->m_writerListener);
(WriterListener*)&pubimpl->m_writerListener,
devoted_receiver_resource);
if (writer == nullptr)
{
logError(PARTICIPANT, "Problem creating associated Writer");
Expand Down Expand Up @@ -262,7 +264,8 @@ std::vector<std::string> ParticipantImpl::getParticipantNames() const

Subscriber* ParticipantImpl::createSubscriber(
const SubscriberAttributes& att,
SubscriberListener* listen)
SubscriberListener* listen,
bool devoted_receiver_resource)
{
logInfo(PARTICIPANT, "CREATING SUBSCRIBER IN TOPIC: " << att.topic.getTopicName())
//Look for the correct type registration
Expand Down Expand Up @@ -359,10 +362,12 @@ Subscriber* ParticipantImpl::createSubscriber(
ratt.disable_positive_acks = true;
}

RTPSReader* reader = RTPSDomain::createRTPSReader(this->mp_rtpsParticipant,
RTPSReader* reader = RTPSDomain::createRTPSReader(
mp_rtpsParticipant,
ratt, subimpl->payload_pool(),
(ReaderHistory*)&subimpl->m_history,
(ReaderListener*)&subimpl->m_readerListener);
(ReaderListener*)&subimpl->m_readerListener,
devoted_receiver_resource);
if (reader == nullptr)
{
logError(PARTICIPANT, "Problem creating associated Reader");
Expand Down
8 changes: 6 additions & 2 deletions src/cpp/fastrtps_deprecated/participant/ParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,25 @@ class ParticipantImpl
* Create a Publisher in this Participant.
* @param att Attributes of the Publisher.
* @param listen Pointer to the listener.
* @param devoted_receiver_resource bool that specifies if the Publisher share sending port
* @return Pointer to the created Publisher.
*/
Publisher* createPublisher(
const PublisherAttributes& att,
PublisherListener* listen=nullptr);
PublisherListener* listen=nullptr,
bool devoted_receiver_resource = false);

/**
* Create a Subscriber in this Participant.
* @param att Attributes of the Subscriber
* @param listen Pointer to the listener.
* @param devoted_receiver_resource bool that specifies if the Subscriber shares listening ports
* @return Pointer to the created Subscriber.
*/
Subscriber* createSubscriber(
const SubscriberAttributes& att,
SubscriberListener* listen=nullptr);
SubscriberListener* listen=nullptr,
bool devoted_receiver_resource = false);

/**
* Remove a Publisher from this participant.
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/fastrtps_deprecated/publisher/Publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,9 @@ void Publisher::assert_liveliness()
{
mp_impl->assert_liveliness();
}

LocatorList_t Publisher::get_locators()
{
assert(mp_impl != nullptr);
return mp_impl->get_locators();
}
7 changes: 7 additions & 0 deletions src/cpp/fastrtps_deprecated/publisher/PublisherImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,3 +666,10 @@ std::shared_ptr<rtps::IPayloadPool> PublisherImpl::payload_pool()
{
return payload_pool_;
}


LocatorList_t PublisherImpl::get_locators()
{
assert(mp_writer != nullptr);
return mp_writer->get_locators();
}
6 changes: 6 additions & 0 deletions src/cpp/fastrtps_deprecated/publisher/PublisherImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class PublisherImpl
void get_liveliness_lost_status(
LivelinessLostStatus& status);

/**
* @brief Returns Publisher sending ports
* @return Returns Endpoint associated LocatorList_t
*/
rtps::LocatorList_t get_locators();

/**
* @brief Asserts liveliness
*/
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ void Subscriber::get_liveliness_changed_status(
{
mp_impl->get_liveliness_changed_status(status);
}

LocatorList_t Subscriber::get_locators()
{
assert(mp_impl != nullptr);
return mp_impl->get_locators();
}
6 changes: 6 additions & 0 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,5 +483,11 @@ std::shared_ptr<rtps::IPayloadPool> SubscriberImpl::payload_pool()
return payload_pool_;
}

LocatorList_t SubscriberImpl::get_locators()
{
assert(mp_reader != nullptr);
return mp_reader->getAttributes().unicastLocatorList;
}

} /* namespace fastrtps */
} /* namespace eprosima */
Loading