From cb6f6ec40a5fb6e63d730a0e1c6e80958ccc87d9 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 17 Jun 2024 08:59:49 -0700 Subject: [PATCH] Destinations CDK: refreshes logic (#38622) --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../src/main/resources/version.properties | 2 +- .../typing_deduping/JdbcDestinationHandler.kt | 4 + .../operation/StagingStreamOperations.kt | 8 +- .../operation/AbstractStreamOperation.kt | 270 ++++++++++--- .../operation/StandardStreamOperation.kt | 8 +- .../destination/operation/StorageOperation.kt | 36 +- .../DestinationInitialStatus.kt | 6 + .../typing_deduping/InitialRawTableStatus.kt | 12 + .../destination/typing_deduping/StreamId.kt | 5 +- .../operation/AbstractStreamOperationTest.kt | 370 +++++++++++++++--- .../operation/DefaultSyncOperationTest.kt | 2 + .../DefaultTyperDeduperTest.kt | 102 ++++- .../BaseSqlGeneratorIntegrationTest.kt | 18 +- .../typing_deduping/BaseTypingDedupingTest.kt | 67 +++- 15 files changed, 746 insertions(+), 165 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 7f4bd2f281af..147a25593205 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | | :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation | | 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) | | 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version | | 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 04e72f5f2523..3318d6b6867b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.39.0 +version=0.40.0 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 32a02483c1ad..f30c69b7f0d5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -325,6 +325,10 @@ abstract class JdbcDestinationHandler( streamConfig, finalTableDefinition.isPresent, initialRawTableState, + // TODO fix this + // for now, no JDBC destinations actually do refreshes + // so this is just to make our code compile + InitialRawTableStatus(false, false, Optional.empty()), isSchemaMismatch, isFinalTableEmpty, destinationState diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt index cc744efc074a..7806d8a54c36 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt @@ -33,7 +33,11 @@ class StagingStreamOperations( ) { private val log = KotlinLogging.logger {} - override fun writeRecords(streamConfig: StreamConfig, stream: Stream) { + override fun writeRecordsImpl( + streamConfig: StreamConfig, + suffix: String, + stream: Stream + ) { val writeBuffer = StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns) @@ -51,7 +55,7 @@ class StagingStreamOperations( "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging" } if (it.byteCount != 0L) { - storageOperation.writeToStage(streamConfig, writeBuffer) + storageOperation.writeToStage(streamConfig, suffix, writeBuffer) } else { log.info { "Skipping writing to storage since there are no bytes to write" } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt index d3533a194bfe..0992bba6ebfd 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt @@ -11,8 +11,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableS import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus -import io.airbyte.protocol.models.v0.DestinationSyncMode import io.github.oshai.kotlinlogging.KotlinLogging +import java.time.Instant import java.util.Optional import java.util.stream.Stream @@ -24,9 +24,11 @@ abstract class AbstractStreamOperation false + stream.generationId -> true + else -> { + // This is technically already handled in CatalogParser. + throw IllegalArgumentException("Hybrid refreshes are not yet supported.") + } + } + + if (isTruncateSync) { + prepareStageForTruncate(destinationInitialStatus, stream) + rawTableSuffix = TMP_TABLE_SUFFIX + initialRawTableStatus = null + } else { + rawTableSuffix = NO_SUFFIX + initialRawTableStatus = prepareStageForNormalSync(stream, destinationInitialStatus) + } + if (!disableTypeDedupe) { // Prepare final tables based on sync mode. finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus) @@ -47,9 +68,103 @@ abstract class AbstractStreamOperation + ): InitialRawTableStatus { + log.info { + "${stream.id.originalNamespace}.${stream.id.originalName}: non-truncate sync. Creating raw table if not exists." + } + storageOperation.prepareStage(stream.id, NO_SUFFIX) + if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) { + log.info { + "${stream.id.originalNamespace}.${stream.id.originalName}: non-truncate sync, but temp raw table exists. Transferring it to real raw table." + } + // There was a previous truncate refresh attempt, which failed, and left some + // records behind. + // Retrieve those records and put them in the real stage. + // This is necessary to avoid certain data loss scenarios. + // (specifically: a user initiates a truncate sync, which fails, but emits some records. + // It also emits a state message for "resumable" full refresh. + // The user then initiates an incremental sync, which runs using that state. + // In this case, we MUST retain the records from the truncate attempt.) + storageOperation.transferFromTempStage(stream.id, TMP_TABLE_SUFFIX) + + // We need to combine the raw table statuses from the real and temp raw tables. + val hasUnprocessedRecords = + destinationInitialStatus.initialTempRawTableStatus.hasUnprocessedRecords || + destinationInitialStatus.initialRawTableStatus.hasUnprocessedRecords + // Pick the earlier min timestamp. + val maxProcessedTimestamp: Optional = + destinationInitialStatus.initialRawTableStatus.maxProcessedTimestamp + .flatMap { realRawTableTimestamp -> + destinationInitialStatus.initialTempRawTableStatus.maxProcessedTimestamp + .flatMap { tempRawTableTimestamp -> + if (realRawTableTimestamp.isBefore(tempRawTableTimestamp)) { + Optional.of(realRawTableTimestamp) + } else { + Optional.of(tempRawTableTimestamp) + } + } + .or { Optional.of(realRawTableTimestamp) } + } + .or { destinationInitialStatus.initialTempRawTableStatus.maxProcessedTimestamp } + log.info { + "${stream.id.originalNamespace}.${stream.id.originalName}: After record transfer, initial raw table status is $initialRawTableStatus." + } + return InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = hasUnprocessedRecords, + maxProcessedTimestamp = maxProcessedTimestamp, + ) + } else { + log.info { + "${stream.id.originalNamespace}.${stream.id.originalName}: non-truncate sync and no temp raw table. Initial raw table status is $initialRawTableStatus." + } + return destinationInitialStatus.initialRawTableStatus + } + } + + private fun prepareStageForTruncate( + destinationInitialStatus: DestinationInitialStatus, + stream: StreamConfig + ) { + if (destinationInitialStatus.initialTempRawTableStatus.rawTableExists) { + val tempStageGeneration = + storageOperation.getStageGeneration(stream.id, TMP_TABLE_SUFFIX) + if (tempStageGeneration == null || tempStageGeneration == stream.generationId) { + log.info { + "${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and existing temp raw table belongs to generation $tempStageGeneration (== current generation ${stream.generationId}). Retaining it." + } + // The temp table is from the correct generation. Set up any other resources + // (staging file, etc.), but leave the table untouched. + storageOperation.prepareStage( + stream.id, + TMP_TABLE_SUFFIX, + ) + } else { + log.info { + "${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and existing temp raw table belongs to generation $tempStageGeneration (!= current generation ${stream.generationId}). Truncating it." + } + // The temp stage is from the wrong generation. Nuke it. + storageOperation.prepareStage( + stream.id, + TMP_TABLE_SUFFIX, + replace = true, + ) + } + // (if the existing temp stage is from the correct generation, then we're resuming + // a truncate refresh, and should keep the previous temp stage). + } else { + log.info { + "${stream.id.originalNamespace}.${stream.id.originalName}: truncate sync, and no preexisting temp raw table. Creating it." + } + // We're initiating a new truncate refresh. Create a new temp stage. + storageOperation.prepareStage( + stream.id, + TMP_TABLE_SUFFIX, + ) + } } private fun prepareFinalTable( @@ -68,27 +183,19 @@ abstract class AbstractStreamOperation return prepareFinalTableForOverwrite(initialStatus) - DestinationSyncMode.APPEND, - DestinationSyncMode.APPEND_DEDUP -> { - if ( - initialStatus.isSchemaMismatch || - initialStatus.destinationState.needsSoftReset() - ) { - // We're loading data directly into the existing table. - // Make sure it has the right schema. - // Also, if a raw table migration wants us to do a soft reset, do that - // here. - log.info { "Executing soft-reset on final table of stream $stream" } - storageOperation.softResetFinalTable(stream) - } - return NO_SUFFIX + if (isTruncateSync) { + // Truncate refresh. Use a temp final table. + return prepareFinalTableForOverwrite(initialStatus) + } else { + if (initialStatus.isSchemaMismatch || initialStatus.destinationState.needsSoftReset()) { + // We're loading data directly into the existing table. + // Make sure it has the right schema. + // Also, if a raw table migration wants us to do a soft reset, do that + // here. + log.info { "Executing soft-reset on final table of stream $stream" } + storageOperation.softResetFinalTable(stream) } + return NO_SUFFIX } } @@ -98,14 +205,13 @@ abstract class AbstractStreamOperation) { + // redirect to the appropriate raw table (potentially the temp raw table). + writeRecordsImpl( + streamConfig, + // TODO it's a little annoying to have to remember which suffix goes here. + // conceptually it's simple (writing records == writing to stage => raw table suffix) + // but still an easy mistake to make. + // Maybe worth defining `data class StageName(suffix: String)` + // and `data class FinalName(suffix: String)`? + // ... and separating those out from StreamId + rawTableSuffix, + stream, + ) + } + /** Write records will be destination type specific, Insert vs staging based on format */ - abstract override fun writeRecords( + abstract fun writeRecordsImpl( streamConfig: StreamConfig, + suffix: String, stream: Stream ) override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) { // Delete staging directory, implementation will handle if it has to do it or not or a No-OP storageOperation.cleanupStage(streamConfig.id) + + val streamSuccessful = syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE + // Overwrite the raw table before doing anything else. + // This ensures that if T+D fails, we can easily retain the records on the next sync. + // It also means we don't need to run T+D using the temp raw table, + // which is possible (`typeAndDedupe(streamConfig.id.copy(rawName = streamConfig.id.rawName + // + suffix))` + // but annoying and confusing. + if (isTruncateSync && streamSuccessful) { + log.info { + "Overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync and we received a stream success message." + } + storageOperation.overwriteStage(streamConfig.id, rawTableSuffix) + } else { + log.info { + "Not overwriting raw table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful" + } + } + if (disableTypeDedupe) { log.info { "Typing and deduping disabled, skipping final table finalization. " + @@ -131,51 +272,54 @@ abstract class AbstractStreamOperation 0 || - (isNotOverwriteSync && initialRawTableStatus.hasUnprocessedRecords) - hasRecordsNeedingTd - } else { - false - } - if (!shouldRunTypingDeduping) { + // Normal syncs should T+D regardless of status, so the user sees progress after every + // attempt. + // We know this is a normal sync, so initialRawTableStatus is nonnull. + if ( + !isTruncateSync && + syncSummary.recordsWritten == 0L && + !initialRawTableStatus!!.hasUnprocessedRecords + ) { log.info { "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " + "because it had no records during this sync and no unprocessed records from a previous sync." } + } else if (isTruncateSync && (!streamSuccessful || syncSummary.recordsWritten == 0L)) { + // But truncate syncs should only T+D if the sync was successful, since we're T+Ding + // into a temp final table anyway. And we only need to check if _this_ sync emitted + // records, since we've nuked the old raw data. + log.info { + "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Stream success: $streamSuccessful; records written: ${syncSummary.recordsWritten}" + } } else { - // In overwrite mode, we want to read all the raw records. Typically, this is equivalent + // In truncate mode, we want to read all the raw records. Typically, this is equivalent // to filtering on timestamp, but might as well be explicit. val timestampFilter = - if (isNotOverwriteSync) { - initialRawTableStatus.maxProcessedTimestamp + if (!isTruncateSync) { + initialRawTableStatus!!.maxProcessedTimestamp } else { Optional.empty() } storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix) } - if ( - streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE && - finalTmpTableSuffix.isNotBlank() - // We should only overwrite the final table if the stream was successful. - // This prevents data downtime if the stream didn't emit all the data. - && - syncSummary.terminalStatus == AirbyteStreamStatus.COMPLETE - ) { + // We want to run this independently of whether we ran T+D. + // E.g. it's valid for a sync to emit 0 records (e.g. the source table is legitimately + // empty), in which case we want to overwrite the final table with an empty table. + if (isTruncateSync && streamSuccessful && finalTmpTableSuffix.isNotBlank()) { + log.info { + "Overwriting final table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} because this is a truncate sync, we received a stream success message, and we are using a temp final table.." + } storageOperation.overwriteFinalTable(streamConfig, finalTmpTableSuffix) + } else { + log.info { + "Not overwriting final table for ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName}. Truncate sync: $isTruncateSync; stream success: $streamSuccessful; final table suffix not blank: ${finalTmpTableSuffix.isNotBlank()}" + } } } + + companion object { + private const val NO_SUFFIX = "" + const val TMP_TABLE_SUFFIX = "_airbyte_tmp" + } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt index 06c5b8dabab5..3ed1ed5c3feb 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt @@ -24,7 +24,11 @@ class StandardStreamOperation( destinationInitialStatus, disableTypeDedupe ) { - override fun writeRecords(streamConfig: StreamConfig, stream: Stream) { - storageOperation.writeToStage(streamConfig, stream) + override fun writeRecordsImpl( + streamConfig: StreamConfig, + suffix: String, + stream: Stream + ) { + storageOperation.writeToStage(streamConfig, suffix, stream) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt index bae34e2b803b..827ec79decdd 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt @@ -6,7 +6,6 @@ package io.airbyte.integrations.base.destination.operation import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.StreamId -import io.airbyte.protocol.models.v0.DestinationSyncMode import java.time.Instant import java.util.Optional @@ -16,15 +15,44 @@ interface StorageOperation { */ /** - * Prepare staging area which cloud be creating any object storage, temp tables or file storage + * Prepare staging area which cloud be creating any object storage, temp tables or file storage. + * Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in + * conjunction with [overwriteStage]. + * + * @param replace If true, then replace existing resources with empty e.g. tables. If false, + * then leave existing resources untouched. */ - fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) + fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false) + + /** + * Swap the "temporary" stage into the "real" stage. For example, `DROP TABLE IF NOT EXISTS + * airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`. + */ + fun overwriteStage(streamId: StreamId, suffix: String) + + /** + * Copy all records from the temporary stage into the real stage, then drop the temporary stage. + * For example `INSERT INTO airbyte_internal.foo SELECT * FROM airbyte_internal.foo_tmp; DROP + * TABLE airbyte_internal.foo_tmp`. + */ + fun transferFromTempStage(streamId: StreamId, suffix: String) + + /** + * Get the generation of a single record in the stage. Not necessarily the min or max + * generation, just _any_ record. + * + * [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage + * always contains exactly one generation. + * + * @return The generation ID of a record in the stage, or `null` if the stage is empty. + */ + fun getStageGeneration(streamId: StreamId, suffix: String): Long? /** Delete previously staged data, using deterministic information from streamId. */ fun cleanupStage(streamId: StreamId) /** Write data to stage. */ - fun writeToStage(streamConfig: StreamConfig, data: Data) + fun writeToStage(streamConfig: StreamConfig, suffix: String, data: Data) /* * ==================== Final Table Operations ================================ diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStatus.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStatus.kt index 2003a8b7d0e0..6f03d0410772 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStatus.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationInitialStatus.kt @@ -7,7 +7,13 @@ package io.airbyte.integrations.base.destination.typing_deduping data class DestinationInitialStatus( val streamConfig: StreamConfig, val isFinalTablePresent: Boolean, + // TODO we should probably make this nullable, then delete InitialRawTableStatus.rawTableExists val initialRawTableStatus: InitialRawTableStatus, + /** + * The state of the temp raw table, or null if there is no temp raw table at the start of the + * sync. + */ + val initialTempRawTableStatus: InitialRawTableStatus, val isSchemaMismatch: Boolean, val isFinalTableEmpty: Boolean, val destinationState: DestinationState, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt index 49df7a54bc49..7d905bd36a6d 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt @@ -8,6 +8,18 @@ import java.util.* data class InitialRawTableStatus( val rawTableExists: Boolean, + /** + * Whether there were any records with null `_airbyte_loaded_at`, at the time that this status + * was fetched. + */ val hasUnprocessedRecords: Boolean, + // TODO Make maxProcessedTimestamp just `Instant?` instead of Optional + /** + * The highest timestamp such that all records in `SELECT * FROM raw_table WHERE + * _airbyte_extracted_at <= ?` have a nonnull `_airbyte_loaded_at`. + * + * Destinations MAY use this value to only run T+D on records with `_airbyte_extracted_at > ?` + * (note the strictly-greater comparison). + */ val maxProcessedTimestamp: Optional ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt index 011431c3d9c8..bbbf41e38629 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamId.kt @@ -42,8 +42,9 @@ data class StreamId( return "$quote$finalNamespace$quote.$quote$finalName$suffix$quote" } - fun rawTableId(quote: String): String { - return "$quote$rawNamespace$quote.$quote$rawName$quote" + @JvmOverloads + fun rawTableId(quote: String, suffix: String = ""): String { + return "$quote$rawNamespace$quote.$quote$rawName$suffix$quote" } fun finalName(quote: String): String { diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt index dd0c3922be04..c42ab1992ef0 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt @@ -52,8 +52,9 @@ class AbstractStreamOperationTest { storageOperation, destinationInitialStatus, ) { - override fun writeRecords( + override fun writeRecordsImpl( streamConfig: StreamConfig, + suffix: String, stream: Stream ) { // noop @@ -66,28 +67,26 @@ class AbstractStreamOperationTest { mockk>>(relaxed = true) @Nested - inner class Overwrite { + inner class Truncate { private val streamConfig = StreamConfig( streamId, - DestinationSyncMode.OVERWRITE, + DestinationSyncMode.APPEND, listOf(), Optional.empty(), columns, - // TODO currently these values are unused. Eventually we should restructure this - // class - // to test based on generation ID instead of sync mode. - 0, - 0, - 0 + generationId = 21, + minimumGenerationId = 21, + syncId = 0 ) @Test fun emptyDestination() { val initialState = mockk> { - every { streamConfig } returns this@Overwrite.streamConfig + every { streamConfig } returns this@Truncate.streamConfig every { initialRawTableStatus } returns mockk() + every { initialTempRawTableStatus.rawTableExists } returns false every { isFinalTablePresent } returns false every { destinationState.withSoftReset(any()) @@ -97,7 +96,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, EXPECTED_SUFFIX) storageOperation.createFinalTable(streamConfig, "", false) } confirmVerified(storageOperation) @@ -110,6 +109,7 @@ class AbstractStreamOperationTest { verifySequence { storageOperation.cleanupStage(streamId) + storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX) storageOperation.typeAndDedupe( streamConfig, Optional.empty(), @@ -125,11 +125,12 @@ class AbstractStreamOperationTest { } @Test - fun existingEmptyTableSchemaMismatch() { + fun existingEmptyFinalTableSchemaMismatch() { val initialState = mockk> { - every { streamConfig } returns this@Overwrite.streamConfig + every { streamConfig } returns this@Truncate.streamConfig every { initialRawTableStatus } returns mockk() + every { initialTempRawTableStatus.rawTableExists } returns false every { isFinalTablePresent } returns true every { isFinalTableEmpty } returns true // Even though there's a schema mismatch, we're running in overwrite mode, @@ -144,8 +145,8 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) - storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) + storageOperation.prepareStage(streamId, EXPECTED_SUFFIX) + storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, true) } confirmVerified(storageOperation) @@ -157,12 +158,13 @@ class AbstractStreamOperationTest { verifySequence { storageOperation.cleanupStage(streamId) + storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX) storageOperation.typeAndDedupe( streamConfig, Optional.empty(), - EXPECTED_OVERWRITE_SUFFIX, + EXPECTED_SUFFIX, ) - storageOperation.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX) + storageOperation.overwriteFinalTable(streamConfig, EXPECTED_SUFFIX) } confirmVerified(storageOperation) checkUnnecessaryStub( @@ -173,11 +175,12 @@ class AbstractStreamOperationTest { } @Test - fun existingEmptyTableMatchingSchema() { + fun existingEmptyFinalTableMatchingSchema() { val initialState = mockk> { - every { streamConfig } returns this@Overwrite.streamConfig + every { streamConfig } returns this@Truncate.streamConfig every { initialRawTableStatus } returns mockk() + every { initialTempRawTableStatus.rawTableExists } returns false every { isFinalTablePresent } returns true every { isFinalTableEmpty } returns true every { isSchemaMismatch } returns false @@ -189,7 +192,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, EXPECTED_SUFFIX) // No table creation - we can just reuse the existing table. } confirmVerified(storageOperation) @@ -202,6 +205,7 @@ class AbstractStreamOperationTest { verifySequence { storageOperation.cleanupStage(streamId) + storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX) storageOperation.typeAndDedupe( streamConfig, Optional.empty(), @@ -217,11 +221,12 @@ class AbstractStreamOperationTest { } @Test - fun existingNonEmptyTable() { + fun existingNonEmptyFinalTable() { val initialState = mockk> { - every { streamConfig } returns this@Overwrite.streamConfig + every { streamConfig } returns this@Truncate.streamConfig every { initialRawTableStatus } returns mockk() + every { initialTempRawTableStatus.rawTableExists } returns false every { isFinalTablePresent } returns true every { isFinalTableEmpty } returns false every { @@ -232,8 +237,8 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) - storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) + storageOperation.prepareStage(streamId, EXPECTED_SUFFIX) + storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, true) } confirmVerified(storageOperation) @@ -245,12 +250,13 @@ class AbstractStreamOperationTest { verifySequence { storageOperation.cleanupStage(streamId) + storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX) storageOperation.typeAndDedupe( streamConfig, Optional.empty(), - EXPECTED_OVERWRITE_SUFFIX, + EXPECTED_SUFFIX, ) - storageOperation.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX) + storageOperation.overwriteFinalTable(streamConfig, EXPECTED_SUFFIX) } confirmVerified(storageOperation) checkUnnecessaryStub( @@ -261,11 +267,12 @@ class AbstractStreamOperationTest { } @Test - fun existingNonEmptyTableStatusIncomplete() { + fun existingNonEmptyFinalTableStatusIncomplete() { val initialState = mockk> { - every { streamConfig } returns this@Overwrite.streamConfig + every { streamConfig } returns this@Truncate.streamConfig every { initialRawTableStatus } returns mockk() + every { initialTempRawTableStatus.rawTableExists } returns false every { isFinalTablePresent } returns true every { isFinalTableEmpty } returns false every { @@ -294,11 +301,12 @@ class AbstractStreamOperationTest { } @Test - fun existingNonEmptyTableNoNewRecords() { + fun existingNonEmptyFinalTableNoNewRecords() { val initialState = mockk> { - every { streamConfig } returns this@Overwrite.streamConfig + every { streamConfig } returns this@Truncate.streamConfig every { initialRawTableStatus } returns mockk() + every { initialTempRawTableStatus.rawTableExists } returns false every { isFinalTablePresent } returns true every { isFinalTableEmpty } returns false every { @@ -309,8 +317,8 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) - storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) + storageOperation.prepareStage(streamId, EXPECTED_SUFFIX) + storageOperation.createFinalTable(streamConfig, EXPECTED_SUFFIX, true) } confirmVerified(storageOperation) @@ -322,7 +330,8 @@ class AbstractStreamOperationTest { verifySequence { storageOperation.cleanupStage(streamId) - storageOperation.overwriteFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX) + storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX) + storageOperation.overwriteFinalTable(streamConfig, EXPECTED_SUFFIX) } confirmVerified(storageOperation) checkUnnecessaryStub( @@ -331,10 +340,96 @@ class AbstractStreamOperationTest { initialState.destinationState ) } + + @ParameterizedTest + @MethodSource( + "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#generationIds" + ) + fun existingTempRawTableMatchingGeneration(existingTempTableGeneration: Long?) { + val initialState = + mockk> { + every { streamConfig } returns this@Truncate.streamConfig + every { initialTempRawTableStatus.rawTableExists } returns true + every { isFinalTablePresent } returns false + every { + destinationState.withSoftReset(any()) + } returns destinationState + } + every { storageOperation.getStageGeneration(streamId, EXPECTED_SUFFIX) } returns + existingTempTableGeneration + + val streamOperations = TestStreamOperation(storageOperation, initialState) + + verifySequence { + storageOperation.getStageGeneration(streamId, EXPECTED_SUFFIX) + storageOperation.prepareStage(streamId, EXPECTED_SUFFIX) + storageOperation.createFinalTable(streamConfig, "", false) + } + confirmVerified(storageOperation) + + clearMocks(storageOperation) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) + + verifySequence { + storageOperation.cleanupStage(streamId) + storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX) + storageOperation.typeAndDedupe( + streamConfig, + Optional.empty(), + "", + ) + } + confirmVerified(storageOperation) + checkUnnecessaryStub(initialState, initialState.destinationState) + } + + @Test + fun existingTempRawTableWrongGeneration() { + val initialState = + mockk> { + every { streamConfig } returns this@Truncate.streamConfig + every { initialTempRawTableStatus.rawTableExists } returns true + every { isFinalTablePresent } returns false + every { + destinationState.withSoftReset(any()) + } returns destinationState + } + every { storageOperation.getStageGeneration(streamId, EXPECTED_SUFFIX) } returns -1 + + val streamOperations = TestStreamOperation(storageOperation, initialState) + + verifySequence { + storageOperation.getStageGeneration(streamId, EXPECTED_SUFFIX) + storageOperation.prepareStage(streamId, EXPECTED_SUFFIX, replace = true) + storageOperation.createFinalTable(streamConfig, "", false) + } + confirmVerified(storageOperation) + + clearMocks(storageOperation) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(42, AirbyteStreamStatus.COMPLETE) + ) + + verifySequence { + storageOperation.cleanupStage(streamId) + storageOperation.overwriteStage(streamId, EXPECTED_SUFFIX) + storageOperation.typeAndDedupe( + streamConfig, + Optional.empty(), + "", + ) + } + confirmVerified(storageOperation) + checkUnnecessaryStub(initialState, initialState.destinationState) + } } @Nested - inner class NonOverwrite { + inner class NormalSync { @ParameterizedTest @MethodSource( "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" @@ -343,6 +438,7 @@ class AbstractStreamOperationTest { val initialState = mockk> { every { this@mockk.streamConfig } returns streamConfig + every { initialTempRawTableStatus.rawTableExists } returns false every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty() every { isFinalTablePresent } returns false every { @@ -353,7 +449,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.createFinalTable(streamConfig, "", false) } confirmVerified(storageOperation) @@ -384,10 +480,11 @@ class AbstractStreamOperationTest { @MethodSource( "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" ) - fun existingTableSchemaMismatch(streamConfig: StreamConfig) { + fun existingFinalTableSchemaMismatch(streamConfig: StreamConfig) { val initialState = mockk> { every { this@mockk.streamConfig } returns streamConfig + every { initialTempRawTableStatus.rawTableExists } returns false every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty() every { isFinalTablePresent } returns true every { isSchemaMismatch } returns true @@ -399,7 +496,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.softResetFinalTable(streamConfig) } confirmVerified(storageOperation) @@ -430,10 +527,11 @@ class AbstractStreamOperationTest { @MethodSource( "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" ) - fun existingTableSchemaMatch(streamConfig: StreamConfig) { + fun existingFinalTableSchemaMatch(streamConfig: StreamConfig) { val initialState = mockk> { every { this@mockk.streamConfig } returns streamConfig + every { initialTempRawTableStatus.rawTableExists } returns false every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty() every { isFinalTablePresent } returns true every { isSchemaMismatch } returns false @@ -443,7 +541,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") // No soft reset - we can just reuse the existing table. } confirmVerified(storageOperation) @@ -466,14 +564,71 @@ class AbstractStreamOperationTest { checkUnnecessaryStub(initialState, initialState.initialRawTableStatus) } + /** + * Run a test where the current sync emits 0 records. Verify all the behavior around + * existing raw tables. + */ + @ParameterizedTest + @MethodSource( + "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#normalSyncRawTableStatuses" + ) + fun testRawTableHandling( + shouldRunTD: Boolean, + timestampFilter: Optional, + realRawTableStatus: InitialRawTableStatus, + tempRawTableStatus: InitialRawTableStatus, + streamConfig: StreamConfig, + ) { + val initialState = + mockk> { + every { this@mockk.streamConfig } returns streamConfig + every { initialRawTableStatus } returns realRawTableStatus + every { initialTempRawTableStatus } returns tempRawTableStatus + every { isFinalTablePresent } returns true + every { isSchemaMismatch } returns false + every { destinationState } returns MinimumDestinationState.Impl(false) + } + + val streamOperations = TestStreamOperation(storageOperation, initialState) + + verifySequence { + storageOperation.prepareStage(streamId, "") + if (tempRawTableStatus.rawTableExists) { + storageOperation.transferFromTempStage(streamId, EXPECTED_SUFFIX) + } + // No soft reset - we can just reuse the existing table. + } + confirmVerified(storageOperation) + + clearMocks(storageOperation) + streamOperations.finalizeTable( + streamConfig, + StreamSyncSummary(0, AirbyteStreamStatus.COMPLETE) + ) + + verifySequence { + storageOperation.cleanupStage(streamId) + if (shouldRunTD) { + storageOperation.typeAndDedupe( + streamConfig, + timestampFilter, + "", + ) + } + } + confirmVerified(storageOperation) + checkUnnecessaryStub(initialState) + } + @ParameterizedTest @MethodSource( "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigs" ) - fun existingTableAndStateRequiresSoftReset(streamConfig: StreamConfig) { + fun existingFinalTableAndStateRequiresSoftReset(streamConfig: StreamConfig) { val initialState = mockk> { every { this@mockk.streamConfig } returns streamConfig + every { initialTempRawTableStatus.rawTableExists } returns false every { initialRawTableStatus.maxProcessedTimestamp } returns Optional.empty() every { isFinalTablePresent } returns true every { isSchemaMismatch } returns false @@ -483,7 +638,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.softResetFinalTable(streamConfig) } confirmVerified(storageOperation) @@ -510,13 +665,14 @@ class AbstractStreamOperationTest { @MethodSource( "io.airbyte.integrations.base.destination.operation.AbstractStreamOperationTest#nonOverwriteStreamConfigsAndBoolean" ) - fun existingNonEmptyTableNoNewRecords( + fun existingNonEmptyFinalTableNoNewRecords( streamConfig: StreamConfig, hasUnprocessedRecords: Boolean ) { val initialState = mockk> { every { this@mockk.streamConfig } returns streamConfig + every { initialTempRawTableStatus.rawTableExists } returns false // This is an overwrite sync, so we can ignore the old raw records. // We should skip T+D if the current sync emitted 0 records. every { initialRawTableStatus.hasUnprocessedRecords } returns @@ -537,9 +693,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) - verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) - } + verifySequence { storageOperation.prepareStage(streamId, "") } confirmVerified(storageOperation) clearMocks(storageOperation) @@ -587,7 +741,7 @@ class AbstractStreamOperationTest { ) to AirbyteProtocolType.STRING, ) - const val EXPECTED_OVERWRITE_SUFFIX = "_airbyte_tmp" + const val EXPECTED_SUFFIX = "_airbyte_tmp" val maxProcessedTimestamp = Optional.of(Instant.parse("2024-01-23T12:34:56Z")) private val appendStreamConfig = @@ -597,12 +751,9 @@ class AbstractStreamOperationTest { listOf(), Optional.empty(), columns, - // TODO currently these values are unused. Eventually we should restructure this - // class - // to test based on generation ID instead of sync mode. - 0, - 0, - 0 + generationId = 21, + minimumGenerationId = 0, + syncId = 0 ) private val dedupStreamConfig = StreamConfig( @@ -611,13 +762,11 @@ class AbstractStreamOperationTest { listOf(pk1, pk2), Optional.of(cursor), columns, - // TODO currently these values are unused. Eventually we should restructure this - // class - // to test based on generation ID instead of sync mode. - 0, - 0, - 0 + generationId = 21, + minimumGenerationId = 0, + syncId = 0 ) + private val streamConfigs = arrayOf(appendStreamConfig, dedupStreamConfig) // junit 5 doesn't support class-level parameterization... // so we have to hack this in a somewhat dumb way. @@ -625,10 +774,7 @@ class AbstractStreamOperationTest { // so just shove them together. @JvmStatic fun nonOverwriteStreamConfigs(): Stream = - Stream.of( - Arguments.of(appendStreamConfig), - Arguments.of(dedupStreamConfig), - ) + streamConfigs.map { Arguments.of(it) }.stream() // Some tests are further parameterized, which this method supports. @JvmStatic @@ -639,5 +785,105 @@ class AbstractStreamOperationTest { Arguments.of(dedupStreamConfig, true), Arguments.of(dedupStreamConfig, false), ) + + // ValueSource and CsvSource don't support null, so we have to write an entire method. + @JvmStatic + fun generationIds(): Stream = + Stream.of( + Arguments.of(null), + Arguments.of(21L), + ) + + /** + * The five arguments are: + * * whether we need to run T+D (assuming the sync emitted 0 records and was successful) + * * if we need to run T+D, the timestamp filter to use. + * * the initial real raw table status + * * the initial temp raw table status + * * the StreamConfig to use + */ + @JvmStatic + fun normalSyncRawTableStatuses(): Stream { + val validRawTableStatuses = + arrayOf( + // The raw table doesn't exist + InitialRawTableStatus( + rawTableExists = false, + hasUnprocessedRecords = false, + maxProcessedTimestamp = Optional.empty() + ), + // The raw table exists, but is empty + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = false, + maxProcessedTimestamp = Optional.empty() + ), + // The raw table exists and contains records, but they're all processed and old + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = false, + maxProcessedTimestamp = Optional.of(Instant.parse("2023-01-01T12:34:56Z")), + ), + // The raw table exists and contains records, but they're all processed and new + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = false, + maxProcessedTimestamp = Optional.of(Instant.parse("2024-01-01T12:34:56Z")), + ), + // The raw table exists and contains unprocessed records. Already-processed + // records are old. + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.parse("2023-01-01T12:34:56Z")), + ), + // The raw table exists and contains unprocessed records. Already-processed + // records are new. + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.parse("2024-01-01T12:34:56Z")), + ) + ) + return streamConfigs + .flatMap { streamConfig -> + validRawTableStatuses.flatMap { realRawStatus -> + validRawTableStatuses.map { tempRawStatus -> + val shouldRunTD = + realRawStatus.hasUnprocessedRecords || + tempRawStatus.hasUnprocessedRecords + + // Find the lower of the two timestamps. + val timestampFilter = + if (realRawStatus.maxProcessedTimestamp.isPresent) { + if (tempRawStatus.maxProcessedTimestamp.isPresent) { + if ( + realRawStatus.maxProcessedTimestamp + .get() + .isBefore(tempRawStatus.maxProcessedTimestamp.get()) + ) { + realRawStatus.maxProcessedTimestamp + } else { + tempRawStatus.maxProcessedTimestamp + } + } else { + realRawStatus.maxProcessedTimestamp + } + } else { + tempRawStatus.maxProcessedTimestamp + } + + Arguments.of( + shouldRunTD, + timestampFilter, + realRawStatus, + tempRawStatus, + streamConfig, + ) + } + } + } + .stream() + } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt index 762d246babd1..9bb819b99c49 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperationTest.kt @@ -78,6 +78,7 @@ class DefaultSyncOperationTest { hasUnprocessedRecords = false, maxProcessedTimestamp = Optional.empty(), ), + initialTempRawTableStatus = mockk(), isSchemaMismatch = true, isFinalTableEmpty = false, destinationState = @@ -175,6 +176,7 @@ class DefaultSyncOperationTest { hasUnprocessedRecords = false, maxProcessedTimestamp = Optional.empty(), ), + initialTempRawTableStatus = mockk(), isSchemaMismatch = true, isFinalTableEmpty = false, destinationState = diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt index a9e38fb18d9b..8074c8a20e35 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt @@ -12,6 +12,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.migrators.Minimu import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus import io.airbyte.protocol.models.v0.DestinationSyncMode import io.airbyte.protocol.models.v0.StreamDescriptor +import io.mockk.mockk import java.time.Instant import java.util.* import java.util.function.Consumer @@ -119,7 +120,13 @@ class DefaultTyperDeduperTest { initialStates.forEach( Consumer { initialState: DestinationInitialStatus -> Mockito.`when`(initialState.initialRawTableStatus) - .thenReturn(InitialRawTableStatus(true, true, Optional.empty())) + .thenReturn( + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.empty() + ) + ) } ) @@ -316,9 +323,10 @@ class DefaultTyperDeduperTest { Mockito.`when`(initialState.initialRawTableStatus) .thenReturn( InitialRawTableStatus( - true, - true, - Optional.of(Instant.parse("2023-01-01T12:34:56Z")) + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = + Optional.of(Instant.parse("2023-01-01T12:34:56Z")) ) ) } @@ -413,7 +421,13 @@ class DefaultTyperDeduperTest { Mockito.`when`(initialState.isFinalTableEmpty).thenReturn(false) Mockito.`when`(initialState.isSchemaMismatch).thenReturn(false) Mockito.`when`(initialState.initialRawTableStatus) - .thenReturn(InitialRawTableStatus(true, true, Optional.of(Instant.now()))) + .thenReturn( + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.now()) + ) + ) } ) @@ -471,7 +485,13 @@ class DefaultTyperDeduperTest { initialStates.forEach( Consumer { initialState: DestinationInitialStatus -> Mockito.`when`(initialState.initialRawTableStatus) - .thenReturn(InitialRawTableStatus(true, false, Optional.empty())) + .thenReturn( + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = false, + maxProcessedTimestamp = Optional.empty() + ) + ) } ) @@ -521,9 +541,10 @@ class DefaultTyperDeduperTest { Mockito.`when`(initialState.initialRawTableStatus) .thenReturn( InitialRawTableStatus( - true, - true, - Optional.of(Instant.parse("2023-01-23T12:34:56Z")) + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = + Optional.of(Instant.parse("2023-01-23T12:34:56Z")) ) ) } @@ -600,7 +621,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( OVERWRITE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), true, false, MockState(false, false, true) @@ -608,7 +634,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( APPEND_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), true, false, MockState(false, false, true) @@ -616,7 +647,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( DEDUPE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), true, false, MockState(false, false, true) @@ -712,7 +748,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( OVERWRITE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), false, false, MockState(false, false, false) @@ -720,7 +761,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( APPEND_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), false, false, MockState(false, false, false) @@ -728,7 +774,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( DEDUPE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), false, false, MockState(false, false, false) @@ -815,7 +866,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( OVERWRITE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), false, false, MockState(true, false, false) @@ -823,7 +879,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( APPEND_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), false, false, MockState(true, false, false) @@ -831,7 +892,12 @@ class DefaultTyperDeduperTest { DestinationInitialStatus( DEDUPE_STREAM_CONFIG, true, - InitialRawTableStatus(true, true, Optional.of(Instant.ofEpochMilli(42))), + InitialRawTableStatus( + rawTableExists = true, + hasUnprocessedRecords = true, + maxProcessedTimestamp = Optional.of(Instant.ofEpochMilli(42)) + ), + initialTempRawTableStatus = mockk(), false, false, MockState(true, false, false) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index b28d16b2c077..ea4626e29b68 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -583,14 +583,22 @@ abstract class BaseSqlGeneratorIntegrationTest, imageName: String = this.imageName, - configTransformer: Function = Function.identity() + configTransformer: Function = Function.identity(), + streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE ) { val destination = startSync(catalog, imageName, configTransformer) val outputFuture = destinationOutputFuture(destination) pushMessages(messages, destination) + if (streamStatus != null) { + pushStatusMessages(catalog, destination, streamStatus) + } endSync(destination, outputFuture) } + private fun pushStatusMessages( + catalog: io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog, + destination: AirbyteDestination, + streamStatus: AirbyteStreamStatus + ) { + catalog.streams.forEach { + destination.accept( + io.airbyte.protocol.models + .AirbyteMessage() + .withType(io.airbyte.protocol.models.AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withType(AirbyteTraceMessage.Type.STREAM_STATUS) + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(it.stream.namespace) + .withName(it.stream.name), + ) + .withStatus(streamStatus), + ), + ), + ) + } + } + // In the background, read messages from the destination until it terminates. We need to clear // stdout in real time, to prevent the buffer from filling up and blocking the destination. private fun destinationOutputFuture(