Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK/Components/Python - Removed python_op in favor of python_component #85

Merged
58 changes: 41 additions & 17 deletions sdk/python/kfp/compiler/_component_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,16 +301,22 @@ def _configure_logger(logger):
logger.addHandler(info_handler)
logger.addHandler(error_handler)

def _generate_pythonop(component_func, target_image):
def _generate_pythonop(component_func, target_image, target_component_file=None):
""" Generate operator for the pipeline authors
component_meta is a dict of name, description, base_image, target_image, input_list
The returned value is in fact a function, which should generate a container_op instance. """
component_meta = dsl.PythonComponent.get_python_component(component_func)

from ..components._python_op import _python_function_name_to_component_name

#Component name and description are derived from the function's name and docstring, but can be overridden by @python_component function decorator
#The decorator can set the _component_human_name and _component_description attributes. getattr is needed to prevent error when these attributes do not exist.
component_name = getattr(component_func, '_component_human_name', None) or _python_function_name_to_component_name(component_func.__name__)
component_description = getattr(component_func, '_component_description', None) or (component_func.__doc__.strip() if component_func.__doc__ else None)

input_names = inspect.getfullargspec(component_func)[0]

component_artifact = {}
component_artifact['name'] = component_meta['name']
component_artifact['description'] = component_meta['description']
component_artifact['name'] = component_name
component_artifact['description'] = component_description
component_artifact['outputs'] = [{'name': 'output'}]
component_artifact['inputs'] = []
component_artifact['implementation'] = {
Expand All @@ -328,14 +334,23 @@ def _generate_pythonop(component_func, target_image):
'type': 'str'
})
component_artifact['implementation']['dockerContainer']['arguments'].append({'value': input})

target_component_file = target_component_file or getattr(component_func, '_component_target_component_file', None)
if target_component_file:
from ..components._yaml_utils import dump_yaml
component_text = dump_yaml(component_artifact)
Path(target_component_file).write_text(component_text)

return _create_task_factory_from_component_dict(component_artifact)

def build_python_component(component_func, staging_gcs_path, target_image, build_image=True, timeout=600, namespace='kubeflow'):
def build_python_component(component_func, target_image, base_image=None, staging_gcs_path=None, build_image=True, timeout=600, namespace='kubeflow', target_component_file=None):
""" build_component automatically builds a container image for the component_func
based on the base_image and pushes to the target_image.

Args:
component_func (python function): The python function to build components upon
base_image (str): Docker image to use as a base image
target_image (str): Full URI to push the target image
staging_gcs_path (str): GCS blob that can store temporary build files
timeout (int): the timeout for the image build (in secs), default is 600 seconds
namespace (str): the namespace within which to run the kubernetes kaniko job, default is "kubeflow"
Expand All @@ -344,23 +359,32 @@ def build_python_component(component_func, staging_gcs_path, target_image, build
Raises:
ValueError: The function is not decorated with python_component decorator
"""

_configure_logger(logging.getLogger())
component_meta = dsl.PythonComponent.get_python_component(component_func)
component_meta['inputs'] = inspect.getfullargspec(component_func)[0]

if component_meta is None:
raise ValueError('The function "%s" does not exist. '
'Did you forget @dsl.python_component decoration?' % component_func)
logging.info('Build an image that is based on ' +
component_meta['base_image'] +

if component_func is None:
raise ValueError('component_func must not be None')
if target_image is None:
raise ValueError('target_image must not be None')

if build_image:
if staging_gcs_path is None:
raise ValueError('staging_gcs_path must not be None')

if base_image is None:
base_image = getattr(component_func, '_component_base_image', None)
if base_image is None:
raise ValueError('base_image must not be None')

logging.info('Build an image that is based on ' +
base_image +
' and push the image to ' +
target_image)
if build_image:
builder = ImageBuilder(gcs_base=staging_gcs_path, target_image=target_image)
builder.build_image_from_func(component_func, namespace=namespace,
base_image=component_meta['base_image'], timeout=timeout)
base_image=base_image, timeout=timeout)
logging.info('Build component complete.')
return _generate_pythonop(component_func, target_image)
return _generate_pythonop(component_func, target_image, target_component_file)

