diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp index f80c6029ccf..1178780be96 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp @@ -125,11 +125,12 @@ void PDPListener::onNewCacheChangeAdded( { // Create a new one when not found pdata = parent_pdp_->createParticipantProxyData(temp_participant_data_, writer_guid); + + reader->getMutex().unlock(); + lock.unlock(); + if (pdata != nullptr) { - reader->getMutex().unlock(); - lock.unlock(); - logInfo(RTPS_PDP_DISCOVERY, "New participant " << pdata->m_guid << " at " << "MTTLoc: " << pdata->metatraffic_locators diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp index 5cb72c8837c..8fca5db7d95 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp @@ -90,6 +90,7 @@ void PDPServerListener::onNewCacheChangeAdded( return; } +<<<<<<< HEAD ParticipantProxyData local_data(parent_pdp_->getRTPSParticipant()->getRTPSParticipantAttributes().allocation); // Load information on local_data @@ -100,6 +101,162 @@ void PDPServerListener::onNewCacheChangeAdded( { change->instanceHandle = local_data.m_key; guid = local_data.m_guid; +======= + // Deserialize the payload to access the discovery info + CDRMessage_t msg(change->serializedPayload); + temp_participant_data_.clear(); + auto participant_data = temp_participant_data_; + + if (participant_data.readFromCDRMessage( + &msg, + true, + pdp_server()->getRTPSParticipant()->network_factory(), + pdp_server()->getRTPSParticipant()->has_shm_transport())) + { + /* Check PID_VENDOR_ID */ + if (participant_data.m_VendorId != fastrtps::rtps::c_VendorId_eProsima) + { + logInfo(RTPS_PDP_LISTENER, + "DATA(p|Up) from different vendor is not supported for Discover-Server operation"); + return; + } + + fastrtps::ParameterPropertyList_t properties = participant_data.m_properties; + + /* Check DS_VERSION */ + auto ds_version = std::find_if( + properties.begin(), + properties.end(), + [](const dds::ParameterProperty_t& property) + { + return property.first() == dds::parameter_property_ds_version; + }); + + if (ds_version != properties.end()) + { + if (std::stof(ds_version->second()) < 1.0) + { + logError(RTPS_PDP_LISTENER, "Minimum " << dds::parameter_property_ds_version + << " is 1.0, found: " << ds_version->second()); + return; + } + logInfo(RTPS_PDP_LISTENER, "Participant " << dds::parameter_property_ds_version << ": " + << ds_version->second()); + } + else + { + logInfo(RTPS_PDP_LISTENER, dds::parameter_property_ds_version << " is not set. Assuming 1.0"); + } + + /* Check PARTICIPANT_TYPE */ + bool is_client = true; + auto participant_type = std::find_if( + properties.begin(), + properties.end(), + [](const dds::ParameterProperty_t& property) + { + return property.first() == dds::parameter_property_participant_type; + }); + + if (participant_type != properties.end()) + { + if (participant_type->second() == ParticipantType::SERVER || + participant_type->second() == ParticipantType::BACKUP || + participant_type->second() == ParticipantType::SUPER_CLIENT) + { + is_client = false; + } + else if (participant_type->second() == ParticipantType::SIMPLE) + { + logInfo(RTPS_PDP_LISTENER, "Ignoring " << dds::parameter_property_participant_type << ": " + << participant_type->second()); + return; + } + else if (participant_type->second() != ParticipantType::CLIENT) + { + logError(RTPS_PDP_LISTENER, "Wrong " << dds::parameter_property_participant_type << ": " + << participant_type->second()); + return; + } + logInfo(RTPS_PDP_LISTENER, "Participant type " << participant_type->second()); + } + else + { + logInfo(RTPS_PDP_LISTENER, dds::parameter_property_participant_type << " is not set"); + // Fallback to checking whether participant is a SERVER looking for the persistence GUID + auto persistence_guid = std::find_if( + properties.begin(), + properties.end(), + [](const dds::ParameterProperty_t& property) + { + return property.first() == dds::parameter_property_persistence_guid; + }); + // The presence of persistence GUID property suggests a SERVER. This assumption is made to keep + // backwards compatibility with Discovery Server v1.0. However, any participant that has been configured + // as persistent will have this property. + if (persistence_guid != properties.end()) + { + is_client = false; + } + logInfo(RTPS_PDP_LISTENER, "Participant is client: " << std::boolalpha << is_client); + } + + // Check whether the participant is a client/server of this server or if it has been forwarded from + // another entity (server). + // is_local means that the server is connected (or will be) with this entity directly + bool is_local = true; + + // In case a new changes arrives from a local entity, but the ParticipantProxyData already exists + // because we know it from other server + bool was_local = true; + + // If the instance handle is different from the writer GUID, then the change has been relayed + if (iHandle2GUID(change->instanceHandle).guidPrefix != change->writerGUID.guidPrefix) + { + is_local = false; + } + else + { + // We already know that the writer and the entity are the same, so we can use writerGUID + was_local = pdp_server()->discovery_db().is_participant_local(change->writerGUID.guidPrefix); + } + + if (!pdp_server()->discovery_db().backup_in_progress()) + { + // Notify the DiscoveryDataBase + if (pdp_server()->discovery_db().update( + change.get(), + ddb::DiscoveryParticipantChangeData( + participant_data.metatraffic_locators, + is_client, + is_local))) + { + // Remove change from PDP reader history, but do not return it to the pool. From here on, the discovery + // database takes ownership of the CacheChange_t. Henceforth there are no references to the change. + // Take change ownership away from the unique pointer, so that its destruction does not destroy the data + pdp_history->remove_change(pdp_history->find_change(change.release()), false); + + // Ensure processing time for the cache by triggering the Server thread (which process the updates) + // The server does not have to postpone the execution of the routine if a change is received, i.e. + // the server routine is triggered instantly as the default value of the interval that the server has + // to wait is 0. + pdp_server()->awake_routine_thread(); + + // TODO: when the DiscoveryDataBase allows updating capabilities we can dismissed old PDP processing + } + else + { + // If the database doesn't take the ownership, then return the CacheChante_t to the pool. + pdp_reader->releaseCache(change.release()); + } + + } + else + { + // Release the unique pointer, not the change in the pool + change.release(); + } +>>>>>>> 392defcc0 (Fix mutex lock count on PDPListener (#2020)) // At this point we can release reader lock. reader->getMutex().unlock(); @@ -124,8 +281,14 @@ void PDPServerListener::onNewCacheChangeAdded( logInfo(RTPS_PDP, "Registering a new participant: " << change->write_params.sample_identity().writer_guid()); +<<<<<<< HEAD // Create a new one when not found pdata = parent_pdp_->createParticipantProxyData(local_data, writer_guid); +======= + // Create a new participant proxy entry + pdata = pdp_server()->createParticipantProxyData(participant_data, writer_guid); + // Realease PDP mutex +>>>>>>> 392defcc0 (Fix mutex lock count on PDPListener (#2020)) lock.unlock(); if (pdata != nullptr) @@ -142,7 +305,12 @@ void PDPServerListener::onNewCacheChangeAdded( } else { +<<<<<<< HEAD pdata->updateData(local_data); +======= + // Update proxy + pdata->updateData(participant_data); +>>>>>>> 392defcc0 (Fix mutex lock count on PDPListener (#2020)) pdata->isAlive = true; // activate lease duration if the DATA(p) comes directly from the client bool previous_lease_check_status = pdata->should_check_lease_duration; diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index 96b32c193f5..a29b6c665ea 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -233,6 +233,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) ${CMAKE_CURRENT_BINARY_DIR}/PubSubWriter.xml) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/PubSubReader.xml.in ${CMAKE_CURRENT_BINARY_DIR}/PubSubReader.xml) + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/discovery_participant_flags.xml + ${CMAKE_CURRENT_BINARY_DIR}/discovery_participant_flags.xml) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/persistence.xml ${CMAKE_CURRENT_BINARY_DIR}/persistence.xml) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/utils/check_guid.py diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 5d00e742bd3..4348a971b4a 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -491,13 +491,52 @@ class PubSubReader std::cout << "Reader discovery finished..." << std::endl; } + bool wait_participant_discovery( + unsigned int min_participants = 1, + std::chrono::seconds timeout = std::chrono::seconds::zero()) + { + bool ret_value = true; + std::unique_lock lock(mutexDiscovery_); + + std::cout << "Reader is waiting discovery of at least " << min_participants << " participants..." << std::endl; + + if (timeout == std::chrono::seconds::zero()) + { + cvDiscovery_.wait(lock, [&]() + { + return participant_matched_ >= min_participants; + }); + } + else + { + if (!cvDiscovery_.wait_for(lock, timeout, [&]() + { + return participant_matched_ >= min_participants; + })) + { + ret_value = false; + } + } + + if (ret_value) + { + std::cout << "Reader participant discovery finished successfully..." << std::endl; + } + else + { + std::cout << "Reader participant discovery finished unsuccessfully..." << std::endl; + } + + return ret_value; + } + bool wait_participant_undiscovery( std::chrono::seconds timeout = std::chrono::seconds::zero()) { bool ret_value = true; std::unique_lock lock(mutexDiscovery_); - std::cout << "Reader is waiting undiscovery..." << std::endl; + std::cout << "Reader is waiting participant undiscovery..." << std::endl; if (timeout == std::chrono::seconds::zero()) { @@ -519,11 +558,11 @@ class PubSubReader if (ret_value) { - std::cout << "Reader undiscovery finished successfully..." << std::endl; + std::cout << "Reader participant undiscovery finished successfully..." << std::endl; } else { - std::cout << "Reader undiscovery finished unsuccessfully..." << std::endl; + std::cout << "Reader participant undiscovery finished unsuccessfully..." << std::endl; } return ret_value; @@ -881,6 +920,13 @@ class PubSubReader return *this; } + PubSubReader& ignore_participant_flags( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t flags) + { + participant_qos_.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = flags; + return *this; + } + PubSubReader& socket_buffer_size( uint32_t sockerBufferSize) { diff --git a/test/blackbox/common/BlackboxTestsDiscovery.cpp b/test/blackbox/common/BlackboxTestsDiscovery.cpp index 3d8e248442f..d038b5f724d 100644 --- a/test/blackbox/common/BlackboxTestsDiscovery.cpp +++ b/test/blackbox/common/BlackboxTestsDiscovery.cpp @@ -1117,4 +1117,4 @@ TEST(Discovery, ServerClientEnvironmentSetUp) output.clear(); ASSERT_FALSE(load_environment_server_info(text, output)); -} +} \ No newline at end of file diff --git a/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp new file mode 100644 index 00000000000..470d3562aa9 --- /dev/null +++ b/test/blackbox/common/DDSBlackboxTestsDiscovery.cpp @@ -0,0 +1,55 @@ +// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "BlackboxTests.hpp" + +#include "PubSubReader.hpp" + +// Regression test for redmine issue 11857 +TEST(DDSDiscovery, IgnoreParticipantFlags) +{ + // This participant is created with: + // - ignoreParticipantFlags = FILTER_SAME_PROCESS (will avoid discovery of p2) + // - metatrafficUnicastLocatorList = 127.0.0.1:7399, 127.0.0.1:7398 (to ensure two listening threads are created) + PubSubReader p1(TEST_TOPIC_NAME); + p1.set_xml_filename("discovery_participant_flags.xml"); + p1.set_participant_profile("participant_1"); + p1.init(); + EXPECT_TRUE(p1.isInitialized()); + + // This participant is created with initialPeersList = 127.0.0.1:7399 + // When the announcements of this participant arrive to p1, they will be ignored, and thus p1 will not + // announce itself back to p2. + PubSubReader p2(TEST_TOPIC_NAME); + p2.set_xml_filename("discovery_participant_flags.xml"); + p2.set_participant_profile("participant_2"); + p2.init(); + EXPECT_TRUE(p2.isInitialized()); + EXPECT_FALSE(p2.wait_participant_discovery(1, std::chrono::seconds(1))); + EXPECT_FALSE(p1.wait_participant_discovery(1, std::chrono::seconds(1))); + + // This participant is created with: + // - initialPeersList = 127.0.0.1:7398 + // - a custom guid prefix + // The announcements of this participant will arrive to p1 on a different listening thread. + // Due to the custom prefix, they should not be ignored, and mutual discovery should happen + PubSubReader p3(TEST_TOPIC_NAME); + p3.set_xml_filename("discovery_participant_flags.xml"); + p3.set_participant_profile("participant_3"); + p3.init(); + EXPECT_TRUE(p1.wait_participant_discovery()); + EXPECT_TRUE(p3.wait_participant_discovery()); +} \ No newline at end of file diff --git a/test/blackbox/discovery_participant_flags.xml b/test/blackbox/discovery_participant_flags.xml new file mode 100644 index 00000000000..ff4d216972c --- /dev/null +++ b/test/blackbox/discovery_participant_flags.xml @@ -0,0 +1,67 @@ + + + + + + + Discovery.IgnoreParticipantFlags.p1 + + + + FILTER_SAME_PROCESS + + + + + 7399 +
127.0.0.1
+
+
+ + + 7398 +
127.0.0.1
+
+
+
+
+
+
+ + + + Discovery.IgnoreParticipantFlags.p2 + + + + + + 7399 +
127.0.0.1
+
+
+
+
+
+
+ + + + f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0 + Discovery.IgnoreParticipantFlags.p3 + + + + + + 7398 +
127.0.0.1
+
+
+
+
+
+
+ +
+