diff --git a/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile b/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile
index a5c26d158..718a3d7bc 100644
--- a/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile
+++ b/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile
@@ -26,7 +26,7 @@ RUN pip install tensorflow-probability==0.5
 RUN pip install tensor2tensor==1.11.0
 RUN pip install tensorflow_hub==0.1.1
 RUN pip install pyyaml==3.12 six==1.11.0
-RUN pip install google-cloud-storage
+RUN pip install google-cloud-storage pathlib2
 
 RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \
     unzip -qq google-cloud-sdk.zip -d /tools && \
diff --git a/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml b/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml
index 5d1e97ef0..30c00f916 100644
--- a/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml
+++ b/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml
@@ -20,9 +20,6 @@ metadata:
   labels:
     add-pod-env: 'true'
 inputs:
-  - name: working_dir
-    description: '...'
-    type: GCSPath
   - name: data_dir
     description: '...'
     type: GCSPath
@@ -35,15 +32,19 @@ inputs:
   - name: action
     description: '...'
     type: String
+outputs:
+  - name: copy_output_path
+    description: '...'
+    type: GCSPath
 implementation:
   container:
-    image: gcr.io/google-samples/ml-pipeline-t2ttrain:v2ap
+    image: gcr.io/google-samples/ml-pipeline-t2ttrain:v3ap
     args: [
       --data-dir, {inputValue: data_dir},
       --checkpoint-dir, {inputValue: checkpoint_dir},
       --action, {inputValue: action},
-      --working-dir, {inputValue: working_dir},
-      --model-dir, {inputValue: model_dir}
+      --model-dir, {inputValue: model_dir},
+      --copy-output-path, {outputPath: copy_output_path}
     ]
     env:
       KFP_POD_NAME: "{{pod.name}}"
diff --git a/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py b/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py
index e54120308..9e0daabd4 100644
--- a/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py
+++ b/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py
@@ -22,18 +22,15 @@
 from urlparse import urlparse
 
 from google.cloud import storage
+import pathlib2
 
-# location of the model checkpoint from which we'll start our training
-SOURCE_BUCKET = 'aju-dev-demos-codelabs'
-PREFIX = 'kubecon/model_output_tbase.bak2019000/'
 
 COPY_ACTION = 'copy_data'
 TRAIN_ACTION = 'train'
 PROBLEM = 'gh_problem'
 
 OUTPUT_PATH = '/tmp/output'
 
-
 def copy_blob(storage_client, source_bucket, source_blob, target_bucket_name, new_blob_name,
               new_blob_prefix, prefix):
   """Copies a blob from one bucket to another with a new name."""
@@ -49,17 +46,26 @@ def copy_blob(storage_client, source_bucket, source_blob, target_bucket_name, ne
     str(source_blob.name), str(source_bucket.name), str(new_blob.name),
     str(target_bucket.name))
 
