import os
import sys
import time
import numpy as np
import json
from pathlib import Path

# Ensure we can import the model wrapper
sys.path.append(str(Path(__file__).parent))

try:
    from model_wrapper import ShenronRadarModel
except ImportError:
    # Fallback if called from a different context
    from scripts.ISOLATE.model_wrapper import ShenronRadarModel


class ShenronOrchestrator:
    """
    Unified orchestration engine for physics-based radar synthesis.

    Ensures parity between production generation (dashboard) and the
    iterative testbench (test_shenron).

    Typical use: construct, call ``init_models(output_root)`` once, then call
    ``process_frame(lidar_file, output_root)`` per LiDAR frame, polling
    ``check_stop_flag()`` between frames.
    """

    # Radar types used when the caller does not supply any.
    DEFAULT_RADAR_TYPES = ('awrl1432', 'radarbook')

    def __init__(self, radar_types=None):
        """
        Args:
            radar_types: Iterable of radar type names to instantiate.
                ``None`` selects ``DEFAULT_RADAR_TYPES``.
        """
        # A list literal as the default would be shared between instances,
        # so the default is resolved here and copied into a fresh list.
        if radar_types is None:
            radar_types = self.DEFAULT_RADAR_TYPES
        self.radar_types = list(radar_types)
        self.models = {}
        # Populated by init_models(); None means "no stop flag location yet".
        self.project_root = None
        self.flag_path = None

    def init_models(self, output_root: Path):
        """
        Initializes models and prepares directory structure.

        For every radar type this creates ``output_root/<type>/metrology/``
        with ``rd``, ``ra`` and ``cfar`` sub-folders, saves the processor's
        range/angle axes and writes ``radar_specs.json``.

        Args:
            output_root: Root directory under which per-radar folders live.

        Returns:
            dict mapping radar type -> result of ``model.get_radar_specs()``
            for every radar type whose initialization completed.
        """
        specs = {}

        # Resolve project root for stop flag check
        # (root/scripts/ISOLATE/shenron_orchestrator.py)
        self.project_root = Path(__file__).resolve().parents[2]
        self.flag_path = self.project_root / "tmp" / "stop.flag"

        for r_type in self.radar_types:
            try:
                print(f" - Initializing Shenron {r_type} engine...")
                model = ShenronRadarModel(radar_type=r_type)
                # NOTE(review): the model is registered before the folder and
                # spec setup below, so it stays active for process_frame()
                # even if a later step in this try-block fails. Confirm that
                # is intended.
                self.models[r_type] = model

                # Setup Folders
                radar_dir = output_root / r_type
                radar_dir.mkdir(exist_ok=True, parents=True)

                met_base = radar_dir / "metrology"
                for sub in ["rd", "ra", "cfar"]:
                    (met_base / sub).mkdir(parents=True, exist_ok=True)

                # Save physical axes (static per session)
                np.save(met_base / "range_axis.npy", model.processor.rangeAxis)
                np.save(met_base / "angle_axis.npy", model.processor.angleAxis)

                # Get hardware specs
                specs[r_type] = model.get_radar_specs()

                # Save specs for MCAP converter downstream
                radar = model.radar_obj
                hw_specs = {
                    'f': float(radar.f),
                    'chirp_rep': float(radar.chirp_rep),
                    'max_velocity': float((3e8 / radar.f) / (4 * radar.chirp_rep)),
                }
                with open(met_base / "radar_specs.json", "w", encoding="utf-8") as sf:
                    json.dump(hw_specs, sf)

            except Exception as e:
                # Best effort: one failing radar must not block the others.
                print(f" [WARNING] Failed to init {r_type}: {e}")
                continue

        return specs

    @staticmethod
    def _pad_lidar_columns(data):
        """Expand a 6-column point array to the 7-column standard layout.

        Standard layout: [x, y, z, intensity, cos_inc_angle, obj, tag].
        A 6-column input keeps its first three columns, gets a zero-filled
        column 3, and its remaining columns shift to positions 4..6. Arrays
        with any other column count are returned unchanged.
        """
        if data.shape[1] != 6:
            return data
        padded = np.zeros((data.shape[0], 7), dtype=np.float32)
        padded[:, 0:3] = data[:, 0:3]
        padded[:, 4:7] = data[:, 3:6]
        return padded

    @staticmethod
    def _sanitize_metrics(metrics):
        """Return a JSON-safe copy of *metrics*.

        NumPy scalars are unwrapped to builtin Python values (json cannot
        serialize e.g. ``np.float32`` / ``np.int64``), and NaN/Inf floats are
        replaced by ``0.0``.
        """
        clean = {}
        for key, value in metrics.items():
            if isinstance(value, np.generic):
                value = value.item()
            if isinstance(value, float) and not np.isfinite(value):
                value = 0.0
            clean[key] = value
        return clean

    def process_frame(self, lidar_file: Path, output_root: Path, save_adc=False):
        """
        Processes a single LiDAR frame through all active radar models.

        Args:
            lidar_file: Path to the ``.npy`` LiDAR frame.
            output_root: Root directory passed to ``init_models``; per-radar
                outputs are written beneath it.
            save_adc: When true, also store ``model.last_adc`` (if the model
                exposes a non-None one) under ``<type>/adc_raw/``.

        Returns:
            ``None`` if the frame could not be loaded or is not a 2-D array;
            otherwise a dict mapping radar type -> sanitized metrics (plus a
            ``"pts"`` point count) for each model that reported metrics.
            Models that raise are logged and skipped.
        """
        try:
            data = np.load(lidar_file)
        except Exception as e:
            print(f" [ERROR] Failed to load {lidar_file.name}: {e}")
            return None

        # Guard before touching shape[1]: a 0-D/1-D array (or a non-array
        # object such as an NpzFile) would otherwise raise outside any handler.
        if getattr(data, "ndim", None) != 2:
            shape = getattr(data, "shape", None)
            print(f" [ERROR] Unexpected array shape in {lidar_file.name}: {shape}")
            return None

        # Standard Padding: Ensure [x, y, z, intensity, cos_inc_angle, obj, tag]
        data = self._pad_lidar_columns(data)

        frame_results = {}
        frame_name = lidar_file.stem

        for r_type, model in self.models.items():
            try:
                radar_dir = output_root / r_type

                # 1. Physics Processing
                rich_pcd = model.process(data)

                # 2. Save Pointcloud
                np.save(radar_dir / lidar_file.name, rich_pcd)

                # 3. Optional ADC Saving
                if save_adc and getattr(model, "last_adc", None) is not None:
                    adc_folder = radar_dir / "adc_raw"
                    adc_folder.mkdir(parents=True, exist_ok=True)
                    np.save(adc_folder / lidar_file.name, model.last_adc)

                # 4. Save Metrology (.npy)
                met = model.get_last_metrology()
                met_base = radar_dir / "metrology"
                if met:
                    np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
                    np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap'])
                    np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])

                # 5. Extract and Save Metrics
                metrics = model.get_signal_metrics()
                if metrics:
                    # Clean for JSON (handle NaN/Inf and NumPy scalars)
                    clean_metrics = self._sanitize_metrics(metrics)

                    # Append to metrics log
                    metrics_file = met_base / "metrics.jsonl"
                    with open(metrics_file, "a", encoding="utf-8") as f:
                        f.write(json.dumps({"frame": frame_name, **clean_metrics}) + "\n")

                    # Add point count for telemetry (not part of the log line)
                    clean_metrics["pts"] = int(rich_pcd.shape[0]) if hasattr(rich_pcd, 'shape') else 0
                    frame_results[r_type] = clean_metrics

            except Exception as e:
                # Best effort: a failing model must not abort the other models.
                print(f" [ERROR] {r_type} processing failed for {lidar_file.name}: {e}")
                continue

        return frame_results

    def check_stop_flag(self) -> bool:
        """Check if a stop request has been issued by the user/dashboard.

        Returns:
            True when the stop-flag file exists; False otherwise, including
            when ``init_models`` has not yet set the flag location.
        """
        # getattr keeps this safe for instances created without __init__.
        flag_path = getattr(self, "flag_path", None)
        if flag_path is not None and flag_path.exists():
            print(f"\n[SHENRON] Stop flag detected at {flag_path}! Halting processing...")
            return True
        return False