Sample: model retraining scenario using AI Platform components (#1513)
* Initial commit of AI Platform sample

* Minor change to pipeline description in README
kweinmeister authored and k8s-ci-robot committed Jun 19, 2019
1 parent 79411b7 commit 56134b8
Showing 13 changed files with 1,081 additions and 0 deletions.
324 changes: 324 additions & 0 deletions samples/ai-platform/Chicago Crime Pipeline.ipynb
@@ -0,0 +1,324 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Chicago Crime Prediction Pipeline\n",
"\n",
"An example notebook that demonstrates how to:\n",
"* Download data from BigQuery\n",
"* Create a Kubeflow pipeline\n",
"* Include Google Cloud AI Platform components to train and deploy the model in the pipeline\n",
"* Submit a job for execution\n",
"\n",
"The model forecasts how many crimes are expected to be reported the next day, based on how many were reported over the previous `n` days."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%capture\n",
"\n",
"# Install the SDK (Uncomment the code if the SDK is not installed before)\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.21/kfp.tar.gz'\n",
"!pip3 install --upgrade pip -q\n",
"!pip3 install $KFP_PACKAGE --upgrade -q\n",
"!pip3 install pandas --upgrade -q"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"\n",
"import kfp\n",
"import kfp.components as comp\n",
"import kfp.dsl as dsl\n",
"import kfp.gcp as gcp\n",
"\n",
"import pandas as pd\n",
"\n",
"import time"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Constants"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Required Parameters\n",
"PROJECT_ID = '<ADD GCP PROJECT HERE>'\n",
"GCS_WORKING_DIR = 'gs://<ADD STORAGE LOCATION HERE>' # No ending slash\n",
"\n",
"# Optional Parameters\n",
"REGION = 'us-central1'\n",
"RUNTIME_VERSION = '1.13'\n",
"PACKAGE_URIS=json.dumps(['gs://chicago-crime/chicago_crime_trainer-0.0.tar.gz'])\n",
"TRAINER_OUTPUT_GCS_PATH = GCS_WORKING_DIR + '/train/output/' + str(int(time.time())) + '/'\n",
"DATA_GCS_PATH = GCS_WORKING_DIR + '/reports.csv'\n",
"PYTHON_MODULE = 'trainer.task'\n",
"TRAINER_ARGS = json.dumps([\n",
" '--data-file-url', DATA_GCS_PATH,\n",
" '--job-dir', GCS_WORKING_DIR\n",
"])\n",
"EXPERIMENT_NAME = 'Chicago Crime Prediction'\n",
"PIPELINE_NAME = 'Chicago Crime Prediction'\n",
"PIPELINE_DESCRIPTION = ''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Download data\n",
"\n",
"Define a download function that uses the BigQuery component"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bigquery_query_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/d2f5cc92a46012b9927209e2aaccab70961582dc/components/gcp/bigquery/query/component.yaml')\n",
"\n",
"QUERY = \"\"\"\n",
" SELECT count(*) as count, TIMESTAMP_TRUNC(date, DAY) as day\n",
" FROM `bigquery-public-data.chicago_crime.crime`\n",
" GROUP BY day\n",
" ORDER BY day\n",
"\"\"\"\n",
"\n",
"def download(project_id, data_gcs_path):\n",
"\n",
" return bigquery_query_op(\n",
" query=QUERY,\n",
" project_id=project_id,\n",
" output_gcs_path=data_gcs_path\n",
" ).apply(\n",
" gcp.use_gcp_secret('user-gcp-sa') \n",
" )"
]
},
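{
"cell_type": "markdown",
"metadata": {},
"source": [
"The query above writes one row per day, with a `count` column, to `reports.csv`. The actual preprocessing happens inside the trainer package referenced by `PACKAGE_URIS` and is not part of this notebook; the next cell is only an illustrative sketch of how those daily counts could be windowed into previous-`n`-days features for the forecast, assuming the CSV keeps the `count` and `day` columns produced by the query."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only -- the real preprocessing lives in the trainer package.\n",
"# Assumes the CSV has the `count` and `day` columns produced by the query above.\n",
"WINDOW_SIZE = 7  # number of previous days used as features (an assumption)\n",
"\n",
"def make_windows(csv_path, window_size=WINDOW_SIZE):\n",
"    df = pd.read_csv(csv_path, parse_dates=['day']).sort_values('day')\n",
"    # Each previous day's count becomes one feature column.\n",
"    for i in range(1, window_size + 1):\n",
"        df['count_lag_%d' % i] = df['count'].shift(i)\n",
"    df = df.dropna()\n",
"    features = df[['count_lag_%d' % i for i in range(1, window_size + 1)]]\n",
"    labels = df['count']  # value to forecast for each day\n",
"    return features, labels\n",
"\n",
"# Example (requires the download step to have run and gcsfs to read gs:// paths):\n",
"# X, y = make_windows(DATA_GCS_PATH)"
]
},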
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train the model\n",
"\n",
"Run training code that will pre-process the data and then submit a training job to the AI Platform."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mlengine_train_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/d2f5cc92a46012b9927209e2aaccab70961582dc/components/gcp/ml_engine/train/component.yaml')\n",
"\n",
"def train(project_id,\n",
" trainer_args,\n",
" package_uris,\n",
" trainer_output_gcs_path,\n",
" gcs_working_dir,\n",
" region,\n",
" python_module,\n",
" runtime_version):\n",
" \n",
" return mlengine_train_op(\n",
" project_id=project_id, \n",
" python_module=python_module,\n",
" package_uris=package_uris,\n",
" region=region,\n",
" args=trainer_args,\n",
" job_dir=trainer_output_gcs_path,\n",
" runtime_version=runtime_version\n",
" ).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
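{
"cell_type": "markdown",
"metadata": {},
"source": [
"`mlengine_train_op` passes `TRAINER_ARGS` straight through to the `trainer.task` module packaged at `PACKAGE_URIS`. That package is not shown in this notebook; the sketch below is only an assumption about how such a module might parse the `--data-file-url` and `--job-dir` flags defined in the constants above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch of an argument parser matching TRAINER_ARGS; the packaged trainer.task\n",
"# module is the source of truth and may differ.\n",
"import argparse\n",
"\n",
"def parse_trainer_args(argv=None):\n",
"    parser = argparse.ArgumentParser()\n",
"    parser.add_argument('--data-file-url', required=True,\n",
"                        help='GCS path of the reports.csv produced by the download step')\n",
"    parser.add_argument('--job-dir', required=True,\n",
"                        help='GCS directory for checkpoints and the exported model')\n",
"    return parser.parse_args(argv)\n",
"\n",
"# Example:\n",
"# args = parse_trainer_args(['--data-file-url', DATA_GCS_PATH, '--job-dir', GCS_WORKING_DIR])"
]
},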
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Deploy model\n",
"\n",
"Deploy the model with the ID given from the training step"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"mlengine_deploy_op = comp.load_component_from_url(\n",
" 'https://raw.githubusercontent.com/kubeflow/pipelines/d2f5cc92a46012b9927209e2aaccab70961582dc/components/gcp/ml_engine/deploy/component.yaml')\n",
"\n",
"def deploy(\n",
" project_id,\n",
" model_uri,\n",
" model_id,\n",
" runtime_version):\n",
" \n",
" return mlengine_deploy_op(\n",
" model_uri=model_uri,\n",
" project_id=project_id, \n",
" model_id=model_id, \n",
" runtime_version=runtime_version, \n",
" replace_existing_version=True, \n",
" set_default=True).apply(gcp.use_gcp_secret('user-gcp-sa'))"
]
},
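{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the deploy step has finished, the model can be called for online prediction. The next cell is a sketch that assumes the `google-api-python-client` package is installed and that the model ID from the deploy step is filled in; the instance format shown is an assumption, since it depends on the trainer package."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch of an online prediction request against the deployed model.\n",
"# Assumes `pip install google-api-python-client`; the instance format is an assumption.\n",
"from googleapiclient import discovery\n",
"\n",
"def predict(project_id, model_id, instances):\n",
"    service = discovery.build('ml', 'v1')\n",
"    name = 'projects/{}/models/{}'.format(project_id, model_id)\n",
"    response = service.projects().predict(name=name, body={'instances': instances}).execute()\n",
"    if 'error' in response:\n",
"        raise RuntimeError(response['error'])\n",
"    return response['predictions']\n",
"\n",
"# Example (hypothetical input shape):\n",
"# predict(PROJECT_ID, '<ADD MODEL ID FROM THE DEPLOY STEP>', [{'input': [10.0] * 7}])"
]
},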
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" name=PIPELINE_NAME,\n",
" description=PIPELINE_DESCRIPTION\n",
")\n",
"\n",
"def pipeline(\n",
" data_gcs_path=dsl.PipelineParam(name='data_gcs_path', value=DATA_GCS_PATH),\n",
" gcs_working_dir=dsl.PipelineParam(name='gcs_working_dir', value=GCS_WORKING_DIR),\n",
" project_id=dsl.PipelineParam(name='project_id', value=PROJECT_ID),\n",
" python_module=dsl.PipelineParam(name='python_module', value=PYTHON_MODULE),\n",
" region=dsl.PipelineParam(name='region', value=REGION),\n",
" runtime_version=dsl.PipelineParam(name='runtime_version', value=RUNTIME_VERSION),\n",
" package_uris=dsl.PipelineParam(name='package_uris', value=PACKAGE_URIS),\n",
" trainer_output_gcs_path=dsl.PipelineParam(name='trainer_output_gcs_path', value=TRAINER_OUTPUT_GCS_PATH),\n",
" trainer_args=dsl.PipelineParam(name='trainer_args', value=TRAINER_ARGS),\n",
"): \n",
" download_task = download(project_id,\n",
" data_gcs_path)\n",
"\n",
" train_task = train(project_id,\n",
" trainer_args,\n",
" package_uris,\n",
" trainer_output_gcs_path,\n",
" gcs_working_dir,\n",
" region,\n",
" python_module,\n",
" runtime_version).after(download_task)\n",
" \n",
" deploy_task = deploy(project_id,\n",
" train_task.outputs['job_dir'],\n",
" train_task.outputs['job_id'],\n",
" runtime_version) \n",
" return True\n",
"\n",
"# Reference for invocation later\n",
"pipeline_func = pipeline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compile pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline_filename = pipeline_func.__name__ + '.pipeline.tar.gz'\n",
"import kfp.compiler as compiler\n",
"compiler.Compiler().compile(pipeline_func, pipeline_filename)"
]
},
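{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, the compiled package (a `.tar.gz` archive) can be inspected with the standard library as a quick sanity check before submitting it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: list the contents of the compiled pipeline package.\n",
"import tarfile\n",
"\n",
"with tarfile.open(pipeline_filename) as tar:\n",
"    print(tar.getnames())"
]
},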
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit the pipeline for execution"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Specify pipeline argument values\n",
"arguments = {}\n",
"\n",
"# Get or create an experiment and submit a pipeline run\n",
"client = kfp.Client()\n",
"try:\n",
" experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)\n",
"except:\n",
" experiment = client.create_experiment(EXPERIMENT_NAME)\n",
"\n",
"# Submit a pipeline run\n",
"run_name = pipeline_func.__name__ + ' run'\n",
"run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}