From 04d1404583e89d2cb8b786b697a08b0dbd8797c2 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 10 May 2024 10:08:50 -0700 Subject: [PATCH] throw exception on bad generation id --- .../destination/async/AsyncStreamConsumer.kt | 22 ++++++++-- .../async/AsyncStreamConsumerTest.kt | 44 ++++++++++++++++--- .../typing_deduping/CatalogParser.kt | 14 ++++-- 3 files changed, 66 insertions(+), 14 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt index aa0172c56272..06e8d2171f12 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt @@ -162,13 +162,16 @@ constructor( bufferManager.close() + val unsuccessfulStreams = ArrayList() val streamSyncSummaries = streamNames.associate { streamDescriptor -> - // If we didn't receive a stream status message, assume success. - // Platform won't send us any stream status messages yet (since we're not declaring - // supportsRefresh in metadata), so we will always hit this case. + // If we didn't receive a stream status message, assume failure. + // This is possible if e.g. the orchestrator crashes before sending us the message. val terminalStatusFromSource = - terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE + terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.INCOMPLETE + if (terminalStatusFromSource == AirbyteStreamStatus.INCOMPLETE) { + unsuccessfulStreams.add(streamDescriptor) + } StreamDescriptorUtils.withDefaultNamespace( streamDescriptor, bufferManager.defaultNamespace, @@ -183,6 +186,17 @@ constructor( // as this throws an exception, we need to be after all other close functions. propagateFlushWorkerExceptionIfPresent() logger.info { "${AsyncStreamConsumer::class.java} closed" } + + // In principle, platform should detect this. + // However, as a backstop, the destination should still do this check. + // This handles e.g. platform bugs where we don't receive a stream status message. + // In this case, it would be misleading to mark the sync as successful, because e.g. we + // maybe didn't commit a truncate. + if (unsuccessfulStreams.isNotEmpty()) { + throw RuntimeException( + "Some streams were unsuccessful due to a source error: $unsuccessfulStreams" + ) + } } private fun getRecordCounter(streamDescriptor: StreamDescriptor): AtomicLong { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index f21d0ca6115a..29f1b49d91d0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -148,6 +148,26 @@ class AsyncStreamConsumerTest { ), ), ) + private val STREAM2_SUCCESS_MESSAGE = + Jsons.serialize( + AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace( + AirbyteTraceMessage() + .withType(AirbyteTraceMessage.Type.STREAM_STATUS) + .withStreamStatus( + AirbyteStreamStatusTraceMessage() + .withStreamDescriptor( + StreamDescriptor() + .withName(STREAM_NAME2) + .withNamespace(SCHEMA_NAME), + ) + .withStatus( + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE + ), + ), + ), + ) private val STREAM2_FAILURE_MESSAGE = Jsons.serialize( AirbyteMessage() @@ -262,6 +282,9 @@ class AsyncStreamConsumerTest { consumer.start() consumeRecords(consumer, expectedRecords) consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES) + consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length) + consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length) + consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length) consumer.close() verifyStartAndClose() @@ -298,6 +321,9 @@ class AsyncStreamConsumerTest { consumeRecords(consumer, expectedRecords) consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES) consumer.accept(Jsons.serialize(STATE_MESSAGE2), RECORD_SIZE_20_BYTES) + consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length) + consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length) + consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length) consumer.close() verifyStartAndClose() @@ -334,6 +360,9 @@ class AsyncStreamConsumerTest { consumer.start() consumeRecords(consumer, allRecords) + consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length) + consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length) + consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length) consumer.close() verifyStartAndClose() @@ -496,7 +525,8 @@ class AsyncStreamConsumerTest { consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length) consumer.accept(STREAM2_FAILURE_MESSAGE, STREAM2_FAILURE_MESSAGE.length) consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length) - consumer.close() + // We had a failure, so expect an exception + assertThrows(RuntimeException::class.java) { consumer.close() } val captor: ArgumentCaptor> = ArgumentCaptor.captor() @@ -532,29 +562,29 @@ class AsyncStreamConsumerTest { consumer.start() consumeRecords(consumer, expectedRecords) // Note: no stream status messages - consumer.close() + // We assume stream failure, so expect an exception + assertThrows(RuntimeException::class.java) { consumer.close() } val captor: ArgumentCaptor> = ArgumentCaptor.captor() Mockito.verify(onClose).accept(any(), capture(captor)) assertEquals( - // All streams have a COMPLETE status. - // TODO: change this to INCOMPLETE after we switch the default behavior. + // All streams have an INCOMPLETE status. mapOf( StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) to StreamSyncSummary( expectedRecords.size.toLong(), - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE, ), StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME2) to StreamSyncSummary( 0, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE, ), StreamDescriptor().withNamespace(DEFAULT_NAMESPACE).withName(STREAM_NAME3) to StreamSyncSummary( 0, - AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE, + AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE, ), ), captor.value, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index 88e4f5053b27..6a155c10dd57 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -6,9 +6,11 @@ package io.airbyte.integrations.base.destination.typing_deduping import com.google.common.annotations.VisibleForTesting import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.commons.exceptions.ConfigErrorException import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import io.airbyte.protocol.models.v0.DestinationSyncMode import io.github.oshai.kotlinlogging.KotlinLogging import java.util.Optional import java.util.function.Consumer @@ -32,6 +34,12 @@ constructor( if (it.stream.namespace.isNullOrEmpty()) { it.stream.namespace = defaultNamespace } + // The refreshes project is the beginning of the end for OVERWRITE syncs. + // The sync mode still exists, but we are fully dependent on min_generation to trigger + // overwrite logic. + if (it.destinationSyncMode == DestinationSyncMode.OVERWRITE) { + it.destinationSyncMode = DestinationSyncMode.APPEND + } } // this code is bad and I feel bad @@ -122,9 +130,9 @@ constructor( @VisibleForTesting fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig { if (stream.generationId == null) { - stream.generationId = 0 - stream.minimumGenerationId = 0 - stream.syncId = 0 + throw ConfigErrorException( + "You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.0" + ) } if ( stream.minimumGenerationId != 0.toLong() &&