Skip to content

Commit

Permalink
Improve docker tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanbuk-db committed Mar 14, 2024
1 parent 9986462 commit d9527b4
Showing 1 changed file with 116 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.sql.{Connection, DriverManager}
import java.util.Properties
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec
import scala.concurrent.TimeoutException
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
Expand All @@ -38,8 +39,8 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.{DockerUtils, Utils}
import org.apache.spark.util.Utils.timeStringAsSeconds
import org.apache.spark.util.DockerUtils
import org.apache.spark.util.Utils.{bytesToString, timeStringAsSeconds}

abstract class DatabaseOnDocker {
/**
Expand Down Expand Up @@ -129,104 +130,129 @@ abstract class DockerJDBCIntegrationSuite
private var pulled: Boolean = false
protected var jdbcUrl: String = _

@tailrec private def retry_helper[T](n: Int)(body: => T): T = {
try body
catch {
case e: Throwable =>
if (n > 0) {
logWarning(e.getMessage, e)
logInfo(s"\n\n===== RETRYING =====\n")
retry_helper(n - 1)(body)
}
else throw e
}
}

override def beforeAll(): Unit = runIfTestsEnabled(s"Prepare for ${this.getClass.getName}") {
super.beforeAll()
try {
val config = DefaultDockerClientConfig.createDefaultConfigBuilder.build
val httpClient = new ZerodepDockerHttpClient.Builder()
.dockerHost(config.getDockerHost)
.sslConfig(config.getSSLConfig)
.build()
docker = DockerClientImpl.getInstance(config, httpClient)
// Check that Docker is actually up
retry_helper(5) {
super.beforeAll()
try {
docker.pingCmd().exec()
} catch {
case NonFatal(e) =>
log.error("Exception while connecting to Docker. Check whether Docker is running.")
throw e
}
try {
// Ensure that the Docker image is installed:
docker.inspectImageCmd(db.imageName).exec()
} catch {
case e: NotFoundException =>
log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
val callback = new PullImageResultCallback {
override def onNext(item: PullResponseItem): Unit = {
super.onNext(item)
val status = item.getStatus
if (status != null && status != "Downloading" && status != "Extracting") {
logInfo(s"$status ${item.getId}")
val config = DefaultDockerClientConfig.createDefaultConfigBuilder.build
val httpClient = new ZerodepDockerHttpClient.Builder()
.dockerHost(config.getDockerHost)
.sslConfig(config.getSSLConfig)
.build()
docker = DockerClientImpl.getInstance(config, httpClient)
// Check that Docker is actually up
try {
docker.pingCmd().exec()
} catch {
case NonFatal(e) =>
log.error("Exception while connecting to Docker. Check whether Docker is running.")
throw e
}
try {
// Ensure that the Docker image is installed:
docker.inspectImageCmd(db.imageName).exec()
} catch {
case e: NotFoundException =>
log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
val callback = new PullImageResultCallback {
override def onNext(item: PullResponseItem): Unit = {
super.onNext(item)
if (item.getStatus != null) {
item.getStatus match {
case s if item.getProgressDetail != null &&
item.getProgressDetail.getCurrent != null &&
item.getProgressDetail.getCurrent == item.getProgressDetail.getTotal =>
// logging for final progress procedural status
logInfo(s"$s ${item.getId} ${bytesToString(item.getProgressDetail.getTotal)}")
case s if s != "Downloading" && s != "Extracting" =>
logInfo(s"${item.getStatus} ${item.getId}")
case _ =>
}
}
}
}
}

val (success, time) = Utils.timeTakenMs(
override def onComplete(): Unit = {
pulled = true
}

override def onError(throwable: Throwable): Unit = {
logError(s"Failed to pull Docker image ${db.imageName}", throwable)
}
}
docker.pullImageCmd(db.imageName)
.exec(callback)
.awaitCompletion(imagePullTimeout, TimeUnit.SECONDS))

if (success) {
pulled = success
logInfo(s"Successfully pulled image ${db.imageName} in $time ms")
} else {
throw new TimeoutException(
s"Timeout('$imagePullTimeout secs') waiting for image ${db.imageName} to be pulled")
}
}
.awaitCompletion(imagePullTimeout, TimeUnit.SECONDS)
if (!pulled) {
throw new TimeoutException(
s"Timeout('$imagePullTimeout secs') waiting for image ${db.imageName} to be pulled")
}
}

val hostConfig = HostConfig
.newHostConfig()
.withNetworkMode("bridge")
.withPrivileged(db.privileged)
.withPortBindings(PortBinding.parse(s"$externalPort:${db.jdbcPort}"))
val hostConfig = HostConfig
.newHostConfig()
.withNetworkMode("bridge")
.withPrivileged(db.privileged)
.withPortBindings(PortBinding.parse(s"$externalPort:${db.jdbcPort}"))

if (db.usesIpc) {
hostConfig.withIpcMode("host")
}
if (db.usesIpc) {
hostConfig.withIpcMode("host")
}

val containerConfig = new ContainerConfig()
val containerConfig = new ContainerConfig()

db.beforeContainerStart(hostConfig, containerConfig)
db.beforeContainerStart(hostConfig, containerConfig)

// Create the database container:
val createContainerCmd = docker.createContainerCmd(db.imageName)
.withHostConfig(hostConfig)
.withExposedPorts(ExposedPort.tcp(db.jdbcPort))
.withEnv(db.env.map { case (k, v) => s"$k=$v" }.toList.asJava)
.withNetworkDisabled(false)
// Create the database container:
val createContainerCmd = docker.createContainerCmd(db.imageName)
.withHostConfig(hostConfig)
.withExposedPorts(ExposedPort.tcp(db.jdbcPort))
.withEnv(db.env.map { case (k, v) => s"$k=$v" }.toList.asJava)
.withNetworkDisabled(false)


db.getEntryPoint.foreach(ep => createContainerCmd.withEntrypoint(ep))
db.getStartupProcessName.foreach(n => createContainerCmd.withCmd(n))
db.getEntryPoint.foreach(ep => createContainerCmd.withEntrypoint(ep))
db.getStartupProcessName.foreach(n => createContainerCmd.withCmd(n))

container = createContainerCmd.exec()
// Start the container and wait until the database can accept JDBC connections:
docker.startContainerCmd(container.getId).exec()
eventually(timeout(startContainerTimeout.seconds), interval(1.second)) {
val response = docker.inspectContainerCmd(container.getId).exec()
assert(response.getState.getRunning)
}
jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
var conn: Connection = null
eventually(connectionTimeout, interval(1.second)) {
conn = getConnection()
}
// Run any setup queries:
try {
dataPreparation(conn)
} finally {
conn.close()
}
} catch {
case NonFatal(e) =>
logError(s"Failed to initialize Docker container for ${this.getClass.getName}", e)
container = createContainerCmd.exec()
// Start the container and wait until the database can accept JDBC connections:
docker.startContainerCmd(container.getId).exec()
eventually(timeout(startContainerTimeout.seconds), interval(1.second)) {
val response = docker.inspectContainerCmd(container.getId).exec()
assert(response.getState.getRunning)
}
jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
var conn: Connection = null
eventually(connectionTimeout, interval(1.second)) {
conn = getConnection()
}
// Run any setup queries:
try {
afterAll()
dataPreparation(conn)
} finally {
throw e
conn.close()
}
} catch {
case NonFatal(e) =>
logError(s"Failed to initialize Docker container for ${this.getClass.getName}", e)
try {
afterAll()
} finally {
throw e
}
}
}
}

Expand Down Expand Up @@ -265,9 +291,13 @@ abstract class DockerJDBCIntegrationSuite
logWarning(s"Could not stop container $container at stage '$status'", e)
} finally {
logContainerOutput()
docker.removeContainerCmd(container.getId).exec()
if (removePulledImage && pulled) {
docker.removeImageCmd(db.imageName).exec()
try {
docker.removeContainerCmd(container.getId).exec()
if (removePulledImage && pulled) {
docker.removeImageCmd(db.imageName).exec()
}
} catch {
case _: Exception => ()
}
}
}
Expand Down

0 comments on commit d9527b4

Please sign in to comment.