Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination MSSQL - Remove Normalization #35874

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
cdkVersionRequired = '0.23.16'
features = [
'db-sources', // required for tests
'db-destinations',
's3-destinations',
'typing-deduping'
]
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where integration tests run into race conditions
testExecutionConcurrency=1
junitMethodExecutionTimeout = 30 m
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,23 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: d4353156-9217-4cad-8dd7-c108fd4f74cf
dockerImageTag: 0.2.0
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-mssql-strict-encrypt
githubIssueLabel: destination-mssql
icon: mssql.svg
license: ELv2
name: MS SQL Server
normalizationConfig:
normalizationIntegrationType: mssql
normalizationRepository: airbyte/normalization-mssql
normalizationTag: 0.4.1
releaseStage: alpha
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-03-15"
message: >
This version removes the option to use "normalization" with MSSQL. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
supportsDbt: true
tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
// do nothing
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.2.0'
cdkVersionRequired = '0.23.16'
features = [
'db-sources', // required for tests
'db-destinations',
's3-destinations',
'typing-deduping'
]
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# currently limit the number of parallel threads until further investigation into the issues \
# where integration tests run into race conditions
testExecutionConcurrency=1
junitMethodExecutionTimeout = 30 m
14 changes: 10 additions & 4 deletions airbyte-integrations/connectors/destination-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@ data:
icon: mssql.svg
license: ELv2
name: MS SQL Server
normalizationConfig:
normalizationIntegrationType: mssql
normalizationRepository: airbyte/normalization-mssql
normalizationTag: 0.4.3
registries:
cloud:
dockerRepository: airbyte/destination-mssql-strict-encrypt
enabled: true
oss:
enabled: true
releaseStage: alpha
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-03-15"
message: >
This version removes the option to use "normalization" with Oracle. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
supportsDbt: true
tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,23 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,6 +88,18 @@ public JsonNode toJdbcConfig(final JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
return new RawOnlySqlGenerator(new MSSQLNameTransformer());
}

@Override
protected JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema) {
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
}

private String getTrustStoreLocation() {
// trust store location code found at https://stackoverflow.com/a/56570588
final String trustStoreLocation = Optional.ofNullable(System.getProperty("javax.net.ssl.trustStore"))
Expand All @@ -104,4 +123,14 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed destination: {}", MSSQLDestination.class);
}

@Override
public boolean isV2Destination() {
return true;
}

@Override
protected boolean shouldAlwaysDisableTypeDedupe() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import java.sql.SQLException;
import java.util.List;

Expand Down Expand Up @@ -38,9 +38,10 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
+ "%s VARCHAR(64) PRIMARY KEY,\n"
+ "%s NVARCHAR(MAX),\n" // Microsoft SQL Server specific: NVARCHAR can store Unicode meanwhile VARCHAR - not
+ "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET()\n"
+ "%s DATETIMEOFFSET(7)"
+ ");\n",
schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT);
}

@Override
Expand All @@ -60,7 +61,7 @@ public String truncateTableQuery(final JdbcDatabase database, final String schem

@Override
public void insertRecords(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
final List<PartialAirbyteMessage> records,
final String schemaName,
final String tempTableName)
throws SQLException {
Expand All @@ -72,11 +73,12 @@ public void insertRecords(final JdbcDatabase database,
"INSERT INTO %s.%s (%s, %s, %s) VALUES\n",
schemaName,
tempTableName,
JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT);
final String recordQueryComponent = "(?, ?, ?),\n";
final List<List<AirbyteRecordMessage>> batches = Lists.partition(records, MAX_BATCH_SIZE);
final List<List<PartialAirbyteMessage>> batches = Lists.partition(records, MAX_BATCH_SIZE);
batches.forEach(record -> {
try {
SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@
}
}
]
},
"raw_data_schema": {
"type": "string",
"description": "The schema to write raw tables into (default: airbyte_internal)",
"title": "Raw Table Schema Name",
"order": 7
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class MSSQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTes
private static MSSQLServerContainer<?> db;
private final StandardNameTransformer namingResolver = new StandardNameTransformer();
private JsonNode config;
private DSLContext dslContext;

@Override
protected String getImageName() {
Expand Down Expand Up @@ -93,17 +92,16 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env,
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
try (final DSLContext dslContext = DatabaseConnectionHelper.createDslContext(db, null)) {
return getDatabase(dslContext).query(
ctx -> {
ctx.fetch(String.format("USE %s;", config.get(JdbcUtils.DATABASE_KEY)));
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
});
}
final DSLContext dslContext = DatabaseConnectionHelper.createDslContext(db, null);
return getDatabase(dslContext).query(
ctx -> {
ctx.fetch(String.format("USE %s;", config.get(JdbcUtils.DATABASE_KEY)));
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList());
});
}

@BeforeAll
Expand Down Expand Up @@ -134,7 +132,7 @@ private static Database getDatabase(final DSLContext dslContext) {
protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) throws SQLException {
final JsonNode configWithoutDbName = getConfig(db);
final String dbName = Strings.addRandomSuffix("db", "_", 10);
dslContext = getDslContext(configWithoutDbName);
DSLContext dslContext = getDslContext(configWithoutDbName);
final Database database = getDatabase(dslContext);
database.query(ctx -> {
ctx.fetch(String.format("CREATE DATABASE %s;", dbName));
Expand All @@ -150,8 +148,9 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
db.stop();
db.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ public class MSSQLDestinationAcceptanceTestSSL extends JdbcDestinationAcceptance

private static MSSQLServerContainer<?> db;
private final StandardNameTransformer namingResolver = new StandardNameTransformer();
private JsonNode configWithoutDbName;
private JsonNode config;
private DSLContext dslContext;

@Override
protected String getImageName() {
Expand Down Expand Up @@ -143,9 +141,9 @@ private static Database getDatabase(final DSLContext dslContext) {
// 2. /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "A_Str0ng_Required_Password"
@Override
protected void setup(final TestDestinationEnv testEnv, HashSet<String> TEST_SCHEMAS) throws SQLException {
configWithoutDbName = getConfig(db);
JsonNode configWithoutDbName = getConfig(db);
final String dbName = Strings.addRandomSuffix("db", "_", 10);
dslContext = getDslContext(configWithoutDbName);
DSLContext dslContext = getDslContext(configWithoutDbName);
final Database database = getDatabase(dslContext);
database.query(ctx -> {
ctx.fetch(String.format("CREATE DATABASE %s;", dbName));
Expand All @@ -162,7 +160,8 @@ protected void setup(final TestDestinationEnv testEnv, HashSet<String> TEST_SCHE

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
// no op, called in {@link
// io.airbyte.integrations.destination.mssql.MSSQLDestinationAcceptanceTestSSL.cleanUp}
}

@AfterAll
Expand Down
Loading