Some suggested changes to style (#88)
srowen authored Dec 6, 2016
1 parent ea5abe3 commit a525617
Showing 9 changed files with 68 additions and 87 deletions.
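The recurring edits below are small Scala style cleanups: explicit result types on public members, ": Unit =" instead of procedure syntax, spaces inside map { case ... } braces, $"col" column references instead of symbol syntax, foreach for side-effecting loops, and scala.math in place of FastMath. A minimal before/after sketch of these patterns is shown next; the object and method names are illustrative only and do not come from the repository.

// Before: procedure syntax, inferred public result types, cramped braces
object StyleBefore {
  def printScores(scores: Seq[(String, Double)]) {
    println(scores.map{case (id, score) => s"$id: $score"}.mkString(", "))
  }
  def top(scores: Seq[(String, Double)]) = scores.sortBy(-_._2).headOption
}

// After: explicit Unit and Option result types, spaces around case patterns
object StyleAfter {
  def printScores(scores: Seq[(String, Double)]): Unit = {
    println(scores.map { case (id, score) => s"$id: $score" }.mkString(", "))
  }
  def top(scores: Seq[(String, Double)]): Option[(String, Double)] =
    scores.sortBy(-_._2).headOption
}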
@@ -91,7 +91,7 @@ class AssembleDocumentTermMatrix(private val spark: SparkSession) extends Serial

docs.mapPartitions { iter =>
val pipeline = createNLPPipeline()
-iter.map{ case(title, contents) => (title, plainTextToLemmas(contents, stopWords, pipeline))}
+iter.map { case (title, contents) => (title, plainTextToLemmas(contents, stopWords, pipeline)) }
}
}

ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/RunLSA.scala (39 changes: 20 additions & 19 deletions)
@@ -7,7 +7,7 @@
package com.cloudera.datascience.lsa

import breeze.linalg.{DenseMatrix => BDenseMatrix, SparseVector => BSparseVector}
-import org.apache.spark.SparkContext._
+
import org.apache.spark.mllib.linalg.{Matrices, Matrix, SingularValueDecomposition, Vectors, Vector => MLLibVector}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
@@ -18,7 +18,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer

object RunLSA {
-def main(args: Array[String]) {
+def main(args: Array[String]): Unit = {
val k = if (args.length > 0) args(0).toInt else 100
val numTerms = if (args.length > 1) args(1).toInt else 50000

@@ -67,8 +67,8 @@ object RunLSA {
}

/**
-* The top concepts are the concepts that explain the most variance in the dataset. For each top concept, finds the
-* terms that are most relevant to the concept.
+* The top concepts are the concepts that explain the most variance in the dataset.
+* For each top concept, finds the terms that are most relevant to the concept.
*
* @param svd A singular value decomposition.
* @param numConcepts The number of concepts to look at.
@@ -85,14 +85,14 @@
val offs = i * v.numRows
val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
val sorted = termWeights.sortBy(-_._1)
-topTerms += sorted.take(numTerms).map{case (score, id) => (termIds(id), score)}
+topTerms += sorted.take(numTerms).map {case (score, id) => (termIds(id), score) }
}
topTerms
}

/**
-* The top concepts are the concepts that explain the most variance in the dataset. For each top concept, finds the
-* documentsthat are most relevant to the concept.
+* The top concepts are the concepts that explain the most variance in the dataset.
+* For each top concept, finds the documents that are most relevant to the concept.
*
* @param svd A singular value decomposition.
* @param numConcepts The number of concepts to look at.
@@ -106,7 +106,7 @@
val topDocs = new ArrayBuffer[Seq[(String, Double)]]()
for (i <- 0 until numConcepts) {
val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId
-topDocs += docWeights.top(numDocs).map{case (score, id) => (docIds(id), score)}
+topDocs += docWeights.top(numDocs).map { case (score, id) => (docIds(id), score) }
}
topDocs
}
@@ -141,11 +141,11 @@ class LSAQueryEngine(
*/
def multiplyByDiagonalRowMatrix(mat: RowMatrix, diag: MLLibVector): RowMatrix = {
val sArr = diag.toArray
-new RowMatrix(mat.rows.map(vec => {
+new RowMatrix(mat.rows.map { vec =>
val vecArr = vec.toArray
val newArr = (0 until vec.size).toArray.map(i => vecArr(i) * sArr(i))
Vectors.dense(newArr)
-}))
+})
}

/**
@@ -155,7 +155,7 @@
val newMat = new BDenseMatrix[Double](mat.rows, mat.cols)
for (r <- 0 until mat.rows) {
val length = math.sqrt((0 until mat.cols).map(c => mat(r, c) * mat(r, c)).sum)
-(0 until mat.cols).map(c => newMat.update(r, c, mat(r, c) / length))
+(0 until mat.cols).foreach(c => newMat.update(r, c, mat(r, c) / length))
}
newMat
}
@@ -164,10 +164,11 @@ class LSAQueryEngine(
* Returns a distributed matrix where each row is divided by its length.
*/
def distributedRowsNormalized(mat: RowMatrix): RowMatrix = {
-new RowMatrix(mat.rows.map(vec => {
-val length = math.sqrt(vec.toArray.map(x => x * x).sum)
-Vectors.dense(vec.toArray.map(_ / length))
-}))
+new RowMatrix(mat.rows.map { vec =>
+val array = vec.toArray
+val length = math.sqrt(array.map(x => x * x).sum)
+Vectors.dense(array.map(_ / length))
+})
}

/**
@@ -240,22 +241,22 @@ class LSAQueryEngine(
allDocWeights.top(10)
}

-def printTopTermsForTerm(term: String) {
+def printTopTermsForTerm(term: String): Unit = {
val idWeights = topTermsForTerm(idTerms(term))
println(idWeights.map { case (score, id) => (termIds(id), score) }.mkString(", "))
}

-def printTopDocsForDoc(doc: String) {
+def printTopDocsForDoc(doc: String): Unit = {
val idWeights = topDocsForDoc(idDocs(doc))
println(idWeights.map { case (score, id) => (docIds(id), score) }.mkString(", "))
}

-def printTopDocsForTerm(term: String) {
+def printTopDocsForTerm(term: String): Unit = {
val idWeights = topDocsForTerm(idTerms(term))
println(idWeights.map { case (score, id) => (docIds(id), score) }.mkString(", "))
}

-def printTopDocsForTermQuery(terms: Seq[String]) {
+def printTopDocsForTermQuery(terms: Seq[String]): Unit = {
val queryVec = termsToQueryVector(terms)
val idWeights = topDocsForTermQuery(queryVec)
println(idWeights.map { case (score, id) => (docIds(id), score) }.mkString(", "))

@@ -14,10 +14,9 @@ import java.security.MessageDigest
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.conf.Configuration

-import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, Row}
+import org.apache.spark.sql.{Dataset, SparkSession, Row}
import org.apache.spark.sql.functions._

import scala.xml._
@@ -46,8 +45,8 @@ object RunGraph extends Serializable {
println("Number of unique co-occurrence pairs: " + cooccurs.count())
spark.sql("SELECT pairs, cnt FROM topic_pairs ORDER BY cnt DESC LIMIT 10").show()

-val vertices = topics.map{ case Row(topic: String) => (hashId(topic), topic) }.toDF("hash", "topic")
-val edges = cooccurs.map{ case Row(topics: Seq[_], cnt: Long) =>
+val vertices = topics.map { case Row(topic: String) => (hashId(topic), topic) }.toDF("hash", "topic")
+val edges = cooccurs.map { case Row(topics: Seq[_], cnt: Long) =>
val ids = topics.map(_.toString).map(hashId).sorted
Edge(ids(0), ids(1), cnt)
}
@@ -76,11 +75,11 @@ object RunGraph extends Serializable {
}.toDF("topic", "degree").orderBy(desc("degree")).show()

val T = medline.count()
-val topicDistRdd = topicDist.map{ case Row(topic: String, cnt: Long) => (hashId(topic), cnt) }.rdd
+val topicDistRdd = topicDist.map { case Row(topic: String, cnt: Long) => (hashId(topic), cnt) }.rdd
val topicDistGraph = Graph(topicDistRdd, topicGraph.edges)
-val chiSquaredGraph = topicDistGraph.mapTriplets(triplet => {
+val chiSquaredGraph = topicDistGraph.mapTriplets(triplet =>
chiSq(triplet.attr, triplet.srcAttr, triplet.dstAttr, T)
-})
+)
chiSquaredGraph.edges.map(x => x.attr).stats()

val interesting = chiSquaredGraph.subgraph(triplet => triplet.attr > 19.5)
@@ -132,7 +131,7 @@ object RunGraph extends Serializable {
val start = Map[VertexId, Int]()
val res = mapGraph.ops.pregel(start)(update, iterate, mergeMaps)
res.vertices.flatMap { case (id, m) =>
-m.map{ case (k, v) =>
+m.map { case (k, v) =>
if (id < k) {
(id, k, v)
} else {
@@ -149,9 +148,7 @@
m2.getOrElse(k, Int.MaxValue))
}

-(m1.keySet ++ m2.keySet).map{
-k => (k, minThatExists(k))
-}.toMap
+(m1.keySet ++ m2.keySet).map(k => (k, minThatExists(k))).toMap
}

def update(id: VertexId, state: Map[VertexId, Int], msg: Map[VertexId, Int])
@@ -161,7 +158,7 @@

def checkIncrement(a: Map[VertexId, Int], b: Map[VertexId, Int], bid: VertexId)
: Iterator[(VertexId, Map[VertexId, Int])] = {
-val aplus = a.map{ case (v, d) => v -> (d + 1) }
+val aplus = a.map { case (v, d) => v -> (d + 1) }
if (b != mergeMaps(aplus, b)) {
Iterator((bid, aplus))
} else {

@@ -12,35 +12,35 @@ import spray.json._
case class Feature(id: Option[JsValue],
properties: Map[String, JsValue],
geometry: RichGeometry) {
-def apply(property: String) = properties(property)
-def get(property: String) = properties.get(property)
+def apply(property: String): JsValue = properties(property)
+def get(property: String): Option[JsValue] = properties.get(property)
}

case class FeatureCollection(features: Array[Feature])
extends IndexedSeq[Feature] {
-def apply(index: Int) = features(index)
-def length = features.length
+def apply(index: Int): Feature = features(index)
+def length: Int = features.length
}

case class GeometryCollection(geometries: Array[RichGeometry])
extends IndexedSeq[RichGeometry] {
-def apply(index: Int) = geometries(index)
-def length = geometries.length
+def apply(index: Int): RichGeometry = geometries(index)
+def length: Int = geometries.length
}

object GeoJsonProtocol extends DefaultJsonProtocol {
implicit object RichGeometryJsonFormat extends RootJsonFormat[RichGeometry] {
-def write(g: RichGeometry) = {
+def write(g: RichGeometry): JsValue = {
GeometryEngine.geometryToGeoJson(g.spatialReference, g.geometry).parseJson
}
-def read(value: JsValue) = {
+def read(value: JsValue): RichGeometry = {
val mg = GeometryEngine.geometryFromGeoJson(value.compactPrint, 0, Geometry.Type.Unknown)
new RichGeometry(mg.getGeometry, mg.getSpatialReference)
}
}

implicit object FeatureJsonFormat extends RootJsonFormat[Feature] {
-def write(f: Feature) = {
+def write(f: Feature): JsObject = {
val buf = scala.collection.mutable.ArrayBuffer(
"type" -> JsString("Feature"),
"properties" -> JsObject(f.properties),
@@ -49,7 +49,7 @@ object GeoJsonProtocol extends DefaultJsonProtocol {
JsObject(buf.toMap)
}

-def read(value: JsValue) = {
+def read(value: JsValue): Feature = {
val jso = value.asJsObject
val id = jso.fields.get("id")
val properties = jso.fields("properties").asJsObject.fields
@@ -59,26 +59,26 @@
}

implicit object FeatureCollectionJsonFormat extends RootJsonFormat[FeatureCollection] {
-def write(fc: FeatureCollection) = {
+def write(fc: FeatureCollection): JsObject = {
JsObject(
"type" -> JsString("FeatureCollection"),
"features" -> JsArray(fc.features.map(_.toJson): _*)
)
}

-def read(value: JsValue) = {
+def read(value: JsValue): FeatureCollection = {
FeatureCollection(value.asJsObject.fields("features").convertTo[Array[Feature]])
}
}

implicit object GeometryCollectionJsonFormat extends RootJsonFormat[GeometryCollection] {
-def write(gc: GeometryCollection) = {
+def write(gc: GeometryCollection): JsObject = {
JsObject(
"type" -> JsString("GeometryCollection"),
"geometries" -> JsArray(gc.geometries.map(_.toJson): _*))
}

-def read(value: JsValue) = {
+def read(value: JsValue): GeometryCollection = {
GeometryCollection(value.asJsObject.fields("geometries").convertTo[Array[RichGeometry]])
}
}

@@ -10,8 +10,7 @@ import java.text.SimpleDateFormat
import java.util.Locale
import java.util.concurrent.TimeUnit

-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.SparkSession._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._

import com.esri.core.geometry.Point
@@ -20,7 +19,8 @@ import spray.json._
import com.cloudera.datascience.geotime.GeoJsonProtocol._

class RichRow(row: Row) {
-def getAs[T](field: String) = if (row.isNullAt(row.fieldIndex(field))) None else Some(row.getAs[T](field))
+def getAs[T](field: String): Option[T] =
+if (row.isNullAt(row.fieldIndex(field))) None else Some(row.getAs[T](field))
}

case class Trip(
@@ -50,7 +50,7 @@ object RunGeoTime extends Serializable {
}
val hoursUDF = udf(hours)

-taxiGood.groupBy(hoursUDF('pickupTime, 'dropoffTime)).count().show()
+taxiGood.groupBy(hoursUDF($"pickupTime", $"dropoffTime")).count().show()

// register the UDF, use it in a where clause
spark.udf.register("hours", hours)
@@ -76,9 +76,9 @@
}
val boroughUDF = udf(borough)

-taxiClean.groupBy(boroughUDF('dropoffX, 'dropoffY)).count().show()
+taxiClean.groupBy(boroughUDF($"dropoffX", $"dropoffY")).count().show()
val taxiDone = taxiClean.where("dropoffX != 0 and dropoffY != 0 and pickupX != 0 and pickupY != 0")
-taxiDone.groupBy(boroughUDF('dropoffX, 'dropoffY)).count().show()
+taxiDone.groupBy(boroughUDF($"dropoffX", $"dropoffY")).count().show()

taxiGood.unpersist()

@@ -119,7 +119,7 @@ object RunGeoTime extends Serializable {

def parseTaxiTime(datetime: Option[String]): Long = {
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)
-datetime.map(dt => formatter.parse(dt).getTime()).getOrElse(0L)
+datetime.map(dt => formatter.parse(dt).getTime).getOrElse(0L)
}

def parse(line: Row): Trip = {

@@ -6,26 +6,24 @@

package com.cloudera.datascience.risk

-import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter
-import org.apache.commons.math3.util.FastMath

object KernelDensity {
/**
* Simple heuristic to choose an appropriate bandwidth.
*/
def chooseBandwidth(samples: Seq[Double]): Double = {
val stddev = new StatCounter(samples).stdev
-1.06 * stddev * math.pow(samples.size, -.2)
+1.06 * stddev * math.pow(samples.size, -0.2)
}

/**
* Simple heuristic to choose an appropriate bandwidth.
*/
def chooseBandwidth(samples: RDD[Double]): Double = {
val stats = samples.stats()
-1.06 * stats.stdev * math.pow(stats.count, -.2)
+1.06 * stats.stdev * math.pow(stats.count, -0.2)
}

/**
@@ -35,7 +33,7 @@ object KernelDensity {
def estimate(samples: Seq[Double], evaluationPoints: Array[Double]): Array[Double] = {
val stddev = chooseBandwidth(samples)
val logStandardDeviationPlusHalfLog2Pi =
-FastMath.log(stddev) + 0.5 * FastMath.log(2 * FastMath.PI)
+math.log(stddev) + 0.5 * math.log(2.0 * math.Pi)

val zero = (new Array[Double](evaluationPoints.length), 0)
val (points, count) = samples.aggregate(zero)(
@@ -57,7 +55,7 @@
def estimate(samples: RDD[Double], evaluationPoints: Array[Double]): Array[Double] = {
val stddev = chooseBandwidth(samples)
val logStandardDeviationPlusHalfLog2Pi =
-FastMath.log(stddev) + 0.5 * FastMath.log(2 * FastMath.PI)
+math.log(stddev) + 0.5 * math.log(2.0 * math.Pi)

val zero = (new Array[Double](evaluationPoints.length), 0)
val (points, count) = samples.aggregate(zero)(
@@ -106,6 +104,6 @@ object KernelDensity {
x: Double): Double = {
val x0 = x - mean
val x1 = x0 / standardDeviation
-FastMath.exp(-0.5 * x1 * x1 - logStandardDeviationPlusHalfLog2Pi)
+math.exp(-0.5 * x1 * x1 - logStandardDeviationPlusHalfLog2Pi)
}
}
(Diffs for the remaining changed files did not load and are not shown.)