Compare commits

...

7 Commits

Author SHA1 Message Date
RUSHIL AMBARISH KADU 9c028d606e docs: update context.md manifest and fix radar SNR/telemetry logic 2 weeks ago
RUSHIL AMBARISH KADU 11d6583230 Intel update 2 weeks ago
RUSHIL AMBARISH KADU bd466a3568 refactor: unify Shenron radar processing pipeline 2 weeks ago
RUSHIL AMBARISH KADU 05f9d181e1 feat(radar): recalibrate visualization engine for high-fidelity diagnostics 2 weeks ago
RUSHIL AMBARISH KADU 70aa058e4d feat(shenron): gain recalibration and visualization stability 2 weeks ago
RUSHIL AMBARISH KADU 0bbfe68ca3 feat(shenron): Physics audit, azimuth metrology & pipeline stability 2 weeks ago
RUSHIL AMBARISH KADU 274401b6e1 ADC_Data.md added. 3 weeks ago
  1. 2
      README.md
  2. 8
      gemini.md
  3. 8
      intel/CHRONICLES.md
  4. 180
      intel/internal/context.md
  5. 473
      intel/radar/ADC_Data.md
  6. 6
      scripts/ISOLATE/e2e_agent_sem_lidar2shenron_package/ConfigureRadar.py
  7. 27
      scripts/ISOLATE/e2e_agent_sem_lidar2shenron_package/shenron/Sceneset.py
  8. 62
      scripts/ISOLATE/model_wrapper.py
  9. 140
      scripts/ISOLATE/shenron_orchestrator.py
  10. 9
      scripts/ISOLATE/sim_radar_utils/config.yaml
  11. 11
      scripts/ISOLATE/sim_radar_utils/plots.py
  12. 12
      scripts/ISOLATE/sim_radar_utils/radar_processor.py
  13. 7
      scripts/data_to_mcap.py
  14. 110
      scripts/generate_shenron.py
  15. 81
      scripts/test_shenron.py
  16. 3
      src/main.py
  17. 12
      src/pipeline/manager.py
  18. 1
      src/pipeline/stages/video_stage.py

2
README.md

@ -71,7 +71,7 @@ You can run scenarios directly via the `run.bat` wrapper:
* **`src/`**: Core logic including `PipelineManager`, `SensorManager`, and `Recorder`.
* **`scenarios/`**: Modular scenario implementations (e.g., `braking.py`, `cutin.py`).
* **`scripts/`**: Data conversion tools (MCAP) and the **Shenron** radar synthesis engine.
* **`scripts/`**: Data conversion tools (MCAP) and the **Shenron** radar synthesis engine (orchestrated by `ShenronOrchestrator`).
* **`dashboard/`**: Flask-based web interface for the orchestrator.
* **`intel/`**: Deep-dive documentation, project history (CHRONICLES.md), and versioned changelogs.

8
gemini.md

@ -30,6 +30,7 @@ This document is the consolidated source of truth for AI agents working on the F
- `scripts/test_shenron.py`: Standalone diagnostic testbench for radar iteration.
- `scripts/ISOLATE/`: High-fidelity radar simulation engine.
- `scripts/ISOLATE/sim_radar_utils/`: Visualization and DSP utilities.
- `scripts/ISOLATE/shenron_orchestrator.py`: **Unified Orchestration Engine** — Centralized processing loop for physics-based radar synthesis. Ensures parity between `generate_shenron.py` (Production) and `test_shenron.py` (Iterative Lab).
- `scripts/ISOLATE/sim_radar_utils/plots.py`: **Single source of truth** for all radar heatmap rendering (`FastHeatmapEngine`, `postprocess_ra`, `scan_convert_ra`).
- `scripts/ISOLATE/sim_radar_utils/config.yaml`: DSP processor and plot limit configuration.
- `dashboard/`: Flask backend and static web assets for the GUI.
@ -59,6 +60,13 @@ This document is the consolidated source of truth for AI agents working on the F
- `McapStage` (`mcap_stage.py`): Performs Foxglove serialization via `data_to_mcap.py`.
- `VideoStage` (`video_stage.py`): Stitches captured camera frames into `.mp4` preview videos (dash + third-person).
### `scripts/ISOLATE/shenron_orchestrator.py` (The Unified Engine)
- **Role**: Centralized orchestration for physics-based radar synthesis.
- **Features**:
- Ensures 100% parity between `generate_shenron.py` and `test_shenron.py`.
- Manages directory structures and multi-modal serialization (ADC, PCD, Metrology).
- Decouples core physics processing from UI-specific telemetry via a callback pattern.
### `src/processing/physics.py` (The Math Layer)
- Centralized source of truth for:
- **Radial Velocity Injection**: Projecting relative velocity onto LOS via `calculate_radial_velocity()`.

8
intel/CHRONICLES.md

@ -148,6 +148,14 @@ After 30 iterations of parameter tuning, we identified the fundamental reason fo
## 📜 6. Detailed Daily Chronology (Git Absolute)
### **May 05**
- **Task**: Unified Radar Processing Orchestration.
- **Problem**: Identified divergence and confusion between the "Iterative Lab" (test_shenron) and "Production" (generate_shenron) pipelines. Changes made in research were not consistently propagated to the simulation engine.
- **Action**: Developed the **ShenronOrchestrator** in `scripts/ISOLATE/shenron_orchestrator.py`.
- **Decision**: Centralized the core processing loop, directory management, and data serialization (ADC, PCD, Metrology, Metrics) into a shared engine.
- **UI Logic**: Implemented a callback-based telemetry pattern to keep production-specific Dashboard logic (`[SHENRON_STEP]`) isolated from the core physics loop.
- **Parity**: Achieved 100% bit-identical logic between the research lab and the production simulation.
### **April 23 (Milestone: Fox v1.1 "Ares")**
- **Task**: Stage-Based Pipeline & Radar Parity Sync.
- **Architecture**: Completed the full refactor to a modular stage-based pipeline. Isolated `SimulationStage`, `ShenronStage`, and `McapStage`.

180
intel/internal/context.md

@ -26,30 +26,40 @@ multi-modal sensor data that can be visualized and analysed in Foxglove Studio.
Fox/
├── dashboard.bat ← One-click launcher for the GUI orchestrator (Flask)
├── run.bat ← One-click launcher (activates carla312 conda env)
├── config.py ← All tuneable constants (FPS, sensor params, scenario defaults)
├── config.py ← All tuneable constants (FPS, sensor params, ego models)
├── gemini.md ← The primary agent instruction protocol
├── README.md ← Project setup and replication guide
├── src/
│ ├── main.py ← Thin CLI wrapper for the PipelineManager
│ ├── pipeline/ ← Stage-based orchestration (Sim → Shenron → MCAP → Video)
│ ├── processing/ ← Physics utilities (radial velocity, ADAS metrology)
│ ├── sensors.py ← SensorManager (camera, radar, lidar sync queues)
│ ├── recorder.py ← Asynchronous frame recorder (PNG / NPY / JSONL)
│ ├── scenario_loader.py ← Dynamic scenario loader via importlib
│ └── utils.py ← Shared project helpers (e.g., weather mapping)
├── scripts/
│ ├── data_to_mcap.py ← Converts recorded dataset folders → .mcap files
│ └── data_inspector.py ← Utility to inspect / debug recorded datasets
│ ├── ISOLATE/ ← High-fidelity Shenron radar physics engine
│ │ ├── shenron_orchestrator.py ← Unified processing loop (production & testbench)
│ │ ├── model_wrapper.py ← Hardware spec syncer and physics execution
│ │ └── sim_radar_utils/ ← DSP 3D-FFT chain and visualization heatmaps
│ ├── generate_shenron.py ← Production wrapper invoked by the pipeline (with SSE telemetry)
│ ├── test_shenron.py ← Standalone iterative lab for radar testing
│ └── data_to_mcap.py ← Converts datasets into Foxglove `.mcap` formats
├── dashboard/ ← Flask backend and web frontend (GUI)
│ ├── app.py ← Web server bridging API requests to run.bat
│ ├── static/ ← Frontend logic and styling assets
│ └── templates/ ← HTML views
├── src/
│ ├── main.py ← Orchestrator — scenario-agnostic entry point
│ ├── sensors.py ← SensorManager (camera, radar, lidar setup + sync queues)
│ ├── recorder.py ← Recorder (writes PNG / NPY / JSONL per frame)
│ ├── scenario_loader.py ← Dynamic scenario loader via importlib
│ └── utils.py ← Shared project helpers (weather mapping, etc.)
│ ├── static/ ← Frontend logic (app.js with SSE parser)
│ └── templates/ ← HTML views (index.html with ADAS Explorer panel)
├── scenarios/
│ ├── __init__.py ← Package marker (empty to avoid side-effect imports)
│ ├── __init__.py
│ ├── base.py ← ScenarioBase abstract class (the plugin contract)
│ ├── braking.py ← Lead vehicle hard braking scenario
│ ├── cutin.py ← Adjacent lane cut-in scenario
│ └── obstacle.py ← Static obstacle (traffic cone) scenario
│ ├── obstacle.py ← Static obstacle (traffic cone) scenario
│ └── showcase.py ← Complex Left-Turn Across Path demo
├── data/ ← Auto-created; one subfolder per recording session
│ └── <scenario>_YYYYMMDD_HHMMSS/
@ -58,25 +68,37 @@ Fox/
│ ├── lidar/ ← frame_XXXXXX.npy (shape: [N, 4] — x/y/z/intensity)
│ └── frames.jsonl ← One JSON record per frame (metadata + scenario info)
├── carla_examples/ ← Standard CARLA PythonAPI examples (not part of core Fox pipeline)
├── tmp/ ← Staging area for IPC flags (e.g., stop.flag)
└── intel/
├── radar/ ← [CRITICAL] Physics, Material RCS, and Debug logs
│ ├── Shenron_debug.md ← The "Source of Truth" for radar calibration
│ └── Sceneset_deepdive.md ← Reflection & Electromagnetic math
├── scenarios/ ← Operational manuals for driving simulations
│ ├── dashboard.md ← Web GUI Architecture
│ └── showcase.md ← Scenario-specific post-mortems
└── internal/ ← Project-wide context and developer guides
└── context.md ← This file (Primary entry point)
└── intel/ ← Project Knowledge Base & Developer Guides
├── radar/ ← Deep-dives into physics math and RCS diagnostics
│ ├── core/ ← Calibration and architecture guides
│ ├── diagnostics/ ← Shenron_debug.md and energy suppression logs
│ ├── metrology_suite/ ← Heatmap and CFAR integration docs
│ ├── research/ ← Mathematical deep-dives (e.g. Isotropic Illumination)
│ └── ADC_Data.md ← FMCW and I/Q sampling reference
├── scenarios/ ← Operational manuals for driving simulations
│ ├── braking.md
│ ├── dashboard.md
│ └── showcase.md
├── internal/ ← Context manifest and legacy architecture references
│ ├── context.md ← This file
│ └── old_implement.md
└── CHRONICLES.md ← Running timeline of weekly updates and feature completions
```
---
## Key Files — Detailed Reference
### `dashboard.bat` & `dashboard/` — Web GUI Orchestrator
A Flask-based web dashboard that provides an intuitive interface for running CARLA scenarios without the CLI. It dynamically fetches available scenarios and config params, translates user choices into `run.bat` commands as a background subprocess, and heavily streams the unbuffered Python stdout text back to the browser using Server-Sent Events (SSE).
### `dashboard.bat` & `dashboard/app.py` — Web GUI Orchestrator
A Flask-based web dashboard (`app.py`) that provides an intuitive interface for running CARLA scenarios without the CLI.
- **API Endpoints**: Dynamically fetches available scenarios (`/api/config`), scenario-specific parameters (`/api/scenario_params/<name>`), and manages the CARLA simulator lifecycle (launching, killing, and putting the GPU into "idle" mode).
- **Execution**: Translates user choices into `run.bat` commands spawned as background subprocesses.
- **Streaming**: Streams the unbuffered Python stdout text back to the browser using Server-Sent Events (SSE).
> **Full Architecture details:** See `intel/scenarios/dashboard.md` for a complete breakdown of API routing and extension guidelines.
---
@ -98,38 +120,37 @@ Single source of truth for all simulation-wide defaults. It NO LONGER contains s
---
### `src/main.py` — Orchestrator
**Responsibilities:** CARLA connection, ego spawn, sensor init, scenario load, main loop, shutdown.
**Does NOT contain any scenario-specific logic.**
### `src/pipeline/` — Stage-Based Architecture
The simulation pipeline has been refactored into modular, sequential stages.
- **`src/pipeline/base.py`**: Defines the `PipelineContext` (shared state container) and the `PipelineStage` abstract base class (requires `name`, `run()`, and `cleanup()`).
- **`src/pipeline/manager.py`**: `PipelineManager` orchestrates the sequential execution of stages. It handles `skip_stages` flags, propagates `stop.flag` early halts, and guarantees `cleanup()` is called in reverse order for all started stages if an error occurs.
- **`src/pipeline/stages/`**: Individual worker implementations.
- `sim_stage.py` (`SimulationStage`): Runs the live CARLA capture loop. Connects to CARLA, handles Ego/NPC spawning, applies weather, runs the `world.tick()` loop, reads sensor queues, and records frames via the `Recorder`. Detects `tmp/stop.flag` for graceful shutdown.
- `shenron_stage.py` (`ShenronStage`): Runs the physics-based radar synthesis over the recorded dataset (calls `generate_shenron.py`). Preserves SSE progress tags (`[SHENRON_INIT]`) for the dashboard.
- `mcap_stage.py` (`McapStage`): Performs Foxglove serialization via `data_to_mcap.py`.
- `video_stage.py` (`VideoStage`): Stitches captured camera frames into MP4 previews.
**CLI:**
```powershell
python src/main.py --scenario braking --params "BRAKE_FRAME=100"
python src/main.py --scenario cutin --frames 120 --weather Rain
python src/main.py --list-scenarios
```
---
### `src/main.py` — Entry Point
**Role:** Thin CLI wrapper that initializes the `PipelineManager`.
**Features:**
- Parses arguments and initializes the `PipelineContext`.
- Supports selective execution via flags like `--only-mcap`, `--only-shenron`, `--skip-shenron`, `--skip-mcap`, `--skip-sim`.
- Supports session reuse via `--session <path>` to re-process existing data (e.g., running Shenron or MCAP conversions on old recordings without re-running CARLA).
---
### `src/utils.py` — Utilities
Contains shared project helpers, currently featuring `get_weather_preset(name)`, which safely maps simple string names (e.g., "Clear", "Rain") to the corresponding `carla.WeatherParameters` objects.
---
**New Flags:**
- `--params`: Scenario-specific adjustments (e.g. `SPEED=40`).
- `--weather`: Override scenario default (e.g. `Sunset`, `Wet`).
- `--no-record`: Dry-run simulation tracking with no disk I/O.
**Execution flow:**
1. Parse args → `load_scenario(name)` → returns `ScenarioBase` instance
2. **Handle Parameter Injection**: Call `scenario.apply_parameters(args.params)`
3. Connect CARLA, settle sync mode, apply chosen weather (CLI > Scenario > Config)
4. Clear existing actors, spawn ego at **`ego_spawn_point`** (deterministic index or Transform)
5. `SensorManager.spawn_sensors()` → attach camera / radar / lidar to ego
6. `Recorder(scenario_name=scenario.name)` → creates `data/<scenario>_<ts>/`
7. **Settle Physics**: Call `world.tick()` once to synchronize Ego location before setup
8. `scenario.setup(world, ego, traffic_manager)`
9. **Main loop** per frame: `world.tick()`**IPC Stop Check (`tmp/stop.flag`)**`sensor_manager.get_data()`
`recorder.save(..., extra_meta=scenario.get_scenario_metadata())``scenario.step(frame, ego, pbar)`
10. `finally`: `scenario.cleanup()``sensor_manager.destroy()` → restore async mode
> **Key invariant:** `main.py` never imports a scenario module by name.
> The CARLA import itself is deferred until after `--list-scenarios` early-exit so the dry-run
> works without a running CARLA server.
### `src/processing/physics.py` — Centralized Math & Physics
Standalone utilities for sensor data augmentation and ADAS metrology.
- **`calculate_radial_velocity()`**: Injects radial speed onto LiDAR points by projecting relative velocity onto the line-of-sight vector (used heavily by Shenron synthesis).
- **`calculate_relative_metrics()`**: Computes ground-truth Range, Azimuth, and Closing Velocity between the Ego and NPCs.
- **`get_actor_class()`**: Categorizes CARLA actors into broad ADAS classes (`vehicle`, `vru`, `pedestrian`).
---
@ -229,7 +250,6 @@ NPCs should be spawned with a **0.5m Z-offset (lift)** relative to the road wayp
### Implemented Scenarios
| File | Class | Trigger | Effect |
| File | Class | Default Effect | Deterministic? |
|---|---|---|---|
| `braking.py` | `BrakingScenario` | Lead vehicle brakes at frame 80 | Yes (Spawn-and-Move) |
@ -241,38 +261,42 @@ All scenarios now encapsulate their own defaults and support CLI injection via `
---
### `scripts/data_to_mcap.py` — MCAP Converter
Scans `data/` for subfolders containing `frames.jsonl` and converts each to a `.mcap` file.
Output is written as `data/<session>/<session>.mcap` (skips if already exists).
### `scripts/` — Production Utilities & Shenron Engine
The `scripts/` folder houses post-processing and conversion utilities, most notably the physics-based radar synthesis engine (Shenron).
#### Shenron Engine (`scripts/ISOLATE/`)
- **`scripts/ISOLATE/shenron_orchestrator.py`**: The unified orchestration engine. It ensures total parity between the production pipeline and iterative lab tests. Manages the directory structures, initializes radar models, processes LiDAR frames, and saves ADC data, pointclouds, and metrology `.npy` files.
- **`scripts/generate_shenron.py`**: The production wrapper invoked by the `ShenronStage`. Feeds the orchestrator with LiDAR data from the session and handles telemetry string reporting (`[SHENRON_INIT]`, `[SHENRON_STEP]`) back to the Flask dashboard.
- **`scripts/ISOLATE/model_wrapper.py`**: Defines `ShenronRadarModel`, providing an object-oriented interface over the underlying physics engine and DSP. It synchronizes hardware specifications (bandwidth, chirps) from `config.yaml` to the global configuration used by the processor.
- **`scripts/ISOLATE/sim_radar_utils/radar_processor.py`**: The core DSP chain. Simulates the TI mmWave hardware accelerators by executing the Range FFT, Doppler FFT, CFAR detection, peak grouping (NMS), and Angle/Azimuth beamforming (3D-FFT) to convert raw ADC cubes into a final 3D point cloud.
- **`scripts/ISOLATE/sim_radar_utils/plots.py`**: Visualization engine. Contains `FastHeatmapEngine` (a stateful matplotlib renderer optimized for high-speed frame-by-frame rendering by reusing figure memory) and other rendering functions for RA/RD heatmaps.
#### Foxglove Serialization
- **`scripts/data_to_mcap.py`**: Converts the raw data folders (`data/<session>/`) into Foxglove-compatible `.mcap` files.
**Foxglove topics produced:**
**Foxglove topics produced by `data_to_mcap.py`:**
| Topic | Schema | Content |
|---|---|---|
| `/camera` | `foxglove.CompressedImage` | Base64-encoded PNG |
| `/lidar` | `foxglove.PointCloud` | X/Y/Z float32, Y-axis flipped for ROS convention |
| `/radar` | `foxglove.PointCloud` | Spherical → Cartesian conversion, Y-flipped |
| `/radar/*` | `foxglove.PointCloud` | Synthesized Shenron pointclouds |
| `/ego_pose` | `foxglove.Pose` | Position + quaternion from yaw angle |
| `/metrology/*` | `foxglove.Grid` | Heatmaps and CFAR diagnostic matrices |
> **Coordinate system note:** CARLA uses left-handed coords (Y increases right).
> The converter negates Y and yaw to match ROS/Foxglove right-handed convention.
**Run:**
```
python scripts/data_to_mcap.py
```
Processes all unprocessed session folders in `data/` automatically.
---
## Full Pipeline — End-to-End
```
1. CARLA server running (CarlaUE4.exe)
2. run.bat braking → data/braking_<ts>/ (PNG + NPY + JSONL)
3. run.bat cutin → data/cutin_<ts>/
4. run.bat obstacle → data/obstacle_<ts>/
5. python scripts/data_to_mcap.py → data/*/<session>.mcap
2. run.bat braking → [SIMULATION] → data/braking_<ts>/ (PNG + NPY + JSONL)
3. → [SHENRON] → synthesizes radar physics → data/braking_<ts>/<radar_type>/
4. → [MCAP] → scripts/data_to_mcap.py → data/braking_<ts>/<session>.mcap
5. → [VIDEO] → mp4 preview stitcher
6. Open .mcap in Foxglove Studio
```
@ -328,6 +352,20 @@ class MyScenario(ScenarioBase):
---
## Knowledge Base (`intel/`)
The `intel/` directory is the project's brain, storing deep-dive documentation, research logs, and agent protocols. Unlike the codebase sections above, these are markdown files (`.md`).
- **`intel/radar/`**: The core research repository for the Shenron physics engine.
- `ADC_Data.md`: Comprehensive technical reference on raw ADC data, FMCW, I/Q sampling, and signal synthesis.
- `core/`: High-level architecture of the Shenron engine, antenna gain calibration, and implementation guides.
- `diagnostics/`: Crucial debugging logs (e.g., `Shenron_debug.md` is the source of truth for RCS calibration and 3D energy suppression issues).
- `metrology_suite/`: Documentation on the Foxglove integration (RARD, CFAR heatmaps, Auto MCAP).
- `research/`: Mathematical deep-dives into physics problems like isotropic illumination and symmetric RCS reflection.
- **`intel/scenarios/`**: Operational manuals for driving simulations (e.g., dashboard architecture, showcase walkthroughs).
- **`intel/internal/`**: Project-wide context (like this file) and legacy implementation archives.
---
## Known Limitations & Future Work
| Area | Status | Notes |

473
intel/radar/ADC_Data.md

