
[SPARK-4899][MESOS] Support for Checkpointing on Coarse Grained Mode #17750

Closed
wants to merge 5 commits

Conversation

@gkc2104 commented Apr 24, 2017

Support for Mesos checkpointing
https://issues.apache.org/jira/browse/SPARK-4899
#60

What changes were proposed in this pull request?

Enabled checkpointing in coarse-grained mode.

How was this patch tested?

Unit tests ensure that the correct SchedulerDriver is created.


@lhoss commented Apr 25, 2017

It would be great to have this soon in 2.2.x (maybe even backported to 2.1.x); there are many accepted reviews already in metamx#26.

@lins05 (Contributor) commented May 4, 2017

IMO we should not enable checkpointing in fine-grained mode. With checkpointing enabled, Mesos agents persist all status updates to disk, which means a large I/O cost, because fine-grained mode uses Mesos status updates to send task results back to the driver.

Also, I'm not sure whether it makes sense to set the failover_timeout or not. The framework timeout is designed for frameworks that can reconcile existing tasks with the Mesos master when re-connected, but the Mesos scheduler in Spark doesn't implement that yet. Currently, when the Spark driver disconnects from the Mesos master, the master immediately removes the Spark driver from the frameworks list, and the Spark Mesos scheduler immediately aborts itself.

@gkc2104 (Author) commented May 4, 2017

Hey @lins05, thanks for taking the time to look into this.

Yes, it is true that there is an associated overhead in both modes; that's why the defaults have not been changed, i.e. the default behavior is not to checkpoint.
https://github.com/apache/spark/blob/v2.1.1/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala#L72
https://github.com/apache/mesos/blob/1.2.0/include/mesos/mesos.proto#L237
It still sets both checkpoint and failoverTimeout to None, which yields the protobuf defaults of false and 0.0.
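
For concreteness, here is a minimal sketch of how those optional settings map onto the FrameworkInfo protobuf, loosely following the shape of MesosSchedulerUtils.createSchedulerDriver (names and structure are illustrative, not the exact patch):

```scala
import org.apache.mesos.Protos

// A sketch only: when the options are None, the protobuf defaults apply
// (checkpoint = false, failover_timeout = 0.0), so behavior is unchanged.
def buildFrameworkInfo(
    name: String,
    checkpoint: Option[Boolean],
    failoverTimeout: Option[Double]): Protos.FrameworkInfo = {
  val builder = Protos.FrameworkInfo.newBuilder()
    .setUser("") // an empty user tells Mesos to use the current user
    .setName(name)
  checkpoint.foreach(c => builder.setCheckpoint(c))
  failoverTimeout.foreach(t => builder.setFailoverTimeout(t))
  builder.build()
}
```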

Setting failover_timeout is necessary because there has to be a maximum limit on how long an agent keeps resources reserved for a framework that has failed.

And considering that this is already being used in the latest version, I would guess the Spark driver does support it.
https://github.com/apache/spark/blob/v2.1.1/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala#L315

@lins05 (Contributor) commented May 5, 2017

> Yes, it is true that there is an associated overhead in both modes; that's why the defaults have not been changed, i.e. the default behavior is not to checkpoint.

The overhead in fine-grained mode would be much heavier than in coarse-grained mode. For example, with checkpointing enabled, each time you run rdd.collect() on a 100MB RDD, the Mesos agent where the executor runs would write 100MB to disk and delete it after the driver acknowledges the message.

In contrast, in coarse-grained mode the executor sends the 100MB of data to the driver directly, without going through Mesos agents. The only things agents write to disk are small task status messages like TASK_RUNNING/TASK_KILLED, which are typically a few kilobytes.

> Setting failover_timeout is necessary because there has to be a maximum limit on how long an agent keeps resources reserved for a framework that has failed.

> And considering that this is already being used in the latest version, I would guess the Spark driver does support it.
> https://github.com/apache/spark/blob/v2.1.1/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala#L315

The code in your link is the Mesos cluster scheduler, which is a Mesos framework that launches Spark drivers for you, not the Mesos scheduler inside the Spark driver that launches executors.

It has checkpoint and failover_timeout set so that the Spark drivers managed by it won't be killed even if it is itself restarted/killed. If you look at the code of the MesosClusterScheduler.registered method, you can see it calls driver.reconcileTasks(), which is how it achieves that. In contrast, you won't find a call to reconcileTasks in e.g. MesosCoarseGrainedSchedulerBackend.
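
To make the distinction concrete, a minimal sketch of the reconciliation step being described (assumed shape only, not the actual MesosClusterScheduler code):

```scala
import scala.collection.JavaConverters._
import org.apache.mesos.{Protos, SchedulerDriver}

// On (re-)registration, ask the master to reconcile the tasks we believe
// exist. `knownStatuses` is a hypothetical name for whatever task state the
// scheduler persisted; passing an empty collection triggers implicit
// reconciliation, where the master reports all of this framework's tasks.
def reconcileOnRegistered(
    driver: SchedulerDriver,
    knownStatuses: Seq[Protos.TaskStatus]): Unit = {
  driver.reconcileTasks(knownStatuses.asJava)
}
```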

@gkc2104 (Author) commented May 6, 2017

> In contrast, in coarse-grained mode the executor sends the 100MB of data to the driver directly, without going through Mesos agents. The only things agents write to disk are small task status messages like TASK_RUNNING/TASK_KILLED, which are typically a few kilobytes.

Do you then think it would be a viable option to enable it by default in coarse-grained mode and have it not used in fine-grained mode?

> The code in your link is the Mesos cluster scheduler, which is a Mesos framework that launches Spark drivers for you, not the Mesos scheduler inside the Spark driver that launches executors.

This makes sense now; I definitely did not consider this, but it explains things.

> In contrast, you won't find a call to reconcileTasks in e.g. MesosCoarseGrainedSchedulerBackend.

Could you expand on this a bit more? I assume we could maintain task state similarly to how driver state is maintained in MesosClusterScheduler, and update that state at crucial points, like startup.

I'll start implementing that if you think we could then call reconcileTasks with that state.

@lins05 (Contributor) commented May 12, 2017

> Do you then think it would be a viable option to enable it by default in coarse-grained mode and have it not used in fine-grained mode?

SGTM, especially considering fine-grained mode is already deprecated.

> Could you expand on this a bit more? I assume we could maintain task state similarly to how driver state is maintained in MesosClusterScheduler, and update that state at crucial points, like startup.
> I'll start implementing that if you think we could then call reconcileTasks with that state.

I don't think it's an easy task at all, because the Spark driver is not designed to recover from a crash.

The state in the MesosClusterScheduler is pretty simple. It's just a REST server that accepts requests from clients and launches Spark drivers on their behalf, and it only needs to persist its Mesos framework id, because it must re-register with the Mesos master under the same framework id if it is restarted. In the current implementation, MesosClusterScheduler uses ZooKeeper as the persistent storage. Aside from that, MesosClusterScheduler has no other stateful information.
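
A minimal sketch of that re-registration idea, under stated assumptions (FrameworkIdStore is a hypothetical stand-in for the ZooKeeper-backed persistence, not Spark API):

```scala
import org.apache.mesos.Protos

// Hypothetical persistence hook: the framework id is written to ZooKeeper on
// first registration and read back on restart.
trait FrameworkIdStore {
  def fetch(): Option[String]
}

def buildDispatcherInfo(store: FrameworkIdStore): Protos.FrameworkInfo = {
  val builder = Protos.FrameworkInfo.newBuilder()
    .setUser("")
    .setName("Spark Cluster Dispatcher") // illustrative name
  // Reuse the persisted id if present, so the master treats the restarted
  // dispatcher as the same framework; otherwise Mesos assigns a fresh id.
  store.fetch().foreach { id =>
    builder.setId(Protos.FrameworkID.newBuilder().setValue(id).build())
  }
  builder.build()
}
```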

The Spark driver is totally different, because it contains lots of stateful information: the job/stage/task info, executor info, and the catalog that holds temporary views, to name a few. All of that is kept in the driver's memory and would be lost whenever the driver crashes. So it doesn't make sense to set failover_timeout at all, because the Spark driver doesn't support failover.

@mgummelt (Contributor) commented:
  1. Please don't add any features to fine-grained mode. It's deprecated.

  2. The linked JIRA, as well as the title for this PR, only refer to checkpointing, not failover timeouts, which are a different feature. These two features should be separate JIRAs and separate PRs.

  3. As @lins05 has explained, the Spark driver is not designed to recover from a crash. Recovery is not as simple as adding a failover timeout.

I am looking at solving a problem where an intermittent network partition can result in the driver being killed unnecessarily; it's possible that adding a failover_timeout will solve that, but I'm still looking into it.

@gkc2104 (Author) commented May 24, 2017

Updated the PR to only include checkpointing in coarse-grained mode.

<td><code>spark.mesos.checkpoint</code></td>
<td>false</td>
<td>
If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk.
@mgummelt commented May 24, 2017:
Let's customize this copy a bit for Spark instead of just copying the protobuf docs. E.g. "tasks" should be "executors", and you should remove the part about "this framework" in favor of something about Spark in particular.

@@ -158,7 +158,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),

@@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
Option.empty,
Option.empty,
None,

Better not to touch this.

@mgummelt left a review: see comments

@@ -520,7 +520,7 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.mesos.checkpoint</code></td>
<td>false</td>
<td>
If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk.
If set to true, the agents that are running the spark-executors will write framework pids (Spark), executor pids and status updates to disk.

nits:

s/spark/Spark

remove the '-'

remove "(Spark)". All of this data applies to Spark, not just the framework pid.

s/pids/pid (there's only one framework)

@@ -56,4 +56,9 @@ package object config {
.stringConf
.createOptional

private[spark] val CHECKPOINT =
ConfigBuilder("spark.mesos.checkpoint")

Please add .doc like the others.
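
A sketch of what that might look like (the doc string and the booleanConf choice are illustrative, not the final patch):

```scala
private[spark] val CHECKPOINT =
  ConfigBuilder("spark.mesos.checkpoint")
    .doc("If set to true, the Mesos agents running Spark executors will " +
      "write the framework pid, executor pids and status updates to disk.")
    .booleanConf
    .createOptional
```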

@mgummelt (Contributor) commented:
LGTM

@srowen Can we get a merge? Thanks.

@mgummelt (Contributor) commented:
Actually, first, @gkc2104 can you please remove "fine-grained mode" from the PR title?

@gkc2104 changed the title from "[SPARK-4899][MESOS] Support for checkpointing on Coarse and Fine grai…" to "[SPARK-4899][MESOS] Support for Checkpointing on Coarse Grained Mode" on May 24, 2017
<td><code>spark.mesos.checkpoint</code></td>
<td>false</td>
<td>
If set to true, the agents that are running the Spark executors will write the framework pid, executor pids and status updates to disk.

nit: the mesos agents

@tegataiprime commented May 31, 2017

@gkc2104 @mgummelt Will there be a separate issue & PR for adding the failover_timeout? (SPARK-7877?)

@lins05 (Contributor) commented Jun 8, 2017

Ping @srowen, I think this PR is ready to merge.

@susanxhuynh (Contributor) commented:
@gkc2104 @lins05 I have created a separate PR for configuring the driver's failover_timeout: #18674. I would appreciate a review if you are interested.

@AmplabJenkins commented:
Can one of the admins verify this patch?

@gkc2104 gkc2104 closed this Oct 7, 2019
@gkc2104 gkc2104 deleted the enableMesosCheckpointing-squash branch October 7, 2019 16:24
@tushar-rishav commented:
@srowen @mgummelt @gkc2104 Just curious: why did we not merge this? Or has this feature already been addressed elsewhere? I couldn't find it anywhere in the latest codebase or documentation. Apologies in advance if this feature has already been merged; as a beginner in Spark, I am still unaware of all the features.

Thanks
