diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index afe6264717..772abe8261 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -48,6 +48,7 @@ from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import _metadata_with_prefix from google.cloud.spanner_v1.instance import Instance +from google.cloud.spanner_v1._helpers import AtomicCounter _CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__) EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST" @@ -147,6 +148,8 @@ class Client(ClientWithProject): SCOPE = (SPANNER_ADMIN_SCOPE,) """The scopes required for Google Cloud Spanner.""" + NTH_CLIENT = AtomicCounter() + def __init__( self, project=None, @@ -199,6 +202,12 @@ def __init__( self._route_to_leader_enabled = route_to_leader_enabled self._directed_read_options = directed_read_options self._observability_options = observability_options + self._nth_client_id = Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() @property def credentials(self): diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 88d2bb60f7..a33ca8b0fb 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -50,8 +50,10 @@ from google.cloud.spanner_v1 import SpannerClient from google.cloud.spanner_v1._helpers import _merge_query_options from google.cloud.spanner_v1._helpers import ( + AtomicCounter, _metadata_with_prefix, _metadata_with_leader_aware_routing, + _metadata_with_request_id, ) from google.cloud.spanner_v1.batch import Batch from google.cloud.spanner_v1.batch import MutationGroups @@ -149,6 +151,9 @@ class Database(object): _spanner_api: SpannerClient = None + __transport_lock = threading.Lock() + __transports_to_channel_id = dict() + def __init__( self, database_id, @@ -443,6 +448,31 
@@ def spanner_api(self): ) return self._spanner_api + @property + def _channel_id(self): + """ + Helper to retrieve the associated channelID for the spanner_api. + This property is paramount to x-goog-spanner-request-id. + """ + with self.__transport_lock: + api = self.spanner_api + channel_id = self.__transports_to_channel_id.get(api._transport, None) + if channel_id is None: + channel_id = len(self.__transports_to_channel_id) + 1 + self.__transports_to_channel_id[api._transport] = channel_id + + return channel_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + client_id = self._nth_client_id + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + def __eq__(self, other): if not isinstance(other, self.__class__): return NotImplemented @@ -698,10 +728,20 @@ def execute_partitioned_dml( _metadata_with_leader_aware_routing(self._route_to_leader_enabled) ) + # Attempt will be incremented inside _restart_on_unavailable. 
+ begin_txn_nth_request = self._next_nth_request + begin_txn_attempt = AtomicCounter(1) + partial_nth_request = self._next_nth_request + partial_attempt = AtomicCounter(0) + def execute_pdml(): with SessionCheckout(self._pool) as session: txn = api.begin_transaction( - session=session.name, options=txn_options, metadata=metadata + session=session.name, + options=txn_options, + metadata=self.metadata_with_request_id( + begin_txn_nth_request, begin_txn_attempt.value, metadata + ), ) txn_selector = TransactionSelector(id=txn.id) @@ -714,17 +754,24 @@ def execute_pdml(): query_options=query_options, request_options=request_options, ) - method = functools.partial( - api.execute_streaming_sql, - metadata=metadata, - ) + + def wrapped_method(*args, **kwargs): + partial_attempt.increment() + method = functools.partial( + api.execute_streaming_sql, + metadata=self.metadata_with_request_id( + partial_nth_request, partial_attempt.value, metadata + ), + ) + return method(*args, **kwargs) iterator = _restart_on_unavailable( - method=method, + method=wrapped_method, trace_name="CloudSpanner.ExecuteStreamingSql", request=request, transaction_selector=txn_selector, observability_options=self.observability_options, + attempt=begin_txn_attempt, ) result_set = StreamedResultSet(iterator) @@ -734,6 +781,14 @@ def execute_pdml(): return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)() + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + def session(self, labels=None, database_role=None): """Factory to create a session for this database. 
diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 03bff81b52..efae909da9 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -243,6 +243,7 @@ def bind(self, database): "CloudSpanner.FixedPool.BatchCreateSessions", observability_options=observability_options, ) as span: + attempt = 1 returned_session_count = 0 while not self._sessions.full(): request.session_count = requested_session_count - self._sessions.qsize() @@ -251,9 +252,12 @@ def bind(self, database): f"Creating {request.session_count} sessions", span_event_attributes, ) + all_metadata = database.metadata_with_request_id( + database._next_nth_request, attempt, metadata + ) resp = api.batch_create_sessions( request=request, - metadata=metadata, + metadata=all_metadata, ) add_span_event( diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index d73a8cc2b5..3495e50a08 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -193,7 +193,8 @@ def exists(self): current_span, "Checking if Session exists", {"session.id": self._session_id} ) - api = self._database.spanner_api + database = self._database + api = database.spanner_api metadata = _metadata_with_prefix(self._database.name) if self._database._route_to_leader_enabled: metadata.append( @@ -202,12 +203,16 @@ def exists(self): ) ) + all_metadata = database.metadata_with_request_id( + database._next_nth_request, 1, metadata + ) + observability_options = getattr(self._database, "observability_options", None) with trace_call( "CloudSpanner.GetSession", self,
observability_options=observability_options ) as span: try: - api.get_session(name=self.name, metadata=metadata) + api.get_session(name=self.name, metadata=all_metadata) if span: span.set_attribute("session_found", True) except NotFound: @@ -237,8 +242,11 @@ def delete(self): current_span, "Deleting Session", {"session.id": self._session_id} ) - api = self._database.spanner_api - metadata = _metadata_with_prefix(self._database.name) + database = self._database + api = database.spanner_api + metadata = database.metadata_with_request_id( + database._next_nth_request, 1, _metadata_with_prefix(database.name) + ) observability_options = getattr(self._database, "observability_options", None) with trace_call( "CloudSpanner.DeleteSession", @@ -259,7 +267,10 @@ def ping(self): if self._session_id is None: raise ValueError("Session ID not set by back-end") api = self._database.spanner_api - metadata = _metadata_with_prefix(self._database.name) + database = self._database + metadata = database.metadata_with_request_id( + database._next_nth_request, 1, _metadata_with_prefix(database.name) + ) request = ExecuteSqlRequest(session=self.name, sql="SELECT 1") api.execute_sql(request=request, metadata=metadata) self._last_use_time = datetime.now() diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 6234c96435..24e026f7e4 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -35,9 +35,11 @@ _merge_query_options, _metadata_with_prefix, _metadata_with_leader_aware_routing, + _metadata_with_request_id, _retry, _check_rst_stream_error, _SessionWrapper, + AtomicCounter, ) from google.cloud.spanner_v1._opentelemetry_tracing import trace_call from google.cloud.spanner_v1.streamed import StreamedResultSet @@ -58,6 +60,7 @@ def _restart_on_unavailable( transaction=None, transaction_selector=None, observability_options=None, + attempt=0, ): """Restart iteration after :exc:`.ServiceUnavailable`. 
@@ -92,6 +95,7 @@ def _restart_on_unavailable( ): iterator = method(request=request) while True: + attempt += 1 try: for item in iterator: item_buffer.append(item) @@ -318,13 +322,24 @@ def read( data_boost_enabled=data_boost_enabled, directed_read_options=directed_read_options, ) - restart = functools.partial( - api.streaming_read, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + + nth_request = getattr(database, "_next_nth_request", 0) + attempt = AtomicCounter(0) + + def wrapped_restart(*args, **kwargs): + attempt.increment() + all_metadata = database.metadata_with_request_id( + nth_request, attempt.value, metadata + ) + + restart = functools.partial( + api.streaming_read, + request=request, + metadata=all_metadata, + retry=retry, + timeout=timeout, + ) + return restart(*args, **kwargs) trace_attributes = {"table_id": table, "columns": columns} observability_options = getattr(database, "observability_options", None) @@ -333,7 +348,7 @@ def read( # lock is added to handle the inline begin for first rpc with self._lock: iterator = _restart_on_unavailable( - restart, + wrapped_restart, request, f"CloudSpanner.{type(self).__name__}.read", self._session, @@ -355,7 +370,7 @@ def read( ) else: iterator = _restart_on_unavailable( - restart, + wrapped_restart, request, f"CloudSpanner.{type(self).__name__}.read", self._session, @@ -534,13 +549,23 @@ def execute_sql( data_boost_enabled=data_boost_enabled, directed_read_options=directed_read_options, ) - restart = functools.partial( - api.execute_streaming_sql, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + + nth_request = getattr(database, "_next_nth_request", 0) + attempt = AtomicCounter(0) + + def wrapped_restart(*args, **kwargs): + attempt.increment() + restart = functools.partial( + api.execute_streaming_sql, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + retry=retry, + timeout=timeout, + ) + 
+ return restart(*args, **kwargs) trace_attributes = {"db.statement": sql} observability_options = getattr(database, "observability_options", None) @@ -549,7 +574,7 @@ def execute_sql( # lock is added to handle the inline begin for first rpc with self._lock: return self._get_streamed_result_set( - restart, + wrapped_restart, request, trace_attributes, column_info, @@ -558,7 +583,7 @@ def execute_sql( ) else: return self._get_streamed_result_set( - restart, + wrapped_restart, request, trace_attributes, column_info, @@ -681,15 +706,25 @@ def partition_read( trace_attributes, observability_options=getattr(database, "observability_options", None), ): - method = functools.partial( - api.partition_read, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = getattr(database, "_next_nth_request", 0) + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + all_metadata = database.metadata_with_request_id( + nth_request, attempt.value, metadata + ) + method = functools.partial( + api.partition_read, + request=request, + metadata=all_metadata, + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) + response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) @@ -784,15 +819,25 @@ def partition_query( trace_attributes, observability_options=getattr(database, "observability_options", None), ): - method = functools.partial( - api.partition_query, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = getattr(database, "_next_nth_request", 0) + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + all_metadata = database.metadata_with_request_id( + nth_request, attempt.value, metadata + ) + method = functools.partial( + api.partition_query, + request=request, + metadata=all_metadata, + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) + 
response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) @@ -930,14 +975,24 @@ def begin(self): self._session, observability_options=getattr(database, "observability_options", None), ): - method = functools.partial( - api.begin_transaction, - session=self._session.name, - options=txn_selector.begin, - metadata=metadata, - ) + nth_request = getattr(database, "_next_nth_request", 0) + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + all_metadata = database.metadata_with_request_id( + nth_request, attempt.value, metadata + ) + method = functools.partial( + api.begin_transaction, + session=self._session.name, + options=txn_selector.begin, + metadata=all_metadata, + ) + return method(*args, **kwargs) + response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) self._transaction_id = response.id diff --git a/google/cloud/spanner_v1/testing/database_test.py b/google/cloud/spanner_v1/testing/database_test.py index 54afda11e0..80f040d7e0 100644 --- a/google/cloud/spanner_v1/testing/database_test.py +++ b/google/cloud/spanner_v1/testing/database_test.py @@ -25,6 +25,7 @@ from google.cloud.spanner_v1.testing.interceptors import ( MethodCountInterceptor, MethodAbortInterceptor, + XGoogRequestIDHeaderInterceptor, ) @@ -34,6 +35,8 @@ class TestDatabase(Database): currently, and we don't want to make changes in the Database class for testing purpose as this is a hack to use interceptors in tests.""" + _interceptors = [] + def __init__( self, database_id, @@ -74,6 +77,8 @@ def spanner_api(self): client_options = client._client_options if self._instance.emulator_host is not None: channel = grpc.insecure_channel(self._instance.emulator_host) + self._x_goog_request_id_interceptor = XGoogRequestIDHeaderInterceptor() + self._interceptors.append(self._x_goog_request_id_interceptor) channel = grpc.intercept_channel(channel, 
*self._interceptors) transport = SpannerGrpcTransport(channel=channel) self._spanner_api = SpannerClient( diff --git a/google/cloud/spanner_v1/testing/interceptors.py b/google/cloud/spanner_v1/testing/interceptors.py index a8b015a87d..918c7d76b9 100644 --- a/google/cloud/spanner_v1/testing/interceptors.py +++ b/google/cloud/spanner_v1/testing/interceptors.py @@ -13,6 +13,8 @@ # limitations under the License. from collections import defaultdict +import threading + from grpc_interceptor import ClientInterceptor from google.api_core.exceptions import Aborted @@ -63,3 +65,68 @@ def reset(self): self._method_to_abort = None self._count = 0 self._connection = None + + +X_GOOG_REQUEST_ID = "x-goog-spanner-request-id" + + +class XGoogRequestIDHeaderInterceptor(ClientInterceptor): + def __init__(self): + self._unary_req_segments = [] + self._stream_req_segments = [] + self.__lock = threading.Lock() + + def intercept(self, method, request_or_iterator, call_details): + metadata = call_details.metadata + x_goog_request_id = None + for key, value in metadata: + if key == X_GOOG_REQUEST_ID: + x_goog_request_id = value + break + + if not x_goog_request_id: + raise Exception( + f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}" + ) + + response_or_iterator = method(request_or_iterator, call_details) + streaming = getattr(response_or_iterator, "__iter__", None) is not None + with self.__lock: + if streaming: + self._stream_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) + else: + self._unary_req_segments.append( + (call_details.method, parse_request_id(x_goog_request_id)) + ) + + return response_or_iterator + + @property + def unary_request_ids(self): + return self._unary_req_segments + + @property + def stream_request_ids(self): + return self._stream_req_segments + + def reset(self): + self._stream_req_segments.clear() +
 self._unary_req_segments.clear() + + +def parse_request_id(request_id_str): + splits = request_id_str.split(".") + version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list( + map(int, splits) + ) + return ( + version, + rand_process_id, + client_id, + channel_id, + nth_request, + nth_attempt, + ) diff --git a/google/cloud/spanner_v1/testing/mock_spanner.py b/google/cloud/spanner_v1/testing/mock_spanner.py index 1f37ff2a03..e093fc7387 100644 --- a/google/cloud/spanner_v1/testing/mock_spanner.py +++ b/google/cloud/spanner_v1/testing/mock_spanner.py @@ -187,7 +187,7 @@ def __create_transaction( def Commit(self, request, context): self._requests.append(request) self.mock_spanner.pop_error(context) - tx = self.transactions[request.transaction_id] + tx = self.transactions.get(request.transaction_id, None) if tx is None: raise ValueError(f"Transaction not found: {request.transaction_id}") del self.transactions[request.transaction_id] diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index a8aef7f470..c94919b210 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -30,6 +30,7 @@ from google.cloud.spanner_v1 import ExecuteSqlRequest from google.cloud.spanner_v1 import TransactionSelector from google.cloud.spanner_v1 import TransactionOptions +from google.cloud.spanner_v1._helpers import AtomicCounter from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase from google.cloud.spanner_v1._opentelemetry_tracing import add_span_event, trace_call @@ -161,12 +162,20 @@ def begin(self): self._session, observability_options=observability_options, ) as span: - method = functools.partial( - api.begin_transaction, - session=self._session.name, - options=txn_options, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args,
**kwargs): + attempt.increment() + method = functools.partial( + api.begin_transaction, + session=self._session.name, + options=txn_options, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) def beforeNextRetry(nthRetry, delayInSeconds): add_span_event( @@ -176,7 +185,7 @@ def beforeNextRetry(nthRetry, delayInSeconds): ) response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, beforeNextRetry=beforeNextRetry, ) @@ -197,22 +206,33 @@ def rollback(self): database._route_to_leader_enabled ) ) + observability_options = getattr(database, "observability_options", None) with trace_call( f"CloudSpanner.{type(self).__name__}.rollback", self._session, observability_options=observability_options, ): - method = functools.partial( - api.rollback, - session=self._session.name, - transaction_id=self._transaction_id, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.rollback, + session=self._session.name, + transaction_id=self._transaction_id, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return method(*args, **kwargs) + _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, ) + self.rolled_back = True del self._session._transaction @@ -285,11 +305,19 @@ def commit( ) as span: add_span_event(span, "Starting Commit") - method = functools.partial( - api.commit, - request=request, - metadata=metadata, - ) + attempt = AtomicCounter(0) + nth_request = database._next_nth_request + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.commit, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + ) + return 
method(*args, **kwargs) def beforeNextRetry(nthRetry, delayInSeconds): add_span_event( @@ -299,7 +327,7 @@ def beforeNextRetry(nthRetry, delayInSeconds): ) response = _retry( - method, + wrapped_method, allowed_exceptions={InternalServerError: _check_rst_stream_error}, beforeNextRetry=beforeNextRetry, ) @@ -433,19 +461,27 @@ def execute_update( request_options=request_options, ) - method = functools.partial( - api.execute_sql, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = database._next_nth_request + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.execute_sql, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) if self._transaction_id is None: # lock is added to handle the inline begin for first rpc with self._lock: response = self._execute_request( - method, + wrapped_method, request, f"CloudSpanner.{type(self).__name__}.execute_update", self._session, @@ -462,7 +498,7 @@ def execute_update( self._transaction_id = response.metadata.transaction.id else: response = self._execute_request( - method, + wrapped_method, request, f"CloudSpanner.{type(self).__name__}.execute_update", self._session, @@ -558,19 +594,27 @@ def batch_update( request_options=request_options, ) - method = functools.partial( - api.execute_batch_dml, - request=request, - metadata=metadata, - retry=retry, - timeout=timeout, - ) + nth_request = database._next_nth_request + attempt = AtomicCounter(0) + + def wrapped_method(*args, **kwargs): + attempt.increment() + method = functools.partial( + api.execute_batch_dml, + request=request, + metadata=database.metadata_with_request_id( + nth_request, attempt.value, metadata + ), + retry=retry, + timeout=timeout, + ) + return method(*args, **kwargs) if self._transaction_id is None: # lock is added to handle 
the inline begin for first rpc with self._lock: response = self._execute_request( - method, + wrapped_method, request, "CloudSpanner.DMLTransaction", self._session, @@ -588,7 +632,7 @@ def batch_update( break else: response = self._execute_request( - method, + wrapped_method, request, "CloudSpanner.DMLTransaction", self._session, diff --git a/tests/mockserver_tests/mock_server_test_base.py b/tests/mockserver_tests/mock_server_test_base.py index 12c98bc51b..3e4071fe6e 100644 --- a/tests/mockserver_tests/mock_server_test_base.py +++ b/tests/mockserver_tests/mock_server_test_base.py @@ -165,6 +165,8 @@ def instance(self) -> Instance: def database(self) -> Database: if self._database is None: self._database = self.instance.database( - "test-database", pool=FixedSizePool(size=10) + "test-database", + pool=FixedSizePool(size=10), + enable_interceptors_in_tests=True, ) return self._database diff --git a/tests/unit/test_atomic_counter.py b/tests/unit/test_atomic_counter.py index 92d10cac79..d1b22cf841 100644 --- a/tests/unit/test_atomic_counter.py +++ b/tests/unit/test_atomic_counter.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import time import random import threading import unittest diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py index 6e29255fb7..fb1850609f 100644 --- a/tests/unit/test_database.py +++ b/tests/unit/test_database.py @@ -26,6 +26,11 @@ from google.protobuf.field_mask_pb2 import FieldMask from google.cloud.spanner_v1 import RequestOptions, DirectedReadOptions +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID DML_WO_PARAM = """ DELETE FROM citizens @@ -111,7 +116,9 @@ def _make_database_admin_api(): def _make_spanner_api(): from google.cloud.spanner_v1 import SpannerClient - return mock.create_autospec(SpannerClient, instance=True) + api = mock.create_autospec(SpannerClient, instance=True) + api._transport = "transport" + return api def test_ctor_defaults(self): from google.cloud.spanner_v1.pool import BurstyPool @@ -545,7 +552,9 @@ def test_create_grpc_error(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_create_already_exists(self): @@ -572,7 +581,9 @@ def test_create_already_exists(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_create_instance_not_found(self): @@ -598,7 +609,9 @@ def test_create_instance_not_found(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_create_success(self): @@ -634,7 +647,9 @@ def test_create_success(self): api.create_database.assert_called_once_with( request=expected_request, - 
metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_create_success_w_encryption_config_dict(self): @@ -671,7 +686,9 @@ def test_create_success_w_encryption_config_dict(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_create_success_w_proto_descriptors(self): @@ -706,7 +723,9 @@ def test_create_success_w_proto_descriptors(self): api.create_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_exists_grpc_error(self): @@ -724,7 +743,9 @@ def test_exists_grpc_error(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_exists_not_found(self): @@ -741,7 +762,9 @@ def test_exists_not_found(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_exists_success(self): @@ -760,7 +783,9 @@ def test_exists_success(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_reload_grpc_error(self): @@ -778,7 +803,9 @@ def test_reload_grpc_error(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def 
test_reload_not_found(self): @@ -796,7 +823,9 @@ def test_reload_not_found(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_reload_success(self): @@ -855,11 +884,15 @@ def test_reload_success(self): api.get_database_ddl.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) api.get_database.assert_called_once_with( name=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_update_ddl_grpc_error(self): @@ -885,7 +918,9 @@ def test_update_ddl_grpc_error(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_update_ddl_not_found(self): @@ -911,7 +946,9 @@ def test_update_ddl_not_found(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_update_ddl(self): @@ -938,7 +975,9 @@ def test_update_ddl(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_update_ddl_w_operation_id(self): @@ -965,7 +1004,9 @@ def test_update_ddl_w_operation_id(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def 
test_update_success(self): @@ -991,7 +1032,9 @@ def test_update_success(self): api.update_database.assert_called_once_with( database=expected_database, update_mask=field_mask, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_update_ddl_w_proto_descriptors(self): @@ -1019,7 +1062,9 @@ def test_update_ddl_w_proto_descriptors(self): api.update_database_ddl.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_drop_grpc_error(self): @@ -1037,7 +1082,9 @@ def test_drop_grpc_error(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_drop_not_found(self): @@ -1055,7 +1102,9 @@ def test_drop_not_found(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_drop_success(self): @@ -1072,7 +1121,9 @@ def test_drop_success(self): api.drop_database.assert_called_once_with( database=self.DATABASE_NAME, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def _execute_partitioned_dml_helper( @@ -1151,6 +1202,10 @@ def _execute_partitioned_dml_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) if retried: @@ -1192,6 +1247,10 @@ def _execute_partitioned_dml_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + 
"x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) if retried: @@ -1486,7 +1545,9 @@ def test_restore_grpc_error(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_restore_not_found(self): @@ -1512,7 +1573,9 @@ def test_restore_not_found(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_restore_success(self): @@ -1549,7 +1612,9 @@ def test_restore_success(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_restore_success_w_encryption_config_dict(self): @@ -1590,7 +1655,9 @@ def test_restore_success_w_encryption_config_dict(self): api.restore_database.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_restore_w_invalid_encryption_config_dict(self): @@ -1737,7 +1804,9 @@ def test_list_database_roles_grpc_error(self): api.list_database_roles.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) def test_list_database_roles_defaults(self): @@ -1758,7 +1827,9 @@ def test_list_database_roles_defaults(self): api.list_database_roles.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ], ) self.assertIsNotNone(resp) @@ -1845,6 
+1916,10 @@ def test_context_mgr_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1892,6 +1967,10 @@ def test_context_mgr_w_commit_stats_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -1936,6 +2015,10 @@ def test_context_mgr_w_commit_stats_error(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -3010,6 +3093,10 @@ def test_context_mgr_success(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -3108,6 +3195,8 @@ def _make_database_admin_api(): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__( self, project=TestDatabase.PROJECT_ID, @@ -3126,6 +3215,12 @@ def __init__( self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.route_to_leader_enabled = route_to_leader_enabled self.directed_read_options = directed_read_options + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -3154,6 +3249,28 @@ def __init__(self, name, instance=None): self.logger = mock.create_autospec(Logger, instance=True) self._directed_read_options = None + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return 
self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + client_id = self._nth_client_id + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Pool(object): _bound = None diff --git a/tests/unit/test_request_id_header.py b/tests/unit/test_request_id_header.py new file mode 100644 index 0000000000..35c7de7410 --- /dev/null +++ b/tests/unit/test_request_id_header.py @@ -0,0 +1,387 @@ +# Copyright 2024 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import random +import threading, time + +from tests.mockserver_tests.mock_server_test_base import ( + MockServerTestBase, + add_select1_result, +) +from google.cloud.spanner_v1.testing.interceptors import XGoogRequestIDHeaderInterceptor +from google.cloud.spanner_v1 import ( + BatchCreateSessionsRequest, + BeginTransactionRequest, + ExecuteSqlRequest, +) +from google.api_core.exceptions import Aborted +from google.rpc import code_pb2 +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID + + +class TestRequestIDHeader(MockServerTestBase): + def tearDown(self): + self.database._x_goog_request_id_interceptor.reset() + + def test_snapshot_read(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(2, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + + # Now ensure monotonicity of the received request-id segments.
+ got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, 1, 1, 1, 1), + ) + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, 1, 1, 2, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_snapshot_read_concurrent(self): + def select1(): + with self.database.snapshot() as snapshot: + rows = snapshot.execute_sql("select 1") + res_list = [] + for row in rows: + self.assertEqual(1, row[0]) + res_list.append(row) + self.assertEqual(1, len(res_list)) + + n = 10 + threads = [] + for i in range(n): + th = threading.Thread(target=select1, name=f"snapshot-select1-{i}") + th.start() + threads.append(th) + + random.shuffle(threads) + + while True: + n_finished = 0 + for thread in threads: + if thread.is_alive(): + thread.join() + else: + n_finished += 1 + + if n_finished == len(threads): + break + + time.sleep(1) + + requests = self.spanner_service.requests + self.assertEqual(n * 2, len(requests), msg=requests) + + client_id = self.database._nth_client_id + channel_id = self.database._channel_id + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 3, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 5, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 7, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 9, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1,
REQ_RAND_PROCESS_ID, client_id, channel_id, 11, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 13, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 15, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 17, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 19, 1), + ), + ] + assert got_unary_segments == want_unary_segments + + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 4, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 6, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 8, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 10, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 12, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 14, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 16, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 18, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 20, 1), + ), + ] + assert got_stream_segments == want_stream_segments + + def test_database_run_in_transaction_retries_on_abort(self): + counters = dict(aborted=0) + want_failed_attempts = 2 + + def select_in_txn(txn): + results = 
txn.execute_sql("select 1") + for row in results: + _ = row + + if counters["aborted"] < want_failed_attempts: + counters["aborted"] += 1 + raise Aborted( + "Thrown from ClientInterceptor for testing", + errors=[FauxCall(code_pb2.ABORTED)], + ) + + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + + self.database.run_in_transaction(select_in_txn) + + def test_database_execute_partitioned_dml_request_id(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + _ = self.database.execute_partitioned_dml("select 1") + + requests = self.spanner_service.requests + self.assertEqual(3, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], BeginTransactionRequest)) + self.assertTrue(isinstance(requests[2], ExecuteSqlRequest)) + + # Now ensure monotonicity of the received request-id segments. 
+ got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, 1, 1, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/BeginTransaction", + (1, REQ_RAND_PROCESS_ID, 1, 1, 2, 1), + ), + ] + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, 1, 1, 3, 1), + ) + ] + + assert got_unary_segments == want_unary_segments + assert got_stream_segments == want_stream_segments + + def test_snapshot_read(self): + add_select1_result() + if not getattr(self.database, "_interceptors", None): + self.database._interceptors = MockServerTestBase._interceptors + with self.database.snapshot() as snapshot: + results = snapshot.read("select 1") + result_list = [] + for row in results: + result_list.append(row) + self.assertEqual(1, row[0]) + self.assertEqual(1, len(result_list)) + + requests = self.spanner_service.requests + self.assertEqual(2, len(requests), msg=requests) + self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest)) + self.assertTrue(isinstance(requests[1], ExecuteSqlRequest)) + + requests = self.spanner_service.requests + self.assertEqual(n * 2, len(requests), msg=requests) + + client_id = self.database._nth_client_id + channel_id = self.database._channel_id + got_stream_segments, got_unary_segments = self.canonicalize_request_id_headers() + + want_unary_segments = [ + ( + "/google.spanner.v1.Spanner/BatchCreateSessions", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 1, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 3, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 5, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 7, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, 
client_id, channel_id, 9, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 11, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 13, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 15, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 17, 1), + ), + ( + "/google.spanner.v1.Spanner/GetSession", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 19, 1), + ), + ] + assert got_unary_segments == want_unary_segments + + want_stream_segments = [ + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 2, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 4, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 6, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 8, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 10, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 12, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 14, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 16, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 18, 1), + ), + ( + "/google.spanner.v1.Spanner/ExecuteStreamingSql", + (1, REQ_RAND_PROCESS_ID, client_id, channel_id, 20, 1), + ), + ] + assert got_stream_segments == want_stream_segments + + def canonicalize_request_id_headers(self): + src = 
self.database._x_goog_request_id_interceptor + return src._stream_req_segments, src._unary_req_segments + + +class FauxCall: + def __init__(self, code, details="FauxCall"): + self._code = code + self._details = details + + def initial_metadata(self): + return {} + + def trailing_metadata(self): + return {} + + def code(self): + return self._code + + def details(self): + return self._details diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index a4446a0d1e..37258bcd8c 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -25,6 +25,11 @@ ) from google.cloud.spanner_v1.param_types import INT64 from google.api_core.retry import Retry +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -278,7 +283,7 @@ def test_iteration_w_raw_raising_retryable_internal_error(self): fail_after=True, error=InternalServerError( "Received unexpected EOS on DATA frame from server" - ) + ), ) after = _MockIterator(*LAST) request = mock.Mock(test="test", spec=["test", "resume_token"]) @@ -450,7 +455,7 @@ def test_iteration_w_raw_raising_retryable_internal_error_after_token(self): fail_after=True, error=InternalServerError( "Received unexpected EOS on DATA frame from server" - ) + ), ) after = _MockIterator(*SECOND) request = mock.Mock(test="test", spec=["test", "resume_token"]) @@ -767,7 +772,13 @@ def _read_helper( ) api.streaming_read.assert_called_once_with( request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], retry=retry, timeout=timeout, ) @@ -1016,7 +1027,13 @@ def _execute_sql_helper( ) api.execute_streaming_sql.assert_called_once_with( 
request=expected_request, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], timeout=timeout, retry=retry, ) @@ -1189,6 +1206,10 @@ def _partition_read_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -1363,6 +1384,10 @@ def _partition_query_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -1751,7 +1776,13 @@ def test_begin_ok_exact_staleness(self): api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], ) self.assertSpanAttributes( @@ -1787,7 +1818,13 @@ def test_begin_ok_exact_strong(self): api.begin_transaction.assert_called_once_with( session=session.name, options=expected_txn_options, - metadata=[("google-cloud-resource-prefix", database.name)], + metadata=[ + ("google-cloud-resource-prefix", database.name), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), + ], ) self.assertSpanAttributes( @@ -1798,10 +1835,18 @@ def test_begin_ok_exact_strong(self): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") + 
self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -1816,6 +1861,28 @@ def __init__(self, directed_read_options=None): self._route_to_leader_enabled = True self._directed_read_options = directed_read_options + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + client_id = self._nth_client_id + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): def __init__(self, database=None, name=TestSnapshot.SESSION_NAME): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index ff34a109af..fdf477279b 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -37,9 +37,12 @@ from google.cloud.spanner_v1.keyset import KeySet from google.cloud.spanner_v1._helpers import ( + AtomicCounter, _make_value_pb, _merge_query_options, + _metadata_with_request_id, ) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID import mock @@ -517,6 +520,10 @@ def test_transaction_should_include_begin_with_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -532,6 +539,10 @@ def test_transaction_should_include_begin_with_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], 
timeout=TIMEOUT, retry=RETRY, @@ -549,6 +560,10 @@ def test_transaction_should_include_begin_with_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -565,6 +580,10 @@ def test_transaction_should_include_begin_with_first_batch_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -590,6 +609,10 @@ def test_transaction_should_include_begin_w_exclude_txn_from_change_streams_with metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -608,6 +631,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -622,6 +649,10 @@ def test_transaction_should_use_transaction_id_if_error_with_first_batch_update( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -638,6 +669,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -651,6 +686,10 @@ def 
test_transaction_should_use_transaction_id_returned_by_first_query(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -667,6 +706,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -680,6 +723,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_update(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -701,6 +748,10 @@ def test_transaction_execute_sql_w_directed_read_options(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, @@ -724,6 +775,10 @@ def test_transaction_streaming_read_w_directed_read_options(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -740,6 +795,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -751,6 +810,10 @@ def 
test_transaction_should_use_transaction_id_returned_by_first_read(self): metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -767,6 +830,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -779,6 +846,10 @@ def test_transaction_should_use_transaction_id_returned_by_first_batch_update(se metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -819,6 +890,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -829,6 +904,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -837,6 +916,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -880,6 +963,10 @@ def 
test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -888,6 +975,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -898,6 +989,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -946,6 +1041,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -954,6 +1053,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -964,6 +1067,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1012,6 +1119,10 @@ 
def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) req = self._execute_sql_expected_request(database) @@ -1020,6 +1131,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1030,6 +1145,10 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=RETRY, timeout=TIMEOUT, @@ -1055,11 +1174,19 @@ def test_transaction_should_execute_sql_with_route_to_leader_disabled(self): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.directed_read_options = None + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -1074,6 +1201,28 @@ def __init__(self): self._route_to_leader_enabled = True self._directed_read_options = None + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + client_id = self._nth_client_id + return 
_metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): _transaction = None diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index d3d7035854..b0886d664f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -20,6 +20,11 @@ from google.cloud.spanner_v1 import TypeCode from google.api_core.retry import Retry from google.api_core import gapic_v1 +from google.cloud.spanner_v1._helpers import ( + AtomicCounter, + _metadata_with_request_id, +) +from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID from tests._helpers import ( OpenTelemetryBase, @@ -191,6 +196,10 @@ def test_begin_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -295,6 +304,10 @@ def test_rollback_ok(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -420,6 +433,10 @@ def _commit_helper( [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) self.assertEqual(actual_request_options, expected_request_options) @@ -582,6 +599,10 @@ def _execute_update_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], ) @@ -768,6 +789,10 @@ def _batch_update_helper( metadata=[ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + 
"x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1", + ), ], retry=retry, timeout=timeout, @@ -883,6 +908,10 @@ def test_context_mgr_success(self): [ ("google-cloud-resource-prefix", database.name), ("x-goog-spanner-route-to-leader", "true"), + ( + "x-goog-spanner-request-id", + f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1", + ), ], ) @@ -913,11 +942,19 @@ def test_context_mgr_failure(self): class _Client(object): + NTH_CLIENT = AtomicCounter() + def __init__(self): from google.cloud.spanner_v1 import ExecuteSqlRequest self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1") self.directed_read_options = None + self._nth_client_id = _Client.NTH_CLIENT.increment() + self._nth_request = AtomicCounter() + + @property + def _next_nth_request(self): + return self._nth_request.increment() class _Instance(object): @@ -932,6 +969,28 @@ def __init__(self): self._route_to_leader_enabled = True self._directed_read_options = None + @property + def _next_nth_request(self): + return self._instance._client._next_nth_request + + @property + def _nth_client_id(self): + return self._instance._client._nth_client_id + + def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]): + client_id = self._nth_client_id + return _metadata_with_request_id( + self._nth_client_id, + self._channel_id, + nth_request, + nth_attempt, + prior_metadata, + ) + + @property + def _channel_id(self): + return 1 + class _Session(object): _transaction = None