-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmicrobir_serial.py
More file actions
157 lines (130 loc) · 4.98 KB
/
microbir_serial.py
File metadata and controls
157 lines (130 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import sys
import glob
import serial
import threading
from pynput.keyboard import Key, Controller
class MicrobitSerial:
"""
This class represents a serial connection with a microbit to receive and send commands.
Attributes:
- port (str) - The serial port to which the microbit is connected.
- baudrate (int) - The communication baud rate.
- timeout (int) - The timeout to receive data.
- state (bool) - The state of the serial connection.
- serial_thread - The thread of execution for serial communication.
- commands_event (dict) - The dictionary of events and their associated commands.
- NO_PORT (str) - Default port if no valid port exists
- keyboard (Controller) - The keyboard event handler.
Methods:
- set_port(port: str) -> None - Sets the serial port.
- set_baudrate(baudrate: int) -> None - Sets the baudrate of the communication.
- set_timeout(timeout: int) -> None - Sets the timeout.
- set_commands_event(commands: dict) - Sets the event commands.
- port_scan() -> list - Scans the available serial ports on the system.
- press(key: str) -> None - Simulates pressing a key on the keyboard.
- start_serial_communication() - Starts serial communication.
- stop_serial_communication() - Stops serial communication.
- _serial_worker() - Private method for the serial communication thread.
- _run_command(new_event: int) - Private method to execute the commands associated with the events.
"""
def __init__(self, port:str = "--"):
self.port = port
self.baudrate:int = 115200
self.timeout:int = 0.01
self.state:bool = False
self.serial_thread = None
self.commands_event:dict = None
self.NO_PORT = "--"
self.keyboard:Controller = Controller()
# -- SETTERS
def set_port(self, port:str) -> None:
"""
Sets the serial port.
"""
self.port = port
def set_baudrate(self,baudrate:int) -> None:
"""
Sets the baudrate of the communication.
"""
self.baudrate = baudrate
def set_timeout(self, timeout:int) -> None:
"""
Sets the timeout.
"""
self.timeout = timeout
def set_commands_event(self, commands:dict):
"""
Sets the event commands.
"""
self.commands_event = dict(commands)
# -- UTILITY
def port_scan(self) -> list:
"""
Scans the available serial ports on the system.
"""
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
# this excludes your current terminal "/dev/tty"
ports = glob.glob('/dev/tty[A-Za-z]*')
elif sys.platform.startswith('darwin'):
ports = glob.glob('/dev/tty.*')
else:
raise EnvironmentError('Unsupported platform')
result = []
for port in ports:
try:
s = serial.Serial(port)
s.close()
result.append(port)
except (OSError, serial.SerialException):
pass
if len(result) == 0:
result.append(self.NO_PORT)
return result
def press(self, key:str) -> None:
"""
Simulates pressing a key on the keyboard.
"""
self.keyboard.press(key)
self.keyboard.release(key)
# -- Serial comuni ation
def start_serial_communication(self):
"""
Starts serial communication.
"""
if not self.state:
self.state = True
self.serial_thread = threading.Thread(target=self._serial_worker)
self.serial_thread.start()
def stop_serial_communication(self) -> bool:
"""
Stops serial communication.
"""
self.state = False
if self.serial_thread is not None:
self.serial_thread.join()
return True
return False
def _serial_worker(self):
"""
Private method for the serial communication thread.
"""
try:
with serial.Serial(self.port, self.baudrate, timeout=self.timeout) as socket:
while self.state:
if socket.in_waiting > 0:
try:
event = int(socket.readline().decode().strip())
except:
event = 0
self._run_command(event)
except Exception as e:
print("Thread error: ",e)
def _run_command(self, new_event:int):
"""
Private method for the serial communication thread.
"""
for event, command in self.commands_event.items():
if new_event == event:
self.press(command)