diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/build.gradle b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/build.gradle index d395ecd3a6f0f..5b8253e4686f7 100644 --- a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/build.gradle @@ -9,6 +9,7 @@ dependencies { implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') api "org.apache.iceberg:iceberg-core:${project.ext.apacheIcebergVersion}" api "org.apache.iceberg:iceberg-api:${project.ext.apacheIcebergVersion}" + api("org.apache.iceberg:iceberg-data:${project.ext.apacheIcebergVersion}") api "org.apache.iceberg:iceberg-parquet:${project.ext.apacheIcebergVersion}" api "org.apache.iceberg:iceberg-nessie:${project.ext.apacheIcebergVersion}" diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/java/io/airbyte/integrations/destination/s3_data_lake/io/BaseDeltaTaskWriter.java b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/java/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/BaseDeltaTaskWriter.java similarity index 98% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/java/io/airbyte/integrations/destination/s3_data_lake/io/BaseDeltaTaskWriter.java rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/java/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/BaseDeltaTaskWriter.java index 08761e66d6c04..3031707c8e319 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/java/io/airbyte/integrations/destination/s3_data_lake/io/BaseDeltaTaskWriter.java +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/java/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/BaseDeltaTaskWriter.java @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake.io; +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io; import io.airbyte.cdk.ConfigErrorException; import java.io.IOException; diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinder.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergSuperTypeFinder.kt similarity index 91% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinder.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergSuperTypeFinder.kt index b92a403434bda..600545fdf775d 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinder.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergSuperTypeFinder.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake +package io.airbyte.cdk.load.toolkits.iceberg.parquet import io.airbyte.cdk.ConfigErrorException import jakarta.inject.Singleton @@ -19,22 +19,22 @@ import org.apache.iceberg.types.Types.* * The "supertype" is a type to which both input types can safely be promoted without data loss. For * instance, INT can be promoted to LONG, FLOAT can be promoted to DOUBLE, etc. 
* - * @property S3DataLakeTypesComparator comparator used to verify deep type equality. + * @property IcebergTypesComparator comparator used to verify deep type equality. */ @Singleton -class S3DataLakeSuperTypeFinder(private val s3DataLakeTypesComparator: S3DataLakeTypesComparator) { +class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) { private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) /** * Returns a supertype for [existingType] and [incomingType] if one exists. - * - If they are deeply equal (according to [S3DataLakeTypesComparator.typesAreEqual]), returns - * the [existingType] as-is. + * - If they are deeply equal (according to [IcebergTypesComparator.typesAreEqual]), returns the + * [existingType] as-is. * - Otherwise, attempts to combine them into a valid supertype. * - Throws [ConfigErrorException] if no valid supertype can be found. */ fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type { // If the two types are already deeply equal, return one of them (arbitrary). - if (s3DataLakeTypesComparator.typesAreEqual(incomingType, existingType)) { + if (icebergTypesComparator.typesAreEqual(incomingType, existingType)) { return existingType } // Otherwise, attempt to combine them into a valid supertype. diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizer.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTableSynchronizer.kt similarity index 92% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizer.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTableSynchronizer.kt index a6a0386da5cab..ddfbca76f2e2e 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizer.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTableSynchronizer.kt @@ -2,11 +2,11 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake +package io.airbyte.cdk.load.toolkits.iceberg.parquet import io.airbyte.cdk.ConfigErrorException -import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.PARENT_CHILD_SEPARATOR -import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.splitIntoParentAndLeaf +import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.PARENT_CHILD_SEPARATOR +import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.splitIntoParentAndLeaf import jakarta.inject.Singleton import org.apache.iceberg.Schema import org.apache.iceberg.Table @@ -14,7 +14,7 @@ import org.apache.iceberg.UpdateSchema import org.apache.iceberg.types.Type import org.apache.iceberg.types.Type.PrimitiveType -/** Describes how the [S3DataLakeTableSynchronizer] handles column type changes. */ +/** Describes how the [IcebergTableSynchronizer] handles column type changes. 
*/ enum class ColumnTypeChangeBehavior { /** * Find the supertype between the old and new types, throwing an error if Iceberg does not @@ -30,7 +30,7 @@ enum class ColumnTypeChangeBehavior { }; /** - * If true, [S3DataLakeTableSynchronizer.maybeApplySchemaChanges] will commit the schema update + * If true, [IcebergTableSynchronizer.maybeApplySchemaChanges] will commit the schema update * itself. If false, the caller is responsible for calling * `schemaUpdateResult.pendingUpdate?.commit()`. */ @@ -50,9 +50,9 @@ enum class ColumnTypeChangeBehavior { * @property superTypeFinder Used to find a common supertype when data types differ. */ @Singleton -class S3DataLakeTableSynchronizer( - private val comparator: S3DataLakeTypesComparator, - private val superTypeFinder: S3DataLakeSuperTypeFinder, +class IcebergTableSynchronizer( + private val comparator: IcebergTypesComparator, + private val superTypeFinder: IcebergSuperTypeFinder, ) { /** * Compare [table]'s current schema with [incomingSchema] and apply changes as needed: diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTypesComparator.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTypesComparator.kt similarity index 99% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTypesComparator.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTypesComparator.kt index 5e09704bc2e6e..c19dbbe9a21d2 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTypesComparator.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTypesComparator.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake +package io.airbyte.cdk.load.toolkits.iceberg.parquet import jakarta.inject.Singleton import org.apache.iceberg.Schema @@ -17,7 +17,7 @@ import org.apache.iceberg.types.Types * - Columns that changed from required to optional. */ @Singleton -class S3DataLakeTypesComparator { +class IcebergTypesComparator { companion object { /** Separator used to represent nested field paths: parent~child. */ diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/TableIdGenerator.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/TableIdGenerator.kt new file mode 100644 index 0000000000000..ec1b561fc3c85 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/TableIdGenerator.kt @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.toolkits.iceberg.parquet + +import io.airbyte.cdk.load.command.DestinationStream +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.catalog.TableIdentifier + +/** + * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should + * handle catalog-specific naming restrictions. 
+ */ +// TODO accept default namespace in config as a val here +interface TableIdGenerator { + fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier +} + +class SimpleTableIdGenerator(private val configNamespace: String? = "") : TableIdGenerator { + override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier { + val namespace = stream.namespace ?: configNamespace + return tableIdOf(namespace!!, stream.name) + } +} + +// iceberg namespace+name must both be nonnull. +fun tableIdOf(namespace: String, name: String): TableIdentifier = + TableIdentifier.of(Namespace.of(namespace), name) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleaner.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergTableCleaner.kt similarity index 90% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleaner.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergTableCleaner.kt index 2513712f7a042..4a634b27e7400 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleaner.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergTableCleaner.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake.io +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io import jakarta.inject.Singleton import org.apache.iceberg.Table @@ -16,7 +16,7 @@ import org.apache.iceberg.io.SupportsPrefixOperations * catalog implementations do not clear the underlying files written to table storage. */ @Singleton -class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) { +class IcebergTableCleaner(private val icebergUtil: IcebergUtil) { /** * Clears the table identified by the provided [TableIdentifier]. 
This removes all data and @@ -49,7 +49,7 @@ class S3DataLakeTableCleaner(private val s3DataLakeUtil: S3DataLakeUtil) { val genIdsToDelete = generationIdSuffix .filter { - s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(it) + icebergUtil.assertGenerationIdSuffixIsOfValidFormat(it) true } .toSet() diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactory.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergTableWriterFactory.kt similarity index 96% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactory.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergTableWriterFactory.kt index 688766c1461ed..5efc7ca59c713 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergTableWriterFactory.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake.io +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io import io.airbyte.cdk.load.command.Append import io.airbyte.cdk.load.command.Dedupe @@ -30,7 +30,7 @@ import org.apache.iceberg.util.PropertyUtil * and whether primary keys are configured on the destination table's schema. */ @Singleton -class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) { +class IcebergTableWriterFactory(private val icebergUtil: IcebergUtil) { /** * Creates a new [BaseTaskWriter] based on the configuration of the destination target [Table]. * @@ -45,7 +45,7 @@ class S3DataLakeTableWriterFactory(private val s3DataLakeUtil: S3DataLakeUtil) { importType: ImportType, schema: Schema ): BaseTaskWriter { - s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationId) + icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationId) val format = FileFormat.valueOf( table diff --git a/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergUtil.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergUtil.kt new file mode 100644 index 0000000000000..2bfb46cd0ae10 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergUtil.kt @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io + +import io.airbyte.cdk.load.command.Dedupe +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.ImportType +import io.airbyte.cdk.load.data.MapperPipeline +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord +import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema +import io.airbyte.cdk.load.data.withAirbyteMeta +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator +import io.github.oshai.kotlinlogging.KotlinLogging +import javax.inject.Singleton +import org.apache.hadoop.conf.Configuration +import org.apache.iceberg.CatalogUtil +import org.apache.iceberg.FileFormat +import org.apache.iceberg.Schema +import org.apache.iceberg.SortOrder +import org.apache.iceberg.Table +import org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT +import org.apache.iceberg.catalog.Catalog +import org.apache.iceberg.catalog.SupportsNamespaces +import org.apache.iceberg.data.Record +import org.apache.iceberg.exceptions.AlreadyExistsException + +private val logger = KotlinLogging.logger {} + +const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at" + +@Singleton +class IcebergUtil(private val tableIdGenerator: TableIdGenerator) { + class InvalidFormatException(message: String) : Exception(message) + + private val generationIdRegex = Regex("""ab-generation-id-\d+-e""") + + fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) { + if (!generationIdRegex.matches(generationId)) { + throw InvalidFormatException( + "Invalid format: $generationId. Expected format is 'ab-generation-id--e'", + ) + } + } + + fun constructGenerationIdSuffix(stream: DestinationStream): String { + return constructGenerationIdSuffix(stream.generationId) + } + + fun constructGenerationIdSuffix(generationId: Long): String { + if (generationId < 0) { + throw IllegalArgumentException( + "GenerationId must be non-negative. Provided: $generationId", + ) + } + return "ab-generation-id-${generationId}-e" + } + /** + * Builds an Iceberg [Catalog]. + * + * @param catalogName The name of the catalog. + * @param properties The map of catalog configuration properties. + * @return The configured Iceberg [Catalog]. + */ + fun createCatalog(catalogName: String, properties: Map): Catalog { + return CatalogUtil.buildIcebergCatalog(catalogName, properties, Configuration()) + } + + /** Create the namespace if it doesn't already exist. */ + fun createNamespace(streamDescriptor: DestinationStream.Descriptor, catalog: Catalog) { + val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor) + synchronized(tableIdentifier.namespace()) { + if ( + catalog is SupportsNamespaces && + !catalog.namespaceExists(tableIdentifier.namespace()) + ) { + try { + catalog.createNamespace(tableIdentifier.namespace()) + logger.info { "Created namespace '${tableIdentifier.namespace()}'." } + } catch (e: AlreadyExistsException) { + // This exception occurs when multiple threads attempt to write to the same + // namespace in parallel. + // One thread may create the namespace successfully, causing the other threads + // to encounter this exception + // when they also try to create the namespace. + logger.info { + "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." 
+ } + } + } + } + } + + /** + * Builds (if necessary) an Iceberg [Table]. This includes creating the table's namespace if it + * does not already exist. If the [Table] already exists, it is loaded from the [Catalog]. + * + * @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's + * namespace and name. + * @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once + * created. + * @param schema The Iceberg [Schema] associated with the [Table]. + * @param properties The [Table] configuration properties derived from the [Catalog]. + * @return The Iceberg [Table], created if it does not yet exist. + */ + fun createTable( + streamDescriptor: DestinationStream.Descriptor, + catalog: Catalog, + schema: Schema, + properties: Map + ): Table { + val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor) + return if (!catalog.tableExists(tableIdentifier)) { + logger.info { "Creating Iceberg table '$tableIdentifier'...." } + catalog + .buildTable(tableIdentifier, schema) + .withProperties(properties) + .withProperty(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name.lowercase()) + .withSortOrder(getSortOrder(schema = schema)) + .create() + } else { + logger.info { "Loading Iceberg table $tableIdentifier ..." } + catalog.loadTable(tableIdentifier) + } + } + + /** + * Converts an Airbyte [DestinationRecordAirbyteValue] into an Iceberg [Record]. The converted + * record will be wrapped to include [Operation] information, which is used by the writer to + * determine how to write the data to the underlying Iceberg files. + * + * @param record The Airbyte [DestinationRecordAirbyteValue] record to be converted for writing + * by Iceberg. + * @param stream The Airbyte [DestinationStream] that contains information about the stream. + * @param tableSchema The Iceberg [Table] [Schema]. + * @param pipeline The [MapperPipeline] used to convert the Airbyte record to an Iceberg record. + * @return An Iceberg [Record] representation of the Airbyte [DestinationRecordAirbyteValue]. 
+ */ + fun toRecord( + record: DestinationRecordAirbyteValue, + stream: DestinationStream, + tableSchema: Schema, + pipeline: MapperPipeline + ): Record { + val dataMapped = + pipeline + .map(record.data, record.meta?.changes) + .withAirbyteMeta(stream, record.emittedAtMs, true) + // TODO figure out how to detect the actual operation value + return RecordWrapper( + delegate = dataMapped.toIcebergRecord(tableSchema), + operation = getOperation(record = record, importType = stream.importType) + ) + } + + fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema { + val primaryKeys = + when (stream.importType) { + is Dedupe -> (stream.importType as Dedupe).primaryKey + else -> emptyList() + } + return pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys) + } + + private fun getSortOrder(schema: Schema): SortOrder { + val builder = SortOrder.builderFor(schema) + schema.identifierFieldNames().forEach { builder.asc(it) } + return builder.build() + } + + private fun getOperation( + record: DestinationRecordAirbyteValue, + importType: ImportType, + ): Operation = + if ( + record.data is ObjectValue && + (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null && + (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] !is NullValue + ) { + Operation.DELETE + } else if (importType is Dedupe) { + Operation.UPDATE + } else { + Operation.INSERT + } +} diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/Operation.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/Operation.kt similarity index 72% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/Operation.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/Operation.kt index 76d466213c6d4..151a661ce7e25 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/Operation.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/Operation.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake.io +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io /** Delta operations for data. 
*/ enum class Operation { diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/PartitionedWriters.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/PartitionedWriters.kt similarity index 98% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/PartitionedWriters.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/PartitionedWriters.kt index 645e710243ee1..7b09df29e181d 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/PartitionedWriters.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/PartitionedWriters.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake.io +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io import java.io.IOException import java.io.UncheckedIOException diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/RecordWrapper.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/RecordWrapper.kt similarity index 86% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/RecordWrapper.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/RecordWrapper.kt index 44a72a0c2de00..021334ab80027 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/RecordWrapper.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/RecordWrapper.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. */ -package io.airbyte.integrations.destination.s3_data_lake.io +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io import org.apache.iceberg.data.Record diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/UnpartitionedWriters.kt b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/UnpartitionedWriters.kt similarity index 96% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/UnpartitionedWriters.kt rename to airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/UnpartitionedWriters.kt index 638652cae793a..42dc70cd6f679 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/UnpartitionedWriters.kt +++ b/airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/UnpartitionedWriters.kt @@ -2,7 +2,7 @@ * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
*/ -package io.airbyte.integrations.destination.s3_data_lake.io +package io.airbyte.cdk.load.toolkits.iceberg.parquet.io import org.apache.iceberg.FileFormat import org.apache.iceberg.PartitionSpec diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml index 96c75090de3ce..eb081ce9e0ca5 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml @@ -26,7 +26,7 @@ data: alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 716ca874-520b-4902-9f80-9fad66754b89 - dockerImageTag: 0.3.12 + dockerImageTag: 0.3.13 dockerRepository: airbyte/destination-s3-data-lake documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake githubIssueLabel: destination-s3-data-lake diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeChecker.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeChecker.kt index 3b51afd8fab25..2ccd034b2eae3 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeChecker.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeChecker.kt @@ -6,7 +6,9 @@ package io.airbyte.integrations.destination.s3_data_lake import io.airbyte.cdk.load.check.DestinationChecker import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner +import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableCleaner +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import javax.inject.Singleton import org.apache.iceberg.Schema @@ -14,8 +16,9 @@ import org.apache.iceberg.types.Types @Singleton class S3DataLakeChecker( - private val s3DataLakeTableCleaner: S3DataLakeTableCleaner, + private val icebergTableCleaner: IcebergTableCleaner, private val s3DataLakeUtil: S3DataLakeUtil, + private val icebergUtil: IcebergUtil, private val tableIdGenerator: TableIdGenerator, ) : DestinationChecker { @@ -24,7 +27,7 @@ class S3DataLakeChecker( } private fun catalogValidation(config: S3DataLakeConfiguration) { val catalogProperties = s3DataLakeUtil.toCatalogProperties(config) - val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties) + val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, catalogProperties) val testTableIdentifier = DestinationStream.Descriptor(TEST_NAMESPACE, TEST_TABLE) @@ -33,15 +36,16 @@ class S3DataLakeChecker( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "data", Types.StringType.get()), ) + s3DataLakeUtil.createNamespaceWithGlueHandling(testTableIdentifier, catalog) val table = - s3DataLakeUtil.createTable( + icebergUtil.createTable( testTableIdentifier, catalog, testTableSchema, catalogProperties, ) - s3DataLakeTableCleaner.clearTable( + icebergTableCleaner.clearTable( catalog, tableIdGenerator.toTableIdentifier(testTableIdentifier), table.io(), diff --git 
a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt index 457656f18a8f0..0ec48f8406ac5 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoader.kt @@ -11,9 +11,12 @@ import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue import io.airbyte.cdk.load.message.SimpleBatch import io.airbyte.cdk.load.state.StreamProcessingFailed +import io.airbyte.cdk.load.toolkits.iceberg.parquet.ColumnTypeChangeBehavior +import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableCleaner +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil import io.airbyte.cdk.load.write.StreamLoader -import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner -import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import io.github.oshai.kotlinlogging.KotlinLogging import org.apache.iceberg.Schema @@ -26,9 +29,10 @@ private val logger = KotlinLogging.logger {} class S3DataLakeStreamLoader( private val icebergConfiguration: S3DataLakeConfiguration, override val stream: DestinationStream, - private val s3DataLakeTableSynchronizer: S3DataLakeTableSynchronizer, - private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory, + private val icebergTableSynchronizer: IcebergTableSynchronizer, + private val icebergTableWriterFactory: IcebergTableWriterFactory, private val s3DataLakeUtil: S3DataLakeUtil, + private val icebergUtil: IcebergUtil, private val stagingBranchName: String, private val mainBranchName: String ) : StreamLoader { @@ -43,8 +47,7 @@ class S3DataLakeStreamLoader( } else { ColumnTypeChangeBehavior.SAFE_SUPERTYPE } - private val incomingSchema = - s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline) + private val incomingSchema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline) @SuppressFBWarnings( "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE", @@ -52,9 +55,10 @@ class S3DataLakeStreamLoader( ) override suspend fun start() { val properties = s3DataLakeUtil.toCatalogProperties(config = icebergConfiguration) - val catalog = s3DataLakeUtil.createCatalog(DEFAULT_CATALOG_NAME, properties) + val catalog = icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, properties) + s3DataLakeUtil.createNamespaceWithGlueHandling(stream.descriptor, catalog) table = - s3DataLakeUtil.createTable( + icebergUtil.createTable( streamDescriptor = stream.descriptor, catalog = catalog, schema = incomingSchema, @@ -89,10 +93,10 @@ class S3DataLakeStreamLoader( totalSizeBytes: Long, endOfStream: Boolean ): Batch { - s3DataLakeTableWriterFactory + icebergTableWriterFactory .create( table = table, - generationId = s3DataLakeUtil.constructGenerationIdSuffix(stream), + generationId = icebergUtil.constructGenerationIdSuffix(stream), importType = stream.importType, schema 
= targetSchema, ) @@ -100,7 +104,7 @@ class S3DataLakeStreamLoader( logger.info { "Writing records to branch $stagingBranchName" } records.forEach { record -> val icebergRecord = - s3DataLakeUtil.toRecord( + icebergUtil.toRecord( record = record, stream = stream, tableSchema = targetSchema, @@ -145,10 +149,10 @@ class S3DataLakeStreamLoader( } val generationIdsToDelete = (0 until stream.minimumGenerationId).map( - s3DataLakeUtil::constructGenerationIdSuffix + icebergUtil::constructGenerationIdSuffix ) - val s3DataLakeTableCleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil) - s3DataLakeTableCleaner.deleteGenerationId( + val icebergTableCleaner = IcebergTableCleaner(icebergUtil = icebergUtil) + icebergTableCleaner.deleteGenerationId( table, stagingBranchName, generationIdsToDelete @@ -173,7 +177,7 @@ class S3DataLakeStreamLoader( * the end of the sync, to get a fresh [UpdateSchema] instance. */ private fun computeOrExecuteSchemaUpdate() = - s3DataLakeTableSynchronizer.maybeApplySchemaChanges( + icebergTableSynchronizer.maybeApplySchemaChanges( table, incomingSchema, columnTypeChangeBehavior, diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableIdGeneratorFactory.kt similarity index 72% rename from airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt rename to airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableIdGeneratorFactory.kt index 5ed8ab7c7fde7..4076b79aae987 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/TableIdGenerator.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableIdGeneratorFactory.kt @@ -9,27 +9,13 @@ import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration import io.airbyte.cdk.load.command.iceberg.parquet.NessieCatalogConfiguration import io.airbyte.cdk.load.command.iceberg.parquet.RestCatalogConfiguration import io.airbyte.cdk.load.data.Transformations +import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator +import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator +import io.airbyte.cdk.load.toolkits.iceberg.parquet.tableIdOf import io.micronaut.context.annotation.Factory import javax.inject.Singleton -import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.TableIdentifier -/** - * Convert our internal stream descriptor to an Iceberg [TableIdentifier]. Implementations should - * handle catalog-specific naming restrictions. - */ -// TODO accept default namespace in config as a val here -interface TableIdGenerator { - fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier -} - -class SimpleTableIdGenerator(private val configNamespace: String? = "") : TableIdGenerator { - override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier { - val namespace = stream.namespace ?: configNamespace - return tableIdOf(namespace!!, stream.name) - } -} - /** AWS Glue requires lowercase database+table names. */ class GlueTableIdGenerator(private val databaseName: String?) 
: TableIdGenerator { override fun toTableIdentifier(stream: DestinationStream.Descriptor): TableIdentifier { @@ -67,7 +53,3 @@ class TableIdGeneratorFactory(private val s3DataLakeConfiguration: S3DataLakeCon ) } } - -// iceberg namespace+name must both be nonnull. -private fun tableIdOf(namespace: String, name: String) = - TableIdentifier.of(Namespace.of(namespace), name) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt index 87d105f208779..c1e42d7dd76da 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriter.kt @@ -7,19 +7,23 @@ package io.airbyte.integrations.destination.s3_data_lake import io.airbyte.cdk.ConfigErrorException import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer +import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader -import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil import javax.inject.Singleton import org.apache.iceberg.catalog.TableIdentifier @Singleton class S3DataLakeWriter( - private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory, + private val icebergTableWriterFactory: IcebergTableWriterFactory, private val icebergConfiguration: S3DataLakeConfiguration, private val s3DataLakeUtil: S3DataLakeUtil, - private val s3DataLakeTableSynchronizer: S3DataLakeTableSynchronizer, + private val icebergUtil: IcebergUtil, + private val icebergTableSynchronizer: IcebergTableSynchronizer, private val catalog: DestinationCatalog, private val tableIdGenerator: TableIdGenerator, ) : DestinationWriter { @@ -59,9 +63,10 @@ class S3DataLakeWriter( return S3DataLakeStreamLoader( icebergConfiguration, stream, - s3DataLakeTableSynchronizer, - s3DataLakeTableWriterFactory, + icebergTableSynchronizer, + icebergTableWriterFactory, s3DataLakeUtil, + icebergUtil, stagingBranchName = DEFAULT_STAGING_BRANCH, mainBranchName = icebergConfiguration.icebergCatalogConfiguration.mainBranchName, ) diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtil.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtil.kt index edb6532b948f3..b463dd90f31f0 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtil.kt +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtil.kt @@ -4,21 +4,13 @@ package io.airbyte.integrations.destination.s3_data_lake.io 
-import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationStream -import io.airbyte.cdk.load.command.ImportType import io.airbyte.cdk.load.command.aws.AwsAssumeRoleCredentials import io.airbyte.cdk.load.command.iceberg.parquet.GlueCatalogConfiguration import io.airbyte.cdk.load.command.iceberg.parquet.IcebergCatalogConfiguration import io.airbyte.cdk.load.command.iceberg.parquet.NessieCatalogConfiguration import io.airbyte.cdk.load.command.iceberg.parquet.RestCatalogConfiguration -import io.airbyte.cdk.load.data.MapperPipeline -import io.airbyte.cdk.load.data.NullValue -import io.airbyte.cdk.load.data.ObjectValue -import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord -import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema -import io.airbyte.cdk.load.data.withAirbyteMeta -import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil import io.airbyte.integrations.destination.s3_data_lake.ACCESS_KEY_ID import io.airbyte.integrations.destination.s3_data_lake.ASSUME_ROLE_ARN import io.airbyte.integrations.destination.s3_data_lake.ASSUME_ROLE_EXTERNAL_ID @@ -29,172 +21,47 @@ import io.airbyte.integrations.destination.s3_data_lake.AWS_CREDENTIALS_MODE_STA import io.airbyte.integrations.destination.s3_data_lake.GlueCredentialsProvider import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeConfiguration import io.airbyte.integrations.destination.s3_data_lake.SECRET_ACCESS_KEY -import io.airbyte.integrations.destination.s3_data_lake.TableIdGenerator import io.github.oshai.kotlinlogging.KotlinLogging import jakarta.inject.Singleton -import org.apache.hadoop.conf.Configuration import org.apache.iceberg.CatalogProperties import org.apache.iceberg.CatalogProperties.URI import org.apache.iceberg.CatalogUtil import org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_GLUE import org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_NESSIE import org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST -import org.apache.iceberg.FileFormat -import org.apache.iceberg.Schema -import org.apache.iceberg.SortOrder -import org.apache.iceberg.Table -import org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT import org.apache.iceberg.aws.AwsClientProperties import org.apache.iceberg.aws.AwsProperties import org.apache.iceberg.aws.s3.S3FileIO import org.apache.iceberg.aws.s3.S3FileIOProperties import org.apache.iceberg.catalog.Catalog -import org.apache.iceberg.catalog.SupportsNamespaces -import org.apache.iceberg.data.Record -import org.apache.iceberg.exceptions.AlreadyExistsException import org.projectnessie.client.NessieConfigConstants -import software.amazon.awssdk.services.glue.model.ConcurrentModificationException -private val logger = KotlinLogging.logger {} - -const val AIRBYTE_CDC_DELETE_COLUMN = "_ab_cdc_deleted_at" -const val EXTERNAL_ID = "AWS_ASSUME_ROLE_EXTERNAL_ID" -const val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" -const val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" private const val AWS_REGION = "aws.region" +private val logger = KotlinLogging.logger {} + /** * Collection of Iceberg related utilities. - * @param awsSystemCredentials is a temporary fix to allow us to run the integrations tests. This + * @param assumeRoleCredentials is a temporary fix to allow us to run the integrations tests. 
This * will be removed when we change all of this to use Micronaut */ @Singleton class S3DataLakeUtil( - private val tableIdGenerator: TableIdGenerator, + private val icebergUtil: IcebergUtil, private val assumeRoleCredentials: AwsAssumeRoleCredentials?, ) { - - internal class InvalidFormatException(message: String) : Exception(message) - - private val generationIdRegex = Regex("""ab-generation-id-\d+-e""") - - fun assertGenerationIdSuffixIsOfValidFormat(generationId: String) { - if (!generationIdRegex.matches(generationId)) { - throw InvalidFormatException( - "Invalid format: $generationId. Expected format is 'ab-generation-id--e'", - ) - } - } - - fun constructGenerationIdSuffix(stream: DestinationStream): String { - return constructGenerationIdSuffix(stream.generationId) - } - - fun constructGenerationIdSuffix(generationId: Long): String { - if (generationId < 0) { - throw IllegalArgumentException( - "GenerationId must be non-negative. Provided: $generationId", - ) - } - return "ab-generation-id-${generationId}-e" - } - /** - * Builds an Iceberg [Catalog]. - * - * @param catalogName The name of the catalog. - * @param properties The map of catalog configuration properties. - * @return The configured Iceberg [Catalog]. - */ - fun createCatalog(catalogName: String, properties: Map): Catalog { - return CatalogUtil.buildIcebergCatalog(catalogName, properties, Configuration()) - } - - /** - * Builds (if necessary) an Iceberg [Table]. This includes creating the table's namespace if it - * does not already exist. If the [Table] already exists, it is loaded from the [Catalog]. - * - * @param streamDescriptor The [DestinationStream.Descriptor] that contains the Airbyte stream's - * namespace and name. - * @param catalog The Iceberg [Catalog] that contains the [Table] or should contain it once - * created. - * @param schema The Iceberg [Schema] associated with the [Table]. - * @param properties The [Table] configuration properties derived from the [Catalog]. - * @return The Iceberg [Table], created if it does not yet exist. - */ - fun createTable( + fun createNamespaceWithGlueHandling( streamDescriptor: DestinationStream.Descriptor, - catalog: Catalog, - schema: Schema, - properties: Map - ): Table { - val tableIdentifier = tableIdGenerator.toTableIdentifier(streamDescriptor) - synchronized(tableIdentifier.namespace()) { - if ( - catalog is SupportsNamespaces && - !catalog.namespaceExists(tableIdentifier.namespace()) - ) { - try { - catalog.createNamespace(tableIdentifier.namespace()) - logger.info { "Created namespace '${tableIdentifier.namespace()}'." } - } catch (e: AlreadyExistsException) { - // This exception occurs when multiple threads attempt to write to the same - // namespace in parallel. - // One thread may create the namespace successfully, causing the other threads - // to encounter this exception - // when they also try to create the namespace. - logger.info { - "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." - } - } catch (e: ConcurrentModificationException) { - // do the same for AWS Glue - logger.info { - "Namespace '${tableIdentifier.namespace()}' was likely created by another thread during parallel operations." 
- } - } + catalog: Catalog + ) { + try { + icebergUtil.createNamespace(streamDescriptor, catalog) + } catch (e: ConcurrentModificationException) { + // glue catalog throws its own special exception + logger.info { + "Namespace '${streamDescriptor.namespace}' was likely created by another thread during parallel operations." } } - - return if (!catalog.tableExists(tableIdentifier)) { - logger.info { "Creating Iceberg table '$tableIdentifier'...." } - catalog - .buildTable(tableIdentifier, schema) - .withProperties(properties) - .withProperty(DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name.lowercase()) - .withSortOrder(getSortOrder(schema = schema)) - .create() - } else { - logger.info { "Loading Iceberg table $tableIdentifier ..." } - catalog.loadTable(tableIdentifier) - } - } - - /** - * Converts an Airbyte [DestinationRecordAirbyteValue] into an Iceberg [Record]. The converted - * record will be wrapped to include [Operation] information, which is used by the writer to - * determine how to write the data to the underlying Iceberg files. - * - * @param record The Airbyte [DestinationRecordAirbyteValue] record to be converted for writing - * by Iceberg. - * @param stream The Airbyte [DestinationStream] that contains information about the stream. - * @param tableSchema The Iceberg [Table] [Schema]. - * @param pipeline The [MapperPipeline] used to convert the Airbyte record to an Iceberg record. - * @return An Iceberg [Record] representation of the Airbyte [DestinationRecordAirbyteValue]. - */ - fun toRecord( - record: DestinationRecordAirbyteValue, - stream: DestinationStream, - tableSchema: Schema, - pipeline: MapperPipeline - ): Record { - val dataMapped = - pipeline - .map(record.data, record.meta?.changes) - .withAirbyteMeta(stream, record.emittedAtMs, true) - // TODO figure out how to detect the actual operation value - return RecordWrapper( - delegate = dataMapped.toIcebergRecord(tableSchema), - operation = getOperation(record = record, importType = stream.importType) - ) } /** @@ -230,13 +97,13 @@ class S3DataLakeUtil( } } + // TODO this + nessie probably belong in base CDK toolkit private fun buildRestProperties( config: S3DataLakeConfiguration, catalogConfig: RestCatalogConfiguration, s3Properties: Map, region: String ): Map { - println(region) val awsAccessKeyId = requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) { "AWS Access Key ID is required for Rest configuration" @@ -409,35 +276,4 @@ class S3DataLakeUtil( return properties } - - fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema { - val primaryKeys = - when (stream.importType) { - is Dedupe -> (stream.importType as Dedupe).primaryKey - else -> emptyList() - } - return pipeline.finalSchema.withAirbyteMeta(true).toIcebergSchema(primaryKeys) - } - - private fun getSortOrder(schema: Schema): SortOrder { - val builder = SortOrder.builderFor(schema) - schema.identifierFieldNames().forEach { builder.asc(it) } - return builder.build() - } - - private fun getOperation( - record: DestinationRecordAirbyteValue, - importType: ImportType, - ): Operation = - if ( - record.data is ObjectValue && - (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] != null && - (record.data as ObjectValue).values[AIRBYTE_CDC_DELETE_COLUMN] !is NullValue - ) { - Operation.DELETE - } else if (importType is Dedupe) { - Operation.UPDATE - } else { - Operation.INSERT - } } diff --git 
a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt
index e43dc61e3d78c..cfc0c684a70ea 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDestinationCleaner.kt
@@ -7,9 +7,9 @@ package io.airbyte.integrations.destination.s3_data_lake
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.isNamespaceOld
import io.airbyte.cdk.load.test.util.IntegrationTest.Companion.randomizedNamespaceRegex
-import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableCleaner
-import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
-import io.github.oshai.kotlinlogging.KotlinLogging
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableCleaner
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
@@ -17,8 +17,6 @@ import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.catalog.Namespace
import org.apache.iceberg.catalog.SupportsNamespaces
-private val logger = KotlinLogging.logger {}
-
class S3DataLakeDestinationCleaner(private val catalog: Catalog) : DestinationCleaner {
override fun cleanup() {
val namespaces: List =
@@ -28,13 +26,7 @@ class S3DataLakeDestinationCleaner(private val catalog: Catalog) : DestinationCl
}
// we're passing explicit TableIdentifier to clearTable, so just use SimpleTableIdGenerator
- val tableCleaner =
- S3DataLakeTableCleaner(
- S3DataLakeUtil(
- SimpleTableIdGenerator(),
- S3DataLakeTestUtil.getAwsAssumeRoleCredentials()
- )
- )
+ val tableCleaner = IcebergTableCleaner(IcebergUtil(SimpleTableIdGenerator()))
runBlocking(Dispatchers.IO) {
namespaces.forEach { namespace ->
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTestUtil.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTestUtil.kt
index cb197f9209dc7..8618470a0cb8a 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTestUtil.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTestUtil.kt
@@ -9,10 +9,13 @@ import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.command.aws.AwsAssumeRoleCredentials
import io.airbyte.cdk.load.command.aws.AwsEnvVarConstants
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import java.nio.file.Files
import java.nio.file.Path
+import org.apache.iceberg.catalog.Catalog
object S3DataLakeTestUtil {
val GLUE_CONFIG_PATH: Path = Path.of("secrets/glue.json")
@@ -47,9 +50,10 @@ object S3DataLakeTestUtil {
fun getCatalog(
config: S3DataLakeConfiguration,
awsAssumeRoleCredentials: AwsAssumeRoleCredentials
- ) =
- S3DataLakeUtil(SimpleTableIdGenerator(), awsAssumeRoleCredentials).let { icebergUtil ->
- val props = icebergUtil.toCatalogProperties(config)
- icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props)
- }
+ ): Catalog {
+ val icebergUtil = IcebergUtil(SimpleTableIdGenerator())
+ val s3DataLakeUtil = S3DataLakeUtil(icebergUtil, awsAssumeRoleCredentials)
+ val props = s3DataLakeUtil.toCatalogProperties(config)
+ return icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props)
+ }
}
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt
index 2214771966849..00700bc2ae2ba 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt
@@ -18,11 +18,11 @@ import io.airbyte.cdk.load.message.InputRecord
import io.airbyte.cdk.load.test.util.DestinationCleaner
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.OutputRecord
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.BaseDeltaTaskWriter
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.StronglyTyped
import io.airbyte.cdk.load.write.UnionBehavior
-import io.airbyte.integrations.destination.s3_data_lake.io.BaseDeltaTaskWriter
import java.nio.file.Files
import java.util.Base64
import kotlin.test.assertContains
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt
index 0fe6e596c09ce..5cd1b9d98ce2f 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeStreamLoaderTest.kt
@@ -23,7 +23,12 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
-import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeTableWriterFactory
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.ColumnTypeChangeBehavior
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergSuperTypeFinder
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import io.mockk.every
import io.mockk.just
@@ -101,7 +106,7 @@ internal class S3DataLakeStreamLoaderTest {
),
Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()),
)
- val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory = mockk()
+ val icebergTableWriterFactory: IcebergTableWriterFactory = mockk()
val awsConfiguration: AWSAccessKeyConfiguration = mockk {
every { accessKeyId } returns "access-key"
every { secretAccessKey } returns "secret-access-key"
@@ -127,9 +132,12 @@ internal class S3DataLakeStreamLoaderTest {
val table: Table = mockk { every { schema() } returns icebergSchema }
every { table.manageSnapshots().createBranch(any()).commit() } just runs
val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ every { createNamespaceWithGlueHandling(any(), any()) } just runs
+ every { toCatalogProperties(any()) } returns mapOf()
+ }
+ val icebergUtil: IcebergUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
- every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any()) } answers {
val pipeline = secondArg() as MapperPipeline
@@ -140,12 +148,13 @@ internal class S3DataLakeStreamLoaderTest {
S3DataLakeStreamLoader(
icebergConfiguration,
stream,
- S3DataLakeTableSynchronizer(
- S3DataLakeTypesComparator(),
- S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
+ IcebergTableSynchronizer(
+ IcebergTypesComparator(),
+ IcebergSuperTypeFinder(IcebergTypesComparator()),
),
- s3DataLakeTableWriterFactory,
+ icebergTableWriterFactory,
s3DataLakeUtil,
+ icebergUtil,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = "main",
)
@@ -174,7 +183,7 @@ internal class S3DataLakeStreamLoaderTest {
Schema(
Types.NestedField.of(2, true, "name", Types.StringType.get()),
)
- val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory = mockk()
+ val icebergTableWriterFactory: IcebergTableWriterFactory = mockk()
val awsConfiguration: AWSAccessKeyConfiguration = mockk {
every { accessKeyId } returns "access-key"
every { secretAccessKey } returns "secret-access-key"
@@ -221,9 +230,12 @@ internal class S3DataLakeStreamLoaderTest {
every { table.manageSnapshots().fastForwardBranch(any(), any()).commit() } just runs
every { table.newScan().planFiles() } returns CloseableIterable.empty()
val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ every { createNamespaceWithGlueHandling(any(), any()) } just runs
+ every { toCatalogProperties(any()) } returns mapOf()
+ }
+ val icebergUtil: IcebergUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
- every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any()) } answers {
val pipeline = secondArg() as MapperPipeline
@@ -236,12 +248,13 @@ internal class S3DataLakeStreamLoaderTest {
S3DataLakeStreamLoader(
icebergConfiguration,
stream,
- S3DataLakeTableSynchronizer(
- S3DataLakeTypesComparator(),
- S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
+ IcebergTableSynchronizer(
+ IcebergTypesComparator(),
+ IcebergSuperTypeFinder(IcebergTypesComparator()),
),
- s3DataLakeTableWriterFactory,
+ icebergTableWriterFactory,
s3DataLakeUtil,
+ icebergUtil,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = "main",
)
@@ -322,7 +335,7 @@ internal class S3DataLakeStreamLoaderTest {
Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()),
)
val icebergSchema = Schema(columns, emptySet())
- val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory = mockk()
+ val icebergTableWriterFactory: IcebergTableWriterFactory = mockk()
val awsConfiguration: AWSAccessKeyConfiguration = mockk {
every { accessKeyId } returns "access-key"
every { secretAccessKey } returns "secret-access-key"
@@ -369,9 +382,12 @@ internal class S3DataLakeStreamLoaderTest {
every { table.manageSnapshots().fastForwardBranch(any(), any()).commit() } just runs
every { table.newScan().planFiles() } returns CloseableIterable.empty()
val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ every { createNamespaceWithGlueHandling(any(), any()) } just runs
+ every { toCatalogProperties(any()) } returns mapOf()
+ }
+ val icebergUtil: IcebergUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
- every { toCatalogProperties(any()) } returns mapOf()
every { toIcebergSchema(any(), any()) } answers {
val pipeline = secondArg() as MapperPipeline
@@ -384,12 +400,13 @@ internal class S3DataLakeStreamLoaderTest {
S3DataLakeStreamLoader(
icebergConfiguration,
stream,
- S3DataLakeTableSynchronizer(
- S3DataLakeTypesComparator(),
- S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
+ IcebergTableSynchronizer(
+ IcebergTypesComparator(),
+ IcebergSuperTypeFinder(IcebergTypesComparator()),
),
- s3DataLakeTableWriterFactory,
+ icebergTableWriterFactory,
s3DataLakeUtil,
+ icebergUtil,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = "main",
)
@@ -427,8 +444,9 @@ internal class S3DataLakeStreamLoaderTest {
syncId = 1,
)
val icebergConfiguration: S3DataLakeConfiguration = mockk()
- val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory = mockk()
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergTableWriterFactory: IcebergTableWriterFactory = mockk()
+ val s3DataLakeUtil: S3DataLakeUtil = mockk()
+ val icebergUtil: IcebergUtil = mockk {
every { toIcebergSchema(any(), any()) } answers {
val pipeline = secondArg() as MapperPipeline
@@ -439,12 +457,13 @@ internal class S3DataLakeStreamLoaderTest {
S3DataLakeStreamLoader(
icebergConfiguration,
stream,
- S3DataLakeTableSynchronizer(
- S3DataLakeTypesComparator(),
- S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
+ IcebergTableSynchronizer(
+ IcebergTypesComparator(),
+ IcebergSuperTypeFinder(IcebergTypesComparator()),
),
- s3DataLakeTableWriterFactory,
+ icebergTableWriterFactory,
s3DataLakeUtil,
+ icebergUtil,
stagingBranchName = DEFAULT_STAGING_BRANCH,
mainBranchName = "main",
)
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt
index 0358055d10f7b..5b371fdd9ad87 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt
@@ -5,6 +5,8 @@
package io.airbyte.integrations.destination.s3_data_lake
import io.airbyte.cdk.ConfigErrorException
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergSuperTypeFinder
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID.DOUBLE
import org.apache.iceberg.types.Type.TypeID.LONG
@@ -15,10 +17,10 @@ import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
-/** Comprehensive test suite for [S3DataLakeSuperTypeFinder]. */
+/** Comprehensive test suite for [IcebergSuperTypeFinder]. */
class S3DataLakeSuperTypeFinderTest {
- private val superTypeFinder = S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator())
+ private val superTypeFinder = IcebergSuperTypeFinder(IcebergTypesComparator())
@Test
fun testIdenticalPrimitiveTypes() {
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizerTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizerTest.kt
index 4d1f6130eee2a..8acf3a84576e7 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizerTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTableSynchronizerTest.kt
@@ -5,6 +5,11 @@
package io.airbyte.integrations.destination.s3_data_lake
import io.airbyte.cdk.ConfigErrorException
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.ColumnTypeChangeBehavior
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergSuperTypeFinder
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.SchemaUpdateResult
import io.mockk.confirmVerified
import io.mockk.every
import io.mockk.just
@@ -23,10 +28,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
/**
- * Tests for [S3DataLakeTableSynchronizer].
+ * Tests for [IcebergTableSynchronizer].
*
* We use a mocked [Table] and [UpdateSchema] to verify that the right calls are made based on the
- * computed [S3DataLakeTypesComparator.ColumnDiff].
+ * computed [IcebergTypesComparator.ColumnDiff].
*/
class S3DataLakeTableSynchronizerTest {
@@ -36,9 +41,9 @@ class S3DataLakeTableSynchronizerTest {
private lateinit var mockNewSchema: Schema
// Collaborators under test
- private val comparator = spyk(S3DataLakeTypesComparator())
- private val superTypeFinder = spyk(S3DataLakeSuperTypeFinder(comparator))
- private val synchronizer = S3DataLakeTableSynchronizer(comparator, superTypeFinder)
+ private val comparator = spyk(IcebergTypesComparator())
+ private val superTypeFinder = spyk(IcebergSuperTypeFinder(comparator))
+ private val synchronizer = IcebergTableSynchronizer(comparator, superTypeFinder)
@BeforeEach
fun setUp() {
@@ -81,7 +86,7 @@ class S3DataLakeTableSynchronizerTest {
// The comparator will see no changes
every { comparator.compareSchemas(incomingSchema, existingSchema) } answers {
- S3DataLakeTypesComparator.ColumnDiff()
+ IcebergTypesComparator.ColumnDiff()
}
val result =
@@ -243,8 +248,7 @@ class S3DataLakeTableSynchronizerTest {
val incomingSchema = buildSchema() // Not too relevant, since we expect an exception
every { mockTable.schema() } returns existingSchema
- val diff =
- S3DataLakeTypesComparator.ColumnDiff(newColumns = mutableListOf("outer~inner~leaf"))
+ val diff = IcebergTypesComparator.ColumnDiff(newColumns = mutableListOf("outer~inner~leaf"))
every { comparator.compareSchemas(incomingSchema, existingSchema) } returns diff
assertThatThrownBy {
@@ -273,7 +277,7 @@ class S3DataLakeTableSynchronizerTest {
every { mockTable.schema() } returns existingSchema
val diff =
- S3DataLakeTypesComparator.ColumnDiff(updatedDataTypes = mutableListOf("complex_col"))
+ IcebergTypesComparator.ColumnDiff(updatedDataTypes = mutableListOf("complex_col"))
every { comparator.compareSchemas(incomingSchema, existingSchema) } returns diff
// Let superTypeFinder return a struct type
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTypesComparatorTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTypesComparatorTest.kt
index 6e95e4923e597..96a9dfe319cee 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTypesComparatorTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeTypesComparatorTest.kt
@@ -3,7 +3,8 @@
*/
package io.airbyte.integrations.destination.s3_data_lake
-import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.splitIntoParentAndLeaf
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator.Companion.splitIntoParentAndLeaf
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Types
@@ -11,10 +12,10 @@ import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
-/** Comprehensive test suite for [S3DataLakeTypesComparator]. */
+/** Comprehensive test suite for [IcebergTypesComparator]. */
class S3DataLakeTypesComparatorTest {
- private val comparator = S3DataLakeTypesComparator()
+ private val comparator = IcebergTypesComparator()
/**
* Helper function to create a simple Iceberg [Types.NestedField].
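
The hunks above only rename and rewire collaborators; the following is a minimal illustrative sketch (not taken verbatim from the patch) of how the relocated toolkit classes compose after the move, using only the constructor shapes visible in this diff:

```kotlin
import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergSuperTypeFinder
import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTableSynchronizer
import io.airbyte.cdk.load.toolkits.iceberg.parquet.IcebergTypesComparator
import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator
import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableCleaner
import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory
import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil

// Schema-evolution helpers: the comparator computes a ColumnDiff, the super-type
// finder resolves widened column types, and the synchronizer applies both.
val comparator = IcebergTypesComparator()
val synchronizer = IcebergTableSynchronizer(comparator, IcebergSuperTypeFinder(comparator))

// Generic Iceberg helpers now need only a TableIdGenerator; AWS assume-role
// credentials stay behind the connector-specific S3DataLakeUtil.
val icebergUtil = IcebergUtil(SimpleTableIdGenerator())
val writerFactory = IcebergTableWriterFactory(icebergUtil)
val tableCleaner = IcebergTableCleaner(icebergUtil)
```

As the test hunks suggest, the schema-evolution and table-IO pieces are connector-agnostic after this change, while anything AWS- or Glue-specific remains in the connector's own `S3DataLakeUtil`.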
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/BaseDeltaTaskWriterTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/BaseDeltaTaskWriterTest.kt
index e40eb87a167aa..88b0c80dc29cb 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/BaseDeltaTaskWriterTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/BaseDeltaTaskWriterTest.kt
@@ -4,7 +4,9 @@
package io.airbyte.integrations.destination.s3_data_lake.io
-import io.airbyte.integrations.destination.s3_data_lake.io.BaseDeltaTaskWriter.RowDataDeltaWriter
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.BaseDeltaTaskWriter
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.Operation
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.RecordWrapper
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
@@ -43,7 +45,7 @@ internal class BaseDeltaTaskWriterTest {
)
val primaryKeyIds = setOf(1)
val schema = Schema(columns, primaryKeyIds)
- val deltaWriter: RowDataDeltaWriter = mockk {
+ val deltaWriter: BaseDeltaTaskWriter.RowDataDeltaWriter = mockk {
every { deleteKey(any()) } returns Unit
every { write(any()) } returns Unit
}
@@ -93,7 +95,7 @@ internal class BaseDeltaTaskWriterTest {
Types.NestedField.required(3, "timestamp", Types.TimestampType.withZone()),
)
val schema = Schema(columns)
- val deltaWriter: RowDataDeltaWriter = mockk {
+ val deltaWriter: BaseDeltaTaskWriter.RowDataDeltaWriter = mockk {
every { deleteKey(any()) } returns Unit
every { write(any()) } returns Unit
}
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleanerTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleanerTest.kt
index d976ee0bd8cb5..a1db0d4a8002f 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleanerTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableCleanerTest.kt
@@ -4,6 +4,8 @@
package io.airbyte.integrations.destination.s3_data_lake.io
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableCleaner
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
import io.mockk.Runs
import io.mockk.called
import io.mockk.every
@@ -28,12 +30,12 @@ internal class S3DataLakeTableCleanerTest {
@Test
fun testClearingTableWithPrefix() {
val catalog: Catalog = mockk { every { dropTable(any(), true) } returns true }
- val s3DataLakeUtil: S3DataLakeUtil = mockk()
+ val icebergUtil: IcebergUtil = mockk()
val tableIdentifier: TableIdentifier = mockk()
val fileIo: S3FileIO = mockk { every { deletePrefix(any()) } returns Unit }
val tableLocation = "table/location"
- val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil)
+ val cleaner = IcebergTableCleaner(icebergUtil)
cleaner.clearTable(
catalog = catalog,
@@ -49,12 +51,12 @@ internal class S3DataLakeTableCleanerTest {
@Test
fun testClearingTableWithoutPrefix() {
val catalog: Catalog = mockk { every { dropTable(any(), true) } returns true }
- val s3DataLakeUtil: S3DataLakeUtil = mockk()
+ val icebergUtil: IcebergUtil = mockk()
val tableIdentifier: TableIdentifier = mockk()
val fileIo: FileIO = mockk()
val tableLocation = "table/location"
- val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil)
+ val cleaner = IcebergTableCleaner(icebergUtil)
cleaner.clearTable(
catalog = catalog,
@@ -69,10 +71,10 @@ internal class S3DataLakeTableCleanerTest {
@Test
fun `deleteGenerationId handles empty scan results gracefully`() {
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergUtil: IcebergUtil = mockk {
every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
}
- val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil)
+ val cleaner = IcebergTableCleaner(icebergUtil)
val generationIdSuffix = "ab-generation-id-0-e"
val tasks = CloseableIterable.empty()
@@ -87,10 +89,10 @@ internal class S3DataLakeTableCleanerTest {
@Test
fun `deleteGenerationId deletes matching file via deleteFile`() {
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergUtil: IcebergUtil = mockk {
every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
}
- val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil)
+ val cleaner = IcebergTableCleaner(icebergUtil)
val generationIdSuffix = "ab-generation-id-0-e"
val filePathToDelete = "path/to/gen-5678/foo-bar-ab-generation-id-0-e.parquet"
val fileScanTask = mockk()
@@ -114,7 +116,7 @@ internal class S3DataLakeTableCleanerTest {
}
verify {
- s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
+ icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
table.newDelete().toBranch(eq("staging"))
delete.deleteFile(filePathToDelete)
delete.commit()
@@ -123,10 +125,10 @@ internal class S3DataLakeTableCleanerTest {
@Test
fun `deleteGenerationId should not delete non matching file via deleteFile`() {
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergUtil: IcebergUtil = mockk {
every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
}
- val cleaner = S3DataLakeTableCleaner(s3DataLakeUtil = s3DataLakeUtil)
+ val cleaner = IcebergTableCleaner(icebergUtil)
val generationIdSuffix = "ab-generation-id-10-e"
val filePathToDelete = "path/to/gen-5678/foo-bar-ab-generation-id-10-e.parquet"
val fileScanTask = mockk()
@@ -150,7 +152,7 @@ internal class S3DataLakeTableCleanerTest {
}
verify(exactly = 0) {
- s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
+ icebergUtil.assertGenerationIdSuffixIsOfValidFormat(generationIdSuffix)
table.newDelete().toBranch(any())
delete.deleteFile(any())
delete.commit()
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactoryTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactoryTest.kt
index b784f72f884bb..17d3ae27f2c6b 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactoryTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeTableWriterFactoryTest.kt
@@ -6,6 +6,12 @@ package io.airbyte.integrations.destination.s3_data_lake.io
import io.airbyte.cdk.load.command.Append
import io.airbyte.cdk.load.command.Dedupe
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.PartitionedAppendWriter
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.PartitionedDeltaWriter
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.UnpartitionedAppendWriter
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.UnpartitionedDeltaWriter
import io.mockk.every
import io.mockk.mockk
import java.nio.ByteBuffer
@@ -67,11 +73,11 @@ internal class S3DataLakeTableWriterFactoryTest {
every { schema() } returns tableSchema
every { spec() } returns tableSpec
}
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergUtil: IcebergUtil = mockk {
every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
}
- val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil)
+ val factory = IcebergTableWriterFactory(icebergUtil)
val writer =
factory.create(
table = table,
@@ -126,11 +132,11 @@ internal class S3DataLakeTableWriterFactoryTest {
every { schema() } returns tableSchema
every { spec() } returns tableSpec
}
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergUtil: IcebergUtil = mockk {
every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
}
- val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil)
+ val factory = IcebergTableWriterFactory(icebergUtil)
val writer =
factory.create(
table = table,
@@ -184,11 +190,11 @@ internal class S3DataLakeTableWriterFactoryTest {
every { schema() } returns tableSchema
every { spec() } returns tableSpec
}
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergUtil: IcebergUtil = mockk {
every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
}
- val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil)
+ val factory = IcebergTableWriterFactory(icebergUtil)
val writer =
factory.create(
table = table,
@@ -238,11 +244,11 @@ internal class S3DataLakeTableWriterFactoryTest {
every { schema() } returns tableSchema
every { spec() } returns tableSpec
}
- val s3DataLakeUtil: S3DataLakeUtil = mockk {
+ val icebergUtil: IcebergUtil = mockk {
every { assertGenerationIdSuffixIsOfValidFormat(any()) } returns Unit
}
- val factory = S3DataLakeTableWriterFactory(s3DataLakeUtil = s3DataLakeUtil)
+ val factory = IcebergTableWriterFactory(icebergUtil)
val writer =
factory.create(
table = table,
diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt
index 87ee52265dec6..89d85bab3b9d3 100644
--- a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/io/S3DataLakeUtilTest.kt
@@ -27,8 +27,12 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_EXTRACTED_AT
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_GENERATION_ID
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_META
import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.AIRBYTE_CDC_DELETE_COLUMN
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.Operation
+import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.RecordWrapper
import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeConfiguration
-import io.airbyte.integrations.destination.s3_data_lake.SimpleTableIdGenerator
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
@@ -54,11 +58,13 @@ import org.junit.jupiter.api.assertThrows
internal class S3DataLakeUtilTest {
private lateinit var s3DataLakeUtil: S3DataLakeUtil
+ private lateinit var icebergUtil: IcebergUtil
private val tableIdGenerator = SimpleTableIdGenerator()
@BeforeEach
fun setup() {
- s3DataLakeUtil = S3DataLakeUtil(tableIdGenerator, assumeRoleCredentials = null)
+ icebergUtil = IcebergUtil(tableIdGenerator)
+ s3DataLakeUtil = S3DataLakeUtil(icebergUtil, assumeRoleCredentials = null)
}
@Test
@@ -70,8 +76,7 @@ internal class S3DataLakeUtilTest {
URI to "http://localhost:19120/api/v1",
WAREHOUSE_LOCATION to "s3://test/"
)
- val catalog =
- s3DataLakeUtil.createCatalog(catalogName = catalogName, properties = properties)
+ val catalog = icebergUtil.createCatalog(catalogName = catalogName, properties = properties)
assertNotNull(catalog)
assertEquals(catalogName, catalog.name())
assertEquals(NessieCatalog::class.java, catalog.javaClass)
@@ -98,8 +103,9 @@ internal class S3DataLakeUtilTest {
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false
}
+ s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
- s3DataLakeUtil.createTable(
+ icebergUtil.createTable(
streamDescriptor = streamDescriptor,
catalog = catalog,
schema = schema,
@@ -134,8 +140,9 @@ internal class S3DataLakeUtilTest {
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns false
}
+ s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
- s3DataLakeUtil.createTable(
+ icebergUtil.createTable(
streamDescriptor = streamDescriptor,
catalog = catalog,
schema = schema,
@@ -161,8 +168,9 @@ internal class S3DataLakeUtilTest {
every { namespaceExists(any()) } returns true
every { tableExists(tableIdGenerator.toTableIdentifier(streamDescriptor)) } returns true
}
+ s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)
val table =
- s3DataLakeUtil.createTable(
+ icebergUtil.createTable(
streamDescriptor = streamDescriptor,
catalog = catalog,
schema = schema,
@@ -215,7 +223,7 @@ internal class S3DataLakeUtilTest {
)
val schema = Schema(columns)
val icebergRecord =
- s3DataLakeUtil.toRecord(
+ icebergUtil.toRecord(
record = airbyteRecord,
pipeline = pipeline,
tableSchema = schema,
@@ -267,7 +275,7 @@ internal class S3DataLakeUtilTest {
)
val schema = Schema(columns, setOf(1))
val icebergRecord =
- s3DataLakeUtil.toRecord(
+ icebergUtil.toRecord(
record = airbyteRecord,
pipeline = pipeline,
tableSchema = schema,
@@ -314,7 +322,7 @@ internal class S3DataLakeUtilTest {
)
val schema = Schema(columns, setOf(1))
val icebergRecord =
- s3DataLakeUtil.toRecord(
+ icebergUtil.toRecord(
record = airbyteRecord,
pipeline = pipeline,
tableSchema = schema,
@@ -378,7 +386,7 @@ internal class S3DataLakeUtilTest {
fun `assertGenerationIdSuffixIsOfValidFormat accepts valid format`() {
val validGenerationId = "ab-generation-id-123-e"
assertDoesNotThrow {
- s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(validGenerationId)
+ icebergUtil.assertGenerationIdSuffixIsOfValidFormat(validGenerationId)
}
}
@@ -386,8 +394,8 @@ internal class S3DataLakeUtilTest {
fun `assertGenerationIdSuffixIsOfValidFormat throws exception for invalid prefix`() {
val invalidGenerationId = "invalid-generation-id-123"
val exception =
- assertThrows {
- s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
+ assertThrows {
+ icebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
}
assertEquals(
"Invalid format: $invalidGenerationId. Expected format is 'ab-generation-id--e'",
@@ -399,8 +407,8 @@ internal class S3DataLakeUtilTest {
fun `assertGenerationIdSuffixIsOfValidFormat throws exception for missing number`() {
val invalidGenerationId = "ab-generation-id-"
val exception =
- assertThrows {
- s3DataLakeUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
+ assertThrows {
+ icebergUtil.assertGenerationIdSuffixIsOfValidFormat(invalidGenerationId)
}
assertEquals(
"Invalid format: $invalidGenerationId. Expected format is 'ab-generation-id--e'",
@@ -413,7 +421,7 @@ internal class S3DataLakeUtilTest {
val stream = mockk()
every { stream.generationId } returns 42
val expectedSuffix = "ab-generation-id-42-e"
- val result = s3DataLakeUtil.constructGenerationIdSuffix(stream)
+ val result = icebergUtil.constructGenerationIdSuffix(stream)
assertEquals(expectedSuffix, result)
}
@@ -423,7 +431,7 @@ internal class S3DataLakeUtilTest {
every { stream.generationId } returns -1
val exception =
assertThrows {
- s3DataLakeUtil.constructGenerationIdSuffix(stream)
+ icebergUtil.constructGenerationIdSuffix(stream)
}
assertEquals(
"GenerationId must be non-negative. Provided: ${stream.generationId}",
@@ -451,7 +459,7 @@ internal class S3DataLakeUtilTest {
syncId = 1,
)
val pipeline = ParquetMapperPipelineFactory().create(stream)
- val schema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
+ val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
assertEquals(primaryKeys.toSet(), schema.identifierFieldNames())
assertEquals(6, schema.columns().size)
assertNotNull(schema.findField("id"))
@@ -481,7 +489,7 @@ internal class S3DataLakeUtilTest {
syncId = 1,
)
val pipeline = ParquetMapperPipelineFactory().create(stream)
- val schema = s3DataLakeUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
+ val schema = icebergUtil.toIcebergSchema(stream = stream, pipeline = pipeline)
assertEquals(emptySet(), schema.identifierFieldNames())
assertEquals(6, schema.columns().size)
assertNotNull(schema.findField("id"))
diff --git a/docs/integrations/destinations/s3-data-lake.md b/docs/integrations/destinations/s3-data-lake.md
index ad4ae0f567ffd..502424e315ba7 100644
--- a/docs/integrations/destinations/s3-data-lake.md
+++ b/docs/integrations/destinations/s3-data-lake.md
@@ -157,6 +157,7 @@ drop all table versions.
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------|
+| 0.3.13 | 2025-02-14 | [\#53697](https://github.com/airbytehq/airbyte/pull/53697) | Internal refactor |
| 0.3.12 | 2025-02-12 | [\#53170](https://github.com/airbytehq/airbyte/pull/53170) | Improve documentation, tweak error handling of invalid schema evolution |
| 0.3.11 | 2025-02-12 | [\#53216](https://github.com/airbytehq/airbyte/pull/53216) | Support arbitrary schema change in overwrite / truncate refresh / clear sync |
| 0.3.10 | 2025-02-11 | [\#53622](https://github.com/airbytehq/airbyte/pull/53622) | Enable the Nessie integration tests |
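
For context on the 0.3.13 "Internal refactor" entry above, here is a hedged sketch of the catalog bootstrap that the updated `S3DataLakeTestUtil.getCatalog` and `S3DataLakeUtilTest` hunks exercise. The `buildCatalog` helper name is hypothetical; `DEFAULT_CATALOG_NAME` and the configuration/credential types are assumed to be the same ones imported in those tests:

```kotlin
import io.airbyte.cdk.load.command.aws.AwsAssumeRoleCredentials
import io.airbyte.cdk.load.toolkits.iceberg.parquet.SimpleTableIdGenerator
import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeConfiguration
import io.airbyte.integrations.destination.s3_data_lake.io.S3DataLakeUtil
import org.apache.iceberg.catalog.Catalog

// Illustrative only: the connector-specific S3DataLakeUtil keeps the AWS/Glue
// concerns (catalog properties, namespace handling), while the generic
// IcebergUtil from the CDK toolkit creates the catalog and tables.
fun buildCatalog(
    config: S3DataLakeConfiguration,
    credentials: AwsAssumeRoleCredentials?, // the tests pass null outside integration runs
): Catalog {
    val icebergUtil = IcebergUtil(SimpleTableIdGenerator())
    val s3DataLakeUtil = S3DataLakeUtil(icebergUtil, credentials)
    val props = s3DataLakeUtil.toCatalogProperties(config)
    // DEFAULT_CATALOG_NAME: constant assumed to come from the connector, as in the test above.
    return icebergUtil.createCatalog(DEFAULT_CATALOG_NAME, props)
}
```

Per the `S3DataLakeUtilTest` hunks, table creation now calls `s3DataLakeUtil.createNamespaceWithGlueHandling(streamDescriptor, catalog)` before `icebergUtil.createTable(...)`.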