Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20815] Only apply content filter to ALIVE changes #4876

Merged
merged 13 commits into from
Jun 6, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Refs #20815: Add regression test
Signed-off-by: eduponz <eduardoponz@eprosima.com>
  • Loading branch information
EduPonz committed Jun 5, 2024
commit bfd054dbc2e0181e0be1a86a5f34eae5e86ea186
259 changes: 259 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsContentFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,40 @@
// limitations under the License.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>

#include <gtest/gtest.h>

#include <fastdds/dds/core/status/BaseStatus.hpp>
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp>
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/DataWriterListener.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/qos/SubscriberQos.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/topic/qos/TopicQos.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/rtps/transport/test_UDPv4TransportDescriptor.h>
#include <fastrtps/attributes/LibrarySettingsAttributes.h>
#include <fastrtps/types/DynamicData.h>
#include <fastrtps/types/DynamicDataFactory.h>
#include <fastrtps/types/DynamicPubSubType.h>
#include <fastrtps/types/DynamicTypePtr.h>
#include <fastrtps/types/TypesBase.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "../types/HelloWorldTypeObject.h"
Expand Down Expand Up @@ -615,6 +640,240 @@ TEST(DDSContentFilter, CorrectlyHandleAliasOtherHeader)
EXPECT_NE(nullptr, filtered_topic);
}

/*
* Regression test for https://eprosima.easyredmine.com/issues/20815
* Check that the content filter is only applied to alive changes.
* The test creates a reliable writer and a reader with a content filter that only accepts messages with a specific
* string. After discovery, the writer sends 10 samples which pass the filer in 10 different instances, with the
* particularity that after each write, the instance is unregistered.
* The DATA(u) generated would not pass the filter if it was applied. To check that the filter is only applied to
* ALIVE changes (not unregister or disposed), the test checks that the reader receives 10 valid samples (one per
* sample sent) and 10 invalid samples (one per unregister). Furthermore, it also checks that no samples are lost.writer
*/
TEST(DDSContentFilter, OnlyFilterAliveChanges)
{
/* Create entity infrastructure */
// Create participant
auto dpf = DomainParticipantFactory::get_instance();
DomainId_t domain_id = static_cast<DomainId_t>(GET_PID() % 230);
auto participant = dpf->create_participant(domain_id, PARTICIPANT_QOS_DEFAULT);

/*
* Create a Dynamic type equivalent to that of KeyedHelloWorld.idl.
* We use a dynamic type because the Fast DDS generated TypeSupport does not contain the TypeObject code,
* which is necessary for the content filter to work; instead of regenerating all the types for a single
* test, we can leverage the DynamicType API to create the type and its TypeObject support.
*/
// Define the members ids of the struct for readability
enum class KeyedHelloWorldMembers : fastrtps::types::MemberId
{
KEY = 0,
INDEX = 1,
MESSAGE = 2
};

// Create the struct type builder
const std::string topic_type_name = "keyed_hello_world";
auto struct_builder(fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_struct_builder());
struct_builder->set_name(topic_type_name);

// Create the key member with the @key annotation
auto key_member_builder = fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_uint16_builder();
key_member_builder->apply_annotation(fastrtps::types::ANNOTATION_KEY_ID, "value", "true");
auto key_member = key_member_builder->build();

// Add members to the struct builder
struct_builder->add_member(static_cast<fastrtps::types::MemberId>(KeyedHelloWorldMembers::KEY), "key", key_member);
struct_builder->add_member(static_cast<fastrtps::types::MemberId>(KeyedHelloWorldMembers::INDEX), "index",
fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_uint16_type());
struct_builder->add_member(static_cast<fastrtps::types::MemberId>(KeyedHelloWorldMembers::MESSAGE), "message",
fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_string_type(128));

// Build the type
auto struct_type = struct_builder->build();

// Create type support
TypeSupport type(new fastrtps::types::DynamicPubSubType(struct_type));

// Set the autofill type object to true so that TypeObject is registered upon type registration
type->auto_fill_type_object(true);

// Register the type
ASSERT_EQ(ReturnCode_t::RETCODE_OK, type.register_type(participant));

// Create topic
auto topic = participant->create_topic("TestTopic", type->getName(), TOPIC_QOS_DEFAULT);
ASSERT_NE(nullptr, topic);

// Create content filtered topic
std::string expression = "index = 1";
auto filtered_topic = participant->create_contentfilteredtopic("FilteredTestTopic", topic, expression, {});
ASSERT_NE(nullptr, filtered_topic);

/* Discovery synchronization variable */
std::mutex discovery_mtx;
std::condition_variable discovery_cv;

/* Create reader */
// Custom DataReaderListener to count valid, invalid, and lost samples, and to monitor subscription matched status
class CustomReaderListener : public DataReaderListener
{
public:

CustomReaderListener(
fastrtps::types::DynamicType_ptr type,
std::condition_variable& discovery_cv)
: valid_samples(0u)
, invalid_samples(0u)
, lost_samples(0u)
, matched(0u)
, type_(type)
, discovery_cv_(discovery_cv)
{
}

void on_data_available(
DataReader* reader) override
{
fastrtps::types::DynamicData* data;
data = fastrtps::types::DynamicDataFactory::get_instance()->create_data(type_);
SampleInfo info;

while (ReturnCode_t::RETCODE_OK == reader->take_next_sample(data, &info))
{
if (info.valid_data)
{
++valid_samples;
}
else
{
++invalid_samples;
}
}
}

void on_sample_lost(
DataReader*,
const SampleLostStatus& status) override
{
lost_samples = status.total_count;
}

void on_subscription_matched(
DataReader*,
const SubscriptionMatchedStatus& info) override
{
matched = info.current_count;
if (0 < matched)
{
discovery_cv_.notify_one();
}
}

uint8_t valid_samples;
uint8_t invalid_samples;
uint8_t lost_samples;
uint8_t matched;

private:

fastrtps::types::DynamicType_ptr type_;
std::condition_variable& discovery_cv_;
};

// Create subscriber
auto sub = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr);
ASSERT_NE(nullptr, sub);

