Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async batch #50546

Merged
merged 29 commits into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion salt/auth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
'metadata',
'print_event',
'raw',
'yield_pub_data'
'yield_pub_data',
'batch',
'batch_delay'
])


Expand Down
91 changes: 74 additions & 17 deletions salt/cli/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,79 @@
log = logging.getLogger(__name__)


def get_bnum(opts, minions, quiet):
'''
Return the active number of minions to maintain
'''
partition = lambda x: float(x) / 100.0 * len(minions)
try:
if '%' in opts['batch']:
res = partition(float(opts['batch'].strip('%')))
if res < 1:
return int(math.ceil(res))
else:
return int(res)
else:
return int(opts['batch'])
except ValueError:
if not quiet:
salt.utils.stringutils.print_cli('Invalid batch data sent: {0}\nData must be in the '
'form of %10, 10% or 3'.format(opts['batch']))


def batch_get_opts(
tgt,
fun,
batch,
parent_opts,
arg=(),
tgt_type='glob',
ret='',
kwarg=None,
**kwargs):
# We need to re-import salt.utils.args here
# even though it has already been imported.
# when cmd_batch is called via the NetAPI
# the module is unavailable.
import salt.utils.args

arg = salt.utils.args.condition_input(arg, kwarg)
opts = {'tgt': tgt,
'fun': fun,
'arg': arg,
'tgt_type': tgt_type,
'ret': ret,
'batch': batch,
'failhard': kwargs.get('failhard', False),
'raw': kwargs.get('raw', False)}

if 'timeout' in kwargs:
opts['timeout'] = kwargs['timeout']
if 'gather_job_timeout' in kwargs:
opts['gather_job_timeout'] = kwargs['gather_job_timeout']
if 'batch_wait' in kwargs:
opts['batch_wait'] = int(kwargs['batch_wait'])

for key, val in six.iteritems(parent_opts):
if key not in opts:
opts[key] = val

return opts


def batch_get_eauth(kwargs):
eauth = {}
if 'eauth' in kwargs:
eauth['eauth'] = kwargs.pop('eauth')
if 'username' in kwargs:
eauth['username'] = kwargs.pop('username')
if 'password' in kwargs:
eauth['password'] = kwargs.pop('password')
if 'token' in kwargs:
eauth['token'] = kwargs.pop('token')
return eauth


class Batch(object):
'''
Manage the execution of batch runs
Expand Down Expand Up @@ -80,23 +153,7 @@ def __gather_minions(self):
return (list(fret), ping_gen, nret.difference(fret))

def get_bnum(self):
'''
Return the active number of minions to maintain
'''
partition = lambda x: float(x) / 100.0 * len(self.minions)
try:
if '%' in self.opts['batch']:
res = partition(float(self.opts['batch'].strip('%')))
if res < 1:
return int(math.ceil(res))
else:
return int(res)
else:
return int(self.opts['batch'])
except ValueError:
if not self.quiet:
salt.utils.stringutils.print_cli('Invalid batch data sent: {0}\nData must be in the '
'form of %10, 10% or 3'.format(self.opts['batch']))
return get_bnum(self.opts, self.minions, self.quiet)

def __update_wait(self, wait):
now = datetime.now()
Expand Down
224 changes: 224 additions & 0 deletions salt/cli/batch_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# -*- coding: utf-8 -*-
'''
Execute a job on the targeted minions by using a moving window of fixed size `batch`.
'''

# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
dincamihai marked this conversation as resolved.
Show resolved Hide resolved
import tornado

# Import salt libs
import salt.client

# pylint: enable=import-error,no-name-in-module,redefined-builtin
import logging
import fnmatch

thatch45 marked this conversation as resolved.
Show resolved Hide resolved
log = logging.getLogger(__name__)

from salt.cli.batch import get_bnum, batch_get_opts, batch_get_eauth


class BatchAsync(object):
'''
Run a job on the targeted minions by using a moving window of fixed size `batch`.

``BatchAsync`` is used to execute a job on the targeted minions by keeping
the number of concurrent running minions to the size of `batch` parameter.

The control parameters are:
- batch: number/percentage of concurrent running minions
- batch_delay: minimum wait time between batches
- gather_job_timeout: `find_job` timeout
- timeout: time to wait before firing a `find_job`

When the batch stars, a `start` event is fired:
- tag: salt/batch/<batch-jid>/start
- data: {
"available_minions": self.minions,
"down_minions": self.down_minions
}

