Skip to content

Commit

Permalink
Allowing Jdbc FetchSize Configuration for handling large rows (#2028)
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle authored Nov 28, 2024
1 parent add3dd7 commit c5a74fc
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
jdbcDriverJars,
maxConnections,
numPartitions,
waitOn);
waitOn,
options.getFetchSize());
}

public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
Expand All @@ -95,7 +96,8 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
String jdbcDriverJars,
long maxConnections,
Integer numPartitions,
Wait.OnSignal<?> waitOn) {
Wait.OnSignal<?> waitOn,
Integer fetchSize) {
JdbcIOWrapperConfig.Builder builder = builderWithDefaultsFor(sqlDialect);
SourceSchemaReference sourceSchemaReference =
sourceSchemaReferenceFrom(sqlDialect, dbName, namespace);
Expand Down Expand Up @@ -149,6 +151,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(

builder.setMaxPartitions(numPartitions);
builder = builder.setTables(ImmutableList.copyOf(tables));
builder = builder.setMaxFetchSize(fetchSize);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,20 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {

void setNumPartitions(Integer value);

@TemplateParameter.Text(
@TemplateParameter.Integer(
order = 9,
optional = true,
description = "The number of rows to fetch per page read for JDBC source.",
helpText =
"The number of rows to fetch per page read for JDBC source. If not set, the default of JdbcIO of 50_000 rows gets used."
+ " This ultimately translated to Statement.setFetchSize call at Jdbc layer. It should ONLY be used if the default value throws memory errors."
+ "In case you are using MySql source, please also note that FetchSize is ignored by the connector unless, you also set `useCursorFetch=true` as a connection property either in the source URL or the shard config file")
Integer getFetchSize();

void setFetchSize(Integer value);

@TemplateParameter.Text(
order = 10,
groupName = "Target",
description = "Cloud Spanner Instance Id.",
helpText = "The destination Cloud Spanner instance.")
Expand All @@ -127,7 +139,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setInstanceId(String value);

@TemplateParameter.Text(
order = 10,
order = 11,
groupName = "Target",
regexes = {"^[a-z]([a-z0-9_-]{0,28})[a-z0-9]$"},
description = "Cloud Spanner Database Id.",
Expand All @@ -137,7 +149,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setDatabaseId(String value);

@TemplateParameter.ProjectId(
order = 11,
order = 12,
groupName = "Target",
description = "Cloud Spanner Project Id.",
helpText = "This is the name of the Cloud Spanner project.")
Expand All @@ -146,7 +158,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setProjectId(String projectId);

@TemplateParameter.Text(
order = 12,
order = 13,
optional = true,
description = "Cloud Spanner Endpoint to call",
helpText = "The Cloud Spanner endpoint to call in the template.",
Expand All @@ -157,7 +169,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setSpannerHost(String value);

@TemplateParameter.Integer(
order = 13,
order = 14,
optional = true,
description = "Maximum number of connections to Source database per worker",
helpText =
Expand All @@ -169,7 +181,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setMaxConnections(Integer value);

@TemplateParameter.GcsReadFile(
order = 14,
order = 15,
optional = true,
description =
"Session File Path in Cloud Storage, to provide mapping information in the form of a session file",
Expand All @@ -182,7 +194,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setSessionFilePath(String value);

@TemplateParameter.GcsReadFile(
order = 15,
order = 16,
description = "Output directory for failed/skipped/filtered events",
helpText =
"This directory is used to dump the failed/skipped/filtered records in a migration.")
Expand All @@ -191,7 +203,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setOutputDirectory(String value);

@TemplateParameter.GcsReadFile(
order = 16,
order = 17,
optional = true,
description = "Custom jar location in Cloud Storage",
helpText =
Expand All @@ -202,7 +214,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setTransformationJarPath(String value);

@TemplateParameter.Text(
order = 17,
order = 18,
optional = true,
description = "Custom class name",
helpText =
Expand All @@ -214,7 +226,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setTransformationClassName(String value);

@TemplateParameter.Text(
order = 18,
order = 19,
optional = true,
description = "Custom parameters for transformation",
helpText =
Expand All @@ -225,7 +237,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setTransformationCustomParameters(String value);

@TemplateParameter.Text(
order = 19,
order = 20,
optional = true,
description = "Namespace",
helpText =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public static Builder builder() {
return new AutoValue_LocalCredentialsProvider.Builder();
}

@Override
public String toString() {
return "LocalCredentialsProvider{}";
}

@AutoValue.Builder
public abstract static class Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
if (tableConfig.maxPartitions() != null) {
jdbcIO = jdbcIO.withNumPartitions(tableConfig.maxPartitions());
}
if (config.maxFetchSize() != null) {
jdbcIO = jdbcIO.withFetchSize(config.maxFetchSize());
}
return jdbcIO;
}

Expand Down Expand Up @@ -443,6 +446,7 @@ private static PTransform<PBegin, PCollection<SourceRow>> getReadWithUniformPart
.setDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration))
.setDbAdapter(config.dialectAdapter())
.setApproxTotalRowCount(tableConfig.approxRowCount())
.setFetchSize(config.maxFetchSize())
.setRowMapper(
new JdbcSourceRowMapper(
config.valueMappingsProvider(),
Expand All @@ -463,7 +467,9 @@ private static PTransform<PBegin, PCollection<SourceRow>> getReadWithUniformPart
readWithUniformPartitionsBuilder =
readWithUniformPartitionsBuilder.setMaxPartitionsHint((long) tableConfig.maxPartitions());
}
return readWithUniformPartitionsBuilder.build();
ReadWithUniformPartitions readWithUniformPartitions = readWithUniformPartitionsBuilder.build();
LOG.info("Configured ReadWithUniformPartitions {} for {}", readWithUniformPartitions, config);
return readWithUniformPartitions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import javax.sql.DataSource;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter;
import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadAll;
import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -104,6 +106,14 @@ public abstract class ReadWithUniformPartitions<T> extends PTransform<PBegin, PC
*/
abstract JdbcIO.RowMapper<T> rowMapper();

/**
* Max fetch size for jdbc read. * If Null, {@link JdbcIO JdbcIO's} default fetch size of 50_000
* gets used. {@link JdbcIO.Read#withFetchSize(int)} recommends setting this manually only if the
* default value gives out of memory errors.
*/
@Nullable
abstract Integer fetchSize();

/**
* Hint for Maximum number of partitions of the source key space. If not set, it is auto inferred
* as 1/10 * sqrt({@link ReadWithUniformPartitions#autoAdjustMaxPartitions()}). Note that if
Expand Down Expand Up @@ -221,19 +231,42 @@ public PCollection<T> expand(PBegin input) {
Reshuffle.<Range>viaRandomKey().withNumBuckets(dbParallelizationForReads()))
.apply(
getTransformName("RangeRead", null),
JdbcIO.<Range, T>readAll()
.withOutputParallelization(false)
.withQuery(dbAdapter().getReadQuery(tableName(), colNames))
.withParameterSetter(rangePrepareator)
.withDataSourceProviderFn(dataSourceProviderFn())
.withRowMapper(rowMapper()));
buildJdbcIO(
JdbcIO.<Range, T>readAll(),
dbAdapter().getReadQuery(tableName(), colNames),
rangePrepareator,
dataSourceProviderFn(),
rowMapper(),
fetchSize()));
}

@VisibleForTesting
protected static <T> JdbcIO.ReadAll<Range, T> buildJdbcIO(
JdbcIO.ReadAll<Range, T> readAll,
String readQuery,
PreparedStatementSetter<Range> rangePrepareator,
SerializableFunction<Void, DataSource> dataSourceProviderFn,
RowMapper<T> rowMapper,
Integer fetchSize) {
ReadAll<Range, T> ret =
readAll
.withOutputParallelization(false)
.withQuery(readQuery)
.withParameterSetter(rangePrepareator)
.withDataSourceProviderFn(dataSourceProviderFn)
.withRowMapper(rowMapper);
if (fetchSize != null) {
ret = ret.withFetchSize(fetchSize);
}
return ret;
}

public static <T> Builder<T> builder() {
return new AutoValue_ReadWithUniformPartitions.Builder<T>()
.setCountQueryTimeoutMillis(SPLITTER_DEFAULT_COUNT_QUERY_TIMEOUT_MILLIS)
.setDbParallelizationForSplitProcess(null)
.setDbParallelizationForReads(null)
.setFetchSize(null)
.setAutoAdjustMaxPartitions(true);
}

Expand Down Expand Up @@ -451,6 +484,8 @@ public abstract Builder<T> setDataSourceProviderFn(

public abstract Builder<T> setRowMapper(JdbcIO.RowMapper<T> value);

public abstract Builder<T> setFetchSize(@Nullable Integer value);

public abstract Builder<T> setAdditionalOperationsOnRanges(
@Nullable PTransform<PCollection<ImmutableList<Range>>, ?> value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public JdbcIOWrapperConfig getJDBCIOWrapperConfig(
options.getJdbcDriverJars(),
options.getMaxConnections(),
options.getNumPartitions(),
waitOnSignal);
waitOnSignal,
options.getFetchSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public void testConfigWithMySqlDefaultsFromOptions() {
assertThat(config.dbAuth().getUserName().get()).isEqualTo(testUser);
assertThat(config.dbAuth().getPassword().get()).isEqualTo(testPassword);
assertThat(config.waitOn()).isNotNull();
assertThat(config.maxFetchSize()).isNull();
sourceDbToSpannerOptions.setFetchSize(42);
assertThat(
OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults(
sourceDbToSpannerOptions,
List.of("table1", "table2"),
null,
Wait.on(dummyPCollection))
.maxFetchSize())
.isEqualTo(42);
}

@Test
Expand All @@ -89,7 +99,8 @@ public void testConfigWithMySqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);

JdbcIOWrapperConfig configWithoutConnectionProperties =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
Expand All @@ -108,7 +119,8 @@ public void testConfigWithMySqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);

assertThat(configWithConnectionProperties.sourceDbURL())
.isEqualTo(
Expand Down Expand Up @@ -172,7 +184,8 @@ public void testConfigWithPostgreSqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);
JdbcIOWrapperConfig configWithoutConnectionParameters =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
SQLDialect.POSTGRESQL,
Expand All @@ -190,7 +203,8 @@ public void testConfigWithPostgreSqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);
assertThat(configWithoutConnectionParameters.sourceDbURL())
.isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=public");
assertThat(configWithConnectionParameters.sourceDbURL())
Expand Down Expand Up @@ -218,7 +232,8 @@ public void testConfigWithPostgreSqlUrlWithNamespace() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);
assertThat(configWithNamespace.sourceDbURL())
.isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=mynamespace");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc
assertThat(
jdbcIOWrapperWithFeatureEnabled.getTableReaders().values().stream().findFirst().get())
.isInstanceOf(ReadWithUniformPartitions.class);
// We test that setting the fetch size works for both modes. The more detailed testing of the
// fetch size getting applied to JdbcIO is covered in {@link ReadWithUniformPartitionTest}
assertThat(
JdbcIoWrapper.of(configWithFeatureEnabled.toBuilder().setMaxFetchSize(42).build())
.getTableReaders())
.hasSize(1);
assertThat(
JdbcIoWrapper.of(configWithFeatureDisabled.toBuilder().setMaxFetchSize(42).build())
.getTableReaders())
.hasSize(1);
}

@Test
Expand Down
Loading

0 comments on commit c5a74fc

Please sign in to comment.