"""
scenarios/cutin.py
------------------
Adjacent Lane Cut-In Scenario.

An NPC vehicle is spawned in the adjacent lane, slightly ahead of the ego.
At a configurable frame it is forced to change into the ego's lane,
simulating an unexpected lane cut-in event.
"""

import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import config
from scenarios.base import ScenarioBase


class CutInScenario(ScenarioBase):
    """
    Scenario: Adjacent lane cut-in.

    Timeline
    --------
    Frame 1 … CUTIN_FRAME-1 : NPC drives in adjacent lane, ahead of ego.
    Frame CUTIN_FRAME+      : TM forced lane change into ego lane.
    """

    # Distance ahead of the ego at which the NPC is spawned; overridable via
    # config.SCENARIO_CUTIN_DISTANCE.
    NPC_DISTANCE_M = getattr(config, "SCENARIO_CUTIN_DISTANCE", 15)
    # Frame at which the lane change is forced; overridable via
    # config.SCENARIO_CUTIN_FRAME.
    CUTIN_FRAME = getattr(config, "SCENARIO_CUTIN_FRAME", 60)
    LANE_OFFSET = -1  # -1 = left lane, 1 = right lane

    def __init__(self):
        """Initialise the scenario with no NPC and the cut-in not yet fired."""
        super().__init__()
        self._npc = None
        self._cut_triggered = False

    # ------------------------------------------------------------------
    # ScenarioBase interface
    # ------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Return the scenario identifier, ``"cutin"``."""
        return "cutin"

    def setup(self, world, ego_vehicle, traffic_manager) -> None:
        """
        Spawn the NPC in the adjacent lane and hand it to the Traffic Manager.

        Stores ``world``, ``ego_vehicle`` and ``traffic_manager`` on the
        instance, picks an Audi blueprint (falling back to any vehicle
        blueprint), spawns the NPC ``NPC_DISTANCE_M`` ahead in the lane given
        by ``LANE_OFFSET`` and enables autopilot with automatic lane changes
        disabled.

        Raises
        ------
        RuntimeError
            If no adjacent-lane waypoint is found, or if the spawn attempt
            returns ``None``.
        """
        self._world = world
        self._ego = ego_vehicle
        self._tm = traffic_manager

        bp_lib = world.get_blueprint_library()
        npc_bps = bp_lib.filter("vehicle.audi.*")
        if not npc_bps:
            npc_bps = bp_lib.filter("vehicle.*")
        npc_bp = npc_bps[0]

        # Spawn in adjacent lane, ahead of ego
        # NOTE(review): _get_waypoint_ahead is not defined in this file;
        # presumably inherited from ScenarioBase - confirm.
        spawn_wp = self._get_waypoint_ahead(
            self.NPC_DISTANCE_M, lane_offset=self.LANE_OFFSET
        )
        if spawn_wp is None:
            raise RuntimeError(
                f"[CutInScenario] No adjacent lane waypoint found "
                f"(lane_offset={self.LANE_OFFSET}). "
                "Try a map with multiple lanes."
            )

        self._npc = world.try_spawn_actor(npc_bp, spawn_wp.transform)
        if self._npc is None:
            raise RuntimeError(
                "[CutInScenario] Failed to spawn NPC at "
                f"{spawn_wp.transform.location}. Spot may be occupied."
            )
        # Register the NPC so cleanup() can destroy it later.
        self._actors.append(self._npc)

        self._npc.set_autopilot(True, traffic_manager.get_port())
        traffic_manager.auto_lane_change(self._npc, False)  # prevent random changes
        traffic_manager.vehicle_percentage_speed_difference(self._npc, 0)

        print(
            f"[{self.name}] NPC spawned in adjacent lane "
            f"{self.NPC_DISTANCE_M} m ahead. Cut-in at frame {self.CUTIN_FRAME}."
        )

    def step(self, frame_count: int, ego_vehicle, pbar=None) -> None:
        """
        Force the NPC into the ego lane once ``CUTIN_FRAME`` is reached.

        The lane change is requested at most once. It fires on the first
        call where ``frame_count >= CUTIN_FRAME`` and the NPC is alive, so a
        skipped frame does not silently lose the event.

        Parameters
        ----------
        frame_count : int
            Current frame index supplied by the caller.
        ego_vehicle, pbar
            Accepted for interface compatibility; not used here.
        """
        if self._cut_triggered or frame_count < self.CUTIN_FRAME:
            return
        if not (self._npc and self._npc.is_alive):
            return

        # The NPC sits LANE_OFFSET lanes away from the ego (-1 = left), so it
        # must move in the opposite direction to enter the ego lane.
        # carla.TrafficManager.force_lane_change: direction True = right lane,
        # False = left lane (per the CARLA Python API reference).
        move_right = self.LANE_OFFSET < 0
        self._tm.force_lane_change(self._npc, move_right)
        self._cut_triggered = True
        print(f"[{self.name}] Lane change triggered at frame {frame_count}.")

    def cleanup(self) -> None:
        """Destroy the scenario's actors and log completion."""
        self._destroy_actors()
        print(f"[{self.name}] Cleanup complete.")

    # ------------------------------------------------------------------
    # Optional overrides
    # ------------------------------------------------------------------

    def get_scenario_metadata(self) -> dict:
        """Return the scenario parameters and whether the cut-in has fired."""
        return {
            "scenario": self.name,
            "npc_distance_m": self.NPC_DISTANCE_M,
            "cutin_frame": self.CUTIN_FRAME,
            "lane_offset": self.LANE_OFFSET,
            "cut_triggered": self._cut_triggered,
        }