Skip to content

Commit

Permalink
feat(observability): more descriptive and value adding spans
Browse files Browse the repository at this point in the history
This change adds more descriptive and value adding spans to
replace the generic CloudSpanner.ReadWriteTransaction.
With this change, we add new spans:
* CloudSpanner.Database.run_in_transaction
* CloudSpanner.execute_pdml
* CloudSpanner.execute_sql
* CloudSpanner.execute_update

Reduce edit changes

Propagate db_name when session is None

Update system tests + test_session

Database.execute_pdml: add events for before and after BeginTransaction

Database.run_in_transaction: add span_events for using Transaction and aborted

fix tests

Trace (FixedSizePool, PingingPool).BatchCreateSessions in bind

Add another form of tracing with explicit .close() invocation

General touch-ups for every better fine-grained spans and events

Use add_event_on_current_span helper more

Make updates and fix up tests

Wring up passthrough context manager

Address updates from running transaction aborting

All add much deeper tests to check for span statuses plus retries+abort

Also while here, fixed googleapis#1246

More upates for test span results

Updates from self review
  • Loading branch information
odeke-em committed Dec 4, 2024
1 parent f8bc285 commit 41ddb55
Show file tree
Hide file tree
Showing 14 changed files with 922 additions and 251 deletions.
121 changes: 111 additions & 10 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,11 @@ def get_tracer(tracer_provider=None):
return tracer_provider.get_tracer(TRACER_NAME, TRACER_VERSION)


@contextmanager
def trace_call(name, session, extra_attributes=None, observability_options=None):
if not HAS_OPENTELEMETRY_INSTALLED or not session:
# Empty context manager. Users will have to check if the generated value is None or a span
yield None
return
def _make_tracer_and_span_attributes(
session=None, extra_attributes=None, observability_options=None
):
if not HAS_OPENTELEMETRY_INSTALLED:
return None, None

tracer_provider = None

Expand All @@ -68,20 +67,24 @@ def trace_call(name, session, extra_attributes=None, observability_options=None)
# on by default.
enable_extended_tracing = True

db_name = ""
if session and getattr(session, "_database", None):
db_name = session._database.name

if isinstance(observability_options, dict): # Avoid false positives with mock.Mock
tracer_provider = observability_options.get("tracer_provider", None)
enable_extended_tracing = observability_options.get(
"enable_extended_tracing", enable_extended_tracing
)
db_name = observability_options.get("db_name", db_name)

tracer = get_tracer(tracer_provider)

# Set base attributes that we know for every trace created
db = session._database
attributes = {
"db.type": "spanner",
"db.url": SpannerClient.DEFAULT_ENDPOINT,
"db.instance": "" if not db else db.name,
"db.instance": db_name,
"net.host.name": SpannerClient.DEFAULT_ENDPOINT,
OTEL_SCOPE_NAME: TRACER_NAME,
OTEL_SCOPE_VERSION: TRACER_VERSION,
Expand All @@ -95,9 +98,78 @@ def trace_call(name, session, extra_attributes=None, observability_options=None)

if not enable_extended_tracing:
attributes.pop("db.statement", False)
attributes.pop("sql", False)
else:
# Otherwise there are places where the annotated sql was inserted
# directly from the arguments as "sql", and transform those into "db.statement".
db_statement = attributes.get("db.statement", None)
if not db_statement:
sql = attributes.get("sql", None)
if sql:
attributes = attributes.copy()
attributes.pop("sql", False)
attributes["db.statement"] = sql

return tracer, attributes


def trace_call_end_lazily(
name, session=None, extra_attributes=None, observability_options=None
):
"""
trace_call_end_lazily is used in situations where you don't want a context managed
span in a with statement to end as soon as a block exits. This is useful for example
after a Database.batch or Database.snapshot but without a context manager.
If you need to directly invoke tracing with a context manager, please invoke
`trace_call` with which you can invoke
 `with trace_call(...) as span:`
It is the caller's responsibility to explicitly invoke the returned ending function.
"""

if not name:
return None

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
return None

span = tracer.start_span(
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
)
ctx_manager = trace.use_span(span, end_on_exit=True, record_exception=True)
ctx_manager.__enter__()

def discard(exc_type=None, exc_value=None, exc_traceback=None):
if not exc_type:
span.set_status(Status(StatusCode.OK))

ctx_manager.__exit__(exc_type, exc_value, exc_traceback)

return discard


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
"""
 trace_call is used in situations where you need to end a span with a context manager
 or after a scope is exited. If you need to keep a span alive and lazily end it, please
 invoke `trace_call_end_lazily`.
"""
if not name:
yield None
return

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
yield None
return

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
) as span:
try:
yield span
Expand All @@ -109,7 +181,22 @@ def trace_call(name, session, extra_attributes=None, observability_options=None)
# invoke .record_exception on our own else we shall have 2 exceptions.
raise
else:
span.set_status(Status(StatusCode.OK))
if span._status.status_code == StatusCode.UNSET:
# OpenTelemetry-Python only allows a status change
# if the current code is UNSET or ERROR. At the end
# of the generator's consumption, only set it to OK
# it wasn't previously set otherwise
span.set_status(Status(StatusCode.OK))


