Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix real-time in Flow Controller #3735

Merged
merged 14 commits into from
Sep 13, 2023
Merged
23 changes: 23 additions & 0 deletions include/fastdds/rtps/history/History.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,19 @@ class History
const_iterator removal,
bool release = true);

/**
* Remove a specific change from the history.
* No Thread Safe
* @param removal iterator to the CacheChange_t to remove.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @param release defaults to true and hints if the CacheChange_t should return to the pool
* @return iterator to the next CacheChange_t or end iterator.
*/
RTPS_DllAPI virtual iterator remove_change_nts(
const_iterator removal,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
bool release = true);

/**
* Remove all changes from the History
* @return True if everything was correctly removed.
Expand All @@ -159,6 +172,16 @@ class History
RTPS_DllAPI bool remove_change(
CacheChange_t* ch);

/**
* Remove a specific change from the history.
* @param ch Pointer to the CacheChange_t.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed.
*/
RTPS_DllAPI bool remove_change(
CacheChange_t* ch,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

/**
* Find a specific change in the history using the matches_change method criteria.
* @param ch Pointer to the CacheChange_t to search for.
Expand Down
13 changes: 13 additions & 0 deletions include/fastdds/rtps/history/ReaderHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ class ReaderHistory : public History
const_iterator removal,
bool release = true) override;

/**
* Remove a specific change from the history.
* No Thread Safe
* @param removal iterator to the change for removal
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @param release specifies if the change must be returned to the pool
* @return iterator to the next change if any
*/
RTPS_DllAPI iterator remove_change_nts(
const_iterator removal,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
bool release = true) override;

/**
* Criteria to search a specific CacheChange_t on history
* @param inner change to compare
Expand Down
25 changes: 25 additions & 0 deletions include/fastdds/rtps/history/WriterHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ class WriterHistory : public rtps::History
const_iterator removal,
bool release = true) override;

/**
* Remove a specific change from the history.
* No Thread Safe
* @param removal iterator to the change for removal
* @param release specifies if the change should be return to the pool
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return iterator to the next change if any
*/
RTPS_DllAPI iterator remove_change_nts(
const_iterator removal,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
bool release = true) override;

/**
* Criteria to search a specific CacheChange_t on history
* @param inner change to compare
Expand All @@ -99,6 +112,10 @@ class WriterHistory : public rtps::History
RTPS_DllAPI virtual bool remove_change_g(
CacheChange_t* a_change);

RTPS_DllAPI virtual bool remove_change_g(
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

RTPS_DllAPI bool remove_change(
const SequenceNumber_t& sequence_number);

Expand All @@ -111,6 +128,14 @@ class WriterHistory : public rtps::History
*/
RTPS_DllAPI bool remove_min_change();

/**
* Remove the CacheChange_t with the minimum sequenceNumber.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if correctly removed.
*/
RTPS_DllAPI bool remove_min_change(
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

RTPS_DllAPI SequenceNumber_t next_sequence_number() const
{
return m_lastCacheChangeSeqNum + 1;
Expand Down
7 changes: 4 additions & 3 deletions include/fastdds/rtps/messages/RTPSMessageGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ class RTPSMessageGroup
* Constructs a RTPSMessageGroup allowing to allocate its own buffer.
* @param participant Pointer to the participant sending data.
* @param internal_buffer true indicates this object to allocate its own buffer. false indicates to get a buffer
* @param max_blocking_time_point Future time point where blocking send should end.
* from the participant.
*/
RTPSMessageGroup(
RTPSParticipantImpl* participant,
bool internal_buffer = false);
bool internal_buffer = false,
std::chrono::steady_clock::time_point max_blocking_time_point =
std::chrono::steady_clock::now() + std::chrono::hours(24));

/**
* Basic constructor.
Expand Down Expand Up @@ -313,8 +316,6 @@ class RTPSMessageGroup

std::chrono::steady_clock::time_point max_blocking_time_point_;

bool max_blocking_time_is_set_ = false;

std::unique_ptr<RTPSMessageGroup_t> send_buffer_;

bool internal_buffer_ = false;
Expand Down
17 changes: 14 additions & 3 deletions include/fastdds/rtps/writer/LocatorSelectorSender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
#include <fastdds/rtps/common/LocatorSelector.hpp>
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <mutex>
#include <fastrtps/utils/TimedMutex.hpp>

namespace eprosima {
namespace fastrtps {
Expand Down Expand Up @@ -95,6 +94,18 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface
mutex_.unlock();
}

/*!
* Try to lock the object.
*
* This kind of object needs to be locked because could be used outside the writer's mutex.
*/
template <class Clock, class Duration>
bool try_lock_until(
const std::chrono::time_point<Clock, Duration>& abs_time)
{
return mutex_.try_lock_until(abs_time);
}

fastrtps::rtps::LocatorSelector locator_selector;

ResourceLimitedVector<GUID_t> all_remote_readers;
Expand All @@ -105,7 +116,7 @@ class LocatorSelectorSender : public RTPSMessageSenderInterface

RTPSWriter& writer_;

std::recursive_mutex mutex_;
RecursiveTimedMutex mutex_;
};

} // namespace rtps
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ class RTPSWriter
/**
* Add a change to the unsent list.
* @param change Pointer to the change to add.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
virtual void unsent_change_added_to_history(
CacheChange_t* change,
Expand All @@ -544,10 +544,12 @@ class RTPSWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
virtual bool change_removed_by_history(
CacheChange_t* a_change) = 0;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;

bool is_datasharing_compatible_with(
const ReaderProxyData& rdata) const;
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatefulPersistentWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class StatefulPersistentWriter : public StatefulWriter, private PersistentWriter
/**
* Add a specific change to all ReaderLocators.
* @param p Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* p,
Expand All @@ -92,10 +92,12 @@ class StatefulPersistentWriter : public StatefulWriter, private PersistentWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* a_change) override;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
};

} // namespace rtps
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class StatefulWriter : public RTPSWriter
/**
* Add a specific change to all ReaderLocators.
* @param p Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* p,
Expand All @@ -143,10 +143,12 @@ class StatefulWriter : public RTPSWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* a_change) override;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

/**
* Sends a change directly to a intraprocess reader.
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatelessPersistentWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class StatelessPersistentWriter : public StatelessWriter, private PersistentWrit
/**
* Add a specific change to all ReaderLocators.
* @param p Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* p,
Expand All @@ -83,10 +83,12 @@ class StatelessPersistentWriter : public StatelessWriter, private PersistentWrit
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param a_change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* a_change) override;
CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
};

} // namespace rtps
Expand Down
6 changes: 4 additions & 2 deletions include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class StatelessWriter : public RTPSWriter
/**
* Add a specific change to all ReaderLocators.
* @param change Pointer to the change.
* @param max_blocking_time
* @param[in] max_blocking_time Maximum time this method has to complete the task.
*/
void unsent_change_added_to_history(
CacheChange_t* change,
Expand All @@ -92,10 +92,12 @@ class StatelessWriter : public RTPSWriter
/**
* Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
* @param change Pointer to the change that is going to be removed.
* @param[in] max_blocking_time Maximum time this method has to complete the task.
* @return True if removed correctly.
*/
bool change_removed_by_history(
CacheChange_t* change) override;
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

/**
* Add a matched reader.
Expand Down
15 changes: 11 additions & 4 deletions include/fastrtps/attributes/LibrarySettingsAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,26 @@ class LibrarySettingsAttributes
{
public:

LibrarySettingsAttributes() {
LibrarySettingsAttributes()
{
}

virtual ~LibrarySettingsAttributes() {
virtual ~LibrarySettingsAttributes()
{
}

bool operator==(
bool operator ==(
const LibrarySettingsAttributes& b) const
{
return (intraprocess_delivery == b.intraprocess_delivery);
}

IntraprocessDeliveryType intraprocess_delivery = INTRAPROCESS_FULL;
IntraprocessDeliveryType intraprocess_delivery =
#if HAVE_STRICT_REALTIME
INTRAPROCESS_OFF;
#else
INTRAPROCESS_FULL;
#endif // if HAVE_STRICT_REALTIME
};

} // namespace fastrtps
Expand Down
8 changes: 6 additions & 2 deletions include/fastrtps/publisher/PublisherHistory.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ class PublisherHistory : public rtps::WriterHistory
bool remove_change_pub(
rtps::CacheChange_t* change);

virtual bool remove_change_g(
rtps::CacheChange_t* a_change);
bool remove_change_g(
rtps::CacheChange_t* a_change) override;

bool remove_change_g(
rtps::CacheChange_t* a_change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

bool remove_instance_changes(
const rtps::InstanceHandle_t& handle,
Expand Down
25 changes: 23 additions & 2 deletions include/fastrtps/utils/TimedConditionVariable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#endif // if HAVE_STRICT_REALTIME && defined(__unix__)

#include <mutex>
#include <condition_variable>
#include <chrono>
#include <functional>

Expand Down Expand Up @@ -115,6 +116,25 @@ class TimedConditionVariable
return ret_value;
}

template<typename Mutex>
std::cv_status wait_for(
std::unique_lock<Mutex>& lock,
const std::chrono::nanoseconds& max_blocking_time)
{
auto nsecs = max_blocking_time;
struct timespec max_wait = {
0, 0
};
clock_gettime(CLOCK_MONOTONIC, &max_wait);
nsecs = nsecs + std::chrono::nanoseconds(max_wait.tv_nsec);
auto secs = std::chrono::duration_cast<std::chrono::seconds>(nsecs);
nsecs -= secs;
max_wait.tv_sec += secs.count();
max_wait.tv_nsec = (long)nsecs.count();
return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(),
&max_wait) == 0) ? std::cv_status::no_timeout : std::cv_status::timeout;
}

template<typename Mutex>
bool wait_until(
std::unique_lock<Mutex>& lock,
Expand All @@ -137,7 +157,7 @@ class TimedConditionVariable
}

template<typename Mutex>
bool wait_until(
std::cv_status wait_until(
std::unique_lock<Mutex>& lock,
const std::chrono::steady_clock::time_point& max_blocking_time)
{
Expand All @@ -147,7 +167,8 @@ class TimedConditionVariable
struct timespec max_wait = {
secs.time_since_epoch().count(), ns.count()
};
return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(), &max_wait) == 0);
return (CV_TIMEDWAIT_(cv_, lock.mutex()->native_handle(),
&max_wait) == 0) ? std::cv_status::no_timeout : std::cv_status::timeout;
}

void notify_one()
Expand Down
Loading