import sys import os import numpy as np # Add the necessary directories to sys.path to ensure internal imports work current_dir = os.path.dirname(os.path.abspath(__file__)) package_root = os.path.join(current_dir, 'e2e_agent_sem_lidar2shenron_package') utils_root = os.path.join(current_dir, 'sim_radar_utils') if current_dir not in sys.path: sys.path.append(current_dir) if package_root not in sys.path: sys.path.append(package_root) if utils_root not in sys.path: sys.path.append(utils_root) # Now import the modules from e2e_agent_sem_lidar2shenron_package.lidar import run_lidar from e2e_agent_sem_lidar2shenron_package.ConfigureRadar import radar from sim_radar_utils.radar_processor import RadarProcessor from sim_radar_utils.utils_radar import reformat_adc_shenron class ShenronRadarModel: def __init__(self, radar_type='radarbook'): """ Initialize the Shenron Radar Model. Args: radar_type (str): Type of radar to simulate (default: 'radarbook'). """ print(f"Initializing ShenronRadarModel with type: {radar_type}") self.radar_type = radar_type # Initialize the hardware radar object self.radar_obj = radar(radar_type) self.radar_obj.center = np.array([0.0, 0.0]) # center of radar self.radar_obj.elv = np.array([0.0]) # Synchronize global config used by Signal Processor with the Simulated Hardware self._sync_configs() # Initialize the signal processor (FFT, CFAR, etc.) self.processor = RadarProcessor() # Standard simulation config used by the internal physics engine self.sim_config = { 'RADAR_TYPE': radar_type, 'INVERT_ANGLE': 0, 'RAY_TRACING': False, 'RADAR_MOVING': False } def _sync_configs(self): """Important: Sync global variables in sim_radar_utils to match current radar.obj""" import sim_radar_utils.utils_radar as ur # Update Radar Cfg ur.radarCfg['N'] = self.radar_obj.N_sample ur.radarCfg['Np'] = self.radar_obj.chirps ur.radarCfg['NrChn'] = self.radar_obj.nRx ur.radarCfg['fStrt'] = self.radar_obj.f ur.radarCfg['fStop'] = self.radar_obj.f + self.radar_obj.B ur.radarCfg['Tp'] = self.radar_obj.chirp_rep # Update FFT Cfg ur.fftCfg['NFFT'] = self.radar_obj.N_sample ur.fftCfg['NFFTVel'] = self.radar_obj.chirps print(f"Synced global config: N={ur.radarCfg['N']}, Np={ur.radarCfg['Np']}, Ant={ur.radarCfg['NrChn']}") def process(self, semantic_lidar_data): """ Process semantic LiDAR data to generate a rich radar point cloud. Args: semantic_lidar_data (np.ndarray): Array of shape [N, 7] format: [x, y, z, intensity, cos_inc_angle, object_idx, semantic_tag] Returns: np.ndarray: Rich radar point cloud [M, 5] format: [x, y, z, velocity, magnitude] """ if semantic_lidar_data is None or len(semantic_lidar_data) == 0: return np.empty((0, 5)) try: # 1. Physics-based Signal Generation (FMCW Chirps) # This generates the raw ADC samples [Np, N, Ant] adc_data = run_lidar(self.sim_config, semantic_lidar_data) # 2. Reformat to match Signal Processor expectations # Internal logic often needs specific axis ordering adc_data = reformat_adc_shenron(adc_data) # 3. Fast Fourier Transform (FFT) Pipeline # Range FFT converts time data to range profiles range_profile = self.processor.cal_range_fft(adc_data) # Doppler FFT converts range profiles over time to velocity info doppler_profile = self.processor.cal_doppler_fft(range_profile) # 4. Target Detection and Rich Parameter Extraction # CFAR detection + Angle of Arrival (AoA) estimation # returns: rangeAoA, pointcloud ([x, y, z, vel, mag]) _, rich_pcd = self.processor.convert_to_pcd(doppler_profile) return rich_pcd except Exception as e: print(f"Error during Shenron processing: {e}") import traceback traceback.print_exc() return np.empty((0, 5)) if __name__ == "__main__": # Internal test/demo model = ShenronRadarModel() print("Model initialized successfully.")