-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathairport_network.py
More file actions
406 lines (320 loc) · 16 KB
/
airport_network.py
File metadata and controls
406 lines (320 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
from typing import Dict
from models import Airport
from fibonacci_heap import FibonacciHeap
import json
import math
import configuration as config
def haversine_calculator(lat1, lon1, lat2, lon2):
"""
Use the haversine formula to calculate the distance between two points
Args:
lat1 (float): latitude of first point
lon1 (float): longitude of first point
lat2 (float): latitude of second point
lon2 (float): longitude of second point
Returns: the distance between the two points
"""
# haversine function
earth_radius = 6371 # radius of earth in KM
lat1_rad = math.radians(float(lat1))
lon1_rad = math.radians(float(lon1))
lat2_rad = math.radians(float(lat2))
lon2_rad = math.radians(float(lon2))
# Differences in coordinates
delta_lat = lat2_rad - lat1_rad
delta_lon = lon2_rad - lon1_rad
# Apply the Haversine formula
a = math.sin(delta_lat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
# Calculate the distance
distance = earth_radius * c
return distance
# Node class for the priority queue in A* algorithm
class Node:
"""A node in the priority queue"""
def __init__(self, airport, f_score, stops=0, path=None, tie_breaker=0, distance=0, duration=0, estimate_emission=0, estimate_price=0):
self.airport = airport # Airport IATA code
self.f_score = f_score # Total estimated cost
self.stops = stops # Number of stops in path
self.path = path or [airport] # Path of airports visited
self.tie_breaker = tie_breaker # For breaking ties
self.distance = distance # Actual distance traveled
self.duration = duration # Flight duration in minutes
self.estimate_emission = estimate_emission # Carbon Dioxide emission
self.estimate_price = estimate_price # Ticket price
class PriorityQueue:
"""Manual priority queue implementation for A* algorithm"""
def __init__(self):
self.elements = []
self.count = 0 # For tie-breaking
def empty(self):
return len(self.elements) == 0
def put(self, node):
"""Add node to the priority queue"""
self.elements.append(node)
self.count += 1
def get(self):
"""Get node with lowest f_score (priority)"""
if self.empty():
return None
# Find index of node with minimum f_score
min_idx = 0
for i in range(1, len(self.elements)):
# If f_scores are equal, use tie breaker
if (self.elements[i].f_score < self.elements[min_idx].f_score or
(self.elements[i].f_score == self.elements[min_idx].f_score and
self.elements[i].tie_breaker < self.elements[min_idx].tie_breaker)):
min_idx = i
# Remove and return the minimum node
return self.elements.pop(min_idx)
class FibonacciHeapPriorityQueue:
"""Priority queue implementation using Fibonacci Heap for A* algorithm"""
def __init__(self):
"""Initialize an empty priority queue"""
self.heap = FibonacciHeap()
self.node_map = {} # map node to heap nodes for decrease-key operations
self.count = 0 # for tie-breaking
def empty(self):
"""Check if the priority queue is empty"""
return self.heap.is_empty()
def put(self, node):
"""
Adds a node to the priority queue
Args:
node (Airport): Airport object
"""
# store original tie_breaker if it exists, otherwise use our counter
if not hasattr(node, 'tie_breaker'):
node.tie_breaker = self.count
self.count += 1
# insert with f score and heap
heap_node = self.heap.insert(node.f_score, node)
# Store reference to heap node for potential decrease-key operations
self.node_map[node] = heap_node
def get(self):
"""
Get node with lowest f_score (priority)
Returns:
node: Airport object with the lowest key in the heap.
"""
if self.empty():
return None
# get the min_node
min_heap_node = self.heap.extract_min()
if min_heap_node is None:
return None
# Get the original node from the value
node = min_heap_node.value
# Remove from node map
if node in self.node_map:
del self.node_map[node]
return node
def update_priority(self, node, new_f_score):
"""Updates the priority of a node if the new f_score is lower"""
if node in self.node_map:
heap_node = self.node_map[node]
# Only decrease if new score is lower
if new_f_score < heap_node.key:
# Update the node's f_score
node.f_score = new_f_score
# Decrease key in the heap
self.heap.decrease_key(heap_node, new_f_score)
else:
# If node isn't in the queue yet, add it
node.f_score = new_f_score
self.put(node)
def contains(self, node):
"""Checks if the queue has a specific node"""
return node in self.node_map
class AirportNetwork:
"""
Airport network class. Defines and holds the datastructures for the entire program.
1. Serializes and deserializes the JSON for the nodes and edges (Airports and flights)
2. Cache and pre-calculate airport distances using haversine.
3. Calculates and estimate the ticket price and carbon emissions for the flight.
5. Executes the core modified A star k-shortest paths algorithm.
Attributes:
airports - the dictionary of airports keyed by airport code (IATA).
"""
def __init__(self, airports: Dict[str, Airport]):
self.airports = airports # Dictionary mapping IATA code to Airport object
self.routes = {iata: [] for iata in airports.keys()} # Initialize routes dictionary with empty lists
self.hdistances = {}
@classmethod
def from_json_file(cls, file_path: str):
"""
Static method that creates an instance of the AirportNetwork class from a deserializable json file.
Args:
file_path (str): The path to the JSON file.
Returns:
AirportNetwork: An AirportNetwork instance.
"""
with open(file_path, 'r') as f:
data = json.load(f)
airports = {iata: Airport.from_dict(iata, airport_data) for iata, airport_data in data.items()}
return cls(airports=airports)
def get_airport_by_iata(self, iata: str):
"""
Gets an airport data class by its iata code.
Args:
iata (str): The iata code.
Returns:
Airport: An instance of Airport or None.
"""
if iata not in self.airports:
return None # Or raise a KeyError, depending on desired behavior
return self.airports[iata]
def get_all_airport_iata_codes(self):
return list(self.airports.keys())
# Calculate distance on demand with in-memory caching
def get_distance(self, origin, destination):
"""
Pre-calculate airport distance between origin and destination using haversine.
Uses simple in-memory caching to avoid recalculation.
Args:
origin (str): The iata code of the origin airport.
destination (str): The iata code of the destination airport.
Returns:
distance (float): The distance between the origin and destination airport.
"""
# Check if we've already calculated this distance
if origin in self.hdistances and destination in self.hdistances[origin]:
return self.hdistances[origin][destination]
# Need to calculate - make sure the nested dict exists
if origin not in self.hdistances:
self.hdistances[origin] = {}
# Get airport objects
origin_airport = self.airports[origin]
dest_airport = self.airports[destination]
# Calculate distance
distance = haversine_calculator(
float(origin_airport.latitude), float(origin_airport.longitude),
float(dest_airport.latitude), float(dest_airport.longitude)
)
# Cache the result
self.hdistances[origin][destination] = distance
return distance
def estimate_ticket_price(self, distance, duration):
"""
Estimates the ticket price for a given distance and duration.
Configurations can be found in the config.TICKET_PRICE dictionary.
Args:
distance (float): The distance in kilometers.
duration (float): The duration in hours.
Returns:
float: The estimated ticket price.
"""
price_per_km = config.TICKET_PRICE["price_per_km"]
price_per_hour = config.TICKET_PRICE["price_per_min"] * (duration / 60)
return (distance * price_per_km) + (duration * price_per_hour)
def estimate_carbon_emission(self, distance):
"""
Estimate the CO2 emissions from the config file, using a fixed factor derived from online sources.
Uses a different factor for: Long, Medium and Short Haul flights, can be found in config.EMISSION_FACTORS dict.
It determines the differentiating distances from the config.DISTANCE_THRESHOLD dictionary.
Args:
distance (float): The distance in kilometers.
Returns:
Estimated CO2 emissions
"""
long_haul_emission = config.EMISSION_FACTORS["long_haul"]
medium_haul_emission = config.EMISSION_FACTORS["medium_haul"]
short_haul_emission = config.EMISSION_FACTORS["short_haul"]
emission = 0
if distance > config.DISTANCE_THRESHOLD["medium_haul"]:
emission = long_haul_emission
elif distance > config.DISTANCE_THRESHOLD["short_haul"]:
emission = medium_haul_emission
else:
emission = short_haul_emission
return emission * distance
def find_routes(self, start_airport, dest_airport):
"""
Executes the core A star k-shortest/optimal paths algorithm to find the best path to an airport.
Additionally takes into account CO2 emissions, stop penalty (to reduce number of stops for optimal travel)
Args:
start_airport: The starting airport
dest_airport: the destination airport to go to.
Returns: k-shortest/optimal paths to the airport.
"""
open_set = FibonacciHeapPriorityQueue()
initial_heuristic = config.PENALTY["weight_factors"] * self.get_distance(start_airport, dest_airport)
start_node = Node(airport=start_airport, f_score=initial_heuristic, stops=0, path=[start_airport], tie_breaker=0, distance=0, duration=0, estimate_emission=0, estimate_price=config.TICKET_PRICE["base_price"])
open_set.put(start_node)
# Track minimum distance to each airport (g_score in A*)
g_score = dict.fromkeys(self.airports, float('inf'))
g_score[start_airport] = 0
# Track estimated total cost to goal through each airport (f_score in A*)
f_score = dict.fromkeys(self.airports, float('inf'))
f_score[start_airport] = initial_heuristic
# Storage for our two categories of results
possible_routes = []
best_actual_distance = float('inf') # For pruning suboptimal paths
# A* search loop
while not open_set.empty():
current_node = open_set.get()
current = current_node.airport
stops = current_node.stops
path = current_node.path
actual_distance = current_node.distance
actual_duration = current_node.duration
estimate_emission = current_node.estimate_emission
estimate_price = current_node.estimate_price
# Check if destination reached with valid number of connections
if current == dest_airport and stops <= config.SEARCH_LIMIT["max_stops"] + 1:
# Store this route in both result categories
# Add the stop penalty
penalized_score = actual_distance + (stops * config.PENALTY["stop_penalty"])
possible_routes.append((penalized_score, actual_distance, path.copy(), stops, actual_duration, estimate_emission, estimate_price))
# Update best distance for future path pruning
best_actual_distance = min(best_actual_distance, actual_distance)
# Explore connecting flights from current airport
for route in self.airports[current].routes:
next_airport = route.iata
# Skip if we've already visited this airport (avoid loops)
if next_airport in path:
continue
# Calculate new distance if we take this connection
distance = float(route.km)
new_actual_distance = actual_distance + distance
# Add stop penalty for A* ordering (doesn't affect actual distance)
penalized_distance = new_actual_distance + (stops * config.PENALTY["stop_penalty"])
# Calculate duration if we take this connection
duration = float(route.min)
new_actual_duration = actual_duration + duration
# Calculate Carbon Dioxide emission if we take this connection
carbon_emission = self.estimate_carbon_emission(distance)
new_estimate_emission = estimate_emission + carbon_emission
# Calculate Ticket price if we take this connection
ticket_price = self.estimate_ticket_price(distance, duration)
new_estimate_price = estimate_price + ticket_price
if stops > 1: # Penalty for each stop
new_actual_duration += config.PENALTY["duration_penalty"] # Add 90 mins for each stop
new_estimate_emission *= config.PENALTY["emit_penalty"] # 15% increase in emission for each stop
new_estimate_price *= config.PENALTY["price_penalty"] # 15% decrease in ticket price for each stop
# Only consider this path if it's the best way to reach next_airport so far
if new_actual_distance < g_score[next_airport]:
g_score[next_airport] = new_actual_distance
# Calculate remaining distance estimate using haversine distance
heuristic = self.get_distance(next_airport, dest_airport)
# Combined score: actual distance so far + stop penalties + weighted heuristic
new_f_score = penalized_distance + (config.PENALTY["weight_factors"] * heuristic)
f_score[next_airport] = new_f_score
# Create new node and add to queue
new_node = Node(
airport=next_airport,
f_score=new_f_score,
stops=stops + 1,
path=path + [next_airport],
tie_breaker=open_set.count,
distance=round(new_actual_distance,0),
duration= round(new_actual_duration,0),
estimate_emission=round(new_estimate_emission,0),
estimate_price= round(new_estimate_price,0)
)
open_set.put(new_node)
# Return None if no routes found
if not possible_routes:
print("No route found.")
return None
return possible_routes[:config.SEARCH_LIMIT["max_routes"]]