Skip to content

Commit

Permalink
Destinations CDK: refreshes logic (#38622)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jun 17, 2024
1 parent 2432cc8 commit cb6f6ec
Show file tree
Hide file tree
Showing 15 changed files with 746 additions and 165 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.39.0
version=0.40.0
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ abstract class JdbcDestinationHandler<DestinationState>(
streamConfig,
finalTableDefinition.isPresent,
initialRawTableState,
// TODO fix this
// for now, no JDBC destinations actually do refreshes
// so this is just to make our code compile
InitialRawTableStatus(false, false, Optional.empty()),
isSchemaMismatch,
isFinalTableEmpty,
destinationState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
) {

private val log = KotlinLogging.logger {}
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
override fun writeRecordsImpl(
streamConfig: StreamConfig,
suffix: String,
stream: Stream<PartialAirbyteMessage>
) {
val writeBuffer =
StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns)

Expand All @@ -51,7 +55,7 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
"Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
}
if (it.byteCount != 0L) {
storageOperation.writeToStage(streamConfig, writeBuffer)
storageOperation.writeToStage(streamConfig, suffix, writeBuffer)
} else {
log.info { "Skipping writing to storage since there are no bytes to write" }
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
destinationInitialStatus,
disableTypeDedupe
) {
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
storageOperation.writeToStage(streamConfig, stream)
override fun writeRecordsImpl(
streamConfig: StreamConfig,
suffix: String,
stream: Stream<PartialAirbyteMessage>
) {
storageOperation.writeToStage(streamConfig, suffix, stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package io.airbyte.integrations.base.destination.operation

import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.time.Instant
import java.util.Optional

Expand All @@ -16,15 +15,44 @@ interface StorageOperation<Data> {
*/

/**
* Prepare staging area which cloud be creating any object storage, temp tables or file storage
* Prepare the staging area, which could involve creating object storage, temp tables or file storage.
* Similar to [createFinalTable], accepts a [suffix] parameter, which should be used in
* conjunction with [overwriteStage].
*
* @param replace If true, then replace existing resources (e.g. tables) with empty ones. If
* false, then leave existing resources untouched.
*/
fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean = false)

/**
* Swap the "temporary" stage into the "real" stage. For example, `DROP TABLE IF EXISTS
* airbyte_internal.foo; ALTER TABLE airbyte_internal.foo_tmp RENAME TO foo`.
*/
fun overwriteStage(streamId: StreamId, suffix: String)

/**
* Copy all records from the temporary stage into the real stage, then drop the temporary stage.
* For example `INSERT INTO airbyte_internal.foo SELECT * FROM airbyte_internal.foo_tmp; DROP
* TABLE airbyte_internal.foo_tmp`.
*/
fun transferFromTempStage(streamId: StreamId, suffix: String)

/**
* Get the generation of a single record in the stage. Not necessarily the min or max
* generation, just _any_ record.
*
* [AbstractStreamOperation] is responsible for orchestrating the stages so that the temp stage
* always contains exactly one generation.
*
* @return The generation ID of a record in the stage, or `null` if the stage is empty.
*/
fun getStageGeneration(streamId: StreamId, suffix: String): Long?

/** Delete previously staged data, using deterministic information from streamId. */
fun cleanupStage(streamId: StreamId)

/** Write data to stage. */
fun writeToStage(streamConfig: StreamConfig, data: Data)
fun writeToStage(streamConfig: StreamConfig, suffix: String, data: Data)

/*
* ==================== Final Table Operations ================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ package io.airbyte.integrations.base.destination.typing_deduping
data class DestinationInitialStatus<DestinationState>(
val streamConfig: StreamConfig,
val isFinalTablePresent: Boolean,
// TODO we should probably make this nullable, then delete InitialRawTableStatus.rawTableExists
val initialRawTableStatus: InitialRawTableStatus,
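/**
* The state of the temp raw table at the start of the sync. Note that this property is
* non-nullable: the absence of a temp raw table is presumably signalled by
* `rawTableExists = false` rather than by null (TODO confirm, or make this nullable).
*/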
val initialTempRawTableStatus: InitialRawTableStatus,
val isSchemaMismatch: Boolean,
val isFinalTableEmpty: Boolean,
val destinationState: DestinationState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ import java.util.*

/**
* Describes the state of a raw table as observed when the status was fetched. Bundles an existence
* flag, an unprocessed-records flag, and the max processed timestamp.
*/
data class InitialRawTableStatus(
// NOTE(review): presumably true iff the raw table existed when this status was fetched -
// confirm against the code that constructs this class.
val rawTableExists: Boolean,
/**
* Whether there were any records with null `_airbyte_loaded_at`, at the time that this status
* was fetched.
*/
val hasUnprocessedRecords: Boolean,
// TODO Make maxProcessedTimestamp just `Instant?` instead of Optional
/**
* The highest timestamp such that all records in `SELECT * FROM raw_table WHERE
* _airbyte_extracted_at <= ?` have a nonnull `_airbyte_loaded_at`.
*
* Destinations MAY use this value to only run T+D on records with `_airbyte_extracted_at > ?`
* (note the strictly-greater comparison).
*/
val maxProcessedTimestamp: Optional<Instant>
)
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ data class StreamId(
return "$quote$finalNamespace$quote.$quote$finalName$suffix$quote"
}

fun rawTableId(quote: String): String {
return "$quote$rawNamespace$quote.$quote$rawName$quote"
@JvmOverloads
fun rawTableId(quote: String, suffix: String = ""): String {
return "$quote$rawNamespace$quote.$quote$rawName$suffix$quote"
}

fun finalName(quote: String): String {
Expand Down
Loading

0 comments on commit cb6f6ec

Please sign in to comment.