CARLA – C-Shenron based Simulator for sensor data generation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

586 lines
33 KiB

import os
import sys
import numpy as np
import tqdm
from pathlib import Path
import json
import base64
import argparse
import io
from PIL import Image
from mcap.writer import Writer
# Foxglove JSON Schemas (pose + scene update), assembled from small node
# builders. Key insertion order is kept exactly as in the hand-written
# literals because the dicts are serialised with json.dumps().
_JSON_SCHEMA_DRAFT = "https://json-schema.org/draft/2020-12/schema"


def _object_of(properties):
    """Return a JSON-Schema ``object`` node wrapping *properties*."""
    return {"type": "object", "properties": properties}


def _array_of(items):
    """Return a JSON-Schema ``array`` node whose elements match *items*."""
    return {"type": "array", "items": items}


def _fields_of(json_type, *names):
    """Map every name in *names* to a ``{"type": json_type}`` leaf node."""
    return {name: {"type": json_type} for name in names}


FOXGLOVE_POSE_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.Pose",
    "title": "foxglove.Pose",
    **_object_of({
        "position": _object_of(_fields_of("number", "x", "y", "z")),
        "orientation": _object_of(_fields_of("number", "x", "y", "z", "w")),
    }),
}

# One line primitive of a scene entity; embeds the pose schema object itself.
_LINE_PRIMITIVE_NODE = _object_of({
    "type": {"type": "integer"},
    "pose": FOXGLOVE_POSE_SCHEMA,
    "points": _array_of(_object_of(_fields_of("number", "x", "y", "z"))),
    "thickness": {"type": "number"},
    "color": _object_of(_fields_of("number", "r", "g", "b", "a")),
})

# One scene entity: identity, frame, timestamp and its line primitives.
_SCENE_ENTITY_NODE = _object_of({
    "id": {"type": "string"},
    "frame_id": {"type": "string"},
    "timestamp": _object_of(_fields_of("integer", "sec", "nsec")),
    "lines": _array_of(_LINE_PRIMITIVE_NODE),
})

FOXGLOVE_SCENE_UPDATE_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.SceneUpdate",
    "title": "foxglove.SceneUpdate",
    **_object_of({"entities": _array_of(_SCENE_ENTITY_NODE)}),
}
# Add project root and ISOLATE paths so the imports below can be resolved.
# resolve() makes the root absolute first: with a relative __file__ (e.g. a
# script started as `python testbench.py` on older interpreters) a plain
# `.parent.parent` would collapse to "." instead of the real parent directory.
project_root = Path(__file__).resolve().parent.parent
sys.path.append(str(project_root))
sys.path.append(str(project_root / 'scripts' / 'ISOLATE'))
try:
    from scripts.ISOLATE.model_wrapper import ShenronRadarModel
except ImportError as e:
    # The testbench cannot do anything without the radar model: report the
    # problem on stderr (it is a diagnostic, not program output) and exit
    # with a non-zero status.
    print(
        f"Error: Failed to import ShenronRadarModel. Ensure scripts/ISOLATE/model_wrapper.py exists. ({e})",
        file=sys.stderr,
    )
    sys.exit(1)
# Foxglove JSON Schemas for the image, point-cloud and telemetry channels,
# assembled from small node builders. Key insertion order matches the
# hand-written literals because the dicts are serialised with json.dumps().
_JSON_SCHEMA_DRAFT = "https://json-schema.org/draft/2020-12/schema"


def _object_of(properties):
    """Return a JSON-Schema ``object`` node wrapping *properties*."""
    return {"type": "object", "properties": properties}


def _array_of(items):
    """Return a JSON-Schema ``array`` node whose elements match *items*."""
    return {"type": "array", "items": items}


def _fields_of(json_type, *names):
    """Map every name in *names* to a ``{"type": json_type}`` leaf node."""
    return {name: {"type": json_type} for name in names}


def _timestamp_node():
    """Return a fresh ``{sec, nsec}`` integer timestamp node."""
    return _object_of(_fields_of("integer", "sec", "nsec"))


def _base64_node():
    """Return a fresh string node flagged as base64-encoded content."""
    return {"type": "string", "contentEncoding": "base64"}


# NOTE: this re-declares FOXGLOVE_POSE_SCHEMA with the same content as the
# declaration near the top of the file; the point-cloud schema below embeds
# this (re-bound) object.
FOXGLOVE_POSE_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.Pose",
    "title": "foxglove.Pose",
    **_object_of({
        "position": _object_of(_fields_of("number", "x", "y", "z")),
        "orientation": _object_of(_fields_of("number", "x", "y", "z", "w")),
    }),
}

