forked from aimacode/aima-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvacuum.py
More file actions
572 lines (467 loc) · 22.7 KB
/
vacuum.py
File metadata and controls
572 lines (467 loc) · 22.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
'''
This is hosted on github.
This is heavily based on the example from Artificial Intelligence: A Modern Approach located here:
http://aima.cs.berkeley.edu/python/agents.html
http://aima.cs.berkeley.edu/python/agents.py
'''
# Standard library
import copy
import inspect
import random
from functools import partial
from types import MethodType

# Third-party
import matplotlib.pyplot as plt

# Project-local modules (star imports; relative order preserved so any
# name shadowing between them is unchanged)
from utils import *
from agents import *
from objects import *
from display import *
from comms import *
'''Implement Agents and Environments (Chapters 1-2).
The class hierarchies are as follows:
Object ## A physical object that can exist in an environment
Agent
RandomReflexAgent
...
Dirt
Wall
DeadCell
Fire
...
Environment ## An environment holds objects, runs simulations
XYEnvironment
VacuumEnvironment
EnvFrame ## A graphical representation of the Environment
'''
class Environment:
    """
    Abstract class representing an Environment. 'Real' Environment classes
    inherit from this. A subclass will typically implement:

        percept:        define the percept that an agent sees.
        execute_action: define the effects of executing an action, and
                        update the agent.performance slot.

    The environment keeps a list of .objects and .agents (a subset of
    .objects). Each agent has a .performance slot, initialized to 0. Each
    object has a .location slot, even though some environments may not
    need this.
    """

    # Mark: What does this do? It isn't checked in add_object.
    object_classes = []  # list of classes that can go into the environment

    def __init__(self):
        self.t = 0                # simulation time, incremented once per step()
        self.objects = []         # every object placed in the environment
        self.agents = []          # subset of .objects that are Agents
        self.perceptors = {}      # perceptor-class name -> perceptor instance
        self.communicator = None  # shared communicator, created lazily per agent type

    def percept(self, agent):
        """Build the agent's percept dict by querying each of its perceptors."""
        agent_percept = {}
        for perceptor_type in agent.perceptorTypes:
            # Each perceptor contributes its own key/value pairs.
            agent_percept.update(self.perceptors[perceptor_type.__name__].percept(agent))
        return agent_percept

    def execute_action(self, agent, action):
        """Change the world to reflect this action. Override this."""
        raise NotImplementedError

    def default_location(self, obj):
        """Default location for a new object with unspecified location."""
        return None

    def exogenous_change(self):
        """If there is spontaneous change in the world, override this."""
        pass

    def is_done(self):
        """By default, we're done when we can't find a live agent."""
        for agent in self.agents:
            if agent.is_alive():
                return False
        return True

    def step(self):
        """Run the environment for one time step.

        If the actions and exogenous changes are independent this method
        suffices; if they interact, override it.
        """
        if not self.is_done():
            self.t += 1
            # Give every agent its current percept before any agent acts.
            for a in self.agents:
                a.percepts = self.percept(a)
            # TODO: Implement comms
            if self.communicator:
                for from_agent in self.agents:
                    agents_seen = self.communicator.get_comms_network(from_agent)
                    for to_agent in agents_seen:
                        self.communicator.communicate(from_agent.percepts, from_agent, to_agent)
            # Collect all decisions first, then apply them, so earlier agents'
            # actions can't change what later agents perceive within one step.
            actions = [agent.program(agent.percepts) for agent in self.agents]
            for (agent, action) in zip(self.agents, actions):
                self.execute_action(agent, action)
            # Process any external events.
            self.exogenous_change()

    def run(self, steps=1000):
        """Run the Environment for the given number of time steps."""
        for _ in range(steps):
            if self.is_done():
                return
            self.step()

    def add_object(self, obj, location=None):
        """Add an object to the environment, setting its location.

        Also keeps track of objects that are agents. Shouldn't need to
        override this. Returns the object for chaining.
        """
        # BUG FIX: the original used `location or self.default_location(obj)`,
        # which would discard a falsy-but-valid location; test for None instead.
        obj.location = location if location is not None else self.default_location(obj)
        self.objects.append(obj)
        # If the object is an Agent, register it and initialize its support slots.
        if isinstance(obj, Agent):
            obj.performance = 0
            self.add_perceptor_for_agent(obj)
            self.add_communicator_for_agent(obj)
            self.agents.append(obj)
        return obj

    def add_perceptor_for_agent(self, agent):
        """Instantiate any perceptor types the agent declares that don't exist yet."""
        for pertype in agent.perceptorTypes:
            if not [p for p in self.perceptors.values() if type(p) is pertype]:
                self.perceptors[pertype.__name__] = pertype(self)

    def add_communicator_for_agent(self, agent):
        """Create the shared communicator from the agent's declared communicator type.

        Raises ValueError if a communicator of a different type already exists
        (TODO: support multiple communicator types).
        """
        if not agent.communicator:
            return
        if self.communicator is None:
            # First communicating agent defines the environment's communicator.
            self.communicator = agent.communicator(self)
        elif type(self.communicator) is not agent.communicator:
            raise ValueError('Communicator already exists')
        # Same type already present: nothing to do.