@ -0,0 +1,473 @@
# 📡 Radar Deep Dive: The Comprehensive Guide to ADC Data (Raw Samples)
This document serves as the master reference for Raw ADC (Analog-to-Digital Converter) data within the Fox CARLA Shenron simulation engine. It is designed to bridge the gap between Mechanical Engineering, Computer Science, and Electrical Engineering, providing intuitive explanations and deep technical dives into how radar physics are simulated in code.
---
## 1. Foundation: The FMCW Radar Principle (For Non-EE Engineers)
Before analyzing data structures, we must understand the physical mechanism of the sensor. The sensors we simulate (TI AWRL1432, Radarbook) are **FMCW (Frequency Modulated Continuous Wave)** radars.
### 1.1 What is a Chirp? (The Sweeping Pitch Analogy)
Imagine you are standing in a large empty factory hall and you have a special whistle that changes pitch continuously. You press the whistle and it starts at a very low bass note, then steadily ramps up to a high-pitched squeal over 40 microseconds (0.00004 seconds), then instantly resets to the low note and repeats. Each of these pitch-sweeps is called a **"Chirp."**
In radar, we do the same with electromagnetic waves (radio waves) instead of sound:
* **Start Frequency ($f_c$):** The starting pitch. For automotive radars this is 77 GHz (77,000,000,000 vibrations per second). This frequency is invisible and harmless — think of it like a colour of light you cannot see.
* **Bandwidth ($B$):** The total pitch range swept. The AWRL1432 sweeps 137.2 MHz; the Radarbook sweeps 250 MHz. A wider sweep = finer ability to separate nearby objects.
* **Chirp Duration ($T_c$):** How long one sweep takes. For the Radarbook: `N_sample / samp_rate = 256 / 1e6 = 256 microseconds`.
* **Frequency Slope ($k = B / T_c$):** How fast the frequency increases. Think of this as "how steep the ramp is."
* **Range Resolution Formula:** `range_res = c / (2 * B)`. For the Radarbook (B=250 MHz): `3e8 / (2 * 250e6) = 0.6 metres`. This means two objects 0.6 m apart are the minimum resolvable.
### 1.2 What is a Mixer? (The Pitch-Comparator Analogy)
Back to the factory hall. You blow your sweeping whistle at a metal wall. The sound travels to the wall and echoes back. Here is the key insight:
**Because sound takes time to travel,** the echo reaching your ear right now was emitted slightly in the past. Since your whistle pitch is always increasing, the echo has a *lower pitch* than what you are currently blowing.
Now imagine you hold two instruments simultaneously:
- Your left hand plays the current whistle note (the transmitted signal, TX).
- Your right hand plays the echo from the wall (the received signal, RX).
If you play both at the same time, your ear (or a microphone) hears an acoustic phenomenon called a **"beat"** — a slow wobble in loudness caused by two slightly different frequencies competing. The rate of this wobble is exactly the *difference* between the two frequencies. This is the **Beat Frequency**.
In hardware, the **Mixer** is the electronic component that performs this comparison. It multiplies TX by RX mathematically. The result contains the Beat Frequency (the difference), which is then isolated by a low-pass filter. This Beat Frequency signal is what the ADC samples.
**The golden rule: `Beat Frequency = (2 * k * Range) / c`**
* Close target (5 m) → small beat frequency → low-pitched wobble.
* Far target (80 m) → large beat frequency → high-pitched wobble.
The ADC simply records this "wobble" at discrete time steps. When you later apply an FFT, each distinct wobble frequency appears as a peak — and you convert it back to a distance.
### 1.3 Hardware Parameters in Our Codebase
These parameters are defined in `scripts/ISOLATE/e2e_agent_sem_lidar2shenron_package/ConfigureRadar.py`:
| Parameter | AWRL1432 | Radarbook | Meaning |
|---|---|---|---|
| `f` | 77 GHz | 24 GHz | Start frequency |
| `B` | 137.2 MHz | 250 MHz | Bandwidth (range resolution) |
| `N_sample` | 256 | 256 | ADC samples per chirp |
| `chirps` | 128 | 128 | Chirps per frame (velocity resolution) |
| `nRx` | 6 | 8 | Receiver antenna channels |
| `chirp_rep` | 36.4 µs | 750 µs | Repetition interval between chirps |
| `noise_amp` | 0.005 | 0.005 | Noise floor amplitude |
---
## 2. I/Q Sampling: The Reason ADC Data is Complex
A common point of confusion is why ADC data is stored as **Complex Numbers (Real + Imaginary pairs)** rather than a simple list of voltage values. This section explains the physics and hardware design that necessitates this.
### 2.1 The Ambiguity Problem (The Blindfold Analogy)
Imagine you are blindfolded and someone hands you a microphone. A person walks past you dragging a stick along a picket fence: "clack-clack-clack." You can determine how fast they are walking from the rhythm. But **which direction** are they walking? Left-to-right, or right-to-left? With only one microphone (one measurement), you cannot tell. The signal is intrinsically ambiguous about direction.
In radar signal processing, this problem appears as **Doppler ambiguity**. The Doppler effect shifts the beat frequency slightly depending on whether the target is approaching or receding. However, if we only record a single real voltage stream, the mathematical result is a cosine wave `A·cos(2π·f·t + φ)`. By fundamental trigonometry, `cos(x) = cos(-x)`, meaning the positive and negative frequency components look **identical**. A car moving toward you at 10 m/s produces the exact same signal as a car moving away at 10 m/s. You know the speed magnitude, but the direction is lost.
### 2.2 The Hardware Solution: Two Mixers at 90° Offset
The hardware solution is elegant. Instead of one mixer, the radar uses **two mixers in parallel**:
**Path 1 — I (In-phase):**
The received echo is multiplied by the transmitted chirp at exactly 0° phase. This produces the **In-phase (I)** voltage.
**Path 2 — Q (Quadrature):**
A copy of the transmitted chirp is shifted by exactly 90° (a quarter of a full wave cycle) using a phase-shifter circuit. The received echo is multiplied by this 90°-shifted version. This produces the **Quadrature (Q)** voltage.
The word *Quadrature* simply means "at 90 degrees" — it comes from the Latin for "square," referring to the quarter-turn rotation. Two separate ADC chips sample I and Q simultaneously.
### 2.3 What I and Q Values Tell You
Let us walk through a concrete example. Suppose a car is at 20 metres, moving toward the sensor at 8 m/s.
* The beat frequency encodes the range (20 m).
* The Doppler shift creates a tiny phase advancement in the I and Q signals from chirp to chirp.
* If the car is approaching: I leads Q in phase (I peaks before Q).
* If the car is receding: Q leads I in phase (Q peaks before I).
By looking at the *relationship* between I and Q, the DSP can unambiguously determine direction.
### 2.4 The Complex Representation (The Clock-Hand Analogy)
Mathematically, we merge the two real streams into one complex number:
`S(t) = I(t) + j·Q(t)`
Where `j = √(-1)` is the imaginary unit. This is not just a notation trick — it allows us to use the powerful mathematics of complex exponentials.
Instead of thinking of the signal as a wave going up and down, picture it as a clock hand spinning on a dial:
* The **length** of the clock hand is the signal amplitude (how strong the reflection is).
* The **angle** of the clock hand is the signal phase (encoding range and velocity information).
* **Approaching target:** the clock hand spins counter-clockwise over successive chirps.
* **Receding target:** the clock hand spins clockwise over successive chirps.
A complex FFT naturally sees the direction of this spin and separates positive from negative velocities into different bins on the Doppler axis. A real-only FFT cannot.
### 2.5 Practical Consequence in Our Data
Because we use I/Q sampling:
* Our ADC tensors use `dtype=complex128` (or `complex64`), storing a real and imaginary float per sample.
* The `reformat_adc_shenron()` function in `sim_radar_utils/utils_radar.py` rearranges the axes but preserves the complex dtype throughout.
* The `RadarProcessor.cal_range_fft()` and `cal_doppler_fft()` functions use `scipy.fft.fft()` which accepts complex input and correctly resolves positive/negative velocities.
* If ADC data were real-valued only, the Range-Doppler plot would be **symmetric** (mirrored), and we could not distinguish which side was real.
### 2.6 SNR Benefit of Complex Sampling
There is an additional engineering benefit beyond direction disambiguation. A complex FFT effectively doubles the number of usable frequency bins compared to a real FFT of the same length. This results in approximately **3 dB better Signal-to-Noise Ratio (SNR)** — meaning we can detect targets that are half as reflective. For ADAS this is significant: a pedestrian at 50 m reflects far less energy than a truck.
---
## 3. Shenron's Signal Synthesis: Physics to Code
In `heatmap_gen_fast.py`, since we have no physical antenna, we mathematically synthesize what the ADC voltages would be, using GPU-accelerated tensor operations (PyTorch). Here is the exact step-by-step physics model with the corresponding code.
### Step 3.1: Input Geometry — The Point Cloud as Scatterers
The pipeline takes the semantic LiDAR point cloud where every $(x, y, z)$ coordinate is treated as an independent **Point Scatterer** — a tiny reflective surface.
For each scatterer, three physical quantities are extracted:
* **Range ($\rho$):** Euclidean distance `np.linalg.norm([x, y, z])`. This determines the time delay.
* **Azimuth ($\theta$):** Angle relative to the radar boresight. Used for Angle of Arrival (AoA) computation.
* **Radar Cross Section (loss):** The LiDAR intensity (optical reflectivity) is used as a proxy for electronic reflectivity. It is scaled by `1/rho^2` to model the **radar range equation**: the power returned from a target falls off as the 4th power of distance (the signal travels twice — once to the target, once back). The code does this with: `loss = torch.tensor(loss * (1 / rho ** 2))`.
### Step 3.2: The Round-Trip Time Delay ($\tau$)
For each scatterer and each chirp, the code computes the exact round-trip time delay:
```python
tau = 2 * (rho / radar.c + chirp * radar.chirp_rep * speed / radar.c)
```
Breaking this down:
* `2 * rho / c` — the static delay for sound (or light) to travel to the target and back.
* `chirp * chirp_rep * speed / c` — the additional delay caused by the target moving during the frame. `chirp_rep` is the time between two consecutive chirps. So for chirp number 3, the target has moved `3 * chirp_rep * speed` further. Dividing by `c` converts that distance to additional time.
This second term is what physically generates the **Doppler Effect** across chirps. It is typically nanometer-scale but critically important because the Doppler FFT is extremely sensitive to phase.
### Step 3.3: Antenna Array Spatial Delays ($\delta$)
For a multi-antenna radar (e.g., AWRL1432 has 6 Rx antennas; Radarbook has 8), the signal hits each antenna at a slightly different time depending on the target's azimuth angle. The code computes a per-antenna spatial delay:
```python
delta[i, :] = i * sRx * torch.sin(np.pi / 2 - theta) / radar.c
```
Where `sRx = lambda / 2` is the half-wavelength antenna spacing (a standard MIMO design). `i` is the antenna index (0 through nRx-1). A target directly in front (theta=0) has zero additional delay across antennas. A target at 45° off-boresight has a significant inter-antenna delay — this is what allows us to compute the Angle of Arrival later.
### Step 3.4: The Beamforming Vector (Spatial Phase)
The beamforming vector encodes the complete spatial and temporal phase for every point, antenna, and time sample:
```python
beamforming_vector = torch.exp(
1j*2*np.pi*(
radar.f * delta[:,:,None] # Carrier phase across antennas
- 0.5 * k * delta[:,:,None]**2 # Second-order FMCW correction
- k * tau[None,:,None] * delta[:,:,None] # Cross-coupling (range x angle)
+ k * delta[:,:,None] @ t[None,None,:]) # Per-sample phase
)
```
This is the most mathematically complex line. It computes `exp(j * phase)` which produces a complex unit vector rotating at the beat frequency. The `@` (matrix multiply) operation broadcasts this across all 256 sample times simultaneously on the GPU.
### Step 3.5: The Dechirped Beat Signal
The actual beat signal (what the mixer outputs) is:
```python
dechirped = torch.exp((1j * 2 * np.pi) *
(radar.f * tau[:,None] + k * tau[:,None] @ t[None,:] - 0.5 * k * tau[:,None]**2))
```
This implements the FMCW beat frequency equation directly. For a target at 20 m (`tau = 2*20/3e8 ≈ 133 ns`), this produces a complex sinusoid oscillating at the beat frequency corresponding to that range.
### Step 3.6: Signal Superposition (The Summation)
All scatterer contributions are combined:
```python
signal_single_antenna = loss_factor * dechirped # Per-scatterer amplitude
signal = beamforming_vector * signal_single_antenna[None,:,:] # Spread across antennas
signal = torch.squeeze(torch.sum(signal, 1)) # Sum all scatterers -> final voltage
```
At every single ADC time sample, this produces the total complex voltage that a physical antenna would measure if all those LiDAR points were real physical objects.
### Step 3.7: Hardware Gain Scaling
After superposition, the signal is scaled by the radar's system gain constant:
```python
adc_sampled = torch.sqrt(torch.tensor(radar.gain * _lambda**2 / (4*np.pi)**3)) * signal
```
This implements the **Friis Transmission Equation** accounting for antenna efficiency and wavelength. `radar.gain = 10**(110/10)` for the current calibration in `ConfigureRadar.py`.
### Step 3.8: Noise Floor Addition and Tuning
The clean synthetic signal is then corrupted with thermal noise:
```python
adc_sampled = adc_sampled + signal_Noisy
```
Where `signal_Noisy = radar.get_noise()` generates:
```python
noise_real = np.random.normal(0, 1, size=(nRx, N_sample))
noise_complex = np.random.normal(0, 1, size=(nRx, N_sample))
signal_Noisy = (noise_real + 1j * noise_complex) * self.noise_amp
```
**Why this matters for real-world matching:**
Real radar hardware has a **Noise Figure (NF)** (typically 10-15 dB for automotive radar) which determines the minimum detectable signal. Our `noise_amp = 0.005` parameter controls this floor. If `noise_amp` is too low, every distant weak scatterer (road surface dust, tree leaves) registers as a target. If too high, real targets get buried.
The current calibration (`noise_amp = 0.005`) was determined empirically through iteration testing to match the CFAR false-alarm rate to realistic values. In a real system, you would characterize the hardware Noise Figure via bench measurements and set `noise_amp` accordingly.
### Step 3.9: Writing Into the ADC Measurement Cube
The final complex voltages are placed into the output 3D array:
```python
measurement[chirp, :, :] = adc_sampled # Shape: (N_chirps, nRx, N_sample)
```
After all chirps are processed, `measurement` is the complete raw ADC tensor — a 3D complex array representing what the physical radar hardware would have recorded.
---
## 4. Pipeline Integration: The Complete Data Flow
The Fox pipeline orchestrates radar signal synthesis and storage through a layered architecture. Understanding this flow is essential for debugging, modifying, or extending the ADC capture capability.
### 4.1 High-Level Data Flow Diagram
```
CARA Simulator
└─> Semantic LiDAR (.npy per frame)
└─> generate_shenron.py [Orchestrator]
└─> ShenronRadarModel.process() [model_wrapper.py]
└─> run_lidar() [lidar.py]
└─> heatmap_gen() [heatmap_gen_fast.py]
└─> RAW ADC TENSOR ← saved HERE
└─> reformat_adc_shenron() [utils_radar.py]
└─> REFORMATTED ADC
└─> RadarProcessor [radar_processor.py]
├─> cal_range_fft() → Range Profile
├─> cal_doppler_fft() → Doppler Profile
└─> convert_to_pcd() → Rich Point Cloud + Heatmaps
├─> Saves: <session>/<radar_type>/<frame>.npy (Point Cloud)
├─> Saves: <session>/<radar_type>/adc_raw/<frame>.npy (Raw ADC)
└─> Saves: <session>/<radar_type>/metrology/{rd,ra,cfar}/<frame>.npy
```
### 4.2 The Orchestrator: `generate_shenron.py`
This script is the master batch processor. It processes every simulation session in the `data/` directory.
**Initialization Phase:**
1. For each radar type (`awrl1432`, `radarbook`), a `ShenronRadarModel` is instantiated.
2. The model internally creates a `radar` hardware object (from `ConfigureRadar.py`) and a `RadarProcessor`.
3. Three metrology sub-folders are created: `rd/`, `ra/`, `cfar/`.
4. The physical axes (`range_axis.npy`, `angle_axis.npy`) are saved once per session — these do not change between frames as they are determined by the hardware config.
5. **The `adc_raw/` folder** is also created here, one per radar type per session.
**Per-Frame Processing Loop:**
```python
# From generate_shenron.py (simplified)
for lidar_file in lidar_files:
data = np.load(lidar_file) # Load LiDAR cloud
rich_pcd = model.process(data) # Run physics synthesis
# Capture raw ADC from buffer
if hasattr(model, 'last_adc') and model.last_adc is not None:
adc_path = session_path / r_type / 'adc_raw' / lidar_file.name
np.save(adc_path, model.last_adc) # Save raw ADC
np.save(output_file, rich_pcd) # Save point cloud
```
### 4.3 The Physics Bridge: `model_wrapper.py`
The `ShenronRadarModel.process()` method is the critical integration point.
**Inside `process(data)`:**
```python
# Step 1: Re-sync hardware config (critical for multi-radar sessions)
self._sync_configs()
# Step 2: Synthesize raw ADC (this calls heatmap_gen_fast.py)
raw_adc = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)
# raw_adc shape at this point: (N_chirps, nRx, N_sample) — as output by heatmap_gen
# Step 3: Buffer raw ADC for external access
self.last_adc = raw_adc
adc_data = raw_adc
# Step 4: Reformat axes to match RadarProcessor expectations
adc_data = reformat_adc_shenron(adc_data)
# After reformat: (N_sample, N_chirps, nRx) — transposed
# Step 5: Run DSP chain
range_profile = self.processor.cal_range_fft(adc_data)
doppler_profile = self.processor.cal_doppler_fft(range_profile)
_, rich_pcd, metrology = self.processor.convert_to_pcd(doppler_profile)
```
### 4.4 The `reformat_adc_shenron()` Step — Why It Matters
The raw ADC from `heatmap_gen_fast.py` has shape `(N_chirps, nRx, N_sample)` because the synthesis loops over chirps and builds the antenna dimension first.
The `RadarProcessor` in `radar_processor.py` expects data as `(N_sample, N_chirps, nRx)` because its windowing arrays (`rangeWin`, `velWin`) are pre-computed in that axis order. The `reformat_adc_shenron()` function performs this axis reordering:
```python
def reformat_adc_shenron(data):
data = np.swapaxes(data, 1, 2) # Swap axes 1 and 2: (Np, N, nRx)
chirpLevelData = data
return np.transpose(chirpLevelData[:, :, :], (1, 0, 2)) # -> (N, Np, nRx)
```
This is the step where we **save `self.last_adc = raw_adc` BEFORE this reformat** — so the stored ADC is in the original synthesis shape `(N_chirps, nRx, N_sample)`, closest to the hardware physical format.
### 4.5 Memory Footprint and Disk Impact
| Radar | Chirps | Samples | Antennas | Per-Frame Size (complex64) |
|---|---|---|---|---|
| AWRL1432 | 128 | 256 | 6 | 128×256×6×8 bytes ≈ **1.5 MB** |
| Radarbook | 128 | 256 | 8 | 128×256×8×8 bytes ≈ **2.0 MB** |
* A 500-frame braking scenario generates ~750 MB of ADC data (both radars combined).
* Compared to the point cloud (~4 KB per frame), ADC data is ~375x larger.
* If disk space is constrained, set a frame sub-sampling factor in `generate_shenron.py`.
---
## 5. DSP Implementation: From ADC to Heatmaps (Codebase Walkthrough)
This section traces exactly how the stored ADC data is processed into the RD and RA heatmaps visible in the Foxglove dashboard. All code references are from `scripts/ISOLATE/sim_radar_utils/radar_processor.py`.
### 5.1 The RadarProcessor Initialization
When `ShenronRadarModel` is created, it instantiates a `RadarProcessor`. Inside `__init__`, the processor pre-computes all the windowing arrays and physical axes that will be reused for every frame:
```python
# Hanning windows for Range and Doppler dimensions
self.rangeWin = np.tile(signal.windows.hann(radarCfg['N']), (radarCfg['Np'], radarCfg['NrChn'], 1))
self.rangeWin = np.transpose(self.rangeWin, (2, 0, 1)) # -> (N_sample, N_chirps, nRx)
self.velWin = np.tile(signal.windows.hann(radarCfg['Np']), (radarCfg['N'], radarCfg['NrChn'], 1))
self.velWin = np.transpose(self.velWin, (0, 2, 1)) # -> (N_sample, N_chirps, nRx)
```
**Why Hanning Windows?** When you take an FFT of a finite signal, the sharp edges at the start and end of the data cause the frequency content to "smear" into adjacent bins (Spectral Leakage). A Hanning window gently tapers the data to zero at both edges, dramatically reducing this leakage and creating cleaner peaks. It is the equivalent of slowly fading in and fading out a recording instead of cutting it sharply.
Physical range and velocity axes are pre-calculated:
```python
rangeRes = c0 / (2 * (fStop - fStrt)) # metres per bin
self.rangeAxis = np.arange(0, NFFT) * N/NFFT * rangeRes # physical range values
self.velAxis = np.arange(-NFFTVel//2, NFFTVel//2)/NFFTVel * (1/Tp) * c0/(2*fc) # m/s
```
### 5.2 Step 1: The Range FFT (`cal_range_fft`)
```python
def cal_range_fft(self, data):
return fft(data * self.rangeWin * self.velWin, fftCfg['NFFT'], 0)
```
What this does:
1. **Windowing:** `data * self.rangeWin * self.velWin` applies the Hanning window to both the samples and chirps dimensions simultaneously.
2. **FFT along axis 0:** The FFT is applied across the N_sample (range) dimension. For each chirp and each antenna, the time-domain beat frequency signal is converted into a frequency-domain spectrum. Each frequency bin maps to a specific range.
3. **Output shape:** `(NFFT, N_chirps, nRx)` where `NFFT >= N_sample` (zero-padded for interpolation).
After this step, you have a **Range Profile** — for each antenna and each chirp, you can see which distances have strong reflections.
### 5.3 Step 2: The Doppler FFT (`cal_doppler_fft`)
```python
def cal_doppler_fft(self, rangeProfile):
return fftshift(fft(rangeProfile[self.RMinIdx:self.RMaxIdx+1, :], fftCfg['NFFTVel'], 1), 1)
```
What this does:
1. **Range Clipping:** `rangeProfile[RMinIdx:RMaxIdx+1, :]` discards range bins outside the configured `RMin`/`RMax` window (set in `config.yaml`).
2. **FFT along axis 1:** The FFT is applied across the N_chirps (Doppler) dimension. For each range bin and each antenna, the slow phase rotation from chirp to chirp is converted into a velocity spectrum.
3. **`fftshift`:** By default, FFT places zero frequency at index 0, with the positive half first and negative half at the end. `fftshift` reorders this so zero velocity is in the centre, negative velocities are on the left, and positive are on the right — exactly as seen in the RD heatmap.
4. **Output shape:** `(N_range_bins, NFFTVel, nRx)`.
### 5.4 Step 3: Range-Doppler Heatmap (Inside `convert_to_pcd`)
```python
rd_heatmap = np.sum(np.abs(dopplerProfile)**2, axis=2)
```
This collapses the antenna dimension by:
1. `np.abs(dopplerProfile)` — computing the magnitude of each complex number (clock hand length).
2. `**2` — squaring it to get power (energy) rather than amplitude.
3. `np.sum(..., axis=2)` — incoherent summation across all `nRx` antennas.
"Incoherent" means we add the *powers* (not the complex voltages). This avoids antenna-phase cancellation from interfering signals. The result is a 2D array `(N_range_bins, NFFTVel)` — the brightly coloured Range-Doppler map.
### 5.5 Step 4: CFAR Detection
```python
hit_matrix, threshold_matrix = self.cfar(rd_heatmap)
```
The `CA_CFAR` (Cell Averaging CFAR) detector scans every cell of the RD heatmap. For each cell:
1. It looks at a window of surrounding cells (guard cells excluded) and averages their power — this is the estimated noise floor.
2. It multiplies by a threshold factor: `threshold = noise_floor * cfar_threshold`.
3. If the current cell exceeds this threshold, it is marked as a target in `hit_matrix`.
The `threshold_matrix` (the CFAR noise floor) is also saved as the `cfar` metrology file.
### 5.6 Step 5: Angle of Arrival via Antenna FFT
For each detected target `(row, col)` in the RD map:
```python
pointSel[i] = dopplerProfile[row, col, :] # Complex voltages at the 8 antennas
pointSel_windowed = pointSel * self.spatialWin # Apply Hanning spatial window
aoaProfile = fftshift(fft(pointSel_windowed, fftCfg['NFFTAnt'], 1), 1)
angleIdx = np.argmax(np.abs(aoaProfile), axis=1) # Peak = angle bin
angleVals = self.angleAxis[angleIdx] # Convert bin to radians
```
The complex voltages across the 8 antennas form a spatial array. An FFT across this array converts inter-antenna phase differences into an angle spectrum. The peak of this spectrum is the Angle of Arrival. Combined with range (`rangeVals = self.rangeAxis[rowSel]`), we get $(x, y)$ positions.
### 5.7 Step 6: Range-Azimuth Heatmap (Doppler-Slice Accumulation)
```python
ra_heatmap = np.zeros((N_range, fftCfg['NFFTAnt']), dtype=np.float64)
for d in range(N_doppler):
doppler_slice = dopplerProfile[:, d, :] # One velocity slice
doppler_slice_windowed = doppler_slice * self.spatialWin
angle_fft = fftshift(fft(doppler_slice_windowed, fftCfg['NFFTAnt'], axis=1), axes=1)
ra_heatmap += np.abs(angle_fft) ** 2 # Accumulate power
```
This is the most important difference from a simple RA plot. By iterating through every Doppler bin and doing the Angle-FFT independently, we preserve the angular sharpness of moving targets. If we had collapsed the Doppler dimension first (averaged all chirps), a moving target's phase would smear across the chirps and produce a blurred angle estimate.
The resulting `ra_heatmap` is saved as the `ra` metrology file and rendered as the BEV (Bird's Eye View) polar sector image in Foxglove.
---
## 6. The "Reverse DSP" Fallacy: A Deep Dive
A persistent engineering hypothesis is: *If we have the final Radar Point Cloud, can we read the $(x, y, z)$ coordinates and run the math backward to re-create the raw ADC data?*
This question is technically answerable with "yes, you can generate *a* signal," but the critical answer is **no, you cannot recover the *original* signal.** The two are fundamentally different things, and conflating them leads to systematically over-optimistic simulations.
### 6.1 The One-Way Information Bottleneck
The entire DSP chain from ADC to Point Cloud is a **lossy compression pipeline**:
```
RAW ADC (131,072 complex numbers per frame)
| Range FFT
v
Range Profile (still 131,072 numbers, but transformed)
| Doppler FFT
v
RD Heatmap (reduced to range_bins x doppler_bins = ~5,000 numbers)
| CFAR threshold (discards ~99% of cells)
v
Point Cloud (perhaps 20-50 complex detections)
```
At each step, information is permanently discarded. Going backwards from 20 points to 131,072 complex numbers requires *inventing* the missing 131,052 numbers. There is no mathematical inverse of a lossy compression.
### 6.2 The "Silent Background" Problem — CFAR Entropy Loss
In `radar_processor.py`, the `CA_CFAR` detector examines every cell of the RD heatmap. It estimates the local noise floor by averaging surrounding cells and only retains cells exceeding a threshold. Everything below the threshold — the noise floor itself, ground clutter, and marginal reflections — is discarded.
**The concrete consequence:** A reconstructed ADC signal has absolute silence between targets. When you re-run CFAR on this reconstructed ADC, the adaptive threshold drops to near-zero because there is no noise floor for CFAR to measure. Every tiny numerical artefact exceeds the threshold. The CFAR algorithm, calibrated against a real noise floor, now produces hundreds of false detections in a signal that was supposed to be clean.
This is the "clean-room" paradox: the cleaner you make the reconstructed data, the more wrong CFAR behaves on it.
### 6.3 The Illusion of Perfect Sidelobes
When an FFT processes a finite-duration signal, energy unavoidably leaks into neighbouring frequency bins. These ripples are called **Sidelobes**. Every real target produces a main peak surrounded by a pattern of sidelobes.
* **In real hardware:** The sidelobes are distorted by Local Oscillator (LO) phase noise (random jitter in the clock), amplifier non-linearities (signal clipping), and temperature-dependent gain drift. The sidelobe pattern is messy and irregular.
* **In reconstructed ADC:** Each target is represented by a pure, perfect complex sinusoid. Its FFT produces textbook-perfect sidelobes defined only by the Hanning window function. There is no irregularity.
Consequence: A CFAR algorithm tuned on real hardware sidelobes will behave completely differently when tested on perfect-sidelobe reconstructed data. A tracker that correctly ignores real hardware sidelobes may falsely detect the perfect sidelobes as real targets, or vice versa.
### 6.4 Multipath Erasure
In the real physical world (and in our Shenron simulation from LiDAR), electromagnetic waves bounce multiple times. A signal transmitted from the radar may:
1. Travel directly to a car and reflect back — the primary detection.
2. Bounce off the ground first, hit the car at an angle, and return — a "Ghost" at a slightly different apparent range.
3. Reflect off the car, bounce off a nearby wall, and return after an extra delay — another ghost.
These multipath reflections create **Ghost Targets** in the RD map. The CFAR algorithm may or may not detect them as targets depending on their strength. If they survive into the point cloud, they appear as extra spurious points.
**The fallacy:** If the point cloud has already filtered out ghost targets (or included them), reconstructing ADC from just the surviving real targets produces a signal with zero multipath. Testing a tracker on this tells you nothing about how it handles real-world multipath environments.
### 6.5 Irreversible Sub-Resolution Merging
The Radarbook (B=250 MHz) has a range resolution of 0.6 m. This means any two objects closer than 0.6 m to each other are indistinguishable in range — their beats overlap and appear as one stronger peak.
If two pedestrians are walking 0.4 m apart, the CFAR detects them as a single, stronger point at the midpoint between them. The point cloud stores one point. When you reconstruct ADC from that one point, you generate one single sine wave at the midpoint. There is no mathematical transformation that recovers two separate sine waves from that one point — the information is permanently merged.
### 6.6 Phase Matrix Loss — The AoA Bottleneck
Angle of Arrival estimation extracts the target angle from the inter-antenna phase pattern. After extracting the angle (`angleVals` in `convert_to_pcd`), the original complex phase across the 8 antennas is immediately discarded — only the angle in degrees is stored in the point cloud.
To reconstruct ADC for that target, you need to synthesize the complex voltages at all 8 antennas. You can compute what those phases *should* be given the stored angle, but there are two problems:
1. **Quantization error:** The stored angle is a single bin from an FFT. The true angle might be between two bins. The reconstruction picks exactly one bin, creating a slightly wrong phase pattern.
2. **Amplitude ambiguity:** The relative magnitudes across antennas were shaped by the real hardware antenna pattern. Unless you model that pattern precisely, the per-antenna amplitudes will be wrong, producing an AoA estimate that is biased after re-processing.
### 6.7 When Reverse DSP Is Acceptable
Despite all the above limitations, there are narrow use cases where reconstructing ADC from a point cloud is acceptable:
* **Unit testing DSP code:** If you want to verify that your `cal_range_fft` and `cal_doppler_fft` functions correctly detect a target at 30 m, constructing a synthetic single-target ADC (a pure sine wave at the 30 m beat frequency) is a valid unit test. You are testing the code logic, not the realism.
* **HIL injection with known ground truth:** If you inject a synthetic perfect ADC into a hardware ECU and measure where the ECU places the detection, you are characterizing the hardware's detection bias, not testing its performance against real noise.
For any scenario where you need the result to be comparable to real-world driving data, you must use the forward path: **CARLA → LiDAR → ADC Synthesis → DSP → Detection.**
---
## 7. Advanced Use Cases: The Value of Raw ADC
Given the memory footprint, why did we modify the Fox pipeline to capture ADC data?
1. **Custom DSP Architecture Design:** Engineers can bypass `sim_radar_utils`. With raw ADC data, they can write and test custom FFT windowing, dynamic 2D-CFAR algorithms, or Super-Resolution Angle estimation algorithms (like MUSIC or ESPRIT) directly in Python, validating changes without re-running the heavy CARLA simulation.
2. **Machine Learning & Neural Perception:** The cutting edge of AI research abandons traditional FFT/CFAR chains. Instead, massive 3D Complex ADC cubes are fed directly into 3D Convolutional Neural Networks (3D-CNNs) or Spatiotemporal Transformers. The AI learns the physics directly from the raw voltage, bypassing human-engineered thresholds.
3. **Hardware-in-the-Loop (HIL) Stimulation:** Raw ADC matrices can be formatted and injected directly into the digital baseband ports of physical radar Electronic Control Units (ECUs) on a testbench. This physically tricks the silicon processor into "seeing" the virtual CARLA simulation, allowing for hardware validation before the sensor is ever mounted on a car.
---
*Created: 2026-04-30 | Part of the Fox Radar Metrology Suite | Generated via Antigravity Agentic Memory.*

6
scripts/ISOLATE/e2e_agent_sem_lidar2shenron_package/ConfigureRadar.py

@ -25,7 +25,7 @@ class radar():
self.chirps = 3 # 128
self.nRx = 86 #16 # number of antennas(virtual antennas included, AOA dim)
self.noise_amp = 0.005 #0.0001(concrete+metal) # 0.00001(metal) #0.005(after skyward data)
self.gain = 10 ** (110 / 10) # Calibrated for Iteration 16
self.gain = 10 ** (120 / 10) # Recalibrated (+10dB) to compensate for 1/R^4 physics fix (Iteration 37)
self.angle_fft_size = 256
self.range_res = self.c / (2 * self.B) # range resolution
@ -70,7 +70,7 @@ class radar():
self.chirps = 128
self.nRx = 8 # number of antennas(virtual antennas included, AOA dim)
self.noise_amp = 0.005
self.gain = 10 ** (110 / 10) # Calibrated for Iteration 16
self.gain = 10 ** (120 / 10) # Recalibrated (+10dB) to compensate for 1/R^4 physics fix (Iteration 37)
self.angle_fft_size = 256
self.range_res = self.c / (2 * self.B) # range resolution
@ -115,7 +115,7 @@ class radar():
self.chirps = 128 # Increased to match 2TX MIMO frame depth for 3dB processing gain
self.nRx = 6
self.noise_amp = 0.005
self.gain = 10 ** (110 / 10) # Calibrated for Iteration 16
self.gain = 10 ** (120 / 10) # Recalibrated (+10dB) to compensate for 1/R^4 physics fix (Iteration 37)
self.angle_fft_size = 256
self.range_res = self.c / (2 * self.B)

27
scripts/ISOLATE/e2e_agent_sem_lidar2shenron_package/shenron/Sceneset.py

@ -460,13 +460,28 @@ def get_loss_3(points, rho, az_boresight, elev_angle, angles, radar, use_spec =
phi_deg = np.rad2deg(np.abs(np.pi / 2 - elev_angle))
G_ant = np.exp(-2.77 * np.power(phi_deg / radar.vertical_beamwidth, 2))
# --- Iteration 37: Area Integration (Resolution Independence) ---
# A single LiDAR point represents an expanding physical patch of Area = R^2 * dTheta * dPhi
point_area = np.power(rho, 2) * voxel_theta * voxel_phi
# --- AREA INTEGRATION DISABLED (Iteration 37 — Commented Out) ---
# The Area Integration model treated each LiDAR point as an expanding resolution cell
# (Area = R^2 * dTheta * dPhi). This R^2 area gain exactly cancelled the 1/R^2 transmit-path
# loss, making P_incident distance-independent (i.e., constant regardless of range).
# Combined with the 1/R^2 return-path loss in heatmap_gen_fast.py, the net result was
# only 1/R^2 total falloff — incorrect for point-scatterer targets like vehicles/pedestrians.
#
# Disabled to restore the physically correct 1/R^4 two-way radar range equation:
# Tx-path loss: 1/R^2 (here, in get_loss_3)
# Rx-path loss: 1/R^2 (applied in heatmap_gen_fast.py: loss * (1/rho^2))
# Total: 1/R^4
#
# NOTE: Gain recalibration may be needed. Removing the point_area multiplier will reduce
# signal magnitudes by ~30-40 dB (depending on voxel resolution). Adjust K_sq or
# radar.gain in ConfigureRadar.py if targets become invisible in the RD heatmap.
#
# point_area = np.power(rho, 2) * voxel_theta * voxel_phi # DISABLED — causes 1/R^2, not 1/R^4
# --- Iteration 17 preserved: Pure Physical 1/R^2 Tx path loss ---
# Intercepted power is weighted by the physical area the point represents
P_incident = (1 / np.power(rho, tx_dist_loss_exponent)) * K_sq * G_ant * point_area
# --- Point-Scatterer Model: Pure Physical 1/R^2 Tx path loss ---
# Each LiDAR point is treated as an isotropic point scatterer. Incident power falls
# off as 1/R^2 (one-way spreading loss), without any area-expansion compensation.
P_incident = (1 / np.power(rho, tx_dist_loss_exponent)) * K_sq * G_ant
# DEBUG: Monitor Signal Trends
# P_inc print suppressed — data captured via model.get_signal_metrics() telemetry

62
scripts/ISOLATE/model_wrapper.py

@ -18,7 +18,7 @@ if utils_root not in sys.path:
from e2e_agent_sem_lidar2shenron_package.lidar import run_lidar
from e2e_agent_sem_lidar2shenron_package.ConfigureRadar import radar
from sim_radar_utils.radar_processor import RadarProcessor
from sim_radar_utils.utils_radar import reformat_adc_shenron
from sim_radar_utils.utils_radar import reformat_adc_shenron, config
class ShenronRadarModel:
def __init__(self, radar_type='radarbook'):
@ -96,8 +96,11 @@ class ShenronRadarModel:
# 1. Physics-based Signal Generation (FMCW Chirps)
# This generates the raw ADC samples [Np, N, Ant]
adc_data = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)
raw_adc = run_lidar(self.sim_config, semantic_lidar_data, radarobj=self.radar_obj)
# Store raw ADC for later saving
self.last_adc = raw_adc
adc_data = raw_adc
# 2. Reformat to match Signal Processor expectations
# Internal logic often needs specific axis ordering
adc_data = reformat_adc_shenron(adc_data)
@ -152,12 +155,21 @@ class ShenronRadarModel:
ra = self.last_metrology['ra_heatmap']
peak_mag = np.max(rd)
avg_noise = np.mean(noise)
snr = 10 * np.log10(peak_mag / avg_noise) if avg_noise > 0 else 0
# 'noise' here is actually the detection_gate (threshold_matrix) from CFAR.
# It already has the 10**(threshold/10) multiplier applied.
# We must divide it out to get the true physical average noise floor.
threshold_db = config['CFAR'].get('threshold', 20.0)
threshold_linear = 10 ** (threshold_db / 10.0)
avg_noise_gate = np.mean(noise)
true_avg_noise = avg_noise_gate / threshold_linear
snr = 10 * np.log10(peak_mag / true_avg_noise) if true_avg_noise > 0 else 0
metrics = {
"peak_magnitude": float(peak_mag),
"avg_noise_floor": float(avg_noise),
"avg_noise_floor": float(true_avg_noise),
"peak_snr_db": float(snr),
"active_bins": int(np.sum(rd > noise))
}
@ -200,9 +212,9 @@ class ShenronRadarModel:
else:
ego_power = 0.0
clutter_bins = (rd > avg_noise) & (rd <= noise)
clutter_bins = (rd > true_avg_noise) & (rd <= noise)
clutter_ratio = np.sum(clutter_bins) / rd.size
avg_clutter = np.mean(rd[clutter_bins]) if np.any(clutter_bins) else avg_noise
avg_clutter = np.mean(rd[clutter_bins]) if np.any(clutter_bins) else true_avg_noise
scr = 10 * np.log10(peak_mag / avg_clutter) if avg_clutter > 0 else snr
metrics["dynamic_range_db"] = float(dyn_range)
@ -210,9 +222,37 @@ class ShenronRadarModel:
metrics["clutter_ratio"] = float(clutter_ratio)
metrics["signal_to_clutter_ratio_db"] = float(scr)
# 3. Array Health
az_var = np.mean(np.var(ra, axis=1)) if ra is not None else 0.0
metrics["azimuth_variance"] = float(az_var)
# 3. Array Health (Angular Dispersion)
if ra is not None:
# Normalize RA heatmap [0, 1] to prevent massive scalar variance
ra_max = np.max(ra)
ra_norm = ra / ra_max if ra_max > 0 else ra
# Azimuth Variance (Normalized)
# This measures the average energy dispersion across all range bins
az_var = np.mean(np.var(ra_norm, axis=1))
metrics["azimuth_variance"] = float(az_var)
# Peak Azimuth Spread (Half-Power Beamwidth proxy)
# Find the range bin with the maximum energy
peak_range_idx = np.argmax(np.max(ra, axis=1))
peak_az_profile = ra[peak_range_idx, :]
peak_val = np.max(peak_az_profile)
if peak_val > 0:
# Count bins within 3dB (0.5 power) of the peak
half_power_bins = np.sum(peak_az_profile > (0.5 * peak_val))
# Convert to degrees: (Num Bins / Total Bins) * FOV
# Total Bins = len(angle_axis), FOV ~ 120 degrees
fov_deg = 120.0 # Approximate for visualization
spread_deg = (half_power_bins / len(peak_az_profile)) * fov_deg
else:
spread_deg = 0.0
metrics["peak_azimuth_spread_deg"] = float(spread_deg)
else:
metrics["azimuth_variance"] = 0.0
metrics["peak_azimuth_spread_deg"] = 0.0
except Exception as e:
print(f"[WARNING] Advanced metrology calculation failed: {e}")

140
scripts/ISOLATE/shenron_orchestrator.py

@ -0,0 +1,140 @@
import os
import sys
import time
import numpy as np
import json
from pathlib import Path
# Ensure we can import the model wrapper
sys.path.append(str(Path(__file__).parent))
try:
from model_wrapper import ShenronRadarModel
except ImportError:
# Fallback if called from a different context
from scripts.ISOLATE.model_wrapper import ShenronRadarModel
class ShenronOrchestrator:
"""
Unified orchestration engine for physics-based radar synthesis.
Ensures parity between production generation (dashboard) and iterative testbench (test_shenron).
"""
def __init__(self, radar_types=['awrl1432', 'radarbook']):
self.radar_types = radar_types
self.models = {}
def init_models(self, output_root: Path):
"""Initializes models and prepares directory structure."""
specs = {}
# Resolve project root for stop flag check (root/scripts/ISOLATE/shenron_orchestrator.py)
self.project_root = Path(__file__).resolve().parents[2]
self.flag_path = self.project_root / "tmp" / "stop.flag"
for r_type in self.radar_types:
try:
print(f" - Initializing Shenron {r_type} engine...")
model = ShenronRadarModel(radar_type=r_type)
self.models[r_type] = model
# Setup Folders
radar_dir = output_root / r_type
radar_dir.mkdir(exist_ok=True, parents=True)
met_base = radar_dir / "metrology"
for sub in ["rd", "ra", "cfar"]:
(met_base / sub).mkdir(parents=True, exist_ok=True)
# Save physical axes (static per session)
np.save(met_base / "range_axis.npy", model.processor.rangeAxis)
np.save(met_base / "angle_axis.npy", model.processor.angleAxis)
# Get hardware specs
specs[r_type] = model.get_radar_specs()
# Save specs for MCAP converter downstream
hw_specs = {
'f': float(model.radar_obj.f),
'chirp_rep': float(model.radar_obj.chirp_rep),
'max_velocity': float((3e8 / model.radar_obj.f) / (4 * model.radar_obj.chirp_rep)),
}
with open(met_base / "radar_specs.json", "w") as sf:
json.dump(hw_specs, sf)
except Exception as e:
print(f" [WARNING] Failed to init {r_type}: {e}")
continue
return specs
def process_frame(self, lidar_file: Path, output_root: Path, save_adc=False):
"""Processes a single LiDAR frame through all active radar models."""
try:
data = np.load(lidar_file)
except Exception as e:
print(f" [ERROR] Failed to load {lidar_file.name}: {e}")
return None
# Standard Padding: Ensure [x, y, z, intensity, cos_inc_angle, obj, tag]
if data.shape[1] == 6:
padded_data = np.zeros((data.shape[0], 7), dtype=np.float32)
padded_data[:, 0:3] = data[:, 0:3]
padded_data[:, 4:7] = data[:, 3:6]
data = padded_data
frame_results = {}
for r_type, model in self.models.items():
try:
# 1. Physics Processing
rich_pcd = model.process(data)
# 2. Save Pointcloud
output_file = output_root / r_type / lidar_file.name
np.save(output_file, rich_pcd)
# 3. Optional ADC Saving
if save_adc and hasattr(model, "last_adc") and model.last_adc is not None:
adc_folder = output_root / r_type / "adc_raw"
adc_folder.mkdir(parents=True, exist_ok=True)
np.save(adc_folder / lidar_file.name, model.last_adc)
# 4. Save Metrology (.npy)
met = model.get_last_metrology()
met_base = output_root / r_type / "metrology"
if met:
frame_name = lidar_file.stem
np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap'])
np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])
# 5. Extract and Save Metrics
metrics = model.get_signal_metrics()
if metrics:
# Clean for JSON (handle NaN/Inf)
clean_metrics = {}
for k, v in metrics.items():
if isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
clean_metrics[k] = 0.0
else:
clean_metrics[k] = v
# Append to metrics log
metrics_file = met_base / "metrics.jsonl"
with open(metrics_file, "a") as f:
f.write(json.dumps({"frame": lidar_file.stem, **clean_metrics}) + "\n")
# Add point count for telemetry
clean_metrics["pts"] = int(rich_pcd.shape[0]) if hasattr(rich_pcd, 'shape') else 0
frame_results[r_type] = clean_metrics
except Exception as e:
print(f" [ERROR] {r_type} processing failed for {lidar_file.name}: {e}")
continue
return frame_results
def check_stop_flag(self) -> bool:
"""Check if a stop request has been issued by the user/dashboard."""
if hasattr(self, 'flag_path') and self.flag_path.exists():
print(f"\n[SHENRON] Stop flag detected at {self.flag_path}! Halting processing...")
return True
return False

