From 5fc5b30073d63a7848863515180456216dd63d15 Mon Sep 17 00:00:00 2001 From: ansh0l Date: Wed, 29 Jun 2022 11:57:03 +0530 Subject: [PATCH 1/6] chore: correct skip backup tests env variable (#753) --- tests/system/_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/_helpers.py b/tests/system/_helpers.py index 0cb00b15ff..51a6d773c4 100644 --- a/tests/system/_helpers.py +++ b/tests/system/_helpers.py @@ -31,7 +31,7 @@ INSTANCE_ID = os.environ.get(INSTANCE_ID_ENVVAR, INSTANCE_ID_DEFAULT) SKIP_BACKUP_TESTS_ENVVAR = "SKIP_BACKUP_TESTS" -SKIP_BACKUP_TESTS = True # os.getenv(SKIP_BACKUP_TESTS_ENVVAR) == True +SKIP_BACKUP_TESTS = os.getenv(SKIP_BACKUP_TESTS_ENVVAR) is not None INSTANCE_OPERATION_TIMEOUT_IN_SECONDS = int( os.getenv("SPANNER_INSTANCE_OPERATION_TIMEOUT_IN_SECONDS", 560) From 7f22a94945c2e15bdc80af21454443f812fc8e0d Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Wed, 29 Jun 2022 14:53:19 +0530 Subject: [PATCH 2/6] async_client:add file for sample asynchronous queries of async client --- samples/samples/async_client_sample.py | 648 +++++++++++++++++++++++++ 1 file changed, 648 insertions(+) create mode 100644 samples/samples/async_client_sample.py diff --git a/samples/samples/async_client_sample.py b/samples/samples/async_client_sample.py new file mode 100644 index 0000000000..a74e9124bd --- /dev/null +++ b/samples/samples/async_client_sample.py @@ -0,0 +1,648 @@ +# -*- coding: utf-8 -*- +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This application demonstrates how to do basic asynchronous operations using Cloud Spanner. +""" + +from asyncore import readwrite +from curses import meta +from email.message import Message +from importlib.metadata import metadata +from inspect import trace +from google.cloud.spanner_v1.services import spanner +from google.cloud.spanner_v1 import Client +from google.cloud.spanner_v1 import session +from numpy import partition +from google.cloud.spanner_v1.types import CreateSessionRequest +from google.cloud.spanner_v1.types import GetSessionRequest +from google.cloud.spanner_v1.types import ListSessionsRequest +from google.cloud.spanner_v1.types import ExecuteSqlRequest +from google.cloud.spanner_v1.types import ReadRequest +from google.cloud.spanner_v1.types import ExecuteBatchDmlRequest +from google.cloud.spanner_v1.types import BeginTransactionRequest +from google.cloud.spanner_v1.types import TransactionOptions +from google.cloud.spanner_v1.types import CommitRequest +from google.cloud.spanner_v1.types import RollbackRequest +from google.cloud.spanner_v1.types import PartitionQueryRequest +from google.cloud.spanner_v1.types import PartitionReadRequest +from google.cloud.spanner_v1.types import TransactionSelector +from google.cloud.spanner_v1.types import BatchCreateSessionsRequest +from google.cloud.spanner_v1.types import DeleteSessionRequest +import asyncio +from google.cloud.spanner_v1 import keyset +from google.cloud.spanner_v1 import transaction +import time +import tracemalloc +import argparse + + +# [ FUNCTION for creating async client and spanner client object] +def getObjects(instance_id, database_id): + # Create async client object + async_client = spanner.SpannerAsyncClient() + + # Creates spanner client object + spanner_client = Client() + + # Creates the instance object associated with instance id + instance = spanner_client.instance(instance_id) + + # Creates the database object associated with instance object + database_object = instance.database(database_id) + + # Returns async client object and database object + return async_client, database_object + +# [START async client partition read] +async def partition_read(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Request object for Partition Read.""" + request = PartitionReadRequest( + session = session_object.name, + transaction = TransactionSelector( + begin = TransactionOptions( + read_only = TransactionOptions.ReadOnly( + strong = True, + ), + ), + ), + table = 'Singers', + key_set = keyset.KeySet(all_=True)._to_pb(), + ) + + """Execute Partition Read.""" + response = await async_client.partition_read(request = request) + + """Prints the response.""" + print(response) + + print("\nEnding task: ", task_num) +# [END async client partition read] + +# [START async client partition query] +async def partition_query(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Request object for Partition Query.""" + request = PartitionQueryRequest( + session = session_object.name, + transaction = TransactionSelector( + begin = TransactionOptions( + read_only = TransactionOptions.ReadOnly( + strong = True, + ), + ), + ), + sql = "UPDATE Singers SET FirstName = 'Trivedi' WHERE SingerId = 1003", + ) + + """Partition the sql statement query.""" + response = await async_client.partition_query(request = request) + + """Prints the response.""" + print(response) + + print("\nEnding task: ", task_num) +# [END async client partition query] + +# [START async client rollback] +async def rollback(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Transaction object associated with the session object""" + transaction_object = transaction.Transaction(session_object) + + """Begin Transaction""" + transaction_id_blob = transaction_object.begin() + + """Request object for Rollback.""" + request = RollbackRequest( + session = session_object.name, + transaction_id = transaction_id_blob, + ) + + """Rolled back the Transaction.""" + await async_client.rollback(request = request) + + print("\nEnding task: ", task_num) +# [END async client rollback] + +# [START async client commit] +async def commit(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Transaction object associated with the session object""" + transaction_object = transaction.Transaction(session_object) + + """Begin Transaction""" + transaction_id_blob = transaction_object.begin() + + """Request object for Commit.""" + request = CommitRequest( + transaction_id = transaction_id_blob, + session = session_object.name + ) + + """Commit the transaction""" + response = await async_client.commit(request = request) + + """Prints the commit transaction response.""" + print(response) + + print("\nEnding task: ", task_num) +# [END async client commit] + +# [START async client begin transaction] +async def begin_transaction(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Read write transaction object.""" + transactionOption_object = TransactionOptions( + read_write = TransactionOptions.ReadWrite() + ) + + """Request object for begin transaction.""" + request = BeginTransactionRequest( + session = session_object.name, + options = transactionOption_object, + ) + + """Begins Transaction.""" + response = await async_client.begin_transaction(request = request) + + """Prints the response.""" + print(response) + + print("\nEnding task: ", task_num) + print("\n") +# [END async client begin transaction] + +# [START async client streaming read] +async def streaming_read(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Request object for read.""" + request = ReadRequest( + session = session_object.name, + table = 'Singers', + columns = ["SingerId", "FirstName", "LastName"], + key_set = keyset.KeySet(all_=True)._to_pb(), + ) + + """Executes streaming read and get the response in stream""" + stream = await async_client.streaming_read(request = request) + + """Prints the stream response.""" + async for response in stream: + print(response) + + print("\nEnding task: ", task_num) +# [END async client streaming read] + +# [START async client execute batch dml] +async def execute_batch_dml(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + statements = ExecuteBatchDmlRequest.Statement( + sql = "INSERT INTO Singers (SingerId, FirstName, LastName) Values (1003, 'Alka', 'Trivedi')", + ), + + """Request object for execute batch dml.""" + request = ExecuteBatchDmlRequest( + session = session_object.name, + transaction = TransactionSelector( + begin = TransactionOptions( + read_write = TransactionOptions.ReadWrite(), + ), + ), + statements = statements, + seqno=550, + ) + + """Executes batch dml""" + response = await async_client.execute_batch_dml(request = request) + + """Prints the response.""" + print(response) + + print("\nEnding task: ", task_num) +# [END async client execute batch dml] + +# [START async client execute streaming sql] +async def execute_streaming_sql(database_object, async_client, task_num): + + print("\nstarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Request object for execute sql.""" + request = ExecuteSqlRequest( + session = session_object.name, + sql = "SELECT SingerId, FirstName, LastName FROM Singers", + ) + + """Execute the sql statement and get the response in a stream""" + stream = await async_client.execute_streaming_sql(request = request) + + """Prints the streaming response.""" + async for response in stream: + print(response) + + print("\nEnding task: ", task_num) +# [END async client execute streaming sql] + +# [START async client read] +async def read(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + + """Request object for read.""" + request = ReadRequest( + session = session_object.name, + table = 'Singers', + columns = ["SingerId", "FirstName", "LastName"], + key_set = keyset.KeySet(all_=True)._to_pb(), + ) + + """Reads the columns data from the table""" + response = await async_client.read(request = request) + + """Prints the response.""" + print(response) + + print("\nEnding task: ", task_num) +# [END async client read] + + +# [START async client execute_sql] +async def execute_sql(database_object, async_client, task_num): + print("\nstarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Request object for execute sql.""" + request = ExecuteSqlRequest( + session = session_object.name, + sql = "SELECT SingerId, FirstName, LastName FROM Singers", + ) + + """Execute the sql statement.""" + response = await async_client.execute_sql(request = request) + + """Prints the response.""" + print(response) + + print("\nEnding task: ", task_num) + +# [END async client execute_sql] + + +# [START async client delete_session] +async def delete_session(session_name, async_client, task_num): + + print("\nStarting task: ", task_num) + + """Request object for delete session.""" + request = DeleteSessionRequest( + name=session_name, + ) + + """Delete the session.""" + await async_client.delete_session(request = request) + + print("\nEnding task: ", task_num) + +# [END async client delete_session] + +# [START async client list_sessions] +async def list_sessions(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + """Request object for list session.""" + request = ListSessionsRequest( + database = database_object.name, + ) + + """Get the list of sessions.""" + response = await async_client.list_sessions(request = request) + + """Prints the session.""" + print(response) + + print("\nEnding task: ", task_num) + +# [END async client get_session] + + +# [START async client get_session] +async def get_session(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + # Create the session object associated with the database object + session_object = session.Session(database_object) + + """Create the session.""" + session_object.create() + + """Request object for get session.""" + request = GetSessionRequest( + name = session_object.name, + ) + + """Get the session.""" + response = await async_client.get_session(request = request) + + """Prints the session got.""" + print(response) + + print("\nEnding task: ", task_num) + +# [END async client get_session] + + +# [START async client batch_create_session] +async def batch_create_session(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + """Request object for batch create session.""" + request = BatchCreateSessionsRequest( + database = database_object.name, + session_count=1420, + ) + + """Creates batch sessions.""" + response = await async_client.batch_create_sessions(request = request) + + """Prints the sessions batchwise.""" + print(response) + + print("\nEnding task: ", task_num) + +# [END async client batch_create_session] + + +# [START async client create_session] +async def create_session(database_object, async_client, task_num): + + print("\nStarting task: ", task_num) + + """Request object for create session.""" + request = CreateSessionRequest( + database = database_object.name, + ) + + """Creates a session.""" + response = await async_client.create_session(request = request) + + """Prints the session.""" + print(response) + + print("\nEnding task: ", task_num) + +# [END async client create_session] + + +async def main(): + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument("--instance_id", help="Your Cloud Spanner instance ID.") + parser.add_argument( + "--database_id", help="Your Cloud Spanner database ID.", default="example_db" + ) + parser.add_argument( + "--session_name", help="session name." + ) + parser.add_argument( + "--n", help="number of queries." + ) + subparsers = parser.add_subparsers(dest="command") + subparsers.add_parser( + "create_session", + help=create_session.__doc__ + ) + subparsers.add_parser( + "batch_create_session", + help=batch_create_session.__doc__ + ) + subparsers.add_parser( + "get_session", + help=get_session.__doc__ + ) + subparsers.add_parser( + "list_sessions", + help=list_sessions.__doc__ + ) + subparsers.add_parser( + "delete_session", + help=delete_session.__doc__ + ) + subparsers.add_parser( + "execute_sql", + help=execute_sql.__doc__ + ) + subparsers.add_parser( + "execute_streaming_sql", + help=execute_streaming_sql.__doc__ + ) + subparsers.add_parser( + "execute_batch_dml", + help=execute_batch_dml.__doc__ + ) + subparsers.add_parser( + "read", + help=read.__doc__ + ) + subparsers.add_parser( + "streaming_read", + help=streaming_read.__doc__ + ) + subparsers.add_parser( + "begin_transaction", + help=begin_transaction.__doc__ + ) + subparsers.add_parser( + "commit", + help=commit.__doc__ + ) + subparsers.add_parser( + "rollback", + help=rollback.__doc__ + ) + subparsers.add_parser( + "partition_query", + help=partition_query.__doc__ + ) + subparsers.add_parser( + "partition_read", + help=partition_read.__doc__ + ) + args = parser.parse_args() + + async_client, database_object = getObjects(args.instance_id, args.database_id) + + if args.command == "create_session": + await asyncio.gather( + *[create_session(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "batch_create_session": + await asyncio.gather( + *[batch_create_session(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "get_session": + await asyncio.gather( + *[get_session(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "list_sessions": + await asyncio.gather( + *[list_sessions(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "delete_session": + await asyncio.gather( + *[delete_session(args.session_name, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "execute_sql": + await asyncio.gather( + *[execute_sql(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "execute_streaming_sql": + await asyncio.gather( + *[execute_streaming_sql(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "execute_batch_dml": + await asyncio.gather( + *[execute_batch_dml(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "read": + await asyncio.gather( + *[read(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "streaming_read": + await asyncio.gather( + *[streaming_read(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "begin_transaction": + await asyncio.gather( + *[begin_transaction(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "commit": + await asyncio.gather( + *[commit(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "rollback": + await asyncio.gather( + *[rollback(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "partition_query": + await asyncio.gather( + *[partition_query(database_object, async_client, task) for task in range(int(args.n))] + ) + elif args.command == "partition_read": + await asyncio.gather( + *[partition_read(database_object, async_client, task) for task in range(int(args.n))] + ) + + +if __name__ == "__main__": + + # Assigned start time of the execution to startTime + startTime = time.time() + + # Started keeping track of memory taken during execution + tracemalloc.start() + + # Called the main function + asyncio.run(main()) + + # Assigned end time of the execution to endTime + endTime = time.time() + + # Prints the total execution time + print('total time taken: ' + str(endTime-startTime) + ' seconds') + + # Prints the total memory taken + print('total memory taken(current, peak): ', tracemalloc.get_traced_memory()) + + # Stopped keeping track of memory taken during execution + tracemalloc.stop() \ No newline at end of file From 78492cfa39382a5d77d0ecfa9a4bb43198d5be14 Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Thu, 7 Jul 2022 15:31:33 +0530 Subject: [PATCH 3/6] modify imports and comments --- samples/samples/async_client_sample.py | 395 ++++++++++++------------- 1 file changed, 192 insertions(+), 203 deletions(-) diff --git a/samples/samples/async_client_sample.py b/samples/samples/async_client_sample.py index a74e9124bd..2e57ad375c 100644 --- a/samples/samples/async_client_sample.py +++ b/samples/samples/async_client_sample.py @@ -18,67 +18,63 @@ This application demonstrates how to do basic asynchronous operations using Cloud Spanner. """ -from asyncore import readwrite -from curses import meta -from email.message import Message -from importlib.metadata import metadata -from inspect import trace -from google.cloud.spanner_v1.services import spanner -from google.cloud.spanner_v1 import Client -from google.cloud.spanner_v1 import session -from numpy import partition -from google.cloud.spanner_v1.types import CreateSessionRequest -from google.cloud.spanner_v1.types import GetSessionRequest -from google.cloud.spanner_v1.types import ListSessionsRequest -from google.cloud.spanner_v1.types import ExecuteSqlRequest -from google.cloud.spanner_v1.types import ReadRequest -from google.cloud.spanner_v1.types import ExecuteBatchDmlRequest -from google.cloud.spanner_v1.types import BeginTransactionRequest -from google.cloud.spanner_v1.types import TransactionOptions -from google.cloud.spanner_v1.types import CommitRequest -from google.cloud.spanner_v1.types import RollbackRequest -from google.cloud.spanner_v1.types import PartitionQueryRequest -from google.cloud.spanner_v1.types import PartitionReadRequest -from google.cloud.spanner_v1.types import TransactionSelector -from google.cloud.spanner_v1.types import BatchCreateSessionsRequest -from google.cloud.spanner_v1.types import DeleteSessionRequest + import asyncio -from google.cloud.spanner_v1 import keyset -from google.cloud.spanner_v1 import transaction -import time +from google.cloud.spanner_v1 import ( + keyset, + transaction, + Client, + session +) +from google.cloud.spanner_v1.types import ( + CreateSessionRequest, + BatchCreateSessionsRequest, + GetSessionRequest, + ListSessionsRequest, + DeleteSessionRequest, + ExecuteSqlRequest, + ExecuteBatchDmlRequest, + ReadRequest, + TransactionOptions, + BeginTransactionRequest, + CommitRequest, + RollbackRequest, + TransactionSelector, + PartitionQueryRequest, + PartitionReadRequest +) +from google.cloud.spanner_v1.services import spanner import tracemalloc import argparse +import time # [ FUNCTION for creating async client and spanner client object] -def getObjects(instance_id, database_id): - # Create async client object +def get_async_client_and_associate_database(instance_id, database_id): + async_client = spanner.SpannerAsyncClient() - # Creates spanner client object spanner_client = Client() - # Creates the instance object associated with instance id instance = spanner_client.instance(instance_id) - # Creates the database object associated with instance object database_object = instance.database(database_id) - # Returns async client object and database object return async_client, database_object -# [START async client partition read] -async def partition_read(database_object, async_client, task_num): - print("\nStarting task: ", task_num) +# [START async_client_partition_read] +async def partition_read(database_object, async_client, task_id): + + """Creates a set of partition tokens that can be + used to execute a read operation in parallel.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Request object for Partition Read.""" request = PartitionReadRequest( session = session_object.name, transaction = TransactionSelector( @@ -92,27 +88,27 @@ async def partition_read(database_object, async_client, task_num): key_set = keyset.KeySet(all_=True)._to_pb(), ) - """Execute Partition Read.""" response = await async_client.partition_read(request = request) - """Prints the response.""" print(response) - print("\nEnding task: ", task_num) -# [END async client partition read] + print("\nEnding task: ", task_id) + +# [END async_client_partition_read] -# [START async client partition query] -async def partition_query(database_object, async_client, task_num): - print("\nStarting task: ", task_num) +# [START async_client_partition_query] +async def partition_query(database_object, async_client, task_id): + + """Creates a set of partition tokens that can be + used to execute a query operation in parallel.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Request object for Partition Query.""" request = PartitionQueryRequest( session = session_object.name, transaction = TransactionSelector( @@ -122,123 +118,117 @@ async def partition_query(database_object, async_client, task_num): ), ), ), - sql = "UPDATE Singers SET FirstName = 'Trivedi' WHERE SingerId = 1003", + sql = "UPDATE Singers SET FirstName = 'William' WHERE SingerId = 1003", ) - """Partition the sql statement query.""" response = await async_client.partition_query(request = request) - """Prints the response.""" print(response) - print("\nEnding task: ", task_num) -# [END async client partition query] + print("\nEnding task: ", task_id) + +# [END async_client_partition_query] -# [START async client rollback] -async def rollback(database_object, async_client, task_num): - print("\nStarting task: ", task_num) +# [START async_client_rollback] +async def rollback(database_object, async_client, task_id): + + """Rollbacks a transaction, releases any locks it holds.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Transaction object associated with the session object""" transaction_object = transaction.Transaction(session_object) - """Begin Transaction""" transaction_id_blob = transaction_object.begin() - """Request object for Rollback.""" request = RollbackRequest( session = session_object.name, transaction_id = transaction_id_blob, ) - """Rolled back the Transaction.""" await async_client.rollback(request = request) - print("\nEnding task: ", task_num) -# [END async client rollback] + print("\nEnding task: ", task_id) + +# [END async_client_rollback] -# [START async client commit] -async def commit(database_object, async_client, task_num): - print("\nStarting task: ", task_num) +# [START async_client_commit] +async def commit(database_object, async_client, task_id): + + """Commits a transaction. The request includes the mutations to be + applied to rows in the database.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Transaction object associated with the session object""" transaction_object = transaction.Transaction(session_object) - """Begin Transaction""" transaction_id_blob = transaction_object.begin() - """Request object for Commit.""" request = CommitRequest( transaction_id = transaction_id_blob, session = session_object.name ) - """Commit the transaction""" response = await async_client.commit(request = request) - """Prints the commit transaction response.""" print(response) - print("\nEnding task: ", task_num) -# [END async client commit] + print("\nEnding task: ", task_id) + +# [END async_client_commit] + -# [START async client begin transaction] -async def begin_transaction(database_object, async_client, task_num): +# [START async_client_begin_transaction] +async def begin_transaction(database_object, async_client, task_id): - print("\nStarting task: ", task_num) + """Begins a new transaction.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Read write transaction object.""" - transactionOption_object = TransactionOptions( + txn_options = TransactionOptions( read_write = TransactionOptions.ReadWrite() ) - """Request object for begin transaction.""" request = BeginTransactionRequest( session = session_object.name, - options = transactionOption_object, + options = txn_options, ) - """Begins Transaction.""" response = await async_client.begin_transaction(request = request) - """Prints the response.""" print(response) - print("\nEnding task: ", task_num) + print("\nEnding task: ", task_id) print("\n") -# [END async client begin transaction] -# [START async client streaming read] -async def streaming_read(database_object, async_client, task_num): +# [END async_client_begin_transaction] + + +# [START async_client_streaming_read] +async def streaming_read(database_object, async_client, task_id): - print("\nStarting task: ", task_num) + """Like [Read][google.cloud.spanner_v1.services.Spanner.Read], + except returns the result set as a stream.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Request object for read.""" request = ReadRequest( session = session_object.name, table = 'Singers', @@ -246,31 +236,35 @@ async def streaming_read(database_object, async_client, task_num): key_set = keyset.KeySet(all_=True)._to_pb(), ) - """Executes streaming read and get the response in stream""" stream = await async_client.streaming_read(request = request) - """Prints the stream response.""" async for response in stream: print(response) - print("\nEnding task: ", task_num) -# [END async client streaming read] + print("\nEnding task: ", task_id) + +# [END async_client_streaming_read] + -# [START async client execute batch dml] -async def execute_batch_dml(database_object, async_client, task_num): +# [START async_client_execute_batch_dml] +async def execute_batch_dml(database_object, async_client, task_id): - print("\nStarting task: ", task_num) + """Executes a batch of SQL DML statements.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() + statements = ExecuteBatchDmlRequest.Statement( - sql = "INSERT INTO Singers (SingerId, FirstName, LastName) Values (1003, 'Alka', 'Trivedi')", + sql = ( + "INSERT INTO Singers" + "(SingerId, FirstName, LastName)" + "Values (1003, 'Steven', 'Smith')" + ), ), - """Request object for execute batch dml.""" request = ExecuteBatchDmlRequest( session = session_object.name, transaction = TransactionSelector( @@ -282,55 +276,55 @@ async def execute_batch_dml(database_object, async_client, task_num): seqno=550, ) - """Executes batch dml""" response = await async_client.execute_batch_dml(request = request) - """Prints the response.""" print(response) - print("\nEnding task: ", task_num) -# [END async client execute batch dml] + print("\nEnding task: ", task_id) + +# [END async_client_execute_batch_dml] -# [START async client execute streaming sql] -async def execute_streaming_sql(database_object, async_client, task_num): - print("\nstarting task: ", task_num) +# [START async_client_execute_streaming_sql] +async def execute_streaming_sql(database_object, async_client, task_id): + + """Like [ExecuteSql][google.cloud.spanner_v1.services.Spanner.ExecuteSql], + except returns the result set as a stream.""" + + print("\nstarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Request object for execute sql.""" request = ExecuteSqlRequest( session = session_object.name, sql = "SELECT SingerId, FirstName, LastName FROM Singers", ) - """Execute the sql statement and get the response in a stream""" stream = await async_client.execute_streaming_sql(request = request) - """Prints the streaming response.""" async for response in stream: print(response) - print("\nEnding task: ", task_num) -# [END async client execute streaming sql] + print("\nEnding task: ", task_id) + +# [END async_client_execute_streaming_sql] -# [START async client read] -async def read(database_object, async_client, task_num): - print("\nStarting task: ", task_num) +# [START async_client_read] +async def read(database_object, async_client, task_id): + + """Reads rows from the database using key lookups and scans, + as a simple key/value style alternative to + [ExecuteSql][google.cloud.spanner_v1.services.Spanner.ExecuteSql]""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - - """Request object for read.""" request = ReadRequest( session = session_object.name, table = 'Singers', @@ -338,164 +332,165 @@ async def read(database_object, async_client, task_num): key_set = keyset.KeySet(all_=True)._to_pb(), ) - """Reads the columns data from the table""" response = await async_client.read(request = request) - """Prints the response.""" print(response) - print("\nEnding task: ", task_num) -# [END async client read] + print("\nEnding task: ", task_id) +# [END async_client_read] -# [START async client execute_sql] -async def execute_sql(database_object, async_client, task_num): - print("\nstarting task: ", task_num) - # Create the session object associated with the database object +# [START async_client_execute_sql] +async def execute_sql(database_object, async_client, task_id): + + """Executes an SQL statement, returns all results in a single + reply. This method cannot be used to return a result set larger + than 10 MiB; if the query yields more data than that, the query + fails with a ``FAILED_PRECONDITION`` error.""" + + print("\nstarting task: ", task_id) + session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Request object for execute sql.""" request = ExecuteSqlRequest( session = session_object.name, sql = "SELECT SingerId, FirstName, LastName FROM Singers", ) - """Execute the sql statement.""" response = await async_client.execute_sql(request = request) - """Prints the response.""" print(response) - print("\nEnding task: ", task_num) + print("\nEnding task: ", task_id) -# [END async client execute_sql] +# [END async_client_execute_sql] -# [START async client delete_session] -async def delete_session(session_name, async_client, task_num): +# [START async_client_delete_session] +async def delete_session(session_name, async_client, task_id): - print("\nStarting task: ", task_num) + """Ends a session, releasing server resources associated + with it. This will asynchronously trigger cancellation + of any operations that are running with this session.""" + + print("\nStarting task: ", task_id) - """Request object for delete session.""" request = DeleteSessionRequest( name=session_name, ) - """Delete the session.""" await async_client.delete_session(request = request) - print("\nEnding task: ", task_num) + print("\nEnding task: ", task_id) # [END async client delete_session] -# [START async client list_sessions] -async def list_sessions(database_object, async_client, task_num): - print("\nStarting task: ", task_num) +# [START async_client_list_sessions] +async def list_sessions(database_object, async_client, task_id): + + """Lists all sessions in a given database.""" + + print("\nStarting task: ", task_id) - """Request object for list session.""" request = ListSessionsRequest( database = database_object.name, ) - """Get the list of sessions.""" response = await async_client.list_sessions(request = request) - """Prints the session.""" print(response) - print("\nEnding task: ", task_num) + print("\nEnding task: ", task_id) -# [END async client get_session] +# [END async_client_get_session] -# [START async client get_session] -async def get_session(database_object, async_client, task_num): +# [START async_client_get_session] +async def get_session(database_object, async_client, task_id): - print("\nStarting task: ", task_num) + """Gets a session. Prints ``NOT_FOUND`` if the session does not + exist. This is mainly useful for determining whether a session + is still alive.""" + + print("\nStarting task: ", task_id) - # Create the session object associated with the database object session_object = session.Session(database_object) - """Create the session.""" session_object.create() - """Request object for get session.""" request = GetSessionRequest( name = session_object.name, ) - """Get the session.""" response = await async_client.get_session(request = request) - """Prints the session got.""" print(response) - print("\nEnding task: ", task_num) + print("\nEnding task: ", task_id) -# [END async client get_session] +# [END async_client_get_session] -# [START async client batch_create_session] -async def batch_create_session(database_object, async_client, task_num): +# [START async_client_batch_create_session] +async def batch_create_sessions(database_object, async_client, task_id): - print("\nStarting task: ", task_num) + """Creates multiple new sessions.""" - """Request object for batch create session.""" + print("\nStarting task: ", task_id) + request = BatchCreateSessionsRequest( database = database_object.name, - session_count=1420, + session_count=1420, # The number of sessions to be created in this batch call ) - """Creates batch sessions.""" response = await async_client.batch_create_sessions(request = request) - """Prints the sessions batchwise.""" print(response) - print("\nEnding task: ", task_num) + print("\nEnding task: ", task_id) + +# [END async_client_batch_create_session] -# [END async client batch_create_session] +# [START async_client_create_session] +async def create_session(database_object, async_client, task_id): -# [START async client create_session] -async def create_session(database_object, async_client, task_num): + """Creates a new session.""" - print("\nStarting task: ", task_num) + print("\nStarting task: ", task_id) - """Request object for create session.""" request = CreateSessionRequest( database = database_object.name, ) - """Creates a session.""" response = await async_client.create_session(request = request) - - """Prints the session.""" + print(response) - print("\nEnding task: ", task_num) + print("\nEnding task: ", task_id) -# [END async client create_session] +# [END async_client_create_session] async def main(): parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) - parser.add_argument("--instance_id", help="Your Cloud Spanner instance ID.") parser.add_argument( - "--database_id", help="Your Cloud Spanner database ID.", default="example_db" + "--instance_id", help="Your Cloud Spanner instance ID." ) parser.add_argument( - "--session_name", help="session name." + "--database_id", help="Your Cloud Spanner database ID." ) parser.add_argument( - "--n", help="number of queries." + "--session_name", help="Session name." + ) + parser.add_argument( + "--n", help="Number of queries you want to make." ) subparsers = parser.add_subparsers(dest="command") subparsers.add_parser( @@ -503,8 +498,8 @@ async def main(): help=create_session.__doc__ ) subparsers.add_parser( - "batch_create_session", - help=batch_create_session.__doc__ + "batch_create_sessions", + help=batch_create_sessions.__doc__ ) subparsers.add_parser( "get_session", @@ -560,15 +555,16 @@ async def main(): ) args = parser.parse_args() - async_client, database_object = getObjects(args.instance_id, args.database_id) + async_client, database_object = get_async_client_and_associate_database(args.instance_id, args.database_id) + if args.command == "create_session": await asyncio.gather( *[create_session(database_object, async_client, task) for task in range(int(args.n))] ) - elif args.command == "batch_create_session": + elif args.command == "batch_create_sessions": await asyncio.gather( - *[batch_create_session(database_object, async_client, task) for task in range(int(args.n))] + *[batch_create_sessions(database_object, async_client, task) for task in range(int(args.n))] ) elif args.command == "get_session": await asyncio.gather( @@ -625,24 +621,17 @@ async def main(): if __name__ == "__main__": - - # Assigned start time of the execution to startTime + startTime = time.time() - # Started keeping track of memory taken during execution tracemalloc.start() - # Called the main function asyncio.run(main()) - # Assigned end time of the execution to endTime endTime = time.time() - # Prints the total execution time - print('total time taken: ' + str(endTime-startTime) + ' seconds') + print('Total time taken: ' + str(endTime-startTime) + ' seconds') - # Prints the total memory taken - print('total memory taken(current, peak): ', tracemalloc.get_traced_memory()) + print('Total memory taken(current, peak): ' + str(tracemalloc.get_traced_memory()) + ' KiB') - # Stopped keeping track of memory taken during execution tracemalloc.stop() \ No newline at end of file From 4ca060fe6ab82ae8766ede1981e539be0e1a87b9 Mon Sep 17 00:00:00 2001 From: alkatrivedi <58396306+alkatrivedi@users.noreply.github.com> Date: Thu, 14 Jul 2022 21:29:25 +0530 Subject: [PATCH 4/6] Delete _helpers.py --- tests/system/_helpers.py | 134 --------------------------------------- 1 file changed, 134 deletions(-) delete mode 100644 tests/system/_helpers.py diff --git a/tests/system/_helpers.py b/tests/system/_helpers.py deleted file mode 100644 index 51a6d773c4..0000000000 --- a/tests/system/_helpers.py +++ /dev/null @@ -1,134 +0,0 @@ -# Copyright 2021 Google LLC All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import operator -import os -import time - -from google.api_core import exceptions -from google.cloud.spanner_v1 import instance as instance_mod -from tests import _fixtures -from test_utils import retry -from test_utils import system - - -CREATE_INSTANCE_ENVVAR = "GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE" -CREATE_INSTANCE = os.getenv(CREATE_INSTANCE_ENVVAR) is not None - -INSTANCE_ID_ENVVAR = "GOOGLE_CLOUD_TESTS_SPANNER_INSTANCE" -INSTANCE_ID_DEFAULT = "google-cloud-python-systest" -INSTANCE_ID = os.environ.get(INSTANCE_ID_ENVVAR, INSTANCE_ID_DEFAULT) - -SKIP_BACKUP_TESTS_ENVVAR = "SKIP_BACKUP_TESTS" -SKIP_BACKUP_TESTS = os.getenv(SKIP_BACKUP_TESTS_ENVVAR) is not None - -INSTANCE_OPERATION_TIMEOUT_IN_SECONDS = int( - os.getenv("SPANNER_INSTANCE_OPERATION_TIMEOUT_IN_SECONDS", 560) -) -DATABASE_OPERATION_TIMEOUT_IN_SECONDS = int( - os.getenv("SPANNER_DATABASE_OPERATION_TIMEOUT_IN_SECONDS", 120) -) -BACKUP_OPERATION_TIMEOUT_IN_SECONDS = int( - os.getenv("SPANNER_BACKUP_OPERATION_TIMEOUT_IN_SECONDS", 1200) -) - -USE_EMULATOR_ENVVAR = "SPANNER_EMULATOR_HOST" -USE_EMULATOR = os.getenv(USE_EMULATOR_ENVVAR) is not None - -DATABASE_DIALECT_ENVVAR = "SPANNER_DATABASE_DIALECT" -DATABASE_DIALECT = os.getenv(DATABASE_DIALECT_ENVVAR) - -EMULATOR_PROJECT_ENVVAR = "GCLOUD_PROJECT" -EMULATOR_PROJECT_DEFAULT = "emulator-test-project" -EMULATOR_PROJECT = os.getenv(EMULATOR_PROJECT_ENVVAR, EMULATOR_PROJECT_DEFAULT) - - -DDL_STATEMENTS = ( - _fixtures.PG_DDL_STATEMENTS - if DATABASE_DIALECT == "POSTGRESQL" - else ( - _fixtures.EMULATOR_DDL_STATEMENTS if USE_EMULATOR else _fixtures.DDL_STATEMENTS - ) -) - -retry_true = retry.RetryResult(operator.truth) -retry_false = retry.RetryResult(operator.not_) - -retry_503 = retry.RetryErrors(exceptions.ServiceUnavailable) -retry_429_503 = retry.RetryErrors( - exceptions.TooManyRequests, exceptions.ServiceUnavailable, 8 -) -retry_mabye_aborted_txn = retry.RetryErrors(exceptions.ServerError, exceptions.Aborted) -retry_mabye_conflict = retry.RetryErrors(exceptions.ServerError, exceptions.Conflict) - - -def _has_all_ddl(database): - # Predicate to test for EC completion. - return len(database.ddl_statements) == len(DDL_STATEMENTS) - - -retry_has_all_dll = retry.RetryInstanceState(_has_all_ddl) - - -def scrub_referencing_databases(to_scrub, db_list): - for db_name in db_list: - db = to_scrub.database(db_name.split("/")[-1]) - try: - retry_429_503(db.delete)() - except exceptions.NotFound: # lost the race - pass - - -def scrub_instance_backups(to_scrub): - try: - for backup_pb in to_scrub.list_backups(): - # Backup cannot be deleted while referencing databases exist. - scrub_referencing_databases(to_scrub, backup_pb.referencing_databases) - bkp = instance_mod.Backup.from_pb(backup_pb, to_scrub) - try: - # Instance cannot be deleted while backups exist. - retry_429_503(bkp.delete)() - except exceptions.NotFound: # lost the race - pass - except exceptions.MethodNotImplemented: - # The CI emulator raises 501: local versions seem fine. - pass - - -def scrub_instance_ignore_not_found(to_scrub): - """Helper for func:`cleanup_old_instances`""" - scrub_instance_backups(to_scrub) - - try: - retry_429_503(to_scrub.delete)() - except exceptions.NotFound: # lost the race - pass - - -def cleanup_old_instances(spanner_client): - cutoff = int(time.time()) - 2 * 60 * 60 # two hour ago - instance_filter = "labels.python-spanner-systests:true" - - for instance_pb in spanner_client.list_instances(filter_=instance_filter): - instance = instance_mod.Instance.from_pb(instance_pb, spanner_client) - - if "created" in instance.labels: - create_time = int(instance.labels["created"]) - - if create_time <= cutoff: - scrub_instance_ignore_not_found(instance) - - -def unique_id(prefix, separator="-"): - return f"{prefix}{system.unique_resource_id(separator)}" From c53000fc14d2dcc001ccfa3b3f8884ceb8b3a438 Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Fri, 15 Jul 2022 13:09:06 +0530 Subject: [PATCH 5/6] Revert "Delete _helpers.py" This reverts commit 4ca060fe6ab82ae8766ede1981e539be0e1a87b9. --- tests/system/_helpers.py | 134 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 tests/system/_helpers.py diff --git a/tests/system/_helpers.py b/tests/system/_helpers.py new file mode 100644 index 0000000000..51a6d773c4 --- /dev/null +++ b/tests/system/_helpers.py @@ -0,0 +1,134 @@ +# Copyright 2021 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import operator +import os +import time + +from google.api_core import exceptions +from google.cloud.spanner_v1 import instance as instance_mod +from tests import _fixtures +from test_utils import retry +from test_utils import system + + +CREATE_INSTANCE_ENVVAR = "GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE" +CREATE_INSTANCE = os.getenv(CREATE_INSTANCE_ENVVAR) is not None + +INSTANCE_ID_ENVVAR = "GOOGLE_CLOUD_TESTS_SPANNER_INSTANCE" +INSTANCE_ID_DEFAULT = "google-cloud-python-systest" +INSTANCE_ID = os.environ.get(INSTANCE_ID_ENVVAR, INSTANCE_ID_DEFAULT) + +SKIP_BACKUP_TESTS_ENVVAR = "SKIP_BACKUP_TESTS" +SKIP_BACKUP_TESTS = os.getenv(SKIP_BACKUP_TESTS_ENVVAR) is not None + +INSTANCE_OPERATION_TIMEOUT_IN_SECONDS = int( + os.getenv("SPANNER_INSTANCE_OPERATION_TIMEOUT_IN_SECONDS", 560) +) +DATABASE_OPERATION_TIMEOUT_IN_SECONDS = int( + os.getenv("SPANNER_DATABASE_OPERATION_TIMEOUT_IN_SECONDS", 120) +) +BACKUP_OPERATION_TIMEOUT_IN_SECONDS = int( + os.getenv("SPANNER_BACKUP_OPERATION_TIMEOUT_IN_SECONDS", 1200) +) + +USE_EMULATOR_ENVVAR = "SPANNER_EMULATOR_HOST" +USE_EMULATOR = os.getenv(USE_EMULATOR_ENVVAR) is not None + +DATABASE_DIALECT_ENVVAR = "SPANNER_DATABASE_DIALECT" +DATABASE_DIALECT = os.getenv(DATABASE_DIALECT_ENVVAR) + +EMULATOR_PROJECT_ENVVAR = "GCLOUD_PROJECT" +EMULATOR_PROJECT_DEFAULT = "emulator-test-project" +EMULATOR_PROJECT = os.getenv(EMULATOR_PROJECT_ENVVAR, EMULATOR_PROJECT_DEFAULT) + + +DDL_STATEMENTS = ( + _fixtures.PG_DDL_STATEMENTS + if DATABASE_DIALECT == "POSTGRESQL" + else ( + _fixtures.EMULATOR_DDL_STATEMENTS if USE_EMULATOR else _fixtures.DDL_STATEMENTS + ) +) + +retry_true = retry.RetryResult(operator.truth) +retry_false = retry.RetryResult(operator.not_) + +retry_503 = retry.RetryErrors(exceptions.ServiceUnavailable) +retry_429_503 = retry.RetryErrors( + exceptions.TooManyRequests, exceptions.ServiceUnavailable, 8 +) +retry_mabye_aborted_txn = retry.RetryErrors(exceptions.ServerError, exceptions.Aborted) +retry_mabye_conflict = retry.RetryErrors(exceptions.ServerError, exceptions.Conflict) + + +def _has_all_ddl(database): + # Predicate to test for EC completion. + return len(database.ddl_statements) == len(DDL_STATEMENTS) + + +retry_has_all_dll = retry.RetryInstanceState(_has_all_ddl) + + +def scrub_referencing_databases(to_scrub, db_list): + for db_name in db_list: + db = to_scrub.database(db_name.split("/")[-1]) + try: + retry_429_503(db.delete)() + except exceptions.NotFound: # lost the race + pass + + +def scrub_instance_backups(to_scrub): + try: + for backup_pb in to_scrub.list_backups(): + # Backup cannot be deleted while referencing databases exist. + scrub_referencing_databases(to_scrub, backup_pb.referencing_databases) + bkp = instance_mod.Backup.from_pb(backup_pb, to_scrub) + try: + # Instance cannot be deleted while backups exist. + retry_429_503(bkp.delete)() + except exceptions.NotFound: # lost the race + pass + except exceptions.MethodNotImplemented: + # The CI emulator raises 501: local versions seem fine. + pass + + +def scrub_instance_ignore_not_found(to_scrub): + """Helper for func:`cleanup_old_instances`""" + scrub_instance_backups(to_scrub) + + try: + retry_429_503(to_scrub.delete)() + except exceptions.NotFound: # lost the race + pass + + +def cleanup_old_instances(spanner_client): + cutoff = int(time.time()) - 2 * 60 * 60 # two hour ago + instance_filter = "labels.python-spanner-systests:true" + + for instance_pb in spanner_client.list_instances(filter_=instance_filter): + instance = instance_mod.Instance.from_pb(instance_pb, spanner_client) + + if "created" in instance.labels: + create_time = int(instance.labels["created"]) + + if create_time <= cutoff: + scrub_instance_ignore_not_found(instance) + + +def unique_id(prefix, separator="-"): + return f"{prefix}{system.unique_resource_id(separator)}" From f69cff93b481cc2ff950305034839e7022185b73 Mon Sep 17 00:00:00 2001 From: Alka Trivedi Date: Mon, 18 Jul 2022 09:23:21 +0530 Subject: [PATCH 6/6] remove spaces and print statements --- samples/samples/async_client_sample.py | 203 ++++++------------------- 1 file changed, 46 insertions(+), 157 deletions(-) diff --git a/samples/samples/async_client_sample.py b/samples/samples/async_client_sample.py index 2e57ad375c..13704f1ffb 100644 --- a/samples/samples/async_client_sample.py +++ b/samples/samples/async_client_sample.py @@ -51,28 +51,20 @@ # [ FUNCTION for creating async client and spanner client object] def get_async_client_and_associate_database(instance_id, database_id): - async_client = spanner.SpannerAsyncClient() - spanner_client = Client() - instance = spanner_client.instance(instance_id) - database_object = instance.database(database_id) - return async_client, database_object # [START async_client_partition_read] -async def partition_read(database_object, async_client, task_id): +async def partition_read(database_object, async_client): """Creates a set of partition tokens that can be used to execute a read operation in parallel.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() request = PartitionReadRequest( @@ -87,26 +79,19 @@ async def partition_read(database_object, async_client, task_id): table = 'Singers', key_set = keyset.KeySet(all_=True)._to_pb(), ) - response = await async_client.partition_read(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_partition_read] # [START async_client_partition_query] -async def partition_query(database_object, async_client, task_id): +async def partition_query(database_object, async_client): """Creates a set of partition tokens that can be used to execute a query operation in parallel.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() request = PartitionQueryRequest( @@ -120,82 +105,60 @@ async def partition_query(database_object, async_client, task_id): ), sql = "UPDATE Singers SET FirstName = 'William' WHERE SingerId = 1003", ) - response = await async_client.partition_query(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_partition_query] # [START async_client_rollback] -async def rollback(database_object, async_client, task_id): +async def rollback(database_object, async_client): """Rollbacks a transaction, releases any locks it holds.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() transaction_object = transaction.Transaction(session_object) - transaction_id_blob = transaction_object.begin() request = RollbackRequest( session = session_object.name, transaction_id = transaction_id_blob, ) - await async_client.rollback(request = request) - print("\nEnding task: ", task_id) - # [END async_client_rollback] # [START async_client_commit] -async def commit(database_object, async_client, task_id): +async def commit(database_object, async_client): """Commits a transaction. The request includes the mutations to be applied to rows in the database.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() transaction_object = transaction.Transaction(session_object) - transaction_id_blob = transaction_object.begin() request = CommitRequest( transaction_id = transaction_id_blob, session = session_object.name ) - response = await async_client.commit(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_commit] # [START async_client_begin_transaction] -async def begin_transaction(database_object, async_client, task_id): +async def begin_transaction(database_object, async_client): """Begins a new transaction.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() txn_options = TransactionOptions( @@ -206,27 +169,19 @@ async def begin_transaction(database_object, async_client, task_id): session = session_object.name, options = txn_options, ) - response = await async_client.begin_transaction(request = request) - - print(response) - - print("\nEnding task: ", task_id) - print("\n") + return response # [END async_client_begin_transaction] # [START async_client_streaming_read] -async def streaming_read(database_object, async_client, task_id): +async def streaming_read(database_object, async_client): """Like [Read][google.cloud.spanner_v1.services.Spanner.Read], except returns the result set as a stream.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() request = ReadRequest( @@ -235,26 +190,21 @@ async def streaming_read(database_object, async_client, task_id): columns = ["SingerId", "FirstName", "LastName"], key_set = keyset.KeySet(all_=True)._to_pb(), ) - stream = await async_client.streaming_read(request = request) - + response_list = [] async for response in stream: - print(response) - - print("\nEnding task: ", task_id) + response_list.append(response) + return response_list # [END async_client_streaming_read] # [START async_client_execute_batch_dml] -async def execute_batch_dml(database_object, async_client, task_id): +async def execute_batch_dml(database_object, async_client): """Executes a batch of SQL DML statements.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() statements = ExecuteBatchDmlRequest.Statement( @@ -275,54 +225,41 @@ async def execute_batch_dml(database_object, async_client, task_id): statements = statements, seqno=550, ) - response = await async_client.execute_batch_dml(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_execute_batch_dml] # [START async_client_execute_streaming_sql] -async def execute_streaming_sql(database_object, async_client, task_id): +async def execute_streaming_sql(database_object, async_client): """Like [ExecuteSql][google.cloud.spanner_v1.services.Spanner.ExecuteSql], except returns the result set as a stream.""" - print("\nstarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() request = ExecuteSqlRequest( session = session_object.name, sql = "SELECT SingerId, FirstName, LastName FROM Singers", ) - stream = await async_client.execute_streaming_sql(request = request) - + response_list = [] async for response in stream: - print(response) - - print("\nEnding task: ", task_id) + response_list.append(response) # [END async_client_execute_streaming_sql] # [START async_client_read] -async def read(database_object, async_client, task_id): +async def read(database_object, async_client): """Reads rows from the database using key lookups and scans, as a simple key/value style alternative to [ExecuteSql][google.cloud.spanner_v1.services.Spanner.ExecuteSql]""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() request = ReadRequest( @@ -331,147 +268,106 @@ async def read(database_object, async_client, task_id): columns = ["SingerId", "FirstName", "LastName"], key_set = keyset.KeySet(all_=True)._to_pb(), ) - response = await async_client.read(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_read] # [START async_client_execute_sql] -async def execute_sql(database_object, async_client, task_id): +async def execute_sql(database_object, async_client): """Executes an SQL statement, returns all results in a single reply. This method cannot be used to return a result set larger than 10 MiB; if the query yields more data than that, the query fails with a ``FAILED_PRECONDITION`` error.""" - - print("\nstarting task: ", task_id) session_object = session.Session(database_object) - session_object.create() request = ExecuteSqlRequest( session = session_object.name, sql = "SELECT SingerId, FirstName, LastName FROM Singers", ) - response = await async_client.execute_sql(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_execute_sql] # [START async_client_delete_session] -async def delete_session(session_name, async_client, task_id): +async def delete_session(session_name, async_client): """Ends a session, releasing server resources associated with it. This will asynchronously trigger cancellation of any operations that are running with this session.""" - print("\nStarting task: ", task_id) - request = DeleteSessionRequest( name=session_name, ) - await async_client.delete_session(request = request) - print("\nEnding task: ", task_id) - # [END async client delete_session] # [START async_client_list_sessions] -async def list_sessions(database_object, async_client, task_id): +async def list_sessions(database_object, async_client): """Lists all sessions in a given database.""" - print("\nStarting task: ", task_id) - request = ListSessionsRequest( database = database_object.name, ) - response = await async_client.list_sessions(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_get_session] # [START async_client_get_session] -async def get_session(database_object, async_client, task_id): +async def get_session(database_object, async_client): """Gets a session. Prints ``NOT_FOUND`` if the session does not exist. This is mainly useful for determining whether a session is still alive.""" - print("\nStarting task: ", task_id) - session_object = session.Session(database_object) - session_object.create() request = GetSessionRequest( name = session_object.name, ) - response = await async_client.get_session(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_get_session] # [START async_client_batch_create_session] -async def batch_create_sessions(database_object, async_client, task_id): +async def batch_create_sessions(database_object, async_client): """Creates multiple new sessions.""" - - print("\nStarting task: ", task_id) request = BatchCreateSessionsRequest( database = database_object.name, session_count=1420, # The number of sessions to be created in this batch call ) - response = await async_client.batch_create_sessions(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_batch_create_session] # [START async_client_create_session] -async def create_session(database_object, async_client, task_id): +async def create_session(database_object, async_client): """Creates a new session.""" - print("\nStarting task: ", task_id) - request = CreateSessionRequest( database = database_object.name, ) - response = await async_client.create_session(request = request) - - print(response) - - print("\nEnding task: ", task_id) + return response # [END async_client_create_session] @@ -557,81 +453,74 @@ async def main(): async_client, database_object = get_async_client_and_associate_database(args.instance_id, args.database_id) - if args.command == "create_session": await asyncio.gather( - *[create_session(database_object, async_client, task) for task in range(int(args.n))] + *[create_session(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "batch_create_sessions": await asyncio.gather( - *[batch_create_sessions(database_object, async_client, task) for task in range(int(args.n))] + *[batch_create_sessions(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "get_session": await asyncio.gather( - *[get_session(database_object, async_client, task) for task in range(int(args.n))] + *[get_session(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "list_sessions": await asyncio.gather( - *[list_sessions(database_object, async_client, task) for task in range(int(args.n))] + *[list_sessions(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "delete_session": await asyncio.gather( - *[delete_session(args.session_name, async_client, task) for task in range(int(args.n))] + *[delete_session(args.session_name, async_client) for task in range(int(args.n))] ) elif args.command == "execute_sql": await asyncio.gather( - *[execute_sql(database_object, async_client, task) for task in range(int(args.n))] + *[execute_sql(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "execute_streaming_sql": await asyncio.gather( - *[execute_streaming_sql(database_object, async_client, task) for task in range(int(args.n))] + *[execute_streaming_sql(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "execute_batch_dml": await asyncio.gather( - *[execute_batch_dml(database_object, async_client, task) for task in range(int(args.n))] + *[execute_batch_dml(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "read": await asyncio.gather( - *[read(database_object, async_client, task) for task in range(int(args.n))] + *[read(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "streaming_read": await asyncio.gather( - *[streaming_read(database_object, async_client, task) for task in range(int(args.n))] + *[streaming_read(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "begin_transaction": await asyncio.gather( - *[begin_transaction(database_object, async_client, task) for task in range(int(args.n))] + *[begin_transaction(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "commit": await asyncio.gather( - *[commit(database_object, async_client, task) for task in range(int(args.n))] + *[commit(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "rollback": await asyncio.gather( - *[rollback(database_object, async_client, task) for task in range(int(args.n))] + *[rollback(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "partition_query": await asyncio.gather( - *[partition_query(database_object, async_client, task) for task in range(int(args.n))] + *[partition_query(database_object, async_client) for task in range(int(args.n))] ) elif args.command == "partition_read": await asyncio.gather( - *[partition_read(database_object, async_client, task) for task in range(int(args.n))] + *[partition_read(database_object, async_client) for task in range(int(args.n))] ) if __name__ == "__main__": startTime = time.time() - tracemalloc.start() - asyncio.run(main()) - endTime = time.time() - print('Total time taken: ' + str(endTime-startTime) + ' seconds') - print('Total memory taken(current, peak): ' + str(tracemalloc.get_traced_memory()) + ' KiB') - tracemalloc.stop() \ No newline at end of file