Skip to content

Commit

Permalink
new interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jun 7, 2024
1 parent f7a341e commit c8dfb6c
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,41 @@ import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag.getRawNamespaceOve
import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator
import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryption
import io.airbyte.cdk.integrations.destination.s3.AesCbcEnvelopeEncryptionBlobDecorator
import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig
import io.airbyte.cdk.integrations.destination.s3.EncryptionConfig.Companion.fromJson
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.NoEncryption
import io.airbyte.cdk.integrations.destination.s3.S3BaseChecks.attemptS3WriteAndDelete
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory.Companion.builder
import io.airbyte.cdk.integrations.destination.staging.operation.StagingStreamOperations
import io.airbyte.commons.exceptions.ConnectionErrorException
import io.airbyte.commons.json.Jsons.deserialize
import io.airbyte.commons.json.Jsons.emptyObject
import io.airbyte.commons.json.Jsons.jsonNode
import io.airbyte.commons.resources.MoreResources.readResource
import io.airbyte.integrations.base.destination.operation.DefaultFlush
import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
import io.airbyte.integrations.base.destination.typing_deduping.DefaultTyperDeduper
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.destination.redshift.constants.RedshiftDestinationConstants
import io.airbyte.integrations.destination.redshift.operation.RedshiftStagingStorageOperation
import io.airbyte.integrations.destination.redshift.operations.RedshiftS3StagingSqlOperations
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDV2Migration
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator
Expand All @@ -66,6 +71,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConnectorSpecification
import java.time.Duration
import java.util.Optional
import java.util.concurrent.Executors
import java.util.function.Consumer
import javax.sql.DataSource
import org.apache.commons.lang3.NotImplementedException
Expand Down Expand Up @@ -236,8 +242,14 @@ class RedshiftStagingS3Destination :
sqlGenerator: SqlGenerator,
destinationHandler: DestinationHandler<RedshiftState>
): List<Migration<RedshiftState>> {
return listOf<Migration<RedshiftState>>(
RedshiftRawTableAirbyteMetaMigration(database, databaseName)
return listOf(
RedshiftDV2Migration(
namingResolver,
database,
databaseName,
sqlGenerator as RedshiftSqlGenerator,
),
RedshiftRawTableAirbyteMetaMigration(database, databaseName),
)
}

Expand Down Expand Up @@ -285,7 +297,6 @@ class RedshiftStagingS3Destination :

val sqlGenerator = RedshiftSqlGenerator(namingResolver, config)
val parsedCatalog: ParsedCatalog
val typerDeduper: TyperDeduper
val database = getDatabase(getDataSource(config))
val databaseName = config[JdbcUtils.DATABASE_KEY].asText()
val catalogParser: CatalogParser
Expand All @@ -300,54 +311,59 @@ class RedshiftStagingS3Destination :
val redshiftDestinationHandler =
RedshiftDestinationHandler(databaseName, database, rawNamespace)
parsedCatalog = catalogParser.parseCatalog(catalog)
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
val v2TableMigrator = NoopV2TableMigrator()
val disableTypeDedupe =
config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false)
val redshiftMigrations: List<Migration<RedshiftState>> =
getMigrations(database, databaseName, sqlGenerator, redshiftDestinationHandler)
typerDeduper =
if (disableTypeDedupe) {
NoOpTyperDeduperWithV1V2Migrations(
sqlGenerator,
redshiftDestinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
redshiftMigrations
)
} else {
DefaultTyperDeduper(
sqlGenerator,
redshiftDestinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
redshiftMigrations
)
}

return builder(
outputRecordCollector,
database,
RedshiftS3StagingSqlOperations(
namingResolver,
s3Config.getS3Client(),
s3Config,
encryptionConfig
),
namingResolver,
config,
catalog,
isPurgeStagingData(s3Options),
typerDeduper,
parsedCatalog,
defaultNamespace,
JavaBaseConstants.DestinationColumns.V2_WITH_META
val s3StorageOperations =
S3StorageOperations(namingResolver, s3Config.getS3Client(), s3Config)
val keyEncryptingKey: ByteArray?
if (encryptionConfig is AesCbcEnvelopeEncryption) {
s3StorageOperations.addBlobDecorator(
AesCbcEnvelopeEncryptionBlobDecorator(encryptionConfig.key)
)
.setDataTransformer(getDataTransformer(parsedCatalog, defaultNamespace))
.build()
.createAsync()
keyEncryptingKey = encryptionConfig.key
} else {
keyEncryptingKey = null
}

val redshiftStagingStorageOperation = RedshiftStagingStorageOperation(
s3Config,
isPurgeStagingData(s3Options),
s3StorageOperations,
sqlGenerator,
redshiftDestinationHandler,
keyEncryptingKey,
)
val syncOperation = DefaultSyncOperation(
parsedCatalog,
redshiftDestinationHandler,
defaultNamespace,
{ initialStatus, disableTD ->
StagingStreamOperations(
redshiftStagingStorageOperation,
initialStatus,
FileUploadFormat.CSV,
JavaBaseConstants.DestinationColumns.V2_WITH_META,
disableTD
)
},
redshiftMigrations,
disableTypeDedupe,
)
return AsyncStreamConsumer(
outputRecordCollector,
onStart = {},
onClose = {_, streamSyncSummaries -> syncOperation.finalizeStreams(streamSyncSummaries)},
onFlush = DefaultFlush(OPTIMAL_FLUSH_BATCH_SIZE, syncOperation),
catalog,
BufferManager(bufferMemoryLimit),
Optional.ofNullable(defaultNamespace),
FlushFailure(),
Executors.newFixedThreadPool(5),
AirbyteMessageDeserializer(getDataTransformer(parsedCatalog, defaultNamespace)),
)
}

private fun isPurgeStagingData(config: JsonNode?): Boolean {
Expand All @@ -367,6 +383,9 @@ class RedshiftStagingS3Destination :
"com.amazon.redshift.ssl.NonValidatingFactory"
)

private const val OPTIMAL_FLUSH_BATCH_SIZE: Long = 50 * 1024 * 1024
private val bufferMemoryLimit: Long = (Runtime.getRuntime().maxMemory() * 0.5).toLong()

private fun sshWrappedDestination(): Destination {
return SshWrappedDestination(
RedshiftStagingS3Destination(),
Expand Down
Loading

0 comments on commit c8dfb6c

Please sign in to comment.