Skip to content

Commit

Permalink
added extra test
Browse files Browse the repository at this point in the history
  • Loading branch information
liustve committed Feb 11, 2025
1 parent 6462b66 commit a34e899
Showing 1 changed file with 95 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@
# SPDX-License-Identifier: Apache-2.0
import os
import time
from botocore.credentials import Credentials
from unittest import TestCase
from unittest.mock import ANY, MagicMock, PropertyMock, patch
from opentelemetry.sdk.trace import Tracer, Resource, SpanLimits, SpanContext
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.sdk.trace.sampling import ALWAYS_ON

from opentelemetry.trace import SpanKind

import requests
from botocore.credentials import Credentials

from amazon.opentelemetry.distro.aws_opentelemetry_configurator import OTLPAwsSigV4Exporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
Expand All @@ -24,13 +19,18 @@
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
)

from opentelemetry.sdk.trace import _Span
from opentelemetry.trace import SpanKind, TraceFlags
from opentelemetry.sdk.trace import Resource
from opentelemetry.sdk.trace import Resource, SpanContext, SpanLimits, Tracer, _Span
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry.trace import SpanKind, TraceFlags

OTLP_CW_ENDPOINT = "https://xray.us-east-1.amazonaws.com/v1/traces"
USER_AGENT = "OTel-OTLP-Exporter-Python/" + __version__
CONTENT_TYPE = "application/x-protobuf"
AUTHORIZATION_HEADER = "Authorization"
X_AMZ_DATE_HEADER = "X-Amz-Date"
X_AMZ_SECURITY_TOKEN_HEADER = "X-Amz-Security-Token"


class TestAwsSigV4Exporter(TestCase):
Expand All @@ -40,19 +40,18 @@ def setUp(self):
self.create_span("test_span2", SpanKind.SERVER),
self.create_span("test_span3", SpanKind.CLIENT),
self.create_span("test_span4", SpanKind.PRODUCER),
self.create_span("test_span5", SpanKind.CONSUMER)
self.create_span("test_span5", SpanKind.CONSUMER),
]

self.invalid_cw_otlp_tracing_endpoints = [
"https://xray.bad-region-1.amazonaws.com/v1/traces",
"https://xray.us-east-1.amaz.com/v1/traces",
"https://logs.us-east-1.amazonaws.com/v1/logs"
"https://test-endpoint123.com/test"
"https://logs.us-east-1.amazonaws.com/v1/logs" "https://test-endpoint123.com/test",
]

self.expected_auth_header = 'AWS4-HMAC-SHA256 Credential=test_key/some_date/us-east-1/xray/aws4_request'
self.expected_auth_x_amz_date = 'some_date'
self.expected_auth_security_token = 'test_token'
self.expected_auth_header = "AWS4-HMAC-SHA256 Credential=test_key/some_date/us-east-1/xray/aws4_request"
self.expected_auth_x_amz_date = "some_date"
self.expected_auth_security_token = "test_token"

# Tests that the default exporter is OTLP protobuf/http Span Exporter if no endpoint is set
@patch.dict(os.environ, {}, clear=True)
Expand All @@ -61,7 +60,7 @@ def test_sigv4_exporter_init_default(self):
self.validate_exporter_extends_http_span_exporter(exporter, DEFAULT_ENDPOINT + DEFAULT_TRACES_EXPORT_PATH)
self.assertIsInstance(exporter._session, requests.Session)

# Tests that the endpoint is validated and sets the aws_region but still uses the OTLP protobuf/http
# Tests that the endpoint is validated and sets the aws_region but still uses the OTLP protobuf/http
# Span Exporter exporter constructor behavior if a valid OTLP CloudWatch endpoint is set
@patch.dict(os.environ, {OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: OTLP_CW_ENDPOINT}, clear=True)
@patch("botocore.session.Session")
Expand All @@ -77,7 +76,7 @@ def test_sigv4_exporter_init_valid_cw_otlp_endpoint(self, session_mock):

mock_session.get_available_regions.assert_called_once_with("xray")

# Tests that the exporter constructor behavior is set by OTLP protobuf/http Span Exporter
# Tests that the exporter constructor behavior is set by OTLP protobuf/http Span Exporter
# if an invalid OTLP CloudWatch endpoint is set
@patch("botocore.session.Session")
def test_sigv4_exporter_init_invalid_cw_otlp_endpoint(self, botocore_mock):
Expand All @@ -95,136 +94,144 @@ def test_sigv4_exporter_init_invalid_cw_otlp_endpoint(self, botocore_mock):

self.assertIsNone(exporter._aws_region)


# Tests that if the OTLP endpoint is not a valid CW endpoint but the credentials are valid, SigV4 authentication method is NOT called and
# is NOT injected into the existing Session headers.
@patch("botocore.session.Session.get_available_regions")
@patch("requests.Session.post")
@patch("botocore.auth.SigV4Auth.add_auth")
def test_sigv4_exporter_export_does_not_add_sigv4_if_not_valid_cw_endpoint(self, mock_sigv4_auth, requests_mock, botocore_mock):
def test_sigv4_exporter_export_does_not_add_sigv4_if_not_valid_cw_endpoint(
self, mock_sigv4_auth, requests_mock, botocore_mock
):
# Setting the exporter response
mock_response = MagicMock()
mock_response.status_code = 200
type(mock_response).ok = PropertyMock(return_value=True)