When the batch ends, an `done` event is fired:
- tag: salt/batch/<batch-jid>/done
- data: {
"available_minions": self.minions,
"down_minions": self.down_minions,
"done_minions": self.done_minions,
"timedout_minions": self.timedout_minions
}
'''
def __init__(self, parent_opts, jid_gen, clear_load):
ioloop = tornado.ioloop.IOLoop.current()
self.local = salt.client.get_local_client(parent_opts['conf_file'])
if 'gather_job_timeout' in clear_load['kwargs']:
clear_load['gather_job_timeout'] = clear_load['kwargs'].pop('gather_job_timeout')
else:
clear_load['gather_job_timeout'] = self.local.opts['gather_job_timeout']
self.batch_delay = clear_load['kwargs'].get('batch_delay', 1)
self.opts = batch_get_opts(
clear_load.pop('tgt'),
clear_load.pop('fun'),
clear_load['kwargs'].pop('batch'),
self.local.opts,
**clear_load)
self.eauth = batch_get_eauth(clear_load['kwargs'])
self.metadata = clear_load['kwargs'].get('metadata', {})
self.minions = set()
self.down_minions = set()
self.timedout_minions = set()
self.done_minions = set()
self.active = set()
self.initialized = False
self.ping_jid = jid_gen()
self.batch_jid = jid_gen()
self.find_job_jid = jid_gen()
self.find_job_returned = set()
self.event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=True,
io_loop=ioloop,
keep_loop=True)

def __set_event_handler(self):
ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid)
batch_return_pattern = 'salt/job/{0}/ret/*'.format(self.batch_jid)
find_job_return_pattern = 'salt/job/{0}/ret/*'.format(self.find_job_jid)
self.event.subscribe(ping_return_pattern, match_type='glob')
self.event.subscribe(batch_return_pattern, match_type='glob')
self.event.subscribe(find_job_return_pattern, match_type='glob')
self.event.patterns = {
(ping_return_pattern, 'ping_return'),
(batch_return_pattern, 'batch_run'),
(find_job_return_pattern, 'find_job_return')
}
self.event.set_event_handler(self.__event_handler)

def __event_handler(self, raw):
if not self.event:
return
mtag, data = self.event.unpack(raw, self.event.serial)
for (pattern, op) in self.event.patterns:
if fnmatch.fnmatch(mtag, pattern):
minion = data['id']
if op == 'ping_return':
self.minions.add(minion)
self.down_minions.remove(minion)
if not self.down_minions:
self.event.io_loop.spawn_callback(self.start_batch)
elif op == 'find_job_return':
self.find_job_returned.add(minion)
elif op == 'batch_run':
if minion in self.active:
self.active.remove(minion)
self.done_minions.add(minion)
# call later so that we maybe gather more returns
self.event.io_loop.call_later(self.batch_delay, self.schedule_next)

if self.initialized and self.done_minions == self.minions.difference(self.timedout_minions):
self.end_batch()

def _get_next(self):
to_run = self.minions.difference(
self.done_minions).difference(
self.active).difference(
self.timedout_minions)
next_batch_size = min(
len(to_run), # partial batch (all left)
self.batch_size - len(self.active) # full batch or available slots
)
return set(list(to_run)[:next_batch_size])

@tornado.gen.coroutine
def check_find_job(self, minions):
did_not_return = minions.difference(self.find_job_returned)
if did_not_return:
for minion in did_not_return:
if minion in self.find_job_returned:
self.find_job_returned.remove(minion)
if minion in self.active:
self.active.remove(minion)
self.timedout_minions.add(minion)
running = minions.difference(did_not_return).difference(self.done_minions).difference(self.timedout_minions)
if running:
self.event.io_loop.add_callback(self.find_job, running)

@tornado.gen.coroutine
def find_job(self, minions):
not_done = minions.difference(self.done_minions)
ping_return = yield self.local.run_job_async(
not_done,
'saltutil.find_job',
[self.batch_jid],
'list',
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.find_job_jid,
**self.eauth)
self.event.io_loop.call_later(
self.opts['gather_job_timeout'],
self.check_find_job,
not_done)

@tornado.gen.coroutine
def start(self):
self.__set_event_handler()
#start batching even if not all minions respond to ping
self.event.io_loop.call_later(
self.opts['gather_job_timeout'], self.start_batch)
ping_return = yield self.local.run_job_async(
self.opts['tgt'],
'test.ping',
[],
self.opts.get(
'selected_target_option',
self.opts.get('tgt_type', 'glob')
),
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.ping_jid,
metadata=self.metadata,
**self.eauth)
self.down_minions = set(ping_return['minions'])

@tornado.gen.coroutine
def start_batch(self):
if not self.initialized:
self.batch_size = get_bnum(self.opts, self.minions, True)
self.initialized = True
data = {
"available_minions": self.minions,
"down_minions": self.down_minions,
"metadata": self.metadata
}
self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid))
yield self.schedule_next()

def end_batch(self):
data = {
"available_minions": self.minions,
"down_minions": self.down_minions,
"done_minions": self.done_minions,
"timedout_minions": self.timedout_minions,
"metadata": self.metadata
}
self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid))
self.event.remove_event_handler(self.__event_handler)

@tornado.gen.coroutine
def schedule_next(self):
next_batch = self._get_next()
if next_batch:
yield self.local.run_job_async(
next_batch,
self.opts['fun'],
self.opts['arg'],
'list',
raw=self.opts.get('raw', False),
ret=self.opts.get('return', ''),
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.batch_jid,
metadata=self.metadata)
self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch))
self.active = self.active.union(next_batch)
Loading