"""Shenron physics iteration testbench.

Stage 1 runs the Shenron radar models over the logged LiDAR frames.
Stage 2 packages the logged sensors (camera, LiDAR, native radar, ego pose)
together with the synthetic radar output (point clouds, heatmaps, telemetry,
FOV frustums) into one Foxglove-profile MCAP file per iteration.
"""
import argparse
import base64
import io
import json
import os
import sys
from pathlib import Path

import numpy as np
import tqdm
from PIL import Image
from mcap.writer import Writer

# Add project root and ISOLATE paths
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))
sys.path.append(str(project_root / 'scripts' / 'ISOLATE'))

try:
    from scripts.ISOLATE.model_wrapper import ShenronRadarModel
except ImportError as e:
    print(f"Error: Failed to import ShenronRadarModel. Ensure scripts/ISOLATE/model_wrapper.py exists. ({e})")
    sys.exit(1)

# Must come after the sys.path setup above.
from sim_radar_utils.plots import render_heatmap, FastHeatmapEngine, postprocess_ra, scan_convert_ra

# ---------------------------------------------------------------------------
# Official Foxglove JSON Schemas
# ---------------------------------------------------------------------------
_JSON_SCHEMA_DRAFT = "https://json-schema.org/draft/2020-12/schema"
_NUMBER = {"type": "number"}
_INTEGER = {"type": "integer"}
_STRING = {"type": "string"}
_BASE64_STRING = {"type": "string", "contentEncoding": "base64"}
_VECTOR3_SCHEMA = {"type": "object", "properties": {"x": _NUMBER, "y": _NUMBER, "z": _NUMBER}}
_TIMESTAMP_SCHEMA = {"type": "object", "properties": {"sec": _INTEGER, "nsec": _INTEGER}}

# Defined once; the original file declared this identical schema twice.
FOXGLOVE_POSE_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.Pose",
    "title": "foxglove.Pose",
    "type": "object",
    "properties": {
        "position": _VECTOR3_SCHEMA,
        "orientation": {
            "type": "object",
            "properties": {"x": _NUMBER, "y": _NUMBER, "z": _NUMBER, "w": _NUMBER},
        },
    },
}

FOXGLOVE_SCENE_UPDATE_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.SceneUpdate",
    "title": "foxglove.SceneUpdate",
    "type": "object",
    "properties": {
        "entities": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "id": _STRING,
                    "frame_id": _STRING,
                    "timestamp": _TIMESTAMP_SCHEMA,
                    "lines": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "type": _INTEGER,
                                "pose": FOXGLOVE_POSE_SCHEMA,
                                "points": {"type": "array", "items": _VECTOR3_SCHEMA},
                                "thickness": _NUMBER,
                                "color": {
                                    "type": "object",
                                    "properties": {"r": _NUMBER, "g": _NUMBER, "b": _NUMBER, "a": _NUMBER},
                                },
                            },
                        },
                    },
                },
            },
        },
    },
}

FOXGLOVE_IMAGE_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.CompressedImage",
    "title": "foxglove.CompressedImage",
    "type": "object",
    "properties": {
        "timestamp": _TIMESTAMP_SCHEMA,
        "frame_id": _STRING,
        "data": _BASE64_STRING,
        "format": _STRING,
    },
}

FOXGLOVE_PCL_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.PointCloud",
    "title": "foxglove.PointCloud",
    "type": "object",
    "properties": {
        "timestamp": _TIMESTAMP_SCHEMA,
        "frame_id": _STRING,
        "pose": FOXGLOVE_POSE_SCHEMA,
        "point_stride": _INTEGER,
        "fields": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {"name": _STRING, "offset": _INTEGER, "type": _INTEGER},
            },
        },
        "data": _BASE64_STRING,
    },
}

