-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathplanner.py
More file actions
338 lines (275 loc) · 11.2 KB
/
planner.py
File metadata and controls
338 lines (275 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import math
import subprocess
from logger import log
from hardware.sensor.LDR import read_ldr
from hardware.actuator.LED import set_led
from hardware.sensor.soundSensor import get_random_sound_value
from hardware.actuator.LCD_Display import *
from hardware.sensor.temperature import read_temperature
from hardware.actuator.fan import control_fan_based_on_temperature
from hardware.sensor.ultrasonic import read_ultrasonic
from software.weather_api import get_weather
from software.mold_risk import check_mold_risk
from hardware.actuator.LCD_Display import *
from software.actuators.email import send_email_alert
from mqtt.mqtt_client import mqtt_callback
import requests
def get_sensor_data_and_create_problem_file():
    """Read every sensor once, publish the readings, and refresh the PDDL problem.

    The readings are published on the ``smartstacks/sensors`` MQTT topic.
    The planner problem file is rewritten only when both the indoor
    temperature and the indoor humidity are real numbers; every reading
    that fails its validity check is reported through ``log``.

    Returns:
        None.
    """
    print("Inside sensor data code")

    # Local sensors. Only the first value returned by read_ldr() is used.
    raw_light, _intensity = read_ldr()
    temp, humidity = read_temperature()
    distance = read_ultrasonic()
    sound_value = get_random_sound_value()

    # Outdoor conditions from the weather API feed the compound mold check.
    api_temp, api_humidity = get_weather()
    risk, mold_risk_level = check_mold_risk(temp, humidity, api_temp, api_humidity)

    # NOTE(review): the published 'mold_risk_level' carries `risk`, while the
    # planner below receives `mold_risk_level` - confirm this is intended.
    sensor_data = dict(
        inside_temperature=temp,
        inside_humidity=humidity,
        raw_light=raw_light,
        ultrasonic=distance,
        sound=sound_value,
        outside_temperature=api_temp,
        outside_humidity=api_humidity,
        mold_risk_level=risk,
    )
    print("Going to send mqtt")
    mqtt_callback(sensor_data, "smartstacks/sensors")

    # Only temperature and humidity gate the problem file; the remaining
    # readings are written as-is even when they are reported as NaN below.
    if not math.isnan(temp) and not math.isnan(humidity):
        print("Going to send to planner")
        print(temp, humidity, raw_light, sound_value, mold_risk_level, distance)
        write_problem_pddl(temp, humidity, raw_light, sound_value, mold_risk_level, distance)

    # Report each unusable reading, in a fixed order.
    nan_checked = (
        ("Temp", temp),
        ("Humidity", humidity),
        ("Light", raw_light),
        ("Sound", sound_value),
        ("Mold Risk", mold_risk_level),
    )
    for label, reading in nan_checked:
        if math.isnan(reading):
            log(f"Error: {label} is NaN")

    # 0 and 65535 are reported with the same message as NaN
    # (presumably sensor failure sentinels - TODO confirm).
    distance_unusable = distance == 0 or math.isnan(distance) or distance == 65535
    if distance_unusable:
        log("Error: Distance is NaN")
def write_problem_pddl(temp, humidity, raw_light, sound_value, mold_risk_level, distance,
                       filepath='ai_planning/problem.pddl'):
    """Write the planner problem file from the current numeric readings.

    Each argument is interpolated verbatim into one numeric ``(= ...)``
    initial-state fact of the ``smart-library`` problem, so a NaN float
    would be written as the text ``nan``; callers should pass real numbers.

    Args:
        temp: value for ``(temperature room)``.
        humidity: value for ``(humidity room)``.
        raw_light: value for ``(light-level room)``.
        sound_value: value for ``(sound-level room)``.
        mold_risk_level: value for ``(mold-risk room)``.
        distance: value for ``(ultrasonic-distance room)``.
        filepath: destination file. Defaults to the path that was previously
            hard-coded, so existing callers are unaffected.

    Raises:
        OSError: if ``filepath`` cannot be opened for writing.
    """
    problem_text = f"""
(define (problem smart-library)
(:domain environment-control)
(:objects
room - location
)
(:init
(= (temperature room) {temp})
(= (humidity room) {humidity})
(= (light-level room) {raw_light})
(= (sound-level room) {sound_value})
(= (mold-risk room) {mold_risk_level})
(= (ultrasonic-distance room) {distance})
)
(:goal
(and
(ideal-env)
)
)
)
"""
    # Explicit encoding keeps the output independent of the platform default.
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(problem_text)

    print("Numeric values written to problem.pddl:")
    summary = (
        ("temperature", temp),
        ("humidity", humidity),
        ("light level", raw_light),
        ("sound level", sound_value),
        ("mold risk", mold_risk_level),
        ("distance", distance),
    )
    for label, value in summary:
        print(f" {label}: {value}")
def send_pddl_files_and_get_plan(domain_file_path, problem_file_path,
                                 server_url='http://127.0.0.1:5000/plan', timeout=60):
    """Upload the domain and problem files to the planning server.

    The server is expected to answer with JSON containing a ``"plan"`` entry
    holding the planner's output lines. Lines that are blank or that contain
    one of the known planner-log keywords are removed.

    Args:
        domain_file_path: path of the PDDL domain file.
        problem_file_path: path of the PDDL problem file.
        server_url: planning endpoint; defaults to the local server.
        timeout: seconds to wait for the server before giving up, so a
            stalled server cannot block the caller forever.

    Returns:
        A list of the remaining output lines. An empty list is returned when
        a file cannot be read, the request fails, the response is not usable
        JSON, or the server returned no plan.
    """
    # Substrings (matched case-insensitively) that mark planner log output.
    noise_keywords = (
        "normalizing", "instantiating", "generating", "building", "translating",
        "writing", "done", "cpu", "wall-clock", "axiom", "mutex", "fact",
        "group", "queue", "removed",
    )

    try:
        with open(domain_file_path, 'rb') as domain_file, \
                open(problem_file_path, 'rb') as problem_file:
            files = {
                'domain': domain_file,
                'problem': problem_file,
            }
            # Request errors are handled in their own try block so that they
            # are reported separately from local file errors below
            # (RequestException is itself an OSError subclass).
            try:
                response = requests.post(server_url, files=files, timeout=timeout)
                response.raise_for_status()  # Raise error for HTTP failure
                data = response.json()
            except requests.exceptions.RequestException as e:
                print("Error sending PDDL files:", e)
                return []
            except ValueError as e:
                # Older requests versions raise a plain ValueError for a
                # body that is not valid JSON.
                print("Error decoding planner response:", e)
                return []
    except OSError as e:
        print("Error reading PDDL files:", e)
        return []

    if not isinstance(data, dict):
        print("Error: unexpected planner response:", data)
        return []

    # `or []` also covers an explicit `"plan": null` in the response.
    full_output = data.get("plan") or []
    if isinstance(full_output, str):
        # Tolerate a server that returns the output as one block of text.
        full_output = full_output.splitlines()

    # Extract only plan steps (ignore log lines)
    return [
        line for line in full_output
        if line.strip() and not any(keyword in line.lower() for keyword in noise_keywords)
    ]
def extract_plan_lines(planner_output):
    """Keep only the numbered plan steps from raw planner output.

    A line is kept when, after trimming and ignoring case, it is non-empty,
    does not start with a known log prefix, contains no known log keyword,
    begins with a digit, and contains a colon (e.g. ``"0: ACTION ARGS"``).

    Args:
        planner_output: iterable of output lines (strings).

    Returns:
        The kept lines, trimmed of surrounding whitespace, original case
        preserved, in their original order.
    """
    # Built once per call; str.startswith accepts a tuple of prefixes.
    log_prefixes = (
        'normalizing', 'instantiating', 'generating', 'computing',
        'building', 'processing', 'detecting', 'reordering',
        'translator', 'done', 'remove', 'writing',
    )
    log_keywords = ('cpu', 'wall-clock', 'mutex', 'axiom', 'proposition')

    plan_lines = []
    for line in planner_output:
        trimmed = line.strip()
        lowered = trimmed.lower()
        if not lowered:
            continue
        if lowered.startswith(log_prefixes):
            continue
        if any(kw in lowered for kw in log_keywords):
            continue
        # Accept plan lines like "0: ACTION ARGS"
        if lowered[0].isdigit() and ':' in lowered:
            plan_lines.append(trimmed)
    return plan_lines
def parse_plan_response(plan_lines):
    """Reduce plan lines to a list of lower-cased action names.

    Supported step shapes, each optionally preceded by a ``"<step>:"`` prefix:

    * ``ACTION ARG ...``     -> ``action``
    * ``action(arg ...)``    -> ``action``
    * ``(action arg ...)``   -> ``action``

    The parenthesised form used to yield an empty action name, because the
    text in front of ``(`` is empty; the name is now read from inside the
    parentheses. Steps without any action name are skipped rather than
    reported as ``""``.

    Args:
        plan_lines: iterable of plan step strings.

    Returns:
        Action names in plan order. Blank lines and ``;`` comment lines are
        ignored. Arguments are not returned.
    """
    parsed_plan = []
    for raw_line in plan_lines:
        line = raw_line.strip()
        if not line or line.startswith(";"):
            continue

        # Remove step number prefix if present (e.g., "1: ACTION ARGS")
        if ':' in line:
            line = line.split(':', 1)[1].strip()
        line = line.lower()

        open_idx = line.find('(')
        if open_idx != -1 and ')' in line:
            action = line[:open_idx].strip()
            if not action:
                # "(action arg ...)": nothing precedes '(', so the closing
                # parenthesis must follow it and the name is the first token.
                close_idx = line.find(')', open_idx)
                inner = line[open_idx + 1:close_idx].split()
                action = inner[0] if inner else ''
        else:
            parts = line.split()
            action = parts[0] if parts else ''

        if action:
            parsed_plan.append(action)
    return parsed_plan
def execute_plan(parsed_plan):
    """Run the recognised actions of a plan and publish the outcome.

    Action names that are not in the allow-list below are ignored. After
    the whole plan has been walked, the executed action names are published
    on ``smartstacks/plan`` and the collected actuator state on
    ``smartstacks/actuators``.

    Args:
        parsed_plan: iterable of action names, in execution order.
    """
    recognised_actions = (
        "adjust-light-to-level-three",
        "adjust-light-to-level-two",
        "adjust-light-to-level-one",
        "turn-on-light-to-level-one-very-dark",
        "turn-on-light-to-level-one-dark",
        "turn-off-light",
        "turn-on-fan-to-level-three",
        "turn-on-fan-to-level-two",
        "turn-on-fan-to-level-one",
        "turn-on-fan-to-reduce-humidity",
        "turn-off-fan",
        "display-quiet-in-lcd-display",
        "display-normal-in-lcd-display",
        "display-loud-in-lcd-display",
        "turn-off-lcd-display",
        "display-seat-occupied",
        "send-email-for-high-mold-index",
    )

    actuator_data = {}  # filled in by execute_actions as actions run
    mqtt_plan = []      # names of the actions that were actually executed
    for step in parsed_plan:
        if step not in recognised_actions:
            continue
        execute_actions(step, actuator_data)
        mqtt_plan.append(step)

    mqtt_callback(mqtt_plan, "smartstacks/plan")

    # Publish the resulting actuator status as well.
    print(f"Actuator : {actuator_data}")
    mqtt_callback(actuator_data, "smartstacks/actuators")
def execute_actions(action_name, actuator_data):
    """Carry out one planner action and record its effect.

    LED actions call ``set_led``; LCD actions call ``setRGB`` and
    ``setText``; the mold alert calls ``send_email_alert``. Fan and
    seat-occupied actions only update ``actuator_data`` here.

    NOTE(review): no fan hardware call is made in this function - confirm
    the fan is driven elsewhere.

    Args:
        action_name: name of the action to perform.
        actuator_data: dict updated in place with the resulting state
            (keys ``light_level``, ``fan_speed``, ``sound_level``,
            ``occupied``).
    """
    def announce():
        print(f"Executing action: {action_name}")

    # ---- LED brightness --------------------------------------------------
    if action_name == "adjust-light-to-level-three":
        announce()
        set_led(3)
        actuator_data['light_level'] = 3
        return
    if action_name == "adjust-light-to-level-two":
        announce()
        set_led(2)
        actuator_data['light_level'] = 2
        return
    if action_name in ["turn-on-light-to-level-one-very-dark", "turn-on-light-to-level-one-dark", "adjust-light-to-level-one"]:
        set_led(1)
        announce()
        actuator_data['light_level'] = 1
        return
    if action_name == "turn-off-light":
        set_led(0)
        announce()
        actuator_data['light_level'] = 0
        return

    # ---- Fan speed (state only) ------------------------------------------
    if action_name == "turn-on-fan-to-level-three":
        announce()
        actuator_data['fan_speed'] = 3
        return
    if action_name == "turn-on-fan-to-level-two":
        announce()
        actuator_data['fan_speed'] = 2
        return
    if action_name == "turn-on-fan-to-level-one":
        announce()
        actuator_data['fan_speed'] = 1
        return
    if action_name == "turn-on-fan-to-reduce-humidity":
        actuator_data['fan_speed'] = 3
        announce()
        return
    if action_name == "turn-off-fan":
        announce()
        actuator_data['fan_speed'] = 0
        return

    # ---- LCD noise indicator ---------------------------------------------
    if action_name == "display-quiet-in-lcd-display":
        announce()
        setRGB(0, 255, 0)
        actuator_data['sound_level'] = "Quiet"
        setText("Quiet")
        return
    if action_name == "display-normal-in-lcd-display":
        announce()
        setRGB(125, 125, 125)
        actuator_data['sound_level'] = "Normal"
        setText("Normal")
        return
    if action_name == "display-loud-in-lcd-display":
        announce()
        setRGB(255, 0, 0)
        actuator_data['sound_level'] = "Loud"
        setText("Loud")
        return
    if action_name == "turn-off-lcd-display":
        announce()
        setRGB(0, 0, 0)
        setText("")
        actuator_data['sound_level'] = "Off"
        return

    # ---- Occupancy and alerts --------------------------------------------
    if action_name == "display-seat-occupied":
        announce()
        actuator_data['occupied'] = True
        return
    if action_name == "send-email-for-high-mold-index":
        announce()
        send_email_alert(
            subject=" Mold Risk Alert - High",
            body="Mold risk has reached a high level in the Smart Library. Immediate action is recommended!"
        )
        return

    print(f"Unknown action: {action_name}")
def run_planner(poll_interval=5):
    """Run the sense -> plan -> act loop until an error occurs.

    Each cycle reads the sensors and refreshes the problem file, asks the
    planning server for a plan, executes it if one came back, and then
    sleeps for ``poll_interval`` seconds.

    Args:
        poll_interval: seconds to wait between cycles (default 5, the value
            that was previously hard-coded).

    NOTE(review): the ``try`` wraps the whole loop, so the first exception
    raised by any step is printed, logged, and ends the loop. Confirm that
    stopping, rather than retrying on the next cycle, is the intended
    behaviour.
    """
    # Imported here because this module has no top-level `import time`;
    # without it `time.sleep` only works if a wildcard import happens to
    # export `time`.
    import time

    domain = 'ai_planning/domain.pddl'
    problem = 'ai_planning/problem.pddl'

    try:
        while True:
            # Get sensor data and create problem file
            get_sensor_data_and_create_problem_file()

            # Send PDDL files and get plan
            planner_output = send_pddl_files_and_get_plan(domain, problem)

            if planner_output:
                plan_lines = extract_plan_lines(planner_output)
                parsed_plan = parse_plan_response(plan_lines)
                print("Actions to be performed:", parsed_plan)
                execute_plan(parsed_plan)
            else:
                print("No plan recieved. Environment is in ideal conditions")

            # Single wait shared by both outcomes.
            time.sleep(poll_interval)
    except Exception as e:
        print(f"Planner error: {e}")
        log(f"Planner error: {e}")