You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
282 lines
11 KiB
282 lines
11 KiB
import sys
|
|
import os
|
|
import numpy as np
|
|
|
|
# Add the necessary directories to sys.path to ensure internal imports work
|
|
current_dir = os.path.dirname(os.path.abspath(__file__))
|
|
package_root = os.path.join(current_dir, 'e2e_agent_sem_lidar2shenron_package')
|
|
utils_root = os.path.join(current_dir, 'sim_radar_utils')
|
|
|
|
if current_dir not in sys.path:
|
|
sys.path.append(current_dir)
|
|
if package_root not in sys.path:
|
|
sys.path.append(package_root)
|
|
if utils_root not in sys.path:
|
|
sys.path.append(utils_root)
|
|
|
|
# Now import the modules
|
|
from e2e_agent_sem_lidar2shenron_package.lidar import run_lidar
|
|
from e2e_agent_sem_lidar2shenron_package.ConfigureRadar import radar
|
|
from sim_radar_utils.radar_processor import RadarProcessor
|
|
from sim_radar_utils.utils_radar import reformat_adc_shenron, config
|
|
|
|
class ShenronRadarModel:
|
|
def __init__(self, radar_type='radarbook'):
|
|
"""
|
|
Initialize the Shenron Radar Model.
|
|
|
|
Args:
|
|
radar_type (str): Type of radar to simulate (default: 'radarbook').
|
|
"""
|
|
# Init message suppressed — telemetry is emitted by generate_shenron.py
|
|
self.radar_type = radar_type
|
|
|
|
# Initialize the hardware radar object
|
|
self.radar_obj = radar(radar_type)
|
|
self.radar_obj.center = np.array([0.0, 0.0]) # center of radar
|
|
self.radar_obj.elv = np.array([0.0])
|
|
|
|
# Synchronize global config used by Signal Processor with the Simulated Hardware
|
|
self._sync_configs()
|
|
|
|
# Initialize the signal processor (FFT, CFAR, etc.)
|
|
self.processor = RadarProcessor()
|
|
|
|
# Standard simulation config used by the internal physics engine
|
|
self.sim_config = {
|
|
'RADAR_TYPE': radar_type,
|
|
'INVERT_ANGLE': 0,
|
|
'RAY_TRACING': False,
|
|
'RADAR_MOVING': False
|
|
}
|
|
|
|
# Internal buffer for raw metrology (Heatmaps, SNR, etc.)
|
|
self.last_metrology = {}
|
|
|
|
def _sync_configs(self):
|
|
"""Important: Sync global variables in sim_radar_utils to match current radar.obj"""
|
|
import sim_radar_utils.utils_radar as ur
|
|
|
|
# Update Radar Cfg
|
|
ur.radarCfg['N'] = self.radar_obj.N_sample
|
|
ur.radarCfg['Np'] = self.radar_obj.chirps
|
|
ur.radarCfg['NrChn'] = self.radar_obj.nRx
|
|
ur.radarCfg['fStrt'] = self.radar_obj.f
|
|
ur.radarCfg['fStop'] = self.radar_obj.f + self.radar_obj.B
|
|
ur.radarCfg['Tp'] = self.radar_obj.chirp_rep
|
|
|
|
# Update FFT Cfg
|
|
ur.fftCfg['NFFT'] = self.radar_obj.N_sample
|
|
ur.fftCfg['NFFTVel'] = self.radar_obj.chirps
|
|
|
|
# Sync log suppressed for dashboard cleanliness — data exposed via get_radar_specs()
|
|
|
|
# CRITICAL: Re-initialize the internal axes of the processor to match new hardware
|
|
if hasattr(self, 'processor'):
|
|
self.processor.__init__()
|
|
|
|
def process(self, semantic_lidar_data):
|
|
"""
|
|
Process semantic LiDAR data to generate a rich radar point cloud.
|
|
|
|
Args:
|
|
semantic_lidar_data (np.ndarray): Array of shape [N, 7]
|
|
format: [x, y, z, intensity, cos_inc_angle, object_idx, semantic_tag]
|
|
|
|
Returns:
|
|
np.ndarray: Rich radar point cloud [M, 5]
|
|
format: [x, y, z, velocity, magnitude]
|
|
"""
|
|
if semantic_lidar_data is None or len(semantic_lidar_data) == 0:
|
|
return np.empty((0, 5))
|
|
|
|
try:
|
|
# Re-sync global configs for this specific model, in case another model overwrote them
|
|
self._sync_configs()
|
|
|
|
# 1. Physics-based Signal Generation (FMCW Chirps)
|
|
# This generates the raw ADC samples [Np, N, Ant]
|
|
raw_adc = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)
|
|
# Store raw ADC for later saving
|
|
self.last_adc = raw_adc
|
|
adc_data = raw_adc
|
|
|
|
# 2. Reformat to match Signal Processor expectations
|
|
# Internal logic often needs specific axis ordering
|
|
adc_data = reformat_adc_shenron(adc_data)
|
|
|
|
# 3. Fast Fourier Transform (FFT) Pipeline
|
|
# Range FFT converts time data to range profiles
|
|
range_profile = self.processor.cal_range_fft(adc_data)
|
|
|
|
# Doppler FFT converts range profiles over time to velocity info
|
|
doppler_profile = self.processor.cal_doppler_fft(range_profile)
|
|
|
|
# 4. Target Detection and Rich Parameter Extraction
|
|
# CFAR detection + Angle of Arrival (AoA) estimation
|
|
# returns: rangeAoA, pointcloud ([x, y, z, vel, mag]), metrology dict
|
|
_, rich_pcd, metrology = self.processor.convert_to_pcd(doppler_profile)
|
|
|
|
# 5. Capture Advanced Metrology
|
|
# Calculate SNR and basic noise stats for the Frame Metrics
|
|
self.last_pcd = rich_pcd
|
|
self.last_metrology = metrology
|
|
|
|
return rich_pcd
|
|
|
|
except Exception as e:
|
|
print(f"Error during Shenron processing: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return np.empty((0, 5))
|
|
|
|
def get_last_metrology(self):
|
|
"""
|
|
Return the raw internal heatmaps and thresholds for the last processed frame.
|
|
|
|
Returns:
|
|
dict: {
|
|
'rd_heatmap': np.ndarray,
|
|
'ra_heatmap': np.ndarray,
|
|
'threshold_matrix': np.ndarray
|
|
}
|
|
"""
|
|
return self.last_metrology
|
|
|
|
def get_signal_metrics(self):
|
|
"""
|
|
Calculates frame-level signal-to-noise ratio and noise floor metadata.
|
|
"""
|
|
if not self.last_metrology:
|
|
return {}
|
|
|
|
rd = self.last_metrology['rd_heatmap']
|
|
noise = self.last_metrology['threshold_matrix']
|
|
ra = self.last_metrology['ra_heatmap']
|
|
|
|
peak_mag = np.max(rd)
|
|
|
|
# 'noise' here is actually the detection_gate (threshold_matrix) from CFAR.
|
|
# It already has the 10**(threshold/10) multiplier applied.
|
|
# We must divide it out to get the true physical average noise floor.
|
|
threshold_db = config['CFAR'].get('threshold', 20.0)
|
|
threshold_linear = 10 ** (threshold_db / 10.0)
|
|
|
|
avg_noise_gate = np.mean(noise)
|
|
true_avg_noise = avg_noise_gate / threshold_linear
|
|
|
|
snr = 10 * np.log10(peak_mag / true_avg_noise) if true_avg_noise > 0 else 0
|
|
|
|
metrics = {
|
|
"peak_magnitude": float(peak_mag),
|
|
"avg_noise_floor": float(true_avg_noise),
|
|
"peak_snr_db": float(snr),
|
|
"active_bins": int(np.sum(rd > noise))
|
|
}
|
|
|
|
try:
|
|
# 1. Point Cloud Metrics
|
|
pcd = getattr(self, 'last_pcd', np.array([]))
|
|
cfar_count = len(pcd)
|
|
|
|
if cfar_count > 0:
|
|
ranges = np.linalg.norm(pcd[:, :2], axis=1)
|
|
vels = pcd[:, 3]
|
|
farthest = np.max(ranges)
|
|
closest = np.min(ranges)
|
|
mean_doppler = np.mean(np.abs(vels))
|
|
doppler_var = np.var(vels)
|
|
else:
|
|
farthest = 0.0
|
|
closest = 0.0
|
|
mean_doppler = 0.0
|
|
doppler_var = 0.0
|
|
|
|
metrics["cfar_target_count"] = int(cfar_count)
|
|
metrics["farthest_target_m"] = float(farthest)
|
|
metrics["closest_target_m"] = float(closest)
|
|
metrics["mean_absolute_doppler"] = float(mean_doppler)
|
|
metrics["doppler_variance"] = float(doppler_var)
|
|
|
|
# 2. Physical Array Metrics
|
|
min_hit = np.min(rd[rd > noise]) if cfar_count > 0 else avg_noise
|
|
dyn_range = 10 * np.log10(peak_mag / min_hit) if peak_mag > 0 and min_hit > 0 else 0.0
|
|
|
|
r_axis = self.processor.rangeAxis
|
|
v_axis = self.processor.velAxis
|
|
r_mask = r_axis < 5.0
|
|
v_mask = np.abs(v_axis) < 1.0
|
|
|
|
if len(r_axis) == rd.shape[0] and len(v_axis) == rd.shape[1]:
|
|
ego_power = np.sum(rd[np.ix_(r_mask, v_mask)])
|
|
else:
|
|
ego_power = 0.0
|
|
|
|
clutter_bins = (rd > true_avg_noise) & (rd <= noise)
|
|
clutter_ratio = np.sum(clutter_bins) / rd.size
|
|
avg_clutter = np.mean(rd[clutter_bins]) if np.any(clutter_bins) else true_avg_noise
|
|
scr = 10 * np.log10(peak_mag / avg_clutter) if avg_clutter > 0 else snr
|
|
|
|
metrics["dynamic_range_db"] = float(dyn_range)
|
|
metrics["ego_vicinity_power"] = float(ego_power)
|
|
metrics["clutter_ratio"] = float(clutter_ratio)
|
|
metrics["signal_to_clutter_ratio_db"] = float(scr)
|
|
|
|
# 3. Array Health (Angular Dispersion)
|
|
if ra is not None:
|
|
# Normalize RA heatmap [0, 1] to prevent massive scalar variance
|
|
ra_max = np.max(ra)
|
|
ra_norm = ra / ra_max if ra_max > 0 else ra
|
|
|
|
# Azimuth Variance (Normalized)
|
|
# This measures the average energy dispersion across all range bins
|
|
az_var = np.mean(np.var(ra_norm, axis=1))
|
|
metrics["azimuth_variance"] = float(az_var)
|
|
|
|
# Peak Azimuth Spread (Half-Power Beamwidth proxy)
|
|
# Find the range bin with the maximum energy
|
|
peak_range_idx = np.argmax(np.max(ra, axis=1))
|
|
peak_az_profile = ra[peak_range_idx, :]
|
|
peak_val = np.max(peak_az_profile)
|
|
|
|
if peak_val > 0:
|
|
# Count bins within 3dB (0.5 power) of the peak
|
|
half_power_bins = np.sum(peak_az_profile > (0.5 * peak_val))
|
|
# Convert to degrees: (Num Bins / Total Bins) * FOV
|
|
# Total Bins = len(angle_axis), FOV ~ 120 degrees
|
|
fov_deg = 120.0 # Approximate for visualization
|
|
spread_deg = (half_power_bins / len(peak_az_profile)) * fov_deg
|
|
else:
|
|
spread_deg = 0.0
|
|
|
|
metrics["peak_azimuth_spread_deg"] = float(spread_deg)
|
|
else:
|
|
metrics["azimuth_variance"] = 0.0
|
|
metrics["peak_azimuth_spread_deg"] = 0.0
|
|
|
|
except Exception as e:
|
|
print(f"[WARNING] Advanced metrology calculation failed: {e}")
|
|
pass
|
|
|
|
return metrics
|
|
|
|
def get_radar_specs(self):
|
|
"""Return radar hardware specs for dashboard telemetry."""
|
|
r = self.radar_obj
|
|
return {
|
|
"type": self.radar_type,
|
|
"freq_ghz": r.f / 1e9,
|
|
"bw_mhz": r.B / 1e6,
|
|
"chirps": r.chirps,
|
|
"samples": r.N_sample,
|
|
"antennas": r.nRx,
|
|
"range_res_m": round(r.range_res, 3),
|
|
"max_range_m": round(r.max_range, 1),
|
|
"chirp_rep_us": round(r.chirp_rep * 1e6, 1),
|
|
"gain_db": round(10 * np.log10(r.gain), 1),
|
|
}
|
|
|
|
if __name__ == "__main__":
|
|
# Internal test/demo
|
|
model = ShenronRadarModel()
|
|
print("Model initialized successfully.")
|