-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtask.py
More file actions
130 lines (109 loc) · 4.65 KB
/
task.py
File metadata and controls
130 lines (109 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances
from scipy.spatial import distance
from environment import Environment
class Task():
    """Task (environment) that defines the goal and provides feedback to the agent.

    The observation per repeated action is a 16-value vector:
    13 BLE RSSI readings, the current (row, col) position, and the
    scaled distance to the target.
    """

    def __init__(self, grid_file, iBeacon_loc, labeled_data, runtime=5., target_pos=None, init_pose=None):
        """Initialize a Task object.

        Arguments:
            grid_file: grid description forwarded to Environment.
            iBeacon_loc: BLE beacon locations forwarded to Environment.
            labeled_data: labeled RSSI data forwarded to Environment.
            runtime: time limit for each episode.
            target_pos: target/goal (x, y) position; defaults to (17, 10).
            init_pose: initial position of the user in (x, y) dimensions.
        """
        # Simulation
        self.sim = Environment(grid_file, iBeacon_loc, labeled_data, runtime, init_pose)
        self.action_repeat = 1

        # State: 13 RSSI values + (row, col) location + distance to target
        # = 16 observations per repeated action.
        self.state_size = self.action_repeat * 16
        self.action_size = 1
        self.action_categories = 4

        # Statistics / bookkeeping
        self.prev_dis = 0          # distance to target on the previous step
        self.total_dis = 0         # cumulative |change in distance| this episode
        self.positions = []        # path taken during the current episode
        self.best_pos = []         # best path found so far (by episode score)
        self.best_score = -np.inf
        self.score = 0

        # Goal
        self.target_pos = target_pos if target_pos is not None else np.array([17, 10])
        self.init_dis = self.calc_distance(self.sim.pose, self.target_pos)
        self.dis_to_target = self.calc_distance(self.sim.pose, self.target_pos)

    def calc_distance(self, a, b):
        """Return the scaled Euclidean distance between positions a and b.

        The factor 3 converts grid units into the metric the rewards use
        (presumably the physical grid spacing — TODO confirm against
        Environment).
        """
        return 3 * distance.euclidean(a, b)

    def get_reward(self, done):
        """Use the sim's current pose to compute this step's reward.

        Rewards:
            + progress made toward the target (previous distance - current),
            + 20 for being at / right next to the target (distance <= 4),
            + 10 for being close to the target (distance < 12),
            - 1  otherwise (hovering far from the target).

        `done` is kept for interface compatibility; the original
        end-of-episode penalty was a no-op (`reward -= 0`) and is omitted.
        """
        reward = 0
        # Renamed local so it no longer shadows the scipy.spatial
        # `distance` module imported at file level.
        dist = self.calc_distance(self.sim.pose, self.target_pos)
        self.total_dis += abs(self.prev_dis - dist)

        # Positive reward for getting closer to the target.
        if self.prev_dis > dist:
            reward += (self.prev_dis - dist)

        # BUGFIX: the original tested `distance < 12` before
        # `distance <= 4`, which made the larger "reached the target"
        # bonus unreachable for any non-zero distance; the tighter
        # threshold must be checked first.
        if dist <= 4:       # reward for getting to the target
            reward += 20
        elif dist < 12:     # reward for being close to the target
            reward += 10
        else:
            reward -= 1     # penalty for hovering away from the target

        self.prev_dis = dist
        return reward

    def _build_state(self):
        """Assemble one 16-value observation as a plain list:
        13 RSSI values, (row, col) pose, distance to target."""
        obs = list(self.sim.BLE_vals.reshape(-1)[:13])
        obs.extend(self.sim.pose.reshape(-1)[:2])
        obs.append(self.calc_distance(self.sim.pose, self.target_pos))
        return obs

    def step(self, direction):
        """Advance the sim by `action_repeat` timesteps in `direction`.

        Returns:
            (next_state, reward, done) where next_state stacks one
            16-value observation per repeated action.
        """
        reward = 0
        obs = []
        for _ in range(self.action_repeat):
            done = self.sim.next_timestep(direction)
            self.dis_to_target = self.calc_distance(self.sim.pose, self.target_pos)
            # Reaching the target ends the episode even if the sim
            # itself has not signalled completion.
            if np.array_equal(self.sim.pose, self.target_pos):
                done = True
            reward += self.get_reward(done)
            self.score += reward
            self.positions.append(self.sim.pose)
            obs.extend(self._build_state())
        next_state = np.array(obs)
        if done:
            self.update_positions(self.score)
        return next_state, reward, done

    def reset(self):
        """Reset the sim to start a new episode and return the initial state."""
        self.sim.reset()
        self.prev_dis = 0
        self.total_dis = 0
        self.score = 0
        self.dis_to_target = self.calc_distance(self.sim.pose, self.target_pos)
        return np.concatenate([np.array(self._build_state())] * self.action_repeat)

    def update_positions(self, reward):
        """Save the best path found so far, ranked by total episode reward.

        BUGFIX: the original assigned `self.best_pos = self.positions`
        and then called `self.positions.clear()`, which emptied
        `best_pos` too because both names referred to the same list —
        the best path was always lost. Copy the path instead. The
        episode path is now also reset on non-best episodes so paths
        no longer accumulate across episodes.
        """
        if reward > self.best_score:
            self.best_pos = list(self.positions)
            self.best_score = reward
        self.positions = []