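# NOTE: The lines below are repository web-UI residue captured together with
# this file; they are not part of the program.
#   "You cannot select more than 25 topics."
#   "Topics must start with a letter or number, can include dashes ('-') and
#    can be up to 35 characters long."
#   File banner: 125 lines, 4.9 KiB.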
import sys
|
|
import os
|
|
import numpy as np
|
|
|
|
# Add the necessary directories to sys.path to ensure internal imports work.
# The two bundled packages are resolved relative to this file's own location,
# so the imports below do not depend on the caller's working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
package_root = os.path.join(current_dir, 'e2e_agent_sem_lidar2shenron_package')
utils_root = os.path.join(current_dir, 'sim_radar_utils')

# Append (never prepend) each directory at most once, so re-importing this
# module does not grow sys.path and existing entries keep their priority.
for _path in (current_dir, package_root, utils_root):
    if _path not in sys.path:
        sys.path.append(_path)
|
|
|
|
# Now import the modules
|
|
from e2e_agent_sem_lidar2shenron_package.lidar import run_lidar
|
|
from e2e_agent_sem_lidar2shenron_package.ConfigureRadar import radar
|
|
from sim_radar_utils.radar_processor import RadarProcessor
|
|
from sim_radar_utils.utils_radar import reformat_adc_shenron
|
|
|
|
class ShenronRadarModel:
    """Turn semantic LiDAR frames into a radar point cloud.

    The model ties together three collaborators imported by this module:
    the simulated radar description (``radar``), the ADC signal generator
    (``run_lidar``) and the signal processor (``RadarProcessor``).  It also
    keeps the module-level configuration dictionaries in
    ``sim_radar_utils.utils_radar`` aligned with the simulated radar.
    """

    @staticmethod
    def _empty_pcd():
        """Return the empty ``[0, 5]`` point cloud used for "no detections"."""
        return np.empty((0, 5))

    def __init__(self, radar_type='radarbook'):
        """
        Initialize the Shenron Radar Model.

        Args:
            radar_type (str): Type of radar to simulate (default: 'radarbook').
        """
        print(f"Initializing ShenronRadarModel with type: {radar_type}")
        self.radar_type = radar_type

        # Initialize the hardware radar object
        self.radar_obj = radar(radar_type)
        self.radar_obj.center = np.array([0.0, 0.0])  # center of radar
        # NOTE(review): `elv` is presumably the radar elevation -- confirm
        # against ConfigureRadar.
        self.radar_obj.elv = np.array([0.0])

        # Synchronize global config used by Signal Processor with the
        # Simulated Hardware.  This runs BEFORE the processor is created so
        # that the processor is constructed after the globals are updated.
        self._sync_configs()

        # Initialize the signal processor (FFT, CFAR, etc.)
        self.processor = RadarProcessor()

        # Standard simulation config used by the internal physics engine
        self.sim_config = {
            'RADAR_TYPE': radar_type,
            'INVERT_ANGLE': 0,
            'RAY_TRACING': False,
            'RADAR_MOVING': False,
        }

    def _sync_configs(self):
        """Copy this radar's parameters into the shared signal-processing config.

        Writes ``N``, ``Np``, ``NrChn``, ``fStrt``, ``fStop`` and ``Tp`` into
        ``utils_radar.radarCfg`` and ``NFFT`` / ``NFFTVel`` into
        ``utils_radar.fftCfg``, prints a one-line summary, and re-runs the
        processor's initializer if a processor already exists.

        These dictionaries are module-level state, so another model instance
        can overwrite them; that is why ``process`` calls this method again
        on every frame.
        """
        # Imported here (not at module top) exactly as in the original code.
        import sim_radar_utils.utils_radar as ur

        hw = self.radar_obj

        # Update Radar Cfg
        ur.radarCfg['N'] = hw.N_sample
        ur.radarCfg['Np'] = hw.chirps
        ur.radarCfg['NrChn'] = hw.nRx
        ur.radarCfg['fStrt'] = hw.f
        ur.radarCfg['fStop'] = hw.f + hw.B  # stop frequency = start + B
        ur.radarCfg['Tp'] = hw.chirp_rep

        # Update FFT Cfg: FFT sizes follow the sample and chirp counts
        ur.fftCfg['NFFT'] = hw.N_sample
        ur.fftCfg['NFFTVel'] = hw.chirps

        print(f"Synced global config: N={ur.radarCfg['N']}, Np={ur.radarCfg['Np']}, Ant={ur.radarCfg['NrChn']}")

        # CRITICAL: Re-initialize the internal axes of the processor to match
        # new hardware.  During __init__ the processor does not exist yet, so
        # this step is skipped on the very first sync.
        if hasattr(self, 'processor'):
            self.processor.__init__()

    def process(self, semantic_lidar_data):
        """
        Process semantic LiDAR data to generate a rich radar point cloud.

        Any exception raised inside the pipeline is reported on stdout
        (message plus traceback) and converted into an empty result, so a
        single bad frame does not abort the caller.

        Args:
            semantic_lidar_data (np.ndarray): Array of shape [N, 7]
                format: [x, y, z, intensity, cos_inc_angle, object_idx, semantic_tag]

        Returns:
            np.ndarray: Rich radar point cloud [M, 5]
                format: [x, y, z, velocity, magnitude]
                An empty ``[0, 5]`` array is returned for ``None`` / empty
                input and when processing fails.
        """
        # Guard clause: nothing to simulate.
        if semantic_lidar_data is None or len(semantic_lidar_data) == 0:
            return self._empty_pcd()

        try:
            # Re-sync global configs for this specific model, in case another
            # model overwrote them
            self._sync_configs()

            # 1. Physics-based Signal Generation (FMCW Chirps)
            # This generates the raw ADC samples [Np, N, Ant]
            adc_data = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)

            # 2. Reformat to match Signal Processor expectations
            # Internal logic often needs specific axis ordering
            adc_data = reformat_adc_shenron(adc_data)

            # 3. Fast Fourier Transform (FFT) Pipeline
            # Range FFT converts time data to range profiles
            range_profile = self.processor.cal_range_fft(adc_data)

            # Doppler FFT converts range profiles over time to velocity info
            doppler_profile = self.processor.cal_doppler_fft(range_profile)

            # 4. Target Detection and Rich Parameter Extraction
            # CFAR detection + Angle of Arrival (AoA) estimation
            # returns: rangeAoA, pointcloud ([x, y, z, vel, mag])
            _, rich_pcd = self.processor.convert_to_pcd(doppler_profile)

            return rich_pcd

        except Exception as e:
            # Deliberate best-effort boundary: report and fall back to an
            # empty point cloud instead of propagating the failure.
            print(f"Error during Shenron processing: {e}")
            import traceback
            traceback.print_exc()
            return self._empty_pcd()
|
|
|
|
if __name__ == "__main__":
    # Internal test/demo: constructing the model runs the radar setup, the
    # global-config sync and the signal-processor construction, so reaching
    # the final print means initialization completed without raising.
    model = ShenronRadarModel()
    print("Model initialized successfully.")
|