Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple split of DestinationAcceptanceTest #46689

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest|
| 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibiilty|
| 0.47.1 | 2024-09-27 | [\#45397](https://github.com/airbytehq/airbyte/pull/45397) | Allow logical replication from Postgres 16 read-replicas|
| 0.47.0 | 2024-09-26 | [\#42030](https://github.com/airbytehq/airbyte/pull/42030) | minor refactor |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.47.2
version=0.47.3
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.standardtest.destination

import com.fasterxml.jackson.databind.JsonNode
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.commons.features.EnvVariableFeatureFlags
import io.airbyte.commons.features.FeatureFlags
import io.airbyte.commons.features.FeatureFlagsWrapper
import io.airbyte.commons.lang.Exceptions
import io.airbyte.configoss.WorkerDestinationConfig
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.workers.helper.ConnectorConfigUpdater
import io.airbyte.workers.internal.AirbyteDestination
import io.airbyte.workers.internal.DefaultAirbyteDestination
import io.airbyte.workers.process.AirbyteIntegrationLauncher
import io.airbyte.workers.process.DockerProcessFactory
import io.airbyte.workers.process.ProcessFactory
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Files
import java.nio.file.Path
import java.util.*
import java.util.function.Consumer
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.mockito.Mockito

private val LOGGER = KotlinLogging.logger {}

@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION")
abstract class BaseDestinationAcceptanceTest(
// If false, ignore counts and only verify the final state message.
protected val verifyIndividualStateAndCounts: Boolean = false,
) {
protected lateinit var processFactory: ProcessFactory
private set
protected lateinit var jobRoot: Path
private set
protected var localRoot: Path? = null
private set
protected lateinit var testEnv: DestinationAcceptanceTest.TestDestinationEnv
private set
protected var fileTransferMountSource: Path? = null
private set
protected open val supportsFileTransfer: Boolean = false
protected var testSchemas: HashSet<String> = HashSet()
protected lateinit var mConnectorConfigUpdater: ConnectorConfigUpdater
private set
protected open val isCloudTest: Boolean = true
protected val featureFlags: FeatureFlags =
if (isCloudTest) {
FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "CLOUD")
} else {
FeatureFlagsWrapper.overridingDeploymentMode(EnvVariableFeatureFlags(), "OSS")
}
protected abstract val imageName: String
/**
* Name of the docker image that the tests will run against.
*
* @return docker image name
*/
get

/**
* Configuration specific to the integration. Will be passed to integration where appropriate in
* each test. Should be valid.
*
* @return integration-specific configuration
*/
@Throws(Exception::class) protected abstract fun getConfig(): JsonNode

protected open fun supportsInDestinationNormalization(): Boolean {
return false
}

protected fun inDestinationNormalizationFlags(shouldNormalize: Boolean): Map<String, String> {
if (shouldNormalize && supportsInDestinationNormalization()) {
return java.util.Map.of("NORMALIZATION_TECHNIQUE", "LEGACY")
}
return emptyMap()
}

protected fun getDestinationConfig(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
): WorkerDestinationConfig {
return WorkerDestinationConfig()
.withConnectionId(UUID.randomUUID())
.withCatalog(
DestinationAcceptanceTest.convertProtocolObject(
catalog,
io.airbyte.protocol.models.ConfiguredAirbyteCatalog::class.java
)
)
.withDestinationConnectionConfiguration(config)
}

protected fun runSyncAndVerifyStateOutput(
config: JsonNode,
messages: List<AirbyteMessage>,
catalog: ConfiguredAirbyteCatalog,
runNormalization: Boolean,
) {
runSyncAndVerifyStateOutput(
config,
messages,
catalog,
runNormalization,
imageName,
verifyIndividualStateAndCounts
)
}

@Throws(Exception::class)
protected fun runSyncAndVerifyStateOutput(
config: JsonNode,
messages: List<AirbyteMessage>,
catalog: ConfiguredAirbyteCatalog,
runNormalization: Boolean,
imageName: String,
verifyIndividualStateAndCounts: Boolean
) {
val destinationOutput = runSync(config, messages, catalog, runNormalization, imageName)

var expected = messages.filter { it.type == AirbyteMessage.Type.STATE }
var actual = destinationOutput.filter { it.type == AirbyteMessage.Type.STATE }

if (verifyIndividualStateAndCounts) {
/* Collect the counts and add them to each expected state message */
val stateToCount = mutableMapOf<JsonNode, Int>()
messages.fold(0) { acc, message ->
if (message.type == AirbyteMessage.Type.STATE) {
stateToCount[message.state.global.sharedState] = acc
0
} else {
acc + 1
}
}

expected.forEach { message ->
val clone = message.state
clone.destinationStats =
AirbyteStateStats()
.withRecordCount(stateToCount[clone.global.sharedState]!!.toDouble())
message.state = clone
}
} else {
/* Null the stats and collect only the final messages */
val finalActual =
actual.lastOrNull()
?: throw IllegalArgumentException(
"All message sets used for testing should include a state record"
)
val clone = finalActual.state
clone.destinationStats = null
finalActual.state = clone

expected = listOf(expected.last())
actual = listOf(finalActual)
}

Assertions.assertEquals(expected, actual)
}

@Throws(Exception::class)
open protected fun runSync(
config: JsonNode,
messages: List<AirbyteMessage>,
catalog: ConfiguredAirbyteCatalog,
runNormalization: Boolean,
imageName: String,
): List<AirbyteMessage> {
val destinationConfig = getDestinationConfig(config, catalog)
return runSync(messages, runNormalization, imageName, destinationConfig)
}

@Throws(Exception::class)
protected fun runSync(
messages: List<AirbyteMessage>,
runNormalization: Boolean,
imageName: String,
destinationConfig: WorkerDestinationConfig,
): List<AirbyteMessage> {
val destination = getDestination(imageName)

destination.start(
destinationConfig,
jobRoot,
inDestinationNormalizationFlags(runNormalization)
)
messages.forEach(
Consumer { message: AirbyteMessage ->
Exceptions.toRuntime {
destination.accept(
DestinationAcceptanceTest.convertProtocolObject(
message,
io.airbyte.protocol.models.AirbyteMessage::class.java
)
)
}
}
)
destination.notifyEndOfInput()

val destinationOutput: MutableList<AirbyteMessage> = ArrayList()
while (!destination.isFinished()) {
destination.attemptRead().ifPresent {
destinationOutput.add(
DestinationAcceptanceTest.convertProtocolObject(it, AirbyteMessage::class.java)
)
}
}

destination.close()

return destinationOutput
}

protected fun getDestination(imageName: String): AirbyteDestination {
return DefaultAirbyteDestination(
integrationLauncher =
AirbyteIntegrationLauncher(
DestinationAcceptanceTest.JOB_ID,
DestinationAcceptanceTest.JOB_ATTEMPT,
imageName,
processFactory,
null,
null,
false,
featureFlags
)
)
}

@BeforeEach
@Throws(Exception::class)
open fun setUpInternal() {
val testDir = Path.of("/tmp/airbyte_tests/")
Files.createDirectories(testDir)
val workspaceRoot = Files.createTempDirectory(testDir, "test")
jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job"))
localRoot = Files.createTempDirectory(testDir, "output")
LOGGER.info { "${"jobRoot: {}"} $jobRoot" }
LOGGER.info { "${"localRoot: {}"} $localRoot" }
testEnv = DestinationAcceptanceTest.TestDestinationEnv(localRoot)
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java)
testSchemas = HashSet()
setup(testEnv, testSchemas)
fileTransferMountSource =
if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null

processFactory =
DockerProcessFactory(
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
"host",
getConnectorEnv()
)
}

/**
* Function that performs any setup of external resources required for the test. e.g.
* instantiate a postgres database. This function will be called before EACH test.
*
* @param testEnv
* - information about the test environment.
* @param TEST_SCHEMAS
* @throws Exception
* - can throw any exception, test framework will handle.
*/
@Throws(Exception::class)
protected abstract fun setup(
testEnv: DestinationAcceptanceTest.TestDestinationEnv,
TEST_SCHEMAS: HashSet<String>
)

open fun getConnectorEnv(): Map<String, String> {
return emptyMap()
}
}
Loading
Loading