From 0137d225d20fe2cf2b31b0b993e7544e9d439a99 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Wed, 21 Feb 2018 10:44:15 -0800 Subject: [PATCH 1/9] Add initial sample. --- spanner/cloud-client/batch_sample.py | 91 ++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 spanner/cloud-client/batch_sample.py diff --git a/spanner/cloud-client/batch_sample.py b/spanner/cloud-client/batch_sample.py new file mode 100644 index 000000000000..8b6e7ba350e2 --- /dev/null +++ b/spanner/cloud-client/batch_sample.py @@ -0,0 +1,91 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to do batch operations using Cloud +Spanner. + +For more information, see the README.rst under /spanner. 
+""" + +import argparse +import multiprocessing +from multiprocessing.pool import ThreadPool +import time + +from google.cloud import spanner + + +def run_batch_query(instance_id, database_id): + """Runs an example batch query.""" + + # [START spanner_batch_client] + # + # CREATE TABLE Singers ( + # SingerId INT64 NOT NULL, + # FirstName STRING(1024), + # LastName STRING(1024), + # SingerInfo BYTES(MAX), + # ) PRIMARY KEY (SingerId); + + spanner_client = spanner.Client() + instance = spanner_client.instance(instance_id) + database = instance.database(database_id) + + # Create the batch transaction and generate partitions + batch_transaction = database.batch_transaction() + partitions = batch_transaction.generate_read_batches( + table='Singers', + columns=('SingerId', 'FirstName', 'LastName',), + keyset=spanner.KeySet(all_=True) + ) + + # pool = multiprocessing.Pool() + pool = ThreadPool() + results = [] + start = time.time() + print('reached this point') + for partition in partitions: + print('Starting partition.') + results.append( + pool.apply_async(process_partition, (batch_transaction, partition))) + + # Print results + for result in results: + finish, row_ct = result.get(timeout=3600) + elapsed = finish - start + print(u'Completed {} rows in {} seconds'.format(row_ct, elapsed)) + + +def process_partition(transaction, partition): + """Processes the requests of a query in an separate process.""" + print('Process started.') + row_ct = 0 + for row in transaction.process_read_batch(partition): + print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row)) + row_ct += 1 + return time.time(), row_ct + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument( + 'instance_id', help='Your Cloud Spanner instance ID.') + parser.add_argument( + 'database_id', help='Your Cloud Spanner database ID.', + default='example_db') + + args = parser.parse_args() + + 
run_batch_query(args.instance_id, args.database_id) \ No newline at end of file From 7c3c8811d53d04c04ec1f6bceb000d085a38c5e8 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Wed, 21 Feb 2018 15:52:20 -0800 Subject: [PATCH 2/9] Clean up sample. --- spanner/cloud-client/batch_sample.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/spanner/cloud-client/batch_sample.py b/spanner/cloud-client/batch_sample.py index 8b6e7ba350e2..325a97ff37e8 100644 --- a/spanner/cloud-client/batch_sample.py +++ b/spanner/cloud-client/batch_sample.py @@ -1,4 +1,3 @@ -# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -18,18 +17,16 @@ """ import argparse -import multiprocessing from multiprocessing.pool import ThreadPool import time from google.cloud import spanner +# [START spanner_batch_client] def run_batch_query(instance_id, database_id): """Runs an example batch query.""" - # [START spanner_batch_client] - # # CREATE TABLE Singers ( # SingerId INT64 NOT NULL, # FirstName STRING(1024), @@ -53,7 +50,7 @@ def run_batch_query(instance_id, database_id): pool = ThreadPool() results = [] start = time.time() - print('reached this point') + for partition in partitions: print('Starting partition.') results.append( @@ -61,19 +58,27 @@ def run_batch_query(instance_id, database_id): # Print results for result in results: + print(result) finish, row_ct = result.get(timeout=3600) elapsed = finish - start print(u'Completed {} rows in {} seconds'.format(row_ct, elapsed)) + # Clean up + batch_transaction.session.delete() + def process_partition(transaction, partition): """Processes the requests of a query in an separate process.""" print('Process started.') - row_ct = 0 - for row in transaction.process_read_batch(partition): - print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row)) - row_ct += 1 - return 
time.time(), row_ct + try: + row_ct = 0 + for row in transaction.process_read_batch(partition): + print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row)) + row_ct += 1 + return time.time(), row_ct + except Exception as e: + print(e.message) +# [END spanner_batch_client] if __name__ == '__main__': @@ -88,4 +93,4 @@ def process_partition(transaction, partition): args = parser.parse_args() - run_batch_query(args.instance_id, args.database_id) \ No newline at end of file + run_batch_query(args.instance_id, args.database_id) From cdd7da14b3d67d4d2c79b9cd024a72ee26ca5388 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Wed, 14 Mar 2018 13:31:40 -0700 Subject: [PATCH 3/9] Update license and address style check. --- spanner/cloud-client/batch_sample.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spanner/cloud-client/batch_sample.py b/spanner/cloud-client/batch_sample.py index 325a97ff37e8..93fa1526ce81 100644 --- a/spanner/cloud-client/batch_sample.py +++ b/spanner/cloud-client/batch_sample.py @@ -1,8 +1,10 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -54,7 +56,8 @@ def run_batch_query(instance_id, database_id): for partition in partitions: print('Starting partition.') results.append( - pool.apply_async(process_partition, (batch_transaction, partition))) + pool.apply_async( + process_partition, (batch_transaction, partition))) # Print results for result in results: From 172ee2c8c4a64f32f0713292153969e009854543 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Wed, 14 Mar 2018 14:13:48 -0700 Subject: [PATCH 4/9] Replace multiprocessing with concurrent.futures. --- spanner/cloud-client/batch_sample.py | 6 +++--- spanner/cloud-client/requirements.txt | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/spanner/cloud-client/batch_sample.py b/spanner/cloud-client/batch_sample.py index 93fa1526ce81..77542c46dd16 100644 --- a/spanner/cloud-client/batch_sample.py +++ b/spanner/cloud-client/batch_sample.py @@ -19,7 +19,7 @@ """ import argparse -from multiprocessing.pool import ThreadPool +from concurrent.futures import ThreadPoolExecutor import time from google.cloud import spanner @@ -48,8 +48,8 @@ def run_batch_query(instance_id, database_id): keyset=spanner.KeySet(all_=True) ) - # pool = multiprocessing.Pool() - pool = ThreadPool() + # Create a pool of workers for the tasks + pool = ThreadPoolExecutor() results = [] start = time.time() diff --git a/spanner/cloud-client/requirements.txt b/spanner/cloud-client/requirements.txt index be2572296f3f..b2742aa507f2 100644 --- a/spanner/cloud-client/requirements.txt +++ b/spanner/cloud-client/requirements.txt @@ -1 +1,2 @@ google-cloud-spanner==1.1.0 +futures==3.2.0 \ No newline at end of file From bc1ecdbf3f6526bdeecdfbe729de9e7f5565f1de Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent 
Date: Wed, 14 Mar 2018 20:24:52 -0700 Subject: [PATCH 5/9] Update to 1.2.0 and fix threadpool. --- spanner/cloud-client/batch_sample.py | 33 +++++++++++---------------- spanner/cloud-client/requirements.txt | 2 +- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/spanner/cloud-client/batch_sample.py b/spanner/cloud-client/batch_sample.py index 77542c46dd16..3483493fd179 100644 --- a/spanner/cloud-client/batch_sample.py +++ b/spanner/cloud-client/batch_sample.py @@ -19,7 +19,7 @@ """ import argparse -from concurrent.futures import ThreadPoolExecutor +import concurrent.futures import time from google.cloud import spanner @@ -41,41 +41,34 @@ def run_batch_query(instance_id, database_id): database = instance.database(database_id) # Create the batch transaction and generate partitions - batch_transaction = database.batch_transaction() - partitions = batch_transaction.generate_read_batches( + snapshot = database.batch_snapshot() + partitions = snapshot.generate_read_batches( table='Singers', columns=('SingerId', 'FirstName', 'LastName',), keyset=spanner.KeySet(all_=True) ) # Create a pool of workers for the tasks - pool = ThreadPoolExecutor() - results = [] start = time.time() + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [executor.submit(process, snapshot, p) for p in partitions] - for partition in partitions: - print('Starting partition.') - results.append( - pool.apply_async( - process_partition, (batch_transaction, partition))) + for future in futures: + finish, row_ct = future.result(timeout=3600) - # Print results - for result in results: - print(result) - finish, row_ct = result.get(timeout=3600) - elapsed = finish - start - print(u'Completed {} rows in {} seconds'.format(row_ct, elapsed)) + elapsed = finish - start + print(u'Completed {} rows in {} seconds'.format(row_ct, elapsed)) # Clean up - batch_transaction.session.delete() + snapshot.close() -def process_partition(transaction, partition): +def process(snapshot, 
partition): """Processes the requests of a query in an separate process.""" - print('Process started.') + print('Started processing partition.') try: row_ct = 0 - for row in transaction.process_read_batch(partition): + for row in snapshot.process_read_batch(partition): print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row)) row_ct += 1 return time.time(), row_ct diff --git a/spanner/cloud-client/requirements.txt b/spanner/cloud-client/requirements.txt index b2742aa507f2..7c307846f5fb 100644 --- a/spanner/cloud-client/requirements.txt +++ b/spanner/cloud-client/requirements.txt @@ -1,2 +1,2 @@ -google-cloud-spanner==1.1.0 +google-cloud-spanner==1.2.0 futures==3.2.0 \ No newline at end of file From 707ec8d2d87895cc89c887cf348be415c980b4d9 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Thu, 15 Mar 2018 09:07:20 -0700 Subject: [PATCH 6/9] Address feedback. --- spanner/cloud-client/batch_sample.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spanner/cloud-client/batch_sample.py b/spanner/cloud-client/batch_sample.py index 3483493fd179..4745fb5f1716 100644 --- a/spanner/cloud-client/batch_sample.py +++ b/spanner/cloud-client/batch_sample.py @@ -29,6 +29,7 @@ def run_batch_query(instance_id, database_id): """Runs an example batch query.""" + # Expected Table Format: # CREATE TABLE Singers ( # SingerId INT64 NOT NULL, # FirstName STRING(1024), @@ -53,9 +54,8 @@ def run_batch_query(instance_id, database_id): with concurrent.futures.ThreadPoolExecutor() as executor: futures = [executor.submit(process, snapshot, p) for p in partitions] - for future in futures: - finish, row_ct = future.result(timeout=3600) - + for future in concurrent.futures.as_completed(futures, timeout=3600): + finish, row_ct = future.result() elapsed = finish - start print(u'Completed {} rows in {} seconds'.format(row_ct, elapsed)) From 092cc883ffc814e3de63755117a88be756c63eb9 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Thu, 15 Mar 2018 09:13:53 -0700 
Subject: [PATCH 7/9] Downgrade futures to 3.1.0 for python3. --- spanner/cloud-client/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spanner/cloud-client/requirements.txt b/spanner/cloud-client/requirements.txt index 7c307846f5fb..0ea74f650cfc 100644 --- a/spanner/cloud-client/requirements.txt +++ b/spanner/cloud-client/requirements.txt @@ -1,2 +1,2 @@ google-cloud-spanner==1.2.0 -futures==3.2.0 \ No newline at end of file +futures==3.1.0 \ No newline at end of file From c3b862a0aadf7d1fccd85336b5153d844a4e7c26 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Thu, 15 Mar 2018 09:22:14 -0700 Subject: [PATCH 8/9] Futures requirement only for python2. --- spanner/cloud-client/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spanner/cloud-client/requirements.txt b/spanner/cloud-client/requirements.txt index 0ea74f650cfc..4d374891d4db 100644 --- a/spanner/cloud-client/requirements.txt +++ b/spanner/cloud-client/requirements.txt @@ -1,2 +1,2 @@ google-cloud-spanner==1.2.0 -futures==3.1.0 \ No newline at end of file +futures ; python_version < '3' From 7bb0eb51667d1794c24633a8d0d15c0efd6036c5 Mon Sep 17 00:00:00 2001 From: Kurtis Van Gent Date: Thu, 15 Mar 2018 10:42:24 -0700 Subject: [PATCH 9/9] Remove try/catch. 
--- spanner/cloud-client/batch_sample.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/spanner/cloud-client/batch_sample.py b/spanner/cloud-client/batch_sample.py index 4745fb5f1716..e54581853a90 100644 --- a/spanner/cloud-client/batch_sample.py +++ b/spanner/cloud-client/batch_sample.py @@ -66,14 +66,11 @@ def run_batch_query(instance_id, database_id): def process(snapshot, partition): """Processes the requests of a query in an separate process.""" print('Started processing partition.') - try: - row_ct = 0 - for row in snapshot.process_read_batch(partition): - print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row)) - row_ct += 1 - return time.time(), row_ct - except Exception as e: - print(e.message) + row_ct = 0 + for row in snapshot.process_read_batch(partition): + print(u'SingerId: {}, AlbumId: {}, AlbumTitle: {}'.format(*row)) + row_ct += 1 + return time.time(), row_ct # [END spanner_batch_client]