-def copy_checkpoint(new_blob_prefix, target_bucket):
+def copy_checkpoint(checkpoint_dir, model_dir):
   """Copy an existing model checkpoint directory to the working directory for the workflow,
   so that the training can start from that point.
""" storage_client = storage.Client() - source_bucket = storage_client.bucket(SOURCE_BUCKET) retries = 10 + source_bucket_string = urlparse(checkpoint_dir).netloc + source_prefix = checkpoint_dir.replace('gs://' + source_bucket_string + '/', '') + logging.info("source bucket %s and prefix %s", source_bucket_string, source_prefix) + source_bucket = storage_client.bucket(source_bucket_string) + + target_bucket = urlparse(model_dir).netloc + logging.info("target bucket: %s", target_bucket) + new_blob_prefix = model_dir.replace('gs://' + target_bucket + '/', '') + logging.info("new_blob_prefix: %s", new_blob_prefix) + # Lists objects with the given prefix. - blob_list = list(source_bucket.list_blobs(prefix=PREFIX)) + blob_list = list(source_bucket.list_blobs(prefix=source_prefix)) logging.info('Copying files:') for blob in blob_list: sleeptime = 0.1 @@ -68,7 +74,7 @@ def copy_checkpoint(new_blob_prefix, target_bucket): logging.info('copying %s; retry %s', blob.name, num_retries) try: copy_blob(storage_client, source_bucket, blob, target_bucket, blob.name, new_blob_prefix, - PREFIX) + source_prefix) break except Exception as e: #pylint: disable=broad-except logging.warning(e) @@ -97,7 +103,6 @@ def run_training(args, data_dir, model_dir, problem): # print(result2) # then export the model... - model_export_command = ['t2t-exporter', '--model', 'transformer', '--hparams_set', 'transformer_prepend', '--problem', problem, @@ -124,17 +129,21 @@ def main(): help='...', required=True) parser.add_argument( - '--working-dir', + '--data-dir', help='...', required=True) parser.add_argument( - '--data-dir', + '--copy-output-path', help='...', - required=True) + ) parser.add_argument( + '--train-output-path', + help='...', + ) + parser.add_argument( # used for the copy step only '--checkpoint-dir', help='...', - required=True) + required=False) parser.add_argument( '--train-steps', help='...') @@ -145,34 +154,37 @@ def main(): args = parser.parse_args() - # Create metadata.json file for visualization. 
-  metadata = {
-    'outputs' : [{
-      'type': 'tensorboard',
-      'source': args.model_dir,
-    }]
-  }
-  with open('/mlpipeline-ui-metadata.json', 'w') as f:
-    json.dump(metadata, f)
-
   data_dir = args.data_dir
   logging.info("data dir: %s", data_dir)
-
-  # model_startpoint = args.checkpoint_dir
-  logging.info("model_startpoint: %s", args.checkpoint_dir)
   model_dir = args.model_dir
   logging.info("model_dir: %s", model_dir)
 
   if args.action.lower() == COPY_ACTION:
-    # copy over the checkpoint directory
-    target_bucket = urlparse(args.working_dir).netloc
-    logging.info("target bucket: %s", target_bucket)
-    new_blob_prefix = model_dir.replace('gs://' + target_bucket + '/', '')
-    logging.info("new_blob_prefix: %s", new_blob_prefix)
-    copy_checkpoint(new_blob_prefix, target_bucket)
+    logging.info("model starting checkpoint: %s", args.checkpoint_dir)
+    copy_checkpoint(args.checkpoint_dir, model_dir)
+    # write the model dir path as an output param
+    logging.info("copy_output_path: %s", args.copy_output_path)
+    pathlib2.Path(args.copy_output_path).parent.mkdir(parents=True)
+    pathlib2.Path(args.copy_output_path).write_text(model_dir.decode('utf-8'))
+
   elif args.action.lower() == TRAIN_ACTION:
     # launch the training job
     run_training(args, data_dir, model_dir, PROBLEM)
+    # write the model export path as an output param
+    logging.info("train_output_path: %s", args.train_output_path)
+    pathlib2.Path(args.train_output_path).parent.mkdir(parents=True)
+    export_dir = '%s/export' % model_dir
+    pathlib2.Path(args.train_output_path).write_text(export_dir.decode('utf-8'))
+    # Create metadata.json file for Tensorboard 'artifact'
+    metadata = {
+      'outputs' : [{
+        'type': 'tensorboard',
+        'source': model_dir,
+      }]
+    }
+    with open('/mlpipeline-ui-metadata.json', 'w') as f:
+      json.dump(metadata, f)
+
   else:
     logging.warning("Error: unknown action mode %s", args.action)
 
diff --git a/github_issue_summarization/pipelines/components/t2t/train_component.yaml b/github_issue_summarization/pipelines/components/t2t/train_component.yaml
index 197e493bd..1017847d9 100644
--- a/github_issue_summarization/pipelines/components/t2t/train_component.yaml
+++ b/github_issue_summarization/pipelines/components/t2t/train_component.yaml
@@ -23,16 +23,10 @@ inputs:
   - name: train_steps
     description: '...'
     type: Integer
