diff --git a/robocon/__init__.py b/robocon/__init__.py new file mode 100644 index 0000000..bcd30a0 --- /dev/null +++ b/robocon/__init__.py @@ -0,0 +1,18 @@ +from robocon.game import ( + TEAM, + TARGET_TYPE, + MARKER, + TARGET_MARKER, + MARKER_TYPE, + BASE_MARKER, + ARENA_MARKER) + +__all__ = ( + "TEAM", + "TARGET_TYPE", + "MARKER", + "TARGET_MARKER", + "MARKER_TYPE", + "BASE_MARKER", + "ARENA_MARKER", +) diff --git a/robocon/brain/__init__.py b/robocon/brain/__init__.py new file mode 100644 index 0000000..052af96 --- /dev/null +++ b/robocon/brain/__init__.py @@ -0,0 +1,25 @@ +import importlib.util + +has_rpi = importlib.util.find_spec("RPi") is not None + +if not has_rpi: + import sys + import fake_rpi + sys.modules["RPi"] = fake_rpi.RPi + sys.modules["RPi.GPIO"] = fake_rpi.RPi.GPIO + sys.modules["smbus2"] = fake_rpi.smbus + +import sys + +# greengiant imports for users +from robocon.brain.greengiant import OUTPUT, INPUT, INPUT_ANALOG, INPUT_PULLUP, PWM_SERVO +from robocon.brain.io import IO + +__all__ = ( + "IO", + "OUTPUT", + "INPUT", + "INPUT_ANALOG", + "INPUT_PULLUP", + "PWM_SERVO", +) diff --git a/robocon/brain/cytron.py b/robocon/brain/cytron.py new file mode 100644 index 0000000..c91f420 --- /dev/null +++ b/robocon/brain/cytron.py @@ -0,0 +1,80 @@ +""" +An interface to the cyctron motor board. A GPIO pin is used for each motor +to give direction and has a PWM signal at 100Hz giving infomation about voltage +to apply +""" + +import RPi.GPIO as GPIO +from .greengiant import clamp + +_MAX_OUTPUT_VOLTAGE = 12 + +_PWM_PIN_1 = 12 +_PWM_PIN_2 = 13 +_DIR_PIN_1 = 26 +_DIR_PIN_2 = 24 + +class CytronBoard: + def __init__(self, max_motor_voltage): + """ + The interface to the CytronBoard + max_motor_voltage - The motors will be scaled so that this is the maxium + average voltage the Cytron will output + """ + if not (0 <= max_motor_voltage <= 12): + raise ValueError("max_motor_voltage must satisfy 0 <= " + "max_motor_voltage <= 12 but instead is " + f"{max_motor_voltage}") + + # because we care about heating effects in the motors, we have to scale by + # the square of the ratio + self.power_scaling_factor = ( + max_motor_voltage / _MAX_OUTPUT_VOLTAGE) ** 2 + + self._dir = [ + [GPIO.LOW, _DIR_PIN_1], + [GPIO.LOW, _DIR_PIN_2], + ] + self._pwm = [ + [0, GPIO.PWM(_PWM_PIN_1, 100)], + [0, GPIO.PWM(_PWM_PIN_2, 100)], + ] + + GPIO.setmode(GPIO.BCM) + GPIO.setup(_DIR_PIN_1, GPIO.OUT) + GPIO.setup(_DIR_PIN_2, GPIO.OUT) + GPIO.setup(_PWM_PIN_1, GPIO.OUT) + GPIO.setup(_PWM_PIN_2, GPIO.OUT) + + def __getitem__(self, index): + """Returns current motor PWM value as a percentage""" + if index not in (0, 1): + raise IndexError( + f"Motor index must be in (0,1) but instead got {index}") + + return self._pwm[index][0] + + def __setitem__(self, index, percent): + """Set current motor PWM percentage value""" + if index not in (0, 1): + raise IndexError( + f"Motor index must be in (0,1) but instead got {index}") + + if percent < 0: + self._dir[index][0] = GPIO.LOW + else: + self._dir[index][0] = GPIO.HIGH + + GPIO.output(self._dir[index][1], self._dir[index][0]) + + percent = clamp(percent, -100, 100) + self._pwm[index][0] = percent + + percent = abs(percent) * self.power_scaling_factor + self._pwm[index][1].start(percent) + + def stop(self): + """Turns motors off""" + for i in range(len(self._pwm)): + self._pwm[i][0] = 0 + self._pwm[i][1].start(0) diff --git a/robot/greengiant.py b/robocon/brain/greengiant.py similarity index 99% rename from robot/greengiant.py rename to robocon/brain/greengiant.py index f2b24f8..a44cac3 100644 --- a/robot/greengiant.py +++ b/robocon/brain/greengiant.py @@ -234,15 +234,14 @@ def set_5v_acc_power(self, new_state): self._bus.write_byte_data(_GG_I2C_ADDR, _GG_ENABLE_5V_ACC, int(new_state)) else: # for GG versions 5v power is always enabled - raise IOError(f"Attempted to set 5v power to {new_state} on an unsupported BrainBox.") + print(f"WARN: Attempted to set 5v power to {new_state} on an unsupported BrainBox.") def get_5v_acc_power(self): if self._version >= 10: return bool(self._bus.read_byte_data(_GG_I2C_ADDR, _GG_ENABLE_5V_ACC)) else: # for GG versions 5v power is always enabled - raise IOError(f"Attempted to get 5v power on an unsupported BrainBox.") - + print(f"WARN: Attempted to set 5v power to {new_state} on an unsupported BrainBox.") def set_user_led(self, on): self._bus.write_byte_data(_GG_I2C_ADDR, _GG_USER_LED, int(on)) diff --git a/robocon/brain/io.py b/robocon/brain/io.py new file mode 100644 index 0000000..9cdc37c --- /dev/null +++ b/robocon/brain/io.py @@ -0,0 +1,129 @@ +import sys +import logging +from smbus2 import SMBus + +from robocon.brain.cytron import CytronBoard +from robocon.brain.greengiant import ( + GreenGiantInternal, + GreenGiantGPIOPinList, + GreenGiantMotors, + _GG_SERVO_PWM_BASE, + _GG_GPIO_PWM_BASE, + _GG_GPIO_GPIO_BASE, + _GG_SERVO_GPIO_BASE) + +_logger = logging.getLogger("robot_io") + +def setup_logging(level): + """Display the just the message when logging events + Sets the logging level to `level`""" + _logger.setLevel(level) + + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(level) + + fmt = logging.Formatter("%(message)s") + handler.setFormatter(fmt) + + _logger.addHandler(handler) + +class IO(): + _initialised = False + + def __init__(self, max_motor_voltage=6, enable_12v=True, enable_5v=True, log_level=logging.INFO): + self._max_motor_voltage = max_motor_voltage + + self._initialised = False + self._warnings = [] + + setup_logging(log_level) + + if type(self)._initialised: + raise RuntimeError("IO object acquires hardware locks for its" + " sole use and so can only be used once.") + + self.bus = SMBus(1) + self._green_giant = GreenGiantInternal(self.bus) + self._gg_version = self._green_giant.get_version() + if self._gg_version >= 10: + # enable power rails + self._green_giant.set_motor_power(True) + self.enable_12v = enable_12v + self.enable_5v = enable_5v + self._adc_max = 5 + # configure User IO Ports + self.servos = GreenGiantGPIOPinList(self.bus, self._gg_version, self._adc_max, _GG_SERVO_GPIO_BASE, _GG_SERVO_PWM_BASE) + self.gpio = GreenGiantGPIOPinList(self.bus, self._gg_version, self._adc_max, _GG_GPIO_GPIO_BASE, _GG_GPIO_PWM_BASE) + # configure motor drivers + self.motors = GreenGiantMotors(self.bus, self._max_motor_voltage) + ## thinks, perhaps this should be inherrent to using the motors and + ## open load detection can be in there? + self.motors.enable_motors(True) + else: + # power rails + self._green_giant.set_motor_power(True) + self._adc_max = self._green_giant.get_fvr_reading() + # user IO + self.servos = GreenGiantGPIOPinList(self.bus, self._gg_version, None, None, _GG_SERVO_PWM_BASE) + self.gpio = GreenGiantGPIOPinList(self.bus, self._gg_version, self._adc_max, _GG_GPIO_GPIO_BASE , None) + # configure motor drivers + self.motors = CytronBoard(self._max_motor_voltage) + + type(self)._initialised = True + + @property + def enable_motors(self): + """Return if motors are currently enabled + + For the GG board this will be the state of the 12v line, which we cannot query, + so return what it was set to. + + For the PiLow series the Motors have both a power control and a enable. Generally + the Power should not be switched on and off, just the enable bits. The power may + be tripped in extreme circumstances. I guess that here we want to report any + reason for the motors not working, which includes power and enable + + """ + if self._gg_version < 10: + return self._green_giant.enable_12v + else: + return self._green_giant.get_motorpwr() and self._green_giant.get_enable() + + @enable_motors.setter + def enable_motors(self, on): + """An nice alias for set_12v""" + if self._version < 10: + return self._green_giant.enable_motors(on) + + @property + def enable_12v(self): + return self._green_giant.get_12v_acc_power() + + @enable_12v.setter + def enable_12v(self, on): + self._green_giant.set_12v_acc_power(on) + + @property + def enable_5v(self): + return self._green_giant.get_5v_acc_power() + + @enable_5v.setter + def enable_5v(self, on): + self._green_giant.set_5v_acc_power(on) + + def stop(self): + """Stops the robot and cuts power to the motors. + + does not touch the servos position. + """ + self.enable_12v = False + self.motors.stop() + + def set_user_led(self, val=True): + self._green_giant.set_user_led(val) + + def __del__(self): + """Frees hardware resources held by the vision object""" + logging.warning("Destroying robot object") + type(self)._initialised = False + diff --git a/robot/reset.py b/robocon/brain/reset.py similarity index 84% rename from robot/reset.py rename to robocon/brain/reset.py index a8f907a..ff64492 100644 --- a/robot/reset.py +++ b/robocon/brain/reset.py @@ -10,8 +10,8 @@ https://stackoverflow.com/a/45799209/5006710 """ from smbus2 import SMBus -import robot.cytron as c -import robot.greengiant as gg +import .cytron as c +import .greengiant as gg def reset(): @@ -19,7 +19,8 @@ def reset(): Used by Shepherd when the Stop button is pressed. """ bus = SMBus(1) - version = gg.GreenGiantInternal(bus).get_version() + internal = gg.GreenGiantInternal(bus) + version = internal.get_version() if version < 10: c.CytronBoard(1).stop() @@ -31,10 +32,8 @@ def reset(): gg.GreenGiantGPIOPinList(bus, version, 5, gg._GG_GPIO_GPIO_BASE, gg._GG_GPIO_PWM_BASE).off() # probably should wrap this all up in a .off() - internal = gg.GreenGiantInternal(bus) internal.enable_motors(False) - #internal.set_motor_power(False) - internal.set_12v_acc_power(False) # Not sure, should this be controlled by user? + internal.set_12v_acc_power(False) internal.set_5v_acc_power(False) internal.set_user_led(False) diff --git a/robot/game_config/__init__.py b/robocon/game/__init__.py similarity index 89% rename from robot/game_config/__init__.py rename to robocon/game/__init__.py index 60f778c..f97ba60 100644 --- a/robot/game_config/__init__.py +++ b/robocon/game/__init__.py @@ -17,11 +17,8 @@ BLUE = (255, 0, 0) # Blue WHITE = (255, 255, 255) # White -SECTOR = TEAM # 2026 ONLY, ALIAS `TEAM` AS `SECTOR` - __all__ = ( "TEAM", - "SECTOR", "TARGET_TYPE", "MARKER", "TARGET_MARKER", diff --git a/robot/game_config/markers.py b/robocon/game/markers.py similarity index 100% rename from robot/game_config/markers.py rename to robocon/game/markers.py diff --git a/robot/game_config/startup_poems.py b/robocon/game/startup_poems.py similarity index 100% rename from robot/game_config/startup_poems.py rename to robocon/game/startup_poems.py diff --git a/robot/game_config/targets.py b/robocon/game/targets.py similarity index 100% rename from robot/game_config/targets.py rename to robocon/game/targets.py diff --git a/robot/game_config/teams.py b/robocon/game/teams.py similarity index 100% rename from robot/game_config/teams.py rename to robocon/game/teams.py diff --git a/robocon/vision/__init__.py b/robocon/vision/__init__.py new file mode 100644 index 0000000..251192f --- /dev/null +++ b/robocon/vision/__init__.py @@ -0,0 +1,25 @@ +import importlib.util + +has_picamera2 = importlib.util.find_spec("picamera2") is not None + +if not has_picamera2: + import sys + import robocon.vision.mock_picamera2 as mock_picamera2 + sys.modules["picamera2"] = mock_picamera2 + +import sys + +from robocon.vision.camera import Camera, NoCameraPresent +from robocon.vision.vision import RoboConUSBCamera + +MINIUM_VERSION = (3, 6) +if sys.version_info <= MINIUM_VERSION: + raise ImportError( + "Expected python {} but instead got {}".format(MINIUM_VERSION, sys.version_info) + ) + +__all__ = ( + "Camera", + "NoCameraPresent", + "RoboConUSBCamera", +) diff --git a/robot/apriltags3.py b/robocon/vision/apriltags3.py old mode 100755 new mode 100644 similarity index 99% rename from robot/apriltags3.py rename to robocon/vision/apriltags3.py index ff344fa..d0f657c --- a/robot/apriltags3.py +++ b/robocon/vision/apriltags3.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python """Python wrapper for C version of apriltags. This program creates two classes that are used to detect apriltags and extract information from them. Using this module, you can identify all apriltags visible in an @@ -21,7 +20,7 @@ import numpy as np import scipy.spatial.transform as transform -from robot.game_config import MARKER, MARKER_TYPE +from robocon.game import MARKER, MARKER_TYPE ###################################################################### diff --git a/robocon/vision/camera.py b/robocon/vision/camera.py new file mode 100644 index 0000000..7d53f31 --- /dev/null +++ b/robocon/vision/camera.py @@ -0,0 +1,86 @@ +""" +The module containing the `Robot` class + +Mainly provides init routine for the brain and binds attributes of the `Robot` +class to their respecitve classes +""" +import json +import sys +import optparse +import os +import logging +import time +import threading +import random +import typing + +from datetime import datetime +from robocon.game import TEAM, POEM_ON_STARTUP +from . import vision + +from hopper import HopperPipe, HopperPipeType, JsonReader + +_logger = logging.getLogger("robot_vision") + +def setup_logging(level): + """Display the just the message when logging events + Sets the logging level to `level`""" + _logger.setLevel(level) + + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(level) + + fmt = logging.Formatter("%(message)s") + handler.setFormatter(fmt) + + _logger.addHandler(handler) + + +class NoCameraPresent(Exception): + """Camera not connected.""" + + def __str__(self): + return "No camera found." + +class Camera(): + _initialised = False + + def __init__(self, camera=None, log_level=logging.INFO): + self._initialised = False + self._warnings = [] + + self._image_pipe = HopperPipe(HopperPipeType.IN, "robot", "camera") + self._image_pipe.open() + + setup_logging(log_level) + + if type(self)._initialised: + raise RuntimeError("Camera object can only be initialised once") + + self.camera = vision.RoboConPiCamera() if camera is None else camera() + if not isinstance(self.camera, vision.Camera): + raise ValueError("camera must inherit from vision.Camera") + self.res = self.camera.res + + self._vision = vision.Vision(self.zone, camera=self.camera, image_pipe=self._image_pipe) + + type(self)._initialised = True + + def see(self) -> vision.Detections: + """Take a photo, detect markers in scene, attach RoboCon specific + properties""" + return self._vision.detect_markers() + + def __del__(self): + """Frees hardware resources held by the vision object""" + try: + self._image_pipe.close() + except: + pass + + # If vision never was initialised this creates confusing errors + # so check that it is initialised first + if hasattr(self, "_vision"): + self._vision.stop() + type(self)._initialised = False + diff --git a/robocon/vision/mock_picamera2.py b/robocon/vision/mock_picamera2.py new file mode 100644 index 0000000..5ed8ba3 --- /dev/null +++ b/robocon/vision/mock_picamera2.py @@ -0,0 +1,38 @@ +class Picamera2: + """ Mock Picamera2 class """ + + ERROR = None + + camera_properties = {"Model": "Mock Picamera2"} + + def __init__(self): + print("mock_picamera2.Picamera2.__init__()") + + def configure(self, _): + print("mock_picamera2.Picamera2.configure(_)") + + def start(self): + print("mock_picamera2.Picamera2.start()") + pass + + def stop(self): + print("mock_picamera2.Picamera2.stop()") + pass + + def close(self): + print("mock_picamera2.Picamera2.close()") + pass + + def capture_array(self): + print("mock_picamera2.Picamera2.capture_array(): None") + return None + + def create_still_configuration(self, _): + print("mock_picamera2.Picamera2.create_still_configuration(_): {}") + return {} + + @staticmethod + def set_logging(_): + print("mock_picamera2.Picamera2.set_logging(_)") + +print("<<< WARN: using mock picamera2 >>>") diff --git a/robot/vision.py b/robocon/vision/vision.py old mode 100755 new mode 100644 similarity index 97% rename from robot/vision.py rename to robocon/vision/vision.py index 4b1d211..7bc6f5a --- a/robot/vision.py +++ b/robocon/vision/vision.py @@ -2,6 +2,7 @@ postprocessing on the data to make it accessible to the user """ import abc +import base64 import logging import os import threading @@ -11,20 +12,13 @@ from datetime import datetime from typing import NamedTuple, Any - -from robot.game_config import MARKER, WHITE -from .game_config import BASE_MARKER as MarkerInfo +from robocon.game import MARKER, WHITE, BASE_MARKER as MarkerInfo import cv2 import numpy as np import picamera2 -import robot.apriltags3 as AT - - -# TODO put all of the paths together -IMAGE_TO_SHEPHERD_PATH = "/home/pi/shepherd/shepherd/static/image.jpg" - +import robocon.vision.apriltags3 as AT class Marker(): """A class to automatically pull the dis and bear_y out of the detection""" @@ -165,7 +159,7 @@ class RoboConPiCamera(Camera): def __init__(self, start_res=None, focal_lengths=None): os.environ["LIBCAMERA_LOG_LEVELS"] = "3" - picamera2.Picamera2.set_logging(picamera2.Picamera2.ERROR) + picamera2.Picamera2.set_logging(logging.ERROR) self._pi_camera = picamera2.Picamera2() # should test if the camera exists here, and give a nice warning self.camera_model = self._pi_camera.camera_properties['Model'] @@ -202,7 +196,7 @@ def __init__(self, start_res=None, focal_lengths=None): else: print("unknown camera: " + self._pi_camera.camera_properties) - self._pi_camera.set_logging(picamera2.Picamera2.ERROR) + self._pi_camera.set_logging(logging.ERROR) self._pi_camera_resolution = start_res # we store this - WHY? self._camera_config = self._pi_camera.create_still_configuration( main={"size": start_res, "format": 'RGB888'}) @@ -323,6 +317,7 @@ class PostProcessor(threading.Thread): def __init__(self, owner, zone, + image_pipe, bounding_box_thickness=5, bounding_box=True, usb_stick=False, @@ -333,6 +328,7 @@ def __init__(self, self._owner = owner self.zone = zone + self._image_pipe = image_pipe self._bounding_box_thickness = bounding_box_thickness self._bounding_box = bounding_box self._usb_stick = usb_stick @@ -417,7 +413,8 @@ def run(self): if self._bounding_box: frame = self._draw_bounding_box(frame, detections) if self._save: - cv2.imwrite(IMAGE_TO_SHEPHERD_PATH, frame) + encoded_img = base64.b64encode(cv2.imencode(".jpg", frame)[1]) + b'\n' + self._image_pipe.write(encoded_img) if self._usb_stick: self._write_to_usb(capture, detections) if self._send_to_sheep: @@ -430,6 +427,7 @@ class Vision(): def __init__(self, zone, + image_pipe, at_path=_AT_PATH, max_queue_size=4, camera=None): @@ -453,7 +451,7 @@ def __init__(self, self.camera = camera self.frames_to_postprocess = queue.Queue(max_queue_size) - self.post_processor = PostProcessor(self, zone=self.zone) + self.post_processor = PostProcessor(self, zone=self.zone, image_pipe=image_pipe) def stop(self): """Cleanup to prevent leaking hardware resource""" diff --git a/robot/__init__.py b/robot/__init__.py deleted file mode 100644 index 0c1c77f..0000000 --- a/robot/__init__.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/python3 -"""The robot module, provides an python interface to the RoboCon hardware and -April tags a marker recognition system. Also performs convince functions for use -by shepherd""" - -import importlib - -has_picamera = importlib.find_loader("picamera") is not None - -if not has_picamera: - import sys - import fake_rpi - - sys.modules["RPi"] = fake_rpi.RPi - sys.modules["RPi.GPIO"] = fake_rpi.RPi.GPIO - sys.modules["picamera"] = fake_rpi.picamera - sys.modules["smbus2"] = fake_rpi.smbus - -import sys - -# This log import configures our logging for us, but we don't want to -# provide it as part of this package. -import robot.log - -from robot.wrapper import Robot, NoCameraPresent -from robot.greengiant import OUTPUT, INPUT, INPUT_ANALOG, INPUT_PULLUP, PWM_SERVO -from robot.vision import RoboConUSBCamera -from robot.game_config import ( - MARKER, - BASE_MARKER, - ARENA_MARKER, - TARGET_MARKER, - MARKER_TYPE, - TARGET_TYPE, - TEAM, - SECTOR -) - - -MINIUM_VERSION = (3, 6) -if sys.version_info <= MINIUM_VERSION: - raise ImportError( - "Expected python {} but instead got {}".format(MINIUM_VERSION, sys.version_info) - ) - -__all__ = ( - "Robot", - "NoCameraPresent", - "OUTPUT", - "INPUT", - "INPUT_ANALOG", - "INPUT_PULLUP", - "PWM_SERVO", - "MARKER", - "BASE_MARKER", - "ARENA_MARKER", - "TARGET_MARKER", - "MARKER_TYPE", - "TARGET_TYPE", - "TEAM", - "SECTOR", - "RoboConUSBCamera" -) diff --git a/robot/cytron.py b/robot/cytron.py deleted file mode 100644 index 55d1e97..0000000 --- a/robot/cytron.py +++ /dev/null @@ -1,98 +0,0 @@ -"""An interface to the cyctron motor board. A gpio pin is used for each motor -to give direction and has a PWM signal at 100Hz giving infomation about voltage -to apply -""" -import wiringpi as wp -from robot.greengiant import clamp - -_MAX_OUTPUT_VOLTAGE = 12 - -_PWM_PIN_1 = 26 -_PWM_PIN_2 = 23 -_DIR_PIN_1 = 25 -_DIR_PIN_2 = 5 - -_WP_OUT = 1 -_WP_PWM = 2 - -# Wiring pi's PWM has range 0-1024 but we want to present a range of 0-100 -_WP_PWM_MAX = 1024 -_RC_PWM_MAX = 100 - - -def wp_to_rc_pwm(wp_pwm): - """Convert from wiring pi's numbering to percentages""" - return (wp_pwm * _RC_PWM_MAX)/_WP_PWM_MAX - - -def rc_to_wp_pwm(rc_pwm): - """Convert from percenatges to wiring pi's numbering""" - return int((rc_pwm * _WP_PWM_MAX)/_RC_PWM_MAX) - - -class CytronBoard(): - def __init__(self, max_motor_voltage): - """The interface to the CytronBoard - max_motor_voltage - The motors will be scaled so that this is the maxium - average voltage the Cyctron will output - """ - # Set up Wiring Pi following the wiring pi numbering scheme - if wp.wiringPiSetup() != 0: - raise RuntimeError("Failed to init Wiring Pi") - - if not (0 <= max_motor_voltage <= 12): - raise ValueError("max_motor_voltage must satisfy 0 <= " - "max_motor_voltage <= 12 but instead is " - f"{max_motor_voltage}") - - # because we care about heating effects in the motors, we have to scale by - # the square of the ratio - self.power_scaling_factor = ( - max_motor_voltage / _MAX_OUTPUT_VOLTAGE) ** 2 - - self._percentages = [0, 0] - self._dir = [_DIR_PIN_1, _DIR_PIN_2] - self._pwm_pins = [_PWM_PIN_1, _PWM_PIN_2] - - wp.pinMode(_DIR_PIN_1, _WP_OUT) - wp.pinMode(_DIR_PIN_2, _WP_OUT) - wp.pinMode(_PWM_PIN_1, _WP_PWM) - wp.pinMode(_PWM_PIN_2, _WP_PWM) - - wp.pwmSetClock(3000) - for pin in self._pwm_pins: - wp.pwmWrite(pin, 0) - - def __getitem__(self, index): - """Returns the current PWM value in RC units. Adds a sign to represent""" - if index not in (1, 2): - raise IndexError( - f"motor index must be in (1,2) but instead got {index}") - - index -= 1 - return self._percentages[index] - - def __setitem__(self, index, percent): - """Clamps input value, converts from percentage to wiring pi format and - sets a PWM format""" - if index not in (1, 2): - raise IndexError( - f"motor index must be in (1,2) but instead got {index}") - - index -= 1 - percent = clamp(percent, -100, 100) - self._percentages[index] = percent - - direction = (percent < 0) - wp.digitalWrite(self._dir[index], direction) - - # Scale such that 50% with a motor limit of 6V is really 3V - scaled_value = abs(percent) * self.power_scaling_factor - wp_value = rc_to_wp_pwm(scaled_value) - wp.pwmWrite(self._pwm_pins[index], wp_value) - - def stop(self): - """Turns motors off""" - for pwm_pin, percentage in zip(self._pwm_pins, self._percentages): - wp.pwmWrite(pwm_pin, 0) - percentage = 0 diff --git a/robot/log.py b/robot/log.py deleted file mode 100644 index 3d9d2e2..0000000 --- a/robot/log.py +++ /dev/null @@ -1,15 +0,0 @@ -# pylint: skip-file -# TODO is this dead code? -import logging - -# Python 2.6's logging doesn't have NullHandler - - -class NullHandler(logging.Handler): - def emit(self, record): - pass - - -# Default to sending our log messages nowhere -logger = logging.getLogger("sr") -logger.addHandler(NullHandler()) diff --git a/robot/wrapper.py b/robot/wrapper.py deleted file mode 100644 index 3f1711d..0000000 --- a/robot/wrapper.py +++ /dev/null @@ -1,330 +0,0 @@ -""" -The module containing the `Robot` class - -Mainly provides init routine for the brain and binds attributes of the `Robot` -class to their respecitve classes -""" -import json -import sys -import optparse -import os -import logging -import time -import threading -import random -import typing - -from datetime import datetime -from smbus2 import SMBus - -from robot import vision -from robot.cytron import CytronBoard -from robot.greengiant import GreenGiantInternal, GreenGiantGPIOPinList, GreenGiantMotors, _GG_SERVO_PWM_BASE, _GG_GPIO_PWM_BASE, _GG_GPIO_GPIO_BASE, _GG_SERVO_GPIO_BASE -from robot.game_config import TEAM -from . import game_config -from robot.game_config import POEM_ON_STARTUP - -from hopper.client import * -from hopper.common import * - -_logger = logging.getLogger("robot") - -# path to file with status of USB program copy, -# if this exists it is because the code on the robot has been copied from the robotusb -# this boot cycle. This is to highlight weird behaviour in the arena -COPY_STAT_FILE = "/tmp/usb_file_uploaded" - -def setup_logging(level): - """Display the just the message when logging events - Sets the logging level to `level`""" - _logger.setLevel(level) - - handler = logging.StreamHandler(sys.stdout) - handler.setLevel(level) - - fmt = logging.Formatter("%(message)s") - handler.setFormatter(fmt) - - _logger.addHandler(handler) - - -class NoCameraPresent(Exception): - """Camera not connected.""" - - def __str__(self): - return "No camera found." - - -class Robot(): - """Class for initialising and accessing robot hardware""" - - _initialised = False - - def __init__(self, - wait_for_start=True, - camera=None, - max_motor_voltage=6, - logging_level=logging.INFO, - start_enable_12v = True, - start_enable_5v = True, - ): - - self.zone = game_config.TEAM.RED - self.mode = "competition" - self._max_motor_voltage = max_motor_voltage - - self._initialised = False - self._start_pressed = False - self._warnings = [] - - # Initialize a RcMuxClient and open the start pipe - self._hopper_client = HopperClient() - self._start_pipe = PipeName((PipeType.OUTPUT, "start-button", "robot"), "/home/pi/pipes") - self._hopper_client.open_pipe(self._start_pipe, delete=True, create=True, blocking=True) # Make sure to use blocking mode, otherwise start button code fails - - self._log_pipe = PipeName((PipeType.INPUT, "log", "robot"), "/home/pi/pipes") - self._json_reader = JsonReader(self._hopper_client, self._start_pipe) - - # Close stdout and stderr - os.close(1) - os.close(2) - - # ...and open a pipe in its place - self._hopper_client.open_pipe(self._log_pipe, delete=True, create=True) - os.dup(self._hopper_client.get_pipe_by_pipe_name(self._log_pipe).fd) - - self._parse_cmdline() - - setup_logging(logging_level) - - # check if copy stat file exists and read it if it does then delete it - try: - with open(COPY_STAT_FILE, "r") as f: - _logger.info("Robot code copied %s from USB\n", f.read().strip()) - os.remove(COPY_STAT_FILE) - except IOError: - pass - - self.subsystem_init(camera, start_enable_12v, start_enable_5v) - self.report_hardware_status() - type(self)._initialised = True - - # Allows for the robot object to be set up and mutated before being set - # up. Dangerous as it means the start info might not get loaded - # depending on user code. - if wait_for_start is True: - start_data = self.wait_start() - self.zone = start_data['zone'] - self.mode = start_data['mode'] - else: - _logger.warning("Robot initalized but usercode running before" - "`robot.wait_start`. Robot will not wait for the " - "start button until `robot.wait_start` is called.") - - def subsystem_init(self, camera, start_enable_12v, start_enable_5v): - """Allows for initalisation of subsystems after instansating `Robot()` - Can only be called once""" - if type(self)._initialised: - raise RuntimeError("Robot object is acquires hardware locks for its" - " sole use and so can only be used once.") - - self.bus = SMBus(1) - self._green_giant = GreenGiantInternal(self.bus) - self._gg_version = self._green_giant.get_version() - if self._gg_version >= 10: - # enable power rails - self._green_giant.set_motor_power(True) - self.enable_12v = start_enable_12v - self.enable_5v = start_enable_5v - self._adc_max = 5 - # configure User IO Ports - self.servos = GreenGiantGPIOPinList(self.bus, self._gg_version, self._adc_max, _GG_SERVO_GPIO_BASE, _GG_SERVO_PWM_BASE) - self.gpio = GreenGiantGPIOPinList(self.bus, self._gg_version, self._adc_max, _GG_GPIO_GPIO_BASE, _GG_GPIO_PWM_BASE) - # configure motor drivers - self.motors = GreenGiantMotors(self.bus, self._max_motor_voltage) - ## thinks, perhaps this should be inherrent to using the motors and - ## open load detection can be in there? - self.motors.enable_motors(True) - else: - # power rails - self._green_giant.set_motor_power(True) - self._adc_max = self._green_giant.get_fvr_reading() - # user IO - self.servos = GreenGiantGPIOPinList(self.bus, self._gg_version, None, None, _GG_SERVO_PWM_BASE) - self.gpio = GreenGiantGPIOPinList(self.bus, self._gg_version, self._adc_max, _GG_GPIO_GPIO_BASE , None) - # configure motor drivers - self.motors = CytronBoard(self._max_motor_voltage) - - self.camera = vision.RoboConPiCamera() if camera is None else camera() - if not isinstance(self.camera, vision.Camera): - raise ValueError("camera must inherit from vision.Camera") - self.res = self.camera.res - - self._vision = vision.Vision(self.zone, camera=self.camera) - - def report_hardware_status(self): - """Print out a nice log message at the start of each robot init with - the hardware status""" - - battery_voltage = self._green_giant.get_battery_voltage() - battery_str = "Battery Voltage: %.2fv" % battery_voltage - # GG cannot read voltages above 12.2v - if battery_voltage > 12.2: - battery_str = "Battery Voltage: > 12.2v" - elif battery_voltage < 11.5: - self._warnings.append("Battery voltage below 11.5v, consider changing for a charged battery") - - if self._gg_version < 3: - self._warnings.append( - "Green Giant version not 3 but instead {}".format(self._gg_version)) - - camera_type_str = "Camera: {}".format( - self.camera.__class__.__name__) - - # Adds a secret every now and again! - POEM_ON_STARTUP.on_startup(_logger,random) - - # print report of hardware - _logger.info("------HARDWARE REPORT------") - #_logger.info("Time: %s", datetime.now().strftime('%Y-%m-%d %H:%M:%S')) - # no RTC on new boards, perhaps use a "run number" increment instead? - _logger.info("Patch Version: ") - _logger.info(battery_str) - #_logger.info("ADC Max: %.2fv", self._adc_max) - _logger.info("Robocon Board: Yes (v%d)", self._gg_version) - if self._gg_version <= 3: - _logger.info("Motor Driver: Cytron Board") - else: - _logger.info("Motor Driver: PiLow Cover") - # check and report open load here, warning race condition, is battery power on long enough? - # Thinks that we just motor enable/disable and leave motor power always on? - _logger.info(camera_type_str) - _logger.info("---------------------------") - - for warning in self._warnings: - _logger.warning("WARNING: %s", warning) - - if not self._warnings: - _logger.info("Hardware looks good") - - @property - def enable_motors(self): - """Return if motors are currently enabled - - For the GG board this will be the state of the 12v line, which we cannot query, - so return what it was set to. - - For the PiLow series the Motors have both a power control and a enable. Generally - the Power should not be switched on and off, just the enable bits. The power may - be tripped in extreme circumstances. I guess that here we want to report any - reason for the motors not working, which includes power and enable - - """ - if self._gg_version < 10: - return self._green_giant.enable_12v - else: - return self._green_giant.get_motorpwr() and self._green_giant.get_enable() - - @enable_motors.setter - def enable_motors(self, on): - """An nice alias for set_12v""" - if self._version < 10: - return self._green_giant.enable_motors(on) - - @property - def enable_12v(self): - return self._green_giant.get_12v_acc_power() - - @enable_12v.setter - def enable_12v(self, on): - self._green_giant.set_12v_acc_power(on) - - @property - def enable_5v(self): - return self._green_giant.get_5v_acc_power() - - @enable_5v.setter - def enable_5v(self, on): - self._green_giant.set_5v_acc_power(on) - - def stop(self): - """Stops the robot and cuts power to the motors. - - does not touch the servos position. - """ - self.enable_12v = False - self.motors.stop() - - def _parse_cmdline(self): - """Parse the command line arguments""" - parser = optparse.OptionParser() - - parser.add_option("--usbkey", type="string", dest="usbkey", - help="The path of the (non-volatile) user USB key") - - (options, _) = parser.parse_args() - - self.usbkey = options.usbkey - - def _wait_start_blink(self): - """Blink status LED until start is pressed""" - v = False - while not self._start_pressed: - time.sleep(0.2) - self._green_giant.set_user_led(v) - v = not v - if self._gg_version < 10: - # on GG keep main LED on to show the device is running - self._green_giant.set_user_led(True) - else: - # for PiLow the board has its own Power LED, so this can be used by users - self._green_giant.set_user_led(False) - - def _get_start_info(self): - """Get the start infomation from the named pipe""" - - # This call blocks until the start info is read - settings = self._json_reader.read() - - assert "zone" in settings, "zone must be in startup info" - if settings["zone"] not in range(4): - raise ValueError( - "zone must be in range 0-3 inclusive -- value of %i is invalid" - % settings["zone"]) - settings["zone"] = TEAM[f"T{settings['zone']}"] - - self._start_pressed = True - - return settings - - def wait_start(self): - """Wait for the start signal to happen""" - - blink_thread = threading.Thread(target=self._wait_start_blink) - blink_thread.start() - - _logger.info("\nWaiting for start signal...") - - # This blocks till we get start info - start_info = self._get_start_info() - - _logger.info("Robot started!\n") - - return start_info - - def set_user_led(self, val=True): - self._green_giant.set_user_led(val) - - def see(self) -> vision.Detections: - """Take a photo, detect markers in sene, attach RoboCon specific - properties""" - return self._vision.detect_markers() - - def __del__(self): - """Frees hardware resources held by the vision object""" - logging.warning("Destroying robot object") - # If vision never was initialled this creates confusing errors - # so check that it is initialled first - if hasattr(self, "_vision"): - self._vision.stop() - type(self)._initialised = False diff --git a/setup.py b/setup.py index 7c93982..10c7853 100644 --- a/setup.py +++ b/setup.py @@ -1,12 +1,9 @@ -from setuptools import setup +from setuptools import setup, find_packages setup( - name="robot", + name="robocon", version="2024.1", - packages=["robot"], - - install_requires=[ - ], + packages=find_packages(), author="Skyler Grey", author_email="skyler3665@gmail.com",