-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpartc3.py
More file actions
63 lines (49 loc) · 1.64 KB
/
partc3.py
File metadata and controls
63 lines (49 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import pyspark, re
# Spark context through which the input CSV files are read further down.
sc = pyspark.SparkContext()
def cleaned_vin(line):
    """Return True when *line* is a usable three-column CSV record.

    A record is accepted when splitting it on commas yields exactly three
    fields and the third field parses as a float.

    Args:
        line: One raw text line (no CSV quoting is interpreted).

    Returns:
        bool: True for a well-formed record; False for a wrong field count,
        a non-numeric third field, or an input that is not a string.
    """
    try:
        fields = line.split(',')
        if len(fields) != 3:
            return False
        float(fields[2])
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: input is not str-like (e.g. None, bytes).
        # ValueError: the third field is not a number.
        # Anything else (KeyboardInterrupt, SystemExit, ...) must propagate.
        return False
    return True
def clean_vout(line, public_key="{18iEz617DoDp8CNQUyyrjCcC7XCGDf5SVb}"):
    """Return True when *line* is a valid four-column record for one wallet.

    A record is accepted when splitting it on commas yields exactly four
    fields, the fourth field equals *public_key*, and the second and third
    fields both parse as floats.

    Args:
        line: One raw text line (no CSV quoting is interpreted).
        public_key: Exact fourth-field text to match, braces included.
            Defaults to the wallet this script was written for, so the
            function can still be passed directly to ``RDD.filter``.

    Returns:
        bool: True for a matching, well-formed record; False otherwise,
        including when *line* is not a string.
    """
    try:
        fields = line.split(',')
        if len(fields) != 4 or fields[3] != public_key:
            return False
        float(fields[1])
        float(fields[2])
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: input is not str-like (e.g. None, bytes).
        # ValueError: one of the numeric fields is not a number.
        # Anything else (KeyboardInterrupt, SystemExit, ...) must propagate.
        return False
    return True
def clean_vout_no_wiki(line):
    """Return True when *line* is a usable four-column CSV record.

    Unlike ``clean_vout`` this places no restriction on the fourth field:
    a record is accepted when splitting it on commas yields exactly four
    fields and the second and third fields both parse as floats.

    Args:
        line: One raw text line (no CSV quoting is interpreted).

    Returns:
        bool: True for a well-formed record; False for a wrong field count,
        a non-numeric second or third field, or a non-string input.
    """
    try:
        fields = line.split(',')
        if len(fields) != 4:
            return False
        float(fields[1])
        float(fields[2])
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: input is not str-like (e.g. None, bytes).
        # ValueError: one of the numeric fields is not a number.
        # Anything else (KeyboardInterrupt, SystemExit, ...) must propagate.
        return False
    return True
# Pipeline: keep the vout rows accepted by clean_vout, join them to vin on the
# transaction hash, then join the matching vin ids back to every valid vout
# row and total one numeric column per publicKey.

# vout rows accepted by clean_vout, keyed by their first column.
vout = sc.textFile("/data/bitcoin/vout.csv")
cleaned_vout = vout.filter(clean_vout).map(lambda l: l.split(","))
vout_join = cleaned_vout.map(lambda l: (l[0], (l[1], float(l[2]), l[3])))

# vin rows keyed by their second column, carrying the first column as value.
# Stored under a name distinct from the cleaned_vin() filter function so the
# function is not overwritten by the RDD.
vin = sc.textFile("/data/bitcoin/vin.csv")
cleaned_vin_rows = vin.filter(cleaned_vin).map(lambda a: a.split(","))
vin_join = cleaned_vin_rows.map(lambda l: (l[1], l[0]))

# Joined (tx_hash, (vin(tx_id), vout(value, n, publicKey)))
joined_data = vin_join.join(vout_join)

# Only the vin-side id is carried forward; "667" is a placeholder value so the
# result is a key/value RDD that can take part in the next join.
hash_vout_KV = joined_data.map(lambda b: (b[1][0], "667"))

# vout((tx_hash), (value, n, publicKey))
cleaned_vout_no_wiki = (
    vout.filter(clean_vout_no_wiki)
    .map(lambda c: c.split(','))
    .map(lambda c: (c[0], (c[1], c[2], c[3])))
)
second_join = hash_vout_KV.join(cleaned_vout_no_wiki)

# Each joined record is (key, ("667", (col1, col2, col3))); emit
# (col3, float(col2)) and sum per col3.
# NOTE(review): by the column comment above, index 1 of the tuple is `n`, not
# `value`, so this totals `n` per publicKey -- confirm; use d[1][1][0] if the
# value column was intended.
second_KV = second_join.map(lambda d: (d[1][1][2], float(d[1][1][1])))
second_join_reduced = second_KV.reduceByKey(lambda a, b: a + b)
second_join_reduced.saveAsTextFile("pleasework99")