Skip to content

Commit

Permalink
fixup! [FLINK-22001] Rename JobMasterServiceFactoryNg to JobMasterSer…
Browse files Browse the repository at this point in the history
…viceFactory
  • Loading branch information
tillrohrmann committed Apr 21, 2021
1 parent 7b7e8a4 commit 583e4dc
Showing 1 changed file with 34 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
Expand All @@ -34,7 +34,6 @@
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
Expand All @@ -54,16 +53,11 @@ public class DefaultJobMasterServiceProcessTest extends TestLogger {
ArchivedExecutionGraph.createFromInitializingJob(
jobId, "test", JobStatus.FAILED, throwable, 1337));

private CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
private TestingJobMasterServiceProcessFactory.TestingFutureJobMasterServiceFactory
jobMasterServiceFactoryNg =
new TestingJobMasterServiceProcessFactory.TestingFutureJobMasterServiceFactory(
jobMasterServiceFuture);

@Test
public void testCloseAfterInitializationFailure()
throws ExecutionException, InterruptedException {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
public void testCloseAfterInitializationFailure() throws Exception {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
jobMasterServiceFuture.completeExceptionally(new RuntimeException("Init error"));

serviceProcess.closeAsync().get();
Expand All @@ -77,7 +71,9 @@ public void testCloseAfterInitializationFailure()

@Test
public void testCloseAfterInitializationSuccess() throws Exception {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
TestingJobMasterService testingJobMasterService = new TestingJobMasterService();
jobMasterServiceFuture.complete(testingJobMasterService);

Expand All @@ -90,7 +86,9 @@ public void testCloseAfterInitializationSuccess() throws Exception {

@Test
public void testJobMasterTerminationIsHandled() {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
CompletableFuture<Void> jobMasterTerminationFuture = new CompletableFuture<>();
TestingJobMasterService testingJobMasterService =
new TestingJobMasterService("localhost", jobMasterTerminationFuture, null);
Expand All @@ -109,9 +107,10 @@ public void testJobMasterTerminationIsHandled() {
}

@Test
public void testJobMasterGatewayGetsForwarded()
throws ExecutionException, InterruptedException {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
public void testJobMasterGatewayGetsForwarded() throws Exception {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
TestingJobMasterGateway testingGateway = new TestingJobMasterGatewayBuilder().build();
TestingJobMasterService testingJobMasterService =
new TestingJobMasterService("localhost", null, testingGateway);
Expand All @@ -121,8 +120,10 @@ public void testJobMasterGatewayGetsForwarded()
}

@Test
public void testLeaderAddressGetsForwarded() throws ExecutionException, InterruptedException {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
public void testLeaderAddressGetsForwarded() throws Exception {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
String testingAddress = "yolohost";
TestingJobMasterService testingJobMasterService =
new TestingJobMasterService(testingAddress, null, null);
Expand All @@ -133,22 +134,27 @@ public void testLeaderAddressGetsForwarded() throws ExecutionException, Interrup

@Test
public void testIsNotInitialized() {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
DefaultJobMasterServiceProcess serviceProcess =
createTestInstance(new CompletableFuture<>());
assertThat(serviceProcess.isInitialized(), is(false));
}

@Test
public void testIsInitialized() {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);

jobMasterServiceFuture.complete(new TestingJobMasterService());

assertThat(serviceProcess.isInitialized(), is(true));
}

@Test
public void testSuccessOnTerminalState() throws ExecutionException, InterruptedException {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
public void testSuccessOnTerminalState() throws Exception {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
jobMasterServiceFuture.complete(new TestingJobMasterService());
ArchivedExecutionGraph archivedExecutionGraph =
new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
Expand All @@ -169,7 +175,9 @@ public void testSuccessOnTerminalState() throws ExecutionException, InterruptedE

@Test
public void testJobFinishedByOther() {
DefaultJobMasterServiceProcess serviceProcess = resetTestAndCreateInstance();
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
jobMasterServiceFuture.complete(new TestingJobMasterService());

serviceProcess.jobFinishedByOther();
Expand All @@ -179,16 +187,13 @@ public void testJobFinishedByOther() {
futureWillCompleteExceptionally(JobNotFinishedException.class, TIMEOUT));
}

private DefaultJobMasterServiceProcess resetTestAndCreateInstance() {
jobMasterServiceFuture = new CompletableFuture<>();
private DefaultJobMasterServiceProcess createTestInstance(
CompletableFuture<JobMasterService> jobMasterServiceFuture) {

jobMasterServiceFactoryNg =
new TestingJobMasterServiceProcessFactory.TestingFutureJobMasterServiceFactory(
jobMasterServiceFuture);
return new DefaultJobMasterServiceProcess(
jobId,
UUID.randomUUID(),
jobMasterServiceFactoryNg,
new TestingJobMasterServiceFactory(() -> jobMasterServiceFuture),
failedArchivedExecutionGraphFactory);
}
}

0 comments on commit 583e4dc

Please sign in to comment.