Browse Source

feat(shenron): Physics audit, azimuth metrology & pipeline stability

- [Physics] Disable R^2 area expansion in Sceneset.get_loss_3 to restore
  physically correct 1/R^4 two-way radar power law (point scatterer model)

- [Metrology] Normalize RA heatmap [0,1] before variance calculation in
  model_wrapper.py to prevent 1e20 scalar overflow values

- [Metrology] Add peak_azimuth_spread_deg (Half-Power Beamwidth proxy)
  to signal metrics for per-frame angular resolution monitoring

- [DSP] Switch spatial Angle-FFT window from Hanning to Hamming in
  radar_processor.py for sharper beamforming on small antenna arrays (6-8 Rx)

- [Telemetry] Expose az_std and spread in [SHENRON_STEP] dashboard stream
  for both generate_shenron.py and test_shenron.py pipelines

- [BugFix] Add missing sys/os imports in src/main.py and video_stage.py
  that caused NameError crashes in the stage-based pipeline orchestrator
Shenron
RUSHIL AMBARISH KADU 2 weeks ago
parent
commit
0bbfe68ca3
  1. 29
      scripts/ISOLATE/e2e_agent_sem_lidar2shenron_package/shenron/Sceneset.py
  2. 39
      scripts/ISOLATE/model_wrapper.py
  3. 5
      scripts/ISOLATE/sim_radar_utils/radar_processor.py
  4. 19
      scripts/generate_shenron.py
  5. 7
      scripts/test_shenron.py
  6. 3
      src/main.py
  7. 1
      src/pipeline/stages/video_stage.py

29
scripts/ISOLATE/e2e_agent_sem_lidar2shenron_package/shenron/Sceneset.py

