import os import json import base64 import io import numpy as np from PIL import Image import matplotlib from mcap.writer import Writer # Official Foxglove JSON Schemas FOXGLOVE_POSE_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.Pose", "title": "foxglove.Pose", "type": "object", "properties": { "position": { "type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}} }, "orientation": { "type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}, "w": {"type": "number"}} } } } FOXGLOVE_IMAGE_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.CompressedImage", "title": "foxglove.CompressedImage", "type": "object", "properties": { "timestamp": { "type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}} }, "frame_id": {"type": "string"}, "data": {"type": "string", "contentEncoding": "base64"}, "format": {"type": "string"} } } FOXGLOVE_PCL_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.PointCloud", "title": "foxglove.PointCloud", "type": "object", "properties": { "timestamp": { "type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}} }, "frame_id": {"type": "string"}, "pose": FOXGLOVE_POSE_SCHEMA, "point_stride": {"type": "integer"}, "fields": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "offset": {"type": "integer"}, "type": {"type": "integer"} } } }, "data": {"type": "string", "contentEncoding": "base64"} } } FOXGLOVE_METRICS_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.Telemetry", "title": "foxglove.Telemetry", "type": "object", "properties": { "timestamp": { "type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}} }, "frame_id": {"type": "string"}, "metrics": {"type": "object", "additionalProperties": {"type": "number"}} } } def render_heatmap(data, cmap='viridis', vmin=None, vmax=None): """Convert 2D array to colormapped B64 PNG with guide-compliant normalization.""" # Step 6: Normalization [0, 1] # If vmin/vmax are provided, use fixed scaling to preserve physical intensity. # Otherwise, fall back to relative normalization (relative to current frame). if vmin is not None and vmax is not None: norm = np.clip((data - vmin) / (vmax - vmin), 0, 1) else: d_min, d_max = np.min(data), np.max(data) if d_max > d_min: norm = (data - d_min) / (d_max - d_min) else: norm = np.zeros_like(data) # Step 7: Apply Radar-style Colormap (Blue-style) # Using matplotlib.cm API for consistency with this script's imports rgba = matplotlib.colormaps[cmap](norm) # (H, W, 4) rgb = (rgba[:, :, :3] * 255).astype(np.uint8) img = Image.fromarray(rgb) buffered = io.BytesIO() img.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode("ascii") def postprocess_ra(ra_heatmap, range_axis, smooth_sigma=1.0): """ Refined RA post-processing pipeline for Physical Realism. Restores the natural power decay by removing per-range normalization. """ # 1. Clutter removal (subtract per-range-bin mean to suppress static ground) clutter = np.mean(ra_heatmap, axis=1, keepdims=True) ra = ra_heatmap - (0.8 * clutter) # Subtract context-aware mean ra = np.clip(ra, 1e-9, None) SYSTEM_GAIN_OFFSET = 68.0 ra_db = 10 * np.log10(ra) - SYSTEM_GAIN_OFFSET # 3. Fixed dynamic range clipping (-5 to 45 dB) # This ensures consistent contrast and preserves physical R^-4 decay ra_db = np.clip(ra_db, -5, 45) # 4. Optional Gaussian smoothing to reduce speckle if smooth_sigma > 0: from scipy.ndimage import gaussian_filter ra_db = gaussian_filter(ra_db, sigma=smooth_sigma) return ra_db def scan_convert_ra(ra_heatmap, range_axis, angle_axis, img_size=512): """ Polar-to-Cartesian scan conversion following FIG / Guide logic. Converts RA (Range, Angle) polar data into a 120° Fan-shaped Sector plot. """ max_range = range_axis[-1] theta_min = angle_axis[0] theta_max = angle_axis[-1] # 4. Create Cartesian Grid (X: lateral, Y: forward) # Origin (0,0) will be at bottom-center of the 512x512 image x = np.linspace(-max_range, max_range, img_size) y = np.linspace(max_range, 0, img_size) # Far to Near X, Y = np.meshgrid(x, y) # 4. Convert to Polar Coordinates R_query = np.sqrt(X**2 + Y**2) Theta_query = np.arctan2(X, Y) # 5. Mask Valid Radar FOV (120-degree sector) fov_mask = (Theta_query >= theta_min) & (Theta_query <= theta_max) & (R_query <= max_range) # 5. Map RA Heatmap to Cartesian Grid # Calculate fractional indices r_idx = np.clip(((R_query / max_range) * (ra_heatmap.shape[0] - 1)).astype(int), 0, ra_heatmap.shape[0] - 1) # theta index: Shift by theta_min to align 0..120 range theta_range = theta_max - theta_min theta_idx = np.clip(((Theta_query - theta_min) / theta_range * (ra_heatmap.shape[1] - 1)).astype(int), 0, ra_heatmap.shape[1] - 1) # Project cartesian = np.full((img_size, img_size), np.min(ra_heatmap), dtype=np.float64) cartesian[fov_mask] = ra_heatmap[r_idx[fov_mask], theta_idx[fov_mask]] return cartesian def load_frames(folder_path): with open(os.path.join(folder_path, "frames.jsonl")) as f: for line in f: yield json.loads(line) def convert_folder(folder_path): folder_name = os.path.basename(folder_path) output_path = os.path.join(folder_path, f"{folder_name}.mcap") if os.path.exists(output_path): print(f"\n>>> Skipping folder (MCAP already exists): {folder_name}", flush=True) return print(f"\n>>> Processing folder: {folder_name}", flush=True) print(f"Target MCAP: {output_path}", flush=True) with open(output_path, "wb") as f: writer = Writer(f) writer.start(profile="foxglove") # Register Schemas pose_schema_id = writer.register_schema(name="foxglove.Pose", encoding="jsonschema", data=json.dumps(FOXGLOVE_POSE_SCHEMA).encode()) camera_schema_id = writer.register_schema(name="foxglove.CompressedImage", encoding="jsonschema", data=json.dumps(FOXGLOVE_IMAGE_SCHEMA).encode()) lidar_schema_id = writer.register_schema(name="foxglove.PointCloud", encoding="jsonschema", data=json.dumps(FOXGLOVE_PCL_SCHEMA).encode()) metrics_schema_id = writer.register_schema(name="foxglove.Telemetry", encoding="jsonschema", data=json.dumps(FOXGLOVE_METRICS_SCHEMA).encode()) # Register Channels camera_channel_id = writer.register_channel(topic="/camera", message_encoding="json", schema_id=camera_schema_id) camera_tpp_channel_id = writer.register_channel(topic="/camera_tpp", message_encoding="json", schema_id=camera_schema_id) lidar_channel_id = writer.register_channel(topic="/lidar", message_encoding="json", schema_id=lidar_schema_id) pose_channel_id = writer.register_channel(topic="/ego_pose", message_encoding="json", schema_id=pose_schema_id) radar_channel_id = writer.register_channel(topic="/radar/native", message_encoding="json", schema_id=lidar_schema_id) radar_types = ['awrl1432', 'radarbook'] shenron_channels = {} metrics_channels = {} met_channels = {} cached_axes = {} metrics_lookups = {} for r_type in radar_types: shenron_channels[r_type] = writer.register_channel(topic=f"/radar/{r_type}", message_encoding="json", schema_id=lidar_schema_id) metrics_channels[r_type] = writer.register_channel(topic=f"/radar/{r_type}/metrics", message_encoding="json", schema_id=metrics_schema_id) met_channels[r_type] = { "ra": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_azimuth", message_encoding="json", schema_id=camera_schema_id), "rd": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_doppler", message_encoding="json", schema_id=camera_schema_id), "cfar": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/cfar_mask", message_encoding="json", schema_id=camera_schema_id) } # Pre-load axes for scan conversion if they exist met_dir = os.path.join(folder_path, r_type, "metrology") if os.path.exists(met_dir): r_ax_p = os.path.join(met_dir, "range_axis.npy") a_ax_p = os.path.join(met_dir, "angle_axis.npy") if os.path.exists(r_ax_p) and os.path.exists(a_ax_p): cached_axes[r_type] = { 'range_axis': np.load(r_ax_p), 'angle_axis': np.load(a_ax_p) } print(f" - Loaded physical axes for {r_type} visualization.") # Load Metrics Lookup if available metrics_lookups[r_type] = {} metrics_path = os.path.join(met_dir, "metrics.jsonl") if os.path.exists(metrics_path): with open(metrics_path, "r") as mf: for line in mf: m_data = json.loads(line) metrics_lookups[r_type][m_data["frame"]] = m_data print(f" - Loaded {len(metrics_lookups[r_type])} metrics records for {r_type}.") frame_count = 0 for frame in load_frames(folder_path): ts_ns = int(frame["timestamp"] * 1e9) ts_sec = ts_ns // 1_000_000_000 ts_nsec = ts_ns % 1_000_000_000 raw_pose = frame["ego_pose"] x, y, z = raw_pose["x"], -raw_pose["y"], raw_pose["z"] yaw_rad = -np.radians(raw_pose.get("yaw", 0)) ego_world_pose = { "position": {"x": x, "y": y, "z": z}, "orientation": {"x": 0.0, "y": 0.0, "z": float(np.sin(yaw_rad/2)), "w": float(np.cos(yaw_rad/2))} } identity_pose = {"position": {"x": 0.0, "y": 0.0, "z": 0.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} # CAMERA camera_path = os.path.join(folder_path, "camera", frame["camera"]) if os.path.exists(camera_path): with open(camera_path, "rb") as img_f: img_bytes = img_f.read() cam_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png" if frame["camera"].endswith(".png") else "jpeg", "data": base64.b64encode(img_bytes).decode("ascii") } writer.add_message(camera_channel_id, log_time=ts_ns, data=json.dumps(cam_msg).encode(), publish_time=ts_ns) # CAMERA (TPP) if "camera_tpp" in frame: camera_tpp_path = os.path.join(folder_path, "camera_tpp", frame["camera_tpp"]) if os.path.exists(camera_tpp_path): with open(camera_tpp_path, "rb") as img_f: img_bytes = img_f.read() cam_tpp_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png" if frame["camera_tpp"].endswith(".png") else "jpeg", "data": base64.b64encode(img_bytes).decode("ascii") } writer.add_message(camera_tpp_channel_id, log_time=ts_ns, data=json.dumps(cam_tpp_msg).encode(), publish_time=ts_ns) # EGO POSE writer.add_message(pose_channel_id, log_time=ts_ns, data=json.dumps(ego_world_pose).encode(), publish_time=ts_ns) # LIDAR lidar_path = os.path.join(folder_path, "lidar", frame["lidar"]) if os.path.exists(lidar_path): points = np.load(lidar_path) # Robustness handle 6 vs 7 cols if points.shape[1] == 6: # Pad to [x, y, z, velocity, cos, obj, tag] padded = np.zeros((points.shape[0], 7), dtype=np.float32) padded[:, 0:3] = points[:, 0:3] padded[:, 4] = points[:, 3] # cos padded[:, 5] = points[:, 4].view(np.uint32).astype(np.float32) # obj padded[:, 6] = points[:, 5].view(np.uint32).astype(np.float32) # tag ros_points = padded else: ros_points = points.copy().astype(np.float32) # Correct bits for [x,y,z,vel,cos,obj,tag] ros_points[:, 5] = ros_points[:, 5].view(np.uint32).astype(np.float32) ros_points[:, 6] = ros_points[:, 6].view(np.uint32).astype(np.float32) ros_points[:, 1] = -ros_points[:, 1] # RHS conversion # MOUNT OFFSET: LiDAR is on the roof (Z=2.5) lidar_pose = {"position": {"x": 0.0, "y": 0.0, "z": 2.5}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} lidar_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": lidar_pose, "point_stride": 28, "fields": [ {"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}, {"name":"cos_inc_angle","offset":16,"type":7}, {"name":"object_id","offset":20,"type":7}, {"name":"semantic_tag","offset":24,"type":7} ], "data": base64.b64encode(ros_points.tobytes()).decode("ascii") } writer.add_message(lidar_channel_id, log_time=ts_ns, data=json.dumps(lidar_msg).encode(), publish_time=ts_ns) # RADAR radar_path = os.path.join(folder_path, "radar", frame["radar"]) if os.path.exists(radar_path): r_data = np.load(radar_path) if r_data.size > 0: # r_data = [depth, azimuth, altitude, velocity] # We negate azimuth to convert from CARLA (Right-handed for Y) # note: CARLA is actually LHS (X-fwd, Y-right, Z-up) # ROS is RHS (X-fwd, Y-left, Z-up) -> Negating Y converts it. dist, az, alt, vel = r_data[:, 0], -r_data[:, 1], r_data[:, 2], r_data[:, 3] xr, yr, zr = dist*np.cos(az)*np.cos(alt), dist*np.sin(az)*np.cos(alt), dist*np.sin(alt) # Stack X, Y, Z, and Velocity (4 floats = 16 bytes stride) radar_points = np.stack([xr, yr, zr, vel], axis=1).astype(np.float32) # MOUNT OFFSET: Radar is on the bumper (X=2.0, Z=1.0) radar_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} radar_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": radar_pose, "point_stride": 16, "fields": [ {"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7} ], "data": base64.b64encode(radar_points.tobytes()).decode("ascii") } writer.add_message(radar_channel_id, log_time=ts_ns, data=json.dumps(radar_msg).encode(), publish_time=ts_ns) # SHENRON RADARS shenron_file = f"frame_{int(frame['frame_id']):06d}.npy" frame_name = f"frame_{int(frame['frame_id']):06d}" for r_type in radar_types: shenron_path = os.path.join(folder_path, r_type, shenron_file) if os.path.exists(shenron_path): s_data = np.load(shenron_path) if s_data.size > 0: # s_data = [x, y, z, velocity, magnitude] # ISOLATE coords: X is fwd, Y is right. # ROS: X is fwd, Y is left. ros_shenron = s_data.copy().astype(np.float32) ros_shenron[:, 1] = -ros_shenron[:, 1] # Negate Y for ROS # MOUNT OFFSET: Shenron Radar is on the bumper (X=2.0, Z=1.0) shenron_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} shenron_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": shenron_pose, "point_stride": 20, # 5 floats * 4 bytes "fields": [ {"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}, {"name":"magnitude","offset":16,"type":7} ], "data": base64.b64encode(ros_shenron.tobytes()).decode("ascii") } writer.add_message(shenron_channels[r_type], log_time=ts_ns, data=json.dumps(shenron_msg).encode(), publish_time=ts_ns) met_dir = os.path.join(folder_path, r_type, "metrology") if os.path.exists(met_dir): # RA (Polar Sector BEV) ra_p = os.path.join(met_dir, "ra", f"{frame_name}.npy") if os.path.exists(ra_p) and r_type in cached_axes: ra_data = np.load(ra_p) axes = cached_axes[r_type] ra_processed = postprocess_ra(ra_data, axes['range_axis'], smooth_sigma=0.0) bev_data = scan_convert_ra(ra_processed, axes['range_axis'], axes['angle_axis'], img_size=512) b64 = render_heatmap(bev_data, cmap='jet', vmin=-5, vmax=45) if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(met_channels[r_type]["ra"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) # RD (Log-scaled) rd_p = os.path.join(met_dir, "rd", f"{frame_name}.npy") if os.path.exists(rd_p): rd_data = np.log10(np.load(rd_p) + 1e-9) b64 = render_heatmap(np.flipud(rd_data), cmap='viridis') if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(met_channels[r_type]["rd"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) # CFAR (Mask) cfar_p = os.path.join(met_dir, "cfar", f"{frame_name}.npy") if os.path.exists(cfar_p): b64 = render_heatmap(np.flipud(np.load(cfar_p)), cmap='plasma') if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(met_channels[r_type]["cfar"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) # METRICS (Telemetry) if frame_name in metrics_lookups.get(r_type, {}): m_payload = metrics_lookups[r_type][frame_name].copy() m_payload.pop("frame", None) telemetry_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "metrics": m_payload } writer.add_message(metrics_channels[r_type], log_time=ts_ns, data=json.dumps(telemetry_msg).encode(), publish_time=ts_ns) frame_count += 1 if frame_count % 50 == 0: print(f" Processed {frame_count} frames...", flush=True) writer.finish() print(f" Done! MCAP saved: {output_path} ({os.path.getsize(output_path)/1024/1024:.2f} MB)", flush=True) def main(): PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) root_data = os.path.join(PROJECT_ROOT, "data") if not os.path.exists(root_data): print(f"Error: {root_data} directory not found.") return folders = [os.path.join(root_data, d) for d in os.listdir(root_data) if os.path.isdir(os.path.join(root_data, d))] # Also check if 'root_data' itself contains 'frames.jsonl' (legacy single-folder mode) if os.path.exists(os.path.join(root_data, "frames.jsonl")): convert_folder(root_data) for folder in folders: if os.path.exists(os.path.join(folder, "frames.jsonl")): # Check if MCAP already exists and avoid re-processing if you prefer, # but here we'll process all matching folders. convert_folder(folder) if __name__ == "__main__": main()