diff --git a/.flake8 b/.flake8 index 7b88ce37eb9..baa9f02c31b 100644 --- a/.flake8 +++ b/.flake8 @@ -18,7 +18,7 @@ exclude = __pycache__ exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/gen/ exporter/opentelemetry-exporter-jaeger/build/* - exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen + exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen/ docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/ docs/examples/opentelemetry-example-app/build/* opentelemetry-proto/build/* diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen/__init__.py b/exporter/opentelemetry-exporter-zipkin/CHANGELOG.md similarity index 100% rename from exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen/__init__.py rename to exporter/opentelemetry-exporter-zipkin/CHANGELOG.md diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py index a7ea40180a3..1b2554923f6 100644 --- a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/__init__.py @@ -18,16 +18,16 @@ Usage ----- -The **OpenTelemetry Zipkin Exporter** allows to export `OpenTelemetry`_ traces to `Zipkin`_. -This exporter always send traces to the configured Zipkin collector using HTTP. - +The **OpenTelemetry Zipkin Exporter** allows exporting of `OpenTelemetry`_ +traces to `Zipkin`_. This exporter sends traces to the configured Zipkin +collector endpoint using HTTP and supports multiple protocols (v1 json, +v2 json, v2 protobuf). .. _Zipkin: https://zipkin.io/ .. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ .. _Specification: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/sdk-environment-variables.md#zipkin-exporter .. envvar:: OTEL_EXPORTER_ZIPKIN_ENDPOINT -.. envvar:: OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT .. code:: python @@ -41,12 +41,13 @@ # create a ZipkinSpanExporter zipkin_exporter = zipkin.ZipkinSpanExporter( - service_name="my-helloworld-service", + # protocol=Protocol.V2_PROTOBUF # optional: - # url="http://localhost:9411/api/v2/spans", - # ipv4="", - # ipv6="", - # retry=False, + # endpoint="http://localhost:9411/api/v2/spans", + # local_node_ipv4="192.168.0.1", + # local_node_ipv6="2001:db8::c001", + # local_node_port=31313, + # max_tag_value_length=256 ) # Create a BatchExportSpanProcessor and add the exporter to it @@ -58,392 +59,81 @@ with tracer.start_as_current_span("foo"): print("Hello world!") -The exporter supports the following environment variables for configuration: - -:envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`: target to which the exporter will -send data. This may include a path (e.g. http://example.com:9411/api/v2/spans). +The exporter supports the following environment variable for configuration: -:envvar:`OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT`: transport interchange format -to use when sending data. Currently only Zipkin's v2 json and protobuf formats -are supported, with v2 json being the default. +:envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`: zipkin collector endpoint to which the +exporter will send data. This may include a path (e.g. +http://example.com:9411/api/v2/spans). API --- """ -import json import logging from os import environ -from typing import Optional, Sequence, Union -from urllib.parse import urlparse +from typing import Optional, Sequence import requests -from opentelemetry.exporter.zipkin.gen import zipkin_pb2 +from opentelemetry.exporter.zipkin.encoder import Encoder, Protocol +from opentelemetry.exporter.zipkin.encoder.v1.json import JsonV1Encoder +from opentelemetry.exporter.zipkin.encoder.v2.json import JsonV2Encoder +from opentelemetry.exporter.zipkin.encoder.v2.protobuf import ProtobufEncoder +from opentelemetry.exporter.zipkin.node_endpoint import IpInput, NodeEndpoint from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_ZIPKIN_ENDPOINT, - OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT, ) from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult -from opentelemetry.trace import Span, SpanContext, SpanKind -from opentelemetry.trace.status import StatusCode - -TRANSPORT_FORMAT_JSON = "json" -TRANSPORT_FORMAT_PROTOBUF = "protobuf" - -DEFAULT_RETRY = False -DEFAULT_URL = "http://localhost:9411/api/v2/spans" -DEFAULT_MAX_TAG_VALUE_LENGTH = 128 - -SPAN_KIND_MAP_JSON = { - SpanKind.INTERNAL: None, - SpanKind.SERVER: "SERVER", - SpanKind.CLIENT: "CLIENT", - SpanKind.PRODUCER: "PRODUCER", - SpanKind.CONSUMER: "CONSUMER", -} +from opentelemetry.trace import Span -SPAN_KIND_MAP_PROTOBUF = { - SpanKind.INTERNAL: zipkin_pb2.Span.Kind.SPAN_KIND_UNSPECIFIED, - SpanKind.SERVER: zipkin_pb2.Span.Kind.SERVER, - SpanKind.CLIENT: zipkin_pb2.Span.Kind.CLIENT, - SpanKind.PRODUCER: zipkin_pb2.Span.Kind.PRODUCER, - SpanKind.CONSUMER: zipkin_pb2.Span.Kind.CONSUMER, -} - -NAME_KEY = "otel.library.name" -VERSION_KEY = "otel.library.version" - -SUCCESS_STATUS_CODES = (200, 202) +DEFAULT_ENDPOINT = "http://localhost:9411/api/v2/spans" +REQUESTS_SUCCESS_STATUS_CODES = (200, 202) logger = logging.getLogger(__name__) class ZipkinSpanExporter(SpanExporter): - """Zipkin span exporter for OpenTelemetry. - - Args: - service_name: Service that logged an annotation in a trace.Classifier - when query for spans. - url: The Zipkin endpoint URL - ipv4: Primary IPv4 address associated with this connection. - ipv6: Primary IPv6 address associated with this connection. - retry: Set to True to configure the exporter to retry on failure. - transport_format: transport interchange format to use - """ - def __init__( self, - service_name: str, - url: str = None, - ipv4: Optional[str] = None, - ipv6: Optional[str] = None, - retry: Optional[str] = DEFAULT_RETRY, - max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH, - transport_format: Union[ - TRANSPORT_FORMAT_JSON, TRANSPORT_FORMAT_PROTOBUF, None - ] = None, + protocol: Protocol, + endpoint: Optional[str] = None, + local_node_ipv4: IpInput = None, + local_node_ipv6: IpInput = None, + local_node_port: Optional[int] = None, + max_tag_value_length: Optional[int] = None, ): - self.service_name = service_name - if url is None: - self.url = ( - environ.get(OTEL_EXPORTER_ZIPKIN_ENDPOINT) or DEFAULT_URL - ) - else: - self.url = url - - self.port = urlparse(self.url).port - - self.ipv4 = ipv4 - self.ipv6 = ipv6 - self.retry = retry - self.max_tag_value_length = max_tag_value_length + self.local_node = NodeEndpoint( + local_node_ipv4, local_node_ipv6, local_node_port + ) - if transport_format is None: - self.transport_format = ( - environ.get(OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT) - or TRANSPORT_FORMAT_JSON + if endpoint is None: + endpoint = ( + environ.get(OTEL_EXPORTER_ZIPKIN_ENDPOINT) or DEFAULT_ENDPOINT ) - else: - self.transport_format = transport_format + self.endpoint = endpoint - def export(self, spans: Sequence[Span]) -> SpanExportResult: - if self.transport_format == TRANSPORT_FORMAT_JSON: - content_type = "application/json" - elif self.transport_format == TRANSPORT_FORMAT_PROTOBUF: - content_type = "application/x-protobuf" - else: - logger.error("Invalid transport format %s", self.transport_format) - return SpanExportResult.FAILURE + if protocol == Protocol.V1_JSON: + self.encoder = JsonV1Encoder(max_tag_value_length) + elif protocol == Protocol.V2_JSON: + self.encoder = JsonV2Encoder(max_tag_value_length) + elif protocol == Protocol.V2_PROTOBUF: + self.encoder = ProtobufEncoder(max_tag_value_length) + def export(self, spans: Sequence[Span]) -> SpanExportResult: result = requests.post( - url=self.url, - data=self._translate_to_transport_format(spans), - headers={"Content-Type": content_type}, + url=self.endpoint, + data=self.encoder.serialize(spans, self.local_node), + headers={"Content-Type": self.encoder.content_type()}, ) - if result.status_code not in SUCCESS_STATUS_CODES: + if result.status_code not in REQUESTS_SUCCESS_STATUS_CODES: logger.error( "Traces cannot be uploaded; status code: %s, message %s", result.status_code, result.text, ) - - if self.retry: - return SpanExportResult.FAILURE return SpanExportResult.FAILURE return SpanExportResult.SUCCESS def shutdown(self) -> None: pass - - def _translate_to_transport_format(self, spans: Sequence[Span]): - return ( - self._translate_to_json(spans) - if self.transport_format == TRANSPORT_FORMAT_JSON - else self._translate_to_protobuf(spans) - ) - - def _translate_to_json(self, spans: Sequence[Span]): - local_endpoint = {"serviceName": self.service_name, "port": self.port} - - if self.ipv4 is not None: - local_endpoint["ipv4"] = self.ipv4 - - if self.ipv6 is not None: - local_endpoint["ipv6"] = self.ipv6 - - zipkin_spans = [] - for span in spans: - context = span.get_span_context() - trace_id = context.trace_id - span_id = context.span_id - - # Timestamp in zipkin spans is int of microseconds. - # see: https://zipkin.io/pages/instrumenting.html - start_timestamp_mus = nsec_to_usec_round(span.start_time) - duration_mus = nsec_to_usec_round(span.end_time - span.start_time) - - zipkin_span = { - # Ensure left-zero-padding of traceId, spanId, parentId - "traceId": format(trace_id, "032x"), - "id": format(span_id, "016x"), - "name": span.name, - "timestamp": start_timestamp_mus, - "duration": duration_mus, - "localEndpoint": local_endpoint, - "kind": SPAN_KIND_MAP_JSON[span.kind], - "tags": self._extract_tags_from_span(span), - "annotations": self._extract_annotations_from_events( - span.events - ), - } - - if span.instrumentation_info is not None: - zipkin_span["tags"][NAME_KEY] = span.instrumentation_info.name - zipkin_span["tags"][ - VERSION_KEY - ] = span.instrumentation_info.version - - if span.status.status_code is not StatusCode.UNSET: - zipkin_span["tags"][ - "otel.status_code" - ] = span.status.status_code.name - if span.status.status_code is StatusCode.ERROR: - zipkin_span["tags"]["error"] = ( - span.status.description or "" - ) - - if context.trace_flags.sampled: - zipkin_span["debug"] = True - - if isinstance(span.parent, Span): - zipkin_span["parentId"] = format( - span.parent.get_span_context().span_id, "016x" - ) - elif isinstance(span.parent, SpanContext): - zipkin_span["parentId"] = format(span.parent.span_id, "016x") - - zipkin_spans.append(zipkin_span) - - return json.dumps(zipkin_spans) - - def _translate_to_protobuf(self, spans: Sequence[Span]): - - local_endpoint = zipkin_pb2.Endpoint( - service_name=self.service_name, port=self.port - ) - - if self.ipv4 is not None: - local_endpoint.ipv4 = self.ipv4 - - if self.ipv6 is not None: - local_endpoint.ipv6 = self.ipv6 - - pbuf_spans = zipkin_pb2.ListOfSpans() - - for span in spans: - context = span.get_span_context() - trace_id = context.trace_id.to_bytes( - length=16, byteorder="big", signed=False, - ) - span_id = self.format_pbuf_span_id(context.span_id) - - # Timestamp in zipkin spans is int of microseconds. - # see: https://zipkin.io/pages/instrumenting.html - start_timestamp_mus = nsec_to_usec_round(span.start_time) - duration_mus = nsec_to_usec_round(span.end_time - span.start_time) - - # pylint: disable=no-member - pbuf_span = zipkin_pb2.Span( - trace_id=trace_id, - id=span_id, - name=span.name, - timestamp=start_timestamp_mus, - duration=duration_mus, - local_endpoint=local_endpoint, - kind=SPAN_KIND_MAP_PROTOBUF[span.kind], - tags=self._extract_tags_from_span(span), - ) - - annotations = self._extract_annotations_from_events(span.events) - - if annotations is not None: - for annotation in annotations: - pbuf_span.annotations.append( - zipkin_pb2.Annotation( - timestamp=annotation["timestamp"], - value=annotation["value"], - ) - ) - - if span.instrumentation_info is not None: - pbuf_span.tags.update( - { - NAME_KEY: span.instrumentation_info.name, - VERSION_KEY: span.instrumentation_info.version, - } - ) - - if span.status.status_code is not StatusCode.UNSET: - pbuf_span.tags.update( - {"otel.status_code": span.status.status_code.name} - ) - if span.status.status_code is StatusCode.ERROR: - pbuf_span.tags.update( - {"error": span.status.description or ""} - ) - - if context.trace_flags.sampled: - pbuf_span.debug = True - - if isinstance(span.parent, Span): - pbuf_span.parent_id = self.format_pbuf_span_id( - span.parent.get_span_context().span_id - ) - elif isinstance(span.parent, SpanContext): - pbuf_span.parent_id = self.format_pbuf_span_id( - span.parent.span_id - ) - - pbuf_spans.spans.append(pbuf_span) - - return pbuf_spans.SerializeToString() - - @staticmethod - def format_pbuf_span_id(span_id: int): - return span_id.to_bytes(length=8, byteorder="big", signed=False) - - def _extract_tags_from_dict(self, tags_dict): - tags = {} - if not tags_dict: - return tags - for attribute_key, attribute_value in tags_dict.items(): - if isinstance(attribute_value, bool): - value = str(attribute_value).lower() - elif isinstance(attribute_value, (int, float, str)): - value = str(attribute_value) - elif isinstance(attribute_value, Sequence): - value = self._extract_tag_value_string_from_sequence( - attribute_value - ) - if not value: - logger.warning("Could not serialize tag %s", attribute_key) - continue - else: - logger.warning("Could not serialize tag %s", attribute_key) - continue - - if self.max_tag_value_length > 0: - value = value[: self.max_tag_value_length] - tags[attribute_key] = value - return tags - - def _extract_tag_value_string_from_sequence(self, sequence: Sequence): - if self.max_tag_value_length == 1: - return None - - tag_value_elements = [] - running_string_length = ( - 2 # accounts for array brackets in output string - ) - defined_max_tag_value_length = self.max_tag_value_length > 0 - - for element in sequence: - if isinstance(element, bool): - tag_value_element = str(element).lower() - elif isinstance(element, (int, float, str)): - tag_value_element = str(element) - elif element is None: - tag_value_element = None - else: - continue - - if defined_max_tag_value_length: - if tag_value_element is None: - running_string_length += 4 # null with no quotes - else: - # + 2 accounts for string quotation marks - running_string_length += len(tag_value_element) + 2 - - if tag_value_elements: - # accounts for ',' item separator - running_string_length += 1 - - if running_string_length > self.max_tag_value_length: - break - - tag_value_elements.append(tag_value_element) - - return json.dumps(tag_value_elements, separators=(",", ":")) - - def _extract_tags_from_span(self, span: Span): - tags = self._extract_tags_from_dict(span.attributes) - if span.resource: - tags.update(self._extract_tags_from_dict(span.resource.attributes)) - return tags - - def _extract_annotations_from_events(self, events): - if not events: - return None - - annotations = [] - for event in events: - attrs = {} - for key, value in event.attributes.items(): - if isinstance(value, str): - value = value[: self.max_tag_value_length] - attrs[key] = value - - annotations.append( - { - "timestamp": nsec_to_usec_round(event.timestamp), - "value": json.dumps({event.name: attrs}), - } - ) - return annotations - - -def nsec_to_usec_round(nsec): - """Round nanoseconds to microseconds""" - return (nsec + 500) // 10 ** 3 diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/__init__.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/__init__.py new file mode 100644 index 00000000000..2d4aad6b248 --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/__init__.py @@ -0,0 +1,267 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zipkin Exporter Transport Encoder + +Base module and abstract class for concrete transport encoders to extend. +""" + +import abc +import json +import logging +from enum import Enum +from typing import Any, Dict, List, Optional, Sequence, TypeVar + +from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint +from opentelemetry.sdk.trace import Event +from opentelemetry.trace import Span, SpanContext +from opentelemetry.trace.status import StatusCode + +EncodedLocalEndpointT = TypeVar("EncodedLocalEndpointT") + +DEFAULT_MAX_TAG_VALUE_LENGTH = 128 +NAME_KEY = "otel.library.name" +VERSION_KEY = "otel.library.version" + +logger = logging.getLogger(__name__) + + +class Protocol(Enum): + """Enum of supported protocol formats. + + Values are human-readable strings so that they can be easily used by the + OS environ var OTEL_EXPORTER_ZIPKIN_PROTOCOL (reserved for future usage). + """ + + V1_JSON = "v1_json" + V2_JSON = "v2_json" + V2_PROTOBUF = "v2_protobuf" + + +# pylint: disable=W0223 +class Encoder(abc.ABC): + """Base class for encoders that are used by the exporter. + + Args: + max_tag_value_length: maximum length of an exported tag value. Values + will be truncated to conform. Since values are serialized to a JSON + list string, max_tag_value_length is honored at the element boundary. + """ + + def __init__( + self, max_tag_value_length: int = DEFAULT_MAX_TAG_VALUE_LENGTH + ): + self.max_tag_value_length = max_tag_value_length + + @staticmethod + @abc.abstractmethod + def content_type() -> str: + pass + + @abc.abstractmethod + def serialize( + self, spans: Sequence[Span], local_endpoint: NodeEndpoint + ) -> str: + pass + + @abc.abstractmethod + def _encode_span( + self, span: Span, encoded_local_endpoint: EncodedLocalEndpointT + ) -> Any: + """ + Per spec Zipkin fields that can be absent SHOULD be omitted from the + payload when they are empty in the OpenTelemetry Span. + + https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/sdk_exporters/zipkin.md#request-payload + """ + + @staticmethod + @abc.abstractmethod + def _encode_local_endpoint( + local_endpoint: NodeEndpoint, + ) -> EncodedLocalEndpointT: + pass + + @staticmethod + def _encode_debug(span_context) -> Any: + return span_context.trace_flags.sampled + + @staticmethod + @abc.abstractmethod + def _encode_span_id(span_id: int) -> Any: + pass + + @staticmethod + @abc.abstractmethod + def _encode_trace_id(trace_id: int) -> Any: + pass + + @staticmethod + def _get_parent_id(span_context) -> Optional[int]: + if isinstance(span_context, Span): + parent_id = span_context.parent.span_id + elif isinstance(span_context, SpanContext): + parent_id = span_context.span_id + else: + parent_id = None + return parent_id + + def _extract_tags_from_dict( + self, tags_dict: Optional[Dict] + ) -> Dict[str, str]: + tags = {} + if not tags_dict: + return tags + for attribute_key, attribute_value in tags_dict.items(): + if isinstance(attribute_value, bool): + value = str(attribute_value).lower() + elif isinstance(attribute_value, (int, float, str)): + value = str(attribute_value) + elif isinstance(attribute_value, Sequence): + value = self._extract_tag_value_string_from_sequence( + attribute_value + ) + if not value: + logger.warning("Could not serialize tag %s", attribute_key) + continue + else: + logger.warning("Could not serialize tag %s", attribute_key) + continue + + if self.max_tag_value_length > 0: + value = value[: self.max_tag_value_length] + tags[attribute_key] = value + return tags + + def _extract_tag_value_string_from_sequence(self, sequence: Sequence): + if self.max_tag_value_length == 1: + return None + + tag_value_elements = [] + running_string_length = ( + 2 # accounts for array brackets in output string + ) + defined_max_tag_value_length = self.max_tag_value_length > 0 + + for element in sequence: + if isinstance(element, bool): + tag_value_element = str(element).lower() + elif isinstance(element, (int, float, str)): + tag_value_element = str(element) + elif element is None: + tag_value_element = None + else: + continue + + if defined_max_tag_value_length: + if tag_value_element is None: + running_string_length += 4 # null with no quotes + else: + # + 2 accounts for string quotation marks + running_string_length += len(tag_value_element) + 2 + + if tag_value_elements: + # accounts for ',' item separator + running_string_length += 1 + + if running_string_length > self.max_tag_value_length: + break + + tag_value_elements.append(tag_value_element) + + return json.dumps(tag_value_elements, separators=(",", ":")) + + def _extract_tags_from_span(self, span: Span) -> Dict[str, str]: + tags = self._extract_tags_from_dict(span.attributes) + if span.resource: + tags.update(self._extract_tags_from_dict(span.resource.attributes)) + if span.instrumentation_info is not None: + tags.update( + { + NAME_KEY: span.instrumentation_info.name, + VERSION_KEY: span.instrumentation_info.version, + } + ) + if span.status.status_code is not StatusCode.UNSET: + tags.update({"otel.status_code": span.status.status_code.name}) + if span.status.status_code is StatusCode.ERROR: + tags.update({"error": span.status.description or ""}) + return tags + + def _extract_annotations_from_events( + self, events: Optional[List[Event]] + ) -> Optional[List[Dict]]: + if not events: + return None + + annotations = [] + for event in events: + attrs = {} + for key, value in event.attributes.items(): + if isinstance(value, str) and self.max_tag_value_length > 0: + value = value[: self.max_tag_value_length] + attrs[key] = value + + annotations.append( + { + "timestamp": self._nsec_to_usec_round(event.timestamp), + "value": json.dumps({event.name: attrs}, sort_keys=True), + } + ) + return annotations + + @staticmethod + def _nsec_to_usec_round(nsec: int) -> int: + """Round nanoseconds to microseconds + + Timestamp in zipkin spans is int of microseconds. + See: https://zipkin.io/pages/instrumenting.html + """ + return (nsec + 500) // 10 ** 3 + + +class JsonEncoder(Encoder): + @staticmethod + def content_type(): + return "application/json" + + def serialize( + self, spans: Sequence[Span], local_endpoint: NodeEndpoint + ) -> str: + encoded_local_endpoint = self._encode_local_endpoint(local_endpoint) + encoded_spans = [] + for span in spans: + encoded_spans.append( + self._encode_span(span, encoded_local_endpoint) + ) + return json.dumps(encoded_spans) + + @staticmethod + def _encode_local_endpoint(local_endpoint: NodeEndpoint) -> Dict: + encoded_local_endpoint = {"serviceName": local_endpoint.service_name} + if local_endpoint.ipv4 is not None: + encoded_local_endpoint["ipv4"] = str(local_endpoint.ipv4) + if local_endpoint.ipv6 is not None: + encoded_local_endpoint["ipv6"] = str(local_endpoint.ipv6) + if local_endpoint.port is not None: + encoded_local_endpoint["port"] = local_endpoint.port + return encoded_local_endpoint + + @staticmethod + def _encode_span_id(span_id: int) -> str: + return format(span_id, "016x") + + @staticmethod + def _encode_trace_id(trace_id: int) -> str: + return format(trace_id, "032x") diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v1/__init__.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v1/__init__.py new file mode 100644 index 00000000000..b892a67bef3 --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v1/__init__.py @@ -0,0 +1,41 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zipkin Export Encoders for JSON formats +""" + +import abc +from typing import Dict, List + +from opentelemetry.exporter.zipkin.encoder import Encoder +from opentelemetry.trace import Span, SpanContext, SpanKind + + +# pylint: disable=W0223 +class V1Encoder(Encoder): + def _extract_binary_annotations( + self, span: Span, encoded_local_endpoint: Dict + ) -> List[Dict]: + binary_annotations = [] + for tag_key, tag_value in self._extract_tags_from_span(span).items(): + if isinstance(tag_value, str) and self.max_tag_value_length > 0: + tag_value = tag_value[: self.max_tag_value_length] + binary_annotations.append( + { + "key": tag_key, + "value": tag_value, + "endpoint": encoded_local_endpoint, + } + ) + return binary_annotations diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v1/json.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v1/json.py new file mode 100644 index 00000000000..966657311bb --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v1/json.py @@ -0,0 +1,65 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zipkin Export Encoders for JSON formats +""" +from typing import Dict + +from opentelemetry.exporter.zipkin.encoder import JsonEncoder +from opentelemetry.exporter.zipkin.encoder.v1 import V1Encoder +from opentelemetry.trace import Span + + +class JsonV1Encoder(JsonEncoder, V1Encoder): + """Zipkin Export Encoder for JSON v1 API + + API spec: https://github.com/openzipkin/zipkin-api/blob/master/zipkin-api.yaml + """ + + def _encode_span(self, span: Span, encoded_local_endpoint: Dict) -> Dict: + context = span.get_span_context() + + encoded_span = { + "traceId": self._encode_trace_id(context.trace_id), + "id": self._encode_span_id(context.span_id), + "name": span.name, + "timestamp": self._nsec_to_usec_round(span.start_time), + "duration": self._nsec_to_usec_round( + span.end_time - span.start_time + ), + } + + encoded_annotations = self._extract_annotations_from_events( + span.events + ) + if encoded_annotations is not None: + for annotation in encoded_annotations: + annotation["endpoint"] = encoded_local_endpoint + encoded_span["annotations"] = encoded_annotations + + binary_annotations = self._extract_binary_annotations( + span, encoded_local_endpoint + ) + if binary_annotations: + encoded_span["binaryAnnotations"] = binary_annotations + + debug = self._encode_debug(context) + if debug: + encoded_span["debug"] = debug + + parent_id = self._get_parent_id(span.parent) + if parent_id is not None: + encoded_span["parentId"] = self._encode_span_id(parent_id) + + return encoded_span diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/json.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/json.py new file mode 100644 index 00000000000..ec6e53382b6 --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/json.py @@ -0,0 +1,67 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zipkin Export Encoders for JSON formats +""" +from typing import Dict + +from opentelemetry.exporter.zipkin.encoder import JsonEncoder +from opentelemetry.trace import Span, SpanKind + + +class JsonV2Encoder(JsonEncoder): + """Zipkin Export Encoder for JSON v2 API + + API spec: https://github.com/openzipkin/zipkin-api/blob/master/zipkin2-api.yaml + """ + + SPAN_KIND_MAP = { + SpanKind.INTERNAL: None, + SpanKind.SERVER: "SERVER", + SpanKind.CLIENT: "CLIENT", + SpanKind.PRODUCER: "PRODUCER", + SpanKind.CONSUMER: "CONSUMER", + } + + def _encode_span(self, span: Span, encoded_local_endpoint: Dict) -> Dict: + context = span.get_span_context() + encoded_span = { + "traceId": self._encode_trace_id(context.trace_id), + "id": self._encode_span_id(context.span_id), + "name": span.name, + "timestamp": self._nsec_to_usec_round(span.start_time), + "duration": self._nsec_to_usec_round( + span.end_time - span.start_time + ), + "localEndpoint": encoded_local_endpoint, + "kind": self.SPAN_KIND_MAP[span.kind], + } + + tags = self._extract_tags_from_span(span) + if tags: + encoded_span["tags"] = tags + + annotations = self._extract_annotations_from_events(span.events) + if annotations: + encoded_span["annotations"] = annotations + + debug = self._encode_debug(context) + if debug: + encoded_span["debug"] = debug + + parent_id = self._get_parent_id(span.parent) + if parent_id is not None: + encoded_span["parentId"] = self._encode_span_id(parent_id) + + return encoded_span diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/__init__.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/__init__.py new file mode 100644 index 00000000000..f0cdb65b37c --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/__init__.py @@ -0,0 +1,129 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zipkin Export Encoder for Protobuf + +API spec: https://github.com/openzipkin/zipkin-api/blob/master/zipkin.proto +""" +from typing import List, Optional, Sequence + +from opentelemetry.exporter.zipkin.encoder import Encoder +from opentelemetry.exporter.zipkin.encoder.v2.protobuf.gen import zipkin_pb2 +from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint +from opentelemetry.sdk.trace import Event +from opentelemetry.trace import Span, SpanContext, SpanKind + + +class ProtobufEncoder(Encoder): + """Zipkin Export Encoder for Protobuf + + API spec: https://github.com/openzipkin/zipkin-api/blob/master/zipkin.proto + """ + + SPAN_KIND_MAP = { + SpanKind.INTERNAL: zipkin_pb2.Span.Kind.SPAN_KIND_UNSPECIFIED, + SpanKind.SERVER: zipkin_pb2.Span.Kind.SERVER, + SpanKind.CLIENT: zipkin_pb2.Span.Kind.CLIENT, + SpanKind.PRODUCER: zipkin_pb2.Span.Kind.PRODUCER, + SpanKind.CONSUMER: zipkin_pb2.Span.Kind.CONSUMER, + } + + @staticmethod + def content_type(): + return "application/x-protobuf" + + def serialize( + self, spans: Sequence[Span], local_endpoint: NodeEndpoint + ) -> str: + encoded_local_endpoint = self._encode_local_endpoint(local_endpoint) + # pylint: disable=no-member + encoded_spans = zipkin_pb2.ListOfSpans() + for span in spans: + encoded_spans.spans.append( + self._encode_span(span, encoded_local_endpoint) + ) + return encoded_spans.SerializeToString() + + def _encode_span( + self, span: Span, encoded_local_endpoint: zipkin_pb2.Endpoint + ) -> zipkin_pb2.Span: + context = span.get_span_context() + # pylint: disable=no-member + encoded_span = zipkin_pb2.Span( + trace_id=self._encode_trace_id(context.trace_id), + id=self._encode_span_id(context.span_id), + name=span.name, + timestamp=self._nsec_to_usec_round(span.start_time), + duration=self._nsec_to_usec_round(span.end_time - span.start_time), + local_endpoint=encoded_local_endpoint, + kind=self.SPAN_KIND_MAP[span.kind], + ) + + tags = self._extract_tags_from_span(span) + if tags: + encoded_span.tags.update(tags) + + annotations = self._encode_annotations(span.events) + if annotations: + encoded_span.annotations.extend(annotations) + + debug = self._encode_debug(context) + if debug: + encoded_span.debug = debug + + parent_id = self._get_parent_id(span.parent) + if parent_id is not None: + encoded_span.parent_id = self._encode_span_id(parent_id) + + return encoded_span + + def _encode_annotations( + self, span_events: Optional[List[Event]] + ) -> Optional[List]: + annotations = self._extract_annotations_from_events(span_events) + if annotations is None: + encoded_annotations = None + else: + encoded_annotations = [] + for annotation in annotations: + encoded_annotations.append( + zipkin_pb2.Annotation( + timestamp=annotation["timestamp"], + value=annotation["value"], + ) + ) + return encoded_annotations + + @staticmethod + def _encode_local_endpoint( + local_endpoint: NodeEndpoint, + ) -> zipkin_pb2.Endpoint: + encoded_local_endpoint = zipkin_pb2.Endpoint( + service_name=local_endpoint.service_name, + ) + if local_endpoint.ipv4 is not None: + encoded_local_endpoint.ipv4 = local_endpoint.ipv4.packed + if local_endpoint.ipv6 is not None: + encoded_local_endpoint.ipv6 = local_endpoint.ipv6.packed + if local_endpoint.port is not None: + encoded_local_endpoint.port = local_endpoint.port + return encoded_local_endpoint + + @staticmethod + def _encode_span_id(span_id: int) -> bytes: + return span_id.to_bytes(length=8, byteorder="big", signed=False) + + @staticmethod + def _encode_trace_id(trace_id: int) -> bytes: + return trace_id.to_bytes(length=16, byteorder="big", signed=False) diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen/__init__.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen/zipkin_pb2.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen/zipkin_pb2.py similarity index 100% rename from exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen/zipkin_pb2.py rename to exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen/zipkin_pb2.py diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen/zipkin_pb2.pyi b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen/zipkin_pb2.pyi similarity index 100% rename from exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen/zipkin_pb2.pyi rename to exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen/zipkin_pb2.pyi diff --git a/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/node_endpoint.py b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/node_endpoint.py new file mode 100644 index 00000000000..ee7cb71f02b --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/node_endpoint.py @@ -0,0 +1,85 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Zipkin Exporter Endpoints""" + +import ipaddress +from typing import Optional, Union + +from opentelemetry import trace +from opentelemetry.sdk.resources import SERVICE_NAME, Resource + +IpInput = Union[str, int, None] + + +class NodeEndpoint: + """The network context of a node in the service graph. + + Args: + ipv4: Primary IPv4 address associated with this connection. + ipv6: Primary IPv6 address associated with this connection. + port: Depending on context, this could be a listen port or the + client-side of a socket. None if unknown. + """ + + def __init__( + self, + ipv4: IpInput = None, + ipv6: IpInput = None, + port: Optional[int] = None, + ): + self.ipv4 = ipv4 + self.ipv6 = ipv6 + self.port = port + + tracer_provider = trace.get_tracer_provider() + + if hasattr(tracer_provider, "resource"): + resource = tracer_provider.resource + else: + resource = Resource.create() + + self.service_name = resource.attributes[SERVICE_NAME] + + @property + def ipv4(self) -> Optional[ipaddress.IPv4Address]: + return self._ipv4 + + @ipv4.setter + def ipv4(self, address: IpInput) -> None: + if address is None: + self._ipv4 = None + else: + ipv4_address = ipaddress.ip_address(address) + if not isinstance(ipv4_address, ipaddress.IPv4Address): + raise ValueError( + "%r does not appear to be an IPv4 address" % address + ) + self._ipv4 = ipv4_address + + @property + def ipv6(self) -> Optional[ipaddress.IPv6Address]: + return self._ipv6 + + @ipv6.setter + def ipv6(self, address: IpInput) -> None: + if address is None: + self._ipv6 = None + else: + ipv6_address = ipaddress.ip_address(address) + if not isinstance(ipv6_address, ipaddress.IPv6Address): + raise ValueError( + "%r does not appear to be an IPv6 address" % address + ) + self._ipv6 = ipv6_address diff --git a/exporter/opentelemetry-exporter-zipkin/tests/encoder/__init__.py b/exporter/opentelemetry-exporter-zipkin/tests/encoder/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/exporter/opentelemetry-exporter-zipkin/tests/encoder/common_tests.py b/exporter/opentelemetry-exporter-zipkin/tests/encoder/common_tests.py new file mode 100644 index 00000000000..a04148b6ea4 --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/tests/encoder/common_tests.py @@ -0,0 +1,505 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import abc +import json +import sys +import unittest +from typing import Dict, List + +from opentelemetry import trace as trace_api +from opentelemetry.exporter.zipkin.encoder import ( + DEFAULT_MAX_TAG_VALUE_LENGTH, + Encoder, +) +from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint +from opentelemetry.sdk import trace +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo +from opentelemetry.trace import TraceFlags +from opentelemetry.trace.status import Status, StatusCode + +TEST_SERVICE_NAME = "test_service" + + +# pylint: disable=protected-access +class CommonEncoderTestCases: + class CommonEncoderTest(unittest.TestCase): + @staticmethod + @abc.abstractmethod + def get_encoder(*args, **kwargs) -> Encoder: + pass + + @classmethod + def get_encoder_default(cls) -> Encoder: + return cls.get_encoder() + + @abc.abstractmethod + def test_encode_trace_id(self): + pass + + @abc.abstractmethod + def test_encode_span_id(self): + pass + + @abc.abstractmethod + def test_encode_local_endpoint_default(self): + pass + + @abc.abstractmethod + def test_encode_local_endpoint_explicits(self): + pass + + @abc.abstractmethod + def _test_encode_max_tag_length(self, max_tag_value_length: int): + pass + + def test_encode_max_tag_length_2(self): + self._test_encode_max_tag_length(2) + + def test_encode_max_tag_length_5(self): + self._test_encode_max_tag_length(5) + + def test_encode_max_tag_length_9(self): + self._test_encode_max_tag_length(9) + + def test_encode_max_tag_length_10(self): + self._test_encode_max_tag_length(10) + + def test_encode_max_tag_length_11(self): + self._test_encode_max_tag_length(11) + + def test_encode_max_tag_length_128(self): + self._test_encode_max_tag_length(128) + + def test_constructor_default(self): + encoder = self.get_encoder() + + self.assertEqual( + DEFAULT_MAX_TAG_VALUE_LENGTH, encoder.max_tag_value_length + ) + + def test_constructor_max_tag_value_length(self): + max_tag_value_length = 123456 + encoder = self.get_encoder(max_tag_value_length) + self.assertEqual( + max_tag_value_length, encoder.max_tag_value_length + ) + + def test_nsec_to_usec_round(self): + base_time_nsec = 683647322 * 10 ** 9 + for nsec in ( + base_time_nsec, + base_time_nsec + 150 * 10 ** 6, + base_time_nsec + 300 * 10 ** 6, + base_time_nsec + 400 * 10 ** 6, + ): + self.assertEqual( + (nsec + 500) // 10 ** 3, + self.get_encoder_default()._nsec_to_usec_round(nsec), + ) + + def test_encode_debug(self): + self.assertFalse( + self.get_encoder_default()._encode_debug( + trace_api.SpanContext( + trace_id=0x000000000000000000000000DEADBEEF, + span_id=0x00000000DEADBEF0, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.DEFAULT), + ) + ) + ) + self.assertTrue( + self.get_encoder_default()._encode_debug( + trace_api.SpanContext( + trace_id=0x000000000000000000000000DEADBEEF, + span_id=0x00000000DEADBEF0, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + ) + ) + + def test_get_parent_id_from_span(self): + parent_id = 0x00000000DEADBEF0 + self.assertEqual( + parent_id, + self.get_encoder_default()._get_parent_id( + trace._Span( + name="test-span", + context=trace_api.SpanContext( + 0x000000000000000000000000DEADBEEF, + 0x04BF92DEEFC58C92, + is_remote=False, + ), + parent=trace_api.SpanContext( + 0x0000000000000000000000AADEADBEEF, + parent_id, + is_remote=False, + ), + ) + ), + ) + + def test_get_parent_id_from_span_context(self): + parent_id = 0x00000000DEADBEF0 + self.assertEqual( + parent_id, + self.get_encoder_default()._get_parent_id( + trace_api.SpanContext( + trace_id=0x000000000000000000000000DEADBEEF, + span_id=parent_id, + is_remote=False, + ), + ), + ) + + @staticmethod + def get_data_for_max_tag_length_test( + max_tag_length: int, + ) -> (trace._Span, Dict): + start_time = 683647322 * 10 ** 9 # in ns + duration = 50 * 10 ** 6 + end_time = start_time + duration + + span = trace._Span( + name=TEST_SERVICE_NAME, + context=trace_api.SpanContext( + 0x0E0C63257DE34C926F9EFCD03927272E, + 0x04BF92DEEFC58C92, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ), + resource=trace.Resource({}), + ) + span.start(start_time=start_time) + span.set_attribute("string1", "v" * 500) + span.set_attribute("string2", "v" * 50) + span.set_attribute("list1", ["a"] * 25) + span.set_attribute("list2", ["a"] * 10) + span.set_attribute("list3", [2] * 25) + span.set_attribute("list4", [2] * 10) + span.set_attribute("list5", [True] * 25) + span.set_attribute("list6", [True] * 10) + span.set_attribute("tuple1", ("a",) * 25) + span.set_attribute("tuple2", ("a",) * 10) + span.set_attribute("tuple3", (2,) * 25) + span.set_attribute("tuple4", (2,) * 10) + span.set_attribute("tuple5", (True,) * 25) + span.set_attribute("tuple6", (True,) * 10) + span.set_attribute("range1", range(0, 25)) + span.set_attribute("range2", range(0, 10)) + span.set_attribute("empty_list", []) + span.set_attribute("none_list", ["hello", None, "world"]) + span.end(end_time=end_time) + + expected_outputs = { + 2: { + "string1": "vv", + "string2": "vv", + "list1": "[]", + "list2": "[]", + "list3": "[]", + "list4": "[]", + "list5": "[]", + "list6": "[]", + "tuple1": "[]", + "tuple2": "[]", + "tuple3": "[]", + "tuple4": "[]", + "tuple5": "[]", + "tuple6": "[]", + "range1": "[]", + "range2": "[]", + "empty_list": "[]", + "none_list": "[]", + }, + 5: { + "string1": "vvvvv", + "string2": "vvvvv", + "list1": '["a"]', + "list2": '["a"]', + "list3": '["2"]', + "list4": '["2"]', + "list5": "[]", + "list6": "[]", + "tuple1": '["a"]', + "tuple2": '["a"]', + "tuple3": '["2"]', + "tuple4": '["2"]', + "tuple5": "[]", + "tuple6": "[]", + "range1": '["0"]', + "range2": '["0"]', + "empty_list": "[]", + "none_list": "[]", + }, + 9: { + "string1": "vvvvvvvvv", + "string2": "vvvvvvvvv", + "list1": '["a","a"]', + "list2": '["a","a"]', + "list3": '["2","2"]', + "list4": '["2","2"]', + "list5": '["true"]', + "list6": '["true"]', + "tuple1": '["a","a"]', + "tuple2": '["a","a"]', + "tuple3": '["2","2"]', + "tuple4": '["2","2"]', + "tuple5": '["true"]', + "tuple6": '["true"]', + "range1": '["0","1"]', + "range2": '["0","1"]', + "empty_list": "[]", + "none_list": '["hello"]', + }, + 10: { + "string1": "vvvvvvvvvv", + "string2": "vvvvvvvvvv", + "list1": '["a","a"]', + "list2": '["a","a"]', + "list3": '["2","2"]', + "list4": '["2","2"]', + "list5": '["true"]', + "list6": '["true"]', + "tuple1": '["a","a"]', + "tuple2": '["a","a"]', + "tuple3": '["2","2"]', + "tuple4": '["2","2"]', + "tuple5": '["true"]', + "tuple6": '["true"]', + "range1": '["0","1"]', + "range2": '["0","1"]', + "empty_list": "[]", + "none_list": '["hello"]', + }, + 11: { + "string1": "vvvvvvvvvvv", + "string2": "vvvvvvvvvvv", + "list1": '["a","a"]', + "list2": '["a","a"]', + "list3": '["2","2"]', + "list4": '["2","2"]', + "list5": '["true"]', + "list6": '["true"]', + "tuple1": '["a","a"]', + "tuple2": '["a","a"]', + "tuple3": '["2","2"]', + "tuple4": '["2","2"]', + "tuple5": '["true"]', + "tuple6": '["true"]', + "range1": '["0","1"]', + "range2": '["0","1"]', + "empty_list": "[]", + "none_list": '["hello"]', + }, + 128: { + "string1": "v" * 128, + "string2": "v" * 50, + "list1": '["a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a"]', + "list2": '["a","a","a","a","a","a","a","a","a","a"]', + "list3": '["2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2"]', + "list4": '["2","2","2","2","2","2","2","2","2","2"]', + "list5": '["true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true"]', + "list6": '["true","true","true","true","true","true","true","true","true","true"]', + "tuple1": '["a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a"]', + "tuple2": '["a","a","a","a","a","a","a","a","a","a"]', + "tuple3": '["2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2"]', + "tuple4": '["2","2","2","2","2","2","2","2","2","2"]', + "tuple5": '["true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true"]', + "tuple6": '["true","true","true","true","true","true","true","true","true","true"]', + "range1": '["0","1","2","3","4","5","6","7","8","9","10","11","12","13","14","15","16","17","18","19","20","21","22","23","24"]', + "range2": '["0","1","2","3","4","5","6","7","8","9"]', + "empty_list": "[]", + "none_list": '["hello",null,"world"]', + }, + } + + return span, expected_outputs[max_tag_length] + + @staticmethod + def get_exhaustive_otel_span_list() -> List[trace._Span]: + trace_id = 0x6E0C63257DE34C926F9EFCD03927272E + + base_time = 683647322 * 10 ** 9 # in ns + start_times = ( + base_time, + base_time + 150 * 10 ** 6, + base_time + 300 * 10 ** 6, + base_time + 400 * 10 ** 6, + ) + end_times = ( + start_times[0] + (50 * 10 ** 6), + start_times[1] + (100 * 10 ** 6), + start_times[2] + (200 * 10 ** 6), + start_times[3] + (300 * 10 ** 6), + ) + + parent_span_context = trace_api.SpanContext( + trace_id, 0x1111111111111111, is_remote=False + ) + + other_context = trace_api.SpanContext( + trace_id, 0x2222222222222222, is_remote=False + ) + + span1 = trace._Span( + name="test-span-1", + context=trace_api.SpanContext( + trace_id, + 0x34BF92DEEFC58C92, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ), + parent=parent_span_context, + events=( + trace.Event( + name="event0", + timestamp=base_time + 50 * 10 ** 6, + attributes={ + "annotation_bool": True, + "annotation_string": "annotation_test", + "key_float": 0.3, + }, + ), + ), + links=( + trace_api.Link( + context=other_context, attributes={"key_bool": True} + ), + ), + resource=trace.Resource({}), + ) + span1.start(start_time=start_times[0]) + span1.set_attribute("key_bool", False) + span1.set_attribute("key_string", "hello_world") + span1.set_attribute("key_float", 111.22) + span1.set_status(Status(StatusCode.OK)) + span1.end(end_time=end_times[0]) + + span2 = trace._Span( + name="test-span-2", + context=parent_span_context, + parent=None, + resource=trace.Resource( + attributes={"key_resource": "some_resource"} + ), + ) + span2.start(start_time=start_times[1]) + span2.set_status(Status(StatusCode.ERROR, "Example description")) + span2.end(end_time=end_times[1]) + + span3 = trace._Span( + name="test-span-3", + context=other_context, + parent=None, + resource=trace.Resource( + attributes={"key_resource": "some_resource"} + ), + ) + span3.start(start_time=start_times[2]) + span3.set_attribute("key_string", "hello_world") + span3.end(end_time=end_times[2]) + + span4 = trace._Span( + name="test-span-3", + context=other_context, + parent=None, + resource=trace.Resource({}), + instrumentation_info=InstrumentationInfo( + name="name", version="version" + ), + ) + span4.start(start_time=start_times[3]) + span4.end(end_time=end_times[3]) + + return [span1, span2, span3, span4] + + # pylint: disable=W0223 + class CommonJsonEncoderTest(CommonEncoderTest, abc.ABC): + def test_encode_trace_id(self): + for trace_id in (1, 1024, 2 ** 32, 2 ** 64, 2 ** 65): + self.assertEqual( + format(trace_id, "032x"), + self.get_encoder_default()._encode_trace_id(trace_id), + ) + + def test_encode_span_id(self): + for span_id in (1, 1024, 2 ** 8, 2 ** 16, 2 ** 32, 2 ** 64): + self.assertEqual( + format(span_id, "016x"), + self.get_encoder_default()._encode_span_id(span_id), + ) + + def test_encode_local_endpoint_default(self): + self.assertEqual( + self.get_encoder_default()._encode_local_endpoint( + NodeEndpoint() + ), + {"serviceName": TEST_SERVICE_NAME}, + ) + + def test_encode_local_endpoint_explicits(self): + ipv4 = "192.168.0.1" + ipv6 = "2001:db8::c001" + port = 414120 + self.assertEqual( + self.get_encoder_default()._encode_local_endpoint( + NodeEndpoint(ipv4, ipv6, port) + ), + { + "serviceName": TEST_SERVICE_NAME, + "ipv4": ipv4, + "ipv6": ipv6, + "port": port, + }, + ) + + @staticmethod + def pop_and_sort(source_list, source_index, sort_key): + """ + Convenience method that will pop a specified index from a list, + sort it by a given key and then return it. + """ + popped_item = source_list.pop(source_index, None) + if popped_item is not None: + popped_item = sorted(popped_item, key=lambda x: x[sort_key]) + return popped_item + + def assert_equal_encoded_spans(self, expected_spans, actual_spans): + if sys.version_info.major == 3 and sys.version_info.minor <= 5: + expected_spans = json.loads(expected_spans) + actual_spans = json.loads(actual_spans) + for expected_span, actual_span in zip( + expected_spans, actual_spans + ): + actual_annotations = self.pop_and_sort( + actual_span, "annotations", "timestamp" + ) + expected_annotations = self.pop_and_sort( + expected_span, "annotations", "timestamp" + ) + expected_binary_annotations = self.pop_and_sort( + expected_span, "binaryAnnotations", "key" + ) + actual_binary_annotations = self.pop_and_sort( + actual_span, "binaryAnnotations", "key" + ) + self.assertEqual(actual_span, expected_span) + self.assertEqual(actual_annotations, expected_annotations) + self.assertEqual( + actual_binary_annotations, expected_binary_annotations + ) + else: + self.assertEqual(expected_spans, actual_spans) diff --git a/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v1_json.py b/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v1_json.py new file mode 100644 index 00000000000..13bc24e6cd2 --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v1_json.py @@ -0,0 +1,251 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json + +from opentelemetry import trace as trace_api +from opentelemetry.exporter.zipkin.encoder import NAME_KEY, VERSION_KEY +from opentelemetry.exporter.zipkin.encoder.v1.json import JsonV1Encoder +from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint +from opentelemetry.sdk import trace +from opentelemetry.trace import TraceFlags + +from .common_tests import TEST_SERVICE_NAME, CommonEncoderTestCases + + +# pylint: disable=protected-access +class TestV1JsonEncoder(CommonEncoderTestCases.CommonJsonEncoderTest): + @staticmethod + def get_encoder(*args, **kwargs) -> JsonV1Encoder: + return JsonV1Encoder(*args, **kwargs) + + def test_encode(self): + + local_endpoint = {"serviceName": TEST_SERVICE_NAME} + + otel_spans = self.get_exhaustive_otel_span_list() + trace_id = JsonV1Encoder._encode_trace_id( + otel_spans[0].context.trace_id + ) + + expected_output = [ + { + "traceId": trace_id, + "id": JsonV1Encoder._encode_span_id( + otel_spans[0].context.span_id + ), + "name": otel_spans[0].name, + "timestamp": otel_spans[0].start_time // 10 ** 3, + "duration": (otel_spans[0].end_time // 10 ** 3) + - (otel_spans[0].start_time // 10 ** 3), + "annotations": [ + { + "timestamp": otel_spans[0].events[0].timestamp + // 10 ** 3, + "value": json.dumps( + { + "event0": { + "annotation_bool": True, + "annotation_string": "annotation_test", + "key_float": 0.3, + } + }, + sort_keys=True, + ), + "endpoint": local_endpoint, + } + ], + "binaryAnnotations": [ + { + "key": "key_bool", + "value": "false", + "endpoint": local_endpoint, + }, + { + "key": "key_string", + "value": "hello_world", + "endpoint": local_endpoint, + }, + { + "key": "key_float", + "value": "111.22", + "endpoint": local_endpoint, + }, + { + "key": "otel.status_code", + "value": "OK", + "endpoint": local_endpoint, + }, + ], + "debug": True, + "parentId": JsonV1Encoder._encode_span_id( + otel_spans[0].parent.span_id + ), + }, + { + "traceId": trace_id, + "id": JsonV1Encoder._encode_span_id( + otel_spans[1].context.span_id + ), + "name": otel_spans[1].name, + "timestamp": otel_spans[1].start_time // 10 ** 3, + "duration": (otel_spans[1].end_time // 10 ** 3) + - (otel_spans[1].start_time // 10 ** 3), + "binaryAnnotations": [ + { + "key": "key_resource", + "value": "some_resource", + "endpoint": local_endpoint, + }, + { + "key": "otel.status_code", + "value": "ERROR", + "endpoint": local_endpoint, + }, + { + "key": "error", + "value": "Example description", + "endpoint": local_endpoint, + }, + ], + }, + { + "traceId": trace_id, + "id": JsonV1Encoder._encode_span_id( + otel_spans[2].context.span_id + ), + "name": otel_spans[2].name, + "timestamp": otel_spans[2].start_time // 10 ** 3, + "duration": (otel_spans[2].end_time // 10 ** 3) + - (otel_spans[2].start_time // 10 ** 3), + "binaryAnnotations": [ + { + "key": "key_string", + "value": "hello_world", + "endpoint": local_endpoint, + }, + { + "key": "key_resource", + "value": "some_resource", + "endpoint": local_endpoint, + }, + ], + }, + { + "traceId": trace_id, + "id": JsonV1Encoder._encode_span_id( + otel_spans[3].context.span_id + ), + "name": otel_spans[3].name, + "timestamp": otel_spans[3].start_time // 10 ** 3, + "duration": (otel_spans[3].end_time // 10 ** 3) + - (otel_spans[3].start_time // 10 ** 3), + "binaryAnnotations": [ + { + "key": NAME_KEY, + "value": "name", + "endpoint": local_endpoint, + }, + { + "key": VERSION_KEY, + "value": "version", + "endpoint": local_endpoint, + }, + ], + }, + ] + + self.assert_equal_encoded_spans( + json.dumps(expected_output), + JsonV1Encoder().serialize(otel_spans, NodeEndpoint()), + ) + + def test_encode_id_zero_padding(self): + trace_id = 0x0E0C63257DE34C926F9EFCD03927272E + span_id = 0x04BF92DEEFC58C92 + parent_id = 0x0AAAAAAAAAAAAAAA + start_time = 683647322 * 10 ** 9 # in ns + duration = 50 * 10 ** 6 + end_time = start_time + duration + + otel_span = trace._Span( + name=TEST_SERVICE_NAME, + context=trace_api.SpanContext( + trace_id, + span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ), + parent=trace_api.SpanContext(trace_id, parent_id, is_remote=False), + resource=trace.Resource({}), + ) + otel_span.start(start_time=start_time) + otel_span.end(end_time=end_time) + + expected_output = [ + { + "traceId": format(trace_id, "032x"), + "id": format(span_id, "016x"), + "name": TEST_SERVICE_NAME, + "timestamp": JsonV1Encoder._nsec_to_usec_round(start_time), + "duration": JsonV1Encoder._nsec_to_usec_round(duration), + "debug": True, + "parentId": format(parent_id, "016x"), + } + ] + + self.assertEqual( + json.dumps(expected_output), + JsonV1Encoder().serialize([otel_span], NodeEndpoint()), + ) + + def _test_encode_max_tag_length(self, max_tag_value_length: int): + otel_span, expected_tag_output = self.get_data_for_max_tag_length_test( + max_tag_value_length + ) + service_name = otel_span.name + + binary_annotations = [] + for tag_key, tag_expected_value in expected_tag_output.items(): + binary_annotations.append( + { + "key": tag_key, + "value": tag_expected_value, + "endpoint": {"serviceName": service_name}, + } + ) + + expected_output = [ + { + "traceId": JsonV1Encoder._encode_trace_id( + otel_span.context.trace_id + ), + "id": JsonV1Encoder._encode_span_id(otel_span.context.span_id), + "name": service_name, + "timestamp": JsonV1Encoder._nsec_to_usec_round( + otel_span.start_time + ), + "duration": JsonV1Encoder._nsec_to_usec_round( + otel_span.end_time - otel_span.start_time + ), + "binaryAnnotations": binary_annotations, + "debug": True, + } + ] + + self.assert_equal_encoded_spans( + json.dumps(expected_output), + JsonV1Encoder(max_tag_value_length).serialize( + [otel_span], NodeEndpoint() + ), + ) diff --git a/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v2_json.py b/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v2_json.py new file mode 100644 index 00000000000..369eef68952 --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v2_json.py @@ -0,0 +1,205 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json + +from opentelemetry import trace as trace_api +from opentelemetry.exporter.zipkin.encoder import NAME_KEY, VERSION_KEY +from opentelemetry.exporter.zipkin.encoder.v2.json import JsonV2Encoder +from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint +from opentelemetry.sdk import trace +from opentelemetry.trace import SpanKind, TraceFlags + +from .common_tests import TEST_SERVICE_NAME, CommonEncoderTestCases + + +# pylint: disable=protected-access +class TestV2JsonEncoder(CommonEncoderTestCases.CommonJsonEncoderTest): + @staticmethod + def get_encoder(*args, **kwargs) -> JsonV2Encoder: + return JsonV2Encoder(*args, **kwargs) + + def test_encode(self): + local_endpoint = {"serviceName": TEST_SERVICE_NAME} + span_kind = JsonV2Encoder.SPAN_KIND_MAP[SpanKind.INTERNAL] + + otel_spans = self.get_exhaustive_otel_span_list() + trace_id = JsonV2Encoder._encode_trace_id( + otel_spans[0].context.trace_id + ) + + expected_output = [ + { + "traceId": trace_id, + "id": JsonV2Encoder._encode_span_id( + otel_spans[0].context.span_id + ), + "name": otel_spans[0].name, + "timestamp": otel_spans[0].start_time // 10 ** 3, + "duration": (otel_spans[0].end_time // 10 ** 3) + - (otel_spans[0].start_time // 10 ** 3), + "localEndpoint": local_endpoint, + "kind": span_kind, + "tags": { + "key_bool": "false", + "key_string": "hello_world", + "key_float": "111.22", + "otel.status_code": "OK", + }, + "annotations": [ + { + "timestamp": otel_spans[0].events[0].timestamp + // 10 ** 3, + "value": json.dumps( + { + "event0": { + "annotation_bool": True, + "annotation_string": "annotation_test", + "key_float": 0.3, + } + }, + sort_keys=True, + ), + } + ], + "debug": True, + "parentId": JsonV2Encoder._encode_span_id( + otel_spans[0].parent.span_id + ), + }, + { + "traceId": trace_id, + "id": JsonV2Encoder._encode_span_id( + otel_spans[1].context.span_id + ), + "name": otel_spans[1].name, + "timestamp": otel_spans[1].start_time // 10 ** 3, + "duration": (otel_spans[1].end_time // 10 ** 3) + - (otel_spans[1].start_time // 10 ** 3), + "localEndpoint": local_endpoint, + "kind": span_kind, + "tags": { + "key_resource": "some_resource", + "otel.status_code": "ERROR", + "error": "Example description", + }, + }, + { + "traceId": trace_id, + "id": JsonV2Encoder._encode_span_id( + otel_spans[2].context.span_id + ), + "name": otel_spans[2].name, + "timestamp": otel_spans[2].start_time // 10 ** 3, + "duration": (otel_spans[2].end_time // 10 ** 3) + - (otel_spans[2].start_time // 10 ** 3), + "localEndpoint": local_endpoint, + "kind": span_kind, + "tags": { + "key_string": "hello_world", + "key_resource": "some_resource", + }, + }, + { + "traceId": trace_id, + "id": JsonV2Encoder._encode_span_id( + otel_spans[3].context.span_id + ), + "name": otel_spans[3].name, + "timestamp": otel_spans[3].start_time // 10 ** 3, + "duration": (otel_spans[3].end_time // 10 ** 3) + - (otel_spans[3].start_time // 10 ** 3), + "localEndpoint": local_endpoint, + "kind": span_kind, + "tags": {NAME_KEY: "name", VERSION_KEY: "version"}, + }, + ] + + self.assert_equal_encoded_spans( + json.dumps(expected_output), + JsonV2Encoder().serialize(otel_spans, NodeEndpoint()), + ) + + def test_encode_id_zero_padding(self): + trace_id = 0x0E0C63257DE34C926F9EFCD03927272E + span_id = 0x04BF92DEEFC58C92 + parent_id = 0x0AAAAAAAAAAAAAAA + start_time = 683647322 * 10 ** 9 # in ns + duration = 50 * 10 ** 6 + end_time = start_time + duration + + otel_span = trace._Span( + name=TEST_SERVICE_NAME, + context=trace_api.SpanContext( + trace_id, + span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ), + parent=trace_api.SpanContext(trace_id, parent_id, is_remote=False), + resource=trace.Resource({}), + ) + otel_span.start(start_time=start_time) + otel_span.end(end_time=end_time) + + expected_output = [ + { + "traceId": format(trace_id, "032x"), + "id": format(span_id, "016x"), + "name": TEST_SERVICE_NAME, + "timestamp": JsonV2Encoder._nsec_to_usec_round(start_time), + "duration": JsonV2Encoder._nsec_to_usec_round(duration), + "localEndpoint": {"serviceName": TEST_SERVICE_NAME}, + "kind": JsonV2Encoder.SPAN_KIND_MAP[SpanKind.INTERNAL], + "debug": True, + "parentId": format(parent_id, "016x"), + } + ] + + self.assert_equal_encoded_spans( + json.dumps(expected_output), + JsonV2Encoder().serialize([otel_span], NodeEndpoint()), + ) + + def _test_encode_max_tag_length(self, max_tag_value_length: int): + otel_span, expected_tag_output = self.get_data_for_max_tag_length_test( + max_tag_value_length + ) + service_name = otel_span.name + + expected_output = [ + { + "traceId": JsonV2Encoder._encode_trace_id( + otel_span.context.trace_id + ), + "id": JsonV2Encoder._encode_span_id(otel_span.context.span_id), + "name": service_name, + "timestamp": JsonV2Encoder._nsec_to_usec_round( + otel_span.start_time + ), + "duration": JsonV2Encoder._nsec_to_usec_round( + otel_span.end_time - otel_span.start_time + ), + "localEndpoint": {"serviceName": service_name}, + "kind": JsonV2Encoder.SPAN_KIND_MAP[SpanKind.INTERNAL], + "tags": expected_tag_output, + "debug": True, + } + ] + + self.assert_equal_encoded_spans( + json.dumps(expected_output), + JsonV2Encoder(max_tag_value_length).serialize( + [otel_span], NodeEndpoint() + ), + ) diff --git a/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v2_protobuf.py b/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v2_protobuf.py new file mode 100644 index 00000000000..092a8a303ca --- /dev/null +++ b/exporter/opentelemetry-exporter-zipkin/tests/encoder/test_v2_protobuf.py @@ -0,0 +1,234 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import ipaddress +import json + +from opentelemetry.exporter.zipkin.encoder import NAME_KEY, VERSION_KEY +from opentelemetry.exporter.zipkin.encoder.v2.protobuf import ProtobufEncoder +from opentelemetry.exporter.zipkin.encoder.v2.protobuf.gen import zipkin_pb2 +from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint +from opentelemetry.trace import SpanKind + +from .common_tests import TEST_SERVICE_NAME, CommonEncoderTestCases + + +# pylint: disable=protected-access +class TestProtobufEncoder(CommonEncoderTestCases.CommonEncoderTest): + @staticmethod + def get_encoder(*args, **kwargs) -> ProtobufEncoder: + return ProtobufEncoder(*args, **kwargs) + + def test_encode_trace_id(self): + for trace_id in (1, 1024, 2 ** 32, 2 ** 64, 2 ** 127): + self.assertEqual( + self.get_encoder_default()._encode_trace_id(trace_id), + trace_id.to_bytes(length=16, byteorder="big", signed=False), + ) + + def test_encode_span_id(self): + for span_id in (1, 1024, 2 ** 8, 2 ** 16, 2 ** 32, 2 ** 63): + self.assertEqual( + self.get_encoder_default()._encode_span_id(span_id), + span_id.to_bytes(length=8, byteorder="big", signed=False), + ) + + def test_encode_local_endpoint_default(self): + self.assertEqual( + ProtobufEncoder()._encode_local_endpoint(NodeEndpoint()), + zipkin_pb2.Endpoint(service_name=TEST_SERVICE_NAME), + ) + + def test_encode_local_endpoint_explicits(self): + ipv4 = "192.168.0.1" + ipv6 = "2001:db8::c001" + port = 414120 + self.assertEqual( + ProtobufEncoder()._encode_local_endpoint( + NodeEndpoint(ipv4, ipv6, port) + ), + zipkin_pb2.Endpoint( + service_name=TEST_SERVICE_NAME, + ipv4=ipaddress.ip_address(ipv4).packed, + ipv6=ipaddress.ip_address(ipv6).packed, + port=port, + ), + ) + + def test_encode(self): + local_endpoint = zipkin_pb2.Endpoint(service_name=TEST_SERVICE_NAME) + span_kind = ProtobufEncoder.SPAN_KIND_MAP[SpanKind.INTERNAL] + + otel_spans = self.get_exhaustive_otel_span_list() + trace_id = ProtobufEncoder._encode_trace_id( + otel_spans[0].context.trace_id + ) + expected_output = zipkin_pb2.ListOfSpans( + spans=[ + zipkin_pb2.Span( + trace_id=trace_id, + id=ProtobufEncoder._encode_span_id( + otel_spans[0].context.span_id + ), + name=otel_spans[0].name, + timestamp=ProtobufEncoder._nsec_to_usec_round( + otel_spans[0].start_time + ), + duration=( + ProtobufEncoder._nsec_to_usec_round( + otel_spans[0].end_time - otel_spans[0].start_time + ) + ), + local_endpoint=local_endpoint, + kind=span_kind, + tags={ + "key_bool": "false", + "key_string": "hello_world", + "key_float": "111.22", + "otel.status_code": "OK", + }, + debug=True, + parent_id=ProtobufEncoder._encode_span_id( + otel_spans[0].parent.span_id + ), + annotations=[ + zipkin_pb2.Annotation( + timestamp=ProtobufEncoder._nsec_to_usec_round( + otel_spans[0].events[0].timestamp + ), + value=json.dumps( + { + "event0": { + "annotation_bool": True, + "annotation_string": "annotation_test", + "key_float": 0.3, + } + }, + sort_keys=True, + ), + ), + ], + ), + zipkin_pb2.Span( + trace_id=trace_id, + id=ProtobufEncoder._encode_span_id( + otel_spans[1].context.span_id + ), + name=otel_spans[1].name, + timestamp=ProtobufEncoder._nsec_to_usec_round( + otel_spans[1].start_time + ), + duration=( + ProtobufEncoder._nsec_to_usec_round( + otel_spans[1].end_time - otel_spans[1].start_time + ) + ), + local_endpoint=local_endpoint, + kind=span_kind, + tags={ + "key_resource": "some_resource", + "otel.status_code": "ERROR", + "error": "Example description", + }, + debug=False, + ), + zipkin_pb2.Span( + trace_id=trace_id, + id=ProtobufEncoder._encode_span_id( + otel_spans[2].context.span_id + ), + name=otel_spans[2].name, + timestamp=ProtobufEncoder._nsec_to_usec_round( + otel_spans[2].start_time + ), + duration=( + ProtobufEncoder._nsec_to_usec_round( + otel_spans[2].end_time - otel_spans[2].start_time + ) + ), + local_endpoint=local_endpoint, + kind=span_kind, + tags={ + "key_string": "hello_world", + "key_resource": "some_resource", + }, + debug=False, + ), + zipkin_pb2.Span( + trace_id=trace_id, + id=ProtobufEncoder._encode_span_id( + otel_spans[3].context.span_id + ), + name=otel_spans[3].name, + timestamp=ProtobufEncoder._nsec_to_usec_round( + otel_spans[3].start_time + ), + duration=( + ProtobufEncoder._nsec_to_usec_round( + otel_spans[3].end_time - otel_spans[3].start_time + ) + ), + local_endpoint=local_endpoint, + kind=span_kind, + tags={NAME_KEY: "name", VERSION_KEY: "version"}, + debug=False, + ), + ], + ) + + actual_output = zipkin_pb2.ListOfSpans.FromString( + ProtobufEncoder().serialize(otel_spans, NodeEndpoint()) + ) + + self.assertEqual(actual_output, expected_output) + + def _test_encode_max_tag_length(self, max_tag_value_length: int): + otel_span, expected_tag_output = self.get_data_for_max_tag_length_test( + max_tag_value_length + ) + service_name = otel_span.name + + expected_output = zipkin_pb2.ListOfSpans( + spans=[ + zipkin_pb2.Span( + trace_id=ProtobufEncoder._encode_trace_id( + otel_span.context.trace_id + ), + id=ProtobufEncoder._encode_span_id( + otel_span.context.span_id + ), + name=service_name, + timestamp=ProtobufEncoder._nsec_to_usec_round( + otel_span.start_time + ), + duration=ProtobufEncoder._nsec_to_usec_round( + otel_span.end_time - otel_span.start_time + ), + local_endpoint=zipkin_pb2.Endpoint( + service_name=service_name + ), + kind=ProtobufEncoder.SPAN_KIND_MAP[SpanKind.INTERNAL], + tags=expected_tag_output, + annotations=None, + debug=True, + ) + ] + ) + + actual_output = zipkin_pb2.ListOfSpans.FromString( + ProtobufEncoder(max_tag_value_length).serialize( + [otel_span], NodeEndpoint() + ) + ) + + self.assertEqual(actual_output, expected_output) diff --git a/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py b/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py index 36320b78c38..43cbd25403f 100644 --- a/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py +++ b/exporter/opentelemetry-exporter-zipkin/tests/test_zipkin_exporter.py @@ -12,33 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json +import ipaddress import os import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import patch -from opentelemetry import trace as trace_api -from opentelemetry.exporter.zipkin import ( - NAME_KEY, - SPAN_KIND_MAP_JSON, - SPAN_KIND_MAP_PROTOBUF, - TRANSPORT_FORMAT_JSON, - TRANSPORT_FORMAT_PROTOBUF, - VERSION_KEY, - ZipkinSpanExporter, - nsec_to_usec_round, -) -from opentelemetry.exporter.zipkin.gen import zipkin_pb2 -from opentelemetry.sdk import trace +from opentelemetry import trace +from opentelemetry.exporter.zipkin import DEFAULT_ENDPOINT, ZipkinSpanExporter +from opentelemetry.exporter.zipkin.encoder import Protocol +from opentelemetry.exporter.zipkin.encoder.v2.protobuf import ProtobufEncoder +from opentelemetry.exporter.zipkin.node_endpoint import NodeEndpoint from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_ZIPKIN_ENDPOINT, - OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT, ) -from opentelemetry.sdk.trace import Resource +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SpanExportResult -from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.trace import SpanKind, TraceFlags -from opentelemetry.trace.status import Status, StatusCode + +TEST_SERVICE_NAME = "test_service" class MockResponse: @@ -48,883 +39,127 @@ def __init__(self, status_code): class TestZipkinSpanExporter(unittest.TestCase): - def setUp(self): - # create and save span to be used in tests - context = trace_api.SpanContext( - trace_id=0x000000000000000000000000DEADBEEF, - span_id=0x00000000DEADBEF0, - is_remote=False, + @classmethod + def setUpClass(cls): + trace.set_tracer_provider( + TracerProvider( + resource=Resource({SERVICE_NAME: TEST_SERVICE_NAME}) + ) ) - self._test_span = trace._Span("test_span", context=context) - self._test_span.start() - self._test_span.end() - def tearDown(self): if OTEL_EXPORTER_ZIPKIN_ENDPOINT in os.environ: del os.environ[OTEL_EXPORTER_ZIPKIN_ENDPOINT] - if OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT in os.environ: - del os.environ[OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT] - - def test_constructor_env_var(self): - """Test the default values assigned by constructor.""" - url = "https://foo:9911/path" - os.environ[OTEL_EXPORTER_ZIPKIN_ENDPOINT] = url - os.environ[ - OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT - ] = TRANSPORT_FORMAT_PROTOBUF - service_name = "my-service-name" - port = 9911 - exporter = ZipkinSpanExporter(service_name) - ipv4 = None - ipv6 = None - - self.assertEqual(exporter.service_name, service_name) - self.assertEqual(exporter.ipv4, ipv4) - self.assertEqual(exporter.ipv6, ipv6) - self.assertEqual(exporter.url, url) - self.assertEqual(exporter.port, port) - self.assertEqual(exporter.transport_format, TRANSPORT_FORMAT_PROTOBUF) def test_constructor_default(self): - """Test the default values assigned by constructor.""" - service_name = "my-service-name" - port = 9411 - exporter = ZipkinSpanExporter(service_name) - ipv4 = None - ipv6 = None - url = "http://localhost:9411/api/v2/spans" - transport_format = TRANSPORT_FORMAT_JSON - - self.assertEqual(exporter.service_name, service_name) - self.assertEqual(exporter.port, port) - self.assertEqual(exporter.ipv4, ipv4) - self.assertEqual(exporter.ipv6, ipv6) - self.assertEqual(exporter.url, url) - self.assertEqual(exporter.transport_format, transport_format) - - def test_constructor_explicit(self): - """Test the constructor passing all the options.""" - service_name = "my-opentelemetry-zipkin" - port = 15875 - ipv4 = "1.2.3.4" - ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334" - url = "https://opentelemetry.io:15875/myapi/traces?format=zipkin" - transport_format = TRANSPORT_FORMAT_PROTOBUF - - exporter = ZipkinSpanExporter( - service_name=service_name, - url=url, - ipv4=ipv4, - ipv6=ipv6, - transport_format=transport_format, - ) - - self.assertEqual(exporter.service_name, service_name) - self.assertEqual(exporter.port, port) - self.assertEqual(exporter.ipv4, ipv4) - self.assertEqual(exporter.ipv6, ipv6) - self.assertEqual(exporter.url, url) - self.assertEqual(exporter.transport_format, transport_format) - - # pylint: disable=too-many-locals,too-many-statements - def test_export_json(self): - span_names = ("test1", "test2", "test3", "test4") - trace_id = 0x6E0C63257DE34C926F9EFCD03927272E - span_id = 0x34BF92DEEFC58C92 - parent_id = 0x1111111111111111 - other_id = 0x2222222222222222 - - base_time = 683647322 * 10 ** 9 # in ns - start_times = ( - base_time, - base_time + 150 * 10 ** 6, - base_time + 300 * 10 ** 6, - base_time + 400 * 10 ** 6, - ) - durations = (50 * 10 ** 6, 100 * 10 ** 6, 200 * 10 ** 6, 300 * 10 ** 6) - end_times = ( - start_times[0] + durations[0], - start_times[1] + durations[1], - start_times[2] + durations[2], - start_times[3] + durations[3], - ) - - span_context = trace_api.SpanContext( - trace_id, - span_id, - is_remote=False, - trace_flags=TraceFlags(TraceFlags.SAMPLED), - ) - parent_span_context = trace_api.SpanContext( - trace_id, parent_id, is_remote=False - ) - other_context = trace_api.SpanContext( - trace_id, other_id, is_remote=False - ) - - event_attributes = { - "annotation_bool": True, - "annotation_string": "annotation_test", - "key_float": 0.3, - } - - event_timestamp = base_time + 50 * 10 ** 6 - event = trace.Event( - name="event0", - timestamp=event_timestamp, - attributes=event_attributes, - ) - - link_attributes = {"key_bool": True} - - link = trace_api.Link( - context=other_context, attributes=link_attributes - ) - - otel_spans = [ - trace._Span( - name=span_names[0], - context=span_context, - parent=parent_span_context, - events=(event,), - links=(link,), - resource=Resource({}), - ), - trace._Span( - name=span_names[1], - context=parent_span_context, - parent=None, - resource=Resource( - attributes={"key_resource": "some_resource"} - ), - ), - trace._Span( - name=span_names[2], - context=other_context, - parent=None, - resource=Resource( - attributes={"key_resource": "some_resource"} - ), - ), - trace._Span( - name=span_names[3], - context=other_context, - parent=None, - resource=Resource({}), - instrumentation_info=InstrumentationInfo( - name="name", version="version" - ), - ), - ] - - otel_spans[0].start(start_time=start_times[0]) - # added here to preserve order - otel_spans[0].set_attribute("key_bool", False) - otel_spans[0].set_attribute("key_string", "hello_world") - otel_spans[0].set_attribute("key_float", 111.22) - otel_spans[0].set_status( - Status(StatusCode.ERROR, "Example description") - ) - otel_spans[0].end(end_time=end_times[0]) - - otel_spans[1].start(start_time=start_times[1]) - otel_spans[1].end(end_time=end_times[1]) - - otel_spans[2].start(start_time=start_times[2]) - otel_spans[2].set_attribute("key_string", "hello_world") - otel_spans[2].end(end_time=end_times[2]) - - otel_spans[3].start(start_time=start_times[3]) - otel_spans[3].end(end_time=end_times[3]) - - service_name = "test-service" - local_endpoint = {"serviceName": service_name, "port": 9411} - span_kind = SPAN_KIND_MAP_JSON[SpanKind.INTERNAL] - - exporter = ZipkinSpanExporter(service_name) - expected_spans = [ - { - "traceId": format(trace_id, "x"), - "id": format(span_id, "x"), - "name": span_names[0], - "timestamp": start_times[0] // 10 ** 3, - "duration": durations[0] // 10 ** 3, - "localEndpoint": local_endpoint, - "kind": span_kind, - "tags": { - "key_bool": "false", - "key_string": "hello_world", - "key_float": "111.22", - "otel.status_code": "ERROR", - "error": "Example description", - }, - "debug": True, - "parentId": format(parent_id, "x"), - "annotations": [ - { - "timestamp": event_timestamp // 10 ** 3, - "value": { - "event0": { - "annotation_bool": True, - "annotation_string": "annotation_test", - "key_float": 0.3, - } - }, - } - ], - }, - { - "traceId": format(trace_id, "x"), - "id": format(parent_id, "x"), - "name": span_names[1], - "timestamp": start_times[1] // 10 ** 3, - "duration": durations[1] // 10 ** 3, - "localEndpoint": local_endpoint, - "kind": span_kind, - "tags": {"key_resource": "some_resource"}, - "annotations": None, - }, - { - "traceId": format(trace_id, "x"), - "id": format(other_id, "x"), - "name": span_names[2], - "timestamp": start_times[2] // 10 ** 3, - "duration": durations[2] // 10 ** 3, - "localEndpoint": local_endpoint, - "kind": span_kind, - "tags": { - "key_string": "hello_world", - "key_resource": "some_resource", - }, - "annotations": None, - }, - { - "traceId": format(trace_id, "x"), - "id": format(other_id, "x"), - "name": span_names[3], - "timestamp": start_times[3] // 10 ** 3, - "duration": durations[3] // 10 ** 3, - "localEndpoint": local_endpoint, - "kind": span_kind, - "tags": {NAME_KEY: "name", VERSION_KEY: "version"}, - "annotations": None, - }, - ] - - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export(otel_spans) - self.assertEqual(SpanExportResult.SUCCESS, status) - - # pylint: disable=unsubscriptable-object - kwargs = mock_post.call_args[1] - - self.assertEqual(kwargs["url"], "http://localhost:9411/api/v2/spans") - self.assertEqual(kwargs["headers"]["Content-Type"], "application/json") - actual_spans = sorted( - json.loads(kwargs["data"]), key=lambda span: span["timestamp"] - ) - for expected, actual in zip(expected_spans, actual_spans): - expected_annotations = expected.pop("annotations", None) - actual_annotations = actual.pop("annotations", None) - if actual_annotations: - for annotation in actual_annotations: - annotation["value"] = json.loads(annotation["value"]) - self.assertEqual(expected, actual) - self.assertEqual(expected_annotations, actual_annotations) - - # pylint: disable=too-many-locals - def test_export_json_zero_padding(self): - """test that hex ids starting with 0 - are properly padded to 16 or 32 hex chars - when exported + exporter = ZipkinSpanExporter(Protocol.V2_PROTOBUF) + self.assertIsInstance(exporter.encoder, ProtobufEncoder) + self.assertEqual(exporter.endpoint, DEFAULT_ENDPOINT) + self.assertEqual(exporter.local_node.service_name, TEST_SERVICE_NAME) + self.assertEqual(exporter.local_node.ipv4, None) + self.assertEqual(exporter.local_node.ipv6, None) + self.assertEqual(exporter.local_node.port, None) + + def test_constructor_env_vars(self): + os_endpoint = "https://foo:9911/path" + os.environ[OTEL_EXPORTER_ZIPKIN_ENDPOINT] = os_endpoint + + exporter = ZipkinSpanExporter(Protocol.V2_PROTOBUF) + + self.assertEqual(exporter.endpoint, os_endpoint) + self.assertEqual(exporter.local_node.service_name, TEST_SERVICE_NAME) + self.assertEqual(exporter.local_node.ipv4, None) + self.assertEqual(exporter.local_node.ipv6, None) + self.assertEqual(exporter.local_node.port, None) + + def test_constructor_protocol_endpoint(self): + """Test the constructor for the common usage of providing the + protocol and endpoint arguments.""" + protocol = Protocol.V2_PROTOBUF + endpoint = "https://opentelemetry.io:15875/myapi/traces?format=zipkin" + + exporter = ZipkinSpanExporter(protocol, endpoint) + + self.assertIsInstance(exporter.encoder, ProtobufEncoder) + self.assertEqual(exporter.endpoint, endpoint) + self.assertEqual(exporter.local_node.service_name, TEST_SERVICE_NAME) + self.assertEqual(exporter.local_node.ipv4, None) + self.assertEqual(exporter.local_node.ipv6, None) + self.assertEqual(exporter.local_node.port, None) + + def test_constructor_all_params_and_env_vars(self): + """Test the scenario where all params are provided and all OS env + vars are set. Explicit params should take precedence. """ + os_endpoint = "https://os.env.param:9911/path" + os.environ[OTEL_EXPORTER_ZIPKIN_ENDPOINT] = os_endpoint - span_names = "testZeroes" - trace_id = 0x0E0C63257DE34C926F9EFCD03927272E - span_id = 0x04BF92DEEFC58C92 - parent_id = 0x0AAAAAAAAAAAAAAA + constructor_param_protocol = Protocol.V2_PROTOBUF + constructor_param_endpoint = "https://constructor.param:9911/path" + local_node_ipv4 = "192.168.0.1" + local_node_ipv6 = "2001:db8::1000" + local_node_port = 30301 + max_tag_value_length = 56 - start_time = 683647322 * 10 ** 9 # in ns - duration = 50 * 10 ** 6 - end_time = start_time + duration - - span_context = trace_api.SpanContext( - trace_id, - span_id, - is_remote=False, - trace_flags=TraceFlags(TraceFlags.SAMPLED), - ) - parent_span_context = trace_api.SpanContext( - trace_id, parent_id, is_remote=False + exporter = ZipkinSpanExporter( + constructor_param_protocol, + constructor_param_endpoint, + local_node_ipv4, + local_node_ipv6, + local_node_port, + max_tag_value_length, ) - otel_span = trace._Span( - name=span_names[0], - context=span_context, - parent=parent_span_context, - resource=Resource({}), + self.assertIsInstance(exporter.encoder, ProtobufEncoder) + self.assertEqual(exporter.endpoint, constructor_param_endpoint) + self.assertEqual(exporter.local_node.service_name, TEST_SERVICE_NAME) + self.assertEqual( + exporter.local_node.ipv4, ipaddress.IPv4Address(local_node_ipv4) ) - - otel_span.start(start_time=start_time) - otel_span.end(end_time=end_time) - - service_name = "test-service" - local_endpoint = {"serviceName": service_name, "port": 9411} - - exporter = ZipkinSpanExporter(service_name) - # Check traceId are properly lowercase 16 or 32 hex - expected = [ - { - "traceId": "0e0c63257de34c926f9efcd03927272e", - "id": "04bf92deefc58c92", - "name": span_names[0], - "timestamp": start_time // 10 ** 3, - "duration": duration // 10 ** 3, - "localEndpoint": local_endpoint, - "kind": SPAN_KIND_MAP_JSON[SpanKind.INTERNAL], - "tags": {}, - "annotations": None, - "debug": True, - "parentId": "0aaaaaaaaaaaaaaa", - } - ] - - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([otel_span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - mock_post.assert_called_with( - url="http://localhost:9411/api/v2/spans", - data=json.dumps(expected), - headers={"Content-Type": "application/json"}, + self.assertEqual( + exporter.local_node.ipv6, ipaddress.IPv6Address(local_node_ipv6) ) + self.assertEqual(exporter.local_node.port, local_node_port) @patch("requests.post") def test_invalid_response(self, mock_post): mock_post.return_value = MockResponse(404) spans = [] - exporter = ZipkinSpanExporter("test-service") + exporter = ZipkinSpanExporter(Protocol.V2_PROTOBUF) status = exporter.export(spans) self.assertEqual(SpanExportResult.FAILURE, status) - def test_export_json_max_tag_length(self): - service_name = "test-service" - - span_context = trace_api.SpanContext( - 0x0E0C63257DE34C926F9EFCD03927272E, - 0x04BF92DEEFC58C92, - is_remote=False, - trace_flags=TraceFlags(TraceFlags.SAMPLED), - ) - - span = trace._Span( - name="test-span", context=span_context, resource=Resource({}) - ) - - span.start() - # added here to preserve order - span.set_attribute("string1", "v" * 500) - span.set_attribute("string2", "v" * 50) - span.set_attribute("list1", ["a"] * 25) - span.set_attribute("list2", ["a"] * 10) - span.set_attribute("list3", [2] * 25) - span.set_attribute("list4", [2] * 10) - span.set_attribute("list5", [True] * 25) - span.set_attribute("list6", [True] * 10) - span.set_attribute("tuple1", ("a",) * 25) - span.set_attribute("tuple2", ("a",) * 10) - span.set_attribute("tuple3", (2,) * 25) - span.set_attribute("tuple4", (2,) * 10) - span.set_attribute("tuple5", (True,) * 25) - span.set_attribute("tuple6", (True,) * 10) - span.set_attribute("range1", range(0, 25)) - span.set_attribute("range2", range(0, 10)) - span.set_attribute("empty_list", []) - span.set_attribute("none_list", ["hello", None, "world"]) - - span.set_status(Status(StatusCode.ERROR, "Example description")) - span.end() - - exporter = ZipkinSpanExporter(service_name) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - _, kwargs = mock_post.call_args # pylint: disable=E0633 - - tags = json.loads(kwargs["data"])[0]["tags"] - self.assertEqual(len(tags["string1"]), 128) - self.assertEqual(len(tags["string2"]), 50) - self.assertEqual( - tags["list1"], - '["a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a"]', - ) - self.assertEqual( - tags["list2"], '["a","a","a","a","a","a","a","a","a","a"]', - ) - self.assertEqual( - tags["list3"], - '["2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2"]', - ) - self.assertEqual( - tags["list4"], '["2","2","2","2","2","2","2","2","2","2"]', - ) - self.assertEqual( - tags["list5"], - '["true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true"]', - ) - self.assertEqual( - tags["list6"], - '["true","true","true","true","true","true","true","true","true","true"]', - ) - self.assertEqual( - tags["tuple1"], - '["a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a","a"]', - ) - self.assertEqual( - tags["tuple2"], '["a","a","a","a","a","a","a","a","a","a"]', - ) - self.assertEqual( - tags["tuple3"], - '["2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2","2"]', - ) - self.assertEqual( - tags["tuple4"], '["2","2","2","2","2","2","2","2","2","2"]', - ) - self.assertEqual( - tags["tuple5"], - '["true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true","true"]', - ) - self.assertEqual( - tags["tuple6"], - '["true","true","true","true","true","true","true","true","true","true"]', - ) - self.assertEqual( - tags["range1"], - '["0","1","2","3","4","5","6","7","8","9","10","11","12","13","14","15","16","17","18","19","20","21","22","23","24"]', - ) - self.assertEqual( - tags["range2"], '["0","1","2","3","4","5","6","7","8","9"]', - ) - self.assertEqual( - tags["empty_list"], "[]", - ) - self.assertEqual( - tags["none_list"], '["hello",null,"world"]', - ) - - exporter = ZipkinSpanExporter(service_name, max_tag_value_length=2) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - _, kwargs = mock_post.call_args # pylint: disable=E0633 - tags = json.loads(kwargs["data"])[0]["tags"] - self.assertEqual(len(tags["string1"]), 2) - self.assertEqual(len(tags["string2"]), 2) - self.assertEqual(tags["list1"], "[]") - self.assertEqual(tags["list2"], "[]") - self.assertEqual(tags["list3"], "[]") - self.assertEqual(tags["list4"], "[]") - self.assertEqual(tags["list5"], "[]") - self.assertEqual(tags["list6"], "[]") - self.assertEqual(tags["tuple1"], "[]") - self.assertEqual(tags["tuple2"], "[]") - self.assertEqual(tags["tuple3"], "[]") - self.assertEqual(tags["tuple4"], "[]") - self.assertEqual(tags["tuple5"], "[]") - self.assertEqual(tags["tuple6"], "[]") - self.assertEqual(tags["range1"], "[]") - self.assertEqual(tags["range2"], "[]") - - exporter = ZipkinSpanExporter(service_name, max_tag_value_length=5) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - _, kwargs = mock_post.call_args # pylint: disable=E0633 - tags = json.loads(kwargs["data"])[0]["tags"] - self.assertEqual(len(tags["string1"]), 5) - self.assertEqual(len(tags["string2"]), 5) - self.assertEqual(tags["list1"], '["a"]') - self.assertEqual(tags["list2"], '["a"]') - self.assertEqual(tags["list3"], '["2"]') - self.assertEqual(tags["list4"], '["2"]') - self.assertEqual(tags["list5"], "[]") - self.assertEqual(tags["list6"], "[]") - self.assertEqual(tags["tuple1"], '["a"]') - self.assertEqual(tags["tuple2"], '["a"]') - self.assertEqual(tags["tuple3"], '["2"]') - self.assertEqual(tags["tuple4"], '["2"]') - self.assertEqual(tags["tuple5"], "[]") - self.assertEqual(tags["tuple6"], "[]") - self.assertEqual(tags["range1"], '["0"]') - self.assertEqual(tags["range2"], '["0"]') - - exporter = ZipkinSpanExporter(service_name, max_tag_value_length=9) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - _, kwargs = mock_post.call_args # pylint: disable=E0633 - tags = json.loads(kwargs["data"])[0]["tags"] - self.assertEqual(len(tags["string1"]), 9) - self.assertEqual(len(tags["string2"]), 9) - self.assertEqual(tags["list1"], '["a","a"]') - self.assertEqual(tags["list2"], '["a","a"]') - self.assertEqual(tags["list3"], '["2","2"]') - self.assertEqual(tags["list4"], '["2","2"]') - self.assertEqual(tags["list5"], '["true"]') - self.assertEqual(tags["list6"], '["true"]') - self.assertEqual(tags["tuple1"], '["a","a"]') - self.assertEqual(tags["tuple2"], '["a","a"]') - self.assertEqual(tags["tuple3"], '["2","2"]') - self.assertEqual(tags["tuple4"], '["2","2"]') - self.assertEqual(tags["tuple5"], '["true"]') - self.assertEqual(tags["tuple6"], '["true"]') - self.assertEqual(tags["range1"], '["0","1"]') - self.assertEqual(tags["range2"], '["0","1"]') - - exporter = ZipkinSpanExporter(service_name, max_tag_value_length=10) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - _, kwargs = mock_post.call_args # pylint: disable=E0633 - tags = json.loads(kwargs["data"])[0]["tags"] - self.assertEqual(len(tags["string1"]), 10) - self.assertEqual(len(tags["string2"]), 10) - self.assertEqual(tags["list1"], '["a","a"]') - self.assertEqual(tags["list2"], '["a","a"]') - self.assertEqual(tags["list3"], '["2","2"]') - self.assertEqual(tags["list4"], '["2","2"]') - self.assertEqual(tags["list5"], '["true"]') - self.assertEqual(tags["list6"], '["true"]') - self.assertEqual(tags["tuple1"], '["a","a"]') - self.assertEqual(tags["tuple2"], '["a","a"]') - self.assertEqual(tags["tuple3"], '["2","2"]') - self.assertEqual(tags["tuple4"], '["2","2"]') - self.assertEqual(tags["tuple5"], '["true"]') - self.assertEqual(tags["tuple6"], '["true"]') - self.assertEqual(tags["range1"], '["0","1"]') - self.assertEqual(tags["range2"], '["0","1"]') - - exporter = ZipkinSpanExporter(service_name, max_tag_value_length=11) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - _, kwargs = mock_post.call_args # pylint: disable=E0633 - tags = json.loads(kwargs["data"])[0]["tags"] - self.assertEqual(len(tags["string1"]), 11) - self.assertEqual(len(tags["string2"]), 11) - self.assertEqual(tags["list1"], '["a","a"]') - self.assertEqual(tags["list2"], '["a","a"]') - self.assertEqual(tags["list3"], '["2","2"]') - self.assertEqual(tags["list4"], '["2","2"]') - self.assertEqual(tags["list5"], '["true"]') - self.assertEqual(tags["list6"], '["true"]') - self.assertEqual(tags["tuple1"], '["a","a"]') - self.assertEqual(tags["tuple2"], '["a","a"]') - self.assertEqual(tags["tuple3"], '["2","2"]') - self.assertEqual(tags["tuple4"], '["2","2"]') - self.assertEqual(tags["tuple5"], '["true"]') - self.assertEqual(tags["tuple6"], '["true"]') - self.assertEqual(tags["range1"], '["0","1"]') - self.assertEqual(tags["range2"], '["0","1"]') - - # pylint: disable=too-many-locals,too-many-statements - def test_export_protobuf(self): - span_names = ("test1", "test2", "test3", "test4") - trace_id = 0x6E0C63257DE34C926F9EFCD03927272E - span_id = 0x34BF92DEEFC58C92 - parent_id = 0x1111111111111111 - other_id = 0x2222222222222222 - - base_time = 683647322 * 10 ** 9 # in ns - start_times = ( - base_time, - base_time + 150 * 10 ** 6, - base_time + 300 * 10 ** 6, - base_time + 400 * 10 ** 6, - ) - durations = (50 * 10 ** 6, 100 * 10 ** 6, 200 * 10 ** 6, 300 * 10 ** 6) - end_times = ( - start_times[0] + durations[0], - start_times[1] + durations[1], - start_times[2] + durations[2], - start_times[3] + durations[3], - ) - - span_context = trace_api.SpanContext( - trace_id, - span_id, - is_remote=False, - trace_flags=TraceFlags(TraceFlags.SAMPLED), - ) - parent_span_context = trace_api.SpanContext( - trace_id, parent_id, is_remote=False - ) - other_context = trace_api.SpanContext( - trace_id, other_id, is_remote=False - ) - - event_attributes = { - "annotation_bool": True, - "annotation_string": "annotation_test", - "key_float": 0.3, - } - - event_timestamp = base_time + 50 * 10 ** 6 - event = trace.Event( - name="event0", - timestamp=event_timestamp, - attributes=event_attributes, - ) - - link_attributes = {"key_bool": True} - - link = trace_api.Link( - context=other_context, attributes=link_attributes - ) - - otel_spans = [ - trace._Span( - name=span_names[0], - context=span_context, - parent=parent_span_context, - resource=Resource({}), - events=(event,), - links=(link,), - ), - trace._Span( - name=span_names[1], - context=parent_span_context, - parent=None, - resource=Resource( - attributes={"key_resource": "some_resource"} - ), - ), - trace._Span( - name=span_names[2], - context=other_context, - parent=None, - resource=Resource( - attributes={"key_resource": "some_resource"} - ), - ), - trace._Span( - name=span_names[3], - context=other_context, - parent=None, - resource=Resource({}), - instrumentation_info=InstrumentationInfo( - name="name", version="version" - ), - ), - ] - - otel_spans[0].start(start_time=start_times[0]) - # added here to preserve order - otel_spans[0].set_attribute("key_bool", False) - otel_spans[0].set_attribute("key_string", "hello_world") - otel_spans[0].set_attribute("key_float", 111.22) - otel_spans[0].set_status( - Status(StatusCode.ERROR, "Example description") - ) - otel_spans[0].end(end_time=end_times[0]) - - otel_spans[1].start(start_time=start_times[1]) - otel_spans[1].set_status(Status(StatusCode.OK)) - otel_spans[1].end(end_time=end_times[1]) - - otel_spans[2].start(start_time=start_times[2]) - otel_spans[2].set_attribute("key_string", "hello_world") - otel_spans[2].end(end_time=end_times[2]) - - otel_spans[3].start(start_time=start_times[3]) - otel_spans[3].end(end_time=end_times[3]) - - service_name = "test-service" - local_endpoint = zipkin_pb2.Endpoint( - service_name=service_name, port=9411 - ) - span_kind = SPAN_KIND_MAP_PROTOBUF[SpanKind.INTERNAL] - - expected_spans = zipkin_pb2.ListOfSpans( - spans=[ - zipkin_pb2.Span( - trace_id=trace_id.to_bytes( - length=16, byteorder="big", signed=False - ), - id=ZipkinSpanExporter.format_pbuf_span_id(span_id), - name=span_names[0], - timestamp=nsec_to_usec_round(start_times[0]), - duration=nsec_to_usec_round(durations[0]), - local_endpoint=local_endpoint, - kind=span_kind, - tags={ - "key_bool": "false", - "key_string": "hello_world", - "key_float": "111.22", - "otel.status_code": "ERROR", - "error": "Example description", - }, - debug=True, - parent_id=ZipkinSpanExporter.format_pbuf_span_id( - parent_id - ), - annotations=[ - zipkin_pb2.Annotation( - timestamp=nsec_to_usec_round(event_timestamp), - value=json.dumps( - { - "event0": { - "annotation_bool": True, - "annotation_string": "annotation_test", - "key_float": 0.3, - } - } - ), - ), - ], - ), - zipkin_pb2.Span( - trace_id=trace_id.to_bytes( - length=16, byteorder="big", signed=False - ), - id=ZipkinSpanExporter.format_pbuf_span_id(parent_id), - name=span_names[1], - timestamp=nsec_to_usec_round(start_times[1]), - duration=nsec_to_usec_round(durations[1]), - local_endpoint=local_endpoint, - kind=span_kind, - tags={ - "key_resource": "some_resource", - "otel.status_code": "OK", - }, - ), - zipkin_pb2.Span( - trace_id=trace_id.to_bytes( - length=16, byteorder="big", signed=False - ), - id=ZipkinSpanExporter.format_pbuf_span_id(other_id), - name=span_names[2], - timestamp=nsec_to_usec_round(start_times[2]), - duration=nsec_to_usec_round(durations[2]), - local_endpoint=local_endpoint, - kind=span_kind, - tags={ - "key_string": "hello_world", - "key_resource": "some_resource", - }, - ), - zipkin_pb2.Span( - trace_id=trace_id.to_bytes( - length=16, byteorder="big", signed=False - ), - id=ZipkinSpanExporter.format_pbuf_span_id(other_id), - name=span_names[3], - timestamp=nsec_to_usec_round(start_times[3]), - duration=nsec_to_usec_round(durations[3]), - local_endpoint=local_endpoint, - kind=span_kind, - tags={NAME_KEY: "name", VERSION_KEY: "version"}, - ), - ], - ) - - exporter = ZipkinSpanExporter( - service_name, transport_format=TRANSPORT_FORMAT_PROTOBUF - ) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export(otel_spans) - self.assertEqual(SpanExportResult.SUCCESS, status) - - # pylint: disable=unsubscriptable-object - kwargs = mock_post.call_args[1] - - self.assertEqual(kwargs["url"], "http://localhost:9411/api/v2/spans") - self.assertEqual( - kwargs["headers"]["Content-Type"], "application/x-protobuf" - ) - self.assertEqual( - zipkin_pb2.ListOfSpans.FromString(kwargs["data"]), expected_spans - ) - - def test_export_protobuf_max_tag_length(self): - service_name = "test-service" - - span_context = trace_api.SpanContext( - 0x0E0C63257DE34C926F9EFCD03927272E, - 0x04BF92DEEFC58C92, - is_remote=False, - trace_flags=TraceFlags(TraceFlags.SAMPLED), - ) - - span = trace._Span( - name="test-span", context=span_context, resource=Resource({}) - ) - - span.start() - # added here to preserve order - span.set_attribute("k1", "v" * 500) - span.set_attribute("k2", "v" * 50) - span.set_status(Status(StatusCode.ERROR, "Example description")) - span.end() - - exporter = ZipkinSpanExporter( - service_name, transport_format=TRANSPORT_FORMAT_PROTOBUF, - ) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - # pylint: disable=unsubscriptable-object - kwargs = mock_post.call_args[1] - actual_spans = zipkin_pb2.ListOfSpans.FromString(kwargs["data"]) - span_tags = actual_spans.spans[0].tags - - self.assertEqual(len(span_tags["k1"]), 128) - self.assertEqual(len(span_tags["k2"]), 50) - - exporter = ZipkinSpanExporter( - service_name, - transport_format=TRANSPORT_FORMAT_PROTOBUF, - max_tag_value_length=2, - ) - mock_post = MagicMock() - with patch("requests.post", mock_post): - mock_post.return_value = MockResponse(200) - status = exporter.export([span]) - self.assertEqual(SpanExportResult.SUCCESS, status) - - # pylint: disable=unsubscriptable-object - kwargs = mock_post.call_args[1] - actual_spans = zipkin_pb2.ListOfSpans.FromString(kwargs["data"]) - span_tags = actual_spans.spans[0].tags - - self.assertEqual(len(span_tags["k1"]), 2) - self.assertEqual(len(span_tags["k2"]), 2) +class TestZipkinNodeEndpoint(unittest.TestCase): + def test_constructor_default(self): + node_endpoint = NodeEndpoint() + self.assertEqual(node_endpoint.ipv4, None) + self.assertEqual(node_endpoint.ipv6, None) + self.assertEqual(node_endpoint.port, None) + self.assertEqual(node_endpoint.service_name, TEST_SERVICE_NAME) + + def test_constructor_explicits(self): + ipv4 = "192.168.0.1" + ipv6 = "2001:db8::c001" + port = 414120 + node_endpoint = NodeEndpoint(ipv4, ipv6, port) + self.assertEqual(node_endpoint.ipv4, ipaddress.IPv4Address(ipv4)) + self.assertEqual(node_endpoint.ipv6, ipaddress.IPv6Address(ipv6)) + self.assertEqual(node_endpoint.port, port) + self.assertEqual(node_endpoint.service_name, TEST_SERVICE_NAME) + + def test_ipv4_invalid_raises_error(self): + with self.assertRaises(ValueError): + NodeEndpoint(ipv4="invalid-ipv4-address") + + def test_ipv4_passed_ipv6_raises_error(self): + with self.assertRaises(ValueError): + NodeEndpoint(ipv4="2001:db8::c001") + + def test_ipv6_invalid_raises_error(self): + with self.assertRaises(ValueError): + NodeEndpoint(ipv6="invalid-ipv6-address") + + def test_ipv6_passed_ipv4_raises_error(self): + with self.assertRaises(ValueError): + NodeEndpoint(ipv6="192.168.0.1") diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py index 2770b3659cc..79d9c29c20f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py @@ -46,4 +46,3 @@ OTEL_EXPORTER_JAEGER_CERTIFICATE = "OTEL_EXPORTER_JAEGER_CERTIFICATE" OTEL_EXPORTER_OTLP_INSECURE = "OTEL_EXPORTER_OTLP_INSECURE" OTEL_EXPORTER_OTLP_SPAN_INSECURE = "OTEL_EXPORTER_OTLP_SPAN_INSECURE" -OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT = "OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT" diff --git a/pyproject.toml b/pyproject.toml index 5cea9a34457..5207dd223a1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ exclude = ''' ( /( # generated files exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/gen| - exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen| + exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/encoder/v2/protobuf/gen| opentelemetry-proto/src/opentelemetry/proto/collector| opentelemetry-proto/src/opentelemetry/proto/common| opentelemetry-proto/src/opentelemetry/proto/metrics|