Merge remote-tracking branch 'upstream/master'
JihongMA committed May 11, 2015
2 parents d5bf3f5 + 1b46556 commit 9ea0832
Showing 46 changed files with 644 additions and 102 deletions.
16 changes: 16 additions & 0 deletions LICENSE
@@ -836,6 +836,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

========================================================================
For vis.js (core/src/main/resources/org/apache/spark/ui/static/vis.min.js):
========================================================================
Copyright (C) 2010-2015 Almende B.V.

Vis.js is dual licensed under both

* The Apache 2.0 License
http://www.apache.org/licenses/LICENSE-2.0

and

* The MIT License
http://opensource.org/licenses/MIT

Vis.js may be distributed under either license.

========================================================================
BSD-style licenses
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
@@ -67,8 +67,8 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
})

setMethod("show", "RDD",
function(.Object) {
cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
function(object) {
cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep=""))
})

setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
@@ -19,7 +19,8 @@ div#application-timeline, div#job-timeline {
margin-bottom: 30px;
}

#application-timeline div.legend-area {
#application-timeline div.legend-area,
#job-timeline div.legend-area {
margin-top: 5px;
}

31 changes: 12 additions & 19 deletions core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
@@ -39,23 +39,24 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {

function setupJobEventAction() {
$(".item.range.job.application-timeline-object").each(function() {
var getJobId = function(baseElem) {
var getSelectorForJobEntry = function(baseElem) {
var jobIdText = $($(baseElem).find(".application-timeline-content")[0]).text();
var jobId = jobIdText.match("\\(Job (\\d+)\\)")[1];
return jobId;
return "#job-" + jobId;
};

$(this).click(function() {
window.location.href = "job/?id=" + getJobId(this);
var jobPagePath = $(getSelectorForJobEntry(this)).find("a").attr("href");
window.location.href = jobPagePath;
});

$(this).hover(
function() {
$("#job-" + getJobId(this)).addClass("corresponding-item-hover");
$(getSelectorForJobEntry(this)).addClass("corresponding-item-hover");
$($(this).find("div.application-timeline-content")[0]).tooltip("show");
},
function() {
$("#job-" + getJobId(this)).removeClass("corresponding-item-hover");
$(getSelectorForJobEntry(this)).removeClass("corresponding-item-hover");
$($(this).find("div.application-timeline-content")[0]).tooltip("hide");
}
);
@@ -97,32 +98,24 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {

function setupStageEventAction() {
$(".item.range.stage.job-timeline-object").each(function() {
var getStageIdAndAttempt = function(baseElem) {
var getSelectorForStageEntry = function(baseElem) {
var stageIdText = $($(baseElem).find(".job-timeline-content")[0]).text();
var stageIdAndAttempt = stageIdText.match("\\(Stage (\\d+\\.\\d+)\\)")[1].split(".");
return stageIdAndAttempt;
return "#stage-" + stageIdAndAttempt[0] + "-" + stageIdAndAttempt[1];
};

$(this).click(function() {
var idAndAttempt = getStageIdAndAttempt(this);
var id = idAndAttempt[0];
var attempt = idAndAttempt[1];
window.location.href = "../../stages/stage/?id=" + id + "&attempt=" + attempt;
var stagePagePath = $(getSelectorForStageEntry(this)).find("a").attr("href");
window.location.href = stagePagePath;
});

$(this).hover(
function() {
var idAndAttempt = getStageIdAndAttempt(this);
var id = idAndAttempt[0];
var attempt = idAndAttempt[1];
$("#stage-" + id + "-" + attempt).addClass("corresponding-item-hover");
$(getSelectorForStageEntry(this)).addClass("corresponding-item-hover");
$($(this).find("div.job-timeline-content")[0]).tooltip("show");
},
function() {
var idAndAttempt = getStageIdAndAttempt(this);
var id = idAndAttempt[0];
var attempt = idAndAttempt[1];
$("#stage-" + id + "-" + attempt).removeClass("corresponding-item-hover");
$(getSelectorForStageEntry(this)).removeClass("corresponding-item-hover");
$($(this).find("div.job-timeline-content")[0]).tooltip("hide");
}
);
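(Design note: both handlers now derive the target page from the `href` already present in the corresponding table row's anchor, instead of rebuilding URLs such as `"job/?id=" + jobId` or `"../../stages/stage/?id=..."` by hand, so the timeline cannot drift out of sync with the tables if the UI's URL scheme changes.)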
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1161,8 +1161,8 @@ abstract class RDD[T: ClassTag](
*/
@Experimental
def countApproxDistinct(p: Int, sp: Int): Long = withScope {
require(p >= 4, s"p ($p) must be at least 4")
require(sp <= 32, s"sp ($sp) cannot be greater than 32")
require(p >= 4, s"p ($p) must be >= 4")
require(sp <= 32, s"sp ($sp) must be <= 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val zeroCounter = new HyperLogLogPlus(p, sp)
aggregate(zeroCounter)(
@@ -1187,8 +1187,9 @@
* It must be greater than 0.000017.
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
countApproxDistinct(p, 0)
countApproxDistinct(if (p < 4) 4 else p, 0)
}

/**
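A worked example of the `relativeSD` change above (a sketch, not part of the commit): `HyperLogLogPlus` requires precision `p >= 4`, but the derived `p` falls below that once `relativeSD` grows large, which is exactly what the new clamp guards against. The new `countApproxDistinct(0.5)` assertion in RDDSuite below exercises this path.

def precisionFor(relativeSD: Double): Int = {
  // same derivation as countApproxDistinct above
  val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
  if (p < 4) 4 else p // the clamp added by this commit
}
precisionFor(0.05) // raw p = 9, already valid
precisionFor(0.5)  // raw p = 3, clamped to 4; previously tripped require(p >= 4)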
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -89,6 +89,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val simpleRdd = sc.makeRDD(uniformDistro, 10)
assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2)
assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1)
assert(error(simpleRdd.countApproxDistinct(0.02), size) < 0.1)
assert(error(simpleRdd.countApproxDistinct(0.5), size) < 0.22)
}

test("SparkContext.union") {
19 changes: 18 additions & 1 deletion docs/mllib-dimensionality-reduction.md
@@ -137,7 +137,7 @@ statistical method to find a rotation such that the first coordinate has the largest variance
possible, and each succeeding coordinate in turn has the largest variance possible. The columns of
the rotation matrix are called principal components. PCA is used widely in dimensionality reduction.

MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format.
MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format, and for arbitrary `Vector`s via the `PCA` feature transformer.

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -157,6 +157,23 @@ val pc: Matrix = mat.computePrincipalComponents(10) // Principal components are stored in a local dense matrix.
// Project the rows to the linear space spanned by the top 10 principal components.
val projected: RowMatrix = mat.multiply(pc)
{% endhighlight %}

The following code demonstrates how to compute principal components on source vectors
and use them to project the vectors into a low-dimensional space while keeping associated labels:

{% highlight scala %}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.PCA

val data: RDD[LabeledPoint] = ...

// Compute the top 10 principal components.
val pca = new PCA(10).fit(data.map(_.features))

// Project vectors to the linear space spanned by the top 10 principal components, keeping the label
val projected = data.map(p => p.copy(features = pca.transform(p.features)))
{% endhighlight %}
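(`LabeledPoint` is a case class, so `p.copy(features = ...)` swaps in the projected features while carrying the label through unchanged.)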

</div>

<div data-lang="java" markdown="1">
55 changes: 54 additions & 1 deletion docs/mllib-feature-extraction.md
@@ -507,7 +507,6 @@ v_N

The example below demonstrates how to load a simple vectors file, extract a set of vectors, and then transform those vectors using a transforming vector value.


<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
@@ -531,3 +530,57 @@ val transformedData2 = parsedData.map(x => transformer.transform(x))
</div>


## PCA

A feature transformer that projects vectors into a low-dimensional space using PCA.
Details can be found at [dimensionality reduction](mllib-dimensionality-reduction.html).

### Example

The following code demonstrates how to compute principal components on a `Vector`
and use them to project the vectors into a low-dimensional space while keeping the associated labels,
for use in a [Linear Regression](mllib-linear-methods.html) computation:

<div class="codetabs">
<div data-lang="scala">
{% highlight scala %}
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.feature.PCA

val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val pca = new PCA(training.first().features.size/2).fit(data.map(_.features))
val training_pca = training.map(p => p.copy(features = pca.transform(p.features)))
val test_pca = test.map(p => p.copy(features = pca.transform(p.features)))

val numIterations = 100
val model = LinearRegressionWithSGD.train(training, numIterations)
val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations)

val valuesAndPreds = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}

val valuesAndPreds_pca = test_pca.map { point =>
val score = model_pca.predict(point.features)
(score, point.label)
}

val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
val MSE_pca = valuesAndPreds_pca.map{case(v, p) => math.pow((v - p), 2)}.mean()

println("Mean Squared Error = " + MSE)
println("PCA Mean Squared Error = " + MSE_pca)
{% endhighlight %}
</div>
</div>
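(A note on reading the output: the projection above halves the feature dimension, so `MSE_pca` should stay close to `MSE` whenever the discarded components carry little of the data's variance; comparing the two printed errors makes that trade-off visible.)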
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -1915,7 +1915,7 @@ In that case, consider
[reducing](#reducing-the-processing-time-of-each-batch) the batch processing time.

The progress of a Spark Streaming program can also be monitored using the
[StreamingListener](api/scala/index.html#org.apache.spark.scheduler.StreamingListener) interface,
[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) interface,
which allows you to get receiver status and processing times. Note that this is a developer API
and it is likely to be improved upon (i.e., more information reported) in the future.
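For illustration, a minimal listener sketch against the corrected package path (assuming an existing `StreamingContext` named `ssc`; the listener name is made up):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchTimeListener extends StreamingListener {
  // report how long each completed batch took to process
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val delayMs = batchCompleted.batchInfo.processingDelay.getOrElse(-1L)
    println(s"Batch done, processing took $delayMs ms")
  }
}
ssc.addStreamingListener(new BatchTimeListener)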

@@ -112,7 +112,7 @@ object DecisionTreeExample {
.text(s"input path to test dataset. If given, option fracTest is ignored." +
s" default: ${defaultParams.testInput}")
.action((x, c) => c.copy(testInput = x))
opt[String]("<dataFormat>")
opt[String]("dataFormat")
.text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
.action((x, c) => c.copy(dataFormat = x))
arg[String]("<input>")
@@ -111,7 +111,7 @@ object GBTExample {
.text(s"input path to test dataset. If given, option fracTest is ignored." +
s" default: ${defaultParams.testInput}")
.action((x, c) => c.copy(testInput = x))
opt[String]("<dataFormat>")
opt[String]("dataFormat")
.text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
.action((x, c) => c.copy(dataFormat = x))
arg[String]("<input>")
@@ -117,7 +117,7 @@ object RandomForestExample {
.text(s"input path to test dataset. If given, option fracTest is ignored." +
s" default: ${defaultParams.testInput}")
.action((x, c) => c.copy(testInput = x))
opt[String]("<dataFormat>")
opt[String]("dataFormat")
.text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
.action((x, c) => c.copy(dataFormat = x))
arg[String]("<input>")
@@ -126,7 +126,7 @@ object DecisionTreeRunner {
.text(s"input path to test dataset. If given, option fracTest is ignored." +
s" default: ${defaultParams.testInput}")
.action((x, c) => c.copy(testInput = x))
opt[String]("<dataFormat>")
opt[String]("dataFormat")
.text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
.action((x, c) => c.copy(dataFormat = x))
arg[String]("<input>")
@@ -69,7 +69,7 @@ object GradientBoostedTreesRunner {
.text(s"input path to test dataset. If given, option fracTest is ignored." +
s" default: ${defaultParams.testInput}")
.action((x, c) => c.copy(testInput = x))
opt[String]("<dataFormat>")
opt[String]("dataFormat")
.text("data format: libsvm (default), dense (deprecated in Spark v1.1)")
.action((x, c) => c.copy(dataFormat = x))
arg[String]("<input>")
@@ -26,7 +26,7 @@ import scopt.OptionParser
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.clustering.{EMLDAOptimizer, OnlineLDAOptimizer, DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

@@ -48,6 +48,7 @@ object LDAExample {
topicConcentration: Double = -1,
vocabSize: Int = 10000,
stopwordFile: String = "",
algorithm: String = "em",
checkpointDir: Option[String] = None,
checkpointInterval: Int = 10) extends AbstractParams[Params]

@@ -78,6 +79,10 @@ object LDAExample {
.text(s"filepath for a list of stopwords. Note: This must fit on a single machine." +
s" default: ${defaultParams.stopwordFile}")
.action((x, c) => c.copy(stopwordFile = x))
opt[String]("algorithm")
.text(s"inference algorithm to use. em and online are supported." +
s" default: ${defaultParams.algorithm}")
.action((x, c) => c.copy(algorithm = x))
opt[String]("checkpointDir")
.text(s"Directory for checkpointing intermediate results." +
s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
@@ -128,7 +133,17 @@ object LDAExample {

// Run LDA.
val lda = new LDA()
lda.setK(params.k)

val optimizer = params.algorithm.toLowerCase match {
case "em" => new EMLDAOptimizer
// Add (1.0 / actualCorpusSize) to miniBatchFraction to be more robust on tiny datasets.
case "online" => new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
case _ => throw new IllegalArgumentException(
s"Only em, online are supported but got ${params.algorithm}.")
}

lda.setOptimizer(optimizer)
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setDocConcentration(params.docConcentration)
.setTopicConcentration(params.topicConcentration)
@@ -137,14 +152,18 @@
sc.setCheckpointDir(params.checkpointDir.get)
}
val startTime = System.nanoTime()
val ldaModel = lda.run(corpus).asInstanceOf[DistributedLDAModel]
val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9

println(s"Finished training LDA model. Summary:")
println(s"\t Training time: $elapsed sec")
val avgLogLikelihood = ldaModel.logLikelihood / actualCorpusSize.toDouble
println(s"\t Training data average log likelihood: $avgLogLikelihood")
println()

if (ldaModel.isInstanceOf[DistributedLDAModel]) {
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
val avgLogLikelihood = distLDAModel.logLikelihood / actualCorpusSize.toDouble
println(s"\t Training data average log likelihood: $avgLogLikelihood")
println()
}
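(The `isInstanceOf` guard exists because only the EM optimizer produces a `DistributedLDAModel`, which retains the training corpus and exposes its `logLikelihood`; `OnlineLDAOptimizer` returns a `LocalLDAModel` without it. An equivalent pattern-match sketch, not part of the commit:)

ldaModel match {
  case distModel: DistributedLDAModel =>
    val avgLogLikelihood = distModel.logLikelihood / actualCorpusSize.toDouble
    println(s"\t Training data average log likelihood: $avgLogLikelihood")
  case _ => // LocalLDAModel from the online optimizer: no training log likelihood to report
}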

// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
@@ -32,6 +32,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.mllib.linalg._
@@ -50,6 +51,7 @@ import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTree
import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomForest}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

@@ -923,6 +925,14 @@ private[python] class PythonMLLibAPI extends Serializable {
RG.gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, parts, s)
}

/**
* Java stub for the constructor of Python mllib RankingMetrics
*/
def newRankingMetrics(predictionAndLabels: DataFrame): RankingMetrics[Any] = {
new RankingMetrics(predictionAndLabels.map(
r => (r.getSeq(0).toArray[Any], r.getSeq(1).toArray[Any])))
}


}
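For context, a minimal sketch of the Scala-side `RankingMetrics` API the new stub wraps (the data values are made up; assumes an existing `SparkContext` named `sc`):

import org.apache.spark.mllib.evaluation.RankingMetrics

// each pair is (predicted ranking, ground-truth relevant documents)
val predictionAndLabels = sc.parallelize(Seq(
  (Array(1, 2, 3, 4), Array(1, 3)),
  (Array(5, 6, 7), Array(6))
))
val metrics = new RankingMetrics(predictionAndLabels)
println(metrics.meanAveragePrecision) // mean average precision over all queries
println(metrics.precisionAt(2))       // precision@2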
