Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ZEPPELIN-6016] Rewrite and enable Livy integration tests #4743

Merged
merged 15 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions .github/workflows/core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,11 @@ jobs:
rm -rf spark/interpreter/metastore_db
./mvnw verify -pl spark-submit,spark/interpreter -am -Dtest=org/apache/zeppelin/spark/* -Pspark-3.5 -Pspark-scala-2.13 -Phadoop3 -Pintegration -DfailIfNoTests=false ${MAVEN_ARGS}

livy-0-7-with-spark-3-4-1-under-python3:
# The version combination is based on the facts:
# 1. official Livy 0.8 binary tarball is built against Spark 2.4
# 2. official Spark 2.4 binary tarball is built against Scala 2.11
# 3. Spark 2.4 support Python 2.7, 3.4 to 3.7
livy-0-8-with-spark-2-4-under-python37:
runs-on: ubuntu-20.04
steps:
- name: Checkout
Expand All @@ -449,14 +453,14 @@ jobs:
- name: install environment
run: |
./mvnw install -DskipTests -pl livy -am ${MAVEN_ARGS}
./testing/downloadSpark.sh "3.4.1" "3"
./testing/downloadLivy.sh "0.7.1-incubating"
- name: Setup conda environment with python 3.9 and R
./testing/downloadSpark.sh "2.4.8" "2.7"
./testing/downloadLivy.sh "0.8.0-incubating" "2.11"
- name: Setup conda environment with python 3.7 and R
uses: conda-incubator/setup-miniconda@v2
with:
activate-environment: python_3_with_R
environment-file: testing/env_python_3_with_R.yml
python-version: 3.9
activate-environment: python_37_with_R
environment-file: testing/env_python_3.7_with_R.yml
python-version: 3.7
miniforge-variant: Mambaforge
channels: conda-forge,defaults
channel-priority: true
Expand All @@ -466,7 +470,10 @@ jobs:
run: |
R -e "IRkernel::installspec()"
- name: run tests
run: ./mvnw verify -pl livy -am ${MAVEN_ARGS}
run: |
export SPARK_HOME=$PWD/spark-2.4.8-bin-hadoop2.7
export LIVY_HOME=$PWD/apache-livy-0.8.0-incubating_2.11-bin
./mvnw verify -pl livy -am ${MAVEN_ARGS}

default-build:
runs-on: ubuntu-20.04
Expand Down
9 changes: 4 additions & 5 deletions livy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
Livy interpreter for Apache Zeppelin

# Prerequisities
You can follow the instructions at [Livy Quick Start](http://livy.io/quickstart.html) to set up livy.
You can follow the instructions at [Livy Get Started](https://livy.apache.org/get-started/) to set up livy.

# Run Integration Tests
You can add integration test to [LivyInterpreter.java](https://github.com/apache/zeppelin/blob/master/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java) and run the integration test either via the CI environment or locally. You need to download livy-0.2 and spark-1.5.2 to local, then use the following script to run the integration test.
You can add integration test to [LivyInterpreter.java](https://github.com/apache/zeppelin/blob/master/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java) and run the integration test either via the CI environment or locally. You need to download livy-0.8 and spark-2.4.8 to local, then use the following script to run the integration test.

```bash
#!/usr/bin/env bash
export LIVY_HOME=<path_of_livy_0.2.0>
export SPARK_HOME=<path_of_spark-1.5.2>
export LIVY_HOME=<path_of_livy_0.8.0>
export SPARK_HOME=<path_of_spark-2.4.8>
./mvnw clean verify -pl livy -DfailIfNoTests=false
```
116 changes: 4 additions & 112 deletions livy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,107 +37,9 @@
<commons.exec.version>1.3</commons.exec.version>
<spring.web.version>4.3.0.RELEASE</spring.web.version>
<spring.security.kerberosclient>1.0.1.RELEASE</spring.security.kerberosclient>

<!--test library versions-->
<livy.version>0.7.1-incubating</livy.version>
<spark.version>2.4.8</spark.version>
<hadoop.version>${hadoop3.3.version}</hadoop.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-integration-test</artifactId>
<version>${livy.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<artifactId>hadoop-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-client</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-server-tests</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-test-lib</artifactId>
<version>${livy.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
Expand Down Expand Up @@ -172,26 +74,16 @@
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<version>${hadoop.version}</version>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<version>${hadoop.version}</version>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
64 changes: 16 additions & 48 deletions livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.zeppelin.livy;

import org.apache.commons.io.IOUtils;
import org.apache.livy.test.framework.Cluster;
import org.apache.livy.test.framework.Cluster$;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
Expand All @@ -31,9 +29,7 @@
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,46 +44,23 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

@Disabled("FIXME: temporarily disable the broken tests")
public class LivyInterpreterIT {
private static final Logger LOGGER = LoggerFactory.getLogger(LivyInterpreterIT.class);
private static Cluster cluster;
public class LivyInterpreterIT extends WithLivyServer {
private static final Logger LOG = LoggerFactory.getLogger(LivyInterpreterIT.class);
private static Properties properties;

@BeforeAll
public static void setUp() {
public static void beforeAll() throws IOException {
if (!checkPreCondition()) {
return;
}
cluster = Cluster$.MODULE$.get();
LOGGER.info("Starting livy at {}", cluster.livyEndpoint());
WithLivyServer.beforeAll();
properties = new Properties();
properties.setProperty("zeppelin.livy.url", cluster.livyEndpoint());
properties.setProperty("zeppelin.livy.url", LIVY_ENDPOINT);
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "100");
properties.setProperty("zeppelin.livy.displayAppInfo", "false");
}

@AfterAll
public static void tearDown() {
if (cluster != null) {
LOGGER.info("Shutting down livy at {}", cluster.livyEndpoint());
cluster.cleanUp();
}
}

public static boolean checkPreCondition() {
if (System.getenv("LIVY_HOME") == null) {
LOGGER.warn(("livy integration is skipped because LIVY_HOME is not set"));
return false;
}
if (System.getenv("SPARK_HOME") == null) {
LOGGER.warn(("livy integration is skipped because SPARK_HOME is not set"));
return false;
}
return true;
}


@Test
void testSparkInterpreter() throws InterpreterException {
Expand Down Expand Up @@ -141,7 +114,6 @@ private void testRDD(final LivySparkInterpreter sparkInterpreter, boolean isSpar
.setAuthenticationInfo(authInfo)
.setInterpreterOut(output)
.build();
;

InterpreterResult result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
Expand Down Expand Up @@ -294,11 +266,10 @@ private void testDataFrame(LivySparkInterpreter sparkInterpreter,
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());

if (!isSpark2) {
assertTrue(result.message().get(0).getData().contains("Table not found"));
} else {
assertTrue(result.message().get(0).getData().contains("Table or view not found"));
}
String errMsg = result.message().get(0).getData();
assertTrue(errMsg.contains("Table not found") ||
errMsg.contains("Table or view not found") ||
errMsg.contains("TABLE_OR_VIEW_NOT_FOUND"));

// test sql cancel
if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
Expand Down Expand Up @@ -431,7 +402,7 @@ void testPySparkInterpreter() throws InterpreterException {
assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]")
|| result.message().get(0).getData().contains("[Row(_1='hello', _2=20)]"));
} else {
result = pysparkInterpreter.interpret("df=spark.createDataFrame([(\"hello\",20)])\n"
result = pysparkInterpreter.interpret("df=spark.createDataFrame([('hello',20)])\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(1, result.message().size());
Expand Down Expand Up @@ -485,15 +456,14 @@ public void run() {
}

@Test
void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
void testSparkInterpreterStringWithoutTruncation()
throws InterpreterException {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
Properties properties2 = new Properties(properties);
properties2.put("zeppelin.livy.displayAppInfo", "true");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that Livy can not extract appId for Spark local mode, and seems no workaround without touching Livy code, I abandoned to fix it after some struggling, just delete.

// enable spark ui because it is disabled by livy integration test
properties2.put("livy.spark.ui.enabled", "true");
properties2.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
Expand All @@ -519,21 +489,19 @@ void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
try {
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
// check yarn appId and ensure it is not null
assertTrue(result.message().get(1).getData().contains("Spark Application Id: application_"));
assertEquals(1, result.message().size(), result.toString());

// html output
String htmlCode = "println(\"%html <h1> hello </h1>\")";
result = sparkInterpreter.interpret(htmlCode, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
assertEquals(1, result.message().size());
assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType());

// detect spark version
result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
assertEquals(1, result.message().size());

boolean isSpark2 = isSpark2(sparkInterpreter, context);

Expand All @@ -552,7 +520,7 @@ void testSparkInterpreterWithDisplayAppInfo_StringWithoutTruncation()
+ ".toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(2, result.message().size());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
}
Expand Down Expand Up @@ -673,7 +641,7 @@ void testLivyParams() throws InterpreterException {
try {
InterpreterResult result = sparkInterpreter.interpret("sc.version\n" +
"assert(sc.getConf.get(\"spark.executor.cores\") == \"4\" && " +
"sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")"
"sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")"
, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(1, result.message().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LivySQLInterpreterTest {
private LivySparkSQLInterpreter sqlInterpreter;

@BeforeEach
public void setUp() {
public void beforeEach() {
Properties properties = new Properties();
properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
Expand Down
Loading
Loading