From a0f15ff978860cdccd7b8164be79a5fd737124f2 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Mon, 30 Sep 2024 17:07:34 -0700 Subject: [PATCH] cdk-java: add file transfer mount to DestinationAcceptanceTest --- .../destination/DestinationAcceptanceTest.kt | 6 ++-- .../data_type_array_object_test_messages.txt | 4 +-- .../v1/data_type_array_test_messages.txt | 4 +-- .../v1/data_type_basic_test_messages.txt | 4 +-- .../v1/data_type_object_test_messages.txt | 4 +-- .../resources/v1/edge_case_messages.txt | 4 +-- .../resources/v1/exchange_rate_messages.txt | 8 ++--- .../number_data_type_array_test_messages.txt | 2 +- .../v1/number_data_type_test_messages.txt | 4 +-- .../s3/S3DestinationAcceptanceTest.kt | 7 ++-- .../connectors/destination-s3/build.gradle | 2 +- .../connectors/destination-s3/metadata.yaml | 2 +- .../destination/s3/S3Destination.kt | 9 ----- .../s3/S3StorageOperationsIntegrationTest.kt | 34 ++++++++++++++----- docs/integrations/destinations/s3.md | 1 + 15 files changed, 54 insertions(+), 41 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index 8c2e3444336d..d3dfc0d5c794 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -101,6 +101,7 @@ abstract class DestinationAcceptanceTest( protected var testSchemas: HashSet = HashSet() private lateinit var testEnv: TestDestinationEnv + private set protected open val isCloudTest: Boolean = true protected val featureFlags: FeatureFlags = if (isCloudTest) { @@ -2082,7 +2083,7 @@ abstract class DestinationAcceptanceTest( val stateToCount = mutableMapOf() messages.fold(0) { acc, message -> if (message.type == Type.STATE) { - stateToCount[message.state.data] = acc + stateToCount[message.state.global.sharedState] = acc 0 } else { acc + 1 @@ -2092,7 +2093,8 @@ abstract class DestinationAcceptanceTest( expected.forEach { message -> val clone = message.state clone.destinationStats = - AirbyteStateStats().withRecordCount(stateToCount[clone.data]!!.toDouble()) + AirbyteStateStats() + .withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble()) message.state = clone } } else { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt index 20f49b274070..c7302afbb860 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt @@ -1,3 +1,3 @@ {"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" } ] }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt index 499f275d2293..ae9c362e568d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt @@ -1,3 +1,3 @@ {"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "string_array" : ["foo bar", "some random special characters: ࠈൡሗ"], "array_date" : ["2021-01-23", "1504-02-29"], "array_timestamp_with_timezone" : ["2022-11-22T01:23:45+05:00", "9999-12-21T01:23:45-05:00"], "array_timestamp_without_timezone" : ["2022-11-22T01:23:45", "1504-02-29T01:23:45"], "array_number" : ["56.78", "0", "-12345.678"], "array_integer" : ["42", "0", "12345"], "array_boolean" : [true, false], "array_binary_data" : ["dGVzdA=="] }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt index 6223ae1557c7..2fce53790090 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt @@ -9,9 +9,9 @@ {"type": "RECORD", "record": {"stream": "integer_test_1", "emitted_at": 1602637589100, "data": { "data" : "-12345" }}} {"type": "RECORD", "record": {"stream": "boolean_test_1", "emitted_at": 1602637589200, "data": { "data" : true }}} {"type": "RECORD", "record": {"stream": "binary_test_1", "emitted_at": 1602637589300, "data": { "data" : "dGVzdA==" }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "string_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "number_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "integer_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "boolean_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt index d09fb2903698..5246fde74a76 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt @@ -1,3 +1,3 @@ {"type": "RECORD", "record": {"stream": "object_test_1", "emitted_at": 1602637589100, "data": {"property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt index 11fe9b7d38d9..7fbe03cac9a2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt @@ -15,7 +15,7 @@ {"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : "202", "next_field_name" : "next_field_name_2" }}} {"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : "203" }}} {"type": "RECORD", "record": {"stream": "stream_with_binary_data", "emitted_at": 1602637589500, "data": { "some_id" : "303", "binary_field_name":"dGVzdA==" }}} -{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}} +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "streamWithCamelCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_underscores"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_edge_case_field_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} @@ -27,4 +27,4 @@ {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name_next"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_binary_data"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt index 5c15c041f22f..b99ba789350f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt @@ -1,9 +1,9 @@ {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": "1", "currency": "USD", "date": "2020-08-29T00:00:00Z", "NZD": "0.12", "HKD": "2.13"}}} -{"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-08-31"}}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": "1", "currency": "USD", "date": "2020-08-30T00:00:00Z", "NZD": "1.14", "HKD": "7.15"}}} -{"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-01"}}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.99", "USD": "10.99"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": "2", "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}} -{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}} -{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}} +{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_array_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_array_test_messages.txt index c8e6efff6256..f23d38bcca1d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_array_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_array_test_messages.txt @@ -1,3 +1,3 @@ {"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "array_number" : ["-12345.678", "100000000000000000.1234"],"array_float" : ["-12345.678", "0", "1000000000000000000000000000000000000000000000000000.1234"], "array_integer" : ["42", "0", "12345"]}}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} {"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_test_messages.txt index a1a68b77a193..8b495ac3d93b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/number_data_type_test_messages.txt @@ -7,7 +7,7 @@ {"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589100, "data": { "data" : "10000000000000000000000.1234" }}} {"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589200, "data": { "data" : "0" }}} {"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589300, "data": { "data" : "-12345.678" }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} {"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "int_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}} {"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "float_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}} -{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "default_number_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}} \ No newline at end of file +{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "default_number_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}} diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 3bd38d2e0857..8095464a4d62 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -260,7 +260,10 @@ protected constructor( .withType(AirbyteMessage.Type.STATE) .withState( AirbyteStateMessage() - .withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))), + .withGlobal( + AirbyteGlobalState() + .withSharedState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))) + ) ), AirbyteMessage() .withType(AirbyteMessage.Type.TRACE) @@ -579,7 +582,7 @@ protected constructor( * unrelated catalog sync data is untouched too. */ @Test - fun testOverwriteSyncWithGenerationId() { + open fun testOverwriteSyncWithGenerationId() { assumeTrue( implementsOverwrite(), "Destination's spec.json does not support overwrite sync mode." diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index d47914ebb801..b5faddcddae5 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.46.1' features = ['db-destinations', 's3-destinations'] - useLocalCdk = false + useLocalCdk = true } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 3dfdd600bf72..727d7c75c210 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.2.1 + dockerImageTag: 1.3.0 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt index 89173b904b09..9968bd366d9b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt +++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.s3 import com.google.common.annotations.VisibleForTesting -import io.airbyte.cdk.integrations.base.IntegrationRunner import io.airbyte.cdk.integrations.destination.s3.BaseS3Destination import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory import io.airbyte.cdk.integrations.destination.s3.StorageProvider @@ -21,12 +20,4 @@ open class S3Destination : BaseS3Destination { override fun storageProvider(): StorageProvider { return StorageProvider.AWS_S3 } - - companion object { - @Throws(Exception::class) - @JvmStatic - fun main(args: Array) { - IntegrationRunner(S3Destination()).run(args) - } - } } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3StorageOperationsIntegrationTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3StorageOperationsIntegrationTest.kt index 21910cd99278..6a6def73e3c4 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3StorageOperationsIntegrationTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3StorageOperationsIntegrationTest.kt @@ -91,16 +91,20 @@ class S3StorageOperationsIntegrationTest { objectPrefix: String, numOfRecords: Int, generationId: Long - ) { + ): Set { log.info { "Uploading $numOfRecords test objects" } + val insertedFiles: MutableSet = mutableSetOf() for (i in 1..numOfRecords) { - s3StorageOperations.uploadRecordsToBucket( - createStringBuffer("DummyStringToWrite"), - namespace, - objectPrefix, - generationId + insertedFiles.add( + s3StorageOperations.uploadRecordsToBucket( + createStringBuffer("DummyStringToWrite"), + namespace, + objectPrefix, + generationId + ) ) } + return insertedFiles } @Test @@ -187,6 +191,7 @@ class S3StorageOperationsIntegrationTest { outputFormat ) var expectedNumberOfObjects = 0 + val objectsByGenId: MutableMap> = mutableMapOf() if (numberOfGens != 0) { for (i in 1..numberOfGens) { val numberOfObjects = @@ -195,7 +200,8 @@ class S3StorageOperationsIntegrationTest { else -> randomNumber } expectedNumberOfObjects += numberOfObjects - uploadTestRecords(namespace, fullObjectPath, numberOfObjects, i.toLong()) + objectsByGenId[i] = + uploadTestRecords(namespace, fullObjectPath, numberOfObjects, i.toLong()) } } @@ -204,7 +210,13 @@ class S3StorageOperationsIntegrationTest { // This is used to assert non-existent case where currentGen gets decreased from highest Gen if (currentGen > numberOfGens) { val currentGenObjectSize = random.nextInt(5) - uploadTestRecords(namespace, fullObjectPath, currentGenObjectSize, currentGen.toLong()) + objectsByGenId[currentGen] = + uploadTestRecords( + namespace, + fullObjectPath, + currentGenObjectSize, + currentGen.toLong() + ) } val existingObjects = @@ -215,7 +227,11 @@ class S3StorageOperationsIntegrationTest { outputFormat, currentGen.toLong() ) - assertEquals(expectedNumberOfObjects, existingObjects.size) + assertEquals( + expectedNumberOfObjects, + existingObjects.size, + "currentGen = $currentGen. Expected $expectedNumberOfObjects (${objectsByGenId}), got $existingObjects" + ) } private fun createStringBuffer(strData: String): SerializableBuffer { diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 25dfc19257da..02a69c64cb21 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | enable file transfer | | 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | | 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | | 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |