diff --git a/sdk/python/kfp/compiler/_component_builder.py b/sdk/python/kfp/compiler/_component_builder.py index d4e7c77958b..ee40e9a7c9e 100644 --- a/sdk/python/kfp/compiler/_component_builder.py +++ b/sdk/python/kfp/compiler/_component_builder.py @@ -23,6 +23,8 @@ from pathlib import Path from typing import Callable from ..components._components import _create_task_factory_from_component_spec +from ..components._python_op import _func_to_component_spec +from ..components._yaml_utils import dump_yaml from ._container_builder import ContainerBuilder class VersionedDependency(object): @@ -115,15 +117,15 @@ def _dependency_to_requirements(dependency=[], filename='requirements.txt'): dependency_helper.add_python_package(version) dependency_helper.generate_pip_requirements(filename) -def _generate_dockerfile(filename, base_image, entrypoint_filename, python_version, requirement_filename=None): +def _generate_dockerfile(filename, base_image, python_version, requirement_filename=None, add_files=None): """ generates dockerfiles Args: filename (str): target file name for the dockerfile. base_image (str): the base image name. - entrypoint_filename (str): the path of the entrypoint source file that is copied to the docker image. python_version (str): choose python2 or python3 requirement_filename (str): requirement file name + add_files (Dict[str, str]): Map containing the files thats should be added to the container. add_files maps the build context relative source paths to the container destination paths. """ if python_version not in ['python2', 'python3']: raise ValueError('python_version has to be either python2 or python3') @@ -139,175 +141,10 @@ def _generate_dockerfile(filename, base_image, entrypoint_filename, python_versi f.write('RUN pip3 install -r /ml/requirements.txt\n') else: f.write('RUN pip install -r /ml/requirements.txt\n') - f.write('ADD ' + entrypoint_filename + ' /ml/main.py\n') - if python_version == 'python3': - f.write('ENTRYPOINT ["python3", "-u", "/ml/main.py"]') - else: - f.write('ENTRYPOINT ["python", "-u", "/ml/main.py"]') - -class CodeGenerator(object): - """ CodeGenerator helps to generate python codes with identation """ - def __init__(self, indentation='\t'): - self._indentation = indentation - self._code = [] - self._level = 0 - - def begin(self): - self._code = [] - self._level = 0 - - def indent(self): - self._level += 1 + + for src_path, dst_path in (add_files or {}).items(): + f.write('ADD ' + src_path + ' ' + dst_path + '\n') - def dedent(self): - if self._level == 0: - raise Exception('CodeGenerator dedent error') - self._level -= 1 - - def writeline(self, line): - self._code.append(self._indentation * self._level + line) - - def end(self): - line_sep = '\n' - return line_sep.join(self._code) + line_sep - -def _func_to_entrypoint(component_func, python_version='python3'): - ''' - args: - python_version (str): choose python2 or python3, default is python3 - ''' - if python_version not in ['python2', 'python3']: - raise ValueError('python_version has to be either python2 or python3') - - fullargspec = inspect.getfullargspec(component_func) - annotations = fullargspec[6] - input_args = fullargspec[0] - inputs = {} - output = None - if 'return' in annotations.keys(): - output = annotations['return'] - output_is_named_tuple = hasattr(output, '_fields') - - for key, value in annotations.items(): - if key != 'return': - inputs[key] = value - if len(input_args) != len(inputs): - raise Exception('Some input arguments do not contain annotations.') - if 'return' in annotations and annotations['return'] not in [int, - float, str, bool] and not output_is_named_tuple: - raise Exception('Output type not supported and supported types are [int, float, str, bool]') - if output_is_named_tuple: - types = output._field_types - for field in output._fields: #Make sure all elements are supported - if types[field] not in [int, float, str, bool]: - raise Exception('Output type not supported and supported types are [int, float, str, bool]') - - # inputs is a dictionary with key of argument name and value of type class - # output is a type class, e.g. int, str, bool, float, NamedTuple. - - # Follow the same indentation with the component source codes. - component_src = inspect.getsource(component_func) - match = re.search(r'\n([ \t]+)[\w]+', component_src) - indentation = match.group(1) if match else '\t' - codegen = CodeGenerator(indentation=indentation) - - # Function signature - new_func_name = 'wrapper_' + component_func.__name__ - codegen.begin() - func_signature = 'def ' + new_func_name + '(' - for input_arg in input_args: - func_signature += input_arg + ',' - func_signature = func_signature + '_output_files' if output_is_named_tuple else func_signature + '_output_file' - func_signature += '):' - codegen.writeline(func_signature) - - # Call user function - codegen.indent() - call_component_func = 'output = ' + component_func.__name__ + '(' - if output_is_named_tuple: - call_component_func = call_component_func.replace('output', 'outputs') - for input_arg in input_args: - call_component_func += inputs[input_arg].__name__ + '(' + input_arg + '),' - call_component_func = call_component_func.rstrip(',') - call_component_func += ')' - codegen.writeline(call_component_func) - - # Serialize output - codegen.writeline('import os') - if output_is_named_tuple: - codegen.writeline('for _output_file, output in zip(_output_files, outputs):') - codegen.indent() - codegen.writeline('os.makedirs(os.path.dirname(_output_file))') - codegen.writeline('with open(_output_file, "w") as data:') - codegen.indent() - codegen.writeline('data.write(str(output))') - wrapper_code = codegen.end() - - # CLI codes - codegen.begin() - codegen.writeline('import argparse') - codegen.writeline('parser = argparse.ArgumentParser(description="Parsing arguments")') - for input_arg in input_args: - codegen.writeline('parser.add_argument("' + input_arg + '", type=' + inputs[input_arg].__name__ + ')') - if output_is_named_tuple: - codegen.writeline('parser.add_argument("_output_files", type=str, nargs=' + str(len(annotations['return']._fields)) + ')') - else: - codegen.writeline('parser.add_argument("_output_file", type=str)') - codegen.writeline('args = vars(parser.parse_args())') - codegen.writeline('') - codegen.writeline('if __name__ == "__main__":') - codegen.indent() - codegen.writeline(new_func_name + '(**args)') - - # Remove the decorator from the component source - src_lines = component_src.split('\n') - start_line_num = 0 - for line in src_lines: - if line.startswith('def '): - break - start_line_num += 1 - if python_version == 'python2': - src_lines[start_line_num] = 'def ' + component_func.__name__ + '(' + ', '.join((inspect.getfullargspec(component_func).args)) + '):' - dedecorated_component_src = '\n'.join(src_lines[start_line_num:]) - if output_is_named_tuple: - dedecorated_component_src = 'from typing import NamedTuple\n' + dedecorated_component_src - - complete_component_code = dedecorated_component_src + '\n' + wrapper_code + '\n' + codegen.end() - return complete_component_code - -class ComponentBuilder(object): - """ Component Builder. """ - def __init__(self, gcs_staging, target_image, namespace): - self._arc_docker_filename = 'dockerfile' - self._arc_python_filename = 'main.py' - self._arc_requirement_filename = 'requirements.txt' - self._container_builder = ContainerBuilder(gcs_staging, gcr_image_tag=target_image, namespace=namespace) - - def build_image_from_func(self, component_func, base_image, timeout, dependency, python_version='python3'): - """ build_image builds an image for the given python function - args: - python_version (str): choose python2 or python3, default is python3 - """ - if python_version not in ['python2', 'python3']: - raise ValueError('python_version has to be either python2 or python3') - with tempfile.TemporaryDirectory() as local_build_dir: - # Generate entrypoint and serialization python codes - local_python_filepath = os.path.join(local_build_dir, self._arc_python_filename) - logging.info('Generate entrypoint and serialization codes.') - complete_component_code = _func_to_entrypoint(component_func, python_version) - with open(local_python_filepath, 'w') as f: - f.write(complete_component_code) - - local_requirement_filepath = os.path.join(local_build_dir, self._arc_requirement_filename) - logging.info('Generate requirement file') - _dependency_to_requirements(dependency, local_requirement_filepath) - - local_docker_filepath = os.path.join(local_build_dir, self._arc_docker_filename) - _generate_dockerfile(local_docker_filepath, base_image, self._arc_python_filename, python_version, self._arc_requirement_filename) - - # Prepare build files - logging.info('Generate build files.') - return self._container_builder.build(local_build_dir, self._arc_docker_filename, timeout=timeout) def _configure_logger(logger): """ _configure_logger configures the logger such that the info level logs @@ -328,51 +165,6 @@ def _configure_logger(logger): logger.addHandler(info_handler) logger.addHandler(error_handler) -def _generate_pythonop(component_func, target_image, target_component_file=None): - """ Generate operator for the pipeline authors - The returned value is in fact a function, which should generates a container_op instance. """ - - from ..components._python_op import _python_function_name_to_component_name - from ..components._structures import InputSpec, InputValuePlaceholder, OutputPathPlaceholder, OutputSpec, ContainerImplementation, ContainerSpec, ComponentSpec - - - #Component name and description are derived from the function's name and docstribng, but can be overridden by @python_component function decorator - #The decorator can set the _component_human_name and _component_description attributes. getattr is needed to prevent error when these attributes do not exist. - component_name = getattr(component_func, '_component_human_name', None) or _python_function_name_to_component_name(component_func.__name__) - component_description = getattr(component_func, '_component_description', None) or (component_func.__doc__.strip() if component_func.__doc__ else None) - - #TODO: Humanize the input/output names - input_names = inspect.getfullargspec(component_func)[0] - - return_ann = inspect.signature(component_func).return_annotation - output_is_named_tuple = hasattr(return_ann, '_fields') - - output_names = ['output'] - if output_is_named_tuple: - output_names = return_ann._fields - - component_spec = ComponentSpec( - name=component_name, - description=component_description, - inputs=[InputSpec(name=input_name, type='str') for input_name in input_names], #TODO: Change type to actual type - outputs=[OutputSpec(name=output_name, type='str') for output_name in output_names], - implementation=ContainerImplementation( - container=ContainerSpec( - image=target_image, - #command=['python3', program_file], #TODO: Include the command line - args=[InputValuePlaceholder(input_name) for input_name in input_names] + - [OutputPathPlaceholder(output_name) for output_name in output_names], - ) - ) - ) - - target_component_file = target_component_file or getattr(component_func, '_component_target_component_file', None) - if target_component_file: - from ..components._yaml_utils import dump_yaml - component_text = dump_yaml(component_spec.to_dict()) - Path(target_component_file).write_text(component_text) - - return _create_task_factory_from_component_spec(component_spec) def build_python_component(component_func, target_image, base_image=None, dependency=[], staging_gcs_path=None, timeout=600, namespace='kubeflow', target_component_file=None, python_version='python3'): """ build_component automatically builds a container image for the component_func @@ -414,15 +206,64 @@ def build_python_component(component_func, target_image, base_image=None, depend base_image = base_image() logging.info('Build an image that is based on ' + - base_image + - ' and push the image to ' + - target_image) - builder = ComponentBuilder(gcs_staging=staging_gcs_path, target_image=target_image, namespace=namespace) - image_name_with_digest = builder.build_image_from_func(component_func, - base_image=base_image, timeout=timeout, - python_version=python_version, dependency=dependency) - logging.info('Build component complete.') - return _generate_pythonop(component_func, image_name_with_digest, target_component_file) + base_image + + ' and push the image to ' + + target_image) + + component_spec = _func_to_component_spec(component_func, base_image=base_image) + command_line_args = component_spec.implementation.container.command + + dash_c_index = command_line_args.index('-c') + program_code_index = dash_c_index + 1 + program_code = command_line_args[program_code_index] + program_rel_path = 'ml/main.py' + program_container_path = '/' + program_rel_path + + # Replacing the inline code with calling a local program + # Before: python3 -u -c 'import sys ...' --param1 ... + # After: python3 -u main.py --param1 ... + command_line_args[program_code_index] = program_container_path + command_line_args.pop(index=dash_c_index) + + if python_version == 'python2': + import warnings + warnings.warn('Python2 is not longer supported') + # Replacing the python interpreter + python_interpreter_index = command_line_args.index('python3') + command_line_args[python_interpreter_index] = python_version + + arc_docker_filename = 'Dockerfile' + arc_requirement_filename = 'requirements.txt' + + with tempfile.TemporaryDirectory() as local_build_dir: + # Write the program code to a file in the context directory + local_python_filepath = os.path.join(local_build_dir, program_rel_path) + with open(local_python_filepath, 'w') as f: + f.write(program_code) + + # Generate the python package requirements file in the context directory + local_requirement_filepath = os.path.join(local_build_dir, arc_requirement_filename) + _dependency_to_requirements(dependency, local_requirement_filepath) + + # Generate Dockerfile in the context directory + local_docker_filepath = os.path.join(local_build_dir, arc_docker_filename) + _generate_dockerfile(local_docker_filepath, base_image, python_version, arc_requirement_filename, add_files={program_rel_path, program_container_path}) + + logging.info('Building and pushing container image.') + container_builder = ContainerBuilder(staging_gcs_path, target_image, namespace) + image_name_with_digest = container_builder.build(local_build_dir, arc_docker_filename, target_image, timeout) + + component_spec.implementation.container.image = image_name_with_digest + + # Optionally writing the component definition to a local file for sharing + target_component_file = target_component_file or getattr(component_func, '_component_target_component_file', None) + if target_component_file: + component_text = dump_yaml(component_spec.to_dict()) + Path(target_component_file).write_text(component_text) + + task_factory_function = _create_task_factory_from_component_spec(component_spec) + return task_factory_function + def build_docker_image(staging_gcs_path, target_image, dockerfile_path, timeout=600, namespace='kubeflow'): """ build_docker_image automatically builds a container image based on the specification in the dockerfile and diff --git a/sdk/python/tests/compiler/component_builder_test.py b/sdk/python/tests/compiler/component_builder_test.py index d49957700b4..8d928755e9d 100644 --- a/sdk/python/tests/compiler/component_builder_test.py +++ b/sdk/python/tests/compiler/component_builder_test.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from kfp.compiler._component_builder import _generate_dockerfile, _dependency_to_requirements, _func_to_entrypoint -from kfp.compiler._component_builder import CodeGenerator +from kfp.compiler._component_builder import _generate_dockerfile +from kfp.compiler._component_builder import _dependency_to_requirements from kfp.compiler._component_builder import VersionedDependency from kfp.compiler._component_builder import DependencyHelper @@ -108,36 +108,12 @@ def test_add_python_package(self): self.assertEqual(target_requirement_payload, golden_requirement_payload) os.remove(temp_file) -def sample_component_func(a: str, b: int) -> float: - result = 3.45 - if a == "succ": - result = float(b + 5) - return result -def basic_decorator(name): - def wrapper(func): - return func - return wrapper - -@basic_decorator(name='component_sample') -def sample_component_func_two(a: str, b: int) -> float: - result = 3.45 - if a == 'succ': - result = float(b + 5) - return result - -def sample_component_func_three() -> float: - return 1.0 - -def sample_component_func_four() -> NamedTuple( - 'output', [('a', float), ('b', str)]): - from collections import namedtuple - output = namedtuple('output', ['a', 'b']) - return output(1.0, 'test') class TestGenerator(unittest.TestCase): def test_generate_dockerfile(self): """ Test generate dockerfile """ + from kfp.compiler._component_builder import _generate_dockerfile # prepare test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') @@ -146,14 +122,14 @@ def test_generate_dockerfile(self): FROM gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0 RUN apt-get update -y && apt-get install --no-install-recommends -y -q python3 python3-pip python3-setuptools ADD main.py /ml/main.py -ENTRYPOINT ["python3", "-u", "/ml/main.py"]''' +''' golden_dockerfile_payload_two = '''\ FROM gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0 RUN apt-get update -y && apt-get install --no-install-recommends -y -q python3 python3-pip python3-setuptools ADD requirements.txt /ml/requirements.txt RUN pip3 install -r /ml/requirements.txt ADD main.py /ml/main.py -ENTRYPOINT ["python3", "-u", "/ml/main.py"]''' +''' golden_dockerfile_payload_three = '''\ FROM gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0 @@ -161,27 +137,27 @@ def test_generate_dockerfile(self): ADD requirements.txt /ml/requirements.txt RUN pip install -r /ml/requirements.txt ADD main.py /ml/main.py -ENTRYPOINT ["python", "-u", "/ml/main.py"]''' +''' # check _generate_dockerfile(filename=target_dockerfile, base_image='gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0', - entrypoint_filename='main.py', python_version='python3') + python_version='python3', add_files={'main.py': '/ml/main.py'}) with open(target_dockerfile, 'r') as f: target_dockerfile_payload = f.read() self.assertEqual(target_dockerfile_payload, golden_dockerfile_payload_one) _generate_dockerfile(filename=target_dockerfile, base_image='gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0', - entrypoint_filename='main.py', python_version='python3', requirement_filename='requirements.txt') + python_version='python3', requirement_filename='requirements.txt', add_files={'main.py': '/ml/main.py'}) with open(target_dockerfile, 'r') as f: target_dockerfile_payload = f.read() self.assertEqual(target_dockerfile_payload, golden_dockerfile_payload_two) _generate_dockerfile(filename=target_dockerfile, base_image='gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0', - entrypoint_filename='main.py', python_version='python2', requirement_filename='requirements.txt') + python_version='python2', requirement_filename='requirements.txt', add_files={'main.py': '/ml/main.py'}) with open(target_dockerfile, 'r') as f: target_dockerfile_payload = f.read() self.assertEqual(target_dockerfile_payload, golden_dockerfile_payload_three) self.assertRaises(ValueError, _generate_dockerfile, filename=target_dockerfile, - base_image='gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0', entrypoint_filename='main.py', - python_version='python4', requirement_filename='requirements.txt') + base_image='gcr.io/ngao-mlpipeline-testing/tensorflow:1.10.0', + python_version='python4', requirement_filename='requirements.txt', add_files={'main.py': '/ml/main.py'}) # clean up os.remove(target_dockerfile) @@ -204,162 +180,3 @@ def test_generate_requirement(self): target_payload = f.read() self.assertEqual(target_payload, golden_payload) os.remove(temp_file) - - def test_func_to_entrypoint(self): - """ Test entrypoint generation """ - - # prepare - test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') - - # check - generated_codes = _func_to_entrypoint(component_func=sample_component_func) - golden = '''\ -def sample_component_func(a: str, b: int) -> float: - result = 3.45 - if a == "succ": - result = float(b + 5) - return result - -def wrapper_sample_component_func(a,b,_output_file): - output = sample_component_func(str(a),int(b)) - import os - os.makedirs(os.path.dirname(_output_file)) - with open(_output_file, "w") as data: - data.write(str(output)) - -import argparse -parser = argparse.ArgumentParser(description="Parsing arguments") -parser.add_argument("a", type=str) -parser.add_argument("b", type=int) -parser.add_argument("_output_file", type=str) -args = vars(parser.parse_args()) - -if __name__ == "__main__": - wrapper_sample_component_func(**args) -''' - self.assertEqual(golden, generated_codes) - - generated_codes = _func_to_entrypoint(component_func=sample_component_func_two) - golden = '''\ -def sample_component_func_two(a: str, b: int) -> float: - result = 3.45 - if a == 'succ': - result = float(b + 5) - return result - -def wrapper_sample_component_func_two(a,b,_output_file): - output = sample_component_func_two(str(a),int(b)) - import os - os.makedirs(os.path.dirname(_output_file)) - with open(_output_file, "w") as data: - data.write(str(output)) - -import argparse -parser = argparse.ArgumentParser(description="Parsing arguments") -parser.add_argument("a", type=str) -parser.add_argument("b", type=int) -parser.add_argument("_output_file", type=str) -args = vars(parser.parse_args()) - -if __name__ == "__main__": - wrapper_sample_component_func_two(**args) -''' - self.assertEqual(golden, generated_codes) - - generated_codes = _func_to_entrypoint(component_func=sample_component_func_three) - golden = '''\ -def sample_component_func_three() -> float: - return 1.0 - -def wrapper_sample_component_func_three(_output_file): - output = sample_component_func_three() - import os - os.makedirs(os.path.dirname(_output_file)) - with open(_output_file, "w") as data: - data.write(str(output)) - -import argparse -parser = argparse.ArgumentParser(description="Parsing arguments") -parser.add_argument("_output_file", type=str) -args = vars(parser.parse_args()) - -if __name__ == "__main__": - wrapper_sample_component_func_three(**args) -''' - self.assertEqual(golden, generated_codes) - - generated_codes = _func_to_entrypoint(component_func=sample_component_func_four) - golden = '''\ -from typing import NamedTuple -def sample_component_func_four() -> NamedTuple( - 'output', [('a', float), ('b', str)]): - from collections import namedtuple - output = namedtuple('output', ['a', 'b']) - return output(1.0, 'test') - -def wrapper_sample_component_func_four(_output_files): - outputs = sample_component_func_four() - import os - for _output_file, output in zip(_output_files, outputs): - os.makedirs(os.path.dirname(_output_file)) - with open(_output_file, "w") as data: - data.write(str(output)) - -import argparse -parser = argparse.ArgumentParser(description="Parsing arguments") -parser.add_argument("_output_files", type=str, nargs=2) -args = vars(parser.parse_args()) - -if __name__ == "__main__": - wrapper_sample_component_func_four(**args) -''' - self.assertEqual(golden, generated_codes) - - def test_func_to_entrypoint_python2(self): - """ Test entrypoint generation for python2""" - - # prepare - test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') - - # check - generated_codes = _func_to_entrypoint(component_func=sample_component_func_two, python_version='python2') - golden = '''\ -def sample_component_func_two(a, b): - result = 3.45 - if a == 'succ': - result = float(b + 5) - return result - -def wrapper_sample_component_func_two(a,b,_output_file): - output = sample_component_func_two(str(a),int(b)) - import os - os.makedirs(os.path.dirname(_output_file)) - with open(_output_file, "w") as data: - data.write(str(output)) - -import argparse -parser = argparse.ArgumentParser(description="Parsing arguments") -parser.add_argument("a", type=str) -parser.add_argument("b", type=int) -parser.add_argument("_output_file", type=str) -args = vars(parser.parse_args()) - -if __name__ == "__main__": - wrapper_sample_component_func_two(**args) -''' - self.assertEqual(golden, generated_codes) - -# hello function is used by the TestCodeGenerator to verify the auto generated python function -def hello(): - print("hello") - -class TestCodeGenerator(unittest.TestCase): - def test_codegen(self): - """ Test code generator a function""" - codegen = CodeGenerator(indentation=' ') - codegen.begin() - codegen.writeline('def hello():') - codegen.indent() - codegen.writeline('print("hello")') - generated_codes = codegen.end() - self.assertEqual(generated_codes, inspect.getsource(hello))