Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing update #7252

Merged
merged 23 commits into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions sdk/core/azure-core/azure/core/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# --------------------------------------------------------------------------

import abc
from typing import (TypeVar, Any, Dict, Optional)
from typing import (TypeVar, Any, Dict, Optional, Generic)

try:
ABC = abc.ABC
Expand Down Expand Up @@ -97,7 +97,7 @@ def pop(self, *args):
return super(PipelineContext, self).pop(*args)


class PipelineRequest(object):
class PipelineRequest(Generic[HTTPRequestType]):
"""A pipeline request object.

Container for moving the HttpRequest through the pipeline.
Expand All @@ -114,7 +114,7 @@ def __init__(self, http_request, context):
self.context = context


class PipelineResponse(object):
class PipelineResponse(Generic[HTTPRequestType, HTTPResponseType]):
"""A pipeline response object.

The PipelineResponse interface exposes an HTTP response object as it returns through the pipeline of Policy objects.
Expand All @@ -132,7 +132,7 @@ class PipelineResponse(object):
:type context: ~azure.core.pipeline.PipelineContext
"""
def __init__(self, http_request, http_response, context):
# type: (HTTPRequestType, HTTPResponseType, Optional[Dict[str, Any]]) -> None
# type: (HTTPRequestType, HTTPResponseType, PipelineContext) -> None
self.http_request = http_request
self.http_response = http_response
self.context = context
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core/azure/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,6 @@ def run(self, request, **kwargs):
:rtype: ~azure.core.pipeline.PipelineResponse
"""
context = PipelineContext(self._transport, **kwargs)
pipeline_request = PipelineRequest(request, context) # type: PipelineRequest
pipeline_request = PipelineRequest(request, context) # type: PipelineRequest[HTTPRequestType]
first_node = self._impl_policies[0] if self._impl_policies else _TransportRunner(self._transport)
return first_node.send(pipeline_request) # type: ignore
2 changes: 1 addition & 1 deletion sdk/core/azure-core/azure/core/pipeline/base_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def __aenter__(self) -> 'AsyncPipeline':
async def __aexit__(self, *exc_details): # pylint: disable=arguments-differ
await self._transport.__aexit__(*exc_details)

async def run(self, request: PipelineRequest, **kwargs: Any):
async def run(self, request: PipelineRequest[HTTPRequestType], **kwargs: Any):
"""Runs the HTTP Request through the chained policies.

:param request: The HTTP request object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class CustomHookPolicy(SansIOHTTPPolicy):

*raw_response_hook* - Callback function. Will be invoked on response.
"""
def __init__(self, **kwargs): # pylint: disable=unused-argument
def __init__(self, **kwargs): # pylint: disable=unused-argument,super-init-not-called
self._callback = None

def on_request(self, request): # type: ignore # pylint: disable=arguments-differ
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
#
# --------------------------------------------------------------------------
"""Traces network calls using the implementation library from the settings."""

import sys
from six.moves import urllib

from azure.core.tracing.context import tracing_context
from azure.core.tracing.common import set_span_contexts
from azure.core.pipeline.policies import SansIOHTTPPolicy
from azure.core.settings import settings

Expand All @@ -42,11 +40,12 @@
from azure.core.pipeline.transport import HttpRequest, HttpResponse # pylint: disable=ungrouped-imports
from azure.core.tracing.abstract_span import AbstractSpan # pylint: disable=ungrouped-imports
from azure.core.pipeline import PipelineRequest, PipelineResponse # pylint: disable=ungrouped-imports
from typing import Any, Optional, Dict, List, Union
from typing import Any, Optional, Dict, List, Union, Tuple


class DistributedTracingPolicy(SansIOHTTPPolicy):
"""The policy to create spans for Azure Calls"""
TRACING_CONTEXT = "TRACING_CONTEXT"