-    default: '2019300'
-  - name: working_dir
-    description: '...'
-    type: GCSPath
+    default: 2019300
   - name: data_dir
     description: '...'
     type: GCSPath
-  - name: checkpoint_dir
-    description: '...'
-    type: GCSPath
   - name: model_dir
     description: '...'
     type: GCSPath
@@ -43,22 +37,27 @@ inputs:
     description: '...'
     type: String
 outputs:
-  - name: output
+  - name: launch_server
     description: '...'
     type: String
+  - name: train_output_path
+    description: '...'
+    type: GCSPath
+  - name: MLPipeline UI metadata
+    type: UI metadata
 implementation:
   container:
-    image: gcr.io/google-samples/ml-pipeline-t2ttrain:v2ap
+    image: gcr.io/google-samples/ml-pipeline-t2ttrain:v3ap
     args: [
       --data-dir, {inputValue: data_dir},
-      --checkpoint-dir, {inputValue: checkpoint_dir},
       --action, {inputValue: action},
-      --working-dir, {inputValue: working_dir},
       --model-dir, {inputValue: model_dir},
       --train-steps, {inputValue: train_steps},
-      --deploy-webapp, {inputValue: deploy_webapp}
+      --deploy-webapp, {inputValue: deploy_webapp},
+      --train-output-path, {outputPath: train_output_path}
     ]
     env:
       KFP_POD_NAME: "{{pod.name}}"
     fileOutputs:
-      output: /tmp/output
+      launch_server: /tmp/output
+      MLPipeline UI metadata: /mlpipeline-ui-metadata.json
diff --git a/github_issue_summarization/pipelines/example_pipelines/gh_summ.py b/github_issue_summarization/pipelines/example_pipelines/gh_summ.py
index bf0bde1a1..b0f849180 100644
--- a/github_issue_summarization/pipelines/example_pipelines/gh_summ.py
+++ b/github_issue_summarization/pipelines/example_pipelines/gh_summ.py
@@ -16,6 +16,7 @@
 import kfp.dsl as dsl
 import kfp.gcp as gcp
 import kfp.components as comp
+from kfp.dsl.types import GCSPath, String
 
 
 COPY_ACTION = 'copy_data'
@@ -25,11 +26,11 @@ MODEL = 'model'
 
 
 copydata_op = comp.load_component_from_url(
-  'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml' # pylint: disable=line-too-long
+  'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml' # pylint: disable=line-too-long
   )
 
 train_op = comp.load_component_from_url(
-  'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long
+  'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long
   )
 
 metadata_log_op = comp.load_component_from_url(
@@ -41,37 +42,34 @@
   description='Demonstrate Tensor2Tensor-based training and TF-Serving'
 )
 def gh_summ( #pylint: disable=unused-argument
-  train_steps=2019300,
-  project='YOUR_PROJECT_HERE',
-  github_token='YOUR_GITHUB_TOKEN_HERE',
-  working_dir='YOUR_GCS_DIR_HERE',
-  checkpoint_dir='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000',
-  deploy_webapp='true',
-  data_dir='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'
+  train_steps: 'Integer' = 2019300,
+  project: String = 'YOUR_PROJECT_HERE',
+  github_token: String = 'YOUR_GITHUB_TOKEN_HERE',
+  working_dir: GCSPath = 'gs://YOUR_GCS_DIR_HERE',
+  checkpoint_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/',
+  deploy_webapp: String = 'true',
+  data_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'
   ):
 
 
   copydata = copydata_op(
-    working_dir=working_dir,
     data_dir=data_dir,
     checkpoint_dir=checkpoint_dir,
-    model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),
-    action=COPY_ACTION
+    model_dir='%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
+    action=COPY_ACTION,
   ).apply(gcp.use_gcp_secret('user-gcp-sa'))
 
 
   log_dataset = metadata_log_op(
     log_type=DATASET,
     workspace_name=WORKSPACE_NAME,
-    run_name='{{workflow.name}}',
+    run_name=dsl.RUN_ID_PLACEHOLDER,
     data_uri=data_dir
   )
 
   train = train_op(
-    working_dir=working_dir,
     data_dir=data_dir,
-    checkpoint_dir=checkpoint_dir,
-    model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),
+    model_dir=copydata.outputs['copy_output_path'],
     action=TRAIN_ACTION, train_steps=train_steps,
     deploy_webapp=deploy_webapp
   ).apply(gcp.use_gcp_secret('user-gcp-sa'))