class XYEnvironment(Environment):
    """Environment on a 2D plane, with locations labelled by (x, y) points,
    either discrete or continuous. Agents perceive objects within a radius.
    Each agent has a .location slot holding a point such as (0, 1), and a
    .holding slot listing the objects being held."""

    #robot_images = {(1,0):'img/robot-right.gif',(-1,0):'img/robot-left.gif',(0,-1):'img/robot-up.gif',(0,1):'img/robot-down.gif'}

    def __init__(self, width=10, height=10):
        self.width = width
        self.height = height
        Environment.__init__(self)

    def objects_of_type(self, cls):
        """Return all objects that are instances of cls."""
        return [obj for obj in self.objects if isinstance(obj, cls)]

    def objects_at(self, location):
        """Return all objects exactly at a given location."""
        return [obj for obj in self.objects if obj.location == location]

    def find_at(self, cls, loc):
        """Return all objects of type cls exactly at loc."""
        return [o for o in self.objects_at(loc) if isinstance(o, cls)]

    def objects_near(self, location, radius):
        """Return all objects within radius of location.

        Held objects are excluded: their .location is an Agent instance,
        not a tuple.
        """
        radius2 = radius * radius  # compare squared distances; avoids sqrt
        return [obj for obj in self.objects
                if isinstance(obj.location, tuple) and distance2(location, obj.location) <= radius2]

    def execute_action(self, agent, action):
        """Process one agent action on the agent's behalf.

        Agents decide what to do, but the Environment actually applies the
        behavior. Implemented actions: 'TurnRight', 'TurnLeft', 'Forward',
        'Grab', 'Release'.
        """
        # TODO: Add stochasticity
        # TODO: Add actions on objects e.g. Grab(Target)
        if action == 'TurnRight':
            # Rotate -90 deg by stepping back through the headings cycle.
            agent.heading = self.turn_heading(agent.heading, -1)
        elif action == 'TurnLeft':
            # Rotate +90 deg by stepping forward through the headings cycle.
            agent.heading = self.turn_heading(agent.heading, +1)
        elif action == 'Forward':
            # Move one cell in the facing direction.
            self.move_to(agent, vector_add(agent.heading, agent.location))
        elif action == 'Grab':
            # Pick up every grabbable object co-located with the agent.
            objs = [obj for obj in self.objects_at(agent.location)
                    if (obj != agent and obj.is_grabbable(agent))]
            if objs:
                agent.holding += objs
                for o in objs:
                    # Setting the location to the carrying Agent (not a tuple)
                    # lets the display detect the object should be hidden, and
                    # records who its holder is.
                    o.location = agent
                    if isinstance(o, Dirt):
                        agent.performance += 100
        elif action == 'Release':
            # Drop the most recently grabbed object, if any; restoring a tuple
            # location adds it back to the display.
            if agent.holding:
                agent.holding.pop().location = agent.location

    def default_location(self, obj):
        """Return a random in-bounds location for an object placed without one.

        BUG FIX: the original called random.choice(self.width), which raises
        TypeError on an int argument; randrange draws an index in [0, bound).
        """
        return (random.randrange(self.width), random.randrange(self.height))

    def move_to(self, obj, destination):
        """Move an object to a new (adjacent) location unless blocked.

        Assumes single-cell moves, e.g. agent.location + agent.heading.
        The move is silently dropped if any object at the destination has
        its blocker flag set.
        """
        obstacles = [os for os in self.objects_at(destination) if os.blocker]
        if not obstacles:
            obj.location = destination

    def add_object(self, obj, location=(1, 1)):
        """Add obj at location and initialize its carrying slots."""
        Environment.add_object(self, obj, location)
        obj.holding = []
        obj.held = None
        return obj

    def add_walls(self):
        """Put walls around the entire perimeter of the grid."""
        for x in range(self.width - 1):
            self.add_object(Wall(), (x, 0))
            self.add_object(Wall(), (x + 1, self.height - 1))
        for y in range(self.height - 1):
            self.add_object(Wall(), (0, y + 1))
            self.add_object(Wall(), (self.width - 1, y))

    def turn_heading(self, heading, inc,
                    headings=((1, 0), (0, 1), (-1, 0), (0, -1))):
        """Return the heading to the left (inc=+1) or right (inc=-1) in headings."""
        # Note: default changed from a list to an equivalent tuple to avoid a
        # shared mutable default; it is only read (index/len), never mutated.
        return headings[(headings.index(heading) + inc) % len(headings)]
