Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 232 additions & 9 deletions irobot_edu_sdk/create3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import math
from enum import IntEnum, IntFlag
from typing import Union, Callable, Awaitable, List
from typing import Union, Callable, Awaitable, List, Tuple, Optional
from struct import pack, unpack
from dataclasses import dataclass
from .backend.backend import Backend
from .event import Event
from .completer import Completer
Expand All @@ -14,6 +15,23 @@
from .getter_types import IPv4Addresses, IrProximity, Pose, DockingSensor
from .robot import Robot

@dataclass
class Waypoint:
"""Represents a navigation waypoint with position and optional heading"""
x: float
y: float
heading: Optional[float] = None
tolerance: float = 5.0 # cm
heading_tolerance: float = 5.0 # degrees

@dataclass
class NavigationState:
"""Represents the current state of navigation"""
current_waypoint: Optional[Waypoint] = None
waypoints: List[Waypoint] = None
is_navigating: bool = False
obstacle_detected: bool = False
last_obstacle_position: Optional[Tuple[float, float]] = None

class Create3(Robot):
"""Create 3 robot object."""
Expand All @@ -27,13 +45,27 @@ class DockResult(IntEnum):
UNDOCKED = 0
DOCKED = 1

class DockSensorEnum(IntEnum):
"""Enum for docking sensor states"""
NO_CONTACT = 0
CONTACT = 1
IR_SENSOR_0 = 2
IR_SENSOR_1 = 3
IR_SENSOR_2 = 4

def __init__(self, backend: Backend):
super().__init__(backend=backend)

self._events[(19, 0)] = self._when_docking_sensor_handler

self._when_docking_sensor: list[Event] = []
self._docking_sensor_triggers: dict[DockSensorEnum, list[Event]] = {
self.DockSensorEnum.NO_CONTACT: [],
self.DockSensorEnum.CONTACT: [],
self.DockSensorEnum.IR_SENSOR_0: [],
self.DockSensorEnum.IR_SENSOR_1: [],
self.DockSensorEnum.IR_SENSOR_2: []
}

# Getters.
self.ipv4_address = IPv4Addresses()
Expand All @@ -42,6 +74,15 @@ def __init__(self, backend: Backend):
# Use Create 3 robot's internal position estimate
self.USE_ROBOT_POSE = True

# Navigation state
self._navigation_state = NavigationState()
self._navigation_state.waypoints = []

# Navigation parameters
self.OBSTACLE_DETECTION_THRESHOLD = 30 # cm
self.OBSTACLE_AVOIDANCE_DISTANCE = 50 # cm
self.NAVIGATION_SPEED = 0.5 # m/s

def __enter__(self):
return self

Expand All @@ -52,21 +93,40 @@ def __exit__(self, exc_type, exc_value, traceback):

async def _when_docking_sensor_handler(self, packet):
if len(packet.payload) > 4:
# Update cached values
self.docking_sensor.contacts = packet.payload[4] != 0
self.docking_sensor.sensors = (packet.payload[5],
packet.payload[6],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you restore the indentation on this tuple, please?

packet.payload[7])
packet.payload[6],
packet.payload[7])

# Trigger general docking sensor event
for event in self._when_docking_sensor:
# TODO: Generate triggers instead of just firing for any event
# TODO: Define dock sensor Enum
await event.run(self)

# Trigger specific sensor events
if self.docking_sensor.contacts:
for event in self._docking_sensor_triggers[self.DockSensorEnum.CONTACT]:
await event.run(self)
else:
for event in self._docking_sensor_triggers[self.DockSensorEnum.NO_CONTACT]:
await event.run(self)

# Trigger IR sensor events
for i, sensor_value in enumerate(self.docking_sensor.sensors):
if sensor_value > 0: # Assuming 0 means no detection
for event in self._docking_sensor_triggers[self.DockSensorEnum(i + 2)]:
await event.run(self)

# Event Callbacks.

def when_docking_sensor(self, callback: Callable[[bool], Awaitable[None]]):
"""Register a callback for any docking sensor event"""
self._when_docking_sensor.append(Event(True, callback))

def when_docking_sensor_trigger(self, trigger: DockSensorEnum, callback: Callable[[bool], Awaitable[None]]):
"""Register a callback for a specific docking sensor trigger"""
self._docking_sensor_triggers[trigger].append(Event(True, callback))

# Commands.

async def get_ipv4_address(self) -> IPv4Addresses:
Expand Down Expand Up @@ -200,17 +260,32 @@ async def undock(self):
return None

async def get_docking_values(self):
"""Get docking values."""
# TODO: Harmonize access with cached value from events
"""Get docking values. Returns cached values if available, otherwise fetches from robot."""
# Return cached values if available
if hasattr(self.docking_sensor, 'contacts') and hasattr(self.docking_sensor, 'sensors'):
return {
'timestamp': 0, # We don't have timestamp in cached values
'contacts': self.docking_sensor.contacts,
'IR sensor 0': self.docking_sensor.sensors[0],
'IR sensor 1': self.docking_sensor.sensors[1],
'IR sensor 2': self.docking_sensor.sensors[2]
}

