CARLA — C-Shenron based Simulator for Sensor data generation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

204 lines
6.5 KiB

"""
scenarios/base.py
-----------------
Abstract base class for all CARLA simulation scenarios.
Every scenario must subclass ScenarioBase and implement:
- setup()
- step()
- cleanup()
- name (property)
Optional overrides:
- on_ego_spawned()
- get_scenario_metadata()
- apply_parameters()
- weather (property)
- max_frames (property)
- ego_spawn_point (property)
"""
from abc import ABC, abstractmethod
from typing import Optional
class ScenarioBase(ABC):
    """
    Abstract interface for all ADAS simulation scenarios.

    Lifecycle
    ---------
    1. scenario.setup(world, ego_vehicle, traffic_manager)
       Called once before the main loop.
       Spawn NPC actors, configure initial state.
    2. scenario.step(frame_count, ego_vehicle, pbar=None)
       Called every simulation tick.
       Drive per-frame behaviour (e.g. trigger a brake, change lane).
    3. scenario.cleanup()
       Called in the finally block after the loop.
       Destroy every actor spawned by this scenario.
    """

    # Attributes owned by the base class itself. apply_parameters() refuses to
    # overwrite them so an external parameter can never replace the live
    # world / ego / traffic-manager references or the tracked actor list.
    _RESERVED_PARAMS = frozenset({"_world", "_ego", "_tm", "_actors"})

    def __init__(self):
        self._world = None
        self._ego = None
        self._tm = None
        self._actors = []  # sub-classes append their NPCs here

    # ------------------------------------------------------------------
    # Required interface
    # ------------------------------------------------------------------
    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable scenario identifier (e.g. 'braking')."""

    @abstractmethod
    def setup(self, world, ego_vehicle, traffic_manager) -> None:
        """
        Initialise the scenario:
        - Store world / ego / tm references.
        - Spawn NPC actors.
        - Configure initial autopilot behaviour.

        Parameters
        ----------
        world : carla.World
        ego_vehicle : carla.Vehicle (the ego actor)
        traffic_manager : carla.TrafficManager
        """

    @abstractmethod
    def step(self, frame_count: int, ego_vehicle, pbar=None) -> None:
        """
        Per-tick logic executed inside the main simulation loop.

        Parameters
        ----------
        frame_count : int (1-based frame counter)
        ego_vehicle : carla.Vehicle
        pbar : optional object passed in by the main loop; defaults to None.
            NOTE(review): presumably a progress bar the scenario may
            update -- confirm against the caller.
        """

    @abstractmethod
    def cleanup(self) -> None:
        """
        Destroy all actors spawned by this scenario.
        Always call this in a finally block so CARLA stays clean.
        """

    # ------------------------------------------------------------------
    # Optional hooks & Properties (default no-ops — override as needed)
    # ------------------------------------------------------------------
    @property
    def weather(self) -> Optional[str]:
        """
        Override to request a specific weather preset for this scenario.
        If None, the global config or CLI default will be used.
        """
        return None

    @property
    def max_frames(self) -> Optional[int]:
        """
        Override to request a specific duration for this scenario.
        If None, the global config or CLI default will be used.
        """
        return None

    @property
    def ego_spawn_point(self):
        """
        Override to request a specific spawn point for the ego vehicle.

        Returns:
        - int: index in the world's spawn points list.
        - carla.Transform: absolute spawn position and rotation.
        If None, the global config or CLI default index will be used.
        """
        return None

    def on_ego_spawned(self, ego_vehicle) -> None:
        """
        Called immediately after the ego vehicle is spawned.
        Override to react to ego spawn before setup() is called.
        """

    def get_scenario_metadata(self) -> dict:
        """
        Return a dict that will be merged into every recorded frame's
        metadata (written to frames.jsonl and MCAP).
        Override to add scenario-specific fields, e.g.:
        {"scenario": self.name, "brake_frame": self._brake_frame}
        """
        return {"scenario": self.name}

    def apply_parameters(self, params: dict) -> None:
        """
        Allow the orchestrator to inject parameters parsed from CLI.
        This maps keys in `params` to existing attributes on the scenario.
        Useful for non-code tuning.

        Each value is coerced to the type of the attribute's current value:
        - bool attributes accept "true" / "1" / "yes" (case-insensitive) as
          True; anything else becomes False.
        - attributes that are currently None have no type to coerce to, so
          the value is stored unchanged.
        - every other attribute is converted with type(current)(value).

        Unknown keys, reserved base-class attributes and failed conversions
        are reported with a warning and skipped; this method never raises
        for a bad parameter.
        """
        for key, value in params.items():
            if key in self._RESERVED_PARAMS:
                print(f"[WARN] Refusing to inject reserved attribute '{key}'")
                continue
            if not hasattr(self, key):
                print(f"[WARN] Scenario '{self.name}' has no attribute '{key}'")
                continue

            current = getattr(self, key)
            # Deliberately broad: the conversion or a subclass property setter
            # may raise anything, and one bad parameter must not abort the run.
            try:
                if current is None:
                    # type(None)(value) would raise, so keep the raw value.
                    typed_val = value
                elif isinstance(current, bool):
                    # bool("false") is True, so parse the text explicitly.
                    typed_val = str(value).lower() in ("true", "1", "yes")
                else:
                    typed_val = type(current)(value)
                setattr(self, key, typed_val)
                print(f"[Scenario] Injected param: {key} = {typed_val}")
            except Exception as e:
                print(f"[WARN] Failed to inject param '{key}={value}': {e}")

    # ------------------------------------------------------------------
    # Shared helpers for subclasses
    # ------------------------------------------------------------------
    def _destroy_actors(self) -> None:
        """
        Convenience method: destroy all actors tracked in self._actors.
        Call this from cleanup() in subclasses.

        A failure on one actor is reported and does not stop the remaining
        actors from being destroyed; the list is emptied afterwards.
        """
        for actor in self._actors:
            try:
                if actor.is_alive:
                    actor.destroy()
            except Exception as e:
                # The entry may be invalid (e.g. None), so read the id
                # defensively: raising here would abort the whole cleanup.
                actor_id = getattr(actor, "id", "<unknown>")
                print(f"[WARN] Could not destroy actor {actor_id}: {e}")
        self._actors.clear()

    def _get_waypoint_ahead(self, distance: float, lane_offset: int = 0):
        """
        Return a waypoint `distance` metres ahead of the ego vehicle.

        lane_offset = 0 → same lane
        lane_offset = 1 → one lane to the right
        lane_offset = -1 → one lane to the left

        Returns None if no waypoint was found, including when setup() has
        not stored the world / ego references yet or the requested lane
        does not exist.
        """
        if self._world is None or self._ego is None:
            return None

        carla_map = self._world.get_map()
        waypoint = carla_map.get_waypoint(self._ego.get_location())
        if waypoint is None:
            return None

        # Apply lane offset first, one lane at a time.
        # NOTE(review): an adjacent lane may run in the opposite direction,
        # in which case "ahead" follows that lane's direction -- confirm
        # against the CARLA waypoint documentation.
        for _ in range(abs(lane_offset)):
            if lane_offset > 0:
                waypoint = waypoint.get_right_lane()
            else:
                waypoint = waypoint.get_left_lane()
            if waypoint is None:
                return None

        candidates = waypoint.next(distance)
        if not candidates:
            return None
        # When several candidates are returned, the first one is used.
        return candidates[0]