-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhome.py
More file actions
executable file
·282 lines (239 loc) · 7.38 KB
/
home.py
File metadata and controls
executable file
·282 lines (239 loc) · 7.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#!/usr/bin/python2
from flask import Flask, render_template, make_response, jsonify
import datetime, random, time
from threading import Timer, Thread
import RPi.GPIO as GPIO
import serial
#from flask_cors import CORS
import logging, chromalog, re
import random
from serial import SerialException
# Root logger with chromalog's coloured output; DEBUG is the most verbose level.
chromalog.basicConfig(level=logging.DEBUG) #For colored debug terminal #change from .INFO to .DEBUG for more detail
logger = logging.getLogger()
app = Flask(__name__)
#CORS(app, resources=r'/pump/(on|/off)')
# Serial port shared by the polling code: /dev/serial0 at 4800 baud, reads
# give up after 5 seconds.
ser=serial.Serial('/dev/serial0',4800, timeout=5)
#Setup GPIO
GPIO.setwarnings(False) #remove for debugging
GPIO.setmode(GPIO.BCM) # pin numbers below are BCM channel numbers
# Pin 23: input with pull-up; an edge callback is attached to it near the end
# of the file.
GPIO.setup(23,GPIO.IN,pull_up_down=GPIO.PUD_UP)
# Pin 24: output, starts low. NOTE(review): only referenced from commented-out
# code in test_callback -- presumably unused now; confirm before removing.
GPIO.setup(24,GPIO.OUT, initial=GPIO.LOW)
# Pin 27: pump relay. Starts HIGH because the relay is active low, so the pump
# is off until a route drives the pin LOW.
GPIO.setup(27,GPIO.OUT, initial=GPIO.HIGH) #Relay default:off (active low)
#serial polling
def update_thread():
    """Read one sensor line from the serial port and re-arm the poll timer.

    Flushes the serial input buffer, waits 0.6 s, then reads a single line
    from the module-level ``ser`` port.  Non-ASCII characters are stripped
    from the line.  When the line contains ``"TouchValue: "`` the text after
    it is parsed as an integer and stored in the global ``touchval``;
    anything else is logged as a warning and ``touchval`` is left unchanged.

    Unless the read is interrupted by ``KeyboardInterrupt``, a 15-second
    ``Timer`` that calls this function again is started before returning, so
    a single initial call keeps the polling going.
    """
    # Function-scope import: sys.exit() is used below but the file's import
    # block does not import sys, which made that branch raise NameError.
    import sys

    global touchval

    logger.debug( "update thread active")
    line = "Default"  # placeholder kept when the read fails
    try:
        ser.flushInput()
        time.sleep(0.6)
        line = ser.readline()
    except UnicodeDecodeError: #never seems to trigger, but supresses warning
        logger.warning("Invalid Unicode Characters recived")
    except SerialException:
        logger.warning("Serial Exception raised:")
    except KeyboardInterrupt:
        # Python normally delivers KeyboardInterrupt to the main thread only,
        # so this is a safety net; exiting here ends polling without re-arming.
        logger.info("Keyboard Exception caught by Serial Thread")
        sys.exit()

    # On Python 3 pyserial returns bytes; turn them into text so the regex
    # and substring test below work.  On Python 2 ``str`` is already the byte
    # string type, so this is a no-op there.
    if not isinstance(line, str):
        line = line.decode("ascii", "ignore")

    # Drop anything outside 7-bit ASCII (line noise on the serial link).
    line = re.sub(r'[^\x00-\x7f]', r'', line)
    logger.debug(line)

    if "TouchValue: " in line:
        linenum = line.split("TouchValue: ", 1)[1]
        try:
            touchval = int(linenum)
            logger.info( "valid number recived: "+str(touchval))
        except ValueError:
            logger.warning( "invalid number: "+linenum)
    else:
        logger.warning ("invalid string recived")

    # Re-arm.  The timer is a daemon thread so that this endless chain of
    # timers cannot keep the interpreter alive after the main thread exits.
    t = Timer(15.0, update_thread)
    t.daemon = True
    t.start()
def test_callback(channel): #inverted PICAXE serial input, no longer needed
    """Do nothing; placeholder GPIO edge callback.

    ``channel`` is accepted and ignored.  The function is still passed to
    ``GPIO.add_event_detect`` for pin 23 near the end of the file, which is
    why it has not been deleted yet.  The commented-out lines are the former
    body, kept for reference.
    """
    #GPIO.output(24, not GPIO.input(23))
    #print "callback activated"
    pass #todo: remove function entirely, remove call of it
