Destinations CDK: generation_id/sync_id plumbing #38358

Merged 1 commit on May 29, 2024
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.36.0 | 2024-05-29 | [\#38358](https://github.com/airbytehq/airbyte/pull/38358) | Plumb generation_id / sync_id to destinations code |
| 0.35.14 | 2024-05-28 | [\#38738](https://github.com/airbytehq/airbyte/pull/38738) | make ThreadCreationInfo cast as nullable |
| 0.35.13 | 2024-05-28 | [\#38632](https://github.com/airbytehq/airbyte/pull/38632) | minor changes to allow conversion of snowflake tests to kotlin |
| 0.35.12 | 2024-05-23 | [\#38638](https://github.com/airbytehq/airbyte/pull/38638) | Minor change to support Snowflake conversion to Kotlin |
@@ -3,7 +3,7 @@
*/
package io.airbyte.cdk.integrations.base

-import java.util.*
+import java.util.Locale
import org.apache.commons.lang3.StringUtils

fun upperQuoted(column: String): String {
@@ -24,41 +24,60 @@ object JavaBaseConstants {
const val COLUMN_NAME_DATA: String = "_airbyte_data"
@JvmField
val LEGACY_RAW_TABLE_COLUMNS: List<String> =
-java.util.List.of(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
+listOf(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)

// destination v2
const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
const val COLUMN_NAME_AB_LOADED_AT: String = "_airbyte_loaded_at"
const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
+const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"

+const val AIRBYTE_META_SYNC_ID_KEY = "sync_id"

// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
// use this column list.
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List<String> =
-java.util.List.of(
+listOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
)
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
-java.util.List.of(
+listOf(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META,
)
+@JvmField
+val V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: List<String> =
+listOf(
+COLUMN_NAME_AB_RAW_ID,
+COLUMN_NAME_AB_EXTRACTED_AT,
+COLUMN_NAME_AB_LOADED_AT,
+COLUMN_NAME_DATA,
+COLUMN_NAME_AB_META,
+COLUMN_NAME_AB_GENERATION_ID,
+)
@JvmField
val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
-java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META)
+listOf(
+COLUMN_NAME_AB_RAW_ID,
+COLUMN_NAME_AB_EXTRACTED_AT,
+COLUMN_NAME_AB_META,
+COLUMN_NAME_AB_GENERATION_ID
+)

const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
enum class DestinationColumns(val rawColumns: List<String>) {
V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
+V2_WITH_GENERATION(V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION),
Contributor: I wasn't expecting this enum to become useful this early!
LEGACY(LEGACY_RAW_TABLE_COLUMNS)
}
}
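As a rough illustration of how a connector might pick a column set from the DestinationColumns enum above (a minimal sketch; the supportsRefreshes flag is an assumption for illustration, not part of this PR):

    import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns

    // Hypothetical selection logic: destinations that understand generations opt
    // into the new raw-table layout; everything else keeps the existing v2 layout.
    fun chooseRawColumns(supportsRefreshes: Boolean): DestinationColumns =
        if (supportsRefreshes) DestinationColumns.V2_WITH_GENERATION
        else DestinationColumns.V2_WITH_META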
@@ -57,7 +57,8 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBuffer
protected abstract fun writeRecord(
recordString: String,
airbyteMetaString: String,
-emittedAt: Long
+generationId: Long,
+emittedAt: Long,
)

/**
@@ -99,7 +100,12 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBuffer
}

@Throws(Exception::class)
-override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
+override fun accept(
+recordString: String,
+airbyteMetaString: String,
+generationId: Long,
+emittedAt: Long
+): Long {
if (!isStarted) {
if (useCompression) {
compressedBuffer = GzipCompressorOutputStream(byteCounter)
@@ -111,7 +117,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBuffer
}
if (inputStream == null && !isClosed) {
val startCount = byteCounter.count
-writeRecord(recordString, airbyteMetaString, emittedAt)
+writeRecord(recordString, airbyteMetaString, generationId, emittedAt)
return byteCounter.count - startCount
} else {
throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")
@@ -37,7 +37,12 @@ interface SerializableBuffer : AutoCloseable {
* @throws Exception
*/
@Throws(Exception::class)
-fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long
+fun accept(
+recordString: String,
+airbyteMetaString: String,
+generationId: Long,
+emittedAt: Long
+): Long

/** Flush a buffer implementation. */
@Throws(Exception::class) fun flush()
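From the caller's side, the widened contract looks like this (a sketch assuming the CDK's record_buffer package path; all names besides accept are illustrative):

    import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer

    // Callers now thread the stream's generation ID through to the buffer,
    // alongside the serialized record and the serialized _airbyte_meta.
    fun writeOne(
        buffer: SerializableBuffer,
        serializedRecord: String,
        serializedMeta: String,
        generationId: Long,
        emittedAtMillis: Long,
    ): Long = buffer.accept(serializedRecord, serializedMeta, generationId, emittedAtMillis)

The return value is still the number of bytes the record added to the buffer, unchanged from the old contract.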
@@ -1 +1 @@
-version=0.35.14
+version=0.36.0
@@ -140,8 +140,10 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestinationState>
includeCdcDeletedAt: Boolean,
streamId: StreamId,
suffix: String?,
-records: List<JsonNode>
+records: List<JsonNode>,
+generationId: Long,
) {
+// TODO handle generation ID
Contributor (author): @gisripa I'm leaving the JDBC SqlGenerator integration test unimplemented for now. It's probably (?) pattern-matchable from https://github.com/airbytehq/airbyte/pull/38359/files#diff-39dbfe0f01e0b6a3efd7e11401344874227b2e30e04229391b38cbb9893913b2

Contributor: Ack, seems similar. For now it's just a no-op for old JDBC connectors, right?

Contributor (author): Yep. In theory this means they'll continue to build and test successfully, though I haven't validated that in any way.
val columnNames =
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
insertRecords(
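A hedged sketch of where that TODO could go, following the pattern the author points to in #38359 (the helper below is invented for illustration and is not code from either PR):

    import io.airbyte.cdk.integrations.base.JavaBaseConstants

    // Hypothetical helper: when the test harness inserts records, the generation
    // column would join the column list, with the test's generationId repeated
    // as the value for every inserted row.
    fun withGenerationColumn(columnNames: List<String>): List<String> =
        columnNames + JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID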
@@ -58,7 +58,12 @@ class AvroSerializedBuffer(

@Throws(IOException::class)
@Suppress("DEPRECATION")
-override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
+override fun writeRecord(
+recordString: String,
+airbyteMetaString: String,
+generationId: Long,
+emittedAt: Long
+) {
// TODO Remove this double deserialization when S3 Destinations moves to Async.
writeRecord(
Jsons.deserialize(
@@ -28,7 +28,8 @@ abstract class BaseSheetGenerator : CsvSheetGenerator {
id: UUID,
formattedString: String,
emittedAt: Long,
-formattedAirbyteMetaString: String
+formattedAirbyteMetaString: String,
+generationId: Long,
): List<Any> {
// TODO: Make this abstract or default if No-op is intended in NoFlatteningSheetGenerator or
// RootLevelFlatteningSheetGenerator
@@ -68,13 +68,19 @@ class CsvSerializedBuffer(
}

@Throws(IOException::class)
-override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
+override fun writeRecord(
+recordString: String,
+airbyteMetaString: String,
+generationId: Long,
+emittedAt: Long
+) {
csvPrinter!!.printRecord(
csvSheetGenerator.getDataRow(
UUID.randomUUID(),
recordString,
emittedAt,
airbyteMetaString,
+generationId,
),
)
}
@@ -26,7 +26,8 @@ interface CsvSheetGenerator {
id: UUID,
formattedString: String,
emittedAt: Long,
-formattedAirbyteMetaString: String
+formattedAirbyteMetaString: String,
+generationId: Long,
): List<Any>

object Factory {
@@ -29,7 +29,7 @@ class StagingDatabaseCsvSheetGenerator
@JvmOverloads
constructor(
private val destinationColumns: JavaBaseConstants.DestinationColumns =
-JavaBaseConstants.DestinationColumns.LEGACY
+JavaBaseConstants.DestinationColumns.LEGACY,
) : CsvSheetGenerator {
override fun getHeaderRow(): List<String> {
return destinationColumns.rawColumns
@@ -40,7 +40,9 @@ constructor(
id,
Jsons.serialize(recordMessage.data),
recordMessage.emittedAt,
-Jsons.serialize(recordMessage.meta)
+Jsons.serialize(recordMessage.meta),
+// Legacy code. Default to generation 0.
+0,
)
}

@@ -52,7 +54,8 @@ constructor(
id: UUID,
formattedString: String,
emittedAt: Long,
-formattedAirbyteMetaString: String
+formattedAirbyteMetaString: String,
+generationId: Long,
): List<Any> {
return when (destinationColumns) {
JavaBaseConstants.DestinationColumns.LEGACY ->
@@ -67,6 +70,15 @@ constructor(
)
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META ->
listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString)
+JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION ->
+listOf(
+id,
+Instant.ofEpochMilli(emittedAt),
+"",
+formattedString,
+formattedAirbyteMetaString,
+generationId
+)
}
}
}
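To make the V2_WITH_GENERATION branch above concrete, this is roughly the row it yields, in V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION order (a sketch; all values invented):

    import java.time.Instant
    import java.util.UUID

    // Illustrative only: a raw-table row under V2_WITH_GENERATION. Column order:
    // _airbyte_raw_id, _airbyte_extracted_at, _airbyte_loaded_at, _airbyte_data,
    // _airbyte_meta, _airbyte_generation_id.
    fun exampleRow(): List<Any> =
        listOf(
            UUID.randomUUID(),                    // _airbyte_raw_id
            Instant.ofEpochMilli(1716940800000L), // _airbyte_extracted_at
            "",                                   // _airbyte_loaded_at: empty until typing/deduping loads it
            """{"user_id":42}""",                 // _airbyte_data
            """{"sync_id":1234,"changes":[]}""",  // _airbyte_meta
            3L,                                   // _airbyte_generation_id
        )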
@@ -60,7 +60,12 @@ class JsonLSerializedBuffer(
}

@Suppress("DEPRECATION")
-override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
+override fun writeRecord(
+recordString: String,
+airbyteMetaString: String,
+generationId: Long,
+emittedAt: Long
+) {
// TODO Remove this double deserialization when S3 Destinations moves to Async.
writeRecord(
Jsons.deserialize(
@@ -118,7 +118,12 @@ class ParquetSerializedBuffer(
}

@Throws(Exception::class)
-override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
+override fun accept(
+recordString: String,
+airbyteMetaString: String,
+generationId: Long,
+emittedAt: Long
+): Long {
throw UnsupportedOperationException(
"This method is not supported for ParquetSerializedBuffer"
)
@@ -61,6 +61,9 @@ internal class AsyncFlush(
writer.accept(
record.serialized!!,
Jsons.serialize(record.record!!.meta),
+// Destinations that want to use generations should switch to the new
+// structure (e.g. StagingStreamOperations)
+0,
record.record!!.emittedAt
)
} catch (e: Exception) {
@@ -275,7 +275,8 @@ private constructor(
val tableName: String
when (destinationColumns) {
JavaBaseConstants.DestinationColumns.V2_WITH_META,
-JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> {
+JavaBaseConstants.DestinationColumns.V2_WITHOUT_META,
+JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> {
val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
outputSchema = streamId.rawNamespace
tableName = streamId.rawName
@@ -42,14 +42,15 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
it.accept(
record.serialized!!,
Jsons.serialize(record.record!!.meta),
+streamConfig.generationId,
record.record!!.emittedAt
)
}
it.flush()
log.info {
"Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
}
-storageOperation.writeToStage(streamConfig.id, writeBuffer)
+storageOperation.writeToStage(streamConfig, writeBuffer)
}
}
}
@@ -4,6 +4,7 @@

package io.airbyte.integrations.base.destination.operation

+import io.airbyte.cdk.integrations.base.JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
@@ -15,6 +16,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.CompletableFuture
@@ -102,7 +104,22 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
val streamConfig =
parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
-streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
+streamOpsMap[streamConfig.id]?.writeRecords(
+streamConfig,
+stream.map { record ->
+if (record.record!!.meta == null) {
Contributor: Do we really want to modify the record object? That's a lot of !!.

Contributor (author): I think modifying the object makes sense: most destinations are just going to Jsons.serialize(record.record!!.meta), so this is the best place to put logic that's identical across destinations. Otherwise we'd need to inject syncId down through multiple layers, just so that each implementation can do Jsons.serialize(meta.add("sync_id", syncId)).

That said, I have some opinions about:

• passing around AirbyteMessage instead of AirbyteRecordMessage
• directly using the protocol object (i.e. why are these fields nullable to begin with)
+record.record!!.meta = AirbyteRecordMessageMeta()
+}
+record.also {
+it.record!!
+.meta!!
+.setAdditionalProperty(
+AIRBYTE_META_SYNC_ID_KEY,
+streamConfig.syncId,
+)
+}
+},
+)
}

override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
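Isolated from the stream plumbing, the meta mutation above boils down to something like this (a sketch; setAdditionalProperty and AIRBYTE_META_SYNC_ID_KEY are the pieces actually used in the diff):

    import io.airbyte.cdk.integrations.base.JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY
    import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta

    // Ensure every record's _airbyte_meta exists and carries the sync ID, so each
    // destination can simply serialize the meta object with no extra plumbing.
    fun injectSyncId(meta: AirbyteRecordMessageMeta?, syncId: Long): AirbyteRecordMessageMeta =
        (meta ?: AirbyteRecordMessageMeta()).also {
            it.setAdditionalProperty(AIRBYTE_META_SYNC_ID_KEY, syncId)
        }

A record with no prior meta would then serialize as {"sync_id":1234} (value invented).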
@@ -25,6 +25,6 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
disableTypeDedupe
) {
override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
-storageOperation.writeToStage(streamConfig.id, stream)
+storageOperation.writeToStage(streamConfig, stream)
}
}
@@ -24,7 +24,7 @@ interface StorageOperation<Data> {
fun cleanupStage(streamId: StreamId)

/** Write data to stage. */
-fun writeToStage(streamId: StreamId, data: Data)
+fun writeToStage(streamConfig: StreamConfig, data: Data)

/*
* ==================== Final Table Operations ================================
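The change from StreamId to StreamConfig reads as a small rename, but it widens what implementations can see: the full stream configuration, including the new generation ID. A hedged sketch (the StagedChunk type and the class are invented for illustration):

    import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig

    // Invented Data type for the example.
    class StagedChunk(val bytes: ByteArray)

    // Sketch of an implementation under the new signature: StreamConfig exposes
    // generationId (and syncId) to the staging layer without extra plumbing.
    class LoggingStorageOperation {
        fun writeToStage(streamConfig: StreamConfig, data: StagedChunk) {
            println(
                "staging ${data.bytes.size} bytes for ${streamConfig.id} " +
                    "(generation ${streamConfig.generationId})"
            )
        }
    }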
@@ -115,15 +115,19 @@ abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> : DestinationV1V2Migrator
if (
!(schemaMatchesExpectation(
existingV2AirbyteRawTable,
-JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META
+JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META,
) ||
schemaMatchesExpectation(
existingV2AirbyteRawTable,
-JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES
+JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
+) ||
+schemaMatchesExpectation(
+existingV2AirbyteRawTable,
+JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION,
))
) {
throw UnexpectedSchemaException(
"Destination V2 Raw Table does not match expected Schema"
"Destination V2 Raw Table does not match expected Schema",
)
}
}