From 3f6818485e1a4b344fec09798d3af5135f9e287f Mon Sep 17 00:00:00 2001 From: Brad Miro Date: Tue, 15 Sep 2020 20:02:09 -0400 Subject: [PATCH] adding submit job sample and updating submit job in quickstart (#284) --- .../snippets/src/main/java/Quickstart.java | 53 +++++---- .../snippets/src/main/java/SubmitJob.java | 98 +++++++++++++++++ .../src/test/java/QuickstartTest.java | 4 +- .../snippets/src/test/java/SubmitJobTest.java | 101 ++++++++++++++++++ 4 files changed, 225 insertions(+), 31 deletions(-) create mode 100644 dataproc/snippets/src/main/java/SubmitJob.java create mode 100644 dataproc/snippets/src/test/java/SubmitJobTest.java diff --git a/dataproc/snippets/src/main/java/Quickstart.java b/dataproc/snippets/src/main/java/Quickstart.java index d5ac76259f3..698c49db82e 100644 --- a/dataproc/snippets/src/main/java/Quickstart.java +++ b/dataproc/snippets/src/main/java/Quickstart.java @@ -38,6 +38,7 @@ import com.google.cloud.dataproc.v1.Job; import com.google.cloud.dataproc.v1.JobControllerClient; import com.google.cloud.dataproc.v1.JobControllerSettings; +import com.google.cloud.dataproc.v1.JobMetadata; import com.google.cloud.dataproc.v1.JobPlacement; import com.google.cloud.dataproc.v1.PySparkJob; import com.google.cloud.storage.Blob; @@ -49,6 +50,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class Quickstart { @@ -117,9 +120,9 @@ public static void quickstart( // Create the Cloud Dataproc cluster. OperationFuture createClusterAsyncRequest = clusterControllerClient.createClusterAsync(projectId, region, cluster); - Cluster response = createClusterAsyncRequest.get(); + Cluster clusterResponse = createClusterAsyncRequest.get(); System.out.println( - String.format("Cluster created successfully: %s", response.getClusterName())); + String.format("Cluster created successfully: %s", clusterResponse.getClusterName())); // Configure the settings for our job. JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build(); @@ -129,34 +132,26 @@ public static void quickstart( // Submit an asynchronous request to execute the job. Job request = jobControllerClient.submitJob(projectId, region, job); String jobId = request.getReference().getJobId(); - System.out.println(String.format("Submitted job \"%s\"", jobId)); + System.out.println(String.format("Submitting job \"%s\"", jobId)); // Wait for the job to finish. - CompletableFuture finishedJobFuture = - CompletableFuture.supplyAsync( - () -> waitForJobCompletion(jobControllerClient, projectId, region, jobId)); - int timeout = 10; - try { - Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES); - System.out.println(String.format("Job %s finished successfully.", jobId)); - - // Cloud Dataproc job output gets saved to a GCS bucket allocated to it. - Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName); - Storage storage = StorageOptions.getDefaultInstance().getService(); - Blob blob = - storage.get( - clusterInfo.getConfig().getConfigBucket(), - String.format( - "google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput.000000000", - clusterInfo.getClusterUuid(), jobId)); - System.out.println( - String.format( - "Job \"%s\" finished with state %s:\n%s", - jobId, jobInfo.getStatus().getState(), new String(blob.getContent()))); - } catch (TimeoutException e) { - System.err.println( - String.format("Job timed out after %d minutes: %s", timeout, e.getMessage())); - } + System.out.println(String.format("Job %s finished successfully.", jobId)); + + OperationFuture submitJobAsOperationAsyncRequest = + jobControllerClient.submitJobAsOperationAsync(projectId, region, job); + + Job jobResponse = submitJobAsOperationAsyncRequest.get(); + + // Print output from Google Cloud Storage. + Matcher matches = + Pattern.compile("gs://(.*?)/(.*)").matcher(jobResponse.getDriverOutputResourceUri()); + matches.matches(); + + Storage storage = StorageOptions.getDefaultInstance().getService(); + Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2))); + + System.out.println( + String.format("Job finished successfully: %s", new String(blob.getContent()))); // Delete the cluster. OperationFuture deleteClusterAsyncRequest = @@ -165,7 +160,7 @@ public static void quickstart( System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName)); } catch (ExecutionException e) { - System.err.println(String.format("Error executing quickstart: %s ", e.getMessage())); + System.err.println(String.format("quickstart: %s ", e.getMessage())); } } diff --git a/dataproc/snippets/src/main/java/SubmitJob.java b/dataproc/snippets/src/main/java/SubmitJob.java new file mode 100644 index 00000000000..d1f727e8671 --- /dev/null +++ b/dataproc/snippets/src/main/java/SubmitJob.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// [START dataproc_submit_job] + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.dataproc.v1.HadoopJob; +import com.google.cloud.dataproc.v1.Job; +import com.google.cloud.dataproc.v1.JobControllerClient; +import com.google.cloud.dataproc.v1.JobControllerSettings; +import com.google.cloud.dataproc.v1.JobMetadata; +import com.google.cloud.dataproc.v1.JobPlacement; +import com.google.cloud.dataproc.v1.SparkJob; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.ExecutionException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SubmitJob { + + public static void submitJob() throws IOException, InterruptedException { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String region = "your-project-region"; + String clusterName = "your-cluster-name"; + submitJob(projectId, region, clusterName); + } + + public static void submitJob( + String projectId, String region, String clusterName) + throws IOException, InterruptedException { + String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region); + + // Configure the settings for the job controller client. + JobControllerSettings jobControllerSettings = + JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build(); + + // Create a job controller client with the configured settings. Using a try-with-resources + // closes the client, + // but this can also be done manually with the .close() method. + try (JobControllerClient jobControllerClient = + JobControllerClient.create(jobControllerSettings)) { + + // Configure cluster placement for the job. + JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build(); + + // Configure Spark job settings. + SparkJob sparkJob = + SparkJob.newBuilder() + .setMainClass("org.apache.spark.examples.SparkPi") + .addJarFileUris("file:///usr/lib/spark/examples/jars/spark-examples.jar") + .addArgs("1000") + .build(); + + Job job = Job.newBuilder().setPlacement(jobPlacement).setSparkJob(sparkJob).build(); + + // Submit an asynchronous request to execute the job. + OperationFuture submitJobAsOperationAsyncRequest = + jobControllerClient.submitJobAsOperationAsync(projectId, region, job); + + Job response = submitJobAsOperationAsyncRequest.get(); + + // Print output from Google Cloud Storage. + Matcher matches = + Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri()); + matches.matches(); + + Storage storage = StorageOptions.getDefaultInstance().getService(); + Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2))); + + System.out.println( + String.format("Job finished successfully: %s", new String(blob.getContent()))); + + } catch (ExecutionException e) { + // If the job does not complete successfully, print the error message. + System.err.println(String.format("submitJob: %s ", e.getMessage())); + } + } +} +// [END dataproc_submit_job] diff --git a/dataproc/snippets/src/test/java/QuickstartTest.java b/dataproc/snippets/src/test/java/QuickstartTest.java index bf31e0325d6..2a1afa46cd0 100644 --- a/dataproc/snippets/src/test/java/QuickstartTest.java +++ b/dataproc/snippets/src/test/java/QuickstartTest.java @@ -92,8 +92,8 @@ public void quickstartTest() throws IOException, InterruptedException { String output = bout.toString(); assertThat(output, CoreMatchers.containsString("Cluster created successfully")); - assertThat(output, CoreMatchers.containsString("Submitted job")); - assertThat(output, CoreMatchers.containsString("finished with state DONE:")); + assertThat(output, CoreMatchers.containsString("Submitting job")); + assertThat(output, CoreMatchers.containsString("Job finished successfully:")); assertThat(output, CoreMatchers.containsString("successfully deleted")); } diff --git a/dataproc/snippets/src/test/java/SubmitJobTest.java b/dataproc/snippets/src/test/java/SubmitJobTest.java new file mode 100644 index 00000000000..b532f8ca895 --- /dev/null +++ b/dataproc/snippets/src/test/java/SubmitJobTest.java @@ -0,0 +1,101 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static junit.framework.TestCase.assertNotNull; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.dataproc.v1.Cluster; +import com.google.cloud.dataproc.v1.ClusterControllerClient; +import com.google.cloud.dataproc.v1.ClusterControllerSettings; +import com.google.cloud.dataproc.v1.ClusterOperationMetadata; +import com.google.protobuf.Empty; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class SubmitJobTest { + + private static final String CLUSTER_NAME = + String.format("java-sj-test--%s", UUID.randomUUID().toString()); + private static final String REGION = "us-central1"; + private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String ENDPOINT = String.format("%s-dataproc.googleapis.com:443", REGION); + + private ByteArrayOutputStream bout; + + private static void requireEnv(String varName) { + assertNotNull( + String.format("Environment variable '%s' is required to perform these tests.", varName), + System.getenv(varName)); + } + + @BeforeClass + public static void checkRequirements() { + requireEnv("GOOGLE_APPLICATION_CREDENTIALS"); + requireEnv("GOOGLE_CLOUD_PROJECT"); + } + + @Before + public void setUp() throws IOException, ExecutionException, InterruptedException { + bout = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bout)); + + ClusterControllerSettings clusterControllerSettings = + ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build(); + + try (ClusterControllerClient clusterControllerClient = + ClusterControllerClient.create(clusterControllerSettings)) { + // Create the Dataproc cluster. + Cluster cluster = Cluster.newBuilder().setClusterName(CLUSTER_NAME).build(); + OperationFuture createClusterAsyncRequest = + clusterControllerClient.createClusterAsync(PROJECT_ID, REGION, cluster); + createClusterAsyncRequest.get(); + } + } + + @Test + public void submitJobTest() throws IOException, InterruptedException { + SubmitJob.submitJob(PROJECT_ID, REGION, CLUSTER_NAME); + String output = bout.toString(); + + assertThat(output, CoreMatchers.containsString("Job finished successfully")); + } + + @After + public void tearDown() throws IOException, InterruptedException, ExecutionException { + + ClusterControllerSettings clusterControllerSettings = + ClusterControllerSettings.newBuilder().setEndpoint(ENDPOINT).build(); + + try (ClusterControllerClient clusterControllerClient = + ClusterControllerClient.create(clusterControllerSettings)) { + OperationFuture deleteClusterAsyncRequest = + clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME); + deleteClusterAsyncRequest.get(); + } + } +}