diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index 13db5844c6043..501dd40db79cc 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -22,6 +22,7 @@ import java.sql.{Connection, DriverManager} import java.util.Properties import java.util.concurrent.TimeUnit +import scala.annotation.tailrec import scala.concurrent.TimeoutException import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal @@ -38,8 +39,8 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.{DockerUtils, Utils} -import org.apache.spark.util.Utils.timeStringAsSeconds +import org.apache.spark.util.DockerUtils +import org.apache.spark.util.Utils.{bytesToString, timeStringAsSeconds} abstract class DatabaseOnDocker { /** @@ -129,104 +130,129 @@ abstract class DockerJDBCIntegrationSuite private var pulled: Boolean = false protected var jdbcUrl: String = _ + @tailrec private def retry_helper[T](n: Int)(body: => T): T = { + try body + catch { + case e: Throwable => + if (n > 0) { + logWarning(e.getMessage, e) + logInfo(s"\n\n===== RETRYING =====\n") + retry_helper(n - 1)(body) + } + else throw e + } + } + override def beforeAll(): Unit = runIfTestsEnabled(s"Prepare for ${this.getClass.getName}") { - super.beforeAll() - try { - val config = DefaultDockerClientConfig.createDefaultConfigBuilder.build - val httpClient = new ZerodepDockerHttpClient.Builder() - .dockerHost(config.getDockerHost) - .sslConfig(config.getSSLConfig) - .build() - docker = DockerClientImpl.getInstance(config, httpClient) - // Check that Docker is actually up + retry_helper(5) { + super.beforeAll() try { - docker.pingCmd().exec() - } catch { - case NonFatal(e) => - log.error("Exception while connecting to Docker. Check whether Docker is running.") - throw e - } - try { - // Ensure that the Docker image is installed: - docker.inspectImageCmd(db.imageName).exec() - } catch { - case e: NotFoundException => - log.warn(s"Docker image ${db.imageName} not found; pulling image from registry") - val callback = new PullImageResultCallback { - override def onNext(item: PullResponseItem): Unit = { - super.onNext(item) - val status = item.getStatus - if (status != null && status != "Downloading" && status != "Extracting") { - logInfo(s"$status ${item.getId}") + val config = DefaultDockerClientConfig.createDefaultConfigBuilder.build + val httpClient = new ZerodepDockerHttpClient.Builder() + .dockerHost(config.getDockerHost) + .sslConfig(config.getSSLConfig) + .build() + docker = DockerClientImpl.getInstance(config, httpClient) + // Check that Docker is actually up + try { + docker.pingCmd().exec() + } catch { + case NonFatal(e) => + log.error("Exception while connecting to Docker. Check whether Docker is running.") + throw e + } + try { + // Ensure that the Docker image is installed: + docker.inspectImageCmd(db.imageName).exec() + } catch { + case e: NotFoundException => + log.warn(s"Docker image ${db.imageName} not found; pulling image from registry") + val callback = new PullImageResultCallback { + override def onNext(item: PullResponseItem): Unit = { + super.onNext(item) + if (item.getStatus != null) { + item.getStatus match { + case s if item.getProgressDetail != null && + item.getProgressDetail.getCurrent != null && + item.getProgressDetail.getCurrent == item.getProgressDetail.getTotal => + // logging for final progress procedural status + logInfo(s"$s ${item.getId} ${bytesToString(item.getProgressDetail.getTotal)}") + case s if s != "Downloading" && s != "Extracting" => + logInfo(s"${item.getStatus} ${item.getId}") + case _ => + } + } } - } - } - val (success, time) = Utils.timeTakenMs( + override def onComplete(): Unit = { + pulled = true + } + + override def onError(throwable: Throwable): Unit = { + logError(s"Failed to pull Docker image ${db.imageName}", throwable) + } + } docker.pullImageCmd(db.imageName) .exec(callback) - .awaitCompletion(imagePullTimeout, TimeUnit.SECONDS)) - - if (success) { - pulled = success - logInfo(s"Successfully pulled image ${db.imageName} in $time ms") - } else { - throw new TimeoutException( - s"Timeout('$imagePullTimeout secs') waiting for image ${db.imageName} to be pulled") - } - } + .awaitCompletion(imagePullTimeout, TimeUnit.SECONDS) + if (!pulled) { + throw new TimeoutException( + s"Timeout('$imagePullTimeout secs') waiting for image ${db.imageName} to be pulled") + } + } - val hostConfig = HostConfig - .newHostConfig() - .withNetworkMode("bridge") - .withPrivileged(db.privileged) - .withPortBindings(PortBinding.parse(s"$externalPort:${db.jdbcPort}")) + val hostConfig = HostConfig + .newHostConfig() + .withNetworkMode("bridge") + .withPrivileged(db.privileged) + .withPortBindings(PortBinding.parse(s"$externalPort:${db.jdbcPort}")) - if (db.usesIpc) { - hostConfig.withIpcMode("host") - } + if (db.usesIpc) { + hostConfig.withIpcMode("host") + } - val containerConfig = new ContainerConfig() + val containerConfig = new ContainerConfig() - db.beforeContainerStart(hostConfig, containerConfig) + db.beforeContainerStart(hostConfig, containerConfig) - // Create the database container: - val createContainerCmd = docker.createContainerCmd(db.imageName) - .withHostConfig(hostConfig) - .withExposedPorts(ExposedPort.tcp(db.jdbcPort)) - .withEnv(db.env.map { case (k, v) => s"$k=$v" }.toList.asJava) - .withNetworkDisabled(false) + // Create the database container: + val createContainerCmd = docker.createContainerCmd(db.imageName) + .withHostConfig(hostConfig) + .withExposedPorts(ExposedPort.tcp(db.jdbcPort)) + .withEnv(db.env.map { case (k, v) => s"$k=$v" }.toList.asJava) + .withNetworkDisabled(false) - db.getEntryPoint.foreach(ep => createContainerCmd.withEntrypoint(ep)) - db.getStartupProcessName.foreach(n => createContainerCmd.withCmd(n)) + db.getEntryPoint.foreach(ep => createContainerCmd.withEntrypoint(ep)) + db.getStartupProcessName.foreach(n => createContainerCmd.withCmd(n)) - container = createContainerCmd.exec() - // Start the container and wait until the database can accept JDBC connections: - docker.startContainerCmd(container.getId).exec() - eventually(timeout(startContainerTimeout.seconds), interval(1.second)) { - val response = docker.inspectContainerCmd(container.getId).exec() - assert(response.getState.getRunning) - } - jdbcUrl = db.getJdbcUrl(dockerIp, externalPort) - var conn: Connection = null - eventually(connectionTimeout, interval(1.second)) { - conn = getConnection() - } - // Run any setup queries: - try { - dataPreparation(conn) - } finally { - conn.close() - } - } catch { - case NonFatal(e) => - logError(s"Failed to initialize Docker container for ${this.getClass.getName}", e) + container = createContainerCmd.exec() + // Start the container and wait until the database can accept JDBC connections: + docker.startContainerCmd(container.getId).exec() + eventually(timeout(startContainerTimeout.seconds), interval(1.second)) { + val response = docker.inspectContainerCmd(container.getId).exec() + assert(response.getState.getRunning) + } + jdbcUrl = db.getJdbcUrl(dockerIp, externalPort) + var conn: Connection = null + eventually(connectionTimeout, interval(1.second)) { + conn = getConnection() + } + // Run any setup queries: try { - afterAll() + dataPreparation(conn) } finally { - throw e + conn.close() } + } catch { + case NonFatal(e) => + logError(s"Failed to initialize Docker container for ${this.getClass.getName}", e) + try { + afterAll() + } finally { + throw e + } + } } } @@ -265,9 +291,13 @@ abstract class DockerJDBCIntegrationSuite logWarning(s"Could not stop container $container at stage '$status'", e) } finally { logContainerOutput() - docker.removeContainerCmd(container.getId).exec() - if (removePulledImage && pulled) { - docker.removeImageCmd(db.imageName).exec() + try { + docker.removeContainerCmd(container.getId).exec() + if (removePulledImage && pulled) { + docker.removeImageCmd(db.imageName).exec() + } + } catch { + case _: Exception => () } } }