# Setting the request session headers to make the call to endpoint
mock_session = MagicMock()
mock_session.headers = {
'User-Agent': 'OTel-OTLP-Exporter-Python/' + __version__,
'Content-Type': 'application/x-protobuf'
}
mock_session.headers = {"User-Agent": USER_AGENT, "Content-Type": CONTENT_TYPE}
requests_mock.return_value = mock_session
mock_session.post.return_value = mock_response

mock_botocore_session = MagicMock()
botocore_mock.return_value = mock_botocore_session
mock_botocore_session.get_available_regions.return_value = ["us-east-1", "us-west-2"]
mock_botocore_session.get_credentials.return_value = Credentials(
access_key="test_key",
secret_key="test_secret",
token="test_token"
access_key="test_key", secret_key="test_secret", token="test_token"
)

#SigV4 mock authentication injection
# SigV4 mock authentication injection
mock_sigv4_auth.side_effect = self.mock_add_auth

# Initialize and call exporter
exporter = OTLPAwsSigV4Exporter(endpoint=OTLP_CW_ENDPOINT)
exporter.export(self.testing_spans)

# For each invalid CW OTLP endpoint, vdalidate that the sigv4
# For each invalid CW OTLP endpoint, vdalidate that the sigv4
for bad_endpoint in self.invalid_cw_otlp_tracing_endpoints:
with self.subTest(endpoint=bad_endpoint):
with patch.dict(os.environ, {OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: bad_endpoint}):
botocore_mock.return_value = ["us-east-1", "us-west-2"]

exporter = OTLPAwsSigV4Exporter(endpoint=bad_endpoint)

self.validate_exporter_extends_http_span_exporter(exporter, bad_endpoint)

# Test case, should not have detected a valid region
self.assertIsNone(exporter._aws_region)

exporter.export(self.testing_spans)

mock_sigv4_auth.assert_not_called()

# Verify that SigV4 request headers were not injected
actual_headers = mock_session.headers
self.assertNotIn('Authorization', actual_headers)
self.assertNotIn('X-Amz-Date', actual_headers)
self.assertNotIn('X-Amz-Security-Token', actual_headers)
self.assertNotIn(AUTHORIZATION_HEADER, actual_headers)
self.assertNotIn(X_AMZ_DATE_HEADER, actual_headers)
self.assertNotIn(X_AMZ_SECURITY_TOKEN_HEADER, actual_headers)

requests_mock.assert_called_with(
url=bad_endpoint,
data=ANY,
verify=ANY,
timeout=ANY,
cert=ANY,
)
url=bad_endpoint,
data=ANY,
verify=ANY,
timeout=ANY,
cert=ANY,
)

# Tests that if the OTLP endpoint is a valid CW endpoint but no credentials are returned, SigV4 authentication method is NOT called and
# is NOT injected into the existing Session headers.
@patch("botocore.session.Session.get_available_regions")
@patch("botocore.session.Session")
@patch("requests.Session")
@patch("botocore.auth.SigV4Auth.add_auth")
@patch.dict(os.environ, {OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: OTLP_CW_ENDPOINT})
def test_sigv4_exporter_export_does_not_add_sigv4_if_no_credentials(self, mock_sigv4, requests_mock, botcore_mock):
def test_sigv4_exporter_export_does_not_add_sigv4_if_not_valid_credentials(
self, mock_sigv4_auth, requests_posts_mock, botocore_mock
):

# Setting the exporter response
mock_response = MagicMock()
mock_response.status_code = 200
type(mock_response).ok = PropertyMock(return_value=True)

requests_mock.return_value = mock_response

botcore_mock.return_value = ["us-east-1", "us-west-2"]
# Setting the request session headers to make the call to endpoint
mock_session = MagicMock()
mock_session.headers = {"User-Agent": USER_AGENT, "Content-Type": CONTENT_TYPE}
requests_posts_mock.return_value = mock_session
mock_session.post.return_value = mock_response

mock_botocore_session = MagicMock()
botocore_mock.return_value = mock_botocore_session
mock_botocore_session.get_available_regions.return_value = ["us-east-1", "us-west-2"]

# Test case, return None for get credentials
mock_botocore_session.get_credentials.return_value = None

# Initialize and call exporter
exporter = OTLPAwsSigV4Exporter(endpoint=OTLP_CW_ENDPOINT)
self.validate_exporter_extends_http_span_exporter(exporter, OTLP_CW_ENDPOINT)
self.assertIsNone(exporter._aws_region)

# Validate that the region is valid
self.assertEqual(exporter._aws_region, "us-east-1")

exporter.export(self.testing_spans)

mock_sigv4.assert_not_called()
# Verify SigV4 auth was not called
mock_sigv4_auth.assert_not_called()

