Skip to content

Commit

Permalink
Merge branch 'iceberg-schema-supertype-finder' into iceberg-schema-up…
Browse files Browse the repository at this point in the history
…date-logic
  • Loading branch information
subodh1810 committed Dec 24, 2024
2 parents 64502c6 + 0ecd07b commit 22aaddc
Show file tree
Hide file tree
Showing 36 changed files with 2,550 additions and 393 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ repos:

- id: spotless
name: Format Java files with Spotless
entry: bash -c 'command -v mvn >/dev/null 2>&1 || { echo "Maven not installed, skipping spotless" >&2; exit 0; }; mvn -f spotless-maven-pom.xml spotless:apply'
entry: bash -c 'command -v mvn >/dev/null 2>&1 || { if [ -z "$CI" ]; then echo "Maven not installed, skipping spotless" >&2; exit 0; fi }; mvn -f spotless-maven-pom.xml spotless:apply'
language: system
files: \.(java|kt|gradle)$
pass_filenames: false
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class StateManager(
initialState: OpaqueStateValue?,
) : StateManagerScopedToFeed {
private var currentStateValue: OpaqueStateValue? = initialState
private var pendingStateValue: OpaqueStateValue? = initialState
private var pendingStateValue: OpaqueStateValue? = null
private var pendingNumRecords: Long = 0L

@Synchronized override fun current(): OpaqueStateValue? = currentStateValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ class StateManagerStreamStatesTest {
stateManager.scoped(stream).current(),
)
Assertions.assertEquals(listOf<CatalogValidationFailure>(), handler.get())

val emptyCheckpoint: List<AirbyteStateMessage> = stateManager.checkpoint()
// check if state manager hasn't set for this stream, state would be null and thus skipped.
Assertions.assertTrue(emptyCheckpoint.isEmpty())

// update state manager with fake work result
stateManager
.scoped(stream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class Transformations {
.joinToString(separator = ".")
}

fun toAvroSafeName(name: String) = toAlphanumericAndUnderscore(name)
fun toAvroSafeName(name: String): String {
val stripped = toAlphanumericAndUnderscore(name)
return if (stripped.substring(0, 1).matches("[A-Za-z_]".toRegex())) {
stripped
} else {
"_$stripped"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import org.junit.jupiter.api.Test

class TransformationsTest {
@Test
fun `test avro illegal start character`() {
val unsafeName = "1d_view"
assert(Transformations.toAvroSafeName(unsafeName) == "_1d_view")
}

@Test
fun `test avro special characters`() {
val unsafeName = "1d_view!@#$%^&*()"
assert(Transformations.toAvroSafeName(unsafeName) == "_1d_view__________")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class AirbyteTypeToAvroSchema {
.fold(builder) { acc, (name, field) ->
val converted = convert(field.type, path + name)
val propertySchema = maybeMakeNullable(field, converted)
val nameMangled = Transformations.toAlphanumericAndUnderscore(name)
val nameMangled = Transformations.toAvroSafeName(name)
acc.name(nameMangled).type(propertySchema).let {
if (field.nullable) {
it.withDefault(null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.avro.toAvroSchema
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow

class AirbyteTypeToAvroSchemaTest {
@Test
fun `test name mangling`() {
val schema =
ObjectType(
properties =
linkedMapOf(
"1d_view" to FieldType(type = StringType, nullable = false),
)
)
val descriptor = DestinationStream.Descriptor("test", "stream")
assertDoesNotThrow { schema.toAvroSchema(descriptor) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import jakarta.inject.Singleton
import java.io.ByteArrayOutputStream
import java.io.Closeable
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicLong
import org.apache.avro.Schema

interface ObjectStorageFormattingWriter : Closeable {
Expand Down Expand Up @@ -207,16 +208,25 @@ class BufferedFormattingWriter<T : OutputStream>(
private val streamProcessor: StreamProcessor<T>,
private val wrappingBuffer: T
) : ObjectStorageFormattingWriter {
// An empty buffer is not a guarantee of a non-empty
// file, some writers (parquet) start with a
// header. Avoid writing empty files by requiring
// both 0 bytes AND 0 rows.
private val rowsAdded = AtomicLong(0)
val bufferSize: Int
get() = buffer.size()
get() =
if (rowsAdded.get() == 0L) {
0
} else buffer.size()

override fun accept(record: DestinationRecordAirbyteValue) {
writer.accept(record)
rowsAdded.incrementAndGet()
}

fun takeBytes(): ByteArray? {
wrappingBuffer.flush()
if (buffer.size() == 0) {
if (bufferSize == 0) {
return null
}

Expand All @@ -229,7 +239,7 @@ class BufferedFormattingWriter<T : OutputStream>(
writer.flush()
writer.close()
streamProcessor.partFinisher.invoke(wrappingBuffer)
return if (buffer.size() > 0) {
return if (bufferSize > 0) {
buffer.toByteArray()
} else {
null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.write.object_storage

import io.airbyte.cdk.load.file.NoopProcessor
import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriter
import io.airbyte.cdk.load.file.object_storage.ObjectStorageFormattingWriter
import io.mockk.coEvery
import io.mockk.impl.annotations.MockK
import io.mockk.mockk
import java.io.ByteArrayOutputStream
import org.junit.jupiter.api.Test

class ObjectStorageFormattingWriterTest {
@MockK(relaxed = true) lateinit var underlyingWriter: ObjectStorageFormattingWriter

@Test
fun `buffered formatting writer never produces empty parts`() {
val outputStream = ByteArrayOutputStream()
outputStream.write("i am a header".toByteArray())
val bufferedWriter =
BufferedFormattingWriter(
underlyingWriter,
outputStream,
NoopProcessor,
NoopProcessor.wrapper(outputStream),
)

assert(bufferedWriter.bufferSize == 0) { "buffer appears empty despite header" }
assert(bufferedWriter.takeBytes() == null) { "buffer yields no data despite header" }
assert(bufferedWriter.finish() == null) { "buffer yields no data despite header" }
}

@Test
fun `buffered formatting writer yields entire buffer once any data has been added`() {
val outputStream = ByteArrayOutputStream()
outputStream.write("i am a header".toByteArray())
val bufferedWriter =
BufferedFormattingWriter(
underlyingWriter,
outputStream,
NoopProcessor,
NoopProcessor.wrapper(outputStream),
)

assert(bufferedWriter.takeBytes() == null)
coEvery { bufferedWriter.accept(any()) } coAnswers { outputStream.write("!".toByteArray()) }
bufferedWriter.accept(mockk())
val bytes = bufferedWriter.takeBytes()
assert(bytes != null) { "buffer yields data now that we've written to it" }
assert(bytes.contentEquals("i am a header!".toByteArray())) {
"buffer yields all data written to it"
}
}
}
4 changes: 3 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.48.1 | 2024-11-13 | [\#48482](https://github.com/airbytehq/airbyte/pull/48482) | Adding support converting very large numbers via BigInteger l |
| 0.48.4 | 2024-12-24 | [\#50410](https://github.com/airbytehq/airbyte/pull/50410) | Save SSL key to /tmp |
| 0.48.3 | 2024-12-23 | [\#49858](https://github.com/airbytehq/airbyte/pull/49858) | Relax various Destination CDK methods visibility. |
| 0.48.1 | 2024-11-13 | [\#48482](https://github.com/airbytehq/airbyte/pull/48482) | Adding support converting very large numbers via BigInteger |
| 0.48.0 | 2024-10-23 | [\#46302](https://github.com/airbytehq/airbyte/pull/46302) | Add support for file transfer |
| 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest |
| 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibiilty |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object SSLCertificateUtils {
directory: String?
): URI {
val fs = Objects.requireNonNullElse(filesystem, FileSystems.getDefault())
val pathToStore = fs!!.getPath(Objects.toString(directory, ""))
val pathToStore = fs!!.getPath(Objects.toString(directory, "/tmp"))
val pathToFile =
pathToStore.resolve(KEYSTORE_FILE_NAME + RANDOM.nextInt() + KEYSTORE_FILE_TYPE)
val os = Files.newOutputStream(pathToFile)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.48.2
version=0.48.4
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ abstract class JdbcDestinationHandler<DestinationState>(
): ResultSet = dbmetadata.getTables(catalogName, id.rawNamespace, id.rawName + suffix, null)

@Throws(Exception::class)
private fun isFinalTableEmpty(id: StreamId): Boolean {
protected open fun isFinalTableEmpty(id: StreamId): Boolean {
return !jdbcDatabase.queryBoolean(
dslContext
.select(
Expand Down Expand Up @@ -211,7 +211,8 @@ abstract class JdbcDestinationHandler<DestinationState>(
}

@Throws(SQLException::class)
protected fun getAllDestinationStates(): Map<AirbyteStreamNameNamespacePair, DestinationState> {
protected open fun getAllDestinationStates():
Map<AirbyteStreamNameNamespacePair, DestinationState> {
try {
// Guarantee the table exists.
jdbcDatabase.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ constructor(
.columns(buildFinalTableFields(columns, metaFields))
}

private fun insertAndDeleteTransaction(
protected open fun insertAndDeleteTransaction(
streamConfig: StreamConfig,
finalSuffix: String?,
minRawTimestamp: Optional<Instant>,
Expand Down Expand Up @@ -507,7 +507,7 @@ constructor(
return createSchemaSql.sql
}

protected fun createTableSql(
protected open fun createTableSql(
namespace: String,
tableName: String,
columns: LinkedHashMap<ColumnId, AirbyteType>
Expand Down
3 changes: 2 additions & 1 deletion airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,8 @@ airbyte-ci connectors --language=low-code migrate-to-manifest-only
## Changelog

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- |
|---------|------------------------------------------------------------| ---------------------------------------------------------------------------------------------------------------------------- |
| 4.48.1 | [#50410](https://github.com/airbytehq/airbyte/pull/50410) | Java connector build: give ownership of built artifacts to the current image user. |
| 4.48.0 | [#49960](https://github.com/airbytehq/airbyte/pull/49960) | Deprecate airbyte-ci format command |
| 4.47.0 | [#49832](https://github.com/airbytehq/airbyte/pull/49462) | Build java connectors from the base image declared in `metadata.yaml`. |
| 4.46.5 | [#49835](https://github.com/airbytehq/airbyte/pull/49835) | Fix connector language discovery for projects with Kotlin Gradle build scripts. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,11 @@ async def with_airbyte_java_connector(context: ConnectorContext, connector_java_
)
base = with_integration_base_java(context, build_platform).with_entrypoint(["/airbyte/base.sh"])

current_user = (await base.with_exec(["whoami"]).stdout()).strip()
connector_container = (
base.with_workdir("/airbyte")
.with_env_variable("APPLICATION", application)
.with_mounted_directory("built_artifacts", build_stage.directory("/airbyte"))
.with_mounted_directory("built_artifacts", build_stage.directory("/airbyte"), owner=current_user)
.with_exec(sh_dash_c(["mv built_artifacts/* ."]))
)
return await finalize_build(context, connector_container)
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "4.48.0"
version = "4.48.1"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
dockerRepository: airbyte/destination-iceberg-v2
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
githubIssueLabel: destination-iceberg-v2
Expand Down
Loading

0 comments on commit 22aaddc

Please sign in to comment.