Skip to content

Commit

Permalink
Refs #7399. Fixed error sending fragments.
Browse files Browse the repository at this point in the history
  • Loading branch information
richiware committed Jan 23, 2020
1 parent ba934b2 commit 4f0243b
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 3 deletions.
2 changes: 1 addition & 1 deletion include/fastdds/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class StatelessWriter : public RTPSWriter
bool set_fixed_locators(
const LocatorList_t& locator_list);

void update_unsent_changes(
bool update_unsent_changes(
const SequenceNumber_t& seq_num,
const FragmentNumber_t& frag_num);

Expand Down
1 change: 1 addition & 0 deletions src/cpp/dds/topic/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ bool DataWriterImpl::perform_create_new_change(
(max_data_size > participant_throughput_controller_bytes ?
participant_throughput_controller_bytes :
max_data_size);
high_mark_for_frag_ &= ~3;
}

uint32_t final_high_mark_for_frag = high_mark_for_frag_;
Expand Down
1 change: 1 addition & 0 deletions src/cpp/fastrtps_deprecated/publisher/PublisherImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ bool PublisherImpl::create_new_change_with_params(
(max_data_size > participant_throughput_controller_bytes ?
participant_throughput_controller_bytes :
max_data_size);
high_mark_for_frag_ &= ~3;
}

uint32_t final_high_mark_for_frag = high_mark_for_frag_;
Expand Down
14 changes: 12 additions & 2 deletions src/cpp/rtps/writer/StatelessWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,11 @@ bool StatelessWriter::is_acked_by_all(
return true;
}

void StatelessWriter::update_unsent_changes(
bool StatelessWriter::update_unsent_changes(
const SequenceNumber_t& seq_num,
const FragmentNumber_t& frag_num)
{
bool has_to_wake_async_thread = false;
auto find_by_seq_num = [seq_num](const ChangeForReader_t& unsent_change)
{
return seq_num == unsent_change.getSequenceNumber();
Expand All @@ -279,7 +280,13 @@ void StatelessWriter::update_unsent_changes(
{
unsent_changes_.remove_if(find_by_seq_num);
}
else
{
has_to_wake_async_thread = true;
}
}

return has_to_wake_async_thread;
}

void StatelessWriter::send_any_unsent_changes()
Expand Down Expand Up @@ -317,7 +324,10 @@ void StatelessWriter::send_any_unsent_changes()

// Remove the messages selected for sending from the original list,
// and update those that were fragmented with the new sent index
update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber);
if (update_unsent_changes(changeToSend.sequenceNumber, changeToSend.fragmentNumber))
{
mp_RTPSParticipant->async_thread().wake_up(this);
}

// Notify the controllers
FlowController::NotifyControllersChangeSent(changeToSend.cacheChange);
Expand Down

0 comments on commit 4f0243b

Please sign in to comment.