-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathmqtt-data-logger.py
More file actions
258 lines (226 loc) · 7.18 KB
/
mqtt-data-logger.py
File metadata and controls
258 lines (226 loc) · 7.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#!c:\python34\python.exe
#!/usr/bin/env python
#If Running in Windows use top line and edit according to your python
#location and version. If running on Linux delete the top line.
###demo code provided by Steve Cope at www.steves-internet-guide.com
##email steve@steves-internet-guide.com
###Free to use for any purpose
"""
This will log messages to file. Logs time, message and topic as JSON data
"""
#updated 18-oct-2023
# Passed to Initialise_clients(); when true an on_log callback is attached
# to the MQTT client.
mqttclient_log=False #MQTT client logs showing messages
# Loop condition of log_worker(); the main program sets it to False to stop
# the logging thread.
Log_worker_flag=True
import paho.mqtt.client as mqtt
import json
import os
import time
import sys, getopt,random
import logging
import mlogger as mlogger
import threading
from queue import Queue
from command import command_input
import command
import sys
# Report the interpreter version at start-up.
print("Python version is", sys.version_info)
# Shared queue: message_handler() puts records on it (via client.q) and
# log_worker() takes them off and writes them to the log.
q=Queue()
##helper functions
def convert(t):
    """Return *t* with every character outside the BMP replaced by '!'.

    Args:
        t: Text to sanitise; iterated one character at a time.

    Returns:
        A new string of the same length in which every character whose
        code point is 0x10000 or above has been replaced with '!'.
    """
    # join() builds the result in a single pass instead of growing a string
    # with repeated concatenation.
    return "".join(c if ord(c) < 0x10000 else "!" for c in t)
###
class MQTTClient(mqtt.Client):
    """paho MQTT client extended with the state attributes used by this script.

    The extra attributes are plain flags, counters and settings that the
    callbacks and the main program read and write; the class adds no
    behaviour of its own.
    """

    run_flag=False #class-level default; every instance sets its own in __init__

    def __init__(self,cname,**kwargs):
        """Create the client and initialise the bookkeeping attributes.

        Args:
            cname: Client name, passed as the first positional argument to
                the paho ``Client`` constructor and stored as ``self.cname``.
            **kwargs: Forwarded unchanged to the paho ``Client`` constructor.
        """
        # NOTE(review): paho-mqtt 2.x appears to expect a callback API version
        # as the first positional argument - confirm against the installed version.
        super().__init__(cname,**kwargs)
        self.last_pub_time=time.time()
        self.topic_ack=[] #used to track subscribed topics
        self.run_flag=True
        self.submitted_flag=False #used for connections
        self.subscribe_flag=False
        self.bad_connection_flag=False
        self.bad_count=0
        self.connected_flag=False
        self.connect_flag=False #used in multi loop
        self.disconnect_flag=False
        self.disconnect_time=0.0
        self.pub_msg_count=0
        self.pub_flag=False
        self.sub_topic="" #single topic; "" means not used
        # Multiple topics; read by on_connect, so it must exist even when the
        # caller never assigns it. "" means not used.
        self.sub_topics=""
        self.sub_qos=0
        self.devices=[]
        self.broker="mqtt2.home"
        self.port=1883
        self.keepalive=60
        self.run_forever=False
        self.cname=cname #was always "" although the name is passed in
        self.delay=10 #retry interval
        self.retry_time=time.time()
        # Last message stored per topic; read and updated by has_changed.
        self.last_message={}
def _log_client_message(client, userdata, level, buf):
    """Fallback on_log callback: forward the client's log text to logging."""
    logging.debug("mqtt client log: %s", buf)


