Skip to content

Commit

Permalink
feat(observability): more descriptive and value adding spans
Browse files Browse the repository at this point in the history
This change adds more descriptive and value adding spans to
replace the generic CloudSpanner.ReadWriteTransaction.
With this change, we add new spans:
* CloudSpanner.Database.run_in_transaction
* CloudSpanner.execute_pdml
* CloudSpanner.execute_sql
* CloudSpanner.execute_update
  • Loading branch information
odeke-em committed Dec 17, 2024
1 parent ad69c48 commit f28ed47
Show file tree
Hide file tree
Showing 14 changed files with 757 additions and 189 deletions.
97 changes: 87 additions & 10 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ def get_tracer(tracer_provider=None):
return tracer_provider.get_tracer(TRACER_NAME, TRACER_VERSION)


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
if session:
session._last_use_time = datetime.now()

if not (HAS_OPENTELEMETRY_INSTALLED and name):
# Empty context manager. Users will have to check if the generated value is None or a span
yield None
return
def _make_tracer_and_span_attributes(
session=None, extra_attributes=None, observability_options=None
):
if not HAS_OPENTELEMETRY_INSTALLED:
return None, None

tracer_provider = None

Expand Down Expand Up @@ -103,9 +99,77 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=

if not enable_extended_tracing:
attributes.pop("db.statement", False)
attributes.pop("sql", False)
else:
# Otherwise there are places where the annotated sql was inserted
# directly from the arguments as "sql", and transform those into "db.statement".
db_statement = attributes.get("db.statement", None)
if not db_statement:
sql = attributes.get("sql", None)
if sql:
attributes = attributes.copy()
attributes.pop("sql", False)
attributes["db.statement"] = sql

return tracer, attributes


def trace_call_end_lazily(
name, session=None, extra_attributes=None, observability_options=None
):
"""
trace_call_end_lazily is used in situations where you don't want a context managed
span in a with statement to end as soon as a block exits. This is useful for example
after a Database.batch or Database.snapshot but without a context manager.
If you need to directly invoke tracing with a context manager, please invoke
`trace_call` with which you can invoke
 `with trace_call(...) as span:`
It is the caller's responsibility to explicitly invoke the returned ending function.
"""
if not name:
return None

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
return None

span = tracer.start_span(
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
)
ctx_manager = trace.use_span(span, end_on_exit=True, record_exception=True)
ctx_manager.__enter__()

def discard(exc_type=None, exc_value=None, exc_traceback=None):
if not exc_type:
span.set_status(Status(StatusCode.OK))

ctx_manager.__exit__(exc_type, exc_value, exc_traceback)

return discard


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
"""
 trace_call is used in situations where you need to end a span with a context manager
 or after a scope is exited. If you need to keep a span alive and lazily end it, please
 invoke `trace_call_end_lazily`.
"""
if not name:
yield None
return

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
yield None
return

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
) as span:
try:
yield span
Expand Down Expand Up @@ -135,3 +199,16 @@ def get_current_span():
def add_span_event(span, event_name, event_attributes=None):
if span:
span.add_event(event_name, event_attributes)


def add_event_on_current_span(event_name, event_attributes=None, span=None):
if not span:
span = get_current_span()

add_span_event(span, event_name, event_attributes)


def record_span_exception_and_status(span, exc):
if span:
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)
42 changes: 40 additions & 2 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_event_on_current_span,
trace_call,
trace_call_end_lazily,
)
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import _retry
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
Expand All @@ -46,6 +50,12 @@ class _BatchBase(_SessionWrapper):
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
self.__base_discard_span = trace_call_end_lazily(
f"CloudSpanner.{type(self).__name__}",
self._session,
None,
getattr(self._session._database, "observability_options", None),
)

def _check_state(self):
"""Helper for :meth:`commit` et al.
Expand All @@ -69,6 +79,10 @@ def insert(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
add_event_on_current_span(
"insert mutations added",
dict(table=table, columns=columns),
)
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))
# TODO: Decide if we should add a span event per mutation:
# https://github.com/googleapis/python-spanner/issues/1269
Expand Down Expand Up @@ -137,6 +151,17 @@ def delete(self, table, keyset):
# TODO: Decide if we should add a span event per mutation:
# https://github.com/googleapis/python-spanner/issues/1269

def _discard_on_end(self, exc_type=None, exc_val=None, exc_traceback=None):
if self.__base_discard_span:
self.__base_discard_span(exc_type, exc_val, exc_traceback)
self.__base_discard_span = None

def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
self._discard_on_end(exc_type, exc_val, exc_traceback)

def __enter__(self):
return self


class Batch(_BatchBase):
"""Accumulate mutations for transmission during :meth:`commit`."""
Expand Down Expand Up @@ -233,18 +258,31 @@ def commit(
)
self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats
self._discard_on_end()
return self.committed

def __enter__(self):
"""Begin ``with`` block."""
self._check_state()
observability_options = getattr(
self._session._database, "observability_options", None
)
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.Batch",
self._session,
observability_options=observability_options,
)

return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if exc_type is None:
self.commit()
if self.__discard_span:
self.__discard_span(exc_type, exc_val, exc_tb)
self.__discard_span = None
self._discard_on_end()


class MutationGroup(_BatchBase):
Expand Down Expand Up @@ -336,7 +374,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.BatchWrite",
"CloudSpanner.batch_write",
self._session,
trace_attributes,
observability_options=observability_options,
Expand Down
Loading

0 comments on commit f28ed47

Please sign in to comment.