Skip to content


SDK - Switching python container components to Lightweight components…
Browse files Browse the repository at this point in the history
… code generator (#1889)

* SDK - Switching python container components to Lightweight components code generator

* Fixed the tests

Had to remove the python2 test since python2 code generation is going away (python2 is near its End of Life and Kubeflow Pipelines only support python 3.5+).

* Added description for the internal add_files parameter

* Fixed typo

* Removed the `test_func_to_entrypoint` test
This was proposed by @gaoning777: `_func_to_entrypoint` is now just a reference to `_func_to_component_spec` which is extensively covered by other tests.
  • Loading branch information
Ark-kun authored and k8s-ci-robot committed Sep 4, 2019
1 parent 1b4919f commit cf681cb
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 418 deletions.
289 changes: 65 additions & 224 deletions sdk/python/kfp/compiler/
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from pathlib import Path
from typing import Callable
from ..components._components import _create_task_factory_from_component_spec
from ..components._python_op import _func_to_component_spec
from ..components._yaml_utils import dump_yaml
from ._container_builder import ContainerBuilder

class VersionedDependency(object):
Expand Down Expand Up @@ -115,15 +117,15 @@ def _dependency_to_requirements(dependency=[], filename='requirements.txt'):

def _generate_dockerfile(filename, base_image, entrypoint_filename, python_version, requirement_filename=None):
def _generate_dockerfile(filename, base_image, python_version, requirement_filename=None, add_files=None):
generates dockerfiles
filename (str): target file name for the dockerfile.
base_image (str): the base image name.
entrypoint_filename (str): the path of the entrypoint source file that is copied to the docker image.
python_version (str): choose python2 or python3
requirement_filename (str): requirement file name
add_files (Dict[str, str]): Map containing the files thats should be added to the container. add_files maps the build context relative source paths to the container destination paths.
if python_version not in ['python2', 'python3']:
raise ValueError('python_version has to be either python2 or python3')
Expand All @@ -139,175 +141,10 @@ def _generate_dockerfile(filename, base_image, entrypoint_filename, python_versi
f.write('RUN pip3 install -r /ml/requirements.txt\n')
f.write('RUN pip install -r /ml/requirements.txt\n')
f.write('ADD ' + entrypoint_filename + ' /ml/\n')
if python_version == 'python3':
f.write('ENTRYPOINT ["python3", "-u", "/ml/"]')
f.write('ENTRYPOINT ["python", "-u", "/ml/"]')

class CodeGenerator(object):
""" CodeGenerator helps to generate python codes with identation """
def __init__(self, indentation='\t'):
self._indentation = indentation
self._code = []
self._level = 0

def begin(self):
self._code = []
self._level = 0

def indent(self):
self._level += 1

for src_path, dst_path in (add_files or {}).items():
f.write('ADD ' + src_path + ' ' + dst_path + '\n')

def dedent(self):
if self._level == 0:
raise Exception('CodeGenerator dedent error')
self._level -= 1

def writeline(self, line):
self._code.append(self._indentation * self._level + line)

def end(self):
line_sep = '\n'
return line_sep.join(self._code) + line_sep

def _func_to_entrypoint(component_func, python_version='python3'):
python_version (str): choose python2 or python3, default is python3
if python_version not in ['python2', 'python3']:
raise ValueError('python_version has to be either python2 or python3')

fullargspec = inspect.getfullargspec(component_func)
annotations = fullargspec[6]
input_args = fullargspec[0]
inputs = {}
output = None
if 'return' in annotations.keys():
output = annotations['return']
output_is_named_tuple = hasattr(output, '_fields')

for key, value in annotations.items():
if key != 'return':
inputs[key] = value
if len(input_args) != len(inputs):
raise Exception('Some input arguments do not contain annotations.')
if 'return' in annotations and annotations['return'] not in [int,
float, str, bool] and not output_is_named_tuple:
raise Exception('Output type not supported and supported types are [int, float, str, bool]')
if output_is_named_tuple:
types = output._field_types
for field in output._fields: #Make sure all elements are supported
if types[field] not in [int, float, str, bool]:
raise Exception('Output type not supported and supported types are [int, float, str, bool]')

# inputs is a dictionary with key of argument name and value of type class
# output is a type class, e.g. int, str, bool, float, NamedTuple.

# Follow the same indentation with the component source codes.
component_src = inspect.getsource(component_func)
match ='\n([ \t]+)[\w]+', component_src)
indentation = if match else '\t'
codegen = CodeGenerator(indentation=indentation)

# Function signature
new_func_name = 'wrapper_' + component_func.__name__
func_signature = 'def ' + new_func_name + '('
for input_arg in input_args:
func_signature += input_arg + ','
func_signature = func_signature + '_output_files' if output_is_named_tuple else func_signature + '_output_file'
func_signature += '):'

# Call user function
call_component_func = 'output = ' + component_func.__name__ + '('
if output_is_named_tuple:
call_component_func = call_component_func.replace('output', 'outputs')
for input_arg in input_args:
call_component_func += inputs[input_arg].__name__ + '(' + input_arg + '),'
call_component_func = call_component_func.rstrip(',')
call_component_func += ')'

# Serialize output
codegen.writeline('import os')
if output_is_named_tuple:
codegen.writeline('for _output_file, output in zip(_output_files, outputs):')
codegen.writeline('with open(_output_file, "w") as data:')
wrapper_code = codegen.end()

# CLI codes
codegen.writeline('import argparse')
codegen.writeline('parser = argparse.ArgumentParser(description="Parsing arguments")')
for input_arg in input_args:
codegen.writeline('parser.add_argument("' + input_arg + '", type=' + inputs[input_arg].__name__ + ')')
if output_is_named_tuple:
codegen.writeline('parser.add_argument("_output_files", type=str, nargs=' + str(len(annotations['return']._fields)) + ')')
codegen.writeline('parser.add_argument("_output_file", type=str)')
codegen.writeline('args = vars(parser.parse_args())')
codegen.writeline('if __name__ == "__main__":')
codegen.writeline(new_func_name + '(**args)')

# Remove the decorator from the component source
src_lines = component_src.split('\n')
start_line_num = 0
for line in src_lines:
if line.startswith('def '):
start_line_num += 1
if python_version == 'python2':
src_lines[start_line_num] = 'def ' + component_func.__name__ + '(' + ', '.join((inspect.getfullargspec(component_func).args)) + '):'
dedecorated_component_src = '\n'.join(src_lines[start_line_num:])
if output_is_named_tuple:
dedecorated_component_src = 'from typing import NamedTuple\n' + dedecorated_component_src

complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end()
return complete_component_code

class ComponentBuilder(object):
""" Component Builder. """
def __init__(self, gcs_staging, target_image, namespace):
self._arc_docker_filename = 'dockerfile'
self._arc_python_filename = ''
self._arc_requirement_filename = 'requirements.txt'
self._container_builder = ContainerBuilder(gcs_staging, gcr_image_tag=target_image, namespace=namespace)

def build_image_from_func(self, component_func, base_image, timeout, dependency, python_version='python3'):
""" build_image builds an image for the given python function
python_version (str): choose python2 or python3, default is python3
if python_version not in ['python2', 'python3']:
raise ValueError('python_version has to be either python2 or python3')
with tempfile.TemporaryDirectory() as local_build_dir:
# Generate entrypoint and serialization python codes
local_python_filepath = os.path.join(local_build_dir, self._arc_python_filename)'Generate entrypoint and serialization codes.')
complete_component_code = _func_to_entrypoint(component_func, python_version)
with open(local_python_filepath, 'w') as f:

local_requirement_filepath = os.path.join(local_build_dir, self._arc_requirement_filename)'Generate requirement file')
_dependency_to_requirements(dependency, local_requirement_filepath)

local_docker_filepath = os.path.join(local_build_dir, self._arc_docker_filename)
_generate_dockerfile(local_docker_filepath, base_image, self._arc_python_filename, python_version, self._arc_requirement_filename)

# Prepare build files'Generate build files.')
return, self._arc_docker_filename, timeout=timeout)

def _configure_logger(logger):
""" _configure_logger configures the logger such that the info level logs
Expand All @@ -328,51 +165,6 @@ def _configure_logger(logger):

def _generate_pythonop(component_func, target_image, target_component_file=None):
""" Generate operator for the pipeline authors
The returned value is in fact a function, which should generates a container_op instance. """

from ..components._python_op import _python_function_name_to_component_name
from ..components._structures import InputSpec, InputValuePlaceholder, OutputPathPlaceholder, OutputSpec, ContainerImplementation, ContainerSpec, ComponentSpec

#Component name and description are derived from the function's name and docstribng, but can be overridden by @python_component function decorator
#The decorator can set the _component_human_name and _component_description attributes. getattr is needed to prevent error when these attributes do not exist.
component_name = getattr(component_func, '_component_human_name', None) or _python_function_name_to_component_name(component_func.__name__)
component_description = getattr(component_func, '_component_description', None) or (component_func.__doc__.strip() if component_func.__doc__ else None)

#TODO: Humanize the input/output names
input_names = inspect.getfullargspec(component_func)[0]

return_ann = inspect.signature(component_func).return_annotation
output_is_named_tuple = hasattr(return_ann, '_fields')

output_names = ['output']
if output_is_named_tuple:
output_names = return_ann._fields

component_spec = ComponentSpec(
inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Change type to actual type
outputs=[OutputSpec(name=output_name, type='str') for output_name in output_names],
#command=['python3', program_file], #TODO: Include the command line
args=[InputValuePlaceholder(input_name) for input_name in input_names] +
[OutputPathPlaceholder(output_name) for output_name in output_names],

target_component_file = target_component_file or getattr(component_func, '_component_target_component_file', None)
if target_component_file:
from ..components._yaml_utils import dump_yaml
component_text = dump_yaml(component_spec.to_dict())

return _create_task_factory_from_component_spec(component_spec)

def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, timeout=600, namespace='kubeflow', target_component_file=None, python_version='python3'):
""" build_component automatically builds a container image for the component_func
Expand Down Expand Up @@ -414,15 +206,64 @@ def build_python_component(component_func, target_image, base_image=None, depend
base_image = base_image()'Build an image that is based on ' +
base_image +
' and push the image to ' +
builder = ComponentBuilder(gcs_staging=staging_gcs_path, target_image=target_image, namespace=namespace)
image_name_with_digest = builder.build_image_from_func(component_func,
base_image=base_image, timeout=timeout,
python_version=python_version, dependency=dependency)'Build component complete.')
return _generate_pythonop(component_func, image_name_with_digest, target_component_file)
base_image +
' and push the image to ' +

component_spec = _func_to_component_spec(component_func, base_image=base_image)
command_line_args = component_spec.implementation.container.command

dash_c_index = command_line_args.index('-c')
program_code_index = dash_c_index + 1
program_code = command_line_args[program_code_index]
program_rel_path = 'ml/'
program_container_path = '/' + program_rel_path

# Replacing the inline code with calling a local program
# Before: python3 -u -c 'import sys ...' --param1 ...
# After: python3 -u --param1 ...
command_line_args[program_code_index] = program_container_path

if python_version == 'python2':
import warnings
warnings.warn('Python2 is not longer supported')
# Replacing the python interpreter
python_interpreter_index = command_line_args.index('python3')
command_line_args[python_interpreter_index] = python_version

arc_docker_filename = 'Dockerfile'
arc_requirement_filename = 'requirements.txt'

with tempfile.TemporaryDirectory() as local_build_dir:
# Write the program code to a file in the context directory
local_python_filepath = os.path.join(local_build_dir, program_rel_path)
with open(local_python_filepath, 'w') as f:

# Generate the python package requirements file in the context directory
local_requirement_filepath = os.path.join(local_build_dir, arc_requirement_filename)
_dependency_to_requirements(dependency, local_requirement_filepath)

# Generate Dockerfile in the context directory
local_docker_filepath = os.path.join(local_build_dir, arc_docker_filename)
_generate_dockerfile(local_docker_filepath, base_image, python_version, arc_requirement_filename, add_files={program_rel_path, program_container_path})'Building and pushing container image.')
container_builder = ContainerBuilder(staging_gcs_path, target_image, namespace)
image_name_with_digest =, arc_docker_filename, target_image, timeout)

component_spec.implementation.container.image = image_name_with_digest

# Optionally writing the component definition to a local file for sharing
target_component_file = target_component_file or getattr(component_func, '_component_target_component_file', None)
if target_component_file:
component_text = dump_yaml(component_spec.to_dict())

task_factory_function = _create_task_factory_from_component_spec(component_spec)
return task_factory_function

def build_docker_image(staging_gcs_path, target_image, dockerfile_path, timeout=600, namespace='kubeflow'):
""" build_docker_image automatically builds a container image based on the specification in the dockerfile and
Expand Down

0 comments on commit cf681cb

Please sign in to comment.