def set_span_status_error(span, error):
if span:
span.set_status(Status(StatusCode.ERROR, str(error)))


def set_span_status_ok(span):
if span:
span.set_status(Status(StatusCode.OK))


def get_current_span():
Expand All @@ -121,3 +208,17 @@ def get_current_span():
def add_span_event(span, event_name, event_attributes=None):
if span:
span.add_event(event_name, event_attributes)


def add_event_on_current_span(event_name, attributes=None, span=None):
if not span:
span = get_current_span()

if span:
span.add_event(event_name, attributes)


def record_span_exception_and_status(span, exc):
if span:
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)
60 changes: 57 additions & 3 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_event_on_current_span,
trace_call,
trace_call_end_lazily,
)
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import _retry
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
Expand All @@ -46,6 +50,12 @@ class _BatchBase(_SessionWrapper):
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
self.__base_discard_span = trace_call_end_lazily(
f"CloudSpanner.{type(self).__name__}",
self._session,
None,
getattr(self._session._database, "observability_options", None),
)

def _check_state(self):
"""Helper for :meth:`commit` et al.
Expand All @@ -69,6 +79,10 @@ def insert(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
add_event_on_current_span(
"insert mutations added",
dict(table=table, columns=columns),
)
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))

def update(self, table, columns, values):
Expand All @@ -84,6 +98,10 @@ def update(self, table, columns, values):
:param values: Values to be modified.
"""
self._mutations.append(Mutation(update=_make_write_pb(table, columns, values)))
add_event_on_current_span(
"update mutations added",
dict(table=table, columns=columns),
)

def insert_or_update(self, table, columns, values):
"""Insert/update one or more table rows.
Expand All @@ -100,6 +118,10 @@ def insert_or_update(self, table, columns, values):
self._mutations.append(
Mutation(insert_or_update=_make_write_pb(table, columns, values))
)
add_event_on_current_span(
"insert_or_update mutations added",
dict(table=table, columns=columns),
)

def replace(self, table, columns, values):
"""Replace one or more table rows.
Expand All @@ -114,6 +136,10 @@ def replace(self, table, columns, values):
:param values: Values to be modified.
"""
self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))
add_event_on_current_span(
"replace mutations added",
dict(table=table, columns=columns),
)

def delete(self, table, keyset):
"""Delete one or more table rows.
Expand All @@ -126,6 +152,21 @@ def delete(self, table, keyset):
"""
delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
self._mutations.append(Mutation(delete=delete))
add_event_on_current_span(
"delete mutations added",
dict(table=table),
)

def _discard_on_end(self, exc_type=None, exc_val=None, exc_traceback=None):
if self.__base_discard_span:
self.__base_discard_span(exc_type, exc_val, exc_traceback)
self.__base_discard_span = None

def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
self._discard_on_end(exc_type, exc_val, exc_traceback)

def __enter__(self):
return self


class Batch(_BatchBase):
Expand Down Expand Up @@ -207,7 +248,7 @@ def commit(
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.Commit",
"CloudSpanner.Batch.commit",
self._session,
trace_attributes,
observability_options=observability_options,
Expand All @@ -223,18 +264,31 @@ def commit(
)
self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats
self._discard_on_end()
return self.committed

def __enter__(self):
"""Begin ``with`` block."""
self._check_state()
observability_options = getattr(
self._session._database, "observability_options", None
)
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.Batch",
self._session,
observability_options=observability_options,
)

return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if exc_type is None:
self.commit()
if self.__discard_span:
self.__discard_span(exc_type, exc_val, exc_tb)
self.__discard_span = None
self._discard_on_end()


class MutationGroup(_BatchBase):
Expand Down Expand Up @@ -326,7 +380,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.BatchWrite",
"CloudSpanner.batch_write",
self._session,
trace_attributes,
observability_options=observability_options,
Expand Down
Loading

0 comments on commit 41ddb55

Please sign in to comment.