update ci tests for mnist example
jinchihe committed Nov 28, 2019
1 parent 341decc commit 9fdbc7b
Showing 4 changed files with 232 additions and 191 deletions.
106 changes: 41 additions & 65 deletions mnist/testing/deploy_test.py
@@ -10,80 +10,56 @@
* Provides utilities for testing
Manually running the test
1. Configure your KUBECONFIG file to point to the desired cluster
2. Set --params=name=${NAME},namespace=${NAMESPACE}
* name should be the name for your job
* namespace should be the namespace to use
3. Use the modelBasePath parameter to point to the model to test.
--params=...,modelBasePath=${MODEL_BASE_PATH}
pytest deploy_test.py -s ...
"""

import logging
import os
import subprocess
import pytest

from kubernetes import client as k8s_client
from kubeflow.tf_operator import test_runner #pylint: disable=no-name-in-module

from kubeflow.testing import test_util
from kubeflow.testing import util

# TODO(jlewi): Should we refactor this to use pytest like predict_test
# and not depend on test_runner.
class MnistDeployTest(test_util.TestCase):
def __init__(self, args):
namespace, name, env = test_runner.parse_runtime_params(args)
self.app_dir = args.app_dir

if not self.app_dir:
self.app_dir = os.path.join(os.path.dirname(__file__), "..",
"serving/GCS")
self.app_dir = os.path.abspath(self.app_dir)
logging.info("--app_dir not set defaulting to: %s", self.app_dir)

self.env = env
self.namespace = namespace
self.params = args.params
super(MnistDeployTest, self).__init__(class_name="MnistDeployTest",
name=name)

def test_serve(self):
# We repeat the test multiple times.
# This ensures that if we delete the job we can create a new job with the
# same name.
api_client = k8s_client.ApiClient()

# TODO (jinchihe) The code below will be removed once the new test-worker image
# is published in https://github.com/kubeflow/testing/issues/373.
kusUrl = 'https://github.com/kubernetes-sigs/kustomize/' \
'releases/download/v2.0.3/kustomize_2.0.3_linux_amd64'
util.run(['wget', '-O', '/usr/local/bin/kustomize', kusUrl], cwd=self.app_dir)
util.run(['chmod', 'a+x', '/usr/local/bin/kustomize'], cwd=self.app_dir)

# Apply the components
configmap = 'mnist-map-serving'
for pair in self.params.split(","):
k, v = pair.split("=", 1)
if k == "namespace":
util.run(['kustomize', 'edit', 'set', k, v], cwd=self.app_dir)
else:
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=' + k + '=' + v], cwd=self.app_dir)

# util.run does not seem to handle pipes, so use check_call here (see the sketch after this method).
subCmd = 'kustomize build ' + self.app_dir + '| kubectl apply -f -'
subprocess.check_call(subCmd, shell=True)

util.wait_for_deployment(api_client, self.namespace, self.name,
timeout_minutes=4)

# We don't delete the resources. We depend on the namespace being
# garbage collected.
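The shell pipe above works, but for reference, here is a minimal sketch of running the same kustomize build | kubectl apply -f - pipeline without shell=True, using only the standard library (the helper name kustomize_apply is an illustration, not part of this commit):

import subprocess

def kustomize_apply(app_dir):
    # Build the manifests with kustomize and stream them straight into
    # kubectl, chaining the two processes instead of relying on a shell pipe.
    build = subprocess.Popen(["kustomize", "build", app_dir],
                             stdout=subprocess.PIPE)
    subprocess.check_call(["kubectl", "apply", "-f", "-"], stdin=build.stdout)
    build.stdout.close()
    if build.wait() != 0:
        raise subprocess.CalledProcessError(build.returncode,
                                            ["kustomize", "build", app_dir])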

def test_deploy(record_xml_attribute, name, namespace, **kwargs):

util.set_pytest_junit(record_xml_attribute, "test_deploy")

util.maybe_activate_service_account()

app_dir = os.path.join(os.path.dirname(__file__), "..", "serving/GCS")
app_dir = os.path.abspath(app_dir)
logging.info("--app_dir not set defaulting to: %s", app_dir)

api_client = k8s_client.ApiClient()

# Configure custom parameters using kustomize
configmap = 'mnist-map-serving'
util.run(['kustomize', 'edit', 'set', 'namespace', namespace], cwd=app_dir)
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=name' + '=' + name], cwd=app_dir)

for key, value in kwargs.items():
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=' + key + '=' + value], cwd=app_dir)

# Apply the components
util.run(['kustomize', 'build', app_dir, '-o', 'generated.yaml'], cwd=app_dir)
util.run(['kubectl', 'apply', '-f', 'generated.yaml'], cwd=app_dir)

util.wait_for_deployment(api_client, namespace, name, timeout_minutes=4)

# We don't delete the resources. We depend on the namespace being
# garbage collected.

if __name__ == "__main__":
# TODO(jlewi): It looks like using test_runner we don't exit with an error
# if the deployment doesn't succeed. So the Argo workflow continues which
# isn't what we want. Might be a good reason to switch to using
# pytest.
test_runner.main(module=__name__)
logging.basicConfig(level=logging.INFO,
format=('%(levelname)s|%(asctime)s'
'|%(pathname)s|%(lineno)d| %(message)s'),
datefmt='%Y-%m-%dT%H:%M:%S',
)
logging.getLogger().setLevel(logging.INFO)
pytest.main()
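The rewritten test above receives name and namespace as plain pytest arguments (alongside the built-in record_xml_attribute fixture). The wiring for those arguments is not part of this diff; purely as an illustration, a hypothetical conftest.py could expose them as command-line options and fixtures like this (option names and defaults are assumptions, not taken from the repository):

# conftest.py -- hypothetical sketch; the real repository may wire this differently.
import pytest

def pytest_addoption(parser):
    parser.addoption("--name", default="mnist-deploy-test",
                     help="Name to use for the deployment or job.")
    parser.addoption("--namespace", default="kubeflow-test",
                     help="Namespace to deploy into.")

@pytest.fixture
def name(request):
    # Expose the --name option as a test function argument.
    return request.config.getoption("--name")

@pytest.fixture
def namespace(request):
    # Expose the --namespace option as a test function argument.
    return request.config.getoption("--namespace")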

204 changes: 91 additions & 113 deletions mnist/testing/tfjob_test.py
@@ -13,129 +13,107 @@
* Provides utilities for testing
Manually running the test
1. Configure your KUBECONFIG file to point to the desired cluster
2. Set --params=name=${NAME},namespace=${NAMESPACE}
* name should be the name for your job
* namespace should be the namespace to use
3. To test a new image, set the image parameter, e.g.
--params=name=${NAME},namespace=${NAMESPACE},image=${IMAGE}
4. To control how long it trains, set trainSteps and batchSize, e.g.
--params=trainSteps=10,batchSize=10,...
pytest tfjob_test.py -s \
name=tfjobs-test-${BUILD_ID} \
namespace=${testNamespace} \
image=${trainerImage} \
trainSteps=10 \
batchSize=10 \
learningRate=0.01 \
numPs=1 \
numWorkers=2 \
modelDir=${modelDir} \
exportDir=${modelDir} \
"""

import json
import logging
import os
import subprocess
import pytest

from kubernetes import client as k8s_client
from kubeflow.tf_operator import tf_job_client #pylint: disable=no-name-in-module
from kubeflow.tf_operator import test_runner #pylint: disable=no-name-in-module

from kubeflow.testing import test_util
from kubeflow.testing import util

class TFJobTest(test_util.TestCase):
def __init__(self, args):
namespace, name, env = test_runner.parse_runtime_params(args)
self.app_dir = args.app_dir

if not self.app_dir:
self.app_dir = os.path.join(os.path.dirname(__file__), "..",
"training/GCS")
self.app_dir = os.path.abspath(self.app_dir)
logging.info("--app_dir not set defaulting to: %s", self.app_dir)

self.env = env
self.namespace = namespace
self.params = args.params
super(TFJobTest, self).__init__(class_name="TFJobTest", name=name)

def test_train(self):
# We repeat the test multiple times.
# This ensures that if we delete the job we can create a new job with the
# same name.
api_client = k8s_client.ApiClient()

# TODO (jinchihe) The code below will be removed once the new test-worker image
# is published in https://github.com/kubeflow/testing/issues/373.
kusUrl = 'https://github.com/kubernetes-sigs/kustomize/' \
'releases/download/v2.0.3/kustomize_2.0.3_linux_amd64'
util.run(['wget', '-O', '/usr/local/bin/kustomize', kusUrl], cwd=self.app_dir)
util.run(['chmod', 'a+x', '/usr/local/bin/kustomize'], cwd=self.app_dir)

# Setup parameters for kustomize
configmap = 'mnist-map-training'
for pair in self.params.split(","):
k, v = pair.split("=", 1)
if k == "namespace":
util.run(['kustomize', 'edit', 'set', k, v], cwd=self.app_dir)
elif k == "image":
util.run(['kustomize', 'edit', 'set', k, 'training-image=' + v], cwd=self.app_dir)
elif k == "numPs":
util.run(['../base/definition.sh', '--numPs', v], cwd=self.app_dir)
elif k == "numWorkers":
util.run(['../base/definition.sh', '--numWorkers', v], cwd=self.app_dir)
elif k == "secret":
secretName, secretMountPath = v.split("=", 1)
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=secretName=' + secretName], cwd=self.app_dir)
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=secretMountPath=' + secretMountPath], cwd=self.app_dir)
elif k == "envVariables":
var_k, var_v = v.split("=", 1)
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=' + var_k + '=' + var_v], cwd=self.app_dir)
else:
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=' + k + '=' + v], cwd=self.app_dir)

# Create the TF job
# util.run does not seem to handle pipes, so use check_call here.
subCmd = 'kustomize build ' + self.app_dir + '| kubectl apply -f -'
subprocess.check_call(subCmd, shell=True)
logging.info("Created job %s in namespaces %s", self.name, self.namespace)

# Wait for the job to complete.
logging.info("Waiting for job to finish.")
results = tf_job_client.wait_for_job(
api_client,
self.namespace,
self.name,
status_callback=tf_job_client.log_status)
logging.info("Final TFJob:\n %s", json.dumps(results, indent=2))

# Check for errors creating pods and services. Can potentially
# help debug failed test runs.
creation_failures = tf_job_client.get_creation_failures_from_tfjob(
api_client, self.namespace, results)
if creation_failures:
logging.warning(creation_failures)

if not tf_job_client.job_succeeded(results):
self.failure = "Job {0} in namespace {1} in status {2}".format( # pylint: disable=attribute-defined-outside-init
self.name, self.namespace, results.get("status", {}))
logging.error(self.failure)

# if the TFJob failed, print out the pod logs for debugging.
pod_names = tf_job_client.get_pod_names(
api_client, self.namespace, self.name)
logging.info("The Pods name:\n %s", pod_names)

core_api = k8s_client.CoreV1Api(api_client)

for pod in pod_names:
logging.info("Getting logs of Pod %s.", pod)
try:
pod_logs = core_api.read_namespaced_pod_log(pod, self.namespace)
logging.info("The logs of Pod %s log:\n %s", pod, pod_logs)
except k8s_client.rest.ApiException as e:
logging.info("Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n", e)
return

# We don't delete the jobs. We rely on TTLSecondsAfterFinished
# to delete old jobs. Leaving jobs around should make it
# easier to debug.
def test_training(record_xml_attribute, name, namespace,
image, numPs, numWorkers, **kwargs):

util.set_pytest_junit(record_xml_attribute, "test_mnist")

util.maybe_activate_service_account()

app_dir = os.path.join(os.path.dirname(__file__), "..", "training/GCS")
app_dir = os.path.abspath(app_dir)
logging.info("--app_dir not set defaulting to: %s", app_dir)

api_client = k8s_client.ApiClient()

# Configure custom parameters using kustomize
configmap = 'mnist-map-training'
util.run(['kustomize', 'edit', 'set', 'namespace', namespace], cwd=app_dir)
util.run(['kustomize', 'edit', 'set', 'image', image], cwd=app_dir)
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=name' + '=' + name], cwd=app_dir)

util.run(['../base/definition.sh', '--numPs', numPs], cwd=app_dir)
util.run(['../base/definition.sh', '--numWorkers', numWorkers], cwd=app_dir)

for key, value in kwargs.items():
util.run(['kustomize', 'edit', 'add', 'configmap', configmap,
'--from-literal=' + key + '=' + value], cwd=app_dir)

# Create the TFJob.
util.run(['kustomize', 'build', app_dir, '-o', 'generated.yaml'], cwd=app_dir)
util.run(['kubectl', 'apply', '-f', 'generated.yaml'], cwd=app_dir)
logging.info("Created job %s in namespaces %s", name, namespace)

# Wait for the job to complete.
logging.info("Waiting for job to finish.")
results = tf_job_client.wait_for_job(
api_client,
namespace,
name,
status_callback=tf_job_client.log_status)
logging.info("Final TFJob:\n %s", json.dumps(results, indent=2))

# Check for errors creating pods and services. Can potentially
# help debug failed test runs.
creation_failures = tf_job_client.get_creation_failures_from_tfjob(
api_client, namespace, results)
if creation_failures:
logging.warning(creation_failures)

if not tf_job_client.job_succeeded(results):
failure = "Job {0} in namespace {1} in status {2}".format( # pylint: disable=attribute-defined-outside-init
name, namespace, results.get("status", {}))
logging.error(failure)

# if the TFJob failed, print out the pod logs for debugging.
pod_names = tf_job_client.get_pod_names(
api_client, namespace, name)
logging.info("The Pods name:\n %s", pod_names)

core_api = k8s_client.CoreV1Api(api_client)

for pod in pod_names:
logging.info("Getting logs of Pod %s.", pod)
try:
pod_logs = core_api.read_namespaced_pod_log(pod, namespace)
logging.info("The logs of Pod %s log:\n %s", pod, pod_logs)
except k8s_client.rest.ApiException as e:
logging.info("Exception when calling CoreV1Api->read_namespaced_pod_log: %s\n", e)
return

# We don't delete the jobs. We rely on TTLSecondsAfterFinished
# to delete old jobs. Leaving jobs around should make it
# easier to debug.
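Note that test_training above only logs the failure string and returns, so pytest still reports the test as passed when the TFJob fails. If that is not the intent, one possible (hypothetical) adjustment is to replace the final return with an explicit failure once the pod logs have been collected, reusing the module's existing pytest import:

# Hypothetical: instead of `return`, surface the TFJob failure to pytest
# after the pod logs have been dumped for debugging.
pytest.fail(failure)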

if __name__ == "__main__":
test_runner.main(module=__name__)
logging.basicConfig(level=logging.INFO,
format=('%(levelname)s|%(asctime)s'
'|%(pathname)s|%(lineno)d| %(message)s'),
datefmt='%Y-%m-%dT%H:%M:%S',
)
logging.getLogger().setLevel(logging.INFO)
pytest.main()
22 changes: 11 additions & 11 deletions prow_config.yaml
@@ -21,17 +21,6 @@ workflows:
include_dirs:
- code_search/*

# E2E test for mnist example
- app_dir: kubeflow/examples/test/workflows
component: mnist
name: mnist
job_types:
- periodic
- presubmit
- postsubmit
include_dirs:
- mnist/*

# E2E test for github issue summarization example
- app_dir: kubeflow/examples/test/workflows
component: gis
@@ -75,3 +64,14 @@ workflows:
include_dirs:
- xgboost_synthetic/*
- py/kubeflow/examples/create_e2e_workflow.py

# E2E test for mnist example
- py_func: kubeflow.examples.create_e2e_workflow.create_workflow
name: mnist
job_types:
- periodic
- presubmit
- postsubmit
include_dirs:
- mnist/*
- py/kubeflow/examples/create_e2e_workflow.py
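With this change the mnist E2E workflow is produced by a Python function (py_func) instead of the app_dir/component entry removed above. The builder itself is not shown in this diff; purely as an illustration of the shape such a py_func target takes, here is a hypothetical skeleton (the body is an assumption, not the actual create_e2e_workflow.py):

# Hypothetical skeleton of the py_func target referenced above; the real
# py/kubeflow/examples/create_e2e_workflow.py is considerably more involved.
def create_workflow(name=None, namespace=None, **kwargs):
    """Return an Argo Workflow spec (as a dict) that runs the mnist E2E tests."""
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "entrypoint": "e2e",
            # Steps that run deploy_test.py and tfjob_test.py would be
            # defined here.
            "templates": [],
        },
    }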