Skip to content

Commit

Permalink
Pick smallest available participant ID for new paricipants (#3437) (#…
Browse files Browse the repository at this point in the history
…4555)

* Pick smallest available participant ID for new paricipants (#3437)

* Pick smallest available participant ID for new paricipants

Signed-off-by: Shane Loretz <sloretz@google.com>

* Fix style issues

Signed-off-by: Shane Loretz <sloretz@google.com>

* Eliminate m_maxRTPSParticipantId

Signed-off-by: Shane Loretz <sloretz@google.com>

* Clearer comments on rationale behind return value

Signed-off-by: Shane Loretz <sloretz@google.com>

* static_cast to avoid warning on WIN32

Signed-off-by: Shane Loretz <sloretz@google.com>

* Use special participant id value for prefix creation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* New reservation mechanism

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

---------

Signed-off-by: Shane Loretz <sloretz@google.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Co-authored-by: Miguel Company <MiguelCompany@eprosima.com>
(cherry picked from commit e46e875)

# Conflicts:
#	src/cpp/rtps/RTPSDomain.cpp
#	src/cpp/rtps/RTPSDomainImpl.hpp

* Refs #20120: Solve conflicts

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20120: Fix wrong deleted return

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

* Refs #20120: Uncrustify

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>

---------

Signed-off-by: Jesus Perez <jesusperez@eprosima.com>
Co-authored-by: Shane Loretz <sloretz@google.com>
Co-authored-by: Jesus Perez <jesusperez@eprosima.com>
  • Loading branch information
3 people authored Apr 4, 2024
1 parent 6a45117 commit 4e3e0eb
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 27 deletions.
92 changes: 65 additions & 27 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <regex>
#include <string>
#include <thread>
#include <unordered_map>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/history/WriterHistory.h>
Expand Down Expand Up @@ -61,7 +62,7 @@ static void guid_prefix_create(
std::mutex RTPSDomain::m_mutex;
std::atomic<uint32_t> RTPSDomain::m_maxRTPSParticipantID(1);
std::vector<RTPSDomain::t_p_RTPSParticipant> RTPSDomain::m_RTPSParticipants;
std::set<uint32_t> RTPSDomain::m_RTPSParticipantIDs;
std::unordered_map<uint32_t, RTPSDomainImpl::ParticipantIDState> RTPSDomainImpl::m_RTPSParticipantIDs;
FileWatchHandle RTPSDomainImpl::file_watch_handle_;

void RTPSDomain::stopAll()
Expand All @@ -75,7 +76,8 @@ void RTPSDomain::stopAll()
while (m_RTPSParticipants.size() > 0)
{
RTPSDomain::t_p_RTPSParticipant participant = m_RTPSParticipants.back();
m_RTPSParticipantIDs.erase(m_RTPSParticipantIDs.find(participant.second->getRTPSParticipantID()));
RTPSDomainImpl::m_RTPSParticipantIDs.erase(RTPSDomainImpl::m_RTPSParticipantIDs.find(participant.second->
getRTPSParticipantID()));
m_RTPSParticipants.pop_back();

lock.unlock();
Expand Down Expand Up @@ -129,26 +131,9 @@ RTPSParticipant* RTPSDomain::createParticipant(
}

uint32_t ID;
if (!RTPSDomainImpl::prepare_participant_id(PParam.participantID, ID))
{
std::lock_guard<std::mutex> guard(m_mutex);

if (PParam.participantID < 0)
{
ID = getNewId();
while (m_RTPSParticipantIDs.insert(ID).second == false)
{
ID = getNewId();
}
}
else
{
ID = PParam.participantID;
if (m_RTPSParticipantIDs.insert(ID).second == false)
{
logError(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return nullptr;
}
}
return nullptr;
}

if (!PParam.defaultUnicastLocatorList.isValid())
Expand All @@ -166,7 +151,7 @@ RTPSParticipant* RTPSDomain::createParticipant(

// Generate a new GuidPrefix_t
GuidPrefix_t guidP;
guid_prefix_create(ID, guidP);
guid_prefix_create(RTPSDomainImpl::get_id_for_prefix(ID), guidP);

RTPSParticipant* p = new RTPSParticipant(nullptr);
RTPSParticipantImpl* pimpl = nullptr;
Expand Down Expand Up @@ -224,6 +209,8 @@ RTPSParticipant* RTPSDomain::createParticipant(
{
std::lock_guard<std::mutex> guard(m_mutex);
m_RTPSParticipants.push_back(t_p_RTPSParticipant(p, pimpl));
RTPSDomainImpl::m_RTPSParticipantIDs[ID].used = true;
RTPSDomainImpl::m_RTPSParticipantIDs[ID].reserved = true;
}

// Check the environment file in case it was modified during participant creation leading to a missed callback.
Expand Down Expand Up @@ -256,7 +243,9 @@ bool RTPSDomain::removeRTPSParticipant(
{
RTPSDomain::t_p_RTPSParticipant participant = *it;
m_RTPSParticipants.erase(it);
m_RTPSParticipantIDs.erase(m_RTPSParticipantIDs.find(participant.second->getRTPSParticipantID()));
uint32_t participant_id = participant.second->getRTPSParticipantID();
RTPSDomainImpl::m_RTPSParticipantIDs[participant_id].used = false;
RTPSDomainImpl::m_RTPSParticipantIDs[participant_id].reserved = false;
lock.unlock();
removeRTPSParticipant_nts(participant);
return true;
Expand Down Expand Up @@ -547,17 +536,66 @@ RTPSParticipant* RTPSDomain::clientServerEnvironmentCreationOverride(
return nullptr;
}

uint32_t RTPSDomainImpl::getNewId()
{
// Get the smallest available participant ID.
// Settings like maxInitialPeersRange control how many participants a peer
// will look for on this host.
// Choosing the smallest value ensures peers using unicast discovery will
// find this participant as long as the total number of participants has
// not exceeded the number of peers they will look for.
uint32_t i = 0;
while (m_RTPSParticipantIDs[i].reserved || m_RTPSParticipantIDs[i].used)
{
++i;
}
m_RTPSParticipantIDs[i].reserved = true;
return i;
}

bool RTPSDomainImpl::prepare_participant_id(
int32_t input_id,
uint32_t& participant_id)
{
std::lock_guard<std::mutex> guard(RTPSDomain::m_mutex);
if (input_id < 0)
{
participant_id = getNewId();
}
else
{
participant_id = input_id;
if (m_RTPSParticipantIDs[participant_id].used == true)
{
logError(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return false;
}
}
return true;
}

uint32_t RTPSDomainImpl::get_id_for_prefix(
uint32_t participant_id)
{
uint32_t ret = participant_id;
if (ret < 0x10000)
{
std::lock_guard<std::mutex> guard(RTPSDomain::m_mutex);
ret |= m_RTPSParticipantIDs[participant_id].counter;
m_RTPSParticipantIDs[participant_id].counter += 0x10000;
}

return ret;
}

void RTPSDomainImpl::create_participant_guid(
int32_t& participant_id,
GUID_t& guid)
{
if (participant_id < 0)
{
std::lock_guard<std::mutex> guard(RTPSDomain::m_mutex);
do
{
participant_id = RTPSDomain::getNewId();
} while (RTPSDomain::m_RTPSParticipantIDs.find(participant_id) != RTPSDomain::m_RTPSParticipantIDs.end());
participant_id = getNewId();
}

guid_prefix_create(participant_id, guid.guidPrefix);
Expand Down
26 changes: 26 additions & 0 deletions src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <chrono>
#include <thread>
#include <unordered_map>

#if defined(_WIN32) || defined(__unix__)
#include <FileWatch.hpp>
Expand Down Expand Up @@ -146,6 +147,31 @@ class RTPSDomainImpl
static void file_watch_callback();

static FileWatchHandle file_watch_handle_;

/**
* @brief Get Id to create a RTPSParticipant.
*
* This function assumes m_mutex is already locked by the caller.
*
* @return Different ID for each call.
*/
static uint32_t getNewId();

static bool prepare_participant_id(
int32_t input_id,
uint32_t& participant_id);

static uint32_t get_id_for_prefix(
uint32_t participant_id);

struct ParticipantIDState
{
uint32_t counter = 0;
bool reserved = false;
bool used = false;
};

static std::unordered_map<uint32_t, ParticipantIDState> m_RTPSParticipantIDs;
};

} // namespace rtps
Expand Down

0 comments on commit 4e3e0eb

Please sign in to comment.