def build_docker_image(staging_gcs_path, target_image, dockerfile_path, timeout=600, namespace='kubeflow'):
""" build_docker_image automatically builds a container image based on the specification in the dockerfile and
Expand Down
74 changes: 34 additions & 40 deletions sdk/python/kfp/components/_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

__all__ = [
'python_op',
'func_to_container_op',
'func_to_component_text',
]
Expand Down Expand Up @@ -47,6 +46,24 @@ def _python_function_name_to_component_name(name):


def _func_to_component_spec(func, extra_code='', base_image=_default_base_image) -> ComponentSpec:
'''Takes a self-contained python function and converts it to component

Args:
func: Required. The function to be converted
base_image: Optional. Docker image to be used as a base image for the python component. Must have python 3.5+ installed. Default is tensorflow/tensorflow:1.11.0-py3
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
extra_code: Optional. Python source code that gets placed before the function code. Can be used as workaround to define types used in function signature.
'''
decorator_base_image = getattr(func, '_component_base_image', None)
if decorator_base_image is not None:
if base_image is not _default_base_image and decorator_base_image != base_image:
raise ValueError('base_image ({}) conflicts with the decorator-specified base image metadata ({})'.format(base_image, decorator_base_image))
else:
base_image = decorator_base_image
else:
if base_image is None:
raise ValueError('base_image cannot be None')

import inspect
import re
from collections import OrderedDict
Expand Down Expand Up @@ -199,8 +216,12 @@ def annotation_to_argument_kind_and_type_name(annotation):
#Removing consecutive blank lines
full_source = re.sub('\n\n\n+', '\n\n', full_source).strip('\n') + '\n'

component_name = _python_function_name_to_component_name(func_name)
description = func.__doc__.strip() + '\n' if func.__doc__ else None #Interesting: unlike ruamel.yaml, PyYaml cannot handle trailing spaces in the last line (' \n') and switches the style to double-quoted.
#Component name and description are derived from the function's name and docstring, but can be overridden by @python_component function decorator
#The decorator can set the _component_human_name and _component_description attributes. getattr is needed to prevent error when these attributes do not exist.
component_name = getattr(func, '_component_human_name', None) or _python_function_name_to_component_name(func.__name__)
description = getattr(func, '_component_description', None) or func.__doc__
if description:
description = description.strip() + '\n' #Interesting: unlike ruamel.yaml, PyYaml cannot handle trailing spaces in the last line (' \n') and switches the style to double-quoted.

component_spec = ComponentSpec(
name=component_name,
Expand Down Expand Up @@ -238,8 +259,9 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s

Args:
func: The python function to convert
base_image: Optional. Specify a custom Docker containerimage to use in the component. For lightweight components, the image needs to have python and the fire package.
extra_code: Optional. Extra code to add before the function code. May contain imports and other functions.
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.

Returns:
Textual representation of a component definition
Expand All @@ -264,8 +286,9 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s
Args:
func: The python function to convert
output_component_file: Write a component definition to a local file. Can be used for sharing.
base_image: Optional. Specify a custom Docker containerimage to use in the component. For lightweight components, the image needs to have python and the fire package.
extra_code: Optional. Extra code to add before the function code. May contain imports and other functions.
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.
'''

component_yaml = func_to_component_text(func, extra_code, base_image)
Expand All @@ -288,9 +311,10 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s

Args:
func: The python function to convert
base_image: Optional. Specify a custom Docker containerimage to use in the component. For lightweight components, the image needs to have python and the fire package.
base_image: Optional. Specify a custom Docker container image to use in the component. For lightweight components, the image needs to have python 3.5+. Default is tensorflow/tensorflow:1.11.0-py3
Note: The image can also be specified by decorating the function with the @python_component decorator. If different base images are explicitly specified in both places, an error is raised.
output_component_file: Optional. Write a component definition to a local file. Can be used for sharing.
extra_code: Optional. Extra code to add before the function code. May contain imports and other functions.
extra_code: Optional. Extra code to add before the function code. Can be used as workaround to define types used in function signature.

Returns:
A factory function with a strongly-typed signature taken from the python function.
Expand All @@ -299,41 +323,11 @@ def add_multiply_two_numbers(a: float, b: float) -> NamedTuple('DummyName', [('s

component_spec = _func_to_component_spec(func, extra_code, base_image)

output_component_file = output_component_file or getattr(func, '_component_target_component_file', None)
if output_component_file:
component_dict = component_spec.to_struct()
component_yaml = dump_yaml(component_dict)
Path(output_component_file).write_text(component_yaml)
#TODO: assert ComponentSpec.from_struct(load_yaml(output_component_file)) == component_spec

return _create_task_factory_from_component_spec(component_spec)


def python_op(func=None, base_image=_default_base_image, output_component_file=None, extra_code=''):
    '''
    Decorator that swaps a Python function for an equivalent task (ContainerOp) factory.

    Can be applied bare (``@python_op``) or with arguments
    (``@python_op(base_image=...)``). When ``func`` is not supplied, a
    one-argument decorator is returned; otherwise the conversion happens
    immediately. The conversion itself is delegated to func_to_container_op.

    The function docstring is used as the component description.
    Argument and return annotations are used as component input/output types.
    To declare a function with multiple return values, use the NamedTuple return annotation syntax:

        from typing import NamedTuple
        @python_op(base_image='tensorflow/tensorflow:1.11.0-py3')
        def add_multiply_two_numbers_op(a: float, b: float) -> NamedTuple('DummyName', [('sum', float), ('product', float)]):
            """Returns sum and product of two arguments"""
            return (a + b, a * b)

    Args:
        func: The python function to convert. Omit it to get a decorator back.
        base_image: Optional. Custom Docker container image to use in the component.
        output_component_file: Optional. Local file to write the component definition to. Can be used for sharing.
        extra_code: Optional. Extra code to add before the function code. May contain imports and other functions.

    Returns:
        A factory function with a strongly-typed signature taken from the python function
        (or, when ``func`` is omitted, a decorator producing such a factory).
        Once called with the required arguments, the factory constructs a pipeline task
        instance (ContainerOp) that can run the original function in a container.
    '''

    def _convert(target_func):
        # Single call site shared by the bare and the parameterized decorator forms.
        return func_to_container_op(target_func, output_component_file, base_image, extra_code)

    return _convert(func) if func else _convert
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
from ._pipeline import Pipeline, pipeline
from ._container_op import ContainerOp
from ._ops_group import OpsGroup, ExitHandler, Condition
from ._component import PythonComponent, python_component
from ._component import python_component
54 changes: 21 additions & 33 deletions sdk/python/kfp/dsl/_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.

def python_component(name, description=None, base_image=None, target_component_file: str = None):
    """Decorator for Python component functions.

    This decorator adds the metadata to the function object itself, as
    private attributes. Optional values that are not provided (or are empty)
    are not set at all, so readers should use ``getattr`` with a default.

    Args:
        name: Human-readable name of the component. Stored as ``_component_human_name``.
        description: Optional. Description of the component. Stored as ``_component_description``.
        base_image: Optional. Docker container image to use as the base of the component.
            Needs to have Python 3.5+ installed. Stored as ``_component_base_image``.
        target_component_file: Optional. Local file to store the component definition.
            The file can then be used for sharing. Stored as ``_component_target_component_file``.

    Returns:
        A decorator that returns the same function (with some metadata fields set).

    Usage:
    ```python
    @dsl.python_component(
        name='my awesome component',
        description="Come, Let's play",
        base_image='tensorflow/tensorflow:1.11.0-py3',
    )
    def my_component(a: str, b: int) -> str:
        ...
    ```
    """
    def _python_component(func):
        # The name is always recorded; the remaining fields only when given,
        # so that absent attributes signal "not specified by the decorator".
        func._component_human_name = name
        if description:
            func._component_description = description
        if base_image:
            func._component_base_image = base_image
        if target_component_file:
            func._component_target_component_file = target_component_file
        return func

    return _python_component

class PythonComponent:
    """Registry that maps component functions to their metadata.

    This class is not supposed to be used by component authors, who work with
    their component functions directly. It is useful for implementing a
    compiler: given a component function, the compiler can look up its
    metadata with ``PythonComponent.get_python_component(func)``.
    """

    # All component functions that have been registered.
    # Each key is a component function. Each value is a dictionary of name, description, base_image.
    _component_functions = {}

    @staticmethod
    def add_python_component(name, description, base_image, func):
        """Register ``func`` as a python component.

        Any metadata previously registered for the same function is replaced.

        Args:
            name: Name of the component.
            description: Description of the component.
            base_image: Base image of the component.
            func: The component function; used as the registry key.
        """
        PythonComponent._component_functions[func] = {
            'name': name,
            'description': description,
            'base_image': base_image
        }

    @staticmethod
    def get_python_component(func):
        """Return the metadata dictionary registered for ``func``.

        Returns:
            A dictionary with the keys 'name', 'description' and 'base_image',
            or None when ``func`` has not been registered.
        """
        return PythonComponent._component_functions.get(func)
Loading