9
scripts/ISOLATE/sim_radar_utils/config.yaml

@ -50,8 +50,8 @@ ROS:
CFAR:
win_param: [9, 9, 5, 5] # [Est. width, Est. height, Guard width, Guard height] - Widened guard to 11x11 to isolate nearby clutter
threshold: 20 # dB (Standard) - Reverted to 20dB as per user request to maintain baseline sensitivity
peak_grouping: true # Toggle Non-Maximum Suppression (NMS) to eliminate range/doppler sidelobes
threshold: 19 # dB (Standard) - Lowered from 20dB to 19dB as a balanced compromise to boost density without excessive noise
peak_grouping: false # Toggle Non-Maximum Suppression (NMS) to eliminate range/doppler sidelobes
Visualize:
@ -69,8 +69,9 @@ Visualize:
xUnit: "m"
yLabel: "Velocity"
yUnit: "m/s"
xRange: [-8, 8] # Doppler Velocity limits [m/s]
yRange: [0, 120] # Range limits [m]
# xRange and yRange deliberately NOT set here — the renderer uses the true
# physical Doppler axis (±26.8 m/s for AWRL1432) computed from chirp_rep.
# DO NOT add xRange overrides without resetting the axis in test_shenron.py.
winSize: [500, 400]
pos: [600, 50]
radarPCD:

