From 7bed45e4cab49ef2f0cbd56c87b03455e0686700 Mon Sep 17 00:00:00 2001
From: Edward Gao
Date: Tue, 25 Jun 2024 17:30:37 -0700
Subject: [PATCH] implement redshift refreshes

---
 .../BaseSqlGeneratorIntegrationTest.kt        |   3 +
 .../destination-redshift/build.gradle         |   4 +-
 .../destination-redshift/metadata.yaml        |   3 +-
 .../redshift/RedshiftDestination.kt           |   9 +-
 .../RedshiftStagingStorageOperation.kt        |  91 ++++++--
 .../RedshiftDestinationHandler.kt             |  32 ++-
 .../RedshiftS3StagingStorageOperationTest.kt  | 217 ++++++++++++++++++
 .../AbstractRedshiftTypingDedupingTest.kt     |  42 +++-
 ...refresh_append_with_new_gen_id_final.jsonl |   9 +
 ...resh_overwrite_with_new_gen_id_final.jsonl |   3 +
 ...efresh_overwrite_with_new_gen_id_raw.jsonl |   3 +
 ..._expectedrecords_with_new_gen_id_raw.jsonl |  10 +
 docs/integrations/destinations/redshift.md    |   1 +
 13 files changed, 391 insertions(+), 36 deletions(-)
 create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingStorageOperationTest.kt
 create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl
 create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl
 create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl
 create mode 100644 airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl

diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
index ea4626e29b68..1494d8bdf58a 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
@@ -1814,6 +1814,9 @@ abstract class BaseSqlGeneratorIntegrationTest
 = jdbcDatabase.queryJsons(sql)
+    private fun toJdbcTypeName(airbyteProtocolType: AirbyteProtocolType): String {
         return when (airbyteProtocolType) {
             AirbyteProtocolType.STRING -> "varchar"
diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingStorageOperationTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingStorageOperationTest.kt
new file mode 100644
index 000000000000..63c22a169cba
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/RedshiftS3StagingStorageOperationTest.kt
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.redshift
+
+import com.amazon.redshift.util.RedshiftException
+import com.fasterxml.jackson.databind.JsonNode
+import io.airbyte.cdk.db.jdbc.JdbcUtils
+import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
+import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig
+import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
+import io.airbyte.cdk.integrations.destination.staging.StagingSerializedBufferFactory
+import io.airbyte.commons.json.Jsons
+import io.airbyte.commons.string.Strings
+import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.destination.redshift.operation.RedshiftStagingStorageOperation
+import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler
+import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator
+import io.airbyte.integrations.destination.redshift.util.RedshiftUtil
+import io.airbyte.protocol.models.v0.AirbyteMessage.Type
+import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.Optional
+import kotlin.test.assertEquals
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.assertThrows
+import org.junit.jupiter.api.parallel.Execution
+import org.junit.jupiter.api.parallel.ExecutionMode
+
+@Execution(ExecutionMode.CONCURRENT)
+class RedshiftS3StagingStorageOperationTest {
+    private val randomString = Strings.addRandomSuffix("", "", 10)
+    private val streamId =
+        StreamId(
+            finalNamespace = "final_namespace_$randomString",
+            finalName = "final_name_$randomString",
+            rawNamespace = "raw_namespace_$randomString",
+            rawName = "raw_name_$randomString",
+            originalNamespace = "original_namespace_$randomString",
+            originalName = "original_name_$randomString",
+        )
+    private val streamConfig =
+        StreamConfig(
+            streamId,
+            DestinationSyncMode.APPEND,
+            emptyList(),
+            Optional.empty(),
+            LinkedHashMap(),
+            GENERATION_ID,
+            0,
+            SYNC_ID,
+        )
+    private val storageOperation =
+        RedshiftStagingStorageOperation(
+            s3Config,
+            keepStagingFiles = false,
+            s3StorageOperations,
+            RedshiftSqlGenerator(RedshiftSQLNameTransformer(), config),
+            RedshiftDestinationHandler(databaseName, jdbcDatabase, streamId.rawNamespace)
+        )
+
+    @BeforeEach
+    fun setup() {
+        jdbcDatabase.execute("CREATE SCHEMA ${streamId.rawNamespace}")
+    }
+
+    @AfterEach
+    fun teardown() {
+        jdbcDatabase.execute("DROP SCHEMA ${streamId.rawNamespace} CASCADE")
+    }
+
+    @Test
+    fun testTransferStage() {
+        storageOperation.prepareStage(streamId, "")
+        storageOperation.prepareStage(streamId, TMP_TABLE_SUFFIX)
+        // Table is currently empty, so expect null generation.
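+        // (getStageGeneration reports the generation id of the records already staged for this
+        // stream; with nothing written yet there is no generation to report, hence null.)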
+        assertEquals(null, storageOperation.getStageGeneration(streamId, TMP_TABLE_SUFFIX))
+
+        // Write one record to the real raw table
+        writeRecords(suffix = "", record(1))
+        assertEquals(
+            listOf("""{"record_number":1}"""),
+            // We write the raw data as a string column, not a JSON column, so use asText().
+            dumpRawRecords("").map { it["_airbyte_data"].asText() },
+        )
+
+        // And write one record to the temp final table
+        writeRecords(suffix = TMP_TABLE_SUFFIX, record(2))
+        assertEquals(
+            listOf("""{"record_number":2}"""),
+            dumpRawRecords(TMP_TABLE_SUFFIX).map { it["_airbyte_data"].asText() },
+        )
+        assertEquals(GENERATION_ID, storageOperation.getStageGeneration(streamId, TMP_TABLE_SUFFIX))
+
+        // If we transfer the records, we should end up with 2 records in the real raw table.
+        storageOperation.transferFromTempStage(streamId, TMP_TABLE_SUFFIX)
+        assertEquals(
+            listOf(
+                """{"record_number":1}""",
+                """{"record_number":2}""",
+            ),
+            dumpRawRecords("")
+                .sortedBy {
+                    Jsons.deserialize(it["_airbyte_data"].asText())["record_number"].asLong()
+                }
+                .map { it["_airbyte_data"].asText() },
+        )
+
+        // After transferring the records to the real table, the temp table should no longer exist.
+        assertEquals(
+            """ERROR: relation "${streamId.rawNamespace}.${streamId.rawName}$TMP_TABLE_SUFFIX" does not exist""",
+            assertThrows<RedshiftException> { dumpRawRecords(TMP_TABLE_SUFFIX) }.message,
+        )
+    }
+
+    @Test
+    fun testOverwriteStage() {
+        // If we then create another temp raw table and _overwrite_ the real raw table,
+        // we should end up with a single raw record.
+        storageOperation.prepareStage(streamId, "")
+        storageOperation.prepareStage(streamId, TMP_TABLE_SUFFIX)
+        writeRecords(suffix = "", record(3))
+        writeRecords(suffix = TMP_TABLE_SUFFIX, record(4))
+
+        storageOperation.overwriteStage(streamId, TMP_TABLE_SUFFIX)
+
+        assertEquals(
+            listOf("""{"record_number":4}"""),
+            dumpRawRecords("").map { it["_airbyte_data"].asText() },
+        )
+        assertEquals(
+            """ERROR: relation "${streamId.rawNamespace}.${streamId.rawName}$TMP_TABLE_SUFFIX" does not exist""",
+            assertThrows<RedshiftException> { dumpRawRecords(TMP_TABLE_SUFFIX) }.message,
+        )
+    }
+
+    private fun dumpRawRecords(suffix: String): List<JsonNode> {
+        return jdbcDatabase.queryJsons(
+            "SELECT * FROM ${streamId.rawNamespace}.${streamId.rawName}$suffix"
+        )
+    }
+
+    private fun record(recordNumber: Int): PartialAirbyteMessage {
+        val serializedData = """{"record_number":$recordNumber}"""
+        return PartialAirbyteMessage()
+            .withType(Type.RECORD)
+            .withSerialized(serializedData)
+            .withRecord(
+                PartialAirbyteRecordMessage()
+                    .withNamespace(streamId.originalNamespace)
+                    .withStream(streamId.originalName)
+                    .withEmittedAt(10_000)
+                    .withMeta(
+                        AirbyteRecordMessageMeta()
+                            .withChanges(emptyList())
+                            .withAdditionalProperty(
+                                JavaBaseConstants.AIRBYTE_META_SYNC_ID_KEY,
+                                SYNC_ID,
+                            ),
+                    )
+                    .withData(Jsons.deserialize(serializedData)),
+            )
+    }
+
+    /**
+     * Utility method to create the SerializableBuffer, write records into it, and then push that
+     * buffer into [storageOperation].
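+     *
+     * The buffer is created as a CSV staging file with the V2_WITH_GENERATION column set, so
+     * every row pushed here is stamped with [GENERATION_ID].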
+     */
+    private fun writeRecords(suffix: String, vararg records: PartialAirbyteMessage) {
+        val writeBuffer =
+            StagingSerializedBufferFactory.initializeBuffer(
+                FileUploadFormat.CSV,
+                JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION
+            )
+
+        writeBuffer.use {
+            records.forEach { record: PartialAirbyteMessage ->
+                it.accept(
+                    record.serialized!!,
+                    Jsons.serialize(record.record!!.meta),
+                    GENERATION_ID,
+                    record.record!!.emittedAt
+                )
+            }
+            it.flush()
+            storageOperation.writeToStage(streamConfig, suffix, writeBuffer)
+        }
+    }
+
+    companion object {
+        private val config =
+            Jsons.deserialize(Files.readString(Path.of("secrets/1s1t_config_staging.json")))
+        private val s3Config =
+            S3DestinationConfig.getS3DestinationConfig(RedshiftUtil.findS3Options(config))
+        private val s3StorageOperations =
+            S3StorageOperations(RedshiftSQLNameTransformer(), s3Config.getS3Client(), s3Config)
+        private val jdbcDatabase =
+            RedshiftDestination().run {
+                val dataSource = getDataSource(config)
+                getDatabase(dataSource)
+            }
+        private val databaseName = config[JdbcUtils.DATABASE_KEY].asText()
+
+        private const val SYNC_ID = 12L
+        private const val GENERATION_ID = 42L
+    }
+}
diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.kt
index 6f745e418f79..82441f60612c 100644
--- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.kt
+++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/kotlin/io/airbyte/integrations/destination/redshift/typing_deduping/AbstractRedshiftTypingDedupingTest.kt
@@ -11,7 +11,6 @@ import io.airbyte.cdk.integrations.base.JavaBaseConstants
 import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest
 import io.airbyte.commons.json.Jsons.deserialize
 import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest
-import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest.Companion
 import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
 import io.airbyte.integrations.destination.redshift.RedshiftDestination
 import io.airbyte.integrations.destination.redshift.RedshiftSQLNameTransformer
@@ -236,6 +235,9 @@ abstract class AbstractRedshiftTypingDedupingTest : JdbcTypingDedupingTest() {
             catalog,
             messages1,
             "airbyte/destination-redshift:3.1.1",
+            // Old connector version can't handle TRACE messages; disable the
+            // stream status message
+            streamStatus = null,
         )
 
         // Second sync
@@ -286,15 +288,16 @@ abstract class AbstractRedshiftTypingDedupingTest : JdbcTypingDedupingTest() {
 
     @Test
     fun testGenerationIdMigrationForOverwrite() {
-        val catalog =
+        // First sync
+        val catalog1 =
             ConfiguredAirbyteCatalog()
                 .withStreams(
                     listOf(
                         ConfiguredAirbyteStream()
                             .withSyncMode(SyncMode.FULL_REFRESH)
                             .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
-                            .withSyncId(42L)
-                            .withGenerationId(43L)
+                            .withSyncId(41L)
+                            .withGenerationId(42L)
                             .withMinimumGenerationId(0L)
                             .withStream(
                                 AirbyteStream()
                                     .withNamespace(streamNamespace)
                                     .withName(streamName)
                                     .withJsonSchema(BaseTypingDedupingTest.Companion.SCHEMA),
                             ),
                     ),
                 )
-
-        // First sync
         val messages1 = readMessages("dat/sync1_messages.jsonl")
readMessages("dat/sync1_messages.jsonl") - runSync(catalog, messages1, "airbyte/destination-redshift:3.1.1") + runSync( + catalog1, + messages1, + "airbyte/destination-redshift:3.1.1", + // Old connector version can't handle TRACE messages; disable the + // stream status message + streamStatus = null, + ) // Second sync + val catalog2 = + ConfiguredAirbyteCatalog() + .withStreams( + listOf( + ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withSyncId(42L) + .withGenerationId(43L) + .withMinimumGenerationId(43L) + .withStream( + AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(BaseTypingDedupingTest.Companion.SCHEMA), + ), + ), + ) val messages2 = readMessages("dat/sync2_messages.jsonl") - runSync(catalog, messages2) + runSync(catalog2, messages2) val expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_overwrite_raw.jsonl") val expectedFinalRecords2 = diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl new file mode 100644 index 000000000000..d14394d3eccb --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_with_new_gen_id_final.jsonl @@ -0,0 +1,9 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} + +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, 
"_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000Z"} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl new file mode 100644 index 000000000000..bc2849224699 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_final.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000Z"} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl new file mode 100644 index 000000000000..f2e286786eaa --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_with_new_gen_id_raw.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl new file mode 100644 index 000000000000..023a15b98ac9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_with_new_gen_id_raw.jsonl @@ -0,0 +1,10 @@ +// We keep the records from the first sync +{"_airbyte_extracted_at": 
"1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 43} +// And append the records from the second sync +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes":[],"sync_id":42}, "_airbyte_generation_id": 44} diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 3404930e527e..4869e2a74271 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -244,6 +244,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.0 | 2024-07-02 | [40567](https://github.com/airbytehq/airbyte/pull/40567) | Support for [refreshes](../../operator-guides/refreshes.md) and resumable full refresh. WARNING: You must upgrade to platform 0.63.0 before upgrading to this connector version. 
 | 3.2.0 | 2024-07-02 | [40201](https://github.com/airbytehq/airbyte/pull/40201) | Add `_airbyte_generation_id` column, and add `sync_id` to `_airbyte_meta` column |
 | 3.1.1 | 2024-06-26 | [39008](https://github.com/airbytehq/airbyte/pull/39008) | Internal code changes |
 | 3.1.0 | 2024-06-26 | [39141](https://github.com/airbytehq/airbyte/pull/39141) | Remove nonfunctional "encrypted staging" option |
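
Reviewer note: a minimal sketch (not part of this patch) of how the new storage operation is expected to be driven for a refresh, based only on the calls exercised by RedshiftS3StagingStorageOperationTest above. The function names, wiring, and the choice between the two endings are illustrative assumptions; only the RedshiftStagingStorageOperation methods themselves come from this diff.

```kotlin
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation.Companion.TMP_TABLE_SUFFIX
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.destination.redshift.operation.RedshiftStagingStorageOperation

// Illustrative only: the call sequence asserted by testOverwriteStage.
// A truncate-style refresh stages into the "_airbyte_tmp" raw table, then swaps it in wholesale.
fun truncateRefresh(
    op: RedshiftStagingStorageOperation,
    stream: StreamConfig,
    buffer: SerializableBuffer,
) {
    op.prepareStage(stream.id, TMP_TABLE_SUFFIX)
    op.writeToStage(stream, TMP_TABLE_SUFFIX, buffer)
    // Replaces the real raw table with the temp one and drops the temp table.
    op.overwriteStage(stream.id, TMP_TABLE_SUFFIX)
}

// Illustrative only: the call sequence asserted by testTransferStage.
// When the existing raw table must be kept, temp records are moved across instead of swapped.
fun appendFromTempStage(
    op: RedshiftStagingStorageOperation,
    stream: StreamConfig,
    buffer: SerializableBuffer,
) {
    op.prepareStage(stream.id, "")
    op.prepareStage(stream.id, TMP_TABLE_SUFFIX)
    op.writeToStage(stream, TMP_TABLE_SUFFIX, buffer)
    // getStageGeneration(stream.id, TMP_TABLE_SUFFIX) reports which generation is parked in the
    // temp table (null when it is empty), letting the caller decide whether to reuse it.
    op.transferFromTempStage(stream.id, TMP_TABLE_SUFFIX)
}
```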