From 11bc6c228c9b465b085995648e88592300025ed5 Mon Sep 17 00:00:00 2001 From: Vardhan Thigle Date: Thu, 21 Nov 2024 12:11:38 +0000 Subject: [PATCH] Allowing Jdbc FetchSize Configuration for handling large rows --- .../v2/options/OptionsToConfigBuilder.java | 7 ++- .../v2/options/SourceDbToSpannerOptions.java | 34 ++++++++----- .../auth/dbauth/LocalCredentialsProvider.java | 5 ++ .../io/jdbc/iowrapper/JdbcIoWrapper.java | 8 ++- .../transforms/ReadWithUniformPartitions.java | 47 +++++++++++++++--- .../v2/templates/PipelineController.java | 3 +- .../options/OptionsToConfigBuilderTest.java | 25 ++++++++-- .../io/jdbc/iowrapper/JdbcIoWrapperTest.java | 10 ++++ .../ReadWithUniformPartitionsTest.java | 49 +++++++++++++++++++ 9 files changed, 162 insertions(+), 26 deletions(-) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java index 7c8ddb5b7c..57f12abc2d 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java @@ -76,7 +76,8 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults( jdbcDriverJars, maxConnections, numPartitions, - waitOn); + waitOn, + options.getFetchSize()); } public static JdbcIOWrapperConfig getJdbcIOWrapperConfig( @@ -95,7 +96,8 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig( String jdbcDriverJars, long maxConnections, Integer numPartitions, - Wait.OnSignal waitOn) { + Wait.OnSignal waitOn, + Integer fetchSize) { JdbcIOWrapperConfig.Builder builder = builderWithDefaultsFor(sqlDialect); SourceSchemaReference sourceSchemaReference = sourceSchemaReferenceFrom(sqlDialect, dbName, namespace); @@ -149,6 +151,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig( builder.setMaxPartitions(numPartitions); builder = builder.setTables(ImmutableList.copyOf(tables)); + builder = builder.setMaxFetchSize(fetchSize); return builder.build(); } diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java index d9321df03e..428010f91e 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java @@ -117,8 +117,20 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setNumPartitions(Integer value); - @TemplateParameter.Text( + @TemplateParameter.Integer( order = 9, + optional = true, + description = "The number of rows to fetch per page read for JDBC source.", + helpText = + "The number of rows to fetch per page read for JDBC source. If not set, the default of JdbcIO of 50_000 rows gets used." + + " This ultimately translated to Statement.setFetchSize call at Jdbc layer. It should ONLY be used if the default value throws memory errors." + + "In case you are using MySql source, please also note that FetchSize is ignored by the connector unless, you also set `useCursorFetch=true` as a connection property either in the source URL or the shard config file") + Integer getFetchSize(); + + void setFetchSize(Integer value); + + @TemplateParameter.Text( + order = 10, groupName = "Target", description = "Cloud Spanner Instance Id.", helpText = "The destination Cloud Spanner instance.") @@ -127,7 +139,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setInstanceId(String value); @TemplateParameter.Text( - order = 10, + order = 11, groupName = "Target", regexes = {"^[a-z]([a-z0-9_-]{0,28})[a-z0-9]$"}, description = "Cloud Spanner Database Id.", @@ -137,7 +149,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setDatabaseId(String value); @TemplateParameter.ProjectId( - order = 11, + order = 12, groupName = "Target", description = "Cloud Spanner Project Id.", helpText = "This is the name of the Cloud Spanner project.") @@ -146,7 +158,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setProjectId(String projectId); @TemplateParameter.Text( - order = 12, + order = 13, optional = true, description = "Cloud Spanner Endpoint to call", helpText = "The Cloud Spanner endpoint to call in the template.", @@ -157,7 +169,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setSpannerHost(String value); @TemplateParameter.Integer( - order = 13, + order = 14, optional = true, description = "Maximum number of connections to Source database per worker", helpText = @@ -169,7 +181,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setMaxConnections(Integer value); @TemplateParameter.GcsReadFile( - order = 14, + order = 15, optional = true, description = "Session File Path in Cloud Storage, to provide mapping information in the form of a session file", @@ -182,7 +194,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setSessionFilePath(String value); @TemplateParameter.GcsReadFile( - order = 15, + order = 16, description = "Output directory for failed/skipped/filtered events", helpText = "This directory is used to dump the failed/skipped/filtered records in a migration.") @@ -191,7 +203,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setOutputDirectory(String value); @TemplateParameter.GcsReadFile( - order = 16, + order = 17, optional = true, description = "Custom jar location in Cloud Storage", helpText = @@ -202,7 +214,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setTransformationJarPath(String value); @TemplateParameter.Text( - order = 17, + order = 18, optional = true, description = "Custom class name", helpText = @@ -214,7 +226,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setTransformationClassName(String value); @TemplateParameter.Text( - order = 18, + order = 19, optional = true, description = "Custom parameters for transformation", helpText = @@ -225,7 +237,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { void setTransformationCustomParameters(String value); @TemplateParameter.Text( - order = 19, + order = 20, optional = true, description = "Namespace", helpText = diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/auth/dbauth/LocalCredentialsProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/auth/dbauth/LocalCredentialsProvider.java index e1aeff804a..aa33138144 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/auth/dbauth/LocalCredentialsProvider.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/auth/dbauth/LocalCredentialsProvider.java @@ -43,6 +43,11 @@ public static Builder builder() { return new AutoValue_LocalCredentialsProvider.Builder(); } + @Override + public String toString() { + return "LocalCredentialsProvider{}"; + } + @AutoValue.Builder public abstract static class Builder { diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java index 2fe9c626e0..d001f8864f 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java @@ -416,6 +416,9 @@ private static PTransform> getJdbcIO( if (tableConfig.maxPartitions() != null) { jdbcIO = jdbcIO.withNumPartitions(tableConfig.maxPartitions()); } + if (config.maxFetchSize() != null) { + jdbcIO = jdbcIO.withFetchSize(config.maxFetchSize()); + } return jdbcIO; } @@ -443,6 +446,7 @@ private static PTransform> getReadWithUniformPart .setDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration)) .setDbAdapter(config.dialectAdapter()) .setApproxTotalRowCount(tableConfig.approxRowCount()) + .setFetchSize(config.maxFetchSize()) .setRowMapper( new JdbcSourceRowMapper( config.valueMappingsProvider(), @@ -463,7 +467,9 @@ private static PTransform> getReadWithUniformPart readWithUniformPartitionsBuilder = readWithUniformPartitionsBuilder.setMaxPartitionsHint((long) tableConfig.maxPartitions()); } - return readWithUniformPartitionsBuilder.build(); + ReadWithUniformPartitions readWithUniformPartitions = readWithUniformPartitionsBuilder.build(); + LOG.info("Configured ReadWithUniformPartitions {} for {}", readWithUniformPartitions, config); + return readWithUniformPartitions; } /** diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitions.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitions.java index 38b92759a3..8941e4d30d 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitions.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitions.java @@ -34,6 +34,8 @@ import javax.sql.DataSource; import org.apache.beam.sdk.io.jdbc.JdbcIO; import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter; +import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadAll; +import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.PTransform; @@ -104,6 +106,14 @@ public abstract class ReadWithUniformPartitions extends PTransform rowMapper(); + /** + * Max fetch size for jdbc read. * If Null, {@link JdbcIO JdbcIO's} default fetch size of 50_000 + * gets used. {@link JdbcIO.Read#withFetchSize(int)} recommends setting this manually only if the + * default value gives out of memory errors. + */ + @Nullable + abstract Integer fetchSize(); + /** * Hint for Maximum number of partitions of the source key space. If not set, it is auto inferred * as 1/10 * sqrt({@link ReadWithUniformPartitions#autoAdjustMaxPartitions()}). Note that if @@ -221,12 +231,34 @@ public PCollection expand(PBegin input) { Reshuffle.viaRandomKey().withNumBuckets(dbParallelizationForReads())) .apply( getTransformName("RangeRead", null), - JdbcIO.readAll() - .withOutputParallelization(false) - .withQuery(dbAdapter().getReadQuery(tableName(), colNames)) - .withParameterSetter(rangePrepareator) - .withDataSourceProviderFn(dataSourceProviderFn()) - .withRowMapper(rowMapper())); + buildJdbcIO( + JdbcIO.readAll(), + dbAdapter().getReadQuery(tableName(), colNames), + rangePrepareator, + dataSourceProviderFn(), + rowMapper(), + fetchSize())); + } + + @VisibleForTesting + protected static JdbcIO.ReadAll buildJdbcIO( + JdbcIO.ReadAll readAll, + String readQuery, + PreparedStatementSetter rangePrepareator, + SerializableFunction dataSourceProviderFn, + RowMapper rowMapper, + Integer fetchSize) { + ReadAll ret = + readAll + .withOutputParallelization(false) + .withQuery(readQuery) + .withParameterSetter(rangePrepareator) + .withDataSourceProviderFn(dataSourceProviderFn) + .withRowMapper(rowMapper); + if (fetchSize != null) { + ret = ret.withFetchSize(fetchSize); + } + return ret; } public static Builder builder() { @@ -234,6 +266,7 @@ public static Builder builder() { .setCountQueryTimeoutMillis(SPLITTER_DEFAULT_COUNT_QUERY_TIMEOUT_MILLIS) .setDbParallelizationForSplitProcess(null) .setDbParallelizationForReads(null) + .setFetchSize(null) .setAutoAdjustMaxPartitions(true); } @@ -451,6 +484,8 @@ public abstract Builder setDataSourceProviderFn( public abstract Builder setRowMapper(JdbcIO.RowMapper value); + public abstract Builder setFetchSize(@Nullable Integer value); + public abstract Builder setAdditionalOperationsOnRanges( @Nullable PTransform>, ?> value); diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java index 637fa0ae23..69277ae89d 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java @@ -294,7 +294,8 @@ public JdbcIOWrapperConfig getJDBCIOWrapperConfig( options.getJdbcDriverJars(), options.getMaxConnections(), options.getNumPartitions(), - waitOnSignal); + waitOnSignal, + options.getFetchSize()); } @Override diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java index 7210706a0b..9ea7010288 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java @@ -66,6 +66,16 @@ public void testConfigWithMySqlDefaultsFromOptions() { assertThat(config.dbAuth().getUserName().get()).isEqualTo(testUser); assertThat(config.dbAuth().getPassword().get()).isEqualTo(testPassword); assertThat(config.waitOn()).isNotNull(); + assertThat(config.maxFetchSize()).isNull(); + sourceDbToSpannerOptions.setFetchSize(42); + assertThat( + OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults( + sourceDbToSpannerOptions, + List.of("table1", "table2"), + null, + Wait.on(dummyPCollection)) + .maxFetchSize()) + .isEqualTo(42); } @Test @@ -89,7 +99,8 @@ public void testConfigWithMySqlUrlFromOptions() { "mysql-jar", 10, 0, - Wait.on(dummyPCollection)); + Wait.on(dummyPCollection), + null); JdbcIOWrapperConfig configWithoutConnectionProperties = OptionsToConfigBuilder.getJdbcIOWrapperConfig( @@ -108,7 +119,8 @@ public void testConfigWithMySqlUrlFromOptions() { "mysql-jar", 10, 0, - Wait.on(dummyPCollection)); + Wait.on(dummyPCollection), + null); assertThat(configWithConnectionProperties.sourceDbURL()) .isEqualTo( @@ -172,7 +184,8 @@ public void testConfigWithPostgreSqlUrlFromOptions() { "mysql-jar", 10, 0, - Wait.on(dummyPCollection)); + Wait.on(dummyPCollection), + null); JdbcIOWrapperConfig configWithoutConnectionParameters = OptionsToConfigBuilder.getJdbcIOWrapperConfig( SQLDialect.POSTGRESQL, @@ -190,7 +203,8 @@ public void testConfigWithPostgreSqlUrlFromOptions() { "mysql-jar", 10, 0, - Wait.on(dummyPCollection)); + Wait.on(dummyPCollection), + null); assertThat(configWithoutConnectionParameters.sourceDbURL()) .isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=public"); assertThat(configWithConnectionParameters.sourceDbURL()) @@ -218,7 +232,8 @@ public void testConfigWithPostgreSqlUrlWithNamespace() { "mysql-jar", 10, 0, - Wait.on(dummyPCollection)); + Wait.on(dummyPCollection), + null); assertThat(configWithNamespace.sourceDbURL()) .isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=mynamespace"); } diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java index 1c8335b4de..da129dc20f 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java @@ -372,6 +372,16 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc assertThat( jdbcIOWrapperWithFeatureEnabled.getTableReaders().values().stream().findFirst().get()) .isInstanceOf(ReadWithUniformPartitions.class); + // We test that setting the fetch size works for both modes. The more detailed testing of the + // fetch size getting applied to JdbcIO is covered in {@link ReadWithUniformPartitionTest} + assertThat( + JdbcIoWrapper.of(configWithFeatureEnabled.toBuilder().setMaxFetchSize(42).build()) + .getTableReaders()) + .hasSize(1); + assertThat( + JdbcIoWrapper.of(configWithFeatureDisabled.toBuilder().setMaxFetchSize(42).build()) + .getTableReaders()) + .hasSize(1); } @Test diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitionsTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitionsTest.java index 73d90a1292..9f58aabdee 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitionsTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/transforms/ReadWithUniformPartitionsTest.java @@ -32,6 +32,12 @@ */ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql.MysqlDialectAdapter; import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql.MysqlDialectAdapter.MySqlVersion; @@ -44,6 +50,7 @@ import java.sql.SQLException; import java.util.Iterator; import javax.sql.DataSource; +import org.apache.beam.sdk.io.jdbc.JdbcIO; import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -233,6 +240,48 @@ public void testMaxPartitionAutoInference() { assertThat(readWithUniformPartitionsLargeRowCount.maxPartitionsHint()).isEqualTo(4472L); } + @Test + public void testBuildJdbc() { + JdbcIO.ReadAll mockReadAll = mock(JdbcIO.ReadAll.class); + String testQuery = "Select *"; + JdbcIO.PreparedStatementSetter mockRangePrepareator = + mock(JdbcIO.PreparedStatementSetter.class); + SerializableFunction mockDataSourceProviderFn = + mock(SerializableFunction.class); + JdbcIO.RowMapper mockRowMapper = mock(RowMapper.class); + Integer testFetchSize = 42; + + when(mockReadAll.withQuery(testQuery)).thenReturn(mockReadAll); + when(mockReadAll.withParameterSetter(mockRangePrepareator)).thenReturn(mockReadAll); + when(mockReadAll.withDataSourceProviderFn(mockDataSourceProviderFn)).thenReturn(mockReadAll); + when(mockReadAll.withOutputParallelization(false)).thenReturn(mockReadAll); + when(mockReadAll.withRowMapper(mockRowMapper)).thenReturn(mockReadAll); + when(mockReadAll.withFetchSize(testFetchSize)).thenReturn(mockReadAll); + + ReadWithUniformPartitions.buildJdbcIO( + mockReadAll, + testQuery, + mockRangePrepareator, + mockDataSourceProviderFn, + mockRowMapper, + null); + // No fetch size set. + verify(mockReadAll, never()).withFetchSize(anyInt()); + ReadWithUniformPartitions.buildJdbcIO( + mockReadAll, + testQuery, + mockRangePrepareator, + mockDataSourceProviderFn, + mockRowMapper, + testFetchSize); + verify(mockReadAll, times(1)).withFetchSize(testFetchSize); + verify(mockReadAll, times(2)).withQuery(testQuery); + verify(mockReadAll, times(2)).withParameterSetter(mockRangePrepareator); + verify(mockReadAll, times(2)).withDataSourceProviderFn(mockDataSourceProviderFn); + verify(mockReadAll, times(2)).withOutputParallelization(false); + verify(mockReadAll, times(2)).withRowMapper(mockRowMapper); + } + @Test public void testMaxPartitionAutoInferencePreConditions() { Range initialRangeWithWrongColumChild =