Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK/Components - Reworked the component model structures. #642

6 changes: 3 additions & 3 deletions sdk/python/kfp/compiler/_component_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ def _generate_pythonop(component_func, target_image, target_component_file=None)
The returned value is in fact a function, which should generate a container_op instance. """

from ..components._python_op import _python_function_name_to_component_name
from ..components._structures import InputSpec, OutputSpec, ImplementationSpec, ContainerSpec, ComponentSpec
from ..components._structures import InputSpec, InputValuePlaceholder, OutputPathPlaceholder, OutputSpec, ContainerImplementation, ContainerSpec, ComponentSpec


#Component name and description are derived from the function's name and docstring, but can be overridden by @python_component function decorator
Expand All @@ -428,11 +428,11 @@ def _generate_pythonop(component_func, target_image, target_component_file=None)
description=component_description,
inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Change type to actual type
outputs=[OutputSpec(name=output_name)],
implementation=ImplementationSpec(
implementation=ContainerImplementation(
container=ContainerSpec(
image=target_image,
#command=['python3', program_file], #TODO: Include the command line
args=[{'value': input_name} for input_name in input_names] + [{'output': output_name}],
args=[InputValuePlaceholder(input_name) for input_name in input_names] + [OutputPathPlaceholder(output_name)],
)
)
)
Expand Down
142 changes: 64 additions & 78 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from collections import OrderedDict
from ._yaml_utils import load_yaml
from ._structures import ComponentSpec
from ._structures import *


_default_component_name = 'Component'
Expand Down Expand Up @@ -238,86 +239,71 @@ def expand_command_part(arg): #input values with original names
return None
if isinstance(arg, (str, int, float, bool)):
return str(arg)
elif isinstance(arg, dict):
if len(arg) != 1:
raise ValueError('Failed to parse argument dict: "{}"'.format(arg))
(func_name, func_argument) = list(arg.items())[0]
func_name=func_name.lower()

if func_name == 'value':
assert isinstance(func_argument, str)
port_name = func_argument
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return str(input_value)
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(port_name))

elif func_name == 'file':
assert isinstance(func_argument, str)
port_name = func_argument
input_filename = _generate_input_file_name(port_name)
input_key = input_name_to_kubernetes[port_name]
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return input_filename
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
raise ValueError('No value provided for input {}'.format(port_name))

elif func_name == 'output':
assert isinstance(func_argument, str)
port_name = func_argument
output_filename = _generate_output_file_name(port_name)
output_key = output_name_to_kubernetes[port_name]
if output_key in file_outputs:
if file_outputs[output_key] != output_filename:
raise ValueError('Conflicting output files specified for port {}: {} and {}'.format(port_name, file_outputs[output_key], output_filename))

if isinstance(arg, InputValuePlaceholder):
port_name = arg.input_name
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return str(input_value)
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
file_outputs[output_key] = output_filename

return output_filename

elif func_name == 'concat':
assert isinstance(func_argument, list)
items_to_concatenate = func_argument
expanded_argument_strings = expand_argument_list(items_to_concatenate)
return ''.join(expanded_argument_strings)

elif func_name == 'if':
assert isinstance(func_argument, dict)
condition_node = func_argument['cond']
then_node = func_argument['then']
else_node = func_argument.get('else', None)
condition_result = expand_command_part(condition_node)
from distutils.util import strtobool
condition_result_bool = condition_result and strtobool(condition_result) #Python gotcha: bool('False') == True; Need to use strtobool; Also need to handle None and []
result_node = then_node if condition_result_bool else else_node
if result_node is None:
return []
if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
raise ValueError('No value provided for input {}'.format(port_name))

if isinstance(arg, InputPathPlaceholder):
port_name = arg.input_name
input_filename = _generate_input_file_name(port_name)
input_key = input_name_to_kubernetes[port_name]
input_value = pythonic_input_argument_values[input_name_to_pythonic[port_name]]
if input_value is not None:
return input_filename
else:
input_spec = inputs_dict[port_name]
if input_spec.optional:
#Even when we support default values there is no need to check for a default here.
#In current execution flow (called by python task factory), the missing argument would be replaced with the default value by python itself.
return None
else:
expanded_result = expand_command_part(result_node)
return expanded_result

elif func_name == 'ispresent':
assert isinstance(func_argument, str)
input_name = func_argument
pythonic_input_name = input_name_to_pythonic[input_name]
argument_is_present = pythonic_input_argument_values[pythonic_input_name] is not None
return str(argument_is_present)
raise ValueError('No value provided for input {}'.format(port_name))

elif isinstance(arg, OutputPathPlaceholder):
port_name = arg.output_name
output_filename = _generate_output_file_name(port_name)
output_key = output_name_to_kubernetes[port_name]
if output_key in file_outputs:
if file_outputs[output_key] != output_filename:
raise ValueError('Conflicting output files specified for port {}: {} and {}'.format(port_name, file_outputs[output_key], output_filename))
else:
file_outputs[output_key] = output_filename

return output_filename

elif isinstance(arg, ConcatPlaceholder):
expanded_argument_strings = expand_argument_list(arg.items)
return ''.join(expanded_argument_strings)

elif isinstance(arg, IfPlaceholder):
arg = arg.if_structure
condition_result = expand_command_part(arg.condition)
from distutils.util import strtobool
condition_result_bool = condition_result and strtobool(condition_result) #Python gotcha: bool('False') == True; Need to use strtobool; Also need to handle None and []
result_node = arg.then_value if condition_result_bool else arg.else_value
if result_node is None:
return []
if isinstance(result_node, list):
expanded_result = expand_argument_list(result_node)
else:
expanded_result = expand_command_part(result_node)
return expanded_result

elif isinstance(arg, IsPresentPlaceholder):
pythonic_input_name = input_name_to_pythonic[arg.input_name]
argument_is_present = pythonic_input_argument_values[pythonic_input_name] is not None
return str(argument_is_present)
else:
raise TypeError('Unrecognized argument type: {}'.format(arg))

Expand Down
87 changes: 32 additions & 55 deletions sdk/python/kfp/components/_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from ._yaml_utils import dump_yaml
from ._components import _create_task_factory_from_component_spec
from ._structures import InputSpec, OutputSpec, ImplementationSpec, ContainerSpec, ComponentSpec
from ._structures import *

from pathlib import Path
from typing import TypeVar, Generic
Expand Down Expand Up @@ -79,73 +79,50 @@ def _func_to_component_spec(func, extra_code='', base_image=_default_base_image)
extra_output_names = []
arguments = []

def annotation_to_argument_kind_and_type_name(annotation):
def annotation_to_type_struct(annotation):
if not annotation or annotation == inspect.Parameter.empty:
return ('value', None)
if hasattr(annotation, '__origin__'): #Generic type
type_name = annotation.__origin__.__name__
type_args = annotation.__args__
#if len(type_args) != 1:
# raise TypeError('Unsupported generic type {}'.format(type_name))
inner_type = type_args[0]
if type_name == InputFile.__name__:
return ('file', inner_type.__name__)
elif type_name == OutputFile.__name__:
return ('output', inner_type.__name__)
return None
if isinstance(annotation, type):
return ('value', annotation.__name__)
return str(annotation.__name__)
else:
#!!! It's important to preserve string annotations as strings. Annotations that are neither types nor strings are converted to strings.
#Materializer adds double quotes to the types it does not recognize. - fix it to not quote strings.
#We need two kinds of strings: we can use any type name for component YAML, but for generated Python code we must use valid python type annotations.
return ('value', "'" + str(annotation) + "'")
return str(annotation)

for parameter in parameters:
annotation = parameter.annotation

(argument_kind, parameter_type_name) = annotation_to_argument_kind_and_type_name(annotation)

parameter_to_type_name[parameter.name] = parameter_type_name

type_struct = annotation_to_type_struct(parameter.annotation)
parameter_to_type_name[parameter.name] = str(type_struct)
#TODO: Humanize the input/output names
arguments.append({argument_kind: parameter.name})

parameter_spec = OrderedDict([('name', parameter.name)])
if parameter_type_name:
parameter_spec['type'] = parameter_type_name
if argument_kind == 'value' or argument_kind == 'file':
inputs.append(parameter_spec)
elif argument_kind == 'output':
outputs.append(parameter_spec)
else:
#Cannot happen
raise ValueError('Unrecognized argument kind {}.'.format(argument_kind))
arguments.append(InputValuePlaceholder(parameter.name))

input_spec = InputSpec(
name=parameter.name,
type=type_struct,
)
inputs.append(input_spec)

#Analyzing the return type annotations.
return_ann = signature.return_annotation
if hasattr(return_ann, '_fields'): #NamedTuple
for field_name in return_ann._fields:
output_spec = OrderedDict([('name', field_name)])
type_struct = None
if hasattr(return_ann, '_field_types'):
output_type = return_ann._field_types.get(field_name, None)
if isinstance(output_type, type):
output_type_name = output_type.__name__
else:
output_type_name = str(output_type)

if output_type:
output_spec['type'] = output_type_name
type_struct = annotation_to_type_struct(return_ann._field_types.get(field_name, None))

output_spec = OutputSpec(
name=field_name,
type=type_struct,
)
outputs.append(output_spec)
extra_output_names.append(field_name)
arguments.append({'output': field_name})
else:
output_spec = OrderedDict([('name', single_output_name_const)])
(_, output_type_name) = annotation_to_argument_kind_and_type_name(signature.return_annotation)
if output_type_name:
output_spec['type'] = output_type_name
arguments.append(OutputPathPlaceholder(field_name))
elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty:
type_struct = annotation_to_type_struct(signature.return_annotation)
output_spec = OutputSpec(
name=single_output_name_const,
type=type_struct,
)
outputs.append(output_spec)
extra_output_names.append(single_output_pythonic_name_const)
arguments.append({'output': single_output_name_const})
arguments.append(OutputPathPlaceholder(single_output_name_const))

func_name=func.__name__

Expand Down Expand Up @@ -226,9 +203,9 @@ def annotation_to_argument_kind_and_type_name(annotation):
component_spec = ComponentSpec(
name=component_name,
description=description,
inputs=[InputSpec.from_struct(input) for input in inputs],
outputs=[OutputSpec.from_struct(output) for output in outputs],
implementation=ImplementationSpec(
inputs=inputs,
outputs=outputs,
implementation=ContainerImplementation(
container=ContainerSpec(
image=base_image,
command=['python3', '-c', full_source],
Expand Down
Loading