Skip to content

Commit

Permalink
CDK: Destinations: Backport CDK fixes for redshift (#40499)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jul 1, 2024
1 parent 9221114 commit 779d363
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 45 deletions.
5 changes: 4 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| 0.40.8 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.38.3 | 2024-06-25 | [\#40499](https://github.com/airbytehq/airbyte/pull/40499) | (backport) Make JdbcDatabase SQL statement logging optional; add generation_id support to JdbcSqlGenerator |
| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| ~~0.40.6~~ | | | (this version does not exist) |
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.40.7
version=0.40.8
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
Expand Down Expand Up @@ -50,7 +51,8 @@ abstract class JdbcDestinationHandler<DestinationState>(
protected val catalogName: String?,
protected val jdbcDatabase: JdbcDatabase,
protected val rawTableNamespace: String,
private val dialect: SQLDialect
private val dialect: SQLDialect,
private val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
) : DestinationHandler<DestinationState> {
protected val dslContext: DSLContext
get() = DSL.using(dialect)
Expand Down Expand Up @@ -363,6 +365,14 @@ abstract class JdbcDestinationHandler<DestinationState>(
)
}

protected open fun isAirbyteGenerationColumnMatch(existingTable: TableDefinition): Boolean {
return toJdbcTypeName(AirbyteProtocolType.INTEGER)
.equals(
existingTable.columns.getValue(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID).type,
ignoreCase = true,
)
}

open protected fun existingSchemaMatchesStreamConfig(
stream: StreamConfig?,
existingTable: TableDefinition
Expand All @@ -375,7 +385,11 @@ abstract class JdbcDestinationHandler<DestinationState>(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
) && isAirbyteExtractedAtColumnMatch(existingTable)) ||
!(existingTable.columns.containsKey(JavaBaseConstants.COLUMN_NAME_AB_META) &&
isAirbyteMetaColumnMatch(existingTable))
isAirbyteMetaColumnMatch(existingTable)) ||
(columns == DestinationColumns.V2_WITH_GENERATION &&
!(existingTable.columns.containsKey(
JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
) && isAirbyteGenerationColumnMatch(existingTable)))
) {
// Missing AB meta columns from final table, we need them to do proper T+D so trigger
// soft-reset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
Expand All @@ -23,12 +24,11 @@ import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.sql.Timestamp
import java.time.Instant
import java.util.*
import kotlin.Any
import kotlin.Boolean
import kotlin.IllegalArgumentException
import java.util.Locale
import java.util.Optional
import kotlin.Int
import org.jooq.Condition
import org.jooq.CreateTableColumnStep
import org.jooq.DSLContext
import org.jooq.DataType
import org.jooq.Field
Expand All @@ -37,6 +37,7 @@ import org.jooq.Name
import org.jooq.Record
import org.jooq.SQLDialect
import org.jooq.SelectConditionStep
import org.jooq.SelectFieldOrAsterisk
import org.jooq.conf.ParamType
import org.jooq.impl.DSL
import org.jooq.impl.SQLDataType
Expand All @@ -45,7 +46,9 @@ abstract class JdbcSqlGenerator
@JvmOverloads
constructor(
protected val namingTransformer: NamingConventionTransformer,
private val cascadeDrop: Boolean = false
private val cascadeDrop: Boolean = false,
@VisibleForTesting
internal val columns: DestinationColumns = DestinationColumns.V2_WITH_GENERATION,
) : SqlGenerator {
protected val cdcDeletedAtColumn: ColumnId = buildColumnId("_ab_cdc_deleted_at")

Expand Down Expand Up @@ -199,6 +202,9 @@ constructor(
SQLDataType.VARCHAR(36).nullable(false)
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT] =
timestampWithTimeZoneType.nullable(false)
if (columns == DestinationColumns.V2_WITH_GENERATION) {
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID] = SQLDataType.BIGINT
}
if (includeMetaColumn)
metaColumns[JavaBaseConstants.COLUMN_NAME_AB_META] = structType.nullable(false)
return metaColumns
Expand Down Expand Up @@ -332,38 +338,50 @@ constructor(
rawTableName: Name,
namespace: String,
tableName: String
) =
dslContext
.createTable(rawTableName)
.column(
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
SQLDataType.VARCHAR(36).nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
timestampWithTimeZoneType.nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
timestampWithTimeZoneType.nullable(true),
)
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
.`as`(
DSL.select(
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
DSL.cast(null, timestampWithTimeZoneType)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
)
.from(DSL.table(DSL.name(namespace, tableName))),
): String {
val hasGenerationId = columns == DestinationColumns.V2_WITH_GENERATION

val createTable: CreateTableColumnStep =
dslContext
.createTable(rawTableName)
.column(
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
SQLDataType.VARCHAR(36).nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
timestampWithTimeZoneType.nullable(false),
)
.column(
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
timestampWithTimeZoneType.nullable(true),
)
.column(JavaBaseConstants.COLUMN_NAME_DATA, structType.nullable(false))
.column(JavaBaseConstants.COLUMN_NAME_AB_META, structType.nullable(true))
if (hasGenerationId) {
createTable.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
}

val selectColumns: MutableList<SelectFieldOrAsterisk> =
mutableListOf(
DSL.field(JavaBaseConstants.COLUMN_NAME_AB_ID)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID),
DSL.field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT),
DSL.cast(null, timestampWithTimeZoneType)
.`as`(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT),
DSL.field(JavaBaseConstants.COLUMN_NAME_DATA)
.`as`(JavaBaseConstants.COLUMN_NAME_DATA),
DSL.cast(null, structType).`as`(JavaBaseConstants.COLUMN_NAME_AB_META),
)
if (hasGenerationId) {
selectColumns += DSL.value(0).`as`(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
}

return createTable
.`as`(DSL.select(selectColumns).from(DSL.table(DSL.name(namespace, tableName))))
.getSQL(ParamType.INLINED)
}

