Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK - Refactoring - Replaced the TypeMeta class #1930

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from ._op_to_template import _op_to_template
from ._default_transformers import add_pod_env

from ..dsl._metadata import TypeMeta, _extract_pipeline_metadata
from ..dsl._metadata import _extract_pipeline_metadata
from ..dsl._ops_group import OpsGroup

class Compiler(object):
Expand Down Expand Up @@ -596,7 +596,7 @@ def _compile(self, pipeline_func):

args_list = []
for arg_name in argspec.args:
arg_type = TypeMeta()
arg_type = None
for input in pipeline_meta.inputs:
if arg_name == input.name:
arg_type = input.param_type
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ def create_task_from_component_and_arguments(pythonic_arguments):
if kfp.TYPE_CHECK:
for input_spec in component_spec.inputs:
if input_spec.name == key:
if arguments[key].param_type is not None and not check_types(arguments[key].param_type.to_dict_or_str(), '' if input_spec.type is None else input_spec.type):
raise InconsistentTypeException('Component "' + name + '" is expecting ' + key + ' to be type(' + str(input_spec.type) + '), but the passed argument is type(' + arguments[key].param_type.serialize() + ')')
if arguments[key].param_type is not None and not check_types(arguments[key].param_type, '' if input_spec.type is None else input_spec.type):
raise InconsistentTypeException('Component "' + name + '" is expecting ' + key + ' to be type(' + str(input_spec.type) + '), but the passed argument is type(' + str(arguments[key].param_type) + ')')
arguments[key] = str(arguments[key])

task = TaskSpec(
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/kfp/components/_dsl_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Mapping
from ._structures import ContainerImplementation, ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
from ._components import _generate_output_file_name, _default_component_name
from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta
from kfp.dsl._metadata import ComponentMeta, ParameterMeta

def create_container_op_from_task(task_spec: TaskSpec):
argument_values = task_spec.arguments
Expand Down Expand Up @@ -143,10 +143,10 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
# Inputs
if component_spec.inputs is not None:
for input in component_spec.inputs:
component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=_annotation_to_typemeta(input.type), default=input.default))
component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=input.type, default=input.default))
if component_spec.outputs is not None:
for output in component_spec.outputs:
component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=_annotation_to_typemeta(output.type)))
component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=output.type))

task = dsl.ContainerOp(
name=name,
Expand Down
12 changes: 6 additions & 6 deletions sdk/python/kfp/dsl/_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,19 @@ def _component(*args, **kargs):
if kfp.TYPE_CHECK:
arg_index = 0
for arg in args:
if isinstance(arg, PipelineParam) and not check_types(arg.param_type.to_dict_or_str(), component_meta.inputs[arg_index].param_type.to_dict_or_str()):
if isinstance(arg, PipelineParam) and not check_types(arg.param_type, component_meta.inputs[arg_index].param_type):
raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + component_meta.inputs[arg_index].name +
' to be type(' + component_meta.inputs[arg_index].param_type.serialize() +
'), but the passed argument is type(' + arg.param_type.serialize() + ')')
' to be type(' + str(component_meta.inputs[arg_index].param_type) +
'), but the passed argument is type(' + str(arg.param_type) + ')')
arg_index += 1
if kargs is not None:
for key in kargs:
if isinstance(kargs[key], PipelineParam):
for input_spec in component_meta.inputs:
if input_spec.name == key and not check_types(kargs[key].param_type.to_dict_or_str(), input_spec.param_type.to_dict_or_str()):
if input_spec.name == key and not check_types(kargs[key].param_type, input_spec.param_type):
raise InconsistentTypeException('Component "' + component_meta.name + '" is expecting ' + input_spec.name +
' to be type(' + input_spec.param_type.serialize() +
'), but the passed argument is type(' + kargs[key].param_type.serialize() + ')')
' to be type(' + str(input_spec.param_type) +
'), but the passed argument is type(' + str(kargs[key].param_type) + ')')

container_op = func(*args, **kargs)
container_op._set_metadata(component_meta)
Expand Down
71 changes: 15 additions & 56 deletions sdk/python/kfp/dsl/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,64 +32,22 @@ def serialize(self):
def __eq__(self, other):
return self.__dict__ == other.__dict__

