job killing in Spark #935

Closed · wants to merge 12 commits
@@ -28,7 +28,11 @@ import org.apache.spark.util.CompletionIterator

private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {

- override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
+ override def fetch[T](
+     shuffleId: Int,
+     reduceId: Int,
+     context: TaskContext,
+     serializer: Serializer)
: Iterator[T] =
{

@@ -74,7 +78,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)

- CompletionIterator[T, Iterator[T]](itr, {
+ val completionIter = CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
@@ -83,7 +87,9 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
- metrics.shuffleReadMetrics = Some(shuffleMetrics)
+ context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
})

new InterruptibleIterator[T](context, completionIter)
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -37,7 +37,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(cachedValues) =>
// Partition is in cache, so just return its values
logInfo("Found partition in cache!")
- return cachedValues.asInstanceOf[Iterator[T]]
+ return new InterruptibleIterator(context, cachedValues.asInstanceOf[Iterator[T]])

case None =>
// Mark the split as loading (unless someone else marks it first)
@@ -55,7 +55,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
// downside of the current code is that threads wait serially if this does happen.
blockManager.get(key) match {
case Some(values) =>
- return values.asInstanceOf[Iterator[T]]
+ return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
loading.add(key)
50 changes: 50 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureJob.scala
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import java.util.concurrent.{ExecutionException, TimeUnit, Future}

import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

class FutureJob[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: () => T)
  extends Future[T] {

  override def isDone: Boolean = jobWaiter.jobFinished

  override def cancel(mayInterruptIfRunning: Boolean): Boolean = {
    jobWaiter.kill()
    true
  }

  override def isCancelled: Boolean = {
    throw new UnsupportedOperationException
  }

  override def get(): T = {
    jobWaiter.awaitResult() match {
      case JobSucceeded =>
        resultFunc()
      case JobFailed(e: Exception, _) =>
        throw new ExecutionException(e)
    }
  }

  override def get(timeout: Long, unit: TimeUnit): T = {
    throw new UnsupportedOperationException
  }
}
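
A brief usage sketch (not part of this patch; the names below are hypothetical) of how the Future contract above maps onto the JobWaiter: isDone mirrors jobWaiter.jobFinished, get() blocks in awaitResult() and then either runs resultFunc or rethrows the failure wrapped in an ExecutionException, and cancel() simply delegates to jobWaiter.kill().

    // Hypothetical sketch: `future` is assumed to be a FutureJob returned by
    // SparkContext.submitJob (added later in this patch).
    while (!future.isDone) {    // isDone mirrors jobWaiter.jobFinished
      Thread.sleep(100)         // do other work while the job runs
    }
    val result = future.get()   // JobSucceeded => resultFunc()
                                // JobFailed    => throws ExecutionException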
30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in TaskContext.
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
  extends Iterator[T] {

  def hasNext: Boolean = !context.interrupted && delegate.hasNext

  def next(): T = delegate.next()
}
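
To make the contract concrete, here is a small sketch (not part of the patch; `context` is assumed to be the TaskContext of the running task): once something sets context.interrupted to true, the next hasNext check returns false and the loop below stops early instead of draining the underlying iterator.

    // Hypothetical sketch: consume records through the wrapper so the task
    // can be stopped between elements.
    def consume[T](context: TaskContext, records: Iterator[T]): Long = {
      val iter = new InterruptibleIterator(context, records)
      var count = 0L
      while (iter.hasNext) {  // false as soon as context.interrupted is set
        iter.next()
        count += 1
      }
      count
    }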
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/ShuffleFetcher.scala
@@ -27,7 +27,10 @@ private[spark] abstract class ShuffleFetcher {
* Fetch the shuffle outputs for a given ShuffleDependency.
* @return An iterator over the elements of the fetched shuffle outputs.
*/
- def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
+ def fetch[T](
+     shuffleId: Int,
+     reduceId: Int,
+     context: TaskContext,
serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]

/** Stop the fetcher */
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.io._
import java.net.URI
import java.util.Properties
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.Map
@@ -812,6 +813,38 @@ class SparkContext(
result
}

def submitJob[T, U, R](
Mark Hamstra (Contributor) commented:
Certainly not unique to this method, but this is another example of something that is publicly accessible in a SparkContext that probably shouldn't be. Is there any plan to provide a more restricted SparkContext API (for use, e.g., in the shell), or are we going to continue to assume that if we don't tell people about something, then they won't misuse it?

The PR author (Member) replied:

Actually, why wouldn't we want this to be accessible? It is at the same level as runJob.

Mark Hamstra (Contributor) replied:

That's true, and we certainly want custom RDD developers and the like to be able to access runJob etc. My question, though, is whether we really want all users of, e.g., spark-shell to be able to call runJob, submitJob, etc. directly. It comes down to whether spark-shell is targeted at developers, who want to be able to do anything in the REPL that they could in standalone code, or at end users who just need the kinds of higher-level RDD operations we typically show in examples. Maybe the issue isn't that SparkContext itself needs another access level for its methods, but that we could use another kind of interactive tool/UI (something like IPython Notebooks, perhaps...) that doesn't expose all the SparkContext methods that many users shouldn't really be messing with directly.

    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    partitionResultHandler: (Int, U) => Unit,
    resultFunc: () => R): Future[R] =
{
  val callSite = Utils.formatSparkCallSite
  val waiter = dagScheduler.submitJob(
    rdd,
    (context: TaskContext, iter: Iterator[T]) => processPartition(iter),
    partitions,
    callSite,
    allowLocal = false,
    partitionResultHandler,
    null)
  new FutureJob(waiter, resultFunc)
}

/**
 * Kill a running job.
 */
def killJob(jobId: Int) {
  dagScheduler.killJob(jobId)
}

def killAllJobs() {
  dagScheduler.activeJobs.foreach { job =>
    killJob(job.jobId)
  }
}
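
As a rough usage sketch (not taken from this patch; `sc` and `rdd` are assumed to already exist, with rdd: RDD[Int]), the five submitJob arguments line up as follows, and the returned Future can be used to block on the result or to kill the job:

    // Hypothetical example: count elements per partition asynchronously.
    val counts = new Array[Long](rdd.partitions.size)
    val future = sc.submitJob(
      rdd,
      (iter: Iterator[Int]) => iter.size.toLong,           // processPartition
      0 until rdd.partitions.size,                         // partitions
      (index: Int, count: Long) => counts(index) = count,  // partitionResultHandler
      () => counts.sum)                                    // resultFunc
    // future.get() blocks and returns counts.sum once the job succeeds;
    // future.cancel(true) goes through JobWaiter.kill() to stop the job.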

/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -25,6 +25,7 @@ class TaskContext(
val splitId: Int,
val attemptId: Long,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty()
) extends Serializable {

4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -48,4 +48,6 @@ private[spark] case class ExceptionFailure(

private[spark] case class OtherFailure(message: String) extends TaskEndReason

- private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
+ private[spark] case object TaskResultTooBigFailure extends TaskEndReason

private[spark] case object TaskKilled extends TaskEndReason