From 604b9bbd6d7b6354b5f2961536bcb80079199c4a Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 17 Oct 2015 10:37:10 -0700 Subject: [PATCH 01/11] Fix for exception when exiting out of a runner finally will be run even if self.exit() is called. ``` Traceback (most recent call last): File "/home/jacksontj/.virtualenvs/salt/bin/salt-run", line 10, in execfile(__file__) File "/home/jacksontj/src/salt/scripts/salt-run", line 10, in salt_run() File "/home/jacksontj/src/salt/salt/scripts.py", line 337, in salt_run client.run() File "/home/jacksontj/src/salt/salt/cli/run.py", line 47, in run if isinstance(ret, dict) and 'retcode' in ret: UnboundLocalError: local variable 'ret' referenced before assignment ``` --- salt/cli/run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/salt/cli/run.py b/salt/cli/run.py index de3471fc4d8a..7e476c445304 100644 --- a/salt/cli/run.py +++ b/salt/cli/run.py @@ -39,13 +39,13 @@ def run(self): pr = activate_profile(profiling_enabled) try: ret = runner.run() + if isinstance(ret, dict) and 'retcode' in ret: + self.exit(ret['retcode']) finally: output_profile( pr, stats_path=self.options.profiling_path, stop=True) - if isinstance(ret, dict) and 'retcode' in ret: - self.exit(ret['retcode']) except SaltClientError as exc: raise SystemExit(str(exc)) From 6838a9534f0dd01291eacfbd6621be43ec0396f4 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 9 Oct 2015 16:11:39 -0700 Subject: [PATCH 02/11] Initial addition of ContextDict Inspired by #23373 The basic issue we ran into is that the loader is injecting globals directly into the global namespace. This means that these injected values are not thread or coroutine safe-- meaning we can never do more than one thing at a time. Instead of multiprocessing everything to death-- we can simply use a stack_context to nicely handle this down in the core. As far as the module authors/users are concerned nothing has changed-- but the storage behind the scenes is now per-JID. This same set of classes can easily be used to store additional data (next candidates are reactors, master MWorker tasks, etc.). --- salt/loader.py | 59 ++++++---------- salt/minion.py | 22 ++++-- salt/utils/context.py | 141 +++++++++++++++++++++++++++++++++++++ tests/unit/context_test.py | 133 ++++++++++++++++++++++++++++++++++ 4 files changed, 312 insertions(+), 43 deletions(-) create mode 100644 tests/unit/context_test.py diff --git a/salt/loader.py b/salt/loader.py index e56d197f7805..6ca0ffc1e126 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -24,7 +24,7 @@ from salt.exceptions import LoaderError from salt.template import check_render_pipe_str from salt.utils.decorators import Depends -from salt.utils import context +import salt.utils.context import salt.utils.lazy import salt.utils.event import salt.utils.odict @@ -191,13 +191,6 @@ def minion_mods( __salt__['test.ping']() ''' # TODO Publish documentation for module whitelisting - if context is None: - context = {} - if utils is None: - utils = {} - if proxy is None: - proxy = {} - if not whitelist: whitelist = opts.get('whitelist_modules', None) ret = LazyLoader(_module_dirs(opts, 'modules', 'module'), @@ -293,8 +286,6 @@ def returners(opts, functions, whitelist=None, context=None): ''' Returns the returner modules ''' - if context is None: - context = {} return LazyLoader(_module_dirs(opts, 'returners', 'returner'), opts, tag='returner', @@ -307,8 +298,6 @@ def utils(opts, whitelist=None, context=None): ''' Returns the utility modules ''' - if context is None: - context = {} return LazyLoader(_module_dirs(opts, 'utils', 'utils', ext_type_dirs='utils_dirs'), opts, tag='utils', @@ -320,8 +309,6 @@ def pillars(opts, functions, context=None): ''' Returns the pillars modules ''' - if context is None: - context = {} ret = LazyLoader(_module_dirs(opts, 'pillar', 'pillar'), opts, tag='pillar', @@ -449,8 +436,6 @@ def beacons(opts, functions, context=None): :param dict functions: A dictionary of minion modules, with module names as keys and funcs as values. ''' - if context is None: - context = {} return LazyLoader(_module_dirs(opts, 'beacons', 'beacons'), opts, tag='beacons', @@ -496,10 +481,6 @@ def ssh_wrapper(opts, functions=None, context=None): ''' Returns the custom logging handler modules ''' - if context is None: - context = {} - if functions is None: - functions = {} return LazyLoader(_module_dirs(opts, 'wrapper', 'wrapper', @@ -888,20 +869,30 @@ def __init__(self, virtual_enable=True, static_modules=None ): # pylint: disable=W0231 + ''' + In pack, if any of the values are None they will be replaced with an + empty context-specific dict + ''' self.inject_globals = {} + self.pack = {} if pack is None else pack + if opts is None: + opts = {} + self.context_dict = salt.utils.context.ContextDict() self.opts = self.__prep_mod_opts(opts) self.module_dirs = module_dirs - if opts is None: - opts = {} self.tag = tag self.loaded_base_name = loaded_base_name or LOADED_BASE_NAME self.mod_type_check = mod_type_check or _mod_type - self.pack = {} if pack is None else pack if '__context__' not in self.pack: - self.pack['__context__'] = {} + self.pack['__context__'] = None + + for k, v in self.pack.iteritems(): + if v is None: # if the value of a pack is None, lets make an empty dict + self.context_dict.setdefault(k, {}) + self.pack[k] = salt.utils.context.NamespacedDictWrapper(self.context_dict, k) self.whitelist = whitelist self.virtual_enable = virtual_enable @@ -1079,14 +1070,13 @@ def __prep_mod_opts(self, opts): ''' Strip out of the opts any logger instance ''' - if 'grains' in opts: - self._grains = opts['grains'] - else: - self._grains = {} - if 'pillar' in opts: - self._pillar = opts['pillar'] - else: - self._pillar = {} + if '__grains__' not in self.pack: + self.context_dict['grains'] = opts.get('grains', {}) + self.pack['__grains__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'grains') + + if '__pillar__' not in self.pack: + self.context_dict['pillar'] = opts.get('pillar', {}) + self.pack['__pillar__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'pillar') mod_opts = {} for key, val in list(opts.items()): @@ -1204,9 +1194,6 @@ def _load_module(self, name): else: mod.__opts__ = self.opts - mod.__grains__ = self._grains - mod.__pillar__ = self._pillar - # pack whatever other globals we were asked to for p_name, p_value in six.iteritems(self.pack): setattr(mod, p_name, p_value) @@ -1528,7 +1515,7 @@ def global_injector_decorator(inject_globals): def inner_decorator(f): @functools.wraps(f) def wrapper(*args, **kwargs): - with context.func_globals_inject(f, **inject_globals): + with salt.utils.context.func_globals_inject(f, **inject_globals): return f(*args, **kwargs) return wrapper return inner_decorator diff --git a/salt/minion.py b/salt/minion.py index 5defa88ff85b..22ed92992a01 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -80,6 +80,7 @@ import salt.payload import salt.syspaths import salt.utils +import salt.utils.context import salt.utils.jid import salt.pillar import salt.utils.args @@ -960,10 +961,6 @@ def _handle_decoded_payload(self, data): self.functions, self.returners, self.function_errors, self.executors = self._load_modules() self.schedule.functions = self.functions self.schedule.returners = self.returners - if isinstance(data['fun'], tuple) or isinstance(data['fun'], list): - target = Minion._thread_multi_return - else: - target = Minion._thread_return # We stash an instance references to allow for the socket # communication in Windows. You can't pickle functions, and thus # python needs to be able to reconstruct the reference on the other @@ -975,20 +972,31 @@ def _handle_decoded_payload(self, data): # running on windows instance = None process = multiprocessing.Process( - target=target, args=(instance, self.opts, data) + target=self._target, args=(instance, self.opts, data) ) else: process = threading.Thread( - target=target, + target=self._target, args=(instance, self.opts, data), name=data['jid'] ) process.start() - if not sys.platform.startswith('win'): + # TODO: remove the windows specific check? + if not sys.platform.startswith('win') and self.opts['multiprocessing']: + # we only want to join() immediately if we are daemonizing a process process.join() else: self.win_proc.append(process) + @classmethod + def _target(cls, minion_instance, opts, data): + # TODO: clone all contexts? Should be one per loader :/ + with tornado.stack_context.StackContext(minion_instance.functions.context_dict.clone): + if isinstance(data['fun'], tuple) or isinstance(data['fun'], list): + Minion._thread_multi_return(minion_instance, opts, data) + else: + Minion._thread_return(minion_instance, opts, data) + @classmethod def _thread_return(cls, minion_instance, opts, data): ''' diff --git a/salt/utils/context.py b/salt/utils/context.py index 89d6acb7c67f..89d9736059cf 100644 --- a/salt/utils/context.py +++ b/salt/utils/context.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- ''' :codeauthor: :email:`Pedro Algarvio (pedro@algarvio.me)` + :codeauthor: :email:`Thomas Jackson (jacksontj.89@gmail.com)` salt.utils.context @@ -11,8 +12,13 @@ from __future__ import absolute_import # Import python libs +import copy +import threading +import collections from contextlib import contextmanager +import salt.ext.six + @contextmanager def func_globals_inject(func, **overrides): @@ -49,3 +55,138 @@ def func_globals_inject(func, **overrides): # Remove any entry injected in the function globals for injected in injected_func_globals: del func_globals[injected] + + +class ContextDict(collections.MutableMapping): + """A context manager that saves some per-thread state globally. + Intended for use with Tornado's StackContext. + + Provide arbitrary data as kwargs upon creation, + then allow any children to override the values of the parent. + """ + + def __init__(self, **data): + # state should be thread local, so this object can be threadsafe + self._state = threading.local() + # variable for the overriden data + self._state.data = None + self.global_data = {} + + @property + def active(self): + '''Determine if this ContextDict is currently overriden + Since the ContextDict can be overriden in each thread, we check whether + the _state.data is set or not. + ''' + try: + return self._state.data is not None + except AttributeError: + return False + + # TODO: rename? + def clone(self, **kwargs): + ''' + Clone this context, and return the ChildContextDict + ''' + child = ChildContextDict(parent=self, overrides=kwargs) + return child + + def __setitem__(self, key, val): + if self.active: + self._state.data[key] = val + else: + self.global_data[key] = val + + def __delitem__(self, key): + if self.active: + del self._state.data[key] + else: + del self.global_data[key] + + def __getitem__(self, key): + if self.active: + return self._state.data[key] + else: + return self.global_data[key] + + def __len__(self): + if self.active: + return len(self._state.data) + else: + return len(self.global_data) + + def __iter__(self): + if self.active: + return iter(self._state.data) + else: + return iter(self.global_data) + + +class ChildContextDict(collections.MutableMapping): + '''An overrideable child of ContextDict + + ''' + def __init__(self, parent, overrides=None): + self.parent = parent + self._data = {} if overrides is None else overrides + + # merge self.global_data into self._data + for k, v in self.parent.global_data.iteritems(): + if k not in self._data: + self._data[k] = copy.deepcopy(v) + + def __setitem__(self, key, val): + self._data[key] = val + + def __delitem__(self, key): + del self._data[key] + + def __getitem__(self, key): + return self._data[key] + + def __len__(self): + return len(self._data) + + def __iter__(self): + return iter(self._data) + + def __enter__(self): + self.parent._state.data = self._data + + def __exit__(self, *exc): + self.parent._state.data = None + + +class NamespacedDictWrapper(collections.MutableMapping, dict): + ''' + Create a dict which wraps another dict with a specific prefix of key(s) + + MUST inherit from dict to serialize through msgpack correctly + ''' + def __init__(self, d, pre_keys): + self.__dict = d + if isinstance(pre_keys, salt.ext.six.string_types): + self.pre_keys = (pre_keys,) + else: + self.pre_keys = pre_keys + + def _dict(self): + r = self.__dict + for k in self.pre_keys: + r = r[k] + return r + + def __setitem__(self, key, val): + self._dict()[key] = val + + def __delitem__(self, key): + del self._dict()[key] + + def __getitem__(self, key): + return self._dict()[key] + + def __len__(self): + return len(self._dict()) + + def __iter__(self): + return iter(self._dict()) diff --git a/tests/unit/context_test.py b/tests/unit/context_test.py new file mode 100644 index 000000000000..aa216ff3e4d7 --- /dev/null +++ b/tests/unit/context_test.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +''' + tests.unit.context_test + ~~~~~~~~~~~~~~~~~~~~ +''' +# Import python libs +from __future__ import absolute_import +import tornado.stack_context +import threading +import time + +# Import Salt Testing libs +from salttesting import TestCase +from salttesting.helpers import ensure_in_syspath +ensure_in_syspath('../../') + +# Import Salt libs +from salt.utils.context import ContextDict, NamespacedDictWrapper + + +class ContextDictTests(TestCase): + def setUp(self): + self.cd = ContextDict() + # set a global value + self.cd['foo'] = 'global' + + def test_threads(self): + rets = [] + + def tgt(x, s): + inner_ret = [] + over = self.cd.clone() + + inner_ret.append(self.cd.get('foo')) + with over: + inner_ret.append(over.get('foo')) + over['foo'] = x + inner_ret.append(over.get('foo')) + time.sleep(s) + inner_ret.append(over.get('foo')) + rets.append(inner_ret) + + threads = [] + NUM_JOBS = 5 + for x in xrange(0, NUM_JOBS): + s = NUM_JOBS - x + t = threading.Thread(target=tgt, args=(x, s)) + t.start() + threads.append(t) + + for t in threads: + t.join() + + for r in rets: + self.assertEqual(r[0], r[1]) + self.assertEqual(r[2], r[3]) + + def test_basic(self): + '''Test that the contextDict is a dict + ''' + # ensure we get the global value + self.assertEqual( + dict(self.cd), + {'foo': 'global'}, + ) + + def test_override(self): + over = self.cd.clone() + over['bar'] = 'global' + self.assertEqual( + dict(over), + {'foo': 'global', 'bar': 'global'}, + ) + self.assertEqual( + dict(self.cd), + {'foo': 'global'}, + ) + with tornado.stack_context.StackContext(lambda: over): + self.assertEqual( + dict(over), + {'foo': 'global', 'bar': 'global'}, + ) + self.assertEqual( + dict(self.cd), + {'foo': 'global', 'bar': 'global'}, + ) + over['bar'] = 'baz' + self.assertEqual( + dict(over), + {'foo': 'global', 'bar': 'baz'}, + ) + self.assertEqual( + dict(self.cd), + {'foo': 'global', 'bar': 'baz'}, + ) + self.assertEqual( + dict(over), + {'foo': 'global', 'bar': 'baz'}, + ) + self.assertEqual( + dict(self.cd), + {'foo': 'global'}, + ) + + def test_multiple_contexts(self): + cds = [] + for x in xrange(0, 10): + cds.append(self.cd.clone(bar=x)) + for x, cd in enumerate(cds): + self.assertNotIn('bar', self.cd) + with tornado.stack_context.StackContext(lambda: cd): + self.assertEqual( + dict(self.cd), + {'bar': x, 'foo': 'global'}, + ) + self.assertNotIn('bar', self.cd) + + +class NamespacedDictWrapperTests(TestCase): + PREFIX = 'prefix' + + def setUp(self): + self._dict = {} + + def test_single_key(self): + self._dict['prefix'] = {'foo': 'bar'} + w = NamespacedDictWrapper(self._dict, 'prefix') + self.assertEqual(w['foo'], 'bar') + + def test_multiple_key(self): + self._dict['prefix'] = {'foo': {'bar': 'baz'}} + w = NamespacedDictWrapper(self._dict, ('prefix', 'foo')) + self.assertEqual(w['bar'], 'baz') From df97ef8b412c1fb9ee405c7c337150b21375bfc0 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 17 Oct 2015 10:57:35 -0700 Subject: [PATCH 03/11] Clone all contexts before executing the minion methods --- salt/minion.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/salt/minion.py b/salt/minion.py index 22ed92992a01..79026d01bbab 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -15,6 +15,7 @@ import logging import threading import traceback +import contextlib import multiprocessing from random import randint, shuffle from salt.config import DEFAULT_MINION_OPTS @@ -988,10 +989,18 @@ def _handle_decoded_payload(self, data): else: self.win_proc.append(process) + def ctx(self): + '''Return a single context manager for the minion's data + ''' + return contextlib.nested( + self.functions.context_dict.clone(), + self.returners.context_dict.clone(), + self.executors.context_dict.clone(), + ) + @classmethod def _target(cls, minion_instance, opts, data): - # TODO: clone all contexts? Should be one per loader :/ - with tornado.stack_context.StackContext(minion_instance.functions.context_dict.clone): + with tornado.stack_context.StackContext(minion_instance.ctx): if isinstance(data['fun'], tuple) or isinstance(data['fun'], list): Minion._thread_multi_return(minion_instance, opts, data) else: From 0a0818e72922a621dea963fad5d331541cb5e961 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 17 Oct 2015 11:04:51 -0700 Subject: [PATCH 04/11] Normalize format for initializing LazyLoader While debugging in here I noticed that the style varied a bit-- so this normalizes it to one clean one :) --- salt/loader.py | 350 ++++++++++++++++++++++++++++--------------------- 1 file changed, 200 insertions(+), 150 deletions(-) diff --git a/salt/loader.py b/salt/loader.py index 6ca0ffc1e126..d20062c47efb 100644 --- a/salt/loader.py +++ b/salt/loader.py @@ -90,17 +90,20 @@ def static_loader( base_path=None, filter_name=None, ): - funcs = LazyLoader(_module_dirs(opts, - ext_type, - tag, - int_type, - ext_dirs, - ext_type_dirs, - base_path), - opts, - tag=tag, - pack=pack, - ) + funcs = LazyLoader( + _module_dirs( + opts, + ext_type, + tag, + int_type, + ext_dirs, + ext_type_dirs, + base_path, + ), + opts, + tag=tag, + pack=pack, + ) ret = {} funcs._load_all() if filter_name: @@ -193,14 +196,15 @@ def minion_mods( # TODO Publish documentation for module whitelisting if not whitelist: whitelist = opts.get('whitelist_modules', None) - ret = LazyLoader(_module_dirs(opts, 'modules', 'module'), - opts, - tag='module', - pack={'__context__': context, '__utils__': utils, - '__proxy__': proxy}, - whitelist=whitelist, - loaded_base_name=loaded_base_name, - static_modules=static_modules) + ret = LazyLoader( + _module_dirs(opts, 'modules', 'module'), + opts, + tag='module', + pack={'__context__': context, '__utils__': utils, '__proxy__': proxy}, + whitelist=whitelist, + loaded_base_name=loaded_base_name, + static_modules=static_modules, + ) ret.pack['__salt__'] = ret @@ -242,11 +246,13 @@ def raw_mod(opts, name, functions, mod='modules'): testmod = salt.loader.raw_mod(__opts__, 'test', None) testmod['test.ping']() ''' - loader = LazyLoader(_module_dirs(opts, mod, 'rawmodule'), - opts, - tag='rawmodule', - virtual_enable=False, - pack={'__salt__': functions}) + loader = LazyLoader( + _module_dirs(opts, mod, 'rawmodule'), + opts, + tag='rawmodule', + virtual_enable=False, + pack={'__salt__': functions}, + ) # if we don't have the module, return an empty dict if name not in loader.file_mapping: return {} @@ -261,21 +267,24 @@ def engines(opts, functions, runners): ''' pack = {'__salt__': functions, '__runners__': runners} - return LazyLoader(_module_dirs(opts, 'engines', 'engines'), - opts, - tag='engines', - pack=pack) + return LazyLoader( + _module_dirs(opts, 'engines', 'engines'), + opts, + tag='engines', + pack=pack, + ) def proxy(opts, functions=None, returners=None, whitelist=None): ''' Returns the proxy module for this salt-proxy-minion ''' - ret = LazyLoader(_module_dirs(opts, 'proxy', 'proxy'), - opts, - tag='proxy', - pack={'__salt__': functions, - '__ret__': returners}) + ret = LazyLoader( + _module_dirs(opts, 'proxy', 'proxy'), + opts, + tag='proxy', + pack={'__salt__': functions, '__ret__': returners}, + ) ret.pack['__proxy__'] = ret @@ -286,34 +295,38 @@ def returners(opts, functions, whitelist=None, context=None): ''' Returns the returner modules ''' - return LazyLoader(_module_dirs(opts, 'returners', 'returner'), - opts, - tag='returner', - whitelist=whitelist, - pack={'__salt__': functions, - '__context__': context}) + return LazyLoader( + _module_dirs(opts, 'returners', 'returner'), + opts, + tag='returner', + whitelist=whitelist, + pack={'__salt__': functions, '__context__': context}, + ) def utils(opts, whitelist=None, context=None): ''' Returns the utility modules ''' - return LazyLoader(_module_dirs(opts, 'utils', 'utils', ext_type_dirs='utils_dirs'), - opts, - tag='utils', - whitelist=whitelist, - pack={'__context__': context}) + return LazyLoader( + _module_dirs(opts, 'utils', 'utils', ext_type_dirs='utils_dirs'), + opts, + tag='utils', + whitelist=whitelist, + pack={'__context__': context}, + ) def pillars(opts, functions, context=None): ''' Returns the pillars modules ''' - ret = LazyLoader(_module_dirs(opts, 'pillar', 'pillar'), - opts, - tag='pillar', - pack={'__salt__': functions, - '__context__': context}) + ret = LazyLoader( + _module_dirs(opts, 'pillar', 'pillar'), + opts, + tag='pillar', + pack={'__salt__': functions, '__context__': context}, + ) return FilterDictWrapper(ret, '.ext_pillar') @@ -324,10 +337,12 @@ def tops(opts): if 'master_tops' not in opts: return {} whitelist = list(opts['master_tops'].keys()) - ret = LazyLoader(_module_dirs(opts, 'tops', 'top'), - opts, - tag='top', - whitelist=whitelist) + ret = LazyLoader( + _module_dirs(opts, 'tops', 'top'), + opts, + tag='top', + whitelist=whitelist, + ) return FilterDictWrapper(ret, '.top') @@ -335,10 +350,12 @@ def wheels(opts, whitelist=None): ''' Returns the wheels modules ''' - return LazyLoader(_module_dirs(opts, 'wheel', 'wheel'), - opts, - tag='wheel', - whitelist=whitelist) + return LazyLoader( + _module_dirs(opts, 'wheel', 'wheel'), + opts, + tag='wheel', + whitelist=whitelist, + ) def outputters(opts): @@ -348,9 +365,11 @@ def outputters(opts): :param dict opts: The Salt options dictionary :returns: LazyLoader instance, with only outputters present in the keyspace ''' - ret = LazyLoader(_module_dirs(opts, 'output', 'output', ext_type_dirs='outputter_dirs'), - opts, - tag='output') + ret = LazyLoader( + _module_dirs(opts, 'output', 'output', ext_type_dirs='outputter_dirs'), + opts, + tag='output', + ) wrapped_ret = FilterDictWrapper(ret, '.output') # TODO: this name seems terrible... __salt__ should always be execution mods ret.pack['__salt__'] = wrapped_ret @@ -363,9 +382,11 @@ def serializers(opts): :param dict opts: The Salt options dictionary :returns: LazyLoader instance, with only serializers present in the keyspace ''' - return LazyLoader(_module_dirs(opts, 'serializers', 'serializers'), - opts, - tag='serializers') + return LazyLoader( + _module_dirs(opts, 'serializers', 'serializers'), + opts, + tag='serializers', + ) def auth(opts, whitelist=None): @@ -375,31 +396,37 @@ def auth(opts, whitelist=None): :param dict opts: The Salt options dictionary :returns: LazyLoader ''' - return LazyLoader(_module_dirs(opts, 'auth', 'auth'), - opts, - tag='auth', - whitelist=whitelist, - pack={'__salt__': minion_mods(opts)}) + return LazyLoader( + _module_dirs(opts, 'auth', 'auth'), + opts, + tag='auth', + whitelist=whitelist, + pack={'__salt__': minion_mods(opts)}, + ) def fileserver(opts, backends): ''' Returns the file server modules ''' - return LazyLoader(_module_dirs(opts, 'fileserver', 'fileserver'), - opts, - tag='fileserver', - whitelist=backends) + return LazyLoader( + _module_dirs(opts, 'fileserver', 'fileserver'), + opts, + tag='fileserver', + whitelist=backends, + ) def roster(opts, whitelist=None): ''' Returns the roster modules ''' - return LazyLoader(_module_dirs(opts, 'roster', 'roster'), - opts, - tag='roster', - whitelist=whitelist) + return LazyLoader( + _module_dirs(opts, 'roster', 'roster'), + opts, + tag='roster', + whitelist=whitelist, + ) def states(opts, functions, utils, whitelist=None): @@ -418,11 +445,13 @@ def states(opts, functions, utils, whitelist=None): __opts__ = salt.config.minion_config('/etc/salt/minion') statemods = salt.loader.states(__opts__, None, None) ''' - ret = LazyLoader(_module_dirs(opts, 'states', 'states'), - opts, - tag='states', - pack={'__salt__': functions}, - whitelist=whitelist) + ret = LazyLoader( + _module_dirs(opts, 'states', 'states'), + opts, + tag='states', + pack={'__salt__': functions}, + whitelist=whitelist, + ) ret.pack['__states__'] = ret ret.pack['__utils__'] = utils return ret @@ -436,11 +465,12 @@ def beacons(opts, functions, context=None): :param dict functions: A dictionary of minion modules, with module names as keys and funcs as values. ''' - return LazyLoader(_module_dirs(opts, 'beacons', 'beacons'), - opts, - tag='beacons', - pack={'__context__': context, - '__salt__': functions}) + return LazyLoader( + _module_dirs(opts, 'beacons', 'beacons'), + opts, + tag='beacons', + pack={'__context__': context, '__salt__': functions}, + ) def search(opts, returners, whitelist=None): @@ -453,11 +483,13 @@ def search(opts, returners, whitelist=None): ''' # TODO Document returners arg # TODO Document whitelist arg - return LazyLoader(_module_dirs(opts, 'search', 'search'), - opts, - tag='search', - whitelist=whitelist, - pack={'__ret__': returners}) + return LazyLoader( + _module_dirs(opts, 'search', 'search'), + opts, + tag='search', + whitelist=whitelist, + pack={'__ret__': returners}, + ) def log_handlers(opts): @@ -466,14 +498,17 @@ def log_handlers(opts): :param dict opts: The Salt options dictionary ''' - ret = LazyLoader(_module_dirs(opts, - 'log_handlers', - 'log_handlers', - int_type='handlers', - base_path=os.path.join(SALT_BASE_PATH, 'log')), - opts, - tag='log_handlers', - ) + ret = LazyLoader( + _module_dirs( + opts, + 'log_handlers', + 'log_handlers', + int_type='handlers', + base_path=os.path.join(SALT_BASE_PATH, 'log'), + ), + opts, + tag='log_handlers', + ) return FilterDictWrapper(ret, '.setup_handlers') @@ -481,14 +516,17 @@ def ssh_wrapper(opts, functions=None, context=None): ''' Returns the custom logging handler modules ''' - return LazyLoader(_module_dirs(opts, - 'wrapper', - 'wrapper', - base_path=os.path.join(SALT_BASE_PATH, os.path.join('client', 'ssh'))), - opts, - tag='wrapper', - pack={'__salt__': functions, '__context__': context}, - ) + return LazyLoader( + _module_dirs( + opts, + 'wrapper', + 'wrapper', + base_path=os.path.join(SALT_BASE_PATH, os.path.join('client', 'ssh')), + ), + opts, + tag='wrapper', + pack={'__salt__': functions, '__context__': context}, + ) def render(opts, functions, states=None): @@ -498,14 +536,17 @@ def render(opts, functions, states=None): pack = {'__salt__': functions} if states: pack['__states__'] = states - ret = LazyLoader(_module_dirs(opts, - 'renderers', - 'render', - ext_type_dirs='render_dirs'), - opts, - tag='render', - pack=pack, - ) + ret = LazyLoader( + _module_dirs( + opts, + 'renderers', + 'render', + ext_type_dirs='render_dirs', + ), + opts, + tag='render', + pack=pack, + ) rend = FilterDictWrapper(ret, '.render') if not check_render_pipe_str(opts['renderer'], rend): @@ -528,13 +569,16 @@ def grain_funcs(opts): __opts__ = salt.config.minion_config('/etc/salt/minion') grainfuncs = salt.loader.grain_funcs(__opts__) ''' - return LazyLoader(_module_dirs(opts, - 'grains', - 'grain', - ext_type_dirs='grains_dirs'), - opts, - tag='grains', - ) + return LazyLoader( + _module_dirs( + opts, + 'grains', + 'grain', + ext_type_dirs='grains_dirs', + ), + opts, + tag='grains', + ) def grains(opts, force_refresh=False): @@ -672,11 +716,12 @@ def call(fun, **kwargs): args = kwargs.get('args', []) dirs = kwargs.get('dirs', []) - funcs = LazyLoader([os.path.join(SALT_BASE_PATH, 'modules')] + dirs, - None, - tag='modules', - virtual_enable=False, - ) + funcs = LazyLoader( + [os.path.join(SALT_BASE_PATH, 'modules')] + dirs, + None, + tag='modules', + virtual_enable=False, + ) return funcs[fun](*args) @@ -684,10 +729,11 @@ def runner(opts): ''' Directly call a function inside a loader directory ''' - ret = LazyLoader(_module_dirs(opts, 'runners', 'runner', ext_type_dirs='runner_dirs'), - opts, - tag='runners', - ) + ret = LazyLoader( + _module_dirs(opts, 'runners', 'runner', ext_type_dirs='runner_dirs'), + opts, + tag='runners', + ) # TODO: change from __salt__ to something else, we overload __salt__ too much ret.pack['__salt__'] = ret return ret @@ -697,22 +743,24 @@ def queues(opts): ''' Directly call a function inside a loader directory ''' - return LazyLoader(_module_dirs(opts, 'queues', 'queue', ext_type_dirs='queue_dirs'), - opts, - tag='queues', - ) + return LazyLoader( + _module_dirs(opts, 'queues', 'queue', ext_type_dirs='queue_dirs'), + opts, + tag='queues', + ) def sdb(opts, functions=None, whitelist=None): ''' Make a very small database call ''' - return LazyLoader(_module_dirs(opts, 'sdb', 'sdb'), - opts, - tag='sdb', - pack={'__sdb__': functions}, - whitelist=whitelist, - ) + return LazyLoader( + _module_dirs(opts, 'sdb', 'sdb'), + opts, + tag='sdb', + pack={'__sdb__': functions}, + whitelist=whitelist, + ) def pkgdb(opts): @@ -783,21 +831,23 @@ def netapi(opts): ''' Return the network api functions ''' - return LazyLoader(_module_dirs(opts, 'netapi', 'netapi'), - opts, - tag='netapi', - ) + return LazyLoader( + _module_dirs(opts, 'netapi', 'netapi'), + opts, + tag='netapi', + ) def executors(opts, functions=None, context=None): ''' Returns the executor modules ''' - return LazyLoader(_module_dirs(opts, 'executors', 'executor'), - opts, - tag='executor', - pack={'__salt__': functions, '__context__': context or {}}, - ) + return LazyLoader( + _module_dirs(opts, 'executors', 'executor'), + opts, + tag='executor', + pack={'__salt__': functions, '__context__': context or {}}, + ) def _generate_module(name): From 117a7135023a04cd4c1a0e6f1c5df68647137b35 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 17 Oct 2015 11:09:10 -0700 Subject: [PATCH 05/11] style cleanup --- salt/client/mixins.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/salt/client/mixins.py b/salt/client/mixins.py index f3c220d4cce5..f076b60660b4 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -84,13 +84,13 @@ def wrapper(*args, **kwargs): user = salt.utils.get_specific_user() return self.client._proc_function( - key, - low, - user, - async_pub['tag'], # TODO: fix - async_pub['jid'], # TODO: fix - False, # Don't daemonize - ) + key, + low, + user, + async_pub['tag'], # TODO: fix + async_pub['jid'], # TODO: fix + False, # Don't daemonize + ) return wrapper def __len__(self): From 391e31a52570a01a1e2f8c5c0e8f16b718887f3a Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Sat, 17 Oct 2015 11:21:57 -0700 Subject: [PATCH 06/11] Initialize a unique context per wheel/runner execution --- salt/client/mixins.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/salt/client/mixins.py b/salt/client/mixins.py index f076b60660b4..4e6483b06c5c 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -10,6 +10,7 @@ import traceback import collections import multiprocessing +import tornado.stack_context # Import Salt libs import salt.exceptions @@ -333,8 +334,10 @@ def low(self, fun, low): else: kwargs = low['kwargs'] - data['return'] = self.functions[fun](*args, **kwargs) - data['success'] = True + # Initialize a context for executing the method. + with tornado.stack_context.StackContext(self.functions.context_dict.clone): + data['return'] = self.functions[fun](*args, **kwargs) + data['success'] = True except (Exception, SystemExit) as ex: if isinstance(ex, salt.exceptions.NotImplemented): data['return'] = str(ex) From 408af9d41977e5cabb56dba8a9754efecee2e391 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 23 Oct 2015 08:31:37 -0700 Subject: [PATCH 07/11] Add Coroutine tests for the ContextDict --- tests/unit/context_test.py | 58 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/tests/unit/context_test.py b/tests/unit/context_test.py index aa216ff3e4d7..022e334cb63c 100644 --- a/tests/unit/context_test.py +++ b/tests/unit/context_test.py @@ -6,6 +6,8 @@ # Import python libs from __future__ import absolute_import import tornado.stack_context +import tornado.gen +from tornado.testing import AsyncTestCase, gen_test import threading import time @@ -18,13 +20,16 @@ from salt.utils.context import ContextDict, NamespacedDictWrapper -class ContextDictTests(TestCase): +class ContextDictTests(AsyncTestCase): def setUp(self): + super(ContextDictTests, self).setUp() self.cd = ContextDict() # set a global value self.cd['foo'] = 'global' def test_threads(self): + '''Verify that ContextDict overrides properly within threads + ''' rets = [] def tgt(x, s): @@ -55,6 +60,53 @@ def tgt(x, s): self.assertEqual(r[0], r[1]) self.assertEqual(r[2], r[3]) + @gen_test + def test_coroutines(self): + '''Verify that ContextDict overrides properly within coroutines + ''' + @tornado.gen.coroutine + def secondary_coroutine(over): + raise tornado.gen.Return(over.get('foo')) + + @tornado.gen.coroutine + def tgt(x, s, over): + inner_ret = [] + # first grab the global + inner_ret.append(self.cd.get('foo')) + # grab the child's global (should match) + inner_ret.append(over.get('foo')) + # override the global + over['foo'] = x + inner_ret.append(over.get('foo')) + # sleep for some time to let other coroutines do this section of code + yield tornado.gen.sleep(s) + # get the value of the global again. + inner_ret.append(over.get('foo')) + # Call another coroutine to verify that we keep our context + r = yield secondary_coroutine(over) + inner_ret.append(r) + raise tornado.gen.Return(inner_ret) + + futures = [] + NUM_JOBS = 5 + for x in xrange(0, NUM_JOBS): + s = NUM_JOBS - x + over = self.cd.clone() + def run(): + return tgt(x, s/5.0, over) + f = tornado.stack_context.run_with_stack_context( + tornado.stack_context.StackContext(lambda: over), + run, + ) + futures.append(f) + + wait_iterator = tornado.gen.WaitIterator(*futures) + while not wait_iterator.done(): + r = yield wait_iterator.next() + self.assertEqual(r[0], r[1]) # verify that the global value remails + self.assertEqual(r[2], r[3]) # verify that the override sticks locally + self.assertEqual(r[3], r[4]) # verify that the override sticks across coroutines + def test_basic(self): '''Test that the contextDict is a dict ''' @@ -75,7 +127,7 @@ def test_override(self): dict(self.cd), {'foo': 'global'}, ) - with tornado.stack_context.StackContext(lambda: over): + with over: self.assertEqual( dict(over), {'foo': 'global', 'bar': 'global'}, @@ -108,7 +160,7 @@ def test_multiple_contexts(self): cds.append(self.cd.clone(bar=x)) for x, cd in enumerate(cds): self.assertNotIn('bar', self.cd) - with tornado.stack_context.StackContext(lambda: cd): + with cd: self.assertEqual( dict(self.cd), {'bar': x, 'foo': 'global'}, From 36624654715d24908b77a0c6817e5f2a405cdc88 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 23 Oct 2015 08:40:56 -0700 Subject: [PATCH 08/11] pylint cleanup --- salt/utils/context.py | 2 +- tests/unit/context_test.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/salt/utils/context.py b/salt/utils/context.py index 89d9736059cf..3ed442dd580e 100644 --- a/salt/utils/context.py +++ b/salt/utils/context.py @@ -163,7 +163,7 @@ class NamespacedDictWrapper(collections.MutableMapping, dict): MUST inherit from dict to serialize through msgpack correctly ''' - def __init__(self, d, pre_keys): + def __init__(self, d, pre_keys): # pylint: disable=W0231 self.__dict = d if isinstance(pre_keys, salt.ext.six.string_types): self.pre_keys = (pre_keys,) diff --git a/tests/unit/context_test.py b/tests/unit/context_test.py index 022e334cb63c..56c6a8ad2bc7 100644 --- a/tests/unit/context_test.py +++ b/tests/unit/context_test.py @@ -13,6 +13,7 @@ # Import Salt Testing libs from salttesting import TestCase +from salt.ext.six.moves import range from salttesting.helpers import ensure_in_syspath ensure_in_syspath('../../') @@ -47,7 +48,7 @@ def tgt(x, s): threads = [] NUM_JOBS = 5 - for x in xrange(0, NUM_JOBS): + for x in range(0, NUM_JOBS): s = NUM_JOBS - x t = threading.Thread(target=tgt, args=(x, s)) t.start() @@ -89,7 +90,7 @@ def tgt(x, s, over): futures = [] NUM_JOBS = 5 - for x in xrange(0, NUM_JOBS): + for x in range(0, NUM_JOBS): s = NUM_JOBS - x over = self.cd.clone() def run(): @@ -156,7 +157,7 @@ def test_override(self): def test_multiple_contexts(self): cds = [] - for x in xrange(0, 10): + for x in range(0, 10): cds.append(self.cd.clone(bar=x)) for x, cd in enumerate(cds): self.assertNotIn('bar', self.cd) From 256a0e2af881ab4dc23346fd1d3ec951085108ee Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 23 Oct 2015 08:43:46 -0700 Subject: [PATCH 09/11] Rename and consolidate NUM_JOBs into a single class-wide variable --- tests/unit/context_test.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/unit/context_test.py b/tests/unit/context_test.py index 56c6a8ad2bc7..ac4193082dcd 100644 --- a/tests/unit/context_test.py +++ b/tests/unit/context_test.py @@ -22,6 +22,9 @@ class ContextDictTests(AsyncTestCase): + # how many threads/coroutines to run at a time + num_concurrent_tasks = 5 + def setUp(self): super(ContextDictTests, self).setUp() self.cd = ContextDict() @@ -47,9 +50,8 @@ def tgt(x, s): rets.append(inner_ret) threads = [] - NUM_JOBS = 5 - for x in range(0, NUM_JOBS): - s = NUM_JOBS - x + for x in range(0, self.num_concurrent_tasks): + s = self.num_concurrent_tasks - x t = threading.Thread(target=tgt, args=(x, s)) t.start() threads.append(t) @@ -89,9 +91,9 @@ def tgt(x, s, over): raise tornado.gen.Return(inner_ret) futures = [] - NUM_JOBS = 5 - for x in range(0, NUM_JOBS): - s = NUM_JOBS - x + + for x in range(0, self.num_concurrent_tasks): + s = self.num_concurrent_tasks - x over = self.cd.clone() def run(): return tgt(x, s/5.0, over) From 718cf6481777ead626bf8706499ceadd7823b50d Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 23 Oct 2015 08:53:44 -0700 Subject: [PATCH 10/11] Pylint cleanup --- tests/unit/context_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/unit/context_test.py b/tests/unit/context_test.py index ac4193082dcd..7abedceac7ff 100644 --- a/tests/unit/context_test.py +++ b/tests/unit/context_test.py @@ -95,11 +95,10 @@ def tgt(x, s, over): for x in range(0, self.num_concurrent_tasks): s = self.num_concurrent_tasks - x over = self.cd.clone() - def run(): - return tgt(x, s/5.0, over) + f = tornado.stack_context.run_with_stack_context( - tornado.stack_context.StackContext(lambda: over), - run, + tornado.stack_context.StackContext(lambda: over), # pylint: disable=W0640 + lambda: tgt(x, s/5.0, over), # pylint: disable=W0640 ) futures.append(f) From ff1ac70cd562f91104dd7270b0dc0324310bc1a2 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Fri, 23 Oct 2015 11:30:21 -0700 Subject: [PATCH 11/11] Python3 compat for context_test --- tests/unit/context_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/context_test.py b/tests/unit/context_test.py index 7abedceac7ff..87afa9090dc8 100644 --- a/tests/unit/context_test.py +++ b/tests/unit/context_test.py @@ -104,7 +104,7 @@ def tgt(x, s, over): wait_iterator = tornado.gen.WaitIterator(*futures) while not wait_iterator.done(): - r = yield wait_iterator.next() + r = yield next(wait_iterator) self.assertEqual(r[0], r[1]) # verify that the global value remails self.assertEqual(r[2], r[3]) # verify that the override sticks locally self.assertEqual(r[3], r[4]) # verify that the override sticks across coroutines