Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not reuse cache change if sample does not fit. [7609] #1013

Merged
merged 2 commits into from
Feb 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ bool StatelessReader::processDataFragMsg(
if (work_change->sequenceNumber < change_to_add->sequenceNumber)
{
// Pending change should be dropped. Check if it can be reused
if (work_change->serializedPayload.max_size <= sampleSize)
if (sampleSize <= work_change->serializedPayload.max_size)
{
// Sample fits inside pending change. Reuse it.
work_change->copy_not_memcpy(change_to_add);
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/transport/test_UDPv4Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <asio.hpp>
#include <fastrtps/transport/test_UDPv4Transport.h>
#include <cstdlib>

Expand Down Expand Up @@ -41,6 +42,10 @@ test_UDPv4Transport::test_UDPv4Transport(const test_UDPv4TransportDescriptor& de
test_UDPv4Transport_ShutdownAllNetwork = false;
UDPv4Transport::mSendBufferSize = descriptor.sendBufferSize;
UDPv4Transport::mReceiveBufferSize = descriptor.receiveBufferSize;
for (auto interf : descriptor.interfaceWhiteList)
{
UDPv4Transport::interface_whitelist_.emplace_back(asio::ip::address_v4::from_string(interf));
}
test_UDPv4Transport_DropLog.clear();
test_UDPv4Transport_DropLogLength = descriptor.dropLogLength;
}
Expand Down
2 changes: 2 additions & 0 deletions test/blackbox/BlackboxTests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ std::list<Data1mb> default_data300kb_data_generator(size_t max = 0);

std::list<Data1mb> default_data300kb_mix_data_generator(size_t max = 0);

std::list<Data1mb> default_data96kb_data300kb_data_generator(size_t max = 0);

/****** Auxiliary lambda functions ******/
extern const std::function<void(const HelloWorld&)> default_helloworld_print;

Expand Down
62 changes: 62 additions & 0 deletions test/blackbox/BlackboxTestsPubSubFragments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,68 @@ TEST(PubSubFragments, AsyncPubSubAsReliableData300kbInLossyConditions)
testTransport->dropLogLength);
}

// Test introduced to verify the fix of the bug (#7609 Do not reuse cache change if sample does not fit)
// detected in relase 1.9.4
TEST(PubSubFragments, AsyncPubSubAsBestEffortAlternateSizeInLossyConditions)
{
PubSubReader<Data1mbType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbType> writer(TEST_TOPIC_NAME);

auto reader_transport = std::make_shared<UDPv4TransportDescriptor>();
reader_transport->interfaceWhiteList.push_back("127.0.0.1");

reader
.disable_builtin_transport()
.add_user_transport_to_pparams(reader_transport)
.history_depth(5)
.reliability(eprosima::fastrtps::BEST_EFFORT_RELIABILITY_QOS)
.mem_policy(eprosima::fastrtps::rtps::DYNAMIC_RESERVE_MEMORY_MODE)
.init();

ASSERT_TRUE(reader.isInitialized());


// To simulate lossy conditions, we are going to remove the default
// bultin transport, and instead use a lossy shim layer variant.
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->sendBufferSize = 65536;
testTransport->receiveBufferSize = 65536;
// We drop 50% of all data frags
testTransport->dropDataFragMessagesPercentage = 50;
testTransport->dropLogLength = 10;
// Only one interface in order to really drop 50% of packages!!!
testTransport->interfaceWhiteList.push_back("127.0.0.1");

writer
.disable_builtin_transport()
.add_user_transport_to_pparams(testTransport)
.history_depth(5)
.asynchronously(eprosima::fastrtps::SYNCHRONOUS_PUBLISH_MODE)
.init();

ASSERT_TRUE(writer.isInitialized());

// Because its volatile the durability
// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_data96kb_data300kb_data_generator(2);

reader.startReception(data);
writer.send(data);

// All data has 7 fragments so when 3 has been lost all data has been sent
// Wait until then
while(eprosima::fastrtps::rtps::test_UDPv4Transport::test_UDPv4Transport_DropLog.size() < 3)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}

// A second should be enough time to assure all data has been received
std::this_thread::sleep_for(std::chrono::seconds(1));
}

TEST(PubSubFragments, AsyncPubSubAsReliableData300kbInLossyConditionsSmallFragments)
{
PubSubReader<Data1mbType> reader(TEST_TOPIC_NAME);
Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ class PubSubReader
return *this;
}

PubSubReader& mem_policy(const eprosima::fastrtps::rtps::MemoryManagementPolicy mem_policy)
{
subscriber_attr_.historyMemoryPolicy = mem_policy;
return *this;
}

PubSubReader& deadline_period(const eprosima::fastrtps::Duration_t deadline_period)
{
subscriber_attr_.qos.m_deadline.period = deadline_period;
Expand Down
24 changes: 24 additions & 0 deletions test/blackbox/utils/data_generators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,27 @@ std::list<Data1mb> default_data300kb_mix_data_generator(size_t max)

return returnedValue;
}

const size_t data96kb_length = 96*1024;
std::list<Data1mb> default_data96kb_data300kb_data_generator(size_t max)
{
unsigned char index = 1;
size_t maximum = max ? max : 10;
std::list<Data1mb> returnedValue(maximum);

std::generate(returnedValue.begin(), returnedValue.end(), [&index]
{
Data1mb data;
size_t length = index % 2 != 0 ? data96kb_length : data300kb_length;
data.data().resize(length);
data.data()[0] = index;
for (size_t i = 1; i < length; ++i)
{
data.data()[i] = static_cast<unsigned char>(i + data.data()[0]);
}
++index;
return data;
});

return returnedValue;
}