From 36405930eaa16740ef66e425f5ff0f2bd32897ec Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Feb 2019 10:56:38 -0800 Subject: [PATCH 1/9] dsl generate zip file --- samples/basic/condition.py | 2 +- samples/basic/exit_handler.py | 2 +- samples/basic/immediate_value.py | 2 +- samples/basic/parallel_join.py | 2 +- samples/basic/retry.py | 2 +- samples/basic/sequential.py | 2 +- .../ibm-samples/ffdl-seldon/ffdl_pipeline.py | 2 +- .../watson/watson_train_serve_pipeline.py | 2 +- .../kubeflow-training-classification.py | 2 +- ...ow Pipeline Using TFX OSS Components.ipynb | 10 +++--- ...ghtweight Python components - basics.ipynb | 4 +-- samples/resnet-cmle/resnet-train-pipeline.py | 2 +- .../tfx/taxi-cab-classification-pipeline.py | 2 +- samples/xgboost-spark/xgboost-training-cm.py | 2 +- sdk/python/kfp/compiler/compiler.py | 14 ++++---- sdk/python/tests/compiler/compiler_tests.py | 32 +++++++++---------- 16 files changed, 42 insertions(+), 42 deletions(-) diff --git a/samples/basic/condition.py b/samples/basic/condition.py index 98a8b16cf5d..76d02336e77 100755 --- a/samples/basic/condition.py +++ b/samples/basic/condition.py @@ -76,4 +76,4 @@ def flipcoin(): if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(flipcoin, __file__ + '.tar.gz') + compiler.Compiler().compile(flipcoin, __file__ + '.zip') diff --git a/samples/basic/exit_handler.py b/samples/basic/exit_handler.py index adb40416354..85806ffe771 100755 --- a/samples/basic/exit_handler.py +++ b/samples/basic/exit_handler.py @@ -46,4 +46,4 @@ def download_and_print(url='gs://ml-pipeline-playground/shakespeare1.txt'): if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(download_and_print, __file__ + '.tar.gz') + compiler.Compiler().compile(download_and_print, __file__ + '.zip') diff --git a/samples/basic/immediate_value.py b/samples/basic/immediate_value.py index 32ae9634b82..9cc2842dd11 100755 --- a/samples/basic/immediate_value.py +++ b/samples/basic/immediate_value.py @@ -40,4 +40,4 @@ def immediate_value_pipeline(): if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(immediate_value_pipeline, __file__ + '.tar.gz') + compiler.Compiler().compile(immediate_value_pipeline, __file__ + '.zip') diff --git a/samples/basic/parallel_join.py b/samples/basic/parallel_join.py index 3742a7ecd38..6b14a821f0e 100755 --- a/samples/basic/parallel_join.py +++ b/samples/basic/parallel_join.py @@ -48,4 +48,4 @@ def download_and_join( if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(download_and_join, __file__ + '.tar.gz') + compiler.Compiler().compile(download_and_join, __file__ + '.zip') diff --git a/samples/basic/retry.py b/samples/basic/retry.py index 4d5e244291c..f3407c305f7 100755 --- a/samples/basic/retry.py +++ b/samples/basic/retry.py @@ -39,4 +39,4 @@ def retry_sample_pipeline(): if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.tar.gz') + compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.zip') diff --git a/samples/basic/sequential.py b/samples/basic/sequential.py index 84b8a78c811..511becfc722 100755 --- a/samples/basic/sequential.py +++ b/samples/basic/sequential.py @@ -38,4 +38,4 @@ def sequential_pipeline(url='gs://ml-pipeline-playground/shakespeare1.txt'): if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(sequential_pipeline, __file__ + '.tar.gz') + compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip') diff --git a/samples/ibm-samples/ffdl-seldon/ffdl_pipeline.py b/samples/ibm-samples/ffdl-seldon/ffdl_pipeline.py index 972f265e19e..d7725d54ccf 100644 --- a/samples/ibm-samples/ffdl-seldon/ffdl_pipeline.py +++ b/samples/ibm-samples/ffdl-seldon/ffdl_pipeline.py @@ -57,4 +57,4 @@ def ffdlPipeline( if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(ffdlPipeline, __file__ + '.tar.gz') + compiler.Compiler().compile(ffdlPipeline, __file__ + '.zip') diff --git a/samples/ibm-samples/watson/watson_train_serve_pipeline.py b/samples/ibm-samples/watson/watson_train_serve_pipeline.py index e0f17ebd880..f0ecfcb84d0 100644 --- a/samples/ibm-samples/watson/watson_train_serve_pipeline.py +++ b/samples/ibm-samples/watson/watson_train_serve_pipeline.py @@ -74,5 +74,5 @@ def kfp_wml_pipeline( if __name__ == '__main__': # compile the pipeline import kfp.compiler as compiler - pipeline_filename = kfp_wml_pipeline.__name__ + '.tar.gz' + pipeline_filename = kfp_wml_pipeline.__name__ + '.zip' compiler.Compiler().compile(kfp_wml_pipeline, pipeline_filename) diff --git a/samples/kubeflow-tf/kubeflow-training-classification.py b/samples/kubeflow-tf/kubeflow-training-classification.py index 22935a13a5b..14a63a5581d 100755 --- a/samples/kubeflow-tf/kubeflow-training-classification.py +++ b/samples/kubeflow-tf/kubeflow-training-classification.py @@ -112,4 +112,4 @@ def kubeflow_training(output, project, if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(kubeflow_training, __file__ + '.tar.gz') + compiler.Compiler().compile(kubeflow_training, __file__ + '.zip') diff --git a/samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb b/samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb index d2d88f39c24..2419a2a9669 100644 --- a/samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb +++ b/samples/notebooks/KubeFlow Pipeline Using TFX OSS Components.ipynb @@ -300,10 +300,10 @@ "outputs": [], "source": [ "# Compile it into a tar package.\n", - "compiler.Compiler().compile(taxi_cab_classification, 'tfx.tar.gz')\n", + "compiler.Compiler().compile(taxi_cab_classification, 'tfx.zip')\n", "\n", "# Submit a run.\n", - "run = client.run_pipeline(exp.id, 'tfx', 'tfx.tar.gz',\n", + "run = client.run_pipeline(exp.id, 'tfx', 'tfx.zip',\n", " params={'output': OUTPUT_DIR,\n", " 'project': PROJECT_NAME})" ] @@ -572,9 +572,9 @@ "metadata": {}, "outputs": [], "source": [ - "compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx.tar.gz')\n", + "compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx.zip')\n", "\n", - "run = client.run_pipeline(exp.id, 'my-tfx', 'my-tfx.tar.gz',\n", + "run = client.run_pipeline(exp.id, 'my-tfx', 'my-tfx.zip',\n", " params={'output': OUTPUT_DIR,\n", " 'project': PROJECT_NAME,\n", " 'model': DEPLOYER_MODEL,\n", @@ -633,7 +633,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.4" + "version": "3.5.4rc1" } }, "nbformat": 4, diff --git a/samples/notebooks/Lightweight Python components - basics.ipynb b/samples/notebooks/Lightweight Python components - basics.ipynb index 585e2ecf9ab..22279dc8be6 100644 --- a/samples/notebooks/Lightweight Python components - basics.ipynb +++ b/samples/notebooks/Lightweight Python components - basics.ipynb @@ -228,7 +228,7 @@ "outputs": [], "source": [ "pipeline_func = calc_pipeline\n", - "pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n", + "pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'\n", "import kfp.compiler as compiler\n", "compiler.Compiler().compile(pipeline_func, pipeline_filename)" ] @@ -278,7 +278,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.0" + "version": "3.5.4rc1" } }, "nbformat": 4, diff --git a/samples/resnet-cmle/resnet-train-pipeline.py b/samples/resnet-cmle/resnet-train-pipeline.py index 854ab507a8b..1f293e64b0d 100644 --- a/samples/resnet-cmle/resnet-train-pipeline.py +++ b/samples/resnet-cmle/resnet-train-pipeline.py @@ -105,4 +105,4 @@ def resnet_train( if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(resnet_train, __file__ + '.tar.gz') + compiler.Compiler().compile(resnet_train, __file__ + '.zip') diff --git a/samples/tfx/taxi-cab-classification-pipeline.py b/samples/tfx/taxi-cab-classification-pipeline.py index 4099cd7cd5f..c980a1830e1 100755 --- a/samples/tfx/taxi-cab-classification-pipeline.py +++ b/samples/tfx/taxi-cab-classification-pipeline.py @@ -178,4 +178,4 @@ def taxi_cab_classification( if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(taxi_cab_classification, __file__ + '.tar.gz') + compiler.Compiler().compile(taxi_cab_classification, __file__ + '.zip') diff --git a/samples/xgboost-spark/xgboost-training-cm.py b/samples/xgboost-spark/xgboost-training-cm.py index 2ef8d7506cf..4f60fef193d 100755 --- a/samples/xgboost-spark/xgboost-training-cm.py +++ b/samples/xgboost-spark/xgboost-training-cm.py @@ -204,4 +204,4 @@ def xgb_train_pipeline( if __name__ == '__main__': import kfp.compiler as compiler - compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.tar.gz') + compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.zip') diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index da82504d99f..4267521f121 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -17,6 +17,7 @@ import inspect import re import tarfile +import zipfile import yaml from .. import dsl @@ -573,10 +574,9 @@ def compile(self, pipeline_func, package_path): yaml.Dumper.ignore_aliases = lambda *args : True yaml_text = yaml.dump(workflow, default_flow_style=False) - from contextlib import closing - from io import BytesIO - with tarfile.open(package_path, "w:gz") as tar: - with closing(BytesIO(yaml_text.encode())) as yaml_file: - tarinfo = tarfile.TarInfo('pipeline.yaml') - tarinfo.size = len(yaml_file.getvalue()) - tar.addfile(tarinfo, fileobj=yaml_file) + with zipfile.ZipFile(package_path, "w") as zip: + zipinfo = zipfile.ZipInfo('pipeline.yaml') + zipinfo.compress_type = zipfile.ZIP_DEFLATED + zip.writestr(zipinfo, yaml_text) + + diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 4fa726f03ee..68a03b39a64 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -18,7 +18,7 @@ import shutil import subprocess import sys -import tarfile +import zipfile import tempfile import unittest import yaml @@ -114,9 +114,9 @@ def test_operator_to_template(self): self.maxDiff = None self.assertEqual(golden_output, compiler.Compiler()._op_to_template(op)) - def _get_yaml_from_tar(self, tar_file): - with tarfile.open(tar_file, 'r:gz') as tar: - return yaml.load(tar.extractfile(tar.getmembers()[0])) + def _get_yaml_from_zip(self, zip_file): + with zipfile.ZipFile(zip_file, 'r') as zip: + return yaml.load(zip.extract(zip.namelist()[0])) def test_basic_workflow(self): """Test compiling a basic workflow.""" @@ -125,12 +125,12 @@ def test_basic_workflow(self): sys.path.append(test_data_dir) import basic tmpdir = tempfile.mkdtemp() - package_path = os.path.join(tmpdir, 'workflow.tar.gz') + package_path = os.path.join(tmpdir, 'workflow.zip') try: compiler.Compiler().compile(basic.save_most_frequent_word, package_path) with open(os.path.join(test_data_dir, 'basic.yaml'), 'r') as f: golden = yaml.load(f) - compiled = self._get_yaml_from_tar(package_path) + compiled = self._get_yaml_from_zip(package_path) self.maxDiff = None # Comment next line for generating golden yaml. @@ -149,15 +149,15 @@ def test_composing_workflow(self): tmpdir = tempfile.mkdtemp() try: # First make sure the simple pipeline can be compiled. - simple_package_path = os.path.join(tmpdir, 'simple.tar.gz') + simple_package_path = os.path.join(tmpdir, 'simple.zip') compiler.Compiler().compile(compose.save_most_frequent_word, simple_package_path) # Then make sure the composed pipeline can be compiled and also compare with golden. - compose_package_path = os.path.join(tmpdir, 'compose.tar.gz') + compose_package_path = os.path.join(tmpdir, 'compose.zip') compiler.Compiler().compile(compose.download_save_most_frequent_word, compose_package_path) with open(os.path.join(test_data_dir, 'compose.yaml'), 'r') as f: golden = yaml.load(f) - compiled = self._get_yaml_from_tar(compose_package_path) + compiled = self._get_yaml_from_zip(compose_package_path) self.maxDiff = None # Comment next line for generating golden yaml. @@ -177,14 +177,14 @@ def test_package_compile(self): try: os.chdir(test_package_dir) subprocess.check_call(['python3', 'setup.py', 'sdist', '--format=gztar', '-d', tmpdir]) - package_path = os.path.join(tmpdir, 'testsample-0.1.tar.gz') - target_tar = os.path.join(tmpdir, 'compose.tar.gz') + package_path = os.path.join(tmpdir, 'testsample-0.1.zip') + target_zip = os.path.join(tmpdir, 'compose.zip') subprocess.check_call([ 'dsl-compile', '--package', package_path, '--namespace', 'mypipeline', - '--output', target_tar, '--function', 'download_save_most_frequent_word']) + '--output', target_zip, '--function', 'download_save_most_frequent_word']) with open(os.path.join(test_data_dir, 'compose.yaml'), 'r') as f: golden = yaml.load(f) - compiled = self._get_yaml_from_tar(target_tar) + compiled = self._get_yaml_from_zip(target_zip) self.maxDiff = None self.assertEqual(golden, compiled) @@ -197,12 +197,12 @@ def _test_py_compile(self, file_base_name): py_file = os.path.join(test_data_dir, file_base_name + '.py') tmpdir = tempfile.mkdtemp() try: - target_tar = os.path.join(tmpdir, file_base_name + '.tar.gz') + target_zip = os.path.join(tmpdir, file_base_name + '.zip') subprocess.check_call([ - 'dsl-compile', '--py', py_file, '--output', target_tar]) + 'dsl-compile', '--py', py_file, '--output', target_zip]) with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f: golden = yaml.load(f) - compiled = self._get_yaml_from_tar(target_tar) + compiled = self._get_yaml_from_zip(target_zip) self.maxDiff = None self.assertEqual(golden, compiled) From cafb694760c8b48fa1c2bbb71fbeeec888646d05 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Feb 2019 11:41:02 -0800 Subject: [PATCH 2/9] minor fix --- sdk/python/tests/compiler/compiler_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 68a03b39a64..d17a7e65aa1 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -177,7 +177,7 @@ def test_package_compile(self): try: os.chdir(test_package_dir) subprocess.check_call(['python3', 'setup.py', 'sdist', '--format=gztar', '-d', tmpdir]) - package_path = os.path.join(tmpdir, 'testsample-0.1.zip') + package_path = os.path.join(tmpdir, 'testsample-0.1.tar.gz') target_zip = os.path.join(tmpdir, 'compose.zip') subprocess.check_call([ 'dsl-compile', '--package', package_path, '--namespace', 'mypipeline', From 828abbdbc343abe5531721c926f98c621ae7fae5 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Feb 2019 12:13:59 -0800 Subject: [PATCH 3/9] fix zip read in the unit test --- sdk/python/tests/compiler/compiler_tests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index d17a7e65aa1..7afd7ec0933 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -116,7 +116,8 @@ def test_operator_to_template(self): def _get_yaml_from_zip(self, zip_file): with zipfile.ZipFile(zip_file, 'r') as zip: - return yaml.load(zip.extract(zip.namelist()[0])) + with open(zip.extract(zip.namelist()[0]), 'r') as yaml_file: + return yaml.load(yaml_file) def test_basic_workflow(self): """Test compiling a basic workflow.""" From 6877af426a9b98572bda0ff7ef243e893c0682f2 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Feb 2019 13:15:19 -0800 Subject: [PATCH 4/9] update sample tests --- test/sample-test/run_test.sh | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/test/sample-test/run_test.sh b/test/sample-test/run_test.sh index 7a4255a355e..be87ddec7af 100755 --- a/test/sample-test/run_test.sh +++ b/test/sample-test/run_test.sh @@ -153,10 +153,10 @@ if [ "$TEST_NAME" == 'tf-training' ]; then sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" kubeflow-training-classification.py fi - dsl-compile --py kubeflow-training-classification.py --output kubeflow-training-classification.tar.gz + dsl-compile --py kubeflow-training-classification.py --output kubeflow-training-classification.zip cd "${TEST_DIR}" - python3 run_kubeflow_test.py --input ${BASE_DIR}/samples/kubeflow-tf/kubeflow-training-classification.tar.gz --result $SAMPLE_KUBEFLOW_TEST_RESULT --output $SAMPLE_KUBEFLOW_TEST_OUTPUT --namespace ${NAMESPACE} + python3 run_kubeflow_test.py --input ${BASE_DIR}/samples/kubeflow-tf/kubeflow-training-classification.zip --result $SAMPLE_KUBEFLOW_TEST_RESULT --output $SAMPLE_KUBEFLOW_TEST_OUTPUT --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_KUBEFLOW_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_KUBEFLOW_TEST_RESULT} @@ -178,9 +178,9 @@ elif [ "$TEST_NAME" == "tfx" ]; then sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" taxi-cab-classification-pipeline.py fi - dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz + dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.zip cd "${TEST_DIR}" - python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline.tar.gz --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE} + python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline.zip --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_TFX_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_TFX_TEST_RESULT} elif [ "$TEST_NAME" == "sequential" ]; then @@ -189,10 +189,10 @@ elif [ "$TEST_NAME" == "sequential" ]; then # Compile samples cd ${BASE_DIR}/samples/basic - dsl-compile --py sequential.py --output sequential.tar.gz + dsl-compile --py sequential.py --output sequential.zip cd "${TEST_DIR}" - python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/sequential.tar.gz --result $SAMPLE_SEQUENTIAL_TEST_RESULT --output $SAMPLE_SEQUENTIAL_TEST_OUTPUT --testname sequential --namespace ${NAMESPACE} + python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/sequential.zip --result $SAMPLE_SEQUENTIAL_TEST_RESULT --output $SAMPLE_SEQUENTIAL_TEST_OUTPUT --testname sequential --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_SEQUENTIAL_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_SEQUENTIAL_TEST_RESULT} @@ -202,10 +202,10 @@ elif [ "$TEST_NAME" == "condition" ]; then # Compile samples cd ${BASE_DIR}/samples/basic - dsl-compile --py condition.py --output condition.tar.gz + dsl-compile --py condition.py --output condition.zip cd "${TEST_DIR}" - python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/condition.tar.gz --result $SAMPLE_CONDITION_TEST_RESULT --output $SAMPLE_CONDITION_TEST_OUTPUT --testname condition --namespace ${NAMESPACE} + python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/condition.zip --result $SAMPLE_CONDITION_TEST_RESULT --output $SAMPLE_CONDITION_TEST_OUTPUT --testname condition --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_CONDITION_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_CONDITION_TEST_RESULT} @@ -215,10 +215,10 @@ elif [ "$TEST_NAME" == "exithandler" ]; then # Compile samples cd ${BASE_DIR}/samples/basic - dsl-compile --py exit_handler.py --output exit_handler.tar.gz + dsl-compile --py exit_handler.py --output exit_handler.zip cd "${TEST_DIR}" - python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/exit_handler.tar.gz --result $SAMPLE_EXIT_HANDLER_TEST_RESULT --output $SAMPLE_EXIT_HANDLER_TEST_OUTPUT --testname exithandler --namespace ${NAMESPACE} + python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/exit_handler.zip --result $SAMPLE_EXIT_HANDLER_TEST_RESULT --output $SAMPLE_EXIT_HANDLER_TEST_OUTPUT --testname exithandler --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_EXIT_HANDLER_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_EXIT_HANDLER_TEST_RESULT} @@ -228,10 +228,10 @@ elif [ "$TEST_NAME" == "immediatevalue" ]; then # Compile samples cd ${BASE_DIR}/samples/basic - dsl-compile --py immediate_value.py --output immediate_value.tar.gz + dsl-compile --py immediate_value.py --output immediate_value.zip cd "${TEST_DIR}" - python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/immediate_value.tar.gz --result $SAMPLE_IMMEDIATE_VALUE_TEST_RESULT --output $SAMPLE_IMMEDIATE_VALUE_TEST_OUTPUT --testname immediatevalue --namespace ${NAMESPACE} + python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/immediate_value.zip --result $SAMPLE_IMMEDIATE_VALUE_TEST_RESULT --output $SAMPLE_IMMEDIATE_VALUE_TEST_OUTPUT --testname immediatevalue --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_IMMEDIATE_VALUE_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_IMMEDIATE_VALUE_TEST_RESULT} @@ -241,10 +241,10 @@ elif [ "$TEST_NAME" == "paralleljoin" ]; then # Compile samples cd ${BASE_DIR}/samples/basic - dsl-compile --py parallel_join.py --output parallel_join.tar.gz + dsl-compile --py parallel_join.py --output parallel_join.zip cd "${TEST_DIR}" - python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/parallel_join.tar.gz --result $SAMPLE_PARALLEL_JOIN_TEST_RESULT --output $SAMPLE_PARALLEL_JOIN_TEST_OUTPUT --testname paralleljoin --namespace ${NAMESPACE} + python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/parallel_join.zip --result $SAMPLE_PARALLEL_JOIN_TEST_RESULT --output $SAMPLE_PARALLEL_JOIN_TEST_OUTPUT --testname paralleljoin --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_PARALLEL_JOIN_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_PARALLEL_JOIN_TEST_RESULT} @@ -265,10 +265,10 @@ elif [ "$TEST_NAME" == "xgboost" ]; then sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" xgboost-training-cm.py sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" xgboost-training-cm.py fi - dsl-compile --py xgboost-training-cm.py --output xgboost-training-cm.tar.gz + dsl-compile --py xgboost-training-cm.py --output xgboost-training-cm.zip cd "${TEST_DIR}" - python3 run_xgboost_test.py --input ${BASE_DIR}/samples/xgboost-spark/xgboost-training-cm.tar.gz --result $SAMPLE_XGBOOST_TEST_RESULT --output $SAMPLE_XGBOOST_TEST_OUTPUT --namespace ${NAMESPACE} + python3 run_xgboost_test.py --input ${BASE_DIR}/samples/xgboost-spark/xgboost-training-cm.zip --result $SAMPLE_XGBOOST_TEST_RESULT --output $SAMPLE_XGBOOST_TEST_OUTPUT --namespace ${NAMESPACE} echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/" gsutil cp ${SAMPLE_XGBOOST_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_XGBOOST_TEST_RESULT} From 8187f52e1598d56362b9a93b7e2e7578e2c4193d Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Mar 2019 16:23:53 -0700 Subject: [PATCH 5/9] dsl compiler generates pipeline based on the input name suffix --- sdk/python/kfp/compiler/compiler.py | 30 ++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 6beb21ebeaf..a5fce5de187 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -590,17 +590,25 @@ def compile(self, pipeline_func, package_path, type_check=False): yaml.Dumper.ignore_aliases = lambda *args : True yaml_text = yaml.dump(workflow, default_flow_style=False) - from contextlib import closing - from io import BytesIO - with tarfile.open(package_path, "w:gz") as tar: - with closing(BytesIO(yaml_text.encode())) as yaml_file: - tarinfo = tarfile.TarInfo('pipeline.yaml') - tarinfo.size = len(yaml_file.getvalue()) - tar.addfile(tarinfo, fileobj=yaml_file) + if package_path.endswith('.tar.gz') or package_path.endswith('.tgz'): + from contextlib import closing + from io import BytesIO + with tarfile.open(package_path, "w:gz") as tar: + with closing(BytesIO(yaml_text.encode())) as yaml_file: + tarinfo = tarfile.TarInfo('pipeline.yaml') + tarinfo.size = len(yaml_file.getvalue()) + tar.addfile(tarinfo, fileobj=yaml_file) + elif package_path.endswith('.zip'): + with zipfile.ZipFile(package_path, "w") as zip: + zipinfo = zipfile.ZipInfo('pipeline.yaml') + zipinfo.compress_type = zipfile.ZIP_DEFLATED + zip.writestr(zipinfo, yaml_text) + elif package_path.endswith('.yaml') or package_path.endswith('.yml'): + with open(package_path, 'w') as yaml_file: + yaml_file.write(yaml_text) + else: + raise ValueError('The output path '+ package_path + ' should ends with one of the following formats: [.tar.gz, .tgz, .zip, .yaml, .yml]') finally: kfp.TYPE_CHECK = type_check_old_value - # with zipfile.ZipFile(package_path, "w") as zip: - # zipinfo = zipfile.ZipInfo('pipeline.yaml') - # zipinfo.compress_type = zipfile.ZIP_DEFLATED - # zip.writestr(zipinfo, yaml_text) + From f5f7e35dfd0c44d697d1feab78f6be0fd740d2d8 Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Mar 2019 17:21:36 -0700 Subject: [PATCH 6/9] add unit tests for different output format --- sdk/python/tests/compiler/compiler_tests.py | 57 ++++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 14fd63fdb10..57e18901f4e 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -19,6 +19,7 @@ import subprocess import sys import zipfile +import tarfile import tempfile import unittest import yaml @@ -123,6 +124,10 @@ def _get_yaml_from_zip(self, zip_file): with open(zip.extract(zip.namelist()[0]), 'r') as yaml_file: return yaml.load(yaml_file) + def _get_yaml_from_tar(self, tar_file): + with tarfile.open(tar_file, 'r:gz') as tar: + return yaml.load(tar.extractfile(tar.getmembers()[0])) + def test_basic_workflow(self): """Test compiling a basic workflow.""" @@ -197,7 +202,7 @@ def test_package_compile(self): shutil.rmtree(tmpdir) os.chdir(cwd) - def _test_py_compile(self, file_base_name): + def _test_py_compile_zip(self, file_base_name): test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') py_file = os.path.join(test_data_dir, file_base_name + '.py') tmpdir = tempfile.mkdtemp() @@ -214,33 +219,69 @@ def _test_py_compile(self, file_base_name): finally: shutil.rmtree(tmpdir) + def _test_py_compile_targz(self, file_base_name): + test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') + py_file = os.path.join(test_data_dir, file_base_name + '.py') + tmpdir = tempfile.mkdtemp() + try: + target_tar = os.path.join(tmpdir, file_base_name + '.tar.gz') + subprocess.check_call([ + 'dsl-compile', '--py', py_file, '--output', target_tar]) + with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f: + golden = yaml.load(f) + compiled = self._get_yaml_from_tar(target_tar) + + self.maxDiff = None + self.assertEqual(golden, compiled) + finally: + shutil.rmtree(tmpdir) + + def _test_py_compile_yaml(self, file_base_name): + test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') + py_file = os.path.join(test_data_dir, file_base_name + '.py') + tmpdir = tempfile.mkdtemp() + try: + target_yaml = os.path.join(tmpdir, file_base_name + '-pipeline.yaml') + subprocess.check_call([ + 'dsl-compile', '--py', py_file, '--output', target_yaml]) + with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f: + golden = yaml.load(f) + + with open(os.path.join(test_data_dir, target_yaml), 'r') as f: + compiled = yaml.load(f) + + self.maxDiff = None + self.assertEqual(golden, compiled) + finally: + shutil.rmtree(tmpdir) + def test_py_compile_basic(self): """Test basic sequential pipeline.""" - self._test_py_compile('basic') + self._test_py_compile_zip('basic') def test_py_compile_condition(self): """Test a pipeline with conditions.""" - self._test_py_compile('coin') + self._test_py_compile_zip('coin') def test_py_compile_immediate_value(self): """Test a pipeline with immediate value parameter.""" - self._test_py_compile('immediate_value') + self._test_py_compile_targz('immediate_value') def test_py_compile_default_value(self): """Test a pipeline with a parameter with default value.""" - self._test_py_compile('default_value') + self._test_py_compile_targz('default_value') def test_py_volume(self): """Test a pipeline with a volume and volume mount.""" - self._test_py_compile('volume') + self._test_py_compile_yaml('volume') def test_py_retry(self): """Test retry functionality.""" - self._test_py_compile('retry') + self._test_py_compile_yaml('retry') def test_py_image_pull_secret(self): """Test pipeline imagepullsecret.""" - self._test_py_compile('imagepullsecret') + self._test_py_compile_yaml('imagepullsecret') def test_type_checking_with_consistent_types(self): """Test type check pipeline parameters against component metadata.""" From f8b7e2a76a398e4b822cc842f1c3280cce9bc81d Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Mon, 25 Mar 2019 18:23:48 -0700 Subject: [PATCH 7/9] update the sdk client to support tar zip and yaml --- sdk/python/kfp/_client.py | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/sdk/python/kfp/_client.py b/sdk/python/kfp/_client.py index e8c0c56c161..aafad8ab9b3 100644 --- a/sdk/python/kfp/_client.py +++ b/sdk/python/kfp/_client.py @@ -18,6 +18,7 @@ import json import os import tarfile +import zipfile import yaml from datetime import datetime @@ -165,18 +166,36 @@ def get_experiment(self, experiment_id=None, experiment_name=None): return self._experiment_api.get_experiment(id=experiment.id) raise ValueError('No experiment is found with name {}.'.format(experiment_name)) - def _extract_pipeline_yaml(self, tar_file): - with tarfile.open(tar_file, "r:gz") as tar: - all_yaml_files = [m for m in tar if m.isfile() and - (os.path.splitext(m.name)[-1] == '.yaml' or os.path.splitext(m.name)[-1] == '.yml')] - if len(all_yaml_files) == 0: - raise ValueError('Invalid package. Missing pipeline yaml file in the package.') + def _extract_pipeline_yaml(self, package_file): + if package_file.endswith('.tar.gz') or package_file.endswith('.tgz'): + with tarfile.open(package_file, "r:gz") as tar: + all_yaml_files = [m for m in tar if m.isfile() and + (os.path.splitext(m.name)[-1] == '.yaml' or os.path.splitext(m.name)[-1] == '.yml')] + if len(all_yaml_files) == 0: + raise ValueError('Invalid package. Missing pipeline yaml file in the package.') - if len(all_yaml_files) > 1: - raise ValueError('Invalid package. Multiple yaml files in the package.') + if len(all_yaml_files) > 1: + raise ValueError('Invalid package. Multiple yaml files in the package.') - with tar.extractfile(all_yaml_files[0]) as f: + with tar.extractfile(all_yaml_files[0]) as f: + return yaml.load(f) + elif package_file.endswith('.zip'): + with zipfile.ZipFile(package_file, 'r') as zip: + all_yaml_files = [m for m in zip.namelist() if + (os.path.splitext(m)[-1] == '.yaml' or os.path.splitext(m)[-1] == '.yml')] + if len(all_yaml_files) == 0: + raise ValueError('Invalid package. Missing pipeline yaml file in the package.') + + if len(all_yaml_files) > 1: + raise ValueError('Invalid package. Multiple yaml files in the package.') + + with zip.extractfile(all_yaml_files[0]) as f: + return yaml.load(f) + elif package_file.endswith('.yaml') or package_file.endswith('.yml'): + with open(package_file, 'r') as f: return yaml.load(f) + else: + raise ValueError('The package_file '+ package_file + ' should ends with one of the following formats: [.tar.gz, .tgz, .zip, .yaml, .yml]') def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, params={}, pipeline_id=None): """Run a specified pipeline. @@ -184,7 +203,7 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path=None, para Args: experiment_id: The string id of an experiment. job_name: name of the job. - pipeline_package_path: local path of the pipeline package(tar.gz file). + pipeline_package_path: local path of the pipeline package(the filename should end with one of the following .tar.gz, .tgz, .zip, .yaml, .yml). params: a dictionary with key (string) as param name and value (string) as as param value. pipeline_id: the string ID of a pipeline. From 5ea9b39494eda5814384b5b0a62eccbf85e9648f Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 26 Mar 2019 10:07:25 -0700 Subject: [PATCH 8/9] fix typo --- sdk/python/kfp/_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/kfp/_client.py b/sdk/python/kfp/_client.py index aafad8ab9b3..d52aa59b6d0 100644 --- a/sdk/python/kfp/_client.py +++ b/sdk/python/kfp/_client.py @@ -189,7 +189,7 @@ def _extract_pipeline_yaml(self, package_file): if len(all_yaml_files) > 1: raise ValueError('Invalid package. Multiple yaml files in the package.') - with zip.extractfile(all_yaml_files[0]) as f: + with zip.extract(all_yaml_files[0]) as f: return yaml.load(f) elif package_file.endswith('.yaml') or package_file.endswith('.yml'): with open(package_file, 'r') as f: From d7eed55257a74f5b15acc27ecc8be3c912967abd Mon Sep 17 00:00:00 2001 From: Ning Gao Date: Tue, 26 Mar 2019 12:03:08 -0700 Subject: [PATCH 9/9] fix file write --- sdk/python/kfp/_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/python/kfp/_client.py b/sdk/python/kfp/_client.py index d52aa59b6d0..4e0f44fe74e 100644 --- a/sdk/python/kfp/_client.py +++ b/sdk/python/kfp/_client.py @@ -189,7 +189,8 @@ def _extract_pipeline_yaml(self, package_file): if len(all_yaml_files) > 1: raise ValueError('Invalid package. Multiple yaml files in the package.') - with zip.extract(all_yaml_files[0]) as f: + filename = zip.extract(all_yaml_files[0]) + with open(filename, 'r') as f: return yaml.load(f) elif package_file.endswith('.yaml') or package_file.endswith('.yml'): with open(package_file, 'r') as f: