diff --git a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/build.gradle index 09e3a703a2b6..c34b9e0e9dd3 100644 --- a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/build.gradle @@ -4,12 +4,14 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.2.0' + cdkVersionRequired = '0.23.16' features = [ 'db-sources', // required for tests 'db-destinations', + 's3-destinations', + 'typing-deduping' ] - useLocalCdk = false + useLocalCdk = true } //remove once upgrading the CDK version to 0.4.x or later diff --git a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/gradle.properties b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/gradle.properties index 2b147dcf7175..f79303979c5b 100644 --- a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/gradle.properties +++ b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/gradle.properties @@ -1,3 +1,4 @@ # currently limit the number of parallel threads until further investigation into the issues \ # where integration tests run into race conditions testExecutionConcurrency=1 +junitMethodExecutionTimeout = 30 m diff --git a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/metadata.yaml index 073064b4272c..9d847da9d045 100644 --- a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/metadata.yaml @@ -7,17 +7,23 @@ data: connectorSubtype: database connectorType: destination definitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf - dockerImageTag: 0.2.0 + dockerImageTag: 1.0.0 dockerRepository: airbyte/destination-mssql-strict-encrypt githubIssueLabel: destination-mssql icon: mssql.svg license: ELv2 name: MS SQL Server - normalizationConfig: - normalizationIntegrationType: mssql - normalizationRepository: airbyte/normalization-mssql - normalizationTag: 0.4.1 releaseStage: alpha + releases: + breakingChanges: + 1.0.0: + upgradeDeadline: "2024-03-15" + message: > + This version removes the option to use "normalization" with MSSQL. It also changes + the schema and database of Airbyte's "raw" tables to be compatible with the new + [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) + format. These changes will likely require updates to downstream dbt / SQL models. + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql supportsDbt: true tags: diff --git a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java index 1be629003847..6d7eccf6c47e 100644 --- a/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mssql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mssql_strict_encrypt/MssqlStrictEncryptDestinationAcceptanceTest.java @@ -167,7 +167,7 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet TES @Override protected void tearDown(final TestDestinationEnv testEnv) { - dslContext.close(); + // do nothing } @AfterAll diff --git a/airbyte-integrations/connectors/destination-mssql/build.gradle b/airbyte-integrations/connectors/destination-mssql/build.gradle index ba588da10bce..ffba31154c17 100644 --- a/airbyte-integrations/connectors/destination-mssql/build.gradle +++ b/airbyte-integrations/connectors/destination-mssql/build.gradle @@ -4,12 +4,14 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.2.0' + cdkVersionRequired = '0.23.16' features = [ 'db-sources', // required for tests 'db-destinations', + 's3-destinations', + 'typing-deduping' ] - useLocalCdk = false + useLocalCdk = true } //remove once upgrading the CDK version to 0.4.x or later diff --git a/airbyte-integrations/connectors/destination-mssql/gradle.properties b/airbyte-integrations/connectors/destination-mssql/gradle.properties index 2b147dcf7175..f79303979c5b 100644 --- a/airbyte-integrations/connectors/destination-mssql/gradle.properties +++ b/airbyte-integrations/connectors/destination-mssql/gradle.properties @@ -1,3 +1,4 @@ # currently limit the number of parallel threads until further investigation into the issues \ # where integration tests run into race conditions testExecutionConcurrency=1 +junitMethodExecutionTimeout = 30 m diff --git a/airbyte-integrations/connectors/destination-mssql/metadata.yaml b/airbyte-integrations/connectors/destination-mssql/metadata.yaml index 53ce25ce47bd..82c8a3c14f4a 100644 --- a/airbyte-integrations/connectors/destination-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mssql/metadata.yaml @@ -8,10 +8,6 @@ data: icon: mssql.svg license: ELv2 name: MS SQL Server - normalizationConfig: - normalizationIntegrationType: mssql - normalizationRepository: airbyte/normalization-mssql - normalizationTag: 0.4.3 registries: cloud: dockerRepository: airbyte/destination-mssql-strict-encrypt @@ -19,6 +15,16 @@ data: oss: enabled: true releaseStage: alpha + releases: + breakingChanges: + 1.0.0: + upgradeDeadline: "2024-03-15" + message: > + This version removes the option to use "normalization" with Oracle. It also changes + the schema and database of Airbyte's "raw" tables to be compatible with the new + [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) + format. These changes will likely require updates to downstream dbt / SQL models. + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql supportsDbt: true tags: diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLDestination.java b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLDestination.java index b23fd171cb78..84c863a7d90c 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLDestination.java +++ b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/MSSQLDestination.java @@ -7,16 +7,23 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.factory.DatabaseDriver; +import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.cdk.integrations.base.Destination; import io.airbyte.cdk.integrations.base.IntegrationRunner; import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination; import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import java.io.File; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import org.jooq.SQLDialect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +88,18 @@ public JsonNode toJdbcConfig(final JsonNode config) { return Jsons.jsonNode(configBuilder.build()); } + @Override + protected JdbcSqlGenerator getSqlGenerator() { + return new RawOnlySqlGenerator(new MSSQLNameTransformer()); + } + + @Override + protected JdbcDestinationHandler getDestinationHandler(final String databaseName, + final JdbcDatabase database, + final String rawTableSchema) { + return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT); + } + private String getTrustStoreLocation() { // trust store location code found at https://stackoverflow.com/a/56570588 final String trustStoreLocation = Optional.ofNullable(System.getProperty("javax.net.ssl.trustStore")) @@ -104,4 +123,14 @@ public static void main(final String[] args) throws Exception { LOGGER.info("completed destination: {}", MSSQLDestination.class); } + @Override + public boolean isV2Destination() { + return true; + } + + @Override + protected boolean shouldAlwaysDisableTypeDedupe() { + return true; + } + } diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/SqlServerOperations.java b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/SqlServerOperations.java index 010793285c89..d1f4adf29c33 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/SqlServerOperations.java +++ b/airbyte-integrations/connectors/destination-mssql/src/main/java/io/airbyte/integrations/destination/mssql/SqlServerOperations.java @@ -10,7 +10,7 @@ import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations; import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import java.sql.SQLException; import java.util.List; @@ -38,9 +38,10 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN + "%s VARCHAR(64) PRIMARY KEY,\n" + "%s NVARCHAR(MAX),\n" // Microsoft SQL Server specific: NVARCHAR can store Unicode meanwhile VARCHAR - not + "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET()\n" + + "%s DATETIMEOFFSET(7)" + ");\n", - schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT); } @Override @@ -60,7 +61,7 @@ public String truncateTableQuery(final JdbcDatabase database, final String schem @Override public void insertRecords(final JdbcDatabase database, - final List records, + final List records, final String schemaName, final String tempTableName) throws SQLException { @@ -72,11 +73,12 @@ public void insertRecords(final JdbcDatabase database, "INSERT INTO %s.%s (%s, %s, %s) VALUES\n", schemaName, tempTableName, - JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, + JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT); final String recordQueryComponent = "(?, ?, ?),\n"; - final List> batches = Lists.partition(records, MAX_BATCH_SIZE); + final List> batches = Lists.partition(records, MAX_BATCH_SIZE); batches.forEach(record -> { try { SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, record); diff --git a/airbyte-integrations/connectors/destination-mssql/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-mssql/src/main/resources/spec.json index aa9ca41be384..6d690edc7a96 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-mssql/src/main/resources/spec.json @@ -114,6 +114,12 @@ } } ] + }, + "raw_data_schema": { + "type": "string", + "description": "The schema to write raw tables into (default: airbyte_internal)", + "title": "Raw Table Schema Name", + "order": 7 } } } diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java index ca0b09115616..53d768538a5f 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTest.java @@ -33,7 +33,6 @@ public class MSSQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTes private static MSSQLServerContainer db; private final StandardNameTransformer namingResolver = new StandardNameTransformer(); private JsonNode config; - private DSLContext dslContext; @Override protected String getImageName() { @@ -93,17 +92,16 @@ protected List retrieveNormalizedRecords(final TestDestinationEnv env, } private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { - try (final DSLContext dslContext = DatabaseConnectionHelper.createDslContext(db, null)) { - return getDatabase(dslContext).query( - ctx -> { - ctx.fetch(String.format("USE %s;", config.get(JdbcUtils.DATABASE_KEY))); - return ctx - .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .stream() - .map(this::getJsonFromRecord) - .collect(Collectors.toList()); - }); - } + final DSLContext dslContext = DatabaseConnectionHelper.createDslContext(db, null); + return getDatabase(dslContext).query( + ctx -> { + ctx.fetch(String.format("USE %s;", config.get(JdbcUtils.DATABASE_KEY))); + return ctx + .fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)) + .stream() + .map(this::getJsonFromRecord) + .collect(Collectors.toList()); + }); } @BeforeAll @@ -134,7 +132,7 @@ private static Database getDatabase(final DSLContext dslContext) { protected void setup(final TestDestinationEnv testEnv, final HashSet TEST_SCHEMAS) throws SQLException { final JsonNode configWithoutDbName = getConfig(db); final String dbName = Strings.addRandomSuffix("db", "_", 10); - dslContext = getDslContext(configWithoutDbName); + DSLContext dslContext = getDslContext(configWithoutDbName); final Database database = getDatabase(dslContext); database.query(ctx -> { ctx.fetch(String.format("CREATE DATABASE %s;", dbName)); @@ -150,8 +148,9 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet TES } @Override - protected void tearDown(final TestDestinationEnv testEnv) { - dslContext.close(); + protected void tearDown(final TestDestinationEnv testEnv) throws Exception { + db.stop(); + db.close(); } @Override diff --git a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java index 4991ce4ff163..844ab9a9d44e 100644 --- a/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java +++ b/airbyte-integrations/connectors/destination-mssql/src/test-integration/java/io/airbyte/integrations/destination/mssql/MSSQLDestinationAcceptanceTestSSL.java @@ -34,9 +34,7 @@ public class MSSQLDestinationAcceptanceTestSSL extends JdbcDestinationAcceptance private static MSSQLServerContainer db; private final StandardNameTransformer namingResolver = new StandardNameTransformer(); - private JsonNode configWithoutDbName; private JsonNode config; - private DSLContext dslContext; @Override protected String getImageName() { @@ -143,9 +141,9 @@ private static Database getDatabase(final DSLContext dslContext) { // 2. /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "A_Str0ng_Required_Password" @Override protected void setup(final TestDestinationEnv testEnv, HashSet TEST_SCHEMAS) throws SQLException { - configWithoutDbName = getConfig(db); + JsonNode configWithoutDbName = getConfig(db); final String dbName = Strings.addRandomSuffix("db", "_", 10); - dslContext = getDslContext(configWithoutDbName); + DSLContext dslContext = getDslContext(configWithoutDbName); final Database database = getDatabase(dslContext); database.query(ctx -> { ctx.fetch(String.format("CREATE DATABASE %s;", dbName)); @@ -162,7 +160,8 @@ protected void setup(final TestDestinationEnv testEnv, HashSet TEST_SCHE @Override protected void tearDown(final TestDestinationEnv testEnv) { - dslContext.close(); + // no op, called in {@link + // io.airbyte.integrations.destination.mssql.MSSQLDestinationAcceptanceTestSSL.cleanUp} } @AfterAll