CARLA - C-Shenron based Simulator for Sensor data generation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

166 lines
6.2 KiB

import sys
import os
import numpy as np
# Register this file's directory and its two bundled package directories on
# sys.path so the project-internal imports below resolve.
current_dir = os.path.dirname(os.path.abspath(__file__))
package_root = os.path.join(current_dir, 'e2e_agent_sem_lidar2shenron_package')
utils_root = os.path.join(current_dir, 'sim_radar_utils')
# Append (never prepend) only the entries that are not already present,
# keeping the order: file directory, package root, utils root.
sys.path.extend(
    [path for path in (current_dir, package_root, utils_root) if path not in sys.path]
)
# Now import the modules
from e2e_agent_sem_lidar2shenron_package.lidar import run_lidar
from e2e_agent_sem_lidar2shenron_package.ConfigureRadar import radar
from sim_radar_utils.radar_processor import RadarProcessor
from sim_radar_utils.utils_radar import reformat_adc_shenron
class ShenronRadarModel:
    """Turn semantic LiDAR frames into radar point clouds via the Shenron pipeline.

    The model owns three collaborators: a ``radar`` hardware description, the
    ``run_lidar`` signal generator and a ``RadarProcessor`` that performs the
    FFT / detection stages. It also keeps the metrology dict returned for the
    most recent frame so callers can inspect it after ``process()``.
    """

    def __init__(self, radar_type='radarbook'):
        """
        Initialize the Shenron Radar Model.

        Args:
            radar_type (str): Type of radar to simulate (default: 'radarbook').
        """
        print(f"Initializing ShenronRadarModel with type: {radar_type}")
        self.radar_type = radar_type

        # Initialize the hardware radar object
        self.radar_obj = radar(radar_type)
        self.radar_obj.center = np.array([0.0, 0.0])  # center of radar
        self.radar_obj.elv = np.array([0.0])

        # Synchronize global config used by Signal Processor with the Simulated Hardware.
        # This must happen BEFORE the processor is created so that it is built
        # from the updated module-level configuration.
        self._sync_configs()

        # Initialize the signal processor (FFT, CFAR, etc.)
        self.processor = RadarProcessor()

        # Standard simulation config used by the internal physics engine
        self.sim_config = {
            'RADAR_TYPE': radar_type,
            'INVERT_ANGLE': 0,
            'RAY_TRACING': False,
            'RADAR_MOVING': False
        }

        # Internal buffer for raw metrology (Heatmaps, SNR, etc.)
        self.last_metrology = {}

    def _sync_configs(self):
        """Important: Sync global variables in sim_radar_utils to match current radar.obj

        Overwrites entries of the module-level ``radarCfg`` and ``fftCfg`` dicts in
        ``sim_radar_utils.utils_radar`` with values read from ``self.radar_obj``,
        then re-initializes ``self.processor`` if it already exists.
        """
        import sim_radar_utils.utils_radar as ur

        radar_obj = self.radar_obj

        # Update Radar Cfg
        ur.radarCfg['N'] = radar_obj.N_sample
        ur.radarCfg['Np'] = radar_obj.chirps
        ur.radarCfg['NrChn'] = radar_obj.nRx
        ur.radarCfg['fStrt'] = radar_obj.f
        ur.radarCfg['fStop'] = radar_obj.f + radar_obj.B
        ur.radarCfg['Tp'] = radar_obj.chirp_rep

        # Update FFT Cfg
        ur.fftCfg['NFFT'] = radar_obj.N_sample
        ur.fftCfg['NFFTVel'] = radar_obj.chirps

        print(f"Synced global config: N={ur.radarCfg['N']}, Np={ur.radarCfg['Np']}, Ant={ur.radarCfg['NrChn']}")

        # CRITICAL: Re-initialize the internal axes of the processor to match new hardware.
        # The hasattr guard covers the first call from __init__, which runs before
        # the processor exists. The initializer is re-run in place so the same
        # processor object is kept.
        if hasattr(self, 'processor'):
            self.processor.__init__()

    def process(self, semantic_lidar_data):
        """
        Process semantic LiDAR data to generate a rich radar point cloud.

        Args:
            semantic_lidar_data (np.ndarray): Array of shape [N, 7]
                format: [x, y, z, intensity, cos_inc_angle, object_idx, semantic_tag]

        Returns:
            np.ndarray: Rich radar point cloud [M, 5]
                format: [x, y, z, velocity, magnitude]
                An empty ``(0, 5)`` array is returned for empty/None input or
                when any pipeline stage raises.
        """
        # Drop the previous frame's metrology up front so that an empty or
        # failed frame can never be reported with stale heatmaps/metrics.
        self.last_metrology = {}

        if semantic_lidar_data is None or len(semantic_lidar_data) == 0:
            return np.empty((0, 5))

        try:
            # Re-sync global configs for this specific model, in case another model overwrote them
            self._sync_configs()

            # 1. Physics-based Signal Generation (FMCW Chirps)
            # This generates the raw ADC samples [Np, N, Ant]
            adc_data = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)

            # 2. Reformat to match Signal Processor expectations
            # Internal logic often needs specific axis ordering
            adc_data = reformat_adc_shenron(adc_data)

            # 3. Fast Fourier Transform (FFT) Pipeline
            # Range FFT converts time data to range profiles
            range_profile = self.processor.cal_range_fft(adc_data)
            # Doppler FFT converts range profiles over time to velocity info
            doppler_profile = self.processor.cal_doppler_fft(range_profile)

            # 4. Target Detection and Rich Parameter Extraction
            # CFAR detection + Angle of Arrival (AoA) estimation
            # returns: rangeAoA, pointcloud ([x, y, z, vel, mag]), metrology dict
            _, rich_pcd, metrology = self.processor.convert_to_pcd(doppler_profile)

            # 5. Capture Advanced Metrology
            # Stored only after every stage succeeded; read back through
            # get_last_metrology() / get_signal_metrics().
            self.last_metrology = metrology

            return rich_pcd

        except Exception as e:
            # Deliberate best-effort boundary: report the failure and hand the
            # caller an empty point cloud instead of propagating.
            print(f"Error during Shenron processing: {e}")
            import traceback
            traceback.print_exc()
            return np.empty((0, 5))

    def get_last_metrology(self):
        """
        Return the raw internal heatmaps and thresholds for the last processed frame.

        Returns:
            dict: {
                'rd_heatmap': np.ndarray,
                'ra_heatmap': np.ndarray,
                'threshold_matrix': np.ndarray
            }
            The dict is empty when no frame has been processed yet or when the
            most recent ``process()`` call produced no result.
        """
        return self.last_metrology

    def get_signal_metrics(self):
        """
        Calculates frame-level signal-to-noise ratio and noise floor metadata.

        Returns:
            dict: ``peak_magnitude``, ``avg_noise_floor``, ``peak_snr_db`` and
            ``active_bins`` for the last frame, or ``{}`` when no usable
            metrology is available (nothing stored, a required heatmap missing,
            or a heatmap with zero elements).
        """
        metrology = self.last_metrology
        if not metrology:
            return {}

        rd = metrology.get('rd_heatmap')
        noise = metrology.get('threshold_matrix')
        if rd is None or noise is None:
            return {}

        rd = np.asarray(rd)
        noise = np.asarray(noise)
        # np.max raises on an empty array and np.mean yields NaN; treat both
        # as "no metrics" rather than crashing the caller.
        if rd.size == 0 or noise.size == 0:
            return {}

        peak_mag = np.max(rd)
        avg_noise = np.mean(noise)

        # log10 is only defined for a strictly positive ratio; fall back to 0
        # (the same fallback already used for a non-positive noise floor).
        if peak_mag > 0 and avg_noise > 0:
            snr = 10 * np.log10(peak_mag / avg_noise)
        else:
            snr = 0

        return {
            "peak_magnitude": float(peak_mag),
            "avg_noise_floor": float(avg_noise),
            "peak_snr_db": float(snr),
            "active_bins": int(np.sum(rd > avg_noise))
        }
if __name__ == "__main__":
    # Smoke test when run as a script: build a model with the default radar
    # type and confirm that construction completes.
    demo_model = ShenronRadarModel()
    print("Model initialized successfully.")