Commit

Merge remote-tracking branch 'origin/master' into get-struct
cloud-fan committed Nov 24, 2015
2 parents ec40d23 + c2467da commit 19907cf
Showing 64 changed files with 1,419 additions and 276 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1379,7 +1379,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

val key = if (!isLocal && scheme == "file") {
- env.httpFileServer.addFile(new File(uri.getPath))
+ env.rpcEnv.fileServer.addFile(new File(uri.getPath))
} else {
schemeCorrectedPath
}
@@ -1630,7 +1630,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
var key = ""
if (path.contains("\\")) {
// For local paths with backslashes on Windows, URI throws an exception
- key = env.httpFileServer.addJar(new File(path))
+ key = env.rpcEnv.fileServer.addJar(new File(path))
} else {
val uri = new URI(path)
key = uri.getScheme match {
@@ -1644,7 +1644,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// of the AM to make it show up in the current working directory.
val fileName = new Path(uri.getPath).getName()
try {
- env.httpFileServer.addJar(new File(fileName))
+ env.rpcEnv.fileServer.addJar(new File(fileName))
} catch {
case e: Exception =>
// For now just log an error but allow to go through so spark examples work.
@@ -1655,7 +1655,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
} else {
try {
- env.httpFileServer.addJar(new File(uri.getPath))
+ env.rpcEnv.fileServer.addJar(new File(uri.getPath))
} catch {
case exc: FileNotFoundException =>
logError(s"Jar not found at $path")
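
The hunks above all make the same substitution: files and jars that the driver used to publish through the standalone HttpFileServer are now registered with the file server owned by the RpcEnv, and the URI it returns becomes the key handed to executors. A minimal sketch of that decision, assuming code that lives inside the org.apache.spark package (RpcEnv and its file server are private[spark]) and using a hypothetical helper name:

package org.apache.spark

import java.io.File
import java.net.URI

// Hedged sketch, not part of the commit: mirrors the branch in the first hunk above.
private[spark] object AddFileSketch {
  def resolveKey(env: SparkEnv, isLocal: Boolean, schemeCorrectedPath: String): String = {
    val uri = new URI(schemeCorrectedPath)
    if (!isLocal && uri.getScheme == "file") {
      // New behavior: serve the driver-local file through the RpcEnv's file server
      // and use the URI it returns as the key executors will download from.
      env.rpcEnv.fileServer.addFile(new File(uri.getPath))
    } else {
      // Remote or already-served paths are passed through unchanged.
      schemeCorrectedPath
    }
  }
}
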
14 changes: 0 additions & 14 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -66,7 +66,6 @@ class SparkEnv (
val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val securityManager: SecurityManager,
- val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
val memoryManager: MemoryManager,
@@ -91,7 +90,6 @@ class SparkEnv (
if (!isStopped) {
isStopped = true
pythonWorkers.values.foreach(_.stop())
- Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
@@ -367,17 +365,6 @@ object SparkEnv extends Logging {

val cacheManager = new CacheManager(blockManager)

- val httpFileServer =
-   if (isDriver) {
-     val fileServerPort = conf.getInt("spark.fileserver.port", 0)
-     val server = new HttpFileServer(conf, securityManager, fileServerPort)
-     server.initialize()
-     conf.set("spark.fileserver.uri", server.serverUri)
-     server
-   } else {
-     null
-   }

val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
@@ -422,7 +409,6 @@
blockTransferService,
blockManager,
securityManager,
- httpFileServer,
sparkFilesDir,
metricsSystem,
memoryManager,
46 changes: 46 additions & 0 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -17,6 +17,9 @@

package org.apache.spark.rpc

import java.io.File
import java.nio.channels.ReadableByteChannel

import scala.concurrent.Future

import org.apache.spark.{SecurityManager, SparkConf}
@@ -132,8 +135,51 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
* that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method.
*/
def deserialize[T](deserializationAction: () => T): T

/**
* Return the instance of the file server used to serve files. This may be `null` if the
* RpcEnv is not operating in server mode.
*/
def fileServer: RpcEnvFileServer

/**
* Open a channel to download a file from the given URI. If the URIs returned by the
* RpcEnvFileServer use the "spark" scheme, this method will be called by the Utils class to
* retrieve the files.
*
* @param uri URI with location of the file.
*/
def openChannel(uri: String): ReadableByteChannel

}

/**
* A server used by the RpcEnv to serve files to other processes owned by the application.
*
* The file server can return URIs handled by common libraries (such as "http" or "hdfs"), or
* it can return "spark" URIs which will be handled by `RpcEnv#fetchFile`.
*/
private[spark] trait RpcEnvFileServer {

/**
* Adds a file to be served by this RpcEnv. This is used to serve files from the driver
* to executors when they're stored on the driver's local file system.
*
* @param file Local file to serve.
* @return A URI for the location of the file.
*/
def addFile(file: File): String

/**
* Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
* `SparkContext.addJar`.
*
* @param file Local file to serve.
* @return A URI for the location of the file.
*/
def addJar(file: File): String

}

private[spark] case class RpcEnvConfig(
conf: SparkConf,
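
Taken together, fileServer and openChannel cover both halves of a transfer: the driver registers a local file and publishes the returned URI, and an executor handed a "spark" URI reads it back through its own RpcEnv, while "http" or "hdfs" URIs keep going through the usual clients. A rough usage sketch under those assumptions; the object name and the NIO copy loop are illustrative, not Spark code:

package org.apache.spark

import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer

import org.apache.spark.rpc.RpcEnv

// Hedged sketch of the call pattern this trait is meant to support.
private[spark] object FileTransferSketch {

  // Driver side: serve a local file and hand the returned URI to executors.
  def publish(driverEnv: RpcEnv, file: File): String =
    driverEnv.fileServer.addFile(file)

  // Executor side: a "spark" URI is opened back through the RpcEnv itself.
  def download(executorEnv: RpcEnv, uri: String, dest: File): Unit = {
    val in = executorEnv.openChannel(uri)
    val out = new FileOutputStream(dest).getChannel
    try {
      val buf = ByteBuffer.allocate(64 * 1024)
      while (in.read(buf) != -1) {
        buf.flip()
        while (buf.hasRemaining) out.write(buf)
        buf.clear()
      }
    } finally {
      in.close()
      out.close()
    }
  }
}
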
60 changes: 57 additions & 3 deletions core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -17,6 +17,8 @@

package org.apache.spark.rpc.akka

import java.io.File
import java.nio.channels.ReadableByteChannel
import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Future
@@ -30,7 +32,7 @@ import akka.pattern.{ask => akkaAsk}
import akka.remote.{AssociationEvent, AssociatedEvent, DisassociatedEvent, AssociationErrorEvent}
import akka.serialization.JavaSerializer

- import org.apache.spark.{SparkException, Logging, SparkConf}
+ import org.apache.spark.{HttpFileServer, Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.rpc._
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, ThreadUtils}

@@ -41,7 +43,10 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, ThreadUtils}
* remove Akka from the dependencies.
*/
private[spark] class AkkaRpcEnv private[akka] (
- val actorSystem: ActorSystem, conf: SparkConf, boundPort: Int)
+ val actorSystem: ActorSystem,
+ val securityManager: SecurityManager,
+ conf: SparkConf,
+ boundPort: Int)
extends RpcEnv(conf) with Logging {

private val defaultAddress: RpcAddress = {
@@ -64,6 +69,8 @@ private[spark] class AkkaRpcEnv private[akka] (
*/
private val refToEndpoint = new ConcurrentHashMap[RpcEndpointRef, RpcEndpoint]()

private val _fileServer = new AkkaFileServer(conf, securityManager)

private def registerEndpoint(endpoint: RpcEndpoint, endpointRef: RpcEndpointRef): Unit = {
endpointToRef.put(endpoint, endpointRef)
refToEndpoint.put(endpointRef, endpoint)
@@ -223,6 +230,7 @@ private[spark] class AkkaRpcEnv private[akka] (

override def shutdown(): Unit = {
actorSystem.shutdown()
_fileServer.shutdown()
}

override def stop(endpoint: RpcEndpointRef): Unit = {
@@ -241,6 +249,52 @@ private[spark] class AkkaRpcEnv private[akka] (
deserializationAction()
}
}

override def openChannel(uri: String): ReadableByteChannel = {
throw new UnsupportedOperationException(
"AkkaRpcEnv's files should be retrieved using an HTTP client.")
}

override def fileServer: RpcEnvFileServer = _fileServer

}

private[akka] class AkkaFileServer(
conf: SparkConf,
securityManager: SecurityManager) extends RpcEnvFileServer {

@volatile private var httpFileServer: HttpFileServer = _

override def addFile(file: File): String = {
getFileServer().addFile(file)
}

override def addJar(file: File): String = {
getFileServer().addJar(file)
}

def shutdown(): Unit = {
if (httpFileServer != null) {
httpFileServer.stop()
}
}

private def getFileServer(): HttpFileServer = {
if (httpFileServer == null) synchronized {
if (httpFileServer == null) {
httpFileServer = startFileServer()
}
}
httpFileServer
}

private def startFileServer(): HttpFileServer = {
val fileServerPort = conf.getInt("spark.fileserver.port", 0)
val server = new HttpFileServer(conf, securityManager, fileServerPort)
server.initialize()
server
}

}

private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {
@@ -249,7 +303,7 @@ private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
config.name, config.host, config.port, config.conf, config.securityManager)
actorSystem.actorOf(Props(classOf[ErrorMonitor]), "ErrorMonitor")
- new AkkaRpcEnv(actorSystem, config.conf, boundPort)
+ new AkkaRpcEnv(actorSystem, config.securityManager, config.conf, boundPort)
}
}

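
The Akka-backed RpcEnv keeps serving files over HTTP: AkkaFileServer wraps the pre-existing HttpFileServer and only starts it when the first file or jar is added, using double-checked locking on a @volatile field, and because the URIs it hands out are plain "http" ones, openChannel can simply refuse "spark" URIs. A minimal standalone sketch of that initialization pattern, with ExpensiveServer as an illustrative stand-in rather than Spark code:

// Stand-in for something costly to start, such as the embedded HTTP file server.
class ExpensiveServer {
  def start(): Unit = println("server started")
}

class LazyServerHolder {
  // @volatile publishes the write to other threads; the outer null check skips
  // the lock on the common, already-initialized path.
  @volatile private var server: ExpensiveServer = _

  def get(): ExpensiveServer = {
    if (server == null) synchronized {
      if (server == null) {
        val s = new ExpensiveServer()
        s.start()
        server = s // assigned only after it is fully constructed and started
      }
    }
    server
  }
}

The same shape explains the null check in AkkaFileServer.shutdown(): if nothing was ever added, no server was started and there is nothing to stop.
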
(Diffs for the remaining changed files are not shown here.)
