Skip to content

Commit

Permalink
Change default exportTimeout to P7D
Browse files Browse the repository at this point in the history
After upgrading to Beam SDK 2.14, a `PT0S` export timeout is no longer valid, because the Dataflow runner now requires `maxCumulativeBackoff` to be at least 1 millisecond:

```
[main] ERROR com.spotify.dbeam.jobs.ExceptionHandling - Failure:
java.lang.IllegalArgumentException: maxCumulativeBackoff PT0S must be at least 1 millisecond
        at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
        at org.apache.beam.sdk.util.FluentBackoff.withMaxCumulativeBackoff(FluentBackoff.java:131)
        at org.apache.beam.runners.dataflow.DataflowPipelineJob.getMessagesBackoff(DataflowPipelineJob.java:251)
        at org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:280)
        at org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:224)
        at org.apache.beam.runners.dataflow.DataflowPipelineJob.waitUntilFinish(DataflowPipelineJob.java:183)
        at com.spotify.dbeam.beam.BeamHelper.waitUntilDone(BeamHelper.java:45)
        at com.spotify.dbeam.jobs.JdbcAvroJob.runAndWait(JdbcAvroJob.java:135)
        at com.spotify.dbeam.jobs.JdbcAvroJob.runExport(JdbcAvroJob.java:141)
        at com.spotify.dbeam.jobs.JdbcAvroJob.main(JdbcAvroJob.java:148)
```
  • Loading branch information
labianchin committed Aug 6, 2019
1 parent 947c45f commit d777455
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 44 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ simply streams the table contents via JDBC into target location as Avro.
- `--passwordFile`: a path to a local file containing the database password
- `--limit`: limit the output number of rows, indefinite by default
- `--avroSchemaNamespace`: the namespace of the generated avro schema, `"dbeam_generated"` by default
- `--exportTimeout`: maximum time the export can take, after this timeout the job is cancelled. Default is `PT0S` (no timeout).
- `--exportTimeout`: maximum time the export can take, after this timeout the job is cancelled. Default is `P7D` (7 days).
- `--partitionColumn`: the name of a date/timestamp column to filter data based on current partition
- `--partition`: the date of the current partition, parsed using [ISODateTimeFormat.localDateOptionalTimeParser](http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#localDateOptionalTimeParser--)
- `--partitionPeriod`: the period in which dbeam runs, used to filter based on current partition and also to check if executions are being run for a too old partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package com.spotify.dbeam.args;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;

import java.io.Serializable;
import java.sql.Connection;
Expand Down Expand Up @@ -60,11 +61,12 @@ abstract static class Builder {
abstract JdbcExportArgs build();
}

public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs,
@VisibleForTesting
static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs,
QueryBuilderArgs queryBuilderArgs) {
return create(jdbcAvroArgs, queryBuilderArgs,
"dbeam_generated", Optional.empty(), false,
Duration.ZERO);
Duration.ofDays(7));
}

