"""Shenron radar model: semantic LiDAR frames in, rich radar point clouds out."""

import os
import sys
import traceback

import numpy as np

# Add the necessary directories to sys.path so the project-local imports below
# resolve regardless of the caller's working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
package_root = os.path.join(current_dir, 'e2e_agent_sem_lidar2shenron_package')
utils_root = os.path.join(current_dir, 'sim_radar_utils')

for _path in (current_dir, package_root, utils_root):
    if _path not in sys.path:
        sys.path.append(_path)

# These imports must come after the sys.path setup above.
from e2e_agent_sem_lidar2shenron_package.lidar import run_lidar
from e2e_agent_sem_lidar2shenron_package.ConfigureRadar import radar
from sim_radar_utils.radar_processor import RadarProcessor
from sim_radar_utils.utils_radar import reformat_adc_shenron


class ShenronRadarModel:
    """Simulate a radar from semantic LiDAR data and expose per-frame metrics.

    The model owns a simulated radar object (``radar_obj``) and a signal
    processor (``processor``).  After each successful ``process()`` call the
    raw ADC cube, the point cloud and the processor's metrology dict are kept
    on the instance as ``last_adc``, ``last_pcd`` and ``last_metrology``.
    """

    def __init__(self, radar_type: str = 'radarbook'):
        """
        Initialize the Shenron Radar Model.

        Args:
            radar_type (str): Type of radar to simulate (default: 'radarbook').
        """
        # Init message suppressed — telemetry is emitted by generate_shenron.py
        self.radar_type = radar_type

        # Initialize the hardware radar object
        self.radar_obj = radar(radar_type)
        self.radar_obj.center = np.array([0.0, 0.0])  # center of radar
        self.radar_obj.elv = np.array([0.0])

        # Synchronize global config used by Signal Processor with the
        # Simulated Hardware.  This must run before RadarProcessor() is
        # constructed so the processor is built from the synced values.
        self._sync_configs()

        # Initialize the signal processor (FFT, CFAR, etc.)
        self.processor = RadarProcessor()

        # Standard simulation config used by the internal physics engine
        self.sim_config = {
            'RADAR_TYPE': radar_type,
            'INVERT_ANGLE': 0,
            'RAY_TRACING': False,
            'RADAR_MOVING': False
        }

        # Internal buffer for raw metrology (Heatmaps, SNR, etc.)
        self.last_metrology = {}

    @staticmethod
    def _ratio_db(numerator, denominator, fallback=0.0):
        """Return ``10 * log10(numerator / denominator)``, or *fallback*.

        *fallback* is returned whenever either operand is not strictly
        positive, so callers never receive ``-inf``/``nan`` from the logarithm.
        """
        if numerator > 0 and denominator > 0:
            return 10 * np.log10(numerator / denominator)
        return fallback

    def _sync_configs(self):
        """Important: Sync global variables in sim_radar_utils to match current radar_obj.

        Writes this model's radar parameters into the module-level
        ``radarCfg`` / ``fftCfg`` dicts of ``sim_radar_utils.utils_radar`` and,
        if a processor already exists, re-runs its ``__init__``.
        """
        import sim_radar_utils.utils_radar as ur

        hw = self.radar_obj

        # Update Radar Cfg
        ur.radarCfg['N'] = hw.N_sample
        ur.radarCfg['Np'] = hw.chirps
        ur.radarCfg['NrChn'] = hw.nRx
        ur.radarCfg['fStrt'] = hw.f
        ur.radarCfg['fStop'] = hw.f + hw.B
        ur.radarCfg['Tp'] = hw.chirp_rep

        # Update FFT Cfg
        ur.fftCfg['NFFT'] = hw.N_sample
        ur.fftCfg['NFFTVel'] = hw.chirps

        # Sync log suppressed for dashboard cleanliness — data exposed via get_radar_specs()

        # CRITICAL: Re-initialize the internal axes of the processor to match
        # new hardware.  During __init__ the processor does not exist yet, so
        # this step is skipped on the very first call.
        if hasattr(self, 'processor'):
            self.processor.__init__()

    def process(self, semantic_lidar_data) -> np.ndarray:
        """
        Process semantic LiDAR data to generate a rich radar point cloud.

        Args:
            semantic_lidar_data (np.ndarray): Array of shape [N, 7]
                format: [x, y, z, intensity, cos_inc_angle, object_idx, semantic_tag]

        Returns:
            np.ndarray: Rich radar point cloud [M, 5]
                format: [x, y, z, velocity, magnitude]
                An empty (0, 5) array is returned for empty/None input and
                when any pipeline stage raises.

        Note:
            On the empty-input and error paths ``last_adc``, ``last_pcd`` and
            ``last_metrology`` are not modified, so they keep describing the
            most recent successful frame (if any).
        """
        if semantic_lidar_data is None or len(semantic_lidar_data) == 0:
            return np.empty((0, 5))

        try:
            # Re-sync global configs for this specific model, in case another
            # model overwrote them
            self._sync_configs()

            # 1. Physics-based Signal Generation (FMCW Chirps)
            # This generates the raw ADC samples [Np, N, Ant]
            raw_adc = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)

            # Store raw ADC for later saving
            self.last_adc = raw_adc

            # 2. Reformat to match Signal Processor expectations
            # Internal logic often needs specific axis ordering
            adc_data = reformat_adc_shenron(raw_adc)

            # 3. Fast Fourier Transform (FFT) Pipeline
            # Range FFT converts time data to range profiles
            range_profile = self.processor.cal_range_fft(adc_data)

            # Doppler FFT converts range profiles over time to velocity info
            doppler_profile = self.processor.cal_doppler_fft(range_profile)

            # 4. Target Detection and Rich Parameter Extraction
            # CFAR detection + Angle of Arrival (AoA) estimation
            # returns: rangeAoA, pointcloud ([x, y, z, vel, mag]), metrology dict
            _, rich_pcd, metrology = self.processor.convert_to_pcd(doppler_profile)

            # 5. Cache the results; get_last_metrology() and
            # get_signal_metrics() read them back later.
            self.last_pcd = rich_pcd
            self.last_metrology = metrology

            return rich_pcd

        except Exception as e:
            # Deliberate best-effort: report the failure and hand back an
            # empty cloud instead of propagating the exception.
            print(f"Error during Shenron processing: {e}")
            traceback.print_exc()
            return np.empty((0, 5))

    def get_last_metrology(self):
        """
        Return the raw internal heatmaps and thresholds for the last processed frame.

        Returns:
            dict: {
                'rd_heatmap': np.ndarray,
                'ra_heatmap': np.ndarray,
                'threshold_matrix': np.ndarray
            }
            Empty dict if no frame has been processed successfully yet.
        """
        return self.last_metrology

    def get_signal_metrics(self) -> dict:
        """
        Calculates frame-level signal-to-noise ratio and noise floor metadata.

        Returns:
            dict: ``{}`` when no metrology is cached.  Otherwise always
            contains ``peak_magnitude``, ``avg_noise_floor``, ``peak_snr_db``
            and ``active_bins``.  The point-cloud, array and azimuth metrics
            are added on a best-effort basis: if their computation raises, a
            warning is printed and the metrics gathered so far are returned.
        """
        if not self.last_metrology:
            return {}

        rd = self.last_metrology['rd_heatmap']
        noise = self.last_metrology['threshold_matrix']
        # .get(): a missing RA heatmap is handled like an explicit None below.
        ra = self.last_metrology.get('ra_heatmap')

        peak_mag = np.max(rd)
        avg_noise = np.mean(noise)
        # Guard both operands so a non-positive peak cannot yield -inf / nan.
        snr = self._ratio_db(peak_mag, avg_noise, 0)

        above_threshold = rd > noise

        metrics = {
            "peak_magnitude": float(peak_mag),
            "avg_noise_floor": float(avg_noise),
            "peak_snr_db": float(snr),
            "active_bins": int(np.sum(above_threshold))
        }

        try:
            # 1. Point Cloud Metrics
            pcd = getattr(self, 'last_pcd', np.array([]))
            cfar_count = len(pcd)

            if cfar_count > 0:
                # Planar range from the x and y columns; column 3 is velocity.
                ranges = np.linalg.norm(pcd[:, :2], axis=1)
                vels = pcd[:, 3]
                farthest = np.max(ranges)
                closest = np.min(ranges)
                mean_doppler = np.mean(np.abs(vels))
                doppler_var = np.var(vels)
            else:
                farthest = 0.0
                closest = 0.0
                mean_doppler = 0.0
                doppler_var = 0.0

            metrics["cfar_target_count"] = int(cfar_count)
            metrics["farthest_target_m"] = float(farthest)
            metrics["closest_target_m"] = float(closest)
            metrics["mean_absolute_doppler"] = float(mean_doppler)
            metrics["doppler_variance"] = float(doppler_var)

            # 2. Physical Array Metrics
            # np.min raises on an empty selection, so also require at least
            # one bin above the threshold before taking the minimum.
            hits = rd[above_threshold]
            if cfar_count > 0 and hits.size > 0:
                min_hit = np.min(hits)
            else:
                min_hit = avg_noise
            dyn_range = self._ratio_db(peak_mag, min_hit, 0.0)

            r_axis = self.processor.rangeAxis
            v_axis = self.processor.velAxis
            r_mask = r_axis < 5.0
            v_mask = np.abs(v_axis) < 1.0
            # Only index the heatmap when the processor axes match its shape.
            if len(r_axis) == rd.shape[0] and len(v_axis) == rd.shape[1]:
                ego_power = np.sum(rd[np.ix_(r_mask, v_mask)])
            else:
                ego_power = 0.0

            # Bins above the mean noise level but not above the threshold.
            clutter_bins = (rd > avg_noise) & (rd <= noise)
            clutter_ratio = np.sum(clutter_bins) / rd.size
            avg_clutter = np.mean(rd[clutter_bins]) if np.any(clutter_bins) else avg_noise
            scr = self._ratio_db(peak_mag, avg_clutter, snr)

            metrics["dynamic_range_db"] = float(dyn_range)
            metrics["ego_vicinity_power"] = float(ego_power)
            metrics["clutter_ratio"] = float(clutter_ratio)
            metrics["signal_to_clutter_ratio_db"] = float(scr)

            # 3. Array Health (Angular Dispersion)
            if ra is not None:
                # Normalize RA heatmap [0, 1] to prevent massive scalar variance
                ra_max = np.max(ra)
                ra_norm = ra / ra_max if ra_max > 0 else ra

                # Azimuth Variance (Normalized)
                # Mean over the first axis of the variance along the second.
                az_var = np.mean(np.var(ra_norm, axis=1))
                metrics["azimuth_variance"] = float(az_var)

                # Peak Azimuth Spread (Half-Power Beamwidth proxy)
                # Find the range bin with the maximum energy
                peak_range_idx = np.argmax(np.max(ra, axis=1))
                peak_az_profile = ra[peak_range_idx, :]
                peak_val = np.max(peak_az_profile)

                if peak_val > 0:
                    # Count bins within 3dB (0.5 power) of the peak
                    half_power_bins = np.sum(peak_az_profile > (0.5 * peak_val))
                    # Convert to degrees: (Num Bins / Total Bins) * FOV
                    fov_deg = 120.0  # Approximate for visualization
                    spread_deg = (half_power_bins / len(peak_az_profile)) * fov_deg
                else:
                    spread_deg = 0.0

                metrics["peak_azimuth_spread_deg"] = float(spread_deg)
            else:
                metrics["azimuth_variance"] = 0.0
                metrics["peak_azimuth_spread_deg"] = 0.0

        except Exception as e:
            # Best-effort: keep whatever was computed before the failure.
            print(f"[WARNING] Advanced metrology calculation failed: {e}")

        return metrics

    def get_radar_specs(self) -> dict:
        """Return radar hardware specs for dashboard telemetry.

        All values are read from ``radar_obj``; frequency, bandwidth, chirp
        repetition and gain are rescaled to GHz, MHz, microseconds and dB.
        """
        r = self.radar_obj
        return {
            "type": self.radar_type,
            "freq_ghz": r.f / 1e9,
            "bw_mhz": r.B / 1e6,
            "chirps": r.chirps,
            "samples": r.N_sample,
            "antennas": r.nRx,
            "range_res_m": round(r.range_res, 3),
            "max_range_m": round(r.max_range, 1),
            "chirp_rep_us": round(r.chirp_rep * 1e6, 1),
            "gain_db": round(10 * np.log10(r.gain), 1),
        }


if __name__ == "__main__":
    # Internal test/demo
    model = ShenronRadarModel()
    print("Model initialized successfully.")