Skip to content

Commit

Permalink
Only accepting ALIVE changes from unknown trusted writers (#1005)
Browse files Browse the repository at this point in the history
* Refs #7552. Added blackbox test.

* Refs #7552. Only accepting changes from unknown writers when they are ALIVE.
  • Loading branch information
MiguelCompany authored Feb 13, 2020
1 parent 5bc0e7b commit 2577261
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 9 deletions.
3 changes: 2 additions & 1 deletion include/fastrtps/rtps/reader/StatelessReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ class StatelessReader : public RTPSReader
};

bool acceptMsgFrom(
const GUID_t& entityId);
const GUID_t& entityId,
ChangeKind_t change_kind);

bool thereIsUpperRecordOf(
const GUID_t& guid,
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ void PDPListener::onNewCacheChangeAdded(

// Load information on temp_participant_data_
CDRMessage_t msg(change->serializedPayload);
temp_participant_data_.clear();
if(temp_participant_data_.readFromCDRMessage(&msg, true, parent_pdp_->getRTPSParticipant()->network_factory()))
{
// After correctly reading it
Expand Down
20 changes: 12 additions & 8 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ bool StatelessReader::processDataMsg(

std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);

if (acceptMsgFrom(change->writerGUID))
if (acceptMsgFrom(change->writerGUID, change->kind))
{
logInfo(RTPS_MSG_IN, IDSTRING "Trying to add change " << change->sequenceNumber << " TO reader: "
<< getGuid().entityId);
Expand Down Expand Up @@ -466,15 +466,19 @@ bool StatelessReader::processGapMsg(
}

bool StatelessReader::acceptMsgFrom(
const GUID_t& writerId)
const GUID_t& writerId,
ChangeKind_t change_kind)
{
if (m_acceptMessagesFromUnkownWriters)
if (change_kind == ChangeKind_t::ALIVE)
{
return true;
}
else if (writerId.entityId == m_trustedWriterEntityId)
{
return true;
if (m_acceptMessagesFromUnkownWriters)
{
return true;
}
else if (writerId.entityId == m_trustedWriterEntityId)
{
return true;
}
}

return std::any_of(matched_writers_.begin(), matched_writers_.end(),
Expand Down
65 changes: 65 additions & 0 deletions test/blackbox/BlackboxTestsDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,71 @@ TEST_P(Discovery, TwentyParticipantsSeveralEndpointsUnicast)
discoverParticipantsSeveralEndpointsTest(true, 20, 20, 20, TEST_TOPIC_NAME);
}

//! Regression test for support case 7552 (CRM #353)
TEST_P(Discovery, RepeatPubGuid)
{
PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldType> writer2(TEST_TOPIC_NAME);

reader
.history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS)
.history_depth(10)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.participant_id(2)
.init();

writer
.history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS)
.history_depth(10)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.participant_id(1)
.init();

ASSERT_TRUE(reader.isInitialized());
ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_helloworld_data_generator();
reader.startReception(data);

// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();

writer.destroy();
reader.wait_writer_undiscovery();
reader.wait_participant_undiscovery();

writer2
.history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS)
.history_depth(10)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.participant_id(1)
.init();

ASSERT_TRUE(writer2.isInitialized());

writer2.wait_discovery();
reader.wait_discovery();

data = default_helloworld_data_generator();
reader.startReception(data);

// Send data
writer2.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();
}

INSTANTIATE_TEST_CASE_P(Discovery,
Discovery,
testing::Values(false, true),
Expand Down
1 change: 1 addition & 0 deletions test/blackbox/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ class PubSubReader
total_msgs_ = msgs;
number_samples_expected_ = total_msgs_.size();
current_received_count_ = 0;
last_seq = eprosima::fastrtps::rtps::SequenceNumber_t();
mutex_.unlock();

bool ret = false;
Expand Down

0 comments on commit 2577261

Please sign in to comment.