Skip to content
14 changes: 10 additions & 4 deletions src/cfclient/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,29 @@ def _task_done_callback(task: asyncio.Task[object]) -> None:
return
try:
task.result()
except DisconnectedError:
logger.debug("Task interrupted by disconnect: %s", task)
except Exception:
import traceback

traceback.print_exc()
os._exit(1)


async def _wrap_disconnected(coro: Coroutine[object, object, object]) -> None:
try:
await coro
except DisconnectedError:
logger.debug("Task interrupted by disconnect: %r", coro)


def create_task(coro: Coroutine[object, object, object]) -> asyncio.Task[object]:
"""Schedule a coroutine as a task with automatic exception logging.

Use this instead of asyncio.ensure_future() to ensure exceptions
are logged immediately rather than silently swallowed.
are logged immediately rather than silently swallowed. DisconnectedError
is treated as a normal shutdown case and is logged at debug level only.
"""
logger.debug(f"create_task: {coro}")
task = asyncio.ensure_future(coro)
task = asyncio.ensure_future(_wrap_disconnected(coro))
task.add_done_callback(_task_done_callback)
return task

Expand Down
73 changes: 46 additions & 27 deletions src/cfclient/ui/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@
The main file for the Crazyflie control application.
"""

from __future__ import annotations

import asyncio
import logging
import sys
from collections.abc import Callable, Coroutine
from concurrent.futures import Future
from typing import Any

import cfclient
from cfclient.gui import create_task
Expand All @@ -40,6 +45,7 @@
from cfclient.utils.config import Config
from cfclient.utils.config_manager import ConfigManager
from cfclient.utils.input import JoystickReader
from cfclient.utils.input.inputreaderinterface import InputReaderInterface
from cfclient.utils.ui import UiUtils
from cfclient.ui.dialogs.inputconfigdialogue import InputConfigDialogue
from cflib2 import Crazyflie, LinkContext
Expand Down Expand Up @@ -353,19 +359,21 @@ def set_preferred_dock_area(

# --- Event loop ---

def _on_event_loop_ready(self):
def _on_event_loop_ready(self) -> None:
self._loop = asyncio.get_event_loop()
self._scan(self._connectivity_manager.get_address())

# --- Commander pipeline ---

async def _safe_send(self, coro_fn):
async def _safe_send(
self, coro_fn: Callable[[], Coroutine[Any, Any, None]]
) -> None:
try:
await coro_fn()
except DisconnectedError:
pass

def _commander_future_cb(self, future):
def _commander_future_cb(self, future: Future[None]) -> None:
"""Log unexpected exceptions from commander coroutines."""
try:
future.result()
Expand All @@ -374,7 +382,9 @@ def _commander_future_cb(self, future):
except Exception:
logger.error("Unhandled exception in commander coroutine", exc_info=True)

def _send_setpoint(self, roll, pitch, yaw, thrust):
def _send_setpoint(
self, roll: float, pitch: float, yaw: float, thrust: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -386,7 +396,9 @@ def _send_setpoint(self, roll, pitch, yaw, thrust):
)
future.add_done_callback(self._commander_future_cb)

def _send_velocity_world(self, vx, vy, vz, yawrate):
def _send_velocity_world(
self, vx: float, vy: float, vz: float, yawrate: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -398,7 +410,9 @@ def _send_velocity_world(self, vx, vy, vz, yawrate):
)
future.add_done_callback(self._commander_future_cb)

def _send_zdistance(self, roll, pitch, yawrate, zdistance):
def _send_zdistance(
self, roll: float, pitch: float, yawrate: float, zdistance: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -412,7 +426,9 @@ def _send_zdistance(self, roll, pitch, yawrate, zdistance):
)
future.add_done_callback(self._commander_future_cb)

def _send_hover(self, vx, vy, yawrate, zdistance):
def _send_hover(
self, vx: float, vy: float, yawrate: float, zdistance: float
) -> None:
cf = self.cf
if self._disable_input or cf is None or self._loop is None:
return
Expand All @@ -424,13 +440,13 @@ def _send_hover(self, vx, vy, yawrate, zdistance):
)
future.add_done_callback(self._commander_future_cb)

def disable_input(self, disable):
def disable_input(self, disable: bool) -> None:
"""Disable gamepad input to allow a tab to send setpoints directly."""
self._disable_input = disable

# --- Emergency stop ---

def _emergency_stop(self):
def _emergency_stop(self) -> None:
if self.cf is not None:
create_task(self.cf.localization().emergency().send_emergency_stop())

Expand Down Expand Up @@ -566,6 +582,7 @@ async def _async_disconnect(self) -> None:
self._disconnect_watch_task.cancel()
self._disconnect_watch_task = None
if self.cf is not None:
logger.info(f"Disconnected from {self.cf.uri}")
self._notify_tabs_disconnected()
await self.cf.disconnect()
self.cf = None
Expand All @@ -586,25 +603,27 @@ def _notify_tabs_disconnected(self) -> None:
for tab_toolbox in self.loaded_tab_toolboxes.values():
tab_toolbox.disconnected()

async def _stream_battery(self, cf):
async def _stream_battery(self, cf: Crazyflie) -> None:
log = cf.log()
block = await log.create_block()
await block.add_variable("pm.vbat")
await block.add_variable("pm.state")
stream = await block.start(1000)
stream = None
try:
block = await log.create_block()
await block.add_variable("pm.vbat")
await block.add_variable("pm.state")
stream = await block.start(1000)
while True:
data = await stream.next()
self._battery_signal.emit(
data.data["pm.vbat"], int(data.data["pm.state"])
)
finally:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass
if stream is not None:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass

def _update_battery(self, vbat, state):
def _update_battery(self, vbat: float, state: int) -> None:
self.batteryBar.setValue(int(vbat * 1000))

color = UiUtils.COLOR_BLUE
Expand Down Expand Up @@ -680,16 +699,16 @@ def set_default_theme(self) -> None:

# --- Input device menu ---

def _show_input_device_config_dialog(self):
def _show_input_device_config_dialog(self) -> None:
self.inputConfig = InputConfigDialogue(self._joystick_reader)
self.inputConfig.show()

def _display_input_device_error(self, error):
def _display_input_device_error(self, error: str) -> None:
if self.cf is not None:
create_task(self._async_disconnect())
QMessageBox.critical(self, "Input device error", error)

def _mux_selected(self, checked):
def _mux_selected(self, checked: bool) -> None:
if not checked:
(mux, sub_nodes) = self.sender().data()
for s in sub_nodes:
Expand All @@ -709,7 +728,7 @@ def _mux_selected(self, checked):

self._update_input_device_footer()

def _get_dev_status(self, device):
def _get_dev_status(self, device: InputReaderInterface) -> str:
msg = "{}".format(device.name)
if device.supports_mapping:
map_name = "No input mapping"
Expand All @@ -718,7 +737,7 @@ def _get_dev_status(self, device):
msg += " ({})".format(map_name)
return msg

def _update_input_device_footer(self):
def _update_input_device_footer(self) -> None:
msg = ""

if len(self._joystick_reader.available_devices()) > 0:
Expand All @@ -738,7 +757,7 @@ def _update_input_device_footer(self):
msg = "No input device found"
self._statusbar_label.setText(msg)

def _inputdevice_selected(self, checked):
def _inputdevice_selected(self, checked: bool) -> None:
(map_menu, device, mux_menu) = self.sender().data()
if not checked:
if map_menu:
Expand Down Expand Up @@ -767,7 +786,7 @@ def _inputdevice_selected(self, checked):
)
self._update_input_device_footer()

def _inputconfig_selected(self, checked):
def _inputconfig_selected(self, checked: bool) -> None:
if not checked:
return

Expand All @@ -776,7 +795,7 @@ def _inputconfig_selected(self, checked):
self._joystick_reader.set_input_map(device.name, selected_mapping)
self._update_input_device_footer()

def device_discovery(self, devs):
def device_discovery(self, devs: list[InputReaderInterface]) -> None:
"""Called when new devices have been added"""
for menu in self._all_role_menus:
role_menu = menu["rolemenu"]
Expand Down
43 changes: 24 additions & 19 deletions src/cfclient/ui/pose_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
Sets up logging for the the full pose of the Crazyflie
"""

