Skip to content

Commit

Permalink
CDK test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored and gisripa committed Aug 13, 2024
1 parent 7f710bc commit 33e3e0d
Show file tree
Hide file tree
Showing 24 changed files with 102 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
*/
@Deprecated("")
@Throws(IOException::class)
protected abstract fun writeRecord(record: AirbyteRecordMessage, generationId: Long = 0)
protected abstract fun writeRecord(
record: AirbyteRecordMessage,
generationId: Long = 0,
syncId: Long = 0
)

/**
* TODO: (ryankfu) move destination to use serialized record string instead of passing entire
Expand Down Expand Up @@ -80,7 +84,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu

@Deprecated("")
@Throws(Exception::class)
override fun accept(record: AirbyteRecordMessage, generationId: Long): Long {
override fun accept(record: AirbyteRecordMessage, generationId: Long, syncId: Long): Long {
if (!isStarted) {
if (useCompression) {
compressedBuffer = GzipCompressorOutputStream(byteCounter)
Expand All @@ -92,7 +96,7 @@ protected constructor(private val bufferStorage: BufferStorage) : SerializableBu
}
if (inputStream == null && !isClosed) {
val startCount = byteCounter.count
@Suppress("deprecation") writeRecord(record, generationId)
@Suppress("deprecation") writeRecord(record, generationId, syncId)
return byteCounter.count - startCount
} else {
throw IllegalCallerException("Buffer is already closed, it cannot accept more messages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ interface BufferingStrategy : AutoCloseable {
fun addRecord(
stream: AirbyteStreamNameNamespacePair,
message: AirbyteMessage,
generationId: Long = 0
generationId: Long = 0,
syncId: Long = 0
): Optional<BufferFlushType>

/** Flush the buffered messages from a single stream */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class InMemoryRecordBufferingStrategy(
override fun addRecord(
stream: AirbyteStreamNameNamespacePair,
message: AirbyteMessage,
generationId: Long
generationId: Long,
syncId: Long
): Optional<BufferFlushType> {
var flushed: Optional<BufferFlushType> = Optional.empty()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ interface SerializableBuffer : AutoCloseable {
*/
@Deprecated("")
@Throws(Exception::class)
fun accept(record: AirbyteRecordMessage, generationId: Long = 0): Long
fun accept(record: AirbyteRecordMessage, generationId: Long = 0, syncId: Long = 0): Long

/**
* TODO: (ryankfu) Move all destination connectors to pass the serialized record string instead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ class SerializedBufferingStrategy
override fun addRecord(
stream: AirbyteStreamNameNamespacePair,
message: AirbyteMessage,
generationId: Long
generationId: Long,
syncId: Long
): Optional<BufferFlushType> {
var flushed: Optional<BufferFlushType> = Optional.empty()

val buffer = getOrCreateBuffer(stream)

@Suppress("DEPRECATION")
val actualMessageSizeInBytes = buffer.accept(message.record, generationId)
val actualMessageSizeInBytes = buffer.accept(message.record, generationId, syncId)
totalBufferSizeInBytes += actualMessageSizeInBytes
// Flushes buffer when either the buffer was completely filled or only a single stream was
// filled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
import org.mockito.kotlin.any
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock

class BufferedStreamConsumerTest {
Expand Down Expand Up @@ -361,7 +362,7 @@ class BufferedStreamConsumerTest {
// The first two records that we push will not trigger any flushes, but the third record
// _will_
// trigger a flush
Mockito.`when`(strategy.addRecord(any(), any()))
Mockito.`when`(strategy.addRecord(any(), any(), eq(0)))
.thenReturn(
Optional.empty(),
Optional.empty(),
Expand Down Expand Up @@ -463,7 +464,7 @@ class BufferedStreamConsumerTest {
// The first two records that we push will not trigger any flushes, but the third record
// _will_
// trigger a flush
Mockito.`when`(strategy.addRecord(any(), any()))
Mockito.`when`(strategy.addRecord(any(), any(), eq(0)))
.thenReturn(
Optional.empty(),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.Mockito
import org.mockito.kotlin.any
import org.mockito.kotlin.eq

class SerializedBufferingStrategyTest {
private val catalog: ConfiguredAirbyteCatalog =
Expand All @@ -37,7 +38,7 @@ class SerializedBufferingStrategyTest {

@Throws(Exception::class)
private fun setupMock(mockObject: SerializableBuffer) {
Mockito.`when`(mockObject.accept(any())).thenReturn(10L)
Mockito.`when`(mockObject.accept(any(), eq(0L))).thenReturn(10L)
Mockito.`when`(mockObject.byteCount).thenReturn(10L)
Mockito.`when`(mockObject.maxTotalBufferSizeInBytes).thenReturn(MAX_TOTAL_BUFFER_SIZE_BYTES)
Mockito.`when`(mockObject.maxPerStreamBufferSizeInBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteMessage.Type
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
Expand Down Expand Up @@ -1581,13 +1580,13 @@ abstract class DestinationAcceptanceTest(
return record
}

private fun getChanges(record: JsonNode): MutableList<AirbyteRecordMessageMetaChange> {
private fun getMeta(record: JsonNode): ObjectNode {
val meta = record.get(JavaBaseConstants.COLUMN_NAME_AB_META)

val asString = if (meta.isTextual) meta.asText() else Jsons.serialize(meta)
val asMeta = Jsons.deserialize(asString, AirbyteRecordMessageMeta::class.java)
val asMeta = Jsons.deserialize(asString)

return asMeta.changes
return asMeta as ObjectNode
}

@Test
Expand Down Expand Up @@ -1628,11 +1627,6 @@ abstract class DestinationAcceptanceTest(
Assertions.assertEquals(message.record.emittedAt, record.get(abTsKey).asLong())

if (useV2Fields) {
// Validate that the loadTs is at least >= the time the test started
Assertions.assertTrue(
record.get(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT).asLong() >=
preRunTime.toEpochMilli()
)
// Generation id should match the one from the catalog
Assertions.assertEquals(
generationId,
Expand All @@ -1642,18 +1636,22 @@ abstract class DestinationAcceptanceTest(
}

// Regardless of whether change failures are captured, all v2
// destinations should pass upstream changes through.
// destinations should pass upstream changes through and set sync id.
if (useV2Fields) {
val changes = getChanges(destinationOutput[2])
val metas = destinationOutput.map { getMeta(it) }
val syncIdsAllValid = metas.map { it["sync_id"].asLong() }.all { it == 100L }
Assertions.assertTrue(syncIdsAllValid)

val changes = metas[2]["changes"].elements().asSequence().toList()
Assertions.assertEquals(changes.size, 1)
Assertions.assertEquals(changes[0].field, "name")
Assertions.assertEquals(changes[0]["field"].asText(), "name")
Assertions.assertEquals(
changes[0].change,
AirbyteRecordMessageMetaChange.Change.TRUNCATED
changes[0]["change"].asText(),
AirbyteRecordMessageMetaChange.Change.TRUNCATED.value()
)
Assertions.assertEquals(
changes[0].reason,
AirbyteRecordMessageMetaChange.Reason.SOURCE_FIELD_SIZE_LIMITATION
changes[0]["reason"].asText(),
AirbyteRecordMessageMetaChange.Reason.SOURCE_FIELD_SIZE_LIMITATION.value()
)
}

Expand All @@ -1665,14 +1663,17 @@ abstract class DestinationAcceptanceTest(
val data = getData(badRow)

Assertions.assertTrue(data["id"] == null || data["id"].isNull)
val changes = getChanges(badRow)
val changes = getMeta(badRow)["changes"].elements().asSequence().toList()

Assertions.assertEquals(1, changes.size)
Assertions.assertEquals("id", changes[0].field)
Assertions.assertEquals(AirbyteRecordMessageMetaChange.Change.NULLED, changes[0].change)
Assertions.assertEquals("id", changes[0]["field"].asText())
Assertions.assertEquals(
AirbyteRecordMessageMetaChange.Change.NULLED.value(),
changes[0]["change"].asText()
)
Assertions.assertEquals(
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR,
changes[0].reason
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR.value(),
changes[0]["reason"].asText()
)

// Expect the third message to have added a new change to an old one
Expand All @@ -1681,7 +1682,8 @@ abstract class DestinationAcceptanceTest(
Assertions.assertTrue(
dataWithPreviousChange["id"] == null || dataWithPreviousChange["id"].isNull
)
val twoChanges = getChanges(badRowWithPreviousChange)
val twoChanges =
getMeta(badRowWithPreviousChange)["changes"].elements().asSequence().toList()
Assertions.assertEquals(2, twoChanges.size)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
"cursor_field": [],
"destination_sync_mode": "overwrite",
"primary_key": [],
"generation_id": 10
"generation_id": 10,
"minimum_generation_id": 10,
"sync_id": 100
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ class S3ConsumerFactory {
// Buffer creation function: yields a file buffer that converts
// incoming data to the correct format for the destination.

val generationIds =
val generationAndSyncIds =
catalog.streams.associate { stream ->
val descriptor =
StreamDescriptor()
.withNamespace(stream.stream.namespace)
.withName(stream.stream.name)
descriptor to stream.generationId
descriptor to Pair(stream.generationId, stream.syncId)
}

val createFunction =
Expand Down Expand Up @@ -201,7 +201,7 @@ class S3ConsumerFactory {
flushBufferFunction(storageOps, writeConfigs, catalog)
)
},
generationIds
generationAndSyncIds
),
catalog,
// S3 has no concept of default namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.util.stream.Stream
class S3DestinationFlushFunction(
override val optimalBatchSizeBytes: Long,
private val strategyProvider: () -> BufferingStrategy,
private val generationIds: Map<StreamDescriptor, Long> = emptyMap()
private val generationAndSyncIds: Map<StreamDescriptor, Pair<Long, Long>> = emptyMap()
) : DestinationFlushFunction {

override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
Expand Down Expand Up @@ -47,8 +47,8 @@ class S3DestinationFlushFunction(
.withData(data)
val completeMessage =
AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(completeRecord)
val generationId = generationIds[streamDescriptor] ?: 0
strategy.addRecord(nameAndNamespace, completeMessage, generationId)
val (generationId, syncId) = generationAndSyncIds[streamDescriptor] ?: Pair(0L, 0L)
strategy.addRecord(nameAndNamespace, completeMessage, generationId, syncId)
}
strategy.flushSingleStream(nameAndNamespace)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,14 @@ package io.airbyte.cdk.integrations.destination.s3.avro
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.lang.Exception
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecordBuilder
import tech.allegro.schema.json2avro.converter.FieldConversionFailureListener

class AvroFieldConversionFailureListener : FieldConversionFailureListener() {
val CHANGE_SCHEMA: Schema =
SchemaBuilder.builder()
.record("change")
.fields()
.name("field")
.type()
.stringType()
.noDefault()
.name("change")
.type()
.stringType()
.noDefault()
.name("reason")
.type()
.stringType()
.noDefault()
.endRecord()
companion object {
val CHANGE_SCHEMA: Schema = AvroConstants.AVRO_CHANGES_SCHEMA
}

override fun onFieldConversionFailure(
avroName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,16 @@ class AvroRecordFactory(private val schema: Schema?, private val converter: Json
fun getAvroRecordV2(
id: UUID,
generationId: Long,
syncId: Long,
recordMessage: AirbyteRecordMessage
): GenericData.Record {
val jsonRecord = MAPPER.createObjectNode()
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, id.toString())
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, recordMessage.emittedAt)
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, System.currentTimeMillis())
jsonRecord.put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId)
jsonRecord.replace(
JavaBaseConstants.COLUMN_NAME_AB_META,
MAPPER.valueToTree(recordMessage.meta) as ObjectNode
)

val meta = MAPPER.valueToTree(recordMessage.meta) as ObjectNode
meta.put("sync_id", syncId)
jsonRecord.replace(JavaBaseConstants.COLUMN_NAME_AB_META, meta)
jsonRecord.setAll<JsonNode>(recordMessage.data as ObjectNode)

return converter!!.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ class AvroSerializedBuffer(

@Deprecated("Deprecated in Java")
@Throws(IOException::class)
override fun writeRecord(record: AirbyteRecordMessage, generationId: Long) {
override fun writeRecord(record: AirbyteRecordMessage, generationId: Long, syncId: Long) {
if (this.useV2FieldNames) {
dataFileWriter!!.append(
avroRecordFactory.getAvroRecordV2(UUID.randomUUID(), generationId, record)
avroRecordFactory.getAvroRecordV2(UUID.randomUUID(), generationId, syncId, record)
)
} else {
dataFileWriter!!.append(avroRecordFactory.getAvroRecord(UUID.randomUUID(), record))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,39 +121,21 @@ class JsonToAvroSchemaConverter {
.name(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)
.type(TIMESTAMP_MILLIS_SCHEMA)
.noDefault()
assembler
.name(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)
.type(TIMESTAMP_MILLIS_SCHEMA)
.noDefault()
assembler
.name(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID)
.type(Schema.create(Schema.Type.LONG))
.noDefault()
val changeSchema: Schema =
SchemaBuilder.builder()
.record("change")
.fields()
.name("field")
.type()
.stringType()
.noDefault()
.name("change")
.type()
.stringType()
.noDefault()
.name("reason")
.type()
.stringType()
.noDefault()
.endRecord()
assembler
.name(JavaBaseConstants.COLUMN_NAME_AB_META)
.type(
SchemaBuilder.builder()
.record(JavaBaseConstants.COLUMN_NAME_AB_META)
.fields()
.name("changes")
.type(Schema.createArray(changeSchema))
.type(Schema.createArray(AvroConstants.AVRO_CHANGES_SCHEMA))
.noDefault()
.name("sync_id")
.type(Schema.create(Schema.Type.LONG))
.noDefault()
.endRecord()
)
Expand Down
Loading

0 comments on commit 33e3e0d

Please sign in to comment.