diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle b/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle index 78601cd3a9d11..33c2e79329d86 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-iceberg-v2/build.gradle @@ -32,6 +32,7 @@ dependencies { implementation("software.amazon.awssdk:sts:2.29.9") implementation("io.github.oshai:kotlin-logging-jvm:7.0.0") implementation("org.apache.hadoop:hadoop-common:3.4.1") + implementation("org.projectnessie.nessie:nessie-client:0.99.0") testImplementation("io.mockk:mockk:1.13.13") testImplementation('org.junit.jupiter:junit-jupiter-api:5.11.3') diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml index 5b976c8429da7..ac51f1681fab8 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-iceberg-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.5 + dockerImageTag: 0.1.6 dockerRepository: airbyte/destination-iceberg-v2 githubIssueLabel: destination-iceberg-v2 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt index 21958c04ef743..d61db456764a6 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergStreamLoader.kt @@ -78,8 +78,14 @@ class 
IcebergStreamLoader( if (streamFailure == null) { // Doing it first to make sure that data coming in the current batch is written to the // main branch + log.info { + "No stream failure detected. Committing changes from staging branch '$stagingBranchName' to main branch '$mainBranchName'." + } table.manageSnapshots().fastForwardBranch(mainBranchName, stagingBranchName).commit() if (stream.minimumGenerationId > 0) { + log.info { + "Detected a minimum generation ID (${stream.minimumGenerationId}). Preparing to delete obsolete generation IDs." + } val generationIdsToDelete = (0 until stream.minimumGenerationId).map( icebergUtil::constructGenerationIdSuffix @@ -87,10 +93,14 @@ class IcebergStreamLoader( val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil) icebergTableCleaner.deleteGenerationId( table, - DEFAULT_STAGING_BRANCH, + stagingBranchName, generationIdsToDelete ) // Doing it again to push the deletes from the staging to main branch + log.info { + "Deleted obsolete generation IDs up to ${stream.minimumGenerationId - 1}. " + + "Pushing these updates to the '$mainBranchName' branch." 
+ } table .manageSnapshots() .fastForwardBranch(mainBranchName, stagingBranchName) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt index 75ac7ac1e0b2b..74c5f1ee80aec 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Checker.kt @@ -5,11 +5,48 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.check.DestinationChecker +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner +import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil +import io.airbyte.integrations.destination.iceberg.v2.io.toIcebergTableIdentifier +import io.github.oshai.kotlinlogging.KotlinLogging import javax.inject.Singleton +import org.apache.iceberg.Schema +import org.apache.iceberg.types.Types @Singleton -class IcebergV2Checker : DestinationChecker { +class IcebergV2Checker( + private val icebergTableCleaner: IcebergTableCleaner, + private val icebergUtil: IcebergUtil +) : DestinationChecker { + private val log = KotlinLogging.logger {} override fun check(config: IcebergV2Configuration) { - // TODO validate the config + catalogValidation(config) + } + private fun catalogValidation(config: IcebergV2Configuration) { + val catalogProperties = icebergUtil.toCatalogProperties(config) + val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties) + + val testTableIdentifier = DestinationStream.Descriptor(TEST_NAMESPACE, TEST_TABLE) + + val testTableSchema = + Schema( + 
Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()), + ) + val table = + icebergUtil.createTable( + testTableIdentifier, + catalog, + testTableSchema, + catalogProperties, + ) + + icebergTableCleaner.clearTable( + catalog, + testTableIdentifier.toIcebergTableIdentifier(), + table.io(), + table.location() + ) } } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Configuration.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Configuration.kt index 3adfad7a7438c..e4dc79cbfad72 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Configuration.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Configuration.kt @@ -17,6 +17,8 @@ import jakarta.inject.Singleton const val DEFAULT_CATALOG_NAME = "airbyte" const val DEFAULT_STAGING_BRANCH = "airbyte_staging" +const val TEST_NAMESPACE = "airbyte_test_namespace" +const val TEST_TABLE = "airbyte_test_table" data class IcebergV2Configuration( override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration, diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Specification.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Specification.kt index 4d8f3f580d5da..81c39533394f7 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Specification.kt +++ 
b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2Specification.kt @@ -36,6 +36,7 @@ class IcebergV2Specification : class IcebergV2SpecificationExtension : DestinationSpecificationExtension { override val supportedSyncModes = listOf( + DestinationSyncMode.OVERWRITE, DestinationSyncMode.APPEND, ) override val supportsIncremental = true diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt index 052e3b1d5996e..96033f99b1475 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtil.kt @@ -29,11 +29,14 @@ import org.apache.iceberg.SortOrder import org.apache.iceberg.Table import org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT import org.apache.iceberg.aws.s3.S3FileIO +import org.apache.iceberg.aws.s3.S3FileIOProperties import org.apache.iceberg.catalog.Catalog import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.SupportsNamespaces import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.data.Record +import org.apache.iceberg.exceptions.AlreadyExistsException +import org.projectnessie.client.NessieConfigConstants private val logger = KotlinLogging.logger {} @@ -106,11 +109,25 @@ class IcebergUtil { properties: Map ): Table { val tableIdentifier = streamDescriptor.toIcebergTableIdentifier() - if ( - catalog is SupportsNamespaces && !catalog.namespaceExists(tableIdentifier.namespace()) - ) { - catalog.createNamespace(tableIdentifier.namespace()) - logger.info { "Created namespace 
'${tableIdentifier.namespace()}'." } + // Lock on the interned namespace name so that callers targeting the same namespace share one monitor even when their Namespace instances differ. + synchronized(tableIdentifier.namespace().toString().intern()) { + if ( + catalog is SupportsNamespaces && + !catalog.namespaceExists(tableIdentifier.namespace()) + ) { + try { + catalog.createNamespace(tableIdentifier.namespace()) + logger.info { "Created namespace '${tableIdentifier.namespace()}'." } + } catch (e: AlreadyExistsException) { + // This exception occurs when multiple threads attempt to write to the same + // namespace in parallel. + // One thread may create the namespace successfully, causing the other threads + // to encounter this exception + // when they also try to create the namespace. + logger.info { + "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." + } + } + } } return if (!catalog.tableExists(tableIdentifier)) { @@ -162,27 +179,36 @@ class IcebergUtil { * @return The Iceberg [Catalog] configuration properties. */ fun toCatalogProperties(icebergConfiguration: IcebergV2Configuration): Map { + // Nessie does not allow explicitly setting the S3 bucket region. + // Instead, it relies on a static credentials provider, which retrieves the region + // from the system property. Therefore, we need to set the region this way. + System.setProperty( + "aws.region", + icebergConfiguration.s3BucketConfiguration.s3BucketRegion.region, + ) return mutableMapOf( // TODO make configurable? 
ICEBERG_CATALOG_TYPE to ICEBERG_CATALOG_TYPE_NESSIE, URI to icebergConfiguration.nessieServerConfiguration.serverUri, - "nessie.ref" to "main", + NessieConfigConstants.CONF_NESSIE_REF to + icebergConfiguration.nessieServerConfiguration.mainBranchName, WAREHOUSE_LOCATION to icebergConfiguration.nessieServerConfiguration.warehouseLocation, // Use Iceberg's S3FileIO for file operations CatalogProperties.FILE_IO_IMPL to S3FileIO::class.java.name, - "s3.access-key-id" to icebergConfiguration.awsAccessKeyConfiguration.accessKeyId!!, - "s3.secret-access-key" to + S3FileIOProperties.ACCESS_KEY_ID to + icebergConfiguration.awsAccessKeyConfiguration.accessKeyId!!, + S3FileIOProperties.SECRET_ACCESS_KEY to icebergConfiguration.awsAccessKeyConfiguration.secretAccessKey!!, - "s3.region" to icebergConfiguration.s3BucketConfiguration.s3BucketRegion.toString(), - "s3.endpoint" to icebergConfiguration.s3BucketConfiguration.s3Endpoint!!, - "s3.path-style-access" to "true" // Required for MinIO + S3FileIOProperties.ENDPOINT to + icebergConfiguration.s3BucketConfiguration.s3Endpoint!!, + S3FileIOProperties.PATH_STYLE_ACCESS to "true" // Required for MinIO ) .apply { if (icebergConfiguration.nessieServerConfiguration.accessToken != null) { - put("nessie.authentication.type", "BEARER") + put(NessieConfigConstants.CONF_NESSIE_AUTH_TYPE, "BEARER") put( - "nessie.authentication.token", + NessieConfigConstants.CONF_NESSIE_AUTH_TOKEN, icebergConfiguration.nessieServerConfiguration.accessToken!! 
) } diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/v2/IcebergV2CheckTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/v2/IcebergV2CheckTest.kt index 9938a2f52b7aa..3c619ea0ea98f 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/v2/IcebergV2CheckTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/v2/IcebergV2CheckTest.kt @@ -7,7 +7,9 @@ package io.airbyte.integrations.destination.iceberg.v2 import io.airbyte.cdk.load.check.CheckIntegrationTest import io.airbyte.cdk.load.check.CheckTestConfig import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.PATH +import org.junit.jupiter.api.Disabled +@Disabled class IcebergV2CheckTest : CheckIntegrationTest( successConfigFilenames = listOf(CheckTestConfig(PATH)), diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-cloud.json b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-cloud.json index d5627a11b70f1..74cfdb65d7e8b 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-cloud.json +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-cloud.json @@ -69,5 +69,5 @@ "supportsIncremental" : true, "supportsNormalization" : false, "supportsDBT" : false, - "supported_destination_sync_modes" : [ "append" ] + "supported_destination_sync_modes" : [ "overwrite", "append" ] } \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-oss.json 
b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-oss.json index d5627a11b70f1..74cfdb65d7e8b 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-oss.json +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/resources/expected-spec-oss.json @@ -69,5 +69,5 @@ "supportsIncremental" : true, "supportsNormalization" : false, "supportsDBT" : false, - "supported_destination_sync_modes" : [ "append" ] + "supported_destination_sync_modes" : [ "overwrite", "append" ] } \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt index 82c4641f003f0..ac60fddf78755 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/io/IcebergUtilTest.kt @@ -351,7 +351,6 @@ internal class IcebergUtilTest { assertEquals("main", catalogProperties["nessie.ref"]) assertEquals(awsAccessKey, catalogProperties["s3.access-key-id"]) assertEquals(awsSecretAccessKey, catalogProperties["s3.secret-access-key"]) - assertEquals(S3BucketRegion.`us-east-1`.toString(), catalogProperties["s3.region"]) assertEquals(s3Endpoint, catalogProperties["s3.endpoint"]) assertEquals("true", catalogProperties["s3.path-style-access"]) assertEquals("BEARER", catalogProperties["nessie.authentication.type"])