Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed error sending more than 256 fragments <1.9.x> [7403] #973

Merged
merged 9 commits into from
Jan 24, 2020
2 changes: 1 addition & 1 deletion include/fastrtps/rtps/writer/StatelessWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class StatelessWriter : public RTPSWriter
bool set_fixed_locators(
const LocatorList_t& locator_list);

void update_unsent_changes(
bool update_unsent_changes(
const SequenceNumber_t& seq_num,
const FragmentNumber_t& frag_num);

Expand Down
159 changes: 82 additions & 77 deletions src/cpp/publisher/PublisherImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ PublisherImpl::PublisherImpl(
, m_history(this, pdatatype->m_typeSize
#if HAVE_SECURITY
// In future v2 changepool is in writer, and writer set this value to cachechagepool.
+ 20 /*SecureDataHeader*/ + 4 + ((2* 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/
+ 20 /*SecureDataHeader*/ + 4 + ((2 * 16) /*EVP_MAX_IV_LENGTH max block size*/ - 1 ) /* SecureDataBodey*/
+ 16 + 4 /*SecureDataTag*/
#endif
, att.topic.historyQos, att.topic.resourceLimitsQos, att.historyMemoryPolicy)
Expand All @@ -76,26 +76,26 @@ PublisherImpl::PublisherImpl(
, lifespan_duration_us_(m_att.qos.m_lifespan.duration.to_ns() * 1e-3)
{
deadline_timer_ = new TimedEvent(mp_participant->get_resource_event(),
[&]() -> bool
{
return deadline_missed();
},
att.qos.m_deadline.period.to_ns() * 1e-6);
[&]() -> bool
{
return deadline_missed();
},
att.qos.m_deadline.period.to_ns() * 1e-6);

lifespan_timer_ = new TimedEvent(mp_participant->get_resource_event(),
[&]() -> bool
{
return lifespan_expired();
},
m_att.qos.m_lifespan.duration.to_ns() * 1e-6);
[&]() -> bool
{
return lifespan_expired();
},
m_att.qos.m_lifespan.duration.to_ns() * 1e-6);
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
}

PublisherImpl::~PublisherImpl()
{
delete(lifespan_timer_);
delete(deadline_timer_);

if(mp_writer != nullptr)
if (mp_writer != nullptr)
{
logInfo(PUBLISHER, this->getGuid().entityId << " in topic: " << this->m_att.topic.topicName);
}
Expand All @@ -104,8 +104,6 @@ PublisherImpl::~PublisherImpl()
delete(this->mp_userPublisher);
}



bool PublisherImpl::create_new_change(
ChangeKind_t changeKind,
void* data)
Expand All @@ -127,79 +125,80 @@ bool PublisherImpl::create_new_change_with_params(
return false;
}

if(changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED ||
if (changeKind == NOT_ALIVE_UNREGISTERED || changeKind == NOT_ALIVE_DISPOSED ||
changeKind == NOT_ALIVE_DISPOSED_UNREGISTERED)
{
if(m_att.topic.topicKind == NO_KEY)
if (m_att.topic.topicKind == NO_KEY)
{
logError(PUBLISHER,"Topic is NO_KEY, operation not permitted");
logError(PUBLISHER, "Topic is NO_KEY, operation not permitted");
return false;
}
}

InstanceHandle_t handle;
if(m_att.topic.topicKind == WITH_KEY)
if (m_att.topic.topicKind == WITH_KEY)
{
bool is_key_protected = false;
#if HAVE_SECURITY
is_key_protected = mp_writer->getAttributes().security_attributes().is_key_protected;
#endif
mp_type->getKey(data,&handle,is_key_protected);
mp_type->getKey(data, &handle, is_key_protected);
}

// Block lowlevel writer
auto max_blocking_time = std::chrono::steady_clock::now() +
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time));
std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time));
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved

#if HAVE_STRICT_REALTIME
std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex(), std::defer_lock);
if(lock.try_lock_until(max_blocking_time))
if (lock.try_lock_until(max_blocking_time))
#else
std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex());
#endif
{
CacheChange_t* ch = mp_writer->new_change(mp_type->getSerializedSizeProvider(data), changeKind, handle);
if(ch != nullptr)
if (ch != nullptr)
{
if(changeKind == ALIVE)
if (changeKind == ALIVE)
{
//If these two checks are correct, we asume the cachechange is valid and thwn we can write to it.
if(!mp_type->serialize(data, &ch->serializedPayload))
if (!mp_type->serialize(data, &ch->serializedPayload))
{
logWarning(RTPS_WRITER,"RTPSWriter:Serialization returns false";);
logWarning(RTPS_WRITER, "RTPSWriter:Serialization returns false"; );
m_history.release_Cache(ch);
return false;
}
}

//TODO(Ricardo) This logic in a class. Then a user of rtps layer can use it.
if(high_mark_for_frag_ == 0)
if (high_mark_for_frag_ == 0)
{
uint32_t max_data_size = mp_writer->getMaxDataSize();
uint32_t writer_throughput_controller_bytes =
mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod);
mp_writer->calculateMaxDataSize(m_att.throughputController.bytesPerPeriod);
uint32_t participant_throughput_controller_bytes =
mp_writer->calculateMaxDataSize(
mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod);
mp_writer->calculateMaxDataSize(
mp_rtpsParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod);
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved

high_mark_for_frag_ =
max_data_size > writer_throughput_controller_bytes ?
writer_throughput_controller_bytes :
(max_data_size > participant_throughput_controller_bytes ?
participant_throughput_controller_bytes :
max_data_size);
max_data_size > writer_throughput_controller_bytes ?
writer_throughput_controller_bytes :
(max_data_size > participant_throughput_controller_bytes ?
participant_throughput_controller_bytes :
max_data_size);
high_mark_for_frag_ &= ~3;
}

uint32_t final_high_mark_for_frag = high_mark_for_frag_;

// If needed inlineqos for related_sample_identity, then remove the inlinqos size from final fragment size.
if(wparams.related_sample_identity() != SampleIdentity::unknown())
if (wparams.related_sample_identity() != SampleIdentity::unknown())
{
final_high_mark_for_frag -= 32;
}

// If it is big data, fragment it.
if(ch->serializedPayload.length > final_high_mark_for_frag)
if (ch->serializedPayload.length > final_high_mark_for_frag)
{
/// Fragment the data.
// Set the fragment size to the cachechange.
Expand All @@ -208,7 +207,7 @@ bool PublisherImpl::create_new_change_with_params(
}

InstanceHandle_t change_handle = ch->instanceHandle;
if(!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time))
if (!this->m_history.add_pub_change(ch, wparams, lock, max_blocking_time))
{
m_history.release_Cache(ch);
return false;
Expand Down Expand Up @@ -237,7 +236,8 @@ bool PublisherImpl::create_new_change_with_params(

if (m_att.qos.m_lifespan.duration != c_TimeInfinite)
{
lifespan_duration_us_ = std::chrono::duration<double, std::ratio<1, 1000000>>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
lifespan_duration_us_ = std::chrono::duration<double, std::ratio<1, 1000000> >(
m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6);
lifespan_timer_->restart_timer();
}
Expand All @@ -249,13 +249,13 @@ bool PublisherImpl::create_new_change_with_params(
return false;
}


bool PublisherImpl::removeMinSeqChange()
{
return m_history.removeMinChange();
}

bool PublisherImpl::removeAllChange(size_t* removed)
bool PublisherImpl::removeAllChange(
size_t* removed)
{
return m_history.removeAllChange(removed);
}
Expand All @@ -264,84 +264,86 @@ const GUID_t& PublisherImpl::getGuid()
{
return mp_writer->getGuid();
}

//
bool PublisherImpl::updateAttributes(const PublisherAttributes& att)
bool PublisherImpl::updateAttributes(
const PublisherAttributes& att)
{
bool updated = true;
bool missing = false;
if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
{
if(att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() ||
if (att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() ||
att.multicastLocatorList.size() != this->m_att.multicastLocatorList.size())
{
logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version");
logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version");
updated &= false;
}
else
{
for(LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin();
lit1!=this->m_att.unicastLocatorList.end();++lit1)
for (LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin();
lit1 != this->m_att.unicastLocatorList.end(); ++lit1)
{
missing = true;
for(LocatorListConstIterator lit2 = att.unicastLocatorList.begin();
lit2!= att.unicastLocatorList.end();++lit2)
for (LocatorListConstIterator lit2 = att.unicastLocatorList.begin();
lit2 != att.unicastLocatorList.end(); ++lit2)
{
if(*lit1 == *lit2)
if (*lit1 == *lit2)
{
missing = false;
break;
}
}
if(missing)
if (missing)
{
logWarning(PUBLISHER,"Locator: "<< *lit1 << " not present in new list");
logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version");
logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list");
logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version");
}
}
for(LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin();
lit1!=this->m_att.multicastLocatorList.end();++lit1)
for (LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin();
lit1 != this->m_att.multicastLocatorList.end(); ++lit1)
{
missing = true;
for(LocatorListConstIterator lit2 = att.multicastLocatorList.begin();
lit2!= att.multicastLocatorList.end();++lit2)
for (LocatorListConstIterator lit2 = att.multicastLocatorList.begin();
lit2 != att.multicastLocatorList.end(); ++lit2)
{
if(*lit1 == *lit2)
if (*lit1 == *lit2)
{
missing = false;
break;
}
}
if(missing)
if (missing)
{
logWarning(PUBLISHER,"Locator: "<< *lit1<< " not present in new list");
logWarning(PUBLISHER,"Locator Lists cannot be changed or updated in this version");
logWarning(PUBLISHER, "Locator: " << *lit1 << " not present in new list");
logWarning(PUBLISHER, "Locator Lists cannot be changed or updated in this version");
}
}
}
}

//TOPIC ATTRIBUTES
if(this->m_att.topic != att.topic)
if (this->m_att.topic != att.topic)
{
logWarning(PUBLISHER,"Topic Attributes cannot be updated");
logWarning(PUBLISHER, "Topic Attributes cannot be updated");
updated &= false;
}
//QOS:
//CHECK IF THE QOS CAN BE SET
if(!this->m_att.qos.canQosBeUpdated(att.qos))
if (!this->m_att.qos.canQosBeUpdated(att.qos))
{
updated &=false;
updated &= false;
}
if(updated)
if (updated)
{
if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS)
{
//UPDATE TIMES:
StatefulWriter* sfw = (StatefulWriter*)mp_writer;
sfw->updateTimes(att.times);
}

this->m_att.qos.setQos(att.qos,false);
this->m_att.qos.setQos(att.qos, false);
this->m_att = att;
//Notify the participant that a Writer has changed its QOS
mp_rtpsParticipant->updateWriter(this->mp_writer, m_att.topic, m_att.qos);
Expand All @@ -351,7 +353,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att)
if (m_att.qos.m_deadline.period != c_TimeInfinite)
{
deadline_duration_us_ =
duration<double, std::ratio<1, 1000000>>(m_att.qos.m_deadline.period.to_ns() * 1e-3);
duration<double, std::ratio<1, 1000000> >(m_att.qos.m_deadline.period.to_ns() * 1e-3);
deadline_timer_->update_interval_millisec(m_att.qos.m_deadline.period.to_ns() * 1e-6);
}
else
Expand All @@ -364,7 +366,7 @@ bool PublisherImpl::updateAttributes(const PublisherAttributes& att)
if (m_att.qos.m_lifespan.duration != c_TimeInfinite)
{
lifespan_duration_us_ =
duration<double, std::ratio<1, 1000000>>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
duration<double, std::ratio<1, 1000000> >(m_att.qos.m_lifespan.duration.to_ns() * 1e-3);
lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6);
}
else
Expand All @@ -380,7 +382,7 @@ void PublisherImpl::PublisherWriterListener::onWriterMatched(
RTPSWriter* /*writer*/,
MatchingInfo& info)
{
if( mp_publisherImpl->mp_listener != nullptr )
if ( mp_publisherImpl->mp_listener != nullptr )
{
mp_publisherImpl->mp_listener->onPublicationMatched(mp_publisherImpl->mp_userPublisher, info);
}
Expand All @@ -397,20 +399,21 @@ void PublisherImpl::PublisherWriterListener::onWriterChangeReceivedByAll(
}

void PublisherImpl::PublisherWriterListener::on_liveliness_lost(
RTPSWriter *writer,
const LivelinessLostStatus &status)
RTPSWriter* writer,
const LivelinessLostStatus& status)
{
(void)writer;

if (mp_publisherImpl->mp_listener != nullptr)
{
mp_publisherImpl->mp_listener->on_liveliness_lost(
mp_publisherImpl->mp_userPublisher,
status);
mp_publisherImpl->mp_userPublisher,
status);
}
}

bool PublisherImpl::wait_for_all_acked(const eprosima::fastrtps::Time_t& max_wait)
bool PublisherImpl::wait_for_all_acked(
const eprosima::fastrtps::Time_t& max_wait)
{
return mp_writer->wait_for_all_acked(max_wait);
}
Expand Down Expand Up @@ -457,7 +460,8 @@ bool PublisherImpl::deadline_missed()
return deadline_timer_reschedule();
}

void PublisherImpl::get_offered_deadline_missed_status(OfferedDeadlineMissedStatus &status)
void PublisherImpl::get_offered_deadline_missed_status(
OfferedDeadlineMissedStatus& status)
{
std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex());

Expand Down Expand Up @@ -506,7 +510,8 @@ bool PublisherImpl::lifespan_expired()
return true;
}

void PublisherImpl::get_liveliness_lost_status(LivelinessLostStatus &status)
void PublisherImpl::get_liveliness_lost_status(
LivelinessLostStatus& status)
{
std::unique_lock<RecursiveTimedMutex> lock(mp_writer->getMutex());

Expand Down
Loading