-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathget_data.py
More file actions
80 lines (74 loc) · 3.16 KB
/
get_data.py
File metadata and controls
80 lines (74 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import argparse
import os
import pickle

import numpy as np
# expert agent policy
# x is the position of the ego agent;
# y is the position of the other agent;
# goal is the position of the ego agent's goal
# returns the sampled action that minimizes cost C
def agent(x, y, goal, beta=1.0, n_samples=100, avoid_weight=0.75):
    """Pick the best of several randomly sampled 2-D actions for one agent.

    Candidate actions are drawn uniformly from the square [-1, 1]^2; any
    candidate whose Euclidean norm exceeds 1 is rescaled to unit norm.
    Each candidate ``u`` is scored with the cost

        C = (|x + u - goal| - |x - goal|)
            + avoid_weight * (|x - y| - |x + u - y|)

    i.e. the change in distance to the goal plus a weighted penalty for
    getting closer to the other agent.

    Args:
        x: position of the ego agent (length-2 array-like).
        y: position of the other agent (length-2 array-like).
        goal: position of the ego agent's goal (length-2 array-like).
        beta: scale applied to the negated cost before taking the argmax.
            For ``beta > 0`` the lowest-cost candidate is returned; for
            ``beta == 0`` all scores tie and the first candidate is
            returned; a negative value selects the highest-cost candidate.
        n_samples: number of candidate actions to draw (must be >= 1).
        avoid_weight: weight of the avoidance term in the cost.

    Returns:
        A length-2 NumPy array holding the selected action. It is a view
        into a freshly allocated sample array, so callers may modify it
        in place.
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    goal = np.asarray(goal, dtype=float)

    candidates = np.random.uniform(-1, 1, (n_samples, 2))

    # Rescale only the candidates that fall outside the unit disc; dividing
    # the others by exactly 1.0 leaves them unchanged.
    norms = np.linalg.norm(candidates, axis=1, keepdims=True)
    candidates /= np.maximum(norms, 1.0)

    next_pos = x + candidates
    cost_goal = np.linalg.norm(next_pos - goal, axis=1) - np.linalg.norm(x - goal)
    cost_avoid = np.linalg.norm(x - y) - np.linalg.norm(next_pos - y, axis=1)
    cost = cost_goal + avoid_weight * cost_avoid

    # exp() is monotonic, so argmax(exp(-beta * C)) == argmax(-beta * C).
    # Skipping the exponential avoids overflow to inf / underflow to 0 for
    # large |beta|, which would create ties and return the wrong sample.
    best = np.argmax(-beta * cost)
    return candidates[best, :]
# get demonstrations
# each interaction consists of 20 timesteps
# during interactions two vehicles attempt to reach their goals
# while avoiding collision
def get_dataset(args):
    """Simulate two-agent interactions and pickle the resulting demonstrations.

    For each of ``args.n_interactions`` interactions, both agents start at
    random positions and act for 20 timesteps. The ego agent's action comes
    from ``agent`` and is then perturbed according to ``args.noise_type``;
    the other agent's action comes from ``agent`` unperturbed.

    Two files are written (the ``data`` directory is created if missing):

    * ``data/demos.pkl`` -- a list with one entry per timestep. Each entry
      is a list of 51 rows ``[x0, x1, y0, y1, a0, a1]``: the first row holds
      the (noisy) demonstrated action, the remaining 50 hold counterfactual
      actions obtained by adding uniform noise in ``[-delta, +delta]`` to
      each action component.
    * ``data/demos_history.pkl`` -- a list with one
      ``(state_history, action)`` tuple per timestep, where
      ``state_history`` is an ``(args.n_history, 4)`` array of the most
      recent states (oldest first) and ``action`` is the demonstrated action.

    Args:
        args: namespace providing
            ``n_interactions`` (int), ``n_history`` (int),
            ``noise_type`` (str), ``sigma`` (float) and ``delta`` (float).
            ``noise_type`` may be ``"uniform"`` (uniform noise in
            ``[-sigma, +sigma]``), ``"gaussian"`` (zero-mean normal noise
            with standard deviation ``sigma``) or ``"random"`` (with
            probability ``sigma``, uniform noise in ``[-sigma, +sigma]``).
            Any other value leaves the action unperturbed.
    """
    dataset = []
    dataset_history = []
    goal_x = np.array([10., 0.])
    goal_y = np.array([0., 10.])
    # NOTE(review): the history buffer is created once and is not cleared
    # between interactions, so the first entries of every interaction after
    # the first still contain states from the previous one -- confirm that
    # this is intended.
    state_history = np.zeros((args.n_history, 4))
    for _ in range(args.n_interactions):
        # initial position of the two vehicles
        x = np.random.uniform([-10, -10], [0, 10], 2)
        y = np.random.uniform([-10, -10], [10, 0], 2)
        for _step in range(20):
            # optimal action for ego agent
            action = agent(x, y, goal_x)
            # add noise of the specified type
            if args.noise_type == "uniform":
                action += np.random.uniform(-args.sigma, +args.sigma, 2)
            elif args.noise_type == "gaussian":
                action += np.random.normal(0, args.sigma, 2)
            elif args.noise_type == "random":
                # sigma doubles as the probability of perturbing this step
                if np.random.rand() < args.sigma:
                    action += np.random.uniform(-args.sigma, +args.sigma, 2)
            # optimal action for the other agent
            u2 = agent(y, x, goal_y)
            state = np.array([x[0], x[1], y[0], y[1]])
            state_row = state.tolist()
            datapoint = [state_row + action.tolist()]
            # sample counterfactual actions: each component is shifted by
            # uniform noise in [-delta, +delta]
            for _sample in range(50):
                action1 = action + np.random.uniform(-args.delta, +args.delta, 2)
                datapoint.append(state_row + action1.tolist())
            dataset.append(datapoint)
            # FIFO buffer for the state history: drop the oldest row and
            # append the current state as the newest one
            state_history[:-1, :] = state_history[1:, :]
            state_history[-1, :] = state
            dataset_history.append((np.copy(state_history), np.copy(action)))
            # both agents move only after the datapoint has been recorded
            x += action
            y += u2
    # Create the output directory up front and close each file
    # deterministically so the pickles are fully flushed to disk.
    os.makedirs("data", exist_ok=True)
    with open("data/demos.pkl", "wb") as demos_file:
        pickle.dump(dataset, demos_file)
    with open("data/demos_history.pkl", "wb") as history_file:
        pickle.dump(dataset_history, history_file)
    print("dataset has this many state-action pairs:", len(dataset))
# save the specified number of demonstrations
if __name__ == "__main__":
    # Command-line entry point: parse the generation settings and build the
    # dataset. Flags and defaults are unchanged; only help text is added.
    parser = argparse.ArgumentParser(
        description="Generate noisy two-agent demonstrations and save them "
                    "as pickle files under data/.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('--n_interactions', type=int, default=10,
                        help="number of interactions to simulate "
                             "(20 timesteps each)")
    parser.add_argument('--n_history', type=int, default=5,
                        help="number of most recent states kept in each "
                             "state-history entry")
    # Deliberately not restricted with `choices`: any value other than the
    # three listed ones results in noise-free actions.
    parser.add_argument('--noise_type', default="uniform",
                        help="noise added to the ego action: 'uniform', "
                             "'gaussian' or 'random'; any other value adds "
                             "no noise")
    parser.add_argument('--sigma', type=float, default=0.5,
                        help="noise magnitude: half-width for 'uniform', "
                             "standard deviation for 'gaussian'; for "
                             "'random' it is both the per-step probability "
                             "of adding noise and the uniform half-width")
    parser.add_argument('--delta', type=float, default=0.5,
                        help="half-width of the uniform perturbation used "
                             "to sample counterfactual actions")
    args = parser.parse_args()
    get_dataset(args)