#______________________________________________________________________________
## Vacuum environment
class VacuumEnvironment(XYEnvironment):
    """The environment of [Ex. 2.12]: a walled 2D grid where an agent
    perceives dirty or clean, and bump (into obstacle) or not.

    Performance measure: +100 for each dirt cleaned, -1 for each turn taken.
    """

    object_classes = []

    def __init__(self, width=10, height=10):
        XYEnvironment.__init__(self, width, height)
        self.add_walls()

    def execute_action(self, agent, action):
        """Score a 'Suck' on a dirty cell, charge the per-turn cost, then
        delegate movement/grab handling to XYEnvironment."""
        if action == 'Suck' and self.find_at(Dirt, agent.location):
            agent.performance += 100
        agent.performance -= 1
        XYEnvironment.execute_action(self, agent, action)

    def exogenous_change(self):
        # The plain vacuum world has no spontaneous changes.
        pass
def _install_dirt_depletion_check(e):
    """Patch e.exogenous_change so the run ends when no loose dirt remains.

    Dirt being carried has an Agent instance as its .location, so only
    tuple-located dirt counts. When none is left, every agent is marked dead
    and its performance is set to the elapsed time e.t (time-to-clean).
    """
    old_exogenous_change = e.exogenous_change

    def new_exogenous_change(self):
        old_exogenous_change()
        if not [d for d in self.objects_of_type(Dirt) if isinstance(d.location, tuple)]:
            for a in self.agents:
                a.alive = False
                a.performance = self.t

    e.exogenous_change = MethodType(new_exogenous_change, e)


