-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadc_load.py
308 lines (238 loc) · 12.3 KB
/
adc_load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
Author : Shelta Zhao(赵小棠)
Email : xiaotang_zhao@outlook.com
Copyright (C) : NJU DisLab, 2025.
Description : Load & Get regular raw radar data.
"""
import os
import yaml
import sys
import numpy as np
import pandas as pd
import concurrent.futures
from datetime import datetime
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
from handler.param_process import get_radar_params
def get_regular_data(data_path, readObj, frame_idx='all', timestamp=False, save=False, load=False):
    """
    Head Function: Get regular raw radar data.

    Parameters:
        data_path (str): Path to the directory containing binary files.
        readObj (dict): Dictionary containing radar parameters.
        frame_idx (str): Option to load all frames or a specific frame (start from 1). Default: 'all'.
        timestamp (bool): Option to extract timestamp from log file. Default: False.
        save (bool): Option to save the regular data in file. Note: save only when frame_idx is 'all'.
        load (bool): Option to load the regular data from raw data path.

    Returns:
        np.ndarray: Regular raw radar data. Shape: (num_frames, num_samples, num_chirps, num_rx, num_tx)
        int: Extracted timestamp in milliseconds if timestamp is True.
    """
    npy_path = f"{data_path}/regular_raw_data.npy"

    # Load previously saved regular data if required
    if load:
        try:
            regular_data = np.load(npy_path)
            print(f"Regular data has been loaded from {npy_path}.")
        except FileNotFoundError:
            raise FileNotFoundError(f"File not found: {npy_path}")
        except ValueError as e:
            raise ValueError(f"Error loading numpy array from file: {npy_path}. Details: {e}")
        except Exception as e:
            raise RuntimeError(f"An unexpected error occurred while loading {npy_path}. Details: {e}")
    else:
        # Get bin file frames and file handles
        bin_file_frames, file_handles = get_num_frames(data_path, readObj['dataSizeOneFrame'])
        try:
            # Load raw data (all frames, or a single 1-based frame index)
            if frame_idx == 'all':
                time_domain_datas = load_all_frames(bin_file_frames, file_handles, readObj['dataSizeOneFrame'])
            else:
                time_domain_datas = load_one_frame(int(frame_idx), bin_file_frames, file_handles, readObj['dataSizeOneFrame'])
        finally:
            # Fix: close the handles opened by get_num_frames (previously leaked)
            for handle in file_handles:
                try:
                    handle.close()
                except OSError:
                    pass

        # Generate regular data
        regular_data = generate_regular_data(readObj, time_domain_datas)

        # Save the regular data if required (only meaningful for the full data set)
        if save and frame_idx == 'all':
            if os.path.exists(npy_path):
                os.remove(npy_path)
            np.save(npy_path, regular_data)
            print(f"Regular data has been saved to {npy_path}.")

    # Extract timestamp if required (fix: no longer shadows the `timestamp` parameter)
    if timestamp:
        return regular_data, get_timestamps(data_path)
    return regular_data
def get_timestamps(data_path):
    """
    Extract the capture-start timestamp from the log (.csv) file in the given directory.

    Parameters:
        data_path (str): Path to the directory containing log files.

    Returns:
        int: The extracted timestamp in milliseconds.

    Raises:
        FileNotFoundError: If no .csv log file exists in data_path.
        ValueError: If no 'Capture start time' entry is found in the log file.
    """
    # Locate the first .csv log file in the directory
    csv_files = [f for f in os.listdir(data_path) if f.endswith('.csv')]
    if not csv_files:
        # Fix: previously raised an opaque IndexError when no log file was present
        raise FileNotFoundError(f"No .csv log file found in {data_path}.")
    log_file_name = csv_files[0]

    # Read the first column of the log file, skipping the leading header line
    log_file = pd.read_csv(os.path.join(data_path, log_file_name), skiprows=1, usecols=[0], on_bad_lines='skip', index_col=None)

    # Scan rows for an entry like "Capture start time - Mon Jan 06 12:00:00 2025"
    for _, row in log_file.iterrows():
        if row.str.contains('Capture start time').any():
            raw_timestamp = row.str.split(' - ').iloc[0][1]
            parsed = datetime.strptime(raw_timestamp, "%a %b %d %H:%M:%S %Y")
            # Return the timestamp in milliseconds (naive datetime -> local time zone)
            return int(parsed.timestamp() * 1000)

    # Fix: previously fell through to a NameError when the entry was missing
    raise ValueError(f"No 'Capture start time' entry found in {log_file_name}.")
def get_num_frames(data_path, data_size_one_frame):
    """
    Calculate the number of frames per binary file and open the files.

    Parameters:
        data_path (str): Directory containing the .bin files.
        data_size_one_frame (int): Size of one frame in bytes.

    Returns:
        tuple: (bin_file_frames, file_handles)
            - bin_file_frames (list[int]): Number of frames per binary file.
            - file_handles (list): Open ('rb') handles for the binary files.
              Callers are responsible for closing them.

    Raises:
        RuntimeError: If no .bin files exist, a file cannot be opened, or a
            file is smaller than one frame.
    """
    # Get all binary files in the directory
    bin_file_names = [f for f in os.listdir(data_path) if f.endswith('.bin')]
    if len(bin_file_names) == 0:
        raise RuntimeError(f"No binary files found in the {data_path}.")

    bin_file_frames, file_handles = [], []
    try:
        for bin_file_name in bin_file_names:
            bin_file_path = os.path.join(data_path, bin_file_name)
            try:
                file_handle = open(bin_file_path, 'rb')
            except OSError as e:
                raise RuntimeError(f"Error opening or processing file {bin_file_name}: {e}.")
            file_handles.append(file_handle)

            # Whole frames only: any trailing partial frame is ignored
            num_frames = os.path.getsize(bin_file_path) // data_size_one_frame
            if num_frames == 0:
                # Fix: name the offending file instead of the old generic,
                # double-wrapped "RuntimeError inside RuntimeError" message
                raise RuntimeError(f"Not enough data in binary file {bin_file_name}.")
            bin_file_frames.append(num_frames)
    except Exception:
        # Fix: close any handles already opened before propagating (previously leaked)
        for handle in file_handles:
            try:
                handle.close()
            except OSError:
                pass
        raise

    return bin_file_frames, file_handles
def load_one_frame(frame_idx, bin_file_frames, file_handles, data_size_one_frame):
    """
    Load a single frame of raw data from the opened binary files.

    Parameters:
        frame_idx (int): Index of the frame to load; indexing starts from 1.
        bin_file_frames (list): Number of frames contained in each binary file.
        file_handles (list): Open file handles for the binary files.
        data_size_one_frame (int): Size of one frame in bytes.

    Returns:
        np.ndarray: Data for one frame. Shape: (1, raw_data_per_frame).
    """
    # Frame indices are 1-based
    if frame_idx <= 0:
        raise ValueError("Frame index starts from 1.")

    # Walk the per-file frame counts to find which file holds the frame
    target_file, frames_before = None, 0
    for file_pos, frames_in_file in enumerate(bin_file_frames):
        if frame_idx <= frames_before + frames_in_file:
            target_file = file_pos
            break
        frames_before += frames_in_file
    if target_file is None:
        raise ValueError("Frame index out of range for the given binary files.")

    # Seek to the start of the requested frame within that file
    handle = file_handles[target_file]
    handle.seek((frame_idx - frames_before - 1) * data_size_one_frame, os.SEEK_SET)

    try:
        # Read the frame as unsigned 16-bit words and promote to float32
        samples = np.fromfile(handle, dtype=np.uint16, count=data_size_one_frame // 2).astype(np.float32)
        if len(samples) * 2 != data_size_one_frame:
            raise ValueError(f"Read incorrect data length: {len(samples)*2}, expected: {data_size_one_frame}")
        # Re-interpret values >= 2^15 as negative (two's-complement int16 view)
        signed = samples - (samples >= 2**15) * 2**16
        return signed.reshape((1, -1)).astype(np.float32)
    except Exception as e:
        raise IOError(f"Error reading frame data: {e}")
def load_all_frames(bin_file_frames, file_handles, data_size_one_frame):
    """
    Load every frame of raw data from all opened binary files.

    Parameters:
        bin_file_frames (list): Number of frames contained in each binary file.
        file_handles (list): Open file handles for the binary files.
        data_size_one_frame (int): Size of one frame in bytes.

    Returns:
        np.ndarray: All frame data stacked together. Shape: (num_frames, raw_data_per_frame).
    """
    per_file_data = []
    for file_pos, (frame_count, handle) in enumerate(zip(bin_file_frames, file_handles)):
        try:
            # Rewind, then read the file's whole-frame span as unsigned 16-bit words
            handle.seek(0, os.SEEK_SET)
            word_count = frame_count * (data_size_one_frame // 2)
            samples = np.fromfile(handle, dtype=np.uint16, count=word_count).astype(np.float32)
            if len(samples) * 2 != frame_count * data_size_one_frame:
                raise ValueError(f"Incorrect data length in file {file_pos}: {len(samples) * 2}, expected: {frame_count * data_size_one_frame}")
            # Re-interpret values >= 2^15 as negative (two's-complement int16 view)
            signed = samples - (samples >= 2**15) * 2**16
            # One row per frame
            per_file_data.append(signed.reshape((frame_count, -1)).astype(np.float32))
        except Exception as e:
            raise IOError(f"Error reading frames from file {file_pos}: {e}")
    # Combine frames from every binary file, in file order
    return np.vstack(per_file_data)
def generate_regular_data(readObj, time_domain_datas):
    """
    Rearrange flat time-domain samples into regular radar data cubes.

    Parameters:
        readObj (dict): Dictionary containing radar parameters.
        time_domain_datas (np.ndarray): Time domain data. Shape: (num_frames, raw_data_per_frame).

    Returns:
        np.ndarray: Regular data. Shape: (num_frames, num_samples, num_chirps, num_rx, num_tx)
    """
    # Unpack capture layout parameters
    num_frames = time_domain_datas.shape[0]
    num_lane, ch_interleave, iq_swap = readObj['numLane'], readObj['chInterleave'], readObj['iqSwap']
    num_samples, num_chirps, num_tx, num_rx = readObj['numAdcSamplePerChirp'], readObj['numChirpsPerFrame'], readObj['numTxForMIMO'], readObj['numRxForMIMO']

    def _regularize(frame_no):
        # Flat sample stream for this frame
        flat = time_domain_datas[frame_no, :].squeeze()

        # De-interleave the lanes: first num_lane rows carry I, the rest carry Q
        lanes = np.reshape(flat, (num_lane * 2, -1), order='F')
        i_samples = lanes[:num_lane, :].reshape(-1, order='F')
        q_samples = lanes[num_lane:, :].reshape(-1, order='F')

        # Honour the capture's I/Q ordering
        if iq_swap:
            i_samples, q_samples = q_samples, i_samples

        # Combine I/Q into complex samples
        complex_samples = i_samples + 1j * q_samples

        # One row per chirp: (num_chirps, num_samples * num_rx)
        per_chirp = complex_samples.reshape((num_samples * num_rx, num_chirps), order='F').T
        # Channel layout within a chirp depends on the interleave mode
        cube_shape = (num_chirps, num_rx, num_samples) if ch_interleave == 0 else (num_chirps, num_samples, num_rx)
        cube = per_chirp.reshape(cube_shape, order='F').transpose(0, 2, 1)

        # Separate TX channels and order axes as (num_samples, chirps_per_tx, num_rx, num_tx)
        return cube.reshape((num_tx, -1, num_rx, num_samples), order='F').transpose(3, 1, 2, 0)

    # Regularize all frames in parallel (NumPy does the heavy lifting per frame)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        frames = list(executor.map(_regularize, range(num_frames)))
    return np.stack(frames)
if __name__ == "__main__":
    # Read the first capture entry from the data list and resolve its paths
    with open("adc_list.yaml", "r") as file:
        entry = yaml.safe_load(file)[0]
    adc_dir = os.path.join("data/adc_data", f"{entry['prefix']}/{entry['index']}")
    cfg_path = os.path.join("data/radar_config", entry["config"])
    readObj = get_radar_params(cfg_path, entry['radar'])['readObj']

    # Test timestamp extraction
    timestamp = get_timestamps(adc_dir)
    print(f"Timestamp: {timestamp}")

    # Test single-frame loading
    frame_counts, handles = get_num_frames(adc_dir, readObj['dataSizeOneFrame'])
    one_frame = load_one_frame(1, frame_counts, handles, readObj['dataSizeOneFrame'])
    print(type(one_frame))
    print(one_frame.shape)

    # Test regular data generation
    regular = generate_regular_data(readObj, one_frame)
    print(type(regular))
    print(regular.shape)