Merge branch 'master' into SPARK-1149
liguoqiang committed Mar 7, 2014
2 parents b0d5c07 + 328c73d commit e3e56aa
Showing 88 changed files with 2,595 additions and 389 deletions.
5 changes: 4 additions & 1 deletion assembly/pom.xml
@@ -29,9 +29,12 @@
<artifactId>spark-assembly_2.10</artifactId>
<name>Spark Project Assembly</name>
<url>http://spark.apache.org/</url>
<packaging>pom</packaging>

<properties>
<spark.jar>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</spark.jar>
<spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
<spark.jar.basename>${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</spark.jar.basename>
<spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
<deb.user>root</deb.user>
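Splitting the single spark.jar property into spark.jar.dir and spark.jar.basename lets the assembly descriptor below reference the directory and the file name independently. As an illustrative resolution (the exact version strings depend on the build), with Scala 2.10 and Hadoop 1.0.4 the pieces compose to something like target/scala-2.10/spark-assembly_2.10-1.0.0-SNAPSHOT-hadoop1.0.4.jar.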
11 changes: 11 additions & 0 deletions assembly/src/main/assembly/assembly.xml
@@ -55,6 +55,15 @@
<include>**/*</include>
</includes>
</fileSet>
<fileSet>
<directory>
${project.parent.basedir}/assembly/target/${spark.jar.dir}
</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>${spark.jar.basename}</include>
</includes>
</fileSet>
</fileSets>

<dependencySets>
@@ -75,6 +84,8 @@
<excludes>
<exclude>org.apache.hadoop:*:jar</exclude>
<exclude>org.apache.spark:*:jar</exclude>
<exclude>org.apache.zookeeper:*:jar</exclude>
<exclude>org.apache.avro:*:jar</exclude>
</excludes>
</dependencySet>
</dependencySets>
16 changes: 16 additions & 0 deletions core/pom.xml
@@ -64,6 +64,18 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-plus</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@@ -118,6 +130,10 @@
<artifactId>chill-java</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
28 changes: 24 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -71,10 +71,30 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
if (storageLevel.useDisk && !storageLevel.useMemory) {
// In the case that this RDD is to be persisted using DISK_ONLY
// the iterator will be passed directly to the blockManager (rather than
// caching it to an ArrayBuffer first), then the resulting block data iterator
// will be passed back to the user. If the iterator generates a lot of data,
// this means that it doesn't all have to be held in memory at one time.
// This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
// blocks aren't dropped by the block store before enabling that.
blockManager.put(key, computedValues, storageLevel, tellMaster = true)
return blockManager.get(key) match {
case Some(values) =>
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logError("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted value")
}
} else {
// Otherwise the RDD's contents are first materialized into an ArrayBuffer,
// which preserves the results even if the underlying iterator can only be
// consumed once.
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
}
} finally {
loading.synchronized {
loading.remove(key)
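A quick way to exercise the new DISK_ONLY path is to persist a dataset that would be wasteful to buffer per partition. This is a minimal, hypothetical sketch assuming a local build of this branch; the dataset size, app name, and master URL are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object DiskOnlyCacheDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("disk-only-demo").setMaster("local[2]"))
    // Large enough that materializing a partition into an ArrayBuffer
    // before storage would be wasteful.
    val squares = sc.parallelize(1L to 10000000L, 8).map(i => i * i)
    // DISK_ONLY means useDisk && !useMemory, so CacheManager hands the
    // iterator straight to the BlockManager instead of buffering it.
    squares.persist(StorageLevel.DISK_ONLY)
    println(squares.count())        // first action computes and writes blocks to disk
    println(squares.reduce(_ + _))  // second action streams the persisted blocks back
    sc.stop()
  }
}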
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
Expand Up @@ -23,7 +23,7 @@ import com.google.common.io.Files

import org.apache.spark.util.Utils

private[spark] class HttpFileServer extends Logging {
private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {

var baseDir : File = null
var fileDir : File = null
@@ -38,9 +38,10 @@ private[spark] class HttpFileServer extends Logging {
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(baseDir)
httpServer = new HttpServer(baseDir, securityManager)
httpServer.start()
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
}

def stop() {
60 changes: 55 additions & 5 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -19,15 +19,18 @@ package org.apache.spark

import java.io.File

import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.bio.SocketConnector
import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool

import org.apache.spark.util.Utils


/**
* Exception type thrown by HttpServer when it is in the wrong state for an operation.
*/
@@ -38,7 +41,8 @@ private[spark] class ServerStateException(message: String) extends Exception(message)
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
* around a Jetty server.
*/
private[spark] class HttpServer(resourceBase: File) extends Logging {
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
extends Logging {
private var server: Server = null
private var port: Int = -1

@@ -59,14 +63,60 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
server.setThreadPool(threadPool)
val resHandler = new ResourceHandler
resHandler.setResourceBase(resourceBase.getAbsolutePath)

val handlerList = new HandlerList
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
server.setHandler(handlerList)

if (securityManager.isAuthenticationEnabled()) {
logDebug("HttpServer is using security")
val sh = setupSecurityHandler(securityManager)
// make sure we go through security handler to get resources
sh.setHandler(handlerList)
server.setHandler(sh)
} else {
logDebug("HttpServer is not using security")
server.setHandler(handlerList)
}

server.start()
port = server.getConnectors()(0).getLocalPort()
}
}

/**
* Set up Jetty to use the HashLoginService with a single user and our
* shared secret. Configure it to use DIGEST-MD5 authentication so that the
* password isn't passed in plaintext.
*/
private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
val constraint = new Constraint()
// use DIGEST-MD5 as the authentication mechanism
constraint.setName(Constraint.__DIGEST_AUTH)
constraint.setRoles(Array("user"))
constraint.setAuthenticate(true)
constraint.setDataConstraint(Constraint.DC_NONE)

val cm = new ConstraintMapping()
cm.setConstraint(constraint)
cm.setPathSpec("/*")
val sh = new ConstraintSecurityHandler()

// The HashLoginService only supports a single user and secret for now.
// This could be changed to use the JAASLoginService for other options.
val hashLogin = new HashLoginService()

val secretKey = securityMgr.getSecretKey()
if (secretKey == null) {
throw new Exception("Error: secret key is null with authentication on")
}
hashLogin.putUser(securityMgr.getHttpUser(), new Password(secretKey), Array("user"))
sh.setLoginService(hashLogin)
sh.setAuthenticator(new DigestAuthenticator())
sh.setConstraintMappings(Array(cm))
sh
}

def stop() {
if (server == null) {
throw new ServerStateException("Server is already stopped")
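For completeness, here is a sketch of what a JVM client of the secured server could look like. java.net.HttpURLConnection answers a Digest challenge automatically once an Authenticator is registered; the object and parameter names below are illustrative, and the user/secret pair must match what SecurityManager supplies to the server:

import java.io.InputStream
import java.net.{Authenticator, PasswordAuthentication, URL}

object DigestFetchSketch {
  def fetch(uri: String, httpUser: String, secret: String): Array[Byte] = {
    // Register credentials; the JDK retries with a Digest Authorization
    // header after the server responds 401 with a Digest challenge.
    Authenticator.setDefault(new Authenticator {
      override def getPasswordAuthentication: PasswordAuthentication =
        new PasswordAuthentication(httpUser, secret.toCharArray)
    })
    val in: InputStream = new URL(uri).openStream()
    try {
      Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    } finally {
      in.close()
    }
  }
}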