Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check

jkbradley committed Aug 7, 2014
2 parents ee918e9 + 32096c2 · commit 064985b
Showing 17 changed files with 393 additions and 247 deletions.
2 changes: 1 addition & 1 deletion bin/spark-sql
@@ -29,7 +29,7 @@ CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
 function usage {
-  echo "Usage: ./sbin/spark-sql [options] [cli option]"
+  echo "Usage: ./bin/spark-sql [options] [cli option]"
   pattern="usage"
   pattern+="\|Spark assembly has been built with Hive"
   pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1004,7 +1004,7 @@ abstract class RDD[T: ClassTag](
       },
       (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
         h1.addAll(h2)
-        h2
+        h1
       }).cardinality()
   }

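This one-line change fixes a real bug: the combine function merged h2 into h1 but then returned h2, so the merged sketch was discarded and countApproxDistinct undercounted whenever more than one partition contributed. A minimal sketch of the corrected semantics against the stream-lib HyperLogLogPlus API that countApproxDistinct builds on (values are illustrative):

    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

    val h1 = new HyperLogLogPlus(4, 0) // precision p = 4, sparse precision sp = 0
    val h2 = new HyperLogLogPlus(4, 0)
    (1 to 100).foreach(i => h1.offer(i))
    (50 to 150).foreach(i => h2.offer(i))
    h1.addAll(h2)             // mutates h1 in place; h2 is left untouched
    println(h1.cardinality()) // ~150 distinct values; returning h2 would report only ~101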
10 changes: 5 additions & 5 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -81,11 +81,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {

     def error(est: Long, size: Long) = math.abs(est - size) / size.toDouble
 
-    val size = 100
-    val uniformDistro = for (i <- 1 to 100000) yield i % size
-    val simpleRdd = sc.makeRDD(uniformDistro)
-    assert(error(simpleRdd.countApproxDistinct(4, 0), size) < 0.4)
-    assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.1)
+    val size = 1000
+    val uniformDistro = for (i <- 1 to 5000) yield i % size
+    val simpleRdd = sc.makeRDD(uniformDistro, 10)
+    assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2)
+    assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1)
   }
 
   test("SparkContext.union") {
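The updated test also builds the RDD with 10 partitions, which matters for the fix above: each partition accumulates its own HyperLogLog sketch, so countApproxDistinct must run the combine function to merge them -- exactly the path that previously dropped data. A condensed sketch of what the assertions check (assuming a live SparkContext sc, as in the suite):

    // 10 partitions => 10 per-partition sketches that must be merged correctly.
    val rdd = sc.makeRDD(1 to 5000, 10).map(_ % 1000) // ~1000 distinct values
    val approx = rdd.countApproxDistinct(12, 0)       // p = 12, sp = 0
    assert(math.abs(approx - 1000) / 1000.0 < 0.1)    // relative error below 10%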
9 changes: 8 additions & 1 deletion dev/create-release/create-release.sh
@@ -35,6 +35,12 @@ RELEASE_VERSION=${RELEASE_VERSION:-1.0.0}
 RC_NAME=${RC_NAME:-rc2}
 USER_NAME=${USER_NAME:-pwendell}
 
+if [ -z "$JAVA_HOME" ]; then
+  echo "Error: JAVA_HOME is not set, cannot proceed."
+  exit -1
+fi
+JAVA_7_HOME=${JAVA_7_HOME:-$JAVA_HOME}
+
 set -e
 
 GIT_TAG=v$RELEASE_VERSION-$RC_NAME
@@ -130,7 +136,8 @@ scp spark-* \
 cd spark
 sbt/sbt clean
 cd docs
-PRODUCTION=1 jekyll build
+# Compile docs with Java 7 to use nicer format
+JAVA_HOME=$JAVA_7_HOME PRODUCTION=1 jekyll build
 echo "Copying release documentation"
 rc_docs_folder=${rc_folder}-docs
 ssh $USER_NAME@people.apache.org \
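Two standard shell idioms are at work in these additions: fail fast when a required variable is missing, and default an optional one with ${VAR:-fallback}, so JAVA_7_HOME falls back to JAVA_HOME and the Jekyll docs build can pin a specific JDK. Purely for illustration, a hedged Scala rendering of the same guard:

    // Hypothetical Scala equivalent of the shell logic above.
    val javaHome = sys.env.getOrElse("JAVA_HOME",
      sys.error("Error: JAVA_HOME is not set, cannot proceed."))
    val java7Home = sys.env.getOrElse("JAVA_7_HOME", javaHome)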
@@ -25,16 +25,14 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.optimization._
+import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.random.{RandomRDDGenerators => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.tree.configuration.Algo._
-import org.apache.spark.mllib.tree.configuration.Strategy
+import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
 import org.apache.spark.mllib.tree.DecisionTree
-import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance}
+import org.apache.spark.mllib.tree.impurity._
 import org.apache.spark.mllib.tree.model.DecisionTreeModel
 import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.mllib.stat.correlation.CorrelationNames
@@ -523,17 +521,8 @@ class PythonMLLibAPI extends Serializable {

     val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
 
-    val algo: Algo = algoStr match {
-      case "classification" => Classification
-      case "regression" => Regression
-      case _ => throw new IllegalArgumentException(s"Bad algoStr parameter: $algoStr")
-    }
-    val impurity: Impurity = impurityStr match {
-      case "gini" => Gini
-      case "entropy" => Entropy
-      case "variance" => Variance
-      case _ => throw new IllegalArgumentException(s"Bad impurityStr parameter: $impurityStr")
-    }
+    val algo = Algo.fromString(algoStr)
+    val impurity = Impurities.fromString(impurityStr)
 
     val strategy = new Strategy(
       algo = algo,
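This rewrite moves string-to-enum parsing out of the Python bridge and into factories owned by the tree module (Algo.fromString, Impurities.fromString), so a new algorithm or impurity needs registering in only one place. A miniature of the pattern, as a hypothetical standalone object rather than the actual Spark source:

    // Toy enumeration with a fromString factory, mirroring the calls above.
    object Algo extends Enumeration {
      type Algo = Value
      val Classification, Regression = Value

      def fromString(name: String): Algo = name match {
        case "classification" => Classification
        case "regression"     => Regression
        case _ => throw new IllegalArgumentException(s"Did not recognize Algo name: $name")
      }
    }

    val algo = Algo.fromString("classification") // callers collapse to one line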
130 changes: 61 additions & 69 deletions mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -36,87 +36,25 @@ class IDF {

   // TODO: Allow different IDF formulations.
 
-  private var brzIdf: BDV[Double] = _
-
   /**
    * Computes the inverse document frequency.
    * @param dataset an RDD of term frequency vectors
    */
-  def fit(dataset: RDD[Vector]): this.type = {
-    brzIdf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
+  def fit(dataset: RDD[Vector]): IDFModel = {
+    val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator)(
       seqOp = (df, v) => df.add(v),
       combOp = (df1, df2) => df1.merge(df2)
     ).idf()
-    this
+    new IDFModel(idf)
   }
 
   /**
    * Computes the inverse document frequency.
    * @param dataset a JavaRDD of term frequency vectors
    */
-  def fit(dataset: JavaRDD[Vector]): this.type = {
+  def fit(dataset: JavaRDD[Vector]): IDFModel = {
     fit(dataset.rdd)
   }
-
-  /**
-   * Transforms term frequency (TF) vectors to TF-IDF vectors.
-   * @param dataset an RDD of term frequency vectors
-   * @return an RDD of TF-IDF vectors
-   */
-  def transform(dataset: RDD[Vector]): RDD[Vector] = {
-    if (!initialized) {
-      throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
-    }
-    val theIdf = brzIdf
-    val bcIdf = dataset.context.broadcast(theIdf)
-    dataset.mapPartitions { iter =>
-      val thisIdf = bcIdf.value
-      iter.map { v =>
-        val n = v.size
-        v match {
-          case sv: SparseVector =>
-            val nnz = sv.indices.size
-            val newValues = new Array[Double](nnz)
-            var k = 0
-            while (k < nnz) {
-              newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
-              k += 1
-            }
-            Vectors.sparse(n, sv.indices, newValues)
-          case dv: DenseVector =>
-            val newValues = new Array[Double](n)
-            var j = 0
-            while (j < n) {
-              newValues(j) = dv.values(j) * thisIdf(j)
-              j += 1
-            }
-            Vectors.dense(newValues)
-          case other =>
-            throw new UnsupportedOperationException(
-              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
-        }
-      }
-    }
-  }
-
-  /**
-   * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
-   * @param dataset a JavaRDD of term frequency vectors
-   * @return a JavaRDD of TF-IDF vectors
-   */
-  def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
-    transform(dataset.rdd).toJavaRDD()
-  }
-
-  /** Returns the IDF vector. */
-  def idf(): Vector = {
-    if (!initialized) {
-      throw new IllegalStateException("Haven't learned IDF yet. Call fit first.")
-    }
-    Vectors.fromBreeze(brzIdf)
-  }
-
-  private def initialized: Boolean = brzIdf != null
 }
 
 private object IDF {
@@ -177,18 +115,72 @@ private object IDF {
     private def isEmpty: Boolean = m == 0L
 
     /** Returns the current IDF vector. */
-    def idf(): BDV[Double] = {
+    def idf(): Vector = {
       if (isEmpty) {
         throw new IllegalStateException("Haven't seen any document yet.")
       }
       val n = df.length
-      val inv = BDV.zeros[Double](n)
+      val inv = new Array[Double](n)
       var j = 0
       while (j < n) {
         inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
         j += 1
       }
-      inv
+      Vectors.dense(inv)
     }
   }
 }

+/**
+ * :: Experimental ::
+ * Represents an IDF model that can transform term frequency vectors.
+ */
+@Experimental
+class IDFModel private[mllib] (val idf: Vector) extends Serializable {
+
+  /**
+   * Transforms term frequency (TF) vectors to TF-IDF vectors.
+   * @param dataset an RDD of term frequency vectors
+   * @return an RDD of TF-IDF vectors
+   */
+  def transform(dataset: RDD[Vector]): RDD[Vector] = {
+    val bcIdf = dataset.context.broadcast(idf)
+    dataset.mapPartitions { iter =>
+      val thisIdf = bcIdf.value
+      iter.map { v =>
+        val n = v.size
+        v match {
+          case sv: SparseVector =>
+            val nnz = sv.indices.size
+            val newValues = new Array[Double](nnz)
+            var k = 0
+            while (k < nnz) {
+              newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
+              k += 1
+            }
+            Vectors.sparse(n, sv.indices, newValues)
+          case dv: DenseVector =>
+            val newValues = new Array[Double](n)
+            var j = 0
+            while (j < n) {
+              newValues(j) = dv.values(j) * thisIdf(j)
+              j += 1
+            }
+            Vectors.dense(newValues)
+          case other =>
+            throw new UnsupportedOperationException(
+              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+        }
+      }
+    }
+  }
+
+  /**
+   * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
+   * @param dataset a JavaRDD of term frequency vectors
+   * @return a JavaRDD of TF-IDF vectors
+   */
+  def transform(dataset: JavaRDD[Vector]): JavaRDD[Vector] = {
+    transform(dataset.rdd).toJavaRDD()
+  }
+}
@@ -17,8 +17,9 @@

 package org.apache.spark.mllib.feature
 
-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
 
+import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.rdd.RDDFunctions._
@@ -35,37 +36,55 @@ import org.apache.spark.rdd.RDD
  * @param withStd True by default. Scales the data to unit standard deviation.
  */
 @Experimental
-class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
+class StandardScaler(withMean: Boolean, withStd: Boolean) extends Logging {
 
   def this() = this(false, true)
 
-  require(withMean || withStd, s"withMean and withStd both equal to false. Doing nothing.")
-
-  private var mean: BV[Double] = _
-  private var factor: BV[Double] = _
+  if (!(withMean || withStd)) {
+    logWarning("Both withMean and withStd are false. The model does nothing.")
+  }
 
   /**
    * Computes the mean and variance and stores as a model to be used for later scaling.
    *
    * @param data The data used to compute the mean and variance to build the transformation model.
-   * @return This StandardScalar object.
+   * @return a StandardScalarModel
    */
-  def fit(data: RDD[Vector]): this.type = {
+  def fit(data: RDD[Vector]): StandardScalerModel = {
+    // TODO: skip computation if both withMean and withStd are false
     val summary = data.treeAggregate(new MultivariateOnlineSummarizer)(
       (aggregator, data) => aggregator.add(data),
       (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
-
-    mean = summary.mean.toBreeze
-    factor = summary.variance.toBreeze
-    require(mean.length == factor.length)
+    new StandardScalerModel(withMean, withStd, summary.mean, summary.variance)
+  }
+}
 
+/**
+ * :: Experimental ::
+ * Represents a StandardScaler model that can transform vectors.
+ *
+ * @param withMean whether to center the data before scaling
+ * @param withStd whether to scale the data to have unit standard deviation
+ * @param mean column mean values
+ * @param variance column variance values
+ */
+@Experimental
+class StandardScalerModel private[mllib] (
+    val withMean: Boolean,
+    val withStd: Boolean,
+    val mean: Vector,
+    val variance: Vector) extends VectorTransformer {
+
+  require(mean.size == variance.size)
+
+  private lazy val factor: BDV[Double] = {
+    val f = BDV.zeros[Double](variance.size)
     var i = 0
-    while (i < factor.length) {
-      factor(i) = if (factor(i) != 0.0) 1.0 / math.sqrt(factor(i)) else 0.0
+    while (i < f.size) {
+      f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
       i += 1
     }
-
-    this
+    f
   }
 
   /**
@@ -76,13 +95,7 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
    * for the column with zero variance.
    */
   override def transform(vector: Vector): Vector = {
-    if (mean == null || factor == null) {
-      throw new IllegalStateException(
-        "Haven't learned column summary statistics yet. Call fit first.")
-    }
-
-    require(vector.size == mean.length)
-
+    require(mean.size == vector.size)
     if (withMean) {
       vector.toBreeze match {
         case dv: BDV[Double] =>
@@ -115,5 +128,4 @@ class StandardScaler(withMean: Boolean, withStd: Boolean) extends VectorTransformer {
       vector
     }
   }
-
 }
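This is the same estimator/model split as in IDF: StandardScaler.fit computes per-column mean and variance in a single pass (via MultivariateOnlineSummarizer) and returns a StandardScalerModel, whose lazily built factor vector holds 1 / sqrt(variance) per column, with zero-variance columns mapped to 0 rather than dividing by zero. A minimal usage sketch under the new API (the features RDD is hypothetical):

    import org.apache.spark.mllib.feature.StandardScaler
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    def standardize(features: RDD[Vector]): RDD[Vector] = {
      // withMean = false keeps sparse input sparse; withStd = true rescales columns.
      val model = new StandardScaler(withMean = false, withStd = true).fit(features)
      model.transform(features) // multiplies column i by factor(i) = 1 / sqrt(variance(i))
    }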
(The remaining 10 changed files in this commit are not rendered above.)
