destination-snowflake: get tests to pass - durably (#45370)
### TL;DR

Make destination-snowflake pass all tests

### What changed?

- Updated CDK version to 0.45.0
- Reduced the JUnit method execution timeout from 30 to 20 minutes and enabled running two test classes concurrently (`testExecutionConcurrency=2`)
- Improved error handling in SnowflakeDestination's main function
- Enhanced error message for invalid permissions in integration test
- Implemented a more robust cleanup process for Airbyte internal tables and schemas (a minimal sketch of the one-time gating idiom it uses follows this list)
- Removed unused Batch and LocalFileBatch classes
- Not in the PR: I also deleted about 5k tables and 2k schemas, which were making our tests run slower than necessary. The new cleanup logic will automate those deletions going forward.
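
The cleanup gating in this PR replaces a `@Volatile` flag guarded by double-checked `synchronized` blocks with a single atomic compare-and-set. A minimal sketch of that one-time-gate idiom (hypothetical names, not the PR's code):

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// getAndSet(true) atomically returns the previous value, so exactly one
// caller ever observes `false` and runs the work, no matter how many
// threads race on the gate.
object OneTimeGate {
    private val started = AtomicBoolean(false)

    fun runOnce(work: () -> Unit) {
        if (!started.getAndSet(true)) {
            work()
        }
    }
}

fun main() {
    repeat(3) { OneTimeGate.runOnce { println("cleanup runs exactly once") } }
}
```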

### How to test?

1. Run integration tests for the Snowflake destination connector
2. Verify that the new error message is displayed when testing with invalid permissions
3. Check that the cleanup process removes old tables and schemas as expected
4. Ensure that all existing functionality remains intact
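
Note: the exact test invocation is not part of this commit; assuming the standard Airbyte tooling, something like `airbyte-ci connectors --name=destination-snowflake test` (or the connector's Gradle integration-test task) with valid credentials in `secrets/config.json` should cover steps 1–3.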

### Why make this change?

These changes aim to improve the reliability and maintainability of the Snowflake destination connector. The updated CDK version and reduced test timeout should lead to faster, more efficient testing. The enhanced error handling and automated cleanup will help identify issues more quickly and keep the test environment clean. Removing the unused classes reduces code clutter and improves overall code quality.
stephane-airbyte authored Sep 18, 2024
1 parent 66a999c commit e931c2a
Showing 9 changed files with 120 additions and 74 deletions.
build.gradle
@@ -3,7 +3,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.44.19'
+    cdkVersionRequired = '0.45.0'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }
gradle.properties
@@ -1 +1,2 @@
-JunitMethodExecutionTimeout=30 m
+JunitMethodExecutionTimeout=20 m
+testExecutionConcurrency=2
metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.11.11
+  dockerImageTag: 3.11.12
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake
SnowflakeDatabaseUtils.kt
@@ -57,7 +57,7 @@ object SnowflakeDatabaseUtils {
     private const val IP_NOT_IN_WHITE_LIST_ERR_MSG = "not allowed to access Snowflake"
 
     @JvmStatic
-    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): HikariDataSource {
+    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): DataSource {
 
         val dataSource = HikariDataSource()
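
Widening the declared return type from the concrete `HikariDataSource` to the `javax.sql.DataSource` interface keeps callers coded against the JDBC abstraction rather than the pool implementation. A sketch of the resulting call-site pattern, assuming the helper names visible elsewhere in this diff (`SnowflakeDatabaseUtils`, `OssCloudEnvVarConsts`, `DataSourceFactory`) and a hypothetical wrapper name:

```kotlin
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.factory.DataSourceFactory
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import javax.sql.DataSource

// Acquire the pool through the interface and always release it; the Hikari
// pool stays an implementation detail of createDataSource.
fun <T> withSnowflakeDatabase(config: JsonNode, block: (JdbcDatabase) -> T): T {
    val dataSource: DataSource =
        SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
    return try {
        block(SnowflakeDatabaseUtils.getDatabase(dataSource))
    } finally {
        DataSourceFactory.close(dataSource) // closes the underlying Hikari pool
    }
}
```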
SnowflakeDestination.kt
@@ -316,19 +316,17 @@ constructor(
     }
 
 fun main(args: Array<String>) {
-    IntegrationRunner.addOrphanedThreadFilter { t: Thread ->
-        if (IntegrationRunner.getThreadCreationInfo(t) != null) {
-            for (stackTraceElement in IntegrationRunner.getThreadCreationInfo(t)!!.stack) {
-                val stackClassName = stackTraceElement.className
-                val stackMethodName = stackTraceElement.methodName
-                if (
-                    SFStatement::class.java.canonicalName == stackClassName &&
-                        "close" == stackMethodName ||
-                        SFSession::class.java.canonicalName == stackClassName &&
-                            "callHeartBeatWithQueryTimeout" == stackMethodName
-                ) {
-                    return@addOrphanedThreadFilter false
-                }
-            }
+    IntegrationRunner.addOrphanedThreadFilter { threadInfo: IntegrationRunner.OrphanedThreadInfo ->
+        for (stackTraceElement in threadInfo.threadCreationInfo.stack) {
+            val stackClassName = stackTraceElement.className
+            val stackMethodName = stackTraceElement.methodName
+            if (
+                SFStatement::class.java.canonicalName == stackClassName &&
+                    "close" == stackMethodName ||
+                    SFSession::class.java.canonicalName == stackClassName &&
+                        "callHeartBeatWithQueryTimeout" == stackMethodName
+            ) {
+                return@addOrphanedThreadFilter false
+            }
         }
         true
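
This rewrite closes a check-then-act race: the old filter called `IntegrationRunner.getThreadCreationInfo(t)` twice, once for the null check and once (with `!!`) to read the stack, so the entry could disappear between the two reads and throw. The new callback receives the `OrphanedThreadInfo` directly, so there is only one read. A minimal standalone sketch of the hazard (hypothetical registry, not the CDK's):

```kotlin
import java.util.concurrent.ConcurrentHashMap

val registry = ConcurrentHashMap<Thread, List<StackTraceElement>>()

// Racy: another thread may remove the entry between the null check and the
// second lookup, making `!!` throw despite the check having passed.
fun racyStack(t: Thread): List<StackTraceElement> =
    if (registry[t] != null) registry[t]!! else emptyList()

// Safe: read once, then act only on the value already in hand.
fun safeStack(t: Thread): List<StackTraceElement> =
    registry[t] ?: emptyList()
```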
SnowflakeDestinationIntegrationTest.kt
@@ -33,15 +33,20 @@ internal class SnowflakeDestinationIntegrationTest {
     @Throws(Exception::class)
     fun testCheckFailsWithInvalidPermissions() {
         // TODO(sherifnada) this test case assumes config.json does not have permission to access
-        // the
-        // schema
+        // the schema RESTRICTED_SCHEMA was created by the user AIRBYTETESTER, and then permissions
+        // were removed with
+        // 'REVOKE ALL ON SCHEMA restricted_schema FROM ROLE integration_tester_destination;'
         // this connector should be updated with multiple credentials, each with a clear purpose
-        // (valid,
-        // invalid: insufficient permissions, invalid: wrong password, etc..)
+        // (valid, invalid: insufficient permissions, invalid: wrong password, etc..)
         val credentialsJsonString = deserialize(Files.readString(Paths.get("secrets/config.json")))
         val check =
             SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString)
         Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, check!!.status)
+        Assertions.assertEquals(
+            "Could not connect with provided configuration. Encountered Error with Snowflake Configuration: " +
+                "Current role does not have permissions on the target schema please verify your privileges",
+            check.message
+        )
     }
 
     @Test
AbstractSnowflakeSqlGeneratorIntegrationTest.kt
@@ -6,7 +6,6 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping
 import com.fasterxml.jackson.databind.JsonNode
 import com.google.common.collect.ImmutableMap
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
-import io.airbyte.cdk.db.factory.DataSourceFactory
 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.db.jdbc.JdbcUtils
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
@@ -1857,20 +1856,5 @@ abstract class AbstractSnowflakeSqlGeneratorIntegrationTest :
         private var dataSource: DataSource =
             SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
         private var database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(dataSource)
-
-        @JvmStatic
-        @BeforeAll
-        fun setupSnowflake(): Unit {
-            dataSource =
-                SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
-            database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-        }
-
-        @JvmStatic
-        @AfterAll
-        @Throws(Exception::class)
-        fun teardownSnowflake(): Unit {
-            DataSourceFactory.close(dataSource)
-        }
     }
 }
AbstractSnowflakeTypingDedupingTest.kt
@@ -21,9 +21,13 @@ import io.airbyte.workers.exception.TestHarnessException
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.nio.file.Path
 import java.sql.SQLException
+import java.time.Instant
+import java.time.temporal.ChronoUnit
 import java.util.*
+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 import javax.sql.DataSource
-import kotlin.concurrent.Volatile
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Disabled
@@ -56,7 +60,7 @@ abstract class AbstractSnowflakeTypingDedupingTest(
         dataSource =
            SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
         database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-        cleanAirbyteInternalTable(databaseName, database, forceUppercaseIdentifiers)
+        cleanAirbyteInternalTable(database)
         return config
     }

@@ -419,48 +423,101 @@
             "_AIRBYTE_GENERATION_ID",
         )
 
-        @Volatile private var cleanedAirbyteInternalTable = false
+        private val cleanedAirbyteInternalTable = AtomicBoolean(false)
+        private val threadId = AtomicInteger(0)
 
         @Throws(SQLException::class)
-        private fun cleanAirbyteInternalTable(
-            databaseName: String,
-            database: JdbcDatabase?,
-            forceUppercase: Boolean,
-        ) {
-            if (!cleanedAirbyteInternalTable) {
-                synchronized(AbstractSnowflakeTypingDedupingTest::class.java) {
-                    if (!cleanedAirbyteInternalTable) {
-                        val destinationStateTableExists =
-                            database!!.executeMetadataQuery {
-                                it.getTables(
-                                        databaseName,
-                                        if (forceUppercase) {
-                                            "AIRBYTE_INTERNAL"
-                                        } else {
-                                            "airbyte_internal"
-                                        },
-                                        if (forceUppercase) {
-                                            "_AIRBYTE_DESTINATION_STATE"
-                                        } else {
-                                            "_airbyte_destination_state"
-                                        },
-                                        null
-                                    )
-                                    .next()
-                            }
-                        if (destinationStateTableExists) {
-                            database.execute(
-                                """DELETE FROM "airbyte_internal"."_airbyte_destination_state" WHERE "updated_at" < current_date() - 7""",
-                            )
-                        }
-                        cleanedAirbyteInternalTable = true
-                    }
-                }
-            }
-        }
+        private fun cleanAirbyteInternalTable(database: JdbcDatabase?) {
+            if (
+                database!!
+                    .queryJsons("SHOW PARAMETERS LIKE 'QUOTED_IDENTIFIERS_IGNORE_CASE';")
+                    .first()
+                    .get("value")
+                    .asText()
+                    .toBoolean()
+            ) {
+                return
+            }
+
+            if (!cleanedAirbyteInternalTable.getAndSet(true)) {
+                val cleanupCutoffHours = 6
+                LOGGER.info { "tableCleaner running" }
+                val executor =
+                    Executors.newSingleThreadExecutor {
+                        val thread = Executors.defaultThreadFactory().newThread(it)
+                        thread.name =
+                            "airbyteInternalTableCleanupThread-${threadId.incrementAndGet()}"
+                        thread.isDaemon = true
+                        thread
+                    }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"updated_at\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"AIRBYTE_INTERNAL\".\"_AIRBYTE_DESTINATION_STATE\" WHERE \"UPDATED_AT\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    val schemaList =
+                        database.queryJsons(
+                            "SHOW SCHEMAS IN DATABASE INTEGRATION_TEST_DESTINATION;",
+                        )
+                    LOGGER.info(
+                        "tableCleaner found ${schemaList.size} schemas in database INTEGRATION_TEST_DESTINATION"
+                    )
+                    schemaList
+                        .associate {
+                            it.get("name").asText() to Instant.parse(it.get("created_on").asText())
+                        }
+                        .filter {
+                            it.value.isBefore(
+                                Instant.now().minus(cleanupCutoffHours.toLong(), ChronoUnit.HOURS)
+                            )
+                        }
+                        .filter {
+                            it.key.startsWith("SQL_GENERATOR", ignoreCase = true) ||
+                                it.key.startsWith("TDTEST", ignoreCase = true) ||
+                                it.key.startsWith("TYPING_DEDUPING", ignoreCase = true)
+                        }
+                        .forEach {
+                            executor.execute {
+                                database.execute(
+                                    "DROP SCHEMA INTEGRATION_TEST_DESTINATION.\"${it.key}\" /* created at ${it.value} */;"
+                                )
+                            }
+                        }
+                }
+                for (schemaName in
+                    listOf("AIRBYTE_INTERNAL", "airbyte_internal", "overridden_raw_dataset")) {
+                    executor.execute {
+                        val sql =
+                            "SHOW TABLES IN schema INTEGRATION_TEST_DESTINATION.\"$schemaName\";"
+                        val tableList = database.queryJsons(sql)
+                        LOGGER.info {
+                            "tableCleaner found ${tableList.size} tables in schema $schemaName"
+                        }
+                        tableList
+                            .associate {
+                                it.get("name").asText() to
+                                    Instant.parse(it.get("created_on").asText())
+                            }
+                            .filter {
+                                it.value.isBefore(Instant.now().minus(6, ChronoUnit.HOURS)) &&
+                                    it.key.startsWith("TDTEST", ignoreCase = true)
+                            }
+                            .forEach {
+                                executor.execute {
+                                    database.execute(
+                                        "DROP TABLE INTEGRATION_TEST_DESTINATION.\"$schemaName\".\"${it.key}\" /* created at ${it.value} */;"
+                                    )
+                                }
+                            }
+                    }
+                }
+            }
+        }
     }
 }
-
-open class Batch(val name: String)
-
-class LocalFileBatch(name: String) : Batch(name)
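
Two design choices in the cleanup above are worth noting. First, it returns early when the Snowflake parameter `QUOTED_IDENTIFIERS_IGNORE_CASE` is enabled, presumably because the quoted, case-sensitive identifiers in its `DELETE` and `DROP` statements would not resolve as written. Second, everything is queued on a named single-threaded daemon executor, so deletions run in the background without blocking the tests or keeping the JVM alive. A standalone sketch of that thread-factory pattern, mirroring the diff:

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

private val threadId = AtomicInteger(0)

// One worker thread, named for log readability; marked daemon so a slow
// DROP can never prevent the test JVM from exiting.
fun backgroundCleanupExecutor(): ExecutorService =
    Executors.newSingleThreadExecutor { runnable ->
        Executors.defaultThreadFactory().newThread(runnable).apply {
            name = "airbyteInternalTableCleanupThread-${threadId.incrementAndGet()}"
            isDaemon = true
        }
    }
```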
docs/integrations/destinations/snowflake.md
@@ -268,6 +268,7 @@ desired namespace.
 
 | Version | Date       | Pull Request                                               | Subject                                                  |
 | :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------- |
+| 3.11.12 | 2024-09-12 | [45370](https://github.com/airbytehq/airbyte/pull/45370)   | Fix a race condition in our orphanedThreadFilter          |
 | 3.11.11 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476)   | Increase message parsing limit to 100mb                   |
 | 3.11.10 | 2024-08-22 | [\#44526](https://github.com/airbytehq/airbyte/pull/44526) | Revert protocol compliance fix                            |
 | 3.11.9  | 2024-08-19 | [\#43367](https://github.com/airbytehq/airbyte/pull/43367) | Add opt in using MERGE statement for upserts and deletes  |
