Skip to content

Commit

Permalink
Destinations Bigquery, snowflake, redshift: internal CDK refactors fo…
Browse files Browse the repository at this point in the history
…r DV2 (airbytehq#33730)
  • Loading branch information
edgao authored and jatinyadav-cc committed Feb 26, 2024
1 parent 4ce7559 commit 87bb39d
Show file tree
Hide file tree
Showing 39 changed files with 750 additions and 592 deletions.
41 changes: 22 additions & 19 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@

This page will walk through the process of developing with the Java CDK.

- [Developing with the Java CDK](#developing-with-the-java-cdk)
- [Intro to the Java CDK](#intro-to-the-java-cdk)
- [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
- [How is the CDK published?](#how-is-the-cdk-published)
- [Using the Java CDK](#using-the-java-cdk)
- [Building the CDK](#building-the-cdk)
- [Bumping the CDK version](#bumping-the-cdk-version)
- [Publishing the CDK](#publishing-the-cdk)
- [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
- [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
- [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
- [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
- [Common Debugging Tips](#common-debugging-tips)
- [Changelog](#changelog)
- [Java CDK](#java-cdk)
* [Developing with the Java CDK](#developing-with-the-java-cdk)
* [Intro to the Java CDK](#intro-to-the-java-cdk)
* [What is included in the Java CDK?](#what-is-included-in-the-java-cdk)
* [How is the CDK published?](#how-is-the-cdk-published)
* [Using the Java CDK](#using-the-java-cdk)
* [Building the CDK](#building-the-cdk)
* [Bumping the CDK version](#bumping-the-cdk-version)
* [Publishing the CDK](#publishing-the-cdk)
* [Developing Connectors with the Java CDK](#developing-connectors-with-the-java-cdk)
* [Referencing the CDK from Java connectors](#referencing-the-cdk-from-java-connectors)
* [Developing a connector alongside the CDK](#developing-a-connector-alongside-the-cdk)
* [Publishing the CDK and switching to a pinned CDK reference](#publishing-the-cdk-and-switching-to-a-pinned-cdk-reference)
* [Troubleshooting CDK Dependency Caches](#troubleshooting-cdk-dependency-caches)
* [Developing a connector against a pinned CDK version](#developing-a-connector-against-a-pinned-cdk-version)
* [Common Debugging Tips](#common-debugging-tips)
* [Changelog](#changelog)
* [Java CDK](#java-cdk)

## Intro to the Java CDK

Expand Down Expand Up @@ -162,10 +164,11 @@ MavenLocal debugging steps:

### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:--------------------------------------|
| 0.10.4 | 2023-12-20 | [\#33071](https://github.com/airbytehq/airbyte/pull/33071) | Add the ability to parse JDBC parameters with another delimiter than '&' |
| 0.10.3 | 2024-01-03 | [\#33312](https://github.com/airbytehq/airbyte/pull/33312) | Send out count in AirbyteStateMessage |
| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.11.0 | 2024-01-04 | [\#33730](https://github.com/airbytehq/airbyte/pull/33730) | DV2 T+D uses Sql struct to represent transactions; other T+D-related changes |
| 0.10.4 | 2023-12-20 | [\#33071](https://github.com/airbytehq/airbyte/pull/33071) | Add the ability to parse JDBC parameters with another delimiter than '&' |
| 0.10.3 | 2024-01-03 | [\#33312](https://github.com/airbytehq/airbyte/pull/33312) | Send out count in AirbyteStateMessage |
| 0.10.1 | 2023-12-21 | [\#33723](https://github.com/airbytehq/airbyte/pull/33723) | Make memory-manager log message less scary |
| 0.10.0 | 2023-12-20 | [\#33704](https://github.com/airbytehq/airbyte/pull/33704) | JdbcDestinationHandler now properly implements `getInitialRawTableState`; reenable SqlGenerator test |
| 0.9.0 | 2023-12-18 | [\#33124](https://github.com/airbytehq/airbyte/pull/33124) | Make Schema Creation Separate from Table Creation, exclude the T&D module from the CDK |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.10.4
version=0.11.0
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/db-destinations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ java {
dependencies {
// Depends on core CDK classes (OK 👍)
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
testFixturesImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:typing-deduping'))

compileOnly project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
Expand Down Expand Up @@ -189,9 +190,10 @@ private static PartialAirbyteMessage getDummyRecord() {
.withSerialized(dummyDataToInsert.toString());
}

protected DataSource getDataSource(final JsonNode config) {
@VisibleForTesting
public DataSource getDataSource(final JsonNode config) {
final JsonNode jdbcConfig = toJdbcConfig(config);
Map<String, String> connectionProperties = getConnectionProperties(config);
final Map<String, String> connectionProperties = getConnectionProperties(config);
return DataSourceFactory.create(
jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText(),
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
Expand All @@ -201,7 +203,8 @@ protected DataSource getDataSource(final JsonNode config) {
getConnectionTimeout(connectionProperties));
}

protected JdbcDatabase getDatabase(final DataSource dataSource) {
@VisibleForTesting
public JdbcDatabase getDatabase(final DataSource dataSource) {
return new DefaultJdbcDatabase(dataSource);
}

Expand All @@ -227,7 +230,7 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
protected JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database) {
return new JdbcDestinationHandler(databaseName, database);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.cdk.integrations.destination.jdbc.CustomSqlType;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.sql.DatabaseMetaData;
import java.sql.JDBCType;
Expand All @@ -19,6 +20,7 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -111,22 +113,23 @@ record -> record.getTimestamp("min_timestamp"))) {
}

@Override
public void execute(final String sql) throws Exception {
if (sql == null || sql.isEmpty()) {
return;
}
public void execute(final Sql sql) throws Exception {
final List<List<String>> transactions = sql.transactions();
final UUID queryId = UUID.randomUUID();
LOGGER.info("Executing sql {}: {}", queryId, sql);
final long startTime = System.currentTimeMillis();
for (final List<String> transaction : transactions) {
final UUID transactionId = UUID.randomUUID();
LOGGER.info("Executing sql {}-{}: {}", queryId, transactionId, String.join("\n", transaction));
final long startTime = System.currentTimeMillis();

try {
jdbcDatabase.execute(sql);
} catch (final SQLException e) {
LOGGER.error("Sql {} failed", queryId, e);
throw e;
}
try {
jdbcDatabase.executeWithinTransaction(transaction);
} catch (final SQLException e) {
LOGGER.error("Sql {}-{} failed", queryId, transactionId, e);
throw e;
}

LOGGER.info("Sql {} completed in {} ms", queryId, System.currentTimeMillis() - startTime);
LOGGER.info("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime);
}
}

public static Optional<TableDefinition> findExistingTable(final JdbcDatabase jdbcDatabase,
Expand Down
Loading

0 comments on commit 87bb39d

Please sign in to comment.