tech writer edits #2403

Merged
merged 4 commits on Oct 17, 2019
175 changes: 78 additions & 97 deletions components/gcp/dataflow/launch_python/README.md
@@ -1,12 +1,34 @@

# Name
Data preparation by executing an Apache Beam job in Cloud Dataflow
Component: Data preparation by executing an Apache Beam job in Cloud Dataflow

# Labels
GCP, Cloud Dataflow, Apache Beam, Python, Kubeflow
Cloud Dataflow, Apache Beam, Kubeflow

# Summary
A Kubeflow Pipeline component that prepares data by submitting an Apache Beam job (authored in Python) to Cloud Dataflow for execution. The Python Beam code is run with Cloud Dataflow Runner.
A Kubeflow pipeline component that prepares data by submitting an Apache Beam job (authored in Python) to Cloud Dataflow for execution. The Python Beam code is run with Cloud Dataflow Runner.

# Facets
<!--Make sure the asset has data for the following facets:
Use case
Technique
Input data type
ML workflow

The data must map to the acceptable values for these facets, as documented on the “taxonomy” sheet of go/aihub-facets
https://gitlab.aihub-content-external.com/aihubbot/kfp-components/commit/fe387ab46181b5d4c7425dcb8032cb43e70411c1
--->
Use case:
Other

Technique:
Other

Input data type:
Tabular

ML workflow:
Data preparation

# Details
## Intended use
@@ -16,12 +38,12 @@ Use this component to run a Python Beam code to submit a Cloud Dataflow job as a
## Runtime arguments
Name | Description | Optional | Data type| Accepted values | Default |
:--- | :----------| :----------| :----------| :----------| :---------- |
python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | | GCSPath | | |
project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job.| | GCPProjectID | | |
staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information.This is done so that you can resume the job in case of failure. `staging_dir` is passed as the command line arguments (`staging_location` and `temp_location`) of the Beam code. | Yes | GCSPath | | None |
requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | | None |
python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | - | GCSPath | - | - |
project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job. | - | GCPProjectID | - | - |
staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information. This is done so that you can resume the job in case of failure. The command line arguments, `staging_location` and `temp_location`, of the Beam code are passed through `staging_dir`. | Yes | GCSPath | - | None |
requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | - | None |
args | The list of arguments to pass to the Python file. | No | List | A list of string arguments | None |
wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30 |
wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | - | 30 |
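
To make the mapping from this table to code concrete, here is a minimal sketch of a pipeline step that passes these arguments to the component. It reuses the component URL from the usage steps later in this README; the bucket paths, project ID, and pipeline name are placeholders rather than values from this repository, and the full working sample appears further down.

```python
import json

import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.components as comp

# Same component URL as in the usage steps below.
dataflow_python_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/e598176c02f45371336ccaa819409e8ec83743df/components/gcp/dataflow/launch_python/component.yaml')

@dsl.pipeline(name='Dataflow launch Python sketch')
def dataflow_sketch_pipeline():
    # Placeholder paths and project ID -- replace them with your own values.
    dataflow_python_op(
        python_file_path='gs://my-bucket/beam/my_pipeline.py',
        project_id='my-gcp-project',
        staging_dir='gs://my-bucket/staging',  # optional; lets the job resume after a failure
        requirements_file_path='gs://my-bucket/beam/requirements.txt',  # optional
        args=json.dumps(['--output', 'gs://my-bucket/output/wc.out']),
        wait_interval=30,
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))  # see Cautions & requirements below
```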

## Input data schema

@@ -31,21 +53,20 @@ Before you use the component, the following files must be ready in a Cloud Stora

The Beam Python code should follow the [Beam programming guide](https://beam.apache.org/documentation/programming-guide/) as well as the following additional requirements to be compatible with this component:
- It accepts the command line arguments `--project`, `--temp_location`, `--staging_location`, which are [standard Dataflow Runner options](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-pipeline-options).
- It enables `info logging` before the start of a Cloud Dataflow job in the Python code. This is important to allow the component to track the status and ID of the job that is created. For example, calling `logging.getLogger().setLevel(logging.INFO)` before any other code.

- It enables `info logging` before the start of a Cloud Dataflow job in the Python code. This allows the component to track the status and ID of the job that is created. For example, call `logging.getLogger().setLevel(logging.INFO)` before any other code, as in the sketch below.
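
As a reference point, the following is a minimal sketch of a hypothetical `my_pipeline.py` (not part of this repository) that satisfies both requirements: it turns on info logging before anything else and forwards `--project`, `--temp_location`, and `--staging_location` to the Dataflow runner through the standard pipeline options.

```python
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--output', required=True,
                        help='Output prefix on Cloud Storage.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # --project, --temp_location, and --staging_location remain in pipeline_args
    # and are consumed by the standard Dataflow runner options.
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create(['hello', 'world'])
         | 'Write' >> beam.io.WriteToText(known_args.output))


if __name__ == '__main__':
    # Enable info logging before any other code so the component can
    # track the status and ID of the Cloud Dataflow job.
    logging.getLogger().setLevel(logging.INFO)
    run()
```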

## Output
Name | Description
:--- | :----------
job_id | The id of the Cloud Dataflow job that is created.
job_id | The ID of the Cloud Dataflow job that is created.
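
If a later step needs the job ID, it can be read from the task's outputs inside the pipeline definition. The sketch below is hypothetical: it assumes a trivial downstream `dsl.ContainerOp` that just echoes the ID, and the image name, file path, and project ID are placeholders.

```python
import kfp.dsl as dsl
import kfp.components as comp

# Same component URL as in the usage steps below.
dataflow_python_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/e598176c02f45371336ccaa819409e8ec83743df/components/gcp/dataflow/launch_python/component.yaml')

@dsl.pipeline(name='Consume Dataflow job ID sketch')
def job_id_sketch_pipeline():
    dataflow_task = dataflow_python_op(
        python_file_path='gs://my-bucket/beam/my_pipeline.py',  # placeholder
        project_id='my-gcp-project')                            # placeholder

    # The component's `job_id` output can be wired into a later step.
    dsl.ContainerOp(
        name='print-job-id',
        image='bash:4.4.23',
        command=['echo'],
        arguments=[dataflow_task.outputs['job_id']])
```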

## Cautions & requirements
To use the components, the following requirements must be met:
- Cloud Dataflow API is enabled.
- The component is running under a secret Kubeflow user service account in a Kubeflow Pipeline cluster. For example:
```
component_op(...).apply(gcp.use_gcp_secret('user-gcp-sa'))
```
- The component is running under a secret Kubeflow user service account in a Kubeflow Pipelines cluster. For example:
```
component_op(...).apply(gcp.use_gcp_secret('user-gcp-sa'))
```
The Kubeflow user service account is a member of:
- `roles/dataflow.developer` role of the project.
- `roles/storage.objectViewer` role of the Cloud Storage Objects `python_file_path` and `requirements_file_path`.
@@ -59,83 +80,49 @@ The component does several things during the execution:
- Stores the Cloud Dataflow job information in `staging_dir` so the job can be resumed in case of failure.
- Waits for the job to finish.
The steps to use the component in a pipeline are:
1. Install the Kubeflow Pipelines SDK:

```python
%%capture --no-stderr

KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
```

```python
%%capture --no-stderr

KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
```

2. Load the component using KFP SDK
2. Load the component using the Kubeflow Pipelines SDK:

```python
import kfp.components as comp

dataflow_python_op = comp.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/e598176c02f45371336ccaa819409e8ec83743df/components/gcp/dataflow/launch_python/component.yaml')
help(dataflow_python_op)
```

```python
import kfp.components as comp

dataflow_python_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/e598176c02f45371336ccaa819409e8ec83743df/components/gcp/dataflow/launch_python/component.yaml')
help(dataflow_python_op)
```

### Sample
Note: The following sample code works in an IPython notebook or directly in Python code. See the sample code below to learn how to execute the template.
In this sample, we run a wordcount sample code in a Kubeflow Pipeline. The output will be stored in a Cloud Storage bucket. Here is the sample code:

The following sample code works in an IPython notebook or directly in Python code. See the sample code below to learn how to execute the template.
In this sample, we run a wordcount sample code in a Kubeflow pipeline. The output will be stored in a Cloud Storage bucket. Here is the sample code:

```python
!gsutil cat gs://ml-pipeline-playground/samples/dataflow/wc/wc.py
```

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A minimalist word-counting workflow that counts words in Shakespeare.

This is the first in a series of successively more detailed 'word count'
examples.

Next, see the wordcount pipeline, then the wordcount_debugging pipeline, for
more detailed examples that introduce additional concepts.

Concepts:
Concepts:

1. Reading data from text files
2. Specifying 'inline' transforms
3. Counting a PCollection
4. Writing data to Cloud Storage as text files
To execute this pipeline locally, first edit the code to specify the output
location. Output location could be a local file path or an output prefix
on GCS. (Only update the output location marked with the first CHANGE comment.)
1. Reading data from text files.
2. Specifying inline transforms.
3. Counting a PCollection.
4. Writing data to Cloud Storage as text files.

Notes:

To execute this pipeline locally, first edit the code to specify the output location. Output location could be a local file path or an output prefix on Cloud Storage. (Only update the output location marked with the first CHANGE comment in the following code.)

To execute this pipeline remotely, first edit the code to set your project ID,
runner type, the staging location, the temp location, and the output location.
The specified GCS bucket(s) must already exist. (Update all the places marked
with a CHANGE comment.)
To execute this pipeline remotely, first edit the code to set your project ID, runner type, the staging location, the temp location, and the output location.
The specified Cloud Storage bucket(s) must already exist. (Update all the places marked with a CHANGE comment in the following code.)

Then, run the pipeline as described in the README. It will be deployed and run
using the Google Cloud Dataflow Service. No args are required to run the
pipeline. You can see the results in your output bucket in the GCS browser.
"""
Then, run the pipeline as described in the README. It will be deployed and run using the Cloud Dataflow service. No arguments are required to run the pipeline. You can see the results in your output bucket in the Cloud Storage browser.

```python
from __future__ import absolute_import

import argparse
@@ -161,22 +148,22 @@ In this sample, we run a wordcount sample code in a Kubeflow Pipeline. The outpu
help='Input file to process.')
parser.add_argument('--output',
dest='output',
# CHANGE 1/5: The Google Cloud Storage path is required
# for outputting the results.
# CHANGE 1/5: The Cloud Storage path is required
# to output the results.
default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
# pipeline_args.extend([
# # CHANGE 2/5: (OPTIONAL) Change this to DataflowRunner to
# # run your pipeline on the Google Cloud Dataflow Service.
# # run your pipeline on the Cloud Dataflow Service.
# '--runner=DirectRunner',
# # CHANGE 3/5: Your project ID is required in order to run your pipeline on
# # the Google Cloud Dataflow Service.
# # the Cloud Dataflow Service.
# '--project=SET_YOUR_PROJECT_ID_HERE',
# # CHANGE 4/5: Your Google Cloud Storage path is required for staging local
# # CHANGE 4/5: Your Cloud Storage path is required for staging local
# # files.
# '--staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY',
# # CHANGE 5/5: Your Google Cloud Storage path is required for temporary
# # CHANGE 5/5: Your Cloud Storage path is required for temporary
# # files.
# '--temp_location=gs://YOUR_BUCKET_NAME/AND_TEMP_DIRECTORY',
# '--job_name=your-wordcount-job',
Expand Down Expand Up @@ -214,27 +201,24 @@ In this sample, we run a wordcount sample code in a Kubeflow Pipeline. The outpu
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()

```

#### Set sample parameters


```python
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_STAGING_DIR = 'gs://<Please put your GCS path here>' # No ending slash
# Required parameters
PROJECT_ID = '<Put your project ID here>'
GCS_STAGING_DIR = 'gs://<Put your GCS path here>' # No ending slash
```


```python
# Optional Parameters
# Optional parameters
EXPERIMENT_NAME = 'Dataflow - Launch Python'
OUTPUT_FILE = '{}/wc/wordcount.out'.format(GCS_STAGING_DIR)
```

#### Example pipeline that uses the component


```python
import kfp.dsl as dsl
import kfp.gcp as gcp
@@ -264,7 +248,6 @@ def pipeline(

#### Compile the pipeline


```python
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
@@ -274,12 +257,11 @@ compiler.Compiler().compile(pipeline_func, pipeline_filename)

#### Submit the pipeline for execution


```python
#Specify pipeline argument values
#Specify values for the pipeline's arguments
arguments = {}

#Get or create an experiment and submit a pipeline run
#Get or create an experiment
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
@@ -291,14 +273,13 @@ run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arg

#### Inspect the output


```python
!gsutil cat $OUTPUT_FILE
```

## References
* [Component python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_python.py)
* [Component docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)
* [Component Python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_python.py)
* [Component Docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)
* [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/dataflow/launch_python/sample.ipynb)
* [Dataflow Python Quickstart](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python)
