From 19a4997535729342c926a803371a2c097833551a Mon Sep 17 00:00:00 2001
From: bradmiro
Date: Tue, 17 Dec 2019 16:15:33 -0500
Subject: [PATCH] Added quickstart test

---
 dataproc/quickstart/quickstart_test.py | 74 +++++++++++++++++++++++++++
 1 file changed, 74 insertions(+)
 create mode 100644 dataproc/quickstart/quickstart_test.py

diff --git a/dataproc/quickstart/quickstart_test.py b/dataproc/quickstart/quickstart_test.py
new file mode 100644
index 000000000000..e9a32c38ab65
--- /dev/null
+++ b/dataproc/quickstart/quickstart_test.py
@@ -0,0 +1,74 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+import pytest
+
+from google.cloud import dataproc_v1 as dataproc
+from google.cloud import storage
+
+import quickstart
+
+
+PROJECT_ID = os.environ['GCLOUD_PROJECT']
+REGION = 'us-central1'
+CLUSTER_NAME = 'test-cluster-{}'.format(uuid.uuid4())
+STAGING_BUCKET = 'test-bucket-{}'.format(uuid.uuid4())
+JOB_FILE_NAME = 'sum.py'
+JOB_FILE_PATH = 'gs://{}/{}'.format(STAGING_BUCKET, JOB_FILE_NAME)
+# PySpark job that sums the integers 1 through 5.
+SUM_CODE = (
+    "import pyspark\n"
+    "sc = pyspark.SparkContext()\n"
+    "rdd = sc.parallelize((1, 2, 3, 4, 5))\n"
+    "total = rdd.reduce(lambda x, y: x + y)\n"
+)
+
+
+@pytest.fixture(autouse=True)
+def setup_teardown():
+    # Stage the PySpark job file in a temporary bucket.
+    storage_client = storage.Client()
+    bucket = storage_client.create_bucket(STAGING_BUCKET)
+    blob = bucket.blob(JOB_FILE_NAME)
+    blob.upload_from_string(SUM_CODE)
+
+    yield
+
+    cluster_client = dataproc.ClusterControllerClient(client_options={
+        'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
+    })
+
+    # The quickstart sample deletes the cluster itself. If the test failed
+    # before that deletion ran, delete the cluster here so it does not leak.
+    clusters = cluster_client.list_clusters(PROJECT_ID, REGION)
+
+    for cluster in clusters:
+        if cluster.cluster_name == CLUSTER_NAME:
+            cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+
+    blob.delete()
+    bucket.delete()
+
+
+def test_quickstart(capsys):
+    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)
+
+    out, _ = capsys.readouterr()
+    assert 'Cluster created successfully' in out
+    assert 'Submitted job' in out
+    assert 'finished with state DONE:' in out
+    assert 'successfully deleted' in out
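
A note on the staged job: SUM_CODE parallelizes the tuple (1, 2, 3, 4, 5) and reduces it with addition, so the submitted Dataproc job computes 1 + 2 + 3 + 4 + 5 = 15. A minimal local sanity check of the same reduce logic, using only the Python standard library (no Spark or GCP access; purely illustrative):

    # Mirrors rdd.reduce(lambda x, y: x + y) from SUM_CODE without Spark.
    from functools import reduce

    total = reduce(lambda x, y: x + y, (1, 2, 3, 4, 5))
    assert total == 15  # the value the Dataproc job would compute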
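
The quickstart module under test is not part of this patch; the assertions only pin down its observable contract: it accepts (project_id, region, cluster_name, job_file_path) and prints status lines containing the four asserted phrases. A hypothetical stand-in satisfying just that contract, useful for reading the test in isolation (the real sample creates the cluster, submits the job, and deletes the cluster through the Dataproc clients):

    # Hypothetical stub for dataproc/quickstart/quickstart.py. Only the
    # signature and the printed phrases below are assumed by the test;
    # everything else about the real sample is elided.
    def quickstart(project_id, region, cluster_name, job_file_path):
        # 1. Create the Dataproc cluster.
        print('Cluster created successfully: {}'.format(cluster_name))
        # 2. Submit the PySpark job at job_file_path.
        print('Submitted job "..."')
        # 3. Wait for the job and report its terminal state.
        print('Job finished with state DONE:')
        # 4. Tear the cluster down.
        print('Cluster {} successfully deleted.'.format(cluster_name))

Because the fixture is declared with autouse=True, it wraps every test in the module automatically, so the staging bucket and any cluster leaked by a mid-test failure are cleaned up even when an assertion fails. To run the test locally, set the GCLOUD_PROJECT environment variable to a project with the Dataproc and Cloud Storage APIs enabled and invoke pytest from dataproc/quickstart/.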