Browse Source

feat: initial commit CARLA ADAS modular scenario-driven simulation pipeline

## Summary

First commit of the Fox CARLA simulation pipeline. Establishes a fully working,
end-to-end ADAS data collection and visualization system built on CARLA 0.9.16.

---

## What is included

### Core Pipeline
- Multi-sensor synchronous simulation (Camera, Radar, LiDAR) at configurable FPS
- Frame-aligned sensor data captured using CARLA synchronous mode with fixed delta time
- Structured dataset output: PNG images, NPY arrays, JSONL metadata per frame
- MCAP conversion script producing Foxglove-compatible topics (/camera, /lidar, /radar, /ego_pose)
- Ground truth vehicle tracking embedded in every frame record

### Modular Scenario Architecture
- Abstract ScenarioBase plugin interface (setup / step / cleanup lifecycle)
- Dynamic scenario loader via importlib  main.py never imports scenarios by name
- Three implemented ADAS test scenarios:
  - braking   : Lead vehicle hard braking (emergency stop at configurable frame)
  - cutin     : Adjacent lane NPC forced lane change into ego lane
  - obstacle  : Static traffic cone placed on ego lane
- All NPC placement via CARLA waypoints  zero hardcoded world coordinates
- Scenario metadata embedded into every JSONL frame record for MCAP traceability

### Orchestrator (src/main.py)
- Fully scenario-agnostic  scenarios selected via --scenario CLI flag
- Supports --frames override for quick test runs without editing config
- Supports --list-scenarios for dry-run discovery (no CARLA server required)
- Deferred CARLA import so the process works without a live server for non-simulation commands

### Configuration (config.py)
- Single source of truth for all simulation, sensor, and scenario constants
- Scenario trigger frames and distances fully configurable without code changes

### Recorder (src/recorder.py)
- Session folders named <scenario>_YYYYMMDD_HHMMSS for easy identification
- extra_meta parameter merges scenario state into every frame record automatically

### Developer Tooling
- run.bat: one-click launcher that activates the carla312 conda environment
- intel/context.md: comprehensive AI-agent and developer reference document
- .gitignore: excludes data/, *.mcap, __pycache__, logs/, *.pyc

---

## Architecture

src/main.py (orchestrator)
  |-- scenario_loader.py    (importlib-based dynamic loader)
  |-- sensors.py            (SensorManager: camera + radar + lidar)
  |-- recorder.py           (per-frame data writer)

scenarios/
  |-- base.py               (ScenarioBase ABC)
  |-- braking.py            (BrakingScenario)
  |-- cutin.py              (CutInScenario)
  |-- obstacle.py           (ObstacleScenario)

config.py                   (all constants)
data_to_mcap.py             (dataset -> MCAP converter)
run.bat                     (conda env launcher)
intel/context.md            (codebase reference)

---

## Usage

  run.bat braking
  run.bat cutin --frames 120
  run.bat obstacle
  python data_to_mcap.py

---

## Environment
- CARLA 0.9.16
- Python conda env: carla312 (miniconda)
- Key deps: carla, numpy, opencv-python, mcap

---

## Known Limitations (future work)
- MCAP uses JSON encoding (functional; Protobuf migration planned)
- Intersection scenario not yet implemented (requires junction-aware waypoint logic)
- Single ego vehicle assumed throughout
- Foxglove layout presets not yet defined
1843_integration
RUSHIL AMBARISH KADU 2 months ago
commit
2d6d04e0b0
  1. 6
      .gitignore
  2. 39
      config.py
  3. 125
      data_inspector.py
  4. 178
      data_to_mcap.py
  5. 302
      intel/context.md
  6. 34
      run.bat
  7. 1
      scenarios/__init__.py
  8. 153
      scenarios/base.py
  9. 119
      scenarios/braking.py
  10. 111
      scenarios/cutin.py
  11. 108
      scenarios/obstacle.py
  12. 194
      src/main.py
  13. 122
      src/recorder.py
  14. 92
      src/scenario_loader.py
  15. 110
      src/sensors.py

6
.gitignore

@ -0,0 +1,6 @@
data/
__pycache__/
*.mcap
logs/
*.pyc
.env

39
config.py

@ -0,0 +1,39 @@
# Simulation settings
FPS = 20
DELTA_SECONDS = 1.0 / FPS
# Camera settings
CAM_WIDTH = 1280
CAM_HEIGHT = 720
CAM_FOV = 90
# Radar settings
RADAR_HORIZONTAL_FOV = 120
RADAR_VERTICAL_FOV = 30
RADAR_RANGE = 100
# Lidar settings (balanced for performance + quality)
LIDAR_RANGE = 100
LIDAR_CHANNELS = 32
LIDAR_POINTS_PER_SECOND = 100000
LIDAR_ROTATION_FREQUENCY = FPS
# Output
MAX_FRAMES = 200
# -----------------------------------------------------------------------
# Scenario defaults (scenarios read these via getattr(config, KEY, default))
# -----------------------------------------------------------------------
DEFAULT_SCENARIO = "braking"
# BrakingScenario
SCENARIO_LEAD_DISTANCE = 25 # metres ahead of ego for lead vehicle
SCENARIO_BRAKE_FRAME = 80 # frame on which the lead vehicle emergency-brakes
# CutInScenario
SCENARIO_CUTIN_DISTANCE = 15 # metres ahead of ego for NPC in adjacent lane
SCENARIO_CUTIN_FRAME = 60 # frame on which NPC is forced to change lane
# ObstacleScenario
SCENARIO_OBSTACLE_DISTANCE = 30 # metres ahead of ego for static prop
SCENARIO_OBSTACLE_PROP = "static.prop.trafficcone01"

125
data_inspector.py

