Skip to content

Commit

Permalink
(t) replication spawn error - continued rockstor#2766
Browse files: browse the repository at this point in the history
- Complete the f-string conversion for all modified files.
  • Loading branch information
phillxnet committed Dec 21, 2023
1 parent bd9ddfd commit cd8e523
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 209 deletions.
135 changes: 53 additions & 82 deletions src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,12 @@ def _delete_receivers(self):
if r in self.remote_senders:
del self.remote_senders[r]
logger.debug(
"Receiver(%s) exited. exitcode: %s. Total "
"messages processed: %d. Removing from the list."
% (r, ecode, msg_count)
f"Receiver({r}) exited. exitcode: {ecode}. Total messages processed: {msg_count}. "
"Removing from the list."
)
else:
active_msgs.append(
"Active Receiver: %s. Total messages "
"processed: %d" % (r, msg_count)
f"Active Receiver: {r}. Total messages processed: {msg_count}"
)
for m in active_msgs:
logger.debug(m)
Expand All @@ -92,44 +90,38 @@ def _get_receiver_ip(self, replica):
appliance = Appliance.objects.get(uuid=replica.appliance)
return appliance.ip
except Exception as e:
msg = (
"Failed to get receiver ip. Is the receiver "
"appliance added?. Exception: %s" % e.__str__()
)
msg = f"Failed to get receiver ip. Is the receiver appliance added?. Exception: {e.__str__()}"
logger.error(msg)
raise Exception(msg)

def _process_send(self, replica):
sender_key = "%s_%s" % (self.uuid, replica.id)
sender_key = f"{self.uuid}_{replica.id}"
if sender_key in self.senders:
# If the sender exited but hasn't been removed from the dict,
# remove and proceed.
# If the sender exited but hasn't been removed from the dict, remove and proceed.
ecode = self.senders[sender_key].exitcode
if ecode is not None:
del self.senders[sender_key]
logger.debug(
"Sender(%s) exited. exitcode: %s. Forcing "
"removal." % (sender_key, ecode)
f"Sender({sender_key}) exited. Exitcode: {ecode}. Forcing removal."
)
else:
raise Exception(
"There is live sender for(%s). Will not start "
"a new one." % sender_key
f"There is live sender for({sender_key}). Will not start a new one."
)