def Initialise_clients(cname,mqttclient_log=False,cleansession=True,flags=""):
    """Create an MQTTClient and attach this module's callbacks to it.

    Args:
        cname: Client name; passed to MQTTClient and stored as client.cname.
        mqttclient_log: When true, an on_log callback is attached as well.
        cleansession: Passed to MQTTClient as ``clean_session``.
        flags: Not used by this function; kept so existing callers still work.

    Returns:
        The configured client. No connection is made here.
    """
    print("initialising clients")
    logging.info("initialising clients")
    client=MQTTClient(cname,clean_session=cleansession)
    client.cname=cname
    client.on_connect=on_connect #attach function to callback
    client.on_message=on_message #attach function to callback
    if mqttclient_log:
        # No on_log function is defined in this file, so referring to it
        # directly raised NameError. Use a module-level on_log when one
        # exists, otherwise fall back to the helper above.
        client.on_log=globals().get("on_log",_log_client_message)
    return client
def on_connect(client, userdata, flags, rc):
    """Connection callback: record the outcome and subscribe on success.

    Args:
        client: Client object carrying the flag attributes updated here and
            the ``sub_topic`` / ``sub_topics`` / ``sub_qos`` settings.
        userdata: Not used.
        flags: Connection flags; only logged.
        rc: Result code; 0 is treated as success, anything else as failure.

    On success ``connected_flag`` is set, ``bad_connection_flag`` is cleared
    and the client subscribes to ``sub_topic`` with ``sub_qos`` when a single
    topic is set, otherwise to ``sub_topics`` when that is set.
    On failure ``bad_connection_flag`` is set, ``bad_count`` is incremented
    and ``connected_flag`` is cleared.
    """
    logging.debug("Connected flags%sresult code %sclient1_id",flags,rc)
    if rc!=0:
        print("set bad connection flag")
        client.bad_connection_flag=True
        client.bad_count+=1
        client.connected_flag=False
        return
    client.connected_flag=True #old clients use this
    client.bad_connection_flag=False
    # getattr: sub_topics is normally assigned by the caller after the client
    # has been created, so it may not exist yet.
    topics=getattr(client,"sub_topics","")
    if client.sub_topic: #single topic
        print("subscribing "+str(client.sub_topic))
        print("subscribing in on_connect ")
        # Subscribe to the single topic itself with the configured QoS. The
        # old code read sub_topics, used an undefined name for the QoS and
        # skipped the subscription entirely when the QoS was 0.
        client.subscribe(client.sub_topic,client.sub_qos)
    elif topics:
        print("subscribing in on_connect multiple",topics)
        client.subscribe(topics)
def on_message(client,userdata, msg):
    """Message callback: decode the payload and hand it to message_handler.

    The payload is decoded as UTF-8 with undecodable bytes ignored, then
    passed on together with the message topic. ``userdata`` is not used.
    """
    payload_text=str(msg.payload.decode("utf-8","ignore"))
    message_handler(client,payload_text,msg.topic)
def message_handler(client,msg,topic):
    """Build a log record for one message and put it on the client's queue.

    Args:
        client: Client object providing the queue ``client.q`` (and the
            state used by has_changed).
        msg: Message text. It is parsed as JSON when possible; otherwise it
            is stored as received.
        topic: Topic the message arrived on.

    The record always holds ``time`` (time.ctime()), ``time_ms``
    (time.time()) and ``topic``. When ``csv_flag`` is set and the parsed
    message is a JSON object, its keys are copied to the top level of the
    record (and can overwrite the three keys above); in every other case the
    message is stored under ``message``. When the ``storechangesonly``
    option is set, the record is queued only if has_changed() reports a
    change.
    """
    try:
        msg=json.loads(msg)
    except (TypeError,ValueError):
        # Not JSON (json.JSONDecodeError is a ValueError): keep the raw
        # message. Only these errors are expected here, so nothing else is
        # swallowed.
        pass
    data={"time":time.ctime(),"time_ms":time.time(),"topic":topic}
    if csv_flag and isinstance(msg,dict):
        data.update(msg)
    else:
        data["message"]=msg
    if command.options["storechangesonly"] and not has_changed(client,topic,msg):
        return
    client.q.put(data) #put messages on queue
