Skip to content

Commit

Permalink
Pass meta to containerop and pipeline (#905)
Browse files Browse the repository at this point in the history
pass metadata from python conf to containerop and the pipeline
  • Loading branch information
gaoning777 authored Mar 6, 2019
1 parent ba64bd3 commit 974d602
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 67 deletions.
2 changes: 1 addition & 1 deletion sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def _compile(self, pipeline_func):
if pipeline_func not in registered_pipeline_functions:
raise ValueError('Please use a function with @dsl.pipeline decorator.')

pipeline_name, _ = dsl.Pipeline.get_pipeline_functions()[pipeline_func]
pipeline_name = dsl.Pipeline.get_pipeline_functions()[pipeline_func].name
pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_name)

# Create the arg list with no default values and call pipeline function.
Expand Down
17 changes: 15 additions & 2 deletions sdk/python/kfp/components/_dsl_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing import Mapping
from ._structures import ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
from ._components import _generate_output_file_name, _default_component_name

from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta, _annotation_to_typemeta

def create_container_op_from_task(task_spec: TaskSpec):
argument_values = task_spec.arguments
Expand Down Expand Up @@ -125,12 +125,13 @@ def expand_argument_list(argument_list):
arguments=expanded_args,
output_paths=output_paths,
env=container_spec.env,
component_spec=component_spec,
)


_dummy_pipeline=None

def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None, env : Mapping[str, str]=None):
def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None, env : Mapping[str, str]=None, component_spec=None):
from .. import dsl
global _dummy_pipeline
need_dummy = dsl.Pipeline._default_pipeline is None
Expand All @@ -150,6 +151,16 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma

output_paths_for_container_op = {output_name_to_kubernetes[name]: path for name, path in output_paths.items()}

# Construct the ComponentMeta
component_meta = ComponentMeta(name=component_spec.name, description=component_spec.description)
# Inputs
if component_spec.inputs is not None:
for input in component_spec.inputs:
component_meta.inputs.append(ParameterMeta(name=input.name, description=input.description, param_type=_annotation_to_typemeta(input.type), default=input.default))
if component_spec.outputs is not None:
for output in component_spec.outputs:
component_meta.outputs.append(ParameterMeta(name=output.name, description=output.description, param_type=_annotation_to_typemeta(output.type)))

