# data_prep.py (forked from AlexGidiotis/keras-CF-NADE)
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry, BlockMatrix
from indexes import create_user_index, create_doc_index, load_indexes, map_recommendations
from configs import data_path


def read_data(sc, data_file, delimiter='::'):
    """
    Read the data into an RDD of (userId, productId, rating) tuples.

    Args:
        sc: An active SparkContext.
        data_file: A delimiter-separated ratings file.
        delimiter: The delimiter separating the 3 fields of the input file. Default is '::'.

    Returns:
        ui_mat_rdd: The user-item (UI) matrix as an RDD.
    """
    data = sc.textFile(data_file)
    header = data.first()

    def _parse(line):
        # Split once and keep userId, productId, rating; any extra fields are ignored.
        fields = line.split(delimiter)
        return int(fields[0]), int(fields[1]), float(fields[2])

    ui_mat_rdd = data.filter(lambda row: row != header).map(_parse)
    return ui_mat_rdd
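
# Example: with the default '::' delimiter, a MovieLens-style line such as
# '1::1193::5::978300760' parses to (1, 1193, 5.0); trailing fields (e.g. a timestamp)
# are ignored.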


if __name__ == "__main__":
    sc = SparkContext()
    spark = SparkSession(sc)

    ui_mat_rdd = read_data(
        sc, data_path, delimiter='::').sample(
            False, 1.0, seed=0).persist()

    print('Creating user and doc indexes...')
    user_index = create_user_index(ui_mat_rdd)
    doc_index = create_doc_index(ui_mat_rdd)
    # Broadcast the index dictionaries so the remapping can run on the executors.
    b_uidx = sc.broadcast(user_index)
    b_didx = sc.broadcast(doc_index)

    def _to_indexed(i):
        userId, docId, value = i
        return b_uidx.value[userId], b_didx.value[docId], value

    ui_mat_rdd = ui_mat_rdd.map(_to_indexed)
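    # Illustrative only: after remapping, a raw tuple such as (1, 1193, 5.0) becomes a tuple of
    # (presumably contiguous, 0-based) indices, e.g. (0, 42, 5.0), depending on the dictionaries
    # returned by create_user_index / create_doc_index.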

    num_users = ui_mat_rdd.map(lambda i: i[0]).distinct().count()
    num_movies = ui_mat_rdd.map(lambda i: i[1]).distinct().count()
    print('users:', num_users, 'products:', num_movies)

    df = spark.createDataFrame(ui_mat_rdd, ['userId', 'movieId', 'value'])
    ui_mat_rdd.unpersist()

    print('Splitting data set...')
    df = df.orderBy(F.rand())
    train_df, test_df = df.randomSplit([0.9, 0.1], seed=45)
    train_df, val_df = train_df.randomSplit([0.95, 0.05], seed=45)

    # Flag each rating by split: 0 = train, 1 = validation, 2 = test.
    train_df = train_df.withColumn('flag', F.lit(0))
    val_df = val_df.withColumn('flag', F.lit(1))
    val_df = val_df.union(train_df)
    test_df = test_df.withColumn('flag', F.lit(2))
    # val_df already contains the training rows, so a single union yields
    # test + validation + train without duplicating the training ratings.
    test_df = test_df.union(val_df)
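
    # Presumably the point of these unions: the validation set carries the known training
    # ratings (flag 0) alongside its own held-out ratings (flag 1), and the test set carries
    # flags 0, 1 and 2, so downstream code can condition on known ratings and score only the
    # held-out ones.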

    train_size = train_df.count()
    val_size = val_df.count()
    test_size = test_df.count()

    train_df.show()
    print(train_size, 'training examples')
    print(val_size, 'validation examples')
    print(test_size, 'testing examples')
    # Group ratings per movie: one row per movieId with a list of {userId, value, flag} structs.
    train_examples = train_df.select(
        "movieId",
        F.struct(["userId", "value", "flag"]).alias("ranking")).groupby('movieId').agg(
            F.collect_list('ranking').alias('rankings'))
    val_examples = val_df.select(
        "movieId",
        F.struct(["userId", "value", "flag"]).alias("ranking")).groupby('movieId').agg(
            F.collect_list('ranking').alias('rankings'))
    test_examples = test_df.select(
        "movieId",
        F.struct(["userId", "value", "flag"]).alias("ranking")).groupby('movieId').agg(
            F.collect_list('ranking').alias('rankings'))
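    # Each grouped row, once written as JSON, looks like (illustrative values):
    #   {"movieId": 42, "rankings": [{"userId": 0, "value": 5.0, "flag": 0}, ...]}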

    train_examples.show()
    val_examples.show()
    test_examples.show()

    # coalesce(1) writes each split as a single JSON part file.
    train_examples.coalesce(1).write.json(
        path="data/train_set", mode='overwrite')
    val_examples.coalesce(1).write.json(path="data/val_set", mode='overwrite')
    test_examples.coalesce(1).write.json(
        path="data/test_set", mode='overwrite')