Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-2099. Report progress while task is running. #1056

Closed
wants to merge 7 commits into from

Conversation

sryza
Copy link
Contributor

@sryza sryza commented Jun 11, 2014

This is a sketch of a patch that allows the UI to show metrics for tasks that have not yet completed. It adds a heartbeat every 2 seconds from the executors to the driver, reporting metrics for all of the executor's tasks.

It still needs unit tests, polish, and cluster testing, but I wanted to put it up to get feedback on the approach.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15690/

@jerryshao
Copy link
Contributor

Hi @sryza , two quick questions:

  1. Will this add additional overhead to Spark run-time, especially for Spark Streaming jobs in which batchDuration is quite short?
  2. Some metrics like shuffle write metrics will only be updated before the task is finished, so fetch these metrics in every 2 seconds will always get 0.

Sorry if I miss something.

@sryza
Copy link
Contributor Author

sryza commented Jun 12, 2014

Will this add additional overhead to Spark run-time, especially for Spark Streaming jobs in which batchDuration is quite short?

It will add overheard, as it's another RPC, but it should be tiny. The overheard isn't affected by the streaming batch duration or number of tasks of tasks that run - we just take a snapshot of the metrics for any running tasks on the node every 2 seconds. If tasks are started frequently, that traffic will far exceed the heartbeat traffic.

Some metrics like shuffle write metrics will only be updated before the task is finished, so fetch these metrics in every 2 seconds will always get 0.

This patch doesn't rip out the existing metrics reports that accompany task completions, so metrics will still end up collected even for tasks that start and finish in between heartbeats.

@rxin
Copy link
Contributor

rxin commented Jun 12, 2014

I haven't looked into your implementation yet, but the block manager also sends heartbeats back to the driver. Perhaps we can consolidate heartbeats.

@sryza
Copy link
Contributor Author

sryza commented Jun 20, 2014

I've pushed a little further on this, and I'm a little confused about the right way to proceed with respect to Mesos.

Currently statusUpdates go through the ExecutorBackend interface. My plan was to add heartbeats to this interface as well. With the CoarseGrainedExecutorBackend, statusUpdates and other RPCs go to the driver through actors. So adding a heartbeat message is relatively straightforward. But with the MesosExecutorBackend, statusUpdates go through a Mesos-specific interface and RPC framework that I don't see how to add a message to.

Any guidance on the right thing to do here? One option would be to have an actor separate from the ExecutorBackend interface responsible just for heartbeats. It would route them to the scheduler and BlockManagerMaster.

@pwendell
Copy link
Contributor

Hey @sryza, that status update exists in the CoarseGrainedExecutorBackend interface mostly for historical reasons. The very first scheduler Spark had was mesos, and mesos offered a way to send control plan messages (like "my task is finished") through its own messaging system. So this control plan messaging in Spark was coupled with the cluster scheduler.

These things don't really need to be coupled and longer term I'd actually like to see all of the control plane messages go directly from the executor to the driver to keep it simpler and avoid fragmentation between deploy modes.

For your patch, I'd circumvent this interface rather than extending it.

Instead, I'd just piggy back your status updates on top of the existing block manager heartbeat that goes directly to the driver. We've already started cramming other stuff in there because it's the one control-plan message that gets sent from executors to the driver that is not the task status message. At least, that's the first attempt I'd make at this patch.

A better solution would be to add a general heartbeat actor on the executor, similar to the one in the block manager, but a shared one that would heartbeat on behalf of all the components. That might not be too bad either.

@sryza
Copy link
Contributor Author

sryza commented Jun 20, 2014

Thanks, that makes sense.

The block manager stuff seems to be pretty self-contained. The only data included in the block manager heartbeat is the block manager ID, and the rest of the block manager RPCs concern block-related happenings. So my inclination is to not muck this up with task data and to add a general heartbeat actor.

@pwendell
Copy link
Contributor

Sure - it would be great to add a general heartbeat mechanism that is shared between this and the blockmanager.

@sryza
Copy link
Contributor Author

sryza commented Jun 25, 2014

Uploaded a new patch that adds a general executor->driver heartbeat. With the patch, observed jobs running fine on a pseudo-distributed yarn cluster.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16106/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16107/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16112/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16119/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1056:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class SparkListenerExecutorMetricsUpdate(
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17641/consoleFull

@andrewor14
Copy link
Contributor

test this please

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1056. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17646/consoleFull


val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskBytesSpilled
def updateAggregateMetrics(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a javadoc for this?

@pwendell
Copy link
Contributor

pwendell commented Aug 1, 2014

Sandy - I took a pass on this. Mostly minor comments, but I did propose lowering the default frequency from 2 seconds. Overall this is looking in good shape.

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1056:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class SparkListenerExecutorMetricsUpdate(
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17646/consoleFull

@sryza
Copy link
Contributor Author

sryza commented Aug 1, 2014

Thanks @pwendell and @andrewor14 for your continued reviews.

10 seconds sounds fine to me. Not that it's a shining beacon of performance, but MapReduce actually uses task->application master heartbeats in exactly the same way. I.e. it doesn't rely on them for them for starting or stopping tasks. MR AMs will actually receive heartbeats more frequently than Spark drivers, as there's one per task instead of one per executor. I just checked and the interval there is 3 seconds.

It might be best to base the interval on the number of executors, but that's probably work for a separate patch.

@pwendell
Copy link
Contributor

pwendell commented Aug 1, 2014

Yeah I think it's fine to put that too another patch and just make it something a bit more on the conservative side (10 seconds) for now.

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1056. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17667/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1056:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class SparkListenerExecutorMetricsUpdate(
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17667/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1056. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17668/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1056:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class SparkListenerExecutorMetricsUpdate(
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17668/consoleFull

@pwendell
Copy link
Contributor

pwendell commented Aug 1, 2014

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA tests have started for PR 1056. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17669/consoleFull

@SparkQA
Copy link

SparkQA commented Aug 1, 2014

QA results for PR 1056:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
case class SparkListenerExecutorMetricsUpdate(
case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

For more information see test ouptut:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17669/consoleFull

@pwendell
Copy link
Contributor

pwendell commented Aug 1, 2014

Looks good Sandy, I'm going to merge this.

@asfgit asfgit closed this in 8d338f6 Aug 1, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
This is a sketch of a patch that allows the UI to show metrics for tasks that have not yet completed.  It adds a heartbeat every 2 seconds from the executors to the driver, reporting metrics for all of the executor's tasks.

It still needs unit tests, polish, and cluster testing, but I wanted to put it up to get feedback on the approach.

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#1056 from sryza/sandy-spark-2099 and squashes the following commits:

93b9fdb [Sandy Ryza] Up heartbeat interval to 10 seconds and other tidying
132aec7 [Sandy Ryza] Heartbeat and HeartbeatResponse are already Serializable as case classes
38dffde [Sandy Ryza] Additional review feedback and restore test that was removed in BlockManagerSuite
51fa396 [Sandy Ryza] Remove hostname race, add better comments about threading, and some stylistic improvements
3084f10 [Sandy Ryza] Make TaskUIData a case class again
3bda974 [Sandy Ryza] Stylistic fixes
0dae734 [Sandy Ryza] SPARK-2099. Report progress while task is running.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants