generation id / sync id in cdk
edgao committed May 23, 2024
1 parent aef0fc7 commit b7dae03
Showing 23 changed files with 190 additions and 43 deletions.
@@ -3,7 +3,7 @@
  */
 package io.airbyte.cdk.integrations.base
 
-import java.util.*
+import java.util.Locale
 import org.apache.commons.lang3.StringUtils
 
 fun upperQuoted(column: String): String {
@@ -24,41 +24,60 @@ object JavaBaseConstants {
     const val COLUMN_NAME_DATA: String = "_airbyte_data"
     @JvmField
     val LEGACY_RAW_TABLE_COLUMNS: List<String> =
-        java.util.List.of(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
+        listOf(COLUMN_NAME_AB_ID, COLUMN_NAME_DATA, COLUMN_NAME_EMITTED_AT)
 
     // destination v2
     const val COLUMN_NAME_AB_RAW_ID: String = "_airbyte_raw_id"
     const val COLUMN_NAME_AB_LOADED_AT: String = "_airbyte_loaded_at"
     const val COLUMN_NAME_AB_EXTRACTED_AT: String = "_airbyte_extracted_at"
     const val COLUMN_NAME_AB_META: String = "_airbyte_meta"
+    const val COLUMN_NAME_AB_GENERATION_ID: String = "_airbyte_generation_id"
+
+    const val AIRBYTE_META_SYNC_ID_KEY = "sync_id"
 
     // Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
     // use this column list.
     @JvmField
     val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List<String> =
-        java.util.List.of(
+        listOf(
             COLUMN_NAME_AB_RAW_ID,
             COLUMN_NAME_AB_EXTRACTED_AT,
             COLUMN_NAME_AB_LOADED_AT,
             COLUMN_NAME_DATA,
         )
     @JvmField
     val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
-        java.util.List.of(
+        listOf(
             COLUMN_NAME_AB_RAW_ID,
             COLUMN_NAME_AB_EXTRACTED_AT,
             COLUMN_NAME_AB_LOADED_AT,
             COLUMN_NAME_DATA,
             COLUMN_NAME_AB_META,
         )
+    @JvmField
+    val V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: List<String> =
+        listOf(
+            COLUMN_NAME_AB_RAW_ID,
+            COLUMN_NAME_AB_EXTRACTED_AT,
+            COLUMN_NAME_AB_LOADED_AT,
+            COLUMN_NAME_DATA,
+            COLUMN_NAME_AB_META,
+            COLUMN_NAME_AB_GENERATION_ID,
+        )
     @JvmField
     val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
-        java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META)
+        listOf(
+            COLUMN_NAME_AB_RAW_ID,
+            COLUMN_NAME_AB_EXTRACTED_AT,
+            COLUMN_NAME_AB_META,
+            COLUMN_NAME_AB_GENERATION_ID
+        )
 
     const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
     enum class DestinationColumns(val rawColumns: List<String>) {
         V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
         V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
+        V2_WITH_GENERATION(V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION),
         LEGACY(LEGACY_RAW_TABLE_COLUMNS)
     }
 }
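
Note: the constants above drive everything else in this commit. As a rough, hypothetical sketch (not part of the commit; the SQL column types are destination-specific and made up here), a destination could derive its raw-table DDL from the new enum value:

    // Illustrative only: map each raw column to a placeholder SQL type.
    fun rawTableDdl(table: String, columns: JavaBaseConstants.DestinationColumns): String =
        columns.rawColumns.joinToString(prefix = "CREATE TABLE $table (", postfix = ")") { col ->
            val type = when (col) {
                JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
                JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT -> "TIMESTAMP"
                JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID -> "BIGINT"
                else -> "VARCHAR" // raw id, data, meta
            }
            "$col $type"
        }

    // rawTableDdl("airbyte_internal.users_raw", DestinationColumns.V2_WITH_GENERATION)
    // yields a six-column table ending in _airbyte_generation_id BIGINT.
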
@@ -57,7 +57,8 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBuffer
     protected abstract fun writeRecord(
         recordString: String,
         airbyteMetaString: String,
-        emittedAt: Long
+        generationId: Long,
+        emittedAt: Long,
     )
 
     /**
@@ -99,7 +100,12 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBuffer
     }
 
     @Throws(Exception::class)
-    override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
+    override fun accept(
+        recordString: String,
+        airbyteMetaString: String,
+        generationId: Long,
+        emittedAt: Long
+    ): Long {
         if (!isStarted) {
             if (useCompression) {
                 compressedBuffer = GzipCompressorOutputStream(byteCounter)
@@ -111,7 +117,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBuffer
         }
         if (inputStream == null && !isClosed) {
            val startCount = byteCounter.count
-            writeRecord(recordString, airbyteMetaString, emittedAt)
+            writeRecord(recordString, airbyteMetaString, generationId, emittedAt)
             return byteCounter.count - startCount
         } else {
             throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")
@@ -37,7 +37,12 @@ interface SerializableBuffer : AutoCloseable {
      * @throws Exception
      */
     @Throws(Exception::class)
-    fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long
+    fun accept(
+        recordString: String,
+        airbyteMetaString: String,
+        generationId: Long,
+        emittedAt: Long
+    ): Long
 
     /** Flush a buffer implementation. */
     @Throws(Exception::class) fun flush()
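
A hedged usage sketch of the widened interface (buffer construction elided; writeBuffer can be any SerializableBuffer implementation, and all values below are placeholders):

    val bytesWritten: Long =
        writeBuffer.accept(
            """{"id": 1}""",            // recordString
            """{"changes": []}""",      // airbyteMetaString
            42L,                        // generationId, new in this commit
            System.currentTimeMillis(), // emittedAt
        )
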
@@ -146,8 +146,10 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestinationState>
         includeCdcDeletedAt: Boolean,
         streamId: StreamId,
         suffix: String?,
-        records: List<JsonNode>
+        records: List<JsonNode>,
+        generationId: Long,
     ) {
+        // TODO handle generation ID
         val columnNames =
             if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
         insertRecords(
@@ -58,7 +58,12 @@ class AvroSerializedBuffer(
 
     @Throws(IOException::class)
     @Suppress("DEPRECATION")
-    override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
+    override fun writeRecord(
+        recordString: String,
+        airbyteMetaString: String,
+        generationId: Long,
+        emittedAt: Long
+    ) {
         // TODO Remove this double deserialization when S3 Destinations moves to Async.
         writeRecord(
             Jsons.deserialize(
@@ -28,7 +28,8 @@ abstract class BaseSheetGenerator : CsvSheetGenerator {
         id: UUID,
         formattedString: String,
         emittedAt: Long,
-        formattedAirbyteMetaString: String
+        formattedAirbyteMetaString: String,
+        generationId: Long,
     ): List<Any> {
         // TODO: Make this abstract or default if No-op is intended in NoFlatteningSheetGenerator or
         // RootLevelFlatteningSheetGenerator
@@ -68,13 +68,19 @@ class CsvSerializedBuffer(
     }
 
     @Throws(IOException::class)
-    override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
+    override fun writeRecord(
+        recordString: String,
+        airbyteMetaString: String,
+        generationId: Long,
+        emittedAt: Long
+    ) {
         csvPrinter!!.printRecord(
             csvSheetGenerator.getDataRow(
                 UUID.randomUUID(),
                 recordString,
                 emittedAt,
                 airbyteMetaString,
+                generationId,
             ),
         )
     }
@@ -26,7 +26,8 @@ interface CsvSheetGenerator {
         id: UUID,
         formattedString: String,
         emittedAt: Long,
-        formattedAirbyteMetaString: String
+        formattedAirbyteMetaString: String,
+        generationId: Long,
     ): List<Any>
 
     object Factory {
@@ -29,7 +29,7 @@ class StagingDatabaseCsvSheetGenerator
 @JvmOverloads
 constructor(
     private val destinationColumns: JavaBaseConstants.DestinationColumns =
-        JavaBaseConstants.DestinationColumns.LEGACY
+        JavaBaseConstants.DestinationColumns.LEGACY,
 ) : CsvSheetGenerator {
     override fun getHeaderRow(): List<String> {
         return destinationColumns.rawColumns
@@ -40,7 +40,9 @@ constructor(
             id,
             Jsons.serialize(recordMessage.data),
             recordMessage.emittedAt,
-            Jsons.serialize(recordMessage.meta)
+            Jsons.serialize(recordMessage.meta),
+            // Legacy code. Default to generation 0.
+            0,
         )
     }
 
@@ -52,7 +54,8 @@
         id: UUID,
         formattedString: String,
         emittedAt: Long,
-        formattedAirbyteMetaString: String
+        formattedAirbyteMetaString: String,
+        generationId: Long,
     ): List<Any> {
         return when (destinationColumns) {
             JavaBaseConstants.DestinationColumns.LEGACY ->
@@ -67,6 +70,15 @@
                 )
             JavaBaseConstants.DestinationColumns.V2_WITHOUT_META ->
                 listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString)
+            JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION ->
+                listOf(
+                    id,
+                    Instant.ofEpochMilli(emittedAt),
+                    "",
+                    formattedString,
+                    formattedAirbyteMetaString,
+                    generationId
+                )
         }
     }
 }
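
For the new V2_WITH_GENERATION branch, the emitted row lines up positionally with V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION: raw id, extracted-at, an empty loaded-at, the record JSON, the meta JSON, and the generation id. A small sketch of that pairing (all values hypothetical; requires java.util.UUID):

    val gen = StagingDatabaseCsvSheetGenerator(
        JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION,
    )
    val row = gen.getDataRow(
        UUID.randomUUID(),     // _airbyte_raw_id
        """{"id": 1}""",       // _airbyte_data
        1716422400000L,        // _airbyte_extracted_at (epoch millis)
        """{"changes": []}""", // _airbyte_meta
        42L,                   // _airbyte_generation_id
    )
    check(row.size == gen.getHeaderRow().size) // six entries each
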
@@ -60,7 +60,12 @@ class JsonLSerializedBuffer(
     }
 
     @Suppress("DEPRECATION")
-    override fun writeRecord(recordString: String, airbyteMetaString: String, emittedAt: Long) {
+    override fun writeRecord(
+        recordString: String,
+        airbyteMetaString: String,
+        generationId: Long,
+        emittedAt: Long
+    ) {
         // TODO Remove this double deserialization when S3 Destinations moves to Async.
         writeRecord(
             Jsons.deserialize(
@@ -122,7 +122,12 @@ class ParquetSerializedBuffer(
     }
 
     @Throws(Exception::class)
-    override fun accept(recordString: String, airbyteMetaString: String, emittedAt: Long): Long {
+    override fun accept(
+        recordString: String,
+        airbyteMetaString: String,
+        generationId: Long,
+        emittedAt: Long
+    ): Long {
         throw UnsupportedOperationException(
             "This method is not supported for ParquetSerializedBuffer"
         )
@@ -61,6 +61,9 @@ internal class AsyncFlush(
                 writer.accept(
                     record!!.serialized!!,
                     Jsons.serialize(record.record!!.meta),
+                    // Destinations that want to use generations should switch to the new
+                    // structure (e.g. StagingStreamOperations)
+                    0,
                     record.record!!.emittedAt
                 )
             } catch (e: Exception) {
@@ -277,7 +277,8 @@ private constructor(
             val tableName: String
             when (destinationColumns) {
                 JavaBaseConstants.DestinationColumns.V2_WITH_META,
-                JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> {
+                JavaBaseConstants.DestinationColumns.V2_WITHOUT_META,
+                JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION -> {
                     val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
                     outputSchema = streamId.rawNamespace
                     tableName = streamId.rawName
@@ -42,14 +42,15 @@ class StagingStreamOperations<DestinationState : MinimumDestinationState>(
                 it.accept(
                     record.serialized!!,
                     Jsons.serialize(record.record!!.meta),
+                    streamConfig.generationId,
                     record.record!!.emittedAt
                 )
             }
             it.flush()
             log.info {
                 "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(it.byteCount)}) to staging"
             }
-            storageOperation.writeToStage(streamConfig.id, writeBuffer)
+            storageOperation.writeToStage(streamConfig, writeBuffer)
         }
     }
 }
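
The generation now rides in per-stream config rather than per-record plumbing: the buffer write takes streamConfig.generationId, and writeToStage receives the whole StreamConfig instead of just its id. A sketch of the fields this path relies on (the real StreamConfig in the typing-deduping package has more members; this mirror is hypothetical):

    // Hypothetical mirror of the fields used above, for orientation only.
    data class StreamConfigSketch(
        val id: StreamId,        // raw/final table identifiers
        val generationId: Long,  // written to _airbyte_generation_id per record
        val syncId: Long,        // stamped into _airbyte_meta as "sync_id"
    )
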
@@ -4,6 +4,7 @@
 
 package io.airbyte.integrations.base.destination.operation
 
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY
 import io.airbyte.cdk.integrations.destination.StreamSyncSummary
 import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
 import io.airbyte.cdk.integrations.destination.operation.SyncOperation
@@ -15,6 +16,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId
 import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
 import io.airbyte.protocol.models.v0.StreamDescriptor
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.util.concurrent.CompletableFuture
@@ -102,7 +104,22 @@ class DefaultSyncOperation<DestinationState : MinimumDestinationState>(
     override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
         val streamConfig =
             parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
-        streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
+        streamOpsMap[streamConfig.id]?.writeRecords(
+            streamConfig,
+            stream.map { record ->
+                if (record.record!!.meta == null) {
+                    record.record!!.meta = AirbyteRecordMessageMeta()
+                }
+                record.also {
+                    it.record!!
+                        .meta!!
+                        .setAdditionalProperty(
+                            AIRBYTE_META_SYNC_ID_KEY,
+                            streamConfig.syncId,
+                        )
+                }
+            },
+        )
     }
 
     override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
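
flushStream now stamps the sync id onto every record's meta before the per-stream operation sees it. In isolation, the stamping step is roughly (placeholder syncId; the exact meta contents depend on the record):

    val meta = record.record!!.meta ?: AirbyteRecordMessageMeta().also { record.record!!.meta = it }
    meta.setAdditionalProperty(JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY, 1234L)
    // Jsons.serialize(meta) now includes {"sync_id":1234, ...}
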
@@ -25,6 +25,6 @@ class StandardStreamOperation<DestinationState : MinimumDestinationState>(
         disableTypeDedupe
     ) {
     override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
-        storageOperation.writeToStage(streamConfig.id, stream)
+        storageOperation.writeToStage(streamConfig, stream)
     }
 }
@@ -24,7 +24,7 @@ interface StorageOperation<Data> {
     fun cleanupStage(streamId: StreamId)
 
     /** Write data to stage. */
-    fun writeToStage(streamId: StreamId, data: Data)
+    fun writeToStage(streamConfig: StreamConfig, data: Data)
 
     /*
      * ==================== Final Table Operations ================================
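
Passing StreamConfig (rather than StreamId) through this interface is what lets stage-writing implementations read generation metadata. A minimal caller sketch (names assumed for illustration):

    fun <T> stage(op: StorageOperation<T>, cfg: StreamConfig, data: T) {
        // The generation travels inside cfg; implementations such as
        // StagingStreamOperations can read cfg.generationId when staging.
        op.writeToStage(cfg, data)
    }
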
@@ -118,15 +118,19 @@ abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> : DestinationV1V2Migrator
         if (
             !(schemaMatchesExpectation(
                 existingV2AirbyteRawTable,
-                JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META
+                JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META,
             ) ||
                 schemaMatchesExpectation(
                     existingV2AirbyteRawTable,
-                    JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES
+                    JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
+                ) ||
+                schemaMatchesExpectation(
+                    existingV2AirbyteRawTable,
+                    JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION,
                 ))
         ) {
             throw UnexpectedSchemaException(
-                "Destination V2 Raw Table does not match expected Schema"
+                "Destination V2 Raw Table does not match expected Schema",
             )
         }
     }
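
The migrator now accepts any of the three v2 column sets, so raw tables created before _airbyte_meta or _airbyte_generation_id existed no longer trip the unexpected-schema error. Reduced to a column-name set comparison (a sketch; the real schemaMatchesExpectation is dialect-specific):

    fun isPlausibleV2RawTable(actualColumns: Set<String>): Boolean =
        listOf(
                JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META,
                JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
                JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITH_GENERATION,
            )
            .any { it.toSet() == actualColumns }
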