Skip to content

Commit

Permalink
Fixed the lock ordering between EDP and PDP
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumeautran committed Mar 10, 2018
1 parent b87400e commit 94d808f
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 141 deletions.
7 changes: 4 additions & 3 deletions include/fastrtps/rtps/builtin/data/ParticipantProxyData.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#define PARTICIPANTPROXYDATA_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <mutex>
#include <memory>
#include "../../../qos/QosList.h"
#include "../../../qos/ParameterList.h"

Expand Down Expand Up @@ -107,11 +108,11 @@ class ParticipantProxyData
//!
std::vector<octet> m_userData;
//!
RemoteParticipantLeaseDuration* mp_leaseDurationTimer;
std::shared_ptr<RemoteParticipantLeaseDuration> mp_leaseDurationTimer;
//!
std::vector<ReaderProxyData*> m_readers;
std::vector<std::shared_ptr<ReaderProxyData> > m_readers;
//!
std::vector<WriterProxyData*> m_writers;
std::vector<std::shared_ptr<WriterProxyData> > m_writers;

/**
* Initialize the object with the data of the lcoal RTPSParticipant.
Expand Down
26 changes: 13 additions & 13 deletions include/fastrtps/rtps/builtin/discovery/participant/PDPSimple.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class PDPSimple
PDPSimple(BuiltinProtocols* builtin);
virtual ~PDPSimple();

void initializeParticipantProxyData(ParticipantProxyData* participant_data);
void initializeParticipantProxyData(std::shared_ptr<ParticipantProxyData> participant_data);

/**
* Initialize the PDP.
Expand Down Expand Up @@ -163,25 +163,25 @@ class PDPSimple
* Get a pointer to the local RTPSParticipant RTPSParticipantProxyData object.
* @return Pointer to the local RTPSParticipant RTPSParticipantProxyData object.
*/
ParticipantProxyData* getLocalParticipantProxyData()
std::shared_ptr<ParticipantProxyData> getLocalParticipantProxyData()
{
return m_participantProxies.front();
}
/**
* Get a pointer to the EDP object.
* @return pointer to the EDP object.
*/
inline EDP* getEDP(){return mp_EDP;}
inline std::shared_ptr<EDP> getEDP(){return mp_EDP;}
/**
* Get a cons_iterator to the beginning of the RTPSParticipant Proxies.
* @return const_iterator.
*/
std::vector<ParticipantProxyData*>::const_iterator ParticipantProxiesBegin(){return m_participantProxies.begin();};
std::vector<std::shared_ptr<ParticipantProxyData> >::const_iterator ParticipantProxiesBegin(){return m_participantProxies.begin();};
/**
* Get a cons_iterator to the end RTPSParticipant Proxies.
* @return const_iterator.
*/
std::vector<ParticipantProxyData*>::const_iterator ParticipantProxiesEnd(){return m_participantProxies.end();};
std::vector<std::shared_ptr<ParticipantProxyData> >::const_iterator ParticipantProxiesEnd(){return m_participantProxies.end();};

/**
* Assert the liveliness of a Remote Participant.
Expand Down Expand Up @@ -219,7 +219,7 @@ class PDPSimple
* Get the mutex.
* @return Pointer to the Mutex
*/
inline std::recursive_mutex* getMutex() const {return mp_mutex;}
inline std::shared_ptr<std::recursive_mutex> getMutex() const {return mp_mutex;}

CDRMessage_t get_participant_proxy_data_serialized(Endianness_t endian);