# Flat numeric fields of the per-radar telemetry message.
_METRIC_FIELDS = (
    "peak_magnitude",
    "avg_noise_floor",
    "peak_snr_db",
    "active_bins",
    "cfar_target_count",
    "farthest_target_m",
    "closest_target_m",
    "mean_absolute_doppler",
    "doppler_variance",
    "dynamic_range_db",
    "ego_vicinity_power",
    "clutter_ratio",
    "signal_to_clutter_ratio_db",
    "azimuth_variance",
)

FOXGLOVE_METRICS_SCHEMA = {
    "type": "object",
    "properties": {
        "timestamp": _TIMESTAMP_SCHEMA,
        "frame_id": _STRING,
        **{name: _NUMBER for name in _METRIC_FIELDS},
    },
}

# ---------------------------------------------------------------------------
# Packaging constants
# ---------------------------------------------------------------------------
RADAR_TYPES = ['awrl1432', 'radarbook', 'ti_cascade']
EGO_FRAME_ID = "ego_vehicle"
FLOAT32_FIELD_TYPE = 7  # numeric type code written into every point-cloud field
FLOAT32_NBYTES = 4

# dB offset subtracted from the log-power RD / CFAR maps
# (original note: "+10dB from gain recalibration").
SYSTEM_GAIN_OFFSET_DB = 78.0
# Range-azimuth plots are constrained to this boundary (metres).
DISPLAY_RANGE_LIMIT_M = 120.0
# Visual frustum length; the range axis itself may extend to the FFT limit.
FRUSTUM_RANGE_M = 150.0

_IDENTITY_QUAT = {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}
# MOUNT OFFSET: LiDAR is on the roof (Z=2.5)
LIDAR_MOUNT_POSE = {"position": {"x": 0.0, "y": 0.0, "z": 2.5}, "orientation": _IDENTITY_QUAT}
# MOUNT OFFSET: native and Shenron radars share the front bumper (X=2.0, Z=1.0)
RADAR_MOUNT_POSE = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": _IDENTITY_QUAT}

LIDAR_FIELDS = ("x", "y", "z", "velocity", "cos_inc_angle", "object_id", "semantic_tag")
NATIVE_RADAR_FIELDS = ("x", "y", "z", "velocity")
SHENRON_FIELDS = ("x", "y", "z", "velocity", "magnitude")

# (azimuth, elevation) half-angles in degrees used to draw the FOV frustum.
DEFAULT_FOV_DEG = (75.0, 15.0)
RADAR_FOV_DEG = {
    "awrl1432": (75.0, 20.0),
    "radarbook": (60.0, 10.0),
}
FRUSTUM_COLORS = {
    "awrl1432": {"r": 1.0, "g": 0.5, "b": 0.0, "a": 1.0},   # Solid Orange
    "radarbook": {"r": 0.0, "g": 1.0, "b": 1.0, "a": 1.0},  # Solid Cyan
    "ti_cascade": {"r": 0.0, "g": 1.0, "b": 0.0, "a": 1.0},  # Solid Green
}
_DEFAULT_FRUSTUM_COLOR = {"r": 1.0, "g": 1.0, "b": 1.0, "a": 1.0}
# LINE_LIST vertex pairs: four beams from the origin, then the far face.
_FRUSTUM_EDGES = ((0, 1), (0, 2), (0, 3), (0, 4), (1, 2), (2, 4), (4, 3), (3, 1))


