This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Replace submission v1 with submission v2. #286

Merged
merged 3 commits into from
May 23, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions conf/kubernetes-resource-staging-server.yaml
@@ -0,0 +1,63 @@
---
apiVersion: extensions/v1beta1
kind: Deployment
Review comment:
Is Deployment compatible with k8s 1.5 (which, as of now, is the "prior version" of k8s)?

@foxish (Member) commented on May 22, 2017:
Hmm.. maybe we should use extensions.v1beta1/Deployment instead of the apps group here. The apps group move happened in 1.6 (https://github.com/kubernetes/kubernetes/blob/master/CHANGELOG.md#deployment-2). I'll verify with the team about the plan for this moving forward.

Member:
Yes, Deployment from the group extensions/v1beta1 will work on 1.5, 1.6 and 1.7, and we'll probably deprecate it in 1.8. So, I think we can use that for now, and not apps/v1beta1.
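
For reference, a minimal sketch of the apps-group variant discussed above; this is a hypothetical alternative, not part of the PR, and the rest of the spec would be unchanged:

```yaml
# Hypothetical: the same Deployment declared against the apps group, which only
# exists on Kubernetes 1.6+. The PR keeps extensions/v1beta1 for 1.5 compatibility.
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: spark-resource-staging-server
```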

metadata:
  name: spark-resource-staging-server
spec:
  replicas: 1
  template:
    metadata:
      labels:
        resource-staging-server-instance: default
    spec:
      volumes:
      - name: resource-staging-server-properties
        configMap:
          name: spark-resource-staging-server-config
      containers:
      - name: spark-resource-staging-server
        image: kubespark/spark-resource-staging-server:v2.1.0-kubernetes-0.1.0-alpha.3
Member:
Does this image exist already?

        resources:
          requests:
Member:
Without a limits section, the pods will belong to the Burstable QoS class. Is this the intention?

Author:
Added limits.
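
As a side note, a minimal sketch of the QoS behaviour discussed above (not part of the diff): requests without limits put the pod in the Burstable class, while requests equal to limits on every container make it Guaranteed.

```yaml
# Sketch only: with requests == limits for every container, the pod is
# classified as Guaranteed QoS; dropping the limits block would make it Burstable.
resources:
  requests:
    cpu: 100m
    memory: 256Mi
  limits:
    cpu: 100m
    memory: 256Mi
```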

            cpu: 100m
            memory: 256Mi
          limits:
            cpu: 100m
            memory: 256Mi
        volumeMounts:
        - name: resource-staging-server-properties
          mountPath: '/etc/spark-resource-staging-server'
        args:
        - '/etc/spark-resource-staging-server/resource-staging-server.properties'
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-resource-staging-server-config
data:
  resource-staging-server.properties: |
    spark.kubernetes.resourceStagingServer.port=10000
    spark.ssl.kubernetes.resourceStagingServer.enabled=false
    # Other possible properties are listed below, primarily for setting up TLS. The paths given by KeyStore, password, and PEM files here should correspond to
    # files that are securely mounted into the resource staging server container, via e.g. secret volumes.
    # spark.ssl.kubernetes.resourceStagingServer.keyStore=/mnt/secrets/resource-staging-server/keyStore.jks
    # spark.ssl.kubernetes.resourceStagingServer.keyStorePassword=changeit
    # spark.ssl.kubernetes.resourceStagingServer.keyPassword=changeit
    # spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile=/mnt/secrets/resource-staging-server/keystore-password.txt
    # spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile=/mnt/secrets/resource-staging-server/keystore-key-password.txt
    # spark.ssl.kubernetes.resourceStagingServer.keyPem=/mnt/secrets/resource-staging-server/key.pem
    # spark.ssl.kubernetes.resourceStagingServer.serverCertPem=/mnt/secrets/resource-staging-server/cert.pem
---
apiVersion: v1
kind: Service
metadata:
  name: spark-resource-staging-service
spec:
  type: NodePort
  selector:
    resource-staging-server-instance: default
  ports:
  - protocol: TCP
    port: 10000
    targetPort: 10000
    nodePort: 31000
Member:
It is not typical to specify a nodePort, and this should probably be called out explicitly.

Author:
This is to make the documentation and deployment instructions simpler, basically removing the steps to find the NodePort.
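
For comparison, a sketch of the same Service without a pinned nodePort (hypothetical, not part of the PR): Kubernetes would then allocate a port from its NodePort range, and users would have to look it up before submitting.

```yaml
# Hypothetical variant: omitting nodePort lets the API server pick a free port
# from the NodePort range (30000-32767 by default), which then has to be
# discovered, e.g. with `kubectl get service spark-resource-staging-service`.
apiVersion: v1
kind: Service
metadata:
  name: spark-resource-staging-service
spec:
  type: NodePort
  selector:
    resource-staging-server-instance: default
  ports:
  - protocol: TCP
    port: 10000
    targetPort: 10000
```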

@@ -619,7 +619,7 @@ object SparkSubmit {
    }

    if (isKubernetesCluster) {
      childMainClass = "org.apache.spark.deploy.kubernetes.submit.v1.Client"
      childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
      childArgs += args.primaryResource
      childArgs += args.mainClass
      childArgs ++= args.childArgs
1 change: 0 additions & 1 deletion dev/.rat-excludes
@@ -103,4 +103,3 @@ org.apache.spark.scheduler.ExternalClusterManager
org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
spark-warehouse
structured-streaming/*
org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager
416 changes: 266 additions & 150 deletions docs/running-on-kubernetes.md

Large diffs are not rendered by default.

This file was deleted.

@@ -16,50 +16,24 @@
*/
package org.apache.spark.deploy.kubernetes

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream, InputStream, OutputStream}
import java.io.{File, FileInputStream, FileOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream, TarArchiveOutputStream}
import org.apache.commons.compress.utils.CharsetNames
import org.apache.commons.io.IOUtils
import scala.collection.mutable

import org.apache.spark.deploy.rest.kubernetes.v1.TarGzippedData
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ByteBufferOutputStream, Utils}
import org.apache.spark.util.Utils

private[spark] object CompressionUtils extends Logging {
  // Defaults from TarArchiveOutputStream
  private val BLOCK_SIZE = 10240
  private val RECORD_SIZE = 512
  private val ENCODING = CharsetNames.UTF_8

  /**
   * Compresses all of the given paths into a gzipped-tar archive, returning the compressed data in
   * memory as an instance of {@link TarGzippedData}. The files are taken without consideration to
   * their original folder structure, and are added to the tar archive in a flat hierarchy.
   * Directories are not allowed, and duplicate file names are de-duplicated by appending a numeric
   * suffix to the file name, before the file extension. For example, if paths a/b.txt and b/b.txt
   * were provided, then the files added to the tar archive would be b.txt and b-1.txt.
   * @param paths A list of file paths to be archived
   * @return An in-memory representation of the compressed data.
   */
  def createTarGzip(paths: Iterable[String]): TarGzippedData = {
    val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
      writeTarGzipToStream(raw, paths)
      raw
    }
    val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
    TarGzippedData(
      dataBase64 = compressedAsBase64,
      blockSize = BLOCK_SIZE,
      recordSize = RECORD_SIZE,
      encoding = ENCODING
    )
  }

  def writeTarGzipToStream(outputStream: OutputStream, paths: Iterable[String]): Unit = {
    Utils.tryWithResource(new GZIPOutputStream(outputStream)) { gzipping =>
      Utils.tryWithResource(new TarArchiveOutputStream(
@@ -98,50 +72,14 @@ private[spark] object CompressionUtils extends Logging {
}
}

  /**
   * Decompresses the provided tar archive to a directory.
   * @param compressedData In-memory representation of the compressed data, ideally created via
   *                       {@link createTarGzip}.
   * @param rootOutputDir Directory to write the output files to. All files from the tarball
   *                      are written here in a flat hierarchy.
   * @return List of file paths for each file that was unpacked from the archive.
   */
  def unpackAndWriteCompressedFiles(
      compressedData: TarGzippedData,
      rootOutputDir: File): Seq[String] = {
    val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
    if (!rootOutputDir.exists) {
      if (!rootOutputDir.mkdirs) {
        throw new IllegalStateException(s"Failed to create output directory for unpacking" +
          s" files at ${rootOutputDir.getAbsolutePath}")
      }
    } else if (rootOutputDir.isFile) {
      throw new IllegalArgumentException(s"Root dir for writing decompressed files: " +
        s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
    }
    Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
      unpackTarStreamToDirectory(
        compressedBytesStream,
        rootOutputDir,
        compressedData.blockSize,
        compressedData.recordSize,
        compressedData.encoding)
    }
  }

  def unpackTarStreamToDirectory(
      inputStream: InputStream,
      outputDir: File,
      blockSize: Int = BLOCK_SIZE,
      recordSize: Int = RECORD_SIZE,
      encoding: String = ENCODING): Seq[String] = {
  def unpackTarStreamToDirectory(inputStream: InputStream, outputDir: File): Seq[String] = {
    val paths = mutable.Buffer.empty[String]
    Utils.tryWithResource(new GZIPInputStream(inputStream)) { gzipped =>
      Utils.tryWithResource(new TarArchiveInputStream(
          gzipped,
          blockSize,
          recordSize,
          encoding)) { tarInputStream =>
          BLOCK_SIZE,
          RECORD_SIZE,
          ENCODING)) { tarInputStream =>
        var nextTarEntry = tarInputStream.getNextTarEntry
        while (nextTarEntry != null) {
          val outputFile = new File(outputDir, nextTarEntry.getName)
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v2.{ContainerNameEqualityPredicate, InitContainerUtil}
import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}

private[spark] trait SparkPodInitContainerBootstrap {
/**
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
@@ -212,77 +211,6 @@ package object config extends Logging {
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
    ConfigBuilder("spark.kubernetes.driverSubmissionTimeout")
      .doc("Time to wait for the driver process to start running before aborting its execution.")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefault(60L)

  private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE =
    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyStore")
      .doc("KeyStore file for the driver submission server listening on SSL. Can be pre-mounted" +
        " on the driver container or uploaded from the submitting client.")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE =
    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.trustStore")
      .doc("TrustStore containing certificates for communicating to the driver submission server" +
        " over SSL.")
      .stringConf
      .createOptional

  private[spark] val DRIVER_SUBMIT_SSL_ENABLED =
    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.enabled")
      .doc("Whether or not to use SSL when sending the application dependencies to the driver pod.")
      .booleanConf
      .createWithDefault(false)

  private[spark] val DRIVER_SUBMIT_SSL_KEY_PEM =
    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyPem")
      .doc("Key PEM file that the driver submission server will use when setting up TLS" +
        " connections. Can be pre-mounted on the driver pod's disk or uploaded from the" +
        " submitting client's machine.")
      .stringConf
      .createOptional

  private[spark] val DRIVER_SUBMIT_SSL_SERVER_CERT_PEM =
    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.serverCertPem")
      .doc("Certificate PEM file that is associated with the key PEM file" +
        " the submission server uses to set up TLS connections. Can be pre-mounted" +
        " on the driver pod's disk or uploaded from the submitting client's machine.")
      .stringConf
      .createOptional

  private[spark] val DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM =
    ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.clientCertPem")
      .doc("Certificate pem file that the submission client uses to connect to the submission" +
        " server over TLS. This should often be the same as the server certificate, but can be" +
        " different if the submission client will contact the driver through a proxy instead of" +
        " the driver service directly.")
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
    ConfigBuilder("spark.kubernetes.driver.service.name")
      .doc("Kubernetes service that exposes the driver pod for external access.")
      .internal()
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
    ConfigBuilder("spark.kubernetes.driver.submissionServerMemory")
      .doc("The amount of memory to allocate for the driver submission server.")
      .bytesConf(ByteUnit.MiB)
      .createWithDefaultString("256m")

  private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
    ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
      .doc("Whether to expose the driver Web UI port as a service NodePort. Turned off by default" +
        " because NodePort is a limited resource. Use alternatives if possible.")
      .booleanConf
      .createWithDefault(false)

  private[spark] val KUBERNETES_DRIVER_POD_NAME =
    ConfigBuilder("spark.kubernetes.driver.pod.name")
      .doc("Name of the driver pod.")
@@ -327,13 +255,6 @@ package object config extends Logging {
      .longConf
      .createWithDefault(1)

  private[spark] val DRIVER_SERVICE_MANAGER_TYPE =
    ConfigBuilder("spark.kubernetes.driver.serviceManagerType")
      .doc("A tag indicating which class to use for creating the Kubernetes service and" +
        " determining its URI for the submission client.")
      .stringConf
      .createWithDefault(NodePortUrisDriverServiceManager.TYPE)

  private[spark] val WAIT_FOR_APP_COMPLETION =
    ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
      .doc("In cluster mode, whether to wait for the application to finish before exiting the"
@@ -347,8 +268,7 @@
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("1s")

  // Spark dependency server for submission v2

  // Spark resource staging server.
  private[spark] val RESOURCE_STAGING_SERVER_PORT =
    ConfigBuilder("spark.kubernetes.resourceStagingServer.port")
      .doc("Port for the Kubernetes resource staging server to listen on.")
@@ -451,7 +371,7 @@ package object config extends Logging {
      .stringConf
      .createOptional

  // Driver and Init-Container parameters for submission v2
  // Driver and Init-Container parameters
  private[spark] val RESOURCE_STAGING_SERVER_URI =
    ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
      .doc("Base URI for the Spark resource staging server.")
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import java.io.File
import java.util.Collections
Expand All @@ -25,8 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl}
import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import java.io.File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import java.lang.Boolean

@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.kubernetes.v2.RetrofitClientFactoryImpl
import org.apache.spark.deploy.rest.kubernetes.RetrofitClientFactoryImpl
import org.apache.spark.util.Utils

/**
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import io.fabric8.kubernetes.api.model.{PodBuilder, Secret, SecretBuilder}
import scala.collection.JavaConverters._
@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.submit.DriverPodKubernetesCredentialsProvider

private[spark] trait DriverPodKubernetesCredentialsMounterProvider {
