Skip to content

Commit

Permalink
[AWS Sagemaker] aws-samples kmeans-hpo pipeline test (kubeflow#3905)
Browse files: browse the repository at this point in the history
* aws-samples kmeans-hpo pipeline test
	- code clean up
	- removed unused args

* add test dependency

* Trigger Build
  • Loading branch information
surajkota authored and Nicholas Thomson committed Jun 17, 2020
1 parent 9db2919 commit aa89fa3
Show file tree
Hide file tree
Showing 16 changed files with 67 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import pytest
import os
import json
import utils
from utils import kfp_client_utils
from utils import sagemaker_utils
from test_workteam_component import create_workteamjob
import time


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

@pytest.mark.parametrize(
"test_file_dir",
[pytest.param("resources/config/kmeans-mnist-hpo", marks=pytest.mark.canary_test)],
[
pytest.param(
"resources/config/kmeans-mnist-hpo", marks=pytest.mark.canary_test
),
"resources/config/aws-samples-hpo-spot-training",
],
)
def test_hyperparameter_tuning(
kfp_client, experiment_id, region, sagemaker_client, test_file_dir
Expand All @@ -23,19 +28,10 @@ def test_hyperparameter_tuning(
os.path.join(download_dir, "config.yaml"),
)
)

test_params["Arguments"]["channels"] = json.dumps(
test_params["Arguments"]["channels"]
)
test_params["Arguments"]["static_parameters"] = json.dumps(
test_params["Arguments"]["static_parameters"]
)
test_params["Arguments"]["integer_parameters"] = json.dumps(
test_params["Arguments"]["integer_parameters"]
)
test_params["Arguments"]["categorical_parameters"] = json.dumps(
test_params["Arguments"]["categorical_parameters"]
)
if "job_name" in test_params["Arguments"]:
test_params["Arguments"]["job_name"] = (
utils.generate_random_string(5) + "-" + test_params["Arguments"]["job_name"]
)

_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
Expand Down Expand Up @@ -68,6 +64,8 @@ def test_hyperparameter_tuning(
print(f"HPO job name: {hpo_job_name}")
hpo_response = sagemaker_utils.describe_hpo_job(sagemaker_client, hpo_job_name)
assert hpo_response["HyperParameterTuningJobStatus"] == "Completed"
if "job_name" in test_params["Arguments"]:
assert hpo_response["HyperParameterTuningJobName"] == hpo_job_name

# Verify training image output is an ECR image
training_image = utils.read_from_file_in_tar(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pytest
import os
import json
import utils
from utils import kfp_client_utils
from utils import minio_utils
Expand Down Expand Up @@ -33,12 +32,6 @@ def test_trainingjob(
)
)

test_params["Arguments"]["hyperparameters"] = json.dumps(
test_params["Arguments"]["hyperparameters"]
)
test_params["Arguments"]["channels"] = json.dumps(
test_params["Arguments"]["channels"]
)
_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pytest
import os
import json
import utils
from utils import kfp_client_utils
from utils import sagemaker_utils
Expand Down Expand Up @@ -58,7 +57,7 @@ def test_workteamjob(

outputs = {"sagemaker-private-workforce": ["workteam_arn"]}

try:
try:
output_files = minio_utils.artifact_download_iterator(
workflow_json, outputs, download_dir
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ dependencies:
- kfp==0.5.*
- minio==5.0.10
- sagemaker==1.56.*
- ruamel.yaml==0.16.*


Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
PipelineDefinition: ../../../../../samples/contrib/aws-samples/mnist-kmeans-sagemaker/kmeans-hpo-pipeline.py
TestName: aws-samples-hpo-spot-training
Timeout: 7200
ExpectedTrainingImage: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
Arguments:
job_name: HPO-kmeans-sample
region: ((REGION))
channels:
- ChannelName: train
DataSource:
S3DataSource:
S3Uri: s3://((DATA_BUCKET))/mnist_kmeans_example/train_data
S3DataType: S3Prefix
S3DataDistributionType: FullyReplicated
CompressionType: None
RecordWrapperType: None
InputMode: File
- ChannelName: test
DataSource:
S3DataSource:
S3Uri: s3://((DATA_BUCKET))/mnist_kmeans_example/test_data
S3DataType: S3Prefix
S3DataDistributionType: FullyReplicated
CompressionType: None
RecordWrapperType: None
InputMode: File
output_location: s3://((DATA_BUCKET))/mnist_kmeans_example/output
spot_instance: True
checkpoint_config:
S3Uri: s3://((DATA_BUCKET))/mnist_kmeans_example/output/checkpoints/
role_arn: ((ROLE_ARN))
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
PipelineDefinition: resources/definition/groundtruth_pipeline.py
TestName: image-classification-groundtruth
Timeout: 1200
Timeout: 300
StatusToCheck: 'running'
Arguments:
region: ((REGION))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_model_op = components.load_component_from_file("../../model/component.yaml")
sagemaker_deploy_op = components.load_component_from_file("../../deploy/component.yaml")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_model_op = components.load_component_from_file("../../model/component.yaml")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import kfp
import json
import copy
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_gt_op = components.load_component_from_file(
"../../ground_truth/component.yaml"
Expand Down Expand Up @@ -34,7 +31,7 @@ def ground_truth_test(
workteam_arn="",
):

ground_truth_train = sagemaker_gt_op(
sagemaker_gt_op(
region=region,
role=role,
job_name=ground_truth_train_job_name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret


sagemaker_hpo_op = components.load_component_from_file(
"../../hyperparameter_tuning/component.yaml"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_model_op = components.load_component_from_file("../../model/component.yaml")
sagemaker_batch_transform_op = components.load_component_from_file(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
#!/usr/bin/env python3

import kfp
import json
import copy
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_workteam_op = components.load_component_from_file(
"../../workteam/component.yaml"
Expand All @@ -20,7 +17,7 @@ def workteam_test(
region="", team_name="", description="", user_pool="", user_groups="", client_id=""
):

workteam = sagemaker_workteam_op(
sagemaker_workteam_op(
region=region,
team_name=team_name,
description=description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import subprocess
import pytest
import tarfile
import yaml
from ruamel.yaml import YAML
import random
import string
import shutil
Expand Down Expand Up @@ -93,7 +93,8 @@ def replace_placeholders(input_filename, output_filename):

def load_params(file_name):
with open(file_name, "r") as f:
return yaml.safe_load(f)
yaml = YAML(typ="safe")
return yaml.load(f)


def generate_random_string(length):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import utils
import pytest
import time

from utils import argo_utils

Expand All @@ -24,23 +23,17 @@ def compile_and_run_pipeline(
return run.id


def wait_for_job_completion(client, run_id, timeout, status_to_check):
response = client.wait_for_run_completion(run_id, timeout)
status = None
if response.run.status:
status = response.run.status.lower() == status_to_check
return status


def wait_for_job_status(client, run_id, timeout, status_to_check="succeeded"):
if status_to_check == "succeeded":
status = wait_for_job_completion(client, run_id, timeout, status_to_check)
else:
time.sleep(timeout)
response = None
try:
response = client.wait_for_run_completion(run_id, timeout)
except TimeoutError:
print(f"run-id: {run_id} did not stop within specified timeout")
response = client.get_run(run_id)
status = None
if response.run.status:
status = response.run.status.lower() == status_to_check

status = False
if response and response.run.status:
status = response.run.status.lower() == status_to_check
return status


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

import kfp
import json
import os
import copy
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_hpo_op = components.load_component_from_file('../../../../components/aws/sagemaker/hyperparameter_tuning/component.yaml')

components_dir = os.path.join(os.path.dirname(__file__), '../../../../components/aws/sagemaker/')
sagemaker_hpo_op = components.load_component_from_file(os.path.join(components_dir, 'hyperparameter_tuning/component.yaml'))


channelObjList = []
Expand Down Expand Up @@ -40,7 +43,7 @@
description='SageMaker hyperparameter tuning job test'
)
def hpo_test(region='us-east-1',
hpo_job_name='HPO-kmeans-sample',
job_name='HPO-kmeans-sample',
image='',
algorithm_name='K-Means',
training_input_mode='File',
Expand Down Expand Up @@ -81,7 +84,7 @@ def hpo_test(region='us-east-1',
training = sagemaker_hpo_op(
region=region,
endpoint_url=endpoint_url,
job_name=hpo_job_name,
job_name=job_name,
image=image,
training_input_mode=training_input_mode,
algorithm_name=algorithm_name,
Expand Down

0 comments on commit aa89fa3

Please sign in to comment.