Skip to content

Commit

Permalink
refactor: use import type to select Iceberg writer (#48621)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev authored Nov 22, 2024
1 parent a1cab9a commit 9041bca
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ class IcebergStreamLoader(
totalSizeBytes: Long
): Batch {
icebergTableWriterFactory
.create(table = table, generationId = icebergUtil.constructGenerationIdSuffix(stream))
.create(
table = table,
generationId = icebergUtil.constructGenerationIdSuffix(stream),
importType = stream.importType
)
.use { writer ->
log.info { "Writing records to branch $stagingBranchName" }
records.forEach { record ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
import io.airbyte.cdk.load.command.ImportType
import io.airbyte.cdk.load.command.Overwrite
import jakarta.inject.Singleton
import java.util.UUID
import org.apache.iceberg.FileFormat
Expand All @@ -30,9 +34,11 @@ class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
* Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table].
*
* @param table An Iceberg [Table]
* @param generationId ID assigned to the data generation associated with the incoming data.
* @param importType The [ImportType] of the sync job.
* @return The [BaseTaskWriter] that writes records to the target [Table].
*/
fun create(table: Table, generationId: String): BaseTaskWriter<Record> {
fun create(table: Table, generationId: String, importType: ImportType): BaseTaskWriter<Record> {
icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId)
val format =
FileFormat.valueOf(
Expand All @@ -52,23 +58,25 @@ class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) {
WRITE_TARGET_FILE_SIZE_BYTES,
WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
)
return if (identifierFieldIds == null || identifierFieldIds.isEmpty()) {
newAppendWriter(
table = table,
appenderFactory = appenderFactory,
targetFileSize = targetFileSize,
outputFileFactory = outputFileFactory,
format = format
)
} else {
newDeltaWriter(
table = table,
identifierFieldIds = identifierFieldIds,
appenderFactory = appenderFactory,
targetFileSize = targetFileSize,
outputFileFactory = outputFileFactory,
format = format
)
return when (importType) {
is Append,
Overwrite ->
newAppendWriter(
table = table,
appenderFactory = appenderFactory,
targetFileSize = targetFileSize,
outputFileFactory = outputFileFactory,
format = format
)
is Dedupe ->
newDeltaWriter(
table = table,
identifierFieldIds = identifierFieldIds,
appenderFactory = appenderFactory,
targetFileSize = targetFileSize,
outputFileFactory = outputFileFactory,
format = format
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.iceberg.v2.io

import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
import io.mockk.every
import io.mockk.mockk
import java.nio.ByteBuffer
Expand Down Expand Up @@ -70,7 +72,16 @@ internal class IcebergTableWriterFactoryTest {
}

val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil)
val writer = factory.create(table = table, generationId = generationIdSuffix)
val writer =
factory.create(
table = table,
generationId = generationIdSuffix,
importType =
Dedupe(
primaryKey = listOf(primaryKeyIds.map { it.toString() }),
cursor = primaryKeyIds.map { it.toString() }
)
)
assertNotNull(writer)
assertEquals(PartitionedDeltaWriter::class.java, writer.javaClass)
}
Expand Down Expand Up @@ -120,7 +131,15 @@ internal class IcebergTableWriterFactoryTest {

val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil)
val writer =
factory.create(table = table, generationId = "ab-generation-id-${Random.nextLong(100)}")
factory.create(
table = table,
generationId = "ab-generation-id-${Random.nextLong(100)}",
importType =
Dedupe(
primaryKey = listOf(primaryKeyIds.map { it.toString() }),
cursor = primaryKeyIds.map { it.toString() }
)
)
assertNotNull(writer)
assertEquals(UnpartitionedDeltaWriter::class.java, writer.javaClass)
}
Expand Down Expand Up @@ -169,7 +188,11 @@ internal class IcebergTableWriterFactoryTest {

val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil)
val writer =
factory.create(table = table, generationId = "ab-generation-id-${Random.nextLong(100)}")
factory.create(
table = table,
generationId = "ab-generation-id-${Random.nextLong(100)}",
importType = Append
)
assertNotNull(writer)
assertEquals(PartitionedAppendWriter::class.java, writer.javaClass)
}
Expand Down Expand Up @@ -218,7 +241,11 @@ internal class IcebergTableWriterFactoryTest {

val factory = IcebergTableWriterFactory(icebergUtil = icebergUtil)
val writer =
factory.create(table = table, generationId = "ab-generation-id-${Random.nextLong(100)}")
factory.create(
table = table,
generationId = "ab-generation-id-${Random.nextLong(100)}",
importType = Append
)
assertNotNull(writer)
assertEquals(UnpartitionedAppendWriter::class.java, writer.javaClass)
}
Expand Down

0 comments on commit 9041bca

Please sign in to comment.