"""Batch-run the Shenron radar models over every recorded simulation session.

For each session directory under ``<project_root>/data`` that contains a
``frames.jsonl`` file, every ``lidar/*.npy`` frame is pushed through each
configured radar model. Per-radar outputs (point clouds, metrology heatmaps
and signal metrics) are written back into the session directory, and
machine-readable progress lines prefixed with ``[SHENRON_INIT]`` /
``[SHENRON_STEP]`` are printed to stdout.
"""
import json
import os
import sys
import time
from collections import deque
from pathlib import Path

import numpy as np

# Add project root and ISOLATE paths
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))
sys.path.append(str(project_root / 'scripts' / 'ISOLATE'))

# Import the model wrapper
try:
    from scripts.ISOLATE.model_wrapper import ShenronRadarModel
except ImportError as e:
    print(f"Error: Failed to import ShenronRadarModel. Ensure scripts/ISOLATE/model_wrapper.py exists. ({e})")
    sys.exit(1)

# Radar configurations instantiated for every session.
RADAR_TYPES = ('awrl1432', 'radarbook')

# Sub-folders of "<session>/<radar>/metrology" that receive per-frame arrays.
METROLOGY_SUBDIRS = ("rd", "ra", "cfar")

# Number of most recent frame durations averaged for the ETA estimate.
ETA_WINDOW = 10


def _get_gpu_info():
    """Retrieve GPU hardware info for telemetry display.

    Returns:
        A dict with the keys ``name``, ``vram_gb`` and ``backend``. When CUDA
        is unavailable a "CPU Fallback" record is returned; when detection
        itself fails (for example torch cannot be imported) the record names
        the exception type and reports backend "Unknown".
    """
    try:
        import torch

        if not torch.cuda.is_available():
            return {"name": "CPU Fallback", "vram_gb": 0, "backend": "CPU"}

        # Each query is guarded separately so one failing call does not
        # discard the information obtained from the other.
        try:
            name = torch.cuda.get_device_name(0)
        except Exception:
            name = "NVIDIA Device"

        try:
            props = torch.cuda.get_device_properties(0)
            vram_total = props.total_memory / (1024**3)
        except Exception:
            vram_total = 0

        return {"name": name, "vram_gb": round(vram_total, 1), "backend": "CUDA"}
    except Exception as e:
        return {"name": f"Detection Error ({type(e).__name__})", "vram_gb": 0, "backend": "Unknown"}


def _init_models(session_path):
    """Instantiate every radar model and prepare its output folders.

    For each radar type this creates ``<session>/<radar>/metrology/{rd,ra,cfar}``
    and saves the model's range and angle axes once per session.

    Returns:
        A dict mapping radar type to model instance. A radar whose constructor
        raises is left out. A model whose construction succeeded but whose
        folder/axis setup failed afterwards stays registered (a warning is
        printed), so frame processing is still attempted for it.
    """
    models = {}
    for r_type in RADAR_TYPES:
        try:
            models[r_type] = ShenronRadarModel(radar_type=r_type)
            (session_path / r_type).mkdir(exist_ok=True)

            # Create Metrology folders
            met_base = session_path / r_type / "metrology"
            for sub in METROLOGY_SUBDIRS:
                (met_base / sub).mkdir(parents=True, exist_ok=True)

            # Save physical axes once per session
            np.save(met_base / "range_axis.npy", models[r_type].processor.rangeAxis)
            np.save(met_base / "angle_axis.npy", models[r_type].processor.angleAxis)
        except Exception as e:
            print(f" [WARNING] Failed to init {r_type}: {e}")
            continue
    return models


def _pad_lidar_columns(data):
    """Expand a 6-column LiDAR array to 7 columns, leaving column 3 at zero.

    Input columns 0-2 are copied to output columns 0-2 and input columns 3-5
    to output columns 4-6; the result is float32. Arrays that are not 2-D
    with exactly six columns are returned unchanged.
    """
    if data.ndim != 2 or data.shape[1] != 6:
        return data
    padded = np.zeros((data.shape[0], 7), dtype=np.float32)
    padded[:, 0:3] = data[:, 0:3]
    padded[:, 4:7] = data[:, 3:6]
    return padded


