diff --git a/src/carla_weather_robustness/config/settings.py b/src/carla_weather_robustness/config/settings.py index ef810e6a2..016e2738f 100644 --- a/src/carla_weather_robustness/config/settings.py +++ b/src/carla_weather_robustness/config/settings.py @@ -88,7 +88,7 @@ PID_KP = 1.0 PID_KI = 0.01 PID_KD = 0.1 -MAX_SPEED = 25.0 +MAX_SPEED = 16.0 # 日志 LOG_LEVEL = "INFO" diff --git a/src/carla_weather_robustness/main.py b/src/carla_weather_robustness/main.py index a8396e672..4b77de8a5 100644 --- a/src/carla_weather_robustness/main.py +++ b/src/carla_weather_robustness/main.py @@ -3,6 +3,11 @@ 自动遍历7种天气,输出鲁棒性评分报告 """ +import sys +# 添加CARLA Python API路径(根据实际安装位置修改) +CARLA_PATH = r"F:\hutb\PythonAPI" +sys.path.append(CARLA_PATH) + import carla import numpy as np import cv2 @@ -23,6 +28,85 @@ PID_KP, PID_KI, PID_KD, MAX_SPEED, LOG_LEVEL, ) + +class RoadFollowController: + """道路循迹控制器 - 使用CARLA Waypoint API沿道路中心线自动行驶""" + + def __init__(self, vehicle, world): + self.vehicle = vehicle + self.world = world + self.map = world.get_map() + self._target_speed = MAX_SPEED * 0.5 # 默认循迹速度 + self._waypoint_buffer = [] # 预读取前方路径点 + + def update_waypoints(self, lookahead_distance=10.0): + """更新前方路径点列表""" + current_transform = self.vehicle.get_transform() + current_location = current_transform.location + current_waypoint = self.map.get_waypoint(current_location) + + # 获取前方多个路径点 + self._waypoint_buffer = [] + waypoint = current_waypoint + total_distance = 0.0 + + while total_distance < lookahead_distance: + # 沿道路方向前进2米 + next_waypoints = waypoint.next(2.0) + if not next_waypoints: + break + waypoint = next_waypoints[0] + self._waypoint_buffer.append(waypoint) + total_distance += 2.0 + + def get_steering(self): + """计算沿道路行驶所需的转向角""" + if not self._waypoint_buffer: + self.update_waypoints() + + if len(self._waypoint_buffer) < 2: + return 0.0 # 没有足够路径点,直行 + + # 获取车辆当前位置和朝向 + vehicle_transform = self.vehicle.get_transform() + vehicle_location = vehicle_transform.location + vehicle_yaw = np.radians(vehicle_transform.rotation.yaw) + + # 获取最近的前方路径点作为目标 + target_waypoint = self._waypoint_buffer[0] + target_location = target_waypoint.transform.location + + # 计算相对位置 + dx = target_location.x - vehicle_location.x + dy = target_location.y - vehicle_location.y + + # 计算到目标的角度(世界坐标系) + target_angle = np.arctan2(dy, dx) + + # 计算相对于车辆朝向的角度差 + angle_diff = target_angle - vehicle_yaw + + # 归一化到 [-pi, pi] + while angle_diff > np.pi: + angle_diff -= 2 * np.pi + while angle_diff < -np.pi: + angle_diff += 2 * np.pi + + # 将角度转换为转向值 [-1, 1] + # CARLA中转向角范围约为 [-1, 1],对应约90度 + max_steer_angle = np.pi / 2 + steer = np.clip(angle_diff / max_steer_angle, -1.0, 1.0) + + return float(steer) + + def get_target_speed(self): + """获取循迹目标速度""" + return self._target_speed + + def set_target_speed(self, speed): + """设置循迹目标速度""" + self._target_speed = max(0.0, min(speed, MAX_SPEED)) + logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO), format="%(message)s") logger = logging.getLogger("WeatherRobustness") @@ -204,6 +288,7 @@ def __init__(self): self.fusion_perceiver = AdaptiveFusionPerceiver() self.scorer = RobustnessScorer() self.pid = PIDController() + self.road_controller = None # 道路循迹控制器 self._spawn_point = None def connect(self): @@ -233,7 +318,10 @@ def spawn_ego_vehicle(self): lidar_bp.set_attribute("rotation_frequency", "20") self.lidar = self.world.spawn_actor(lidar_bp, carla.Transform(carla.Location(x=0.0, z=2.8)), attach_to=self.vehicle) self.lidar.listen(self._on_lidar) - logger.info("车辆和传感器已就绪") + + # 初始化道路循迹控制器 + self.road_controller = RoadFollowController(self.vehicle, self.world) + logger.info("车辆和传感器已就绪,道路循迹控制器已初始化") def _on_camera(self, image): array = np.frombuffer(image.raw_data, dtype=np.uint8).reshape(image.height, image.width, 4) @@ -255,11 +343,7 @@ def apply_weather(self, name): weather.sun_altitude_angle = profile["sun_altitude_angle"] self.world.set_weather(weather) self.scorer.start_weather(name) - # 重置车辆位置到出生点 - self.vehicle.set_transform(self._spawn_point) - self.vehicle.set_target_velocity(carla.Vector3D(0, 0, 0)) - self.vehicle.set_target_angular_velocity(carla.Vector3D(0, 0, 0)) - logger.info(f"天气已切换: {WEATHER_LABELS.get(name, name)} [{name}],车辆已重置") + logger.info(f"天气已切换: {WEATHER_LABELS.get(name, name)} [{name}]") def get_severity(self): wp = self.world.get_weather() @@ -297,8 +381,12 @@ def run_step(self, weather_name, step): v = self.vehicle.get_velocity() speed = 3.6 * np.sqrt(v.x**2 + v.y**2 + v.z**2) + # 更新道路循迹路径点 + self.road_controller.update_waypoints(lookahead_distance=15.0) + steer = self.road_controller.get_steering() + throttle, brake = self._compute_control(fusion["nearest_distance"], speed) - self.vehicle.apply_control(carla.VehicleControl(throttle=throttle, brake=brake, steer=0.0)) + self.vehicle.apply_control(carla.VehicleControl(throttle=throttle, brake=brake, steer=steer)) collision = self._check_collision() self.scorer.record_step(weather_name, fusion, collision) @@ -308,11 +396,11 @@ def run_step(self, weather_name, step): logger.info( f"[{weather_name}] step={step+1}/{STEPS_PER_WEATHER}, " f"融合={fusion['fusion_mode']}, 图像质量={fusion['image_quality']['overall']:.2f}, " - f"速度={speed:.1f}km/h" + f"速度={speed:.1f}km/h, 转向={steer:.2f}" ) - self._draw_hud(fusion, speed, weather_name) + self._draw_hud(fusion, speed, weather_name, steer) - def _draw_hud(self, fusion, speed, weather_name): + def _draw_hud(self, fusion, speed, weather_name, steer=0.0): if self._camera_image is None: return display = self._camera_image.copy() @@ -321,20 +409,20 @@ def _draw_hud(self, fusion, speed, weather_name): iq = fusion["image_quality"] overlay = display.copy() - cv2.rectangle(overlay, (0, 0), (w, 140), (0, 0, 0), -1) + cv2.rectangle(overlay, (0, 0), (w, 165), (0, 0, 0), -1) cv2.addWeighted(overlay, 0.6, display, 0.4, 0, display) y = 28 cv2.putText(display, f"Weather: {label} [{weather_name}]", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) y += 28 - cv2.putText(display, f"Speed: {speed:.1f} km/h | Obstacles: {fusion['num_obstacles']} | Nearest: {fusion['nearest_distance']:.1f}m", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 1) + cv2.putText(display, f"Speed: {speed:.1f} km/h | Steer: {steer:+.2f} | Obstacles: {fusion['num_obstacles']} | Nearest: {fusion['nearest_distance']:.1f}m", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) y += 25 cv2.putText(display, f"Image: blur={iq['blur_score']:.2f} bright={iq['brightness_score']:.2f} vis={iq['visibility_score']:.2f} overall={iq['overall']:.2f}", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) y += 25 mode_color = (0, 255, 0) if fusion["fusion_mode"] == "camera_dominant" else (0, 0, 255) if fusion["fusion_mode"] == "lidar_dominant" else (0, 255, 255) - cv2.putText(display, f"Fusion: {fusion['fusion_mode']} cam={fusion['camera_weight']:.2f} lidar={fusion['lidar_weight']:.2f}", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, mode_color, 2) + cv2.putText(display, f"Fusion: {fusion['fusion_mode']} cam={fusion['camera_weight']:.2f} lidar={fusion['lidar_weight']:.2f}", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, mode_color, 2) y += 25 - cv2.putText(display, "CARLA Weather Robustness Test", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (180, 180, 180), 1) + cv2.putText(display, "CARLA Weather Robustness Test - Road Following", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (180, 180, 180), 1) cv2.imshow("Weather Robustness", cv2.cvtColor(display, cv2.COLOR_RGB2BGR)) cv2.waitKey(1) @@ -369,10 +457,6 @@ def run(self): print(f" 图像质量={score['avg_image_quality']:.2f}, LiDAR主导比例={score['lidar_dominant_ratio']:.2f}") print("\n" + "=" * 60) - with open("robustness_report.json", "w", encoding="utf-8") as f: - json.dump(report, f, indent=2, ensure_ascii=False) - print(" 报告已保存至 robustness_report.json\n") - except KeyboardInterrupt: logger.info("用户中断") finally: