diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java index f8ef633ae989..dd1eb2a10578 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/java/io/airbyte/cdk/testutils/TestDatabase.java @@ -24,10 +24,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Stream; import javax.sql.DataSource; import org.jooq.DSLContext; import org.jooq.SQLDialect; +import org.jooq.exception.DataAccessException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.JdbcDatabaseContainer; @@ -87,7 +87,7 @@ public T onClose(String fmtSql, Object... fmtArgs) { * Executes a SQL statement after calling String.format on the arguments. */ public T with(String fmtSql, Object... fmtArgs) { - execSQL(Stream.of(String.format(fmtSql, fmtArgs))); + execSQL(List.of(String.format(fmtSql, fmtArgs))); return self(); } @@ -113,9 +113,9 @@ final public boolean isInitialized() { return dslContext != null; } - abstract protected Stream> inContainerBootstrapCmd(); + abstract protected List> inContainerBootstrapCmd(); - abstract protected Stream inContainerUndoBootstrapCmd(); + abstract protected List inContainerUndoBootstrapCmd(); abstract public DatabaseDriver getDatabaseDriver(); @@ -167,22 +167,17 @@ public Database getDatabase() { return new Database(getDslContext()); } - protected void execSQL(final Stream sql) { + protected void execSQL(final List sqls) { try { - getDatabase().query(ctx -> { - sql.forEach(statement -> { - LOGGER.debug("{}", statement); - ctx.execute(statement); - }); - return null; - }); - } catch (SQLException e) { + for (String sql : sqls) { + getDslContext().execute(sql); + } + } catch (DataAccessException e) { throw new RuntimeException(e); } } - protected void execInContainer(Stream cmds) { - final List cmd = cmds.toList(); + protected void execInContainer(List cmd) { if (cmd.isEmpty()) { return; } @@ -232,7 +227,7 @@ public B integrationTestConfigBuilder() { @Override public void close() { - execSQL(this.cleanupSQL.stream()); + execSQL(this.cleanupSQL); execInContainer(inContainerUndoBootstrapCmd()); } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java index 5d4dcb3e68d0..0a5429f18cdf 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/java/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.airbyte.cdk.db.factory.DatabaseDriver; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.db.jdbc.streaming.AdaptiveStreamingQueryConfig; @@ -20,10 +21,10 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType; import java.sql.JDBCType; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Stream; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -140,24 +141,27 @@ public BareBonesTestDatabase(PostgreSQLContainer container) { } @Override - protected Stream> inContainerBootstrapCmd() { - final var sql = Stream.of( + protected List> inContainerBootstrapCmd() { + final var sqls = List.of( String.format("CREATE DATABASE %s", getDatabaseName()), String.format("CREATE USER %s PASSWORD '%s'", getUserName(), getPassword()), String.format("GRANT ALL PRIVILEGES ON DATABASE %s TO %s", getDatabaseName(), getUserName()), String.format("ALTER USER %s WITH SUPERUSER", getUserName())); - return Stream.of(Stream.concat( - Stream.of("psql", - "-d", getContainer().getDatabaseName(), - "-U", getContainer().getUsername(), - "-v", "ON_ERROR_STOP=1", - "-a"), - sql.flatMap(stmt -> Stream.of("-c", stmt)))); + List cmd = Lists.newArrayList("psql", + "-d", getContainer().getDatabaseName(), + "-U", getContainer().getUsername(), + "-v", "ON_ERROR_STOP=1", + "-a"); + for (String sql : sqls) { + cmd.add("-c"); + cmd.add(sql); + } + return List.of(cmd); } @Override - protected Stream inContainerUndoBootstrapCmd() { - return Stream.empty(); + protected List inContainerUndoBootstrapCmd() { + return Collections.emptyList(); } @Override