def __init__(self):
# type: () -> None
Expand All @@ -60,52 +59,50 @@ def set_header(self, request, span): # pylint: disable=no-self-use
Sets the header information on the span.
"""
headers = span.to_header()
request.http_request.headers.update(headers) # type: ignore
request.http_request.headers.update(headers)

def on_request(self, request):
# type: (PipelineRequest) -> None
parent_span = tracing_context.current_span.get()
wrapper_class = settings.tracing_implementation()
original_context = [parent_span, None]
if parent_span is None and wrapper_class is not None:
current_span_instance = wrapper_class.get_current_span()
original_context[1] = current_span_instance
parent_span = wrapper_class(current_span_instance)

if parent_span is None:
span_impl_type = settings.tracing_implementation()
if span_impl_type is None:
return

path = urllib.parse.urlparse(request.http_request.url).path # type: ignore
path = urllib.parse.urlparse(request.http_request.url).path
if not path:
path = "/"
child = parent_span.span(name=path)
child.start()

set_span_contexts(child)
self.parent_span_dict[child] = original_context
self.set_header(request, child)
span = span_impl_type(name=path)
span.start()

self.set_header(request, span)

request.context[self.TRACING_CONTEXT] = span

def end_span(self, request, response=None):
# type: (HttpRequest, Optional[HttpResponse]) -> None
def end_span(self, request, response=None, exc_info=None):
# type: (PipelineRequest, Optional[HttpResponse], Optional[Tuple]) -> None
"""Ends the span that is tracing the network and updates its status."""
span = tracing_context.current_span.get() # type: AbstractSpan
if self.TRACING_CONTEXT not in request.context:
return

span = request.context[self.TRACING_CONTEXT] # type: AbstractSpan
http_request = request.http_request # type: HttpRequest
if span is not None:
span.set_http_attributes(request, response=response)
request_id = request.headers.get(self._request_id)
span.set_http_attributes(http_request, response=response)
request_id = http_request.headers.get(self._request_id)
if request_id is not None:
span.add_attribute(self._request_id, request_id)
if response and self._response_id in response.headers:
span.add_attribute(self._response_id, response.headers[self._response_id])
span.finish()
original_context = self.parent_span_dict.pop(span, None)
if original_context:
set_span_contexts(original_context[0], original_context[1])
if exc_info:
span.__exit__(*exc_info)
else:
span.finish()

def on_response(self, request, response):
# type: (PipelineRequest, PipelineResponse) -> None
self.end_span(request.http_request, response=response.http_response) # type: ignore
self.end_span(request, response=response.http_response)

def on_exception(self, _request): # pylint: disable=unused-argument
def on_exception(self, request): # pylint: disable=unused-argument
# type: (PipelineRequest) -> bool
self.end_span(_request.http_request) # type: ignore
self.end_span(request, exc_info=sys.exc_info())
return False
70 changes: 36 additions & 34 deletions sdk/core/azure-core/azure/core/pipeline/policies/universal.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

_LOGGER = logging.getLogger(__name__)
ContentDecodePolicyType = TypeVar('ContentDecodePolicyType', bound='ContentDecodePolicy')
HTTPRequestType = TypeVar("HTTPRequestType")
HTTPResponseType = TypeVar("HTTPResponseType")


class HeadersPolicy(SansIOHTTPPolicy):
Expand All @@ -68,10 +70,10 @@ class HeadersPolicy(SansIOHTTPPolicy):
:dedent: 4
:caption: Configuring a headers policy.
"""
def __init__(self, base_headers=None, **kwargs):
# type: (Mapping[str, str], Any) -> None
def __init__(self, base_headers=None, **kwargs): # pylint: disable=super-init-not-called
# type: (Dict[str, str], Any) -> None
self._headers = base_headers or {}
self._headers.update(kwargs.pop('headers', {})) # type: ignore
self._headers.update(kwargs.pop('headers', {}))

