[WIP] Porting Async Batch #50546 #55268

Closed
wants to merge 33 commits

33 commits
d6fc692  Async batch (Nov 16, 2018)
3a22c4a  Fix `if` statement placement (Nov 29, 2018)
e2e77ee  Remove unused imports (Nov 30, 2018)
3167b5a  Add find_job checks (Dec 3, 2018)
ee16fbe  Check if should close on all events (Dec 3, 2018)
6c52505  Remove code used for debug (Nov 28, 2018)
fe8bb39  Remove TODO comment (Dec 14, 2018)
0431734  Fix pylint error (Dec 14, 2018)
8b42afe  Remove leading '_' from function names (Dec 14, 2018)
c4ba221  Remove TODO comment (Dec 14, 2018)
7a70969  Make batch_delay a request parameter (Dec 17, 2018)
77b96a2  Improvements (Dec 19, 2018)
bbfd37f  Allow multiple event handlers (Dec 21, 2018)
e17d3a3  Use config value for gather_job_timeout when not in payload (Jan 15, 2019)
f3fa394  Add async batch unittests (Jan 17, 2019)
f7000ad  Remove unnecessary else (Jan 18, 2019)
b2a98e2  Set utf8 and remove duplicate dict value (Jan 18, 2019)
80019ee  Replace str substitution with format (Jan 18, 2019)
1389af8  Improve docstring (Jan 21, 2019)
a387b94  Use for subsequent (Jan 21, 2019)
8fce76d  Rename method (Jan 22, 2019)
0a63a83  Remove import (Jan 22, 2019)
caed708  Fix pylint error (Jan 22, 2019)
dde2ff5  Fix tests - method rename (Jan 22, 2019)
08b45d0  Improve BatchAsync docstring (Jan 22, 2019)
4570b58  Allow metadata to pass (Jan 23, 2019)
07987af  Pass metadata only to batch jobs (Jan 23, 2019)
cd098b7  Add the metadata to the start/done events (Jan 29, 2019)
f4f0b2a  Pass only metadata not all **kwargs (Jan 30, 2019)
cd6ff72  Add batch_presence_ping_timeout parameter (Nov 12, 2019)
ad55f40  Incorporate PR #52519 (Nov 12, 2019)
4b06611  Merge branch 'master' into saltstack-PR#50546 (DmitryKuzmenko, Mar 27, 2020)
79b5e17  Merge branch 'master' into saltstack-PR#50546 (DmitryKuzmenko, Mar 31, 2020)

4 changes: 3 additions & 1 deletion salt/auth/__init__.py
@@ -50,7 +50,9 @@
'metadata',
'print_event',
'raw',
'yield_pub_data'
'yield_pub_data',
'batch',
'batch_delay'
])


91 changes: 74 additions & 17 deletions salt/cli/batch.py
@@ -26,6 +26,79 @@
log = logging.getLogger(__name__)


def get_bnum(opts, minions, quiet):
'''
Return the active number of minions to maintain
'''
partition = lambda x: float(x) / 100.0 * len(minions)
try:
if '%' in opts['batch']:
res = partition(float(opts['batch'].strip('%')))
if res < 1:
return int(math.ceil(res))
else:
return int(res)
else:
return int(opts['batch'])
except ValueError:
if not quiet:
salt.utils.stringutils.print_cli('Invalid batch data sent: {0}\nData must be in the '
'form of %10, 10% or 3'.format(opts['batch']))
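
For reference, the helper resolves percentage and absolute batch sizes as follows; this is a standalone sketch of the same arithmetic with illustrative names, not code from the module:

import math

def batch_size_sketch(batch, minion_count):
    # Mirrors get_bnum's arithmetic, for illustration only.
    if isinstance(batch, str) and '%' in batch:
        res = float(batch.strip('%')) / 100.0 * minion_count
        # A non-zero fraction rounds up so that at least one minion runs.
        return int(math.ceil(res)) if res < 1 else int(res)
    return int(batch)

# '10%' of 25 minions -> 2, '1%' of 25 -> 1 (rounded up), '3' -> 3
assert batch_size_sketch('10%', 25) == 2
assert batch_size_sketch('1%', 25) == 1
assert batch_size_sketch('3', 25) == 3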


def batch_get_opts(
tgt,
fun,
batch,
parent_opts,
arg=(),
tgt_type='glob',
ret='',
kwarg=None,
**kwargs):
# We need to re-import salt.utils.args here
# even though it has already been imported.
# when cmd_batch is called via the NetAPI
# the module is unavailable.
import salt.utils.args

arg = salt.utils.args.condition_input(arg, kwarg)
opts = {'tgt': tgt,
'fun': fun,
'arg': arg,
'tgt_type': tgt_type,
'ret': ret,
'batch': batch,
'failhard': kwargs.get('failhard', False),
'raw': kwargs.get('raw', False)}

if 'timeout' in kwargs:
opts['timeout'] = kwargs['timeout']
if 'gather_job_timeout' in kwargs:
opts['gather_job_timeout'] = kwargs['gather_job_timeout']
if 'batch_wait' in kwargs:
opts['batch_wait'] = int(kwargs['batch_wait'])

for key, val in six.iteritems(parent_opts):
if key not in opts:
opts[key] = val

return opts


def batch_get_eauth(kwargs):
eauth = {}
if 'eauth' in kwargs:
eauth['eauth'] = kwargs.pop('eauth')
if 'username' in kwargs:
eauth['username'] = kwargs.pop('username')
if 'password' in kwargs:
eauth['password'] = kwargs.pop('password')
if 'token' in kwargs:
eauth['token'] = kwargs.pop('token')
return eauth
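
Taken together, these helpers let a caller build the batch options and the eauth credentials from a single kwargs dict, which is how the refactored LocalClient.cmd_batch further down uses them. A rough usage sketch, assuming this branch's salt.cli.batch is importable; the target, credentials and parent_opts values are made up:

from salt.cli.batch import batch_get_opts, batch_get_eauth

parent_opts = {'conf_file': '/etc/salt/master'}  # stand-in for the real master opts dict
kwargs = {'timeout': 30, 'eauth': 'pam', 'username': 'saltdev', 'password': 'secret'}

opts = batch_get_opts('web*', 'test.ping', '10%', parent_opts,
                      tgt_type='glob', **kwargs)
eauth = batch_get_eauth(kwargs)  # pops eauth/username/password/token out of kwargs

# opts now carries tgt/fun/batch plus timeout and any parent_opts keys not already set;
# eauth is handed to Batch/BatchAsync separately, so credentials never end up in opts.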


class Batch(object):
'''
Manage the execution of batch runs
@@ -80,23 +153,7 @@ def __gather_minions(self):
return (list(fret), ping_gen, nret.difference(fret))

def get_bnum(self):
'''
Return the active number of minions to maintain
'''
partition = lambda x: float(x) / 100.0 * len(self.minions)
try:
if isinstance(self.opts['batch'], six.string_types) and '%' in self.opts['batch']:
res = partition(float(self.opts['batch'].strip('%')))
if res < 1:
return int(math.ceil(res))
else:
return int(res)
else:
return int(self.opts['batch'])
except ValueError:
if not self.quiet:
salt.utils.stringutils.print_cli('Invalid batch data sent: {0}\nData must be in the '
'form of %10, 10% or 3'.format(self.opts['batch']))
return get_bnum(self.opts, self.minions, self.quiet)

def __update_wait(self, wait):
now = datetime.now()
228 changes: 228 additions & 0 deletions salt/cli/batch_async.py
@@ -0,0 +1,228 @@
# -*- coding: utf-8 -*-
'''
Execute a job on the targeted minions by using a moving window of fixed size `batch`.
'''

# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import tornado

# Import salt libs
import salt.client

# pylint: enable=import-error,no-name-in-module,redefined-builtin
import logging
import fnmatch

log = logging.getLogger(__name__)

from salt.cli.batch import get_bnum, batch_get_opts, batch_get_eauth


class BatchAsync(object):
'''
Run a job on the targeted minions by using a moving window of fixed size `batch`.

``BatchAsync`` is used to execute a job on the targeted minions by keeping
the number of concurrently running minions at the size of the `batch` parameter.

The control parameters are:
- batch: number/percentage of concurrent running minions
- batch_delay: minimum wait time between batches
- batch_presence_ping_timeout: time to wait for presence pings before starting the batch
- gather_job_timeout: `find_job` timeout
- timeout: time to wait before firing a `find_job`

When the batch starts, a `start` event is fired:
- tag: salt/batch/<batch-jid>/start
- data: {
"available_minions": self.minions,
"down_minions": self.down_minions
}

When the batch ends, a `done` event is fired:
- tag: salt/batch/<batch-jid>/done
- data: {
"available_minions": self.minions,
"down_minions": self.down_minions,
"done_minions": self.done_minions,
"timedout_minions": self.timedout_minions
}
'''
def __init__(self, parent_opts, jid_gen, clear_load):
ioloop = tornado.ioloop.IOLoop.current()
self.local = salt.client.get_local_client(parent_opts['conf_file'])
if 'gather_job_timeout' in clear_load['kwargs']:
clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout')
else:
clear_load['gather_job_timeout'] = self.local.opts['gather_job_timeout']
self.batch_presence_ping_timeout = clear_load['kwargs'].get('batch_presence_ping_timeout', None)
self.batch_delay = clear_load['kwargs'].get('batch_delay', 1)
self.opts = batch_get_opts(
clear_load.pop('tgt'),
clear_load.pop('fun'),
clear_load['kwargs'].pop('batch'),
self.local.opts,
**clear_load)
self.eauth = batch_get_eauth(clear_load['kwargs'])
self.metadata = clear_load['kwargs'].get('metadata', {})
self.minions = set()
self.down_minions = set()
self.timedout_minions = set()
self.done_minions = set()
self.active = set()
self.initialized = False
self.ping_jid = jid_gen()
self.batch_jid = jid_gen()
self.find_job_jid = jid_gen()
self.find_job_returned = set()
self.event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=True,
io_loop=ioloop,
keep_loop=True)

def __set_event_handler(self):
ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid)
batch_return_pattern = 'salt/job/{0}/ret/*'.format(self.batch_jid)
find_job_return_pattern = 'salt/job/{0}/ret/*'.format(self.find_job_jid)
self.event.subscribe(ping_return_pattern, match_type='glob')
self.event.subscribe(batch_return_pattern, match_type='glob')
self.event.subscribe(find_job_return_pattern, match_type='glob')
self.event.patterns = {
(ping_return_pattern, 'ping_return'),
(batch_return_pattern, 'batch_run'),
(find_job_return_pattern, 'find_job_return')
}
self.event.set_event_handler(self.__event_handler)

def __event_handler(self, raw):
if not self.event:
return
mtag, data = self.event.unpack(raw, self.event.serial)
for (pattern, op) in self.event.patterns:
if fnmatch.fnmatch(mtag, pattern):
minion = data['id']
if op == 'ping_return':
self.minions.add(minion)
self.down_minions.remove(minion)
if not self.down_minions:
self.event.io_loop.spawn_callback(self.start_batch)
elif op == 'find_job_return':
self.find_job_returned.add(minion)
elif op == 'batch_run':
if minion in self.active:
self.active.remove(minion)
self.done_minions.add(minion)
# call later so that we maybe gather more returns
self.event.io_loop.call_later(self.batch_delay, self.schedule_next)

if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions):
self.end_batch()

def _get_next(self):
to_run = self.minions.difference(
self.done_minions).difference(
self.active).difference(
self.timedout_minions)
next_batch_size = min(
len(to_run), # partial batch (all left)
self.batch_size - len(self.active) # full batch or available slots
)
return set(list(to_run)[:next_batch_size])
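
The moving window here is plain set arithmetic; a worked example with made-up minion ids and a batch size of 3:

minions = {'m1', 'm2', 'm3', 'm4', 'm5', 'm6'}
done_minions, timedout_minions, active = {'m1'}, {'m2'}, {'m3'}
batch_size = 3

to_run = minions - done_minions - active - timedout_minions   # {'m4', 'm5', 'm6'}
next_batch_size = min(len(to_run), batch_size - len(active))  # min(3, 2) == 2
# schedule_next would therefore start the job on 2 of the 3 remaining minions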

@tornado.gen.coroutine
def check_find_job(self, minions):
did_not_return = minions.difference(self.find_job_returned)
if did_not_return:
for minion in did_not_return:
if minion in self.find_job_returned:
self.find_job_returned.remove(minion)
if minion in self.active:
self.active.remove(minion)
self.timedout_minions.add(minion)
running = minions.difference(did_not_return).difference(self.done_minions).difference(self.timedout_minions)
if running:
self.event.io_loop.add_callback(self.find_job, running)

@tornado.gen.coroutine
def find_job(self, minions):
not_done = minions.difference(self.done_minions)
ping_return = yield self.local.run_job_async(
not_done,
'saltutil.find_job',
[self.batch_jid],
'list',
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.find_job_jid,
**self.eauth)
self.event.io_loop.call_later(
self.opts['gather_job_timeout'],
self.check_find_job,
not_done)

@tornado.gen.coroutine
def start(self):
self.__set_event_handler()
#start batching even if not all minions respond to ping
self.event.io_loop.call_later(
self.batch_presence_ping_timeout or self.opts['gather_job_timeout'],
self.start_batch)
ping_return = yield self.local.run_job_async(
self.opts['tgt'],
'test.ping',
[],
self.opts.get(
'selected_target_option',
self.opts.get('tgt_type', 'glob')
),
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.ping_jid,
metadata=self.metadata,
**self.eauth)
self.down_minions = set(ping_return['minions'])

@tornado.gen.coroutine
def start_batch(self):
if not self.initialized:
self.batch_size = get_bnum(self.opts, self.minions, True)
self.initialized = True
data = {
"available_minions": self.minions,
"down_minions": self.down_minions,
"metadata": self.metadata
}
self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid))
yield self.schedule_next()

def end_batch(self):
data = {
"available_minions": self.minions,
"down_minions": self.down_minions,
"done_minions": self.done_minions,
"timedout_minions": self.timedout_minions,
"metadata": self.metadata
}
self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid))
self.event.remove_event_handler(self.__event_handler)

@tornado.gen.coroutine
def schedule_next(self):
next_batch = self._get_next()
if next_batch:
yield self.local.run_job_async(
next_batch,
self.opts['fun'],
self.opts['arg'],
'list',
raw=self.opts.get('raw', False),
ret=self.opts.get('return', ''),
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.batch_jid,
metadata=self.metadata,
**self.eauth)
self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch))
self.active = self.active.union(next_batch)
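
The start and done events documented in the BatchAsync docstring can be watched on the master event bus. A rough listener sketch; the tag layout and payload keys come from the docstring above, while the config path and polling loop are assumptions:

import fnmatch

import salt.config
import salt.utils.event

master_opts = salt.config.client_config('/etc/salt/master')
event_bus = salt.utils.event.get_event(
    'master', master_opts['sock_dir'], master_opts['transport'],
    opts=master_opts, listen=True)

while True:
    ret = event_bus.get_event(full=True, wait=5)
    if not ret:
        continue
    if fnmatch.fnmatch(ret['tag'], 'salt/batch/*/start'):
        print('batch started; down minions:', ret['data'].get('down_minions'))
    elif fnmatch.fnmatch(ret['tag'], 'salt/batch/*/done'):
        print('batch done; timed out:', ret['data'].get('timedout_minions'))
        break
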
41 changes: 5 additions & 36 deletions salt/client/__init__.py
@@ -529,45 +529,14 @@ def cmd_batch(
{'dave': {...}}
{'stewart': {...}}
'''
# We need to re-import salt.utils.args here
# even though it has already been imported.
# when cmd_batch is called via the NetAPI
# the module is unavailable.
import salt.utils.args

# Late import - not used anywhere else in this file
import salt.cli.batch
opts = salt.cli.batch.batch_get_opts(
tgt, fun, batch, self.opts,
arg=arg, tgt_type=tgt_type, ret=ret, kwarg=kwarg, **kwargs)

eauth = salt.cli.batch.batch_get_eauth(kwargs)

arg = salt.utils.args.condition_input(arg, kwarg)
opts = {'tgt': tgt,
'fun': fun,
'arg': arg,
'tgt_type': tgt_type,
'ret': ret,
'batch': batch,
'failhard': kwargs.get('failhard', self.opts.get('failhard', False)),
'raw': kwargs.get('raw', False)}

if 'timeout' in kwargs:
opts['timeout'] = kwargs['timeout']
if 'gather_job_timeout' in kwargs:
opts['gather_job_timeout'] = kwargs['gather_job_timeout']
if 'batch_wait' in kwargs:
opts['batch_wait'] = int(kwargs['batch_wait'])

eauth = {}
if 'eauth' in kwargs:
eauth['eauth'] = kwargs.pop('eauth')
if 'username' in kwargs:
eauth['username'] = kwargs.pop('username')
if 'password' in kwargs:
eauth['password'] = kwargs.pop('password')
if 'token' in kwargs:
eauth['token'] = kwargs.pop('token')

for key, val in six.iteritems(self.opts):
if key not in opts:
opts[key] = val
batch = salt.cli.batch.Batch(opts, eauth=eauth, quiet=True)
for ret in batch.run():
yield ret
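
For context, the external behaviour of cmd_batch is unchanged by this refactor: it still yields one return dict per minion as the batches complete. A minimal caller, assuming a running master and the standard Python client setup:

import salt.client

local = salt.client.LocalClient()
# Run test.ping across all minions, 10% at a time, pausing 5 seconds between batches.
for ret in local.cmd_batch('*', 'test.ping', batch='10%', batch_wait=5):
    print(ret)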