diff --git a/changelog/64102.fixed.md b/changelog/64102.fixed.md
new file mode 100644
index 000000000000..09d14ab16cb5
--- /dev/null
+++ b/changelog/64102.fixed.md
@@ -0,0 +1,3 @@
+Update all the scheduler functions to include a fire_event argument that determines whether the completion event is fired onto the event bus.
+This event is only used when these functions are called via the schedule execution modules.
+Update all calls to the schedule-related functions in the deltaproxy proxy minion to pass fire_event=False, as the event bus is not available when these functions are called.
diff --git a/changelog/64103.fixed.md b/changelog/64103.fixed.md
new file mode 100644
index 000000000000..09d14ab16cb5
--- /dev/null
+++ b/changelog/64103.fixed.md
@@ -0,0 +1,3 @@
+Update all the scheduler functions to include a fire_event argument that determines whether the completion event is fired onto the event bus.
+This event is only used when these functions are called via the schedule execution modules.
+Update all calls to the schedule-related functions in the deltaproxy proxy minion to pass fire_event=False, as the event bus is not available when these functions are called.
diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py
index c3003b368f7b..d866d6f4c1de 100644
--- a/salt/metaproxy/deltaproxy.py
+++ b/salt/metaproxy/deltaproxy.py
@@ -231,10 +231,11 @@ def post_master_init(self, master):
                 }
             },
             persist=True,
+            fire_event=False,
         )
         log.info("Added mine.update to scheduler")
     else:
-        self.schedule.delete_job("__mine_interval", persist=True)
+        self.schedule.delete_job("__mine_interval", persist=True, fire_event=False)

     # add master_alive job if enabled
     if self.opts["transport"] != "tcp" and self.opts["master_alive_interval"] > 0:
@@ -250,6 +251,7 @@ def post_master_init(self, master):
                 }
             },
             persist=True,
+            fire_event=False,
         )
         if (
             self.opts["master_failback"]
@@ -268,18 +270,24 @@ def post_master_init(self, master):
                     }
                 },
                 persist=True,
+                fire_event=False,
             )
         else:
             self.schedule.delete_job(
-                salt.minion.master_event(type="failback"), persist=True
+                salt.minion.master_event(type="failback"),
+                persist=True,
+                fire_event=False,
             )
     else:
         self.schedule.delete_job(
             salt.minion.master_event(type="alive", master=self.opts["master"]),
             persist=True,
+            fire_event=False,
         )
         self.schedule.delete_job(
-            salt.minion.master_event(type="failback"), persist=True
+            salt.minion.master_event(type="failback"),
+            persist=True,
+            fire_event=False,
         )

     # proxy keepalive
@@ -304,10 +312,15 @@ def post_master_init(self, master):
                 }
             },
             persist=True,
+            fire_event=False,
         )
-        self.schedule.enable_schedule()
+        self.schedule.enable_schedule(fire_event=False)
     else:
-        self.schedule.delete_job("__proxy_keepalive", persist=True)
+        self.schedule.delete_job(
+            "__proxy_keepalive",
+            persist=True,
+            fire_event=False,
+        )

     # Sync the grains here so the proxy can communicate them to the master
     self.functions["saltutil.sync_grains"](saltenv="base")
@@ -321,10 +334,11 @@ def post_master_init(self, master):
     self.proxy_context = {}
     self.add_periodic_callback("cleanup", self.cleanup_subprocesses)

+    _failed = list()
     if self.opts["proxy"].get("parallel_startup"):
         log.debug("Initiating parallel startup for proxies")
         with concurrent.futures.ThreadPoolExecutor() as executor:
-            futures = [
+            futures = {
                 executor.submit(
                     subproxy_post_master_init,
                     _id,
@@ -332,12 +346,22 @@ def post_master_init(self, master):
                     self.opts,
                     self.proxy,
                     self.utils,
-                )
+                ): _id
                 for _id in self.opts["proxy"].get("ids", [])
-            ]
+            }

-            for f in concurrent.futures.as_completed(futures):
-                sub_proxy_data = f.result()
+            for future in concurrent.futures.as_completed(futures):
+                try:
+                    sub_proxy_data = future.result()
+                except Exception as exc:  # pylint: disable=broad-except
+                    _id = futures[future]
+                    log.info(
+                        "An exception occurred during initialization for %s, skipping: %s",
+                        _id,
+                        exc,
+                    )
+                    _failed.append(_id)
+                    continue
                 minion_id = sub_proxy_data["proxy_opts"].get("id")

                 if sub_proxy_data["proxy_minion"]:
@@ -347,16 +371,24 @@ def post_master_init(self, master):
                     if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]:
                         self.deltaproxy_objs[
                             minion_id
-                        ].req_channel = salt.transport.client.AsyncReqChannel.factory(
+                        ].req_channel = salt.channel.client.AsyncReqChannel.factory(
                             sub_proxy_data["proxy_opts"], io_loop=self.io_loop
                         )
     else:
         log.debug("Initiating non-parallel startup for proxies")
         for _id in self.opts["proxy"].get("ids", []):
-            sub_proxy_data = subproxy_post_master_init(
-                _id, uid, self.opts, self.proxy, self.utils
-            )
-
+            try:
+                sub_proxy_data = subproxy_post_master_init(
+                    _id, uid, self.opts, self.proxy, self.utils
+                )
+            except Exception as exc:  # pylint: disable=broad-except
+                log.info(
+                    "An exception occurred during initialization for %s, skipping: %s",
+                    _id,
+                    exc,
+                )
+                _failed.append(_id)
+                continue
             minion_id = sub_proxy_data["proxy_opts"].get("id")

             if sub_proxy_data["proxy_minion"]:
@@ -366,10 +398,12 @@ def post_master_init(self, master):
             if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]:
                 self.deltaproxy_objs[
                     minion_id
-                ].req_channel = salt.transport.client.AsyncReqChannel.factory(
+                ].req_channel = salt.channel.client.AsyncReqChannel.factory(
                     sub_proxy_data["proxy_opts"], io_loop=self.io_loop
                 )

+    if _failed:
+        log.info("The following sub proxies failed to initialize: %s", _failed)
     self.ready = True


@@ -535,10 +569,13 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils):
                 }
             },
             persist=True,
+            fire_event=False,
         )
-        _proxy_minion.schedule.enable_schedule()
+        _proxy_minion.schedule.enable_schedule(fire_event=False)
     else:
-        _proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True)
+        _proxy_minion.schedule.delete_job(
+            "__proxy_keepalive", persist=True, fire_event=False
+        )

     return {"proxy_minion": _proxy_minion, "proxy_opts": proxyopts}

diff --git a/salt/utils/schedule.py b/salt/utils/schedule.py
index 814c2980d4af..6565dda59e6e 100644
--- a/salt/utils/schedule.py
+++ b/salt/utils/schedule.py
@@ -315,7 +315,7 @@ def persist(self):
                 exc_info_on_loglevel=logging.DEBUG,
             )

-    def delete_job(self, name, persist=True):
+    def delete_job(self, name, persist=True, fire_event=True):
         """
         Deletes a job from the scheduler. Ignore jobs from pillar
         """
@@ -325,12 +325,15 @@ def delete_job(self, name, persist=True):
         elif name in self._get_schedule(include_opts=False):
             log.warning("Cannot delete job %s, it's in the pillar!", name)

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_delete_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_delete_complete",
+                )

         # remove from self.intervals
         if name in self.intervals:
@@ -349,7 +352,7 @@ def reset(self):
         self.splay = None
         self.opts["schedule"] = {}

-    def delete_job_prefix(self, name, persist=True):
+    def delete_job_prefix(self, name, persist=True, fire_event=True):
         """
         Deletes a job from the scheduler. Ignores jobs from pillar
         """
@@ -361,12 +364,15 @@ def delete_job_prefix(self, name, persist=True):
             if job.startswith(name):
                 log.warning("Cannot delete job %s, it's in the pillar!", job)

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_delete_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_delete_complete",
+                )

         # remove from self.intervals
         for job in list(self.intervals.keys()):
@@ -376,7 +382,7 @@ def delete_job_prefix(self, name, persist=True):
         if persist:
             self.persist()

-    def add_job(self, data, persist=True):
+    def add_job(self, data, persist=True, fire_event=True):
         """
         Adds a new job to the scheduler. The format is the same as required in
         the configuration file. See the docs on how YAML is interpreted into
@@ -410,16 +416,19 @@ def add_job(self, data, persist=True):
         self.opts["schedule"].update(data)

         # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_add_complete",
-            )
+        if fire_event:
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_add_complete",
+                )

         if persist:
             self.persist()

-    def enable_job(self, name, persist=True):
+    def enable_job(self, name, persist=True, fire_event=True):
         """
         Enable a job in the scheduler. Ignores jobs from pillar
         """
@@ -430,17 +439,20 @@ def enable_job(self, name, persist=True):
         elif name in self._get_schedule(include_opts=False):
             log.warning("Cannot modify job %s, it's in the pillar!", name)

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_enabled_job_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_enabled_job_complete",
+                )

         if persist:
             self.persist()

-    def disable_job(self, name, persist=True):
+    def disable_job(self, name, persist=True, fire_event=True):
         """
         Disable a job in the scheduler. Ignores jobs from pillar
         """
@@ -451,23 +463,26 @@ def disable_job(self, name, persist=True):
         elif name in self._get_schedule(include_opts=False):
             log.warning("Cannot modify job %s, it's in the pillar!", name)

-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            # Fire the complete event back along with updated list of schedule
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_disabled_job_complete",
-            )
+        if fire_event:
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                # Fire the complete event back along with updated list of schedule
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_disabled_job_complete",
+                )

         if persist:
             self.persist()

-    def modify_job(self, name, schedule, persist=True):
+    def modify_job(self, name, schedule, persist=True, fire_event=True):
         """
         Modify a job in the scheduler. Ignores jobs from pillar
         """
         # ensure job exists, then replace it
         if name in self.opts["schedule"]:
-            self.delete_job(name, persist)
+            self.delete_job(name, persist, fire_event)
         elif name in self._get_schedule(include_opts=False):
             log.warning("Cannot modify job %s, it's in the pillar!", name)
             return
@@ -511,34 +526,40 @@ def run_job(self, name):
         log.info("Running Job: %s", name)
         self._run_job(func, data)

-    def enable_schedule(self, persist=True):
+    def enable_schedule(self, persist=True, fire_event=True):
         """
         Enable the scheduler.
         """
         self.opts["schedule"]["enabled"] = True

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_enabled_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_enabled_complete",
+                )

         if persist:
             self.persist()

-    def disable_schedule(self, persist=True):
+    def disable_schedule(self, persist=True, fire_event=True):
         """
         Disable the scheduler.
         """
         self.opts["schedule"]["enabled"] = False

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_disabled_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_disabled_complete",
+                )

         if persist:
             self.persist()
@@ -554,7 +575,7 @@ def reload(self, schedule):
             schedule = schedule["schedule"]
         self.opts.setdefault("schedule", {}).update(schedule)

-    def list(self, where):
+    def list(self, where, fire_event=True):
         """
         List the current schedule items
         """
@@ -565,24 +586,32 @@ def list(self, where):
         else:
             schedule = self._get_schedule()

-        # Fire the complete event back along with the list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": schedule},
-                tag="/salt/minion/minion_schedule_list_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with the list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": schedule},
+                    tag="/salt/minion/minion_schedule_list_complete",
+                )

-    def save_schedule(self):
+    def save_schedule(self, fire_event=True):
         """
         Save the current schedule
         """
         self.persist()

-        # Fire the complete event back along with the list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event({"complete": True}, tag="/salt/minion/minion_schedule_saved")
+        if fire_event:
+            # Fire the complete event back along with the list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True}, tag="/salt/minion/minion_schedule_saved"
+                )

-    def postpone_job(self, name, data):
+    def postpone_job(self, name, data, fire_event=True):
         """
         Postpone a job in the scheduler.
         Ignores jobs from pillar
@@ -608,14 +637,17 @@ def postpone_job(self, name, data):
         elif name in self._get_schedule(include_opts=False):
             log.warning("Cannot modify job %s, it's in the pillar!", name)

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_postpone_job_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_postpone_job_complete",
+                )

-    def skip_job(self, name, data):
+    def skip_job(self, name, data, fire_event=True):
         """
         Skip a job at a specific time in the scheduler.
         Ignores jobs from pillar
@@ -634,14 +666,17 @@ def skip_job(self, name, data):
         elif name in self._get_schedule(include_opts=False):
             log.warning("Cannot modify job %s, it's in the pillar!", name)

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "schedule": self._get_schedule()},
-                tag="/salt/minion/minion_schedule_skip_job_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "schedule": self._get_schedule()},
+                    tag="/salt/minion/minion_schedule_skip_job_complete",
+                )

-    def get_next_fire_time(self, name, fmt="%Y-%m-%dT%H:%M:%S"):
+    def get_next_fire_time(self, name, fmt="%Y-%m-%dT%H:%M:%S", fire_event=True):
         """
         Return the next fire time for the specified job
         """
@@ -653,12 +688,15 @@ def get_next_fire_time(self, name, fmt="%Y-%m-%dT%H:%M:%S"):
         if _next_fire_time:
             _next_fire_time = _next_fire_time.strftime(fmt)

-        # Fire the complete event back along with updated list of schedule
-        with salt.utils.event.get_event("minion", opts=self.opts, listen=False) as evt:
-            evt.fire_event(
-                {"complete": True, "next_fire_time": _next_fire_time},
-                tag="/salt/minion/minion_schedule_next_fire_time_complete",
-            )
+        if fire_event:
+            # Fire the complete event back along with updated list of schedule
+            with salt.utils.event.get_event(
+                "minion", opts=self.opts, listen=False
+            ) as evt:
+                evt.fire_event(
+                    {"complete": True, "next_fire_time": _next_fire_time},
+                    tag="/salt/minion/minion_schedule_next_fire_time_complete",
+                )

     def job_status(self, name, fire_event=False):
         """
diff --git a/tests/pytests/functional/cli/test_salt_deltaproxy.py b/tests/pytests/functional/cli/test_salt_deltaproxy.py
new file mode 100644
index 000000000000..5bc7604c84ab
--- /dev/null
+++ b/tests/pytests/functional/cli/test_salt_deltaproxy.py
@@ -0,0 +1,225 @@
+"""
+:codeauthor: Gareth J. Greenaway (ggreenaway@vmware.com)
+"""
+
+import logging
+import os
+import random
+
+import pytest
+from saltfactories.utils import random_string
+
+import salt.defaults.exitcodes
+from tests.support.helpers import PRE_PYTEST_SKIP_REASON
+
+log = logging.getLogger(__name__)
+
+
+pytestmark = [
+    pytest.mark.skip_on_spawning_platform(
+        reason="Deltaproxy minions do not currently work on spawning platforms.",
+    ),
+    pytest.mark.core_test,
+]
+
+
+@pytest.fixture(scope="package")
+def salt_master(salt_factories):
+    config_defaults = {
+        "open_mode": True,
+    }
+    salt_master = salt_factories.salt_master_daemon(
+        "deltaproxy-functional-master", defaults=config_defaults
+    )
+    with salt_master.started():
+        yield salt_master
+
+
+@pytest.fixture(scope="package")
+def salt_cli(salt_master):
+    """
+    The ``salt`` CLI as a fixture against the running master
+    """
+    assert salt_master.is_running()
+    return salt_master.salt_cli(timeout=30)
+
+
+@pytest.fixture(scope="package", autouse=True)
+def skip_on_tcp_transport(request):
+    if request.config.getoption("--transport") == "tcp":
+        pytest.skip("Deltaproxy under the TCP transport is not working. See #61367")
+
+
+@pytest.fixture
+def proxy_minion_id(salt_master):
+    _proxy_minion_id = random_string("proxy-minion-")
+
+    try:
+        yield _proxy_minion_id
+    finally:
+        # Remove stale key if it exists
+        pytest.helpers.remove_stale_minion_key(salt_master, _proxy_minion_id)
+
+
+def clear_proxy_minions(salt_master, proxy_minion_id):
+    for proxy in [proxy_minion_id, "dummy_proxy_one", "dummy_proxy_two"]:
+        pytest.helpers.remove_stale_minion_key(salt_master, proxy)
+
+        cachefile = os.path.join(
+            salt_master.config["cachedir"], "{}.cache".format(proxy)
+        )
+        if os.path.exists(cachefile):
+            os.unlink(cachefile)
+
+
+# Hangs on Windows. You can add a timeout to the proxy.run command, but then
+# it just times out.
+@pytest.mark.skip_on_windows(reason=PRE_PYTEST_SKIP_REASON)
+@pytest.mark.parametrize(
+    "parallel_startup",
+    [True, False],
+    ids=["parallel_startup=True", "parallel_startup=False"],
+)
+def test_exit_status_correct_usage_large_number_of_minions(
+    salt_master,
+    salt_cli,
+    proxy_minion_id,
+    parallel_startup,
+):
+    """
+    Ensure that the salt-proxy control proxy starts and is able to respond
+    to test.ping; additionally, ensure that the proxies being controlled
+    also respond to test.ping.
+
+    Finally, ensure the correct exit status when salt-proxy exits correctly.
+
+    Skipped on Windows because daemonization is not supported.
+    """
+
+    config_defaults = {
+        "metaproxy": "deltaproxy",
+    }
+    sub_proxies = [
+        "proxy_one",
+        "proxy_two",
+        "proxy_three",
+        "proxy_four",
+        "proxy_five",
+        "proxy_six",
+        "proxy_seven",
+        "proxy_eight",
+        "proxy_nine",
+        "proxy_ten",
+        "proxy_eleven",
+        "proxy_twelve",
+        "proxy_thirteen",
+        "proxy_fourteen",
+        "proxy_fifteen",
+        "proxy_sixteen",
+        "proxy_seventeen",
+        "proxy_eighteen",
+        "proxy_nineteen",
+        "proxy_twenty",
+        "proxy_twenty_one",
+        "proxy_twenty_two",
+        "proxy_twenty_three",
+        "proxy_twenty_four",
+        "proxy_twenty_five",
+        "proxy_twenty_six",
+        "proxy_twenty_seven",
+        "proxy_twenty_eight",
+        "proxy_twenty_nine",
+        "proxy_thirty",
+        "proxy_thirty_one",
+        "proxy_thirty_two",
+    ]
+
+    top_file = """
+    base:
+      {control}:
+        - controlproxy
+    """.format(
+        control=proxy_minion_id,
+    )
+    controlproxy_pillar_file = """
+    proxy:
+      proxytype: deltaproxy
+      parallel_startup: {}
+      ids:
+    """.format(
+        parallel_startup
+    )
+
+    dummy_proxy_pillar_file = """
+    proxy:
+      proxytype: dummy
+    """
+
+    for minion_id in sub_proxies:
+        top_file += """
+      {minion_id}:
+        - dummy""".format(
+            minion_id=minion_id,
+        )
+
+        controlproxy_pillar_file += """
+        - {}
+        """.format(
+            minion_id,
+        )
+
+    top_tempfile = salt_master.pillar_tree.base.temp_file("top.sls", top_file)
+    controlproxy_tempfile = salt_master.pillar_tree.base.temp_file(
+        "controlproxy.sls", controlproxy_pillar_file
+    )
+    dummy_proxy_tempfile = salt_master.pillar_tree.base.temp_file(
+        "dummy.sls",
+        dummy_proxy_pillar_file,
+    )
+    with top_tempfile, controlproxy_tempfile, dummy_proxy_tempfile:
+        with salt_master.started():
+            assert salt_master.is_running()
+
+            factory = salt_master.salt_proxy_minion_daemon(
+                proxy_minion_id,
+                defaults=config_defaults,
+                extra_cli_arguments_after_first_start_failure=["--log-level=info"],
+                start_timeout=240,
+            )
+
+            for minion_id in [proxy_minion_id] + sub_proxies:
+                factory.before_start(
+                    pytest.helpers.remove_stale_proxy_minion_cache_file,
+                    factory,
+                    minion_id,
+                )
+                factory.after_terminate(
+                    pytest.helpers.remove_stale_minion_key, salt_master, minion_id
+                )
+                factory.after_terminate(
+                    pytest.helpers.remove_stale_proxy_minion_cache_file,
+                    factory,
+                    minion_id,
+                )
+
+            with factory.started():
+                assert factory.is_running()
+
+                # Let's issue a ping to the control proxy
+                ret = salt_cli.run("test.ping", minion_tgt=proxy_minion_id)
+                assert ret.returncode == 0
+                assert ret.data is True
+
+                for minion_id in random.sample(sub_proxies, 4):
+                    # Let's issue a ping to one of the controlled proxies
+                    ret = salt_cli.run("test.ping", minion_tgt=minion_id)
+                    assert ret.returncode == 0
+                    assert ret.data is True
+
+            # Terminate the proxy minion
+            ret = factory.terminate()
+            assert ret.returncode == salt.defaults.exitcodes.EX_OK, ret
+
+        # Terminate the salt master
+        ret = salt_master.terminate()
+        assert ret.returncode == salt.defaults.exitcodes.EX_OK, ret
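
Usage note (not part of the patch): the sketch below illustrates the calling convention this diff introduces for the schedule helpers. The `schedule` object and the job payload are placeholders for an existing `salt.utils.schedule.Schedule` instance (for example, a minion's `self.schedule` as used above); only the method names and the `fire_event` keyword come from the patch itself.

    # Sketch only: "schedule" stands in for an existing Schedule instance,
    # e.g. self.schedule on a (proxy) minion; the job payload is illustrative.
    job = {
        "__mine_interval": {
            "function": "mine.update",
            "minutes": 60,
            "jid_include": True,
            "maxrunning": 2,
        }
    }

    # Default behaviour is unchanged: the completion event is fired onto the
    # event bus, which is what the schedule execution module listens for.
    schedule.add_job(job, persist=True)

    # New with this patch: suppress the completion event when no event bus is
    # available, as the deltaproxy post_master_init path now does.
    schedule.add_job(job, persist=True, fire_event=False)
    schedule.enable_schedule(fire_event=False)
    schedule.delete_job("__mine_interval", persist=True, fire_event=False)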