import pandas as pd import numpy as np import cv2 import os # 深度相机的内参 depth_intrinsics = { 'width': 640, 'height': 360, 'ppx': 318.72125244140625, 'ppy': 175.43511962890625, 'fx': 326.4121398925781, 'fy': 326.4121398925781, 'model': 'brown_conrady', 'coeffs': [0, 0, 0, 0, 0] } # RGB相机的内参 rgb_intrinsics = { 'width': 640, 'height': 360, 'ppx': 324.22979736328125, 'ppy': 178.41847229003906, 'fx': 322.05242919921875, 'fy': 321.74606323242188, 'model': 'brown_conrady', 'coeffs': [-0.053237937390804291, 0.061944413930177689, -0.000904313754290342, 0.000950822024606168, -0.019032796844840050] } # 从深度相机到RGB相机的外参矩阵 extrinsics = np.array([ [0.999996206217029227, 0.001707809844067704, -0.002161235083288896, -0.059238668531179428], [-0.001709124363593386, 0.999998355509077008, -0.000606525392216498, 0.000298385828500614], [0.002160195699121914, 0.000610216910727095, 0.999997480591757970, -0.000162546217325144], [0, 0, 0, 1] ]) # 输入目录路径 csv_folder = r'D:\D455\Color_vedio_ske' rgb_folder_base = r'D:\D455\Color_image0' depth_folder_base = r'D:\D455\Depth_image0' output_folder = r'D:\D455\Output_dele' # 创建输出文件夹(如果不存在) os.makedirs(output_folder, exist_ok=True) # 预期的列名 column_names = ['frame', 'person_id', 'nose_y', 'nose_x', 'nose_confidence', 'left_eye_y', 'left_eye_x', 'left_eye_confidence', 'right_eye_y', 'right_eye_x', 'right_eye_confidence', 'left_ear_y', 'left_ear_x', 'left_ear_confidence', 'right_ear_y', 'right_ear_x', 'right_ear_confidence', 'left_shoulder_y', 'left_shoulder_x', 'left_shoulder_confidence', 'right_shoulder_y', 'right_shoulder_x', 'right_shoulder_confidence', 'left_elbow_y', 'left_elbow_x', 'left_elbow_confidence', 'right_elbow_y', 'right_elbow_x', 'right_elbow_confidence', 'left_wrist_y', 'left_wrist_x', 'left_wrist_confidence', 'right_wrist_y', 'right_wrist_x', 'right_wrist_confidence', 'left_hip_y', 'left_hip_x', 'left_hip_confidence', 'right_hip_y', 'right_hip_x', 'right_hip_confidence', 'left_knee_y', 'left_knee_x', 'left_knee_confidence', 'right_knee_y', 'right_knee_x', 'right_knee_confidence', 'left_ankle_y', 'left_ankle_x', 'left_ankle_confidence', 'right_ankle_y', 'right_ankle_x', 'right_ankle_confidence'] # 畸变矫正函数 def undistort_image(image, intrinsics): """ 对图像进行去畸变处理 """ camera_matrix = np.array([ [intrinsics['fx'], 0, intrinsics['ppx']], [0, intrinsics['fy'], intrinsics['ppy']], [0, 0, 1] ]) dist_coeffs = np.array(intrinsics['coeffs']) # 畸变参数 # 使用 OpenCV 去畸变 undistorted_image = cv2.undistort(image, camera_matrix, dist_coeffs) return undistorted_image # 从深度相机坐标系到实际物理坐标的转换 def convert_depth_to_phys_coord_using_realsense(x, y, depth, intrinsics): depth /= 1000.0 # 将深度从毫米转换为米 cx, cy = intrinsics['ppx'], intrinsics['ppy'] fx, fy = intrinsics['fx'], intrinsics['fy'] X = (x - cx) * depth / fx Y = (y - cy) * depth / fy Z = depth return [X, Y, Z] # 对齐深度图像到RGB图像 def align_depth_to_color(depth_image, rgb_image, depth_intrinsics, rgb_intrinsics, extrinsics): """ 根据外参矩阵对齐深度图像到RGB图像 """ depth_height, depth_width = depth_image.shape rgb_height, rgb_width, _ = rgb_image.shape aligned_depth_image = np.zeros((rgb_height, rgb_width), dtype=np.float32) for v in range(depth_height): for u in range(depth_width): depth_value = depth_image[v, u] if depth_value == 0: continue x = (u - depth_intrinsics['ppx']) * depth_value / depth_intrinsics['fx'] y = (v - depth_intrinsics['ppy']) * depth_value / depth_intrinsics['fy'] z = depth_value depth_point = np.array([x, y, z, 1.0]) rgb_point = np.dot(extrinsics, depth_point) u_rgb = int(rgb_point[0] * rgb_intrinsics['fx'] / rgb_point[2] + rgb_intrinsics['ppx']) v_rgb = int(rgb_point[1] * rgb_intrinsics['fy'] / rgb_point[2] + rgb_intrinsics['ppy']) if 0 <= u_rgb < rgb_width and 0 <= v_rgb < rgb_height: aligned_depth_image[v_rgb, u_rgb] = depth_value return aligned_depth_image # 处理 CSV 文件夹中的所有文件 for csv_file in os.listdir(csv_folder): if not csv_file.endswith('.csv'): continue csv_path = os.path.join(csv_folder, csv_file) # 读取 CSV 文件并添加列名 pixel_data = pd.read_csv(csv_path, header=None) pixel_data.columns = column_names # 获取文件名(不带扩展名)用于匹配对应的 RGB 和深度图像文件夹 video_name = os.path.splitext(csv_file)[0] # 构造对应的 RGB 和深度图像文件夹路径 depth_folder = os.path.join(depth_folder_base, video_name) rgb_folder = os.path.join(rgb_folder_base, video_name) # 输出文件路径 output_csv_path = os.path.join(output_folder, f'{csv_file}') results = [] for index, row in pixel_data.iterrows(): frame = int(row['frame']) person_id = row['person_id'] depth_image_path = os.path.join(depth_folder, f'{frame}.png') rgb_image_path = os.path.join(rgb_folder, f'{frame}.png') if not os.path.exists(depth_image_path): print(f"深度图像 {depth_image_path} 不存在") continue if not os.path.exists(rgb_image_path): print(f"RGB图像 {rgb_image_path} 不存在") continue depth_image = cv2.imread(depth_image_path, cv2.IMREAD_ANYDEPTH) rgb_image = cv2.imread(rgb_image_path, cv2.IMREAD_COLOR) if depth_image is None: print(f"深度图像 {depth_image_path} 加载失败") continue if rgb_image is None: print(f"RGB图像 {rgb_image_path} 加载失败") continue # **对 RGB 图像和深度图像进行去畸变处理(新增逻辑)** rgb_image = undistort_image(rgb_image, rgb_intrinsics) depth_image = undistort_image(depth_image, depth_intrinsics) # 对齐深度图像到 RGB 图像 aligned_depth_image = align_depth_to_color(depth_image, rgb_image, depth_intrinsics, rgb_intrinsics, extrinsics) coordinates = [frame, person_id] for part in ["nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder", "left_elbow", "right_elbow", "left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle"]: y, x = int(row[f'{part}_y']), int(row[f'{part}_x']) if 0 <= y < aligned_depth_image.shape[0] and 0 <= x < aligned_depth_image.shape[1]: depth_value = aligned_depth_image[y, x] if depth_value > 0: coord = convert_depth_to_phys_coord_using_realsense(x, y, depth_value, rgb_intrinsics) else: coord = [None, None, None] else: coord = [None, None, None] coordinates.extend(coord) results.append(coordinates) columns = ['frame', 'person_id'] + [f'{part}_{axis}' for part in ["nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder", "left_elbow", "right_elbow", "left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle"] for axis in ['x', 'y', 'z']] results_df = pd.DataFrame(results, columns=columns) results_df.to_csv(output_csv_path, index=False) print(f'处理完成: {output_csv_path}')