-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdeviating_controller.py
More file actions
90 lines (74 loc) · 2.93 KB
/
deviating_controller.py
File metadata and controls
90 lines (74 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import h5py
import pickle
import numpy as np
import sys
import random
import arobocar
import os
import project
import importlib
import gym
# This controller just follows the PID recommendations most of the time but deviates to capture off-policy state
# this controller also records state
# runtime configuration parameters
# steering_noise - amount of noise to add to steering and throttle
# noise_probabilit - how often to deviate - set to zero to drive correctly
# deviation_duration - duration of deviation
def run(project,env,maxidx, filename="robocar.hdf5",
steering_noise=.15,
throttle_noise=.15,
noise_probability=0.01,
deviation_duration=40 ):
fullpathname=os.path.join(project.datadir,filename) # or None
#now open the h5 file
output = h5py.File(fullpathname, 'w')
recorder=project.recorder(output,project.config,maxidx)
actions = output.create_dataset('actions', (maxidx, 2))
actions.attrs['cols'] = "steering,throttle"
steering=0
throttle=0
deviating_cnt=0
#get started
observation=env.reset()
for h5idx in range(0,maxidx):
# get images and state from simulatorq
observation,reward,done,info = env.step([steering, throttle])
if(done):
print("Done")
recorder.record(observation, reward, done, info)
env.reset()
continue
#use the PID values by default
steering=info["PIDsteering"]
throttle=info["PIDthrottle"]
offset=info["pathoffset"] #distance from center of road
speed=observation[1][0]
idx=recorder.record(observation, reward, done, info)
actions[idx]=np.array([steering, throttle])
if deviating_cnt > 0 and ( abs(offset) > 75 or speed>1600) : # stop deviating if we ran off the road or go too fast
deviating_cnt = 0
print("Abort deviation")
if deviating_cnt>0: # while deviation
if deviation_type == 0:
steering = deviation_angle
else:
throttle = deviation_throttle
deviating_cnt -= 1
if (deviating_cnt == 0):
print("End deviation")
#decide when to start another deviation
if deviating_cnt == 0 and random.random() < noise_probability:
deviating_cnt = deviation_duration
if(random.random()>0.5):
deviation_angle = steering + random.random() * steering_noise - (steering_noise / 2)
deviation_type=0
print("** Begin Steering deviation {}".format(deviation_angle))
else:
deviation_throttle = throttle + random.random() * throttle_noise - (throttle_noise / 2)
deviation_type=1
print("** Begin Throttle deviation {}".format(deviation_throttle))
output.close()
if __name__ == "__main__":
# connect to environment
env = gym.make('ARoboCar-v0')
run( project, env, 10000)