Add exportTimeout arg (#56)
* Add exportTimeout

* Cancel on timeout

* Document exportTimeout

* Default exportTimeout of PT0S

* Test failure to cancel

* Clean imports

* Update README

* Disable block on run
labianchin committed Apr 16, 2019
1 parent 6266bcc commit 8c2fe6e
Showing 8 changed files with 130 additions and 25 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -45,6 +45,7 @@ simply streams the table contents via JDBC into target location as Avro.
- `--passwordFile`: a path to a local file containing the database password
- `--limit`: limit the output number of rows, indefinite by default
- `--avroSchemaNamespace`: the namespace of the generated avro schema, `"dbeam_generated"` by default
- `--exportTimeout`: maximum time the export can take; after this timeout the job is cancelled. Default is `PT0S` (no timeout; see the duration examples below).
- `--partitionColumn`: the name of a date/timestamp column to filter data based on current partition
- `--partition`: the date of the current partition, parsed using [ISODateTimeFormat.localDateOptionalTimeParser](http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#localDateOptionalTimeParser--)
- `--partitionPeriod`: the period in which dbeam runs, used to filter based on the current partition and also to check whether executions are being run for a partition that is too old
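For reference, `--exportTimeout` values are ISO-8601 durations (this commit parses them with `java.time.Duration.parse`); a minimal sketch of valid values, using a hypothetical class name:

```java
import java.time.Duration;

public class ExportTimeoutValues {
  public static void main(String[] args) {
    // ISO-8601 durations accepted by Duration.parse, which backs --exportTimeout
    System.out.println(Duration.parse("PT0S"));   // zero duration, the default, meaning no timeout
    System.out.println(Duration.parse("PT30M"));  // thirty minutes
    System.out.println(Duration.parse("PT2H"));   // two hours
  }
}
```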
@@ -24,6 +24,7 @@

import java.io.Serializable;
import java.sql.Connection;
import java.time.Duration;
import java.util.Optional;

@AutoValue
@@ -39,6 +40,8 @@ public abstract class JdbcExportArgs implements Serializable {

public abstract Boolean useAvroLogicalTypes();

public abstract Duration exportTimeout();

@AutoValue.Builder
abstract static class Builder {

@@ -52,26 +55,31 @@ abstract static class Builder {

abstract Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);

abstract Builder setExportTimeout(Duration exportTimeout);

abstract JdbcExportArgs build();
}

public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs,
QueryBuilderArgs queryBuilderArgs) {
return create(jdbcAvroArgs, queryBuilderArgs,
"dbeam_generated", Optional.empty(), false);
"dbeam_generated", Optional.empty(), false,
Duration.ZERO);
}

public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs,
QueryBuilderArgs queryBuilderArgs,
String avroSchemaNamespace,
Optional<String> avroDoc,
Boolean useAvroLogicalTypes) {
Boolean useAvroLogicalTypes,
Duration exportTimeout) {
return new AutoValue_JdbcExportArgs.Builder()
.setJdbcAvroOptions(jdbcAvroArgs)
.setQueryBuilderArgs(queryBuilderArgs)
.setAvroSchemaNamespace(avroSchemaNamespace)
.setAvroDoc(avroDoc)
.setUseAvroLogicalTypes(useAvroLogicalTypes)
.setExportTimeout(exportTimeout)
.build();
}
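As a side note, the two-argument `create(...)` overload above falls back to `Duration.ZERO`, which is the same value the CLI default `PT0S` parses to; a small sketch with a hypothetical class name:

```java
import java.time.Duration;

public class DefaultTimeoutCheck {
  public static void main(String[] args) {
    // The builder default wired in above is Duration.ZERO,
    // matching the "PT0S" default documented in the README.
    System.out.println(Duration.ZERO.equals(Duration.parse("PT0S")));  // true
  }
}
```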

21 changes: 19 additions & 2 deletions dbeam-core/src/main/java/com/spotify/dbeam/beam/BeamHelper.java
@@ -26,6 +26,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
@@ -39,8 +40,24 @@
public class BeamHelper {
private static Logger LOGGER = LoggerFactory.getLogger(BeamHelper.class);

public static PipelineResult waitUntilDone(PipelineResult result) {
PipelineResult.State state = result.waitUntilFinish();
public static PipelineResult waitUntilDone(PipelineResult result,
Duration exportTimeout) {
PipelineResult.State state = result.waitUntilFinish(
org.joda.time.Duration.millis(
exportTimeout.toMillis()));
if (!state.isTerminal()) {
try {
result.cancel();
} catch (IOException e) {
throw new Pipeline.PipelineExecutionException(
new Exception(String.format(
"Job exceeded timeout of %s, but was not possible to cancel, "
+ "finished with state %s",
exportTimeout.toString(), state.toString()), e));
}
throw new Pipeline.PipelineExecutionException(
new Exception("Job cancelled after exceeding timeout " + exportTimeout.toString()));
}
if (!state.equals(PipelineResult.State.DONE)) {
throw new Pipeline.PipelineExecutionException(
new Exception("Job finished with state " + state.toString()));
@@ -37,6 +37,7 @@
import java.util.stream.StreamSupport;

import org.apache.avro.Schema;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -66,6 +67,9 @@ public JdbcAvroJob(PipelineOptions pipelineOptions, Pipeline pipeline,

public static JdbcAvroJob create(PipelineOptions pipelineOptions)
throws IOException, ClassNotFoundException {
// make sure pipeline.run() does not call waitUntilFinish itself;
// instead runAndWait calls it explicitly with the configured exportTimeout
pipelineOptions.as(DirectOptions.class).setBlockOnRun(false);
return new JdbcAvroJob(pipelineOptions,
Pipeline.create(pipelineOptions),
JdbcExportArgsFactory.fromPipelineOptions(pipelineOptions),
@@ -131,7 +135,8 @@ public PipelineOptions getPipelineOptions() {
}

public PipelineResult runAndWait() {
return BeamHelper.waitUntilDone(this.pipeline.run());
return BeamHelper.waitUntilDone(this.pipeline.run(),
jdbcExportArgs.exportTimeout());
}

public PipelineResult runExport() throws Exception {
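The `setBlockOnRun(false)` call above keeps the direct runner's `run()` from blocking, so `runAndWait` can own the wait through `BeamHelper.waitUntilDone` and its timeout. A standalone sketch of that pattern (hypothetical class; assumes the Beam SDK and direct runner are on the classpath):

```java
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BlockOnRunSketch {
  public static void main(String[] args) {
    DirectOptions options = PipelineOptionsFactory.create().as(DirectOptions.class);
    options.setBlockOnRun(false);  // run() returns immediately instead of blocking
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(Create.of(1, 2, 3));
    PipelineResult result = pipeline.run();
    // the caller decides how long to wait, mirroring BeamHelper.waitUntilDone
    PipelineResult.State state =
        result.waitUntilFinish(org.joda.time.Duration.standardMinutes(1));
    System.out.println(state);
  }
}
```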
@@ -26,6 +26,7 @@
import com.spotify.dbeam.args.JdbcExportArgs;
import com.spotify.dbeam.args.QueryBuilderArgs;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.DateTime;
@@ -51,7 +52,8 @@ public static JdbcExportArgs fromPipelineOptions(PipelineOptions options)
createQueryArgs(exportOptions),
exportOptions.getAvroSchemaNamespace(),
Optional.ofNullable(exportOptions.getAvroDoc()),
exportOptions.isUseAvroLogicalTypes()
exportOptions.isUseAvroLogicalTypes(),
Duration.parse(exportOptions.getExportTimeout())
);
}

@@ -103,4 +103,11 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {
Integer getQueryParallelism();

void setQueryParallelism(Integer value);

@Default.String("PT0S")
@Description(
"Export timeout, after this duration the export will be terminated.")
String getExportTimeout();

void setExportTimeout(String value);
}
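The option above is declared as a plain `String` with a `PT0S` default and is only turned into a `Duration` in `JdbcExportArgsFactory` (the preceding diff). A sketch of the flag end to end, with hypothetical argument values, no option validation, and assuming the interface lives in `com.spotify.dbeam.options`:

```java
import java.time.Duration;

import com.spotify.dbeam.options.JdbcExportPipelineOptions;  // assumed package
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExportTimeoutOptionSketch {
  public static void main(String[] args) {
    // Register the interface so the --exportTimeout flag is recognised
    PipelineOptionsFactory.register(JdbcExportPipelineOptions.class);
    JdbcExportPipelineOptions options = PipelineOptionsFactory
        .fromArgs("--exportTimeout=PT1M")
        .as(JdbcExportPipelineOptions.class);
    System.out.println(Duration.parse(options.getExportTimeout()));  // PT1M
  }
}
```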
@@ -0,0 +1,83 @@
/*
* Copyright 2017 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.dbeam.jobs

import java.io.IOException
import java.time.Duration

import com.spotify.dbeam.beam.BeamHelper

import org.apache.beam.sdk.Pipeline.PipelineExecutionException
import org.apache.beam.sdk.PipelineResult
import org.apache.beam.sdk.metrics.MetricResults
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner


@RunWith(classOf[JUnitRunner])
class BeamHelperTest extends FlatSpec with Matchers with BeforeAndAfterAll {

"BeamHelper" should "throw exception in case pipeline result finish with state FAILED" in {
val mockResult = new PipelineResult {
override def waitUntilFinish(): PipelineResult.State = null
override def getState: PipelineResult.State = null
override def cancel(): PipelineResult.State = null
override def waitUntilFinish(duration: org.joda.time.Duration): PipelineResult.State =
PipelineResult.State.FAILED
override def metrics(): MetricResults = null
}

the[PipelineExecutionException] thrownBy {
BeamHelper.waitUntilDone(mockResult, Duration.ofMinutes(1))
} should have message "java.lang.Exception: Job finished with state FAILED"
}

"BeamHelper" should "cancel in case of timeout" in {
val mockResult = new PipelineResult {
override def waitUntilFinish(): PipelineResult.State = null
override def getState: PipelineResult.State = null
override def cancel(): PipelineResult.State = null
override def waitUntilFinish(duration: org.joda.time.Duration): PipelineResult.State =
PipelineResult.State.RUNNING
override def metrics(): MetricResults = null
}

the[PipelineExecutionException] thrownBy {
BeamHelper.waitUntilDone(mockResult, Duration.ofMinutes(1))
} should have message "java.lang.Exception: Job cancelled after exceeding timeout PT1M"
}

"BeamHelper" should "fail after failure to cancel in case of timeout" in {
val mockResult = new PipelineResult {
override def waitUntilFinish(): PipelineResult.State = null
override def getState: PipelineResult.State = null
override def cancel(): PipelineResult.State =
throw new IOException("something wrong")
override def waitUntilFinish(duration: org.joda.time.Duration): PipelineResult.State =
PipelineResult.State.RUNNING
override def metrics(): MetricResults = null
}

the[PipelineExecutionException] thrownBy {
BeamHelper.waitUntilDone(mockResult, Duration.ofMinutes(1))
} should have message
"java.lang.Exception: Job exceeded timeout of PT1M, but was not possible to cancel, finished with state RUNNING"
}

}
@@ -24,17 +24,12 @@ import java.util.{Comparator, UUID}

import com.spotify.dbeam.JdbcTestFixtures
import com.spotify.dbeam.avro.JdbcAvroMetering
import com.spotify.dbeam.beam.BeamHelper
import com.spotify.dbeam.options.OutputOptions
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.Pipeline.PipelineExecutionException
import org.apache.beam.sdk.PipelineResult
import org.apache.beam.sdk.io.AvroSource
import org.apache.beam.sdk.metrics.MetricResults
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.testing.SourceTestUtils
import org.joda.time.Duration
import org.junit.runner.RunWith
import org.scalatest._
import org.scalatest.junit.JUnitRunner
@@ -74,6 +69,7 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll {
"--targetParallelism=1", // no need for more threads when testing
"--partition=2025-02-28",
"--skipPartitionCheck",
"--exportTimeout=PT1M",
"--connectionUrl=" + connectionUrl,
"--username=",
"--passwordFile=" + passwordFile.getAbsolutePath,
@@ -94,20 +90,6 @@
records should have size 2
}

"JdbcAvroJob" should "throw exception in case pipeline result finish with state FAILED" in {
val mockResult = new PipelineResult {
override def waitUntilFinish(): PipelineResult.State = PipelineResult.State.FAILED
override def getState: PipelineResult.State = null
override def cancel(): PipelineResult.State = null
override def waitUntilFinish(duration: Duration): PipelineResult.State = null
override def metrics(): MetricResults = null
}

the[PipelineExecutionException] thrownBy {
BeamHelper.waitUntilDone(mockResult)
} should have message "java.lang.Exception: Job finished with state FAILED"
}

"JdbcAvroJob" should "have a default exit code" in {
ExceptionHandling.exitCode(new IllegalStateException()) should be (49)
}
