-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathelevator.py
More file actions
224 lines (190 loc) · 8 KB
/
elevator.py
File metadata and controls
224 lines (190 loc) · 8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
from policies import Action
from delegate import Delegate
from debug import Debug as DB
import random
class Elevator:
"""
Represents an elevator
:param min_floor: The minimum floor of the elevator
:type min_floor: int
:param max_floor: The maximum floor of the elevator
:type max_floor: int
:param policy: The policy of the elevator
:type policy: Policy
:param capacity: The capacity of the elevator
:type capacity: int
"""
def __init__(self, min_floor, max_floor, policy, capacity):
self.on_passenger_entered = Delegate()
self.on_passenger_exited = Delegate()
self.min_floor = min_floor
self.max_floor = max_floor
self.current_height = random.randint(min_floor, max_floor) * 100
self.fps = 10 # floors per second (in Percent)
self.decision = Action.WAIT
self.passenger_list = []
self.policy = policy
self.elevator_buttons = [False] * (max_floor + 1)
self.elevator_index = 0
self.target = -1 # Target floor
# Direction that will be taken once reached target (-1 = down, 0 =
# undefined, 1 = up)
self.target_direction = 0
self.capacity = capacity
def __str__(self) -> str:
return DB.str(
"Class",
"Elevator",
kwargs=[
self.max_floor,
self.min_floor,
self.current_height,
self.fps,
self.decision,
self.passenger_list,
self.policy,
self.elevator_buttons],
desc=[
"max floor",
"min floor",
" current height",
"fps",
"decision",
"passengerlist",
"policy",
"buttons pressed"])
def set_elevator_index(self, index):
"""
Set the index of the elevator in the building
:param index: The index of the elevator
:type index: int
"""
self.elevator_index = index
def get_current_floor(self):
"""
Get the current floor of the elevator
:return: The current floor
:rtype: int
"""
return self.current_height // 100
def get_elevator_index(self):
"""
Get the index of the elevator in the building
:return: The index of the elevator
:rtype: int
"""
return self.elevator_index
def step(self, time, building):
"""
Steps the elevator one time step forward
:param time: The current time
:type time: int
:param building: The building the elevator is in
:type building: Building
"""
self.time = time
if (DB.elv_fct_step and ((time % int(DB.elv_fct_stepsSkips)) == 0)):
DB.pr("Func", "step", message="function was called", t=time)
# Check if we are at a floor, approximate if error occurred
if (((self.current_height + self.fps / 2) % 100 <= self.fps) and
(self.decision == Action.MOVE_UP or self.decision == Action.MOVE_DOWN)):
# Wait for policy to make a decision
self.current_height = round(self.current_height / 100.0) * 100
self.decision = Action.WAIT
current_floor = self.get_current_floor()
if (self.decision == Action.WAIT):
# Waiting, get decision from policy
self.decision = self.policy.get_action(
current_floor,
building.floors,
self.elevator_buttons,
building.elevators,
self,
time)
elif (self.decision == Action.WAIT_UP or self.decision == Action.WAIT_DOWN or self.decision == Action.WAIT_OPEN):
# Waiting to go up or down, ask passengers to enter if they go in
# same direction
# Check if any passenger wants to leave
for p in self.passenger_list:
if (p.end_level == current_floor):
if (DB.elv_passenger_leaves_elevator and (
(time % int(DB.elv_passenger_leaves_elevatorSkips)) == 0)):
DB.pr(
"Func",
"step",
message="passenger left elevator",
t=time,
kwargs=[p],
desc=["passenger"])
# Remove passenger from elevator, notify observers (for
# statistics)
self.passenger_list.remove(p)
self.on_passenger_exited.notify_all(p, time)
return
# Since we arrived at floor, disable button
self.elevator_buttons[current_floor] = False
# Check if any passenger wants to enter
floor = building.floors[current_floor]
for p in floor.passenger_list:
# Check if passenger wants to go in same direction
if ((p.end_level < current_floor and self.decision == Action.WAIT_DOWN) or (
p.end_level > current_floor and self.decision == Action.WAIT_UP) or Action.WAIT_OPEN) and self.capacity > len(self.passenger_list):
# Elevator still has capacity and a passenger wants to
# enter, thus passenger leaves floor
# Remove passenger and update floor buttons
floor.remove_passenger(p, time)
if (DB.elv_passenger_enters_elevator and (
(time % int(DB.elv_passenger_enters_elevatorSkips)) == 0)):
DB.pr(
"Func",
"step",
message="passenger entered elevator",
t=time,
kwargs=[p],
desc=["passenger"])
# Add passenger to elevator and press button of passenger
# destination
self.passenger_list.append(p)
self.elevator_buttons[p.end_level] = True
if (DB.elv_passenger_pressed_button and (
(time % int(DB.elv_passenger_pressed_buttonSkips)) == 0)):
DB.pr(
"Func",
"step",
message="passenger pressed button",
t=time,
kwargs=[
p.end_level],
desc=["level"])
self.on_passenger_entered.notify_all(p, time)
# We let only one passenger enter per step, thus return
return
# Now that we let all passengers enter, get new decision from
# policy
self.decision = self.policy.get_action(
current_floor,
building.floors,
self.elevator_buttons,
building.elevators,
self,
time)
if (DB.elv_decision_update and (
(time % int(DB.elv_decision_updateSkips)) == 0)):
DB.pr("Func", "step", message="decision was updated",
t=time, kwargs=[self.decision], desc=["decision"])
# Finally move
if (self.decision == Action.MOVE_DOWN):
self.current_height = max(0, self.current_height - self.fps)
elif (self.decision == Action.MOVE_UP):
self.current_height = min(
(self.max_floor) * 100,
self.current_height + self.fps)
if (DB.elv_movement_update and ((time % int(DB.elv_movement_updateSkips)) == 0)):
DB.pr(
"Func",
"step",
message="elevator moved",
t=time,
kwargs=[
self.current_height],
desc=["height"])