Skip to content

Commit

Permalink
Refs #11831. Apply suggestions on StatelessWriter.
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
  • Loading branch information
richiware committed Jul 16, 2021
1 parent 5aad481 commit 94f4072
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 18 deletions.
2 changes: 1 addition & 1 deletion include/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ class RTPSWriter
* be a member of this RTPSWriter object.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
virtual bool send(
virtual bool send_nts(
CDRMessage_t* message,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const;
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class StatelessWriter : public RTPSWriter
* @param message Pointer to the buffer with the message already serialized.
* @param max_blocking_time_point Future timepoint where blocking send should end.
*/
bool send(
bool send_nts(
CDRMessage_t* message,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const override;
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/flowcontrol/FlowControllerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ class FlowControllerImpl : public FlowController
/*!
* This function tries to send the sample synchronously.
* That is, it uses the user's thread, which is the one calling this function, to send the sample.
* It calls new function `RTPSWriter::deliver_sample()` for sending the sample.
* It calls new function `RTPSWriter::deliver_sample_nts()` for sending the sample.
* If this function fails (for example because non-blocking socket is full), this function stores internally the sample to
* try sending it again asynchronously.
*/
Expand Down
2 changes: 2 additions & 0 deletions src/cpp/rtps/messages/RTPSMessageGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ void RTPSMessageGroup::send()

if (full_msg_->length > RTPSMESSAGE_HEADER_SIZE)
{
std::unique_lock<RecursiveTimedMutex> lock(endpoint_->getMutex());

#if HAVE_SECURITY
// TODO(Ricardo) Control message size if it will be encrypted.
if (participant_->security_attributes().is_rtps_protected && endpoint_->supports_rtps_protection())
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/writer/LocatorSelectorSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ bool LocatorSelectorSender::send(
CDRMessage_t* message,
std::chrono::steady_clock::time_point max_blocking_time_point) const
{
return writer.send(message, *this, max_blocking_time_point);
return writer.send_nts(message, *this, max_blocking_time_point);
}

} // namespace rtps
Expand Down
3 changes: 1 addition & 2 deletions src/cpp/rtps/writer/RTPSWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,11 @@ bool RTPSWriter::is_pool_initialized() const
return true;
}

bool RTPSWriter::send(
bool RTPSWriter::send_nts(
CDRMessage_t* message,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const
{
std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
RTPSParticipantImpl* participant = getRTPSParticipant();

return locator_selector.locator_selector.selected_size() == 0 ||
Expand Down
24 changes: 13 additions & 11 deletions src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ bool StatelessWriter::change_removed_by_history(
const uint64_t sequence_number = change->sequenceNumber.to64long();
if (sequence_number > last_sequence_number_sent_)
{
unsent_changes_cond_.notify_one();
unsent_changes_cond_.notify_all();
}

return true;
Expand All @@ -382,6 +382,7 @@ bool StatelessWriter::change_removed_by_history(
bool StatelessWriter::is_acked_by_all(
const CacheChange_t* change) const
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
return change->sequenceNumber.to64long() >= last_sequence_number_sent_;
}

Expand Down Expand Up @@ -521,7 +522,6 @@ bool StatelessWriter::matched_reader_remove(
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);

//TODO Marcar para flushear
if (locator_selector_.locator_selector.remove_entry(reader_guid))
{
std::unique_ptr<ReaderLocator> reader;
Expand Down Expand Up @@ -592,20 +592,19 @@ bool StatelessWriter::matched_reader_is_matched(
void StatelessWriter::unsent_changes_reset()
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
// TODO improve flowcontroller with iterators.
std::for_each(mp_history->changesBegin(), mp_history->changesEnd(), [&](CacheChange_t* change)
{
flow_controller_->add_new_sample(this, change,
std::chrono::steady_clock::now() + std::chrono::hours(24));
});
}

bool StatelessWriter::send(
bool StatelessWriter::send_nts(
CDRMessage_t* message,
const LocatorSelectorSender& locator_selector,
std::chrono::steady_clock::time_point& max_blocking_time_point) const
{
if (!RTPSWriter::send(message, locator_selector, max_blocking_time_point))
if (!RTPSWriter::send_nts(message, locator_selector, max_blocking_time_point))
{
return false;
}
Expand Down Expand Up @@ -633,11 +632,14 @@ DeliveryRetCode StatelessWriter::deliver_sample_nts(
}

// Send to interprocess readers the new sample.
for_matched_readers(matched_local_readers_, [&, cache_change](ReaderLocator& reader)
{
intraprocess_delivery(cache_change, reader);
return false;
});
if (0 == current_fragment_sent_)
{
for_matched_readers(matched_local_readers_, [&, cache_change](ReaderLocator& reader)
{
intraprocess_delivery(cache_change, reader);
return false;
});
}

try
{
Expand Down Expand Up @@ -753,7 +755,7 @@ DeliveryRetCode StatelessWriter::deliver_sample_nts(
{
// This update must be done before calling the callback.
last_sequence_number_sent_ = change_sequence_number;
unsent_changes_cond_.notify_one();
unsent_changes_cond_.notify_all();

if (nullptr != mp_listener)
{
Expand Down
2 changes: 1 addition & 1 deletion test/mock/rtps/RTPSWriter/fastdds/rtps/writer/RTPSWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class RTPSWriter : public Endpoint
LocatorSelectorSender&,
const std::chrono::time_point<std::chrono::steady_clock>&));

MOCK_METHOD3(send, bool(
MOCK_METHOD3(send_nts, bool(
CDRMessage_t*,
const LocatorSelectorSender&,
std::chrono::steady_clock::time_point&));
Expand Down

0 comments on commit 94f4072

Please sign in to comment.