Commit 7a12ca4

Sanitize Kubernetes Names (#158)

Resolves #151

ckadner authored May 28, 2020
1 parent 89447c8 commit 7a12ca4

Showing 8 changed files with 304 additions and 70 deletions.
48 changes: 48 additions & 0 deletions sdk/python/kfp_tekton/compiler/_k8s_helper.py
@@ -12,9 +12,57 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re

from kfp import dsl


def sanitize_k8s_name(name,
allow_capital_underscore=False,
allow_dot=False,
allow_slash=False,
max_length=63,
suffix_space=0):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.
https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
https://github.com/kubernetes/kubernetes/blob/c369cf18/staging/src/k8s.io/apimachinery/pkg/util/validation/validation.go#L89
Args:
name: original name
allow_capital_underscore: whether to allow capital letters and underscores
in this name (e.g. for parameters)
allow_dot: whether to allow dots in this name (e.g. for labels)
allow_slash: whether to allow slashes in this name (e.g. for label and annotation keys)
max_length: maximum length of K8s name, default: 63
suffix_space: number of characters reserved for a suffix to be appended
Returns:
sanitized name.
"""
k8s_name = re.sub('[^-_./0-9A-Za-z]+', '-', name)

if not allow_capital_underscore:
k8s_name = re.sub('_', '-', k8s_name.lower())

if not allow_dot:
k8s_name = re.sub('[.]', '-', k8s_name)

if not allow_slash:
k8s_name = re.sub('[/]', '-', k8s_name)

# replace duplicate dashes, strip enclosing dashes
k8s_name = re.sub('-+', '-', k8s_name).strip('-')

# truncate if length exceeds max_length
max_length = max_length - suffix_space
k8s_name = k8s_name[:max_length].rstrip('-') if len(k8s_name) > max_length else k8s_name

return k8s_name


def convert_k8s_obj_to_json(k8s_obj):
"""
Builds a JSON K8s object.
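For illustration (hypothetical inputs, not part of the commit), tracing a few names through the steps of sanitize_k8s_name above:

    from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name

    # default mode: lowercase, underscores and invalid characters become dashes
    sanitize_k8s_name('My Training_Step (2)')          # -> 'my-training-step-2'

    # parameter mode: capital letters and underscores survive
    sanitize_k8s_name('Learning_Rate', allow_capital_underscore=True)  # -> 'Learning_Rate'

    # label/annotation keys: dots and slashes allowed, 253-character limit
    sanitize_k8s_name('app.kubernetes.io/name', allow_capital_underscore=True,
                      allow_dot=True, allow_slash=True, max_length=253)
    # -> 'app.kubernetes.io/name' (already compliant)

    # suffix_space reserves room so a suffix can be appended without exceeding 63
    sanitize_k8s_name('a' * 70, suffix_space=4) + '-run'  # 59 a's + '-run' = 63 chars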
16 changes: 13 additions & 3 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
@@ -21,7 +21,7 @@
from typing import List, Text, Dict, Any

from kfp import dsl
from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json
from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name
from kfp.compiler._op_to_template import _process_obj, _inputs_to_json, _outputs_to_json
from kfp.dsl import ArtifactLocation
from kfp.dsl._container_op import BaseOp
@@ -490,9 +490,19 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
if processed_op.pod_annotations or processed_op.pod_labels:
template.setdefault('metadata', {}) # Tekton change, don't wipe out existing metadata
if processed_op.pod_annotations:
template['metadata']['annotations'] = processed_op.pod_annotations
template['metadata']['annotations'] = {
sanitize_k8s_name(key, allow_capital_underscore=True, allow_dot=True,
allow_slash=True, max_length=253):
value
for key, value in processed_op.pod_annotations.items()
}
if processed_op.pod_labels:
template['metadata']['labels'] = processed_op.pod_labels
template['metadata']['labels'] = {
sanitize_k8s_name(key, allow_capital_underscore=True, allow_dot=True,
allow_slash=True, max_length=253):
sanitize_k8s_name(value, allow_capital_underscore=True, allow_dot=True)
for key, value in processed_op.pod_labels.items()
}

# sidecars
if processed_op.sidecars:
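To sketch the effect (hypothetical op and metadata, not from this commit): labels and annotations attached in the DSL are now sanitized on their way into the Tekton template, with keys allowed the 253-character prefix/name form and label values held to the 63-character limit:

    op.add_pod_label('My Team/Owner', 'Data Science')
    op.add_pod_annotation('My Notes', 'Anything goes here!')

    # resulting template metadata, traced through the comprehensions above:
    # template['metadata']['labels']      == {'My-Team/Owner': 'Data-Science'}
    # template['metadata']['annotations'] == {'My-Notes': 'Anything goes here!'}
    # annotation values are deliberately left untouched; only their keys are sanitized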
182 changes: 134 additions & 48 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import json
import tarfile
@@ -27,13 +28,13 @@

from kfp import dsl
from kfp.compiler._default_transformers import add_pod_env
from kfp.compiler._k8s_helper import sanitize_k8s_name
from kfp.compiler.compiler import Compiler
# from kfp.components._yaml_utils import dump_yaml
from kfp.components.structures import InputSpec
from kfp.dsl._for_loop import LoopArguments, LoopArgumentVariable
from kfp.dsl._metadata import _extract_pipeline_metadata

from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json
from kfp_tekton.compiler._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name

from kfp_tekton import tekton_api_version

@@ -55,7 +56,7 @@ def my_pipeline(a: int = 1, b: str = "default value"):
"""

def __init__(self, **kwargs):
# Intentionally set self.generate_pipelinerun and self.enable_artifacts to None because when _create_pipeline_workflow is called directly
# (e.g. when there is no pipeline decorator), self.generate_pipelinerun and self.enable_artifacts are not set
self.generate_pipelinerun = None
self.enable_artifacts = None
@@ -77,20 +78,20 @@ def _get_loop_task(self, task: Dict, op_name_to_for_loop_op):
for loop_param in op_name_to_for_loop_op.values():
loop_args = loop_param.loop_args
if loop_args.name in tp['name']:
lpn = tp['name'].replace(loop_args.name, '').replace('-subvar-', '')
lpn = tp['name'].replace(loop_args.name, '').replace(LoopArgumentVariable.SUBVAR_NAME_DELIMITER, '')
if lpn:
tp['loopvalue'] = [value[lpn] for value in loop_args.items_or_pipeline_param]
tp['loop-value'] = [value[lpn] for value in loop_args.items_or_pipeline_param]
else:
tp['loopvalue'] = loop_args.items_or_pipeline_param
tp['loop-value'] = loop_args.items_or_pipeline_param
# Get the task params list
## Get the task_params list without loop first
loop_value = [p['loopvalue'] for p in task_parms_list if p.get('loopvalue')]
task_params_without_loop = [p for p in task_parms_list if not p.get('loopvalue')]
loop_value = [p['loop-value'] for p in task_parms_list if p.get('loop-value')]
task_params_without_loop = [p for p in task_parms_list if not p.get('loop-value')]
## Get the task_params list with loop
loop_params = [p for p in task_parms_list if p.get('loopvalue')]
for parm in loop_params:
del parm['loopvalue']
del parm['value']
loop_params = [p for p in task_parms_list if p.get('loop-value')]
for param in loop_params:
del param['loop-value']
del param['value']
value_iter = list(itertools.product(*loop_value))
value_iter_list = []
for values in value_iter:
@@ -106,10 +107,11 @@
# Get the task list based on params list
task_list = []
del task['params']
task_old_name = task['name']
task_name_suffix_length = len(LoopArguments.LOOP_ITEM_NAME_BASE) + LoopArguments.NUM_CODE_CHARS + 2
task_old_name = sanitize_k8s_name(task['name'], suffix_space=task_name_suffix_length)
for i in range(len(task_parms_all)):
task['params'] = task_parms_all[i]
task['name'] = '%s-loop-items-%d' % (task_old_name, i)
task['name'] = '%s-%s-%d' % (task_old_name, LoopArguments.LOOP_ITEM_NAME_BASE, i)
task_list.append(copy.deepcopy(task))
del task['params']
return task_list
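The suffix_space reservation above keeps the generated loop task names within the 63-character limit once the loop-item suffix is appended. A rough trace, assuming KFP's LOOP_ITEM_NAME_BASE is 'loop-item' (9 characters) and NUM_CODE_CHARS is 8 (both worth verifying against the installed kfp version; the + 2 appears to cover the two joining hyphens):

    suffix_space = len('loop-item') + 8 + 2                  # 19
    base = sanitize_k8s_name('task' + 'x' * 70, suffix_space=suffix_space)
    # len(base) <= 63 - 19 == 44
    name_0 = '%s-%s-%d' % (base, 'loop-item', 0)             # stays within 63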
@@ -146,11 +148,11 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
template = {
'apiVersion': tekton_api_version,
'metadata': {
'name': sub_group.name,
'name': sanitize_k8s_name(sub_group.name),
},
'spec': {}
}

# Generates template sections unique to conditions
if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition':
subgroup_inputs = inputs.get(sub_group.name, [])
@@ -162,9 +164,9 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
# Adds params to condition template
params = []
if isinstance(condition.operand1, dsl.PipelineParam):
params.append({'name': operand1_value[9: len(operand1_value) - 1]})  # Substring to remove parameter reference wrapping
if isinstance(condition.operand2, dsl.PipelineParam):
params.append({'name': operand2_value[9: len(operand1_value) - 1]})  # Substring to remove parameter reference wrapping
if params:
template['spec']['params'] = params

@@ -177,8 +179,8 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):
if_else = 'print(0) if (input1 ' + condition.operator + ' input2) else print(1)\' ' + operand1_value + ' ' + operand2_value + '); '
exit_code = 'exit $EXITCODE'
shell_script = input_grab + try_catch + if_else + exit_code
template['apiVersion'] = 'tekton.dev/v1alpha1'  # TODO Change to tekton_api_version once Conditions are out of v1alpha1
template['kind'] = 'Condition'
template['spec']['check'] = {
'args': [shell_script],
@@ -188,7 +190,6 @@ def _group_to_dag_template(self, group, inputs, outputs, dependencies):

return template


def _create_dag_templates(self, pipeline, op_transformers=None, params=None, op_to_templates_handler=None):
"""Create all groups and ops templates in the pipeline.
@@ -294,7 +295,7 @@ def _workflow_with_pipelinerun(self, task_refs, pipeline, pipeline_template, wor
'apiVersion': tekton_api_version,
'kind': 'PipelineRun',
'metadata': {
'name': pipeline_template['metadata']['name'] + '-run'
'name': sanitize_k8s_name(pipeline_template['metadata']['name'], suffix_space=4) + '-run'
},
'spec': {
'params': [{
@@ -335,7 +336,7 @@ def _workflow_with_pipelinerun(self, task_refs, pipeline, pipeline_template, wor
service_template = {
'apiVersion': 'v1',
'kind': 'ServiceAccount',
'metadata': {'name': pipelinerun['metadata']['name'] + '-sa'}
'metadata': {'name': sanitize_k8s_name(pipelinerun['metadata']['name'], suffix_space=3) + '-sa'}
}
for image_pull_secret in pipeline.conf.image_pull_secrets:
service_template['imagePullSecrets'] = [{'name': image_pull_secret.name}]
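The suffix_space values match the literal suffixes appended right after them: 4 characters for '-run' and 3 for '-sa', so neither name can breach the 63-character limit. A quick trace with a hypothetical overlong pipeline name:

    base = sanitize_k8s_name('p' * 70, suffix_space=4)   # truncated to 59 chars
    run_name = base + '-run'                             # exactly 63 chars, still valid
    sa_name = sanitize_k8s_name(run_name, suffix_space=3) + '-sa'
    # run_name is cut to 60 chars and the trailing '-' stripped, so sa_name <= 62 chars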
@@ -393,7 +394,7 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
]
}
)

# process input parameters from upstream tasks for conditions and pair conditions with their ancestor conditions
opsgroup_stack = [pipeline.groups[0]]
condition_stack = [None]
@@ -510,6 +511,46 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli

return workflow # Tekton change, from return type Dict[Text, Any] to List[Dict[Text, Any]]

def _sanitize_and_inject_artifact(self, pipeline: dsl.Pipeline, pipeline_conf=None):
"""Sanitize operator/param names and inject pipeline artifact location."""

# Sanitize operator names and param names
sanitized_ops = {}
# pipeline level artifact location
artifact_location = pipeline_conf.artifact_location

for op in pipeline.ops.values():
# inject the pipeline-level artifact location if the op does not have
# an artifact location config already.
if hasattr(op, "artifact_location"):
if artifact_location and not op.artifact_location:
op.artifact_location = artifact_location

sanitized_name = sanitize_k8s_name(op.name)
op.name = sanitized_name
for param in op.outputs.values():
param.name = sanitize_k8s_name(param.name, True)
if param.op_name:
param.op_name = sanitize_k8s_name(param.op_name)
if op.output is not None and not isinstance(op.output, dsl._container_op._MultipleOutputsError):
op.output.name = sanitize_k8s_name(op.output.name, True)
op.output.op_name = sanitize_k8s_name(op.output.op_name)
if op.dependent_names:
op.dependent_names = [sanitize_k8s_name(name) for name in op.dependent_names]
if isinstance(op, dsl.ContainerOp) and op.file_outputs is not None:
sanitized_file_outputs = {}
for key in op.file_outputs.keys():
sanitized_file_outputs[sanitize_k8s_name(key, True)] = op.file_outputs[key]
op.file_outputs = sanitized_file_outputs
elif isinstance(op, dsl.ResourceOp) and op.attribute_outputs is not None:
sanitized_attribute_outputs = {}
for key in op.attribute_outputs.keys():
sanitized_attribute_outputs[sanitize_k8s_name(key, True)] = \
op.attribute_outputs[key]
op.attribute_outputs = sanitized_attribute_outputs
sanitized_ops[sanitized_name] = op
pipeline.ops = sanitized_ops
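
# Illustration (hypothetical op, not part of the commit): with the rules above,
# an op named 'Train Model (GPU)' becomes 'train-model-gpu', while an output
# parameter named 'Output_Path' stays 'Output_Path'; the positional True
# passed to sanitize_k8s_name is allow_capital_underscore.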

# NOTE: the methods below are "copied" from KFP with changes in the method signatures (only)
# to accommodate multiple documents in the YAML output file:
# KFP Argo -> Dict[Text, Any]
@@ -709,31 +750,76 @@ def _create_and_write_workflow(


def _validate_workflow(workflow: List[Dict[Text, Any]]): # Tekton change, signature

# verify that all names and labels conform to kubernetes naming standards
# https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/

def _find_items(obj, search_key, current_path="", results_dict=dict()) -> dict:
if isinstance(obj, dict):
if search_key in obj:
results_dict.update({"%s.%s" % (current_path, search_key): obj[search_key]})
for k, v in obj.items():
_find_items(v, search_key, "%s.%s" % (current_path, k), results_dict)
elif isinstance(obj, list):
for i, list_item in enumerate(obj):
_find_items(list_item, search_key, "%s[%i]" % (current_path, i), results_dict)
return {k.lstrip("."): v for k, v in results_dict.items()}
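# For example (hypothetical input, not from this commit), given
#   workflow = [{'metadata': {'name': 'My Task', 'labels': {'app': 'x'}}}]
# _find_items(workflow, 'name') returns {'[0].metadata.name': 'My Task'};
# the final lstrip('.') turns top-level paths like '.spec.name' into 'spec.name'.
# Note the mutable default results_dict=dict(): the label and annotation
# searches below pass an explicit {} so results do not leak between calls.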

non_k8s_names = {path: name for path, name in _find_items(workflow, "name").items()
if "metadata" in path and name != sanitize_k8s_name(name)
or "param" in path and name != sanitize_k8s_name(name, allow_capital_underscore=True)}

non_k8s_labels = {path: k_v_dict for path, k_v_dict in _find_items(workflow, "labels", "", {}).items()
if "metadata" in path and
any([k != sanitize_k8s_name(k, allow_capital_underscore=True, allow_dot=True, allow_slash=True, max_length=253) or
v != sanitize_k8s_name(v, allow_capital_underscore=True, allow_dot=True)
for k, v in k_v_dict.items()])}

non_k8s_annotations = {path: k_v_dict for path, k_v_dict in _find_items(workflow, "annotations", "", {}).items()
if "metadata" in path and
any([k != sanitize_k8s_name(k, allow_capital_underscore=True, allow_dot=True, allow_slash=True, max_length=253)
for k in k_v_dict.keys()])}

error_msg_tmplt = textwrap.dedent("""\
Internal compiler error: Found non-compliant Kubernetes %s:
%s
Please create a new issue at https://github.com/kubeflow/kfp-tekton/issues
attaching the pipeline DSL code and the pipeline YAML.""")

if non_k8s_names:
raise RuntimeError(error_msg_tmplt % ("names", json.dumps(non_k8s_names, sort_keys=False, indent=2)))

if non_k8s_labels:
raise RuntimeError(error_msg_tmplt % ("labels", json.dumps(non_k8s_labels, sort_keys=False, indent=2)))

if non_k8s_annotations:
raise RuntimeError(error_msg_tmplt % ("annotations", json.dumps(non_k8s_annotations, sort_keys=False, indent=2)))

# TODO: Tekton pipeline parameter validation
# workflow = workflow.copy()
# # Working around Argo lint issue
# for argument in workflow['spec'].get('arguments', {}).get('parameters', []):
# if 'value' not in argument:
# argument['value'] = ''
#
# yaml_text = dump_yaml(workflow)
# if '{{pipelineparam' in yaml_text:
# raise RuntimeError(
# '''Internal compiler error: Found unresolved PipelineParam.
# Please create a new issue at https://github.com/kubeflow/pipelines/issues attaching the pipeline code and the pipeline package.'''
# )
# workflow = workflow.copy()
# # Working around Argo lint issue
# for argument in workflow['spec'].get('arguments', {}).get('parameters', []):
# if 'value' not in argument:
# argument['value'] = ''
# yaml_text = dump_yaml(workflow)
# if '{{pipelineparam' in yaml_text:
# raise RuntimeError(
# '''Internal compiler error: Found unresolved PipelineParam.
# Please create a new issue at https://github.com/kubeflow/kfp-tekton/issues attaching the pipeline code and the pipeline package.'''
# )

# TODO: Tekton lint, if a tool exists for it
# # Running Argo lint if available
# import shutil
# import subprocess
# argo_path = shutil.which('argo')
# if argo_path:
# result = subprocess.run([argo_path, 'lint', '/dev/stdin'], input=yaml_text.encode('utf-8'), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# if result.returncode:
# raise RuntimeError(
# '''Internal compiler error: Compiler has produced Argo-incompatible workflow.
# Please create a new issue at https://github.com/kubeflow/pipelines/issues attaching the pipeline code and the pipeline package.
# Error: {}'''.format(result.stderr.decode('utf-8'))
# )
# # Running Argo lint if available
# import shutil
# import subprocess
# argo_path = shutil.which('argo')
# if argo_path:
# result = subprocess.run([argo_path, 'lint', '/dev/stdin'], input=yaml_text.encode('utf-8'), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# if result.returncode:
# raise RuntimeError(
# '''Internal compiler error: Compiler has produced Argo-incompatible workflow.
# Please create a new issue at https://github.com/kubeflow/kfp-tekton/issues attaching the pipeline code and the pipeline package.
# Error: {}'''.format(result.stderr.decode('utf-8'))
# )
pass
(diff for the remaining 5 changed files not shown)