-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdvs_data_converter.py
executable file
·86 lines (64 loc) · 3.12 KB
/
dvs_data_converter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/python3
"""
@file dvs_data_converter.py
@brief Script to convert DVS (Dynamic Vision Sensor) data between different formats.
@author Raul Tapia (github.com/raultapia)
"""
import argparse
import os
import rosbag
def format_topic_name(x):
    """
    @brief Turn a topic name into a string that can be embedded in a file name.

    A single leading '/' (if present) is dropped and every remaining '/' is
    replaced by '_'.

    @param x Topic name, e.g. '/dvs/events'.
    @return The flattened name, e.g. 'dvs_events'.
    """
    # Only the first character is stripped; any further slashes become '_'.
    trimmed = x[1:] if x.startswith('/') else x
    return trimmed.replace('/', '_')
def bag2txt(filename, columns, zero_offset, separation=" ", extension="txt"):
    """
    @brief Export the events stored in a ROS bag to plain-text file(s).

    Every topic of type 'dvs_msgs/EventArray' found in the bag is written to
    its own file, one event per line. If the bag contains exactly one such
    topic the output is '<base>.<extension>'; otherwise each topic goes to
    '<base>.<formatted topic name>.<extension>', where <base> is `filename`
    without its extension.

    @param filename Path of the input bag file.
    @param columns Column order: a permutation of the letters 't' (timestamp
           in seconds, 9 decimals), 'x', 'y' and 'p' (polarity as 0/1),
           e.g. 'txyp'.
    @param zero_offset If true, the timestamp of the first event written is
           subtracted from every timestamp. The same offset is shared by all
           topics of the bag.
    @param separation String placed between the columns.
    @param extension Extension of the generated file(s), without the dot.
    @throws ValueError If `columns` is not a permutation of 't', 'x', 'y', 'p'.
    """
    # Fail early instead of silently producing empty output files.
    if sorted(columns) != ['p', 't', 'x', 'y']:
        raise ValueError(
            f"Unsupported column order '{columns}': expected a permutation of 't', 'x', 'y', 'p'."
        )

    # splitext does not assume a fixed-length extension like filename[:-4] did.
    base = os.path.splitext(filename)[0]

    # None means "offset not captured yet"; without zero_offset nothing is subtracted.
    t0 = None if zero_offset else 0.0

    # The context manager guarantees the bag is closed, even on errors.
    with rosbag.Bag(filename) as bag:
        topics = [
            key
            for (key, value) in bag.get_type_and_topic_info()[1].items()
            if value[0] == 'dvs_msgs/EventArray'
        ]
        for tpc in topics:
            if len(topics) == 1:
                output_path = f"{base}.{extension}"
            else:
                output_path = f"{base}.{format_topic_name(tpc)}.{extension}"

            with open(output_path, 'w') as output_file:
                # Let rosbag filter by topic rather than scanning every message here.
                for _, msg, _ in bag.read_messages(topics=[tpc]):
                    for e in msg.events:
                        ts = e.ts.to_sec()
                        if t0 is None:
                            t0 = ts
                        fields = {
                            't': f"{ts - t0:.9f}",
                            'x': f"{e.x}",
                            'y': f"{e.y}",
                            'p': f"{int(e.polarity)}",
                        }
                        output_file.write(separation.join(fields[c] for c in columns) + "\n")
def bag2csv(filename, columns, zero_offset):
    """
    @brief Export the events stored in a ROS bag to CSV file(s).

    Delegates to bag2txt using ',' as column separator and 'csv' as the
    output file extension.

    @param filename Path of the input bag file.
    @param columns Column order, forwarded to bag2txt.
    @param zero_offset Time-offset flag, forwarded to bag2txt.
    """
    bag2txt(filename, columns, zero_offset, separation=",", extension="csv")