task = dsl.ContainerOp(
name=name,
image=container_image,
Expand All @@ -162,6 +173,8 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
for name, value in env.items():
task.add_env_variable(k8s_client.V1EnvVar(name=name, value=value))

task._set_metadata(component_meta)

if need_dummy:
_dummy_pipeline.__exit__()

Expand Down
9 changes: 5 additions & 4 deletions sdk/python/kfp/dsl/_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def component(func):
def foobar(model: TFModel(), step: MLStep()):
return dsl.ContainerOp()
"""
from functools import wraps
@wraps(func)
def _component(*args, **kargs):
import inspect
fullargspec = inspect.getfullargspec(func)
Expand All @@ -76,7 +78,7 @@ def _component(*args, **kargs):
# Inputs
for arg in fullargspec.args:
arg_type = TypeMeta()
arg_default = arg_defaults[arg] if arg in arg_defaults else ''
arg_default = arg_defaults[arg] if arg in arg_defaults else None
if arg in annotations:
arg_type = _annotation_to_typemeta(annotations[arg])
component_meta.inputs.append(ParameterMeta(name=arg, description='', param_type=arg_type, default=arg_default))
Expand All @@ -90,9 +92,8 @@ def _component(*args, **kargs):
# https://github.com/rr-/docstring_parser
# https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py

print(component_meta.serialize())
#TODO: parse the metadata to the ContainerOp.
container_op = func(*args, **kargs)
container_op._set_metadata(component_meta)
return container_op

return _component
return _component
11 changes: 11 additions & 0 deletions sdk/python/kfp/dsl/_container_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from . import _pipeline
from . import _pipeline_param
from ._pipeline_param import _extract_pipelineparams
from ._metadata import ComponentMeta
import re
from typing import Dict

Expand Down Expand Up @@ -63,6 +64,7 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
self.pod_annotations = {}
self.pod_labels = {}
self.num_retries = 0
self._metadata = None

self.argument_inputs = _extract_pipelineparams([str(arg) for arg in (command or []) + (arguments or [])])

Expand Down Expand Up @@ -295,3 +297,12 @@ def set_retry(self, num_retries: int):

def __repr__(self):
return str({self.__class__.__name__: self.__dict__})

def _set_metadata(self, metadata):
    '''Attach component metadata to this ContainerOp.

    Args:
      metadata (ComponentMeta): component metadata (name, description,
        input/output parameter specs) produced by the @component decorator
        or the component bridge.

    Raises:
      ValueError: if metadata is not a ComponentMeta instance.
    '''
    if not isinstance(metadata, ComponentMeta):
        # Fixed typo in the error message: '_set_medata' -> '_set_metadata'.
        raise ValueError('_set_metadata is expecting ComponentMeta.')
    self._metadata = metadata
36 changes: 21 additions & 15 deletions sdk/python/kfp/dsl/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from typing import Dict, List
from abc import ABCMeta, abstractmethod
from ._types import BaseType, _check_valid_type_dict, _str_to_dict, _instance_to_dict
from ._types import BaseType, _check_valid_type_dict, _instance_to_dict

class BaseMeta(object):
__metaclass__ = ABCMeta
Expand All @@ -39,23 +39,29 @@ def __init__(self,
self.name = name
self.properties = {} if properties is None else properties

def to_dict(self):
return {self.name: self.properties}
def to_dict_or_str(self):
if self.properties is None or len(self.properties) == 0:
return self.name
else:
return {self.name: self.properties}

@staticmethod
def from_dict(json_dict):
if not _check_valid_type_dict(json_dict):
raise ValueError(json_dict + ' is not a valid type string')
def from_dict_or_str(json):
type_meta = TypeMeta()
type_meta.name, type_meta.properties = list(json_dict.items())[0]
if isinstance(json, Dict):
if not _check_valid_type_dict(json):
raise ValueError(json + ' is not a valid type string')
type_meta.name, type_meta.properties = list(json.items())[0]
elif isinstance(json, str):
type_meta.name = json
return type_meta

class ParameterMeta(BaseMeta):
def __init__(self,
name: str = '',
name: str,
description: str = '',
param_type: TypeMeta = None,
default = ''):
default = None):
self.name = name
self.description = description
self.param_type = TypeMeta() if param_type is None else param_type
Expand All @@ -64,13 +70,13 @@ def __init__(self,
def to_dict(self):
return {'name': self.name,
'description': self.description,
'type': self.param_type.to_dict(),
'type': self.param_type.to_dict_or_str(),
'default': self.default}

class ComponentMeta(BaseMeta):
def __init__(
self,
name: str = '',
name: str,
description: str = '',
inputs: List[ParameterMeta] = None,
outputs: List[ParameterMeta] = None
Expand All @@ -92,7 +98,7 @@ def to_dict(self):
class PipelineMeta(BaseMeta):
def __init__(
self,
name: str = '',
name: str,
description: str = '',
inputs: List[ParameterMeta] = None
):
Expand All @@ -114,13 +120,13 @@ def _annotation_to_typemeta(annotation):
TypeMeta
'''
if isinstance(annotation, BaseType):
arg_type = TypeMeta.from_dict(_instance_to_dict(annotation))
arg_type = TypeMeta.from_dict_or_str(_instance_to_dict(annotation))
elif isinstance(annotation, str):
arg_type = TypeMeta.from_dict(_str_to_dict(annotation))
arg_type = TypeMeta.from_dict_or_str(annotation)
elif isinstance(annotation, dict):
if not _check_valid_type_dict(annotation):
raise ValueError('Annotation ' + str(annotation) + ' is not a valid type dictionary.')
arg_type = TypeMeta.from_dict(annotation)
arg_type = TypeMeta.from_dict_or_str(annotation)
else:
return TypeMeta()
return arg_type
29 changes: 22 additions & 7 deletions sdk/python/kfp/dsl/_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,27 @@ def _pipeline(func):
args = fullargspec.args
annotations = fullargspec.annotations

# defaults
arg_defaults = {}
if fullargspec.defaults:
for arg, default in zip(reversed(fullargspec.args), reversed(fullargspec.defaults)):
arg_defaults[arg] = default

# Construct the PipelineMeta
pipeline_meta = PipelineMeta(name=func.__name__, description='')
pipeline_meta = PipelineMeta(name=name, description=description)
# Inputs
for arg in args:
arg_type = TypeMeta()
arg_default = arg_defaults[arg] if arg in arg_defaults else None
if arg in annotations:
arg_type = _annotation_to_typemeta(annotations[arg])
pipeline_meta.inputs.append(ParameterMeta(name=arg, description='', param_type=arg_type))
pipeline_meta.inputs.append(ParameterMeta(name=arg, description='', param_type=arg_type, default=arg_default))

#TODO: add descriptions to the metadata
#docstring parser:
# https://github.com/rr-/docstring_parser
# https://github.com/terrencepreilly/darglint/blob/master/darglint/parse.py
#TODO: parse the metadata to the Pipeline.

Pipeline.add_pipeline(name, description, func)
Pipeline.add_pipeline(pipeline_meta, func)
return func

return _pipeline
Expand Down Expand Up @@ -114,9 +119,9 @@ def get_pipeline_functions():
return Pipeline._pipeline_functions

@staticmethod
def add_pipeline(name, description, func):
def add_pipeline(pipeline_meta, func):
"""Add a pipeline function (decorated with @pipeline)."""
Pipeline._pipeline_functions[func] = (name, description)
Pipeline._pipeline_functions[func] = pipeline_meta

def __init__(self, name: str):
"""Create a new instance of Pipeline.
Expand All @@ -130,6 +135,7 @@ def __init__(self, name: str):
self.groups = [_ops_group.OpsGroup('pipeline', name=name)]
self.group_id = 0
self.conf = PipelineConf()
self._metadata = None

def __enter__(self):
if Pipeline._default_pipeline:
Expand Down Expand Up @@ -184,4 +190,13 @@ def get_next_group_id(self):
self.group_id += 1
return self.group_id

def _set_metadata(self, metadata):
    '''Attach pipeline metadata to this Pipeline.

    Note: the docstring previously described ContainerOp/ComponentMeta
    (copy-paste); this method validates and stores PipelineMeta.

    Args:
      metadata (PipelineMeta): pipeline metadata (name, description,
        input parameter specs) produced by the @pipeline decorator.

    Raises:
      ValueError: if metadata is not a PipelineMeta instance.
    '''
    if not isinstance(metadata, PipelineMeta):
        # Fixed typo in the error message: '_set_medata' -> '_set_metadata'.
        raise ValueError('_set_metadata is expecting PipelineMeta.')
    self._metadata = metadata


13 changes: 3 additions & 10 deletions sdk/python/kfp/dsl/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ def check_types(checked_type, expected_type):
if isinstance(checked_type, BaseType):
checked_type = _instance_to_dict(checked_type)
elif isinstance(checked_type, str):
checked_type = _str_to_dict(checked_type)
checked_type = {checked_type: {}}
if isinstance(expected_type, BaseType):
expected_type = _instance_to_dict(expected_type)
elif isinstance(expected_type, str):
expected_type = _str_to_dict(expected_type)
expected_type = {expected_type: {}}
return _check_dict_types(checked_type, expected_type)

def _check_valid_type_dict(payload):
Expand Down Expand Up @@ -136,13 +136,6 @@ def _instance_to_dict(instance):
'''
return {type(instance).__name__: instance.__dict__}

def _str_to_dict(payload):
import json
json_dict = json.loads(payload)
if not _check_valid_type_dict(json_dict):
raise ValueError(payload + ' is not a valid type string')
return json_dict

def _check_dict_types(checked_type, expected_type):
'''_check_dict_types checks the type consistency.
Args:
Expand All @@ -163,4 +156,4 @@ def _check_dict_types(checked_type, expected_type):
str(checked_type[type_name][type_property]) + ' and ' +
str(expected_type[type_name][type_property]))
return False
return True
return True
25 changes: 20 additions & 5 deletions sdk/python/tests/dsl/component_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,30 @@


