Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenCensus kind mapping and EH update #7624

Merged
merged 15 commits into from
Jan 2, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ def kind(self, value):
"""Set the span kind of this span."""
kind = (
OpenCensusSpanKind.CLIENT if value == SpanKind.CLIENT else
OpenCensusSpanKind.CLIENT if value == SpanKind.PRODUCER else # No producer in opencensus
OpenCensusSpanKind.SERVER if value == SpanKind.SERVER else
OpenCensusSpanKind.CLIENT if value == SpanKind.CONSUMER else # No consumer in opencensus
OpenCensusSpanKind.UNSPECIFIED if value == SpanKind.INTERNAL else # No internal in opencensus
OpenCensusSpanKind.UNSPECIFIED if value == SpanKind.UNSPECIFIED else
None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,17 @@ def test_span_kind(self):
assert wrapped_class.span_instance.span_kind == OpenCensusSpanKind.CLIENT
assert wrapped_class.kind == SpanKind.CLIENT

# not supported
with pytest.raises(ValueError):
wrapped_class.kind = SpanKind.PRODUCER
# opencensus doesn't support producer, put client instead
wrapped_class.kind = SpanKind.PRODUCER
assert wrapped_class.span_instance.span_kind == OpenCensusSpanKind.CLIENT
assert wrapped_class.kind == SpanKind.CLIENT

with pytest.raises(ValueError):
wrapped_class.kind = SpanKind.CONSUMER
# opencensus doesn't support consumer, put client instead
wrapped_class.kind = SpanKind.CONSUMER
assert wrapped_class.span_instance.span_kind == OpenCensusSpanKind.CLIENT
assert wrapped_class.kind == SpanKind.CLIENT

with pytest.raises(ValueError):
wrapped_class.kind = SpanKind.INTERNAL
# opencensus doesn't support consumer, put client instead
wrapped_class.kind = SpanKind.INTERNAL
assert wrapped_class.span_instance.span_kind == OpenCensusSpanKind.UNSPECIFIED
assert wrapped_class.kind == SpanKind.UNSPECIFIED
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _context(self, event):
else:
child = span_impl_type(name="Azure.EventHubs.process")
self._eventhub_client._add_span_request_attributes(child) # type: ignore # pylint: disable=protected-access
child.kind = SpanKind.SERVER
child.kind = SpanKind.CONSUMER

trace_link_message(event, child)
with child:
Expand Down
36 changes: 19 additions & 17 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
from .exceptions import _error_handler, OperationTimeoutError
from ._common import EventData, EventDataBatch
from ._client_base import ConsumerProducerMixin
from ._utils import create_properties, set_message_partition_key, trace_message
from ._utils import (
create_properties,
set_message_partition_key,
trace_message,
send_context_manager,
add_link_to_send,
)
from ._constants import TIMEOUT_SYMBOL

_LOGGER = logging.getLogger(__name__)
Expand All @@ -49,6 +55,7 @@ def _set_trace_message(event_datas, parent_span=None):
# type: (Iterable[EventData], Optional[AbstractSpan]) -> Iterable[EventData]
for ed in iter(event_datas):
trace_message(ed, parent_span)
add_link_to_send(ed, parent_span)
yield ed


Expand Down Expand Up @@ -190,6 +197,7 @@ def _wrap_eventdata(
set_message_partition_key(event_data.message, partition_key)
wrapper_event_data = event_data
trace_message(wrapper_event_data, span)
add_link_to_send(wrapper_event_data, span)
else:
if isinstance(
event_data, EventDataBatch
Expand All @@ -200,11 +208,13 @@ def _wrap_eventdata(
raise ValueError(
"The partition_key does not match the one of the EventDataBatch"
)
for message in event_data.message._body_gen: # pylint: disable=protected-access
add_link_to_send(message, span)
wrapper_event_data = event_data # type:ignore
else:
if partition_key:
event_data = _set_partition_key(event_data, partition_key)
event_data = _set_trace_message(event_data)
event_data = _set_trace_message(event_data, span)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # type: ignore # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
return wrapper_event_data
Expand Down Expand Up @@ -241,24 +251,16 @@ def send(
"""
# Tracing code
with self._lock:
span_impl_type = (
settings.tracing_implementation()
) # type: Type[AbstractSpan]
child = None
if span_impl_type is not None:
child = span_impl_type(name="Azure.EventHubs.send")
child.kind = SpanKind.CLIENT # Should be PRODUCER
self._check_closed()
wrapper_event_data = self._wrap_eventdata(event_data, child, partition_key)
self._unsent_events = [wrapper_event_data.message]

if span_impl_type is not None and child is not None:
with child:
with send_context_manager() as child:
self._check_closed()
wrapper_event_data = self._wrap_eventdata(event_data, child, partition_key)
self._unsent_events = [wrapper_event_data.message]

if child:
self._client._add_span_request_attributes( # pylint: disable=protected-access
child
)
self._send_event_data_with_retry(timeout=timeout)
else:

self._send_event_data_with_retry(timeout=timeout)

def close(self):
Expand Down
48 changes: 39 additions & 9 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

from contextlib import contextmanager
import sys
import platform
import datetime
Expand All @@ -17,6 +18,7 @@
from uamqp.message import MessageHeader

from azure.core.settings import settings
from azure.core.tracing import SpanKind

from ._version import VERSION
from ._constants import (
Expand Down Expand Up @@ -122,6 +124,35 @@ def set_message_partition_key(message, partition_key):
message.header = header


@contextmanager
def send_context_manager():
span_impl_type = (
settings.tracing_implementation()
) # type: Type[AbstractSpan]

if span_impl_type is not None:
with span_impl_type(name="Azure.EventHubs.send") as child:
child.kind = SpanKind.CLIENT
yield child
else:
yield None


def add_link_to_send(event, send_span):
"""Add Diagnostic-Id from event to span as link.
"""
try:
if send_span:
if event.properties:
traceparent = event.properties.get(b"Diagnostic-Id", "").decode("ascii")
else:
traceparent = ""

send_span.link(traceparent)
except Exception as exp: # pylint:disable=broad-except
_LOGGER.warning("add_link_to_send had an exception %r", exp)


def trace_message(event, parent_span=None):
# type: (EventData, Optional[AbstractSpan]) -> None
"""Add tracing information to this event.
Expand All @@ -135,21 +166,20 @@ def trace_message(event, parent_span=None):
current_span = parent_span or span_impl_type(
span_impl_type.get_current_span()
)
message_span = current_span.span(name="Azure.EventHubs.message")
message_span.start()
app_prop = dict(event.properties) if event.properties else dict()
app_prop.setdefault(
b"Diagnostic-Id", message_span.get_trace_parent().encode("ascii")
)
event.properties = app_prop
message_span.finish()
with current_span.span(name="Azure.EventHubs.message") as message_span:
message_span.kind = SpanKind.PRODUCER
app_prop = dict(event.properties) if event.properties else dict()
app_prop.setdefault(
b"Diagnostic-Id", message_span.get_trace_parent().encode("ascii")
)
event.properties = app_prop
except Exception as exp: # pylint:disable=broad-except
_LOGGER.warning("trace_message had an exception %r", exp)


def trace_link_message(event, parent_span=None):
# type: (EventData, Optional[AbstractSpan]) -> None
"""Link the current event to current span.
"""Link the current event to current span or provided parent span.

Will extract DiagnosticId if available.
"""
Expand Down
40 changes: 19 additions & 21 deletions sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,24 @@
import uuid
import asyncio
import logging
from typing import Iterable, Union, Type, Optional, Any, AnyStr, List, TYPE_CHECKING
from typing import Iterable, Union, Optional, Any, AnyStr, List, TYPE_CHECKING
import time

from uamqp import types, constants, errors
from uamqp import SendClientAsync

from azure.core.tracing import SpanKind, AbstractSpan
from azure.core.settings import settings
from azure.core.tracing import AbstractSpan

from .._common import EventData, EventDataBatch
from ..exceptions import _error_handler, OperationTimeoutError
from .._producer import _set_partition_key, _set_trace_message
from .._utils import create_properties, set_message_partition_key, trace_message
from .._utils import (
create_properties,
set_message_partition_key,
trace_message,
send_context_manager,
add_link_to_send,
)
from .._constants import TIMEOUT_SYMBOL
from ._eventprocessor.utils import get_running_loop
from ._client_base_async import ConsumerProducerMixin
Expand Down Expand Up @@ -179,6 +184,7 @@ def _wrap_eventdata(
set_message_partition_key(event_data.message, partition_key)
wrapper_event_data = event_data
trace_message(wrapper_event_data, span)
add_link_to_send(wrapper_event_data, span)
else:
if isinstance(
event_data, EventDataBatch
Expand All @@ -189,11 +195,13 @@ def _wrap_eventdata(
raise ValueError(
"The partition_key does not match the one of the EventDataBatch"
)
for message in event_data.message._body_gen: # pylint: disable=protected-access
add_link_to_send(message, span)
wrapper_event_data = event_data # type:ignore
else:
if partition_key:
event_data = _set_partition_key(event_data, partition_key)
event_data = _set_trace_message(event_data)
event_data = _set_trace_message(event_data, span)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # type: ignore # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
return wrapper_event_data
Expand Down Expand Up @@ -230,26 +238,16 @@ async def send(
"""
# Tracing code
async with self._lock:
span_impl_type = (
settings.tracing_implementation()
) # type: Type[AbstractSpan]
child = None
if span_impl_type is not None:
child = span_impl_type(name="Azure.EventHubs.send")
child.kind = SpanKind.CLIENT # Should be PRODUCER
self._check_closed()
wrapper_event_data = self._wrap_eventdata(event_data, child, partition_key)
self._unsent_events = [wrapper_event_data.message]
with send_context_manager() as child:
self._check_closed()
wrapper_event_data = self._wrap_eventdata(event_data, child, partition_key)
self._unsent_events = [wrapper_event_data.message]

if span_impl_type is not None and child is not None:
with child:
if child:
self._client._add_span_request_attributes( # pylint: disable=protected-access
child
)
await self._send_event_data_with_retry(
timeout=timeout
) # pylint:disable=unexpected-keyword-arg
else:

await self._send_event_data_with_retry(
timeout=timeout
) # pylint:disable=unexpected-keyword-arg
Expand Down