receiver_ip = self._get_receiver_ip(replica)
rt_qs = ReplicaTrail.objects.filter(replica=replica).order_by("-id")
last_rt = rt_qs[0] if (len(rt_qs) > 0) else None
if last_rt is None:
logger.debug("Starting a new Sender(%s)." % sender_key)
logger.debug(f"Starting a new Sender({sender_key}).")
self.senders[sender_key] = Sender(self.uuid, receiver_ip, replica)
elif last_rt.status == "succeeded":
logger.debug("Starting a new Sender(%s)" % sender_key)
logger.debug(f"Starting a new Sender({sender_key}).")
self.senders[sender_key] = Sender(self.uuid, receiver_ip, replica, last_rt)
elif last_rt.status == "pending":
msg = (
"Replica trail shows a pending Sender(%s), but it is not "
"alive. Marking it as failed. Will not start a new one." % sender_key
f"Replica trail shows a pending Sender({sender_key}), but it is not alive. "
"Marking it as failed. Will not start a new one."
)
logger.error(msg)
data = {
Expand All @@ -151,37 +143,34 @@ def _process_send(self, replica):
num_tries = num_tries + 1
if num_tries >= self.MAX_ATTEMPTS:
msg = (
"Maximum attempts(%d) reached for Sender(%s). "
"A new one "
"will not be started and the Replica task will be "
"disabled." % (self.MAX_ATTEMPTS, sender_key)
f"Maximum attempts({self.MAX_ATTEMPTS}) reached for Sender({sender_key}). "
"A new one will not be started and the Replica task will be disabled."
)
logger.error(msg)
self.disable_replica(replica.id)
raise Exception(msg)

logger.debug(
"previous backup failed for Sender(%s). "
"Starting a new one. Attempt %d/%d."
% (sender_key, num_tries, self.MAX_ATTEMPTS)
f"previous backup failed for Sender({sender_key}). "
f"Starting a new one. Attempt {num_tries}/{self.MAX_ATTEMPTS}."
)
try:
last_success_rt = ReplicaTrail.objects.filter(
replica=replica, status="succeeded"
).latest("id")
except ReplicaTrail.DoesNotExist:
logger.debug(
"No record of last successful ReplicaTrail for "
"Sender(%s). Will start a new Full Sender." % sender_key
f"No record of last successful ReplicaTrail for Sender({sender_key}). "
f"Will start a new Full Sender."
)
last_success_rt = None
self.senders[sender_key] = Sender(
self.uuid, receiver_ip, replica, last_success_rt
)
else:
msg = (
"Unexpected ReplicaTrail status(%s) for Sender(%s). "
"Will not start a new one." % (last_rt.status, sender_key)
f"Unexpected ReplicaTrail status({last_rt.status}) for Sender({sender_key}). "
f"Will not start a new one."
)
raise Exception(msg)

Expand All @@ -201,28 +190,23 @@ def run(self):
except NetworkConnection.DoesNotExist:
self.listener_interface = "0.0.0.0"
except Exception as e:
msg = (
"Failed to fetch network interface for Listner/Broker. "
"Exception: %s" % e.__str__()
)
msg = f"Failed to fetch network interface for Listener/Broker. Exception: {e.__str__()}"
return logger.error(msg)

try:
self.uuid = Appliance.objects.get(current_appliance=True).uuid
except Exception as e:
msg = (
"Failed to get uuid of current appliance. Aborting. "
"Exception: %s" % e.__str__()
)
msg = f"Failed to get uuid of current appliance. Aborting. Exception: {e.__str__()}"
return logger.error(msg)

ctx = zmq.Context()
frontend = ctx.socket(zmq.ROUTER)
frontend.set_hwm(value=10)
frontend.bind("tcp://%s:%d" % (self.listener_interface, self.listener_port))
frontend.bind(f"tcp://{self.listener_interface}:{ self.listener_port}")

backend = ctx.socket(zmq.ROUTER)
backend.bind("ipc://%s" % settings.REPLICATION.get("ipc_socket"))
ipc_socket = settings.REPLICATION.get("ipc_socket")
backend.bind(f"ipc://{ipc_socket}")

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
Expand All @@ -247,11 +231,10 @@ def run(self):
msg_count = 0
for rs, count in self.remote_senders.items():
logger.debug(
"Active Receiver: %s. Messages processed:"
"%d" % (rs, count)
f"Active Receiver: {rs}. Messages processed: {count}"
)
if command == b"sender-ready":
logger.debug("initial greeting from %s" % address)
logger.debug(f"initial greeting from {address}")
# Start a new receiver and send the appropriate response
try:
start_nr = True
Expand All @@ -261,38 +244,36 @@ def run(self):
if ecode is not None:
del self.local_receivers[address]
logger.debug(
"Receiver(%s) exited. exitcode: "
"%s. Forcing removal from broker "
"list." % (address, ecode)
f"Receiver({address}) exited. exitcode: {ecode}. Forcing removal from broker list."
)
start_nr = True
else:
msg = (
"Receiver(%s) already exists. "
"Will not start a new one." % address
)
msg = f"Receiver({address}) already exists. Will not start a new one."
logger.error(msg)
# @todo: There may be a different way to handle
# this. For example, we can pass the message to
# the active receiver and factor into it's
# retry/robust logic. But that is for later.
# TODO: There may be a different way to handle
# this. For example, we can pass the message to
# the active receiver and factor into its
# retry/robust logic. But that is for later.
frontend.send_multipart(
[address, b"receiver-init-error", msg.encode("utf-8")]
[
address,
b"receiver-init-error",
msg.encode("utf-8"),
]
)
if start_nr:
nr = Receiver(address, msg)
nr.daemon = True
nr.start()
logger.debug("New Receiver(%s) started." % address)
logger.debug(f"New Receiver({address}) started.")
self.local_receivers[address] = nr
continue
except Exception as e:
msg = (
"Exception while starting the "
"new receiver for %s: %s" % (address, e.__str__())
)
msg = f"Exception while starting the new receiver for {address}: {e.__str__()}"
logger.error(msg)
frontend.send_multipart([address, b"receiver-init-error", msg.encode("utf-8")])
frontend.send_multipart(
[address, b"receiver-init-error", msg.encode("utf-8")]
)
else:
# do we hit hwm? is the dealer still connected?
backend.send_multipart([address, command, msg.encode("utf-8")])
Expand All @@ -301,38 +282,28 @@ def run(self):
address, command, msg = backend.recv_multipart()
if command == b"new-send":
rid = int(msg)
logger.debug("new-send request received for %d" % rid)
logger.debug(f"new-send request received for {rid}")
rcommand = b"ERROR"
try:
replica = Replica.objects.get(id=rid)
if replica.enabled:
self._process_send(replica)
msg = (
"A new Sender started successfully for "
"Replication Task(%d)." % rid
)
msg = f"A new Sender started successfully for Replication Task({rid})."
rcommand = b"SUCCESS"
else:
msg = (
"Failed to start a new Sender for "
"Replication "
"Task(%d) because it is disabled." % rid
)
msg = f"Failed to start a new Sender for Replication Task({rid}) because it is disabled."
except Exception as e:
msg = (
"Failed to start a new Sender for Replication "
"Task(%d). Exception: %s" % (rid, e.__str__())
)
msg = f"Failed to start a new Sender for Replication Task({rid}). Exception: {e.__str__()}"
logger.error(msg)
finally:
backend.send_multipart([address, rcommand, msg.encode("utf-8")])
elif address in self.remote_senders:
if command in (
b"receiver-ready",
b"receiver-error",
b"btrfs-recv-finished",
): # noqa E501
logger.debug("Identitiy: %s command: %s" % (address, command))
if (
command == b"receiver-ready"
or command == b"receiver-error"
or command == b"btrfs-recv-finished"
):
logger.debug(f"Identity: {address} command: {command}")
backend.send_multipart([address, b"ACK", b""])
# a new receiver has started. reply to the sender that
# must be waiting
Expand Down
Loading

0 comments on commit cd8e523

Please sign in to comment.