[LIVY-702]: Submit Spark apps to Kubernetes #249

Open: wants to merge 17 commits into base: master (changes from all commits shown)
36 changes: 34 additions & 2 deletions conf/livy.conf.template
@@ -148,12 +148,29 @@
# cause session leakage, so we need to check session leakage.
# How long to check livy session leakage
# livy.server.yarn.app-leakage.check-timeout = 600s
# how often to check livy session leakage
# How often to check livy session leakage
# livy.server.yarn.app-leakage.check-interval = 60s

# How often Livy polls YARN to refresh YARN app state.
# livy.server.yarn.poll-interval = 5s
#

# If Livy can't find the Kubernetes app within this time, consider it lost
# livy.server.kubernetes.app-lookup-timeout = 600s
# When the cluster is busy, we may fail to launch the Kubernetes app within app-lookup-timeout,
# which would cause session leakage, so we need to check for leaked sessions
# How long to check livy session leakage
# livy.server.kubernetes.app-leakage.check-timeout = 600s
# How often to check livy session leakage
# livy.server.kubernetes.app-leakage.check-interval = 60s

# How often Livy polls KubeApiServer to refresh KubernetesApp state (Pods state, logs, description
# details, routes, etc.)
# livy.server.kubernetes.poll-interval = 15s

# Comma-separated list of the Kubernetes namespaces to allow for applications creation.
# All namespaces are allowed if empty
# livy.server.kubernetes.allowedNamespaces =

# Days to keep Livy server request logs.
# livy.server.request-log-retain.days = 5

@@ -189,6 +206,21 @@
# livy.server.auth.<custom>.param.<foo1> = <bar1>
# livy.server.auth.<custom>.param.<foo2> = <bar2>

# Manual authentication to the Kubernetes API server (by default configured via the Kubernetes
# ServiceAccount when Livy is deployed to a Kubernetes cluster as a Pod)
# Kubernetes oauth token file path
# livy.server.kubernetes.oauthTokenFile =
# Kubernetes oauth token string value
# livy.server.kubernetes.oauthTokenValue =
# Kubernetes CA cert file path
# livy.server.kubernetes.caCertFile =
# Kubernetes client key file path
# livy.server.kubernetes.clientKeyFile =
# Kubernetes client cert file path
# livy.server.kubernetes.clientCertFile =
# Kubernetes client default namespace
# livy.server.kubernetes.defaultNamespace =

# Enable to allow custom classpath by proxy user in cluster mode
# The below configuration parameter is disabled by default.
# livy.server.session.allow-custom-classpath = true
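
For reference, a minimal sketch of how the manual-authentication options above might be wired into the fabric8 kubernetes-client this PR adds as a dependency. This is not the PR's actual SparkKubernetesApp code; the helper name and the string-key lookups are assumptions for illustration. Unset options are skipped, so the client falls back to its in-cluster ServiceAccount / kubeconfig auto-configuration:

    import java.nio.file.{Files, Paths}

    import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}

    import org.apache.livy.LivyConf

    // Hypothetical helper: builds a fabric8 client from the livy.server.kubernetes.* options.
    def buildKubernetesClient(livyConf: LivyConf): KubernetesClient = {
      def opt(key: String): Option[String] =
        Option(livyConf.get(key)).map(_.trim).filter(_.nonEmpty)

      val builder = new ConfigBuilder()
      opt("livy.server.kubernetes.oauthTokenValue").foreach(t => builder.withOauthToken(t))
      opt("livy.server.kubernetes.oauthTokenFile")
        .map(f => new String(Files.readAllBytes(Paths.get(f)), "UTF-8").trim)
        .foreach(t => builder.withOauthToken(t))
      opt("livy.server.kubernetes.caCertFile").foreach(f => builder.withCaCertFile(f))
      opt("livy.server.kubernetes.clientKeyFile").foreach(f => builder.withClientKeyFile(f))
      opt("livy.server.kubernetes.clientCertFile").foreach(f => builder.withClientCertFile(f))
      opt("livy.server.kubernetes.defaultNamespace").foreach(ns => builder.withNamespace(ns))
      new DefaultKubernetesClient(builder.build())
    }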
15 changes: 15 additions & 0 deletions pom.xml
@@ -86,6 +86,7 @@
<spark.scala-2.11.version>2.4.5</spark.scala-2.11.version>
<spark.scala-2.12.version>2.4.5</spark.scala-2.12.version>
<spark.version>${spark.scala-2.11.version}</spark.version>
<kubernetes.client.version>4.6.4</kubernetes.client.version>
<hive.version>3.0.0</hive.version>
<commons-codec.version>1.9</commons-codec.version>
<httpclient.version>4.5.13</httpclient.version>
@@ -319,6 +320,18 @@
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@@ -1165,6 +1178,8 @@
https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
</spark.bin.download.url>
<spark.bin.name>spark-3.0.0-bin-hadoop2.7</spark.bin.name>
<kubernetes.client.version>4.9.2</kubernetes.client.version>
<jackson.version>2.10.3</jackson.version>
</properties>
</profile>

7 changes: 7 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
@@ -25,6 +25,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import javax.security.sasl.Sasl;

@@ -151,6 +152,12 @@ public String findLocalAddress() throws IOException {
return address.getCanonicalHostName();
}

public boolean isRunningOnKubernetes() {
return Optional.ofNullable(get("livy.spark.master"))
.filter(s -> s.startsWith("k8s"))
.isPresent();
}

Comment on lines +155 to +160

This function may always return false, since "livy.spark.master" will not be picked up by RSCConf.

@idzikovsky (Contributor) commented on Mar 15, 2023:

Yeah, it seems like that's what I've faced in #249 (comment).

However, it doesn't seem to affect functionality, as this function is used while setting RPC_SERVER_ADDRESS here:
https://github.com/apache/incubator-livy/pull/249/files/b87c0cebb65ce7f34e6b4b6b738095be6254cf69#diff-43114318c4b009c2404f7eb326a84c184fb1501a3237c49a771df851d0f6f328R172-R178

And the value of RPC_SERVER_ADDRESS hasn't been used anyway since Livy 0.7, for the reasons I explained in #388.

private static final Map<String, DeprecatedConf> configsWithAlternatives
= Collections.unmodifiableMap(new HashMap<String, DeprecatedConf>() {{
put(RSCConf.Entry.CLIENT_IN_PROCESS.key, DepConf.CLIENT_IN_PROCESS);
7 changes: 7 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/driver/RSCDriver.java
@@ -169,6 +169,13 @@ private void initializeServer() throws Exception {
// on the cluster, it would be tricky to solve that problem in a generic way.
livyConf.set(RPC_SERVER_ADDRESS, null);

// If we are running on Kubernetes, set RPC_SERVER_ADDRESS from "spark.driver.host" option,
// which is set in class org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep:
// line 61: val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace()}.svc"
if (livyConf.isRunningOnKubernetes()) {
livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
}

Contributor commented:

For some reason this version does not work for me the way the same piece from #167 did.

I got the same exception as here: #167 (comment)

I'm not sure why, but it seems to be because the livy.spark.master property is not set inside the Spark driver of the Livy session in my environment.
While the same piece from #167 works fine:

    if (conf.get("spark.master").startsWith("k8s")) {
      livyConf.set(RPC_SERVER_ADDRESS, conf.get("spark.driver.host"));
    }

But I'm not using the code from your branch; I'm backporting your patch to our Livy build, which is based on an older Livy version, so maybe that's the cause.

On the other hand, during a quick lookup I haven't found any code that passes the livy.spark.master property through to the driver, so I see no reason why it should work here.
If I get a chance to test this issue on a build from this PR, I'll share what I get.

Contributor (PR author) replied:

Might be related to the backporting indeed. Will be happy to help once you get more debug info.

Contributor replied:

I've tried using the latest Livy with your patches and the issue did not appear, so it seems I hit that problem because I backported something incorrectly.
Anyway, thank you for the help!

if (livyConf.getBoolean(TEST_STUCK_START_DRIVER)) {
// Test flag is turned on so we will just infinite loop here. It should cause
// timeout and we should still see yarn application being cleaned up.
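
For illustration, a standalone Scala sketch of the master-detection fallback this thread converges on (the actual RSCConf/RSCDriver code is Java; the helper below is hypothetical and not part of the PR):

    // Treat the driver as running on Kubernetes if either Spark's own master setting or the
    // Livy-propagated one starts with "k8s", since livy.spark.master may not reach the driver.
    def runningOnKubernetes(conf: Map[String, String]): Boolean =
      Seq("spark.master", "livy.spark.master")
        .flatMap(conf.get)
        .exists(_.startsWith("k8s"))

    // e.g. runningOnKubernetes(Map("spark.master" -> "k8s://https://kubernetes.default.svc:443"))
    // returns true, while runningOnKubernetes(Map.empty) returns false.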
5 changes: 5 additions & 0 deletions server/pom.xml
@@ -84,6 +84,11 @@
<artifactId>metrics-healthchecks</artifactId>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
</dependency>

<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
38 changes: 38 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -251,6 +251,35 @@
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")

// Kubernetes oauth token file path.
val KUBERNETES_OAUTH_TOKEN_FILE = Entry("livy.server.kubernetes.oauthTokenFile", "")
// Kubernetes oauth token string value.
val KUBERNETES_OAUTH_TOKEN_VALUE = Entry("livy.server.kubernetes.oauthTokenValue", "")
// Kubernetes CA cert file path.
val KUBERNETES_CA_CERT_FILE = Entry("livy.server.kubernetes.caCertFile", "")
// Kubernetes client key file path.
val KUBERNETES_CLIENT_KEY_FILE = Entry("livy.server.kubernetes.clientKeyFile", "")
// Kubernetes client cert file path.
val KUBERNETES_CLIENT_CERT_FILE = Entry("livy.server.kubernetes.clientCertFile", "")
// Kubernetes client default namespace.
val KUBERNETES_DEFAULT_NAMESPACE = Entry("livy.server.kubernetes.defaultNamespace", "")

// Comma-separated list of the Kubernetes namespaces to allow for applications creation.
// All namespaces are allowed if empty.
val KUBERNETES_ALLOWED_NAMESPACES = Entry("livy.server.kubernetes.allowedNamespaces", null)

// If Livy can't find the Kubernetes app within this time, consider it lost.
val KUBERNETES_APP_LOOKUP_TIMEOUT = Entry("livy.server.kubernetes.app-lookup-timeout", "600s")
// How often Livy polls Kubernetes to refresh Kubernetes app state.
val KUBERNETES_POLL_INTERVAL = Entry("livy.server.kubernetes.poll-interval", "15s")

// How long to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_TIMEOUT =
Entry("livy.server.kubernetes.app-leakage.check-timeout", "600s")
// How often to check livy session leakage.
val KUBERNETES_APP_LEAKAGE_CHECK_INTERVAL =
Entry("livy.server.kubernetes.app-leakage.check-interval", "60s")

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
Expand Down Expand Up @@ -364,6 +393,15 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")

/** Return true if spark master starts with k8s. */
def isRunningOnKubernetes(): Boolean = sparkMaster().startsWith("k8s")

/** Return the set of allowed Kubernetes namespaces, or an empty set (all allowed) if not set. */
def getKubernetesNamespaces(): Set[String] =
Option(get(KUBERNETES_ALLOWED_NAMESPACES)).filterNot(_.isEmpty)
.map(_.split(",").toSet)
.getOrElse(Set.empty)

/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

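
A quick sketch of how the new getKubernetesNamespaces() helper resolves the allowed-namespaces option (values are illustrative; this assumes the set(key, value) accessor that ClientConf exposes elsewhere in Livy):

    import org.apache.livy.LivyConf

    // LivyConf(false) skips loading defaults from livy.conf, so only explicit values apply.
    val conf = new LivyConf(false)
    conf.set(LivyConf.KUBERNETES_ALLOWED_NAMESPACES.key, "spark,livy")
    assert(conf.getKubernetesNamespaces() == Set("spark", "livy"))

    // Unset or empty means "no restriction": an empty set is returned and any namespace is allowed.
    assert(new LivyConf(false).getKubernetesNamespaces() == Set.empty[String])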
13 changes: 8 additions & 5 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -43,8 +43,8 @@ import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManag
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
import org.apache.livy.utils.{SparkKubernetesApp, SparkYarnApp}
import org.apache.livy.utils.LivySparkUtils._
import org.apache.livy.utils.SparkYarnApp

class LivyServer extends Logging {

@@ -142,10 +142,13 @@

testRecovery(livyConf)

// Initialize YarnClient ASAP to save time.
// Initialize YarnClient or KubernetesClient ASAP to save time.
if (livyConf.isRunningOnYarn()) {
SparkYarnApp.init(livyConf)
Future { SparkYarnApp.yarnClient }
} else if (livyConf.isRunningOnKubernetes()) {
SparkKubernetesApp.init(livyConf)
Future { SparkKubernetesApp.kubernetesClient }
}

if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
@@ -415,10 +418,10 @@
}

private[livy] def testRecovery(livyConf: LivyConf): Unit = {
if (!livyConf.isRunningOnYarn()) {
// If recovery is turned on but we are not running on YARN, quit.
if (!livyConf.isRunningOnYarn() && !livyConf.isRunningOnKubernetes()) {
// If recovery is turned on but we are not running on YARN or Kubernetes, quit.
require(livyConf.get(LivyConf.RECOVERY_MODE) == SESSION_RECOVERY_MODE_OFF,
"Session recovery requires YARN.")
"Session recovery requires YARN or Kubernetes.")
}
}
}
server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
@@ -458,7 +458,9 @@ class InteractiveSession(
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))

if (livyConf.isRunningOnYarn() || driverProcess.isDefined) {
if (livyConf.isRunningOnYarn() || driverProcess.isDefined
// Create SparkKubernetesApp anyway to recover app monitoring on Livy server restart
|| livyConf.isRunningOnKubernetes()) {
Some(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
Member commented:

If this is just the same line as line 404, why is it in its own else if block? Wouldn't it make more sense to add || livyConf.isRunningOnKubernetes() to the if on line 403?

@jahstreet (Contributor, PR author) replied on Jan 15, 2020:

Ahh, nice catch, agree. EDIT: resolved

} else {
None
Expand Down Expand Up @@ -540,6 +542,8 @@ class InteractiveSession(
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
// We need to call #kill here explicitly to delete Interactive pods from the cluster
if (livyConf.isRunningOnKubernetes()) app.foreach(_.kill())
} catch {
case _: Exception =>
app.foreach {
30 changes: 28 additions & 2 deletions server/src/main/scala/org/apache/livy/utils/SparkApp.scala
@@ -71,13 +71,36 @@
sparkConf ++ Map(
SPARK_YARN_TAG_KEY -> mergedYarnTags,
"spark.yarn.submit.waitAppCompletion" -> "false")
} else if (livyConf.isRunningOnKubernetes()) {

// We don't allow submitting applications to namespaces other than the configured ones
val kubernetesNamespaces = livyConf.getKubernetesNamespaces()
val targetNamespace = sparkConf.getOrElse("spark.kubernetes.namespace",
SparkKubernetesApp.kubernetesClient.getDefaultNamespace)
if (kubernetesNamespaces.nonEmpty && !kubernetesNamespaces.contains(targetNamespace)) {
throw new IllegalArgumentException(
s"Requested namespace $targetNamespace doesn't match the configured: " +
kubernetesNamespaces.mkString(", "))
}

import KubernetesConstants._
sparkConf ++ Map(
"spark.kubernetes.namespace" -> targetNamespace,
// Mark Spark pods with the unique appTag label to be used for their discovery
s"spark.kubernetes.driver.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
s"spark.kubernetes.executor.label.$SPARK_APP_TAG_LABEL" -> uniqueAppTag,
// Mark Spark pods as created by Livy for the additional tracing
s"spark.kubernetes.driver.label.$CREATED_BY_ANNOTATION" -> "livy",
s"spark.kubernetes.executor.label.$CREATED_BY_ANNOTATION" -> "livy",
"spark.kubernetes.submission.waitAppCompletion" -> "false")
} else {
sparkConf
}
}

/**
* Return a SparkApp object to control the underlying Spark application via YARN or spark-submit.
* Return a SparkApp object to control the underlying Spark application via YARN, Kubernetes
* or spark-submit.
*
* @param uniqueAppTag A tag that can uniquely identify the application.
*/
@@ -89,8 +112,11 @@
listener: Option[SparkAppListener]): SparkApp = {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else if (livyConf.isRunningOnKubernetes()) {
new SparkKubernetesApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
require(process.isDefined, "process must not be None when Livy master is not YARN.")
require(process.isDefined, "process must not be None when Livy master is not YARN or " +
"Kubernetes.")
new SparkProcApp(process.get, listener)
}
}
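
As a standalone illustration of the namespace guard added to SparkApp above (the helper below is hypothetical, not the PR's code):

    // Resolves the target namespace and rejects a submission outside the allowed set;
    // an empty allowed set means every namespace is accepted, mirroring the check above.
    def resolveNamespace(requested: Option[String], default: String, allowed: Set[String]): String = {
      val target = requested.getOrElse(default)
      if (allowed.nonEmpty && !allowed.contains(target)) {
        throw new IllegalArgumentException(
          s"Requested namespace $target doesn't match the configured: ${allowed.mkString(", ")}")
      }
      target
    }

    // resolveNamespace(Some("team-a"), "default", Set("spark", "livy")) throws,
    // while resolveNamespace(None, "default", Set.empty) returns "default".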