From 1bd65afe8c069262ba68fc1e66292f335b8ef886 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 20 Dec 2024 14:00:20 -0800 Subject: [PATCH] Bulk Load CDK: Remove interfaces from InputFlow, migrate tests to mockk (#49974) --- airbyte-cdk/bulk/core/load/build.gradle | 9 + .../MockDestinationWriter.kt | 6 +- ...DestinationRecordToAirbyteValueWithMeta.kt | 8 +- .../cdk/load/message/DestinationMessage.kt | 13 +- .../message/DestinationMessageDeserializer.kt | 26 +- .../load/message/DestinationMessageQueues.kt | 2 +- .../cdk/load/task/DestinationTaskLauncher.kt | 5 +- .../task/implementor/ProcessRecordsTask.kt | 21 +- .../load/task/internal/InputConsumerTask.kt | 13 +- ....kt => ReservingDeserializingInputFlow.kt} | 29 +- .../cdk/load/task/internal/SpillToDiskTask.kt | 2 +- .../io/airbyte/cdk/load/write/StreamLoader.kt | 4 +- ...AirbyteValueToAirbyteValueWithMetaTest.kt} | 10 +- .../load/message/DestinationMessageTest.kt | 2 +- .../load/task/DestinationTaskLauncherTest.kt | 20 +- .../load/task/DestinationTaskLauncherUTest.kt | 5 +- .../implementor/ProcessRecordsTaskTest.kt | 13 +- .../task/internal/InputConsumerTaskTest.kt | 491 ++++++++---------- .../ReservingDeserializingInputFlowTest.kt | 142 ++--- .../load/task/internal/SpillToDiskTaskTest.kt | 10 +- .../util/StubDestinationMessageFactory.kt | 8 +- .../ObjectStorageFormattingWriter.kt | 14 +- .../object_storage/RecordToPartAccumulator.kt | 4 +- .../RecordToPartAccumulatorTest.kt | 8 +- .../destination-dev-null/metadata.yaml | 2 +- .../destination/dev_null/DevNullWriter.kt | 10 +- .../destination-iceberg-v2/metadata.yaml | 2 +- .../iceberg/v2/IcebergStreamLoader.kt | 4 +- .../destination/iceberg/v2/io/IcebergUtil.kt | 17 +- .../iceberg/v2/io/IcebergUtilTest.kt | 8 +- docs/integrations/destinations/dev-null.md | 5 +- 31 files changed, 410 insertions(+), 503 deletions(-) rename airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/{SizedInputFlow.kt => ReservingDeserializingInputFlow.kt} (64%) rename airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/{DestinationRecordToAirbyteValueWithMetaTest.kt => DestinationRecordAirbyteValueToAirbyteValueWithMetaTest.kt} (86%) diff --git a/airbyte-cdk/bulk/core/load/build.gradle b/airbyte-cdk/bulk/core/load/build.gradle index 40e10a40eb96..8898379c9a2e 100644 --- a/airbyte-cdk/bulk/core/load/build.gradle +++ b/airbyte-cdk/bulk/core/load/build.gradle @@ -52,6 +52,15 @@ tasks.named('check').configure { dependsOn integrationTest } +project.tasks.matching { + it.name == 'spotbugsIntegrationTestLegacy' || + it.name == 'spotbugsIntegrationTest' || + it.name == 'spotbugsTest' || + it.name == 'spotbugsMain' +}.configureEach { + enabled = false +} + test { systemProperties(["mockk.junit.extension.requireParallelTesting":"true"]) } diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt index 68d71570bb38..f6f99528a8a7 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationWriter.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.test.util.OutputRecord @@ -38,7 +38,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { override val groupId: String? = null } - data class LocalBatch(val records: List) : MockBatch() { + data class LocalBatch(val records: List) : MockBatch() { override val state = Batch.State.STAGED } data class LocalFileBatch(val file: DestinationFile) : MockBatch() { @@ -72,7 +72,7 @@ class MockStreamLoader(override val stream: DestinationStream) : StreamLoader { } override suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean ): Batch { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt index 0bd98b3e1635..10cd2ec6e9e8 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMeta.kt @@ -5,7 +5,7 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.Meta import java.util.* @@ -59,5 +59,7 @@ fun Pair>.withAirbyteMeta( DestinationRecordToAirbyteValueWithMeta(stream, flatten) .convert(first, emittedAtMs, Meta(second)) -fun DestinationRecord.dataWithAirbyteMeta(stream: DestinationStream, flatten: Boolean = false) = - DestinationRecordToAirbyteValueWithMeta(stream, flatten).convert(data, emittedAtMs, meta) +fun DestinationRecordAirbyteValue.dataWithAirbyteMeta( + stream: DestinationStream, + flatten: Boolean = false +) = DestinationRecordToAirbyteValueWithMeta(stream, flatten).convert(data, emittedAtMs, meta) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt index 44f417520785..08bf0b4d002c 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt @@ -79,13 +79,15 @@ data class Meta( } } -data class DestinationRecord( +sealed interface DestinationRecord : DestinationRecordDomainMessage + +data class DestinationRecordAirbyteValue( override val stream: DestinationStream.Descriptor, val data: AirbyteValue, val emittedAtMs: Long, val meta: Meta?, val serialized: String, -) : DestinationStreamAffinedMessage { +) : DestinationRecord { override fun asProtocolMessage(): AirbyteMessage = AirbyteMessage() .withType(AirbyteMessage.Type.RECORD) @@ -348,7 +350,10 @@ class DestinationMessageFactory( private val catalog: DestinationCatalog, @Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean, ) { - fun fromAirbyteMessage(message: AirbyteMessage, serialized: String): DestinationMessage { + fun fromAirbyteMessage( + message: AirbyteMessage, + serialized: String, + ): DestinationMessage { fun toLong(value: Any?, name: String): Long? { return value?.let { when (it) { @@ -391,7 +396,7 @@ class DestinationMessageFactory( ) ) } else { - DestinationRecord( + DestinationRecordAirbyteValue( stream = stream.descriptor, data = message.record.data?.let { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt index c66c7bbecef1..b9b2ff39fdd0 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt @@ -8,19 +8,17 @@ import io.airbyte.cdk.load.util.deserializeToClass import io.airbyte.protocol.models.v0.AirbyteMessage import jakarta.inject.Singleton -interface Deserializer { - fun deserialize(serialized: String): T -} - /** * Converts the internal @[AirbyteMessage] to the internal @[DestinationMessage] Ideally, this would * not use protocol messages at all, but rather a specialized deserializer for routing. */ @Singleton -class DefaultDestinationMessageDeserializer(private val messageFactory: DestinationMessageFactory) : - Deserializer { - - override fun deserialize(serialized: String): DestinationMessage { +class ProtocolMessageDeserializer( + private val destinationMessageFactory: DestinationMessageFactory +) { + fun deserialize( + serialized: String, + ): DestinationMessage { val airbyteMessage = try { serialized.deserializeToClass(AirbyteMessage::class.java) @@ -49,13 +47,9 @@ class DefaultDestinationMessageDeserializer(private val messageFactory: Destinat ) } - val internalDestinationMessage = - try { - messageFactory.fromAirbyteMessage(airbyteMessage, serialized) - } catch (t: Throwable) { - throw RuntimeException("Failed to convert AirbyteMessage to DestinationMessage", t) - } - - return internalDestinationMessage + return destinationMessageFactory.fromAirbyteMessage( + airbyteMessage, + serialized, + ) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt index 7b68e69de3b9..0ab41148ce46 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageQueues.kt @@ -30,7 +30,7 @@ sealed class DestinationStreamEvent : Sized data class StreamRecordEvent( val index: Long, override val sizeBytes: Long, - val record: DestinationRecord + val payload: DestinationRecordAirbyteValue ) : DestinationStreamEvent() /** diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 8ed5d2a8a70b..f022a6804a79 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -11,7 +11,6 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.CheckpointMessageWrapped import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter @@ -29,7 +28,7 @@ import io.airbyte.cdk.load.task.implementor.TeardownTaskFactory import io.airbyte.cdk.load.task.internal.FlushCheckpointsTaskFactory import io.airbyte.cdk.load.task.internal.FlushTickTask import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory -import io.airbyte.cdk.load.task.internal.SizedInputFlow +import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask import io.airbyte.cdk.load.task.internal.UpdateCheckpointsTask @@ -125,7 +124,7 @@ class DefaultDestinationTaskLauncher( @Value("\${airbyte.file-transfer.enabled}") private val fileTransferEnabled: Boolean, // Input Consumer requirements - private val inputFlow: SizedInputFlow>, + private val inputFlow: ReservingDeserializingInputFlow, private val recordQueueSupplier: MessageQueueSupplier>, private val checkpointQueue: QueueWriter>, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt index 23ebe1e0146f..3ec320e2448d 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTask.kt @@ -9,14 +9,13 @@ import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.BatchEnvelope -import io.airbyte.cdk.load.message.Deserializer -import io.airbyte.cdk.load.message.DestinationMessage -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationRecordStreamComplete import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MultiProducerChannel +import io.airbyte.cdk.load.message.ProtocolMessageDeserializer import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DestinationTaskLauncher @@ -47,7 +46,7 @@ interface ProcessRecordsTask : KillableScope class DefaultProcessRecordsTask( private val config: DestinationConfiguration, private val taskLauncher: DestinationTaskLauncher, - private val deserializer: Deserializer, + private val deserializer: ProtocolMessageDeserializer, private val syncManager: SyncManager, private val diskManager: ReservationManager, private val inputQueue: MessageQueue, @@ -70,7 +69,7 @@ class DefaultProcessRecordsTask( file.localFile.inputStream().use { val records = if (file.isEmpty) { - emptyList().listIterator() + emptyList().listIterator() } else { it.toRecordIterator() } @@ -91,7 +90,11 @@ class DefaultProcessRecordsTask( log.info { "Forcing finalization of all accumulators." } accumulators.forEach { (streamDescriptor, acc) -> val finalBatch = - acc.processRecords(emptyList().listIterator(), 0, true) + acc.processRecords( + emptyList().listIterator(), + 0, + true + ) handleBatch(streamDescriptor, finalBatch, null) } } @@ -113,7 +116,7 @@ class DefaultProcessRecordsTask( } } - private fun InputStream.toRecordIterator(): Iterator { + private fun InputStream.toRecordIterator(): Iterator { return lineSequence() .map { when (val message = deserializer.deserialize(it)) { @@ -127,7 +130,7 @@ class DefaultProcessRecordsTask( .takeWhile { it !is DestinationRecordStreamComplete && it !is DestinationRecordStreamIncomplete } - .map { it as DestinationRecord } + .map { it as DestinationRecordAirbyteValue } .iterator() } } @@ -147,7 +150,7 @@ data class FileAggregateMessage( @Secondary class DefaultProcessRecordsTaskFactory( private val config: DestinationConfiguration, - private val deserializer: Deserializer, + private val deserializer: ProtocolMessageDeserializer, private val syncManager: SyncManager, @Named("diskManager") private val diskManager: ReservationManager, @Named("fileAggregateQueue") private val inputQueue: MessageQueue, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index 66da0212e0fe..2f9081dd75cc 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -14,8 +14,7 @@ import io.airbyte.cdk.load.message.CheckpointMessageWrapped import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationFileStreamComplete import io.airbyte.cdk.load.message.DestinationFileStreamIncomplete -import io.airbyte.cdk.load.message.DestinationMessage -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationRecordStreamComplete import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage @@ -55,7 +54,7 @@ interface InputConsumerTask : KillableScope @Secondary class DefaultInputConsumerTask( private val catalog: DestinationCatalog, - private val inputFlow: SizedInputFlow>, + private val inputFlow: ReservingDeserializingInputFlow, private val recordQueueSupplier: MessageQueueSupplier>, private val checkpointQueue: QueueWriter>, @@ -72,12 +71,12 @@ class DefaultInputConsumerTask( val manager = syncManager.getStreamManager(stream) val recordQueue = recordQueueSupplier.get(stream) when (val message = reserved.value) { - is DestinationRecord -> { + is DestinationRecordAirbyteValue -> { val wrapped = StreamRecordEvent( index = manager.countRecordIn(), sizeBytes = sizeBytes, - record = message + payload = message ) recordQueue.publish(reserved.replace(wrapped)) } @@ -193,7 +192,7 @@ class DefaultInputConsumerTask( interface InputConsumerTaskFactory { fun make( catalog: DestinationCatalog, - inputFlow: SizedInputFlow>, + inputFlow: ReservingDeserializingInputFlow, recordQueueSupplier: MessageQueueSupplier>, checkpointQueue: QueueWriter>, @@ -207,7 +206,7 @@ class DefaultInputConsumerTaskFactory(private val syncManager: SyncManager) : InputConsumerTaskFactory { override fun make( catalog: DestinationCatalog, - inputFlow: SizedInputFlow>, + inputFlow: ReservingDeserializingInputFlow, recordQueueSupplier: MessageQueueSupplier>, checkpointQueue: QueueWriter>, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SizedInputFlow.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/ReservingDeserializingInputFlow.kt similarity index 64% rename from airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SizedInputFlow.kt rename to airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/ReservingDeserializingInputFlow.kt index 6101b1d1798f..c7dce658529a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SizedInputFlow.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/ReservingDeserializingInputFlow.kt @@ -5,8 +5,8 @@ package io.airbyte.cdk.load.task.internal import io.airbyte.cdk.load.command.DestinationConfiguration -import io.airbyte.cdk.load.message.Deserializer import io.airbyte.cdk.load.message.DestinationMessage +import io.airbyte.cdk.load.message.ProtocolMessageDeserializer import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.Reserved import io.github.oshai.kotlinlogging.KotlinLogging @@ -16,17 +16,18 @@ import java.io.InputStream import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector -interface SizedInputFlow : Flow> - -abstract class ReservingDeserializingInputFlow : SizedInputFlow> { +@Singleton +class ReservingDeserializingInputFlow( + val config: DestinationConfiguration, + val deserializer: ProtocolMessageDeserializer, + @Named("memoryManager") val memoryManager: ReservationManager, + val inputStream: InputStream, +) : Flow>> { val log = KotlinLogging.logger {} - abstract val config: DestinationConfiguration - abstract val deserializer: Deserializer - abstract val memoryManager: ReservationManager - abstract val inputStream: InputStream - - override suspend fun collect(collector: FlowCollector>>) { + override suspend fun collect( + collector: FlowCollector>> + ) { log.info { "Reserved ${memoryManager.totalCapacityBytes/1024}mb memory for input processing" } @@ -50,11 +51,3 @@ abstract class ReservingDeserializingInputFlow : SizedInputFlow, - @Named("memoryManager") override val memoryManager: ReservationManager, - override val inputStream: InputStream -) : ReservingDeserializingInputFlow() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt index bf3a9df7ccc6..182bbe3d9fba 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTask.kt @@ -92,7 +92,7 @@ class DefaultSpillToDiskTask( diskManager.reserve(event.sizeBytes) // write to disk - outputStream.write(event.record.serialized) + outputStream.write(event.payload.serialized) outputStream.write("\n") // calculate whether we should flush diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt index 3fe495067434..308dc5e99c09 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamLoader.kt @@ -7,7 +7,7 @@ package io.airbyte.cdk.load.write import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed @@ -56,7 +56,7 @@ interface StreamLoader : BatchAccumulator { interface BatchAccumulator { suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean = false ): Batch = diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMetaTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/DestinationRecordAirbyteValueToAirbyteValueWithMetaTest.kt similarity index 86% rename from airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMetaTest.kt rename to airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/DestinationRecordAirbyteValueToAirbyteValueWithMetaTest.kt index cf37f224a42d..60979a5bdca5 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/DestinationRecordToAirbyteValueWithMetaTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/DestinationRecordAirbyteValueToAirbyteValueWithMetaTest.kt @@ -5,12 +5,12 @@ package io.airbyte.cdk.load.data import io.airbyte.cdk.load.command.MockDestinationCatalogFactory -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.Meta import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test -class DestinationRecordToAirbyteValueWithMetaTest { +class DestinationRecordAirbyteValueToAirbyteValueWithMetaTest { val stream = MockDestinationCatalogFactory.stream1 val emittedAtMs = 123456L val syncId = stream.syncId @@ -41,7 +41,8 @@ class DestinationRecordToAirbyteValueWithMetaTest { ) val expected = LinkedHashMap(expectedMeta) expected[Meta.COLUMN_NAME_DATA] = data - val mockRecord = DestinationRecord(stream.descriptor, data, emittedAtMs, Meta(), "dummy") + val mockRecord = + DestinationRecordAirbyteValue(stream.descriptor, data, emittedAtMs, Meta(), "test") val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = false) val uuid = withMeta.values.remove(Meta.COLUMN_NAME_AB_RAW_ID) as StringValue Assertions.assertTrue( @@ -64,7 +65,8 @@ class DestinationRecordToAirbyteValueWithMetaTest { ) val expected = LinkedHashMap(expectedMeta) data.values.forEach { (name, value) -> expected[name] = value } - val mockRecord = DestinationRecord(stream.descriptor, data, emittedAtMs, Meta(), "dummy") + val mockRecord = + DestinationRecordAirbyteValue(stream.descriptor, data, emittedAtMs, Meta(), "test") val withMeta = mockRecord.dataWithAirbyteMeta(stream, flatten = true) withMeta.values.remove(Meta.COLUMN_NAME_AB_RAW_ID) Assertions.assertEquals(expected, withMeta.values) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt index 3bf820fa42f7..4cadc72fa5ec 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt @@ -47,7 +47,7 @@ class DestinationMessageTest { private fun convert( factory: DestinationMessageFactory, - message: AirbyteMessage + message: AirbyteMessage, ): DestinationMessage { val serialized = message.serializeToString() return factory.fromAirbyteMessage( diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index 34081e2be221..a5f59c711fa3 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -13,7 +13,6 @@ import io.airbyte.cdk.load.command.MockDestinationConfiguration import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.BatchEnvelope import io.airbyte.cdk.load.message.CheckpointMessageWrapped -import io.airbyte.cdk.load.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier @@ -44,7 +43,7 @@ import io.airbyte.cdk.load.task.internal.FlushCheckpointsTaskFactory import io.airbyte.cdk.load.task.internal.FlushTickTask import io.airbyte.cdk.load.task.internal.InputConsumerTask import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory -import io.airbyte.cdk.load.task.internal.SizedInputFlow +import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow import io.airbyte.cdk.load.task.internal.SpillToDiskTask import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask @@ -61,7 +60,6 @@ import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.toList import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions @@ -93,7 +91,7 @@ class DestinationTaskLauncherTest { @Inject lateinit var flushCheckpointsTaskFactory: MockFlushCheckpointsTaskFactory @Inject lateinit var mockForceFlushTask: MockForceFlushTask @Inject lateinit var updateCheckpointsTask: MockUpdateCheckpointsTask - @Inject lateinit var inputFlow: MockInputFlow + @Inject lateinit var inputFlow: ReservingDeserializingInputFlow @Inject lateinit var queueWriter: MockQueueWriter @Inject lateinit var messageQueueSupplier: MockMessageQueueSupplier @Inject lateinit var flushTickTask: FlushTickTask @@ -104,26 +102,22 @@ class DestinationTaskLauncherTest { @Singleton @Primary @Requires(env = ["DestinationTaskLauncherTest"]) - fun flushTickTask(): FlushTickTask = mockk(relaxed = true) + fun inputFlow(): ReservingDeserializingInputFlow = mockk(relaxed = true) @Singleton @Primary @Requires(env = ["DestinationTaskLauncherTest"]) - fun processRecordsTaskFactory(): ProcessRecordsTaskFactory = mockk(relaxed = true) + fun flushTickTask(): FlushTickTask = mockk(relaxed = true) @Singleton @Primary @Requires(env = ["DestinationTaskLauncherTest"]) - fun processBatchTaskFactory(): ProcessBatchTaskFactory = mockk(relaxed = true) + fun processRecordsTaskFactory(): ProcessRecordsTaskFactory = mockk(relaxed = true) @Singleton @Primary @Requires(env = ["DestinationTaskLauncherTest"]) - class MockInputFlow : SizedInputFlow> { - override suspend fun collect( - collector: FlowCollector>> - ) {} - } + fun processBatchTaskFactory(): ProcessBatchTaskFactory = mockk(relaxed = true) @Singleton @Primary @@ -154,7 +148,7 @@ class DestinationTaskLauncherTest { override fun make( catalog: DestinationCatalog, - inputFlow: SizedInputFlow>, + inputFlow: ReservingDeserializingInputFlow, recordQueueSupplier: MessageQueueSupplier< DestinationStream.Descriptor, Reserved>, diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index b763d0613953..0a92a0a83b78 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -8,7 +8,6 @@ import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationConfiguration import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.CheckpointMessageWrapped -import io.airbyte.cdk.load.message.DestinationMessage import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.QueueWriter @@ -28,7 +27,7 @@ import io.airbyte.cdk.load.task.implementor.TeardownTaskFactory import io.airbyte.cdk.load.task.internal.FlushCheckpointsTaskFactory import io.airbyte.cdk.load.task.internal.FlushTickTask import io.airbyte.cdk.load.task.internal.InputConsumerTaskFactory -import io.airbyte.cdk.load.task.internal.SizedInputFlow +import io.airbyte.cdk.load.task.internal.ReservingDeserializingInputFlow import io.airbyte.cdk.load.task.internal.SpillToDiskTask import io.airbyte.cdk.load.task.internal.SpillToDiskTaskFactory import io.airbyte.cdk.load.task.internal.TimedForcedCheckpointFlushTask @@ -74,7 +73,7 @@ class DestinationTaskLauncherUTest { private val failSyncTaskFactory: FailSyncTaskFactory = mockk(relaxed = true) // Input Comsumer requirements - private val inputFlow: SizedInputFlow> = mockk(relaxed = true) + private val inputFlow: ReservingDeserializingInputFlow = mockk(relaxed = true) private val recordQueueSupplier: MessageQueueSupplier> = mockk(relaxed = true) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt index b3299a0c4eab..46bd7c2a15b6 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt @@ -10,11 +10,10 @@ import io.airbyte.cdk.load.command.MockDestinationCatalogFactory import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.BatchEnvelope -import io.airbyte.cdk.load.message.Deserializer -import io.airbyte.cdk.load.message.DestinationMessage -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MultiProducerChannel +import io.airbyte.cdk.load.message.ProtocolMessageDeserializer import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.SyncManager import io.airbyte.cdk.load.task.DefaultDestinationTaskLauncher @@ -37,7 +36,7 @@ import org.junit.jupiter.api.Test class ProcessRecordsTaskTest { private lateinit var config: DestinationConfiguration private lateinit var diskManager: ReservationManager - private lateinit var deserializer: Deserializer + private lateinit var deserializer: ProtocolMessageDeserializer private lateinit var streamLoader: StreamLoader private lateinit var batchAccumulator: BatchAccumulator private lateinit var inputQueue: MessageQueue @@ -62,12 +61,12 @@ class ProcessRecordsTaskTest { deserializer = mockk(relaxed = true) coEvery { deserializer.deserialize(any()) } answers { - DestinationRecord( + DestinationRecordAirbyteValue( stream = MockDestinationCatalogFactory.stream1.descriptor, data = IntegerValue(firstArg().toLong()), emittedAtMs = 0L, meta = null, - serialized = firstArg(), + serialized = firstArg() ) } processRecordsTaskFactory = @@ -84,7 +83,7 @@ class ProcessRecordsTaskTest { class MockBatch( override val groupId: String?, override val state: Batch.State, - recordIterator: Iterator + recordIterator: Iterator ) : Batch { val records = recordIterator.asSequence().toList() } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt index 2623715bf598..02d89192c3e4 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskTest.kt @@ -4,7 +4,7 @@ package io.airbyte.cdk.load.task.internal -import io.airbyte.cdk.load.command.DestinationConfiguration +import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.command.MockDestinationCatalogFactory import io.airbyte.cdk.load.message.CheckpointMessageWrapped @@ -14,218 +14,158 @@ import io.airbyte.cdk.load.message.GlobalCheckpointWrapped import io.airbyte.cdk.load.message.MessageQueue import io.airbyte.cdk.load.message.MessageQueueSupplier import io.airbyte.cdk.load.message.StreamCheckpointWrapped -import io.airbyte.cdk.load.message.StreamEndEvent import io.airbyte.cdk.load.message.StreamRecordEvent +import io.airbyte.cdk.load.state.DefaultStreamManager import io.airbyte.cdk.load.state.ReservationManager import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.state.SyncManager -import io.airbyte.cdk.load.test.util.CoroutineTestUtils +import io.airbyte.cdk.load.test.util.CoroutineTestUtils.Companion.assertThrows import io.airbyte.cdk.load.test.util.StubDestinationMessageFactory -import io.airbyte.cdk.load.util.takeUntilInclusive -import io.micronaut.context.annotation.Primary -import io.micronaut.context.annotation.Requires -import io.micronaut.test.extensions.junit5.annotation.MicronautTest +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.coVerifySequence +import io.mockk.impl.annotations.MockK import io.mockk.mockk -import jakarta.inject.Inject -import jakarta.inject.Named -import jakarta.inject.Singleton -import kotlinx.coroutines.channels.Channel +import java.util.concurrent.ConcurrentLinkedQueue import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.take -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -@MicronautTest( - rebuildContext = true, - environments = - [ - "InputConsumerTaskTest", - "MockDestinationConfiguration", - "MockDestinationCatalog", - ] -) class InputConsumerTaskTest { - @Inject lateinit var config: DestinationConfiguration - @Inject lateinit var taskFactory: InputConsumerTaskFactory - @Inject + companion object { + val STREAM1 = DestinationStream.Descriptor("test", "stream1") + val STREAM2 = DestinationStream.Descriptor("test", "stream2") + } + + @MockK(relaxed = true) lateinit var recordQueueSupplier: MessageQueueSupplier> - @Inject lateinit var checkpointQueue: MessageQueue> - @Inject lateinit var syncManager: SyncManager - @Inject lateinit var mockInputFlow: MockInputFlow - @Inject lateinit var mockCatalogFactory: MockDestinationCatalogFactory + @MockK(relaxed = true) + lateinit var checkpointQueue: MessageQueue> + @MockK(relaxed = true) lateinit var syncManager: SyncManager + @MockK(relaxed = true) lateinit var memoryManager: ReservationManager + @MockK(relaxed = true) lateinit var inputFlow: ReservingDeserializingInputFlow + @MockK(relaxed = true) lateinit var catalog: DestinationCatalog + @MockK(relaxed = true) lateinit var stream1: DestinationStream + @MockK(relaxed = true) lateinit var stream2: DestinationStream + @MockK(relaxed = true) lateinit var queue1: MessageQueue> + @MockK(relaxed = true) lateinit var queue2: MessageQueue> - @Singleton - @Primary - @Requires(env = ["InputConsumerTaskTest"]) - class MockInputFlow(@Named("memoryManager") val memoryManager: ReservationManager) : - SizedInputFlow> { - private val messages = Channel>>(Channel.UNLIMITED) - val initialMemory = memoryManager.remainingCapacityBytes + @BeforeEach + fun setup() { + coEvery { stream1.descriptor } returns STREAM1 + coEvery { stream2.descriptor } returns STREAM2 - override suspend fun collect( - collector: FlowCollector>> - ) { - for (message in messages) { - collector.emit(message) - } - } + coEvery { catalog.streams } returns listOf(stream1, stream2) - suspend fun addMessage(message: DestinationMessage, size: Long = 0L) { - messages.send(Pair(size, memoryManager.reserve(1, message))) - } + coEvery { recordQueueSupplier.get(STREAM1) } returns queue1 + coEvery { recordQueueSupplier.get(STREAM2) } returns queue2 - fun stop() { - messages.close() - } + coEvery { syncManager.getStreamManager(STREAM1) } returns DefaultStreamManager(stream1) + coEvery { syncManager.getStreamManager(STREAM2) } returns DefaultStreamManager(stream2) } + private fun DestinationMessage.wrap(bytesReserved: Long) = + bytesReserved to Reserved(memoryManager, bytesReserved, this) + @Test fun testSendRecords() = runTest { - val queue1 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream1.descriptor) - val queue2 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream2.descriptor) + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = firstArg>>>() + collector.emit( + StubDestinationMessageFactory.makeRecord( + MockDestinationCatalogFactory.stream1, + ) + .wrap(1L) + ) + repeat(2) { + collector.emit( + StubDestinationMessageFactory.makeRecord( + MockDestinationCatalogFactory.stream2, + ) + .wrap(it + 2L) + ) + } + } - val manager1 = - syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor) - val manager2 = - syncManager.getStreamManager(MockDestinationCatalogFactory.stream2.descriptor) + val task = + DefaultInputConsumerTaskFactory(syncManager) + .make( + catalog, + inputFlow, + recordQueueSupplier, + checkpointQueue, + mockk(), + ) + task.execute() - (0 until 10).forEach { - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeRecord( - MockDestinationCatalogFactory.stream1, - "test${it}" - ), - it * 2L + coVerify(exactly = 1) { + queue1.publish( + match { + it.value is StreamRecordEvent && + (it.value as StreamRecordEvent).payload.stream == STREAM1 + } ) } - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream1) - ) - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream2) - ) - - val task = - taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), + coVerify(exactly = 2) { + queue2.publish( + match { + it.value is StreamRecordEvent && + (it.value as StreamRecordEvent).payload.stream == STREAM2 + } ) - launch { task.execute() } - - val messages1 = - queue1 - .consume() - .takeUntilInclusive { (it.value as StreamRecordEvent).record.serialized == "test9" } - .toList() - - Assertions.assertEquals(10, messages1.size) - val expectedRecords = - (0 until 10).map { - StreamRecordEvent( - it.toLong(), - it * 2L, - StubDestinationMessageFactory.makeRecord( - MockDestinationCatalogFactory.stream1, - "test${it}" - ) - ) - } - val streamComplete1: Reserved = - queue1.consume().take(1).toList().first() - val streamComplete2: Reserved = - queue2.consume().take(1).toList().first() - - Assertions.assertEquals(expectedRecords, messages1.map { it.value }) - Assertions.assertEquals(expectedRecords.map { _ -> 1L }, messages1.map { it.bytesReserved }) - Assertions.assertEquals(StreamEndEvent(10), streamComplete1.value) - Assertions.assertEquals(1, streamComplete1.bytesReserved) - Assertions.assertEquals(10L, manager1.recordCount()) - Assertions.assertEquals(emptyList(), queue1.consume().toList()) - Assertions.assertEquals(StreamEndEvent(0), streamComplete2.value) - Assertions.assertEquals(emptyList(), queue2.consume().toList()) - Assertions.assertEquals(0L, manager2.recordCount()) - mockInputFlow.stop() + } + assert(syncManager.getStreamManager(stream1.descriptor).recordCount() == 1L) + assert(syncManager.getStreamManager(stream2.descriptor).recordCount() == 2L) } @Test fun testSendEndOfStream() = runTest { - val queue1 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream1.descriptor) - val queue2 = recordQueueSupplier.get(MockDestinationCatalogFactory.stream2.descriptor) - - val manager1 = - syncManager.getStreamManager(MockDestinationCatalogFactory.stream1.descriptor) - val manager2 = - syncManager.getStreamManager(MockDestinationCatalogFactory.stream2.descriptor) - - (0 until 10).forEach { _ -> - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeRecord( - MockDestinationCatalogFactory.stream1, - "whatever" - ), - 0L - ) - } - - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeRecord(MockDestinationCatalogFactory.stream2, "test"), - 1L - ) - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream1), - 0L - ) - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream2), - 0L - ) - val task = - taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), - ) - val job = launch { task.execute() } - mockInputFlow.stop() - job.join() - queue2.close() - Assertions.assertEquals( - listOf( - StreamRecordEvent( - 0, - 1L, + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = firstArg>>>() + collector.emit( StubDestinationMessageFactory.makeRecord( - MockDestinationCatalogFactory.stream2, - "test" - ) - ), - StreamEndEvent(1) - ), - queue2.consume().toList().map { it.value } - ) - Assertions.assertEquals(1L, manager2.recordCount()) + MockDestinationCatalogFactory.stream1, + ) + .wrap(1L) + ) + collector.emit( + StubDestinationMessageFactory.makeStreamComplete( + MockDestinationCatalogFactory.stream1, + ) + .wrap(2L) + ) + collector.emit( + StubDestinationMessageFactory.makeStreamComplete( + MockDestinationCatalogFactory.stream2, + ) + .wrap(3L) + ) + } - Assertions.assertEquals(manager2.endOfStreamRead(), true) - Assertions.assertEquals(manager1.endOfStreamRead(), true) + val task = + DefaultInputConsumerTaskFactory(syncManager) + .make( + catalog, + inputFlow, + recordQueueSupplier, + checkpointQueue, + mockk(), + ) + task.execute() + coVerifySequence { + memoryManager.release(2L) + memoryManager.release(3L) + } - queue1.close() - val messages1 = queue1.consume().toList() - Assertions.assertEquals(11, messages1.size) - Assertions.assertEquals(messages1[10].value, StreamEndEvent(10)) - Assertions.assertEquals( - mockInputFlow.initialMemory - 11, - mockInputFlow.memoryManager.remainingCapacityBytes, - "1 byte per message should have been reserved, but the end-of-stream should have been released" - ) + assert(syncManager.getStreamManager(stream1.descriptor).recordCount() == 1L) + assert(syncManager.getStreamManager(stream1.descriptor).endOfStreamRead()) + assert(syncManager.getStreamManager(stream2.descriptor).recordCount() == 0L) + assert(syncManager.getStreamManager(stream2.descriptor).endOfStreamRead()) } @Test @@ -245,37 +185,36 @@ class InputConsumerTaskTest { ) val task = - taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), - ) - launch { task.execute() } - batches.forEach { (stream, count, expectedCount) -> - repeat(count) { - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeRecord(stream, "test"), - 1L + DefaultInputConsumerTaskFactory(syncManager) + .make( + catalog, + inputFlow, + recordQueueSupplier, + checkpointQueue, + mockk(), ) + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = firstArg>>>() + batches.forEach { (stream, count, _) -> + repeat(count) { + collector.emit(StubDestinationMessageFactory.makeRecord(stream).wrap(1L)) + } + collector.emit( + StubDestinationMessageFactory.makeStreamState(stream, count.toLong()) + .wrap(0L) + ) + } } - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamState(stream, count.toLong()), - 0L - ) - val state = - checkpointQueue.consume().take(1).toList().first().value as StreamCheckpointWrapped - Assertions.assertEquals(expectedCount, state.index) - Assertions.assertEquals(count.toLong(), state.checkpoint.destinationStats?.recordCount) + task.execute() + + val published = ConcurrentLinkedQueue>() + coEvery { checkpointQueue.publish(any()) } coAnswers { published.add(firstArg()) } + published.toList().zip(batches).forEach { (checkpoint, event) -> + val wrapped = checkpoint.value + Assertions.assertEquals(event.expectedStateIndex, wrapped.index) + Assertions.assertEquals(event.stream.descriptor, wrapped.stream) } - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream1) - ) - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream2) - ) - mockInputFlow.stop() } @Test @@ -301,79 +240,87 @@ class InputConsumerTaskTest { ) val task = - taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(), - ) - launch { task.execute() } - batches.forEach { event -> - when (event) { - is AddRecords -> { - repeat(event.count) { - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeRecord(event.stream, "test"), - 1L - ) + DefaultInputConsumerTaskFactory(syncManager) + .make( + catalog, + inputFlow, + recordQueueSupplier, + checkpointQueue, + mockk(), + ) + + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = firstArg>>>() + batches.forEach { event -> + when (event) { + is AddRecords -> { + repeat(event.count) { + collector.emit( + StubDestinationMessageFactory.makeRecord(event.stream).wrap(1L) + ) + } + } + is SendState -> { + collector.emit( + StubDestinationMessageFactory.makeGlobalState( + event.expectedStream1Count + ) + .wrap(0L) + ) + } } } - is SendState -> { - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeGlobalState(event.expectedStream1Count), - 0L - ) - val state = - checkpointQueue.consume().take(1).toList().first().value - as GlobalCheckpointWrapped - val stream1State = - state.streamIndexes.find { - it.first == MockDestinationCatalogFactory.stream1.descriptor - }!! - val stream2State = - state.streamIndexes.find { - it.first == MockDestinationCatalogFactory.stream2.descriptor - }!! - Assertions.assertEquals(event.expectedStream1Count, stream1State.second) - Assertions.assertEquals(event.expectedStream2Count, stream2State.second) - Assertions.assertEquals( - event.expectedStats, - state.checkpoint.destinationStats?.recordCount - ) - } } + val checkpoints = ConcurrentLinkedQueue>() + coEvery { checkpointQueue.publish(any()) } coAnswers { checkpoints.add(firstArg()) } + + task.execute() + + checkpoints.toList().zip(batches.filterIsInstance()).forEach { + (checkpoint, event) -> + val wrapped = checkpoint.value + val stream1State = wrapped.streamIndexes.find { it.first == stream1.descriptor }!! + val stream2State = wrapped.streamIndexes.find { it.first == stream2.descriptor }!! + Assertions.assertEquals(event.expectedStream1Count, stream1State.second) + Assertions.assertEquals(event.expectedStream2Count, stream2State.second) + Assertions.assertEquals( + event.expectedStats, + wrapped.checkpoint.destinationStats?.recordCount + ) } - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream1) - ) - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeStreamComplete(MockDestinationCatalogFactory.stream2) - ) - mockInputFlow.stop() } @Test fun testFileStreamIncompleteThrows() = runTest { - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeFile(MockDestinationCatalogFactory.stream1, "test"), - 1L - ) - mockInputFlow.addMessage( - StubDestinationMessageFactory.makeFileStreamIncomplete( - MockDestinationCatalogFactory.stream1 - ), - 0L - ) + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = firstArg>>>() + collector.emit( + StubDestinationMessageFactory.makeFile( + MockDestinationCatalogFactory.stream1, + "test" + ) + .wrap(1L) + ) + collector.emit( + StubDestinationMessageFactory.makeFileStreamIncomplete( + MockDestinationCatalogFactory.stream1 + ) + .wrap(0L) + ) + } + val task = - taskFactory.make( - mockCatalogFactory.make(), - mockInputFlow, - recordQueueSupplier, - checkpointQueue, - mockk(relaxed = true), - ) - CoroutineTestUtils.assertThrows(IllegalStateException::class) { task.execute() } - mockInputFlow.stop() + DefaultInputConsumerTaskFactory(syncManager) + .make( + catalog, + inputFlow, + recordQueueSupplier, + checkpointQueue, + mockk(relaxed = true), + ) + + assertThrows(IllegalStateException::class) { task.execute() } } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/ReservingDeserializingInputFlowTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/ReservingDeserializingInputFlowTest.kt index 9454d0fa212e..1fd634ab5c08 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/ReservingDeserializingInputFlowTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/ReservingDeserializingInputFlowTest.kt @@ -5,108 +5,70 @@ package io.airbyte.cdk.load.task.internal import io.airbyte.cdk.load.command.DestinationConfiguration -import io.airbyte.cdk.load.message.Deserializer +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.ProtocolMessageDeserializer import io.airbyte.cdk.load.state.ReservationManager -import io.micronaut.context.annotation.Primary -import io.micronaut.context.annotation.Requires -import io.micronaut.test.extensions.junit5.annotation.MicronautTest -import jakarta.inject.Inject -import jakarta.inject.Named -import jakarta.inject.Singleton -import java.io.InputStream -import java.util.stream.Stream +import io.airbyte.cdk.load.state.Reserved +import io.mockk.coEvery +import io.mockk.impl.annotations.MockK +import java.io.ByteArrayInputStream import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Assertions -import org.junit.jupiter.api.extension.ExtensionContext -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.Arguments -import org.junit.jupiter.params.provider.ArgumentsProvider -import org.junit.jupiter.params.provider.ArgumentsSource +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test -@MicronautTest( - rebuildContext = true, - environments = - [ - "ReservingDeserializingInputFlowTest", - "MockDestinationCatalog", - "MockDestinationConfiguration" - ], -) class ReservingDeserializingInputFlowTest { - @Inject lateinit var config: DestinationConfiguration - @Inject lateinit var inputFlow: ReservingDeserializingInputFlow - @Inject lateinit var inputStream: MockInputStream - - @Singleton - @Primary - @Requires(env = ["ReservingDeserializingInputFlowTest"]) - class MockInputFlow( - override val config: DestinationConfiguration, - override val inputStream: InputStream, - override val deserializer: Deserializer, - @Named("memoryManager") override val memoryManager: ReservationManager, - ) : ReservingDeserializingInputFlow() - - @Singleton - @Primary - @Requires(env = ["ReservingDeserializingInputFlowTest"]) - class MockDeserializer : Deserializer { - override fun deserialize(serialized: String): String { - return serialized.reversed() + "!" - } + companion object { + const val RATIO = 1.1 } - @Singleton - @Primary - @Requires(env = ["ReservingDeserializingInputFlowTest"]) - class MockInputStream : InputStream() { - val chars = mutableListOf() - - fun load(lines: List) { - lines.forEach { line -> - chars.addAll(line.toList()) - chars.add('\n') - } - } + @MockK(relaxed = true) lateinit var config: DestinationConfiguration + @MockK(relaxed = true) lateinit var deserializer: ProtocolMessageDeserializer + @MockK(relaxed = true) lateinit var memoryManager: ReservationManager + @MockK(relaxed = true) lateinit var stream: DestinationStream.Descriptor + lateinit var inputFlow: ReservingDeserializingInputFlow - override fun read(): Int { - return if (chars.isEmpty()) { - -1 - } else { - chars.removeAt(0).code + @BeforeEach + fun setup() { + coEvery { memoryManager.reserve(any(), any()) } answers + { + Reserved(memoryManager, firstArg(), secondArg()) } - } } - class InputConsumerTestArgumentsProvider : ArgumentsProvider { - override fun provideArguments(context: ExtensionContext): Stream { - return Stream.of( - Arguments.of(listOf("cat", "dog", "turtle")), - Arguments.of(listOf("", "109j321dcDASD", "2023", "1", "2", "3")) + @Test + fun testInputConsumer() = runTest { + val records = + listOf( + "foo", + "hello there", + "goodbye", ) - } - } + val bytes = records.joinToString("\n").toByteArray() + val inputStream = ByteArrayInputStream(bytes) + + inputFlow = + ReservingDeserializingInputFlow(config, deserializer, memoryManager, inputStream) - @ParameterizedTest - @ArgumentsSource(InputConsumerTestArgumentsProvider::class) - fun testInputConsumer(testInput: List) = runTest { - inputStream.load(testInput) - val inputs = inputFlow.toList() - Assertions.assertEquals( - testInput.filter { it != "" }.map { it.reversed() + "!" }, - inputs.map { it.second.value }.toList() - ) - Assertions.assertEquals( - testInput.filter { it != "" }.map { it.length.toLong() }, - inputs.map { it.first }.toList() - ) - Assertions.assertEquals( - testInput - .filter { it != "" } - .map { (it.length.toLong() * config.estimatedRecordMemoryOverheadRatio).toLong() } - .toList(), - inputs.map { it.second.bytesReserved }.toList() - ) + coEvery { config.estimatedRecordMemoryOverheadRatio } returns RATIO + coEvery { deserializer.deserialize(any()) } answers + { + DestinationRecordAirbyteValue( + stream, + NullValue, + 0L, + null, + firstArg().reversed() + "!", + ) + } + val inputs = inputFlow.toList().map { it.first to it.second.value } + val expectedOutputs = + records.map { + it.length.toLong() to + DestinationRecordAirbyteValue(stream, NullValue, 0L, null, it.reversed() + "!") + } + assert(inputs == expectedOutputs) } } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt index 5220ae432507..76372c4e51b8 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/SpillToDiskTaskTest.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.load.command.MockDestinationConfiguration import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.file.DefaultSpillFileProvider import io.airbyte.cdk.load.file.SpillFileProvider -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationStreamEvent import io.airbyte.cdk.load.message.DestinationStreamEventQueue import io.airbyte.cdk.load.message.DestinationStreamQueueSupplier @@ -96,7 +96,6 @@ class SpillToDiskTaskTest { 2L, StubDestinationMessageFactory.makeRecord( MockDestinationCatalogFactory.stream1, - "test 3", ), ) // flush strategy returns true, so we flush @@ -136,7 +135,6 @@ class SpillToDiskTaskTest { 2L, StubDestinationMessageFactory.makeRecord( MockDestinationCatalogFactory.stream1, - "test 3", ), ) @@ -254,13 +252,13 @@ class SpillToDiskTaskTest { StreamRecordEvent( index = index, sizeBytes = Fixtures.SERIALIZED_SIZE_BYTES, - record = - DestinationRecord( + payload = + DestinationRecordAirbyteValue( stream = MockDestinationCatalogFactory.stream1.descriptor, data = NullValue, emittedAtMs = 0, meta = null, - serialized = "test${index}", + serialized = "test" ), ), ), diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/StubDestinationMessageFactory.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/StubDestinationMessageFactory.kt index d2e649f04b42..459186a73af9 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/StubDestinationMessageFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/StubDestinationMessageFactory.kt @@ -11,7 +11,7 @@ import io.airbyte.cdk.load.message.CheckpointMessage import io.airbyte.cdk.load.message.DestinationFile import io.airbyte.cdk.load.message.DestinationFileStreamComplete import io.airbyte.cdk.load.message.DestinationFileStreamIncomplete -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.DestinationRecordStreamComplete import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete import io.airbyte.cdk.load.message.GlobalCheckpoint @@ -22,13 +22,13 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage * Shared factory methods for making stub destination messages for testing. */ object StubDestinationMessageFactory { - fun makeRecord(stream: DestinationStream, record: String): DestinationRecord { - return DestinationRecord( + fun makeRecord(stream: DestinationStream): DestinationRecordAirbyteValue { + return DestinationRecordAirbyteValue( stream = stream.descriptor, data = NullValue, emittedAtMs = 0, meta = null, - serialized = record + serialized = "test" ) } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt index c5417306a47b..95081b553ea4 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt @@ -25,7 +25,7 @@ import io.airbyte.cdk.load.file.avro.toAvroWriter import io.airbyte.cdk.load.file.csv.toCsvPrinterWithHeader import io.airbyte.cdk.load.file.parquet.ParquetWriter import io.airbyte.cdk.load.file.parquet.toParquetWriter -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.util.serializeToString import io.airbyte.cdk.load.util.write import io.github.oshai.kotlinlogging.KotlinLogging @@ -37,7 +37,7 @@ import java.io.OutputStream import org.apache.avro.Schema interface ObjectStorageFormattingWriter : Closeable { - fun accept(record: DestinationRecord) + fun accept(record: DestinationRecordAirbyteValue) fun flush() } @@ -80,7 +80,7 @@ class JsonFormattingWriter( private val rootLevelFlattening: Boolean, ) : ObjectStorageFormattingWriter { - override fun accept(record: DestinationRecord) { + override fun accept(record: DestinationRecordAirbyteValue) { val data = record.dataWithAirbyteMeta(stream, rootLevelFlattening).toJson().serializeToString() outputStream.write(data) @@ -104,7 +104,7 @@ class CSVFormattingWriter( private val finalSchema = stream.schema.withAirbyteMeta(rootLevelFlattening) private val printer = finalSchema.toCsvPrinterWithHeader(outputStream) - override fun accept(record: DestinationRecord) { + override fun accept(record: DestinationRecordAirbyteValue) { printer.printRecord( record.dataWithAirbyteMeta(stream, rootLevelFlattening).toCsvRecord(finalSchema) ) @@ -137,7 +137,7 @@ class AvroFormattingWriter( log.info { "Generated avro schema: $avroSchema" } } - override fun accept(record: DestinationRecord) { + override fun accept(record: DestinationRecordAirbyteValue) { val dataMapped = pipeline.map(record.data, record.meta?.changes) val withMeta = dataMapped.withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening) writer.write(withMeta.toAvroRecord(mappedSchema, avroSchema)) @@ -170,7 +170,7 @@ class ParquetFormattingWriter( log.info { "Generated avro schema: $avroSchema" } } - override fun accept(record: DestinationRecord) { + override fun accept(record: DestinationRecordAirbyteValue) { val dataMapped = pipeline.map(record.data, record.meta?.changes) val withMeta = dataMapped.withAirbyteMeta(stream, record.emittedAtMs, rootLevelFlattening) writer.write(withMeta.toAvroRecord(mappedSchema, avroSchema)) @@ -210,7 +210,7 @@ class BufferedFormattingWriter( val bufferSize: Int get() = buffer.size() - override fun accept(record: DestinationRecord) { + override fun accept(record: DestinationRecordAirbyteValue) { writer.accept(record) } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt index 0a871b74188d..6180f9b7ff62 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt @@ -10,7 +10,7 @@ import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.PartFactory import io.airbyte.cdk.load.message.Batch -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.object_storage.* import io.airbyte.cdk.load.write.BatchAccumulator import io.github.oshai.kotlinlogging.KotlinLogging @@ -38,7 +38,7 @@ class RecordToPartAccumulator( private val currentObject = ConcurrentHashMap>() override suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean ): Batch { diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt index 527651bbc476..6fc960633066 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt @@ -9,7 +9,7 @@ import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriter import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.object_storage.* import io.mockk.coEvery import io.mockk.coVerify @@ -40,8 +40,8 @@ class RecordToPartAccumulatorTest { coEvery { bufferedWriter.close() } returns Unit } - private fun makeRecord(): DestinationRecord = - DestinationRecord( + private fun makeRecord(): DestinationRecordAirbyteValue = + DestinationRecordAirbyteValue( DestinationStream.Descriptor("test", "stream"), ObjectValue(linkedMapOf()), 0L, @@ -49,7 +49,7 @@ class RecordToPartAccumulatorTest { "" ) - private fun makeRecords(n: Int): Iterator = + private fun makeRecords(n: Int): Iterator = (0 until n).map { makeRecord() }.listIterator() private fun makeBytes(n: Int): ByteArray? = diff --git a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml index ca2b28ffac1d..698f7f924ead 100644 --- a/airbyte-integrations/connectors/destination-dev-null/metadata.yaml +++ b/airbyte-integrations/connectors/destination-dev-null/metadata.yaml @@ -10,7 +10,7 @@ data: - suite: integrationTests connectorType: destination definitionId: a7bcc9d8-13b3-4e49-b80d-d020b90045e3 - dockerImageTag: 0.7.13 + dockerImageTag: 0.7.14 dockerRepository: airbyte/destination-dev-null documentationUrl: https://docs.airbyte.com/integrations/destinations/dev-null githubIssueLabel: destination-dev-null diff --git a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt index 1bf7d57284b7..cdb5da037782 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/main/kotlin/io/airbyte/integrations/destination/dev_null/DevNullWriter.kt @@ -8,7 +8,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader @@ -68,7 +68,7 @@ class LoggingStreamLoader(override val stream: DestinationStream, loggingConfig: } override suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean, ): Batch { @@ -100,7 +100,7 @@ class LoggingStreamLoader(override val stream: DestinationStream, loggingConfig: class SilentStreamLoader(override val stream: DestinationStream) : StreamLoader { override suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean ): Batch { @@ -123,7 +123,7 @@ class ThrottledStreamLoader( private val log = KotlinLogging.logger {} override suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean ): Batch { @@ -153,7 +153,7 @@ class FailingStreamLoader(override val stream: DestinationStream, private val nu } override suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean ): Batch { diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml index 87e005b677ab..62a6c0bc406a 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml @@ -16,7 +16,7 @@ data: alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.2.1 + dockerImageTag: 0.2.2 dockerRepository: airbyte/destination-iceberg-v2 documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 githubIssueLabel: destination-iceberg-v2 diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt index 18546300050b..0f46cdbd61c2 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt @@ -9,7 +9,7 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationFile -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.write.StreamLoader @@ -32,7 +32,7 @@ class IcebergStreamLoader( private val log = KotlinLogging.logger {} override suspend fun processRecords( - records: Iterator, + records: Iterator, totalSizeBytes: Long, endOfStream: Boolean ): Batch { diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index b760d8bac44d..c3b3d627f827 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -15,7 +15,7 @@ import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema import io.airbyte.cdk.load.data.withAirbyteMeta -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.integrations.destination.iceberg.v2.ACCESS_KEY_ID import io.airbyte.integrations.destination.iceberg.v2.GlueCredentialsProvider import io.airbyte.integrations.destination.iceberg.v2.IcebergV2Configuration @@ -149,18 +149,19 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { } /** - * Converts an Airbyte [DestinationRecord] into an Iceberg [Record]. The converted record will - * be wrapped to include [Operation] information, which is used by the writer to determine how - * to write the data to the underlying Iceberg files. + * Converts an Airbyte [DestinationRecordAirbyteValue] into an Iceberg [Record]. The converted + * record will be wrapped to include [Operation] information, which is used by the writer to + * determine how to write the data to the underlying Iceberg files. * - * @param record The Airbyte [DestinationRecord] record to be converted for writing by Iceberg. + * @param record The Airbyte [DestinationRecordAirbyteValue] record to be converted for writing + * by Iceberg. * @param stream The Airbyte [DestinationStream] that contains information about the stream. * @param tableSchema The Iceberg [Table] [Schema]. * @param pipeline The [MapperPipeline] used to convert the Airbyte record to an Iceberg record. - * @return An Iceberg [Record] representation of the Airbyte [DestinationRecord]. + * @return An Iceberg [Record] representation of the Airbyte [DestinationRecordAirbyteValue]. */ fun toRecord( - record: DestinationRecord, + record: DestinationRecordAirbyteValue, stream: DestinationStream, tableSchema: Schema, pipeline: MapperPipeline @@ -275,7 +276,7 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { } private fun getOperation( - record: DestinationRecord, + record: DestinationRecordAirbyteValue, importType: ImportType, ): Operation = if ( diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index 48edc396c725..088047ad8edc 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -21,7 +21,7 @@ import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue import io.airbyte.cdk.load.data.TimestampValue import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory -import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.Meta import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID @@ -197,7 +197,7 @@ internal class IcebergUtilTest { syncId = 1, ) val airbyteRecord = - DestinationRecord( + DestinationRecordAirbyteValue( stream = airbyteStream.descriptor, data = ObjectValue( @@ -245,7 +245,7 @@ internal class IcebergUtilTest { syncId = 1, ) val airbyteRecord = - DestinationRecord( + DestinationRecordAirbyteValue( stream = airbyteStream.descriptor, data = ObjectValue( @@ -297,7 +297,7 @@ internal class IcebergUtilTest { syncId = 1, ) val airbyteRecord = - DestinationRecord( + DestinationRecordAirbyteValue( stream = airbyteStream.descriptor, data = ObjectValue( diff --git a/docs/integrations/destinations/dev-null.md b/docs/integrations/destinations/dev-null.md index 901f6eb9f680..8b3d8c0604b2 100644 --- a/docs/integrations/destinations/dev-null.md +++ b/docs/integrations/destinations/dev-null.md @@ -49,8 +49,9 @@ The OSS and Cloud variants have the same version number starting from version `0 | Version | Date | Pull Request | Subject | |:------------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------| -| 0.7.13 | 2024-12-18 | [49899](https://github.com/airbytehq/airbyte/pull/49899) | Use a base image: airbyte/java-connector-base:1.0.0 | -| 0.7.12 | 2024-12-04 | [48794](https://github.com/airbytehq/airbyte/pull/48794) | Promoting release candidate 0.7.12-rc.2 to a main version. | +| 0.7.14 | 2024-12-20 | [49974](https://github.com/airbytehq/airbyte/pull/49974) | Non-functional CDK changes | +| 0.7.13 | 2024-12-18 | [49899](https://github.com/airbytehq/airbyte/pull/49899) | Use a base image: airbyte/java-connector-base:1.0.0 | +| 0.7.12 | 2024-12-04 | [48794](https://github.com/airbytehq/airbyte/pull/48794) | Promoting release candidate 0.7.12-rc.2 to a main version. | | 0.7.12-rc.2 | 2024-11-26 | [48693](https://github.com/airbytehq/airbyte/pull/48693) | Update for testing progressive rollout | | 0.7.12-rc.1 | 2024-11-25 | [48693](https://github.com/airbytehq/airbyte/pull/48693) | Update for testing progressive rollout | | 0.7.11 | 2024-11-18 | [48468](https://github.com/airbytehq/airbyte/pull/48468) | Implement File CDk |