Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 committed Dec 24, 2024
1 parent e681e01 commit 64502c6
Showing 1 changed file with 66 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergTableWriterFactory
import io.airbyte.integrations.destination.iceberg.v2.io.IcebergUtil
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.runs
import io.mockk.verify
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
import org.apache.iceberg.UpdateSchema
import org.apache.iceberg.catalog.Catalog
import org.apache.iceberg.types.Type.PrimitiveType
import org.apache.iceberg.types.Types
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertTrue
Expand All @@ -50,7 +55,7 @@ internal class IcebergV2WriterTest {
linkedMapOf(
"id" to FieldType(IntegerType, nullable = true),
"name" to FieldType(StringType, nullable = true),
)
),
),
generationId = 1,
minimumGenerationId = 1,
Expand Down Expand Up @@ -80,18 +85,18 @@ internal class IcebergV2WriterTest {
10,
false,
"change",
Types.StringType.get()
Types.StringType.get(),
),
Types.NestedField.of(
11,
false,
"reason",
Types.StringType.get()
)
)
)
)
)
Types.StringType.get(),
),
),
),
),
),
),
Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()),
)
Expand Down Expand Up @@ -134,6 +139,11 @@ internal class IcebergV2WriterTest {
icebergTableWriterFactory = icebergTableWriterFactory,
icebergConfiguration = icebergConfiguration,
icebergUtil = icebergUtil,
icebergTableSynchronizer =
IcebergTableSynchronizer(
IcebergTypesComparator(),
IcebergSuperTypeFinder(IcebergTypesComparator()),
),
)
val streamLoader = icebergV2Writer.createStreamLoader(stream = stream)
assertNotNull(streamLoader)
Expand All @@ -151,7 +161,7 @@ internal class IcebergV2WriterTest {
linkedMapOf(
"id" to FieldType(IntegerType, nullable = true),
"name" to FieldType(StringType, nullable = true),
)
),
),
generationId = 1,
minimumGenerationId = 1,
Expand Down Expand Up @@ -184,6 +194,23 @@ internal class IcebergV2WriterTest {
}
val catalog: Catalog = mockk()
val table: Table = mockk { every { schema() } returns icebergSchema }
val updateSchema: UpdateSchema = mockk()
every { table.updateSchema().allowIncompatibleChanges() } returns updateSchema
every {
updateSchema.updateColumn(
any<String>(),
any<PrimitiveType>(),
)
} returns updateSchema
every {
updateSchema.addColumn(
any<String>(),
any<String>(),
any<PrimitiveType>(),
)
} returns updateSchema
every { updateSchema.commit() } just runs
every { table.refresh() } just runs
val icebergUtil: IcebergUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
Expand All @@ -199,14 +226,24 @@ internal class IcebergV2WriterTest {
icebergTableWriterFactory = icebergTableWriterFactory,
icebergConfiguration = icebergConfiguration,
icebergUtil = icebergUtil,
icebergTableSynchronizer =
IcebergTableSynchronizer(
IcebergTypesComparator(),
IcebergSuperTypeFinder(IcebergTypesComparator()),
),
)
val e =
assertThrows<IllegalArgumentException> {
icebergV2Writer.createStreamLoader(stream = stream)
}
assertTrue(
e.message?.startsWith("Table schema fields are different than catalog schema") ?: false
)
icebergV2Writer.createStreamLoader(stream = stream)

verify(exactly = 0) { updateSchema.deleteColumn(any()) }
verify(exactly = 0) { updateSchema.updateColumn(any(), any<PrimitiveType>()) }
verify(exactly = 0) { updateSchema.makeColumnOptional(any()) }
verify { updateSchema.addColumn(null, "_airbyte_raw_id", Types.StringType.get()) }
verify { updateSchema.addColumn(null, "id", Types.LongType.get()) }
verify { updateSchema.addColumn(null, "_airbyte_meta", any()) }
verify { updateSchema.addColumn(null, "_airbyte_generation_id", Types.LongType.get()) }
verify { updateSchema.addColumn(null, "id", Types.LongType.get()) }
verify { updateSchema.commit() }
verify { table.refresh() }
}

@Test
Expand All @@ -222,7 +259,7 @@ internal class IcebergV2WriterTest {
linkedMapOf(
"id" to FieldType(IntegerType, nullable = false),
"name" to FieldType(StringType, nullable = true),
)
),
),
generationId = 1,
minimumGenerationId = 1,
Expand Down Expand Up @@ -252,18 +289,18 @@ internal class IcebergV2WriterTest {
10,
false,
"change",
Types.StringType.get()
Types.StringType.get(),
),
Types.NestedField.of(
11,
false,
"reason",
Types.StringType.get()
)
)
)
)
)
Types.StringType.get(),
),
),
),
),
),
),
Types.NestedField.of(12, false, COLUMN_NAME_AB_GENERATION_ID, Types.LongType.get()),
)
Expand Down Expand Up @@ -306,6 +343,11 @@ internal class IcebergV2WriterTest {
icebergTableWriterFactory = icebergTableWriterFactory,
icebergConfiguration = icebergConfiguration,
icebergUtil = icebergUtil,
icebergTableSynchronizer =
IcebergTableSynchronizer(
IcebergTypesComparator(),
IcebergSuperTypeFinder(IcebergTypesComparator()),
),
)
val e =
assertThrows<IllegalArgumentException> {
Expand Down

0 comments on commit 64502c6

Please sign in to comment.