Skip to content

Commit

Permalink
Refs 7288. Remove from history when change is not on instance
Browse files Browse the repository at this point in the history
This is a port of #961 from 1.9.x
  • Loading branch information
IkerLuengo committed Feb 24, 2020
1 parent d402a75 commit 506c3ed
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 23 deletions.
39 changes: 18 additions & 21 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,37 +424,34 @@ bool SubscriberHistory::remove_change_sub(
}

std::lock_guard<RecursiveTimedMutex> guard(*mp_mutex);
if (topic_att_.getTopicKind() == NO_KEY)
{
if (remove_change(change))
{
m_isHistoryFull = false;
return true;
}
return false;
}
else
if (topic_att_.getTopicKind() == WITH_KEY)
{
bool found = false;
t_m_Inst_Caches::iterator vit;
if (!find_key(change, &vit))
{
return false;
}

for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
if (find_key(change, &vit))
{
if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID)
for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
{
if (remove_change(change))
if ((*chit)->sequenceNumber == change->sequenceNumber && (*chit)->writerGUID == change->writerGUID)
{
vit->second.cache_changes.erase(chit);
m_isHistoryFull = false;
return true;
found = true;
break;
}
}
}
logError(SUBSCRIBER, "Change not found on this key, something is wrong");
if (!found)
{
logError(SUBSCRIBER, "Change not found on this key, something is wrong");
}
}

if (remove_change(change))
{
m_isHistoryFull = false;
return true;
}

return false;
}

Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/BlackboxTests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ void default_receive_print(const HelloWorld& hello);
template<>
void default_receive_print(const FixedSized& hello);

template<>
void default_receive_print(const KeyedHelloWorld& str);

template<>
void default_receive_print(const String& str);

Expand All @@ -85,6 +88,9 @@ void default_send_print(const HelloWorld& hello);
template<>
void default_send_print(const FixedSized& hello);

template<>
void default_send_print(const KeyedHelloWorld& str);

template<>
void default_send_print(const String& str);

Expand Down
88 changes: 88 additions & 0 deletions test/blackbox/BlackboxTestsPubSubHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,91 @@ TEST(BlackBox, PubSubAsReliableKeepLastReaderSmallDepthTwoPublishers)
ASSERT_EQ(received.index(), 3u);
}

TEST(BlackBox, PubSubAsReliableKeepLastWithKey)
{
PubSubReader<KeyedHelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldType> writer(TEST_TOPIC_NAME);

uint32_t keys = 2;

reader.resource_limits_max_instances(keys).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS).
key(true).init();

ASSERT_TRUE(reader.isInitialized());

writer.resource_limits_max_instances(keys).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
key(true).init();

ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_keyedhelloworld_data_generator();
reader.startReception(data);

// Send data
writer.send(data);
ASSERT_TRUE(data.empty());

reader.block_for_all();
reader.stopReception();
}

TEST(BlackBox, PubSubAsReliableKeepLastReaderSmallDepthWithKey)
{
PubSubReader<KeyedHelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedHelloWorldType> writer(TEST_TOPIC_NAME);

uint32_t keys = 2;
uint32_t depth = 2;

reader.history_depth(depth).
resource_limits_max_samples(keys*depth).
resource_limits_allocated_samples(keys*depth).
resource_limits_max_instances(keys).
resource_limits_max_samples_per_instance(depth).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
history_kind(eprosima::fastrtps::KEEP_LAST_HISTORY_QOS).
key(true).init();

ASSERT_TRUE(reader.isInitialized());

writer.history_depth(depth).
resource_limits_max_samples(keys*depth).
resource_limits_allocated_samples(keys*depth).
resource_limits_max_instances(keys).
resource_limits_max_samples_per_instance(depth).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).
key(true).init();

ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

//We want the number of messages to be multiple of keys*depth
auto data = default_keyedhelloworld_data_generator(3*keys*depth);
while(data.size() > 1)
{
auto expected_data(data);

// Send data
writer.send(data);
ASSERT_TRUE(data.empty());
std::this_thread::sleep_for(std::chrono::milliseconds(500));

reader.startReception(expected_data);
size_t current_received = reader.block_for_at_least(keys*depth);
reader.stopReception();
ASSERT_EQ(current_received, keys*depth);

data = reader.data_not_received();
}
}

12 changes: 12 additions & 0 deletions test/blackbox/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,18 @@ class PubSubReader
return *this;
}

PubSubReader& resource_limits_max_instances(const int32_t max)
{
subscriber_attr_.topic.resourceLimitsQos.max_instances = max;
return *this;
}

PubSubReader& resource_limits_max_samples_per_instance(const int32_t max)
{
subscriber_attr_.topic.resourceLimitsQos.max_samples_per_instance = max;
return *this;
}

PubSubReader& matched_writers_allocation(size_t initial, size_t maximum)
{
subscriber_attr_.matched_publisher_allocation.initial = initial;
Expand Down
14 changes: 14 additions & 0 deletions test/blackbox/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,20 @@ class PubSubWriter
return *this;
}

PubSubWriter& resource_limits_max_instances(
const int32_t max)
{
publisher_attr_.topic.resourceLimitsQos.max_instances = max;
return *this;
}

PubSubWriter& resource_limits_max_samples_per_instance(
const int32_t max)
{
publisher_attr_.topic.resourceLimitsQos.max_samples_per_instance = max;
return *this;
}

PubSubWriter& matched_readers_allocation(
size_t initial,
size_t maximum)
Expand Down
11 changes: 9 additions & 2 deletions test/blackbox/types/KeyedHelloWorld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ using namespace eprosima::fastcdr::exception;

KeyedHelloWorld::KeyedHelloWorld()
{
m_index = 0;
m_key = 0;

}

KeyedHelloWorld::~KeyedHelloWorld()
Expand All @@ -45,18 +45,21 @@ KeyedHelloWorld::~KeyedHelloWorld()

KeyedHelloWorld::KeyedHelloWorld(const KeyedHelloWorld &x)
{
m_index = x.m_index;
m_key = x.m_key;
m_message = x.m_message;
}

KeyedHelloWorld::KeyedHelloWorld(KeyedHelloWorld &&x)
{
m_index = x.m_index;
m_key = x.m_key;
m_message = std::move(x.m_message);
}

KeyedHelloWorld& KeyedHelloWorld::operator=(const KeyedHelloWorld &x)
{
m_index = x.m_index;
m_key = x.m_key;
m_message = x.m_message;

Expand All @@ -65,6 +68,7 @@ KeyedHelloWorld& KeyedHelloWorld::operator=(const KeyedHelloWorld &x)

KeyedHelloWorld& KeyedHelloWorld::operator=(KeyedHelloWorld &&x)
{
m_index = x.m_index;
m_key = x.m_key;
m_message = std::move(x.m_message);

Expand All @@ -73,7 +77,8 @@ KeyedHelloWorld& KeyedHelloWorld::operator=(KeyedHelloWorld &&x)

bool KeyedHelloWorld::operator==(const KeyedHelloWorld &x) const
{
if(m_message == x.m_message &&
if(m_index == x.m_index &&
m_message == x.m_message &&
m_key == x.m_key)
return true;

Expand All @@ -99,6 +104,7 @@ size_t KeyedHelloWorld::getCdrSerializedSize(const KeyedHelloWorld& data, size_t
void KeyedHelloWorld::serialize(eprosima::fastcdr::Cdr &scdr) const
{
scdr << m_key;
scdr << m_index;

if(m_message.length() <= 256)
{
Expand All @@ -113,6 +119,7 @@ void KeyedHelloWorld::serialize(eprosima::fastcdr::Cdr &scdr) const
void KeyedHelloWorld::deserialize(eprosima::fastcdr::Cdr &dcdr)
{
dcdr >> m_key;
dcdr >> m_index;
dcdr >> m_message;
}

Expand Down
28 changes: 28 additions & 0 deletions test/blackbox/types/KeyedHelloWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,33 @@ class KeyedHelloWorld
{
return m_key;
}

/*
* @brief This function sets a value in member index
* @param _index New value for member index
*/
inline eProsima_user_DllExport void index(uint16_t _index)
{
m_index = _index;
}

/*!
* @brief This function returns the value of member index
* @return Value of member index
*/
inline eProsima_user_DllExport uint16_t index() const
{
return m_index;
}

/*!
* @brief This function returns a reference to member index
* @return Reference to member index
*/
inline eProsima_user_DllExport uint16_t& index()
{
return m_index;
}
/*!
* @brief This function copies the value in member message
* @param _message New value to be copied in member message
Expand Down Expand Up @@ -201,6 +228,7 @@ class KeyedHelloWorld
eProsima_user_DllExport void serializeKey(eprosima::fastcdr::Cdr &cdr) const;

private:
uint16_t m_index;
uint16_t m_key;
std::string m_message;
};
Expand Down
1 change: 1 addition & 0 deletions test/blackbox/utils/data_generators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ std::list<KeyedHelloWorld> default_keyedhelloworld_data_generator(size_t max)
std::generate(returnedValue.begin(), returnedValue.end(), [&index]
{
KeyedHelloWorld hello;
hello.index(index);
hello.key(index % 2);
std::stringstream ss;
ss << "HelloWorld " << index;
Expand Down
12 changes: 12 additions & 0 deletions test/blackbox/utils/print_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ void default_receive_print(const HelloWorld& hello)
std::cout << "Received HelloWorld " << hello.index() << std::endl;
}

template<>
void default_receive_print(const KeyedHelloWorld& hello)
{
std::cout << "Received HelloWorld " << hello.index() << " with key " << hello.key() << std::endl;
}

template<>
void default_receive_print(const FixedSized& hello)
{
Expand Down Expand Up @@ -59,6 +65,12 @@ void default_send_print(const HelloWorld& hello)
std::cout << "Sent HelloWorld " << hello.index() << std::endl;
}

template<>
void default_send_print(const KeyedHelloWorld& hello)
{
std::cout << "Sent HelloWorld " << hello.index() << " with key " << hello.key() << std::endl;
}

template<>
void default_send_print(const FixedSized& hello)
{
Expand Down

0 comments on commit 506c3ed

Please sign in to comment.