override fun clearLoadedAt(streamId: StreamId): Sql {
return of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA
import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT
import io.airbyte.cdk.integrations.base.JavaBaseConstants.DestinationColumns
import io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
Expand Down Expand Up @@ -90,16 +92,18 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina

@Throws(Exception::class)
override fun createRawTable(streamId: StreamId) {
database.execute(
val columns =
dslContext
.createTable(DSL.name(streamId.rawNamespace, streamId.rawName))
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
.column(COLUMN_NAME_AB_EXTRACTED_AT, timestampWithTimeZoneType.nullable(false))
.column(COLUMN_NAME_AB_LOADED_AT, timestampWithTimeZoneType)
.column(COLUMN_NAME_DATA, structType.nullable(false))
.column(COLUMN_NAME_AB_META, structType.nullable(true))
.getSQL(ParamType.INLINED)
)
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
columns.column(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, SQLDataType.BIGINT)
}
database.execute(columns.getSQL(ParamType.INLINED))
}

@Throws(Exception::class)
Expand All @@ -118,7 +122,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
public override fun insertRawTableRecords(streamId: StreamId, records: List<JsonNode>) {
insertRecords(
DSL.name(streamId.rawNamespace, streamId.rawName),
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
sqlGenerator.columns.rawColumns,
records,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META
Expand All @@ -143,9 +147,12 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
records: List<JsonNode>,
generationId: Long,
) {
// TODO handle generation ID
val columnNames =
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
(if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES)
.toMutableList()
if (sqlGenerator.columns == DestinationColumns.V2_WITH_GENERATION) {
columnNames += COLUMN_NAME_AB_GENERATION_ID
}
insertRecords(
DSL.name(streamId.finalNamespace, streamId.finalName + suffix),
columnNames,
Expand Down

0 comments on commit 779d363

Please sign in to comment.