11
scripts/ISOLATE/sim_radar_utils/plots.py

@ -104,11 +104,16 @@ def postprocess_ra(ra_heatmap, range_axis, smooth_sigma=1.0):
ra = np.clip(ra, 1e-9, None)
# 2. Physics-based dynamic range compression (Linear -> Log)
SYSTEM_GAIN_OFFSET = 68.0
# SYSTEM_GAIN_OFFSET: Accounts for the absolute scale of the simulated signal.
# Must be updated when radar.gain changes. Formula: 10*log10(gain) - reference_offset
# Baseline: gain=10^(110/10) -> offset=68.0
# After +10dB recalibration: gain=10^(120/10) -> offset=78.0
SYSTEM_GAIN_OFFSET = 78.0
ra_db = 10 * np.log10(ra) - SYSTEM_GAIN_OFFSET
# 3. Fixed dynamic range clipping (-5 to 45 dB)
ra_db = np.clip(ra_db, -5, 45)
# 3. Fixed dynamic range clipping (-5 to 55 dB)
# Upper limit raised from 45 to 55 dB to provide headroom after gain recalibration.
ra_db = np.clip(ra_db, -5, 55)
# 4. Optional Gaussian smoothing
if smooth_sigma > 0:

12
scripts/ISOLATE/sim_radar_utils/radar_processor.py