class TypeMeta(BaseMeta):
  """Describes a parameter type: a name plus an optional properties dict.

  A type serializes to either a bare string (when it has no properties) or a
  single-entry dict mapping the type name to its properties dict.
  """

  def __init__(self,
               name: str = '',
               properties: Dict = None):
    self.name = name
    # Use a fresh dict per instance; a mutable default argument would be
    # shared across all TypeMeta instances.
    self.properties = {} if properties is None else properties

  def to_dict_or_str(self):
    """Return the compact form: the bare name, or {name: properties}."""
    if self.properties is None or len(self.properties) == 0:
      return self.name
    else:
      return {self.name: self.properties}

  @staticmethod
  def from_dict_or_str(payload):
    '''from_dict_or_str accepts a payload object and returns a TypeMeta instance
    Args:
      payload (str/dict): the payload could be a str or a dict

    Raises:
      ValueError: if payload is a malformed type dict, or is neither a dict
        nor a str.
    '''
    type_meta = TypeMeta()
    if isinstance(payload, dict):
      if not _check_valid_type_dict(payload):
        # str(payload) is required: concatenating a dict directly to a str
        # would raise TypeError and mask the intended ValueError.
        raise ValueError(str(payload) + ' is not a valid type string')
      type_meta.name, type_meta.properties = list(payload.items())[0]
      # Convert possible OrderedDict to dict
      type_meta.properties = dict(type_meta.properties)
    elif isinstance(payload, str):
      type_meta.name = payload
    else:
      raise ValueError('from_dict_or_str is expecting either dict or str.')
    return type_meta

  def serialize(self):
    """Serialize to a string (the str() of the compact dict-or-str form)."""
    return str(self.to_dict_or_str())

  @staticmethod
  def deserialize(payload):
    '''deserialize expects two types of input: dict and str
    1) If the payload is a string, the type is named as such with no properties.
    2) If the payload is a dict, the type name and properties are extracted. '''
    return TypeMeta.from_dict_or_str(payload)

class ParameterMeta(BaseMeta):
def __init__(self,
name: str,
description: str = '',
param_type: TypeMeta = None,
param_type = None,
default = None):
self.name = name
self.description = description
self.param_type = TypeMeta() if param_type is None else param_type
self.param_type = param_type
self.default = default

def to_dict(self):
return {'name': self.name,
'description': self.description,
'type': self.param_type.to_dict_or_str(),
'type': self.param_type or '',
'default': self.default}

class ComponentMeta(BaseMeta):
Expand Down Expand Up @@ -132,25 +90,25 @@ def to_dict(self):
}

def _annotation_to_typemeta(annotation):
'''_annotation_to_type_meta converts an annotation to an instance of TypeMeta
'''_annotation_to_type_meta converts an annotation to a type structure
Args:
annotation(BaseType/str/dict): input/output annotations
BaseType: registered in kfp.dsl.types
str: either a string of a dict serialization or a string of the type name
dict: type name and properties. note that the properties values can be dict.
Returns:
TypeMeta
dict or string representing the type
'''
if isinstance(annotation, BaseType):
arg_type = TypeMeta.deserialize(_instance_to_dict(annotation))
arg_type = _instance_to_dict(annotation)
elif isinstance(annotation, str):
arg_type = TypeMeta.deserialize(annotation)
arg_type = annotation
elif isinstance(annotation, dict):
if not _check_valid_type_dict(annotation):
raise ValueError('Annotation ' + str(annotation) + ' is not a valid type dictionary.')
arg_type = TypeMeta.deserialize(annotation)
arg_type = annotation
else:
return TypeMeta()
return None
return arg_type


Expand All @@ -174,7 +132,7 @@ def _extract_component_metadata(func):
# Inputs
inputs = []
for arg in fullargspec.args:
arg_type = TypeMeta()
arg_type = None
arg_default = arg_defaults[arg] if arg in arg_defaults else None
if isinstance(arg_default, PipelineParam):
arg_default = arg_default.value
Expand Down Expand Up @@ -227,19 +185,20 @@ def _extract_pipeline_metadata(func):
)
# Inputs
for arg in args:
arg_type = TypeMeta()
arg_type = None
arg_default = arg_defaults[arg] if arg in arg_defaults else None
if isinstance(arg_default, PipelineParam):
arg_default = arg_default.value
if arg in annotations:
arg_type = _annotation_to_typemeta(annotations[arg])
if 'openapi_schema_validator' in arg_type.properties and arg_default is not None:
arg_type_properties = list(arg_type.values())[0] if isinstance(arg_type, dict) else {}
if 'openapi_schema_validator' in arg_type_properties and arg_default is not None:
from jsonschema import validate
import json
schema_object = arg_type.properties['openapi_schema_validator']
schema_object = arg_type_properties['openapi_schema_validator']
if isinstance(schema_object, str):
# In case the property value for the schema validator is a string instead of a dict.
schema_object = json.loads(arg_type.properties['openapi_schema_validator'])
schema_object = json.loads(schema_object)
validate(instance=arg_default, schema=schema_object)
pipeline_meta.inputs.append(ParameterMeta(name=arg, description='', param_type=arg_type, default=arg_default))

Expand Down
7 changes: 3 additions & 4 deletions sdk/python/kfp/dsl/_pipeline_param.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

import re
from collections import namedtuple
from typing import List
from ._metadata import TypeMeta
from typing import Dict, List, Union


# TODO: Move this to a separate class
Expand Down Expand Up @@ -136,7 +135,7 @@ class PipelineParam(object):
value passed between components.
"""

def __init__(self, name: str, op_name: str=None, value: str=None, param_type: TypeMeta=TypeMeta(), pattern: str=None):
def __init__(self, name: str, op_name: str=None, value: str=None, param_type : Union[str, Dict] = None, pattern: str=None):
"""Create a new instance of PipelineParam.
Args:
name: name of the pipeline parameter.
Expand Down Expand Up @@ -218,6 +217,6 @@ def __hash__(self):

def ignore_type(self):
"""ignore_type ignores the type information such that type checking would also pass"""
self.param_type = TypeMeta()
self.param_type = None
return self

3 changes: 3 additions & 0 deletions sdk/python/kfp/dsl/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ def _check_dict_types(checked_type, expected_type):
checked_type (dict): A dict that describes a type from the upstream component output
expected_type (dict): A dict that describes a type from the downstream component input
'''
if not checked_type or not expected_type:
# If the type is empty, it matches any types
return True
checked_type_name,_ = list(checked_type.items())[0]
expected_type_name,_ = list(expected_type.items())[0]
if checked_type_name == '' or expected_type_name == '':
Expand Down
10 changes: 5 additions & 5 deletions sdk/python/tests/dsl/component_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import kfp
import kfp.dsl as dsl
from kfp.dsl import component, graph_component
from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta
from kfp.dsl._metadata import ComponentMeta, ParameterMeta
from kfp.dsl.types import Integer, GCSPath, InconsistentTypeException
from kfp.dsl import ContainerOp, Pipeline, PipelineParam
import unittest
Expand All @@ -36,10 +36,10 @@ def componentA(a: {'ArtifactA': {'file_type': 'csv'}}, b: Integer() = 12, c: {'A
containerOp = componentA(1,2,c=3)

golden_meta = ComponentMeta(name='componentA', description='')
golden_meta.inputs.append(ParameterMeta(name='a', description='', param_type=TypeMeta(name='ArtifactA', properties={'file_type': 'csv'})))
golden_meta.inputs.append(ParameterMeta(name='b', description='', param_type=TypeMeta(name='Integer', properties={'openapi_schema_validator': {"type": "integer"}}), default=12))
golden_meta.inputs.append(ParameterMeta(name='c', description='', param_type=TypeMeta(name='ArtifactB', properties={'path_type':'file', 'file_type': 'tsv'}), default='gs://hello/world'))
golden_meta.outputs.append(ParameterMeta(name='model', description='', param_type=TypeMeta(name='Integer', properties={'openapi_schema_validator': {"type": "integer"}})))
golden_meta.inputs.append(ParameterMeta(name='a', description='', param_type={'ArtifactA': {'file_type': 'csv'}}))
golden_meta.inputs.append(ParameterMeta(name='b', description='', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}, default=12))
golden_meta.inputs.append(ParameterMeta(name='c', description='', param_type={'ArtifactB': {'path_type':'file', 'file_type': 'tsv'}}, default='gs://hello/world'))
golden_meta.outputs.append(ParameterMeta(name='model', description='', param_type={'Integer': {'openapi_schema_validator': {"type": "integer"}}}))

self.assertEqual(containerOp._metadata, golden_meta)

Expand Down
59 changes: 13 additions & 46 deletions sdk/python/tests/dsl/metadata_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta
from kfp.dsl._metadata import ComponentMeta, ParameterMeta
import unittest

class TestTypeMeta(unittest.TestCase):
def test_deserialize(self):
component_dict = {
'GCSPath': {
'bucket_type': 'directory',
'file_type': 'csv'
}
}
golden_type_meta = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
'file_type': 'csv'})
self.assertEqual(TypeMeta.deserialize(component_dict), golden_type_meta)

