diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt index 8ae1dacaf18f..0dd9c5e3ed36 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriterTest.kt @@ -26,10 +26,15 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil import io.mockk.every +import io.mockk.just import io.mockk.mockk +import io.mockk.runs +import io.mockk.verify import org.apache.iceberg.Schema import org.apache.iceberg.Table +import org.apache.iceberg.UpdateSchema import org.apache.iceberg.catalog.Catalog +import org.apache.iceberg.types.Type.PrimitiveType import org.apache.iceberg.types.Types import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertTrue @@ -50,7 +55,7 @@ internal class IcebergV2WriterTest { linkedMapOf( "id" to FieldType(IntegerType, nullable = true), "name" to FieldType(StringType, nullable = true), - ) + ), ), generationId = 1, minimumGenerationId = 1, @@ -80,18 +85,18 @@ internal class IcebergV2WriterTest { 10, false, "change", - Types.StringType.get() + Types.StringType.get(), ), Types.NestedField.of( 11, false, "reason", - Types.StringType.get() - ) - ) - ) - ) - ) + Types.StringType.get(), + ), + ), + ), + ), + ), ), Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()), ) @@ -134,6 +139,11 @@ internal class IcebergV2WriterTest { icebergTableWriterFactory = icebergTableWriterFactory, icebergConfiguration = icebergConfiguration, icebergUtil = icebergUtil, + icebergTableSynchronizer = + IcebergTableSynchronizer( + IcebergTypesComparator(), + IcebergSuperTypeFinder(IcebergTypesComparator()), + ), ) val streamLoader = icebergV2Writer.createStreamLoader(stream = stream) assertNotNull(streamLoader) @@ -151,7 +161,7 @@ internal class IcebergV2WriterTest { linkedMapOf( "id" to FieldType(IntegerType, nullable = true), "name" to FieldType(StringType, nullable = true), - ) + ), ), generationId = 1, minimumGenerationId = 1, @@ -184,6 +194,23 @@ internal class IcebergV2WriterTest { } val catalog: Catalog = mockk() val table: Table = mockk { every { schema() } returns icebergSchema } + val updateSchema: UpdateSchema = mockk() + every { table.updateSchema().allowIncompatibleChanges() } returns updateSchema + every { + updateSchema.updateColumn( + any(), + any(), + ) + } returns updateSchema + every { + updateSchema.addColumn( + any(), + any(), + any(), + ) + } returns updateSchema + every { updateSchema.commit() } just runs + every { table.refresh() } just runs val icebergUtil: IcebergUtil = mockk { every { createCatalog(any(), any()) } returns catalog every { createTable(any(), any(), any(), any()) } returns table @@ -199,14 +226,24 @@ internal class IcebergV2WriterTest { icebergTableWriterFactory = icebergTableWriterFactory, icebergConfiguration = icebergConfiguration, icebergUtil = icebergUtil, + icebergTableSynchronizer = + IcebergTableSynchronizer( + IcebergTypesComparator(), + IcebergSuperTypeFinder(IcebergTypesComparator()), + ), ) - val e = - assertThrows { - icebergV2Writer.createStreamLoader(stream = stream) - } - assertTrue( - e.message?.startsWith("Table schema fields are different than catalog schema") ?: false - ) + icebergV2Writer.createStreamLoader(stream = stream) + + verify(exactly = 0) { updateSchema.deleteColumn(any()) } + verify(exactly = 0) { updateSchema.updateColumn(any(), any()) } + verify(exactly = 0) { updateSchema.makeColumnOptional(any()) } + verify { updateSchema.addColumn(null, "_airbyte_raw_id", Types.StringType.get()) } + verify { updateSchema.addColumn(null, "id", Types.LongType.get()) } + verify { updateSchema.addColumn(null, "_airbyte_meta", any()) } + verify { updateSchema.addColumn(null, "_airbyte_generation_id", Types.LongType.get()) } + verify { updateSchema.addColumn(null, "id", Types.LongType.get()) } + verify { updateSchema.commit() } + verify { table.refresh() } } @Test @@ -222,7 +259,7 @@ internal class IcebergV2WriterTest { linkedMapOf( "id" to FieldType(IntegerType, nullable = false), "name" to FieldType(StringType, nullable = true), - ) + ), ), generationId = 1, minimumGenerationId = 1, @@ -252,18 +289,18 @@ internal class IcebergV2WriterTest { 10, false, "change", - Types.StringType.get() + Types.StringType.get(), ), Types.NestedField.of( 11, false, "reason", - Types.StringType.get() - ) - ) - ) - ) - ) + Types.StringType.get(), + ), + ), + ), + ), + ), ), Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()), ) @@ -306,6 +343,11 @@ internal class IcebergV2WriterTest { icebergTableWriterFactory = icebergTableWriterFactory, icebergConfiguration = icebergConfiguration, icebergUtil = icebergUtil, + icebergTableSynchronizer = + IcebergTableSynchronizer( + IcebergTypesComparator(), + IcebergSuperTypeFinder(IcebergTypesComparator()), + ), ) val e = assertThrows {