Skip to content

Commit

Permalink
Allow us to pass a map of environment variables within our BasicFunct…
Browse files Browse the repository at this point in the history
…ionalityIntegrationTest class (#50427)

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
frifriSF59 and octavia-squidington-iii authored Dec 27, 2024
1 parent c519289 commit b65ebc2
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MockBasicFunctionalityIntegrationTest :
preserveUndeclaredFields = true,
commitDataIncrementally = false,
allTypesBehavior = Untyped,
envVars = emptyMap(),
supportFileTransfer = false,
) {
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,15 @@ abstract class IntegrationTest(
messages: List<InputMessage>,
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
): List<AirbyteMessage> =
runSync(
configContents,
DestinationCatalog(listOf(stream)),
messages,
streamStatus,
useFileTransfer
useFileTransfer,
envVars
)

/**
Expand Down Expand Up @@ -170,13 +172,15 @@ abstract class IntegrationTest(
*/
streamStatus: AirbyteStreamStatus? = AirbyteStreamStatus.COMPLETE,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
): List<AirbyteMessage> {
val destination =
destinationProcessFactory.createDestinationProcess(
"write",
configContents,
catalog.asProtocolObject(),
useFileTransfer = useFileTransfer,
envVars = envVars,
)
return runBlocking(Dispatchers.IO) {
launch { destination.run() }
Expand Down Expand Up @@ -212,13 +216,15 @@ abstract class IntegrationTest(
inputStateMessage: StreamCheckpoint,
allowGracefulShutdown: Boolean,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
): AirbyteStateMessage {
val destination =
destinationProcessFactory.createDestinationProcess(
"write",
configContents,
DestinationCatalog(listOf(stream)).asProtocolObject(),
useFileTransfer,
envVars
)
return runBlocking(Dispatchers.IO) {
launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ abstract class DestinationProcessFactory {
configContents: String? = null,
catalog: ConfiguredAirbyteCatalog? = null,
useFileTransfer: Boolean = false,
envVars: Map<String, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
): DestinationProcess

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class DockerizedDestination(
catalog: ConfiguredAirbyteCatalog?,
private val testName: String,
useFileTransfer: Boolean,
envVars: Map<String, String>,
vararg featureFlags: FeatureFlag,
) : DestinationProcess {
private val process: Process
Expand Down Expand Up @@ -94,6 +95,8 @@ class DockerizedDestination(
val containerName = "$shortImageName-$command-$randomSuffix"
logger.info { "Creating docker container $containerName" }
logger.info { "File transfer ${if (useFileTransfer) "is " else "isn't"} enabled" }
val additionalEnvEntries = envVars.flatMap { (key, value) -> listOf("-e", "$key=$value") }
logger.info { "Env vars: $envVars loaded" }
val cmd: MutableList<String> =
(listOf(
"docker",
Expand All @@ -120,6 +123,7 @@ class DockerizedDestination(
"-e",
"USE_FILE_TRANSFER=$useFileTransfer",
) +
additionalEnvEntries +
featureFlags.flatMap { listOf("-e", it.envVarBindingDeclaration) } +
listOf(
// Yes, we hardcode the job ID to 0.
Expand Down Expand Up @@ -275,6 +279,7 @@ class DockerizedDestinationFactory(
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
useFileTransfer: Boolean,
envVars: Map<String, String>,
vararg featureFlags: FeatureFlag,
): DestinationProcess {
return DockerizedDestination(
Expand All @@ -284,6 +289,7 @@ class DockerizedDestinationFactory(
catalog,
testName,
useFileTransfer,
envVars,
*featureFlags,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.File
import java.io.PipedInputStream
import java.io.PipedOutputStream
Expand All @@ -23,11 +24,14 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertFalse

private val logger = KotlinLogging.logger {}

class NonDockerizedDestination(
command: String,
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
useFileTransfer: Boolean,
envVars: Map<String, String>,
vararg featureFlags: FeatureFlag,
) : DestinationProcess {
private val destinationStdinPipe: PrintWriter
Expand All @@ -41,6 +45,9 @@ class NonDockerizedDestination(
private val file = File("/tmp/test_file")

init {
envVars.forEach { (key, value) -> IntegrationTest.nonDockerMockEnvVars.set(key, value) }
logger.info { "Env vars: $envVars loaded" }

if (useFileTransfer) {
IntegrationTest.nonDockerMockEnvVars.set("USE_FILE_TRANSFER", "true")
val fileContentStr = "123"
Expand Down Expand Up @@ -118,6 +125,7 @@ class NonDockerizedDestinationFactory : DestinationProcessFactory() {
configContents: String?,
catalog: ConfiguredAirbyteCatalog?,
useFileTransfer: Boolean,
envVars: Map<String, String>,
vararg featureFlags: FeatureFlag,
): DestinationProcess {
// TODO pass test name into the destination process
Expand All @@ -126,6 +134,7 @@ class NonDockerizedDestinationFactory : DestinationProcessFactory() {
configContents,
catalog,
useFileTransfer,
envVars,
*featureFlags
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ abstract class BasicFunctionalityIntegrationTest(
val promoteUnionToObject: Boolean,
val preserveUndeclaredFields: Boolean,
val supportFileTransfer: Boolean,
val envVars: Map<String, String>,
/**
* Whether the destination commits new data when it receives a non-`COMPLETE` stream status. For
* example:
Expand Down Expand Up @@ -170,7 +171,8 @@ abstract class BasicFunctionalityIntegrationTest(
blob = """{"foo": "bar"}""",
sourceRecordCount = 1,
)
)
),
envVars = envVars,
)

val stateMessages = messages.filter { it.type == AirbyteMessage.Type.STATE }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DevNullBasicFunctionalityIntegrationTest :
preserveUndeclaredFields = false,
commitDataIncrementally = false,
allTypesBehavior = Untyped,
envVars = emptyMap(),
supportFileTransfer = false,
) {
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test
abstract class IcebergV2WriteTest(
configContents: String,
destinationCleaner: DestinationCleaner,
envVars: Map<String, String> = emptyMap(),
) :
BasicFunctionalityIntegrationTest(
configContents,
Expand All @@ -37,6 +38,7 @@ abstract class IcebergV2WriteTest(
preserveUndeclaredFields = false,
commitDataIncrementally = false,
supportFileTransfer = false,
envVars = envVars,
allTypesBehavior = StronglyTyped(),
nullEqualsUnset = true,
) {
Expand Down Expand Up @@ -80,11 +82,16 @@ class IcebergGlueWriteTest :
IcebergV2TestUtil.getCatalog(
IcebergV2TestUtil.parseConfig(IcebergV2TestUtil.GLUE_CONFIG_PATH)
)
),
)
) {
@Test
@Disabled("dest iceberge-v2 doesn't support unknown types")
override fun testUnknownTypes() {}

@Test
override fun testBasicWrite() {
super.testBasicWrite()
}
}

@Disabled(
Expand All @@ -96,6 +103,7 @@ class IcebergNessieMinioWriteTest :
// we're writing to ephemeral testcontainers, so no need to clean up after ourselves
NoopDestinationCleaner
) {

companion object {
private fun getToken(): String {
val client = OkHttpClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ abstract class S3V2WriteTest(
allTypesBehavior: AllTypesBehavior,
nullEqualsUnset: Boolean = false,
failOnUnknownTypes: Boolean = false,
envVars: Map<String, String> = emptyMap(),
) :
BasicFunctionalityIntegrationTest(
S3V2TestUtils.getConfig(path),
Expand All @@ -42,6 +43,7 @@ abstract class S3V2WriteTest(
allTypesBehavior = allTypesBehavior,
nullEqualsUnset = nullEqualsUnset,
supportFileTransfer = true,
envVars = envVars,
failOnUnknownTypes = failOnUnknownTypes,
) {
@Disabled("Irrelevant for file destinations")
Expand Down

0 comments on commit b65ebc2

Please sign in to comment.