-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathlocal_history.patch
More file actions
183 lines (176 loc) · 7.04 KB
/
local_history.patch
File metadata and controls
183 lines (176 loc) · 7.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
Index: flow/envs/test.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/flow/envs/test.py b/flow/envs/test.py
--- a/flow/envs/test.py
+++ b/flow/envs/test.py (date 1709550721974)
@@ -87,7 +87,7 @@
dtype=np.float32)
position = Box(
- low=-50,
+ low=-100,
high=2000,
shape=(3, self.initial_vehicles.num_rl_vehicles),
dtype=np.float32)
@@ -150,20 +150,156 @@
ids = self.k.vehicle.get_rl_ids()
max_speed = self.k.network.max_speed()
max_acc = self.env_params.additional_params["max_accel"]
- num_rl_veh = self.k.vehicle.num_rl_vehicles
# the reward to forward the vehicles
- average_rl_speed = sum(self.k.vehicle.get_speed(ids))/num_rl_veh
- rv = average_rl_speed / max_speed
+ rv = 0
+ ra = 0
+
+ if self.k.vehicle.num_rl_vehicles != 0:
+ average_rl_speed = sum(self.k.vehicle.get_speed(ids)) / self.k.vehicle.num_rl_vehicles
+ rv = average_rl_speed / max_speed
+ if average_rl_speed - max_speed >= 5:
+ rv = - 10
- # the reward to control the acceleration-behavior
- acc = [0 for _ in range(num_rl_veh)]
+ # the panel to control the acceleration-behavior
if rl_actions is not None:
- acc = rl_actions
+ ra += np.average(rl_actions) / 2 * max_acc + 0.5
- #ra = - sum([_ ** 2 for _ in acc]) ** 0.5 / (num_rl_veh * max_acc)
- reward = rv
+ # ra = - sum([_ ** 2 for _ in acc]) ** 0.5 / (num_rl_veh * max_acc)
+ # model performance crashed when involved a in reward
+ reward = 0.5 * rv + 0.5 * ra
+
+ return rv
+
+ def step(self, rl_actions):
+ """Advance the environment by one step.
+
+ Assigns actions to autonomous and human-driven agents (i.e. vehicles,
+ traffic lights, etc...). Actions that are not assigned are left to the
+ control of the simulator. The actions are then used to advance the
+ simulator by the number of time steps requested per environment step.
+
+ Results from the simulations are processed through various classes,
+ such as the Vehicle and TrafficLight kernels, to produce standardized
+ methods for identifying specific network state features. Finally,
+ results from the simulator are used to generate appropriate
+ observations.
+
+ Parameters
+ ----------
+ rl_actions : array_like
+ an list of actions provided by the rl algorithm
+
+ Returns
+ -------
+ observation : array_like
+ agent's observation of the current environment
+ reward : float
+ amount of reward associated with the previous state/action pair
+ done : bool
+ indicates whether the episode has ended
+ info : dict
+ contains other diagnostic information from the previous action
+ """
+ for _ in range(self.env_params.sims_per_step):
+ self.time_counter += 1
+ self.step_counter += 1
+
+ # perform acceleration actions for controlled human-driven vehicles
+ if len(self.k.vehicle.get_controlled_ids()) > 0:
+ accel = []
+ for veh_id in self.k.vehicle.get_controlled_ids():
+ action = self.k.vehicle.get_acc_controller(
+ veh_id).get_action(self)
+ accel.append(action)
+ self.k.vehicle.apply_acceleration(
+ self.k.vehicle.get_controlled_ids(), accel, smooth=False)
+
+ # perform lane change actions for controlled human-driven vehicles
+ if len(self.k.vehicle.get_controlled_lc_ids()) > 0:
+ direction = []
+ for veh_id in self.k.vehicle.get_controlled_lc_ids():
+ target_lane = self.k.vehicle.get_lane_changing_controller(
+ veh_id).get_action(self)
+ direction.append(target_lane)
+ self.k.vehicle.apply_lane_change(
+ self.k.vehicle.get_controlled_lc_ids(),
+ direction=direction)
+
+ # perform (optionally) routing actions for all vehicles in the
+ # network, including RL and SUMO-controlled vehicles
+ routing_ids = []
+ routing_actions = []
+ for veh_id in self.k.vehicle.get_ids():
+ if self.k.vehicle.get_routing_controller(veh_id) \
+ is not None:
+ routing_ids.append(veh_id)
+ route_contr = self.k.vehicle.get_routing_controller(
+ veh_id)
+ routing_actions.append(route_contr.choose_route(self))
+
+ self.k.vehicle.choose_routes(routing_ids, routing_actions)
+
+ self.apply_rl_actions(rl_actions)
+
+ self.additional_command()
+
+ # advance the simulation in the simulator by one step
+ self.k.simulation.simulation_step()
+
+ # store new observations in the vehicles and traffic lights class
+ self.k.update(reset=False)
+
+ # update the colors of vehicles
+ if self.sim_params.render:
+ self.k.vehicle.update_vehicle_colors()
+
+ # crash encodes whether the simulator experienced a collision
+ crash = self.k.simulation.check_collision()
- return reward
+ # stop collecting new simulation steps if there is a collision
+ if crash:
+ break
+
+ # render a frame
+ self.render()
+
+ states = self.get_state()
+
+ # collect information of the state of the network based on the
+ # environment class used
+ self.state = np.asarray(states).T
+
+ # collect observation new state associated with action
+ next_observation = np.copy(states)
+
+ # test if the environment should terminate due to a collision caused by rl-veh or the
+ # time horizon being met
+
+ rl_collision = False
+ for rl_ids in self.k.vehicle.get_rl_ids():
+ if rl_ids in self.k.simulation.collision_list():
+ rl_collision = True
+ break
+
+ done = (
+ self.time_counter >= self.env_params.sims_per_step *
+ (self.env_params.warmup_steps + self.env_params.horizon)
+ or rl_collision or self.k.vehicle.num_rl_vehicles == 0
+ )
+
+ # compute the info for each agent
+ infos = {}
+ if done:
+ print(self.k.simulation.collision_list())
+ # compute the reward
+ if self.env_params.clip_actions:
+ rl_clipped = self.clip_actions(rl_actions)
+ reward = self.compute_reward(rl_clipped, fail=crash)
+ else:
+ reward = self.compute_reward(rl_actions, fail=crash)
+
+ return next_observation, reward, done, infos
+