From 3b48c994fab460489095dd3ba07d1e9b638e9489 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 3 Jun 2024 15:00:01 -0700 Subject: [PATCH] prune test hierarchy --- .../destination-redshift/metadata.yaml | 6 +- .../RedshiftDestinationAcceptanceTest.java | 244 ------------------ ...tagingInsertDestinationAcceptanceTest.java | 231 ++++++++++++++++- ...shiftStagingDestinationAcceptanceTest.java | 209 ++++++++++++++- ...RedshiftDestinationBaseAcceptanceTest.java | 219 ---------------- .../AbstractRedshiftTypingDedupingTest.java | 2 +- .../RedshiftSqlGeneratorIntegrationTest.java | 2 +- .../destinations/redshift-migrations.md | 20 +- docs/integrations/destinations/redshift.md | 5 +- 9 files changed, 460 insertions(+), 478 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java delete mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index c24799d19b64..553ae84b43f9 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 2.6.4 + dockerImageTag: 3.0.0 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift @@ -24,6 +24,10 @@ data: This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early. upgradeDeadline: "2024-03-15" + 3.0.0: + message: > + Version 3.0.0 of destination-redshift removes support for the "standard inserts" mode. S3 staging was always preferred for being faster and less expensive, and as part of Airbyte 1.0, we are officially removing the inferior "standard inserts" mode. Upgrading to this version of the destination will require a configuration with an S3 staging area. 
+ upgradeDeadline: "2024-07-31" resourceRequirements: jobSpecific: - jobType: sync diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java deleted file mode 100644 index 195bd54a52f0..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import com.amazon.redshift.util.RedshiftTimestamp; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.cdk.db.Database; -import io.airbyte.cdk.db.factory.ConnectionFactory; -import io.airbyte.cdk.db.factory.DatabaseDriver; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; -import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces; -import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; -import java.io.IOException; -import java.sql.Connection; -import java.sql.SQLException; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.ChronoField; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import org.jooq.impl.DSL; -import org.junit.jupiter.api.parallel.Execution; -import org.junit.jupiter.api.parallel.ExecutionMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// these tests are not yet thread-safe, unlike the DV2 tests. -@Execution(ExecutionMode.SAME_THREAD) -public abstract class RedshiftDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestinationAcceptanceTest.class); - - // config from which to create / delete schemas. - private JsonNode baseConfig; - // config which refers to the schema that the test is being run in. 
- protected JsonNode config; - private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); - private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); - - private Database database; - private Connection connection; - protected TestDestinationEnv testDestinationEnv; - - @Override - protected String getImageName() { - return "airbyte/destination-redshift:dev"; - } - - @Override - protected JsonNode getConfig() { - return config; - } - - public abstract JsonNode getStaticConfig() throws IOException; - - @Override - protected JsonNode getFailCheckConfig() { - final JsonNode invalidConfig = Jsons.clone(config); - ((ObjectNode) invalidConfig).put("password", "wrong password"); - return invalidConfig; - } - - @Override - protected TestDataComparator getTestDataComparator() { - return new RedshiftTestDataComparator(); - } - - @Override - protected boolean supportBasicDataTypeTest() { - return true; - } - - @Override - protected boolean supportArrayDataTypeTest() { - return true; - } - - @Override - protected boolean supportObjectDataTypeTest() { - return true; - } - - @Override - protected boolean supportIncrementalSchemaChanges() { - return true; - } - - @Override - protected boolean supportsInDestinationNormalization() { - return true; - } - - @Override - protected List retrieveRecords(final TestDestinationEnv env, - final String streamName, - final String namespace, - final JsonNode streamSchema) - throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) - .stream() - .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) - .collect(Collectors.toList()); - } - - @Override - protected boolean implementsNamespaces() { - return true; - } - - @Override - protected List retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace) - throws Exception { - String tableName = namingResolver.getIdentifier(streamName); - if (!tableName.startsWith("\"")) { - // Currently, Normalization always quote tables identifiers - tableName = "\"" + tableName + "\""; - } - return retrieveRecordsFromTable(tableName, namespace); - } - - private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { - return getDatabase().query( - ctx -> ctx - .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .stream() - .map(record -> getJsonFromRecord( - record, - value -> { - if (value instanceof final RedshiftTimestamp rts) { - // We can't just use rts.toInstant().toString(), because that will mangle historical - // dates (e.g. 1504-02-28...) because toInstant() just converts to epoch millis, - // which works _very badly_ for for very old dates. - // Instead, convert to a string and then parse that string. - // We can't just rts.toString(), because that loses the timezone... - // so instead we use getPostgresqlString and parse that >.> - // Thanks, redshift. - return Optional.of( - ZonedDateTime.parse( - rts.getPostgresqlString(), - new DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd HH:mm:ss") - .optionalStart() - .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 9, true) - .optionalEnd() - .appendPattern("X") - .toFormatter()) - .withZoneSameInstant(ZoneOffset.UTC) - .toString()); - } else { - return Optional.empty(); - } - })) - .collect(Collectors.toList())); - } - - // for each test we create a new schema in the database. 
run the test in there and then remove it. - @Override - protected void setup(final TestDestinationEnv testEnv, final HashSet TEST_SCHEMAS) throws Exception { - final String schemaName = TestingNamespaces.generate(); - final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); - baseConfig = getStaticConfig(); - database = createDatabase(); - removeOldNamespaces(); - getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); - final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;", - USER_WITHOUT_CREDS, baseConfig.get("password").asText()); - getDatabase().query(ctx -> ctx.execute(createUser)); - final JsonNode configForSchema = Jsons.clone(baseConfig); - ((ObjectNode) configForSchema).put("schema", schemaName); - TEST_SCHEMAS.add(schemaName); - config = configForSchema; - testDestinationEnv = testEnv; - } - - private void removeOldNamespaces() { - final List schemas; - try { - schemas = getDatabase().query(ctx -> ctx.fetch("SELECT schema_name FROM information_schema.schemata;")) - .stream() - .map(record -> record.get("schema_name").toString()) - .toList(); - } catch (final SQLException e) { - // if we can't fetch the schemas, just return. - return; - } - - int schemasDeletedCount = 0; - for (final String schema : schemas) { - if (TestingNamespaces.isOlderThan2Days(schema)) { - try { - getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); - schemasDeletedCount++; - } catch (final SQLException e) { - LOGGER.error("Failed to delete old dataset: {}", schema, e); - } - } - } - LOGGER.info("Deleted {} old schemas.", schemasDeletedCount); - } - - @Override - protected void tearDown(final TestDestinationEnv testEnv) throws Exception { - System.out.println("TEARING_DOWN_SCHEMAS: " + getTestSchemas()); - getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", config.get("schema").asText()))); - for (final String schema : getTestSchemas()) { - getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); - } - getDatabase().query(ctx -> ctx.execute(String.format("drop user if exists %s;", USER_WITHOUT_CREDS))); - RedshiftConnectionHandler.close(connection); - } - - protected Database createDatabase() { - connection = ConnectionFactory.create(baseConfig.get(JdbcUtils.USERNAME_KEY).asText(), - baseConfig.get(JdbcUtils.PASSWORD_KEY).asText(), - RedshiftStagingS3Destination.Companion.getSSL_JDBC_PARAMETERS(), - String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), - baseConfig.get(JdbcUtils.HOST_KEY).asText(), - baseConfig.get(JdbcUtils.PORT_KEY).asInt(), - baseConfig.get(JdbcUtils.DATABASE_KEY).asText())); - - return new Database(DSL.using(connection)); - } - - protected Database getDatabase() { - return database; - } - - @Override - protected int getMaxRecordValueLimit() { - return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java index 0732e6041454..7bf112708464 100644 --- 
a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java @@ -4,21 +4,250 @@ package io.airbyte.integrations.destination.redshift; +import com.amazon.redshift.util.RedshiftTimestamp; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.cdk.db.Database; +import io.airbyte.cdk.db.factory.ConnectionFactory; +import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces; +import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import java.nio.file.Path; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.jooq.impl.DSL; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Integration test testing {@link RedshiftStagingS3Destination}. The default Redshift integration * test credentials contain S3 credentials - this automatically causes COPY to be selected. */ +// these tests are not yet thread-safe, unlike the DV2 tests. +@Execution(ExecutionMode.SAME_THREAD) @Disabled -public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest { +public class RedshiftS3StagingInsertDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftS3StagingInsertDestinationAcceptanceTest.class); + + // config from which to create / delete schemas. + private JsonNode baseConfig; + // config which refers to the schema that the test is being run in. 
+ protected JsonNode config; + private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); + private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); + + private Database database; + private Connection connection; + protected TestDestinationEnv testDestinationEnv; + + @Override + protected String getImageName() { + return "airbyte/destination-redshift:dev"; + } + + @Override + protected JsonNode getConfig() { + return config; + } public JsonNode getStaticConfig() { return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json"))); } + @Override + protected JsonNode getFailCheckConfig() { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("password", "wrong password"); + return invalidConfig; + } + + @Override + protected TestDataComparator getTestDataComparator() { + return new RedshiftTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + + @Override + protected boolean supportIncrementalSchemaChanges() { + return true; + } + + @Override + protected boolean supportsInDestinationNormalization() { + return true; + } + + @Override + protected List<JsonNode> retrieveRecords(final TestDestinationEnv env, + final String streamName, + final String namespace, + final JsonNode streamSchema) + throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) + .collect(Collectors.toList()); + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace) + throws Exception { + String tableName = namingResolver.getIdentifier(streamName); + if (!tableName.startsWith("\"")) { + // Currently, Normalization always quotes table identifiers + tableName = "\"" + tableName + "\""; + } + return retrieveRecordsFromTable(tableName, namespace); + } + + private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { + return getDatabase().query( + ctx -> ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(record -> getJsonFromRecord( + record, + value -> { + if (value instanceof final RedshiftTimestamp rts) { + // We can't just use rts.toInstant().toString(), because that will mangle historical + // dates (e.g. 1504-02-28...) because toInstant() just converts to epoch millis, + // which works _very badly_ for very old dates. + // Instead, convert to a string and then parse that string. + // We can't just rts.toString(), because that loses the timezone... + // so instead we use getPostgresqlString and parse that >.> + // Thanks, redshift.
+ return Optional.of( + ZonedDateTime.parse( + rts.getPostgresqlString(), + new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 9, true) + .optionalEnd() + .appendPattern("X") + .toFormatter()) + .withZoneSameInstant(ZoneOffset.UTC) + .toString()); + } else { + return Optional.empty(); + } + })) + .collect(Collectors.toList())); + } + + // for each test we create a new schema in the database. run the test in there and then remove it. + @Override + protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) throws Exception { + final String schemaName = TestingNamespaces.generate(); + final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); + baseConfig = getStaticConfig(); + database = createDatabase(); + removeOldNamespaces(); + getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); + final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;", + USER_WITHOUT_CREDS, baseConfig.get("password").asText()); + getDatabase().query(ctx -> ctx.execute(createUser)); + final JsonNode configForSchema = Jsons.clone(baseConfig); + ((ObjectNode) configForSchema).put("schema", schemaName); + TEST_SCHEMAS.add(schemaName); + config = configForSchema; + testDestinationEnv = testEnv; + } + + private void removeOldNamespaces() { + final List<String> schemas; + try { + schemas = getDatabase().query(ctx -> ctx.fetch("SELECT schema_name FROM information_schema.schemata;")) + .stream() + .map(record -> record.get("schema_name").toString()) + .toList(); + } catch (final SQLException e) { + // if we can't fetch the schemas, just return. + return; + } + + int schemasDeletedCount = 0; + for (final String schema : schemas) { + if (TestingNamespaces.isOlderThan2Days(schema)) { + try { + getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); + schemasDeletedCount++; + } catch (final SQLException e) { + LOGGER.error("Failed to delete old dataset: {}", schema, e); + } + } + } + LOGGER.info("Deleted {} old schemas.", schemasDeletedCount); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) throws Exception { + System.out.println("TEARING_DOWN_SCHEMAS: " + getTestSchemas()); + getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", config.get("schema").asText()))); + for (final String schema : getTestSchemas()) { + getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); + } + getDatabase().query(ctx -> ctx.execute(String.format("drop user if exists %s;", USER_WITHOUT_CREDS))); + RedshiftConnectionHandler.close(connection); + } + + protected Database createDatabase() { + connection = ConnectionFactory.create(baseConfig.get(JdbcUtils.USERNAME_KEY).asText(), + baseConfig.get(JdbcUtils.PASSWORD_KEY).asText(), + RedshiftStagingS3Destination.Companion.getSSL_JDBC_PARAMETERS(), + String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), + baseConfig.get(JdbcUtils.HOST_KEY).asText(), + baseConfig.get(JdbcUtils.PORT_KEY).asInt(), + baseConfig.get(JdbcUtils.DATABASE_KEY).asText())); + + return new Database(DSL.using(connection)); + } + + protected Database getDatabase() { + return database; + } + + @Override + protected int getMaxRecordValueLimit() { + return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; + } + } diff --git
a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java index 72c992fdb183..8cfb97321f9f 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java @@ -4,32 +4,231 @@ package io.airbyte.integrations.destination.redshift; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import io.airbyte.cdk.db.Database; +import io.airbyte.cdk.db.factory.ConnectionFactory; +import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.base.ssh.SshTunnel; +import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces; +import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.commons.io.IOs; +import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import java.io.IOException; import java.nio.file.Path; +import java.sql.Connection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.jooq.impl.DSL; import org.junit.jupiter.api.Disabled; -/* +/** * SshPasswordRedshiftStagingDestinationAcceptanceTest runs basic Redshift Destination Tests using * the S3 Staging mechanism for upload of data and "password" authentication for the SSH bastion * configuration. */ @Disabled -public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { +public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { + + protected String schemaName; + // config from which to create / delete schemas. + protected JsonNode baseConfig; + // config which refers to the schema that the test is being run in. 
+ protected JsonNode config; + + private Database database; + + private Connection connection; + + private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); + private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); + + public SshTunnel.TunnelMethod getTunnelMethod() { + return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + } @Override - public TunnelMethod getTunnelMethod() { - return TunnelMethod.SSH_PASSWORD_AUTH; + protected String getImageName() { + return "airbyte/destination-redshift:dev"; } @Override + protected JsonNode getConfig() throws Exception { + final Map<Object, Object> configAsMap = deserializeToObjectMap(config); + final Builder<Object, Object> configMapBuilder = new Builder<>().putAll(configAsMap); + return getTunnelConfig(getTunnelMethod(), configMapBuilder); + } + + protected JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final ImmutableMap.Builder<Object, Object> builderWithSchema) { + final JsonNode sshBastionHost = config.get("ssh_bastion_host"); + final JsonNode sshBastionPort = config.get("ssh_bastion_port"); + final JsonNode sshBastionUser = config.get("ssh_bastion_user"); + final JsonNode sshBastionPassword = config.get("ssh_bastion_password"); + final JsonNode sshBastionKey = config.get("ssh_bastion_key"); + + final String tunnelUserPassword = tunnelMethod.equals(SSH_PASSWORD_AUTH) ? sshBastionPassword.asText() : ""; + final String sshKey = tunnelMethod.equals(SSH_KEY_AUTH) ? sshBastionKey.asText() : ""; + + return Jsons.jsonNode(builderWithSchema + .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder() + .put("tunnel_host", sshBastionHost) + .put("tunnel_method", tunnelMethod.toString()) + .put("tunnel_port", sshBastionPort.intValue()) + .put("tunnel_user", sshBastionUser) + .put("tunnel_user_password", tunnelUserPassword) + .put("ssh_key", sshKey) + .build())) + .build()); + } + + public static Map<Object, Object> deserializeToObjectMap(final JsonNode json) { + final ObjectMapper objectMapper = MoreMappers.initMapper(); + return objectMapper.convertValue(json, new TypeReference<>() {}); + } + public JsonNode getStaticConfig() throws IOException { final Path configPath = Path.of("secrets/config_staging.json"); final String configAsString = IOs.readFile(configPath); return Jsons.deserialize(configAsString); } + @Override + protected JsonNode getFailCheckConfig() { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("password", "wrong password"); + return invalidConfig; + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace) + throws Exception { + final String tableName = namingResolver.getIdentifier(streamName); + return retrieveRecordsFromTable(tableName, namespace); + } + + @Override + protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) + throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) + .collect(Collectors.toList()); + } + + private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception { + return SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + return getDatabase().query(ctx -> ctx + .fetch(String.format("SELECT * FROM
%s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + .stream() + .map(this::getJsonFromRecord) + .collect(Collectors.toList())); + }); + + } + + @Override + protected TestDataComparator getTestDataComparator() { + return new RedshiftTestDataComparator(); + } + + private Database createDatabaseFromConfig(final JsonNode config) { + connection = ConnectionFactory.create(config.get(JdbcUtils.USERNAME_KEY).asText(), + config.get(JdbcUtils.PASSWORD_KEY).asText(), + RedshiftStagingS3Destination.Companion.getSSL_JDBC_PARAMETERS(), + String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), + config.get(JdbcUtils.HOST_KEY).asText(), + config.get(JdbcUtils.PORT_KEY).asInt(), + config.get(JdbcUtils.DATABASE_KEY).asText())); + + return new Database(DSL.using(connection)); + } + + private Database getDatabase() { + return database; + } + + @Override + protected int getMaxRecordValueLimit() { + return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; + } + + @Override + protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) throws Exception { + baseConfig = getStaticConfig(); + final JsonNode configForSchema = Jsons.clone(baseConfig); + schemaName = TestingNamespaces.generate(); + TEST_SCHEMAS.add(schemaName); + ((ObjectNode) configForSchema).put("schema", schemaName); + config = configForSchema; + database = createDatabaseFromConfig(config); + // create the schema + + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabase().query(ctx -> ctx.fetch(String.format("CREATE SCHEMA %s;", schemaName))); + }); + + // create the user + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabase().query(ctx -> ctx.fetch(String.format("CREATE USER %s WITH PASSWORD '%s' SESSION TIMEOUT 60;", + USER_WITHOUT_CREDS, baseConfig.get("password").asText()))); + }); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) throws Exception { + // blow away the test schema at the end. + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabase().query(ctx -> ctx.fetch(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", schemaName))); + }); + + // blow away the user at the end. + SshTunnel.sshWrap( + getConfig(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY, + config -> { + getDatabase().query(ctx -> ctx.fetch(String.format("DROP USER IF EXISTS %s;", USER_WITHOUT_CREDS))); + }); + RedshiftConnectionHandler.close(connection); + } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java deleted file mode 100644 index 1bd18c2eae94..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */ - -package io.airbyte.integrations.destination.redshift; - -import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_KEY_AUTH; -import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableMap.Builder; -import io.airbyte.cdk.db.Database; -import io.airbyte.cdk.db.factory.ConnectionFactory; -import io.airbyte.cdk.db.factory.DatabaseDriver; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.base.ssh.SshTunnel; -import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; -import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces; -import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; -import io.airbyte.commons.jackson.MoreMappers; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; -import java.io.IOException; -import java.sql.Connection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.jooq.impl.DSL; - -public abstract class SshRedshiftDestinationBaseAcceptanceTest extends JdbcDestinationAcceptanceTest { - - protected String schemaName; - // config from which to create / delete schemas. - protected JsonNode baseConfig; - // config which refers to the schema that the test is being run in. - protected JsonNode config; - - private Database database; - - private Connection connection; - - private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); - private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); - - public abstract SshTunnel.TunnelMethod getTunnelMethod(); - - @Override - protected String getImageName() { - return "airbyte/destination-redshift:dev"; - } - - @Override - protected JsonNode getConfig() throws Exception { - final Map configAsMap = deserializeToObjectMap(config); - final Builder configMapBuilder = new Builder<>().putAll(configAsMap); - return getTunnelConfig(getTunnelMethod(), configMapBuilder); - } - - protected JsonNode getTunnelConfig(final SshTunnel.TunnelMethod tunnelMethod, final ImmutableMap.Builder builderWithSchema) { - final JsonNode sshBastionHost = config.get("ssh_bastion_host"); - final JsonNode sshBastionPort = config.get("ssh_bastion_port"); - final JsonNode sshBastionUser = config.get("ssh_bastion_user"); - final JsonNode sshBastionPassword = config.get("ssh_bastion_password"); - final JsonNode sshBastionKey = config.get("ssh_bastion_key"); - - final String tunnelUserPassword = tunnelMethod.equals(SSH_PASSWORD_AUTH) ? sshBastionPassword.asText() : ""; - final String sshKey = tunnelMethod.equals(SSH_KEY_AUTH) ? 
sshBastionKey.asText() : ""; - - return Jsons.jsonNode(builderWithSchema - .put("tunnel_method", Jsons.jsonNode(ImmutableMap.builder() - .put("tunnel_host", sshBastionHost) - .put("tunnel_method", tunnelMethod.toString()) - .put("tunnel_port", sshBastionPort.intValue()) - .put("tunnel_user", sshBastionUser) - .put("tunnel_user_password", tunnelUserPassword) - .put("ssh_key", sshKey) - .build())) - .build()); - } - - public static Map deserializeToObjectMap(final JsonNode json) { - final ObjectMapper objectMapper = MoreMappers.initMapper(); - return objectMapper.convertValue(json, new TypeReference<>() {}); - } - - public abstract JsonNode getStaticConfig() throws IOException; - - @Override - protected JsonNode getFailCheckConfig() { - final JsonNode invalidConfig = Jsons.clone(config); - ((ObjectNode) invalidConfig).put("password", "wrong password"); - return invalidConfig; - } - - @Override - protected boolean implementsNamespaces() { - return true; - } - - @Override - protected List retrieveNormalizedRecords(final TestDestinationEnv env, final String streamName, final String namespace) - throws Exception { - final String tableName = namingResolver.getIdentifier(streamName); - return retrieveRecordsFromTable(tableName, namespace); - } - - @Override - protected List retrieveRecords(final TestDestinationEnv testEnv, - final String streamName, - final String namespace, - final JsonNode streamSchema) - throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) - .stream() - .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) - .collect(Collectors.toList()); - } - - private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception { - return SshTunnel.sshWrap( - getConfig(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY, - config -> { - return getDatabase().query(ctx -> ctx - .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .stream() - .map(this::getJsonFromRecord) - .collect(Collectors.toList())); - }); - - } - - @Override - protected TestDataComparator getTestDataComparator() { - return new RedshiftTestDataComparator(); - } - - private Database createDatabaseFromConfig(final JsonNode config) { - connection = ConnectionFactory.create(config.get(JdbcUtils.USERNAME_KEY).asText(), - config.get(JdbcUtils.PASSWORD_KEY).asText(), - RedshiftStagingS3Destination.Companion.getSSL_JDBC_PARAMETERS(), - String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), - config.get(JdbcUtils.HOST_KEY).asText(), - config.get(JdbcUtils.PORT_KEY).asInt(), - config.get(JdbcUtils.DATABASE_KEY).asText())); - - return new Database(DSL.using(connection)); - } - - private Database getDatabase() { - return database; - } - - @Override - protected int getMaxRecordValueLimit() { - return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; - } - - @Override - protected void setup(final TestDestinationEnv testEnv, final HashSet TEST_SCHEMAS) throws Exception { - baseConfig = getStaticConfig(); - final JsonNode configForSchema = Jsons.clone(baseConfig); - schemaName = TestingNamespaces.generate(); - TEST_SCHEMAS.add(schemaName); - ((ObjectNode) configForSchema).put("schema", schemaName); - config = configForSchema; - database = createDatabaseFromConfig(config); - // create the schema - - SshTunnel.sshWrap( - getConfig(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY, - config -> { - getDatabase().query(ctx -> ctx.fetch(String.format("CREATE 
SCHEMA %s;", schemaName))); - }); - - // create the user - SshTunnel.sshWrap( - getConfig(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY, - config -> { - getDatabase().query(ctx -> ctx.fetch(String.format("CREATE USER %s WITH PASSWORD '%s' SESSION TIMEOUT 60;", - USER_WITHOUT_CREDS, baseConfig.get("password").asText()))); - }); - } - - @Override - protected void tearDown(final TestDestinationEnv testEnv) throws Exception { - // blow away the test schema at the end. - SshTunnel.sshWrap( - getConfig(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY, - config -> { - getDatabase().query(ctx -> ctx.fetch(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", schemaName))); - }); - - // blow away the user at the end. - SshTunnel.sshWrap( - getConfig(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY, - config -> { - getDatabase().query(ctx -> ctx.fetch(String.format("DROP USER IF EXISTS %s;", USER_WITHOUT_CREDS))); - }); - RedshiftConnectionHandler.close(connection); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java index a4c11560a591..8eb8c3a9d07c 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java @@ -12,8 +12,8 @@ import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; -import io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; +import io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGeneratorIntegrationTest.RedshiftSourceOperations; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStream; diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java index aeba929cf24a..623842091a83 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java @@ -29,8 +29,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; 
-import io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; +import io.airbyte.integrations.destination.redshift.RedshiftStagingS3Destination; import java.nio.file.Files; import java.nio.file.Path; import java.sql.ResultSet; diff --git a/docs/integrations/destinations/redshift-migrations.md b/docs/integrations/destinations/redshift-migrations.md index 7cd43c08cb65..046b666e9775 100644 --- a/docs/integrations/destinations/redshift-migrations.md +++ b/docs/integrations/destinations/redshift-migrations.md @@ -1,14 +1,26 @@ # Redshift Migration Guide +## Upgrading to 3.0.0 + +This version removes support for standard inserts. Although this loading method is easier to set up than S3 staging, it has two major disadvantages: + +* Standard inserts is significantly slower +* Standard inserts is significantly more expensive + +[Redshift's documentation](https://docs.aws.amazon.com/redshift/latest/dg/r_INSERT_30.html#r_INSERT_30_usage_notes) states: +> We strongly encourage you to use the COPY command to load large amounts of data. Using individual INSERT statements to populate a table might be prohibitively slow. + +See our [Redshift docs](https://docs.airbyte.com/integrations/destinations/redshift#for-copy-strategy) for more information on how to set up S3 staging. + ## Upgrading to 2.0.0 This version introduces [Destinations V2](/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early. Worthy of specific mention, this version includes: -- Per-record error handling -- Clearer table structure -- Removal of sub-tables for nested properties -- Removal of SCD tables +* Per-record error handling +* Clearer table structure +* Removal of sub-tables for nested properties +* Removal of SCD tables Learn more about what's new in Destinations V2 [here](/using-airbyte/core-concepts/typing-deduping). diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 8faa80dcd319..ad3fe28a86c6 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -15,7 +15,7 @@ This Redshift destination connector has two replication strategies: [here](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-install.html). **Not recommended for production workloads as this does not scale well**. -For INSERT strategy: +### For INSERT strategy: - **Host** - **Port** @@ -35,7 +35,7 @@ For INSERT strategy: Airbyte automatically picks an approach depending on the given configuration - if S3 configuration is present, Airbyte will use the COPY strategy and vice versa. 
-For COPY strategy: +### For COPY strategy: - **S3 Bucket Name** - See [this](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) to @@ -241,6 +241,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.0.0 | 2024-06-04 | [38886](https://github.com/airbytehq/airbyte/pull/38886) | Remove standard inserts mode | | 2.6.4 | 2024-05-31 | [38825](https://github.com/airbytehq/airbyte/pull/38825) | Adopt CDK 0.35.15 | | 2.6.3 | 2024-05-31 | [38803](https://github.com/airbytehq/airbyte/pull/38803) | Source auto-conversion to Kotlin | | 2.6.2 | 2024-05-14 | [38189](https://github.com/airbytehq/airbyte/pull/38189) | adding an option to DROP CASCADE on resets |
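For context on the 3.0.0 docs changes above: the S3 staging setup they point to ultimately drives Redshift's bulk COPY path, which is the load style Redshift's usage notes recommend over row-by-row INSERT statements. A minimal illustrative sketch follows; the schema, table, bucket, object path, and IAM role names are hypothetical placeholders, not values taken from the connector.

```sql
-- Illustrative only: bulk-load a gzipped CSV file staged in S3 into a Redshift table.
-- COPY ... FROM 's3://...' IAM_ROLE ... CSV GZIP follows Redshift's documented syntax.
COPY my_schema.my_raw_table
FROM 's3://my-staging-bucket/airbyte/staging/part-0001.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
CSV GZIP;
```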