Expand All @@ -233,26 +233,26 @@ class PDPSimple
//!Pointer to the SPDPReader.
StatelessReader* mp_SPDPReader;
//!Pointer to the EDP object.
EDP* mp_EDP;
std::shared_ptr<EDP> mp_EDP;
//!Registered RTPSParticipants (including the local one, that is the first one.)
std::vector<ParticipantProxyData*> m_participantProxies;
std::vector<std::shared_ptr<ParticipantProxyData> > m_participantProxies;
//!Variable to indicate if any parameter has changed.
bool m_hasChangedLocalPDP;
//!TimedEvent to periodically resend the local RTPSParticipant information.
ResendParticipantProxyDataPeriod* mp_resendParticipantTimer;
std::shared_ptr<ResendParticipantProxyDataPeriod> mp_resendParticipantTimer;
//!Listener for the SPDP messages.
PDPSimpleListener* mp_listener;
std::shared_ptr<PDPSimpleListener> mp_listener;
//!WriterHistory
WriterHistory* mp_SPDPWriterHistory;
std::shared_ptr<WriterHistory> mp_SPDPWriterHistory;
//!Reader History
ReaderHistory* mp_SPDPReaderHistory;
std::shared_ptr<ReaderHistory> mp_SPDPReaderHistory;

/**
* Create the SPDP Writer and Reader
* @return True if correct.
*/
bool createSPDPEndpoints();
std::recursive_mutex* mp_mutex;
std::shared_ptr<std::recursive_mutex> mp_mutex;



Expand Down
2 changes: 1 addition & 1 deletion src/cpp/log/Log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ void Log::QueueLog(const std::string& message, const Log::Context& context, Log:
{
std::unique_lock<std::mutex> guard(mResources.mCvMutex);
mResources.mWork = true;
mResources.mCv.notify_all();
}
mResources.mCv.notify_all();
}

Log::Kind Log::GetVerbosity()
Expand Down
12 changes: 0 additions & 12 deletions src/cpp/rtps/builtin/data/ParticipantProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,6 @@ ParticipantProxyData::ParticipantProxyData(const ParticipantProxyData& pdata) :
ParticipantProxyData::~ParticipantProxyData()
{
logInfo(RTPS_PARTICIPANT,this->m_guid);
for(std::vector<ReaderProxyData*>::iterator it = this->m_readers.begin();
it!=this->m_readers.end();++it)
{
delete(*it);
}
for(std::vector<WriterProxyData*>::iterator it = this->m_writers.begin();
it!=this->m_writers.end();++it)
{
delete(*it);
}
if(this->mp_leaseDurationTimer != nullptr)
delete(mp_leaseDurationTimer);
}

