
destination-snowflake: bump CDK
stephane-airbyte committed Sep 17, 2024
1 parent c35540c commit e01bb55
Showing 9 changed files with 121 additions and 75 deletions.
@@ -3,9 +3,9 @@ plugins {
}

airbyteJavaConnector {
-    cdkVersionRequired = '0.44.19'
+    cdkVersionRequired = '0.45.0'
    features = ['db-destinations', 's3-destinations', 'typing-deduping']
-    useLocalCdk = false
+    useLocalCdk = true
}

java {
@@ -1 +1,2 @@
-JunitMethodExecutionTimeout=30 m
+JunitMethodExecutionTimeout=20 m
+testExecutionConcurrency=2
@@ -5,7 +5,7 @@ data:
  connectorSubtype: database
  connectorType: destination
  definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.11.11
+  dockerImageTag: 3.11.12
  dockerRepository: airbyte/destination-snowflake
  documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
  githubIssueLabel: destination-snowflake
@@ -57,7 +57,7 @@ object SnowflakeDatabaseUtils {
    private const val IP_NOT_IN_WHITE_LIST_ERR_MSG = "not allowed to access Snowflake"

    @JvmStatic
-    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): HikariDataSource {
+    fun createDataSource(config: JsonNode, airbyteEnvironment: String?): DataSource {

        val dataSource = HikariDataSource()

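The change above widens `createDataSource`'s declared return type from the concrete `HikariDataSource` to the `javax.sql.DataSource` interface. A minimal sketch of how a call site can stay implementation-agnostic under that signature; the helper below is hypothetical, not connector code:

```kotlin
import javax.sql.DataSource

// Hypothetical helper: callers written against the DataSource interface no
// longer depend on the pooling implementation; cleanup goes through the
// AutoCloseable check (HikariDataSource implements AutoCloseable).
fun <T> useDataSource(create: () -> DataSource, block: (DataSource) -> T): T {
    val dataSource = create()
    try {
        return block(dataSource)
    } finally {
        (dataSource as? AutoCloseable)?.close()
    }
}
```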
@@ -316,19 +316,17 @@ constructor(
    }

    fun main(args: Array<String>) {
-        IntegrationRunner.addOrphanedThreadFilter { t: Thread ->
-            if (IntegrationRunner.getThreadCreationInfo(t) != null) {
-                for (stackTraceElement in IntegrationRunner.getThreadCreationInfo(t)!!.stack) {
-                    val stackClassName = stackTraceElement.className
-                    val stackMethodName = stackTraceElement.methodName
-                    if (
-                        SFStatement::class.java.canonicalName == stackClassName &&
-                            "close" == stackMethodName ||
-                            SFSession::class.java.canonicalName == stackClassName &&
-                                "callHeartBeatWithQueryTimeout" == stackMethodName
-                    ) {
-                        return@addOrphanedThreadFilter false
-                    }
-                }
-            }
+        IntegrationRunner.addOrphanedThreadFilter { threadInfo: IntegrationRunner.OrphanedThreadInfo ->
+            for (stackTraceElement in threadInfo.threadCreationInfo.stack) {
+                val stackClassName = stackTraceElement.className
+                val stackMethodName = stackTraceElement.methodName
+                if (
+                    SFStatement::class.java.canonicalName == stackClassName &&
+                        "close" == stackMethodName ||
+                        SFSession::class.java.canonicalName == stackClassName &&
+                            "callHeartBeatWithQueryTimeout" == stackMethodName
+                ) {
+                    return@addOrphanedThreadFilter false
+                }
+            }
            true
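This is the race-condition fix named in the changelog: the old filter called `IntegrationRunner.getThreadCreationInfo(t)` twice, once for the null check and once for the use, so the entry could change between the two calls; the new callback receives an `OrphanedThreadInfo` snapshot directly. A simplified, self-contained sketch of the pattern; the types here are stand-ins, not the CDK's actual classes:

```kotlin
// Stand-in types; the real ones live in the Airbyte CDK's IntegrationRunner.
class ThreadCreationInfo(val stack: List<StackTraceElement>)
class OrphanedThreadInfo(val threadCreationInfo: ThreadCreationInfo)

// Snowflake driver threads known to linger (statement close, session
// heartbeat) are allowed to survive without being reported as orphans.
fun shouldReportAsOrphan(info: OrphanedThreadInfo): Boolean =
    info.threadCreationInfo.stack.none { frame ->
        frame.className == "net.snowflake.client.core.SFStatement" &&
            frame.methodName == "close" ||
            frame.className == "net.snowflake.client.core.SFSession" &&
                frame.methodName == "callHeartBeatWithQueryTimeout"
    }
```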
@@ -33,15 +33,20 @@ internal class SnowflakeDestinationIntegrationTest {
    @Throws(Exception::class)
    fun testCheckFailsWithInvalidPermissions() {
        // TODO(sherifnada) this test case is assumes config.json does not have permission to access
-        // the
-        // schema
+        // the schema RESTRICTED_SCHEMA was created by the user AIRBYTETESTER, and then permissions
+        // were removed with
+        // 'REVOKE ALL ON SCHEMA restricted_schema FROM ROLE integration_tester_destination;'
        // this connector should be updated with multiple credentials, each with a clear purpose
-        // (valid,
-        // invalid: insufficient permissions, invalid: wrong password, etc..)
+        // (valid, invalid: insufficient permissions, invalid: wrong password, etc..)
        val credentialsJsonString = deserialize(Files.readString(Paths.get("secrets/config.json")))
        val check =
            SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString)
        Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, check!!.status)
+        Assertions.assertEquals(
+            "Could not connect with provided configuration. Encountered Error with Snowflake Configuration: " +
+                "Current role does not have permissions on the target schema please verify your privileges",
+            check.message
+        )
    }

    @Test
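The test now pins the exact mapped error message rather than only the FAILED status, so a regression in error mapping fails loudly. A small illustrative sketch of that assertion pattern; the types are simplified stand-ins, not the Airbyte protocol classes:

```kotlin
// Illustrative only: assert on status first, then on the mapped message,
// so a failure report shows which of the two regressed.
data class ConnectionStatus(val status: Status, val message: String?) {
    enum class Status { SUCCEEDED, FAILED }
}

fun assertFailsWith(expectedMessage: String, actual: ConnectionStatus) {
    check(actual.status == ConnectionStatus.Status.FAILED) { "expected FAILED, got ${actual.status}" }
    check(actual.message == expectedMessage) { "unexpected message: ${actual.message}" }
}
```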
@@ -6,7 +6,6 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping
import com.fasterxml.jackson.databind.JsonNode
import com.google.common.collect.ImmutableMap
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
-import io.airbyte.cdk.db.factory.DataSourceFactory
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcUtils
import io.airbyte.cdk.integrations.base.JavaBaseConstants

@@ -1857,20 +1856,5 @@ abstract class AbstractSnowflakeSqlGeneratorIntegrationTest :
        private var dataSource: DataSource =
            SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
        private var database: JdbcDatabase = SnowflakeDatabaseUtils.getDatabase(dataSource)
-
-        @JvmStatic
-        @BeforeAll
-        fun setupSnowflake(): Unit {
-            dataSource =
-                SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
-            database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-        }
-
-        @JvmStatic
-        @AfterAll
-        @Throws(Exception::class)
-        fun teardownSnowflake(): Unit {
-            DataSourceFactory.close(dataSource)
-        }
    }
}
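After this change the companion object keeps only the inline initializers for `dataSource` and `database`; the `@BeforeAll`/`@AfterAll` pair, including the `DataSourceFactory.close` teardown, is deleted. A minimal sketch of the resulting once-per-JVM initialization pattern, with illustrative names:

```kotlin
// Illustrative only: companion-object properties initialize once, when the
// companion is first loaded, which replaces an explicit @BeforeAll hook. The
// trade-off is that nothing closes the resource; test JVMs exit soon after,
// so any leak is bounded to the test run.
class ExampleIntegrationTest {
    companion object {
        private val resource: AutoCloseable = openResource()

        // Stand-in for SnowflakeDatabaseUtils.createDataSource(...)
        private fun openResource(): AutoCloseable = AutoCloseable { println("closed") }
    }
}
```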
@@ -21,9 +21,13 @@ import io.airbyte.workers.exception.TestHarnessException
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
import java.sql.SQLException
+import java.time.Instant
+import java.time.temporal.ChronoUnit
import java.util.*
+import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
-import kotlin.concurrent.Volatile
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
@@ -56,7 +60,7 @@ abstract class AbstractSnowflakeTypingDedupingTest(
            dataSource =
                SnowflakeDatabaseUtils.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS)
            database = SnowflakeDatabaseUtils.getDatabase(dataSource)
-            cleanAirbyteInternalTable(databaseName, database, forceUppercaseIdentifiers)
+            cleanAirbyteInternalTable(database)
            return config
        }

@@ -419,48 +423,101 @@
                "_AIRBYTE_GENERATION_ID",
            )

-        @Volatile private var cleanedAirbyteInternalTable = false
+        private val cleanedAirbyteInternalTable = AtomicBoolean(false)
+        private val threadId = AtomicInteger(0)

        @Throws(SQLException::class)
-        private fun cleanAirbyteInternalTable(
-            databaseName: String,
-            database: JdbcDatabase?,
-            forceUppercase: Boolean,
-        ) {
-            if (!cleanedAirbyteInternalTable) {
-                synchronized(AbstractSnowflakeTypingDedupingTest::class.java) {
-                    if (!cleanedAirbyteInternalTable) {
-                        val destinationStateTableExists =
-                            database!!.executeMetadataQuery {
-                                it.getTables(
-                                    databaseName,
-                                    if (forceUppercase) {
-                                        "AIRBYTE_INTERNAL"
-                                    } else {
-                                        "airbyte_internal"
-                                    },
-                                    if (forceUppercase) {
-                                        "_AIRBYTE_DESTINATION_STATE"
-                                    } else {
-                                        "_airbyte_destination_state"
-                                    },
-                                    null
-                                )
-                                    .next()
-                            }
-                        if (destinationStateTableExists) {
-                            database.execute(
-                                """DELETE FROM "airbyte_internal"."_airbyte_destination_state" WHERE "updated_at" < current_date() - 7""",
-                            )
-                        }
-                        cleanedAirbyteInternalTable = true
-                    }
-                }
-            }
-        }
+        private fun cleanAirbyteInternalTable(database: JdbcDatabase?) {
+            if (
+                database!!
+                    .queryJsons("SHOW PARAMETERS LIKE 'QUOTED_IDENTIFIERS_IGNORE_CASE';")
+                    .first()
+                    .get("value")
+                    .asText()
+                    .toBoolean()
+            ) {
+                return
+            }
+
+            if (!cleanedAirbyteInternalTable.getAndSet(true)) {
+                val cleanupCutoffHours = 6
+                LOGGER.info { "tableCleaner running" }
+                val executor =
+                    Executors.newSingleThreadExecutor {
+                        val thread = Executors.defaultThreadFactory().newThread(it)
+                        thread.name =
+                            "airbyteInternalTableCleanupThread-${threadId.incrementAndGet()}"
+                        thread.isDaemon = true
+                        thread
+                    }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"airbyte_internal\".\"_airbyte_destination_state\" WHERE \"updated_at\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    database.execute(
+                        "DELETE FROM \"AIRBYTE_INTERNAL\".\"_AIRBYTE_DESTINATION_STATE\" WHERE \"UPDATED_AT\" < timestampadd('hours', -$cleanupCutoffHours, current_timestamp())",
+                    )
+                }
+                executor.execute {
+                    val schemaList =
+                        database.queryJsons(
+                            "SHOW SCHEMAS IN DATABASE INTEGRATION_TEST_DESTINATION;",
+                        )
+                    LOGGER.info(
+                        "tableCleaner found ${schemaList.size} schemas in database INTEGRATION_TEST_DESTINATION"
+                    )
+                    schemaList
+                        .associate {
+                            it.get("name").asText() to Instant.parse(it.get("created_on").asText())
+                        }
+                        .filter {
+                            it.value.isBefore(
+                                Instant.now().minus(cleanupCutoffHours.toLong(), ChronoUnit.HOURS)
+                            )
+                        }
+                        .filter {
+                            it.key.startsWith("SQL_GENERATOR", ignoreCase = true) ||
+                                it.key.startsWith("TDTEST", ignoreCase = true) ||
+                                it.key.startsWith("TYPING_DEDUPING", ignoreCase = true)
+                        }
+                        .forEach {
+                            executor.execute {
+                                database.execute(
+                                    "DROP SCHEMA INTEGRATION_TEST_DESTINATION.\"${it.key}\" /* created at ${it.value} */;"
+                                )
+                            }
+                        }
+                }
+                for (schemaName in
+                    listOf("AIRBYTE_INTERNAL", "airbyte_internal", "overridden_raw_dataset")) {
+                    executor.execute {
+                        val sql =
+                            "SHOW TABLES IN schema INTEGRATION_TEST_DESTINATION.\"$schemaName\";"
+                        val tableList = database.queryJsons(sql)
+                        LOGGER.info {
+                            "tableCleaner found ${tableList.size} tables in schema $schemaName"
+                        }
+                        tableList
+                            .associate {
+                                it.get("name").asText() to
+                                    Instant.parse(it.get("created_on").asText())
+                            }
+                            .filter {
+                                it.value.isBefore(Instant.now().minus(6, ChronoUnit.HOURS)) &&
+                                    it.key.startsWith("TDTEST", ignoreCase = true)
+                            }
+                            .forEach {
+                                executor.execute {
+                                    database.execute(
+                                        "DROP TABLE INTEGRATION_TEST_DESTINATION.\"$schemaName\".\"${it.key}\" /* created at ${it.value} */;"
+                                    )
+                                }
+                            }
+                    }
+                }
+            }
+        }
    }
}

open class Batch(val name: String)

class LocalFileBatch(name: String) : Batch(name)
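The rewritten cleaner replaces the `@Volatile` flag plus `synchronized` double-check with `AtomicBoolean.getAndSet(true)`, which hands the cleanup to exactly one caller, and it submits the DELETE/DROP statements to a single daemon thread so a slow cleanup cannot block JVM exit. A condensed sketch of just that concurrency skeleton, with the SQL elided and illustrative names:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean

private val cleaned = AtomicBoolean(false)

fun cleanOnce(cleanup: () -> Unit) {
    // getAndSet returns the previous value: false for exactly the first caller.
    if (!cleaned.getAndSet(true)) {
        val executor =
            Executors.newSingleThreadExecutor { runnable ->
                Executors.defaultThreadFactory().newThread(runnable).apply {
                    isDaemon = true // daemon: the JVM may exit without waiting
                }
            }
        executor.execute { cleanup() } // fire and forget; tasks run in FIFO order
    }
}
```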
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
@@ -268,6 +268,7 @@ desired namespace.

| Version | Date | Pull Request | Subject |
| :-------------- | :--------- | :---------------------------------------------------------- | :-------------------------------------------------------- |
+| 3.11.12 | 2024-09-12 | [\#45370](https://github.com/airbytehq/airbyte/pull/45370) | Fix a race condition in our orphanedThreadFilter |
| 3.11.11 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 3.11.10 | 2024-08-22 | [\#44526](https://github.com/airbytehq/airbyte/pull/44526) | Revert protocol compliance fix |
| 3.11.9 | 2024-08-19 | [\#43367](https://github.com/airbytehq/airbyte/pull/43367) | Add opt in using MERGE statement for upserts and deletes |
