From cd8e5239f2032680d6d12f6b383097a2d279a8c9 Mon Sep 17 00:00:00 2001 From: Philip Guyton Date: Thu, 21 Dec 2023 18:55:25 +0000 Subject: [PATCH] (t) replication spawn error - continued #2766 - complete fstrings conversion for all files modified. --- .../replication/listener_broker.py | 135 +++++++----------- .../smart_manager/replication/receiver.py | 78 ++++------ .../smart_manager/replication/sender.py | 100 ++++--------- .../smart_manager/replication/util.py | 8 +- 4 files changed, 112 insertions(+), 209 deletions(-) diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index 47f8fd858..3f4f191ad 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -73,14 +73,12 @@ def _delete_receivers(self): if r in self.remote_senders: del self.remote_senders[r] logger.debug( - "Receiver(%s) exited. exitcode: %s. Total " - "messages processed: %d. Removing from the list." - % (r, ecode, msg_count) + f"Receiver({r}) exited. exitcode: {ecode}. Total messages processed: {msg_count}. " + "Removing from the list." ) else: active_msgs.append( - "Active Receiver: %s. Total messages " - "processed: %d" % (r, msg_count) + f"Active Receiver: {r}. Total messages processed: {msg_count}" ) for m in active_msgs: logger.debug(m) @@ -92,44 +90,38 @@ def _get_receiver_ip(self, replica): appliance = Appliance.objects.get(uuid=replica.appliance) return appliance.ip except Exception as e: - msg = ( - "Failed to get receiver ip. Is the receiver " - "appliance added?. Exception: %s" % e.__str__() - ) + msg = f"Failed to get receiver ip. Is the receiver appliance added?. 
Exception: {e.__str__()}" logger.error(msg) raise Exception(msg) def _process_send(self, replica): - sender_key = "%s_%s" % (self.uuid, replica.id) + sender_key = f"{self.uuid}_{replica.id}" if sender_key in self.senders: - # If the sender exited but hasn't been removed from the dict, - # remove and proceed. + # If the sender exited but hasn't been removed from the dict, remove and proceed. ecode = self.senders[sender_key].exitcode if ecode is not None: del self.senders[sender_key] logger.debug( - "Sender(%s) exited. exitcode: %s. Forcing " - "removal." % (sender_key, ecode) + f"Sender({sender_key}) exited. Exitcode: {ecode}. Forcing removal." ) else: raise Exception( - "There is live sender for(%s). Will not start " - "a new one." % sender_key + f"There is live sender for({sender_key}). Will not start a new one." ) receiver_ip = self._get_receiver_ip(replica) rt_qs = ReplicaTrail.objects.filter(replica=replica).order_by("-id") last_rt = rt_qs[0] if (len(rt_qs) > 0) else None if last_rt is None: - logger.debug("Starting a new Sender(%s)." % sender_key) + logger.debug(f"Starting a new Sender({sender_key}).") self.senders[sender_key] = Sender(self.uuid, receiver_ip, replica) elif last_rt.status == "succeeded": - logger.debug("Starting a new Sender(%s)" % sender_key) + logger.debug(f"Starting a new Sender({sender_key}).") self.senders[sender_key] = Sender(self.uuid, receiver_ip, replica, last_rt) elif last_rt.status == "pending": msg = ( - "Replica trail shows a pending Sender(%s), but it is not " - "alive. Marking it as failed. Will not start a new one." % sender_key + f"Replica trail shows a pending Sender({sender_key}), but it is not alive. " + "Marking it as failed. Will not start a new one." ) logger.error(msg) data = { @@ -151,19 +143,16 @@ def _process_send(self, replica): num_tries = num_tries + 1 if num_tries >= self.MAX_ATTEMPTS: msg = ( - "Maximum attempts(%d) reached for Sender(%s). 
" - "A new one " - "will not be started and the Replica task will be " - "disabled." % (self.MAX_ATTEMPTS, sender_key) + f"Maximum attempts({self.MAX_ATTEMPTS}) reached for Sender({sender_key}). " + "A new one will not be started and the Replica task will be disabled." ) logger.error(msg) self.disable_replica(replica.id) raise Exception(msg) logger.debug( - "previous backup failed for Sender(%s). " - "Starting a new one. Attempt %d/%d." - % (sender_key, num_tries, self.MAX_ATTEMPTS) + f"previous backup failed for Sender({sender_key}). " + f"Starting a new one. Attempt {num_tries}/{self.MAX_ATTEMPTS}." ) try: last_success_rt = ReplicaTrail.objects.filter( @@ -171,8 +160,8 @@ def _process_send(self, replica): ).latest("id") except ReplicaTrail.DoesNotExist: logger.debug( - "No record of last successful ReplicaTrail for " - "Sender(%s). Will start a new Full Sender." % sender_key + f"No record of last successful ReplicaTrail for Sender({sender_key}). " + f"Will start a new Full Sender." ) last_success_rt = None self.senders[sender_key] = Sender( @@ -180,8 +169,8 @@ def _process_send(self, replica): ) else: msg = ( - "Unexpected ReplicaTrail status(%s) for Sender(%s). " - "Will not start a new one." % (last_rt.status, sender_key) + f"Unexpected ReplicaTrail status({last_rt.status}) for Sender({sender_key}). " + f"Will not start a new one." ) raise Exception(msg) @@ -201,28 +190,23 @@ def run(self): except NetworkConnection.DoesNotExist: self.listener_interface = "0.0.0.0" except Exception as e: - msg = ( - "Failed to fetch network interface for Listner/Broker. " - "Exception: %s" % e.__str__() - ) + msg = f"Failed to fetch network interface for Listener/Broker. Exception: {e.__str__()}" return logger.error(msg) try: self.uuid = Appliance.objects.get(current_appliance=True).uuid except Exception as e: - msg = ( - "Failed to get uuid of current appliance. Aborting. " - "Exception: %s" % e.__str__() - ) + msg = f"Failed to get uuid of current appliance. Aborting. 
Exception: {e.__str__()}" return logger.error(msg) ctx = zmq.Context() frontend = ctx.socket(zmq.ROUTER) frontend.set_hwm(value=10) - frontend.bind("tcp://%s:%d" % (self.listener_interface, self.listener_port)) + frontend.bind(f"tcp://{self.listener_interface}:{ self.listener_port}") backend = ctx.socket(zmq.ROUTER) - backend.bind("ipc://%s" % settings.REPLICATION.get("ipc_socket")) + ipc_socket = settings.REPLICATION.get("ipc_socket") + backend.bind(f"ipc://{ipc_socket}") poller = zmq.Poller() poller.register(frontend, zmq.POLLIN) @@ -247,11 +231,10 @@ def run(self): msg_count = 0 for rs, count in self.remote_senders.items(): logger.debug( - "Active Receiver: %s. Messages processed:" - "%d" % (rs, count) + f"Active Receiver: {rs}. Messages processed: {count}" ) if command == b"sender-ready": - logger.debug("initial greeting from %s" % address) + logger.debug(f"initial greeting from {address}") # Start a new receiver and send the appropriate response try: start_nr = True @@ -261,38 +244,36 @@ def run(self): if ecode is not None: del self.local_receivers[address] logger.debug( - "Receiver(%s) exited. exitcode: " - "%s. Forcing removal from broker " - "list." % (address, ecode) + f"Receiver({address}) exited. exitcode: {ecode}. Forcing removal from broker list." ) start_nr = True else: - msg = ( - "Receiver(%s) already exists. " - "Will not start a new one." % address - ) + msg = f"Receiver({address}) already exists. Will not start a new one." logger.error(msg) - # @todo: There may be a different way to handle - # this. For example, we can pass the message to - # the active receiver and factor into it's - # retry/robust logic. But that is for later. + # TODO: There may be a different way to handle + # this. For example, we can pass the message to + # the active receiver and factor into its + # retry/robust logic. But that is for later. 
frontend.send_multipart( - [address, b"receiver-init-error", msg.encode("utf-8")] + [ + address, + b"receiver-init-error", + msg.encode("utf-8"), + ] ) if start_nr: nr = Receiver(address, msg) nr.daemon = True nr.start() - logger.debug("New Receiver(%s) started." % address) + logger.debug(f"New Receiver({address}) started.") self.local_receivers[address] = nr continue except Exception as e: - msg = ( - "Exception while starting the " - "new receiver for %s: %s" % (address, e.__str__()) - ) + msg = f"Exception while starting the new receiver for {address}: {e.__str__()}" logger.error(msg) - frontend.send_multipart([address, b"receiver-init-error", msg.encode("utf-8")]) + frontend.send_multipart( + [address, b"receiver-init-error", msg.encode("utf-8")] + ) else: # do we hit hwm? is the dealer still connected? backend.send_multipart([address, command, msg.encode("utf-8")]) @@ -301,38 +282,28 @@ def run(self): address, command, msg = backend.recv_multipart() if command == b"new-send": rid = int(msg) - logger.debug("new-send request received for %d" % rid) + logger.debug(f"new-send request received for {rid}") rcommand = b"ERROR" try: replica = Replica.objects.get(id=rid) if replica.enabled: self._process_send(replica) - msg = ( - "A new Sender started successfully for " - "Replication Task(%d)." % rid - ) + msg = f"A new Sender started successfully for Replication Task({rid})." rcommand = b"SUCCESS" else: - msg = ( - "Failed to start a new Sender for " - "Replication " - "Task(%d) because it is disabled." % rid - ) + msg = f"Failed to start a new Sender for Replication Task({rid}) because it is disabled." except Exception as e: - msg = ( - "Failed to start a new Sender for Replication " - "Task(%d). Exception: %s" % (rid, e.__str__()) - ) + msg = f"Failed to start a new Sender for Replication Task({rid}). 
Exception: {e.__str__()}" logger.error(msg) finally: backend.send_multipart([address, rcommand, msg.encode("utf-8")]) elif address in self.remote_senders: - if command in ( - b"receiver-ready", - b"receiver-error", - b"btrfs-recv-finished", - ): # noqa E501 - logger.debug("Identitiy: %s command: %s" % (address, command)) + if ( + command == b"receiver-ready" + or command == b"receiver-error" + or command == b"btrfs-recv-finished" + ): + logger.debug(f"Identity: {address} command: {command}") backend.send_multipart([address, b"ACK", b""]) # a new receiver has started. reply to the sender that # must be waiting diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index a6450a8f2..0c328a8a2 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -48,13 +48,8 @@ def __init__(self, identity, meta): self.incremental = self.meta["incremental"] self.snap_name = self.meta["snap"] self.sender_id = self.meta["uuid"] - self.sname = "%s_%s" % (self.sender_id, self.src_share) - self.snap_dir = "%s%s/.snapshots/%s" % ( - settings.MNT_PT, - self.dest_pool, - self.sname, - ) - + self.sname = f"{self.sender_id}_{self.src_share}" + self.snap_dir = f"{settings.MNT_PT}{self.dest_pool}/.snapshots/{self.sname}" self.ppid = os.getpid() self.kb_received = 0 self.rid = None @@ -76,14 +71,11 @@ def _sys_exit(self, code): self.rp.terminate() except Exception as e: logger.error( - "Id: %s. Exception while terminating " - "the btrfs-recv process: %s" % (self.identity, e.__str__()) + f"Id: {self.identity}. Exception while terminating the btrfs-recv process: {e.__str__()}" ) self.ctx.destroy(linger=0) if code == 0: - logger.debug( - "Id: %s. meta: %s Receive successful" % (self.identity, self.meta) - ) + logger.debug(f"Id: {self.identity}. 
meta: {self.meta} Receive successful") sys.exit(code) @contextmanager @@ -100,19 +92,15 @@ def _clean_exit_handler(self): } self.update_receive_trail(self.rtid, data) except Exception as e: - msg = ( - "Id: %s. Exception while updating receive " - "trail for rtid(%d)." % (self.identity, self.rtid) - ) - logger.error("%s. Exception: %s" % (msg, e.__str__())) - + msg = f"Id: {self.identity}. Exception while updating receive trail for rtid({self.rtid})." + logger.error(f"{msg}. Exception: {e.__str__()}") if self.ack is True: try: command = b"receiver-error" self.dealer.send_multipart( [ b"receiver-error", - b"%s. Exception: %s" % (str(self.msg), str(e.__str__())), + f"{self.msg}. Exception: {e.__str__()}".encode("utf-8"), ] ) # Retry logic here is overkill atm. @@ -120,13 +108,11 @@ def _clean_exit_handler(self): if socks.get(self.dealer) == zmq.POLLIN: msg = self.dealer.recv() logger.debug( - "Id: %s. Response from the broker: %s" - % (self.identity, msg) + f"Id: {self.identity}. Response from the broker: {msg}" ) else: logger.debug( - "Id: %s. No response received from " - "the broker" % self.identity + f"Id: {self.identity}. No response received from the broker" ) except Exception as e: msg = f"Id: {self.identity}. Exception while sending {command} back to the broker. Aborting" @@ -158,9 +144,7 @@ def _latest_snap(self, rso): "utf8" ) # cannot be unicode for zmq message logger.error( - "Id: %s. There are no replication snapshots on the " - "system for " - "Share(%s)." % (self.identity, rso.share) + f"Id: {self.identity}. There are no replication snapshots on the system for Share({rso.share})." ) # This would mean, a full backup transfer is required. 
return None @@ -175,9 +159,10 @@ def run(self): self.law = APIWrapper() self.poll = zmq.Poller() self.dealer = self.ctx.socket(zmq.DEALER) - self.dealer.setsockopt_string(zmq.IDENTITY, "%s" % self.identity) + self.dealer.setsockopt_string(zmq.IDENTITY, str(self.identity)) self.dealer.set_hwm(10) - self.dealer.connect("ipc://%s" % settings.REPLICATION.get("ipc_socket")) + ipc_socket = settings.REPLICATION.get("ipc_socket") + self.dealer.connect(f"ipc://{ipc_socket}") self.poll.register(self.dealer, zmq.POLLIN) self.ack = True @@ -238,14 +223,14 @@ def run(self): self._delete_old_snaps(self.sname, self.snap_dir, self.num_retain_snaps + 1) # TODO: The following should be re-instantiated once we have a - # TODO: working method for doing so. see validate_src_share. - # self.msg = ('Failed to validate the source share(%s) on ' - # 'sender(uuid: %s ' - # ') Did the ip of the sender change?' % - # (self.src_share, self.sender_id)) + # working method for doing so. see validate_src_share. + # self.msg = ( + # f"Failed to validate the source share ({self.src_share}) on sender uuid: ({self.sender_id}). " + # f"Did the ip of the sender change?" + # ).encode("utf-8") # self.validate_src_share(self.sender_id, self.src_share) - sub_vol = "%s%s/%s" % (settings.MNT_PT, self.dest_pool, self.sname) + sub_vol = f"{settings.MNT_PT}{self.dest_pool}/{self.sname}" if not is_subvol(sub_vol): self.msg = f"Failed to create parent subvolume {sub_vol}".encode( "utf-8" @@ -256,16 +241,14 @@ def run(self): "utf-8" ) run_command(["/usr/bin/mkdir", "-p", self.snap_dir]) - snap_fp = "%s/%s" % (self.snap_dir, self.snap_name) + snap_fp = f"{self.snap_dir}/{self.snap_name}" # If the snapshot already exists, presumably from the previous # attempt and the sender tries to send the same, reply back with # snap_exists and do not start the btrfs-receive if is_subvol(snap_fp): logger.debug( - "Id: %s. Snapshot to be sent(%s) already " - "exists. 
Not starting a new receive process" - % (self.identity, snap_fp) + f"Id: {self.identity}. Snapshot to be sent({snap_fp}) already exists. Not starting a new receive process" ) self._send_recv(b"snap-exists") self._sys_exit(0) @@ -312,9 +295,7 @@ def run(self): out = out.split("\n") err = err.split("\n") logger.debug( - "Id: %s. Terminated btrfs-recv. " - "cmd = %s out = %s err: %s rc: %s" - % (self.identity, cmd, out, err, self.rp.returncode) + f"Id: {self.identity}. Terminated btrfs-recv. cmd = {cmd} out = {out} err: {err} rc: {self.rp.returncode}" ) if self.rp.returncode != 0: self.msg = f"btrfs-recv exited with unexpected exitcode({self.rp.returncode}).".encode( @@ -336,9 +317,7 @@ def run(self): dsize, drate = self.size_report(self.total_bytes_received, t0) logger.debug( - "Id: %s. Receive complete. Total data " - "transferred: %s. Rate: %s/sec." - % (self.identity, dsize, drate) + f"Id: {self.identity}. Receive complete. Total data transferred: {dsize}. Rate: {drate}/sec." ) self._sys_exit(0) @@ -371,9 +350,7 @@ def run(self): self.total_bytes_received, t0 ) logger.debug( - "Id: %s. Receiver alive. Data " - "transferred: %s. Rate: %s/sec." - % (self.identity, dsize, drate) + f"Id: {self.identity}. Receiver alive. Data transferred: {dsize}. Rate: {drate}/sec." ) else: out, err = self.rp.communicate() @@ -399,11 +376,8 @@ def run(self): raise Exception(self.msg) else: num_tries -= 1 - msg = ( - "No response received from the broker. " - "remaining tries: %d" % num_tries - ) - logger.error("Id: %s. %s" % (self.identity, msg)) + msg = f"No response received from the broker. remaining tries: {num_tries}" + logger.error(f"Id: {self.identity}. {msg}") if num_tries == 0: self.msg = f"{msg}. 
Terminating the receiver.".encode("utf-8") raise Exception(self.msg) diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index f6bb42087..bb7784228 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -47,14 +47,14 @@ def __init__(self, uuid, receiver_ip, replica, rt=None): self.receiver_port = replica.data_port self.replica = replica # TODO: may need to send local shareId so it can be verifed remotely - self.snap_name = "%s_%d_replication" % (replica.share, replica.id) - self.snap_name += "_1" if (rt is None) else "_%d" % (rt.id + 1) - self.snap_id = "%s_%s" % (self.uuid, self.snap_name) + self.snap_name = f"{replica.share}_{replica.id}_replication" + self.snap_name += "_1" if (rt is None) else f"_{rt.id + 1}" + self.snap_id = f"{self.uuid}_{self.snap_name}" self.rt = rt self.rt2 = None self.rt2_id = None self.rid = replica.id - self.identity = "%s-%s" % (self.uuid, self.rid) + self.identity = f"{self.uuid}-{self.rid}" self.sp = None # Latest snapshot per Receiver(comes along with receiver-ready) self.rlatest_snap = None @@ -77,7 +77,7 @@ def _clean_exit_handler(self): try: data = { "status": "failed", - "error": "%s. Exception: %s" % (self.msg, e.__str__()), + "error": f"{self.msg}. Exception: {e.__str__()}", } # noqa E501 self.update_replica_status(self.rt2_id, data) except Exception as e: @@ -111,7 +111,7 @@ def _init_greeting(self): def _send_recv(self, command: bytes, msg: bytes = b""): self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8") rcommand = rmsg = None - self.send_req.send_multipart([command, b"%s" % msg]) + self.send_req.send_multipart([command, msg]) # There is no retry logic here because it's an overkill at the moment. # If the stream is interrupted, we can only start from the beginning # again. So we wait patiently, but only once. 
Perhaps we can implement
@@ -154,12 +154,7 @@ def _refresh_rt(self):
         for rt in ReplicaTrail.objects.filter(
             replica=self.replica, status="succeeded"
         ).order_by("-id"):
