Merge pull request #68 from JohnSnowLabs/resource-helper-txtds
TXTDS corpus reading
aleksei-ai authored Dec 22, 2017
2 parents e4836d9 + b2c1c15 commit c17ddac
Showing 6 changed files with 71 additions and 65 deletions.
3 changes: 0 additions & 3 deletions python/sparknlp/annotator.py
@@ -365,7 +365,6 @@ class NorvigSweetingApproach(JavaEstimator, JavaMLWritable, JavaMLReadable, Anno
@keyword_only
def __init__(self,
dictPath="/spell/words.txt",
- slangPath="/spell/slangs.txt",
caseSensitive=False,
doubleVariants=False,
shortCircuit=False
@@ -375,7 +374,6 @@ def __init__(self,
kwargs = self._input_kwargs
self._setDefault(
dictPath="/spell/words.txt",
- slangPath="/spell/slangs.txt",
caseSensitive=False,
doubleVariants=False,
shortCircuit=False
@@ -412,7 +410,6 @@ def setShortCircuit(self, value):

def setParams(self,
dictPath="/spell/words.txt",
- slangPath="/spell/slangs.txt",
caseSensitive=False,
doubleVariants=False,
shortCircuit=False):
src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingApproach.scala
@@ -21,7 +21,6 @@ class NorvigSweetingApproach(override val uid: String)
val slangPath = new Param[String](this, "slangPath", "path to custom dictionaries")

setDefault(dictPath, "/spell/words.txt")
- setDefault(slangPath, "/spell/slangs.txt")
setDefault(corpusFormat, "TXT")

setDefault(caseSensitive, false)
@@ -43,20 +42,17 @@ class NorvigSweetingApproach(override val uid: String)
def setSlangPath(value: String): this.type = set(slangPath, value)

override def train(dataset: Dataset[_]): NorvigSweetingModel = {
- val loadWords = ResourceHelper.wordCount($(dictPath), TXT)
+ val loadWords = ResourceHelper.wordCount($(dictPath), $(corpusFormat).toUpperCase)
val corpusWordCount =
if (get(corpusPath).isDefined) {
- if ($(corpusFormat).toLowerCase == "txt") {
- ResourceHelper.wordCount($(corpusPath), TXT)
- } else if ($(corpusFormat).toLowerCase == "txtds") {
- ResourceHelper.wordCount($(corpusPath), TXTDS)
- } else {
- throw new Exception("Unsupported corpusFormat. Must be txt or txtds")
- }
+ ResourceHelper.wordCount($(corpusPath), $(corpusFormat).toUpperCase)
} else {
Map.empty[String, Int]
}
- val loadSlangs = ResourceHelper.parseKeyValueText($(slangPath), "txt", ",")
+ val loadSlangs = if (get(slangPath).isDefined)
+ ResourceHelper.parseKeyValueText($(slangPath), $(corpusFormat).toUpperCase, ",")
+ else
+ Map.empty[String, String]
new NorvigSweetingModel()
.setWordCount(loadWords.toMap ++ corpusWordCount)
.setCustomDict(loadSlangs)
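With this change corpusFormat drives how every resource is read, and slangPath loses its default: when unset, train() now falls back to an empty slang map. A minimal usage sketch, assuming the setters shown in this diff and placeholder resource paths (the import path follows the repository layout):

import com.johnsnowlabs.nlp.annotators.spell.norvig.NorvigSweetingApproach

// Sketch only: paths are placeholders. corpusFormat is upper-cased in
// train(), so "txtds" and "TXTDS" behave the same; "TXT" reads through
// SourceStream, "TXTDS" through Spark datasets.
val spellChecker = new NorvigSweetingApproach()
  .setInputCols(Array("normalized"))
  .setOutputCol("spell")
  .setDictPath("/spell/words.txt")
  .setCorpusPath("/spell/corpus.txt")
  .setSlangPath("/spell/slangs.txt")   // optional now that the default is gone
  .setCorpusFormat("TXTDS")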
115 changes: 63 additions & 52 deletions src/main/scala/com/johnsnowlabs/nlp/util/io/ResourceHelper.scala
@@ -4,7 +4,6 @@ import java.io.{File, FileNotFoundException, InputStream}

import com.johnsnowlabs.nlp.annotators.{Normalizer, RegexTokenizer}
import com.johnsnowlabs.nlp.{DocumentAssembler, Finisher}
- import com.johnsnowlabs.nlp.annotators.common.{TaggedSentence, TaggedWord}
import com.johnsnowlabs.nlp.util.io.ResourceFormat._
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SparkSession
@@ -27,6 +26,22 @@ object ResourceHelper {

private val spark: SparkSession = SparkSession.builder().getOrCreate()

+ /** Structure for a SourceStream coming from compiled content */
+ case class SourceStream(resource: String) {
+ val pipe: Option[InputStream] = {
+ var stream = getClass.getResourceAsStream(resource)
+ if (stream == null)
+ stream = getClass.getClassLoader.getResourceAsStream(resource)
+ Option(stream)
+ }
+ val content: Source = pipe.map(p => {
+ Source.fromInputStream(p)("UTF-8")
+ }).getOrElse(Source.fromFile(resource, "UTF-8"))
+ def close(): Unit = {
+ content.close()
+ pipe.foreach(_.close())
+ }
+ }

def listDirectory(path: String): Seq[String] = {
var dirURL = getClass.getResource(path)
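SourceStream (moved up in the file, body unchanged) resolves a resource against the classpath first and falls back to the local file system, so the same resource string works for bundled and external files. A hypothetical caller, assuming "/spell/words.txt" exists in either location:

import com.johnsnowlabs.nlp.util.io.ResourceHelper

// Sketch: peek at the first lines of a bundled dictionary.
val source = ResourceHelper.SourceStream("/spell/words.txt")
try source.content.getLines().take(3).foreach(println)
finally source.close()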
@@ -69,57 +84,12 @@ object ResourceHelper {
throw new UnsupportedOperationException(s"Cannot list files for URL $dirURL")
}

- /** Structure for a SourceStream coming from compiled content */
- case class SourceStream(resource: String) {
- val pipe: Option[InputStream] = {
- var stream = getClass.getResourceAsStream(resource)
- if (stream == null)
- stream = getClass.getClassLoader.getResourceAsStream(resource)
- Option(stream)
- }
- val content: Source = pipe.map(p => {
- Source.fromInputStream(p)("UTF-8")
- }).getOrElse(Source.fromFile(resource, "UTF-8"))
- def close(): Unit = {
- content.close()
- pipe.foreach(_.close())
- }
- }

/** Checks whether a path points to directory */
def pathIsDirectory(path: String): Boolean = {
//ToDo: Improve me???
if (path.contains(".txt")) false else true
}

- /**
- * General purpose key values parser from source
- * Currently only text files
- * @param source File input to streamline
- * @param format format, for now only txt
- * @param keySep separator character
- * @param valueSep values separator in dictionary
- * @return Dictionary of all values per key
- */
- def parseKeyValuesText(
- source: String,
- format: Format,
- keySep: String,
- valueSep: String): Map[String, Array[String]] = {
- format match {
- case TXT =>
- val sourceStream = SourceStream(source)
- val res = sourceStream.content.getLines.map (line => {
- val kv = line.split (keySep).map (_.trim)
- val key = kv (0)
- val values = kv (1).split (valueSep).map (_.trim)
- (key, values)
- }).toMap
- sourceStream.close()
- res
- }
- }

/**
* General purpose key value parser from source
* Currently read only text files
@@ -142,6 +112,14 @@
}).toMap
sourceStream.close()
res
+ case TXTDS =>
+ import spark.implicits._
+ val dataset = spark.read.option("delimiter", keySep).csv(source).toDF("key", "value")
+ val keyValueStore = MMap.empty[String, String]
+ dataset.as[(String, String)].foreach{kv => keyValueStore(kv._1) = kv._2}
+ keyValueStore.toMap
+ case _ =>
+ throw new Exception("Unsupported format. Must be TXT or TXTDS")
}
}
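The new TXTDS branch reads the key/value file as a delimited Spark dataset. Note that filling a driver-side mutable map inside foreach works because local mode runs executors in the driver JVM; a collect-based sketch makes no such assumption (file name and delimiter are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Each line holds "key<sep>value", e.g. "gr8,great" in a slang dictionary.
val keyValues: Map[String, String] = spark.read
  .option("delimiter", ",")
  .csv("slangs.txt")
  .toDF("key", "value")
  .as[(String, String)]
  .collect()
  .toMap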

@@ -162,6 +140,16 @@
val res = sourceStream.content.getLines.toArray
sourceStream.close()
res
+ case TXTDS =>
+ import spark.implicits._
+ val dataset = spark.read.text(source)
+ val lineStore = spark.sparkContext.collectionAccumulator[String]
+ dataset.as[String].foreach(l => lineStore.add(l))
+ val result = lineStore.value.toArray.map(_.toString)
+ lineStore.reset()
+ result
+ case _ =>
+ throw new Exception("Unsupported format. Must be TXT or TXTDS")
}
}
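This TXTDS branch routes lines through a collectionAccumulator: executors add to it, the driver reads value, and reset() clears it for the next call. Since the result lands on the driver anyway, a plain collect gives the same array (reusing the SparkSession and implicits from the sketch above; the path is a placeholder):

// Sketch: gather all lines of a text source to the driver.
val lines: Array[String] = spark.read.text("corpus.txt").as[String].collect()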

@@ -187,6 +175,19 @@
}).toArray
sourceStream.close()
res
+ case TXTDS =>
+ import spark.implicits._
+ val dataset = spark.read.text(source)
+ val lineStore = spark.sparkContext.collectionAccumulator[String]
+ dataset.as[String].foreach(l => lineStore.add(l))
+ val result = lineStore.value.toArray.map(line => {
+ val kv = line.toString.split (keySep).map (_.trim)
+ (kv.head, kv.last)
+ })
+ lineStore.reset()
+ result
+ case _ =>
+ throw new Exception("Unsupported format. Must be TXT or TXTDS")
}
}
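The tuple parser collects lines the same way, then splits each on the key separator and takes the first and last fragments, which also tolerates a line with no separator (head and last are then the same token). A tiny sketch of that split, with an assumed tab separator:

// "NN\tnoun" -> ("NN", "noun"); "orphan" -> ("orphan", "orphan")
def toPair(line: String, keySep: String = "\t"): (String, String) = {
  val kv = line.split(keySep).map(_.trim)
  (kv.head, kv.last)
}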

@@ -215,7 +216,19 @@
})
sourceStream.close()
m.toMap
- case _ => throw new IllegalArgumentException("Only txt supported as a file format")
+ case TXTDS =>
+ import spark.implicits._
+ val dataset = spark.read.text(source)
+ val valueAsKeys = MMap.empty[String, String]
+ dataset.as[String].foreach(line => {
+ val kv = line.split(keySep).map(_.trim)
+ val key = kv(0)
+ val values = kv(1).split(valueSep).map(_.trim)
+ values.foreach(v => valueAsKeys(v) = key)
+ })
+ valueAsKeys.toMap
+ case _ =>
+ throw new Exception("Unsupported format. Must be TXT or TXTDS")
}
}
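This TXTDS branch inverts the dictionary: every value listed on a line becomes its own key pointing back to the line's head word, the direction needed to look up, say, an inflected form's base form. A sketch of the inversion for a single line, with assumed separators:

// keySep "->" and valueSep " " are assumptions for illustration.
val line = "see -> sees seeing seen saw"
val kv = line.split("->").map(_.trim)
val inverted: Map[String, String] =
  kv(1).split(" ").map(_.trim).map(value => value -> kv(0)).toMap
// inverted("seeing") == "see"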

@@ -273,7 +286,7 @@
case TXTDS =>
import spark.implicits._
val dataset = spark.read.textFile(source)
- val wordCount = spark.sparkContext.broadcast(MMap.empty[String, Int].withDefaultValue(0))
+ val wordCount = MMap.empty[String, Int].withDefaultValue(0)
val documentAssembler = new DocumentAssembler()
.setInputCol("value")
val tokenizer = new RegexTokenizer()
@@ -292,11 +305,9 @@
.transform(dataset)
.select("finished").as[String]
.foreach(text => text.split("--").foreach(t => {
- wordCount.value(t) += 1
+ wordCount(t) += 1
}))
- val result = wordCount.value
- wordCount.destroy()
- result
+ wordCount
case _ => throw new IllegalArgumentException("format not available for word count")
}
}
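Broadcast variables are read-only on executors, so incrementing counts through wordCount.value only ever worked with executors in the driver JVM; the commit drops the broadcast wrapper but still mutates a local map inside foreach, keeping the local-mode assumption. A distributed-safe sketch of the same count (hypothetical helper; finished stands for the pipeline's "finished" column, tokens joined with "--" as above):

import org.apache.spark.sql.Dataset

def countWords(finished: Dataset[String]): Map[String, Int] =
  finished.rdd
    .flatMap(_.split("--"))
    .map(token => (token, 1))
    .reduceByKey(_ + _)
    .collect()
    .toMap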
File renamed without changes.
1 change: 1 addition & 0 deletions src/test/scala/com/johnsnowlabs/nlp/AnnotatorBuilder.scala
@@ -122,6 +122,7 @@ object AnnotatorBuilder extends FlatSpec { this: Suite =>
val spellChecker = new NorvigSweetingApproach()
.setInputCols(Array("normalized"))
.setOutputCol("spell")
+ .setDictPath("./src/main/resources/spell/words.txt")
.setCorpusPath("./src/test/resources/spell/sherlockholmes.txt")
.setCorpusFormat(inputFormat)
spellChecker.fit(withFullNormalizer(dataset)).transform(withFullNormalizer(dataset))
src/test/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingBehaviors.scala
@@ -10,6 +10,7 @@ trait NorvigSweetingBehaviors { this: FlatSpec =>

val spellChecker = new NorvigSweetingApproach()
.setCorpusPath("/spell")
+ .setSlangPath("/spell/slangs.txt")
.fit(DataBuilder.basicDataBuild("dummy"))

def isolatedNorvigChecker(wordAnswer: Seq[(String, String)]): Unit = {
