From 33e3e0d14ca5dc595c9ac569ad7dbcbd769cc6ca Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Wed, 31 Jul 2024 09:43:30 -0700 Subject: [PATCH] CDK test failures --- .../record_buffer/BaseSerializedBuffer.kt | 10 ++-- .../record_buffer/BufferingStrategy.kt | 3 +- .../InMemoryRecordBufferingStrategy.kt | 3 +- .../record_buffer/SerializableBuffer.kt | 2 +- .../SerializedBufferingStrategy.kt | 5 +- .../BufferedStreamConsumerTest.kt | 5 +- .../SerializedBufferingStrategyTest.kt | 3 +- .../destination/DestinationAcceptanceTest.kt | 46 ++++++++++--------- ...with_generation_id_configured_catalog.json | 4 +- .../destination/s3/S3ConsumerFactory.kt | 6 +-- .../s3/S3DestinationFlushFunction.kt | 6 +-- .../AvroFieldConversionFailureListener.kt | 21 ++------- .../destination/s3/avro/AvroRecordFactory.kt | 10 ++-- .../s3/avro/AvroSerializedBuffer.kt | 4 +- .../s3/avro/JsonToAvroSchemaConverter.kt | 26 ++--------- .../destination/s3/csv/BaseSheetGenerator.kt | 11 +++-- .../destination/s3/csv/CsvSerializedBuffer.kt | 4 +- .../destination/s3/csv/CsvSheetGenerator.kt | 8 +++- .../s3/csv/NoFlatteningSheetGenerator.kt | 4 +- .../csv/RootLevelFlatteningSheetGenerator.kt | 1 - .../csv/StagingDatabaseCsvSheetGenerator.kt | 3 +- .../s3/jsonl/JsonLSerializedBuffer.kt | 16 +++++-- .../s3/parquet/ParquetSerializedBuffer.kt | 9 +++- .../s3/S3BaseCsvDestinationAcceptanceTest.kt | 1 - 24 files changed, 102 insertions(+), 109 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt index 0093fd6aeab9..d973b469f330 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BaseSerializedBuffer.kt @@ -42,7 +42,11 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu */ @Deprecated("") @Throws(IOException::class) - protected abstract fun writeRecord(record: AirbyteRecordMessage, generationId: Long = 0) + protected abstract fun writeRecord( + record: AirbyteRecordMessage, + generationId: Long = 0, + syncId: Long = 0 + ) /** * TODO: (ryankfu) move destination to use serialized record string instead of passing entire @@ -80,7 +84,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu @Deprecated("") @Throws(Exception::class) - override fun accept(record: AirbyteRecordMessage, generationId: Long): Long { + override fun accept(record: AirbyteRecordMessage, generationId: Long, syncId: Long): Long { if (!isStarted) { if (useCompression) { compressedBuffer = GzipCompressorOutputStream(byteCounter) @@ -92,7 +96,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu } if (inputStream == null && !isClosed) { val startCount = byteCounter.count - @Suppress("deprecation") writeRecord(record, generationId) + @Suppress("deprecation") writeRecord(record, generationId, syncId) return byteCounter.count - startCount } else { throw IllegalCallerException("Buffer is already closed, it cannot accept more messages") diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BufferingStrategy.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BufferingStrategy.kt index 3f754b3c75dc..5201b12dad1c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BufferingStrategy.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/BufferingStrategy.kt @@ -30,7 +30,8 @@ interface BufferingStrategy : AutoCloseable { fun addRecord( stream: AirbyteStreamNameNamespacePair, message: AirbyteMessage, - generationId: Long = 0 + generationId: Long = 0, + syncId: Long = 0 ): Optional /** Flush the buffered messages from a single stream */ diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt index e34e7875f49d..e55b91aa28c5 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt @@ -42,7 +42,8 @@ class InMemoryRecordBufferingStrategy( override fun addRecord( stream: AirbyteStreamNameNamespacePair, message: AirbyteMessage, - generationId: Long + generationId: Long, + syncId: Long ): Optional { var flushed: Optional = Optional.empty() diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt index af5c74f66485..f03902ec020e 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializableBuffer.kt @@ -26,7 +26,7 @@ interface SerializableBuffer : AutoCloseable { */ @Deprecated("") @Throws(Exception::class) - fun accept(record: AirbyteRecordMessage, generationId: Long = 0): Long + fun accept(record: AirbyteRecordMessage, generationId: Long = 0, syncId: Long = 0): Long /** * TODO: (ryankfu) Move all destination connectors to pass the serialized record string instead diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategy.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategy.kt index c0b1b7589969..c059980d5a64 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategy.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategy.kt @@ -47,14 +47,15 @@ class SerializedBufferingStrategy override fun addRecord( stream: AirbyteStreamNameNamespacePair, message: AirbyteMessage, - generationId: Long + generationId: Long, + syncId: Long ): Optional { var flushed: Optional = Optional.empty() val buffer = getOrCreateBuffer(stream) @Suppress("DEPRECATION") - val actualMessageSizeInBytes = buffer.accept(message.record, generationId) + val actualMessageSizeInBytes = buffer.accept(message.record, generationId, syncId) totalBufferSizeInBytes += actualMessageSizeInBytes // Flushes buffer when either the buffer was completely filled or only a single stream was // filled diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt index d8b5be49db9f..d343a4d1c69b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers import org.mockito.Mockito import org.mockito.kotlin.any +import org.mockito.kotlin.eq import org.mockito.kotlin.mock class BufferedStreamConsumerTest { @@ -361,7 +362,7 @@ class BufferedStreamConsumerTest { // The first two records that we push will not trigger any flushes, but the third record // _will_ // trigger a flush - Mockito.`when`(strategy.addRecord(any(), any())) + Mockito.`when`(strategy.addRecord(any(), any(), eq(0))) .thenReturn( Optional.empty(), Optional.empty(), @@ -463,7 +464,7 @@ class BufferedStreamConsumerTest { // The first two records that we push will not trigger any flushes, but the third record // _will_ // trigger a flush - Mockito.`when`(strategy.addRecord(any(), any())) + Mockito.`when`(strategy.addRecord(any(), any(), eq(0))) .thenReturn( Optional.empty(), Optional.empty(), diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategyTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategyTest.kt index a17973cf043b..3c43353e3d78 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategyTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/SerializedBufferingStrategyTest.kt @@ -14,6 +14,7 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.Mockito import org.mockito.kotlin.any +import org.mockito.kotlin.eq class SerializedBufferingStrategyTest { private val catalog: ConfiguredAirbyteCatalog = @@ -37,7 +38,7 @@ class SerializedBufferingStrategyTest { @Throws(Exception::class) private fun setupMock(mockObject: SerializableBuffer) { - Mockito.`when`(mockObject.accept(any())).thenReturn(10L) + Mockito.`when`(mockObject.accept(any(), eq(0L))).thenReturn(10L) Mockito.`when`(mockObject.byteCount).thenReturn(10L) Mockito.`when`(mockObject.maxTotalBufferSizeInBytes).thenReturn(MAX_TOTAL_BUFFER_SIZE_BYTES) Mockito.`when`(mockObject.maxPerStreamBufferSizeInBytes) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index c0fa1d666d39..d1f586c8f434 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -33,7 +33,6 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteMessage.Type import io.airbyte.protocol.models.v0.AirbyteRecordMessage -import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import io.airbyte.protocol.models.v0.AirbyteStateMessage import io.airbyte.protocol.models.v0.AirbyteStateStats @@ -1581,13 +1580,13 @@ abstract class DestinationAcceptanceTest( return record } - private fun getChanges(record: JsonNode): MutableList { + private fun getMeta(record: JsonNode): ObjectNode { val meta = record.get(JavaBaseConstants.COLUMN_NAME_AB_META) val asString = if (meta.isTextual) meta.asText() else Jsons.serialize(meta) - val asMeta = Jsons.deserialize(asString, AirbyteRecordMessageMeta::class.java) + val asMeta = Jsons.deserialize(asString) - return asMeta.changes + return asMeta as ObjectNode } @Test @@ -1628,11 +1627,6 @@ abstract class DestinationAcceptanceTest( Assertions.assertEquals(message.record.emittedAt, record.get(abTsKey).asLong()) if (useV2Fields) { - // Validate that the loadTs at at least >= the time the test started - Assertions.assertTrue( - record.get(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT).asLong() >= - preRunTime.toEpochMilli() - ) // Generation id should match the one from the catalog Assertions.assertEquals( generationId, @@ -1642,18 +1636,22 @@ abstract class DestinationAcceptanceTest( } // Regardless of whether change failures are capatured, all v2 - // destinations should pass upstream changes through. + // destinations should pass upstream changes through and set sync id. if (useV2Fields) { - val changes = getChanges(destinationOutput[2]) + val metas = destinationOutput.map { getMeta(it) } + val syncIdsAllValid = metas.map { it["sync_id"].asLong() }.all { it == 100L } + Assertions.assertTrue(syncIdsAllValid) + + val changes = metas[2]["changes"].elements().asSequence().toList() Assertions.assertEquals(changes.size, 1) - Assertions.assertEquals(changes[0].field, "name") + Assertions.assertEquals(changes[0]["field"].asText(), "name") Assertions.assertEquals( - changes[0].change, - AirbyteRecordMessageMetaChange.Change.TRUNCATED + changes[0]["change"].asText(), + AirbyteRecordMessageMetaChange.Change.TRUNCATED.value() ) Assertions.assertEquals( - changes[0].reason, - AirbyteRecordMessageMetaChange.Reason.SOURCE_FIELD_SIZE_LIMITATION + changes[0]["reason"].asText(), + AirbyteRecordMessageMetaChange.Reason.SOURCE_FIELD_SIZE_LIMITATION.value() ) } @@ -1665,14 +1663,17 @@ abstract class DestinationAcceptanceTest( val data = getData(badRow) Assertions.assertTrue(data["id"] == null || data["id"].isNull) - val changes = getChanges(badRow) + val changes = getMeta(badRow)["changes"].elements().asSequence().toList() Assertions.assertEquals(1, changes.size) - Assertions.assertEquals("id", changes[0].field) - Assertions.assertEquals(AirbyteRecordMessageMetaChange.Change.NULLED, changes[0].change) + Assertions.assertEquals("id", changes[0]["field"].asText()) + Assertions.assertEquals( + AirbyteRecordMessageMetaChange.Change.NULLED.value(), + changes[0]["change"].asText() + ) Assertions.assertEquals( - AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR, - changes[0].reason + AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR.value(), + changes[0]["reason"].asText() ) // Expect the third message to have added a new change to an old one @@ -1681,7 +1682,8 @@ abstract class DestinationAcceptanceTest( Assertions.assertTrue( dataWithPreviousChange["id"] == null || dataWithPreviousChange["id"].isNull ) - val twoChanges = getChanges(badRowWithPreviousChange) + val twoChanges = + getMeta(badRowWithPreviousChange)["changes"].elements().asSequence().toList() Assertions.assertEquals(2, twoChanges.size) } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/users_with_generation_id_configured_catalog.json b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/users_with_generation_id_configured_catalog.json index 3d74442d56ae..aea0ae5be928 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/users_with_generation_id_configured_catalog.json +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v0/users_with_generation_id_configured_catalog.json @@ -18,7 +18,9 @@ "cursor_field": [], "destination_sync_mode": "overwrite", "primary_key": [], - "generation_id": 10 + "generation_id": 10, + "minimum_generation_id": 10, + "sync_id": 100 } ] } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt index b131579f9cec..435efd8d0eec 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3ConsumerFactory.kt @@ -167,13 +167,13 @@ class S3ConsumerFactory { // Buffer creation function: yields a file buffer that converts // incoming data to the correct format for the destination. - val generationIds = + val generationAndSyncIds = catalog.streams.associate { stream -> val descriptor = StreamDescriptor() .withNamespace(stream.stream.namespace) .withName(stream.stream.name) - descriptor to stream.generationId + descriptor to Pair(stream.generationId, stream.syncId) } val createFunction = @@ -201,7 +201,7 @@ class S3ConsumerFactory { flushBufferFunction(storageOps, writeConfigs, catalog) ) }, - generationIds + generationAndSyncIds ), catalog, // S3 has no concept of default namespace diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt index 3f4d79c87bc2..cb89081ddd0b 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationFlushFunction.kt @@ -18,7 +18,7 @@ import java.util.stream.Stream class S3DestinationFlushFunction( override val optimalBatchSizeBytes: Long, private val strategyProvider: () -> BufferingStrategy, - private val generationIds: Map = emptyMap() + private val generationAndSyncIds: Map> = emptyMap() ) : DestinationFlushFunction { override fun flush(streamDescriptor: StreamDescriptor, stream: Stream) { @@ -47,8 +47,8 @@ class S3DestinationFlushFunction( .withData(data) val completeMessage = AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(completeRecord) - val generationId = generationIds[streamDescriptor] ?: 0 - strategy.addRecord(nameAndNamespace, completeMessage, generationId) + val (generationId, syncId) = generationAndSyncIds[streamDescriptor] ?: Pair(0L, 0L) + strategy.addRecord(nameAndNamespace, completeMessage, generationId, syncId) } strategy.flushSingleStream(nameAndNamespace) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroFieldConversionFailureListener.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroFieldConversionFailureListener.kt index 9a70166cb9bf..7762e7124c27 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroFieldConversionFailureListener.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroFieldConversionFailureListener.kt @@ -7,29 +7,14 @@ package io.airbyte.cdk.integrations.destination.s3.avro import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import java.lang.Exception import org.apache.avro.Schema -import org.apache.avro.SchemaBuilder import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecordBuilder import tech.allegro.schema.json2avro.converter.FieldConversionFailureListener class AvroFieldConversionFailureListener : FieldConversionFailureListener() { - val CHANGE_SCHEMA: Schema = - SchemaBuilder.builder() - .record("change") - .fields() - .name("field") - .type() - .stringType() - .noDefault() - .name("change") - .type() - .stringType() - .noDefault() - .name("reason") - .type() - .stringType() - .noDefault() - .endRecord() + companion object { + val CHANGE_SCHEMA: Schema = AvroConstants.AVRO_CHANGES_SCHEMA + } override fun onFieldConversionFailure( avroName: String, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroRecordFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroRecordFactory.kt index 385b6c4a095e..c6672076c33b 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroRecordFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroRecordFactory.kt @@ -63,18 +63,16 @@ class AvroRecordFactory(private val schema: Schema?, private val converter: Json fun getAvroRecordV2( id: UUID, generationId: Long, + syncId: Long, recordMessage: AirbyteRecordMessage ): GenericData.Record { val jsonRecord = MAPPER.createObjectNode() jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, id.toString()) jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, recordMessage.emittedAt) - jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, System.currentTimeMillis()) jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId) - jsonRecord.replace( - JavaBaseConstants.COLUMN_NAME_AB_META, - MAPPER.valueToTree(recordMessage.meta) as ObjectNode - ) - + val meta = MAPPER.valueToTree(recordMessage.meta) as ObjectNode + meta.put("sync_id", syncId) + jsonRecord.replace(JavaBaseConstants.COLUMN_NAME_AB_META, meta) jsonRecord.setAll(recordMessage.data as ObjectNode) return converter!!.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt index 3839f8a700f8..1d36f954aa53 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroSerializedBuffer.kt @@ -56,10 +56,10 @@ class AvroSerializedBuffer( @Deprecated("Deprecated in Java") @Throws(IOException::class) - override fun writeRecord(record: AirbyteRecordMessage, generationId: Long) { + override fun writeRecord(record: AirbyteRecordMessage, generationId: Long, syncId: Long) { if (this.useV2FieldNames) { dataFileWriter!!.append( - avroRecordFactory.getAvroRecordV2(UUID.randomUUID(), generationId, record) + avroRecordFactory.getAvroRecordV2(UUID.randomUUID(), generationId, syncId, record) ) } else { dataFileWriter!!.append(avroRecordFactory.getAvroRecord(UUID.randomUUID(), record)) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt index 36245e51e744..70a68a313f83 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt @@ -121,31 +121,10 @@ class JsonToAvroSchemaConverter { .name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT) .type(TIMESTAMP_MILLIS_SCHEMA) .noDefault() - assembler - .name(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT) - .type(TIMESTAMP_MILLIS_SCHEMA) - .noDefault() assembler .name(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID) .type(Schema.create(Schema.Type.LONG)) .noDefault() - val changeSchema: Schema = - SchemaBuilder.builder() - .record("change") - .fields() - .name("field") - .type() - .stringType() - .noDefault() - .name("change") - .type() - .stringType() - .noDefault() - .name("reason") - .type() - .stringType() - .noDefault() - .endRecord() assembler .name(JavaBaseConstants.COLUMN_NAME_AB_META) .type( @@ -153,7 +132,10 @@ class JsonToAvroSchemaConverter { .record(JavaBaseConstants.COLUMN_NAME_AB_META) .fields() .name("changes") - .type(Schema.createArray(changeSchema)) + .type(Schema.createArray(AvroConstants.AVRO_CHANGES_SCHEMA)) + .noDefault() + .name("sync_id") + .type(Schema.create(Schema.Type.LONG)) .noDefault() .endRecord() ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt index 1446c9e60819..04443ec6b287 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/BaseSheetGenerator.kt @@ -4,9 +4,10 @@ package io.airbyte.cdk.integrations.destination.s3.csv import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.commons.jackson.MoreMappers import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteRecordMessage -import java.time.Instant import java.util.* /** @@ -17,15 +18,17 @@ abstract class BaseSheetGenerator(private val useV2Fields: Boolean = false) : Cs override fun getDataRow( id: UUID, recordMessage: AirbyteRecordMessage, - generationId: Long + generationId: Long, + syncId: Long, ): List { val data: MutableList = LinkedList() if (useV2Fields) { data.add(id) data.add(recordMessage.emittedAt) - data.add(Instant.now().toEpochMilli()) - data.add(Jsons.serialize(recordMessage.meta)) + val meta = MoreMappers.initMapper().valueToTree(recordMessage.meta) as ObjectNode + meta.put("sync_id", syncId) + data.add(Jsons.serialize(meta)) data.add(generationId) } else { data.add(id) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt index 93f3a61bc90b..b565cb0eb795 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSerializedBuffer.kt @@ -63,9 +63,9 @@ class CsvSerializedBuffer( */ @Deprecated("Deprecated in Java") @Throws(IOException::class) - override fun writeRecord(record: AirbyteRecordMessage, generationId: Long) { + override fun writeRecord(record: AirbyteRecordMessage, generationId: Long, syncId: Long) { csvPrinter!!.printRecord( - csvSheetGenerator.getDataRow(UUID.randomUUID(), record, generationId) + csvSheetGenerator.getDataRow(UUID.randomUUID(), record, generationId, syncId) ) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt index 21c5a2a360d3..5242532db315 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/CsvSheetGenerator.kt @@ -18,7 +18,12 @@ interface CsvSheetGenerator { // TODO: (ryankfu) remove this and switch over all destinations to pass in serialized // recordStrings, // both for performance and lowers memory footprint - fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage, generationId: Long = 0): List + fun getDataRow( + id: UUID, + recordMessage: AirbyteRecordMessage, + generationId: Long = 0, + syncId: Long = 0 + ): List fun getDataRow(formattedData: JsonNode): List @@ -38,7 +43,6 @@ interface CsvSheetGenerator { useV2FieldNames: Boolean = false ): CsvSheetGenerator { return if (formatConfig.flattening == Flattening.NO) { - println("NO FLAT") NoFlatteningSheetGenerator(useV2FieldNames) } else if (formatConfig.flattening == Flattening.ROOT_LEVEL) { RootLevelFlatteningSheetGenerator(jsonSchema!!, useV2FieldNames) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/NoFlatteningSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/NoFlatteningSheetGenerator.kt index fb57f905e599..d29f964b4164 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/NoFlatteningSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/NoFlatteningSheetGenerator.kt @@ -16,7 +16,6 @@ class NoFlatteningSheetGenerator( return listOf( JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, - JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, JavaBaseConstants.COLUMN_NAME_AB_META, JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, JavaBaseConstants.COLUMN_NAME_DATA, @@ -32,7 +31,6 @@ class NoFlatteningSheetGenerator( /** When no flattening is needed, the record column is just one json blob. */ override fun getRecordColumns(json: JsonNode): List { - val tmp = Jsons.serialize(json) - return listOf(tmp) + return listOf(Jsons.serialize(json)) } } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt index 7cbc5acf157d..1d2527b3657b 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt @@ -27,7 +27,6 @@ class RootLevelFlatteningSheetGenerator( mutableListOf( JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, - JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, JavaBaseConstants.COLUMN_NAME_AB_META, JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt index 093cf95fd2b5..1e93f2e571f5 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/StagingDatabaseCsvSheetGenerator.kt @@ -38,7 +38,8 @@ constructor( override fun getDataRow( id: UUID, recordMessage: AirbyteRecordMessage, - generationId: Long + generationId: Long, + syncId: Long ): List { return getDataRow( id, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt index 451739d78179..ca464e14711e 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/jsonl/JsonLSerializedBuffer.kt @@ -43,12 +43,18 @@ class JsonLSerializedBuffer( printWriter = PrintWriter(outputStream, true, StandardCharsets.UTF_8) } - private fun addV2Fields(json: ObjectNode, record: AirbyteRecordMessage, generationId: Long) { + private fun addV2Fields( + json: ObjectNode, + record: AirbyteRecordMessage, + generationId: Long, + syncId: Long + ) { json.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, UUID.randomUUID().toString()) json.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, record.emittedAt) - json.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, System.currentTimeMillis()) json.put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId) - json.replace(JavaBaseConstants.COLUMN_NAME_AB_META, MAPPER.valueToTree(record.meta)) + val meta = MAPPER.valueToTree(record.meta) + meta.put("sync_id", syncId) + json.replace(JavaBaseConstants.COLUMN_NAME_AB_META, meta) } private fun addV1Fields(json: ObjectNode, record: AirbyteRecordMessage) { @@ -57,10 +63,10 @@ class JsonLSerializedBuffer( } @Deprecated("Deprecated in Java") - override fun writeRecord(record: AirbyteRecordMessage, generationId: Long) { + override fun writeRecord(record: AirbyteRecordMessage, generationId: Long, syncId: Long) { val json = MAPPER.createObjectNode() if (useV2FieldNames) { - addV2Fields(json, record, generationId) + addV2Fields(json, record, generationId, syncId) } else { addV1Fields(json, record) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt index 786a55faaa93..6d0e46c44b01 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBuffer.kt @@ -111,12 +111,17 @@ class ParquetSerializedBuffer( @Deprecated("Deprecated in Java") @Throws(Exception::class) - override fun accept(record: AirbyteRecordMessage, generationId: Long): Long { + override fun accept(record: AirbyteRecordMessage, generationId: Long, syncId: Long): Long { if (inputStream == null && !isClosed) { val startCount: Long = byteCount if (useV2FieldNames) { parquetWriter.write( - avroRecordFactory.getAvroRecordV2(UUID.randomUUID(), generationId, record) + avroRecordFactory.getAvroRecordV2( + UUID.randomUUID(), + generationId, + syncId, + record + ) ) } else { parquetWriter.write(avroRecordFactory.getAvroRecord(UUID.randomUUID(), record)) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt index 415fb4eee704..324f9015ba4a 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseCsvDestinationAcceptanceTest.kt @@ -98,7 +98,6 @@ abstract class S3BaseCsvDestinationAcceptanceTest : key == JavaBaseConstants.COLUMN_NAME_AB_ID || (key == JavaBaseConstants.COLUMN_NAME_EMITTED_AT) || (key == JavaBaseConstants.COLUMN_NAME_AB_RAW_ID) || - (key == JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT) || (key == JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT) || (key == JavaBaseConstants.COLUMN_NAME_AB_META) || (key == JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID) ||