diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java index f0e5ae473e12..82b1d533807e 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java @@ -128,9 +128,14 @@ final public T initialized() { connectionProperties, JdbcConnector.getConnectionTimeout(connectionProperties, getDatabaseDriver().getDriverClassName())); this.dslContext = DSLContextFactory.create(dataSource, getSqlDialect()); + initializedPostHook(); return self(); } + public void initializedPostHook() { + + } + final public boolean isInitialized() { return dslContext != null; } diff --git a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java index 4117ea1a4ee5..c7ef532b507b 100644 --- a/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java +++ b/airbyte-integrations/connectors/source-mssql/src/testFixtures/java/io/airbyte/integrations/source/mssql/MsSQLTestDatabase.java @@ -68,11 +68,51 @@ static public MsSQLTestDatabase in(final BaseImage imageName, final ContainerMod .initialized(); } + private class MssqlTestDatabaseBackgroundThread extends Thread { + + private volatile boolean stop = false; + + public void run() { + LOGGER.info(formatLogLine("Started new database " + getDatabaseName())); + boolean wasRunning = false; + while (!stop) { + try { + final var r = query(ctx -> ctx.fetch( + "EXEC master.dbo.xp_servicecontrol 'QueryState', N'SQLServerAGENT';").get(0)); + String agentState = r.getValue(0).toString(); + LOGGER.info(formatLogLine("agentState=" + agentState)); + if ("Running.".equals(agentState)) { + wasRunning = true; + } else if (wasRunning && !"Running.".equals(agentState)) { + LOGGER.info(formatLogLine("agent was running. agentState=" + agentState)); + } + } catch (final Throwable t) { + String exceptionAsString = StringUtils.join(ExceptionUtils.getStackFrames(t), "\n "); + LOGGER.info(formatLogLine("got exception " + exceptionAsString)); + } + try { + Thread.sleep(5l); + } catch (InterruptedException e) { + LOGGER.info(formatLogLine("interrupted")); + } + } + } + + } + + private final MssqlTestDatabaseBackgroundThread bgThread; + public MsSQLTestDatabase(final MSSQLServerContainer container) { super(container); + bgThread = new MssqlTestDatabaseBackgroundThread(); LOGGER.info("SGX creating new database. databaseId=" + this.databaseId + ", databaseName=" + getDatabaseName()); } + @Override + public void initializedPostHook() { + bgThread.start(); + } + public MsSQLTestDatabase withCdc() { return with("EXEC sys.sp_cdc_enable_db;"); } @@ -244,6 +284,11 @@ public synchronized String getCertificate(final CertificateKey certificateKey) { return cachedCerts.get(certificateKey); } + public void close() { + bgThread.stop = true; + super.close(); + } + @Override public MsSQLConfigBuilder configBuilder() { return new MsSQLConfigBuilder(this);