Skip to content

Commit

Permalink
Do not reuse cache change if sample does not fit (#1021)
Browse files Browse the repository at this point in the history
This is a port of #1013 from 1.9.x
  • Loading branch information
IkerLuengo authored Feb 21, 2020
1 parent 3d18a54 commit d402a75
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ bool StatelessReader::processDataFragMsg(
if(work_change->sequenceNumber < change_to_add->sequenceNumber)
{
// Pending change should be dropped. Check if it can be reused
if (work_change->serializedPayload.max_size <= sampleSize)
if (sampleSize <= work_change->serializedPayload.max_size)
{
// Sample fits inside pending change. Reuse it.
work_change->copy_not_memcpy(change_to_add);
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/rtps/transport/test_UDPv4Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <asio.hpp>
#include <fastdds/rtps/transport/test_UDPv4Transport.h>
#include <cstdlib>

Expand Down Expand Up @@ -48,6 +49,10 @@ test_UDPv4Transport::test_UDPv4Transport(const test_UDPv4TransportDescriptor& de
test_UDPv4Transport_ShutdownAllNetwork = false;
UDPv4Transport::mSendBufferSize = descriptor.sendBufferSize;
UDPv4Transport::mReceiveBufferSize = descriptor.receiveBufferSize;
for (auto interf : descriptor.interfaceWhiteList)
{
UDPv4Transport::interface_whitelist_.emplace_back(asio::ip::address_v4::from_string(interf));
}
test_UDPv4Transport_DropLog.clear();
test_UDPv4Transport_DropLogLength = descriptor.dropLogLength;
}
Expand Down
2 changes: 2 additions & 0 deletions test/blackbox/BlackboxTests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ std::list<Data1mb> default_data300kb_data_generator(size_t max = 0);

std::list<Data1mb> default_data300kb_mix_data_generator(size_t max = 0);

std::list<Data1mb> default_data96kb_data300kb_data_generator(size_t max = 0);

/****** Auxiliary lambda functions ******/
extern const std::function<void(const HelloWorld&)> default_helloworld_print;

Expand Down
67 changes: 66 additions & 1 deletion test/blackbox/BlackboxTestsPubSubFragments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,74 @@ TEST(BlackBox, AsyncPubSubAsReliableData300kbInLossyConditions)
reader.block_for_all();

// Sanity check. Make sure we have dropped a few packets
ASSERT_EQ(eprosima::fastrtps::rtps::test_UDPv4Transport::test_UDPv4Transport_DropLog.size(), testTransport->dropLogLength);
ASSERT_EQ(
eprosima::fastrtps::rtps::test_UDPv4Transport::test_UDPv4Transport_DropLog.size(),
testTransport->dropLogLength);
}

// Test introduced to verify the fix of the bug (#7609 Do not reuse cache change if sample does not fit)
// detected in relase 1.9.4
TEST(PubSubFragments, AsyncPubSubAsBestEffortAlternateSizeInLossyConditions)
{
PubSubReader<Data1mbType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbType> writer(TEST_TOPIC_NAME);

auto reader_transport = std::make_shared<UDPv4TransportDescriptor>();
reader_transport->interfaceWhiteList.push_back("127.0.0.1");

reader
.disable_builtin_transport()
.add_user_transport_to_pparams(reader_transport)
.history_depth(5)
.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS)
.mem_policy(eprosima::fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE)
.init();

ASSERT_TRUE(reader.isInitialized());


// To simulate lossy conditions, we are going to remove the default
// bultin transport, and instead use a lossy shim layer variant.
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->sendBufferSize = 65536;
testTransport->receiveBufferSize = 65536;
// We drop 50% of all data frags
testTransport->dropDataFragMessagesPercentage = 50;
testTransport->dropLogLength = 10;
// Only one interface in order to really drop 50% of packages!!!
testTransport->interfaceWhiteList.push_back("127.0.0.1");

writer
.disable_builtin_transport()
.add_user_transport_to_pparams(testTransport)
.history_depth(5)
.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE)
.init();

ASSERT_TRUE(writer.isInitialized());

// Because its volatile the durability
// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_data96kb_data300kb_data_generator(2);

reader.startReception(data);
writer.send(data);

// All data has 7 fragments so when 3 has been lost all data has been sent
// Wait until then
while(eprosima::fastrtps::rtps::test_UDPv4Transport::test_UDPv4Transport_DropLog.size() < 3)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}

// A second should be enough time to assure all data has been received
std::this_thread::sleep_for(std::chrono::seconds(1));
}


TEST(BlackBox, AsyncFragmentSizeTest)
{
// ThroghputController size large than maxMessageSize.
Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,12 @@ class PubSubReader
return *this;
}

PubSubReader& mem_policy(const eprosima::fastrtps::rtps::MemoryManagementPolicy mem_policy)
{
subscriber_attr_.historyMemoryPolicy = mem_policy;
return *this;
}

PubSubReader& deadline_period(const eprosima::fastrtps::Duration_t deadline_period)
{
subscriber_attr_.qos.m_deadline.period = deadline_period;
Expand Down
24 changes: 24 additions & 0 deletions test/blackbox/utils/data_generators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,27 @@ std::list<Data1mb> default_data300kb_mix_data_generator(size_t max)

return returnedValue;
}

const size_t data96kb_length = 96*1024;
std::list<Data1mb> default_data96kb_data300kb_data_generator(size_t max)
{
unsigned char index = 1;
size_t maximum = max ? max : 10;
std::list<Data1mb> returnedValue(maximum);

std::generate(returnedValue.begin(), returnedValue.end(), [&index]
{
Data1mb data;
size_t length = index % 2 != 0 ? data96kb_length : data300kb_length;
data.data().resize(length);
data.data()[0] = index;
for (size_t i = 1; i < length; ++i)
{
data.data()[i] = static_cast<unsigned char>(i + data.data()[0]);
}
++index;
return data;
});

return returnedValue;
}

0 comments on commit d402a75

Please sign in to comment.