public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {

void setQueryParallelism(Integer value);

@Default.String("PT0S")
@Default.String("P7D")
@Description(
"Export timeout, after this duration the export will be terminated.")
String getExportTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package com.spotify.dbeam.jobs

import java.time.Duration
import java.util.Optional

import com.spotify.dbeam.JdbcTestFixtures
import com.spotify.dbeam.args.{JdbcAvroArgs, JdbcConnectionArgs, JdbcExportArgs, QueryBuilderArgs}
import org.joda.time.{DateTime, DateTimeZone, Days}
Expand All @@ -35,48 +38,38 @@ class PsqlReplicationCheckTest extends FlatSpec with Matchers with BeforeAndAfte
override def beforeAll(): Unit = {
JdbcTestFixtures.createFixtures(db, Seq(JdbcTestFixtures.record1))
}
/**
 * Builds a minimal [[JdbcExportArgs]] for these tests.
 *
 * Defaults produce args against the shared test connection URL and a plain
 * `some_table` query; individual tests override `url` (e.g. to a non-postgres
 * driver) or pass a `queryBuilderArgs` with a partition set.
 *
 * Note: uses `Duration.ZERO` as the export timeout deliberately — tests here
 * never run a pipeline, so the production default (7 days) is irrelevant.
 *
 * @param url JDBC connection URL for the export
 * @param queryBuilderArgs query configuration; defaults to a bare `some_table` query
 * @return a fully-populated JdbcExportArgs with the default avro namespace
 */
private def createArgs(url: String = connectionUrl,
queryBuilderArgs: QueryBuilderArgs = QueryBuilderArgs.create("some_table")) = {
JdbcExportArgs.create(
JdbcAvroArgs.create(JdbcConnectionArgs.create(url)),
queryBuilderArgs,
"dbeam_generated", Optional.empty(), false,
Duration.ZERO
)
}

it should "fail with invalid driver" in {
val args = JdbcExportArgs.create(
JdbcAvroArgs.create(
JdbcConnectionArgs.create("jdbc:mysql://some_db")
.withUsername("dbeam-extractor")
.withPassword("secret")
),
QueryBuilderArgs.create("some_table")
)
val args = createArgs("jdbc:mysql://some_db")

a[IllegalArgumentException] should be thrownBy {
PsqlReplicationCheck.validateOptions(args)
}
}

it should "fail with missing partition" in {
val args = JdbcExportArgs.create(
JdbcAvroArgs.create(
JdbcConnectionArgs.create("jdbc:postgresql://some_db")
.withUsername("dbeam-extractor")
.withPassword("secret")
),
QueryBuilderArgs.create("some_table")
)
val args = createArgs("jdbc:postgresql://some_db")

a[IllegalArgumentException] should be thrownBy {
PsqlReplicationCheck.validateOptions(args)
}
}

it should "validate" in {
val args = JdbcExportArgs.create(
JdbcAvroArgs.create(
JdbcConnectionArgs.create("jdbc:postgresql://some_db")
.withUsername("dbeam-extractor")
.withPassword("secret")
),
val args = createArgs("jdbc:postgresql://some_db",
QueryBuilderArgs.create("some_table")
.builder()
.setPartition(new DateTime(2027, 7, 31, 0, 0, DateTimeZone.UTC)).build()
)
.setPartition(new DateTime(2027, 7, 31, 0, 0, DateTimeZone.UTC)).build(
))

PsqlReplicationCheck.validateOptions(args)
}
Expand Down Expand Up @@ -132,15 +125,11 @@ class PsqlReplicationCheckTest extends FlatSpec with Matchers with BeforeAndAfte
"parsedatetime('2017-02-01 23.58.57 UTC', 'yyyy-MM-dd HH.mm.ss z', 'en', 'UTC')" +
" AS last_replication, " +
"13 AS replication_delay"
val replicationCheck = new PsqlReplicationCheck(
JdbcExportArgs.create(
JdbcAvroArgs.create(
JdbcConnectionArgs.create(connectionUrl)),
QueryBuilderArgs.create("coffees").builder()
val args = createArgs(connectionUrl,
QueryBuilderArgs.create("coffees").builder()
.setPartition(DateTime.parse("2025-02-28T00:00:00"))
.build()),
query
)
.build())
val replicationCheck = new PsqlReplicationCheck(args, query)
val lastReplication = new DateTime(2017, 2, 1, 23, 58, 57, DateTimeZone.UTC)

val actual = replicationCheck.queryReplication()
Expand All @@ -157,15 +146,11 @@ class PsqlReplicationCheckTest extends FlatSpec with Matchers with BeforeAndAfte
"parsedatetime('2030-02-01 23.58.57 UTC', 'yyyy-MM-dd HH.mm.ss z', 'en', 'UTC')" +
" AS last_replication, " +
"13 AS replication_delay"
val replicationCheck = new PsqlReplicationCheck(
JdbcExportArgs.create(
JdbcAvroArgs.create(
JdbcConnectionArgs.create(connectionUrl)),
QueryBuilderArgs.create("coffees").builder()
val args = createArgs(connectionUrl,
QueryBuilderArgs.create("coffees").builder()
.setPartition(DateTime.parse("2025-02-28T00:00:00"))
.build()),
query
)
.build())
val replicationCheck = new PsqlReplicationCheck(args, query)
val lastReplication = new DateTime(2030, 2, 1, 23, 58, 57, DateTimeZone.UTC)

val actual = replicationCheck.queryReplication()
Expand Down

0 comments on commit d777455

Please sign in to comment.