DataReader::read() reads invalid data #5136

Closed
fschoenm opened this issue Aug 5, 2024 · 7 comments · Fixed by eProsima/Fast-CDR#233
Labels
in progress Issue or PR which is being reviewed

Comments

@fschoenm
Contributor

fschoenm commented Aug 5, 2024

Is there an already existing issue for this?

  • I have searched the existing issues

Expected behavior

Receive the messages exactly as they were sent (according to https://fast-dds.docs.eprosima.com/en/latest/fastdds/dds_layer/subscriber/dataReader/readingData.html).

Current behavior

DataReader::read() seems to invent data, or rather merges received data from multiple messages.

We have a simple publisher/subscriber scenario with a message type that contains a map (std::map). When publishing multiple messages, later samples do not contain the message that was actually sent; instead, the contents of several messages are merged into one:

Message sent: {0: "abc"}
Message rcvd: {0: "abc"}
Message sent: {1: "def"}
Message rcvd: {0: "abc", 1: "def"}

This seems to happen when using DataReader::read(). With DataReader::read_next_sample() it seems to work correctly, but that is not our use case, so I haven't investigated much.

Steps to reproduce

Here's a (hopefully complete) reproduction scenario. The code for the IDL type is generated with fastddsgen -d idl msg.idl.

main.cpp
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/subscriber/SubscriberListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>

#include "idl/msg.h"
#include "idl/msgPubSubTypes.h"

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <cassert>
#include <chrono>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

using MsgT = ComplexMessage;
using MsgPubSubT = ComplexMessagePubSubType;

template <class MsgT>
struct SubscriberLoans
{
    eprosima::fastdds::dds::LoanableSequence<MsgT> data;
    eprosima::fastdds::dds::LoanableSequence<eprosima::fastdds::dds::SampleInfo> infos;
};

class Listener : public eprosima::fastdds::dds::SubscriberListener
{
public:
    void on_data_available(eprosima::fastdds::dds::DataReader* reader) override
    {
        while (reader->read(loans.data, loans.infos, eprosima::fastdds::dds::LENGTH_UNLIMITED,
                            eprosima::fastdds::dds::NOT_READ_SAMPLE_STATE) == ReturnCode_t::RETCODE_OK)
        {
            for (eprosima::fastdds::dds::LoanableCollection::size_type i = 0; i < loans.infos.length(); ++i)
            {
                // Skip samples that carry no data (e.g. instance state notifications)
                if (!loans.infos[i].valid_data)
                {
                    continue;
                }
                const auto& msg = loans.data[i];
                std::cout << "Message rcvd: " << fmt::format("{}\n", msg.dict());
            }

            auto ret = reader->return_loan(loans.data, loans.infos);
            assert(ret == ReturnCode_t::RETCODE_OK);
        }
    }

private:
    SubscriberLoans<MsgT> loans;
};

int
main()
{
    eprosima::fastdds::dds::DomainParticipantQos qos;
    eprosima::fastdds::dds::DomainParticipant* part{
        eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant(
            1, qos)
    };

    // register types
    eprosima::fastdds::dds::TypeSupport typeSupport(new MsgPubSubT());
    part->register_type(typeSupport);

    // create publisher
    eprosima::fastdds::dds::PublisherQos pubQos;
    auto pub = part->create_publisher(pubQos);
    eprosima::fastdds::dds::TopicQos topicQos;
    auto topic = part->create_topic("/msg", typeSupport.get_type_name(), topicQos);
    eprosima::fastdds::dds::DataWriterQos writerQos;
    auto writer = pub->create_datawriter(topic, writerQos);

    // create subscriber
    Listener listener;
    eprosima::fastdds::dds::SubscriberQos subQos;
    auto sub = part->create_subscriber(subQos);
    eprosima::fastdds::dds::DataReaderQos readerQos;
    auto reader = sub->create_datareader(topic, readerQos, &listener);

    // publish data
    MsgT msg0;
    msg0.dict().insert(std::make_pair(0, "abc"));
    std::cout << "Message sent: " << fmt::format("{}\n", msg0.dict());
    writer->write(&msg0);
    std::this_thread::sleep_for(20ms);

    MsgT msg1;
    msg1.dict().insert(std::make_pair(1, "def"));
    std::cout << "Message sent: " << fmt::format("{}\n", msg1.dict());
    writer->write(&msg1);
    std::this_thread::sleep_for(20ms);

    return 0;
}
msg.idl
struct ComplexMessage {
	map<uint32, string> dict;
};
CMakeLists.txt
cmake_minimum_required(VERSION 3.23)

project(fastdds-bug LANGUAGES CXX)

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

find_package(fastdds REQUIRED)
find_package(fmt REQUIRED)

add_executable(bug main.cpp)
target_compile_features(bug PUBLIC cxx_std_23)
target_link_libraries(bug PUBLIC fast-dds::fast-dds)
target_link_libraries(bug PUBLIC fmt::fmt)
target_sources(bug PRIVATE idl/msg.cxx idl/msgPubSubTypes.cxx)

Fast DDS version/commit

2.14.2

Platform/Architecture

Windows 10 Visual Studio 2019

Transport layer

Default configuration, UDPv4 & SHM

Additional context

n/a

XML configuration file

n/a

Relevant log output

Message sent: {0: "abc"}
Message rcvd: {0: "abc"}
Message sent: {1: "def"}
Message rcvd: {0: "abc", 1: "def"}

Network traffic capture

n/a

Originally posted by @fschoenm in #5134

@fschoenm changed the title from "### Is there an already existing issue for this?" to "DataReader::read() reads invalid data" on Aug 5, 2024
@fschoenm
Contributor Author

fschoenm commented Aug 5, 2024

(Reopened from #5127 because I couldn't add comments to that issue anymore.)

For clarification: this does not return previous samples; it invents a completely new sample that is composed of the data of multiple previous samples (see the log output above). This seems to happen if the message contains a map. With simpler data types, it works.

Also, when changing the callback to use reader->read_next_sample(&msg, &info) == ReturnCode_t::RETCODE_OK instead, it works as expected, even though the documentation states that

The read_next_sample operation is semantically equivalent to the read operation where the input Data sequence has max_length = 1, the sample_states = NOT_READ_SAMPLE_STATE, the view_states = ANY_VIEW_STATE, and the instance_states = ANY_INSTANCE_STATE.

@fschoenm
Contributor Author

fschoenm commented Aug 5, 2024

So I tried to figure this out a little bit more and I'm not really sure how it's supposed to work.

It seems to come down to the SampleLoanManager that keeps a list of supposedly "unused" loans (free_loans_). There's a function to return the loan:

    void return_loan(
            void* sample)
    {
        OutstandingLoanItem* item = find_by_sample(sample);
        assert(nullptr != item);

        item->num_refs -= 1;
        if (item->num_refs == 0)
        {
            CacheChange_t tmp;
            tmp.payload_owner(item->owner);
            tmp.serializedPayload = item->payload;
            item->owner->release_payload(tmp);
            item->payload.data = nullptr;
            item->owner = nullptr;

            item = free_loans_.push_back(*item);
            assert(nullptr != item);
            used_loans_.remove(*item);
        }
    }

But it never touches the sample attached to the item: it releases the payload, resets payload.data and owner, and moves the item from used_loans_ to free_loans_. The previously deserialized sample is therefore still attached to the item.
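To make the effect concrete, here is a heavily simplified model, not the actual SampleLoanManager: Sample, LoanItem, LoanPool and fill_without_clear are hypothetical names. Entries move between a used and a free list, and nothing resets the object an entry owns, so the next get_loan() hands back the old sample with its old map contents.

```cpp
#include <cstdint>
#include <iterator>
#include <list>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for ComplexMessage: a sample holding a map.
struct Sample
{
    std::map<uint32_t, std::string> dict;
};

// Hypothetical loan entry: owns a sample object that outlives each loan.
struct LoanItem
{
    std::unique_ptr<Sample> sample = std::make_unique<Sample>();
    int num_refs = 0;
};

// Heavily simplified model of a pool with used and free lists (not Fast DDS code).
class LoanPool
{
public:
    Sample* get_loan()
    {
        if (free_.empty())
        {
            used_.emplace_back();  // allocate a brand-new entry (and sample)
        }
        else
        {
            // Reuse the last free entry as-is, including whatever its sample holds.
            used_.splice(used_.end(), free_, std::prev(free_.end()));
        }
        used_.back().num_refs = 1;
        return used_.back().sample.get();
    }

    // Like return_loan(): only bookkeeping changes; the sample is left untouched.
    void return_loan(Sample* sample)
    {
        for (auto it = used_.begin(); it != used_.end(); ++it)
        {
            if (it->sample.get() == sample)
            {
                if (--it->num_refs == 0)
                {
                    free_.splice(free_.end(), used_, it);
                }
                return;
            }
        }
    }

private:
    std::list<LoanItem> used_;
    std::list<LoanItem> free_;
};

// Hypothetical decoder step that, like the plain-CDR map path, only inserts.
void fill_without_clear(Sample& s, uint32_t key, const std::string& value)
{
    s.dict.emplace(key, value);
}
```

Getting a loan, filling it with {0: "abc"}, returning it and getting a loan again yields the same Sample object; filling it with {1: "def"} then leaves both entries in the map, which matches the log output above.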

Later it reuses the same loan (with the sample still attached to it) from the list of available loans in get_loan():

    void get_loan(
            CacheChange_t* change,
            void*& sample)
    {
        // ...
        else
        {
            // Reuse a free entry
            item = used_loans_.push_back(free_loans_.back());
            assert(nullptr != item);
            free_loans_.pop_back();
        }

        // ...

        if (is_plain_)
        {
            // ...
        }
        else
        {
            type_->deserialize(&item->payload, item->sample);
        }

        // ...
    }

However, this function does not reset the sample either (unless the type is plain, which it isn't here), so type_->deserialize() writes into the sample that is still attached to the item. Because maps are deserialized by emplacing entries into the existing map, the old entries survive. I suspect this may also fail for other data types.
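The map part can be illustrated in isolation. std::map::emplace only inserts when the key is absent, so a decoder that emplaces into a reused map both keeps entries from earlier samples and silently ignores a new value for a key that is already present. decode_into and Entries are hypothetical names used only for this sketch; this is not Fast CDR code.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for one encoded message: its (key, value) pairs.
using Entries = std::vector<std::pair<uint32_t, std::string>>;

// Mimics a map deserializer that emplaces into whatever the target already holds.
void decode_into(std::map<uint32_t, std::string>& target, const Entries& wire)
{
    for (const auto& entry : wire)
    {
        // emplace is a no-op when the key already exists
        target.emplace(entry.first, entry.second);
    }
}
```

Decoding {0: "abc"} and then {1: "def"} into the same map leaves two entries; decoding {0: "abc"} and then {0: "xyz"} leaves the stale "abc", so even a message with the same keys could come out wrong.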

I saw that in some cases, item->sample = type_->createData() is called for non-plain data types but this is missing when returning a loan.
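A sketch of the loan-manager-side alternative hinted at here, assuming a hypothetical LoanItem that owns its sample through std::unique_ptr: when the last reference is dropped, the sample is replaced by a freshly constructed one, roughly what calling createData() again would achieve. This is only an illustration; the issue header marks it as fixed by eProsima/Fast-CDR#233, i.e. on the deserializer side. Resetting on return also costs one allocation per returned loan, whereas clearing inside the deserializer keeps reuse cheap.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for ComplexMessage.
struct Sample
{
    std::map<uint32_t, std::string> dict;
};

// Hypothetical loan entry owning its sample; starts out with one reference.
struct LoanItem
{
    std::unique_ptr<Sample> sample = std::make_unique<Sample>();
    int num_refs = 1;
};

// When the last reference goes away, swap in a freshly constructed sample
// (roughly what calling createData() again would do), so the next loan that
// reuses this entry starts from an empty object.
void release(LoanItem& item)
{
    if (--item.num_refs == 0)
    {
        item.sample = std::make_unique<Sample>();
    }
}
```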


@MiguelCompany
Member

I think the actual issue lies in the generated type support which, when deserializing, should clear the map before adding elements to it.

If you use read_next_sample passing always the same data instance, you will observe the same wrong behavior.

Could you look at the generated deserialize method, and ensure the map is cleared before reading the number of elements?

@fschoenm
Contributor Author

fschoenm commented Aug 6, 2024

@MiguelCompany No, there's nothing of the sort in deserialize:

template<>
eProsima_user_DllExport void deserialize(
        eprosima::fastcdr::Cdr& cdr,
        ComplexMessage& data)
{
    cdr.deserialize_type(eprosima::fastcdr::CdrVersion::XCDRv2 == cdr.get_cdr_version() ?
            eprosima::fastcdr::EncodingAlgorithmFlag::DELIMIT_CDR2 :
            eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR,
            [&data](eprosima::fastcdr::Cdr& dcdr, const eprosima::fastcdr::MemberId& mid) -> bool
            {
                bool ret_value = true;
                switch (mid.id)
                {
                    case 0:
                        dcdr >> data.dict();
                        break;

                    default:
                        ret_value = false;
                        break;
                }
                return ret_value;
            });
}

@fschoenm
Contributor Author

fschoenm commented Aug 6, 2024

The same goes for the other deserialize method; I'm not sure which one is supposed to clear the map:

bool ComplexMessagePubSubType::deserialize(
        SerializedPayload_t* payload,
        void* data)
{
    try
    {
        // Convert DATA to pointer of your type
        ComplexMessage* p_type = static_cast<ComplexMessage*>(data);

        // Object that manages the raw buffer.
        eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast<char*>(payload->data), payload->length);

        // Object that deserializes the data.
        eprosima::fastcdr::Cdr deser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN
#if FASTCDR_VERSION_MAJOR == 1
                , eprosima::fastcdr::Cdr::CdrType::DDS_CDR
#endif // FASTCDR_VERSION_MAJOR == 1
                );

        // Deserialize encapsulation.
        deser.read_encapsulation();
        payload->encapsulation = deser.endianness() == eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;

        // Deserialize the object.
        deser >> *p_type;
    }
    catch (eprosima::fastcdr::exception::Exception& /*exception*/)
    {
        return false;
    }

    return true;
}
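Since both generated methods simply forward to Fast CDR (dcdr >> data.dict() and deser >> *p_type), a local stopgap would be to reset the destination before decoding, for example assigning *p_type = ComplexMessage{}; right before deser >> *p_type. The helper below is a hypothetical, generic version of that idea, with a plain callable standing in for the CDR stream and Message standing in for the generated type. Hand edits to generated files are lost when fastddsgen runs again, so this only makes sense until a fixed Fast CDR is available.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical helper: discard any state left over from a previous sample,
// then let the decoder fill the object from scratch.
template <class T, class Decode>
void reset_then_decode(T& target, Decode&& decode)
{
    target = T{};
    std::forward<Decode>(decode)(target);
}

// Hypothetical stand-in for the generated ComplexMessage type.
struct Message
{
    std::map<uint32_t, std::string> dict;
};
```

Decoding {0: "abc"} and then {1: "def"} into the same Message through this helper leaves only {1: "def"}.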

@fschoenm
Contributor Author

fschoenm commented Aug 6, 2024

@MiguelCompany OK, it seems a clear() is missing in Fast CDR's Cdr.h:

    template<class _K, class _T, typename std::enable_if<!std::is_enum<_T>::value &&
            !std::is_arithmetic<_T>::value>::type* = nullptr>
    Cdr& deserialize(
            std::map<_K, _T>& map_t)
    {
        if (CdrVersion::XCDRv2 == cdr_version_)
        {
            uint32_t dheader {0};
            deserialize(dheader);

            auto offset = offset_;

            uint32_t map_length {0};
            deserialize(map_length);

            map_t.clear();

            uint32_t count {0};
            while (offset_ - offset < dheader && count < map_length)
            {
                _K key;
                _T val;
                deserialize(key);
                deserialize(val);
                map_t.emplace(std::pair<_K, _T>(std::move(key), std::move(val)));
                ++count;
            }

            if (offset_ - offset != dheader)
            {
                throw exception::BadParamException("Member size greater than size specified by DHEADER");
            }
        }
        else
        {
            uint32_t sequence_length = 0;
            state state_(*this);

            deserialize(sequence_length);

            try
            {
                for (uint32_t i = 0; i < sequence_length; ++i)
                {
                    _K key;
                    _T value;
                    deserialize(key);
                    deserialize(value);
                    map_t.emplace(std::pair<_K, _T>(std::move(key), std::move(value)));
                }
            }
            catch (exception::Exception& ex)
            {
                set_state(state_);
                ex.raise();
            }
        }

        return *this;
    }

The XCDRv2 branch calls map_t.clear() before filling the map, but the else branch (plain CDR) is missing that clear.
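The fix described here amounts to clearing the map in the else branch as well, right after the length is read, mirroring the XCDRv2 branch. Below is a self-contained model of the corrected loop, with a vector of pairs standing in for the encoded buffer; Entries and deserialize_plain_map are hypothetical names, not Fast CDR API, and the stream-state restore on exceptions is omitted.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the encoded map: its (key, value) pairs.
using Entries = std::vector<std::pair<uint32_t, std::string>>;

// Model of the plain-CDR branch with the missing clear() added.
void deserialize_plain_map(const Entries& wire, std::map<uint32_t, std::string>& map_t)
{
    const auto sequence_length = static_cast<uint32_t>(wire.size());  // "read" the length

    map_t.clear();  // drop entries left over from a previously decoded sample

    for (uint32_t i = 0; i < sequence_length; ++i)
    {
        map_t.emplace(wire[i].first, wire[i].second);
    }
}
```

With the clear in place, decoding into a reused map yields exactly the entries of the latest message, and a repeated key gets the new value instead of keeping the stale one.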
