""" scenarios/cutin.py ------------------ Adjacent Lane Cut-In Scenario. An NPC vehicle is spawned in the adjacent lane, slightly ahead of the ego. At a configurable frame it is forced to change into the ego's lane, simulating an unexpected lane cut-in event. """ import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from scenarios.base import ScenarioBase class CutInScenario(ScenarioBase): """ Scenario: Adjacent lane cut-in. Timeline -------- Frame 1 … CUTIN_FRAME-1 : NPC drives in adjacent lane, ahead of ego. Frame CUTIN_FRAME+ : TM forced lane change into ego lane. """ NPC_DISTANCE_M = 15 # metres ahead of ego for NPC in adjacent lane CUTIN_FRAME = 60 # frame on which NPC is forced to change lane LANE_OFFSET = -1 # -1 = left lane, 1 = right lane def __init__(self): super().__init__() self._npc = None self._cut_triggered = False # ------------------------------------------------------------------ # ScenarioBase interface # ------------------------------------------------------------------ @property def name(self) -> str: return "cutin" @property def ego_spawn_point(self): # Working Town10 intersection point (Southbound) import carla return carla.Transform(carla.Location(x=107.412, y=45.309, z=0.5), carla.Rotation(yaw=-87.7)) def setup(self, world, ego_vehicle, traffic_manager) -> None: self._world = world self._ego = ego_vehicle self._tm = traffic_manager bp_lib = world.get_blueprint_library() npc_bps = bp_lib.filter("vehicle.audi.*") if not npc_bps: npc_bps = bp_lib.filter("vehicle.*") npc_bp = npc_bps[0] # Spawn in adjacent lane, ahead of ego spawn_wp = self._get_waypoint_ahead( self.NPC_DISTANCE_M, lane_offset=self.LANE_OFFSET ) if spawn_wp is None: raise RuntimeError( f"[CutInScenario] No adjacent lane waypoint found " f"(lane_offset={self.LANE_OFFSET}). " "Try a map with multiple lanes." ) # Lift actor slightly to avoid ground clipping spawn_transform = spawn_wp.transform spawn_transform.location.z += 0.5 self._npc = world.try_spawn_actor(npc_bp, spawn_transform) if self._npc is None: raise RuntimeError( "[CutInScenario] Failed to spawn NPC at " f"{spawn_transform.location}. Spot may be occupied." ) self._actors.append(self._npc) self._npc.set_autopilot(True, traffic_manager.get_port()) traffic_manager.auto_lane_change(self._npc, False) # prevent random changes traffic_manager.vehicle_percentage_speed_difference(self._npc, 0) print( f"[{self.name}] NPC spawned in adjacent lane " f"{self.NPC_DISTANCE_M} m ahead. Cut-in at frame {self.CUTIN_FRAME}." ) def step(self, frame_count: int, ego_vehicle, pbar=None) -> None: if frame_count == self.CUTIN_FRAME and not self._cut_triggered: if self._npc and self._npc.is_alive: # force_lane_change: True = left, False = right force_left = (self.LANE_OFFSET < 0) self._tm.force_lane_change(self._npc, force_left) self._cut_triggered = True print(f"[{self.name}] Lane change triggered at frame {frame_count}.") def cleanup(self) -> None: self._destroy_actors() print(f"[{self.name}] Cleanup complete.") # ------------------------------------------------------------------ # Optional overrides # ------------------------------------------------------------------ def get_scenario_metadata(self) -> dict: return { "scenario": self.name, "npc_distance_m": self.NPC_DISTANCE_M, "cutin_frame": self.CUTIN_FRAME, "lane_offset": self.LANE_OFFSET, "cut_triggered": self._cut_triggered, }