import os import sys import numpy as np import tqdm from pathlib import Path import json import base64 import argparse import matplotlib import matplotlib.pyplot as plt import matplotlib.cm as cm import io from PIL import Image from mcap.writer import Writer # Add project root and ISOLATE paths project_root = Path(__file__).parent.parent sys.path.append(str(project_root)) sys.path.append(str(project_root / 'scripts' / 'ISOLATE')) try: from scripts.ISOLATE.model_wrapper import ShenronRadarModel except ImportError as e: print(f"Error: Failed to import ShenronRadarModel. Ensure scripts/ISOLATE/model_wrapper.py exists. ({e})") sys.exit(1) # Official Foxglove JSON Schemas FOXGLOVE_POSE_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.Pose", "title": "foxglove.Pose", "type": "object", "properties": { "position": {"type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}}}, "orientation": {"type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}, "w": {"type": "number"}}} } } FOXGLOVE_IMAGE_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.CompressedImage", "title": "foxglove.CompressedImage", "type": "object", "properties": { "timestamp": {"type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}}, "frame_id": {"type": "string"}, "data": {"type": "string", "contentEncoding": "base64"}, "format": {"type": "string"} } } FOXGLOVE_PCL_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.PointCloud", "title": "foxglove.PointCloud", "type": "object", "properties": { "timestamp": {"type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}}, "frame_id": {"type": "string"}, "pose": FOXGLOVE_POSE_SCHEMA, "point_stride": {"type": "integer"}, "fields": { "type": "array", "items": {"type": "object", "properties": {"name": {"type": "string"}, "offset": {"type": "integer"}, "type": {"type": "integer"}}} }, "data": {"type": "string", "contentEncoding": "base64"} } } FOXGLOVE_METRICS_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.Telemetry", "title": "foxglove.Telemetry", "type": "object", "properties": { "timestamp": {"type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}}, "frame_id": {"type": "string"}, "metrics": {"type": "object", "additionalProperties": {"type": "number"}} } } def render_heatmap(data, vmin=None, vmax=None, cmap='viridis'): """Converts a 2D numpy array to a colormapped PNG base64 string.""" if data is None or data.size == 0: return None # Simple log scaling if needed? For now we assume input is power or magnitude # Normalize to 0-1 if vmin is None: vmin = np.min(data) if vmax is None: vmax = np.max(data) if vmax > vmin: norm_data = (data - vmin) / (vmax - vmin) else: norm_data = np.zeros_like(data) # Apply colormap (Updated to use modern matplotlib.colormaps API) color_mapped = matplotlib.colormaps[cmap](norm_data) # [H, W, 4] # Convert to 8-bit RGB rgb = (color_mapped[:, :, :3] * 255).astype(np.uint8) img = Image.fromarray(rgb) buffered = io.BytesIO() img.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode("ascii") def load_frames(folder_path): with open(os.path.join(folder_path, "frames.jsonl")) as f: for line in f: yield json.loads(line) def run_testbench(iter_name): # Setup directories debug_dir = project_root / 'Shenron_debug' logs_dir = debug_dir / 'logs' iter_dir = debug_dir / 'iterations' / iter_name if not logs_dir.exists(): print(f"[ERROR] Required base folder {logs_dir} not found!") return lidar_dir = logs_dir / 'lidar' if not lidar_dir.exists(): print(f"[ERROR] Required base folder {lidar_dir} not found!") return iter_dir.mkdir(parents=True, exist_ok=True) radar_types = ['awrl1432', 'radarbook'] print(f"\n======================================") print(f"SHENRON TESTBENCH ITERATION: {iter_name}") print(f"======================================") # 1. GENERATE SYNTHETIC DATA print("\n[Stage 1]: Processing Physics models...") models = {} for r_type in radar_types: try: print(f" -> Initializing {r_type} engine...") models[r_type] = ShenronRadarModel(radar_type=r_type) (iter_dir / r_type).mkdir(exist_ok=True) # Create Metrology folders met_base = iter_dir / r_type / "metrology" for sub in ["rd", "ra", "cfar"]: (met_base / sub).mkdir(parents=True, exist_ok=True) except Exception as e: print(f" -> [WARNING] Failed to init {r_type}: {e}") lidar_files = sorted(list(lidar_dir.glob("*.npy"))) for lidar_file in tqdm.tqdm(lidar_files, desc=" Simulating Radars", unit="frame"): data = np.load(lidar_file) # Pad to [x, y, z, intensity, cos_inc_angle, obj, tag] if needed if data.shape[1] == 6: padded_data = np.zeros((data.shape[0], 7), dtype=np.float32) padded_data[:, 0:3] = data[:, 0:3] padded_data[:, 4:7] = data[:, 3:6] data = padded_data for r_type, model in models.items(): try: rich_pcd = model.process(data) out_path = iter_dir / r_type / lidar_file.name np.save(out_path, rich_pcd) # --- PHASES 1 & 3: Save Raw Metrology (.npy) --- met = model.get_last_metrology() if met: frame_name = lidar_file.stem # e.g., frame_000200 np.save(iter_dir / r_type / "metrology" / "rd" / f"{frame_name}.npy", met['rd_heatmap']) np.save(iter_dir / r_type / "metrology" / "ra" / f"{frame_name}.npy", met['ra_heatmap']) np.save(iter_dir / r_type / "metrology" / "cfar" / f"{frame_name}.npy", met['threshold_matrix']) # Log Metrics metrics = model.get_signal_metrics() with open(iter_dir / r_type / "metrology" / "metrics.jsonl", "a") as mf: mf.write(json.dumps({"frame": frame_name, **metrics}) + "\n") except Exception as e: print(f"[ERROR] Frame {lidar_file.name} failed for {r_type}: {e}") # 2. GENERATE MCAP print("\n[Stage 2]: Weaving MCAP Comparison Package...") output_mcap = iter_dir / f"{iter_name}.mcap" with open(output_mcap, "wb") as f: writer = Writer(f) writer.start(profile="foxglove") # Register Schemas pose_schema_id = writer.register_schema(name="foxglove.Pose", encoding="jsonschema", data=json.dumps(FOXGLOVE_POSE_SCHEMA).encode()) camera_schema_id = writer.register_schema(name="foxglove.CompressedImage", encoding="jsonschema", data=json.dumps(FOXGLOVE_IMAGE_SCHEMA).encode()) lidar_schema_id = writer.register_schema(name="foxglove.PointCloud", encoding="jsonschema", data=json.dumps(FOXGLOVE_PCL_SCHEMA).encode()) # Register Channels channels = { 'camera': writer.register_channel(topic="/camera", message_encoding="json", schema_id=camera_schema_id), 'camera_tpp': writer.register_channel(topic="/camera_tpp", message_encoding="json", schema_id=camera_schema_id), 'lidar': writer.register_channel(topic="/lidar", message_encoding="json", schema_id=lidar_schema_id), 'native_radar': writer.register_channel(topic="/radar/native", message_encoding="json", schema_id=lidar_schema_id), 'ego_pose': writer.register_channel(topic="/ego_pose", message_encoding="json", schema_id=pose_schema_id) } shenron_channels = {} metrology_channels = {} for r_type in radar_types: shenron_channels[r_type] = writer.register_channel(topic=f"/radar/{r_type}", message_encoding="json", schema_id=lidar_schema_id) # Register Metrology Channels metrology_channels[r_type] = { "rd": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_doppler", message_encoding="json", schema_id=camera_schema_id), "ra": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_azimuth", message_encoding="json", schema_id=camera_schema_id), "cfar": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/cfar_mask", message_encoding="json", schema_id=camera_schema_id), "telemetry": writer.register_channel(topic=f"/radar/{r_type}/metrics", message_encoding="json", schema_id=writer.register_schema(name="foxglove.Telemetry", encoding="jsonschema", data=json.dumps(FOXGLOVE_METRICS_SCHEMA).encode())) } try: frames_gen = load_frames(logs_dir) frames = list(frames_gen) except Exception as e: print(f"[ERROR] Could not load frames.jsonl from {logs_dir}: {e}") return for frame in tqdm.tqdm(frames, desc=" Packaging Frames", unit="frame"): ts_ns = int(frame["timestamp"] * 1e9) ts_sec = ts_ns // 1_000_000_000 ts_nsec = ts_ns % 1_000_000_000 raw_pose = frame["ego_pose"] x, y, z = raw_pose["x"], -raw_pose["y"], raw_pose["z"] yaw_rad = -np.radians(raw_pose.get("yaw", 0)) ego_world_pose = {"position": {"x": x, "y": y, "z": z}, "orientation": {"x": 0.0, "y": 0.0, "z": float(np.sin(yaw_rad/2)), "w": float(np.cos(yaw_rad/2))}} identity_pose = {"position": {"x": 0.0, "y": 0.0, "z": 0.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} # POSE writer.add_message(channels['ego_pose'], log_time=ts_ns, data=json.dumps(ego_world_pose).encode(), publish_time=ts_ns) # CAMERA cam_path = logs_dir / "camera" / frame["camera"] if cam_path.exists(): with open(cam_path, "rb") as img_f: img_bytes = img_f.read() cam_msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": base64.b64encode(img_bytes).decode("ascii")} writer.add_message(channels['camera'], log_time=ts_ns, data=json.dumps(cam_msg).encode(), publish_time=ts_ns) # CAMERA TPP if "camera_tpp" in frame: cam_tpp_path = logs_dir / "camera_tpp" / frame["camera_tpp"] if cam_tpp_path.exists(): with open(cam_tpp_path, "rb") as img_f: img_bytes = img_f.read() cam_msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": base64.b64encode(img_bytes).decode("ascii")} writer.add_message(channels['camera_tpp'], log_time=ts_ns, data=json.dumps(cam_msg).encode(), publish_time=ts_ns) # LIDAR lidar_p = logs_dir / "lidar" / frame["lidar"] if lidar_p.exists(): points = np.load(lidar_p) # Robustness: Handle 6-column (Old) vs 7-column (Modern with Velocity) data if points.shape[1] == 6: # Pad to 7-column [x, y, z, vel, cos, obj, tag] # Note: obj and tag columns [4,5] are actually uint32 bitstreams padded = np.zeros((points.shape[0], 7), dtype=np.float32) padded[:, 0:3] = points[:, 0:3] padded[:, 4] = points[:, 3] # cos (pure float) padded[:, 5] = points[:, 4].view(np.uint32).astype(np.float32) # real object id padded[:, 6] = points[:, 5].view(np.uint32).astype(np.float32) # real semantic tag ros_points = padded else: ros_points = points.copy().astype(np.float32) # For newer 7-col data: [x,y,z,vel,cos,obj,tag] # We still need to fix the obj/tag bits at [5,6] ros_points[:, 5] = ros_points[:, 5].view(np.uint32).astype(np.float32) ros_points[:, 6] = ros_points[:, 6].view(np.uint32).astype(np.float32) ros_points[:, 1] = -ros_points[:, 1] # RHS conversion # MOUNT OFFSET: LiDAR is on the roof (Z=2.5) lidar_pose = {"position": {"x": 0.0, "y": 0.0, "z": 2.5}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} lidar_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": lidar_pose, "point_stride": 28, "fields": [ {"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}, {"name":"cos_inc_angle","offset":16,"type":7}, {"name":"object_id","offset":20,"type":7}, {"name":"semantic_tag","offset":24,"type":7} ], "data": base64.b64encode(ros_points.tobytes()).decode("ascii") } writer.add_message(channels['lidar'], log_time=ts_ns, data=json.dumps(lidar_msg).encode(), publish_time=ts_ns) # NATIVE RADAR radar_p = logs_dir / "radar" / frame["radar"] if radar_p.exists(): r_data = np.load(radar_p) if r_data.size > 0: dist, az, alt, vel = r_data[:, 0], -r_data[:, 1], r_data[:, 2], r_data[:, 3] xr, yr, zr = dist*np.cos(az)*np.cos(alt), dist*np.sin(az)*np.cos(alt), dist*np.sin(alt) radar_points = np.stack([xr, yr, zr, vel], axis=1).astype(np.float32) # MOUNT OFFSET: Native Radar is on the front bumper (X=2.0, Z=1.0) radar_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} radar_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": radar_pose, "point_stride": 16, "fields": [{"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}], "data": base64.b64encode(radar_points.tobytes()).decode("ascii") } writer.add_message(channels['native_radar'], log_time=ts_ns, data=json.dumps(radar_msg).encode(), publish_time=ts_ns) # SHENRON RADARS shenron_fname = f"frame_{int(frame['frame_id']):06d}.npy" for r_type in radar_types: s_path = iter_dir / r_type / shenron_fname if s_path.exists(): s_data = np.load(s_path) if s_data.size > 0: ros_shenron = s_data.copy().astype(np.float32) ros_shenron[:, 1] = -ros_shenron[:, 1] # Negate Y for ROS # MOUNT OFFSET: Shenron Radars use the same pose as native (X=2.0, Z=1.0) shenron_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} shenron_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": shenron_pose, "point_stride": 20, "fields": [{"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}, {"name":"magnitude","offset":16,"type":7}], "data": base64.b64encode(ros_shenron.tobytes()).decode("ascii") } writer.add_message(shenron_channels[r_type], log_time=ts_ns, data=json.dumps(shenron_msg).encode(), publish_time=ts_ns) # --- PHASE 2: Stream Metrology Visuals --- met_folder = iter_dir / r_type / "metrology" rd_p = met_folder / "rd" / shenron_fname ra_p = met_folder / "ra" / shenron_fname cf_p = met_folder / "cfar" / shenron_fname if rd_p.exists(): rd_data = np.load(rd_p) # Flip UD so Range 0 (ego) is at the bottom b64 = render_heatmap(np.log10(np.flipud(rd_data) + 1e-9), cmap='viridis') if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(metrology_channels[r_type]["rd"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) if ra_p.exists(): ra_data = np.load(ra_p) # Flip UD so Range 0 (ego) is at the bottom b64 = render_heatmap(np.log10(np.flipud(ra_data) + 1e-9), cmap='magma') # Magma for top-down contrast if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(metrology_channels[r_type]["ra"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) if cf_p.exists(): cf_data = np.load(cf_p) # Flip UD so Range 0 (ego) is at the bottom b64 = render_heatmap(np.log10(np.flipud(cf_data) + 1e-9), cmap='plasma') # Plasma for threshold mask if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(metrology_channels[r_type]["cfar"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) writer.finish() print(f"\n[SUCCESS] Iteration packaged to: {output_mcap}") print(f"File size: {os.path.getsize(output_mcap)/1024/1024:.2f} MB") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Shenron Physics Iteration Testbench") parser.add_argument("--iter", required=True, help="Name of the current debug iteration (e.g., 01_baseline)") args = parser.parse_args() run_testbench(args.iter)