Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix handling of STRICT_REALTIME <1.9.x>[7368] #967

Merged
merged 2 commits into from
Jan 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 57 additions & 39 deletions src/cpp/publisher/PublisherHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ bool PublisherHistory::add_pub_change(
//NO KEY HISTORY
if(mp_pubImpl->getAttributes().topic.getTopicKind() == NO_KEY)
{
if(this->add_change_(change, wparams, max_blocking_time))
#if HAVE_STRICT_REALTIME
if (this->add_change_(change, wparams, max_blocking_time))
#else
if (this->add_change_(change, wparams))
#endif
{
returnedValue = true;
}
Expand All @@ -117,7 +121,7 @@ bool PublisherHistory::add_pub_change(
}
else
{
logWarning(RTPS_HISTORY,"Change not added due to maximum number of samples per instance"<<endl;);
logWarning(RTPS_HISTORY,"Change not added due to maximum number of samples per instance");
}
}
else if (m_historyQos.kind == KEEP_LAST_HISTORY_QOS)
Expand All @@ -138,7 +142,11 @@ bool PublisherHistory::add_pub_change(
if(add)
{
vit->second.cache_changes.push_back(change);
if(this->add_change_(change, wparams, max_blocking_time))
#if HAVE_STRICT_REALTIME
if (this->add_change_(change, wparams, max_blocking_time))
#else
if (this->add_change_(change, wparams))
#endif
{
logInfo(RTPS_HISTORY,this->mp_pubImpl->getGuid().entityId <<" Change "
<< change->sequenceNumber << " added with key: "<<change->instanceHandle
Expand Down Expand Up @@ -181,64 +189,70 @@ bool PublisherHistory::find_key(
return true;
}
}
logWarning(PUBLISHER, "History has reached the maximum number of instances" << endl;)
logWarning(PUBLISHER, "History has reached the maximum number of instances");
}
return false;
}


bool PublisherHistory::removeAllChange(size_t* removed)
bool PublisherHistory::removeAllChange(
size_t* removed)
{

size_t rem = 0;
std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);

while(m_changes.size()>0)
while (m_changes.size() > 0)
{
if(remove_change_pub(m_changes.front()))
if (remove_change_pub(m_changes.front()))
{
++rem;
}
else
{
break;
}
}
if(removed!=nullptr)
if (removed != nullptr)
{
*removed = rem;
}
if (rem>0)
if (rem > 0)
{
return true;
}
return false;
}


bool PublisherHistory::removeMinChange()
{
if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}

std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
if(m_changes.size()>0)
if (m_changes.size() > 0)
{
return remove_change_pub(m_changes.front());
}
return false;
}

bool PublisherHistory::remove_change_pub(CacheChange_t* change)
bool PublisherHistory::remove_change_pub(
CacheChange_t* change)
{

if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}

std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
if(mp_pubImpl->getAttributes().topic.getTopicKind() == NO_KEY)
if (mp_pubImpl->getAttributes().topic.getTopicKind() == NO_KEY)
{
if(this->remove_change(change))
if (this->remove_change(change))
{
m_isHistoryFull = false;
return true;
Expand All @@ -249,29 +263,30 @@ bool PublisherHistory::remove_change_pub(CacheChange_t* change)
else
{
t_m_Inst_Caches::iterator vit;
if(!this->find_key(change,&vit))
if (!this->find_key(change, &vit))
{
return false;
}

for(auto chit = vit->second.cache_changes.begin(); chit!= vit->second.cache_changes.end(); ++chit)
for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
{
if( ((*chit)->sequenceNumber == change->sequenceNumber) && ((*chit)->writerGUID == change->writerGUID) )
if ( ((*chit)->sequenceNumber == change->sequenceNumber) && ((*chit)->writerGUID == change->writerGUID) )
{
if(remove_change(change))
if (remove_change(change))
{
vit->second.cache_changes.erase(chit);
m_isHistoryFull = false;
return true;
}
}
}
logError(PUBLISHER,"Change not found, something is wrong");
logError(PUBLISHER, "Change not found, something is wrong");
}
return false;
}

bool PublisherHistory::remove_change_g(CacheChange_t* a_change)
bool PublisherHistory::remove_change_g(
CacheChange_t* a_change)
{
return remove_change_pub(a_change);
}
Expand All @@ -280,9 +295,9 @@ bool PublisherHistory::set_next_deadline(
const InstanceHandle_t& handle,
const std::chrono::steady_clock::time_point& next_deadline_us)
{
if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}
std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
Expand All @@ -292,7 +307,7 @@ bool PublisherHistory::set_next_deadline(
next_deadline_us_ = next_deadline_us;
return true;
}
else if(mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
else if (mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
{
if (keyed_changes_.find(handle) == keyed_changes_.end())
{
Expand All @@ -307,24 +322,27 @@ bool PublisherHistory::set_next_deadline(
}

bool PublisherHistory::get_next_deadline(
InstanceHandle_t &handle,
std::chrono::steady_clock::time_point &next_deadline_us)
InstanceHandle_t& handle,
std::chrono::steady_clock::time_point& next_deadline_us)
{
if(mp_writer == nullptr || mp_mutex == nullptr)
if (mp_writer == nullptr || mp_mutex == nullptr)
{
logError(RTPS_HISTORY,"You need to create a Writer with this History before using it");
logError(RTPS_HISTORY, "You need to create a Writer with this History before using it");
return false;
}
std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);

if(mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
if (mp_pubImpl->getAttributes().topic.getTopicKind() == WITH_KEY)
{
auto min = std::min_element(keyed_changes_.begin(),
keyed_changes_.end(),
[](
const std::pair<InstanceHandle_t, KeyedChanges> &lhs,
const std::pair<InstanceHandle_t, KeyedChanges> &rhs)
{ return lhs.second.next_deadline_us < rhs.second.next_deadline_us; });
auto min = std::min_element(
keyed_changes_.begin(),
keyed_changes_.end(),
[](
const std::pair<InstanceHandle_t, KeyedChanges>& lhs,
const std::pair<InstanceHandle_t, KeyedChanges>& rhs)
{
return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
});

handle = min->first;
next_deadline_us = min->second.next_deadline_us;
Expand Down