from __future__ import annotations

import asyncio
import logging
import math

from cflib2 import Crazyflie
from cflib2.error import DisconnectedError

from cfclient.gui import create_task
Expand Down Expand Up @@ -64,47 +67,48 @@ def __init__(self) -> None:
self._stream_task = None

@property
def position(self):
def position(self) -> tuple[float, ...]:
"""Get the position part of the full pose"""
return self.pose[0:3]

@property
def rpy(self):
def rpy(self) -> tuple[float, ...]:
"""Get the roll, pitch and yaw of the full pose in degrees"""
return self.pose[3:6]

@property
def rpy_rad(self):
def rpy_rad(self) -> list[float]:
"""Get the roll, pitch and yaw of the full pose in radians"""
return [
math.radians(self.pose[3]),
math.radians(self.pose[4]),
math.radians(self.pose[5]),
]

def start(self, cf):
def start(self, cf: Crazyflie) -> None:
"""Start streaming pose data from the Crazyflie."""
self._stream_task = create_task(self._stream_loop(cf))

def stop(self):
def stop(self) -> None:
"""Stop streaming pose data."""
if self._stream_task is not None:
self._stream_task.cancel()
self._stream_task = None
self.pose = self.NO_POSE

async def _stream_loop(self, cf):
async def _stream_loop(self, cf: Crazyflie) -> None:
log = cf.log()
block = await log.create_block()
await block.add_variable(self.LOG_NAME_ESTIMATE_X)
await block.add_variable(self.LOG_NAME_ESTIMATE_Y)
await block.add_variable(self.LOG_NAME_ESTIMATE_Z)
await block.add_variable(self.LOG_NAME_ESTIMATE_ROLL)
await block.add_variable(self.LOG_NAME_ESTIMATE_PITCH)
await block.add_variable(self.LOG_NAME_ESTIMATE_YAW)

stream = await block.start(40) # 40ms period
stream = None
try:
block = await log.create_block()
await block.add_variable(self.LOG_NAME_ESTIMATE_X)
await block.add_variable(self.LOG_NAME_ESTIMATE_Y)
await block.add_variable(self.LOG_NAME_ESTIMATE_Z)
await block.add_variable(self.LOG_NAME_ESTIMATE_ROLL)
await block.add_variable(self.LOG_NAME_ESTIMATE_PITCH)
await block.add_variable(self.LOG_NAME_ESTIMATE_YAW)

stream = await block.start(40) # 40ms period
while True:
data = await stream.next()
self.pose = (
Expand All @@ -117,7 +121,8 @@ async def _stream_loop(self, cf):
)
self.data_received_cb.call(self, self.pose)
finally:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass
if stream is not None:
try:
await asyncio.shield(stream.stop())
except (DisconnectedError, asyncio.CancelledError):
pass
Loading