From 1cf8278dd8ad486c89fd77f63a1994d175303e8a Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Thu, 30 Jan 2020 18:17:46 -0800 Subject: [PATCH 01/14] adding diagnose me component --- .../diagnostics/diagnose_me/component.py | 120 +++++++++++++ .../diagnostics/diagnose_me/component.yaml | 170 ++++++++++++++++++ 2 files changed, 290 insertions(+) create mode 100644 components/diagnostics/diagnose_me/component.py create mode 100644 components/diagnostics/diagnose_me/component.yaml diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py new file mode 100644 index 00000000000..b7ad3974e4a --- /dev/null +++ b/components/diagnostics/diagnose_me/component.py @@ -0,0 +1,120 @@ +from typing import NamedTuple + + +def run_diagnose_me(target_apis: str, bucket: str) -> NamedTuple('Outputs', [('project_id', str), ('bucket', str)]): + """ Performs enviroment verification specific to this pipeline. + + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' does not come with pip3 pre-installed. + import subprocess + subprocess.run( + ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], + capture_output=True) + subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], + capture_output=True) + subprocess.run(['python3', 'get-pip.py'], capture_output=True) + subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], + capture_output=True) + + import sys + from typing import List, Text + import os + from kfp.cli.diagnose_me import gcp + from kfp.cli.diagnose_me import utility + import json as json_library + + config_error_observed = False + + # Get the project ID + # from project configuration + project_config = gcp.get_gcp_configuration( + gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) + project_id = '' + if not project_config.has_error: + project_id = project_config.parsed_output['core']['project'] + print('GCP credentials are configured with access to project: %s ...\n' % + (project_id)) + print('Following account(s) are active under this pipeline:\n') + subprocess.run(['gcloud', 'auth', 'list']) + print('\n') + else: + print('Project configuration is not accessible with error %s\n' % + (project_config.stderr) + 'Follow the instructions at\n' + + 'https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials \n' + + 'to verify you have configured the required gcp secret.', file=sys.stderr) + config_error_observed = True + + # Get project buckets + get_project_bucket_results = gcp.get_gcp_configuration( + gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) + + if get_project_bucket_results.has_error: + print('could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), file=sys.stderr) + config_error_observed = True + + # Get the root of the user provided bucket i.e. gs://root. + bucket_root = '/'.join(bucket.split('/')[0:3]) + + print('Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % (bucket)) + + if bucket_root in get_project_bucket_results.json_output: + print('Provided bucket \n %s\nis accessible within the project\n %s.' % ( + bucket, project_id)) + print('\n\n') + + else: + print(''' + Could not find the bucket %s in project %s + Please verify that you have provided the correct GCS bucket name. 
+ + Only the following buckets are visible in this project: + \n%s + ''' + % (bucket, project_id, get_project_bucket_results.parsed_output), file=sys.stderr) + config_error_observed = True + + # Verify APIs that are required are enabled + api_config_results = gcp.get_gcp_configuration( + gcp.Commands.GET_APIS) + + api_status = {} + + if api_config_results.has_error: + raise RuntimeError('could not retrieve API status with error: %s' % ( + api_config_results.stderr)) + + for item in api_config_results.parsed_output: + api_status[item['config']['name']] = item['state'] + # printing the results in stdout for logging purposes + print('%s %s' % (item['config']['name'], item['state'])) + + # Check if target apis are enabled + api_check_results = True + error_list = [] + for api in target_apis.replace(' ', '').split(','): + if 'ENABLED' != api_status.get(api, 'DISABLED'): + api_check_results = False + error_list.append( + 'API \"%s\" is not enabled. To enable this api go to https://pantheon.corp.google.com/apis/library/%s?project=%s' % (api, api, project_id)) + + if not api_check_results: + config_error_observed = True + print('Required APIs are not enabled:\n' + + '\n'.join(error_list), file=sys.stderr) + + return (project_id, bucket) + + +if __name__ == '__main__': + import kfp.components as comp + + comp.func_to_container_op( + run_diagnose_me, + base_image='google/cloud-sdk:276.0.0', + output_component_file='component.yaml', + ) diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml new file mode 100644 index 00000000000..f244b578fa3 --- /dev/null +++ b/components/diagnostics/diagnose_me/component.yaml @@ -0,0 +1,170 @@ +name: Run diagnose me +description: |- + Performs enviroment verification specific to this pipeline. + + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. +inputs: +- {type: String, name: target_apis} +- {type: String, name: bucket} +outputs: +- {type: String, name: project_id} +- {type: String, name: bucket} +implementation: + container: + image: google/cloud-sdk:276.0.0 + command: + - python3 + - -u + - -c + - | + from typing import NamedTuple + + def run_diagnose_me(target_apis: str, bucket: str) -> NamedTuple('Outputs', [('project_id', str), ('bucket', str)]): + """ Performs enviroment verification specific to this pipeline. + + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' does not come with pip3 pre-installed. 
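+      # Note: subprocess.run(capture_output=True) requires Python 3.7+, so this
+      # assumes the image's python3 is 3.7 or newer (not verified here).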
+ import subprocess + subprocess.run( + ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], + capture_output=True) + subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], + capture_output=True) + subprocess.run(['python3', 'get-pip.py'], capture_output=True) + subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], + capture_output=True) + + import sys + from typing import List, Text + import os + from kfp.cli.diagnose_me import gcp + from kfp.cli.diagnose_me import utility + import json as json_library + + config_error_observed = False + + # Get the project ID + # from project configuration + project_config = gcp.get_gcp_configuration( + gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) + project_id = '' + if not project_config.has_error: + project_id = project_config.parsed_output['core']['project'] + print('GCP credentials are configured with access to project: %s ...\n' % + (project_id)) + print('Following account(s) are active under this pipeline:\n') + subprocess.run(['gcloud', 'auth', 'list']) + print('\n') + else: + print('Project configuration is not accessible with error %s\n' % + (project_config.stderr) + 'Follow the instructions at\n' + + 'https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials \n' + + 'to verify you have configured the required gcp secret.', file=sys.stderr) + config_error_observed = True + + # Get project buckets + get_project_bucket_results = gcp.get_gcp_configuration( + gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) + + if get_project_bucket_results.has_error: + print('could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), file=sys.stderr) + config_error_observed = True + + # Get the root of the user provided bucket i.e. gs://root. + bucket_root = '/'.join(bucket.split('/')[0:3]) + + print('Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % (bucket)) + + if bucket_root in get_project_bucket_results.json_output: + print('Provided bucket \n %s\nis accessible within the project\n %s.' % ( + bucket, project_id)) + print('\n\n') + + else: + print(''' + Could not find the bucket %s in project %s + Please verify that you have provided the correct GCS bucket name. + + Only the following buckets are visible in this project: + \n%s''' + % (bucket, project_id, get_project_bucket_results.parsed_output), file=sys.stderr) + config_error_observed = True + + # Verify APIs that are required are enabled + api_config_results = gcp.get_gcp_configuration( + gcp.Commands.GET_APIS) + + api_status = {} + + if api_config_results.has_error: + raise RuntimeError('could not retrieve API status with error: %s' % ( + api_config_results.stderr)) + + for item in api_config_results.parsed_output: + api_status[item['config']['name']] = item['state'] + # printing the results in stdout for logging purposes + print('%s %s' % (item['config']['name'], item['state'])) + + # Check if target apis are enabled + api_check_results = True + error_list = [] + for api in target_apis.replace(' ', '').split(','): + if 'ENABLED' != api_status.get(api, 'DISABLED'): + api_check_results = False + error_list.append( + 'API \"%s\" is not enabled. 
To enable this api go to https://pantheon.corp.google.com/apis/library/%s?project=%s' % (api, api, project_id)) + + if not api_check_results: + config_error_observed = True + print('Required APIs are not enabled:\n' + + '\n'.join(error_list), file=sys.stderr) + + return (project_id, bucket) + + def _serialize_str(str_value: str) -> str: + if not isinstance(str_value, str): + raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) + return str_value + + import argparse + _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs enviroment verification specific to this pipeline.\n\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.') + _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = run_diagnose_me(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_str, + _serialize_str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --target-apis + - {inputValue: target_apis} + - --bucket + - {inputValue: bucket} + - '----output-paths' + - {outputPath: project_id} + - {outputPath: bucket} From edcbe8d08cd39fea9871c5ac70178efb20ecd0ed Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 09:41:14 -0800 Subject: [PATCH 02/14] Updating component with execution flag --- .../diagnostics/diagnose_me/component.py | 118 ++++++++----- .../diagnostics/diagnose_me/component.yaml | 158 +++++++++++------- 2 files changed, 181 insertions(+), 95 deletions(-) diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py index b7ad3974e4a..16ce8b98498 100644 --- a/components/diagnostics/diagnose_me/component.py +++ b/components/diagnostics/diagnose_me/component.py @@ -1,15 +1,43 @@ -from typing import NamedTuple - +# Copyright 2020 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -def run_diagnose_me(target_apis: str, bucket: str) -> NamedTuple('Outputs', [('project_id', str), ('bucket', str)]): - """ Performs enviroment verification specific to this pipeline. +from typing import NamedTuple - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. - """ - # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' does not come with pip3 pre-installed. 
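+  # Note: this assumes the image's apt package index is already populated;
+  # some images need an 'apt-get update' before 'apt-get install'.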
+def run_diagnose_me( + bucket: str, execution_mode: str, target_apis: str +) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): + """ Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' + # does not come with pip3 pre-installed. import subprocess subprocess.run( ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], @@ -36,16 +64,15 @@ def run_diagnose_me(target_apis: str, bucket: str) -> NamedTuple('Outputs', [('p project_id = '' if not project_config.has_error: project_id = project_config.parsed_output['core']['project'] - print('GCP credentials are configured with access to project: %s ...\n' % - (project_id)) + print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) print('Following account(s) are active under this pipeline:\n') subprocess.run(['gcloud', 'auth', 'list']) print('\n') else: - print('Project configuration is not accessible with error %s\n' % - (project_config.stderr) + 'Follow the instructions at\n' + - 'https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials \n' - + 'to verify you have configured the required gcp secret.', file=sys.stderr) + print( + 'Project configuration is not accessible with error %s\n' % + (project_config.stderr), + file=sys.stderr) config_error_observed = True # Get project buckets @@ -53,41 +80,41 @@ def run_diagnose_me(target_apis: str, bucket: str) -> NamedTuple('Outputs', [('p gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) if get_project_bucket_results.has_error: - print('could not retrieve project buckets with error: %s' % - (get_project_bucket_results.stderr), file=sys.stderr) + print( + 'could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), + file=sys.stderr) config_error_observed = True # Get the root of the user provided bucket i.e. gs://root. bucket_root = '/'.join(bucket.split('/')[0:3]) - print('Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % (bucket)) + print( + 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % + (bucket)) if bucket_root in get_project_bucket_results.json_output: - print('Provided bucket \n %s\nis accessible within the project\n %s.' % ( - bucket, project_id)) - print('\n\n') + print('Provided bucket \n %s\nis accessible within the project\n %s\n' % + (bucket, project_id)) else: - print(''' - Could not find the bucket %s in project %s - Please verify that you have provided the correct GCS bucket name. 
- - Only the following buckets are visible in this project: - \n%s - ''' - % (bucket, project_id, get_project_bucket_results.parsed_output), file=sys.stderr) + print('Could not find the bucket %s in project %s' % (bucket, project_id) + + 'Please verify that you have provided the correct GCS bucket name.\n' + + 'Only the following buckets are visible in this project:\n%s' % + (get_project_bucket_results.parsed_output), + file=sys.stderr) config_error_observed = True # Verify APIs that are required are enabled - api_config_results = gcp.get_gcp_configuration( - gcp.Commands.GET_APIS) + api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) api_status = {} if api_config_results.has_error: - raise RuntimeError('could not retrieve API status with error: %s' % ( - api_config_results.stderr)) - + raise RuntimeError('could not retrieve API status with error: %s' % + (api_config_results.stderr)) + + print('Checking APIs status ...') for item in api_config_results.parsed_output: api_status[item['config']['name']] = item['state'] # printing the results in stdout for logging purposes @@ -100,13 +127,28 @@ def run_diagnose_me(target_apis: str, bucket: str) -> NamedTuple('Outputs', [('p if 'ENABLED' != api_status.get(api, 'DISABLED'): api_check_results = False error_list.append( - 'API \"%s\" is not enabled. To enable this api go to https://pantheon.corp.google.com/apis/library/%s?project=%s' % (api, api, project_id)) + 'API \"%s\" is not enabled. To enable this api go to ' % (api) + + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % + (api, project_id)) if not api_check_results: config_error_observed = True - print('Required APIs are not enabled:\n' + - '\n'.join(error_list), file=sys.stderr) - + print( + 'Required APIs are not enabled:\n' + '\n'.join(error_list), + file=sys.stderr) + + if 'HALT_ON_ERROR' in execution_mode and config_error_observed: + raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+ + 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ + '\n'+ + 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ + 'checkbox during cluster creation to have the cluster configured automatically.\n'+ + 'For more information on this and other troubleshooting instructions refer to\n'+ + 'our troubleshooting guide.\n'+ + '\n'+ + 'If you have intentionally modified the cluster configuration, you may\n'+ + 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') + return (project_id, bucket) diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index f244b578fa3..5823bbf20ec 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -1,19 +1,19 @@ -name: Run diagnose me -description: |- - Performs enviroment verification specific to this pipeline. - - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. 
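# Note: this file is generated by running component.py (func_to_container_op
# with output_component_file='component.yaml'), which is why this patch
# reorders keys wholesale rather than editing them in place.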
inputs: -- {type: String, name: target_apis} -- {type: String, name: bucket} -outputs: -- {type: String, name: project_id} - {type: String, name: bucket} +- {type: String, name: execution_mode} +- {type: String, name: target_apis} implementation: container: - image: google/cloud-sdk:276.0.0 + args: + - --bucket + - {inputValue: bucket} + - --execution-mode + - {inputValue: execution_mode} + - --target-apis + - {inputValue: target_apis} + - '----output-paths' + - {outputPath: bucket} + - {outputPath: project_id} command: - python3 - -u @@ -21,15 +21,29 @@ implementation: - | from typing import NamedTuple - def run_diagnose_me(target_apis: str, bucket: str) -> NamedTuple('Outputs', [('project_id', str), ('bucket', str)]): - """ Performs enviroment verification specific to this pipeline. - - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. - """ - - # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' does not come with pip3 pre-installed. + def run_diagnose_me( + bucket: str, execution_mode: str, target_apis: str + ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): + """ Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' + # does not come with pip3 pre-installed. import subprocess subprocess.run( ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], @@ -56,16 +70,15 @@ implementation: project_id = '' if not project_config.has_error: project_id = project_config.parsed_output['core']['project'] - print('GCP credentials are configured with access to project: %s ...\n' % - (project_id)) + print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) print('Following account(s) are active under this pipeline:\n') subprocess.run(['gcloud', 'auth', 'list']) print('\n') else: - print('Project configuration is not accessible with error %s\n' % - (project_config.stderr) + 'Follow the instructions at\n' + - 'https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials \n' - + 'to verify you have configured the required gcp secret.', file=sys.stderr) + print( + 'Project configuration is not accessible with error %s\n' % + (project_config.stderr), + file=sys.stderr) config_error_observed = True # Get project buckets @@ -73,40 +86,41 @@ implementation: gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) if get_project_bucket_results.has_error: - print('could not retrieve project buckets with error: %s' % - (get_project_bucket_results.stderr), file=sys.stderr) + print( + 'could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), + file=sys.stderr) config_error_observed = True # Get the root of the user provided bucket i.e. gs://root. 
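      # For example (illustrative value): 'gs://my-bucket/some/path' splits
      # into ['gs:', '', 'my-bucket', 'some', 'path']; keeping [0:3] and
      # re-joining yields 'gs://my-bucket'.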
bucket_root = '/'.join(bucket.split('/')[0:3]) - print('Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % (bucket)) + print( + 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % + (bucket)) if bucket_root in get_project_bucket_results.json_output: - print('Provided bucket \n %s\nis accessible within the project\n %s.' % ( - bucket, project_id)) - print('\n\n') + print('Provided bucket \n %s\nis accessible within the project\n %s\n' % + (bucket, project_id)) else: - print(''' - Could not find the bucket %s in project %s - Please verify that you have provided the correct GCS bucket name. - - Only the following buckets are visible in this project: - \n%s''' - % (bucket, project_id, get_project_bucket_results.parsed_output), file=sys.stderr) + print('Could not find the bucket %s in project %s' % (bucket, project_id) + + 'Please verify that you have provided the correct GCS bucket name.\n' + + 'Only the following buckets are visible in this project:\n%s' % + (get_project_bucket_results.parsed_output), + file=sys.stderr) config_error_observed = True # Verify APIs that are required are enabled - api_config_results = gcp.get_gcp_configuration( - gcp.Commands.GET_APIS) + api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) api_status = {} if api_config_results.has_error: - raise RuntimeError('could not retrieve API status with error: %s' % ( - api_config_results.stderr)) + raise RuntimeError('could not retrieve API status with error: %s' % + (api_config_results.stderr)) + print('Checking APIs status ...') for item in api_config_results.parsed_output: api_status[item['config']['name']] = item['state'] # printing the results in stdout for logging purposes @@ -119,12 +133,27 @@ implementation: if 'ENABLED' != api_status.get(api, 'DISABLED'): api_check_results = False error_list.append( - 'API \"%s\" is not enabled. To enable this api go to https://pantheon.corp.google.com/apis/library/%s?project=%s' % (api, api, project_id)) + 'API \"%s\" is not enabled. To enable this api go to ' % (api) + + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % + (api, project_id)) if not api_check_results: config_error_observed = True - print('Required APIs are not enabled:\n' + - '\n'.join(error_list), file=sys.stderr) + print( + 'Required APIs are not enabled:\n' + '\n'.join(error_list), + file=sys.stderr) + + if 'HALT_ON_ERROR' in execution_mode and config_error_observed: + raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+ + 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ + '\n'+ + 'We highly recommend that you recreate the cluster and check "Allow access ..." 
\n'+ + 'checkbox during cluster creation to have the cluster configured automatically.\n'+ + 'For more information on this and other troubleshooting instructions refer to\n'+ + 'our troubleshooting guide.\n'+ + '\n'+ + 'If you have intentionally modified the cluster configuration, you may\n'+ + 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') return (project_id, bucket) @@ -134,9 +163,10 @@ implementation: return str_value import argparse - _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs enviroment verification specific to this pipeline.\n\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.') - _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) + _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.\n') _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) _parsed_args = vars(_parser.parse_args()) _output_files = _parsed_args.pop("_output_paths", []) @@ -160,11 +190,25 @@ implementation: pass with open(output_file, 'w') as f: f.write(_output_serializers[idx](_outputs[idx])) - args: - - --target-apis - - {inputValue: target_apis} - - --bucket - - {inputValue: bucket} - - '----output-paths' - - {outputPath: project_id} - - {outputPath: bucket} + image: google/cloud-sdk:276.0.0 +description: | + Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. 
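# Example inputs (illustrative values, not defaults shipped with the component):
#   bucket: 'gs://my-bucket/any/path/is/ignored/beyond/root'
#   execution_mode: 'HALT_ON_ERROR'
#   target_apis: 'stackdriver.googleapis.com, storage-api.googleapis.com'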
name: Run diagnose me
outputs:
- {type: String, name: bucket}
- {type: String, name: project_id}

From c17503997c817c6e83322c7847892ce3b0ec7a65 Mon Sep 17 00:00:00 2001
From: Sina Chavoshi
Date: Fri, 31 Jan 2020 11:05:04 -0800
Subject: [PATCH 03/14] changing return type to match XGBOOST Pipeline

---
 .../diagnostics/diagnose_me/component.yaml    |   2 +-
 .../xgboost_training_cm.py                    | 207 +++++++++---------
 2 files changed, 108 insertions(+), 101 deletions(-)

diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml
index 5823bbf20ec..bd5dd581e82 100644
--- a/components/diagnostics/diagnose_me/component.yaml
+++ b/components/diagnostics/diagnose_me/component.yaml
@@ -211,4 +211,4 @@ description: |
 name: Run diagnose me
 outputs:
 - {type: String, name: bucket}
-- {type: String, name: project_id}
+- {type: GCPProjectID, name: project_id}
diff --git a/samples/core/xgboost_training_cm/xgboost_training_cm.py b/samples/core/xgboost_training_cm/xgboost_training_cm.py
index 6b1742ef2c0..54261997266 100755
--- a/samples/core/xgboost_training_cm/xgboost_training_cm.py
+++ b/samples/core/xgboost_training_cm/xgboost_training_cm.py
@@ -21,9 +21,14 @@
 import os
 import subprocess

-confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml')
+diagnose_me_op = components.load_component_from_url(
+    'https://raw.githubusercontent.com/kubeflow/pipelines/edcbe8d08cd39fea9871c5ac70178efb20ecd0ed/components/diagnostics/diagnose_me/component.yaml')

-roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/roc/component.yaml')
+confusion_matrix_op = components.load_component_from_url(
+    'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml')
+
+roc_op = components.load_component_from_url(
+    'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/roc/component.yaml')

 dataproc_create_cluster_op = components.load_component_from_url(
     'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/gcp/dataproc/create_cluster/component.yaml')
@@ -39,7 +44,8 @@
     'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/gcp/dataproc/submit_spark_job/component.yaml'
 )

-_PYSRC_PREFIX = 'gs://ml-pipeline-playground/dataproc-example' # Common path to python src.
+# Common path to python src.
+_PYSRC_PREFIX = 'gs://ml-pipeline-playground/dataproc-example'

 _XGBOOST_PKG = 'gs://ml-pipeline-playground/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar'
@@ -49,11 +55,11 @@


 def delete_directory_from_gcs(dir_path):
-  """Delete a GCS dir recursively. Ignore errors."""
-  try:
-    subprocess.call(['gsutil', '-m', 'rm', '-r', dir_path])
-  except:
-    pass
+    """Delete a GCS dir recursively. Ignore errors."""
+    try:
+        subprocess.call(['gsutil', '-m', 'rm', '-r', dir_path])
+    except:
+        pass


 # !
Please do not forget to enable the Dataproc API in your cluster https://console.developers.google.com/apis/api/dataproc.googleapis.com/overview @@ -63,28 +69,28 @@ def delete_directory_from_gcs(dir_path): def dataproc_analyze_op( - project, - region, - cluster_name, - schema, - train_data, - output): - """Submit dataproc analyze as a pyspark job. - - :param project: GCP project ID. - :param region: Which zone to run this analyze. - :param cluster_name: Name of the cluster. - :param schema: GCS path to the schema. - :param train_data: GCS path to the training data. - :param output: GCS path to store the output. - """ - return dataproc_submit_pyspark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_python_file_uri=os.path.join(_PYSRC_PREFIX, 'analyze_run.py'), - args=['--output', str(output), '--train', str(train_data), '--schema', str(schema)] - ) + project, + region, + cluster_name, + schema, + train_data, + output): + """Submit dataproc analyze as a pyspark job. + :param project: GCP project ID. + :param region: Which zone to run this analyze. + :param cluster_name: Name of the cluster. + :param schema: GCS path to the schema. + :param train_data: GCS path to the training data. + :param output: GCS path to store the output. + """ + return dataproc_submit_pyspark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_python_file_uri=os.path.join(_PYSRC_PREFIX, 'analyze_run.py'), + args=['--output', str(output), '--train', + str(train_data), '--schema', str(schema)] + ) def dataproc_transform_op( @@ -97,40 +103,39 @@ def dataproc_transform_op( analysis, output ): - """Submit dataproc transform as a pyspark job. - - :param project: GCP project ID. - :param region: Which zone to run this analyze. - :param cluster_name: Name of the cluster. - :param train_data: GCS path to the training data. - :param eval_data: GCS path of the eval csv file. - :param target: Target column name. - :param analysis: GCS path of the analysis results - :param output: GCS path to use for output. - """ - - # Remove existing [output]/train and [output]/eval if they exist. - delete_directory_from_gcs(os.path.join(output, 'train')) - delete_directory_from_gcs(os.path.join(output, 'eval')) - - return dataproc_submit_pyspark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_python_file_uri=os.path.join(_PYSRC_PREFIX, - 'transform_run.py'), - args=[ - '--output', - str(output), - '--analysis', - str(analysis), - '--target', - str(target), - '--train', - str(train_data), - '--eval', - str(eval_data) - ]) + """Submit dataproc transform as a pyspark job. + :param project: GCP project ID. + :param region: Which zone to run this analyze. + :param cluster_name: Name of the cluster. + :param train_data: GCS path to the training data. + :param eval_data: GCS path of the eval csv file. + :param target: Target column name. + :param analysis: GCS path of the analysis results + :param output: GCS path to use for output. + """ + + # Remove existing [output]/train and [output]/eval if they exist. 
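+    # (Deletion is best-effort: errors from the gsutil call are ignored, so a
+    # missing directory does not fail the pipeline.)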
+ delete_directory_from_gcs(os.path.join(output, 'train')) + delete_directory_from_gcs(os.path.join(output, 'eval')) + + return dataproc_submit_pyspark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_python_file_uri=os.path.join(_PYSRC_PREFIX, + 'transform_run.py'), + args=[ + '--output', + str(output), + '--analysis', + str(analysis), + '--target', + str(target), + '--train', + str(train_data), + '--eval', + str(eval_data) + ]) def dataproc_train_op( @@ -147,27 +152,27 @@ def dataproc_train_op( is_classification=True ): - if is_classification: - config='gs://ml-pipeline-playground/trainconfcla.json' - else: - config='gs://ml-pipeline-playground/trainconfreg.json' - - return dataproc_submit_spark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_class=_TRAINER_MAIN_CLS, - spark_job=json.dumps({ 'jarFileUris': [_XGBOOST_PKG]}), - args=json.dumps([ - str(config), - str(rounds), - str(workers), - str(analysis), - str(target), - str(train_data), - str(eval_data), - str(output) - ])) + if is_classification: + config = 'gs://ml-pipeline-playground/trainconfcla.json' + else: + config = 'gs://ml-pipeline-playground/trainconfreg.json' + + return dataproc_submit_spark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_class=_TRAINER_MAIN_CLS, + spark_job=json.dumps({'jarFileUris': [_XGBOOST_PKG]}), + args=json.dumps([ + str(config), + str(rounds), + str(workers), + str(analysis), + str(target), + str(train_data), + str(eval_data), + str(output) + ])) def dataproc_predict_op( @@ -181,22 +186,23 @@ def dataproc_predict_op( output ): - return dataproc_submit_spark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_class=_PREDICTOR_MAIN_CLS, - spark_job=json.dumps({ 'jarFileUris': [_XGBOOST_PKG]}), - args=json.dumps([ - str(model), - str(data), - str(analysis), - str(target), - str(output) - ])) + return dataproc_submit_spark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_class=_PREDICTOR_MAIN_CLS, + spark_job=json.dumps({'jarFileUris': [_XGBOOST_PKG]}), + args=json.dumps([ + str(model), + str(data), + str(analysis), + str(target), + str(output) + ])) # ======================================================================= + @dsl.pipeline( name='XGBoost Trainer', description='A trainer that does end-to-end distributed training for XGBoost models.' 
@@ -234,8 +240,8 @@ def xgb_train_pipeline( region=region, name=cluster_name, initialization_actions=[ - os.path.join(_PYSRC_PREFIX, - 'initialization_actions.sh'), + os.path.join(_PYSRC_PREFIX, + 'initialization_actions.sh'), ], image_version='1.2' ) @@ -296,5 +302,6 @@ def xgb_train_pipeline( output_dir=output_template ).after(_predict_op) + if __name__ == '__main__': kfp.compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.yaml') From c8ae3d3cbc4c8426d724d73a7cfc08fd4578ee48 Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 11:20:04 -0800 Subject: [PATCH 04/14] changing the diagnose_me output type --- components/diagnostics/diagnose_me/component.py | 2 +- components/diagnostics/diagnose_me/component.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py index 16ce8b98498..29dc12c13c8 100644 --- a/components/diagnostics/diagnose_me/component.py +++ b/components/diagnostics/diagnose_me/component.py @@ -17,7 +17,7 @@ def run_diagnose_me( bucket: str, execution_mode: str, target_apis: str -) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): +) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): """ Performs environment verification specific to this pipeline. args: diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index bd5dd581e82..01876d49abb 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -23,7 +23,7 @@ implementation: def run_diagnose_me( bucket: str, execution_mode: str, target_apis: str - ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): + ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): """ Performs environment verification specific to this pipeline. args: @@ -178,7 +178,7 @@ implementation: _output_serializers = [ _serialize_str, - _serialize_str, + str, ] From 471f660b0ad0cfc15de10a79e46d72a0023e04be Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 11:26:34 -0800 Subject: [PATCH 05/14] changing the diagnose_me output type --- components/diagnostics/diagnose_me/component.py | 2 +- components/diagnostics/diagnose_me/component.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py index 16ce8b98498..29dc12c13c8 100644 --- a/components/diagnostics/diagnose_me/component.py +++ b/components/diagnostics/diagnose_me/component.py @@ -17,7 +17,7 @@ def run_diagnose_me( bucket: str, execution_mode: str, target_apis: str -) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): +) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): """ Performs environment verification specific to this pipeline. 
args: diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index bd5dd581e82..01876d49abb 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -23,7 +23,7 @@ implementation: def run_diagnose_me( bucket: str, execution_mode: str, target_apis: str - ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): + ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): """ Performs environment verification specific to this pipeline. args: @@ -178,7 +178,7 @@ implementation: _output_serializers = [ _serialize_str, - _serialize_str, + str, ] From 0bf15c1a729cd52349438847947f42191ac470e6 Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 11:28:44 -0800 Subject: [PATCH 06/14] reverting changes on XGBoost.py --- .../xgboost_training_cm.py | 209 +++++++++--------- 1 file changed, 101 insertions(+), 108 deletions(-) diff --git a/samples/core/xgboost_training_cm/xgboost_training_cm.py b/samples/core/xgboost_training_cm/xgboost_training_cm.py index 54261997266..62b2d4cefcd 100755 --- a/samples/core/xgboost_training_cm/xgboost_training_cm.py +++ b/samples/core/xgboost_training_cm/xgboost_training_cm.py @@ -21,14 +21,9 @@ import os import subprocess -diagnose_me_op = components.load_component_from_url( - 'https://raw.githubusercontent.com/kubeflow/pipelines/edcbe8d08cd39fea9871c5ac70178efb20ecd0ed/components/diagnostics/diagnose_me/component.yaml') +confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml') -confusion_matrix_op = components.load_component_from_url( - 'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml') - -roc_op = components.load_component_from_url( - 'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/roc/component.yaml') +roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/roc/component.yaml') dataproc_create_cluster_op = components.load_component_from_url( 'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/gcp/dataproc/create_cluster/component.yaml') @@ -44,8 +39,7 @@ 'https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/gcp/dataproc/submit_spark_job/component.yaml' ) -# Common path to python src. -_PYSRC_PREFIX = 'gs://ml-pipeline-playground/dataproc-example' +_PYSRC_PREFIX = 'gs://ml-pipeline-playground/dataproc-example' # Common path to python src. _XGBOOST_PKG = 'gs://ml-pipeline-playground/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar' @@ -55,11 +49,11 @@ def delete_directory_from_gcs(dir_path): - """Delete a GCS dir recursively. Ignore errors.""" - try: - subprocess.call(['gsutil', '-m', 'rm', '-r', dir_path]) - except: - pass + """Delete a GCS dir recursively. Ignore errors.""" + try: + subprocess.call(['gsutil', '-m', 'rm', '-r', dir_path]) + except: + pass # ! 
Please do not forget to enable the Dataproc API in your cluster https://console.developers.google.com/apis/api/dataproc.googleapis.com/overview @@ -69,28 +63,28 @@ def delete_directory_from_gcs(dir_path): def dataproc_analyze_op( - project, - region, - cluster_name, - schema, - train_data, - output): - """Submit dataproc analyze as a pyspark job. - :param project: GCP project ID. - :param region: Which zone to run this analyze. - :param cluster_name: Name of the cluster. - :param schema: GCS path to the schema. - :param train_data: GCS path to the training data. - :param output: GCS path to store the output. - """ - return dataproc_submit_pyspark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_python_file_uri=os.path.join(_PYSRC_PREFIX, 'analyze_run.py'), - args=['--output', str(output), '--train', - str(train_data), '--schema', str(schema)] - ) + project, + region, + cluster_name, + schema, + train_data, + output): + """Submit dataproc analyze as a pyspark job. + + :param project: GCP project ID. + :param region: Which zone to run this analyze. + :param cluster_name: Name of the cluster. + :param schema: GCS path to the schema. + :param train_data: GCS path to the training data. + :param output: GCS path to store the output. + """ + return dataproc_submit_pyspark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_python_file_uri=os.path.join(_PYSRC_PREFIX, 'analyze_run.py'), + args=['--output', str(output), '--train', str(train_data), '--schema', str(schema)] + ) def dataproc_transform_op( @@ -103,39 +97,40 @@ def dataproc_transform_op( analysis, output ): - """Submit dataproc transform as a pyspark job. - :param project: GCP project ID. - :param region: Which zone to run this analyze. - :param cluster_name: Name of the cluster. - :param train_data: GCS path to the training data. - :param eval_data: GCS path of the eval csv file. - :param target: Target column name. - :param analysis: GCS path of the analysis results - :param output: GCS path to use for output. - """ - - # Remove existing [output]/train and [output]/eval if they exist. - delete_directory_from_gcs(os.path.join(output, 'train')) - delete_directory_from_gcs(os.path.join(output, 'eval')) - - return dataproc_submit_pyspark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_python_file_uri=os.path.join(_PYSRC_PREFIX, - 'transform_run.py'), - args=[ - '--output', - str(output), - '--analysis', - str(analysis), - '--target', - str(target), - '--train', - str(train_data), - '--eval', - str(eval_data) - ]) + """Submit dataproc transform as a pyspark job. + + :param project: GCP project ID. + :param region: Which zone to run this analyze. + :param cluster_name: Name of the cluster. + :param train_data: GCS path to the training data. + :param eval_data: GCS path of the eval csv file. + :param target: Target column name. + :param analysis: GCS path of the analysis results + :param output: GCS path to use for output. + """ + + # Remove existing [output]/train and [output]/eval if they exist. 
+ delete_directory_from_gcs(os.path.join(output, 'train')) + delete_directory_from_gcs(os.path.join(output, 'eval')) + + return dataproc_submit_pyspark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_python_file_uri=os.path.join(_PYSRC_PREFIX, + 'transform_run.py'), + args=[ + '--output', + str(output), + '--analysis', + str(analysis), + '--target', + str(target), + '--train', + str(train_data), + '--eval', + str(eval_data) + ]) def dataproc_train_op( @@ -152,27 +147,27 @@ def dataproc_train_op( is_classification=True ): - if is_classification: - config = 'gs://ml-pipeline-playground/trainconfcla.json' - else: - config = 'gs://ml-pipeline-playground/trainconfreg.json' - - return dataproc_submit_spark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_class=_TRAINER_MAIN_CLS, - spark_job=json.dumps({'jarFileUris': [_XGBOOST_PKG]}), - args=json.dumps([ - str(config), - str(rounds), - str(workers), - str(analysis), - str(target), - str(train_data), - str(eval_data), - str(output) - ])) + if is_classification: + config='gs://ml-pipeline-playground/trainconfcla.json' + else: + config='gs://ml-pipeline-playground/trainconfreg.json' + + return dataproc_submit_spark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_class=_TRAINER_MAIN_CLS, + spark_job=json.dumps({ 'jarFileUris': [_XGBOOST_PKG]}), + args=json.dumps([ + str(config), + str(rounds), + str(workers), + str(analysis), + str(target), + str(train_data), + str(eval_data), + str(output) + ])) def dataproc_predict_op( @@ -186,23 +181,22 @@ def dataproc_predict_op( output ): - return dataproc_submit_spark_op( - project_id=project, - region=region, - cluster_name=cluster_name, - main_class=_PREDICTOR_MAIN_CLS, - spark_job=json.dumps({'jarFileUris': [_XGBOOST_PKG]}), - args=json.dumps([ - str(model), - str(data), - str(analysis), - str(target), - str(output) - ])) + return dataproc_submit_spark_op( + project_id=project, + region=region, + cluster_name=cluster_name, + main_class=_PREDICTOR_MAIN_CLS, + spark_job=json.dumps({ 'jarFileUris': [_XGBOOST_PKG]}), + args=json.dumps([ + str(model), + str(data), + str(analysis), + str(target), + str(output) + ])) # ======================================================================= - @dsl.pipeline( name='XGBoost Trainer', description='A trainer that does end-to-end distributed training for XGBoost models.' 
@@ -240,8 +234,8 @@ def xgb_train_pipeline( region=region, name=cluster_name, initialization_actions=[ - os.path.join(_PYSRC_PREFIX, - 'initialization_actions.sh'), + os.path.join(_PYSRC_PREFIX, + 'initialization_actions.sh'), ], image_version='1.2' ) @@ -302,6 +296,5 @@ def xgb_train_pipeline( output_dir=output_template ).after(_predict_op) - if __name__ == '__main__': - kfp.compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.yaml') + kfp.compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.yaml') \ No newline at end of file From e0e4f85bcf749875459522f4a2f5062194c132cb Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 11:35:17 -0800 Subject: [PATCH 07/14] reverting changes in Xgboost_trainer --- samples/core/xgboost_training_cm/xgboost_training_cm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) mode change 100755 => 100644 samples/core/xgboost_training_cm/xgboost_training_cm.py diff --git a/samples/core/xgboost_training_cm/xgboost_training_cm.py b/samples/core/xgboost_training_cm/xgboost_training_cm.py old mode 100755 new mode 100644 index 62b2d4cefcd..6b1742ef2c0 --- a/samples/core/xgboost_training_cm/xgboost_training_cm.py +++ b/samples/core/xgboost_training_cm/xgboost_training_cm.py @@ -297,4 +297,4 @@ def xgb_train_pipeline( ).after(_predict_op) if __name__ == '__main__': - kfp.compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.yaml') \ No newline at end of file + kfp.compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.yaml') From d4525e99032280ef44c6321d64927d6bafece5c6 Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 11:57:43 -0800 Subject: [PATCH 08/14] remvoing raise error with flag --- .../diagnostics/diagnose_me/component.py | 377 ++++++++++-------- .../diagnostics/diagnose_me/component.yaml | 377 ++++++++---------- 2 files changed, 378 insertions(+), 376 deletions(-) diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py index 29dc12c13c8..eef700ed5af 100644 --- a/components/diagnostics/diagnose_me/component.py +++ b/components/diagnostics/diagnose_me/component.py @@ -1,162 +1,215 @@ -# Copyright 2020 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import NamedTuple - - -def run_diagnose_me( - bucket: str, execution_mode: str, target_apis: str -) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): - """ Performs environment verification specific to this pipeline. - - args: - bucket: - string name of the bucket to be checked. Must be of the format - gs://bucket_root/any/path/here/is/ignored where any path beyond root - is ignored. - execution_mode: - If set to HALT_ON_ERROR will case any error to raise an exception. - This is intended to stop the data processing of a pipeline. Can set - to False to only report Errors/Warnings. - target_apis: - String consisting of a comma separated list of apis to be verified. 
- Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. - """ - - # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' - # does not come with pip3 pre-installed. - import subprocess - subprocess.run( - ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], - capture_output=True) - subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], - capture_output=True) - subprocess.run(['python3', 'get-pip.py'], capture_output=True) - subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], - capture_output=True) - - import sys - from typing import List, Text - import os - from kfp.cli.diagnose_me import gcp - from kfp.cli.diagnose_me import utility - import json as json_library - - config_error_observed = False - - # Get the project ID - # from project configuration - project_config = gcp.get_gcp_configuration( - gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) - project_id = '' - if not project_config.has_error: - project_id = project_config.parsed_output['core']['project'] - print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) - print('Following account(s) are active under this pipeline:\n') - subprocess.run(['gcloud', 'auth', 'list']) - print('\n') - else: - print( - 'Project configuration is not accessible with error %s\n' % - (project_config.stderr), - file=sys.stderr) - config_error_observed = True - - # Get project buckets - get_project_bucket_results = gcp.get_gcp_configuration( - gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) - - if get_project_bucket_results.has_error: - print( - 'could not retrieve project buckets with error: %s' % - (get_project_bucket_results.stderr), - file=sys.stderr) - config_error_observed = True - - # Get the root of the user provided bucket i.e. gs://root. - bucket_root = '/'.join(bucket.split('/')[0:3]) - - print( - 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % - (bucket)) - - if bucket_root in get_project_bucket_results.json_output: - print('Provided bucket \n %s\nis accessible within the project\n %s\n' % - (bucket, project_id)) - - else: - print('Could not find the bucket %s in project %s' % (bucket, project_id) + - 'Please verify that you have provided the correct GCS bucket name.\n' + - 'Only the following buckets are visible in this project:\n%s' % - (get_project_bucket_results.parsed_output), - file=sys.stderr) - config_error_observed = True - - # Verify APIs that are required are enabled - api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) - - api_status = {} - - if api_config_results.has_error: - raise RuntimeError('could not retrieve API status with error: %s' % - (api_config_results.stderr)) - - print('Checking APIs status ...') - for item in api_config_results.parsed_output: - api_status[item['config']['name']] = item['state'] - # printing the results in stdout for logging purposes - print('%s %s' % (item['config']['name'], item['state'])) - - # Check if target apis are enabled - api_check_results = True - error_list = [] - for api in target_apis.replace(' ', '').split(','): - if 'ENABLED' != api_status.get(api, 'DISABLED'): - api_check_results = False - error_list.append( - 'API \"%s\" is not enabled. 
To enable this api go to ' % (api) + - 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % - (api, project_id)) - - if not api_check_results: - config_error_observed = True - print( - 'Required APIs are not enabled:\n' + '\n'.join(error_list), - file=sys.stderr) - - if 'HALT_ON_ERROR' in execution_mode and config_error_observed: - raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+ - 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ - '\n'+ - 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ - 'checkbox during cluster creation to have the cluster configured automatically.\n'+ - 'For more information on this and other troubleshooting instructions refer to\n'+ - 'our troubleshooting guide.\n'+ - '\n'+ - 'If you have intentionally modified the cluster configuration, you may\n'+ - 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') - - return (project_id, bucket) - - -if __name__ == '__main__': - import kfp.components as comp - - comp.func_to_container_op( - run_diagnose_me, - base_image='google/cloud-sdk:276.0.0', - output_component_file='component.yaml', - ) +inputs: +- {type: String, name: bucket} +- {type: String, name: execution_mode} +- {type: String, name: target_apis} +implementation: + container: + args: + - --bucket + - {inputValue: bucket} + - --execution-mode + - {inputValue: execution_mode} + - --target-apis + - {inputValue: target_apis} + - '----output-paths' + - {outputPath: bucket} + - {outputPath: project_id} + command: + - python3 + - -u + - -c + - | + from typing import NamedTuple + + def run_diagnose_me( + bucket: str, execution_mode: str, target_apis: str + ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): + """ Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' + # does not come with pip3 pre-installed. 
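+      # (These bootstrap steps assume outbound network access from the
+      # cluster; an air-gapped setup would need pip and kfp baked into the
+      # base image instead.)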
+ import subprocess + subprocess.run( + ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], + capture_output=True) + subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], + capture_output=True) + subprocess.run(['python3', 'get-pip.py'], capture_output=True) + subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], + capture_output=True) + + import sys + from typing import List, Text + import os + from kfp.cli.diagnose_me import gcp + from kfp.cli.diagnose_me import utility + import json as json_library + + config_error_observed = False + + # Get the project ID + # from project configuration + project_config = gcp.get_gcp_configuration( + gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) + project_id = '' + if not project_config.has_error: + project_id = project_config.parsed_output['core']['project'] + print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) + print('Following account(s) are active under this pipeline:\n') + subprocess.run(['gcloud', 'auth', 'list']) + print('\n') + else: + print( + 'Project configuration is not accessible with error %s\n' % + (project_config.stderr), + file=sys.stderr) + config_error_observed = True + + # Get project buckets + get_project_bucket_results = gcp.get_gcp_configuration( + gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) + + if get_project_bucket_results.has_error: + print( + 'could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), + file=sys.stderr) + config_error_observed = True + + # Get the root of the user provided bucket i.e. gs://root. + bucket_root = '/'.join(bucket.split('/')[0:3]) + + print( + 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % + (bucket)) + + if bucket_root in get_project_bucket_results.json_output: + print('Provided bucket \n %s\nis accessible within the project\n %s\n' % + (bucket, project_id)) + + else: + print('Could not find the bucket %s in project %s' % (bucket, project_id) + + 'Please verify that you have provided the correct GCS bucket name.\n' + + 'Only the following buckets are visible in this project:\n%s' % + (get_project_bucket_results.parsed_output), + file=sys.stderr) + config_error_observed = True + + # Verify APIs that are required are enabled + api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) + + api_status = {} + + if api_config_results.has_error: + print('could not retrieve API status with error: %s' % + (api_config_results.stderr),sys.stderr) + config_error_observed = True + + print('Checking APIs status ...') + for item in api_config_results.parsed_output: + api_status[item['config']['name']] = item['state'] + # printing the results in stdout for logging purposes + print('%s %s' % (item['config']['name'], item['state'])) + + # Check if target apis are enabled + api_check_results = True + error_list = [] + for api in target_apis.replace(' ', '').split(','): + if 'ENABLED' != api_status.get(api, 'DISABLED'): + api_check_results = False + error_list.append( + 'API \"%s\" is not enabled. To enable this api go to ' % (api) + + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % + (api, project_id)) + + if not api_check_results: + config_error_observed = True + print( + 'Required APIs are not enabled:\n' + '\n'.join(error_list), + file=sys.stderr) + + if 'HALT_ON_ERROR' in execution_mode and config_error_observed: + raise RuntimeError('There was an error in your environment configuration. 
See above for details.\n'+ + 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ + '\n'+ + 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ + 'checkbox during cluster creation to have the cluster configured automatically.\n'+ + 'For more information on this and other troubleshooting instructions refer to\n'+ + 'our troubleshooting guide.\n'+ + '\n'+ + 'If you have intentionally modified the cluster configuration, you may\n'+ + 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') + + return (project_id, bucket) + + def _serialize_str(str_value: str) -> str: + if not isinstance(str_value, str): + raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) + return str_value + + import argparse + _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.\n') + _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = run_diagnose_me(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_str, + str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + image: google/cloud-sdk:276.0.0 +description: | + Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. 
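A note on where the YAML above comes from: throughout this series the component spec is generated rather than edited by hand, via the func_to_container_op call kept under the __main__ guard of component.py. A minimal regeneration sketch, assuming the kfp SDK pinned by the component itself (kfp>=0.1.31); the `from component import ...` path is hypothetical:

    # Sketch: regenerate component.yaml from the Python function definition.
    import kfp.components as comp
    from component import run_diagnose_me  # hypothetical import path

    comp.func_to_container_op(
        run_diagnose_me,
        base_image='google/cloud-sdk:276.0.0',   # same base image the component runs in
        output_component_file='component.yaml',  # writes the spec with the source inlined
    )

Re-running this after any edit to run_diagnose_me is what keeps the Python source inlined in the YAML in sync with component.py.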
+name: Run diagnose me +outputs: +- {type: String, name: bucket} +- {type: GCPProjectID, name: project_id} diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index 01876d49abb..4b8176818b0 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -1,214 +1,163 @@ -inputs: -- {type: String, name: bucket} -- {type: String, name: execution_mode} -- {type: String, name: target_apis} -implementation: - container: - args: - - --bucket - - {inputValue: bucket} - - --execution-mode - - {inputValue: execution_mode} - - --target-apis - - {inputValue: target_apis} - - '----output-paths' - - {outputPath: bucket} - - {outputPath: project_id} - command: - - python3 - - -u - - -c - - | - from typing import NamedTuple - - def run_diagnose_me( - bucket: str, execution_mode: str, target_apis: str - ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): - """ Performs environment verification specific to this pipeline. - - args: - bucket: - string name of the bucket to be checked. Must be of the format - gs://bucket_root/any/path/here/is/ignored where any path beyond root - is ignored. - execution_mode: - If set to HALT_ON_ERROR will case any error to raise an exception. - This is intended to stop the data processing of a pipeline. Can set - to False to only report Errors/Warnings. - target_apis: - String consisting of a comma separated list of apis to be verified. - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. - """ - - # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' - # does not come with pip3 pre-installed. - import subprocess - subprocess.run( - ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], - capture_output=True) - subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], - capture_output=True) - subprocess.run(['python3', 'get-pip.py'], capture_output=True) - subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], - capture_output=True) - - import sys - from typing import List, Text - import os - from kfp.cli.diagnose_me import gcp - from kfp.cli.diagnose_me import utility - import json as json_library - - config_error_observed = False - - # Get the project ID - # from project configuration - project_config = gcp.get_gcp_configuration( - gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) - project_id = '' - if not project_config.has_error: - project_id = project_config.parsed_output['core']['project'] - print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) - print('Following account(s) are active under this pipeline:\n') - subprocess.run(['gcloud', 'auth', 'list']) - print('\n') - else: - print( - 'Project configuration is not accessible with error %s\n' % - (project_config.stderr), - file=sys.stderr) - config_error_observed = True - - # Get project buckets - get_project_bucket_results = gcp.get_gcp_configuration( - gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) - - if get_project_bucket_results.has_error: - print( - 'could not retrieve project buckets with error: %s' % - (get_project_bucket_results.stderr), - file=sys.stderr) - config_error_observed = True - - # Get the root of the user provided bucket i.e. gs://root. 
- bucket_root = '/'.join(bucket.split('/')[0:3]) - - print( - 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % - (bucket)) - - if bucket_root in get_project_bucket_results.json_output: - print('Provided bucket \n %s\nis accessible within the project\n %s\n' % - (bucket, project_id)) - - else: - print('Could not find the bucket %s in project %s' % (bucket, project_id) + - 'Please verify that you have provided the correct GCS bucket name.\n' + - 'Only the following buckets are visible in this project:\n%s' % - (get_project_bucket_results.parsed_output), - file=sys.stderr) - config_error_observed = True - - # Verify APIs that are required are enabled - api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) - - api_status = {} - - if api_config_results.has_error: - raise RuntimeError('could not retrieve API status with error: %s' % - (api_config_results.stderr)) - - print('Checking APIs status ...') - for item in api_config_results.parsed_output: - api_status[item['config']['name']] = item['state'] - # printing the results in stdout for logging purposes - print('%s %s' % (item['config']['name'], item['state'])) - - # Check if target apis are enabled - api_check_results = True - error_list = [] - for api in target_apis.replace(' ', '').split(','): - if 'ENABLED' != api_status.get(api, 'DISABLED'): - api_check_results = False - error_list.append( - 'API \"%s\" is not enabled. To enable this api go to ' % (api) + - 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % - (api, project_id)) - - if not api_check_results: - config_error_observed = True - print( - 'Required APIs are not enabled:\n' + '\n'.join(error_list), - file=sys.stderr) - - if 'HALT_ON_ERROR' in execution_mode and config_error_observed: - raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+ - 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ - '\n'+ - 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ - 'checkbox during cluster creation to have the cluster configured automatically.\n'+ - 'For more information on this and other troubleshooting instructions refer to\n'+ - 'our troubleshooting guide.\n'+ - '\n'+ - 'If you have intentionally modified the cluster configuration, you may\n'+ - 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') - - return (project_id, bucket) - - def _serialize_str(str_value: str) -> str: - if not isinstance(str_value, str): - raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) - return str_value - - import argparse - _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. 
Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.\n') - _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS) - _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS) - _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) - _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) - _parsed_args = vars(_parser.parse_args()) - _output_files = _parsed_args.pop("_output_paths", []) - - _outputs = run_diagnose_me(**_parsed_args) - - if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): - _outputs = [_outputs] - - _output_serializers = [ - _serialize_str, - str, - - ] - - import os - for idx, output_file in enumerate(_output_files): - try: - os.makedirs(os.path.dirname(output_file)) - except OSError: - pass - with open(output_file, 'w') as f: - f.write(_output_serializers[idx](_outputs[idx])) - image: google/cloud-sdk:276.0.0 -description: | - Performs environment verification specific to this pipeline. - - args: - bucket: - string name of the bucket to be checked. Must be of the format - gs://bucket_root/any/path/here/is/ignored where any path beyond root - is ignored. - execution_mode: - If set to HALT_ON_ERROR will case any error to raise an exception. - This is intended to stop the data processing of a pipeline. Can set - to False to only report Errors/Warnings. - target_apis: - String consisting of a comma separated list of apis to be verified. - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. -name: Run diagnose me -outputs: -- {type: String, name: bucket} -- {type: GCPProjectID, name: project_id} +# Copyright 2020 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import NamedTuple + + +def run_diagnose_me( + bucket: str, execution_mode: str, target_apis: str +) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): + """ Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' + # does not come with pip3 pre-installed. 
+ import subprocess + subprocess.run( + ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], + capture_output=True) + subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], + capture_output=True) + subprocess.run(['python3', 'get-pip.py'], capture_output=True) + subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], + capture_output=True) + + import sys + from typing import List, Text + import os + from kfp.cli.diagnose_me import gcp + from kfp.cli.diagnose_me import utility + import json as json_library + + config_error_observed = False + + # Get the project ID + # from project configuration + project_config = gcp.get_gcp_configuration( + gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) + project_id = '' + if not project_config.has_error: + project_id = project_config.parsed_output['core']['project'] + print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) + print('Following account(s) are active under this pipeline:\n') + subprocess.run(['gcloud', 'auth', 'list']) + print('\n') + else: + print( + 'Project configuration is not accessible with error %s\n' % + (project_config.stderr), + file=sys.stderr) + config_error_observed = True + + # Get project buckets + get_project_bucket_results = gcp.get_gcp_configuration( + gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) + + if get_project_bucket_results.has_error: + print( + 'could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), + file=sys.stderr) + config_error_observed = True + + # Get the root of the user provided bucket i.e. gs://root. + bucket_root = '/'.join(bucket.split('/')[0:3]) + + print( + 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % + (bucket)) + + if bucket_root in get_project_bucket_results.json_output: + print('Provided bucket \n %s\nis accessible within the project\n %s\n' % + (bucket, project_id)) + + else: + print('Could not find the bucket %s in project %s' % (bucket, project_id) + + 'Please verify that you have provided the correct GCS bucket name.\n' + + 'Only the following buckets are visible in this project:\n%s' % + (get_project_bucket_results.parsed_output), + file=sys.stderr) + config_error_observed = True + + # Verify APIs that are required are enabled + api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) + + api_status = {} + + if api_config_results.has_error: + print('could not retrieve API status with error: %s' % + (api_config_results.stderr),sys.stderr) + config_error_observed = True + + print('Checking APIs status ...') + for item in api_config_results.parsed_output: + api_status[item['config']['name']] = item['state'] + # printing the results in stdout for logging purposes + print('%s %s' % (item['config']['name'], item['state'])) + + # Check if target apis are enabled + api_check_results = True + error_list = [] + for api in target_apis.replace(' ', '').split(','): + if 'ENABLED' != api_status.get(api, 'DISABLED'): + api_check_results = False + error_list.append( + 'API \"%s\" is not enabled. To enable this api go to ' % (api) + + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % + (api, project_id)) + + if not api_check_results: + config_error_observed = True + print( + 'Required APIs are not enabled:\n' + '\n'.join(error_list), + file=sys.stderr) + + if 'HALT_ON_ERROR' in execution_mode and config_error_observed: + raise RuntimeError('There was an error in your environment configuration. 
See above for details.\n'+
+ 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+
+ '\n'+
+ 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+
+ 'checkbox during cluster creation to have the cluster configured automatically.\n'+
+ 'For more information on this and other troubleshooting instructions refer to\n'+
+ 'our troubleshooting guide.\n'+
+ '\n'+
+ 'If you have intentionally modified the cluster configuration, you may\n'+
+ 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n')
+
+ return (project_id, bucket)
+
+
+if __name__ == '__main__':
+ import kfp.components as comp
+
+ comp.func_to_container_op(
+ run_diagnose_me,
+ base_image='google/cloud-sdk:276.0.0',
+ output_component_file='component.yaml',
+ )

From 004e798ab5bfc6459522f83e95f7a13ebea00242 Mon Sep 17 00:00:00 2001
From: Sina Chavoshi
Date: Fri, 31 Jan 2020 13:32:52 -0800
Subject: [PATCH 09/14] updating error message

---
 .../diagnostics/diagnose_me/component.py      | 372 ++++++++----------
 .../diagnostics/diagnose_me/component.yaml    | 372 ++++++++++--------
 2 files changed, 366 insertions(+), 378 deletions(-)

diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py
index eef700ed5af..b92ef766b92 100644
--- a/components/diagnostics/diagnose_me/component.py
+++ b/components/diagnostics/diagnose_me/component.py
@@ -1,215 +1,157 @@
-inputs:
-- {type: String, name: bucket}
-- {type: String, name: execution_mode}
-- {type: String, name: target_apis}
-implementation:
- container:
- args:
- - --bucket
- - {inputValue: bucket}
- - --execution-mode
- - {inputValue: execution_mode}
- - --target-apis
- - {inputValue: target_apis}
- - '----output-paths'
- - {outputPath: bucket}
- - {outputPath: project_id}
- command:
- - python3
- - -u
- - -c
- - |
- from typing import NamedTuple
-
- def run_diagnose_me(
- bucket: str, execution_mode: str, target_apis: str
- ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]):
- """ Performs environment verification specific to this pipeline.
-
- args:
- bucket:
- string name of the bucket to be checked. Must be of the format
- gs://bucket_root/any/path/here/is/ignored where any path beyond root
- is ignored.
- execution_mode:
- If set to HALT_ON_ERROR will case any error to raise an exception.
- This is intended to stop the data processing of a pipeline. Can set
- to False to only report Errors/Warnings.
- target_apis:
- String consisting of a comma separated list of apis to be verified.
- Raises:
- RuntimeError: If configuration is not setup properly and
- HALT_ON_ERROR flag is set.
- """
-
- # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0'
- # does not come with pip3 pre-installed.
- import subprocess - subprocess.run( - ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], - capture_output=True) - subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], - capture_output=True) - subprocess.run(['python3', 'get-pip.py'], capture_output=True) - subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], - capture_output=True) - - import sys - from typing import List, Text - import os - from kfp.cli.diagnose_me import gcp - from kfp.cli.diagnose_me import utility - import json as json_library - - config_error_observed = False - - # Get the project ID - # from project configuration - project_config = gcp.get_gcp_configuration( - gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) - project_id = '' - if not project_config.has_error: - project_id = project_config.parsed_output['core']['project'] - print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) - print('Following account(s) are active under this pipeline:\n') - subprocess.run(['gcloud', 'auth', 'list']) - print('\n') - else: - print( - 'Project configuration is not accessible with error %s\n' % - (project_config.stderr), - file=sys.stderr) - config_error_observed = True - - # Get project buckets - get_project_bucket_results = gcp.get_gcp_configuration( - gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) - - if get_project_bucket_results.has_error: - print( - 'could not retrieve project buckets with error: %s' % - (get_project_bucket_results.stderr), - file=sys.stderr) - config_error_observed = True - - # Get the root of the user provided bucket i.e. gs://root. - bucket_root = '/'.join(bucket.split('/')[0:3]) - - print( - 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % - (bucket)) - - if bucket_root in get_project_bucket_results.json_output: - print('Provided bucket \n %s\nis accessible within the project\n %s\n' % - (bucket, project_id)) - - else: - print('Could not find the bucket %s in project %s' % (bucket, project_id) + - 'Please verify that you have provided the correct GCS bucket name.\n' + - 'Only the following buckets are visible in this project:\n%s' % - (get_project_bucket_results.parsed_output), - file=sys.stderr) - config_error_observed = True - - # Verify APIs that are required are enabled - api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) - - api_status = {} - - if api_config_results.has_error: - print('could not retrieve API status with error: %s' % - (api_config_results.stderr),sys.stderr) - config_error_observed = True - - print('Checking APIs status ...') - for item in api_config_results.parsed_output: - api_status[item['config']['name']] = item['state'] - # printing the results in stdout for logging purposes - print('%s %s' % (item['config']['name'], item['state'])) - - # Check if target apis are enabled - api_check_results = True - error_list = [] - for api in target_apis.replace(' ', '').split(','): - if 'ENABLED' != api_status.get(api, 'DISABLED'): - api_check_results = False - error_list.append( - 'API \"%s\" is not enabled. To enable this api go to ' % (api) + - 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % - (api, project_id)) - - if not api_check_results: - config_error_observed = True - print( - 'Required APIs are not enabled:\n' + '\n'.join(error_list), - file=sys.stderr) - - if 'HALT_ON_ERROR' in execution_mode and config_error_observed: - raise RuntimeError('There was an error in your environment configuration. 
See above for details.\n'+ - 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ - '\n'+ - 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ - 'checkbox during cluster creation to have the cluster configured automatically.\n'+ - 'For more information on this and other troubleshooting instructions refer to\n'+ - 'our troubleshooting guide.\n'+ - '\n'+ - 'If you have intentionally modified the cluster configuration, you may\n'+ - 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') - - return (project_id, bucket) - - def _serialize_str(str_value: str) -> str: - if not isinstance(str_value, str): - raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) - return str_value - - import argparse - _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.\n') - _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS) - _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS) - _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) - _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) - _parsed_args = vars(_parser.parse_args()) - _output_files = _parsed_args.pop("_output_paths", []) - - _outputs = run_diagnose_me(**_parsed_args) - - if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): - _outputs = [_outputs] - - _output_serializers = [ - _serialize_str, - str, - - ] - - import os - for idx, output_file in enumerate(_output_files): - try: - os.makedirs(os.path.dirname(output_file)) - except OSError: - pass - with open(output_file, 'w') as f: - f.write(_output_serializers[idx](_outputs[idx])) - image: google/cloud-sdk:276.0.0 -description: | - Performs environment verification specific to this pipeline. - - args: - bucket: - string name of the bucket to be checked. Must be of the format - gs://bucket_root/any/path/here/is/ignored where any path beyond root - is ignored. - execution_mode: - If set to HALT_ON_ERROR will case any error to raise an exception. - This is intended to stop the data processing of a pipeline. Can set - to False to only report Errors/Warnings. - target_apis: - String consisting of a comma separated list of apis to be verified. - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. -name: Run diagnose me -outputs: -- {type: String, name: bucket} -- {type: GCPProjectID, name: project_id} +# Copyright 2020 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import NamedTuple + + +def run_diagnose_me( + bucket: str, execution_mode: str, target_apis: str +) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): + """ Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' + # does not come with pip3 pre-installed. + import subprocess + subprocess.run( + ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], + capture_output=True) + subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], + capture_output=True) + subprocess.run(['python3', 'get-pip.py'], capture_output=True) + subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], + capture_output=True) + + import sys + from typing import List, Text + import os + from kfp.cli.diagnose_me import gcp + from kfp.cli.diagnose_me import utility + import json as json_library + + config_error_observed = False + + # Get the project ID + # from project configuration + project_config = gcp.get_gcp_configuration( + gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) + project_id = '' + if not project_config.has_error: + project_id = project_config.parsed_output['core']['project'] + print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) + print('Following account(s) are active under this pipeline:\n') + subprocess.run(['gcloud', 'auth', 'list']) + print('\n') + else: + print( + 'Project configuration is not accessible with error %s\n' % + (project_config.stderr), + file=sys.stderr) + config_error_observed = True + + # Get project buckets + get_project_bucket_results = gcp.get_gcp_configuration( + gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) + + if get_project_bucket_results.has_error: + print( + 'could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), + file=sys.stderr) + config_error_observed = True + + # Get the root of the user provided bucket i.e. gs://root. 
+ bucket_root = '/'.join(bucket.split('/')[0:3]) + + print( + 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % + (bucket)) + + if bucket_root in get_project_bucket_results.json_output: + print('Provided bucket \n %s\nis accessible within the project\n %s\n' % + (bucket, project_id)) + + else: + print('Could not find the bucket %s in project %s' % (bucket, project_id) + + 'Please verify that you have provided the correct GCS bucket name.\n' + + 'Only the following buckets are visible in this project:\n%s' % + (get_project_bucket_results.parsed_output), + file=sys.stderr) + config_error_observed = True + + # Verify APIs that are required are enabled + api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) + + api_status = {} + + if api_config_results.has_error: + print('could not retrieve API status with error: %s' % + (api_config_results.stderr),sys.stderr) + config_error_observed = True + + print('Checking APIs status ...') + for item in api_config_results.parsed_output: + api_status[item['config']['name']] = item['state'] + # printing the results in stdout for logging purposes + print('%s %s' % (item['config']['name'], item['state'])) + + # Check if target apis are enabled + api_check_results = True + for api in target_apis.replace(' ', '').split(','): + if 'ENABLED' != api_status.get(api, 'DISABLED'): + api_check_results = False + print( + 'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) + + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % + (api, project_id),sys.stderr) + config_error_observed = True + + if 'HALT_ON_ERROR' in execution_mode and config_error_observed: + raise RuntimeError('There was an error in your environment configuration.\n'+ + 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ + '\n'+ + 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ + 'checkbox during cluster creation to have the cluster configured automatically.\n'+ + 'For more information on this and other troubleshooting instructions refer to\n'+ + 'our troubleshooting guide.\n'+ + '\n'+ + 'If you have intentionally modified the cluster configuration, you may\n'+ + 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') + + return (project_id, bucket) + + +if __name__ == '__main__': + import kfp.components as comp + + comp.func_to_container_op( + run_diagnose_me, + base_image='google/cloud-sdk:276.0.0', + output_component_file='component.yaml', + ) diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index 4b8176818b0..b03a4085906 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -1,163 +1,209 @@ -# Copyright 2020 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from typing import NamedTuple - - -def run_diagnose_me( - bucket: str, execution_mode: str, target_apis: str -) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): - """ Performs environment verification specific to this pipeline. - - args: - bucket: - string name of the bucket to be checked. Must be of the format - gs://bucket_root/any/path/here/is/ignored where any path beyond root - is ignored. - execution_mode: - If set to HALT_ON_ERROR will case any error to raise an exception. - This is intended to stop the data processing of a pipeline. Can set - to False to only report Errors/Warnings. - target_apis: - String consisting of a comma separated list of apis to be verified. - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. - """ - - # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' - # does not come with pip3 pre-installed. - import subprocess - subprocess.run( - ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], - capture_output=True) - subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], - capture_output=True) - subprocess.run(['python3', 'get-pip.py'], capture_output=True) - subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], - capture_output=True) - - import sys - from typing import List, Text - import os - from kfp.cli.diagnose_me import gcp - from kfp.cli.diagnose_me import utility - import json as json_library - - config_error_observed = False - - # Get the project ID - # from project configuration - project_config = gcp.get_gcp_configuration( - gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) - project_id = '' - if not project_config.has_error: - project_id = project_config.parsed_output['core']['project'] - print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) - print('Following account(s) are active under this pipeline:\n') - subprocess.run(['gcloud', 'auth', 'list']) - print('\n') - else: - print( - 'Project configuration is not accessible with error %s\n' % - (project_config.stderr), - file=sys.stderr) - config_error_observed = True - - # Get project buckets - get_project_bucket_results = gcp.get_gcp_configuration( - gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) - - if get_project_bucket_results.has_error: - print( - 'could not retrieve project buckets with error: %s' % - (get_project_bucket_results.stderr), - file=sys.stderr) - config_error_observed = True - - # Get the root of the user provided bucket i.e. gs://root. 
- bucket_root = '/'.join(bucket.split('/')[0:3]) - - print( - 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % - (bucket)) - - if bucket_root in get_project_bucket_results.json_output: - print('Provided bucket \n %s\nis accessible within the project\n %s\n' % - (bucket, project_id)) - - else: - print('Could not find the bucket %s in project %s' % (bucket, project_id) + - 'Please verify that you have provided the correct GCS bucket name.\n' + - 'Only the following buckets are visible in this project:\n%s' % - (get_project_bucket_results.parsed_output), - file=sys.stderr) - config_error_observed = True - - # Verify APIs that are required are enabled - api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) - - api_status = {} - - if api_config_results.has_error: - print('could not retrieve API status with error: %s' % - (api_config_results.stderr),sys.stderr) - config_error_observed = True - - print('Checking APIs status ...') - for item in api_config_results.parsed_output: - api_status[item['config']['name']] = item['state'] - # printing the results in stdout for logging purposes - print('%s %s' % (item['config']['name'], item['state'])) - - # Check if target apis are enabled - api_check_results = True - error_list = [] - for api in target_apis.replace(' ', '').split(','): - if 'ENABLED' != api_status.get(api, 'DISABLED'): - api_check_results = False - error_list.append( - 'API \"%s\" is not enabled. To enable this api go to ' % (api) + - 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % - (api, project_id)) - - if not api_check_results: - config_error_observed = True - print( - 'Required APIs are not enabled:\n' + '\n'.join(error_list), - file=sys.stderr) - - if 'HALT_ON_ERROR' in execution_mode and config_error_observed: - raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+ - 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ - '\n'+ - 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ - 'checkbox during cluster creation to have the cluster configured automatically.\n'+ - 'For more information on this and other troubleshooting instructions refer to\n'+ - 'our troubleshooting guide.\n'+ - '\n'+ - 'If you have intentionally modified the cluster configuration, you may\n'+ - 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') - - return (project_id, bucket) - - -if __name__ == '__main__': - import kfp.components as comp - - comp.func_to_container_op( - run_diagnose_me, - base_image='google/cloud-sdk:276.0.0', - output_component_file='component.yaml', - ) +name: Run diagnose me +description: |- + Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. 
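Before the implementation block below, it may help to see the API verification in isolation: the body builds a name-to-state map from the parsed gcloud output and looks up each requested API, treating an absent API as DISABLED. A condensed, runnable sketch; the parsed_output sample stands in for real gcloud JSON and is illustrative only:

    # Stand-in for api_config_results.parsed_output (a gcloud services listing).
    parsed_output = [
        {'config': {'name': 'storage-api.googleapis.com'}, 'state': 'ENABLED'},
        {'config': {'name': 'dataflow.googleapis.com'}, 'state': 'DISABLED'},
    ]
    target_apis = 'storage-api.googleapis.com, dataproc.googleapis.com'

    # Build the lookup table, then flag anything not explicitly ENABLED.
    api_status = {item['config']['name']: item['state'] for item in parsed_output}
    missing = [api for api in target_apis.replace(' ', '').split(',')
               if api_status.get(api, 'DISABLED') != 'ENABLED']
    print(missing)  # ['dataproc.googleapis.com']

The .get(api, 'DISABLED') default is what makes an API that never appears in the listing count the same as one that is explicitly disabled.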
+implementation: + container: + image: google/cloud-sdk:276.0.0 + command: + - python3 + - -u + - -c + - | + from typing import NamedTuple + + def run_diagnose_me( + bucket: str, execution_mode: str, target_apis: str + ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): + """ Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. + """ + + # Installing pip3 and kfp, since the base image 'google/cloud-sdk:276.0.0' + # does not come with pip3 pre-installed. + import subprocess + subprocess.run( + ['curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'], + capture_output=True) + subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'], + capture_output=True) + subprocess.run(['python3', 'get-pip.py'], capture_output=True) + subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'], + capture_output=True) + + import sys + from typing import List, Text + import os + from kfp.cli.diagnose_me import gcp + from kfp.cli.diagnose_me import utility + import json as json_library + + config_error_observed = False + + # Get the project ID + # from project configuration + project_config = gcp.get_gcp_configuration( + gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) + project_id = '' + if not project_config.has_error: + project_id = project_config.parsed_output['core']['project'] + print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) + print('Following account(s) are active under this pipeline:\n') + subprocess.run(['gcloud', 'auth', 'list']) + print('\n') + else: + print( + 'Project configuration is not accessible with error %s\n' % + (project_config.stderr), + file=sys.stderr) + config_error_observed = True + + # Get project buckets + get_project_bucket_results = gcp.get_gcp_configuration( + gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) + + if get_project_bucket_results.has_error: + print( + 'could not retrieve project buckets with error: %s' % + (get_project_bucket_results.stderr), + file=sys.stderr) + config_error_observed = True + + # Get the root of the user provided bucket i.e. gs://root. 
+ bucket_root = '/'.join(bucket.split('/')[0:3]) + + print( + 'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' % + (bucket)) + + if bucket_root in get_project_bucket_results.json_output: + print('Provided bucket \n %s\nis accessible within the project\n %s\n' % + (bucket, project_id)) + + else: + print('Could not find the bucket %s in project %s' % (bucket, project_id) + + 'Please verify that you have provided the correct GCS bucket name.\n' + + 'Only the following buckets are visible in this project:\n%s' % + (get_project_bucket_results.parsed_output), + file=sys.stderr) + config_error_observed = True + + # Verify APIs that are required are enabled + api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS) + + api_status = {} + + if api_config_results.has_error: + print('could not retrieve API status with error: %s' % + (api_config_results.stderr),sys.stderr) + config_error_observed = True + + print('Checking APIs status ...') + for item in api_config_results.parsed_output: + api_status[item['config']['name']] = item['state'] + # printing the results in stdout for logging purposes + print('%s %s' % (item['config']['name'], item['state'])) + + # Check if target apis are enabled + api_check_results = True + for api in target_apis.replace(' ', '').split(','): + if 'ENABLED' != api_status.get(api, 'DISABLED'): + api_check_results = False + print( + 'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) + + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % + (api, project_id),sys.stderr) + config_error_observed = True + + if 'HALT_ON_ERROR' in execution_mode and config_error_observed: + raise RuntimeError('There was an error in your environment configuration.\n'+ + 'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+ + '\n'+ + 'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+ + 'checkbox during cluster creation to have the cluster configured automatically.\n'+ + 'For more information on this and other troubleshooting instructions refer to\n'+ + 'our troubleshooting guide.\n'+ + '\n'+ + 'If you have intentionally modified the cluster configuration, you may\n'+ + 'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n') + + return (project_id, bucket) + + def _serialize_str(str_value: str) -> str: + if not isinstance(str_value, str): + raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) + return str_value + + import argparse + _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. 
Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.') + _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = run_diagnose_me(**_parsed_args) + + if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str): + _outputs = [_outputs] + + _output_serializers = [ + _serialize_str, + str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --bucket + - {inputValue: bucket} + - --execution-mode + - {inputValue: execution_mode} + - --target-apis + - {inputValue: target_apis} + - '----output-paths' + - {outputPath: bucket} + - {outputPath: project_id} +inputs: +- {name: bucket, type: String} +- {name: execution_mode, type: String} +- {name: target_apis, type: String} +outputs: +- {name: bucket, type: String} +- {name: project_id, type: GCPProjectID} From f1d124da952dda27a4da2e2249792cc998d9c53b Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 14:30:39 -0800 Subject: [PATCH 10/14] Adding support for project_id verification --- .../diagnostics/diagnose_me/component.py | 17 ++++-- .../diagnostics/diagnose_me/component.yaml | 61 +++++++++++-------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py index b92ef766b92..d97d810adb4 100644 --- a/components/diagnostics/diagnose_me/component.py +++ b/components/diagnostics/diagnose_me/component.py @@ -16,8 +16,8 @@ def run_diagnose_me( - bucket: str, execution_mode: str, target_apis: str -) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): + bucket: str, execution_mode: str, project_id:str , target_apis: str +) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): """ Performs environment verification specific to this pipeline. 
args: @@ -61,9 +61,8 @@ def run_diagnose_me( # from project configuration project_config = gcp.get_gcp_configuration( gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) - project_id = '' if not project_config.has_error: - project_id = project_config.parsed_output['core']['project'] + auth_project_id = project_config.parsed_output['core']['project'] print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) print('Following account(s) are active under this pipeline:\n') subprocess.run(['gcloud', 'auth', 'list']) @@ -75,6 +74,12 @@ def run_diagnose_me( file=sys.stderr) config_error_observed = True + if auth_project_id is not project_id: + print( + 'User provided project ID %s does not match the configuration %s\n' % + (project_id,auth_project_id), file=sys.stderr) + config_error_observed = True + # Get project buckets get_project_bucket_results = gcp.get_gcp_configuration( gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) @@ -112,7 +117,7 @@ def run_diagnose_me( if api_config_results.has_error: print('could not retrieve API status with error: %s' % - (api_config_results.stderr),sys.stderr) + (api_config_results.stderr),file=sys.stderr) config_error_observed = True print('Checking APIs status ...') @@ -129,7 +134,7 @@ def run_diagnose_me( print( 'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % - (api, project_id),sys.stderr) + (api, project_id),file=sys.stderr) config_error_observed = True if 'HALT_ON_ERROR' in execution_mode and config_error_observed: diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index b03a4085906..bb615496961 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -1,24 +1,9 @@ name: Run diagnose me -description: |- - Performs environment verification specific to this pipeline. - - args: - bucket: - string name of the bucket to be checked. Must be of the format - gs://bucket_root/any/path/here/is/ignored where any path beyond root - is ignored. - execution_mode: - If set to HALT_ON_ERROR will case any error to raise an exception. - This is intended to stop the data processing of a pipeline. Can set - to False to only report Errors/Warnings. - target_apis: - String consisting of a comma separated list of apis to be verified. - Raises: - RuntimeError: If configuration is not setup properly and - HALT_ON_ERROR flag is set. +outputs: +- {name: bucket, type: String} +- {name: project_id, type: String} implementation: container: - image: google/cloud-sdk:276.0.0 command: - python3 - -u @@ -27,8 +12,8 @@ implementation: from typing import NamedTuple def run_diagnose_me( - bucket: str, execution_mode: str, target_apis: str - ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', 'GCPProjectID')]): + bucket: str, execution_mode: str, project_id:str , target_apis: str + ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]): """ Performs environment verification specific to this pipeline. 
args: @@ -72,9 +57,8 @@ implementation: # from project configuration project_config = gcp.get_gcp_configuration( gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False) - project_id = '' if not project_config.has_error: - project_id = project_config.parsed_output['core']['project'] + auth_project_id = project_config.parsed_output['core']['project'] print('GCP credentials are configured with access to project: %s ...\n' % (project_id)) print('Following account(s) are active under this pipeline:\n') subprocess.run(['gcloud', 'auth', 'list']) @@ -86,6 +70,12 @@ implementation: file=sys.stderr) config_error_observed = True + if auth_project_id is not project_id: + print( + 'User provided project ID %s does not match the configuration %s\n' % + (project_id,auth_project_id), sys.stderr) + config_error_observed = True + # Get project buckets get_project_bucket_results = gcp.get_gcp_configuration( gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False) @@ -166,6 +156,7 @@ implementation: _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.') _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--project-id", dest="project_id", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS) _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) _parsed_args = vars(_parser.parse_args()) @@ -178,7 +169,7 @@ implementation: _output_serializers = [ _serialize_str, - str, + _serialize_str, ] @@ -195,15 +186,33 @@ implementation: - {inputValue: bucket} - --execution-mode - {inputValue: execution_mode} + - --project-id + - {inputValue: project_id} - --target-apis - {inputValue: target_apis} - '----output-paths' - {outputPath: bucket} - {outputPath: project_id} + image: google/cloud-sdk:276.0.0 +description: |- + Performs environment verification specific to this pipeline. + + args: + bucket: + string name of the bucket to be checked. Must be of the format + gs://bucket_root/any/path/here/is/ignored where any path beyond root + is ignored. + execution_mode: + If set to HALT_ON_ERROR will case any error to raise an exception. + This is intended to stop the data processing of a pipeline. Can set + to False to only report Errors/Warnings. + target_apis: + String consisting of a comma separated list of apis to be verified. + Raises: + RuntimeError: If configuration is not setup properly and + HALT_ON_ERROR flag is set. 
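One detail of the project ID verification introduced in this patch deserves a closer look, since the two follow-up patches below exist only to correct it: `is not` compares object identity, while `!=` compares values, and a project ID parsed out of gcloud output at runtime is generally a distinct object from the user-supplied parameter even when the text matches. A tiny illustration, runnable without any GCP access:

    # join() builds a fresh string object, much like a value parsed at runtime.
    auth_project_id = ''.join(['my-', 'project'])
    project_id = 'my-project'  # user-supplied pipeline parameter

    print(auth_project_id != project_id)      # False: the values are equal
    print(auth_project_id is not project_id)  # True: distinct objects, so the
                                              # identity test reports a false mismatch

This is exactly why the `is not` check is replaced with `!=` in the next two patches.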
inputs: - {name: bucket, type: String} - {name: execution_mode, type: String} +- {name: project_id, type: String} - {name: target_apis, type: String} -outputs: -- {name: bucket, type: String} -- {name: project_id, type: GCPProjectID} From 5b37c00e446df0066fa59d840f90ce34faebec93 Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 14:39:11 -0800 Subject: [PATCH 11/14] updating xgboost to add verification component --- .../xgboost_training_cm/xgboost_training_cm.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/samples/core/xgboost_training_cm/xgboost_training_cm.py b/samples/core/xgboost_training_cm/xgboost_training_cm.py index 6b1742ef2c0..f463cccd514 100644 --- a/samples/core/xgboost_training_cm/xgboost_training_cm.py +++ b/samples/core/xgboost_training_cm/xgboost_training_cm.py @@ -21,6 +21,9 @@ import os import subprocess +diagnose_me_op = components.load_component_from_url( + 'https://raw.githubusercontent.com/kubeflow/pipelines/f1d124da952dda27a4da2e2249792cc998d9c53b/components/diagnostics/diagnose_me/component.yaml') + confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml') roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/roc/component.yaml') @@ -210,6 +213,8 @@ def xgb_train_pipeline( eval_data='gs://ml-pipeline-playground/sfpd/eval.csv', schema='gs://ml-pipeline-playground/sfpd/schema.json', target='resolution', + execution_mode='HALT_ON_ERROR', + required_apis='stackdriver.googleapis.com, storage-api.googleapis.com, bigquery.googleapis.com, dataflow.googleapis.com, dataproc.googleapis.com', rounds=200, workers=2, true_label='ACTION', @@ -223,7 +228,13 @@ def xgb_train_pipeline( transform_output_eval = os.path.join(output_template, 'eval', 'part-*') train_output = os.path.join(output_template, 'train_output') predict_output = os.path.join(output_template, 'predict_output') - + + _diagnose_me_op = diagnose_me_op( + bucket=output, + execution_mode=execution_mode, + project_id=project, + target_apis=required_apis) + with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op( project_id=project, region=region, @@ -238,7 +249,7 @@ def xgb_train_pipeline( 'initialization_actions.sh'), ], image_version='1.2' - ) + ).after(_diagnose_me_op) _analyze_op = dataproc_analyze_op( project=project, From ef6be6ed81919706399672fc180304b4f00e65fe Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 16:13:43 -0800 Subject: [PATCH 12/14] change project name validation --- components/diagnostics/diagnose_me/component.py | 2 +- components/diagnostics/diagnose_me/component.yaml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py index d97d810adb4..79c46546e09 100644 --- a/components/diagnostics/diagnose_me/component.py +++ b/components/diagnostics/diagnose_me/component.py @@ -74,7 +74,7 @@ def run_diagnose_me( file=sys.stderr) config_error_observed = True - if auth_project_id is not project_id: + if auth_project_id != project_id: print( 'User provided project ID %s does not match the configuration %s\n' % (project_id,auth_project_id), file=sys.stderr) diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index
bb615496961..d9892d2e4b7 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -70,10 +70,10 @@ implementation: file=sys.stderr) config_error_observed = True - if auth_project_id is not project_id: + if auth_project_id != project_id: print( 'User provided project ID %s does not match the configuration %s\n' % - (project_id,auth_project_id), sys.stderr) + (project_id,auth_project_id), file=sys.stderr) config_error_observed = True # Get project buckets @@ -113,7 +113,7 @@ implementation: if api_config_results.has_error: print('could not retrieve API status with error: %s' % - (api_config_results.stderr),sys.stderr) + (api_config_results.stderr),file=sys.stderr) config_error_observed = True print('Checking APIs status ...') @@ -130,7 +130,7 @@ implementation: print( 'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % - (api, project_id),sys.stderr) + (api, project_id),file=sys.stderr) config_error_observed = True if 'HALT_ON_ERROR' in execution_mode and config_error_observed: @@ -215,4 +215,4 @@ inputs: - {name: bucket, type: String} - {name: execution_mode, type: String} - {name: project_id, type: String} -- {name: target_apis, type: String} +- {name: target_apis, type: String} \ No newline at end of file From 8c8ab0f15b65292b545ac034c18cbbcc74bc1f7f Mon Sep 17 00:00:00 2001 From: Sina Chavoshi Date: Fri, 31 Jan 2020 16:14:56 -0800 Subject: [PATCH 13/14] change project name validation --- components/diagnostics/diagnose_me/component.py | 2 +- components/diagnostics/diagnose_me/component.yaml | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py index d97d810adb4..79c46546e09 100644 --- a/components/diagnostics/diagnose_me/component.py +++ b/components/diagnostics/diagnose_me/component.py @@ -74,7 +74,7 @@ def run_diagnose_me( file=sys.stderr) config_error_observed = True - if auth_project_id is not project_id: + if auth_project_id != project_id: print( 'User provided project ID %s does not match the configuration %s\n' % (project_id,auth_project_id), file=sys.stderr) diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml index bb615496961..d9892d2e4b7 100644 --- a/components/diagnostics/diagnose_me/component.yaml +++ b/components/diagnostics/diagnose_me/component.yaml @@ -70,10 +70,10 @@ implementation: file=sys.stderr) config_error_observed = True - if auth_project_id is not project_id: + if auth_project_id != project_id: print( 'User provided project ID %s does not match the configuration %s\n' % - (project_id,auth_project_id), sys.stderr) + (project_id,auth_project_id), file=sys.stderr) config_error_observed = True # Get project buckets @@ -113,7 +113,7 @@ implementation: if api_config_results.has_error: print('could not retrieve API status with error: %s' % - (api_config_results.stderr),sys.stderr) + (api_config_results.stderr),file=sys.stderr) config_error_observed = True print('Checking APIs status ...') @@ -130,7 +130,7 @@ implementation: print( 'API \"%s\" is not accessible or not enabled.
To enable this api go to ' % (api) + 'https://pantheon.corp.google.com/apis/library/%s?project=%s' % - (api, project_id),sys.stderr) + (api, project_id),file=sys.stderr) config_error_observed = True if 'HALT_ON_ERROR' in execution_mode and config_error_observed: @@ -215,4 +215,4 @@ inputs: - {name: bucket, type: String} - {name: execution_mode, type: String} - {name: project_id, type: String} -- {name: target_apis, type: String} +- {name: target_apis, type: String} \ No newline at end of file From e12158674ab1eb9f5f92a0ff21826acbddb5b39f Mon Sep 17 00:00:00 2001 From: sina chavoshi Date: Fri, 31 Jan 2020 17:41:03 -0800 Subject: [PATCH 14/14] changing the component to load from latest yaml --- samples/core/xgboost_training_cm/xgboost_training_cm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/core/xgboost_training_cm/xgboost_training_cm.py b/samples/core/xgboost_training_cm/xgboost_training_cm.py index f463cccd514..f18c5b67336 100644 --- a/samples/core/xgboost_training_cm/xgboost_training_cm.py +++ b/samples/core/xgboost_training_cm/xgboost_training_cm.py @@ -22,7 +22,7 @@ import subprocess diagnose_me_op = components.load_component_from_url( - 'https://raw.githubusercontent.com/kubeflow/pipelines/f1d124da952dda27a4da2e2249792cc998d9c53b/components/diagnostics/diagnose_me/component.yaml') + 'https://raw.githubusercontent.com/kubeflow/pipelines/df450617af6e385da8c436628afafb1c76ca6c79/components/diagnostics/diagnose_me/component.yaml') confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml')
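Taken together, patches 11 through 14 wire the diagnose_me component into the XGBoost sample: patch 11 loads the component, adds execution_mode and required_apis pipeline parameters, and gates cluster creation on the verification step via .after(); patches 12 and 13 replace `is not` with `!=` in the project check, which matters because `is` compares object identity rather than string value, so two equal project-ID strings can still fail an `is` test; patch 14 re-pins the component URL to a commit containing the fixed yaml. The sketch below illustrates the resulting wiring against the KFP v1 SDK. The component URL is the pin from patch 14; the pipeline name, parameter defaults, and the echo container standing in for the sample's real Dataproc steps are illustrative placeholders, not part of the sample.

import kfp.dsl as dsl
import kfp.components as components

# Component pin from patch 14 (assumed still reachable at this commit).
diagnose_me_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/df450617af6e385da8c436628afafb1c76ca6c79/components/diagnostics/diagnose_me/component.yaml')

@dsl.pipeline(name='verified-pipeline')  # hypothetical pipeline for illustration
def verified_pipeline(
        project: str = 'my-project',            # placeholder project ID
        output: str = 'gs://my-bucket/staging',  # placeholder GCS bucket
        execution_mode: str = 'HALT_ON_ERROR',
        required_apis: str = 'storage-api.googleapis.com, bigquery.googleapis.com'):
    # Environment verification runs first; with HALT_ON_ERROR the component
    # raises RuntimeError inside its container on any configuration problem.
    _diagnose_me_op = diagnose_me_op(
        bucket=output,
        execution_mode=execution_mode,
        project_id=project,
        target_apis=required_apis)

    # .after() declares an ordering dependency without passing any data, which
    # is how patch 11 gates dataproc_create_cluster_op on the verification step.
    train = dsl.ContainerOp(
        name='train',
        image='python:3.7',
        command=['echo', 'environment verified; training would start here'],
    ).after(_diagnose_me_op)

Because _diagnose_me_op produces no output consumed downstream, the explicit .after() call is the only thing sequencing it before the rest of the pipeline; without it, verification would run in parallel and a HALT_ON_ERROR failure could arrive only after the expensive cluster work had already started.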