-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathrun_planner.py
More file actions
89 lines (69 loc) · 2.98 KB
/
run_planner.py
File metadata and controls
89 lines (69 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import argparse
from multi_drone_environment import MultiDroneEnvironment
from models.multi_drone_model import (
MultiDroneTransitionModel,
MultiDroneObservationModel,
MultiDroneInitialBelief,
MultiDroneMixtureInitialBelief,
MultiDroneTask
)
from belief_state import BeliefState
# Replace this with your own online planner
from planners.dummy_planner import DummyPlanner
# Command-line interface: the script's only input is a YAML config path.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--config',
    type=str,
    required=True,
    help="Path to the yaml configuration file",
)
args = parser.parse_args()
def run(env, planner, planning_time_per_step=1.0, num_belief_particles=1000):
    """Run one closed-loop plan/act/observe/update episode in ``env``.

    Parameters
    ----------
    env : MultiDroneEnvironment
        Simulator exposing ``reset()``, ``step(action)``, ``get_config()``,
        ``update_plot(...)``.
    planner : object
        Online planner exposing ``plan(belief_state, planning_time_per_step=...)``.
    planning_time_per_step : float, optional
        Planning budget (presumably seconds) passed to the planner each step.
    num_belief_particles : int, optional
        Particle count used for each belief update. Defaults to 1000, the
        value previously hard-coded inline.

    Returns
    -------
    tuple
        ``(total_discounted_reward, history, num_steps)`` where ``history``
        is a list of per-step tuples
        ``(state, action, observation, reward, next_state, done, info)``.
    """
    # Set the simulator to the initial state
    current_state = env.reset()
    belief_state = BeliefState(env)  # Initialize a belief state
    num_steps = 0
    total_discounted_reward = 0.0
    history = []
    while True:
        # Plan an action from the current belief (e.g. with MCTS)
        action = planner.plan(belief_state, planning_time_per_step=planning_time_per_step)
        # Apply the action to the environment
        next_state, observation, reward, done, info = env.step(action)
        print(f"next state: {next_state}, action {action}, observation: {observation}, reward: {reward}, done: {done}")
        # Accumulate discounted reward
        total_discounted_reward += (env.get_config().discount_factor ** num_steps) * reward
        # Log trajectory
        history.append((current_state, action, observation, reward, next_state, done, info))
        # Update the belief with the action executed and the observation perceived
        updated_belief = belief_state.update(action, observation, num_belief_particles)
        # Update visualization
        if belief_state.belief_particles is not None:
            # Plot 10 belief particles
            env.update_plot(belief_state.belief_particles[:10])
        else:
            env.update_plot()
        # Move forward
        current_state = next_state
        num_steps += 1
        # update() signals failure by returning False; abort the episode then.
        if updated_belief is False:
            print("Couldn't update belief")
            break
        # Quit if we reached a terminal state or the configured step budget
        if done or num_steps >= env.get_config().max_num_steps:
            break
    return total_discounted_reward, history, num_steps
# Instantiate the POMDP model components (transition, observation, prior, task)
transition_model = MultiDroneTransitionModel()
observation_model = MultiDroneObservationModel()
initial_belief = MultiDroneInitialBelief()
task = MultiDroneTask()
# Instantiate the MultiDroneEnvironment with the POMDP model components,
# configured from the YAML file given on the command line.
env = MultiDroneEnvironment(
    args.config,
    transition_model,
    observation_model,
    initial_belief,
    task,
)
# Instantiate the planner (placeholder — meant to be swapped for a real online planner)
planner = DummyPlanner(env)
# Run the planning loop with a 2-second planning budget per step
total_discounted_reward, history, num_steps = run(env, planner, planning_time_per_step=2.0)
# history[-1][6] is the `info` dict of the final step; assumes it always
# carries an 'all_reached' key — TODO confirm against env.step's info contract.
print(f"success: {history[-1][6]['all_reached']}, Total discounted reward: {total_discounted_reward}, num_steps: {num_steps}")
# Keep/display the final visualization window
env.show()