From e931c2add74fb92baad14d0377f3d70176637d89 Mon Sep 17 00:00:00 2001
From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com>
Date: Wed, 18 Sep 2024 11:19:13 -0700
Subject: [PATCH] destination-snowflake: get tests to pass - durably (#45370)

### TL;DR

Make destination-snowflake pass all of its tests, and keep them passing.

### What changed?

- Updated the CDK version to 0.45.0
- Reduced the JUnit method execution timeout from 30 to 20 minutes and set the test execution concurrency to 2
- Reworked the orphaned-thread filter in SnowflakeDestination's main function to use the thread-creation info handed to the filter, instead of looking it up again by thread
- Added an assertion on the exact error message returned when the configured role lacks permissions on the target schema in the integration test
- Implemented a more robust cleanup process for Airbyte internal tables and stale test schemas
- Removed the unused Batch and LocalFileBatch classes
- Bumped the connector version to 3.11.12
- Not in the PR: I also deleted about 5k tables and 2k schemas, which were making our tests run slower than necessary. The new cleanup logic will automate those cleanups going forward.

### How to test?

1. Run the integration tests for the Snowflake destination connector
2. Verify that the expected error message is returned when testing with invalid permissions
3. Check that the cleanup process removes old tables and schemas as expected
4. Ensure that all existing functionality remains intact

### Why make this change?

These changes improve the reliability and maintainability of the Snowflake destination connector. The updated CDK version and reduced test timeout lead to faster, more efficient test runs. The improved error handling and cleanup processes help identify issues more quickly and keep the test environment clean. Removing unused classes reduces code clutter and improves overall code quality.
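To make the cleanup idea concrete, here is a minimal, self-contained Kotlin sketch (not the connector's actual helper) of the pattern the new test cleanup relies on: an `AtomicBoolean` gate so that only the first of several concurrently running test classes performs cleanup, plus an age-and-prefix filter over schema metadata. `SchemaRow`, `cleanStaleTestSchemas`, and `dropSchema` are hypothetical stand-ins.

```kotlin
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a row returned by Snowflake's SHOW SCHEMAS.
data class SchemaRow(val name: String, val createdOn: Instant)

private val cleanupRan = AtomicBoolean(false)

/**
 * Runs at most once per JVM even when several test classes call it concurrently:
 * getAndSet(true) returns false only for the first caller.
 */
fun cleanStaleTestSchemas(
    schemas: List<SchemaRow>,
    dropSchema: (String) -> Unit, // e.g. issues a DROP SCHEMA statement
    cutoffHours: Long = 6,
) {
    if (cleanupRan.getAndSet(true)) return
    val cutoff = Instant.now().minus(cutoffHours, ChronoUnit.HOURS)
    schemas
        .filter { it.createdOn.isBefore(cutoff) }
        .filter {
            it.name.startsWith("SQL_GENERATOR", ignoreCase = true) ||
                it.name.startsWith("TDTEST", ignoreCase = true) ||
                it.name.startsWith("TYPING_DEDUPING", ignoreCase = true)
        }
        .forEach { dropSchema(it.name) }
}

fun main() {
    val now = Instant.now()
    val rows =
        listOf(
            SchemaRow("TDTEST_OLD", now.minus(48, ChronoUnit.HOURS)),
            SchemaRow("TDTEST_FRESH", now),
            SchemaRow("PRODUCTION_LIKE", now.minus(48, ChronoUnit.HOURS)),
        )
    cleanStaleTestSchemas(rows, dropSchema = { println("DROP SCHEMA $it") })
    // Prints only: DROP SCHEMA TDTEST_OLD
}
```

Only schemas that are both old enough and clearly test-generated (by name prefix) are dropped, which is what keeps the cleanup safe to run unattended against a shared test database.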
---
 .../destination-snowflake/build.gradle        |   2 +-
 .../destination-snowflake/gradle.properties   |   3 +-
 .../destination-snowflake/metadata.yaml       |   2 +-
 .../snowflake/SnowflakeDatabaseUtils.kt       |   2 +-
 .../snowflake/SnowflakeDestination.kt         |  24 ++--
 .../SnowflakeDestinationIntegrationTest.kt    |  13 +-
 ...actSnowflakeSqlGeneratorIntegrationTest.kt |  16 ---
 .../AbstractSnowflakeTypingDedupingTest.kt    | 131 +++++++++++++-----
 docs/integrations/destinations/snowflake.md   |   1 +
 9 files changed, 120 insertions(+), 74 deletions(-)

diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle
index efa6fbd2dd74..df64c083f272 100644
--- a/airbyte-integrations/connectors/destination-snowflake/build.gradle
+++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle
@@ -3,7 +3,7 @@ plugins {
 }
 
 airbyteJavaConnector {
-    cdkVersionRequired = '0.44.19'
+    cdkVersionRequired = '0.45.0'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }
diff --git a/airbyte-integrations/connectors/destination-snowflake/gradle.properties b/airbyte-integrations/connectors/destination-snowflake/gradle.properties
index 061eb2c399d9..666ff08b0834 100644
--- a/airbyte-integrations/connectors/destination-snowflake/gradle.properties
+++ b/airbyte-integrations/connectors/destination-snowflake/gradle.properties
@@ -1 +1,2 @@
-JunitMethodExecutionTimeout=30 m
+JunitMethodExecutionTimeout=20 m
+testExecutionConcurrency=2
diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml
index 99d68c9cb339..ea0985e32869 100644
--- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.11.11
+  dockerImageTag: 3.11.12
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt
index cb8ce1f2ffd2..f0e70884fea5 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDatabaseUtils.kt
@@ -57,7 +57,7 @@ object SnowflakeDatabaseUtils {
     private const val IP_NOT_IN_WHITE_LIST_ERR_MSG = "not allowed to access Snowflake"
 
     @JvmStatic
-    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): HikariDataSource {
+    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): DataSource {
         val dataSource = HikariDataSource()
 
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt
index 3cf78b14c453..aebb0389d9ef 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/main/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.kt
@@ -316,19 +316,17 @@ constructor(
 }
 
 fun main(args: Array<String>) {
-    IntegrationRunner.addOrphanedThreadFilter { t: Thread ->
-        if (IntegrationRunner.getThreadCreationInfo(t) != null) {
-            for (stackTraceElement in IntegrationRunner.getThreadCreationInfo(t)!!.stack) {
-                val stackClassName = stackTraceElement.className
-                val stackMethodName = stackTraceElement.methodName
-                if (
-                    SFStatement::class.java.canonicalName == stackClassName &&
-                        "close" == stackMethodName ||
-                        SFSession::class.java.canonicalName == stackClassName &&
-                            "callHeartBeatWithQueryTimeout" == stackMethodName
-                ) {
-                    return@addOrphanedThreadFilter false
-                }
+    IntegrationRunner.addOrphanedThreadFilter { threadInfo: IntegrationRunner.OrphanedThreadInfo ->
+        for (stackTraceElement in threadInfo.threadCreationInfo.stack) {
+            val stackClassName = stackTraceElement.className
+            val stackMethodName = stackTraceElement.methodName
+            if (
+                SFStatement::class.java.canonicalName == stackClassName &&
+                    "close" == stackMethodName ||
+                    SFSession::class.java.canonicalName == stackClassName &&
+                        "callHeartBeatWithQueryTimeout" == stackMethodName
+            ) {
+                return@addOrphanedThreadFilter false
             }
         }
         true
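For context on the race condition mentioned in the changelog: the old filter looked up the thread's creation info twice, once for the null check and once more with `!!`, so the entry could disappear between the two reads. Passing the already-captured `OrphanedThreadInfo` into the filter removes that window. A minimal, self-contained Kotlin sketch of the difference, using a plain concurrent map and a hypothetical `ThreadCreationInfo` type rather than the CDK's `IntegrationRunner` API:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the per-thread creation info the runner keeps.
data class ThreadCreationInfo(val stack: List<StackTraceElement>)

val registry = ConcurrentHashMap<Thread, ThreadCreationInfo>()

// Racy: another thread may remove the entry between the null check and the `!!`
// lookup, turning the second read into a NullPointerException.
fun isOrphanRacy(t: Thread): Boolean {
    if (registry[t] != null) {
        for (frame in registry[t]!!.stack) { // second lookup: may now be null
            if (frame.methodName == "close") return false
        }
    }
    return true
}

// Safe: the caller captures the snapshot once and hands it to the filter,
// so the filter never re-reads shared mutable state.
fun isOrphan(info: ThreadCreationInfo): Boolean {
    for (frame in info.stack) {
        if (frame.methodName == "close") return false
    }
    return true
}
```

The new CDK callback shape in the hunk above follows the second pattern: the filter receives the snapshot directly instead of dereferencing the registry again.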
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt
index 38be9827db01..baca1c92752b 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.kt
@@ -33,15 +33,20 @@ internal class SnowflakeDestinationIntegrationTest {
     @Throws(Exception::class)
     fun testCheckFailsWithInvalidPermissions() {
        // TODO(sherifnada) this test case is assumes config.json does not have permission to access
-        // the
-        // schema
+        // the schema. RESTRICTED_SCHEMA was created by the user AIRBYTETESTER, and then permissions
+        // were removed with
+        // 'REVOKE ALL ON SCHEMA restricted_schema FROM ROLE integration_tester_destination;'
         // this connector should be updated with multiple credentials, each with a clear purpose
-        // (valid,
-        // invalid: insufficient permissions, invalid: wrong password, etc..)
+        // (valid, invalid: insufficient permissions, invalid: wrong password, etc..)
         val credentialsJsonString = deserialize(Files.readString(Paths.get("secrets/config.json")))
         val check =
             SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString)
         Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, check!!.status)
+        Assertions.assertEquals(
+            "Could not connect with provided configuration. Encountered Error with Snowflake Configuration: " +
+                "Current role does not have permissions on the target schema please verify your privileges",
+            check.message
+        )
     }
 
     @Test
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt
index 0ff0dab36112..b5cf7aa14f7f 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeSqlGeneratorIntegrationTest.kt
@@ -6,7 +6,6 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping
 import com.fasterxml.jackson.databind.JsonNode
 import com.google.common.collect.ImmutableMap
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
-import io.airbyte.cdk.db.factory.DataSourceFactory
 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.db.jdbc.JdbcUtils
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
@@ -1857,20 +1856,5 @@ abstract class AbstractSnowflakeSqlGeneratorIntegrationTest :
         private var dataSource: DataSource =
             SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
         private var database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(dataSource)
-
-        @JvmStatic
-        @BeforeAll
-        fun setupSnowflake(): Unit {
-            dataSource =
-                SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
-            database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-        }
-
-        @JvmStatic
-        @AfterAll
-        @Throws(Exception::class)
-        fun teardownSnowflake(): Unit {
-            DataSourceFactory.close(dataSource)
-        }
     }
 }
diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt
index 66eae99f3bd3..56777528d6f2 100644
--- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt
+++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/kotlin/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.kt
@@ -21,9 +21,13 @@ import io.airbyte.workers.exception.TestHarnessException
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.nio.file.Path
 import java.sql.SQLException
+import java.time.Instant
+import java.time.temporal.ChronoUnit
 import java.util.*
+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
 import javax.sql.DataSource
-import kotlin.concurrent.Volatile
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Disabled
@@ -56,7 +60,7 @@ abstract class AbstractSnowflakeTypingDedupingTest(
         dataSource =
             SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
         database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-        cleanAirbyteInternalTable(databaseName, database, forceUppercaseIdentifiers)
+        cleanAirbyteInternalTable(database)
         return config
     }
 
@@ -419,48 +423,101 @@ abstract class AbstractSnowflakeTypingDedupingTest(
                 "_AIRBYTE_GENERATION_ID",
             )
 
-        @Volatile private var cleanedAirbyteInternalTable = false
+        private val cleanedAirbyteInternalTable = AtomicBoolean(false)
+        private val threadId = AtomicInteger(0)
 
         @Throws(SQLException::class)
-        private fun cleanAirbyteInternalTable(
-            databaseName: String,
-            database: JdbcDatabase?,
-            forceUppercase: Boolean,
-        ) {
-            if (!cleanedAirbyteInternalTable) {
-                synchronized(AbstractSnowflakeTypingDedupingTest::class.java) {
-                    if (!cleanedAirbyteInternalTable) {
-                        val destinationStateTableExists =
-                            database!!.executeMetadataQuery {
-                                it.getTables(
-                                    databaseName,
-                                    if (forceUppercase) {
-                                        "AIRBYTE_INTERNAL"
-                                    } else {
-                                        "airbyte_internal"
-                                    },
-                                    if (forceUppercase) {
-                                        "_AIRBYTE_DESTINATION_STATE"
-                                    } else {
-                                        "_airbyte_destination_state"
-                                    },
-                                    null
-                                )
-                                    .next()
-                            }
-                        if (destinationStateTableExists) {
-                            database.execute(
-                                """DELETE FROM "airbyte_internal"."_airbyte_destination_state" WHERE "updated_at" < current_date() - 7""",
+        private fun cleanAirbyteInternalTable(database: JdbcDatabase?) {
+            if (
+                database!!
+                    .queryJsons("SHOW PARAMETERS LIKE 'QUOTED_IDENTIFIERS_IGNORE_CASE';")
+                    .first()
+                    .get("value")
+                    .asText()
+                    .toBoolean()
+            ) {
+                return
+            }
+
+            if (!cleanedAirbyteInternalTable.getAndSet(true)) {
+                val cleanupCutoffHours = 6
+                LOGGER.info { "tableCleaner running" }
+                val executor =
+                    Executors.newSingleThreadExecutor {
+                        val thread = Executors.defaultThreadFactory().newThread(it)
+                        thread.name =
+                            "airbyteInternalTableCleanupThread-${threadId.incrementAndGet()}"
+                        thread.isDaemon = true
+                        thread
+                    }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"updated_at\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"AIRBYTE_INTERNAL\".\"_AIRBYTE_DESTINATION_STATE\" WHERE \"UPDATED_AT\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    val schemaList =
+                        database.queryJsons(
+                            "SHOW SCHEMAS IN DATABASE INTEGRATION_TEST_DESTINATION;",
+                        )
+                    LOGGER.info(
+                        "tableCleaner found ${schemaList.size} schemas in database INTEGRATION_TEST_DESTINATION"
+                    )
+                    schemaList
+                        .associate {
+                            it.get("name").asText() to Instant.parse(it.get("created_on").asText())
+                        }
+                        .filter {
+                            it.value.isBefore(
+                                Instant.now().minus(cleanupCutoffHours.toLong(), ChronoUnit.HOURS)
                             )
                         }
-                        cleanedAirbyteInternalTable = true
+                        .filter {
+                            it.key.startsWith("SQL_GENERATOR", ignoreCase = true) ||
+                                it.key.startsWith("TDTEST", ignoreCase = true) ||
+                                it.key.startsWith("TYPING_DEDUPING", ignoreCase = true)
+                        }
+                        .forEach {
+                            executor.execute {
+                                database.execute(
+                                    "DROP SCHEMA INTEGRATION_TEST_DESTINATION.\"${it.key}\" /* created at ${it.value} */;"
+                                )
+                            }
+                        }
+                }
+                for (schemaName in
+                    listOf("AIRBYTE_INTERNAL", "airbyte_internal", "overridden_raw_dataset")) {
+                    executor.execute {
+                        val sql =
+                            "SHOW TABLES IN schema INTEGRATION_TEST_DESTINATION.\"$schemaName\";"
+                        val tableList = database.queryJsons(sql)
+                        LOGGER.info {
+                            "tableCleaner found ${tableList.size} tables in schema $schemaName"
+                        }
+                        tableList
+                            .associate {
+                                it.get("name").asText() to
+                                    Instant.parse(it.get("created_on").asText())
+                            }
+                            .filter {
+                                it.value.isBefore(Instant.now().minus(6, ChronoUnit.HOURS)) &&
+                                    it.key.startsWith("TDTEST", ignoreCase = true)
+                            }
+                            .forEach {
+                                executor.execute {
+                                    database.execute(
+                                        "DROP TABLE INTEGRATION_TEST_DESTINATION.\"$schemaName\".\"${it.key}\" /* created at ${it.value} */;"
+                                    )
+                                }
+                            }
+                    }
+                }
             }
         }
     }
 }
-
-open class Batch(val name: String)
-
-class LocalFileBatch(name: String) : Batch(name)
diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md
index a93742b26206..4423065896cc 100644
--- a/docs/integrations/destinations/snowflake.md
+++ b/docs/integrations/destinations/snowflake.md
@@ -268,6 +268,7 @@ desired namespace.
 
 | Version         | Date       | Pull Request                                                | Subject                                                   |
 | :-------------- | :--------- | :---------------------------------------------------------- | :-------------------------------------------------------- |
+| 3.11.12         | 2024-09-12 | [45370](https://github.com/airbytehq/airbyte/pull/45370)   | Fix a race condition in the orphaned-thread filter         |
 | 3.11.11         | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476)   | Increase message parsing limit to 100mb                    |
 | 3.11.10         | 2024-08-22 | [\#44526](https://github.com/airbytehq/airbyte/pull/44526) | Revert protocol compliance fix                             |
 | 3.11.9          | 2024-08-19 | [\#43367](https://github.com/airbytehq/airbyte/pull/43367) | Add opt in using MERGE statement for upserts and deletes   |