Browse Source
refactor: unify Shenron radar processing pipeline
refactor: unify Shenron radar processing pipeline
Established a common orchestration framework to eliminate logic duplication between production data generation and the iterative testbench. Key changes: - Created 'ShenronOrchestrator' in scripts/ISOLATE/shenron_orchestrator.py to serve as the single source of truth for the processing loop. - Refactored 'generate_shenron.py' to use the orchestrator, ensuring production data benefit from research-level DSP improvements. - Refactored 'test_shenron.py' to use the orchestrator, guaranteeing that debug iterations are bit-identical to production outputs. - Centralized LiDAR padding, model execution, and metrology/metric serialization logic. - Preserved Dashboard SSE telemetry patterns ([SHENRON_INIT/STEP]) to maintain full UI compatibility. This restructuring ensures that any iterative changes made in the 'test_shenron' lab are automatically and safely inherited by the Dashboard's automated simulation pipeline.Shenron
3 changed files with 146 additions and 163 deletions
-
129scripts/ISOLATE/shenron_orchestrator.py
-
117scripts/generate_shenron.py
-
63scripts/test_shenron.py
@ -0,0 +1,129 @@ |
|||
import os |
|||
import sys |
|||
import time |
|||
import numpy as np |
|||
import json |
|||
from pathlib import Path |
|||
|
|||
# Ensure we can import the model wrapper |
|||
sys.path.append(str(Path(__file__).parent)) |
|||
try: |
|||
from model_wrapper import ShenronRadarModel |
|||
except ImportError: |
|||
# Fallback if called from a different context |
|||
from scripts.ISOLATE.model_wrapper import ShenronRadarModel |
|||
|
|||
class ShenronOrchestrator: |
|||
""" |
|||
Unified orchestration engine for physics-based radar synthesis. |
|||
Ensures parity between production generation (dashboard) and iterative testbench (test_shenron). |
|||
""" |
|||
|
|||
def __init__(self, radar_types=['awrl1432', 'radarbook']): |
|||
self.radar_types = radar_types |
|||
self.models = {} |
|||
|
|||
def init_models(self, output_root: Path): |
|||
"""Initializes models and prepares directory structure.""" |
|||
specs = {} |
|||
for r_type in self.radar_types: |
|||
try: |
|||
print(f" - Initializing Shenron {r_type} engine...") |
|||
model = ShenronRadarModel(radar_type=r_type) |
|||
self.models[r_type] = model |
|||
|
|||
# Setup Folders |
|||
radar_dir = output_root / r_type |
|||
radar_dir.mkdir(exist_ok=True, parents=True) |
|||
|
|||
met_base = radar_dir / "metrology" |
|||
for sub in ["rd", "ra", "cfar"]: |
|||
(met_base / sub).mkdir(parents=True, exist_ok=True) |
|||
|
|||
# Save physical axes (static per session) |
|||
np.save(met_base / "range_axis.npy", model.processor.rangeAxis) |
|||
np.save(met_base / "angle_axis.npy", model.processor.angleAxis) |
|||
|
|||
# Get hardware specs |
|||
specs[r_type] = model.get_radar_specs() |
|||
|
|||
# Save specs for MCAP converter downstream |
|||
hw_specs = { |
|||
'f': float(model.radar_obj.f), |
|||
'chirp_rep': float(model.radar_obj.chirp_rep), |
|||
'max_velocity': float((3e8 / model.radar_obj.f) / (4 * model.radar_obj.chirp_rep)), |
|||
} |
|||
with open(met_base / "radar_specs.json", "w") as sf: |
|||
json.dump(hw_specs, sf) |
|||
|
|||
except Exception as e: |
|||
print(f" [WARNING] Failed to init {r_type}: {e}") |
|||
continue |
|||
return specs |
|||
|
|||
def process_frame(self, lidar_file: Path, output_root: Path, save_adc=False): |
|||
"""Processes a single LiDAR frame through all active radar models.""" |
|||
try: |
|||
data = np.load(lidar_file) |
|||
except Exception as e: |
|||
print(f" [ERROR] Failed to load {lidar_file.name}: {e}") |
|||
return None |
|||
|
|||
# Standard Padding: Ensure [x, y, z, intensity, cos_inc_angle, obj, tag] |
|||
if data.shape[1] == 6: |
|||
padded_data = np.zeros((data.shape[0], 7), dtype=np.float32) |
|||
padded_data[:, 0:3] = data[:, 0:3] |
|||
padded_data[:, 4:7] = data[:, 3:6] |
|||
data = padded_data |
|||
|
|||
frame_results = {} |
|||
|
|||
for r_type, model in self.models.items(): |
|||
try: |
|||
# 1. Physics Processing |
|||
rich_pcd = model.process(data) |
|||
|
|||
# 2. Save Pointcloud |
|||
output_file = output_root / r_type / lidar_file.name |
|||
np.save(output_file, rich_pcd) |
|||
|
|||
# 3. Optional ADC Saving |
|||
if save_adc and hasattr(model, "last_adc") and model.last_adc is not None: |
|||
adc_folder = output_root / r_type / "adc_raw" |
|||
adc_folder.mkdir(parents=True, exist_ok=True) |
|||
np.save(adc_folder / lidar_file.name, model.last_adc) |
|||
|
|||
# 4. Save Metrology (.npy) |
|||
met = model.get_last_metrology() |
|||
met_base = output_root / r_type / "metrology" |
|||
if met: |
|||
frame_name = lidar_file.stem |
|||
np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap']) |
|||
np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap']) |
|||
np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix']) |
|||
|
|||
# 5. Extract and Save Metrics |
|||
metrics = model.get_signal_metrics() |
|||
if metrics: |
|||
# Clean for JSON (handle NaN/Inf) |
|||
clean_metrics = {} |
|||
for k, v in metrics.items(): |
|||
if isinstance(v, float) and (np.isnan(v) or np.isinf(v)): |
|||
clean_metrics[k] = 0.0 |
|||
else: |
|||
clean_metrics[k] = v |
|||
|
|||
# Append to metrics log |
|||
metrics_file = met_base / "metrics.jsonl" |
|||
with open(metrics_file, "a") as f: |
|||
f.write(json.dumps({"frame": lidar_file.stem, **clean_metrics}) + "\n") |
|||
|
|||
# Add point count for telemetry |
|||
clean_metrics["pts"] = int(rich_pcd.shape[0]) if hasattr(rich_pcd, 'shape') else 0 |
|||
frame_results[r_type] = clean_metrics |
|||
|
|||
except Exception as e: |
|||
print(f" [ERROR] {r_type} processing failed for {lidar_file.name}: {e}") |
|||
continue |
|||
|
|||
return frame_results |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue