From 583e4dc8b21fb145ec22130c797d88752079a067 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 21 Apr 2021 22:49:35 +0200 Subject: [PATCH] fixup! [FLINK-22001] Rename JobMasterServiceFactoryNg to JobMasterServiceFactory --- .../DefaultJobMasterServiceProcessTest.java | 63 ++++++++++--------- 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java index c9649feaffb98d..fe3fbcf532043f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; -import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceProcessFactory; +import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; @@ -34,7 +34,6 @@ import java.time.Duration; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.function.Function; import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; @@ -54,16 +53,11 @@ public class DefaultJobMasterServiceProcessTest extends TestLogger { ArchivedExecutionGraph.createFromInitializingJob( jobId, "test", JobStatus.FAILED, throwable, 1337)); - private CompletableFuture jobMasterServiceFuture = new CompletableFuture<>(); - private TestingJobMasterServiceProcessFactory.TestingFutureJobMasterServiceFactory - jobMasterServiceFactoryNg = - new TestingJobMasterServiceProcessFactory.TestingFutureJobMasterServiceFactory( - jobMasterServiceFuture); - @Test - public void testCloseAfterInitializationFailure() - throws ExecutionException, InterruptedException { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + public void testCloseAfterInitializationFailure() throws Exception { + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); jobMasterServiceFuture.completeExceptionally(new RuntimeException("Init error")); serviceProcess.closeAsync().get(); @@ -77,7 +71,9 @@ public void testCloseAfterInitializationFailure() @Test public void testCloseAfterInitializationSuccess() throws Exception { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); TestingJobMasterService testingJobMasterService = new TestingJobMasterService(); jobMasterServiceFuture.complete(testingJobMasterService); @@ -90,7 +86,9 @@ public void testCloseAfterInitializationSuccess() throws Exception { @Test public void testJobMasterTerminationIsHandled() { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); CompletableFuture jobMasterTerminationFuture = new CompletableFuture<>(); TestingJobMasterService testingJobMasterService = new TestingJobMasterService("localhost", jobMasterTerminationFuture, null); @@ -109,9 +107,10 @@ public void testJobMasterTerminationIsHandled() { } @Test - public void testJobMasterGatewayGetsForwarded() - throws ExecutionException, InterruptedException { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + public void testJobMasterGatewayGetsForwarded() throws Exception { + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); TestingJobMasterGateway testingGateway = new TestingJobMasterGatewayBuilder().build(); TestingJobMasterService testingJobMasterService = new TestingJobMasterService("localhost", null, testingGateway); @@ -121,8 +120,10 @@ public void testJobMasterGatewayGetsForwarded() } @Test - public void testLeaderAddressGetsForwarded() throws ExecutionException, InterruptedException { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + public void testLeaderAddressGetsForwarded() throws Exception { + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); String testingAddress = "yolohost"; TestingJobMasterService testingJobMasterService = new TestingJobMasterService(testingAddress, null, null); @@ -133,13 +134,16 @@ public void testLeaderAddressGetsForwarded() throws ExecutionException, Interrup @Test public void testIsNotInitialized() { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + DefaultJobMasterServiceProcess serviceProcess = + createTestInstance(new CompletableFuture<>()); assertThat(serviceProcess.isInitialized(), is(false)); } @Test public void testIsInitialized() { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); jobMasterServiceFuture.complete(new TestingJobMasterService()); @@ -147,8 +151,10 @@ public void testIsInitialized() { } @Test - public void testSuccessOnTerminalState() throws ExecutionException, InterruptedException { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + public void testSuccessOnTerminalState() throws Exception { + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); jobMasterServiceFuture.complete(new TestingJobMasterService()); ArchivedExecutionGraph archivedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build(); @@ -169,7 +175,9 @@ public void testSuccessOnTerminalState() throws ExecutionException, InterruptedE @Test public void testJobFinishedByOther() { - DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance(); + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); jobMasterServiceFuture.complete(new TestingJobMasterService()); serviceProcess.jobFinishedByOther(); @@ -179,16 +187,13 @@ public void testJobFinishedByOther() { futureWillCompleteExceptionally(JobNotFinishedException.class, TIMEOUT)); } - private DefaultJobMasterServiceProcess resetTestAndCreateInstance() { - jobMasterServiceFuture = new CompletableFuture<>(); + private DefaultJobMasterServiceProcess createTestInstance( + CompletableFuture jobMasterServiceFuture) { - jobMasterServiceFactoryNg = - new TestingJobMasterServiceProcessFactory.TestingFutureJobMasterServiceFactory( - jobMasterServiceFuture); return new DefaultJobMasterServiceProcess( jobId, UUID.randomUUID(), - jobMasterServiceFactoryNg, + new TestingJobMasterServiceFactory(() -> jobMasterServiceFuture), failedArchivedExecutionGraphFactory); } }