CARLA - C-Shenron based Simulator for Sensor data generation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

127 lines
4.8 KiB

import os
import sys
import numpy as np
import tqdm
import json
from pathlib import Path
# Make the project importable: the root is the parent of the directory that
# holds this script, and the ISOLATE sources live under scripts/ISOLATE.
project_root = Path(__file__).parent.parent
sys.path.extend(
    str(entry) for entry in (project_root, project_root / 'scripts' / 'ISOLATE')
)

# The radar model wrapper is mandatory; without it the script reports the
# import failure and terminates with exit status 1.
try:
    from scripts.ISOLATE.model_wrapper import ShenronRadarModel
except ImportError as e:
    print(f"Error: Failed to import ShenronRadarModel. Ensure scripts/ISOLATE/model_wrapper.py exists. ({e})")
    raise SystemExit(1)
def _init_radar_models(session_path, radar_types):
    """Build one ShenronRadarModel per radar type and prepare its output tree.

    For each type this constructs the model, creates ``<session>/<type>/`` and
    ``<session>/<type>/metrology/{rd,ra,cfar}/``, saves the processor's range
    and angle axes, and removes a ``metrics.jsonl`` left over from an earlier
    run.

    Args:
        session_path: Session directory as a ``pathlib.Path``.
        radar_types: Iterable of names forwarded as ``radar_type``.

    Returns:
        Dict mapping radar type to its model. A type is only included when
        every setup step succeeded, so a type reported as failed is never
        used for frame processing afterwards.
    """
    models = {}
    for r_type in radar_types:
        try:
            print(f" Initializing ShenronRadarModel ({r_type})...")
            model = ShenronRadarModel(radar_type=r_type)
            (session_path / r_type).mkdir(exist_ok=True)
            # Create Metrology folders
            met_base = session_path / r_type / "metrology"
            for sub in ("rd", "ra", "cfar"):
                (met_base / sub).mkdir(parents=True, exist_ok=True)
            # Save physical axes once per session
            np.save(met_base / "range_axis.npy", model.processor.rangeAxis)
            np.save(met_base / "angle_axis.npy", model.processor.angleAxis)
            # Telemetry is appended frame by frame; start from a clean file so
            # re-running a session does not duplicate every frame's entry
            # (the .npy outputs are simply overwritten on a re-run).
            stale_metrics = met_base / "metrics.jsonl"
            if stale_metrics.exists():
                stale_metrics.unlink()
        except Exception as e:
            # Best effort: model construction is opaque here, so any failure
            # just disables this radar type for the session.
            print(f" [WARNING] Failed to init {r_type}: {e}")
            continue
        # Register only after the whole setup went through.
        models[r_type] = model
    return models


def _load_lidar_frame(lidar_file):
    """Load one semantic LiDAR frame and bring it to the 7-column layout.

    Per the notes in this script, raw frames are expected as
    ``[x, y, z, cos, obj, tag]`` (6 columns) and the model input as
    ``[x, y, z, intensity, cos, obj, tag]`` (7 columns). Six-column input is
    padded with a zero intensity column at index 3; any other column count is
    passed through unchanged.

    Args:
        lidar_file: Path of the ``.npy`` frame.

    Returns:
        The 2-D array to feed to the models, or ``None`` when the file cannot
        be read or is not 2-D (an error line is printed in that case).
    """
    try:
        data = np.load(lidar_file)
    except (OSError, ValueError, EOFError) as e:
        # One unreadable frame must not abort the whole batch.
        print(f"\n [ERROR] Failed to load {lidar_file.name}: {e}")
        return None

    if getattr(data, "ndim", None) != 2:
        print(f"\n [ERROR] Failed to load {lidar_file.name}: expected a 2-D array")
        return None

    if data.shape[1] == 6:
        # Pad with a dummy intensity column at index 3
        # This aligns 'tag' to index 6 as expected by our lidar.py mapping
        padded_data = np.zeros((data.shape[0], 7), dtype=np.float32)
        padded_data[:, 0:3] = data[:, 0:3]  # x, y, z
        padded_data[:, 4:7] = data[:, 3:6]  # cos, obj, tag
        data = padded_data
    return data