@ -22,10 +22,13 @@ cfarCfg = config['CFAR']
class RadarProcessor:
def __init__(self):
# radar data will be shaped as (# of chirp, # of sample, # of antenna)
self.rangeWin = np.tile(signal.windows.hann(radarCfg['N']), (radarCfg['Np'], radarCfg['NrChn'], 1))
# Blackman-Harris window: Deepest sidelobe rejection (-92 dB).
# Used to suppress the horizontal "astigmatism" sidelobe lines at the
# cost of a broader main-lobe (slightly less range resolution).
self.rangeWin = np.tile(signal.windows.blackmanharris(radarCfg['N']), (radarCfg['Np'], radarCfg['NrChn'], 1))
self.rangeWin = np.transpose(self.rangeWin, (2, 0, 1))
self.velWin = np.tile(signal.windows.hann(radarCfg['Np']), (radarCfg['N'], radarCfg['NrChn'], 1))
self.velWin = np.tile(signal.windows.blackmanharris(radarCfg['Np']), (radarCfg['N'], radarCfg['NrChn'], 1))
self.velWin = np.transpose(self.velWin, (0, 2, 1))
rangeRes = fftCfg['c0'] / (2*(radarCfg['fStop'] - radarCfg['fStrt']))
@ -46,8 +49,9 @@ class RadarProcessor:
threshold=cfarCfg['threshold'],
rd_size=(self.RMaxIdx - self.RMinIdx + 1, fftCfg['NFFTVel']))
# Spatial Window for Angle-FFT (Hann window to reduce sidelobes)
self.spatialWin = np.hanning(radarCfg['NrChn'])
# Spatial Window for Angle-FFT (Hamming window to reduce sidelobes without zeroing edges)
# Note: Hamming is better for small arrays (like 6-8 antennas) than Hanning
self.spatialWin = np.hamming(radarCfg['NrChn'])
def cal_range_fft(self, data):
'''apply range window and doppler window and apply fft on each sample to get range profile'''

