From 0f79dfef2bdacf0d5c46889d4dd177b07d2c867d Mon Sep 17 00:00:00 2001
From: Tianzi Cai
Date: Tue, 30 Aug 2022 16:51:20 -0700
Subject: [PATCH] dataflow: extensible template with javascript udf (#8291)

* dataflow: extensible template with javascript udf

* address dandhlee's comments

* address david's review comments

* fix kwargs
---
 dataflow/conftest.py                          |  86 ++++++++++--
 .../dataflow_udf_transform.js                 |  37 +++++
 dataflow/extensible-templates/e2e_test.py     | 129 ++++++++++++++++++
 .../extensible-templates/noxfile_config.py    |  42 ++++++
 .../requirements-test.txt                     |   6 +
 .../extensible-templates/requirements.txt     |   1 +
 6 files changed, 290 insertions(+), 11 deletions(-)
 create mode 100644 dataflow/extensible-templates/dataflow_udf_transform.js
 create mode 100644 dataflow/extensible-templates/e2e_test.py
 create mode 100644 dataflow/extensible-templates/noxfile_config.py
 create mode 100644 dataflow/extensible-templates/requirements-test.txt
 create mode 100644 dataflow/extensible-templates/requirements.txt

diff --git a/dataflow/conftest.py b/dataflow/conftest.py
index 05f1ceb1a6a0..363c54ec3645 100644
--- a/dataflow/conftest.py
+++ b/dataflow/conftest.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 from dataclasses import dataclass
+from google.api_core.exceptions import NotFound
 import itertools
 import json
 import logging
@@ -91,23 +92,44 @@ def storage_bucket(name: str) -> str:
         logging.info(f"Deleted storage_bucket: {bucket.name}")
 
     @staticmethod
-    def bigquery_dataset(name: str, project: str = PROJECT) -> str:
+    def bigquery_dataset(
+        name: str,
+        project: str = PROJECT,
+        location: str = REGION,
+    ) -> str:
         from google.cloud import bigquery
 
         bigquery_client = bigquery.Client()
 
         dataset_name = Utils.underscore_name(name)
-        dataset = bigquery_client.create_dataset(
-            bigquery.Dataset(f"{project}.{dataset_name}")
-        )
+        dataset = bigquery.Dataset(f"{project}.{dataset_name}")
+        dataset.location = location
+        result = bigquery_client.create_dataset(dataset)
+
+        logging.info(f"Created bigquery_dataset: {result.full_dataset_id}")
+        yield result.dataset_id
 
-        logging.info(f"Created bigquery_dataset: {dataset.full_dataset_id}")
-        yield dataset_name
+        try:
+            bigquery_client.delete_dataset(
+                f"{project}.{dataset_name}", delete_contents=True
+            )
+            logging.info(f"Deleted bigquery_dataset: {result.full_dataset_id}")
+        except NotFound:
+            logging.info(f"{result.full_dataset_id} already deleted.")
 
-        bigquery_client.delete_dataset(
-            f"{project}.{dataset_name}", delete_contents=True
+    @staticmethod
+    def bigquery_table(
+        dataset_name: str, table_name: str, project: str = PROJECT, **kwargs
+    ) -> str:
+        from google.cloud import bigquery
+        bigquery_client = bigquery.Client()
+        table = bigquery.Table(
+            f"{project}.{dataset_name}.{table_name}", **kwargs
         )
-        logging.info(f"Deleted bigquery_dataset: {dataset.full_dataset_id}")
+        result = bigquery_client.create_table(table)
+        logging.info(f"Created bigquery_table: {result.full_table_id}")
+        yield result.table_id
+        # This table will be deleted when the dataset is deleted.
 
     @staticmethod
     def bigquery_table_exists(
@@ -138,7 +160,7 @@ def pubsub_topic(name: str, project: str = PROJECT) -> str:
 
         publisher_client = pubsub.PublisherClient()
         topic_path = publisher_client.topic_path(project, Utils.hyphen_name(name))
-        topic = publisher_client.create_topic(topic_path)
+        topic = publisher_client.create_topic(request={"name": topic_path})
 
         logging.info(f"Created pubsub_topic: {topic.name}")
         yield topic.name
@@ -164,7 +186,9 @@ def pubsub_subscription(
         subscription_path = subscriber.subscription_path(
             project, Utils.hyphen_name(name)
         )
-        subscription = subscriber.create_subscription(subscription_path, topic_path)
+        subscription = subscriber.create_subscription(
+            request={"name": subscription_path, "topic": topic_path}
+        )
 
         logging.info(f"Created pubsub_subscription: {subscription.name}")
         yield subscription.name
@@ -525,6 +549,46 @@ def dataflow_flex_template_run(
 
         Utils.dataflow_jobs_cancel(job_id)
 
+
+    @staticmethod
+    def dataflow_extensible_template_run(
+        job_name: str,
+        template_path: str,
+        bucket_name: str,
+        parameters: Dict[str, str] = {},
+        project: str = PROJECT,
+        region: str = REGION,
+    ) -> str:
+        import yaml
+
+        unique_job_name = Utils.hyphen_name(job_name)
+        logging.info(f"dataflow_job_name: {unique_job_name}")
+        cmd = [
+            "gcloud",
+            "dataflow",
+            "jobs",
+            "run",
+            unique_job_name,
+            f"--gcs-location={template_path}",
+            f"--project={project}",
+            f"--region={region}",
+            f"--staging-location=gs://{bucket_name}/staging",
+        ] + [
+            f"--parameters={name}={value}"
+            for name, value in {
+                **parameters,
+            }.items()
+        ]
+        logging.info(cmd)
+
+        stdout = subprocess.check_output(cmd).decode("utf-8")
+        logging.info(f"Launched Dataflow Template job: {unique_job_name}")
+        job_id = yaml.safe_load(stdout)["id"]
+        logging.info(f"Dataflow Template job id: {job_id}")
+        logging.info(f">> {Utils.dataflow_job_url(job_id, project, region)}")
+        yield job_id
+
+        Utils.dataflow_jobs_cancel(job_id)
 
 
 @pytest.fixture(scope="session")
 def utils() -> Utils:
diff --git a/dataflow/extensible-templates/dataflow_udf_transform.js b/dataflow/extensible-templates/dataflow_udf_transform.js
new file mode 100644
index 000000000000..b0fc21cbd3f1
--- /dev/null
+++ b/dataflow/extensible-templates/dataflow_udf_transform.js
@@ -0,0 +1,37 @@
+// Copyright 2022 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// [START dataflow_extensible_template_udf]
+/**
+ * User-defined function (UDF) to transform events
+ * as part of a Dataflow template job.
+ *
+ * @param {string} inJson input Pub/Sub JSON message (stringified)
+ */
+ function process(inJson) {
+   // Nashorn engine is only ECMAScript 5.1 (ES5) compliant. Newer ES6
+   // JavaScript keywords like `let` or `const` will cause syntax errors.
+   var obj = JSON.parse(inJson);
+   var includePubsubMessage = obj.data && obj.attributes;
+   var data = includePubsubMessage ? obj.data : obj;
+
+   if (!data.hasOwnProperty('url')) {
+     throw new Error("No url found");
+   } else if (data.url !== "https://beam.apache.org/") {
+     throw new Error("Unrecognized url");
+   }
+
+   return JSON.stringify(obj);
+ }
+// [END dataflow_extensible_template_udf]
diff --git a/dataflow/extensible-templates/e2e_test.py b/dataflow/extensible-templates/e2e_test.py
new file mode 100644
index 000000000000..fcb58c97097c
--- /dev/null
+++ b/dataflow/extensible-templates/e2e_test.py
@@ -0,0 +1,129 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+import json
+import os
+import time
+
+from google.cloud import bigquery
+from google.cloud import pubsub
+from google.cloud import storage
+
+try:
+    # `conftest` cannot be imported when running in `nox`, but we still
+    # try to import it for the autocomplete when writing the tests.
+    from conftest import Utils
+except ModuleNotFoundError:
+    Utils = None
+import pytest
+
+PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]
+
+NAME = "dataflow/extensible-template"
+
+BQ_TABLE = "ratings"
+
+
+@pytest.fixture(scope="session")
+def bucket_name(utils: Utils) -> str:
+    yield from utils.storage_bucket(NAME)
+
+
+@pytest.fixture(scope="session")
+def pubsub_topic(utils: Utils) -> str:
+    yield from utils.pubsub_topic(NAME)
+
+
+@pytest.fixture(scope="session")
+def bq_dataset(utils: Utils) -> str:
+    yield from utils.bigquery_dataset(NAME)
+
+
+@pytest.fixture(scope="session")
+def bq_table(utils: Utils, bq_dataset: str) -> str:
+    yield from utils.bigquery_table(
+        bq_dataset,
+        BQ_TABLE,
+        schema=[
+            bigquery.SchemaField("url", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("review", "STRING", mode="REQUIRED"),
+        ],
+    )
+
+
+@pytest.fixture(scope="session")
+def dataflow_job_id(
+    utils: Utils,
+    bucket_name: str,
+    pubsub_topic: str,
+    bq_dataset: str,
+    bq_table: str,
+) -> str:
+    storage_client = storage.Client()
+    bucket = storage_client.bucket(bucket_name)
+    blob = bucket.blob("js/dataflow_udf_transform.js")
+    blob.upload_from_filename(os.path.join(os.getcwd(), "dataflow_udf_transform.js"))
+    output_table = f"{PROJECT}:{bq_dataset}.{bq_table}"
+    yield from utils.dataflow_extensible_template_run(
+        job_name=NAME,
+        template_path="gs://dataflow-templates/latest/PubSub_to_BigQuery",
+        bucket_name=bucket_name,
+        parameters={
+            "inputTopic": pubsub_topic,
+            "outputTableSpec": output_table,
+            "javascriptTextTransformGcsPath": f"gs://{bucket_name}/js/dataflow_udf_transform.js",
+            "javascriptTextTransformFunctionName": "process",
+        },
+    )
+
+
+def test_extensible_template(
+    utils: Utils,
+    pubsub_topic: str,
+    dataflow_job_id: str,
+    bq_dataset: str,
+    bq_table: str,
+) -> None:
+    publisher_client = pubsub.PublisherClient()
+    for i in range(30):
+        good_msg = json.dumps(
+            {
+                "url": "https://beam.apache.org/",
+                "review": "positive" if i % 2 == 0 else "negative",
+            }
+        )
+        publisher_client.publish(pubsub_topic, good_msg.encode("utf-8"))
+        bad_msg = json.dumps(
+            {
+                "url": "https://kafka.apache.org/",
+                "review": "positive" if i % 2 == 0 else "negative",
+            }
+        )
+        publisher_client.publish(pubsub_topic, bad_msg.encode("utf-8"))
+        time.sleep(10)
+
+    # Wait until the dataflow job starts running successfully.
+    # The job is cancelled as part of the teardown to avoid leaking resources.
+    utils.dataflow_jobs_wait(
+        dataflow_job_id,
+        target_states={"JOB_STATE_RUNNING"},
+        timeout_sec=300,
+        poll_interval_sec=30,
+    )
+
+    query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}`"
+    good_records = utils.bigquery_query(query)
+    assert len(list(good_records)) > 0
+
+    query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}_error_records`"
+    bad_records = utils.bigquery_query(query)
+    assert len(list(bad_records)) > 0
diff --git a/dataflow/extensible-templates/noxfile_config.py b/dataflow/extensible-templates/noxfile_config.py
new file mode 100644
index 000000000000..d9fec8aa43af
--- /dev/null
+++ b/dataflow/extensible-templates/noxfile_config.py
@@ -0,0 +1,42 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Default TEST_CONFIG_OVERRIDE for python repos.
+
+# You can copy this file into your directory, then it will be imported from
+# the noxfile.py.
+
+# The source of truth:
+# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py
+
+TEST_CONFIG_OVERRIDE = {
+    # You can opt out from the test for specific Python versions.
+    "ignored_versions": ["2.7", "3.6", "3.7", "3.8", "3.10"],
+    # Old samples are opted out of enforcing Python type hints
+    # All new samples should feature them
+    "enforce_type_hints": True,
+    # An envvar key for determining the project id to use. Change it
+    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
+    # build specific Cloud project. You can also use your own string
+    # to use your own Cloud project.
+    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+    # If you need to use a specific version of pip,
+    # change pip_version_override to the string representation
+    # of the version number, for example, "20.2.4"
+    "pip_version_override": None,
+    # A dictionary you want to inject into your test. Don't put any
+    # secrets here. These values will override predefined values.
+    # "envs": {},
+}
diff --git a/dataflow/extensible-templates/requirements-test.txt b/dataflow/extensible-templates/requirements-test.txt
new file mode 100644
index 000000000000..6b67fee9b84b
--- /dev/null
+++ b/dataflow/extensible-templates/requirements-test.txt
@@ -0,0 +1,6 @@
+google-api-python-client==2.47.0
+google-cloud-bigquery==3.3.2
+google-cloud-storage==2.1.0
+pytest-xdist==2.5.0
+pytest==7.0.1
+pyyaml==6.0
\ No newline at end of file
diff --git a/dataflow/extensible-templates/requirements.txt b/dataflow/extensible-templates/requirements.txt
new file mode 100644
index 000000000000..69fcb8035bf2
--- /dev/null
+++ b/dataflow/extensible-templates/requirements.txt
@@ -0,0 +1 @@
+apache-beam[gcp]==2.41.0
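
Note (not part of the patch): for reference, the sketch below is a rough standalone illustration of the `gcloud dataflow jobs run` call that the new Utils.dataflow_extensible_template_run helper assembles for the e2e test; the project, region, bucket, topic, dataset, and table values are placeholders, not values taken from the patch.

# Rough sketch only: mirrors the command built by Utils.dataflow_extensible_template_run.
# All resource names below are placeholders for illustration.
import subprocess

cmd = [
    "gcloud", "dataflow", "jobs", "run", "extensible-template-demo",
    "--gcs-location=gs://dataflow-templates/latest/PubSub_to_BigQuery",
    "--project=my-project",
    "--region=us-central1",
    "--staging-location=gs://my-bucket/staging",
    "--parameters=inputTopic=projects/my-project/topics/my-topic",
    "--parameters=outputTableSpec=my-project:my_dataset.ratings",
    "--parameters=javascriptTextTransformGcsPath=gs://my-bucket/js/dataflow_udf_transform.js",
    "--parameters=javascriptTextTransformFunctionName=process",
]
# gcloud prints the created job description as YAML; the helper parses its "id" field
# and later cancels that job in the fixture teardown.
print(subprocess.check_output(cmd).decode("utf-8"))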