Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
[SPARK-21694][MESOS] Support Mesos CNI network labels
Browse files Browse the repository at this point in the history
JIRA ticket: https://issues.apache.org/jira/browse/SPARK-21694

Spark already supports launching containers attached to a given CNI network by specifying it via the config `spark.mesos.network.name`.

This PR adds support to pass in network labels to CNI plugins via a new config option `spark.mesos.network.labels`. These network labels are key-value pairs that are set in the `NetworkInfo` of both the driver and executor tasks. More details in the related Mesos documentation: http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins

Unit tests were added for both driver and executor tasks.
Manual integration test to submit a job with the `spark.mesos.network.labels` option, hit the mesos/state.json endpoint, and check that the labels are set in the driver and executor tasks.

ArtRand skonto

Author: Susan X. Huynh <xhuynh@mesosphere.com>

Closes apache#18910 from susanxhuynh/sh-mesos-cni-labels.
  • Loading branch information
susanxhuynh committed Jan 8, 2018
1 parent 6f9aa1b commit a407997
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 9 deletions.
14 changes: 14 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,20 @@ See the [configuration page](configuration.html) for information on Spark config
for more details.
</td>
</tr>
<tr>
<td><code>spark.mesos.network.labels</code></td>
<td><code>(none)</code></td>
<td>
Pass network labels to CNI plugins. This is a comma-separated list
of key-value pairs, where each key-value pair has the format key:value.
Example:

<pre>key1:val1,key2:val2</pre>
See
<a href="http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins">the Mesos CNI docs</a>
for more details.
</td>
</tr>
<tr>
<td><code>spark.mesos.fetcherCache.enable</code></td>
<td><code>false</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,33 @@ package object config {
.stringConf
.createWithDefault("")

private [spark] val DRIVER_LABELS =
// Free-form Mesos labels to attach to the driver task, supplied as "key:value" pairs
// separated by commas, e.g. "key:value,key2:value2".
private[spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
// NOTE: the doc text is built by string concatenation; each fragment must end with a
// trailing space so the rendered sentence does not run words together ("one.Ex.").
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one. " +
"Ex. key:value,key2:value2")
.stringConf
.createOptional

private [spark] val DRIVER_FAILOVER_TIMEOUT =
// Seconds the Mesos master waits to hear from a temporarily disconnected driver
// before tearing down all of its executors. Defaults to 0.0.
private[spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
"during a temporary disconnection, before tearing down all the executors.")
.doubleConf
.createWithDefault(0.0)

// Name of the CNI network to attach containers to. In cluster mode the driver is
// launched in the same named network as the executors.
private[spark] val NETWORK_NAME =
ConfigBuilder("spark.mesos.network.name")
.doc("Attach containers to the given named network. If this job is launched " +
"in cluster mode, also launch the driver in the given named network.")
.stringConf
.createOptional

// Labels passed to CNI plugins, as a comma-separated list of key:value pairs
// (e.g. "key1:val1,key2:val2"); set on the NetworkInfo of driver and executor tasks.
private[spark] val NETWORK_LABELS =
ConfigBuilder("spark.mesos.network.labels")
.doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
"of key-value pairs, where each key-value pair has the format key:value. " +
"Example: key1:val1,key2:val2")
.stringConf
.createOptional
}
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

private def executorHostname(offer: Offer): String = {
if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
if (sc.conf.get(NETWORK_NAME).isDefined) {
// The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
"0.0.0.0"
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Vo
import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME}
import org.apache.spark.internal.Logging

/**
Expand Down Expand Up @@ -161,8 +162,12 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
}

conf.getOption("spark.mesos.network.name").map { name =>
val info = NetworkInfo.newBuilder().setName(name).build()
conf.get(NETWORK_NAME).map { name =>
val networkLabels = MesosProtoUtils.mesosLabels(conf.get(NETWORK_LABELS).getOrElse(""))
val info = NetworkInfo.newBuilder()
.setName(name)
.setLabels(networkLabels)
.build()
containerInfo.addNetworkInfos(info)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
}

test("supports spark.mesos.network.name") {
test("supports spark.mesos.network.name and spark.mesos.network.labels") {
setScheduler()

val mem = 1000
Expand All @@ -234,7 +234,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.network.name" -> "test-network-name"),
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"),
"s1",
new Date()))

Expand All @@ -247,6 +248,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}

test("accept/decline offers with driver constraints") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getLabels.equals(taskLabels))
}

test("mesos supports spark.mesos.network.name") {
test("mesos supports spark.mesos.network.name and spark.mesos.network.labels") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name"
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"
))

val (mem, cpu) = (backend.executorMemory(sc), 4)
Expand All @@ -582,6 +583,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}

test("supports spark.scheduler.minRegisteredResourcesRatio") {
Expand Down

0 comments on commit a407997

Please sign in to comment.