Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Discovery Server without Custom Listening Address [12570] #2247

Closed
wants to merge 10 commits into from
11 changes: 8 additions & 3 deletions src/cpp/rtps/RTPSDomain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,16 @@ RTPSParticipant* RTPSDomain::createParticipant(
// Above constructors create the sender resources. If a given listening port cannot be allocated an iterative
// mechanism will allocate another by default. Change the default listening port is unacceptable for server
// discovery.
if ((PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol_t::SERVER
if (
(PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol_t::SERVER
|| PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol_t::BACKUP)
&& pimpl->did_mutation_took_place_on_meta(
&&
(!PParam.builtin.metatrafficMulticastLocatorList.empty()
|| !PParam.builtin.metatrafficUnicastLocatorList.empty())
&&
(pimpl->did_mutation_took_place_on_meta(
PParam.builtin.metatrafficMulticastLocatorList,
PParam.builtin.metatrafficUnicastLocatorList))
PParam.builtin.metatrafficUnicastLocatorList)))
{
// we do not log an error because the library may use participant creation as a trial for server existence
logInfo(RTPS_PARTICIPANT, "Server wasn't able to allocate the specified listening port");
Expand Down
45 changes: 22 additions & 23 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ void DiscoveryDataBase::create_writers_from_change_(
}
else
{
logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " as no associated participant. Skipping");
logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " has no associated participant. Skipping");
return;
}

Expand Down Expand Up @@ -987,7 +987,7 @@ void DiscoveryDataBase::create_readers_from_change_(
}
else
{
logError(DISCOVERY_DATABASE, "Writer " << reader_guid << " as no associated participant. Skipping");
logError(DISCOVERY_DATABASE, "Reader " << reader_guid << " has no associated participant. Skipping");
return;
}

Expand Down Expand Up @@ -1723,14 +1723,14 @@ void DiscoveryDataBase::AckedFunctor::operator () (
{
// If the participant is already in the DB it means it has answered to the pinging
// or that is pinging us and we have already received its DATA(p)
// If neither of both has happenned we should not wait for it to ack this data, so we
// If either of both has happenned we should not wait for it to ack this data, so we
// skip it and leave it as acked
auto remote_server_it = db_->participants_.find(*it);
if (remote_server_it == db_->participants_.end())
if (remote_server_it != db_->participants_.end())
{
logInfo(DISCOVERY_DATABASE, "Change " << change_->instanceHandle <<
"check as acked for " << reader_proxy->guid() << " as it has not answered pinging yet");
return;
remote_server_it->second.add_or_update_ack_participant(db_->server_guid_prefix_, true);
}
}
}
Expand All @@ -1755,25 +1755,24 @@ void DiscoveryDataBase::unmatch_participant_(
"Attempting to unmatch an unexisting participant: " << guid_prefix);
}

