diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py index c862650b0aa1d..5b1fa4d997eeb 100755 --- a/examples/src/main/python/als.py +++ b/examples/src/main/python/als.py @@ -97,3 +97,5 @@ def update(i, vec, mat, ratings): error = rmse(R, ms, us) print "Iteration %d:" % i print "\nRMSE: %5.4f\n" % error + + sc.stop() diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py index 39fa6b0d22ef5..e4a897f61e39d 100644 --- a/examples/src/main/python/cassandra_inputformat.py +++ b/examples/src/main/python/cassandra_inputformat.py @@ -77,3 +77,5 @@ output = cass_rdd.collect() for (k, v) in output: print (k, v) + + sc.stop() diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py index 1dfbf98604425..836c35b5c6794 100644 --- a/examples/src/main/python/cassandra_outputformat.py +++ b/examples/src/main/python/cassandra_outputformat.py @@ -81,3 +81,5 @@ conf=conf, keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter", valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter") + + sc.stop() diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py index c9fa8e171c2a1..befacee0dea56 100644 --- a/examples/src/main/python/hbase_inputformat.py +++ b/examples/src/main/python/hbase_inputformat.py @@ -71,3 +71,5 @@ output = hbase_rdd.collect() for (k, v) in output: print (k, v) + + sc.stop() diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py index 5e11548fd13f7..49bbc5aebdb0b 100644 --- a/examples/src/main/python/hbase_outputformat.py +++ b/examples/src/main/python/hbase_outputformat.py @@ -63,3 +63,5 @@ conf=conf, keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter", valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter") + + sc.stop() diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py index 036bdf4c4f999..86ef6f32c84e8 100755 --- a/examples/src/main/python/kmeans.py +++ b/examples/src/main/python/kmeans.py @@ -77,3 +77,5 @@ def closestPoint(p, centers): kPoints[x] = y print "Final centers: " + str(kPoints) + + sc.stop() diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py index 8456b272f9c05..3aa56b0528168 100755 --- a/examples/src/main/python/logistic_regression.py +++ b/examples/src/main/python/logistic_regression.py @@ -80,3 +80,5 @@ def add(x, y): w -= points.map(lambda m: gradient(m, w)).reduce(add) print "Final w: " + str(w) + + sc.stop() diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py new file mode 100755 index 0000000000000..6b16a56e44af7 --- /dev/null +++ b/examples/src/main/python/mllib/correlations.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Correlations using MLlib. +""" + +import sys + +from pyspark import SparkContext +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.stat import Statistics +from pyspark.mllib.util import MLUtils + + +if __name__ == "__main__": + if len(sys.argv) not in [1,2]: + print >> sys.stderr, "Usage: correlations ()" + exit(-1) + sc = SparkContext(appName="PythonCorrelations") + if len(sys.argv) == 2: + filepath = sys.argv[1] + else: + filepath = 'data/mllib/sample_linear_regression_data.txt' + corrType = 'pearson' + + points = MLUtils.loadLibSVMFile(sc, filepath)\ + .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray())) + + print + print 'Summary of data file: ' + filepath + print '%d data points' % points.count() + + # Statistics (correlations) + print + print 'Correlation (%s) between label and each feature' % corrType + print 'Feature\tCorrelation' + numFeatures = points.take(1)[0].features.size + labelRDD = points.map(lambda lp: lp.label) + for i in range(numFeatures): + featureRDD = points.map(lambda lp: lp.features[i]) + corr = Statistics.corr(labelRDD, featureRDD, corrType) + print '%d\t%g' % (i, corr) + print + + sc.stop() diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py index db96a7cb3730f..6e4a4a0cb6be0 100755 --- a/examples/src/main/python/mllib/decision_tree_runner.py +++ b/examples/src/main/python/mllib/decision_tree_runner.py @@ -17,6 +17,8 @@ """ Decision tree classification and regression using MLlib. + +This example requires NumPy (http://www.numpy.org/). """ import numpy, os, sys @@ -117,6 +119,7 @@ def usage(): if len(sys.argv) == 2: dataPath = sys.argv[1] if not os.path.isfile(dataPath): + sc.stop() usage() points = MLUtils.loadLibSVMFile(sc, dataPath) @@ -133,3 +136,5 @@ def usage(): print " Model depth: %d\n" % model.depth() print " Training accuracy: %g\n" % getAccuracy(model, reindexedData) print model + + sc.stop() diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py index b308132c9aeeb..2eeb1abeeb12b 100755 --- a/examples/src/main/python/mllib/kmeans.py +++ b/examples/src/main/python/mllib/kmeans.py @@ -42,3 +42,4 @@ def parseVector(line): k = int(sys.argv[2]) model = KMeans.train(data, k) print "Final centers: " + str(model.clusterCenters) + sc.stop() diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py index 9d547ff77c984..8cae27fc4a52d 100755 --- a/examples/src/main/python/mllib/logistic_regression.py +++ b/examples/src/main/python/mllib/logistic_regression.py @@ -50,3 +50,4 @@ def parsePoint(line): model = LogisticRegressionWithSGD.train(points, iterations) print "Final weights: " + str(model.weights) print "Final intercept: " + str(model.intercept) + sc.stop() diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py new file mode 100755 index 0000000000000..b388d8d83fb86 --- /dev/null +++ b/examples/src/main/python/mllib/random_rdd_generation.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Randomly generated RDDs. +""" + +import sys + +from pyspark import SparkContext +from pyspark.mllib.random import RandomRDDs + + +if __name__ == "__main__": + if len(sys.argv) not in [1, 2]: + print >> sys.stderr, "Usage: random_rdd_generation" + exit(-1) + + sc = SparkContext(appName="PythonRandomRDDGeneration") + + numExamples = 10000 # number of examples to generate + fraction = 0.1 # fraction of data to sample + + # Example: RandomRDDs.normalRDD + normalRDD = RandomRDDs.normalRDD(sc, numExamples) + print 'Generated RDD of %d examples sampled from the standard normal distribution'\ + % normalRDD.count() + print ' First 5 samples:' + for sample in normalRDD.take(5): + print ' ' + str(sample) + print + + # Example: RandomRDDs.normalVectorRDD + normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) + print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count() + print ' First 5 samples:' + for sample in normalVectorRDD.take(5): + print ' ' + str(sample) + print + + sc.stop() diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py new file mode 100755 index 0000000000000..ec64a5978c672 --- /dev/null +++ b/examples/src/main/python/mllib/sampled_rdds.py @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Randomly sampled RDDs. +""" + +import sys + +from pyspark import SparkContext +from pyspark.mllib.util import MLUtils + + +if __name__ == "__main__": + if len(sys.argv) not in [1, 2]: + print >> sys.stderr, "Usage: sampled_rdds " + exit(-1) + if len(sys.argv) == 2: + datapath = sys.argv[1] + else: + datapath = 'data/mllib/sample_binary_classification_data.txt' + + sc = SparkContext(appName="PythonSampledRDDs") + + fraction = 0.1 # fraction of data to sample + + examples = MLUtils.loadLibSVMFile(sc, datapath) + numExamples = examples.count() + if numExamples == 0: + print >> sys.stderr, "Error: Data file had no samples to load." + exit(1) + print 'Loaded data with %d examples from file: %s' % (numExamples, datapath) + + # Example: RDD.sample() and RDD.takeSample() + expectedSampleSize = int(numExamples * fraction) + print 'Sampling RDD using fraction %g. Expected sample size = %d.' \ + % (fraction, expectedSampleSize) + sampledRDD = examples.sample(withReplacement = True, fraction = fraction) + print ' RDD.sample(): sample has %d examples' % sampledRDD.count() + sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize) + print ' RDD.takeSample(): sample has %d examples' % len(sampledArray) + + print + + # Example: RDD.sampleByKey() + keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features)) + print ' Keyed data using label (Int) as key ==> Orig' + # Count examples per label in original data. + keyCountsA = keyedRDD.countByKey() + + # Subsample, and count examples per label in sampled data. + fractions = {} + for k in keyCountsA.keys(): + fractions[k] = fraction + sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = True, fractions = fractions) + keyCountsB = sampledByKeyRDD.countByKey() + sizeB = sum(keyCountsB.values()) + print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \ + % sizeB + + # Compare samples + print ' \tFractions of examples with key' + print 'Key\tOrig\tSample' + for k in sorted(keyCountsA.keys()): + fracA = keyCountsA[k] / float(numExamples) + if sizeB != 0: + fracB = keyCountsB.get(k, 0) / float(sizeB) + else: + fracB = 0 + print '%d\t%g\t%g' % (k, fracA, fracB) + + sc.stop() diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py index 0b96343158d44..b539c4128cdcc 100755 --- a/examples/src/main/python/pagerank.py +++ b/examples/src/main/python/pagerank.py @@ -68,3 +68,5 @@ def parseNeighbors(urls): # Collects all URL ranks and dump them to console. for (link, rank) in ranks.collect(): print "%s has rank: %s." % (link, rank) + + sc.stop() diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index 21d94a2cd4b64..fc37459dc74aa 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -37,3 +37,5 @@ def f(_): count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add) print "Pi is roughly %f" % (4.0 * count / n) + + sc.stop() diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py index 41d00c1b79133..bb686f17518a0 100755 --- a/examples/src/main/python/sort.py +++ b/examples/src/main/python/sort.py @@ -34,3 +34,5 @@ output = sortedCount.collect() for (num, unitcount) in output: print num + + sc.stop() diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py index 8698369b13d84..bf331b542c438 100755 --- a/examples/src/main/python/transitive_closure.py +++ b/examples/src/main/python/transitive_closure.py @@ -64,3 +64,5 @@ def generateGraph(): break print "TC has %i edges" % tc.count() + + sc.stop() diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py index dcc095fdd0ed9..ae6cd13b83d92 100755 --- a/examples/src/main/python/wordcount.py +++ b/examples/src/main/python/wordcount.py @@ -33,3 +33,5 @@ output = counts.collect() for (word, count) in output: print "%s: %i" % (word, count) + + sc.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala new file mode 100644 index 0000000000000..d6b2fe430e5a4 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import scopt.OptionParser + +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.{SparkConf, SparkContext} + + +/** + * An example app for summarizing multivariate data from a file. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.Correlations + * }}} + * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object Correlations { + + case class Params(input: String = "data/mllib/sample_linear_regression_data.txt") + + def main(args: Array[String]) { + + val defaultParams = Params() + + val parser = new OptionParser[Params]("Correlations") { + head("Correlations: an example app for computing correlations") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \ + | examples/target/scala-*/spark-examples-*.jar \ + | --input data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"Correlations with $params") + val sc = new SparkContext(conf) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + println(s"Summary of data file: ${params.input}") + println(s"${examples.count()} data points") + + // Calculate label -- feature correlations + val labelRDD = examples.map(_.label) + val numFeatures = examples.take(1)(0).features.size + val corrType = "pearson" + println() + println(s"Correlation ($corrType) between label and each feature") + println(s"Feature\tCorrelation") + var feature = 0 + while (feature < numFeatures) { + val featureRDD = examples.map(_.features(feature)) + val corr = Statistics.corr(labelRDD, featureRDD) + println(s"$feature\t$corr") + feature += 1 + } + println() + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala new file mode 100644 index 0000000000000..4532512c01f84 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import scopt.OptionParser + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.{SparkConf, SparkContext} + + +/** + * An example app for summarizing multivariate data from a file. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer + * }}} + * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object MultivariateSummarizer { + + case class Params(input: String = "data/mllib/sample_linear_regression_data.txt") + + def main(args: Array[String]) { + + val defaultParams = Params() + + val parser = new OptionParser[Params]("MultivariateSummarizer") { + head("MultivariateSummarizer: an example app for MultivariateOnlineSummarizer") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \ + | examples/target/scala-*/spark-examples-*.jar \ + | --input data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"MultivariateSummarizer with $params") + val sc = new SparkContext(conf) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + println(s"Summary of data file: ${params.input}") + println(s"${examples.count()} data points") + + // Summarize labels + val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())( + (summary, lp) => summary.add(Vectors.dense(lp.label)), + (sum1, sum2) => sum1.merge(sum2)) + + // Summarize features + val featureSummary = examples.aggregate(new MultivariateOnlineSummarizer())( + (summary, lp) => summary.add(lp.features), + (sum1, sum2) => sum1.merge(sum2)) + + println() + println(s"Summary statistics") + println(s"\tLabel\tFeatures") + println(s"mean\t${labelSummary.mean(0)}\t${featureSummary.mean.toArray.mkString("\t")}") + println(s"var\t${labelSummary.variance(0)}\t${featureSummary.variance.toArray.mkString("\t")}") + println( + s"nnz\t${labelSummary.numNonzeros(0)}\t${featureSummary.numNonzeros.toArray.mkString("\t")}") + println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}") + println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}") + println() + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala new file mode 100644 index 0000000000000..924b586e3af99 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.mllib.random.RandomRDDs +import org.apache.spark.rdd.RDD + +import org.apache.spark.{SparkConf, SparkContext} + +/** + * An example app for randomly generated RDDs. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.RandomRDDGeneration + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object RandomRDDGeneration { + + def main(args: Array[String]) { + + val conf = new SparkConf().setAppName(s"RandomRDDGeneration") + val sc = new SparkContext(conf) + + val numExamples = 10000 // number of examples to generate + val fraction = 0.1 // fraction of data to sample + + // Example: RandomRDDs.normalRDD + val normalRDD: RDD[Double] = RandomRDDs.normalRDD(sc, numExamples) + println(s"Generated RDD of ${normalRDD.count()}" + + " examples sampled from the standard normal distribution") + println(" First 5 samples:") + normalRDD.take(5).foreach( x => println(s" $x") ) + + // Example: RandomRDDs.normalVectorRDD + val normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) + println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.") + println(" First 5 samples:") + normalVectorRDD.take(5).foreach( x => println(s" $x") ) + + println() + + sc.stop() + } + +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala new file mode 100644 index 0000000000000..f01b8266e3fe3 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.mllib.util.MLUtils +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ + +/** + * An example app for randomly generated and sampled RDDs. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.SampledRDDs + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object SampledRDDs { + + case class Params(input: String = "data/mllib/sample_binary_classification_data.txt") + + def main(args: Array[String]) { + val defaultParams = Params() + + val parser = new OptionParser[Params]("SampledRDDs") { + head("SampledRDDs: an example app for randomly generated and sampled RDDs.") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.SampledRDDs \ + | examples/target/scala-*/spark-examples-*.jar + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"SampledRDDs with $params") + val sc = new SparkContext(conf) + + val fraction = 0.1 // fraction of data to sample + + val examples = MLUtils.loadLibSVMFile(sc, params.input) + val numExamples = examples.count() + if (numExamples == 0) { + throw new RuntimeException("Error: Data file had no samples to load.") + } + println(s"Loaded data with $numExamples examples from file: ${params.input}") + + // Example: RDD.sample() and RDD.takeSample() + val expectedSampleSize = (numExamples * fraction).toInt + println(s"Sampling RDD using fraction $fraction. Expected sample size = $expectedSampleSize.") + val sampledRDD = examples.sample(withReplacement = true, fraction = fraction) + println(s" RDD.sample(): sample has ${sampledRDD.count()} examples") + val sampledArray = examples.takeSample(withReplacement = true, num = expectedSampleSize) + println(s" RDD.takeSample(): sample has ${sampledArray.size} examples") + + println() + + // Example: RDD.sampleByKey() and RDD.sampleByKeyExact() + val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) } + println(s" Keyed data using label (Int) as key ==> Orig") + // Count examples per label in original data. + val keyCounts = keyedRDD.countByKey() + + // Subsample, and count examples per label in sampled data. (approximate) + val fractions = keyCounts.keys.map((_, fraction)).toMap + val sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = true, fractions = fractions) + val keyCountsB = sampledByKeyRDD.countByKey() + val sizeB = keyCountsB.values.sum + println(s" Sampled $sizeB examples using approximate stratified sampling (by label)." + + " ==> Approx Sample") + + // Subsample, and count examples per label in sampled data. (approximate) + val sampledByKeyRDDExact = + keyedRDD.sampleByKeyExact(withReplacement = true, fractions = fractions) + val keyCountsBExact = sampledByKeyRDDExact.countByKey() + val sizeBExact = keyCountsBExact.values.sum + println(s" Sampled $sizeBExact examples using exact stratified sampling (by label)." + + " ==> Exact Sample") + + // Compare samples + println(s" \tFractions of examples with key") + println(s"Key\tOrig\tApprox Sample\tExact Sample") + keyCounts.keys.toSeq.sorted.foreach { key => + val origFrac = keyCounts(key) / numExamples.toDouble + val approxFrac = if (sizeB != 0) { + keyCountsB.getOrElse(key, 0L) / sizeB.toDouble + } else { + 0 + } + val exactFrac = if (sizeBExact != 0) { + keyCountsBExact.getOrElse(key, 0L) / sizeBExact.toDouble + } else { + 0 + } + println(s"$key\t$origFrac\t$approxFrac\t$exactFrac") + } + + sc.stop() + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index e76bc9fefff01..2e414a73be8e0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -53,8 +53,14 @@ class RowMatrix( /** Gets or computes the number of columns. */ override def numCols(): Long = { if (nCols <= 0) { - // Calling `first` will throw an exception if `rows` is empty. - nCols = rows.first().size + try { + // Calling `first` will throw an exception if `rows` is empty. + nCols = rows.first().size + } catch { + case err: UnsupportedOperationException => + sys.error("Cannot determine the number of cols because it is not specified in the " + + "constructor and the rows RDD is empty.") + } } nCols } @@ -293,6 +299,10 @@ class RowMatrix( (s1._1 + s2._1, s1._2 += s2._2) ) + if (m <= 1) { + sys.error(s"RowMatrix.computeCovariance called on matrix with only $m rows." + + " Cannot compute the covariance of a RowMatrix with <= 1 row.") + } updateNumRows(m) mean :/= m.toDouble diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala index 5105b5c37aaaa..7d845c44365dd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala @@ -55,8 +55,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S */ def add(sample: Vector): this.type = { if (n == 0) { - require(sample.toBreeze.length > 0, s"Vector should have dimension larger than zero.") - n = sample.toBreeze.length + require(sample.size > 0, s"Vector should have dimension larger than zero.") + n = sample.size currMean = BDV.zeros[Double](n) currM2n = BDV.zeros[Double](n) @@ -65,8 +65,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S currMin = BDV.fill(n)(Double.MaxValue) } - require(n == sample.toBreeze.length, s"Dimensions mismatch when adding new sample." + - s" Expecting $n but got ${sample.toBreeze.length}.") + require(n == sample.size, s"Dimensions mismatch when adding new sample." + + s" Expecting $n but got ${sample.size}.") sample.toBreeze.activeIterator.foreach { case (_, 0.0) => // Skip explicit zero elements. diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala index a3f76f77a5dcc..34548c86ebc14 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala @@ -39,6 +39,17 @@ class CorrelationSuite extends FunSuite with LocalSparkContext { Vectors.dense(9.0, 0.0, 0.0, 1.0) ) + test("corr(x, y) pearson, 1 value in data") { + val x = sc.parallelize(Array(1.0)) + val y = sc.parallelize(Array(4.0)) + intercept[RuntimeException] { + Statistics.corr(x, y, "pearson") + } + intercept[RuntimeException] { + Statistics.corr(x, y, "spearman") + } + } + test("corr(x, y) default, pearson") { val x = sc.parallelize(xData) val y = sc.parallelize(yData) @@ -58,7 +69,7 @@ class CorrelationSuite extends FunSuite with LocalSparkContext { // RDD of zero variance val z = sc.parallelize(zeros) - assert(Statistics.corr(x, z).isNaN()) + assert(Statistics.corr(x, z).isNaN) } test("corr(x, y) spearman") { @@ -78,7 +89,7 @@ class CorrelationSuite extends FunSuite with LocalSparkContext { // RDD of zero variance => zero variance in ranks val z = sc.parallelize(zeros) - assert(Statistics.corr(x, z, "spearman").isNaN()) + assert(Statistics.corr(x, z, "spearman").isNaN) } test("corr(X) default, pearson") { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala index db13f142df517..1e9415249104b 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala @@ -139,7 +139,8 @@ class MultivariateOnlineSummarizerSuite extends FunSuite { assert(summarizer.numNonzeros ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch") assert(summarizer.variance ~== - Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, "variance mismatch") + Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, + "variance mismatch") assert(summarizer.count === 6) } @@ -167,7 +168,8 @@ class MultivariateOnlineSummarizerSuite extends FunSuite { assert(summarizer.numNonzeros ~== Vectors.dense(3, 5, 2) absTol 1E-5, "numNonzeros mismatch") assert(summarizer.variance ~== - Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, "variance mismatch") + Vectors.dense(3.857666666666, 7.0456666666666, 2.48166666666666) absTol 1E-5, + "variance mismatch") assert(summarizer.count === 6) } diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 9a239abfbbeb1..f485a69db1fa2 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -23,6 +23,7 @@ SciPy is available in their environment. """ +import numpy from numpy import array, array_equal, ndarray, float64, int32 @@ -160,6 +161,15 @@ def squared_distance(self, other): j += 1 return result + def toArray(self): + """ + Returns a copy of this SparseVector as a 1-dimensional NumPy array. + """ + arr = numpy.zeros(self.size) + for i in xrange(self.indices.size): + arr[self.indices[i]] = self.values[i] + return arr + def __str__(self): inds = "[" + ",".join([str(i) for i in self.indices]) + "]" vals = "[" + ",".join([str(v) for v in self.values]) + "]" diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py index a73abc5ff90df..feef0d16cd644 100644 --- a/python/pyspark/mllib/stat.py +++ b/python/pyspark/mllib/stat.py @@ -118,16 +118,18 @@ def corr(x, y=None, method=None): >>> from linalg import Vectors >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]), ... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])]) - >>> Statistics.corr(rdd) - array([[ 1. , 0.05564149, nan, 0.40047142], - [ 0.05564149, 1. , nan, 0.91359586], - [ nan, nan, 1. , nan], - [ 0.40047142, 0.91359586, nan, 1. ]]) - >>> Statistics.corr(rdd, method="spearman") - array([[ 1. , 0.10540926, nan, 0.4 ], - [ 0.10540926, 1. , nan, 0.9486833 ], - [ nan, nan, 1. , nan], - [ 0.4 , 0.9486833 , nan, 1. ]]) + >>> pearsonCorr = Statistics.corr(rdd) + >>> print str(pearsonCorr).replace('nan', 'NaN') + [[ 1. 0.05564149 NaN 0.40047142] + [ 0.05564149 1. NaN 0.91359586] + [ NaN NaN 1. NaN] + [ 0.40047142 0.91359586 NaN 1. ]] + >>> spearmanCorr = Statistics.corr(rdd, method="spearman") + >>> print str(spearmanCorr).replace('nan', 'NaN') + [[ 1. 0.10540926 NaN 0.4 ] + [ 0.10540926 1. NaN 0.9486833 ] + [ NaN NaN 1. NaN] + [ 0.4 0.9486833 NaN 1. ]] >>> try: ... Statistics.corr(rdd, "spearman") ... print "Method name as second argument without 'method=' shouldn't be allowed." diff --git a/python/run-tests b/python/run-tests index a6271e0cf5fa9..b506559a5e810 100755 --- a/python/run-tests +++ b/python/run-tests @@ -78,6 +78,7 @@ run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/random.py" run_test "pyspark/mllib/recommendation.py" run_test "pyspark/mllib/regression.py" +run_test "pyspark/mllib/stat.py" run_test "pyspark/mllib/tests.py" run_test "pyspark/mllib/tree.py" run_test "pyspark/mllib/util.py"