Support running multiple workflows and using a single entrypoint for prow jobs. (kubeflow#25)

Currently, each repository needs to build a new Docker container whose sole purpose is to specify the path to the Argo workflow to run.

We can simplify this by allowing each repository to define a "prow_config.yaml" file at its root and providing a single binary that reads that config file and uses it to determine which workflows to invoke.

This fixes kubeflow#24.

We also add support for running multiple workflows.

This will make it easy to start using prow to build our containers on each commit (kubeflow#19).
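To make this concrete, here is a rough sketch (not part of this commit) of the kind of prow_config.yaml a repository could define; the values mirror the example documented in run_e2e_workflow.py further down in this diff, and the parsing shown here only approximates what that script does:

import yaml

# Hypothetical contents of prow_config.yaml at the root of a repository.
config_text = """
workflows:
  - name: e2e-test
    app_dir: tensorflow/k8s/test/workflows
    component: workflows
  - name: lint
    app_dir: kubeflow/kubeflow/testing/workflows
    component: workflows
"""

# Each entry names a ksonnet app and component; run_e2e_workflow.py launches
# one Argo workflow per entry and waits for all of them to finish.
for w in yaml.safe_load(config_text)["workflows"]:
  print(w["name"], w["app_dir"], w["component"])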
jlewi authored Feb 13, 2018
1 parent 17c1f7b commit d80d85c
Showing 7 changed files with 321 additions and 114 deletions.
7 changes: 6 additions & 1 deletion images/Dockerfile
@@ -103,12 +103,17 @@ RUN pip install -U pip wheel && \
COPY checkout.sh /usr/local/bin
RUN chmod a+x /usr/local/bin/checkout.sh

COPY run_workflows.sh /usr/local/bin
RUN chmod a+x /usr/local/bin/run_workflows.sh

# Install docker.
RUN curl https://get.docker.com/ | sh

# Workaround for https://github.com/ksonnet/ksonnet/issues/298
ENV USER root

# Add the directory where we will checkout kubeflow/testing
# which contains shared scripts.
ENV PYTHONPATH /src/kubeflow/testing/py

ENTRYPOINT ["/usr/local/bin/run_workflows.sh"]
2 changes: 1 addition & 1 deletion images/Makefile
@@ -15,7 +15,7 @@
# Requirements:
# https://github.com/mattrobenolt/jinja2-cli
# pip install jinja2-cli
IMG = gcr.io/mlkube-testing/test-worker
IMG = gcr.io/kubeflow-ci/test-worker
TAG := $(shell date +v%Y%m%d)-$(shell git describe --tags --always --dirty)-$(shell git diff | sha256sum | cut -c -6)
DIR := ${CURDIR}

18 changes: 18 additions & 0 deletions images/run_workflows.sh
@@ -0,0 +1,18 @@
#!/bin/bash
#
# This script is meant to be the entrypoint for a prow job.
# It checks out a repo and then looks for prow_config.yaml in that
# repo and uses that to run one or more workflows.
set -ex

# Checkout the code.
/usr/local/bin/checkout.sh /src

# Trigger a workflow
python -m kubeflow.testing.run_e2e_workflow \
--project=mlkube-testing \
--zone=us-east1-d \
--cluster=kubeflow-testing \
--bucket=kubernetes-jenkins \
--config_file=/src/${REPO_OWNER}/${REPO_NAME}/prow_config.yaml \
--repos_dir=/src
51 changes: 42 additions & 9 deletions py/kubeflow/testing/argo_client.py
@@ -22,40 +22,73 @@ def log_status(workflow):
workflow["metadata"]["namespace"],
workflow["status"]["phase"])

def wait_for_workflow(client, namespace, name,
def wait_for_workflows(client, namespace, names,
timeout=datetime.timedelta(minutes=30),
polling_interval=datetime.timedelta(seconds=30),
status_callback=None):
"""Wait for the specified workflow to finish.
"""Wait for multiple workflows to finish.
Args:
client: K8s api client.
namespace: namespace for the workflow.
name: Name of the workflow.
names: Names of the workflows to wait for.
timeout: How long to wait for the workflow.
polling_interval: How often to poll for the status of the workflow.
status_callback: (Optional): Callable. If supplied this callable is
invoked after we poll the job. Callable takes a single argument which
is the job.
Returns:
results: A list of the final status of the workflows.
Raises:
TimeoutError: If timeout waiting for the job to finish.
"""
crd_api = k8s_client.CustomObjectsApi(client)
end_time = datetime.datetime.now() + timeout
while True:
results = crd_api.get_namespaced_custom_object(
GROUP, VERSION, namespace, PLURAL, name)
all_results = []

for n in names:
results = crd_api.get_namespaced_custom_object(
GROUP, VERSION, namespace, PLURAL, n)

if status_callback:
status_callback(results)
all_results.append(results)
if status_callback:
status_callback(results)

if results["status"]["phase"] in ["Failed", "Succeeded"]:
return results
done = True
for results in all_results:
if results["status"]["phase"] not in ["Failed", "Succeeded"]:
done = False

if done:
return all_results
if datetime.datetime.now() + polling_interval > end_time:
raise util.TimeoutError(
"Timeout waiting for workflow {0} in namespace {1} to finish.".format(
name, namespace))

time.sleep(polling_interval.seconds)

def wait_for_workflow(client, namespace, name,
timeout=datetime.timedelta(minutes=30),
polling_interval=datetime.timedelta(seconds=30),
status_callback=None):
"""Wait for the specified workflow to finish.
Args:
client: K8s api client.
namespace: namespace for the workflow.
name: Name of the workflow
timeout: How long to wait for the workflow.
polling_interval: How often to poll for the status of the workflow.
status_callback: (Optional): Callable. If supplied this callable is
invoked after we poll the job. Callable takes a single argument which
is the job.
Raises:
TimeoutError: If timeout waiting for the job to finish.
"""
results = wait_for_workflows(client, namespace, [name],
timeout, polling_interval, status_callback)
return results[0]
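For reference, a minimal usage sketch of the new wait_for_workflows helper (not part of this diff); it assumes kubectl credentials for the test cluster are already configured, and the workflow names are hypothetical:

from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.testing import argo_client

k8s_config.load_kube_config()
api_client = k8s_client.ApiClient()

# Block until every named workflow reaches a terminal phase, logging progress
# after each poll via the log_status callback defined above.
results = argo_client.wait_for_workflows(
  api_client, "kubeflow-test-infra",
  ["e2e-test-25-d80d85c-42-ab12", "lint-25-d80d85c-42-ab12"],
  status_callback=argo_client.log_status)
for r in results:
  print(r["metadata"]["name"], r["status"]["phase"])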
206 changes: 138 additions & 68 deletions py/kubeflow/testing/run_e2e_workflow.py
@@ -1,9 +1,31 @@
"""Run the E2E workflow.
This script submits an Argo workflow to run the E2E tests and waits for
it to finish. It is intended to be invoked by prow jobs.
This script submits Argo workflows to run the E2E tests and waits for
them to finish. It is intended to be invoked by prow jobs.
It requires the workflow to be expressed as a ksonnet app.
The script can take a config file via --config_file.
The --config_file is expected to be a YAML file as follows:
workflows:
- name: e2e-test
app_dir: tensorflow/k8s/test/workflows
component: workflows
- name: lint
app_dir: kubeflow/kubeflow/testing/workflows
component: workflows
app_dir is expected to be in the form of
{REPO_OWNER}/{REPO_NAME}/path/to/ksonnet/app
component is the name of the ksonnet component corresponding
to the workflow to launch.
The script expects that the directory
{repos_dir}/{app_dir} exists, where repos_dir is provided
as a command line argument.
"""

import argparse
@@ -17,10 +39,19 @@
from google.cloud import storage # pylint: disable=no-name-in-module
from kubeflow.testing import util
import sys
import yaml

# The namespace to launch the Argo workflow in.
NAMESPACE = "kubeflow-test-infra"

class WorkflowComponent(object):
"""Datastructure to represent a ksonnet component to submit a workflow."""

def __init__(self, name, app_dir, component):
self.name = name
self.app_dir = app_dir
self.component = component

def _get_src_dir():
return os.path.abspath(os.path.join(__file__, "..",))

@@ -58,84 +89,106 @@ def create_finished_file(bucket, success):
target = os.path.join(prow_artifacts.get_gcs_dir(bucket), "finished.json")
upload_to_gcs(contents, target)

def parse_config_file(config_file, root_dir):
with open(config_file) as hf:
results = yaml.load(hf)

components = []
for i in results["workflows"]:
components.append(WorkflowComponent(
i["name"], os.path.join(root_dir, i["app_dir"]), i["component"]))
return components

def run(args, file_handler):
workflows = []
if args.config_file:
workflows.extend(parse_config_file(args.config_file, args.repos_dir))

if args.app_dir and args.component:
workflows.append(WorkflowComponent("legacy", args.app_dir, args.component))
create_started_file(args.bucket)

util.maybe_activate_service_account()

util.configure_kubectl(args.project, args.zone, args.cluster)
util.load_kube_config()

# Create the name for the workflow
# We truncate sha numbers to prevent the workflow name from being too large.
# Workflow name should not be more than 63 characters because it's used
# as a label on the pods.
workflow_name = os.getenv("JOB_NAME")
job_type = os.getenv("JOB_TYPE")
if job_type == "presubmit":
workflow_name += "-{0}".format(os.getenv("PULL_NUMBER"))
workflow_name += "-{0}".format(os.getenv("PULL_PULL_SHA")[0:7])

elif job_type == "postsubmit":
workflow_name += "-{0}".format(os.getenv("PULL_BASE_SHA")[0:7])

workflow_name += "-{0}".format(os.getenv("BUILD_NUMBER"))

salt = uuid.uuid4().hex[0:4]
# Add some salt. This is mostly a convenience for the case where you
# are submitting jobs manually for testing/debugging, since prow should
# vend unique build numbers for each job.
workflow_name += "-{0}".format(salt)

# Create a new environment for this run
env = workflow_name

util.run(["ks", "env", "add", env], cwd=args.app_dir)

util.run(["ks", "param", "set", "--env=" + env, args.component,
"name", workflow_name],
cwd=args.app_dir)
util.load_kube_config()

api_client = k8s_client.ApiClient()

# Set the prow environment variables.
prow_env = []

names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
"PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
"REPO_NAME"]
names.sort()
for v in names:
if not os.getenv(v):
continue
prow_env.append("{0}={1}".format(v, os.getenv(v)))

util.run(["ks", "param", "set", "--env=" + env, args.component, "prow_env", ",".join(prow_env)],
cwd=args.app_dir)
util.run(["ks", "param", "set", "--env=" + env, args.component, "namespace", NAMESPACE],
cwd=args.app_dir)
util.run(["ks", "param", "set", "--env=" + env, args.component, "bucket", args.bucket],
cwd=args.app_dir)

# For debugging print out the manifest
util.run(["ks", "show", env, "-c", args.component], cwd=args.app_dir)
util.run(["ks", "apply",env, "-c", args.component], cwd=args.app_dir)

ui_url = ("http://testing-argo.kubeflow.io/timeline/kubeflow-test-infra/{0}"
";tab=workflow".format(workflow_name))
logging.info("URL for workflow: %s", ui_url)
success = False
workflow_names = []
for w in workflows:
app_dir = w.app_dir
# Create the name for the workflow
# We truncate sha numbers to prevent the workflow name from being too large.
# Workflow name should not be more than 63 characters because it's used
# as a label on the pods.
workflow_name = os.getenv("JOB_NAME") + "-" + w.name
job_type = os.getenv("JOB_TYPE")
if job_type == "presubmit":
workflow_name += "-{0}".format(os.getenv("PULL_NUMBER"))
workflow_name += "-{0}".format(os.getenv("PULL_PULL_SHA")[0:7])

elif job_type == "postsubmit":
workflow_name += "-{0}".format(os.getenv("PULL_BASE_SHA")[0:7])

workflow_name += "-{0}".format(os.getenv("BUILD_NUMBER"))

salt = uuid.uuid4().hex[0:4]
# Add some salt. This is mostly a convenience for the case where you
# are submitting jobs manually for testing/debugging, since prow should
# vend unique build numbers for each job.
workflow_name += "-{0}".format(salt)

workflow_names.append(workflow_name)
# Create a new environment for this run
env = workflow_name

util.run(["ks", "env", "add", env], cwd=w.app_dir)

util.run(["ks", "param", "set", "--env=" + env, w.component,
"name", workflow_name],
cwd=w.app_dir)

# Set the prow environment variables.
prow_env = []

names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
"PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
"REPO_NAME"]
names.sort()
for v in names:
if not os.getenv(v):
continue
prow_env.append("{0}={1}".format(v, os.getenv(v)))

util.run(["ks", "param", "set", "--env=" + env, w.component, "prow_env", ",".join(prow_env)],
cwd=w.app_dir)
util.run(["ks", "param", "set", "--env=" + env, w.component, "namespace", NAMESPACE],
cwd=w.app_dir)
util.run(["ks", "param", "set", "--env=" + env, w.component, "bucket", args.bucket],
cwd=w.app_dir)

# For debugging print out the manifest
util.run(["ks", "show", env, "-c", w.component], cwd=w.app_dir)
util.run(["ks", "apply",env, "-c", w.component], cwd=w.app_dir)

ui_url = ("http://testing-argo.kubeflow.io/timeline/kubeflow-test-infra/{0}"
";tab=workflow".format(workflow_name))
logging.info("URL for workflow: %s", ui_url)

success = True
try:
results = argo_client.wait_for_workflow(api_client, NAMESPACE, workflow_name,
status_callback=argo_client.log_status)
if results["status"]["phase"] == "Succeeded":
success = True
logging.info("Workflow %s/%s finished phase: %s", NAMESPACE, workflow_name,
results["status"]["phase"] )
results = argo_client.wait_for_workflows(api_client, NAMESPACE,
workflow_names,
status_callback=argo_client.log_status)
for r in results:
if r["status"]["phase"] != "Succeeded":
success = False
logging.info("Workflow %s/%s finished phase: %s", NAMESPACE,
r["metadata"]["name"],
r["status"]["phase"])
except util.TimeoutError:
success = False
logging.error("Time out waiting for Workflow %s/%s to finish", NAMESPACE, workflow_name)
logging.error("Time out waiting for Workflows %s to finish", ",".join(workflow_names))
finally:
create_finished_file(args.bucket, success)

@@ -178,14 +231,31 @@ def main(unparsed_args=None): # pylint: disable=too-many-locals
type=str,
help="The bucket to use for the Gubernator outputs.")

parser.add_argument(
"--config_file",
default="",
type=str,
help="Yaml file containing the config.")

parser.add_argument(
"--repos_dir",
default="",
type=str,
help="The directory where the different repos are checked out.")

# TODO(jlewi): app_dir and component predate the use of a config
# file we should consider getting rid of them once all repos
# have been updated to run multiple workflows.
parser.add_argument(
"--app_dir",
type=str,
default="",
help="The directory where the ksonnet app is stored.")

parser.add_argument(
"--component",
type=str,
default="",
help="The ksonnet component to use.")

#############################################################################
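As a worked illustration of the workflow naming logic in run() above (not part of this commit), with made-up values for the prow environment variables of a presubmit job:

# Hypothetical presubmit values: JOB_NAME, PULL_NUMBER, PULL_PULL_SHA and
# BUILD_NUMBER come from prow; the salt comes from uuid.uuid4().hex[0:4].
job_name = "kubeflow-testing-presubmit"  # JOB_NAME
name = "e2e-test"                        # w.name from prow_config.yaml
pull_number = "25"                       # PULL_NUMBER
pull_sha = "d80d85c9f1aa"                # PULL_PULL_SHA
build_number = "42"                      # BUILD_NUMBER
salt = "ab12"

workflow_name = "-".join(
  [job_name, name, pull_number, pull_sha[0:7], build_number, salt])
# -> "kubeflow-testing-presubmit-e2e-test-25-d80d85c-42-ab12" (54 characters,
#    safely under the 63-character limit for Kubernetes labels)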
