From b66d528a4d23c0bd3fadbfa201c222c890848166 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Mon, 1 Jul 2024 10:16:58 -0700 Subject: [PATCH] Prefer a clustered over primary for initial load of table (#40638) Co-authored-by: Evan Tahler --- .../connectors/source-mssql/metadata.yaml | 2 +- .../initialsync/MssqlInitialLoadHandler.java | 11 ++++- .../source/mssql/MssqlSourceTest.java | 40 ++++++++++++++++--- docs/integrations/sources/mssql.md | 3 +- 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/metadata.yaml b/airbyte-integrations/connectors/source-mssql/metadata.yaml index 616badf675c2..138e0a6e3eb2 100644 --- a/airbyte-integrations/connectors/source-mssql/metadata.yaml +++ b/airbyte-integrations/connectors/source-mssql/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 - dockerImageTag: 4.0.32 + dockerImageTag: 4.0.33 dockerRepository: airbyte/source-mssql documentationUrl: https://docs.airbyte.com/integrations/sources/mssql githubIssueLabel: source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadHandler.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadHandler.java index 0a5e6a5f0a79..afda9c70acb7 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadHandler.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadHandler.java @@ -9,6 +9,7 @@ import static io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_SCHEMA_NAME; import static io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_TABLE_NAME; import static io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE; +import static io.airbyte.cdk.db.jdbc.JdbcUtils.getFullyQualifiedTableName; import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY; import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY; @@ -42,6 +43,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +94,7 @@ public static String discoverClusteredIndexForStream(final JdbcDatabase database if (r.getShort(JDBC_COLUMN_TYPE) == DatabaseMetaData.tableIndexClustered) { final String schemaName = r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString(JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME); - final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME)); + final String streamName = getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME)); final String columnName = r.getString(JDBC_COLUMN_COLUMN_NAME); return new ClusteredIndexAttributesFromDb(streamName, columnName); } else { @@ -102,7 +104,12 @@ public static String discoverClusteredIndexForStream(final JdbcDatabase database } catch (final SQLException e) { LOGGER.debug(String.format("Could not retrieve clustered indexes without a table name (%s), not blocking, fall back to use pk.", e)); } - return clusteredIndexes.getOrDefault(stream.getName(), null); + LOGGER.debug("clusteredIndexes: {}", StringUtils.join(clusteredIndexes)); + final String streamName = stream.getName(); + final String namespace = stream.getNamespace(); + + return clusteredIndexes.getOrDefault( + getFullyQualifiedTableName(namespace, streamName), null); } @VisibleForTesting diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java index 38a8b554a1f0..6daecf68817d 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java @@ -7,20 +7,18 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Lists; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.util.MoreIterators; import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage; +import io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadHandler; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; -import io.airbyte.protocol.models.v0.AirbyteCatalog; -import io.airbyte.protocol.models.v0.CatalogHelpers; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import io.airbyte.protocol.models.v0.SyncMode; +import io.airbyte.protocol.models.v0.*; +import java.sql.SQLException; import java.util.Collections; import java.util.List; import org.junit.jupiter.api.*; @@ -117,4 +115,34 @@ public void testTableWithNullCursorValueShouldThrowException() throws Exception "The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering with no null values as a cursor. {tableName='dbo.id_and_name', cursorColumnName='id', cursorSqlType=INTEGER, cause=Cursor column contains NULL value}"); } + @Test + void testDiscoverWithNonClusteredPk() throws SQLException { + testdb + .with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);") + .with("CREATE INDEX i1 ON id_and_name (id);") + .with("CREATE CLUSTERED INDEX n1 ON id_and_name (name)"); + final AirbyteCatalog actual = source().discover(getConfig()); + assertEquals(CATALOG, actual); + final var db = source().createDatabase(getConfig()); + final String oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db, + new AirbyteStream().withName( + actual.getStreams().get(0).getName()).withNamespace(actual.getStreams().get(0).getNamespace())); + assertEquals(oc, "name"); + } + + @Test + void testDiscoverWithNoClusteredIndex() throws SQLException { + testdb + .with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);") + .with("CREATE INDEX i1 ON id_and_name (id);") + .with("CREATE NONCLUSTERED INDEX n1 ON id_and_name (name)"); + final AirbyteCatalog actual = source().discover(getConfig()); + assertEquals(CATALOG, actual); + final var db = source().createDatabase(getConfig()); + final String oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db, + new AirbyteStream().withName( + actual.getStreams().get(0).getName()).withNamespace(actual.getStreams().get(0).getNamespace())); + assertNull(oc); + } + } diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 9617df924aac..671e0e1d158b 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------| +| 4.0.33 | 2024-06-30 | [40638](https://github.com/airbytehq/airbyte/pull/40638) | Fix a bug that could slow down an initial load of a large table using a different clustered index from the primary key. | | 4.0.32 | 2024-06-26 | [40558](https://github.com/airbytehq/airbyte/pull/40558) | Handle DatetimeOffset correctly. | | 4.0.31 | 2024-06-14 | [39419](https://github.com/airbytehq/airbyte/pull/39419) | Handle DatetimeOffset correctly. | | 4.0.30 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. | @@ -567,4 +568,4 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura | 0.1.5 | 2020-11-30 | [1038](https://github.com/airbytehq/airbyte/pull/1038) | Change JDBC sources to discover more than standard schemas | | 0.1.4 | 2020-11-30 | [1046](https://github.com/airbytehq/airbyte/pull/1046) | Add connectors using an index YAML file | - \ No newline at end of file +