Skip to content

Commit

Permalink
update to auto grpc instrumentor
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnradams committed Jul 27, 2020
1 parent 9679190 commit 17633bb
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
32 changes: 26 additions & 6 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def serve():
from contextlib import contextmanager

import grpc
from functools import partial
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry import trace
Expand Down Expand Up @@ -139,11 +140,21 @@ def wrapper_fn(self, original_func, instance, args, kwargs):

class GrpcInstrumentorClient(BaseInstrumentor):
def _instrument(self, **kwargs):
exporter = kwargs.get("exporter", None)
interval = kwargs.get("interval", 30)
if kwargs.get("channel_type") == "secure":
_wrap("grpc", "secure_channel", self.wrapper_fn)
_wrap(
"grpc",
"secure_channel",
partial(self.wrapper_fn, exporter, interval),
)

else:
_wrap("grpc", "insecure_channel", self.wrapper_fn)
_wrap(
"grpc",
"insecure_channel",
partial(self.wrapper_fn, exporter, interval),
)

def _uninstrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
Expand All @@ -152,10 +163,19 @@ def _uninstrument(self, **kwargs):
else:
unwrap(grpc, "insecure_channel")

@contextmanager
def wrapper_fn(self, original_func, instance, args, kwargs):
with original_func(*args, **kwargs) as channel:
yield intercept_channel(channel, client_interceptor())
def wrapper_fn(
self, exporter, interval, original_func, instance, args, kwargs
):
channel = original_func(*args, **kwargs)
tracer_provider = kwargs.get("tracer_provider")
return intercept_channel(
channel,
client_interceptor(
tracer_provider=tracer_provider,
exporter=exporter,
interval=interval,
),
)


def client_interceptor(tracer_provider=None, exporter=None, interval=30):
Expand Down
19 changes: 10 additions & 9 deletions ext/opentelemetry-ext-grpc/tests/test_client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import opentelemetry.ext.grpc
from opentelemetry import trace
from opentelemetry.ext.grpc import client_interceptor
from opentelemetry.ext.grpc import GrpcInstrumentorClient
from opentelemetry.ext.grpc.grpcext import intercept_channel
from opentelemetry.sdk.metrics.export.aggregate import (
MinMaxSumCountAggregator,
Expand All @@ -37,23 +37,23 @@
class TestClientProto(TestBase):
def setUp(self):
super().setUp()
self.server = create_test_server(25565)
self.server.start()
self.interceptor = client_interceptor(
GrpcInstrumentorClient().instrument(
exporter=self.memory_metrics_exporter
)
self.channel = intercept_channel(
grpc.insecure_channel("localhost:25565"), self.interceptor
)
self.server = create_test_server(25565)
self.server.start()
self.channel = grpc.insecure_channel("localhost:25565")
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)

def tearDown(self):
super().tearDown()
GrpcInstrumentorClient().uninstrument()
self.memory_metrics_exporter.clear()
self.server.stop(None)

def _verify_success_records(self, num_bytes_out, num_bytes_in, method):
self.interceptor.controller.tick()
# pylint: disable=protected-access
self.channel._interceptor.controller.tick()
records = self.memory_metrics_exporter.get_exported_metrics()
self.assertEqual(len(records), 3)

Expand Down Expand Up @@ -163,7 +163,8 @@ def test_stream_stream(self):
)

def _verify_error_records(self, method):
self.interceptor.controller.tick()
# pylint: disable=protected-access
self.channel._interceptor.controller.tick()
records = self.memory_metrics_exporter.get_exported_metrics()
self.assertEqual(len(records), 3)

Expand Down

0 comments on commit 17633bb

Please sign in to comment.