Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing Jdbc FetchSize Configuration for handling large rows #2028

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
jdbcDriverJars,
maxConnections,
numPartitions,
waitOn);
waitOn,
options.getFetchSize());
}

public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
Expand All @@ -95,7 +96,8 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
String jdbcDriverJars,
long maxConnections,
Integer numPartitions,
Wait.OnSignal<?> waitOn) {
Wait.OnSignal<?> waitOn,
Integer fetchSize) {
JdbcIOWrapperConfig.Builder builder = builderWithDefaultsFor(sqlDialect);
SourceSchemaReference sourceSchemaReference =
sourceSchemaReferenceFrom(sqlDialect, dbName, namespace);
Expand Down Expand Up @@ -149,6 +151,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(

builder.setMaxPartitions(numPartitions);
builder = builder.setTables(ImmutableList.copyOf(tables));
builder = builder.setMaxFetchSize(fetchSize);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,20 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {

void setNumPartitions(Integer value);

@TemplateParameter.Text(
@TemplateParameter.Integer(
order = 9,
optional = true,
description = "The number of rows to fetch per page read for JDBC source.",
helpText =
"The number of rows to fetch per page read for JDBC source. If not set, the default of JdbcIO of 50_000 rows gets used."
+ " This ultimately translated to Statement.setFetchSize call at Jdbc layer. It should ONLY be used if the default value throws memory errors."
+ "In case you are using MySql source, please also note that FetchSize is ignored by the connector unless, you also set `useCursorFetch=true` as a connection property either in the source URL or the shard config file")
Integer getFetchSize();

void setFetchSize(Integer value);

@TemplateParameter.Text(
order = 10,
groupName = "Target",
description = "Cloud Spanner Instance Id.",
helpText = "The destination Cloud Spanner instance.")
Expand All @@ -127,7 +139,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setInstanceId(String value);

@TemplateParameter.Text(
order = 10,
order = 11,
groupName = "Target",
regexes = {"^[a-z]([a-z0-9_-]{0,28})[a-z0-9]$"},
description = "Cloud Spanner Database Id.",
Expand All @@ -137,7 +149,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setDatabaseId(String value);

@TemplateParameter.ProjectId(
order = 11,
order = 12,
groupName = "Target",
description = "Cloud Spanner Project Id.",
helpText = "This is the name of the Cloud Spanner project.")
Expand All @@ -146,7 +158,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setProjectId(String projectId);

@TemplateParameter.Text(
order = 12,
order = 13,
optional = true,
description = "Cloud Spanner Endpoint to call",
helpText = "The Cloud Spanner endpoint to call in the template.",
Expand All @@ -157,7 +169,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setSpannerHost(String value);

@TemplateParameter.Integer(
order = 13,
order = 14,
optional = true,
description = "Maximum number of connections to Source database per worker",
helpText =
Expand All @@ -169,7 +181,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setMaxConnections(Integer value);

@TemplateParameter.GcsReadFile(
order = 14,
order = 15,
optional = true,
description =
"Session File Path in Cloud Storage, to provide mapping information in the form of a session file",
Expand All @@ -182,7 +194,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setSessionFilePath(String value);

@TemplateParameter.GcsReadFile(
order = 15,
order = 16,
description = "Output directory for failed/skipped/filtered events",
helpText =
"This directory is used to dump the failed/skipped/filtered records in a migration.")
Expand All @@ -191,7 +203,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setOutputDirectory(String value);

@TemplateParameter.GcsReadFile(
order = 16,
order = 17,
optional = true,
description = "Custom jar location in Cloud Storage",
helpText =
Expand All @@ -202,7 +214,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setTransformationJarPath(String value);

@TemplateParameter.Text(
order = 17,
order = 18,
optional = true,
description = "Custom class name",
helpText =
Expand All @@ -214,7 +226,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setTransformationClassName(String value);

@TemplateParameter.Text(
order = 18,
order = 19,
optional = true,
description = "Custom parameters for transformation",
helpText =
Expand All @@ -225,7 +237,7 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
void setTransformationCustomParameters(String value);

@TemplateParameter.Text(
order = 19,
order = 20,
optional = true,
description = "Namespace",
helpText =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public static Builder builder() {
return new AutoValue_LocalCredentialsProvider.Builder();
}

/**
 * Returns a fixed textual representation of this provider.
 *
 * <p>The result is always the same literal and does not include any field values.
 * NOTE(review): presumably this keeps provider details out of log output; confirm intent.
 */
@Override
public String toString() {
  final String fixedRepresentation = "LocalCredentialsProvider{}";
  return fixedRepresentation;
}

@AutoValue.Builder
public abstract static class Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ private static PTransform<PBegin, PCollection<SourceRow>> getJdbcIO(
if (tableConfig.maxPartitions() != null) {
jdbcIO = jdbcIO.withNumPartitions(tableConfig.maxPartitions());
}
if (config.maxFetchSize() != null) {
jdbcIO = jdbcIO.withFetchSize(config.maxFetchSize());
}
return jdbcIO;
}

Expand Down Expand Up @@ -443,6 +446,7 @@ private static PTransform<PBegin, PCollection<SourceRow>> getReadWithUniformPart
.setDataSourceProviderFn(JdbcIO.PoolableDataSourceProvider.of(dataSourceConfiguration))
.setDbAdapter(config.dialectAdapter())
.setApproxTotalRowCount(tableConfig.approxRowCount())
.setFetchSize(config.maxFetchSize())
.setRowMapper(
new JdbcSourceRowMapper(
config.valueMappingsProvider(),
Expand All @@ -463,7 +467,9 @@ private static PTransform<PBegin, PCollection<SourceRow>> getReadWithUniformPart
readWithUniformPartitionsBuilder =
readWithUniformPartitionsBuilder.setMaxPartitionsHint((long) tableConfig.maxPartitions());
}
return readWithUniformPartitionsBuilder.build();
ReadWithUniformPartitions readWithUniformPartitions = readWithUniformPartitionsBuilder.build();
LOG.info("Configured ReadWithUniformPartitions {} for {}", readWithUniformPartitions, config);
return readWithUniformPartitions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import javax.sql.DataSource;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter;
import org.apache.beam.sdk.io.jdbc.JdbcIO.ReadAll;
import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -104,6 +106,14 @@ public abstract class ReadWithUniformPartitions<T> extends PTransform<PBegin, PC
*/
abstract JdbcIO.RowMapper<T> rowMapper();

/**
 * Max fetch size for jdbc read. If null, {@link JdbcIO JdbcIO's} default fetch size of 50_000
 * gets used. {@link JdbcIO.Read#withFetchSize(int)} recommends setting this manually only if the
 * default value gives out of memory errors.
 */
@Nullable
abstract Integer fetchSize();

/**
* Hint for Maximum number of partitions of the source key space. If not set, it is auto inferred
* as 1/10 * sqrt({@link ReadWithUniformPartitions#autoAdjustMaxPartitions()}). Note that if
Expand Down Expand Up @@ -221,19 +231,42 @@ public PCollection<T> expand(PBegin input) {
Reshuffle.<Range>viaRandomKey().withNumBuckets(dbParallelizationForReads()))
.apply(
getTransformName("RangeRead", null),
JdbcIO.<Range, T>readAll()
.withOutputParallelization(false)
.withQuery(dbAdapter().getReadQuery(tableName(), colNames))
.withParameterSetter(rangePrepareator)
.withDataSourceProviderFn(dataSourceProviderFn())
.withRowMapper(rowMapper()));
buildJdbcIO(
JdbcIO.<Range, T>readAll(),
dbAdapter().getReadQuery(tableName(), colNames),
rangePrepareator,
dataSourceProviderFn(),
rowMapper(),
fetchSize()));
}

/**
 * Configures the supplied {@link JdbcIO.ReadAll} transform for reading ranges.
 *
 * <p>Output parallelization is turned off, and the query, parameter setter, data source provider
 * and row mapper are applied. The fetch size is applied only when one is supplied.
 *
 * @param readAll the base transform to configure.
 * @param readQuery the query passed to {@code withQuery}.
 * @param rangePrepareator sets the statement parameters for a {@link Range}.
 * @param dataSourceProviderFn provider of the {@link DataSource}.
 * @param rowMapper maps result rows to {@code T}.
 * @param fetchSize fetch size to apply; when {@code null}, {@code withFetchSize} is not called.
 * @return the configured transform.
 */
@VisibleForTesting
protected static <T> JdbcIO.ReadAll<Range, T> buildJdbcIO(
    JdbcIO.ReadAll<Range, T> readAll,
    String readQuery,
    PreparedStatementSetter<Range> rangePrepareator,
    SerializableFunction<Void, DataSource> dataSourceProviderFn,
    RowMapper<T> rowMapper,
    Integer fetchSize) {
  ReadAll<Range, T> configured = readAll.withOutputParallelization(false);
  configured = configured.withQuery(readQuery);
  configured = configured.withParameterSetter(rangePrepareator);
  configured = configured.withDataSourceProviderFn(dataSourceProviderFn);
  configured = configured.withRowMapper(rowMapper);
  // Leave the transform's own fetch size untouched unless the caller supplied one.
  return (fetchSize == null) ? configured : configured.withFetchSize(fetchSize);
}

/**
 * Creates a {@link Builder} pre-populated with defaults.
 *
 * <p>The count query timeout is set to {@code SPLITTER_DEFAULT_COUNT_QUERY_TIMEOUT_MILLIS}. The
 * split-process parallelization, read parallelization and fetch size are set to {@code null}.
 * Auto adjustment of max partitions is enabled.
 *
 * @return a builder with the defaults above applied.
 */
public static <T> Builder<T> builder() {
  Builder<T> withDefaults = new AutoValue_ReadWithUniformPartitions.Builder<T>();
  withDefaults =
      withDefaults.setCountQueryTimeoutMillis(SPLITTER_DEFAULT_COUNT_QUERY_TIMEOUT_MILLIS);
  withDefaults = withDefaults.setDbParallelizationForSplitProcess(null);
  withDefaults = withDefaults.setDbParallelizationForReads(null);
  withDefaults = withDefaults.setFetchSize(null);
  withDefaults = withDefaults.setAutoAdjustMaxPartitions(true);
  return withDefaults;
}

Expand Down Expand Up @@ -451,6 +484,8 @@ public abstract Builder<T> setDataSourceProviderFn(

public abstract Builder<T> setRowMapper(JdbcIO.RowMapper<T> value);

/**
 * Sets the fetch size for the jdbc read.
 *
 * @param value the fetch size; may be {@code null}.
 * @return this builder type, to allow chaining.
 */
public abstract Builder<T> setFetchSize(@Nullable Integer value);

public abstract Builder<T> setAdditionalOperationsOnRanges(
@Nullable PTransform<PCollection<ImmutableList<Range>>, ?> value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ public JdbcIOWrapperConfig getJDBCIOWrapperConfig(
options.getJdbcDriverJars(),
options.getMaxConnections(),
options.getNumPartitions(),
waitOnSignal);
waitOnSignal,
options.getFetchSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public void testConfigWithMySqlDefaultsFromOptions() {
assertThat(config.dbAuth().getUserName().get()).isEqualTo(testUser);
assertThat(config.dbAuth().getPassword().get()).isEqualTo(testPassword);
assertThat(config.waitOn()).isNotNull();
assertThat(config.maxFetchSize()).isNull();
sourceDbToSpannerOptions.setFetchSize(42);
assertThat(
OptionsToConfigBuilder.getJdbcIOWrapperConfigWithDefaults(
sourceDbToSpannerOptions,
List.of("table1", "table2"),
null,
Wait.on(dummyPCollection))
.maxFetchSize())
.isEqualTo(42);
}

@Test
Expand All @@ -89,7 +99,8 @@ public void testConfigWithMySqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);

JdbcIOWrapperConfig configWithoutConnectionProperties =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
Expand All @@ -108,7 +119,8 @@ public void testConfigWithMySqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);

assertThat(configWithConnectionProperties.sourceDbURL())
.isEqualTo(
Expand Down Expand Up @@ -172,7 +184,8 @@ public void testConfigWithPostgreSqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);
JdbcIOWrapperConfig configWithoutConnectionParameters =
OptionsToConfigBuilder.getJdbcIOWrapperConfig(
SQLDialect.POSTGRESQL,
Expand All @@ -190,7 +203,8 @@ public void testConfigWithPostgreSqlUrlFromOptions() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);
assertThat(configWithoutConnectionParameters.sourceDbURL())
.isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=public");
assertThat(configWithConnectionParameters.sourceDbURL())
Expand Down Expand Up @@ -218,7 +232,8 @@ public void testConfigWithPostgreSqlUrlWithNamespace() {
"mysql-jar",
10,
0,
Wait.on(dummyPCollection));
Wait.on(dummyPCollection),
null);
assertThat(configWithNamespace.sourceDbURL())
.isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=mynamespace");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@ public void testReadWithUniformPartitionFeatureFlag() throws RetriableSchemaDisc
assertThat(
jdbcIOWrapperWithFeatureEnabled.getTableReaders().values().stream().findFirst().get())
.isInstanceOf(ReadWithUniformPartitions.class);
// We test that setting the fetch size works for both modes. The more detailed testing of the
// fetch size getting applied to JdbcIO is covered in {@link ReadWithUniformPartitionTest}
assertThat(
JdbcIoWrapper.of(configWithFeatureEnabled.toBuilder().setMaxFetchSize(42).build())
.getTableReaders())
.hasSize(1);
assertThat(
JdbcIoWrapper.of(configWithFeatureDisabled.toBuilder().setMaxFetchSize(42).build())
.getTableReaders())
.hasSize(1);
}

@Test
Expand Down
Loading
Loading