Skip to content

Commit

Permalink
Fixed persistence guid issue (#2011)
Browse files Browse the repository at this point in the history
* Added statistics regression test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Fix issue.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* More blackbox tests added

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Linters

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Fixed build errors

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Apply suggestions from code review

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: José Luis Bueno López <69244257+JLBuenoLopez-eProsima@users.noreply.github.com>

Co-authored-by: José Luis Bueno López <69244257+JLBuenoLopez-eProsima@users.noreply.github.com>
  • Loading branch information
MiguelCompany and JLBuenoLopez authored Jun 16, 2021
1 parent 5b8b905 commit 92cccd6
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ RTPSParticipantImpl::RTPSParticipantImpl(
: domain_id_(domain_id)
, m_att(PParam)
, m_guid(guidP, c_EntityId_RTPSParticipant)
, m_persistence_guid(persistence_guid, c_EntityId_RTPSParticipant)
, mp_builtinProtocols(nullptr)
, mp_ResourceSemaphore(new Semaphore(0))
, IdCounter(0)
Expand All @@ -146,6 +145,10 @@ RTPSParticipantImpl::RTPSParticipantImpl(
, is_intraprocess_only_(should_be_intraprocess_only(PParam))
, has_shm_transport_(false)
{
if (c_GuidPrefix_Unknown != persistence_guid)
{
m_persistence_guid = GUID_t(persistence_guid, c_EntityId_RTPSParticipant);
}
// Builtin transports by default
if (PParam.useBuiltinTransports)
{
Expand Down
90 changes: 90 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,96 @@ TEST_P(PubSubBasic, unique_flows_one_writer_two_readers)
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30)));
}

template<typename T>
static void two_consecutive_writers(
PubSubReader<T>& reader,
PubSubWriter<T>& writer,
bool block_for_all)
{
writer.init();
EXPECT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto complete_data = default_helloworld_data_generator();

reader.startReception(complete_data);

// Send data
writer.send(complete_data);
EXPECT_TRUE(complete_data.empty());

if (block_for_all)
{
reader.block_for_all();
}
else
{
reader.block_for_at_least(2);
}
reader.stopReception();

writer.destroy();

// Wait for undiscovery
reader.wait_writer_undiscovery();
}

TEST_P(PubSubBasic, BestEffortTwoWritersConsecutives)
{
// Pull mode incompatible with best effort
if (use_pull_mode)
{
return;
}

PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);

reader.history_depth(10).init();
EXPECT_TRUE(reader.isInitialized());

for (int i = 0; i < 2; ++i)
{
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);
writer.history_depth(10).reliability(BEST_EFFORT_RELIABILITY_QOS);
two_consecutive_writers(reader, writer, false);
}
}


TEST_P(PubSubBasic, ReliableVolatileTwoWritersConsecutives)
{
PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);

reader.history_depth(10).reliability(RELIABLE_RELIABILITY_QOS).init();
EXPECT_TRUE(reader.isInitialized());

for (int i = 0; i < 2; ++i)
{
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);
writer.history_depth(10).durability_kind(VOLATILE_DURABILITY_QOS);
two_consecutive_writers(reader, writer, true);
}
}

TEST_P(PubSubBasic, ReliableTransientLocalTwoWritersConsecutives)
{
PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);

reader.history_depth(10).reliability(RELIABLE_RELIABILITY_QOS).durability_kind(TRANSIENT_LOCAL_DURABILITY_QOS);
reader.init();
EXPECT_TRUE(reader.isInitialized());

for (int i = 0; i < 2; ++i)
{
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);
writer.history_depth(10).reliability(RELIABLE_RELIABILITY_QOS);
two_consecutive_writers(reader, writer, true);
}
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
54 changes: 54 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,57 @@ TEST(DDSStatistics, simple_statistics_datareaders)

#endif // FASTDDS_STATISTICS
}

