mv PeriodicRDDCheckpointer to rdd.util and explain why use cache in PeriodicGraphCheckpointer
ding committed Apr 19, 2017
1 parent 11bc349 commit 9a6fd1f
Showing 7 changed files with 13 additions and 8 deletions.
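
Note on the knock-on edits below: creating the package org.apache.spark.rdd.util means that, inside org.apache.spark.rdd, the bare prefix `util` now resolves to the new subpackage. That is why the RDD.scala hunks requalify util.collection.Utils (via a collectionUtils alias) and the SortingSuite hunk spells out scala.util.Random.

For context, a minimal sketch of how PeriodicRDDCheckpointer is driven from its new package. The class is private[spark], so this pattern applies to Spark-internal callers; the checkpoint directory, interval, and loop here are illustrative, not taken from this commit:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.util.PeriodicRDDCheckpointer

def iterate(sc: SparkContext, input: RDD[Int]): RDD[Int] = {
  sc.setCheckpointDir("/tmp/checkpoints")                     // illustrative path
  val checkpointer = new PeriodicRDDCheckpointer[Int](2, sc)  // checkpoint every 2nd update
  var rdd = input
  for (_ <- 1 to 10) {
    rdd = rdd.map(_ + 1)
    checkpointer.update(rdd)  // persists each update; checkpoints periodically
    rdd.count()               // materialize so pending checkpoints actually happen
  }
  checkpointer.deleteAllCheckpoints()  // remove leftover checkpoint files when done
  rdd
}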
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.OpenHashMap
+import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
   SamplingUtils}

@@ -1419,7 +1419,7 @@ abstract class RDD[T: ClassTag](
     val mapRDDs = mapPartitions { items =>
       // Priority keeps the largest elements, so let's reverse the ordering.
       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+      queue ++= collectionUtils.takeOrdered(items, num)(ord)
       Iterator.single(queue)
     }
     if (mapRDDs.partitions.length == 0) {
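
The reversed ordering in this hunk is easy to misread: BoundedPriorityQueue retains the largest elements under its ordering, so passing ord.reverse makes it retain the num smallest, which is exactly what takeOrdered returns. The same trick with the standard library, in a self-contained sketch (takeOrderedSketch is an illustrative name, not a Spark internal):

import scala.collection.mutable

// Keep the k smallest elements of an iterator in O(n log k):
// a max-heap that evicts its largest element whenever it grows
// past k slots ends up holding exactly the k smallest.
def takeOrderedSketch[T](items: Iterator[T], k: Int)(implicit ord: Ordering[T]): Seq[T] = {
  val heap = mutable.PriorityQueue.empty[T](ord)  // head is the largest under ord
  items.foreach { item =>
    heap.enqueue(item)
    if (heap.size > k) heap.dequeue()             // drop the current largest
  }
  heap.dequeueAll.reverse                         // ascending, like takeOrdered
}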
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/{util → rdd/util}/PeriodicRDDCheckpointer.scala
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util
+package org.apache.spark.rdd.util
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.PeriodicCheckpointer
 
 
 /**
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -135,7 +135,7 @@ class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers w
   }
 
   test("get a range of elements in an array not partitioned by a range partitioner") {
-    val pairArr = util.Random.shuffle((1 to 1000).toList).map(x => (x, x))
+    val pairArr = scala.util.Random.shuffle((1 to 1000).toList).map(x => (x, x))
     val pairs = sc.parallelize(pairArr, 10)
     val range = pairs.filterByRange(200, 800).collect()
     assert((800 to 200 by -1).toArray.sorted === range.map(_._1).sorted)
5 changes: 3 additions & 2 deletions core/src/test/scala/org/apache/spark/{util → utils}/PeriodicRDDCheckpointerSuite.scala
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util
+package org.apache.spark.utils
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{SharedSparkContext, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.util.PeriodicRDDCheckpointer
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 
 class PeriodicRDDCheckpointerSuite extends SparkFunSuite with SharedSparkContext {
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.PeriodicRDDCheckpointer
+import org.apache.spark.rdd.util.PeriodicRDDCheckpointer
 
 /**
  * Implements a Pregel-like bulk-synchronous message-passing API.
3 changes: 3 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/util/PeriodicGraphCheckpointer.scala
@@ -87,6 +87,9 @@ private[spark] class PeriodicGraphCheckpointer[VD, ED](
 
   override protected def persist(data: Graph[VD, ED]): Unit = {
     if (data.vertices.getStorageLevel == StorageLevel.NONE) {
+      /* We need to use cache because persist does not honor the default storage level requested
+       * when constructing the graph. Only cache does that.
+       */
       data.vertices.cache()
     }
     if (data.edges.getStorageLevel == StorageLevel.NONE) {
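
The new comment captures a GraphX subtlety worth spelling out: Graph.persist(newLevel) persists vertices and edges at its newLevel argument (StorageLevel.MEMORY_ONLY by default), whereas Graph.cache() persists them at the target storage levels requested when the graph was constructed. A minimal sketch of the difference, assuming an active SparkContext sc; the storage levels and values are illustrative:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0)))
val graph = Graph.fromEdges(
  edges, defaultValue = 0.0,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

graph.cache()  // honors the MEMORY_AND_DISK levels requested at construction
// graph.persist() would instead apply its own default, MEMORY_ONLY,
// ignoring the levels requested above.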
2 changes: 1 addition & 1 deletion mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala
@@ -26,8 +26,8 @@ import org.apache.spark.mllib.tree.configuration.{BoostingStrategy => OldBoostin
 import org.apache.spark.mllib.tree.impurity.{Variance => OldVariance}
 import org.apache.spark.mllib.tree.loss.{Loss => OldLoss}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.util.PeriodicRDDCheckpointer
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.PeriodicRDDCheckpointer
 
 
 private[spark] object GradientBoostedTrees extends Logging {
