Skip to content

Commit

Permalink
split DestinationAcceptanceTest
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Oct 9, 2024
1 parent 48a9e9e commit a4bfb99
Show file tree
Hide file tree
Showing 5 changed files with 2,044 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ package io.airbyte.cdk.integrations.standardtest.destination

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.google.common.collect.ImmutableMap
import com.google.common.collect.Lists
import com.google.common.collect.Sets
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.standardtest.destination.*
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataArgumentsProvider
import io.airbyte.cdk.integrations.standardtest.destination.RecordBasedDestinationAcceptanceTestInterface.Companion.JOB_ATTEMPT
import io.airbyte.cdk.integrations.standardtest.destination.RecordBasedDestinationAcceptanceTestInterface.Companion.JOB_ID
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.util.ArgumentProviderUtil
import io.airbyte.cdk.integrations.standardtest.destination.comparator.BasicTestDataComparator
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator
import io.airbyte.commons.features.EnvVariableFeatureFlags
Expand All @@ -25,34 +23,21 @@ import io.airbyte.commons.lang.Exceptions
import io.airbyte.commons.resources.MoreResources
import io.airbyte.commons.util.MoreIterators
import io.airbyte.configoss.JobGetSpecConfig
import io.airbyte.configoss.OperatorDbt
import io.airbyte.configoss.StandardCheckConnectionInput
import io.airbyte.configoss.StandardCheckConnectionOutput
import io.airbyte.configoss.WorkerDestinationConfig
import io.airbyte.protocol.models.Field
import io.airbyte.protocol.models.JsonSchemaType
import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteMessage.Type
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConnectorSpecification
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.airbyte.protocol.models.v0.SyncMode
import io.airbyte.workers.exception.TestHarnessException
import io.airbyte.workers.general.DbtTransformationRunner
import io.airbyte.workers.general.DefaultCheckConnectionTestHarness
import io.airbyte.workers.general.DefaultGetSpecTestHarness
import io.airbyte.workers.helper.ConnectorConfigUpdater
import io.airbyte.workers.helper.EntrypointEnvChecker
import io.airbyte.workers.internal.AirbyteDestination
import io.airbyte.workers.internal.DefaultAirbyteDestination
import io.airbyte.workers.normalization.DefaultNormalizationRunner
Expand All @@ -66,29 +51,18 @@ import java.io.UncheckedIOException
import java.net.URISyntaxException
import java.nio.file.Files
import java.nio.file.Path
import java.time.Instant
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetTime
import java.time.ZoneOffset
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer
import java.util.stream.Stream
import kotlin.test.assertNotNull
import org.junit.jupiter.api.*
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.ArgumentsProvider
import org.junit.jupiter.params.provider.ArgumentsSource
import org.mockito.Mockito

private val LOGGER = KotlinLogging.logger {}

abstract class BaseDestinationAcceptanceTest(
abstract class AbstractDestinationAcceptanceTest(
// If false, ignore counts and only verify the final state message.
private val verifyIndividualStateAndCounts: Boolean = false,
protected val useV2Fields: Boolean = false,
Expand Down Expand Up @@ -914,36 +888,7 @@ abstract class BaseDestinationAcceptanceTest(
val supportNumberInfinity: Boolean = false
)

class NamespaceTestCaseProvider : ArgumentsProvider {
@Throws(Exception::class)
override fun provideArguments(context: ExtensionContext): Stream<out Arguments> {
val testCases = Jsons.deserialize(MoreResources.readResource(NAMESPACE_TEST_CASES_JSON))
return MoreIterators.toList(testCases.elements())
.filter { testCase: JsonNode -> testCase["enabled"].asBoolean() }
.map { testCase: JsonNode ->
val namespaceInCatalog =
TestingNamespaces.generate(testCase["namespace"].asText())
val namespaceInDst =
TestingNamespaces.generateFromOriginal(
namespaceInCatalog,
testCase["namespace"].asText(),
testCase["normalized"].asText()
)
Arguments.of(
testCase["id"]
.asText(), // Add uniqueness to namespace to avoid collisions between
// tests.
namespaceInCatalog,
namespaceInDst
)
}
.stream()
}

companion object {
const val NAMESPACE_TEST_CASES_JSON: String = "namespace_test_cases.json"
}
}

protected fun supportsNormalization(): Boolean {
return supportsInDestinationNormalization() || normalizationFromDefinition()
Expand All @@ -955,10 +900,7 @@ abstract class BaseDestinationAcceptanceTest(
private val RANDOM = Random()
private const val NORMALIZATION_VERSION = "dev"

@JvmStatic
protected val JOB_ID = "0"
@JvmStatic
protected val JOB_ATTEMPT = 0


/**
* Reverses a list by creating a new list with the same elements of the input list and then
Expand Down Expand Up @@ -1036,61 +978,5 @@ abstract class BaseDestinationAcceptanceTest(
}
}
}



@JvmStatic
protected val specialNumericTypesSupportTest: SpecialNumericTypes
/**
* NaN and Infinity test are not supported by default. Please override this method to
* specify NaN/Infinity types support example:
*
* <pre>
*
* protected SpecialNumericTypes getSpecialNumericTypesSupportTest() { return
* SpecialNumericTypes.builder() .supportNumberNan(true) .supportIntegerNan(true)
* .build(); } </pre> *
*
* @return SpecialNumericTypes with support flags
*/
get() = SpecialNumericTypes()

@JvmStatic
@Throws(IOException::class)
protected fun readCatalogFromFile(catalogFilename: String): AirbyteCatalog {
return Jsons.deserialize(
MoreResources.readResource(catalogFilename),
AirbyteCatalog::class.java
)
}

@JvmStatic
@Throws(IOException::class)
protected fun readMessagesFromFile(messagesFilename: String): List<AirbyteMessage> {
return MoreResources.readResource(messagesFilename).trim().lines().map {
Jsons.deserialize(it, AirbyteMessage::class.java)
}
}

/** Mutate the input airbyte record message namespace. */
@JvmStatic
protected fun getRecordMessagesWithNewNamespace(
airbyteMessages: List<AirbyteMessage>,
namespace: String?
): List<AirbyteMessage> {
airbyteMessages.forEach(
Consumer { message: AirbyteMessage ->
if (message.record != null) {
message.record.namespace = namespace
}
}
)
return airbyteMessages
}

@JvmStatic
protected fun <V0, V1> convertProtocolObject(v1: V1, klass: Class<V0>): V0 {
return Jsons.`object`(Jsons.jsonNode(v1), klass)!!
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ abstract class DestinationAcceptanceTest(
expectNumericTimestamps: Boolean = false,
expectSchemalessObjectsCoercedToStrings: Boolean = false,
expectUnionsPromotedToDisjointRecords: Boolean = false
): BaseDestinationAcceptanceTest(
): AbstractDestinationAcceptanceTest(
verifyIndividualStateAndCounts = verifyIndividualStateAndCounts,
useV2Fields = useV2Fields,
supportsChangeCapture = supportsChangeCapture,
Expand All @@ -62,7 +62,7 @@ abstract class DestinationAcceptanceTest(

/**
* Verify that when given valid credentials, that check connection returns a success response.
* Assume that the [BaseDestinationAcceptanceTest.getConfig] is valid.
* Assume that the [AbstractDestinationAcceptanceTest.getConfig] is valid.
*/
@Test
@Throws(Exception::class)
Expand All @@ -75,7 +75,7 @@ abstract class DestinationAcceptanceTest(

/**
* Verify that when given invalid credentials, that check connection returns a failed response.
* Assume that the [BaseDestinationAcceptanceTest.getFailCheckConfig] is invalid.
* Assume that the [AbstractDestinationAcceptanceTest.getFailCheckConfig] is invalid.
*/
@Test
@Throws(Exception::class)
Expand Down
Loading

0 comments on commit a4bfb99

Please sign in to comment.