Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First features for safety critical systems #486

Merged
merged 27 commits into from
Apr 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
dbc32c2
Fixed size strings [4216] (#361)
MiguelCompany Dec 26, 2018
598c37b
New Allocation test [4259] (#364)
JuanCarlos-Arce Jan 16, 2019
cd29fc1
Fixed size bitmaps [4320] (#370)
MiguelCompany Jan 16, 2019
a029bae
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Jan 17, 2019
430f352
Remove ParameterList_t [4370] (#379)
MiguelCompany Jan 22, 2019
25a5dd1
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Jan 23, 2019
660f601
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Jan 30, 2019
3ff2d32
Refs #4370. Fixing errors compiling on Debug.
MiguelCompany Jan 30, 2019
181b507
Mutex wrapper with testing properties [4405] (#383)
richiware Feb 4, 2019
a730633
Resource limited vector [4421] (#386)
MiguelCompany Feb 11, 2019
95b4fa3
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Feb 11, 2019
2bf0a7f
Writers: Resource limits on the number of matched readers [4570] (#406)
MiguelCompany Feb 13, 2019
c5f89a4
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Feb 15, 2019
15bea84
Refs #4704. Using matchedSubscribersAllocation on allocations test. (…
MiguelCompany Feb 20, 2019
20d6d6e
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Feb 20, 2019
99764dd
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Feb 26, 2019
de9e382
Merge remote-tracking branch 'origin/develop' into feature/safety-cri…
richiware Mar 5, 2019
fb301dc
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Mar 12, 2019
edd523e
Merge remote-tracking branch 'origin/develop' into feature/safety-cri…
richiware Mar 15, 2019
3af1e02
Merge remote-tracking branch 'origin/develop' into feature/safety-cri…
richiware Mar 27, 2019
74e6ab1
Merge branch 'develop' into feature/safety-critical-systems
MiguelCompany Apr 1, 2019
c592937
WIP: Sync write converted to non-blocking call (#426)
richiware Apr 10, 2019
30d4d25
Refs #5094 Added ReaderProxyTests (find_change related tests).
Apr 10, 2019
9d0c4c6
Refs #5094 Fixed find_change related bug.
Apr 10, 2019
2e5762c
Refs #5107. Fixing windows compile error.
richiware Apr 11, 2019
52e28a9
Fixing non deterministic tests.
richiware Apr 11, 2019
072cfca
Fixed a warning on windows.
richiware Apr 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'develop' into feature/safety-critical-systems
  • Loading branch information
MiguelCompany committed Feb 15, 2019
commit c5f89a437162984f5a0b0b8338c552c962838737
33 changes: 24 additions & 9 deletions include/fastrtps/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ namespace fastrtps {
namespace rtps {

class StatefulWriter;
class NackResponseDelay;
class NackSupressionDuration;

/**
Expand Down Expand Up @@ -256,6 +255,27 @@ class ReaderProxy
return reader_attributes_.endpoint.remoteLocatorList;
}

/**
* Get the locators that should be used to send data to the reader represented by this proxy.
* @return the locators that should be used to send data to the reader represented by this proxy.
*/
inline const LocatorList_t& remote_locators_shrinked() const
{
return reader_attributes_.endpoint.unicastLocatorList.empty() ?
reader_attributes_.endpoint.multicastLocatorList :
reader_attributes_.endpoint.unicastLocatorList;
}

/**
* Get the GUID of the reader represented to this proxy as a const reference to a vector
* of GUID_t object containing just that single GUID. It is used as a temporary workaround
* before the API of RTPSMessageGroup is changed.
*/
inline const std::vector<GUID_t>& guid_as_vector() const
{
return guid_as_vector_;
}

/**
* Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK.
* @param acknack_count The count of the received ACKNACK.
Expand All @@ -278,6 +298,7 @@ class ReaderProxy
* @param nack_count Counter field of the submessage.
* @param seq_num Sequence number field of the submessage.
* @param fragments_state Bitmap indicating the requested fragments.
* @return true if a change was modified, false otherwise.
*/
bool process_nack_frag(
const GUID_t& reader_guid,
Expand All @@ -304,12 +325,6 @@ class ReaderProxy
return changes_low_mark_;
}

/**
* Change the interval of nack-response event.
* @param interval Time from acknack reception to data sending.
*/
void update_nack_response_interval(const Duration_t& interval);

/**
* Change the interval of nack-supression event.
* @param interval Time from data sending to acknack processing.
Expand All @@ -324,10 +339,10 @@ class ReaderProxy
RemoteReaderAttributes reader_attributes_;
//!Pointer to the associated StatefulWriter.
StatefulWriter* writer_;
//!To fool RTPSMessageGroup when using this proxy as single destination
ResourceLimitedVector<GUID_t> guid_as_vector_;
//!Set of the changes and its state.
ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
//! Timed Event to manage the Acknack response delay.
std::shared_ptr<NackResponseDelay> nack_response_event_;
//! Timed Event to manage the delay to mark a change as UNACKED after sending it.
std::shared_ptr<NackSupressionDuration> nack_supression_event_;
//! Are timed events enabled?
Expand Down
5 changes: 4 additions & 1 deletion include/fastrtps/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace fastrtps {
namespace rtps {

class ReaderProxy;
class NackResponseDelay;

/**
* Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader.
Expand Down Expand Up @@ -82,6 +83,8 @@ class StatefulWriter : public RTPSWriter
std::mutex may_remove_change_mutex_;
std::condition_variable may_remove_change_cond_;
unsigned int may_remove_change_;
//! Timed Event to manage the Acknack response delay.
NackResponseDelay* nack_response_event_;

public:

Expand Down Expand Up @@ -208,7 +211,7 @@ class StatefulWriter : public RTPSWriter
*/
void send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final = false);

void perform_nack_response(const GUID_t& reader_guid);
void perform_nack_response();

void perform_nack_supression(const GUID_t& reader_guid);

Expand Down
9 changes: 0 additions & 9 deletions include/fastrtps/rtps/writer/timedevent/NackResponseDelay.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class NackResponseDelay : public TimedEvent
* Construct a NackResponseDelay event.
*
* @param writer Pointer to the StatefulWriter creating this event
* @param reader_guid GUID of the related reader proxy
* @param interval_in_ms Event interval in miliseconds
*/
NackResponseDelay(
Expand All @@ -61,18 +60,10 @@ class NackResponseDelay : public TimedEvent
EventCode code,
const char* msg = nullptr) override;

inline void reader_guid(const GUID_t guid)
{
reader_guid_ = guid;
}

private:

//! Associated stateful writer
StatefulWriter* writer_;

//! GUID of the reader proxy
GUID_t reader_guid_;
};

} /* namespace rtps */
Expand Down
6 changes: 1 addition & 5 deletions src/cpp/rtps/reader/timedevent/HeartbeatResponseDelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,7 @@ void HeartbeatResponseDelay::event(
mp_WP->mp_SFR->m_acknackCount++;
logInfo(RTPS_READER,"Sending ACKNACK: "<< sns;);

bool final = false;
if(sns.empty())
final = true;
}

bool final = sns.empty();
group.add_acknack(m_remote_endpoints, sns, mp_WP->mp_SFR->m_acknackCount, final, m_destination_locators);
}

Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/RTPSWriterCollector.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class RTPSWriterCollector
{
if(change->getFragmentSize() > 0)
{
optionalFragmentsNotSent.for_each([this, change, remoteReader](auto sn)
optionalFragmentsNotSent.for_each([this, change, remoteReader](FragmentNumber_t sn)
{
assert(sn <= change->getDataFragments()->size());
auto it = mItems_.emplace(change->sequenceNumber, sn, change);
Expand Down
36 changes: 9 additions & 27 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <fastrtps/rtps/resources/AsyncWriterThread.h>
#include <fastrtps/rtps/writer/ReaderProxy.h>
#include <fastrtps/rtps/writer/StatefulWriter.h>
#include <fastrtps/rtps/writer/timedevent/NackResponseDelay.h>
#include <fastrtps/rtps/writer/timedevent/NackSupressionDuration.h>
#include <fastrtps/utils/TimeConversion.h>

Expand All @@ -43,15 +42,13 @@ ReaderProxy::ReaderProxy(
: is_active_(false)
, reader_attributes_()
, writer_(writer)
, guid_as_vector_(ResourceLimitedContainerConfig::fixed_size_configuration(1u))
, changes_for_reader_(resource_limits_from_history(writer->mp_history->m_att, 0))
, nack_response_event_(nullptr)
, nack_supression_event_(nullptr)
, timers_enabled_(false)
, last_acknack_count_(0)
, last_nackfrag_count_(0)
{
nack_response_event_ = std::make_shared<NackResponseDelay>(writer_,
TimeConv::Time_t2MilliSecondsDouble(times.nackResponseDelay));
nack_supression_event_ = std::make_shared <NackSupressionDuration>(writer_,
TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration));

Expand All @@ -66,11 +63,11 @@ void ReaderProxy::start(const RemoteReaderAttributes& reader_attributes)
{
is_active_ = true;
reader_attributes_ = reader_attributes;
guid_as_vector_.push_back(reader_attributes_.guid);

reader_attributes_.endpoint.remoteLocatorList.assign(reader_attributes_.endpoint.unicastLocatorList);
reader_attributes_.endpoint.remoteLocatorList.push_back(reader_attributes_.endpoint.multicastLocatorList);

nack_response_event_->reader_guid(reader_attributes_.guid);
nack_supression_event_->reader_guid(reader_attributes_.guid);
timers_enabled_ = (reader_attributes_.endpoint.reliabilityKind == RELIABLE);

Expand All @@ -87,22 +84,17 @@ void ReaderProxy::stop()
last_acknack_count_ = 0;
last_nackfrag_count_ = 0;
changes_low_mark_ = SequenceNumber_t();
guid_as_vector_.clear();
}

void ReaderProxy::disable_timers()
{
if (timers_enabled_.exchange(false))
{
nack_response_event_->cancel_timer();
nack_supression_event_->cancel_timer();
}
}

void ReaderProxy::update_nack_response_interval(const Duration_t& interval)
{
nack_response_event_->update_interval(interval);
}

void ReaderProxy::update_nack_supression_interval(const Duration_t& interval)
{
nack_supression_event_->update_interval(interval);
Expand Down Expand Up @@ -226,24 +218,17 @@ bool ReaderProxy::requested_changes_set(const SequenceNumberSet_t& seq_num_set)
seq_num_set.for_each([&](SequenceNumber_t sit)
{
ChangeIterator chit = find_change(sit);
if (chit != changes_for_reader_.end())
if (chit != changes_for_reader_.end() && UNACKNOWLEDGED == chit->getStatus())
{
if (chit->getStatus() == UNACKNOWLEDGED)
{
chit->setStatus(REQUESTED);
chit->markAllFragmentsAsUnsent();
isSomeoneWasSetRequested = true;
}
chit->setStatus(REQUESTED);
chit->markAllFragmentsAsUnsent();
isSomeoneWasSetRequested = true;
}
});

if (isSomeoneWasSetRequested)
{
logInfo(RTPS_WRITER, "Requested Changes: " << seq_num_set);
if (timers_enabled_)
{
nack_response_event_->restart_timer();
}
}
else if (!seq_num_set.empty())
{
Expand Down Expand Up @@ -327,7 +312,6 @@ bool ReaderProxy::mark_fragment_as_sent_for_change(
}

return change_found;

}

bool ReaderProxy::perform_nack_supression()
Expand All @@ -344,7 +328,7 @@ bool ReaderProxy::convert_status_on_all_changes(
ChangeForReaderStatus_t previous,
ChangeForReaderStatus_t next)
{
assert(previous != next);
assert(previous > next);

// NOTE: This is only called for REQUESTED=>UNSENT (acknack response) or
// UNDERWAY=>UNACKNOWLEDGED (nack supression)
Expand Down Expand Up @@ -424,11 +408,9 @@ bool ReaderProxy::process_nack_frag(
// TODO Not doing Acknowledged.
if (requested_fragment_set(seq_num, fragments_state))
{
nack_response_event_->restart_timer();
return true;
}
}

return true;
}

return false;
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.