diff --git a/examples/src/main/python/mllib/random_and_sampled_rdds.py b/examples/src/main/python/mllib/random_and_sampled_rdds.py
new file mode 100755
index 0000000000000..efa455a37335e
--- /dev/null
+++ b/examples/src/main/python/mllib/random_and_sampled_rdds.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Randomly generated and sampled RDDs.
+"""
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.mllib.random import RandomRDDGenerators
+from pyspark.mllib.util import MLUtils
+
+
+if __name__ == "__main__":
+    if len(sys.argv) not in [1, 2]:
+        print >> sys.stderr, "Usage: random_and_sampled_rdds [libsvm path]"
+        exit(-1)
+    if len(sys.argv) == 2:
+        datapath = sys.argv[1]
+    else:
+        datapath = 'data/mllib/sample_binary_classification_data.txt'
+
+    sc = SparkContext(appName="PythonRandomAndSampledRDDs")
+
+    numExamples = 10000  # number of examples to generate
+    fraction = 0.1  # fraction of data to sample
+
+    # Example: RandomRDDGenerators
+    normalRDD = RandomRDDGenerators.normalRDD(sc, numExamples)
+    print 'Generated RDD of %d examples sampled from a unit normal distribution' \
+        % normalRDD.count()
+    normalVectorRDD = RandomRDDGenerators.normalVectorRDD(sc, numRows=numExamples, numCols=2)
+    print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
+
+    print ''
+
+    # Example: RDD.sample() and RDD.takeSample()
+    exactSampleSize = int(numExamples * fraction)
+    print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
+        % (fraction, exactSampleSize)
+    sampledRDD = normalRDD.sample(withReplacement=True, fraction=fraction)
+    print '  RDD.sample(): sample has %d examples' % sampledRDD.count()
+    sampledArray = normalRDD.takeSample(withReplacement=True, num=exactSampleSize)
+    print '  RDD.takeSample(): sample has %d examples' % len(sampledArray)
+
+    print ''
+
+    # Example: RDD.sampleByKey()
+    examples = MLUtils.loadLibSVMFile(sc, datapath)
+    sizeA = examples.count()
+    print 'Loaded data with %d examples from file: %s' % (sizeA, datapath)
+    keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
+    print '  Keyed data using label (Int) as key ==> Orig'
+    # Count examples per label in original data.
+    keyCountsA = keyedRDD.countByKey()
+    # Subsample, and count examples per label in sampled data.
+    fractions = {}
+    for k in keyCountsA.keys():
+        fractions[k] = fraction
+    # Note: the Python API does not yet support exact stratified sampling,
+    # so this sample is approximate.
+    sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
+    keyCountsB = sampledByKeyRDD.countByKey()
+    sizeB = sum(keyCountsB.values())
+    print '  Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
+        % sizeB
+    print '  \tFractions of examples with key'
+    print 'Key\tOrig\tSample'
+    for k in sorted(keyCountsA.keys()):
+        print '%d\t%g\t%g' % (k, keyCountsA[k] / float(sizeA), keyCountsB[k] / float(sizeB))
+
+    sc.stop()
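A note on the two sampling calls in the file above: `RDD.sample()` is a lazy transformation that decides per element, so its output size only equals `numExamples * fraction` in expectation, while `RDD.takeSample()` is an action that collects exactly `num` elements to the driver. A minimal standalone sketch of that difference (hypothetical app name; assumes a local PySpark install of the same Python 2 vintage as the example):

```python
from pyspark import SparkContext

sc = SparkContext(appName="SampleVsTakeSampleSketch")  # hypothetical app name
rdd = sc.parallelize(range(10000))

# sample() decides per element, so the sample size varies around
# 10000 * 0.1 = 1000 from run to run.
print rdd.sample(withReplacement=False, fraction=0.1).count()

# takeSample() returns a local list of exactly `num` elements.
print len(rdd.takeSample(withReplacement=False, num=1000))

sc.stop()
```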
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/RandomAndSampledRDDs.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomAndSampledRDDs.scala
new file mode 100644
index 0000000000000..11e5833f52ea3
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/RandomAndSampledRDDs.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.random.RandomRDDGenerators
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
+
+/**
+ * An example app for randomly generated and sampled RDDs. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.RandomAndSampledRDDs
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your
+ * app.
+ */
+object RandomAndSampledRDDs extends App {
+
+  case class Params(input: String = "data/mllib/sample_binary_classification_data.txt")
+
+  val defaultParams = Params()
+
+  val parser = new OptionParser[Params]("RandomAndSampledRDDs") {
+    head("RandomAndSampledRDDs: an example app for randomly generated and sampled RDDs.")
+    opt[String]("input")
+      .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}")
+      .action((x, c) => c.copy(input = x))
+    note(
+      """
+        |For example, the following command runs this app:
+        |
+        | bin/spark-submit --class org.apache.spark.examples.mllib.RandomAndSampledRDDs \
+        |  examples/target/scala-*/spark-examples-*.jar
+      """.stripMargin)
+  }
+
+  parser.parse(args, defaultParams).map { params =>
+    run(params)
+  } getOrElse {
+    sys.exit(1)
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"RandomAndSampledRDDs with $params")
+    val sc = new SparkContext(conf)
+
+    val numExamples = 10000 // number of examples to generate
+    val fraction = 0.1 // fraction of data to sample
+
+    // Example: RandomRDDGenerators
+    val normalRDD: RDD[Double] = RandomRDDGenerators.normalRDD(sc, numExamples)
+    println(
+      s"Generated RDD of ${normalRDD.count()} examples sampled from a unit normal distribution")
+    val normalVectorRDD =
+      RandomRDDGenerators.normalVectorRDD(sc, numRows = numExamples, numCols = 2)
+    println(s"Generated RDD of ${normalVectorRDD.count()} examples of length-2 vectors.")
+
+    println()
+
+    // Example: RDD.sample() and RDD.takeSample()
+    val exactSampleSize = (numExamples * fraction).toInt
+    println(s"Sampling RDD using fraction $fraction. Expected sample size = $exactSampleSize.")
+    val sampledRDD = normalRDD.sample(withReplacement = true, fraction = fraction)
+    println(s"  RDD.sample(): sample has ${sampledRDD.count()} examples")
+    val sampledArray = normalRDD.takeSample(withReplacement = true, num = exactSampleSize)
+    println(s"  RDD.takeSample(): sample has ${sampledArray.size} examples")
+
+    println()
+
+    // Example: RDD.sampleByKey()
+    val examples = MLUtils.loadLibSVMFile(sc, params.input)
+    val sizeA = examples.count()
+    println(s"Loaded data with $sizeA examples from file: ${params.input}")
+    val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) }
+    println("  Keyed data using label (Int) as key ==> Orig")
+    // Count examples per label in original data.
+    val keyCountsA = keyedRDD.countByKey()
+    // Subsample, and count examples per label in sampled data.
+    val fractions = keyCountsA.keys.map((_, fraction)).toMap
+    val sampledByKeyRDD =
+      keyedRDD.sampleByKey(withReplacement = true, fractions = fractions, exact = true)
+    val keyCountsB = sampledByKeyRDD.countByKey()
+    val sizeB = keyCountsB.values.sum
+    println(s"  Sampled $sizeB examples using exact stratified sampling (by label). ==> Sample")
+    println("  \tFractions of examples with key")
+    println("Key\tOrig\tSample")
+    keyCountsA.keys.toSeq.sorted.foreach { key =>
+      println(s"$key\t${keyCountsA(key) / sizeA.toDouble}\t${keyCountsB(key) / sizeB.toDouble}")
+    }
+
+    sc.stop()
+  }
+}
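The Scala file above passes `exact = true` to `sampleByKey`, which (roughly speaking) spends an additional pass over the data so that each key's sample size matches its expectation exactly; the Python example earlier can only sample approximately, hence its explanatory comment. A rough standalone sketch of the approximate per-key behavior, kept in Python to match the first file (hypothetical app name; per-key counts land near, not exactly at, `fraction * count`):

```python
from pyspark import SparkContext

sc = SparkContext(appName="SampleByKeySketch")  # hypothetical app name
# Two strata of different sizes, keyed like the label in the example.
pairs = sc.parallelize([(0, x) for x in range(1000)] +
                       [(1, x) for x in range(4000)])
# Keep roughly 10% of each key; exact stratified sampling would yield 100 and 400.
fractions = {0: 0.1, 1: 0.1}
sampled = pairs.sampleByKey(withReplacement=False, fractions=fractions)
print sampled.countByKey()  # counts near 100 and 400, varying per run
sc.stop()
```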
==> Sample") + println(s" \tFractions of examples with key") + println(s"Key\tOrig\tSample") + keyCountsA.keys.toSeq.sorted.foreach { key => + println(s"$key\t${keyCountsA(key) / sizeA.toDouble}\t${keyCountsB(key) / sizeB.toDouble}") + } + + sc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala index a20f880bd6c5e..8917314301ca2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StatisticalSummary.scala @@ -42,9 +42,10 @@ object StatisticalSummary extends App { val defaultParams = Params() val parser = new OptionParser[Params]("StatisticalSummary") { - head("StatisticalSummary: an example app for MultivariateOnlineSummarizer and Statistics (correlation)") + head("StatisticalSummary: an example app for MultivariateOnlineSummarizer and Statistics" + + " (correlation)") opt[String]("input") - .text(s"Input paths to labeled examples in LIBSVM format, default: ${defaultParams.input}") + .text(s"Input path to labeled examples in LIBSVM format, default: ${defaultParams.input}") .action((x, c) => c.copy(input = x)) note( """ @@ -52,7 +53,7 @@ object StatisticalSummary extends App { | | bin/spark-submit --class org.apache.spark.examples.mllib.StatisticalSummary \ | examples/target/scala-*/spark-examples-*.jar \ - | data/mllib/sample_linear_regression_data.txt + | --input data/mllib/sample_linear_regression_data.txt """.stripMargin) }