Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configure header extraction for ASGI middleware via constructor params #2026

Merged
merged 10 commits into from
Jan 31, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1948](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1948))
- Added schema_url (`"https://opentelemetry.io/schemas/1.11.0"`) to all metrics and traces
([#1977](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1977))
- Add support for configuring ASGI middleware header extraction via runtime constructor parameters
([#2026](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2026))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,13 @@ def client_response_hook(span: Span, message: dict):
---
"""

from __future__ import annotations

import typing
import urllib
from functools import wraps
from timeit import default_timer
from typing import Tuple
from typing import Any, Awaitable, Callable, Tuple, cast

from asgiref.compatibility import guarantee_single_callable

Expand Down Expand Up @@ -332,55 +334,28 @@ def collect_request_attributes(scope):
return result


def collect_custom_request_headers_attributes(scope):
"""returns custom HTTP request headers to be added into SERVER span as span attributes
Refer specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers
def collect_custom_headers_attributes(
scope_or_response_message: dict[str, Any],
sanitize: SanitizeValue,
header_regexes: list[str],
normalize_names: Callable[[str], str],
) -> dict[str, str]:
"""
Returns custom HTTP request or response headers to be added into SERVER span as span attributes.

sanitize = SanitizeValue(
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
)
)

# Decode headers before processing.
headers = {
_key.decode("utf8"): _value.decode("utf8")
for (_key, _value) in scope.get("headers")
}

return sanitize.sanitize_header_values(
headers,
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST
),
normalise_request_header_name,
)


def collect_custom_response_headers_attributes(message):
"""returns custom HTTP response headers to be added into SERVER span as span attributes
Refer specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers
Refer specifications:
- https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers
"""

sanitize = SanitizeValue(
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
)
)

# Decode headers before processing.
headers = {
headers: dict[str, str] = {
_key.decode("utf8"): _value.decode("utf8")
for (_key, _value) in message.get("headers")
for (_key, _value) in scope_or_response_message.get("headers")
or cast("list[tuple[bytes, bytes]]", [])
}

return sanitize.sanitize_header_values(
headers,
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE
),
normalise_response_header_name,
header_regexes,
normalize_names,
)


Expand Down Expand Up @@ -493,6 +468,9 @@ def __init__(
tracer_provider=None,
meter_provider=None,
meter=None,
http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None,
):
self.app = guarantee_single_callable(app)
self.tracer = trace.get_tracer(
Expand Down Expand Up @@ -540,7 +518,41 @@ def __init__(
self.client_response_hook = client_response_hook
self.content_length_header = None

async def __call__(self, scope, receive, send):
# Environment variables as constructor parameters
self.http_capture_headers_server_request = (
http_capture_headers_server_request
or (
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST
)
)
or None
)
self.http_capture_headers_server_response = (
http_capture_headers_server_response
or (
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE
)
)
or None
)
self.http_capture_headers_sanitize_fields = SanitizeValue(
http_capture_headers_sanitize_fields
or (
get_custom_headers(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS
)
)
or []
)

async def __call__(
self,
scope: dict[str, Any],
receive: Callable[[], Awaitable[dict[str, Any]]],
send: Callable[[dict[str, Any]], Awaitable[None]],
) -> None:
"""The ASGI application

Args:
Expand Down Expand Up @@ -583,7 +595,14 @@ async def __call__(self, scope, receive, send):

if current_span.kind == trace.SpanKind.SERVER:
custom_attributes = (
collect_custom_request_headers_attributes(scope)
collect_custom_headers_attributes(
scope,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_request,
normalise_request_header_name,
)
if self.http_capture_headers_server_request
else {}
)
if len(custom_attributes) > 0:
current_span.set_attributes(custom_attributes)
Expand Down Expand Up @@ -658,7 +677,7 @@ def _get_otel_send(
expecting_trailers = False

@wraps(send)
async def otel_send(message):
async def otel_send(message: dict[str, Any]):
nonlocal expecting_trailers
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
Expand All @@ -685,7 +704,14 @@ async def otel_send(message):
and "headers" in message
):
custom_response_attributes = (
collect_custom_response_headers_attributes(message)
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest import mock
import os

import opentelemetry.instrumentation.asgi as otel_asgi
from opentelemetry.test.asgitestutil import AsgiTestBase
Expand Down Expand Up @@ -72,21 +72,22 @@ async def websocket_app_with_custom_headers(scope, receive, send):
break


@mock.patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS: ".*my-secret.*",
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST: "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3,Regex-Test-Header-.*,Regex-Invalid-Test-Header-.*,.*my-secret.*",
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE: "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3,my-custom-regex-header-.*,invalid-regex-header-.*,.*my-secret.*",
},
)
class TestCustomHeaders(AsgiTestBase, TestBase):
constructor_params = {}
__test__ = False

def __init_subclass__(cls) -> None:
if cls is not TestCustomHeaders:
cls.__test__ = True

ocelotl marked this conversation as resolved.
Show resolved Hide resolved
def setUp(self):
super().setUp()
self.tracer_provider, self.exporter = TestBase.create_tracer_provider()
self.tracer = self.tracer_provider.get_tracer(__name__)
self.app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi, tracer_provider=self.tracer_provider
simple_asgi,
tracer_provider=self.tracer_provider,
**self.constructor_params,
)

def test_http_custom_request_headers_in_span_attributes(self):
Expand Down Expand Up @@ -148,7 +149,9 @@ def test_http_custom_request_headers_not_in_span_attributes(self):

def test_http_custom_response_headers_in_span_attributes(self):
self.app = otel_asgi.OpenTelemetryMiddleware(
http_app_with_custom_headers, tracer_provider=self.tracer_provider
http_app_with_custom_headers,
tracer_provider=self.tracer_provider,
**self.constructor_params,
)
self.seed_app(self.app)
self.send_default_request()
Expand All @@ -175,7 +178,9 @@ def test_http_custom_response_headers_in_span_attributes(self):

def test_http_custom_response_headers_not_in_span_attributes(self):
self.app = otel_asgi.OpenTelemetryMiddleware(
http_app_with_custom_headers, tracer_provider=self.tracer_provider
http_app_with_custom_headers,
tracer_provider=self.tracer_provider,
**self.constructor_params,
)
self.seed_app(self.app)
self.send_default_request()
Expand Down Expand Up @@ -277,6 +282,7 @@ def test_websocket_custom_response_headers_in_span_attributes(self):
self.app = otel_asgi.OpenTelemetryMiddleware(
websocket_app_with_custom_headers,
tracer_provider=self.tracer_provider,
**self.constructor_params,
)
self.seed_app(self.app)
self.send_input({"type": "websocket.connect"})
Expand Down Expand Up @@ -317,6 +323,7 @@ def test_websocket_custom_response_headers_not_in_span_attributes(self):
self.app = otel_asgi.OpenTelemetryMiddleware(
websocket_app_with_custom_headers,
tracer_provider=self.tracer_provider,
**self.constructor_params,
)
self.seed_app(self.app)
self.send_input({"type": "websocket.connect"})
Expand All @@ -333,3 +340,46 @@ def test_websocket_custom_response_headers_not_in_span_attributes(self):
if span.kind == SpanKind.SERVER:
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)


SANITIZE_FIELDS_TEST_VALUE = ".*my-secret.*"
SERVER_REQUEST_TEST_VALUE = "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3,Regex-Test-Header-.*,Regex-Invalid-Test-Header-.*,.*my-secret.*"
SERVER_RESPONSE_TEST_VALUE = "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3,my-custom-regex-header-.*,invalid-regex-header-.*,.*my-secret.*"


class TestCustomHeadersEnv(TestCustomHeaders):
def setUp(self):
os.environ.update(
{
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS: SANITIZE_FIELDS_TEST_VALUE,
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST: SERVER_REQUEST_TEST_VALUE,
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE: SERVER_RESPONSE_TEST_VALUE,
}
)
super().setUp()

def tearDown(self):
os.environ.pop(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SANITIZE_FIELDS, None
)
os.environ.pop(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST, None
)
os.environ.pop(
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE, None
)
super().tearDown()


class TestCustomHeadersConstructor(TestCustomHeaders):
constructor_params = {
"http_capture_headers_sanitize_fields": SANITIZE_FIELDS_TEST_VALUE.split(
","
),
"http_capture_headers_server_request": SERVER_REQUEST_TEST_VALUE.split(
","
),
"http_capture_headers_server_response": SERVER_RESPONSE_TEST_VALUE.split(
","
),
}
Original file line number Diff line number Diff line change
Expand Up @@ -983,18 +983,16 @@ class TestAsgiApplicationRaisingError(AsgiTestBase):
def tearDown(self):
pass

@mock.patch(
"opentelemetry.instrumentation.asgi.collect_custom_request_headers_attributes",
side_effect=ValueError("whatever"),
)
def test_asgi_issue_1883(
self, mock_collect_custom_request_headers_attributes
):
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
def test_asgi_issue_1883(self):
"""
Test that exception UnboundLocalError local variable 'start' referenced before assignment is not raised
See https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1883
"""
app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)

async def bad_app(_scope, _receive, _send):
raise ValueError("whatever")

app = otel_asgi.OpenTelemetryMiddleware(bad_app)
self.seed_app(app)
self.send_default_request()
try:
Expand Down
Loading
Loading