Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3006.x] deltaproxy, scheduler and event bus fixes #64182

Merged
merged 13 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/64102.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Update all the scheduler functions to include a fire_event argument which will determine whether to fire the completion event onto the event bus.
This event is only used when these functions are called via the schedule execution modules.
Update all the calls to the schedule related functions in the deltaproxy proxy minion to include fire_event=False, as the event bus is not available when these functions are called.
3 changes: 3 additions & 0 deletions changelog/64103.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Update all the scheduler functions to include a fire_event argument which will determine whether to fire the completion event onto the event bus.
This event is only used when these functions are called via the schedule execution modules.
Update all the calls to the schedule related functions in the deltaproxy proxy minion to include fire_event=False, as the event bus is not available when these functions are called.
73 changes: 55 additions & 18 deletions salt/metaproxy/deltaproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,11 @@ def post_master_init(self, master):
}
},
persist=True,
fire_event=False,
)
log.info("Added mine.update to scheduler")
else:
self.schedule.delete_job("__mine_interval", persist=True)
self.schedule.delete_job("__mine_interval", persist=True, fire_event=False)

# add master_alive job if enabled
if self.opts["transport"] != "tcp" and self.opts["master_alive_interval"] > 0:
Expand All @@ -250,6 +251,7 @@ def post_master_init(self, master):
}
},
persist=True,
fire_event=False,
)
if (
self.opts["master_failback"]
Expand All @@ -268,18 +270,24 @@ def post_master_init(self, master):
}
},
persist=True,
fire_event=False,
)
else:
self.schedule.delete_job(
salt.minion.master_event(type="failback"), persist=True
salt.minion.master_event(type="failback"),
persist=True,
fire_event=False,
)
else:
self.schedule.delete_job(
salt.minion.master_event(type="alive", master=self.opts["master"]),
persist=True,
fire_event=False,
)
self.schedule.delete_job(
salt.minion.master_event(type="failback"), persist=True
salt.minion.master_event(type="failback"),
persist=True,
fire_event=False,
)

# proxy keepalive
Expand All @@ -304,10 +312,15 @@ def post_master_init(self, master):
}
},
persist=True,
fire_event=False,
)
self.schedule.enable_schedule()
self.schedule.enable_schedule(fire_event=False)
else:
self.schedule.delete_job("__proxy_keepalive", persist=True)
self.schedule.delete_job(
"__proxy_keepalive",
persist=True,
fire_event=False,
)

# Sync the grains here so the proxy can communicate them to the master
self.functions["saltutil.sync_grains"](saltenv="base")
Expand All @@ -321,23 +334,34 @@ def post_master_init(self, master):
self.proxy_context = {}
self.add_periodic_callback("cleanup", self.cleanup_subprocesses)

_failed = list()
if self.opts["proxy"].get("parallel_startup"):
log.debug("Initiating parallel startup for proxies")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
futures = {
executor.submit(
subproxy_post_master_init,
_id,
uid,
self.opts,
self.proxy,
self.utils,
)
): _id
for _id in self.opts["proxy"].get("ids", [])
]
}

for f in concurrent.futures.as_completed(futures):
sub_proxy_data = f.result()
for future in concurrent.futures.as_completed(futures):
try:
sub_proxy_data = future.result()
except Exception as exc: # pylint: disable=broad-except
_id = futures[future]
log.info(
"An exception occured during initialization for %s, skipping: %s",
_id,
exc,
)
_failed.append(_id)
continue
minion_id = sub_proxy_data["proxy_opts"].get("id")

if sub_proxy_data["proxy_minion"]:
Expand All @@ -347,16 +371,24 @@ def post_master_init(self, master):
if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]:
self.deltaproxy_objs[
minion_id
].req_channel = salt.transport.client.AsyncReqChannel.factory(
].req_channel = salt.channel.client.AsyncReqChannel.factory(
sub_proxy_data["proxy_opts"], io_loop=self.io_loop
)
else:
log.debug("Initiating non-parallel startup for proxies")
for _id in self.opts["proxy"].get("ids", []):
sub_proxy_data = subproxy_post_master_init(
_id, uid, self.opts, self.proxy, self.utils
)

try:
sub_proxy_data = subproxy_post_master_init(
_id, uid, self.opts, self.proxy, self.utils
)
except Exception as exc: # pylint: disable=broad-except
log.info(
"An exception occured during initialization for %s, skipping: %s",
_id,
exc,
)
_failed.append(_id)
continue
minion_id = sub_proxy_data["proxy_opts"].get("id")

if sub_proxy_data["proxy_minion"]:
Expand All @@ -366,10 +398,12 @@ def post_master_init(self, master):
if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]:
self.deltaproxy_objs[
minion_id
].req_channel = salt.transport.client.AsyncReqChannel.factory(
].req_channel = salt.channel.client.AsyncReqChannel.factory(
sub_proxy_data["proxy_opts"], io_loop=self.io_loop
)

if _failed:
log.info("Following sub proxies failed %s", _failed)
self.ready = True


Expand Down Expand Up @@ -535,10 +569,13 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils):
}
},
persist=True,
fire_event=False,
)
_proxy_minion.schedule.enable_schedule()
_proxy_minion.schedule.enable_schedule(fire_event=False)
else:
_proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True)
_proxy_minion.schedule.delete_job(
"__proxy_keepalive", persist=True, fire_event=False
)

return {"proxy_minion": _proxy_minion, "proxy_opts": proxyopts}

Expand Down
Loading