
Commit

dataflow: extensible template with javascript udf (GoogleCloudPlatform#8291)

* dataflow: extensible template with javascript udf

* address dandhlee's comments

* address david's review comments

* fix kwargs
anguillanneuf authored Aug 30, 2022
1 parent ad02dcc commit 0f79dfe
Showing 6 changed files with 290 additions and 11 deletions.
86 changes: 75 additions & 11 deletions dataflow/conftest.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 from dataclasses import dataclass
+from google.api_core.exceptions import NotFound
 import itertools
 import json
 import logging
@@ -91,23 +92,44 @@ def storage_bucket(name: str) -> str:
logging.info(f"Deleted storage_bucket: {bucket.name}")

@staticmethod
def bigquery_dataset(name: str, project: str = PROJECT) -> str:
def bigquery_dataset(
name: str,
project: str = PROJECT,
location: str = REGION,
) -> str:
from google.cloud import bigquery

bigquery_client = bigquery.Client()

dataset_name = Utils.underscore_name(name)
dataset = bigquery_client.create_dataset(
bigquery.Dataset(f"{project}.{dataset_name}")
)
dataset = bigquery.Dataset(f"{project}.{dataset_name}")
dataset.location = location
result = bigquery_client.create_dataset(dataset)

logging.info(f"Created bigquery_dataset: {result.full_dataset_id}")
yield result.dataset_id

logging.info(f"Created bigquery_dataset: {dataset.full_dataset_id}")
yield dataset_name
try:
bigquery_client.delete_dataset(
f"{project}.{dataset_name}", delete_contents=True
)
logging.info(f"Deleted bigquery_dataset: {result.full_dataset_id}")
except NotFound:
logging.info(f"{result.full_dataset_id} already deleted.")

bigquery_client.delete_dataset(
f"{project}.{dataset_name}", delete_contents=True
@staticmethod
def bigquery_table(
dataset_name: str, table_name: str, project: str = PROJECT, **kwargs
) -> str:
from google.cloud import bigquery
bigquery_client = bigquery.Client()
table = bigquery.Table(
f"{project}.{dataset_name}.{table_name}", **kwargs
)
logging.info(f"Deleted bigquery_dataset: {dataset.full_dataset_id}")
result = bigquery_client.create_table(table)
logging.info(f"Created bigquery_table: {result.full_table_id}")
yield result.table_id
# This table will be deleted when the dataset is deleted.

@staticmethod
def bigquery_table_exists(
@@ -138,7 +160,7 @@ def pubsub_topic(name: str, project: str = PROJECT) -> str:

         publisher_client = pubsub.PublisherClient()
         topic_path = publisher_client.topic_path(project, Utils.hyphen_name(name))
-        topic = publisher_client.create_topic(topic_path)
+        topic = publisher_client.create_topic(request={"name": topic_path})
 
         logging.info(f"Created pubsub_topic: {topic.name}")
         yield topic.name
@@ -164,7 +186,9 @@ def pubsub_subscription(
         subscription_path = subscriber.subscription_path(
             project, Utils.hyphen_name(name)
         )
-        subscription = subscriber.create_subscription(subscription_path, topic_path)
+        subscription = subscriber.create_subscription(
+            request={"name": subscription_path, "topic": topic_path}
+        )
 
         logging.info(f"Created pubsub_subscription: {subscription.name}")
         yield subscription.name
@@ -525,6 +549,46 @@ def dataflow_flex_template_run(

         Utils.dataflow_jobs_cancel(job_id)
 
+    @staticmethod
+    def dataflow_extensible_template_run(
+        job_name: str,
+        template_path: str,
+        bucket_name: str,
+        parameters: Dict[str, str] = {},
+        project: str = PROJECT,
+        region: str = REGION,
+    ) -> str:
+        import yaml
+
+        unique_job_name = Utils.hyphen_name(job_name)
+        logging.info(f"dataflow_job_name: {unique_job_name}")
+        cmd = [
+            "gcloud",
+            "dataflow",
+            "jobs",
+            "run",
+            unique_job_name,
+            f"--gcs-location={template_path}",
+            f"--project={project}",
+            f"--region={region}",
+            f"--staging-location=gs://{bucket_name}/staging",
+        ] + [
+            f"--parameters={name}={value}"
+            for name, value in {
+                **parameters,
+            }.items()
+        ]
+        logging.info(cmd)
+
+        stdout = subprocess.check_output(cmd).decode("utf-8")
+        logging.info(f"Launched Dataflow Template job: {unique_job_name}")
+        job_id = yaml.safe_load(stdout)["id"]
+        logging.info(f"Dataflow Template job id: {job_id}")
+        logging.info(f">> {Utils.dataflow_job_url(job_id, project, region)}")
+        yield job_id
+
+        Utils.dataflow_jobs_cancel(job_id)
+
 
 @pytest.fixture(scope="session")
 def utils() -> Utils:
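The helpers above are written as generators, so tests consume them by wrapping them in pytest fixtures with `yield from`; the new e2e test further down does exactly this. A minimal sketch under that assumption (the fixture names and the "my-sample" resource name are illustrative, not part of the commit):

import pytest
from google.cloud import bigquery
from conftest import Utils


@pytest.fixture(scope="session")
def dataset(utils: Utils) -> str:
    # Illustrative fixture: creates a dataset in the default REGION,
    # yields its ID, and deletes it (tolerating NotFound) on teardown.
    yield from utils.bigquery_dataset("my-sample")


@pytest.fixture(scope="session")
def table(utils: Utils, dataset: str) -> str:
    # Extra keyword arguments (for example, schema=...) are forwarded to bigquery.Table.
    yield from utils.bigquery_table(
        dataset,
        "results",
        schema=[bigquery.SchemaField("url", "STRING", mode="REQUIRED")],
    )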
37 changes: 37 additions & 0 deletions dataflow/extensible-templates/dataflow_udf_transform.js
@@ -0,0 +1,37 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// [START dataflow_extensible_template_udf]
/**
 * User-defined function (UDF) to transform events
 * as part of a Dataflow template job.
 *
 * @param {string} inJson input Pub/Sub JSON message (stringified)
 */
function process(inJson) {
  // Nashorn engine is only ECMAScript 5.1 (ES5) compliant. Newer ES6
  // JavaScript keywords like `let` or `const` will cause syntax errors.
  var obj = JSON.parse(inJson);
  var includePubsubMessage = obj.data && obj.attributes;
  var data = includePubsubMessage ? obj.data : obj;

  if (!data.hasOwnProperty('url')) {
    throw new Error("No url found");
  } else if (data.url !== "https://beam.apache.org/") {
    throw new Error("Unrecognized url");
  }

  return JSON.stringify(obj);
}
// [END dataflow_extensible_template_udf]
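For a rough sense of what this UDF does to each record, here is an approximate Python mirror of the same checks. It is illustrative only: Dataflow actually invokes the JavaScript `process` function above on each message payload string, and the name `process_like_udf` is not part of the commit.

import json


def process_like_udf(in_json: str) -> str:
    # Approximate Python equivalent of dataflow_udf_transform.js:
    # parse the payload, validate its "url" field, and return it unchanged.
    obj = json.loads(in_json)
    data = obj["data"] if ("data" in obj and "attributes" in obj) else obj
    if "url" not in data:
        raise ValueError("No url found")
    if data["url"] != "https://beam.apache.org/":
        raise ValueError("Unrecognized url")
    return json.dumps(obj)


# Passes through unchanged:
process_like_udf('{"url": "https://beam.apache.org/", "review": "positive"}')
# Raises, which is how the template decides to route a message to the error records table:
# process_like_udf('{"url": "https://kafka.apache.org/", "review": "negative"}')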
129 changes: 129 additions & 0 deletions dataflow/extensible-templates/e2e_test.py
@@ -0,0 +1,129 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

import json
import os
import time

from google.cloud import bigquery
from google.cloud import pubsub
from google.cloud import storage

try:
    # `conftest` cannot be imported when running in `nox`, but we still
    # try to import it for the autocomplete when writing the tests.
    from conftest import Utils
except ModuleNotFoundError:
    Utils = None
import pytest

PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]

NAME = "dataflow/extensible-template"

BQ_TABLE = "ratings"


@pytest.fixture(scope="session")
def bucket_name(utils: Utils) -> str:
    yield from utils.storage_bucket(NAME)


@pytest.fixture(scope="session")
def pubsub_topic(utils: Utils) -> str:
    yield from utils.pubsub_topic(NAME)


@pytest.fixture(scope="session")
def bq_dataset(utils: Utils) -> str:
    yield from utils.bigquery_dataset(NAME)


@pytest.fixture(scope="session")
def bq_table(utils: Utils, bq_dataset: str) -> str:
    yield from utils.bigquery_table(
        bq_dataset,
        BQ_TABLE,
        schema=[
            bigquery.SchemaField("url", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("review", "STRING", mode="REQUIRED"),
        ],
    )


@pytest.fixture(scope="session")
def dataflow_job_id(
    utils: Utils,
    bucket_name: str,
    pubsub_topic: str,
    bq_dataset: str,
    bq_table: str,
) -> str:
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob("js/dataflow_udf_transform.js")
    blob.upload_from_filename(os.path.join(os.getcwd(), "dataflow_udf_transform.js"))
    output_table = f"{PROJECT}:{bq_dataset}.{bq_table}"
    yield from utils.dataflow_extensible_template_run(
        job_name=NAME,
        template_path="gs://dataflow-templates/latest/PubSub_to_BigQuery",
        bucket_name=bucket_name,
        parameters={
            "inputTopic": pubsub_topic,
            "outputTableSpec": output_table,
            "javascriptTextTransformGcsPath": f"gs://{bucket_name}/js/dataflow_udf_transform.js",
            "javascriptTextTransformFunctionName": "process",
        },
    )


def test_extensible_template(
    utils: Utils,
    pubsub_topic: str,
    dataflow_job_id: str,
    bq_dataset: str,
    bq_table: str,
) -> None:
    publisher_client = pubsub.PublisherClient()
    for i in range(30):
        good_msg = json.dumps(
            {
                "url": "https://beam.apache.org/",
                "review": "positive" if i % 2 == 0 else "negative",
            }
        )
        publisher_client.publish(pubsub_topic, good_msg.encode("utf-8"))
        bad_msg = json.dumps(
            {
                "url": "https://kafka.apache.org/",
                "review": "positive" if i % 2 == 0 else "negative",
            }
        )
        publisher_client.publish(pubsub_topic, bad_msg.encode("utf-8"))
        time.sleep(10)

    # Wait until the Dataflow job starts running successfully.
    # The job is cancelled as part of the teardown to avoid leaking resources.
    utils.dataflow_jobs_wait(
        dataflow_job_id,
        target_states={"JOB_STATE_RUNNING"},
        timeout_sec=300,
        poll_interval_sec=30,
    )

    query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}`"
    good_records = utils.bigquery_query(query)
    assert len(list(good_records)) > 0

    query = f"SELECT * FROM `{PROJECT}.{bq_dataset}.{bq_table}_error_records`"
    bad_records = utils.bigquery_query(query)
    assert len(list(bad_records)) > 0
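The `_error_records` suffix in the last query is not created by the test itself: when the `outputDeadletterTable` parameter is left unset, the Pub/Sub-to-BigQuery template is documented to write messages that fail the UDF (or fail to insert) into a table named after `outputTableSpec`. A small sketch of the table names the test relies on, assuming that default behavior:

# Assumes the template's default dead-letter naming (no outputDeadletterTable set).
output_table = f"{PROJECT}:{bq_dataset}.{bq_table}"               # passed as outputTableSpec above
error_table = f"{PROJECT}.{bq_dataset}.{bq_table}_error_records"  # queried for the rejected messages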
42 changes: 42 additions & 0 deletions dataflow/extensible-templates/noxfile_config.py
@@ -0,0 +1,42 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Default TEST_CONFIG_OVERRIDE for python repos.

# You can copy this file into your directory, then it will be imported from
# the noxfile.py.

# The source of truth:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py

TEST_CONFIG_OVERRIDE = {
    # You can opt out from the test for specific Python versions.
    "ignored_versions": ["2.7", "3.6", "3.7", "3.8", "3.10"],
    # Old samples are opted out of enforcing Python type hints
    # All new samples should feature them
    "enforce_type_hints": True,
    # An envvar key for determining the project id to use. Change it
    # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
    # build specific Cloud project. You can also use your own string
    # to use your own Cloud project.
    "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
    # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
    # If you need to use a specific version of pip,
    # change pip_version_override to the string representation
    # of the version number, for example, "20.2.4"
    "pip_version_override": None,
    # A dictionary you want to inject into your test. Don't put any
    # secrets here. These values will override predefined values.
    # "envs": {},
}
6 changes: 6 additions & 0 deletions dataflow/extensible-templates/requirements-test.txt
@@ -0,0 +1,6 @@
google-api-python-client==2.47.0
google-cloud-bigquery==3.3.2
google-cloud-storage==2.1.0
pytest-xdist==2.5.0
pytest==7.0.1
pyyaml==6.0
1 change: 1 addition & 0 deletions dataflow/extensible-templates/requirements.txt
@@ -0,0 +1 @@
apache-beam[gcp]==2.41.0

