-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmain.py
More file actions
64 lines (52 loc) · 2.08 KB
/
main.py
File metadata and controls
64 lines (52 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import ssl
from ssl import SSLContext
from sensor_simulation import SensorManager
from sensor_simulation.clients import MqttClient
from sensor_simulation.sensors import (
TemperatureSensor,
HumiditySensor,
WaterLevelSensor,
LightIntensitySensor,
PhSensor
)
from sensor_simulation.config.parsers import TomlParser
from sensor_simulation.config.logger import configure_logger
def ssl_alpn(alpn_context: str, ca_path: str, cert_path: str, key_path: str) -> SSLContext:
    """Build a TLS context that offers a single ALPN protocol.

    The context starts from ``ssl.create_default_context()``, then loads the
    CA file at ``ca_path`` for peer verification and the certificate/key
    pair at ``cert_path`` / ``key_path`` as this side's identity.

    Args:
        alpn_context: ALPN protocol name to advertise during the handshake.
        ca_path: Path to the CA certificate file.
        cert_path: Path to the certificate file to present.
        key_path: Path to the private key matching ``cert_path``.

    Returns:
        The configured ``SSLContext``.

    Raises:
        Exception: Any error raised while building the context (for example
            a missing or malformed certificate file) is reported on stdout
            and then re-raised unchanged.
    """
    try:
        ssl_context = ssl.create_default_context()
        ssl_context.set_alpn_protocols([alpn_context])
        ssl_context.load_verify_locations(cafile=ca_path)
        ssl_context.load_cert_chain(certfile=cert_path, keyfile=key_path)
    except Exception as e:
        # Report the underlying error too; the fixed prefix alone gave no
        # hint about which step or which file failed.
        print(f"exception ssl_alpn(): {e!r}")
        # Bare raise propagates the active exception exactly as it was raised.
        raise
    return ssl_context
if __name__ == '__main__':
    # Script entry point: read the broker settings, open a TLS-secured MQTT
    # connection, register the five sensors and hand control to the manager.
    config_parser = TomlParser('aws_config.toml')
    config_parser.read_config()
    settings = config_parser.get_general_config()

    mqtt_client = MqttClient(settings['broker_address'], settings['broker_port'])
    tls_context = ssl_alpn(
        settings['alpn_context'],
        settings['ca_path'],
        settings['cert_path'],
        settings['key_path'],
    )
    mqtt_client.tls_set_context(context=tls_context)
    mqtt_client.connect()
    # NOTE(review): the loop started here is never stopped in this script;
    # consider stopping/disconnecting on shutdown if MqttClient exposes
    # that - confirm against the client class.
    mqtt_client.loop_start()

    # (sensor class, publish topic, third constructor argument), listed in
    # the order the sensors are created and registered.
    # NOTE(review): the number is presumably a sensor id or interval - confirm.
    sensor_specs = (
        (TemperatureSensor, 'sensor/temperature', 1),
        (HumiditySensor, 'sensor/humidity', 2),
        (WaterLevelSensor, 'sensor/water_level', 3),
        (LightIntensitySensor, 'sensor/light_intensity', 4),
        (PhSensor, 'sensor/ph', 5),
    )
    # Build every sensor before creating the manager, as the original did.
    sensors = [
        sensor_cls(mqtt_client, topic, number)
        for sensor_cls, topic, number in sensor_specs
    ]

    sensor_manager = SensorManager()
    for sensor in sensors:
        sensor_manager.add_sensor(sensor)

    try:
        sensor_manager.run()
    except KeyboardInterrupt:
        # Ctrl+C: ask the manager to stop every registered sensor.
        sensor_manager.stop_all()