@@ -80,29 +78,28 @@ def gh_summ( #pylint: disable=unused-argument
   log_model = metadata_log_op(
     log_type=MODEL,
     workspace_name=WORKSPACE_NAME,
-    run_name='{{workflow.name}}',
-    model_uri='%s/%s/model_output' % (working_dir, '{{workflow.name}}')
+    run_name=dsl.RUN_ID_PLACEHOLDER,
+    model_uri=train.outputs['train_output_path']
   )
 
   serve = dsl.ContainerOp(
       name='serve',
       image='gcr.io/google-samples/ml-pipeline-kubeflow-tfserve',
-      arguments=["--model_name", 'ghsumm-%s' % ('{{workflow.name}}',),
-        "--model_path", '%s/%s/model_output/export' % (working_dir, '{{workflow.name}}')
+      arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
+        "--model_path", train.outputs['train_output_path']
       ]
-      )
+      ).apply(gcp.use_gcp_secret('user-gcp-sa'))
+
   log_dataset.after(copydata)
-  train.after(copydata)
   log_model.after(train)
-  serve.after(train)
   train.set_gpu_limit(1)
   train.set_memory_limit('48G')
 
-  with dsl.Condition(train.output == 'true'):
+  with dsl.Condition(train.outputs['launch_server'] == 'true'):
     webapp = dsl.ContainerOp(
       name='webapp',
      image='gcr.io/google-samples/ml-pipeline-webapp-launcher:v2ap',
-      arguments=["--model_name", 'ghsumm-%s' % ('{{workflow.name}}',),
+      arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
        "--github_token", github_token]
 
      )
diff --git a/github_issue_summarization/pipelines/example_pipelines/gh_summ.py.tar.gz b/github_issue_summarization/pipelines/example_pipelines/gh_summ.py.tar.gz
index b285e71f4..5998f78cf 100644
Binary files a/github_issue_summarization/pipelines/example_pipelines/gh_summ.py.tar.gz and b/github_issue_summarization/pipelines/example_pipelines/gh_summ.py.tar.gz differ
diff --git a/github_issue_summarization/pipelines/example_pipelines/pipelines-notebook.ipynb b/github_issue_summarization/pipelines/example_pipelines/pipelines-notebook.ipynb
index 46386f206..8de017d09 100644
--- a/github_issue_summarization/pipelines/example_pipelines/pipelines-notebook.ipynb
+++ b/github_issue_summarization/pipelines/example_pipelines/pipelines-notebook.ipynb
@@ -29,7 +29,7 @@
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "Do some installations and imports, and set some variables. Set the `WORKING_DIR` to a path under the Cloud Storage bucket you created earlier. The Pipelines SDK is bundled with the notebook server image, but we'll make sure that we're using the most current version for this example."
+    "Do some installations and imports, and set some variables. Set the `WORKING_DIR` to a path under the Cloud Storage bucket you created earlier. The Pipelines SDK is bundled with the notebook server image, but we'll make sure that we're using the most current version for this example. You may need to restart your kernel after the SDK update."
    ]
   },
   {
@@ -52,6 +52,8 @@
     "import kfp.dsl as dsl\n",
     "import kfp.gcp as gcp\n",
     "import kfp.components as comp\n",
+    "from kfp.dsl.types import Integer, GCSPath, String\n",
+    "\n",
     "import kfp.notebook"
    ]
   },
