Skip to content

Commit

Permalink
Fix read_next_sample and take_next_sample (#1899)
Browse files Browse the repository at this point in the history
This is a port of #1732 from <master> to <2.2.x>

* Refs 10476. Fix on DataReaderTests.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Added xxx_next_sample tests to DataReaderTests.read_unread

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Strict real-time on read_or_take.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Using LoanableTypedCollection<SampleInfo> on ReadTakeCommand and DataReaderLoanManager.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Use ReadTakeCommand on take_next_sample.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Use ReadTakeCommand on read_next_sample via read_or_take_next_sample.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Adapt tests to new behavior of take

Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>

* Refs 10476. Added method get_first_change_with_minimum_ts to ReaderHistory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Add new method to mock.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. PubSubReader::last_seq is now a map for each instance.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Added UserAllocatedSequence.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Added take_first_data to PubSubReader.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Using take_first_data on LifespanQos blackbox test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10476. Solve build error on non-windows platforms.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10480. Apply suggestions from code review

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

Co-authored-by: IkerLuengo <57146230+IkerLuengo@users.noreply.github.com>

Co-authored-by: Iker Luengo <ikerluengo@eprosima.com>
Co-authored-by: IkerLuengo <57146230+IkerLuengo@users.noreply.github.com>
Signed-off-by: Iker Luengo <ikerluengo@eprosima.com>
  • Loading branch information
IkerLuengo and IkerLuengo authored Apr 23, 2021
1 parent da33604 commit 12fdf42
Show file tree
Hide file tree
Showing 14 changed files with 355 additions and 97 deletions.
103 changes: 103 additions & 0 deletions include/fastdds/dds/core/UserAllocatedSequence.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file UserAllocatedSequence.hpp
*/

#ifndef _FASTDDS_DDS_CORE_USERALLOCATEDSEQUENCE_HPP_
#define _FASTDDS_DDS_CORE_USERALLOCATEDSEQUENCE_HPP_

#include <cassert>
#include <cstdint>
#include <stdexcept>

#include <fastdds/dds/core/LoanableCollection.hpp>

namespace eprosima {
namespace fastdds {
namespace dds {

/**
* A collection of generic opaque pointers allocated by the user.
*
* This kind of collection would always return @c true for @c has_ownership(),
* and thus would not be able to receive loans.
* It would also have an inmutable @c maximum(), so it would not allow @c length() to grow beyond the maximum
* value indicated on construction.
*/
struct UserAllocatedSequence : public LoanableCollection
{
using size_type = LoanableCollection::size_type;
using element_type = LoanableCollection::element_type;

/**
* Construct a UserAllocatedSequence.
*
* @param [in] items Pointer to the beginning of an array of @c num_items opaque pointers.
* @param [in] num_items Number of opaque pointers in @c items.
*
* @post buffer() == items
* @post has_ownership() == true
* @post length() == 0
* @post maximum() == num_items
*/
UserAllocatedSequence(
element_type* items,
size_type num_items)
{
has_ownership_ = true;
maximum_ = num_items;
length_ = 0;
elements_ = items;
}

~UserAllocatedSequence() = default;

// Non-copyable
UserAllocatedSequence(
const UserAllocatedSequence&) = delete;
UserAllocatedSequence& operator = (
const UserAllocatedSequence&) = delete;

// Non-moveable
UserAllocatedSequence(
UserAllocatedSequence&&) = delete;
UserAllocatedSequence& operator = (
UserAllocatedSequence&&) = delete;

protected:

using LoanableCollection::maximum_;
using LoanableCollection::length_;
using LoanableCollection::elements_;
using LoanableCollection::has_ownership_;

void resize(
size_type new_length) override
{
// This kind of collection cannot grow above its stack-allocated size
if (new_length > maximum_)
{
throw std::bad_alloc();
}
}

};

} // namespace dds
} // namespace fastdds
} // namespace eprosima

#endif // _FASTDDS_DDS_CORE_USERALLOCATEDSEQUENCE_HPP_
3 changes: 3 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ class ReaderHistory : public History
RTPS_DllAPI void do_release_cache(
CacheChange_t* ch) override;

iterator get_first_change_with_minimum_ts(
const Time_t timestamp);

//!Pointer to the reader
RTPSReader* mp_reader;

Expand Down
82 changes: 52 additions & 30 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

#include <fastdds/subscriber/DataReaderImpl.hpp>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/dds/core/StackAllocatedSequence.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
Expand Down Expand Up @@ -409,7 +410,19 @@ ReturnCode_t DataReaderImpl::read_or_take(
return code;
}

std::lock_guard<RecursiveTimedMutex> lock(reader_->getMutex());
auto max_blocking_time = std::chrono::steady_clock::now() +
#if HAVE_STRICT_REALTIME
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
#else
std::chrono::hours(24);
#endif // if HAVE_STRICT_REALTIME

std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex(), std::defer_lock);

if (!lock.try_lock_until(max_blocking_time))
{
return ReturnCode_t::RETCODE_TIMEOUT;
}

auto it = history_.lookup_instance(handle, exact_instance);
if (!it.first)
Expand Down Expand Up @@ -560,9 +573,10 @@ ReturnCode_t DataReaderImpl::return_loan(
return ReturnCode_t::RETCODE_OK;
}

