Skip to content

Commit

Permalink
Refactor DriverClient to be more Actor-based
Browse files Browse the repository at this point in the history
  • Loading branch information
aarondav committed Dec 25, 2013
1 parent bbc3628 commit 61372b1
Showing 1 changed file with 31 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,86 +17,43 @@

package org.apache.spark.deploy.client

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent._

import akka.actor._
import akka.actor.Actor.emptyBehavior
import akka.pattern.ask
import akka.remote.RemotingLifecycleEvent

import org.apache.spark.Logging
import org.apache.spark.deploy.{DeployMessage, DriverDescription}
import org.apache.spark.deploy.DriverDescription
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* Actor that sends a single message to the standalone master and then shuts down.
* Actor that sends a single message to the standalone master and returns the response in the
* given promise.
*/
private[spark] abstract class SingleMessageClient(
actorSystem: ActorSystem, master: String, message: DeployMessage)
extends Logging {

// Concrete child classes must implement
def handleResponse(response: Any)

var actor: ActorRef = actorSystem.actorOf(Props(new DriverActor()))

class DriverActor extends Actor with Logging {
override def preStart() {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
logInfo("Sending message to master " + master + "...")
val masterActor = context.actorSelection(Master.toAkkaUrl(master))
val timeoutDuration: FiniteDuration = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
val submitFuture = masterActor.ask(message)(timeoutDuration)
handleResponse(Await.result(submitFuture, timeoutDuration))
actorSystem.stop(actor)
actorSystem.shutdown()
class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
override def receive = {
case SubmitDriverResponse(success, message) => {
response.success((success, message))
}

override def receive = emptyBehavior
}
}

/**
* Submits a driver to the master.
*/
private[spark] class SubmissionClient(actorSystem: ActorSystem, master: String,
driverDescription: DriverDescription)
extends SingleMessageClient(actorSystem, master, RequestSubmitDriver(driverDescription)) {

override def handleResponse(response: Any) {
val resp = response.asInstanceOf[SubmitDriverResponse]
if (!resp.success) {
logError(s"Error submitting driver to $master")
logError(resp.message)
case KillDriverResponse(success, message) => {
response.success((success, message))
}
}
}

/**
* Terminates a client at the master.
*/
private[spark] class TerminationClient(actorSystem: ActorSystem, master: String, driverId: String)
extends SingleMessageClient(actorSystem, master, RequestKillDriver(driverId)) {

override def handleResponse(response: Any) {
val resp = response.asInstanceOf[KillDriverResponse]
if (!resp.success) {
logError(s"Error terminating $driverId at $master")
logError(resp.message)
// Relay all other messages to the server.
case message => {
logInfo(s"Sending message to master $master...")
val masterActor = context.actorSelection(Master.toAkkaUrl(master))
masterActor ! message
}
}
}

/**
* Executable utility for starting and terminating drivers inside of a standalone cluster.
*/
object DriverClient {
object DriverClient extends Logging {

def main(args: Array[String]) {
val driverArgs = new DriverClientArguments(args)
Expand All @@ -105,6 +62,9 @@ object DriverClient {
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0)
val master = driverArgs.master
val response = promise[(Boolean, String)]
val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))

driverArgs.cmd match {
case "launch" =>
Expand All @@ -116,13 +76,22 @@ object DriverClient {
driverArgs.driverOptions,
driverArgs.driverJavaOptions,
driverArgs.driverEnvVars)
val client = new SubmissionClient(actorSystem, driverArgs.master, driverDescription)
driver ! RequestSubmitDriver(driverDescription)

case "kill" =>
val master = driverArgs.master
val driverId = driverArgs.driverId
val client = new TerminationClient(actorSystem, master, driverId)
driver ! RequestKillDriver(driverId)
}

val (success, message) =
try {
Await.result(response.future, AkkaUtils.askTimeout)
} catch {
case e: TimeoutException => (false, s"Master $master failed to respond in time")
}
if (success) logInfo(message) else logError(message)
actorSystem.stop(driver)
actorSystem.shutdown()
actorSystem.awaitTermination()
}
}

0 comments on commit 61372b1

Please sign in to comment.