diff --git a/components/diagnostics/diagnose_me/component.py b/components/diagnostics/diagnose_me/component.py
index 16ce8b98498..79c46546e09 100644
--- a/components/diagnostics/diagnose_me/component.py
+++ b/components/diagnostics/diagnose_me/component.py
@@ -16,7 +16,7 @@
 
 
 def run_diagnose_me(
-    bucket: str, execution_mode: str, target_apis: str
+    bucket: str, execution_mode: str, project_id: str, target_apis: str
 ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]):
   """ Performs environment verification specific to this pipeline.
 
@@ -29,7 +29,7 @@ def run_diagnose_me(
           If set to HALT_ON_ERROR will case any error to raise an exception.
           This is intended to stop the data processing of a pipeline. Can set
           to False to only report Errors/Warnings.
-      target_apis: 
+      target_apis:
           String consisting of a comma separated list of apis to be verified.
   Raises:
       RuntimeError: If configuration is not setup properly and
@@ -61,9 +61,8 @@ def run_diagnose_me(
   # from project configuration
   project_config = gcp.get_gcp_configuration(
       gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False)
-  project_id = ''
   if not project_config.has_error:
-    project_id = project_config.parsed_output['core']['project']
+    auth_project_id = project_config.parsed_output['core']['project']
     print('GCP credentials are configured with access to project: %s ...\n' % (project_id))
     print('Following account(s) are active under this pipeline:\n')
     subprocess.run(['gcloud', 'auth', 'list'])
@@ -75,6 +74,12 @@ def run_diagnose_me(
           file=sys.stderr)
     config_error_observed = True
 
+  if not project_config.has_error and auth_project_id != project_id:
+    print(
+        'User provided project ID %s does not match the configuration %s\n' %
+        (project_id, auth_project_id), file=sys.stderr)
+    config_error_observed = True
+
   # Get project buckets
   get_project_bucket_results = gcp.get_gcp_configuration(
       gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False)
@@ -111,9 +116,10 @@ def run_diagnose_me(
   api_status = {}
 
   if api_config_results.has_error:
-    raise RuntimeError('could not retrieve API status with error: %s' %
-                       (api_config_results.stderr))
-
+    print('could not retrieve API status with error: %s' %
+          (api_config_results.stderr), file=sys.stderr)
+    config_error_observed = True
+
   print('Checking APIs status ...')
   for item in api_config_results.parsed_output:
     api_status[item['config']['name']] = item['state']
@@ -122,33 +128,27 @@ def run_diagnose_me(
 
   # Check if target apis are enabled
   api_check_results = True
-  error_list = []
   for api in target_apis.replace(' ', '').split(','):
     if 'ENABLED' != api_status.get(api, 'DISABLED'):
       api_check_results = False
-      error_list.append(
-          'API \"%s\" is not enabled. To enable this api go to ' % (api) +
+      print(
+          'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) +
           'https://pantheon.corp.google.com/apis/library/%s?project=%s' %
-          (api, project_id))
-
-  if not api_check_results:
-    config_error_observed = True
-    print(
-        'Required APIs are not enabled:\n' + '\n'.join(error_list),
-        file=sys.stderr)
+          (api, project_id), file=sys.stderr)
+      config_error_observed = True
 
   if 'HALT_ON_ERROR' in execution_mode and config_error_observed:
-    raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+
+    raise RuntimeError('There was an error in your environment configuration.\n'+
                        'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+
                        '\n'+
                        'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+
-                       'checkbox during cluster creation to have the cluster configured automatically.\n'+
+                       'checkbox during cluster creation to have the cluster configured automatically.\n'+
                        'For more information on this and other troubleshooting instructions refer to\n'+
                        'our troubleshooting guide.\n'+
                        '\n'+
-                       'If you have intentionally modified the cluster configuration, you may\n'+
+                       'If you have intentionally modified the cluster configuration, you may\n'+
                        'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n')
-
+
   return (project_id, bucket)
diff --git a/components/diagnostics/diagnose_me/component.yaml b/components/diagnostics/diagnose_me/component.yaml
index 5823bbf20ec..d9892d2e4b7 100644
--- a/components/diagnostics/diagnose_me/component.yaml
+++ b/components/diagnostics/diagnose_me/component.yaml
@@ -1,19 +1,9 @@
-inputs:
-- {type: String, name: bucket}
-- {type: String, name: execution_mode}
-- {type: String, name: target_apis}
+name: Run diagnose me
+outputs:
+- {name: bucket, type: String}
+- {name: project_id, type: String}
 implementation:
   container:
-    args:
-    - --bucket
-    - {inputValue: bucket}
-    - --execution-mode
-    - {inputValue: execution_mode}
-    - --target-apis
-    - {inputValue: target_apis}
-    - '----output-paths'
-    - {outputPath: bucket}
-    - {outputPath: project_id}
     command:
     - python3
    - -u
@@ -22,7 +12,7 @@ implementation:
      from typing import NamedTuple
 
      def run_diagnose_me(
-          bucket: str, execution_mode: str, target_apis: str
+          bucket: str, execution_mode: str, project_id: str, target_apis: str
      ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]):
        """ Performs environment verification specific to this pipeline.
 
@@ -67,9 +57,8 @@ implementation:
        # from project configuration
        project_config = gcp.get_gcp_configuration(
            gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False)
-        project_id = ''
        if not project_config.has_error:
-          project_id = project_config.parsed_output['core']['project']
+          auth_project_id = project_config.parsed_output['core']['project']
          print('GCP credentials are configured with access to project: %s ...\n' % (project_id))
          print('Following account(s) are active under this pipeline:\n')
          subprocess.run(['gcloud', 'auth', 'list'])
@@ -81,6 +70,12 @@ implementation:
                file=sys.stderr)
          config_error_observed = True
 
+        if not project_config.has_error and auth_project_id != project_id:
+          print(
+              'User provided project ID %s does not match the configuration %s\n' %
+              (project_id, auth_project_id), file=sys.stderr)
+          config_error_observed = True
+
        # Get project buckets
        get_project_bucket_results = gcp.get_gcp_configuration(
            gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False)
@@ -117,8 +112,9 @@ implementation:
        api_status = {}
 
        if api_config_results.has_error:
-          raise RuntimeError('could not retrieve API status with error: %s' %
-                             (api_config_results.stderr))
+          print('could not retrieve API status with error: %s' %
+                (api_config_results.stderr), file=sys.stderr)
+          config_error_observed = True
 
        print('Checking APIs status ...')
        for item in api_config_results.parsed_output:
@@ -128,23 +124,17 @@ implementation:
 
        # Check if target apis are enabled
        api_check_results = True
-        error_list = []
        for api in target_apis.replace(' ', '').split(','):
          if 'ENABLED' != api_status.get(api, 'DISABLED'):
            api_check_results = False
-            error_list.append(
-                'API \"%s\" is not enabled. To enable this api go to ' % (api) +
+            print(
+                'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) +
                'https://pantheon.corp.google.com/apis/library/%s?project=%s' %
-                (api, project_id))
-
-        if not api_check_results:
-          config_error_observed = True
-          print(
-              'Required APIs are not enabled:\n' + '\n'.join(error_list),
-              file=sys.stderr)
+                (api, project_id), file=sys.stderr)
+            config_error_observed = True
 
        if 'HALT_ON_ERROR' in execution_mode and config_error_observed:
-          raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+
+          raise RuntimeError('There was an error in your environment configuration.\n'+
                             'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+
                             '\n'+
                             'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+
@@ -163,9 +153,10 @@ implementation:
          return str_value
 
      import argparse
-      _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.\n')
+      _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.')
      _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--project-id", dest="project_id", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS)
      _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
      _parsed_args = vars(_parser.parse_args())
@@ -190,8 +181,20 @@ implementation:
          pass
        with open(output_file, 'w') as f:
          f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --bucket
+    - {inputValue: bucket}
+    - --execution-mode
+    - {inputValue: execution_mode}
+    - --project-id
+    - {inputValue: project_id}
+    - --target-apis
+    - {inputValue: target_apis}
+    - '----output-paths'
+    - {outputPath: bucket}
+    - {outputPath: project_id}
     image: google/cloud-sdk:276.0.0
-description: |
+description: |-
   Performs environment verification specific to this pipeline.
 
   args:
@@ -208,7 +211,8 @@ description: |
   Raises:
       RuntimeError: If configuration is not setup properly and
      HALT_ON_ERROR flag is set.
-name: Run diagnose me
-outputs:
-- {type: String, name: bucket}
-- {type: String, name: project_id}
+inputs:
+- {name: bucket, type: String}
+- {name: execution_mode, type: String}
+- {name: project_id, type: String}
+- {name: target_apis, type: String}
\ No newline at end of file
diff --git a/samples/core/xgboost_training_cm/xgboost_training_cm.py b/samples/core/xgboost_training_cm/xgboost_training_cm.py
old mode 100755
new mode 100644
index 6b1742ef2c0..f18c5b67336
--- a/samples/core/xgboost_training_cm/xgboost_training_cm.py
+++ b/samples/core/xgboost_training_cm/xgboost_training_cm.py
@@ -21,6 +21,9 @@
 import os
 import subprocess
 
+diagnose_me_op = components.load_component_from_url(
+    'https://raw.githubusercontent.com/kubeflow/pipelines/df450617af6e385da8c436628afafb1c76ca6c79/components/diagnostics/diagnose_me/component.yaml')
+
 confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml')
 
 roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/roc/component.yaml')
@@ -210,6 +213,8 @@ def xgb_train_pipeline(
     eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
     schema='gs://ml-pipeline-playground/sfpd/schema.json',
     target='resolution',
+    execution_mode='HALT_ON_ERROR',
+    required_apis='stackdriver.googleapis.com, storage-api.googleapis.com, bigquery.googleapis.com, dataflow.googleapis.com, dataproc.googleapis.com',
     rounds=200,
     workers=2,
     true_label='ACTION',
@@ -223,7 +228,13 @@ def xgb_train_pipeline(
     transform_output_eval = os.path.join(output_template, 'eval', 'part-*')
     train_output = os.path.join(output_template, 'train_output')
     predict_output = os.path.join(output_template, 'predict_output')
-
+
+    _diagnose_me_op = diagnose_me_op(
+        bucket=output,
+        execution_mode=execution_mode,
+        project_id=project,
+        target_apis=required_apis)
+
     with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
         project_id=project,
         region=region,
@@ -238,7 +249,7 @@ def xgb_train_pipeline(
                 'initialization_actions.sh'),
         ],
         image_version='1.2'
-    )
+    ).after(_diagnose_me_op)
 
     _analyze_op = dataproc_analyze_op(
         project=project,
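
For readers who want to reuse the updated component outside the xgboost sample, below is a minimal sketch of wiring it into a standalone pipeline. It assumes the KFP v1 SDK (kfp.components.load_component_from_url, kfp.dsl.pipeline, kfp.compiler.Compiler); the component URL is the commit-pinned one from the diff above, while the pipeline name, project ID, bucket, and API list are hypothetical placeholders.

# Minimal sketch: running the updated diagnose_me component as its own
# pipeline. The component URL is the pinned one from the diff above;
# project/bucket/API values are hypothetical placeholders.
import kfp
from kfp import components, dsl

diagnose_me_op = components.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/df450617af6e385da8c436628afafb1c76ca6c79/components/diagnostics/diagnose_me/component.yaml')


@dsl.pipeline(
    name='diagnose-me-check',
    description='Verifies the environment before any real work starts.')
def diagnose_me_pipeline(
    project_id: str = 'my-gcp-project',      # hypothetical project ID
    bucket: str = 'gs://my-bucket/output',   # hypothetical bucket
):
    # The new project_id input lets the component cross-check the
    # user-supplied project against the cluster's gcloud configuration.
    diagnose_me_op(
        bucket=bucket,
        execution_mode='HALT_ON_ERROR',  # raise on any config error
        project_id=project_id,
        target_apis='storage-api.googleapis.com,bigquery.googleapis.com')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(diagnose_me_pipeline, 'diagnose_me_pipeline.yaml')

As in the sample above, downstream ops can be ordered after the check with .after(...) so that a misconfigured environment halts the run before any expensive work is scheduled.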