From f8d0d47c4efdd968d48835d1cc34efa38efcf8bd Mon Sep 17 00:00:00 2001 From: harryryu Date: Wed, 3 Jul 2024 14:44:08 -0700 Subject: [PATCH] Add try except and extra env variable for gevent monkey patch --- .../distro/aws_opentelemetry_distro.py | 7 +- .../distro/patches/_instrumentation_patch.py | 41 +++++++++- .../distro/test_instrumentation_patch.py | 82 ++++++++++++++++--- 3 files changed, 115 insertions(+), 15 deletions(-) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py index 9bca8acd1..fc5b5d380 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_distro.py @@ -4,7 +4,10 @@ import sys from logging import Logger, getLogger -from amazon.opentelemetry.distro.patches._instrumentation_patch import apply_instrumentation_patches +from amazon.opentelemetry.distro.patches._instrumentation_patch import ( + AWS_GEVENT_PATCH_MODULES, + apply_instrumentation_patches, +) from opentelemetry.distro import OpenTelemetryDistro from opentelemetry.environment_variables import OTEL_PROPAGATORS, OTEL_PYTHON_ID_GENERATOR from opentelemetry.sdk.environment_variables import ( @@ -65,5 +68,7 @@ def _configure(self, **kwargs): OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION, "base2_exponential_bucket_histogram" ) + os.environ.setdefault(AWS_GEVENT_PATCH_MODULES, "all") + if kwargs.get("apply_patches", True): apply_instrumentation_patches() diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py index baacad05a..02d6839ce 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_instrumentation_patch.py @@ -1,6 +1,7 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 # Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License. +import os import sys from logging import Logger, getLogger @@ -8,6 +9,10 @@ from amazon.opentelemetry.distro.patches._resource_detector_patches import _apply_resource_detector_patches +# Env variable for determining whether we want to monkey patch gevent modules. Possible values are 'all', 'none', and +# comma separated list 'os, thread, time, sys, socket, select, ssl, subprocess, builtins, signal, queue, contextvars' +AWS_GEVENT_PATCH_MODULES = "AWS_GEVENT_PATCH_MODULES" + _logger: Logger = getLogger(__name__) @@ -21,11 +26,39 @@ def apply_instrumentation_patches() -> None: Where possible, automated testing should be run to catch upstream changes resulting in broken patches """ if _is_installed("gevent"): - # pylint: disable=import-outside-toplevel - # Delay import to only occur if monkey patch is needed (e.g. gevent is used to run application). - from gevent import monkey + try: + gevent_patch_module = os.environ.get(AWS_GEVENT_PATCH_MODULES) + if gevent_patch_module == "all": + # pylint: disable=import-outside-toplevel + # Delay import to only occur if monkey patch is needed (e.g. gevent is used to run application). + from gevent import monkey + + monkey.patch_all() + elif gevent_patch_module == "none": + pass + else: + module_list = [module.strip() for module in gevent_patch_module.split(",")] + + # pylint: disable=import-outside-toplevel + # Delay import to only occur if monkey patch is needed (e.g. gevent is used to run application). + from gevent import monkey - monkey.patch_ssl() + monkey.patch_all( + socket="socket" in module_list, + time="time" in module_list, + select="select" in module_list, + thread="thread" in module_list, + os="os" in module_list, + ssl="ssl" in module_list, + subprocess="subprocess" in module_list, + sys="sys" in module_list, + builtins="builtins" in module_list, + signal="signal" in module_list, + queue="queue" in module_list, + contextvars="contextvars" in module_list, + ) + except Exception as exc: # pylint: disable=broad-except + _logger.debug("Failed to monkey patch gevent, exception: %s", exc) if _is_installed("botocore ~= 1.0"): # pylint: disable=import-outside-toplevel diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py index 7e67ec43a..d12040041 100644 --- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py +++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py @@ -1,13 +1,17 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 -import sys +import os from typing import Dict from unittest import TestCase from unittest.mock import MagicMock, patch +import gevent.monkey import pkg_resources -from amazon.opentelemetry.distro.patches._instrumentation_patch import apply_instrumentation_patches +from amazon.opentelemetry.distro.patches._instrumentation_patch import ( + AWS_GEVENT_PATCH_MODULES, + apply_instrumentation_patches, +) from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS from opentelemetry.semconv.trace import SpanAttributes @@ -55,17 +59,37 @@ def test_instrumentation_patch(self): def _run_patch_behaviour_tests(self): # Test setup self.method_patches[GET_DISTRIBUTION_PATCH].return_value = "CorrectDistributionObject" + # Test setup to not patch gevent + os.environ[AWS_GEVENT_PATCH_MODULES] = "none" # Validate unpatched upstream behaviour - important to detect upstream changes that may break instrumentation self._test_unpatched_botocore_instrumentation() - self._test_unpatched_gevent_ssl_instrumentation() + self._test_unpatched_gevent_instrumentation() # Apply patches apply_instrumentation_patches() # Validate patched upstream behaviour - important to detect downstream changes that may break instrumentation self._test_patched_botocore_instrumentation() - self._test_patched_gevent_ssl_instrumentation() + self._test_unpatched_gevent_instrumentation() + + # Test setup to check whether only these two modules get patched by gevent monkey + os.environ[AWS_GEVENT_PATCH_MODULES] = "os, ssl" + + # Apply patches + apply_instrumentation_patches() + + # Validate that os and ssl gevent monkey patch modules were patched + self._test_patched_gevent_os_ssl_instrumentation() + + # Set the value to 'all' so that all the remaining gevent monkey patch modules are patched + os.environ[AWS_GEVENT_PATCH_MODULES] = "all" + + # Apply patches again. + apply_instrumentation_patches() + + # Validate that remaining gevent monkey patch modules were patched + self._test_patched_gevent_instrumentation() # Test teardown self._reset_mocks() @@ -96,9 +120,19 @@ def _test_unpatched_botocore_instrumentation(self): self.assertFalse("aws.sqs.queue_url" in attributes) self.assertFalse("aws.sqs.queue_name" in attributes) - def _test_unpatched_gevent_ssl_instrumentation(self): - # Ssl - self.assertFalse("gevent.ssl" in sys.modules, "Upstream has added the gevent ssl patch") + def _test_unpatched_gevent_instrumentation(self): + self.assertFalse(gevent.monkey.is_module_patched("os"), "gevent os module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("time"), "gevent time module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("sys"), "gevent sys module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("socket"), "gevent socket module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("select"), "gevent select module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("ssl"), "gevent ssl module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("subprocess"), "gevent subprocess module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("builtins"), "gevent builtins module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("signal"), "gevent signal module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("queue"), "gevent queue module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("contextvars"), "gevent contextvars module has been patched") def _test_patched_botocore_instrumentation(self): # Kinesis @@ -122,9 +156,37 @@ def _test_patched_botocore_instrumentation(self): self.assertTrue("aws.sqs.queue_name" in sqs_attributes) self.assertEqual(sqs_attributes["aws.sqs.queue_name"], _QUEUE_NAME) - def _test_patched_gevent_ssl_instrumentation(self): - # Ssl - self.assertTrue("gevent.ssl" in sys.modules) + def _test_patched_gevent_os_ssl_instrumentation(self): + # Only ssl and os module should have been patched since the environment variable was set to 'os, ssl' + self.assertTrue(gevent.monkey.is_module_patched("ssl"), "gevent ssl module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("os"), "gevent os module has not been patched") + # Rest should still be unpatched + self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("time"), "gevent time module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("sys"), "gevent sys module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("socket"), "gevent socket module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("select"), "gevent select module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("subprocess"), "gevent subprocess module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("builtins"), "gevent builtins module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("signal"), "gevent signal module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("queue"), "gevent queue module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("contextvars"), "gevent contextvars module has been patched") + + def _test_patched_gevent_instrumentation(self): + self.assertTrue(gevent.monkey.is_module_patched("os"), "gevent os module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("time"), "gevent time module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("socket"), "gevent socket module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("select"), "gevent select module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("ssl"), "gevent ssl module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("subprocess"), "gevent subprocess module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("signal"), "gevent signal module has not been patched") + self.assertTrue(gevent.monkey.is_module_patched("queue"), "gevent queue module has not been patched") + + # Current version of gevent.monkey.patch_all() does not do anything to these modules despite being called + self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("sys"), "gevent sys module has been patched") + self.assertFalse(gevent.monkey.is_module_patched("builtins"), "gevent builtins module not been patched") + self.assertFalse(gevent.monkey.is_module_patched("contextvars"), "gevent contextvars module has been patched") def _test_botocore_installed_flag(self): with patch(