diff --git a/sdk/python/kfp/_client.py b/sdk/python/kfp/_client.py
index 8d54b6c966c..4ad9abbc386 100644
--- a/sdk/python/kfp/_client.py
+++ b/sdk/python/kfp/_client.py
@@ -25,6 +25,7 @@
 from datetime import datetime
 from typing import Mapping, Callable
 
+import kfp
 import kfp_server_api
 
 from kfp.compiler import compiler
@@ -318,7 +319,7 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para
       IPython.display.display(IPython.display.HTML(html))
     return response.run
 
-  def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None):
+  def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapping[str, str], run_name=None, experiment_name=None, pipeline_conf: kfp.dsl.PipelineConf = None):
     '''Runs pipeline on KFP-enabled Kubernetes cluster.
     This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
 
@@ -333,7 +334,7 @@ def create_run_from_pipeline_func(self, pipeline_func: Callable, arguments: Mapp
     run_name = run_name or pipeline_name + ' ' + datetime.now().strftime('%Y-%m-%d %H-%M-%S')
     try:
       (_, pipeline_package_path) = tempfile.mkstemp(suffix='.zip')
-      compiler.Compiler().compile(pipeline_func, pipeline_package_path)
+      compiler.Compiler().compile(pipeline_func, pipeline_package_path, pipeline_conf=pipeline_conf)
       return self.create_run_from_pipeline_package(pipeline_package_path, arguments, run_name, experiment_name)
     finally:
       os.remove(pipeline_package_path)
diff --git a/sdk/python/kfp/_runners.py b/sdk/python/kfp/_runners.py
index 0666992a43a..70cc05d2785 100644
--- a/sdk/python/kfp/_runners.py
+++ b/sdk/python/kfp/_runners.py
@@ -20,9 +20,10 @@
 from typing import Mapping, Callable
 
 from . import Client
+from . import dsl
 
 
-def run_pipeline_func_on_cluster(pipeline_func: Callable, arguments: Mapping[str, str], run_name : str = None, experiment_name : str = None, kfp_client : Client = None):
+def run_pipeline_func_on_cluster(pipeline_func: Callable, arguments: Mapping[str, str], run_name : str = None, experiment_name : str = None, kfp_client : Client = None, pipeline_conf: dsl.PipelineConf = None):
   '''Runs pipeline on KFP-enabled Kubernetes cluster.
   This command compiles the pipeline function, creates or gets an experiment and submits the pipeline for execution.
 
@@ -32,6 +33,7 @@ def run_pipeline_func_on_cluster(pipeline_func: Callable, arguments: Mapping[str
     run_name: Optional. Name of the run to be shown in the UI.
     experiment_name: Optional. Name of the experiment to add the run to.
     kfp_client: Optional. An instance of kfp.Client configured for the desired KFP cluster.
+    pipeline_conf: Optional. kfp.dsl.PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options.
   '''
   kfp_client = kfp_client or Client()
-  return kfp_client.create_run_from_pipeline_func(pipeline_func, arguments, run_name, experiment_name)
+  return kfp_client.create_run_from_pipeline_func(pipeline_func, arguments, run_name, experiment_name, pipeline_conf)
diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index e2f56da78a2..036bd33b6b8 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -608,7 +608,7 @@ def _create_volumes(self, pipeline):
     volumes.sort(key=lambda x: x['name'])
     return volumes
 
-  def _create_pipeline_workflow(self, args, pipeline, op_transformers=None):
+  def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeline_conf=None):
     """Create workflow for the pipeline."""
 
     # Input Parameters
@@ -650,17 +650,17 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None):
       }
     }
     # set ttl after workflow finishes
-    if pipeline.conf.ttl_seconds_after_finished >= 0:
-      workflow['spec']['ttlSecondsAfterFinished'] = pipeline.conf.ttl_seconds_after_finished
+    if pipeline_conf.ttl_seconds_after_finished >= 0:
+      workflow['spec']['ttlSecondsAfterFinished'] = pipeline_conf.ttl_seconds_after_finished
 
-    if len(pipeline.conf.image_pull_secrets) > 0:
+    if len(pipeline_conf.image_pull_secrets) > 0:
       image_pull_secrets = []
-      for image_pull_secret in pipeline.conf.image_pull_secrets:
+      for image_pull_secret in pipeline_conf.image_pull_secrets:
         image_pull_secrets.append(K8sHelper.convert_k8s_obj_to_json(image_pull_secret))
       workflow['spec']['imagePullSecrets'] = image_pull_secrets
 
-    if pipeline.conf.timeout:
-      workflow['spec']['activeDeadlineSeconds'] = pipeline.conf.timeout
+    if pipeline_conf.timeout:
+      workflow['spec']['activeDeadlineSeconds'] = pipeline_conf.timeout
 
     if exit_handler:
       workflow['spec']['onExit'] = exit_handler.name
@@ -688,13 +688,13 @@ def _validate_exit_handler_helper(group, exiting_op_names, handler_exists):
 
     return _validate_exit_handler_helper(pipeline.groups[0], [], False)
 
-  def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline):
+  def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline, pipeline_conf=None):
     """Sanitize operator/param names and inject pipeline artifact location."""
     # Sanitize operator names and param names
     sanitized_ops = {}
 
     # pipeline level artifact location
-    artifact_location = pipeline.conf.artifact_location
+    artifact_location = pipeline_conf.artifact_location
 
     for op in pipeline.ops.values():
       # inject pipeline level artifact location into if the op does not have
@@ -732,7 +732,9 @@ def create_workflow(self,
                       pipeline_func: Callable,
                       pipeline_name: Text=None,
                       pipeline_description: Text=None,
-                      params_list: List[dsl.PipelineParam]=None) -> Dict[Text, Any]:
+                      params_list: List[dsl.PipelineParam]=None,
+                      pipeline_conf: dsl.PipelineConf = None,
+                      ) -> Dict[Text, Any]:
     """ Create workflow spec from pipeline function and specified pipeline
     params/metadata. Currently, the pipeline params are either specified in
     the signature of the pipeline function or by passing a list of
@@ -742,6 +744,7 @@ def create_workflow(self,
     :param pipeline_name:
     :param pipeline_description:
     :param params_list: list of pipeline params to append to the pipeline.
+    :param pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
     :return: workflow dict.
     """
     params_list = params_list or []
@@ -778,8 +781,10 @@ def create_workflow(self,
     with dsl.Pipeline(pipeline_name) as dsl_pipeline:
       pipeline_func(*args_list)
 
+    pipeline_conf = pipeline_conf or dsl_pipeline.conf # Configuration passed to the compiler is overriding. Unfortunately, it's not trivial to detect whether the dsl_pipeline.conf was ever modified.
+
     self._validate_exit_handler(dsl_pipeline)
-    self._sanitize_and_inject_artifact(dsl_pipeline)
+    self._sanitize_and_inject_artifact(dsl_pipeline, pipeline_conf)
 
     # Fill in the default values.
     args_list_with_defaults = []
@@ -802,12 +807,14 @@ def create_workflow(self,
             default=param.value) for param in params_list]
 
     op_transformers = [add_pod_env]
-    op_transformers.extend(dsl_pipeline.conf.op_transformers)
+    op_transformers.extend(pipeline_conf.op_transformers)
 
     workflow = self._create_pipeline_workflow(
         args_list_with_defaults,
         dsl_pipeline,
-        op_transformers)
+        op_transformers,
+        pipeline_conf,
+    )
 
     from ._data_passing_rewriter import fix_big_data_passing
     workflow = fix_big_data_passing(workflow)
@@ -817,23 +824,24 @@ def create_workflow(self,
 
     return workflow
 
-  def _compile(self, pipeline_func):
+  def _compile(self, pipeline_func, pipeline_conf: dsl.PipelineConf = None):
     """Compile the given pipeline function into workflow."""
-    return self.create_workflow(pipeline_func=pipeline_func)
+    return self.create_workflow(pipeline_func=pipeline_func, pipeline_conf=pipeline_conf)
 
-  def compile(self, pipeline_func, package_path, type_check=True):
+  def compile(self, pipeline_func, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None):
     """Compile the given pipeline function into workflow yaml.
 
     Args:
      pipeline_func: pipeline functions with @dsl.pipeline decorator.
      package_path: the output workflow tar.gz file path. for example, "~/a.tar.gz"
      type_check: whether to enable the type check or not, default: False.
+     pipeline_conf: PipelineConf instance. Can specify op transforms, image pull secrets and other pipeline-level configuration options. Overrides any configuration that may be set by the pipeline.
     """
     import kfp
     type_check_old_value = kfp.TYPE_CHECK
     try:
       kfp.TYPE_CHECK = type_check
-      workflow = self._compile(pipeline_func)
+      workflow = self._compile(pipeline_func, pipeline_conf)
       yaml.Dumper.ignore_aliases = lambda *args : True
       yaml_text = yaml.dump(workflow, default_flow_style=False)
 
diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py
index fb12902583a..a6c4ea45606 100644
--- a/sdk/python/kfp/dsl/__init__.py
+++ b/sdk/python/kfp/dsl/__init__.py
@@ -14,7 +14,7 @@
 
 
 from ._pipeline_param import PipelineParam, match_serialized_pipelineparam
-from ._pipeline import Pipeline, pipeline, get_pipeline_conf
+from ._pipeline import Pipeline, pipeline, get_pipeline_conf, PipelineConf
 from ._container_op import ContainerOp, InputArgumentPath, UserContainer, Sidecar
 from ._resource_op import ResourceOp
 from ._volume_op import (
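
Usage sketch (not part of the patch): with this change a caller can build a dsl.PipelineConf outside the pipeline function and hand it to the compiler or client, and it overrides whatever configuration the pipeline function itself set on dsl_pipeline.conf. A minimal sketch, assuming a trivial single-op pipeline; the pipeline name, image, command, and output path below are illustrative placeholders, not taken from the patch:

import kfp
from kfp import dsl
from kfp.compiler import compiler


# Illustrative pipeline; name, image, and command are placeholders.
@dsl.pipeline(name='conf-demo', description='Demonstrates passing pipeline_conf at compile time.')
def conf_demo_pipeline():
    dsl.ContainerOp(name='echo', image='alpine:3.10', command=['echo', 'hello'])


# Build the configuration outside the pipeline function.
conf = dsl.PipelineConf()
conf.set_timeout(600)                      # compiled into workflow['spec']['activeDeadlineSeconds']
conf.set_ttl_seconds_after_finished(3600)  # compiled into workflow['spec']['ttlSecondsAfterFinished']

# Compile with the overriding configuration (output path is illustrative).
compiler.Compiler().compile(conf_demo_pipeline, 'conf_demo.zip', pipeline_conf=conf)

# Or submit directly to a cluster; arguments={} because the pipeline takes no parameters.
# kfp.Client().create_run_from_pipeline_func(conf_demo_pipeline, arguments={}, pipeline_conf=conf)

The same conf object can also be passed to run_pipeline_func_on_cluster in _runners.py, which simply forwards it to Client.create_run_from_pipeline_func and from there to the compiler.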