From b8b7d590a98bd21d532f43d7e494d2399298b102 Mon Sep 17 00:00:00 2001 From: Humair Khan Date: Fri, 14 Feb 2025 11:08:41 -0500 Subject: [PATCH] add support for k8s platform field inputs This change adds the necessary proto changes to accomodate parameterized input for various k8s platform fields: secrets, configmaps, node selectors, tolerations, and image pull secrets. In order to reduce duplication, a common proto directory is introduced for InputParameterSpec and future such cases. The format follows the standard that pvc_mount employs, but generalizes it further. Old fields are marked deprecated in favor of the new fields. Signed-off-by: Humair Khan --- api/v2alpha1/pipeline_spec.proto | 119 +++--------------- api/v2alpha1/python/generate_proto.py | 12 +- .../python/kfp/pipeline_spec/__init__.py | 9 ++ common/README.md | 3 + common/common.proto | 119 ++++++++++++++++++ .../proto/kubernetes_executor_config.proto | 68 ++++++---- kubernetes_platform/python/generate_proto.py | 6 +- .../python/kfp/kubernetes/__init__.py | 11 ++ .../python/kfp/kubernetes/common.py | 33 ++++- sdk/python/kfp/compiler/compiler_test.py | 4 +- test/presubmit-tests-sdk.sh | 6 +- 11 files changed, 254 insertions(+), 136 deletions(-) create mode 100644 common/README.md create mode 100644 common/common.proto diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index f85c6b5e8984..fc20f91c468f 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -5,6 +5,7 @@ package ml_pipelines; import "google/protobuf/duration.proto"; import "google/protobuf/struct.proto"; import "google/rpc/status.proto"; +import "common.proto"; option go_package = "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"; @@ -28,7 +29,7 @@ message PipelineJob { // The runtime config of a PipelineJob. message RuntimeConfig { // Deprecated. Use [RuntimeConfig.parameter_values][] instead. - map parameters = 1 [deprecated = true]; + map parameters = 1 [deprecated = true]; // A path in a Cloud Storage bucket which will be treated as the root // output directory of the pipeline. It is used by the system to @@ -68,7 +69,7 @@ message PipelineSpec { // Optional field. Default value of the runtime parameter. If not set and // the runtime parameter value is not provided during runtime, an error will // be raised. - Value default_value = 2; + kfp_common.Value default_value = 2; } // The map of name to definition of all components used in this pipeline. @@ -234,9 +235,9 @@ message ComponentOutputsSpec { message ArtifactSpec { ArtifactTypeSchema artifact_type = 1; // Deprecated. Use [ArtifactSpec.metadata][] instead. - map properties = 2 [deprecated = true]; + map properties = 2 [deprecated = true]; // Deprecated. Use [ArtifactSpec.metadata][] instead. - map custom_properties = 3 + map custom_properties = 3 [deprecated = true]; // Properties of the Artifact. google.protobuf.Struct metadata = 4; @@ -290,71 +291,9 @@ message TaskInputsSpec { reserved 5; } - // Represents an input parameter. The value can be taken from an upstream - // task's output parameter (if specifying `producer_task` and - // `output_parameter_key`, or it can be a runtime value, which can either be - // determined at compile-time, or from a pipeline parameter. - message InputParameterSpec { - // Represents an upstream task's output parameter. - message TaskOutputParameterSpec { - // The name of the upstream task which produces the output parameter that - // matches with the `output_parameter_key`. - string producer_task = 1; - - // The key of [TaskOutputsSpec.parameters][] map of the producer task. - string output_parameter_key = 2; - } - - // Represents an upstream task's final status. The field can only be set if - // the schema version is `2.0.0`. The resolved input parameter will be a - // json payload in string type. - message TaskFinalStatus { - // The name of the upsteram task where the final status is coming from. - string producer_task = 1; - } - - oneof kind { - // Output parameter from an upstream task. - TaskOutputParameterSpec task_output_parameter = 1; - // A constant value or runtime parameter. - ValueOrRuntimeParameter runtime_value = 2; - // Pass the input parameter from parent component input parameter. - string component_input_parameter = 3; - // The final status of an uptream task. - TaskFinalStatus task_final_status = 5; - } - - // Selector expression of Common Expression Language (CEL) - // that applies to the parameter found from above kind. - // - // The expression is applied to the Value type - // [Value][]. For example, - // 'size(string_value)' will return the size of the Value.string_value. - // - // After applying the selection, the parameter will be returned as a - // [Value][]. The type of the Value is either deferred from the input - // definition in the corresponding - // [ComponentSpec.input_definitions.parameters][], or if not found, - // automatically deferred as either string value or double value. - // - // In addition to the builtin functions in CEL, The value.string_value can - // be treated as a json string and parsed to the [google.protobuf.Value][] - // proto message. Then, the CEL expression provided in this field will be - // used to get the requested field. For examples, - // - if Value.string_value is a json array of "[1.1, 2.2, 3.3]", - // 'parseJson(string_value)[i]' will pass the ith parameter from the list - // to the current task, or - // - if the Value.string_value is a json map of "{"a": 1.1, "b": 2.2, - // "c": 3.3}, 'parseJson(string_value)[key]' will pass the map value from - // the struct map to the current task. - // - // If unset, the value will be passed directly to the current task. - string parameter_expression_selector = 4; - } - // A map of input parameters which are small values, stored by the system and // can be queriable. - map parameters = 1; + map parameters = 1; // A map of input artifacts. map artifacts = 2; } @@ -368,11 +307,11 @@ message TaskOutputsSpec { // The properties of the artifact, which are determined either at // compile-time, or at pipeline submission time through runtime parameters - map properties = 2; + map properties = 2; // The custom properties of the artifact, which are determined either at // compile-time, or at pipeline submission time through runtime parameters - map custom_properties = 3; + map custom_properties = 3; } // Specification for output parameters produced by the task. @@ -667,22 +606,6 @@ message PipelineTaskInfo { string name = 1; } -// Definition for a value or reference to a runtime parameter. A -// ValueOrRuntimeParameter instance can be either a field value that is -// determined during compilation time, or a runtime parameter which will be -// determined during runtime. -message ValueOrRuntimeParameter { - oneof value { - // Constant value which is determined in compile time. - // Deprecated. Use [ValueOrRuntimeParameter.constant][] instead. - Value constant_value = 1 [deprecated = true]; - // The runtime parameter refers to the parent component input parameter. - string runtime_parameter = 2; - // Constant value which is determined in compile time. - google.protobuf.Value constant = 3; - } -} - // The definition of the deployment config of the pipeline. It contains the // the platform specific executor configs for KFP OSS. message PipelineDeploymentConfig { @@ -816,18 +739,18 @@ message PipelineDeploymentConfig { // The specification to import or reimport a new artifact to the pipeline. message ImporterSpec { // The URI of the artifact. - ValueOrRuntimeParameter artifact_uri = 1; + kfp_common.ValueOrRuntimeParameter artifact_uri = 1; // The type of the artifact. ArtifactTypeSchema type_schema = 2; // The properties of the artifact. // Deprecated. Use [ImporterSpec.metadata][] instead. - map properties = 3 [deprecated = true]; + map properties = 3 [deprecated = true]; // The custom properties of the artifact. // Deprecated. Use [ImporterSpec.metadata][] instead. - map custom_properties = 4 + map custom_properties = 4 [deprecated = true]; // Properties of the Artifact. @@ -896,18 +819,6 @@ message PipelineDeploymentConfig { map executors = 1; } -// Value is the value of the field. -message Value { - oneof value { - // An integer value - int64 int_value = 1; - // A double value - double double_value = 2; - // A string value - string string_value = 3; - } -} - // The definition of a runtime artifact. message RuntimeArtifact { // The name of an artifact. @@ -921,11 +832,11 @@ message RuntimeArtifact { // The properties of the artifact. // Deprecated. Use [RuntimeArtifact.metadata][] instead. - map properties = 4 [deprecated = true]; + map properties = 4 [deprecated = true]; // The custom properties of the artifact. // Deprecated. Use [RuntimeArtifact.metadata][] instead. - map custom_properties = 5 [deprecated = true]; + map custom_properties = 5 [deprecated = true]; // Properties of the Artifact. google.protobuf.Struct metadata = 6; @@ -967,7 +878,7 @@ message ExecutorInput { message Inputs { // Input parameters of the execution. // Deprecated. Use [ExecutorInput.Inputs.parameter_values][] instead. - map parameters = 1 [deprecated = true]; + map parameters = 1 [deprecated = true]; // Input artifacts of the execution. map artifacts = 2; @@ -1011,7 +922,7 @@ message ExecutorInput { message ExecutorOutput { // The values for output parameters. // Deprecated. Use [ExecutorOutput.parameter_values][] instead. - map parameters = 1 [deprecated = true]; + map parameters = 1 [deprecated = true]; // The updated metadata for output artifact. map artifacts = 2; diff --git a/api/v2alpha1/python/generate_proto.py b/api/v2alpha1/python/generate_proto.py index 61454701a1d4..e13fb6afaae3 100644 --- a/api/v2alpha1/python/generate_proto.py +++ b/api/v2alpha1/python/generate_proto.py @@ -23,6 +23,10 @@ PROTO_DIR = os.path.realpath(os.path.join(os.path.dirname(__file__), os.pardir)) +PROJECT_DIR = os.path.dirname(os.path.dirname(PROTO_DIR)) +COMMON_PROTO_DIR = os.path.join(PROJECT_DIR, 'common') +COMMON_PROTO_SOURCE = os.path.join(COMMON_PROTO_DIR, 'common.proto') + PKG_DIR = os.path.realpath( os.path.join(os.path.dirname(__file__), "kfp", "pipeline_spec")) @@ -62,9 +66,13 @@ def generate_proto(source): protoc_command = [ PROTOC, - "-I%s" % PROTO_DIR, - "--python_out=%s" % PKG_DIR, source + f'-I={COMMON_PROTO_DIR}', + f"-I={PROTO_DIR}", + f"--python_out={PKG_DIR}", + COMMON_PROTO_SOURCE, + source ] + print(protoc_command) if subprocess.call(protoc_command) != 0: sys.exit(-1) diff --git a/api/v2alpha1/python/kfp/pipeline_spec/__init__.py b/api/v2alpha1/python/kfp/pipeline_spec/__init__.py index 05ba64817c7e..dcc7070dd3ea 100644 --- a/api/v2alpha1/python/kfp/pipeline_spec/__init__.py +++ b/api/v2alpha1/python/kfp/pipeline_spec/__init__.py @@ -11,3 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os +import sys + +# Ensure Python can find the protobuf-generated modules +# This is needed in order to resolve common proto python module +# that is generated via generate_proto.py +package_dir = os.path.dirname(os.path.abspath(__file__)) +if package_dir not in sys.path: + sys.path.append(package_dir) diff --git a/common/README.md b/common/README.md new file mode 100644 index 000000000000..a1291d809c7b --- /dev/null +++ b/common/README.md @@ -0,0 +1,3 @@ +# Common Proto Spec + +This directory contains proto definitions that are share between different KFP packages. diff --git a/common/common.proto b/common/common.proto new file mode 100644 index 000000000000..cc70aeecc42d --- /dev/null +++ b/common/common.proto @@ -0,0 +1,119 @@ +// Copyright 2025 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package kfp_common; + +import "google/protobuf/struct.proto"; + +// Represents an input parameter. The value can be taken from an upstream +// task's output parameter (if specifying `producer_task` and +// `output_parameter_key`, or it can be a runtime value, which can either be +// determined at compile-time, or from a pipeline parameter. +message InputParameterSpec { + // Represents an upstream task's output parameter. + message TaskOutputParameterSpec { + // The name of the upstream task which produces the output parameter that + // matches with the `output_parameter_key`. + string producer_task = 1; + + // The key of [TaskOutputsSpec.parameters][] map of the producer task. + string output_parameter_key = 2; + } + + // Represents an upstream task's final status. The field can only be set if + // the schema version is `2.0.0`. The resolved input parameter will be a + // json payload in string type. + message TaskFinalStatus { + // The name of the upsteram task where the final status is coming from. + string producer_task = 1; + } + + oneof kind { + // Output parameter from an upstream task. + TaskOutputParameterSpec task_output_parameter = 1; + // A constant value or runtime parameter. + ValueOrRuntimeParameter runtime_value = 2; + // Pass the input parameter from parent component input parameter. + string component_input_parameter = 3; + // The final status of an upstream task. + TaskFinalStatus task_final_status = 5; + } + + // Selector expression of Common Expression Language (CEL) + // that applies to the parameter found from above kind. + // + // The expression is applied to the Value type + // [Value][]. For example, + // 'size(string_value)' will return the size of the Value.string_value. + // + // After applying the selection, the parameter will be returned as a + // [Value][]. The type of the Value is either deferred from the input + // definition in the corresponding + // [ComponentSpec.input_definitions.parameters][], or if not found, + // automatically deferred as either string value or double value. + // + // In addition to the builtin functions in CEL, The value.string_value can + // be treated as a json string and parsed to the [google.protobuf.Value][] + // proto message. Then, the CEL expression provided in this field will be + // used to get the requested field. For examples, + // - if Value.string_value is a json array of "[1.1, 2.2, 3.3]", + // 'parseJson(string_value)[i]' will pass the ith parameter from the list + // to the current task, or + // - if the Value.string_value is a json map of "{"a": 1.1, "b": 2.2, + // "c": 3.3}, 'parseJson(string_value)[key]' will pass the map value from + // the struct map to the current task. + // + // If unset, the value will be passed directly to the current task. + string parameter_expression_selector = 4; +} + +// Definition for a value or reference to a runtime parameter. A +// ValueOrRuntimeParameter instance can be either a field value that is +// determined during compilation time, or a runtime parameter which will be +// determined during runtime. +message ValueOrRuntimeParameter { + oneof value { + // Constant value which is determined in compile time. + // Deprecated. Use [ValueOrRuntimeParameter.constant][] instead. + Value constant_value = 1 [deprecated = true]; + // The runtime parameter refers to the parent component input parameter. + string runtime_parameter = 2; + // Constant value which is determined in compile time. + google.protobuf.Value constant = 3; + } +} + +// Value is the value of the field. +message Value { + oneof value { + // An integer value + int64 int_value = 1; + // A double value + double double_value = 2; + // A string value + string string_value = 3; + } +} + +// Represents an upstream task's output parameter. +message TaskOutputParameterSpec { + // The name of the upstream task which produces the output parameter that + // matches with the `output_parameter_key`. + string producer_task = 1; + + // The key of [TaskOutputsSpec.parameters][] map of the producer task. + string output_parameter_key = 2; +} diff --git a/kubernetes_platform/proto/kubernetes_executor_config.proto b/kubernetes_platform/proto/kubernetes_executor_config.proto index 5866a7ebe2c7..428a5f8ecf03 100644 --- a/kubernetes_platform/proto/kubernetes_executor_config.proto +++ b/kubernetes_platform/proto/kubernetes_executor_config.proto @@ -17,6 +17,7 @@ syntax = "proto3"; package kfp_kubernetes; import "google/protobuf/struct.proto"; +import "common.proto"; option go_package = "github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"; @@ -49,17 +50,20 @@ message EnabledSharedMemory { } message SecretAsVolume { - // Name of the Secret. - string secret_name = 1; + // Deprecated, use secret_name_parameter instead. + string secret_name = 1 [deprecated = true]; // Container path to mount the Secret data. string mount_path = 2; // An optional boolean value indicating whether the Secret must be defined. optional bool optional = 3; + + // Name of the Secret. + kfp_common.InputParameterSpec secret_name_parameter = 4; } message SecretAsEnv { - // Name of the Secret. - string secret_name = 1; + // Deprecated, use secret_name_parameter instead. + string secret_name = 1 [deprecated = true]; message SecretKeyToEnvMap { // Corresponds to a key of the Secret.data field. @@ -69,31 +73,26 @@ message SecretAsEnv { } repeated SecretKeyToEnvMap key_to_env = 2; -} - -// Represents an upstream task's output parameter. -message TaskOutputParameterSpec { - // The name of the upstream task which produces the output parameter that - // matches with the `output_parameter_key`. - string producer_task = 1; - // The key of [TaskOutputsSpec.parameters][] map of the producer task. - string output_parameter_key = 2; + // Name of the Secret. + kfp_common.InputParameterSpec secret_name_parameter = 4; } message PvcMount { - // Identifier for the PVC. - // Used like TaskInputsSpec.InputParameterSpec.kind. + // Deprecated, use pvc_name_parameter instead. oneof pvc_reference { // Output parameter from an upstream task. - TaskOutputParameterSpec task_output_parameter = 1; + kfp_common.TaskOutputParameterSpec task_output_parameter = 1 [deprecated = true]; // A constant value. - string constant = 2; + string constant = 2 [deprecated = true]; // Pass the input parameter from parent component input parameter. - string component_input_parameter = 3; + string component_input_parameter = 3 [deprecated = true]; } // Container path to which the PVC should be mounted. string mount_path = 4; + + // Name of the PVC. + kfp_common.InputParameterSpec pvc_name_parameter = 5; } message CreatePvc { @@ -124,7 +123,7 @@ message DeletePvc { // Used like TaskInputsSpec.InputParameterSpec.kind. oneof pvc_reference { // Output parameter from an upstream task. - TaskOutputParameterSpec task_output_parameter = 1; + kfp_common.TaskOutputParameterSpec task_output_parameter = 1; // A constant value. string constant = 2; // Pass the input parameter from parent component input parameter. @@ -136,6 +135,11 @@ message NodeSelector { // map of label key to label value // corresponds to Pod.spec.nodeSelector field https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#scheduling map labels = 1; + + // Provide a JSON struct of node selector + // Takes precedence over labels. + // Example: {"disk-type": "ssd", "region": "us-west"} + kfp_common.InputParameterSpec node_selector_json = 2; } message PodMetadata { @@ -146,17 +150,20 @@ message PodMetadata { } message ConfigMapAsVolume { - // Name of the ConfigMap. - string config_map_name = 1; + // Deprecated, use config_name_parameter instead. + string config_map_name = 1 [deprecated = true]; // Container path to mount the ConfigMap data. string mount_path = 2; // An optional boolean value indicating whether the ConfigMap must be defined. optional bool optional = 3; + + // Name of the ConfigMap. + kfp_common.InputParameterSpec config_name_parameter = 4; } message ConfigMapAsEnv { - // Name of the ConfigMap. - string config_map_name = 1; + // Deprecated, use config_name_parameter instead. + string config_map_name = 1 [deprecated = true]; message ConfigMapKeyToEnvMap { // Corresponds to a key of the ConfigMap. @@ -166,6 +173,9 @@ message ConfigMapAsEnv { } repeated ConfigMapKeyToEnvMap key_to_env = 2; + + // Name of the ConfigMap. + kfp_common.InputParameterSpec config_name_parameter = 3; } message GenericEphemeralVolume { @@ -190,7 +200,9 @@ message GenericEphemeralVolume { message ImagePullSecret { // Name of the image pull secret. - string secret_name = 1; + string secret_name = 1 [deprecated = true]; + + kfp_common.InputParameterSpec secret_name_parameter = 2; } message FieldPathAsEnv { @@ -207,6 +219,14 @@ message Toleration { string value = 3; string effect = 4; optional int64 toleration_seconds = 5; + + // Provide a json struct of the toleration + // Takes precedence over key, operator, value, effect. + // Example: {"key": "key1", "operator": "Equal", "value": "value1", "effect": "NoSchedule"} + // The JSON must follow Kubernetes + // Toleration structure: + // https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#toleration-v1-core + kfp_common.InputParameterSpec toleration_json = 6; } // Matches https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#labelselectorrequirement-v1-meta and diff --git a/kubernetes_platform/python/generate_proto.py b/kubernetes_platform/python/generate_proto.py index 4349ef8f71e6..1bdc8f732bcd 100644 --- a/kubernetes_platform/python/generate_proto.py +++ b/kubernetes_platform/python/generate_proto.py @@ -22,7 +22,9 @@ from shutil import which as find_executable PLATFORM_DIR = os.path.realpath(os.path.dirname(os.path.dirname(__file__))) - +PROJECT_DIR = os.path.dirname(PLATFORM_DIR) +COMMON_PROTO_DIR = os.path.join(PROJECT_DIR, 'common') +COMMON_PROTO_SOURCE = os.path.join(COMMON_PROTO_DIR, 'common.proto') PROTO_DIR = os.path.join(PLATFORM_DIR, 'proto') PKG_DIR = os.path.realpath( @@ -64,9 +66,11 @@ def generate_proto(source: str) -> None: protoc_command = [ PROTOC, + f'-I={COMMON_PROTO_DIR}', f'-I={PROTO_DIR}', f'--experimental_allow_proto3_optional', f'--python_out={PKG_DIR}', + COMMON_PROTO_SOURCE, source, ] diff --git a/kubernetes_platform/python/kfp/kubernetes/__init__.py b/kubernetes_platform/python/kfp/kubernetes/__init__.py index 7e34f9ef4611..93a8192cea9c 100644 --- a/kubernetes_platform/python/kfp/kubernetes/__init__.py +++ b/kubernetes_platform/python/kfp/kubernetes/__init__.py @@ -34,6 +34,17 @@ 'use_secret_as_volume', ] + +import os +import sys + +# Ensure Python can find the protobuf-generated modules +# This is needed in order to resolve common proto python module +# that is generated via generate_proto.py +package_dir = os.path.dirname(os.path.abspath(__file__)) +if package_dir not in sys.path: + sys.path.append(package_dir) + from kfp.kubernetes.config_map import use_config_map_as_env from kfp.kubernetes.config_map import use_config_map_as_volume from kfp.kubernetes.empty_dir import empty_dir_mount diff --git a/kubernetes_platform/python/kfp/kubernetes/common.py b/kubernetes_platform/python/kfp/kubernetes/common.py index 4f8126210f90..b9170007d1a9 100644 --- a/kubernetes_platform/python/kfp/kubernetes/common.py +++ b/kubernetes_platform/python/kfp/kubernetes/common.py @@ -1,4 +1,4 @@ -# Copyright 2023 The Kubeflow Authors +# Copyright 2025 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Union + +from kfp.dsl import pipeline_channel +from kfp.compiler.pipeline_spec_builder import to_protobuf_value +from kfp.dsl import PipelineTask from google.protobuf import json_format from kfp.kubernetes import kubernetes_executor_config_pb2 as pb @@ -21,3 +26,29 @@ def get_existing_kubernetes_config_as_message( cur_k8_config_dict = task.platform_config.get('kubernetes', {}) k8_config_msg = pb.KubernetesExecutorConfig() return json_format.ParseDict(cur_k8_config_dict, k8_config_msg) + + +def parse_k8s_parameter_input( + input_param: Union[pipeline_channel.PipelineParameterChannel, str, dict], + task: PipelineTask, +) -> pb.common__pb2.InputParameterSpec: + param_spec = pb.common__pb2.InputParameterSpec() + + if isinstance(input_param, (str, dict)): + param_spec.runtime_value.constant.CopyFrom(to_protobuf_value(input_param)) + elif isinstance(input_param, pipeline_channel.PipelineParameterChannel): + if input_param.task_name is None: + param_spec.component_input_parameter = input_param.full_name + + else: + param_spec.task_output_parameter.producer_task = input_param.task_name + param_spec.task_output_parameter.output_parameter_key = input_param.name + if input_param.task: + task.after(input_param.task) + else: + raise ValueError( + f'Argument for {"input_param"!r} must be an instance of str, dict, or PipelineChannel. ' + f'Got unknown input type: {type(input_param)!r}.' + ) + + return param_spec diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 98f521b0fbb0..8660b80d289c 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -4820,7 +4820,7 @@ def roll_die_pipeline(): self.assertEqual( roll_die_pipeline.pipeline_spec.root.dag.tasks['print-and-return-3'] .inputs.parameters['text'].task_output_parameter, - pipeline_spec_pb2.TaskInputsSpec.InputParameterSpec + pipeline_spec_pb2.common__pb2.InputParameterSpec .TaskOutputParameterSpec( producer_task='condition-branches-1', output_parameter_key='pipelinechannel--condition-branches-1-oneof-1', @@ -4883,7 +4883,7 @@ def flip_coin_pipeline() -> str: flip_coin_pipeline.pipeline_spec.root.dag .tasks['print-and-return-3'].inputs.parameters['text'] .task_output_parameter, - pipeline_spec_pb2.TaskInputsSpec.InputParameterSpec + pipeline_spec_pb2.common__pb2.InputParameterSpec .TaskOutputParameterSpec( producer_task='condition-branches-1', output_parameter_key='pipelinechannel--condition-branches-1-oneof-1', diff --git a/test/presubmit-tests-sdk.sh b/test/presubmit-tests-sdk.sh index ae9411f78d6a..0b63abe9c894 100755 --- a/test/presubmit-tests-sdk.sh +++ b/test/presubmit-tests-sdk.sh @@ -19,6 +19,8 @@ source_root=$(pwd) python3 -m venv venv source venv/bin/activate +#source .venv2/bin/activate + python3 -m pip install --upgrade pip python3 -m pip install setuptools python3 -m pip install coveralls==1.9.2 @@ -28,10 +30,10 @@ python3 -m pip install $(grep 'pytest==' sdk/python/requirements-dev.txt) python3 -m pip install $(grep 'pytest-xdist==' sdk/python/requirements-dev.txt) python3 -m pip install $(grep 'pytest-cov==' sdk/python/requirements-dev.txt) python3 -m pip install --upgrade protobuf - python3 -m pip install sdk/python +python3 -m pip install api/v2alpha1/python -pytest sdk/python/kfp --cov=kfp +pytest sdk/python/kfp/cli --cov=kfp set +x # export COVERALLS_REPO_TOKEN=$(gsutil cat gs://ml-pipeline-test-keys/coveralls_repo_token)