def load_frames(folder_path):
    """Yield one decoded JSON object per non-blank line of ``frames.jsonl``.

    Args:
        folder_path: Directory containing ``frames.jsonl``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(os.path.join(folder_path, "frames.jsonl"), encoding="utf-8") as f:
        for line in f:
            # A trailing or stray blank line must not abort the whole run.
            if not line.strip():
                continue
            yield json.loads(line)


def _timestamp(ts_ns):
    """Split integer nanoseconds into a ``{"sec", "nsec"}`` dict."""
    return {"sec": ts_ns // 1_000_000_000, "nsec": ts_ns % 1_000_000_000}


def _write_json(writer, channel_id, ts_ns, payload):
    """Serialize ``payload`` as JSON and append it to ``channel_id`` at ``ts_ns``."""
    writer.add_message(channel_id, log_time=ts_ns, data=json.dumps(payload).encode(), publish_time=ts_ns)


def _image_msg(ts_ns, b64_png):
    """Build a CompressedImage-shaped message around base64 PNG text."""
    return {"timestamp": _timestamp(ts_ns), "frame_id": EGO_FRAME_ID, "format": "png", "data": b64_png}


def _point_cloud_msg(ts_ns, pose, field_names, points):
    """Build a PointCloud-shaped message from a float32 ``(N, len(field_names))`` array.

    Every field is declared as a 4-byte float, so the stride is
    ``4 * len(field_names)``; the caller is responsible for ``points`` having
    exactly that many float32 columns.
    """
    return {
        "timestamp": _timestamp(ts_ns),
        "frame_id": EGO_FRAME_ID,
        "pose": pose,
        "point_stride": FLOAT32_NBYTES * len(field_names),
        "fields": [
            {"name": name, "offset": FLOAT32_NBYTES * idx, "type": FLOAT32_FIELD_TYPE}
            for idx, name in enumerate(field_names)
        ],
        "data": base64.b64encode(points.tobytes()).decode("ascii"),
    }


def _write_image_file(writer, channel_id, ts_ns, image_path):
    """Publish the raw bytes of ``image_path`` as an image message, if the file exists."""
    if not image_path.exists():
        return
    with open(image_path, "rb") as img_f:
        encoded = base64.b64encode(img_f.read()).decode("ascii")
    _write_json(writer, channel_id, ts_ns, _image_msg(ts_ns, encoded))


def _lidar_to_ros(points):
    """Return a float32 7-column ``[x, y, z, vel, cos, obj, tag]`` copy with Y negated.

    Handles 6-column (old, no velocity) and 7-column (with velocity) logs.
    The object-id / semantic-tag columns are uint32 bit patterns stored in
    float slots, so they are reinterpreted before being converted to float
    values.
    """
    if points.shape[1] == 6:
        # Pad to 7 columns; the velocity column (index 3) stays zero.
        ros_points = np.zeros((points.shape[0], 7), dtype=np.float32)
        ros_points[:, 0:3] = points[:, 0:3]
        ros_points[:, 4] = points[:, 3]  # cos (pure float)
        ros_points[:, 5] = points[:, 4].view(np.uint32).astype(np.float32)  # real object id
        ros_points[:, 6] = points[:, 5].view(np.uint32).astype(np.float32)  # real semantic tag
    else:
        ros_points = points.astype(np.float32)  # astype already returns a copy
        ros_points[:, 5] = ros_points[:, 5].view(np.uint32).astype(np.float32)
        ros_points[:, 6] = ros_points[:, 6].view(np.uint32).astype(np.float32)
    ros_points[:, 1] = -ros_points[:, 1]  # RHS conversion
    return ros_points


def _native_radar_to_ros(r_data):
    """Convert ``[dist, azimuth, altitude, velocity]`` rows to float32 ``[x, y, z, velocity]``.

    Azimuth is negated for the right-handed output frame.
    """
    dist, az, alt, vel = r_data[:, 0], -r_data[:, 1], r_data[:, 2], r_data[:, 3]
    xr = dist * np.cos(az) * np.cos(alt)
    yr = dist * np.sin(az) * np.cos(alt)
    zr = dist * np.sin(alt)
    return np.stack([xr, yr, zr, vel], axis=1).astype(np.float32)


def _frustum_geometry(r_type):
    """Compute the per-radar FOV frustum once (it does not depend on the frame).

    The far face is planar at X = ``FRUSTUM_RANGE_M``; vertices are computed
    in the left-handed convention (X forward, Y right, Z up) and then Y is
    negated for the right-handed output.

    Returns:
        dict with ``az_deg``, ``el_deg``, ``corners`` (origin + TL, TR, BL, BR),
        ``line_points`` (LINE_LIST vertex pairs) and ``color``.
    """
    az_limit_deg, el_limit_deg = RADAR_FOV_DEG.get(r_type, DEFAULT_FOV_DEG)
    max_r = FRUSTUM_RANGE_M
    half_width = max_r * np.tan(np.radians(az_limit_deg))
    half_height = max_r * np.tan(np.radians(el_limit_deg))
    lhs_vertices = [
        [0.0, 0.0, 0.0],                     # V0: Origin
        [max_r, -half_width, half_height],   # V1: TL (NegAz, PosEl)
        [max_r, half_width, half_height],    # V2: TR (PosAz, PosEl)
        [max_r, -half_width, -half_height],  # V3: BL (NegAz, NegEl)
        [max_r, half_width, -half_height],   # V4: BR (PosAz, NegEl)
    ]
    # RHS Conversion: [X, -Y, Z]
    corners = [{"x": float(v[0]), "y": float(-v[1]), "z": float(v[2])} for v in lhs_vertices]
    return {
        "az_deg": az_limit_deg,
        "el_deg": el_limit_deg,
        "corners": corners,
        "line_points": [corners[i] for edge in _FRUSTUM_EDGES for i in edge],
        "color": FRUSTUM_COLORS.get(r_type, _DEFAULT_FRUSTUM_COLOR),
    }


def _load_telemetry(metrics_path):
    """Read ``metrics.jsonl`` into ``{frame_key: row_without_frame_key}``.

    Returns an empty dict when the file does not exist. Rows lacking a
    ``"frame"`` key are ignored.
    """
    rows = {}
    if not metrics_path.exists():
        return rows
    with open(metrics_path, "r") as mf:
        for line in mf:
            if not line.strip():
                continue
            record = json.loads(line)
            if "frame" in record:
                rows[record["frame"]] = {k: v for k, v in record.items() if k != "frame"}
    return rows


def _load_axes(met_dir):
    """Load the cached range/angle axes, or return ``None`` if either file is missing."""
    range_ax_p = met_dir / "range_axis.npy"
    angle_ax_p = met_dir / "angle_axis.npy"
    if range_ax_p.exists() and angle_ax_p.exists():
        return {'range_axis': np.load(range_ax_p), 'angle_axis': np.load(angle_ax_p)}
    return None


def _build_render_engines(r_type, orchestrator, axes):
    """Create the four reusable heatmap renderers for one radar type.

    Carrier frequency and chirp repetition come from the orchestrator's model
    when it has one for ``r_type``; otherwise 77e9 and 3e-5 are used. The RD
    and CFAR extents use the last range-axis value, or 150 without axes.
    """
    has_model = r_type in orchestrator.models
    f_cfg = orchestrator.models[r_type].radar_obj.f if has_model else 77e9
    chirp_rep_cfg = orchestrator.models[r_type].radar_obj.chirp_rep if has_model else 3e-5
    max_vel_cfg = (3e8 / f_cfg) / (4 * chirp_rep_cfg)
    max_r_cfg = axes['range_axis'][-1] if axes is not None else 150
    limit = DISPLAY_RANGE_LIMIT_M
    label = r_type.upper()
    return {
        'rd': FastHeatmapEngine(
            extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='viridis',
            title=f'{label} Range-Doppler', xlabel='Doppler Velocity [m/s]',
            ylabel='Range [m]', ylim=[0, 120], interpolation='nearest'),
        'cfar': FastHeatmapEngine(
            extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='plasma',
            title=f'{label} CFAR Noise Threshold', xlabel='Doppler Velocity [m/s]',
            ylabel='Range [m]', ylim=[0, 120], interpolation='nearest'),
        'ra_static': FastHeatmapEngine(
            extent=[-limit, limit, 0, limit], cmap='jet', vmin=-5, vmax=55,
            title=f'{label} Range-Azimuth (Absolute)', xlabel='Lateral distance [m]',
            ylabel='Longitudinal distance [m]', aspect='equal'),
        'ra_dyn': FastHeatmapEngine(
            extent=[-limit, limit, 0, limit], cmap='jet',
            title=f'{label} Range-Azimuth (Dynamic)', xlabel='Lateral distance [m]',
            ylabel='Longitudinal distance [m]', aspect='equal'),
    }


def _load_radar_context(r_type, orchestrator, iter_dir):
    """Pre-load everything that is constant across frames for one radar type."""
    met_dir = iter_dir / r_type / "metrology"
    axes = _load_axes(met_dir)
    return {
        "telemetry": _load_telemetry(met_dir / "metrics.jsonl"),
        "axes": axes,
        "engines": _build_render_engines(r_type, orchestrator, axes),
        "frustum": _frustum_geometry(r_type),
    }


def _register_channels(writer, radar_types):
    """Register all schemas and channels.

    Returns:
        ``(channels, shenron_channels, metrology_channels)`` where the last
        two are keyed by radar type.
    """
    def schema(name, definition):
        return writer.register_schema(name=name, encoding="jsonschema", data=json.dumps(definition).encode())

    def channel(topic, schema_id):
        return writer.register_channel(topic=topic, message_encoding="json", schema_id=schema_id)

    pose_id = schema("foxglove.Pose", FOXGLOVE_POSE_SCHEMA)
    image_id = schema("foxglove.CompressedImage", FOXGLOVE_IMAGE_SCHEMA)
    cloud_id = schema("foxglove.PointCloud", FOXGLOVE_PCL_SCHEMA)
    telemetry_id = schema("TelemetryMetrics", FOXGLOVE_METRICS_SCHEMA)
    scene_id = schema("foxglove.SceneUpdate", FOXGLOVE_SCENE_UPDATE_SCHEMA)

    channels = {
        'camera': channel("/camera", image_id),
        'camera_tpp': channel("/camera_tpp", image_id),
        'lidar': channel("/lidar", cloud_id),
        'native_radar': channel("/radar/native", cloud_id),
        'ego_pose': channel("/ego_pose", pose_id),
    }
    shenron_channels = {}
    metrology_channels = {}
    for r_type in radar_types:
        shenron_channels[r_type] = channel(f"/radar/{r_type}", cloud_id)
        metrology_channels[r_type] = {
            "rd": channel(f"/radar/{r_type}/heatmaps/range_doppler", image_id),
            "ra": channel(f"/radar/{r_type}/heatmaps/range_azimuth", image_id),
            "ra_dynamic": channel(f"/radar/{r_type}/heatmaps/range_azimuth_dynamic", image_id),
            "cfar": channel(f"/radar/{r_type}/heatmaps/cfar_mask", image_id),
            "telemetry": channel(f"/radar/{r_type}/metrics", telemetry_id),
            "frustum": channel(f"/radar/{r_type}/fov_frustum", scene_id),
        }
    return channels, shenron_channels, metrology_channels


def _write_logged_sensors(writer, channels, logs_dir, frame, ts_ns):
    """Publish ego pose, cameras, LiDAR and native radar for one logged frame."""
    # POSE: Y and yaw are negated for the right-handed output frame.
    raw_pose = frame["ego_pose"]
    yaw_rad = -np.radians(raw_pose.get("yaw", 0))
    ego_world_pose = {
        "position": {"x": raw_pose["x"], "y": -raw_pose["y"], "z": raw_pose["z"]},
        "orientation": {"x": 0.0, "y": 0.0, "z": float(np.sin(yaw_rad / 2)), "w": float(np.cos(yaw_rad / 2))},
    }
    _write_json(writer, channels['ego_pose'], ts_ns, ego_world_pose)

    # CAMERA
    _write_image_file(writer, channels['camera'], ts_ns, logs_dir / "camera" / frame["camera"])

    # CAMERA TPP (optional per frame)
    if "camera_tpp" in frame:
        _write_image_file(writer, channels['camera_tpp'], ts_ns, logs_dir / "camera_tpp" / frame["camera_tpp"])

    # LIDAR
    lidar_p = logs_dir / "lidar" / frame["lidar"]
    if lidar_p.exists():
        ros_points = _lidar_to_ros(np.load(lidar_p))
        _write_json(writer, channels['lidar'], ts_ns,
                    _point_cloud_msg(ts_ns, LIDAR_MOUNT_POSE, LIDAR_FIELDS, ros_points))

    # NATIVE RADAR
    radar_p = logs_dir / "radar" / frame["radar"]
    if radar_p.exists():
        r_data = np.load(radar_p)
        if r_data.size > 0:
            radar_points = _native_radar_to_ros(r_data)
            _write_json(writer, channels['native_radar'], ts_ns,
                        _point_cloud_msg(ts_ns, RADAR_MOUNT_POSE, NATIVE_RADAR_FIELDS, radar_points))


def _write_heatmaps(writer, met_channels, ctx, met_folder, shenron_fname, ts_ns):
    """Render and publish the RD, RA (static + dynamic) and CFAR heatmaps that exist."""
    engines = ctx["engines"]
    axes = ctx["axes"]

    def publish(channel_key, b64_png):
        # Renderers may return a falsy value; nothing is published then.
        if b64_png:
            _write_json(writer, met_channels[channel_key], ts_ns, _image_msg(ts_ns, b64_png))

    rd_p = met_folder / "rd" / shenron_fname
    if rd_p.exists():
        rd_db = 10 * np.log10(np.clip(np.load(rd_p), 1e-9, None)) - SYSTEM_GAIN_OFFSET_DB
        # Flip UD so Range 0 (ego) is at the bottom.
        publish("rd", engines['rd'].render(np.flipud(rd_db)))

    ra_p = met_folder / "ra" / shenron_fname
    if ra_p.exists():
        ra_data = np.load(ra_p)
        if axes is not None:
            # Smoothing is disabled (sigma 0) as per the earlier focus fix.
            ra_processed = postprocess_ra(ra_data, axes['range_axis'], smooth_sigma=0.0)
            # Polar-sector BEV at 512x512, constrained to the display boundary.
            bev_data = scan_convert_ra(ra_processed, axes['range_axis'], axes['angle_axis'],
                                       img_size=512, max_display_range=DISPLAY_RANGE_LIMIT_M)
            # 1. PRIMARY PLOT: fixed bounds for 1:1 magnitude tracking over time
            publish("ra", engines['ra_static'].render(bev_data))
            # 2. HIGHLIGHT PLOT: dynamic bounds to track the peak signature
            publish("ra_dynamic", engines['ra_dyn'].render(bev_data))
        else:
            # Fallback: rectangular log plot (no axis info available)
            publish("ra", render_heatmap(np.log10(np.flipud(ra_data) + 1e-9), cmap='magma'))

    cf_p = met_folder / "cfar" / shenron_fname
    if cf_p.exists():
        cf_db = 10 * np.log10(np.clip(np.load(cf_p), 1e-9, None)) - SYSTEM_GAIN_OFFSET_DB
        # Flip UD so Range 0 (ego) is at the bottom; colour scaling stays dynamic.
        publish("cfar", engines['cfar'].render(np.flipud(cf_db)))


def _write_frustum(writer, channel_id, r_type, ctx, ts_ns, log_audit):
    """Publish the FOV frustum; only emitted when the radar's axes are cached."""
    axes = ctx["axes"]
    if axes is None:
        return
    frustum = ctx["frustum"]

    if log_audit:
        v1 = frustum["corners"][1]
        tqdm.tqdm.write(f"\n [AUDIT] {r_type.upper()} Frustum Calculation:")
        tqdm.tqdm.write(f" - Spec: Az ±{frustum['az_deg']}°, El ±{frustum['el_deg']}°, Range {FRUSTUM_RANGE_M}m")
        tqdm.tqdm.write(f" - V1 (RHS TL): X={v1['x']:.1f}, Y={v1['y']:.1f}, Z={v1['z']:.1f}")
        tqdm.tqdm.write(f" - range_axis[0, -1]: {axes['range_axis'][0]:.1f}, {axes['range_axis'][-1]:.1f}")
        tqdm.tqdm.write(f" - angle_axis (rad) [0, -1]: {axes['angle_axis'][0]:.3f}, {axes['angle_axis'][-1]:.3f}")

    frustum_msg = {
        "entities": [{
            "id": f"radar_fov_{r_type}",
            "frame_id": EGO_FRAME_ID,
            "timestamp": _timestamp(ts_ns),
            "lines": [{
                "type": 1,  # LINE_LIST
                "pose": RADAR_MOUNT_POSE,
                "points": frustum["line_points"],
                "thickness": 0.5,
                "color": frustum["color"],
            }],
        }]
    }
    _write_json(writer, channel_id, ts_ns, frustum_msg)