# Shared state read and written by the Flask routes, the serial polling code
# and the cycle timers.
#
# These are initialised unconditionally.  They used to be created only under
# ``if __name__ == "__main__":``, so importing this module (for example from
# a WSGI server) left them undefined and the routes raised NameError.  The
# module-level ``global`` statements were no-ops and have been dropped.
count = 0                    # incremented each time "/" or "/json" is served
touchval = -1                # last parsed sensor value; -1 until one arrives
pumpstate = 'off (default)'  # human-readable pump mode shown to clients
cycleid = 0                  # id of the active timed cycle
def timeoff(*mycycleid):
    """Begin the "off" phase of a timed pump cycle.

    The first positional argument is the id of the cycle this call belongs
    to.  The function is called directly by the cycle routes and by the
    ``Timer`` armed in ``timeon``.

    Returns:
        0 -- relay switched off and ``timeon`` scheduled ``secsoff`` seconds
             from now.
        1 -- ``pumpstate`` no longer contains "cycle"; nothing was changed.
        2 -- the global ``cycleid`` differs from the id passed in (a newer
             cycle has started); nothing was changed.
        3 -- ``secsoff`` is 0: relay switched off and the cycle ends here.

    Raises:
        IndexError: if called without any argument.
    """
    global pumpstate

    own_id = mycycleid[0]

    if cycleid != own_id:
        logger.warning("cycle id changed, currently " + str(cycleid) + "; old cycle " + str(own_id) + " terminating")
        return 2

    if secsoff == 0:
        logger.info("time off set to zero, ending cycle")
        GPIO.output(27, GPIO.HIGH)  # relay is active low: HIGH = pump off
        pumpstate = "off (cycle ended)"
        return 3

    if "cycle" not in pumpstate: #and (touchval>9000)):
        logger.warning("timer cycle terminated; mode not set to cycle") #todo: specifiy cause of termination (water level or alternate command)
        return 1

    GPIO.output(27, GPIO.HIGH)  # relay is active low: HIGH = pump off
    pumpstate = "cycle off"
    logger.info("starting off cycle of " + str(secsoff) + "seconds, with cycleID " + str(own_id) + ".")
    t = Timer(secsoff, timeon, [cycleid])
    # Daemon timer: a pending phase (possibly hours long) must not keep the
    # interpreter alive after shutdown or fire once the GPIO pins have been
    # cleaned up.
    t.daemon = True
    t.start()
    return 0
def timeon(*mycycleid):
    """Begin the "on" phase of a timed pump cycle.

    The first positional argument is the id of the cycle this call belongs
    to.  The function is called directly by the cycle routes and by the
    ``Timer`` armed in ``timeoff``.

    Returns:
        0 -- relay switched on and ``timeoff`` scheduled ``secson`` seconds
             from now.
        1 -- ``pumpstate`` no longer contains "cycle"; nothing was changed.
        2 -- the global ``cycleid`` differs from the id passed in (a newer
             cycle has started); nothing was changed.
        3 -- ``secson`` is 0: relay switched on and the cycle ends here.

    Raises:
        IndexError: if called without any argument.
    """
    global pumpstate

    own_id = mycycleid[0]

    if cycleid != own_id:
        logger.warning("cycle id changed, currently " + str(cycleid) + "; old cycle " + str(own_id) + " terminating")
        return 2

    if secson == 0:
        logger.info("time on set to zero, ending cycle")
        GPIO.output(27, GPIO.LOW)  # relay is active low: LOW = pump on
        pumpstate = "on (cycle ended)"
        return 3

    if "cycle" not in pumpstate: #and (touchval>9000)): # uncomment when in action, commented for testing.
        logger.warning(pumpstate)
        logger.warning("timer cycle terminated; mode no longer set to cycle") #todo: specifiy cause of termination (water level or alternate command)
        return 1

    GPIO.output(27, GPIO.LOW)  # relay is active low: LOW = pump on
    pumpstate = "cycle on"
    logger.info("starting on cycle of " + str(secson) + "seconds, with cycleID " + str(own_id) + ".")
    t = Timer(secson, timeoff, [cycleid])
    # Daemon timer: a pending phase (possibly hours long) must not keep the
    # interpreter alive after shutdown or fire once the GPIO pins have been
    # cleaned up.
    t.daemon = True
    t.start()
    return 0
def _start_cycle(on_text, off_text, unit_seconds, on_label, off_label):
    """Validate the requested durations and start a new timed pump cycle.

    Args:
        on_text: "on" duration exactly as it appeared in the URL.
        off_text: "off" duration exactly as it appeared in the URL.
        unit_seconds: number of seconds in one unit of the two durations.
        on_label: text logged after the "on" duration.
        off_label: text logged after the "off" duration.

    Returns:
        The rendered ``pumpset.html`` page on success, or a
        ``(message, 400)`` tuple when a duration is not a finite,
        non-negative number.  Nothing is modified in the error case.
    """
    global secson, secsoff, pumpstate, cycleid

    # Parse both values before touching any shared state, so a bad second
    # value cannot leave a running cycle with a half-updated configuration.
    try:
        on_seconds = float(on_text) * unit_seconds
        off_seconds = float(off_text) * unit_seconds
    except ValueError:
        return "on and off durations must be numbers", 400

    # A negative interval makes the timers fire immediately, which would
    # toggle the relay in a tight loop; NaN and infinity are meaningless
    # here.  Comparisons with NaN are False, so one range test covers all.
    infinity = float("inf")
    if not (0 <= on_seconds < infinity and 0 <= off_seconds < infinity):
        return "on and off durations must be finite and not negative", 400

    # The running cycle stops itself when it sees a different cycleid, so the
    # new id must never equal the current one.
    previous_id = globals().get("cycleid")
    new_id = random.randrange(10000)
    while new_id == previous_id:
        new_id = random.randrange(10000)

    secson = on_seconds
    secsoff = off_seconds
    cycleid = new_id
    pumpstate = "cycle"
    logger.info("starting timed cycle of " + str(on_text) + on_label + str(off_text) + off_label + str(cycleid) + ".")

    # Start with the "on" phase unless its length is zero.
    if secson != 0:
        timeon(cycleid)
    else:
        timeoff(cycleid)
    return render_template("pumpset.html", state='Cycling')