FOXGLOVE_IMAGE_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.CompressedImage",
    "title": "foxglove.CompressedImage",
    **_object_of({
        "timestamp": _timestamp_node(),
        "frame_id": {"type": "string"},
        "data": _base64_node(),
        "format": {"type": "string"},
    }),
}

FOXGLOVE_PCL_SCHEMA = {
    "$schema": _JSON_SCHEMA_DRAFT,
    "$id": "foxglove.PointCloud",
    "title": "foxglove.PointCloud",
    **_object_of({
        "timestamp": _timestamp_node(),
        "frame_id": {"type": "string"},
        "pose": FOXGLOVE_POSE_SCHEMA,
        "point_stride": {"type": "integer"},
        "fields": _array_of(_object_of({
            "name": {"type": "string"},
            "offset": {"type": "integer"},
            "type": {"type": "integer"},
        })),
        "data": _base64_node(),
    }),
}

# Flat telemetry record: timestamp + frame_id followed by numeric metrics.
# Unlike the schemas above it carries no "$schema"/"$id"/"title" keys.
FOXGLOVE_METRICS_SCHEMA = _object_of({
    "timestamp": _timestamp_node(),
    "frame_id": {"type": "string"},
    **_fields_of(
        "number",
        "peak_magnitude",
        "avg_noise_floor",
        "peak_snr_db",
        "active_bins",
        "cfar_target_count",
        "farthest_target_m",
        "closest_target_m",
        "mean_absolute_doppler",
        "doppler_variance",
        "dynamic_range_db",
        "ego_vicinity_power",
        "clutter_ratio",
        "signal_to_clutter_ratio_db",
        "azimuth_variance",
    ),
})
from sim_radar_utils.plots import render_heatmap, FastHeatmapEngine, postprocess_ra, scan_convert_ra
def load_frames(folder_path):
    """Lazily yield one decoded JSON value per record of ``frames.jsonl``.

    Args:
        folder_path: Directory (``str`` or path-like) containing
            ``frames.jsonl``.

    Yields:
        The object decoded from each non-blank line, in file order.

    Raises:
        OSError: If ``frames.jsonl`` cannot be opened (raised on first
            iteration, because this is a generator).
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(os.path.join(folder_path, "frames.jsonl"), encoding="utf-8") as f:
        for line in f:
            # A blank or whitespace-only line (e.g. a stray trailing newline)
            # is not a record; json.loads() would raise on it.
            if not line.strip():
                continue
            yield json.loads(line)
# frame_id stamped on every message written to the MCAP.
_EGO_FRAME_ID = "ego_vehicle"

# Field-of-view half angles per radar type, in degrees: (azimuth, elevation).
_FOV_HALF_ANGLES_DEG = {
    "awrl1432": (75.0, 20.0),
    "radarbook": (60.0, 10.0),
}
_DEFAULT_FOV_HALF_ANGLES_DEG = (75.0, 15.0)

# Frustum line colours per radar type.
_FRUSTUM_COLORS = {
    "awrl1432": {"r": 1.0, "g": 0.5, "b": 0.0, "a": 1.0},  # Solid Orange
    "radarbook": {"r": 0.0, "g": 1.0, "b": 1.0, "a": 1.0},  # Solid Cyan
}
_DEFAULT_FRUSTUM_COLOR = {"r": 1.0, "g": 1.0, "b": 1.0, "a": 1.0}

# foxglove.LinePrimitive "type" enum: 0 = LINE_STRIP, 1 = LINE_LOOP,
# 2 = LINE_LIST. The frustum is built from independent point pairs, so it
# must be sent as LINE_LIST (sending 1 would draw a closed loop instead).
_LINE_LIST = 2


def _resolve_frame_limit(max_frames):
    """Return how many frames to process; 0 means "no limit".

    An explicit *max_frames* wins. When it is ``None`` the value falls back to
    the module-level ``args.frames`` set by the CLI entry point, if present,
    so ``run_testbench(name)`` keeps its CLI behaviour while also being
    callable after a plain import (no NameError on a missing ``args``).
    """
    if max_frames is None:
        cli_args = globals().get("args")
        max_frames = getattr(cli_args, "frames", 0)
    return max_frames if max_frames and max_frames > 0 else 0


def _mount_pose(x, y, z):
    """Return a pose dict at (x, y, z) with identity orientation."""
    return {
        "position": {"x": x, "y": y, "z": z},
        "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
    }


def _publish_json(writer, channel_id, ts_ns, payload):
    """Serialise *payload* as JSON and write it with log/publish time *ts_ns*."""
    writer.add_message(channel_id, log_time=ts_ns, data=json.dumps(payload).encode(), publish_time=ts_ns)


def _png_msg(stamp, b64_png):
    """Build a compressed-image message around base64-encoded PNG data."""
    return {"timestamp": stamp, "frame_id": _EGO_FRAME_ID, "format": "png", "data": b64_png}


def _point_cloud_msg(stamp, pose, field_names, points):
    """Build a point-cloud message from a float32 array.

    Every field is declared with numeric type 7 and a 4-byte slot, so the
    stride is ``4 * len(field_names)``. *points* is expected to be a float32
    array with exactly ``len(field_names)`` columns.
    """
    return {
        "timestamp": stamp,
        "frame_id": _EGO_FRAME_ID,
        "pose": pose,
        "point_stride": 4 * len(field_names),
        "fields": [{"name": name, "offset": 4 * idx, "type": 7} for idx, name in enumerate(field_names)],
        "data": base64.b64encode(points.tobytes()).decode("ascii"),
    }


def _simulate_radars(iter_dir, lidar_dir, radar_types, frame_limit):
    """Stage 1: run every radar model over the logged LiDAR frames.

    Writes, below ``iter_dir/<radar_type>/``: one point cloud ``.npy`` per
    frame, ``metrology/{rd,ra,cfar}/<frame>.npy``, ``metrology/metrics.jsonl``
    (appended) and the range/angle axes.

    Returns:
        dict mapping radar type -> model, for the types that initialised.
    """
    models = {}
    for r_type in radar_types:
        try:
            print(f" -> Initializing {r_type} engine...")
            models[r_type] = ShenronRadarModel(radar_type=r_type)
            (iter_dir / r_type).mkdir(exist_ok=True)
            # Create Metrology folders
            met_base = iter_dir / r_type / "metrology"
            for sub in ["rd", "ra", "cfar"]:
                (met_base / sub).mkdir(parents=True, exist_ok=True)
        except Exception as e:
            # Best effort: a radar type that cannot be set up is skipped.
            print(f" -> [WARNING] Failed to init {r_type}: {e}")
            continue
        # Save physical axes once per radar type (same for every frame — config-derived)
        met_base = iter_dir / r_type / "metrology"
        np.save(met_base / "range_axis.npy", models[r_type].processor.rangeAxis)
        np.save(met_base / "angle_axis.npy", models[r_type].processor.angleAxis)

    lidar_files = sorted(lidar_dir.glob("*.npy"))
    if frame_limit:
        print(f" [INFO] Limiting to first {frame_limit} frames as requested.")
        lidar_files = lidar_files[:frame_limit]

    for lidar_file in tqdm.tqdm(lidar_files, desc=" Simulating Radars", unit="frame"):
        data = np.load(lidar_file)
        # Pad to [x, y, z, intensity, cos_inc_angle, obj, tag] if needed
        if data.shape[1] == 6:
            padded_data = np.zeros((data.shape[0], 7), dtype=np.float32)
            padded_data[:, 0:3] = data[:, 0:3]
            padded_data[:, 4:7] = data[:, 3:6]
            data = padded_data

        for r_type, model in models.items():
            try:
                rich_pcd = model.process(data)
                np.save(iter_dir / r_type / lidar_file.name, rich_pcd)

                # Save raw metrology (.npy) for this frame, when available.
                met = model.get_last_metrology()
                if met:
                    frame_name = lidar_file.stem  # e.g., frame_000200
                    met_dir = iter_dir / r_type / "metrology"
                    ra_map = met['ra_heatmap']
                    np.save(met_dir / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
                    np.save(met_dir / "ra" / f"{frame_name}.npy", ra_map)
                    np.save(met_dir / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])

                    # Sanity check: energy should vary across azimuth bins;
                    # a near-zero spread along axis 1 indicates a broken RA map.
                    az_std = np.mean(np.std(ra_map, axis=1))
                    if az_std < 1e-12:
                        tqdm.tqdm.write(f" [⚠️ WARNING] Frame {frame_name} ({r_type}): Azimuth variance is ZERO. Check Phase Preservation.")

                    # Log Metrics
                    metrics = model.get_signal_metrics()
                    with open(met_dir / "metrics.jsonl", "a") as mf:
                        mf.write(json.dumps({"frame": frame_name, **metrics}) + "\n")
            except Exception as e:
                # Keep going: one bad frame must not abort the whole run.
                # tqdm.write keeps the progress bar intact.
                tqdm.tqdm.write(f"[ERROR] Frame {lidar_file.name} failed for {r_type}: {e}")
    return models


def _load_metrology_context(iter_dir, radar_types, models):
    """Pre-load per-radar telemetry, axes and heatmap renderers.

    Done once up front so the per-frame loop does no repeated disk reads.

    Returns:
        ``(telemetry_cache, cached_axes, render_engines)``, each keyed by
        radar type. ``cached_axes[r_type]`` is ``None`` when the axis files
        are missing.
    """
    telemetry_cache = {}
    cached_axes = {}
    render_engines = {}
    for r_type in radar_types:
        met_dir = iter_dir / r_type / "metrology"

        telemetry_cache[r_type] = {}
        metrics_path = met_dir / "metrics.jsonl"
        if metrics_path.exists():
            with open(metrics_path, "r") as mf:
                for line in mf:
                    ld = json.loads(line)
                    if "frame" in ld:
                        # A later record for the same frame replaces an earlier one.
                        telemetry_cache[r_type][ld["frame"]] = {k: v for k, v in ld.items() if k != "frame"}

        range_ax_p = met_dir / "range_axis.npy"
        angle_ax_p = met_dir / "angle_axis.npy"
        if range_ax_p.exists() and angle_ax_p.exists():
            cached_axes[r_type] = {
                'range_axis': np.load(range_ax_p),
                'angle_axis': np.load(angle_ax_p),
            }
        else:
            cached_axes[r_type] = None

        # Stateful renderers, created once and reused for every frame.
        # Fallback constants are used when the model failed to initialise.
        f_cfg = models[r_type].radar_obj.f if r_type in models else 77e9
        chirp_rep_cfg = models[r_type].radar_obj.chirp_rep if r_type in models else 3e-5
        max_vel_cfg = (3e8 / f_cfg) / (4 * chirp_rep_cfg)
        max_r_cfg = cached_axes[r_type]['range_axis'][-1] if cached_axes[r_type] else 150
        display_limit_cfg = 120.0
        doppler_extent = [-max_vel_cfg, max_vel_cfg, 0, max_r_cfg]
        bev_extent = [-display_limit_cfg, display_limit_cfg, 0, display_limit_cfg]
        render_engines[r_type] = {
            'rd': FastHeatmapEngine(extent=doppler_extent, cmap='viridis', title=f'{r_type.upper()} Range-Doppler', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='bicubic'),
            'cfar': FastHeatmapEngine(extent=doppler_extent, cmap='plasma', title=f'{r_type.upper()} CFAR Noise Threshold', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='bicubic'),
            'ra_static': FastHeatmapEngine(extent=bev_extent, cmap='jet', vmin=-5, vmax=45, title=f'{r_type.upper()} Range-Azimuth (Absolute)', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', aspect='equal'),
            'ra_dyn': FastHeatmapEngine(extent=bev_extent, cmap='jet', title=f'{r_type.upper()} Range-Azimuth (Dynamic)', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', aspect='equal'),
        }
    return telemetry_cache, cached_axes, render_engines


def _frustum_geometry(r_type, max_r=150.0):
    """Compute the (frame-invariant) FOV frustum of one radar type.

    The far face is a plane at X = max_r. Corners are computed with Y to the
    right and then converted to the published convention by negating Y.

    Returns:
        dict with the spec values, the five vertices (``rhs``; index 0 is the
        origin) and ``line_points`` as LINE_LIST point pairs.
    """
    az_limit_deg, el_limit_deg = _FOV_HALF_ANGLES_DEG.get(r_type, _DEFAULT_FOV_HALF_ANGLES_DEG)
    half_width = max_r * np.tan(np.radians(az_limit_deg))
    half_height = max_r * np.tan(np.radians(el_limit_deg))
    corners = [
        [0.0, 0.0, 0.0],                      # V0: Origin
        [max_r, -half_width, half_height],    # V1: TL (NegAz, PosEl)
        [max_r, half_width, half_height],     # V2: TR (PosAz, PosEl)
        [max_r, -half_width, -half_height],   # V3: BL (NegAz, NegEl)
        [max_r, half_width, -half_height],    # V4: BR (PosAz, NegEl)
    ]
    # RHS Conversion: [X, -Y, Z]
    rhs = [{"x": float(v[0]), "y": float(-v[1]), "z": float(v[2])} for v in corners]
    line_points = [
        rhs[0], rhs[1], rhs[0], rhs[2], rhs[0], rhs[3], rhs[0], rhs[4],  # Beams
        rhs[1], rhs[2], rhs[2], rhs[4], rhs[4], rhs[3], rhs[3], rhs[1],  # Face
    ]
    return {
        "az_limit_deg": az_limit_deg,
        "el_limit_deg": el_limit_deg,
        "max_r": max_r,
        "rhs": rhs,
        "line_points": line_points,
    }


def _publish_image_file(writer, channel_id, image_path, ts_ns, stamp):
    """Publish the file at *image_path* as a PNG message; no-op if missing."""
    if not image_path.exists():
        return
    with open(image_path, "rb") as img_f:
        img_bytes = img_f.read()
    _publish_json(writer, channel_id, ts_ns, _png_msg(stamp, base64.b64encode(img_bytes).decode("ascii")))


def _publish_lidar(writer, channel_id, lidar_path, ts_ns, stamp):
    """Publish a logged LiDAR frame as a 7-field point cloud; no-op if missing."""
    if not lidar_path.exists():
        return
    points = np.load(lidar_path)
    # Handle 6-column (old) vs 7-column (with velocity) data.
    # NOTE(review): the .view(np.uint32) calls reinterpret the stored bits as
    # integers, which presumes 4-byte (float32) elements — confirm against
    # the logger that writes these files.
    if points.shape[1] == 6:
        # Pad to 7-column [x, y, z, vel, cos, obj, tag]; velocity stays 0.
        padded = np.zeros((points.shape[0], 7), dtype=np.float32)
        padded[:, 0:3] = points[:, 0:3]
        padded[:, 4] = points[:, 3]  # cos (pure float)
        padded[:, 5] = points[:, 4].view(np.uint32).astype(np.float32)  # real object id
        padded[:, 6] = points[:, 5].view(np.uint32).astype(np.float32)  # real semantic tag
        ros_points = padded
    else:
        # For newer 7-col data: [x,y,z,vel,cos,obj,tag]; the obj/tag bits at
        # columns 5 and 6 still need the same reinterpretation.
        ros_points = points.copy().astype(np.float32)
        ros_points[:, 5] = ros_points[:, 5].view(np.uint32).astype(np.float32)
        ros_points[:, 6] = ros_points[:, 6].view(np.uint32).astype(np.float32)
    ros_points[:, 1] = -ros_points[:, 1]  # RHS conversion

    # MOUNT OFFSET: LiDAR is on the roof (Z=2.5)
    lidar_msg = _point_cloud_msg(
        stamp,
        _mount_pose(0.0, 0.0, 2.5),
        ("x", "y", "z", "velocity", "cos_inc_angle", "object_id", "semantic_tag"),
        ros_points,
    )
    _publish_json(writer, channel_id, ts_ns, lidar_msg)


def _publish_native_radar(writer, channel_id, radar_path, ts_ns, stamp):
    """Publish logged native radar detections as an x/y/z/velocity cloud.

    Columns 0..3 are read as (distance, azimuth, altitude, velocity); the
    azimuth is negated and the spherical coordinates converted to Cartesian.
    Nothing is published for a missing or empty file.
    """
    if not radar_path.exists():
        return
    r_data = np.load(radar_path)
    if r_data.size == 0:
        return
    dist, az, alt, vel = r_data[:, 0], -r_data[:, 1], r_data[:, 2], r_data[:, 3]
    xr = dist * np.cos(az) * np.cos(alt)
    yr = dist * np.sin(az) * np.cos(alt)
    zr = dist * np.sin(alt)
    radar_points = np.stack([xr, yr, zr, vel], axis=1).astype(np.float32)
    # MOUNT OFFSET: Native Radar is on the front bumper (X=2.0, Z=1.0)
    radar_msg = _point_cloud_msg(stamp, _mount_pose(2.0, 0.0, 1.0), ("x", "y", "z", "velocity"), radar_points)
    _publish_json(writer, channel_id, ts_ns, radar_msg)


def _publish_shenron_cloud(writer, channel_id, cloud_path, ts_ns, stamp):
    """Publish one simulated radar point cloud; no-op if missing or empty.

    NOTE(review): the message declares five float32 fields (stride 20), which
    presumes the saved array has exactly five columns
    [x, y, z, velocity, magnitude] — confirm against ShenronRadarModel.process.
    """
    if not cloud_path.exists():
        return
    s_data = np.load(cloud_path)
    if s_data.size == 0:
        return
    ros_shenron = s_data.copy().astype(np.float32)
    ros_shenron[:, 1] = -ros_shenron[:, 1]  # Negate Y for ROS
    # MOUNT OFFSET: Shenron Radars use the same pose as native (X=2.0, Z=1.0)
    shenron_msg = _point_cloud_msg(
        stamp,
        _mount_pose(2.0, 0.0, 1.0),
        ("x", "y", "z", "velocity", "magnitude"),
        ros_shenron,
    )
    _publish_json(writer, channel_id, ts_ns, shenron_msg)


def _publish_heatmaps(writer, chans, engines, axes, met_folder, frame_file, ts_ns, stamp):
    """Render and publish the RD, RA and CFAR heatmaps of one frame.

    Each map is skipped when its ``.npy`` file is missing or when the
    renderer returns an empty result.
    """
    rd_p = met_folder / "rd" / frame_file
    if rd_p.exists():
        # 10*log10 minus a fixed 68 dB offset, the same as for the CFAR map.
        rd_db = 10 * np.log10(np.clip(np.load(rd_p), 1e-9, None)) - 68.0
        # Flip UD so Range 0 (ego) is at the bottom.
        b64 = engines['rd'].render(np.flipud(rd_db))
        if b64:
            _publish_json(writer, chans["rd"], ts_ns, _png_msg(stamp, b64))

    ra_p = met_folder / "ra" / frame_file
    if ra_p.exists():
        ra_data = np.load(ra_p)
        if axes is not None:
            # Post-process (smoothing disabled), then scan-convert to a
            # 512x512 bird's-eye view limited to 120 m.
            ra_processed = postprocess_ra(ra_data, axes['range_axis'], smooth_sigma=0.0)
            bev_data = scan_convert_ra(ra_processed, axes['range_axis'], axes['angle_axis'], img_size=512, max_display_range=120.0)
            # Same image twice: fixed colour bounds first, then dynamic bounds.
            for engine_key, chan_key in (('ra_static', "ra"), ('ra_dyn', "ra_dynamic")):
                b64 = engines[engine_key].render(bev_data)
                if b64:
                    _publish_json(writer, chans[chan_key], ts_ns, _png_msg(stamp, b64))
        else:
            # Fallback: rectangular log plot (no axis info available)
            b64 = render_heatmap(np.log10(np.flipud(ra_data) + 1e-9), cmap='magma')
            if b64:
                _publish_json(writer, chans["ra"], ts_ns, _png_msg(stamp, b64))

    cf_p = met_folder / "cfar" / frame_file
    if cf_p.exists():
        cf_db = 10 * np.log10(np.clip(np.load(cf_p), 1e-9, None)) - 68.0
        # Flip UD so Range 0 (ego) is at the bottom.
        b64 = engines['cfar'].render(np.flipud(cf_db))
        if b64:
            _publish_json(writer, chans["cfar"], ts_ns, _png_msg(stamp, b64))


def run_testbench(iter_name, max_frames=None):
    """Run one Shenron debug iteration and package it as a Foxglove MCAP.

    Stage 1 runs every radar model over the logged LiDAR frames and stores
    point clouds plus metrology under
    ``<project_root>/Shenron_debug/iterations/<iter_name>/``.
    Stage 2 writes ``<iter_name>.mcap`` there, combining the logged camera,
    LiDAR, native radar and ego pose with the simulated radar point clouds,
    heatmaps, telemetry and FOV frustums.

    Args:
        iter_name: Name of the iteration; used as the output folder name and
            the MCAP file stem.
        max_frames: Optional cap on the number of frames to process (values
            <= 0 mean "all"). ``None`` (default) falls back to the CLI's
            ``--frames`` value when the module is run as a script.

    Returns:
        None. A missing input folder or unreadable ``frames.jsonl`` is
        reported on stdout and ends the run early.
    """
    # Setup directories
    debug_dir = project_root / 'Shenron_debug'
    logs_dir = debug_dir / 'logs'
    iter_dir = debug_dir / 'iterations' / iter_name
    lidar_dir = logs_dir / 'lidar'
    for required_dir in (logs_dir, lidar_dir):
        if not required_dir.exists():
            print(f"[ERROR] Required base folder {required_dir} not found!")
            return
    iter_dir.mkdir(parents=True, exist_ok=True)

    radar_types = ['awrl1432', 'radarbook']
    frame_limit = _resolve_frame_limit(max_frames)

    print("\n======================================")
    print(f"SHENRON TESTBENCH ITERATION: {iter_name}")
    print("======================================")

    # 1. GENERATE SYNTHETIC DATA
    print("\n[Stage 1]: Processing Physics models...")
    models = _simulate_radars(iter_dir, lidar_dir, radar_types, frame_limit)

    # 2. GENERATE MCAP
    print("\n[Stage 2]: Weaving MCAP Comparison Package...")
    # Load the frame index BEFORE creating the output file, so a missing or
    # corrupt frames.jsonl does not leave an unfinished MCAP behind.
    try:
        frames = list(load_frames(logs_dir))
    except Exception as e:
        print(f"[ERROR] Could not load frames.jsonl from {logs_dir}: {e}")
        return
    if frame_limit:
        frames = frames[:frame_limit]

    telemetry_cache, cached_axes, render_engines = _load_metrology_context(iter_dir, radar_types, models)
    # The frustum geometry does not depend on the frame: compute it once per
    # radar type that has axis information.
    frustums = {r_type: _frustum_geometry(r_type) for r_type in radar_types if cached_axes[r_type] is not None}

    output_mcap = iter_dir / f"{iter_name}.mcap"
    with open(output_mcap, "wb") as mcap_file:
        writer = Writer(mcap_file)
        writer.start(profile="foxglove")
        try:
            # Register Schemas
            pose_schema_id = writer.register_schema(name="foxglove.Pose", encoding="jsonschema", data=json.dumps(FOXGLOVE_POSE_SCHEMA).encode())
            camera_schema_id = writer.register_schema(name="foxglove.CompressedImage", encoding="jsonschema", data=json.dumps(FOXGLOVE_IMAGE_SCHEMA).encode())
            lidar_schema_id = writer.register_schema(name="foxglove.PointCloud", encoding="jsonschema", data=json.dumps(FOXGLOVE_PCL_SCHEMA).encode())
            telemetry_schema_id = writer.register_schema(name="TelemetryMetrics", encoding="jsonschema", data=json.dumps(FOXGLOVE_METRICS_SCHEMA).encode())
            scene_update_schema_id = writer.register_schema(name="foxglove.SceneUpdate", encoding="jsonschema", data=json.dumps(FOXGLOVE_SCENE_UPDATE_SCHEMA).encode())

            # Register Channels
            channels = {
                'camera': writer.register_channel(topic="/camera", message_encoding="json", schema_id=camera_schema_id),
                'camera_tpp': writer.register_channel(topic="/camera_tpp", message_encoding="json", schema_id=camera_schema_id),
                'lidar': writer.register_channel(topic="/lidar", message_encoding="json", schema_id=lidar_schema_id),
                'native_radar': writer.register_channel(topic="/radar/native", message_encoding="json", schema_id=lidar_schema_id),
                'ego_pose': writer.register_channel(topic="/ego_pose", message_encoding="json", schema_id=pose_schema_id),
            }
            shenron_channels = {}
            metrology_channels = {}
            for r_type in radar_types:
                shenron_channels[r_type] = writer.register_channel(topic=f"/radar/{r_type}", message_encoding="json", schema_id=lidar_schema_id)
                # Register Metrology Channels
                metrology_channels[r_type] = {
                    "rd": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_doppler", message_encoding="json", schema_id=camera_schema_id),
                    "ra": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_azimuth", message_encoding="json", schema_id=camera_schema_id),
                    "ra_dynamic": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/range_azimuth_dynamic", message_encoding="json", schema_id=camera_schema_id),
                    "cfar": writer.register_channel(topic=f"/radar/{r_type}/heatmaps/cfar_mask", message_encoding="json", schema_id=camera_schema_id),
                    "telemetry": writer.register_channel(topic=f"/radar/{r_type}/metrics", message_encoding="json", schema_id=telemetry_schema_id),
                    "frustum": writer.register_channel(topic=f"/radar/{r_type}/fov_frustum", message_encoding="json", schema_id=scene_update_schema_id),
                }

            progress = tqdm.tqdm(frames, desc=" Packaging Frames", unit="frame")
            for frame_idx, frame in enumerate(progress):
                ts_ns = int(frame["timestamp"] * 1e9)
                ts_sec, ts_nsec = divmod(ts_ns, 1_000_000_000)
                stamp = {"sec": ts_sec, "nsec": ts_nsec}

                # POSE: Y and yaw are negated; yaw becomes a rotation about Z.
                raw_pose = frame["ego_pose"]
                yaw_rad = -np.radians(raw_pose.get("yaw", 0))
                ego_world_pose = {
                    "position": {"x": raw_pose["x"], "y": -raw_pose["y"], "z": raw_pose["z"]},
                    "orientation": {"x": 0.0, "y": 0.0, "z": float(np.sin(yaw_rad / 2)), "w": float(np.cos(yaw_rad / 2))},
                }
                _publish_json(writer, channels['ego_pose'], ts_ns, ego_world_pose)

                # CAMERA / CAMERA TPP (the latter is optional in the frame record)
                _publish_image_file(writer, channels['camera'], logs_dir / "camera" / frame["camera"], ts_ns, stamp)
                if "camera_tpp" in frame:
                    _publish_image_file(writer, channels['camera_tpp'], logs_dir / "camera_tpp" / frame["camera_tpp"], ts_ns, stamp)

                # LIDAR
                _publish_lidar(writer, channels['lidar'], logs_dir / "lidar" / frame["lidar"], ts_ns, stamp)

                # NATIVE RADAR
                _publish_native_radar(writer, channels['native_radar'], logs_dir / "radar" / frame["radar"], ts_ns, stamp)

                # SHENRON RADARS
                frame_stem = f"frame_{int(frame['frame_id']):06d}"
                shenron_fname = f"{frame_stem}.npy"
                for r_type in radar_types:
                    chans = metrology_channels[r_type]
                    _publish_shenron_cloud(writer, shenron_channels[r_type], iter_dir / r_type / shenron_fname, ts_ns, stamp)

                    # Metrology visuals (RD / RA / CFAR heatmaps)
                    _publish_heatmaps(
                        writer,
                        chans,
                        render_engines[r_type],
                        cached_axes.get(r_type),
                        iter_dir / r_type / "metrology",
                        shenron_fname,
                        ts_ns,
                        stamp,
                    )

                    # Telemetry stream: metrics are flattened into the message
                    # so each one is addressable as a top-level field.
                    telemetry_row = telemetry_cache[r_type].get(frame_stem)
                    if telemetry_row:
                        telemetry_msg = {"timestamp": stamp, "frame_id": _EGO_FRAME_ID, **telemetry_row}
                        _publish_json(writer, chans["telemetry"], ts_ns, telemetry_msg)

                    # 3D hardware FOV frustum (only when axis info exists)
                    frustum = frustums.get(r_type)
                    if frustum is None:
                        continue
                    if frame_idx == 0:
                        # One-time verification log, on the first frame only.
                        axes = cached_axes[r_type]
                        corner = frustum["rhs"][1]
                        tqdm.tqdm.write(f"\n [AUDIT] {r_type.upper()} Frustum Calculation:")
                        tqdm.tqdm.write(f" - Spec: Az ±{frustum['az_limit_deg']}°, El ±{frustum['el_limit_deg']}°, Range {frustum['max_r']}m")
                        tqdm.tqdm.write(f" - V1 (RHS TL): X={corner['x']:.1f}, Y={corner['y']:.1f}, Z={corner['z']:.1f}")
                        tqdm.tqdm.write(f" - range_axis[0, -1]: {axes['range_axis'][0]:.1f}, {axes['range_axis'][-1]:.1f}")
                        tqdm.tqdm.write(f" - angle_axis (rad) [0, -1]: {axes['angle_axis'][0]:.3f}, {axes['angle_axis'][-1]:.3f}")
                    frustum_msg = {
                        "entities": [{
                            "id": f"radar_fov_{r_type}",
                            "frame_id": _EGO_FRAME_ID,
                            "timestamp": stamp,
                            "lines": [{
                                "type": _LINE_LIST,
                                "pose": _mount_pose(2.0, 0.0, 1.0),
                                "points": frustum["line_points"],
                                "thickness": 0.5,
                                "color": _FRUSTUM_COLORS.get(r_type, _DEFAULT_FRUSTUM_COLOR),
                            }],
                        }],
                    }
                    _publish_json(writer, chans["frustum"], ts_ns, frustum_msg)
        finally:
            # Always finalise the container so the file is readable even if
            # packaging stopped part-way through.
            writer.finish()

    print(f"\n[SUCCESS] Iteration packaged to: {output_mcap}")
    print(f"File size: {os.path.getsize(output_mcap)/1024/1024:.2f} MB")
if __name__ == "__main__":
    # Command-line entry point: parse the options, then run one iteration.
    cli = argparse.ArgumentParser(description="Shenron Physics Iteration Testbench")
    cli.add_argument(
        "--iter",
        required=True,
        help="Name of the current debug iteration (e.g., 01_baseline)",
    )
    cli.add_argument(
        "--frames",
        type=int,
        default=0,
        help="Number of frames to process (0 for all)",
    )
    # `args` is deliberately bound at module level: run_testbench() looks up
    # the module-global `args` to obtain the --frames limit.
    args = cli.parse_args()
    run_testbench(args.iter)