@ -460,13 +460,28 @@ def get_loss_3(points, rho, az_boresight, elev_angle, angles, radar, use_spec =
phi_deg = np.rad2deg(np.abs(np.pi / 2 - elev_angle)) phi_deg = np.rad2deg(np.abs(np.pi / 2 - elev_angle))
G_ant = np.exp(-2.77 * np.power(phi_deg / radar.vertical_beamwidth, 2)) G_ant = np.exp(-2.77 * np.power(phi_deg / radar.vertical_beamwidth, 2))
# --- Iteration 37: Area Integration (Resolution Independence) ---
# A single LiDAR point represents an expanding physical patch of Area = R^2 * dTheta * dPhi
point_area = np.power(rho, 2) * voxel_theta * voxel_phi
# --- Iteration 17 preserved: Pure Physical 1/R^2 Tx path loss ---
# Intercepted power is weighted by the physical area the point represents
P_incident = (1 / np.power(rho, tx_dist_loss_exponent)) * K_sq * G_ant * point_area
# --- AREA INTEGRATION DISABLED (Iteration 37 — Commented Out) ---
# The Area Integration model treated each LiDAR point as an expanding resolution cell
# (Area = R^2 * dTheta * dPhi). This R^2 area gain exactly cancelled the 1/R^2 transmit-path
# loss, making P_incident distance-independent (i.e., constant regardless of range).
# Combined with the 1/R^2 return-path loss in heatmap_gen_fast.py, the net result was
# only 1/R^2 total falloff — incorrect for point-scatterer targets like vehicles/pedestrians.
#
# Disabled to restore the physically correct 1/R^4 two-way radar range equation:
# Tx-path loss: 1/R^2 (here, in get_loss_3)
# Rx-path loss: 1/R^2 (applied in heatmap_gen_fast.py: loss * (1/rho^2))
# Total: 1/R^4
#
# NOTE: Gain recalibration may be needed. Removing the point_area multiplier will reduce
# signal magnitudes by ~30-40 dB (depending on voxel resolution). Adjust K_sq or
# radar.gain in ConfigureRadar.py if targets become invisible in the RD heatmap.
#
# point_area = np.power(rho, 2) * voxel_theta * voxel_phi # DISABLED — causes 1/R^2, not 1/R^4
# --- Point-Scatterer Model: Pure Physical 1/R^2 Tx path loss ---
# Each LiDAR point is treated as an isotropic point scatterer. Incident power falls
# off as 1/R^2 (one-way spreading loss), without any area-expansion compensation.
P_incident = (1 / np.power(rho, tx_dist_loss_exponent)) * K_sq * G_ant
# DEBUG: Monitor Signal Trends # DEBUG: Monitor Signal Trends
# P_inc print suppressed — data captured via model.get_signal_metrics() telemetry # P_inc print suppressed — data captured via model.get_signal_metrics() telemetry

39
scripts/ISOLATE/model_wrapper.py

@ -96,7 +96,10 @@ class ShenronRadarModel:
# 1. Physics-based Signal Generation (FMCW Chirps) # 1. Physics-based Signal Generation (FMCW Chirps)
# This generates the raw ADC samples [Np, N, Ant] # This generates the raw ADC samples [Np, N, Ant]
adc_data = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)
raw_adc = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)
# Store raw ADC for later saving
self.last_adc = raw_adc
adc_data = raw_adc
# 2. Reformat to match Signal Processor expectations # 2. Reformat to match Signal Processor expectations
# Internal logic often needs specific axis ordering # Internal logic often needs specific axis ordering
@ -210,9 +213,37 @@ class ShenronRadarModel:
metrics["clutter_ratio"] = float(clutter_ratio) metrics["clutter_ratio"] = float(clutter_ratio)
metrics["signal_to_clutter_ratio_db"] = float(scr) metrics["signal_to_clutter_ratio_db"] = float(scr)
# 3. Array Health
az_var = np.mean(np.var(ra, axis=1)) if ra is not None else 0.0
metrics["azimuth_variance"] = float(az_var)
# 3. Array Health (Angular Dispersion)
if ra is not None:
# Normalize RA heatmap [0, 1] to prevent massive scalar variance
ra_max = np.max(ra)
ra_norm = ra / ra_max if ra_max > 0 else ra
# Azimuth Variance (Normalized)
# This measures the average energy dispersion across all range bins
az_var = np.mean(np.var(ra_norm, axis=1))
metrics["azimuth_variance"] = float(az_var)
# Peak Azimuth Spread (Half-Power Beamwidth proxy)
# Find the range bin with the maximum energy
peak_range_idx = np.argmax(np.max(ra, axis=1))
peak_az_profile = ra[peak_range_idx, :]
peak_val = np.max(peak_az_profile)
if peak_val > 0:
# Count bins within 3dB (0.5 power) of the peak
half_power_bins = np.sum(peak_az_profile > (0.5 * peak_val))
# Convert to degrees: (Num Bins / Total Bins) * FOV
# Total Bins = len(angle_axis), FOV ~ 120 degrees
fov_deg = 120.0 # Approximate for visualization
spread_deg = (half_power_bins / len(peak_az_profile)) * fov_deg
else:
spread_deg = 0.0
metrics["peak_azimuth_spread_deg"] = float(spread_deg)
else:
metrics["azimuth_variance"] = 0.0
metrics["peak_azimuth_spread_deg"] = 0.0
except Exception as e: except Exception as e:
print(f"[WARNING] Advanced metrology calculation failed: {e}") print(f"[WARNING] Advanced metrology calculation failed: {e}")

5
scripts/ISOLATE/sim_radar_utils/radar_processor.py

@ -46,8 +46,9 @@ class RadarProcessor:
threshold=cfarCfg['threshold'], threshold=cfarCfg['threshold'],
rd_size=(self.RMaxIdx - self.RMinIdx + 1, fftCfg['NFFTVel'])) rd_size=(self.RMaxIdx - self.RMinIdx + 1, fftCfg['NFFTVel']))
# Spatial Window for Angle-FFT (Hann window to reduce sidelobes)
self.spatialWin = np.hanning(radarCfg['NrChn'])
# Spatial Window for Angle-FFT (Hamming window to reduce sidelobes without zeroing edges)
# Note: Hamming is better for small arrays (like 6-8 antennas) than Hanning
self.spatialWin = np.hamming(radarCfg['NrChn'])
def cal_range_fft(self, data): def cal_range_fft(self, data):
'''apply range window and doppler window and apply fft on each sample to get range profile''' '''apply range window and doppler window and apply fft on each sample to get range profile'''

19
scripts/generate_shenron.py

@ -133,10 +133,17 @@ def process_session(session_path):
frame_metrics = {} frame_metrics = {}
for r_type, model in models.items(): for r_type, model in models.items():
# Ensure a folder exists for raw ADC samples
adc_folder = session_path / r_type / "adc_raw"
adc_folder.mkdir(parents=True, exist_ok=True)
try: try:
# 2. Process through the physics-based model # 2. Process through the physics-based model
# print(f" [DEBUG] Processing {r_type} frame {frame_idx+1}...", end='\r', flush=True) # print(f" [DEBUG] Processing {r_type} frame {frame_idx+1}...", end='\r', flush=True)
rich_pcd = model.process(data) rich_pcd = model.process(data)
# 2. Save raw ADC data (saved by the model in .last_adc)
if hasattr(model, "last_adc") and model.last_adc is not None:
adc_path = session_path / r_type / "adc_raw" / lidar_file.name
np.save(adc_path, model.last_adc)
# 3. Save to disk # 3. Save to disk
output_file = session_path / r_type / lidar_file.name output_file = session_path / r_type / lidar_file.name
@ -147,16 +154,22 @@ def process_session(session_path):
met = model.get_last_metrology() met = model.get_last_metrology()
if met: if met:
frame_name = lidar_file.stem frame_name = lidar_file.stem
ra_map = met['ra_heatmap']
np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap']) np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap'])
np.save(met_base / "ra" / f"{frame_name}.npy", ra_map)
np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix']) np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])
# --- SANITY CHECK: Azimuth Variance ---
# Detects if RA heatmap is "peaky" (good) vs "rings/flat" (broken phase)
# We use the normalized variance from the model for consistency
pass # Handled by model.get_signal_metrics below
# 5. Save Signal Metrics # 5. Save Signal Metrics
try: try:
metrics = model.get_signal_metrics() metrics = model.get_signal_metrics()
if metrics: if metrics:
# Clean metrics for JSON (handle NaN/Inf) # Clean metrics for JSON (handle NaN/Inf)
clean_metrics = {}
clean_metrics = frame_metrics.get(r_type, {})
for k, v in metrics.items(): for k, v in metrics.items():
if isinstance(v, float) and (np.isnan(v) or np.isinf(v)): if isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
clean_metrics[k] = 0.0 clean_metrics[k] = 0.0
@ -203,6 +216,8 @@ def process_session(session_path):
"pts": m.get("pts", 0), "pts": m.get("pts", 0),
"peak": round(m.get("peak_magnitude", 0), 1), "peak": round(m.get("peak_magnitude", 0), 1),
"bins": m.get("active_bins", 0), "bins": m.get("active_bins", 0),
"az_std": round(m.get("azimuth_variance", 0), 4),
"spread": round(m.get("peak_azimuth_spread_deg", 0), 1),
} }
print(f"[SHENRON_STEP]{json.dumps(telemetry_frame)}", flush=True) print(f"[SHENRON_STEP]{json.dumps(telemetry_frame)}", flush=True)

7
scripts/test_shenron.py

@ -215,11 +215,8 @@ def run_testbench(iter_name):
np.save(iter_dir / r_type / "metrology" / "cfar" / f"{frame_name}.npy", met['threshold_matrix']) np.save(iter_dir / r_type / "metrology" / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])
# --- SANITY CHECK: Azimuth Variance --- # --- SANITY CHECK: Azimuth Variance ---
# If RA is working, energy should vary across azimuth bins.
# If RA is broken (uniform rings), variance will be near zero.
az_std = np.mean(np.std(ra_map, axis=1))
if az_std < 1e-12:
tqdm.tqdm.write(f" [⚠️ WARNING] Frame {frame_name} ({r_type}): Azimuth variance is ZERO. Check Phase Preservation.")
# Detects if RA heatmap is "peaky" (good) vs "rings/flat" (broken phase)
pass # Handled by model.get_signal_metrics below
# Log Metrics # Log Metrics
metrics = model.get_signal_metrics() metrics = model.get_signal_metrics()

3
src/main.py

@ -19,6 +19,8 @@ This file never changes when new scenarios are added.
All scenario logic lives in scenarios/<name>.py. All scenario logic lives in scenarios/<name>.py.
""" """
import sys
import os
from pathlib import Path from pathlib import Path
import argparse import argparse
@ -27,7 +29,6 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
import config import config
from scenario_loader import list_scenarios from scenario_loader import list_scenarios
from pathlib import Path
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------

1
src/pipeline/stages/video_stage.py

@ -8,6 +8,7 @@ transition. This stage runs at the very end of the pipeline.
""" """
import os import os
import sys
import cv2 import cv2
from pathlib import Path from pathlib import Path
from pipeline.base import PipelineStage, PipelineContext from pipeline.base import PipelineStage, PipelineContext

Loading…
Cancel
Save