Skip to content

Commit

Permalink
[SPARK-3649] Remove GraphX custom serializers
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Sep 23, 2014
1 parent f9d6220 commit a49c2ad
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 567 deletions.
14 changes: 6 additions & 8 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.RoutingTablePartition
import org.apache.spark.graphx.impl.ShippableVertexPartition
import org.apache.spark.graphx.impl.VertexAttributeBlock
import org.apache.spark.graphx.impl.RoutingTableMessageRDDFunctions._
import org.apache.spark.graphx.impl.VertexRDDFunctions._

/**
* Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by
Expand Down Expand Up @@ -233,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
case _ =>
this.withPartitionsRDD[VD3](
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
}
)
Expand Down Expand Up @@ -277,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
case _ =>
this.withPartitionsRDD(
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
}
)
Expand All @@ -297,7 +295,7 @@ class VertexRDD[@specialized VD: ClassTag](
*/
def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
val shuffled = messages.copartitionWithVertices(this.partitioner.get)
val shuffled = messages.partitionBy(this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
Expand Down Expand Up @@ -371,7 +369,7 @@ object VertexRDD {
def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size))
}
val vertexPartitions = vPartitioned.mapPartitions(
iter => Iterator(ShippableVertexPartition(iter)),
Expand Down Expand Up @@ -412,7 +410,7 @@ object VertexRDD {
): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size))
case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size))
}
val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
Expand Down Expand Up @@ -454,7 +452,7 @@ object VertexRDD {
.setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")

val numEdgePartitions = edges.partitions.size
vid2pid.copartitionWithVertices(vertexPartitioner).mapPartitions(
vid2pid.partitionBy(vertexPartitioner).mapPartitions(
iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
preservesPartitioning = true)
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,6 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

import org.apache.spark.graphx.impl.RoutingTablePartition.RoutingTableMessage

private[graphx]
class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) {
/** Copartition an `RDD[RoutingTableMessage]` with the vertex RDD with the given `partitioner`. */
def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = {
new ShuffledRDD[VertexId, Int, Int](
self, partitioner).setSerializer(new RoutingTableMessageSerializer)
}
}

private[graphx]
object RoutingTableMessageRDDFunctions {
import scala.language.implicitConversions

implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = {
new RoutingTableMessageRDDFunctions(rdd)
}
}

private[graphx]
object RoutingTablePartition {
/**
Expand Down
Loading

0 comments on commit a49c2ad

Please sign in to comment.