From 44a8b202cf0ab550cbdedce33891e1db88ad96e2 Mon Sep 17 00:00:00 2001 From: Jin Chi He Date: Wed, 23 Oct 2019 10:20:07 +0800 Subject: [PATCH] debug mnist ci/cd testing problem. --- .../components/t2t/containers/base/Dockerfile | 2 +- .../components/t2t/datacopy_component.yaml | 13 +-- .../components/t2t/t2t-train/train_model.py | 76 ++++++++------ .../components/t2t/train_component.yaml | 25 +++-- .../pipelines/example_pipelines/gh_summ.py | 47 ++++----- .../example_pipelines/gh_summ.py.tar.gz | Bin 1700 -> 2148 bytes .../pipelines-notebook.ipynb | 99 ++++++++---------- mnist/testing/tfjob_test.py | 8 ++ 8 files changed, 139 insertions(+), 131 deletions(-) diff --git a/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile b/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile index a5c26d158..718a3d7bc 100644 --- a/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile +++ b/github_issue_summarization/pipelines/components/t2t/containers/base/Dockerfile @@ -26,7 +26,7 @@ RUN pip install tensorflow-probability==0.5 RUN pip install tensor2tensor==1.11.0 RUN pip install tensorflow_hub==0.1.1 RUN pip install pyyaml==3.12 six==1.11.0 -RUN pip install google-cloud-storage +RUN pip install google-cloud-storage pathlib2 RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \ unzip -qq google-cloud-sdk.zip -d /tools && \ diff --git a/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml b/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml index 5d1e97ef0..30c00f916 100644 --- a/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml +++ b/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml @@ -20,9 +20,6 @@ metadata: labels: add-pod-env: 'true' inputs: - - name: working_dir - description: '...' - type: GCSPath - name: data_dir description: '...' type: GCSPath @@ -35,15 +32,19 @@ inputs: - name: action description: '...' type: String +outputs: + - name: copy_output_path + description: '...' + type: GCSPath implementation: container: - image: gcr.io/google-samples/ml-pipeline-t2ttrain:v2ap + image: gcr.io/google-samples/ml-pipeline-t2ttrain:v3ap args: [ --data-dir, {inputValue: data_dir}, --checkpoint-dir, {inputValue: checkpoint_dir}, --action, {inputValue: action}, - --working-dir, {inputValue: working_dir}, - --model-dir, {inputValue: model_dir} + --model-dir, {inputValue: model_dir}, + --copy-output-path, {outputPath: copy_output_path} ] env: KFP_POD_NAME: "{{pod.name}}" diff --git a/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py b/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py index e54120308..9e0daabd4 100644 --- a/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py +++ b/github_issue_summarization/pipelines/components/t2t/t2t-train/train_model.py @@ -22,18 +22,15 @@ from urlparse import urlparse from google.cloud import storage +import pathlib2 -# location of the model checkpoint from which we'll start our training -SOURCE_BUCKET = 'aju-dev-demos-codelabs' -PREFIX = 'kubecon/model_output_tbase.bak2019000/' COPY_ACTION = 'copy_data' TRAIN_ACTION = 'train' PROBLEM = 'gh_problem' OUTPUT_PATH = '/tmp/output' - def copy_blob(storage_client, source_bucket, source_blob, target_bucket_name, new_blob_name, new_blob_prefix, prefix): """Copies a blob from one bucket to another with a new name.""" @@ -49,17 +46,26 @@ def copy_blob(storage_client, source_bucket, source_blob, target_bucket_name, ne str(source_blob.name), str(source_bucket.name), str(new_blob.name), str(target_bucket.name)) -def copy_checkpoint(new_blob_prefix, target_bucket): +def copy_checkpoint(checkpoint_dir, model_dir): """Copy an existing model checkpoint directory to the working directory for the workflow, so that the training can start from that point. """ storage_client = storage.Client() - source_bucket = storage_client.bucket(SOURCE_BUCKET) retries = 10 + source_bucket_string = urlparse(checkpoint_dir).netloc + source_prefix = checkpoint_dir.replace('gs://' + source_bucket_string + '/', '') + logging.info("source bucket %s and prefix %s", source_bucket_string, source_prefix) + source_bucket = storage_client.bucket(source_bucket_string) + + target_bucket = urlparse(model_dir).netloc + logging.info("target bucket: %s", target_bucket) + new_blob_prefix = model_dir.replace('gs://' + target_bucket + '/', '') + logging.info("new_blob_prefix: %s", new_blob_prefix) + # Lists objects with the given prefix. - blob_list = list(source_bucket.list_blobs(prefix=PREFIX)) + blob_list = list(source_bucket.list_blobs(prefix=source_prefix)) logging.info('Copying files:') for blob in blob_list: sleeptime = 0.1 @@ -68,7 +74,7 @@ def copy_checkpoint(new_blob_prefix, target_bucket): logging.info('copying %s; retry %s', blob.name, num_retries) try: copy_blob(storage_client, source_bucket, blob, target_bucket, blob.name, new_blob_prefix, - PREFIX) + source_prefix) break except Exception as e: #pylint: disable=broad-except logging.warning(e) @@ -97,7 +103,6 @@ def run_training(args, data_dir, model_dir, problem): # print(result2) # then export the model... - model_export_command = ['t2t-exporter', '--model', 'transformer', '--hparams_set', 'transformer_prepend', '--problem', problem, @@ -124,17 +129,21 @@ def main(): help='...', required=True) parser.add_argument( - '--working-dir', + '--data-dir', help='...', required=True) parser.add_argument( - '--data-dir', + '--copy-output-path', help='...', - required=True) + ) parser.add_argument( + '--train-output-path', + help='...', + ) + parser.add_argument( # used for the copy step only '--checkpoint-dir', help='...', - required=True) + required=False) parser.add_argument( '--train-steps', help='...') @@ -145,34 +154,37 @@ def main(): args = parser.parse_args() - # Create metadata.json file for visualization. - metadata = { - 'outputs' : [{ - 'type': 'tensorboard', - 'source': args.model_dir, - }] - } - with open('/mlpipeline-ui-metadata.json', 'w') as f: - json.dump(metadata, f) - data_dir = args.data_dir logging.info("data dir: %s", data_dir) - - # model_startpoint = args.checkpoint_dir - logging.info("model_startpoint: %s", args.checkpoint_dir) model_dir = args.model_dir logging.info("model_dir: %s", model_dir) if args.action.lower() == COPY_ACTION: - # copy over the checkpoint directory - target_bucket = urlparse(args.working_dir).netloc - logging.info("target bucket: %s", target_bucket) - new_blob_prefix = model_dir.replace('gs://' + target_bucket + '/', '') - logging.info("new_blob_prefix: %s", new_blob_prefix) - copy_checkpoint(new_blob_prefix, target_bucket) + logging.info("model starting checkpoint: %s", args.checkpoint_dir) + copy_checkpoint(args.checkpoint_dir, model_dir) + # write the model dir path as an output param + logging.info("copy_output_path: %s", args.copy_output_path) + pathlib2.Path(args.copy_output_path).parent.mkdir(parents=True) + pathlib2.Path(args.copy_output_path).write_text(model_dir.decode('utf-8')) + elif args.action.lower() == TRAIN_ACTION: # launch the training job run_training(args, data_dir, model_dir, PROBLEM) + # write the model export path as an output param + logging.info("train_output_path: %s", args.train_output_path) + pathlib2.Path(args.train_output_path).parent.mkdir(parents=True) + export_dir = '%s/export' % model_dir + pathlib2.Path(args.train_output_path).write_text(export_dir.decode('utf-8')) + # Create metadata.json file for Tensorboard 'artifact' + metadata = { + 'outputs' : [{ + 'type': 'tensorboard', + 'source': model_dir, + }] + } + with open('/mlpipeline-ui-metadata.json', 'w') as f: + json.dump(metadata, f) + else: logging.warning("Error: unknown action mode %s", args.action) diff --git a/github_issue_summarization/pipelines/components/t2t/train_component.yaml b/github_issue_summarization/pipelines/components/t2t/train_component.yaml index 197e493bd..1017847d9 100644 --- a/github_issue_summarization/pipelines/components/t2t/train_component.yaml +++ b/github_issue_summarization/pipelines/components/t2t/train_component.yaml @@ -23,16 +23,10 @@ inputs: - name: train_steps description: '...' type: Integer - default: '2019300' - - name: working_dir - description: '...' - type: GCSPath + default: 2019300 - name: data_dir description: '...' type: GCSPath - - name: checkpoint_dir - description: '...' - type: GCSPath - name: model_dir description: '...' type: GCSPath @@ -43,22 +37,27 @@ inputs: description: '...' type: String outputs: - - name: output + - name: launch_server description: '...' type: String + - name: train_output_path + description: '...' + type: GCSPath + - name: MLPipeline UI metadata + type: UI metadata implementation: container: - image: gcr.io/google-samples/ml-pipeline-t2ttrain:v2ap + image: gcr.io/google-samples/ml-pipeline-t2ttrain:v3ap args: [ --data-dir, {inputValue: data_dir}, - --checkpoint-dir, {inputValue: checkpoint_dir}, --action, {inputValue: action}, - --working-dir, {inputValue: working_dir}, --model-dir, {inputValue: model_dir}, --train-steps, {inputValue: train_steps}, - --deploy-webapp, {inputValue: deploy_webapp} + --deploy-webapp, {inputValue: deploy_webapp}, + --train-output-path, {outputPath: train_output_path} ] env: KFP_POD_NAME: "{{pod.name}}" fileOutputs: - output: /tmp/output + launch_server: /tmp/output + MLPipeline UI metadata: /mlpipeline-ui-metadata.json diff --git a/github_issue_summarization/pipelines/example_pipelines/gh_summ.py b/github_issue_summarization/pipelines/example_pipelines/gh_summ.py index bf0bde1a1..b0f849180 100644 --- a/github_issue_summarization/pipelines/example_pipelines/gh_summ.py +++ b/github_issue_summarization/pipelines/example_pipelines/gh_summ.py @@ -16,6 +16,7 @@ import kfp.dsl as dsl import kfp.gcp as gcp import kfp.components as comp +from kfp.dsl.types import GCSPath, String COPY_ACTION = 'copy_data' @@ -25,11 +26,11 @@ MODEL = 'model' copydata_op = comp.load_component_from_url( - 'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml' # pylint: disable=line-too-long + 'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml' # pylint: disable=line-too-long ) train_op = comp.load_component_from_url( - 'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long + 'https://raw.githubusercontent.com/amygdala/kubeflow-examples/ghpl_update/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long ) metadata_log_op = comp.load_component_from_url( @@ -41,37 +42,34 @@ description='Demonstrate Tensor2Tensor-based training and TF-Serving' ) def gh_summ( #pylint: disable=unused-argument - train_steps=2019300, - project='YOUR_PROJECT_HERE', - github_token='YOUR_GITHUB_TOKEN_HERE', - working_dir='YOUR_GCS_DIR_HERE', - checkpoint_dir='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000', - deploy_webapp='true', - data_dir='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/' + train_steps: 'Integer' = 2019300, + project: String = 'YOUR_PROJECT_HERE', + github_token: String = 'YOUR_GITHUB_TOKEN_HERE', + working_dir: GCSPath = 'gs://YOUR_GCS_DIR_HERE', + checkpoint_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/', + deploy_webapp: String = 'true', + data_dir: GCSPath = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/' ): copydata = copydata_op( - working_dir=working_dir, data_dir=data_dir, checkpoint_dir=checkpoint_dir, - model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'), - action=COPY_ACTION + model_dir='%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER), + action=COPY_ACTION, ).apply(gcp.use_gcp_secret('user-gcp-sa')) log_dataset = metadata_log_op( log_type=DATASET, workspace_name=WORKSPACE_NAME, - run_name='{{workflow.name}}', + run_name=dsl.RUN_ID_PLACEHOLDER, data_uri=data_dir ) train = train_op( - working_dir=working_dir, data_dir=data_dir, - checkpoint_dir=checkpoint_dir, - model_dir='%s/%s/model_output' % (working_dir, '{{workflow.name}}'), + model_dir=copydata.outputs['copy_output_path'], action=TRAIN_ACTION, train_steps=train_steps, deploy_webapp=deploy_webapp ).apply(gcp.use_gcp_secret('user-gcp-sa')) @@ -80,29 +78,28 @@ def gh_summ( #pylint: disable=unused-argument log_model = metadata_log_op( log_type=MODEL, workspace_name=WORKSPACE_NAME, - run_name='{{workflow.name}}', - model_uri='%s/%s/model_output' % (working_dir, '{{workflow.name}}') + run_name=dsl.RUN_ID_PLACEHOLDER, + model_uri=train.outputs['train_output_path'] ) serve = dsl.ContainerOp( name='serve', image='gcr.io/google-samples/ml-pipeline-kubeflow-tfserve', - arguments=["--model_name", 'ghsumm-%s' % ('{{workflow.name}}',), - "--model_path", '%s/%s/model_output/export' % (working_dir, '{{workflow.name}}') + arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,), + "--model_path", train.outputs['train_output_path'] ] - ) + ).apply(gcp.use_gcp_secret('user-gcp-sa')) + log_dataset.after(copydata) - train.after(copydata) log_model.after(train) - serve.after(train) train.set_gpu_limit(1) train.set_memory_limit('48G') - with dsl.Condition(train.output == 'true'): + with dsl.Condition(train.outputs['launch_server'] == 'true'): webapp = dsl.ContainerOp( name='webapp', image='gcr.io/google-samples/ml-pipeline-webapp-launcher:v2ap', - arguments=["--model_name", 'ghsumm-%s' % ('{{workflow.name}}',), + arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,), "--github_token", github_token] ) diff --git a/github_issue_summarization/pipelines/example_pipelines/gh_summ.py.tar.gz b/github_issue_summarization/pipelines/example_pipelines/gh_summ.py.tar.gz index b285e71f412e579056a99d16c0c05b517d72446b..5998f78cfc8d6c7a8d89f7aca799e746e94160d7 100644 GIT binary patch literal 2148 zcmV-q2%GmGiwFp06t!If|7U1lb9HTPE^v7+bYXG;?HgTh+c>h%`4xm;{8CwQ+AHo1 z>_aYbQ(x0KNSr+!wuQh*jI2gqkf;Q?ApiY_q$HCPCCf@0BMZz>n z4#bafjZ@-q+6cjWFZ4ne_&R9y*|;Rh9XyWSqmdWJ8BKxIAB^@5&V0(~9_&y5@nL_T z%?NQo*bpaTrIf_!=zUIjL?2@QauHJEl4SJs9w~JGv%I>n&M%h#njfvKFY}AJOp$4f z*tCLFkmjf-i`AE_e_E^M*ZElu-}UG^yS8X_OTrpwH=UWLpmB6*9WO3&^n!drW`;xV;9n4>h8;P?$JeRJ_Q{SyGhhQ&lu^*5MVZzo*Z7v{4{X`2^Rw5) zzKtULMmR0DfS7mblk@y znh}&{L4XtQzua<1Ph*(ul8`X#JA?T%BzDB$vkhgovByoA*?!_?0SReg`8WYo3MWZC z8z|(YxVahJ+;W-ju%E$;ac-MqWqNK)>eQO`DJZ$H+Rgo$w)0&V19lWYaKn+NaP!{IMLK~`Pv+y}eI95hndiWOd`Ogq07pX%8!@Kq z@|8)0*c2~PlgpK#FgDASWV9o&Ul!W`_>qs0@UFUNUq7E)=gVX3?C{%suWn9pC11 zp)%NDiYGr)N#TOY(dqK)`11Ixb$GSrTUlvA8@xJbF<+X^icHnHOM}ok%-jsh%4}t`80k!{H{W z8Tdx&x4fJ?_4`QVwpF)t^b%YZ!#whaodxR%OWLsI`vZl&Q!YePX(*pri5 z?sBS$ZA)?s=O~UsSVd!Ydx*Z~2%n$i8gI|zsZ9k=CiGYL`|Z(jZh^e)RL73zy_qvZNyFLi{L1R>EQtA##~FQJ8u+pvn6AYopW@r`Un! zuA*zSYB~S?_-j4tPC^MkSjqRdc{P?NGUvGADxO8YsLp=N=^K1RG-)j$EL{Uvf-ffja z5I=H_BE)DEpn2VEy;UO@&pOAaDx&u(sxj!IxOQ(K>%utLqO5YlUsheJtKDcx9`EPB z`bAWordmnq^tK!CS8`y*fM23o$>=1BhaDl0%uQ8wo>TWfv%{VY$x2K_My`- z+o_UF&tgO+3GU-^<&gE-4tI-7Z4Tp|(!;?Mzco+X?eZ|V=)pz*IbAfrdUtTrVws9J zaM#TD8dd*VLsC9zKG%b4{Nog_W}f8zxJ}eOKoH$$JWLM(Zx4(HVmB&FWK3UkTl8?TP~sbUf`*< z(|`m~@-Ram|9&EkhIgLhVK{<;>BiX%P2XsAvEj3l z;wgjG!Ae|%I`w8qS04W5*b-lkpS$>42-7P-*^~nJ+i79n=xR|ODtRyE;oeOmx_W=t zwvfOtU2W#ozkueQ7W!GbAl3R6zqzYqukUw1O)E%i($PJo)l1&3AEozly0*Kttnn=* z*}blm@_(>sPfYdQE}e4<;mGn0W_6_l{30#Vd1!q%zyJdbFu(u<3^2d|0}L?000Rs# azyJdbFu(u<3^2d|+u(mvVbpH`Pyhf}jXxj& literal 1700 zcmV;V23z?biwFoG0aGVZ9Q3#C0$f_k#AStI_kpDiCl5FZ?TXvjaNqK-G6?HD+Ip23kN{+k; z`Cf=xa~Svs&m%`%KAfD$zg!A`kAG{o-)CdVc)R`KS z3%$_gaUElP2CfmA@yo_Lqv3^7%qVyDP58cpvCsI$@lpGa_eV$kej#4t zUFxzC!T^a8>D!_VMO64>^GFUi9^V__1(K3o5Z&k!&4O2h;e#t^)5#B-QV-t^E_LQN7j0GX~#6- zj~F;;#{UJF0vkKXhqu%ec(8G324cX%G2`{Jn5b^+7EVQ>IA@k+Db;KFH1-79(+GPZ z+xS3?#Hf}}2Sq-fTleS|M$wb!$d37=af(>(#DczW)_VkDP@2$pMdMsn+3+ZhkwN1i zfW-S(vaj*j`E_ zKyo9U$PD^1ztKzy*37A=V9%?Tc#|_pA65dm!mh-MoN)?Iz>`;HtDrq+w!~4{^2tcf zMM}^G5_A^GKpQ%x9{l)3TMTtem)yo!$Hrw>VD}TmZ2A6Ck@do7@iLX=a+q}I~-nK zoOZ_-gDdNF)bI7L#uweo>!K@@BAXy)x^@Kke;iIi5Bk)MIm3Y%q_Fja;v%q4F9$ch z>)vOpdo%u|__xk3c+dPp>QtN&9H`CV&O^R4LU)}UVddx$vPs9twxcq4 z6=6h(>tXkFuW@Ju?JSrVKv!_qCSup@V(j|7yFyNXM9shlizdoRE7{wb9mB;cfPLO3 zzlfHrkTMY0N!S$$E;OIQU7)0YtH!f}&z0-as?}FL9|++q6!MjZ4;=?^tOLOib@|KM zS2PFNK0nuAvUZR#k1pF8^X_>6D9t7C7nzSekUiDnF|gc(L0`1g7#%K`D4F$Y<0>dC znp}-|Hr+l%YM8lygX^lCMR6@{rKQ|%x#gQF6mq279#Wf*UkDn)nG4) zT7~R-vrHbu$8NdTEfY|2d#CyCv68E1y`I17M8NTenJyGw=UnrhZ8=4C{Ec^xsuOe# zpQ_cmRZM-gQTIqFZJxc)?V~5GLe~;WRT|H^XRIn`?ZdBHtW_BgH@pI?cQh6~M7)Yy z42kJb#0tk!c^Uw@^T$0E?_fkoMVG{3UaMI;T{epc^O$(M9L$pqN6z?V&Z@uU$!@XE zaKN}6PP6t_49bz@X~(D1`u;1NmbT}y+6P~p#T$6~I^IcXo^Q|d{k|!w*x+iiXOyyX zSaK(V!=S{Ql9h(4s|)7A;z| uXwjlYixw?fv}n