Pyspark_TF_IDF.py
# Imports
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import DataFrame
# Init Spark
spark = SparkSession.builder \
    .appName("SBIR") \
    .enableHiveSupport() \
    .config("spark.hadoop.yarn.resourcemanager.principal", "ibrooks") \
    .config("spark.sql.warehouse.dir", "target/spark-warehouse") \
    .getOrCreate()
#Check Spark Version
spark.version
#Import JSON Files
df_WholeSetRaw = spark.read.option("multiline", "true").json("sbir-search-results*.json")
df_WholeSetRaw.cache()
#Create Table from DataFrame
#df_WholeSetRaw.createOrReplaceTempView("SBIR2018")
# Display the resulting inferred schema and a sample record
df_WholeSetRaw.printSchema()
df_WholeSetRaw.take(1)
# Use Spark feature engineering tools for the NLP use case
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, CountVectorizer, CountVectorizerModel
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
#Tokenizer Option 1
tokenizer = Tokenizer(inputCol="abstract", outputCol="words")
tokenized = tokenizer.transform(df_WholeSetRaw)
# Tokenizer Option 2
regexTokenizer = RegexTokenizer(inputCol="abstract", outputCol="words", pattern="\\W+")
# alternatively, pattern="\\w+", gaps=False
regexTokenized = regexTokenizer.transform(df_WholeSetRaw)
# Count the tokens in each abstract
countTokens = udf(lambda words: len(words), IntegerType())
regexTokenized.select("abstract", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
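# Optional sketch (not in the original script): summary statistics of the token
# counts, to get a feel for abstract length before vectorizing.
from pyspark.sql.functions import avg, min as sql_min, max as sql_max
regexTokenized.withColumn("tokens", countTokens(col("words"))) \
    .agg(sql_min("tokens"), avg("tokens"), sql_max("tokens")).show()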
#Configure CountVectorizer Model
cvModel = CountVectorizer(inputCol="words", outputCol="rawFeatures", minDF=4, vocabSize=100000).fit(tokenized)
#Configure HashingTF Model
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=400)
# Build featurized training sets
CVfeaturizedData = cvModel.transform(regexTokenized)
TFfeaturizedData = hashingTF.transform(regexTokenized)
# Configure the IDF model
idf = IDF(inputCol="rawFeatures", outputCol="features")
# Fit the IDF model to the featurized (HashingTF) training set
idfModel = idf.fit(TFfeaturizedData)
rescaledData = idfModel.transform(TFfeaturizedData)
rescaledData.select("abstract","features").show()
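# Optional sketch: the same IDF rescaling applied to the CountVectorizer
# features (CVfeaturizedData); unlike the hashed features, these indices map
# back to real terms via cvModel.vocabulary.
cvIdfModel = idf.fit(CVfeaturizedData)
cvRescaledData = cvIdfModel.transform(CVfeaturizedData)
cvRescaledData.select("abstract", "features").show()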
#rescaledData.select("features").createOrReplaceTempView("TFIDF")
#rescaledData.show(truncate=False)
#KMeans - Clustering on Hashed Tokens
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import lit
#Set the number of clusters (Play around with this value)
NumberOfClusters = 128
#Create the K-Means model
kmeans = KMeans().setK(NumberOfClusters).setSeed(1).setFeaturesCol("features").setPredictionCol("prediction")
#Train the K-Means model with feature vector
model = kmeans.fit(rescaledData)
# Make predictions
predictions = model.transform(rescaledData)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Show the clustering results
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
print("Documents by Cluster")
predictions.select("award_title","prediction").show()
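# Optional: count how many documents fall into each cluster.
predictions.groupBy("prediction").count().orderBy("count", ascending=False).show()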
# Print out the members of one cluster
predictions.filter("prediction = 13").select("agency", "award_title", "research_keywords").show()
# MinHash LSH Example
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinHashLSH
# Number of hash tables (good value to play around with)
numHashTables = 10
# Build the MinHash model on the CountVectorizer features, so the query
# vectors below can be built from cvModel.vocabulary indices
mh = MinHashLSH().setNumHashTables(numHashTables).setInputCol("rawFeatures").setOutputCol("hashValues")
MHmodel = mh.fit(CVfeaturizedData)
MHmodel.transform(CVfeaturizedData).show()
# Find vocabulary indices for the keyword search
keyVal1 = cvModel.vocabulary.index("high")
keyVal2 = cvModel.vocabulary.index("heat")
keyVal3 = cvModel.vocabulary.index("metal")
# Build keys as sparse vectors over the CountVectorizer vocabulary
# (indices must be in ascending order)
vocabSize = len(cvModel.vocabulary)
One_key = Vectors.sparse(vocabSize, [keyVal1], [1.0])
Two_key = Vectors.sparse(vocabSize, sorted([keyVal1, keyVal2]), [1.0, 1.0])
Three_key = Vectors.sparse(vocabSize, sorted([keyVal1, keyVal2, keyVal3]), [1.0, 1.0, 1.0])
# Number of search results
k = 40
# Find matched documents
DF_Matched = MHmodel.approxNearestNeighbors(CVfeaturizedData, Three_key, k)
DF_Matched.cache()
DF_Matched.select("agency", "award_title", "research_keywords", "distCol").show()  # truncate=False
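# Optional sketch: approxSimilarityJoin on the same featurized set to surface
# pairs of abstracts with low Jaccard distance (the 0.6 threshold is illustrative).
MHmodel.approxSimilarityJoin(CVfeaturizedData, CVfeaturizedData, 0.6, distCol="JaccardDistance") \
    .select(col("datasetA.award_title").alias("title_A"),
            col("datasetB.award_title").alias("title_B"),
            "JaccardDistance") \
    .filter("title_A != title_B").show()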