ReturnCode_t DataReaderImpl::read_next_sample(
ReturnCode_t DataReaderImpl::read_or_take_next_sample(
void* data,
SampleInfo* info)
SampleInfo* info,
bool should_take)
{
if (reader_ == nullptr)
{
Expand All @@ -580,43 +594,51 @@ ReturnCode_t DataReaderImpl::read_next_sample(
#else
std::chrono::hours(24);
#endif // if HAVE_STRICT_REALTIME
SampleInfo_t rtps_info;
if (history_.readNextData(data, &rtps_info, max_blocking_time))
{
sample_info_to_dds(rtps_info, info);
return ReturnCode_t::RETCODE_OK;
}
return ReturnCode_t::RETCODE_ERROR;
}

ReturnCode_t DataReaderImpl::take_next_sample(
void* data,
SampleInfo* info)
{
if (reader_ == nullptr)
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex(), std::defer_lock);

if (!lock.try_lock_until(max_blocking_time))
{
return ReturnCode_t::RETCODE_NOT_ENABLED;
return ReturnCode_t::RETCODE_TIMEOUT;
}

if (history_.getHistorySize() == 0)
auto it = history_.lookup_instance(HANDLE_NIL, false);
if (!it.first)
{
return ReturnCode_t::RETCODE_NO_DATA;
}

auto max_blocking_time = std::chrono::steady_clock::now() +
#if HAVE_STRICT_REALTIME
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
#else
std::chrono::hours(24);
#endif // if HAVE_STRICT_REALTIME
StackAllocatedSequence<void*, 1> data_values;
const_cast<void**>(data_values.buffer())[0] = data;
StackAllocatedSequence<SampleInfo, 1> sample_infos;

SampleInfo_t rtps_info;
if (history_.takeNextData(data, &rtps_info, max_blocking_time))
detail::StateFilter states{ NOT_READ_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE };
detail::ReadTakeCommand cmd(*this, data_values, sample_infos, 1, states, it.second, false);
while (!cmd.is_finished())
{
sample_info_to_dds(rtps_info, info);
return ReturnCode_t::RETCODE_OK;
cmd.add_instance(should_take);
}

ReturnCode_t code = cmd.return_value();
if (ReturnCode_t::RETCODE_OK == code)
{
*info = sample_infos[0];
}
return ReturnCode_t::RETCODE_ERROR;
return code;
}

ReturnCode_t DataReaderImpl::read_next_sample(
void* data,
SampleInfo* info)
{
return read_or_take_next_sample(data, info, false);
}

ReturnCode_t DataReaderImpl::take_next_sample(
void* data,
SampleInfo* info)
{
return read_or_take_next_sample(data, info, true);
}

ReturnCode_t DataReaderImpl::get_first_untaken_info(
Expand Down
5 changes: 5 additions & 0 deletions src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ class DataReaderImpl
bool single_instance,
bool should_take);

ReturnCode_t read_or_take_next_sample(
void* data,
SampleInfo* info,
bool should_take);

/**
* @brief A method called when a new cache change is added
* @param change The cache change that has been added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <cassert>

#include <fastdds/dds/core/LoanableCollection.hpp>
#include <fastdds/dds/core/LoanableSequence.hpp>
#include <fastdds/dds/core/LoanableTypedCollection.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>

Expand All @@ -38,7 +38,7 @@ namespace detail {

struct DataReaderLoanManager
{
using SampleInfoSeq = LoanableSequence<SampleInfo>;
using SampleInfoSeq = LoanableTypedCollection<SampleInfo>;
using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t;

explicit DataReaderLoanManager(
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <cstdint>

#include <fastdds/dds/core/LoanableCollection.hpp>
#include <fastdds/dds/core/LoanableSequence.hpp>
#include <fastdds/dds/core/LoanableTypedCollection.hpp>
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>

Expand Down Expand Up @@ -51,7 +51,7 @@ struct ReadTakeCommand
using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t;
using RTPSReader = eprosima::fastrtps::rtps::RTPSReader;
using WriterProxy = eprosima::fastrtps::rtps::WriterProxy;
using SampleInfoSeq = LoanableSequence<SampleInfo>;
using SampleInfoSeq = LoanableTypedCollection<SampleInfo>;

ReadTakeCommand(
DataReaderImpl& reader,
Expand Down
31 changes: 18 additions & 13 deletions src/cpp/rtps/history/ReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,8 @@ bool ReaderHistory::add_change(
logError(RTPS_READER_HISTORY, "The Writer GUID_t must be defined");
}

if (!m_changes.empty() && a_change->sourceTimestamp < (*m_changes.rbegin())->sourceTimestamp)
{
auto it = std::lower_bound(m_changes.begin(), m_changes.end(), a_change,
[](const CacheChange_t* c1, const CacheChange_t* c2) -> bool
{
return c1->sourceTimestamp < c2->sourceTimestamp;
});
m_changes.insert(it, a_change);
}
else
{
m_changes.push_back(a_change);
}
auto it = get_first_change_with_minimum_ts(a_change->sourceTimestamp);
m_changes.insert(it, a_change);

logInfo(RTPS_READER_HISTORY,
"Change " << a_change->sequenceNumber << " added with " << a_change->serializedPayload.length << " bytes");
Expand Down Expand Up @@ -242,6 +231,22 @@ void ReaderHistory::do_release_cache(
mp_reader->releaseCache(ch);
}

History::iterator ReaderHistory::get_first_change_with_minimum_ts(
const Time_t timestamp)
{
if (!m_changes.empty() && timestamp < (*m_changes.rbegin())->sourceTimestamp)
{
iterator it = std::lower_bound(m_changes.begin(), m_changes.end(), timestamp,
[](const CacheChange_t* c1, const Time_t& ts) -> bool
{
return c1->sourceTimestamp < ts;
});
return it;
}

return m_changes.end();
}

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
Loading

0 comments on commit 12fdf42

Please sign in to comment.