-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrelational_join.py
More file actions
56 lines (44 loc) · 1.57 KB
/
relational_join.py
File metadata and controls
56 lines (44 loc) · 1.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
__author__ = 'FarhanKhwaja'
import MapReduce
import sys
"""
Implement a MapReduce algorithm to join the two relations based on the Movie ID/Rated Movie ID value.
A file containing two relations, movie names and user movie ratings, is provided.
"""
mr = MapReduce.MapReduce()
# =============================
# Do not modify above this line
def mapper(record):
    """Emit one intermediate pair per input record, keyed by movie ID.

    record: a sequence whose first element is the relation name.
        NOTE(review): the field positions below are inferred from how the
        reducer reads the values; confirm against the input file.

    "MovieNames" records are read as [relation, movie name, MovieID, ...] and
    emit (MovieID, (relation, movie name)). Every other record is read as
    [relation, Rated Movie ID, ...] and emits
    (Rated Movie ID, (relation, remaining fields)).
    """
    relation = record[0]
    if relation != "MovieNames":
        # Rating record: key on the rated movie ID, carry the remaining fields along.
        mr.emit_intermediate(record[1], (relation, record[2:]))
    else:
        # Name record: key on the movie ID, carry only the movie name.
        mr.emit_intermediate(record[2], (relation, record[1]))
def reducer(key, list_of_values):
    """Join a movie's name with its ratings and emit the average rating.

    key: MovieID shared by every value in list_of_values.
    list_of_values: (relation, payload) pairs emitted by mapper. For the
        "MovieNames" relation the payload is the movie name; for any other
        relation the payload is read as (User ID, User Rating, ...).

    Emits one (Movie Name, MovieID, Rated Movie ID, User ID, User Rating)
    row per rating, followed by one (Movie Name, Average Rating) row. The
    average row is skipped when the movie has no ratings, since there is
    nothing to average.
    """
    movie_name = ""
    rating_entries = []

    # Separate the name record from the rating records first, so the name is
    # known before any joined row is emitted regardless of arrival order.
    for entry in list_of_values:
        if entry[0] == "MovieNames":
            movie_name = entry[1]
        else:
            rating_entries.append(entry)

    rating_sum = 0
    rating_count = 0
    for entry in rating_entries:
        for rate in entry[1:]:
            # emit(Movie Name, MovieID, Rated Movie ID, User ID, User Rating)
            mr.emit((movie_name, key, key, rate[0], rate[1]))
            rating_sum += rate[1]
            rating_count += 1

    # emit(Movie Name, Average Rating)
    # Guard against a movie with no ratings (would divide by zero), and force
    # true division so integer ratings are not truncated under Python 2.
    if rating_count:
        mr.emit((movie_name, float(rating_sum) / rating_count))
# Do not modify below this line
# =============================
if __name__ == '__main__':
    # The input path is the first command-line argument. The context manager
    # closes the file once the job returns, even if execute() raises.
    with open(sys.argv[1]) as inputdata:
        mr.execute(inputdata, mapper, reducer)