-            snap_path = "%s%s/.snapshots/%s/%s" % (
-                settings.MNT_PT,
-                self.replica.pool,
-                self.replica.share,
-                self.rt.snap_name,
-            )
+            snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rt.snap_name}"
         if is_subvol(snap_path):
             return rt
         # Snapshots from previous succeeded ReplicaTrails don't actually
@@ -186,12 +181,7 @@ def _refresh_rt(self):
             self.msg = f"{self.msg}. successful trail found for {self.rlatest_snap}".encode(
                 "utf-8"
             )
-            snap_path = "%s%s/.snapshots/%s/%s" % (
-                settings.MNT_PT,
-                self.replica.pool,
-                self.replica.share,
-                self.rlatest_snap,
-            )
+            snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rlatest_snap}"
             if is_subvol(snap_path):
                 self.msg = f"Snapshot({snap_path}) exists in the system and will be used as the parent".encode(
                     "utf-8"
                 )
@@ -231,10 +221,8 @@ def run(self):
         # prune old snapshots.
         self.update_trail = True
         self.msg = "Failed to prune old snapshots".encode("utf-8")
-        share_path = "%s%s/.snapshots/%s" % (
-            settings.MNT_PT,
-            self.replica.pool,
-            self.replica.share,
+        share_path = (
+            f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}"
         )
         self._delete_old_snaps(share_path)

