Skip to content
2 changes: 1 addition & 1 deletion src/carla_weather_robustness/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
PID_KP = 1.0
PID_KI = 0.01
PID_KD = 0.1
MAX_SPEED = 25.0
MAX_SPEED = 16.0

# 日志
LOG_LEVEL = "INFO"
120 changes: 102 additions & 18 deletions src/carla_weather_robustness/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
自动遍历7种天气,输出鲁棒性评分报告
"""

import sys
# 添加CARLA Python API路径(根据实际安装位置修改)
CARLA_PATH = r"F:\hutb\PythonAPI"
sys.path.append(CARLA_PATH)

import carla
import numpy as np
import cv2
Expand All @@ -23,6 +28,85 @@
PID_KP, PID_KI, PID_KD, MAX_SPEED, LOG_LEVEL,
)


class RoadFollowController:
"""道路循迹控制器 - 使用CARLA Waypoint API沿道路中心线自动行驶"""

def __init__(self, vehicle, world):
self.vehicle = vehicle
self.world = world
self.map = world.get_map()
self._target_speed = MAX_SPEED * 0.5 # 默认循迹速度
self._waypoint_buffer = [] # 预读取前方路径点

def update_waypoints(self, lookahead_distance=10.0):
"""更新前方路径点列表"""
current_transform = self.vehicle.get_transform()
current_location = current_transform.location
current_waypoint = self.map.get_waypoint(current_location)

# 获取前方多个路径点
self._waypoint_buffer = []
waypoint = current_waypoint
total_distance = 0.0

while total_distance < lookahead_distance:
# 沿道路方向前进2米
next_waypoints = waypoint.next(2.0)
if not next_waypoints:
break
waypoint = next_waypoints[0]
self._waypoint_buffer.append(waypoint)
total_distance += 2.0

def get_steering(self):
"""计算沿道路行驶所需的转向角"""
if not self._waypoint_buffer:
self.update_waypoints()

if len(self._waypoint_buffer) < 2:
return 0.0 # 没有足够路径点,直行

# 获取车辆当前位置和朝向
vehicle_transform = self.vehicle.get_transform()
vehicle_location = vehicle_transform.location
vehicle_yaw = np.radians(vehicle_transform.rotation.yaw)

# 获取最近的前方路径点作为目标
target_waypoint = self._waypoint_buffer[0]
target_location = target_waypoint.transform.location

# 计算相对位置
dx = target_location.x - vehicle_location.x
dy = target_location.y - vehicle_location.y

# 计算到目标的角度(世界坐标系)
target_angle = np.arctan2(dy, dx)

# 计算相对于车辆朝向的角度差
angle_diff = target_angle - vehicle_yaw

# 归一化到 [-pi, pi]
while angle_diff > np.pi:
angle_diff -= 2 * np.pi
while angle_diff < -np.pi:
angle_diff += 2 * np.pi

# 将角度转换为转向值 [-1, 1]
# CARLA中转向角范围约为 [-1, 1],对应约90度
max_steer_angle = np.pi / 2
steer = np.clip(angle_diff / max_steer_angle, -1.0, 1.0)

return float(steer)

def get_target_speed(self):
"""获取循迹目标速度"""
return self._target_speed

def set_target_speed(self, speed):
"""设置循迹目标速度"""
self._target_speed = max(0.0, min(speed, MAX_SPEED))

logging.basicConfig(level=getattr(logging, LOG_LEVEL, logging.INFO), format="%(message)s")
logger = logging.getLogger("WeatherRobustness")

Expand Down Expand Up @@ -204,6 +288,7 @@ def __init__(self):
self.fusion_perceiver = AdaptiveFusionPerceiver()
self.scorer = RobustnessScorer()
self.pid = PIDController()
self.road_controller = None # 道路循迹控制器
self._spawn_point = None

def connect(self):
Expand Down Expand Up @@ -233,7 +318,10 @@ def spawn_ego_vehicle(self):
lidar_bp.set_attribute("rotation_frequency", "20")
self.lidar = self.world.spawn_actor(lidar_bp, carla.Transform(carla.Location(x=0.0, z=2.8)), attach_to=self.vehicle)
self.lidar.listen(self._on_lidar)
logger.info("车辆和传感器已就绪")

# 初始化道路循迹控制器
self.road_controller = RoadFollowController(self.vehicle, self.world)
logger.info("车辆和传感器已就绪,道路循迹控制器已初始化")

def _on_camera(self, image):
array = np.frombuffer(image.raw_data, dtype=np.uint8).reshape(image.height, image.width, 4)
Expand All @@ -255,11 +343,7 @@ def apply_weather(self, name):
weather.sun_altitude_angle = profile["sun_altitude_angle"]
self.world.set_weather(weather)
self.scorer.start_weather(name)
# 重置车辆位置到出生点
self.vehicle.set_transform(self._spawn_point)
self.vehicle.set_target_velocity(carla.Vector3D(0, 0, 0))
self.vehicle.set_target_angular_velocity(carla.Vector3D(0, 0, 0))
logger.info(f"天气已切换: {WEATHER_LABELS.get(name, name)} [{name}],车辆已重置")
logger.info(f"天气已切换: {WEATHER_LABELS.get(name, name)} [{name}]")

def get_severity(self):
wp = self.world.get_weather()
Expand Down Expand Up @@ -297,8 +381,12 @@ def run_step(self, weather_name, step):
v = self.vehicle.get_velocity()
speed = 3.6 * np.sqrt(v.x**2 + v.y**2 + v.z**2)

# 更新道路循迹路径点
self.road_controller.update_waypoints(lookahead_distance=15.0)
steer = self.road_controller.get_steering()

throttle, brake = self._compute_control(fusion["nearest_distance"], speed)
self.vehicle.apply_control(carla.VehicleControl(throttle=throttle, brake=brake, steer=0.0))
self.vehicle.apply_control(carla.VehicleControl(throttle=throttle, brake=brake, steer=steer))

collision = self._check_collision()
self.scorer.record_step(weather_name, fusion, collision)
Expand All @@ -308,11 +396,11 @@ def run_step(self, weather_name, step):
logger.info(
f"[{weather_name}] step={step+1}/{STEPS_PER_WEATHER}, "
f"融合={fusion['fusion_mode']}, 图像质量={fusion['image_quality']['overall']:.2f}, "
f"速度={speed:.1f}km/h"
f"速度={speed:.1f}km/h, 转向={steer:.2f}"
)
self._draw_hud(fusion, speed, weather_name)
self._draw_hud(fusion, speed, weather_name, steer)

def _draw_hud(self, fusion, speed, weather_name):
def _draw_hud(self, fusion, speed, weather_name, steer=0.0):
if self._camera_image is None:
return
display = self._camera_image.copy()
Expand All @@ -321,20 +409,20 @@ def _draw_hud(self, fusion, speed, weather_name):
iq = fusion["image_quality"]

overlay = display.copy()
cv2.rectangle(overlay, (0, 0), (w, 140), (0, 0, 0), -1)
cv2.rectangle(overlay, (0, 0), (w, 165), (0, 0, 0), -1)
cv2.addWeighted(overlay, 0.6, display, 0.4, 0, display)

y = 28
cv2.putText(display, f"Weather: {label} [{weather_name}]", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
y += 28
cv2.putText(display, f"Speed: {speed:.1f} km/h | Obstacles: {fusion['num_obstacles']} | Nearest: {fusion['nearest_distance']:.1f}m", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 1)
cv2.putText(display, f"Speed: {speed:.1f} km/h | Steer: {steer:+.2f} | Obstacles: {fusion['num_obstacles']} | Nearest: {fusion['nearest_distance']:.1f}m", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
y += 25
cv2.putText(display, f"Image: blur={iq['blur_score']:.2f} bright={iq['brightness_score']:.2f} vis={iq['visibility_score']:.2f} overall={iq['overall']:.2f}", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
y += 25
mode_color = (0, 255, 0) if fusion["fusion_mode"] == "camera_dominant" else (0, 0, 255) if fusion["fusion_mode"] == "lidar_dominant" else (0, 255, 255)
cv2.putText(display, f"Fusion: {fusion['fusion_mode']} cam={fusion['camera_weight']:.2f} lidar={fusion['lidar_weight']:.2f}", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.55, mode_color, 2)
cv2.putText(display, f"Fusion: {fusion['fusion_mode']} cam={fusion['camera_weight']:.2f} lidar={fusion['lidar_weight']:.2f}", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, mode_color, 2)
y += 25
cv2.putText(display, "CARLA Weather Robustness Test", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (180, 180, 180), 1)
cv2.putText(display, "CARLA Weather Robustness Test - Road Following", (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (180, 180, 180), 1)

cv2.imshow("Weather Robustness", cv2.cvtColor(display, cv2.COLOR_RGB2BGR))
cv2.waitKey(1)
Expand Down Expand Up @@ -369,10 +457,6 @@ def run(self):
print(f" 图像质量={score['avg_image_quality']:.2f}, LiDAR主导比例={score['lidar_dominant_ratio']:.2f}")
print("\n" + "=" * 60)

with open("robustness_report.json", "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(" 报告已保存至 robustness_report.json\n")

except KeyboardInterrupt:
logger.info("用户中断")
finally:
Expand Down