[LIVY-588]: Full support for Spark on Kubernetes #167

Closed
55 changes: 55 additions & 0 deletions conf/livy.conf.template
@@ -157,6 +157,9 @@
# If the Livy Web UI should be included in the Livy Server. Enabled by default.
# livy.ui.enabled = true

# Used to build links to Spark History Server pages on Spark App completion (Kubernetes only)
# livy.ui.history-server-url = http://spark-history-server
Contributor:
Why do we need to rely on the SHS? It may also not be present/enabled... can we avoid this?

Contributor Author:
It may make sense to add an 'enabled' flag.

Contributor:
This is useful for setting the URL for completed applications to match the K8s ingress host, so that links from the Livy UI can redirect to the history server. Not sure about the behavior with YARN, but for K8s it seems useful to have it.

Contributor Author:
Of course; I guess the comment is about the behaviour when we don't have the HS set up. In that case we can:
A) not display the link
B) display a dummy link
I would prefer A here, but that would require introducing an additional config flag, e.g. history-server.enabled = true

Contributor:
Again, I'd do this in a separate PR, where we compare what is done for YARN and replicate the same for K8s; or, if that is not feasible, we can add links to the SHS, but then we might want to do that for YARN too. Hence I think a separate PR would be better for this feature, while we focus here only on basic support for K8s.
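
For illustration, option A from this thread could look like the following sketch. The `livy.ui.history-server.enabled` key and the helper are hypothetical and not part of this PR (only `livy.ui.history-server-url` is added here), and the `/history/<appId>` path assumes the standard SHS URL layout.

```scala
// Sketch only: option A from the discussion above. The "enabled" key is a
// hypothetical flag; this PR only defines livy.ui.history-server-url.
object HistoryLinkSketch {
  def historyLink(conf: Map[String, String], appId: String): Option[String] = {
    val enabled = conf.getOrElse("livy.ui.history-server.enabled", "false").toBoolean // hypothetical key
    val baseUrl = conf.getOrElse("livy.ui.history-server-url", "http://spark-history-server")
    // Option A: when no SHS is configured, render no link at all
    if (enabled) Some(s"$baseUrl/history/$appId") else None
  }

  def main(args: Array[String]): Unit = {
    println(historyLink(Map("livy.ui.history-server.enabled" -> "true"), "app-1234"))
    // Some(http://spark-history-server/history/app-1234)
  }
}
```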


# Whether to enable Livy server access control, if it is true then all the income requests will
# be checked if the requested user has permission.
# livy.server.access-control.enabled = false
@@ -185,3 +188,55 @@
# livy.server.auth.<custom>.class = <class of custom auth filter>
# livy.server.auth.<custom>.param.<foo1> = <bar1>
# livy.server.auth.<custom>.param.<foo2> = <bar2>

# Manual authentication to KubeApiserver (by default configured with Kubernetes ServiceAccount
# if deployed to Kubernetes cluster as a Pod)
# Kubernetes oauth token file path
# livy.server.kubernetes.oauthTokenFile =
# Kubernetes oauth token string value
# livy.server.kubernetes.oauthTokenValue =
# Kubernetes CA cert file path
# livy.server.kubernetes.caCertFile =
# Kubernetes client key file path
# livy.server.kubernetes.clientKeyFile =
# Kubernetes client cert file path
# livy.server.kubernetes.clientCertFile =
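
As a rough sketch, these authentication options could map onto the fabric8 kubernetes-client added in this PR's pom.xml as shown below; the PR's actual client setup (SparkKubernetesApp) is not part of this excerpt. When none of the options are set, fabric8's default configuration picks up the kubeconfig or the in-cluster ServiceAccount.

```scala
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}

// Sketch only; not the implementation used by this PR.
def buildKubernetesClient(conf: Map[String, String]): KubernetesClient = {
  def opt(key: String): Option[String] = conf.get(key).filter(_.nonEmpty)

  val builder = new ConfigBuilder() // auto-configures from kubeconfig / ServiceAccount by default
  opt("livy.server.kubernetes.oauthTokenValue").foreach(t => builder.withOauthToken(t))
  opt("livy.server.kubernetes.oauthTokenFile").foreach { path =>
    builder.withOauthToken(scala.io.Source.fromFile(path).mkString.trim)
  }
  opt("livy.server.kubernetes.caCertFile").foreach(f => builder.withCaCertFile(f))
  opt("livy.server.kubernetes.clientKeyFile").foreach(f => builder.withClientKeyFile(f))
  opt("livy.server.kubernetes.clientCertFile").foreach(f => builder.withClientCertFile(f))
  new DefaultKubernetesClient(builder.build())
}
```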

# If Livy can't find the Kubernetes app within this time, consider it lost.
# livy.server.kubernetes.app-lookup-timeout = 600s
# When the cluster is busy, we may fail to launch the Kubernetes app within app-lookup-timeout,
# which would cause session leakage, so we need to check for session leakage.
# How long to check livy session leakage
# livy.server.kubernetes.app-leakage.check-timeout = 600s
# How often to check livy session leakage
# livy.server.kubernetes.app-leakage.check-interval = 60s

# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description
# details, routes, etc...)
# livy.server.kubernetes.poll-interval = 15s
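
A rough sketch of the lookup/poll semantics these options describe; the PR's actual monitoring logic lives in SparkKubernetesApp, which is not shown in this excerpt. The durations are the defaults from this template.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch only: poll every poll-interval; if the driver pod never shows up
// within app-lookup-timeout, consider the app lost and give up.
object KubernetesAppLookupSketch {
  val pollIntervalSec = 15L    // livy.server.kubernetes.poll-interval = 15s
  val lookupTimeoutSec = 600L  // livy.server.kubernetes.app-lookup-timeout = 600s

  def monitor(findDriverPod: () => Option[String])(onLost: () => Unit): Unit = {
    val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(lookupTimeoutSec)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = findDriverPod() match {
        case Some(_) => // pod found: refresh state, logs, routes, ...
        case None if System.nanoTime() > deadline =>
          onLost()             // never appeared within app-lookup-timeout: consider it lost
          scheduler.shutdown()
        case None => // not visible yet: keep polling until the deadline
      }
    }, 0L, pollIntervalSec, TimeUnit.SECONDS)
  }
}
```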

# Weather to create Kubernetes Nginx Ingress for Spark UI. If set to true, configure the desired
Reviewer:
typo: Whether, not Weather.

# options below
# livy.server.kubernetes.ingress.create = false
# Kubernetes Nginx Ingress protocol. If set to https, refer to the Ingress TLS section below
# livy.server.kubernetes.ingress.protocol = http
# Kubernetes Nginx Ingress host. Be sure to set it to the FQDN of your Nginx Ingress Controller
# proxy server
# livy.server.kubernetes.ingress.host = localhost
# Kubernetes secret name for Nginx Ingress TLS. Is omitted if 'livy.server.kubernetes.ingress.protocol'
# is not https
# livy.server.kubernetes.ingress.tls.secretName = spark-cluster-tls
# Kubernetes Nginx Ingress additional configuration snippet for specific config options
# livy.server.kubernetes.ingress.additionalConfSnippet =
# Kubernetes Nginx Ingress additional annotations for specific config options, eg. for configuring
# basic auth of external oauth2 proxy. Format: annotation1=value1;annotation2=value2;...
# livy.server.kubernetes.ingress.additionalAnnotations =
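
For clarity, a small sketch of parsing the "key=value;key=value" annotation format described above; the PR's actual parsing code is not included in this excerpt, and the nginx annotation in the example is illustrative.

```scala
// Sketch only.
def parseAnnotations(raw: String): Map[String, String] =
  raw.split(";").map(_.trim).filter(_.nonEmpty).flatMap { pair =>
    pair.split("=", 2) match {
      case Array(k, v) => Some(k.trim -> v.trim)
      case _           => None // skip malformed entries
    }
  }.toMap

// parseAnnotations("nginx.ingress.kubernetes.io/auth-type=basic;custom/note=demo")
//   == Map("nginx.ingress.kubernetes.io/auth-type" -> "basic", "custom/note" -> "demo")
```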

# Set to true to enable Grafana Loki integration and configure options below
# livy.server.kubernetes.grafana.loki.enabled = false
# Grafana UI root endpoint to build links based on
# livy.server.kubernetes.grafana.url = http://localhost:3000
# Grafana Datasource name for Loki
# livy.server.kubernetes.grafana.loki.datasource = loki
# Time range (from now back into the past) to query logs for
# livy.server.kubernetes.grafana.timeRange = 6h
15 changes: 15 additions & 0 deletions pom.xml
@@ -84,6 +84,7 @@
<spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
<spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>4.6.4</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.9</commons-codec.version>
<httpclient.version>4.5.3</httpclient.version>
@@ -316,6 +317,18 @@
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@@ -1075,6 +1088,8 @@
https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
</spark.bin.download.url>
<spark.bin.name>spark-3.0.0-bin-hadoop2.7</spark.bin.name>
<kubernetes.client.version>4.9.2</kubernetes.client.version>
<jackson.version>2.10.3</jackson.version>
</properties>
</profile>

8 changes: 8 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -169,6 +169,14 @@ private void initializeServer() throws Exception {
// on the cluster, it would be tricky to solve that problem in a generic way.
livyConf.set(RPC_SERVER_ADDRESS, null);

// If we are running on Kubernetes, get RPC_SERVER_ADDRESS from the "spark.driver.host" option.
// This option is set by org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
// line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
if (conf.get("spark.master").startsWith("k8s")) {
livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
}


if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
// Test flag is turned on so we will just infinite loop here. It should cause
// timeout and we should still see yarn application being cleaned up.
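
To make the DriverServiceFeatureStep line quoted earlier in this hunk concrete, the address the driver advertises on Kubernetes looks like the following; the service name and namespace are made-up examples.

```scala
// Illustration only; values are hypothetical.
val resolvedServiceName = "livy-session-0-1a2b3c-driver-svc"
val namespace = "spark-jobs"
val driverHost = s"$resolvedServiceName.$namespace.svc"
// => "livy-session-0-1a2b3c-driver-svc.spark-jobs.svc"
// RSCDriver publishes this as RPC_SERVER_ADDRESS so the Livy server can reach
// the driver through the headless service DNS name.
```
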
5 changes: 5 additions & 0 deletions server/pom.xml
@@ -151,6 +151,11 @@
<version>${zookeeper.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
@@ -41,6 +41,12 @@ td .progress {
margin: 0;
}

.with-scroll-bar {
display: block;
overflow-y: scroll;
max-height: 200px;
}

#session-summary {
margin: 20px 0;
}
@@ -26,7 +26,7 @@ function loadSessionsTable(sessions) {
tdWrap(session.proxyUser) +
tdWrap(session.kind) +
tdWrap(session.state) +
tdWrap(logLinks(session, "session")) +
tdWrapWithClass(logLinks(session, "session"), "with-scroll-bar") +
"</tr>"
);
});
@@ -42,7 +42,7 @@ function loadBatchesTable(sessions) {
tdWrap(session.owner) +
tdWrap(session.proxyUser) +
tdWrap(session.state) +
tdWrap(logLinks(session, "batch")) +
tdWrapWithClass(logLinks(session, "batch"), "with-scroll-bar") +
"</tr>"
);
});
@@ -52,10 +52,23 @@ function driverLogLink(session) {
}
}

function executorsLogLinks(session) {
var executorLogUrls = session.appInfo.executorLogUrls;
if (executorLogUrls != null) {
return executorLogUrls.split(";").map(function (pair) {
var nameAndLink = pair.split("#");
return divWrap(anchorLink(nameAndLink[1], nameAndLink[0]));
}).join("");
} else {
return "";
}
}

function logLinks(session, kind) {
var sessionLog = divWrap(uiLink(kind + "/" + session.id + "/log", "session"));
var driverLog = divWrap(driverLogLink(session));
return sessionLog + driverLog;
var executorsLogs = executorsLogLinks(session);
return sessionLog + driverLog + executorsLogs;
}

function appIdLink(session) {
@@ -75,6 +88,18 @@ function tdWrap(val) {
return "<td>" + inner + "</td>";
}

function tdWrapWithClass(val, cl) {
var inner = "";
if (val != null) {
inner = val;
}
var clVal = "";
if (cl != null) {
clVal = " class=\"" + cl + "\"";
}
return "<td" + clVal + ">" + inner + "</td>";
}

function preWrap(inner) {
return "<pre>" + escapeHtml(inner) + "</pre>";
}
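
The executorsLogLinks helper above expects appInfo.executorLogUrls to arrive as "name#url" pairs joined with ";". A sketch of how the server side might serialize that value; the actual producer (presumably SparkKubernetesApp) is not part of this excerpt.

```scala
// Sketch only: serialize executor log links into the format the UI parses.
def serializeExecutorLogUrls(urls: Map[String, String]): String =
  urls.map { case (name, url) => s"$name#$url" }.mkString(";")

// serializeExecutorLogUrls(Map(
//   "executor-1" -> "http://grafana.example.com/d/exec1",
//   "executor-2" -> "http://grafana.example.com/d/exec2"))
// == "executor-1#http://grafana.example.com/d/exec1;executor-2#http://grafana.example.com/d/exec2"
```
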
@@ -23,6 +23,18 @@ function sumWrap(name, val) {
}
}

function sumWrapWithClass(name, val, cl) {
var clVal = "";
if (cl != null) {
clVal = " class=\"" + cl + "\"";
}
if (val != null) {
return "<li" + clVal + "><strong>" + name + ": </strong>" + val + "</li>";
} else {
return "";
}
}

function formatError(output) {
var errStr = output.evalue + "\n";
var trace = output.traceback;
@@ -93,7 +105,7 @@ function appendSummary(session) {
sumWrap("Proxy User", session.proxyUser) +
sumWrap("Session Kind", session.kind) +
sumWrap("State", session.state) +
sumWrap("Logs", logLinks(session, "session")) +
sumWrapWithClass("Logs", logLinks(session, "session"), "with-scroll-bar") +
"</ul>"
);
}
50 changes: 50 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -63,6 +63,7 @@
val SERVER_BASE_PATH = Entry("livy.ui.basePath", "")

val UI_ENABLED = Entry("livy.ui.enabled", true)
val UI_HISTORY_SERVER_URL = Entry("livy.ui.history-server-url", "http://spark-history-server")

val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072)
val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072)
@@ -249,6 +250,52 @@
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")

// Kubernetes oauth token file path.
val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "")
// Kubernetes oauth token string value.
val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "")
// Kubernetes CA cert file path.
val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
// Kubernetes client key file path.
val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "")
// Kubernetes client cert file path.
val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "")

// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

// How long to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
// How often to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")

// Weather to create Kubernetes Nginx Ingress for Spark UI.
val KUBERNETES_INGRESS_CREATE = Entry("livy.server.kubernetes.ingress.create", false)
// Kubernetes Nginx Ingress protocol.
val KUBERNETES_INGRESS_PROTOCOL = Entry("livy.server.kubernetes.ingress.protocol", "http")
// Kubernetes Nginx Ingress host.
val KUBERNETES_INGRESS_HOST = Entry("livy.server.kubernetes.ingress.host", "localhost")
// Kubernetes Nginx Ingress additional configuration snippet.
val KUBERNETES_INGRESS_ADDITIONAL_CONF_SNIPPET =
Entry("livy.server.kubernetes.ingress.additionalConfSnippet", "")
// Kubernetes Nginx Ingress additional annotations: key1=value1;key2=value2;... .
val KUBERNETES_INGRESS_ADDITIONAL_ANNOTATIONS =
Entry("livy.server.kubernetes.ingress.additionalAnnotations", "")
// Kubernetes secret name for Nginx Ingress TLS.
// Is omitted if 'livy.server.kubernetes.ingress.protocol' value doesn't end with 's'
val KUBERNETES_INGRESS_TLS_SECRET_NAME =
Entry("livy.server.kubernetes.ingress.tls.secretName", "spark-cluster-tls")

val KUBERNETES_GRAFANA_LOKI_ENABLED = Entry("livy.server.kubernetes.grafana.loki.enabled", false)
val KUBERNETES_GRAFANA_URL = Entry("livy.server.kubernetes.grafana.url", "http://localhost:3000")
val KUBERNETES_GRAFANA_LOKI_DATASOURCE =
Entry("livy.server.kubernetes.grafana.loki.datasource", "loki")
val KUBERNETES_GRAFANA_TIME_RANGE = Entry("livy.server.kubernetes.grafana.timeRange", "6h")

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
@@ -360,6 +407,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")

/** Return true if spark master starts with k8s. */
Contributor:
nit:

Suggested change:
/** Return true if spark master starts with k8s. */
/** Returns true if spark master starts with k8s. */

Contributor Author:
Everything else is commented with "Return". Consistency?

def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")

/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

15 changes: 9 additions & 6 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -19,8 +19,8 @@ package org.apache.livy.server

import java.io.{BufferedInputStream, InputStream}
import java.net.InetAddress
import java.util.concurrent._
import java.util.EnumSet
import java.util.concurrent._
import javax.servlet._

import scala.collection.JavaConverters._
@@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager}
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
import org.apache.livy.utils.SparkYarnApp

class LivyServer extends Logging {

@@ -142,10 +142,13 @@ class LivyServer extends Logging {

testRecovery(livyConf)

// Initialize YarnClient ASAP to save time.
// Initialize YarnClient/KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) {
SparkYarnApp.init(livyConf)
Future { SparkYarnApp.yarnClient }
} else if (livyConf.isRunningOnKubernetes()) {
SparkKubernetesApp.init(livyConf)
Future { SparkKubernetesApp.kubernetesClient }
}

if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
@@ -415,10 +418,10 @@
}

private[livy] def testRecovery(livyConf: LivyConf): Unit = {
if (!livyConf.isRunningOnYarn()) {
// If recovery is turned on but we are not running on YARN, quit.
if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
// If recovery is turned on but we are not running on YARN or Kubernetes, quit.
require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
"Session recovery requires YARN.")
s"Session recovery requires YARN or Kubernetes.")
}
}
}
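
SparkKubernetesApp itself is not included in this excerpt; a plausible shape for the init/lazy-client pair used in the startup code above, mirroring SparkYarnApp, might be the following sketch.

```scala
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}
import org.apache.livy.LivyConf

// Sketch only; not the PR's actual SparkKubernetesApp.
object SparkKubernetesAppSketch {
  private var livyConf: LivyConf = _

  // Called once at server start so later lazy initialization has the config.
  def init(conf: LivyConf): Unit = { livyConf = conf }

  // Built on first access; LivyServer touches it inside a Future to warm it up early.
  lazy val kubernetesClient: KubernetesClient = {
    require(livyConf != null, "init() must be called before the client is first used")
    // The auth options from livy.conf (oauthTokenFile, caCertFile, ...) would be
    // applied here via fabric8's ConfigBuilder; see the sketch near those options.
    new DefaultKubernetesClient()
  }
}
```
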
@@ -402,7 +402,9 @@ class InteractiveSession(
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))

if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
if (!livyConf.isRunningOnKubernetes()) {
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
None
@@ -481,6 +483,7 @@
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
} catch {
case _: Exception =>
app.foreach {