Skip to content

Commit

Permalink
Add exportTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Mar 27, 2019
1 parent 24872c7 commit f5cae84
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.Serializable;
import java.sql.Connection;
import java.time.Duration;
import java.util.Optional;

@AutoValue
Expand All @@ -39,6 +40,8 @@ public abstract class JdbcExportArgs implements Serializable {

public abstract Boolean useAvroLogicalTypes();

public abstract Duration exportTimeout();

@AutoValue.Builder
abstract static class Builder {

Expand All @@ -52,26 +55,31 @@ abstract static class Builder {

abstract Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);

abstract Builder setExportTimeout(Duration exportTimeout);

abstract JdbcExportArgs build();
}

public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs,
QueryBuilderArgs queryBuilderArgs) {
return create(jdbcAvroArgs, queryBuilderArgs,
"dbeam_generated", Optional.empty(), false);
"dbeam_generated", Optional.empty(), false,
Duration.ofHours(23));
}

public static JdbcExportArgs create(JdbcAvroArgs jdbcAvroArgs,
QueryBuilderArgs queryBuilderArgs,
String avroSchemaNamespace,
Optional<String> avroDoc,
Boolean useAvroLogicalTypes) {
Boolean useAvroLogicalTypes,
Duration exportTimeout) {
return new AutoValue_JdbcExportArgs.Builder()
.setJdbcAvroOptions(jdbcAvroArgs)
.setQueryBuilderArgs(queryBuilderArgs)
.setAvroSchemaNamespace(avroSchemaNamespace)
.setAvroDoc(avroDoc)
.setUseAvroLogicalTypes(useAvroLogicalTypes)
.setExportTimeout(exportTimeout)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.MimeTypes;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamHelper {
private static Logger LOGGER = LoggerFactory.getLogger(BeamHelper.class);

public static PipelineResult waitUntilDone(PipelineResult result) {
PipelineResult.State state = result.waitUntilFinish();
public static PipelineResult waitUntilDone(PipelineResult result,
Duration exportTimeout) {
PipelineResult.State state = result.waitUntilFinish(exportTimeout);
if (!state.equals(PipelineResult.State.DONE)) {
throw new Pipeline.PipelineExecutionException(
new Exception("Job finished with state " + state.toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ public PipelineOptions getPipelineOptions() {
}

public PipelineResult runAndWait() {
return BeamHelper.waitUntilDone(this.pipeline.run());
return BeamHelper.waitUntilDone(this.pipeline.run(),
org.joda.time.Duration.millis(
jdbcExportArgs.exportTimeout().toMillis()));
}

public PipelineResult runExport() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.spotify.dbeam.args.QueryBuilderArgs;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -54,7 +55,8 @@ public static JdbcExportArgs fromPipelineOptions(PipelineOptions options)
createQueryArgs(exportOptions),
exportOptions.getAvroSchemaNamespace(),
Optional.ofNullable(exportOptions.getAvroDoc()),
exportOptions.isUseAvroLogicalTypes()
exportOptions.isUseAvroLogicalTypes(),
Duration.parse(exportOptions.getExportTimeout())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ public interface JdbcExportPipelineOptions extends DBeamPipelineOptions {
String getAvroCodec();

void setAvroCodec(String value);

@Default.String("PT23H")
@Description(
"Export timeout, after this duration the export will be terminated.")
String getExportTimeout();

void setExportTimeout(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll {
"--targetParallelism=1", // no need for more threads when testing
"--partition=2025-02-28",
"--skipPartitionCheck",
"--exportTimeout=PT1M",
"--connectionUrl=" + connectionUrl,
"--username=",
"--passwordFile=" + passwordFile.getAbsolutePath,
Expand All @@ -96,15 +97,15 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll {

"JdbcAvroJob" should "throw exception in case pipeline result finish with state FAILED" in {
val mockResult = new PipelineResult {
override def waitUntilFinish(): PipelineResult.State = PipelineResult.State.FAILED
override def waitUntilFinish(): PipelineResult.State = null
override def getState: PipelineResult.State = null
override def cancel(): PipelineResult.State = null
override def waitUntilFinish(duration: Duration): PipelineResult.State = null
override def waitUntilFinish(duration: Duration): PipelineResult.State = PipelineResult.State.FAILED
override def metrics(): MetricResults = null
}

the[PipelineExecutionException] thrownBy {
BeamHelper.waitUntilDone(mockResult)
BeamHelper.waitUntilDone(mockResult, Duration.standardMinutes(1))
} should have message "java.lang.Exception: Job finished with state FAILED"
}

Expand Down

0 comments on commit f5cae84

Please sign in to comment.