Skip to content

Commit

Permalink
Fix handling of STRICT_REALTIME (#967)
Browse files Browse the repository at this point in the history
* Refs #7365. Solving issue.

* Refs #7365. Styling.
  • Loading branch information
MiguelCompany authored Jan 29, 2020
1 parent 9f77b5b commit 505ca4a
Showing 1 changed file with 57 additions and 39 deletions.
96 changes: 57 additions & 39 deletions src/cpp/publisher/PublisherHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ bool PublisherHistory::add_pub_change(
//NO KEY HISTORY
if(mp_pubImpl->getAttributes().topic.getTopicKind() == NO_KEY)
{
if(this->add_change_(change, wparams, max_blocking_time))
#if HAVE_STRICT_REALTIME
if (this->add_change_(change, wparams, max_blocking_time))
#else
if (this->add_change_(change, wparams))
#endif
{
returnedValue = true;
}
Expand All @@ -117,7 +121,7 @@ bool PublisherHistory::add_pub_change(
}
else
{
logWarning(RTPS_HISTORY,"Change not added due to maximum number of samples per instance"<<endl;);
logWarning(RTPS_HISTORY,"Change not added due to maximum number of samples per instance");
}
}
else if (m_historyQos.kind == KEEP_LAST_HISTORY_QOS)
Expand All @@ -138,7 +142,11 @@ bool PublisherHistory::add_pub_change(
if(add)
{
vit->second.cache_changes.push_back(change);
if(this->add_change_(change, wparams, max_blocking_time))
#if HAVE_STRICT_REALTIME
if (this->add_change_(change, wparams, max_blocking_time))
#else
if (this->add_change_(change, wparams))
#endif
{
logInfo(RTPS_HISTORY,this->mp_pubImpl->getGuid().entityId <<" Change "
<< change->sequenceNumber << " added with key: "<<change->instanceHandle
Expand Down Expand Up @@ -181,64 +189,70 @@ bool PublisherHistory::find_key(
return true;
}
}
logWarning(PUBLISHER, "History has reached the maximum number of instances" << endl;)
logWarning(PUBLISHER, "History has reached the maximum number of instances");
}
return false;
}


bool PublisherHistory::removeAllChange(size_t* removed)
bool PublisherHistory::removeAllChange(
size_t* removed)
{

size_t rem = 0;
std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);

while(m_changes.size()>0)
while (m_changes.size() > 0)
{
if(remove_change_pub(m_changes.front()))
if (remove_change_pub(m_changes.front()))
{
++rem;
}
else
{
break;
}
}
if(removed!=nullptr)
if (removed != nullptr)
{
*removed = rem;
}
if (rem>0)
if (rem > 0)
{
return true;
}
return false;
}


bool PublisherHistory::removeMinChange()
{
if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}

std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
if(m_changes.size()>0)
if (m_changes.size() > 0)
{
return remove_change_pub(m_changes.front());
}
return false;
}

bool PublisherHistory::remove_change_pub(CacheChange_t* change)
bool PublisherHistory::remove_change_pub(
CacheChange_t* change)
{

if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}

std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
if(mp_pubImpl->getAttributes().topic.getTopicKind() == NO_KEY)
if (mp_pubImpl->getAttributes().topic.getTopicKind() == NO_KEY)
{
if(this->remove_change(change))
if (this->remove_change(change))
{
m_isHistoryFull = false;
return true;
Expand All @@ -249,29 +263,30 @@ bool PublisherHistory::remove_change_pub(CacheChange_t* change)
else
{
t_m_Inst_Caches::iterator vit;
if(!this->find_key(change,&vit))
if (!this->find_key(change, &vit))
{
return false;
}

for(auto chit = vit->second.cache_changes.begin(); chit!= vit->second.cache_changes.end(); ++chit)
for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
{
if( ((*chit)->sequenceNumber == change->sequenceNumber) && ((*chit)->writerGUID == change->writerGUID) )
if ( ((*chit)->sequenceNumber == change->sequenceNumber) && ((*chit)->writerGUID == change->writerGUID) )
{
if(remove_change(change))
if (remove_change(change))
{
vit->second.cache_changes.erase(chit);
m_isHistoryFull = false;
return true;
}
}
}
logError(PUBLISHER,"Change not found, something is wrong");
logError(PUBLISHER, "Change not found, something is wrong");
}
return false;
}

bool PublisherHistory::remove_change_g(CacheChange_t* a_change)
bool PublisherHistory::remove_change_g(
CacheChange_t* a_change)
{
return remove_change_pub(a_change);
}
Expand All @@ -280,9 +295,9 @@ bool PublisherHistory::set_next_deadline(
const InstanceHandle_t& handle,
const std::chrono::steady_clock::time_point& next_deadline_us)
{
if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}
std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
Expand All @@ -292,7 +307,7 @@ bool PublisherHistory::set_next_deadline(
next_deadline_us_ = next_deadline_us;
return true;
}
else if(mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
else if (mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
{
if (keyed_changes_.find(handle) == keyed_changes_.end())
{
Expand All @@ -307,24 +322,27 @@ bool PublisherHistory::set_next_deadline(
}

bool PublisherHistory::get_next_deadline(
InstanceHandle_t &handle,
std::chrono::steady_clock::time_point &next_deadline_us)
InstanceHandle_t& handle,
std::chrono::steady_clock::time_point& next_deadline_us)
{
if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}
std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);

if(mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
if (mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
{
auto min = std::min_element(keyed_changes_.begin(),
keyed_changes_.end(),
[](
const std::pair<InstanceHandle_t, KeyedChanges> &lhs,
const std::pair<InstanceHandle_t, KeyedChanges> &rhs)
{ return lhs.second.next_deadline_us < rhs.second.next_deadline_us; });
auto min = std::min_element(
keyed_changes_.begin(),
keyed_changes_.end(),
[](
const std::pair<InstanceHandle_t, KeyedChanges>& lhs,
const std::pair<InstanceHandle_t, KeyedChanges>& rhs)
{
return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
});

handle = min->first;
next_deadline_us = min->second.next_deadline_us;
Expand Down

0 comments on commit 505ca4a

Please sign in to comment.