Skip to content

Commit

Permalink
minimize import of ml
Browse files Browse the repository at this point in the history
minimize import of ml
  • Loading branch information
zhengruifeng committed Jan 2, 2020
1 parent a5bbf16 commit a675881
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package org.apache.spark.mllib.evaluation

import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml.stat.SummaryBuilderImpl._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

Expand Down Expand Up @@ -60,16 +60,14 @@ class RegressionMetrics @Since("2.0.0") (
* Use SummarizerBuffer to calculate summary statistics of observations and errors.
*/
private lazy val summary = {
predictionAndObservations.map {
val weightedVectors = predictionAndObservations.map {
case (prediction: Double, observation: Double, weight: Double) =>
(Vectors.dense(observation, observation - prediction, prediction), weight)
case (prediction: Double, observation: Double) =>
(Vectors.dense(observation, observation - prediction, prediction), 1.0)
}.treeAggregate(createSummarizerBuffer("mean", "normL1", "normL2", "variance"))(
seqOp = { case (c, (v, w)) => c.add(v.nonZeroIterator, v.size, w) },
combOp = { case (c1, c2) => c1.merge(c2) },
depth = 2
)
}
Statistics.colStats(weightedVectors,
Seq("mean", "normL1", "normL2", "variance"))
}

private lazy val SSy = math.pow(summary.normL2(0), 2)
Expand Down
9 changes: 5 additions & 4 deletions mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
s"source vector size $numFeatures must be no less than k=$k")

val mat = if (numFeatures > 65535) {
val summary = Statistics.colStats(sources, Seq("mean"))
val meanVector = summary.mean.asBreeze
val meanCentredRdd = sources.map { rowVector =>
Vectors.fromBreeze(rowVector.asBreeze - meanVector)
val summary = Statistics.colStats(sources.map((_, 1.0)), Seq("mean"))
val mean = Vectors.fromML(summary.mean)
val meanCentredRdd = sources.map { row =>
BLAS.axpy(-1, mean, row)
row
}
new RowMatrix(meanCentredRdd)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class StandardScaler @Since("1.1.0") (withMean: Boolean, withStd: Boolean) exten
@Since("1.1.0")
def fit(data: RDD[Vector]): StandardScalerModel = {
// TODO: skip computation if both withMean and withStd are false
val summary = Statistics.colStats(data, Seq("mean", "std"))
val summary = Statistics.colStats(data.map((_, 1.0)), Seq("mean", "std"))

new StandardScalerModel(
Vectors.fromML(summary.std),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class RowMatrix @Since("1.0.0") (
val n = numCols().toInt
checkNumColumns(n)

val summary = Statistics.colStats(rows, Seq("count", "mean"))
val summary = Statistics.colStats(rows.map((_, 1.0)), Seq("count", "mean"))
val m = summary.count
require(m > 1, s"RowMatrix.computeCovariance called on matrix with only $m rows." +
" Cannot compute the covariance of a RowMatrix with <= 1 row.")
Expand Down Expand Up @@ -616,7 +616,7 @@ class RowMatrix @Since("1.0.0") (
10 * math.log(numCols()) / threshold
}

val summary = Statistics.colStats(rows, Seq("normL2"))
val summary = Statistics.colStats(rows.map((_, 1.0)), Seq("normL2"))
columnSimilaritiesDIMSUM(summary.normL2.toArray, gamma)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ object Statistics {
}

/**
* Computes required column-wise summary statistics for the input RDD[Vector].
* Computes required column-wise summary statistics for the input RDD[(Vector, Double)].
*
* @param X an RDD[Vector] for which column-wise summary statistics are to be computed.
* @param X an RDD containing vectors and weights for which column-wise summary statistics
* are to be computed.
* @return [[SummarizerBuffer]] object containing column-wise summary statistics.
*/
private[mllib] def colStats(X: RDD[Vector], requested: Seq[String]) = {
private[mllib] def colStats(X: RDD[(Vector, Double)], requested: Seq[String]) = {
X.treeAggregate(createSummarizerBuffer(requested: _*))(
seqOp = { case (c, v) => c.add(v.nonZeroIterator, v.size, 1.0) },
seqOp = { case (c, (v, w)) => c.add(v.nonZeroIterator, v.size, w) },
combOp = { case (c1, c2) => c1.merge(c2) },
depth = 2
)
Expand Down

0 comments on commit a675881

Please sign in to comment.