diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py index d083705e52138..6b16a56e44af7 100755 --- a/examples/src/main/python/mllib/correlations.py +++ b/examples/src/main/python/mllib/correlations.py @@ -39,14 +39,14 @@ corrType = 'pearson' points = MLUtils.loadLibSVMFile(sc, filepath)\ - .map(lambda lp: LabeledPoint(lp.label, lp.features.toDense())) + .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray())) - print '' + print print 'Summary of data file: ' + filepath print '%d data points' % points.count() # Statistics (correlations) - print '' + print print 'Correlation (%s) between label and each feature' % corrType print 'Feature\tCorrelation' numFeatures = points.take(1)[0].features.size @@ -55,6 +55,6 @@ featureRDD = points.map(lambda lp: lp.features[i]) corr = Statistics.corr(labelRDD, featureRDD, corrType) print '%d\t%g' % (i, corr) - print '' + print sc.stop() diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py new file mode 100755 index 0000000000000..b388d8d83fb86 --- /dev/null +++ b/examples/src/main/python/mllib/random_rdd_generation.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Randomly generated RDDs. +""" + +import sys + +from pyspark import SparkContext +from pyspark.mllib.random import RandomRDDs + + +if __name__ == "__main__": + if len(sys.argv) not in [1, 2]: + print >> sys.stderr, "Usage: random_rdd_generation" + exit(-1) + + sc = SparkContext(appName="PythonRandomRDDGeneration") + + numExamples = 10000 # number of examples to generate + fraction = 0.1 # fraction of data to sample + + # Example: RandomRDDs.normalRDD + normalRDD = RandomRDDs.normalRDD(sc, numExamples) + print 'Generated RDD of %d examples sampled from the standard normal distribution'\ + % normalRDD.count() + print ' First 5 samples:' + for sample in normalRDD.take(5): + print ' ' + str(sample) + print + + # Example: RandomRDDs.normalVectorRDD + normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) + print 'Generated RDD of %d examples of length-2 vectors.' 
% normalVectorRDD.count() + print ' First 5 samples:' + for sample in normalVectorRDD.take(5): + print ' ' + str(sample) + print + + sc.stop() diff --git a/examples/src/main/python/mllib/random_and_sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py similarity index 68% rename from examples/src/main/python/mllib/random_and_sampled_rdds.py rename to examples/src/main/python/mllib/sampled_rdds.py index bca8e115085c3..652043a71584f 100755 --- a/examples/src/main/python/mllib/random_and_sampled_rdds.py +++ b/examples/src/main/python/mllib/sampled_rdds.py @@ -16,61 +16,49 @@ # """ -Randomly generated and sampled RDDs. +Randomly sampled RDDs. """ import sys from pyspark import SparkContext -from pyspark.mllib.random import RandomRDDGenerators from pyspark.mllib.util import MLUtils - if __name__ == "__main__": if len(sys.argv) not in [1, 2]: - print >> sys.stderr, "Usage: random_and_sampled_rdds " + print >> sys.stderr, "Usage: sampled_rdds " exit(-1) if len(sys.argv) == 2: datapath = sys.argv[1] else: datapath = 'data/mllib/sample_binary_classification_data.txt' - sc = SparkContext(appName="PythonRandomAndSampledRDDs") - - points = MLUtils.loadLibSVMFile(sc, datapath) + sc = SparkContext(appName="PythonSampledRDDs") - numExamples = 10000 # number of examples to generate fraction = 0.1 # fraction of data to sample - # Example: RandomRDDGenerators - normalRDD = RandomRDDGenerators.normalRDD(sc, numExamples) - print 'Generated RDD of %d examples sampled from the standard normal distribution'\ - % normalRDD.count() - normalVectorRDD = RandomRDDGenerators.normalVectorRDD(sc, numRows = numExamples, numCols = 2) - print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count() - - print + examples = MLUtils.loadLibSVMFile(sc, datapath) + numExamples = examples.count() + print 'Loaded data with %d examples from file: %s' % (numExamples, datapath) # Example: RDD.sample() and RDD.takeSample() expectedSampleSize = int(numExamples * fraction) print 'Sampling RDD using fraction %g. Expected sample size = %d.' \ % (fraction, expectedSampleSize) - sampledRDD = normalRDD.sample(withReplacement = True, fraction = fraction) + sampledRDD = examples.sample(withReplacement = True, fraction = fraction) print ' RDD.sample(): sample has %d examples' % sampledRDD.count() - sampledArray = normalRDD.takeSample(withReplacement = True, num = expectedSampleSize) + sampledArray = examples.takeSample(withReplacement = True, num = expectedSampleSize) print ' RDD.takeSample(): sample has %d examples' % len(sampledArray) print # Example: RDD.sampleByKey() - examples = MLUtils.loadLibSVMFile(sc, datapath) - sizeA = examples.count() - print 'Loaded data with %d examples from file: %s' % (sizeA, datapath) keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features)) print ' Keyed data using label (Int) as key ==> Orig' # Count examples per label in original data. keyCountsA = keyedRDD.countByKey() + # Subsample, and count examples per label in sampled data. fractions = {} for k in keyCountsA.keys(): @@ -80,9 +68,11 @@ sizeB = sum(keyCountsB.values()) print ' Sampled %d examples using approximate stratified sampling (by label). 
==> Sample' \ % sizeB + + # Compare samples print ' \tFractions of examples with key' print 'Key\tOrig\tSample' for k in sorted(keyCountsA.keys()): - print '%d\t%g\t%g' % (k, keyCountsA[k] / float(sizeA), keyCountsB[k] / float(sizeB)) + print '%d\t%g\t%g' % (k, keyCountsA[k] / float(numExamples), keyCountsB[k] / float(sizeB)) sc.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala new file mode 100644 index 0000000000000..d6b2fe430e5a4 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/Correlations.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import scopt.OptionParser + +import org.apache.spark.mllib.stat.Statistics +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.{SparkConf, SparkContext} + + +/** + * An example app for summarizing multivariate data from a file. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.Correlations + * }}} + * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`. + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
+ */ +object Correlations { + + case class Params(input: String = "data/mllib/sample_linear_regression_data.txt") + + def main(args: Array[String]) { + + val defaultParams = Params() + + val parser = new OptionParser[Params]("Correlations") { + head("Correlations: an example app for computing correlations") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class org.apache.spark.examples.mllib.Correlations \ + | examples/target/scala-*/spark-examples-*.jar \ + | --input data/mllib/sample_linear_regression_data.txt + """.stripMargin) + } + + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"Correlations with $params") + val sc = new SparkContext(conf) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + println(s"Summary of data file: ${params.input}") + println(s"${examples.count()} data points") + + // Calculate label -- feature correlations + val labelRDD = examples.map(_.label) + val numFeatures = examples.take(1)(0).features.size + val corrType = "pearson" + println() + println(s"Correlation ($corrType) between label and each feature") + println(s"Feature\tCorrelation") + var feature = 0 + while (feature < numFeatures) { + val featureRDD = examples.map(_.features(feature)) + val corr = Statistics.corr(labelRDD, featureRDD) + println(s"$feature\t$corr") + feature += 1 + } + println() + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala similarity index 62% rename from examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala rename to examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala index 8917314301ca2..4532512c01f84 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MultivariateSummarizer.scala @@ -17,12 +17,10 @@ package org.apache.spark.examples.mllib -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.rdd.RDD import scopt.OptionParser import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, Statistics} +import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.mllib.util.MLUtils import org.apache.spark.{SparkConf, SparkContext} @@ -30,40 +28,50 @@ import org.apache.spark.{SparkConf, SparkContext} /** * An example app for summarizing multivariate data from a file. Run with * {{{ - * bin/run-example org.apache.spark.examples.mllib.Statistics + * bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer * }}} * By default, this loads a synthetic dataset from `data/mllib/sample_linear_regression_data.txt`. * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
*/ -object StatisticalSummary extends App { +object MultivariateSummarizer { case class Params(input: String = "data/mllib/sample_linear_regression_data.txt") - val defaultParams = Params() + def main(args: Array[String]) { - val parser = new OptionParser[Params]("StatisticalSummary") { - head("StatisticalSummary: an example app for MultivariateOnlineSummarizer and Statistics" + - " (correlation)") - opt[String]("input") - .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") - .action((x, c) => c.copy(input = x)) - note( - """ + val defaultParams = Params() + + val parser = new OptionParser[Params]("MultivariateSummarizer") { + head("MultivariateSummarizer: an example app for MultivariateOnlineSummarizer") + opt[String]("input") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .action((x, c) => c.copy(input = x)) + note( + """ |For example, the following command runs this app on a synthetic dataset: | - | bin/spark-submit --class org.apache.spark.examples.mllib.StatisticalSummary \ + | bin/spark-submit --class org.apache.spark.examples.mllib.MultivariateSummarizer \ | examples/target/scala-*/spark-examples-*.jar \ | --input data/mllib/sample_linear_regression_data.txt - """.stripMargin) - } + """.stripMargin) + } - parser.parse(args, defaultParams).map { params => - run(params) - } getOrElse { - sys.exit(1) + parser.parse(args, defaultParams).map { params => + run(params) + } getOrElse { + sys.exit(1) + } } - def runStatisticalSummary(examples: RDD[LabeledPoint], params: Params) { + def run(params: Params) { + val conf = new SparkConf().setAppName(s"MultivariateSummarizer with $params") + val sc = new SparkContext(conf) + + val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() + + println(s"Summary of data file: ${params.input}") + println(s"${examples.count()} data points") + // Summarize labels val labelSummary = examples.aggregate(new MultivariateOnlineSummarizer())( (summary, lp) => summary.add(Vectors.dense(lp.label)), @@ -84,38 +92,6 @@ object StatisticalSummary extends App { println(s"max\t${labelSummary.max(0)}\t${featureSummary.max.toArray.mkString("\t")}") println(s"min\t${labelSummary.min(0)}\t${featureSummary.min.toArray.mkString("\t")}") println() - } - - def runCorrelations(examples: RDD[LabeledPoint], params: Params) { - // Calculate label -- feature correlations - val labelRDD = examples.map(_.label) - val numFeatures = examples.take(1)(0).features.size - val corrType = "pearson" - println() - println(s"Correlation ($corrType) between label and each feature") - println(s"Feature\tCorrelation") - var feature = 0 - while (feature < numFeatures) { - val featureRDD = examples.map(_.features(feature)) - val corr = Statistics.corr(labelRDD, featureRDD) - println(s"$feature\t$corr") - feature += 1 - } - println() - } - - def run(params: Params) { - val conf = new SparkConf().setAppName(s"StatisticalSummary with $params") - val sc = new SparkContext(conf) - - val examples = MLUtils.loadLibSVMFile(sc, params.input).cache() - - println(s"Summary of data file: ${params.input}") - println(s"${examples.count} data points") - - runStatisticalSummary(examples, params) - - runCorrelations(examples, params) sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala new file mode 100644 index 0000000000000..924b586e3af99 --- /dev/null +++ 
b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomRDDGeneration.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import org.apache.spark.mllib.random.RandomRDDs +import org.apache.spark.rdd.RDD + +import org.apache.spark.{SparkConf, SparkContext} + +/** + * An example app for randomly generated RDDs. Run with + * {{{ + * bin/run-example org.apache.spark.examples.mllib.RandomRDDGeneration + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ +object RandomRDDGeneration { + + def main(args: Array[String]) { + + val conf = new SparkConf().setAppName(s"RandomRDDGeneration") + val sc = new SparkContext(conf) + + val numExamples = 10000 // number of examples to generate + val fraction = 0.1 // fraction of data to sample + + // Example: RandomRDDs.normalRDD + val normalRDD: RDD[Double] = RandomRDDs.normalRDD(sc, numExamples) + println(s"Generated RDD of ${normalRDD.count()}" + + " examples sampled from the standard normal distribution") + println(" First 5 samples:") + normalRDD.take(5).foreach( x => println(s" $x") ) + + // Example: RandomRDDs.normalVectorRDD + val normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows = numExamples, numCols = 2) + println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.") + println(" First 5 samples:") + normalVectorRDD.take(5).foreach( x => println(s" $x") ) + + println() + + sc.stop() + } + +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomAndSampledRDDs.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala similarity index 58% rename from examples/src/main/scala/org/apache/spark/examples/mllib/RandomAndSampledRDDs.scala rename to examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala index f336286daf7c1..42a906e0c296c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomAndSampledRDDs.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala @@ -17,9 +17,7 @@ package org.apache.spark.examples.mllib -import org.apache.spark.mllib.random.RandomRDDGenerators import org.apache.spark.mllib.util.MLUtils -import org.apache.spark.rdd.RDD import scopt.OptionParser import org.apache.spark.{SparkConf, SparkContext} @@ -28,19 +26,19 @@ import org.apache.spark.SparkContext._ /** * An example app for randomly generated and sampled RDDs. Run with * {{{ - * bin/run-example org.apache.spark.examples.mllib.RandomAndSampledRDDs + * bin/run-example org.apache.spark.examples.mllib.SampledRDDs * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. 
*/ -object RandomAndSampledRDDs { +object SampledRDDs { case class Params(input: String = "data/mllib/sample_binary_classification_data.txt") def main(args: Array[String]) { val defaultParams = Params() - val parser = new OptionParser[Params]("RandomAndSampledRDDs") { - head("RandomAndSampledRDDs: an example app for randomly generated and sampled RDDs.") + val parser = new OptionParser[Params]("SampledRDDs") { + head("SampledRDDs: an example app for randomly generated and sampled RDDs.") opt[String]("input") .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") .action((x, c) => c.copy(input = x)) @@ -48,10 +46,9 @@ object RandomAndSampledRDDs { """ |For example, the following command runs this app: | - | bin/spark-submit --class org.apache.spark.examples.mllib.RandomAndSampledRDDs \ + | bin/spark-submit --class org.apache.spark.examples.mllib.SampledRDDs \ | examples/target/scala-*/spark-examples-*.jar - """. - stripMargin) + """.stripMargin) } parser.parse(args, defaultParams).map { params => @@ -62,50 +59,55 @@ object RandomAndSampledRDDs { } def run(params: Params) { - val conf = new SparkConf().setAppName(s"RandomAndSampledRDDs with $params") + val conf = new SparkConf().setAppName(s"SampledRDDs with $params") val sc = new SparkContext(conf) - val numExamples = 10000 // number of examples to generate val fraction = 0.1 // fraction of data to sample - // Example: RandomRDDGenerators - val normalRDD: RDD[Double] = RandomRDDGenerators.normalRDD(sc, numExamples) - println(s"Generated RDD of ${normalRDD.count()} examples sampled from a unit normal distribution") - val normalVectorRDD = - RandomRDDGenerators.normalVectorRDD(sc, numRows = numExamples, numCols = 2) - println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.") - - println() + val examples = MLUtils.loadLibSVMFile(sc, params.input) + val numExamples = examples.count() + println(s"Loaded data with $numExamples examples from file: ${params.input}") // Example: RDD.sample() and RDD.takeSample() - val exactSampleSize = (numExamples * fraction).toInt - println(s"Sampling RDD using fraction $fraction. Expected sample size = $exactSampleSize.") - val sampledRDD = normalRDD.sample(withReplacement = true, fraction = fraction) + val expectedSampleSize = (numExamples * fraction).toInt + println(s"Sampling RDD using fraction $fraction. Expected sample size = $expectedSampleSize.") + val sampledRDD = examples.sample(withReplacement = true, fraction = fraction) println(s" RDD.sample(): sample has ${sampledRDD.count()} examples") - val sampledArray = normalRDD.takeSample(withReplacement = true, num = exactSampleSize) + val sampledArray = examples.takeSample(withReplacement = true, num = expectedSampleSize) println(s" RDD.takeSample(): sample has ${sampledArray.size} examples") println() - // Example: RDD.sampleByKey() - val examples = MLUtils.loadLibSVMFile(sc, params.input) - val sizeA = examples.count() - println(s"Loaded data with $sizeA examples from file: ${params.input}") + // Example: RDD.sampleByKey() and RDD.sampleByKeyExact() val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) } println(s" Keyed data using label (Int) as key ==> Orig") // Count examples per label in original data. - val keyCountsA = keyedRDD.countByKey() - // Subsample, and count examples per label in sampled data. 
-    val fractions = keyCountsA.keys.map((_, fraction)).toMap
-    val sampledByKeyRDD =
-      keyedRDD.sampleByKey(withReplacement = true, fractions = fractions, exact = true)
+    val keyCounts = keyedRDD.countByKey()
+
+    // Subsample, and count examples per label in sampled data. (approximate)
+    val fractions = keyCounts.keys.map((_, fraction)).toMap
+    val sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = true, fractions = fractions)
     val keyCountsB = sampledByKeyRDD.countByKey()
     val sizeB = keyCountsB.values.sum
-    println(s"  Sampled $sizeB examples using exact stratified sampling (by label). ==> Sample")
+    println(s"  Sampled $sizeB examples using approximate stratified sampling (by label)." +
+      " ==> Approx Sample")
+
+    // Subsample, and count examples per label in sampled data. (exact)
+    val sampledByKeyRDDExact =
+      keyedRDD.sampleByKeyExact(withReplacement = true, fractions = fractions)
+    val keyCountsBExact = sampledByKeyRDDExact.countByKey()
+    val sizeBExact = keyCountsBExact.values.sum
+    println(s"  Sampled $sizeBExact examples using exact stratified sampling (by label)." +
+      " ==> Exact Sample")
+
+    // Compare samples
     println(s"  \tFractions of examples with key")
-    println(s"Key\tOrig\tSample")
-    keyCountsA.keys.toSeq.sorted.foreach { key =>
-      println(s"$key\t${keyCountsA(key) / sizeA.toDouble}\t${keyCountsB(key) / sizeB.toDouble}")
+    println(s"Key\tOrig\tApprox Sample\tExact Sample")
+    keyCounts.keys.toSeq.sorted.foreach { key =>
+      val origFrac = keyCounts(key) / numExamples.toDouble
+      val approxFrac = keyCountsB(key) / sizeB.toDouble
+      val exactFrac = keyCountsBExact(key) / sizeBExact.toDouble
+      println(s"$key\t$origFrac\t$approxFrac\t$exactFrac")
    }
 
     sc.stop()
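
The core pattern the new SampledRDDs example compares -- approximate stratified sampling with RDD.sampleByKey versus exact stratified sampling with RDD.sampleByKeyExact -- is small enough to sketch on its own. The snippet below is a minimal, self-contained sketch, not part of the patch: it uses a synthetic keyed RDD instead of LIBSVM data, assumes a Spark version in which sampleByKeyExact is available (the API this patch switches to), and the object name, local master setting, and toy keys are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.random.RandomRDDs

// A minimal sketch (not part of the patch): approximate vs. exact stratified sampling by key.
object StratifiedSamplingSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("StratifiedSamplingSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Toy keyed data: key 0 or 1, value drawn from the standard normal distribution.
    val data = RandomRDDs.normalRDD(sc, 10000L).map(x => (if (x < 0) 0 else 1, x))

    val fraction = 0.1
    // Sample roughly `fraction` of the records for each key.
    val fractions = data.keys.distinct().collect().map((_, fraction)).toMap

    // Approximate stratified sampling: single pass; per-key sample sizes are only close to the target.
    val approx = data.sampleByKey(withReplacement = false, fractions = fractions)
    // Exact stratified sampling: extra work over the data, but per-key sample sizes hit the target.
    val exact = data.sampleByKeyExact(withReplacement = false, fractions = fractions)

    println(s"approx sample size = ${approx.count()}, exact sample size = ${exact.count()}")

    sc.stop()
  }
}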