Skip to content

Commit

Permalink
Prefer a clustered over primary for initial load of table (#40638)
Browse files Browse the repository at this point in the history
Co-authored-by: Evan Tahler <evan@airbyte.io>
  • Loading branch information
rodireich and evantahler authored Jul 1, 2024
1 parent 1b85b28 commit b66d528
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.32
dockerImageTag: 4.0.33
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_SCHEMA_NAME;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_TABLE_NAME;
import static io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE;
import static io.airbyte.cdk.db.jdbc.JdbcUtils.getFullyQualifiedTableName;
import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY;
import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY;

Expand Down Expand Up @@ -42,6 +43,7 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -92,7 +94,7 @@ public static String discoverClusteredIndexForStream(final JdbcDatabase database
if (r.getShort(JDBC_COLUMN_TYPE) == DatabaseMetaData.tableIndexClustered) {
final String schemaName =
r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString(JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME);
final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME));
final String streamName = getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME));
final String columnName = r.getString(JDBC_COLUMN_COLUMN_NAME);
return new ClusteredIndexAttributesFromDb(streamName, columnName);
} else {
Expand All @@ -102,7 +104,12 @@ public static String discoverClusteredIndexForStream(final JdbcDatabase database
} catch (final SQLException e) {
LOGGER.debug(String.format("Could not retrieve clustered indexes without a table name (%s), not blocking, fall back to use pk.", e));
}
return clusteredIndexes.getOrDefault(stream.getName(), null);
LOGGER.debug("clusteredIndexes: {}", StringUtils.join(clusteredIndexes));
final String streamName = stream.getName();
final String namespace = stream.getNamespace();

return clusteredIndexes.getOrDefault(
getFullyQualifiedTableName(namespace, streamName), null);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,18 @@
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.source.mssql.MsSQLTestDatabase.BaseImage;
import io.airbyte.integrations.source.mssql.initialsync.MssqlInitialLoadHandler;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import io.airbyte.protocol.models.v0.*;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.*;
Expand Down Expand Up @@ -117,4 +115,34 @@ public void testTableWithNullCursorValueShouldThrowException() throws Exception
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering with no null values as a cursor. {tableName='dbo.id_and_name', cursorColumnName='id', cursorSqlType=INTEGER, cause=Cursor column contains NULL value}");
}

@Test
void testDiscoverWithNonClusteredPk() throws SQLException {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);")
.with("CREATE CLUSTERED INDEX n1 ON id_and_name (name)");
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());
final String oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db,
new AirbyteStream().withName(
actual.getStreams().get(0).getName()).withNamespace(actual.getStreams().get(0).getNamespace()));
assertEquals(oc, "name");
}

@Test
void testDiscoverWithNoClusteredIndex() throws SQLException {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY NONCLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);")
.with("CREATE NONCLUSTERED INDEX n1 ON id_and_name (name)");
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(CATALOG, actual);
final var db = source().createDatabase(getConfig());
final String oc = MssqlInitialLoadHandler.discoverClusteredIndexForStream(db,
new AirbyteStream().withName(
actual.getStreams().get(0).getName()).withNamespace(actual.getStreams().get(0).getNamespace()));
assertNull(oc);
}

}
3 changes: 2 additions & 1 deletion docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.0.33 | 2024-06-30 | [40638](https://github.com/airbytehq/airbyte/pull/40638) | Fix a bug that could slow down an initial load of a large table using a different clustered index from the primary key. |
| 4.0.32 | 2024-06-26 | [40558](https://github.com/airbytehq/airbyte/pull/40558) | Handle DatetimeOffset correctly. |
| 4.0.31 | 2024-06-14 | [39419](https://github.com/airbytehq/airbyte/pull/39419) | Handle DatetimeOffset correctly. |
| 4.0.30 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
Expand Down Expand Up @@ -567,4 +568,4 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
| 0.1.5 | 2020-11-30 | [1038](https://github.com/airbytehq/airbyte/pull/1038) | Change JDBC sources to discover more than standard schemas |
| 0.1.4 | 2020-11-30 | [1046](https://github.com/airbytehq/airbyte/pull/1046) | Add connectors using an index YAML file |

</details>
</details>

0 comments on commit b66d528

Please sign in to comment.