Add SageMaker HPO component and sample usage in a pipeline #1628
@@ -19,11 +19,16 @@
import string
import random
import json
import yaml
from urlparse import urlparse

import boto3
from botocore.exceptions import ClientError
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri

import logging
logging.getLogger().setLevel(logging.INFO)

def get_client(region=None):
    """Builds a client to the AWS SageMaker API."""
@@ -53,10 +58,10 @@ def create_training_job(client, image, instance_type, instance_count, volume_siz
            "VolumeSizeInGB": volume_size
        },
        "TrainingJobName": job_name,
        "HyperParameters": {
            "k": "10",
            "feature_dim": "784",
            "mini_batch_size": "500"
        },
        "StoppingCondition": {
            "MaxRuntimeInSeconds": 60 * 60
@@ -221,5 +226,174 @@ def print_tranformation_job_result(output_location):
        results = f.readlines()
    print("Sample transform result: {}".format(results[0]))


def create_hyperparameter_tuning_job_request(args):
    with open('/app/common/hpo.template.yaml', 'r') as f:
        request = yaml.safe_load(f)

    built_in_algos = {
        'blazingtext': 'blazingtext',
        'deepar forecasting': 'forecasting-deepar',
        'factorization machines': 'factorization-machines',
        'image classification': 'image-classification',
        'ip insights': 'ipinsights',
        'k-means': 'kmeans',
        'k-nearest neighbors': 'knn',
        'k-nn': 'knn',
        'lda': 'lda',
        'linear learner': 'linear-learner',
        'neural topic model': 'ntm',
        'object2vec': 'object2vec',
        'object detection': 'object-detection',
        'pca': 'pca',
        'random cut forest': 'randomcutforest',
        'semantic segmentation': 'semantic-segmentation',
        'sequence to sequence': 'seq2seq',
        'seq2seq modeling': 'seq2seq',
        'xgboost': 'xgboost'
    }

    ### Create a hyperparameter tuning job
    if args['job_name']:
        hpo_job_name = args['job_name']
    else:
        hpo_job_name = "HPOJob-" + strftime("%Y%m%d%H%M%S", gmtime()) + '-' + id_generator()

    request['HyperParameterTuningJobName'] = hpo_job_name

    request['HyperParameterTuningJobConfig']['Strategy'] = args['strategy']
    request['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['Type'] = args['metric_type']
    request['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['MetricName'] = args['metric_name']
    request['HyperParameterTuningJobConfig']['ResourceLimits']['MaxNumberOfTrainingJobs'] = args['max_num_jobs']
    request['HyperParameterTuningJobConfig']['ResourceLimits']['MaxParallelTrainingJobs'] = args['max_parallel_jobs']
    request['HyperParameterTuningJobConfig']['ParameterRanges']['IntegerParameterRanges'] = args['integer_parameters']
    request['HyperParameterTuningJobConfig']['ParameterRanges']['ContinuousParameterRanges'] = args['continuous_parameters']
    request['HyperParameterTuningJobConfig']['ParameterRanges']['CategoricalParameterRanges'] = args['categorical_parameters']
    request['HyperParameterTuningJobConfig']['TrainingJobEarlyStoppingType'] = args['early_stopping_type']

    request['TrainingJobDefinition']['StaticHyperParameters'] = args['static_parameters']
    request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingInputMode'] = args['training_input_mode']

    # TODO: determine if algorithm name or training image is used for algorithms from AWS marketplace
Review comment: Seems you cover this from line 279-300. Remove the #TODO if it's done.
Review comment: Same as other TODOs.
    ### Update training image (for BYOC) or algorithm resource name
    if not args['image'] and not args['algorithm_name']:
        logging.error('Please specify training image or algorithm name.')
        raise Exception('Could not create job request')
    if args['image'] and args['algorithm_name']:
        logging.error('Both image and algorithm name inputted, only one allowed. Proceeding with image.')

    if args['image']:
        request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingImage'] = args['image']
        request['TrainingJobDefinition']['AlgorithmSpecification'].pop('AlgorithmName')
    else:
        # TODO: determine if users can make custom algorithm resources that have the same name as built-in algorithm names
        algo_name = args['algorithm_name'].lower().strip()
        if algo_name in built_in_algos.keys():
            request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingImage'] = get_image_uri(args['region'], built_in_algos[algo_name])
            request['TrainingJobDefinition']['AlgorithmSpecification'].pop('AlgorithmName')
        # Just to give the user more leeway for built-in algorithm name inputs
        elif algo_name in built_in_algos.values():
            request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingImage'] = get_image_uri(args['region'], algo_name)
            request['TrainingJobDefinition']['AlgorithmSpecification'].pop('AlgorithmName')
        else:
            request['TrainingJobDefinition']['AlgorithmSpecification']['AlgorithmName'] = args['algorithm_name']
            request['TrainingJobDefinition']['AlgorithmSpecification'].pop('TrainingImage')

    ### Update metric definitions
    if args['metric_definitions']:
        for key, val in args['metric_definitions'].items():
            request['TrainingJobDefinition']['AlgorithmSpecification']['MetricDefinitions'].append({'Name': key, 'Regex': val})
    else:
        request['TrainingJobDefinition']['AlgorithmSpecification'].pop('MetricDefinitions')

    # TODO: clarify if both security group ids and subnets are required for vpc config
    ### Update or pop VPC configs
    if args['vpc_security_group_ids'] and args['vpc_subnets']:
        request['TrainingJobDefinition']['VpcConfig']['SecurityGroupIds'] = [args['vpc_security_group_ids']]
        request['TrainingJobDefinition']['VpcConfig']['Subnets'] = [args['vpc_subnets']]
    else:
        request['TrainingJobDefinition'].pop('VpcConfig')

    ### Update input channels, must have at least one specified
    if len(args['channels']) > 0:
        request['TrainingJobDefinition']['InputDataConfig'] = args['channels']
    else:
        logging.error("Must specify at least one input channel.")
        raise Exception('Could not make job request')

    request['TrainingJobDefinition']['OutputDataConfig']['S3OutputPath'] = args['output_location']
    request['TrainingJobDefinition']['OutputDataConfig']['KmsKeyId'] = args['output_encryption_key']
    request['TrainingJobDefinition']['ResourceConfig']['InstanceType'] = args['instance_type']
    request['TrainingJobDefinition']['ResourceConfig']['InstanceCount'] = args['instance_count']
    request['TrainingJobDefinition']['ResourceConfig']['VolumeSizeInGB'] = args['volume_size']
    request['TrainingJobDefinition']['ResourceConfig']['VolumeKmsKeyId'] = args['resource_encryption_key']
    request['TrainingJobDefinition']['StoppingCondition']['MaxRuntimeInSeconds'] = args['max_run_time']
    request['TrainingJobDefinition']['EnableNetworkIsolation'] = args['network_isolation']
    request['TrainingJobDefinition']['EnableInterContainerTrafficEncryption'] = args['traffic_encryption']
    request['TrainingJobDefinition']['RoleArn'] = args['role']

    ### Update or pop warm start configs
    if args['warm_start_type'] and args['parent_hpo_jobs']:
        request['WarmStartConfig']['WarmStartType'] = args['warm_start_type']
        parent_jobs = [n.strip() for n in args['parent_hpo_jobs'].split(',')]
        for i in range(len(parent_jobs)):
            request['WarmStartConfig']['ParentHyperParameterTuningJobs'].append({'HyperParameterTuningJobName': parent_jobs[i]})
    else:
        if args['warm_start_type'] or args['parent_hpo_jobs']:
            if not args['warm_start_type']:
                logging.error('Must specify warm start type as either "IdenticalDataAndAlgorithm" or "TransferLearning".')
            if not args['parent_hpo_jobs']:
                logging.error("Must specify at least one parent hyperparameter tuning job")
            # TODO: determine what should actually happen here
            # raise Exception('Could not create job request')
            logging.info("Proceeding without warm start")
        request.pop('WarmStartConfig')

    ### Update tags
    for key, val in args['tags'].items():
        request['Tags'].append({'Key': key, 'Value': val})

    return request


def create_hyperparameter_tuning_job(client, args):
    """Create a Sagemaker HPO job"""
    request = create_hyperparameter_tuning_job_request(args)
    try:
        job_arn = client.create_hyper_parameter_tuning_job(**request)
        hpo_job_name = request['HyperParameterTuningJobName']
        logging.info("Created Hyperparameter Training Job with name: " + hpo_job_name)
        logging.info("URL: https://{}.console.aws.amazon.com/sagemaker/home?region={}#/hyper-tuning-jobs/{}".format(args['region'], args['region'], hpo_job_name))
        return hpo_job_name
    except ClientError as e:
        raise Exception(e.response['Error']['Message'])


def wait_for_hyperparameter_training_job(client, hpo_job_name):
    ### Wait until the job finishes
    while(True):
        response = client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=hpo_job_name)
        status = response['HyperParameterTuningJobStatus']
        if status == 'Completed':
            logging.info("Hyperparameter tuning job ended with status: " + status)
            break
        if status == 'Failed':
            message = response['FailureReason']
            logging.error('Hyperparameter tuning failed with the following error: {}'.format(message))
            raise Exception('Hyperparameter tuning job failed')
        logging.info("Hyperparameter tuning job is still in status: " + status)
        time.sleep(30)

Review comment: If the job has stopped then it will go into an infinite loop.
Reply: Yes, that's right. Eventually, if/when stopping the job when the run is terminated is implemented, the job can also be in a Stopping/Stopped state, but the pod, container, and script should be terminated anyway.
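A possible follow-up sketch (not part of this diff) that also treats the Stopping/Stopped states as terminal, assuming the same describe_hyper_parameter_tuning_job response shape as above; the function name is hypothetical:

def wait_for_hyperparameter_training_job_handling_stops(client, hpo_job_name):
    ### Sketch only: like wait_for_hyperparameter_training_job above, but also exits on Stopping/Stopped
    while True:
        response = client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=hpo_job_name)
        status = response['HyperParameterTuningJobStatus']
        if status == 'Completed':
            logging.info("Hyperparameter tuning job ended with status: " + status)
            break
        if status in ('Failed', 'Stopping', 'Stopped'):
            message = response.get('FailureReason', 'Job was stopped before completing')
            logging.error('Hyperparameter tuning did not complete: {}'.format(message))
            raise Exception('Hyperparameter tuning job did not complete')
        logging.info("Hyperparameter tuning job is still in status: " + status)
        time.sleep(30)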


def get_best_training_job_and_hyperparameters(client, hpo_job_name):
    ### Get and return best training job and its hyperparameters, without the objective metric
    info = client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=hpo_job_name)
    best_job = info['BestTrainingJob']['TrainingJobName']
    training_info = client.describe_training_job(TrainingJobName=best_job)
    train_hyperparameters = training_info['HyperParameters']
    train_hyperparameters.pop('_tuning_objective_metric')
    return best_job, train_hyperparameters


def id_generator(size=4, chars=string.ascii_uppercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))
@@ -0,0 +1,42 @@
HyperParameterTuningJobName: ''
HyperParameterTuningJobConfig:
  Strategy: ''
  HyperParameterTuningJobObjective:
    Type: ''
    MetricName: ''
  ResourceLimits:
    MaxNumberOfTrainingJobs: 0
    MaxParallelTrainingJobs: 0
  ParameterRanges:
    IntegerParameterRanges: []
    ContinuousParameterRanges: []
    CategoricalParameterRanges: []
  TrainingJobEarlyStoppingType: ''
TrainingJobDefinition:
  StaticHyperParameters: {}
  AlgorithmSpecification:
    TrainingImage: ''
    TrainingInputMode: ''
    AlgorithmName: ''
    MetricDefinitions: []
  RoleArn: ''
  InputDataConfig: []
  VpcConfig:
    SecurityGroupIds: []
    Subnets: []
  OutputDataConfig:
    KmsKeyId: ''
    S3OutputPath: ''
  ResourceConfig:
    InstanceType: ''
    InstanceCount: 0
    VolumeSizeInGB: 0
    VolumeKmsKeyId: ''
  StoppingCondition:
    MaxRuntimeInSeconds: 0
  EnableNetworkIsolation: True
  EnableInterContainerTrafficEncryption: False
WarmStartConfig:
  ParentHyperParameterTuningJobs: []
  WarmStartType: ''
Tags: []
@@ -0,0 +1,76 @@
# SageMaker hyperparameter optimization Kubeflow Pipeline component
## Summary
Component to submit hyperparameter tuning jobs to SageMaker directly from a Kubeflow Pipelines workflow.

# Details

## Intended Use
For hyperparameter tuning jobs using AWS SageMaker.
## Runtime Arguments
Argument | Description | Optional | Data type | Accepted values | Default |
:--- | :---------- | :---------- | :---------- | :---------- | :---------- |
region | The region where the tuning job launches | No | String | | |
job_name | The name of the tuning job. Must be unique within the same AWS account and AWS region | Yes | String | | |
image | The registry path of the Docker image that contains the training algorithm | No | String | | |
role | The Amazon Resource Name (ARN) that Amazon SageMaker assumes to perform tasks on your behalf | No | String | | |
algorithm_name | The name of the algorithm resource to use for the hyperparameter tuning job; only specify this parameter if training image is not specified | Yes | String | | |
training_input_mode | The input mode that the algorithm supports | No | String | File, Pipe | File |
metric_definitions | The dictionary of name-regex pairs that specify the metrics the algorithm emits | Yes | Dict | | {} |
strategy | How hyperparameter tuning chooses the combinations of hyperparameter values to use for the training jobs it launches | No | String | Bayesian, Random | Bayesian |
metric_name | The name of the metric to use as the objective metric | No | String | | |
metric_type | Whether to minimize or maximize the objective metric | No | String | Maximize, Minimize | |
early_stopping_type | Whether to use early stopping for the training jobs launched by the tuning job | No | String | Off, Auto | Off |
static_parameters | The values of hyperparameters that do not change for the tuning job | Yes | Dict | | {} |
integer_parameters | The array of IntegerParameterRange objects that specify ranges of integer hyperparameters that you want to search (see the example below) | Yes | List of Dicts | | [] |
continuous_parameters | The array of ContinuousParameterRange objects that specify ranges of continuous hyperparameters that you want to search | Yes | List of Dicts | | [] |
categorical_parameters | The array of CategoricalParameterRange objects that specify ranges of categorical hyperparameters that you want to search | Yes | List of Dicts | | [] |
channels | A list of dicts specifying the input channels (at least one is required); see the example below and the SageMaker documentation for the full channel structure | No | List of Dicts | | [{}] |
output_location | The Amazon S3 path where you want Amazon SageMaker to store the results of the training jobs | No | String | | |
output_encryption_key | The AWS KMS key that Amazon SageMaker uses to encrypt the model artifacts | Yes | String | | |
instance_type | The ML compute instance type | No | String | ml.m4.xlarge, ml.m4.2xlarge, ml.m4.4xlarge, ml.m4.10xlarge, ml.m4.16xlarge, ml.m5.large, ml.m5.xlarge, ml.m5.2xlarge, ml.m5.4xlarge, ml.m5.12xlarge, ml.m5.24xlarge, ml.c4.xlarge, ml.c4.2xlarge, ml.c4.4xlarge, ml.c4.8xlarge, ml.p2.xlarge, ml.p2.8xlarge, ml.p2.16xlarge, ml.p3.2xlarge, ml.p3.8xlarge, ml.p3.16xlarge, ml.c5.xlarge, ml.c5.2xlarge, ml.c5.4xlarge, ml.c5.9xlarge, ml.c5.18xlarge | ml.m4.xlarge |
instance_count | The number of ML compute instances to use in each training job | No | Int | ≥ 1 | 1 |
volume_size | The size of the ML storage volume that you want to provision in GB | No | Int | ≥ 1 | 1 |
max_num_jobs | The maximum number of training jobs that a hyperparameter tuning job can launch | No | Int | [1, 500] | |
max_parallel_jobs | The maximum number of concurrent training jobs that a hyperparameter tuning job can launch | No | Int | [1, 10] | |
max_run_time | The maximum run time in seconds per training job | No | Int | ≤ 432000 (5 days) | 86400 (1 day) |
resource_encryption_key | The AWS KMS key that Amazon SageMaker uses to encrypt data on the storage volume attached to the ML compute instance(s) | Yes | String | | |
vpc_security_group_ids | The VPC security group IDs, in the form sg-xxxxxxxx | Yes | String | | |
vpc_subnets | The IDs of the subnets in the VPC to which you want to connect your HPO job | Yes | String | | |
network_isolation | Isolates the training container if true | Yes | Boolean | False, True | True |
traffic_encryption | Encrypts all communications between ML compute instances in distributed training if true | Yes | Boolean | False, True | False |
warm_start_type | Specifies the type of warm start used | Yes | String | IdenticalDataAndAlgorithm, TransferLearning | |
parent_hpo_jobs | Comma-separated list of previously completed or stopped hyperparameter tuning jobs to be used as a starting point | Yes | String | | |
tags | Key-value pairs to categorize AWS resources | Yes | Dict | | {} |
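
The list- and dict-valued arguments mirror the corresponding fields of the SageMaker `CreateHyperParameterTuningJob` request (`ParameterRanges` and `InputDataConfig`). A hypothetical example of `integer_parameters` and `channels` follows; the hyperparameter names and S3 paths are placeholders, not values required by the component:

```python
# Illustrative values only; the structures follow the SageMaker
# HyperParameterTuningJob API (IntegerParameterRange and Channel objects).
integer_parameters = [
    {'Name': 'mini_batch_size', 'MinValue': '450', 'MaxValue': '550'},
    {'Name': 'extra_center_factor', 'MinValue': '10', 'MaxValue': '20'}
]

channels = [
    {
        'ChannelName': 'train',
        'DataSource': {
            'S3DataSource': {
                'S3Uri': 's3://my-bucket/mnist_kmeans_example/train_data',  # placeholder path
                'S3DataType': 'S3Prefix',
                'S3DataDistributionType': 'FullyReplicated'
            }
        },
        'CompressionType': 'None',
        'RecordWrapperType': 'None',
        'InputMode': 'File'
    }
]
```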

## Outputs
Name | Description
:--- | :----------
model_artifact_url | URL where model artifacts were stored
best_job_name | Best hyperparameter tuning training job name
best_hyperparameters | Tuned hyperparameters

# Requirements
* Kubeflow pipelines SDK: https://www.kubeflow.org/docs/pipelines/sdk/install-sdk/
* Kubeflow set-up

# Samples
## On its own
K-Means algorithm tuning on MNIST dataset: /kubeflow/pipelines/samples/aws-samples/mnist-kmeans-sagemaker/kmeans-hpo-pipeline.py

Follow the same steps as in the README for the MNIST classification pipeline (a minimal pipeline sketch is shown after the steps below):
1. Get and store data in S3 buckets
2. Prepare an IAM role with permissions to run SageMaker jobs
3. Add 'aws-secret' to your Kubeflow namespace
4. Compile the pipeline:
   ```bash
   dsl-compile --py kmeans-hpo-pipeline.py --output kmeans-hpo-pipeline.tar.gz
   ```
5. In the Kubeflow UI, upload the compiled pipeline specification (the .tar.gz file) and create a new run. Update the role_arn and the data paths, and optionally any other run parameters.
6. Once the pipeline completes, you can see the outputs under 'Output parameters' in the HPO component's Input/Output section.
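
The sketch below shows roughly how such a pipeline might load and wire the component with the Kubeflow Pipelines SDK. The component file path, bucket names, role ARN, and parameter values are placeholders, and only a subset of the component's arguments is shown; see kmeans-hpo-pipeline.py for the actual sample:

```python
# Sketch of a pipeline that uses the SageMaker HPO component (illustrative only).
import json

import kfp
from kfp import components, dsl
from kfp.aws import use_aws_secret

# Hypothetical path to the component specification
sagemaker_hpo_op = components.load_component_from_file(
    'components/aws/sagemaker/hyperparameter_tuning/component.yaml')

# Placeholder input channel, JSON-encoded for the component's List-of-Dicts input
channels = [{
    'ChannelName': 'train',
    'DataSource': {'S3DataSource': {
        'S3Uri': 's3://my-bucket/mnist_kmeans_example/train_data',
        'S3DataType': 'S3Prefix',
        'S3DataDistributionType': 'FullyReplicated'}},
    'CompressionType': 'None',
    'RecordWrapperType': 'None',
    'InputMode': 'File'
}]


@dsl.pipeline(
    name='MNIST K-Means HPO pipeline',
    description='SageMaker hyperparameter tuning job example'
)
def hpo_pipeline(
        region='us-west-2',
        role_arn='arn:aws:iam::123456789012:role/SageMakerExecutorKFP'):
    hpo = sagemaker_hpo_op(
        region=region,
        algorithm_name='K-Means',
        metric_name='test:msd',
        metric_type='Minimize',
        channels=json.dumps(channels),
        output_location='s3://my-bucket/mnist_kmeans_example/output',
        max_num_jobs=3,
        max_parallel_jobs=1,
        role=role_arn
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))
    # hpo.outputs['best_job_name'] and hpo.outputs['best_hyperparameters']
    # can be passed to downstream training or deployment steps.


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(hpo_pipeline, 'kmeans-hpo-pipeline.tar.gz')
```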

## Integrated into a pipeline
MNIST Classification using K-Means pipeline: [Coming Soon]

# Resources
* Using Amazon built-in algorithms: https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html

Review comment: Since it's a revision version change, I don't have concern. Better to run existing workflows to make sure this boto version works for all previous jobs.
Reply: I've run a pipeline using the existing components with this version of boto and they still work.