@property
def headers(self):
Expand All @@ -93,10 +95,10 @@ def on_request(self, request):
:param request: The PipelineRequest object
:type request: ~azure.core.pipeline.PipelineRequest
"""
request.http_request.headers.update(self.headers) # type: ignore
additional_headers = request.context.options.pop('headers', {}) # type: ignore
request.http_request.headers.update(self.headers)
additional_headers = request.context.options.pop('headers', {})
if additional_headers:
request.http_request.headers.update(additional_headers) # type: ignore
request.http_request.headers.update(additional_headers)


class UserAgentPolicy(SansIOHTTPPolicy):
Expand All @@ -121,7 +123,7 @@ class UserAgentPolicy(SansIOHTTPPolicy):
_USERAGENT = "User-Agent"
_ENV_ADDITIONAL_USER_AGENT = 'AZURE_HTTP_USER_AGENT'

def __init__(self, base_user_agent=None, **kwargs):
def __init__(self, base_user_agent=None, **kwargs): # pylint: disable=super-init-not-called
# type: (Optional[str], bool) -> None
self.overwrite = kwargs.pop('user_agent_overwrite', False)
self.use_env = kwargs.pop('user_agent_use_env', True)
Expand Down Expand Up @@ -160,17 +162,17 @@ def on_request(self, request):
:type request: ~azure.core.pipeline.PipelineRequest
"""
http_request = request.http_request
options = request.context.options # type: ignore
if 'user_agent' in options:
user_agent = options.pop('user_agent')
if options.pop('user_agent_overwrite', self.overwrite):
http_request.headers[self._USERAGENT] = user_agent # type: ignore
options_dict = request.context.options
if 'user_agent' in options_dict:
user_agent = options_dict.pop('user_agent')
if options_dict.pop('user_agent_overwrite', self.overwrite):
http_request.headers[self._USERAGENT] = user_agent
else:
user_agent = "{} {}".format(self.user_agent, user_agent)
http_request.headers[self._USERAGENT] = user_agent # type: ignore
http_request.headers[self._USERAGENT] = user_agent

elif self.overwrite or self._USERAGENT not in http_request.headers: # type: ignore
http_request.headers[self._USERAGENT] = self.user_agent # type: ignore
elif self.overwrite or self._USERAGENT not in http_request.headers:
http_request.headers[self._USERAGENT] = self.user_agent


class NetworkTraceLoggingPolicy(SansIOHTTPPolicy):
Expand Down Expand Up @@ -199,17 +201,17 @@ def on_request(self, request):
:type request: ~azure.core.pipeline.PipelineRequest
"""
http_request = request.http_request
options = request.context.options # type: ignore
options = request.context.options
if options.pop("logging_enable", self.enable_http_logger):
request.context["logging_enable"] = True # type: ignore
request.context["logging_enable"] = True
if not _LOGGER.isEnabledFor(logging.DEBUG):
return

try:
_LOGGER.debug("Request URL: %r", http_request.url) # type: ignore
_LOGGER.debug("Request method: %r", http_request.method) # type: ignore
_LOGGER.debug("Request URL: %r", http_request.url)
_LOGGER.debug("Request method: %r", http_request.method)
_LOGGER.debug("Request headers:")
for header, value in http_request.headers.items(): # type: ignore
for header, value in http_request.headers.items():
if header.lower() == 'authorization':
value = '*****'
_LOGGER.debug(" %r: %r", header, value)
Expand All @@ -232,33 +234,33 @@ def on_response(self, request, response):
:param response: The PipelineResponse object.
:type response: ~azure.core.pipeline.PipelineResponse
"""
if response.context.pop("logging_enable", self.enable_http_logger): # type: ignore
if response.context.pop("logging_enable", self.enable_http_logger):
if not _LOGGER.isEnabledFor(logging.DEBUG):
return

try:
_LOGGER.debug("Response status: %r", response.http_response.status_code) # type: ignore
_LOGGER.debug("Response status: %r", response.http_response.status_code)
_LOGGER.debug("Response headers:")
for res_header, value in response.http_response.headers.items(): # type: ignore
for res_header, value in response.http_response.headers.items():
_LOGGER.debug(" %r: %r", res_header, value)

# We don't want to log binary data if the response is a file.
_LOGGER.debug("Response content:")
pattern = re.compile(r'attachment; ?filename=["\w.]+', re.IGNORECASE)
header = response.http_response.headers.get('content-disposition') # type: ignore
header = response.http_response.headers.get('content-disposition')

if header and pattern.match(header):
filename = header.partition('=')[2]
_LOGGER.debug("File attachments: %s", filename)
elif response.http_response.headers.get("content-type", "").endswith("octet-stream"): # type: ignore
elif response.http_response.headers.get("content-type", "").endswith("octet-stream"):
_LOGGER.debug("Body contains binary data.")
elif response.http_response.headers.get("content-type", "").startswith("image"): # type: ignore
elif response.http_response.headers.get("content-type", "").startswith("image"):
_LOGGER.debug("Body contains image data.")
else:
if response.context.options.get('stream', False): # type: ignore
if response.context.options.get('stream', False):
_LOGGER.debug("Body is streamable")
else:
_LOGGER.debug(response.http_response.text()) # type: ignore
_LOGGER.debug(response.http_response.text())
except Exception as err: # pylint: disable=broad-except
_LOGGER.debug("Failed to log response: %s", repr(err))

