move treeReduce and treeAggregate to mllib
mengxr committed Jul 1, 2014
1 parent 8a2a59c commit d58a087
Showing 7 changed files with 83 additions and 81 deletions.
63 changes: 0 additions & 63 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -839,39 +839,6 @@ abstract class RDD[T: ClassTag](
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

/**
* :: DeveloperApi ::
* Reduces the elements of this RDD in a tree pattern.
* @param depth suggested depth of the tree
* @see [[org.apache.spark.rdd.RDD#reduce]]
*/
@DeveloperApi
def treeReduce(f: (T, T) => T, depth: Int): T = {
require(depth >= 1, s"Depth must be greater than 1 but got $depth.")
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
val local = this.mapPartitions(it => Iterator(reducePartition(it)))
val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
if (c.isDefined && x.isDefined) {
Some(cleanF(c.get, x.get))
} else if (c.isDefined) {
c
} else if (x.isDefined) {
x
} else {
None
}
}
local.treeAggregate(Option.empty[T])(op, op, depth)
.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
@@ -907,36 +874,6 @@ abstract class RDD[T: ClassTag](
jobResult
}

/**
* :: DeveloperApi ::
* Aggregates the elements of this RDD in a tree pattern.
* @param depth suggested depth of the tree
* @see [[org.apache.spark.rdd.RDD#aggregate]]
*/
@DeveloperApi
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int): U = {
require(depth >= 1, s"Depth must be greater than 1 but got $depth.")
if (this.partitions.size == 0) {
return Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
}
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var local = this.mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = local.partitions.size
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
while (numPartitions > scale + numPartitions / scale) {
numPartitions /= scale
local = local.mapPartitionsWithIndex { (i, iter) =>
iter.map((i % numPartitions, _))
}.reduceByKey(new HashPartitioner(numPartitions), cleanCombOp).values
}
local.reduce(cleanCombOp)
}

/**
* Return the number of elements in the RDD.
*/
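
The treeReduce removed above wraps each partition's partial result in an Option before handing it to treeAggregate, so empty partitions cannot poison the reduce. A minimal Spark-free sketch of that combine step, with illustrative values that are not taken from the diff:

object TreeReduceOptionCombineSketch {
  def main(args: Array[String]): Unit = {
    val f: (Int, Int) => Int = _ + _
    // Mirrors the `op` above: keep whichever side is defined, combine when both are.
    val op: (Option[Int], Option[Int]) => Option[Int] = (c, x) =>
      if (c.isDefined && x.isDefined) Some(f(c.get, x.get))
      else if (c.isDefined) c
      else x
    // Partial results from four partitions, one of them empty.
    val partials = Seq(Some(3), None, Some(5), Some(-2))
    println(partials.reduce(op)) // Some(6)
    // Only a fully empty collection stays None, which treeReduce turns into
    // the "empty collection" UnsupportedOperationException.
    println(Seq.empty[Option[Int]].foldLeft(Option.empty[Int])(op)) // None
  }
}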
18 changes: 0 additions & 18 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -820,22 +820,4 @@ class RDDSuite extends FunSuite with SharedSparkContext {
mutableDependencies += dep
}
}

test("treeAggregate") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
def seqOp = (c: Long, x: Int) => c + x
def combOp = (c1: Long, c2: Long) => c1 + c2
for (level <- 1 until 10) {
val sum = rdd.treeAggregate(0L)(seqOp, combOp, level)
assert(sum === -1000L)
}
}

test("treeReduce") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
for (level <- 1 until 10) {
val sum = rdd.treeReduce(_ + _, level)
assert(sum === -1000)
}
}
}
@@ -27,6 +27,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary

/**
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.rdd.RDDFunctions._

/**
* Class used to solve an optimization problem using Gradient Descent.
@@ -26,6 +26,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.rdd.RDDFunctions._

/**
* :: DeveloperApi ::
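
The three one-line hunks above add the same implicit import at mllib call sites that were using the (now removed) RDD versions of these methods directly; the surrounding doc of the second hunk identifies GradientDescent. Below is a minimal sketch of the call shape the import enables. It assumes code that compiles somewhere under the org.apache.spark.mllib package (the wrapper appears to be package-private in this version), and the package suffix, object name, and (label, features) layout are illustrative rather than taken from the diff:

package org.apache.spark.mllib.sketch // hypothetical location inside the mllib package tree

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.rdd.RDDFunctions._ // brings treeAggregate/treeReduce onto RDD[T]
import org.apache.spark.rdd.RDD

object TreeAggregateCallSite {
  /** Sum the labels and count the points with a depth-2 combine tree,
   *  the same call shape the optimizer code above can now use. */
  def labelMean(points: RDD[(Double, Vector)]): Double = {
    val (sum, count) = points.treeAggregate((0.0, 0L))(
      seqOp = (acc, p) => (acc._1 + p._1, acc._2 + 1L),
      combOp = (a, b) => (a._1 + b._1, a._2 + b._2),
      depth = 2)
    sum / count
  }
}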
62 changes: 62 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -20,7 +20,10 @@ package org.apache.spark.mllib.rdd
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

/**
* Machine learning specific RDD functions.
@@ -44,6 +47,65 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
new SlidingRDD[T](self, windowSize)
}
}

/**
* Reduces the elements of this RDD in a tree pattern.
* @param depth suggested depth of the tree
* @see [[org.apache.spark.rdd.RDD#reduce]]
*/
def treeReduce(f: (T, T) => T, depth: Int): T = {
require(depth >= 1, s"Depth must be greater than 1 but got $depth.")
val cleanF = self.context.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
val local = self.mapPartitions(it => Iterator(reducePartition(it)))
val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
if (c.isDefined && x.isDefined) {
Some(cleanF(c.get, x.get))
} else if (c.isDefined) {
c
} else if (x.isDefined) {
x
} else {
None
}
}
RDDFunctions.fromRDD(local).treeAggregate(Option.empty[T])(op, op, depth)
.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

/**
* Aggregates the elements of this RDD in a tree pattern.
* @param depth suggested depth of the tree
* @see [[org.apache.spark.rdd.RDD#aggregate]]
*/
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int): U = {
require(depth >= 1, s"Depth must be greater than 1 but got $depth.")
if (self.partitions.size == 0) {
return Utils.clone(zeroValue, self.context.env.closureSerializer.newInstance())
}
val cleanSeqOp = self.context.clean(seqOp)
val cleanCombOp = self.context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var local = self.mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = local.partitions.size
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
while (numPartitions > scale + numPartitions / scale) {
numPartitions /= scale
local = local.mapPartitionsWithIndex { (i, iter) =>
iter.map((i % numPartitions, _))
}.reduceByKey(new HashPartitioner(numPartitions), cleanCombOp).values
}
local.reduce(cleanCombOp)
}
}

private[mllib]
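
The heart of the new treeAggregate is the while loop above: it picks scale = max(ceil(numPartitions^(1/depth)), 2) and repeatedly hashes the per-partition partial aggregates into numPartitions / scale buckets, repeating until numPartitions <= scale + numPartitions / scale, at which point a plain reduce on the remaining partials is cheap. (The require accepts any depth >= 1, despite the wording of its message.) A small standalone sketch of that partition schedule, with illustrative partition counts and no Spark needed:

object TreeAggregateScheduleSketch {
  // Same scale and stopping rule as the new treeAggregate: returns the number
  // of partial aggregates alive before each reduceByKey round and before the
  // final reduce on the driver.
  def schedule(partitions: Int, depth: Int): List[Int] = {
    val scale = math.max(math.ceil(math.pow(partitions, 1.0 / depth)).toInt, 2)
    var numPartitions = partitions
    var counts = List(numPartitions)
    while (numPartitions > scale + numPartitions / scale) {
      numPartitions /= scale
      counts = numPartitions :: counts
    }
    counts.reverse
  }

  def main(args: Array[String]): Unit = {
    // 1000 partitions at depth 3: scale = 10, so the partials shrink
    // 1000 -> 100 -> 10 before the driver-side reduce.
    println(schedule(1000, 3)) // expected: List(1000, 100, 10)
    // depth = 1 makes scale as large as the partition count, so the loop
    // never runs and the behaviour degenerates to a plain aggregate.
    println(schedule(1000, 1)) // expected: List(1000)
  }
}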
@@ -46,4 +46,22 @@ class RDDFunctionsSuite extends FunSuite with LocalSparkContext {
val expected = data.flatMap(x => x).sliding(3).toList
assert(sliding.collect().toList === expected)
}

test("treeAggregate") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
def seqOp = (c: Long, x: Int) => c + x
def combOp = (c1: Long, c2: Long) => c1 + c2
for (level <- 1 until 10) {
val sum = rdd.treeAggregate(0L)(seqOp, combOp, level)
assert(sum === -1000L)
}
}

test("treeReduce") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
for (level <- 1 until 10) {
val sum = rdd.treeReduce(_ + _, level)
assert(sum === -1000)
}
}
}
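
A quick check of the expected value used in both tests: -1000 until 1000 covers the integers -1000 through 999, so 1..999 cancels against -1..-999 and the only surviving terms are -1000 and 0:

object TreeTestExpectedSum {
  def main(args: Array[String]): Unit = {
    val expected = (-1000 until 1000).sum
    println(expected) // -1000
    assert(expected == -1000)
  }
}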
