Fix special compilation cases for kfp samples (#91)
* fix test cases

* add eof newline

* fix file restructure

* Generalize for special cases
Generalize for any nested pipeline, or pipeline without a decorator, as long as there is a config file with the necessary information to compile the pipelines.

* Add clearer error throwing
Also fix test_kfp_samples_report.txt.

* Check for special pipelines from config file

* Add comment
drewbutlerbb4 authored Apr 20, 2020
1 parent ebe05e3 commit 6aa7ed3
Showing 5 changed files with 185 additions and 56 deletions.
105 changes: 55 additions & 50 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -382,59 +382,64 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli

# Generate pipelinerun if generate-pipelinerun flag is enabled
# The base template is generated first and then optional parameters are inserted.
if self.generate_pipelinerun:
pipelinerun = {
'apiVersion': tekton_api_version,
'kind': 'PipelineRun',
'metadata': {
'name': pipeline_template['metadata']['name'] + '-run'
},
'spec': {
'params': [{
'name': p['name'],
'value': p.get('default', '')
} for p in pipeline_template['spec']['params']
],
'pipelineRef': {
'name': pipeline_template['metadata']['name']
# Wrapped in a try/except for when this method is called directly (e.g. there is no pipeline decorator)
try:
if self.generate_pipelinerun:
pipelinerun = {
'apiVersion': tekton_api_version,
'kind': 'PipelineRun',
'metadata': {
'name': pipeline_template['metadata']['name'] + '-run'
},
'spec': {
'params': [{
'name': p['name'],
'value': p.get('default', '')
} for p in pipeline_template['spec']['params']
],
'pipelineRef': {
'name': pipeline_template['metadata']['name']
}
}
}
}


pod_template = {}
for task in task_refs:
op = pipeline.ops.get(task['name'])
if op.affinity:
pod_template['affinity'] = convert_k8s_obj_to_json(op.affinity)
if op.tolerations:
pod_template['tolerations'] = pod_template.get('tolerations', []) + op.tolerations
if op.node_selector:
pod_template['nodeSelector'] = op.node_selector

if pod_template:
pipelinerun['spec']['podtemplate'] = pod_template

# add workflow level timeout to pipeline run
if pipeline_conf.timeout:
pipelinerun['spec']['timeout'] = '%ds' % pipeline_conf.timeout

# generate the Tekton service account template
service_template = {}
if len(pipeline_conf.image_pull_secrets) > 0:
service_template = {
'apiVersion': 'v1',
'kind': 'ServiceAccount',
'metadata': {'name': pipelinerun['metadata']['name'] + '-sa'}
}
for image_pull_secret in pipeline_conf.image_pull_secrets:
service_template['imagePullSecrets'] = [{'name': image_pull_secret.name}]

if service_template:
workflow = workflow + [service_template]
pipelinerun['spec']['serviceAccountName'] = service_template['metadata']['name']

workflow = workflow + [pipelinerun]
pod_template = {}
for task in task_refs:
op = pipeline.ops.get(task['name'])
if op.affinity:
pod_template['affinity'] = convert_k8s_obj_to_json(op.affinity)
if op.tolerations:
pod_template['tolerations'] = pod_template.get('tolerations', []) + op.tolerations
if op.node_selector:
pod_template['nodeSelector'] = op.node_selector

if pod_template:
pipelinerun['spec']['podtemplate'] = pod_template

# add workflow level timeout to pipeline run
if pipeline_conf.timeout:
pipelinerun['spec']['timeout'] = '%ds' % pipeline_conf.timeout

# generate the Tekton service account template
service_template = {}
if len(pipeline_conf.image_pull_secrets) > 0:
service_template = {
'apiVersion': 'v1',
'kind': 'ServiceAccount',
'metadata': {'name': pipelinerun['metadata']['name'] + '-sa'}
}
for image_pull_secret in pipeline_conf.image_pull_secrets:
service_template['imagePullSecrets'] = [{'name': image_pull_secret.name}]

if service_template:
workflow = workflow + [service_template]
pipelinerun['spec']['serviceAccountName'] = service_template['metadata']['name']

workflow = workflow + [pipelinerun]
except:
# Intentionally do nothing when _create_pipeline_workflow is called directly (e.g. when there
# is no pipeline decorator) and self.generate_pipelinerun is not set
pass

# Use regex to replace all the Argo variables to Tekton variables. For variables that are unique to Argo,
# we raise an Error to alert users about the unsupported variables. Here is the list of Argo variables.
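As a sketch of what such a replacement pass could look like (the two mappings below are hypothetical stand-ins, not the compiler's actual variable table):

import re

# Hypothetical Argo-to-Tekton variable mapping, for illustration only;
# the real compiler maintains its own table of supported variables.
_ARGO_TO_TEKTON = [
    (re.compile(r'{{workflow\.name}}'), '$(context.pipelineRun.name)'),
    (re.compile(r'{{workflow\.uid}}'), '$(context.pipelineRun.uid)'),
]

def _replace_argo_vars(workflow_text):
    # Substitute every mapped Argo variable; an Argo-only variable with no
    # Tekton equivalent would be the place to raise an error, per the comment above.
    for pattern, tekton_var in _ARGO_TO_TEKTON:
        workflow_text = pattern.sub(tekton_var, workflow_text)
    return workflow_text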
13 changes: 13 additions & 0 deletions sdk/python/tests/config.yaml
@@ -0,0 +1,13 @@
pipeline: compose.py
type: nested
components:
- name: save_most_frequent_word
- name: download_save_most_frequent_word
---
pipeline: basic_no_decorator.py
type: no_decorator
components:
  function: save_most_frequent_word
  name: 'Save Most Frequent'
  description: 'Get Most Frequent Word and Save to GCS'
  paramsList: ["message_param", "output_path_param"]
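A no_decorator entry assumes the sample module exposes the pipeline function and its parameters at module level, roughly as in the sketch below; the names come from the config above, but the actual basic_no_decorator.py testdata may differ:

import kfp.dsl as dsl

# Module-level pipeline params, matched by name against `paramsList`
message_param = dsl.PipelineParam(name='message', value='Hello')
output_path_param = dsl.PipelineParam(name='outputpath', value='/tmp/output')

# A plain function with no @dsl.pipeline decorator; the config's `function`,
# `name`, and `description` fields supply what the decorator normally would.
def save_most_frequent_word(message: str, outputpath: str):
    pass  # the sample would build its ContainerOps here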
17 changes: 13 additions & 4 deletions sdk/python/tests/test_kfp_samples.sh
@@ -33,6 +33,7 @@ KFP_TESTDATA_DIR="${KFP_CLONE_DIR}/sdk/python/tests/compiler/testdata"
TEKTON_COMPILED_YAML_DIR="${TEMP_DIR}/tekton_compiler_output"
COMPILE_REPORT_FILE="${PROJECT_DIR}/sdk/python/tests/test_kfp_samples_report.txt"
COMPILER_OUTPUTS_FILE="${TEMP_DIR}/test_kfp_samples_output.txt"
CONFIG_FILE="${PROJECT_DIR}/sdk/python/tests/config.yaml"

mkdir -p "${TEMP_DIR}"
mkdir -p "${TEKTON_COMPILED_YAML_DIR}"
@@ -70,14 +71,22 @@ cp "${COMPILE_REPORT_FILE}" "${COMPILE_REPORT_FILE_OLD}"
# delete the previous compiler output file
rm -f "${COMPILER_OUTPUTS_FILE}"

# check which pipelines have special configurations
PIPELINES=$(awk '/pipeline:/{print $NF}' "${CONFIG_FILE}")

# compile each of the Python scripts in the KFP testdata folder
for f in "${KFP_TESTDATA_DIR}"/*.py; do
  echo -e "\nCompiling ${f##*/}:" >> "${COMPILER_OUTPUTS_FILE}"
  IS_SPECIAL=$(grep -E "${f##*/}" <<< "${PIPELINES}")
  if [ -z "${IS_SPECIAL}" ]; then
    if dsl-compile-tekton --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1; then
      echo "SUCCESS: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
    else
      echo "FAILURE: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
    fi
  else
    python3 -m test_util "${f}" "${CONFIG_FILE}" | grep -E 'SUCCESS:|FAILURE:'
  fi
done | tee "${COMPILE_REPORT_FILE}"

4 changes: 2 additions & 2 deletions sdk/python/tests/test_kfp_samples_report.txt
@@ -1,9 +1,9 @@
SUCCESS: add_pod_env.py
SUCCESS: artifact_location.py
SUCCESS: basic.py
FAILURE: basic_no_decorator.py
SUCCESS: basic_no_decorator.py
FAILURE: coin.py
FAILURE: compose.py
SUCCESS: compose.py
SUCCESS: default_value.py
FAILURE: input_artifact_raw_value.py
FAILURE: loop_over_lightweight_output.py
102 changes: 102 additions & 0 deletions sdk/python/tests/test_util.py
@@ -0,0 +1,102 @@
#!/usr/bin/env python3

# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import shutil
import zipfile
import yaml
import tempfile
import importlib
import kfp_tekton.compiler as compiler
import filecmp


def _get_yaml_from_zip(zip_file):
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        with open(zip_ref.extract(zip_ref.namelist()[0]), 'r') as yaml_file:
            return list(yaml.safe_load_all(yaml_file))


def get_config(config_path):
    with open(config_path) as file:
        return list(yaml.safe_load_all(file))


def get_params_from_config(pipeline_name, config_path):
    pipelines = get_config(config_path)

    for pipeline in pipelines:
        if pipeline_name == pipeline["pipeline"]:
            return pipeline


def test_workflow_without_decorator(pipeline_mod, params_dict):
    """Test compiling a workflow and appending pipeline params."""

    try:
        pipeline_params = []
        for param in params_dict.get('paramsList', []):
            pipeline_params.append(getattr(pipeline_mod, param))

        compiled_workflow = compiler.TektonCompiler()._create_workflow(
            getattr(pipeline_mod, params_dict['function']),
            params_dict.get('name', None),
            params_dict.get('description', None),
            pipeline_params if pipeline_params else None,
            params_dict.get('conf', None))
        return True
    except Exception:
        return False


def test_nested_workflow(pipeline_mod, pipeline_list):
    """Test compiling a simple workflow, and a bigger one composed from the simple one."""

    tmpdir = tempfile.mkdtemp()
    try:
        for pipeline in pipeline_list:
            pipeline_name = pipeline['name']
            package_path = os.path.join(tmpdir, pipeline_name + '.zip')
            compiler.TektonCompiler().compile(getattr(pipeline_mod, pipeline_name), package_path)
        return True
    except Exception:
        return False


if __name__ == '__main__':
    test_data_path = sys.argv[1]
    config_path = sys.argv[2]
    did_compile = False

    # Import the sample pipeline as a module
    test_data_dir, test_data_file = os.path.split(test_data_path)
    import_name, test_data_ext = os.path.splitext(test_data_file)
    sys.path.append(test_data_dir)
    pipeline_mod = importlib.import_module(import_name)

    # Get the pipeline-specific parameters from the config file
    params = get_params_from_config(test_data_file, config_path)
    if params is None:
        raise ValueError('No pipeline matches available in the config file')
    test_type = params['type']

    if test_type == 'nested':
        did_compile = test_nested_workflow(pipeline_mod, params['components'])
    elif test_type == 'no_decorator':
        did_compile = test_workflow_without_decorator(pipeline_mod, params['components'])
    else:
        raise ValueError("Pipeline type '" + test_type + "' is not recognized")
    if did_compile:
        print("SUCCESS:", test_data_file)
    else:
        print("FAILURE:", test_data_file)
