You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
204 lines
6.5 KiB
204 lines
6.5 KiB
"""
|
|
scenarios/base.py
|
|
-----------------
|
|
Abstract base class for all CARLA simulation scenarios.
|
|
|
|
Every scenario must subclass ScenarioBase and implement:
|
|
- setup()
|
|
- step()
|
|
- cleanup()
|
|
- name (property)
|
|
|
|
Optional overrides:
|
|
- on_ego_spawned()
|
|
- get_scenario_metadata()
|
|
- weather (property)
|
|
- max_frames (property)
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
|
class ScenarioBase(ABC):
|
|
"""
|
|
Abstract interface for all ADAS simulation scenarios.
|
|
|
|
Lifecycle
|
|
---------
|
|
1. scenario.setup(world, ego_vehicle, traffic_manager)
|
|
Called once before the main loop.
|
|
Spawn NPC actors, configure initial state.
|
|
|
|
2. scenario.step(frame, ego_vehicle)
|
|
Called every simulation tick.
|
|
Drive per-frame behaviour (e.g. trigger a brake, change lane).
|
|
|
|
3. scenario.cleanup()
|
|
Called in the finally block after the loop.
|
|
Destroy every actor spawned by this scenario.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._world = None
|
|
self._ego = None
|
|
self._tm = None
|
|
self._actors = [] # sub-classes append their NPCs here
|
|
|
|
# ------------------------------------------------------------------
|
|
# Required interface
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
@abstractmethod
|
|
def name(self) -> str:
|
|
"""Human-readable scenario identifier (e.g. 'braking')."""
|
|
|
|
@abstractmethod
|
|
def setup(self, world, ego_vehicle, traffic_manager) -> None:
|
|
"""
|
|
Initialise the scenario:
|
|
- Store world / ego / tm references.
|
|
- Spawn NPC actors.
|
|
- Configure initial autopilot behaviour.
|
|
|
|
Parameters
|
|
----------
|
|
world : carla.World
|
|
ego_vehicle : carla.Vehicle (the ego actor)
|
|
traffic_manager : carla.TrafficManager
|
|
"""
|
|
|
|
@abstractmethod
|
|
def step(self, frame_count: int, ego_vehicle, pbar=None) -> None:
|
|
"""
|
|
Per-tick logic executed inside the main simulation loop.
|
|
|
|
Parameters
|
|
----------
|
|
frame : int (1-based frame counter)
|
|
ego_vehicle : carla.Vehicle
|
|
"""
|
|
|
|
@abstractmethod
|
|
def cleanup(self) -> None:
|
|
"""
|
|
Destroy all actors spawned by this scenario.
|
|
Always call this in a finally block so CARLA stays clean.
|
|
"""
|
|
|
|
# ------------------------------------------------------------------
|
|
# Optional hooks & Properties (default no-ops — override as needed)
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
def weather(self) -> str:
|
|
"""
|
|
Override to request a specific weather preset for this scenario.
|
|
If None, the global config or CLI default will be used.
|
|
"""
|
|
return None
|
|
|
|
@property
|
|
def max_frames(self) -> int:
|
|
"""
|
|
Override to request a specific duration for this scenario.
|
|
If None, the global config or CLI default will be used.
|
|
"""
|
|
return None
|
|
|
|
@property
|
|
def ego_spawn_point(self):
|
|
"""
|
|
Override to request a specific spawn point for the ego vehicle.
|
|
Returns:
|
|
- int: index in the world's spawn points list.
|
|
- carla.Transform: absolute spawn position and rotation.
|
|
If None, the global config or CLI default index will be used.
|
|
"""
|
|
return None
|
|
|
|
def on_ego_spawned(self, ego_vehicle) -> None:
|
|
"""
|
|
Called immediately after the ego vehicle is spawned.
|
|
Override to react to ego spawn before setup() is called.
|
|
"""
|
|
|
|
def get_scenario_metadata(self) -> dict:
|
|
"""
|
|
Return a dict that will be merged into every recorded frame's
|
|
metadata (written to frames.jsonl and MCAP).
|
|
|
|
Override to add scenario-specific fields, e.g.:
|
|
{"scenario": self.name, "brake_frame": self._brake_frame}
|
|
"""
|
|
return {"scenario": self.name}
|
|
|
|
def apply_parameters(self, params: dict) -> None:
|
|
"""
|
|
Allow the orchestrator to inject parameters parsed from CLI.
|
|
This maps keys in `params` to existing attributes on the scenario.
|
|
Useful for non-code tuning.
|
|
"""
|
|
for key, value in params.items():
|
|
if hasattr(self, key):
|
|
# Try to preserve type
|
|
existing_val = getattr(self, key)
|
|
try:
|
|
if isinstance(existing_val, bool):
|
|
typed_val = str(value).lower() in ("true", "1", "yes")
|
|
else:
|
|
typed_val = type(existing_val)(value)
|
|
setattr(self, key, typed_val)
|
|
print(f"[Scenario] Injected param: {key} = {typed_val}")
|
|
except Exception as e:
|
|
print(f"[WARN] Failed to inject param '{key}={value}': {e}")
|
|
else:
|
|
print(f"[WARN] Scenario '{self.name}' has no attribute '{key}'")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Shared helpers for subclasses
|
|
# ------------------------------------------------------------------
|
|
|
|
def _destroy_actors(self) -> None:
|
|
"""
|
|
Convenience method: destroy all actors tracked in self._actors.
|
|
Call this from cleanup() in subclasses.
|
|
"""
|
|
for actor in self._actors:
|
|
try:
|
|
if actor.is_alive:
|
|
actor.destroy()
|
|
except Exception as e:
|
|
print(f"[WARN] Could not destroy actor {actor.id}: {e}")
|
|
self._actors.clear()
|
|
|
|
def _get_waypoint_ahead(self, distance: float, lane_offset: int = 0):
|
|
"""
|
|
Return a waypoint `distance` metres ahead of the ego vehicle.
|
|
lane_offset = 0 → same lane
|
|
lane_offset = 1 → one lane to the right
|
|
lane_offset = -1 → one lane to the left
|
|
|
|
Returns None if no waypoint was found.
|
|
"""
|
|
if self._world is None or self._ego is None:
|
|
return None
|
|
|
|
map_ = self._world.get_map()
|
|
ego_wp = map_.get_waypoint(self._ego.get_location())
|
|
|
|
# Apply lane offset first
|
|
if lane_offset != 0:
|
|
for _ in range(abs(lane_offset)):
|
|
if lane_offset > 0:
|
|
adj = ego_wp.get_right_lane()
|
|
else:
|
|
adj = ego_wp.get_left_lane()
|
|
if adj is None:
|
|
return None
|
|
ego_wp = adj
|
|
|
|
next_wps = ego_wp.next(distance)
|
|
if not next_wps:
|
|
return None
|
|
return next_wps[0]
|