From d6fc692cb05aaca2870c64bad55eb8ad022a22c8 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 16 Nov 2018 17:05:29 +0100 Subject: [PATCH 01/31] Async batch --- salt/cli/batch.py | 91 ++++++++++++++++++++----- salt/cli/batch_async.py | 142 ++++++++++++++++++++++++++++++++++++++++ salt/client/__init__.py | 41 ++---------- salt/master.py | 81 ++++++++++++++++++----- 4 files changed, 285 insertions(+), 70 deletions(-) create mode 100644 salt/cli/batch_async.py diff --git a/salt/cli/batch.py b/salt/cli/batch.py index 723944b60a47..564dbdb6ee78 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -26,6 +26,79 @@ log = logging.getLogger(__name__) +def _get_bnum(opts, minions, quiet): + ''' + Return the active number of minions to maintain + ''' + partition = lambda x: float(x) / 100.0 * len(minions) + try: + if '%' in opts['batch']: + res = partition(float(opts['batch'].strip('%'))) + if res < 1: + return int(math.ceil(res)) + else: + return int(res) + else: + return int(opts['batch']) + except ValueError: + if not quiet: + salt.utils.stringutils.print_cli('Invalid batch data sent: {0}\nData must be in the ' + 'form of %10, 10% or 3'.format(opts['batch'])) + + +def _batch_get_opts( + tgt, + fun, + batch, + parent_opts, + arg=(), + tgt_type='glob', + ret='', + kwarg=None, + **kwargs): + # We need to re-import salt.utils.args here + # even though it has already been imported. + # when cmd_batch is called via the NetAPI + # the module is unavailable. 
+ import salt.utils.args + + arg = salt.utils.args.condition_input(arg, kwarg) + opts = {'tgt': tgt, + 'fun': fun, + 'arg': arg, + 'tgt_type': tgt_type, + 'ret': ret, + 'batch': batch, + 'failhard': kwargs.get('failhard', False), + 'raw': kwargs.get('raw', False)} + + if 'timeout' in kwargs: + opts['timeout'] = kwargs['timeout'] + if 'gather_job_timeout' in kwargs: + opts['gather_job_timeout'] = kwargs['gather_job_timeout'] + if 'batch_wait' in kwargs: + opts['batch_wait'] = int(kwargs['batch_wait']) + + for key, val in six.iteritems(parent_opts): + if key not in opts: + opts[key] = val + + return opts + + +def _batch_get_eauth(kwargs): + eauth = {} + if 'eauth' in kwargs: + eauth['eauth'] = kwargs.pop('eauth') + if 'username' in kwargs: + eauth['username'] = kwargs.pop('username') + if 'password' in kwargs: + eauth['password'] = kwargs.pop('password') + if 'token' in kwargs: + eauth['token'] = kwargs.pop('token') + return eauth + + class Batch(object): ''' Manage the execution of batch runs @@ -80,23 +153,7 @@ def __gather_minions(self): return (list(fret), ping_gen, nret.difference(fret)) def get_bnum(self): - ''' - Return the active number of minions to maintain - ''' - partition = lambda x: float(x) / 100.0 * len(self.minions) - try: - if isinstance(self.opts['batch'], six.string_types) and '%' in self.opts['batch']: - res = partition(float(self.opts['batch'].strip('%'))) - if res < 1: - return int(math.ceil(res)) - else: - return int(res) - else: - return int(self.opts['batch']) - except ValueError: - if not self.quiet: - salt.utils.stringutils.print_cli('Invalid batch data sent: {0}\nData must be in the ' - 'form of %10, 10% or 3'.format(self.opts['batch'])) + return _get_bnum(self.opts, self.minions, self.quiet) def __update_wait(self, wait): now = datetime.now() diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py new file mode 100644 index 000000000000..d950151877c0 --- /dev/null +++ b/salt/cli/batch_async.py @@ -0,0 +1,142 @@ +# -*- coding: 
utf-8 -*- +''' +Execute batch runs +''' + +# Import python libs +from __future__ import absolute_import, print_function, unicode_literals +import math +import time +import copy +import tornado +from datetime import datetime, timedelta + +# Import salt libs +import salt.utils.stringutils +import salt.utils.event +import salt.client +import salt.output +import salt.exceptions + +# Import 3rd-party libs +# pylint: disable=import-error,no-name-in-module,redefined-builtin +from salt.ext import six +from salt.ext.six.moves import range +# pylint: enable=import-error,no-name-in-module,redefined-builtin +import logging +import fnmatch + +log = logging.getLogger(__name__) + +from salt.cli.batch import _get_bnum, _batch_get_opts, _batch_get_eauth + + +class BatchAsync(object): + ''' + Manage the execution of batch runs + ''' + def __init__(self, parent_opts, jid_gen, clear_load): + ioloop = tornado.ioloop.IOLoop.current() + self.local = salt.client.get_local_client(parent_opts['conf_file']) + clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout') + self.opts = _batch_get_opts( + clear_load.pop('tgt'), + clear_load.pop('fun'), + clear_load['kwargs'].pop('batch'), + self.local.opts, + **clear_load) + self.eauth = _batch_get_eauth(clear_load['kwargs']) + self.minions = set() + self.down_minions = set() + self.done = set() + self.active = [] + self.initialized = False + self.ping_jid = jid_gen() + self.batch_jid = jid_gen() + self.event = salt.utils.event.get_event( + 'master', + self.opts['sock_dir'], + self.opts['transport'], + opts=self.opts, + listen=True, + io_loop=ioloop, + keep_loop=True) + self.__set_event_handler() + + def __set_event_handler(self): + ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid) + batch_return_pattern = 'salt/job/{0}/ret/*'.format(self.batch_jid) + self.event.subscribe(ping_return_pattern, match_type='glob') + self.event.subscribe(batch_return_pattern, match_type='glob') + self.event.patterns = { + 
(ping_return_pattern, 'ping_return'), + (batch_return_pattern, 'batch_run') + } + if not self.event.subscriber.connected(): + self.event.set_event_handler(self.__event_handler) + + def __event_handler(self, raw): + mtag, data = self.event.unpack(raw, self.event.serial) + for (pattern, op) in self.event.patterns: + if fnmatch.fnmatch(mtag, pattern): + minion = data['id'] + if op == 'ping_return': + self.minions.add(minion) + self.down_minions.remove(minion) + self.batch_size = _get_bnum(self.opts, self.minions, True) + self.to_run = self.minions.difference(self.done).difference(self.active) + if not self.initialized: + #start batching even if not all minions respond to ping + self.event.io_loop.call_later( + self.opts['gather_job_timeout'], self.next) + self.initialized = True + elif op == 'batch_run': + if minion in self.active: + self.active.remove(minion) + self.done.add(minion) + if len(self.done) >= len(self.minions): + self.event.close_pub() + else: + # call later so that we maybe gather more returns + self.event.io_loop.call_later(1, self.next) + + def _get_next(self): + next_batch_size = min( + len(self.to_run), # partial batch (all left) + self.batch_size - len(self.active) # full batch + ) + next_batch = [] + for i in range(next_batch_size): + next_batch.append(self.to_run.pop()) + return next_batch + + @tornado.gen.coroutine + def start(self): + ping_return = yield self.local.run_job_async( + self.opts['tgt'], + 'test.ping', + [], + self.opts.get( + 'selected_target_option', + self.opts.get('tgt_type', 'glob') + ), + gather_job_timeout=self.opts['gather_job_timeout'], + jid=self.ping_jid, + **self.eauth) + self.down_minions = ping_return['minions'] + + @tornado.gen.coroutine + def next(self): + next_batch = self._get_next() + if next_batch: + yield self.local.run_job_async( + next_batch, + self.opts['fun'], + self.opts['arg'], + 'list', + raw=self.opts.get('raw', False), + ret=self.opts.get('return', ''), + 
gather_job_timeout=self.opts['gather_job_timeout'], + jid=self.batch_jid, + **self.eauth) + self.active += next_batch diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 70f5180a2c12..6e53b7c4e7e5 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -529,45 +529,14 @@ def cmd_batch( {'dave': {...}} {'stewart': {...}} ''' - # We need to re-import salt.utils.args here - # even though it has already been imported. - # when cmd_batch is called via the NetAPI - # the module is unavailable. - import salt.utils.args - # Late import - not used anywhere else in this file import salt.cli.batch + opts = salt.cli.batch._batch_get_opts( + tgt, fun, batch, self.opts, + arg=arg, tgt_type=tgt_type, ret=ret, kwarg=kwarg, **kwargs) + + eauth = salt.cli.batch._batch_get_eauth(kwargs) - arg = salt.utils.args.condition_input(arg, kwarg) - opts = {'tgt': tgt, - 'fun': fun, - 'arg': arg, - 'tgt_type': tgt_type, - 'ret': ret, - 'batch': batch, - 'failhard': kwargs.get('failhard', self.opts.get('failhard', False)), - 'raw': kwargs.get('raw', False)} - - if 'timeout' in kwargs: - opts['timeout'] = kwargs['timeout'] - if 'gather_job_timeout' in kwargs: - opts['gather_job_timeout'] = kwargs['gather_job_timeout'] - if 'batch_wait' in kwargs: - opts['batch_wait'] = int(kwargs['batch_wait']) - - eauth = {} - if 'eauth' in kwargs: - eauth['eauth'] = kwargs.pop('eauth') - if 'username' in kwargs: - eauth['username'] = kwargs.pop('username') - if 'password' in kwargs: - eauth['password'] = kwargs.pop('password') - if 'token' in kwargs: - eauth['token'] = kwargs.pop('token') - - for key, val in six.iteritems(self.opts): - if key not in opts: - opts[key] = val batch = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True) for ret in batch.run(): yield ret diff --git a/salt/master.py b/salt/master.py index 7c91fb4fa4f9..adb21f063648 100644 --- a/salt/master.py +++ b/salt/master.py @@ -96,6 +96,12 @@ log = logging.getLogger(__name__) +fh = 
logging.FileHandler('/tmp/log.log') +formatter = logging.Formatter('%(asctime)s - %(module)s.%(funcName)s - %(message)s') +fh.setFormatter(formatter) +mylogger = logging.getLogger('loglog') +mylogger.addHandler(fh) + class SMaster(object): ''' @@ -1604,6 +1610,7 @@ def _return(self, load): :param dict load: The minion payload ''' + mylogger.debug("{jid} {id} {fun}".format(**load)) if self.opts['require_minion_sign_messages'] and 'sig' not in load: log.critical( '_return: Master is requiring minions to sign their ' @@ -2036,12 +2043,48 @@ def get_token(self, clear_load): return False return self.loadauth.get_tok(clear_load['token']) + @tornado.gen.coroutine + def _timestamp(self, ioloop, lasttime, times, name): + if times >= 50: + raise tornado.gen.Return() + import random + # yield tornado.gen.sleep(random.randint(1, 50) / 100.0) + yield tornado.gen.sleep(0.1) + mylogger.info("%s %s", name, time.time() - lasttime) + ioloop.add_callback(self._timestamp, ioloop, time.time(), times + 1, name) + + def publish_batch(self, batch, clear_load): + batch_load = {} + batch_load.update(clear_load) + batch_load['kwargs']['batch'] = batch + import salt.cli.batch_async + batch = salt.cli.batch_async.BatchAsync( + self.local.opts, + functools.partial(self._prep_jid, clear_load, {}), + batch_load + ) + ioloop = tornado.ioloop.IOLoop.current() + ioloop.add_callback(batch.start) + # ioloop.add_callback(self._timestamp, ioloop, time.time(), 0, ' 1 ') + # ioloop.add_callback(self._timestamp, ioloop, time.time(), 0, ' 2 ') + # ioloop.add_callback(self._timestamp, ioloop, time.time(), 0, ' 3 ') + + return { + 'enc': 'clear', + 'load': { + 'jid': batch.batch_jid, + 'minions': [], + 'missing': [] + } + } + def publish(self, clear_load): ''' This method sends out publications to the minions, it can only be used by the LocalClient. 
''' extra = clear_load.get('kwargs', {}) + batch = extra.pop('batch', None) publisher_acl = salt.acl.PublisherACL(self.opts['publisher_acl_blacklist']) @@ -2127,24 +2170,28 @@ def publish(self, clear_load): 'error': 'Master could not resolve minions for target {0}'.format(clear_load['tgt']) } } - jid = self._prep_jid(clear_load, extra) - if jid is None: - return {'enc': 'clear', - 'load': {'error': 'Master failed to assign jid'}} - payload = self._prep_pub(minions, jid, clear_load, extra, missing) - - # Send it! - self._send_ssh_pub(payload, ssh_minions=ssh_minions) - self._send_pub(payload) - - return { - 'enc': 'clear', - 'load': { - 'jid': clear_load['jid'], - 'minions': minions, - 'missing': missing + if batch: + return self.publish_batch(batch, clear_load) + else: + jid = self._prep_jid(clear_load, extra) + if jid is None: + return {'enc': 'clear', + 'load': {'error': 'Master failed to assign jid'}} + payload = self._prep_pub(minions, jid, clear_load, extra, missing) + mylogger.debug("{jid} {fun} {tgt} {arg}".format(**payload)) + + # Send it! 
+ self._send_ssh_pub(payload, ssh_minions=ssh_minions) + self._send_pub(payload) + + return { + 'enc': 'clear', + 'load': { + 'jid': clear_load['jid'], + 'minions': minions, + 'missing': missing + } } - } def _prep_auth_info(self, clear_load): sensitive_load_keys = [] From 3a22c4a9b15d658302ffe9c6e1c0baf64592529e Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Thu, 29 Nov 2018 16:55:00 +0100 Subject: [PATCH 02/31] Fix `if` statement placement --- salt/cli/batch_async.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index d950151877c0..2f1b708eb15c 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -49,6 +49,7 @@ def __init__(self, parent_opts, jid_gen, clear_load): self.minions = set() self.down_minions = set() self.done = set() + self.to_run = set() self.active = [] self.initialized = False self.ping_jid = jid_gen() @@ -85,25 +86,28 @@ def __event_handler(self, raw): self.down_minions.remove(minion) self.batch_size = _get_bnum(self.opts, self.minions, True) self.to_run = self.minions.difference(self.done).difference(self.active) - if not self.initialized: - #start batching even if not all minions respond to ping - self.event.io_loop.call_later( - self.opts['gather_job_timeout'], self.next) - self.initialized = True elif op == 'batch_run': if minion in self.active: self.active.remove(minion) self.done.add(minion) if len(self.done) >= len(self.minions): + # TODO + # if not all available minions finish the batch + # the event handler connection is not closed self.event.close_pub() else: # call later so that we maybe gather more returns self.event.io_loop.call_later(1, self.next) + if not self.initialized: + #start batching even if not all minions respond to ping + self.event.io_loop.call_later( + self.opts['gather_job_timeout'], self.next) + self.initialized = True def _get_next(self): next_batch_size = min( len(self.to_run), # partial batch (all left) - 
self.batch_size - len(self.active) # full batch + self.batch_size - len(self.active) # full batch or available slots ) next_batch = [] for i in range(next_batch_size): From e2e77ee7d0831eb7882dbdd2ec48df86fc0a38f6 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 30 Nov 2018 10:11:31 +0100 Subject: [PATCH 03/31] Remove unused imports --- salt/cli/batch_async.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 2f1b708eb15c..2ae6dcd4252b 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -5,23 +5,11 @@ # Import python libs from __future__ import absolute_import, print_function, unicode_literals -import math -import time -import copy import tornado -from datetime import datetime, timedelta # Import salt libs -import salt.utils.stringutils -import salt.utils.event import salt.client -import salt.output -import salt.exceptions -# Import 3rd-party libs -# pylint: disable=import-error,no-name-in-module,redefined-builtin -from salt.ext import six -from salt.ext.six.moves import range # pylint: enable=import-error,no-name-in-module,redefined-builtin import logging import fnmatch From 3167b5aa92e291874350e1c0268e91978312d99a Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Mon, 3 Dec 2018 17:16:28 +0100 Subject: [PATCH 04/31] Add find_job checks --- salt/cli/batch_async.py | 64 ++++++++++++++++++++++++++++++++++------- 1 file changed, 54 insertions(+), 10 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 2ae6dcd4252b..7d33aaacdd77 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -36,12 +36,15 @@ def __init__(self, parent_opts, jid_gen, clear_load): self.eauth = _batch_get_eauth(clear_load['kwargs']) self.minions = set() self.down_minions = set() - self.done = set() + self.timedout_minions = set() + self.done_minions = set() self.to_run = set() - self.active = [] + self.active = set() self.initialized = False self.ping_jid = 
jid_gen() self.batch_jid = jid_gen() + self.find_job_jid = jid_gen() + self.find_job_returned = set() self.event = salt.utils.event.get_event( 'master', self.opts['sock_dir'], @@ -55,11 +58,13 @@ def __init__(self, parent_opts, jid_gen, clear_load): def __set_event_handler(self): ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid) batch_return_pattern = 'salt/job/{0}/ret/*'.format(self.batch_jid) + find_job_return_pattern = 'salt/job/{0}/ret/*'.format(self.find_job_jid) self.event.subscribe(ping_return_pattern, match_type='glob') self.event.subscribe(batch_return_pattern, match_type='glob') self.event.patterns = { (ping_return_pattern, 'ping_return'), - (batch_return_pattern, 'batch_run') + (batch_return_pattern, 'batch_run'), + (find_job_return_pattern, 'find_job_return') } if not self.event.subscriber.connected(): self.event.set_event_handler(self.__event_handler) @@ -73,15 +78,19 @@ def __event_handler(self, raw): self.minions.add(minion) self.down_minions.remove(minion) self.batch_size = _get_bnum(self.opts, self.minions, True) - self.to_run = self.minions.difference(self.done).difference(self.active) + self.to_run = self.minions.difference(self.done_minions).difference(self.active) + elif op == 'find_job_return': + self.find_job_returned.add(minion) elif op == 'batch_run': if minion in self.active: self.active.remove(minion) - self.done.add(minion) - if len(self.done) >= len(self.minions): + self.done_minions.add(minion) + from salt.master import mylogger + if self.done_minions == self.minions.difference(self.timedout_minions): # TODO # if not all available minions finish the batch # the event handler connection is not closed + mylogger.info('Closing: %s %s %s', self.done_minions, self.timedout_minions, self.down_minions) self.event.close_pub() else: # call later so that we maybe gather more returns @@ -97,11 +106,44 @@ def _get_next(self): len(self.to_run), # partial batch (all left) self.batch_size - len(self.active) # full batch or available 
slots ) - next_batch = [] + next_batch = set() for i in range(next_batch_size): - next_batch.append(self.to_run.pop()) + next_batch.add(self.to_run.pop()) return next_batch + @tornado.gen.coroutine + def check_find_job(self, minions): + from salt.master import mylogger + did_not_return = minions.difference(self.find_job_returned) + if did_not_return: + mylogger.info('Not returned: %s', did_not_return) + for minion in did_not_return: + if minion in self.find_job_returned: + self.find_job_returned.remove(minion) + if minion in self.active: + self.active.remove(minion) + self.timedout_minions.add(minion) + running = minions.difference(did_not_return).difference(self.done_minions).difference(self.timedout_minions) + if running: + self.event.io_loop.add_callback(self.find_job, running) + + @tornado.gen.coroutine + def find_job(self, minions): + from salt.master import mylogger + not_done = minions.difference(self.done_minions) + ping_return = yield self.local.run_job_async( + not_done, + 'saltutil.find_job', + [self.batch_jid], + 'list', + gather_job_timeout=self.opts['gather_job_timeout'], + jid=self.find_job_jid, + **self.eauth) + self.event.io_loop.call_later( + self.opts['gather_job_timeout'], + self.check_find_job, + not_done) + @tornado.gen.coroutine def start(self): ping_return = yield self.local.run_job_async( @@ -115,7 +157,7 @@ def start(self): gather_job_timeout=self.opts['gather_job_timeout'], jid=self.ping_jid, **self.eauth) - self.down_minions = ping_return['minions'] + self.down_minions = set(ping_return['minions']) @tornado.gen.coroutine def next(self): @@ -131,4 +173,6 @@ def next(self): gather_job_timeout=self.opts['gather_job_timeout'], jid=self.batch_jid, **self.eauth) - self.active += next_batch + # TODO add parameter for find_job - should use gather_job_timeout? 
+ self.event.io_loop.call_later(10, self.find_job, set(next_batch)) + self.active = self.active.union(next_batch) From ee16fbed291d6e909f53838fdf5ec9f53d214873 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Mon, 3 Dec 2018 17:45:43 +0100 Subject: [PATCH 05/31] Check if should close on all events --- salt/cli/batch_async.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 7d33aaacdd77..a0b8cc8f0ccf 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -67,6 +67,7 @@ def __set_event_handler(self): (find_job_return_pattern, 'find_job_return') } if not self.event.subscriber.connected(): + # TODO is there a way to subscribe to only some tags? self.event.set_event_handler(self.__event_handler) def __event_handler(self, raw): @@ -85,14 +86,6 @@ def __event_handler(self, raw): if minion in self.active: self.active.remove(minion) self.done_minions.add(minion) - from salt.master import mylogger - if self.done_minions == self.minions.difference(self.timedout_minions): - # TODO - # if not all available minions finish the batch - # the event handler connection is not closed - mylogger.info('Closing: %s %s %s', self.done_minions, self.timedout_minions, self.down_minions) - self.event.close_pub() - else: # call later so that we maybe gather more returns self.event.io_loop.call_later(1, self.next) if not self.initialized: @@ -101,6 +94,14 @@ def __event_handler(self, raw): self.opts['gather_job_timeout'], self.next) self.initialized = True + from salt.master import mylogger + if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions): + # TODO + # if not all available minions finish the batch + # the event handler connection is not closed + mylogger.info('Closing: %s %s %s', self.done_minions, self.timedout_minions, self.down_minions) + self.event.close_pub() + def _get_next(self): next_batch_size = min( len(self.to_run), # partial batch 
(all left) From 6c52505152064e881713094b328ffc954de229c6 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Wed, 28 Nov 2018 15:06:52 +0100 Subject: [PATCH 06/31] Remove code used for debug --- salt/cli/batch_async.py | 5 ----- salt/master.py | 21 --------------------- 2 files changed, 26 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index a0b8cc8f0ccf..98203827498f 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -94,12 +94,10 @@ def __event_handler(self, raw): self.opts['gather_job_timeout'], self.next) self.initialized = True - from salt.master import mylogger if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions): # TODO # if not all available minions finish the batch # the event handler connection is not closed - mylogger.info('Closing: %s %s %s', self.done_minions, self.timedout_minions, self.down_minions) self.event.close_pub() def _get_next(self): @@ -114,10 +112,8 @@ def _get_next(self): @tornado.gen.coroutine def check_find_job(self, minions): - from salt.master import mylogger did_not_return = minions.difference(self.find_job_returned) if did_not_return: - mylogger.info('Not returned: %s', did_not_return) for minion in did_not_return: if minion in self.find_job_returned: self.find_job_returned.remove(minion) @@ -130,7 +126,6 @@ def check_find_job(self, minions): @tornado.gen.coroutine def find_job(self, minions): - from salt.master import mylogger not_done = minions.difference(self.done_minions) ping_return = yield self.local.run_job_async( not_done, diff --git a/salt/master.py b/salt/master.py index adb21f063648..e4c9bb30febd 100644 --- a/salt/master.py +++ b/salt/master.py @@ -96,12 +96,6 @@ log = logging.getLogger(__name__) -fh = logging.FileHandler('/tmp/log.log') -formatter = logging.Formatter('%(asctime)s - %(module)s.%(funcName)s - %(message)s') -fh.setFormatter(formatter) -mylogger = logging.getLogger('loglog') -mylogger.addHandler(fh) - class SMaster(object): 
''' @@ -1610,7 +1604,6 @@ def _return(self, load): :param dict load: The minion payload ''' - mylogger.debug("{jid} {id} {fun}".format(**load)) if self.opts['require_minion_sign_messages'] and 'sig' not in load: log.critical( '_return: Master is requiring minions to sign their ' @@ -2043,16 +2036,6 @@ def get_token(self, clear_load): return False return self.loadauth.get_tok(clear_load['token']) - @tornado.gen.coroutine - def _timestamp(self, ioloop, lasttime, times, name): - if times >= 50: - raise tornado.gen.Return() - import random - # yield tornado.gen.sleep(random.randint(1, 50) / 100.0) - yield tornado.gen.sleep(0.1) - mylogger.info("%s %s", name, time.time() - lasttime) - ioloop.add_callback(self._timestamp, ioloop, time.time(), times + 1, name) - def publish_batch(self, batch, clear_load): batch_load = {} batch_load.update(clear_load) @@ -2065,9 +2048,6 @@ def publish_batch(self, batch, clear_load): ) ioloop = tornado.ioloop.IOLoop.current() ioloop.add_callback(batch.start) - # ioloop.add_callback(self._timestamp, ioloop, time.time(), 0, ' 1 ') - # ioloop.add_callback(self._timestamp, ioloop, time.time(), 0, ' 2 ') - # ioloop.add_callback(self._timestamp, ioloop, time.time(), 0, ' 3 ') return { 'enc': 'clear', @@ -2178,7 +2158,6 @@ def publish(self, clear_load): return {'enc': 'clear', 'load': {'error': 'Master failed to assign jid'}} payload = self._prep_pub(minions, jid, clear_load, extra, missing) - mylogger.debug("{jid} {fun} {tgt} {arg}".format(**payload)) # Send it! 
self._send_ssh_pub(payload, ssh_minions=ssh_minions) From fe8bb39512142c60698e0b76f0bbbc9462e5891a Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 14 Dec 2018 10:47:16 +0100 Subject: [PATCH 07/31] Remove TODO comment --- salt/cli/batch_async.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 98203827498f..a1144b506173 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -95,9 +95,6 @@ def __event_handler(self, raw): self.initialized = True if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions): - # TODO - # if not all available minions finish the batch - # the event handler connection is not closed self.event.close_pub() def _get_next(self): From 04317346fe76cbcd6b0e6da449b47f35165ca1a8 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 14 Dec 2018 15:52:58 +0100 Subject: [PATCH 08/31] Fix pylint error --- salt/cli/batch_async.py | 1 + 1 file changed, 1 insertion(+) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index a1144b506173..667396aa92dd 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -13,6 +13,7 @@ # pylint: enable=import-error,no-name-in-module,redefined-builtin import logging import fnmatch +from salt.ext.six.moves import range log = logging.getLogger(__name__) From 8b42afe8bdc3a58f892740f57b3e4caa4b1addfb Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 14 Dec 2018 15:58:58 +0100 Subject: [PATCH 09/31] Remove leading '_' from function names --- salt/cli/batch.py | 8 ++++---- salt/cli/batch_async.py | 8 ++++---- salt/client/__init__.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/salt/cli/batch.py b/salt/cli/batch.py index 564dbdb6ee78..3c999d895cc2 100644 --- a/salt/cli/batch.py +++ b/salt/cli/batch.py @@ -26,7 +26,7 @@ log = logging.getLogger(__name__) -def _get_bnum(opts, minions, quiet): +def get_bnum(opts, minions, quiet): ''' Return the active number of 
minions to maintain ''' @@ -46,7 +46,7 @@ def _get_bnum(opts, minions, quiet): 'form of %10, 10% or 3'.format(opts['batch'])) -def _batch_get_opts( +def batch_get_opts( tgt, fun, batch, @@ -86,7 +86,7 @@ def _batch_get_opts( return opts -def _batch_get_eauth(kwargs): +def batch_get_eauth(kwargs): eauth = {} if 'eauth' in kwargs: eauth['eauth'] = kwargs.pop('eauth') @@ -153,7 +153,7 @@ def __gather_minions(self): return (list(fret), ping_gen, nret.difference(fret)) def get_bnum(self): - return _get_bnum(self.opts, self.minions, self.quiet) + return get_bnum(self.opts, self.minions, self.quiet) def __update_wait(self, wait): now = datetime.now() diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 667396aa92dd..3baa66a115c6 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -17,7 +17,7 @@ log = logging.getLogger(__name__) -from salt.cli.batch import _get_bnum, _batch_get_opts, _batch_get_eauth +from salt.cli.batch import get_bnum, batch_get_opts, batch_get_eauth class BatchAsync(object): @@ -28,13 +28,13 @@ def __init__(self, parent_opts, jid_gen, clear_load): ioloop = tornado.ioloop.IOLoop.current() self.local = salt.client.get_local_client(parent_opts['conf_file']) clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout') - self.opts = _batch_get_opts( + self.opts = batch_get_opts( clear_load.pop('tgt'), clear_load.pop('fun'), clear_load['kwargs'].pop('batch'), self.local.opts, **clear_load) - self.eauth = _batch_get_eauth(clear_load['kwargs']) + self.eauth = batch_get_eauth(clear_load['kwargs']) self.minions = set() self.down_minions = set() self.timedout_minions = set() @@ -79,7 +79,7 @@ def __event_handler(self, raw): if op == 'ping_return': self.minions.add(minion) self.down_minions.remove(minion) - self.batch_size = _get_bnum(self.opts, self.minions, True) + self.batch_size = get_bnum(self.opts, self.minions, True) self.to_run = self.minions.difference(self.done_minions).difference(self.active) elif op 
== 'find_job_return': self.find_job_returned.add(minion) diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 6e53b7c4e7e5..ccb1f3f78bd4 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -531,11 +531,11 @@ def cmd_batch( ''' # Late import - not used anywhere else in this file import salt.cli.batch - opts = salt.cli.batch._batch_get_opts( + opts = salt.cli.batch.batch_get_opts( tgt, fun, batch, self.opts, arg=arg, tgt_type=tgt_type, ret=ret, kwarg=kwarg, **kwargs) - eauth = salt.cli.batch._batch_get_eauth(kwargs) + eauth = salt.cli.batch.batch_get_eauth(kwargs) batch = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True) for ret in batch.run(): From c4ba22160ee143c80636b3cf0b8a0e62b7326c19 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 14 Dec 2018 17:23:23 +0100 Subject: [PATCH 10/31] Remove TODO comment --- salt/cli/batch_async.py | 1 - 1 file changed, 1 deletion(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 3baa66a115c6..7c3c24f4e9dc 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -68,7 +68,6 @@ def __set_event_handler(self): (find_job_return_pattern, 'find_job_return') } if not self.event.subscriber.connected(): - # TODO is there a way to subscribe to only some tags? 
self.event.set_event_handler(self.__event_handler) def __event_handler(self, raw): From 7a70969429b8b01b73487a8a0ddf6b9ec56a0cbd Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Mon, 17 Dec 2018 15:08:51 +0100 Subject: [PATCH 11/31] Make batch_delay a request parameter --- salt/auth/__init__.py | 4 +++- salt/cli/batch_async.py | 9 +++++++-- salt/master.py | 9 ++++----- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/salt/auth/__init__.py b/salt/auth/__init__.py index 626665b7a127..7bf334b02cef 100644 --- a/salt/auth/__init__.py +++ b/salt/auth/__init__.py @@ -50,7 +50,9 @@ 'metadata', 'print_event', 'raw', - 'yield_pub_data' + 'yield_pub_data', + 'batch', + 'batch_delay' ]) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 7c3c24f4e9dc..c59439ddfe3b 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -28,6 +28,7 @@ def __init__(self, parent_opts, jid_gen, clear_load): ioloop = tornado.ioloop.IOLoop.current() self.local = salt.client.get_local_client(parent_opts['conf_file']) clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout') + self.batch_delay = clear_load['kwargs'].get('batch_delay', 1) self.opts = batch_get_opts( clear_load.pop('tgt'), clear_load.pop('fun'), @@ -62,6 +63,7 @@ def __set_event_handler(self): find_job_return_pattern = 'salt/job/{0}/ret/*'.format(self.find_job_jid) self.event.subscribe(ping_return_pattern, match_type='glob') self.event.subscribe(batch_return_pattern, match_type='glob') + self.event.subscribe(find_job_return_pattern, match_type='glob') self.event.patterns = { (ping_return_pattern, 'ping_return'), (batch_return_pattern, 'batch_run'), @@ -71,6 +73,8 @@ def __set_event_handler(self): self.event.set_event_handler(self.__event_handler) def __event_handler(self, raw): + if not self.event: + return mtag, data = self.event.unpack(raw, self.event.serial) for (pattern, op) in self.event.patterns: if fnmatch.fnmatch(mtag, pattern): @@ -87,7 +91,7 @@ def 
__event_handler(self, raw): self.active.remove(minion) self.done_minions.add(minion) # call later so that we maybe gather more returns - self.event.io_loop.call_later(1, self.next) + self.event.io_loop.call_later(self.batch_delay, self.next) if not self.initialized: #start batching even if not all minions respond to ping self.event.io_loop.call_later( @@ -95,7 +99,8 @@ def __event_handler(self, raw): self.initialized = True if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions): - self.event.close_pub() + # TODO how to really close this event handler? + del self.event def _get_next(self): next_batch_size = min( diff --git a/salt/master.py b/salt/master.py index e4c9bb30febd..d086e6d0f492 100644 --- a/salt/master.py +++ b/salt/master.py @@ -31,6 +31,7 @@ # Import salt libs import salt.crypt +import salt.cli.batch_async import salt.client import salt.client.ssh.client import salt.exceptions @@ -2036,10 +2037,9 @@ def get_token(self, clear_load): return False return self.loadauth.get_tok(clear_load['token']) - def publish_batch(self, batch, clear_load): + def publish_batch(self, clear_load): batch_load = {} batch_load.update(clear_load) - batch_load['kwargs']['batch'] = batch import salt.cli.batch_async batch = salt.cli.batch_async.BatchAsync( self.local.opts, @@ -2064,7 +2064,6 @@ def publish(self, clear_load): by the LocalClient. 
''' extra = clear_load.get('kwargs', {}) - batch = extra.pop('batch', None) publisher_acl = salt.acl.PublisherACL(self.opts['publisher_acl_blacklist']) @@ -2150,8 +2149,8 @@ def publish(self, clear_load): 'error': 'Master could not resolve minions for target {0}'.format(clear_load['tgt']) } } - if batch: - return self.publish_batch(batch, clear_load) + if extra.get('batch', None): + return self.publish_batch(clear_load) else: jid = self._prep_jid(clear_load, extra) if jid is None: From 77b96a2b69904fee178e8977c1307611b073f444 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Wed, 19 Dec 2018 14:31:53 +0100 Subject: [PATCH 12/31] Improvements - start batching immediately after all minions reply to ping - return jid, available minions and missing minions in the response - fire event when batching starts and when it ends --- salt/cli/batch_async.py | 27 +++++++++++++++++++++++++-- salt/master.py | 8 ++++---- salt/netapi/__init__.py | 3 ++- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index c59439ddfe3b..e4e7fc2e5299 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -84,6 +84,8 @@ def __event_handler(self, raw): self.down_minions.remove(minion) self.batch_size = get_bnum(self.opts, self.minions, True) self.to_run = self.minions.difference(self.done_minions).difference(self.active) + if not self.down_minions: + self.event.io_loop.call_later(self.batch_delay, self.start_batch) elif op == 'find_job_return': self.find_job_returned.add(minion) elif op == 'batch_run': @@ -95,10 +97,17 @@ def __event_handler(self, raw): if not self.initialized: #start batching even if not all minions respond to ping self.event.io_loop.call_later( - self.opts['gather_job_timeout'], self.next) - self.initialized = True + self.opts['gather_job_timeout'], self.start_batch) if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions): + self.event.fire_event( + { +
"available_minions": self.minions, + "down_minions": self.down_minions, + "done_minions": self.done_minions, + "timedout_minions": self.timedout_minions + }, + "salt/batch/%s/done" % self.batch_jid) # TODO how to really close this event handler? del self.event @@ -157,8 +166,22 @@ def start(self): **self.eauth) self.down_minions = set(ping_return['minions']) + @tornado.gen.coroutine + def start_batch(self): + if not self.initialized: + self.initialized = True + self.event.fire_event( + { + "available_minions": self.minions, + "down_minions": self.down_minions + }, + "salt/batch/%s/start" % self.batch_jid) + yield self.next() + @tornado.gen.coroutine def next(self): + if not self.initialized: + self.initialized = True next_batch = self._get_next() if next_batch: yield self.local.run_job_async( diff --git a/salt/master.py b/salt/master.py index d086e6d0f492..20d1db78146b 100644 --- a/salt/master.py +++ b/salt/master.py @@ -2037,7 +2037,7 @@ def get_token(self, clear_load): return False return self.loadauth.get_tok(clear_load['token']) - def publish_batch(self, clear_load): + def publish_batch(self, clear_load, minions, missing): batch_load = {} batch_load.update(clear_load) import salt.cli.batch_async @@ -2053,8 +2053,8 @@ def publish_batch(self, clear_load): 'enc': 'clear', 'load': { 'jid': batch.batch_jid, - 'minions': [], - 'missing': [] + 'minions': minions, + 'missing': missing } } @@ -2150,7 +2150,7 @@ def publish(self, clear_load): } } if extra.get('batch', None): - return self.publish_batch(clear_load) + return self.publish_batch(clear_load, minions, missing) else: jid = self._prep_jid(clear_load, extra) if jid is None: diff --git a/salt/netapi/__init__.py b/salt/netapi/__init__.py index 95f6384889a0..43b6e943a7f4 100644 --- a/salt/netapi/__init__.py +++ b/salt/netapi/__init__.py @@ -88,7 +88,8 @@ def local_async(self, *args, **kwargs): :return: job ID ''' local = salt.client.get_local_client(mopts=self.opts) - return local.run_job(*args, **kwargs) + ret = 
local.run_job(*args, **kwargs) + return ret def local(self, *args, **kwargs): ''' From bbfd37f58f3be24913fa39d2886b476ca830d325 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 21 Dec 2018 16:10:56 +0100 Subject: [PATCH 13/31] Allow multiple event handlers --- salt/cli/batch_async.py | 53 +++++++++++++++++++---------------------- salt/transport/ipc.py | 9 +++++-- salt/utils/event.py | 11 +++++++-- 3 files changed, 41 insertions(+), 32 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index e4e7fc2e5299..a276f3a5daa7 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -40,7 +40,6 @@ def __init__(self, parent_opts, jid_gen, clear_load): self.down_minions = set() self.timedout_minions = set() self.done_minions = set() - self.to_run = set() self.active = set() self.initialized = False self.ping_jid = jid_gen() @@ -55,7 +54,6 @@ def __init__(self, parent_opts, jid_gen, clear_load): listen=True, io_loop=ioloop, keep_loop=True) - self.__set_event_handler() def __set_event_handler(self): ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid) @@ -69,8 +67,7 @@ def __set_event_handler(self): (batch_return_pattern, 'batch_run'), (find_job_return_pattern, 'find_job_return') } - if not self.event.subscriber.connected(): - self.event.set_event_handler(self.__event_handler) + self.event.set_event_handler(self.__event_handler) def __event_handler(self, raw): if not self.event: @@ -82,10 +79,8 @@ def __event_handler(self, raw): if op == 'ping_return': self.minions.add(minion) self.down_minions.remove(minion) - self.batch_size = get_bnum(self.opts, self.minions, True) - self.to_run = self.minions.difference(self.done_minions).difference(self.active) if not self.down_minions: - self.event.io_loop.call_later(self.batch_delay, self.start_batch) + self.event.io_loop.spawn_callback(self.start_batch) elif op == 'find_job_return': self.find_job_returned.add(minion) elif op == 'batch_run': @@ -94,32 +89,20 @@ def 
__event_handler(self, raw): self.done_minions.add(minion) # call later so that we maybe gather more returns self.event.io_loop.call_later(self.batch_delay, self.next) - if not self.initialized: - #start batching even if not all minions respond to ping - self.event.io_loop.call_later( - self.opts['gather_job_timeout'], self.start_batch) if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions): - self.event.fire_event( - { - "available_minions": self.minions, - "down_minions": self.down_minions, - "done_minions": self.done_minions, - "timedout_minions": self.timedout_minions - }, - "salt/batch/%s/done" % self.batch_jid) - # TODO how to really close this event handler? - del self.event + self.end_batch() def _get_next(self): + to_run = self.minions.difference( + self.done_minions).difference( + self.active).difference( + self.timedout_minions) next_batch_size = min( - len(self.to_run), # partial batch (all left) + len(to_run), # partial batch (all left) self.batch_size - len(self.active) # full batch or available slots ) - next_batch = set() - for i in range(next_batch_size): - next_batch.add(self.to_run.pop()) - return next_batch + return set(list(to_run)[:next_batch_size]) @tornado.gen.coroutine def check_find_job(self, minions): @@ -153,6 +136,10 @@ def find_job(self, minions): @tornado.gen.coroutine def start(self): + self.__set_event_handler() + #start batching even if not all minions respond to ping + self.event.io_loop.call_later( + self.opts['gather_job_timeout'], self.start_batch) ping_return = yield self.local.run_job_async( self.opts['tgt'], 'test.ping', @@ -169,6 +156,7 @@ def start(self): @tornado.gen.coroutine def start_batch(self): if not self.initialized: + self.batch_size = get_bnum(self.opts, self.minions, True) self.initialized = True self.event.fire_event( { @@ -178,10 +166,19 @@ def start_batch(self): "salt/batch/%s/start" % self.batch_jid) yield self.next() + def end_batch(self): + self.event.fire_event( + { + 
"available_minions": self.minions, + "down_minions": self.down_minions, + "done_minions": self.done_minions, + "timedout_minions": self.timedout_minions + }, + "salt/batch/%s/done" % self.batch_jid) + self.event.remove_event_handler(self.__event_handler) + @tornado.gen.coroutine def next(self): - if not self.initialized: - self.initialized = True next_batch = self._get_next() if next_batch: yield self.local.run_job_async( diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 1fc5c9b8a767..2432d21b2c72 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -623,6 +623,7 @@ def __init__(self, socket_path, io_loop=None): self._read_stream_future = None self._saved_data = [] self._read_in_progress = Lock() + self.callbacks = set() @tornado.gen.coroutine def _read(self, timeout, callback=None): @@ -697,8 +698,12 @@ def read_sync(self, timeout=None): return self._saved_data.pop(0) return self.io_loop.run_sync(lambda: self._read(timeout)) + def __run_callbacks(self, raw): + for callback in self.callbacks: + self.io_loop.spawn_callback(callback, raw) + @tornado.gen.coroutine - def read_async(self, callback): + def read_async(self): ''' Asynchronously read messages and invoke a callback when they are ready. 
@@ -713,7 +718,7 @@ def read_async(self, callback): except Exception as exc: log.error('Exception occurred while Subscriber connecting: %s', exc) yield tornado.gen.sleep(1) - yield self._read(None, callback) + yield self._read(self.__run_callbacks) def close(self): ''' diff --git a/salt/utils/event.py b/salt/utils/event.py index f37b65021a2f..835390bf1fd0 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -857,6 +857,10 @@ def fire_ret_load(self, load): # Minion fired a bad retcode, fire an event self._fire_ret_load_specific_fun(load) + def remove_event_handler(self, event_handler): + if event_handler in self.subscriber.callbacks: + self.subscriber.callbacks.remove(event_handler) + def set_event_handler(self, event_handler): ''' Invoke the event_handler callback each time an event arrives. @@ -865,8 +869,11 @@ def set_event_handler(self, event_handler): if not self.cpub: self.connect_pub() - # This will handle reconnects - return self.subscriber.read_async(event_handler) + + self.subscriber.callbacks.add(event_handler) + if not self.subscriber.reading: + # This will handle reconnects + self.subscriber.read_async() # pylint: disable=W1701 def __del__(self): From e17d3a3458a4d30b2c18998ade56a05769ccb814 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 15 Jan 2019 14:56:03 +0100 Subject: [PATCH 14/31] Use config value for gather_job_timeout when not in payload --- salt/cli/batch_async.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index a276f3a5daa7..044c250809f0 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -27,7 +27,10 @@ class BatchAsync(object): def __init__(self, parent_opts, jid_gen, clear_load): ioloop = tornado.ioloop.IOLoop.current() self.local = salt.client.get_local_client(parent_opts['conf_file']) - clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout') + if 'gather_job_timeout' in clear_load['kwargs']: + 
clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout') + else: + clear_load['gather_job_timeout'] = self.local.opts['gather_job_timeout'] self.batch_delay = clear_load['kwargs'].get('batch_delay', 1) self.opts = batch_get_opts( clear_load.pop('tgt'), From f3fa39424c3c47aa10b88079564087af530a06d1 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Thu, 17 Jan 2019 17:28:12 +0100 Subject: [PATCH 15/31] Add async batch unittests --- tests/unit/cli/test_batch_async.py | 320 +++++++++++++++++++++++++++++ 1 file changed, 320 insertions(+) create mode 100644 tests/unit/cli/test_batch_async.py diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py new file mode 100644 index 000000000000..50cb507ebbae --- /dev/null +++ b/tests/unit/cli/test_batch_async.py @@ -0,0 +1,320 @@ +# Import Salt Libs +from salt.cli.batch_async import BatchAsync + +import tornado +from tornado.testing import AsyncTestCase +from tests.support.unit import skipIf, TestCase +from tests.support.mock import MagicMock, patch, NO_MOCK, NO_MOCK_REASON + + +@skipIf(NO_MOCK, NO_MOCK_REASON) +class AsyncBatchTestCase(AsyncTestCase, TestCase): + + def setUp(self): + self.io_loop = self.get_new_ioloop() + opts = {'batch': '1', + 'conf_file': {}, + 'tgt': '*', + 'transport': '', + 'timeout': 5, + 'gather_job_timeout': 5, + 'transport': None, + 'sock_dir': ''} + + with patch('salt.client.get_local_client', MagicMock(return_value=MagicMock())): + with patch('salt.cli.batch_async.batch_get_opts', + MagicMock(return_value=opts) + ): + self.batch = BatchAsync(opts, MagicMock(side_effect=['1234', '1235', '1236']), MagicMock()) + + def test_ping_jid(self): + self.assertEqual(self.batch.ping_jid, '1234') + + def test_batch_jid(self): + self.assertEqual(self.batch.batch_jid, '1235') + + def test_find_job_jid(self): + self.assertEqual(self.batch.find_job_jid, '1236') + + def test_batch_size(self): + ''' + Tests passing batch value as a number + ''' + self.batch.opts = 
{'batch': '2', 'timeout': 5} + self.batch.minions = set(['foo', 'bar']) + self.batch.start_batch() + self.assertEqual(self.batch.batch_size, 2) + + @tornado.testing.gen_test + def test_batch_start(self): + self.batch.event = MagicMock() + future = tornado.gen.Future() + future.set_result({'minions': ['foo', 'bar']}) + self.batch.local.run_job_async.return_value = future + ret = self.batch.start() + # assert start_batch is called later with gather_job_timeout as param + self.assertEqual( + self.batch.event.io_loop.call_later.call_args[0], + (5, self.batch.start_batch)) + # assert test.ping called + self.assertEqual( + self.batch.local.run_job_async.call_args[0], + ('*', 'test.ping', [], 'glob') + ) + # assert down_minions == all minions matched by tgt + self.assertEqual(self.batch.down_minions, set(['foo', 'bar'])) + + def test_batch_fire_start_event(self): + self.batch.minions = set(['foo', 'bar']) + self.batch.opts = {'batch': '2', 'timeout': 5} + self.batch.event = MagicMock() + self.batch.start_batch() + self.assertEqual( + self.batch.event.fire_event.call_args[0], + ( + { + 'available_minions': set(['foo', 'bar']), + 'down_minions': set() + }, + "salt/batch/1235/start" + ) + ) + + @tornado.testing.gen_test + def test_start_batch_calls_next(self): + self.batch.next = MagicMock(return_value=MagicMock()) + self.batch.event = MagicMock() + future = tornado.gen.Future() + future.set_result(None) + self.batch.next = MagicMock(return_value=future) + self.batch.start_batch() + self.assertEqual(self.batch.initialized, True) + self.assertEqual(len(self.batch.next.mock_calls), 1) + + def test_batch_fire_done_event(self): + self.batch.minions = set(['foo', 'bar']) + self.batch.event = MagicMock() + self.batch.end_batch() + self.assertEqual( + self.batch.event.fire_event.call_args[0], + ( + { + 'available_minions': set(['foo', 'bar']), + 'done_minions': set(), + 'down_minions': set(), + 'timedout_minions': set() + }, + "salt/batch/1235/done" + ) + ) + self.assertEqual( + 
len(self.batch.event.remove_event_handler.mock_calls), 1) + + @tornado.testing.gen_test + def test_batch_next(self): + self.batch.event = MagicMock() + self.batch.opts['fun'] = 'my.fun' + self.batch.opts['arg'] = [] + self.batch._get_next = MagicMock(return_value={'foo', 'bar'}) + self.batch.batch_size = 2 + future = tornado.gen.Future() + future.set_result({'minions': ['foo', 'bar']}) + self.batch.local.run_job_async.return_value = future + ret = self.batch.next().result() + self.assertEqual( + self.batch.local.run_job_async.call_args[0], + ({'foo', 'bar'}, 'my.fun', [], 'list') + ) + self.assertEqual( + self.batch.event.io_loop.call_later.call_args[0], + (10, self.batch.find_job, {'foo', 'bar'}) + ) + self.assertEqual(self.batch.active, {'bar', 'foo'}) + + def test_next_batch(self): + self.batch.minions = {'foo', 'bar'} + self.batch.batch_size = 2 + self.assertEqual(self.batch._get_next(), {'foo', 'bar'}) + + def test_next_batch_one_done(self): + self.batch.minions = {'foo', 'bar'} + self.batch.done_minions = {'bar'} + self.batch.batch_size = 2 + self.assertEqual(self.batch._get_next(), {'foo'}) + + def test_next_batch_one_done_one_active(self): + self.batch.minions = {'foo', 'bar', 'baz'} + self.batch.done_minions = {'bar'} + self.batch.active = {'baz'} + self.batch.batch_size = 2 + self.assertEqual(self.batch._get_next(), {'foo'}) + + def test_next_batch_one_done_one_active_one_timedout(self): + self.batch.minions = {'foo', 'bar', 'baz', 'faz'} + self.batch.done_minions = {'bar'} + self.batch.active = {'baz'} + self.batch.timedout_minions = {'faz'} + self.batch.batch_size = 2 + self.assertEqual(self.batch._get_next(), {'foo'}) + + def test_next_batch_bigger_size(self): + self.batch.minions = {'foo', 'bar'} + self.batch.batch_size = 3 + self.assertEqual(self.batch._get_next(), {'foo', 'bar'}) + + def test_next_batch_all_done(self): + self.batch.minions = {'foo', 'bar'} + self.batch.done_minions = {'foo', 'bar'} + self.batch.batch_size = 2 + 
self.assertEqual(self.batch._get_next(), set()) + + def test_next_batch_all_active(self): + self.batch.minions = {'foo', 'bar'} + self.batch.active = {'foo', 'bar'} + self.batch.batch_size = 2 + self.assertEqual(self.batch._get_next(), set()) + + def test_next_batch_all_timedout(self): + self.batch.minions = {'foo', 'bar'} + self.batch.timedout_minions = {'foo', 'bar'} + self.batch.batch_size = 2 + self.assertEqual(self.batch._get_next(), set()) + + def test_batch__event_handler_ping_return(self): + self.batch.down_minions = {'foo'} + self.batch.event = MagicMock( + unpack=MagicMock(return_value=('salt/job/1234/ret/foo', {'id': 'foo'}))) + self.batch.start() + self.assertEqual(self.batch.minions, set()) + self.batch._BatchAsync__event_handler(MagicMock()) + self.assertEqual(self.batch.minions, {'foo'}) + self.assertEqual(self.batch.done_minions, set()) + + def test_batch__event_handler_call_start_batch_when_all_pings_return(self): + self.batch.down_minions = {'foo'} + self.batch.event = MagicMock( + unpack=MagicMock(return_value=('salt/job/1234/ret/foo', {'id': 'foo'}))) + self.batch.start() + self.batch._BatchAsync__event_handler(MagicMock()) + self.assertEqual( + self.batch.event.io_loop.spawn_callback.call_args[0], + (self.batch.start_batch,)) + + def test_batch__event_handler_not_call_start_batch_when_not_all_pings_return(self): + self.batch.down_minions = {'foo', 'bar'} + self.batch.event = MagicMock( + unpack=MagicMock(return_value=('salt/job/1234/ret/foo', {'id': 'foo'}))) + self.batch.start() + self.batch._BatchAsync__event_handler(MagicMock()) + self.assertEqual( + len(self.batch.event.io_loop.spawn_callback.mock_calls), 0) + + def test_batch__event_handler_batch_run_return(self): + self.batch.event = MagicMock( + unpack=MagicMock(return_value=('salt/job/1235/ret/foo', {'id': 'foo'}))) + self.batch.start() + self.batch.active = {'foo'} + self.batch._BatchAsync__event_handler(MagicMock()) + self.assertEqual(self.batch.active, set()) + 
self.assertEqual(self.batch.done_minions, {'foo'}) + self.assertEqual( + self.batch.event.io_loop.call_later.call_args[0], + (self.batch.batch_delay, self.batch.next)) + + def test_batch__event_handler_find_job_return(self): + self.batch.event = MagicMock( + unpack=MagicMock(return_value=('salt/job/1236/ret/foo', {'id': 'foo'}))) + self.batch.start() + self.batch._BatchAsync__event_handler(MagicMock()) + self.assertEqual(self.batch.find_job_returned, {'foo'}) + + @tornado.testing.gen_test + def test_batch__event_handler_end_batch(self): + self.batch.event = MagicMock( + unpack=MagicMock(return_value=('salt/job/not-my-jid/ret/foo', {'id': 'foo'}))) + future = tornado.gen.Future() + future.set_result({'minions': ['foo', 'bar', 'baz']}) + self.batch.local.run_job_async.return_value = future + self.batch.start() + self.batch.initialized = True + self.assertEqual(self.batch.down_minions, {'foo', 'bar', 'baz'}) + self.batch.end_batch = MagicMock() + self.batch.minions = {'foo', 'bar', 'baz'} + self.batch.done_minions = {'foo', 'bar'} + self.batch.timedout_minions = {'baz'} + self.batch._BatchAsync__event_handler(MagicMock()) + self.assertEqual(len(self.batch.end_batch.mock_calls), 1) + + @tornado.testing.gen_test + def test_batch_find_job(self): + self.batch.event = MagicMock() + future = tornado.gen.Future() + future.set_result({}) + self.batch.local.run_job_async.return_value = future + self.batch.find_job({'foo', 'bar'}) + self.assertEqual( + self.batch.event.io_loop.call_later.call_args[0], + (self.batch.opts['gather_job_timeout'], self.batch.check_find_job, {'foo', 'bar'}) + ) + + @tornado.testing.gen_test + def test_batch_find_job_with_done_minions(self): + self.batch.done_minions = {'bar'} + self.batch.event = MagicMock() + future = tornado.gen.Future() + future.set_result({}) + self.batch.local.run_job_async.return_value = future + self.batch.find_job({'foo', 'bar'}) + self.assertEqual( + self.batch.event.io_loop.call_later.call_args[0], + 
(self.batch.opts['gather_job_timeout'], self.batch.check_find_job, {'foo'}) + ) + + def test_batch_check_find_job_did_not_return(self): + self.batch.event = MagicMock() + self.batch.active = {'foo'} + self.batch.find_job_returned = set() + self.batch.check_find_job({'foo'}) + self.assertEqual(self.batch.find_job_returned, set()) + self.assertEqual(self.batch.active, set()) + self.assertEqual(len(self.batch.event.io_loop.add_callback.mock_calls), 0) + + def test_batch_check_find_job_did_return(self): + self.batch.event = MagicMock() + self.batch.find_job_returned = {'foo'} + self.batch.check_find_job({'foo'}) + self.assertEqual( + self.batch.event.io_loop.add_callback.call_args[0], + (self.batch.find_job, {'foo'}) + ) + + def test_batch_check_find_job_multiple_states(self): + self.batch.event = MagicMock() + # currently running minions + self.batch.active = {'foo', 'bar'} + + # minion is running and find_job returns + self.batch.find_job_returned = {'foo'} + + # minion started running but find_job did not return + self.batch.timedout_minions = {'faz'} + + # minion finished + self.batch.done_minions = {'baz'} + + # both not yet done but only 'foo' responded to find_job + not_done = {'foo', 'bar'} + + self.batch.check_find_job(not_done) + + # assert 'bar' removed from active + self.assertEqual(self.batch.active, {'foo'}) + + # assert 'bar' added to timedout_minions + self.assertEqual(self.batch.timedout_minions, {'bar', 'faz'}) + + # assert 'find_job' schedueled again only for 'foo' + self.assertEqual( + self.batch.event.io_loop.add_callback.call_args[0], + (self.batch.find_job, {'foo'}) + ) From f7000ad1ab4fad9e68c8fa014d8bcb679d3dbf08 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 18 Jan 2019 15:03:38 +0100 Subject: [PATCH 16/31] Remove unnecessary else --- salt/master.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/salt/master.py b/salt/master.py index 20d1db78146b..3aa97c1adf17 100644 --- 
a/salt/master.py +++ b/salt/master.py @@ -2151,25 +2151,25 @@ def publish(self, clear_load): } if extra.get('batch', None): return self.publish_batch(clear_load, minions, missing) - else: - jid = self._prep_jid(clear_load, extra) - if jid is None: - return {'enc': 'clear', - 'load': {'error': 'Master failed to assign jid'}} - payload = self._prep_pub(minions, jid, clear_load, extra, missing) - - # Send it! - self._send_ssh_pub(payload, ssh_minions=ssh_minions) - self._send_pub(payload) - - return { - 'enc': 'clear', - 'load': { - 'jid': clear_load['jid'], - 'minions': minions, - 'missing': missing - } + + jid = self._prep_jid(clear_load, extra) + if jid is None: + return {'enc': 'clear', + 'load': {'error': 'Master failed to assign jid'}} + payload = self._prep_pub(minions, jid, clear_load, extra, missing) + + # Send it! + self._send_ssh_pub(payload, ssh_minions=ssh_minions) + self._send_pub(payload) + + return { + 'enc': 'clear', + 'load': { + 'jid': clear_load['jid'], + 'minions': minions, + 'missing': missing } + } def _prep_auth_info(self, clear_load): sensitive_load_keys = [] From b2a98e20a8265c3634361e3b4ffaf78da9d04f43 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 18 Jan 2019 15:06:29 +0100 Subject: [PATCH 17/31] Set utf8 and remove duplicate dict value --- tests/unit/cli/test_batch_async.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index 50cb507ebbae..56457eb226bf 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- + # Import Salt Libs from salt.cli.batch_async import BatchAsync @@ -15,7 +17,6 @@ def setUp(self): opts = {'batch': '1', 'conf_file': {}, 'tgt': '*', - 'transport': '', 'timeout': 5, 'gather_job_timeout': 5, 'transport': None, From 80019eec1cf57e2fcd37e2ad0c63ffa4ce1fe439 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Fri, 18 Jan 2019 15:08:44 +0100 Subject: 
[PATCH 18/31] Replace str substitution with format --- salt/cli/batch_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 044c250809f0..ee55f45d75c8 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -166,7 +166,7 @@ def start_batch(self): "available_minions": self.minions, "down_minions": self.down_minions }, - "salt/batch/%s/start" % self.batch_jid) + "salt/batch/{0}/start".format(self.batch_jid)) yield self.next() def end_batch(self): @@ -177,7 +177,7 @@ def end_batch(self): "done_minions": self.done_minions, "timedout_minions": self.timedout_minions }, - "salt/batch/%s/done" % self.batch_jid) + "salt/batch/{0}/done".format(self.batch_jid)) self.event.remove_event_handler(self.__event_handler) @tornado.gen.coroutine From 1389af85f9bdeef1650fc163a5e0f06125c8594c Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Mon, 21 Jan 2019 14:38:57 +0100 Subject: [PATCH 19/31] Improve docstring --- salt/cli/batch_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index ee55f45d75c8..06ad9d015687 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ''' -Execute batch runs +Execute a job on the targeted minions by using a moving window of fixed size `batch`. 
''' # Import python libs From a387b9432cffaf54266fade23819833218fcfb29 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Mon, 21 Jan 2019 15:05:55 +0100 Subject: [PATCH 20/31] Use for subsequent --- salt/cli/batch_async.py | 3 +-- tests/unit/cli/test_batch_async.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 06ad9d015687..188d8a0d3d1f 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -194,6 +194,5 @@ def next(self): gather_job_timeout=self.opts['gather_job_timeout'], jid=self.batch_jid, **self.eauth) - # TODO add parameter for find_job - should use gather_job_timeout? - self.event.io_loop.call_later(10, self.find_job, set(next_batch)) + self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch)) self.active = self.active.union(next_batch) diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index 56457eb226bf..5414c9c6936f 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -128,7 +128,7 @@ def test_batch_next(self): ) self.assertEqual( self.batch.event.io_loop.call_later.call_args[0], - (10, self.batch.find_job, {'foo', 'bar'}) + (self.batch.opts['timeout'], self.batch.find_job, {'foo', 'bar'}) ) self.assertEqual(self.batch.active, {'bar', 'foo'}) From 8fce76dc1de502dbb0e8814378e3a0c3cf41e408 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 22 Jan 2019 10:52:52 +0100 Subject: [PATCH 21/31] Rename method --- salt/cli/batch_async.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 188d8a0d3d1f..11dcc657950e 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -91,7 +91,7 @@ def __event_handler(self, raw): self.active.remove(minion) self.done_minions.add(minion) # call later so that we maybe gather more returns - self.event.io_loop.call_later(self.batch_delay, self.next) + 
self.event.io_loop.call_later(self.batch_delay, self.schedule_next) if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions): self.end_batch() @@ -167,7 +167,7 @@ def start_batch(self): "down_minions": self.down_minions }, "salt/batch/{0}/start".format(self.batch_jid)) - yield self.next() + yield self.schedule_next() def end_batch(self): self.event.fire_event( @@ -181,7 +181,7 @@ def end_batch(self): self.event.remove_event_handler(self.__event_handler) @tornado.gen.coroutine - def next(self): + def schedule_next(self): next_batch = self._get_next() if next_batch: yield self.local.run_job_async( From 0a63a832af755551d712b483d4ab9186dc5dbd3c Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 22 Jan 2019 10:53:23 +0100 Subject: [PATCH 22/31] Remove import --- salt/cli/batch_async.py | 1 - 1 file changed, 1 deletion(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 11dcc657950e..5a0e5788d42c 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -13,7 +13,6 @@ # pylint: enable=import-error,no-name-in-module,redefined-builtin import logging import fnmatch -from salt.ext.six.moves import range log = logging.getLogger(__name__) From caed708226423499dbfdc31bec1f65836e427d72 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 22 Jan 2019 11:13:05 +0100 Subject: [PATCH 23/31] Fix pylint error --- tests/unit/cli/test_batch_async.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index 5414c9c6936f..f545963d0c97 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- +from __future__ import absolute_import + # Import Salt Libs from salt.cli.batch_async import BatchAsync From dde2ff5244bdbbd4993b415923fce8c3e71993d2 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 22 Jan 2019 11:13:33 +0100 Subject: [PATCH 24/31] Fix tests - method rename --- 
tests/unit/cli/test_batch_async.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index f545963d0c97..766dadd2d429 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -85,14 +85,14 @@ def test_batch_fire_start_event(self): @tornado.testing.gen_test def test_start_batch_calls_next(self): - self.batch.next = MagicMock(return_value=MagicMock()) + self.batch.schedule_next = MagicMock(return_value=MagicMock()) self.batch.event = MagicMock() future = tornado.gen.Future() future.set_result(None) - self.batch.next = MagicMock(return_value=future) + self.batch.schedule_next = MagicMock(return_value=future) self.batch.start_batch() self.assertEqual(self.batch.initialized, True) - self.assertEqual(len(self.batch.next.mock_calls), 1) + self.assertEqual(len(self.batch.schedule_next.mock_calls), 1) def test_batch_fire_done_event(self): self.batch.minions = set(['foo', 'bar']) @@ -123,7 +123,7 @@ def test_batch_next(self): future = tornado.gen.Future() future.set_result({'minions': ['foo', 'bar']}) self.batch.local.run_job_async.return_value = future - ret = self.batch.next().result() + ret = self.batch.schedule_next().result() self.assertEqual( self.batch.local.run_job_async.call_args[0], ({'foo', 'bar'}, 'my.fun', [], 'list') @@ -222,7 +222,7 @@ def test_batch__event_handler_batch_run_return(self): self.assertEqual(self.batch.done_minions, {'foo'}) self.assertEqual( self.batch.event.io_loop.call_later.call_args[0], - (self.batch.batch_delay, self.batch.next)) + (self.batch.batch_delay, self.batch.schedule_next)) def test_batch__event_handler_find_job_return(self): self.batch.event = MagicMock( From 08b45d09a278d1c7a0ab1cac5b9ef6c14042aaeb Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 22 Jan 2019 11:40:15 +0100 Subject: [PATCH 25/31] Improve BatchAsync docstring --- salt/cli/batch_async.py | 27 ++++++++++++++++++++++++++- 1 file 
changed, 26 insertions(+), 1 deletion(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 5a0e5788d42c..fe8e7939b2b1 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -21,7 +21,32 @@ class BatchAsync(object): ''' - Manage the execution of batch runs + Run a job on the targeted minions by using a moving window of fixed size `batch`. + + ``BatchAsync`` is used to execute a job on the targeted minions by keeping + the number of concurrent running minions to the size of `batch` parameter. + + The control parameters are: + - batch: number/percentage of concurrent running minions + - batch_delay: minimum wait time between batches + - gather_job_timeout: `find_job` timeout + - timeout: time to wait before firing a `find_job` + + When the batch starts, a `start` event is fired: + - tag: salt/batch/<batch-jid>/start + - data: { + "available_minions": self.minions, + "down_minions": self.down_minions + } + + When the batch ends, a `done` event is fired: + - tag: salt/batch/<batch-jid>/done + - data: { + "available_minions": self.minions, + "down_minions": self.down_minions, + "done_minions": self.done_minions, + "timedout_minions": self.timedout_minions + } ''' def __init__(self, parent_opts, jid_gen, clear_load): ioloop = tornado.ioloop.IOLoop.current() From 4570b580e8d7ab3fed640cf4c6f5caa5dc0bc976 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Wed, 23 Jan 2019 14:43:24 +0100 Subject: [PATCH 26/31] Allow metadata to pass --- salt/cli/batch_async.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index fe8e7939b2b1..004b0c8ac1e4 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -62,7 +62,8 @@ def __init__(self, parent_opts, jid_gen, clear_load): clear_load['kwargs'].pop('batch'), self.local.opts, **clear_load) - self.eauth = batch_get_eauth(clear_load['kwargs']) + self.kwargs = clear_load['kwargs'] + self.kwargs.update(batch_get_eauth(clear_load['kwargs']))
self.minions = set() self.down_minions = set() self.timedout_minions = set() @@ -155,7 +156,7 @@ def find_job(self, minions): 'list', gather_job_timeout=self.opts['gather_job_timeout'], jid=self.find_job_jid, - **self.eauth) + **self.kwargs) self.event.io_loop.call_later( self.opts['gather_job_timeout'], self.check_find_job, @@ -177,7 +178,7 @@ def start(self): ), gather_job_timeout=self.opts['gather_job_timeout'], jid=self.ping_jid, - **self.eauth) + **self.kwargs) self.down_minions = set(ping_return['minions']) @tornado.gen.coroutine @@ -217,6 +218,6 @@ def schedule_next(self): ret=self.opts.get('return', ''), gather_job_timeout=self.opts['gather_job_timeout'], jid=self.batch_jid, - **self.eauth) + **self.kwargs) self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch)) self.active = self.active.union(next_batch) From 07987af155ce856560d2a428eb0e1879575c83b1 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Wed, 23 Jan 2019 15:43:42 +0100 Subject: [PATCH 27/31] Pass metadata only to batch jobs --- salt/cli/batch_async.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 004b0c8ac1e4..f43fcb8e04cd 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -63,7 +63,8 @@ def __init__(self, parent_opts, jid_gen, clear_load): self.local.opts, **clear_load) self.kwargs = clear_load['kwargs'] - self.kwargs.update(batch_get_eauth(clear_load['kwargs'])) + self.eauth = batch_get_eauth(clear_load['kwargs']) + self.kwargs.update(self.eauth) self.minions = set() self.down_minions = set() self.timedout_minions = set() @@ -156,7 +157,7 @@ def find_job(self, minions): 'list', gather_job_timeout=self.opts['gather_job_timeout'], jid=self.find_job_jid, - **self.kwargs) + **self.eauth) self.event.io_loop.call_later( self.opts['gather_job_timeout'], self.check_find_job, @@ -178,7 +179,7 @@ def start(self): ), gather_job_timeout=self.opts['gather_job_timeout'], 
jid=self.ping_jid, - **self.kwargs) + **self.eauth) self.down_minions = set(ping_return['minions']) @tornado.gen.coroutine From cd098b7c0ab29fcfedb645c636fd79dabb54f658 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 29 Jan 2019 14:46:26 +0100 Subject: [PATCH 28/31] Add the metadata to the start/done events --- salt/cli/batch_async.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index f43fcb8e04cd..d1348594137a 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -187,23 +187,29 @@ def start_batch(self): if not self.initialized: self.batch_size = get_bnum(self.opts, self.minions, True) self.initialized = True - self.event.fire_event( + data = {} + data.update(self.kwargs) + data.update( { "available_minions": self.minions, "down_minions": self.down_minions - }, - "salt/batch/{0}/start".format(self.batch_jid)) + } + ) + self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid)) yield self.schedule_next() def end_batch(self): - self.event.fire_event( + data = {} + data.update(self.kwargs) + data.update( { "available_minions": self.minions, "down_minions": self.down_minions, "done_minions": self.done_minions, "timedout_minions": self.timedout_minions - }, - "salt/batch/{0}/done".format(self.batch_jid)) + } + ) + self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid)) self.event.remove_event_handler(self.__event_handler) @tornado.gen.coroutine From f4f0b2ad80aca680d99232509aedd2c957c609de Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Wed, 30 Jan 2019 14:58:36 +0100 Subject: [PATCH 29/31] Pass only metadata not all **kwargs --- salt/cli/batch_async.py | 36 +++++++++++++----------------- tests/unit/cli/test_batch_async.py | 8 +++++-- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index d1348594137a..8df1bb5795ef 100644 --- a/salt/cli/batch_async.py +++ 
b/salt/cli/batch_async.py @@ -62,9 +62,8 @@ def __init__(self, parent_opts, jid_gen, clear_load): clear_load['kwargs'].pop('batch'), self.local.opts, **clear_load) - self.kwargs = clear_load['kwargs'] self.eauth = batch_get_eauth(clear_load['kwargs']) - self.kwargs.update(self.eauth) + self.metadata = clear_load['kwargs'].get('metadata', {}) self.minions = set() self.down_minions = set() self.timedout_minions = set() @@ -179,6 +178,7 @@ def start(self): ), gather_job_timeout=self.opts['gather_job_timeout'], jid=self.ping_jid, + metadata=self.metadata, **self.eauth) self.down_minions = set(ping_return['minions']) @@ -187,28 +187,22 @@ def start_batch(self): if not self.initialized: self.batch_size = get_bnum(self.opts, self.minions, True) self.initialized = True - data = {} - data.update(self.kwargs) - data.update( - { - "available_minions": self.minions, - "down_minions": self.down_minions - } - ) + data = { + "available_minions": self.minions, + "down_minions": self.down_minions, + "metadata": self.metadata + } self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid)) yield self.schedule_next() def end_batch(self): - data = {} - data.update(self.kwargs) - data.update( - { - "available_minions": self.minions, - "down_minions": self.down_minions, - "done_minions": self.done_minions, - "timedout_minions": self.timedout_minions - } - ) + data = { + "available_minions": self.minions, + "down_minions": self.down_minions, + "done_minions": self.done_minions, + "timedout_minions": self.timedout_minions, + "metadata": self.metadata + } self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid)) self.event.remove_event_handler(self.__event_handler) @@ -225,6 +219,6 @@ def schedule_next(self): ret=self.opts.get('return', ''), gather_job_timeout=self.opts['gather_job_timeout'], jid=self.batch_jid, - **self.kwargs) + metadata=self.metadata) self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch)) self.active = 
self.active.union(next_batch) diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index 766dadd2d429..2e906cc37864 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -71,13 +71,15 @@ def test_batch_fire_start_event(self): self.batch.minions = set(['foo', 'bar']) self.batch.opts = {'batch': '2', 'timeout': 5} self.batch.event = MagicMock() + self.batch.metadata = {'mykey': 'myvalue'} self.batch.start_batch() self.assertEqual( self.batch.event.fire_event.call_args[0], ( { 'available_minions': set(['foo', 'bar']), - 'down_minions': set() + 'down_minions': set(), + 'metadata': self.batch.metadata }, "salt/batch/1235/start" ) @@ -97,6 +99,7 @@ def test_start_batch_calls_next(self): def test_batch_fire_done_event(self): self.batch.minions = set(['foo', 'bar']) self.batch.event = MagicMock() + self.batch.metadata = {'mykey': 'myvalue'} self.batch.end_batch() self.assertEqual( self.batch.event.fire_event.call_args[0], @@ -105,7 +108,8 @@ def test_batch_fire_done_event(self): 'available_minions': set(['foo', 'bar']), 'done_minions': set(), 'down_minions': set(), - 'timedout_minions': set() + 'timedout_minions': set(), + 'metadata': self.batch.metadata }, "salt/batch/1235/done" ) From cd6ff7217a6f409305e8f23c69f15a3b39b4d2d5 Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 12 Nov 2019 16:42:02 +0100 Subject: [PATCH 30/31] Add batch_presence_ping_timeout parameter --- salt/cli/batch_async.py | 5 ++++- tests/unit/cli/test_batch_async.py | 32 ++++++++++++++++++++++++++---- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 8df1bb5795ef..3160d46d8b2a 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -29,6 +29,7 @@ class BatchAsync(object): The control parameters are: - batch: number/percentage of concurrent running minions - batch_delay: minimum wait time between batches + - batch_presence_ping_timeout: time to 
wait for presence pings before starting the batch - gather_job_timeout: `find_job` timeout - timeout: time to wait before firing a `find_job` @@ -55,6 +56,7 @@ def __init__(self, parent_opts, jid_gen, clear_load): clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout') else: clear_load['gather_job_timeout'] = self.local.opts['gather_job_timeout'] + self.batch_presence_ping_timeout = clear_load['kwargs'].get('batch_presence_ping_timeout', None) self.batch_delay = clear_load['kwargs'].get('batch_delay', 1) self.opts = batch_get_opts( clear_load.pop('tgt'), @@ -167,7 +169,8 @@ def start(self): self.__set_event_handler() #start batching even if not all minions respond to ping self.event.io_loop.call_later( - self.opts['gather_job_timeout'], self.start_batch) + self.batch_presence_ping_timeout or self.opts['gather_job_timeout'], + self.start_batch) ping_return = yield self.local.run_job_async( self.opts['tgt'], 'test.ping', diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index 2e906cc37864..f65b6a06c36e 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -21,6 +21,7 @@ def setUp(self): 'tgt': '*', 'timeout': 5, 'gather_job_timeout': 5, + 'batch_presence_ping_timeout': 1, 'transport': None, 'sock_dir': ''} @@ -28,7 +29,17 @@ def setUp(self): with patch('salt.cli.batch_async.batch_get_opts', MagicMock(return_value=opts) ): - self.batch = BatchAsync(opts, MagicMock(side_effect=['1234', '1235', '1236']), MagicMock()) + self.batch = BatchAsync( + opts, + MagicMock(side_effect=['1234', '1235', '1236']), + { + 'tgt': '', + 'fun': '', + 'kwargs': { + 'batch': '', + 'batch_presence_ping_timeout': 1 + } + }) def test_ping_jid(self): self.assertEqual(self.batch.ping_jid, '1234') @@ -49,16 +60,16 @@ def test_batch_size(self): self.assertEqual(self.batch.batch_size, 2) @tornado.testing.gen_test - def test_batch_start(self): + def 
test_batch_start_on_batch_presence_ping_timeout(self): self.batch.event = MagicMock() future = tornado.gen.Future() future.set_result({'minions': ['foo', 'bar']}) self.batch.local.run_job_async.return_value = future ret = self.batch.start() - # assert start_batch is called later with gather_job_timeout as param + # assert start_batch is called later with batch_presence_ping_timeout as param self.assertEqual( self.batch.event.io_loop.call_later.call_args[0], - (5, self.batch.start_batch)) + (self.batch.batch_presence_ping_timeout, self.batch.start_batch)) # assert test.ping called self.assertEqual( self.batch.local.run_job_async.call_args[0], @@ -67,6 +78,19 @@ def test_batch_start(self): # assert down_minions == all minions matched by tgt self.assertEqual(self.batch.down_minions, set(['foo', 'bar'])) + @tornado.testing.gen_test + def test_batch_start_on_gather_job_timeout(self): + self.batch.event = MagicMock() + future = tornado.gen.Future() + future.set_result({'minions': ['foo', 'bar']}) + self.batch.local.run_job_async.return_value = future + self.batch.batch_presence_ping_timeout = None + ret = self.batch.start() + # assert start_batch is called later with gather_job_timeout as param + self.assertEqual( + self.batch.event.io_loop.call_later.call_args[0], + (self.batch.opts['gather_job_timeout'], self.batch.start_batch)) + def test_batch_fire_start_event(self): self.batch.minions = set(['foo', 'bar']) self.batch.opts = {'batch': '2', 'timeout': 5} From ad55f4005879b79cf421a62f52b2de021dc108ad Mon Sep 17 00:00:00 2001 From: Mihai Dinca Date: Tue, 12 Nov 2019 17:36:51 +0100 Subject: [PATCH 31/31] Incorporate PR #52519 --- salt/cli/batch_async.py | 3 ++- salt/master.py | 5 +++-- salt/utils/event.py | 2 +- tests/unit/cli/test_batch_async.py | 14 ++++++++++++++ 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 3160d46d8b2a..9c00762f0080 100644 --- a/salt/cli/batch_async.py +++ 
b/salt/cli/batch_async.py @@ -222,6 +222,7 @@ def schedule_next(self): ret=self.opts.get('return', ''), gather_job_timeout=self.opts['gather_job_timeout'], jid=self.batch_jid, - metadata=self.metadata) + metadata=self.metadata, + **self.eauth) self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch)) self.active = self.active.union(next_batch) diff --git a/salt/master.py b/salt/master.py index 3aa97c1adf17..926de5973478 100644 --- a/salt/master.py +++ b/salt/master.py @@ -2037,7 +2037,7 @@ def get_token(self, clear_load): return False return self.loadauth.get_tok(clear_load['token']) - def publish_batch(self, clear_load, minions, missing): + def publish_batch(self, clear_load, extra, minions, missing): batch_load = {} batch_load.update(clear_load) import salt.cli.batch_async @@ -2047,6 +2047,7 @@ def publish_batch(self, clear_load, minions, missing): batch_load ) ioloop = tornado.ioloop.IOLoop.current() + self._prep_pub(minions, batch.batch_jid, clear_load, extra, missing) ioloop.add_callback(batch.start) return { @@ -2150,7 +2151,7 @@ def publish(self, clear_load): } } if extra.get('batch', None): - return self.publish_batch(clear_load, minions, missing) + return self.publish_batch(clear_load, extra, minions, missing) jid = self._prep_jid(clear_load, extra) if jid is None: diff --git a/salt/utils/event.py b/salt/utils/event.py index 835390bf1fd0..4fe4c976ae22 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -873,7 +873,7 @@ def set_event_handler(self, event_handler): self.subscriber.callbacks.add(event_handler) if not self.subscriber.reading: # This will handle reconnects - self.subscriber.read_async() + return self.subscriber.read_async() # pylint: disable=W1701 def __del__(self): diff --git a/tests/unit/cli/test_batch_async.py b/tests/unit/cli/test_batch_async.py index f65b6a06c36e..5f770e28e502 100644 --- a/tests/unit/cli/test_batch_async.py +++ b/tests/unit/cli/test_batch_async.py @@ -151,6 +151,8 @@ def 
test_batch_next(self): future = tornado.gen.Future() future.set_result({'minions': ['foo', 'bar']}) self.batch.local.run_job_async.return_value = future + self.batch.eauth = {'username': 'user#1', 'password': 'pass'} + self.batch.metadata = {'mykey': 'myvalue'} ret = self.batch.schedule_next().result() self.assertEqual( self.batch.local.run_job_async.call_args[0], @@ -160,6 +162,18 @@ def test_batch_next(self): self.batch.event.io_loop.call_later.call_args[0], (self.batch.opts['timeout'], self.batch.find_job, {'foo', 'bar'}) ) + self.assertEqual( + self.batch.local.run_job_async.call_args[1], + { + 'username': 'user#1', + 'password': 'pass', + 'jid': self.batch.batch_jid, + 'ret': u'', + 'gather_job_timeout': self.batch.opts['gather_job_timeout'], + 'raw': False, + 'metadata': {'mykey': 'myvalue'} + } + ) self.assertEqual(self.batch.active, {'bar', 'foo'}) def test_next_batch(self):