Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3427] [GraphX] Avoid active vertex tracking in static PageRank #2308

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,43 @@ object PageRank extends Logging {
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{
// Initialize the pagerankGraph with each edge attribute having
// Initialize the PageRank graph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[Double, Double] = graph
var rankGraph: Graph[Double, Double] = graph
// Associate the degree with each vertex
.outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
// Set the weight on the edges based on the degree
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => 1.0 )
.cache()
.mapVertices( (id, attr) => resetProb )

// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b
// The initial message received by all vertices in PageRank
val initialMessage = 0.0
var iteration = 0
var prevRankGraph: Graph[Double, Double] = null
while (iteration < numIter) {
rankGraph.cache()

// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)(
vertexProgram, sendMessage, messageCombiner)
// Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
// do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
val rankUpdates = rankGraph.mapReduceTriplets[Double](
e => Iterator((e.dstId, e.srcAttr * e.attr)), _ + _)

// Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
// that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
// edge partitions.
prevRankGraph = rankGraph
rankGraph = rankGraph.joinVertices(rankUpdates) {
(id, oldRank, msgSum) => resetProb + (1.0 - resetProb) * msgSum
}.cache()

rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
logInfo(s"PageRank finished iteration $iteration.")
prevRankGraph.vertices.unpersist(false)
prevRankGraph.edges.unpersist(false)

iteration += 1
}

rankGraph
}

/**
Expand Down