You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
135 lines
4.7 KiB
135 lines
4.7 KiB
"""
|
|
scenarios/cutin.py
|
|
------------------
|
|
Adjacent Lane Cut-In Scenario.
|
|
|
|
An NPC vehicle is spawned in the adjacent lane, slightly ahead of the
|
|
ego. At a configurable frame it is forced to change into the ego's lane,
|
|
simulating an unexpected lane cut-in event.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import math
|
|
from scenarios.base import ScenarioBase
|
|
|
|
|
|
class CutInScenario(ScenarioBase):
|
|
"""
|
|
Scenario: Adjacent lane cut-in.
|
|
|
|
Timeline
|
|
--------
|
|
Frame 1 … CUTIN_FRAME-1 : NPC drives in adjacent lane, ahead of ego.
|
|
Frame CUTIN_FRAME+ : TM forced lane change into ego lane.
|
|
"""
|
|
|
|
NPC_DISTANCE_M = 15 # metres ahead of ego for NPC in adjacent lane
|
|
CUTIN_FRAME = 60 # frame on which NPC is forced to change lane
|
|
LANE_OFFSET = -1 # -1 = left lane, 1 = right lane
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._npc = None
|
|
self._cut_triggered = False
|
|
|
|
# ------------------------------------------------------------------
|
|
# ScenarioBase interface
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "cutin"
|
|
|
|
@property
|
|
def ego_spawn_point(self):
|
|
# Working Town10 intersection point (Southbound)
|
|
import carla
|
|
return carla.Transform(carla.Location(x=107.412, y=45.309, z=0.5), carla.Rotation(yaw=-87.7))
|
|
|
|
def setup(self, world, ego_vehicle, traffic_manager) -> None:
|
|
self._world = world
|
|
self._ego = ego_vehicle
|
|
self._tm = traffic_manager
|
|
|
|
bp_lib = world.get_blueprint_library()
|
|
npc_bps = bp_lib.filter("vehicle.audi.*")
|
|
if not npc_bps:
|
|
npc_bps = bp_lib.filter("vehicle.*")
|
|
npc_bp = npc_bps[0]
|
|
|
|
# Spawn in adjacent lane, ahead of ego
|
|
spawn_wp = self._get_waypoint_ahead(
|
|
self.NPC_DISTANCE_M, lane_offset=self.LANE_OFFSET
|
|
)
|
|
if spawn_wp is None:
|
|
raise RuntimeError(
|
|
f"[CutInScenario] No adjacent lane waypoint found "
|
|
f"(lane_offset={self.LANE_OFFSET}). "
|
|
"Try a map with multiple lanes."
|
|
)
|
|
|
|
# Lift actor slightly to avoid ground clipping
|
|
spawn_transform = spawn_wp.transform
|
|
spawn_transform.location.z += 0.5
|
|
|
|
self._npc = world.try_spawn_actor(npc_bp, spawn_transform)
|
|
if self._npc is None:
|
|
raise RuntimeError(
|
|
"[CutInScenario] Failed to spawn NPC at "
|
|
f"{spawn_transform.location}. Spot may be occupied."
|
|
)
|
|
|
|
self._actors.append(self._npc)
|
|
|
|
self._npc.set_autopilot(True, traffic_manager.get_port())
|
|
traffic_manager.auto_lane_change(self._npc, False) # prevent random changes
|
|
traffic_manager.vehicle_percentage_speed_difference(self._npc, 0)
|
|
|
|
print(
|
|
f"[{self.name}] NPC spawned in adjacent lane "
|
|
f"{self.NPC_DISTANCE_M} m ahead. Cut-in at frame {self.CUTIN_FRAME}."
|
|
)
|
|
|
|
def step(self, frame_count: int, ego_vehicle, pbar=None) -> None:
|
|
if frame_count == self.CUTIN_FRAME and not self._cut_triggered:
|
|
if self._npc and self._npc.is_alive:
|
|
# force_lane_change: True = left, False = right
|
|
force_left = (self.LANE_OFFSET < 0)
|
|
self._tm.force_lane_change(self._npc, force_left)
|
|
self._cut_triggered = True
|
|
print(f"[{self.name}] Lane change triggered at frame {frame_count}.")
|
|
|
|
# Verbose Logging (Full frequency for 1:1 log mirroring)
|
|
e_vel = ego_vehicle.get_velocity()
|
|
e_speed = 3.6 * math.sqrt(e_vel.x**2 + e_vel.y**2 + e_vel.z**2)
|
|
|
|
n_dist = -1.0
|
|
if self._npc and self._npc.is_alive:
|
|
n_dist = ego_vehicle.get_location().distance(self._npc.get_location())
|
|
|
|
msg = f"[FRAME {frame_count:03d}] EGO (spd={e_speed:.1f}kph) | NPC DIST: {n_dist:.1f}m {'(CUT-IN)' if self._cut_triggered else ''}"
|
|
if pbar:
|
|
pbar.write(msg)
|
|
else:
|
|
print(msg)
|
|
|
|
def cleanup(self) -> None:
|
|
self._destroy_actors()
|
|
print(f"[{self.name}] Cleanup complete.")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Optional overrides
|
|
# ------------------------------------------------------------------
|
|
|
|
def get_scenario_metadata(self) -> dict:
|
|
return {
|
|
"scenario": self.name,
|
|
"npc_distance_m": self.NPC_DISTANCE_M,
|
|
"cutin_frame": self.CUTIN_FRAME,
|
|
"lane_offset": self.LANE_OFFSET,
|
|
"cut_triggered": self._cut_triggered,
|
|
}
|