[SDK][Multi-user] refine sdk for multi-user support #3417

Merged (5 commits, Apr 3, 2020)
Changes from 4 commits
88 changes: 50 additions & 38 deletions sdk/python/kfp/_client.py
@@ -74,6 +74,7 @@ def camel_case_to_snake_case(name):
KF_PIPELINES_DEFAULT_EXPERIMENT_NAME = 'KF_PIPELINES_DEFAULT_EXPERIMENT_NAME'
Member Author:
@Ark-kun I think we could move these environment variable settings to the file-based settings as well (of course in a separate PR later). WDYT?
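As a purely illustrative sketch of that proposal (the default_experiment_name key and the helper below are hypothetical, not part of this PR), the environment variable could become a fallback behind the context file:

import json
import os

KF_PIPELINES_DEFAULT_EXPERIMENT_NAME = 'KF_PIPELINES_DEFAULT_EXPERIMENT_NAME'
LOCAL_KFP_CONTEXT = os.path.expanduser('~/.config/kfp/context.json')

def get_default_experiment_name():
    # Hypothetical helper: prefer a file-based context setting,
    # fall back to the existing environment variable.
    if os.path.exists(LOCAL_KFP_CONTEXT):
        with open(LOCAL_KFP_CONTEXT, 'r') as f:
            name = json.load(f).get('default_experiment_name')
            if name:
                return name
    return os.environ.get(KF_PIPELINES_DEFAULT_EXPERIMENT_NAME)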

Contributor:
JFYI: These variables are used by the sample tests.

KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME = 'KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME'


class Client(object):
""" API Client for KubeFlow Pipeline.
"""
@@ -82,6 +83,11 @@ class Client(object):
IN_CLUSTER_DNS_NAME = 'ml-pipeline.{}.svc.cluster.local:8888'
KUBE_PROXY_PATH = 'api/v1/namespaces/{}/services/ml-pipeline:http/proxy/'

LOCAL_KFP_CONTEXT = os.path.expanduser('~/.config/kfp/context.json')
context_setting = {
Contributor:
I see writes to this variable, but I see no reads. How is this used?

Member Author:
It is read by get_user_namespace, see line 241 below.

'namespace': '',
}

# TODO: Wrap the configurations for different authentication methods.
def __init__(self, host=None, client_id=None, namespace='kubeflow', other_client_id=None, other_client_secret=None, existing_token=None):
"""Create a new instance of kfp client.
@@ -214,18 +220,38 @@ def _get_url_prefix(self):
# In-cluster pod. We could use relative URL.
return '/pipeline'

def create_experiment(self, name, description=None, namespace=''):
def set_user_namespace(self, namespace):
Contributor:
This method seems to be a little confusing.
It's an instance method, but it changes a global setting.

Member Author:
Changed context_setting to an instance variable.

"""Set user namespace into local context setting file.
Member:
I'd recommend calling out that this function is multi-user only, instead of mentioning it in the "namespace" arg below:
"... This setting is only applicable when Kubeflow Pipelines is in the multi-user mode.
If this is set in single-user mode, this setting will be ignored."

(BTW, what would happen if a user sets this, or passes a namespace, when the cluster is actually in single-user mode? Will it be ignored?)

Member Author:
Updated the docstring with the callout: "This function should only be used when Kubeflow Pipelines is in the multi-user mode.".
Looks like, per the current backend implementation, if a namespace value is passed to the backend, it will not be ignored. The namespace value will end up being stored in the experiment table. Maybe we should error out in the backend?

Member:
Yeah, sounds good. Let's error it out.

This function should only be used when Kubeflow Pipelines is in the multi-user mode.
Args:
namespace: kubernetes namespace the user has access to.
"""
self.context_setting['namespace'] = namespace
with open(self.LOCAL_KFP_CONTEXT, 'w') as f:
json.dump(self.context_setting, f)
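For illustration, a minimal usage sketch of the new call; the host value and namespace are placeholders:

import kfp

client = kfp.Client(host='http://localhost:8080')  # placeholder host
client.set_user_namespace('alice')  # persisted to ~/.config/kfp/context.json
# Later calls that accept a namespace argument fall back to this value.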

def get_user_namespace(self):
Contributor:
This method seems to be a little confusing.
It's a getter method, but it actually changes the client state.

Member Author:
That's a valid concern. My thought was to refresh from the file every time we read a value.
Maybe I should move the file-read part to class initialization, so this getter can read only from memory.

Member Author:
Moved the file read part to a separate method, called during init.

"""Get user namespace in context config.
Returns:
namespace: kubernetes namespace from the local context file or empty if it wasn't set.
"""
if os.path.exists(self.LOCAL_KFP_CONTEXT):
with open(self.LOCAL_KFP_CONTEXT, 'r') as f:
self.context_setting = json.load(f)
return self.context_setting['namespace']
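Following the exchange above about reading the file once during initialization, a minimal sketch of what such a helper could look like; the method name _load_context_setting is an assumption, and json/os are already imported at the top of _client.py:

def _load_context_setting(self):
    # Hypothetical helper, called once from __init__: load the persisted
    # context so later get_user_namespace() calls read only from memory.
    if os.path.exists(self.LOCAL_KFP_CONTEXT):
        with open(self.LOCAL_KFP_CONTEXT, 'r') as f:
            self.context_setting = json.load(f)
    else:
        self.context_setting = {'namespace': ''}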

def create_experiment(self, name, description=None, namespace=None):
"""Create a new experiment.
Args:
name: the name of the experiment.
description: description of the experiment.
namespace: kubernetes namespace where the experiment should be created.
For single user deployment, leave it as empty;
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
An Experiment object. Most important field is id.
"""

namespace = namespace or self.get_user_namespace()
experiment = None
try:
experiment = self.get_experiment(experiment_name=name, namespace=namespace)
@@ -237,7 +263,7 @@ def create_experiment(self, name, description=None, namespace=''):
logging.info('Creating experiment {}.'.format(name))

resource_references = []
if namespace is not None:
if namespace:
key = kfp_server_api.models.ApiResourceKey(id=namespace, type=kfp_server_api.models.ApiResourceType.NAMESPACE)
reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
@@ -256,18 +282,19 @@ def create_experiment(self, name, description=None, namespace=''):
IPython.display.display(IPython.display.HTML(html))
return experiment
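A hedged usage sketch of the updated create_experiment, with placeholder host and namespace; an explicit namespace argument takes precedence, otherwise the value stored by set_user_namespace is used:

import kfp

client = kfp.Client(host='http://localhost:8080')  # placeholder host
experiment = client.create_experiment(name='my-experiment', namespace='alice')
print(experiment.id)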

def list_experiments(self, page_token='', page_size=10, sort_by='', namespace=''):
def list_experiments(self, page_token='', page_size=10, sort_by='', namespace=None):
"""List experiments.
Args:
page_token: token for starting of the page.
page_size: size of the page.
sort_by: can be '[field_name]', '[field_name] des'. For example, 'name des'.
namespace: kubernetes namespace where the experiment was created.
For single user deployment, leave it as empty;
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
A response object including a list of experiments and next page token.
"""
namespace = namespace or self.get_user_namespace()
response = self._experiment_api.list_experiment(
page_token=page_token,
page_size=page_size,
@@ -276,20 +303,21 @@ def list_experiments(self, page_token='', page_size=10, sort_by='', namespace=''):
resource_reference_key_id=namespace)
return response
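And the matching listing call, again with placeholder values; the response fields follow the generated kfp_server_api models:

response = client.list_experiments(namespace='alice', page_size=20)
for e in response.experiments or []:
    print(e.id, e.name)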

def get_experiment(self, experiment_id=None, experiment_name=None, namespace=''):
def get_experiment(self, experiment_id=None, experiment_name=None, namespace=None):
"""Get details of an experiment
Either experiment_id or experiment_name is required
Args:
experiment_id: id of the experiment. (Optional)
experiment_name: name of the experiment. (Optional)
namespace: kubernetes namespace where the experiment was created.
For single user deployment, leave it as empty;
For single user deployment, leave it as None;
For multi user, input the namespace where the user is authorized.
Returns:
A response object including details of a experiment.
Throws:
Exception if experiment is not found or None of the arguments is provided
"""
namespace = namespace or self.get_user_namespace()
if experiment_id is None and experiment_name is None:
raise ValueError('Either experiment_id or experiment_name is required')
if experiment_id is not None:
@@ -345,7 +373,7 @@ def list_pipelines(self, page_token='', page_size=10, sort_by=''):
return self._pipelines_api.list_pipelines(page_token=page_token, page_size=page_size, sort_by=sort_by)

# TODO: provide default namespace, similar to kubectl default namespaces.
def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None, namespace=None):
def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None):
"""Run a specified pipeline.

Args:
@@ -354,9 +382,6 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para
pipeline_package_path: local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml).
params: a dictionary with key (string) as param name and value (string) as as param value.
pipeline_id: the string ID of a pipeline.
namespace: kubernetes namespace where the pipeline runs are created.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized

Returns:
A run object. Most important field is id.
@@ -369,26 +394,18 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para
api_params = [kfp_server_api.ApiParameter(
name=sanitize_k8s_name(name=k, allow_capital_underscore=True),
value=str(v)) for k,v in params.items()]
resource_references = []

key = kfp_server_api.models.ApiResourceKey(id=experiment_id,
type=kfp_server_api.models.ApiResourceType.EXPERIMENT)
reference = kfp_server_api.models.ApiResourceReference(key=key,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
if namespace is not None:
key = kfp_server_api.models.ApiResourceKey(id=namespace,
type=kfp_server_api.models.ApiResourceType.NAMESPACE)
reference = kfp_server_api.models.ApiResourceReference(key=key,
name=namespace,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)

spec = kfp_server_api.models.ApiPipelineSpec(
pipeline_id=pipeline_id,
workflow_manifest=pipeline_json_string,
parameters=api_params)
run_body = kfp_server_api.models.ApiRun(
pipeline_spec=spec, resource_references=resource_references, name=job_name)
pipeline_spec=spec, resource_references=[reference], name=job_name)

response = self._run_api.create_run(body=run_body)

@@ -458,12 +475,12 @@ def __repr__(self):
warnings.warn('Changing experiment name from "{}" to "{}".'.format(experiment_name, overridden_experiment_name))
experiment_name = overridden_experiment_name or 'Default'
run_name = run_name or pipeline_name + ' ' + datetime.now().strftime('%Y-%m-%d %H-%M-%S')
experiment = self.create_experiment(name=experiment_name)
run_info = self.run_pipeline(experiment.id, run_name, pipeline_file, arguments, namespace=namespace)
experiment = self.create_experiment(name=experiment_name, namespace=namespace)
run_info = self.run_pipeline(experiment.id, run_name, pipeline_file, arguments)
return RunPipelineResult(self, run_info)
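The higher-level helper now threads the namespace through create_experiment rather than run_pipeline. Assuming the enclosing create_run_from_pipeline_func gained a namespace argument, as this hunk suggests, usage would look roughly like:

import kfp
import kfp.dsl as dsl

@dsl.pipeline(name='hello-world', description='placeholder pipeline')
def hello_pipeline():
    dsl.ContainerOp(name='echo', image='alpine', command=['echo', 'hello'])

client = kfp.Client(host='http://localhost:8080')  # placeholder host
client.create_run_from_pipeline_func(
    hello_pipeline,
    arguments={},
    namespace='alice')  # assumed argument; forwarded to create_experiment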

def schedule_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None,
namespace=None, cron_schedule=None, description=None, max_concurrency=10, no_catchup=None):
cron_schedule=None, description=None, max_concurrency=10, no_catchup=None):
"""Schedule pipeline on kubeflow to run based upon a cron job

Arguments:
@@ -474,7 +491,6 @@ def schedule_pipeline(self, experiment_id, job_name, pipeline_package_path=None,
pipeline_package_path {string} -- The path to the pipeline package (default: {None})
params {dict} -- The pipeline parameters (default: {{}})
pipeline_id {string} -- The id of the pipeline which should run on schedule (default: {None})
namespace {string} -- The name space with which the pipeline should run (default: {None})
max_concurrency {int} -- Max number of concurrent runs scheduled (default: {10})
no_catchup {boolean} -- Whether the recurring run should catch up if behind schedule.
For example, if the recurring run is paused for a while and re-enabled
@@ -492,20 +508,12 @@ def schedule_pipeline(self, experiment_id, job_name, pipeline_package_path=None,
api_params = [kfp_server_api.ApiParameter(
name=sanitize_k8s_name(name=k, allow_capital_underscore=True),
value=str(v)) for k,v in params.items()]
resource_references = []

key = kfp_server_api.models.ApiResourceKey(id=experiment_id,
type=kfp_server_api.models.ApiResourceType.EXPERIMENT)
reference = kfp_server_api.models.ApiResourceReference(key=key,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)
if namespace is not None:
key = kfp_server_api.models.ApiResourceKey(id=namespace,
type=kfp_server_api.models.ApiResourceType.NAMESPACE)
reference = kfp_server_api.models.ApiResourceReference(key=key,
name=namespace,
relationship=kfp_server_api.models.ApiRelationship.OWNER)
resource_references.append(reference)

spec = kfp_server_api.models.ApiPipelineSpec(
pipeline_id=pipeline_id,
workflow_manifest=pipeline_json_string,
@@ -518,7 +526,7 @@ def schedule_pipeline(self, experiment_id, job_name, pipeline_package_path=None,
name=job_name,
description=description,
pipeline_spec=spec,
resource_references=resource_references,
resource_references=[reference],
max_concurrency=max_concurrency,
no_catchup=no_catchup,
trigger=trigger,
@@ -528,20 +536,24 @@ def schedule_pipeline(self, experiment_id, job_name, pipeline_package_path=None,
response = self._job_api.create_job(body=schedule_body)
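Scheduling follows the same pattern now that its namespace argument is gone; a hedged sketch, where the cron expression and package path are placeholders:

client.schedule_pipeline(
    experiment_id=experiment.id,  # the experiment already carries the namespace
    job_name='nightly-run',
    pipeline_package_path='pipeline.yaml',
    cron_schedule='0 0 0 * * *',  # placeholder six-field cron expression
    max_concurrency=1)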


def list_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None):
def list_runs(self, page_token='', page_size=10, sort_by='', experiment_id=None, namespace=None):
"""List runs.
Args:
page_token: token for starting of the page.
page_size: size of the page.
sort_by: one of 'field_name', 'field_name des'. For example, 'name des'.
experiment_id: experiment id to filter upon
namespace: kubernetes namespace to filter upon.
For single user deployment, leave it as None;
For multi user, input a namespace where the user is authorized.
Returns:
A response object including a list of experiments and next page token.
"""
namespace = namespace or self.get_user_namespace()
if experiment_id is not None:
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.EXPERIMENT, resource_reference_key_id=experiment_id)
else:
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by)
response = self._run_api.list_runs(page_token=page_token, page_size=page_size, sort_by=sort_by, resource_reference_key_type=kfp_server_api.models.api_resource_type.ApiResourceType.NAMESPACE, resource_reference_key_id=namespace)
Contributor:
When both namespace and experiment id are empty, does the backend handle this?

Member Author:
Backend will error out:

if refKey == nil {
    return nil, util.NewInvalidInputError("ListRuns must filter by resource reference in multi-user mode.")
}
if refKey.Type == common.Namespace {
    namespace := refKey.ID
    if len(namespace) == 0 {
        return nil, util.NewInvalidInputError("Invalid resource references for ListRuns. Namespace is empty.")
    }
    err = isAuthorized(s.resourceManager, ctx, namespace)
    if err != nil {
        return nil, util.Wrap(err, "Failed to authorize with namespace resource reference.")
    }
} else if refKey.Type == common.Experiment || refKey.Type == "ExperimentUUID" {
    // "ExperimentUUID" was introduced for perf optimization. We accept both refKey.Type for backward-compatible reason.
    experimentID := refKey.ID
    if len(experimentID) == 0 {
        return nil, util.NewInvalidInputError("Invalid resource references for run. Experiment ID is empty.")
    }
    err = CanAccessExperiment(s.resourceManager, ctx, experimentID)
    if err != nil {
        return nil, util.Wrap(err, "Failed to authorize with experiment resource reference.")
    }
} else {
    return nil, util.NewInvalidInputError("Invalid resource references for ListRuns. Got %+v", request.ResourceReferenceKey)
}

Contributor:
I meant when not in multi-user mode.

Member Author:
In single-user mode, it will return all runs. There will be no join with the resource_references table. We haven't changed any of the single-user mode behavior:

if filterContext.ReferenceKey != nil {

return response
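Putting the two filters together, a small sketch with placeholder values:

# Filter by experiment id (works in both deployment modes).
response = client.list_runs(experiment_id=experiment.id, page_size=20)
# Or, in multi-user mode, filter by namespace (explicit, or taken from the context file).
response = client.list_runs(namespace='alice', page_size=20)
for r in response.runs or []:
    print(r.id, r.name)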

def get_run(self, run_id):