From 9d4d80dfcfd345eb9893957d9c2da652896f0785 Mon Sep 17 00:00:00 2001 From: Joe Bell Date: Wed, 13 Mar 2024 21:55:15 -0700 Subject: [PATCH] fmt --- .../build.gradle | 6 +- .../gradle.properties | 1 + .../metadata.yaml | 16 ++- ...trictEncryptDestinationAcceptanceTest.java | 37 +++--- .../src/test/resources/expected_spec.json | 120 ++++++++++++++++++ .../destination-oracle/build.gradle | 6 +- .../destination-oracle/gradle.properties | 1 + .../destination-oracle/metadata.yaml | 16 ++- .../destination/oracle/OracleDestination.java | 49 +++++-- .../oracle/OracleNameTransformer.java | 1 + .../destination/oracle/OracleOperations.java | 62 +++++---- .../src/main/resources/spec.json | 6 + .../NneOracleDestinationAcceptanceTest.java | 6 +- .../SshOracleDestinationAcceptanceTest.java | 5 +- ...ryptedOracleDestinationAcceptanceTest.java | 42 +++--- .../destinations/oracle-migrations.md | 65 ++++++++++ docs/integrations/destinations/oracle.md | 3 +- 17 files changed, 349 insertions(+), 93 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-oracle-strict-encrypt/gradle.properties create mode 100644 airbyte-integrations/connectors/destination-oracle/gradle.properties create mode 100644 docs/integrations/destinations/oracle-migrations.md diff --git a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/build.gradle index 0e940345ab00..05387567bbec 100644 --- a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.2.0' - features = ['db-destinations'] - useLocalCdk = false + cdkVersionRequired = '0.24.1' + features = ['db-destinations', 's3-destinations', 'typing-deduping'] + useLocalCdk = true } //remove once upgrading the CDK version to 0.4.x or later diff --git a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/gradle.properties b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/gradle.properties new file mode 100644 index 000000000000..6a0ea27448c4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/gradle.properties @@ -0,0 +1 @@ +junitMethodExecutionTimeout = 30 m diff --git a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/metadata.yaml index bd323c6fdbc5..fd5dde4bdc75 100644 --- a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/metadata.yaml @@ -7,17 +7,23 @@ data: connectorSubtype: database connectorType: destination definitionId: 3986776d-2319-4de9-8af8-db14c0996e72 - dockerImageTag: 0.2.0 + dockerImageTag: 1.0.0 dockerRepository: airbyte/destination-oracle-strict-encrypt githubIssueLabel: destination-oracle icon: oracle.svg license: ELv2 name: Oracle - normalizationConfig: - normalizationIntegrationType: oracle - normalizationRepository: airbyte/normalization-oracle - normalizationTag: 0.4.1 releaseStage: alpha + releases: + breakingChanges: + 1.0.0: + upgradeDeadline: "2024-03-15" + message: > + This version removes the option to use "normalization" with Oracle. It also changes + the schema and database of Airbyte's "raw" tables to be compatible with the new + [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) + format. These changes will likely require updates to downstream dbt / SQL models. + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. documentationUrl: https://docs.airbyte.com/integrations/destinations/oracle supportsDbt: true tags: diff --git a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java index 72e2a11ce32f..53ec2bca11ae 100644 --- a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java @@ -17,11 +17,11 @@ import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.StandardNameTransformer; import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; -import io.airbyte.integrations.destination.oracle.OracleDestination; import io.airbyte.integrations.destination.oracle.OracleNameTransformer; import java.sql.SQLException; import java.util.ArrayList; @@ -73,7 +73,7 @@ protected List retrieveRecords(final TestDestinationEnv env, return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() .map(r -> Jsons.deserialize( - r.get(OracleDestination.COLUMN_NAME_DATA.replace("\"", "")).asText())) + r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); } @@ -113,16 +113,15 @@ protected List resolveIdentifier(final String identifier) { private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { - final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT); - - try (final DSLContext dslContext = getDslContext(config)) { - final List result = getDatabase(dslContext).query(ctx -> ctx.fetch(query).stream().toList()); - return result - .stream() - .map(r -> r.formatJSON(JSON_FORMAT)) - .map(Jsons::deserialize) - .collect(Collectors.toList()); - } + final String query = + String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase()); + final DSLContext dslContext = getDslContext(config); + final List result = getDatabase(dslContext).query(ctx -> ctx.fetch(query).stream().toList()); + return result + .stream() + .map(r -> r.formatJSON(JSON_FORMAT)) + .map(Jsons::deserialize) + .collect(Collectors.toList()); } private static Database getDatabase(final DSLContext dslContext) { @@ -151,15 +150,13 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet TES db.start(); config = getConfig(db); + final DSLContext dslContext = getDslContext(config); + final Database database = getDatabase(dslContext); + database.query( + ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName))); + database.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName))); - try (final DSLContext dslContext = getDslContext(config)) { - final Database database = getDatabase(dslContext); - database.query( - ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName))); - database.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName))); - - ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, dbName); - } + ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, dbName); } @Override diff --git a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test/resources/expected_spec.json index 86b2da9a042e..b6badb3b7228 100644 --- a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test/resources/expected_spec.json @@ -59,6 +59,126 @@ "examples": ["airbyte"], "default": "airbyte", "order": 6 + }, + "raw_data_schema": { + "type": "string", + "description": "The schema to write raw tables into (default: airbyte_internal)", + "title": "Raw Table Schema Name", + "order": 7 + }, + "tunnel_method": { + "type": "object", + "title": "SSH Tunnel Method", + "description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.", + "oneOf": [ + { + "title": "No Tunnel", + "required": ["tunnel_method"], + "properties": { + "tunnel_method": { + "description": "No ssh tunnel needed to connect to database", + "type": "string", + "const": "NO_TUNNEL", + "order": 0 + } + } + }, + { + "title": "SSH Key Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_port", + "tunnel_user", + "ssh_key" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and ssh key", + "type": "string", + "const": "SSH_KEY_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_user": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host.", + "type": "string", + "order": 3 + }, + "ssh_key": { + "title": "SSH Private Key", + "description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )", + "type": "string", + "airbyte_secret": true, + "multiline": true, + "order": 4 + } + } + }, + { + "title": "Password Authentication", + "required": [ + "tunnel_method", + "tunnel_host", + "tunnel_port", + "tunnel_user", + "tunnel_user_password" + ], + "properties": { + "tunnel_method": { + "description": "Connect through a jump server tunnel host using username and password authentication", + "type": "string", + "const": "SSH_PASSWORD_AUTH", + "order": 0 + }, + "tunnel_host": { + "title": "SSH Tunnel Jump Server Host", + "description": "Hostname of the jump server host that allows inbound ssh tunnel.", + "type": "string", + "order": 1 + }, + "tunnel_port": { + "title": "SSH Connection Port", + "description": "Port on the proxy/jump server that accepts inbound ssh connections.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 22, + "examples": ["22"], + "order": 2 + }, + "tunnel_user": { + "title": "SSH Login Username", + "description": "OS-level username for logging into the jump server host", + "type": "string", + "order": 3 + }, + "tunnel_user_password": { + "title": "Password", + "description": "OS-level password for logging into the jump server host", + "type": "string", + "airbyte_secret": true, + "order": 4 + } + } + } + ] } } } diff --git a/airbyte-integrations/connectors/destination-oracle/build.gradle b/airbyte-integrations/connectors/destination-oracle/build.gradle index a192ee34744a..d53ebb8394ae 100644 --- a/airbyte-integrations/connectors/destination-oracle/build.gradle +++ b/airbyte-integrations/connectors/destination-oracle/build.gradle @@ -4,9 +4,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.2.0' - features = ['db-destinations'] - useLocalCdk = false + cdkVersionRequired = '0.24.1' + features = ['db-destinations', 's3-destinations', 'typing-deduping'] + useLocalCdk = true } //remove once upgrading the CDK version to 0.4.x or later diff --git a/airbyte-integrations/connectors/destination-oracle/gradle.properties b/airbyte-integrations/connectors/destination-oracle/gradle.properties new file mode 100644 index 000000000000..6a0ea27448c4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-oracle/gradle.properties @@ -0,0 +1 @@ +junitMethodExecutionTimeout = 30 m diff --git a/airbyte-integrations/connectors/destination-oracle/metadata.yaml b/airbyte-integrations/connectors/destination-oracle/metadata.yaml index f6f1acf44c37..a4bc8d5420ff 100644 --- a/airbyte-integrations/connectors/destination-oracle/metadata.yaml +++ b/airbyte-integrations/connectors/destination-oracle/metadata.yaml @@ -2,16 +2,12 @@ data: connectorSubtype: database connectorType: destination definitionId: 3986776d-2319-4de9-8af8-db14c0996e72 - dockerImageTag: 0.2.0 + dockerImageTag: 1.0.0 dockerRepository: airbyte/destination-oracle githubIssueLabel: destination-oracle icon: oracle.svg license: ELv2 name: Oracle - normalizationConfig: - normalizationIntegrationType: oracle - normalizationRepository: airbyte/normalization-oracle - normalizationTag: 0.4.3 registries: cloud: dockerRepository: airbyte/destination-oracle-strict-encrypt @@ -21,6 +17,16 @@ data: releaseStage: alpha documentationUrl: https://docs.airbyte.com/integrations/destinations/oracle supportsDbt: true + releases: + breakingChanges: + 1.0.0: + upgradeDeadline: "2024-03-15" + message: > + This version removes the option to use "normalization" with Oracle. It also changes + the schema and database of Airbyte's "raw" tables to be compatible with the new + [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) + format. These changes will likely require updates to downstream dbt / SQL models. + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. tags: - language:java ab_internal: diff --git a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java index 9a515ef1f74e..4d0b4d205d47 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java +++ b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleDestination.java @@ -7,20 +7,30 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination; import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import java.io.IOException; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.RandomStringUtils; +import org.jooq.SQLDialect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,13 +39,6 @@ public class OracleDestination extends AbstractJdbcDestination implements Destin private static final Logger LOGGER = LoggerFactory.getLogger(OracleDestination.class); public static final String DRIVER_CLASS = DatabaseDriver.ORACLE.getDriverClassName(); - public static final String COLUMN_NAME_AB_ID = - "\"" + JavaBaseConstants.COLUMN_NAME_AB_ID.toUpperCase() + "\""; - public static final String COLUMN_NAME_DATA = - "\"" + JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase() + "\""; - public static final String COLUMN_NAME_EMITTED_AT = - "\"" + JavaBaseConstants.COLUMN_NAME_EMITTED_AT.toUpperCase() + "\""; - protected static final String KEY_STORE_FILE_PATH = "clientkeystore.jks"; private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8); public static final String ENCRYPTION_METHOD_KEY = "encryption_method"; @@ -134,6 +137,36 @@ private static void tryConvertAndImportCertificate(final String certificate) { } } + @Override + public boolean isV2Destination() { + return true; + } + + @Override + protected boolean shouldAlwaysDisableTypeDedupe() { + return true; + } + + @Override + protected JdbcSqlGenerator getSqlGenerator() { + return new RawOnlySqlGenerator(new OracleNameTransformer()); + } + + @Override + protected JdbcDestinationHandler getDestinationHandler(final String databaseName, + final JdbcDatabase database, + final String rawTableSchema) { + return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT); + } + + @Override + protected List getMigrations(final JdbcDatabase database, + final String databaseName, + final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler) { + return List.of(); + } + private static void convertAndImportCertificate(final String certificate) throws IOException, InterruptedException { final Runtime run = Runtime.getRuntime(); diff --git a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java index ace575355050..18fbe6995071 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java +++ b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleNameTransformer.java @@ -17,6 +17,7 @@ public String applyDefaultCase(final String input) { } @Override + @Deprecated public String getRawTableName(final String streamName) { return convertStreamName("airbyte_raw_" + streamName); } diff --git a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleOperations.java b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleOperations.java index 468e33bd7345..60f197dc9801 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleOperations.java +++ b/airbyte-integrations/connectors/destination-oracle/src/main/java/io/airbyte/integrations/destination/oracle/OracleOperations.java @@ -4,15 +4,18 @@ package io.airbyte.integrations.destination.oracle; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.upperQuoted; + import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.StandardNameTransformer; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; +import java.sql.Types; import java.time.Instant; import java.util.List; import java.util.UUID; @@ -58,18 +61,25 @@ public void createTableIfNotExists(final JdbcDatabase database, final String sch @Override public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) { return String.format( - "CREATE TABLE %s.%s ( \n" - + "%s VARCHAR(64) PRIMARY KEY,\n" - + "%s NCLOB,\n" - + "%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n" - + ")", + """ + CREATE TABLE %s.%s ( + %s VARCHAR(64) PRIMARY KEY, + %s JSON, + %s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + %s TIMESTAMP WITH TIME ZONE DEFAULT NULL, + %s JSON + ) + """, schemaName, tableName, - OracleDestination.COLUMN_NAME_AB_ID, OracleDestination.COLUMN_NAME_DATA, OracleDestination.COLUMN_NAME_EMITTED_AT, - OracleDestination.COLUMN_NAME_DATA); + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID), + upperQuoted(JavaBaseConstants.COLUMN_NAME_DATA), + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT), + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT), + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_META)); } private boolean tableExists(final JdbcDatabase database, final String schemaName, final String tableName) throws Exception { - final Integer count = database.queryInt("select count(*) \n from all_tables\n where upper(owner) = upper(?) and upper(table_name) = upper(?)", + final int count = database.queryInt("select count(*) \n from all_tables\n where upper(owner) = upper(?) and upper(table_name) = upper(?)", schemaName, tableName); return count == 1; } @@ -94,23 +104,25 @@ public String truncateTableQuery(final JdbcDatabase database, final String schem @Override public void insertRecords(final JdbcDatabase database, - final List records, + final List records, final String schemaName, final String tempTableName) throws Exception { final String tableName = String.format("%s.%s", schemaName, tempTableName); - final String columns = String.format("(%s, %s, %s)", - OracleDestination.COLUMN_NAME_AB_ID, OracleDestination.COLUMN_NAME_DATA, OracleDestination.COLUMN_NAME_EMITTED_AT); - final String recordQueryComponent = "(?, ?, ?)\n"; - insertRawRecordsInSingleQuery(tableName, columns, recordQueryComponent, database, records, UUID::randomUUID); + final String columns = String.format("(%s, %s, %s, %s, %s)", + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID), + upperQuoted(JavaBaseConstants.COLUMN_NAME_DATA), + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT), + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT), + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_META)); + insertRawRecordsInSingleQuery(tableName, columns, database, records, UUID::randomUUID); } // Adapted from SqlUtils.insertRawRecordsInSingleQuery to meet some needs specific to Oracle syntax private static void insertRawRecordsInSingleQuery(final String tableName, final String columns, - final String recordQueryComponent, final JdbcDatabase jdbcDatabase, - final List records, + final List records, final Supplier uuidSupplier) throws SQLException { if (records.isEmpty()) { @@ -129,20 +141,22 @@ private static void insertRawRecordsInSingleQuery(final String tableName, // The "SELECT 1 FROM DUAL" at the end is a formality to satisfy the needs of the Oracle syntax. // (see https://stackoverflow.com/a/93724 for details) final StringBuilder sql = new StringBuilder("INSERT ALL "); - records.forEach(r -> sql.append(String.format("INTO %s %s VALUES %s", tableName, columns, recordQueryComponent))); + records.forEach(r -> sql.append(String.format("INTO %s %s VALUES %s", tableName, columns, "(?, ?, ?, ?)\n"))); sql.append(" SELECT 1 FROM DUAL"); final String query = sql.toString(); try (final PreparedStatement statement = connection.prepareStatement(query)) { // second loop: bind values to the SQL string. int i = 1; - for (final AirbyteRecordMessage message : records) { + for (final PartialAirbyteMessage message : records) { // 1-indexed - final JsonNode formattedData = StandardNameTransformer.formatJsonPath(message.getData()); - statement.setString(i, uuidSupplier.get().toString()); - statement.setString(i + 1, Jsons.serialize(formattedData)); - statement.setTimestamp(i + 2, Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt()))); - i += 3; + final JsonNode formattedData = StandardNameTransformer.formatJsonPath(message.getRecord().getData()); + statement.setString(i++, uuidSupplier.get().toString()); + statement.setObject(i++, formattedData); + // statement.setString(i++, Jsons.serialize(formattedData)); + statement.setTimestamp(i++, Timestamp.from(Instant.ofEpochMilli(message.getRecord().getEmittedAt()))); + statement.setNull(i++, Types.TIMESTAMP); + statement.setObject(i++, ""); } statement.execute(); diff --git a/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json index 35aa4090b786..c5cd4f20adf2 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-oracle/src/main/resources/spec.json @@ -120,6 +120,12 @@ } } ] + }, + "raw_data_schema": { + "type": "string", + "description": "The schema to write raw tables into (default: airbyte_internal)", + "title": "Raw Table Schema Name", + "order": 7 } } } diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java index fc946fdddae9..599c50b76a5b 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java @@ -18,6 +18,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.commons.json.Jsons; import java.sql.SQLException; +import java.time.Duration; import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; @@ -43,7 +44,8 @@ public void testEncryption() throws SQLException { config.get("host").asText(), config.get("port").asInt(), config.get("sid").asText()), - getAdditionalProperties(algorithm))); + getAdditionalProperties(algorithm), + Duration.ofMinutes(5))); final String networkServiceBanner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; @@ -78,7 +80,7 @@ public void testCheckProtocol() throws SQLException { clone.get("host").asText(), clone.get("port").asInt(), clone.get("sid").asText()), - getAdditionalProperties(algorithm))); + getAdditionalProperties(algorithm), Duration.ofMinutes(5))); final String networkServiceBanner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; final List collect = database.queryJsons(networkServiceBanner); diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java index 10ce0fde6c7a..44221abcbd05 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/SshOracleDestinationAcceptanceTest.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.destination.oracle; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.upperQuoted; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; @@ -124,7 +126,8 @@ private List retrieveRecordsFromTable(final String tableName, final St (CheckedFunction, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig) .query( ctx -> ctx - .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT))) + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)))) .stream() .map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat())) .map(Jsons::deserialize) diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java index d87d36041168..180a0a543152 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.oracle; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.upperQuoted; import static io.airbyte.cdk.integrations.util.HostPortResolver.resolveHost; import static io.airbyte.cdk.integrations.util.HostPortResolver.resolvePort; import static org.hamcrest.CoreMatchers.equalTo; @@ -20,11 +21,13 @@ import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.StandardNameTransformer; import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashSet; @@ -72,10 +75,10 @@ protected List retrieveRecords(final TestDestinationEnv env, final String namespace, final JsonNode streamSchema) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + return retrieveRecordsFromTable(namingResolver.convertStreamName(StreamId.concatenateRawTableName(namespace, streamName)), namespace) .stream() .map(r -> Jsons.deserialize( - r.get(OracleDestination.COLUMN_NAME_DATA.replace("\"", "")).asText())) + r.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()).asText())) .collect(Collectors.toList()); } @@ -122,17 +125,16 @@ protected JsonNode getFailCheckConfig() { private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { - try (final DSLContext dslContext = getDSLContext(config)) { - final List result = getDatabase(dslContext) - .query(ctx -> new ArrayList<>(ctx.fetch( - String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, - OracleDestination.COLUMN_NAME_EMITTED_AT)))); - return result - .stream() - .map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat())) - .map(Jsons::deserialize) - .collect(Collectors.toList()); - } + final DSLContext dslContext = getDSLContext(config); + final List result = getDatabase(dslContext) + .query(ctx -> new ArrayList<>(ctx.fetch( + String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, + upperQuoted(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT))))); + return result + .stream() + .map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat())) + .map(Jsons::deserialize) + .collect(Collectors.toList()); } private static DSLContext getDSLContext(final JsonNode config) { @@ -159,15 +161,13 @@ protected void setup(final TestDestinationEnv testEnv, HashSet TEST_SCHE db.start(); config = getConfig(db); + final DSLContext dslContext = getDSLContext(config); + final Database database = getDatabase(dslContext); + database.query( + ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName))); + database.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName))); - try (final DSLContext dslContext = getDSLContext(config)) { - final Database database = getDatabase(dslContext); - database.query( - ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName))); - database.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName))); - - ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, dbName); - } + ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, dbName); } @Override diff --git a/docs/integrations/destinations/oracle-migrations.md b/docs/integrations/destinations/oracle-migrations.md new file mode 100644 index 000000000000..26824b4eb765 --- /dev/null +++ b/docs/integrations/destinations/oracle-migrations.md @@ -0,0 +1,65 @@ +# Oracle Migration Guide + +## Upgrading to 1.0.0 + +This version removes the option to use "normalization" with clickhouse. It also changes +the schema and database of Airbyte's "raw" tables to be compatible with the new +[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) +format. These changes will likely require updates to downstream dbt / SQL models. After this update, +Airbyte will only produce the ‘raw’ v2 tables, which store all content in JSON. These changes remove +the ability to do deduplicated syncs with Clickhouse. (Clickhouse has an overview)[[https://clickhouse.com/docs/en/integrations/dbt]] +for integrating with dbt If you are interested in the Clickhouse destination gaining the full features +of Destinations V2 (including final tables), click [[https://github.com/airbytehq/airbyte/discussions/35339]] +to register your interest. + +This upgrade will ignore any existing raw tables and will not migrate any data to the new schema. +For each stream, you could perform the following query to migrate the data from the old raw table +to the new raw table: + +```sql +-- assumes your database was 'default' +-- replace `{{stream_name}}` with replace your stream name + +CREATE TABLE airbyte_internal.default_raw__stream_{{stream_name}} +( + `_airbyte_raw_id` String, + `_airbyte_extracted_at` DateTime64(3, 'GMT') DEFAULT now(), + `_airbyte_loaded_at` DateTime64(3, 'GMT') NULL, + `_airbyte_data` String, + PRIMARY KEY(`_airbyte_raw_id`) +) +ENGINE = MergeTree; + +INSERT INTO `airbyte_internal`.`default_raw__stream_{{stream_name}}` + SELECT + `_airbyte_ab_id` AS "_airbyte_raw_id", + `_airbyte_emitted_at` AS "_airbyte_extracted_at", + NULL AS "_airbyte_loaded_at", + _airbyte_data AS "_airbyte_data" + FROM default._airbyte_raw_{{stream_name}}; +``` + +Airbyte will not delete any of your v1 data. + +### Database/Schema and the Internal Schema +We have split the raw and final tables into their own schemas, which means that +we will only write into the raw table which will live in the `airbyte_internal` schema. +The tables written into this schema will be prefixed with either the default database provided in +the `DB Name` field when configuring clickhouse (but can also be overridden in the connection). You can +change the "raw" database from the default `airbyte_internal` by supplying a value for +`Raw Table Schema Name`. + +For Example: + +- Schema: `default` +- Stream Name: `my_stream` + +Writes to `airbyte_intneral.default_raw__stream_my_stream` + +where as: + +- DB Name: `default` +- Stream Name: `my_stream` +- Raw Table Schema Name: `raw_data` + +Writes to: `raw_data.default_raw__stream_my_stream` diff --git a/docs/integrations/destinations/oracle.md b/docs/integrations/destinations/oracle.md index d2e9867eb04a..89b54c95674e 100644 --- a/docs/integrations/destinations/oracle.md +++ b/docs/integrations/destinations/oracle.md @@ -91,7 +91,8 @@ Airbyte has the ability to connect to the Oracle source with 3 network connectiv ## Changelog | Version | Date | Pull Request | Subject | -| :---------- | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------- | +|:------------|:-----------|:-----------------------------------------------------------| :-------------------------------------------------------------------------------------------------- | +| 1.0.0 | 2024-03-15 | [\#xxxx](https://github.com/airbytehq/airbyte/pull/xxx) | Removes Normalization, updates to V2 Raw Table Format | | 0.2.0 | 2023-06-27 | [\#27781](https://github.com/airbytehq/airbyte/pull/27781) | License Update: Elv2 | | 0.1.19 | 2022-07-26 | [\#10719](https://github.com/airbytehq/airbyte/pull/) | Destination Oracle: added custom JDBC parameters support. | | 0.1.18 | 2022-07-14 | [\#14618](https://github.com/airbytehq/airbyte/pull/14618) | Removed additionalProperties: false from JDBC destination connectors |