Skip to content

Commit

Permalink
Add support for parallel exports (#52)
Browse files Browse the repository at this point in the history
* add draft support for parallel exports

* separate functions for split queries

* fix doc

* fix logic and add test cases

* add check for -ve parallelism

* refactor metering for parallel exports

* check result set

* rename to query parallelism to not conflict with beam's targetParallelism

* fix record gauge reporting

* change order of e2e test

* fix metering for parallel queries

* add e2e test for parallel query
  • Loading branch information
anish749 authored and labianchin committed Apr 16, 2019
1 parent e98ee90 commit ae3b30f
Show file tree
Hide file tree
Showing 13 changed files with 459 additions and 101 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ target
.repl
*.swp
.DS_Store
e2e/results/*
180 changes: 165 additions & 15 deletions dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* -\-\-
* DBeam Core
* --
* Copyright (C) 2016 - 2018 Spotify AB
* Copyright (C) 2016 - 2019 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,21 +20,27 @@

package com.spotify.dbeam.args;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.joda.time.DateTime;
import org.joda.time.Days;
import org.joda.time.LocalDate;
import org.joda.time.ReadablePeriod;

/**
* A POJO describing a how to create a JDBC {@link Connection}.
* A POJO describing how to create queries for DBeam exports.
*/
@AutoValue
public abstract class QueryBuilderArgs implements Serializable {
Expand All @@ -49,6 +55,10 @@ public abstract class QueryBuilderArgs implements Serializable {

public abstract ReadablePeriod partitionPeriod();

public abstract Optional<String> splitColumn();

public abstract Optional<Integer> queryParallelism();

public abstract Builder builder();

@AutoValue.Builder
Expand All @@ -70,6 +80,15 @@ public abstract static class Builder {

public abstract Builder setPartitionPeriod(ReadablePeriod partitionPeriod);

public abstract Builder setSplitColumn(String splitColumn);

public abstract Builder setSplitColumn(Optional<String> splitColumn);

public abstract Builder setQueryParallelism(Integer parallelism);

public abstract Builder setQueryParallelism(Optional<Integer> queryParallelism);


public abstract QueryBuilderArgs build();
}

Expand All @@ -78,29 +97,160 @@ private static Boolean checkTableName(String tableName) {
}

public static QueryBuilderArgs create(String tableName) {
Preconditions.checkArgument(tableName != null,
"TableName cannot be null");
Preconditions.checkArgument(checkTableName(tableName),
"'table' must follow [a-zA-Z_][a-zA-Z0-9_]*");
checkArgument(tableName != null,
"TableName cannot be null");
checkArgument(checkTableName(tableName),
"'table' must follow [a-zA-Z_][a-zA-Z0-9_]*");
return new AutoValue_QueryBuilderArgs.Builder()
.setTableName(tableName)
.setPartitionPeriod(Days.ONE)
.build();
}

public Iterable<String> buildQueries() {
/**
* Create queries to be executed for the export job.
*
* @param connection A connection which is used to determine limits for parallel queries.
* @return A list of queries to be executed.
* @throws SQLException when it fails to find out limits for splits.
*/
public Iterable<String> buildQueries(Connection connection)
throws SQLException {
checkArgument(!queryParallelism().isPresent() || splitColumn().isPresent(),
"Cannot use queryParallelism because no column to split is specified. "
+ "Please specify column to use for splitting using --splitColumn");
checkArgument(queryParallelism().isPresent() || !splitColumn().isPresent(),
"argument splitColumn has no effect since --queryParallelism is not specified");
queryParallelism().ifPresent(p -> checkArgument(p > 0,
"Query Parallelism must be a positive number. Specified queryParallelism was %s", p));

final String limit = this.limit().map(l -> String.format(" LIMIT %d", l)).orElse("");
final String where = this.partitionColumn().flatMap(

final String partitionCondition = this.partitionColumn().flatMap(
partitionColumn ->
this.partition().map(partition -> {
final LocalDate datePartition = partition.toLocalDate();
final String nextPartition = datePartition.plus(partitionPeriod()).toString();
return String.format(" WHERE %s >= '%s' AND %s < '%s'",
partitionColumn, datePartition, partitionColumn, nextPartition);
return String.format(" AND %s >= '%s' AND %s < '%s'",
partitionColumn, datePartition, partitionColumn, nextPartition);
})
).orElse("");
return Lists.newArrayList(
String.format("SELECT * FROM %s%s%s", this.tableName(), where, limit));

if (queryParallelism().isPresent() && splitColumn().isPresent()) {

long[] minMax = findInputBounds(connection, this.tableName(), partitionCondition,
splitColumn().get());
long min = minMax[0];
long max = minMax[1];

String queryPrefix = String
.format("SELECT * FROM %s WHERE 1=1%s", this.tableName(), partitionCondition);

return queriesForBounds(min, max, queryParallelism().get(), queryPrefix);
} else {
return Lists.newArrayList(
String.format("SELECT * FROM %s WHERE 1=1%s%s", this.tableName(), partitionCondition,
limit));
}
}

/**
* Helper function which finds the min and max limits for the given split column with the
* partition conditions.
*
* @return A long array of two elements, with [0] being min and [1] being max.
* @throws SQLException when there is an exception retrieving the max and min fails.
*/
private long[] findInputBounds(Connection connection, String tableName, String partitionCondition,
String splitColumn)
throws SQLException {
// Generate queries to get limits of split column.
String query = String.format(
"SELECT min(%s) as min_s, max(%s) as max_s FROM %s WHERE 1=1%s",
splitColumn,
splitColumn,
tableName,
partitionCondition);
long min;
long max;
try (Statement statement = connection.createStatement()) {
final ResultSet
resultSet =
statement.executeQuery(query);
// Check and make sure we have a record. This should ideally succeed always.
checkState(resultSet.next(), "Result Set for Min/Max returned zero records");

// min_s and max_s would both of the same type
switch (resultSet.getMetaData().getColumnType(1)) {
case Types.LONGVARBINARY:
case Types.BIGINT:
case Types.INTEGER:
min = resultSet.getLong("min_s");
max = resultSet.getLong("max_s");
break;
default:
throw new IllegalArgumentException("splitColumn should be of type Integer / Long");
}
}

return new long[]{min, max};
}

/**
* Given a min, max and expected queryParallelism, generate all required queries that should be
* executed.
*/
protected Iterable<String> queriesForBounds(long min, long max, final int parallelism,
String queryPrefix) {
// We try not to generate more than queryParallelism. Hence we don't want to loose number by
// rounding down. Also when queryParallelism is higher than max - min, we don't want 0 queries
long bucketSize = (long) Math.ceil((double) (max - min) / (double) parallelism);
bucketSize =
bucketSize == 0 ? 1 : bucketSize; // If max and min is same, we export only 1 query
String limitWithParallelism = this.limit()
.map(l -> String.format(" LIMIT %d", l / parallelism)).orElse("");
List<String> queries = new ArrayList<>(parallelism);

String parallelismCondition;
long i = min;
while (i + bucketSize < max) {

// Include lower bound and exclude the upper bound.
parallelismCondition =
String.format(" AND %s >= %s AND %s < %s",
splitColumn().get(),
i,
splitColumn().get(),
i + bucketSize);
queries.add(String
.format("%s%s%s", queryPrefix,
parallelismCondition,
limitWithParallelism));
i = i + bucketSize;
}

// Add last query
if (i + bucketSize >= max) {
// If bucket size exceeds max, we must use max and the predicate
// should include upper bound.
parallelismCondition =
String.format(" AND %s >= %s AND %s <= %s",
splitColumn().get(),
i,
splitColumn().get(),
max);
queries.add(String
.format("%s%s%s", queryPrefix,
parallelismCondition,
limitWithParallelism));
}

// If queryParallelism is higher than max-min, this will generate less queries.
// But lets never generate more queries.
checkState(queries.size() <= parallelism,
"Unable to generate expected number of queries for given min max.");

return queries;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private ResultSet executeQuery(String query) throws Exception {
"jdbcavroio : Executing query with fetchSize={} (this might take a few minutes) ...",
statement.getFetchSize());
ResultSet resultSet = statement.executeQuery();
this.metering.finishExecuteQuery(System.currentTimeMillis() - startTime);
this.metering.exposeExecuteQueryMs(System.currentTimeMillis() - startTime);
return resultSet;
}

Expand All @@ -199,15 +199,15 @@ public void write(String query) throws Exception {
final Map<Integer, JdbcAvroRecord.SqlFunction<ResultSet, Object>>
mappings = JdbcAvroRecord.computeAllMappings(resultSet);
final int columnCount = resultSet.getMetaData().getColumnCount();
this.metering.startIterate();
long startMs = metering.startWriteMeter();
while (resultSet.next()) {
final GenericRecord genericRecord = JdbcAvroRecord.convertResultSetIntoAvroRecord(
schema, resultSet, mappings, columnCount);
this.dataFileWriter.append(genericRecord);
this.metering.incrementRecordCount();
}
this.dataFileWriter.sync();
this.metering.finishIterate();
this.metering.exposeWriteElapsedMs(System.currentTimeMillis() - startMs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void incrementRecordCount() {
}
}

public void exposeMetrics(long elapsedMs) {
public void exposeWriteElapsedMs(long elapsedMs) {
logger.info(String.format("jdbcavroio : Read %d rows, took %5.2f seconds",
rowCount, elapsedMs / 1000.0));
this.writeElapsedMs.inc(elapsedMs);
Expand All @@ -88,17 +88,16 @@ public void exposeMetrics(long elapsedMs) {
}
}

public void startIterate() {
this.writeIterateStartTime = System.currentTimeMillis();
public long startWriteMeter() {
long startTs = System.currentTimeMillis();
this.writeIterateStartTime = startTs;
this.rowCount = 0;
return startTs;
}

public void finishIterate() {
exposeMetrics(System.currentTimeMillis() - this.writeIterateStartTime);
}

public void finishExecuteQuery(long elapsed) {
public void exposeExecuteQueryMs(long elapsedMs) {
logger.info(String.format("jdbcavroio : Execute query took %5.2f seconds",
elapsed / 1000.0));
this.executeQueryElapsedMs.inc(elapsed);
elapsedMs / 1000.0));
this.executeQueryElapsedMs.inc(elapsedMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ public void prepareExport() throws Exception {
this.pipeline, jdbcExportArgs);
BeamHelper.saveStringOnSubPath(output, "/_AVRO_SCHEMA.avsc", generatedSchema.toString(true));
final List<String> queries = StreamSupport.stream(
jdbcExportArgs.queryBuilderArgs().buildQueries().spliterator(), false)
jdbcExportArgs
.queryBuilderArgs()
.buildQueries(jdbcExportArgs.createConnection())
.spliterator(),
false)
.collect(Collectors.toList());

for (int i = 0; i < queries.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* -\-\-
* DBeam Core
* --
* Copyright (C) 2016 - 2018 Spotify AB
* Copyright (C) 2016 - 2019 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,15 +21,12 @@
package com.spotify.dbeam.options;

import com.google.common.base.Preconditions;

import com.spotify.dbeam.args.JdbcAvroArgs;
import com.spotify.dbeam.args.JdbcConnectionArgs;
import com.spotify.dbeam.args.JdbcExportArgs;
import com.spotify.dbeam.args.QueryBuilderArgs;

import java.io.IOException;
import java.util.Optional;

import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.DateTime;
import org.joda.time.Days;
Expand Down Expand Up @@ -80,6 +77,8 @@ public static QueryBuilderArgs createQueryArgs(JdbcExportPipelineOptions options
.setPartitionColumn(partitionColumn)
.setPartition(partition)
.setPartitionPeriod(partitionPeriod)
.setSplitColumn(Optional.ofNullable(options.getSplitColumn()))
.setQueryParallelism(Optional.ofNullable(options.getQueryParallelism()))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {
void setPartitionColumn(String value);

@Description("By default, when partition column is not specified, "
+ "fails if partition is too old. Set this flag to ignore this check.")
+ "fails if partition is too old. Set this flag to ignore this check.")
@Default.Boolean(false)
Boolean isSkipPartitionCheck();

void setSkipPartitionCheck(Boolean value);

@Description("The minimum partition required for the job not to fail "
+ "(when partition column is not specified),"
+ "by default `now() - 2*partitionPeriod`.")
+ "(when partition column is not specified),"
+ "by default `now() - 2*partitionPeriod`.")
String getPartitionPeriod();

void setPartitionPeriod(String value);
Expand Down Expand Up @@ -90,4 +90,17 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {
String getAvroCodec();

void setAvroCodec(String value);

@Description(
"Column used to create splits in case of parallel exports. "
+ "Should be used with queryParallelism")
String getSplitColumn();

void setSplitColumn(String value);

@Description(
"Number of queries to run in parallel for exports. Should be used with splitColumn")
Integer getQueryParallelism();

void setQueryParallelism(Integer value);
}
Loading

0 comments on commit ae3b30f

Please sign in to comment.