Skip to content

Commit

Permalink
cdk-java: add file transfer mount to DestinationAcceptanceTest
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Oct 9, 2024
1 parent bc093d8 commit 78d6139
Show file tree
Hide file tree
Showing 20 changed files with 62 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.airbyte.commons.features.FeatureFlags
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.resources.MoreResources
import io.airbyte.protocol.models.v0.ConnectorSpecification
import java.nio.file.Path

abstract class BaseConnector : Integration {
open val featureFlags: FeatureFlags = EnvVariableFeatureFlags()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ abstract class DestinationAcceptanceTest(
protected var testSchemas: HashSet<String> = HashSet()

private lateinit var testEnv: TestDestinationEnv
protected var fileTransferMountSource: Path? = null
private set
protected open val isCloudTest: Boolean = true
protected val featureFlags: FeatureFlags =
if (isCloudTest) {
Expand Down Expand Up @@ -435,12 +437,15 @@ abstract class DestinationAcceptanceTest(
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java)
testSchemas = HashSet()
setup(testEnv, testSchemas)
fileTransferMountSource =
if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null

processFactory =
DockerProcessFactory(
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource,
"host",
getConnectorEnv()
)
Expand Down Expand Up @@ -2082,7 +2087,7 @@ abstract class DestinationAcceptanceTest(
val stateToCount = mutableMapOf<JsonNode, Int>()
messages.fold(0) { acc, message ->
if (message.type == Type.STATE) {
stateToCount[message.state.data] = acc
stateToCount[message.state.global.sharedState] = acc
0
} else {
acc + 1
Expand All @@ -2092,7 +2097,7 @@ abstract class DestinationAcceptanceTest(
expected.forEach { message ->
val clone = message.state
clone.destinationStats =
AirbyteStateStats().withRecordCount(stateToCount[clone.data]!!.toDouble())
AirbyteStateStats().withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble())
message.state = clone
}
} else {
Expand Down Expand Up @@ -2684,6 +2689,8 @@ abstract class DestinationAcceptanceTest(
return supportsInDestinationNormalization() || normalizationFromDefinition()
}

protected open val supportsFileTransfer: Boolean = false

companion object {
private val RANDOM = Random()
private const val NORMALIZATION_VERSION = "dev"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" } ] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "string_array" : ["foo bar", "some random special characters: ࠈൡሗ"], "array_date" : ["2021-01-23", "1504-02-29"], "array_timestamp_with_timezone" : ["2022-11-22T01:23:45+05:00", "9999-12-21T01:23:45-05:00"], "array_timestamp_without_timezone" : ["2022-11-22T01:23:45", "1504-02-29T01:23:45"], "array_number" : ["56.78", "0", "-12345.678"], "array_integer" : ["42", "0", "12345"], "array_boolean" : [true, false], "array_binary_data" : ["dGVzdA=="] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
{"type": "RECORD", "record": {"stream": "integer_test_1", "emitted_at": 1602637589100, "data": { "data" : "-12345" }}}
{"type": "RECORD", "record": {"stream": "boolean_test_1", "emitted_at": 1602637589200, "data": { "data" : true }}}
{"type": "RECORD", "record": {"stream": "binary_test_1", "emitted_at": 1602637589300, "data": { "data" : "dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "string_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "number_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "integer_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "boolean_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_test_1", "emitted_at": 1602637589100, "data": {"property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : "202", "next_field_name" : "next_field_name_2" }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : "203" }}}
{"type": "RECORD", "record": {"stream": "stream_with_binary_data", "emitted_at": 1602637589500, "data": { "some_id" : "303", "binary_field_name":"dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "streamWithCamelCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_underscores"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_edge_case_field_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Expand All @@ -27,4 +27,4 @@
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name_next"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_binary_data"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": "1", "currency": "USD", "date": "2020-08-29T00:00:00Z", "NZD": "0.12", "HKD": "2.13"}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-08-31"}}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": "1", "currency": "USD", "date": "2020-08-30T00:00:00Z", "NZD": "1.14", "HKD": "7.15"}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-01"}}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.99", "USD": "10.99"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": "2", "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ abstract class AbstractSourceConnectorTest {
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource = null,
"host",
envMap
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.airbyte.commons.features

import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
import java.util.function.Function

private val log = KotlinLogging.logger {}
Expand Down Expand Up @@ -46,6 +47,10 @@ class EnvVariableFeatureFlags : FeatureFlags {
return getEnvOrDefault(DEPLOYMENT_MODE, "") { arg: String -> arg }
}

override fun airbyteStagingDirectory(): Path {
return getEnvOrDefault(AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME, DEFAULT_AIRBYTE_STAGING_DIRECTORY) { arg: String -> Path.of(arg) }
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
fun <T> getEnvOrDefault(key: String?, defaultValue: T, parser: Function<String, T>): T {
val value = System.getenv(key)
Expand Down Expand Up @@ -73,5 +78,7 @@ class EnvVariableFeatureFlags : FeatureFlags {
const val STRICT_COMPARISON_NORMALIZATION_TAG: String =
"STRICT_COMPARISON_NORMALIZATION_TAG"
const val DEPLOYMENT_MODE: String = "DEPLOYMENT_MODE"
val DEFAULT_AIRBYTE_STAGING_DIRECTORY: Path = Path.of("/staging/files")
const val AIRBYTE_STAGING_DIRECTORY_PROPERTY_NAME: String = "AIRBYTE_STAGING_DIRECTORY"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.commons.features

import java.nio.file.Path

/**
* Interface that describe which features are activated in airbyte. Currently, the only
* implementation relies on env. Ideally it should be on some DB.
Expand Down Expand Up @@ -51,4 +53,6 @@ interface FeatureFlags {
* @return empty string for the default deployment mode, "CLOUD" for cloud deployment mode.
*/
fun deploymentMode(): String?

fun airbyteStagingDirectory(): Path
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.commons.features

import java.nio.file.Path

open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags {
override fun autoDetectSchema(): Boolean {
return wrapped.autoDetectSchema()
Expand Down Expand Up @@ -36,6 +38,10 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags
return wrapped.deploymentMode()
}

override fun airbyteStagingDirectory(): Path {
return wrapped.airbyteStagingDirectory()
}

companion object {
/** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */
@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Joiner
import com.google.common.base.Strings
import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.BaseConnector
import io.airbyte.commons.features.EnvVariableFeatureFlags
import io.airbyte.commons.io.IOs
import io.airbyte.commons.io.LineGobbler
import io.airbyte.commons.map.MoreMaps
Expand All @@ -30,6 +32,7 @@ class DockerProcessFactory(
private val workspaceRoot: Path,
private val workspaceMountSource: String?,
private val localMountSource: String?,
private val fileTransferMountSource: Path?,
private val networkName: String?,
private val envMap: Map<String, String>
) : ProcessFactory {
Expand Down Expand Up @@ -125,6 +128,11 @@ class DockerProcessFactory(
cmd.add(String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION))
}

if (fileTransferMountSource != null) {
cmd.add("-v")
cmd.add("$fileTransferMountSource:${EnvVariableFeatureFlags.DEFAULT_AIRBYTE_STAGING_DIRECTORY}")
}

val allEnvMap = MoreMaps.merge(jobMetadata, envMap, additionalEnvironmentVariables)
for ((key, value) in allEnvMap) {
cmd.add("-e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ protected constructor(
override val imageName: String
get() = "airbyte/destination-s3:dev"

override val supportsFileTransfer = true

override fun getDefaultSchema(config: JsonNode): String? {
if (config.has("s3_bucket_path")) {
return config["s3_bucket_path"].asText()
Expand Down Expand Up @@ -260,7 +262,10 @@ protected constructor(
.withType(AirbyteMessage.Type.STATE)
.withState(
AirbyteStateMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))),
.withGlobal(
AirbyteGlobalState()
.withSharedState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))
)
),
AirbyteMessage()
.withType(AirbyteMessage.Type.TRACE)
Expand Down Expand Up @@ -579,7 +584,7 @@ protected constructor(
* unrelated catalog sync data is untouched too.
*/
@Test
fun testOverwriteSyncWithGenerationId() {
open fun testOverwriteSyncWithGenerationId() {
assumeTrue(
implementsOverwrite(),
"Destination's spec.json does not support overwrite sync mode."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,7 @@ abstract class BaseTypingDedupingTest {
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
fileTransferMountSource = null,
"host",
emptyMap()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.46.1'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.2.1
dockerImageTag: 1.3.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.airbyte.integrations.destination.s3

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.destination.s3.BaseS3Destination
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory
import io.airbyte.cdk.integrations.destination.s3.StorageProvider
Expand All @@ -21,12 +20,4 @@ open class S3Destination : BaseS3Destination {
override fun storageProvider(): StorageProvider {
return StorageProvider.AWS_S3
}

companion object {
@Throws(Exception::class)
@JvmStatic
fun main(args: Array<String>) {
IntegrationRunner(S3Destination()).run(args)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.s3

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest
import org.junit.jupiter.api.Test

class S3CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() {
override val baseConfigJson: JsonNode
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | enable file transfer |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |
Expand Down

0 comments on commit 78d6139

Please sign in to comment.