Skip to content

Commit

Permalink
[FLINK-22001] Add missing check for empty job graph
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger authored and tillrohrmann committed Apr 21, 2021
1 parent 6a9e037 commit 6842f3e
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,6 @@ public void testJobClientSavepoint() throws Exception {
() -> jobClient.stopWithSavepoint(true, null).get());
}

@Test
public void testSubmissionError() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();

// JobGraph is not a valid job

JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();

assertThrows(
"Could not instantiate JobManager",
ExecutionException.class,
() ->
perJobMiniClusterFactory
.submitJob(jobGraph, ClassLoader.getSystemClassLoader())
.get());
}

@Test
public void testMultipleExecutions() throws Exception {
PerJobMiniClusterFactory perJobMiniClusterFactory = initializeMiniCluster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import org.apache.flink.util.Preconditions;

import static org.apache.flink.util.Preconditions.checkArgument;

/** Factory which creates a {@link JobMasterServiceLeadershipRunner}. */
public enum JobMasterServiceLeadershipRunnerFactory implements JobManagerRunnerFactory {
INSTANCE;
Expand All @@ -58,6 +60,9 @@ public JobManagerRunner createJobManagerRunner(
FatalErrorHandler fatalErrorHandler,
long initializationTimestamp)
throws Exception {

checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

final JobMasterConfiguration jobMasterConfiguration =
JobMasterConfiguration.fromConfiguration(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
Expand All @@ -36,9 +40,31 @@

import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/** Integration tests for the {@link JobMaster}. */
public class JobMasterITCase extends TestLogger {

@Test
public void testRejectionOfEmptyJobGraphs() throws Exception {
MiniCluster miniCluster =
new MiniCluster(
new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(1)
.build());
miniCluster.start();
JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();

try {
miniCluster.submitJob(jobGraph).get();
fail("Expect failure");
} catch (Throwable t) {
assertThat(t, containsMessage("The given job is empty"));
}
miniCluster.close();
}

/**
* This test is to guard against the issue reported in FLINK-22001, where any exception from the
* JobManager initialization was not forwarded to the user.
Expand Down

0 comments on commit 6842f3e

Please sign in to comment.