Skip to content

Commit

Permalink
Merge pull request #1 from locusrobotics/cleanup
Browse files Browse the repository at this point in the history
rosbag_snapshot independent package
  • Loading branch information
DLu authored Apr 17, 2020
2 parents d220827 + 78c2edc commit ab2223e
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 77 deletions.
138 changes: 138 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# rosbag_snapshot

Solution for [this ros_comm issue](https://github.com/ros/ros_comm/issues/1399) which acts similarly to the deprecated `rosrecord -s` command but with additional features. It is added as a new package here rather than patching `rosbag` based on [the discussion here](https://github.com/ros/ros_comm/pull/1414).

It subscribes to topics and maintains a buffer of recent messages like a dash cam. This is useful in live testing where unexpected events can occur which would be useful to have data on but the opportunity is missed if `rosbag record` was not running (disk space limits make always running `rosbag record` impracticable). Instead, users may run snapshot in the background and save data from the recent past to disk as needed.


## Usage

`rosbag_snapshot` can be configured through command line flags and with ROS params for more granular control. By default, the command will run in server mode (buffering data). When certain flags are used, program will act as a client by requesting that the server write data to disk or freezing the buffer to preserve interesting data until a user can decide what to write.

### CLI usage

```
$ rosrun rosbag_snapshot snapshot -h
Usage: snapshot [options] [topic1 topic2 ...]
Buffer recent messages until triggered to write or trigger an already running instance.
Options:
-h [ --help ] produce help message
-t [ --trigger-write ] Write buffer of selected topcis to a bag file
-p [ --pause ] Stop buffering new messages until resumed or
write is triggered
-r [ --resume ] Resume buffering new messages, writing over
older messages as needed
-s [ --size ] arg (=-1) Maximum memory per topic to use in buffering in
MB. Default: no limit
-d [ --duration ] arg (=30) Maximum difference between newest and oldest
buffered message per topic in seconds. Default:
30
-o [ --output-prefix ] arg When in trigger write mode, prepend PREFIX to
name of writting bag file
-O [ --output-filename ] arg When in trigger write mode, exact name of
written bag file
--topic arg Topic to buffer. If triggering write, write only
these topics instead of all buffered topics.
```

###### Hold a buffer of the last 30 seconds of data from selected topics until triggered to write
`rosrun rosbag_snapshot snapshot -d 30 /tf /odom /camera/image_color /camera/camera_info /velodyne_points`

###### Buffer the most recent gigabyte of the following topics in the camera namespace
`ROS_NAMESPACE=camera rosrun rosbag_snapshot snapshot -s 1000 image_rect_color camera_info`


### Example launch file
```
<launch>
<node name="snapshot" pkg="rosbag_snapshot" type="snapshot" args="">
<rosparam>
default_duration_limit: 1 # Maximum time difference between newest and oldest message, seconds, overrides -d flag
default_memory_limit: 0.1 # Maximum memory used by messages in each topic's buffer, MB, override -s flag
topics:
- test1 # Inherit defaults
- test2: # Override duration limit, inherit memory limit
duration: 2
- test3: # Override both limits
duration: -1 # Negative means no duration limit
memory: 0.00
</rosparam>
</node>
</launch>
```

## Client examples

###### Write all buffered data to `<datetime>.bag`
`rosrun rosbag_snapshot snapshot -t`

###### Write buffered data from selected topics to `new_lighting<datetime>.bag`
`rosrun rosbag_snapshot snapshot -t -o new_lighting /camera/image_raw /camera/camera_info`

###### Write all buffered data to `/home/user/crashed_into_wall.bag`
`rosrun rosbag_snapshot snapshot -t -O /home/user/crashed_into_wall`

###### Pause buffering of new data, holding current buffer in memory until -t or -r is used
`rosrun rosbag_snapshot snapshot -p`

###### Resume buffering new data
`rosrun rosbag_snapshot snapshot -r`

###### Call trigger service manually, specifying absolute window start and stop time for written data

```
$ rosservice call /trigger_snapshot "filename: 'short_control_instability.bag'
topics:
- '/tf'
- '/odom'
- '/wrench'
- '/motors/cmd'
start_time:
secs: 62516
nsecs: 0
stop_time:
secs: 62528
nsecs: 0"
```

###### View status of buffered data (useful for future tools/GUI)

```
$ rostopic echo /snapshot_status
topics:
-
topic: "/test"
node_pub: ''
node_sub: "/snapshot_1527185221120605085"
window_start:
secs: 62516
nsecs: 761000000
window_stop:
secs: 62521
nsecs: 984000000
delivered_msgs: 524
dropped_msgs: 0
traffic: 2096
period_mean:
secs: 0
nsecs: 0
period_stddev:
secs: 0
nsecs: 0
period_max:
secs: 0
nsecs: 0
stamp_age_mean:
secs: 0
nsecs: 0
stamp_age_stddev:
secs: 0
nsecs: 0
stamp_age_max:
secs: 0
nsecs: 0
enabled: True
---
```
9 changes: 9 additions & 0 deletions rosbag_snapshot/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ add_executable(snapshot src/snapshot.cpp)
add_dependencies(snapshot ${catkin_EXPORTED_TARGETS})
target_link_libraries(snapshot rosbag_snapshot ${catkin_LIBRARIES})

if(CATKIN_ENABLE_TESTING)
find_package(catkin REQUIRED COMPONENTS roslint rostest)
roslint_python(test)
roslint_cpp()
roslint_add_test()

add_rostest(test/snapshot.test)
endif()

install(TARGETS snapshot
RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
Expand Down
27 changes: 16 additions & 11 deletions rosbag_snapshot/include/rosbag_snapshot/snapshotter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
#ifndef ROSBAG_SNAPSHOT_SNAPSHOTTER_H
#define ROSBAG_SNAPSHOT_SNAPSHOTTER_H

#include <deque>
#include <map>
#include <string>
#include <boost/atomic.hpp>
#include <boost/thread/mutex.hpp>
#include <ros/ros.h>
Expand All @@ -45,9 +42,14 @@
#include <std_srvs/SetBool.h>
#include <topic_tools/shape_shifter.h>
#include <rosgraph_msgs/TopicStatistics.h>
#include <rosbag_snapshot_msgs/SnapshotStatus.h>
#include "rosbag/bag.h"
#include "rosbag/macros.h"
#include <rosbag_msgs/SnapshotStatus.h>
#include <rosbag/bag.h>
#include <rosbag/macros.h>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

namespace rosbag_snapshot
{
Expand Down Expand Up @@ -96,7 +98,8 @@ struct ROSBAG_DECL SnapshotterOptions
ros::Duration status_period = ros::Duration(1));

// Add a new topic to the configuration
void addTopic(std::string const& topic, ros::Duration duration_limit = SnapshotterTopicOptions::INHERIT_DURATION_LIMIT,
void addTopic(std::string const& topic,
ros::Duration duration_limit = SnapshotterTopicOptions::INHERIT_DURATION_LIMIT,
int32_t memory_limit = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT);
};

Expand Down Expand Up @@ -134,7 +137,7 @@ class ROSBAG_DECL MessageQueue
boost::shared_ptr<ros::Subscriber> sub_;

public:
MessageQueue(SnapshotterTopicOptions const& options);
explicit MessageQueue(SnapshotterTopicOptions const& options);
// Add a new message to the internal queue if possible, truncating the front of the queue as needed to enforce limits
void push(SnapshotMessage const& msg);
// Removes the message at the front of the queue (oldest) and returns it
Expand Down Expand Up @@ -171,7 +174,7 @@ class ROSBAG_DECL MessageQueue
class ROSBAG_DECL Snapshotter
{
public:
Snapshotter(SnapshotterOptions const& options);
explicit Snapshotter(SnapshotterOptions const& options);
// Sets up callbacks and spins until node is killed
int run();

Expand Down Expand Up @@ -207,7 +210,8 @@ class ROSBAG_DECL Snapshotter
void topicCB(const ros::MessageEvent<topic_tools::ShapeShifter const>& msg_event,
boost::shared_ptr<MessageQueue> queue);
// Service callback, write all of part of the internal buffers to a bag file according to request parameters
bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request& req, rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
bool triggerSnapshotCb(rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
// Service callback, enable or disable recording (storing new messages into queue). Used to pause before writing
bool enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res);
// Set recording_ to false and do nessesary cleaning, CALLER MUST OBTAIN LOCK
Expand All @@ -219,7 +223,8 @@ class ROSBAG_DECL Snapshotter
// Write the parts of message_queue within the time constraints of req to the queue
// If returns false, there was an error opening/writing the bag and an error message was written to res.message
bool writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, std::string const& topic,
rosbag_snapshot_msgs::TriggerSnapshot::Request& req, rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
rosbag_snapshot_msgs::TriggerSnapshot::Request& req,
rosbag_snapshot_msgs::TriggerSnapshot::Response& res);
};

// Configuration for SnapshotterClient
Expand Down
2 changes: 1 addition & 1 deletion rosbag_snapshot/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<name>rosbag_snapshot</name>
<version>0.0.0</version>
<description>The rosbag_snapshot package</description>

<author email="kma1660+rsp@gmail.com">Kevin Allen</author>
<maintainer email="davidvlu@gmail.com">David V. Lu!!</maintainer>

<license>BSD</license>
Expand Down
35 changes: 21 additions & 14 deletions rosbag_snapshot/src/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@
* POSSIBILITY OF SUCH DAMAGE.
********************************************************************/

#include "rosbag_snapshot/snapshotter.h"
#include "rosbag/exceptions.h"
#include <rosbag_snapshot/snapshotter.h>
#include <rosbag/exceptions.h>

#include "boost/program_options.hpp"
#include <boost/program_options.hpp>
#include <string>
#include <sstream>
#include <vector>

namespace po = boost::program_options;

Expand Down Expand Up @@ -66,11 +67,15 @@ bool parseOptions(po::variables_map& vm, int argc, char** argv)
("trigger-write,t", "Write buffer of selected topcis to a bag file")
("pause,p", "Stop buffering new messages until resumed or write is triggered")
("resume,r", "Resume buffering new messages, writing over older messages as needed")
("size,s", po::value<double>()->default_value(-1), "Maximum memory per topic to use in buffering in MB. Default: no limit")
("duration,d", po::value<double>()->default_value(30.0), "Maximum difference between newest and oldest buffered message per topic in seconds. Default: 30")
("output-prefix,o", po::value<std::string>()->default_value(""), "When in trigger write mode, prepend PREFIX to name of writting bag file")
("size,s", po::value<double>()->default_value(-1),
"Maximum memory per topic to use in buffering in MB. Default: no limit")
("duration,d", po::value<double>()->default_value(30.0),
"Maximum difference between newest and oldest buffered message per topic in seconds. Default: 30")
("output-prefix,o", po::value<std::string>()->default_value(""),
"When in trigger write mode, prepend PREFIX to name of writting bag file")
("output-filename,O", po::value<std::string>(), "When in trigger write mode, exact name of written bag file")
("topic", po::value<std::vector<std::string> >(), "Topic to buffer. If triggering write, write only these topics instead of all buffered topics.");
("topic", po::value<std::vector<std::string> >(),
"Topic to buffer. If triggering write, write only these topics instead of all buffered topics.");
// clang-format on
po::positional_options_description p;
p.add("topic", -1);
Expand All @@ -82,13 +87,13 @@ bool parseOptions(po::variables_map& vm, int argc, char** argv)
}
catch (boost::program_options::error const& e)
{
std::cout << "rosbag snapshot: " << e.what() << std::endl;
std::cout << "rosbag_snapshot: " << e.what() << std::endl;
return false;
}

