Merge branch 'master' into #11322-remove-docker-from-config
ian-at-airbyte authored Jan 14, 2025
2 parents 3212f81 + 6a8ee2d commit c076a23
Showing 1,076 changed files with 20,305 additions and 18,388 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish_connectors.yml
@@ -20,7 +20,7 @@ on:
jobs:
publish_connectors:
name: Publish connectors
runs-on: connector-publish-large
runs-on: ubuntu-24.04-4core
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
6 changes: 6 additions & 0 deletions .github/workflows/regression_tests.yml
@@ -47,6 +47,12 @@ jobs:
runs-on: linux-20.04-large # Custom runner, defined in GitHub org settings
timeout-minutes: 360 # 6 hours
steps:
- name: Install Python
id: install_python
uses: actions/setup-python@v4
with:
python-version: "3.10"

- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Check PAT rate limits
@@ -2,6 +2,7 @@
package io.airbyte.cdk.discover

import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.DoubleCodec
import io.airbyte.cdk.data.JsonDecoder
import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.JsonStringCodec
@@ -63,8 +64,8 @@ interface MetaField : FieldOrMetaField {
enum class CommonMetaField(
override val type: FieldType,
) : MetaField {
CDC_UPDATED_AT(CdcOffsetDateTimeMetaFieldType),
CDC_DELETED_AT(CdcOffsetDateTimeMetaFieldType),
CDC_UPDATED_AT(CdcStringMetaFieldType),
CDC_DELETED_AT(CdcStringMetaFieldType),
;

override val id: String
@@ -89,3 +90,9 @@ data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec
override val jsonDecoder: JsonDecoder<OffsetDateTime> = OffsetDateTimeCodec
}

data object CdcNumberMetaFieldType : LosslessFieldType {
override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.NUMBER
override val jsonEncoder: JsonEncoder<Double> = DoubleCodec
override val jsonDecoder: JsonDecoder<Double> = DoubleCodec
}
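For context, CDC_UPDATED_AT and CDC_DELETED_AT now point at CdcStringMetaFieldType, which is defined in the collapsed portion of this file and not visible in the diff. A rough sketch of what it presumably looks like, following the pattern of CdcOffsetDateTimeMetaFieldType above and the JsonStringCodec import; the STRING schema type is an assumption:

// Hypothetical sketch: the real CdcStringMetaFieldType lives in the unexpanded part of this
// file. Assuming it follows the same shape as CdcOffsetDateTimeMetaFieldType and
// CdcNumberMetaFieldType, backed by JsonStringCodec.
data object CdcStringMetaFieldType : LosslessFieldType {
    override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.STRING
    override val jsonEncoder: JsonEncoder<String> = JsonStringCodec
    override val jsonDecoder: JsonDecoder<String> = JsonStringCodec
}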
@@ -13,9 +13,12 @@ import io.airbyte.cdk.output.ExceptionHandler
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

private val logger = KotlinLogging.logger {}

@Singleton
@Requires(property = Operation.PROPERTY, value = "check")
@Requires(env = ["destination"])
@@ -40,6 +43,7 @@ class CheckOperation<T : ConfigurationSpecification, C : DestinationConfiguratio
)
outputConsumer.accept(successMessage)
} catch (t: Throwable) {
logger.warn(t) { "Caught throwable during CHECK" }
val (traceMessage, statusMessage) = exceptionHandler.handleCheckFailure(t)
outputConsumer.accept(traceMessage)
outputConsumer.accept(statusMessage)
@@ -4,7 +4,13 @@

package io.airbyte.cdk.load.data

import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.node.NullNode
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
@@ -48,6 +54,11 @@ sealed interface AirbyteValue {
// (mostly the date/timestamp/time types - everything else is fine)
data object NullValue : AirbyteValue, Comparable<NullValue> {
override fun compareTo(other: NullValue): Int = 0

// make sure that we serialize this as a NullNode, rather than an empty object.
// We can't just return `null`, because jackson treats that as an error
// and falls back to its normal serialization behavior.
@JsonValue fun toJson(): NullNode = NullNode.instance
}

@JvmInline
@@ -75,34 +86,39 @@ value class NumberValue(val value: BigDecimal) : AirbyteValue, Comparable<Number
value class DateValue(val value: LocalDate) : AirbyteValue, Comparable<DateValue> {
constructor(date: String) : this(LocalDate.parse(date))
override fun compareTo(other: DateValue): Int = value.compareTo(other.value)
@JsonValue fun toJson() = value.toString()
}

@JvmInline
value class TimestampWithTimezoneValue(val value: OffsetDateTime) :
AirbyteValue, Comparable<TimestampWithTimezoneValue> {
constructor(timestamp: String) : this(OffsetDateTime.parse(timestamp))
override fun compareTo(other: TimestampWithTimezoneValue): Int = value.compareTo(other.value)
@JsonValue fun toJson() = value.toString()
}

@JvmInline
value class TimestampWithoutTimezoneValue(val value: LocalDateTime) :
AirbyteValue, Comparable<TimestampWithoutTimezoneValue> {
constructor(timestamp: String) : this(LocalDateTime.parse(timestamp))
override fun compareTo(other: TimestampWithoutTimezoneValue): Int = value.compareTo(other.value)
@JsonValue fun toJson() = value.toString()
}

@JvmInline
value class TimeWithTimezoneValue(val value: OffsetTime) :
AirbyteValue, Comparable<TimeWithTimezoneValue> {
constructor(time: String) : this(OffsetTime.parse(time))
override fun compareTo(other: TimeWithTimezoneValue): Int = value.compareTo(other.value)
@JsonValue fun toJson() = value.toString()
}

@JvmInline
value class TimeWithoutTimezoneValue(val value: LocalTime) :
AirbyteValue, Comparable<TimeWithoutTimezoneValue> {
constructor(time: String) : this(LocalTime.parse(time))
override fun compareTo(other: TimeWithoutTimezoneValue): Int = value.compareTo(other.value)
@JsonValue fun toJson() = value.toString()
}

@JvmInline
@@ -112,12 +128,28 @@ value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue {
}
}

// jackson can't figure out how to serialize this class,
// so write a custom serializer that just serializes the map directly.
// For some reason, the more obvious `@JsonValue fun toJson() = values`
// doesn't work either.
@JsonSerialize(using = ObjectValueSerializer::class)
@JvmInline
value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue {
@JsonValue fun toJson() = values
companion object {
fun from(map: Map<String, Any?>): ObjectValue =
ObjectValue(map.mapValuesTo(linkedMapOf()) { (_, v) -> AirbyteValue.from(v) })
}
}

private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
override fun serialize(
value: ObjectValue,
gen: JsonGenerator,
serializers: SerializerProvider,
) {
gen.writePOJO(value.values)
}
}

@JvmInline value class UnknownValue(val value: JsonNode) : AirbyteValue
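The comments in this hunk describe two Jackson workarounds: NullValue exposes a NullNode through @JsonValue so it serializes as a JSON null, and ObjectValue gets ObjectValueSerializer so its backing map is written directly. A minimal usage sketch of the intended behavior, assuming the CDK's shared Jsons mapper (io.airbyte.cdk.load.util.Jsons, the same one used by AirbyteValueToJsonTest further down):

import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.util.Jsons

fun main() {
    // NullValue now renders as a JSON null instead of an empty object.
    println(Jsons.writeValueAsString(NullValue)) // null
    // ObjectValue delegates to ObjectValueSerializer, which writes the backing map directly.
    val record = ObjectValue(linkedMapOf("name" to StringValue("foo"), "deleted_at" to NullValue))
    println(Jsons.writeValueAsString(record)) // {"name":"foo","deleted_at":null}
}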
@@ -147,6 +147,10 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}

private suspend fun flushGlobalCheckpoints() {
if (globalCheckpoints.isEmpty()) {
log.info { "No global checkpoints to flush" }
return
}
while (!globalCheckpoints.isEmpty()) {
val head = globalCheckpoints.peek()
val allStreamsPersisted =
@@ -167,12 +171,20 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}

private suspend fun flushStreamCheckpoints() {
val noCheckpointStreams = mutableSetOf<DestinationStream.Descriptor>()
for (stream in catalog.streams) {

val manager = syncManager.getStreamManager(stream.descriptor)
val streamCheckpoints = streamCheckpoints[stream.descriptor] ?: return
val streamCheckpoints = streamCheckpoints[stream.descriptor]
if (streamCheckpoints == null) {
noCheckpointStreams.add(stream.descriptor)

continue
}
while (true) {
val (nextIndex, nextMessage) = streamCheckpoints.peek() ?: break
if (manager.areRecordsPersistedUntil(nextIndex)) {

log.info {
"Flushing checkpoint for stream: ${stream.descriptor} at index: $nextIndex"
}
@@ -184,6 +196,9 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
}
}
}
if (noCheckpointStreams.isNotEmpty()) {
log.info { "No checkpoints for streams: $noCheckpointStreams" }
}
}

private suspend fun validateAndSendMessage(
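The substantive fix in flushStreamCheckpoints above is replacing `?: return` with `continue`: a single stream with no pending checkpoints no longer aborts flushing for every stream after it, and such streams are collected and logged once. A simplified, self-contained sketch of that control flow; the names here are invented for illustration and are not from StreamsCheckpointManager:

// Simplified sketch of the new loop shape; names and types here are illustrative only.
fun flushAll(streams: List<String>, checkpoints: Map<String, List<Long>>) {
    val noCheckpointStreams = mutableSetOf<String>()
    for (stream in streams) {
        // The old code effectively did `checkpoints[stream] ?: return`, which silently
        // skipped every remaining stream once one of them had nothing to flush.
        val pending = checkpoints[stream]
        if (pending == null) {
            noCheckpointStreams.add(stream)
            continue
        }
        pending.forEach { index -> println("Flushing checkpoint for stream: $stream at index: $index") }
    }
    if (noCheckpointStreams.isNotEmpty()) {
        println("No checkpoints for streams: $noCheckpointStreams")
    }
}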
@@ -93,7 +93,7 @@ interface DestinationTaskLauncher : TaskLauncher {
justification = "arguments are guaranteed to be non-null by Kotlin's type system"
)
class DefaultDestinationTaskLauncher(
private val taskScopeProvider: TaskScopeProvider<WrappedTask<ScopedTask>>,
private val taskScopeProvider: TaskScopeProvider,
private val catalog: DestinationCatalog,
private val config: DestinationConfiguration,
private val syncManager: SyncManager,
@@ -4,8 +4,6 @@

package io.airbyte.cdk.load.task

import io.airbyte.cdk.load.util.CloseableCoroutine

interface Task {
suspend fun execute()
}
@@ -21,24 +19,3 @@ interface TaskLauncher {
*/
suspend fun run()
}

/**
* Wraps tasks with exception handling. It should perform all necessary exception handling, then
* execute the provided callback.
*/
interface TaskExceptionHandler<T : Task, U : Task> {
// Wrap a task with exception handling.
suspend fun withExceptionHandling(task: T): U

// Set a callback that will be invoked when any exception handling is done.
suspend fun setCallback(callback: suspend () -> Unit)
}

/** Provides the scope(s) in which tasks run. */
interface TaskScopeProvider<T : Task> : CloseableCoroutine {
/** Launch a task in the correct scope. */
suspend fun launch(task: T)

/** Unliked [close], may attempt to fail gracefully, but should guarantee return. */
suspend fun kill()
}
@@ -54,8 +54,7 @@ interface WrappedTask<T : Task> : Task {

@Singleton
@Secondary
class DestinationTaskScopeProvider(config: DestinationConfiguration) :
TaskScopeProvider<WrappedTask<ScopedTask>> {
class TaskScopeProvider(config: DestinationConfiguration) {
private val log = KotlinLogging.logger {}

private val timeoutMs = config.gracefulCancellationTimeoutMs
@@ -81,7 +80,7 @@

private val failFastScope = ControlScope("input", Job(), Dispatchers.IO)

override suspend fun launch(task: WrappedTask<ScopedTask>) {
suspend fun launch(task: WrappedTask<ScopedTask>) {
val scope =
when (task.innerTask) {
is InternalScope -> internalScope
@@ -97,7 +96,7 @@
}
}

override suspend fun close() {
suspend fun close() {
// Under normal operation, all tasks should be complete
// (except things like force flush, which loop). So
// - it's safe to force cancel the internal tasks
@@ -126,7 +125,7 @@ class DestinationTaskScopeProvider(config: DestinationConfiguration) :
internalScope.job.cancel()
}

override suspend fun kill() {
suspend fun kill() {
log.info { "Killing task scopes" }
// Terminate tasks which should be immediately terminated
failFastScope.job.cancel()
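With the generic TaskScopeProvider<T> interface removed, the former DestinationTaskScopeProvider becomes the concrete TaskScopeProvider and is injected directly, as the DefaultDestinationTaskLauncher hunk earlier shows. A hypothetical consumer sketch; SomeTaskRunner and its runAll method are invented for illustration:

import jakarta.inject.Singleton

// Hypothetical consumer, invented for illustration.
@Singleton
class SomeTaskRunner(
    private val taskScopeProvider: TaskScopeProvider, // concrete class, no type parameter
) {
    suspend fun runAll(tasks: List<WrappedTask<ScopedTask>>) {
        // Each wrapped task is launched in the scope matching its innerTask marker
        // (e.g. InternalScope -> internalScope), as in the launch() body above.
        tasks.forEach { taskScopeProvider.launch(it) }
        // Once normal work is done, close() force-cancels the internal tasks.
        taskScopeProvider.close()
    }
}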
@@ -4,14 +4,33 @@

package io.airbyte.cdk.load.data.json

import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.NullValue
import io.airbyte.cdk.load.data.NumberValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.data.UnknownValue
import io.airbyte.cdk.load.util.Jsons
import io.airbyte.cdk.load.util.serializeToString
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.OffsetDateTime
import java.time.OffsetTime
import kotlin.test.assertEquals
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow

class AirbyteValueToJsonTest {
@Test
@@ -38,4 +57,36 @@ class AirbyteValueToJsonTest {

Assertions.assertEquals(airbyteValue, roundTripValue)
}

/**
* We have some code that relies on being able to directly Jackson-serialize an AirbyteValue
* (for example, the [serializeToString] extension method). Verify that this behaves correctly.
*/
@Test
fun testAllTypesSerialization() {
val testCases: Map<AirbyteValue, String> =
mapOf(
NullValue to "null",
StringValue("foo") to "\"foo\"",
BooleanValue(true) to "true",
IntegerValue(BigInteger("42")) to "42",
NumberValue(BigDecimal("42.1")) to "42.1",
DateValue(LocalDate.parse("2024-01-23")) to "\"2024-01-23\"",
TimestampWithTimezoneValue(OffsetDateTime.parse("2024-01-23T12:34:56.78Z")) to
"\"2024-01-23T12:34:56.780Z\"",
TimestampWithoutTimezoneValue(LocalDateTime.parse("2024-01-23T12:34:56.78")) to
"\"2024-01-23T12:34:56.780\"",
TimeWithTimezoneValue(OffsetTime.parse("12:34:56.78Z")) to "\"12:34:56.780Z\"",
TimeWithoutTimezoneValue(LocalTime.parse("12:34:56.78")) to "\"12:34:56.780\"",
ArrayValue(listOf(NullValue, ArrayValue(listOf(NullValue)))) to "[null,[null]]",
ObjectValue(linkedMapOf("foo" to ObjectValue(linkedMapOf("bar" to NullValue)))) to
"""{"foo":{"bar":null}}""",
UnknownValue(Jsons.readTree("""{"foo": "bar"}""")) to """{"foo":"bar"}"""
)
testCases.forEach { (value, expectedSerialization) ->
val actual =
assertDoesNotThrow("Failed to serialize $value") { Jsons.writeValueAsString(value) }
assertEquals(expectedSerialization, actual, "Incorrect serialization for $value")
}
}
}