From 92770455bf85d6949a727473773e14896d8b3cfb Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 11 Dec 2017 16:42:06 -0800 Subject: [PATCH] Add comments and region tags to Cloud Tasks samples --- appengine/flexible/tasks/README.md | 4 +-- .../tasks/create_app_engine_queue_task.py | 33 ++++++++++++------- appengine/flexible/tasks/main.py | 2 ++ tasks/pull_queue_snippets.py | 19 ++++++++++- 4 files changed, 43 insertions(+), 15 deletions(-) diff --git a/appengine/flexible/tasks/README.md b/appengine/flexible/tasks/README.md index 60f4b69e3d31..ffab1069bd47 100644 --- a/appengine/flexible/tasks/README.md +++ b/appengine/flexible/tasks/README.md @@ -12,8 +12,8 @@ App Engine queues push tasks to an App Engine HTTP target. This directory contains both the App Engine app to deploy, as well as the snippets to run locally to push tasks to it, which could also be called on App Engine. -`app_engine_queue_snippets.py` is a simple command-line program to create tasks -to be pushed to the App Engine app. +`create_app_engine_queue_task.py` is a simple command-line program to create +tasks to be pushed to the App Engine app. `main.py` is the main App Engine app. This app serves as an endpoint to receive App Engine task attempts. diff --git a/appengine/flexible/tasks/create_app_engine_queue_task.py b/appengine/flexible/tasks/create_app_engine_queue_task.py index 66cac1ffb56a..150ec60a5398 100644 --- a/appengine/flexible/tasks/create_app_engine_queue_task.py +++ b/appengine/flexible/tasks/create_app_engine_queue_task.py @@ -20,12 +20,7 @@ import json -def seconds_from_now_to_rfc3339_datetime(seconds): - """Return an RFC 3339 datetime string for a number of seconds from now.""" - d = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds) - return d.isoformat('T') + 'Z' - - +# [START cloud_tasks_appengine_create_task] def create_task(project, queue, location, payload=None, in_seconds=None): """Create a task for a given queue with an arbitrary payload.""" @@ -34,10 +29,11 @@ def create_task(project, queue, location, payload=None, in_seconds=None): # Create a client. client = googleapiclient.discovery.build('cloudtasks', 'v2beta2') + # Construct the request body. url = '/log_payload' body = { 'task': { - 'app_engine_http_request': { + 'app_engine_http_request': { # Specify the type of request. 'http_method': 'POST', 'relative_url': url } @@ -45,25 +41,38 @@ def create_task(project, queue, location, payload=None, in_seconds=None): } if payload is not None: - # Payload is a string (unicode), and must be encoded for base64. - # The finished request body is JSON, which requires unicode. - body['task']['app_engine_http_request']['payload'] = base64.b64encode( - payload.encode()).decode() + # The API expects base64 encoding of the payload, so encode the unicode + # `payload` object into a byte string and base64 encode it. + base64_encoded_payload = base64.b64encode(payload.encode()) + + # The request body object will be emitted in JSON, which requires + # unicode objects, so convert the byte string to unicode, still base64. + converted_payload = base64_encoded_payload.decode() + + # Add the payload to the request. + body['task']['app_engine_http_request']['payload'] = converted_payload if in_seconds is not None: - scheduled_time = seconds_from_now_to_rfc3339_datetime(in_seconds) + # Convert "seconds from now" into an rfc3339 datetime string. + d = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds) + scheduled_time = d.isoformat('T') + 'Z' + + # Add the rfc3339 datetime string to the request. body['task']['schedule_time'] = scheduled_time + # Construct the fully qualified queue name. queue_name = 'projects/{}/locations/{}/queues/{}'.format( project, location, queue) print('Sending task {}'.format(json.dumps(body))) + # Use the client to build and send the task. response = client.projects().locations().queues().tasks().create( parent=queue_name, body=body).execute() print('Created task {}'.format(response['name'])) return response +# [END cloud_tasks_appengine_create_task] if __name__ == '__main__': diff --git a/appengine/flexible/tasks/main.py b/appengine/flexible/tasks/main.py index 0bcf6fe0645c..174e8a3f7735 100644 --- a/appengine/flexible/tasks/main.py +++ b/appengine/flexible/tasks/main.py @@ -14,6 +14,7 @@ """App Engine app to serve as an endpoint for App Engine queue samples.""" +# [START cloud_tasks_appengine_quickstart] from flask import Flask, request app = Flask(__name__) @@ -25,6 +26,7 @@ def log_payload(): payload = request.get_data(as_text=True) or '(empty payload)' print('Received task with payload: {}'.format(payload)) return 'Printed task payload: {}'.format(payload) +# [END cloud_tasks_appengine_quickstart] @app.route('/') diff --git a/tasks/pull_queue_snippets.py b/tasks/pull_queue_snippets.py index 803849b24f89..c2c0c50b4069 100644 --- a/tasks/pull_queue_snippets.py +++ b/tasks/pull_queue_snippets.py @@ -24,6 +24,7 @@ import base64 +# [START cloud_tasks_create_task] def create_task(project, queue, location): """Create a task for a given queue with an arbitrary payload.""" @@ -32,25 +33,40 @@ def create_task(project, queue, location): # Create a client. client = googleapiclient.discovery.build('cloudtasks', 'v2beta2') + # Prepare the payload. payload = 'a message for the recipient' + + # The API expects base64 encoding of the payload, so encode the unicode + # `payload` object into a byte string and base64 encode it. + base64_encoded_payload = base64.b64encode(payload.encode()) + + # The request body object will be emitted in JSON, which requires + # unicode objects, so convert the byte string to unicode (still base64). + converted_payload = base64_encoded_payload.decode() + + # Construct the request body. task = { 'task': { 'pull_message': { - 'payload': base64.b64encode(payload.encode()).decode() + 'payload': converted_payload } } } + # Construct the fully qualified queue name. queue_name = 'projects/{}/locations/{}/queues/{}'.format( project, location, queue) + # Use the client to build and send the task. response = client.projects().locations().queues().tasks().create( parent=queue_name, body=task).execute() print('Created task {}'.format(response['name'])) return response +# [END cloud_tasks_create_task] +# [START cloud_tasks_pull_task] def pull_task(project, queue, location): """Pull a single task from a given queue and lease it for 10 minutes.""" @@ -74,6 +90,7 @@ def pull_task(project, queue, location): print('Pulled task {}'.format(response)) return response['tasks'][0] +# [END cloud_tasks_pull_task] def acknowledge_task(task):