@ -0,0 +1,125 @@
import os
import json
import numpy as np
import matplotlib.pyplot as plt
DATA_PATH = "data"
# ---------------- LOAD FRAME METADATA ----------------
def load_frame(frame_id):
with open(os.path.join(DATA_PATH, "frames.jsonl")) as f:
for line in f:
data = json.loads(line)
if data["frame_id"] == frame_id:
return data
return None
# ---------------- RADAR ----------------
def inspect_radar(frame_id):
path = os.path.join(DATA_PATH, "radar", f"frame_{frame_id:06d}.npy")
data = np.load(path)
print("\n--- RADAR DATA ---")
print("Shape:", data.shape)
if len(data) > 0:
print("\nFirst 5 detections:")
for row in data[:5]:
print(f"Depth={row[0]:.2f} m | Azimuth={row[1]:.3f} rad | Vel={row[3]:.2f} m/s")
else:
print("No radar detections")
return data
# ---------------- LIDAR ----------------
def inspect_lidar(frame_id):
path = os.path.join(DATA_PATH, "lidar", f"frame_{frame_id:06d}.npy")
data = np.load(path)
print("\n--- LIDAR DATA ---")
print("Shape:", data.shape)
print("\nFirst 5 points:")
for row in data[:5]:
print(f"x={row[0]:.2f}, y={row[1]:.2f}, z={row[2]:.2f}")
return data
# ---------------- GROUND TRUTH ----------------
def inspect_gt(frame_id):
frame = load_frame(frame_id)
print("\n--- GROUND TRUTH ---")
gt = frame["ground_truth"]
print(f"Total vehicles: {len(gt)}")
for v in gt[:5]:
print(f"ID={v['id']} | x={v['x']:.2f}, y={v['y']:.2f} | speed={v['speed']:.2f}")
# ---------------- PLOT RADAR ----------------
def plot_radar(radar_data):
if len(radar_data) == 0:
print("No radar data to plot")
return
r = radar_data[:, 0]
az = radar_data[:, 1]
x = r * np.cos(az)
y = r * np.sin(az)
plt.figure()
plt.scatter(x, y, s=10)
plt.title("Radar XY")
plt.xlabel("X (m)")
plt.ylabel("Y (m)")
plt.axis("equal")
plt.grid()
plt.show()
# ---------------- PLOT LIDAR ----------------
def plot_lidar(lidar_data):
x = lidar_data[:, 0]
y = lidar_data[:, 1]
plt.figure()
plt.scatter(x, y, s=1)
plt.title("Lidar Top View")
plt.axis("equal")
plt.grid()
plt.show()
# ---------------- MAIN ----------------
if __name__ == "__main__":
frame_id = int(input("Enter frame number: "))
frame = load_frame(frame_id)
if frame is None:
print("Frame not found")
exit()
print("\n========== FRAME INFO ==========")
print("Frame ID:", frame["frame_id"])
print("Timestamp:", frame["timestamp"])
inspect_gt(frame_id)
radar = inspect_radar(frame_id)
lidar = inspect_lidar(frame_id)
# Optional plots
plot_choice = input("\nPlot data? (y/n): ").lower()
if plot_choice == "y":
plot_radar(radar)
plot_lidar(lidar)

178
data_to_mcap.py

@ -0,0 +1,178 @@
import os
import json
import numpy as np
import base64
from mcap.writer import Writer
# Official Foxglove JSON Schemas
FOXGLOVE_POSE_SCHEMA = {
"type": "object",
"properties": {
"position": {
"type": "object",
"properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}}
},
"orientation": {
"type": "object",
"properties": {"x": {"type": "number"}, "y": {"type": "number"}, "z": {"type": "number"}, "w": {"type": "number"}}
}
}
}
FOXGLOVE_IMAGE_SCHEMA = {
"type": "object",
"properties": {
"timestamp": {
"type": "object",
"properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}
},
"frame_id": {"type": "string"},
"data": {"type": "string", "contentEncoding": "base64"},
"format": {"type": "string"}
}
}
FOXGLOVE_PCL_SCHEMA = {
"type": "object",
"properties": {
"timestamp": {
"type": "object",
"properties": {"sec": {"type": "integer"}, "nsec": {"type": "integer"}}
},
"frame_id": {"type": "string"},
"pose": FOXGLOVE_POSE_SCHEMA,
"point_stride": {"type": "integer"},
"fields": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"offset": {"type": "integer"},
"type": {"type": "integer"}
}
}
},
"data": {"type": "string", "contentEncoding": "base64"}
}
}
def load_frames(folder_path):
with open(os.path.join(folder_path, "frames.jsonl")) as f:
for line in f:
yield json.loads(line)
def convert_folder(folder_path):
folder_name = os.path.basename(folder_path)
output_path = os.path.join(folder_path, f"{folder_name}.mcap")
if os.path.exists(output_path):
print(f"\n>>> Skipping folder (MCAP already exists): {folder_name}")
return
print(f"\n>>> Processing folder: {folder_name}")
print(f"Target MCAP: {output_path}")
with open(output_path, "wb") as f:
writer = Writer(f)
writer.start(profile="foxglove")
# Register Schemas
pose_schema_id = writer.register_schema(name="foxglove.Pose", encoding="jsonschema", data=json.dumps(FOXGLOVE_POSE_SCHEMA).encode())
camera_schema_id = writer.register_schema(name="foxglove.CompressedImage", encoding="jsonschema", data=json.dumps(FOXGLOVE_IMAGE_SCHEMA).encode())
lidar_schema_id = writer.register_schema(name="foxglove.PointCloud", encoding="jsonschema", data=json.dumps(FOXGLOVE_PCL_SCHEMA).encode())
# Register Channels
camera_channel_id = writer.register_channel(topic="/camera", message_encoding="json", schema_id=camera_schema_id)
lidar_channel_id = writer.register_channel(topic="/lidar", message_encoding="json", schema_id=lidar_schema_id)
pose_channel_id = writer.register_channel(topic="/ego_pose", message_encoding="json", schema_id=pose_schema_id)
radar_channel_id = writer.register_channel(topic="/radar", message_encoding="json", schema_id=lidar_schema_id)
frame_count = 0
for frame in load_frames(folder_path):
ts_ns = int(frame["timestamp"] * 1e9)
ts_sec = ts_ns // 1_000_000_000
ts_nsec = ts_ns % 1_000_000_000
raw_pose = frame["ego_pose"]
x, y, z = raw_pose["x"], -raw_pose["y"], raw_pose["z"]
yaw_rad = -np.radians(raw_pose.get("yaw", 0))
ego_world_pose = {
"position": {"x": x, "y": y, "z": z},
"orientation": {"x": 0.0, "y": 0.0, "z": float(np.sin(yaw_rad/2)), "w": float(np.cos(yaw_rad/2))}
}
identity_pose = {"position": {"x": 0.0, "y": 0.0, "z": 0.0}, "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0}}
# CAMERA
camera_path = os.path.join(folder_path, "camera", frame["camera"])
if os.path.exists(camera_path):
with open(camera_path, "rb") as img_f:
img_bytes = img_f.read()
cam_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle",
"format": "png" if frame["camera"].endswith(".png") else "jpeg",
"data": base64.b64encode(img_bytes).decode("ascii")
}
writer.add_message(camera_channel_id, log_time=ts_ns, data=json.dumps(cam_msg).encode(), publish_time=ts_ns)
# EGO POSE
writer.add_message(pose_channel_id, log_time=ts_ns, data=json.dumps(ego_world_pose).encode(), publish_time=ts_ns)
# LIDAR
lidar_path = os.path.join(folder_path, "lidar", frame["lidar"])
if os.path.exists(lidar_path):
points = np.load(lidar_path)
ros_points = points[:, :3].copy()
ros_points[:, 1] = -ros_points[:, 1]
lidar_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle", "pose": identity_pose, "point_stride": 12,
"fields": [{"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}],
"data": base64.b64encode(ros_points.astype(np.float32).tobytes()).decode("ascii")
}
writer.add_message(lidar_channel_id, log_time=ts_ns, data=json.dumps(lidar_msg).encode(), publish_time=ts_ns)
# RADAR
radar_path = os.path.join(folder_path, "radar", frame["radar"])
if os.path.exists(radar_path):
r_data = np.load(radar_path)
if r_data.size > 0:
dist, az, alt = r_data[:, 0], -r_data[:, 1], r_data[:, 2]
xr, yr, zr = dist*np.cos(az)*np.cos(alt), dist*np.sin(az)*np.cos(alt), dist*np.sin(alt)
radar_msg = {
"timestamp": {"sec": ts_sec, "nsec": ts_nsec},
"frame_id": "ego_vehicle", "pose": identity_pose, "point_stride": 12,
"fields": [{"name":"x","offset":0,"type":7}, {"name":"y","offset":4,"type":7}, {"name":"z","offset":8,"type":7}],
"data": base64.b64encode(np.stack([xr,yr,zr],axis=1).astype(np.float32).tobytes()).decode("ascii")
}
writer.add_message(radar_channel_id, log_time=ts_ns, data=json.dumps(radar_msg).encode(), publish_time=ts_ns)
frame_count += 1
if frame_count % 50 == 0:
print(f" Processed {frame_count} frames...")
writer.finish()
print(f" Done! MCAP saved: {output_path} ({os.path.getsize(output_path)/1024/1024:.2f} MB)")
def main():
root_data = "data"
if not os.path.exists(root_data):
print(f"Error: {root_data} directory not found.")
return
folders = [os.path.join(root_data, d) for d in os.listdir(root_data) if os.path.isdir(os.path.join(root_data, d))]
# Also check if 'root_data' itself contains 'frames.jsonl' (legacy single-folder mode)
if os.path.exists(os.path.join(root_data, "frames.jsonl")):
convert_folder(root_data)
for folder in folders:
if os.path.exists(os.path.join(folder, "frames.jsonl")):
# Check if MCAP already exists and avoid re-processing if you prefer,
# but here we'll process all matching folders.
convert_folder(folder)
if __name__ == "__main__":
main()

302
intel/context.md

@ -0,0 +1,302 @@
# Project Context — Fox CARLA ADAS Simulation Pipeline
> This document is the primary reference for AI agents and developers navigating this codebase.
> It covers project purpose, architecture, file-by-file roles, data flows, and extension patterns.
---
## Project Purpose
A modular, scenario-driven simulation framework built on CARLA 0.9.16.
**End-to-end pipeline:**
```
CARLA Simulator → Multi-Sensor Capture → Dataset (PNG / NPY / JSONL)
→ MCAP Conversion → Foxglove Visualization
```
**Primary goal:** Demonstrate structured ADAS driving scenarios with synchronized,
multi-modal sensor data that can be visualized and analysed in Foxglove Studio.
---
## Repository Layout
```
Fox/
├── run.bat ← One-click launcher (activates carla312 conda env)
├── config.py ← All tuneable constants (FPS, sensor params, scenario defaults)
├── data_to_mcap.py ← Converts recorded dataset folders → .mcap files
├── data_inspector.py ← Utility to inspect / debug recorded datasets
├── src/
│ ├── main.py ← Orchestrator — scenario-agnostic entry point
│ ├── sensors.py ← SensorManager (camera, radar, lidar setup + sync queues)
│ ├── recorder.py ← Recorder (writes PNG / NPY / JSONL per frame)
│ └── scenario_loader.py ← Dynamic scenario loader via importlib
├── scenarios/
│ ├── __init__.py ← Package marker
│ ├── base.py ← ScenarioBase abstract class (the plugin contract)
│ ├── braking.py ← Lead vehicle hard braking scenario
│ ├── cutin.py ← Adjacent lane cut-in scenario
│ └── obstacle.py ← Static obstacle (traffic cone) scenario
├── data/ ← Auto-created; one subfolder per recording session
│ └── <scenario>_YYYYMMDD_HHMMSS/
│ ├── camera/ ← frame_XXXXXX.png
│ ├── radar/ ← frame_XXXXXX.npy (shape: [N, 4] — depth/az/alt/vel)
│ ├── lidar/ ← frame_XXXXXX.npy (shape: [N, 4] — x/y/z/intensity)
│ └── frames.jsonl ← One JSON record per frame (metadata + scenario info)
└── intel/
└── context.md ← This file
```
---
## Key Files — Detailed Reference
### `config.py`
Single source of truth for all constants. Every module imports from here via `import config`.
| Key | Default | Purpose |
|---|---|---|
| `FPS` | 20 | Simulation tick rate |
| `DELTA_SECONDS` | 0.05 | CARLA fixed delta (= 1/FPS) |
| `CAM_WIDTH/HEIGHT/FOV` | 1280×720, 90° | Camera resolution & field of view |
| `RADAR_HORIZONTAL_FOV` | 120° | Radar angular sweep |
| `RADAR_RANGE` | 100 m | Radar max range |
| `LIDAR_CHANNELS` | 32 | LiDAR beam count |
| `LIDAR_POINTS_PER_SECOND` | 100 000 | LiDAR density |
| `MAX_FRAMES` | 200 | Default run length |
| `DEFAULT_SCENARIO` | `"braking"` | Used when no `--scenario` flag provided |
| `SCENARIO_LEAD_DISTANCE` | 25 m | Lead vehicle spawn distance (braking) |
| `SCENARIO_BRAKE_FRAME` | 80 | Frame at which lead brakes |
| `SCENARIO_CUTIN_DISTANCE` | 15 m | NPC spawn distance (cutin) |
| `SCENARIO_CUTIN_FRAME` | 60 | Frame at which NPC changes lane |
| `SCENARIO_OBSTACLE_DISTANCE` | 30 m | Cone spawn distance (obstacle) |
| `SCENARIO_OBSTACLE_PROP` | `static.prop.trafficcone01` | CARLA blueprint name |
---
### `src/main.py` — Orchestrator
**Responsibilities:** CARLA connection, ego spawn, sensor init, scenario load, main loop, shutdown.
**Does NOT contain any scenario-specific logic.**
**CLI:**
```
python src/main.py --scenario braking
python src/main.py --scenario cutin --frames 120
python src/main.py --list-scenarios
```
**Execution flow:**
1. Parse args → `load_scenario(name)` → returns `ScenarioBase` instance
2. Connect CARLA (`localhost:2000`), enable sync mode at `DELTA_SECONDS`
3. Clear existing actors, spawn ego (`vehicle.tesla.model3`)
4. `SensorManager.spawn_sensors()` → attach camera / radar / lidar to ego
5. `Recorder(scenario_name=scenario.name)` → creates `data/<scenario>_<ts>/`
6. `scenario.setup(world, ego, traffic_manager)`
7. **Main loop** per frame: `world.tick()``sensor_manager.get_data()`
`recorder.save(..., extra_meta=scenario.get_scenario_metadata())``scenario.step(frame, ego)`
8. `finally`: `scenario.cleanup()``sensor_manager.destroy()` → restore async mode
> **Key invariant:** `main.py` never imports a scenario module by name.
> The CARLA import itself is deferred until after `--list-scenarios` early-exit so the dry-run
> works without a running CARLA server.
---
### `src/sensors.py` — SensorManager
Manages three sensors attached to the ego vehicle. All sensors write into `queue.Queue` objects.
`get_data()` blocks until one item from each queue is available, then asserts frame alignment.
| Sensor | CARLA Blueprint | Attachment Point | Output |
|---|---|---|---|
| Camera | `sensor.camera.rgb` | x=1.5, z=2.4 | BGRA image → `camera_queue` |
| Radar | `sensor.other.radar` | x=2.0, z=1.0 | Detection list → `radar_queue` |
| LiDAR | `sensor.lidar.ray_cast` | x=0.0, z=2.5 | Point buffer → `lidar_queue` |
> `get_data()` asserts `cam.frame == radar.frame == lidar.frame` — any mismatch raises immediately.
---
### `src/recorder.py` — Recorder
Writes one frame's worth of data to disk each tick.
**Output per frame:**
- `camera/frame_XXXXXX.png` — BGR image via OpenCV
- `radar/frame_XXXXXX.npy``float64` array `[N, 4]`: depth, azimuth, altitude, velocity
- `lidar/frame_XXXXXX.npy``float32` array `[N, 4]`: x, y, z, intensity
- One line appended to `frames.jsonl`:
```json
{
"frame_id": 82,
"timestamp": 1234.56,
"scenario": "braking",
"brake_frame": 80,
"braked": true,
"ego_pose": {"x": 12.3, "y": 4.5, "z": 0.0, "yaw": -91.2},
"camera": "frame_000082.png",
"radar": "frame_000082.npy",
"lidar": "frame_000082.npy",
"ground_truth": [{"id": 42, "x": ..., "speed": 8.3}]
}
```
**`extra_meta` pattern:** `save()` accepts `extra_meta: dict` which is merged into the record.
Scenarios use `get_scenario_metadata()` to supply this — no recorder changes needed per scenario.
---
### `src/scenario_loader.py` — Dynamic Loader
Uses `importlib` to load `scenarios.<name>` at runtime. Inspects the module for a concrete
`ScenarioBase` subclass and returns an instance.
```python
from scenario_loader import load_scenario, list_scenarios
scenario = load_scenario("braking") # → BrakingScenario()
names = list_scenarios() # → ['braking', 'cutin', 'obstacle']
```
`list_scenarios()` uses `pkgutil.iter_modules` on the `scenarios/` package — auto-discovers
new files with no configuration changes.
---
### `scenarios/base.py` — ScenarioBase (Abstract)
The **plugin contract** all scenarios must fulfil.
```
Required abstracts: name (property), setup(), step(), cleanup()
Optional overrides: on_ego_spawned(), get_scenario_metadata()
Shared helpers: _destroy_actors(), _get_waypoint_ahead(distance, lane_offset)
Protected state: self._world, self._ego, self._tm, self._actors (list)
```
`_get_waypoint_ahead(distance, lane_offset)`:
- `lane_offset=0` → same lane as ego
- `lane_offset=1` → right lane
- `lane_offset=-1` → left lane
- Returns `carla.Waypoint` or `None`
**All NPC placement must go through this helper** — never use hardcoded world coordinates.
---
### Implemented Scenarios
| File | Class | Trigger | Effect |
|---|---|---|---|
| `braking.py` | `BrakingScenario` | Frame 80 | Lead vehicle (25 m ahead) applies emergency stop |
| `cutin.py` | `CutInScenario` | Frame 60 | NPC in left lane (15 m ahead) forced into ego lane |
| `obstacle.py` | `ObstacleScenario` | None (static) | Traffic cone placed 30 m ahead on ego lane |
All trigger frames and distances are driven by `config.py` constants.
---
### `data_to_mcap.py` — MCAP Converter
Scans `data/` for subfolders containing `frames.jsonl` and converts each to a `.mcap` file.
Output is written as `data/<session>/<session>.mcap` (skips if already exists).
**Foxglove topics produced:**
| Topic | Schema | Content |
|---|---|---|
| `/camera` | `foxglove.CompressedImage` | Base64-encoded PNG |
| `/lidar` | `foxglove.PointCloud` | X/Y/Z float32, Y-axis flipped for ROS convention |
| `/radar` | `foxglove.PointCloud` | Spherical → Cartesian conversion, Y-flipped |
| `/ego_pose` | `foxglove.Pose` | Position + quaternion from yaw angle |
> **Coordinate system note:** CARLA uses left-handed coords (Y increases right).
> The converter negates Y and yaw to match ROS/Foxglove right-handed convention.
**Run:**
```
python data_to_mcap.py
```
Processes all unprocessed session folders in `data/` automatically.
---
## Full Pipeline — End-to-End
```
1. CARLA server running (CarlaUE4.exe)
2. run.bat braking → data/braking_<ts>/ (PNG + NPY + JSONL)
3. run.bat cutin → data/cutin_<ts>/
4. run.bat obstacle → data/obstacle_<ts>/
5. python data_to_mcap.py → data/*/<session>.mcap
6. Open .mcap in Foxglove Studio
```
---
## How to Add a New Scenario
1. Create `scenarios/my_scenario.py`
2. Subclass `ScenarioBase`, implement the four required members
3. Append any spawned actors to `self._actors` so `_destroy_actors()` handles cleanup
4. Use `self._get_waypoint_ahead(d)` for all NPC placement
5. Add config constants in `config.py` (optional but recommended)
6. Run `python src/main.py --list-scenarios` — it appears automatically
```python
from scenarios.base import ScenarioBase
class MyScenario(ScenarioBase):
@property
def name(self): return "my_scenario"
def setup(self, world, ego_vehicle, traffic_manager):
self._world, self._ego, self._tm = world, ego_vehicle, traffic_manager
wp = self._get_waypoint_ahead(20)
npc = world.try_spawn_actor(bp, wp.transform)
if npc:
self._actors.append(npc)
def step(self, frame, ego_vehicle):
if frame == 100:
pass # trigger event
def cleanup(self):
self._destroy_actors()
def get_scenario_metadata(self):
return {"scenario": self.name}
```
---
## Environment & Dependencies
| Item | Value |
|---|---|
| CARLA version | 0.9.16 |
| Python environment | conda env `carla312` (miniconda) |
| Activation | `run.bat` calls `activate.bat carla312` automatically |
| Key Python packages | `carla`, `numpy`, `opencv-python` (`cv2`), `mcap` |
| CARLA server address | `localhost:2000` (hardcoded in `main.py`) |
| Traffic Manager port | `8000` (hardcoded in `main.py`) |
---
## Known Limitations & Future Work
| Area | Status | Notes |
|---|---|---|
| MCAP encoding | JSON (functional) | Should migrate to Protobuf/typed schemas for performance |
| Intersection scenario | Not implemented | Needs `waypoint.is_junction` awareness |
| Scenario parameterization | Config-based | Future: CLI `--param key=val` support |
| Foxglove layouts | Manual | Future: `.json` layout presets per scenario |
| Multi-ego support | Not implemented | Single ego vehicle assumed throughout |
| Radar Foxglove schema | Re-uses PointCloud | Correct but non-semantic; dedicated radar schema planned |
---
*Last updated: 2026-03-27 | Pipeline version: modular scenario-driven (post-refactor)*

34
run.bat

@ -0,0 +1,34 @@
@echo off
:: Activate the CARLA conda environment
call C:\ProgramData\miniconda3\Scripts\activate.bat carla312
:: ---------------------------------------------------------------
:: Usage:
:: run.bat braking
:: run.bat cutin
:: run.bat obstacle --frames 120
:: run.bat --list-scenarios
:: run.bat -l
:: ---------------------------------------------------------------
if "%~1"=="" (
echo [ERROR] No argument specified.
echo.
echo Usage:
echo run.bat braking
echo run.bat cutin
echo run.bat obstacle --frames 120
echo run.bat --list-scenarios
echo.
pause
exit /b 1
)
:: Pass --list-scenarios / -l directly, otherwise treat first arg as scenario name
if "%~1"=="--list-scenarios" (
python src/main.py --list-scenarios
) else if "%~1"=="-l" (
python src/main.py --list-scenarios
) else (
python src/main.py --scenario %*
)

1
scenarios/__init__.py

@ -0,0 +1 @@
# scenarios package

153
scenarios/base.py

@ -0,0 +1,153 @@
"""
scenarios/base.py
-----------------
Abstract base class for all CARLA simulation scenarios.
Every scenario must subclass ScenarioBase and implement:
- setup()
- step()
- cleanup()
- name (property)
Optional overrides:
- on_ego_spawned()
- get_scenario_metadata()
"""
from abc import ABC, abstractmethod
class ScenarioBase(ABC):
"""
Abstract interface for all ADAS simulation scenarios.
Lifecycle
---------
1. scenario.setup(world, ego_vehicle, traffic_manager)
Called once before the main loop.
Spawn NPC actors, configure initial state.
2. scenario.step(frame, ego_vehicle)
Called every simulation tick.
Drive per-frame behaviour (e.g. trigger a brake, change lane).
3. scenario.cleanup()
Called in the finally block after the loop.
Destroy every actor spawned by this scenario.
"""
def __init__(self):
self._world = None
self._ego = None
self._tm = None
self._actors = [] # sub-classes append their NPCs here
# ------------------------------------------------------------------
# Required interface
# ------------------------------------------------------------------
@property
@abstractmethod
def name(self) -> str:
"""Human-readable scenario identifier (e.g. 'braking')."""
@abstractmethod
def setup(self, world, ego_vehicle, traffic_manager) -> None:
"""
Initialise the scenario:
- Store world / ego / tm references.
- Spawn NPC actors.
- Configure initial autopilot behaviour.
Parameters
----------
world : carla.World
ego_vehicle : carla.Vehicle (the ego actor)
traffic_manager : carla.TrafficManager
"""
@abstractmethod
def step(self, frame: int, ego_vehicle) -> None:
"""
Per-tick logic executed inside the main simulation loop.
Parameters
----------
frame : int (1-based frame counter)
ego_vehicle : carla.Vehicle
"""
@abstractmethod
def cleanup(self) -> None:
"""
Destroy all actors spawned by this scenario.
Always call this in a finally block so CARLA stays clean.
"""
# ------------------------------------------------------------------
# Optional hooks (default no-ops — override as needed)
# ------------------------------------------------------------------
def on_ego_spawned(self, ego_vehicle) -> None:
"""
Called immediately after the ego vehicle is spawned.
Override to react to ego spawn before setup() is called.
"""
def get_scenario_metadata(self) -> dict:
"""
Return a dict that will be merged into every recorded frame's
metadata (written to frames.jsonl and MCAP).
Override to add scenario-specific fields, e.g.:
{"scenario": self.name, "brake_frame": self._brake_frame}
"""
return {"scenario": self.name}
# ------------------------------------------------------------------
# Shared helpers for subclasses
# ------------------------------------------------------------------
def _destroy_actors(self) -> None:
"""
Convenience method: destroy all actors tracked in self._actors.
Call this from cleanup() in subclasses.
"""
for actor in self._actors:
try:
if actor.is_alive:
actor.destroy()
except Exception as e:
print(f"[WARN] Could not destroy actor {actor.id}: {e}")
self._actors.clear()
def _get_waypoint_ahead(self, distance: float, lane_offset: int = 0):
"""
Return a waypoint `distance` metres ahead of the ego vehicle.
lane_offset = 0 same lane
lane_offset = 1 one lane to the right
lane_offset = -1 one lane to the left
Returns None if no waypoint was found.
"""
if self._world is None or self._ego is None:
return None
map_ = self._world.get_map()
ego_wp = map_.get_waypoint(self._ego.get_location())
# Apply lane offset first
if lane_offset != 0:
for _ in range(abs(lane_offset)):
if lane_offset > 0:
adj = ego_wp.get_right_lane()
else:
adj = ego_wp.get_left_lane()
if adj is None:
return None
ego_wp = adj
next_wps = ego_wp.next(distance)
if not next_wps:
return None
return next_wps[0]

119
scenarios/braking.py

@ -0,0 +1,119 @@
"""
scenarios/braking.py
--------------------
Lead Vehicle Hard Braking Scenario.
A vehicle is spawned directly ahead of the ego on the same lane
and driven by the Traffic Manager. At a configurable frame, the
lead vehicle performs an emergency stop, creating a forward-collision
test scenario for ADAS evaluation.
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import carla
import config
from scenarios.base import ScenarioBase
class BrakingScenario(ScenarioBase):
"""
Scenario: Lead vehicle hard braking.
Timeline
--------
Frame 1 BRAKE_FRAME-1 : Lead vehicle cruises ahead at cruise_speed km/h.
Frame BRAKE_FRAME+ : Lead vehicle applies emergency stop.
"""
LEAD_DISTANCE_M = getattr(config, "SCENARIO_LEAD_DISTANCE", 25)
BRAKE_FRAME = getattr(config, "SCENARIO_BRAKE_FRAME", 80)
CRUISE_SPEED = 30 # km/h, set via TM percentage of speed limit
def __init__(self):
super().__init__()
self._lead_vehicle = None
self._braked = False
# ------------------------------------------------------------------
# ScenarioBase interface
# ------------------------------------------------------------------
@property
def name(self) -> str:
return "braking"
def setup(self, world, ego_vehicle, traffic_manager) -> None:
self._world = world
self._ego = ego_vehicle
self._tm = traffic_manager
bp_lib = world.get_blueprint_library()
# Pick a distinct vehicle blueprint for the lead car
lead_bps = bp_lib.filter("vehicle.tesla.model3")
if not lead_bps:
lead_bps = bp_lib.filter("vehicle.*")
lead_bp = lead_bps[0]
# Place lead vehicle ahead on the same lane
spawn_wp = self._get_waypoint_ahead(self.LEAD_DISTANCE_M, lane_offset=0)
if spawn_wp is None:
raise RuntimeError(
"[BrakingScenario] Could not find waypoint ahead of ego. "
"Ensure ego is on a driveable lane."
)
self._lead_vehicle = world.try_spawn_actor(lead_bp, spawn_wp.transform)
if self._lead_vehicle is None:
raise RuntimeError(
"[BrakingScenario] Failed to spawn lead vehicle at waypoint "
f"{spawn_wp.transform.location}. Spot may be occupied."
)
self._actors.append(self._lead_vehicle)
# TM: cruise just below the speed limit
self._lead_vehicle.set_autopilot(True, traffic_manager.get_port())
traffic_manager.vehicle_percentage_speed_difference(
self._lead_vehicle, -10 # 10 % above speed limit → ~33–44 km/h
)
traffic_manager.auto_lane_change(self._lead_vehicle, False)
traffic_manager.distance_to_leading_vehicle(self._lead_vehicle, 0)
print(
f"[{self.name}] Lead vehicle spawned {self.LEAD_DISTANCE_M} m ahead. "
f"Emergency brake at frame {self.BRAKE_FRAME}."
)
def step(self, frame: int, ego_vehicle) -> None:
if frame == self.BRAKE_FRAME and not self._braked:
if self._lead_vehicle and self._lead_vehicle.is_alive:
# Disengage TM autopilot and apply full brake
self._lead_vehicle.set_autopilot(False)
control = carla.VehicleControl()
control.throttle = 0.0
control.brake = 1.0
control.hand_brake = True
self._lead_vehicle.apply_control(control)
self._braked = True
print(f"[{self.name}] EMERGENCY BRAKE applied at frame {frame}.")
def cleanup(self) -> None:
self._destroy_actors()
print(f"[{self.name}] Cleanup complete.")
# ------------------------------------------------------------------
# Optional overrides
# ------------------------------------------------------------------
def get_scenario_metadata(self) -> dict:
return {
"scenario": self.name,
"lead_distance_m": self.LEAD_DISTANCE_M,
"brake_frame": self.BRAKE_FRAME,
"braked": self._braked,
}

111
scenarios/cutin.py

@ -0,0 +1,111 @@
"""
scenarios/cutin.py
------------------
Adjacent Lane Cut-In Scenario.
An NPC vehicle is spawned in the adjacent lane, slightly ahead of the
ego. At a configurable frame it is forced to change into the ego's lane,
simulating an unexpected lane cut-in event.
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
from scenarios.base import ScenarioBase
class CutInScenario(ScenarioBase):
"""
Scenario: Adjacent lane cut-in.
Timeline
--------
Frame 1 CUTIN_FRAME-1 : NPC drives in adjacent lane, ahead of ego.
Frame CUTIN_FRAME+ : TM forced lane change into ego lane.
"""
NPC_DISTANCE_M = getattr(config, "SCENARIO_CUTIN_DISTANCE", 15)
CUTIN_FRAME = getattr(config, "SCENARIO_CUTIN_FRAME", 60)
LANE_OFFSET = -1 # -1 = left lane, 1 = right lane
def __init__(self):
super().__init__()
self._npc = None
self._cut_triggered = False
# ------------------------------------------------------------------
# ScenarioBase interface
# ------------------------------------------------------------------
@property
def name(self) -> str:
return "cutin"
def setup(self, world, ego_vehicle, traffic_manager) -> None:
self._world = world
self._ego = ego_vehicle
self._tm = traffic_manager
bp_lib = world.get_blueprint_library()
npc_bps = bp_lib.filter("vehicle.audi.*")
if not npc_bps:
npc_bps = bp_lib.filter("vehicle.*")
npc_bp = npc_bps[0]
# Spawn in adjacent lane, ahead of ego
spawn_wp = self._get_waypoint_ahead(
self.NPC_DISTANCE_M, lane_offset=self.LANE_OFFSET
)
if spawn_wp is None:
raise RuntimeError(
f"[CutInScenario] No adjacent lane waypoint found "
f"(lane_offset={self.LANE_OFFSET}). "
"Try a map with multiple lanes."
)
self._npc = world.try_spawn_actor(npc_bp, spawn_wp.transform)
if self._npc is None:
raise RuntimeError(
"[CutInScenario] Failed to spawn NPC at "
f"{spawn_wp.transform.location}. Spot may be occupied."
)
self._actors.append(self._npc)
self._npc.set_autopilot(True, traffic_manager.get_port())
traffic_manager.auto_lane_change(self._npc, False) # prevent random changes
traffic_manager.vehicle_percentage_speed_difference(self._npc, 0)
print(
f"[{self.name}] NPC spawned in adjacent lane "
f"{self.NPC_DISTANCE_M} m ahead. Cut-in at frame {self.CUTIN_FRAME}."
)
def step(self, frame: int, ego_vehicle) -> None:
if frame == self.CUTIN_FRAME and not self._cut_triggered:
if self._npc and self._npc.is_alive:
# force_lane_change: True = left, False = right
force_left = (self.LANE_OFFSET < 0)
self._tm.force_lane_change(self._npc, force_left)
self._cut_triggered = True
print(f"[{self.name}] Lane change triggered at frame {frame}.")
def cleanup(self) -> None:
self._destroy_actors()
print(f"[{self.name}] Cleanup complete.")
# ------------------------------------------------------------------
# Optional overrides
# ------------------------------------------------------------------
def get_scenario_metadata(self) -> dict:
return {
"scenario": self.name,
"npc_distance_m": self.NPC_DISTANCE_M,
"cutin_frame": self.CUTIN_FRAME,
"lane_offset": self.LANE_OFFSET,
"cut_triggered": self._cut_triggered,
}

108
scenarios/obstacle.py

@ -0,0 +1,108 @@
"""
scenarios/obstacle.py
---------------------
Static Obstacle Scenario.
A traffic cone (or configurable static prop) is placed on the ego lane
ahead of the vehicle. The ego autopilot must detect and react to the
obstruction. No per-frame logic is needed this is a purely static scene.
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
from scenarios.base import ScenarioBase
class ObstacleScenario(ScenarioBase):
"""
Scenario: Static obstacle on the ego lane.
A prop is spawned on the road ahead. The ego continues under
autopilot and must navigate around or brake for the obstacle.
"""
OBSTACLE_DISTANCE_M = getattr(config, "SCENARIO_OBSTACLE_DISTANCE", 30)
# CARLA blueprint name for the static prop
PROP_BLUEPRINT = getattr(
config, "SCENARIO_OBSTACLE_PROP", "static.prop.trafficcone01"
)
def __init__(self):
super().__init__()
self._obstacle = None
# ------------------------------------------------------------------
# ScenarioBase interface
# ------------------------------------------------------------------
@property
def name(self) -> str:
return "obstacle"
def setup(self, world, ego_vehicle, traffic_manager) -> None:
self._world = world
self._ego = ego_vehicle
self._tm = traffic_manager
bp_lib = world.get_blueprint_library()
# Locate prop blueprint
prop_bps = bp_lib.filter(self.PROP_BLUEPRINT)
if not prop_bps:
# Fallback to any traffic cone available
prop_bps = bp_lib.filter("static.prop.trafficcone*")
if not prop_bps:
raise RuntimeError(
f"[ObstacleScenario] Blueprint '{self.PROP_BLUEPRINT}' not found. "
"Check CARLA content pack."
)
prop_bp = prop_bps[0]
# Place obstacle on the ego lane ahead
spawn_wp = self._get_waypoint_ahead(self.OBSTACLE_DISTANCE_M, lane_offset=0)
if spawn_wp is None:
raise RuntimeError(
"[ObstacleScenario] Could not find waypoint ahead — "
"ensure ego is on a driveable road."
)
# Offset slightly to lane centre to avoid collision with road mesh
obstacle_transform = spawn_wp.transform
obstacle_transform.location.z += 0.1 # tiny Z lift avoids clipping
self._obstacle = world.try_spawn_actor(prop_bp, obstacle_transform)
if self._obstacle is None:
raise RuntimeError(
"[ObstacleScenario] Failed to spawn obstacle at "
f"{obstacle_transform.location}."
)
self._actors.append(self._obstacle)
print(
f"[{self.name}] '{self.PROP_BLUEPRINT}' placed "
f"{self.OBSTACLE_DISTANCE_M} m ahead of ego."
)
def step(self, frame: int, ego_vehicle) -> None:
# Static scenario — no per-frame logic required
pass
def cleanup(self) -> None:
self._destroy_actors()
print(f"[{self.name}] Cleanup complete.")
# ------------------------------------------------------------------
# Optional overrides
# ------------------------------------------------------------------
def get_scenario_metadata(self) -> dict:
return {
"scenario": self.name,
"obstacle_prop": self.PROP_BLUEPRINT,
"obstacle_distance_m": self.OBSTACLE_DISTANCE_M,
}

194
src/main.py

@ -0,0 +1,194 @@
"""
src/main.py
-----------
Scenario-agnostic orchestrator for the CARLA ADAS simulation pipeline.
Usage
-----
python src/main.py --scenario braking
python src/main.py --scenario cutin
python src/main.py --scenario obstacle --frames 120
python src/main.py --list-scenarios
This file never changes when new scenarios are added.
All scenario logic lives in scenarios/<name>.py.
"""
import sys
import os
import argparse
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import config
from scenario_loader import load_scenario, list_scenarios
# -----------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------
def parse_args():
parser = argparse.ArgumentParser(
description="CARLA ADAS Simulation — scenario-driven runner"
)
parser.add_argument(
"--scenario", "-s",
type=str,
default=getattr(config, "DEFAULT_SCENARIO", "braking"),
help="Scenario name to run (e.g. braking, cutin, obstacle)"
)
parser.add_argument(
"--frames", "-f",
type=int,
default=None,
help="Override config.MAX_FRAMES for this run"
)
parser.add_argument(
"--list-scenarios", "-l",
action="store_true",
help="Print available scenarios and exit"
)
return parser.parse_args()
# -----------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------
def main():
args = parse_args()
if args.list_scenarios:
print("Available scenarios:")
for s in list_scenarios():
print(f" - {s}")
return
# CARLA imports only needed for live simulation
import carla
from sensors import SensorManager
from recorder import Recorder
max_frames = args.frames if args.frames is not None else config.MAX_FRAMES
# ------------------------------------------------------------------
# 1. Load scenario (no CARLA needed yet)
# ------------------------------------------------------------------
scenario = load_scenario(args.scenario)
# ------------------------------------------------------------------
# 2. Connect to CARLA
# ------------------------------------------------------------------
client = carla.Client("localhost", 2000)
client.set_timeout(10.0)
world = client.get_world()
blueprint_library = world.get_blueprint_library()
map_ = world.get_map()
# Clean slate: remove all existing vehicles and walkers
for actor in world.get_actors().filter("vehicle.*"):
actor.destroy()
for actor in world.get_actors().filter("walker.*"):
actor.destroy()
print("[INFO] Cleared existing actors")
# ------------------------------------------------------------------
# 3. Traffic Manager + Sync mode
# ------------------------------------------------------------------
traffic_manager = client.get_trafficmanager(8000)
traffic_manager.set_synchronous_mode(True)
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = config.DELTA_SECONDS
world.apply_settings(settings)
print(f"[INFO] Sync mode enabled @ {1/config.DELTA_SECONDS:.1f} FPS")
# ------------------------------------------------------------------
# 4. Spawn ego vehicle
# ------------------------------------------------------------------
vehicle_bp = blueprint_library.filter("model3")[0]
spawn_points = map_.get_spawn_points()
ego_vehicle = None
for sp in spawn_points:
ego_vehicle = world.try_spawn_actor(vehicle_bp, sp)
if ego_vehicle:
print("[INFO] Ego spawned successfully")
break
if ego_vehicle is None:
raise RuntimeError("Failed to spawn ego vehicle")
ego_vehicle.set_autopilot(True, traffic_manager.get_port())
# Notify scenario of ego spawn (optional hook)
scenario.on_ego_spawned(ego_vehicle)
# ------------------------------------------------------------------
# 5. Attach sensors + recorder
# ------------------------------------------------------------------
sensor_manager = SensorManager(world, blueprint_library, ego_vehicle)
sensor_manager.spawn_sensors()
recorder = Recorder(scenario_name=scenario.name)
spectator = world.get_spectator()
# ------------------------------------------------------------------
# 6. Scenario setup
# ------------------------------------------------------------------
scenario.setup(world, ego_vehicle, traffic_manager)
print(f"[INFO] Running scenario: '{scenario.name}' for {max_frames} frames")
frame_count = 0
# ------------------------------------------------------------------
# 7. Main simulation loop
# ------------------------------------------------------------------
try:
while frame_count < max_frames:
world.tick()
frame_count += 1
cam, radar, lidar = sensor_manager.get_data()
transform = ego_vehicle.get_transform()
spectator.set_transform(
carla.Transform(
transform.location + carla.Location(x=-6, z=3),
transform.rotation
)
)
# Merge scenario metadata into every frame record
extra_meta = scenario.get_scenario_metadata()
recorder.save(cam, radar, lidar, ego_vehicle, extra_meta=extra_meta)
scenario.step(frame_count, ego_vehicle)
print(f"[FRAME {frame_count:04d}/{max_frames}] [{scenario.name}] SAVED")
# ------------------------------------------------------------------
# 8. Shutdown (always runs)
# ------------------------------------------------------------------
finally:
print("[INFO] Shutting down...")
scenario.cleanup()
sensor_manager.destroy()
ego_vehicle.destroy()
recorder.close()
# Restore async mode
settings.synchronous_mode = False
settings.fixed_delta_seconds = None
world.apply_settings(settings)
print("[INFO] Done")
if __name__ == "__main__":
main()

122
src/recorder.py

@ -0,0 +1,122 @@
import os
import json
import numpy as np
import cv2
import datetime
class Recorder:
def __init__(self, base_path="data", scenario_name="run"):
# Folder name: <scenario>_YYYYMMDD_HHMMSS → easy to identify in file explorer
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
session_name = f"{scenario_name}_{timestamp}"
self.base_path = os.path.join(base_path, session_name)
self.camera_path = os.path.join(self.base_path, "camera")
self.radar_path = os.path.join(self.base_path, "radar")
self.lidar_path = os.path.join(self.base_path, "lidar")
self.meta_file = os.path.join(self.base_path, "frames.jsonl")
os.makedirs(self.camera_path, exist_ok=True)
os.makedirs(self.radar_path, exist_ok=True)
os.makedirs(self.lidar_path, exist_ok=True)
self.frame_id = 0
self.meta_f = open(self.meta_file, "w")
print(f"--- Recorder initialized. Saving data to: {self.base_path} ---")
# ---------------- SAVE FRAME ----------------
def save(self, cam, radar, lidar, vehicle, extra_meta: dict = None):
self.frame_id += 1
# -------- CAMERA --------
img = np.frombuffer(cam.raw_data, dtype=np.uint8)
img = img.reshape((cam.height, cam.width, 4))[:, :, :3]
cam_file = f"frame_{self.frame_id:06d}.png"
cam_path = os.path.join(self.camera_path, cam_file)
cv2.imwrite(cam_path, img)
# -------- RADAR --------
radar_points = []
for d in radar:
radar_points.append([
d.depth,
d.azimuth,
d.altitude,
d.velocity
])
if radar_points:
radar_np = np.array(radar_points)
else:
radar_np = np.empty((0, 4))
radar_file = f"frame_{self.frame_id:06d}.npy"
radar_path = os.path.join(self.radar_path, radar_file)
np.save(radar_path, radar_np)
# -------- LIDAR --------
lidar_data = np.frombuffer(lidar.raw_data, dtype=np.float32)
lidar_data = np.reshape(lidar_data, (-1, 4)) # x, y, z, intensity
lidar_file = f"frame_{self.frame_id:06d}.npy"
lidar_path = os.path.join(self.lidar_path, lidar_file)
np.save(lidar_path, lidar_data)
# -------- EGO POSE --------
transform = vehicle.get_transform()
# -------- GROUND TRUTH VEHICLES --------
world = vehicle.get_world()
actors = world.get_actors().filter("vehicle.*")
gt_vehicles = []
for actor in actors:
if actor.id == vehicle.id:
continue # skip ego
t = actor.get_transform()
v = actor.get_velocity()
speed = (v.x**2 + v.y**2 + v.z**2) ** 0.5
gt_vehicles.append({
"id": actor.id,
"x": t.location.x,
"y": t.location.y,
"z": t.location.z,
"vx": v.x,
"vy": v.y,
"vz": v.z,
"speed": speed
})
# -------- RECORD --------
record = {
"frame_id": self.frame_id,
"timestamp": cam.timestamp,
"ego_pose": {
"x": transform.location.x,
"y": transform.location.y,
"z": transform.location.z,
"yaw": transform.rotation.yaw
},
"camera": cam_file,
"radar": radar_file,
"lidar": lidar_file,
"ground_truth": gt_vehicles,
}
# Merge scenario metadata (e.g. {"scenario": "braking", "brake_frame": 80})
if extra_meta:
record.update(extra_meta)
self.meta_f.write(json.dumps(record) + "\n")
self.meta_f.flush() # safer in case of crash
# ---------------- CLEANUP ----------------
def close(self):
self.meta_f.close()

92
src/scenario_loader.py

@ -0,0 +1,92 @@
"""
src/scenario_loader.py
----------------------
Dynamic scenario discovery and loading.
Usage
-----
from scenario_loader import load_scenario, list_scenarios
scenario = load_scenario("braking") # returns BrakingScenario()
names = list_scenarios() # ['braking', 'cutin', 'obstacle']
"""
import importlib
import inspect
import pkgutil
import sys
import os
# Ensure project root is on path so 'scenarios' package is importable
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from scenarios.base import ScenarioBase
def list_scenarios() -> list[str]:
"""
Discover all scenario modules inside the 'scenarios' package.
Returns a sorted list of module names (e.g. ['braking', 'cutin', 'obstacle']).
Skips 'base' and '__init__'.
"""
import scenarios as _scenarios_pkg
names = []
for finder, module_name, is_pkg in pkgutil.iter_modules(_scenarios_pkg.__path__):
if module_name in ("base",):
continue
names.append(module_name)
return sorted(names)
def load_scenario(name: str) -> ScenarioBase:
"""
Dynamically import scenario module 'scenarios.<name>' and return
an instance of the concrete ScenarioBase subclass found within it.
Parameters
----------
name : str
Scenario identifier, e.g. 'braking', 'cutin', 'obstacle'.
Returns
-------
ScenarioBase
An instantiated, ready-to-use scenario object.
Raises
------
ModuleNotFoundError
If no module 'scenarios.<name>' exists.
ValueError
If the module contains no concrete ScenarioBase subclass.
"""
module_path = f"scenarios.{name}"
try:
module = importlib.import_module(module_path)
except ModuleNotFoundError as exc:
available = list_scenarios()
raise ModuleNotFoundError(
f"Scenario '{name}' not found. "
f"Available scenarios: {available}"
) from exc
# Find the first concrete subclass of ScenarioBase in the module
for _, obj in inspect.getmembers(module, inspect.isclass):
if (
issubclass(obj, ScenarioBase)
and obj is not ScenarioBase
and not inspect.isabstract(obj)
):
instance = obj()
print(f"[ScenarioLoader] Loaded scenario: '{instance.name}' ({obj.__name__})")
return instance
raise ValueError(
f"No concrete ScenarioBase subclass found in module '{module_path}'. "
"Ensure your scenario class subclasses ScenarioBase and implements all abstract methods."
)

110
src/sensors.py

@ -0,0 +1,110 @@
import sys
import os
# Add project root to Python path
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import carla
import queue
import config
class SensorManager:
def __init__(self, world, blueprint_library, vehicle):
self.world = world
self.bp_lib = blueprint_library
self.vehicle = vehicle
self.camera = None
self.radar = None
self.lidar = None
# Queues for synchronization
self.camera_queue = queue.Queue()
self.radar_queue = queue.Queue()
self.lidar_queue = queue.Queue()
def spawn_sensors(self):
self._spawn_camera()
self._spawn_radar()
self._spawn_lidar()
# ---------------- CAMERA ----------------
def _spawn_camera(self):
cam_bp = self.bp_lib.find("sensor.camera.rgb")
cam_bp.set_attribute("image_size_x", str(config.CAM_WIDTH))
cam_bp.set_attribute("image_size_y", str(config.CAM_HEIGHT))
cam_bp.set_attribute("fov", str(config.CAM_FOV))
transform = carla.Transform(
carla.Location(x=1.5, z=2.4)
)
self.camera = self.world.spawn_actor(
cam_bp, transform, attach_to=self.vehicle
)
self.camera.listen(lambda data: self.camera_queue.put(data))
# ---------------- RADAR ----------------
def _spawn_radar(self):
radar_bp = self.bp_lib.find("sensor.other.radar")
radar_bp.set_attribute("horizontal_fov", str(config.RADAR_HORIZONTAL_FOV))
radar_bp.set_attribute("vertical_fov", str(config.RADAR_VERTICAL_FOV))
radar_bp.set_attribute("range", str(config.RADAR_RANGE))
transform = carla.Transform(
carla.Location(x=2.0, z=1.0)
)
self.radar = self.world.spawn_actor(
radar_bp, transform, attach_to=self.vehicle
)
self.radar.listen(lambda data: self.radar_queue.put(data))
# ---------------- LIDAR ----------------
def _spawn_lidar(self):
lidar_bp = self.bp_lib.find("sensor.lidar.ray_cast")
lidar_bp.set_attribute("range", str(config.LIDAR_RANGE))
lidar_bp.set_attribute("channels", str(config.LIDAR_CHANNELS))
lidar_bp.set_attribute("points_per_second", str(config.LIDAR_POINTS_PER_SECOND))
lidar_bp.set_attribute("rotation_frequency", str(config.LIDAR_ROTATION_FREQUENCY))
transform = carla.Transform(
carla.Location(x=0.0, z=2.5)
)
self.lidar = self.world.spawn_actor(
lidar_bp, transform, attach_to=self.vehicle
)
self.lidar.listen(lambda data: self.lidar_queue.put(data))
# ---------------- FETCH SYNC DATA ----------------
def get_data(self):
cam = self.camera_queue.get()
radar = self.radar_queue.get()
lidar = self.lidar_queue.get()
# Strong sync check
assert cam.frame == radar.frame == lidar.frame, "Frame mismatch!"
return cam, radar, lidar
# ---------------- CLEANUP ----------------
def destroy(self):
if self.camera:
self.camera.stop()
self.camera.destroy()
if self.radar:
self.radar.stop()
self.radar.destroy()
if self.lidar:
self.lidar.stop()
self.lidar.destroy()
Loading…
Cancel
Save