Expand All @@ -274,7 +276,7 @@ class ContentDecodePolicy(SansIOHTTPPolicy):

@classmethod
def deserialize_from_text(cls, response, content_type=None):
# type: (Type[ContentDecodePolicyType], PipelineResponse, Optional[str]) -> Any
# type: (Type[ContentDecodePolicyType], HTTPResponseType, Optional[str]) -> Any
"""Decode response data according to content-type.
Accept a stream of data as well, but will be load at once in memory for now.
If no content-type, will return the string version (not bytes, not stream)
Expand Down Expand Up @@ -337,7 +339,7 @@ def _json_attemp(data):

@classmethod
def deserialize_from_http_generics(cls, response):
# type: (Type[ContentDecodePolicyType], PipelineResponse) -> Any
# type: (Type[ContentDecodePolicyType], HTTPResponseType) -> Any
"""Deserialize from HTTP response.
Use bytes and headers to NOT use any requests/aiohttp or whatever
specific implementation.
Expand All @@ -361,7 +363,7 @@ def deserialize_from_http_generics(cls, response):
return cls.deserialize_from_text(response, content_type)

def on_response(self, request, response):
# type: (PipelineRequest, PipelineResponse) -> None
# type: (PipelineRequest[HTTPRequestType], PipelineResponse[HTTPRequestType, HTTPResponseType]) -> None
"""Extract data from the body of a REST response object.
This will load the entire payload in memory.
Will follow Content-Type to parse.
Expand All @@ -378,10 +380,10 @@ def on_response(self, request, response):
:raises xml.etree.ElementTree.ParseError: If bytes is not valid XML
"""
# If response was asked as stream, do NOT read anything and quit now
if response.context.options.get("stream", True): # type: ignore
if response.context.options.get("stream", True):
return

response.context[self.CONTEXT_NAME] = self.deserialize_from_http_generics(response.http_response) # type: ignore
response.context[self.CONTEXT_NAME] = self.deserialize_from_http_generics(response.http_response)


class ProxyPolicy(SansIOHTTPPolicy):
Expand All @@ -401,7 +403,7 @@ class ProxyPolicy(SansIOHTTPPolicy):
:dedent: 4
:caption: Configuring a proxy policy.
"""
def __init__(self, proxies=None, **kwargs): #pylint: disable=unused-argument
def __init__(self, proxies=None, **kwargs): #pylint: disable=unused-argument,super-init-not-called
self.proxies = proxies

def on_request(self, request):
Expand Down
4 changes: 2 additions & 2 deletions sdk/core/azure-core/azure/core/polling/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@

from typing import Any, Callable, Union, List, Optional, TYPE_CHECKING
from azure.core.pipeline.transport.base import HttpResponse # type: ignore
from azure.core.tracing.context import tracing_context
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing.common import with_current_context

if TYPE_CHECKING:
import requests
Expand Down Expand Up @@ -139,7 +139,7 @@ def __init__(self, client, initial_response, deserialization_callback, polling_m
if not self._polling_method.finished():
self._done = threading.Event()
self._thread = threading.Thread(
target=tracing_context.with_current_context(self._start),
target=with_current_context(self._start),
name="LROPoller({})".format(uuid.uuid4()))
self._thread.daemon = True
self._thread.start()
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core/azure/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def get_opencensus_span():
try:
from azure.core.tracing.ext.opencensus_span import OpenCensusSpan # pylint:disable=redefined-outer-name

return OpenCensusSpan
return OpenCensusSpan # type: ignore
except ImportError:
return None

Expand Down
4 changes: 2 additions & 2 deletions sdk/core/azure-core/azure/core/tracing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from azure.core.tracing.abstract_span import AbstractSpan
from azure.core.tracing.abstract_span import AbstractSpan, SpanKind

__all__ = [
"AbstractSpan",
"AbstractSpan", "SpanKind"
]
Loading