From 78d6139fde0bf90df36becd08416135e2c5efd17 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Mon, 30 Sep 2024 17:07:34 -0700 Subject: [PATCH] cdk-java: add file transfer mount to DestinationAcceptanceTest --- .../io/airbyte/cdk/integrations/BaseConnector.kt | 1 + .../destination/DestinationAcceptanceTest.kt | 11 +++++++++-- .../v1/data_type_array_object_test_messages.txt | 4 ++-- .../resources/v1/data_type_array_test_messages.txt | 4 ++-- .../resources/v1/data_type_basic_test_messages.txt | 4 ++-- .../resources/v1/data_type_object_test_messages.txt | 4 ++-- .../testFixtures/resources/v1/edge_case_messages.txt | 4 ++-- .../resources/v1/exchange_rate_messages.txt | 8 ++++---- .../source/AbstractSourceConnectorTest.kt | 1 + .../commons/features/EnvVariableFeatureFlags.kt | 7 +++++++ .../io/airbyte/commons/features/FeatureFlags.kt | 4 ++++ .../airbyte/commons/features/FeatureFlagsWrapper.kt | 6 ++++++ .../airbyte/workers/process/DockerProcessFactory.kt | 8 ++++++++ .../destination/s3/S3DestinationAcceptanceTest.kt | 9 +++++++-- .../typing_deduping/BaseTypingDedupingTest.kt | 1 + .../connectors/destination-s3/build.gradle | 2 +- .../connectors/destination-s3/metadata.yaml | 2 +- .../integrations/destination/s3/S3Destination.kt | 9 --------- .../s3/S3CsvAssumeRoleDestinationAcceptanceTest.kt | 1 + docs/integrations/destinations/s3.md | 1 + 20 files changed, 62 insertions(+), 29 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt index ef1d80ea635f..a937f4c911d2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/BaseConnector.kt @@ -10,6 +10,7 @@ import io.airbyte.commons.features.FeatureFlags import io.airbyte.commons.json.Jsons import io.airbyte.commons.resources.MoreResources import io.airbyte.protocol.models.v0.ConnectorSpecification +import java.nio.file.Path abstract class BaseConnector : Integration { open val featureFlags: FeatureFlags = EnvVariableFeatureFlags() diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index 8c2e3444336d..c975a0558818 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -101,6 +101,8 @@ abstract class DestinationAcceptanceTest( protected var testSchemas: HashSet = HashSet() private lateinit var testEnv: TestDestinationEnv + protected var fileTransferMountSource: Path? = null + private set protected open val isCloudTest: Boolean = true protected val featureFlags: FeatureFlags = if (isCloudTest) { @@ -435,12 +437,15 @@ abstract class DestinationAcceptanceTest( mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java) testSchemas = HashSet() setup(testEnv, testSchemas) + fileTransferMountSource = + if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null processFactory = DockerProcessFactory( workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource, "host", getConnectorEnv() ) @@ -2082,7 +2087,7 @@ abstract class DestinationAcceptanceTest( val stateToCount = mutableMapOf() messages.fold(0) { acc, message -> if (message.type == Type.STATE) { - stateToCount[message.state.data] = acc + stateToCount[message.state.global.sharedState] = acc 0 } else { acc + 1 @@ -2092,7 +2097,7 @@ abstract class DestinationAcceptanceTest( expected.forEach { message -> val clone = message.state clone.destinationStats = - AirbyteStateStats().withRecordCount(stateToCount[clone.data]!!.toDouble()) + AirbyteStateStats().withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble()) message.state = clone } } else { @@ -2684,6 +2689,8 @@ abstract class DestinationAcceptanceTest( return supportsInDestinationNormalization() || normalizationFromDefinition() } + protected open val supportsFileTransfer: Boolean = false + companion object { private val RANDOM = Random() private const val NORMALIZATION_VERSION = "dev" diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt index 20f49b274070..c7302afbb860 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_object_test_messages.txt @@ -1,3 +1,3 @@ {"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" } ] }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt index 499f275d2293..ae9c362e568d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_array_test_messages.txt @@ -1,3 +1,3 @@ {"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "string_array" : ["foo bar", "some random special characters: ࠈൡሗ"], "array_date" : ["2021-01-23", "1504-02-29"], "array_timestamp_with_timezone" : ["2022-11-22T01:23:45+05:00", "9999-12-21T01:23:45-05:00"], "array_timestamp_without_timezone" : ["2022-11-22T01:23:45", "1504-02-29T01:23:45"], "array_number" : ["56.78", "0", "-12345.678"], "array_integer" : ["42", "0", "12345"], "array_boolean" : [true, false], "array_binary_data" : ["dGVzdA=="] }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt index 6223ae1557c7..2fce53790090 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_basic_test_messages.txt @@ -9,9 +9,9 @@ {"type": "RECORD", "record": {"stream": "integer_test_1", "emitted_at": 1602637589100, "data": { "data" : "-12345" }}} {"type": "RECORD", "record": {"stream": "boolean_test_1", "emitted_at": 1602637589200, "data": { "data" : true }}} {"type": "RECORD", "record": {"stream": "binary_test_1", "emitted_at": 1602637589300, "data": { "data" : "dGVzdA==" }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "string_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "number_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "integer_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "boolean_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt index d09fb2903698..5246fde74a76 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/data_type_object_test_messages.txt @@ -1,3 +1,3 @@ {"type": "RECORD", "record": {"stream": "object_test_1", "emitted_at": 1602637589100, "data": {"property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" }}} -{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}} +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt index 11fe9b7d38d9..7fbe03cac9a2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/edge_case_messages.txt @@ -15,7 +15,7 @@ {"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : "202", "next_field_name" : "next_field_name_2" }}} {"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : "203" }}} {"type": "RECORD", "record": {"stream": "stream_with_binary_data", "emitted_at": 1602637589500, "data": { "some_id" : "303", "binary_field_name":"dGVzdA==" }}} -{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}} +{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "streamWithCamelCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_underscores"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_edge_case_field_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} @@ -27,4 +27,4 @@ {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name_next"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_binary_data"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} -{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt index 5c15c041f22f..b99ba789350f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/resources/v1/exchange_rate_messages.txt @@ -1,9 +1,9 @@ {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": "1", "currency": "USD", "date": "2020-08-29T00:00:00Z", "NZD": "0.12", "HKD": "2.13"}}} -{"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-08-31"}}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": "1", "currency": "USD", "date": "2020-08-30T00:00:00Z", "NZD": "1.14", "HKD": "7.15"}}} -{"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}} +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-01"}}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.99", "USD": "10.99"}}} {"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": "2", "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}} -{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}} -{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} \ No newline at end of file +{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}} +{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt index 4761398a496c..a71aca115d2e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", envMap ) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt index 5106ad19f598..1bc37ac4afd6 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt @@ -4,6 +4,7 @@ package io.airbyte.commons.features import io.github.oshai.kotlinlogging.KotlinLogging +import java.nio.file.Path import java.util.function.Function private val log = KotlinLogging.logger {} @@ -46,6 +47,10 @@ class EnvVariableFeatureFlags : FeatureFlags { return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg } } + override fun airbyteStagingDirectory(): Path { + return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, DEFAULT_AIRBYTE_STAGING_DIRECTORY) { arg: String -> Path.of(arg) } + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java fun getEnvOrDefault(key: String?, defaultValue: T, parser: Function): T { val value = System.getenv(key) @@ -73,5 +78,7 @@ class EnvVariableFeatureFlags : FeatureFlags { const val STRICT_COMPARISON_NORMALIZATION_TAG: String = "STRICT_COMPARISON_NORMALIZATION_TAG" const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE" + val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files") + const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY" } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt index a8626b46ec64..91fd2f028a7a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + /** * Interface that describe which features are activated in airbyte. Currently, the only * implementation relies on env. Ideally it should be on some DB. @@ -51,4 +53,6 @@ interface FeatureFlags { * @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode. */ fun deploymentMode(): String? + + fun airbyteStagingDirectory(): Path } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt index 056c6730332c..f339de179b17 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt @@ -3,6 +3,8 @@ */ package io.airbyte.commons.features +import java.nio.file.Path + open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags { override fun autoDetectSchema(): Boolean { return wrapped.autoDetectSchema() @@ -36,6 +38,10 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags return wrapped.deploymentMode() } + override fun airbyteStagingDirectory(): Path { + return wrapped.airbyteStagingDirectory() + } + companion object { /** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */ @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt index e58319655d9d..9ca95e9720a6 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt @@ -7,6 +7,8 @@ import com.google.common.annotations.VisibleForTesting import com.google.common.base.Joiner import com.google.common.base.Strings import com.google.common.collect.Lists +import io.airbyte.cdk.integrations.BaseConnector +import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.io.IOs import io.airbyte.commons.io.LineGobbler import io.airbyte.commons.map.MoreMaps @@ -30,6 +32,7 @@ class DockerProcessFactory( private val workspaceRoot: Path, private val workspaceMountSource: String?, private val localMountSource: String?, + private val fileTransferMountSource: Path?, private val networkName: String?, private val envMap: Map ) : ProcessFactory { @@ -125,6 +128,11 @@ class DockerProcessFactory( cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION)) } + if (fileTransferMountSource != null) { + cmd.add("-v") + cmd.add("$fileTransferMountSource:${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}") + } + val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables) for ((key, value) in allEnvMap) { cmd.add("-e") diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 3bd38d2e0857..4a1bc4b8a32b 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -75,6 +75,8 @@ protected constructor( override val imageName: String get() = "airbyte/destination-s3:dev" + override val supportsFileTransfer = true + override fun getDefaultSchema(config: JsonNode): String? { if (config.has("s3_bucket_path")) { return config["s3_bucket_path"].asText() @@ -260,7 +262,10 @@ protected constructor( .withType(AirbyteMessage.Type.STATE) .withState( AirbyteStateMessage() - .withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))), + .withGlobal( + AirbyteGlobalState() + .withSharedState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))) + ) ), AirbyteMessage() .withType(AirbyteMessage.Type.TRACE) @@ -579,7 +584,7 @@ protected constructor( * unrelated catalog sync data is untouched too. */ @Test - fun testOverwriteSyncWithGenerationId() { + open fun testOverwriteSyncWithGenerationId() { assumeTrue( implementsOverwrite(), "Destination's spec.json does not support overwrite sync mode." diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 1c7c08350d01..dcfdda420636 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -1525,6 +1525,7 @@ abstract class BaseTypingDedupingTest { workspaceRoot, workspaceRoot.toString(), localRoot.toString(), + fileTransferMountSource = null, "host", emptyMap() ) diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index d47914ebb801..b5faddcddae5 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -6,7 +6,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.46.1' features = ['db-destinations', 's3-destinations'] - useLocalCdk = false + useLocalCdk = true } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 3dfdd600bf72..727d7c75c210 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.2.1 + dockerImageTag: 1.3.0 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt index 89173b904b09..9968bd366d9b 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt +++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/io/airbyte/integrations/destination/s3/S3Destination.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.destination.s3 import com.google.common.annotations.VisibleForTesting -import io.airbyte.cdk.integrations.base.IntegrationRunner import io.airbyte.cdk.integrations.destination.s3.BaseS3Destination import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory import io.airbyte.cdk.integrations.destination.s3.StorageProvider @@ -21,12 +20,4 @@ open class S3Destination : BaseS3Destination { override fun storageProvider(): StorageProvider { return StorageProvider.AWS_S3 } - - companion object { - @Throws(Exception::class) - @JvmStatic - fun main(args: Array) { - IntegrationRunner(S3Destination()).run(args) - } - } } diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvAssumeRoleDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvAssumeRoleDestinationAcceptanceTest.kt index 20fbe2331407..1e6db1cfe1b9 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvAssumeRoleDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3/S3CsvAssumeRoleDestinationAcceptanceTest.kt @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.s3 import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest +import org.junit.jupiter.api.Test class S3CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() { override val baseConfigJson: JsonNode diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 25dfc19257da..02a69c64cb21 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | enable file transfer | | 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | | 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | | 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |