import os import sys import numpy as np import tqdm import json from pathlib import Path # Add project root and ISOLATE paths project_root = Path(__file__).parent.parent sys.path.append(str(project_root)) sys.path.append(str(project_root / 'scripts' / 'ISOLATE')) # Import the model wrapper try: from scripts.ISOLATE.model_wrapper import ShenronRadarModel except ImportError as e: print(f"Error: Failed to import ShenronRadarModel. Ensure scripts/ISOLATE/model_wrapper.py exists. ({e})") sys.exit(1) def process_session(session_path): print(f"\n>>> Processing session: {session_path.name}") lidar_dir = session_path / "lidar" if not lidar_dir.exists(): print(f" [SKIP] No 'lidar' folder found.") return # Find all .npy files in lidar/ lidar_files = sorted(list(lidar_dir.glob("*.npy"))) if not lidar_files: print(f" [SKIP] No .npy files in 'lidar' folder.") return radar_types = ['awrl1432', 'radarbook'] models = {} for r_type in radar_types: try: print(f" Initializing ShenronRadarModel ({r_type})...") models[r_type] = ShenronRadarModel(radar_type=r_type) (session_path / r_type).mkdir(exist_ok=True) # Create Metrology folders met_base = session_path / r_type / "metrology" for sub in ["rd", "ra", "cfar"]: (met_base / sub).mkdir(parents=True, exist_ok=True) # Save physical axes once per session np.save(met_base / "range_axis.npy", models[r_type].processor.rangeAxis) np.save(met_base / "angle_axis.npy", models[r_type].processor.angleAxis) except Exception as e: print(f" [WARNING] Failed to init {r_type}: {e}") continue print(f" Generating Shenron Radar data for {len(lidar_files)} frames...") for lidar_file in tqdm.tqdm(lidar_files, desc=" Simulating Radar", unit="frame"): # 1. Load Semantic LiDAR data once per frame # Expected raw: [x, y, z, cos, obj, tag] (6 cols) # Expected Shenron input: [x, y, z, intensity, cos, obj, tag] (7 cols) data = np.load(lidar_file) if data.shape[1] == 6: # Pad with a dummy intensity column at index 3 # This aligns 'tag' to index 6 as expected by our lidar.py mapping padded_data = np.zeros((data.shape[0], 7), dtype=np.float32) padded_data[:, 0:3] = data[:, 0:3] # x, y, z padded_data[:, 4:7] = data[:, 3:6] # cos, obj, tag data = padded_data for r_type, model in models.items(): try: # 2. Process through the physics-based model # returns rich PCD: [M, 5] (x, y, z, velocity, magnitude) rich_pcd = model.process(data) # 3. Save to disk output_file = session_path / r_type / lidar_file.name np.save(output_file, rich_pcd) # 4. Save Metrology Heatmaps met_base = session_path / r_type / "metrology" met = model.get_last_metrology() if met: frame_name = lidar_file.stem np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap']) np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap']) np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix']) # 5. Save Signal Metrics (Telemetry) metrics = model.get_signal_metrics() if metrics: metrics_file = met_base / "metrics.jsonl" with open(metrics_file, "a") as f: f.write(json.dumps({"frame": lidar_file.stem, **metrics}) + "\n") except Exception as e: print(f"\n [ERROR] Failed to process {lidar_file.name} for {r_type}: {e}") def main(): data_root = project_root / "data" if not data_root.exists(): print(f"Error: {data_root} not found.") return # Get all session folders sessions = sorted([d for d in data_root.iterdir() if d.is_dir()]) if not sessions: print("No simulation sessions found in data/.") return print(f"Found {len(sessions)} sessions.") for session in sessions: # Check if the session has frames.jsonl to confirm it's a valid data folder if (session / "frames.jsonl").exists(): process_session(session) else: print(f"Skipping {session.name} (no frames.jsonl found).") print("\n" + "="*50) print("SHENRON BATCH PROCESSING COMPLETE!") print("="*50) if __name__ == "__main__": main()