# If no cached values, fetch from robot
dev, cmd, inc = 19, 1, self.inc
completer = Completer()
self._responses[(dev, cmd, inc)] = completer
await self._backend.write_packet(Packet(dev, cmd, inc))
packet = await completer.wait(self.DEFAULT_TIMEOUT)
if packet:
unpacked = unpack('>IBBBBHHHH', packet.payload)
return {'timestamp': unpacked[0], 'contacts': unpacked[1], 'IR sensor 0': unpacked[2],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is much more readable.

'IR sensor 1': unpacked[3], 'IR sensor 2': unpacked[4]}
return {
'timestamp': unpacked[0],
'contacts': unpacked[1],
'IR sensor 0': unpacked[2],
'IR sensor 1': unpacked[3],
'IR sensor 2': unpacked[4]
}
return None

async def get_version_string(self) -> str:
Expand Down Expand Up @@ -243,3 +318,151 @@ async def get_cliff_sensors(self):
If there were a protocol getter, this would await that response when the cache is empty.
'''
return self.get_cliff_sensors_cached()

async def navigate_to_waypoint(self, waypoint: Waypoint) -> bool:
"""
Navigate to a specific waypoint with optional heading.
Returns True if navigation was successful, False otherwise.
"""
self._navigation_state.current_waypoint = waypoint
self._navigation_state.is_navigating = True

try:
# Check for obstacles before starting navigation
if await self._check_for_obstacles():
return False

# Navigate to the waypoint
await self.navigate_to(waypoint.x, waypoint.y, waypoint.heading)

# Verify we reached the waypoint within tolerance
if not await self._verify_waypoint_reached(waypoint):
return False

return True
finally:
self._navigation_state.is_navigating = False
self._navigation_state.current_waypoint = None

async def follow_waypoints(self, waypoints: List[Waypoint]) -> bool:
"""
Follow a sequence of waypoints.
Returns True if all waypoints were reached successfully.
"""
self._navigation_state.waypoints = waypoints.copy()

for waypoint in waypoints:
if not await self.navigate_to_waypoint(waypoint):
return False

return True

async def _check_for_obstacles(self) -> bool:
"""
Check for obstacles using IR proximity sensors.
Returns True if obstacles are detected, False otherwise.
"""
ir_proximity = await self.get_ir_proximity()
if ir_proximity is None:
return False

# Check if any sensor detects an obstacle within threshold
for sensor_value in ir_proximity.sensors:
if sensor_value > self.OBSTACLE_DETECTION_THRESHOLD:
self._navigation_state.obstacle_detected = True
self._navigation_state.last_obstacle_position = (self.pose.x, self.pose.y)
return True

self._navigation_state.obstacle_detected = False
return False

async def _verify_waypoint_reached(self, waypoint: Waypoint) -> bool:
"""
Verify that the robot has reached the waypoint within tolerance.
"""
if not self.USE_ROBOT_POSE:
return True

dx = self.pose.x - waypoint.x
dy = self.pose.y - waypoint.y
distance = math.sqrt(dx * dx + dy * dy)

if distance > waypoint.tolerance:
return False

if waypoint.heading is not None:
heading_diff = abs(self.pose.heading - waypoint.heading)
if heading_diff > waypoint.heading_tolerance:
return False

return True

async def avoid_obstacle(self) -> bool:
"""
Perform obstacle avoidance maneuver.
Returns True if avoidance was successful, False otherwise.
"""
if not self._navigation_state.obstacle_detected:
return True

# Get current position and heading
current_x, current_y = self.pose.x, self.pose.y
current_heading = self.pose.heading

# Calculate avoidance point
avoidance_distance = self.OBSTACLE_AVOIDANCE_DISTANCE
avoidance_angle = 45 # degrees

# Calculate new position for avoidance
new_heading = current_heading + avoidance_angle
new_x = current_x + avoidance_distance * math.cos(math.radians(new_heading))
new_y = current_y + avoidance_distance * math.sin(math.radians(new_heading))

# Create avoidance waypoint
avoidance_waypoint = Waypoint(
x=new_x,
y=new_y,
heading=new_heading,
tolerance=10.0 # Larger tolerance for avoidance
)

# Navigate to avoidance point
return await self.navigate_to_waypoint(avoidance_waypoint)

async def navigate_with_obstacle_avoidance(self, x: float, y: float, heading: Optional[float] = None) -> bool:
"""
Navigate to a point with obstacle avoidance.
Returns True if navigation was successful, False otherwise.
"""
waypoint = Waypoint(x=x, y=y, heading=heading)

while True:
# Check for obstacles
if await self._check_for_obstacles():
# Try to avoid obstacle
if not await self.avoid_obstacle():
return False
continue

# Navigate to waypoint
if await self.navigate_to_waypoint(waypoint):
return True

# If we failed to reach the waypoint, try obstacle avoidance
if not await self.avoid_obstacle():
return False

def get_navigation_state(self) -> NavigationState:
"""
Get the current navigation state.
"""
return self._navigation_state

async def stop_navigation(self):
"""
Stop the current navigation task.
"""
self._navigation_state.is_navigating = False
self._navigation_state.current_waypoint = None
self._navigation_state.waypoints = []
await self.stop()