-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmovie_recommendations_als_dataframe.py
More file actions
63 lines (48 loc) · 1.96 KB
/
movie_recommendations_als_dataframe.py
File metadata and controls
63 lines (48 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
from pyspark.ml.recommendation import ALS
import sys
import codecs
# CHANGE THIS TO THE PATH TO YOUR u.ITEM FILE:
MOVIE_NAMES_PATH = 'C:/Users/brian/code/SparkExamples/ml-100k/u.item'


def load_movie_names(path=MOVIE_NAMES_PATH):
    """Build a mapping from movie ID to movie title.

    Reads a pipe-delimited item file in which the first field of each line
    is the integer movie ID and the second field is the title.

    Args:
        path: Location of the ``u.item`` file. Defaults to MOVIE_NAMES_PATH.

    Returns:
        dict mapping int movie ID -> title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a non-blank line's first field is not an integer.
    """
    movie_names = {}
    # Builtin open() only treats \n, \r and \r\n as line breaks. codecs.open()
    # splits on every str.splitlines() boundary, so a Latin-1 byte 0x85 (U+0085)
    # inside a title would cut the line in two. Latin-1 decodes every byte
    # value, so errors='ignore' never actually drops anything here.
    with open(path, 'r', encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            # Only the first two fields are needed; leave the rest unsplit.
            fields = line.split('|', 2)
            # Skip blank/truncated lines (e.g. a trailing empty line) rather
            # than crashing on int('\n') or a missing title field.
            if len(fields) < 2:
                continue
            movie_names[int(fields[0])] = fields[1]
    return movie_names
# CHANGE THIS TO THE PATH TO YOUR u.data FILE (tab-separated ratings):
# NOTE(review): unlike the u.item path used by load_movie_names, this URL has
# no 'C:' drive letter -- confirm both point at the same ml-100k directory.
RATINGS_PATH = 'file:///Users/brian/code/SparkExamples/ml-100k/u.data'
NUM_RECOMMENDATIONS = 10


def _parse_user_id(argv):
    """Return the integer user ID from ``argv[1]``.

    Exits the process with a usage/error message (via sys.exit) when the
    argument is missing or is not an integer.
    """
    if len(argv) < 2:
        script = argv[0] if argv else 'script'
        sys.exit('Usage: {} <user_id>'.format(script))
    try:
        return int(argv[1])
    except ValueError:
        sys.exit('user_id must be an integer, got {!r}'.format(argv[1]))


# Parse arguments before starting Spark so a bad invocation fails fast
# instead of after the session has been created.
user_id = _parse_user_id(sys.argv)

spark = SparkSession.builder.appName('ALSExample').getOrCreate()
try:
    movies_schema = StructType([
        StructField('user_id', IntegerType(), True),
        StructField('movie_id', IntegerType(), True),
        StructField('rating', IntegerType(), True),
        StructField('timestamp', LongType(), True)])

    names = load_movie_names()

    ratings = spark.read.option('sep', '\t').schema(movies_schema) \
        .csv(RATINGS_PATH)

    print('Training recommendation model...')
    als = ALS().setMaxIter(5).setRegParam(0.01).setUserCol('user_id') \
        .setItemCol('movie_id').setRatingCol('rating')
    model = als.fit(ratings)

    # recommendForUserSubset takes a DataFrame of user IDs, so build a
    # one-row DataFrame containing only the requested user.
    user_schema = StructType([StructField('user_id', IntegerType(), True)])
    users = spark.createDataFrame([[user_id]], user_schema)
    recommendations = model.recommendForUserSubset(
        users, NUM_RECOMMENDATIONS).collect()

    print('Top ' + str(NUM_RECOMMENDATIONS)
          + ' recommendations for user ID ' + str(user_id))
    if not recommendations:
        # No row came back for this user (presumably the ID never appears
        # in the ratings data) -- say so instead of printing nothing.
        print('No recommendations found for user ID ' + str(user_id))
    for user_recs in recommendations:
        # Each row is (user_id, [Row(movie_id, rating), ...]); Rows are
        # tuples, so each recommendation unpacks positionally.
        for movie_id, rating in user_recs[1]:
            # Fall back to the raw ID instead of raising KeyError for a
            # movie that is missing from the u.item file.
            movie_name = names.get(movie_id, 'Unknown movie ' + str(movie_id))
            print(movie_name, str(rating))
finally:
    # Always release the Spark session, even if loading or training fails.
    spark.stop()