def _sanitize_metrics(metrics):
    """Return a JSON-serialisable copy of *metrics*.

    NumPy scalars are converted to native Python values (``json.dumps``
    rejects types such as ``np.float32`` and ``np.int64``), and non-finite
    floats (NaN, +/-Inf) are replaced by ``0.0``.
    """
    clean = {}
    for key, value in metrics.items():
        if isinstance(value, np.generic):
            value = value.item()
        if isinstance(value, float) and not np.isfinite(value):
            value = 0.0
        clean[key] = value
    return clean


def _save_metrology(met_base, frame_name, met):
    """Write the range-Doppler, range-angle and CFAR arrays of one frame."""
    np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
    np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap'])
    np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])


def _record_signal_metrics(model, met_base, frame_name, rich_pcd):
    """Append the model's signal metrics to ``metrics.jsonl``.

    Returns:
        The sanitised metrics with an added ``pts`` entry (number of rows of
        *rich_pcd*, or 0 if it has no ``shape``), or ``None`` when the model
        reports no metrics. ``pts`` is added after the file write, so it only
        appears in telemetry, not in ``metrics.jsonl``.

    Note:
        The file is opened in append mode, so re-processing a session adds
        further lines rather than replacing earlier ones.
    """
    metrics = model.get_signal_metrics()
    if not metrics:
        return None

    clean_metrics = _sanitize_metrics(metrics)
    with open(met_base / "metrics.jsonl", "a", encoding="utf-8") as f:
        f.write(json.dumps({"frame": frame_name, **clean_metrics}) + "\n")

    # CAPTURE UNIQUE POINT COUNT FOR TELEMETRY
    clean_metrics["pts"] = int(rich_pcd.shape[0]) if hasattr(rich_pcd, 'shape') else 0
    return clean_metrics


def _format_eta(eta_seconds):
    """Format a duration as ``"<m>m <s>s"`` above 60 seconds, else ``"<s>s"``."""
    if eta_seconds > 60:
        return f"{int(eta_seconds // 60)}m {int(eta_seconds % 60)}s"
    return f"{int(eta_seconds)}s"


