diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt index 5003c09c4acf..715875180aa9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/JavaBaseConstants.kt @@ -3,7 +3,7 @@ */ package io.airbyte.cdk.integrations.base -import java.util.* +import java.util.Locale import org.apache.commons.lang3.StringUtils fun upperQuoted(column: String): String { @@ -24,19 +24,22 @@ object JavaBaseConstants { const val COLUMN_NAME_DATA: String = "_airbyte_data" @JvmField val LEGACY_RAW_TABLE_COLUMNS: List = - java.util.List.of(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT) + listOf(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT) // destination v2 const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id" const val COLUMN_NAME_AB_LOADED_AT: String = "_airbyte_loaded_at" const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at" const val COLUMN_NAME_AB_META: String = "_airbyte_meta" + const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id" + + const val AIRBYTE_META_SYNC_ID_KEY = "sync_id" // Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2 // use this column list. @JvmField val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List = - java.util.List.of( + listOf( COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, @@ -44,21 +47,37 @@ object JavaBaseConstants { ) @JvmField val V2_RAW_TABLE_COLUMN_NAMES: List = - java.util.List.of( + listOf( + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_LOADED_AT, + COLUMN_NAME_DATA, + COLUMN_NAME_AB_META, + ) + @JvmField + val V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: List = + listOf( COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_LOADED_AT, COLUMN_NAME_DATA, COLUMN_NAME_AB_META, + COLUMN_NAME_AB_GENERATION_ID, ) @JvmField val V2_FINAL_TABLE_METADATA_COLUMNS: List = - java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META) + listOf( + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_META, + COLUMN_NAME_AB_GENERATION_ID + ) const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal" enum class DestinationColumns(val rawColumns: List) { V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES), V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META), + V2_WITH_GENERATION(V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION), LEGACY(LEGACY_RAW_TABLE_COLUMNS) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt index 7aa6932b5697..e8fc289c299e 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt @@ -57,7 +57,8 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu protected abstract fun writeRecord( recordString: String, airbyteMetaString: String, - emittedAt: Long + generationId: Long, + emittedAt: Long, ) /** @@ -99,7 +100,12 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu } @Throws(Exception::class) - override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long { + override fun accept( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ): Long { if (!isStarted) { if (useCompression) { compressedBuffer = GzipCompressorOutputStream(byteCounter) @@ -111,7 +117,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu } if (inputStream == null && !isClosed) { val startCount = byteCounter.count - writeRecord(recordString, airbyteMetaString, emittedAt) + writeRecord(recordString, airbyteMetaString, generationId, emittedAt) return byteCounter.count - startCount } else { throw IllegalCallerException("Buffer is already closed, it cannot accept more messages") diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt index 0469d30f1f5f..e1a118d85b58 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt @@ -37,7 +37,12 @@ interface SerializableBuffer : AutoCloseable { * @throws Exception */ @Throws(Exception::class) - fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long + fun accept( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ): Long /** Flush a buffer implementation. */ @Throws(Exception::class) fun flush() diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt index 0d4d25867b70..9154230cd7be 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt @@ -146,8 +146,10 @@ abstract class JdbcSqlGeneratorIntegrationTest + records: List, + generationId: Long, ) { + // TODO handle generation ID val columnNames = if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES insertRecords( diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt index 0e3875f47ff8..0a8628b556a8 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt @@ -58,7 +58,12 @@ class AvroSerializedBuffer( @Throws(IOException::class) @Suppress("DEPRECATION") - override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) { + override fun writeRecord( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ) { // TODO Remove this double deserialization when S3 Destinations moves to Async. writeRecord( Jsons.deserialize( diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt index 885d8e8f1a62..d9cb5e1521df 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt @@ -28,7 +28,8 @@ abstract class BaseSheetGenerator : CsvSheetGenerator { id: UUID, formattedString: String, emittedAt: Long, - formattedAirbyteMetaString: String + formattedAirbyteMetaString: String, + generationId: Long, ): List { // TODO: Make this abstract or default if No-op is intended in NoFlatteningSheetGenerator or // RootLevelFlatteningSheetGenerator diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt index 587c31bc4983..a2ffdf39f0b4 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt @@ -68,13 +68,19 @@ class CsvSerializedBuffer( } @Throws(IOException::class) - override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) { + override fun writeRecord( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ) { csvPrinter!!.printRecord( csvSheetGenerator.getDataRow( UUID.randomUUID(), recordString, emittedAt, airbyteMetaString, + generationId, ), ) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt index ece5f0986aaf..93bc3e9d061c 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt @@ -26,7 +26,8 @@ interface CsvSheetGenerator { id: UUID, formattedString: String, emittedAt: Long, - formattedAirbyteMetaString: String + formattedAirbyteMetaString: String, + generationId: Long, ): List object Factory { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt index 35e1dd91d3f3..d990bbc14a49 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt @@ -29,7 +29,7 @@ class StagingDatabaseCsvSheetGenerator @JvmOverloads constructor( private val destinationColumns: JavaBaseConstants.DestinationColumns = - JavaBaseConstants.DestinationColumns.LEGACY + JavaBaseConstants.DestinationColumns.LEGACY, ) : CsvSheetGenerator { override fun getHeaderRow(): List { return destinationColumns.rawColumns @@ -40,7 +40,9 @@ constructor( id, Jsons.serialize(recordMessage.data), recordMessage.emittedAt, - Jsons.serialize(recordMessage.meta) + Jsons.serialize(recordMessage.meta), + // Legacy code. Default to generation 0. + 0, ) } @@ -52,7 +54,8 @@ constructor( id: UUID, formattedString: String, emittedAt: Long, - formattedAirbyteMetaString: String + formattedAirbyteMetaString: String, + generationId: Long, ): List { return when (destinationColumns) { JavaBaseConstants.DestinationColumns.LEGACY -> @@ -67,6 +70,15 @@ constructor( ) JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString) + JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> + listOf( + id, + Instant.ofEpochMilli(emittedAt), + "", + formattedString, + formattedAirbyteMetaString, + generationId + ) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt index 08fd34b90912..23e578f73104 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt @@ -60,7 +60,12 @@ class JsonLSerializedBuffer( } @Suppress("DEPRECATION") - override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) { + override fun writeRecord( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ) { // TODO Remove this double deserialization when S3 Destinations moves to Async. writeRecord( Jsons.deserialize( diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt index 91e9adedf6bc..37ac3a6d2eb0 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt @@ -122,7 +122,12 @@ class ParquetSerializedBuffer( } @Throws(Exception::class) - override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long { + override fun accept( + recordString: String, + airbyteMetaString: String, + generationId: Long, + emittedAt: Long + ): Long { throw UnsupportedOperationException( "This method is not supported for ParquetSerializedBuffer" ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt index 84af38672a07..eaa610a7beab 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/AsyncFlush.kt @@ -61,6 +61,9 @@ internal class AsyncFlush( writer.accept( record!!.serialized!!, Jsons.serialize(record.record!!.meta), + // Destinations that want to use generations should switch to the new + // structure (e.g. StagingStreamOperations) + 0, record.record!!.emittedAt ) } catch (e: Exception) { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt index 5721f1e1deb2..773c11d8bbd2 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt @@ -277,7 +277,8 @@ private constructor( val tableName: String when (destinationColumns) { JavaBaseConstants.DestinationColumns.V2_WITH_META, - JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> { + JavaBaseConstants.DestinationColumns.V2_WITHOUT_META, + JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> { val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id outputSchema = streamId.rawNamespace tableName = streamId.rawName diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt index eb7c193757c6..80f4e6c26003 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt @@ -42,6 +42,7 @@ class StagingStreamOperations( it.accept( record.serialized!!, Jsons.serialize(record.record!!.meta), + streamConfig.generationId, record.record!!.emittedAt ) } @@ -49,7 +50,7 @@ class StagingStreamOperations( log.info { "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging" } - storageOperation.writeToStage(streamConfig.id, writeBuffer) + storageOperation.writeToStage(streamConfig, writeBuffer) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt index c09d71de040a..04634f40acdd 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultSyncOperation.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.base.destination.operation +import io.airbyte.cdk.integrations.base.JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.cdk.integrations.destination.operation.SyncOperation @@ -15,6 +16,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.StreamDescriptor import io.github.oshai.kotlinlogging.KotlinLogging import java.util.concurrent.CompletableFuture @@ -102,7 +104,22 @@ class DefaultSyncOperation( override fun flushStream(descriptor: StreamDescriptor, stream: Stream) { val streamConfig = parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name) - streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream) + streamOpsMap[streamConfig.id]?.writeRecords( + streamConfig, + stream.map { record -> + if (record.record!!.meta == null) { + record.record!!.meta = AirbyteRecordMessageMeta() + } + record.also { + it.record!! + .meta!! + .setAdditionalProperty( + AIRBYTE_META_SYNC_ID_KEY, + streamConfig.syncId, + ) + } + }, + ) } override fun finalizeStreams(streamSyncSummaries: Map) { diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt index e437f1a63f52..06c5b8dabab5 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt @@ -25,6 +25,6 @@ class StandardStreamOperation( disableTypeDedupe ) { override fun writeRecords(streamConfig: StreamConfig, stream: Stream) { - storageOperation.writeToStage(streamConfig.id, stream) + storageOperation.writeToStage(streamConfig, stream) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt index 29d22d27c10f..bae34e2b803b 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt @@ -24,7 +24,7 @@ interface StorageOperation { fun cleanupStage(streamId: StreamId) /** Write data to stage. */ - fun writeToStage(streamId: StreamId, data: Data) + fun writeToStage(streamConfig: StreamConfig, data: Data) /* * ==================== Final Table Operations ================================ diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt index 52da4e7b1bf4..70b607e9e1c9 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.kt @@ -118,15 +118,19 @@ abstract class BaseDestinationV1V2Migrator : Destination if ( !(schemaMatchesExpectation( existingV2AirbyteRawTable, - JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META + JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META, ) || schemaMatchesExpectation( existingV2AirbyteRawTable, - JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES + JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES, + ) || + schemaMatchesExpectation( + existingV2AirbyteRawTable, + JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION, )) ) { throw UnexpectedSchemaException( - "Destination V2 Raw Table does not match expected Schema" + "Destination V2 Raw Table does not match expected Schema", ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 08c2ec8ef980..9535f91ca3db 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -122,7 +122,8 @@ abstract class BaseSqlGeneratorIntegrationTest + records: List, + generationId: Long, ) /** @@ -1084,7 +1085,7 @@ abstract class BaseSqlGeneratorIntegrationTest this.copyWithLiftedData(record) } + .map { record: JsonNode -> this.deserializeMetaAndLiftData(record) } .collect(Collectors.toList()), actualRecords .stream() - .map { record: JsonNode -> this.copyWithLiftedData(record) } + .map { record: JsonNode -> this.deserializeMetaAndLiftData(record) } .collect(Collectors.toList()), rawRecordIdentityComparator, rawRecordSortComparator, @@ -126,12 +126,12 @@ constructor( } /** - * Lift _airbyte_data fields to the root level. If _airbyte_data is a string, deserialize it - * first. + * If airbyte_data/airbyte_meta are strings, deserialize them. Then lift _airbyte_data fields to + * the root level. * * @return A copy of the record, but with all fields in _airbyte_data lifted to the top level. */ - private fun copyWithLiftedData(record: JsonNode): JsonNode { + private fun deserializeMetaAndLiftData(record: JsonNode): JsonNode { val copy = record.deepCopy() copy.remove(getMetadataColumnName(rawRecordColumnNames, "_airbyte_data")) var airbyteData = record[getMetadataColumnName(rawRecordColumnNames, "_airbyte_data")] @@ -150,6 +150,13 @@ constructor( ) } } + + val metadataColumnName = getMetadataColumnName(rawRecordColumnNames, "_airbyte_meta") + val airbyteMeta = record[metadataColumnName] + if (airbyteMeta != null && airbyteMeta.isTextual) { + copy.set(metadataColumnName, Jsons.deserializeExact(airbyteMeta.asText())) + } + return copy } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl index b965f22e8bad..7eca1811235c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/alltypes_inputrecords.jsonl @@ -1,4 +1,4 @@ -{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}} -{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}} -{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}} -{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}} +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}, "_airbyte_generation_id": 42} +{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}, "_airbyte_generation_id": 42} +{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}, "_airbyte_meta": {"sync_id": 42, "changes": [{"field": "string", "change": "NULLED", "reason": "SOURCE_SERIALIZATION_ERROR"}]}, "_airbyte_generation_id": 42} +{"_airbyte_raw_id": "a4a783b5-7729-4d0b-b659-48ceb08713f1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118}, "_airbyte_generation_id": 42} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl index 2eb8fae2374d..17b317704c9e 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/sqlgenerator/safe_cast/alltypes_inputrecords.jsonl @@ -1,2 +1,2 @@ // Note that array and struct have invalid values ({} and [] respectively). -{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}} +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": null, "number": "foo", "integer": "bar", "boolean": "fizz", "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": "airbyte", "unknown": null}, "_airbyte_generation_id": 42}