@@ -139,15 +141,15 @@
     "MODEL = 'model'\n",
     "\n",
     "copydata_op = comp.load_component_from_url(\n",
-    "  'https://raw.githubusercontent.com/amygdala/kubeflow-examples/preempt/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml'\n",
+    "  'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml' # pylint: disable=line-too-long\n",
     "  )\n",
     "\n",
     "train_op = comp.load_component_from_url(\n",
-    "  'https://raw.githubusercontent.com/amygdala/kubeflow-examples/preempt/github_issue_summarization/pipelines/components/t2t/train_component.yaml'\n",
+    "  'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long\n",
     "  )\n",
     "\n",
     "metadata_log_op = comp.load_component_from_url(\n",
-    "  'https://raw.githubusercontent.com/amygdala/kubeflow-examples/preempt/github_issue_summarization/pipelines/components/t2t/metadata_log_component.yaml'\n",
+    "  'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/metadata_log_component.yaml' # pylint: disable=line-too-long\n",
     "  )"
    ]
   },
@@ -170,37 +172,34 @@
     "  description='Demonstrate Tensor2Tensor-based training and TF-Serving'\n",
     ")\n",
     "def gh_summ( #pylint: disable=unused-argument\n",
-    "  train_steps=2019300,\n",
-    "  project='YOUR_PROJECT_HERE',\n",
-    "  github_token='YOUR_GITHUB_TOKEN_HERE',\n",
-    "  working_dir='YOUR_GCS_DIR_HERE',\n",
-    "  checkpoint_dir='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000',\n",
-    "  deploy_webapp='true',\n",
-    "  data_dir='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'\n",
+    "  train_steps: Integer = 2019300,\n",
+    "  project: String = 'YOUR_PROJECT_HERE',\n",
+    "  github_token: String = 'YOUR_GITHUB_TOKEN_HERE',\n",
+    "  working_dir: GCSPath = 'YOUR_GCS_DIR_HERE',\n",
+    "  checkpoint_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/',\n",
+    "  deploy_webapp: String = 'true',\n",
+    "  data_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'\n",
     "  ):\n",
     "\n",
     "\n",
     "  copydata = copydata_op(\n",
-    "    working_dir=working_dir,\n",
     "    data_dir=data_dir,\n",
     "    checkpoint_dir=checkpoint_dir,\n",
-    "    model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),\n",
-    "    action=COPY_ACTION\n",
+    "    model_dir='%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER),\n",
+    "    action=COPY_ACTION,\n",
     "  ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
     "\n",
     "\n",
     "  log_dataset = metadata_log_op(\n",
     "    log_type=DATASET,\n",
     "    workspace_name=WORKSPACE_NAME,\n",
-    "    run_name='{{workflow.name}}',\n",
+    "    run_name=dsl.RUN_ID_PLACEHOLDER,\n",
     "    data_uri=data_dir\n",
     "  )\n",
     "\n",
     "  train = train_op(\n",
-    "    working_dir=working_dir,\n",
     "    data_dir=data_dir,\n",
-    "    checkpoint_dir=checkpoint_dir,\n",
-    "    model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),\n",
+    "    model_dir=copydata.outputs['copy_output_path'],\n",
     "    action=TRAIN_ACTION, train_steps=train_steps,\n",
     "    deploy_webapp=deploy_webapp\n",
     "  ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
@@ -209,29 +208,27 @@
     "  log_model = metadata_log_op(\n",
     "    log_type=MODEL,\n",
     "    workspace_name=WORKSPACE_NAME,\n",
-    "    run_name='{{workflow.name}}',\n",
-    "    model_uri='%s/%s/model_output' % (working_dir, '{{workflow.name}}')\n",
+    "    run_name=dsl.RUN_ID_PLACEHOLDER,\n",
+    "    model_uri=train.outputs['train_output_path']\n",
     "  )\n",
     "\n",
     "  serve = dsl.ContainerOp(\n",
     "      name='serve',\n",
     "      image='gcr.io/google-samples/ml-pipeline-kubeflow-tfserve',\n",
