diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 0c08ab49cf..a3d9c874c7 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -113,3 +113,8 @@ def get_current_span(): if not HAS_OPENTELEMETRY_INSTALLED: return None return trace.get_current_span() + + +def add_span_event(span, commentary, event_attributes=None): + if span: + span.add_event(commentary, event_attributes) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 2c5e9968f8..798107c21a 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -67,7 +67,10 @@ SpannerGrpcTransport, ) from google.cloud.spanner_v1.table import Table -from google.cloud.spanner_v1._opentelemetry_tracing import get_current_span +from google.cloud.spanner_v1._opentelemetry_tracing import ( + add_span_event, + get_current_span, +) SPANNER_DATA_SCOPE = "https://www.googleapis.com/auth/spanner.data" @@ -1167,8 +1170,7 @@ def __enter__(self): """Begin ``with`` block.""" current_span = get_current_span() session = self._session = self._database._pool.get() - if current_span: - current_span.add_event("Using session", {"id": session.session_id}) + add_span_event(current_span, "Using session", {"id": session.session_id}) batch = self._batch = Batch(session) if self._request_options.transaction_tag: batch.transaction_tag = self._request_options.transaction_tag @@ -1192,10 +1194,11 @@ def __exit__(self, exc_type, exc_val, exc_tb): ) self._database._pool.put(self._session) current_span = get_current_span() - if current_span: - current_span.add_event( - "Returned session to pool", {"id": self._session.session_id} - ) + add_span_event( + current_span, + "Returned session to pool", + {"id": self._session.session_id}, + ) class MutationGroupsCheckout(object): diff --git a/google/cloud/spanner_v1/pool.py b/google/cloud/spanner_v1/pool.py index 550ad1c2f2..2cd5d39ced 100644 --- a/google/cloud/spanner_v1/pool.py +++ b/google/cloud/spanner_v1/pool.py @@ -26,6 +26,7 @@ _metadata_with_leader_aware_routing, ) from google.cloud.spanner_v1._opentelemetry_tracing import ( + add_span_event, get_current_span, trace_call, ) @@ -214,27 +215,27 @@ def bind(self, database): session_template=Session(creator_role=self.database_role), ) - if requested_session_count > 0 and span: - span.add_event( + if requested_session_count > 0: + add_span_event( + span, f"Requesting {requested_session_count} sessions", span_event_attributes, ) if self._sessions.full(): - if span: - span.add_event( - "Session pool is already full", span_event_attributes - ) + add_span_event( + span, "Session pool is already full", span_event_attributes + ) return returned_session_count = 0 while not self._sessions.full(): request.session_count = requested_session_count - self._sessions.qsize() - if span: - span.add_event( - f"Creating {request.session_count} sessions", - span_event_attributes, - ) + add_span_event( + span, + f"Creating {request.session_count} sessions", + span_event_attributes, + ) resp = api.batch_create_sessions( request=request, metadata=metadata, @@ -245,11 +246,11 @@ def bind(self, database): self._sessions.put(session) returned_session_count += 1 - if span: - span.add_event( - f"Requested for {requested_session_count} sessions, returned {returned_session_count}", - span_event_attributes, - ) + add_span_event( + span, + f"Requested for {requested_session_count} sessions, returned {returned_session_count}", + span_event_attributes, + ) def get(self, timeout=None): """Check a session out from the pool. @@ -268,35 +269,34 @@ def get(self, timeout=None): start_time = time.time() current_span = get_current_span() span_event_attributes = {"kind": type(self).__name__} - if current_span: - current_span.add_event("Acquiring session", span_event_attributes) + add_span_event(current_span, "Acquiring session", span_event_attributes) session = None try: - if current_span: - current_span.add_event( - "Waiting for a session to become available", span_event_attributes - ) + add_span_event( + current_span, + "Waiting for a session to become available", + span_event_attributes, + ) session = self._sessions.get(block=True, timeout=timeout) except queue.Empty as e: - if current_span: - current_span.add_event("No session available", span_event_attributes) + add_span_event(current_span, "No session available", span_event_attributes) raise e span_event_attributes["session.id"] = session._session_id if not session.exists(): - if current_span: - current_span.add_event( - "Session is not valid, recreating it", span_event_attributes - ) + add_span_event( + current_span, + "Session is not valid, recreating it", + span_event_attributes, + ) session = self._database.session() session.create() span_event_attributes["session.id"] = session._session_id span_event_attributes["time.elapsed"] = time.time() - start_time - if current_span: - current_span.add_event("Acquired session", span_event_attributes) + add_span_event(current_span, "Acquired session", span_event_attributes) return session def put(self, session): @@ -371,28 +371,30 @@ def get(self): """ current_span = get_current_span() span_event_attributes = {"kind": type(self).__name__} - if current_span: - current_span.add_event("Acquiring session", span_event_attributes) + add_span_event(current_span, "Acquiring session", span_event_attributes) try: - if current_span: - current_span.add_event( - "Waiting for a session to become available", span_event_attributes - ) + add_span_event( + current_span, + "Waiting for a session to become available", + span_event_attributes, + ) session = self._sessions.get_nowait() except queue.Empty: - if current_span: - current_span.add_event( - "No session available. Creating session", span_event_attributes - ) + add_span_event( + current_span, + "No session available. Creating session", + span_event_attributes, + ) session = self._new_session() session.create() else: if not session.exists(): - if current_span: - current_span.add_event( - "Session is not valid, recreating it", span_event_attributes - ) + add_span_event( + current_span, + "Session is not valid, recreating it", + span_event_attributes, + ) session = self._new_session() session.create() return session @@ -504,23 +506,25 @@ def bind(self, database): requested_session_count = request.session_count current_span = get_current_span() - if current_span: - current_span.add_event( - f"Requesting {requested_session_count} sessions", span_event_attributes - ) + add_span_event( + current_span, + f"Requesting {requested_session_count} sessions", + span_event_attributes, + ) if created_session_count >= self.size: - if current_span: - current_span.add_event( - "Created no new sessions as sessionPool is full", - span_event_attributes, - ) + add_span_event( + current_span, + "Created no new sessions as sessionPool is full", + span_event_attributes, + ) return - if current_span: - current_span.add_event( - f"Creating {request.session_count} sessions", span_event_attributes - ) + add_span_event( + current_span, + f"Creating {request.session_count} sessions", + span_event_attributes, + ) returned_session_count = 0 while created_session_count < self.size: @@ -536,11 +540,11 @@ def bind(self, database): created_session_count += len(resp.session) - if current_span: - current_span.add_event( - f"Requested for {requested_session_count} sessions, return {returned_session_count}", - span_event_attributes, - ) + add_span_event( + current_span, + f"Requested for {requested_session_count} sessions, return {returned_session_count}", + span_event_attributes, + ) def get(self, timeout=None): """Check a session out from the pool. @@ -559,18 +563,18 @@ def get(self, timeout=None): start_time = time.time() span_event_attributes = {"kind": type(self).__name__} current_span = get_current_span() - if current_span: - current_span.add_event( - "Waiting for a session to become available", span_event_attributes - ) + add_span_event( + current_span, + "Waiting for a session to become available", + span_event_attributes, + ) ping_after = None session = None try: ping_after, session = self._sessions.get(block=True, timeout=timeout) except queue.Empty as e: - if current_span: - current_span.add_event("No session available", span_event_attributes) + add_span_event(current_span, "No session available", span_event_attributes) raise e if _NOW() > ping_after: @@ -581,15 +585,14 @@ def get(self, timeout=None): session = self._new_session() session.create() - if current_span: - span_event_attributes.update( - { - "time.elapsed": time.time() - start_time, - "session.id": session._session_id, - "kind": "pinging_pool", - } - ) - current_span.add_event("Acquired session", span_event_attributes) + span_event_attributes.update( + { + "time.elapsed": time.time() - start_time, + "session.id": session._session_id, + "kind": "pinging_pool", + } + ) + add_span_event(current_span, "Acquired session", span_event_attributes) return session def put(self, session): diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index cc41f47281..45460df5c8 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -31,6 +31,7 @@ _metadata_with_leader_aware_routing, ) from google.cloud.spanner_v1._opentelemetry_tracing import ( + add_span_event, get_current_span, trace_call, ) @@ -128,8 +129,7 @@ def create(self): :raises ValueError: if :attr:`session_id` is already set. """ current_span = get_current_span() - if current_span: - current_span.add_event("Creating Session") + add_span_event(current_span, "Creating Session") if self._session_id is not None: raise ValueError("Session ID already set by back-end") @@ -173,13 +173,14 @@ def exists(self): """ current_span = get_current_span() if self._session_id is None: - current_span.add_event("Checking if Session failed due to unset session_id") + add_span_event( + current_span, "Checking if Session failed due to unset session_id" + ) return False - if current_span: - current_span.add_event( - "Checking if Session exists", {"session.id": self._session_id} - ) + add_span_event( + current_span, "Checking if Session exists", {"session.id": self._session_id} + ) api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) @@ -216,14 +217,14 @@ def delete(self): """ current_span = get_current_span() if self._session_id is None: - if current_span: - current_span.add_event( - "Deleting Session failed due to unset session_id" - ) + add_span_event( + current_span, "Deleting Session failed due to unset session_id" + ) raise ValueError("Session ID not set by back-end") - if current_span: - current_span.add_event("Deleting Session", {"session.id": self._session_id}) + add_span_event( + current_span, "Deleting Session", {"session.id": self._session_id} + ) api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index c9a8a16a40..a02776b27c 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -538,7 +538,7 @@ def _get_streamed_result_set( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.execute_sql", + "CloudSpanner.ReadWriteTransaction", self._session, trace_attributes, transaction=self, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index bc58563283..6bc7f254fb 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -32,7 +32,7 @@ from google.cloud.spanner_v1 import TransactionOptions from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import add_span_event, trace_call from google.cloud.spanner_v1 import RequestOptions from google.api_core import gapic_v1 from google.api_core.exceptions import InternalServerError @@ -169,11 +169,11 @@ def begin(self): ) def beforeNextRetry(nthRetry, delayInSeconds): - if span: - span.add_event( - "Transaction Begin Attempt Failed. Retrying", - {"attempt": nthRetry, "sleep_seconds": delayInSeconds}, - ) + add_span_event( + span, + "Transaction Begin Attempt Failed. Retrying", + {"attempt": nthRetry, "sleep_seconds": delayInSeconds}, + ) response = _retry( method, @@ -285,8 +285,7 @@ def commit( if self._transaction_id is None and len(self._mutations) > 0: self.begin() - if span: - span.add_event("Starting Commit") + add_span_event(span, "Starting Commit") method = functools.partial( api.commit, @@ -295,11 +294,11 @@ def commit( ) def beforeNextRetry(nthRetry, delayInSeconds): - if span: - span.add_event( - "Transaction Commit Attempt Failed. Retrying", - {"attempt": nthRetry, "sleep_seconds": delayInSeconds}, - ) + add_span_event( + span, + "Transaction Commit Attempt Failed. Retrying", + {"attempt": nthRetry, "sleep_seconds": delayInSeconds}, + ) response = _retry( method, @@ -307,8 +306,7 @@ def beforeNextRetry(nthRetry, delayInSeconds): beforeNextRetry=beforeNextRetry, ) - if span: - span.add_event("Commit Done") + add_span_event(span, "Commit Done") self.committed = response.commit_timestamp if return_commit_stats: