""" scenarios/base.py ----------------- Abstract base class for all CARLA simulation scenarios. Every scenario must subclass ScenarioBase and implement: - setup() - step() - cleanup() - name (property) Optional overrides: - on_ego_spawned() - get_scenario_metadata() - weather (property) - max_frames (property) """ from abc import ABC, abstractmethod class ScenarioBase(ABC): """ Abstract interface for all ADAS simulation scenarios. Lifecycle --------- 1. scenario.setup(world, ego_vehicle, traffic_manager) Called once before the main loop. Spawn NPC actors, configure initial state. 2. scenario.step(frame, ego_vehicle) Called every simulation tick. Drive per-frame behaviour (e.g. trigger a brake, change lane). 3. scenario.cleanup() Called in the finally block after the loop. Destroy every actor spawned by this scenario. """ def __init__(self): self._world = None self._ego = None self._tm = None self._actors = [] # sub-classes append their NPCs here # ------------------------------------------------------------------ # Required interface # ------------------------------------------------------------------ @property @abstractmethod def name(self) -> str: """Human-readable scenario identifier (e.g. 'braking').""" @abstractmethod def setup(self, world, ego_vehicle, traffic_manager) -> None: """ Initialise the scenario: - Store world / ego / tm references. - Spawn NPC actors. - Configure initial autopilot behaviour. Parameters ---------- world : carla.World ego_vehicle : carla.Vehicle (the ego actor) traffic_manager : carla.TrafficManager """ @abstractmethod def step(self, frame_count: int, ego_vehicle, pbar=None) -> None: """ Per-tick logic executed inside the main simulation loop. Parameters ---------- frame : int (1-based frame counter) ego_vehicle : carla.Vehicle """ @abstractmethod def cleanup(self) -> None: """ Destroy all actors spawned by this scenario. Always call this in a finally block so CARLA stays clean. """ # ------------------------------------------------------------------ # Logging Utilities # ------------------------------------------------------------------ def log_frame(self, frame: int, ego_vehicle, extra_msg: str = "", pbar=None) -> None: """ Unified 1:1 logging for all scenarios. This method ensures clean console output by manually managing tqdm state to avoid redundant bar updates in redirected log files. """ import math vel = ego_vehicle.get_velocity() speed = 3.6 * math.sqrt(vel.x**2 + vel.y**2 + vel.z**2) msg = f"[FRAME {frame:03d}] EGO (spd={speed:.1f}kph) {extra_msg}" if pbar: # We assume pbar.n has been updated by the orchestrator (main.py) # pbar.write prints the msg and refreshes the bar on a new line. pbar.write(msg) else: print(msg) # ------------------------------------------------------------------ # Optional hooks & Properties (default no-ops — override as needed) # ------------------------------------------------------------------ @property def weather(self) -> str: """ Override to request a specific weather preset for this scenario. If None, the global config or CLI default will be used. """ return None @property def max_frames(self) -> int: """ Override to request a specific duration for this scenario. If None, the global config or CLI default will be used. """ return None @property def ego_spawn_point(self): """ Override to request a specific spawn point for the ego vehicle. Returns: - int: index in the world's spawn points list. - carla.Transform: absolute spawn position and rotation. If None, the global config or CLI default index will be used. """ return None def on_ego_spawned(self, ego_vehicle) -> None: """ Called immediately after the ego vehicle is spawned. Override to react to ego spawn before setup() is called. """ def get_scenario_metadata(self) -> dict: """ Return a dict that will be merged into every recorded frame's metadata (written to frames.jsonl and MCAP). Override to add scenario-specific fields, e.g.: {"scenario": self.name, "brake_frame": self._brake_frame} """ return {"scenario": self.name} def apply_parameters(self, params: dict) -> None: """ Allow the orchestrator to inject parameters parsed from CLI. This maps keys in `params` to existing attributes on the scenario. Useful for non-code tuning. """ for key, value in params.items(): if hasattr(self, key): # Try to preserve type existing_val = getattr(self, key) try: if isinstance(existing_val, bool): typed_val = str(value).lower() in ("true", "1", "yes") else: typed_val = type(existing_val)(value) setattr(self, key, typed_val) print(f"[Scenario] Injected param: {key} = {typed_val}") except Exception as e: print(f"[WARN] Failed to inject param '{key}={value}': {e}") else: print(f"[WARN] Scenario '{self.name}' has no attribute '{key}'") # ------------------------------------------------------------------ # Shared helpers for subclasses # ------------------------------------------------------------------ def _destroy_actors(self) -> None: """ Convenience method: destroy all actors tracked in self._actors. Call this from cleanup() in subclasses. """ for actor in self._actors: try: if actor.is_alive: actor.destroy() except Exception as e: print(f"[WARN] Could not destroy actor {actor.id}: {e}") self._actors.clear() def _get_waypoint_ahead(self, distance: float, lane_offset: int = 0): """ Return a waypoint `distance` metres ahead of the ego vehicle. lane_offset = 0 → same lane lane_offset = 1 → one lane to the right lane_offset = -1 → one lane to the left Returns None if no waypoint was found. """ if self._world is None or self._ego is None: return None map_ = self._world.get_map() ego_wp = map_.get_waypoint(self._ego.get_location()) # Apply lane offset first if lane_offset != 0: for _ in range(abs(lane_offset)): if lane_offset > 0: adj = ego_wp.get_right_lane() else: adj = ego_wp.get_left_lane() if adj is None: return None ego_wp = adj next_wps = ego_wp.next(distance) if not next_wps: return None return next_wps[0]