Skip to content

Commit

Permalink
fix merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 committed Jan 14, 2025
1 parent f9920aa commit b2f8671
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.13
dockerImageTag: 0.2.14
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2
package io.airbyte.integrations.destination.s3_data_lake

import io.airbyte.integrations.destination.iceberg.v2.IcebergTypesComparator.Companion.PARENT_CHILD_SEPARATOR
import io.airbyte.integrations.destination.iceberg.v2.IcebergTypesComparator.Companion.splitIntoParentAndLeaf
import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.PARENT_CHILD_SEPARATOR
import io.airbyte.integrations.destination.s3_data_lake.S3DataLakeTypesComparator.Companion.splitIntoParentAndLeaf
import jakarta.inject.Singleton
import org.apache.iceberg.Schema
import org.apache.iceberg.Table
Expand All @@ -26,9 +26,9 @@ import org.apache.iceberg.types.Type.PrimitiveType
* @property superTypeFinder Used to find a common supertype when data types differ.
*/
@Singleton
class IcebergTableSynchronizer(
private val comparator: IcebergTypesComparator,
private val superTypeFinder: IcebergSuperTypeFinder,
class S3DataLakeTableSynchronizer(
private val comparator: S3DataLakeTypesComparator,
private val superTypeFinder: S3DataLakeSuperTypeFinder,
) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class S3DataLakeWriter(
private val s3DataLakeTableWriterFactory: S3DataLakeTableWriterFactory,
private val icebergConfiguration: S3DataLakeConfiguration,
private val s3DataLakeUtil: S3DataLakeUtil,
private val icebergTableSynchronizer: IcebergTableSynchronizer
private val s3DataLakeTableSynchronizer: S3DataLakeTableSynchronizer
) : DestinationWriter {

override fun createStreamLoader(stream: DestinationStream): StreamLoader {
Expand All @@ -40,7 +40,7 @@ class S3DataLakeWriter(
existingSchema = table.schema()
)

icebergTableSynchronizer.applySchemaChanges(table, incomingSchema)
s3DataLakeTableSynchronizer.applySchemaChanges(table, incomingSchema)

return S3DataLakeStreamLoader(
stream = stream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.iceberg.v2
package io.airbyte.integrations.destination.s3_data_lake

import io.mockk.every
import io.mockk.just
Expand All @@ -21,21 +21,21 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

/**
* Tests for [IcebergTableSynchronizer].
* Tests for [S3DataLakeTableSynchronizer].
*
* We use a mocked [Table] and [UpdateSchema] to verify that the right calls are made based on the
* computed [IcebergTypesComparator.ColumnDiff].
* computed [S3DataLakeTypesComparator.ColumnDiff].
*/
class IcebergTableSynchronizerTest {
class S3DataLakeTableSynchronizerTest {

// Mocks
private lateinit var mockTable: Table
private lateinit var mockUpdateSchema: UpdateSchema

// Collaborators under test
private val comparator = spyk(IcebergTypesComparator())
private val superTypeFinder = spyk(IcebergSuperTypeFinder(comparator))
private val synchronizer = IcebergTableSynchronizer(comparator, superTypeFinder)
private val comparator = spyk(S3DataLakeTypesComparator())
private val superTypeFinder = spyk(S3DataLakeSuperTypeFinder(comparator))
private val synchronizer = S3DataLakeTableSynchronizer(comparator, superTypeFinder)

@BeforeEach
fun setUp() {
Expand Down Expand Up @@ -73,7 +73,7 @@ class IcebergTableSynchronizerTest {
// The comparator will see no changes
every { comparator.compareSchemas(incomingSchema, existingSchema) } answers
{
IcebergTypesComparator.ColumnDiff()
S3DataLakeTypesComparator.ColumnDiff()
}

val result = synchronizer.applySchemaChanges(mockTable, incomingSchema)
Expand Down Expand Up @@ -213,7 +213,7 @@ class IcebergTableSynchronizerTest {
val incomingSchema = buildSchema() // Not too relevant, since we expect an exception

every { mockTable.schema() } returns existingSchema
val diff = IcebergTypesComparator.ColumnDiff(newColumns = mutableListOf("outer~inner~leaf"))
val diff = S3DataLakeTypesComparator.ColumnDiff(newColumns = mutableListOf("outer~inner~leaf"))
every { comparator.compareSchemas(incomingSchema, existingSchema) } returns diff

assertThatThrownBy { synchronizer.applySchemaChanges(mockTable, incomingSchema) }
Expand All @@ -236,7 +236,7 @@ class IcebergTableSynchronizerTest {

every { mockTable.schema() } returns existingSchema
val diff =
IcebergTypesComparator.ColumnDiff(updatedDataTypes = mutableListOf("complex_col"))
S3DataLakeTypesComparator.ColumnDiff(updatedDataTypes = mutableListOf("complex_col"))
every { comparator.compareSchemas(incomingSchema, existingSchema) } returns diff

// Let superTypeFinder return a struct type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ internal class S3DataLakeWriterTest {
s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
icebergConfiguration = icebergConfiguration,
s3DataLakeUtil = s3DataLakeUtil,
icebergTableSynchronizer =
IcebergTableSynchronizer(
IcebergTypesComparator(),
IcebergSuperTypeFinder(IcebergTypesComparator()),
s3DataLakeTableSynchronizer =
S3DataLakeTableSynchronizer(
S3DataLakeTypesComparator(),
S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
)
)
val streamLoader = s3DataLakeWriter.createStreamLoader(stream = stream)
Expand Down Expand Up @@ -211,7 +211,7 @@ internal class S3DataLakeWriterTest {
} returns updateSchema
every { updateSchema.commit() } just runs
every { table.refresh() } just runs
val icebergUtil: IcebergUtil = mockk {
val s3DataLakeUtil: S3DataLakeUtil = mockk {
every { createCatalog(any(), any()) } returns catalog
every { createTable(any(), any(), any(), any()) } returns table
every { toCatalogProperties(any()) } returns mapOf()
Expand All @@ -225,14 +225,14 @@ internal class S3DataLakeWriterTest {
S3DataLakeWriter(
s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
icebergConfiguration = icebergConfiguration,
icebergUtil = icebergUtil,
icebergTableSynchronizer =
IcebergTableSynchronizer(
IcebergTypesComparator(),
IcebergSuperTypeFinder(IcebergTypesComparator()),
s3DataLakeUtil = s3DataLakeUtil,
s3DataLakeTableSynchronizer =
S3DataLakeTableSynchronizer(
S3DataLakeTypesComparator(),
S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
),
)
icebergV2Writer.createStreamLoader(stream = stream)
s3DataLakeWriter.createStreamLoader(stream = stream)

verify(exactly = 0) { updateSchema.deleteColumn(any()) }
verify(exactly = 0) { updateSchema.updateColumn(any(), any<PrimitiveType>()) }
Expand Down Expand Up @@ -342,11 +342,11 @@ internal class S3DataLakeWriterTest {
S3DataLakeWriter(
s3DataLakeTableWriterFactory = s3DataLakeTableWriterFactory,
icebergConfiguration = icebergConfiguration,
icebergUtil = icebergUtil,
icebergTableSynchronizer =
IcebergTableSynchronizer(
IcebergTypesComparator(),
IcebergSuperTypeFinder(IcebergTypesComparator()),
s3DataLakeUtil = s3DataLakeUtil,
s3DataLakeTableSynchronizer =
S3DataLakeTableSynchronizer(
S3DataLakeTypesComparator(),
S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()),
),
)
val e =
Expand Down
21 changes: 11 additions & 10 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ for more information.
<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------|
| 0.2.13 | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------|
| 0.2.14 | 2025-01-14 | [\#50413](https://github.com/airbytehq/airbyte/pull/50413) | Update existing table schema based on the incoming schema |
| 0.2.13 | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |

</details>

0 comments on commit b2f8671

Please sign in to comment.