diff --git a/backend/Dockerfile b/backend/Dockerfile
index 754e7b9d0ac3..23d588f194ca 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -28,7 +28,7 @@ FROM python:3.5 as compiler
 RUN apt-get update -y && \
     apt-get install --no-install-recommends -y -q default-jdk python3-setuptools python3-dev
 RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py
-RUN python3 -m pip install apache-beam[gcp]==2.17 pyarrow==0.14.1 tfx==0.15.0
+RUN python3 -m pip install tfx==0.21.0rc0
 
 WORKDIR /go/src/github.com/kubeflow/pipelines
 COPY sdk sdk
diff --git a/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py b/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
index ed7f1031e7be..13f5be3e531d 100644
--- a/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
+++ b/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
@@ -28,6 +28,7 @@
 from tfx.components.statistics_gen.component import StatisticsGen
 from tfx.components.trainer.component import Trainer
 from tfx.components.transform.component import Transform
+from tfx.orchestration import data_types
 from tfx.orchestration import pipeline
 from tfx.orchestration.kubeflow import kubeflow_dag_runner
 from tfx.proto import evaluator_pb2
@@ -37,14 +38,18 @@
 
 # Define pipeline params used for pipeline execution.
 # Path to the module file, should be a GCS path.
-_taxi_module_file_param = dsl.PipelineParam(
+_taxi_module_file_param = data_types.RuntimeParameter(
     name='module-file',
-    value='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py'
+    default=
+    'gs://ml-pipeline-playground/tfx_taxi_simple/modules/tfx_taxi_utils_1205.py',
+    ptype=Text,
 )
 
 # Path to the CSV data file, under which there should be a data.csv file.
-_data_root_param = dsl.PipelineParam(
-    name='data-root', value='gs://ml-pipeline-playground/tfx_taxi_simple/data'
+_data_root_param = data_types.RuntimeParameter(
+    name='data-root',
+    default='gs://ml-pipeline-playground/tfx_taxi_simple/data',
+    ptype=Text,
 )
 
 # Path of pipeline root, should be a GCS path.
@@ -54,8 +59,8 @@
 
 def _create_test_pipeline(
-    pipeline_root: Text, csv_input_location: Text, taxi_module_file: Text,
-    enable_cache: bool
+    pipeline_root: Text, csv_input_location: data_types.RuntimeParameter,
+    taxi_module_file: data_types.RuntimeParameter, enable_cache: bool
 ):
   """Creates a simple Kubeflow-based Chicago Taxi TFX pipeline.
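A note on the pattern introduced above: a `data_types.RuntimeParameter` declares a name, an optional default, and a Python type (`ptype`), and `KubeflowDagRunner` compiles it into a KFP pipeline parameter whose value can be overridden per run. A minimal sketch, assuming the `tfx==0.21.0rc0` pin from this diff; the GCS default below is a hypothetical placeholder:

```python
from typing import Text

from tfx.orchestration import data_types

# A RuntimeParameter carries a name, an optional default, and a ptype.
# At compile time it becomes a KFP pipeline parameter; at run time the value
# supplied via the KFP UI or client (or the default) is substituted.
module_file_param = data_types.RuntimeParameter(
    name='module-file',
    default='gs://my-bucket/modules/taxi_utils.py',  # hypothetical default
    ptype=Text,
)
```

Unlike `dsl.PipelineParam`, the parameter object is passed to components as-is, which is why the `str(...)` wrappers around `_data_root_param` and `_taxi_module_file_param` are dropped further down in this diff.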
@@ -71,17 +76,17 @@ def _create_test_pipeline(
   examples = external_input(csv_input_location)
   example_gen = CsvExampleGen(input=examples)
-  statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])
+  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
   infer_schema = SchemaGen(
-      stats=statistics_gen.outputs['statistics'],
+      statistics=statistics_gen.outputs['statistics'],
       infer_feature_shape=False,
   )
   validate_stats = ExampleValidator(
-      stats=statistics_gen.outputs['statistics'],
+      statistics=statistics_gen.outputs['statistics'],
       schema=infer_schema.outputs['schema'],
   )
   transform = Transform(
-      input_data=example_gen.outputs['examples'],
+      examples=example_gen.outputs['examples'],
       schema=infer_schema.outputs['schema'],
       module_file=taxi_module_file,
   )
@@ -89,13 +94,13 @@
       module_file=taxi_module_file,
       transformed_examples=transform.outputs['transformed_examples'],
       schema=infer_schema.outputs['schema'],
-      transform_output=transform.outputs['transform_graph'],
+      transform_graph=transform.outputs['transform_graph'],
       train_args=trainer_pb2.TrainArgs(num_steps=10),
       eval_args=trainer_pb2.EvalArgs(num_steps=5),
   )
   model_analyzer = Evaluator(
       examples=example_gen.outputs['examples'],
-      model_exports=trainer.outputs['model'],
+      model=trainer.outputs['model'],
       feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(
           specs=[
               evaluator_pb2.SingleSlicingSpec(
@@ -114,7 +119,7 @@
   # https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226
   _pipeline_root_param = dsl.PipelineParam(name='pipeline-root')
   pusher = Pusher(
-      model_export=trainer.outputs['model'],
+      model=trainer.outputs['model'],
       model_blessing=model_validator.outputs['blessing'],
       push_destination=pusher_pb2.PushDestination(
           filesystem=pusher_pb2.PushDestination.Filesystem(
@@ -140,8 +145,8 @@ def _create_test_pipeline(
   enable_cache = True
   pipeline = _create_test_pipeline(
       pipeline_root,
-      str(_data_root_param),
-      str(_taxi_module_file_param),
+      _data_root_param,
+      _taxi_module_file_param,
       enable_cache=enable_cache,
   )
   # Make sure the version of TFX image used is consistent with the version of
@@ -150,15 +155,16 @@
       kubeflow_metadata_config=kubeflow_dag_runner.
       get_default_kubeflow_metadata_config(),
       # TODO: remove this override when KubeflowDagRunnerConfig doesn't default to use_gcp_secret op.
-      pipeline_operator_funcs=list(filter(
-          lambda operator: operator.__name__.find('gcp_secret') == -1,
-          kubeflow_dag_runner.get_default_pipeline_operator_funcs())),
-      tfx_image='tensorflow/tfx:0.15.0',
+      pipeline_operator_funcs=list(
+          filter(
+              lambda operator: operator.__name__.find('gcp_secret') == -1,
+              kubeflow_dag_runner.get_default_pipeline_operator_funcs()
+          )
+      ),
+      tfx_image='tensorflow/tfx:0.21.0rc0',
   )
   kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(
       output_filename=__file__ + '.yaml', config=config
   )
-  # Make sure kfp_runner recognizes those parameters.
-  kfp_runner._params.extend([_data_root_param, _taxi_module_file_param])
   kfp_runner.run(pipeline)
diff --git a/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb b/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
index 3d581df0021a..adf6b6a08ea1 100644
--- a/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
+++ b/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
@@ -18,7 +18,7 @@
     "\n",
     "This pipeline requires Google Cloud Storage permission to run. \n",
     "If KFP was deployed through K8S marketplace, please follow instructions in [the guideline](https://github.com/kubeflow/pipelines/blob/master/manifests/gcp_marketplace/guide.md#gcp-service-account-credentials)\n",
-    "to make sure the service account has `storage.admin` role."
+    "to make sure that GCP full scope access was enabled when the cluster was created, or, at a minimum, that the service account has the `storage.admin` role."
    ]
   },
   {
@@ -29,28 +29,19 @@
    "source": [
     "!python3 -m pip install pip --upgrade --quiet --user\n",
     "!python3 -m pip install kfp --upgrade --quiet --user\n",
-    "!python3 -m pip install tfx --upgrade --quiet --user"
+    "!python3 -m pip install tfx==0.21.0rc0 --quiet --user"
    ]
   },
   {
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "In this example we'll need a very recent version of TFX SDK to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
+    "In this example we'll need TFX SDK 0.21 or later to leverage the [`RuntimeParameter`](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/orchestration/data_types.py#L137) feature.\n",
     "\n",
     "## RuntimeParameter in TFX DSL\n",
     "Currently, TFX DSL only supports parameterizing fields in the `PARAMETERS` section of `ComponentSpec`, see [here](https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/types/component_spec.py#L126). This prevents runtime-parameterizing the pipeline topology. Also, if the declared type of the field is a protobuf, the user needs to pass in a dictionary with exactly the same names for each field, and specify one or more values as `RuntimeParameter` objects. In other words, the dictionary should be able to be passed to the [`ParseDict()` method](https://github.com/protocolbuffers/protobuf/blob/04a11fc91668884d1793bff2a0f72ee6ce4f5edd/python/google/protobuf/json_format.py#L433) and produce the correct pb message."
    ]
   },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "!python3 -m pip install --quiet --index-url https://test.pypi.org/simple/ tfx==0.16.0.dev20191212 --user"
-   ]
-  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -92,6 +83,7 @@
    "\n",
    "# Path of pipeline data root, should be a GCS path.\n",
    "# Note that when running on KFP, the pipeline root is always a runtime parameter.\n",
+   "# The value specified here will be its default.\n",
    "pipeline_root = os.path.join('gs://my-bucket', 'tfx_taxi_simple',\n",
    "                             kfp.dsl.RUN_ID_PLACEHOLDER)\n",
    "\n",
@@ -160,7 +152,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "statistics_gen = StatisticsGen(input_data=example_gen.outputs['examples'])"
+    "statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])"
    ]
   },
   {
@@ -170,7 +162,7 @@
    "outputs": [],
    "source": [
     "infer_schema = SchemaGen(\n",
-    "    stats=statistics_gen.outputs['statistics'], infer_feature_shape=False)"
+    "    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)"
    ]
   },
   {
@@ -180,7 +172,7 @@
    "outputs": [],
    "source": [
     "validate_stats = ExampleValidator(\n",
-    "    stats=statistics_gen.outputs['statistics'],\n",
+    "    statistics=statistics_gen.outputs['statistics'],\n",
     "    schema=infer_schema.outputs['schema'])"
    ]
   },
@@ -193,7 +185,7 @@
    "# The module file used in the Transform and Trainer components is parameterized by\n",
    "# _taxi_module_file_param.\n",
    "transform = Transform(\n",
-    "    input_data=example_gen.outputs['examples'],\n",
+    "    examples=example_gen.outputs['examples'],\n",
     "    schema=infer_schema.outputs['schema'],\n",
     "    module_file=taxi_module_file_param)"
    ]
@@ -210,7 +202,7 @@
    "    module_file=taxi_module_file_param,\n",
    "    transformed_examples=transform.outputs['transformed_examples'],\n",
    "    schema=infer_schema.outputs['schema'],\n",
-    "    transform_output=transform.outputs['transform_graph'],\n",
+    "    transform_graph=transform.outputs['transform_graph'],\n",
     "    train_args={'num_steps': train_steps},\n",
     "    eval_args={'num_steps': eval_steps})"
    ]
@@ -224,7 +216,7 @@
    "# The name of the slicing column is specified as a RuntimeParameter.\n",
    "model_analyzer = Evaluator(\n",
    "    examples=example_gen.outputs['examples'],\n",
-    "    model_exports=trainer.outputs['model'],\n",
+    "    model=trainer.outputs['model'],\n",
     "    feature_slicing_spec=dict(specs=[{\n",
     "        'column_for_slicing': [slicing_column]\n",
     "    }]))"
@@ -246,19 +238,18 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Currently we use this hack to ensure push_destination can\n",
-    "# be correctly parameterized and interpreted.\n",
+    "# Hack: ensuring push_destination can be correctly parameterized and interpreted.\n",
     "# pipeline root will be specified as a dsl.PipelineParam with the name\n",
     "# pipeline-root, see:\n",
     "# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226\n",
-    "pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n",
+    "_pipeline_root_param = dsl.PipelineParam(name='pipeline-root')\n",
     "pusher = Pusher(\n",
-    "    model_export=trainer.outputs['model'],\n",
+    "    model=trainer.outputs['model'],\n",
     "    model_blessing=model_validator.outputs['blessing'],\n",
     "    push_destination=pusher_pb2.PushDestination(\n",
     "        filesystem=pusher_pb2.PushDestination.Filesystem(\n",
     "            base_directory=os.path.join(\n",
-    "                str(pipeline_root_param), 'model_serving'))))\n"
+    "                str(_pipeline_root_param), 'model_serving'))))\n"
    ]
   },
   {
@@ -290,10 +281,15 @@
    "source": [
     "# Specify a TFX docker image. For the full list of tags please see:\n",
     "# https://hub.docker.com/r/tensorflow/tfx/tags\n",
-    "tfx_image = 'tensorflow/tfx:0.16.0.dev20191205'\n",
+    "tfx_image = 'tensorflow/tfx:0.21.0rc0'\n",
     "config = kubeflow_dag_runner.KubeflowDagRunnerConfig(\n",
     "    kubeflow_metadata_config=kubeflow_dag_runner\n",
     "    .get_default_kubeflow_metadata_config(),\n",
+    "    # Switch to using the GCP service account by deleting the following lines,\n",
+    "    # if KFP is running with workload identity or with GCP full scope permission.\n",
+    "    pipeline_operator_funcs=list(filter(\n",
+    "        lambda operator: operator.__name__.find('gcp_secret') == -1,\n",
+    "        kubeflow_dag_runner.get_default_pipeline_operator_funcs())),\n",
     "    tfx_image=tfx_image)\n",
     "kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)\n",
     "# KubeflowDagRunner compiles the DSL pipeline object into KFP pipeline package.\n",
@@ -308,13 +304,13 @@
    "outputs": [],
    "source": [
     "run_result = kfp.Client(\n",
-    "    host=''\n",
+    "    host='1234567abcde-dot-us-central2.pipelines.googleusercontent.com' # Put your KFP endpoint here\n",
     ").create_run_from_pipeline_package(\n",
     "    pipeline_name + '.tar.gz', \n",
     "    arguments={\n",
-    "        'pipeline-root': '' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
-    "        'module-file': '', # delete this line to use default module file.\n",
-    "        'data-root': '' # delete this line to use default data.\n",
+    "        'pipeline-root': 'gs:///tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
+    "        # 'module-file': '', # delete this line to use default module file.\n",
+    "        # 'data-root': '' # delete this line to use default data.\n",
     "})"
    ]
   }
@@ -335,7 +331,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-  "version": "3.5.3"
+  "version": "3.7.5rc1"
  }
 },
 "nbformat": 4,
diff --git a/test/sample-test/Dockerfile b/test/sample-test/Dockerfile
index 1b1d82663619..0b85065d9dc9 100644
--- a/test/sample-test/Dockerfile
+++ b/test/sample-test/Dockerfile
@@ -19,7 +19,7 @@ RUN pip3 install google-api-python-client==1.7.0
 RUN pip3 install google-cloud-storage==1.17.0
 RUN pip3 install fire==0.2.1
 RUN pip3 install yamale==2.0
-RUN pip3 install apache-beam[gcp]==2.17 pyarrow==0.14.1 tfx==0.15.0
+RUN pip3 install tfx==0.21.0rc0
 # Install python client, including DSL compiler.
 COPY ./sdk/python /sdk/python
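For the protobuf-typed fields discussed in the notebook above, the dictionary passed to a component must mirror the proto field names exactly so that `json_format.ParseDict()` can construct the message, and leaf values may be `RuntimeParameter` objects. A minimal sketch of that pattern, again assuming the `tfx==0.21.0rc0` pin; the parameter name and default are hypothetical, while the dict shapes follow the notebook's Evaluator and Trainer cells:

```python
from typing import Text

from tfx.orchestration import data_types

# Keys must match the proto definition (e.g. evaluator_pb2.FeatureSlicingSpec)
# exactly so json_format.ParseDict() can produce the message; leaf values may
# be RuntimeParameter objects that are resolved at run time.
slicing_column = data_types.RuntimeParameter(
    name='slicing-column',  # hypothetical parameter
    default='trip_start_hour',
    ptype=Text,
)

# Dict form of evaluator_pb2.FeatureSlicingSpec, as in the Evaluator cell:
feature_slicing_spec = dict(specs=[{'column_for_slicing': [slicing_column]}])

# Dict forms of trainer_pb2.TrainArgs / trainer_pb2.EvalArgs, as in the Trainer cell:
train_args = {'num_steps': 10}
eval_args = {'num_steps': 5}
```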