Skip to content

Commit

Permalink
Merge branch 'master' into btkcodedev/amzsqstolowcode
Browse files Browse the repository at this point in the history
  • Loading branch information
btkcodedev authored Jul 20, 2024
2 parents 51effa5 + 823d36b commit a6635d0
Show file tree
Hide file tree
Showing 1,410 changed files with 26,620 additions and 22,630 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.63.6
current_version = 0.63.8
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/approve-regression-tests-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
--header 'content-type: application/json' \
--data '{
"state": "success",
"context": "Regression tests manual approval",
"context": "Regression Test Results Reviewed and Approved",
"target_url": "https://github.com/airbytehq/airbyte/tree/master/airbyte-ci/connectors/live-tests"
}')
if [ $response -ne 201 ]; then
Expand Down
17 changes: 15 additions & 2 deletions .github/workflows/connectors_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ jobs:
- 'airbyte-integrations/connectors/**/*'
- 'airbyte-cdk/java/**/*'
- 'buildSrc/**/*'
# The Connector CI Tests is a status check emitted by airbyte-ci
# We make it pass once we have determined that there are no changes to the connectors
# The Connector CI Tests & Regression Test Results Reviewed and Approved are status checks emitted by airbyte-ci
# We make them pass once we have determined that there are no changes to the connectors
- name: "Skip Connectors CI tests"
if: steps.changes.outputs.connectors != 'true' && github.event_name == 'pull_request'
run: |
Expand All @@ -57,6 +57,19 @@ jobs:
"context": "Connectors CI tests",
"target_url": "${{ github.event.workflow_run.html_url }}"
}' \
- name: "Skip Regression Test Results Reviewed and Approved"
if: steps.changes.outputs.connectors != 'true' && github.event_name == 'pull_request'
run: |
curl --request POST \
--url https://api.github.com/repos/${{ github.repository }}/statuses/${{ github.event.pull_request.head.sha }} \
--header 'authorization: Bearer ${{ secrets.GITHUB_TOKEN }}' \
--header 'content-type: application/json' \
--data '{
"state": "success",
"description": "[Skipped]",
"context": "Regression Test Results Reviewed and Approved",
"target_url": "${{ github.event.workflow_run.html_url }}"
}' \
connectors_ci:
needs: changes
Expand Down
14 changes: 13 additions & 1 deletion .github/workflows/live_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ on:
description: Use the local CDK when building the target connector
default: "false"
type: boolean
connection_subset:
description: The subset of connections to select from.
required: true
type: choice
options:
- sandboxes
- all

jobs:
live_tests:
Expand Down Expand Up @@ -86,6 +93,11 @@ jobs:
echo "USE_LOCAL_CDK_FLAG=" >> $GITHUB_ENV
fi
- name: Setup Connection Subset Option
if: github.event_name == 'workflow_dispatch'
run: |
echo "CONNECTION_SUBSET=--connector_live_tests.connection-subset=${{ github.event.inputs.connection_subset }}" >> $GITHUB_ENV
- name: Run Live Tests [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch' # TODO: consider using the matrix strategy (https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs). See https://github.com/airbytehq/airbyte/pull/37659#discussion_r1583380234 for details.
uses: ./.github/actions/run-airbyte-ci
Expand All @@ -102,4 +114,4 @@ jobs:
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=all --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=all --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }} ${{ env.CONNECTION_SUBSET }}
4 changes: 2 additions & 2 deletions .github/workflows/publish_connectors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
runs-on: connector-publish-large
steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Publish modified connectors [On merge to master]
id: publish-modified-connectors
if: github.event_name == 'push'
Expand Down Expand Up @@ -101,7 +101,7 @@ jobs:
if: ${{ failure() && github.ref == 'refs/heads/master' }}
steps:
- name: Checkout Airbyte
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Match GitHub User to Slack User
id: match-github-to-slack-user
uses: ./.github/actions/match-github-to-slack-user
Expand Down
14 changes: 13 additions & 1 deletion .github/workflows/regression_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ on:
description: Use the local CDK when building the target connector
default: "false"
type: boolean
connection_subset:
description: The subset of connections to select from.
required: true
type: choice
options:
- sandboxes
- all

