diff --git a/src/car_navigation_system/main.py b/src/car_navigation_system/main.py index 716ab6a423..2dc3557ee7 100644 --- a/src/car_navigation_system/main.py +++ b/src/car_navigation_system/main.py @@ -101,10 +101,12 @@ def __init__(self): self.client = None self.world = None self.vehicle = None + self.vehicles = [] # 存储所有车辆(包括主车和NPC) + self.controllers = [] # 存储每个车辆的控制器 + self.current_vehicle_index = 0 # 当前关注的车辆索引 self.cameras = {} # 存储多个相机 self.controller = None self.camera_image = None - self.current_view = 'third_person' # 当前视角模式:'first_person', 'third_person', 'birdseye' def connect(self): """连接到CARLA服务器""" @@ -274,11 +276,81 @@ def get_view_name(self): } return view_names.get(self.current_view, 'Unknown') + def setup_all_vehicles_cameras(self): + """为所有车辆设置相机系统""" + print("正在为所有车辆设置相机...") + + try: + blueprint_library = self.world.get_blueprint_library() + camera_bp = blueprint_library.find('sensor.camera.rgb') + + # 设置相机属性 + camera_bp.set_attribute('image_size_x', '640') + camera_bp.set_attribute('image_size_y', '480') + camera_bp.set_attribute('fov', '90') + + # 为每辆车创建相机 + for i, vehicle in enumerate(self.vehicles): + if vehicle is None: + continue + + print(f"为车辆 {i} 设置相机...") + + # 第三人称相机 - 用于跟随视角 + third_person_transform = carla.Transform( + carla.Location(x=-8.0, z=6.0), # 在车辆后方上方 + carla.Rotation(pitch=-20.0) # 向下看 + ) + third_person_camera = self.world.spawn_actor( + camera_bp, third_person_transform, attach_to=vehicle + ) + third_person_camera.listen(lambda image, idx=i: self.camera_callback(image, idx, 'third_person')) + self.cameras[f'vehicle_{i}_third_person'] = third_person_camera + + print(f"已为 {len(self.vehicles)} 辆车设置相机") + return True + + except Exception as e: + print(f"设置多车辆相机时出错: {e}") + return False + + def camera_callback(self, image, vehicle_index, view_mode=None): + """相机数据回调""" + try: + # 只有当前关注车辆的相机数据才会被使用 + if vehicle_index == self.current_vehicle_index and view_mode == 'third_person': + # 转换图像数据 + array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) + array = np.reshape(array, (image.height, image.width, 4)) + self.camera_image = array[:, :, :3] # RGB通道 + except: + pass + + def update_current_vehicle_view(self): + """更新当前关注的车辆视角""" + vehicle_label = "主车辆(红色特斯拉)" if self.current_vehicle_index == 0 else f"NPC车辆 {self.current_vehicle_index}" + print(f"已切换到: {vehicle_label} 视角") + def setup_controller(self): """设置控制器""" self.controller = SimpleController(self.world, self.vehicle) print("控制器设置完成") + def setup_all_controllers(self): + """为所有车辆设置控制器""" + print("正在为所有车辆设置控制器...") + self.controllers = [] + + for i, vehicle in enumerate(self.vehicles): + controller = SimpleController(self.world, vehicle) + self.controllers.append(controller) + vehicle_name = "主车辆" if i == 0 else f"NPC车辆{i}" + print(f" {vehicle_name} 控制器设置完成") + + # 确保self.controller指向主车辆控制器 + self.controller = self.controllers[0] + print("所有控制器设置完成") + def run(self): """主运行循环""" print("\n" + "=" * 50) @@ -293,11 +365,6 @@ def run(self): if not self.spawn_vehicle(): return - # 设置相机 - if not self.setup_camera(): - # 即使相机失败也继续运行 - print("警告:相机设置失败,继续运行...") - # 设置控制器 self.setup_controller() @@ -313,72 +380,104 @@ def run(self): ) self.world.set_weather(weather) - # 生成一些NPC车辆 - self.spawn_npc_vehicles(2) + # 生成2辆NPC车辆(加上主车辆共3辆特斯拉) + npc_vehicles = self.spawn_npc_vehicles(2) + + # 将所有车辆添加到列表中(主车辆在前,NPC在后) + self.vehicles = [self.vehicle] + npc_vehicles + + # 为所有车辆设置控制器 + self.setup_all_controllers() print("\n系统准备就绪!") + print(f"共 {len(self.vehicles)} 辆车可用") print("控制指令:") print(" q - 退出程序") - print(" r - 重置车辆") + print(" r - 重置当前车辆") print(" s - 紧急停止") print(" x - 切换倒车/前进模式(速度为0时生效)") - print(" v - 切换视角(第一人称/第三人称/鸟瞰图)") + for i in range(len(self.vehicles)): + if i == 0: + print(f" 1 - 切换到主车辆视角(红色特斯拉)") + else: + print(f" {i+1} - 切换到NPC车辆{i}视角") print("\n开始自动驾驶...\n") + # 为所有车辆创建相机 + self.setup_all_vehicles_cameras() + frame_count = 0 running = True + + # 卡住检测变量 + stuck_frame_count = 0 + stuck_threshold = 500 # 连续500帧速度低于2km/h认为卡住 try: while running: - # 获取车辆状态 - velocity = self.vehicle.get_velocity() + # 获取当前关注车辆的状态 + current_vehicle = self.vehicles[self.current_vehicle_index] + velocity = current_vehicle.get_velocity() speed = math.sqrt(velocity.x ** 2 + velocity.y ** 2) * 3.6 - # 获取控制指令(现在返回4个值,原代码返回3个值) - # throttle, brake, steer = self.controller.get_control() # 原代码 - throttle, brake, steer, reverse = self.controller.get_control() # 新代码 + # 检测主车辆是否卡住 + if self.current_vehicle_index == 0: + if speed < 2.0: # 速度低于2km/h + stuck_frame_count += 1 + if stuck_frame_count >= stuck_threshold: + print("检测到主车辆卡住,自动重置...") + self.reset_vehicle() + stuck_frame_count = 0 + else: + stuck_frame_count = 0 - # 应用控制 + # 对所有车辆应用控制器 + # 主车辆(索引0)使用自定义控制器 + controller = self.controllers[0] + throttle, brake, steer, reverse = controller.get_control() control = carla.VehicleControl( throttle=float(throttle), brake=float(brake), steer=float(steer), hand_brake=False, - # reverse=False # 原代码 - reverse=reverse # 新代码,支持倒车 + reverse=reverse ) self.vehicle.apply_control(control) + # NPC车辆使用内置自动驾驶(自动避开障碍物) + for i in range(1, len(self.vehicles)): + self.vehicles[i].set_autopilot(True, 16) # 16 = ARLA_AUTOPILIT_IGNORE_ALL + # 更新显示 if self.camera_image is not None: display_img = self.camera_image.copy() - # 添加状态信息 + # 显示当前车辆编号(绿色) + cv2.putText(display_img, f"Vehicle: {self.current_vehicle_index + 1}", + (20, 30), cv2.FONT_HERSHEY_SIMPLEX, + 0.6, (0, 255, 0), 2) + + # 添加状态信息(显示当前关注车辆的数据) cv2.putText(display_img, f"Speed: {speed:.1f} km/h", - (20, 40), cv2.FONT_HERSHEY_SIMPLEX, - 0.8, (255, 255, 255), 2) + (20, 60), cv2.FONT_HERSHEY_SIMPLEX, + 0.6, (255, 255, 255), 2) cv2.putText(display_img, f"Throttle: {throttle:.2f}", - (20, 80), cv2.FONT_HERSHEY_SIMPLEX, - 0.8, (255, 255, 255), 2) + (20, 90), cv2.FONT_HERSHEY_SIMPLEX, + 0.6, (255, 255, 255), 2) cv2.putText(display_img, f"Steer: {steer:.2f}", (20, 120), cv2.FONT_HERSHEY_SIMPLEX, - 0.8, (255, 255, 255), 2) + 0.6, (255, 255, 255), 2) cv2.putText(display_img, f"Frame: {frame_count}", - (20, 160), cv2.FONT_HERSHEY_SIMPLEX, - 0.8, (255, 255, 255), 2) - - # 显示倒车状态(新功能) - if self.controller.manual_reverse: - cv2.putText(display_img, "REVERSE MODE", - (20, 200), cv2.FONT_HERSHEY_SIMPLEX, - 0.8, (0, 0, 255), 2) # 红色显示 - - # 显示当前视角模式 - cv2.putText(display_img, f"View: {self.get_view_name()}", - (20, 240), cv2.FONT_HERSHEY_SIMPLEX, - 0.8, (0, 255, 0), 2) # 绿色显示 - - cv2.imshow('Autonomous Driving - Simple Version', display_img) + (20, 150), cv2.FONT_HERSHEY_SIMPLEX, + 0.6, (255, 255, 255), 2) + + # 显示倒车状态 + if self.current_vehicle_index == 0 and self.controller.manual_reverse: + cv2.putText(display_img, "REVERSE MODE", + (20, 240), cv2.FONT_HERSHEY_SIMPLEX, + 0.8, (0, 0, 255), 2) + + cv2.imshow('Autonomous Driving - Multi-Vehicle View', display_img) # 处理按键 key = cv2.waitKey(1) & 0xFF @@ -386,26 +485,30 @@ def run(self): print("正在退出...") running = False elif key == ord('r'): - self.reset_vehicle() + self.reset_current_vehicle() elif key == ord('s'): - # 紧急停止 - self.vehicle.apply_control(carla.VehicleControl( - throttle=0.0, brake=1.0, hand_brake=True - )) + # 紧急停止(只对主车辆生效) + if self.current_vehicle_index == 0: + self.vehicle.apply_control(carla.VehicleControl( + throttle=0.0, brake=1.0, hand_brake=True + )) print("紧急停止") elif key == ord('x'): # 切换倒车模式(只在速度接近0时允许切换) - if speed < 1.0: # 速度小于1km/h时允许切换 + if self.current_vehicle_index == 0 and speed < 1.0: self.controller.toggle_reverse() + elif self.current_vehicle_index != 0: + print("只有主车辆可以切换倒车模式") else: print("请先减速到接近停止(速度<1km/h)再切换倒车模式") - elif key == ord('v'): - # 切换视角模式 - view_modes = ['third_person', 'first_person', 'birdseye'] - current_index = view_modes.index(self.current_view) - next_index = (current_index + 1) % len(view_modes) - self.current_view = view_modes[next_index] - self.update_camera_view() + elif ord('1') <= key <= ord('9'): + # 动态切换车辆视角(按数字键1-9) + vehicle_index = key - ord('1') + if vehicle_index < len(self.vehicles): + self.current_vehicle_index = vehicle_index + self.update_current_vehicle_view() + else: + print(f"车辆 {vehicle_index + 1} 不存在") frame_count += 1 @@ -423,40 +526,65 @@ def run(self): self.cleanup() def spawn_npc_vehicles(self, count=2): - """生成NPC车辆(简化)""" + """生成NPC车辆(改进版:确保生成在合适位置)""" print(f"正在生成 {count} 辆NPC车辆...") try: blueprint_library = self.world.get_blueprint_library() spawn_points = self.world.get_map().get_spawn_points() + # 获取主车辆当前位置 + main_location = self.vehicle.get_location() + + # 筛选出距离主车辆较远的出生点(至少80米) + far_spawn_points = [ + sp for sp in spawn_points + if sp.location.distance(main_location) > 80.0 + ] + + if not far_spawn_points: + far_spawn_points = spawn_points + npc_vehicles = [] - for i in range(min(count, len(spawn_points))): - # 跳过主车辆的出生点 - if i == 0: - continue + # 生成指定数量的特斯拉车辆 + for i in range(count): + # 使用不同的出生点 + spawn_index = i % len(far_spawn_points) try: - # 随机选择车辆类型 - vehicle_bps = list(blueprint_library.filter('vehicle.*')) - if vehicle_bps: - vehicle_bp = random.choice(vehicle_bps) - - # 生成NPC - npc = self.world.try_spawn_actor(vehicle_bp, spawn_points[i]) - - if npc: - npc.set_autopilot(True) - npc_vehicles.append(npc) - print(f"生成NPC车辆 {len(npc_vehicles)}") - except: + # 使用特斯拉Model3蓝图 + vehicle_bp = blueprint_library.find('vehicle.tesla.model3') + if not vehicle_bp: + print("未找到特斯拉蓝图,尝试其他车辆...") + vehicle_bp = blueprint_library.filter('vehicle.*')[0] + + # 设置不同颜色区分 + colors = [[0, 255, 0], [0, 0, 255], [255, 255, 0], [255, 0, 255], [0, 255, 255]] + color = colors[i % len(colors)] + vehicle_bp.set_attribute('color', f'{color[0]},{color[1]},{color[2]}') + + # 生成NPC + npc = self.world.try_spawn_actor(vehicle_bp, far_spawn_points[spawn_index]) + + if npc: + # 立即开启自动驾驶,禁用交通灯检测 + npc.set_autopilot(True, 16) + npc_vehicles.append(npc) + print(f"生成NPC车辆 {len(npc_vehicles)} (特斯拉Model3)") + print(f" 位置: {far_spawn_points[spawn_index].location}") + else: + print(f"无法在出生点 {spawn_index} 生成NPC车辆") + except Exception as e: + print(f"生成NPC车辆 {i+1} 时出错: {e}") pass print(f"成功生成 {len(npc_vehicles)} 辆NPC车辆") + return npc_vehicles except Exception as e: print(f"生成NPC车辆时出错: {e}") + return [] def reset_vehicle(self): """重置车辆位置""" @@ -464,10 +592,38 @@ def reset_vehicle(self): spawn_points = self.world.get_map().get_spawn_points() if spawn_points: - new_spawn_point = random.choice(spawn_points) + # 选择一个距离当前车辆较远的出生点,避免重置后立即撞到 + current_location = self.vehicle.get_location() + far_spawn_points = [ + sp for sp in spawn_points + if sp.location.distance(current_location) > 50.0 # 至少50米远 + ] + if far_spawn_points: + new_spawn_point = random.choice(far_spawn_points) + else: + new_spawn_point = random.choice(spawn_points) + self.vehicle.set_transform(new_spawn_point) print(f"车辆已重置到新位置: {new_spawn_point.location}") + # 重置控制器状态 + self.controllers[0].last_waypoint = None + + # 等待重置完成 + time.sleep(0.5) + + def reset_current_vehicle(self): + """重置当前关注的车辆位置""" + current_vehicle = self.vehicles[self.current_vehicle_index] + vehicle_label = "主车辆" if self.current_vehicle_index == 0 else f"NPC车辆 {self.current_vehicle_index}" + print(f"重置{vehicle_label}位置...") + + spawn_points = self.world.get_map().get_spawn_points() + if spawn_points: + new_spawn_point = random.choice(spawn_points) + current_vehicle.set_transform(new_spawn_point) + print(f"{vehicle_label}已重置到新位置: {new_spawn_point.location}") + # 等待重置完成 time.sleep(0.5)