From e8fea2e71af3621a55e76d825423563b6debcc1d Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 20 May 2024 11:37:43 -0700 Subject: [PATCH] generation id / sync id in cdk --- .../integrations/base/JavaBaseConstants.kt | 29 ++++++++++-- .../record_buffer/BaseSerializedBuffer.kt | 12 +++-- .../record_buffer/SerializableBuffer.kt | 7 ++- .../JdbcSqlGeneratorIntegrationTest.kt | 4 +- .../s3/avro/AvroSerializedBuffer.kt | 7 ++- .../destination/s3/csv/BaseSheetGenerator.kt | 3 +- .../destination/s3/csv/CsvSerializedBuffer.kt | 8 +++- .../destination/s3/csv/CsvSheetGenerator.kt | 3 +- .../csv/StagingDatabaseCsvSheetGenerator.kt | 18 ++++++-- .../s3/jsonl/JsonLSerializedBuffer.kt | 7 ++- .../s3/parquet/ParquetSerializedBuffer.kt | 7 ++- .../destination/staging/AsyncFlush.kt | 3 ++ .../staging/StagingConsumerFactory.kt | 3 +- .../operation/StagingStreamOperations.kt | 3 +- .../operation/DefaultSyncOperation.kt | 19 +++++++- .../operation/StandardStreamOperation.kt | 2 +- .../destination/operation/StorageOperation.kt | 2 +- .../BaseDestinationV1V2Migrator.kt | 10 ++-- .../BaseSqlGeneratorIntegrationTest.kt | 15 ++++-- .../typing_deduping/BaseTypingDedupingTest.kt | 46 ++++++++++++++++++- .../typing_deduping/RecordDiffer.kt | 17 +++++-- .../sqlgenerator/alltypes_inputrecords.jsonl | 8 ++-- .../safe_cast/alltypes_inputrecords.jsonl | 2 +- .../destination-bigquery/build.gradle | 2 +- 24 files changed, 192 insertions(+), 45 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt index 5003c09c4acf..715875180aa9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt @@ -3,7 +3,7 @@ */ package io.airbyte.cdk.integrations.base -import java.util.* +import java.util.Locale import org.apache.commons.lang3.StringUtils fun upperQuoted(column: String): String { @@ -24,19 +24,22 @@ object JavaBaseConstants { const val COLUMN_NAME_DATA: String = "_airbyte_data" @JvmField val LEGACY_RAW_TABLE_COLUMNS: List = - java.util.List.of(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT) + listOf(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT) // destination v2 const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id" const val COLUMN_NAME_AB_LOADED_AT: String = "_airbyte_loaded_at" const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at" const val COLUMN_NAME_AB_META: String = "_airbyte_meta" + const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id" + + const val AIRBYTE_META_SYNC_ID_KEY = "sync_id" // Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2 // use this column list. @JvmField val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List = - java.util.List.of( + listOf( COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, @@ -44,21 +47,37 @@ object JavaBaseConstants { ) @JvmField val V2_RAW_TABLE_COLUMN_NAMES: List = - java.util.List.of( + listOf( + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_LOADED_AT, + COLUMN_NAME_DATA, + COLUMN_NAME_AB_META, + ) + @JvmField + val V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: List = + listOf( COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, COLUMN_NAME_DATA, COLUMN_NAME_AB_META, + COLUMN_NAME_AB_GENERATION_ID, ) @JvmField val V2_FINAL_TABLE_METADATA_COLUMNS: List = - java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META) + listOf( + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_META, + COLUMN_NAME_AB_GENERATION_ID + ) const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal" enum class DestinationColumns(val rawColumns: List) { V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES), V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META), + V2_WITH_GENERATION(V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION), LEGACY(LEGACY_RAW_TABLE_COLUMNS) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt index a837a6114e7f..ebabc53b2860 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt @@ -57,7 +57,8 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu protected abstract fun writeRecord( recordString: String, airbyteMetaString: String, - emittedAt: Long + generationId: Long, + emittedAt: Long, ) /** @@ -99,7 +100,12 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu } @Throws(Exception::class) - override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long { + override fun accept( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ): Long { if (!isStarted) { if (useCompression) { compressedBuffer = GzipCompressorOutputStream(byteCounter) @@ -111,7 +117,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu } if (inputStream == null && !isClosed) { val startCount = byteCounter.count - writeRecord(recordString, airbyteMetaString, emittedAt) + writeRecord(recordString, airbyteMetaString, generationId, emittedAt) return byteCounter.count - startCount } else { throw IllegalCallerException("Buffer is already closed, it cannot accept more messages") diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt index 0469d30f1f5f..e1a118d85b58 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt @@ -37,7 +37,12 @@ interface SerializableBuffer : AutoCloseable { * @throws Exception */ @Throws(Exception::class) - fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long + fun accept( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ): Long /** Flush a buffer implementation. */ @Throws(Exception::class) fun flush() diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt index 05b21c933412..40406c36cdeb 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt @@ -140,8 +140,10 @@ abstract class JdbcSqlGeneratorIntegrationTest + records: List, + generationId: Long, ) { + // TODO handle generation ID val columnNames = if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES insertRecords( diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt index 5f5464a25360..ed7b4d804eb2 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt @@ -58,7 +58,12 @@ class AvroSerializedBuffer( @Throws(IOException::class) @Suppress("DEPRECATION") - override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) { + override fun writeRecord( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ) { // TODO Remove this double deserialization when S3 Destinations moves to Async. writeRecord( Jsons.deserialize( diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt index 885d8e8f1a62..d9cb5e1521df 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt @@ -28,7 +28,8 @@ abstract class BaseSheetGenerator : CsvSheetGenerator { id: UUID, formattedString: String, emittedAt: Long, - formattedAirbyteMetaString: String + formattedAirbyteMetaString: String, + generationId: Long, ): List { // TODO: Make this abstract or default if No-op is intended in NoFlatteningSheetGenerator or // RootLevelFlatteningSheetGenerator diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt index 10e2c56d9ae4..ae8939cee23d 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt @@ -68,13 +68,19 @@ class CsvSerializedBuffer( } @Throws(IOException::class) - override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) { + override fun writeRecord( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ) { csvPrinter!!.printRecord( csvSheetGenerator.getDataRow( UUID.randomUUID(), recordString, emittedAt, airbyteMetaString, + generationId, ), ) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt index ece5f0986aaf..93bc3e9d061c 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt @@ -26,7 +26,8 @@ interface CsvSheetGenerator { id: UUID, formattedString: String, emittedAt: Long, - formattedAirbyteMetaString: String + formattedAirbyteMetaString: String, + generationId: Long, ): List object Factory { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt index 35e1dd91d3f3..d990bbc14a49 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt @@ -29,7 +29,7 @@ class StagingDatabaseCsvSheetGenerator @JvmOverloads constructor( private val destinationColumns: JavaBaseConstants.DestinationColumns = - JavaBaseConstants.DestinationColumns.LEGACY + JavaBaseConstants.DestinationColumns.LEGACY, ) : CsvSheetGenerator { override fun getHeaderRow(): List { return destinationColumns.rawColumns @@ -40,7 +40,9 @@ constructor( id, Jsons.serialize(recordMessage.data), recordMessage.emittedAt, - Jsons.serialize(recordMessage.meta) + Jsons.serialize(recordMessage.meta), + // Legacy code. Default to generation 0. + 0, ) } @@ -52,7 +54,8 @@ constructor( id: UUID, formattedString: String, emittedAt: Long, - formattedAirbyteMetaString: String + formattedAirbyteMetaString: String, + generationId: Long, ): List { return when (destinationColumns) { JavaBaseConstants.DestinationColumns.LEGACY -> @@ -67,6 +70,15 @@ constructor( ) JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString) + JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> + listOf( + id, + Instant.ofEpochMilli(emittedAt), + "", + formattedString, + formattedAirbyteMetaString, + generationId + ) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt index 6091c6e5490b..aabf30753261 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt @@ -60,7 +60,12 @@ class JsonLSerializedBuffer( } @Suppress("DEPRECATION") - override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) { + override fun writeRecord( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ) { // TODO Remove this double deserialization when S3 Destinations moves to Async. writeRecord( Jsons.deserialize( diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt index 21e465b7e765..d86bd162aef2 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt @@ -118,7 +118,12 @@ class ParquetSerializedBuffer( } @Throws(Exception::class) - override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long { + override fun accept( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ): Long { throw UnsupportedOperationException( "This method is not supported for ParquetSerializedBuffer" ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt index b40b83a62d65..ad7722e16843 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt @@ -61,6 +61,9 @@ internal class AsyncFlush( writer.accept( record.serialized!!, Jsons.serialize(record.record!!.meta), + // Destinations that want to use generations should switch to the new + // structure (e.g. StagingStreamOperations) + 0, record.record!!.emittedAt ) } catch (e: Exception) { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt index 93acb51c45e2..2cd699c75995 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt @@ -275,7 +275,8 @@ private constructor( val tableName: String when (destinationColumns) { JavaBaseConstants.DestinationColumns.V2_WITH_META, - JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> { + JavaBaseConstants.DestinationColumns.V2_WITHOUT_META, + JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> { val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id outputSchema = streamId.rawNamespace tableName = streamId.rawName diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt index eb7c193757c6..80f4e6c26003 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt @@ -42,6 +42,7 @@ class StagingStreamOperations( it.accept( record.serialized!!, Jsons.serialize(record.record!!.meta), + streamConfig.generationId, record.record!!.emittedAt ) } @@ -49,7 +50,7 @@ class StagingStreamOperations( log.info { "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging" } - storageOperation.writeToStage(streamConfig.id, writeBuffer) + storageOperation.writeToStage(streamConfig, writeBuffer) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt index c09d71de040a..04634f40acdd 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.base.destination.operation +import io.airbyte.cdk.integrations.base.JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.cdk.integrations.destination.operation.SyncOperation @@ -15,6 +16,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging import java.util.concurrent.CompletableFuture @@ -102,7 +104,22 @@ class DefaultSyncOperation( override fun flushStream(descriptor: StreamDescriptor, stream: Stream) { val streamConfig = parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name) - streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream) + streamOpsMap[streamConfig.id]?.writeRecords( + streamConfig, + stream.map { record -> + if (record.record!!.meta == null) { + record.record!!.meta = AirbyteRecordMessageMeta() + } + record.also { + it.record!! + .meta!! + .setAdditionalProperty( + AIRBYTE_META_SYNC_ID_KEY, + streamConfig.syncId, + ) + } + }, + ) } override fun finalizeStreams(streamSyncSummaries: Map) { diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt index e437f1a63f52..06c5b8dabab5 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt @@ -25,6 +25,6 @@ class StandardStreamOperation( disableTypeDedupe ) { override fun writeRecords(streamConfig: StreamConfig, stream: Stream) { - storageOperation.writeToStage(streamConfig.id, stream) + storageOperation.writeToStage(streamConfig, stream) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt index 29d22d27c10f..bae34e2b803b 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt @@ -24,7 +24,7 @@ interface StorageOperation { fun cleanupStage(streamId: StreamId) /** Write data to stage. */ - fun writeToStage(streamId: StreamId, data: Data) + fun writeToStage(streamConfig: StreamConfig, data: Data) /* * ==================== Final Table Operations ================================ diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt index 268ab985bf2e..22de48a938d4 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt @@ -115,15 +115,19 @@ abstract class BaseDestinationV1V2Migrator : Destination if ( !(schemaMatchesExpectation( existingV2AirbyteRawTable, - JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META + JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META, ) || schemaMatchesExpectation( existingV2AirbyteRawTable, - JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES + JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES, + ) || + schemaMatchesExpectation( + existingV2AirbyteRawTable, + JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION, )) ) { throw UnexpectedSchemaException( - "Destination V2 Raw Table does not match expected Schema" + "Destination V2 Raw Table does not match expected Schema", ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index c3223701c83d..bcb5488acfa0 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -118,7 +118,8 @@ abstract class BaseSqlGeneratorIntegrationTest + records: List, + generationId: Long, ) /** @@ -1079,7 +1080,7 @@ abstract class BaseSqlGeneratorIntegrationTest, actualRecords: List) { val diff = diffRecords( - expectedRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }, - actualRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }, + expectedRecords.map { record: JsonNode -> this.deserializeMetaAndLiftData(record) }, + actualRecords.map { record: JsonNode -> this.deserializeMetaAndLiftData(record) }, rawRecordIdentityComparator, rawRecordSortComparator, rawRecordIdentityExtractor, @@ -120,12 +120,12 @@ constructor( } /** - * Lift _airbyte_data fields to the root level. If _airbyte_data is a string, deserialize it - * first. + * If airbyte_data/airbyte_meta are strings, deserialize them. Then lift _airbyte_data fields to + * the root level. * * @return A copy of the record, but with all fields in _airbyte_data lifted to the top level. */ - private fun copyWithLiftedData(record: JsonNode): JsonNode { + private fun deserializeMetaAndLiftData(record: JsonNode): JsonNode { val copy = record.deepCopy() copy.remove(getMetadataColumnName(rawRecordColumnNames, "_airbyte_data")) var airbyteData = record[getMetadataColumnName(rawRecordColumnNames, "_airbyte_data")] @@ -144,6 +144,13 @@ constructor( ) } } + + val metadataColumnName = getMetadataColumnName(rawRecordColumnNames, "_airbyte_meta") + val airbyteMeta = record[metadataColumnName] + if (airbyteMeta != null && airbyteMeta.isTextual) { + copy.set(metadataColumnName, Jsons.deserializeExact(airbyteMeta.asText())) + } + return copy } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl index b965f22e8bad..7eca1811235c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl @@ -1,4 +1,4 @@ -{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}} -{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}} -{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}} -{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}} +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}, "_airbyte_generation_id": 42} +{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}, "_airbyte_generation_id": 42} +{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}, "_airbyte_generation_id": 42} +{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}, "_airbyte_generation_id": 42} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl index 2eb8fae2374d..17b317704c9e 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl @@ -1,2 +1,2 @@ // Note that array and struct have invalid values ({} and [] respectively). -{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}} +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}, "_airbyte_generation_id": 42} diff --git a/airbyte-integrations/connectors/destination-bigquery/build.gradle b/airbyte-integrations/connectors/destination-bigquery/build.gradle index c23be7d3cc4b..824d442c7412 100644 --- a/airbyte-integrations/connectors/destination-bigquery/build.gradle +++ b/airbyte-integrations/connectors/destination-bigquery/build.gradle @@ -11,7 +11,7 @@ airbyteJavaConnector { 'gcs-destinations', 'core', ] - useLocalCdk = false + useLocalCdk = true } java {