TEST(DDSStatistics, simple_statistics_second_writer)
{
#ifdef FASTDDS_STATISTICS

auto transport = std::make_shared<UDPv4TransportDescriptor>();
auto domain_id = GET_PID() % 100;

DomainParticipantQos p_qos = PARTICIPANT_QOS_DEFAULT;
p_qos.transport().use_builtin_transports = false;
p_qos.transport().user_transports.push_back(transport);

auto participant_factory = DomainParticipantFactory::get_instance();
DomainParticipant* p1 = participant_factory->create_participant(domain_id, p_qos);
DomainParticipant* p2 = participant_factory->create_participant(domain_id, p_qos);

ASSERT_NE(nullptr, p1);
ASSERT_NE(nullptr, p2);

auto statistics_p1 = statistics::dds::DomainParticipant::narrow(p1);
auto statistics_p2 = statistics::dds::DomainParticipant::narrow(p2);
ASSERT_NE(nullptr, statistics_p1);
ASSERT_NE(nullptr, statistics_p2);

auto subscriber_p1 = p1->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
auto subscriber_p2 = p2->create_subscriber(SUBSCRIBER_QOS_DEFAULT);
ASSERT_NE(nullptr, subscriber_p1);
ASSERT_NE(nullptr, subscriber_p2);

auto physical_data_reader_1 = enable_statistics(statistics_p1, subscriber_p1, statistics::PHYSICAL_DATA_TOPIC);
auto physical_data_reader_2 = enable_statistics(statistics_p2, subscriber_p2, statistics::PHYSICAL_DATA_TOPIC);
ASSERT_NE(nullptr, physical_data_reader_1);
ASSERT_NE(nullptr, physical_data_reader_2);

wait_statistics(physical_data_reader_1, 2, "PHYSICAL_DATA_TOPIC", 10u);
wait_statistics(physical_data_reader_2, 2, "PHYSICAL_DATA_TOPIC", 10u);

disable_statistics(statistics_p1, subscriber_p1, physical_data_reader_1, statistics::PHYSICAL_DATA_TOPIC);
physical_data_reader_1 = enable_statistics(statistics_p1, subscriber_p1, statistics::PHYSICAL_DATA_TOPIC);

wait_statistics(physical_data_reader_1, 2, "PHYSICAL_DATA_TOPIC", 10u);
wait_statistics(physical_data_reader_2, 1, "PHYSICAL_DATA_TOPIC", 10u);

disable_statistics(statistics_p1, subscriber_p1, physical_data_reader_1, statistics::PHYSICAL_DATA_TOPIC);
disable_statistics(statistics_p2, subscriber_p2, physical_data_reader_2, statistics::PHYSICAL_DATA_TOPIC);

p2->delete_subscriber(subscriber_p2);
p1->delete_subscriber(subscriber_p1);

participant_factory->delete_participant(p2);
participant_factory->delete_participant(p1);

#endif // FASTDDS_STATISTICS
}
105 changes: 105 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,111 @@ TEST_P(RTPS, RTPSAsReliableWithRegistrationAndHolesInHistory)
}


TEST_P(RTPS, RTPSAsReliableVolatileTwoWritersConsecutives)
{
RTPSWithRegistrationReader<HelloWorldType> reader(TEST_TOPIC_NAME);
RTPSWithRegistrationWriter<HelloWorldType> writer(TEST_TOPIC_NAME);

reader.reliability(ReliabilityKind_t::RELIABLE).init();
EXPECT_TRUE(reader.isInitialized());

writer.reliability(ReliabilityKind_t::RELIABLE).init();
EXPECT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto complete_data = default_helloworld_data_generator();

reader.expected_data(complete_data);
reader.startReception();

// Send data
writer.send(complete_data);
EXPECT_TRUE(complete_data.empty());

reader.block_for_all();
reader.stopReception();

writer.destroy();

// Wait for undiscovery
reader.wait_undiscovery();

writer.reliability(ReliabilityKind_t::RELIABLE).init();

// Wait for discovery
writer.wait_discovery();
reader.wait_discovery();

complete_data = default_helloworld_data_generator();

reader.expected_data(complete_data, true);
reader.startReception();

writer.send(complete_data);
EXPECT_TRUE(complete_data.empty());

reader.block_for_all();

reader.destroy();
writer.destroy();
}

TEST_P(RTPS, RTPSAsReliableTransientLocalTwoWritersConsecutives)
{
RTPSWithRegistrationReader<HelloWorldType> reader(TEST_TOPIC_NAME);
RTPSWithRegistrationWriter<HelloWorldType> writer(TEST_TOPIC_NAME);

reader.reliability(ReliabilityKind_t::RELIABLE).durability(DurabilityKind_t::TRANSIENT_LOCAL).init();
EXPECT_TRUE(reader.isInitialized());

writer.reliability(ReliabilityKind_t::RELIABLE).init();

EXPECT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto complete_data = default_helloworld_data_generator();

reader.expected_data(complete_data);
reader.startReception();

// Send data
writer.send(complete_data);
EXPECT_TRUE(complete_data.empty());

reader.block_for_all();
reader.stopReception();

writer.destroy();

// Wait for undiscovery
reader.wait_undiscovery();

writer.reliability(ReliabilityKind_t::RELIABLE).init();

// Wait for discovery
writer.wait_discovery();
reader.wait_discovery();

complete_data = default_helloworld_data_generator();

reader.expected_data(complete_data, true);
reader.startReception();

writer.send(complete_data);
EXPECT_TRUE(complete_data.empty());

reader.block_for_all();

reader.destroy();
writer.destroy();
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down
8 changes: 7 additions & 1 deletion test/blackbox/common/RTPSWithRegistrationReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,16 @@ class RTPSWithRegistrationReader
}

void expected_data(
const std::list<type>& msgs)
const std::list<type>& msgs,
bool reset_seq = false)
{
std::unique_lock<std::mutex> lock(mutex_);
total_msgs_ = msgs;

if (reset_seq)
{
last_seq_ = eprosima::fastrtps::rtps::SequenceNumber_t();
}
}

void expected_data(
Expand Down

0 comments on commit 92cccd6

Please sign in to comment.