Adding diagnose_me component to XGBoost sample #2953

Merged · 17 commits · Feb 1, 2020
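This PR adds a pre-flight environment check to the XGBoost training sample. The diagnose_me component gains an explicit project_id input, which is cross-checked against the project configured in the active gcloud credentials, and several hard failures are downgraded to reported errors so that the run only halts when execution_mode contains HALT_ON_ERROR. The sample pipeline then runs this check before any Dataproc cluster is created.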
42 changes: 21 additions & 21 deletions components/diagnostics/diagnose_me/component.py
@@ -16,7 +16,7 @@
 
 
 def run_diagnose_me(
-    bucket: str, execution_mode: str, target_apis: str
+    bucket: str, execution_mode: str, project_id:str , target_apis: str
 ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]):
   """ Performs environment verification specific to this pipeline.
 
@@ -29,7 +29,7 @@ def run_diagnose_me(
       If set to HALT_ON_ERROR will case any error to raise an exception.
       This is intended to stop the data processing of a pipeline. Can set
       to False to only report Errors/Warnings.
-    target_apis:
+    target_apis:
       String consisting of a comma separated list of apis to be verified.
   Raises:
     RuntimeError: If configuration is not setup properly and
@@ -61,9 +61,8 @@ def run_diagnose_me(
   # from project configuration
   project_config = gcp.get_gcp_configuration(
       gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False)
-  project_id = ''
   if not project_config.has_error:
-    project_id = project_config.parsed_output['core']['project']
+    auth_project_id = project_config.parsed_output['core']['project']
     print('GCP credentials are configured with access to project: %s ...\n' % (project_id))
     print('Following account(s) are active under this pipeline:\n')
     subprocess.run(['gcloud', 'auth', 'list'])
@@ -75,6 +74,12 @@ def run_diagnose_me(
         file=sys.stderr)
     config_error_observed = True
 
+  if auth_project_id != project_id:
+    print(
+        'User provided project ID %s does not match the configuration %s\n' %
+        (project_id,auth_project_id), file=sys.stderr)
+    config_error_observed = True
+
   # Get project buckets
   get_project_bucket_results = gcp.get_gcp_configuration(
       gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False)
@@ -111,9 +116,10 @@ def run_diagnose_me(
   api_status = {}
 
   if api_config_results.has_error:
-    raise RuntimeError('could not retrieve API status with error: %s' %
-                       (api_config_results.stderr))
-
+    print('could not retrieve API status with error: %s' %
+          (api_config_results.stderr),file=sys.stderr)
+    config_error_observed = True
+
   print('Checking APIs status ...')
   for item in api_config_results.parsed_output:
     api_status[item['config']['name']] = item['state']
@@ -122,33 +128,27 @@ def run_diagnose_me(
 
   # Check if target apis are enabled
   api_check_results = True
-  error_list = []
   for api in target_apis.replace(' ', '').split(','):
     if 'ENABLED' != api_status.get(api, 'DISABLED'):
       api_check_results = False
-      error_list.append(
-          'API \"%s\" is not enabled. To enable this api go to ' % (api) +
+      print(
+          'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) +
           'https://pantheon.corp.google.com/apis/library/%s?project=%s' %
-          (api, project_id))
-
-  if not api_check_results:
-    config_error_observed = True
-    print(
-        'Required APIs are not enabled:\n' + '\n'.join(error_list),
-        file=sys.stderr)
+          (api, project_id),file=sys.stderr)
+      config_error_observed = True
 
   if 'HALT_ON_ERROR' in execution_mode and config_error_observed:
-    raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+
+    raise RuntimeError('There was an error in your environment configuration.\n'+
                        'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+
                        '\n'+
                        'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+
-                       'checkbox during cluster creation to have the cluster configured automatically.\n'+
+                       'checkbox during cluster creation to have the cluster configured automatically.\n'+
                        'For more information on this and other troubleshooting instructions refer to\n'+
                        'our troubleshooting guide.\n'+
                        '\n'+
-                       'If you have intentionally modified the cluster configuration, you may\n'+
+                       'If you have intentionally modified the cluster configuration, you may\n'+
                        'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n')
 
   return (project_id, bucket)
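The component.yaml that follows is just the serialized form of this function. As a minimal sketch (not part of this PR), such a file can be regenerated after editing component.py with the kfp SDK's func_to_container_op helper; the base image is the one pinned in the diff, while the import path of the function is an assumption:

    import kfp.components as comp
    from component import run_diagnose_me  # hypothetical import of the function above

    # Re-serialize the Python function into component.yaml, pinning the same
    # gcloud-enabled base image the component runs in.
    comp.func_to_container_op(
        run_diagnose_me,
        base_image='google/cloud-sdk:276.0.0',
        output_component_file='component.yaml')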
76 changes: 40 additions & 36 deletions components/diagnostics/diagnose_me/component.yaml
@@ -1,19 +1,9 @@
-inputs:
-- {type: String, name: bucket}
-- {type: String, name: execution_mode}
-- {type: String, name: target_apis}
+name: Run diagnose me
+outputs:
+- {name: bucket, type: String}
+- {name: project_id, type: String}
 implementation:
   container:
-    args:
-    - --bucket
-    - {inputValue: bucket}
-    - --execution-mode
-    - {inputValue: execution_mode}
-    - --target-apis
-    - {inputValue: target_apis}
-    - '----output-paths'
-    - {outputPath: bucket}
-    - {outputPath: project_id}
     command:
     - python3
     - -u
@@ -22,7 +12,7 @@ implementation:
       from typing import NamedTuple
 
       def run_diagnose_me(
-          bucket: str, execution_mode: str, target_apis: str
+          bucket: str, execution_mode: str, project_id:str , target_apis: str
       ) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]):
         """ Performs environment verification specific to this pipeline.
 
@@ -67,9 +57,8 @@ implementation:
       # from project configuration
       project_config = gcp.get_gcp_configuration(
           gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False)
-      project_id = ''
       if not project_config.has_error:
-        project_id = project_config.parsed_output['core']['project']
+        auth_project_id = project_config.parsed_output['core']['project']
         print('GCP credentials are configured with access to project: %s ...\n' % (project_id))
         print('Following account(s) are active under this pipeline:\n')
         subprocess.run(['gcloud', 'auth', 'list'])
@@ -81,6 +70,12 @@ implementation:
             file=sys.stderr)
         config_error_observed = True
 
+      if auth_project_id != project_id:
+        print(
+            'User provided project ID %s does not match the configuration %s\n' %
+            (project_id,auth_project_id), file=sys.stderr)
+        config_error_observed = True
+
       # Get project buckets
       get_project_bucket_results = gcp.get_gcp_configuration(
           gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False)
@@ -117,8 +112,9 @@ implementation:
       api_status = {}
 
       if api_config_results.has_error:
-        raise RuntimeError('could not retrieve API status with error: %s' %
-                           (api_config_results.stderr))
+        print('could not retrieve API status with error: %s' %
+              (api_config_results.stderr),file=sys.stderr)
+        config_error_observed = True
 
       print('Checking APIs status ...')
       for item in api_config_results.parsed_output:
@@ -128,23 +124,17 @@ implementation:
 
       # Check if target apis are enabled
       api_check_results = True
-      error_list = []
       for api in target_apis.replace(' ', '').split(','):
         if 'ENABLED' != api_status.get(api, 'DISABLED'):
           api_check_results = False
-          error_list.append(
-              'API \"%s\" is not enabled. To enable this api go to ' % (api) +
+          print(
+              'API \"%s\" is not accessible or not enabled. To enable this api go to ' % (api) +
               'https://pantheon.corp.google.com/apis/library/%s?project=%s' %
-              (api, project_id))
-
-      if not api_check_results:
-        config_error_observed = True
-        print(
-            'Required APIs are not enabled:\n' + '\n'.join(error_list),
-            file=sys.stderr)
+              (api, project_id),file=sys.stderr)
+          config_error_observed = True
 
       if 'HALT_ON_ERROR' in execution_mode and config_error_observed:
-        raise RuntimeError('There was an error in your environment configuration. See above for details.\n'+
+        raise RuntimeError('There was an error in your environment configuration.\n'+
                            'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'+
                            '\n'+
                            'We highly recommend that you recreate the cluster and check "Allow access ..." \n'+
@@ -163,9 +153,10 @@ implementation:
           return str_value
 
       import argparse
-      _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.\n')
+      _parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.')
       _parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS)
       _parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS)
+      _parser.add_argument("--project-id", dest="project_id", type=str, required=True, default=argparse.SUPPRESS)
       _parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS)
       _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
       _parsed_args = vars(_parser.parse_args())
@@ -190,8 +181,20 @@ implementation:
               pass
           with open(output_file, 'w') as f:
               f.write(_output_serializers[idx](_outputs[idx]))
+    args:
+    - --bucket
+    - {inputValue: bucket}
+    - --execution-mode
+    - {inputValue: execution_mode}
+    - --project-id
+    - {inputValue: project_id}
+    - --target-apis
+    - {inputValue: target_apis}
+    - '----output-paths'
+    - {outputPath: bucket}
+    - {outputPath: project_id}
     image: google/cloud-sdk:276.0.0
-description: |
+description: |-
   Performs environment verification specific to this pipeline.
 
   args:
Expand All @@ -208,7 +211,8 @@ description: |
Raises:
RuntimeError: If configuration is not setup properly and
HALT_ON_ERROR flag is set.
name: Run diagnose me
outputs:
- {type: String, name: bucket}
- {type: String, name: project_id}
inputs:
- {name: bucket, type: String}
- {name: execution_mode, type: String}
- {name: project_id, type: String}
- {name: target_apis, type: String}
15 changes: 13 additions & 2 deletions samples/core/xgboost_training_cm/xgboost_training_cm.py
100755 → 100644
@@ -21,6 +21,9 @@
 import os
 import subprocess
 
+diagnose_me_op = components.load_component_from_url(
+    'https://raw.githubusercontent.com/kubeflow/pipelines/df450617af6e385da8c436628afafb1c76ca6c79/components/diagnostics/diagnose_me/component.yaml')
+
 confusion_matrix_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/confusion_matrix/component.yaml')
 
 roc_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/ff116b6f1a0f0cdaafb64fcd04214c169045e6fc/components/local/roc/component.yaml')
@@ -210,6 +213,8 @@ def xgb_train_pipeline(
     eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
     schema='gs://ml-pipeline-playground/sfpd/schema.json',
     target='resolution',
+    execution_mode='HALT_ON_ERROR',
+    required_apis='stackdriver.googleapis.com, storage-api.googleapis.com, bigquery.googleapis.com, dataflow.googleapis.com, dataproc.googleapis.com',
     rounds=200,
     workers=2,
     true_label='ACTION',
@@ -223,7 +228,13 @@ def xgb_train_pipeline(
     transform_output_eval = os.path.join(output_template, 'eval', 'part-*')
     train_output = os.path.join(output_template, 'train_output')
     predict_output = os.path.join(output_template, 'predict_output')
-
+
+    _diagnose_me_op = diagnose_me_op(
+        bucket=output,
+        execution_mode=execution_mode,
+        project_id=project,
+        target_apis=required_apis)
+
     with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
         project_id=project,
         region=region,
@@ -238,7 +249,7 @@ def xgb_train_pipeline(
                 'initialization_actions.sh'),
         ],
         image_version='1.2'
-    )
+    ).after(_diagnose_me_op)
 
     _analyze_op = dataproc_analyze_op(
         project=project,
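Because cluster creation is chained with .after(_diagnose_me_op), the Dataproc steps only start once the environment check has completed (or merely reported its findings, when HALT_ON_ERROR is unset). As a sketch, assuming a kfp SDK of the same era is installed locally, the updated sample compiles like any other pipeline; the output filename is arbitrary:

    import kfp.compiler as compiler
    from xgboost_training_cm import xgb_train_pipeline

    # Produces a workflow package that can be uploaded to the
    # Kubeflow Pipelines UI or submitted through the kfp client.
    compiler.Compiler().compile(xgb_train_pipeline, 'xgboost_training_cm.tar.gz')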