-    "      arguments=[\"--model_name\", 'ghsumm-%s' % ('{{workflow.name}}',),\n",
-    "      \"--model_path\", '%s/%s/model_output/export' % (working_dir, '{{workflow.name}}')\n",
+    "      arguments=[\"--model_name\", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),\n",
+    "      \"--model_path\", train.outputs['train_output_path']\n",
     "      ]\n",
     "  )\n",
     "  log_dataset.after(copydata)\n",
-    "  train.after(copydata)\n",
     "  log_model.after(train)\n",
-    "  serve.after(train)\n",
-    "  train.set_gpu_limit(4)\n",
+    "  train.set_gpu_limit(1)\n",
     "  train.set_memory_limit('48G')\n",
     "\n",
-    "  with dsl.Condition(train.output == 'true'):\n",
+    "  with dsl.Condition(train.outputs['launch_server'] == 'true'):\n",
     "    webapp = dsl.ContainerOp(\n",
     "      name='webapp',\n",
     "      image='gcr.io/google-samples/ml-pipeline-webapp-launcher:v2ap',\n",
-    "      arguments=[\"--model_name\", 'ghsumm-%s' % ('{{workflow.name}}',),\n",
+    "      arguments=[\"--model_name\", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),\n",
     "        \"--github_token\", github_token]\n",
     "\n",
     "      )\n",
@@ -328,80 +325,74 @@
     "\n",
     "@dsl.pipeline(\n",
     "  name='Github issue summarization',\n",
-    "  description='Demonstrate TFT-based feature processing, TFMA, TFJob, CMLE OP, and TF-Serving'\n",
+    "  description='Demonstrate Tensor2Tensor-based training and TF-Serving'\n",
     ")\n",
     "def gh_summ2(\n",
-    "  train_steps = 2019300,\n",
-    "  project = PROJECT_NAME,\n",
-    "  github_token='YOUR_GITHUB_TOKEN_HERE',\n",
-    "  working_dir='YOUR_GCS_DIR_HERE',\n",
-    "  checkpoint_dir = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000',\n",
-    "  deploy_webapp = 'true',\n",
-    "  data_dir = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'\n",
+    "  train_steps: Integer = 2019300,\n",
+    "  project: String = 'YOUR_PROJECT_HERE',\n",
+    "  github_token: String = 'YOUR_GITHUB_TOKEN_HERE',\n",
+    "  working_dir: GCSPath = 'YOUR_GCS_DIR_HERE',\n",
+    "  checkpoint_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/',\n",
+    "  deploy_webapp: String = 'true',\n",
+    "  data_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'\n",
     "  ):\n",
     "\n",
     "  # The new pre-processing op.\n",
     "  preproc = preproc_op(project=project,\n",
-    "                  data_dir=('%s/%s/gh_data' % (working_dir, '{{workflow.name}}')))\n",
+    "                  data_dir=('%s/%s/gh_data' % (working_dir, dsl.RUN_ID_PLACEHOLDER)))\n",
     "\n",
     "  copydata = copydata_op(\n",
-    "    working_dir=working_dir,\n",
     "    data_dir=data_dir,\n",
     "    checkpoint_dir=checkpoint_dir,\n",
-    "    model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),\n",
-    "    action=COPY_ACTION\n",
+    "    model_dir='%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER),\n",
+    "    action=COPY_ACTION,\n",
     "  ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
     "\n",
     "\n",
     "  log_dataset = metadata_log_op(\n",
     "    log_type=DATASET,\n",
     "    workspace_name=WORKSPACE_NAME,\n",
-    "    run_name='{{workflow.name}}',\n",
+    "    run_name=dsl.RUN_ID_PLACEHOLDER,\n",
     "    data_uri=data_dir\n",
     "  )\n",
     "\n",
     "  train = train_op(\n",
-    "    working_dir=working_dir,\n",
     "    data_dir=data_dir,\n",
-    "    checkpoint_dir=checkpoint_dir,\n",
-    "    model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'),\n",
+    "    model_dir=copydata.outputs['copy_output_path'],\n",
     "    action=TRAIN_ACTION, train_steps=train_steps,\n",
     "    deploy_webapp=deploy_webapp\n",
     "  ).apply(gcp.use_gcp_secret('user-gcp-sa'))\n",
     "\n",
     "  log_dataset.after(copydata)\n",
-    "  train.after(copydata)\n",
     "  train.after(preproc)\n",
     "\n",
     "  log_model = metadata_log_op(\n",
     "    log_type=MODEL,\n",
     "    workspace_name=WORKSPACE_NAME,\n",
-    "    run_name='{{workflow.name}}',\n",
-    "    model_uri='%s/%s/model_output' % (working_dir, '{{workflow.name}}')\n",
+    "    run_name=dsl.RUN_ID_PLACEHOLDER,\n",
+    "    model_uri=train.outputs['train_output_path']\n",
     "  )\n",
     "\n",
     "  serve = dsl.ContainerOp(\n",
     "      name='serve',\n",
     "      image='gcr.io/google-samples/ml-pipeline-kubeflow-tfserve',\n",
-    "      arguments=[\"--model_name\", 'ghsumm-%s' % ('{{workflow.name}}',),\n",
-    "      \"--model_path\", '%s/%s/model_output/export' % (working_dir, '{{workflow.name}}')\n",
+    "      arguments=[\"--model_name\", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),\n",
+    "      \"--model_path\", train.outputs['train_output_path']\n",
     "      ]\n",
     "  )\n",
-    "\n",
     "  log_model.after(train)\n",
-    "  serve.after(train)\n",
-    "  train.set_gpu_limit(4)\n",
+    "  train.set_gpu_limit(1)\n",
     "  train.set_memory_limit('48G')\n",
     "\n",
-    "  with dsl.Condition(train.output == 'true'):\n",
+    "  with dsl.Condition(train.outputs['launch_server'] == 'true'):\n",
     "    webapp = dsl.ContainerOp(\n",
     "      name='webapp',\n",
     "      image='gcr.io/google-samples/ml-pipeline-webapp-launcher:v2ap',\n",
-    "      arguments=[\"--model_name\", 'ghsumm-%s' % ('{{workflow.name}}',),\n",
+    "      arguments=[\"--model_name\", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),\n",
     "        \"--github_token\", github_token]\n",
     "\n",
     "      )\n",
-    "    webapp.after(serve) \n"
+    "    webapp.after(serve)\n"
    ]
   },
   {
diff --git a/mnist/testing/tfjob_test.py b/mnist/testing/tfjob_test.py
index cd82ee05e..3e921efb2 100644
--- a/mnist/testing/tfjob_test.py
+++ b/mnist/testing/tfjob_test.py
@@ -64,6 +64,14 @@ def test_train(self):
     util.run(['wget', '-O', '/usr/local/bin/kustomize', kusUrl], cwd=self.app_dir)
     util.run(['chmod', 'a+x', '/usr/local/bin/kustomize'], cwd=self.app_dir)
 
+    # TODO(@jinchihe): kubectl was upgraded to 1.14.0 in kubeflow/testing/pull/500, but the
+    # test-worker image has not been refreshed yet; see kubeflow/testing/issues/501.
+    # The block below can be removed once a new test-worker image is released.
+    kubectlUrl = 'https://storage.googleapis.com/kubernetes-release/' \
+                 'release/v1.14.0/bin/linux/amd64/kubectl'
+    util.run(['wget', '-O', '/usr/local/bin/kubectl', kubectlUrl], cwd=self.app_dir)
+    util.run(['chmod', 'a+x', '/usr/local/bin/kubectl'], cwd=self.app_dir)
+
     # Setup parameters for kustomize
     configmap = 'mnist-map-training'
     for pair in self.params.split(","):