def process_session(session_path):
    """Run every radar model over all LiDAR frames of one session.

    Args:
        session_path: ``pathlib.Path`` of the session directory. It must
            contain a ``lidar`` folder with ``.npy`` frames; otherwise the
            session is skipped with a message.

    Side effects:
        Writes ``<session>/<radar>/<frame>.npy`` point clouds, metrology
        arrays and ``metrics.jsonl`` below ``<session>/<radar>/metrology``,
        and prints one ``[SHENRON_INIT]`` line followed by one
        ``[SHENRON_STEP]`` line per successfully loaded frame.

    Failures while loading a frame or processing it with one radar are
    reported on stdout and do not abort the session.
    """
    print(f"\n>>> Processing session: {session_path.name}")
    lidar_dir = session_path / "lidar"
    if not lidar_dir.exists():
        print(" [SKIP] No 'lidar' folder found.")
        return

    # Find all .npy files in lidar/
    lidar_files = sorted(lidar_dir.glob("*.npy"))
    if not lidar_files:
        print(" [SKIP] No .npy files in 'lidar' folder.")
        return

    # -----------------------------------------------------------------------
    # DIAGNOSTIC: Step 1 - Model Initialization
    # -----------------------------------------------------------------------
    print(" [DIAGNOSTIC] Step 1: Initializing models...", flush=True)
    models = _init_models(session_path)

    # -----------------------------------------------------------------------
    # TELEMETRY: Emit structured init payload
    # -----------------------------------------------------------------------
    print(" [DIAGNOSTIC] Step 2: Collecting metadata...", flush=True)
    total_frames = len(lidar_files)
    telemetry_init = {
        "gpu": _get_gpu_info(),
        "radars": {r_type: model.get_radar_specs() for r_type, model in models.items()},
        "total_frames": total_frames,
        "session": session_path.name,
    }
    print(f"[SHENRON_INIT]{json.dumps(telemetry_init)}", flush=True)

    # -----------------------------------------------------------------------
    # MAIN PROCESSING LOOP
    # -----------------------------------------------------------------------
    print(f" [DIAGNOSTIC] Step 3: Starting main loop for {total_frames} frames...", flush=True)
    # Only the most recent durations feed the ETA, so older ones are dropped.
    recent_times = deque(maxlen=ETA_WINDOW)

    for frame_idx, lidar_file in enumerate(lidar_files):
        # perf_counter is monotonic, so elapsed times cannot go negative
        # when the wall clock is adjusted mid-run.
        frame_start = time.perf_counter()

        # 1. Load Semantic LiDAR data
        try:
            data = np.load(lidar_file)
        except Exception as e:
            print(f"\n [ERROR] Failed to load {lidar_file.name}: {e}")
            continue

        # A 0-D/1-D array has no column axis; skip it instead of letting an
        # IndexError abort the whole session.
        if data.ndim < 2:
            print(f"\n [ERROR] Unexpected array shape {data.shape} in {lidar_file.name}; skipping frame.")
            continue
        data = _pad_lidar_columns(data)

        frame_metrics = {}
        for r_type, model in models.items():
            try:
                # 2. Process through the physics-based model
                rich_pcd = model.process(data)

                # 3. Save to disk
                np.save(session_path / r_type / lidar_file.name, rich_pcd)

                # 4. Save Metrology Heatmaps
                met_base = session_path / r_type / "metrology"
                met = model.get_last_metrology()
                if met:
                    _save_metrology(met_base, lidar_file.stem, met)

                # 5. Save Signal Metrics
                try:
                    recorded = _record_signal_metrics(model, met_base, lidar_file.stem, rich_pcd)
                except Exception as e:
                    # Metrics failure shouldn't crash the loop, but it should
                    # not vanish silently either.
                    print(f"\n [WARNING] Failed to record metrics for {lidar_file.name} ({r_type}): {e}")
                else:
                    if recorded is not None:
                        frame_metrics[r_type] = recorded
            except Exception as e:
                print(f"\n [ERROR] Failed to process {lidar_file.name} for {r_type}: {e}")

        # Timing
        frame_elapsed = time.perf_counter() - frame_start
        recent_times.append(frame_elapsed)
        avg_time = sum(recent_times) / len(recent_times)
        remaining = total_frames - (frame_idx + 1)

        telemetry_frame = {
            "frame": frame_idx + 1,
            "total": total_frames,
            "pct": int(((frame_idx + 1) / total_frames) * 100),
            "fps": round(1.0 / frame_elapsed, 2) if frame_elapsed > 0 else 0,
            "elapsed": round(frame_elapsed, 2),
            "eta": _format_eta(remaining * avg_time),
            "metrics": {},
        }
        for r_type, m in frame_metrics.items():
            telemetry_frame["metrics"][r_type] = {
                "snr": round(m.get("peak_snr_db", 0), 1),
                "pts": m.get("pts", 0),
                "peak": round(m.get("peak_magnitude", 0), 1),
                "bins": m.get("active_bins", 0),
            }
        print(f"[SHENRON_STEP]{json.dumps(telemetry_frame)}", flush=True)


def main():
    """Process every session directory under ``<project_root>/data``.

    Only directories containing a ``frames.jsonl`` file are processed, in
    sorted order.
    """
    data_root = project_root / "data"
    if not data_root.exists():
        print(f"Error: {data_root} not found.")
        return

    sessions = sorted(d for d in data_root.iterdir() if d.is_dir())
    if not sessions:
        print("No simulation sessions found in data/.")
        return

    print(f"Found {len(sessions)} sessions.")
    for session in sessions:
        if (session / "frames.jsonl").exists():
            process_session(session)

    print("\n" + "="*50)
    print("SHENRON BATCH PROCESSING COMPLETE!")
    print("="*50)


if __name__ == "__main__":
    main()