7
scripts/data_to_mcap.py

@ -138,6 +138,7 @@ FOXGLOVE_SCENE_UPDATE_SCHEMA = {
FRUSTUM_SPECS = {
"awrl1432": {"az_deg": 75.0, "el_deg": 20.0, "max_r": 150.0, "color": {"r": 1.0, "g": 0.5, "b": 0.0, "a": 1.0}},
"radarbook": {"az_deg": 60.0, "el_deg": 10.0, "max_r": 150.0, "color": {"r": 0.0, "g": 1.0, "b": 1.0, "a": 1.0}},
"ti_cascade": {"az_deg": 60.0, "el_deg": 10.0, "max_r": 150.0, "color": {"r": 0.0, "g": 1.0, "b": 0.0, "a": 1.0}},
}
def load_frames(folder_path):
@ -173,7 +174,7 @@ def convert_folder(folder_path):
lidar_channel_id = writer.register_channel(topic="/lidar", message_encoding="json", schema_id=lidar_schema_id)
pose_channel_id = writer.register_channel(topic="/ego_pose", message_encoding="json", schema_id=pose_schema_id)
radar_channel_id = writer.register_channel(topic="/radar/native", message_encoding="json", schema_id=lidar_schema_id)
radar_types = ['awrl1432', 'radarbook']
radar_types = ['awrl1432', 'radarbook', 'ti_cascade']
shenron_channels = {}
met_channels = {}
cached_axes = {}
@ -395,7 +396,7 @@ def convert_folder(folder_path):
rd_p = os.path.join(met_dir, "rd", f"{frame_name}.npy")
if os.path.exists(rd_p):
rd_data = np.load(rd_p)
rd_db = 10 * np.log10(np.clip(rd_data, 1e-9, None)) - 68.0
rd_db = 10 * np.log10(np.clip(rd_data, 1e-9, None)) - 78.0 # Updated offset: -10dB to match Iteration 37 recalibration
b64 = render_engines[r_type]['rd'].render(np.flipud(rd_db))
if b64:
msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64}
@ -433,7 +434,7 @@ def convert_folder(folder_path):
cfar_p = os.path.join(met_dir, "cfar", f"{frame_name}.npy")
if os.path.exists(cfar_p):
cf_data = np.load(cfar_p)
cf_db = 10 * np.log10(np.clip(cf_data, 1e-9, None)) - 68.0
cf_db = 10 * np.log10(np.clip(cf_data, 1e-9, None)) - 78.0
b64 = render_engines[r_type]['cfar'].render(np.flipud(cf_db))
if b64:
msg = {"timestamp": {"sec": ts_sec, "nsec": ts_nsec}, "frame_id": "ego_vehicle", "format": "png", "data": b64}

110
scripts/generate_shenron.py

@ -55,50 +55,19 @@ def process_session(session_path):
print(f" [SKIP] No .npy files in 'lidar' folder.")
return
radar_types = ['awrl1432', 'radarbook']
models = {}
from scripts.ISOLATE.shenron_orchestrator import ShenronOrchestrator
orchestrator = ShenronOrchestrator(radar_types=['awrl1432', 'radarbook', 'ti_cascade'])
# -----------------------------------------------------------------------
# DIAGNOSTIC: Step 1 - Model Initialization
# TELEMETRY: Init Phase
# -----------------------------------------------------------------------
print(f" [DIAGNOSTIC] Step 1: Initializing models...", flush=True)
for r_type in radar_types:
try:
print(f" - Loading physics weights for {r_type}...", flush=True)
models[r_type] = ShenronRadarModel(radar_type=r_type)
(session_path / r_type).mkdir(exist_ok=True)
# Create Metrology folders
met_base = session_path / r_type / "metrology"
for sub in ["rd", "ra", "cfar"]:
(met_base / sub).mkdir(parents=True, exist_ok=True)
# Save physical axes once per session
np.save(met_base / "range_axis.npy", models[r_type].processor.rangeAxis)
np.save(met_base / "angle_axis.npy", models[r_type].processor.angleAxis)
# Save radar hardware specs for downstream MCAP visualization
radar_hw_specs = {
'f': float(models[r_type].radar_obj.f),
'chirp_rep': float(models[r_type].radar_obj.chirp_rep),
'max_velocity': float((3e8 / models[r_type].radar_obj.f) / (4 * models[r_type].radar_obj.chirp_rep)),
}
with open(met_base / "radar_specs.json", "w") as sf:
json.dump(radar_hw_specs, sf)
except Exception as e:
print(f" [WARNING] Failed to init {r_type}: {e}")
continue
radar_specs = orchestrator.init_models(session_path)
# -----------------------------------------------------------------------
# TELEMETRY: Emit structured init payload
# -----------------------------------------------------------------------
print(f" [DIAGNOSTIC] Step 2: Collecting metadata...", flush=True)
gpu_info = _get_gpu_info()
radar_specs = {}
for r_type, model in models.items():
radar_specs[r_type] = model.get_radar_specs()
telemetry_init = {
"gpu": gpu_info,
"radars": radar_specs,
@ -117,65 +86,16 @@ def process_session(session_path):
for frame_idx, lidar_file in enumerate(lidar_files):
frame_start = time.time()
# 1. Load Semantic LiDAR data
try:
data = np.load(lidar_file)
except Exception as e:
print(f"\n [ERROR] Failed to load {lidar_file.name}: {e}")
continue
if data.shape[1] == 6:
padded_data = np.zeros((data.shape[0], 7), dtype=np.float32)
padded_data[:, 0:3] = data[:, 0:3]
padded_data[:, 4:7] = data[:, 3:6]
data = padded_data
# Stop if requested
if orchestrator.check_stop_flag():
break
frame_metrics = {}
for r_type, model in models.items():
try:
# 2. Process through the physics-based model
# print(f" [DEBUG] Processing {r_type} frame {frame_idx+1}...", end='\r', flush=True)
rich_pcd = model.process(data)
# 3. Save to disk
output_file = session_path / r_type / lidar_file.name
np.save(output_file, rich_pcd)
# 4. Save Metrology Heatmaps
met_base = session_path / r_type / "metrology"
met = model.get_last_metrology()
if met:
frame_name = lidar_file.stem
np.save(met_base / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
np.save(met_base / "ra" / f"{frame_name}.npy", met['ra_heatmap'])
np.save(met_base / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])
# 5. Save Signal Metrics
try:
metrics = model.get_signal_metrics()
if metrics:
# Clean metrics for JSON (handle NaN/Inf)
clean_metrics = {}
for k, v in metrics.items():
if isinstance(v, float) and (np.isnan(v) or np.isinf(v)):
clean_metrics[k] = 0.0
else:
clean_metrics[k] = v
metrics_file = met_base / "metrics.jsonl"
with open(metrics_file, "a") as f:
f.write(json.dumps({"frame": lidar_file.stem, **clean_metrics}) + "\n")
# CAPTURE UNIQUE POINT COUNT FOR TELEMETRY
clean_metrics["pts"] = int(rich_pcd.shape[0]) if hasattr(rich_pcd, 'shape') else 0
frame_metrics[r_type] = clean_metrics
except Exception as e:
pass # Metrics failure shouldn't crash the loop
except Exception as e:
print(f"\n [ERROR] Failed to process {lidar_file.name} for {r_type}: {e}")
# Process through the unified orchestrator
frame_results = orchestrator.process_frame(lidar_file, session_path, save_adc=True)
if frame_results is None:
continue
# Timing
frame_elapsed = time.time() - frame_start
frame_times.append(frame_elapsed)
@ -197,12 +117,14 @@ def process_session(session_path):
"metrics": {}
}
for r_type, m in frame_metrics.items():
for r_type, m in frame_results.items():
telemetry_frame["metrics"][r_type] = {
"snr": round(m.get("peak_snr_db", 0), 1),
"pts": m.get("pts", 0),
"peak": round(m.get("peak_magnitude", 0), 1),
"bins": m.get("active_bins", 0),
"az_std": round(m.get("azimuth_variance", 0), 4),
"spread": round(m.get("peak_azimuth_spread_deg", 0), 1),
}
print(f"[SHENRON_STEP]{json.dumps(telemetry_frame)}", flush=True)