def _write_radar_outputs(session_path, r_type, model, lidar_file, data):
    """Run one frame through one model and persist everything it produced.

    Writes the model output to ``<session>/<type>/<frame>.npy``. When the
    model reports metrology, the three heatmaps are written under
    ``metrology/``; when it reports signal metrics, one JSON line is appended
    to ``metrology/metrics.jsonl``. Exceptions propagate to the caller.
    """
    # Original note: returns rich PCD [M, 5] (x, y, z, velocity, magnitude)
    rich_pcd = model.process(data)
    np.save(session_path / r_type / lidar_file.name, rich_pcd)

    met_base = session_path / r_type / "metrology"
    frame_name = lidar_file.stem

    # Metrology heatmaps
    met = model.get_last_metrology()
    if met:
        np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
        np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap'])
        np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])

    # Signal metrics (telemetry)
    metrics = model.get_signal_metrics()
    if metrics:
        with open(met_base / "metrics.jsonl", "a", encoding="utf-8") as f:
            f.write(json.dumps({"frame": frame_name, **metrics}) + "\n")


def process_session(session_path):
    """Generate Shenron radar data for every LiDAR frame of one session.

    The session is skipped (with a printed note) when it has no ``lidar``
    folder, when that folder holds no ``.npy`` files, or when none of the
    radar models could be initialised. Otherwise each frame is loaded once
    and passed to every initialised model; a failure on one frame or one
    model is reported and processing continues with the next.

    Args:
        session_path: Session directory as a ``pathlib.Path``.

    Returns:
        None. All results are written below ``session_path``.
    """
    print(f"\n>>> Processing session: {session_path.name}")
    lidar_dir = session_path / "lidar"
    if not lidar_dir.exists():
        print(" [SKIP] No 'lidar' folder found.")
        return

    # Find all .npy files in lidar/
    lidar_files = sorted(lidar_dir.glob("*.npy"))
    if not lidar_files:
        print(" [SKIP] No .npy files in 'lidar' folder.")
        return

    models = _init_radar_models(session_path, ['awrl1432', 'radarbook'])
    if not models:
        # Nothing can consume the frames, so do not bother loading them.
        print(" [SKIP] No radar model could be initialized.")
        return

    print(f" Generating Shenron Radar data for {len(lidar_files)} frames...")
    for lidar_file in tqdm.tqdm(lidar_files, desc=" Simulating Radar", unit="frame"):
        # Load the frame once and share it between all radar models.
        data = _load_lidar_frame(lidar_file)
        if data is None:
            continue

        for r_type, model in models.items():
            try:
                _write_radar_outputs(session_path, r_type, model, lidar_file, data)
            except Exception as e:
                # Best effort per frame and per model; keep the batch going.
                print(f"\n [ERROR] Failed to process {lidar_file.name} for {r_type}: {e}")
def main():
    """Run radar generation for every valid session under ``<project_root>/data``.

    A sub-directory counts as a session only if it contains ``frames.jsonl``;
    other directories are reported and skipped. Returns early with a message
    when the data folder is missing or holds no directories.
    """
    data_root = project_root / "data"
    if not data_root.exists():
        print(f"Error: {data_root} not found.")
        return

    # Collect session folders in sorted order
    sessions = sorted(entry for entry in data_root.iterdir() if entry.is_dir())
    if len(sessions) == 0:
        print("No simulation sessions found in data/.")
        return

    print(f"Found {len(sessions)} sessions.")
    for session in sessions:
        # frames.jsonl marks a folder as a real recording session
        if not (session / "frames.jsonl").exists():
            print(f"Skipping {session.name} (no frames.jsonl found).")
            continue
        process_session(session)

    banner = "=" * 50
    print("\n" + banner)
    print("SHENRON BATCH PROCESSING COMPLETE!")
    print(banner)
# Start the batch run only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()