Skip to content

Commit

Permalink
grpc client metrics + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnradams committed Jul 6, 2020
1 parent 112bade commit d3591b1
Show file tree
Hide file tree
Showing 11 changed files with 1,068 additions and 40 deletions.
1 change: 1 addition & 0 deletions ext/opentelemetry-ext-grpc/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ install_requires =
test =
opentelemetry-test == 0.11.dev0
opentelemetry-sdk == 0.11.dev0
protobuf == 3.12.2

[options.packages.find]
where = src
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from opentelemetry.ext.grpc.version import __version__


def client_interceptor(tracer_provider=None):
def client_interceptor(tracer_provider=None, meter=None):
"""Create a gRPC client channel interceptor.
Args:
Expand All @@ -34,7 +34,7 @@ def client_interceptor(tracer_provider=None):

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _client.OpenTelemetryClientInterceptor(tracer)
return _client.OpenTelemetryClientInterceptor(tracer, meter)


def server_interceptor(tracer_provider=None):
Expand Down
160 changes: 123 additions & 37 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,25 @@
import grpc

from opentelemetry import propagators, trace
from opentelemetry.trace.status import Status, StatusCanonicalCode

from . import grpcext
from ._utilities import RpcInfo
from ._utilities import RpcInfo, TimedMetricRecorder


class _GuardedSpan:
def __init__(self, span):
self.span = span
self.generated_span = None
self._engaged = True

def __enter__(self):
self.span.__enter__()
self.generated_span = self.span.__enter__()
return self

def __exit__(self, *args, **kwargs):
if self._engaged:
self.generated_span = None
return self.span.__exit__(*args, **kwargs)
return False

Expand Down Expand Up @@ -76,8 +79,10 @@ def callback(response_future):
class OpenTelemetryClientInterceptor(
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
):
def __init__(self, tracer):
def __init__(self, tracer, meter):
self._tracer = tracer
self._meter = meter
self._metrics_recorder = TimedMetricRecorder(self._meter, "client")

def _start_span(self, method):
return self._tracer.start_as_current_span(
Expand Down Expand Up @@ -106,24 +111,56 @@ def _trace_result(self, guarded_span, rpc_info, result):
def _start_guarded_span(self, *args, **kwargs):
return _GuardedSpan(self._start_span(*args, **kwargs))

def _bytes_out_iterator_wrapper(self, iterator, client_info):
for request in iterator:
if "ByteSize" in dir(request):
self._metrics_recorder.record_bytes_out(
request.ByteSize(), client_info.full_method
)
yield request

def intercept_unary(self, request, metadata, client_info, invoker):
if not metadata:
mutable_metadata = OrderedDict()
else:
mutable_metadata = OrderedDict(metadata)

with self._start_guarded_span(client_info.full_method) as guarded_span:
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())

rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request,
)
result = invoker(request, metadata)
return self._trace_result(guarded_span, rpc_info, result)
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())

# If protobuf is used, we can record the bytes in/out. Otherwise, we have no way
# to get the size of the request/response properly, so don't record anything
if "ByteSize" in dir(request):
self._metrics_recorder.record_bytes_out(
request.ByteSize(), client_info.full_method
)

rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request,
)

try:
result = invoker(request, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

ret = self._trace_result(guarded_span, rpc_info, result)

if "ByteSize" in dir(rpc_info.response):
self._metrics_recorder.record_bytes_in(
rpc_info.response.ByteSize(), client_info.full_method
)
return ret

# For RPCs that stream responses, the result can be a generator. To record
# the span across the generated responses and detect any errors, we wrap
Expand All @@ -136,19 +173,45 @@ def _intercept_server_stream(
else:
mutable_metadata = OrderedDict(metadata)

with self._start_span(client_info.full_method):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
)
if client_info.is_client_stream:
rpc_info.request = request_or_iterator
result = invoker(request_or_iterator, metadata)
for response in result:
yield response
with self._start_span(client_info.full_method) as span:
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
)

if client_info.is_client_stream:
rpc_info.request = request_or_iterator
request_or_iterator = self._bytes_out_iterator_wrapper(
request_or_iterator, client_info
)
else:
if "ByteSize" in dir(request_or_iterator):
self._metrics_recorder.record_bytes_out(
request_or_iterator.ByteSize(),
client_info.full_method,
)

try:
result = invoker(request_or_iterator, metadata)

# Rewrap the result stream into a generator, and record the bytes received
for response in result:
if "ByteSize" in dir(response):
self._metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)
yield response
except grpc.RpcError as exc:
span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
Expand All @@ -164,13 +227,36 @@ def intercept_stream(
mutable_metadata = OrderedDict(metadata)

with self._start_guarded_span(client_info.full_method) as guarded_span:
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request_or_iterator,
)
result = invoker(request_or_iterator, metadata)
return self._trace_result(guarded_span, rpc_info, result)
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request_or_iterator,
)

rpc_info.request = request_or_iterator

request_or_iterator = self._bytes_out_iterator_wrapper(
request_or_iterator, client_info
)

try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

ret = self._trace_result(guarded_span, rpc_info, result)

self._metrics_recorder.record_bytes_in(
rpc_info.response.ByteSize(), client_info.full_method
)

return ret
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

"""Internal utilities."""

from contextlib import contextmanager
from time import time

import grpc

from opentelemetry.sdk.metrics import Counter, ValueRecorder


class RpcInfo:
def __init__(
Expand All @@ -31,3 +38,75 @@ def __init__(
self.request = request
self.response = response
self.error = error


class TimedMetricRecorder:
def __init__(self, meter, span_kind):
self._meter = meter
service_name = "grpcio"
self._span_kind = span_kind
base_attributes = ["method"]

if self._meter:
self._duration = self._meter.create_metric(
name="{}/{}/duration".format(service_name, span_kind),
description="Duration of grpc requests to the server",
unit="ms",
value_type=float,
metric_type=ValueRecorder,
label_keys=base_attributes + ["error", "status_code"],
)
self._error_count = self._meter.create_metric(
name="{}/{}/errors".format(service_name, span_kind),
description="Number of errors that were returned from the server",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=base_attributes + ["status_code"],
)
self._bytes_in = self._meter.create_metric(
name="{}/{}/bytes_in".format(service_name, span_kind),
description="Number of bytes received from the server",
unit="by",
value_type=int,
metric_type=Counter,
label_keys=base_attributes,
)
self._bytes_out = self._meter.create_metric(
name="{}/{}/bytes_out".format(service_name, span_kind),
description="Number of bytes sent out through gRPC",
unit="by",
value_type=int,
metric_type=Counter,
label_keys=base_attributes,
)

def record_bytes_in(self, bytes_in, method):
if self._meter:
labels = {"method": method}
self._bytes_in.add(bytes_in, labels)

def record_bytes_out(self, bytes_out, method):
if self._meter:
labels = {"method": method}
self._bytes_out.add(bytes_out, labels)

@contextmanager
def record_latency(self, method):
start_time = time()
labels = {"method": method, "status_code": grpc.StatusCode.OK}
try:
yield labels
except grpc.RpcError as exc:
if self._meter:
# pylint: disable=no-member
labels["status_code"] = exc.code()
self._error_count.add(1, labels)
labels["error"] = True
raise
finally:
if self._meter:
if "error" not in labels:
labels["error"] = False
elapsed_time = (time() - start_time) * 1000
self._duration.record(elapsed_time, labels)
57 changes: 57 additions & 0 deletions ext/opentelemetry-ext-grpc/tests/_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .protobuf.test_server_pb2 import Request

CLIENT_ID = 1


def simple_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
stub.SimpleMethod(request)


def client_streaming_method(stub, error=False):
# create a generator
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request

stub.ClientStreamingMethod(request_messages())


def server_streaming_method(stub, error=False):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
response_iterator = stub.ServerStreamingMethod(request)
list(response_iterator)


def bidirectional_streaming_method(stub, error=False):
def request_messages():
for _ in range(5):
request = Request(
client_id=CLIENT_ID, request_data="error" if error else "data"
)
yield request

response_iterator = stub.BidirectionalStreamingMethod(request_messages())

list(response_iterator)
Loading

0 comments on commit d3591b1

Please sign in to comment.