From 2991827f9c7f52e4e9ff98a0cbda6523647b68b3 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 23 May 2024 09:15:36 -0700 Subject: [PATCH] interface changes --- .../operation/StagingStreamOperations.kt | 2 +- .../operation/AbstractStreamOperation.kt | 14 +++++++-- .../operation/StandardStreamOperation.kt | 2 +- .../destination/operation/StorageOperation.kt | 29 +++++++++++++++++-- .../operation/AbstractStreamOperationTest.kt | 22 +++++++------- 5 files changed, 51 insertions(+), 18 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt index 80f4e6c26003..95579acaf360 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/operation/StagingStreamOperations.kt @@ -33,7 +33,7 @@ class StagingStreamOperations( ) { private val log = KotlinLogging.logger {} - override fun writeRecords(streamConfig: StreamConfig, stream: Stream) { + override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream) { val writeBuffer = StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt index d3533a194bfe..e7881fc4602a 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt @@ -24,6 +24,7 @@ abstract class AbstractStreamOperation) { + // redirect to the appropriate raw table (potentially the temp raw table). + writeRecordsImpl( + streamConfig.copy(id = streamConfig.id.copy(rawName = streamConfig.id.rawName + rawTableSuffix)), + stream, + ) + } + /** Write records will be destination type specific, Insert vs staging based on format */ - abstract override fun writeRecords( + abstract fun writeRecordsImpl( streamConfig: StreamConfig, stream: Stream ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt index 06c5b8dabab5..97fdc6eb6375 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StandardStreamOperation.kt @@ -24,7 +24,7 @@ class StandardStreamOperation( destinationInitialStatus, disableTypeDedupe ) { - override fun writeRecords(streamConfig: StreamConfig, stream: Stream) { + override fun writeRecordsImpl(streamConfig: StreamConfig, stream: Stream) { storageOperation.writeToStage(streamConfig, stream) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt index bae34e2b803b..83072da6f119 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperation.kt @@ -6,7 +6,6 @@ package io.airbyte.integrations.base.destination.operation import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig import io.airbyte.integrations.base.destination.typing_deduping.StreamId -import io.airbyte.protocol.models.v0.DestinationSyncMode import java.time.Instant import java.util.Optional @@ -16,9 +15,33 @@ interface StorageOperation { */ /** - * Prepare staging area which cloud be creating any object storage, temp tables or file storage + * Prepare staging area which cloud be creating any object storage, temp tables or file storage. + * Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in conjunction + * with [overwriteStage]. */ - fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) + fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false) + + /** + * Swap the "temporary" stage into the "real" stage. For example, + * `DROP TABLE airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`. + */ + fun overwriteStage(streamId: StreamId, suffix: String) + + /** + * Copy all records from the temporary stage into the real stage, then drop the temporary stage. + * For example `INSERT INTO airbyte_internal.foo SELECT * FROM airbyte_internal.foo_tmp; DROP + * TABLE airbyte_internal.foo_tmp`. + */ + fun transferFromTempStage(streamId: StreamId, suffix: String) + + /** + * Get the generation of a single record in the stage. Not necessarily the min or max generation, + * just _any_ record. + * + * [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage + * always contains exactly one generation. + */ + fun getStageGeneration(streamId: StreamId, suffix: String): Long /** Delete previously staged data, using deterministic information from streamId. */ fun cleanupStage(streamId: StreamId) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt index 8a372b9b5dbf..0355a32b0a6c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperationTest.kt @@ -52,7 +52,7 @@ class AbstractStreamOperationTest { storageOperation, destinationInitialStatus, ) { - override fun writeRecords( + override fun writeRecordsImpl( streamConfig: StreamConfig, stream: Stream ) { @@ -97,7 +97,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.createFinalTable(streamConfig, "", false) } confirmVerified(storageOperation) @@ -144,7 +144,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) } confirmVerified(storageOperation) @@ -189,7 +189,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") // No table creation - we can just reuse the existing table. } confirmVerified(storageOperation) @@ -232,7 +232,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) } confirmVerified(storageOperation) @@ -276,7 +276,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true) } confirmVerified(storageOperation) @@ -320,7 +320,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.createFinalTable(streamConfig, "", false) } confirmVerified(storageOperation) @@ -366,7 +366,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.softResetFinalTable(streamConfig) } confirmVerified(storageOperation) @@ -410,7 +410,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") // No soft reset - we can just reuse the existing table. } confirmVerified(storageOperation) @@ -450,7 +450,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") storageOperation.softResetFinalTable(streamConfig) } confirmVerified(storageOperation) @@ -505,7 +505,7 @@ class AbstractStreamOperationTest { val streamOperations = TestStreamOperation(storageOperation, initialState) verifySequence { - storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode) + storageOperation.prepareStage(streamId, "") } confirmVerified(storageOperation)