CARLA ? C-Shenron based Simualtor for Sensor data generation.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

414 lines
20 KiB

import os
import json
import base64
import io
import numpy as np
from PIL import Image
import matplotlib.cm as cm
from mcap.writer import Writer
# Official Foxglove JSON Schemas
FOXGLOVE_POSE_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "foxglove.Pose",
"title": "foxglove.Pose",
"type": "object",
"properties": {
"position": {
"type": "object",
"properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}}
},
"orientation": {
"type": "object",
"properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}, "w": {"type": "number"}}
}
}
}
FOXGLOVE_IMAGE_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "foxglove.CompressedImage",
"title": "foxglove.CompressedImage",
"type": "object",
"properties": {
"timestamp": {
"type": "object",
"properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}
},
"frame_id": {"type": "string"},
"data": {"type": "string", "contentEncoding": "base64"},
"format": {"type": "string"}
}
}
FOXGLOVE_PCL_SCHEMA = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "foxglove.PointCloud",
"title": "foxglove.PointCloud",
"type": "object",
"properties": {
"timestamp": {
"type": "object",
"properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}
},
"frame_id": {"type": "string"},
"pose": FOXGLOVE_POSE_SCHEMA,
"point_stride": {"type": "integer"},
"fields": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"offset": {"type": "integer"},
"type": {"type": "integer"}
}
}
},
"data": {"type": "string", "contentEncoding": "base64"}
}
}
def render_heatmap(data, cmap='viridis', vmin=None, vmax=None):
"""Convert 2D array to colormapped B64 PNG with guide-compliant normalization."""
# Step 6: Normalization [0, 1]
# If vmin/vmax are provided, use fixed scaling to preserve physical intensity.
# Otherwise, fall back to relative normalization (relative to current frame).
if vmin is not None and vmax is not None:
norm = np.clip((data - vmin) / (vmax - vmin), 0, 1)
else:
d_min, d_max = np.min(data), np.max(data)
if d_max > d_min:
norm = (data - d_min) / (d_max - d_min)
else:
norm = np.zeros_like(data)
# Step 7: Apply Radar-style Colormap (Blue-style)
# Using matplotlib.cm API for consistency with this script's imports
mapper = cm.get_cmap(cmap)
rgba = mapper(norm) # (H, W, 4)
rgb = (rgba[:, :, :3] * 255).astype(np.uint8)
img = Image.fromarray(rgb)
buffered = io.BytesIO()
img.save(buffered, format="PNG")
return base64.b64encode(buffered.getvalue()).decode("ascii")
def postprocess_ra(ra_heatmap, range_axis, smooth_sigma=1.0):
"""
Refined RA post-processing pipeline for Physical Realism.
Restores the natural power decay by removing per-range normalization.
"""
# 1. Clutter removal (subtract per-range-bin mean to suppress static ground)
clutter = np.mean(ra_heatmap, axis=1, keepdims=True)
ra = ra_heatmap - (0.8 * clutter) # Subtract context-aware mean
# 2. Physics-based dynamic range compression (Linear -> Log)
# Conversion to dB scale with System Gain Calibration (calculated from Iter 28)
SYSTEM_GAIN_OFFSET = 68.0
ra_db = 10 * np.log10(ra) - SYSTEM_GAIN_OFFSET
# 3. Fixed dynamic range clipping (-5 to 45 dB)
# This ensures consistent contrast and preserves physical R^-4 decay
ra_db = np.clip(ra_db, -5, 45)
# 4. Optional Gaussian smoothing to reduce speckle
if smooth_sigma > 0:
from scipy.ndimage import gaussian_filter
ra_db = gaussian_filter(ra_db, sigma=smooth_sigma)
return ra_db
def scan_convert_ra(ra_heatmap, range_axis, angle_axis, img_size=512):
"""
Polar-to-Cartesian scan conversion following FIG / Guide logic.
Converts RA (Range, Angle) polar data into a 120° Fan-shaped Sector plot.
"""
max_range = range_axis[-1]
theta_min = angle_axis[0]
theta_max = angle_axis[-1]
# 4. Create Cartesian Grid (X: lateral, Y: forward)
# Origin (0,0) will be at bottom-center of the 512x512 image
x = np.linspace(-max_range, max_range, img_size)
y = np.linspace(max_range, 0, img_size) # Far to Near
X, Y = np.meshgrid(x, y)
# 4. Convert to Polar Coordinates
R_query = np.sqrt(X**2 + Y**2)
Theta_query = np.arctan2(X, Y)
# 5. Mask Valid Radar FOV (120-degree sector)
fov_mask = (Theta_query >= theta_min) & (Theta_query <= theta_max) & (R_query <= max_range)
# 5. Map RA Heatmap to Cartesian Grid
# Calculate fractional indices
r_idx = np.clip(((R_query / max_range) * (ra_heatmap.shape[0] - 1)).astype(int), 0, ra_heatmap.shape[0] - 1)
# theta index: Shift by theta_min to align 0..120 range
theta_range = theta_max - theta_min
theta_idx = np.clip(((Theta_query - theta_min) / theta_range * (ra_heatmap.shape[1] - 1)).astype(int), 0, ra_heatmap.shape[1] - 1)
# Project
cartesian = np.full((img_size, img_size), np.min(ra_heatmap), dtype=np.float64)
cartesian[fov_mask] = ra_heatmap[r_idx[fov_mask], theta_idx[fov_mask]]
return cartesian
def load_frames(folder_path):
with open(os.path.join(folder_path, "frames.jsonl")) as f:
for line in f:
yield json.loads(line)
def convert_folder(folder_path):
folder_name = os.path.basename(folder_path)
output_path = os.path.join(folder_path, f"{folder_name}.mcap")
if os.path.exists(output_path):
print(f"\n>>> Skipping folder (MCAP already exists): {folder_name}", flush=True)
return
print(f"\n>>> Processing folder: {folder_name}", flush=True)
print(f"Target MCAP: {output_path}", flush=True)
with open(output_path, "wb") as f:
writer = Writer(f)
writer.start(profile="foxglove")
# Register Schemas
pose_schema_id = writer.register_schema(name="foxglove.Pose", encoding="jsonschema", data=json.dumps(FOXGLOVE_POSE_SCHEMA).encode())
camera_schema_id = writer.register_schema(name="foxglove.CompressedImage", encoding="jsonschema", data=json.dumps(FOXGLOVE_IMAGE_SCHEMA).encode())
lidar_schema_id = writer.register_schema(name="foxglove.PointCloud", encoding="jsonschema", data=json.dumps(FOXGLOVE_PCL_SCHEMA).encode())
# Register Channels
camera_channel_id = writer.register_channel(topic="/camera", message_encoding="json", schema_id=camera_schema_id)
camera_tpp_channel_id = writer.register_channel(topic="/camera_tpp", message_encoding="json", schema_id=camera_schema_id)
lidar_channel_id = writer.register_channel(topic="/lidar", message_encoding="json", schema_id=lidar_schema_id)
pose_channel_id = writer.register_channel(topic="/ego_pose", message_encoding="json", schema_id=pose_schema_id)
radar_channel_id = writer.register_channel(topic="/radar", message_encoding="json", schema_id=lidar_schema_id)
shenron_channel_id = writer.register_channel(topic="/radar/shenron", message_encoding="json", schema_id=lidar_schema_id)
# Register Metrology Channels
met_ra_id = writer.register_channel(topic="/radar/shenron/heatmaps/range_azimuth", message_encoding="json", schema_id=camera_schema_id)
met_rd_id = writer.register_channel(topic="/radar/shenron/heatmaps/range_doppler", message_encoding="json", schema_id=camera_schema_id)
met_cfar_id = writer.register_channel(topic="/radar/shenron/heatmaps/cfar_mask", message_encoding="json", schema_id=camera_schema_id)
# Pre-load axes for scan conversion if they exist
met_dir = os.path.join(folder_path, "metrology")
range_ax = None
angle_ax = None
if os.path.exists(met_dir):
r_ax_p = os.path.join(met_dir, "range_axis.npy")
a_ax_p = os.path.join(met_dir, "angle_axis.npy")
if os.path.exists(r_ax_p) and os.path.exists(a_ax_p):
range_ax = np.load(r_ax_p)
angle_ax = np.load(a_ax_p)
print(" - Loaded physical axes for high-fidelity visualization.")
frame_count = 0
for frame in load_frames(folder_path):
ts_ns = int(frame["timestamp"] * 1e9)
ts_sec = ts_ns // 1_000_000_000
ts_nsec = ts_ns % 1_000_000_000
raw_pose = frame["ego_pose"]
x, y, z = raw_pose["x"], -raw_pose["y"], raw_pose["z"]
yaw_rad = -np.radians(raw_pose.get("yaw", 0))
ego_world_pose = {
"position": {"x": x, "y": y, "z": z},
"orientation": {"x": 0.0, "y": 0.0, "z": float(np.sin(yaw_rad/2)), "w": float(np.cos(yaw_rad/2))}
}
identity_pose = {"position": {"x": 0.0, "y": 0.0, "z": 0.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}}
# CAMERA
camera_path = os.path.join(folder_path, "camera", frame["camera"])
if os.path.exists(camera_path):
with open(camera_path, "rb") as img_f:
img_bytes = img_f.read()
cam_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle",
"format": "png" if frame["camera"].endswith(".png") else "jpeg",
"data": base64.b64encode(img_bytes).decode("ascii")
}
writer.add_message(camera_channel_id, log_time=ts_ns, data=json.dumps(cam_msg).encode(), publish_time=ts_ns)
# CAMERA (TPP)
if "camera_tpp" in frame:
camera_tpp_path = os.path.join(folder_path, "camera_tpp", frame["camera_tpp"])
if os.path.exists(camera_tpp_path):
with open(camera_tpp_path, "rb") as img_f:
img_bytes = img_f.read()
cam_tpp_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle",
"format": "png" if frame["camera_tpp"].endswith(".png") else "jpeg",
"data": base64.b64encode(img_bytes).decode("ascii")
}
writer.add_message(camera_tpp_channel_id, log_time=ts_ns, data=json.dumps(cam_tpp_msg).encode(), publish_time=ts_ns)
# EGO POSE
writer.add_message(pose_channel_id, log_time=ts_ns, data=json.dumps(ego_world_pose).encode(), publish_time=ts_ns)
# LIDAR
lidar_path = os.path.join(folder_path, "lidar", frame["lidar"])
if os.path.exists(lidar_path):
points = np.load(lidar_path)
# Robustness handle 6 vs 7 cols
if points.shape[1] == 6:
# Pad to [x, y, z, velocity, cos, obj, tag]
padded = np.zeros((points.shape[0], 7), dtype=np.float32)
padded[:, 0:3] = points[:, 0:3]
padded[:, 4] = points[:, 3] # cos
padded[:, 5] = points[:, 4].view(np.uint32).astype(np.float32) # obj
padded[:, 6] = points[:, 5].view(np.uint32).astype(np.float32) # tag
ros_points = padded
else:
ros_points = points.copy().astype(np.float32)
# Correct bits for [x,y,z,vel,cos,obj,tag]
ros_points[:, 5] = ros_points[:, 5].view(np.uint32).astype(np.float32)
ros_points[:, 6] = ros_points[:, 6].view(np.uint32).astype(np.float32)
ros_points[:, 1] = -ros_points[:, 1] # RHS conversion
# MOUNT OFFSET: LiDAR is on the roof (Z=2.5)
lidar_pose = {"position": {"x": 0.0, "y": 0.0, "z": 2.5}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}}
lidar_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle", "pose": lidar_pose, "point_stride": 28,
"fields": [
{"name":"x","offset":0,"type":7},
{"name":"y","offset":4,"type":7},
{"name":"z","offset":8,"type":7},
{"name":"velocity","offset":12,"type":7},
{"name":"cos_inc_angle","offset":16,"type":7},
{"name":"object_id","offset":20,"type":7},
{"name":"semantic_tag","offset":24,"type":7}
],
"data": base64.b64encode(ros_points.tobytes()).decode("ascii")
}
writer.add_message(lidar_channel_id, log_time=ts_ns, data=json.dumps(lidar_msg).encode(), publish_time=ts_ns)
# RADAR
radar_path = os.path.join(folder_path, "radar", frame["radar"])
if os.path.exists(radar_path):
r_data = np.load(radar_path)
if r_data.size > 0:
# r_data = [depth, azimuth, altitude, velocity]
# We negate azimuth to convert from CARLA (Right-handed for Y)
# note: CARLA is actually LHS (X-fwd, Y-right, Z-up)
# ROS is RHS (X-fwd, Y-left, Z-up) -> Negating Y converts it.
dist, az, alt, vel = r_data[:, 0], -r_data[:, 1], r_data[:, 2], r_data[:, 3]
xr, yr, zr = dist*np.cos(az)*np.cos(alt), dist*np.sin(az)*np.cos(alt), dist*np.sin(alt)
# Stack X, Y, Z, and Velocity (4 floats = 16 bytes stride)
radar_points = np.stack([xr, yr, zr, vel], axis=1).astype(np.float32)
# MOUNT OFFSET: Radar is on the bumper (X=2.0, Z=1.0)
radar_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}}
radar_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle",
"pose": radar_pose,
"point_stride": 16,
"fields": [
{"name":"x","offset":0,"type":7},
{"name":"y","offset":4,"type":7},
{"name":"z","offset":8,"type":7},
{"name":"velocity","offset":12,"type":7}
],
"data": base64.b64encode(radar_points.tobytes()).decode("ascii")
}
writer.add_message(radar_channel_id, log_time=ts_ns, data=json.dumps(radar_msg).encode(), publish_time=ts_ns)
# SHENRON RADAR
shenron_file = f"frame_{int(frame['frame_id']):06d}.npy"
shenron_path = os.path.join(folder_path, "shenron_radar", shenron_file)
if os.path.exists(shenron_path):
s_data = np.load(shenron_path)
if s_data.size > 0:
# s_data = [x, y, z, velocity, magnitude]
# ISOLATE coords: X is fwd, Y is right.
# ROS: X is fwd, Y is left.
ros_shenron = s_data.copy().astype(np.float32)
ros_shenron[:, 1] = -ros_shenron[:, 1] # Negate Y for ROS
# MOUNT OFFSET: Shenron Radar is on the bumper (X=2.0, Z=1.0)
shenron_pose = {"position": {"x": 2.0, "y": 0.0, "z": 1.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}}
shenron_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle",
"pose": shenron_pose,
"point_stride": 20, # 5 floats * 4 bytes
"fields": [
{"name":"x","offset":0,"type":7},
{"name":"y","offset":4,"type":7},
{"name":"z","offset":8,"type":7},
{"name":"velocity","offset":12,"type":7},
{"name":"magnitude","offset":16,"type":7}
],
"data": base64.b64encode(ros_shenron.tobytes()).decode("ascii")
}
writer.add_message(shenron_channel_id, log_time=ts_ns, data=json.dumps(shenron_msg).encode(), publish_time=ts_ns)
# METROLOGY HEATMAPS
if os.path.exists(met_dir):
frame_name = f"frame_{int(frame['frame_id']):06d}"
# RA (Polar Sector BEV)
ra_p = os.path.join(met_dir, "ra", f"{frame_name}.npy")
if os.path.exists(ra_p) and range_ax is not None:
ra_data = np.load(ra_p)
# Use smooth_sigma=0.0 for sharp focus (Iter 27 baseline)
ra_processed = postprocess_ra(ra_data, range_ax, smooth_sigma=0.0)
bev_data = scan_convert_ra(ra_processed, range_ax, angle_ax, img_size=512)
b64 = render_heatmap(bev_data, cmap='jet', vmin=-5, vmax=45)
msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64}
writer.add_message(met_ra_id, log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns)
# RD (Log-scaled)
rd_p = os.path.join(met_dir, "rd", f"{frame_name}.npy")
if os.path.exists(rd_p):
rd_data = np.log10(np.load(rd_p) + 1e-9)
b64 = render_heatmap(np.flipud(rd_data), cmap='viridis')
msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64}
writer.add_message(met_rd_id, log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns)
# CFAR (Mask)
cfar_p = os.path.join(met_dir, "cfar", f"{frame_name}.npy")
if os.path.exists(cfar_p):
b64 = render_heatmap(np.flipud(np.load(cfar_p)), cmap='plasma')
msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64}
writer.add_message(met_cfar_id, log_time=ts_ns, data=json.dumps(msg).encode(), publish_time=ts_ns)
frame_count += 1
if frame_count % 50 == 0:
print(f" Processed {frame_count} frames...", flush=True)
writer.finish()
print(f" Done! MCAP saved: {output_path} ({os.path.getsize(output_path)/1024/1024:.2f} MB)", flush=True)
def main():
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
root_data = os.path.join(PROJECT_ROOT, "data")
if not os.path.exists(root_data):
print(f"Error: {root_data} directory not found.")
return
folders = [os.path.join(root_data, d) for d in os.listdir(root_data) if os.path.isdir(os.path.join(root_data, d))]
# Also check if 'root_data' itself contains 'frames.jsonl' (legacy single-folder mode)
if os.path.exists(os.path.join(root_data, "frames.jsonl")):
convert_folder(root_data)
for folder in folders:
if os.path.exists(os.path.join(folder, "frames.jsonl")):
# Check if MCAP already exists and avoid re-processing if you prefer,
# but here we'll process all matching folders.
convert_folder(folder)
if __name__ == "__main__":
main()