diff --git a/LICENSE b/LICENSE index b2001f029a4f0..d6b9ccf07d999 100644 --- a/LICENSE +++ b/LICENSE @@ -836,6 +836,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. +======================================================================== +For vis.js (core/src/main/resources/org/apache/spark/ui/static/vis.min.js): +======================================================================== +Copyright (C) 2010-2015 Almende B.V. + +Vis.js is dual licensed under both + + * The Apache 2.0 License + http://www.apache.org/licenses/LICENSE-2.0 + +and + + * The MIT License + http://opensource.org/licenses/MIT + +Vis.js may be distributed under either license. ======================================================================== BSD-style licenses diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 73999a6737032..9138629cac9c0 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -67,8 +67,8 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode, }) setMethod("show", "RDD", - function(.Object) { - cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep="")) + function(object) { + cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep="")) }) setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) { diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css index d40de704229c3..d1e6d462b836f 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.css @@ -19,7 +19,8 @@ div#application-timeline, div#job-timeline { margin-bottom: 30px; } -#application-timeline div.legend-area { +#application-timeline div.legend-area, +#job-timeline div.legend-area { margin-top: 5px; } diff --git a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js index 48fbb33b1155b..558beb8a5867f 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js +++ b/core/src/main/resources/org/apache/spark/ui/static/timeline-view.js @@ -39,23 +39,24 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) { function setupJobEventAction() { $(".item.range.job.application-timeline-object").each(function() { - var getJobId = function(baseElem) { + var getSelectorForJobEntry = function(baseElem) { var jobIdText = $($(baseElem).find(".application-timeline-content")[0]).text(); var jobId = jobIdText.match("\\(Job (\\d+)\\)")[1]; - return jobId; + return "#job-" + jobId; }; $(this).click(function() { - window.location.href = "job/?id=" + getJobId(this); + var jobPagePath = $(getSelectorForJobEntry(this)).find("a").attr("href") + window.location.href = jobPagePath }); $(this).hover( function() { - $("#job-" + getJobId(this)).addClass("corresponding-item-hover"); + $(getSelectorForJobEntry(this)).addClass("corresponding-item-hover"); $($(this).find("div.application-timeline-content")[0]).tooltip("show"); }, function() { - $("#job-" + getJobId(this)).removeClass("corresponding-item-hover"); + $(getSelectorForJobEntry(this)).removeClass("corresponding-item-hover"); $($(this).find("div.application-timeline-content")[0]).tooltip("hide"); } ); @@ -97,32 +98,24 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) { function setupStageEventAction() { 
$(".item.range.stage.job-timeline-object").each(function() { - var getStageIdAndAttempt = function(baseElem) { + var getSelectorForStageEntry = function(baseElem) { var stageIdText = $($(baseElem).find(".job-timeline-content")[0]).text(); var stageIdAndAttempt = stageIdText.match("\\(Stage (\\d+\\.\\d+)\\)")[1].split("."); - return stageIdAndAttempt; + return "#stage-" + stageIdAndAttempt[0] + "-" + stageIdAndAttempt[1]; }; $(this).click(function() { - var idAndAttempt = getStageIdAndAttempt(this); - var id = idAndAttempt[0]; - var attempt = idAndAttempt[1]; - window.location.href = "../../stages/stage/?id=" + id + "&attempt=" + attempt; + var stagePagePath = $(getSelectorForStageEntry(this)).find("a").attr("href") + window.location.href = stagePagePath }); $(this).hover( function() { - var idAndAttempt = getStageIdAndAttempt(this); - var id = idAndAttempt[0]; - var attempt = idAndAttempt[1]; - $("#stage-" + id + "-" + attempt).addClass("corresponding-item-hover"); + $(getSelectorForStageEntry(this)).addClass("corresponding-item-hover"); $($(this).find("div.job-timeline-content")[0]).tooltip("show"); }, function() { - var idAndAttempt = getStageIdAndAttempt(this); - var id = idAndAttempt[0]; - var attempt = idAndAttempt[1]; - $("#stage-" + id + "-" + attempt).removeClass("corresponding-item-hover"); + $(getSelectorForStageEntry(this)).removeClass("corresponding-item-hover"); $($(this).find("div.job-timeline-content")[0]).tooltip("hide"); } ); diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8baf199f215fb..7dad30ecbdd2f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1161,8 +1161,8 @@ abstract class RDD[T: ClassTag]( */ @Experimental def countApproxDistinct(p: Int, sp: Int): Long = withScope { - require(p >= 4, s"p ($p) must be at least 4") - require(sp <= 32, s"sp ($sp) cannot be greater than 32") + require(p >= 4, s"p ($p) must be >= 4") + require(sp <= 32, s"sp ($sp) must be <= 32") require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") val zeroCounter = new HyperLogLogPlus(p, sp) aggregate(zeroCounter)( @@ -1187,8 +1187,9 @@ abstract class RDD[T: ClassTag]( * It must be greater than 0.000017. 
*/ def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope { + require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt - countApproxDistinct(p, 0) + countApproxDistinct(if (p < 4) 4 else p, 0) } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index ef8c36a28655b..afc11bdc4d6ab 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -89,6 +89,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val simpleRdd = sc.makeRDD(uniformDistro, 10) assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2) assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1) + assert(error(simpleRdd.countApproxDistinct(0.02), size) < 0.1) + assert(error(simpleRdd.countApproxDistinct(0.5), size) < 0.22) } test("SparkContext.union") { diff --git a/docs/mllib-dimensionality-reduction.md b/docs/mllib-dimensionality-reduction.md index 870fed6cc5024..05f51168d837c 100644 --- a/docs/mllib-dimensionality-reduction.md +++ b/docs/mllib-dimensionality-reduction.md @@ -137,7 +137,7 @@ statistical method to find a rotation such that the first coordinate has the lar possible, and each succeeding coordinate in turn has the largest variance possible. The columns of the rotation matrix are called principal components. PCA is used widely in dimensionality reduction. -MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format. +MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format and any Vectors.
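A side note on the `RDD.countApproxDistinct(relativeSD)` change above: a minimal standalone sketch (with hypothetical sample accuracies) of how the relative accuracy maps to the HyperLogLog++ precision `p`, and why the new lower clamp to 4 matters:

{% highlight scala %}
// Mirrors the logic in the patched countApproxDistinct(relativeSD) overload.
def precisionFor(relativeSD: Double): Int = {
  require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
  val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
  if (p < 4) 4 else p  // HyperLogLogPlus requires p >= 4
}

precisionFor(0.05) // 9, the default accuracy
precisionFor(0.5)  // raw p would be 3; clamped to 4 so the call no longer fails
{% endhighlight %}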
@@ -157,6 +157,23 @@ val pc: Matrix = mat.computePrincipalComponents(10) // Principal components are // Project the rows to the linear space spanned by the top 10 principal components. val projected: RowMatrix = mat.multiply(pc) {% endhighlight %} + +The following code demonstrates how to compute principal components on source vectors +and use them to project the vectors into a low-dimensional space while keeping associated labels: + +{% highlight scala %} +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.feature.PCA + +val data: RDD[LabeledPoint] = ... + +// Compute the top 10 principal components. +val pca = new PCA(10).fit(data.map(_.features)) + +// Project vectors to the linear space spanned by the top 10 principal components, keeping the label +val projected = data.map(p => p.copy(features = pca.transform(p.features))) +{% endhighlight %} +
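For readers comparing the new `mllib.feature.PCA` wrapper with the existing `RowMatrix` API, here is a minimal sketch (assuming a running `SparkContext` named `sc` and a few toy vectors) showing that both paths produce the same low-dimensional projection; this is essentially what the new PCASuite later in this patch verifies:

{% highlight scala %}
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val vectors = sc.parallelize(Seq(
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0),
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0)))))

// New wrapper: fit a PCAModel and transform the vectors directly.
val pcaModel = new PCA(3).fit(vectors)
val projectedByModel = pcaModel.transform(vectors).collect()

// Existing API: compute principal components on a RowMatrix and multiply.
val mat = new RowMatrix(vectors)
val pc = mat.computePrincipalComponents(3)
val projectedByMatrix = mat.multiply(pc).rows.collect()

assert(projectedByModel.toSet == projectedByMatrix.toSet)
{% endhighlight %}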
diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md index 03fedd01016b9..f723cd6b9dfab 100644 --- a/docs/mllib-feature-extraction.md +++ b/docs/mllib-feature-extraction.md @@ -507,7 +507,6 @@ v_N This example below demonstrates how to load a simple vectors file, extract a set of vectors, then transform those vectors using a transforming vector value. -
{% highlight scala %} @@ -531,3 +530,57 @@ val transformedData2 = parsedData.map(x => transformer.transform(x))
+## PCA + +A feature transformer that projects vectors to a low-dimensional space using PCA. +See [dimensionality reduction](mllib-dimensionality-reduction.html) for details. + +### Example + +The following code demonstrates how to compute principal components on a `Vector` +and use them to project the vectors into a low-dimensional space while keeping associated labels +for calculating a [Linear Regression](mllib-linear-methods.html). + +
+
+{% highlight scala %} +import org.apache.spark.mllib.regression.LinearRegressionWithSGD +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.feature.PCA + +val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line => + val parts = line.split(',') + LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) +}.cache() + +val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) +val training = splits(0).cache() +val test = splits(1) + +val pca = new PCA(training.first().features.size/2).fit(data.map(_.features)) +val training_pca = training.map(p => p.copy(features = pca.transform(p.features))) +val test_pca = test.map(p => p.copy(features = pca.transform(p.features))) + +val numIterations = 100 +val model = LinearRegressionWithSGD.train(training, numIterations) +val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations) + +val valuesAndPreds = test.map { point => + val score = model.predict(point.features) + (score, point.label) +} + +val valuesAndPreds_pca = test_pca.map { point => + val score = model_pca.predict(point.features) + (score, point.label) +} + +val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean() +val MSE_pca = valuesAndPreds_pca.map{case(v, p) => math.pow((v - p), 2)}.mean() + +println("Mean Squared Error = " + MSE) +println("PCA Mean Squared Error = " + MSE_pca) +{% endhighlight %} +
+
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 2f2fea53168a3..bd863d48d53e3 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1915,7 +1915,7 @@ In that case, consider [reducing](#reducing-the-processing-time-of-each-batch) the batch processing time. The progress of a Spark Streaming program can also be monitored using the -[StreamingListener](api/scala/index.html#org.apache.spark.scheduler.StreamingListener) interface, +[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) interface, which allows you to get receiver status and processing times. Note that this is a developer API and it is likely to be improved upon (i.e., more information reported) in the future. diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala index 8340d91101ab3..54e4073941056 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala @@ -112,7 +112,7 @@ object DecisionTreeExample { .text(s"input path to test dataset. If given, option fracTest is ignored." + s" default: ${defaultParams.testInput}") .action((x, c) => c.copy(testInput = x)) - opt[String]("") + opt[String]("dataFormat") .text("data format: libsvm (default), dense (deprecated in Spark v1.1)") .action((x, c) => c.copy(dataFormat = x)) arg[String]("") diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala index c5899b6683c79..33905277c7341 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala @@ -111,7 +111,7 @@ object GBTExample { .text(s"input path to test dataset. If given, option fracTest is ignored." + s" default: ${defaultParams.testInput}") .action((x, c) => c.copy(testInput = x)) - opt[String]("") + opt[String]("dataFormat") .text("data format: libsvm (default), dense (deprecated in Spark v1.1)") .action((x, c) => c.copy(dataFormat = x)) arg[String]("") diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala index 7f88d2681bcaa..9f7cad68a4594 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala @@ -117,7 +117,7 @@ object RandomForestExample { .text(s"input path to test dataset. If given, option fracTest is ignored." + s" default: ${defaultParams.testInput}") .action((x, c) => c.copy(testInput = x)) - opt[String]("") + opt[String]("dataFormat") .text("data format: libsvm (default), dense (deprecated in Spark v1.1)") .action((x, c) => c.copy(dataFormat = x)) arg[String]("") diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala index 262fd2c9611d0..b0613632c9946 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -126,7 +126,7 @@ object DecisionTreeRunner { .text(s"input path to test dataset. 
If given, option fracTest is ignored." + s" default: ${defaultParams.testInput}") .action((x, c) => c.copy(testInput = x)) - opt[String]("") + opt[String]("dataFormat") .text("data format: libsvm (default), dense (deprecated in Spark v1.1)") .action((x, c) => c.copy(dataFormat = x)) arg[String]("") diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala index 0763a7736305a..7416fb5a40848 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala @@ -69,7 +69,7 @@ object GradientBoostedTreesRunner { .text(s"input path to test dataset. If given, option fracTest is ignored." + s" default: ${defaultParams.testInput}") .action((x, c) => c.copy(testInput = x)) - opt[String]("") + opt[String]("dataFormat") .text("data format: libsvm (default), dense (deprecated in Spark v1.1)") .action((x, c) => c.copy(dataFormat = x)) arg[String]("") diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala index a1850390c0a86..31d629f853161 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala @@ -26,7 +26,7 @@ import scopt.OptionParser import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} +import org.apache.spark.mllib.clustering.{EMLDAOptimizer, OnlineLDAOptimizer, DistributedLDAModel, LDA} import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD @@ -48,6 +48,7 @@ object LDAExample { topicConcentration: Double = -1, vocabSize: Int = 10000, stopwordFile: String = "", + algorithm: String = "em", checkpointDir: Option[String] = None, checkpointInterval: Int = 10) extends AbstractParams[Params] @@ -78,6 +79,10 @@ object LDAExample { .text(s"filepath for a list of stopwords. Note: This must fit on a single machine." + s" default: ${defaultParams.stopwordFile}") .action((x, c) => c.copy(stopwordFile = x)) + opt[String]("algorithm") + .text(s"inference algorithm to use. em and online are supported." + + s" default: ${defaultParams.algorithm}") + .action((x, c) => c.copy(algorithm = x)) opt[String]("checkpointDir") .text(s"Directory for checkpointing intermediate results." + s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." + @@ -128,7 +133,17 @@ object LDAExample { // Run LDA. val lda = new LDA() - lda.setK(params.k) + + val optimizer = params.algorithm.toLowerCase match { + case "em" => new EMLDAOptimizer + // add (1.0 / actualCorpusSize) to MiniBatchFraction be more robust on tiny datasets. 
+ case "online" => new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize) + case _ => throw new IllegalArgumentException( + s"Only em, online are supported but got ${params.algorithm}.") + } + + lda.setOptimizer(optimizer) + .setK(params.k) .setMaxIterations(params.maxIterations) .setDocConcentration(params.docConcentration) .setTopicConcentration(params.topicConcentration) @@ -137,14 +152,18 @@ object LDAExample { sc.setCheckpointDir(params.checkpointDir.get) } val startTime = System.nanoTime() - val ldaModel = lda.run(corpus).asInstanceOf[DistributedLDAModel] + val ldaModel = lda.run(corpus) val elapsed = (System.nanoTime() - startTime) / 1e9 println(s"Finished training LDA model. Summary:") println(s"\t Training time: $elapsed sec") - val avgLogLikelihood = ldaModel.logLikelihood / actualCorpusSize.toDouble - println(s"\t Training data average log likelihood: $avgLogLikelihood") - println() + + if (ldaModel.isInstanceOf[DistributedLDAModel]) { + val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel] + val avgLogLikelihood = distLDAModel.logLikelihood / actualCorpusSize.toDouble + println(s"\t Training data average log likelihood: $avgLogLikelihood") + println() + } // Print the topics, showing the top-weighted terms for each topic. val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 8c30ad4b391ae..f4c477596557f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -32,6 +32,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.python.SerDeUtil import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ +import org.apache.spark.mllib.evaluation.RankingMetrics import org.apache.spark.mllib.feature._ import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel} import org.apache.spark.mllib.linalg._ @@ -50,6 +51,7 @@ import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTree import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomForest} import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -923,6 +925,14 @@ private[python] class PythonMLLibAPI extends Serializable { RG.gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, parts, s) } + /** + * Java stub for the constructor of Python mllib RankingMetrics + */ + def newRankingMetrics(predictionAndLabels: DataFrame): RankingMetrics[Any] = { + new RankingMetrics(predictionAndLabels.map( + r => (r.getSeq(0).toArray[Any], r.getSeq(1).toArray[Any]))) + } + } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala index 666362ae6739a..4628dc5690913 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala @@ -23,6 +23,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.{Matrices, Matrix} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame 
/** * ::Experimental:: @@ -33,6 +34,13 @@ import org.apache.spark.rdd.RDD @Experimental class MulticlassMetrics(predictionAndLabels: RDD[(Double, Double)]) { + /** + * An auxiliary constructor taking a DataFrame. + * @param predictionAndLabels a DataFrame with two double columns: prediction and label + */ + private[mllib] def this(predictionAndLabels: DataFrame) = + this(predictionAndLabels.map(r => (r.getDouble(0), r.getDouble(1)))) + private lazy val labelCountByClass: Map[Double, Long] = predictionAndLabels.values.countByValue() private lazy val labelCount: Long = labelCountByClass.values.sum private lazy val tpByClass: Map[Double, Int] = predictionAndLabels diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala new file mode 100644 index 0000000000000..4e01e402b4283 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.rdd.RDD + +/** + * A feature transformer that projects vectors to a low-dimensional space using PCA. + * + * @param k number of principal components + */ +class PCA(val k: Int) { + require(k >= 1, s"PCA requires a number of principal components k >= 1 but was given $k") + + /** + * Computes a [[PCAModel]] that contains the principal components of the input vectors. + * + * @param sources source vectors + */ + def fit(sources: RDD[Vector]): PCAModel = { + require(k <= sources.first().size, + s"source vector size is ${sources.first().size} must be greater than k=$k") + + val mat = new RowMatrix(sources) + val pc = mat.computePrincipalComponents(k) match { + case dm: DenseMatrix => + dm + case sm: SparseMatrix => + /* Convert a sparse matrix to dense. + * + * RowMatrix.computePrincipalComponents always returns a dense matrix. + * The following code is a safeguard. + */ + sm.toDense + case m => + throw new IllegalArgumentException("Unsupported matrix format. Expected " + + s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}") + + } + new PCAModel(k, pc) + } + + /** Java-friendly version of [[fit()]] */ + def fit(sources: JavaRDD[Vector]): PCAModel = fit(sources.rdd) +} + +/** + * Model fitted by [[PCA]] that can project vectors to a low-dimensional space using PCA. + * + * @param k number of principal components. + * @param pc a principal components Matrix. Each column is one principal component. 
+ */ +class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends VectorTransformer { + /** + * Transform a vector by computed Principal Components. + * + * @param vector vector to be transformed. + * Vector must be the same length as the source vectors given to [[PCA.fit()]]. + * @return transformed vector. Vector will be of length k. + */ + override def transform(vector: Vector): Vector = { + vector match { + case dv: DenseVector => + pc.transpose.multiply(dv) + case SparseVector(size, indices, values) => + /* SparseVector -> single row SparseMatrix */ + val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose + val projection = sm.multiply(pc) + Vectors.dense(projection.values) + case _ => + throw new IllegalArgumentException("Unsupported vector format. Expected " + + s"SparseVector or DenseVector. Instead got: ${vector.getClass}") + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala new file mode 100644 index 0000000000000..758af588f1c69 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class PCASuite extends FunSuite with MLlibTestSparkContext { + + private val data = Array( + Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))), + Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0), + Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0) + ) + + private lazy val dataRDD = sc.parallelize(data, 2) + + test("Correct computing use a PCA wrapper") { + val k = dataRDD.count().toInt + val pca = new PCA(k).fit(dataRDD) + + val mat = new RowMatrix(dataRDD) + val pc = mat.computePrincipalComponents(k) + + val pca_transform = pca.transform(dataRDD).collect() + val mat_multiply = mat.multiply(pc).rows.collect() + + assert(pca_transform.toSet === mat_multiply.toSet) + } +} diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index ed3171b6976d3..3be0979b92013 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -88,12 +88,12 @@ def get$Name(self): print("\n# DO NOT MODIFY THIS FILE! 
It was generated by _shared_params_code_gen.py.\n") print("from pyspark.ml.param import Param, Params\n\n") shared = [ - ("maxIter", "max number of iterations", None), - ("regParam", "regularization constant", None), + ("maxIter", "max number of iterations (>= 0)", None), + ("regParam", "regularization parameter (>= 0)", None), ("featuresCol", "features column name", "'features'"), ("labelCol", "label column name", "'label'"), ("predictionCol", "prediction column name", "'prediction'"), - ("rawPredictionCol", "raw prediction column name", "'rawPrediction'"), + ("rawPredictionCol", "raw prediction (a.k.a. confidence) column name", "'rawPrediction'"), ("inputCol", "input column name", None), ("inputCols", "input column names", None), ("outputCol", "output column name", None), diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index d0bcadee22347..4b22322b895b4 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -22,16 +22,16 @@ class HasMaxIter(Params): """ - Mixin for param maxIter: max number of iterations. + Mixin for param maxIter: max number of iterations (>= 0). """ # a placeholder to make it appear in the generated doc - maxIter = Param(Params._dummy(), "maxIter", "max number of iterations") + maxIter = Param(Params._dummy(), "maxIter", "max number of iterations (>= 0)") def __init__(self): super(HasMaxIter, self).__init__() - #: param for max number of iterations - self.maxIter = Param(self, "maxIter", "max number of iterations") + #: param for max number of iterations (>= 0) + self.maxIter = Param(self, "maxIter", "max number of iterations (>= 0)") if None is not None: self._setDefault(maxIter=None) @@ -51,16 +51,16 @@ def getMaxIter(self): class HasRegParam(Params): """ - Mixin for param regParam: regularization constant. + Mixin for param regParam: regularization parameter (>= 0). """ # a placeholder to make it appear in the generated doc - regParam = Param(Params._dummy(), "regParam", "regularization constant") + regParam = Param(Params._dummy(), "regParam", "regularization parameter (>= 0)") def __init__(self): super(HasRegParam, self).__init__() - #: param for regularization constant - self.regParam = Param(self, "regParam", "regularization constant") + #: param for regularization parameter (>= 0) + self.regParam = Param(self, "regParam", "regularization parameter (>= 0)") if None is not None: self._setDefault(regParam=None) @@ -167,16 +167,16 @@ def getPredictionCol(self): class HasRawPredictionCol(Params): """ - Mixin for param rawPredictionCol: raw prediction column name. + Mixin for param rawPredictionCol: raw prediction (a.k.a. confidence) column name. """ # a placeholder to make it appear in the generated doc - rawPredictionCol = Param(Params._dummy(), "rawPredictionCol", "raw prediction column name") + rawPredictionCol = Param(Params._dummy(), "rawPredictionCol", "raw prediction (a.k.a. confidence) column name") def __init__(self): super(HasRawPredictionCol, self).__init__() - #: param for raw prediction column name - self.rawPredictionCol = Param(self, "rawPredictionCol", "raw prediction column name") + #: param for raw prediction (a.k.a. confidence) column name + self.rawPredictionCol = Param(self, "rawPredictionCol", "raw prediction (a.k.a. 
confidence) column name") if 'rawPrediction' is not None: self._setDefault(rawPredictionCol='rawPrediction') @@ -403,14 +403,12 @@ class HasStepSize(Params): """ # a placeholder to make it appear in the generated doc - stepSize = Param(Params._dummy(), "stepSize", - "Step size to be used for each iteration of optimization.") + stepSize = Param(Params._dummy(), "stepSize", "Step size to be used for each iteration of optimization.") def __init__(self): super(HasStepSize, self).__init__() #: param for Step size to be used for each iteration of optimization. - self.stepSize = Param(self, "stepSize", - "Step size to be used for each iteration of optimization.") + self.stepSize = Param(self, "stepSize", "Step size to be used for each iteration of optimization.") if None is not None: self._setDefault(stepSize=None) diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py index c1b2077c985cf..fdbae06405f6a 100644 --- a/python/pyspark/ml/pipeline.py +++ b/python/pyspark/ml/pipeline.py @@ -179,7 +179,7 @@ def transform(self, dataset, params={}): return dataset -class Evaluator(object): +class Evaluator(Params): """ Base class for evaluators that compute metrics from predictions. """ diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 3a42bcf723894..ba6478dcd58a9 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -34,7 +34,7 @@ from pyspark.sql import DataFrame from pyspark.ml.param import Param from pyspark.ml.param.shared import HasMaxIter, HasInputCol -from pyspark.ml.pipeline import Transformer, Estimator, Pipeline +from pyspark.ml.pipeline import Estimator, Model, Pipeline, Transformer class MockDataset(DataFrame): @@ -77,7 +77,7 @@ def fit(self, dataset, params={}): return model -class MockModel(MockTransformer, Transformer): +class MockModel(MockTransformer, Model): def __init__(self): super(MockModel, self).__init__() @@ -128,7 +128,7 @@ def test_param(self): testParams = TestParams() maxIter = testParams.maxIter self.assertEqual(maxIter.name, "maxIter") - self.assertEqual(maxIter.doc, "max number of iterations") + self.assertEqual(maxIter.doc, "max number of iterations (>= 0)") self.assertTrue(maxIter.parent is testParams) def test_params(self): @@ -156,7 +156,7 @@ def test_params(self): self.assertEquals( testParams.explainParams(), "\n".join(["inputCol: input column name (undefined)", - "maxIter: max number of iterations (default: 10, current: 100)"])) + "maxIter: max number of iterations (>= 0) (default: 10, current: 100)"])) if __name__ == "__main__": diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 28e3727f2c064..86f4dc7368be0 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -236,6 +236,7 @@ class CrossValidatorModel(Model): """ def __init__(self, bestModel): + super(CrossValidatorModel, self).__init__() #: best model from cross validation self.bestModel = bestModel diff --git a/python/pyspark/mllib/evaluation.py b/python/pyspark/mllib/evaluation.py index 3e11df09da6b1..4c777f2180dc9 100644 --- a/python/pyspark/mllib/evaluation.py +++ b/python/pyspark/mllib/evaluation.py @@ -15,9 +15,12 @@ # limitations under the License. 
# -from pyspark.mllib.common import JavaModelWrapper +from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc from pyspark.sql import SQLContext -from pyspark.sql.types import StructField, StructType, DoubleType +from pyspark.sql.types import StructField, StructType, DoubleType, IntegerType, ArrayType + +__all__ = ['BinaryClassificationMetrics', 'RegressionMetrics', + 'MulticlassMetrics', 'RankingMetrics'] class BinaryClassificationMetrics(JavaModelWrapper): @@ -141,6 +144,206 @@ def r2(self): return self.call("r2") +class MulticlassMetrics(JavaModelWrapper): + """ + Evaluator for multiclass classification. + + >>> predictionAndLabels = sc.parallelize([(0.0, 0.0), (0.0, 1.0), (0.0, 0.0), + ... (1.0, 0.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)]) + >>> metrics = MulticlassMetrics(predictionAndLabels) + >>> metrics.falsePositiveRate(0.0) + 0.2... + >>> metrics.precision(1.0) + 0.75... + >>> metrics.recall(2.0) + 1.0... + >>> metrics.fMeasure(0.0, 2.0) + 0.52... + >>> metrics.precision() + 0.66... + >>> metrics.recall() + 0.66... + >>> metrics.weightedFalsePositiveRate + 0.19... + >>> metrics.weightedPrecision + 0.68... + >>> metrics.weightedRecall + 0.66... + >>> metrics.weightedFMeasure() + 0.66... + >>> metrics.weightedFMeasure(2.0) + 0.65... + """ + + def __init__(self, predictionAndLabels): + """ + :param predictionAndLabels an RDD of (prediction, label) pairs. + """ + sc = predictionAndLabels.ctx + sql_ctx = SQLContext(sc) + df = sql_ctx.createDataFrame(predictionAndLabels, schema=StructType([ + StructField("prediction", DoubleType(), nullable=False), + StructField("label", DoubleType(), nullable=False)])) + java_class = sc._jvm.org.apache.spark.mllib.evaluation.MulticlassMetrics + java_model = java_class(df._jdf) + super(MulticlassMetrics, self).__init__(java_model) + + def truePositiveRate(self, label): + """ + Returns true positive rate for a given label (category). + """ + return self.call("truePositiveRate", label) + + def falsePositiveRate(self, label): + """ + Returns false positive rate for a given label (category). + """ + return self.call("falsePositiveRate", label) + + def precision(self, label=None): + """ + Returns precision or precision for a given label (category) if specified. + """ + if label is None: + return self.call("precision") + else: + return self.call("precision", float(label)) + + def recall(self, label=None): + """ + Returns recall or recall for a given label (category) if specified. + """ + if label is None: + return self.call("recall") + else: + return self.call("recall", float(label)) + + def fMeasure(self, label=None, beta=None): + """ + Returns f-measure or f-measure for a given label (category) if specified. + """ + if beta is None: + if label is None: + return self.call("fMeasure") + else: + return self.call("fMeasure", label) + else: + if label is None: + raise Exception("If the beta parameter is specified, label can not be none") + else: + return self.call("fMeasure", label, beta) + + @property + def weightedTruePositiveRate(self): + """ + Returns weighted true positive rate. + (equals to precision, recall and f-measure) + """ + return self.call("weightedTruePositiveRate") + + @property + def weightedFalsePositiveRate(self): + """ + Returns weighted false positive rate. + """ + return self.call("weightedFalsePositiveRate") + + @property + def weightedRecall(self): + """ + Returns weighted averaged recall. 
+ (equals to precision, recall and f-measure) + """ + return self.call("weightedRecall") + + @property + def weightedPrecision(self): + """ + Returns weighted averaged precision. + """ + return self.call("weightedPrecision") + + def weightedFMeasure(self, beta=None): + """ + Returns weighted averaged f-measure. + """ + if beta is None: + return self.call("weightedFMeasure") + else: + return self.call("weightedFMeasure", beta) + + +class RankingMetrics(JavaModelWrapper): + """ + Evaluator for ranking algorithms. + + >>> predictionAndLabels = sc.parallelize([ + ... ([1, 6, 2, 7, 8, 3, 9, 10, 4, 5], [1, 2, 3, 4, 5]), + ... ([4, 1, 5, 6, 2, 7, 3, 8, 9, 10], [1, 2, 3]), + ... ([1, 2, 3, 4, 5], [])]) + >>> metrics = RankingMetrics(predictionAndLabels) + >>> metrics.precisionAt(1) + 0.33... + >>> metrics.precisionAt(5) + 0.26... + >>> metrics.precisionAt(15) + 0.17... + >>> metrics.meanAveragePrecision + 0.35... + >>> metrics.ndcgAt(3) + 0.33... + >>> metrics.ndcgAt(10) + 0.48... + + """ + + def __init__(self, predictionAndLabels): + """ + :param predictionAndLabels: an RDD of (predicted ranking, ground truth set) pairs. + """ + sc = predictionAndLabels.ctx + sql_ctx = SQLContext(sc) + df = sql_ctx.createDataFrame(predictionAndLabels, + schema=sql_ctx._inferSchema(predictionAndLabels)) + java_model = callMLlibFunc("newRankingMetrics", df._jdf) + super(RankingMetrics, self).__init__(java_model) + + def precisionAt(self, k): + """ + Compute the average precision of all the queries, truncated at ranking position k. + + If for a query, the ranking algorithm returns n (n < k) results, the precision value + will be computed as #(relevant items retrieved) / k. This formula also applies when + the size of the ground truth set is less than k. + + If a query has an empty ground truth set, zero will be used as precision together + with a log warning. + """ + return self.call("precisionAt", int(k)) + + @property + def meanAveragePrecision(self): + """ + Returns the mean average precision (MAP) of all the queries. + If a query has an empty ground truth set, the average precision will be zero and + a log warining is generated. + """ + return self.call("meanAveragePrecision") + + def ndcgAt(self, k): + """ + Compute the average NDCG value of all the queries, truncated at ranking position k. + The discounted cumulative gain at position k is computed as: + sum,,i=1,,^k^ (2^{relevance of ''i''th item}^ - 1) / log(i + 1), + and the NDCG is obtained by dividing the DCG value on the ground truth set. + In the current implementation, the relevance value is binary. + + If a query has an empty ground truth set, zero will be used as ndcg together with + a log warning. 
+ """ + return self.call("ndcgAt", int(k)) + + def _test(): import doctest from pyspark import SparkContext diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d254deb527d10..545c5ad20cb96 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2239,8 +2239,6 @@ def countApproxDistinct(self, relativeSD=0.05): """ if relativeSD < 0.000017: raise ValueError("relativeSD should be greater than 0.000017") - if relativeSD > 0.37: - raise ValueError("relativeSD should be smaller than 0.37") # the hash space in Java is 2^32 hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF) return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ea63a396da5b8..09de4d159fdcf 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -644,7 +644,6 @@ def test_count_approx_distinct(self): self.assertTrue(18 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 22) self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001)) - self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5)) def test_histogram(self): # empty diff --git a/sql/core/pom.xml b/sql/core/pom.xml index e3a6b1fe72435..7d274a73e079f 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -64,7 +64,7 @@ com.fasterxml.jackson.core jackson-databind - 2.3.0 + ${fasterxml.jackson.version} org.jodd diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 1a5083dbe0f61..a03ade3881f59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -109,7 +109,7 @@ private[sql] object JDBCRDD extends Logging { val fields = new Array[StructField](ncols) var i = 0 while (i < ncols) { - val columnName = rsmd.getColumnName(i + 1) + val columnName = rsmd.getColumnLabel(i + 1) val dataType = rsmd.getColumnType(i + 1) val typeName = rsmd.getColumnTypeName(i + 1) val fieldSize = rsmd.getPrecision(i + 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 021affafe36a6..2abfe7f167f77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -204,6 +204,22 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { assert(ids(2) === 3) } + test("Register JDBC query with renamed fields") { + // Regression test for bug SPARK-7345 + sql( + s""" + |CREATE TEMPORARY TABLE renamed + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url', dbtable '(select NAME as NAME1, NAME as NAME2 from TEST.PEOPLE)', + |user 'testUser', password 'testPass') + """.stripMargin.replaceAll("\n", " ")) + + val df = sql("SELECT * FROM renamed") + assert(df.schema.fields.size == 2) + assert(df.schema.fields(0).name == "NAME1") + assert(df.schema.fields(1).name == "NAME2") + } + test("Basic API") { assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE").collect().size === 3) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 20a23b3bd6aa9..54f2f3cdec298 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ 
-26,7 +26,7 @@ import org.apache.spark.util.Utils class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var path: File = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala index ca25751b9583d..6664e8d64c13a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala @@ -64,7 +64,7 @@ case class SimpleDDLScan(from: Int, to: Int, table: String)(@transient val sqlCo } class DDLTestSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ before { sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala index 9d3090c19b4e8..24ed665c67d2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala @@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter abstract class DataSourceTest extends QueryTest with BeforeAndAfter { // We want to test some edge cases. - implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) + implicit val caseInsensitiveContext = new SQLContext(TestSQLContext.sparkContext) - caseInsensisitiveContext.setConf(SQLConf.CASE_SENSITIVE, "false") + caseInsensitiveContext.setConf(SQLConf.CASE_SENSITIVE, "false") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index cb5e5147ff189..cce747e7dbf64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -97,7 +97,7 @@ object FiltersPushed { class FilteredScanSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ before { sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 50629ea4dc066..d1d427e1790bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils class InsertSuite extends DataSourceTest with BeforeAndAfterAll { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var path: File = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index 6a1ddf2f8e98b..c2bc52e2120c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -52,7 +52,7 @@ case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLCo } class PrunedScanSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ before { sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala index cb287ba85c1f8..6567d1acd7644 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.util.Utils class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var originalDefaultSource: String = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 3b47b8adf313b..77af04a491742 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -88,7 +88,7 @@ case class AllDataTypesScan( } class TableScanSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var tableWithSchemaExpected = (1 to 10).map { i => Row( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 0be5a92c2546c..3458b04bfba0f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -147,7 +147,7 @@ object HiveThriftServer2 extends Logging { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { server.stop() } - + var onlineSessionNum: Int = 0 val sessionList = new mutable.LinkedHashMap[String, SessionInfo] val executionList = new mutable.LinkedHashMap[String, ExecutionInfo] val retainedStatements = @@ -170,11 +170,13 @@ object HiveThriftServer2 extends Logging { def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = { val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName) sessionList.put(sessionId, info) + onlineSessionNum += 1 trimSessionIfNecessary() } def onSessionClosed(sessionId: String): Unit = { sessionList(sessionId).finishTimestamp = System.currentTimeMillis + onlineSessionNum -= 1 } def onStatementStart( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index 71b16b6bebffb..6a2be4a58e5cb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -29,7 +29,7 @@ import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui._ -/** Page for Spark Web UI that shows statistics of a streaming job */ +/** Page for Spark Web UI that shows statistics of a thrift server */ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging { private val listener = parent.listener @@ -42,7 +42,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" generateBasicStats() ++
<br/> ++
<h4>
- {listener.sessionList.size} session(s) are online, + {listener.onlineSessionNum} session(s) are online, running {listener.totalRunning} SQL statement(s)
</h4>
++ generateSessionStatsTable() ++ @@ -50,12 +50,12 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000)) } - /** Generate basic stats of the streaming program */ + /** Generate basic stats of the thrift server program */ private def generateBasicStats(): Seq[Node] = { val timeSinceStart = System.currentTimeMillis() - startTime.getTime
- <strong>Started at: </strong> {startTime.toString} + <strong>Started at: </strong> {formatDate(startTime)}
  • Time since start: {formatDurationVerbose(timeSinceStart)} @@ -148,7 +148,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" {session.userName} {session.ip} - {session.sessionId} , + {session.sessionId} {formatDate(session.startTimestamp)} {if(session.finishTimestamp > 0) formatDate(session.finishTimestamp)} {formatDurationOption(Some(session.totalTime))} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index bbdb4e8af036c..5abe1367752d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming import java.io.InputStream -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.collection.Map import scala.collection.mutable.Queue @@ -28,8 +28,9 @@ import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} -import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} + import org.apache.spark._ import org.apache.spark.annotation.Experimental import org.apache.spark.input.FixedLengthBinaryInputFormat @@ -37,8 +38,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver} -import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} +import org.apache.spark.util.CallSite /** * Main entry point for Spark Streaming functionality. It provides methods used to create @@ -202,6 +204,8 @@ class StreamingContext private[streaming] ( import StreamingContextState._ private[streaming] var state = Initialized + private val startSite = new AtomicReference[CallSite](null) + /** * Return the associated Spark context */ @@ -518,6 +522,7 @@ class StreamingContext private[streaming] ( * @throws SparkException if the context has already been started or stopped. 
*/ def start(): Unit = synchronized { + import StreamingContext._ if (state == Started) { throw new SparkException("StreamingContext has already been started") } @@ -525,10 +530,15 @@ class StreamingContext private[streaming] ( throw new SparkException("StreamingContext has already been stopped") } validate() - sparkContext.setCallSite(DStream.getCreationSite()) - scheduler.start() - uiTab.foreach(_.attach()) - state = Started + startSite.set(DStream.getCreationSite()) + sparkContext.setCallSite(startSite.get) + ACTIVATION_LOCK.synchronized { + assertNoOtherContextIsActive() + scheduler.start() + uiTab.foreach(_.attach()) + state = Started + setActiveContext(this) + } } /** @@ -603,6 +613,7 @@ class StreamingContext private[streaming] ( uiTab.foreach(_.detach()) // The state should always be Stopped after calling `stop()`, even if we haven't started yet: state = Stopped + StreamingContext.setActiveContext(null) } } @@ -612,8 +623,29 @@ class StreamingContext private[streaming] ( */ object StreamingContext extends Logging { + /** + * Lock that guards access to global variables that track active StreamingContext. + */ + private val ACTIVATION_LOCK = new Object() - private[streaming] val DEFAULT_CLEANER_TTL = 3600 + private val activeContext = new AtomicReference[StreamingContext](null) + + private def assertNoOtherContextIsActive(): Unit = { + ACTIVATION_LOCK.synchronized { + if (activeContext.get() != null) { + throw new SparkException( + "Only one StreamingContext may be started in this JVM. " + + "Currently running StreamingContext was started at" + + activeContext.get.startSite.get.longForm) + } + } + } + + private def setActiveContext(ssc: StreamingContext): Unit = { + ACTIVATION_LOCK.synchronized { + activeContext.set(ssc) + } + } @deprecated("Replaced by implicit functions in the DStream companion object. 
This is " + "kept here only for backward compatibility.", "1.3.0") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala index 42c49678d24f0..92cfd7d40338c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -63,6 +63,11 @@ case class Time(private val millis: Long) { new Time((this.millis / t) * t) } + def floor(that: Duration, zeroTime: Time): Time = { + val t = that.milliseconds + new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds) + } + def isMultipleOf(that: Duration): Boolean = (this.millis % that.milliseconds == 0) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index f1f8a70655996..7092a3d3f0b86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -763,16 +763,22 @@ abstract class DStream[T: ClassTag] ( if (!isInitialized) { throw new SparkException(this + " has not been initialized") } - if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + + val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) { + toTime + } else { + logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") + toTime.floor(slideDuration, zeroTime) } - if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + + val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) { + fromTime + } else { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") + fromTime.floor(slideDuration, zeroTime) } - val alignedToTime = toTime.floor(slideDuration) - val alignedFromTime = fromTime.floor(slideDuration) logInfo("Slicing from " + fromTime + " to " + toTime + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a589deb1fa579..11c7fd835bfcd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -480,6 +480,24 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } + test("multiple streaming contexts") { + sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName)) + ssc = new StreamingContext(sc, Seconds(1)) + val input = addInputStream(ssc) + input.foreachRDD { rdd => rdd.count } + ssc.start() + + // Creating another streaming context should not create errors + val anotherSsc = new StreamingContext(sc, Seconds(10)) + val anotherInput = addInputStream(anotherSsc) + anotherInput.foreachRDD { rdd => rdd.count } + + val exception = intercept[SparkException] { + anotherSsc.start() + } + assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception") + } + test("DStream and generated RDD creation sites") { testPackage.test() } diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala index 5579ac364346c..e6a01656f479d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala @@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase { assert(new Time(1200).floor(new Duration(200)) == new Time(1200)) assert(new Time(199).floor(new Duration(200)) == new Time(0)) assert(new Time(1).floor(new Duration(1)) == new Time(1)) + assert(new Time(1350).floor(new Duration(200), new Time(50)) == new Time(1250)) + assert(new Time(1350).floor(new Duration(200), new Time(150)) == new Time(1350)) + assert(new Time(1350).floor(new Duration(200), new Time(200)) == new Time(1200)) } test("isMultipleOf") { diff --git a/tox.ini b/tox.ini index b568029a204cc..76e3f42cde62d 100644 --- a/tox.ini +++ b/tox.ini @@ -15,4 +15,4 @@ [pep8] max-line-length=100 -exclude=cloudpickle.py,heapq3.py +exclude=cloudpickle.py,heapq3.py,shared.py
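Finally, a small standalone sketch (values taken from the new TimeSuite cases) of what the added `Time.floor(duration, zeroTime)` overload computes; `DStream.slice` now uses it to align `fromTime`/`toTime` values that are not exact multiples of the slide duration:

{% highlight scala %}
import org.apache.spark.streaming.{Duration, Time}

// Align a timestamp down to the nearest slide boundary, measured from zeroTime.
val slide = new Duration(200)

// (1350 - 50) / 200 = 6 (integer division); 6 * 200 + 50 = 1250
assert(new Time(1350).floor(slide, new Time(50)) == new Time(1250))

// When the offset already sits on a boundary, the time is unchanged.
assert(new Time(1350).floor(slide, new Time(150)) == new Time(1350))
{% endhighlight %}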