// Create DataReader
CustomReaderListener reader_listener(struct_type, discovery_cv);
DataReaderQos reader_qos;
reader_qos.history().depth = 2; // Each instance can hold a sample and the unregister
reader_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; // Reliable for determinism

auto reader = sub->create_datareader(filtered_topic, reader_qos, &reader_listener);
ASSERT_NE(nullptr, reader);

/* Create writer */
// Custom DataWriterListener publication matched status
class CustomWriterListener : public DataWriterListener
{
public:

CustomWriterListener(
std::condition_variable& discovery_cv)
: matched(0u)
, discovery_cv_(discovery_cv)
{
}

void on_publication_matched(
DataWriter*,
const PublicationMatchedStatus& info) override
{
matched = info.current_count;
if (0 < matched)
{
discovery_cv_.notify_one();
}
}

uint8_t matched;

private:

std::condition_variable& discovery_cv_;
};

// Create publisher
auto pub = participant->create_publisher(PUBLISHER_QOS_DEFAULT, nullptr);
ASSERT_NE(nullptr, pub);

// Create DataWriter
CustomWriterListener writer_listener(discovery_cv);
DataWriterQos writer_qos;
writer_qos.history().depth = 2; // Each instance can hold a sample and the unregister
writer_qos.reliability().kind = RELIABLE_RELIABILITY_QOS; // Reliable for determinism

auto writer = pub->create_datawriter(topic, writer_qos, &writer_listener);
ASSERT_NE(nullptr, writer);

/* Wait for discovery */
{
std::unique_lock<std::mutex> lck(discovery_mtx);
discovery_cv.wait_for(lck, std::chrono::seconds(3), [&]()
{
return (0 < writer_listener.matched) && (0 < reader_listener.matched);
});

ASSERT_GT(reader_listener.matched, 0);
ASSERT_GT(writer_listener.matched, 0);
}

/* Send 10 samples, each on a different instance, unregistering instances after writing */
for (uint16_t i = 0; i < 10; ++i)
{
fastrtps::types::DynamicData* data;
data = fastrtps::types::DynamicDataFactory::get_instance()->create_data(struct_type);
data->set_uint16_value(i, static_cast<fastrtps::types::MemberId>(KeyedHelloWorldMembers::KEY));
data->set_uint16_value(1u, static_cast<fastrtps::types::MemberId>(KeyedHelloWorldMembers::INDEX));

InstanceHandle_t handle = writer->register_instance(data);
ASSERT_NE(HANDLE_NIL, handle);
ASSERT_EQ(ReturnCode_t::RETCODE_OK, writer->write(data, handle));
ASSERT_EQ(ReturnCode_t::RETCODE_OK, writer->unregister_instance(data, handle));
fastrtps::types::DynamicDataFactory::get_instance()->delete_data(data);
}

// Wait until all samples are received
ASSERT_EQ(ReturnCode_t::RETCODE_OK, writer->wait_for_acknowledgments({3, 0}));

/* Check that both samples and unregisters are received */
ASSERT_EQ(reader_listener.valid_samples, 10u);
ASSERT_EQ(reader_listener.invalid_samples, 10u);
ASSERT_EQ(reader_listener.lost_samples, 0u);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down