docs: submit job sample + sample maintenance (#449)
* adding submit-job sample, updating quickstart sample, updating docstrings

* prettier lint fixes

* prettier lint fixes

Co-authored-by: Benjamin E. Coe <bencoe@google.com>
2 people authored and NimJay committed Nov 19, 2022
1 parent f9324c1 commit ce94154
Showing 6 changed files with 176 additions and 57 deletions.
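The substantive change is to job submission: the old quickstart submitted with submitJob, then polled getJob in a loop with a hand-rolled timeout until the job reached a terminal state. The updated quickstart and the new submitJob.js instead call submitJobAsOperation and await the returned long-running operation. A minimal sketch of the new pattern, assuming jobClient and job are configured as in the samples below:

    // Inside an async function; `jobClient` and `job` are set up as in the samples.
    const [jobOperation] = await jobClient.submitJobAsOperation(job);
    // Resolves once the job reaches a terminal state; no manual polling or
    // timeout bookkeeping is needed.
    const [jobResponse] = await jobOperation.promise();
    // The response locates the driver output in Cloud Storage.
    console.log(jobResponse.driverOutputResourceUri);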
2 changes: 1 addition & 1 deletion dataproc/createCluster.js
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-// Code for creating a Cloud Dataproc cluster with the Node.js Client Library.
+// Create a Dataproc cluster with the Node.js Client Library.

// sample-metadata:
// title: Create Cluster
2 changes: 1 addition & 1 deletion dataproc/instantiateInlineWorkflowTemplate.js
@@ -13,7 +13,7 @@
// limitations under the License.

// This sample instantiates an inline workflow template using the client
-// library for Cloud Dataproc.
+// library for Dataproc.

// sample-metadata:
// title: Instantiate an inline workflow template
70 changes: 17 additions & 53 deletions dataproc/quickstart.js
@@ -17,7 +17,7 @@
// usage: node quickstart.js <PROJECT_ID> <REGION> <CLUSTER_NAME> <JOB_FILE_PATH>

// [START dataproc_quickstart]
-// This quickstart sample walks a user through creating a Cloud Dataproc
+// This quickstart sample walks a user through creating a Dataproc
// cluster, submitting a PySpark job from Google Cloud Storage to the
// cluster, reading the output of the job and deleting the cluster, all
// using the Node.js client library.
@@ -80,64 +80,33 @@ function main(projectId, region, clusterName, jobFilePath) {
},
};

-    let [jobResp] = await jobClient.submitJob(job);
-    const jobId = jobResp.reference.jobId;
-
-    console.log(`Submitted job "${jobId}".`);
-
-    // Terminal states for a job
-    const terminalStates = new Set(['DONE', 'ERROR', 'CANCELLED']);
-
-    // Create a timeout such that the job gets cancelled if not
-    // in a termimal state after a fixed period of time.
-    const timeout = 600000;
-    const start = new Date();
-
-    // Wait for the job to finish.
-    const jobReq = {
-      projectId: projectId,
-      region: region,
-      jobId: jobId,
-    };
-
-    while (!terminalStates.has(jobResp.status.state)) {
-      if (new Date() - timeout > start) {
-        await jobClient.cancelJob(jobReq);
-        console.log(
-          `Job ${jobId} timed out after threshold of ` +
-            `${timeout / 60000} minutes.`
-        );
-        break;
-      }
-      await sleep(1);
-      [jobResp] = await jobClient.getJob(jobReq);
-    }
-
-    const clusterReq = {
-      projectId: projectId,
-      region: region,
-      clusterName: clusterName,
-    };
-
-    const [clusterResp] = await clusterClient.getCluster(clusterReq);
-
-    const storage = new Storage();
-
-    const output = await storage
-      .bucket(clusterResp.config.configBucket)
-      .file(
-        `google-cloud-dataproc-metainfo/${clusterResp.clusterUuid}/` +
-          `jobs/${jobId}/driveroutput.000000000`
-      )
-      .download();
-
-    // Output a success message.
-    console.log(
-      `Job ${jobId} finished with state ${jobResp.status.state}:\n${output}`
-    );
-
-    // Delete the cluster once the job has terminated.
-    const [deleteOperation] = await clusterClient.deleteCluster(clusterReq);
+    const [jobOperation] = await jobClient.submitJobAsOperation(job);
+    const [jobResponse] = await jobOperation.promise();
+
+    const matches = jobResponse.driverOutputResourceUri.match(
+      'gs://(.*?)/(.*)'
+    );
+
+    const storage = new Storage();
+
+    const output = await storage
+      .bucket(matches[1])
+      .file(`${matches[2]}.000000000`)
+      .download();
+
+    // Output a success message.
+    console.log(`Job finished successfully: ${output}`);
+
+    // Delete the cluster once the job has terminated.
+    const deleteClusterReq = {
+      projectId: projectId,
+      region: region,
+      clusterName: clusterName,
+    };
+
+    const [deleteOperation] = await clusterClient.deleteCluster(
+      deleteClusterReq
+    );
await deleteOperation.promise();

// Output a success message
@@ -147,11 +116,6 @@ function main(projectId, region, clusterName, jobFilePath) {
quickstart();
}

-// Helper function to sleep for the given number of seconds
-function sleep(seconds) {
-  return new Promise(resolve => setTimeout(resolve, seconds * 1000));
-}

const args = process.argv.slice(2);

if (args.length !== 4) {
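Both the updated quickstart and the new sample locate the driver output by splitting driverOutputResourceUri into a bucket name and an object prefix, then downloading the first output chunk (the .000000000 suffix). A small illustration of the match, using a hypothetical URI of the shape the old code built by hand:

    // Hypothetical example value, for illustration only.
    const uri =
      'gs://my-bucket/google-cloud-dataproc-metainfo/some-uuid/jobs/my-job/driveroutput';
    const matches = uri.match('gs://(.*?)/(.*)');
    // matches[1] === 'my-bucket' (the bucket name)
    // matches[2] === 'google-cloud-dataproc-metainfo/some-uuid/jobs/my-job/driveroutput'
    // The samples then download `${matches[2]}.000000000` from that bucket.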
83 changes: 83 additions & 0 deletions dataproc/submitJob.js
@@ -0,0 +1,83 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Submit a Spark job to a Dataproc cluster with the Node.js Client Library.

// sample-metadata:
// title: Submit Job
// usage: node submitJob.js <PROJECT_ID> <REGION> <CLUSTER_NAME>

/*eslint no-warning-comments: [0, { "terms": ["todo", "fixme"], "location": "anywhere" }]*/

function main(
projectId = 'YOUR_PROJECT_ID',
region = 'YOUR_CLUSTER_REGION',
clusterName = 'YOUR_CLUSTER_NAME'
) {
// [START dataproc_submit_job]
const dataproc = require('@google-cloud/dataproc');
const {Storage} = require('@google-cloud/storage');

// TODO(developer): Uncomment and set the following variables
// projectId = 'YOUR_PROJECT_ID'
// region = 'YOUR_CLUSTER_REGION'
// clusterName = 'YOUR_CLUSTER_NAME'

// Create a client with the endpoint set to the desired cluster region
const jobClient = new dataproc.v1.JobControllerClient({
apiEndpoint: `${region}-dataproc.googleapis.com`,
projectId: projectId,
});

async function submitJob() {
const job = {
projectId: projectId,
region: region,
job: {
placement: {
clusterName: clusterName,
},
sparkJob: {
mainClass: 'org.apache.spark.examples.SparkPi',
jarFileUris: [
'file:///usr/lib/spark/examples/jars/spark-examples.jar',
],
args: ['1000'],
},
},
};

const [jobOperation] = await jobClient.submitJobAsOperation(job);
const [jobResponse] = await jobOperation.promise();

const matches = jobResponse.driverOutputResourceUri.match(
'gs://(.*?)/(.*)'
);

const storage = new Storage();

const output = await storage
.bucket(matches[1])
.file(`${matches[2]}.000000000`)
.download();

// Output a success message.
console.log(`Job finished successfully: ${output}`);
// [END dataproc_submit_job]
}

submitJob();
}

main(...process.argv.slice(2));
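The new sample runs the SparkPi example from the jar that ships on the cluster image, passing 1000 as the partition count, and prints the driver output on success. To try it against an existing cluster (all three arguments below are placeholders):

    node submitJob.js my-project us-central1 my-cluster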
3 changes: 1 addition & 2 deletions dataproc/system-test/quickstart.test.js
@@ -61,8 +61,7 @@ describe('execute the quickstart', () => {
`node quickstart.js "${projectId}" "${region}" "${clusterName}" "${jobFilePath}"`
);
assert.match(stdout, /Cluster created successfully/);
-    assert.match(stdout, /Submitted job/);
-    assert.match(stdout, /finished with state DONE:/);
+    assert.match(stdout, /Job finished successfully/);
assert.match(stdout, /successfully deleted/);
});

73 changes: 73 additions & 0 deletions dataproc/system-test/submitJob.test.js
@@ -0,0 +1,73 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const {assert} = require('chai');
const {describe, it, before, after} = require('mocha');
const cp = require('child_process');
const {v4} = require('uuid');

const projectId = process.env.GCLOUD_PROJECT;
const region = 'us-central1';
const clusterName = `node-sj-test-${v4()}`;
const cluster = {
projectId: projectId,
region: region,
cluster: {
clusterName: clusterName,
config: {
masterConfig: {
numInstances: 1,
machineTypeUri: 'n1-standard-1',
},
workerConfig: {
numInstances: 2,
machineTypeUri: 'n1-standard-1',
},
},
},
};

const dataproc = require('@google-cloud/dataproc');
const clusterClient = new dataproc.v1.ClusterControllerClient({
apiEndpoint: `${region}-dataproc.googleapis.com`,
});

const execSync = cmd =>
cp.execSync(cmd, {
encoding: 'utf-8',
});

describe('submit a Spark job to a Dataproc cluster', () => {
before(async () => {
const [operation] = await clusterClient.createCluster(cluster);
await operation.promise();
});

it('should submit a job to a dataproc cluster', async () => {
const stdout = execSync(
`node submitJob.js "${projectId}" "${region}" "${clusterName}"`
);
assert.match(stdout, new RegExp('Job finished successfully'));
});

after(async () => {
await clusterClient.deleteCluster({
projectId: projectId,
region: region,
clusterName: clusterName,
});
});
});
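Assuming the repository's usual mocha setup for system tests (the exact command below is an assumption, not part of this commit), the new test can be run on its own; a generous timeout is needed because the before hook creates a real cluster:

    export GCLOUD_PROJECT=my-project
    npx mocha system-test/submitJob.test.js --timeout 1800000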
