From ecda6688521f2df9005c7cb0622bfb4575f7f8a0 Mon Sep 17 00:00:00 2001 From: Sebastian Marsching Date: Tue, 27 Nov 2018 20:59:41 +0100 Subject: [PATCH 1/9] Fix race condition in Salt loader. There was a race condition in the salt loader when injecting global values (e.g. "__pillar__" or "__salt__") into modules. One effect of this race condition was that in a setup with multiple threads, some threads may see pillar data intended for other threads or the pillar data seen by a thread might even change spuriously. There have been earlier attempts to fix this problem (#27937, #29397). These patches tried to fix the problem by storing the dictionary that keeps the relevant data in a thread-local variable and referencing this thread-local variable from the variables that are injected into the modules. These patches did not fix the problem completely because they only work when a module is loaded through a single loader instance only. When there is more than one loader, there is more than one thread-local variable and the variable injected into a module is changed to point to another thread-local variable when the module is loaded again. Thus, the problem resurfaced while working on #39670. This patch attempts to solve the problem from a slightly different angle, complementing the earlier patches: The value injected into the modules now is a proxy that internally uses a thread-local variable to decide to which object it points. This means that when loading a module again through a different loader (possibly passing different pillar data), the data is actually only changed in the thread in which the loader is used. Other threads are not affected by such a change. This means that it will work correctly in the current situation where loaders are possibly created by many different modules and these modules do not necessary know in which context they are executed. Thus it is much more flexible and reliable than the more explicit approach used by the two earlier patches. Unfortunately, the stand JSON and Msgpack serialization code cannot handle proxied objects, so they have to be unwrapped before passing them to that code. The salt.utils.json and salt.utils.msgpack modules have been modified to take care of unwrapping objects that are proxied using the ThreadLocalProxy. --- salt/loader.py | 74 ++- salt/utils/json.py | 17 +- salt/utils/msgpack.py | 20 +- salt/utils/thread_local_proxy.py | 599 ++++++++++++++++++++ tests/unit/utils/test_thread_local_proxy.py | 37 ++ 5 files changed, 741 insertions(+), 6 deletions(-) create mode 100644 salt/utils/thread_local_proxy.py create mode 100644 tests/unit/utils/test_thread_local_proxy.py diff --git a/salt/loader.py b/salt/loader.py index 428fb338c964..0f18dfcbff88 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -14,6 +14,7 @@ import logging import inspect import tempfile +import threading import functools import threading import traceback @@ -33,6 +34,7 @@ import salt.utils.lazy import salt.utils.odict import salt.utils.platform +import salt.utils.thread_local_proxy import salt.utils.versions import salt.utils.stringutils from salt.exceptions import LoaderError @@ -1100,6 +1102,76 @@ def _mod_type(module_path): return 'ext' +def _inject_into_mod(mod, name, value, force_lock=False): + ''' + Inject a variable into a module. This is used to inject "globals" like + ``__salt__``, ``__pillar``, or ``grains``. + + Instead of injecting the value directly, a ``ThreadLocalProxy`` is created. + If such a proxy is already present under the specified name, it is updated + with the new value. This update only affects the current thread, so that + the same name can refer to different values depending on the thread of + execution. + + This is important for data that is not truly global. For example, pillar + data might be dynamically overriden through function parameters and thus + the actual values available in pillar might depend on the thread that is + calling a module. + + mod: + module object into which the value is going to be injected. + + name: + name of the variable that is injected into the module. + + value: + value that is injected into the variable. The value is not injected + directly, but instead set as the new reference of the proxy that has + been created for the variable. + + force_lock: + whether the lock should be acquired before checking whether a proxy + object for the specified name has already been injected into the + module. If ``False`` (the default), this function checks for the + module's variable without acquiring the lock and only acquires the lock + if a new proxy has to be created and injected. + ''' + from salt.utils.thread_local_proxy import ThreadLocalProxy + old_value = getattr(mod, name, None) + # We use a double-checked locking scheme in order to avoid taking the lock + # when a proxy object has already been injected. + # In most programming languages, double-checked locking is considered + # unsafe when used without explicit memory barriers because one might read + # an uninitialized value. In CPython it is safe due to the global + # interpreter lock (GIL). In Python implementations that do not have the + # GIL, it could be unsafe, but at least Jython also guarantees that (for + # Python objects) memory is not corrupted when writing and reading without + # explicit synchronization + # (http://www.jython.org/jythonbook/en/1.0/Concurrency.html). + # Please note that in order to make this code safe in a runtime environment + # that does not make this guarantees, it is not sufficient. The + # ThreadLocalProxy must also be created with fallback_to_shared set to + # False or a lock must be added to the ThreadLocalProxy. + if force_lock: + with _inject_into_mod.lock: + if isinstance(old_value, ThreadLocalProxy): + ThreadLocalProxy.set_reference(old_value, value) + else: + setattr(mod, name, ThreadLocalProxy(value, True)) + else: + if isinstance(old_value, ThreadLocalProxy): + ThreadLocalProxy.set_reference(old_value, value) + else: + _inject_into_mod(mod, name, value, True) + + +# Lock used when injecting globals. This is needed to avoid a race condition +# when two threads try to load the same module concurrently. This must be +# outside the loader because there might be more than one loader for the same +# namespace. +_inject_into_mod.lock = threading.RLock() + + # TODO: move somewhere else? class FilterDictWrapper(MutableMapping): ''' @@ -1653,7 +1725,7 @@ def _load_module(self, name): # pack whatever other globals we were asked to for p_name, p_value in six.iteritems(self.pack): - setattr(mod, p_name, p_value) + _inject_into_mod(mod, p_name, p_value) module_name = mod.__name__.rsplit('.', 1)[-1] diff --git a/salt/utils/json.py b/salt/utils/json.py index fd71544b5468..8e3ee2970f0e 100644 --- a/salt/utils/json.py +++ b/salt/utils/json.py @@ -13,6 +13,7 @@ # Import Salt libs import salt.utils.data import salt.utils.stringutils +from salt.utils.thread_local_proxy import ThreadLocalProxy # Import 3rd-party libs from salt.ext import six @@ -119,11 +120,17 @@ def dump(obj, fp, **kwargs): using the _json_module argument) ''' json_module = kwargs.pop('_json_module', json) + orig_enc_func = kwargs.pop('default', lambda x: x) + + def _enc_func(obj): + obj = ThreadLocalProxy.unproxy(obj) + return orig_enc_func(obj) + if 'ensure_ascii' not in kwargs: kwargs['ensure_ascii'] = False if six.PY2: obj = salt.utils.data.encode(obj) - return json_module.dump(obj, fp, **kwargs) # future lint: blacklisted-function + return json_module.dump(obj, fp, default=_enc_func, **kwargs) # future lint: blacklisted-function def dumps(obj, **kwargs): @@ -142,8 +149,14 @@ def dumps(obj, **kwargs): using the _json_module argument) ''' json_module = kwargs.pop('_json_module', json) + orig_enc_func = kwargs.pop('default', lambda x: x) + + def _enc_func(obj): + obj = ThreadLocalProxy.unproxy(obj) + return orig_enc_func(obj) + if 'ensure_ascii' not in kwargs: kwargs['ensure_ascii'] = False if six.PY2: obj = salt.utils.data.encode(obj) - return json_module.dumps(obj, **kwargs) # future lint: blacklisted-function + return json_module.dumps(obj, default=_enc_func, **kwargs) # future lint: blacklisted-function diff --git a/salt/utils/msgpack.py b/salt/utils/msgpack.py index 1d02aa96ba8b..2c9a3135999f 100644 --- a/salt/utils/msgpack.py +++ b/salt/utils/msgpack.py @@ -2,11 +2,13 @@ ''' Functions to work with MessagePack ''' - # Import Python libs from __future__ import absolute_import import logging +# Import Salt libs +from salt.utils.thread_local_proxy import ThreadLocalProxy + log = logging.getLogger(__name__) # Import 3rd party libs @@ -87,8 +89,14 @@ def pack(o, stream, **kwargs): By default, this function uses the msgpack module and falls back to msgpack_pure, if the msgpack is not available. ''' + orig_enc_func = kwargs.pop('default', lambda x: x) + + def _enc_func(obj): + obj = ThreadLocalProxy.unproxy(obj) + return orig_enc_func(obj) + # Writes to a stream, there is no return - msgpack.pack(o, stream, **_sanitize_msgpack_kwargs(kwargs)) + msgpack.pack(o, stream, default=_enc_func, **_sanitize_msgpack_kwargs(kwargs)) def packb(o, **kwargs): @@ -101,7 +109,13 @@ def packb(o, **kwargs): By default, this function uses the msgpack module and falls back to msgpack_pure, if the msgpack is not available. ''' - return msgpack.packb(o, **_sanitize_msgpack_kwargs(kwargs)) + orig_enc_func = kwargs.pop('default', lambda x: x) + + def _enc_func(obj): + obj = ThreadLocalProxy.unproxy(obj) + return orig_enc_func(obj) + + return msgpack.packb(o, default=_enc_func, **_sanitize_msgpack_kwargs(kwargs)) def unpack(stream, **kwargs): diff --git a/salt/utils/thread_local_proxy.py b/salt/utils/thread_local_proxy.py new file mode 100644 index 000000000000..8be7ad03be30 --- /dev/null +++ b/salt/utils/thread_local_proxy.py @@ -0,0 +1,599 @@ +# -*- coding: utf-8 -*- +''' +Proxy object that can reference different values depending on the current +thread of execution. + +..versionadded:: 2018.3.4 + +''' + +# Import python libs +from __future__ import absolute_import +import threading + +# Import 3rd-party libs +from salt.ext import six + + +class ThreadLocalProxy(object): + ''' + Proxy that delegates all operations to its referenced object. The referenced + object is hold through a thread-local variable, so that this proxy may refer + to different objects in different threads of execution. + + For all practical purposes (operators, attributes, `isinstance`), the proxy + acts like the referenced object. Thus, code receiving the proxy object + instead of the reference object typically does not have to be changed. The + only exception is code that explicitly uses the ``type()`` function for + checking the proxy's type. While `isinstance(proxy, ...)` will yield the + expected results (based on the actual type of the referenced object), using + something like ``issubclass(type(proxy), ...)`` will not work, because + these tests will be made on the type of the proxy object instead of the + type of the referenced object. In order to avoid this, such code must be + changed to use ``issubclass(type(ThreadLocalProxy.unproxy(proxy)), ...)``. + + If an instance of this class is created with the ``fallback_to_shared`` flag + set and a thread uses the instance without setting the reference explicitly, + the reference for this thread is initialized with the latest reference set + by any thread. + + This class has primarily been designed for use by the Salt loader, but it + might also be useful in other places. + ''' + + __slots__ = ['_thread_local', '_last_reference', '_fallback_to_shared'] + + @staticmethod + def get_reference(proxy): + ''' + Return the object that is referenced by the specified proxy. + + If the proxy has not been bound to a reference for the current thread, + the behavior depends on th the ``fallback_to_shared`` flag that has + been specified when creating the proxy. If the flag has been set, the + last reference that has been set by any thread is returned (and + silently set as the reference for the current thread). If the flag has + not been set, an ``AttributeError`` is raised. + + If the object references by this proxy is itself a proxy, that proxy is + returned. Use ``unproxy`` for unwrapping the referenced object until it + is not a proxy. + + proxy: + proxy object for which the reference shall be returned. If the + specified object is not an instance of `ThreadLocalProxy`, the + behavior is unspecified. Typically, an ``AttributeError`` is + going to be raised. + ''' + thread_local = object.__getattribute__(proxy, '_thread_local') + try: + return thread_local.reference + except AttributeError: + fallback_to_shared = object.__getattribute__( + proxy, '_fallback_to_shared') + if fallback_to_shared: + # If the reference has never been set in the current thread of + # execution, we use the reference that has been last set by any + # thread. + reference = object.__getattribute__(proxy, '_last_reference') + # We save the reference in the thread local so that future + # calls to get_reference will have consistent results. + ThreadLocalProxy.set_reference(proxy, reference) + return reference + else: + # We could simply return None, but this would make it hard to + # debug situations where the reference has not been set (the + # problem might go unnoticed until some code tries to do + # something with the returned object and it might not be easy to + # find out from where the None value originates). + # For this reason, we raise an AttributeError with an error + # message explaining the problem. + raise AttributeError( + 'The proxy object has not been bound to a reference in this thread of execution.') + + @staticmethod + def set_reference(proxy, new_reference): + ''' + Set the reference to be used the current thread of execution. + + After calling this function, the specified proxy will act like it was + the referenced object. + + proxy: + proxy object for which the reference shall be set. If the specified + object is not an instance of `ThreadLocalProxy`, the behavior is + unspecified. Typically, an ``AttributeError`` is going to be + raised. + + new_reference: + reference the proxy should point to for the current thread after + calling this function. + ''' + # If the new reference is itself a proxy, we have to ensure that it does + # not refer to this proxy. If it does, we simply return because updating + # the reference would result in an inifite loop when trying to use the + # proxy. + possible_proxy = new_reference + while isinstance(possible_proxy, ThreadLocalProxy): + if possible_proxy is proxy: + return + possible_proxy = ThreadLocalProxy.get_reference(possible_proxy) + thread_local = object.__getattribute__(proxy, '_thread_local') + thread_local.reference = new_reference + object.__setattr__(proxy, '_last_reference', new_reference) + + @staticmethod + def unset_reference(proxy): + ''' + Unset the reference to be used by the current thread of execution. + + After calling this function, the specified proxy will act like the + reference had never been set for the current thread. + + proxy: + proxy object for which the reference shall be unset. If the + specified object is not an instance of `ThreadLocalProxy`, the + behavior is unspecified. Typically, an ``AttributeError`` is going + to be raised. + ''' + thread_local = object.__getattribute__(proxy, '_thread_local') + del thread_local.reference + + @staticmethod + def unproxy(possible_proxy): + ''' + Unwrap and return the object referenced by a proxy. + + This function is very similar to :func:`get_reference`, but works for + both proxies and regular objects. If the specified object is a proxy, + its reference is extracted with ``get_reference`` and returned. If it + is not a proxy, it is returned as is. + + If the object references by the proxy is itself a proxy, the unwrapping + is repeated until a regular (non-proxy) object is found. + + possible_proxy: + object that might or might not be a proxy. + ''' + while isinstance(possible_proxy, ThreadLocalProxy): + possible_proxy = ThreadLocalProxy.get_reference(possible_proxy) + return possible_proxy + + def __init__(self, initial_reference, fallback_to_shared=False): + ''' + Create a proxy object that references the specified object. + + initial_reference: + object this proxy should initially reference (for the current + thread of execution). The :func:`set_reference` function is called + for the newly created proxy, passing this object. + + fallback_to_shared: + flag indicating what should happen when the proxy is used in a + thread where the reference has not been set explicitly. If + ``True``, the thread's reference is silently initialized to use the + reference last set by any thread. If ``False`` (the default), an + exception is raised when the proxy is used in a thread without + first initializing the reference in this thread. + ''' + object.__setattr__(self, '_thread_local', threading.local()) + object.__setattr__(self, '_fallback_to_shared', fallback_to_shared) + ThreadLocalProxy.set_reference(self, initial_reference) + + def __repr__(self): + reference = ThreadLocalProxy.get_reference(self) + return repr(reference) + + def __str__(self): + reference = ThreadLocalProxy.get_reference(self) + return str(reference) + + def __lt__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference < other + + def __le__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference <= other + + def __eq__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference == other + + def __ne__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference != other + + def __gt__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference > other + + def __ge__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference >= other + + def __hash__(self): + reference = ThreadLocalProxy.get_reference(self) + return hash(reference) + + def __nonzero__(self): + reference = ThreadLocalProxy.get_reference(self) + return bool(reference) + + def __getattr__(self, name): + reference = ThreadLocalProxy.get_reference(self) + # Old-style classes might not have a __getattr__ method, but using + # getattr(...) will still work. + try: + original_method = reference.__getattr__ + except AttributeError: + return getattr(reference, name) + return reference.__getattr__(name) + + def __setattr__(self, name, value): + reference = ThreadLocalProxy.get_reference(self) + reference.__setattr__(name, value) + + def __delattr__(self, name): + reference = ThreadLocalProxy.get_reference(self) + reference.__delattr__(name) + + def __getattribute__(self, name): + reference = ThreadLocalProxy.get_reference(self) + return reference.__getattribute__(name) + + def __call__(self, *args, **kwargs): + reference = ThreadLocalProxy.get_reference(self) + return reference(*args, **kwargs) + + def __len__(self): + reference = ThreadLocalProxy.get_reference(self) + return len(reference) + + def __getitem__(self, key): + reference = ThreadLocalProxy.get_reference(self) + return reference[key] + + def __setitem__(self, key, value): + reference = ThreadLocalProxy.get_reference(self) + reference[key] = value + + def __delitem__(self, key): + reference = ThreadLocalProxy.get_reference(self) + del reference[key] + + def __iter__(self): + reference = ThreadLocalProxy.get_reference(self) + return reference.__iter__() + + def __reversed__(self): + reference = ThreadLocalProxy.get_reference(self) + return reversed(reference) + + def __contains__(self, item): + reference = ThreadLocalProxy.get_reference(self) + return item in reference + + def __add__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference + other + + def __sub__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference - other + + def __mul__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference * other + + def __floordiv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference // other + + def __mod__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference % other + + def __divmod__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return divmod(reference, other) + + def __pow__(self, other, modulo=None): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + modulo = ThreadLocalProxy.unproxy(modulo) + if modulo is None: + return pow(reference, other) + else: + return pow(reference, other, modulo) + + def __lshift__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference << other + + def __rshift__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference >> other + + def __and__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference & other + + def __xor__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference ^ other + + def __or__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return reference | other + + def __div__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + try: + func = reference.__div__ + except AttributeError: + return NotImplemented + return func(other) + + def __truediv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + try: + func = reference.__truediv__ + except AttributeError: + return NotImplemented + return func(other) + + def __radd__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other + reference + + def __rsub__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other - reference + + def __rmul__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other * reference + + def __rdiv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + try: + func = reference.__rdiv__ + except AttributeError: + return NotImplemented + return func(other) + + def __rtruediv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + try: + func = reference.__rtruediv__ + except AttributeError: + return NotImplemented + return func(other) + + def __rfloordiv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other // reference + + def __rmod__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other % reference + + def __rdivmod__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return divmod(other, reference) + + def __rpow__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other ** reference + + def __rlshift__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other << reference + + def __rrshift__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other >> reference + + def __rand__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other & reference + + def __rxor__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other ^ reference + + def __ror__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return other | reference + + def __iadd__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference += other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __isub__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference -= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __imul__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference *= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __idiv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + try: + func = reference.__idiv__ + except AttributeError: + return NotImplemented + reference = func(other) + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __itruediv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + try: + func = reference.__itruediv__ + except AttributeError: + return NotImplemented + reference = func(other) + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __ifloordiv__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference //= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __imod__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference %= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __ipow__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference **= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __ilshift__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference <<= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __irshift__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference >>= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __iand__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference &= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __ixor__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference ^= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __ior__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + reference |= other + ThreadLocalProxy.set_reference(self, reference) + return reference + + def __neg__(self): + reference = ThreadLocalProxy.get_reference(self) + return - reference + + def __pos__(self): + reference = ThreadLocalProxy.get_reference(self) + return + reference + + def __abs__(self): + reference = ThreadLocalProxy.get_reference(self) + return abs(reference) + + def __invert__(self): + reference = ThreadLocalProxy.get_reference(self) + return ~ reference + + def __complex__(self): + reference = ThreadLocalProxy.get_reference(self) + return complex(reference) + + def __int__(self): + reference = ThreadLocalProxy.get_reference(self) + return int(reference) + + def __float__(self): + reference = ThreadLocalProxy.get_reference(self) + return float(reference) + + def __oct__(self): + reference = ThreadLocalProxy.get_reference(self) + return oct(reference) + + def __hex__(self): + reference = ThreadLocalProxy.get_reference(self) + return hex(reference) + + def __index__(self): + reference = ThreadLocalProxy.get_reference(self) + try: + func = reference.__index__ + except AttributeError: + return NotImplemented + return func() + + def __coerce__(self, other): + reference = ThreadLocalProxy.get_reference(self) + other = ThreadLocalProxy.unproxy(other) + return coerce(reference, other) + + if six.PY2: + # pylint: disable=incompatible-py3-code + def __unicode__(self): + reference = ThreadLocalProxy.get_reference(self) + return unicode(reference) + + def __long__(self): + reference = ThreadLocalProxy.get_reference(self) + return long(reference) diff --git a/tests/unit/utils/test_thread_local_proxy.py b/tests/unit/utils/test_thread_local_proxy.py new file mode 100644 index 000000000000..0cd77379aed8 --- /dev/null +++ b/tests/unit/utils/test_thread_local_proxy.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- + +# Import python libs +from __future__ import absolute_import + +# Import Salt Libs +from salt.utils import thread_local_proxy + +# Import Salt Testing Libs +from tests.support.unit import TestCase + + +class ThreadLocalProxyTestCase(TestCase): + ''' + Test case for salt.utils.thread_local_proxy module. + ''' + + def test_set_reference_avoid_loop(self): + ''' + Test that passing another proxy (or the same proxy) to set_reference + does not results in a recursive proxy loop. + ''' + test_obj1 = 1 + test_obj2 = 2 + proxy1 = thread_local_proxy.ThreadLocalProxy(test_obj1) + proxy2 = thread_local_proxy.ThreadLocalProxy(proxy1) + self.assertEqual(test_obj1, proxy1) + self.assertEqual(test_obj1, proxy2) + self.assertEqual(proxy1, proxy2) + thread_local_proxy.ThreadLocalProxy.set_reference(proxy1, test_obj2) + self.assertEqual(test_obj2, proxy1) + self.assertEqual(test_obj2, proxy2) + self.assertEqual(proxy1, proxy2) + thread_local_proxy.ThreadLocalProxy.set_reference(proxy1, proxy2) + self.assertEqual(test_obj2, proxy1) + self.assertEqual(test_obj2, proxy2) + self.assertEqual(proxy1, proxy2) From 1a7a83f005d416e478bea9abd5c507da900725f6 Mon Sep 17 00:00:00 2001 From: Matt Phillips Date: Mon, 4 Feb 2019 20:08:51 -0500 Subject: [PATCH 2/9] address circular reference in LazyLoader with ThreadLocalProxy when lazyLoader was getting instantiated from inside already lazyloaded modules (ie state), pillar/grain (and probably other) globals were already populated. In this case they weren't getting dereferenced properly when wrapped via the NamespacedDict wrapper, causing an infinite loop. This should fix that scenario. Fixes https://github.com/saltstack/salt/pull/50655#issuecomment-460050683 --- salt/loader.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/salt/loader.py b/salt/loader.py index 0f18dfcbff88..eca6e7c989b2 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -34,12 +34,12 @@ import salt.utils.lazy import salt.utils.odict import salt.utils.platform -import salt.utils.thread_local_proxy import salt.utils.versions import salt.utils.stringutils from salt.exceptions import LoaderError from salt.template import check_render_pipe_str from salt.utils.decorators import Depends +from salt.utils.thread_local_proxy import ThreadLocalProxy # Import 3rd-party libs from salt.ext import six @@ -1136,7 +1136,6 @@ def _inject_into_mod(mod, name, value, force_lock=False): module's variable without acquiring the lock and only acquires the lock if a new proxy has to be created and injected. ''' - from salt.utils.thread_local_proxy import ThreadLocalProxy old_value = getattr(mod, name, None) # We use a double-checked locking scheme in order to avoid taking the lock # when a proxy object has already been injected. @@ -1266,7 +1265,12 @@ def __init__(self, for k, v in six.iteritems(self.pack): if v is None: # if the value of a pack is None, lets make an empty dict - self.context_dict.setdefault(k, {}) + value = self.context_dict.get(k, {}) + + if isinstance(value, ThreadLocalProxy): + value = ThreadLocalProxy.unproxy(value) + + self.context_dict[k] = value self.pack[k] = salt.utils.context.NamespacedDictWrapper(self.context_dict, k) self.whitelist = whitelist @@ -1544,11 +1548,21 @@ def __prep_mod_opts(self, opts): Strip out of the opts any logger instance ''' if '__grains__' not in self.pack: - self.context_dict['grains'] = opts.get('grains', {}) + grains = opts.get('grains', {}) + + if isinstance(grains, ThreadLocalProxy): + grains = ThreadLocalProxy.unproxy(grains) + + self.context_dict['grains'] = grains self.pack['__grains__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'grains') if '__pillar__' not in self.pack: - self.context_dict['pillar'] = opts.get('pillar', {}) + pillar = opts.get('pillar', {}) + + if isinstance(pillar, ThreadLocalProxy): + pillar = ThreadLocalProxy.unproxy(pillar) + + self.context_dict['pillar'] = pillar self.pack['__pillar__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'pillar') mod_opts = {} From 2590cf77c256e8aafd88cd8c1ad42677a10e8fd9 Mon Sep 17 00:00:00 2001 From: Tyler Johnson Date: Wed, 18 Dec 2019 15:15:37 -0700 Subject: [PATCH 3/9] Collapsed rendundant logic, don't use vars from outter scope --- salt/loader.py | 17 ++++------------- salt/utils/json.py | 12 ++++++------ 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/salt/loader.py b/salt/loader.py index eca6e7c989b2..6170d193bc39 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -1265,10 +1265,7 @@ def __init__(self, for k, v in six.iteritems(self.pack): if v is None: # if the value of a pack is None, lets make an empty dict - value = self.context_dict.get(k, {}) - - if isinstance(value, ThreadLocalProxy): - value = ThreadLocalProxy.unproxy(value) + value = ThreadLocalProxy.unproxy(self.context_dict.get(k, {})) self.context_dict[k] = value self.pack[k] = salt.utils.context.NamespacedDictWrapper(self.context_dict, k) @@ -1548,19 +1545,13 @@ def __prep_mod_opts(self, opts): Strip out of the opts any logger instance ''' if '__grains__' not in self.pack: - grains = opts.get('grains', {}) - - if isinstance(grains, ThreadLocalProxy): - grains = ThreadLocalProxy.unproxy(grains) + _grains = ThreadLocalProxy.unproxy(opts.get('grains', {})) - self.context_dict['grains'] = grains + self.context_dict['grains'] = _grains self.pack['__grains__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'grains') if '__pillar__' not in self.pack: - pillar = opts.get('pillar', {}) - - if isinstance(pillar, ThreadLocalProxy): - pillar = ThreadLocalProxy.unproxy(pillar) + pillar = ThreadLocalProxy.unproxy(opts.get('pillar', {})) self.context_dict['pillar'] = pillar self.pack['__pillar__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'pillar') diff --git a/salt/utils/json.py b/salt/utils/json.py index 8e3ee2970f0e..57c513587113 100644 --- a/salt/utils/json.py +++ b/salt/utils/json.py @@ -122,9 +122,9 @@ def dump(obj, fp, **kwargs): json_module = kwargs.pop('_json_module', json) orig_enc_func = kwargs.pop('default', lambda x: x) - def _enc_func(obj): - obj = ThreadLocalProxy.unproxy(obj) - return orig_enc_func(obj) + def _enc_func(_obj): + _obj = ThreadLocalProxy.unproxy(_obj) + return orig_enc_func(_obj) if 'ensure_ascii' not in kwargs: kwargs['ensure_ascii'] = False @@ -151,9 +151,9 @@ def dumps(obj, **kwargs): json_module = kwargs.pop('_json_module', json) orig_enc_func = kwargs.pop('default', lambda x: x) - def _enc_func(obj): - obj = ThreadLocalProxy.unproxy(obj) - return orig_enc_func(obj) + def _enc_func(_obj): + _obj = ThreadLocalProxy.unproxy(_obj) + return orig_enc_func(_obj) if 'ensure_ascii' not in kwargs: kwargs['ensure_ascii'] = False From cc3e476da3f0dc5fcd28d53054c791de7beab543 Mon Sep 17 00:00:00 2001 From: Tyler Johnson Date: Wed, 18 Dec 2019 15:39:04 -0700 Subject: [PATCH 4/9] Import thread_local_proxy correctly --- salt/loader.py | 19 +++++++++---------- salt/utils/json.py | 8 +++----- salt/utils/msgpack.py | 9 ++++----- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/salt/loader.py b/salt/loader.py index 6170d193bc39..d5fd0af8ac3c 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -14,7 +14,6 @@ import logging import inspect import tempfile -import threading import functools import threading import traceback @@ -36,10 +35,10 @@ import salt.utils.platform import salt.utils.versions import salt.utils.stringutils +import salt.utils.thread_local_proxy as thread_local_proxy from salt.exceptions import LoaderError from salt.template import check_render_pipe_str from salt.utils.decorators import Depends -from salt.utils.thread_local_proxy import ThreadLocalProxy # Import 3rd-party libs from salt.ext import six @@ -1153,13 +1152,13 @@ def _inject_into_mod(mod, name, value, force_lock=False): # False or a lock must be added to the ThreadLocalProxy. if force_lock: with _inject_into_mod.lock: - if isinstance(old_value, ThreadLocalProxy): - ThreadLocalProxy.set_reference(old_value, value) + if isinstance(old_value, thread_local_proxy.ThreadLocalProxy): + thread_local_proxy.ThreadLocalProxy.set_reference(old_value, value) else: - setattr(mod, name, ThreadLocalProxy(value, True)) + setattr(mod, name, thread_local_proxy.ThreadLocalProxy(value, True)) else: - if isinstance(old_value, ThreadLocalProxy): - ThreadLocalProxy.set_reference(old_value, value) + if isinstance(old_value, thread_local_proxy.ThreadLocalProxy): + thread_local_proxy.ThreadLocalProxy.set_reference(old_value, value) else: _inject_into_mod(mod, name, value, True) @@ -1265,7 +1264,7 @@ def __init__(self, for k, v in six.iteritems(self.pack): if v is None: # if the value of a pack is None, lets make an empty dict - value = ThreadLocalProxy.unproxy(self.context_dict.get(k, {})) + value = thread_local_proxy.ThreadLocalProxy.unproxy(self.context_dict.get(k, {})) self.context_dict[k] = value self.pack[k] = salt.utils.context.NamespacedDictWrapper(self.context_dict, k) @@ -1545,13 +1544,13 @@ def __prep_mod_opts(self, opts): Strip out of the opts any logger instance ''' if '__grains__' not in self.pack: - _grains = ThreadLocalProxy.unproxy(opts.get('grains', {})) + _grains = thread_local_proxy.ThreadLocalProxy.unproxy(opts.get('grains', {})) self.context_dict['grains'] = _grains self.pack['__grains__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'grains') if '__pillar__' not in self.pack: - pillar = ThreadLocalProxy.unproxy(opts.get('pillar', {})) + pillar = thread_local_proxy.ThreadLocalProxy.unproxy(opts.get('pillar', {})) self.context_dict['pillar'] = pillar self.pack['__pillar__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'pillar') diff --git a/salt/utils/json.py b/salt/utils/json.py index 57c513587113..e520e21afb69 100644 --- a/salt/utils/json.py +++ b/salt/utils/json.py @@ -13,7 +13,7 @@ # Import Salt libs import salt.utils.data import salt.utils.stringutils -from salt.utils.thread_local_proxy import ThreadLocalProxy +import salt.utils.thread_local_proxy as thread_local_proxy # Import 3rd-party libs from salt.ext import six @@ -123,8 +123,7 @@ def dump(obj, fp, **kwargs): orig_enc_func = kwargs.pop('default', lambda x: x) def _enc_func(_obj): - _obj = ThreadLocalProxy.unproxy(_obj) - return orig_enc_func(_obj) + return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(_obj)) if 'ensure_ascii' not in kwargs: kwargs['ensure_ascii'] = False @@ -152,8 +151,7 @@ def dumps(obj, **kwargs): orig_enc_func = kwargs.pop('default', lambda x: x) def _enc_func(_obj): - _obj = ThreadLocalProxy.unproxy(_obj) - return orig_enc_func(_obj) + return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(_obj)) if 'ensure_ascii' not in kwargs: kwargs['ensure_ascii'] = False diff --git a/salt/utils/msgpack.py b/salt/utils/msgpack.py index 2c9a3135999f..0cfc2c348557 100644 --- a/salt/utils/msgpack.py +++ b/salt/utils/msgpack.py @@ -2,12 +2,13 @@ ''' Functions to work with MessagePack ''' + # Import Python libs from __future__ import absolute_import import logging # Import Salt libs -from salt.utils.thread_local_proxy import ThreadLocalProxy +import salt.utils.thread_local_proxy as thread_local_proxy log = logging.getLogger(__name__) @@ -92,8 +93,7 @@ def pack(o, stream, **kwargs): orig_enc_func = kwargs.pop('default', lambda x: x) def _enc_func(obj): - obj = ThreadLocalProxy.unproxy(obj) - return orig_enc_func(obj) + return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(obj)) # Writes to a stream, there is no return msgpack.pack(o, stream, default=_enc_func, **_sanitize_msgpack_kwargs(kwargs)) @@ -112,8 +112,7 @@ def packb(o, **kwargs): orig_enc_func = kwargs.pop('default', lambda x: x) def _enc_func(obj): - obj = ThreadLocalProxy.unproxy(obj) - return orig_enc_func(obj) + return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(obj)) return msgpack.packb(o, default=_enc_func, **_sanitize_msgpack_kwargs(kwargs)) From 72b00368aaaa7782f60acc144eb4439b7ff32074 Mon Sep 17 00:00:00 2001 From: Tyler Johnson Date: Mon, 6 Jan 2020 00:58:19 -0700 Subject: [PATCH 5/9] disable pylint error for python-version specific code --- salt/utils/thread_local_proxy.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/salt/utils/thread_local_proxy.py b/salt/utils/thread_local_proxy.py index 8be7ad03be30..8744dbe3870b 100644 --- a/salt/utils/thread_local_proxy.py +++ b/salt/utils/thread_local_proxy.py @@ -584,16 +584,17 @@ def __index__(self): return func() def __coerce__(self, other): + # `coerce` isn't available on python 3.6, 3.7, or 3.8 reference = ThreadLocalProxy.get_reference(self) other = ThreadLocalProxy.unproxy(other) - return coerce(reference, other) + return coerce(reference, other) # pylint: disable=undefined-variable if six.PY2: # pylint: disable=incompatible-py3-code def __unicode__(self): reference = ThreadLocalProxy.get_reference(self) - return unicode(reference) + return unicode(reference) # pylint: disable=undefined-variable def __long__(self): reference = ThreadLocalProxy.get_reference(self) - return long(reference) + return long(reference) # pylint: disable=undefined-variable From f7896470ae83703da3b47bae36d85872442b4842 Mon Sep 17 00:00:00 2001 From: Tyler Johnson Date: Tue, 7 Jan 2020 20:41:04 -0700 Subject: [PATCH 6/9] Added tests that may be use with the threadsafe loader --- tests/unit/test_loader.py | 132 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/tests/unit/test_loader.py b/tests/unit/test_loader.py index c0877ff81153..02460534411c 100644 --- a/tests/unit/test_loader.py +++ b/tests/unit/test_loader.py @@ -31,9 +31,11 @@ import salt.loader import salt.utils.files import salt.utils.stringutils +import salt.utils.thread_local_proxy as thread_local_proxy # pylint: disable=import-error,no-name-in-module,redefined-builtin from salt.ext import six from salt.ext.six.moves import range + # pylint: enable=no-name-in-module,redefined-builtin log = logging.getLogger(__name__) @@ -132,6 +134,7 @@ class LazyLoaderVirtualEnabledTest(TestCase): ''' Test the base loader of salt. ''' + @classmethod def setUpClass(cls): cls.opts = salt.config.minion_config(None) @@ -248,6 +251,7 @@ class LazyLoaderVirtualDisabledTest(TestCase): ''' Test the loader of salt without __virtual__ ''' + @classmethod def setUpClass(cls): cls.opts = salt.config.minion_config(None) @@ -284,6 +288,7 @@ class LazyLoaderWhitelistTest(TestCase): ''' Test the loader of salt with a whitelist ''' + @classmethod def setUpClass(cls): cls.opts = salt.config.minion_config(None) @@ -323,6 +328,7 @@ class LazyLoaderGrainsBlacklistTest(TestCase): ''' Test the loader of grains with a blacklist ''' + def setUp(self): self.opts = salt.config.minion_config(None) @@ -348,6 +354,7 @@ class LazyLoaderSingleItem(TestCase): ''' Test loading a single item via the _load() function ''' + @classmethod def setUpClass(cls): cls.opts = salt.config.minion_config(None) @@ -758,6 +765,39 @@ def test_basic(self): self.loader.clear() self.assertIn(self.module_key, self.loader) + def test_race_condition(self): + # Create a second loader + opts = copy.deepcopy(self.opts) + dirs = salt.loader._module_dirs(opts, 'modules', 'module') + loader = salt.loader.LazyLoader( + dirs, + opts, + tag='module', + pack={'__utils__': self.utils, + '__proxy__': self.proxy, + '__salt__': self.minion_mods}) + + # ensure the module doesn't exist in either loader + self.assertNotIn(self.module_key, self.loader) + self.assertNotIn(self.module_key, loader) + + # Add it to both loaders + self.update_module() + self.update_lib() + self.loader.clear() + loader.clear() + self.assertIn(self.module_key, self.loader) + self.assertIn(self.module_key, loader) + + # Inject __opts__ into the module in both loaders + key = self.module_key.split('.')[0] + salt.loader._inject_into_mod(self.loader.loaded_modules[key], '__opts__', self.opts) + salt.loader._inject_into_mod(loader.loaded_modules[key], '__opts__', self.opts) + + # Verify that __opts__ is the same in both loaders + self.assertEqual(self.loader.loaded_modules[key].__opts__, loader.loaded_modules[key].__opts__) + self.assertIs(self.loader.loaded_modules[key].__opts__, loader.loaded_modules[key].__opts__) + def test_reload(self): # ensure it doesn't exist self.assertNotIn(self.module_key, self.loader) @@ -1054,6 +1094,7 @@ class LoaderGlobalsTest(ModuleCase): This is intended as a shorter term way of testing these so we don't break the loader ''' + def _verify_globals(self, mod_dict): ''' Verify that the globals listed in the doc string (from the test) are in these modules @@ -1178,6 +1219,7 @@ class RawModTest(TestCase): ''' Test the interface of raw_mod ''' + def setUp(self): self.opts = salt.config.minion_config(None) @@ -1381,3 +1423,93 @@ def test_osrelease_info_has_correct_type(self): grains = salt.loader.grains(self.opts) osrelease_info = grains['osrelease_info'] assert isinstance(osrelease_info, tuple), osrelease_info + + +class ThreadLocalProxyLoaderTest(TestCase): + module_key = 'loadertestsubmod.test' + module_name = 'lazyloadertest' + + @classmethod + def setUpClass(cls): + cls.opts = salt.config.minion_config(None) + cls.opts['grains'] = salt.loader.grains(cls.opts) + if not os.path.isdir(RUNTIME_VARS.TMP): + os.makedirs(RUNTIME_VARS.TMP) + cls.utils = salt.loader.utils(cls.opts) + cls.proxy = salt.loader.proxy(cls.opts) + cls.funcs = salt.loader.minion_mods(cls.opts, utils=cls.utils, proxy=cls.proxy) + + def setUp(self): + # Setup the module + self.module_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) + self.addCleanup(shutil.rmtree, self.module_dir, ignore_errors=True) + self.module_file = os.path.join(self.module_dir, '{0}.py'.format(self.module_key)) + with salt.utils.files.fopen(self.module_file, 'w') as fh: + fh.write(salt.utils.stringutils.to_str(loader_template)) + fh.flush() + os.fsync(fh.fileno()) + + self.loader1 = None + self.loader2 = None + self.count = 0 + self.lib_count = 0 + + def tearDown(self): + del self.module_dir + del self.module_file + del self.loader1 + del self.loader2 + + def test__inject_into_mod(self): + class test_module(object): + name = 'threadproxy.test.module' + + self.assertFalse(hasattr(test_module, '__test__')) + salt.loader._inject_into_mod(test_module, '__test__', self.opts) + self.assertTrue(hasattr(test_module, '__test__')) + self.assertIsInstance(test_module.__test__, thread_local_proxy.ThreadLocalProxy) + self.assertEqual(self.opts, test_module.__test__) + + salt.loader._inject_into_mod(test_module, '__test__', self.opts) + self.assertIsInstance(test_module.__test__, thread_local_proxy.ThreadLocalProxy) + self.assertEqual(self.opts, test_module.__test__) + + def test__inject_into_mod_force_lock(self): + class test_module(object): + name = 'threadproxy.test.module' + + self.assertFalse(hasattr(test_module, '__test2__')) + salt.loader._inject_into_mod(test_module, '__test2__', self.opts, force_lock=True) + self.assertTrue(hasattr(test_module, '__test2__')) + self.assertIsInstance(test_module.__test2__, thread_local_proxy.ThreadLocalProxy) + self.assertEqual(self.opts, test_module.__test2__) + + salt.loader._inject_into_mod(test_module, '__test2__', self.opts, force_lock=True) + self.assertIsInstance(test_module.__test2__, thread_local_proxy.ThreadLocalProxy) + self.assertEqual(self.opts, test_module.__test2__) + + def test_race_condition(self): + ''' + Test issue #2355, create two loaders and verify that all attributes are proxied and equal + ''' + self.loader1 = salt.loader.LazyLoader([self.module_dir], + copy.deepcopy(self.opts), + pack={'__utils__': self.utils, + '__salt__': self.funcs, + '__proxy__': self.proxy}, + tag='module') + + self.loader2 = salt.loader.LazyLoader([self.module_dir], + copy.deepcopy(self.opts), + pack={'__utils__': self.utils, + '__salt__': self.funcs, + '__proxy__': self.proxy}, + tag='module') + + for key, val in six.iteritems(self.loader1._dict): + self.assertIsInstance(val, thread_local_proxy.ThreadLocalProxy) + + for key, val in six.iteritems(self.loader2._dict): + self.assertIsInstance(val, thread_local_proxy.ThreadLocalProxy) + + self.assertEqual(self.loader1.opts, self.loader2.opts) From 5087ad8b5ccc5d2818efe74201a31ce20b3fae6b Mon Sep 17 00:00:00 2001 From: Tyler Johnson Date: Wed, 8 Jan 2020 17:22:41 -0700 Subject: [PATCH 7/9] Consolidated new tests of injection_mod --- tests/unit/test_loader.py | 136 ++++++-------------------------------- 1 file changed, 22 insertions(+), 114 deletions(-) diff --git a/tests/unit/test_loader.py b/tests/unit/test_loader.py index 02460534411c..04bb5c6d50ec 100644 --- a/tests/unit/test_loader.py +++ b/tests/unit/test_loader.py @@ -765,39 +765,6 @@ def test_basic(self): self.loader.clear() self.assertIn(self.module_key, self.loader) - def test_race_condition(self): - # Create a second loader - opts = copy.deepcopy(self.opts) - dirs = salt.loader._module_dirs(opts, 'modules', 'module') - loader = salt.loader.LazyLoader( - dirs, - opts, - tag='module', - pack={'__utils__': self.utils, - '__proxy__': self.proxy, - '__salt__': self.minion_mods}) - - # ensure the module doesn't exist in either loader - self.assertNotIn(self.module_key, self.loader) - self.assertNotIn(self.module_key, loader) - - # Add it to both loaders - self.update_module() - self.update_lib() - self.loader.clear() - loader.clear() - self.assertIn(self.module_key, self.loader) - self.assertIn(self.module_key, loader) - - # Inject __opts__ into the module in both loaders - key = self.module_key.split('.')[0] - salt.loader._inject_into_mod(self.loader.loaded_modules[key], '__opts__', self.opts) - salt.loader._inject_into_mod(loader.loaded_modules[key], '__opts__', self.opts) - - # Verify that __opts__ is the same in both loaders - self.assertEqual(self.loader.loaded_modules[key].__opts__, loader.loaded_modules[key].__opts__) - self.assertIs(self.loader.loaded_modules[key].__opts__, loader.loaded_modules[key].__opts__) - def test_reload(self): # ensure it doesn't exist self.assertNotIn(self.module_key, self.loader) @@ -1426,90 +1393,31 @@ def test_osrelease_info_has_correct_type(self): class ThreadLocalProxyLoaderTest(TestCase): - module_key = 'loadertestsubmod.test' - module_name = 'lazyloadertest' - @classmethod def setUpClass(cls): cls.opts = salt.config.minion_config(None) - cls.opts['grains'] = salt.loader.grains(cls.opts) - if not os.path.isdir(RUNTIME_VARS.TMP): - os.makedirs(RUNTIME_VARS.TMP) - cls.utils = salt.loader.utils(cls.opts) - cls.proxy = salt.loader.proxy(cls.opts) - cls.funcs = salt.loader.minion_mods(cls.opts, utils=cls.utils, proxy=cls.proxy) - - def setUp(self): - # Setup the module - self.module_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP) - self.addCleanup(shutil.rmtree, self.module_dir, ignore_errors=True) - self.module_file = os.path.join(self.module_dir, '{0}.py'.format(self.module_key)) - with salt.utils.files.fopen(self.module_file, 'w') as fh: - fh.write(salt.utils.stringutils.to_str(loader_template)) - fh.flush() - os.fsync(fh.fileno()) - - self.loader1 = None - self.loader2 = None - self.count = 0 - self.lib_count = 0 - - def tearDown(self): - del self.module_dir - del self.module_file - del self.loader1 - del self.loader2 def test__inject_into_mod(self): class test_module(object): - name = 'threadproxy.test.module' - - self.assertFalse(hasattr(test_module, '__test__')) - salt.loader._inject_into_mod(test_module, '__test__', self.opts) - self.assertTrue(hasattr(test_module, '__test__')) - self.assertIsInstance(test_module.__test__, thread_local_proxy.ThreadLocalProxy) - self.assertEqual(self.opts, test_module.__test__) - - salt.loader._inject_into_mod(test_module, '__test__', self.opts) - self.assertIsInstance(test_module.__test__, thread_local_proxy.ThreadLocalProxy) - self.assertEqual(self.opts, test_module.__test__) - - def test__inject_into_mod_force_lock(self): - class test_module(object): - name = 'threadproxy.test.module' - - self.assertFalse(hasattr(test_module, '__test2__')) - salt.loader._inject_into_mod(test_module, '__test2__', self.opts, force_lock=True) - self.assertTrue(hasattr(test_module, '__test2__')) - self.assertIsInstance(test_module.__test2__, thread_local_proxy.ThreadLocalProxy) - self.assertEqual(self.opts, test_module.__test2__) - - salt.loader._inject_into_mod(test_module, '__test2__', self.opts, force_lock=True) - self.assertIsInstance(test_module.__test2__, thread_local_proxy.ThreadLocalProxy) - self.assertEqual(self.opts, test_module.__test2__) - - def test_race_condition(self): - ''' - Test issue #2355, create two loaders and verify that all attributes are proxied and equal - ''' - self.loader1 = salt.loader.LazyLoader([self.module_dir], - copy.deepcopy(self.opts), - pack={'__utils__': self.utils, - '__salt__': self.funcs, - '__proxy__': self.proxy}, - tag='module') - - self.loader2 = salt.loader.LazyLoader([self.module_dir], - copy.deepcopy(self.opts), - pack={'__utils__': self.utils, - '__salt__': self.funcs, - '__proxy__': self.proxy}, - tag='module') - - for key, val in six.iteritems(self.loader1._dict): - self.assertIsInstance(val, thread_local_proxy.ThreadLocalProxy) - - for key, val in six.iteritems(self.loader2._dict): - self.assertIsInstance(val, thread_local_proxy.ThreadLocalProxy) - - self.assertEqual(self.loader1.opts, self.loader2.opts) + name = 'inject_into_mod.test.module' + + # First path, Force is not true, proxy doesn't exist -- also takes the path of Force True and proxy not exist + salt.loader._inject_into_mod(test_module, '__opts__', self.opts) + self.assertTrue(hasattr(test_module, '__opts__')) + self.assertIsInstance(test_module.__opts__, thread_local_proxy.ThreadLocalProxy) + self.assertEqual(self.opts, test_module.__opts__) + foo = test_module.__opts__ + + # Second path, Force is not true, proxy exists + salt.loader._inject_into_mod(test_module, '__opts__', self.opts) + self.assertIsInstance(test_module.__opts__, thread_local_proxy.ThreadLocalProxy) + self.assertEqual(self.opts, test_module.__opts__) + bar = test_module.__opts__ + + self.assertIs(foo, bar) + self.assertEqual(foo, bar) + foo['yes'] = 'no' + self.assertIn('yes', bar) + + # Final path, Force is true, proxy exists + salt.loader._inject_into_mod(test_module, '__opts__', self.opts, True) From 7b504862979b15bdcecec4fc3222c72f40735bd5 Mon Sep 17 00:00:00 2001 From: Tyler Johnson Date: Fri, 24 Apr 2020 03:46:28 -0600 Subject: [PATCH 8/9] blackened files --- salt/loader.py | 36 ++++++++----- salt/utils/json.py | 28 ++++++----- salt/utils/msgpack.py | 8 +-- salt/utils/thread_local_proxy.py | 56 ++++++++++----------- tests/unit/test_loader.py | 15 +++--- tests/unit/utils/test_thread_local_proxy.py | 8 +-- 6 files changed, 83 insertions(+), 68 deletions(-) diff --git a/salt/loader.py b/salt/loader.py index f45d3462dfb2..c076a2a744ea 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -1092,7 +1092,7 @@ def _mod_type(module_path): def _inject_into_mod(mod, name, value, force_lock=False): - ''' + """ Inject a variable into a module. This is used to inject "globals" like ``__salt__``, ``__pillar``, or ``grains``. @@ -1124,7 +1124,7 @@ def _inject_into_mod(mod, name, value, force_lock=False): module. If ``False`` (the default), this function checks for the module's variable without acquiring the lock and only acquires the lock if a new proxy has to be created and injected. - ''' + """ old_value = getattr(mod, name, None) # We use a double-checked locking scheme in order to avoid taking the lock # when a proxy object has already been injected. @@ -1256,10 +1256,14 @@ def __init__( for k, v in six.iteritems(self.pack): if v is None: # if the value of a pack is None, lets make an empty dict - value = thread_local_proxy.ThreadLocalProxy.unproxy(self.context_dict.get(k, {})) + value = thread_local_proxy.ThreadLocalProxy.unproxy( + self.context_dict.get(k, {}) + ) self.context_dict[k] = value - self.pack[k] = salt.utils.context.NamespacedDictWrapper(self.context_dict, k) + self.pack[k] = salt.utils.context.NamespacedDictWrapper( + self.context_dict, k + ) self.whitelist = whitelist self.virtual_enable = virtual_enable @@ -1537,18 +1541,24 @@ def clear(self): def __prep_mod_opts(self, opts): """ Strip out of the opts any logger instance - ''' - if '__grains__' not in self.pack: - _grains = thread_local_proxy.ThreadLocalProxy.unproxy(opts.get('grains', {})) + """ + if "__grains__" not in self.pack: + _grains = thread_local_proxy.ThreadLocalProxy.unproxy( + opts.get("grains", {}) + ) - self.context_dict['grains'] = _grains - self.pack['__grains__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'grains') + self.context_dict["grains"] = _grains + self.pack["__grains__"] = salt.utils.context.NamespacedDictWrapper( + self.context_dict, "grains" + ) - if '__pillar__' not in self.pack: - pillar = thread_local_proxy.ThreadLocalProxy.unproxy(opts.get('pillar', {})) + if "__pillar__" not in self.pack: + pillar = thread_local_proxy.ThreadLocalProxy.unproxy(opts.get("pillar", {})) - self.context_dict['pillar'] = pillar - self.pack['__pillar__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'pillar') + self.context_dict["pillar"] = pillar + self.pack["__pillar__"] = salt.utils.context.NamespacedDictWrapper( + self.context_dict, "pillar" + ) mod_opts = {} for key, val in list(opts.items()): diff --git a/salt/utils/json.py b/salt/utils/json.py index f7a257d5f73c..6a596c2f7548 100644 --- a/salt/utils/json.py +++ b/salt/utils/json.py @@ -118,18 +118,20 @@ def dump(obj, fp, **kwargs): You can pass an alternate json module (loaded via import_json() above) using the _json_module argument) - ''' - json_module = kwargs.pop('_json_module', json) - orig_enc_func = kwargs.pop('default', lambda x: x) + """ + json_module = kwargs.pop("_json_module", json) + orig_enc_func = kwargs.pop("default", lambda x: x) def _enc_func(_obj): return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(_obj)) - if 'ensure_ascii' not in kwargs: - kwargs['ensure_ascii'] = False + if "ensure_ascii" not in kwargs: + kwargs["ensure_ascii"] = False if six.PY2: obj = salt.utils.data.encode(obj) - return json_module.dump(obj, fp, default=_enc_func, **kwargs) # future lint: blacklisted-function + return json_module.dump( + obj, fp, default=_enc_func, **kwargs + ) # future lint: blacklisted-function def dumps(obj, **kwargs): @@ -146,15 +148,17 @@ def dumps(obj, **kwargs): You can pass an alternate json module (loaded via import_json() above) using the _json_module argument) - ''' - json_module = kwargs.pop('_json_module', json) - orig_enc_func = kwargs.pop('default', lambda x: x) + """ + json_module = kwargs.pop("_json_module", json) + orig_enc_func = kwargs.pop("default", lambda x: x) def _enc_func(_obj): return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(_obj)) - if 'ensure_ascii' not in kwargs: - kwargs['ensure_ascii'] = False + if "ensure_ascii" not in kwargs: + kwargs["ensure_ascii"] = False if six.PY2: obj = salt.utils.data.encode(obj) - return json_module.dumps(obj, default=_enc_func, **kwargs) # future lint: blacklisted-function + return json_module.dumps( + obj, default=_enc_func, **kwargs + ) # future lint: blacklisted-function diff --git a/salt/utils/msgpack.py b/salt/utils/msgpack.py index 4dd380f55fb9..f6a2c4250c64 100644 --- a/salt/utils/msgpack.py +++ b/salt/utils/msgpack.py @@ -96,8 +96,8 @@ def pack(o, stream, **kwargs): By default, this function uses the msgpack module and falls back to msgpack_pure, if the msgpack is not available. - ''' - orig_enc_func = kwargs.pop('default', lambda x: x) + """ + orig_enc_func = kwargs.pop("default", lambda x: x) def _enc_func(obj): return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(obj)) @@ -115,8 +115,8 @@ def packb(o, **kwargs): By default, this function uses the msgpack module and falls back to msgpack_pure, if the msgpack is not available. - ''' - orig_enc_func = kwargs.pop('default', lambda x: x) + """ + orig_enc_func = kwargs.pop("default", lambda x: x) def _enc_func(obj): return orig_enc_func(thread_local_proxy.ThreadLocalProxy.unproxy(obj)) diff --git a/salt/utils/thread_local_proxy.py b/salt/utils/thread_local_proxy.py index 8744dbe3870b..9239cfa1edaf 100644 --- a/salt/utils/thread_local_proxy.py +++ b/salt/utils/thread_local_proxy.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- -''' +""" Proxy object that can reference different values depending on the current thread of execution. ..versionadded:: 2018.3.4 -''' +""" # Import python libs from __future__ import absolute_import @@ -16,7 +16,7 @@ class ThreadLocalProxy(object): - ''' + """ Proxy that delegates all operations to its referenced object. The referenced object is hold through a thread-local variable, so that this proxy may refer to different objects in different threads of execution. @@ -39,13 +39,13 @@ class ThreadLocalProxy(object): This class has primarily been designed for use by the Salt loader, but it might also be useful in other places. - ''' + """ - __slots__ = ['_thread_local', '_last_reference', '_fallback_to_shared'] + __slots__ = ["_thread_local", "_last_reference", "_fallback_to_shared"] @staticmethod def get_reference(proxy): - ''' + """ Return the object that is referenced by the specified proxy. If the proxy has not been bound to a reference for the current thread, @@ -64,18 +64,17 @@ def get_reference(proxy): specified object is not an instance of `ThreadLocalProxy`, the behavior is unspecified. Typically, an ``AttributeError`` is going to be raised. - ''' - thread_local = object.__getattribute__(proxy, '_thread_local') + """ + thread_local = object.__getattribute__(proxy, "_thread_local") try: return thread_local.reference except AttributeError: - fallback_to_shared = object.__getattribute__( - proxy, '_fallback_to_shared') + fallback_to_shared = object.__getattribute__(proxy, "_fallback_to_shared") if fallback_to_shared: # If the reference has never been set in the current thread of # execution, we use the reference that has been last set by any # thread. - reference = object.__getattribute__(proxy, '_last_reference') + reference = object.__getattribute__(proxy, "_last_reference") # We save the reference in the thread local so that future # calls to get_reference will have consistent results. ThreadLocalProxy.set_reference(proxy, reference) @@ -89,11 +88,12 @@ def get_reference(proxy): # For this reason, we raise an AttributeError with an error # message explaining the problem. raise AttributeError( - 'The proxy object has not been bound to a reference in this thread of execution.') + "The proxy object has not been bound to a reference in this thread of execution." + ) @staticmethod def set_reference(proxy, new_reference): - ''' + """ Set the reference to be used the current thread of execution. After calling this function, the specified proxy will act like it was @@ -108,7 +108,7 @@ def set_reference(proxy, new_reference): new_reference: reference the proxy should point to for the current thread after calling this function. - ''' + """ # If the new reference is itself a proxy, we have to ensure that it does # not refer to this proxy. If it does, we simply return because updating # the reference would result in an inifite loop when trying to use the @@ -118,13 +118,13 @@ def set_reference(proxy, new_reference): if possible_proxy is proxy: return possible_proxy = ThreadLocalProxy.get_reference(possible_proxy) - thread_local = object.__getattribute__(proxy, '_thread_local') + thread_local = object.__getattribute__(proxy, "_thread_local") thread_local.reference = new_reference - object.__setattr__(proxy, '_last_reference', new_reference) + object.__setattr__(proxy, "_last_reference", new_reference) @staticmethod def unset_reference(proxy): - ''' + """ Unset the reference to be used by the current thread of execution. After calling this function, the specified proxy will act like the @@ -135,13 +135,13 @@ def unset_reference(proxy): specified object is not an instance of `ThreadLocalProxy`, the behavior is unspecified. Typically, an ``AttributeError`` is going to be raised. - ''' - thread_local = object.__getattribute__(proxy, '_thread_local') + """ + thread_local = object.__getattribute__(proxy, "_thread_local") del thread_local.reference @staticmethod def unproxy(possible_proxy): - ''' + """ Unwrap and return the object referenced by a proxy. This function is very similar to :func:`get_reference`, but works for @@ -154,13 +154,13 @@ def unproxy(possible_proxy): possible_proxy: object that might or might not be a proxy. - ''' + """ while isinstance(possible_proxy, ThreadLocalProxy): possible_proxy = ThreadLocalProxy.get_reference(possible_proxy) return possible_proxy def __init__(self, initial_reference, fallback_to_shared=False): - ''' + """ Create a proxy object that references the specified object. initial_reference: @@ -175,9 +175,9 @@ def __init__(self, initial_reference, fallback_to_shared=False): reference last set by any thread. If ``False`` (the default), an exception is raised when the proxy is used in a thread without first initializing the reference in this thread. - ''' - object.__setattr__(self, '_thread_local', threading.local()) - object.__setattr__(self, '_fallback_to_shared', fallback_to_shared) + """ + object.__setattr__(self, "_thread_local", threading.local()) + object.__setattr__(self, "_fallback_to_shared", fallback_to_shared) ThreadLocalProxy.set_reference(self, initial_reference) def __repr__(self): @@ -541,11 +541,11 @@ def __ior__(self, other): def __neg__(self): reference = ThreadLocalProxy.get_reference(self) - return - reference + return -reference def __pos__(self): reference = ThreadLocalProxy.get_reference(self) - return + reference + return +reference def __abs__(self): reference = ThreadLocalProxy.get_reference(self) @@ -553,7 +553,7 @@ def __abs__(self): def __invert__(self): reference = ThreadLocalProxy.get_reference(self) - return ~ reference + return ~reference def __complex__(self): reference = ThreadLocalProxy.get_reference(self) diff --git a/tests/unit/test_loader.py b/tests/unit/test_loader.py index 41e779298848..ed3648eebf08 100644 --- a/tests/unit/test_loader.py +++ b/tests/unit/test_loader.py @@ -27,6 +27,7 @@ import salt.utils.files import salt.utils.stringutils import salt.utils.thread_local_proxy as thread_local_proxy + # pylint: disable=import-error,no-name-in-module,redefined-builtin from salt.ext import six from salt.ext.six.moves import range @@ -1497,25 +1498,25 @@ def setUpClass(cls): def test__inject_into_mod(self): class test_module(object): - name = 'inject_into_mod.test.module' + name = "inject_into_mod.test.module" # First path, Force is not true, proxy doesn't exist -- also takes the path of Force True and proxy not exist - salt.loader._inject_into_mod(test_module, '__opts__', self.opts) - self.assertTrue(hasattr(test_module, '__opts__')) + salt.loader._inject_into_mod(test_module, "__opts__", self.opts) + self.assertTrue(hasattr(test_module, "__opts__")) self.assertIsInstance(test_module.__opts__, thread_local_proxy.ThreadLocalProxy) self.assertEqual(self.opts, test_module.__opts__) foo = test_module.__opts__ # Second path, Force is not true, proxy exists - salt.loader._inject_into_mod(test_module, '__opts__', self.opts) + salt.loader._inject_into_mod(test_module, "__opts__", self.opts) self.assertIsInstance(test_module.__opts__, thread_local_proxy.ThreadLocalProxy) self.assertEqual(self.opts, test_module.__opts__) bar = test_module.__opts__ self.assertIs(foo, bar) self.assertEqual(foo, bar) - foo['yes'] = 'no' - self.assertIn('yes', bar) + foo["yes"] = "no" + self.assertIn("yes", bar) # Final path, Force is true, proxy exists - salt.loader._inject_into_mod(test_module, '__opts__', self.opts, True) + salt.loader._inject_into_mod(test_module, "__opts__", self.opts, True) diff --git a/tests/unit/utils/test_thread_local_proxy.py b/tests/unit/utils/test_thread_local_proxy.py index 0cd77379aed8..ebcfeb07d147 100644 --- a/tests/unit/utils/test_thread_local_proxy.py +++ b/tests/unit/utils/test_thread_local_proxy.py @@ -11,15 +11,15 @@ class ThreadLocalProxyTestCase(TestCase): - ''' + """ Test case for salt.utils.thread_local_proxy module. - ''' + """ def test_set_reference_avoid_loop(self): - ''' + """ Test that passing another proxy (or the same proxy) to set_reference does not results in a recursive proxy loop. - ''' + """ test_obj1 = 1 test_obj2 = 2 proxy1 = thread_local_proxy.ThreadLocalProxy(test_obj1) From 3f7f86909f0cd0bf7fd6beb40aa86a6ef8e95300 Mon Sep 17 00:00:00 2001 From: Tyler Johnson Date: Fri, 24 Apr 2020 03:53:31 -0600 Subject: [PATCH 9/9] passing pre-commit --- salt/loader.py | 3 +-- salt/utils/thread_local_proxy.py | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/salt/loader.py b/salt/loader.py index c076a2a744ea..2b949cafc478 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -35,9 +35,8 @@ import salt.utils.odict import salt.utils.platform import salt.utils.stringutils -import salt.utils.versions -import salt.utils.stringutils import salt.utils.thread_local_proxy as thread_local_proxy +import salt.utils.versions from salt.exceptions import LoaderError # Import 3rd-party libs diff --git a/salt/utils/thread_local_proxy.py b/salt/utils/thread_local_proxy.py index 9239cfa1edaf..df2db08cc91e 100644 --- a/salt/utils/thread_local_proxy.py +++ b/salt/utils/thread_local_proxy.py @@ -9,6 +9,7 @@ # Import python libs from __future__ import absolute_import + import threading # Import 3rd-party libs