Skip to content

Commit

Permalink
Refs #5204 Check if reader has disable_positive_acks enabled. [5218] (#…
Browse files Browse the repository at this point in the history
…508)

* Refs #5204 Check if reader has disable_positive_acks enabled.

* Refs #5204 WIP Adding BlackBoxTests for DisablePositiveAck

* Refs #5204 Fixing failing test

* Refs #5204 Fixing mac warning

* Refs #5204 Trying to make tests more stable on Windows

* Refs #5204 Fixing memory leak
  • Loading branch information
LuisGP authored and richiware committed Apr 30, 2019
1 parent cb4f5b1 commit 11af676
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 23 deletions.
14 changes: 10 additions & 4 deletions include/fastrtps/rtps/attributes/WriterAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,18 @@ class RemoteReaderAttributes
{
public:

RemoteReaderAttributes() : expectsInlineQos(false),
is_eprosima_endpoint(true)
RemoteReaderAttributes()
: expectsInlineQos(false)
, is_eprosima_endpoint(true)
, disable_positive_acks(false)
{
endpoint.endpointKind = READER;
}

RemoteReaderAttributes(const VendorId_t& vendor_id) : expectsInlineQos(false),
is_eprosima_endpoint(vendor_id == c_VendorId_eProsima)
RemoteReaderAttributes(const VendorId_t& vendor_id)
: expectsInlineQos(false)
, is_eprosima_endpoint(vendor_id == c_VendorId_eProsima)
, disable_positive_acks(false)
{
endpoint.endpointKind = READER;
}
Expand Down Expand Up @@ -163,6 +167,8 @@ class RemoteReaderAttributes
bool expectsInlineQos;

bool is_eprosima_endpoint;

bool disable_positive_acks;
};

}
Expand Down
1 change: 1 addition & 0 deletions src/cpp/rtps/builtin/data/ReaderProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ RemoteReaderAttributes ReaderProxyData::toRemoteReaderAttributes() const
remoteAtt.endpoint.reliabilityKind = m_qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS ? RELIABLE : BEST_EFFORT;
remoteAtt.endpoint.unicastLocatorList = this->m_unicastLocatorList;
remoteAtt.endpoint.multicastLocatorList = this->m_multicastLocatorList;
remoteAtt.disable_positive_acks = m_qos.m_disablePositiveACKs.enabled;

return remoteAtt;
}
Expand Down
64 changes: 45 additions & 19 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ StatefulWriter::~StatefulWriter()

logInfo(RTPS_WRITER,"StatefulWriter destructor");

if (disable_positive_acks_)
{
delete ack_timer_;
}

// Stop all active proxies and pass them to the pool
while (!matched_readers_.empty())
{
Expand Down Expand Up @@ -270,6 +275,12 @@ void StatefulWriter::unsent_change_added_to_history(

if (disable_positive_acks_)
{
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);

ack_timer_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_timer_->restart_timer();
}
}
Expand Down Expand Up @@ -1084,6 +1095,7 @@ void StatefulWriter::send_heartbeat_nts_(
RTPSMessageGroup& message_group,
bool final)
{

SequenceNumber_t firstSeq = get_seq_num_min();
SequenceNumber_t lastSeq = get_seq_num_max();

Expand Down Expand Up @@ -1278,29 +1290,43 @@ void StatefulWriter::ack_timer_expired()
{
std::unique_lock<std::recursive_timed_mutex> lock(mp_mutex);

// The timer has expired so the earliest non-acked change must be marked as acknowledged
// This will be done in the first while iteration, as we start with a negative interval

// The timer has expired so the earliest non-acked change must have been acked
for(const auto& remote_reader : matched_readers_)
{
remote_reader->acked_changes_set(last_sequence_number_);
}
last_sequence_number_++;

// Get the next cache change from the history
CacheChange_t* change;
if (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change))
auto interval = -keep_duration_us_;

// On the other hand, we've seen in the tests that if samples are sent very quickly with little
// time between consecutive samples, the timer interval could end up being negative
// In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while
// loop

while (interval.count() < 0)
{
return;
}
for(ReaderProxy* remote_reader : matched_readers_)
{
if (remote_reader->reader_attributes().disable_positive_acks)
{
remote_reader->acked_changes_set(last_sequence_number_ + 1);
}
}
last_sequence_number_++;

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
// Get the next cache change from the history
CacheChange_t* change;

assert(interval.count() > 0);
if (!mp_history->get_change(
last_sequence_number_,
getGuid(),
&change))
{
return;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
interval = source_timestamp - now + keep_duration_us_;
}
assert(interval.count() >= 0);

ack_timer_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_timer_->restart_timer();
Expand Down
158 changes: 158 additions & 0 deletions test/blackbox/BlackboxTestsAcknackQos.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "BlackboxTests.hpp"

#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"
#include "ReqRepAsReliableHelloWorldRequester.hpp"
#include "ReqRepAsReliableHelloWorldReplier.hpp"

#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/transport/test_UDPv4Transport.h>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;

BLACKBOXTEST(AcknackQos, RecoverAfterLosingCommunicationWithDisablePositiveAck)
{
// This test makes the writer send a few samples
// and checks that those changes were received by the reader.
// Then disconnects the communication and sends some more samples.
// Reconnects and checks that the reader receives only the lost samples by the disconnection.

PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);

// Number of samples written by writer
uint32_t writer_samples = 15;

auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();

writer.keep_duration({2, 0});
//writer.history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS);
writer.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS);
//writer.history_depth(15);
writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
//writer.lifespan_period(lifespan_s);
writer.disable_builtin_transport();
writer.add_user_transport_to_pparams(testTransport);
writer.init();

reader.keep_duration({1, 0});
//reader.history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS);
reader.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS);
//reader.history_depth(15);
reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
//reader.lifespan_period(lifespan_s);
reader.init();

ASSERT_TRUE(reader.isInitialized());
ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

std::list<HelloWorld> data = default_helloworld_data_generator(writer_samples);
reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true;

data = default_helloworld_data_generator(writer_samples);
reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());

std::this_thread::sleep_for(std::chrono::seconds(1));

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = false;

// Block reader until reception finished or timeout.
reader.block_for_all();
}

BLACKBOXTEST(AcknackQos, NotRecoverAfterLosingCommunicationWithDisablePositiveAck)
{
// This test makes the writer send a few samples
// and checks that those changes were received by the reader.
// Then disconnects the communication and sends some more samples.
// Reconnects and checks that the reader receives only the lost samples by the disconnection.

PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);

// Number of samples written by writer
uint32_t writer_samples = 15;

auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();

writer.keep_duration({1, 0});
//writer.history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS);
writer.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS);
//writer.history_depth(15);
writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
//writer.lifespan_period(lifespan_s);
writer.disable_builtin_transport();
writer.add_user_transport_to_pparams(testTransport);
writer.init();

reader.keep_duration({1, 0});
//reader.history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS);
reader.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS);
//reader.history_depth(15);
reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS);
//reader.lifespan_period(lifespan_s);
reader.init();

ASSERT_TRUE(reader.isInitialized());
ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

std::list<HelloWorld> data = default_helloworld_data_generator(writer_samples);
reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_all();

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true;

data = default_helloworld_data_generator(writer_samples);
reader.startReception(data);
// Send data
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());

std::this_thread::sleep_for(std::chrono::seconds(2));

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = false;

// Block reader until reception finished or timeout.
ASSERT_EQ(reader.block_for_all(std::chrono::seconds(1)), 0u);
}

0 comments on commit 11af676

Please sign in to comment.