diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml
index c24799d19b64..553ae84b43f9 100644
--- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
-  dockerImageTag: 2.6.4
+  dockerImageTag: 3.0.0
   dockerRepository: airbyte/destination-redshift
   documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
   githubIssueLabel: destination-redshift
@@ -24,6 +24,10 @@ data:
           This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early.
         upgradeDeadline: "2024-03-15"
+      3.0.0:
+        message: >
+          Version 3.0.0 of destination-redshift removes support for the "standard inserts" mode. S3 staging was always preferred for being faster and less expensive, and as part of Airbyte 1.0, we are officially removing the inferior "standard inserts" mode. Upgrading to this version of the destination will require a configuration with an S3 staging area.
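# For illustration only: a destination config that satisfies the S3 staging requirement
# might look roughly like the sketch below. The top-level fields (host, port, database,
# schema, username, password, purge_staging_data) appear elsewhere in this diff; the
# uploading_method field names and the "S3 Staging" constant are indicative assumptions,
# and the authoritative schema is the connector's spec.json. Configs that still set
# "method": "Standard" will no longer validate once the Standard option is removed below.
#
#   {
#     "host": "example-cluster.abc123.us-east-1.redshift.amazonaws.com",
#     "port": 5439,
#     "database": "dev",
#     "schema": "public",
#     "username": "airbyte_user",
#     "password": "...",
#     "uploading_method": {
#       "method": "S3 Staging",
#       "s3_bucket_name": "my-airbyte-staging-bucket",
#       "s3_bucket_region": "us-east-1",
#       "access_key_id": "...",
#       "secret_access_key": "...",
#       "purge_staging_data": true
#     }
#   }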
+ upgradeDeadline: "2024-07-31" resourceRequirements: jobSpecific: - jobType: sync diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt index dcef6414ea74..9aadbb493f00 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt +++ b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftDestination.kt @@ -5,35 +5,88 @@ package io.airbyte.integrations.destination.redshift import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.node.ObjectNode +import com.google.common.collect.ImmutableMap +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.db.factory.DataSourceFactory.close +import io.airbyte.cdk.db.factory.DataSourceFactory.create +import io.airbyte.cdk.db.factory.DatabaseDriver +import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcSourceOperations +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.emitConfigErrorTrace import io.airbyte.cdk.integrations.base.Destination import io.airbyte.cdk.integrations.base.IntegrationRunner -import io.airbyte.cdk.integrations.destination.jdbc.copy.SwitchingDestination +import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer +import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOverride +import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage +import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination +import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer +import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator +import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption +import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig +import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig.Companion.fromJson +import io.airbyte.cdk.integrations.destination.s3.NoEncryption +import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.attemptS3WriteAndDelete +import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig +import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations +import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory.Companion.builder +import io.airbyte.commons.exceptions.ConnectionErrorException import io.airbyte.commons.json.Jsons.deserialize +import io.airbyte.commons.json.Jsons.emptyObject +import io.airbyte.commons.json.Jsons.jsonNode import io.airbyte.commons.resources.MoreResources.readResource +import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser +import 
io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations +import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator +import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator +import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration +import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants +import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations +import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState +import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer import io.airbyte.integrations.destination.redshift.util.RedshiftUtil +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConnectorSpecification -import java.util.function.Function +import java.time.Duration +import java.util.Optional +import java.util.function.Consumer +import javax.sql.DataSource +import org.apache.commons.lang3.NotImplementedException +import org.apache.commons.lang3.StringUtils import org.slf4j.Logger import org.slf4j.LoggerFactory -/** - * The Redshift Destination offers two replication strategies. The first inserts via a typical SQL - * Insert statement. Although less efficient, this requires less user set up. See - * [RedshiftInsertDestination] for more detail. The second inserts via streaming the data to an S3 - * bucket, and Cop-ing the date into Redshift. This is more efficient, and recommended for - * production workloads, but does require users to set up an S3 bucket and pass in additional - * credentials. See [RedshiftStagingS3Destination] for more detail. This class inspect the given - * arguments to determine which strategy to use. - */ class RedshiftDestination : - SwitchingDestination( - DestinationType::class.java, - Function { config: JsonNode -> getTypeFromConfig(config) }, - destinationMap - ) { - enum class DestinationType { - STANDARD, - COPY_S3 + AbstractJdbcDestination( + DRIVER_CLASS, + RedshiftSQLNameTransformer(), + RedshiftSqlOperations() + ), + Destination { + private fun isEphemeralKeysAndPurgingStagingData( + config: JsonNode, + encryptionConfig: EncryptionConfig + ): Boolean { + return !isPurgeStagingData(config) && + encryptionConfig is AesCbcEnvelopeEncryption && + encryptionConfig.keyType == AesCbcEnvelopeEncryption.KeyType.EPHEMERAL } @Throws(Exception::class) @@ -45,42 +98,316 @@ class RedshiftDestination : return originalSpec } - override val isV2Destination: Boolean - get() = true + override fun check(config: JsonNode): AirbyteConnectionStatus? 
{ + val s3Config: S3DestinationConfig = + S3DestinationConfig.getS3DestinationConfig(RedshiftUtil.findS3Options(config)) + val encryptionConfig = + if (config.has(RedshiftDestinationConstants.UPLOADING_METHOD)) + fromJson( + config[RedshiftDestinationConstants.UPLOADING_METHOD][JdbcUtils.ENCRYPTION_KEY] + ) + else NoEncryption() + if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) { + return AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage( + "You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt." + ) + } + attemptS3WriteAndDelete( + S3StorageOperations(RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), + s3Config, + s3Config.bucketPath + ) + + val nameTransformer = namingResolver + val redshiftS3StagingSqlOperations = + RedshiftS3StagingSqlOperations( + nameTransformer, + s3Config.getS3Client(), + s3Config, + encryptionConfig + ) + val dataSource = getDataSource(config) + try { + val database: JdbcDatabase = DefaultJdbcDatabase(dataSource) + val outputSchema = + super.namingResolver.getIdentifier(config[JdbcUtils.SCHEMA_KEY].asText()) + attemptTableOperations( + outputSchema, + database, + nameTransformer, + redshiftS3StagingSqlOperations, + false + ) + RedshiftUtil.checkSvvTableAccess(database) + return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) + } catch (e: ConnectionErrorException) { + val message = getErrorMessage(e.stateCode, e.errorCode, e.exceptionMessage, e) + emitConfigErrorTrace(e, message) + return AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(message) + } catch (e: Exception) { + LOGGER.error("Exception while checking connection: ", e) + return AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage( + """ + Could not connect with provided configuration. + ${e.message} + """.trimIndent() + ) + } finally { + try { + close(dataSource) + } catch (e: Exception) { + LOGGER.warn("Unable to close data source.", e) + } + } + } + + override val isV2Destination: Boolean = true + + override fun getDataSource(config: JsonNode): DataSource { + val jdbcConfig: JsonNode = getJdbcConfig(config) + return create( + jdbcConfig[JdbcUtils.USERNAME_KEY].asText(), + if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY)) jdbcConfig[JdbcUtils.PASSWORD_KEY].asText() + else null, + DRIVER_CLASS, + jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(), + getDefaultConnectionProperties(config), + Duration.ofMinutes(2) + ) + } + + override fun getDatabase(dataSource: DataSource): JdbcDatabase { + return DefaultJdbcDatabase(dataSource) + } + + fun getDatabase(dataSource: DataSource, sourceOperations: JdbcSourceOperations?): JdbcDatabase { + return DefaultJdbcDatabase(dataSource, sourceOperations) + } + + override val namingResolver: NamingConventionTransformer + get() = RedshiftSQLNameTransformer() + + override fun getDefaultConnectionProperties(config: JsonNode): Map { + // The following properties can be overriden through jdbcUrlParameters in the config. 
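        // Illustrative example (values assumed, not part of this change): a config whose
        // jdbc_url_params is set to something like "connectTimeout=300" is expected to take
        // precedence over the connectTimeout default applied below.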
+ val connectionOptions: MutableMap = HashMap() + // Redshift properties + // https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option + // connectTimeout is different from Hikari pool's connectionTimout, driver defaults to + // 10seconds so + // increase it to match hikari's default + connectionOptions["connectTimeout"] = "120" + // HikariPool properties + // https://github.com/brettwooldridge/HikariCP?tab=readme-ov-file#frequently-used + // connectionTimeout is set explicitly to 2 minutes when creating data source. + // Do aggressive keepAlive with minimum allowed value, this only applies to connection + // sitting idle + // in the pool. + connectionOptions["keepaliveTime"] = Duration.ofSeconds(30).toMillis().toString() + connectionOptions.putAll(SSL_JDBC_PARAMETERS) + return connectionOptions + } + + // this is a no op since we override getDatabase. + override fun toJdbcConfig(config: JsonNode): JsonNode { + return emptyObject() + } + + override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator { + return RedshiftSqlGenerator(namingResolver, config) + } + + override fun getDestinationHandler( + databaseName: String, + database: JdbcDatabase, + rawTableSchema: String + ): JdbcDestinationHandler { + return RedshiftDestinationHandler(databaseName, database, rawTableSchema) + } + + override fun getMigrations( + database: JdbcDatabase, + databaseName: String, + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler + ): List> { + return listOf>( + RedshiftRawTableAirbyteMetaMigration(database, databaseName) + ) + } + + @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") + override fun getDataTransformer( + parsedCatalog: ParsedCatalog?, + defaultNamespace: String? + ): StreamAwareDataTransformer { + // Redundant override to keep in consistent with InsertDestination. TODO: Unify these 2 + // classes with + // composition. + return RedshiftSuperLimitationTransformer(parsedCatalog, defaultNamespace!!) + } + + @Deprecated("") + override fun getConsumer( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog, + outputRecordCollector: Consumer + ): AirbyteMessageConsumer? 
{ + throw NotImplementedException("Should use the getSerializedMessageConsumer instead") + } + + @Throws(Exception::class) + override fun getSerializedMessageConsumer( + config: JsonNode, + catalog: ConfiguredAirbyteCatalog, + outputRecordCollector: Consumer + ): SerializedAirbyteMessageConsumer { + val encryptionConfig = + if (config.has(RedshiftDestinationConstants.UPLOADING_METHOD)) + fromJson( + config[RedshiftDestinationConstants.UPLOADING_METHOD][JdbcUtils.ENCRYPTION_KEY] + ) + else NoEncryption() + val s3Options = RedshiftUtil.findS3Options(config) + val s3Config: S3DestinationConfig = S3DestinationConfig.getS3DestinationConfig(s3Options) + + val defaultNamespace = config["schema"].asText() + for (stream in catalog.streams) { + if (StringUtils.isEmpty(stream.stream.namespace)) { + stream.stream.namespace = defaultNamespace + } + } + + val sqlGenerator = RedshiftSqlGenerator(namingResolver, config) + val parsedCatalog: ParsedCatalog + val typerDeduper: TyperDeduper + val database = getDatabase(getDataSource(config)) + val databaseName = config[JdbcUtils.DATABASE_KEY].asText() + val catalogParser: CatalogParser + val rawNamespace: String + if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) { + rawNamespace = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get() + catalogParser = CatalogParser(sqlGenerator, rawNamespace) + } else { + rawNamespace = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE + catalogParser = CatalogParser(sqlGenerator, rawNamespace) + } + val redshiftDestinationHandler = + RedshiftDestinationHandler(databaseName, database, rawNamespace) + parsedCatalog = catalogParser.parseCatalog(catalog) + val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName) + val v2TableMigrator = NoopV2TableMigrator() + val disableTypeDedupe = + config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false) + val redshiftMigrations: List> = + getMigrations(database, databaseName, sqlGenerator, redshiftDestinationHandler) + typerDeduper = + if (disableTypeDedupe) { + NoOpTyperDeduperWithV1V2Migrations( + sqlGenerator, + redshiftDestinationHandler, + parsedCatalog, + migrator, + v2TableMigrator, + redshiftMigrations + ) + } else { + DefaultTyperDeduper( + sqlGenerator, + redshiftDestinationHandler, + parsedCatalog, + migrator, + v2TableMigrator, + redshiftMigrations + ) + } + + return builder( + outputRecordCollector, + database, + RedshiftS3StagingSqlOperations( + namingResolver, + s3Config.getS3Client(), + s3Config, + encryptionConfig + ), + namingResolver, + config, + catalog, + isPurgeStagingData(s3Options), + typerDeduper, + parsedCatalog, + defaultNamespace, + JavaBaseConstants.DestinationColumns.V2_WITH_META + ) + .setDataTransformer(getDataTransformer(parsedCatalog, defaultNamespace)) + .build() + .createAsync() + } + + private fun isPurgeStagingData(config: JsonNode?): Boolean { + return !config!!.has("purge_staging_data") || config["purge_staging_data"].asBoolean() + } companion object { private val LOGGER: Logger = LoggerFactory.getLogger(RedshiftDestination::class.java) - private val destinationMap: Map = - java.util.Map.of( - DestinationType.STANDARD, - RedshiftInsertDestination.Companion.sshWrappedDestination(), - DestinationType.COPY_S3, - RedshiftStagingS3Destination.Companion.sshWrappedDestination() + val DRIVER_CLASS: String = DatabaseDriver.REDSHIFT.driverClassName + val SSL_JDBC_PARAMETERS: Map = + ImmutableMap.of( + JdbcUtils.SSL_KEY, + "true", + "sslfactory", + "com.amazon.redshift.ssl.NonValidatingFactory" ) - private fun 
getTypeFromConfig(config: JsonNode): DestinationType { - return determineUploadMode(config) + private fun sshWrappedDestination(): Destination { + return SshWrappedDestination( + RedshiftDestination(), + JdbcUtils.HOST_LIST_KEY, + JdbcUtils.PORT_LIST_KEY + ) } - @JvmStatic - fun determineUploadMode(config: JsonNode): DestinationType { - val jsonNode = RedshiftUtil.findS3Options(config) + fun getJdbcConfig(redshiftConfig: JsonNode): JsonNode { + val schema = + Optional.ofNullable(redshiftConfig[JdbcUtils.SCHEMA_KEY]) + .map { obj: JsonNode -> obj.asText() } + .orElse("public") + val configBuilder = + ImmutableMap.builder() + .put(JdbcUtils.USERNAME_KEY, redshiftConfig[JdbcUtils.USERNAME_KEY].asText()) + .put(JdbcUtils.PASSWORD_KEY, redshiftConfig[JdbcUtils.PASSWORD_KEY].asText()) + .put( + JdbcUtils.JDBC_URL_KEY, + String.format( + "jdbc:redshift://%s:%s/%s", + redshiftConfig[JdbcUtils.HOST_KEY].asText(), + redshiftConfig[JdbcUtils.PORT_KEY].asText(), + redshiftConfig[JdbcUtils.DATABASE_KEY].asText() + ) + ) + .put(JdbcUtils.SCHEMA_KEY, schema) - if (RedshiftUtil.anyOfS3FieldsAreNullOrEmpty(jsonNode)) { - LOGGER.warn( - "The \"standard\" upload mode is not performant, and is not recommended for production. " + - "Please use the Amazon S3 upload mode if you are syncing a large amount of data." + if (redshiftConfig.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { + configBuilder.put( + JdbcUtils.JDBC_URL_PARAMS_KEY, + redshiftConfig[JdbcUtils.JDBC_URL_PARAMS_KEY] ) - return DestinationType.STANDARD } - return DestinationType.COPY_S3 + + return jsonNode(configBuilder.build()) } @Throws(Exception::class) @JvmStatic fun main(args: Array) { - val destination: Destination = RedshiftDestination() + val destination: Destination = sshWrappedDestination() LOGGER.info("starting destination: {}", RedshiftDestination::class.java) IntegrationRunner(destination).run(args) LOGGER.info("completed destination: {}", RedshiftDestination::class.java) diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.kt deleted file mode 100644 index 2510e545fbab..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftInsertDestination.kt +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ -package io.airbyte.integrations.destination.redshift - -import com.fasterxml.jackson.databind.JsonNode -import com.google.common.collect.ImmutableMap -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.db.factory.DataSourceFactory.create -import io.airbyte.cdk.db.factory.DatabaseDriver -import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase -import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.db.jdbc.JdbcSourceOperations -import io.airbyte.cdk.db.jdbc.JdbcUtils -import io.airbyte.cdk.integrations.base.Destination -import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination -import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer -import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator -import io.airbyte.commons.json.Jsons.jsonNode -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog -import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator -import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer -import io.airbyte.integrations.destination.redshift.util.RedshiftUtil -import java.time.Duration -import java.util.* -import javax.sql.DataSource - -class RedshiftInsertDestination : - AbstractJdbcDestination( - DRIVER_CLASS, - REDSHIFT_OPTIMAL_BATCH_SIZE_FOR_FLUSH, - RedshiftSQLNameTransformer(), - RedshiftSqlOperations() - ) { - override fun toJdbcConfig(redshiftConfig: JsonNode): JsonNode { - return getJdbcConfig(redshiftConfig) - } - - override fun getDataSource(config: JsonNode): DataSource { - val jdbcConfig = getJdbcConfig(config) - return create( - jdbcConfig[JdbcUtils.USERNAME_KEY].asText(), - if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY)) jdbcConfig[JdbcUtils.PASSWORD_KEY].asText() - else null, - DRIVER_CLASS, - jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(), - getDefaultConnectionProperties(config), - Duration.ofMinutes(2) - ) - } - - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - @Throws(Exception::class) - override fun destinationSpecificTableOperations(database: JdbcDatabase?) { - RedshiftUtil.checkSvvTableAccess(database!!) - } - - override fun getDatabase(dataSource: DataSource): JdbcDatabase { - return DefaultJdbcDatabase(dataSource) - } - - fun getDatabase(dataSource: DataSource, sourceOperations: JdbcSourceOperations?): JdbcDatabase { - return DefaultJdbcDatabase(dataSource, sourceOperations) - } - - override fun getDefaultConnectionProperties(config: JsonNode): Map { - // The following properties can be overriden through jdbcUrlParameters in the config. 
- val connectionOptions: MutableMap = HashMap() - // Redshift properties - // https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option - // connectTimeout is different from Hikari pool's connectionTimout, driver defaults to - // 10seconds so - // increase it to match hikari's default - connectionOptions["connectTimeout"] = "120" - // See RedshiftProperty.LOG_SERVER_ERROR_DETAIL, defaults to true - connectionOptions["logservererrordetail"] = "false" - // HikariPool properties - // https://github.com/brettwooldridge/HikariCP?tab=readme-ov-file#frequently-used - // TODO: Change data source factory to configure these properties - connectionOptions.putAll(SSL_JDBC_PARAMETERS) - return connectionOptions - } - - override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator { - return RedshiftSqlGenerator(super.namingResolver, config) - } - - override fun getDestinationHandler( - databaseName: String, - database: JdbcDatabase, - rawTableSchema: String - ): JdbcDestinationHandler { - return RedshiftDestinationHandler(databaseName, database, rawTableSchema) - } - - override fun getMigrations( - database: JdbcDatabase, - databaseName: String, - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler - ): List> { - return java.util.List.of>( - RedshiftRawTableAirbyteMetaMigration(database, databaseName) - ) - } - - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - override fun getDataTransformer( - parsedCatalog: ParsedCatalog?, - defaultNamespace: String? - ): StreamAwareDataTransformer { - return RedshiftSuperLimitationTransformer(parsedCatalog, defaultNamespace!!) - } - - companion object { - val DRIVER_CLASS: String = DatabaseDriver.REDSHIFT.driverClassName - @JvmField - val SSL_JDBC_PARAMETERS: Map = - ImmutableMap.of( - JdbcUtils.SSL_KEY, - "true", - "sslfactory", - "com.amazon.redshift.ssl.NonValidatingFactory" - ) - - // insert into stmt has ~200 bytes - // Per record overhead of ~150 bytes for strings in statement like JSON_PARSE.. uuid etc - // If the flush size allows the max batch of 10k records, then net overhead is ~1.5MB. - // Lets round it to 2MB for wiggle room and keep a max buffer of 14MB per flush. - // This will allow not sending record set larger than 14M limiting the batch insert - // statement. 
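        // In other words: ~14 MB of records plus the ~2 MB of per-statement overhead estimated
        // above stays under Redshift's 16 MB maximum SQL statement size.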
- private const val REDSHIFT_OPTIMAL_BATCH_SIZE_FOR_FLUSH = 14 * 1024 * 1024L - - fun sshWrappedDestination(): Destination { - return SshWrappedDestination( - RedshiftInsertDestination(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY - ) - } - - fun getJdbcConfig(redshiftConfig: JsonNode): JsonNode { - val schema = - Optional.ofNullable(redshiftConfig[JdbcUtils.SCHEMA_KEY]) - .map { obj: JsonNode -> obj.asText() } - .orElse("public") - val configBuilder = - ImmutableMap.builder() - .put(JdbcUtils.USERNAME_KEY, redshiftConfig[JdbcUtils.USERNAME_KEY].asText()) - .put(JdbcUtils.PASSWORD_KEY, redshiftConfig[JdbcUtils.PASSWORD_KEY].asText()) - .put( - JdbcUtils.JDBC_URL_KEY, - String.format( - "jdbc:redshift://%s:%s/%s", - redshiftConfig[JdbcUtils.HOST_KEY].asText(), - redshiftConfig[JdbcUtils.PORT_KEY].asText(), - redshiftConfig[JdbcUtils.DATABASE_KEY].asText() - ) - ) - .put(JdbcUtils.SCHEMA_KEY, schema) - - if (redshiftConfig.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { - configBuilder.put( - JdbcUtils.JDBC_URL_PARAMS_KEY, - redshiftConfig[JdbcUtils.JDBC_URL_PARAMS_KEY] - ) - } - - return jsonNode(configBuilder.build()) - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.kt b/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.kt deleted file mode 100644 index c59c8969c57e..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/main/kotlin/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.kt +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ -package io.airbyte.integrations.destination.redshift - -import com.fasterxml.jackson.databind.JsonNode -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings -import io.airbyte.cdk.db.factory.DataSourceFactory.close -import io.airbyte.cdk.db.factory.DataSourceFactory.create -import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase -import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.db.jdbc.JdbcUtils -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer -import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.emitConfigErrorTrace -import io.airbyte.cdk.integrations.base.Destination -import io.airbyte.cdk.integrations.base.JavaBaseConstants -import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer -import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOverride -import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage -import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination -import io.airbyte.cdk.integrations.destination.NamingConventionTransformer -import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer -import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator -import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption -import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig -import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig.Companion.fromJson -import 
io.airbyte.cdk.integrations.destination.s3.NoEncryption -import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.attemptS3WriteAndDelete -import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig -import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations -import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory.Companion.builder -import io.airbyte.commons.exceptions.ConnectionErrorException -import io.airbyte.commons.json.Jsons.emptyObject -import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser -import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler -import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations -import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog -import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper -import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration -import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants -import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState -import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer -import io.airbyte.integrations.destination.redshift.util.RedshiftUtil -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus -import io.airbyte.protocol.models.v0.AirbyteMessage -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog -import java.time.Duration -import java.util.function.Consumer -import javax.sql.DataSource -import org.apache.commons.lang3.NotImplementedException -import org.apache.commons.lang3.StringUtils -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -class RedshiftStagingS3Destination : - AbstractJdbcDestination( - RedshiftInsertDestination.DRIVER_CLASS, - RedshiftSQLNameTransformer(), - RedshiftSqlOperations() - ), - Destination { - private fun isEphemeralKeysAndPurgingStagingData( - config: JsonNode, - encryptionConfig: EncryptionConfig - ): Boolean { - return !isPurgeStagingData(config) && - encryptionConfig is AesCbcEnvelopeEncryption && - encryptionConfig.keyType == AesCbcEnvelopeEncryption.KeyType.EPHEMERAL - } - - override fun check(config: JsonNode): AirbyteConnectionStatus? 
{ - val s3Config: S3DestinationConfig = - S3DestinationConfig.getS3DestinationConfig(RedshiftUtil.findS3Options(config)) - val encryptionConfig = - if (config.has(RedshiftDestinationConstants.UPLOADING_METHOD)) - fromJson( - config[RedshiftDestinationConstants.UPLOADING_METHOD][JdbcUtils.ENCRYPTION_KEY] - ) - else NoEncryption() - if (isEphemeralKeysAndPurgingStagingData(config, encryptionConfig)) { - return AirbyteConnectionStatus() - .withStatus(AirbyteConnectionStatus.Status.FAILED) - .withMessage( - "You cannot use ephemeral keys and disable purging your staging data. This would produce S3 objects that you cannot decrypt." - ) - } - attemptS3WriteAndDelete( - S3StorageOperations(RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config), - s3Config, - s3Config.bucketPath - ) - - val nameTransformer = namingResolver - val redshiftS3StagingSqlOperations = - RedshiftS3StagingSqlOperations( - nameTransformer, - s3Config.getS3Client(), - s3Config, - encryptionConfig - ) - val dataSource = getDataSource(config) - try { - val database: JdbcDatabase = DefaultJdbcDatabase(dataSource) - val outputSchema = - super.namingResolver.getIdentifier(config[JdbcUtils.SCHEMA_KEY].asText()) - attemptTableOperations( - outputSchema, - database, - nameTransformer, - redshiftS3StagingSqlOperations, - false - ) - RedshiftUtil.checkSvvTableAccess(database) - return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) - } catch (e: ConnectionErrorException) { - val message = getErrorMessage(e.stateCode, e.errorCode, e.exceptionMessage, e) - emitConfigErrorTrace(e, message) - return AirbyteConnectionStatus() - .withStatus(AirbyteConnectionStatus.Status.FAILED) - .withMessage(message) - } catch (e: Exception) { - LOGGER.error("Exception while checking connection: ", e) - return AirbyteConnectionStatus() - .withStatus(AirbyteConnectionStatus.Status.FAILED) - .withMessage( - """ - Could not connect with provided configuration. - ${e.message} - """.trimIndent() - ) - } finally { - try { - close(dataSource) - } catch (e: Exception) { - LOGGER.warn("Unable to close data source.", e) - } - } - } - - override fun getDataSource(config: JsonNode): DataSource { - val jdbcConfig: JsonNode = RedshiftInsertDestination.Companion.getJdbcConfig(config) - return create( - jdbcConfig[JdbcUtils.USERNAME_KEY].asText(), - if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY)) jdbcConfig[JdbcUtils.PASSWORD_KEY].asText() - else null, - RedshiftInsertDestination.Companion.DRIVER_CLASS, - jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(), - getDefaultConnectionProperties(config), - Duration.ofMinutes(2) - ) - } - - override val namingResolver: NamingConventionTransformer - get() = RedshiftSQLNameTransformer() - - override fun getDefaultConnectionProperties(config: JsonNode): Map { - // TODO: Pull common code from RedshiftInsertDestination and RedshiftStagingS3Destination - // into a - // base class. - // The following properties can be overriden through jdbcUrlParameters in the config. 
- val connectionOptions: MutableMap = HashMap() - // Redshift properties - // https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-connecttimeout-option - // connectTimeout is different from Hikari pool's connectionTimout, driver defaults to - // 10seconds so - // increase it to match hikari's default - connectionOptions["connectTimeout"] = "120" - // HikariPool properties - // https://github.com/brettwooldridge/HikariCP?tab=readme-ov-file#frequently-used - // connectionTimeout is set explicitly to 2 minutes when creating data source. - // Do aggressive keepAlive with minimum allowed value, this only applies to connection - // sitting idle - // in the pool. - connectionOptions["keepaliveTime"] = Duration.ofSeconds(30).toMillis().toString() - connectionOptions.putAll(RedshiftInsertDestination.Companion.SSL_JDBC_PARAMETERS) - return connectionOptions - } - - // this is a no op since we override getDatabase. - override fun toJdbcConfig(config: JsonNode): JsonNode { - return emptyObject() - } - - override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator { - return RedshiftSqlGenerator(namingResolver, config) - } - - override fun getDestinationHandler( - databaseName: String, - database: JdbcDatabase, - rawTableSchema: String - ): JdbcDestinationHandler { - return RedshiftDestinationHandler(databaseName, database, rawTableSchema) - } - - protected override fun getMigrations( - database: JdbcDatabase, - databaseName: String, - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler - ): List> { - return java.util.List.of>( - RedshiftRawTableAirbyteMetaMigration(database, databaseName) - ) - } - - @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") - override fun getDataTransformer( - parsedCatalog: ParsedCatalog?, - defaultNamespace: String? - ): StreamAwareDataTransformer { - // Redundant override to keep in consistent with InsertDestination. TODO: Unify these 2 - // classes with - // composition. - return RedshiftSuperLimitationTransformer(parsedCatalog, defaultNamespace!!) - } - - @Deprecated("") - override fun getConsumer( - config: JsonNode, - catalog: ConfiguredAirbyteCatalog, - outputRecordCollector: Consumer - ): AirbyteMessageConsumer? { - throw NotImplementedException("Should use the getSerializedMessageConsumer instead") - } - - @Throws(Exception::class) - override fun getSerializedMessageConsumer( - config: JsonNode, - catalog: ConfiguredAirbyteCatalog, - outputRecordCollector: Consumer - ): SerializedAirbyteMessageConsumer? 
{ - val encryptionConfig = - if (config.has(RedshiftDestinationConstants.UPLOADING_METHOD)) - fromJson( - config[RedshiftDestinationConstants.UPLOADING_METHOD][JdbcUtils.ENCRYPTION_KEY] - ) - else NoEncryption() - val s3Options = RedshiftUtil.findS3Options(config) - val s3Config: S3DestinationConfig = S3DestinationConfig.getS3DestinationConfig(s3Options) - - val defaultNamespace = config["schema"].asText() - for (stream in catalog.streams) { - if (StringUtils.isEmpty(stream.stream.namespace)) { - stream.stream.namespace = defaultNamespace - } - } - - val sqlGenerator = RedshiftSqlGenerator(namingResolver, config) - val parsedCatalog: ParsedCatalog - val typerDeduper: TyperDeduper - val database = getDatabase(getDataSource(config)) - val databaseName = config[JdbcUtils.DATABASE_KEY].asText() - val catalogParser: CatalogParser - val rawNamespace: String - if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) { - rawNamespace = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get() - catalogParser = CatalogParser(sqlGenerator, rawNamespace) - } else { - rawNamespace = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE - catalogParser = CatalogParser(sqlGenerator, rawNamespace) - } - val redshiftDestinationHandler = - RedshiftDestinationHandler(databaseName, database, rawNamespace) - parsedCatalog = catalogParser.parseCatalog(catalog) - val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName) - val v2TableMigrator = NoopV2TableMigrator() - val disableTypeDedupe = - config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false) - val redshiftMigrations: List> = - getMigrations(database, databaseName, sqlGenerator, redshiftDestinationHandler) - typerDeduper = - if (disableTypeDedupe) { - NoOpTyperDeduperWithV1V2Migrations( - sqlGenerator, - redshiftDestinationHandler, - parsedCatalog, - migrator, - v2TableMigrator, - redshiftMigrations - ) - } else { - DefaultTyperDeduper( - sqlGenerator, - redshiftDestinationHandler, - parsedCatalog, - migrator, - v2TableMigrator, - redshiftMigrations - ) - } - - return builder( - outputRecordCollector, - database, - RedshiftS3StagingSqlOperations( - namingResolver, - s3Config.getS3Client(), - s3Config, - encryptionConfig - ), - namingResolver, - config, - catalog, - isPurgeStagingData(s3Options), - typerDeduper, - parsedCatalog, - defaultNamespace, - JavaBaseConstants.DestinationColumns.V2_WITH_META - ) - .setDataTransformer(getDataTransformer(parsedCatalog, defaultNamespace)) - .build() - .createAsync() - } - - private fun isPurgeStagingData(config: JsonNode?): Boolean { - return !config!!.has("purge_staging_data") || config["purge_staging_data"].asBoolean() - } - - companion object { - private val LOGGER: Logger = - LoggerFactory.getLogger(RedshiftStagingS3Destination::class.java) - - fun sshWrappedDestination(): Destination { - return SshWrappedDestination( - RedshiftStagingS3Destination(), - JdbcUtils.HOST_LIST_KEY, - JdbcUtils.PORT_LIST_KEY - ) - } - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json index 6372072375c4..7f291ffc3ba5 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-redshift/src/main/resources/spec.json @@ -225,17 +225,6 @@ "order": 7 } } - }, - { - "title": "Standard", - "required": ["method"], - "description": "(not recommended) Direct loading using SQL INSERT statements. 
This method is extremely inefficient and provided only for quick testing. In all other cases, you should use S3 uploading.", - "properties": { - "method": { - "type": "string", - "const": "Standard" - } - } } ] }, diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java index dfefbf0c0f10..04b352e8bd11 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftConnectionTest.java @@ -17,7 +17,7 @@ public class RedshiftConnectionTest { - private final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); + private final JsonNode config = Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json"))); private final RedshiftDestination destination = new RedshiftDestination(); private AirbyteConnectionStatus status; diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java deleted file mode 100644 index 4ea666f811da..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationAcceptanceTest.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import com.amazon.redshift.util.RedshiftTimestamp; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.cdk.db.Database; -import io.airbyte.cdk.db.factory.ConnectionFactory; -import io.airbyte.cdk.db.factory.DatabaseDriver; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; -import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces; -import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; -import java.io.IOException; -import java.sql.Connection; -import java.sql.SQLException; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.ChronoField; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import org.jooq.impl.DSL; -import org.junit.jupiter.api.parallel.Execution; -import org.junit.jupiter.api.parallel.ExecutionMode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// these tests are not yet thread-safe, unlike the DV2 tests. 
-@Execution(ExecutionMode.SAME_THREAD) -public abstract class RedshiftDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftDestinationAcceptanceTest.class); - - // config from which to create / delete schemas. - private JsonNode baseConfig; - // config which refers to the schema that the test is being run in. - protected JsonNode config; - private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); - private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); - - private Database database; - private Connection connection; - protected TestDestinationEnv testDestinationEnv; - - @Override - protected String getImageName() { - return "airbyte/destination-redshift:dev"; - } - - @Override - protected JsonNode getConfig() { - return config; - } - - public abstract JsonNode getStaticConfig() throws IOException; - - @Override - protected JsonNode getFailCheckConfig() { - final JsonNode invalidConfig = Jsons.clone(config); - ((ObjectNode) invalidConfig).put("password", "wrong password"); - return invalidConfig; - } - - @Override - protected TestDataComparator getTestDataComparator() { - return new RedshiftTestDataComparator(); - } - - @Override - protected boolean supportBasicDataTypeTest() { - return true; - } - - @Override - protected boolean supportArrayDataTypeTest() { - return true; - } - - @Override - protected boolean supportObjectDataTypeTest() { - return true; - } - - @Override - protected boolean supportIncrementalSchemaChanges() { - return true; - } - - @Override - protected boolean supportsInDestinationNormalization() { - return true; - } - - @Override - protected List retrieveRecords(final TestDestinationEnv env, - final String streamName, - final String namespace, - final JsonNode streamSchema) - throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) - .stream() - .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) - .collect(Collectors.toList()); - } - - @Override - protected boolean implementsNamespaces() { - return true; - } - - @Override - protected List retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace) - throws Exception { - String tableName = namingResolver.getIdentifier(streamName); - if (!tableName.startsWith("\"")) { - // Currently, Normalization always quote tables identifiers - tableName = "\"" + tableName + "\""; - } - return retrieveRecordsFromTable(tableName, namespace); - } - - private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { - return getDatabase().query( - ctx -> ctx - .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .stream() - .map(record -> getJsonFromRecord( - record, - value -> { - if (value instanceof final RedshiftTimestamp rts) { - // We can't just use rts.toInstant().toString(), because that will mangle historical - // dates (e.g. 1504-02-28...) because toInstant() just converts to epoch millis, - // which works _very badly_ for for very old dates. - // Instead, convert to a string and then parse that string. - // We can't just rts.toString(), because that loses the timezone... - // so instead we use getPostgresqlString and parse that >.> - // Thanks, redshift. 
- return Optional.of( - ZonedDateTime.parse( - rts.getPostgresqlString(), - new DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd HH:mm:ss") - .optionalStart() - .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 9, true) - .optionalEnd() - .appendPattern("X") - .toFormatter()) - .withZoneSameInstant(ZoneOffset.UTC) - .toString()); - } else { - return Optional.empty(); - } - })) - .collect(Collectors.toList())); - } - - // for each test we create a new schema in the database. run the test in there and then remove it. - @Override - protected void setup(final TestDestinationEnv testEnv, final HashSet TEST_SCHEMAS) throws Exception { - final String schemaName = TestingNamespaces.generate(); - final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); - baseConfig = getStaticConfig(); - database = createDatabase(); - removeOldNamespaces(); - getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); - final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;", - USER_WITHOUT_CREDS, baseConfig.get("password").asText()); - getDatabase().query(ctx -> ctx.execute(createUser)); - final JsonNode configForSchema = Jsons.clone(baseConfig); - ((ObjectNode) configForSchema).put("schema", schemaName); - TEST_SCHEMAS.add(schemaName); - config = configForSchema; - testDestinationEnv = testEnv; - } - - private void removeOldNamespaces() { - final List schemas; - try { - schemas = getDatabase().query(ctx -> ctx.fetch("SELECT schema_name FROM information_schema.schemata;")) - .stream() - .map(record -> record.get("schema_name").toString()) - .toList(); - } catch (final SQLException e) { - // if we can't fetch the schemas, just return. - return; - } - - int schemasDeletedCount = 0; - for (final String schema : schemas) { - if (TestingNamespaces.isOlderThan2Days(schema)) { - try { - getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); - schemasDeletedCount++; - } catch (final SQLException e) { - LOGGER.error("Failed to delete old dataset: {}", schema, e); - } - } - } - LOGGER.info("Deleted {} old schemas.", schemasDeletedCount); - } - - @Override - protected void tearDown(final TestDestinationEnv testEnv) throws Exception { - System.out.println("TEARING_DOWN_SCHEMAS: " + getTestSchemas()); - getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", config.get("schema").asText()))); - for (final String schema : getTestSchemas()) { - getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); - } - getDatabase().query(ctx -> ctx.execute(String.format("drop user if exists %s;", USER_WITHOUT_CREDS))); - RedshiftConnectionHandler.close(connection); - } - - protected Database createDatabase() { - connection = ConnectionFactory.create(baseConfig.get(JdbcUtils.USERNAME_KEY).asText(), - baseConfig.get(JdbcUtils.PASSWORD_KEY).asText(), - RedshiftInsertDestination.SSL_JDBC_PARAMETERS, - String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), - baseConfig.get(JdbcUtils.HOST_KEY).asText(), - baseConfig.get(JdbcUtils.PORT_KEY).asInt(), - baseConfig.get(JdbcUtils.DATABASE_KEY).asText())); - - return new Database(DSL.using(connection)); - } - - protected Database getDatabase() { - return database; - } - - @Override - protected int getMaxRecordValueLimit() { - return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; - } - -} diff --git 
a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java deleted file mode 100644 index 59e9130af79f..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftInsertDestinationAcceptanceTest.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.json.Jsons; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import org.junit.jupiter.api.Disabled; - -/** - * Integration test testing the {@link RedshiftInsertDestination}. - */ -@Disabled -public class RedshiftInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest { - - public JsonNode getStaticConfig() throws IOException { - return Jsons.deserialize(Files.readString(Path.of("secrets/config.json"))); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java index 0732e6041454..a8b359e791d9 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftS3StagingInsertDestinationAcceptanceTest.java @@ -4,21 +4,250 @@ package io.airbyte.integrations.destination.redshift; +import com.amazon.redshift.util.RedshiftTimestamp; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.cdk.db.Database; +import io.airbyte.cdk.db.factory.ConnectionFactory; +import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; +import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces; +import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import java.nio.file.Path; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.jooq.impl.DSL; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; /** - * Integration test testing {@link RedshiftStagingS3Destination}. The default Redshift integration - * test credentials contain S3 credentials - this automatically causes COPY to be selected. + * Integration test testing {@link RedshiftDestination}. The default Redshift integration test + * credentials contain S3 credentials - this automatically causes COPY to be selected. */ +// these tests are not yet thread-safe, unlike the DV2 tests. +@Execution(ExecutionMode.SAME_THREAD) @Disabled -public class RedshiftS3StagingInsertDestinationAcceptanceTest extends RedshiftDestinationAcceptanceTest { +public class RedshiftS3StagingInsertDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftS3StagingInsertDestinationAcceptanceTest.class); + + // config from which to create / delete schemas. + private JsonNode baseConfig; + // config which refers to the schema that the test is being run in. + protected JsonNode config; + private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); + private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); + + private Database database; + private Connection connection; + protected TestDestinationEnv testDestinationEnv; + + @Override + protected String getImageName() { + return "airbyte/destination-redshift:dev"; + } + + @Override + protected JsonNode getConfig() { + return config; + } public JsonNode getStaticConfig() { return Jsons.deserialize(IOs.readFile(Path.of("secrets/config_staging.json"))); } + @Override + protected JsonNode getFailCheckConfig() { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("password", "wrong password"); + return invalidConfig; + } + + @Override + protected TestDataComparator getTestDataComparator() { + return new RedshiftTestDataComparator(); + } + + @Override + protected boolean supportBasicDataTypeTest() { + return true; + } + + @Override + protected boolean supportArrayDataTypeTest() { + return true; + } + + @Override + protected boolean supportObjectDataTypeTest() { + return true; + } + + @Override + protected boolean supportIncrementalSchemaChanges() { + return true; + } + + @Override + protected boolean supportsInDestinationNormalization() { + return true; + } + + @Override + protected List retrieveRecords(final TestDestinationEnv env, + final String streamName, + final String namespace, + final JsonNode streamSchema) + throws Exception { + return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + .stream() + .map(j -> j.get(JavaBaseConstants.COLUMN_NAME_DATA)) + .collect(Collectors.toList()); + } + + @Override + protected boolean implementsNamespaces() { + return true; + } + + @Override + protected List retrieveNormalizedRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace) + throws Exception { + String tableName = namingResolver.getIdentifier(streamName); + if (!tableName.startsWith("\"")) { + // Currently, Normalization always quote tables identifiers + tableName = "\"" + tableName + "\""; + } + return retrieveRecordsFromTable(tableName, namespace); + } + + private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { + return getDatabase().query( + ctx -> ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) + 
.stream() + .map(record -> getJsonFromRecord( + record, + value -> { + if (value instanceof final RedshiftTimestamp rts) { + // We can't just use rts.toInstant().toString(), because that will mangle historical + // dates (e.g. 1504-02-28...) because toInstant() just converts to epoch millis, + // which works _very badly_ for very old dates. + // Instead, convert to a string and then parse that string. + // We can't just rts.toString(), because that loses the timezone... + // so instead we use getPostgresqlString and parse that >.> + // Thanks, redshift. + return Optional.of( + ZonedDateTime.parse( + rts.getPostgresqlString(), + new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd HH:mm:ss") + .optionalStart() + .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 9, true) + .optionalEnd() + .appendPattern("X") + .toFormatter()) + .withZoneSameInstant(ZoneOffset.UTC) + .toString()); + } else { + return Optional.empty(); + } + })) + .collect(Collectors.toList())); + } + + // for each test we create a new schema in the database. run the test in there and then remove it. + @Override + protected void setup(final TestDestinationEnv testEnv, final HashSet TEST_SCHEMAS) throws Exception { + final String schemaName = TestingNamespaces.generate(); + final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); + baseConfig = getStaticConfig(); + database = createDatabase(); + removeOldNamespaces(); + getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); + final String createUser = String.format("create user %s with password '%s' SESSION TIMEOUT 60;", + USER_WITHOUT_CREDS, baseConfig.get("password").asText()); + getDatabase().query(ctx -> ctx.execute(createUser)); + final JsonNode configForSchema = Jsons.clone(baseConfig); + ((ObjectNode) configForSchema).put("schema", schemaName); + TEST_SCHEMAS.add(schemaName); + config = configForSchema; + testDestinationEnv = testEnv; + } + + private void removeOldNamespaces() { + final List schemas; + try { + schemas = getDatabase().query(ctx -> ctx.fetch("SELECT schema_name FROM information_schema.schemata;")) + .stream() + .map(record -> record.get("schema_name").toString()) + .toList(); + } catch (final SQLException e) { + // if we can't fetch the schemas, just return.
+ return; + } + + int schemasDeletedCount = 0; + for (final String schema : schemas) { + if (TestingNamespaces.isOlderThan2Days(schema)) { + try { + getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); + schemasDeletedCount++; + } catch (final SQLException e) { + LOGGER.error("Failed to delete old dataset: {}", schema, e); + } + } + } + LOGGER.info("Deleted {} old schemas.", schemasDeletedCount); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) throws Exception { + System.out.println("TEARING_DOWN_SCHEMAS: " + getTestSchemas()); + getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", config.get("schema").asText()))); + for (final String schema : getTestSchemas()) { + getDatabase().query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE", schema))); + } + getDatabase().query(ctx -> ctx.execute(String.format("drop user if exists %s;", USER_WITHOUT_CREDS))); + RedshiftConnectionHandler.close(connection); + } + + protected Database createDatabase() { + connection = ConnectionFactory.create(baseConfig.get(JdbcUtils.USERNAME_KEY).asText(), + baseConfig.get(JdbcUtils.PASSWORD_KEY).asText(), + RedshiftDestination.Companion.getSSL_JDBC_PARAMETERS(), + String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), + baseConfig.get(JdbcUtils.HOST_KEY).asText(), + baseConfig.get(JdbcUtils.PORT_KEY).asInt(), + baseConfig.get(JdbcUtils.DATABASE_KEY).asText())); + + return new Database(DSL.using(connection)); + } + + protected Database getDatabase() { + return database; + } + + @Override + protected int getMaxRecordValueLimit() { + return RedshiftSqlOperations.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; + } + } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftS3StagingDestinationAcceptanceTest.java similarity index 51% rename from airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java rename to airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftS3StagingDestinationAcceptanceTest.java index 4c2da0a04ce7..4cb605df2e27 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshKeyRedshiftS3StagingDestinationAcceptanceTest.java @@ -4,12 +4,7 @@ package io.airbyte.integrations.destination.redshift; -import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import java.io.IOException; -import java.nio.file.Path; import org.junit.jupiter.api.Disabled; /* @@ -17,17 +12,11 @@ * Insert mechanism for upload of data and "key" authentication for the SSH bastion configuration. 
*/ @Disabled -public class SshKeyRedshiftInsertDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { +public class SshKeyRedshiftS3StagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { @Override public TunnelMethod getTunnelMethod() { return TunnelMethod.SSH_KEY_AUTH; } - public JsonNode getStaticConfig() throws IOException { - final Path configPath = Path.of("secrets/config.json"); - final String configAsString = IOs.readFile(configPath); - return Jsons.deserialize(configAsString); - } - } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java index 72c992fdb183..4b56ca2e97d5 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshPasswordRedshiftStagingDestinationAcceptanceTest.java @@ -4,15 +4,12 @@ package io.airbyte.integrations.destination.redshift; -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import java.io.IOException; -import java.nio.file.Path; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH; + +import io.airbyte.cdk.integrations.base.ssh.SshTunnel; import org.junit.jupiter.api.Disabled; -/* +/** * SshPasswordRedshiftStagingDestinationAcceptanceTest runs basic Redshift Destination Tests using * the S3 Staging mechanism for upload of data and "password" authentication for the SSH bastion * configuration. 
@@ -21,15 +18,8 @@ public class SshPasswordRedshiftStagingDestinationAcceptanceTest extends SshRedshiftDestinationBaseAcceptanceTest { @Override - public TunnelMethod getTunnelMethod() { - return TunnelMethod.SSH_PASSWORD_AUTH; - } - - @Override - public JsonNode getStaticConfig() throws IOException { - final Path configPath = Path.of("secrets/config_staging.json"); - final String configAsString = IOs.readFile(configPath); - return Jsons.deserialize(configAsString); + public SshTunnel.TunnelMethod getTunnelMethod() { + return SSH_PASSWORD_AUTH; } } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java index 22ef33b5b098..325385f60233 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/SshRedshiftDestinationBaseAcceptanceTest.java @@ -22,11 +22,13 @@ import io.airbyte.cdk.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; import io.airbyte.cdk.integrations.standardtest.destination.TestingNamespaces; import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; +import io.airbyte.commons.io.IOs; import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import java.io.IOException; +import java.nio.file.Path; import java.sql.Connection; import java.util.HashSet; import java.util.List; @@ -90,7 +92,11 @@ public static Map deserializeToObjectMap(final JsonNode json) { return objectMapper.convertValue(json, new TypeReference<>() {}); } - public abstract JsonNode getStaticConfig() throws IOException; + public JsonNode getStaticConfig() throws IOException { + final Path configPath = Path.of("secrets/config_staging.json"); + final String configAsString = IOs.readFile(configPath); + return Jsons.deserialize(configAsString); + } @Override protected JsonNode getFailCheckConfig() { @@ -146,7 +152,7 @@ protected TestDataComparator getTestDataComparator() { private Database createDatabaseFromConfig(final JsonNode config) { connection = ConnectionFactory.create(config.get(JdbcUtils.USERNAME_KEY).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText(), - RedshiftInsertDestination.SSL_JDBC_PARAMETERS, + RedshiftDestination.Companion.getSSL_JDBC_PARAMETERS(), String.format(DatabaseDriver.REDSHIFT.getUrlFormatString(), config.get(JdbcUtils.HOST_KEY).asText(), config.get(JdbcUtils.PORT_KEY).asInt(), diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java index 729a1c90245c..1a5774b6fc4a 100644 --- 
a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.destination.redshift.typing_deduping; -import static io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer.*; +import static io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer.REDSHIFT_VARCHAR_MAX_BYTE_SIZE; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -12,7 +12,7 @@ import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; -import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination; +import io.airbyte.integrations.destination.redshift.RedshiftDestination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGeneratorIntegrationTest.RedshiftSourceOperations; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -41,7 +41,7 @@ protected String getImageName() { @Override protected DataSource getDataSource(final JsonNode config) { - return new RedshiftInsertDestination().getDataSource(config); + return new RedshiftDestination().getDataSource(config); } @Override diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java index ddcf82773bd3..cc0fbd74c904 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftSqlGeneratorIntegrationTest.java @@ -29,7 +29,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus; import io.airbyte.integrations.base.destination.typing_deduping.Sql; -import io.airbyte.integrations.destination.redshift.RedshiftInsertDestination; +import io.airbyte.integrations.destination.redshift.RedshiftDestination; import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer; import java.nio.file.Files; import java.nio.file.Path; @@ -138,7 +138,7 @@ public static void setupJdbcDatasource() throws Exception { databaseName = config.get(JdbcUtils.DATABASE_KEY).asText(); // TODO: Its sad to instantiate unneeded dependency to construct database and datsources. pull it to // static methods. 
- final RedshiftInsertDestination insertDestination = new RedshiftInsertDestination(); + final RedshiftDestination insertDestination = new RedshiftDestination(); dataSource = insertDestination.getDataSource(config); database = insertDestination.getDatabase(dataSource, new RedshiftSourceOperations()); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java deleted file mode 100644 index b7c78a4cec8e..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift.typing_deduping; - -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import java.nio.file.Path; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -public class RedshiftStandardInsertsRawSchemaOverrideDisableTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { - - @Override - protected ObjectNode getBaseConfig() { - return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config_raw_schema_override.json"))); - } - - @Override - protected String getRawSchema() { - return "overridden_raw_dataset"; - } - - @Override - protected boolean disableFinalTableComparison() { - return true; - } - - @Disabled - @Test - @Override - public void identicalNameSimultaneousSync() {} - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java deleted file mode 100644 index 7f99362777dd..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/typing_deduping/RedshiftStandardInsertsTypingDedupingTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.destination.redshift.typing_deduping; - -import static org.junit.jupiter.api.Assertions.*; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.io.IOs; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteMessage; -import io.airbyte.protocol.models.v0.AirbyteStream; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import org.junit.jupiter.api.Test; - -public class RedshiftStandardInsertsTypingDedupingTest extends AbstractRedshiftTypingDedupingTest { - - @Override - protected ObjectNode getBaseConfig() { - return (ObjectNode) Jsons.deserialize(IOs.readFile(Path.of("secrets/1s1t_config.json"))); - } - - @Test - public void testStandardInsertBatchSizeGtThan16Mb() throws Exception { - final String placeholderRecord = """ - {"type": "RECORD", - "record":{ - "emitted_at": 1000, - "data": { - "id1": 1, - "id2": 200, - "updated_at": "2000-01-01T00:00:00Z", - "_ab_cdc_deleted_at": null, - "name": "PLACE_HOLDER", - "address": {"city": "San Francisco", "state": "CA"}} - } - } - """; - final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) - .withStream(new AirbyteStream() - .withNamespace(getStreamNamespace()) - .withName(getStreamName()) - .withJsonSchema(getSchema())))); - List messages = new ArrayList<>(); - final int numberOfRecords = 1000; - for (int i = 0; i < numberOfRecords; ++i) { - // Stuff the record with 40Kb string, making the total record size to 41233 bytes - // Total sync generates ~39MB in 1000 records. - // Standard insert should not fail and chunk it into smaller inserts < 16MB statement length - final AirbyteMessage placeHolderMessage = Jsons.deserialize(placeholderRecord, AirbyteMessage.class); - placeHolderMessage.getRecord().setNamespace(getStreamNamespace()); - placeHolderMessage.getRecord().setStream(getStreamName()); - ((ObjectNode) placeHolderMessage.getRecord().getData()).put("id1", i); - ((ObjectNode) placeHolderMessage.getRecord().getData()).put("id2", 200 + i); - ((ObjectNode) placeHolderMessage.getRecord().getData()).put("name", generateRandomString(40 * 1024)); - messages.add(placeHolderMessage); - } - runSync(catalog, messages); - // we just need to iterate over final tables to verify the count and confirm they are inserted - // properly. 
- List finalTableResults = dumpFinalTableRecords(getStreamNamespace(), getStreamName()); - assertEquals(1000, finalTableResults.size()); - // getJsons query doesn't have order by clause, so using sum of n-numbers math to assert all IDs are - // inserted - int id1sum = 0; - int id2sum = 0; - int id1ExpectedSum = ((numberOfRecords - 1) * (numberOfRecords)) / 2; // n(n+1)/2 - int id2ExpectedSum = (200 * numberOfRecords) + id1ExpectedSum; // 200*n + id1Sum - for (JsonNode record : finalTableResults) { - id1sum += record.get("id1").asInt(); - id2sum += record.get("id2").asInt(); - } - assertEquals(id1ExpectedSum, id1sum); - assertEquals(id2ExpectedSum, id2sum); - } - -} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java b/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java deleted file mode 100644 index 8467d6a65fb6..000000000000 --- a/airbyte-integrations/connectors/destination-redshift/src/test/java/io/airbyte/integrations/destination/redshift/RedshiftDestinationTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.redshift; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.airbyte.commons.jackson.MoreMappers; -import io.airbyte.integrations.destination.redshift.RedshiftDestination.DestinationType; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -@DisplayName("RedshiftDestination") -public class RedshiftDestinationTest { - - private static final ObjectMapper mapper = MoreMappers.initMapper(); - - @Test - @DisplayName("When not given S3 credentials should use INSERT") - public void useStandardInsert() { - final var standardInsertConfigStub = mapper.createObjectNode(); - standardInsertConfigStub.put("method", "Standard"); - final var uploadingMethodStub = mapper.createObjectNode(); - uploadingMethodStub.set("uploading_method", standardInsertConfigStub); - assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(uploadingMethodStub)); - } - - @Test - @DisplayName("When given standard backward compatibility test") - public void useStandardInsertBackwardCompatibility() { - final var standardInsertConfigStub = mapper.createObjectNode(); - assertEquals(DestinationType.STANDARD, RedshiftDestination.determineUploadMode(standardInsertConfigStub)); - } - - @Test - @DisplayName("When given S3 credentials should use COPY") - public void useS3Staging() { - final var s3StagingStub = mapper.createObjectNode(); - final var uploadingMethodStub = mapper.createObjectNode(); - s3StagingStub.put("s3_bucket_name", "fake-bucket"); - s3StagingStub.put("s3_bucket_region", "fake-region"); - s3StagingStub.put("access_key_id", "test"); - s3StagingStub.put("secret_access_key", "test key"); - s3StagingStub.put("method", "S3 Staging"); - uploadingMethodStub.set("uploading_method", s3StagingStub); - assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(uploadingMethodStub)); - } - - @Test - @DisplayName("When given S3 backward compatibility test") - public void useS3StagingBackwardCompatibility() { - final var s3StagingStub = mapper.createObjectNode(); - s3StagingStub.put("s3_bucket_name", "fake-bucket"); - s3StagingStub.put("s3_bucket_region", "fake-region"); - 
s3StagingStub.put("access_key_id", "test"); - s3StagingStub.put("secret_access_key", "test key"); - assertEquals(DestinationType.COPY_S3, RedshiftDestination.determineUploadMode(s3StagingStub)); - } - -} diff --git a/docs/integrations/destinations/redshift-migrations.md b/docs/integrations/destinations/redshift-migrations.md index 7cd43c08cb65..046b666e9775 100644 --- a/docs/integrations/destinations/redshift-migrations.md +++ b/docs/integrations/destinations/redshift-migrations.md @@ -1,14 +1,26 @@ # Redshift Migration Guide +## Upgrading to 3.0.0 + +This version removes support for standard inserts. Although this loading method is easier to set up than S3 staging, it has two major disadvantages: + +* Standard inserts is significantly slower +* Standard inserts is significantly more expensive + +[Redshift's documentation](https://docs.aws.amazon.com/redshift/latest/dg/r_INSERT_30.html#r_INSERT_30_usage_notes) states: +> We strongly encourage you to use the COPY command to load large amounts of data. Using individual INSERT statements to populate a table might be prohibitively slow. + +See our [Redshift docs](https://docs.airbyte.com/integrations/destinations/redshift#for-copy-strategy) for more information on how to set up S3 staging. + ## Upgrading to 2.0.0 This version introduces [Destinations V2](/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2), which provides better error handling, incremental delivery of data for large syncs, and improved final table structures. To review the breaking changes, and how to upgrade, see [here](/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading). These changes will likely require updates to downstream dbt / SQL models, which we walk through [here](/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations). Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. You can manually sync existing connections prior to the next scheduled sync to start the upgrade early. Worthy of specific mention, this version includes: -- Per-record error handling -- Clearer table structure -- Removal of sub-tables for nested properties -- Removal of SCD tables +* Per-record error handling +* Clearer table structure +* Removal of sub-tables for nested properties +* Removal of SCD tables Learn more about what's new in Destinations V2 [here](/using-airbyte/core-concepts/typing-deduping). diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index bfcc29fbf941..a03d69fb3c5c 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -15,7 +15,7 @@ This Redshift destination connector has two replication strategies: [here](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-install.html). **Not recommended for production workloads as this does not scale well**. -For INSERT strategy: +### For INSERT strategy: - **Host** - **Port** @@ -35,7 +35,7 @@ For INSERT strategy: Airbyte automatically picks an approach depending on the given configuration - if S3 configuration is present, Airbyte will use the COPY strategy and vice versa. -For COPY strategy: +### For COPY strategy: - **S3 Bucket Name** - See [this](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) to @@ -244,6 +244,7 @@ Each stream will be output into its own raw table in Redshift. 
Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.0.0 | 2024-06-04 | [38886](https://github.com/airbytehq/airbyte/pull/38886) | Remove standard inserts mode | | 2.6.4 | 2024-05-31 | [38825](https://github.com/airbytehq/airbyte/pull/38825) | Adopt CDK 0.35.15 | | 2.6.3 | 2024-05-31 | [38803](https://github.com/airbytehq/airbyte/pull/38803) | Source auto-conversion to Kotlin | | 2.6.2 | 2024-05-14 | [38189](https://github.com/airbytehq/airbyte/pull/38189) | adding an option to DROP CASCADE on resets |
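For users moving off standard inserts, the sketch below shows roughly what an S3-staging configuration for this destination looks like after 3.0.0. It is a minimal, hypothetical example: the top-level JDBC fields and the `uploading_method` property names (`method: "S3 Staging"`, `s3_bucket_name`, `s3_bucket_region`, `access_key_id`, `secret_access_key`) are taken from the test stubs and docs touched in this change, every value is a placeholder, and the connector's published spec remains the authoritative reference for the exact schema.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class RedshiftS3StagingConfigExample {

  public static void main(final String[] args) {
    final ObjectMapper mapper = new ObjectMapper();

    // Standard Redshift JDBC connection settings (placeholder values).
    final ObjectNode config = mapper.createObjectNode();
    config.put("host", "example-cluster.abc123.us-east-1.redshift.amazonaws.com");
    config.put("port", 5439);
    config.put("database", "dev");
    config.put("schema", "public");
    config.put("username", "airbyte_user");
    config.put("password", "replace-me");

    // S3 staging block. As of 3.0.0 this is required, since the standard-inserts
    // path no longer exists. Property names mirror the stubs in the removed
    // RedshiftDestinationTest; check the connector spec for the current schema.
    final ObjectNode uploadingMethod = mapper.createObjectNode();
    uploadingMethod.put("method", "S3 Staging");
    uploadingMethod.put("s3_bucket_name", "my-airbyte-staging-bucket");
    uploadingMethod.put("s3_bucket_region", "us-east-1");
    uploadingMethod.put("access_key_id", "AKIAEXAMPLE");
    uploadingMethod.put("secret_access_key", "example-secret");
    config.set("uploading_method", uploadingMethod);

    // In the integration tests above, an equivalent JSON document is simply
    // read from secrets/config_staging.json.
    System.out.println(config.toPrettyString());
  }
}
```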