
[SPARK-5484][GraphX] Periodically do checkpoint in Pregel #15125

Closed
wants to merge 20 commits

Conversation

dding3
Contributor

@dding3 dding3 commented Sep 17, 2016

What changes were proposed in this pull request?

Pregel-based iterative algorithms with more than ~50 iterations begin to slow down and eventually fail with a StackOverflowError due to Spark's lack of support for long lineage chains.

This PR causes Pregel to checkpoint the graph periodically if the checkpoint directory is set.
It also moves PeriodicGraphCheckpointer.scala from mllib to graphx, and moves PeriodicRDDCheckpointer.scala and PeriodicCheckpointer.scala from mllib to core.
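To illustrate the idea behind the patch (this is a sketch, not the PR's actual code): in an iterative Spark job, marking the RDD for checkpointing every few iterations truncates the lineage chain that otherwise grows without bound. The `iterate` function and its increment step below are hypothetical.

```scala
import org.apache.spark.rdd.RDD

// Sketch of periodic checkpointing in an iterative loop.
// Assumes a SparkContext with a checkpoint directory already set
// via sc.setCheckpointDir(...).
def iterate(initial: RDD[Int], maxIterations: Int, checkpointInterval: Int): RDD[Int] = {
  var data = initial
  for (i <- 1 to maxIterations) {
    data = data.map(_ + 1).cache()
    // Truncate the lineage every `checkpointInterval` iterations so the
    // DAG does not grow without bound and overflow the stack.
    if (checkpointInterval > 0 && i % checkpointInterval == 0) {
      data.checkpoint()
    }
    data.count() // an action is needed for the checkpoint to materialize
  }
  data
}
```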

How was this patch tested?

unit tests, manual tests

@SparkQA

SparkQA commented Sep 17, 2016

Test build #65519 has finished for PR 15125 at commit 720a741.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 17, 2016

Test build #65521 has finished for PR 15125 at commit b06b22c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 17, 2016

Test build #65522 has finished for PR 15125 at commit 01523cb.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 17, 2016

Test build #65523 has finished for PR 15125 at commit 70f826d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor

mallman commented Jan 17, 2017

Hi @dding3. Thanks for working on this! I was able to rebase and apply your patch to our build of Spark 2.1 to successfully compute the connected components of a graph with 5.2 billion vertices, 3.7 billion edges and 2.4 billion connected components. It took 832 iterations, but without your patch it stalled around iteration number 330.

Can you please rebase your patch? I will take up a review of this PR when you've had a chance to rebase.

Cheers.

@dding3
Contributor Author

dding3 commented Jan 17, 2017

Sure. I will work on the rebase and update the PR soon.

@SparkQA

SparkQA commented Jan 18, 2017

Test build #71556 has finished for PR 15125 at commit e786838.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor

mallman commented Jan 18, 2017

LGTM. @srowen, can you recommend an mllib committer to review these changes? I'm not familiar with that team.

@srowen
Member

srowen commented Jan 18, 2017

I don't know. @ankurdave ? I am not sure this is being maintained in any meaningful sense now.

}
if (data.edges.getStorageLevel == StorageLevel.NONE) {
data.edges.persist()
data.persist()
Member

hmm, GraphImpl.persist actually persists its vertices and replicatedVertexView.edges (i.e., edges). But the problem is we only check the storage level of vertices. Maybe we should keep the original.

Contributor

I enhanced the persistence tests in PeriodicGraphCheckpointSuite to check that the storage level requested in the graph construction is the storage level seen after persistence. Both this version and the original version of this method failed that unit test.

The graph's vertex and edge rdds are somewhat peculiar in that .cache() and .persist() do not do the same thing, unlike other RDDs. And while .cache() honors the default storage level specified at graph construction time, .persist() always caches with the MEMORY_ONLY storage level.

At any rate, getting the PeriodicGraphCheckpointer to honor the default storage level specified at graph construction time requires changing these method calls from persist() to cache().
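The cache/persist asymmetry described above can be made concrete (a sketch only; the `edges` RDD and storage levels here are illustrative, and this assumes a live SparkContext `sc`):

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

// Hypothetical tiny edge list, just to construct a graph.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0)))

// Request a non-default storage level at construction time.
val graph = Graph.fromEdges(edges, defaultValue = 0,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

graph.vertices.cache()   // honors the MEMORY_AND_DISK level given above
graph.vertices.persist() // no-arg persist defaults to MEMORY_ONLY instead
```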

var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg).cache()
val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
checkpointInterval, graph.vertices.sparkContext)
messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
Member

Do we need to update it before the loop? I think it should be enough to do the update in the loop.

Contributor

I agree. What do you think, @dding3?

Contributor

Actually, for the sake of simplicity and consistency, I'm going to suggest we keep the checkpointer update calls but remove all .cache() calls. The update calls persist the underlying data, making the calls to .cache() unnecessary.

Contributor Author

I originally thought we needed to cache the graph/messages here so they wouldn't be recomputed in the loop, but I agree with you: I will keep the checkpointer update calls and remove all .cache() calls.

var activeMessages = messages.count()

// Loop
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
prevG = g
g = g.joinVertices(messages)(vprog).cache()
Member

checkpointer will do persist. So I think we don't need to call cache here.

Contributor

I agree.

Contributor

Actually, this may be more subtle than I thought. I'm going to think through this again.

Contributor

Ok. I agree with your observation here, @viirya.

Contributor Author

OK

@viirya
Member

viirya commented Feb 10, 2017

@mallman Thanks for pinging me on this. The changes look good to me overall. The main issue might be, as @srowen said, that I don't know if this is still actively maintained.

I saw the most recent commits to GraphX coming from @ankurdave, @tdas, @dbtsai.

@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path

Contributor

Is there a reason this test suite isn't moved into the GraphX codebase?

@@ -23,7 +23,7 @@ import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.mllib.util.MLlibTestSparkContext
Contributor

Is there a reason this file isn't moved into the core codebase?

@@ -76,7 +77,7 @@ import org.apache.spark.storage.StorageLevel
*
* TODO: Move this out of MLlib?
Contributor

This comment should be removed.

@mallman
Contributor

mallman commented Feb 10, 2017

@viirya @dding3 I'm going to rerun our big connected components computation with the changes I've suggested to validate that it still performs and completes as expected. Given the time required to complete this calculation (10+ hours), I might not get to it until next week.

@dding3
Contributor Author

dding3 commented Feb 12, 2017

Thank you guys for reviewing the code. I have updated it based on the comments.

@SparkQA

SparkQA commented Feb 12, 2017

Test build #72748 has finished for PR 15125 at commit a25d00c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor

mallman commented Feb 13, 2017

@dding3 These latest changes look great. I'll run our big connected components job today and report back.

@mallman
Contributor

mallman commented Apr 11, 2017

@felixcheung ping

@mallman
Contributor

mallman commented Apr 18, 2017

@srowen Can you merge this PR, please? It's been over a month since we've heard from any of the reviewers.

@srowen
Member

srowen commented Apr 18, 2017

I feel like I really don't know anything about GraphX and can't evaluate this. It seems reasonable. I don't know if GraphX is really active at this stage?

@mallman
Contributor

mallman commented Apr 18, 2017

I feel like I really don't know anything about GraphX and can't evaluate this. It seems reasonable. I don't know if GraphX is really active at this stage?

Understood. Let me respond to your concerns.

First, this PR has undergone extensive review and revision from people who have experience with this codebase, including from someone (me) who's successfully scaled it to a pregel computation with 800+ iterations. Without this patch, that computation is impossible. I'm not a committer, but as someone who's spent a large amount of time in the guts of GraphX I believe I'm qualified to sign off on this PR.

Second, the consumers of the GraphX codebase are dependent on support of it. I understand if the Spark team doesn't want to support it anymore, but in that case I think the responsible thing to do would be to split it out and put it into its own repo. That would allow the interested parties to maintain it independently of Spark. So long as GraphX is part of Spark, the only way to maintain it is through the Spark project.

There seems to be some sentiment that GraphX is somehow deprecated or a dead end project, yet I do not see a viable alternative. Even then, it would be very helpful to officially deprecate the project before ending support of it.

Thanks.

@srowen
Member

srowen commented Apr 18, 2017

Yeah, catch-22. I'd also like to split out graphx, but I sorta think that's what's already happened with GraphFrames. I don't feel strongly enough to campaign for it, but think graphx should probably be deprecated soon.

It's probably a fine change and don't mind merging it if there are no objections.

@mallman
Contributor

mallman commented Apr 18, 2017

Yeah, catch-22. I'd also like to split out graphx, but I sorta think that's what's already happened with GraphFrames. I don't feel strongly enough to campaign for it, but think graphx should probably be deprecated soon.

Unfortunately, GraphFrames does not currently scale to the level we need. Because of fundamental differences between RDDs and DataFrames in the way they can be extended, I do not see that changing anytime soon. For those reasons, GraphX remains essential to large-scale consumers. But certainly I'm all for splitting it out into its own repo if it means the community can better maintain it.

Right now we're keeping some patches on our own private branch, but since there's community interest in this patch (small as it may be) and a significant amount of work done on this PR, I hope we can merge this in.

@felixcheung
Member

felixcheung commented Apr 18, 2017 via email

Member

@felixcheung felixcheung left a comment

@dding3 a few comments - could you take a look and update?

* Call this to unpersist the Dataset.
*/
def unpersistDataSet(): Unit = {
while (persistedQueue.nonEmpty) {
Member

I don't know if it's the goal but this isn't thread-safe?

Contributor

I would agree with you, that this is not thread safe. Is that a concern?

Member

With the limited internal-only use, it should be ok.

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.mllib.impl
package org.apache.spark.util
Member

@felixcheung felixcheung Apr 19, 2017

it's PeriodicRDDCheckpointer, shouldn't this be in the org.apache.spark.rdd.util namespace?

@@ -122,27 +125,39 @@ object Pregel extends Logging {
require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
s" but got ${maxIterations}")

var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
val checkpointInterval = graph.vertices.sparkContext.getConf
.getInt("spark.graphx.pregel.checkpointInterval", 10)

@@ -87,10 +87,10 @@ private[mllib] class PeriodicGraphCheckpointer[VD, ED](

override protected def persist(data: Graph[VD, ED]): Unit = {
if (data.vertices.getStorageLevel == StorageLevel.NONE) {
data.vertices.persist()
Member

@mallman @dding3 it will be good to add a comment on that here

@felixcheung
Member

I think we want to bring GraphFrames to feature/performance parity with GraphX - @mallman would love to understand the challenges you have run into. Better yet, would be great to get some issues created to track them

@mallman
Contributor

mallman commented Apr 20, 2017

I think we want to bring GraphFrames to feature/performance parity with GraphX - @mallman would love to understand the challenges you have run into. Better yet, would be great to get some issues created to track them

Regarding the scale/performance of GraphX vis-a-vis GraphFrames, I can speak from our experience with the connected components algorithm.

As you know, there are two implementations of the connected components algorithm in the GraphFrames project. There's an implementation which "piggybacks" on GraphX. And there's an implementation that does not use GraphX.

We don't really need any vertex or edge attributes when computing connected components. Any such attributes are strictly overhead. We found that using vertex and edge attributes of type Boolean and value null provided the least overhead. I would not expect piggybacking on top of GraphX to scale better than using GraphX itself, and we just didn't get performance parity using GraphFrames in this way.

GraphX uses custom implementations of the RDD interface with fast point indices for lookups and joins. By contrast, the Dataset interface is closed to extension by clients, and that's by design.

Considering the problem that way, I think that bringing a Dataset-based graph library to performance parity with an RDD-based graph library will be quite challenging. This is especially true in cases where the client doesn't even need vertex or edge attributes.

I think that to even get to performance parity, Spark SQL needs to include support for some kind of columnar indices. But even if GraphFrames implements a better algorithm for connected components than that in GraphX, would that algorithm perform better in GraphX if it was ported to that codebase?

We'd love to use something like GraphFrames pervasively, as it does provide a much more convenient interface when we do use vertex or edge attributes. In fact, before we discovered GraphFrames we made quite a lot of headway into building our own graph library of the sort. However, we found that the overhead incurred by the DataFrame approach (we took) was untenable.

We use a sort of hybrid approach. We do everything except graph computations with dataframes. We convert to RDDs for graph computations.
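The hybrid approach described above can be sketched roughly as follows (names, paths, and schema are illustrative assumptions, not taken from the actual pipeline; assumes a SparkSession `spark`):

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Do the relational preparation with DataFrames.
// Hypothetical input: a Parquet file with long columns "src" and "dst".
val edgeDF = spark.read.parquet("/data/edges.parquet")

// Drop to RDDs for the graph computation. A constant Boolean attribute
// keeps per-edge/per-vertex overhead minimal, as discussed above.
val edgeRDD = edgeDF.select("src", "dst").rdd
  .map(row => Edge(row.getLong(0), row.getLong(1), false))

val graph = Graph.fromEdges(edgeRDD, defaultValue = false)
val components = graph.connectedComponents().vertices // (vertexId, componentId)
```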

Cheers.

@SparkQA

SparkQA commented Apr 20, 2017

Test build #75968 has finished for PR 15125 at commit 9a6fd1f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 20, 2017

Test build #76003 has finished for PR 15125 at commit 5015b44.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

thanks @mallman I'll digest this and see what improvements we could make in gf.

@felixcheung
Member

felixcheung commented Apr 21, 2017

@dding3 @mallman one last thing to consider, with this PR we are having

spark.graphx.pregel.checkpointInterval

defaulting to 10. This changes the existing behavior (where there is no checkpointing). Do you think it would be lower risk to have this default to -1, and hence be off by default?

I think this goes along with the earlier discussion of whether this should be documented in release note.

@dding3
Contributor Author

dding3 commented Apr 22, 2017

OK, agreed. If the user hasn't set a checkpoint directory while checkpointing is enabled in Pregel by default, there may be an exception. I will change the default value of spark.graphx.pregel.checkpointInterval to -1.
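With that default, a user would opt in explicitly; a minimal sketch of what that might look like (application name and checkpoint path are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("pregel-checkpoint-demo")
  // Off (-1) by default; opt in with a positive interval.
  .set("spark.graphx.pregel.checkpointInterval", "10")

val sc = new SparkContext(conf)
// A checkpoint directory must also be set, or checkpointing will fail.
sc.setCheckpointDir("/tmp/spark-checkpoints")
```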

@SparkQA

SparkQA commented Apr 22, 2017

Test build #76069 has finished for PR 15125 at commit 24d4ad6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

messages are periodically checkpoint and the checkpoint interval is set by
"spark.graphx.pregel.checkpointInterval", it can be disable by set as -1):
of its implementation (note: to avoid stackOverflowError due to long lineage chains, pregel support periodcally
checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval"):
Member

I think we can recommend a good value (say 10 was the earlier default) to set to since now it defaults to off
Also, good point about checkpointdir - would be good to mention that is required be set as well and link to any doc we have on that

@SparkQA

SparkQA commented Apr 24, 2017

Test build #76088 has finished for PR 15125 at commit ec62659.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -709,7 +709,8 @@ messages remaining.

The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch*
of its implementation (note: to avoid stackOverflowError due to long lineage chains, pregel support periodcally
checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval"):
checkpoint graph and messages by setting "spark.graphx.pregel.checkpointInterval" to a positive number,
say 10. And set checkpoint directory as well using SparkContext.setCheckpointDir(directory: String)):
Member

nit: this suggestion seems a bit soft :) how did the value 10 come up earlier?

Contributor Author

I referenced the value in LDA and other ML algorithms in Spark; by default their checkpointInterval is set to 10.

Member

@felixcheung felixcheung left a comment

LGTM.
@mallman @srowen any more thought?

asfgit pushed a commit that referenced this pull request Apr 25, 2017
## What changes were proposed in this pull request?

Pregel-based iterative algorithms with more than ~50 iterations begin to slow down and eventually fail with a StackOverflowError due to Spark's lack of support for long lineage chains.

This PR causes Pregel to checkpoint the graph periodically if the checkpoint directory is set.
It also moves PeriodicGraphCheckpointer.scala from mllib to graphx, and moves PeriodicRDDCheckpointer.scala and PeriodicCheckpointer.scala from mllib to core.
## How was this patch tested?

unit tests, manual tests

Author: ding <ding@localhost.localdomain>
Author: dding3 <ding.ding@intel.com>
Author: Michael Allman <michael@videoamp.com>

Closes #15125 from dding3/cp2_pregel.

(cherry picked from commit 0a7f5f2)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
@felixcheung
Member

merged to master/2.2
I talked to Ankur about this change offline.
