Commit

fix: overhauled quickstart (#280)
* Added create cluster sample for Cloud Dataproc

* Modify test and slight change to Dataproc success message

* Changed region tag to include 'dataproc'

* changed dataproc imports to explicit v1

* Added create cluster sample for Cloud Dataproc

  Updated READMEs

  ignore createCluster until it lands

  Modify test and slight change to Dataproc success message

  changed dataproc imports to explicit v1

* Adding updated Dataproc quickstart

* Added imports of mocha commands to tests

Co-authored-by: Benjamin E. Coe <bencoe@google.com>
2 people authored and NimJay committed Nov 19, 2022
1 parent 14f36df commit 4165794
Showing 5 changed files with 199 additions and 47 deletions.
4 changes: 2 additions & 2 deletions dataproc/createCluster.js
@@ -25,9 +25,9 @@ function main(projectId, region, clusterName) {
 
   async function createCluster() {
     // TODO(developer): Uncomment and set the following variables
-    // project_id = 'YOUR_PROJECT_ID'
+    // projectId = 'YOUR_PROJECT_ID'
     // region = 'YOUR_CLUSTER_REGION'
-    // cluster_name = 'YOUR_CLUSTER_NAME'
+    // clusterName = 'YOUR_CLUSTER_NAME'
 
     // Create the cluster config
     const request = {
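For reference, the `const request = {` line this hunk ends on opens the same cluster config that quickstart.js builds below. A minimal sketch of the full request shape, assuming the instance counts and machine types shown in that file:

  // Sketch of the createCluster request (shape mirrors quickstart.js below).
  const request = {
    projectId: projectId, // the camelCase variables set above
    region: region,
    cluster: {
      clusterName: clusterName, // must be unique within the project and region
      config: {
        masterConfig: {numInstances: 1, machineTypeUri: 'n1-standard-1'},
        workerConfig: {numInstances: 2, machineTypeUri: 'n1-standard-1'},
      },
    },
  };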
6 changes: 3 additions & 3 deletions dataproc/package.json
@@ -14,9 +14,9 @@
     "test": "mocha system-test --timeout 600000"
   },
   "dependencies": {
-    "@google-cloud/dataproc": "^1.4.4",
-    "uuid": "^3.3.3",
-    "yargs": "^15.0.0"
+    "@google-cloud/dataproc": "^1.4.1",
+    "@google-cloud/storage": "^4.1.3",
+    "sleep": "^6.1.0"
   },
   "devDependencies": {
     "chai": "^4.2.0",
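The dependency swap tracks the rewritten quickstart: @google-cloud/storage downloads the job's driver output, and sleep provides the one-second pause in the job-polling loop. If the native sleep addon is unwanted, a dependency-free promise delay would serve the same purpose — a sketch, not what this commit ships:

  // Hypothetical stand-in for the sleep package: an async delay helper.
  const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

  // In the polling loop, `await delay(1000)` would replace `sleep.sleep(1)`.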
161 changes: 127 additions & 34 deletions dataproc/quickstart.js
@@ -14,42 +14,135 @@
 
 'use strict';
 
-// [START dataproc_quickstart]
-const dataproc = require('@google-cloud/dataproc');
-const client = new dataproc.v1.ClusterControllerClient();
-
-async function quickstart() {
-  const projectId = await client.getProjectId();
-  const request = {
-    region: 'global',
-    projectId,
-  };
-  const [resources] = await client.listClusters(request);
-  console.log('Total resources:', resources.length);
-  for (const resource of resources) {
-    console.log(resource);
-  }
+function main(projectId, region, clusterName, jobFilePath) {
+  // [START dataproc_quickstart]
+  const dataproc = require('@google-cloud/dataproc').v1;
+  const {Storage} = require('@google-cloud/storage');
 
-  let nextRequest = request;
-  // Or obtain the paged response.
-  const options = {autoPaginate: false};
-  do {
-    const responses = await client.listClusters(nextRequest, options);
-    // The actual resources in a response.
-    const resources = responses[0];
-    // The next request if the response shows that there are more responses.
-    nextRequest = responses[1];
-    // The actual response object, if necessary.
-    // const rawResponse = responses[2];
-    for (const resource of resources) {
-      console.log(resource);
-    }
-  } while (nextRequest);
+  const sleep = require('sleep');
 
+  // Create a cluster client with the endpoint set to the desired cluster region
+  const clusterClient = new dataproc.ClusterControllerClient({
+    apiEndpoint: `${region}-dataproc.googleapis.com`,
+  });
 
-  client.listClustersStream(request).on('data', element => {
-    console.log(element);
+  // Create a job client with the endpoint set to the desired cluster region
+  const jobClient = new dataproc.JobControllerClient({
+    apiEndpoint: `${region}-dataproc.googleapis.com`,
   });
 
+  async function quickstart() {
+    // TODO(developer): Uncomment and set the following variables
+    // projectId = 'YOUR_PROJECT_ID'
+    // region = 'YOUR_CLUSTER_REGION'
+    // clusterName = 'YOUR_CLUSTER_NAME'
+    // jobFilePath = 'YOUR_JOB_FILE_PATH'
+
+    // Create the cluster config
+    const cluster = {
+      projectId: projectId,
+      region: region,
+      cluster: {
+        clusterName: clusterName,
+        config: {
+          masterConfig: {
+            numInstances: 1,
+            machineTypeUri: 'n1-standard-1',
+          },
+          workerConfig: {
+            numInstances: 2,
+            machineTypeUri: 'n1-standard-1',
+          },
+        },
+      },
+    };
+
+    // Create the cluster
+    const [operation] = await clusterClient.createCluster(cluster);
+    const [response] = await operation.promise();
+
+    // Output a success message
+    console.log(`Cluster created successfully: ${response.clusterName}`);
+
+    const job = {
+      projectId: projectId,
+      region: region,
+      job: {
+        placement: {
+          clusterName: clusterName,
+        },
+        pysparkJob: {
+          mainPythonFileUri: jobFilePath,
+        },
+      },
+    };
+
+    let [jobResp] = await jobClient.submitJob(job);
+    const jobId = jobResp.reference.jobId;
+
+    console.log(`Submitted job "${jobId}".`);
+
+    // Terminal states for a job
+    const terminalStates = new Set(['DONE', 'ERROR', 'CANCELLED']);
+
+    // Create a timeout such that the job gets cancelled if not
+    // in a terminal state after a fixed period of time.
+    const timeout = 600000;
+    const start = new Date();
+
+    // Wait for the job to finish.
+    const jobReq = {
+      projectId: projectId,
+      region: region,
+      jobId: jobId,
+    };
+
+    while (!terminalStates.has(jobResp.status.state)) {
+      if (new Date() - timeout > start) {
+        await jobClient.cancelJob(jobReq);
+        console.log(
+          `Job ${jobId} timed out after threshold of ` +
+            `${timeout / 60000} minutes.`
+        );
+        break;
+      }
+      await sleep.sleep(1);
+      [jobResp] = await jobClient.getJob(jobReq);
+    }
+
+    const clusterReq = {
+      projectId: projectId,
+      region: region,
+      clusterName: clusterName,
+    };
+
+    const [clusterResp] = await clusterClient.getCluster(clusterReq);
+
+    const storage = new Storage();
+
+    const output = await storage
+      .bucket(clusterResp.config.configBucket)
+      .file(
+        `google-cloud-dataproc-metainfo/${clusterResp.clusterUuid}/` +
+          `jobs/${jobId}/driveroutput.000000000`
+      )
+      .download();
+
+    // Output a success message.
+    console.log(
+      `Job ${jobId} finished with state ${jobResp.status.state}:\n${output}`
+    );
+
+    // Delete the cluster once the job has terminated.
+    const [deleteOperation] = await clusterClient.deleteCluster(clusterReq);
+    await deleteOperation.promise();
+
+    // Output a success message
+    console.log(`Cluster ${clusterName} successfully deleted.`);
+  }
+
+  quickstart();
+  // [END dataproc_quickstart]
 }
 
-quickstart();
-// [END dataproc_quickstart]
+main(...process.argv.slice(2));
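The new sample expects jobFilePath to point at a PySpark file already in Cloud Storage; the system test below stages one in a before() hook. A self-contained sketch of that staging step, assuming a placeholder bucket name and the same sum-over-an-RDD script the test uploads:

  // Stage a small PySpark job file for the quickstart to run.
  const {Storage} = require('@google-cloud/storage');

  async function stageJobFile() {
    const bucketName = 'my-quickstart-bucket'; // placeholder; use a bucket you own
    const jobCode =
      'import pyspark\n' +
      'sc = pyspark.SparkContext()\n' +
      'rdd = sc.parallelize((1,2,3,4,5))\n' +
      'sum = rdd.reduce(lambda x, y: x + y)\n';

    await new Storage().bucket(bucketName).file('sum.py').save(jobCode);
    console.log(`Staged job file at gs://${bucketName}/sum.py`);
  }

  stageJobFile();

With the file staged, the quickstart runs as `node quickstart.js <projectId> <region> <clusterName> gs://<bucket>/sum.py`, the same invocation the system test uses.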
6 changes: 3 additions & 3 deletions dataproc/system-test/createCluster.test.js
@@ -20,10 +20,10 @@ const cp = require('child_process');
 const uuid = require('uuid');
 
 const region = 'us-central1';
-const clusterName = `test-${uuid()}`;
+const clusterName = `node-cc-test-${uuid()}`;
 
 const dataproc = require('@google-cloud/dataproc').v1;
-const client = new dataproc.v1.ClusterControllerClient({
+const clusterClient = new dataproc.ClusterControllerClient({
   apiEndpoint: `${region}-dataproc.googleapis.com`,
 });
 
@@ -40,7 +40,7 @@ describe('create a dataproc cluster', () => {
   });
 
   after(async () => {
-    await client.deleteCluster({
+    await clusterClient.deleteCluster({
       projectId: projectId,
      region: region,
      clusterName: clusterName,
69 changes: 64 additions & 5 deletions dataproc/system-test/quickstart.test.js
@@ -12,15 +12,74 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
 'use strict';
 
 const {assert} = require('chai');
-const {describe, it} = require('mocha');
+const {describe, it, before, after} = require('mocha');
 const cp = require('child_process');
+const uuid = require('uuid');
+
+const dataproc = require('@google-cloud/dataproc').v1;
+const {Storage} = require('@google-cloud/storage');
+
+const myUuid = uuid();
+const region = 'us-central1';
+const clusterName = `node-qs-test-${myUuid}`;
+const bucketName = `node-dataproc-qs-test-${myUuid}`;
+const projectId = process.env.GCLOUD_PROJECT;
+const jobFileName = 'sum.py';
+const jobFilePath = `gs://${bucketName}/${jobFileName}`;
+const sortCode =
+  'import pyspark\n' +
+  'sc = pyspark.SparkContext()\n' +
+  'rdd = sc.parallelize((1,2,3,4,5))\n' +
+  'sum = rdd.reduce(lambda x, y: x + y)\n';
+
+const clusterClient = new dataproc.ClusterControllerClient({
+  apiEndpoint: `${region}-dataproc.googleapis.com`,
+});
+
+const storage = new Storage();
 
 const execSync = cmd => cp.execSync(cmd, {encoding: 'utf-8'});
 
 describe('dataproc samples', () => {
-  it('should run the quickstart', async () => {
-    const stdout = execSync('node quickstart');
-    assert.match(stdout, /Total resources:/);
+  describe('execute the quickstart', () => {
+    before(async () => {
+      const [bucket] = await storage.createBucket(bucketName);
+      await bucket.file(jobFileName).save(sortCode);
+    });
+
+    it('should execute the quickstart', async () => {
+      const stdout = execSync(
+        `node quickstart.js "${projectId}" "${region}" "${clusterName}" "${jobFilePath}"`
+      );
+      assert.match(stdout, /Cluster created successfully/);
+      assert.match(stdout, /Submitted job/);
+      assert.match(stdout, /finished with state DONE:/);
+      assert.match(stdout, /successfully deleted/);
+    });
+
+    after(async () => {
+      await storage
+        .bucket(bucketName)
+        .file(jobFileName)
+        .delete();
+      await storage.bucket(bucketName).delete();
+
+      const [clusters] = await clusterClient.listClusters({
+        projectId: projectId,
+        region: region,
+      });
+
+      for (const cluster of clusters) {
+        if (cluster.clusterName === clusterName) {
+          await clusterClient.deleteCluster({
+            projectId: projectId,
+            region: region,
+            clusterName: clusterName,
+          });
+          break;
+        }
+      }
+    });
   });
 });
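A note on the output assertion: quickstart.js reads only driveroutput.000000000, the first chunk Dataproc writes to the cluster's staging bucket, which is enough for this short job. Longer jobs roll over into driveroutput.000000001 and beyond; inside quickstart(), every chunk could be collected with a prefix listing — a sketch reusing the clusterResp, jobId, and storage values already in scope there:

  // Download every driver-output chunk, not just the first.
  const [files] = await storage
    .bucket(clusterResp.config.configBucket)
    .getFiles({
      prefix:
        `google-cloud-dataproc-metainfo/${clusterResp.clusterUuid}/` +
        `jobs/${jobId}/driveroutput`,
    });

  let fullOutput = '';
  for (const file of files) {
    const [contents] = await file.download();
    fullOutput += contents.toString();
  }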