def NewVacuumEnvironment(width=10, height=10, config=None):
    """Factory: build a VacuumEnvironment pre-populated according to `config`.

    Supported configs: None (empty), 'shape of eight', 'center walls',
    'full dirt', 'sparse dirt', 'random dirt', 'random dirt and wall',
    'center walls w/ random dirt and fire'.
    """
    e = VacuumEnvironment(width=width, height=height)
    if config is None:
        pass
    elif config == 'shape of eight':
        # Two 2x2 wall blocks forming a figure-eight corridor, plus one dirt.
        for x in [2, 3]:
            for y in [2, 3]:
                e.add_object(Wall(), (x, y))
        for x in [2, 3]:
            for y in [5, 6]:
                e.add_object(Wall(), (x, y))
        e.add_object(Dirt(), location=(4, 5))
    elif config == 'center walls':
        # A 10x10 walled box in the center; interior filled with dead cells.
        for x in range(int(e.width / 2 - 5), int(e.width / 2 + 5)):
            for y in range(int(e.height / 2 - 5), int(e.height / 2 + 5)):
                if ((x == int(e.width / 2 - 5)) or (x == int(e.width / 2 + 4)) or
                        (y == int(e.height / 2 - 5)) or (y == int(e.height / 2 + 4))):
                    e.add_object(Wall(), (x, y))
                else:
                    e.add_object(DeadCell(), (x, y))
    elif config == 'full dirt':
        # Fill every non-wall cell with dirt; run ends when all dirt is gone.
        for x in range(e.width):
            for y in range(e.height):
                if not e.find_at(Wall, (x, y)):
                    e.add_object(Dirt(), location=(x, y))
        _install_dirt_depletion_check(e)
    elif config == 'sparse dirt':
        # Dirt every `stp` cells in both directions.
        stp = 3
        for x in range(0, e.width, stp):
            for y in range(0, e.height, stp):
                if not e.find_at(Wall, (x, y)):
                    e.add_object(Dirt(), location=(x, y))
        _install_dirt_depletion_check(e)
    elif config == 'random dirt':
        # Up to 100 dirt objects at distinct random free cells.
        for _ in range(100):
            # BUG FIX: the original drew the y coordinate with randrange(width).
            loc = (random.randrange(width), random.randrange(height))
            if not (e.find_at(Dirt, loc) or e.find_at(Wall, loc)):
                e.add_object(Dirt(), loc)
        _install_dirt_depletion_check(e)
    elif config == 'random dirt and wall':
        # Centered wall box (no dead cells) plus random dirt outside the box.
        for x in range(int(e.width / 2 - 5), int(e.width / 2 + 5)):
            for y in range(int(e.height / 2 - 5), int(e.height / 2 + 5)):
                if ((x == int(e.width / 2 - 5)) or (x == int(e.width / 2 + 4)) or
                        (y == int(e.height / 2 - 5)) or (y == int(e.height / 2 + 4))):
                    e.add_object(Wall(), (x, y))
        for _ in range(50):
            # BUG FIX: the original drew the y coordinate with randrange(width).
            loc = (random.randrange(width), random.randrange(height))
            # NOTE(review): the 5..14 exclusion hard-codes the center box of a
            # 20x20 grid — confirm if this config is used with other sizes.
            if not (e.find_at(Dirt, loc) or e.find_at(Wall, loc) or
                    (loc[0] > 5 and loc[0] < 14) and loc[1] > 5 and loc[1] < 14):
                e.add_object(Dirt(), loc)
    elif config == 'center walls w/ random dirt and fire':
        # Centered wall box with dead-cell interior, plus exogenous dirt/fire.
        for x in range(int(e.width / 2 - 5), int(e.width / 2 + 5)):
            for y in range(int(e.height / 2 - 5), int(e.height / 2 + 5)):
                if ((x == int(e.width / 2 - 5)) or (x == int(e.width / 2 + 4)) or
                        (y == int(e.height / 2 - 5)) or (y == int(e.height / 2 + 4))):
                    e.add_object(Wall(), (x, y))
                else:
                    e.add_object(DeadCell(), (x, y))

        # Custom behavior is patched onto exogenous_change() rather than
        # creating a new class (open question in the original: should it be?).
        def exogenous_dirt(self):
            # NOTE(review): uniform(0, 1) < 1.0 is (almost surely) always true,
            # so one dirt-placement attempt happens every step — confirm intent.
            if random.uniform(0, 1) < 1.0:
                loc = (random.randrange(self.width), random.randrange(self.height))
                if not (self.find_at(Dirt, loc) or self.find_at(Wall, loc)):
                    self.add_object(Dirt(), loc)

        def exogenous_fire(self):
            fires = self.objects_of_type(Fire)
            if fires:
                for f in fires:
                    if f.t == 0:
                        # Burned out: remove from the world.
                        f.destroy()
                        self.objects.remove(f)
                    else:
                        f.t -= 1
                        # 21% chance per step of spreading to an adjacent empty cell.
                        if random.uniform(0, 1) < 0.21:
                            empty_cells = [(x, y)
                                           for x in range(f.location[0] - 1, f.location[0] + 2)
                                           for y in range(f.location[1] - 1, f.location[1] + 2)
                                           if not self.objects_at((x, y))]
                            if empty_cells:
                                self.add_object(Fire(), random.choice(empty_cells))
            else:
                # No fire anywhere: ignite up to 5 new fires, trying at most 10
                # random cells for each (a while loop could get stuck forever).
                for _fire in range(5):
                    for _attempt in range(10):
                        # BUG FIX: y was drawn with randrange(1, self.width).
                        loc = (random.randrange(1, self.width), random.randrange(1, self.height))
                        if not self.objects_at(loc):
                            self.add_object(Fire(), loc)
                            break

        old_exogenous_change = e.exogenous_change

        def new_exogenous_change(self):
            old_exogenous_change()
            exogenous_dirt(self)
            exogenous_fire(self)

        e.exogenous_change = MethodType(new_exogenous_change, e)
    return e
#______________________________________________________________________________
def compare_agents(EnvFactory, AgentFactories, n=10, steps=100):
    """See how well each of several agents does in n instances of an environment.

    Pass in a factory (constructor) for environments, and several for agents.
    Creates n environment instances and runs each agent in deep copies of all
    of them for `steps`. Returns a list of (agent-factory, average-score) tuples.
    """
    envs = [EnvFactory() for _ in range(n)]
    results = []
    for factory in AgentFactories:
        # Each agent gets its own deep copy so runs don't contaminate each other.
        results.append((factory, test_agent(factory, steps, copy.deepcopy(envs))))
    return results
def test_agent(AgentFactory, steps, envs):
    """Run a fresh agent in each of `envs` for up to `steps` and return the
    mean elapsed simulation time across the runs.

    NOTE(review): despite the docstring of callers mentioning "score", this
    averages env.t (ticks until the run ended), not agent.performance —
    confirm against how compare_agents is used.
    """
    total = 0
    for run_idx, env in enumerate(envs, start=1):
        with Timer(name='Simulation Timer - Agent=%s' % run_idx, format='%.4f'):
            env.add_object(AgentFactory())
            env.run(steps)
            total += env.t
    return float(total) / len(envs)