# Verify that SigV4 request headers were properly injected
actual_headers = mock_session.headers
self.assertNotIn(AUTHORIZATION_HEADER, actual_headers)
self.assertNotIn(X_AMZ_DATE_HEADER, actual_headers)
self.assertNotIn(X_AMZ_SECURITY_TOKEN_HEADER, actual_headers)

requests_mock.assert_called_with(
url=OTLP_CW_ENDPOINT,
data=ANY,
verify=ANY,
timeout=ANY,
cert=ANY,
)

# Tests that if the OTLP endpoint is valid and credentials are valid, SigV4 authentication method is called and
# is injected into the existing Session headers.
@patch("botocore.session.Session")
@patch("requests.Session")
@patch("botocore.auth.SigV4Auth.add_auth")
@patch.dict(os.environ, {OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: OTLP_CW_ENDPOINT})
def test_sigv4_exporter_export_adds_sigv4_authentication_if_valid_cw_endpoint(self, mock_sigv4_auth, requests_posts_mock, botocore_mock):
def test_sigv4_exporter_export_adds_sigv4_authentication_if_valid_cw_endpoint(
self, mock_sigv4_auth, requests_posts_mock, botocore_mock
):

# Setting the exporter response
mock_response = MagicMock()
mock_response.status_code = 200
type(mock_response).ok = PropertyMock(return_value=True)

# Setting the request session headers to make the call to endpoint
mock_session = MagicMock()
mock_session.headers = {
'User-Agent': 'OTel-OTLP-Exporter-Python/' + __version__,
'Content-Type': 'application/x-protobuf'
}
mock_session.headers = {"User-Agent": USER_AGENT, "Content-Type": CONTENT_TYPE}
requests_posts_mock.return_value = mock_session
mock_session.post.return_value = mock_response

mock_botocore_session = MagicMock()
botocore_mock.return_value = mock_botocore_session
mock_botocore_session.get_available_regions.return_value = ["us-east-1", "us-west-2"]
mock_botocore_session.get_credentials.return_value = Credentials(
access_key="test_key",
secret_key="test_secret",
token="test_token"
access_key="test_key", secret_key="test_secret", token="test_token"
)

#SigV4 mock authentication injection
# SigV4 mock authentication injection
mock_sigv4_auth.side_effect = self.mock_add_auth

# Initialize and call exporter
Expand All @@ -236,14 +243,13 @@ def test_sigv4_exporter_export_adds_sigv4_authentication_if_valid_cw_endpoint(se

# Verify that SigV4 request headers were properly injected
actual_headers = mock_session.headers
self.assertIn('Authorization', actual_headers)
self.assertIn('X-Amz-Date', actual_headers)
self.assertIn('X-Amz-Security-Token', actual_headers)

self.assertEqual(actual_headers['Authorization'], self.expected_auth_header)
self.assertEqual(actual_headers['X-Amz-Date'], self.expected_auth_x_amz_date)
self.assertEqual(actual_headers['X-Amz-Security-Token'], self.expected_auth_security_token)
self.assertIn("Authorization", actual_headers)
self.assertIn("X-Amz-Date", actual_headers)
self.assertIn("X-Amz-Security-Token", actual_headers)

self.assertEqual(actual_headers[AUTHORIZATION_HEADER], self.expected_auth_header)
self.assertEqual(actual_headers[X_AMZ_DATE_HEADER], self.expected_auth_x_amz_date)
self.assertEqual(actual_headers[X_AMZ_SECURITY_TOKEN_HEADER], self.expected_auth_security_token)

def validate_exporter_extends_http_span_exporter(self, exporter, endpoint):
self.assertEqual(exporter._endpoint, endpoint)
Expand All @@ -256,13 +262,9 @@ def validate_exporter_extends_http_span_exporter(self, exporter, endpoint):
self.assertIn("User-Agent", exporter._session.headers)
self.assertEqual(
exporter._session.headers.get("Content-Type"),
"application/x-protobuf",
)
self.assertEqual(
exporter._session.headers.get("User-Agent"),
"OTel-OTLP-Exporter-Python/" + __version__,
CONTENT_TYPE,
)

self.assertEqual(exporter._session.headers.get("User-Agent"), USER_AGENT)

def create_span(self, name="test_span", kind=SpanKind.INTERNAL):
span = _Span(
Expand All @@ -271,15 +273,17 @@ def create_span(self, name="test_span", kind=SpanKind.INTERNAL):
trace_id=0x1234567890ABCDEF,
span_id=0x9876543210,
is_remote=False,
trace_flags=TraceFlags(TraceFlags.SAMPLED)
trace_flags=TraceFlags(TraceFlags.SAMPLED),
),
kind=kind
kind=kind,
)
return span

def mock_add_auth(self, request):
request.headers._headers.extend([
('Authorization', self.expected_auth_header),
('X-Amz-Date', self.expected_auth_x_amz_date),
('X-Amz-Security-Token', self.expected_auth_security_token)
])
request.headers._headers.extend(
[
(AUTHORIZATION_HEADER, self.expected_auth_header),
(X_AMZ_DATE_HEADER, self.expected_auth_x_amz_date),
(X_AMZ_SECURITY_TOKEN_HEADER, self.expected_auth_security_token),
]
)

0 comments on commit a34e899

Please sign in to comment.