-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFazliCoverageAlgorithm.py
More file actions
179 lines (126 loc) · 5.41 KB
/
FazliCoverageAlgorithm.py
File metadata and controls
179 lines (126 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/python3
from Robot import RobotControlInput
import random
import GraphRoadmap
import Vector
import numpy as np
## Coverage algorithm from "The Effects of Communication and Visual Range on
# Multi-Robot Repeated Boundary Coverage" by Fazli and Mackworth
#
class FazliCoverageAlgorithm:
## Initializes the coverage algorithm.
#
# @param sensors (dict of sensors)
# <br> -- the sensors that this algorithm has access to
#
# @param cmdargs (object)
# <br> -- A command-line arguments object generated by `argparse`.
#
def __init__(self, sensors, cmdargs):
self._sensors = sensors;
self._cmdargs = cmdargs;
self._alpha = 0.99
self._gps = sensors['gps']
self._cur_path = [GraphRoadmap.GraphNode(self._gps.location())]
self._step_num = 0
self._node_props = dict()
for node in self._sensors['roadmap'].get_nodes():
self._node_props[node] = {'expected_reward': 0, 'RAR': np.random.uniform(0.1, 0.5), 'last_visit': 0}
self.debug_info = {'min_proximities': []}
## Select the next action for the robot
#
# This function uses the robot's radar and location information, as
# well as internally stored information about previous locations,
# to compute the next action the robot should take.
#
# @returns (`Robot.RobotControlInput` object)
# <br> -- A control input representing the next action the robot
# should take.
#
def select_next_action(self):
self._step_num += 1
msgs = self._sensors['recv'].get_recent_bcasts()
for msg in msgs:
if msg['msg']['action'] == 'cleared_node' and msg['msg']['node'] in self._node_props:
self._update_node_props_from_visit(msg['msg']['node'], msg['msg']['visit_time'], msg['msg']['avg_reward'])
for node in self._node_props:
self._node_props[node]['expected_reward'] += self._node_props[node]['RAR']
if self._gps.distance_to(self._cur_path[-1].location) <= 1:
if self._cur_path[-1] in self._node_props:
self._visit_cur_node(self._cur_path[-1])
final_node = self._cur_path.pop()
while len(self._cur_path) == 0:
self._choose_next_path(nearest_node=final_node)
direction = self._gps.angle_to(self._cur_path[-1].location)
speed = min(self._gps.distance_to(self._cur_path[-1].location), 15)
return RobotControlInput(speed, direction)
def _visit_cur_node(self, cur_node):
events = self._sensors['event'].get_events()
self._sensors['event'].handle_all_events()
time_since_last_visit = self._step_num - self._node_props[cur_node]['last_visit']
avg_reward = 0
if time_since_last_visit > 0:
avg_reward = len(events) / time_since_last_visit
else:
avg_reward = self._node_props[cur_node]['RAR']
self._sensors['bcast'].send_bcast({'action': 'cleared_node', 'node': cur_node, 'reward': avg_reward, 'visit_time': self._step_num})
self._update_node_props_from_visit(cur_node, self._step_num, avg_reward)
def _update_node_props_from_visit(self, node, visit_time, avg_reward):
# Since this may be called when receiving a bcast message,
# double-check that we haven't visited more recently than the
# bcast
if visit_time < self._node_props[node]['last_visit']:
return
self._node_props[node]['RAR'] = ((1 - self._alpha) * self._node_props[node]['RAR']) + (self._alpha * avg_reward)
self._node_props[node]['expected_reward'] = self._node_props[node]['RAR'] * (self._step_num - visit_time)
self._node_props[node]['last_visit'] = visit_time
def _choose_next_path(self, nearest_node=None):
roadmap = self._sensors['roadmap']
if nearest_node is None or nearest_node not in roadmap.get_nodes():
nearest_node = self._find_nearest_node()
best_path = [nearest_node]
best_path_avg_reward = float('-inf')
for node in roadmap.get_nodes():
path = roadmap.find_path(nearest_node, node)
# If we aren't already at the nearest node, add it to the path
if 0.01 < Vector.distance_between(nearest_node.location, self._gps.location()):
path.insert(0, nearest_node)
avg_path_reward = self._calc_avg_path_reward(path)
if best_path_avg_reward < avg_path_reward:
best_path = path
best_path_avg_reward = avg_path_reward
self._cur_path = best_path
# Reverse it so we can pop() each node off in the right order
self._cur_path.reverse()
def _calc_avg_path_reward(self, path):
roadmap = self._sensors['roadmap']
if len(path) == 0:
return 0
# First make a list of the cost to each node along the path
costs_list = []
costs_list.append(Vector.distance_between(self._gps.location(), path[0].location))
prev_node = path[0]
for node in path[1:]:
costs_list.append(costs_list[-1] + prev_node.get_neighbors()[node])
prev_node = node
total_reward = 0
for i in range(len(path)):
node = path[i]
# Add the current expected reward plus the reward that
# will accumulate in the time it takes us to get to
# this node
total_reward += self._node_props[node]['expected_reward'] + self._node_props[node]['RAR'] * costs_list[i]
return total_reward / costs_list[-1]
def _find_nearest_node(self, location=None):
if location is None:
location = self._gps.location()
roadmap = self._sensors['roadmap']
nearest_node = random.sample(roadmap.get_nodes(), 1)[0]
nearest_dist = Vector.distance_between(nearest_node.location, location)
for node in roadmap.get_nodes():
if Vector.distance_between(node.location, location) < nearest_dist:
nearest_node = node
nearest_dist = Vector.distance_between(node.location, location)
return nearest_node
def has_given_up(self):
return False;