@app.route("/pump/cycle/<hrson>/<hrsoff>")
def pumpduty(hrson,hrsoff):
    """Start a timed pump cycle; both durations are given in hours.

    Responds with HTTP 400 when either value is not a finite, non-negative
    number.
    """
    return _start_cycle(hrson, hrsoff, 60 * 60, " Hours on and ", " hours off with cycleID ")


@app.route("/pump/cyclem/<hrson>/<hrsoff>")
def pumpdutym(hrson,hrsoff):
    """Start a timed pump cycle; both durations are given in minutes.

    Responds with HTTP 400 when either value is not a finite, non-negative
    number.
    """
    return _start_cycle(hrson, hrsoff, 60, " Minutes on and ", " Minutes off with cycleID ")
@app.route("/pump/on")
def pumpon():
    """Switch the pump on and render the confirmation page."""
    global pumpstate
    # Pin 27 drives the relay, which is active low.
    GPIO.output(27, GPIO.LOW)
    pumpstate = 'on'
    return render_template("pumpset.html", state='on')
@app.route("/pump/off")
def pumpoff():
    """Switch the pump off and render the confirmation page."""
    global pumpstate
    # Pin 27 drives the relay, which is active low.
    GPIO.output(27, GPIO.HIGH)
    pumpstate = "off"
    return render_template("pumpset.html", state='off')
@app.route("/json")
def jsonout():
    """Return the current pool and pump status as JSON.

    Each call also increments the global request counter ``count``.
    """
    global count

    stamp = datetime.datetime.now().strftime("%Y-%m-%d %I:%M:%S %p")

    # Lowest expected sensor reading (measured 8400, rounded for simple
    # math).  The original author noted the highest as 18500 (measured
    # 18700, rounded), which makes the result below a 0-100 scale.
    floor_reading = 8500
    level = (touchval - floor_reading) / 100

    count += 1

    payload = {
        'title': 'Pool Data',
        'time': stamp,
        'poollevel': level,
        'count': count,
        'touchval': touchval,
        'pumpstate': pumpstate,
        'cycleid': cycleid
    }
    return jsonify(**payload)
@app.route("/")
def index():
    """Render the status page and mark the response as non-cacheable.

    Each call also increments the global request counter ``count``.
    """
    global count

    stamp = datetime.datetime.now().strftime("%Y-%m-%d %I:%M:%S %p")

    # Lowest expected sensor reading (measured 8400, rounded for simple
    # math).  The original author noted the highest as 18500 (measured
    # 18700, rounded), which makes the result below a 0-100 scale.
    floor_reading = 8500
    level = (touchval - floor_reading) / 100

    count += 1

    page = render_template(
        'index.html',
        title='Pool Data',
        time=stamp,
        poollevel=level,
        count=count,
        touchval=touchval,
        pumpstate=pumpstate,
        cycleid=cycleid
    )
    resp = make_response(page)
    resp.headers['Cache-Control'] = 'no-cache,no-store,must-revalidate'
    return resp
@app.route('/test')
def testpage():
    """Return a fixed HTML snippet showing that the server is reachable."""
    body = "Hello World! <br> Sucessful Test! <br> <a href='/'> Back Home </a>"
    return body
#setup GPIO callbacks
GPIO.add_event_detect(23, GPIO.BOTH, callback=test_callback)

# Start serial polling and the web server when run as a script.  The GPIO
# pins are released however this block ends: normal return, Ctrl+C or any
# other exception.
try:
    if __name__ == "__main__":
        updateThread = Thread(target=update_thread)
        # Daemon thread so a blocked serial read cannot delay interpreter exit.
        updateThread.daemon = True
        updateThread.start()
        # use_reloader=False: with debug=True Flask otherwise re-runs this
        # script in a child process, so two processes would open the serial
        # port and poll it at the same time.
        # NOTE(review): debug=True also serves the interactive debugger on
        # every interface (host 0.0.0.0); consider turning it off outside
        # development.
        app.run(debug=True, use_reloader=False, port=80, host='0.0.0.0')
except KeyboardInterrupt:
    logger.info("Recived Keyboard interrupt, quitting threads.")
    raise
finally:
    GPIO.cleanup() #clean up GPIO on every exit path