def _write_shenron_radar(writer, r_type, ctx, iter_dir, shenron_fname, ts_ns,
                         cloud_channel, met_channels, log_audit):
    """Publish every synthetic output of one radar type for one frame."""
    s_path = iter_dir / r_type / shenron_fname
    if s_path.exists():
        s_data = np.load(s_path)
        if s_data.size > 0:
            ros_shenron = s_data.astype(np.float32)
            ros_shenron[:, 1] = -ros_shenron[:, 1]  # Negate Y for ROS
            _write_json(writer, cloud_channel, ts_ns,
                        _point_cloud_msg(ts_ns, RADAR_MOUNT_POSE, SHENRON_FIELDS, ros_shenron))

    # NOTE(review): the source formatting was lost, so the nesting of the
    # phases below had to be inferred. They run for every radar type whether
    # or not the point-cloud file exists - confirm against the intended layout.

    # --- PHASE 2: Stream Metrology Visuals ---
    _write_heatmaps(writer, met_channels, ctx, iter_dir / r_type / "metrology", shenron_fname, ts_ns)

    # --- PHASE 3: Telemetry Stream ---
    telemetry_row = ctx["telemetry"].get(shenron_fname.replace('.npy', ''))
    if telemetry_row:
        # Kept flat so plot panels can address e.g. /radar/.../metrics.peak_magnitude
        telemetry_msg = {"timestamp": _timestamp(ts_ns), "frame_id": EGO_FRAME_ID, **telemetry_row}
        _write_json(writer, met_channels["telemetry"], ts_ns, telemetry_msg)

    # --- PHASE 4: 3D Hardware FOV Frustum ---
    _write_frustum(writer, met_channels["frustum"], r_type, ctx, ts_ns, log_audit)


