From b43d2b0d6d0decaeb1ae1381e7e5dd618e56ab43 Mon Sep 17 00:00:00 2001 From: Anish Chakraborty Date: Thu, 28 Mar 2019 09:30:11 +0100 Subject: [PATCH] Add support for parallel exports (#52) * add draft support for parallel exports * separate functions for split queries * fix doc * fix logic and add test cases * add check for -ve parallelism * refactor metering for parallel exports * check result set * rename to query parallelism to not conflict with beam's targetParallelism * fix record gauge reporting * change order of e2e test * fix metering for parallel queries * add e2e test for parallel query --- .gitignore | 1 + .../spotify/dbeam/args/QueryBuilderArgs.java | 180 +++++++++++++++-- .../com/spotify/dbeam/avro/JdbcAvroIO.java | 6 +- .../spotify/dbeam/avro/JdbcAvroMetering.java | 19 +- .../com/spotify/dbeam/jobs/JdbcAvroJob.java | 6 +- .../dbeam/options/JdbcExportArgsFactory.java | 7 +- .../options/JdbcExportPipelineOptions.java | 19 +- .../com/spotify/dbeam/JdbcTestFixtures.scala | 9 +- .../dbeam/args/JdbcExportArgsTest.scala | 186 +++++++++++++----- .../dbeam/args/ParallelQueriesTest.scala | 105 ++++++++++ .../dbeam/avro/JdbcAvroRecordTest.scala | 13 +- .../spotify/dbeam/jobs/JdbcAvroJobTest.scala | 4 +- e2e/e2e.sh | 5 +- 13 files changed, 459 insertions(+), 101 deletions(-) create mode 100644 dbeam-core/src/test/scala/com/spotify/dbeam/args/ParallelQueriesTest.scala diff --git a/.gitignore b/.gitignore index badb7642..1a5daa2f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ target .repl *.swp .DS_Store +e2e/results/* diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java b/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java index 81465177..b8e9f600 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/args/QueryBuilderArgs.java @@ -2,7 +2,7 @@ * -\-\- * DBeam Core * -- - * Copyright (C) 2016 - 2018 Spotify AB + * Copyright (C) 2016 - 2019 Spotify AB * -- * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,21 +20,27 @@ package com.spotify.dbeam.args; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; - import java.io.Serializable; import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; - import org.joda.time.DateTime; import org.joda.time.Days; import org.joda.time.LocalDate; import org.joda.time.ReadablePeriod; /** - * A POJO describing a how to create a JDBC {@link Connection}. + * A POJO describing how to create queries for DBeam exports. 
*/
 @AutoValue
 public abstract class QueryBuilderArgs implements Serializable {
@@ -49,6 +55,10 @@ public abstract class QueryBuilderArgs implements Serializable {
 
   public abstract ReadablePeriod partitionPeriod();
 
+  public abstract Optional<String> splitColumn();
+
+  public abstract Optional<Integer> queryParallelism();
+
   public abstract Builder builder();
 
   @AutoValue.Builder
@@ -70,6 +80,15 @@ public abstract static class Builder {
 
     public abstract Builder setPartitionPeriod(ReadablePeriod partitionPeriod);
 
+    public abstract Builder setSplitColumn(String splitColumn);
+
+    public abstract Builder setSplitColumn(Optional<String> splitColumn);
+
+    public abstract Builder setQueryParallelism(Integer parallelism);
+
+    public abstract Builder setQueryParallelism(Optional<Integer> queryParallelism);
+
     public abstract QueryBuilderArgs build();
   }
 
@@ -78,29 +97,160 @@ private static Boolean checkTableName(String tableName) {
   }
 
   public static QueryBuilderArgs create(String tableName) {
-    Preconditions.checkArgument(tableName != null,
-        "TableName cannot be null");
-    Preconditions.checkArgument(checkTableName(tableName),
-        "'table' must follow [a-zA-Z_][a-zA-Z0-9_]*");
+    checkArgument(tableName != null,
+        "TableName cannot be null");
+    checkArgument(checkTableName(tableName),
+        "'table' must follow [a-zA-Z_][a-zA-Z0-9_]*");
     return new AutoValue_QueryBuilderArgs.Builder()
         .setTableName(tableName)
         .setPartitionPeriod(Days.ONE)
         .build();
   }
 
-  public Iterable<String> buildQueries() {
+  /**
+   * Create queries to be executed for the export job.
+   *
+   * @param connection A connection which is used to determine limits for parallel queries.
+   * @return A list of queries to be executed.
+   * @throws SQLException when it fails to determine the limits for the splits.
+   */
+  public Iterable<String> buildQueries(Connection connection)
+      throws SQLException {
+    checkArgument(!queryParallelism().isPresent() || splitColumn().isPresent(),
+        "Cannot use queryParallelism because no column to split is specified. "
+            + "Please specify column to use for splitting using --splitColumn");
+    checkArgument(queryParallelism().isPresent() || !splitColumn().isPresent(),
+        "argument splitColumn has no effect since --queryParallelism is not specified");
+    queryParallelism().ifPresent(p -> checkArgument(p > 0,
+        "Query Parallelism must be a positive number. Specified queryParallelism was %s", p));
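+
+    // Illustrative example (table and values are hypothetical, not from this change): with
+    // --splitColumn=id and --queryParallelism=2 over an id range of [0, 10], this method
+    // returns two range queries:
+    //   SELECT * FROM t WHERE 1=1 AND id >= 0 AND id < 5
+    //   SELECT * FROM t WHERE 1=1 AND id >= 5 AND id <= 10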
+
+    final String limit = this.limit().map(l -> String.format(" LIMIT %d", l)).orElse("");
 
-    final String where = this.partitionColumn().flatMap(
+
+    final String partitionCondition = this.partitionColumn().flatMap(
         partitionColumn -> this.partition().map(partition -> {
           final LocalDate datePartition = partition.toLocalDate();
           final String nextPartition = datePartition.plus(partitionPeriod()).toString();
-          return String.format(" WHERE %s >= '%s' AND %s < '%s'",
-              partitionColumn, datePartition, partitionColumn, nextPartition);
+          return String.format(" AND %s >= '%s' AND %s < '%s'",
+              partitionColumn, datePartition, partitionColumn, nextPartition);
         })
     ).orElse("");
-    return Lists.newArrayList(
-        String.format("SELECT * FROM %s%s%s", this.tableName(), where, limit));
+
+    if (queryParallelism().isPresent() && splitColumn().isPresent()) {
+
+      long[] minMax = findInputBounds(connection, this.tableName(), partitionCondition,
+          splitColumn().get());
+      long min = minMax[0];
+      long max = minMax[1];
+
+      String queryPrefix = String
+          .format("SELECT * FROM %s WHERE 1=1%s", this.tableName(), partitionCondition);
+
+      return queriesForBounds(min, max, queryParallelism().get(), queryPrefix);
+    } else {
+      return Lists.newArrayList(
+          String.format("SELECT * FROM %s WHERE 1=1%s%s", this.tableName(), partitionCondition,
+              limit));
+    }
+  }
+
+  /**
+   * Helper function which finds the min and max limits for the given split column under the
+   * partition conditions.
+   *
+   * @return A long array of two elements, with [0] being min and [1] being max.
+   * @throws SQLException when retrieving the min and max fails.
+   */
+  private long[] findInputBounds(Connection connection, String tableName, String partitionCondition,
+      String splitColumn)
+      throws SQLException {
+    // Generate queries to get limits of split column.
+    String query = String.format(
+        "SELECT min(%s) as min_s, max(%s) as max_s FROM %s WHERE 1=1%s",
+        splitColumn,
+        splitColumn,
+        tableName,
+        partitionCondition);
+    long min;
+    long max;
+    try (Statement statement = connection.createStatement()) {
+      final ResultSet resultSet = statement.executeQuery(query);
+      // Check and make sure we have a record. This should ideally always succeed.
+      checkState(resultSet.next(), "Result Set for Min/Max returned zero records");
+
+      // min_s and max_s will both be of the same type
+      switch (resultSet.getMetaData().getColumnType(1)) {
+        case Types.LONGVARBINARY:
+        case Types.BIGINT:
+        case Types.INTEGER:
+          min = resultSet.getLong("min_s");
+          max = resultSet.getLong("max_s");
+          break;
+        default:
+          throw new IllegalArgumentException("splitColumn should be of type Integer / Long");
+      }
+    }
+
+    return new long[]{min, max};
+  }
+
+  /**
+   * Given a min, max and expected queryParallelism, generate all required queries that should be
+   * executed.
+   */
+  protected Iterable<String> queriesForBounds(long min, long max, final int parallelism,
+      String queryPrefix) {
+    // We try not to generate more than queryParallelism queries. Hence we don't want to lose a
+    // bucket by rounding down. Also, when queryParallelism is higher than max - min, we don't
+    // want zero queries.
+    long bucketSize = (long) Math.ceil((double) (max - min) / (double) parallelism);
+    bucketSize = bucketSize == 0 ? 1 : bucketSize; // If max and min are the same, we export a single query
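+    // Worked example (mirrors ParallelQueriesTest): min=100, max=400, parallelism=7 gives
+    // bucketSize = ceil(300 / 7) = 43, i.e. split points 100, 143, 186, 229, 272, 315, 358,
+    // with the final bucket clamped to max=400 and including the upper bound.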
+    String limitWithParallelism = this.limit()
+        .map(l -> String.format(" LIMIT %d", l / parallelism)).orElse("");
+    List<String> queries = new ArrayList<>(parallelism);
+
+    String parallelismCondition;
+    long i = min;
+    while (i + bucketSize < max) {
+
+      // Include the lower bound and exclude the upper bound.
+      parallelismCondition =
+          String.format(" AND %s >= %s AND %s < %s",
+              splitColumn().get(),
+              i,
+              splitColumn().get(),
+              i + bucketSize);
+      queries.add(String
+          .format("%s%s%s", queryPrefix,
+              parallelismCondition,
+              limitWithParallelism));
+      i = i + bucketSize;
+    }
+
+    // Add the last query
+    if (i + bucketSize >= max) {
+      // The last bucket is clamped to max, and its predicate
+      // includes the upper bound.
+      parallelismCondition =
+          String.format(" AND %s >= %s AND %s <= %s",
+              splitColumn().get(),
+              i,
+              splitColumn().get(),
+              max);
+      queries.add(String
+          .format("%s%s%s", queryPrefix,
+              parallelismCondition,
+              limitWithParallelism));
+    }
+
+    // If queryParallelism is higher than max - min, this will generate fewer queries.
+    // But let's never generate more queries than requested.
+    checkState(queries.size() <= parallelism,
+        "Unable to generate expected number of queries for given min max.");
+
+    return queries;
+  }
 }
 diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
index 03aca955..03e3d36a 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
@@ -183,7 +183,7 @@ private ResultSet executeQuery(String query) throws Exception {
         "jdbcavroio : Executing query with fetchSize={} (this might take a few minutes) ...",
         statement.getFetchSize());
     ResultSet resultSet = statement.executeQuery();
-    this.metering.finishExecuteQuery(System.currentTimeMillis() - startTime);
+    this.metering.exposeExecuteQueryMs(System.currentTimeMillis() - startTime);
     return resultSet;
   }
 
@@ -199,7 +199,7 @@ public void write(String query) throws Exception {
       final Map<Integer, JdbcAvroRecord.SqlFunction<ResultSet, Object>> mappings =
          JdbcAvroRecord.computeAllMappings(resultSet);
       final int columnCount = resultSet.getMetaData().getColumnCount();
-      this.metering.startIterate();
+      long startMs = metering.startWriteMeter();
       while (resultSet.next()) {
         final GenericRecord genericRecord = JdbcAvroRecord.convertResultSetIntoAvroRecord(
             schema, resultSet, mappings, columnCount);
@@ -207,7 +207,7 @@ public void write(String query) throws Exception {
         this.metering.incrementRecordCount();
       }
       this.dataFileWriter.sync();
-      this.metering.finishIterate();
+      this.metering.exposeWriteElapsedMs(System.currentTimeMillis() - startMs);
     }
   }
 diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroMetering.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroMetering.java
index 074c7a8c..d4165597 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroMetering.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroMetering.java
@@ -75,7 +75,7 @@ public void incrementRecordCount() {
     }
   }
 
-  public void exposeMetrics(long elapsedMs) {
+  public void exposeWriteElapsedMs(long elapsedMs) {
     logger.info(String.format("jdbcavroio : Read %d rows, took %5.2f seconds",
         rowCount, elapsedMs / 1000.0));
     this.writeElapsedMs.inc(elapsedMs);
@@ -88,17 +88,16 @@ public void exposeMetrics(long elapsedMs) {
     }
   }
 
-  public void startIterate() {
-    this.writeIterateStartTime = System.currentTimeMillis();
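+  // Starts the write meter for one query: resets the per-query row count and returns the
+  // start timestamp, which the caller later feeds back into exposeWriteElapsedMs.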
+  public long startWriteMeter() {
+    long startTs = System.currentTimeMillis();
+    this.writeIterateStartTime = startTs;
+    this.rowCount = 0;
+    return startTs;
   }
 
-  public void finishIterate() {
-    exposeMetrics(System.currentTimeMillis() - this.writeIterateStartTime);
-  }
-
-  public void finishExecuteQuery(long elapsed) {
+  public void exposeExecuteQueryMs(long elapsedMs) {
     logger.info(String.format("jdbcavroio : Execute query took %5.2f seconds",
-        elapsed / 1000.0));
-    this.executeQueryElapsedMs.inc(elapsed);
+        elapsedMs / 1000.0));
+    this.executeQueryElapsedMs.inc(elapsedMs);
   }
 }
 diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java
index 3783dec9..ca35f7a8 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java
@@ -92,7 +92,11 @@ public void prepareExport() throws Exception {
         this.pipeline, jdbcExportArgs);
     BeamHelper.saveStringOnSubPath(output, "/_AVRO_SCHEMA.avsc", generatedSchema.toString(true));
     final List<String> queries = StreamSupport.stream(
-        jdbcExportArgs.queryBuilderArgs().buildQueries().spliterator(), false)
+        jdbcExportArgs
+            .queryBuilderArgs()
+            .buildQueries(jdbcExportArgs.createConnection())
+            .spliterator(),
+        false)
         .collect(Collectors.toList());
     for (int i = 0; i < queries.size(); i++) {
 diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java
index eb99ef69..67f0456c 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java
@@ -2,7 +2,7 @@
  * -\-\-
  * DBeam Core
  * --
- * Copyright (C) 2016 - 2018 Spotify AB
+ * Copyright (C) 2016 - 2019 Spotify AB
  * --
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,15 +21,12 @@ package com.spotify.dbeam.options;
 
 import com.google.common.base.Preconditions;
-
 import com.spotify.dbeam.args.JdbcAvroArgs;
 import com.spotify.dbeam.args.JdbcConnectionArgs;
 import com.spotify.dbeam.args.JdbcExportArgs;
 import com.spotify.dbeam.args.QueryBuilderArgs;
-
 import java.io.IOException;
 import java.util.Optional;
-
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.joda.time.DateTime;
 import org.joda.time.Days;
@@ -80,6 +77,8 @@ public static QueryBuilderArgs createQueryArgs(JdbcExportPipelineOptions options
         .setPartitionColumn(partitionColumn)
         .setPartition(partition)
         .setPartitionPeriod(partitionPeriod)
+        .setSplitColumn(Optional.ofNullable(options.getSplitColumn()))
+        .setQueryParallelism(Optional.ofNullable(options.getQueryParallelism()))
         .build();
   }
 diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java
index f905297b..8debea20 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java
@@ -37,15 +37,15 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {
   void setPartitionColumn(String value);
 
   @Description("By default, when partition column is not specified, "
-      + "fails if partition is too old. 
Set this flag to ignore this check.") @Default.Boolean(false) Boolean isSkipPartitionCheck(); void setSkipPartitionCheck(Boolean value); @Description("The minimum partition required for the job not to fail " - + "(when partition column is not specified)," - + "by default `now() - 2*partitionPeriod`.") + + "(when partition column is not specified)," + + "by default `now() - 2*partitionPeriod`.") String getPartitionPeriod(); void setPartitionPeriod(String value); @@ -90,4 +90,17 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions { String getAvroCodec(); void setAvroCodec(String value); + + @Description( + "Column used to create splits in case of parallel exports. " + + "Should be used with queryParallelism") + String getSplitColumn(); + + void setSplitColumn(String value); + + @Description( + "Number of queries to run in parallel for exports. Should be used with splitColumn") + Integer getQueryParallelism(); + + void setQueryParallelism(Integer value); } diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/JdbcTestFixtures.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/JdbcTestFixtures.scala index 00e4d0e3..9565de94 100644 --- a/dbeam-core/src/test/scala/com/spotify/dbeam/JdbcTestFixtures.scala +++ b/dbeam-core/src/test/scala/com/spotify/dbeam/JdbcTestFixtures.scala @@ -30,16 +30,16 @@ object JdbcTestFixtures { TimeZone.setDefault(TimeZone.getTimeZone("UTC")) type recordType = (String, Option[Int], BigDecimal, Float, Double, - Boolean, Int, Long, Timestamp, Option[Timestamp], Option[Byte], UUID) + Boolean, Int, Long, Timestamp, Option[Timestamp], Option[Byte], UUID, Long) val record1: recordType = ("costa rica caffee", None, BigDecimal("7.20"), (82.5).toFloat, (320.7).toDouble, true, 17, 200L, new java.sql.Timestamp(1488300933000L), None, Option(168.toByte), - UUID.fromString("123e4567-e89b-12d3-a456-426655440000")) + UUID.fromString("123e4567-e89b-12d3-a456-426655440000"), 1) val record2: recordType = ("colombian caffee", None, BigDecimal("9.20"), (87.5).toFloat, (230.7).toDouble, true, 13, 201L, new java.sql.Timestamp(1488300723000L), None, None, - UUID.fromString("123e4567-e89b-a456-12d3-426655440000")) + UUID.fromString("123e4567-e89b-a456-12d3-426655440000"), 2) def createFixtures(db: Database, records: Seq[recordType]): Unit = { class Coffees(tag: slick.jdbc.H2Profile.api.Tag) extends @@ -56,8 +56,9 @@ object JdbcTestFixtures { def updated = column[Option[java.sql.Timestamp]]("UPDATED") def bt = column[Option[Byte]]("BT") def uid = column[UUID]("UID") + def rownum = column[Long]("ROWNUM") def * = (name, supID, price, temperature, size, - isArabic, sales, total, created, updated, bt, uid) + isArabic, sales, total, created, updated, bt, uid, rownum) } val coffee = TableQuery[Coffees] diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/args/JdbcExportArgsTest.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/args/JdbcExportArgsTest.scala index 36b11f0b..50d63a22 100644 --- a/dbeam-core/src/test/scala/com/spotify/dbeam/args/JdbcExportArgsTest.scala +++ b/dbeam-core/src/test/scala/com/spotify/dbeam/args/JdbcExportArgsTest.scala @@ -1,5 +1,5 @@ /* - * Copyright 2017 Spotify AB. + * Copyright 2017-2019 Spotify AB. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,8 +17,10 @@ package com.spotify.dbeam.args +import java.sql.Connection import java.util.Optional +import com.spotify.dbeam.JdbcTestFixtures import com.spotify.dbeam.options.{JdbcExportArgsFactory, JdbcExportPipelineOptions} import org.apache.avro.file.CodecFactory import org.apache.beam.sdk.options.{PipelineOptions, PipelineOptionsFactory} @@ -26,18 +28,28 @@ import org.joda.time.{DateTime, DateTimeZone, Period} import org.junit.runner.RunWith import org.scalatest._ import org.scalatest.junit.JUnitRunner +import slick.jdbc.H2Profile.api._ import scala.collection.JavaConverters._ @RunWith(classOf[JUnitRunner]) class JdbcExportArgsTest extends FlatSpec with Matchers { + private val connectionUrl: String = + "jdbc:h2:mem:test;MODE=PostgreSQL;DATABASE_TO_UPPER=false;DB_CLOSE_DELAY=-1" + private val db = Database.forURL(connectionUrl, driver = "org.h2.Driver") + JdbcTestFixtures.createFixtures(db, Seq(JdbcTestFixtures.record1, JdbcTestFixtures.record2)) + private val connection: Connection = db.source.createConnection() + + private val baseQueryNoConditions = "SELECT * FROM some_table WHERE 1=1" + def optionsFromArgs(cmdLineArgs: String): JdbcExportArgs = { PipelineOptionsFactory.register(classOf[JdbcExportPipelineOptions]) val opts: PipelineOptions = - PipelineOptionsFactory.fromArgs(cmdLineArgs.split(" "):_*).withValidation().create() + PipelineOptionsFactory.fromArgs(cmdLineArgs.split(" "): _*).withValidation().create() JdbcExportArgsFactory.fromPipelineOptions(opts) } + it should "fail on missing table name" in { a[IllegalArgumentException] should be thrownBy { val tableName: String = null @@ -76,7 +88,7 @@ class JdbcExportArgsTest extends FlatSpec with Matchers { QueryBuilderArgs.create("some_table") ) - options should be (expected) + options should be(expected) } it should "fail to parse invalid table parameter" in { a[IllegalArgumentException] should be thrownBy { @@ -135,7 +147,7 @@ class JdbcExportArgsTest extends FlatSpec with Matchers { QueryBuilderArgs.create("some_table") ) - options should be (expected) + options should be(expected) } it should "parse correctly for mysql connection" in { val options = optionsFromArgs("--connectionUrl=jdbc:mysql://some_db --table=some_table " + @@ -150,7 +162,7 @@ class JdbcExportArgsTest extends FlatSpec with Matchers { ), QueryBuilderArgs.create("some_table") ) - options should be (expected) + options should be(expected) } it should "configure username" in { val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + @@ -165,103 +177,141 @@ class JdbcExportArgsTest extends FlatSpec with Matchers { QueryBuilderArgs.create("some_table") ) - options should be (expected) + options should be(expected) } it should "configure limit" in { val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + "--table=some_table --password=secret --limit=7").queryBuilderArgs() val expected = QueryBuilderArgs.create("some_table") - .builder().setLimit(7).build() - actual should be (expected) - actual.buildQueries().asScala should - contain theSameElementsAs Seq("SELECT * FROM some_table LIMIT 7") + .builder().setLimit(7).build() + actual should be(expected) + actual.buildQueries(connection).asScala should + contain theSameElementsAs Seq(s"$baseQueryNoConditions LIMIT 7") } it should "configure partition" in { val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + "--table=some_table --password=secret --partition=2027-07-31").queryBuilderArgs() val expected = QueryBuilderArgs.create("some_table") - 
.builder().setPartition(new DateTime(2027, 7, 31, 0, 0, DateTimeZone.UTC)).build() - actual should be (expected) - actual.buildQueries().asScala should - contain theSameElementsAs Seq("SELECT * FROM some_table") + .builder().setPartition(new DateTime(2027, 7, 31, 0, 0, DateTimeZone.UTC)).build() + actual should be(expected) + actual.buildQueries(connection).asScala should + contain theSameElementsAs Seq(baseQueryNoConditions) } it should "configure partition with full ISO date time (Styx cron syntax)" in { val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " + "--password=secret --partition=2027-07-31T13:37:59Z").queryBuilderArgs() val expected = QueryBuilderArgs.create("some_table") - .builder().setPartition(new DateTime(2027, 7, 31, 13, 37, 59, DateTimeZone.UTC)).build() - actual should be (expected) - actual.buildQueries().asScala should - contain theSameElementsAs Seq("SELECT * FROM some_table") + .builder().setPartition(new DateTime(2027, 7, 31, 13, 37, 59, DateTimeZone.UTC)).build() + actual should be(expected) + actual.buildQueries(connection).asScala should + contain theSameElementsAs Seq(baseQueryNoConditions) } it should "configure partition with month date (Styx monthly schedule)" in { val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + "--table=some_table --password=secret --partition=2027-05").queryBuilderArgs() val expected = QueryBuilderArgs.create("some_table") - .builder().setPartition(new DateTime(2027, 5, 1, 0, 0, 0, DateTimeZone.UTC)).build() - actual should be (expected) - actual.buildQueries().asScala should - contain theSameElementsAs Seq("SELECT * FROM some_table") + .builder().setPartition(new DateTime(2027, 5, 1, 0, 0, 0, DateTimeZone.UTC)).build() + actual should be(expected) + actual.buildQueries(connection).asScala should + contain theSameElementsAs Seq(baseQueryNoConditions) } it should "configure partition column" in { val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " + "--password=secret --partition=2027-07-31 --partitionColumn=col").queryBuilderArgs() val expected = QueryBuilderArgs.create("some_table") - .builder() - .setPartitionColumn("col") - .setPartition(new DateTime(2027, 7, 31, 0, 0, 0, DateTimeZone.UTC)).build() - actual should be (expected) - actual.buildQueries().asScala should - contain theSameElementsAs Seq("SELECT * FROM some_table " + - "WHERE col >= '2027-07-31' AND col < '2027-08-01'") + .builder() + .setPartitionColumn("col") + .setPartition(new DateTime(2027, 7, 31, 0, 0, 0, DateTimeZone.UTC)).build() + actual should be(expected) + actual.buildQueries(connection).asScala should + contain theSameElementsAs Seq(s"$baseQueryNoConditions " + + "AND col >= '2027-07-31' AND col < '2027-08-01'") } it should "configure partition column and limit" in { val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " + "--password=secret --partition=2027-07-31 --partitionColumn=col --limit=5").queryBuilderArgs() val expected = QueryBuilderArgs.create("some_table") - .builder() - .setLimit(5) - .setPartitionColumn("col") - .setPartition(new DateTime(2027, 7, 31, 0, 0, 0, DateTimeZone.UTC)).build() - actual should be (expected) + .builder() + .setLimit(5) + .setPartitionColumn("col") + .setPartition(new DateTime(2027, 7, 31, 0, 0, 0, DateTimeZone.UTC)).build() + actual should be(expected) - actual.buildQueries().asScala should + actual.buildQueries(connection).asScala should contain theSameElementsAs Seq( - "SELECT * 
FROM some_table WHERE col >= '2027-07-31'" +
-        " AND col < '2027-08-01' LIMIT 5")
+        s"$baseQueryNoConditions AND col >= '2027-07-31'" +
+        " AND col < '2027-08-01' LIMIT 5")
   }
   it should "configure partition column and partition period" in {
     val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " +
       "--password=secret --partition=2027-07-31 " +
       "--partitionColumn=col --partitionPeriod=P1M").queryBuilderArgs()
     val expected = QueryBuilderArgs.create("some_table")
-      .builder()
-      .setPartitionColumn("col")
-      .setPartitionPeriod(Period.parse("P1M"))
-      .setPartition(new DateTime(2027, 7, 31, 0, 0, 0, DateTimeZone.UTC)).build()
-    actual should be (expected)
-    actual.buildQueries().asScala should
+      .builder()
+      .setPartitionColumn("col")
+      .setPartitionPeriod(Period.parse("P1M"))
+      .setPartition(new DateTime(2027, 7, 31, 0, 0, 0, DateTimeZone.UTC)).build()
+    actual should be(expected)
+    actual.buildQueries(connection).asScala should
+      contain theSameElementsAs Seq(
+        s"$baseQueryNoConditions " +
+          "AND col >= '2027-07-31' AND col < '2027-08-31'")
+  }
+
+  it should "create queries for split column of integer type" in {
+    val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=COFFEES " +
+      "--password=secret --splitColumn=ROWNUM --queryParallelism=5").queryBuilderArgs()
+    val baseCoffeesQueryNoConditions = "SELECT * FROM COFFEES WHERE 1=1"
+    val expected = QueryBuilderArgs.create("COFFEES")
+      .builder()
+      .setSplitColumn("ROWNUM")
+      .setQueryParallelism(5) // We have only two values of ROWNUM but still request a higher parallelism
+      .build()
+    actual should be(expected)
+    actual.buildQueries(connection).asScala should
       contain theSameElementsAs Seq(
-        "SELECT * FROM some_table " +
-        "WHERE col >= '2027-07-31' AND col < '2027-08-31'")
+        s"$baseCoffeesQueryNoConditions " +
+          "AND ROWNUM >= 1 AND ROWNUM <= 2")
   }
+
+  it should "create queries with partition column and split column with queryParallelism" in {
+    val actual = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=COFFEES " +
+      "--password=secret --partitionColumn=created --partition=2027-07-31 --partitionPeriod=P1M " +
+      "--splitColumn=ROWNUM --queryParallelism=5").queryBuilderArgs()
+    val baseCoffeesQueryNoConditions = "SELECT * FROM COFFEES WHERE 1=1 " +
+      "AND created >= '2027-07-31' AND created < '2027-08-31'"
+    val expected = QueryBuilderArgs.create("COFFEES")
+      .builder()
+      .setPartitionColumn("created")
+      .setPartitionPeriod(Period.parse("P1M"))
+      .setPartition(new DateTime(2027, 7, 31, 0, 0, 0, DateTimeZone.UTC))
+      .setSplitColumn("ROWNUM")
+      .setQueryParallelism(5) // We have only two values of ROWNUM but still request a higher parallelism
+      .build()
+    actual should be(expected)
+    actual.buildQueries(connection).asScala should
+      contain theSameElementsAs Seq(
+        s"$baseCoffeesQueryNoConditions " +
+          "AND ROWNUM >= 0 AND ROWNUM <= 0")
+  }
+
   it should "configure avro schema namespace" in {
     val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " +
       "--password=secret --avroSchemaNamespace=ns")
 
-    options.avroSchemaNamespace() should be ("ns")
+    options.avroSchemaNamespace() should be("ns")
   }
   it should "configure avro doc" in {
     val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " +
       "--password=secret --avroDoc=doc")
 
-    options.avroDoc() should be (Optional.of("doc"))
+    options.avroDoc() should be(Optional.of("doc"))
   }
   it should "configure use avro logical types" in {
     val options =
optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " + @@ -273,23 +323,23 @@ class JdbcExportArgsTest extends FlatSpec with Matchers { val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " + "--password=secret --fetchSize=1234") - options.jdbcAvroOptions().fetchSize() should be (1234) + options.jdbcAvroOptions().fetchSize() should be(1234) } it should "configure deflate compression level on avro codec" in { val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " + "--password=secret --avroCodec=deflate7") - options.jdbcAvroOptions().avroCodec() should be ("deflate7") + options.jdbcAvroOptions().avroCodec() should be("deflate7") options.jdbcAvroOptions().getCodecFactory.toString should - be (CodecFactory.deflateCodec(7).toString) + be(CodecFactory.deflateCodec(7).toString) } it should "configure snappy as avro codec" in { val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table " + "--password=secret --avroCodec=snappy") - options.jdbcAvroOptions().avroCodec() should be ("snappy") + options.jdbcAvroOptions().avroCodec() should be("snappy") options.jdbcAvroOptions().getCodecFactory.toString should - be (CodecFactory.snappyCodec().toString) + be(CodecFactory.snappyCodec().toString) } it should "fail on invalid avro codec" in { a[IllegalArgumentException] should be thrownBy { @@ -297,4 +347,36 @@ class JdbcExportArgsTest extends FlatSpec with Matchers { "--table=some_table --password=secret --avroCodec=lzma") } } + + it should "fail on queryParallelism with no split column" in { + a[IllegalArgumentException] should be thrownBy { + optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + + "--table=some_table --password=secret --queryParallelism=10") + .queryBuilderArgs().buildQueries(connection) + } + } + + it should "fail on split column is specified with no queryParallelism" in { + a[IllegalArgumentException] should be thrownBy { + optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + + "--table=some_table --password=secret --splitColumn=id") + .queryBuilderArgs().buildQueries(connection) + } + } + + it should "not accept 0 queryParallelism" in { + a[IllegalArgumentException] should be thrownBy { + optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + + "--table=some_table --password=secret --queryParallelism=0 --splitColumn=id") + .queryBuilderArgs().buildQueries(connection) + } + } + + it should "not accept -ve queryParallelism" in { + a[IllegalArgumentException] should be thrownBy { + optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db " + + "--table=some_table --password=secret --queryParallelism=-5 --splitColumn=id") + .queryBuilderArgs().buildQueries(connection) + } + } } diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/args/ParallelQueriesTest.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/args/ParallelQueriesTest.scala new file mode 100644 index 00000000..4326d597 --- /dev/null +++ b/dbeam-core/src/test/scala/com/spotify/dbeam/args/ParallelQueriesTest.scala @@ -0,0 +1,105 @@ +/* + * Copyright 2017-2019 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.spotify.dbeam.args
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+
+/**
+  * Additional tests for building parallel queries when splitColumn
+  * and queryParallelism are specified.
+  */
+@RunWith(classOf[JUnitRunner])
+class ParallelQueriesTest extends FlatSpec with Matchers {
+
+  // These values are the same for all tests
+  private val queryBuilder = QueryBuilderArgs
+    .create("some_table") // the table name is not used by queriesForBounds
+    .builder()
+    .setSplitColumn("sp")
+    .build()
+  private val queryPrefix = "SEL ... tab"
+
+  private def splitPointsToRanges(splitPoints: Seq[Long]): List[String] = {
+    val paired = splitPoints
+      .sorted
+      .sliding(2)
+      .toList
+
+    paired
+      .reverse
+      .tail
+      .reverse.map(z => s"$queryPrefix AND sp >= ${z(0)} AND sp < ${z(1)}") ++
+      paired
+        .lastOption
+        .map(z => s"$queryPrefix AND sp >= ${z(0)} AND sp <= ${z(1)}") // the last range includes its upper bound
+  }
+
+  it should "build appropriate parallel queries for a given range" in {
+    val actual = queryBuilder.queriesForBounds(100, 400, 3, queryPrefix)
+    val expected = splitPointsToRanges(Seq(100, 200, 300, 400))
+
+    actual.asScala should contain theSameElementsAs expected
+  }
+
+  it should "build appropriate parallel queries for a given range which doesn't divide equally" in {
+    val actual = queryBuilder.queriesForBounds(100, 400, 7, queryPrefix)
+    val expected = splitPointsToRanges(Seq(100, 143, 186, 229, 272, 315, 358, 400))
+
+    actual.asScala should contain theSameElementsAs expected
+  }
+
+  it should "build parallel queries with an equal distribution of the range" in {
+    val actual = queryBuilder.queriesForBounds(100, 400, 6, queryPrefix)
+    val expected = splitPointsToRanges(Seq(100, 150, 200, 250, 300, 350, 400))
+
+    actual.asScala should contain theSameElementsAs expected
+  }
+
+  it should "build 1 query if queryParallelism is more than max-min" in {
+    val actual = queryBuilder.queriesForBounds(1, 2, 5, queryPrefix)
+    val expected = splitPointsToRanges(Seq(1, 2))
+
+    actual.asScala should contain theSameElementsAs expected
+  }
+
+  it should "build 1 query when max and min are the same" in {
+    val actual = queryBuilder.queriesForBounds(1, 1, 5, queryPrefix)
+    val expected = splitPointsToRanges(Seq(1, 1))
+
+    actual.asScala should contain theSameElementsAs expected
+  }
+
+  it should "build 1 query when max and min are the same and queryParallelism is 1" in {
+    val actual = queryBuilder.queriesForBounds(1, 1, 1, queryPrefix)
+    val expected = splitPointsToRanges(Seq(1, 1))
+
+    actual.asScala should contain theSameElementsAs expected
+  }
+
+  it should "build 1 query when queryParallelism is 1" in {
+    val actual = queryBuilder.queriesForBounds(2, 345, 1, queryPrefix)
+    val expected = splitPointsToRanges(Seq(2, 345))
+
+    actual.asScala should contain theSameElementsAs expected
+  }
+
+}
 diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala
index 25631bf6..931f5721 100644
--- 
a/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala +++ b/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala @@ -42,7 +42,7 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll { } it should "create schema" in { - val fieldCount = 12 + val fieldCount = 13 val actual: Schema = JdbcAvroSchema.createSchemaByReadingOneRow( db.source.createConnection(), "coffees", "dbeam_generated", @@ -55,7 +55,7 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll { actual.getFields.size() should be (fieldCount) actual.getFields.asScala.map(_.name()) should be (List("COF_NAME", "SUP_ID", "PRICE", "TEMPERATURE", "SIZE", - "IS_ARABIC", "SALES", "TOTAL", "CREATED", "UPDATED", "BT", "UID")) + "IS_ARABIC", "SALES", "TOTAL", "CREATED", "UPDATED", "BT", "UID", "ROWNUM")) actual.getFields.asScala.map(_.schema().getType) should be (List.fill(fieldCount)(Schema.Type.UNION)) actual.getFields.asScala.map(_.schema().getTypes.get(0).getType) should @@ -74,12 +74,13 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll { actual.getField("UPDATED").schema().getTypes.get(1).getProp("logicalType") should be (null) actual.getField("BT").schema().getTypes.get(1).getType should be (Schema.Type.INT) actual.getField("UID").schema().getTypes.get(1).getType should be (Schema.Type.BYTES) + actual.getField("ROWNUM").schema().getTypes.get(1).getType should be (Schema.Type.LONG) actual.toString shouldNot be (null) actual.getDoc should be ("Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test") } it should "create schema with logical types" in { - val fieldCount = 12 + val fieldCount = 13 val actual: Schema = JdbcAvroSchema.createSchemaByReadingOneRow( db.source.createConnection(), "coffees", "dbeam_generated", @@ -93,7 +94,7 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll { actual.getFields.size() should be (fieldCount) actual.getFields.asScala.map(_.name()) should be (List("COF_NAME", "SUP_ID", "PRICE", "TEMPERATURE", "SIZE", - "IS_ARABIC", "SALES", "TOTAL", "CREATED", "UPDATED", "BT", "UID")) + "IS_ARABIC", "SALES", "TOTAL", "CREATED", "UPDATED", "BT", "UID", "ROWNUM")) actual.getFields.asScala.map(_.schema().getType) should be (List.fill(fieldCount)(Schema.Type.UNION)) actual.getFields.asScala.map(_.schema().getTypes.get(0).getType) should @@ -113,6 +114,7 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll { be ("timestamp-millis") actual.getField("BT").schema().getTypes.get(1).getType should be (Schema.Type.INT) actual.getField("UID").schema().getTypes.get(1).getType should be (Schema.Type.BYTES) + actual.getField("ROWNUM").schema().getTypes.get(1).getType should be (Schema.Type.LONG) actual.toString shouldNot be (null) actual.getDoc should be ("Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test") } @@ -152,7 +154,7 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll { record shouldNot be (null) record.getSchema should be (schema) - record.getSchema.getFields.size() should be (12) + record.getSchema.getFields.size() should be (13) record.get(0) should be (record1._1) record.get(1) should be (record1._2.map(x => x : java.lang.Integer).orNull) record.get(2) should be (record1._3.toString) @@ -165,6 +167,7 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll { record.get(9) should be (record1._10.map(_.getTime).map(x => x : java.lang.Long).orNull) 
    record.get(10) should be (record1._11.map(_.toInt).map(x => x : java.lang.Integer).orNull)
     record.get(11) should be (toByteBuffer(record1._12))
+    record.get(12) should be (record1._13)
   }
 }
 diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala
index 1516981d..6db7aa9d 100644
--- a/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala
+++ b/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala
@@ -129,9 +129,9 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll {
 
   "JdbcAvroJob" should "increment counter metrics" in {
     val metering = new JdbcAvroMetering(1, 1)
-    metering.exposeMetrics(0)
+    metering.exposeWriteElapsedMs(0)
     metering.incrementRecordCount()
-    metering.exposeMetrics(0)
+    metering.exposeWriteElapsedMs(0)
   }
 }
 diff --git a/e2e/e2e.sh b/e2e/e2e.sh
index 154b8c2c..4719218d 100755
--- a/e2e/e2e.sh
+++ b/e2e/e2e.sh
@@ -82,14 +82,15 @@ runDBeamDockerCon() {
 runScenario() {
   for ((i=1;i<=3;i++)); do
     runDBeamDockerCon "${@:2}"
-    jq -r "[\"$1\", .recordCount, .bytesWritten?, .writeElapsedMs, .msPerMillionRows] | @tsv" < "$OUTPUT/_METRICS.json" >> ./bench_dbeam_results
+    jq -r "[\"$1\", .recordCount, .writeElapsedMs, .msPerMillionRows, .bytesWritten?] | @tsv" < "$OUTPUT/_METRICS.json" >> ./bench_dbeam_results
   done
 }
 
 runSuite() {
-  printf 'scenario\trecords\tbytesWritten\twriteElapsedMs\tmsPerMillionRows\n' >> ./bench_dbeam_results
+  printf 'scenario\trecords\twriteElapsedMs\tmsPerMillionRows\tbytesWritten\n' >> ./bench_dbeam_results
   table=demo_table
   BINARY_TRANSFER='false' runScenario "deflate1t5" --avroCodec=deflate1
+  BINARY_TRANSFER='false' runScenario "||query" --avroCodec=deflate1 --queryParallelism=5 --splitColumn=row_number
 }
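
Illustrative usage of the new flags (a sketch, not part of the patch: the entry point, connection
details and split column are assumed; only --splitColumn and --queryParallelism are introduced here):

  java -cp dbeam-core.jar com.spotify.dbeam.jobs.JdbcAvroJob \
    --connectionUrl=jdbc:postgresql://some_db --table=some_table --password=secret \
    --splitColumn=row_number --queryParallelism=5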