From f5cae841406375a27ec6ee823fa8f3e010dc8889 Mon Sep 17 00:00:00 2001 From: Luis Bianchin Date: Wed, 27 Mar 2019 08:01:51 +0100 Subject: [PATCH] Add exportTimeout --- .../java/com/spotify/dbeam/args/JdbcExportArgs.java | 12 ++++++++++-- .../main/java/com/spotify/dbeam/beam/BeamHelper.java | 6 ++++-- .../java/com/spotify/dbeam/jobs/JdbcAvroJob.java | 4 +++- .../spotify/dbeam/options/JdbcExportArgsFactory.java | 4 +++- .../dbeam/options/JdbcExportPipelineOptions.java | 7 +++++++ .../com/spotify/dbeam/jobs/JdbcAvroJobTest.scala | 7 ++++--- 6 files changed, 31 insertions(+), 9 deletions(-) diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java index cb28a9b9..1937d136 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java @@ -24,6 +24,7 @@ import java.io.Serializable; import java.sql.Connection; +import java.time.Duration; import java.util.Optional; @AutoValue @@ -39,6 +40,8 @@ public abstract class JdbcExportArgs implements Serializable { public abstract Boolean useAvroLogicalTypes(); + public abstract Duration exportTimeout(); + @AutoValue.Builder abstract static class Builder { @@ -52,26 +55,31 @@ abstract static class Builder { abstract Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes); + abstract Builder setExportTimeout(Duration exportTimeout); + abstract JdbcExportArgs build(); } public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs, QueryBuilderArgs queryBuilderArgs) { return create(jdbcAvroArgs, queryBuilderArgs, - "dbeam_generated", Optional.empty(), false); + "dbeam_generated", Optional.empty(), false, + Duration.ofHours(23)); } public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs, QueryBuilderArgs queryBuilderArgs, String avroSchemaNamespace, Optional avroDoc, - Boolean useAvroLogicalTypes) { + Boolean useAvroLogicalTypes, + Duration exportTimeout) { return new AutoValue_JdbcExportArgs.Builder() .setJdbcAvroOptions(jdbcAvroArgs) .setQueryBuilderArgs(queryBuilderArgs) .setAvroSchemaNamespace(avroSchemaNamespace) .setAvroDoc(avroDoc) .setUseAvroLogicalTypes(useAvroLogicalTypes) + .setExportTimeout(exportTimeout) .build(); } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/beam/BeamHelper.java b/dbeam-core/src/main/java/com/spotify/dbeam/beam/BeamHelper.java index c7d1ab50..9dd212f9 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/beam/BeamHelper.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/beam/BeamHelper.java @@ -33,14 +33,16 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.util.MimeTypes; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BeamHelper { private static Logger LOGGER = LoggerFactory.getLogger(BeamHelper.class); - public static PipelineResult waitUntilDone(PipelineResult result) { - PipelineResult.State state = result.waitUntilFinish(); + public static PipelineResult waitUntilDone(PipelineResult result, + Duration exportTimeout) { + PipelineResult.State state = result.waitUntilFinish(exportTimeout); if (!state.equals(PipelineResult.State.DONE)) { throw new Pipeline.PipelineExecutionException( new Exception("Job finished with state " + state.toString())); diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java index 3783dec9..7a9ac2a8 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java @@ -127,7 +127,9 @@ public PipelineOptions getPipelineOptions() { } public PipelineResult runAndWait() { - return BeamHelper.waitUntilDone(this.pipeline.run()); + return BeamHelper.waitUntilDone(this.pipeline.run(), + org.joda.time.Duration.millis( + jdbcExportArgs.exportTimeout().toMillis())); } public PipelineResult runExport() throws Exception { diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java index eb99ef69..b6883a3a 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java @@ -28,6 +28,7 @@ import com.spotify.dbeam.args.QueryBuilderArgs; import java.io.IOException; +import java.time.Duration; import java.util.Optional; import org.apache.beam.sdk.options.PipelineOptions; @@ -54,7 +55,8 @@ public static JdbcExportArgs fromPipelineOptions(PipelineOptions options) createQueryArgs(exportOptions), exportOptions.getAvroSchemaNamespace(), Optional.ofNullable(exportOptions.getAvroDoc()), - exportOptions.isUseAvroLogicalTypes() + exportOptions.isUseAvroLogicalTypes(), + Duration.parse(exportOptions.getExportTimeout()) ); } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java index f905297b..2e77b3fd 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportPipelineOptions.java @@ -90,4 +90,11 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions { String getAvroCodec(); void setAvroCodec(String value); + + @Default.String("PT23H") + @Description( + "Export timeout, after this duration the export will be terminated.") + String getExportTimeout(); + + void setExportTimeout(String value); } diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala index 1516981d..1fb58b4b 100644 --- a/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala +++ b/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala @@ -74,6 +74,7 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll { "--targetParallelism=1", // no need for more threads when testing "--partition=2025-02-28", "--skipPartitionCheck", + "--exportTimeout=PT1M", "--connectionUrl=" + connectionUrl, "--username=", "--passwordFile=" + passwordFile.getAbsolutePath, @@ -96,15 +97,15 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll { "JdbcAvroJob" should "throw exception in case pipeline result finish with state FAILED" in { val mockResult = new PipelineResult { - override def waitUntilFinish(): PipelineResult.State = PipelineResult.State.FAILED + override def waitUntilFinish(): PipelineResult.State = null override def getState: PipelineResult.State = null override def cancel(): PipelineResult.State = null - override def waitUntilFinish(duration: Duration): PipelineResult.State = null + override def waitUntilFinish(duration: Duration): PipelineResult.State = PipelineResult.State.FAILED override def metrics(): MetricResults = null } the[PipelineExecutionException] thrownBy { - BeamHelper.waitUntilDone(mockResult) + BeamHelper.waitUntilDone(mockResult, Duration.standardMinutes(1)) } should have message "java.lang.Exception: Job finished with state FAILED" }