From d020caf31a26a10c6d3ac41e378b872aef3c81f8 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 26 Feb 2024 11:27:19 +0000 Subject: [PATCH 1/6] feat(spanner): add retry, timeout for batch update --- google/cloud/spanner_v1/transaction.py | 17 ++++++++++++++++- tests/unit/test_transaction.py | 25 +++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 1f5ff1098a..b02a43e8d2 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -410,7 +410,14 @@ def execute_update( return response.stats.row_count_exact - def batch_update(self, statements, request_options=None): + def batch_update( + self, + statements, + request_options=None, + *, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ): """Perform a batch of DML statements via an ``ExecuteBatchDml`` request. :type statements: @@ -431,6 +438,12 @@ def batch_update(self, statements, request_options=None): If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type retry: :class:`~google.api_core.retry.Retry` + :param retry: (Optional) The retry settings for this request. + + :type timeout: float + :param timeout: (Optional) The timeout for this request. + :rtype: Tuple(status, Sequence[int]) :returns: @@ -486,6 +499,8 @@ def batch_update(self, statements, request_options=None): api.execute_batch_dml, request=request, metadata=metadata, + retry=retry, + timeout=timeout, ) if self._transaction_id is None: diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index a673eabb83..b40ae8843f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -662,7 +662,14 @@ def test_batch_update_other_error(self): with self.assertRaises(RuntimeError): transaction.batch_update(statements=[DML_QUERY]) - def _batch_update_helper(self, error_after=None, count=0, request_options=None): + def _batch_update_helper( + self, + error_after=None, + count=0, + request_options=None, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + ): from google.rpc.status_pb2 import Status from google.protobuf.struct_pb2 import Struct from google.cloud.spanner_v1 import param_types @@ -716,7 +723,10 @@ def _batch_update_helper(self, error_after=None, count=0, request_options=None): request_options = RequestOptions(request_options) status, row_counts = transaction.batch_update( - dml_statements, request_options=request_options + dml_statements, + request_options=request_options, + retry=retry, + timeout=timeout, ) self.assertEqual(status, expected_status) @@ -753,6 +763,8 @@ def _batch_update_helper(self, error_after=None, count=0, request_options=None): ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=retry, + timeout=timeout, ) self.assertEqual(transaction._execute_sql_count, count + 1) @@ -826,6 +838,15 @@ def test_batch_update_error(self): self.assertEqual(transaction._execute_sql_count, 1) + def test_batch_update_w_timeout_param(self): + self._batch_update_helper(timeout=2.0) + + def test_batch_update_w_retry_param(self): + self._batch_update_helper(retry=gapic_v1.method.DEFAULT) + + def test_batch_update_w_timeout_and_retry_params(self): + self._batch_update_helper(retry=gapic_v1.method.DEFAULT, timeout=2.0) + def test_context_mgr_success(self): import datetime from google.cloud.spanner_v1 import CommitResponse From 154cdee7fe5bc064260fc454b8fbd4f2eff66645 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 26 Feb 2024 11:51:25 +0000 Subject: [PATCH 2/6] feat(spanner): add samples for retry, timeout --- samples/samples/snippets.py | 32 ++++++++++++++++++++++++++++++++ samples/samples/snippets_test.py | 7 +++++++ 2 files changed, 39 insertions(+) diff --git a/samples/samples/snippets.py b/samples/samples/snippets.py index 3ffd579f4a..470d2a59e3 100644 --- a/samples/samples/snippets.py +++ b/samples/samples/snippets.py @@ -2796,6 +2796,33 @@ def directed_read_options( # [END spanner_directed_read] +def set_custom_timeout_and_retry(instance_id, database_id): + """Executes a snapshot read with custom timeout and retry.""" + # [START spanner_set_custom_timeout_and_retry] + from google.api_core.retry import Retry + + # instance_id = "your-spanner-instance" + # database_id = "your-spanner-db-id" + spanner_client = spanner.Client() + instance = spanner_client.instance(instance_id) + database = instance.database(database_id) + + retry = Retry(initial=5, maximum=100, multiplier=2, timeout=60) + + # Set a custom retry and timeout setting. + with database.snapshot() as snapshot: + results = snapshot.execute_sql( + "SELECT SingerId, AlbumId, AlbumTitle FROM Albums", + retry=retry, + timeout=60, + ) + + for row in results: + print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row)) + + # [END spanner_set_custom_timeout_and_retry] + + if __name__ == "__main__": # noqa: C901 parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter @@ -2936,6 +2963,9 @@ def directed_read_options( ) enable_fine_grained_access_parser.add_argument("--title", default="condition title") subparsers.add_parser("directed_read_options", help=directed_read_options.__doc__) + subparsers.add_parser( + "set_custom_timeout_and_retry", help=set_custom_timeout_and_retry.__doc__ + ) args = parser.parse_args() @@ -3069,3 +3099,5 @@ def directed_read_options( ) elif args.command == "directed_read_options": directed_read_options(args.instance_id, args.database_id) + elif args.command == "set_custom_timeout_and_retry": + set_custom_timeout_and_retry(args.instance_id, args.database_id) diff --git a/samples/samples/snippets_test.py b/samples/samples/snippets_test.py index a49a4ee480..0a15df1d0c 100644 --- a/samples/samples/snippets_test.py +++ b/samples/samples/snippets_test.py @@ -859,3 +859,10 @@ def test_directed_read_options(capsys, instance_id, sample_database): snippets.directed_read_options(instance_id, sample_database.database_id) out, _ = capsys.readouterr() assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out + + +@pytest.mark.dependency(depends=["insert_data"]) +def test_set_custom_timeout_and_retry(capsys, instance_id, sample_database): + snippets.set_custom_timeout_and_retry(instance_id, sample_database.database_id) + out, _ = capsys.readouterr() + assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out From 8efa9a55ee94fd8b608fe71ef77aed892d58b543 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 26 Feb 2024 18:23:01 +0000 Subject: [PATCH 3/6] feat(spanner): update unittest --- tests/unit/test_spanner.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index 3663d8bdc9..0c7feed5ac 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -556,6 +556,8 @@ def test_transaction_should_include_begin_with_first_batch_update(self): ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( @@ -574,6 +576,8 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self._execute_update_helper(transaction=transaction, api=api) api.execute_sql.assert_called_once_with( @@ -715,6 +719,8 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) def test_transaction_should_use_transaction_id_returned_by_first_batch_update(self): @@ -729,6 +735,8 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self._read_helper(transaction=transaction, api=api) api.streaming_read.assert_called_once_with( @@ -797,6 +805,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self.assertEqual(api.execute_sql.call_count, 2) @@ -846,6 +856,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) api.execute_batch_dml.assert_any_call( @@ -854,6 +866,8 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), ], + retry=RETRY, + timeout=TIMEOUT, ) self.assertEqual(api.execute_sql.call_count, 1) From 74279ba52605e1ce61b6a7806b74fa9de03b3e0c Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 6 Mar 2024 09:05:36 +0000 Subject: [PATCH 4/6] feat(spanner): update comments --- samples/samples/snippets.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/samples/samples/snippets.py b/samples/samples/snippets.py index 470d2a59e3..72e3c57f3c 100644 --- a/samples/samples/snippets.py +++ b/samples/samples/snippets.py @@ -2807,13 +2807,20 @@ def set_custom_timeout_and_retry(instance_id, database_id): instance = spanner_client.instance(instance_id) database = instance.database(database_id) + # Customize retry with an initial wait time of 5 seconds. + # Customize retry with a maximum wait time of 100 seconds. + # Customize retry with a wait time multiplier per iteration of 2. + # Customize retry with a timeout on + # how long a certain RPC may be retried in case the server returns an error. retry = Retry(initial=5, maximum=100, multiplier=2, timeout=60) # Set a custom retry and timeout setting. with database.snapshot() as snapshot: results = snapshot.execute_sql( "SELECT SingerId, AlbumId, AlbumTitle FROM Albums", + # Set custom retry setting for this request retry=retry, + # Set custom timeout setting for this request timeout=60, ) From b4a155df35c054f8301385e13b7a269ace2b1289 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 6 Mar 2024 10:19:12 +0000 Subject: [PATCH 5/6] feat(spanner): update code for retry --- samples/samples/snippets.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/samples/samples/snippets.py b/samples/samples/snippets.py index c1e85c8041..7e649d3c29 100644 --- a/samples/samples/snippets.py +++ b/samples/samples/snippets.py @@ -3020,7 +3020,8 @@ def directed_read_options( def set_custom_timeout_and_retry(instance_id, database_id): """Executes a snapshot read with custom timeout and retry.""" # [START spanner_set_custom_timeout_and_retry] - from google.api_core.retry import Retry + from google.api_core import retry + from google.api_core import exceptions as core_exceptions # instance_id = "your-spanner-instance" # database_id = "your-spanner-db-id" @@ -3028,12 +3029,20 @@ def set_custom_timeout_and_retry(instance_id, database_id): instance = spanner_client.instance(instance_id) database = instance.database(database_id) - # Customize retry with an initial wait time of 5 seconds. - # Customize retry with a maximum wait time of 100 seconds. - # Customize retry with a wait time multiplier per iteration of 2. + # Customize retry with an initial wait time of 500 milliseconds. + # Customize retry with a maximum wait time of 16 seconds. + # Customize retry with a wait time multiplier per iteration of 1.5. # Customize retry with a timeout on # how long a certain RPC may be retried in case the server returns an error. - retry = Retry(initial=5, maximum=100, multiplier=2, timeout=60) + retry = retry.Retry( + initial=0.5, + maximum=16, + multiplier=1.5, + timeout=60, + predicate=retry.if_exception_type( + core_exceptions.ServiceUnavailable, + ), + ) # Set a custom retry and timeout setting. with database.snapshot() as snapshot: @@ -3041,7 +3050,7 @@ def set_custom_timeout_and_retry(instance_id, database_id): "SELECT SingerId, AlbumId, AlbumTitle FROM Albums", # Set custom retry setting for this request retry=retry, - # Set custom timeout setting for this request + # Set custom timeout of 60 seconds for this request timeout=60, ) From 66535e1db9035f1edc1b0b094427c7a4773ff8e0 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 6 Mar 2024 10:53:13 +0000 Subject: [PATCH 6/6] feat(spanner): update comment --- samples/samples/snippets.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/samples/samples/snippets.py b/samples/samples/snippets.py index 7e649d3c29..23d9d8aff1 100644 --- a/samples/samples/snippets.py +++ b/samples/samples/snippets.py @@ -3029,16 +3029,18 @@ def set_custom_timeout_and_retry(instance_id, database_id): instance = spanner_client.instance(instance_id) database = instance.database(database_id) - # Customize retry with an initial wait time of 500 milliseconds. - # Customize retry with a maximum wait time of 16 seconds. - # Customize retry with a wait time multiplier per iteration of 1.5. - # Customize retry with a timeout on - # how long a certain RPC may be retried in case the server returns an error. retry = retry.Retry( + # Customize retry with an initial wait time of 500 milliseconds. initial=0.5, + # Customize retry with a maximum wait time of 16 seconds. maximum=16, + # Customize retry with a wait time multiplier per iteration of 1.5. multiplier=1.5, + # Customize retry with a timeout on + # how long a certain RPC may be retried in + # case the server returns an error. timeout=60, + # Configure which errors should be retried. predicate=retry.if_exception_type( core_exceptions.ServiceUnavailable, ),