#______________________________________________________________________________
def test0():
    """Demo: one greedy agent (sensor radius 3) in a 20x20 random-dirt world."""
    env = NewVacuumEnvironment(width=20, height=20, config="random dirt")
    frame = EnvFrame(env, cellwidth=30)
    env.add_object(GreedyAgentWithRangePerception(sensor_radius=3))
    frame.configure_display()
    frame.run()
    frame.mainloop()
def test1():
    """Demo: 18 random-reflex agents along the left wall of the fire world."""
    env = NewVacuumEnvironment(width=20, height=20, config="center walls w/ random dirt and fire")
    frame = EnvFrame(env, cellwidth=30)
    for row in range(1, 19):
        env.add_object(NewRandomReflexAgent(debug=False), location=(1, row)).id = row
    frame.configure_display()
    frame.run()
    frame.mainloop()
def test2():
    """Sweep sensor radius 0..9 and plot mean time-to-clean over 10 worlds."""
    env_factory = partial(NewVacuumEnvironment, width=20, height=20, config="random dirt")
    sensor_radii = range(10)
    agent_factories = [partial(NewGreedyAgentWithRangePerception, debug=False, sensor_radius=r)
                       for r in sensor_radii]
    results = compare_agents(env_factory, agent_factories, n=10, steps=2000)
    print(results)
    plt.plot(sensor_radii, [score for _, score in results], 'r-')
    plt.xlabel('sensor radius')
    plt.ylabel('time to fully clean')
    plt.show()
def test3():
    """Demo: 18 greedy agents (sensor radius 6) along the left wall of the fire world."""
    env = NewVacuumEnvironment(width=20, height=20, config="center walls w/ random dirt and fire")
    frame = EnvFrame(env, cellwidth=30)
    for row in range(1, 19):
        env.add_object(GreedyAgentWithRangePerception(sensor_radius=6),
                       location=(1, row)).id = row
    frame.configure_display()
    frame.run()
    frame.mainloop()
def test4():
    """Demo: 4 communicating greedy agents spaced down the left wall."""
    env = NewVacuumEnvironment(width=20, height=20, config="random dirt")
    frame = EnvFrame(env, cellwidth=30)
    for idx in range(1, 5):
        env.add_object(GreedyAgentWithRangePerception(sensor_radius=6, communication=True),
                       location=(1, idx * 3)).id = idx
    frame.configure_display()
    frame.run()
    frame.mainloop()
def test5():
    """Compare mean time-to-clean with and without agent communication.

    Runs 4 greedy agents in deep copies of 30 random-dirt worlds, once with
    communication on and once off, then plots the two averages.
    """
    EnvFactory = partial(NewVacuumEnvironment, width=20, height=20, config="random dirt")
    envs = [EnvFactory() for _ in range(30)]
    steps = 2000
    results = []
    for communication in [True, False]:
        total = 0
        # BUG FIX: the original reused loop variable `i` for both the env
        # counter and the agent index, so the inner loop clobbered the
        # counter used in the Timer label. Distinct names are used here.
        for run_idx, env in enumerate(copy.deepcopy(envs), start=1):
            with Timer(name='Simulation Timer - Agent=%s' % run_idx, format='%.4f'):
                for agent_idx in range(1, 5):
                    env.add_object(
                        GreedyAgentWithRangePerception(sensor_radius=6,
                                                       communication=communication),
                        location=(1, agent_idx * 3)).id = agent_idx
                env.run(steps)
                total += env.t
        results.append(float(total) / len(envs))
    plt.plot(['True', 'False'], [r for r in results], 'r-')
    plt.xlabel('communication')
    plt.ylabel('time to fully clean')
    plt.show()
def test6():
    """Demo: one communicating greedy agent in the small figure-eight world."""
    env = NewVacuumEnvironment(width=6, height=9, config="shape of eight")
    frame = EnvFrame(env, cellwidth=30)
    env.add_object(GreedyAgentWithRangePerception(sensor_radius=3, communication=True),
                   location=(1, 1)).id = 1
    frame.configure_display()
    frame.run()
    frame.mainloop()
def main():
    """Entry point: seed the RNG, then launch the demo scenario."""
    # Seeding with None draws from OS entropy so every run differs; pass a
    # fixed int instead to get repeatable outcomes.
    random.seed(None)
    test6()
if __name__ == "__main__":
    # Execute only when run as a script, not when imported as a module.
    main()