from kfp.dsl._component import component
from kfp.dsl._metadata import ComponentMeta, ParameterMeta, TypeMeta
from kfp.dsl._types import GCSPath, Integer
import unittest

@component
def componentA(a: {'Schema': {'file_type': 'csv'}}, b: '{"number": {"step": "large"}}' = 12, c: GCSPath(path_type='file', file_type='tsv') = 'gs://hello/world') -> {'model': Integer()}:
return 7
import mock

class TestPythonComponent(unittest.TestCase):

def test_component(self):
"""Test component decorator."""
componentA(1,2,3)

class MockContainerOp:
def _set_metadata(self, component_meta):
self._metadata = component_meta

@component
def componentA(a: {'Schema': {'file_type': 'csv'}}, b: {'number': {'step': 'large'}} = 12, c: GCSPath(path_type='file', file_type='tsv') = 'gs://hello/world') -> {'model': Integer()}:
return MockContainerOp()

containerOp = componentA(1,2,3)

golden_meta = ComponentMeta(name='componentA', description='')
golden_meta.inputs.append(ParameterMeta(name='a', description='', param_type=TypeMeta(name='Schema', properties={'file_type': 'csv'})))
golden_meta.inputs.append(ParameterMeta(name='b', description='', param_type=TypeMeta(name='number', properties={'step': 'large'}), default=12))
golden_meta.inputs.append(ParameterMeta(name='c', description='', param_type=TypeMeta(name='GCSPath', properties={'path_type':'file', 'file_type': 'tsv'}), default='gs://hello/world'))
golden_meta.outputs.append(ParameterMeta(name='model', description='', param_type=TypeMeta(name='Integer')))

self.assertEqual(containerOp._metadata, golden_meta)
Loading

0 comments on commit 974d602

Please sign in to comment.