From d02721eaf062380c0176ce9de524273ac793be29 Mon Sep 17 00:00:00 2001 From: Danilo Burbano Date: Mon, 4 Sep 2023 10:04:42 -0500 Subject: [PATCH] SPARKNLP-884 Enabling getVectors method to get word vectors as spark dataframe --- .../sparknlp/annotator/embeddings/doc2vec.py | 6 ++ .../sparknlp/annotator/embeddings/word2vec.py | 6 ++ .../nlp/embeddings/Doc2VecModel.scala | 27 ++++++++- .../nlp/embeddings/Word2VecModel.scala | 23 +++++++- .../nlp/embeddings/Doc2VecTestSpec.scala | 59 ++++++++++++------- .../nlp/embeddings/Word2VecTestSpec.scala | 33 +++++++++++ 6 files changed, 130 insertions(+), 24 deletions(-) diff --git a/python/sparknlp/annotator/embeddings/doc2vec.py b/python/sparknlp/annotator/embeddings/doc2vec.py index da63d575996d1e..1bc6c7120b8e77 100755 --- a/python/sparknlp/annotator/embeddings/doc2vec.py +++ b/python/sparknlp/annotator/embeddings/doc2vec.py @@ -344,3 +344,9 @@ def pretrained(name="doc2vec_gigaword_300", lang="en", remote_loc=None): from sparknlp.pretrained import ResourceDownloader return ResourceDownloader.downloadModel(Doc2VecModel, name, lang, remote_loc) + def getVectors(self): + """ + Returns the vector representation of the words as a dataframe + with two fields, word and vector. + """ + return self._call_java("getVectors") diff --git a/python/sparknlp/annotator/embeddings/word2vec.py b/python/sparknlp/annotator/embeddings/word2vec.py index e3e52f32f00c2a..c9c9450f5ffb4e 100755 --- a/python/sparknlp/annotator/embeddings/word2vec.py +++ b/python/sparknlp/annotator/embeddings/word2vec.py @@ -345,3 +345,9 @@ def pretrained(name="word2vec_gigaword_300", lang="en", remote_loc=None): from sparknlp.pretrained import ResourceDownloader return ResourceDownloader.downloadModel(Word2VecModel, name, lang, remote_loc) + def getVectors(self): + """ + Returns the vector representation of the words as a dataframe + with two fields, word and vector. 
+ """ + return self._call_java("getVectors") diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecModel.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecModel.scala index 6b2d6f86664a50..6524c9a3bd1e0e 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecModel.scala @@ -23,7 +23,8 @@ import com.johnsnowlabs.nlp._ import com.johnsnowlabs.storage.HasStorageRef import org.apache.spark.ml.param.{IntParam, ParamValidators} import org.apache.spark.ml.util.Identifiable -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.types.{ArrayType, FloatType, StringType, StructField, StructType} /** Word2Vec model that creates vector representations of words in a text corpus. * @@ -166,6 +167,21 @@ class Doc2VecModel(override val uid: String) /** @group setParam */ def setWordVectors(value: Map[String, Array[Float]]): this.type = set(wordVectors, value) + private var sparkSession: Option[SparkSession] = None + + def getVectors: DataFrame = { + val vectors: Map[String, Array[Float]] = $$(wordVectors) + val rows = vectors.toSeq.map { case (key, values) => Row(key, values) } + val schema = StructType( + StructField("word", StringType, nullable = false) :: + StructField("vector", ArrayType(FloatType), nullable = false) :: Nil) + if (sparkSession.isEmpty) { + throw new UnsupportedOperationException( + "Vector representation empty. 
Please run Doc2VecModel in some pipeline before accessing vector vocabulary.") + } + sparkSession.get.createDataFrame(sparkSession.get.sparkContext.parallelize(rows), schema) + } + setDefault(inputCols -> Array(TOKEN), outputCol -> "doc2vec", vectorSize -> 100) private def calculateSentenceEmbeddings(matrix: Seq[Array[Float]]): Array[Float] = { @@ -180,6 +196,11 @@ class Doc2VecModel(override val uid: String) res } + override def beforeAnnotate(dataset: Dataset[_]): Dataset[_] = { + sparkSession = Some(dataset.sparkSession) + dataset + } + /** takes a document and annotations and produces new annotations of this annotator's annotation * type * @@ -204,8 +225,8 @@ class Doc2VecModel(override val uid: String) .filter(_.nonEmpty) val oovVector = Array.fill($(vectorSize))(0.0f) - val vectors = tokens.map { tokne => - $$(wordVectors).getOrElse(tokne, oovVector) + val vectors = tokens.map { token => + $$(wordVectors).getOrElse(token, oovVector) } val sentEmbeddings = calculateSentenceEmbeddings(vectors) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/Word2VecModel.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/Word2VecModel.scala index 5ddf760450df4d..67a7388eef419f 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/Word2VecModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/Word2VecModel.scala @@ -24,7 +24,8 @@ import com.johnsnowlabs.nlp._ import com.johnsnowlabs.storage.HasStorageRef import org.apache.spark.ml.param.{IntParam, ParamValidators} import org.apache.spark.ml.util.Identifiable -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.types.{ArrayType, FloatType, StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** Word2Vec model that creates vector representations of words in a text corpus. 
* @@ -167,8 +168,28 @@ class Word2VecModel(override val uid: String) /** @group setParam */ def setWordVectors(value: Map[String, Array[Float]]): this.type = set(wordVectors, value) + private var sparkSession: Option[SparkSession] = None + + def getVectors: DataFrame = { + val vectors: Map[String, Array[Float]] = $$(wordVectors) + val rows = vectors.toSeq.map { case (key, values) => Row(key, values) } + val schema = StructType( + StructField("word", StringType, nullable = false) :: + StructField("vector", ArrayType(FloatType), nullable = false) :: Nil) + if (sparkSession.isEmpty) { + throw new UnsupportedOperationException( + "Vector representation empty. Please run Word2VecModel in some pipeline before accessing vector vocabulary.") + } + sparkSession.get.createDataFrame(sparkSession.get.sparkContext.parallelize(rows), schema) + } + setDefault(inputCols -> Array(TOKEN), outputCol -> "word2vec", vectorSize -> 100) + override def beforeAnnotate(dataset: Dataset[_]): Dataset[_] = { + sparkSession = Some(dataset.sparkSession) + dataset + } + /** takes a document and annotations and produces new annotations of this annotator's annotation * type * diff --git a/src/test/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecTestSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecTestSpec.scala index c66ddb882b4216..d9dc979116e7ce 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecTestSpec.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/embeddings/Doc2VecTestSpec.scala @@ -17,6 +17,7 @@ package com.johnsnowlabs.nlp.embeddings import com.johnsnowlabs.nlp.annotator._ +import com.johnsnowlabs.nlp.annotators.SparkSessionTest import com.johnsnowlabs.nlp.base._ import com.johnsnowlabs.nlp.training.CoNLL import com.johnsnowlabs.nlp.util.io.ResourceHelper @@ -27,7 +28,7 @@ import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, Multiclas import org.apache.spark.sql.functions.{explode, when} import org.scalatest.flatspec.AnyFlatSpec -class 
Doc2VecTestSpec extends AnyFlatSpec { +class Doc2VecTestSpec extends AnyFlatSpec with SparkSessionTest { "Doc2VecApproach" should "train, save, and load back the saved model" taggedAs FastTest in { @@ -43,18 +44,6 @@ class Doc2VecTestSpec extends AnyFlatSpec { " ", " ").toDF("text") - val document = new DocumentAssembler() - .setInputCol("text") - .setOutputCol("document") - - val setence = new SentenceDetector() - .setInputCols("document") - .setOutputCol("sentence") - - val tokenizer = new Tokenizer() - .setInputCols(Array("sentence")) - .setOutputCol("token") - val stops = new StopWordsCleaner() .setInputCols("token") .setOutputCol("cleanedToken") @@ -67,7 +56,7 @@ class Doc2VecTestSpec extends AnyFlatSpec { .setStorageRef("my_awesome_doc2vec") .setEnableCaching(true) - val pipeline = new Pipeline().setStages(Array(document, setence, tokenizer, stops, doc2Vec)) + val pipeline = new Pipeline().setStages(Array(documentAssembler, sentenceDetector, tokenizerWithSentence, stops, doc2Vec)) val pipelineModel = pipeline.fit(ddd) val pipelineDF = pipelineModel.transform(ddd) @@ -87,7 +76,7 @@ class Doc2VecTestSpec extends AnyFlatSpec { .setOutputCol("sentence_embeddings") val loadedPipeline = - new Pipeline().setStages(Array(document, setence, tokenizer, loadedDoc2Vec)) + new Pipeline().setStages(Array(documentAssembler, sentenceDetector, tokenizerWithSentence, loadedDoc2Vec)) loadedPipeline.fit(ddd).transform(ddd).select("sentence_embeddings").show() @@ -105,10 +94,6 @@ class Doc2VecTestSpec extends AnyFlatSpec { "carbon emissions have come down without impinging on our growth .\\u2009.\\u2009.", "the ").toDF("text") - val document = new DocumentAssembler() - .setInputCol("text") - .setOutputCol("document") - val setence = new SentenceDetector() .setInputCols("document") .setOutputCol("sentence") @@ -135,7 +120,7 @@ class Doc2VecTestSpec extends AnyFlatSpec { val pipeline = new Pipeline().setStages( Array( - document, + documentAssembler, setence, tokenizerDocument, 
tokenizerSentence, @@ -332,4 +317,38 @@ class Doc2VecTestSpec extends AnyFlatSpec { println("Area under ROC = " + auROC) } + + it should "get word vectors as spark dataframe" taggedAs SlowTest in { + + import ResourceHelper.spark.implicits._ + + val testDataset = Seq( + "Rare Hendrix song draft sells for almost $17,000. This is my second sentenece! The third one here!") + .toDF("text") + + val doc2Vec = Doc2VecModel + .pretrained() + .setInputCols("token") + .setOutputCol("embeddings") + + val pipeline = + new Pipeline().setStages(Array(documentAssembler, tokenizer, doc2Vec)) + + val result = pipeline.fit(testDataset).transform(testDataset) + result.show() + + doc2Vec.getVectors.show() + } + + it should "raise an error when trying to retrieve empty word vectors" taggedAs SlowTest in { + val doc2Vec = Doc2VecModel + .pretrained() + .setInputCols("token") + .setOutputCol("embeddings") + + intercept[UnsupportedOperationException] { + doc2Vec.getVectors + } + } + } diff --git a/src/test/scala/com/johnsnowlabs/nlp/embeddings/Word2VecTestSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/embeddings/Word2VecTestSpec.scala index 8f51f440cd29d8..3c27a767b1da88 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/embeddings/Word2VecTestSpec.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/embeddings/Word2VecTestSpec.scala @@ -201,4 +201,37 @@ class Word2VecTestSpec extends AnyFlatSpec with SparkSessionTest { } + it should "get word vectors as spark dataframe" taggedAs SlowTest in { + + import ResourceHelper.spark.implicits._ + + val testDataset = Seq( + "Rare Hendrix song draft sells for almost $17,000. This is my second sentenece! 
The third one here!") + .toDF("text") + + val word2Vec = Word2VecModel + .pretrained() + .setInputCols("token") + .setOutputCol("embeddings") + + val pipeline = + new Pipeline().setStages(Array(documentAssembler, tokenizer, word2Vec)) + + val result = pipeline.fit(testDataset).transform(testDataset) + result.show() + + word2Vec.getVectors.show() + } + + it should "raise an error when trying to retrieve empty word vectors" taggedAs SlowTest in { + val word2Vec = Word2VecModel + .pretrained() + .setInputCols("token") + .setOutputCol("embeddings") + + intercept[UnsupportedOperationException] { + word2Vec.getVectors + } + } + }