def instantiate_inline_workflow_template(project_id, region):
    """Instantiate an inline Cloud Dataproc workflow template and wait for it.

    Submits a two-step Hadoop workflow ('teragen' followed by 'terasort')
    on a managed cluster, waits on the returned operation, and prints a
    success message once it completes.

    Args:
        project_id (string): Project to use for running the workflow.
        region (string): Region where the workflow resources should live.
    """

    # Create a client with the endpoint set to the desired region.
    workflow_template_client = dataproc.WorkflowTemplateServiceClient(
        client_options={
            'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)}
    )

    parent = workflow_template_client.region_path(project_id, region)

    # Both steps run programs from the same examples jar.
    examples_jar = ('file:///usr/lib/hadoop-mapreduce/'
                    'hadoop-mapreduce-examples.jar')

    template = {
        'jobs': [
            {
                'hadoop_job': {
                    'main_jar_file_uri': examples_jar,
                    'args': [
                        'teragen',
                        '1000',
                        'hdfs:///gen/'
                    ]
                },
                'step_id': 'teragen'
            },
            {
                'hadoop_job': {
                    'main_jar_file_uri': examples_jar,
                    'args': [
                        'terasort',
                        'hdfs:///gen/',
                        'hdfs:///sort/'
                    ]
                },
                'step_id': 'terasort',
                # 'terasort' consumes the output of 'teragen', so it must
                # not start until that step has finished.
                'prerequisite_step_ids': [
                    'teragen'
                ]
            }],
        'placement': {
            'managed_cluster': {
                'cluster_name': 'my-managed-cluster',
                'config': {
                    'gce_cluster_config': {
                        # Leave 'zone_uri' empty for 'Auto Zone Placement'.
                        # A fixed zone such as 'us-central1-a' is only valid
                        # when `region` is 'us-central1', which would break
                        # the sample for every other region.
                        'zone_uri': ''
                    }
                }
            }
        }
    }

    # Submit the request to instantiate the workflow from an inline template.
    operation = workflow_template_client.instantiate_inline_workflow_template(
        parent, template
    )
    # Wait for the operation to finish; a failure surfaces as an exception.
    operation.result()

    # Output a success message.
    print('Workflow ran successfully.')


if __name__ == "__main__":
    # Fail with a readable usage message instead of an IndexError when the
    # required positional arguments are missing.
    if len(sys.argv) < 3:
        sys.exit(
            'Usage: python instantiate_inline_workflow_template.py '
            'PROJECT_ID REGION'
        )
    instantiate_inline_workflow_template(sys.argv[1], sys.argv[2])
# Project comes from the environment; the region is fixed for the test run.
PROJECT_ID = os.environ['GCLOUD_PROJECT']
REGION = 'us-central1'


def test_workflows(capsys):
    """Run the inline workflow sample and check that it reports success."""
    sample = instantiate_inline_workflow_template
    sample.instantiate_inline_workflow_template(PROJECT_ID, REGION)

    # Only stdout matters here; stderr is captured and ignored.
    captured = capsys.readouterr()
    assert "successfully" in captured.out