diff --git a/images/Dockerfile b/images/Dockerfile
index ec35ae857f6..d969ef9003b 100644
--- a/images/Dockerfile
+++ b/images/Dockerfile
@@ -103,12 +103,17 @@ RUN pip install -U pip wheel && \
 COPY checkout.sh /usr/local/bin
 RUN chmod a+x /usr/local/bin/checkout.sh
 
+COPY run_workflows.sh /usr/local/bin
+RUN chmod a+x /usr/local/bin/run_workflows.sh
+
 # Install docker.
 RUN curl https://get.docker.com/ | sh
 
 # Work around for https://github.com/ksonnet/ksonnet/issues/298
 ENV USER root
 
-# Add the directory where we will checkout kubeflow/testing
+# Add the directory where we will check out kubeflow/testing
 # which contains shared scripts.
 ENV PYTHONPATH /src/kubeflow/testing/py
+
+ENTRYPOINT ["/usr/local/bin/run_workflows.sh"]
\ No newline at end of file
diff --git a/images/Makefile b/images/Makefile
index f16c99216f8..cfaf63b6c38 100755
--- a/images/Makefile
+++ b/images/Makefile
@@ -15,7 +15,7 @@
 # Requirements:
 #   https://github.com/mattrobenolt/jinja2-cli
 #   pip install jinja2-cli
-IMG = gcr.io/mlkube-testing/test-worker
+IMG = gcr.io/kubeflow-ci/test-worker
 TAG := $(shell date +v%Y%m%d)-$(shell git describe --tags --always --dirty)-$(shell git diff | sha256sum | cut -c -6)
 DIR := ${CURDIR}
diff --git a/images/run_workflows.sh b/images/run_workflows.sh
new file mode 100644
index 00000000000..78983e4deea
--- /dev/null
+++ b/images/run_workflows.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+#
+# This script is meant to be the entrypoint for a prow job.
+# It checks out a repo and then looks for prow_config.yaml in that
+# repo and uses that to run one or more workflows.
+set -ex
+
+# Check out the code.
+/usr/local/bin/checkout.sh /src
+
+# Trigger the workflows.
+python -m kubeflow.testing.run_e2e_workflow \
+  --project=mlkube-testing \
+  --zone=us-east1-d \
+  --cluster=kubeflow-testing \
+  --bucket=kubernetes-jenkins \
+  --config_file=/src/${REPO_OWNER}/${REPO_NAME}/prow_config.yaml \
+  --repos_dir=/src
\ No newline at end of file
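For reference, the --config_file value above is derived purely from the standard prow environment variables; a minimal Python sketch of the same resolution (paths illustrative):

import os

# Mirrors the --config_file path run_workflows.sh builds. For
# REPO_OWNER=kubeflow and REPO_NAME=testing this resolves to
# /src/kubeflow/testing/prow_config.yaml.
config_file = os.path.join(
    "/src", os.environ["REPO_OWNER"], os.environ["REPO_NAME"],
    "prow_config.yaml")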
""" crd_api = k8s_client.CustomObjectsApi(client) end_time = datetime.datetime.now() + timeout while True: - results = crd_api.get_namespaced_custom_object( - GROUP, VERSION, namespace, PLURAL, name) + all_results = [] + + for n in names: + results = crd_api.get_namespaced_custom_object( + GROUP, VERSION, namespace, PLURAL, n) - if status_callback: - status_callback(results) + all_results.append(results) + if status_callback: + status_callback(results) - if results["status"]["phase"] in ["Failed", "Succeeded"]: - return results + done = True + for results in all_results: + if results["status"]["phase"] not in ["Failed", "Succeeded"]: + done = False + if done: + return all_results if datetime.datetime.now() + polling_interval > end_time: raise util.TimeoutError( "Timeout waiting for workflow {0} in namespace {1} to finish.".format( name, namespace)) time.sleep(polling_interval.seconds) + +def wait_for_workflow(client, namespace, name, + timeout=datetime.timedelta(minutes=30), + polling_interval=datetime.timedelta(seconds=30), + status_callback=None): + """Wait for the specified workflow to finish. + + Args: + client: K8s api client. + namespace: namespace for the workflow. + name: Name of the workflow + timeout: How long to wait for the workflow. + polling_interval: How often to poll for the status of the workflow. + status_callback: (Optional): Callable. If supplied this callable is + invoked after we poll the job. Callable takes a single argument which + is the job. + + Raises: + TimeoutError: If timeout waiting for the job to finish. + """ + results = wait_for_workflows(client, namespace, [name], + timeout, polling_interval, status_callback) + return results[0] diff --git a/py/kubeflow/testing/run_e2e_workflow.py b/py/kubeflow/testing/run_e2e_workflow.py index 51bc01fe626..c3706110e64 100644 --- a/py/kubeflow/testing/run_e2e_workflow.py +++ b/py/kubeflow/testing/run_e2e_workflow.py @@ -1,9 +1,31 @@ """Run the E2E workflow. -This script submits an Argo workflow to run the E2E tests and waits for -it to finish. It is intended to be invoked by prow jobs. +This script submits Argo workflows to run the E2E tests and waits for +them to finish. It is intended to be invoked by prow jobs. It requires the workflow to be expressed as a ksonnet app. + +The script can take a config file via --config_file. +The --config_file is expected to be a YAML file as follows: + +workflows: + - name: e2e-test + app_dir: tensorflow/k8s/test/workflows + component: workflows + + - name: lint + app_dir: kubeflow/kubeflow/testing/workflows + component: workflows + +app_dir is expected to be in the form of +{REPO_OWNER}/{REPO_NAME}/path/to/ksonnet/app + +component is the name of the ksonnet component corresponding +to the workflow to launch. + +The script expects that the directories +{repos_dir}/{app_dir} exists. Where repos_dir is provided +as a command line argument. """ import argparse @@ -17,10 +39,19 @@ from google.cloud import storage # pylint: disable=no-name-in-module from kubeflow.testing import util import sys +import yaml # The namespace to launch the Argo workflow in. 
NAMESPACE = "kubeflow-test-infra" +class WorkflowComponent(object): + """Datastructure to represent a ksonnet component to submit a workflow.""" + + def __init__(self, name, app_dir, component): + self.name = name + self.app_dir = app_dir + self.component = component + def _get_src_dir(): return os.path.abspath(os.path.join(__file__, "..",)) @@ -58,7 +89,23 @@ def create_finished_file(bucket, success): target = os.path.join(prow_artifacts.get_gcs_dir(bucket), "finished.json") upload_to_gcs(contents, target) +def parse_config_file(config_file, root_dir): + with open(config_file) as hf: + results = yaml.load(hf) + + components = [] + for i in results["workflows"]: + components.append(WorkflowComponent( + i["name"], os.path.join(root_dir, i["app_dir"]), i["component"])) + return components + def run(args, file_handler): + workflows = [] + if args.config_file: + workflows.extend(parse_config_file(args.config_file, args.repos_dir)) + + if args.app_dir and args.component: + workflows.append(WorkflowComponent("legacy", args.app_dir, args.component)) create_started_file(args.bucket) util.maybe_activate_service_account() @@ -66,76 +113,82 @@ def run(args, file_handler): util.configure_kubectl(args.project, args.zone, args.cluster) util.load_kube_config() - # Create the name for the workflow - # We truncate sha numbers to prevent the workflow name from being too large. - # Workflow name should not be more than 63 characters because its used - # as a label on the pods. - workflow_name = os.getenv("JOB_NAME") - job_type = os.getenv("JOB_TYPE") - if job_type == "presubmit": - workflow_name += "-{0}".format(os.getenv("PULL_NUMBER")) - workflow_name += "-{0}".format(os.getenv("PULL_PULL_SHA")[0:7]) - - elif job_type == "postsubmit": - workflow_name += "-{0}".format(os.getenv("PULL_BASE_SHA")[0:7]) - - workflow_name += "-{0}".format(os.getenv("BUILD_NUMBER")) - - salt = uuid.uuid4().hex[0:4] - # Add some salt. This is mostly a convenience for the case where you - # are submitting jobs manually for testing/debugging. Since the prow should - # vend unique build numbers for each job. - workflow_name += "-{0}".format(salt) - - # Create a new environment for this run - env = workflow_name - - util.run(["ks", "env", "add", env], cwd=args.app_dir) - - util.run(["ks", "param", "set", "--env=" + env, args.component, - "name", workflow_name], - cwd=args.app_dir) - util.load_kube_config() - api_client = k8s_client.ApiClient() - - # Set the prow environment variables. 
@@ -66,76 +113,82 @@ def run(args, file_handler):
   util.configure_kubectl(args.project, args.zone, args.cluster)
   util.load_kube_config()
 
-  # Create the name for the workflow
-  # We truncate sha numbers to prevent the workflow name from being too large.
-  # Workflow name should not be more than 63 characters because its used
-  # as a label on the pods.
-  workflow_name = os.getenv("JOB_NAME")
-  job_type = os.getenv("JOB_TYPE")
-  if job_type == "presubmit":
-    workflow_name += "-{0}".format(os.getenv("PULL_NUMBER"))
-    workflow_name += "-{0}".format(os.getenv("PULL_PULL_SHA")[0:7])
-
-  elif job_type == "postsubmit":
-    workflow_name += "-{0}".format(os.getenv("PULL_BASE_SHA")[0:7])
-
-  workflow_name += "-{0}".format(os.getenv("BUILD_NUMBER"))
-
-  salt = uuid.uuid4().hex[0:4]
-  # Add some salt. This is mostly a convenience for the case where you
-  # are submitting jobs manually for testing/debugging. Since the prow should
-  # vend unique build numbers for each job.
-  workflow_name += "-{0}".format(salt)
-
-  # Create a new environment for this run
-  env = workflow_name
-
-  util.run(["ks", "env", "add", env], cwd=args.app_dir)
-
-  util.run(["ks", "param", "set", "--env=" + env, args.component,
-            "name", workflow_name],
-           cwd=args.app_dir)
-  util.load_kube_config()
-  api_client = k8s_client.ApiClient()
-
-  # Set the prow environment variables.
-  prow_env = []
-
-  names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
-           "PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
-           "REPO_NAME"]
-  names.sort()
-  for v in names:
-    if not os.getenv(v):
-      continue
-    prow_env.append("{0}={1}".format(v, os.getenv(v)))
-
-  util.run(["ks", "param", "set", "--env=" + env, args.component, "prow_env", ",".join(prow_env)],
-           cwd=args.app_dir)
-  util.run(["ks", "param", "set", "--env=" + env, args.component, "namespace", NAMESPACE],
-           cwd=args.app_dir)
-  util.run(["ks", "param", "set", "--env=" + env, args.component, "bucket", args.bucket],
-           cwd=args.app_dir)
-
-  # For debugging print out the manifest
-  util.run(["ks", "show", env, "-c", args.component], cwd=args.app_dir)
-  util.run(["ks", "apply",env, "-c", args.component], cwd=args.app_dir)
-
-  ui_url = ("http://testing-argo.kubeflow.io/timeline/kubeflow-test-infra/{0}"
-            ";tab=workflow".format(workflow_name))
-  logging.info("URL for workflow: %s", ui_url)
-  success = False
+  api_client = k8s_client.ApiClient()
+
+  workflow_names = []
+  for w in workflows:
+    app_dir = w.app_dir
+    # Create the name for the workflow.
+    # We truncate sha numbers to prevent the workflow name from being too large.
+    # Workflow name should not be more than 63 characters because it's used
+    # as a label on the pods.
+    workflow_name = os.getenv("JOB_NAME") + "-" + w.name
+    job_type = os.getenv("JOB_TYPE")
+    if job_type == "presubmit":
+      workflow_name += "-{0}".format(os.getenv("PULL_NUMBER"))
+      workflow_name += "-{0}".format(os.getenv("PULL_PULL_SHA")[0:7])
+
+    elif job_type == "postsubmit":
+      workflow_name += "-{0}".format(os.getenv("PULL_BASE_SHA")[0:7])
+
+    workflow_name += "-{0}".format(os.getenv("BUILD_NUMBER"))
+
+    # Add some salt. This is mostly a convenience for the case where you
+    # are submitting jobs manually for testing/debugging, since prow should
+    # vend unique build numbers for each job.
+    salt = uuid.uuid4().hex[0:4]
+    workflow_name += "-{0}".format(salt)
+
+    workflow_names.append(workflow_name)
+    # Create a new environment for this run.
+    env = workflow_name
+
+    util.run(["ks", "env", "add", env], cwd=w.app_dir)
+
+    util.run(["ks", "param", "set", "--env=" + env, w.component,
+              "name", workflow_name],
+             cwd=w.app_dir)
+
+    # Set the prow environment variables.
+    prow_env = []
+
+    names = ["JOB_NAME", "JOB_TYPE", "BUILD_ID", "BUILD_NUMBER",
+             "PULL_BASE_SHA", "PULL_NUMBER", "PULL_PULL_SHA", "REPO_OWNER",
+             "REPO_NAME"]
+    names.sort()
+    for v in names:
+      if not os.getenv(v):
+        continue
+      prow_env.append("{0}={1}".format(v, os.getenv(v)))
+
+    util.run(["ks", "param", "set", "--env=" + env, w.component,
+              "prow_env", ",".join(prow_env)],
+             cwd=w.app_dir)
+    util.run(["ks", "param", "set", "--env=" + env, w.component,
+              "namespace", NAMESPACE],
+             cwd=w.app_dir)
+    util.run(["ks", "param", "set", "--env=" + env, w.component,
+              "bucket", args.bucket],
+             cwd=w.app_dir)
+
+    # For debugging print out the manifest.
+    util.run(["ks", "show", env, "-c", w.component], cwd=w.app_dir)
+    util.run(["ks", "apply", env, "-c", w.component], cwd=w.app_dir)
+
+    ui_url = ("http://testing-argo.kubeflow.io/timeline/kubeflow-test-infra/{0}"
+              ";tab=workflow".format(workflow_name))
+    logging.info("URL for workflow: %s", ui_url)
+
+  success = True
   try:
-    results = argo_client.wait_for_workflow(api_client, NAMESPACE, workflow_name,
-                                            status_callback=argo_client.log_status)
-    if results["status"]["phase"] == "Succeeded":
-      success = True
-    logging.info("Workflow %s/%s finished phase: %s", NAMESPACE, workflow_name,
-                 results["status"]["phase"] )
+    results = argo_client.wait_for_workflows(api_client, NAMESPACE,
+                                             workflow_names,
+                                             status_callback=argo_client.log_status)
+    for r in results:
+      if r["status"]["phase"] != "Succeeded":
+        success = False
+      logging.info("Workflow %s/%s finished phase: %s", NAMESPACE,
+                   r["metadata"]["name"],
+                   r["status"]["phase"])
   except util.TimeoutError:
     success = False
-    logging.error("Time out waiting for Workflow %s/%s to finish", NAMESPACE, workflow_name)
+    logging.error("Timed out waiting for workflows %s to finish",
+                  ",".join(workflow_names))
   finally:
     create_finished_file(args.bucket, success)
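The naming logic above can be hard to read in diff form; a sketch of the name it produces for a hypothetical presubmit job (JOB_NAME=kubeflow-presubmit, workflow "e2e-test", PULL_NUMBER=77, PULL_PULL_SHA=123abc9, BUILD_NUMBER=1234):

import uuid

salt = uuid.uuid4().hex[0:4]
workflow_name = "-".join(
    ["kubeflow-presubmit", "e2e-test", "77", "123abc9"[0:7], "1234", salt])
# e.g. "kubeflow-presubmit-e2e-test-77-123abc9-1234-ab12",
# 48 characters, comfortably under the 63-character label limit.
print(workflow_name)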
@@ -178,14 +231,31 @@ def main(unparsed_args=None):  # pylint: disable=too-many-locals
     type=str,
     help="The bucket to use for the Gubernator outputs.")
 
+  parser.add_argument(
+    "--config_file",
+    default="",
+    type=str,
+    help="YAML file containing the config.")
+
+  parser.add_argument(
+    "--repos_dir",
+    default="",
+    type=str,
+    help="The directory where the different repos are checked out.")
+
+  # TODO(jlewi): app_dir and component predate the use of a config
+  # file. We should consider getting rid of them once all repos
+  # have been updated to run multiple workflows.
   parser.add_argument(
     "--app_dir",
     type=str,
+    default="",
     help="The directory where the ksonnet app is stored.")
 
   parser.add_argument(
     "--component",
     type=str,
+    default="",
     help="The ksonnet component to use.")
 
   #############################################################################
diff --git a/py/kubeflow/testing/run_e2e_workflow_test.py b/py/kubeflow/testing/run_e2e_workflow_test.py
index e856822d776..838c9e0aba5 100644
--- a/py/kubeflow/testing/run_e2e_workflow_test.py
+++ b/py/kubeflow/testing/run_e2e_workflow_test.py
@@ -4,14 +4,28 @@
 import mock
 from kubeflow.testing import run_e2e_workflow
 import tempfile
+import yaml
 from google.cloud import storage  # pylint: disable=no-name-in-module
 
+
 class TestRunE2eWorkflow(unittest.TestCase):
+
+  def assertItemsMatch(self, expected, actual):
+    """Check that expected matches actual.
+
+    Args:
+      expected: List of strings; each entry may be a regex.
+      actual: The actual items to check.
+    """
+    self.assertEqual(len(expected), len(actual))
+    for index, e in enumerate(expected):
+      self.assertRegexpMatches(actual[index], e)
+
   @mock.patch("kubeflow.testing.run_e2e_workflow.upload_file_to_gcs")
   @mock.patch("kubeflow.testing.run_e2e_workflow.upload_to_gcs")
   @mock.patch("kubeflow.testing.run_e2e_workflow.util.load_kube_config")
-  @mock.patch("kubeflow.testing.run_e2e_workflow.argo_client.wait_for_workflow")
+  @mock.patch("kubeflow.testing.run_e2e_workflow.argo_client.wait_for_workflows")
   @mock.patch("kubeflow.testing.run_e2e_workflow.util.configure_kubectl")
   @mock.patch("kubeflow.testing.run_e2e_workflow.util.run")
   def testMainPresubmit(self, mock_run, mock_configure, mock_wait, *unused_mocks):  # pylint: disable=no-self-use
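assertItemsMatch compares element-wise with assertRegexpMatches, which uses re.search, so an expected entry may match anywhere in the actual argument; for example:

import re

# The salt regex still matches even though the actual value has a longer tail.
assert re.search("kubeflow-presubmit-wf-77-[0-9a-z]{4}",
                 "kubeflow-presubmit-wf-77-1a2b-extra")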
+ """ + self.assertEqual(len(expected), len(actual)) + for index, e in enumerate(expected): + self.assertRegexpMatches(actual[index], e) + @mock.patch("kubeflow.testing.run_e2e_workflow.upload_file_to_gcs") @mock.patch("kubeflow.testing.run_e2e_workflow.upload_to_gcs") @mock.patch("kubeflow.testing.run_e2e_workflow.util.load_kube_config") - @mock.patch("kubeflow.testing.run_e2e_workflow.argo_client.wait_for_workflow") + @mock.patch("kubeflow.testing.run_e2e_workflow.argo_client.wait_for_workflows") @mock.patch("kubeflow.testing.run_e2e_workflow.util.configure_kubectl") @mock.patch("kubeflow.testing.run_e2e_workflow.util.run") def testMainPresubmit(self, mock_run, mock_configure, mock_wait, *unused_mocks): # pylint: disable=no-self-use @@ -27,42 +41,107 @@ def testMainPresubmit(self, mock_run, mock_configure, mock_wait, *unused_mocks): args = ["--project=some-project", "--cluster=some-cluster", "--zone=us-east1-d", "--bucket=some-bucket", - "--component=workflows", "--env=prow"] + "--app_dir=/some/dir", + "--component=workflows"] run_e2e_workflow.main(args) mock_configure.assert_called_once_with("some-project", "us-east1-d", "some-cluster",) - self.assertItemsEqual( - ["ks", "param", "set", "workflows", "name"], - mock_run.call_args_list[0][0][0][:-1]) - # Workflow name will have some random salt at the end. - self.assertRegexpMatches(mock_run.call_args_list[0][0][0][-1], - "kubeflow-presubmit-77-[0-9a-z]{4}") - - self.assertItemsEqual( - ["ks", "param", "set", "workflows", "prow_env", - "BUILD_NUMBER=1234,JOB_NAME=kubeflow-presubmit,JOB_TYPE=presubmit" - ",PULL_NUMBER=77,PULL_PULL_SHA=123abc,REPO_NAME=fake_name" - ",REPO_OWNER=fake_org"], - mock_run.call_args_list[1][0][0]) - - self.assertItemsEqual( - ["ks", "param", "set", "workflows", "namespace", - "kubeflow-test-infra"], - mock_run.call_args_list[2][0][0]) - - self.assertItemsEqual( - ["ks", "param", "set", "workflows", "bucket", "some-bucket"], - mock_run.call_args_list[3][0][0]) - - self.assertItemsEqual( - ["ks", "show", "prow", "-c", "workflows"], - mock_run.call_args_list[4][0][0]) - - self.assertItemsEqual( - ["ks", "apply", "prow", "-c", "workflows"], - mock_run.call_args_list[5][0][0]) + expected_calls = [ + ["ks", "env", "add", "kubeflow-presubmit-legacy-77-123abc-1234-.*"], + ["ks", "param", "set", "--env=.*", "workflows", "name", + "kubeflow-presubmit-legacy-77-[0-9a-z]{4}"], + ["ks", "param", "set", + "--env=.*", + "workflows", "prow_env", + "BUILD_NUMBER=1234,JOB_NAME=kubeflow-presubmit,JOB_TYPE=presubmit" + ",PULL_NUMBER=77,PULL_PULL_SHA=123abc,REPO_NAME=fake_name" + ",REPO_OWNER=fake_org"], + ["ks", "param", "set", + "--env=.*", + "workflows", "namespace", + "kubeflow-test-infra"], + ["ks", "param", "set", + "--env=.*", + "workflows", "bucket", "some-bucket"], + ["ks", "show", "kubeflow-presubmit.*", "-c", "workflows"], + ["ks", "apply", "kubeflow-presubmit.*", "-c", "workflows"], + ] + + for i, expected in enumerate(expected_calls): + self.assertItemsMatch( + expected, + mock_run.call_args_list[i][0][0]) + self.assertEquals( + "/some/dir", + mock_run.call_args_list[i][1]["cwd"]) + + @mock.patch("kubeflow.testing.run_e2e_workflow.upload_file_to_gcs") + @mock.patch("kubeflow.testing.run_e2e_workflow.upload_to_gcs") + @mock.patch("kubeflow.testing.run_e2e_workflow.util.load_kube_config") + @mock.patch("kubeflow.testing.run_e2e_workflow.argo_client.wait_for_workflows") + @mock.patch("kubeflow.testing.run_e2e_workflow.util.configure_kubectl") + @mock.patch("kubeflow.testing.run_e2e_workflow.util.run") + def 
+  def testWithConfig(self, mock_run, mock_configure, mock_wait, *unused_mocks):  # pylint: disable=no-self-use
+    """Test creating a workflow from a config file."""
+
+    config = {
+      "workflows": [
+        {"app_dir": "kubeflow/testing/workflows",
+         "component": "workflows",
+         "name": "wf",
+        },]
+    }
+    with tempfile.NamedTemporaryFile(delete=False) as hf:
+      yaml.dump(config, hf)
+      name = hf.name
+
+    os.environ["REPO_OWNER"] = "fake_org"
+    os.environ["REPO_NAME"] = "fake_name"
+    os.environ["PULL_NUMBER"] = "77"
+    os.environ["PULL_PULL_SHA"] = "123abc"
+    os.environ["JOB_NAME"] = "kubeflow-presubmit"
+    os.environ["JOB_TYPE"] = "presubmit"
+    os.environ["BUILD_NUMBER"] = "1234"
+
+    args = ["--project=some-project", "--cluster=some-cluster",
+            "--zone=us-east1-d", "--bucket=some-bucket",
+            "--config_file=" + name,
+            "--repos_dir=/src"]
+    run_e2e_workflow.main(args)
+
+    mock_configure.assert_called_once_with("some-project", "us-east1-d",
+                                           "some-cluster",)
+
+    expected_calls = [
+      ["ks", "env", "add", "kubeflow-presubmit-wf-77-123abc-1234-.*"],
+      ["ks", "param", "set", "--env=.*", "workflows", "name",
+       "kubeflow-presubmit-wf-77-[0-9a-z]{4}"],
+      ["ks", "param", "set",
+       "--env=.*",
+       "workflows", "prow_env",
+       "BUILD_NUMBER=1234,JOB_NAME=kubeflow-presubmit,JOB_TYPE=presubmit"
+       ",PULL_NUMBER=77,PULL_PULL_SHA=123abc,REPO_NAME=fake_name"
+       ",REPO_OWNER=fake_org"],
+      ["ks", "param", "set",
+       "--env=.*",
+       "workflows", "namespace",
+       "kubeflow-test-infra"],
+      ["ks", "param", "set",
+       "--env=.*",
+       "workflows", "bucket", "some-bucket"],
+      ["ks", "show", "kubeflow-presubmit.*", "-c", "workflows"],
+      ["ks", "apply", "kubeflow-presubmit.*", "-c", "workflows"],
+    ]
+
+    for i, expected in enumerate(expected_calls):
+      self.assertItemsMatch(
+        expected,
+        mock_run.call_args_list[i][0][0])
+      self.assertEqual(
+        "/src/kubeflow/testing/workflows",
+        mock_run.call_args_list[i][1]["cwd"])
 
 if __name__ == "__main__":
   unittest.main()
diff --git a/py/kubeflow/testing/util.py b/py/kubeflow/testing/util.py
index 5a0035a59d9..82ad3247479 100755
--- a/py/kubeflow/testing/util.py
+++ b/py/kubeflow/testing/util.py
@@ -490,8 +490,10 @@ def _save_kube_config(config_map):
   loader.load_and_set(client_configuration)
 
 def maybe_activate_service_account():
-  if not os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
+  if os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
     logging.info("GOOGLE_APPLICATION_CREDENTIALS is set; configuring gcloud "
                  "to use service account.")
-    run(["gcloud", "auth", "activate-service-account",
-         "--key-file=" + os.getenv("GOOGLE_APPLICATION_CREDENTIALS")])
+    run(["gcloud", "auth", "activate-service-account",
+         "--key-file=" + os.getenv("GOOGLE_APPLICATION_CREDENTIALS")])
+  else:
+    logging.info("GOOGLE_APPLICATION_CREDENTIALS is not set.")
\ No newline at end of file
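The util.py change fixes an inverted guard: the old code attempted activation precisely when GOOGLE_APPLICATION_CREDENTIALS was not set, so --key-file could never receive a real path. A minimal sketch of the corrected behavior, assuming a hypothetical key path:

import os
import subprocess

# Activation only happens when credentials are actually provided, e.g.
# GOOGLE_APPLICATION_CREDENTIALS=/secrets/key.json (hypothetical path).
key_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
if key_file:
  subprocess.check_call(["gcloud", "auth", "activate-service-account",
                         "--key-file=" + key_file])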