// For each relevant participant make not relevant
for (eprosima::fastrtps::rtps::GuidPrefix_t relevant_participant : pit->second.relevant_participants())
// For each participant remove it
// IMPORTANT: This is not for every relevant participant, as participant A could be in other participant's B info
// and B not be relevant for A
// Using for with a map does copy the values, do not give references. Find is needed.
for (auto participant_it : participants_)
{
if (relevant_participant != guid_prefix)
{
auto rpit = participants_.find(relevant_participant);
if (rpit == participants_.end())
{
// This is not an error. Remote participants will try to unmatch with participants even
// when the match is not reciprocal
logInfo(DISCOVERY_DATABASE,
"Participant " << relevant_participant << " matched with an unexisting participant: " <<
guid_prefix);
}
else
{
rpit->second.remove_participant(guid_prefix);
}
}
auto it = participants_.find(participant_it.first);
it->second.remove_participant(guid_prefix);
}
for (auto writer_it : writers_)
{
auto it = writers_.find(writer_it.first);
it->second.remove_participant(guid_prefix);
}
for (auto reader_it : readers_)
{
auto it = readers_.find(reader_it.first);
it->second.remove_participant(guid_prefix);
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,16 @@ void PDP::initializeParticipantProxyData(

// Keep persistence Guid_Prefix_t in a specific property. This info must be propagated to all builtin endpoints
{
GuidPrefix_t persistent = mp_RTPSParticipant->getAttributes().prefix;
// Use user set persistence guid
GuidPrefix_t persistent = mp_RTPSParticipant->get_persistence_guid_prefix();

// If it has not been set, use guid
if (persistent == c_GuidPrefix_Unknown)
{
persistent = mp_RTPSParticipant->getAttributes().prefix;
}

// If persistent is set, set it into the participant proxy
if (persistent != c_GuidPrefix_Unknown)
{
participant_data->set_persistence_guid(
Expand Down
4 changes: 4 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,10 @@ bool PDPServer::process_to_send_lists()
logInfo(RTPS_PDP_SERVER, "Processing pdp_to_send");
process_to_send_list(discovery_db_.pdp_to_send(), mp_PDPWriter, mp_PDPWriterHistory);
}
else if(!discovery_db_.server_acked_by_all())
{
announceParticipantState(false);
}
else
{
logInfo(RTPS_PDP_SERVER, "Skiping sending PDP data because no entities have been discovered or updated");
Expand Down
16 changes: 13 additions & 3 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ RTPSParticipantImpl::RTPSParticipantImpl(
TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
if (pT && pT->listening_ports.empty())
{
logError(RTPS_PARTICIPANT,
logInfo(RTPS_PARTICIPANT,
"Participant " << m_att.getName() << " with GUID " << m_guid
<< " tries to use discovery server over TCP without providing a proper listening port");
<< " tries to use discovery server over TCP"
<< " without providing a proper listening port");
}
}
default:
Expand Down Expand Up @@ -416,7 +417,16 @@ RTPSParticipantImpl::RTPSParticipantImpl(

mp_builtinProtocols = new BuiltinProtocols();

logInfo(RTPS_PARTICIPANT, "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix);
if (c_GuidPrefix_Unknown != persistence_guid)
{
logInfo(RTPS_PARTICIPANT, "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix
<< " and persistence guid: " << persistence_guid);
}
else
{
logInfo(RTPS_PARTICIPANT,
"RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix);
}
}

RTPSParticipantImpl::RTPSParticipantImpl(
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,12 @@ class RTPSParticipantImpl
client_override_ = value;
}

//! Retrieve persistence guid prefix
GuidPrefix_t get_persistence_guid_prefix() const
{
return m_persistence_guid.guidPrefix;
}

private:

//! DomainId
Expand Down
28 changes: 24 additions & 4 deletions src/cpp/rtps/reader/RTPSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,30 @@ void RTPSReader::add_persistence_guid(
const GUID_t& guid,
const GUID_t& persistence_guid)
{
GUID_t persistence_guid_to_store = (c_Guid_Unknown == persistence_guid) ? guid : persistence_guid;
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
history_state_->persistence_guid_map[guid] = persistence_guid_to_store;
history_state_->persistence_guid_count[persistence_guid_to_store]++;
if (c_Guid_Unknown == persistence_guid || persistence_guid == guid)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
history_state_->persistence_guid_map[guid] = guid;
history_state_->persistence_guid_count[guid]++;
}
else
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
history_state_->persistence_guid_map[guid] = persistence_guid;
history_state_->persistence_guid_count[persistence_guid]++;

// Could happen that a value has already been stored in the record with the guid and not the persistence guid
// This is because received_change is called before Proxy is created
// In this case, we substitute the guid for the persistence (in case they are not equal)
auto spourious_record = history_state_->history_record.find(guid);
if (spourious_record != history_state_->history_record.end())
{
logInfo(RTPS_READER, "Sporious record found, changing guid "
<< guid << " for persistence guid " << persistence_guid);
update_last_notified(guid, spourious_record->second);
history_state_->history_record.erase(spourious_record);
}
}
}

bool RTPSReader::may_remove_history_record(
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,11 @@ bool StatefulReader::change_received(
"Writer Proxy " << a_change->writerGUID << " not matched to this Reader " << m_guid.entityId);
return false;
}
else if (a_change->kind != eprosima::fastrtps::rtps::ChangeKind_t::ALIVE)
{
logInfo(RTPS_READER, "Not alived change " << a_change->writerGUID << " has not WriterProxy");
return false;
}
else
{
// handle framework messages in a stateless fashion
Expand All @@ -833,6 +838,9 @@ bool StatefulReader::change_received(
}
}

logInfo(RTPS_READER, "Change received from " << a_change->writerGUID << " with sequence number: "
<< a_change->sequenceNumber <<
" skipped. Higher sequence numbers has being received.");
return false;
}
}
Expand Down