-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathseg_bastar.py
More file actions
583 lines (466 loc) · 18.2 KB
/
seg_bastar.py
File metadata and controls
583 lines (466 loc) · 18.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
import numpy as np
from Utils.gridmaker import gridload
from queue import PriorityQueue
from Environments.dec_grid_rl import DecGridRL
from Logger.logger import Logger
import copy
class BA_Star(object):
'''
Online controller that takes incremental observations of the environment and
can achieve optimal and full coverage in certain conditions (see Shreyas'
notes).
'''
def __init__(self, internal_grid_rad, egoradius):
super().__init__()
# describes what subcell we are currently in
self._internal_grid_rad = internal_grid_rad
self._egoradius = egoradius
self.num_actions = 4
# reset policy (creates visited array and curr_x, curr_y)
self.reset()
def get_obs_vis(self, state):
# getting only the obstacle layer
obs = copy.deepcopy(np.squeeze(state)[2])
vis = copy.deepcopy(
self._visited[self._curr_x - 1:self._curr_x + 2, self._curr_y - 1:self._curr_y + 2])
# print("ori vis: " + str(vis))
# print("ori obs: " + str(obs))
if self._curr_a == 0:
r = 1
elif self._curr_a == 90:
r = 0
elif self._curr_a == 180:
r = 3
else:
r = 2
for i in range(r):
vis = np.rot90(vis)
if self._curr_a == 0:
r = 1
elif self._curr_a == 90:
r = 0
elif self._curr_a == 180:
r = 3
else:
r = 2
for i in range(r):
obs = np.rot90(obs)
# print("rot obs: " + str(obs))
# print("rot vis: " + str(vis))
return obs, vis
def b(self, p1, p2, vis, obs):
# print("p1: " + str(p1) + " obs: "
# + str(obs[p1]) + " vis: " + str(vis[p1]))
# print("p2: " + str(p2) + " obs: "
# + str(obs[p2]) + " vis: " + str(vis[p2]))
if vis[p1] == 0 and obs[p1] == 0:
if vis[p2] == 1 or obs[p2] == 1:
return 1
return 0
def is_bpoint(self, i, j, vis, obs):
points = {
1: (i + 1, j),
2: (i + 1, j + 1),
3: (i, j + 1),
4: (i - 1, j + 1),
5: (i - 1, j),
6: (i - 1, j - 1),
7: (i, j - 1),
8: (i + 1, j - 1)
}
u = self.b(points[1], points[2], vis, obs) + \
self.b(points[1], points[8], vis, obs) + \
self.b(points[3], points[2], vis, obs) + \
self.b(points[3], points[4], vis, obs) + \
self.b(points[5], points[4], vis, obs) + \
self.b(points[5], points[6], vis, obs) + \
self.b(points[7], points[6], vis, obs) + \
self.b(points[7], points[8], vis, obs)
if u >= 1:
return True
return False
def backtracking_point(self, vis, obs, start):
backtracking_list = []
for i in range(1, vis.shape[0]):
for j in range(1, vis.shape[1]):
# print("is bpoint: " + str((i, j)))
if vis[i, j] == 1 and self.is_bpoint(i, j, vis, obs):
backtracking_list.append((i, j))
goal = None
m = (vis.shape[0] * vis.shape[1]) + 1
# print(backtracking_list)
for p in backtracking_list:
dist = np.linalg.norm(np.array(p) - np.array(start))
if dist < m:
goal = p
m = dist
return goal
def pi(self, state):
# print("------")
obs, vis = self.get_obs_vis(state)
# print("current angle: " + str(self._curr_a))
# print("frontiering: " + str(self._frontiering))
# print("turning: " + str(self._turning))
# update the robot's maps
self.update_maps(state)
# check if we need to turn
init_boustro = False
if (obs[1, 2] == 1 or vis[1, 2] == 1) and self._turning is False and self._frontiering is False:
# print("attempting to init turn, prev turn: " + str(self._prev_turn))
# check if robot is able to turn, if not init frontier
if self._prev_turn == "right":
# check left if we last turned right
if obs[0, 1] == 1 or vis[0, 1] == 1:
self.init_frontier()
else:
# check right if we last turned left
if obs[2, 1] == 1 or vis[2, 1] == 1:
self.init_frontier()
# if able to turn, init turn
if self._frontiering is False:
self.init_turn()
# print("frontiering: " + str(self._frontiering))
# print("turning: " + str(self._turning))
# turn
if self._turning:
# keep turning unless obstacle in way
if self._prev_turn == "right":
if obs[2, 1] == 1 or vis[2, 1] == 1:
self.init_frontier()
else:
self.turn_right()
else:
if obs[0, 1] == 1 or vis[0, 1] == 1:
self.init_frontier()
else:
self.turn_left()
# reset turning boolean if we are done turning or frontier started
if (self._init_a + 180) % 360 == self._curr_a or self._frontiering:
self._turning = False
# print("frontiering: " + str(self._frontiering))
# print("turning: " + str(self._turning))
# execute frontier based exploration
if self._frontiering:
if self._goal_point == (self._curr_x, self._curr_y):
self._frontiering = False
self._path_map = None
self._goal_point = None
self._prev_a = self.to_open()
# print("Reached goal point! Action to open space: "
# + str(self._prev_a))
if self._prev_a == -1:
self.init_frontier()
self._prev_a = self.frontier_based()
else:
init_boustro = True
# self.init_boustro(state)
else:
self._prev_a = self.frontier_based()
# print("frontier output: " + str(self._prev_a))
u = self._prev_a
# update robot's controls
self.update_controls(u)
# initialize boustrophedon motion if necessary
if init_boustro:
self.init_boustro(state)
return u
def to_open(self):
if self._obstacles[self._curr_x + 1, self._curr_y] == 0 and self._free[self._curr_x + 1, self._curr_y] == 0 and self._visited[self._curr_x + 1, self._curr_y] == 0:
u = 0
elif self._obstacles[self._curr_x - 1, self._curr_y] == 0 and self._free[self._curr_x - 1, self._curr_y] == 0 and self._visited[self._curr_x - 1, self._curr_y] == 0:
u = 2
elif self._obstacles[self._curr_x, self._curr_y + 1] == 0 and self._free[self._curr_x, self._curr_y + 1] == 0 and self._visited[self._curr_x, self._curr_y + 1] == 0:
u = 1
elif self._obstacles[self._curr_x, self._curr_y - 1] == 0 and self._free[self._curr_x, self._curr_y - 1] == 0 and self._visited[self._curr_x, self._curr_y - 1] == 0:
u = 3
else:
u = -1
print("No open spaces!")
if u != -1:
if u == 0:
self._curr_a = 0
elif u == 1:
self._curr_a = 90
elif u == 2:
self._curr_a = 180
else:
self._curr_a = 270
return u
def init_boustro(self, state):
# if self._visited[self._curr_x + 1, self._curr_y] == 0 and self._obstacles[self._curr_x + 1, self._curr_y] == 0:
# self._curr_a = 0
# elif self._visited[self._curr_x, self._curr_y + 1] == 0 and self._obstacles[self._curr_x, self._curr_y + 1] == 0:
# self._curr_a = 90
# elif self._visited[self._curr_x - 1, self._curr_y] == 0 and self._obstacles[self._curr_x - 1, self._curr_y] == 0:
# self._curr_a = 180
# else:
# self._curr_a = 270
obs, vis = self.get_obs_vis(state)
# print("init boustro obs: " + str(obs))
# print("init boustro vis: " + str(vis))
if vis[0, 1] == 1 or obs[0, 1] == 1:
self._prev_turn = "left"
else:
self._prev_turn = "right"
def init_turn(self):
if self._prev_turn == "right":
self._prev_turn = "left"
else:
self._prev_turn = "right"
# print("initializing turn! setting prev turn to: " + str(self._prev_turn))
self._turning = True
self._init_a = self._curr_a
def init_frontier(self):
# init frontier based and flip 0s and 1s
free_copy = copy.deepcopy(self._visited)
free_copy = np.bitwise_not(
free_copy.astype('?')).astype(np.uint8)
free_copy = free_copy.astype('int')
# get explored positions
exp_inds = np.argwhere(free_copy > 0)
# mark not free positions as obstacles
free_copy[exp_inds[:, 0], exp_inds[:, 1]] = -1
# get obstacle positions
obs_inds = np.argwhere(self._obstacles > 0)
# mark obstacle positions
free_copy[obs_inds[:, 0], obs_inds[:, 1]] = -1
# get closest corner point
self._goal_point = self.backtracking_point(
self._visited, self._obstacles, (self._curr_x, self._curr_y))
# generate path to corner point
self._path_map = self.dijkstra_path_map(
free_copy, self._curr_x, self._curr_y, self._goal_point)
# set status to frontiering
self._frontiering = True
# print("visited: " + str(self._visited))
# print(self._free)
# print("obstacles: " + str(self._obstacles))
# print("goal point: " + str(self._goal_point))
# print("path map: " + str(self._path_map))
# print("starting frontier based!")
def turn_left(self):
# print("turning left!")
self._curr_a = (self._curr_a + 90) % 360
self._prev_a = (self._prev_a + 1) % 4
self._prev_turn = "left"
def turn_right(self):
# print("turning right!")
self._curr_a -= 90
if self._curr_a < 0:
self._curr_a = 270
self._prev_a -= 1
if self._prev_a < 0:
self._prev_a = 3
self._prev_turn = "right"
def update_controls(self, u):
# updating robot x, y based on controls
if u == 0:
self._curr_x += 1
elif u == 1:
self._curr_y += 1
elif u == 2:
self._curr_x -= 1
elif u == 3:
self._curr_y -= 1
# print("pos: " + str((self._curr_x, self._curr_y)))
def update_maps(self, state):
pos_map, free, obs, dist = state[0]
# print("free: " + str(free))
# print("obs: " + str(obs))
# visiting the current cell
self._visited[self._curr_x][self._curr_y] = 1
# update free and obstacle maps
for i in range(self._curr_x - self._egoradius, self._curr_x + self._egoradius + 1):
for j in range(self._curr_y - self._egoradius, self._curr_y + self._egoradius + 1):
m = i - self._curr_x + self._egoradius
n = j - self._curr_y + self._egoradius
self._free[i, j] = free[m, n]
self._obstacles[i, j] = obs[m, n]
def frontier_based(self):
pos = (self._curr_x, self._curr_y)
# print("path map inside frontier: " + str(self._path_map))
# determine action
u = -1
for i in range(self.num_actions):
if i == 0:
p = (pos[0] + 1, pos[1])
elif i == 1:
p = (pos[0], pos[1] + 1)
elif i == 2:
p = (pos[0] - 1, pos[1])
elif i == 3:
p = (pos[0], pos[1] - 1)
# print("p: " + str(p))
if self._path_map[p] == 1:
self._path_map[pos] = 0
u = i
break
if u == -1:
u = 0
if u == 0:
self._curr_a = 0
elif u == 1:
self._curr_a = 90
elif u == 2:
self._curr_a = 180
else:
self._curr_a = 270
return u
def in_bounds(self, x, y, grid):
return x >= 0 and y >= 0 and x < grid.shape[0] and y < grid.shape[1]
def get_valid_neighbors(self, x, y, grid, visited):
"""
Args:
x : the x position on the grid
y : the y position
grid : an array representing the environment, 0 is explored,
1 is unexplored, and -1 is obstacle
visited: an array representing whether we have visited a cell
or not, 1 is visited, 0 is not visited
Returns:
A list of valid neighbors and their coordinates
"""
neighbors = []
if self.in_bounds(x+1, y, grid) and visited[x+1][y] == 0 and grid[x+1][y] != -1:
neighbors.append((x+1, y))
if self.in_bounds(x-1, y, grid) and visited[x-1][y] == 0 and grid[x-1][y] != -1:
neighbors.append((x-1, y))
if self.in_bounds(x, y+1, grid) and visited[x][y+1] == 0 and grid[x][y+1] != -1:
neighbors.append((x, y+1))
if self.in_bounds(x, y-1, grid) and visited[x][y-1] == 0 and grid[x][y-1] != -1:
neighbors.append((x, y-1))
return neighbors
def dijkstra_path_map(self, grid, start_x, start_y, end_point):
"""
Args:
grid : an array representing the environment, 1 is explored,
0 is unexplored, and -1 is obstacle
start_x : starting x position
start_y : starting y position
Returns:
an array showing the shortest path to an unexplored cell
"""
open_set = PriorityQueue()
visited = np.zeros(grid.shape)
cost = -1*np.ones(grid.shape)
# adding starting point to the open set
open_set.put((0, (start_x, start_y)))
# main dijkstra loop
while not open_set.empty():
cell = open_set.get()
# check if cell has already been visited
if visited[cell[1][0]][cell[1][1]] == 1:
continue
# mark as visited, finalize cost
visited[cell[1][0]][cell[1][1]] = 1
cost[cell[1][0]][cell[1][1]] = cell[0]
# checking if cell is unexplored
if cell[1] == end_point:
break
# looping over all neighbors and updating their costs
neighbors = self.get_valid_neighbors(
cell[1][0], cell[1][1], grid, visited)
for neighbor in neighbors:
open_set.put((cell[0] + 1, neighbor))
# using cost array to make optimal path
path_array = np.zeros(grid.shape)
# handling case where we can't reach any unexplored points
if end_point is None:
return path_array
curr = end_point
curr_cost = cost[curr[0], curr[1]]
path_array[curr[0], curr[1]] = 1
# reset visited to not interfere with neighbor check
visited = 1 - visited
while curr[0] != start_x or curr[1] != start_y:
neighbors = self.get_valid_neighbors(
curr[0], curr[1], grid, visited)
# finding the neighbor with the minimum cost
for neighbor in neighbors:
if cost[neighbor[0], neighbor[1]] == curr_cost - 1:
curr_cost -= 1
curr = neighbor
break
# adding the current cell to the path
path_array[curr[0], curr[1]] = 1
assert path_array[start_x, start_y] == 1
return path_array
def reset(self):
'''
resets the policy to run again on a different environment
'''
internal_grid_rad = self._internal_grid_rad
# internal coordinate system for keeping track of where we have been
self._visited = np.zeros((2*internal_grid_rad, 2*internal_grid_rad))
# internal coordinate system for tracking observed cells and obstacles
self._obstacles = np.zeros((2*internal_grid_rad, 2*internal_grid_rad))
self._free = np.zeros((2*internal_grid_rad, 2*internal_grid_rad))
# setting the starting x,y
index = int((internal_grid_rad + internal_grid_rad % 2)/2 - 1)
self._curr_x = 2*index
self._curr_y = 2*index
self._curr_a = 0
self._prev_a = 0
self._init_a = 0
self._turning = False
self._frontiering = False
self._prev_turn = "right"
self._path_map = None
self._goal_point = None
# visiting the current cell
self._visited[self._curr_x][self._curr_y] = 1
if __name__ == "__main__":
# testing spanning tree coverage on dec_grid_rl environment
env_config = {
"numrobot": 1,
"maxsteps": 10000,
"collision_penalty": 5,
"egoradius": 1,
"done_thresh": 1,
"done_incr": 0,
"terminal_reward": 30,
"mini_map_rad": 0,
"comm_radius": 0,
"allow_comm": 0,
"map_sharing": 0,
"single_square_tool": 1,
"dist_reward": 0,
"dijkstra_input": 1,
"sensor_type": "square_sensor",
"sensor_config": {
"range": 1
}
}
grid_config = {
"grid_dir": "./Grids/bg2_100x100",
"gridwidth": 100,
"gridlen": 100,
"numgrids": 30,
"prob_obst": 0
}
'''Making the list of grids'''
gridlis = gridload(grid_config)
# train_set, test_set = gridload(grid_config)
env = DecGridRL(gridlis, env_config)
# logger stuff
makevid = True
exp_name = "stcEmptyGrid1"
logger = Logger(exp_name, makevid)
# testing bsa
bsa_controller = BA_Star(105, env_config["egoradius"])
state = env.reset() # getting only the obstacle layer
done = False
render = True
# simulating
while not done:
# determine action
action = bsa_controller.pi(state)
# step environment and save episode results
state, reward = env.step(action)
# determine if episode is completed
done = env.done()
# render if necessary
if render:
frame = env.render()
if(makevid):
logger.addFrame(frame)