Added examples for statistical summarization:
* Scala: StatisticalSummary.scala
** Tests: correlation, MultivariateOnlineSummarizer
* Python: statistical_summary.py
** Tests: correlation (since MultivariateOnlineSummarizer has no Python API)

Added sc.stop() to all examples.

CorrelationSuite.scala
* Added 1 test for RDDs with only 1 value

Python SparseVector (pyspark/mllib/linalg.py)
* Added toDense() function

python/run-tests script
* Added stat.py (doc test)
jkbradley committed Aug 7, 2014
1 parent 4201d27 commit ee918e9
Showing 22 changed files with 240 additions and 8 deletions.
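
For orientation, here is a condensed sketch (PySpark, Python 2 style to match the examples) of the pattern these changes support: load LIBSVM data, densify the sparse feature vectors with the new SparseVector.toDense(), correlate the label with each feature via Statistics.corr, and stop the SparkContext when done. The app name is a made-up placeholder; everything else mirrors the new statistical_summary.py below.

    from pyspark import SparkContext
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.stat import Statistics
    from pyspark.mllib.util import MLUtils

    sc = SparkContext(appName="StatSummarySketch")  # hypothetical app name

    # Load labeled points and densify the sparse feature vectors (toDense() is added in this commit).
    points = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_linear_regression_data.txt") \
        .map(lambda lp: LabeledPoint(lp.label, lp.features.toDense()))

    labels = points.map(lambda lp: lp.label)
    numFeatures = points.take(1)[0].features.size
    for i in range(numFeatures):
        featureRDD = points.map(lambda lp: lp.features[i])
        print "%d\t%g" % (i, Statistics.corr(labels, featureRDD, "pearson"))

    sc.stop()  # every example in this commit now stops its SparkContext explicitly
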
2 changes: 2 additions & 0 deletions examples/src/main/python/als.py
@@ -97,3 +97,5 @@ def update(i, vec, mat, ratings):
error = rmse(R, ms, us)
print "Iteration %d:" % i
print "\nRMSE: %5.4f\n" % error

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/cassandra_inputformat.py
@@ -77,3 +77,5 @@
output = cass_rdd.collect()
for (k, v) in output:
print (k, v)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/cassandra_outputformat.py
@@ -81,3 +81,5 @@
conf=conf,
keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/hbase_inputformat.py
@@ -71,3 +71,5 @@
output = hbase_rdd.collect()
for (k, v) in output:
print (k, v)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/hbase_outputformat.py
@@ -63,3 +63,5 @@
conf=conf,
keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/kmeans.py
@@ -77,3 +77,5 @@ def closestPoint(p, centers):
kPoints[x] = y

print "Final centers: " + str(kPoints)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/logistic_regression.py
@@ -80,3 +80,5 @@ def add(x, y):
w -= points.map(lambda m: gradient(m, w)).reduce(add)

print "Final w: " + str(w)

sc.stop()
5 changes: 5 additions & 0 deletions examples/src/main/python/mllib/decision_tree_runner.py
@@ -17,6 +17,8 @@

"""
Decision tree classification and regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""

import numpy, os, sys
@@ -117,6 +119,7 @@ def usage():
if len(sys.argv) == 2:
dataPath = sys.argv[1]
if not os.path.isfile(dataPath):
sc.stop()
usage()
points = MLUtils.loadLibSVMFile(sc, dataPath)

@@ -131,3 +134,5 @@ def usage():
print " Model depth: %d\n" % model.depth()
print " Training accuracy: %g\n" % getAccuracy(model, reindexedData)
print model

sc.stop()
1 change: 1 addition & 0 deletions examples/src/main/python/mllib/kmeans.py
@@ -42,3 +42,4 @@ def parseVector(line):
k = int(sys.argv[2])
model = KMeans.train(data, k)
print "Final centers: " + str(model.clusterCenters)
sc.stop()
1 change: 1 addition & 0 deletions examples/src/main/python/mllib/logistic_regression.py
@@ -50,3 +50,4 @@ def parsePoint(line):
model = LogisticRegressionWithSGD.train(points, iterations)
print "Final weights: " + str(model.weights)
print "Final intercept: " + str(model.intercept)
sc.stop()
60 changes: 60 additions & 0 deletions examples/src/main/python/mllib/statistical_summary.py
@@ -0,0 +1,60 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Statistical summarization using MLlib.
"""

import sys

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.mllib.util import MLUtils


if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
print >> sys.stderr, "Usage: statistical_summary (<file>)"
exit(-1)
sc = SparkContext(appName="PythonStatisticalSummary")
if len(sys.argv) == 2:
filepath = sys.argv[1]
else:
filepath = 'data/mllib/sample_linear_regression_data.txt'
corrType = 'pearson'

points = MLUtils.loadLibSVMFile(sc, filepath)\
.map(lambda lp: LabeledPoint(lp.label, lp.features.toDense()))

print ''
print 'Summary of data file: ' + filepath
print '%d data points' % points.count()

# Statistics (correlations)
print ''
print 'Correlation (%s) between label and each feature' % corrType
print 'Feature\tCorrelation'
numFeatures = points.take(1)[0].features.size
labelRDD = points.map(lambda lp: lp.label)
for i in range(numFeatures):
featureRDD = points.map(lambda lp: lp.features[i])
corr = Statistics.corr(labelRDD, featureRDD, corrType)
print '%d\t%g' % (i, corr)
print ''

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/pagerank.py
@@ -68,3 +68,5 @@ def parseNeighbors(urls):
# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
print "%s has rank: %s." % (link, rank)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/pi.py
@@ -37,3 +37,5 @@ def f(_):

count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
print "Pi is roughly %f" % (4.0 * count / n)

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/sort.py
@@ -34,3 +34,5 @@
output = sortedCount.collect()
for (num, unitcount) in output:
print num

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/transitive_closure.py
@@ -64,3 +64,5 @@ def generateGraph():
break

print "TC has %i edges" % tc.count()

sc.stop()
2 changes: 2 additions & 0 deletions examples/src/main/python/wordcount.py
@@ -33,3 +33,5 @@
output = counts.collect()
for (word, count) in output:
print "%s: %i" % (word, count)

sc.stop()
121 changes: 121 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import scopt.OptionParser

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, Statistics}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkConf, SparkContext}


/**
* An example app for summarizing multivariate data from a file. Run with
* {{{
* bin/run-example org.apache.spark.examples.mllib.StatisticalSummary
* }}}
* By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`.
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
*/
object StatisticalSummary extends App {

case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")

val defaultParams = Params()

val parser = new OptionParser[Params]("StatisticalSummary") {
head("StatisticalSummary: an example app for MultivariateOnlineSummarizer and Statistics (correlation)")
opt[String]("input")
.text(s"Input paths to labeled examples in LIBSVM format, default: ${defaultParams.input}")
.action((x, c) => c.copy(input = x))
note(
"""
|For example, the following command runs this app on a synthetic dataset:
|
| bin/spark-submit --class org.apache.spark.examples.mllib.StatisticalSummary \
| examples/target/scala-*/spark-examples-*.jar \
| data/mllib/sample_linear_regression_data.txt
""".stripMargin)
}

parser.parse(args, defaultParams).map { params =>
run(params)
} getOrElse {
sys.exit(1)
}

def runStatisticalSummary(examples: RDD[LabeledPoint], params: Params) {
// Summarize labels
val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
(summary, lp) => summary.add(Vectors.dense(lp.label)),
(sum1, sum2) => sum1.merge(sum2))

// Summarize features
val featureSummary = examples.aggregate(new MultivariateOnlineSummarizer())(
(summary, lp) => summary.add(lp.features),
(sum1, sum2) => sum1.merge(sum2))

println()
println(s"Summary statistics")
println(s"\tLabel\tFeatures")
println(s"mean\t${labelSummary.mean(0)}\t${featureSummary.mean.toArray.mkString("\t")}")
println(s"var\t${labelSummary.variance(0)}\t${featureSummary.variance.toArray.mkString("\t")}")
println(
s"nnz\t${labelSummary.numNonzeros(0)}\t${featureSummary.numNonzeros.toArray.mkString("\t")}")
println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}")
println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}")
println()
}

def runCorrelations(examples: RDD[LabeledPoint], params: Params) {
// Calculate label -- feature correlations
val labelRDD = examples.map(_.label)
val numFeatures = examples.take(1)(0).features.size
val corrType = "pearson"
println()
println(s"Correlation ($corrType) between label and each feature")
println(s"Feature\tCorrelation")
var feature = 0
while (feature < numFeatures) {
val featureRDD = examples.map(_.features(feature))
val corr = Statistics.corr(labelRDD, featureRDD)
println(s"$feature\t$corr")
feature += 1
}
println()
}

def run(params: Params) {
val conf = new SparkConf().setAppName(s"StatisticalSummary with $params")
val sc = new SparkContext(conf)

val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()

println(s"Summary of data file: ${params.input}")
println(s"${examples.count} data points")

runStatisticalSummary(examples, params)

runCorrelations(examples, params)

sc.stop()
}
}
mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -55,8 +55,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
*/
def add(sample: Vector): this.type = {
if (n == 0) {
require(sample.toBreeze.length > 0, s"Vector should have dimension larger than zero.")
n = sample.toBreeze.length
require(sample.size > 0, s"Vector should have dimension larger than zero.")
n = sample.size

currMean = BDV.zeros[Double](n)
currM2n = BDV.zeros[Double](n)
@@ -65,8 +65,8 @@
currMin = BDV.fill(n)(Double.MaxValue)
}

require(n == sample.toBreeze.length, s"Dimensions mismatch when adding new sample." +
s" Expecting $n but got ${sample.toBreeze.length}.")
require(n == sample.size, s"Dimensions mismatch when adding new sample." +
s" Expecting $n but got ${sample.size}.")

sample.toBreeze.activeIterator.foreach {
case (_, 0.0) => // Skip explicit zero elements.
mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala
@@ -39,6 +39,13 @@ class CorrelationSuite extends FunSuite with LocalSparkContext {
Vectors.dense(9.0, 0.0, 0.0, 1.0)
)

test("corr(x, y) pearson, 1 value in data") {
val x = sc.parallelize(Array(1.0))
val y = sc.parallelize(Array(4.0))
assert(Statistics.corr(x, y, "pearson").isNaN)
assert(Statistics.corr(x, y, "spearman").isNaN)
}

test("corr(x, y) default, pearson") {
val x = sc.parallelize(xData)
val y = sc.parallelize(yData)
@@ -58,7 +65,7 @@ class CorrelationSuite extends FunSuite with LocalSparkContext {

// RDD of zero variance
val z = sc.parallelize(zeros)
assert(Statistics.corr(x, z).isNaN())
assert(Statistics.corr(x, z).isNaN)
}

test("corr(x, y) spearman") {
@@ -78,7 +85,7 @@

// RDD of zero variance => zero variance in ranks
val z = sc.parallelize(zeros)
assert(Statistics.corr(x, z, "spearman").isNaN())
assert(Statistics.corr(x, z, "spearman").isNaN)
}

test("corr(X) default, pearson") {
mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
@@ -139,7 +139,8 @@ class MultivariateOnlineSummarizerSuite extends FunSuite {
assert(summarizer.numNonzeros ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch")

assert(summarizer.variance ~==
Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, "variance mismatch")
Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5,
"variance mismatch")

assert(summarizer.count === 6)
}
@@ -167,7 +168,8 @@ class MultivariateOnlineSummarizerSuite extends FunSuite {
assert(summarizer.numNonzeros ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch")

assert(summarizer.variance ~==
Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, "variance mismatch")
Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5,
"variance mismatch")

assert(summarizer.count === 6)
}
10 changes: 10 additions & 0 deletions python/pyspark/mllib/linalg.py
@@ -23,6 +23,7 @@
SciPy is available in their environment.
"""

import numpy
from numpy import array, array_equal, ndarray, float64, int32


@@ -160,6 +161,15 @@ def squared_distance(self, other):
j += 1
return result

def toDense(self):
"""
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
arr = numpy.zeros(self.size)
for i in range(self.indices.size):
arr[self.indices[i]] = self.values[i]
return arr

def __str__(self):
inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
vals = "[" + ",".join([str(v) for v in self.values]) + "]"
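
For reference, an illustrative sketch of how the new SparseVector.toDense() helper behaves; the vector values here are made up for the example.

    from pyspark.mllib.linalg import SparseVector

    sv = SparseVector(4, [1, 3], [1.0, 5.5])   # size 4, non-zeros at indices 1 and 3
    arr = sv.toDense()                         # 1-dimensional NumPy array copy
    print arr                                  # expected: [ 0.   1.   0.   5.5]
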
1 change: 1 addition & 0 deletions python/run-tests
@@ -70,6 +70,7 @@ run_test "pyspark/mllib/linalg.py"
run_test "pyspark/mllib/random.py"
run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/stat.py"
run_test "pyspark/mllib/tests.py"
run_test "pyspark/mllib/util.py"