def txt2bag(filename=None, columns='txyp', zero_offset=False):  # TODO
    """
    @brief Convert a txt event file to a ROS bag (not implemented yet).

    The parameters mirror the way main() invokes every converter, so that
    requesting this conversion reports a clear error instead of failing
    with a TypeError about unexpected arguments.

    @param filename Path of the input txt file.
    @param columns Column order of the input file.
    @param zero_offset Time-offset flag.
    @throws NotImplementedError Always, until the conversion is written.
    """
    raise NotImplementedError("Conversion from txt to bag is not implemented yet.")
def csv2bag(filename=None, columns='txyp', zero_offset=False):  # TODO
    """
    @brief Convert a csv event file to a ROS bag (not implemented yet).

    The parameters mirror the way main() invokes every converter, so that
    requesting this conversion reports a clear error instead of failing
    with a TypeError about unexpected arguments.

    @param filename Path of the input csv file.
    @param columns Column order of the input file.
    @param zero_offset Time-offset flag.
    @throws NotImplementedError Always, until the conversion is written.
    """
    raise NotImplementedError("Conversion from csv to bag is not implemented yet.")
def txt2csv(filename=None, columns='txyp', zero_offset=False):  # TODO
    """
    @brief Convert a txt event file to csv (not implemented yet).

    The parameters mirror the way main() invokes every converter, so that
    requesting this conversion reports a clear error instead of failing
    with a TypeError about unexpected arguments.

    @param filename Path of the input txt file.
    @param columns Column order of the input file.
    @param zero_offset Time-offset flag.
    @throws NotImplementedError Always, until the conversion is written.
    """
    raise NotImplementedError("Conversion from txt to csv is not implemented yet.")
def csv2txt(filename=None, columns='txyp', zero_offset=False):  # TODO
    """
    @brief Convert a csv event file to txt (not implemented yet).

    The parameters mirror the way main() invokes every converter, so that
    requesting this conversion reports a clear error instead of failing
    with a TypeError about unexpected arguments.

    @param filename Path of the input csv file.
    @param columns Column order of the input file.
    @param zero_offset Time-offset flag.
    @throws NotImplementedError Always, until the conversion is written.
    """
    raise NotImplementedError("Conversion from csv to txt is not implemented yet.")
def check_file(file_path):
    """
    @brief Validate an input path and report its format.

    @param file_path Path of the file to check.
    @return The last three characters of the path ('txt', 'csv' or 'bag').
    @throws FileNotFoundError If the path is not an existing regular file.
    @throws ValueError If the path does not end in '.txt', '.csv' or '.bag'.
    """
    supported_suffixes = ('.txt', '.csv', '.bag')

    # Existence is checked before the extension, so a missing file always
    # reports FileNotFoundError regardless of its name.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"The file '{file_path}' does not exist.")

    if file_path.endswith(supported_suffixes):
        return file_path[-3:]

    raise ValueError(f"The file '{file_path}' is not txt, csv, or bag.")
def main():
    """
    @brief Command-line entry point: parse the arguments and convert each file.

    For every input file the source format is obtained from check_file and
    combined with the requested output format into a function name of the
    form '<src>2<dst>' (e.g. 'bag2txt'), which is looked up among the
    module-level names and called with the file, the column order and the
    zero-offset flag.

    @throws ValueError If no function exists for the requested conversion.
    """
    parser = argparse.ArgumentParser(description="DVS data converter")
    parser.add_argument('files', metavar='files', type=str, nargs='+',
                        help='file(s) to convert')
    parser.add_argument('--output', '-o', type=str, choices=['txt', 'csv', 'bag'],
                        required=True, help='output format')
    parser.add_argument('--columns', '-c', type=str,
                        choices=['txyp', 'xytp', 'ptxy', 'pxyt'], default='txyp',
                        help='column order for txt and csv')
    parser.add_argument('--zero', '-z', action='store_true',
                        help='add time offset to start at t = 0')
    args = parser.parse_args()

    for file in args.files:
        # Converters are resolved by name, e.g. 'bag' + '2' + 'csv' -> bag2csv.
        converter = globals().get(f"{check_file(file)}2{args.output}")
        if converter is None:
            raise ValueError("Conversion not defined.")
        converter(file, args.columns, args.zero)
# Run the converter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()