Skip to content

Commit

Permalink
Pick smallest available participant ID for new paricipants (#3437)
Browse files Browse the repository at this point in the history
* Pick smallest available participant ID for new paricipants

Signed-off-by: Shane Loretz <sloretz@google.com>

* Fix style issues

Signed-off-by: Shane Loretz <sloretz@google.com>

* Eliminate m_maxRTPSParticipantId

Signed-off-by: Shane Loretz <sloretz@google.com>

* Clearer comments on rationale behind return value

Signed-off-by: Shane Loretz <sloretz@google.com>

* static_cast to avoid warning on WIN32

Signed-off-by: Shane Loretz <sloretz@google.com>

* Use special participant id value for prefix creation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* New reservation mechanism

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Shane Loretz <sloretz@google.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
sloretz and MiguelCompany authored Apr 7, 2023
1 parent eb20c80 commit e46e875
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 39 deletions.
83 changes: 57 additions & 26 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,26 +153,9 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(
}

uint32_t ID;
if (!instance->prepare_participant_id(PParam.participantID, ID))
{
std::lock_guard<std::mutex> guard(instance->m_mutex);

if (PParam.participantID < 0)
{
ID = instance->getNewId();
while (instance->m_RTPSParticipantIDs.insert(ID).second == false)
{
ID = instance->getNewId();
}
}
else
{
ID = PParam.participantID;
if (instance->m_RTPSParticipantIDs.insert(ID).second == false)
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return nullptr;
}
}
return nullptr;
}

if (!PParam.defaultUnicastLocatorList.isValid())
Expand All @@ -190,7 +173,7 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(

// Generate a new GuidPrefix_t
GuidPrefix_t guidP;
guid_prefix_create(ID, guidP);
guid_prefix_create(instance->get_id_for_prefix(ID), guidP);
if (!PParam.builtin.metatraffic_external_unicast_locators.empty())
{
fastdds::rtps::LocatorList locators;
Expand Down Expand Up @@ -259,6 +242,8 @@ RTPSParticipant* RTPSDomainImpl::createParticipant(
{
std::lock_guard<std::mutex> guard(instance->m_mutex);
instance->m_RTPSParticipants.push_back(t_p_RTPSParticipant(p, pimpl));
instance->m_RTPSParticipantIDs[ID].used = true;
instance->m_RTPSParticipantIDs[ID].reserved = true;
}

// Check the environment file in case it was modified during participant creation leading to a missed callback.
Expand Down Expand Up @@ -291,7 +276,9 @@ bool RTPSDomainImpl::removeRTPSParticipant(
{
RTPSDomainImpl::t_p_RTPSParticipant participant = *it;
instance->m_RTPSParticipants.erase(it);
instance->m_RTPSParticipantIDs.erase(participant.second->getRTPSParticipantID());
uint32_t participant_id = participant.second->getRTPSParticipantID();
instance->m_RTPSParticipantIDs[participant_id].used = false;
instance->m_RTPSParticipantIDs[participant_id].reserved = false;
lock.unlock();
instance->removeRTPSParticipant_nts(participant);
return true;
Expand Down Expand Up @@ -599,7 +586,54 @@ RTPSParticipant* RTPSDomainImpl::clientServerEnvironmentCreationOverride(

uint32_t RTPSDomainImpl::getNewId()
{
return m_maxRTPSParticipantID++;
// Get the smallest available participant ID.
// Settings like maxInitialPeersRange control how many participants a peer
// will look for on this host.
// Choosing the smallest value ensures peers using unicast discovery will
// find this participant as long as the total number of participants has
// not exceeded the number of peers they will look for.
uint32_t i = 0;
while (m_RTPSParticipantIDs[i].reserved || m_RTPSParticipantIDs[i].used)
{
++i;
}
m_RTPSParticipantIDs[i].reserved = true;
return i;
}

bool RTPSDomainImpl::prepare_participant_id(
int32_t input_id,
uint32_t& participant_id)
{
std::lock_guard<std::mutex> guard(m_mutex);
if (input_id < 0)
{
participant_id = getNewId();
}
else
{
participant_id = input_id;
if (m_RTPSParticipantIDs[participant_id].used == true)
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return false;
}
}
return true;
}

uint32_t RTPSDomainImpl::get_id_for_prefix(
uint32_t participant_id)
{
uint32_t ret = participant_id;
if (ret < 0x10000)
{
std::lock_guard<std::mutex> guard(m_mutex);
ret |= m_RTPSParticipantIDs[participant_id].counter;
m_RTPSParticipantIDs[participant_id].counter += 0x10000;
}

return ret;
}

void RTPSDomainImpl::create_participant_guid(
Expand All @@ -610,10 +644,7 @@ void RTPSDomainImpl::create_participant_guid(
{
auto instance = get_instance();
std::lock_guard<std::mutex> guard(instance->m_mutex);
do
{
participant_id = instance->getNewId();
} while (instance->m_RTPSParticipantIDs.find(participant_id) != instance->m_RTPSParticipantIDs.end());
participant_id = instance->getNewId();
}

guid_prefix_create(participant_id, guid.guidPrefix);
Expand Down
32 changes: 19 additions & 13 deletions src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>
#include <memory>
#include <thread>
#include <unordered_map>

#if defined(_WIN32) || defined(__unix__)
#include <FileWatch.hpp>
Expand Down Expand Up @@ -104,16 +105,6 @@ class RTPSDomainImpl
static bool removeRTPSParticipant(
RTPSParticipant* p);

/**
* Set the maximum RTPSParticipantID.
* @param maxRTPSParticipantId ID.
*/
static inline void setMaxRTPSParticipantId(
uint32_t maxRTPSParticipantId)
{
get_instance()->m_maxRTPSParticipantID = maxRTPSParticipantId;
}

/**
* Creates a RTPSParticipant as default server or client if ROS_MASTER_URI environment variable is set.
* @param domain_id DDS domain associated
Expand Down Expand Up @@ -214,10 +205,20 @@ class RTPSDomainImpl

/**
* @brief Get Id to create a RTPSParticipant.
*
* This function assumes m_mutex is already locked by the caller.
*
* @return Different ID for each call.
*/
uint32_t getNewId();

bool prepare_participant_id(
int32_t input_id,
uint32_t& participant_id);

uint32_t get_id_for_prefix(
uint32_t participant_id);

void removeRTPSParticipant_nts(
t_p_RTPSParticipant&);

Expand All @@ -227,11 +228,16 @@ class RTPSDomainImpl

std::mutex m_mutex;

std::atomic<uint32_t> m_maxRTPSParticipantID;

std::vector<t_p_RTPSParticipant> m_RTPSParticipants;

std::set<uint32_t> m_RTPSParticipantIDs;
struct ParticipantIDState
{
uint32_t counter = 0;
bool reserved = false;
bool used = false;
};

std::unordered_map<uint32_t, ParticipantIDState> m_RTPSParticipantIDs;

FileWatchHandle file_watch_handle_;
};
Expand Down

0 comments on commit e46e875

Please sign in to comment.