-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path4_Script_collection_others.py
More file actions
42 lines (36 loc) · 1.49 KB
/
4_Script_collection_others.py
File metadata and controls
42 lines (36 loc) · 1.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from pyspark.sql.functions import *
import csv
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from pyspark.sql import Row
import csv
from pyspark.sql import SQLContext
def parseCSV(idx, part):
    """Turn one partition of raw CSV lines into route ``Row`` objects.

    Written to be passed to ``RDD.mapPartitionsWithIndex``.

    Args:
        idx: Index of the partition being processed.
        part: Iterator over the text lines of that partition.

    Yields:
        Row: one per parsed CSV record, with fields ``ORIGIN`` (column 14),
        ``ORIGIN_AIRPORT_ID`` (column 11, as ``str``), ``DEST`` (column 23),
        ``DEST_AIRPORT_ID`` (column 20, as ``str``) and ``ROUTE``, the
        ``(column 11, column 20)`` pair.
    """
    if idx == 0:
        # Skip the first line of the first partition (presumably the CSV
        # header -- confirm against the data). The builtin next() with a
        # default works on both Python 2 and 3 and, unlike part.next(), does
        # not raise when the partition is empty.
        # NOTE(review): when the input glob matches several files, only this
        # one line is dropped; first lines of the other files pass through.
        next(part, None)
    for p in csv.reader(part):
        yield Row(ORIGIN=p[14],
                  ORIGIN_AIRPORT_ID=str(p[11]),
                  DEST=p[23],
                  DEST_AIRPORT_ID=str(p[20]),
                  ROUTE=(p[11], p[20]))
def main(sc,
         input_path='../lmf445/Flight_Project/Data/864625436_T_ONTIME_2*.csv',
         output_path='Output/MostBussyRoute.csv'):
    """Count flights per route and write the ranking to a CSV file.

    Reads the CSV files matching ``input_path``, counts how many records
    share each ``ROUTE`` value, attaches the ``ORIGIN`` and ``DEST`` values
    seen for that route, sorts by descending count, prints the top rows and
    writes the full result to ``output_path`` through pandas.

    Args:
        sc: The active ``SparkContext``.
        input_path: Path or glob of the input CSV files.
        output_path: Destination of the resulting CSV file.
    """
    # A single context is enough; the original built a second, unused one.
    sqlContext = HiveContext(sc)

    rows = sc.textFile(input_path).mapPartitionsWithIndex(parseCSV)
    df = sqlContext.createDataFrame(rows)

    # Distinct (route, DEST, ORIGIN) combinations, used to label each route.
    # ROUTE is aliased so it can be dropped unambiguously after the join.
    df_unique = df.select(
        df.ROUTE.alias('ROUTE_UNIQUE'), 'DEST', 'ORIGIN').distinct()

    # Number of records per route.
    busyest_route_single = df.select('ROUTE').groupBy('ROUTE').count()

    busyest_route_single = busyest_route_single.join(
        df_unique, busyest_route_single.ROUTE == df_unique.ROUTE_UNIQUE)
    busyest_route_single = busyest_route_single.drop('ROUTE_UNIQUE')
    busyest_route_single = busyest_route_single.sort(desc('count'))

    busyest_route_single.show()
    # toPandas() collects the whole result onto the driver before writing.
    busyest_route_single.toPandas().to_csv(output_path)
if __name__ == "__main__":
    # Script entry point: create the Spark context, run the job, and always
    # stop the context afterwards so it is released even if the job fails.
    sc = SparkContext()
    try:
        main(sc)
    finally:
        sc.stop()