Skip to content

Commit

Permalink
move heartbeat check into its own thread
Browse files Browse the repository at this point in the history
  • Loading branch information
iche033 committed Apr 17, 2020
1 parent aa3b61b commit 296de9e
Showing 1 changed file with 44 additions and 30 deletions.
74 changes: 44 additions & 30 deletions subt_ros/src/BridgeLogger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#include <chrono>
#include <mutex>
#include <thread>
#include <fstream>
#include <ros/ros.h>
#include <std_msgs/String.h>
Expand Down Expand Up @@ -51,6 +52,9 @@ class BridgeLogger
private: void OnSensorMsg(const topic_tools::ShapeShifter::ConstPtr &_msg,
const std::string &_topic);

/// \brief Check heartbeat for all topics monitored
private: void CheckHeartbeat();

/// \brief ROS node handler.
private: ros::NodeHandle n;

Expand All @@ -70,10 +74,13 @@ class BridgeLogger
/// \brief Timer that triggers the update function.
private: ros::Timer updateTimer;

/// \brief A map of topic names and time when their last heartbeat was
/// received.
private: std::map<std::string,
std::chrono::time_point<std::chrono::steady_clock>> topicTimestamp;

private: std::chrono::time_point<std::chrono::steady_clock> lastHeartbeatTime;
/// \brief Check heartbeat in separate thread
private: std::thread heartbeatThread;
};

/////////////////////////////////////////////////
Expand All @@ -84,6 +91,8 @@ BridgeLogger::BridgeLogger()

this->updateTimer = this->n.createTimer(ros::Duration(10.0),
&BridgeLogger::Update, this);

this->heartbeatThread = std::thread(&BridgeLogger::CheckHeartbeat, this);
}

/////////////////////////////////////////////////
Expand All @@ -102,7 +111,6 @@ void BridgeLogger::Update(const ros::TimerEvent &)

// Skip if the topic has already been added to `this->streams` or if
// the topic is one that we don't monitor, such as parameter updates.

if (this->streams.find(info.name) != this->streams.end() ||
info.name.find("parameter_descriptions") != std::string::npos ||
info.name.find("parameter_updates") != std::string::npos ||
Expand Down Expand Up @@ -156,7 +164,6 @@ void BridgeLogger::Update(const ros::TimerEvent &)
this->subscribers.push_back(
this->n.subscribe(info.name, 1000, callback));
}

}
}

Expand Down Expand Up @@ -218,33 +225,6 @@ void BridgeLogger::OnSensorMsg(const topic_tools::ShapeShifter::ConstPtr &_msg,
return;
}

// check heartbeat
auto tdiff = systemTime - this->lastHeartbeatTime;
auto seconds =
std::chrono::duration_cast<std::chrono::milliseconds>(tdiff).count() /
1000.0;
if (seconds > 5.0)
{
// check topic heartbeat
for (auto &tIt : this->streams)
{
std::string topic = tIt.first;
auto it = this->topicTimestamp.find(topic);
if (it != this->topicTimestamp.end())
{
auto d = systemTime - it->second;
seconds =
std::chrono::duration_cast<std::chrono::milliseconds>(d).count() /
1000.0;
if (seconds > 5.0)
{
this->streams[topic] << "***Error: No messages received for more than"
<< " 5 seconds ***\n";
}
}
}
this->lastHeartbeatTime = systemTime;
}
// log msg time received in wall clock time
this->topicTimestamp[_topic] = systemTime;

Expand All @@ -262,6 +242,40 @@ void BridgeLogger::OnSensorMsg(const topic_tools::ShapeShifter::ConstPtr &_msg,
this->prevSeq[_topic] = seq;
}

/////////////////////////////////////////////////
void BridgeLogger::CheckHeartbeat()
{
// check heartbeat
using namespace std::chrono_literals;
double heartbeatCheckPeriod = 5.0;
while (ros::ok)
{
std::this_thread::sleep_for(5s);
auto systemTime = std::chrono::steady_clock::now();
{
std::lock_guard<std::mutex> lock(this->mutex);
// check topic heartbeat
for (auto &tIt : this->streams)
{
std::string topic = tIt.first;
auto it = this->topicTimestamp.find(topic);
if (it != this->topicTimestamp.end())
{
auto d = systemTime - it->second;
auto seconds =
std::chrono::duration_cast<std::chrono::milliseconds>(d).count() /
1000.0;
if (seconds > heartbeatCheckPeriod)
{
this->streams[topic] << "***Error: No messages received for more "
<< "than 5 seconds ***\n";
}
}
}
}
}
}

/////////////////////////////////////////////////
int main(int argc, char** argv)
{
Expand Down

0 comments on commit 296de9e

Please sign in to comment.