-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathp2.py
More file actions
61 lines (52 loc) · 1.94 KB
/
p2.py
File metadata and controls
61 lines (52 loc) · 1.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import JSONValueProtocol
import re
import itertools
import time
class UsersCount(MRJob):
    """Find pairs of users whose reviewed-business sets are similar.

    Input is one JSON value per line (``JSONValueProtocol``); each record
    is read through its ``'user_id'`` and ``'business_id'`` keys.

    The job runs three map/reduce steps:

    1. Group businesses by user and emit the *distinct* businesses of each
       user together with how many there are.
    2. Invert to business -> users and emit every pair of users that share
       the business, along with the sum of their two set sizes.
    3. Count how many businesses each pair shares (the intersection) and
       derive the Jaccard index ``|A & B| / (|A| + |B| - |A & B|)``.

    Only pairs whose index is strictly greater than ``JACCARD_THRESHOLD``
    are emitted.
    """

    INPUT_PROTOCOL = JSONValueProtocol

    # Pairs must score strictly above this value to be reported.
    JACCARD_THRESHOLD = 0.5

    def map_user_business(self, _, line):
        """Emit ``(user_id, business_id)`` for one decoded input record."""
        yield line['user_id'], line['business_id']

    def reduce_business_per_user(self, key, values):
        """Emit ``([user, n_businesses], businesses)`` for one user.

        Duplicates are dropped first: the Jaccard index is defined on
        sets, and a business repeated for the same user would otherwise
        inflate the user's set size and create duplicate (and self-)
        pairs in the next step.
        """
        business_list = sorted(set(values))
        yield [key, len(business_list)], business_list

    def map_business_with_user(self, key, values):
        """Invert the relation: emit ``(business, [user, n_businesses])``."""
        for business in values:
            yield business, key

    def reduce_users_into_pairs(self, key, values):
        """Emit every pair of users that share the business ``key``.

        Each output is ``([user_a, user_b], size_a + size_b)``. The users
        are sorted before pairing so a given pair always comes out in the
        same order; otherwise the same two users could be emitted as
        ``[a, b]`` for one business and ``[b, a]`` for another, and their
        shared-business count would be split across two keys.
        """
        ordered_users = sorted(values, key=lambda entry: entry[0])
        for first, second in itertools.combinations(ordered_users, 2):
            yield [first[0], second[0]], first[1] + second[1]

    def map_to_count(self, key, value):
        """Emit ``([pair, summed_sizes], 1)`` so occurrences can be summed."""
        yield [key, value], 1

    def reduce_and_get_jaccard(self, key, values):
        """Compute the Jaccard index of one user pair and emit it if high.

        ``key`` is ``[[user_a, user_b], size_a + size_b]`` and ``values``
        holds one ``1`` per shared business, so their sum is the size of
        the intersection.
        """
        intersection = float(sum(values))
        # |A union B| = |A| + |B| - |A intersect B|
        union = float(key[1]) - intersection
        if union <= 0:
            # Cannot happen for well-formed sets; guard against bad input
            # rather than raising ZeroDivisionError in the middle of a job.
            return
        jaccard = intersection / union
        if jaccard > self.JACCARD_THRESHOLD:
            yield 'Best match by Jaccard index', [jaccard, key[0]]

    def steps(self):
        """Return the three chained map/reduce steps of the job."""
        return [
            MRStep(
                mapper=self.map_user_business,
                reducer=self.reduce_business_per_user,
            ),
            MRStep(
                mapper=self.map_business_with_user,
                reducer=self.reduce_users_into_pairs,
            ),
            MRStep(
                mapper=self.map_to_count,
                reducer=self.reduce_and_get_jaccard,
            ),
        ]
if __name__ == '__main__':
    # Run the job and report the wall-clock time it took.
    # Single-argument print(...) calls produce the same output under
    # Python 2 and also parse under Python 3, unlike print statements.
    # NOTE(review): these messages go to stdout, which is also where the
    # job's results are written -- confirm that mixing them is intended.
    print("Begin...")
    time_init = time.time()
    UsersCount.run()
    duration = time.time() - time_init
    print("End!")
    print("________________________________")
    print("Query duration: {0}".format(duration))