component_str = 'GCSPath'
golden_type_meta = TypeMeta(name='GCSPath')
self.assertEqual(TypeMeta.deserialize(component_str), golden_type_meta)


def test_eq(self):
type_a = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
'file_type': 'csv'})
type_b = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
'file_type': 'tsv'})
type_c = TypeMeta(name='GCSPatha', properties={'bucket_type': 'directory',
'file_type': 'csv'})
type_d = TypeMeta(name='GCSPath', properties={'bucket_type': 'directory',
'file_type': 'csv'})
self.assertNotEqual(type_a, type_b)
self.assertNotEqual(type_a, type_c)
self.assertEqual(type_a, type_d)


class TestComponentMeta(unittest.TestCase):

Expand All @@ -53,34 +23,31 @@ def test_to_dict(self):
description='foobar example',
inputs=[ParameterMeta(name='input1',
description='input1 desc',
param_type=TypeMeta(name='GCSPath',
properties={'bucket_type': 'directory',
'file_type': 'csv'
}
),
param_type={'GCSPath': {
'bucket_type': 'directory',
'file_type': 'csv'
}},
default='default1'
),
ParameterMeta(name='input2',
description='input2 desc',
param_type=TypeMeta(name='TFModel',
properties={'input_data': 'tensor',
'version': '1.8.0'
}
),
param_type={'TFModel': {
'input_data': 'tensor',
'version': '1.8.0'
}},
default='default2'
),
ParameterMeta(name='input3',
description='input3 desc',
param_type=TypeMeta(name='Integer'),
param_type='Integer',
default='default3'
),
],
outputs=[ParameterMeta(name='output1',
description='output1 desc',
param_type=TypeMeta(name='Schema',
properties={'file_type': 'tsv'
}
),
param_type={'Schema': {
'file_type': 'tsv'
}},
default='default_output1'
)
]
Expand Down
Loading