Skip to content

Commit

Permalink
cdk-java: add file transfer mount to DestinationAcceptanceTest
Browse files: browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Oct 21, 2024
1 parent b0842a2 commit a0f15ff
Show file tree
Hide file tree
Showing 15 changed files with 54 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ abstract class DestinationAcceptanceTest(
protected var testSchemas: HashSet<String> = HashSet()

private lateinit var testEnv: TestDestinationEnv
private set
protected open val isCloudTest: Boolean = true
protected val featureFlags: FeatureFlags =
if (isCloudTest) {
Expand Down Expand Up @@ -2082,7 +2083,7 @@ abstract class DestinationAcceptanceTest(
val stateToCount = mutableMapOf<JsonNode, Int>()
messages.fold(0) { acc, message ->
if (message.type == Type.STATE) {
stateToCount[message.state.data] = acc
stateToCount[message.state.global.sharedState] = acc
0
} else {
acc + 1
Expand All @@ -2092,7 +2093,8 @@ abstract class DestinationAcceptanceTest(
expected.forEach { message ->
val clone = message.state
clone.destinationStats =
AirbyteStateStats().withRecordCount(stateToCount[clone.data]!!.toDouble())
AirbyteStateStats()
.withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble())
message.state = clone
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_array_test_1", "emitted_at": 1602637589100, "data": { "property_string" : "qqq", "property_array" : [ { "property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" } ] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "string_array" : ["foo bar", "some random special characters: ࠈൡሗ"], "array_date" : ["2021-01-23", "1504-02-29"], "array_timestamp_with_timezone" : ["2022-11-22T01:23:45+05:00", "9999-12-21T01:23:45-05:00"], "array_timestamp_without_timezone" : ["2022-11-22T01:23:45", "1504-02-29T01:23:45"], "array_number" : ["56.78", "0", "-12345.678"], "array_integer" : ["42", "0", "12345"], "array_boolean" : [true, false], "array_binary_data" : ["dGVzdA=="] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
{"type": "RECORD", "record": {"stream": "integer_test_1", "emitted_at": 1602637589100, "data": { "data" : "-12345" }}}
{"type": "RECORD", "record": {"stream": "boolean_test_1", "emitted_at": 1602637589200, "data": { "data" : true }}}
{"type": "RECORD", "record": {"stream": "binary_test_1", "emitted_at": 1602637589300, "data": { "data" : "dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "string_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "number_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "integer_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "boolean_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "binary_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "RECORD", "record": {"stream": "object_test_1", "emitted_at": 1602637589100, "data": {"property_string": "foo bar", "property_date": "2021-01-23", "property_timestamp_with_timezone": "2022-11-22T01:23:45+00:00", "property_timestamp_without_timezone": "2022-11-22T01:23:45", "property_number": "56.78", "property_integer": "42", "property_boolean": true, "property_binary_data" : "dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "object_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : "202", "next_field_name" : "next_field_name_2" }}}
{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : "203" }}}
{"type": "RECORD", "record": {"stream": "stream_with_binary_data", "emitted_at": 1602637589500, "data": { "some_id" : "303", "binary_field_name":"dGVzdA==" }}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "STATE", "state": {"type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "streamWithCamelCase"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_underscores"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_edge_case_field_names"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Expand All @@ -27,4 +27,4 @@
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_name_next"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "stream_with_binary_data"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "TRACE", "trace": {"type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "STREAM_WITH_ALL_CAPS"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "id": "1", "currency": "USD", "date": "2020-08-29T00:00:00Z", "NZD": "0.12", "HKD": "2.13"}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-08-31"}}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637689100, "data": { "id": "1", "currency": "USD", "date": "2020-08-30T00:00:00Z", "NZD": "1.14", "HKD": "7.15"}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-01"}}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637789200, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637889300, "data": { "id": "2", "currency": "EUR", "date": "2020-08-31T00:00:00Z", "NZD": "1.14", "HKD": "7.99", "USD": "10.99"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637989400, "data": { "id": "2", "currency": "EUR", "date": "2020-09-01T00:00:00Z", "NZD": "1.14", "HKD": "7.15", "USD": "10.16"}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2020-09-02"}}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "exchange_rate"}, "status": "COMPLETE"}, "emitted_at": 1602637589101}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "array_number" : ["-12345.678", "100000000000000000.1234"],"array_float" : ["-12345.678", "0", "1000000000000000000000000000000000000000000000000000.1234"], "array_integer" : ["42", "0", "12345"]}}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "array_test_1"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589100, "data": { "data" : "10000000000000000000000.1234" }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589200, "data": { "data" : "0" }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589300, "data": { "data" : "-12345.678" }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
{"type": "STATE", "state": { "type": "GLOBAL", "global": {"shared_state": {"start_date": "2022-02-14"}}}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "int_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "float_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "default_number_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
{"type": "TRACE", "trace": { "type": "STREAM_STATUS", "stream_status": {"stream_descriptor": {"name": "default_number_test"}, "status": "COMPLETE"}, "emitted_at": 1602637589301}}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ protected constructor(
.withType(AirbyteMessage.Type.STATE)
.withState(
AirbyteStateMessage()
.withData(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))),
.withGlobal(
AirbyteGlobalState()
.withSharedState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))
)
),
AirbyteMessage()
.withType(AirbyteMessage.Type.TRACE)
Expand Down Expand Up @@ -579,7 +582,7 @@ protected constructor(
* unrelated catalog sync data is untouched too.
*/
@Test
fun testOverwriteSyncWithGenerationId() {
open fun testOverwriteSyncWithGenerationId() {
assumeTrue(
implementsOverwrite(),
"Destination's spec.json does not support overwrite sync mode."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.46.1'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.2.1
dockerImageTag: 1.3.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.airbyte.integrations.destination.s3

import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.destination.s3.BaseS3Destination
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfigFactory
import io.airbyte.cdk.integrations.destination.s3.StorageProvider
Expand All @@ -21,12 +20,4 @@ open class S3Destination : BaseS3Destination {
override fun storageProvider(): StorageProvider {
return StorageProvider.AWS_S3
}

companion object {
@Throws(Exception::class)
@JvmStatic
fun main(args: Array<String>) {
IntegrationRunner(S3Destination()).run(args)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,20 @@ class S3StorageOperationsIntegrationTest {
objectPrefix: String,
numOfRecords: Int,
generationId: Long
) {
): Set<String> {
log.info { "Uploading $numOfRecords test objects" }
val insertedFiles: MutableSet<String> = mutableSetOf()
for (i in 1..numOfRecords) {
s3StorageOperations.uploadRecordsToBucket(
createStringBuffer("DummyStringToWrite"),
namespace,
objectPrefix,
generationId
insertedFiles.add(
s3StorageOperations.uploadRecordsToBucket(
createStringBuffer("DummyStringToWrite"),
namespace,
objectPrefix,
generationId
)
)
}
return insertedFiles
}

@Test
Expand Down Expand Up @@ -187,6 +191,7 @@ class S3StorageOperationsIntegrationTest {
outputFormat
)
var expectedNumberOfObjects = 0
val objectsByGenId: MutableMap<Int, Set<String>> = mutableMapOf()
if (numberOfGens != 0) {
for (i in 1..numberOfGens) {
val numberOfObjects =
Expand All @@ -195,7 +200,8 @@ class S3StorageOperationsIntegrationTest {
else -> randomNumber
}
expectedNumberOfObjects += numberOfObjects
uploadTestRecords(namespace, fullObjectPath, numberOfObjects, i.toLong())
objectsByGenId[i] =
uploadTestRecords(namespace, fullObjectPath, numberOfObjects, i.toLong())
}
}

Expand All @@ -204,7 +210,13 @@ class S3StorageOperationsIntegrationTest {
// This is used to assert non-existent case where currentGen gets decreased from highest Gen
if (currentGen > numberOfGens) {
val currentGenObjectSize = random.nextInt(5)
uploadTestRecords(namespace, fullObjectPath, currentGenObjectSize, currentGen.toLong())
objectsByGenId[currentGen] =
uploadTestRecords(
namespace,
fullObjectPath,
currentGenObjectSize,
currentGen.toLong()
)
}

val existingObjects =
Expand All @@ -215,7 +227,11 @@ class S3StorageOperationsIntegrationTest {
outputFormat,
currentGen.toLong()
)
assertEquals(expectedNumberOfObjects, existingObjects.size)
assertEquals(
expectedNumberOfObjects,
existingObjects.size,
"currentGen = $currentGen. Expected $expectedNumberOfObjects (${objectsByGenId}), got $existingObjects"
)
}

private fun createStringBuffer(strData: String): SerializableBuffer {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.3.0 | 2024-10-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | enable file transfer |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |
Expand Down

0 comments on commit a0f15ff

Please sign in to comment.