Skip to content

Commit

Permalink
[SPARK-30541][TESTS] Implement KafkaDelegationTokenSuite with testRetry
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
`KafkaDelegationTokenSuite` has been ignored because showed flaky behaviour. In this PR I've changed the approach how the test executed and turning it on again. This PR contains the following:
* The test runs in separate JVM in order to avoid modified security context
* The body of the test runs in `testRetry` which reties if failed
* Additional logs to analyse possible failures
* Enhanced clean-up code

### Why are the changes needed?
`KafkaDelegationTokenSuite ` is ignored.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Executed the test in loop 1k+ times in jenkins (locally much harder to reproduce).

Closes #27877 from gaborgsomogyi/SPARK-30541.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
gaborgsomogyi authored and dongjoon-hyun committed Mar 22, 2020
1 parent 3799d2b commit bf342ba
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 3 deletions.
4 changes: 3 additions & 1 deletion external/kafka-0-10-sql/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN

log4j.logger.org.apache.spark.sql.kafka010.KafkaTestUtils=DEBUG
log4j.logger.org.apache.directory.server.kerberos.kdc.authentication=DEBUG
log4j.logger.org.apache.directory.server.core.DefaultDirectoryService=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class KafkaDelegationTokenSuite extends StreamTest with SharedSparkSession with
}
}

ignore("Roundtrip") {
testRetry("Roundtrip", 3) {
val hadoopConf = new Configuration()
val manager = new HadoopDelegationTokenManager(spark.sparkContext.conf, hadoopConf, null)
val credentials = new Credentials()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class KafkaTestUtils(

kdc.getKrb5conf.delete()
Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8)
logDebug(s"krb5.conf file content: $krb5confStr")
}

private def addedKrb5Config(key: String, value: String): String = {
Expand Down Expand Up @@ -299,6 +300,7 @@ class KafkaTestUtils(
}
brokerReady = false
zkReady = false
kdcReady = false

if (producer != null) {
producer.close()
Expand All @@ -307,6 +309,7 @@ class KafkaTestUtils(

if (adminClient != null) {
adminClient.close()
adminClient = null
}

if (server != null) {
Expand Down Expand Up @@ -341,6 +344,7 @@ class KafkaTestUtils(
Configuration.getConfiguration.refresh()
if (kdc != null) {
kdc.stop()
kdc = null
}
UserGroupInformation.reset()
SecurityUtils.setGlobalKrbDebug(false)
Expand Down
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ object SparkParallelTestGrouping {
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
"org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2ListenerSuite",
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite"
"org.apache.spark.sql.hive.thriftserver.ThriftServerWithSparkContextSuite",
"org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite"
)

private val DEFAULT_TEST_GROUP = "default_test_group"
Expand Down

0 comments on commit bf342ba

Please sign in to comment.