From e54a0396071efa3d66126a4c7139467fe536b1e7 Mon Sep 17 00:00:00 2001 From: Fouad-Smaoui Date: Mon, 19 May 2025 21:38:03 +0200 Subject: [PATCH] feat: Add advanced navigation features including waypoint navigation and obstacle avoidance --- irobot_edu_sdk/create3.py | 241 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 232 insertions(+), 9 deletions(-) diff --git a/irobot_edu_sdk/create3.py b/irobot_edu_sdk/create3.py index 9194050..a97faf0 100644 --- a/irobot_edu_sdk/create3.py +++ b/irobot_edu_sdk/create3.py @@ -4,8 +4,9 @@ import math from enum import IntEnum, IntFlag -from typing import Union, Callable, Awaitable, List +from typing import Union, Callable, Awaitable, List, Tuple, Optional from struct import pack, unpack +from dataclasses import dataclass from .backend.backend import Backend from .event import Event from .completer import Completer @@ -14,6 +15,23 @@ from .getter_types import IPv4Addresses, IrProximity, Pose, DockingSensor from .robot import Robot +@dataclass +class Waypoint: + """Represents a navigation waypoint with position and optional heading""" + x: float + y: float + heading: Optional[float] = None + tolerance: float = 5.0 # cm + heading_tolerance: float = 5.0 # degrees + +@dataclass +class NavigationState: + """Represents the current state of navigation""" + current_waypoint: Optional[Waypoint] = None + waypoints: List[Waypoint] = None + is_navigating: bool = False + obstacle_detected: bool = False + last_obstacle_position: Optional[Tuple[float, float]] = None class Create3(Robot): """Create 3 robot object.""" @@ -27,6 +45,13 @@ class DockResult(IntEnum): UNDOCKED = 0 DOCKED = 1 + class DockSensorEnum(IntEnum): + """Enum for docking sensor states""" + NO_CONTACT = 0 + CONTACT = 1 + IR_SENSOR_0 = 2 + IR_SENSOR_1 = 3 + IR_SENSOR_2 = 4 def __init__(self, backend: Backend): super().__init__(backend=backend) @@ -34,6 +59,13 @@ def __init__(self, backend: Backend): self._events[(19, 0)] = self._when_docking_sensor_handler self._when_docking_sensor: list[Event] = [] + self._docking_sensor_triggers: dict[DockSensorEnum, list[Event]] = { + self.DockSensorEnum.NO_CONTACT: [], + self.DockSensorEnum.CONTACT: [], + self.DockSensorEnum.IR_SENSOR_0: [], + self.DockSensorEnum.IR_SENSOR_1: [], + self.DockSensorEnum.IR_SENSOR_2: [] + } # Getters. self.ipv4_address = IPv4Addresses() @@ -42,6 +74,15 @@ def __init__(self, backend: Backend): # Use Create 3 robot's internal position estimate self.USE_ROBOT_POSE = True + # Navigation state + self._navigation_state = NavigationState() + self._navigation_state.waypoints = [] + + # Navigation parameters + self.OBSTACLE_DETECTION_THRESHOLD = 30 # cm + self.OBSTACLE_AVOIDANCE_DISTANCE = 50 # cm + self.NAVIGATION_SPEED = 0.5 # m/s + def __enter__(self): return self @@ -52,21 +93,40 @@ def __exit__(self, exc_type, exc_value, traceback): async def _when_docking_sensor_handler(self, packet): if len(packet.payload) > 4: + # Update cached values self.docking_sensor.contacts = packet.payload[4] != 0 self.docking_sensor.sensors = (packet.payload[5], - packet.payload[6], - packet.payload[7]) + packet.payload[6], + packet.payload[7]) + # Trigger general docking sensor event for event in self._when_docking_sensor: - # TODO: Generate triggers instead of just firing for any event - # TODO: Define dock sensor Enum await event.run(self) + # Trigger specific sensor events + if self.docking_sensor.contacts: + for event in self._docking_sensor_triggers[self.DockSensorEnum.CONTACT]: + await event.run(self) + else: + for event in self._docking_sensor_triggers[self.DockSensorEnum.NO_CONTACT]: + await event.run(self) + + # Trigger IR sensor events + for i, sensor_value in enumerate(self.docking_sensor.sensors): + if sensor_value > 0: # Assuming 0 means no detection + for event in self._docking_sensor_triggers[self.DockSensorEnum(i + 2)]: + await event.run(self) + # Event Callbacks. def when_docking_sensor(self, callback: Callable[[bool], Awaitable[None]]): + """Register a callback for any docking sensor event""" self._when_docking_sensor.append(Event(True, callback)) + def when_docking_sensor_trigger(self, trigger: DockSensorEnum, callback: Callable[[bool], Awaitable[None]]): + """Register a callback for a specific docking sensor trigger""" + self._docking_sensor_triggers[trigger].append(Event(True, callback)) + # Commands. async def get_ipv4_address(self) -> IPv4Addresses: @@ -200,8 +260,18 @@ async def undock(self): return None async def get_docking_values(self): - """Get docking values.""" - # TODO: Harmonize access with cached value from events + """Get docking values. Returns cached values if available, otherwise fetches from robot.""" + # Return cached values if available + if hasattr(self.docking_sensor, 'contacts') and hasattr(self.docking_sensor, 'sensors'): + return { + 'timestamp': 0, # We don't have timestamp in cached values + 'contacts': self.docking_sensor.contacts, + 'IR sensor 0': self.docking_sensor.sensors[0], + 'IR sensor 1': self.docking_sensor.sensors[1], + 'IR sensor 2': self.docking_sensor.sensors[2] + } + + # If no cached values, fetch from robot dev, cmd, inc = 19, 1, self.inc completer = Completer() self._responses[(dev, cmd, inc)] = completer @@ -209,8 +279,13 @@ async def get_docking_values(self): packet = await completer.wait(self.DEFAULT_TIMEOUT) if packet: unpacked = unpack('>IBBBBHHHH', packet.payload) - return {'timestamp': unpacked[0], 'contacts': unpacked[1], 'IR sensor 0': unpacked[2], - 'IR sensor 1': unpacked[3], 'IR sensor 2': unpacked[4]} + return { + 'timestamp': unpacked[0], + 'contacts': unpacked[1], + 'IR sensor 0': unpacked[2], + 'IR sensor 1': unpacked[3], + 'IR sensor 2': unpacked[4] + } return None async def get_version_string(self) -> str: @@ -243,3 +318,151 @@ async def get_cliff_sensors(self): If there were a protocol getter, this would await that response when the cache is empty. ''' return self.get_cliff_sensors_cached() + + async def navigate_to_waypoint(self, waypoint: Waypoint) -> bool: + """ + Navigate to a specific waypoint with optional heading. + Returns True if navigation was successful, False otherwise. + """ + self._navigation_state.current_waypoint = waypoint + self._navigation_state.is_navigating = True + + try: + # Check for obstacles before starting navigation + if await self._check_for_obstacles(): + return False + + # Navigate to the waypoint + await self.navigate_to(waypoint.x, waypoint.y, waypoint.heading) + + # Verify we reached the waypoint within tolerance + if not await self._verify_waypoint_reached(waypoint): + return False + + return True + finally: + self._navigation_state.is_navigating = False + self._navigation_state.current_waypoint = None + + async def follow_waypoints(self, waypoints: List[Waypoint]) -> bool: + """ + Follow a sequence of waypoints. + Returns True if all waypoints were reached successfully. + """ + self._navigation_state.waypoints = waypoints.copy() + + for waypoint in waypoints: + if not await self.navigate_to_waypoint(waypoint): + return False + + return True + + async def _check_for_obstacles(self) -> bool: + """ + Check for obstacles using IR proximity sensors. + Returns True if obstacles are detected, False otherwise. + """ + ir_proximity = await self.get_ir_proximity() + if ir_proximity is None: + return False + + # Check if any sensor detects an obstacle within threshold + for sensor_value in ir_proximity.sensors: + if sensor_value > self.OBSTACLE_DETECTION_THRESHOLD: + self._navigation_state.obstacle_detected = True + self._navigation_state.last_obstacle_position = (self.pose.x, self.pose.y) + return True + + self._navigation_state.obstacle_detected = False + return False + + async def _verify_waypoint_reached(self, waypoint: Waypoint) -> bool: + """ + Verify that the robot has reached the waypoint within tolerance. + """ + if not self.USE_ROBOT_POSE: + return True + + dx = self.pose.x - waypoint.x + dy = self.pose.y - waypoint.y + distance = math.sqrt(dx * dx + dy * dy) + + if distance > waypoint.tolerance: + return False + + if waypoint.heading is not None: + heading_diff = abs(self.pose.heading - waypoint.heading) + if heading_diff > waypoint.heading_tolerance: + return False + + return True + + async def avoid_obstacle(self) -> bool: + """ + Perform obstacle avoidance maneuver. + Returns True if avoidance was successful, False otherwise. + """ + if not self._navigation_state.obstacle_detected: + return True + + # Get current position and heading + current_x, current_y = self.pose.x, self.pose.y + current_heading = self.pose.heading + + # Calculate avoidance point + avoidance_distance = self.OBSTACLE_AVOIDANCE_DISTANCE + avoidance_angle = 45 # degrees + + # Calculate new position for avoidance + new_heading = current_heading + avoidance_angle + new_x = current_x + avoidance_distance * math.cos(math.radians(new_heading)) + new_y = current_y + avoidance_distance * math.sin(math.radians(new_heading)) + + # Create avoidance waypoint + avoidance_waypoint = Waypoint( + x=new_x, + y=new_y, + heading=new_heading, + tolerance=10.0 # Larger tolerance for avoidance + ) + + # Navigate to avoidance point + return await self.navigate_to_waypoint(avoidance_waypoint) + + async def navigate_with_obstacle_avoidance(self, x: float, y: float, heading: Optional[float] = None) -> bool: + """ + Navigate to a point with obstacle avoidance. + Returns True if navigation was successful, False otherwise. + """ + waypoint = Waypoint(x=x, y=y, heading=heading) + + while True: + # Check for obstacles + if await self._check_for_obstacles(): + # Try to avoid obstacle + if not await self.avoid_obstacle(): + return False + continue + + # Navigate to waypoint + if await self.navigate_to_waypoint(waypoint): + return True + + # If we failed to reach the waypoint, try obstacle avoidance + if not await self.avoid_obstacle(): + return False + + def get_navigation_state(self) -> NavigationState: + """ + Get the current navigation state. + """ + return self._navigation_state + + async def stop_navigation(self): + """ + Stop the current navigation task. + """ + self._navigation_state.is_navigating = False + self._navigation_state.current_waypoint = None + self._navigation_state.waypoints = [] + await self.stop()