destination-snowflake: truncate large records
stephane-airbyte committed Sep 13, 2024
1 parent 2d22e6f commit a607474
Showing 1 changed file with 68 additions and 6 deletions.
@@ -4,6 +4,7 @@
package io.airbyte.integrations.destination.snowflake

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import io.airbyte.cdk.db.factory.DataSourceFactory.close
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.db.jdbc.JdbcUtils
@@ -20,6 +21,8 @@ import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
@@ -36,11 +39,7 @@ import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStagingC
import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStorageOperation
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler
import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator
import io.airbyte.protocol.models.v0.*
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
@@ -266,10 +265,73 @@ constructor(
},
onFlush = DefaultFlush(optimalFlushBatchSize, syncOperation),
catalog = catalog,
bufferManager = BufferManager(defaultNamespace, snowflakeBufferMemoryLimit),
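// Truncate oversized records at deserialization time, before they are buffered.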
airbyteMessageDeserializer =
AirbyteMessageDeserializer(
SnowflakeLargeRecordTruncator(parsedCatalog, defaultNamespace)
)
)
}

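/**
 * Drops the largest fields from a record until it fits under Snowflake's
 * per-row size limit. Primary-key fields are always kept; every dropped
 * field is reported as a NULLED change in the record's metadata.
 */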
private class SnowflakeLargeRecordTruncator(
private val parsedCatalog: ParsedCatalog,
private val defaultNamespace: String
) : StreamAwareDataTransformer {
// Snowflake rejects rows larger than 16 MB, so truncate anything above that.
val maxRowSize = 16 * 1_024 * 1_024
override fun transform(
streamDescriptor: StreamDescriptor?,
data: JsonNode?,
meta: AirbyteRecordMessageMeta?
): Pair<JsonNode?, AirbyteRecordMessageMeta?> {
if (data == null) {
return Pair(null, meta)
}
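// Carry over any changes already attached to the record's metadata.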
val metaChanges: MutableList<AirbyteRecordMessageMetaChange> = ArrayList()
if (meta != null && meta.changes != null) {
metaChanges.addAll(meta.changes)
}

val namespace =
streamDescriptor!!.namespace?.takeIf { it.isNotEmpty() } ?: defaultNamespace
val streamConfig = parsedCatalog.getStream(namespace, streamDescriptor.name)

var totalSize = 0
val finalData = JsonNodeFactory.instance.objectNode()
val fieldValueByName =
data.fields().asSequence().associate { it.key to it.value }.toMutableMap()
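// Primary-key fields are always kept, but still count against the size budget.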
for (pkField in streamConfig.primaryKey) {
val fieldValue = fieldValueByName.remove(pkField.name)
finalData.set<JsonNode>(pkField.name, fieldValue)
totalSize += fieldValue?.asText()?.length ?: 0
}
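// Greedily keep the smallest remaining fields first, so that as many fields
// as possible survive truncation.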
val fieldNameSortedByValueSize =
fieldValueByName.keys.sortedBy { fieldValueByName.getValue(it).asText().length }
for (fieldName in fieldNameSortedByValueSize) {
val fieldValue = fieldValueByName.getValue(fieldName)
val fieldSize = fieldValue.asText().length
// Stop at the first field that does not fit: every later field is at least
// as large. Only remove a field from the map once it is actually kept, so
// that a field dropped at this boundary still gets a meta change below.
if (totalSize + fieldSize > maxRowSize) break
fieldValueByName.remove(fieldName)
finalData.set<JsonNode>(fieldName, fieldValue)
totalSize += fieldSize
}
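// Whatever is still in fieldValueByName did not fit; report each dropped
// field as NULLED due to the destination's record-size limitation.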
for (fieldEntry in fieldValueByName) {
metaChanges.add(
AirbyteRecordMessageMetaChange()
.withField(fieldEntry.key)
.withChange(AirbyteRecordMessageMetaChange.Change.NULLED)
.withReason(
AirbyteRecordMessageMetaChange.Reason.DESTINATION_RECORD_SIZE_LIMITATION
)
)
}
return Pair(finalData, AirbyteRecordMessageMeta().withChanges(metaChanges))
}
}

override val isV2Destination: Boolean
get() = true
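For illustration, a minimal sketch of the truncator's behavior on a toy record. The `truncator` instance, the `users` stream, and its primary key `id` are assumptions for this example, not part of the commit:

// Hypothetical record whose "payload" field alone exceeds maxRowSize.
val record = JsonNodeFactory.instance.objectNode().apply {
    put("id", 42)                          // primary key: always kept
    put("note", "small field")             // small field: kept
    put("payload", "x".repeat(20_000_000)) // ~20 MB of text: dropped
}
val (truncated, meta) =
    truncator.transform(StreamDescriptor().withName("users"), record, null)
// truncated now contains only "id" and "note"; meta.changes lists "payload"
// as NULLED with reason DESTINATION_RECORD_SIZE_LIMITATION.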
