import os import sys import numpy as np import tqdm from pathlib import Path import json import base64 import argparse import matplotlib import matplotlib.pyplot as plt import matplotlib.cm as cm import io from PIL import Image from mcap.writer import Writer # Add project root and ISOLATE paths project_root = Path(__file__).parent.parent sys.path.append(str(project_root)) sys.path.append(str(project_root / 'scripts' / 'ISOLATE')) try: from scripts.ISOLATE.model_wrapper import ShenronRadarModel except ImportError as e: print(f"Error: Failed to import ShenronRadarModel. Ensure scripts/ISOLATE/model_wrapper.py exists. ({e})") sys.exit(1) # Official Foxglove JSON Schemas FOXGLOVE_POSE_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.Pose", "title": "foxglove.Pose", "type": "object", "properties": { "position": {"type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}}}, "orientation": {"type": "object", "properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}, "w": {"type": "number"}}} } } FOXGLOVE_IMAGE_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.CompressedImage", "title": "foxglove.CompressedImage", "type": "object", "properties": { "timestamp": {"type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}}, "frame_id": {"type": "string"}, "data": {"type": "string", "contentEncoding": "base64"}, "format": {"type": "string"} } } FOXGLOVE_PCL_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.PointCloud", "title": "foxglove.PointCloud", "type": "object", "properties": { "timestamp": {"type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}}, "frame_id": {"type": "string"}, "pose": FOXGLOVE_POSE_SCHEMA, "point_stride": {"type": "integer"}, "fields": { "type": "array", "items": {"type": "object", "properties": {"name": {"type": "string"}, "offset": {"type": "integer"}, "type": {"type": "integer"}}} }, "data": {"type": "string", "contentEncoding": "base64"} } } FOXGLOVE_METRICS_SCHEMA = { "$schema": "https://json-schema.org/draft/2020-12/schema", "$id": "foxglove.Telemetry", "title": "foxglove.Telemetry", "type": "object", "properties": { "timestamp": {"type": "object", "properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}}, "frame_id": {"type": "string"}, "metrics": {"type": "object", "additionalProperties": {"type": "number"}} } } def render_heatmap(data, vmin=None, vmax=None, cmap='viridis'): """Converts a 2D numpy array to a colormapped PNG base64 string.""" if data is None or data.size == 0: return None # Simple log scaling if needed? For now we assume input is power or magnitude # Normalize to 0-1 if vmin is None: vmin = np.min(data) if vmax is None: vmax = np.max(data) if vmax > vmin: norm_data = (data - vmin) / (vmax - vmin) else: norm_data = np.zeros_like(data) # Apply colormap (Updated to use modern matplotlib.colormaps API) color_mapped = matplotlib.colormaps[cmap](norm_data) # [H, W, 4] # Convert to 8-bit RGB rgb = (color_mapped[:, :, :3] * 255).astype(np.uint8) img = Image.fromarray(rgb) buffered = io.BytesIO() img.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode("ascii") from matplotlib.figure import Figure from matplotlib.backends.backend_agg import FigureCanvasAgg class FastHeatmapEngine: """Stateful Matplotlib engine that reuses figure memory to achieve high-speed frame rendering.""" def __init__(self, extent, cmap='jet', vmin=None, vmax=None, title='Range-Azimuth Heatmap', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', xlim=None, ylim=None, aspect='auto', interpolation=None): self.vmin = vmin self.vmax = vmax self.fig = Figure(figsize=(6, 5), dpi=100) self.canvas = FigureCanvasAgg(self.fig) self.ax = self.fig.add_subplot(111) cm_obj = matplotlib.colormaps.get_cmap(cmap).copy() cm_obj.set_bad(color='white') # Dummy matrix to initialize geometry dummy = np.zeros((2, 2)) self.im = self.ax.imshow(dummy, extent=extent, cmap=cm_obj, vmin=vmin, vmax=vmax, origin='upper', aspect=aspect, interpolation=interpolation) if xlim is not None: self.ax.set_xlim(xlim) if ylim is not None: self.ax.set_ylim(ylim) self.ax.set_xlabel(xlabel) self.ax.set_ylabel(ylabel) self.ax.set_title(title) self.ax.grid(color='gray', linestyle='--', linewidth=0.5, alpha=0.7) self.fig.colorbar(self.im, ax=self.ax, label='Magnitude (dB)') self.fig.tight_layout() def render(self, data): self.im.set_data(data) if self.vmin is None and self.vmax is None: v_low, v_high = np.nanmin(data), np.nanmax(data) self.im.set_clim(v_low if not np.isnan(v_low) else 0.0, v_high if not np.isnan(v_high) else 1.0) self.fig.canvas.draw() rgba = np.asarray(self.canvas.buffer_rgba()) img = Image.fromarray(rgba) buf = io.BytesIO() img.save(buf, format='png') return base64.b64encode(buf.getvalue()).decode("ascii") def postprocess_ra(ra_heatmap, range_axis, smooth_sigma=1.0): """ Refined RA post-processing pipeline for Physical Realism. Restores the natural power decay (near objects are brighter) by removing the misleading per-range normalization. Args: ra_heatmap : 2-D ndarray (N_range, N_angle), linear power range_axis : 1-D ndarray, physical range in metres smooth_sigma: float, Gaussian sigma (0 to disable) Returns: 2-D ndarray (N_range, N_angle), processed linear or log units """ # 1. Clutter removal (subtract per-range-bin mean to suppress static ground) # This preserves relative intensity between actual objects clutter = np.mean(ra_heatmap, axis=1, keepdims=True) ra = ra_heatmap - (0.8 * clutter) # Subtract 80% of mean to keep some context ra = np.clip(ra, 1e-9, None) # 2. Physics-based dynamic range compression (Linear -> Log) # Conversion to dB scale with System Gain Calibration (calculated from Iter 28) # This offset maps raw physical power to the diagnostic visual range. SYSTEM_GAIN_OFFSET = 68.0 ra_db = 10 * np.log10(ra) - SYSTEM_GAIN_OFFSET # 3. Fixed dynamic range clipping (-5 to 45 dB) # This ensures consistent contrast and preserves physical R^-4 decay ra_db = np.clip(ra_db, -5, 45) # 4. Optional Gaussian smoothing to reduce speckle if smooth_sigma > 0: from scipy.ndimage import gaussian_filter ra_db = gaussian_filter(ra_db, sigma=smooth_sigma) return ra_db def scan_convert_ra(ra_heatmap, range_axis, angle_axis, img_size=512, max_display_range=None): """ Polar-to-Cartesian scan conversion following FIG / Guide logic. Converts RA (Range, Angle) polar data into a 120° Fan-shaped Sector plot. """ true_max_range = range_axis[-1] max_range = max_display_range if max_display_range is not None else true_max_range theta_min = angle_axis[0] theta_max = angle_axis[-1] # 4. Create Cartesian Grid (X: lateral, Y: forward) # Origin (0,0) will be at bottom-center of the 512x512 image x = np.linspace(-max_range, max_range, img_size) y = np.linspace(max_range, 0, img_size) # Far to Near X, Y = np.meshgrid(x, y) # 4. Convert to Polar Coordinates R_query = np.sqrt(X**2 + Y**2) Theta_query = np.arctan2(X, Y) # 5. Mask Valid Radar FOV (120-degree sector) fov_mask = (Theta_query >= theta_min) & (Theta_query <= theta_max) & (R_query <= max_range) # 5. Map RA Heatmap to Cartesian Grid # Calculate fractional indices based on the true underlying data range r_idx = np.clip(((R_query / true_max_range) * (ra_heatmap.shape[0] - 1)).astype(int), 0, ra_heatmap.shape[0] - 1) # theta index: Shift by theta_min to align 0..120 range theta_range = theta_max - theta_min theta_idx = np.clip(((Theta_query - theta_min) / theta_range * (ra_heatmap.shape[1] - 1)).astype(int), 0, ra_heatmap.shape[1] - 1) # Project cartesian = np.full((img_size, img_size), np.nan, dtype=np.float64) cartesian[fov_mask] = ra_heatmap[r_idx[fov_mask], theta_idx[fov_mask]] return cartesian def load_frames(folder_path): with open(os.path.join(folder_path, "frames.jsonl")) as f: for line in f: yield json.loads(line) def run_testbench(iter_name): # Setup directories debug_dir = project_root / 'Shenron_debug' logs_dir = debug_dir / 'logs' iter_dir = debug_dir / 'iterations' / iter_name if not logs_dir.exists(): print(f"[ERROR] Required base folder {logs_dir} not found!") return lidar_dir = logs_dir / 'lidar' if not lidar_dir.exists(): print(f"[ERROR] Required base folder {lidar_dir} not found!") return iter_dir.mkdir(parents=True, exist_ok=True) radar_types = ['awrl1432', 'radarbook'] print(f"\n======================================") print(f"SHENRON TESTBENCH ITERATION: {iter_name}") print(f"======================================") # 1. GENERATE SYNTHETIC DATA print("\n[Stage 1]: Processing Physics models...") models = {} for r_type in radar_types: try: print(f" -> Initializing {r_type} engine...") models[r_type] = ShenronRadarModel(radar_type=r_type) (iter_dir / r_type).mkdir(exist_ok=True) # Create Metrology folders met_base = iter_dir / r_type / "metrology" for sub in ["rd", "ra", "cfar"]: (met_base / sub).mkdir(parents=True, exist_ok=True) except Exception as e: print(f" -> [WARNING] Failed to init {r_type}: {e}") continue # Save physical axes once per radar type (same for every frame — config-derived) met_base = iter_dir / r_type / "metrology" np.save(met_base / "range_axis.npy", models[r_type].processor.rangeAxis) np.save(met_base / "angle_axis.npy", models[r_type].processor.angleAxis) lidar_files = sorted(list(lidar_dir.glob("*.npy"))) if args.frames and args.frames > 0: print(f" [INFO] Limiting to first {args.frames} frames as requested.") lidar_files = lidar_files[:args.frames] for lidar_file in tqdm.tqdm(lidar_files, desc=" Simulating Radars", unit="frame"): data = np.load(lidar_file) # Pad to [x, y, z, intensity, cos_inc_angle, obj, tag] if needed if data.shape[1] == 6: padded_data = np.zeros((data.shape[0], 7), dtype=np.float32) padded_data[:, 0:3] = data[:, 0:3] padded_data[:, 4:7] = data[:, 3:6] data = padded_data for r_type, model in models.items(): try: rich_pcd = model.process(data) out_path = iter_dir / r_type / lidar_file.name np.save(out_path, rich_pcd) # --- PHASES 1 & 3: Save Raw Metrology (.npy) --- met = model.get_last_metrology() if met: frame_name = lidar_file.stem # e.g., frame_000200 ra_map = met['ra_heatmap'] np.save(iter_dir / r_type / "metrology" / "rd" / f"{frame_name}.npy", met['rd_heatmap']) np.save(iter_dir / r_type / "metrology" / "ra" / f"{frame_name}.npy", ra_map) np.save(iter_dir / r_type / "metrology" / "cfar" / f"{frame_name}.npy", met['threshold_matrix']) # --- SANITY CHECK: Azimuth Variance --- # If RA is working, energy should vary across azimuth bins. # If RA is broken (uniform rings), variance will be near zero. az_std = np.mean(np.std(ra_map, axis=1)) if az_std < 1e-12: tqdm.tqdm.write(f" [⚠️ WARNING] Frame {frame_name} ({r_type}): Azimuth variance is ZERO. Check Phase Preservation.") # Log Metrics metrics = model.get_signal_metrics() with open(iter_dir / r_type / "metrology" / "metrics.jsonl", "a") as mf: mf.write(json.dumps({"frame": frame_name, **metrics}) + "\n") except Exception as e: print(f"[ERROR] Frame {lidar_file.name} failed for {r_type}: {e}") # 2. GENERATE MCAP print("\n[Stage 2]: Weaving MCAP Comparison Package...") output_mcap = iter_dir / f"{iter_name}.mcap" with open(output_mcap, "wb") as f: writer = Writer(f) writer.start(profile="foxglove") # Register Schemas pose_schema_id = writer.register_schema(name="foxglove.Pose", encoding="jsonschema", data=json.dumps(FOXGLOVE_POSE_SCHEMA).encode()) camera_schema_id = writer.register_schema(name="foxglove.CompressedImage", encoding="jsonschema", data=json.dumps(FOXGLOVE_IMAGE_SCHEMA).encode()) lidar_schema_id = writer.register_schema(name="foxglove.PointCloud", encoding="jsonschema", data=json.dumps(FOXGLOVE_PCL_SCHEMA).encode()) # Register Channels channels = { 'camera': writer.register_channel(topic="/camera", message_encoding="json", schema_id=camera_schema_id), 'camera_tpp': writer.register_channel(topic="/camera_tpp", message_encoding="json", schema_id=camera_schema_id), 'lidar': writer.register_channel(topic="/lidar", message_encoding="json", schema_id=lidar_schema_id), 'native_radar': writer.register_channel(topic="/radar/native", message_encoding="json", schema_id=lidar_schema_id), 'ego_pose': writer.register_channel(topic="/ego_pose", message_encoding="json", schema_id=pose_schema_id) } shenron_channels = {} metrology_channels = {} for r_type in radar_types: shenron_channels[r_type] = writer.register_channel(topic=f"/radar/{r_type}", message_encoding="json", schema_id=lidar_schema_id) # Register Metrology Channels metrology_channels[r_type] = { "rd": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_doppler", message_encoding="json", schema_id=camera_schema_id), "ra": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_azimuth", message_encoding="json", schema_id=camera_schema_id), "ra_dynamic": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_azimuth_dynamic", message_encoding="json", schema_id=camera_schema_id), "cfar": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/cfar_mask", message_encoding="json", schema_id=camera_schema_id), "telemetry": writer.register_channel(topic=f"/radar/{r_type}/metrics", message_encoding="json", schema_id=writer.register_schema(name="foxglove.Telemetry", encoding="jsonschema", data=json.dumps(FOXGLOVE_METRICS_SCHEMA).encode())) } try: frames_gen = load_frames(logs_dir) frames = list(frames_gen) if args.frames and args.frames > 0: frames = frames[:args.frames] except Exception as e: print(f"[ERROR] Could not load frames.jsonl from {logs_dir}: {e}") return # Pre-load physical axes & Heatmap Engines cached_axes = {} render_engines = {} for r_type in radar_types: range_ax_p = iter_dir / r_type / "metrology" / "range_axis.npy" angle_ax_p = iter_dir / r_type / "metrology" / "angle_axis.npy" if range_ax_p.exists() and angle_ax_p.exists(): cached_axes[r_type] = { 'range_axis': np.load(range_ax_p), 'angle_axis': np.load(angle_ax_p), } else: cached_axes[r_type] = None # Initialize the stateful Matplotlib renderers for extreme throughput f_cfg = models[r_type].radar_obj.f if r_type in models else 77e9 chirp_rep_cfg = models[r_type].radar_obj.chirp_rep if r_type in models else 3e-5 max_vel_cfg = (3e8 / f_cfg) / (4 * chirp_rep_cfg) max_r_cfg = cached_axes[r_type]['range_axis'][-1] if cached_axes[r_type] else 150 display_limit_cfg = 120.0 render_engines[r_type] = { 'rd': FastHeatmapEngine(extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='viridis', title=f'{r_type.upper()} Range-Doppler', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='bicubic'), 'cfar': FastHeatmapEngine(extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='plasma', title=f'{r_type.upper()} CFAR Noise Threshold', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='bicubic'), 'ra_static': FastHeatmapEngine(extent=[-display_limit_cfg, display_limit_cfg, 0, display_limit_cfg], cmap='jet', vmin=-5, vmax=45, title=f'{r_type.upper()} Range-Azimuth (Absolute)', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', aspect='equal'), 'ra_dyn': FastHeatmapEngine(extent=[-display_limit_cfg, display_limit_cfg, 0, display_limit_cfg], cmap='jet', title=f'{r_type.upper()} Range-Azimuth (Dynamic)', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', aspect='equal') } for frame in tqdm.tqdm(frames, desc=" Packaging Frames", unit="frame"): ts_ns = int(frame["timestamp"] * 1e9) ts_sec = ts_ns // 1_000_000_000 ts_nsec = ts_ns % 1_000_000_000 raw_pose = frame["ego_pose"] x, y, z = raw_pose["x"], -raw_pose["y"], raw_pose["z"] yaw_rad = -np.radians(raw_pose.get("yaw", 0)) ego_world_pose = {"position": {"x": x, "y": y, "z": z}, "orientation": {"x": 0.0, "y": 0.0, "z": float(np.sin(yaw_rad/2)), "w": float(np.cos(yaw_rad/2))}} identity_pose = {"position": {"x": 0.0, "y": 0.0, "z": 0.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} # POSE writer.add_message(channels['ego_pose'], log_time=ts_ns, data=json.dumps(ego_world_pose).encode(), publish_time=ts_ns) # CAMERA cam_path = logs_dir / "camera" / frame["camera"] if cam_path.exists(): with open(cam_path, "rb") as img_f: img_bytes = img_f.read() cam_msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": base64.b64encode(img_bytes).decode("ascii")} writer.add_message(channels['camera'], log_time=ts_ns, data=json.dumps(cam_msg).encode(), publish_time=ts_ns) # CAMERA TPP if "camera_tpp" in frame: cam_tpp_path = logs_dir / "camera_tpp" / frame["camera_tpp"] if cam_tpp_path.exists(): with open(cam_tpp_path, "rb") as img_f: img_bytes = img_f.read() cam_msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": base64.b64encode(img_bytes).decode("ascii")} writer.add_message(channels['camera_tpp'], log_time=ts_ns, data=json.dumps(cam_msg).encode(), publish_time=ts_ns) # LIDAR lidar_p = logs_dir / "lidar" / frame["lidar"] if lidar_p.exists(): points = np.load(lidar_p) # Robustness: Handle 6-column (Old) vs 7-column (Modern with Velocity) data if points.shape[1] == 6: # Pad to 7-column [x, y, z, vel, cos, obj, tag] # Note: obj and tag columns [4,5] are actually uint32 bitstreams padded = np.zeros((points.shape[0], 7), dtype=np.float32) padded[:, 0:3] = points[:, 0:3] padded[:, 4] = points[:, 3] # cos (pure float) padded[:, 5] = points[:, 4].view(np.uint32).astype(np.float32) # real object id padded[:, 6] = points[:, 5].view(np.uint32).astype(np.float32) # real semantic tag ros_points = padded else: ros_points = points.copy().astype(np.float32) # For newer 7-col data: [x,y,z,vel,cos,obj,tag] # We still need to fix the obj/tag bits at [5,6] ros_points[:, 5] = ros_points[:, 5].view(np.uint32).astype(np.float32) ros_points[:, 6] = ros_points[:, 6].view(np.uint32).astype(np.float32) ros_points[:, 1] = -ros_points[:, 1] # RHS conversion # MOUNT OFFSET: LiDAR is on the roof (Z=2.5) lidar_pose = {"position": {"x": 0.0, "y": 0.0, "z": 2.5}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} lidar_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": lidar_pose, "point_stride": 28, "fields": [ {"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}, {"name":"cos_inc_angle","offset":16,"type":7}, {"name":"object_id","offset":20,"type":7}, {"name":"semantic_tag","offset":24,"type":7} ], "data": base64.b64encode(ros_points.tobytes()).decode("ascii") } writer.add_message(channels['lidar'], log_time=ts_ns, data=json.dumps(lidar_msg).encode(), publish_time=ts_ns) # NATIVE RADAR radar_p = logs_dir / "radar" / frame["radar"] if radar_p.exists(): r_data = np.load(radar_p) if r_data.size > 0: dist, az, alt, vel = r_data[:, 0], -r_data[:, 1], r_data[:, 2], r_data[:, 3] xr, yr, zr = dist*np.cos(az)*np.cos(alt), dist*np.sin(az)*np.cos(alt), dist*np.sin(alt) radar_points = np.stack([xr, yr, zr, vel], axis=1).astype(np.float32) # MOUNT OFFSET: Native Radar is on the front bumper (X=2.0, Z=1.0) radar_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} radar_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": radar_pose, "point_stride": 16, "fields": [{"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}], "data": base64.b64encode(radar_points.tobytes()).decode("ascii") } writer.add_message(channels['native_radar'], log_time=ts_ns, data=json.dumps(radar_msg).encode(), publish_time=ts_ns) # SHENRON RADARS shenron_fname = f"frame_{int(frame['frame_id']):06d}.npy" for r_type in radar_types: s_path = iter_dir / r_type / shenron_fname if s_path.exists(): s_data = np.load(s_path) if s_data.size > 0: ros_shenron = s_data.copy().astype(np.float32) ros_shenron[:, 1] = -ros_shenron[:, 1] # Negate Y for ROS # MOUNT OFFSET: Shenron Radars use the same pose as native (X=2.0, Z=1.0) shenron_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}} shenron_msg = { "timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "pose": shenron_pose, "point_stride": 20, "fields": [{"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}, {"name":"velocity","offset":12,"type":7}, {"name":"magnitude","offset":16,"type":7}], "data": base64.b64encode(ros_shenron.tobytes()).decode("ascii") } writer.add_message(shenron_channels[r_type], log_time=ts_ns, data=json.dumps(shenron_msg).encode(), publish_time=ts_ns) # --- PHASE 2: Stream Metrology Visuals --- met_folder = iter_dir / r_type / "metrology" rd_p = met_folder / "rd" / shenron_fname ra_p = met_folder / "ra" / shenron_fname cf_p = met_folder / "cfar" / shenron_fname if rd_p.exists(): rd_data = np.load(rd_p) # Apply log conversion minus identical system gain offset to maintain -5 to 45 scaling # Simple 10*log10 - SYSTEM_GAIN_OFFSET rd_db = 10 * np.log10(np.clip(rd_data, 1e-9, None)) - 68.0 # Flip UD so Range 0 (ego) is at the bottom. Use Cached Renderer. b64 = render_engines[r_type]['rd'].render(np.flipud(rd_db)) if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(metrology_channels[r_type]["rd"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) if ra_p.exists(): ra_data = np.load(ra_p) axes = cached_axes.get(r_type) if axes is not None: # Apply full post-processing chain (log, R² compensation, clutter, normalize, smooth) ra_processed = postprocess_ra(ra_data, axes['range_axis'], smooth_sigma=0.0) # Disabled smoothing as per focus fix # Polar Sector BEV plot — geometrically accurate # Project using 512x512 resolution constrained entirely to the 120m boundary to avoid pixelation display_rng_limit = 120.0 bev_data = scan_convert_ra(ra_processed, axes['range_axis'], axes['angle_axis'], img_size=512, max_display_range=display_rng_limit) # 1. PRIMARY PLOT: Static fixed bounds for 1:1 magnitude tracking over time b64_static = render_engines[r_type]['ra_static'].render(bev_data) if b64_static: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64_static} writer.add_message(metrology_channels[r_type]["ra"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) # 2. HIGHLIGHT PLOT: Dynamic bounds to track the peak signature without external thresholds b64_dynamic = render_engines[r_type]['ra_dyn'].render(bev_data) if b64_dynamic: msg_dyn = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64_dynamic} writer.add_message(metrology_channels[r_type]["ra_dynamic"], log_time=ts_ns, data=json.dumps(msg_dyn).encode(), publish_time=ts_ns) else: # Fallback: rectangular log plot (no axis info available) b64 = render_heatmap(np.log10(np.flipud(ra_data) + 1e-9), cmap='magma') if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(metrology_channels[r_type]["ra"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) if cf_p.exists(): cf_data = np.load(cf_p) # Convert threshold power floor to pure threshold DB mask similar to RD cf_db = 10 * np.log10(np.clip(cf_data, 1e-9, None)) - 68.0 # Flip UD so Range 0 (ego) is at the bottom # Revert to dynamic (None) scaling so threshold logic is easily visible b64 = render_engines[r_type]['cfar'].render(np.flipud(cf_db)) if b64: msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64} writer.add_message(metrology_channels[r_type]["cfar"], log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns) writer.finish() print(f"\n[SUCCESS] Iteration packaged to: {output_mcap}") print(f"File size: {os.path.getsize(output_mcap)/1024/1024:.2f} MB") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Shenron Physics Iteration Testbench") parser.add_argument("--iter", required=True, help="Name of the current debug iteration (e.g., 01_baseline)") parser.add_argument("--frames", type=int, default=0, help="Number of frames to process (0 for all)") args = parser.parse_args() run_testbench(args.iter)