From 9697fa7161a4a19940afa27f4e746ca38f87b2ae Mon Sep 17 00:00:00 2001
From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com>
Date: Mon, 22 Jul 2024 11:16:17 -0700
Subject: [PATCH] cdk-java: remove (Serial)StagingConsumerFactory (#41950)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What

Removes the unused `SerialStagingConsumerFactory` and `StagingConsumerFactory` classes from the Java CDK.

## How

Deletes both factory classes outright. The only symbol still referenced elsewhere, `RANDOM_CONNECTION_ID`, moves into `SerialFlush`, its sole remaining caller.

## Review guide

Start with the small edit to `SerialFlush.kt`; the other two files are straight deletions.

## User Impact

None expected: this only removes internal CDK classes that no longer have callers.

## Can this PR be safely reverted and rolled back?
- [ ] YES 💚
- [ ] NO ❌
---
 .../destination/staging/SerialFlush.kt        |   6 +-
 .../staging/SerialStagingConsumerFactory.kt   | 181 ----------
 .../staging/StagingConsumerFactory.kt         | 320 ------------------
 3 files changed, 5 insertions(+), 502 deletions(-)
 delete mode 100644 airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialStagingConsumerFactory.kt
 delete mode 100644 airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt
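Notes for reviewers: the only surviving symbol is `RANDOM_CONNECTION_ID`, which moves from `SerialStagingConsumerFactory.Companion` into `SerialFlush`, its sole caller. As the deleted comment below explains, the random UUID keeps two concurrent syncs from mixing files in the staging area when they manipulate streams with similar names. A minimal sketch of the idea; the path layout here is hypothetical, since each connector's `StagingOperations.getStagingPath` defines the real one:

    import java.util.UUID

    // One fresh UUID per process: concurrent syncs writing the same stream
    // land under different prefixes, so their staged files cannot collide.
    val RANDOM_CONNECTION_ID: UUID = UUID.randomUUID()

    // Hypothetical path layout, for illustration only.
    fun stagingPath(namespace: String, streamName: String): String =
        "$namespace/$streamName/$RANDOM_CONNECTION_ID"

    fun main() {
        println(stagingPath("public", "users")) // e.g. public/users/6b1f4c2a-...
    }

The trade-off noted in the deleted comment still applies: a truly random id means staged files from a failed attempt cannot be reused on the next sync, which a stable connection id would allow.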
- */ -package io.airbyte.cdk.integrations.destination.staging - -import com.fasterxml.jackson.databind.JsonNode -import com.google.common.base.Preconditions -import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer -import io.airbyte.cdk.integrations.destination.NamingConventionTransformer -import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer -import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig -import io.airbyte.cdk.integrations.destination.record_buffer.BufferCreateFunction -import io.airbyte.cdk.integrations.destination.record_buffer.SerializedBufferingStrategy -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper -import io.airbyte.protocol.models.v0.AirbyteMessage -import io.airbyte.protocol.models.v0.AirbyteStream -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream -import io.github.oshai.kotlinlogging.KotlinLogging -import java.time.Instant -import java.util.UUID -import java.util.function.Consumer -import java.util.function.Function - -private val LOGGER = KotlinLogging.logger {} -/** - * Uses both Factory and Consumer design pattern to create a single point of creation for consuming - * [AirbyteMessage] for processing - */ -open class SerialStagingConsumerFactory { - fun create( - outputRecordCollector: Consumer, - database: JdbcDatabase, - stagingOperations: StagingOperations, - namingResolver: NamingConventionTransformer, - onCreateBuffer: BufferCreateFunction, - config: JsonNode, - catalog: ConfiguredAirbyteCatalog, - purgeStagingData: Boolean, - typerDeduper: TyperDeduper, - parsedCatalog: ParsedCatalog, - defaultNamespace: String?, - useDestinationsV2Columns: Boolean - ): AirbyteMessageConsumer { - val writeConfigs = - createWriteConfigs( - namingResolver, - config, - catalog, - parsedCatalog, - useDestinationsV2Columns - ) - return BufferedStreamConsumer( - outputRecordCollector, - GeneralStagingFunctions.onStartFunction( - database, - stagingOperations, - writeConfigs, - typerDeduper - ), - SerializedBufferingStrategy( - onCreateBuffer, - catalog, - SerialFlush.function( - database, - stagingOperations, - writeConfigs, - catalog, - ) - ), - GeneralStagingFunctions.onCloseFunction( - database, - stagingOperations, - writeConfigs, - purgeStagingData, - typerDeduper - ), - catalog, - { data: JsonNode? -> stagingOperations.isValidData(data) }, - defaultNamespace - ) - } - - companion object { - - // using a random string here as a placeholder for the moment. - // This would avoid mixing data in the staging area between different syncs (especially if - // they - // manipulate streams with similar names) - // if we replaced the random connection id by the actual connection_id, we'd gain the - // opportunity to - // leverage data that was uploaded to stage - // in a previous attempt but failed to load to the warehouse for some reason (interrupted?) - // instead. - // This would also allow other programs/scripts - // to load (or reload backups?) in the connection's staging area to be loaded at the next - // sync. - private val SYNC_DATETIME: Instant = Instant.now() - val RANDOM_CONNECTION_ID: UUID = UUID.randomUUID() - - /** - * Creates a list of all [WriteConfig] for each stream within a [ConfiguredAirbyteCatalog]. 
-         * Each write config represents the configuration settings for writing to a destination
-         * connector
-         *
-         * @param namingResolver [NamingConventionTransformer] used to transform names that are
-         * acceptable by each destination connector
-         * @param config destination connector configuration parameters
-         * @param catalog [ConfiguredAirbyteCatalog] collection of configured
-         * [ConfiguredAirbyteStream]
-         * @return list of all write configs for each stream in a [ConfiguredAirbyteCatalog]
-         */
-        private fun createWriteConfigs(
-            namingResolver: NamingConventionTransformer,
-            config: JsonNode,
-            catalog: ConfiguredAirbyteCatalog,
-            parsedCatalog: ParsedCatalog,
-            useDestinationsV2Columns: Boolean
-        ): List<WriteConfig> {
-            return catalog.streams.map {
-                toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns)
-                    .apply(it)
-            }
-        }
-
-        private fun toWriteConfig(
-            namingResolver: NamingConventionTransformer,
-            config: JsonNode,
-            parsedCatalog: ParsedCatalog,
-            useDestinationsV2Columns: Boolean
-        ): Function<ConfiguredAirbyteStream, WriteConfig> {
-            return Function { stream: ConfiguredAirbyteStream ->
-                Preconditions.checkNotNull(
-                    stream.destinationSyncMode,
-                    "Undefined destination sync mode"
-                )
-                val abStream = stream.stream
-                val streamName = abStream.name
-
-                val outputSchema: String
-                val tableName: String?
-                if (useDestinationsV2Columns) {
-                    val streamId = parsedCatalog.getStream(abStream.namespace, streamName).id
-                    outputSchema = streamId.rawNamespace
-                    tableName = streamId.rawName
-                } else {
-                    outputSchema =
-                        getOutputSchema(abStream, config["schema"].asText(), namingResolver)
-                    tableName = @Suppress("deprecation") namingResolver.getRawTableName(streamName)
-                }
-                val tmpTableName =
-                    @Suppress("deprecation") namingResolver.getTmpTableName(streamName)
-                val syncMode = stream.destinationSyncMode
-
-                val writeConfig =
-                    WriteConfig(
-                        streamName,
-                        abStream.namespace,
-                        outputSchema,
-                        tmpTableName,
-                        tableName,
-                        syncMode,
-                        SYNC_DATETIME
-                    )
-                LOGGER.info { "Write config: $writeConfig" }
-                writeConfig
-            }
-        }
-
-        private fun getOutputSchema(
-            stream: AirbyteStream,
-            defaultDestSchema: String,
-            namingResolver: NamingConventionTransformer
-        ): String {
-            return if (stream.namespace != null) namingResolver.getNamespace(stream.namespace)
-            else namingResolver.getNamespace(defaultDestSchema)
-        }
-    }
-}
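Notes for reviewers: nothing in-repo still calls the serial factory. For reference while reviewing, this is roughly how a destination's serial path used to wire it up. A sketch only, compilable solely against a CDK version that still ships the class, with every argument supplied by the connector's own bootstrap code:

    import com.fasterxml.jackson.databind.JsonNode
    import io.airbyte.cdk.db.jdbc.JdbcDatabase
    import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer
    import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
    import io.airbyte.cdk.integrations.destination.record_buffer.BufferCreateFunction
    import io.airbyte.cdk.integrations.destination.staging.SerialStagingConsumerFactory
    import io.airbyte.cdk.integrations.destination.staging.StagingOperations
    import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
    import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
    import io.airbyte.protocol.models.v0.AirbyteMessage
    import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
    import java.util.function.Consumer

    // Sketch of the old serial wiring; parameter values come from the connector.
    fun buildSerialConsumer(
        outputRecordCollector: Consumer<AirbyteMessage>,
        database: JdbcDatabase,
        stagingOperations: StagingOperations,
        namingResolver: NamingConventionTransformer,
        onCreateBuffer: BufferCreateFunction,
        config: JsonNode,
        catalog: ConfiguredAirbyteCatalog,
        typerDeduper: TyperDeduper,
        parsedCatalog: ParsedCatalog,
    ): AirbyteMessageConsumer =
        SerialStagingConsumerFactory()
            .create(
                outputRecordCollector,
                database,
                stagingOperations,
                namingResolver,
                onCreateBuffer,
                config,
                catalog,
                true, // purgeStagingData: delete staged files after a successful load
                typerDeduper,
                parsedCatalog,
                null, // defaultNamespace
                false, // useDestinationsV2Columns: legacy raw-table columns
            )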
- */ -package io.airbyte.cdk.integrations.destination.staging - -import com.fasterxml.jackson.databind.JsonNode -import com.google.common.base.Preconditions -import io.airbyte.cdk.db.jdbc.JdbcDatabase -import io.airbyte.cdk.integrations.base.JavaBaseConstants -import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer -import io.airbyte.cdk.integrations.destination.NamingConventionTransformer -import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer -import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager -import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer -import io.airbyte.cdk.integrations.destination.async.deser.IdentityDataTransformer -import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer -import io.airbyte.cdk.integrations.destination.async.state.FlushFailure -import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig -import io.airbyte.commons.exceptions.ConfigErrorException -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog -import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper -import io.airbyte.protocol.models.v0.* -import io.airbyte.protocol.models.v0.AirbyteMessage -import io.airbyte.protocol.models.v0.AirbyteStream -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream -import io.airbyte.protocol.models.v0.StreamDescriptor -import io.github.oshai.kotlinlogging.KotlinLogging -import java.time.Instant -import java.util.Optional -import java.util.concurrent.Executors -import java.util.function.Consumer -import java.util.function.Function - -private val LOGGER = KotlinLogging.logger {} -/** - * Uses both Factory and Consumer design pattern to create a single point of creation for consuming - * [AirbyteMessage] for processing - */ -class StagingConsumerFactory -private constructor( - private val outputRecordCollector: Consumer?, - private val database: JdbcDatabase?, - private val stagingOperations: StagingOperations?, - private val namingResolver: NamingConventionTransformer?, - private val config: JsonNode?, - private val catalog: ConfiguredAirbyteCatalog?, - private val purgeStagingData: Boolean, - private val typerDeduper: TyperDeduper?, - private val parsedCatalog: ParsedCatalog?, - private val defaultNamespace: String, - private val destinationColumns: JavaBaseConstants.DestinationColumns, - // Optional fields - private val bufferMemoryLimit: Optional, - private val optimalBatchSizeBytes: Long, - private val dataTransformer: StreamAwareDataTransformer -) : SerialStagingConsumerFactory() { - class Builder { - // Required (?) fields - // (TODO which of these are _actually_ required, and which have we just coincidentally - // always - // provided?) - var outputRecordCollector: Consumer? = null - var database: JdbcDatabase? = null - var stagingOperations: StagingOperations? = null - var namingResolver: NamingConventionTransformer? = null - var config: JsonNode? = null - var catalog: ConfiguredAirbyteCatalog? = null - var purgeStagingData: Boolean = false - var typerDeduper: TyperDeduper? = null - var parsedCatalog: ParsedCatalog? = null - var defaultNamespace: String? 
-        var destinationColumns: JavaBaseConstants.DestinationColumns =
-            JavaBaseConstants.DestinationColumns.LEGACY
-
-        // Optional fields
-        private var bufferMemoryLimit = Optional.empty<Long>()
-        private var optimalBatchSizeBytes = (50 * 1024 * 1024).toLong()
-
-        private var dataTransformer: StreamAwareDataTransformer? = null
-
-        fun setBufferMemoryLimit(bufferMemoryLimit: Optional<Long>): Builder {
-            this.bufferMemoryLimit = bufferMemoryLimit
-            return this
-        }
-
-        fun setOptimalBatchSizeBytes(optimalBatchSizeBytes: Long): Builder {
-            this.optimalBatchSizeBytes = optimalBatchSizeBytes
-            return this
-        }
-
-        fun setDataTransformer(dataTransformer: StreamAwareDataTransformer?): Builder {
-            this.dataTransformer = dataTransformer
-            return this
-        }
-
-        fun build(): StagingConsumerFactory {
-            return StagingConsumerFactory(
-                outputRecordCollector,
-                database,
-                stagingOperations,
-                namingResolver,
-                config,
-                catalog,
-                purgeStagingData,
-                typerDeduper,
-                parsedCatalog,
-                // If we don't set a default namespace, throw. This is required for staging
-                // destinations.
-                defaultNamespace!!,
-                destinationColumns,
-                bufferMemoryLimit,
-                optimalBatchSizeBytes,
-                (if (dataTransformer != null) dataTransformer else IdentityDataTransformer())!!
-            )
-        }
-    }
-
-    fun createAsync(): SerializedAirbyteMessageConsumer {
-        val typerDeduper = this.typerDeduper!!
-        val stagingOperations = this.stagingOperations!!
-
-        val writeConfigs: List<WriteConfig> =
-            createWriteConfigs(namingResolver, config, catalog, parsedCatalog, destinationColumns)
-        val streamDescToWriteConfig: Map<StreamDescriptor, WriteConfig> =
-            streamDescToWriteConfig(writeConfigs)
-        val flusher =
-            AsyncFlush(
-                streamDescToWriteConfig,
-                stagingOperations,
-                database,
-                catalog,
-                optimalBatchSizeBytes,
-                destinationColumns
-            )
-        return AsyncStreamConsumer(
-            outputRecordCollector!!,
-            GeneralStagingFunctions.onStartFunction(
-                database!!,
-                stagingOperations,
-                writeConfigs,
-                typerDeduper
-            ),
-            GeneralStagingFunctions.onCloseFunction(
-                database,
-                stagingOperations,
-                writeConfigs,
-                purgeStagingData,
-                typerDeduper
-            ),
-            flusher,
-            catalog!!,
-            BufferManager(defaultNamespace, getMemoryLimit(bufferMemoryLimit)),
-            FlushFailure(),
-            Executors.newFixedThreadPool(5),
-            AirbyteMessageDeserializer(dataTransformer),
-        )
-    }
-
-    companion object {
-
-        private val SYNC_DATETIME: Instant = Instant.now()
-
-        @JvmStatic
-        fun builder(
-            outputRecordCollector: Consumer<AirbyteMessage>,
-            database: JdbcDatabase?,
-            stagingOperations: StagingOperations,
-            namingResolver: NamingConventionTransformer?,
-            config: JsonNode?,
-            catalog: ConfiguredAirbyteCatalog,
-            purgeStagingData: Boolean,
-            typerDeduper: TyperDeduper,
-            parsedCatalog: ParsedCatalog?,
-            defaultNamespace: String?,
-            destinationColumns: JavaBaseConstants.DestinationColumns
-        ): Builder {
-            val builder = Builder()
-            builder.outputRecordCollector = outputRecordCollector
-            builder.database = database
-            builder.stagingOperations = stagingOperations
-            builder.namingResolver = namingResolver
-            builder.config = config
-            builder.catalog = catalog
-            builder.purgeStagingData = purgeStagingData
-            builder.typerDeduper = typerDeduper
-            builder.parsedCatalog = parsedCatalog
-            builder.defaultNamespace = defaultNamespace
-            builder.destinationColumns = destinationColumns
-            return builder
-        }
-
-        private fun getMemoryLimit(bufferMemoryLimit: Optional<Long>): Long {
-            return bufferMemoryLimit.orElse(
-                (Runtime.getRuntime().maxMemory() * BufferManager.MEMORY_LIMIT_RATIO).toLong()
-            )
-        }
-
-        private fun streamDescToWriteConfig(
-            writeConfigs: List<WriteConfig>
-        ): Map<StreamDescriptor, WriteConfig> {
-            val conflictingStreams: MutableSet<WriteConfig> = HashSet()
-            val streamDescToWriteConfig: MutableMap<StreamDescriptor, WriteConfig> =
-                HashMap()
-            for (config in writeConfigs) {
-                val streamIdentifier = toStreamDescriptor(config)
-                if (streamDescToWriteConfig.containsKey(streamIdentifier)) {
-                    conflictingStreams.add(config)
-                    val existingConfig: WriteConfig =
-                        streamDescToWriteConfig.getValue(streamIdentifier)
-                    // The first conflicting stream won't have any problems, so we need to
-                    // explicitly add it here.
-                    conflictingStreams.add(existingConfig)
-                } else {
-                    streamDescToWriteConfig[streamIdentifier] = config
-                }
-            }
-            if (conflictingStreams.isNotEmpty()) {
-                var affectedStreamsAsString =
-                    conflictingStreams.joinToString(", ") { config: WriteConfig ->
-                        config.namespace + "." + config.streamName
-                    }
-                val message =
-                    "You are trying to write multiple streams to the same table. Consider switching to a custom namespace format using " +
-                        "\${SOURCE_NAMESPACE}, or moving one of them into a separate connection with a different stream prefix. " +
-                        "Affected streams: $affectedStreamsAsString"
-                throw ConfigErrorException(message)
-            }
-            return streamDescToWriteConfig
-        }
-
-        private fun toStreamDescriptor(config: WriteConfig): StreamDescriptor {
-            return StreamDescriptor().withName(config.streamName).withNamespace(config.namespace)
-        }
-
-        /**
-         * Creates a list of all [WriteConfig] for each stream within a [ConfiguredAirbyteCatalog].
-         * Each write config represents the configuration settings for writing to a destination
-         * connector
-         *
-         * @param namingResolver [NamingConventionTransformer] used to transform names that are
-         * acceptable by each destination connector
-         * @param config destination connector configuration parameters
-         * @param catalog [ConfiguredAirbyteCatalog] collection of configured
-         * [ConfiguredAirbyteStream]
-         * @return list of all write configs for each stream in a [ConfiguredAirbyteCatalog]
-         */
-        private fun createWriteConfigs(
-            namingResolver: NamingConventionTransformer?,
-            config: JsonNode?,
-            catalog: ConfiguredAirbyteCatalog?,
-            parsedCatalog: ParsedCatalog?,
-            destinationColumns: JavaBaseConstants.DestinationColumns
-        ): List<WriteConfig> {
-            return catalog!!.streams.map {
-                toWriteConfig(namingResolver, config, parsedCatalog, destinationColumns).apply(it)
-            }
-        }
-
-        private fun toWriteConfig(
-            namingResolver: NamingConventionTransformer?,
-            config: JsonNode?,
-            parsedCatalog: ParsedCatalog?,
-            destinationColumns: JavaBaseConstants.DestinationColumns
-        ): Function<ConfiguredAirbyteStream, WriteConfig> {
-            return Function { stream: ConfiguredAirbyteStream ->
-                Preconditions.checkNotNull(
-                    stream.destinationSyncMode,
-                    "Undefined destination sync mode"
-                )
-                val abStream = stream.stream
-                val streamName = abStream.name
-
-                val outputSchema: String
-                val tableName: String
-                when (destinationColumns) {
-                    JavaBaseConstants.DestinationColumns.V2_WITH_META,
-                    JavaBaseConstants.DestinationColumns.V2_WITHOUT_META,
-                    JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> {
-                        val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
-                        outputSchema = streamId.rawNamespace
-                        tableName = streamId.rawName
-                    }
-                    JavaBaseConstants.DestinationColumns.LEGACY -> {
-                        outputSchema =
-                            getOutputSchema(abStream, config!!["schema"].asText(), namingResolver)
-                        tableName =
-                            @Suppress("deprecation") namingResolver!!.getRawTableName(streamName)
-                    }
-                }
-                val tmpTableName =
-                    @Suppress("deprecation") namingResolver!!.getTmpTableName(streamName)
-                val syncMode = stream.destinationSyncMode
-
-                val writeConfig =
-                    WriteConfig(
-                        streamName,
-                        abStream.namespace,
-                        outputSchema,
-                        tmpTableName,
-                        tableName,
-                        syncMode,
-                        SYNC_DATETIME
-                    )
-                LOGGER.info { "Write config: $writeConfig" }
-                writeConfig
-            }
-        }
-
-        private fun getOutputSchema(
-            stream: AirbyteStream,
-            defaultDestSchema: String,
-            namingResolver: NamingConventionTransformer?
-        ): String {
-            return if (stream.namespace != null) namingResolver!!.getNamespace(stream.namespace)
-            else namingResolver!!.getNamespace(defaultDestSchema)
-        }
-    }
-}
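Notes for reviewers: on the async side, connectors reached this class through `builder(...).build().createAsync()`. A sketch of that call shape, again only compilable against a CDK version that still ships the class, with all inputs coming from the connector's own wiring:

    import com.fasterxml.jackson.databind.JsonNode
    import io.airbyte.cdk.db.jdbc.JdbcDatabase
    import io.airbyte.cdk.integrations.base.JavaBaseConstants
    import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
    import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
    import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory
    import io.airbyte.cdk.integrations.destination.staging.StagingOperations
    import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
    import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper
    import io.airbyte.protocol.models.v0.AirbyteMessage
    import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
    import java.util.function.Consumer

    // Sketch of the old async wiring; parameter values come from the connector.
    fun buildAsyncConsumer(
        outputRecordCollector: Consumer<AirbyteMessage>,
        database: JdbcDatabase,
        stagingOperations: StagingOperations,
        namingResolver: NamingConventionTransformer,
        config: JsonNode,
        catalog: ConfiguredAirbyteCatalog,
        typerDeduper: TyperDeduper,
        parsedCatalog: ParsedCatalog,
        defaultNamespace: String,
    ): SerializedAirbyteMessageConsumer =
        StagingConsumerFactory.builder(
                outputRecordCollector,
                database,
                stagingOperations,
                namingResolver,
                config,
                catalog,
                true, // purgeStagingData
                typerDeduper,
                parsedCatalog,
                defaultNamespace,
                JavaBaseConstants.DestinationColumns.V2_WITH_META,
            )
            .setOptimalBatchSizeBytes(100L * 1024 * 1024) // override the 50 MiB default
            .build()
            .createAsync()

If no explicit buffer memory limit is set, `getMemoryLimit` falls back to `Runtime.getRuntime().maxMemory() * BufferManager.MEMORY_LIMIT_RATIO`, so the default scales with the connector container's heap.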
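The most user-visible behavior in the deleted code is the collision check in `streamDescToWriteConfig`: two configured streams resolving to the same (namespace, name) descriptor abort the sync with a `ConfigErrorException` rather than silently interleaving rows in one raw table. A self-contained sketch of that guard, using simplified stand-in types and a plain `IllegalArgumentException` in place of the CDK's `ConfigErrorException`:

    // Simplified stand-in for WriteConfig, for illustration; sourceTag just
    // distinguishes two configs that map to the same (namespace, name) table.
    data class StreamConfig(val namespace: String?, val name: String, val sourceTag: String)

    fun checkNoTableCollisions(configs: List<StreamConfig>) {
        val byTable = HashMap<Pair<String?, String>, StreamConfig>()
        val conflicting = LinkedHashSet<StreamConfig>()
        for (c in configs) {
            val key = c.namespace to c.name
            val existing = byTable.putIfAbsent(key, c)
            if (existing != null) {
                // The first occupant only becomes a conflict now, so record both
                // sides, mirroring the deleted loop.
                conflicting += existing
                conflicting += c
            }
        }
        if (conflicting.isNotEmpty()) {
            val affected = conflicting.joinToString(", ") { "${it.namespace}.${it.name}" }
            // The real code throws the CDK's ConfigErrorException here.
            throw IllegalArgumentException(
                "You are trying to write multiple streams to the same table. Affected streams: $affected"
            )
        }
    }

    fun main() {
        // Same (namespace, name) from two sources: fails fast, listing both streams.
        checkNoTableCollisions(
            listOf(
                StreamConfig("public", "users", "source-postgres"),
                StreamConfig("public", "users", "source-mysql"),
            )
        )
    }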