Skip to content

Commit

Permalink
cdk-minor-refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed May 20, 2024
1 parent 471c8b3 commit d7bca46
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 20 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.7 | 2024-05-20 | [\#38357](https://github.com/airbytehq/airbyte/pull/38357) | Decouple create namespace from per stream operation interface. |
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.6
version=0.35.7
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class NoOpJdbcDestinationHandler<DestinationState>(
throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")
}

/** No-op: namespace creation through this handler is not used in old code. */
override fun createNamespaces(schemas: Set<String>) = Unit

/**
 * Always fails: this handler does not support typing deduping, so no JDBC type name is
 * produced for [airbyteType].
 *
 * @throws NotImplementedError on every call
 */
override fun toJdbcTypeName(airbyteType: AirbyteType): String =
    throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ abstract class AbstractStreamOperation<DestinationState : MinimumDestinationStat
val stream = destinationInitialStatus.streamConfig
storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
if (!disableTypeDedupe) {
storageOperation.createFinalNamespace(stream.id)
// Prepare final tables based on sync mode.
finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
private val defaultNamespace: String,
private val streamOperationFactory: StreamOperationFactory<DestinationState>,
private val migrations: List<Migration<DestinationState>>,
private val disableTypeDedupe: Boolean = false,
private val executorService: ExecutorService =
Executors.newFixedThreadPool(
10,
Expand All @@ -47,7 +48,8 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(

private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
log.info { "Preparing required schemas and tables for all streams" }
val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)
val streamConfigs = parsedCatalog.streams
val streamsInitialStates = destinationHandler.gatherInitialState(streamConfigs)

val postMigrationInitialStates =
tdutils.executeRawTableMigrations(
Expand All @@ -60,11 +62,23 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
postMigrationInitialStates.associate { it.streamConfig.id to it.destinationState }
)

// Prepare raw and final schemas
val rawNamespaces = streamConfigs.map { it.id.rawNamespace }.toSet()
val finalNamespaces = streamConfigs.map { it.id.finalNamespace }.toSet()
val allNamespaces =
if (disableTypeDedupe) rawNamespaces else rawNamespaces + finalNamespaces
destinationHandler.createNamespaces(allNamespaces)

val initializationFutures =
postMigrationInitialStates
.map {
CompletableFuture.supplyAsync(
{ Pair(it.streamConfig.id, streamOperationFactory.createInstance(it)) },
{
Pair(
it.streamConfig.id,
streamOperationFactory.createInstance(it, disableTypeDedupe)
)
},
executorService,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ interface StorageOperation<Data> {
* ==================== Final Table Operations ================================
*/

/** Create final namespace extracted from [StreamId] */
fun createFinalNamespace(streamId: StreamId)

/**
 * Create the final table for the given [streamConfig].
 *
 * @param streamConfig the stream whose final table should be created
 * @param suffix suffix for the final table name; presumably empty for the real table and
 *   non-empty for a temporary table (callers pass `""` or an overwrite suffix) — TODO confirm
 * @param replace presumably whether an already existing table should be replaced — TODO confirm
 *   against implementations
 */
fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ fun interface StreamOperationFactory<DestinationState> {
* implementation.
*/
fun createInstance(
destinationInitialStatus: DestinationInitialStatus<DestinationState>
destinationInitialStatus: DestinationInitialStatus<DestinationState>,
disableTypeDedupe: Boolean,
): StreamOperation<DestinationState>
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ interface DestinationHandler<DestinationState> {

@Throws(Exception::class)
fun commitDestinationStates(destinationStates: Map<StreamId, DestinationState>)

/**
 * Create all namespaces required for the sync. Implementations may optimize by first
 * checking whether a schema already exists.
 *
 * This lives here rather than in StorageOperation so that a create-namespace call is not
 * issued for every stream; instead it is called from the sync operation with the distinct
 * set of namespaces required within the sync.
 *
 * @param schemas the distinct namespaces to create
 */
fun createNamespaces(schemas: Set<String>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ interface SqlGenerator {
fun createTable(stream: StreamConfig, suffix: String, force: Boolean): Sql

/**
* Used to create either the airbyte_internal or final schemas if they don't exist
* TODO delete this; superseded by [DestinationHandler.createNamespaces]
*
* @param schema the schema to create
* @return SQL to create the schema if it does not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, "", false)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -143,7 +142,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -186,7 +184,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
// No table creation - we can just reuse the existing table.
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -227,7 +224,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -274,7 +270,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, EXPECTED_OVERWRITE_SUFFIX, true)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -316,7 +311,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.createFinalTable(streamConfig, "", false)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -360,7 +354,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.softResetFinalTable(streamConfig)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -402,7 +395,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
// No soft reset - we can just reuse the existing table.
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -440,7 +432,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
storageOperation.softResetFinalTable(streamConfig)
}
confirmVerified(storageOperation)
Expand Down Expand Up @@ -493,7 +484,6 @@ class AbstractStreamOperationTest {

verifySequence {
storageOperation.prepareStage(streamId, streamConfig.destinationSyncMode)
storageOperation.createFinalNamespace(streamId)
}
confirmVerified(storageOperation)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DefaultSyncOperationTest {
private val streamOperations: MutableMap<StreamConfig, StreamOperation<MockState>> =
mutableMapOf()
private val streamOperationFactory: StreamOperationFactory<MockState> =
StreamOperationFactory { initialStatus: DestinationInitialStatus<MockState> ->
StreamOperationFactory { initialStatus: DestinationInitialStatus<MockState>, _ ->
streamOperations.computeIfAbsent(initialStatus.streamConfig) {
spyk(TestStreamOperation(initialStatus.destinationState))
}
Expand Down Expand Up @@ -118,6 +118,9 @@ class DefaultSyncOperationTest {
),
),
)
destinationHandler.createNamespaces(
setOf(appendStreamConfig.id.rawNamespace, appendStreamConfig.id.finalNamespace)
)
streamOperations.values.onEach { it.updatedDestinationState }
destinationHandler.commitDestinationStates(
mapOf(
Expand Down Expand Up @@ -201,6 +204,9 @@ class DefaultSyncOperationTest {
),
),
)
destinationHandler.createNamespaces(
setOf(appendStreamConfig.id.rawNamespace, appendStreamConfig.id.finalNamespace)
)
streamOperations.values.onEach { it.updatedDestinationState }
destinationHandler.commitDestinationStates(
mapOf(
Expand Down

0 comments on commit d7bca46

Please sign in to comment.