if (vm.count("help"))
{
std::cout << "Usage: rosbag snapshot [options] [topic1 topic2 ...]" << std::endl
std::cout << "Usage: rosrun rosbag_snapshot snapshot [options] [topic1 topic2 ...]" << std::endl
<< std::endl
<< "Buffer recent messages until triggered to write or trigger an already running instance." << std::endl
<< std::endl;
Expand All @@ -103,10 +108,12 @@ bool parseVariablesMap(SnapshotterOptions& opts, po::variables_map const& vm)
if (vm.count("topic"))
{
std::vector<std::string> topics = vm["topic"].as<std::vector<std::string> >();
BOOST_FOREACH (std::string& str, topics)
for (const std::string& str : topics)
{
opts.addTopic(str);
}
}
opts.default_memory_limit_ = int(MB_TO_BYTES * vm["size"].as<double>());
opts.default_memory_limit_ = static_cast<int>(MB_TO_BYTES * vm["size"].as<double>());
opts.default_duration_limit_ = ros::Duration(vm["duration"].as<double>());
return true;
}
Expand All @@ -132,7 +139,7 @@ bool parseVariablesMapClient(SnapshotterClientOptions& opts, po::variables_map c

/* Read configured topics and limits from ROS params
* TODO: use exceptions instead of asserts to follow style conventions
* See snapshot.test in test_rosbag for an example
* See snapshot.test for an example
*/
void appendParamOptions(ros::NodeHandle& nh, SnapshotterOptions& opts)
{
Expand All @@ -142,7 +149,7 @@ void appendParamOptions(ros::NodeHandle& nh, SnapshotterOptions& opts)
// Override program options for default limits if the parameters are set.
double tmp;
if (nh.getParam("default_memory_limit", tmp))
opts.default_memory_limit_ = int(MB_TO_BYTES * tmp);
opts.default_memory_limit_ = static_cast<int>(MB_TO_BYTES * tmp);
if (nh.getParam("default_duration_limit", tmp))
opts.default_duration_limit_ = ros::Duration(tmp);

Expand Down Expand Up @@ -195,7 +202,7 @@ void appendParamOptions(ros::NodeHandle& nh, SnapshotterOptions& opts)
if (mem_limit.getType() == XmlRpcValue::TypeDouble)
{
double mb = mem_limit;
mem = int(MB_TO_BYTES * mb);
mem = static_cast<int>(MB_TO_BYTES * mb);
}
else if (mem_limit.getType() == XmlRpcValue::TypeInt)
{
Expand Down
Loading

0 comments on commit ab2223e

Please sign in to comment.