-
Notifications
You must be signed in to change notification settings - Fork 6.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
dataflow: extensible template with javascript udf (#8291)
* dataflow: extensible template with javascript udf * address dandhlee's comments * address david's review comments * fix kwargs
- Loading branch information
1 parent
ad02dcc
commit 0f79dfe
Showing
6 changed files
with
290 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
// Copyright 2022 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// [START dataflow_extensible_template_udf] | ||
/** | ||
* User-defined function (UDF) to transform events | ||
* as part of a Dataflow template job. | ||
* | ||
* @param {string} inJson input Pub/Sub JSON message (stringified) | ||
*/ | ||
function process(inJson) { | ||
// Nashorn engine is only ECMAScript 5.1 (ES5) compliant. Newer ES6 | ||
// JavaScript keywords like `let` or `const` will cause syntax errors. | ||
var obj = JSON.parse(inJson); | ||
var includePubsubMessage = obj.data && obj.attributes; | ||
var data = includePubsubMessage ? obj.data : obj; | ||
|
||
if (!data.hasOwnProperty('url')) { | ||
throw new Error("No url found"); | ||
} else if (data.url !== "https://beam.apache.org/") { | ||
throw new Error("Unrecognized url"); | ||
} | ||
|
||
return JSON.stringify(obj); | ||
} | ||
// [END dataflow_extensible_template_udf] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
# Copyright 2022 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the 'License'); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an 'AS IS' BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
|
||
import json | ||
import os | ||
import time | ||
|
||
from google.cloud import bigquery | ||
from google.cloud import pubsub | ||
from google.cloud import storage | ||
|
||
try: | ||
# `conftest` cannot be imported when running in `nox`, but we still | ||
# try to import it for the autocomplete when writing the tests. | ||
from conftest import Utils | ||
except ModuleNotFoundError: | ||
Utils = None | ||
import pytest | ||
|
||
PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] | ||
|
||
NAME = "dataflow/extensible-template" | ||
|
||
BQ_TABLE = "ratings" | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def bucket_name(utils: Utils) -> str: | ||
yield from utils.storage_bucket(NAME) | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def pubsub_topic(utils: Utils) -> str: | ||
yield from utils.pubsub_topic(NAME) | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def bq_dataset(utils: Utils) -> str: | ||
yield from utils.bigquery_dataset(NAME) | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def bq_table(utils: Utils, bq_dataset: str) -> str: | ||
yield from utils.bigquery_table( | ||
bq_dataset, | ||
BQ_TABLE, | ||
schema=[ | ||
bigquery.SchemaField("url", "STRING", mode="REQUIRED"), | ||
bigquery.SchemaField("review", "STRING", mode="REQUIRED"), | ||
], | ||
) | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def dataflow_job_id( | ||
utils: Utils, | ||
bucket_name: str, | ||
pubsub_topic: str, | ||
bq_dataset: str, | ||
bq_table: str, | ||
) -> str: | ||
storage_client = storage.Client() | ||
bucket = storage_client.bucket(bucket_name) | ||
blob = bucket.blob("js/dataflow_udf_transform.js") | ||
blob.upload_from_filename(os.path.join(os.getcwd(), "dataflow_udf_transform.js")) | ||
output_table = f"{PROJECT}:{bq_dataset}.{bq_table}" | ||
yield from utils.dataflow_extensible_template_run( | ||
job_name=NAME, | ||
template_path="gs://dataflow-templates/latest/PubSub_to_BigQuery", | ||
bucket_name=bucket_name, | ||
parameters={ | ||
"inputTopic": pubsub_topic, | ||
"outputTableSpec": output_table, | ||
"javascriptTextTransformGcsPath": f"gs://{bucket_name}/js/dataflow_udf_transform.js", | ||
"javascriptTextTransformFunctionName": "process", | ||
}, | ||
) | ||
|
||
|
||
def test_extensible_template( | ||
utils: Utils, | ||
pubsub_topic: str, | ||
dataflow_job_id: str, | ||
bq_dataset: str, | ||
bq_table: str, | ||
) -> None: | ||
publisher_client = pubsub.PublisherClient() | ||
for i in range(30): | ||
good_msg = json.dumps( | ||
{ | ||
"url": "https://beam.apache.org/", | ||
"review": "positive" if i % 2 == 0 else "negative", | ||
} | ||
) | ||
publisher_client.publish(pubsub_topic, good_msg.encode("utf-8")) | ||
bad_msg = json.dumps( | ||
{ | ||
"url": "https://kafka.apache.org/", | ||
"review": "positive" if i % 2 == 0 else "negative", | ||
} | ||
) | ||
publisher_client.publish(pubsub_topic, bad_msg.encode("utf-8")) | ||
time.sleep(10) | ||
|
||
# Wait until the dataflow job starts running successfully. | ||
# The job is cancelled as part of the teardown to avoid leaking resource. | ||
utils.dataflow_jobs_wait( | ||
dataflow_job_id, | ||
target_states={"JOB_STATE_RUNNING"}, | ||
timeout_sec=300, | ||
poll_interval_sec=30, | ||
) | ||
|
||
query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}`" | ||
good_records = utils.bigquery_query(query) | ||
assert len(list(good_records)) > 0 | ||
|
||
query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}_error_records`" | ||
bad_records = utils.bigquery_query(query) | ||
assert len(list(bad_records)) > 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Copyright 2022 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# Default TEST_CONFIG_OVERRIDE for python repos. | ||
|
||
# You can copy this file into your directory, then it will be inported from | ||
# the noxfile.py. | ||
|
||
# The source of truth: | ||
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py | ||
|
||
TEST_CONFIG_OVERRIDE = { | ||
# You can opt out from the test for specific Python versions. | ||
"ignored_versions": ["2.7", "3.6", "3.7", "3.8", "3.10"], | ||
# Old samples are opted out of enforcing Python type hints | ||
# All new samples should feature them | ||
"enforce_type_hints": True, | ||
# An envvar key for determining the project id to use. Change it | ||
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a | ||
# build specific Cloud project. You can also use your own string | ||
# to use your own Cloud project. | ||
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT", | ||
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', | ||
# If you need to use a specific version of pip, | ||
# change pip_version_override to the string representation | ||
# of the version number, for example, "20.2.4" | ||
"pip_version_override": None, | ||
# A dictionary you want to inject into your test. Don't put any | ||
# secrets here. These values will override predefined values. | ||
# "envs": {}, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
google-api-python-client==2.47.0 | ||
google-cloud-bigquery==3.3.2 | ||
google-cloud-storage==2.1.0 | ||
pytest-xdist==2.5.0 | ||
pytest==7.0.1 | ||
pyyaml==6.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
apache-beam[gcp]==2.41.0 |