This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit

Dynamic allocation (#272)
* dynamic allocation: shuffle service docker, yaml and test fixture

* dynamic allocation: changes to spark-core

* dynamic allocation: tests

* dynamic allocation: docs

* dynamic allocation: kubernetes allocator and executor accounting

* dynamic allocation: shuffle service, node caching
foxish committed Jul 24, 2017
1 parent 2af7f05 commit 20956e7
Showing 18 changed files with 683 additions and 60 deletions.
53 changes: 53 additions & 0 deletions conf/kubernetes-shuffle-service.yaml
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

apiVersion: extensions/v1beta1
kind: DaemonSet
metadata:
  labels:
    app: spark-shuffle-service
    spark-version: 2.1.0
  name: shuffle
spec:
  template:
    metadata:
      labels:
        app: spark-shuffle-service
        spark-version: 2.1.0
    spec:
      volumes:
        - name: temp-volume
          hostPath:
            path: '/var/tmp' # change this path according to your cluster configuration.
      containers:
        - name: shuffle
          # This is an official image that is built
          # from the dockerfiles/shuffle directory
          # in the spark distribution.
          image: kubespark/spark-shuffle:v2.1.0-kubernetes-0.1.0-alpha.3
          volumeMounts:
            - mountPath: '/tmp'
              name: temp-volume
          # more volumes can be mounted here.
          # The spark job must be configured to use these
          # mounts using the configuration:
          # spark.kubernetes.shuffle.dir=<mount-1>,<mount-2>,...
          resources:
            requests:
              cpu: "1"
            limits:
              cpu: "1"
@@ -200,7 +200,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
        new SecurityManager(executorConf),
        clientMode = true)
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
-     val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
+     val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(executorId))
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
      fetcher.shutdown()

@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

private[spark] object CoarseGrainedClusterMessages {

-  case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage
+  case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage

  case class SparkAppConfig(
      sparkProperties: Seq[(String, String)],
@@ -219,7 +219,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
        removeExecutor(executorId, reason)
        context.reply(true)

-     case RetrieveSparkAppConfig =>
+     case RetrieveSparkAppConfig(executorId) =>
        val reply = SparkAppConfig(sparkProperties,
          SparkEnv.get.securityManager.getIOEncryptionKey())
        context.reply(reply)
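
Carrying the executor ID in this message lets a cluster-manager-specific backend tailor its reply per executor. As a minimal sketch (not the commit's own implementation; the helper name and the `lookupShuffleServiceHost` parameter are assumptions), a Kubernetes-aware backend could use the ID to hand each executor the address of the shuffle-service pod on its node, which the BlockManager change below then reads via `spark.shuffle.service.host`:

    import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.SparkAppConfig

    // Hypothetical helper, not part of this commit: build the reply for one
    // executor, appending the address of the shuffle-service pod co-located
    // with that executor's node when one has been resolved.
    def buildAppConfigReply(
        executorId: String,
        sparkProperties: Seq[(String, String)],
        ioEncryptionKey: Option[Array[Byte]],
        lookupShuffleServiceHost: String => Option[String]): SparkAppConfig = {
      val nodeLocalProps = lookupShuffleServiceHost(executorId)
        .map(host => Seq("spark.shuffle.service.host" -> host))
        .getOrElse(Seq.empty)
      SparkAppConfig(sparkProperties ++ nodeLocalProps, ioEncryptionKey)
    }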
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -233,8 +233,14 @@ private[spark] class BlockManager(
    blockManagerId = if (idFromMaster != null) idFromMaster else id

    shuffleServerId = if (externalShuffleServiceEnabled) {
-     logInfo(s"external shuffle service port = $externalShuffleServicePort")
-     BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
+     val shuffleServerHostName = if (blockManagerId.isDriver) {
+       blockTransferService.hostName
+     } else {
+       conf.get("spark.shuffle.service.host", blockTransferService.hostName)
+     }
+     logInfo(s"external shuffle service host = $shuffleServerHostName, " +
+       s"port = $externalShuffleServicePort")
+     BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }
66 changes: 64 additions & 2 deletions docs/running-on-kubernetes.md
@@ -189,7 +189,7 @@ from the other deployment modes. See the [configuration page](configuration.html)
  <td>
    The namespace that will be used for running the driver and executor pods. When using
    <code>spark-submit</code> in cluster mode, this can also be passed to <code>spark-submit</code> via the
-   <code>--kubernetes-namespace</code> command line argument. The namespace must already exist.
+   <code>--kubernetes-namespace</code> command line argument.
  </td>
</tr>
<tr>
@@ -208,6 +208,37 @@ from the other deployment modes. See the [configuration page](configuration.html)
    <a href="https://docs.docker.com/engine/reference/commandline/tag/">Docker tag</a> format.
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.shuffle.namespace</code></td>
  <td><code>default</code></td>
  <td>
    Namespace in which the shuffle service pods are present. The shuffle service must be
    created in the cluster before attempting to use it.
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.shuffle.labels</code></td>
  <td><code>(none)</code></td>
  <td>
    Labels that will be used to look up shuffle service pods. This should be a comma-separated list of
    key-value pairs, where each label is in the format <code>key=value</code>. The labels chosen must
    match exactly one shuffle service pod on each node where executors are launched.
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.allocation.batch.size</code></td>
  <td><code>5</code></td>
  <td>
    Number of pods to launch at once in each round of executor pod allocation.
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.allocation.batch.delay</code></td>
  <td><code>1</code></td>
  <td>
    Number of seconds to wait between each round of executor pod allocation.
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.authenticate.submission.caCertFile</code></td>
  <td>(none)</td>
@@ -389,10 +420,41 @@ from the other deployment modes. See the [configuration page](configuration.html)
</tr>
</table>

## Dynamic Executor Scaling

Spark on Kubernetes supports dynamic allocation in cluster mode. This mode requires running
an external shuffle service, typically a [daemonset](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/)
with a provisioned [hostpath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volume.
The shuffle service may be shared by executors belonging to different Spark jobs. Using Spark with dynamic allocation
on Kubernetes assumes that a cluster administrator has set up one or more shuffle-service daemonsets in the cluster.

A sample configuration file is provided in `conf/kubernetes-shuffle-service.yaml`; it can be customized as needed
for a particular cluster. It is important that `spec.template.metadata.labels` be set up appropriately for the shuffle
service, because there may be multiple shuffle service instances running in a cluster. The labels provide a way to
target a particular shuffle service.

For example, if the shuffle service we want to use is in the default namespace and
has pods with labels `app=spark-shuffle-service` and `spark-version=2.1.0`, we can
use those labels to target that particular shuffle service at job launch time. To run a job with dynamic
allocation enabled, the command may then look like the following:

    bin/spark-submit \
      --deploy-mode cluster \
      --class org.apache.spark.examples.GroupByTest \
      --master k8s://<k8s-master>:<port> \
      --kubernetes-namespace default \
      --conf spark.app.name=group-by-test \
      --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:latest \
      --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:latest \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.shuffle.service.enabled=true \
      --conf spark.kubernetes.shuffle.namespace=default \
      --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \
      examples/jars/spark_examples_2.11-2.2.0.jar 10 400000 2

## Current Limitations

Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that
should be lifted in the future include:
- * Applications can only use a fixed number of executors. Dynamic allocation is not supported.
* Applications can only run in cluster mode.
* Only Scala and Java applications can be run.
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -55,7 +55,7 @@ object BuildCommons {
    "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
    "tags", "sketch"
  ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

  val optionallyEnabledProjects@Seq(mesos, yarn, java8Tests, sparkGangliaLgpl,
    streamingKinesisAsl, dockerIntegrationTests, kubernetes, _*) =
    Seq("mesos", "yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests",
6 changes: 3 additions & 3 deletions resource-managers/kubernetes/README.md
@@ -14,10 +14,10 @@ important matters to keep in mind when developing this feature.

# Building Spark with Kubernetes Support

- To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven.
+ To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile
+ the Kubernetes core implementation module along with its dependencies:

    git checkout branch-2.1-kubernetes
-   build/mvn package -Pkubernetes -DskipTests
+   build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests

To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the
`kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when
@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.kubernetes

import org.apache.spark.SparkException

object ConfigurationUtils {
  def parseKeyValuePairs(
      maybeKeyValues: Option[String],
      configKey: String,
      keyValueType: String): Map[String, String] = {

    maybeKeyValues.map(keyValues => {
      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
        keyValue.split("=", 2).toSeq match {
          case Seq(k, v) =>
            (k, v)
          case _ =>
            throw new SparkException(s"Custom $keyValueType set by $configKey must be a" +
              s" comma-separated list of key-value pairs, with format <key>=<value>." +
              s" Got value: $keyValue. All values: $keyValues")
        }
      }).toMap
    }).getOrElse(Map.empty[String, String])
  }
}
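
As a usage sketch (the call below is illustrative, not taken from the commit's tests), parsing the shuffle-service label string from the docs example would behave as follows:

    // Parses "app=spark-shuffle-service,spark-version=2.1.0" into a Map.
    val labels = ConfigurationUtils.parseKeyValuePairs(
      Some("app=spark-shuffle-service,spark-version=2.1.0"),
      "spark.kubernetes.shuffle.labels",
      "shuffle service labels")
    // labels == Map("app" -> "spark-shuffle-service", "spark-version" -> "2.1.0")
    // A malformed entry such as "app" (missing '=') raises a SparkException
    // that names the offending configuration key.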
@@ -157,6 +157,13 @@ package object config extends Logging {
      .stringConf
      .createOptional

  private[spark] val SPARK_SHUFFLE_SERVICE_HOST =
    ConfigBuilder("spark.shuffle.service.host")
      .doc("Host for Spark Shuffle Service")
      .internal()
      .stringConf
      .createOptional

  // Note that while we set a default for this when we start up the
  // scheduler, the specific default value is dynamically determined
  // based on the executor memory.
@@ -270,6 +277,44 @@ package object config extends Logging {
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_SHUFFLE_NAMESPACE =
    ConfigBuilder("spark.kubernetes.shuffle.namespace")
      .doc("Namespace of the shuffle service")
      .stringConf
      .createWithDefault("default")

  private[spark] val KUBERNETES_SHUFFLE_SVC_IP =
    ConfigBuilder("spark.kubernetes.shuffle.ip")
      .doc("This setting is for debugging only. Setting this " +
        "allows overriding the IP that the executor thinks its colocated " +
        "shuffle service is on")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_SHUFFLE_LABELS =
    ConfigBuilder("spark.kubernetes.shuffle.labels")
      .doc("Labels to identify the shuffle service")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_SHUFFLE_DIR =
    ConfigBuilder("spark.kubernetes.shuffle.dir")
      .doc("Path to the shared shuffle directories.")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_ALLOCATION_BATCH_SIZE =
    ConfigBuilder("spark.kubernetes.allocation.batch.size")
      .doc("Number of pods to launch at once in each round of dynamic allocation. ")
      .intConf
      .createWithDefault(5)

  private[spark] val KUBERNETES_ALLOCATION_BATCH_DELAY =
    ConfigBuilder("spark.kubernetes.allocation.batch.delay")
      .doc("Number of seconds to wait between each round of executor allocation. ")
      .longConf
      .createWithDefault(1)

  private[spark] val DRIVER_SERVICE_MANAGER_TYPE =
    ConfigBuilder("spark.kubernetes.driver.serviceManagerType")
      .doc("A tag indicating which class to use for creating the Kubernetes service and" +
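
The two allocation settings above drive the batched executor-pod allocator added elsewhere in this commit (not shown in this excerpt). As a rough sketch of the intended semantics — the class and method names here are assumptions, not the commit's code — an allocator would request at most `batch.size` pods every `batch.delay` seconds:

    import java.util.concurrent.{Executors, TimeUnit}

    import org.apache.spark.SparkConf

    // Hypothetical allocator loop illustrating the batch size/delay semantics.
    class BatchedExecutorAllocator(conf: SparkConf) {
      private val batchSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
      private val batchDelaySeconds = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def start(
          targetTotal: () => Int,
          runningTotal: () => Int,
          requestExecutorPods: Int => Unit): Unit = {
        val allocate = new Runnable {
          override def run(): Unit = {
            // Request at most `batchSize` new executor pods per round so that a
            // large target does not hit the Kubernetes API server in one burst.
            val missing = targetTotal() - runningTotal()
            if (missing > 0) {
              requestExecutorPods(math.min(missing, batchSize))
            }
          }
        }
        scheduler.scheduleWithFixedDelay(allocate, 0L, batchDelaySeconds, TimeUnit.SECONDS)
      }
    }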
@@ -113,4 +113,5 @@ package object constants {
    s"$INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
  private[spark] val DOWNLOAD_JARS_VOLUME_NAME = "download-jars"
  private[spark] val DOWNLOAD_FILES_VOLUME_NAME = "download-files"
  private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
}