81
scripts/test_shenron.py

@ -156,7 +156,7 @@ def run_testbench(iter_name):
return
iter_dir.mkdir(parents=True, exist_ok=True)
radar_types = ['awrl1432', 'radarbook']
radar_types = ['awrl1432', 'radarbook', 'ti_cascade']
print(f"\n======================================")
print(f"SHENRON TESTBENCH ITERATION: {iter_name}")
@ -164,26 +164,10 @@ def run_testbench(iter_name):
# 1. GENERATE SYNTHETIC DATA
print("\n[Stage 1]: Processing Physics models...")
models = {}
for r_type in radar_types:
try:
print(f" -> Initializing {r_type} engine...")
models[r_type] = ShenronRadarModel(radar_type=r_type)
(iter_dir / r_type).mkdir(exist_ok=True)
# Create Metrology folders
met_base = iter_dir / r_type / "metrology"
for sub in ["rd", "ra", "cfar"]:
(met_base / sub).mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f" -> [WARNING] Failed to init {r_type}: {e}")
continue
# Save physical axes once per radar type (same for every frame — config-derived)
met_base = iter_dir / r_type / "metrology"
np.save(met_base / "range_axis.npy", models[r_type].processor.rangeAxis)
np.save(met_base / "angle_axis.npy", models[r_type].processor.angleAxis)
from scripts.ISOLATE.shenron_orchestrator import ShenronOrchestrator
orchestrator = ShenronOrchestrator(radar_types=radar_types)
radar_specs = orchestrator.init_models(iter_dir)
lidar_files = sorted(list(lidar_dir.glob("*.npy")))
if args.frames and args.frames > 0:
@ -191,43 +175,7 @@ def run_testbench(iter_name):
lidar_files = lidar_files[:args.frames]
for lidar_file in tqdm.tqdm(lidar_files, desc=" Simulating Radars", unit="frame"):
data = np.load(lidar_file)
# Pad to [x, y, z, intensity, cos_inc_angle, obj, tag] if needed
if data.shape[1] == 6:
padded_data = np.zeros((data.shape[0], 7), dtype=np.float32)
padded_data[:, 0:3] = data[:, 0:3]
padded_data[:, 4:7] = data[:, 3:6]
data = padded_data
for r_type, model in models.items():
try:
rich_pcd = model.process(data)
out_path = iter_dir / r_type / lidar_file.name
np.save(out_path, rich_pcd)
# --- PHASES 1 & 3: Save Raw Metrology (.npy) ---
met = model.get_last_metrology()
if met:
frame_name = lidar_file.stem # e.g., frame_000200
ra_map = met['ra_heatmap']
np.save(iter_dir / r_type / "metrology" / "rd" / f"{frame_name}.npy", met['rd_heatmap'])
np.save(iter_dir / r_type / "metrology" / "ra" / f"{frame_name}.npy", ra_map)
np.save(iter_dir / r_type / "metrology" / "cfar" / f"{frame_name}.npy", met['threshold_matrix'])
# --- SANITY CHECK: Azimuth Variance ---
# If RA is working, energy should vary across azimuth bins.
# If RA is broken (uniform rings), variance will be near zero.
az_std = np.mean(np.std(ra_map, axis=1))
if az_std < 1e-12:
tqdm.tqdm.write(f" [⚠️ WARNING] Frame {frame_name} ({r_type}): Azimuth variance is ZERO. Check Phase Preservation.")
# Log Metrics
metrics = model.get_signal_metrics()
with open(iter_dir / r_type / "metrology" / "metrics.jsonl", "a") as mf:
mf.write(json.dumps({"frame": frame_name, **metrics}) + "\n")
except Exception as e:
print(f"[ERROR] Frame {lidar_file.name} failed for {r_type}: {e}")
orchestrator.process_frame(lidar_file, iter_dir, save_adc=False)
# 2. GENERATE MCAP
print("\n[Stage 2]: Weaving MCAP Comparison Package...")
@ -303,16 +251,16 @@ def run_testbench(iter_name):
cached_axes[r_type] = None
# Initialize the stateful Matplotlib renderers for extreme throughput
f_cfg = models[r_type].radar_obj.f if r_type in models else 77e9
chirp_rep_cfg = models[r_type].radar_obj.chirp_rep if r_type in models else 3e-5
f_cfg = orchestrator.models[r_type].radar_obj.f if r_type in orchestrator.models else 77e9
chirp_rep_cfg = orchestrator.models[r_type].radar_obj.chirp_rep if r_type in orchestrator.models else 3e-5
max_vel_cfg = (3e8 / f_cfg) / (4 * chirp_rep_cfg)
max_r_cfg = cached_axes[r_type]['range_axis'][-1] if cached_axes[r_type] else 150
display_limit_cfg = 120.0
render_engines[r_type] = {
'rd': FastHeatmapEngine(extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='viridis', title=f'{r_type.upper()} Range-Doppler', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='bicubic'),
'cfar': FastHeatmapEngine(extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='plasma', title=f'{r_type.upper()} CFAR Noise Threshold', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='bicubic'),
'ra_static': FastHeatmapEngine(extent=[-display_limit_cfg, display_limit_cfg, 0, display_limit_cfg], cmap='jet', vmin=-5, vmax=45, title=f'{r_type.upper()} Range-Azimuth (Absolute)', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', aspect='equal'),
'rd': FastHeatmapEngine(extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='viridis', title=f'{r_type.upper()} Range-Doppler', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='nearest'),
'cfar': FastHeatmapEngine(extent=[-max_vel_cfg, max_vel_cfg, 0, max_r_cfg], cmap='plasma', title=f'{r_type.upper()} CFAR Noise Threshold', xlabel='Doppler Velocity [m/s]', ylabel='Range [m]', ylim=[0, 120], interpolation='nearest'),
'ra_static': FastHeatmapEngine(extent=[-display_limit_cfg, display_limit_cfg, 0, display_limit_cfg], cmap='jet', vmin=-5, vmax=55, title=f'{r_type.upper()} Range-Azimuth (Absolute)', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', aspect='equal'),
'ra_dyn': FastHeatmapEngine(extent=[-display_limit_cfg, display_limit_cfg, 0, display_limit_cfg], cmap='jet', title=f'{r_type.upper()} Range-Azimuth (Dynamic)', xlabel='Lateral distance [m]', ylabel='Longitudinal distance [m]', aspect='equal')
}
@ -436,7 +384,7 @@ def run_testbench(iter_name):
rd_data = np.load(rd_p)
# Apply log conversion minus identical system gain offset to maintain -5 to 45 scaling
# Simple 10*log10 - SYSTEM_GAIN_OFFSET
rd_db = 10 * np.log10(np.clip(rd_data, 1e-9, None)) - 68.0
rd_db = 10 * np.log10(np.clip(rd_data, 1e-9, None)) - 78.0 # Updated offset: +10dB from gain recalibration
# Flip UD so Range 0 (ego) is at the bottom. Use Cached Renderer.
b64 = render_engines[r_type]['rd'].render(np.flipud(rd_db))
@ -479,7 +427,7 @@ def run_testbench(iter_name):
cf_data = np.load(cf_p)
# Convert threshold power floor to pure threshold DB mask similar to RD
cf_db = 10 * np.log10(np.clip(cf_data, 1e-9, None)) - 68.0
cf_db = 10 * np.log10(np.clip(cf_data, 1e-9, None)) - 78.0
# Flip UD so Range 0 (ego) is at the bottom
# Revert to dynamic (None) scaling so threshold logic is easily visible
@ -553,7 +501,8 @@ def run_testbench(iter_name):
color_map = {
"awrl1432": {"r": 1.0, "g": 0.5, "b": 0.0, "a": 1.0}, # Solid Orange
"radarbook": {"r": 0.0, "g": 1.0, "b": 1.0, "a": 1.0} # Solid Cyan
"radarbook": {"r": 0.0, "g": 1.0, "b": 1.0, "a": 1.0}, # Solid Cyan
"ti_cascade": {"r": 0.0, "g": 1.0, "b": 0.0, "a": 1.0} # Solid Green
}
f_color = color_map.get(r_type, {"r": 1.0, "g": 1.0, "b": 1.0, "a": 1.0})

3
src/main.py

@ -19,6 +19,8 @@ This file never changes when new scenarios are added.
All scenario logic lives in scenarios/<name>.py.
"""
import sys
import os
from pathlib import Path
import argparse
@ -27,7 +29,6 @@ sys.path.append(str(Path(__file__).resolve().parents[1]))
import config
from scenario_loader import list_scenarios
from pathlib import Path
# -----------------------------------------------------------------------

12
src/pipeline/manager.py

@ -51,6 +51,12 @@ class PipelineManager:
True if all stages completed successfully.
"""
started_stages = []
# Resolve project root for stop flag check
import os
from pathlib import Path
root_dir = Path(__file__).resolve().parents[2]
flag_path = root_dir / "tmp" / "stop.flag"
try:
for stage in self._stages:
@ -59,6 +65,12 @@ class PipelineManager:
print(f"[PIPELINE] Skipping stage: '{stage.name}'")
continue
# Check if user requested a stop via dashboard
if os.path.exists(flag_path):
print(f"[PIPELINE] Stop flag detected! Halting before "
f"starting '{stage.name}'.")
break
# Check if a previous stage failed
if not ctx.success:
print(f"[PIPELINE] Halting — previous stage failed. "

1
src/pipeline/stages/video_stage.py

@ -8,6 +8,7 @@ transition. This stage runs at the very end of the pipeline.
"""
import os
import sys
import cv2
from pathlib import Path
from pipeline.base import PipelineStage, PipelineContext

Loading…
Cancel
Save