Skip to content

Commit

Permalink
feat: iceberg check implementation + minor fixes (#48686)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 authored Nov 27, 2024
1 parent 00ed1dc commit d499da0
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
implementation("software.amazon.awssdk:sts:2.29.9")
implementation("io.github.oshai:kotlin-logging-jvm:7.0.0")
implementation("org.apache.hadoop:hadoop-common:3.4.1")
implementation("org.projectnessie.nessie:nessie-client:0.99.0")

testImplementation("io.mockk:mockk:1.13.13")
testImplementation('org.junit.jupiter:junit-jupiter-api:5.11.3')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.1.6
dockerRepository: airbyte/destination-iceberg-v2
githubIssueLabel: destination-iceberg-v2
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,29 @@ class IcebergStreamLoader(
if (streamFailure == null) {
// Doing it first to make sure that data coming in the current batch is written to the
// main branch
log.info {
"No stream failure detected. Committing changes from staging branch '$stagingBranchName' to main branch '$mainBranchName."
}
table.manageSnapshots().fastForwardBranch(mainBranchName, stagingBranchName).commit()
if (stream.minimumGenerationId > 0) {
log.info {
"Detected a minimum generation ID (${stream.minimumGenerationId}). Preparing to delete obsolete generation IDs."
}
val generationIdsToDelete =
(0 until stream.minimumGenerationId).map(
icebergUtil::constructGenerationIdSuffix
)
val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil)
icebergTableCleaner.deleteGenerationId(
table,
DEFAULT_STAGING_BRANCH,
stagingBranchName,
generationIdsToDelete
)
// Doing it again to push the deletes from the staging to main branch
log.info {
"Deleted obsolete generation IDs up to ${stream.minimumGenerationId - 1}. " +
"Pushing these updates to the '$mainBranchName' branch."
}
table
.manageSnapshots()
.fastForwardBranch(mainBranchName, stagingBranchName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,48 @@
package io.airbyte.integrations.destination.iceberg.v2

import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableCleaner
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.airbyte.integrations.destination.iceberg.v2.io.toIcebergTableIdentifier
import io.github.oshai.kotlinlogging.KotlinLogging
import javax.inject.Singleton
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

@Singleton
class IcebergV2Checker : DestinationChecker<IcebergV2Configuration> {
class IcebergV2Checker(
    private val icebergTableCleaner: IcebergTableCleaner,
    private val icebergUtil: IcebergUtil
) : DestinationChecker<IcebergV2Configuration> {

    private val log = KotlinLogging.logger {}

    /**
     * Verifies the destination is usable with the supplied [config] by performing a
     * full round-trip against the configured Iceberg catalog. Any exception thrown
     * here is surfaced to the platform as a failed CHECK.
     */
    override fun check(config: IcebergV2Configuration) {
        catalogValidation(config)
    }

    /**
     * Creates a short-lived test table ([TEST_NAMESPACE].[TEST_TABLE]) in the
     * configured catalog and then removes it, proving that the credentials allow
     * both DDL and object-store access.
     *
     * Cleanup is best-effort but always attempted, and the table's [org.apache.iceberg.io.FileIO]
     * is closed afterwards so the check does not leak connections.
     */
    private fun catalogValidation(config: IcebergV2Configuration) {
        val catalogProperties = icebergUtil.toCatalogProperties(config)
        val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties)

        val testTableIdentifier = DestinationStream.Descriptor(TEST_NAMESPACE, TEST_TABLE)

        // Minimal two-column schema: the table's contents are irrelevant, we only
        // need to confirm that create/delete operations succeed.
        val testTableSchema =
            Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get()),
            )
        val table =
            icebergUtil.createTable(
                testTableIdentifier,
                catalog,
                testTableSchema,
                catalogProperties,
            )

        val tableIo = table.io()
        try {
            // Remove the test table so repeated checks do not accumulate objects
            // in the customer's catalog/bucket.
            icebergTableCleaner.clearTable(
                catalog,
                testTableIdentifier.toIcebergTableIdentifier(),
                tableIo,
                table.location()
            )
        } finally {
            // FileIO is Closeable; close it even if cleanup fails so we do not
            // leak underlying S3 client resources. Closing twice is safe.
            runCatching { tableIo.close() }
                .onFailure { log.warn(it) { "Failed to close FileIO for check table." } }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import jakarta.inject.Singleton

const val DEFAULT_CATALOG_NAME = "airbyte"
const val DEFAULT_STAGING_BRANCH = "airbyte_staging"
const val TEST_NAMESPACE = "airbyte_test_namespace"
const val TEST_TABLE = "airbyte_test_table"

data class IcebergV2Configuration(
override val awsAccessKeyConfiguration: AWSAccessKeyConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class IcebergV2Specification :
class IcebergV2SpecificationExtension : DestinationSpecificationExtension {
override val supportedSyncModes =
listOf(
DestinationSyncMode.OVERWRITE,
DestinationSyncMode.APPEND,
)
override val supportsIncremental = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ import org.apache.iceberg.SortOrder
import org.apache.iceberg.Table
import org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT
import org.apache.iceberg.aws.s3.S3FileIO
import org.apache.iceberg.aws.s3.S3FileIOProperties
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.Record
import org.apache.iceberg.exceptions.AlreadyExistsException
import org.projectnessie.client.NessieConfigConstants

private val logger = KotlinLogging.logger {}

Expand Down Expand Up @@ -106,11 +109,25 @@ class IcebergUtil {
properties: Map<String, String>
): Table {
val tableIdentifier = streamDescriptor.toIcebergTableIdentifier()
if (
catalog is SupportsNamespaces && !catalog.namespaceExists(tableIdentifier.namespace())
) {
catalog.createNamespace(tableIdentifier.namespace())
logger.info { "Created namespace '${tableIdentifier.namespace()}'." }
synchronized(tableIdentifier.namespace()) {
if (
catalog is SupportsNamespaces &&
!catalog.namespaceExists(tableIdentifier.namespace())
) {
try {
catalog.createNamespace(tableIdentifier.namespace())
logger.info { "Created namespace '${tableIdentifier.namespace()}'." }
} catch (e: AlreadyExistsException) {
// This exception occurs when multiple threads attempt to write to the same
// namespace in parallel.
// One thread may create the namespace successfully, causing the other threads
// to encounter this exception
// when they also try to create the namespace.
logger.info {
"Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations."
}
}
}
}

return if (!catalog.tableExists(tableIdentifier)) {
Expand Down Expand Up @@ -162,27 +179,36 @@ class IcebergUtil {
* @return The Iceberg [Catalog] configuration properties.
*/
fun toCatalogProperties(icebergConfiguration: IcebergV2Configuration): Map<String, String> {
// Nessie does not allow explicitly setting the S3 bucket region.
// Instead, it relies on a static credentials provider, which retrieves the region
// from the system property. Therefore, we need to set the region this way.
System.setProperty(
"aws.region",
icebergConfiguration.s3BucketConfiguration.s3BucketRegion.region,
)
return mutableMapOf(
// TODO make configurable?
ICEBERG_CATALOG_TYPE to ICEBERG_CATALOG_TYPE_NESSIE,
URI to icebergConfiguration.nessieServerConfiguration.serverUri,
"nessie.ref" to "main",
NessieConfigConstants.CONF_NESSIE_REF to
icebergConfiguration.nessieServerConfiguration.mainBranchName,
WAREHOUSE_LOCATION to
icebergConfiguration.nessieServerConfiguration.warehouseLocation,
// Use Iceberg's S3FileIO for file operations
CatalogProperties.FILE_IO_IMPL to S3FileIO::class.java.name,
"s3.access-key-id" to icebergConfiguration.awsAccessKeyConfiguration.accessKeyId!!,
"s3.secret-access-key" to
S3FileIOProperties.ACCESS_KEY_ID to
icebergConfiguration.awsAccessKeyConfiguration.accessKeyId!!,
S3FileIOProperties.SECRET_ACCESS_KEY to
icebergConfiguration.awsAccessKeyConfiguration.secretAccessKey!!,
"s3.region" to icebergConfiguration.s3BucketConfiguration.s3BucketRegion.toString(),
"s3.endpoint" to icebergConfiguration.s3BucketConfiguration.s3Endpoint!!,
"s3.path-style-access" to "true" // Required for MinIO
S3FileIOProperties.ENDPOINT to
icebergConfiguration.s3BucketConfiguration.s3Endpoint!!,
S3FileIOProperties.PATH_STYLE_ACCESS to "true" // Required for MinIO
)
.apply {
if (icebergConfiguration.nessieServerConfiguration.accessToken != null) {
put("nessie.authentication.type", "BEARER")
put(NessieConfigConstants.CONF_NESSIE_AUTH_TYPE, "BEARER")
put(
"nessie.authentication.token",
NessieConfigConstants.CONF_NESSIE_AUTH_TOKEN,
icebergConfiguration.nessieServerConfiguration.accessToken!!
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package io.airbyte.integrations.destination.iceberg.v2
import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.iceberg.v2.IcebergV2TestUtil.PATH
import org.junit.jupiter.api.Disabled

@Disabled
class IcebergV2CheckTest :
CheckIntegrationTest<IcebergV2Specification>(
successConfigFilenames = listOf(CheckTestConfig(PATH)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@
"supportsIncremental" : true,
"supportsNormalization" : false,
"supportsDBT" : false,
"supported_destination_sync_modes" : [ "overwrite", "append" ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,5 @@
"supportsIncremental" : true,
"supportsNormalization" : false,
"supportsDBT" : false,
"supported_destination_sync_modes" : [ "overwrite", "append" ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ internal class IcebergUtilTest {
assertEquals("main", catalogProperties["nessie.ref"])
assertEquals(awsAccessKey, catalogProperties["s3.access-key-id"])
assertEquals(awsSecretAccessKey, catalogProperties["s3.secret-access-key"])
assertEquals(S3BucketRegion.`us-east-1`.toString(), catalogProperties["s3.region"])
assertEquals(s3Endpoint, catalogProperties["s3.endpoint"])
assertEquals("true", catalogProperties["s3.path-style-access"])
assertEquals("BEARER", catalogProperties["nessie.authentication.type"])
Expand Down

0 comments on commit d499da0

Please sign in to comment.