@@ -288,18 +276,15 @@ def run(self):
                         "status": "succeeded",
                         "error": "snapshot already exists on the receiver",
                     }  # noqa E501
-                    self.msg = (
-                        "Failed to update replica status for "
-                        "%s" % self.snap_id
-                    ).encode("utf-8")
+                    self.msg = f"Failed to update replica status for {self.snap_id}".encode(
+                        "utf-8"
+                    )
                     self.update_replica_status(self.rt2_id, data)
                     self._sys_exit(0)
                 else:
-                    self.msg = (
-                        "unexpected reply(%s) for %s. "
-                        "extended reply: %s. Aborting"
-                        % (command, self.identity, reply)
-                    ).encode("utf-8")
+                    self.msg = f"unexpected reply({command}) for {self.identity}. extended reply: {reply}. 
Aborting".encode( + "utf-8" + ) raise Exception(self.msg) else: retries_left -= 1 @@ -316,31 +301,18 @@ def run(self): self.poll.unregister(self.send_req) self._init_greeting() - snap_path = "%s%s/.snapshots/%s/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, - self.snap_name, - ) + snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.snap_name}" cmd = [BTRFS, "send", snap_path] logger.debug(f"init btrfs 'send' cmd {cmd}") if self.rt is not None: - prev_snap = "%s%s/.snapshots/%s/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, - self.rt.snap_name, - ) + prev_snap = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rt.snap_name}" logger.info( - "Id: %s. Sending incremental replica between " - "%s -- %s" % (self.identity, prev_snap, snap_path) + f"Id: {self.identity}. Sending incremental replica between {prev_snap} -- {snap_path}" ) cmd = [BTRFS, "send", "-p", prev_snap, snap_path] logger.debug(f"differential btrfs 'send' cmd {cmd}") else: - logger.info( - "Id: %s. Sending full replica: %s" % (self.identity, snap_path) - ) + logger.info(f"Id: {self.identity}. Sending full replica: {snap_path}") try: self.sp = subprocess.Popen( @@ -348,10 +320,9 @@ def run(self): ) fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) except Exception as e: - self.msg = ( - "Failed to start the low level btrfs send " - f"command({cmd}). Aborting. Exception: {e.__str__()}" - ).encode("utf-8") + self.msg = f"Failed to start the low level btrfs send command({cmd}). Aborting. Exception: {e.__str__()}".encode( + "utf-8" + ) logger.error(f"Id: {self.identity}. {self.msg}") self._send_recv(b"btrfs-send-init-error") self._sys_exit(3) @@ -363,25 +334,17 @@ def run(self): try: if self.sp.poll() is not None: logger.debug( - "Id: %s. send process finished " - "for %s. rc: %d. 
stderr: %s" - % ( - self.identity, - self.snap_id, - self.sp.returncode, - self.sp.stderr.read(), - ) + f"Id: {self.identity}. send process finished for {self.snap_id}. " + f"rc: {self.sp.returncode}. stderr: {self.sp.stderr.read()}" ) alive = False fs_data = self.sp.stdout.read() except IOError: continue except Exception as e: - self.msg = ( - "Exception occurred while reading low " - "level btrfs " - "send data for %s. Aborting." % self.snap_id - ).encode("utf-8") + self.msg = f"Exception occurred while reading low level btrfs send data for {self.snap_id}. Aborting.".encode( + "utf-8" + ) if alive: self.sp.terminate() self.update_trail = True @@ -399,8 +362,7 @@ def run(self): num_msgs = 0 dsize, drate = self.size_report(self.total_bytes_sent, t0) logger.debug( - "Id: %s Sender alive. Data transferred: " - "%s. Rate: %s/sec." % (self.identity, dsize, drate) + f"Id: {self.identity} Sender alive. Data transferred: {dsize}. Rate: {drate}/sec." ) if command is None or command == b"receiver-error": # command is None when the remote side vanishes. @@ -424,9 +386,7 @@ def run(self): if os.getppid() != self.ppid: logger.error( - "Id: %s. Scheduler exited. Sender for %s " - "cannot go on. " - "Aborting." % (self.identity, self.snap_id) + f"Id: {self.identity}. Scheduler exited. Sender for {self.snap_id} cannot go on. Aborting." 
) self._sys_exit(3) diff --git a/src/rockstor/smart_manager/replication/util.py b/src/rockstor/smart_manager/replication/util.py index 16d10e6df..f8fbe4b66 100644 --- a/src/rockstor/smart_manager/replication/util.py +++ b/src/rockstor/smart_manager/replication/util.py @@ -35,7 +35,7 @@ def validate_src_share(self, sender_uuid: str, sname: str): client_id=a.client_id, client_secret=a.client_secret, url=url ) # TODO: update url to include senders shareId as sname is now invalid - return self.raw.api_call(url="shares/%s" % sname) + return self.raw.api_call(url=f"shares/{sname}") def update_replica_status(self, rtid: int, data): try: @@ -89,9 +89,7 @@ def create_rshare(self, data) -> int: return rshare["id"] except RockStorAPIException as e: # Note replica_share.py post() generates this exception message. - if ( - e.detail == "Replicashare(%s) already exists." % data["share"] - ): # noqa E501 + if e.detail == f"Replicashare({data['share']}) already exists.": # noqa E501 return self.rshare_id(data["share"]) raise e @@ -247,7 +245,7 @@ def humanize_bytes( :return: "1023 Bytes" or "4.28 KB" etc given num=1023 or num=4384 ) """ if num < 1024 or len(units) == 1: - return "%.2f %s" % (num, units[0]) + return f"{num:.2f} {units[0]}" return self.humanize_bytes(num / 1024, units[1:]) def size_report(self, num: int, t0):