diff --git a/src/UAV_navigation_system/caozuo.test b/src/UAV_navigation_system/caozuo.test new file mode 100644 index 0000000000..0ddfb63988 --- /dev/null +++ b/src/UAV_navigation_system/caozuo.test @@ -0,0 +1,9 @@ +F1 - 显示帮助 +F - 切换到第一人称视角 +B - 第三人称视角 +R - 开始/停止录制 +退格键 - 重置无人机位置 +方向键 - 控制无人机移动 +q - 退出程序 +s - 保存当前快照 +p - 暂停/继续 \ No newline at end of file diff --git a/src/UAV_navigation_system/drone_main.py b/src/UAV_navigation_system/drone_main.py index 5b0afa90d7..4b2481d88f 100644 --- a/src/UAV_navigation_system/drone_main.py +++ b/src/UAV_navigation_system/drone_main.py @@ -1,238 +1,769 @@ +# drone_vision_system_en_fixed.py """ -无人机视觉导航系统 - 简化版 -可以立即运行测试 +Drone Vision Navigation System +Optimized for Abandoned Park Environment +Pure English interface to avoid encoding issues +Fixed KeyboardInterrupt handling """ -import os -import sys -import time +import airsim import cv2 import numpy as np - -# 添加项目路径,让Python能找到src模块(如果需要) -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - - -def ensure_directories(): - """确保所有目录都存在(用于存储数据)""" - # 定义需要创建的目录列表 - dirs = ['data/images', 'data/videos', 'data/logs', 'data/config'] - for d in dirs: - # 如果目录不存在则创建 - os.makedirs(d, exist_ok=True) - print(f"✓ 目录已就绪: {d}") - - -class SimpleDroneCamera: - """简单的无人机摄像头类 - 模拟或真实摄像头""" - - def __init__(self, camera_id=0): - """ - 初始化摄像头 - Args: - camera_id: 摄像头ID,0通常表示默认摄像头 +import time +import json +import os +import sys +from datetime import datetime +import random + +class DroneVisionSystem: + """Drone vision navigation system""" + + def __init__(self): + """Initialize system""" + self.clear_screen() + print("=" * 70) + print("DRONE VISION NAVIGATION SYSTEM v2.0") + print("=" * 70) + print("Optimized for Abandoned Park Environment") + print("-" * 70) + + # Try to connect to simulator + try: + print("Connecting to AirSim simulator...") + self.client = airsim.MultirotorClient() + self.client.confirmConnection() + print("✓ Connection successful!") + except Exception as e: + print(f"✗ Connection failed: {str(e)}") + print("\nPlease ensure:") + print("1. AbandonedPark.exe is running") + print("2. Simulator is fully loaded") + print("3. Switched to drone mode if needed") + self.client = None + + # System status + self.running = False + self.flying = False + self.battery = 100.0 + self.current_env = "Unknown" + self.env_confidence = 0.0 + self.emergency = False + + # Create directories + self.create_folders() + + # Initialize classifier + self.classifier = EnvironmentClassifier() + + print("System initialization complete!") + print("=" * 70) + + def clear_screen(self): + """Clear console screen""" + if sys.platform == 'win32': + os.system('cls') + else: + os.system('clear') + + def create_folders(self): + """Create project folders""" + folders = ['data/images', 'data/logs', 'models', 'debug'] + for folder in folders: + os.makedirs(folder, exist_ok=True) + + def log(self, message): + """Log message""" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_msg = f"[{timestamp}] {message}" + + # Print to console + print(log_msg) + + # Save to file + log_file = f"data/logs/drone_{datetime.now().strftime('%Y%m%d')}.log" + with open(log_file, 'a') as f: + f.write(log_msg + "\n") + + def show_status(self): + """Display system status""" + status_text = f""" +Current Status: + Flight Status: {'Flying' if self.flying else 'Landed'} + Environment: {self.current_env} ({self.env_confidence:.1%}) + Battery: {self.battery:.1f}% + Emergency Mode: {'Yes' if self.emergency else 'No'} + System Running: {'Yes' if self.running else 'No'} """ - self.camera_id = camera_id - self.cap = None # OpenCV视频捕获对象 - - def open(self): - """打开摄像头""" - print(f"尝试打开摄像头 {self.camera_id}...") - # 尝试打开指定ID的摄像头 - self.cap = cv2.VideoCapture(self.camera_id) - - # 检查摄像头是否成功打开 - if not self.cap.isOpened(): - print("⚠️ 无法打开物理摄像头,使用模拟模式") - return False # 打开失败 + print(status_text) + + def takeoff(self, height=15): + """Take off to specified height""" + if not self.client: + print("Not connected to simulator!") + return False + + try: + self.log(f"Taking off to {height} meters...") + + # Unlock drone + self.client.enableApiControl(True) + self.client.armDisarm(True) + time.sleep(1) + + # Take off + self.client.takeoffAsync().join() + time.sleep(2) + + # Ascend to specified height + self.client.moveToZAsync(-height, 3).join() + + self.flying = True + self.log("Takeoff successful!") + return True + + except Exception as e: + self.log(f"Takeoff failed: {str(e)}") + return False + + def land(self): + """Land the drone""" + try: + self.log("Landing...") + self.client.landAsync().join() + time.sleep(2) + + # Lock drone + self.client.armDisarm(False) + self.client.enableApiControl(False) + + self.flying = False + self.log("Landing successful!") + return True + + except Exception as e: + self.log(f"Landing failed: {str(e)}") + return False + + def capture_image(self): + """Capture image from drone camera""" + try: + responses = self.client.simGetImages([ + airsim.ImageRequest("0", airsim.ImageType.Scene, False, False) + ]) + + if responses: + response = responses[0] + img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8) + img_rgb = img1d.reshape(response.height, response.width, 3) + + # Convert to BGR format + img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR) + return img_bgr + + except Exception as e: + self.log(f"Image capture failed: {str(e)}") + + return None + + def analyze_environment(self, image): + """Analyze environment using classifier""" + return self.classifier.classify(image) + + def update_battery(self): + """Update battery status""" + if self.flying: + self.battery -= 0.05 # Flying consumption else: - print("✓ 摄像头已连接") - return True # 打开成功 - - def read_frame(self): - """读取一帧图像""" - # 如果有摄像头且已打开,尝试读取帧 - if self.cap and self.cap.isOpened(): - ret, frame = self.cap.read() - if ret: # 读取成功 - return frame - - # 如果没有摄像头或读取失败,返回模拟图像 - return self.simulate_frame() - - def simulate_frame(self): - """生成模拟图像(在没有摄像头时使用)""" - # 创建黑色背景图像 - width, height = 640, 480 - frame = np.zeros((height, width, 3), dtype=np.uint8) - - # 在图像上添加一些图形作为模拟内容 - cv2.putText(frame, "无人机模拟视图", (50, 50), - cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) - cv2.rectangle(frame, (100, 100), (300, 300), (255, 0, 0), 2) - cv2.circle(frame, (400, 200), 50, (0, 0, 255), -1) - - return frame - - def release(self): - """释放摄像头资源""" - if self.cap: - self.cap.release() - print("摄像头已释放") - - -def analyze_scene_simple(frame): - """简单场景分析(基于颜色特征)""" - # 检查输入帧是否有效 - if frame is None: - return "未知", 0.5 - - # 将BGR颜色空间转换为HSV(色调、饱和度、明度)便于颜色分析 - hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) - - # 检测绿色(植被)范围 - green_mask = cv2.inRange(hsv, (35, 40, 40), (85, 255, 255)) - # 计算绿色像素占比 - green_pct = np.sum(green_mask > 0) / green_mask.size - - # 检测蓝色(水域)范围 - blue_mask = cv2.inRange(hsv, (100, 40, 40), (140, 255, 255)) - # 计算蓝色像素占比 - blue_pct = np.sum(blue_mask > 0) / blue_mask.size - - # 根据颜色占比判断场景类型 - if green_pct > 0.3: # 绿色占比超过30%认为是森林/草地 - return "森林/草地", green_pct - elif blue_pct > 0.2: # 蓝色占比超过20%认为是水域 - return "水域", blue_pct - else: # 其他情况认为是城市/建筑区域 - return "城市/建筑", max(green_pct, blue_pct) - - -def get_decision(scene_type, confidence): - """根据场景类型做出飞行决策""" - # 决策映射表:不同场景类型对应的飞行决策 - decisions = { - "森林/草地": "✓ 安全区域,继续飞行", - "水域": "⚠️ 接近水域,提高飞行高度", - "城市/建筑": "⚠️ 城市区域,降低速度并避让", - "未知": "? 无法识别,保持警戒" - } - # 获取对应决策,如果没有匹配则返回默认决策 - return decisions.get(scene_type, "保持当前状态") - - -def main(): - """主函数 - 无人机视觉导航系统主循环""" - # 打印程序标题和说明 - print("=" * 50) - print("无人机视觉导航系统") - print("版本: 1.0.0") - print("按 'q' 键退出,按 's' 键保存图像") - print("=" * 50) - - # 确保所需目录存在 - ensure_directories() - - # 创建无人机摄像头对象 - drone_cam = SimpleDroneCamera(camera_id=0) - drone_cam.open() # 打开摄像头 - - # 初始化飞行状态变量 - battery = 100 # 电池电量(百分比) - flight_time = 0 # 飞行时间(秒) - frame_count = 0 # 已处理帧数 - start_time = time.time() # 飞行开始时间 - - print("\n开始飞行...") - - # 主循环:处理每一帧图像 - while True: - # 1. 读取当前帧 - frame = drone_cam.read_frame() - frame_count += 1 # 帧数计数器递增 - - # 2. 分析场景类型 - scene_type, confidence = analyze_scene_simple(frame) - - # 3. 根据场景类型做出决策 - decision = get_decision(scene_type, confidence) - - # 4. 更新飞行状态 - flight_time = time.time() - start_time # 计算已飞行时间 - battery = max(0, battery - 0.05) # 模拟电池消耗(每帧减少0.05%) - - # 5. 在图像上显示状态信息 - info_lines = [ - f"场景: {scene_type} ({confidence:.1%})", - f"决策: {decision}", - f"飞行时间: {flight_time:.1f}s", - f"电池: {battery:.1f}%", - f"帧数: {frame_count}" + self.battery -= 0.01 # Standby consumption + + if self.battery < 0: + self.battery = 0 + + # Check low battery + if self.battery < 20 and not self.emergency: + self.log(f"Warning: Low battery ({self.battery:.1f}%)") + if self.battery < 10: + self.log("Critical: Very low battery!") + self.emergency = True + + def save_image_data(self, image, environment, confidence): + """Save image data""" + if image is None: + return + + # Generate filename + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + filename = f"data/images/img_{timestamp}_{environment}.jpg" + + # Save image + cv2.imwrite(filename, image) + + # Record to JSON + data = { + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), + "filename": filename, + "environment": environment, + "confidence": float(confidence), + "battery": float(self.battery), + "flying": self.flying + } + + json_file = "data/images/images_log.json" + + try: + # Read existing data + if os.path.exists(json_file): + with open(json_file, 'r') as f: + all_data = json.load(f) + else: + all_data = [] + + # Add new data + all_data.append(data) + + # Keep only recent 1000 records + if len(all_data) > 1000: + all_data = all_data[-1000:] + + # Save + with open(json_file, 'w') as f: + json.dump(all_data, f, indent=2) + + except Exception as e: + self.log(f"Data save failed: {str(e)}") + + def navigate(self, environment, confidence): + """Navigate based on environment""" + if not self.flying: + return + + # Choose action based on environment + actions = { + "Ruins": self.navigate_ruins, + "Building": self.navigate_building, + "Forest": self.navigate_forest, + "Road": self.navigate_road, + "Sky": self.navigate_sky, + "Water": self.navigate_water, + "Fire": self.navigate_fire, + "Animal": self.navigate_animal, + "Vehicle": self.navigate_vehicle + } + + # Execute corresponding navigation function + action_func = actions.get(environment, self.navigate_default) + action_func(confidence) + + def navigate_ruins(self, confidence): + """Navigate in ruins""" + self.log("Exploring ruins...") + # Move slowly to avoid collisions + self.client.moveByVelocityAsync(2, 0, 0, 2).join() + time.sleep(1) + + def navigate_building(self, confidence): + """Navigate around buildings""" + self.log("Avoiding building...") + # Rise to avoid collision + self.client.moveByVelocityAsync(0, 0, -1, 1).join() + # Move right + self.client.moveByVelocityAsync(0, 2, 0, 2).join() + + def navigate_forest(self, confidence): + """Navigate through forest""" + self.log("Moving through forest...") + # Rise slightly and move slowly + self.client.moveByVelocityAsync(1.5, 0, -0.5, 2).join() + + def navigate_road(self, confidence): + """Navigate along road""" + self.log("Following road...") + # Move at normal speed + self.client.moveByVelocityAsync(3, 0, 0, 3).join() + + def navigate_sky(self, confidence): + """Navigate in open sky""" + self.log("Open sky, normal flight...") + # Move faster + self.client.moveByVelocityAsync(4, 0, 0, 3).join() + + def navigate_water(self, confidence): + """Avoid water""" + self.log("Avoiding water area...") + # Rise immediately + self.client.moveByVelocityAsync(0, 0, -2, 1).join() + # Move backward + self.client.moveByVelocityAsync(-2, 0, 0, 2).join() + + def navigate_fire(self, confidence): + """Navigate fire emergency""" + self.log("Fire detected! Emergency response...") + self.emergency = True + # Emergency ascent + self.client.moveToZAsync(-30, 5).join() + # Send alert + self.log("Fire alert!") + + def navigate_animal(self, confidence): + """Navigate around animals""" + self.log("Animal detected, keeping distance...") + # Hover and observe + self.client.hoverAsync().join() + time.sleep(3) + # Move back slowly + self.client.moveByVelocityAsync(-1, 0, 0, 2).join() + + def navigate_vehicle(self, confidence): + """Navigate around vehicles""" + self.log("Vehicle detected, following...") + # Follow at distance + self.client.moveByVelocityAsync(2, 0, 0, 2).join() + + def navigate_default(self, confidence): + """Default navigation""" + self.log("Unknown environment, conservative exploration...") + # Move slowly + self.client.moveByVelocityAsync(1, 0, 0, 2).join() + + def display_image(self, image, environment, confidence): + """Display image with information""" + if image is None: + return + + # Create display image + display_img = image.copy() + height, width = display_img.shape[:2] + + # Add environment info + env_text = f"Env: {environment}" + conf_text = f"Conf: {confidence:.1%}" + bat_text = f"Battery: {self.battery:.1f}%" + + # Set text color + color = (0, 255, 0) # Green + if confidence < 0.6: + color = (0, 255, 255) # Yellow + if confidence < 0.4: + color = (0, 0, 255) # Red + + # Add text to image + cv2.putText(display_img, env_text, (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 3) + cv2.putText(display_img, conf_text, (20, 80), + cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2) + cv2.putText(display_img, bat_text, (20, 110), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) + + # Add timestamp + timestamp = datetime.now().strftime("%H:%M:%S") + cv2.putText(display_img, timestamp, (width - 150, 40), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) + + # Show image + cv2.imshow('Drone Vision System', display_img) + + def run_mission(self, mission_time=300): + """Run main mission loop""" + if not self.client: + print("Cannot run: Not connected to simulator") + return + + self.clear_screen() + print("=" * 70) + print("STARTING DRONE MISSION") + print(f"Mission time: {mission_time} seconds") + print("=" * 70) + print("CONTROLS:") + print(" Q - Quit program") + print(" L - Manual landing") + print(" R - Return home") + print(" S - Save current image") + print(" P - Pause/Resume") + print("-" * 70) + + # Take off + if not self.takeoff(): + return + + self.running = True + start_time = time.time() + frame_count = 0 + last_env = "Unknown" + + try: + while self.running and (time.time() - start_time) < mission_time: + frame_count += 1 + + # Update battery + self.update_battery() + + # Check emergency + if self.emergency and self.flying: + self.log("Emergency! Returning home...") + self.client.moveToPositionAsync(0, 0, -20, 5).join() + self.land() + break + + # Capture image + image = self.capture_image() + + if image is not None: + # Analyze environment + try: + environment, confidence = self.analyze_environment(image) + last_env = environment + self.current_env = environment + self.env_confidence = confidence + except Exception as e: + self.log(f"Environment analysis failed: {str(e)}") + environment, confidence = "Unknown", 0.0 + self.current_env = "Unknown" + self.env_confidence = 0.0 + + # Save data + if confidence > 0.7 or frame_count % 5 == 0: + self.save_image_data(image, environment, confidence) + + # Display image + self.display_image(image, environment, confidence) + + # Navigation decision + try: + self.navigate(environment, confidence) + except Exception as e: + self.log(f"Navigation error: {str(e)}") + + # Show status (每20帧更新一次,减少屏幕闪烁) + if frame_count % 20 == 0: + elapsed = int(time.time() - start_time) + remaining = mission_time - elapsed + self.clear_screen() + print(f"MISSION IN PROGRESS...") + print(f"Elapsed: {elapsed} sec | Remaining: {remaining} sec") + print(f"Environment: {last_env} ({self.env_confidence:.1%})") + print(f"Battery: {self.battery:.1f}%") + print(f"Frames processed: {frame_count}") + print("-" * 40) + print("CONTROLS: Q-Quit, L-Land, R-Return, S-Save, P-Pause") + + # Check keyboard input + key = cv2.waitKey(30) & 0xFF + if key == ord('q'): + self.log("User quit") + break + elif key == ord('l'): + self.land() + break + elif key == ord('r'): + self.log("Manual return home") + self.client.moveToPositionAsync(0, 0, -20, 5).join() + self.land() + break + elif key == ord('s'): + if image is not None: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"debug/snapshot_{timestamp}.jpg" + cv2.imwrite(filename, image) + self.log(f"Saved snapshot: {filename}") + elif key == ord('p'): + self.running = not self.running + status = "Paused" if not self.running else "Resumed" + self.log(f"Mission {status}") + + # Small delay to control processing speed + time.sleep(0.1) # 从0.5秒减少到0.1秒,提高响应性 + + # Mission complete + self.log("Mission complete!") + + except KeyboardInterrupt: + self.log("Mission interrupted by user (Ctrl+C)") + print("\nMission interrupted. Landing drone...") + except Exception as e: + self.log(f"Runtime error: {str(e)}") + finally: + # Cleanup + if self.flying: + try: + self.land() + except: + pass + cv2.destroyAllWindows() + try: + self.generate_report() + except: + pass + + def generate_report(self): + """Generate mission report""" + report = { + "mission": "Drone Vision Navigation", + "date": datetime.now().strftime("%Y-%m-%d"), + "time": datetime.now().strftime("%H:%M:%S"), + "summary": { + "battery_final": round(self.battery, 1), + "environment_detected": self.current_env, + "emergency_activated": self.emergency + }, + "files": { + "images": "data/images/", + "logs": "data/logs/", + "debug": "debug/" + } + } + + report_file = f"data/logs/report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + try: + with open(report_file, 'w') as f: + json.dump(report, f, indent=2) + self.log(f"Report saved: {report_file}") + except Exception as e: + self.log(f"Report save failed: {str(e)}") + + +class EnvironmentClassifier: + """Environment classifier for abandoned park""" + + def __init__(self): + self.environments = [ + "Ruins", "Building", "Forest", "Road", + "Sky", "Water", "Fire", "Animal", "Vehicle" ] + + # Feature weights for abandoned park + self.weights = { + "Ruins": 0.35, # Highest probability for ruins + "Building": 0.20, + "Forest": 0.15, + "Road": 0.10, + "Sky": 0.08, + "Water": 0.05, # Lower water weight + "Fire": 0.02, + "Animal": 0.03, + "Vehicle": 0.02 + } + + def classify(self, image): + """Classify image""" + if image is None: + return "Unknown", 0.0 + + # Extract features + features = self.extract_features(image) + + # Rule-based classification + env, conf = self.rule_based(features) + + # If rules unclear, use weighted random + if env == "Unknown": + env, conf = self.weighted_random(features) + + return env, conf + + def extract_features(self, image): + """Extract image features""" + features = {} + + height, width = image.shape[:2] + + # Color features + hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) + + # Blue regions + blue_low = np.array([100, 50, 50]) + blue_high = np.array([130, 255, 255]) + blue_mask = cv2.inRange(hsv, blue_low, blue_high) + features['blue_ratio'] = np.sum(blue_mask > 0) / (height * width) + + # Green regions + green_low = np.array([40, 50, 50]) + green_high = np.array([80, 255, 255]) # 修复:添加 np. + green_mask = cv2.inRange(hsv, green_low, green_high) + features['green_ratio'] = np.sum(green_mask > 0) / (height * width) + + # Red regions + red_low1 = np.array([0, 50, 50]) + red_high1 = np.array([10, 255, 255]) + red_low2 = np.array([170, 50, 50]) + red_high2 = np.array([180, 255, 255]) + red_mask1 = cv2.inRange(hsv, red_low1, red_high1) + red_mask2 = cv2.inRange(hsv, red_low2, red_high2) + red_mask = cv2.bitwise_or(red_mask1, red_mask2) + features['red_ratio'] = np.sum(red_mask > 0) / (height * width) + + # Texture features + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + edges = cv2.Canny(gray, 50, 150) + features['edge_density'] = np.sum(edges > 0) / (height * width) + features['gray_variance'] = np.var(gray) + features['brightness'] = np.mean(gray) + + # Sky detection + if height > 10: + top_blue = np.sum(blue_mask[:height//3, :] > 0) / (width * height//3) + bottom_blue = np.sum(blue_mask[2*height//3:, :] > 0) / (width * height//3) + features['sky_ratio'] = top_blue + features['is_sky'] = top_blue > 0.3 and top_blue > bottom_blue * 2 + + return features + + def rule_based(self, features): + """Rule-based classification""" + blue = features.get('blue_ratio', 0) + green = features.get('green_ratio', 0) + red = features.get('red_ratio', 0) + edges = features.get('edge_density', 0) + variance = features.get('gray_variance', 0) + bright = features.get('brightness', 0) + is_sky = features.get('is_sky', False) + + # Rules for abandoned park + if edges > 0.06 and variance > 1000: + return "Ruins", 0.85 + + if green > 0.25: + return "Forest", 0.80 + + if blue > 0.3: + if is_sky: + return "Sky", 0.75 + else: + return "Water", 0.65 + + if edges < 0.03 and 100 < bright < 180: + return "Road", 0.70 + + if red > 0.15: + if bright > 180: + return "Fire", 0.60 + else: + return "Ruins", 0.65 + + if edges > 0.04 and bright < 120: + return "Building", 0.75 + + return "Unknown", 0.0 + + def weighted_random(self, features): + """Weighted random selection""" + # Adjust weights based on features + adj_weights = self.weights.copy() + + blue = features.get('blue_ratio', 0) + green = features.get('green_ratio', 0) + edges = features.get('edge_density', 0) + + # Adjust based on features + if blue > 0.2: + adj_weights["Sky"] *= 1.5 + if blue > 0.3: + adj_weights["Water"] *= 0.5 # Reduce water weight + + if green > 0.15: + adj_weights["Forest"] *= 2.0 + + if edges > 0.05: + adj_weights["Ruins"] *= 1.8 + + # Normalize + total = sum(adj_weights.values()) + if total > 0: + probs = [adj_weights[env]/total for env in self.environments] + else: + probs = [1/len(self.environments)] * len(self.environments) + + # Random selection + env_idx = random.choices(range(len(self.environments)), weights=probs)[0] + env = self.environments[env_idx] + + # Calculate confidence + base_conf = 0.6 + + # Increase confidence based on features + if env == "Ruins" and edges > 0.04: + base_conf += 0.15 + elif env == "Forest" and green > 0.15: + base_conf += 0.1 + elif env == "Sky" and blue > 0.2: + base_conf += 0.1 + + conf = min(base_conf, 0.9) + + return env, conf + + +def safe_input(prompt): + """Safe input function with KeyboardInterrupt handling""" + try: + return input(prompt).strip() + except KeyboardInterrupt: + print("\n\nOperation cancelled by user (Ctrl+C)") + print("Exiting program...") + sys.exit(0) - # 逐行绘制状态信息到图像上 - y_offset = 30 # 文字起始y坐标 - for line in info_lines: - cv2.putText(frame, line, (10, y_offset), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 1) - y_offset += 25 # 每行文字间隔25像素 - - # 6. 显示处理后的图像 - cv2.imshow('无人机视觉导航', frame) - - # 7. 检查键盘输入 - key = cv2.waitKey(1) & 0xFF # 等待1毫秒并获取按键 - if key == ord('q'): # 按 'q' 键退出程序 - print("\n用户请求退出...") - break - elif key == ord('s'): # 按 's' 键保存当前图像 - filename = f"data/images/capture_{frame_count}.jpg" - cv2.imwrite(filename, frame) - print(f"✓ 保存图像: {filename}") - - # 8. 检查电池电量(安全条件) - if battery <= 0: - print("\n⚠️ 电池耗尽!紧急降落...") - break - - # 9. 检查飞行时间限制(可选安全限制) - if flight_time > 60: # 运行60秒后自动停止(演示用途) - print("\n⏰ 飞行时间到,安全降落...") - break - - # 10. 清理资源 - drone_cam.release() # 释放摄像头 - cv2.destroyAllWindows() # 关闭所有OpenCV窗口 - - # 11. 打印飞行统计信息 - print("\n" + "=" * 50) - print("飞行统计:") - print(f"- 总飞行时间: {flight_time:.1f} 秒") - print(f"- 处理帧数: {frame_count}") - # 计算平均帧率(FPS) - if flight_time > 0: - print(f"- 平均帧率: {frame_count / flight_time:.1f} FPS") - else: - print("- 平均帧率: N/A") - print(f"- 最终电池: {battery:.1f}%") - print("=" * 50) - print("\n飞行结束!") - return 0 # 返回成功状态码 +def main(): + """Main function with improved exception handling""" + try: + print("DRONE VISION NAVIGATION SYSTEM") + print("-" * 50) + + # Create system + drone = DroneVisionSystem() + + if drone.client is None: + print("Cannot connect to simulator, exiting") + return + + # User configuration + print("\nMISSION CONFIGURATION:") + print("1. Test mode (60 seconds)") + print("2. Short mission (5 minutes)") + print("3. Long mission (10 minutes)") + print("4. Custom time") + + choice = safe_input("\nSelect mode (1-4): ") + + if choice == '1': + mission_time = 60 + elif choice == '2': + mission_time = 300 + elif choice == '3': + mission_time = 600 + elif choice == '4': + try: + time_input = safe_input("Enter mission time (seconds): ") + mission_time = int(time_input) + except ValueError: + mission_time = 300 + print(f"Invalid input, using default: {mission_time} seconds") + else: + mission_time = 300 + print(f"Using default: {mission_time} seconds") + + print(f"\nMISSION SETTINGS:") + print(f" Time: {mission_time} seconds ({mission_time/60:.1f} minutes)") + print(f" Image saving: Auto") + print(f" Logging: Enabled") + + confirm = safe_input("\nStart mission? (y/n): ").lower() + + if confirm == 'y': + drone.run_mission(mission_time) + else: + print("Mission cancelled") + + print("\nProgram ended gracefully") + + except KeyboardInterrupt: + print("\n\nProgram interrupted by user (Ctrl+C)") + print("Exiting...") + except Exception as e: + print(f"\nUnexpected error: {str(e)}") + print("Please check your setup and try again.") -# Python标准入口点 if __name__ == "__main__": - try: - exit_code = main() # 运行主函数 - except KeyboardInterrupt: # 处理Ctrl+C中断 - print("\n程序被用户中断") - exit_code = 0 - except Exception as e: # 处理其他异常 - print(f"\n程序出错: {e}") - exit_code = 1 - - # 等待用户确认退出(便于查看输出结果) - input("\n按 Enter 键退出...") - sys.exit(exit_code) # 退出程序 \ No newline at end of file + main() \ No newline at end of file diff --git a/src/UAV_navigation_system/start_fixed.bat b/src/UAV_navigation_system/start_fixed.bat new file mode 100644 index 0000000000..58693749b2 --- /dev/null +++ b/src/UAV_navigation_system/start_fixed.bat @@ -0,0 +1,56 @@ +@echo off +cls + +echo ============================================ +echo DRONE VISION NAVIGATION SYSTEM +echo ============================================ +echo FIXED VERSION - KeyboardInterrupt Handled +echo ============================================ +echo. + +echo Step 1: Checking simulator... +tasklist | findstr "AbandonedPark.exe" > nul +if errorlevel 1 ( + echo Simulator not running! + echo Please run AbandonedPark.exe manually + echo Waiting 20 seconds... + timeout /t 20 /nobreak > nul +) else ( + echo ✓ Simulator is running +) + +echo. +echo Step 2: Activating Python environment... +if exist "venv\Scripts\activate.bat" ( + call venv\Scripts\activate.bat + echo ✓ Virtual environment activated +) else ( + echo Creating virtual environment... + python -m venv venv + call venv\Scripts\activate.bat + echo Installing dependencies... + pip install airsim opencv-python numpy +) + +echo. +echo Step 3: Checking Python packages... +python -c "import airsim; print('✓ AirSim installed')" 2>nul +if errorlevel 1 ( + echo Installing AirSim... + pip install airsim +) + +python -c "import cv2; print('✓ OpenCV installed')" 2>nul +if errorlevel 1 ( + echo Installing OpenCV... + pip install opencv-python +) + +echo. +echo Step 4: Starting drone system... +echo Note: Press Ctrl+C at any time to safely exit +echo. +python drone_vision_system_en_fixed.py + +echo. +pause \ No newline at end of file diff --git a/src/UAV_navigation_system/test_airsim.py b/src/UAV_navigation_system/test_airsim.py new file mode 100644 index 0000000000..8a23a4dd4b --- /dev/null +++ b/src/UAV_navigation_system/test_airsim.py @@ -0,0 +1,10 @@ +import airsim +import sys + +print("Python 版本:", sys.version) +print("AirSim 版本:", airsim.__version__) +print("AirSim 导入成功!") + +# 检查客户端是否可以创建 +client = airsim.MultirotorClient() +print("客户端创建成功") \ No newline at end of file