CARLA ? C-Shenron based Simulator for Sensor data generation.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

111 lines
3.7 KiB

"""
scenarios/cutin.py
------------------
Adjacent Lane Cut-In Scenario.
An NPC vehicle is spawned in the adjacent lane, slightly ahead of the
ego. At a configurable frame it is forced to change into the ego's lane,
simulating an unexpected lane cut-in event.
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
from scenarios.base import ScenarioBase
class CutInScenario(ScenarioBase):
    """
    Scenario: Adjacent lane cut-in.

    An NPC vehicle is spawned at the waypoint returned by
    ``_get_waypoint_ahead(NPC_DISTANCE_M, lane_offset=LANE_OFFSET)``, handed
    to the Traffic Manager, and kept from changing lanes on its own. Once the
    frame counter reaches ``CUTIN_FRAME`` a single forced lane change is
    requested from the Traffic Manager.

    Timeline
    --------
    Frame 1 … CUTIN_FRAME-1 : NPC drives in adjacent lane, ahead of ego.
    Frame CUTIN_FRAME+      : TM forced lane change into ego lane.
    """

    # Distance passed to the spawn-waypoint helper; taken from
    # config.SCENARIO_CUTIN_DISTANCE when present, otherwise 15.
    NPC_DISTANCE_M = getattr(config, "SCENARIO_CUTIN_DISTANCE", 15)
    # Frame at which the cut-in is requested; taken from
    # config.SCENARIO_CUTIN_FRAME when present, otherwise 60.
    CUTIN_FRAME = getattr(config, "SCENARIO_CUTIN_FRAME", 60)
    LANE_OFFSET = -1  # -1 = left lane, 1 = right lane

    def __init__(self) -> None:
        """Create the scenario with no NPC spawned and the cut-in not yet fired."""
        super().__init__()
        self._npc = None
        self._cut_triggered = False

    # ------------------------------------------------------------------
    # ScenarioBase interface
    # ------------------------------------------------------------------
    @property
    def name(self) -> str:
        """Short identifier used as the prefix of this scenario's log lines."""
        return "cutin"

    def setup(self, world, ego_vehicle, traffic_manager) -> None:
        """
        Spawn the NPC and register it with the Traffic Manager.

        Parameters
        ----------
        world : CARLA world used to look up blueprints and spawn the NPC.
        ego_vehicle : the ego actor; stored on the instance.
        traffic_manager : Traffic Manager that drives the NPC and later
            performs the forced lane change.

        Raises
        ------
        RuntimeError
            If no vehicle blueprint is available, no spawn waypoint is
            found for ``LANE_OFFSET``, or the NPC cannot be spawned there.
        """
        self._world = world
        self._ego = ego_vehicle
        self._tm = traffic_manager

        # Prefer an Audi model; fall back to any vehicle blueprint.
        bp_lib = world.get_blueprint_library()
        npc_bps = bp_lib.filter("vehicle.audi.*")
        if not npc_bps:
            npc_bps = bp_lib.filter("vehicle.*")
        if not npc_bps:
            # Fail with the same exception type as the other setup errors
            # instead of an opaque IndexError on the lookup below.
            raise RuntimeError(
                "[CutInScenario] No vehicle blueprints available in the "
                "blueprint library."
            )
        npc_bp = npc_bps[0]

        # Spawn in adjacent lane, ahead of ego
        spawn_wp = self._get_waypoint_ahead(
            self.NPC_DISTANCE_M, lane_offset=self.LANE_OFFSET
        )
        if spawn_wp is None:
            raise RuntimeError(
                f"[CutInScenario] No adjacent lane waypoint found "
                f"(lane_offset={self.LANE_OFFSET}). "
                "Try a map with multiple lanes."
            )

        # try_spawn_actor returns None rather than raising when the spawn fails.
        self._npc = world.try_spawn_actor(npc_bp, spawn_wp.transform)
        if self._npc is None:
            raise RuntimeError(
                "[CutInScenario] Failed to spawn NPC at "
                f"{spawn_wp.transform.location}. Spot may be occupied."
            )
        # Track the actor so cleanup() can destroy it via _destroy_actors().
        self._actors.append(self._npc)

        self._npc.set_autopilot(True, traffic_manager.get_port())
        traffic_manager.auto_lane_change(self._npc, False)  # prevent random changes
        traffic_manager.vehicle_percentage_speed_difference(self._npc, 0)
        print(
            f"[{self.name}] NPC spawned in adjacent lane "
            f"{self.NPC_DISTANCE_M} m ahead. Cut-in at frame {self.CUTIN_FRAME}."
        )

    def step(self, frame: int, ego_vehicle) -> None:
        """
        Request the forced lane change once ``frame`` reaches ``CUTIN_FRAME``.

        The request is issued at most once. The comparison is ``>=`` rather
        than ``==`` so that a frame counter which skips ``CUTIN_FRAME`` still
        triggers the cut-in on the next call. ``ego_vehicle`` is accepted for
        interface compatibility and is not used here.
        """
        if self._cut_triggered or frame < self.CUTIN_FRAME:
            return
        if not (self._npc and self._npc.is_alive):
            return

        # Per the CARLA Python API reference, force_lane_change takes
        # direction=True for the lane on the RIGHT and False for the lane on
        # the LEFT. An NPC placed in the left lane (LANE_OFFSET < 0, per the
        # constant's comment) therefore has to move right to reach the ego
        # lane, and vice versa.
        move_right = self.LANE_OFFSET < 0
        self._tm.force_lane_change(self._npc, move_right)
        self._cut_triggered = True
        print(f"[{self.name}] Lane change triggered at frame {frame}.")

    def cleanup(self) -> None:
        """Destroy the actors tracked by this scenario and log completion."""
        self._destroy_actors()
        print(f"[{self.name}] Cleanup complete.")

    # ------------------------------------------------------------------
    # Optional overrides
    # ------------------------------------------------------------------
    def get_scenario_metadata(self) -> dict:
        """Return the scenario's configuration and whether the cut-in has fired."""
        return {
            "scenario": self.name,
            "npc_distance_m": self.NPC_DISTANCE_M,
            "cutin_frame": self.CUTIN_FRAME,
            "lane_offset": self.LANE_OFFSET,
            "cut_triggered": self._cut_triggered,
        }