dsl generate zip file #855

Merged 10 commits on Mar 26, 2019
Changes from 4 commits
2 changes: 1 addition & 1 deletion samples/basic/condition.py
@@ -76,4 +76,4 @@ def flipcoin():

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(flipcoin, __file__ + '.tar.gz')
compiler.Compiler().compile(flipcoin, __file__ + '.zip')
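With this change, each sample's __main__ block emits a .zip package next to the script instead of a .tar.gz. A quick way to sanity-check one of the compiled samples (an illustrative sketch, not part of this PR; the path assumes condition.py was run from the repository root):

import zipfile

# e.g. after running `python3 samples/basic/condition.py`
with zipfile.ZipFile('samples/basic/condition.py.zip') as zf:
    # The compiler writes a single 'pipeline.yaml' entry (see compiler.py below).
    print(zf.namelist())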
2 changes: 1 addition & 1 deletion samples/basic/exit_handler.py
@@ -46,4 +46,4 @@ def download_and_print(url='gs://ml-pipeline-playground/shakespeare1.txt'):

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(download_and_print, __file__ + '.tar.gz')
compiler.Compiler().compile(download_and_print, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/basic/immediate_value.py
@@ -40,4 +40,4 @@ def immediate_value_pipeline():

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(immediate_value_pipeline, __file__ + '.tar.gz')
compiler.Compiler().compile(immediate_value_pipeline, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/basic/parallel_join.py
@@ -48,4 +48,4 @@ def download_and_join(

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(download_and_join, __file__ + '.tar.gz')
compiler.Compiler().compile(download_and_join, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/basic/retry.py
@@ -39,4 +39,4 @@ def retry_sample_pipeline():

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.tar.gz')
compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/basic/sequential.py
@@ -38,4 +38,4 @@ def sequential_pipeline(url='gs://ml-pipeline-playground/shakespeare1.txt'):

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(sequential_pipeline, __file__ + '.tar.gz')
compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/ibm-samples/ffdl-seldon/ffdl_pipeline.py
@@ -57,4 +57,4 @@ def ffdlPipeline(

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(ffdlPipeline, __file__ + '.tar.gz')
compiler.Compiler().compile(ffdlPipeline, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/ibm-samples/watson/watson_train_serve_pipeline.py
@@ -74,5 +74,5 @@ def kfp_wml_pipeline(
if __name__ == '__main__':
# compile the pipeline
import kfp.compiler as compiler
pipeline_filename = kfp_wml_pipeline.__name__ + '.tar.gz'
pipeline_filename = kfp_wml_pipeline.__name__ + '.zip'
compiler.Compiler().compile(kfp_wml_pipeline, pipeline_filename)
2 changes: 1 addition & 1 deletion samples/kubeflow-tf/kubeflow-training-classification.py
@@ -112,4 +112,4 @@ def kubeflow_training(output, project,

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(kubeflow_training, __file__ + '.tar.gz')
compiler.Compiler().compile(kubeflow_training, __file__ + '.zip')
@@ -300,10 +300,10 @@
"outputs": [],
"source": [
"# Compile it into a tar package.\n",
"compiler.Compiler().compile(taxi_cab_classification, 'tfx.tar.gz')\n",
"compiler.Compiler().compile(taxi_cab_classification, 'tfx.zip')\n",
"\n",
"# Submit a run.\n",
"run = client.run_pipeline(exp.id, 'tfx', 'tfx.tar.gz',\n",
"run = client.run_pipeline(exp.id, 'tfx', 'tfx.zip',\n",
" params={'output': OUTPUT_DIR,\n",
" 'project': PROJECT_NAME})"
]
@@ -572,9 +572,9 @@
"metadata": {},
"outputs": [],
"source": [
"compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx.tar.gz')\n",
"compiler.Compiler().compile(my_taxi_cab_classification, 'my-tfx.zip')\n",
"\n",
"run = client.run_pipeline(exp.id, 'my-tfx', 'my-tfx.tar.gz',\n",
"run = client.run_pipeline(exp.id, 'my-tfx', 'my-tfx.zip',\n",
" params={'output': OUTPUT_DIR,\n",
" 'project': PROJECT_NAME,\n",
" 'model': DEPLOYER_MODEL,\n",
@@ -633,7 +633,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.5.4rc1"
}
},
"nbformat": 4,
@@ -228,7 +228,7 @@
"outputs": [],
"source": [
"pipeline_func = calc_pipeline\n",
"pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n",
"pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'\n",
"import kfp.compiler as compiler\n",
"compiler.Compiler().compile(pipeline_func, pipeline_filename)"
]
@@ -278,7 +278,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
"version": "3.5.4rc1"
}
},
"nbformat": 4,
2 changes: 1 addition & 1 deletion samples/resnet-cmle/resnet-train-pipeline.py
@@ -105,4 +105,4 @@ def resnet_train(

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(resnet_train, __file__ + '.tar.gz')
compiler.Compiler().compile(resnet_train, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/tfx/taxi-cab-classification-pipeline.py
@@ -178,4 +178,4 @@ def taxi_cab_classification(

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(taxi_cab_classification, __file__ + '.tar.gz')
compiler.Compiler().compile(taxi_cab_classification, __file__ + '.zip')
2 changes: 1 addition & 1 deletion samples/xgboost-spark/xgboost-training-cm.py
@@ -204,4 +204,4 @@ def xgb_train_pipeline(

if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.tar.gz')
compiler.Compiler().compile(xgb_train_pipeline, __file__ + '.zip')
14 changes: 7 additions & 7 deletions sdk/python/kfp/compiler/compiler.py
@@ -17,6 +17,7 @@
import inspect
import re
import tarfile
import zipfile
import yaml

from .. import dsl
@@ -573,10 +574,9 @@ def compile(self, pipeline_func, package_path):
yaml.Dumper.ignore_aliases = lambda *args : True
yaml_text = yaml.dump(workflow, default_flow_style=False)

from contextlib import closing
from io import BytesIO
with tarfile.open(package_path, "w:gz") as tar:
with closing(BytesIO(yaml_text.encode())) as yaml_file:
tarinfo = tarfile.TarInfo('pipeline.yaml')
tarinfo.size = len(yaml_file.getvalue())
tar.addfile(tarinfo, fileobj=yaml_file)
with zipfile.ZipFile(package_path, "w") as zip:
zipinfo = zipfile.ZipInfo('pipeline.yaml')
zipinfo.compress_type = zipfile.ZIP_DEFLATED
zip.writestr(zipinfo, yaml_text)


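The compile step above now serializes the workflow to YAML and stores it as a single DEFLATE-compressed 'pipeline.yaml' entry in a zip archive, replacing the earlier tarfile-based packaging. A minimal sketch of reading such a package back (illustrative only; load_workflow is not part of this PR):

import zipfile
import yaml

def load_workflow(package_path):
    # package_path is the .zip produced by compiler.Compiler().compile(...)
    with zipfile.ZipFile(package_path, 'r') as zf:
        # The compiler stores exactly one entry named 'pipeline.yaml'.
        yaml_text = zf.read('pipeline.yaml').decode('utf-8')
    return yaml.safe_load(yaml_text)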
31 changes: 16 additions & 15 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -18,7 +18,7 @@
import shutil
import subprocess
import sys
import tarfile
import zipfile
import tempfile
import unittest
import yaml
@@ -114,9 +114,10 @@ def test_operator_to_template(self):
self.maxDiff = None
self.assertEqual(golden_output, compiler.Compiler()._op_to_template(op))

def _get_yaml_from_tar(self, tar_file):
with tarfile.open(tar_file, 'r:gz') as tar:
return yaml.load(tar.extractfile(tar.getmembers()[0]))
def _get_yaml_from_zip(self, zip_file):
with zipfile.ZipFile(zip_file, 'r') as zip:
with open(zip.extract(zip.namelist()[0]), 'r') as yaml_file:
return yaml.load(yaml_file)

def test_basic_workflow(self):
"""Test compiling a basic workflow."""
@@ -125,12 +126,12 @@ def test_basic_workflow(self):
sys.path.append(test_data_dir)
import basic
tmpdir = tempfile.mkdtemp()
package_path = os.path.join(tmpdir, 'workflow.tar.gz')
package_path = os.path.join(tmpdir, 'workflow.zip')
try:
compiler.Compiler().compile(basic.save_most_frequent_word, package_path)
with open(os.path.join(test_data_dir, 'basic.yaml'), 'r') as f:
golden = yaml.load(f)
compiled = self._get_yaml_from_tar(package_path)
compiled = self._get_yaml_from_zip(package_path)

self.maxDiff = None
# Comment next line for generating golden yaml.
@@ -149,15 +150,15 @@ def test_composing_workflow(self):
tmpdir = tempfile.mkdtemp()
try:
# First make sure the simple pipeline can be compiled.
simple_package_path = os.path.join(tmpdir, 'simple.tar.gz')
simple_package_path = os.path.join(tmpdir, 'simple.zip')
compiler.Compiler().compile(compose.save_most_frequent_word, simple_package_path)

# Then make sure the composed pipeline can be compiled and also compare with golden.
compose_package_path = os.path.join(tmpdir, 'compose.tar.gz')
compose_package_path = os.path.join(tmpdir, 'compose.zip')
compiler.Compiler().compile(compose.download_save_most_frequent_word, compose_package_path)
with open(os.path.join(test_data_dir, 'compose.yaml'), 'r') as f:
golden = yaml.load(f)
compiled = self._get_yaml_from_tar(compose_package_path)
compiled = self._get_yaml_from_zip(compose_package_path)

self.maxDiff = None
# Comment next line for generating golden yaml.
@@ -178,13 +179,13 @@ def test_package_compile(self):
os.chdir(test_package_dir)
subprocess.check_call(['python3', 'setup.py', 'sdist', '--format=gztar', '-d', tmpdir])
package_path = os.path.join(tmpdir, 'testsample-0.1.tar.gz')
target_tar = os.path.join(tmpdir, 'compose.tar.gz')
target_zip = os.path.join(tmpdir, 'compose.zip')
subprocess.check_call([
'dsl-compile', '--package', package_path, '--namespace', 'mypipeline',
'--output', target_tar, '--function', 'download_save_most_frequent_word'])
'--output', target_zip, '--function', 'download_save_most_frequent_word'])
with open(os.path.join(test_data_dir, 'compose.yaml'), 'r') as f:
golden = yaml.load(f)
compiled = self._get_yaml_from_tar(target_tar)
compiled = self._get_yaml_from_zip(target_zip)

self.maxDiff = None
self.assertEqual(golden, compiled)
@@ -197,12 +198,12 @@ def _test_py_compile(self, file_base_name):
py_file = os.path.join(test_data_dir, file_base_name + '.py')
tmpdir = tempfile.mkdtemp()
try:
target_tar = os.path.join(tmpdir, file_base_name + '.tar.gz')
target_zip = os.path.join(tmpdir, file_base_name + '.zip')
subprocess.check_call([
'dsl-compile', '--py', py_file, '--output', target_tar])
'dsl-compile', '--py', py_file, '--output', target_zip])
with open(os.path.join(test_data_dir, file_base_name + '.yaml'), 'r') as f:
golden = yaml.load(f)
compiled = self._get_yaml_from_tar(target_tar)
compiled = self._get_yaml_from_zip(target_zip)

self.maxDiff = None
self.assertEqual(golden, compiled)
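Note that the updated _get_yaml_from_zip helper extracts the archive entry to disk before opening it. An equivalent in-memory variant would look roughly like this (a sketch, not code from this PR):

import zipfile
import yaml

def _get_yaml_from_zip(self, zip_file):
    with zipfile.ZipFile(zip_file, 'r') as zf:
        # Read the single entry directly instead of extracting it to the working directory.
        return yaml.load(zf.read(zf.namelist()[0]).decode('utf-8'))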
32 changes: 16 additions & 16 deletions test/sample-test/run_test.sh
@@ -153,10 +153,10 @@ if [ "$TEST_NAME" == 'tf-training' ]; then
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" kubeflow-training-classification.py
fi

dsl-compile --py kubeflow-training-classification.py --output kubeflow-training-classification.tar.gz
dsl-compile --py kubeflow-training-classification.py --output kubeflow-training-classification.zip

cd "${TEST_DIR}"
python3 run_kubeflow_test.py --input ${BASE_DIR}/samples/kubeflow-tf/kubeflow-training-classification.tar.gz --result $SAMPLE_KUBEFLOW_TEST_RESULT --output $SAMPLE_KUBEFLOW_TEST_OUTPUT --namespace ${NAMESPACE}
python3 run_kubeflow_test.py --input ${BASE_DIR}/samples/kubeflow-tf/kubeflow-training-classification.zip --result $SAMPLE_KUBEFLOW_TEST_RESULT --output $SAMPLE_KUBEFLOW_TEST_OUTPUT --namespace ${NAMESPACE}

echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_KUBEFLOW_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_KUBEFLOW_TEST_RESULT}
@@ -178,9 +178,9 @@ elif [ "$TEST_NAME" == "tfx" ]; then
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" taxi-cab-classification-pipeline.py
fi

dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.tar.gz
dsl-compile --py taxi-cab-classification-pipeline.py --output taxi-cab-classification-pipeline.zip
cd "${TEST_DIR}"
python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline.tar.gz --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE}
python3 run_tfx_test.py --input ${BASE_DIR}/samples/tfx/taxi-cab-classification-pipeline.zip --result $SAMPLE_TFX_TEST_RESULT --output $SAMPLE_TFX_TEST_OUTPUT --namespace ${NAMESPACE}
echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_TFX_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_TFX_TEST_RESULT}
elif [ "$TEST_NAME" == "sequential" ]; then
@@ -189,10 +189,10 @@ elif [ "$TEST_NAME" == "sequential" ]; then

# Compile samples
cd ${BASE_DIR}/samples/basic
dsl-compile --py sequential.py --output sequential.tar.gz
dsl-compile --py sequential.py --output sequential.zip

cd "${TEST_DIR}"
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/sequential.tar.gz --result $SAMPLE_SEQUENTIAL_TEST_RESULT --output $SAMPLE_SEQUENTIAL_TEST_OUTPUT --testname sequential --namespace ${NAMESPACE}
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/sequential.zip --result $SAMPLE_SEQUENTIAL_TEST_RESULT --output $SAMPLE_SEQUENTIAL_TEST_OUTPUT --testname sequential --namespace ${NAMESPACE}

echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_SEQUENTIAL_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_SEQUENTIAL_TEST_RESULT}
@@ -202,10 +202,10 @@ elif [ "$TEST_NAME" == "condition" ]; then

# Compile samples
cd ${BASE_DIR}/samples/basic
dsl-compile --py condition.py --output condition.tar.gz
dsl-compile --py condition.py --output condition.zip

cd "${TEST_DIR}"
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/condition.tar.gz --result $SAMPLE_CONDITION_TEST_RESULT --output $SAMPLE_CONDITION_TEST_OUTPUT --testname condition --namespace ${NAMESPACE}
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/condition.zip --result $SAMPLE_CONDITION_TEST_RESULT --output $SAMPLE_CONDITION_TEST_OUTPUT --testname condition --namespace ${NAMESPACE}

echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_CONDITION_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_CONDITION_TEST_RESULT}
@@ -215,10 +215,10 @@ elif [ "$TEST_NAME" == "exithandler" ]; then

# Compile samples
cd ${BASE_DIR}/samples/basic
dsl-compile --py exit_handler.py --output exit_handler.tar.gz
dsl-compile --py exit_handler.py --output exit_handler.zip

cd "${TEST_DIR}"
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/exit_handler.tar.gz --result $SAMPLE_EXIT_HANDLER_TEST_RESULT --output $SAMPLE_EXIT_HANDLER_TEST_OUTPUT --testname exithandler --namespace ${NAMESPACE}
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/exit_handler.zip --result $SAMPLE_EXIT_HANDLER_TEST_RESULT --output $SAMPLE_EXIT_HANDLER_TEST_OUTPUT --testname exithandler --namespace ${NAMESPACE}

echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_EXIT_HANDLER_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_EXIT_HANDLER_TEST_RESULT}
@@ -228,10 +228,10 @@ elif [ "$TEST_NAME" == "immediatevalue" ]; then

# Compile samples
cd ${BASE_DIR}/samples/basic
dsl-compile --py immediate_value.py --output immediate_value.tar.gz
dsl-compile --py immediate_value.py --output immediate_value.zip

cd "${TEST_DIR}"
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/immediate_value.tar.gz --result $SAMPLE_IMMEDIATE_VALUE_TEST_RESULT --output $SAMPLE_IMMEDIATE_VALUE_TEST_OUTPUT --testname immediatevalue --namespace ${NAMESPACE}
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/immediate_value.zip --result $SAMPLE_IMMEDIATE_VALUE_TEST_RESULT --output $SAMPLE_IMMEDIATE_VALUE_TEST_OUTPUT --testname immediatevalue --namespace ${NAMESPACE}

echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_IMMEDIATE_VALUE_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_IMMEDIATE_VALUE_TEST_RESULT}
@@ -241,10 +241,10 @@ elif [ "$TEST_NAME" == "paralleljoin" ]; then

# Compile samples
cd ${BASE_DIR}/samples/basic
dsl-compile --py parallel_join.py --output parallel_join.tar.gz
dsl-compile --py parallel_join.py --output parallel_join.zip

cd "${TEST_DIR}"
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/parallel_join.tar.gz --result $SAMPLE_PARALLEL_JOIN_TEST_RESULT --output $SAMPLE_PARALLEL_JOIN_TEST_OUTPUT --testname paralleljoin --namespace ${NAMESPACE}
python3 run_basic_test.py --input ${BASE_DIR}/samples/basic/parallel_join.zip --result $SAMPLE_PARALLEL_JOIN_TEST_RESULT --output $SAMPLE_PARALLEL_JOIN_TEST_OUTPUT --testname paralleljoin --namespace ${NAMESPACE}

echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_PARALLEL_JOIN_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_PARALLEL_JOIN_TEST_RESULT}
@@ -265,10 +265,10 @@ elif [ "$TEST_NAME" == "xgboost" ]; then
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-roc:\([a-zA-Z0-9_.-]\)\+|${LOCAL_ROC_IMAGE}|g" xgboost-training-cm.py
sed -i -e "s|gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:\([a-zA-Z0-9_.-]\)\+|${LOCAL_CONFUSIONMATRIX_IMAGE}|g" xgboost-training-cm.py
fi
dsl-compile --py xgboost-training-cm.py --output xgboost-training-cm.tar.gz
dsl-compile --py xgboost-training-cm.py --output xgboost-training-cm.zip

cd "${TEST_DIR}"
python3 run_xgboost_test.py --input ${BASE_DIR}/samples/xgboost-spark/xgboost-training-cm.tar.gz --result $SAMPLE_XGBOOST_TEST_RESULT --output $SAMPLE_XGBOOST_TEST_OUTPUT --namespace ${NAMESPACE}
python3 run_xgboost_test.py --input ${BASE_DIR}/samples/xgboost-spark/xgboost-training-cm.zip --result $SAMPLE_XGBOOST_TEST_RESULT --output $SAMPLE_XGBOOST_TEST_OUTPUT --namespace ${NAMESPACE}

echo "Copy the test results to GCS ${RESULTS_GCS_DIR}/"
gsutil cp ${SAMPLE_XGBOOST_TEST_RESULT} ${RESULTS_GCS_DIR}/${SAMPLE_XGBOOST_TEST_RESULT}