import sys import os import numpy as np # Add the necessary directories to sys.path to ensure internal imports work current_dir = os.path.dirname(os.path.abspath(__file__)) package_root = os.path.join(current_dir, 'e2e_agent_sem_lidar2shenron_package') utils_root = os.path.join(current_dir, 'sim_radar_utils') if current_dir not in sys.path: sys.path.append(current_dir) if package_root not in sys.path: sys.path.append(package_root) if utils_root not in sys.path: sys.path.append(utils_root) # Now import the modules from e2e_agent_sem_lidar2shenron_package.lidar import run_lidar from e2e_agent_sem_lidar2shenron_package.ConfigureRadar import radar from sim_radar_utils.radar_processor import RadarProcessor from sim_radar_utils.utils_radar import reformat_adc_shenron class ShenronRadarModel: def __init__(self, radar_type='radarbook'): """ Initialize the Shenron Radar Model. Args: radar_type (str): Type of radar to simulate (default: 'radarbook'). """ print(f"Initializing ShenronRadarModel with type: {radar_type}") self.radar_type = radar_type # Initialize the hardware radar object self.radar_obj = radar(radar_type) self.radar_obj.center = np.array([0.0, 0.0]) # center of radar self.radar_obj.elv = np.array([0.0]) # Synchronize global config used by Signal Processor with the Simulated Hardware self._sync_configs() # Initialize the signal processor (FFT, CFAR, etc.) self.processor = RadarProcessor() # Standard simulation config used by the internal physics engine self.sim_config = { 'RADAR_TYPE': radar_type, 'INVERT_ANGLE': 0, 'RAY_TRACING': False, 'RADAR_MOVING': False } # Internal buffer for raw metrology (Heatmaps, SNR, etc.) self.last_metrology = {} def _sync_configs(self): """Important: Sync global variables in sim_radar_utils to match current radar.obj""" import sim_radar_utils.utils_radar as ur # Update Radar Cfg ur.radarCfg['N'] = self.radar_obj.N_sample ur.radarCfg['Np'] = self.radar_obj.chirps ur.radarCfg['NrChn'] = self.radar_obj.nRx ur.radarCfg['fStrt'] = self.radar_obj.f ur.radarCfg['fStop'] = self.radar_obj.f + self.radar_obj.B ur.radarCfg['Tp'] = self.radar_obj.chirp_rep # Update FFT Cfg ur.fftCfg['NFFT'] = self.radar_obj.N_sample ur.fftCfg['NFFTVel'] = self.radar_obj.chirps print(f"Synced global config: N={ur.radarCfg['N']}, Np={ur.radarCfg['Np']}, Ant={ur.radarCfg['NrChn']}") # CRITICAL: Re-initialize the internal axes of the processor to match new hardware if hasattr(self, 'processor'): self.processor.__init__() def process(self, semantic_lidar_data): """ Process semantic LiDAR data to generate a rich radar point cloud. Args: semantic_lidar_data (np.ndarray): Array of shape [N, 7] format: [x, y, z, intensity, cos_inc_angle, object_idx, semantic_tag] Returns: np.ndarray: Rich radar point cloud [M, 5] format: [x, y, z, velocity, magnitude] """ if semantic_lidar_data is None or len(semantic_lidar_data) == 0: return np.empty((0, 5)) try: # Re-sync global configs for this specific model, in case another model overwrote them self._sync_configs() # 1. Physics-based Signal Generation (FMCW Chirps) # This generates the raw ADC samples [Np, N, Ant] adc_data = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj) # 2. Reformat to match Signal Processor expectations # Internal logic often needs specific axis ordering adc_data = reformat_adc_shenron(adc_data) # 3. Fast Fourier Transform (FFT) Pipeline # Range FFT converts time data to range profiles range_profile = self.processor.cal_range_fft(adc_data) # Doppler FFT converts range profiles over time to velocity info doppler_profile = self.processor.cal_doppler_fft(range_profile) # 4. Target Detection and Rich Parameter Extraction # CFAR detection + Angle of Arrival (AoA) estimation # returns: rangeAoA, pointcloud ([x, y, z, vel, mag]), metrology dict _, rich_pcd, metrology = self.processor.convert_to_pcd(doppler_profile) # 5. Capture Advanced Metrology # Calculate SNR and basic noise stats for the Frame Metrics self.last_metrology = metrology return rich_pcd except Exception as e: print(f"Error during Shenron processing: {e}") import traceback traceback.print_exc() return np.empty((0, 5)) def get_last_metrology(self): """ Return the raw internal heatmaps and thresholds for the last processed frame. Returns: dict: { 'rd_heatmap': np.ndarray, 'ra_heatmap': np.ndarray, 'threshold_matrix': np.ndarray } """ return self.last_metrology def get_signal_metrics(self): """ Calculates frame-level signal-to-noise ratio and noise floor metadata. """ if not self.last_metrology: return {} rd = self.last_metrology['rd_heatmap'] noise = self.last_metrology['threshold_matrix'] peak_mag = np.max(rd) avg_noise = np.mean(noise) snr = 10 * np.log10(peak_mag / avg_noise) if avg_noise > 0 else 0 return { "peak_magnitude": float(peak_mag), "avg_noise_floor": float(avg_noise), "peak_snr_db": float(snr), "active_bins": int(np.sum(rd > avg_noise)) } if __name__ == "__main__": # Internal test/demo model = ShenronRadarModel() print("Model initialized successfully.")