import sys import os # Add project root to Python path sys.path.append(os.path.dirname(os.path.dirname(__file__))) import carla import queue import config class SensorManager: def __init__(self, world, blueprint_library, vehicle): self.world = world self.bp_lib = blueprint_library self.vehicle = vehicle self.camera = None self.radar = None self.lidar = None # Queues for synchronization self.camera_queue = queue.Queue() self.camera_tpp_queue = queue.Queue() self.radar_queue = queue.Queue() self.lidar_queue = queue.Queue() def spawn_sensors(self): self._spawn_camera() self._spawn_camera_tpp() self._spawn_radar() self._spawn_lidar() # ---------------- CAMERA ---------------- def _spawn_camera(self): cam_bp = self.bp_lib.find("sensor.camera.rgb") cam_bp.set_attribute("image_size_x", str(config.CAM_WIDTH)) cam_bp.set_attribute("image_size_y", str(config.CAM_HEIGHT)) cam_bp.set_attribute("fov", str(config.CAM_FOV)) transform = carla.Transform( carla.Location(x=1.5, z=2.4) ) self.camera = self.world.spawn_actor( cam_bp, transform, attach_to=self.vehicle ) self.camera.listen(lambda data: self.camera_queue.put(data)) def _spawn_camera_tpp(self): cam_bp = self.bp_lib.find("sensor.camera.rgb") cam_bp.set_attribute("image_size_x", str(config.CAM_WIDTH)) cam_bp.set_attribute("image_size_y", str(config.CAM_HEIGHT)) cam_bp.set_attribute("fov", str(config.CAM_FOV)) transform = carla.Transform( carla.Location(x=-5.5, z=2.8), carla.Rotation(pitch=-15) ) self.camera_tpp = self.world.spawn_actor( cam_bp, transform, attach_to=self.vehicle ) self.camera_tpp.listen(lambda data: self.camera_tpp_queue.put(data)) # ---------------- RADAR ---------------- def _spawn_radar(self): radar_bp = self.bp_lib.find("sensor.other.radar") radar_bp.set_attribute("horizontal_fov", str(config.RADAR_HORIZONTAL_FOV)) radar_bp.set_attribute("vertical_fov", str(config.RADAR_VERTICAL_FOV)) radar_bp.set_attribute("range", str(config.RADAR_RANGE)) transform = carla.Transform( carla.Location(x=2.0, z=1.0) ) self.radar = self.world.spawn_actor( radar_bp, transform, attach_to=self.vehicle ) self.radar.listen(lambda data: self.radar_queue.put(data)) # ---------------- LIDAR ---------------- def _spawn_lidar(self): lidar_bp = self.bp_lib.find("sensor.lidar.ray_cast_semantic") lidar_bp.set_attribute("range", str(config.LIDAR_RANGE)) lidar_bp.set_attribute("channels", str(config.LIDAR_CHANNELS)) lidar_bp.set_attribute("points_per_second", str(config.LIDAR_POINTS_PER_SECOND)) lidar_bp.set_attribute("rotation_frequency", str(config.LIDAR_ROTATION_FREQUENCY)) transform = carla.Transform( carla.Location(x=0.0, z=2.5) ) self.lidar = self.world.spawn_actor( lidar_bp, transform, attach_to=self.vehicle ) self.lidar.listen(lambda data: self.lidar_queue.put(data)) # ---------------- FETCH SYNC DATA ---------------- def get_data(self): cam = self.camera_queue.get() cam_tpp = self.camera_tpp_queue.get() radar = self.radar_queue.get() lidar = self.lidar_queue.get() # Strong sync check assert cam.frame == cam_tpp.frame == radar.frame == lidar.frame, "Frame mismatch!" return cam, cam_tpp, radar, lidar # ---------------- CLEANUP ---------------- def destroy(self): if self.camera: self.camera.stop() self.camera.destroy() if self.camera_tpp: self.camera_tpp.stop() self.camera_tpp.destroy() if self.radar: self.radar.stop() self.radar.destroy() if self.lidar: self.lidar.stop() self.lidar.destroy()