bool ParticipantProxyData::initializeData(RTPSParticipantImpl* part,PDPSimple* pdp)
Expand Down
64 changes: 28 additions & 36 deletions src/cpp/rtps/builtin/discovery/endpoint/EDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ bool EDP::unpairWriterProxy(const GUID_t& participant_guid, const GUID_t& writer
logInfo(RTPS_EDP, writer_guid);

std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSReader*>::iterator rit = mp_RTPSParticipant->userReadersListBegin();
for(auto rit = mp_RTPSParticipant->userReadersListBegin();
rit!=mp_RTPSParticipant->userReadersListEnd();++rit)
{
RemoteWriterAttributes watt;
Expand Down Expand Up @@ -194,7 +194,7 @@ bool EDP::unpairReaderProxy(const GUID_t& participant_guid, const GUID_t& reader
logInfo(RTPS_EDP, reader_guid);

std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSWriter*>::iterator wit = mp_RTPSParticipant->userWritersListBegin();
for(auto wit = mp_RTPSParticipant->userWritersListBegin();
wit!=mp_RTPSParticipant->userWritersListEnd();++wit)
{
RemoteReaderAttributes ratt;
Expand Down Expand Up @@ -265,10 +265,9 @@ bool EDP::validMatching(const WriterProxyData* wdata, const ReaderProxyData* rda
}
else if(wdata->m_qos.m_partition.names.empty() && rdata->m_qos.m_partition.names.size()>0)
{
for(std::vector<std::string>::const_iterator rnameit = rdata->m_qos.m_partition.names.begin();
rnameit!=rdata->m_qos.m_partition.names.end();++rnameit)
for(auto const & rnameit : rdata->m_qos.m_partition.names)
{
if(rnameit->size()==0)
if(rnameit.size()==0)
{
matched = true;
break;
Expand All @@ -277,10 +276,9 @@ bool EDP::validMatching(const WriterProxyData* wdata, const ReaderProxyData* rda
}
else if(wdata->m_qos.m_partition.names.size()>0 && rdata->m_qos.m_partition.names.empty() )
{
for(std::vector<std::string>::const_iterator wnameit = wdata->m_qos.m_partition.names.begin();
wnameit != wdata->m_qos.m_partition.names.end();++wnameit)
for(auto const & wnameit : wdata->m_qos.m_partition.names)
{
if(wnameit->size()==0)
if(wnameit.size()==0)
{
matched = true;
break;
Expand All @@ -289,13 +287,11 @@ bool EDP::validMatching(const WriterProxyData* wdata, const ReaderProxyData* rda
}
else
{
for(std::vector<std::string>::const_iterator wnameit = wdata->m_qos.m_partition.names.begin();
wnameit != wdata->m_qos.m_partition.names.end();++wnameit)
for(auto const & wnameit : wdata->m_qos.m_partition.names)
{
for(std::vector<std::string>::const_iterator rnameit = rdata->m_qos.m_partition.names.begin();
rnameit!=rdata->m_qos.m_partition.names.end();++rnameit)
for(auto const & rnameit : rdata->m_qos.m_partition.names)
{
if(StringMatching::matchString(wnameit->c_str(),rnameit->c_str()))
if(StringMatching::matchString(wnameit.c_str(),rnameit.c_str()))
{
matched = true;
break;
Expand Down Expand Up @@ -353,10 +349,9 @@ bool EDP::validMatching(const ReaderProxyData* rdata, const WriterProxyData* wda
}
else if(rdata->m_qos.m_partition.names.empty() && wdata->m_qos.m_partition.names.size()>0)
{
for(std::vector<std::string>::const_iterator rnameit = wdata->m_qos.m_partition.names.begin();
rnameit!=wdata->m_qos.m_partition.names.end();++rnameit)
for(auto const & rnameit : wdata->m_qos.m_partition.names)
{
if(rnameit->size()==0)
if(rnameit.size()==0)
{
matched = true;
break;
Expand All @@ -365,10 +360,9 @@ bool EDP::validMatching(const ReaderProxyData* rdata, const WriterProxyData* wda
}
else if(rdata->m_qos.m_partition.names.size()>0 && wdata->m_qos.m_partition.names.empty() )
{
for(std::vector<std::string>::const_iterator wnameit = rdata->m_qos.m_partition.names.begin();
wnameit != rdata->m_qos.m_partition.names.end();++wnameit)
for(auto const & wnameit : rdata->m_qos.m_partition.names)
{
if(wnameit->size()==0)
if(wnameit.size()==0)
{
matched = true;
break;
Expand All @@ -377,13 +371,11 @@ bool EDP::validMatching(const ReaderProxyData* rdata, const WriterProxyData* wda
}
else
{
for(std::vector<std::string>::const_iterator wnameit = rdata->m_qos.m_partition.names.begin();
wnameit != rdata->m_qos.m_partition.names.end();++wnameit)
for(auto const & wnameit : rdata->m_qos.m_partition.names)
{
for(std::vector<std::string>::const_iterator rnameit = wdata->m_qos.m_partition.names.begin();
rnameit!=wdata->m_qos.m_partition.names.end();++rnameit)
for(auto const & rnameit : wdata->m_qos.m_partition.names)
{
if(StringMatching::matchString(wnameit->c_str(),rnameit->c_str()))
if(StringMatching::matchString(wnameit.c_str(),rnameit.c_str()))
{
matched = true;
break;
Expand All @@ -409,13 +401,13 @@ bool EDP::pairingReader(RTPSReader* R, const ParticipantProxyData& pdata, const
logInfo(RTPS_EDP, rdata.guid() <<" in topic: \"" << rdata.topicName() <<"\"");
std::lock_guard<std::recursive_mutex> pguard(*mp_PDP->getMutex());

for(std::vector<ParticipantProxyData*>::const_iterator pit = mp_PDP->ParticipantProxiesBegin();
for(auto pit = mp_PDP->ParticipantProxiesBegin();
pit!=mp_PDP->ParticipantProxiesEnd(); ++pit)
{
for(std::vector<WriterProxyData*>::iterator wdatait = (*pit)->m_writers.begin();
for(auto wdatait = (*pit)->m_writers.begin();
wdatait != (*pit)->m_writers.end(); ++wdatait)
{
bool valid = validMatching(&rdata, *wdatait);
bool valid = validMatching(&rdata, (*wdatait).get());

if(valid)
{
Expand Down Expand Up @@ -483,13 +475,13 @@ bool EDP::pairingWriter(RTPSWriter* W, const ParticipantProxyData& pdata, const
logInfo(RTPS_EDP, W->getGuid() << " in topic: \"" << wdata.topicName() <<"\"");
std::lock_guard<std::recursive_mutex> pguard(*mp_PDP->getMutex());

for(std::vector<ParticipantProxyData*>::const_iterator pit = mp_PDP->ParticipantProxiesBegin();
for(auto pit = mp_PDP->ParticipantProxiesBegin();
pit!=mp_PDP->ParticipantProxiesEnd(); ++pit)
{
for(std::vector<ReaderProxyData*>::iterator rdatait = (*pit)->m_readers.begin();
for(auto rdatait = (*pit)->m_readers.begin();
rdatait!=(*pit)->m_readers.end(); ++rdatait)
{
bool valid = validMatching(&wdata, *rdatait);
bool valid = validMatching(&wdata, (*rdatait).get());

if(valid)
{
Expand Down Expand Up @@ -555,7 +547,7 @@ bool EDP::pairing_reader_proxy_with_any_local_writer(ParticipantProxyData* pdata

logInfo(RTPS_EDP, rdata->guid() <<" in topic: \"" << rdata->topicName() <<"\"");
std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSWriter*>::iterator wit = mp_RTPSParticipant->userWritersListBegin();
for(auto wit = mp_RTPSParticipant->userWritersListBegin();
wit!=mp_RTPSParticipant->userWritersListEnd();++wit)
{
(*wit)->getMutex()->lock();
Expand Down Expand Up @@ -631,7 +623,7 @@ bool EDP::pairing_reader_proxy_with_local_writer(const GUID_t& local_writer, con
{
logInfo(RTPS_EDP, rdata.guid() <<" in topic: \"" << rdata.topicName() <<"\"");
std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSWriter*>::iterator wit = mp_RTPSParticipant->userWritersListBegin();
for(auto wit = mp_RTPSParticipant->userWritersListBegin();
wit!=mp_RTPSParticipant->userWritersListEnd();++wit)
{
(*wit)->getMutex()->lock();
Expand Down Expand Up @@ -702,7 +694,7 @@ bool EDP::pairing_remote_reader_with_local_writer_after_crypto(const GUID_t& loc
const ReaderProxyData& remote_reader_data)
{
std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSWriter*>::iterator wit = mp_RTPSParticipant->userWritersListBegin();
for(auto wit = mp_RTPSParticipant->userWritersListBegin();
wit!=mp_RTPSParticipant->userWritersListEnd();++wit)
{
(*wit)->getMutex()->lock();
Expand Down Expand Up @@ -738,7 +730,7 @@ bool EDP::pairing_writer_proxy_with_any_local_reader(ParticipantProxyData *pdata

logInfo(RTPS_EDP, wdata->guid() <<" in topic: \"" << wdata->topicName() <<"\"");
std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSReader*>::iterator rit = mp_RTPSParticipant->userReadersListBegin();
for(auto rit = mp_RTPSParticipant->userReadersListBegin();
rit!=mp_RTPSParticipant->userReadersListEnd();++rit)
{
GUID_t readerGUID;
Expand Down Expand Up @@ -814,7 +806,7 @@ bool EDP::pairing_writer_proxy_with_local_reader(const GUID_t& local_reader, con
{
logInfo(RTPS_EDP, wdata.guid() <<" in topic: \"" << wdata.topicName() <<"\"");
std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSReader*>::iterator rit = mp_RTPSParticipant->userReadersListBegin();
for(auto rit = mp_RTPSParticipant->userReadersListBegin();
rit!=mp_RTPSParticipant->userReadersListEnd();++rit)
{
GUID_t readerGUID;
Expand Down Expand Up @@ -886,7 +878,7 @@ bool EDP::pairing_remote_writer_with_local_reader_after_crypto(const GUID_t& loc
const WriterProxyData& remote_writer_data)
{
std::lock_guard<std::recursive_mutex> guard(*mp_RTPSParticipant->getParticipantMutex());
for(std::vector<RTPSReader*>::iterator rit = mp_RTPSParticipant->userReadersListBegin();
for(auto rit = mp_RTPSParticipant->userReadersListBegin();
rit!=mp_RTPSParticipant->userReadersListEnd();++rit)
{
GUID_t readerGUID;
Expand Down
8 changes: 4 additions & 4 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPStatic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ bool EDPStatic::processLocalReaderProxyData(ReaderProxyData* rdata)
logInfo(RTPS_EDP,rdata->guid().entityId<< " in topic: " <<rdata->topicName());
mp_PDP->getMutex()->lock();
//Add the property list entry to our local pdp
ParticipantProxyData* localpdata = this->mp_PDP->getLocalParticipantProxyData();
auto localpdata = this->mp_PDP->getLocalParticipantProxyData();
localpdata->m_properties.properties.push_back(EDPStaticProperty::toProperty("Reader","ALIVE", rdata->userDefinedId(), rdata->guid().entityId));
mp_PDP->getMutex()->unlock();
this->mp_PDP->announceParticipantState(true);
Expand All @@ -118,7 +118,7 @@ bool EDPStatic::processLocalWriterProxyData(WriterProxyData* wdata)
logInfo(RTPS_EDP ,wdata->guid().entityId << " in topic: " << wdata->topicName());
mp_PDP->getMutex()->lock();
//Add the property list entry to our local pdp
ParticipantProxyData* localpdata = this->mp_PDP->getLocalParticipantProxyData();
auto localpdata = this->mp_PDP->getLocalParticipantProxyData();
localpdata->m_properties.properties.push_back(EDPStaticProperty::toProperty("Writer","ALIVE",
wdata->userDefinedId(), wdata->guid().entityId));
mp_PDP->getMutex()->unlock();
Expand All @@ -129,7 +129,7 @@ bool EDPStatic::processLocalWriterProxyData(WriterProxyData* wdata)
bool EDPStatic::removeLocalReader(RTPSReader* R)
{
std::lock_guard<std::recursive_mutex> guard(*mp_PDP->getMutex());
ParticipantProxyData* localpdata = this->mp_PDP->getLocalParticipantProxyData();
auto localpdata = this->mp_PDP->getLocalParticipantProxyData();
for(std::vector<std::pair<std::string,std::string>>::iterator pit = localpdata->m_properties.properties.begin();
pit!=localpdata->m_properties.properties.end();++pit)
{
Expand All @@ -149,7 +149,7 @@ bool EDPStatic::removeLocalReader(RTPSReader* R)
bool EDPStatic::removeLocalWriter(RTPSWriter*W)
{
std::lock_guard<std::recursive_mutex> guard(*mp_PDP->getMutex());
ParticipantProxyData* localpdata = this->mp_PDP->getLocalParticipantProxyData();
auto localpdata = this->mp_PDP->getLocalParticipantProxyData();
for(std::vector<std::pair<std::string,std::string>>::iterator pit = localpdata->m_properties.properties.begin();
pit!=localpdata->m_properties.properties.end();++pit)
{
Expand Down
Loading

0 comments on commit 94d808f

Please sign in to comment.