Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick smallest available participant ID for new paricipants (backport #3437) #4555

Merged
merged 4 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 65 additions & 27 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <regex>
#include <string>
#include <thread>
#include <unordered_map>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/history/WriterHistory.h>
Expand Down Expand Up @@ -61,7 +62,7 @@ static void guid_prefix_create(
std::mutex RTPSDomain::m_mutex;
std::atomic<uint32_t> RTPSDomain::m_maxRTPSParticipantID(1);
std::vector<RTPSDomain::t_p_RTPSParticipant> RTPSDomain::m_RTPSParticipants;
std::set<uint32_t> RTPSDomain::m_RTPSParticipantIDs;
std::unordered_map<uint32_t, RTPSDomainImpl::ParticipantIDState> RTPSDomainImpl::m_RTPSParticipantIDs;
FileWatchHandle RTPSDomainImpl::file_watch_handle_;

void RTPSDomain::stopAll()
Expand All @@ -75,7 +76,8 @@ void RTPSDomain::stopAll()
while (m_RTPSParticipants.size() > 0)
{
RTPSDomain::t_p_RTPSParticipant participant = m_RTPSParticipants.back();
m_RTPSParticipantIDs.erase(m_RTPSParticipantIDs.find(participant.second->getRTPSParticipantID()));
RTPSDomainImpl::m_RTPSParticipantIDs.erase(RTPSDomainImpl::m_RTPSParticipantIDs.find(participant.second->
getRTPSParticipantID()));
m_RTPSParticipants.pop_back();

lock.unlock();
Expand Down Expand Up @@ -129,26 +131,9 @@ RTPSParticipant* RTPSDomain::createParticipant(
}

uint32_t ID;
if (!RTPSDomainImpl::prepare_participant_id(PParam.participantID, ID))
{
std::lock_guard<std::mutex> guard(m_mutex);

if (PParam.participantID < 0)
{
ID = getNewId();
while (m_RTPSParticipantIDs.insert(ID).second == false)
{
ID = getNewId();
}
}
else
{
ID = PParam.participantID;
if (m_RTPSParticipantIDs.insert(ID).second == false)
{
logError(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return nullptr;
}
}
return nullptr;
}

if (!PParam.defaultUnicastLocatorList.isValid())
Expand All @@ -166,7 +151,7 @@ RTPSParticipant* RTPSDomain::createParticipant(

// Generate a new GuidPrefix_t
GuidPrefix_t guidP;
guid_prefix_create(ID, guidP);
guid_prefix_create(RTPSDomainImpl::get_id_for_prefix(ID), guidP);

RTPSParticipant* p = new RTPSParticipant(nullptr);
RTPSParticipantImpl* pimpl = nullptr;
Expand Down Expand Up @@ -224,6 +209,8 @@ RTPSParticipant* RTPSDomain::createParticipant(
{
std::lock_guard<std::mutex> guard(m_mutex);
m_RTPSParticipants.push_back(t_p_RTPSParticipant(p, pimpl));
RTPSDomainImpl::m_RTPSParticipantIDs[ID].used = true;
RTPSDomainImpl::m_RTPSParticipantIDs[ID].reserved = true;
}

// Check the environment file in case it was modified during participant creation leading to a missed callback.
Expand Down Expand Up @@ -256,7 +243,9 @@ bool RTPSDomain::removeRTPSParticipant(
{
RTPSDomain::t_p_RTPSParticipant participant = *it;
m_RTPSParticipants.erase(it);
m_RTPSParticipantIDs.erase(m_RTPSParticipantIDs.find(participant.second->getRTPSParticipantID()));
uint32_t participant_id = participant.second->getRTPSParticipantID();
RTPSDomainImpl::m_RTPSParticipantIDs[participant_id].used = false;
RTPSDomainImpl::m_RTPSParticipantIDs[participant_id].reserved = false;
lock.unlock();
removeRTPSParticipant_nts(participant);
return true;
Expand Down Expand Up @@ -547,17 +536,66 @@ RTPSParticipant* RTPSDomain::clientServerEnvironmentCreationOverride(
return nullptr;
}

uint32_t RTPSDomainImpl::getNewId()
{
// Get the smallest available participant ID.
// Settings like maxInitialPeersRange control how many participants a peer
// will look for on this host.
// Choosing the smallest value ensures peers using unicast discovery will
// find this participant as long as the total number of participants has
// not exceeded the number of peers they will look for.
uint32_t i = 0;
while (m_RTPSParticipantIDs[i].reserved || m_RTPSParticipantIDs[i].used)
{
++i;
}
m_RTPSParticipantIDs[i].reserved = true;
return i;
}

bool RTPSDomainImpl::prepare_participant_id(
int32_t input_id,
uint32_t& participant_id)
{
std::lock_guard<std::mutex> guard(RTPSDomain::m_mutex);
if (input_id < 0)
{
participant_id = getNewId();
}
else
{
participant_id = input_id;
if (m_RTPSParticipantIDs[participant_id].used == true)
{
logError(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
return false;
}
}
return true;
}

uint32_t RTPSDomainImpl::get_id_for_prefix(
uint32_t participant_id)
{
uint32_t ret = participant_id;
if (ret < 0x10000)
{
std::lock_guard<std::mutex> guard(RTPSDomain::m_mutex);
ret |= m_RTPSParticipantIDs[participant_id].counter;
m_RTPSParticipantIDs[participant_id].counter += 0x10000;
}

return ret;
}

void RTPSDomainImpl::create_participant_guid(
int32_t& participant_id,
GUID_t& guid)
{
if (participant_id < 0)
{
std::lock_guard<std::mutex> guard(RTPSDomain::m_mutex);
do
{
participant_id = RTPSDomain::getNewId();
} while (RTPSDomain::m_RTPSParticipantIDs.find(participant_id) != RTPSDomain::m_RTPSParticipantIDs.end());
participant_id = getNewId();
}

guid_prefix_create(participant_id, guid.guidPrefix);
Expand Down
26 changes: 26 additions & 0 deletions src/cpp/rtps/RTPSDomainImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <chrono>
#include <thread>
#include <unordered_map>

#if defined(_WIN32) || defined(__unix__)
#include <FileWatch.hpp>
Expand Down Expand Up @@ -146,6 +147,31 @@ class RTPSDomainImpl
static void file_watch_callback();

static FileWatchHandle file_watch_handle_;

/**
* @brief Get Id to create a RTPSParticipant.
*
* This function assumes m_mutex is already locked by the caller.
*
* @return Different ID for each call.
*/
static uint32_t getNewId();

static bool prepare_participant_id(
int32_t input_id,
uint32_t& participant_id);

static uint32_t get_id_for_prefix(
uint32_t participant_id);

struct ParticipantIDState
{
uint32_t counter = 0;
bool reserved = false;
bool used = false;
};

static std::unordered_map<uint32_t, ParticipantIDState> m_RTPSParticipantIDs;
};

} // namespace rtps
Expand Down
Loading