def has_changed(client,topic,msg):
    """Report whether *msg* differs from the last message kept for *topic*.

    Topics containing "control" (compared case-insensitively) always give
    False and are not recorded. Otherwise the message is compared with the
    entry in ``client.last_message``: an equal entry gives False; a missing
    or different entry is replaced by *msg* and gives True.
    """
    if "control" in topic.lower():
        return False
    seen=client.last_message
    if topic in seen and seen[topic]==msg:
        return False
    seen[topic]=msg
    return True
###
def log_worker():
    """Thread target: write queued records to the log until told to stop.

    While ``Log_worker_flag`` is true the loop sleeps for 10 ms and then
    empties the queue ``q``, skipping ``None`` entries and writing every
    other record with ``log.log_csv`` when ``csv_flag`` is set, or
    ``log.log_json`` otherwise. The log file is closed once the loop ends.
    """
    while Log_worker_flag:
        time.sleep(0.01)
        while not q.empty():
            record=q.get()
            if record is None:
                continue
            write_record=log.log_csv if csv_flag else log.log_json
            write_record(record)
    log.close_file()
# MAIN PROGRAM
# Start from the defaults in command.options; command-line arguments are
# applied on top when the file is run as a script with at least one argument.
options=command.options
if __name__ == "__main__" and len(sys.argv)>=2:
    options=command_input(options)
else:
    # The exit below is commented out, so despite the message the script
    # carries on with command.options unchanged.
    print("Need broker name and topics to continue.. exiting")
    #raise SystemExit(1)

# Client name: "logger-" plus the configured name, or a random number.
if not options["cname"]:
    cname="logger-"+str(random.randrange(1,10000))
else:
    cname="logger-"+str(options["cname"])

log_dir=options["log_dir"]
log_records=options["log_records"]
number_logs=options["number_logs"]
log=mlogger.m_logger(log_dir,log_records,number_logs) #create log object
print("Log Directory =",log_dir)
print("Log records per log =",log_records)
if number_logs==0:
    print("Max logs = Unlimited")
else:
    print("Max logs =",number_logs)

logging.info("creating client"+cname)
client=Initialise_clients(cname,mqttclient_log,False)#create and initialise client object
if options["username"] !="":
    client.username_pw_set(options["username"], options["password"])
client.sub_topics=options["topics"]
client.broker=options["broker"]
client.port=options["port"]

# Output format flags; csv_flag is read by message_handler and log_worker.
json_flag=False
csv_flag=False
if options["JSON"]:
    print("Logging JSON format")
    json_flag=True
#note for csv data logging input data must be in json format
if options["CSV"]:
    csv_flag=True
    print("Logging CSV format")
if options["CSV"]==False and options["JSON"]==False:
    print("logging plain data")
if options["storechangesonly"]:
    print("starting storing only changed data")
else:
    print("starting storing all data")

t = threading.Thread(target=log_worker) #start logger
t.start() #start logging thread

client.last_message=dict()
client.q=q #make queue available as part of client
print("topics",options["topics"])
try:
    try:
        res=client.connect(client.broker,client.port) #connect to broker
        client.loop_start() #start loop
    except Exception:
        print("connection to ",client.broker," failed")
        # The broker is passed as a logging argument; the old call gave
        # extra arguments to a message with no placeholders.
        logging.debug("connection to %s failed",client.broker)
        raise SystemExit("connection failed")
    print("connected")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("interrrupted by keyboard")
    print("end")
    client.loop_stop() #stop loop
finally:
    # Always stop the logging thread, including when the connection fails:
    # it is a non-daemon thread, so leaving it running kept the process
    # alive after SystemExit. Joining it replaces the fixed 5 second sleep
    # and returns once log_worker has closed the log file.
    Log_worker_flag=False #stop logging thread
    t.join()