Commit

Merge branch 'master' into daryna/source-jira/fix-pk
darynaishchenko authored Jun 18, 2024
2 parents ff3cdc7 + dfc7e65 commit 5181623
Showing 410 changed files with 18,375 additions and 6,236 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.63.0
current_version = 0.63.1
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
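The `parse` pattern above uses Python's named-group syntax (`(?P<major>…)`), since bumpversion is a Python tool. As an illustrative sketch only — the helper function is hypothetical, and Java/Kotlin regexes spell named groups `(?<major>…)` instead — the same pattern can be exercised like this:

```kotlin
// Illustrative sketch only: the pattern string mirrors the config's `parse`
// value, rewritten from Python's (?P<name>...) to Java/Kotlin's (?<name>...).
val versionPattern = Regex("""(?<major>\d+)\.(?<minor>\d+)\.(?<patch>\d+)(\-[a-z]+)?""")

// Hypothetical helper returning (major, minor, patch), or null if no match.
fun parseVersion(version: String): Triple<Int, Int, Int>? {
    val match = versionPattern.matchEntire(version) ?: return null
    return Triple(
        match.groups["major"]!!.value.toInt(),
        match.groups["minor"]!!.value.toInt(),
        match.groups["patch"]!!.value.toInt(),
    )
}
```

For example, `parseVersion("0.63.1")` yields the three numeric components, and the optional `(\-[a-z]+)?` tail lets suffixed versions like `0.63.1-rc` still parse.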
1 change: 1 addition & 0 deletions .github/workflows/airbyte-ci-tests.yml
@@ -35,6 +35,7 @@ jobs:
- airbyte-ci/connectors/pipelines/**
- airbyte-ci/connectors/base_images/**
- airbyte-ci/connectors/common_utils/**
- airbyte-ci/connectors/connectors_insights/**
- airbyte-ci/connectors/connector_ops/**
- airbyte-ci/connectors/connectors_qa/**
- airbyte-ci/connectors/ci_credentials/**
49 changes: 49 additions & 0 deletions .github/workflows/connectors_insights.yml
@@ -0,0 +1,49 @@
name: Connectors Insights

on:
schedule:
- cron: "0 0,12 * * *" # Run every 12 hours UTC
workflow_dispatch:

jobs:
connectors_insights:
name: Connectors Insights generation
runs-on: connector-nightly-xlarge
timeout-minutes: 1440 # 24 hours
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Get Dagger Engine Image
uses: ./.github/actions/get-dagger-engine-image
with:
dagger_engine_image: "registry.dagger.io/engine:v0.9.6"
- name: Set up Python
id: setup-python # the cache key below reads steps.setup-python.outputs.python-version, so this step needs an id
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install Poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true
- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
- name: Install dependencies
if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry -C airbyte-ci/connectors/connectors_insights install --no-interaction --no-root
- name: Install project
run: poetry -C airbyte-ci/connectors/connectors_insights install --no-interaction
- name: Write Google service account key to file
run: echo "$GCP_SA_KEY" > $HOME/gcp-sa-key.json
env:
GCP_SA_KEY: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
- name: Set GOOGLE_APPLICATION_CREDENTIALS
run: echo "GOOGLE_APPLICATION_CREDENTIALS=$HOME/gcp-sa-key.json" >> $GITHUB_ENV
- name: Run connectors insights
run: |
poetry -C airbyte-ci/connectors/connectors_insights run connectors-insights generate --gcs-uri=gs://prod-airbyte-cloud-connector-metadata-service/connector_insights --connector-directory airbyte-integrations/connectors/ --concurrency 10
48 changes: 27 additions & 21 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -2,22 +2,22 @@

This page will walk through the process of developing with the Java CDK.

* [Developing with the Java CDK](#developing-with-the-java-cdk)
* [Intro to the Java CDK](#intro-to-the-java-cdk)
* [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
* [How is the CDK published?](#how-is-the-cdk-published)
* [Using the Java CDK](#using-the-java-cdk)
* [Building the CDK](#building-the-cdk)
* [Bumping the CDK version](#bumping-the-cdk-version)
* [Publishing the CDK](#publishing-the-cdk)
* [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
* [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
* [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
* [Publishing the CDK and switching to a pinned CDK reference](#publishing-the-cdk-and-switching-to-a-pinned-cdk-reference)
* [Troubleshooting CDK Dependency Caches](#troubleshooting-cdk-dependency-caches)
* [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
* [Changelog](#changelog)
* [Java CDK](#java-cdk)
- [Developing with the Java CDK](#developing-with-the-java-cdk)
- [Intro to the Java CDK](#intro-to-the-java-cdk)
- [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
- [How is the CDK published?](#how-is-the-cdk-published)
- [Using the Java CDK](#using-the-java-cdk)
- [Building the CDK](#building-the-cdk)
- [Bumping the CDK version](#bumping-the-cdk-version)
- [Publishing the CDK](#publishing-the-cdk)
- [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
- [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
- [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
- [Publishing the CDK and switching to a pinned CDK reference](#publishing-the-cdk-and-switching-to-a-pinned-cdk-reference)
- [Troubleshooting CDK Dependency Caches](#troubleshooting-cdk-dependency-caches)
- [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
- [Changelog](#changelog)
- [Java CDK](#java-cdk)

## Intro to the Java CDK

@@ -173,11 +173,17 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
|:--------| :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots |
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |
| 0.36.6 | 2024-06-05 | [\#39106](https://github.com/airbytehq/airbyte/pull/39106) | Skip write to storage with 0 byte file |
| 0.36.5 | 2024-06-01 | [\#38792](https://github.com/airbytehq/airbyte/pull/38792) | Throw config exception if no selectable table exists in user provided schemas |
| 0.36.4 | 2024-05-31 | [\#38824](https://github.com/airbytehq/airbyte/pull/38824) | Param marked as non-null to nullable in JdbcDestinationHandler for NPE fix |
@@ -13,6 +13,7 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -25,4 +26,11 @@ object DbAnalyticsUtils {
.withType(DATA_TYPES_SERIALIZATION_ERROR_KEY)
.withValue("1")
}

@JvmStatic
fun cdcSnapshotForceShutdownMessage(): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage()
.withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY)
.withValue("1")
}
}
@@ -96,7 +96,7 @@ object AirbyteTraceMessageUtility {
outputRecordCollector.accept(message)
}

private fun makeErrorTraceAirbyteMessage(
fun makeErrorTraceAirbyteMessage(
e: Throwable,
displayMessage: String?,
failureType: AirbyteErrorTraceMessage.FailureType
@@ -162,13 +162,16 @@ constructor(

bufferManager.close()

val unsuccessfulStreams = ArrayList<StreamDescriptor>()
val streamSyncSummaries =
streamNames.associate { streamDescriptor ->
// If we didn't receive a stream status message, assume success.
// Platform won't send us any stream status messages yet (since we're not declaring
// supportsRefresh in metadata), so we will always hit this case.
// If we didn't receive a stream status message, assume failure.
// This is possible if e.g. the orchestrator crashes before sending us the message.
val terminalStatusFromSource =
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.INCOMPLETE
if (terminalStatusFromSource == AirbyteStreamStatus.INCOMPLETE) {
unsuccessfulStreams.add(streamDescriptor)
}
StreamDescriptorUtils.withDefaultNamespace(
streamDescriptor,
bufferManager.defaultNamespace,
@@ -183,6 +186,17 @@
// as this throws an exception, we need to be after all other close functions.
propagateFlushWorkerExceptionIfPresent()
logger.info { "${AsyncStreamConsumer::class.java} closed" }

// In principle, platform should detect this.
// However, as a backstop, the destination should still do this check.
// This handles e.g. platform bugs where we don't receive a stream status message.
// In this case, it would be misleading to mark the sync as successful, because e.g. we
// maybe didn't commit a truncate.
if (unsuccessfulStreams.isNotEmpty()) {
throw RuntimeException(
"Some streams were unsuccessful due to a source error: $unsuccessfulStreams"
)
}
}

private fun getRecordCounter(streamDescriptor: StreamDescriptor): AtomicLong {
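The close-time behavior introduced above can be reduced to a small sketch. Everything here is hypothetical scaffolding (simplified types, no buffering or namespace handling); it only mirrors the two changes in the hunk: a missing terminal status now defaults to INCOMPLETE, and any INCOMPLETE stream makes close throw:

```kotlin
// Hypothetical, heavily simplified sketch of the new close() bookkeeping;
// not the real AsyncStreamConsumer API.
enum class StreamStatus { COMPLETE, INCOMPLETE }

fun closeSummaries(
    streamNames: List<String>,
    terminalStatusesFromSource: Map<String, StreamStatus>,
): Map<String, StreamStatus> {
    val unsuccessfulStreams = mutableListOf<String>()
    val streamSyncSummaries = streamNames.associateWith { stream ->
        // If we didn't receive a terminal stream status message, assume failure
        // (e.g. the orchestrator crashed before sending it).
        val status = terminalStatusesFromSource[stream] ?: StreamStatus.INCOMPLETE
        if (status == StreamStatus.INCOMPLETE) unsuccessfulStreams.add(stream)
        status
    }
    // Backstop: the platform should detect this, but the destination still
    // fails the sync rather than misleadingly reporting success.
    if (unsuccessfulStreams.isNotEmpty()) {
        throw RuntimeException(
            "Some streams were unsuccessful due to a source error: $unsuccessfulStreams"
        )
    }
    return streamSyncSummaries
}
```

This is why the tests below now wrap `consumer.close()` in `assertThrows` whenever any stream lacks a COMPLETE status.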
@@ -1 +1 @@
version=0.37.3
version=0.40.1
@@ -148,6 +148,26 @@ class AsyncStreamConsumerTest {
),
),
)
private val STREAM2_SUCCESS_MESSAGE =
Jsons.serialize(
AirbyteMessage()
.withType(AirbyteMessage.Type.TRACE)
.withTrace(
AirbyteTraceMessage()
.withType(AirbyteTraceMessage.Type.STREAM_STATUS)
.withStreamStatus(
AirbyteStreamStatusTraceMessage()
.withStreamDescriptor(
StreamDescriptor()
.withName(STREAM_NAME2)
.withNamespace(SCHEMA_NAME),
)
.withStatus(
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
),
),
),
)
private val STREAM2_FAILURE_MESSAGE =
Jsons.serialize(
AirbyteMessage()
@@ -262,6 +282,9 @@
consumer.start()
consumeRecords(consumer, expectedRecords)
consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES)
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()

verifyStartAndClose()
@@ -298,6 +321,9 @@
consumeRecords(consumer, expectedRecords)
consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES)
consumer.accept(Jsons.serialize(STATE_MESSAGE2), RECORD_SIZE_20_BYTES)
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()

verifyStartAndClose()
@@ -334,6 +360,9 @@

consumer.start()
consumeRecords(consumer, allRecords)
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()

verifyStartAndClose()
@@ -496,7 +525,8 @@
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
consumer.accept(STREAM2_FAILURE_MESSAGE, STREAM2_FAILURE_MESSAGE.length)
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
consumer.close()
// We had a failure, so expect an exception
assertThrows(RuntimeException::class.java) { consumer.close() }

val captor: ArgumentCaptor<Map<StreamDescriptor, StreamSyncSummary>> =
ArgumentCaptor.captor()
@@ -532,29 +562,29 @@
consumer.start()
consumeRecords(consumer, expectedRecords)
// Note: no stream status messages
consumer.close()
// We assume stream failure, so expect an exception
assertThrows(RuntimeException::class.java) { consumer.close() }

val captor: ArgumentCaptor<Map<StreamDescriptor, StreamSyncSummary>> =
ArgumentCaptor.captor()
Mockito.verify(onClose).accept(any(), capture(captor))
assertEquals(
// All streams have a COMPLETE status.
// TODO: change this to INCOMPLETE after we switch the default behavior.
// All streams have an INCOMPLETE status.
mapOf(
StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) to
StreamSyncSummary(
expectedRecords.size.toLong(),
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
),
StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME2) to
StreamSyncSummary(
0,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
),
StreamDescriptor().withNamespace(DEFAULT_NAMESPACE).withName(STREAM_NAME3) to
StreamSyncSummary(
0,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
),
),
captor.value,
@@ -2,7 +2,7 @@ dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core')

api 'org.postgresql:postgresql:42.6.0'
api 'org.postgresql:postgresql:42.6.2'

testFixturesApi testFixtures(project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core'))

@@ -325,6 +325,10 @@ abstract class JdbcDestinationHandler<DestinationState>(
streamConfig,
finalTableDefinition.isPresent,
initialRawTableState,
// TODO fix this
// for now, no JDBC destinations actually do refreshes
// so this is just to make our code compile
InitialRawTableStatus(false, false, Optional.empty()),
isSchemaMismatch,
isFinalTableEmpty,
destinationState
@@ -11,7 +11,8 @@ import java.util.*
abstract class DebeziumPropertiesManager(
private val properties: Properties,
private val config: JsonNode,
private val catalog: ConfiguredAirbyteCatalog
private val catalog: ConfiguredAirbyteCatalog,
private val streamNames: List<String>
) {
fun getDebeziumProperties(offsetManager: AirbyteFileOffsetBackingStore): Properties {
return getDebeziumProperties(offsetManager, Optional.empty())
@@ -73,7 +74,7 @@ abstract class DebeziumPropertiesManager(
// following
props.setProperty("value.converter.replace.null.with.default", "false")
// includes
props.putAll(getIncludeConfiguration(catalog, config))
props.putAll(getIncludeConfiguration(catalog, config, streamNames))

return props
}
@@ -84,7 +85,8 @@

protected abstract fun getIncludeConfiguration(
catalog: ConfiguredAirbyteCatalog,
config: JsonNode?
config: JsonNode?,
streamNames: List<String>
): Properties

companion object {
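The new `streamNames` parameter lets subclasses scope Debezium to specific streams. As a hedged illustration — the helper below is hypothetical, and only `table.include.list` is a real Debezium property (a comma-separated list of regular expressions matching fully-qualified table names) — a subclass's `getIncludeConfiguration` might build that value like this:

```kotlin
import java.util.Properties

// Hypothetical helper: escape each fully-qualified stream name so it is
// matched literally, then join the names into Debezium's comma-separated
// table.include.list value.
fun includeConfigurationFor(streamNames: List<String>): Properties {
    val props = Properties()
    props.setProperty(
        "table.include.list", // real Debezium property; this usage is illustrative
        streamNames.joinToString(",") { Regex.escape(it) },
    )
    return props
}
```

Escaping matters because Debezium treats each entry as a regex, so a literal dot in `public.users` would otherwise match any character.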