import os import json import math import datetime from types import SimpleNamespace from concurrent.futures import ThreadPoolExecutor import numpy as np import cv2 class Recorder: def __init__(self, base_path="data", scenario_name="run"): # Folder name: _YYYYMMDD_HHMMSS → easy to identify in file explorer timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") session_name = f"{scenario_name}_{timestamp}" self.base_path = os.path.join(base_path, session_name) self.camera_path = os.path.join(self.base_path, "camera") self.camera_tpp_path = os.path.join(self.base_path, "camera_tpp") self.radar_path = os.path.join(self.base_path, "radar") self.lidar_path = os.path.join(self.base_path, "lidar") self.meta_file = os.path.join(self.base_path, "frames.jsonl") os.makedirs(self.camera_path, exist_ok=True) os.makedirs(self.camera_tpp_path, exist_ok=True) os.makedirs(self.radar_path, exist_ok=True) os.makedirs(self.lidar_path, exist_ok=True) self.frame_id = 0 self.meta_f = open(self.meta_file, "w") # Performance optimization: Background thread pool for image saving # This prevents cv2.imwrite from blocking the main simulation tick self.executor = ThreadPoolExecutor(max_workers=4) print(f"--- Recorder initialized. Saving data to: {self.base_path} ---") def _async_save_image(self, path, img): """Helper to save image in a background thread.""" cv2.imwrite(path, img) # ---------------- SAVE FRAME ---------------- def save(self, cam, cam_tpp, radar, lidar, vehicle, extra_meta: dict = None): self.frame_id += 1 # -------- CAMERA (Dash) -------- img = np.frombuffer(cam.raw_data, dtype=np.uint8) img = img.reshape((cam.height, cam.width, 4))[:, :, :3] cam_file = f"frame_{self.frame_id:06d}.png" cam_path = os.path.join(self.camera_path, cam_file) # Background write self.executor.submit(self._async_save_image, cam_path, img) # -------- CAMERA (TPP) -------- img_tpp = np.frombuffer(cam_tpp.raw_data, dtype=np.uint8) img_tpp = img_tpp.reshape((cam_tpp.height, cam_tpp.width, 4))[:, :, :3] cam_tpp_file = f"frame_{self.frame_id:06d}.png" cam_tpp_path = os.path.join(self.camera_tpp_path, cam_tpp_file) # Background write self.executor.submit(self._async_save_image, cam_tpp_path, img_tpp) # -------- RADAR -------- radar_points = [] for d in radar: radar_points.append([ d.depth, d.azimuth, d.altitude, d.velocity ]) if radar_points: radar_np = np.array(radar_points) else: radar_np = np.empty((0, 4)) radar_file = f"frame_{self.frame_id:06d}.npy" radar_path = os.path.join(self.radar_path, radar_file) np.save(radar_path, radar_np) # -------- LIDAR (Semantic) -------- # Dynamic reshape to handle semantic columns: [x, y, z, cos, obj, tag] lidar_data = np.frombuffer(lidar.raw_data, dtype=np.float32) total_points = len(lidar) if total_points > 0: lidar_data = np.reshape(lidar_data, (total_points, -1)) else: lidar_data = np.empty((0, 6)) # Default to 6 columns for semantic lidar_file = f"frame_{self.frame_id:06d}.npy" lidar_path = os.path.join(self.lidar_path, lidar_file) np.save(lidar_path, lidar_data) # -------- EGO POSE -------- transform = vehicle.get_transform() # -------- GROUND TRUTH ACTORS -------- world = vehicle.get_world() # Track both vehicles and pedestrians actors = list(world.get_actors().filter("vehicle.*")) + \ list(world.get_actors().filter("walker.*")) gt_actors = [] ego_loc = transform.location ego_vel = vehicle.get_velocity() for actor in actors: if actor.id == vehicle.id: continue # skip ego # 1. Physical State t = actor.get_transform() v = actor.get_velocity() a = actor.get_acceleration() # 2. Geometry bb = actor.bounding_box extent = bb.extent # half-size # 3. Relative Metrics (ADAS) rel_metrics = self._calculate_relative_metrics(vehicle, actor) gt_actors.append({ "id": actor.id, "class": self._get_actor_class(actor), "type": actor.type_id, "transform": { "x": t.location.x, "y": t.location.y, "z": t.location.z, "yaw": t.rotation.yaw, "pitch": t.rotation.pitch, "roll": t.rotation.roll }, "velocity": { "vx": v.x, "vy": v.y, "vz": v.z, "speed": math.sqrt(v.x**2 + v.y**2 + v.z**2) }, "acceleration": { "ax": a.x, "ay": a.y, "az": a.z }, "bounding_box": { "l": extent.x * 2, "w": extent.y * 2, "h": extent.z * 2 }, "relative": rel_metrics }) # -------- RECORD -------- record = { "frame_id": self.frame_id, "timestamp": cam.timestamp, "ego_pose": { "x": transform.location.x, "y": transform.location.y, "z": transform.location.z, "yaw": transform.rotation.yaw }, "camera": cam_file, "camera_tpp": cam_tpp_file, "radar": radar_file, "lidar": lidar_file, "ground_truth": gt_actors, } # Merge scenario metadata (e.g. {"scenario": "braking", "brake_frame": 80}) if extra_meta: record.update(extra_meta) self.meta_f.write(json.dumps(record) + "\n") self.meta_f.flush() # safer in case of crash # ---------------- CLEANUP ---------------- def close(self): print(f"[INFO] Finalizing recording... Waiting for background image tasks.") self.executor.shutdown(wait=True) # New: Generate MP4 videos automatically self._generate_videos() self.meta_f.close() def _generate_videos(self): """Stitch captured frames into .mp4 files for easy viewing.""" print(f"[INFO] Generating video previews...", flush=True) # We'll do this for both cameras configs = [ (self.camera_path, "maneuver_dash.mp4"), (self.camera_tpp_path, "maneuver_tpp.mp4") ] for folder, out_name in configs: images = sorted([img for img in os.listdir(folder) if img.endswith(".png")]) if not images: continue first_frame = cv2.imread(os.path.join(folder, images[0])) h, w, _ = first_frame.shape out_path = os.path.join(self.base_path, out_name) # Using mp4v codec for broad compatibility fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(out_path, fourcc, 20.0, (w, h)) for img_name in images: frame = cv2.imread(os.path.join(folder, img_name)) out.write(frame) out.release() print(f" - Video saved: {out_name}", flush=True) # ---------------- HELPERS ---------------- def _get_actor_class(self, actor) -> str: """Categorise actor into broad ADAS classes.""" type_id = actor.type_id if "walker" in type_id: return "pedestrian" if "vehicle" in type_id: if "bicycle" in type_id or "motorcycle" in type_id: return "vru" # Vulnerable Road User return "vehicle" return "unknown" def _calculate_relative_metrics(self, ego, npc) -> dict: """ Calculate relative range, azimuth, and closing velocity. Uses right-handed coordinate projections where necessary. """ e_t = ego.get_transform() n_t = npc.get_transform() e_v = ego.get_velocity() n_v = npc.get_velocity() # 1. Range (Euclidean distance) rel_pos_v = n_t.location - e_t.location rng = rel_pos_v.length() if rng < 0.1: # avoid div by zero return {"range": rng, "azimuth": 0.0, "closing_velocity": 0.0} # 2. Azimuth (Local angle in degrees) # Project relative position into ego's local frame # CARLA forward is X+, Right is Y+ (left-handed) forward = e_t.get_forward_vector() # Simple dot-product for angle (cosine similarity) # We use atan2 for full 360 bearing # rel_pos_v in ego local coordinates rel_pos_local = self._to_local_location(e_t, n_t.location) azimuth = math.degrees(math.atan2(rel_pos_local.y, rel_pos_local.x)) # 3. Closing Velocity (V_c) # Projection of relative velocity onto the unit line-of-sight vector # V_c = -(V_npc - V_ego) . (P_npc - P_ego) / |P_npc - P_ego| rel_vel = n_v - e_v unit_los = rel_pos_v / rng closing_vel = -(rel_vel.x * unit_los.x + rel_vel.y * unit_los.y + rel_vel.z * unit_los.z) return { "range": round(rng, 3), "azimuth": round(azimuth, 3), "closing_velocity": round(closing_vel, 3) } def _to_local_location(self, transform, location): """Helper to convert world location to transform-local location.""" # Simple translation and rotation inverse # CARLA locations also have raw_data but subtraction is easier rel_loc = location - transform.location # Inverse rotation (Yaw only for simplicity of horizontal metrics) yaw_rad = math.radians(transform.rotation.yaw) c, s = math.cos(yaw_rad), math.sin(yaw_rad) # Rotate rel_loc vector by -yaw lx = rel_loc.x * c + rel_loc.y * s ly = -rel_loc.x * s + rel_loc.y * c return SimpleNamespace(x=lx, y=ly, z=rel_loc.z)