Skip to content

Commit

Permalink
Fix mutex lock count on PDPListener (#2020)
Browse files Browse the repository at this point in the history
* Refs 11857. Added regression test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11857. Fixed issue.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11857. Bonus: fixed PDPServerListener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 11857. Added comments on test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
(cherry picked from commit 392defc)

# Conflicts:
#	src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
  • Loading branch information
MiguelCompany authored and mergify-bot committed Jun 24, 2021
1 parent 661a563 commit 4192e76
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 7 deletions.
7 changes: 4 additions & 3 deletions src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,12 @@ void PDPListener::onNewCacheChangeAdded(
{
// Create a new one when not found
pdata = parent_pdp_->createParticipantProxyData(temp_participant_data_, writer_guid);

reader->getMutex().unlock();
lock.unlock();

if (pdata != nullptr)
{
reader->getMutex().unlock();
lock.unlock();

logInfo(RTPS_PDP_DISCOVERY, "New participant "
<< pdata->m_guid << " at "
<< "MTTLoc: " << pdata->metatraffic_locators
Expand Down
168 changes: 168 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void PDPServerListener::onNewCacheChangeAdded(
return;
}

<<<<<<< HEAD
ParticipantProxyData local_data(parent_pdp_->getRTPSParticipant()->getRTPSParticipantAttributes().allocation);

// Load information on local_data
Expand All @@ -100,6 +101,162 @@ void PDPServerListener::onNewCacheChangeAdded(
{
change->instanceHandle = local_data.m_key;
guid = local_data.m_guid;
=======
// Deserialize the payload to access the discovery info
CDRMessage_t msg(change->serializedPayload);
temp_participant_data_.clear();
auto participant_data = temp_participant_data_;

if (participant_data.readFromCDRMessage(
&msg,
true,
pdp_server()->getRTPSParticipant()->network_factory(),
pdp_server()->getRTPSParticipant()->has_shm_transport()))
{
/* Check PID_VENDOR_ID */
if (participant_data.m_VendorId != fastrtps::rtps::c_VendorId_eProsima)
{
logInfo(RTPS_PDP_LISTENER,
"DATA(p|Up) from different vendor is not supported for Discover-Server operation");
return;
}

fastrtps::ParameterPropertyList_t properties = participant_data.m_properties;

/* Check DS_VERSION */
auto ds_version = std::find_if(
properties.begin(),
properties.end(),
[](const dds::ParameterProperty_t& property)
{
return property.first() == dds::parameter_property_ds_version;
});

if (ds_version != properties.end())
{
if (std::stof(ds_version->second()) < 1.0)
{
logError(RTPS_PDP_LISTENER, "Minimum " << dds::parameter_property_ds_version
<< " is 1.0, found: " << ds_version->second());
return;
}
logInfo(RTPS_PDP_LISTENER, "Participant " << dds::parameter_property_ds_version << ": "
<< ds_version->second());
}
else
{
logInfo(RTPS_PDP_LISTENER, dds::parameter_property_ds_version << " is not set. Assuming 1.0");
}

/* Check PARTICIPANT_TYPE */
bool is_client = true;
auto participant_type = std::find_if(
properties.begin(),
properties.end(),
[](const dds::ParameterProperty_t& property)
{
return property.first() == dds::parameter_property_participant_type;
});

if (participant_type != properties.end())
{
if (participant_type->second() == ParticipantType::SERVER ||
participant_type->second() == ParticipantType::BACKUP ||
participant_type->second() == ParticipantType::SUPER_CLIENT)
{
is_client = false;
}
else if (participant_type->second() == ParticipantType::SIMPLE)
{
logInfo(RTPS_PDP_LISTENER, "Ignoring " << dds::parameter_property_participant_type << ": "
<< participant_type->second());
return;
}
else if (participant_type->second() != ParticipantType::CLIENT)
{
logError(RTPS_PDP_LISTENER, "Wrong " << dds::parameter_property_participant_type << ": "
<< participant_type->second());
return;
}
logInfo(RTPS_PDP_LISTENER, "Participant type " << participant_type->second());
}
else
{
logInfo(RTPS_PDP_LISTENER, dds::parameter_property_participant_type << " is not set");
// Fallback to checking whether participant is a SERVER looking for the persistence GUID
auto persistence_guid = std::find_if(
properties.begin(),
properties.end(),
[](const dds::ParameterProperty_t& property)
{
return property.first() == dds::parameter_property_persistence_guid;
});
// The presence of persistence GUID property suggests a SERVER. This assumption is made to keep
// backwards compatibility with Discovery Server v1.0. However, any participant that has been configured
// as persistent will have this property.
if (persistence_guid != properties.end())
{
is_client = false;
}
logInfo(RTPS_PDP_LISTENER, "Participant is client: " << std::boolalpha << is_client);
}

// Check whether the participant is a client/server of this server or if it has been forwarded from
// another entity (server).
// is_local means that the server is connected (or will be) with this entity directly
bool is_local = true;

// In case a new changes arrives from a local entity, but the ParticipantProxyData already exists
// because we know it from other server
bool was_local = true;

// If the instance handle is different from the writer GUID, then the change has been relayed
if (iHandle2GUID(change->instanceHandle).guidPrefix != change->writerGUID.guidPrefix)
{
is_local = false;
}
else
{
// We already know that the writer and the entity are the same, so we can use writerGUID
was_local = pdp_server()->discovery_db().is_participant_local(change->writerGUID.guidPrefix);
}

if (!pdp_server()->discovery_db().backup_in_progress())
{
// Notify the DiscoveryDataBase
if (pdp_server()->discovery_db().update(
change.get(),
ddb::DiscoveryParticipantChangeData(
participant_data.metatraffic_locators,
is_client,
is_local)))
{
// Remove change from PDP reader history, but do not return it to the pool. From here on, the discovery
// database takes ownership of the CacheChange_t. Henceforth there are no references to the change.
// Take change ownership away from the unique pointer, so that its destruction does not destroy the data
pdp_history->remove_change(pdp_history->find_change(change.release()), false);

// Ensure processing time for the cache by triggering the Server thread (which process the updates)
// The server does not have to postpone the execution of the routine if a change is received, i.e.
// the server routine is triggered instantly as the default value of the interval that the server has
// to wait is 0.
pdp_server()->awake_routine_thread();

// TODO: when the DiscoveryDataBase allows updating capabilities we can dismissed old PDP processing
}
else
{
// If the database doesn't take the ownership, then return the CacheChante_t to the pool.
pdp_reader->releaseCache(change.release());
}

}
else
{
// Release the unique pointer, not the change in the pool
change.release();
}
>>>>>>> 392defcc0 (Fix mutex lock count on PDPListener (#2020))

// At this point we can release reader lock.
reader->getMutex().unlock();
Expand All @@ -124,8 +281,14 @@ void PDPServerListener::onNewCacheChangeAdded(
logInfo(RTPS_PDP, "Registering a new participant: " <<
change->write_params.sample_identity().writer_guid());

<<<<<<< HEAD
// Create a new one when not found
pdata = parent_pdp_->createParticipantProxyData(local_data, writer_guid);
=======
// Create a new participant proxy entry
pdata = pdp_server()->createParticipantProxyData(participant_data, writer_guid);
// Realease PDP mutex
>>>>>>> 392defcc0 (Fix mutex lock count on PDPListener (#2020))
lock.unlock();

if (pdata != nullptr)
Expand All @@ -142,7 +305,12 @@ void PDPServerListener::onNewCacheChangeAdded(
}
else
{
<<<<<<< HEAD
pdata->updateData(local_data);
=======
// Update proxy
pdata->updateData(participant_data);
>>>>>>> 392defcc0 (Fix mutex lock count on PDPListener (#2020))
pdata->isAlive = true;
// activate lease duration if the DATA(p) comes directly from the client
bool previous_lease_check_status = pdata->should_check_lease_duration;
Expand Down
2 changes: 2 additions & 0 deletions test/blackbox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND)
${CMAKE_CURRENT_BINARY_DIR}/PubSubWriter.xml)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/PubSubReader.xml.in
${CMAKE_CURRENT_BINARY_DIR}/PubSubReader.xml)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/discovery_participant_flags.xml
${CMAKE_CURRENT_BINARY_DIR}/discovery_participant_flags.xml)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/persistence.xml
${CMAKE_CURRENT_BINARY_DIR}/persistence.xml)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/utils/check_guid.py
Expand Down
52 changes: 49 additions & 3 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,52 @@ class PubSubReader
std::cout << "Reader discovery finished..." << std::endl;
}

bool wait_participant_discovery(
unsigned int min_participants = 1,
std::chrono::seconds timeout = std::chrono::seconds::zero())
{
bool ret_value = true;
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Reader is waiting discovery of at least " << min_participants << " participants..." << std::endl;

if (timeout == std::chrono::seconds::zero())
{
cvDiscovery_.wait(lock, [&]()
{
return participant_matched_ >= min_participants;
});
}
else
{
if (!cvDiscovery_.wait_for(lock, timeout, [&]()
{
return participant_matched_ >= min_participants;
}))
{
ret_value = false;
}
}

if (ret_value)
{
std::cout << "Reader participant discovery finished successfully..." << std::endl;
}
else
{
std::cout << "Reader participant discovery finished unsuccessfully..." << std::endl;
}

return ret_value;
}

bool wait_participant_undiscovery(
std::chrono::seconds timeout = std::chrono::seconds::zero())
{
bool ret_value = true;
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Reader is waiting undiscovery..." << std::endl;
std::cout << "Reader is waiting participant undiscovery..." << std::endl;

if (timeout == std::chrono::seconds::zero())
{
Expand All @@ -519,11 +558,11 @@ class PubSubReader

if (ret_value)
{
std::cout << "Reader undiscovery finished successfully..." << std::endl;
std::cout << "Reader participant undiscovery finished successfully..." << std::endl;
}
else
{
std::cout << "Reader undiscovery finished unsuccessfully..." << std::endl;
std::cout << "Reader participant undiscovery finished unsuccessfully..." << std::endl;
}

return ret_value;
Expand Down Expand Up @@ -881,6 +920,13 @@ class PubSubReader
return *this;
}

PubSubReader& ignore_participant_flags(
eprosima::fastrtps::rtps::ParticipantFilteringFlags_t flags)
{
participant_qos_.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = flags;
return *this;
}

PubSubReader& socket_buffer_size(
uint32_t sockerBufferSize)
{
Expand Down
2 changes: 1 addition & 1 deletion test/blackbox/common/BlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1117,4 +1117,4 @@ TEST(Discovery, ServerClientEnvironmentSetUp)
output.clear();
ASSERT_FALSE(load_environment_server_info(text, output));

}
}
55 changes: 55 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gtest/gtest.h>

#include "BlackboxTests.hpp"

#include "PubSubReader.hpp"

// Regression test for redmine issue 11857
TEST(DDSDiscovery, IgnoreParticipantFlags)
{
// This participant is created with:
// - ignoreParticipantFlags = FILTER_SAME_PROCESS (will avoid discovery of p2)
// - metatrafficUnicastLocatorList = 127.0.0.1:7399, 127.0.0.1:7398 (to ensure two listening threads are created)
PubSubReader<HelloWorldType> p1(TEST_TOPIC_NAME);
p1.set_xml_filename("discovery_participant_flags.xml");
p1.set_participant_profile("participant_1");
p1.init();
EXPECT_TRUE(p1.isInitialized());

// This participant is created with initialPeersList = 127.0.0.1:7399
// When the announcements of this participant arrive to p1, they will be ignored, and thus p1 will not
// announce itself back to p2.
PubSubReader<HelloWorldType> p2(TEST_TOPIC_NAME);
p2.set_xml_filename("discovery_participant_flags.xml");
p2.set_participant_profile("participant_2");
p2.init();
EXPECT_TRUE(p2.isInitialized());
EXPECT_FALSE(p2.wait_participant_discovery(1, std::chrono::seconds(1)));
EXPECT_FALSE(p1.wait_participant_discovery(1, std::chrono::seconds(1)));

// This participant is created with:
// - initialPeersList = 127.0.0.1:7398
// - a custom guid prefix
// The announcements of this participant will arrive to p1 on a different listening thread.
// Due to the custom prefix, they should not be ignored, and mutual discovery should happen
PubSubReader<HelloWorldType> p3(TEST_TOPIC_NAME);
p3.set_xml_filename("discovery_participant_flags.xml");
p3.set_participant_profile("participant_3");
p3.init();
EXPECT_TRUE(p1.wait_participant_discovery());
EXPECT_TRUE(p3.wait_participant_discovery());
}
Loading

0 comments on commit 4192e76

Please sign in to comment.