-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathmain.py
More file actions
125 lines (96 loc) · 3.04 KB
/
main.py
File metadata and controls
125 lines (96 loc) · 3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Derived from:
# * https://github.com/peterhinch/micropython-async/blob/master/v3/as_demos/auart.py
# * https://github.com/tve/mqboard/blob/master/mqtt_async/hello-world.py
# * https://github.com/peterhinch/micropython-mqtt
# * https://github.com/embedded-systems-design/external_pycopy-lib
import ssl
from mqtt_as.mqtt_as import MQTTClient
from mqtt_as.mqtt_local import wifi_led, blue_led, config
import uasyncio as asyncio
from machine import UART
import time
from config import *
MAXTX = 4
uart = UART(2, 9600,tx=17,rx=16)
uart.init(9600, bits=8, parity=None, stop=1,flow=0) # init with given parameters
async def receiver():
b = b''
sreader = asyncio.StreamReader(uart)
while True:
res = await sreader.read(1)
if res==b';':
b+=res
await client.publish(TOPIC_PUB, b, qos=1)
print('published', b)
b = b''
else:
b+=res
# Subscription callback
def sub_cb(topic, msg, retained):
print(f'Topic: "{topic.decode()}" Message: "{msg.decode()}" Retained: {retained}')
uart.write(msg)
# time.sleep(.01)
# Demonstrate scheduler is operational.
async def heartbeat():
s = True
while True:
await asyncio.sleep_ms(500)
blue_led(s)
s = not s
async def wifi_han(state):
wifi_led(not state)
print('Wifi is ', 'up' if state else 'down')
await asyncio.sleep(1)
# If you connect with clean_session True, must re-subscribe (MQTT spec 3.1.2.4)
async def conn_han(client):
await client.subscribe(TOPIC_SUB, 1)
async def main(client):
try:
await client.connect()
except OSError:
print('Connection failed.')
return
asyncio.create_task(receiver())
n = 0
while True:
await asyncio.sleep(5)
print('publish', n)
# If WiFi is down the following will pause for the duration.
await client.publish(TOPIC_HB, '{} {}'.format(n, client.REPUB_COUNT), qos = 1)
n += 1
# Define configuration
config['server'] = MQTT_SERVER
config['ssid'] = WIFI_SSID
config['wifi_pw'] = WIFI_PASSWORD
config['ssl'] = True
# read in DER formatted certs & user key
with open('certs/student_key.pem', 'rb') as f:
key_data = f.read()
with open('certs/student_crt.pem', 'rb') as f:
cert_data = f.read()
with open('certs/ca_crt.pem', 'rb') as f:
ca_data = f.read()
ssl_params = {}
ssl_params["cert"] = cert_data
ssl_params["key"] = key_data
ssl_params["cadata"] = ca_data
ssl_params["server_hostname"] = MQTT_SERVER
ssl_params["cert_reqs"] = ssl.CERT_REQUIRED
config["time_server"] = MQTT_SERVER
config["time_server_timeout"] = 10
config['ssl_params'] = ssl_params
config['subs_cb'] = sub_cb
config['wifi_coro'] = wifi_han
config['connect_coro'] = conn_han
config['clean'] = True
config['user'] = MQTT_USER
config["password"] = MQTT_PASSWORD
# Set up client
MQTTClient.DEBUG = True # Optional
client = MQTTClient(config)
asyncio.create_task(heartbeat())
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
asyncio.new_event_loop()