Destination S3 Data Lake: move per-stream setup into start() (#53172)
edgao authored Feb 7, 2025
1 parent 4729319 commit 04bbe45
Showing 6 changed files with 55 additions and 47 deletions.
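In short: everything that must happen once per stream before records can be written (resolving catalog properties, creating or loading the Iceberg table, applying schema changes, creating the staging branch) moves out of S3DataLakeWriter.createStreamLoader() and into the new suspending S3DataLakeStreamLoader.start(), so the factory method stays cheap and the heavy I/O runs inside the loader's own lifecycle. Below is a minimal, self-contained sketch of that pattern; the shapes loosely mirror the CDK types visible in this diff, but every name and body in the sketch is illustrative, not the real CDK API.

import kotlinx.coroutines.runBlocking

// Illustrative stand-ins only; the real StreamLoader/DestinationWriter interfaces live in the Airbyte load CDK.
interface SketchStreamLoader {
    suspend fun start()                           // per-stream setup hook (new home for the table work)
    suspend fun close(streamFailure: Throwable? = null)
}

class SketchLoader(private val streamName: String) : SketchStreamLoader {
    private lateinit var table: String            // stands in for org.apache.iceberg.Table

    override suspend fun start() {
        // Heavy, failure-prone setup now happens here instead of in the writer's factory method:
        // create or load the table, sync the schema, create the staging branch.
        table = "iceberg_table_for_$streamName"
        println("prepared $table")
    }

    override suspend fun close(streamFailure: Throwable?) {
        println("closing $table (failure=$streamFailure)")
    }
}

class SketchWriter {
    // After this change the factory stays cheap: construct the loader, defer all I/O to start().
    fun createStreamLoader(streamName: String): SketchStreamLoader = SketchLoader(streamName)
}

fun main() = runBlocking {
    val loader = SketchWriter().createStreamLoader("test_stream")
    loader.start()                                // setup runs per stream, inside the loader lifecycle
    loader.close(streamFailure = null)
}

In the real connector the CDK drives this sequence; the unit-test changes further down in this commit call start() explicitly for the same reason.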
@@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
- dockerImageTag: 0.3.5
+ dockerImageTag: 0.3.6
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
@@ -6,7 +6,7 @@ package io.airbyte.integrations.destination.s3_data_lake

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.command.DestinationStream
- import io.airbyte.cdk.load.data.MapperPipeline
+ import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory
import io.airbyte.cdk.load.message.Batch
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.SimpleBatch
@@ -18,17 +18,50 @@ import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.iceberg.Table

+ private val logger = KotlinLogging.logger {}
+
@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
class S3DataLakeStreamLoader(
+ private val icebergConfiguration: S3DataLakeConfiguration,
override val stream: DestinationStream,
- private val table: Table,
+ private val s3DataLakeTableSynchronizer: S3DataLakeTableSynchronizer,
private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
private val s3DataLakeUtil: S3DataLakeUtil,
- private val pipeline: MapperPipeline,
private val stagingBranchName: String,
private val mainBranchName: String
) : StreamLoader {
- private val log = KotlinLogging.logger {}
+ private lateinit var table: Table
+ private val pipeline = IcebergParquetPipelineFactory().create(stream)
+
+ @SuppressFBWarnings(
+ "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
+ "something about the `table` lateinit var is confusing spotbugs"
+ )
+ override suspend fun start() {
+ val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration)
+ val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
+ val incomingSchema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
+ table =
+ s3DataLakeUtil.createTable(
+ streamDescriptor = stream.descriptor,
+ catalog = catalog,
+ schema = incomingSchema,
+ properties = properties
+ )
+
+ s3DataLakeTableSynchronizer.applySchemaChanges(table, incomingSchema)
+
+ try {
+ logger.info {
+ "maybe creating branch $DEFAULT_STAGING_BRANCH for stream ${stream.descriptor}"
+ }
+ table.manageSnapshots().createBranch(DEFAULT_STAGING_BRANCH).commit()
+ } catch (e: IllegalArgumentException) {
+ logger.info {
+ "branch $DEFAULT_STAGING_BRANCH already exists for stream ${stream.descriptor}"
+ }
+ }
+ }

override suspend fun processRecords(
records: Iterator<DestinationRecordAirbyteValue>,
@@ -42,7 +75,7 @@
importType = stream.importType
)
.use { writer ->
log.info { "Writing records to branch $stagingBranchName" }
logger.info { "Writing records to branch $stagingBranchName" }
records.forEach { record ->
val icebergRecord =
s3DataLakeUtil.toRecord(
@@ -64,7 +97,7 @@
writeResult.dataFiles().forEach { append.appendFile(it) }
append.commit()
}
log.info { "Finished writing records to $stagingBranchName" }
logger.info { "Finished writing records to $stagingBranchName" }
}

return SimpleBatch(Batch.State.COMPLETE)
@@ -74,12 +107,12 @@
if (streamFailure == null) {
// Doing it first to make sure that data coming in the current batch is written to the
// main branch
- log.info {
+ logger.info {
"No stream failure detected. Committing changes from staging branch '$stagingBranchName' to main branch '$mainBranchName."
}
table.manageSnapshots().fastForwardBranch(mainBranchName, stagingBranchName).commit()
if (stream.minimumGenerationId > 0) {
- log.info {
+ logger.info {
"Detected a minimum generation ID (${stream.minimumGenerationId}). Preparing to delete obsolete generation IDs."
}
val generationIdsToDelete =
@@ -93,7 +126,7 @@
generationIdsToDelete
)
// Doing it again to push the deletes from the staging to main branch
- log.info {
+ logger.info {
"Deleted obsolete generation IDs up to ${stream.minimumGenerationId - 1}. " +
"Pushing these updates to the '$mainBranchName' branch."
}
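One detail worth noting in the new start() above: branch creation is written as create-if-absent, since start() runs on every sync and the staging branch may already exist on the table. The Iceberg call shown in the diff, manageSnapshots().createBranch(...).commit(), is caught as IllegalArgumentException when the branch already exists, and the handler downgrades that case to a log line. A stripped-down sketch of the same pattern, with a plain set standing in for the table's refs and an assumed branch name:

// Hypothetical, simplified model of "create the staging branch only if it does not exist yet".
fun ensureBranch(existingRefs: MutableSet<String>, branch: String) {
    try {
        // Mirrors createBranch(...).commit(), which fails when the ref is already present.
        require(existingRefs.add(branch)) { "Ref $branch already exists" }
        println("maybe creating branch $branch -> created")
    } catch (e: IllegalArgumentException) {
        println("branch $branch already exists")
    }
}

fun main() {
    val refs = mutableSetOf("main")
    ensureBranch(refs, "airbyte_staging")   // first sync: creates the branch
    ensureBranch(refs, "airbyte_staging")   // later syncs or retries: no-op
}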
@@ -7,17 +7,13 @@ package io.airbyte.integrations.destination.s3_data_lake
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
- import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
- import io.github.oshai.kotlinlogging.KotlinLogging
import javax.inject.Singleton
import org.apache.iceberg.catalog.TableIdentifier

- private val logger = KotlinLogging.logger {}
-
@Singleton
class S3DataLakeWriter(
private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
@@ -60,37 +56,12 @@ class S3DataLakeWriter(
}

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
- val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration)
- val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties)
- val pipeline = IcebergParquetPipelineFactory().create(stream)
- val incomingSchema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
- val table =
- s3DataLakeUtil.createTable(
- streamDescriptor = stream.descriptor,
- catalog = catalog,
- schema = incomingSchema,
- properties = properties
- )
-
- s3DataLakeTableSynchronizer.applySchemaChanges(table, incomingSchema)
-
- try {
- logger.info {
- "maybe creating branch $DEFAULT_STAGING_BRANCH for stream ${stream.descriptor}"
- }
- table.manageSnapshots().createBranch(DEFAULT_STAGING_BRANCH).commit()
- } catch (e: IllegalArgumentException) {
- logger.info {
- "branch $DEFAULT_STAGING_BRANCH already exists for stream ${stream.descriptor}"
- }
- }
-
return S3DataLakeStreamLoader(
- stream = stream,
- table = table,
- s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
- s3DataLakeUtil = s3DataLakeUtil,
- pipeline = pipeline,
+ icebergConfiguration,
+ stream,
+ s3DataLakeTableSynchronizer,
+ s3DataLakeTableWriterFactory,
+ s3DataLakeUtil,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = icebergConfiguration.icebergCatalogConfiguration.mainBranchName,
)
@@ -162,7 +162,7 @@ abstract class S3DataLakeWriteTest(
fun testDedupNullPk() {
val failure = expectFailure {
runSync(
- configContents,
+ updatedConfig,
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Dedupe(primaryKey = listOf(listOf("id")), cursor = emptyList()),
@@ -31,6 +31,7 @@ import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import io.mockk.verify
+ import kotlinx.coroutines.runBlocking
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
import org.apache.iceberg.UpdateSchema
@@ -243,7 +244,8 @@ internal class S3DataLakeWriterTest {
catalog = destinationCatalog,
tableIdGenerator = tableIdGenerator,
)
- s3DataLakeWriter.createStreamLoader(stream = stream)
+ val streamLoader = s3DataLakeWriter.createStreamLoader(stream = stream)
+ runBlocking { streamLoader.start() }

verify(exactly = 0) { updateSchema.deleteColumn(any()) }
verify(exactly = 0) { updateSchema.updateColumn(any(), any<PrimitiveType>()) }
@@ -387,7 +389,8 @@ internal class S3DataLakeWriterTest {
tableIdGenerator = tableIdGenerator,
)

- s3DataLakeWriter.createStreamLoader(stream = stream)
+ val streamLoader = s3DataLakeWriter.createStreamLoader(stream = stream)
+ runBlocking { streamLoader.start() }

verify(exactly = 0) { updateSchema.deleteColumn(any()) }
verify(exactly = 0) { updateSchema.updateColumn(any(), any<PrimitiveType>()) }
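The test changes above follow directly from the refactor: createStreamLoader() no longer creates the table or applies schema changes, so a test asserting on that behavior must call start() itself, and because start() is a suspend function the call is bridged with kotlinx.coroutines.runBlocking. A tiny, self-contained illustration of the bridging; pretendStart() is a made-up stand-in for the real start():

import kotlinx.coroutines.runBlocking

// start() is a suspend function, while a plain JUnit test body is not a coroutine,
// so the tests bridge the call onto a blocking scope with runBlocking.
suspend fun pretendStart() {
    println("create table, sync schema, ensure staging branch")
}

fun main() {                     // stands in for a @Test method body
    runBlocking { pretendStart() }
}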
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3-data-lake.md
@@ -40,6 +40,7 @@ written into the Iceberg file.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
+ | 0.3.6 | 2025-02-06 | [\#53172](https://github.com/airbytehq/airbyte/pull/53172) | Internal refactor |
| 0.3.5 | 2025-02-06 | [\#53164](https://github.com/airbytehq/airbyte/pull/53164) | Improve error message on null primary key in dedup mode |
| 0.3.4 | 2025-02-05 | [\#53173](https://github.com/airbytehq/airbyte/pull/53173) | Tweak spec wording |
| 0.3.3 | 2025-02-05 | [\#53176](https://github.com/airbytehq/airbyte/pull/53176) | Fix time_with_timezone handling (values are now adjusted to UTC) |