def run_testbench(iter_name, max_frames=0):
    """Simulate the radars for one iteration and package everything into an MCAP file.

    Args:
        iter_name: Name of the debug iteration; outputs go to
            ``Shenron_debug/iterations/<iter_name>/``.
        max_frames: Process only the first ``max_frames`` frames; ``0`` (the
            default) or ``None`` processes all of them. This replaces the
            former reliance on a module-global ``args`` namespace, which only
            existed when the file was run as a script.

    Returns:
        None. Problems with the input folders or ``frames.jsonl`` are reported
        on stdout and end the run early; in that case no MCAP file is created.
    """
    # Setup directories
    debug_dir = project_root / 'Shenron_debug'
    logs_dir = debug_dir / 'logs'
    iter_dir = debug_dir / 'iterations' / iter_name
    lidar_dir = logs_dir / 'lidar'

    for required_dir in (logs_dir, lidar_dir):
        if not required_dir.exists():
            print(f"[ERROR] Required base folder {required_dir} not found!")
            return

    iter_dir.mkdir(parents=True, exist_ok=True)
    radar_types = list(RADAR_TYPES)
    limit_frames = bool(max_frames) and max_frames > 0

    print("\n======================================")
    print(f"SHENRON TESTBENCH ITERATION: {iter_name}")
    print("======================================")

    # 1. GENERATE SYNTHETIC DATA
    print("\n[Stage 1]: Processing Physics models...")
    # Imported lazily, as in the original script.
    from scripts.ISOLATE.shenron_orchestrator import ShenronOrchestrator
    orchestrator = ShenronOrchestrator(radar_types=radar_types)
    orchestrator.init_models(iter_dir)

    lidar_files = sorted(lidar_dir.glob("*.npy"))
    if limit_frames:
        print(f" [INFO] Limiting to first {max_frames} frames as requested.")
        lidar_files = lidar_files[:max_frames]

    for lidar_file in tqdm.tqdm(lidar_files, desc=" Simulating Radars", unit="frame"):
        orchestrator.process_frame(lidar_file, iter_dir, save_adc=False)

    # 2. GENERATE MCAP
    print("\n[Stage 2]: Weaving MCAP Comparison Package...")

    # Load the frame index BEFORE creating the output file, so a failure here
    # cannot leave a truncated, unfinished MCAP behind.
    try:
        frames = list(load_frames(logs_dir))
    except (OSError, ValueError) as e:  # ValueError covers json.JSONDecodeError
        print(f"[ERROR] Could not load frames.jsonl from {logs_dir}: {e}")
        return
    if limit_frames:
        frames = frames[:max_frames]

    # Pre-load per-radar telemetry, axes, renderers and frustum geometry once,
    # instead of re-reading / recomputing them for every frame.
    radar_ctx = {r_type: _load_radar_context(r_type, orchestrator, iter_dir) for r_type in radar_types}

    output_mcap = iter_dir / f"{iter_name}.mcap"
    with open(output_mcap, "wb") as f:
        writer = Writer(f)
        writer.start(profile="foxglove")
        try:
            channels, shenron_channels, metrology_channels = _register_channels(writer, radar_types)

            progress = tqdm.tqdm(frames, desc=" Packaging Frames", unit="frame")
            for frame_idx, frame in enumerate(progress):
                ts_ns = int(frame["timestamp"] * 1e9)
                _write_logged_sensors(writer, channels, logs_dir, frame, ts_ns)

                # SHENRON RADARS
                shenron_fname = f"frame_{int(frame['frame_id']):06d}.npy"
                for r_type in radar_types:
                    _write_shenron_radar(
                        writer, r_type, radar_ctx[r_type], iter_dir, shenron_fname, ts_ns,
                        cloud_channel=shenron_channels[r_type],
                        met_channels=metrology_channels[r_type],
                        log_audit=(frame_idx == 0),  # one-time verification log
                    )
        finally:
            # Always finish the writer so that even a partially written file
            # is closed out properly.
            writer.finish()

    print(f"\n[SUCCESS] Iteration packaged to: {output_mcap}")
    print(f"File size: {os.path.getsize(output_mcap)/1024/1024:.2f} MB")


def main():
    """Parse the command line and run one testbench iteration."""
    parser = argparse.ArgumentParser(description="Shenron Physics Iteration Testbench")
    parser.add_argument("--iter", required=True, help="Name of the current debug iteration (e.g., 01_baseline)")
    parser.add_argument("--frames", type=int, default=0, help="Number of frames to process (0 for all)")
    args = parser.parse_args()
    run_testbench(args.iter, max_frames=args.frames)


if __name__ == "__main__":
    main()