jobs:
regression_tests:
Expand Down Expand Up @@ -86,6 +93,11 @@ jobs:
echo "USE_LOCAL_CDK_FLAG=" >> $GITHUB_ENV
fi
- name: Setup Connection Subset Option
if: github.event_name == 'workflow_dispatch'
run: |
echo "CONNECTION_SUBSET=--connector_live_tests.connection-subset=${{ github.event.inputs.connection_subset }}" >> $GITHUB_ENV
- name: Run Regression Tests [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch' # TODO: consider using the matrix strategy (https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs). See https://github.com/airbytehq/airbyte/pull/37659#discussion_r1583380234 for details.
uses: ./.github/actions/run-airbyte-ci
Expand All @@ -102,4 +114,4 @@ jobs:
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=regression --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=regression --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }} ${{ env.CONNECTION_SUBSET }}
2 changes: 1 addition & 1 deletion .github/workflows/run-mypy-on-modified-cdk-files.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Connector Extensibility - Run mypy on modified cdk files
name: Python CDK - Run mypy on modified cdk files

on:
pull_request:
Expand Down
8 changes: 8 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.42.2 | 2024-07-04 | [\#40208](https://github.com/airbytehq/airbyte/pull/40208) | Implement a new connector error handling and translation framework |
| 0.41.8 | 2024-07-18 | [\#42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics message for WASS occurrence. |
| 0.41.7 | 2024-07-17 | [\#42055](https://github.com/airbytehq/airbyte/pull/42055) | Add debezium heartbeat timeout back to shutdown debezium. |
| 0.41.6 | 2024-07-17 | [\#41996](https://github.com/airbytehq/airbyte/pull/41996) | Fix java interop compilation issue in Config/TransientErrorException. |
| 0.41.5 | 2024-07-16 | [\#42011] (https://github.com/airbytehq/airbyte/pull/42011) | Async consumer accepts null default namespace |
| 0.41.4 | 2024-07-15 | [\#41959](https://github.com/airbytehq/airbyte/pull/41959) | Allow setting `internal_message` in Config/TransientErrorException. Destinations: shorten error message for INCOMPLETE stream status. |
| 0.41.3 | 2024-07-15 | [\#41680](https://github.com/airbytehq/airbyte/pull/41680) | Fix: CompletableFutures.allOf now handles empty list and `Throwable` |
| 0.41.2 | 2024-07-12 | [\#40567](https://github.com/airbytehq/airbyte/pull/40567) | Fix BaseSqlGenerator test case (generation_id support); update minimum platform version for refreshes support. |
| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
| 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm |
| 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ object DbAnalyticsUtils {
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"
const val DEBEZIUM_CLOSE_REASON_KEY = "db-sources-debezium-close-reason"
const val WASS_OCCURRENCE_KEY = "db-sources-wass-occurrence"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
Expand All @@ -39,4 +40,9 @@ object DbAnalyticsUtils {
fun debeziumCloseReasonMessage(reason: String): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(DEBEZIUM_CLOSE_REASON_KEY).withValue(reason)
}

@JvmStatic
fun wassOccurrenceMessage(): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(WASS_OCCURRENCE_KEY).withValue("1")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
*/
package io.airbyte.cdk.integrations.base

import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.commons.exceptions.TransientErrorException
import io.airbyte.commons.stream.AirbyteStreamStatusHolder
import io.airbyte.protocol.models.v0.*
import java.time.Instant
Expand All @@ -16,12 +18,30 @@ object AirbyteTraceMessageUtility {

@JvmStatic
fun emitConfigErrorTrace(e: Throwable, displayMessage: String?) {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
if (e is ConfigErrorException) {
emitErrorTrace(
e,
displayMessage,
AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR,
e.internalMessage,
)
} else {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.CONFIG_ERROR)
}
}

@JvmStatic
fun emitTransientErrorTrace(e: Throwable, displayMessage: String?) {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
if (e is TransientErrorException) {
emitErrorTrace(
e,
displayMessage,
AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR,
e.internalMessage,
)
} else {
emitErrorTrace(e, displayMessage, AirbyteErrorTraceMessage.FailureType.TRANSIENT_ERROR)
}
}

fun emitCustomErrorTrace(displayMessage: String?, internalMessage: String?) {
Expand Down Expand Up @@ -70,9 +90,10 @@ object AirbyteTraceMessageUtility {
fun emitErrorTrace(
e: Throwable,
displayMessage: String?,
failureType: AirbyteErrorTraceMessage.FailureType
failureType: AirbyteErrorTraceMessage.FailureType,
internalMessage: String = "",
) {
emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType))
emitMessage(makeErrorTraceAirbyteMessage(e, displayMessage, failureType, internalMessage))
}

@JvmStatic
Expand All @@ -99,15 +120,22 @@ object AirbyteTraceMessageUtility {
fun makeErrorTraceAirbyteMessage(
e: Throwable,
displayMessage: String?,
failureType: AirbyteErrorTraceMessage.FailureType
failureType: AirbyteErrorTraceMessage.FailureType,
internalMessage: String = "",
): AirbyteMessage {
val actualInternalMessage: String =
if (internalMessage.isEmpty()) {
e.toString()
} else {
internalMessage + "\n" + e.toString()
}
return makeAirbyteMessageFromTraceMessage(
makeAirbyteTraceMessage(AirbyteTraceMessage.Type.ERROR)
.withError(
AirbyteErrorTraceMessage()
.withFailureType(failureType)
.withMessage(displayMessage)
.withInternalMessage(e.toString())
.withInternalMessage(actualInternalMessage)
.withStackTrace(ExceptionUtils.getStackTrace(e))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Preconditions
import com.google.common.collect.Lists
import datadog.trace.api.Trace
import io.airbyte.cdk.integrations.util.ApmTraceUtils
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil
import io.airbyte.cdk.integrations.util.ConnectorExceptionHandler
import io.airbyte.cdk.integrations.util.concurrent.ConcurrentStreamConsumer
import io.airbyte.commons.features.EnvVariableFeatureFlags
import io.airbyte.commons.features.FeatureFlags
Expand Down Expand Up @@ -110,17 +109,24 @@ internal constructor(

@Trace(operationName = "RUN_OPERATION")
@Throws(Exception::class)
fun run(args: Array<String>) {
@JvmOverloads
fun run(
args: Array<String>,
exceptionHandler: ConnectorExceptionHandler = ConnectorExceptionHandler()
) {
val parsed = cliParser.parse(args)
try {
runInternal(parsed)
runInternal(parsed, exceptionHandler)
} catch (e: Exception) {
throw e
}
}

@Throws(Exception::class)
private fun runInternal(parsed: IntegrationConfig) {
private fun runInternal(
parsed: IntegrationConfig,
exceptionHandler: ConnectorExceptionHandler
) {
LOGGER.info { "Running integration: ${integration.javaClass.name}" }
LOGGER.info { "Command: ${parsed.command}" }
LOGGER.info { "Integration config: $parsed" }
Expand Down Expand Up @@ -213,59 +219,8 @@ internal constructor(
}
}
} catch (e: Exception) {
LOGGER.error(e) { "caught exception!" }
// Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An
// attempt is made
// to
// find the root exception that corresponds to a configuration error. If that does not
// exist, we
// just return the original exception.
ApmTraceUtils.addExceptionToTrace(e)
val rootConfigErrorThrowable = ConnectorExceptionUtil.getRootConfigError(e)
val rootTransientErrorThrowable = ConnectorExceptionUtil.getRootTransientError(e)
// If the source connector throws a config error, a trace message with the relevant
// message should
// be surfaced.
if (parsed.command == Command.CHECK) {
// Currently, special handling is required for the CHECK case since the user display
// information in
// the trace message is
// not properly surfaced to the FE. In the future, we can remove this and just throw
// an exception.
outputRecordCollector.accept(
AirbyteMessage()
.withType(AirbyteMessage.Type.CONNECTION_STATUS)
.withConnectionStatus(
AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage(
ConnectorExceptionUtil.getDisplayMessage(
rootConfigErrorThrowable
)
)
)
)
return
}

if (ConnectorExceptionUtil.isConfigError(rootConfigErrorThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(
e,
ConnectorExceptionUtil.getDisplayMessage(rootConfigErrorThrowable),
)
// On receiving a config error, the container should be immediately shut down.
System.exit(1)
} else if (ConnectorExceptionUtil.isTransientError(rootTransientErrorThrowable)) {
AirbyteTraceMessageUtility.emitTransientErrorTrace(
e,
ConnectorExceptionUtil.getDisplayMessage(rootTransientErrorThrowable)
)
// On receiving a transient error, the container should be immediately shut down.
System.exit(1)
}
throw e
exceptionHandler.handleException(e, parsed.command, outputRecordCollector)
}

LOGGER.info { "Completed integration: ${integration.javaClass.name}" }
}

Expand Down
Loading

0 comments on commit a6635d0

Please sign in to comment.