diff --git a/src/rockstor/scripts/scheduled_tasks/send_replica.py b/src/rockstor/scripts/scheduled_tasks/send_replica.py index b4f0b6aec..3e7bad21f 100644 --- a/src/rockstor/scripts/scheduled_tasks/send_replica.py +++ b/src/rockstor/scripts/scheduled_tasks/send_replica.py @@ -31,16 +31,12 @@ def main(): num_tries = 3 while True: req = ctx.socket(zmq.DEALER) - print(f"req = {req}") poll.register(req, zmq.POLLIN) - print(f"poll.register = {poll.register}") - ipc_socket = settings.REPLICATION.get('ipc_socket') + ipc_socket = settings.REPLICATION.get("ipc_socket") req.connect(f"ipc://{ipc_socket}") - print(f"req.connect = {req.connect}, ipc_socket = {ipc_socket}") - req.send_multipart([b"new-send", f"{rid}".encode('utf-8')]) + req.send_multipart([b"new-send", f"{rid}".encode("utf-8")]) socks = dict(poll.poll(5000)) - # print(f"socks.get(req) = {socks.get(req)}") if socks.get(req) == zmq.POLLIN: rcommand, reply = req.recv_multipart() print(f"rcommand={rcommand}, reply={reply}") diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index 87a70d37a..a6450a8f2 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -153,8 +153,10 @@ def _latest_snap(self, rso): for snap in ReceiveTrail.objects.filter( rshare=rso, status="succeeded" ).order_by("-id"): - if is_subvol("%s/%s" % (self.snap_dir, snap.snap_name)): - return str(snap.snap_name) # cannot be unicode for zmq message + if is_subvol(f"{self.snap_dir}/{snap.snap_name}"): + return str(snap.snap_name).encode( + "utf8" + ) # cannot be unicode for zmq message logger.error( "Id: %s. There are no replication snapshots on the " "system for " @@ -265,7 +267,7 @@ def run(self): "exists. Not starting a new receive process" % (self.identity, snap_fp) ) - self._send_recv("snap-exists") + self._send_recv(b"snap-exists") self._sys_exit(0) cmd = [BTRFS, "receive", self.snap_dir] @@ -288,11 +290,6 @@ def run(self): ) self._sys_exit(3) - term_commands = ( - b"btrfs-send-init-error", - b"btrfs-send-unexpected-termination-error", - b"btrfs-send-nonzero-termination-error", - ) num_tries = 10 poll_interval = 6000 # 6 seconds num_msgs = 0 @@ -304,6 +301,7 @@ def run(self): # milliseconds) for every message num_tries = 10 command, message = self.dealer.recv_multipart() + logger.debug(f"command = {command}, of type:", type(command)) if command == b"btrfs-send-stream-finished": # this command concludes fsdata transfer. After this, # btrfs-recev process should be @@ -327,7 +325,9 @@ def run(self): "status": "succeeded", "kb_received": self.total_bytes_received / 1024, } - self.msg = f"Failed to update receive trail for rtid: {self.rtid}".encode("utf-8") + self.msg = f"Failed to update receive trail for rtid: {self.rtid}".encode( + "utf-8" + ) self.update_receive_trail(self.rtid, data) self._send_recv(b"btrfs-recv-finished") @@ -342,7 +342,11 @@ def run(self): ) self._sys_exit(0) - if command in term_commands: + if ( + command == b"btrfs-send-init-error" + or command == b"btrfs-send-unexpected-termination-error" + or command == b"btrfs-send-nonzero-termination-error" + ): self.msg = f"Terminal command({command}) received from the sender. Aborting.".encode( "utf-8" ) diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index 34a687fd3..f6bb42087 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -72,9 +72,7 @@ def _clean_exit_handler(self): try: yield except Exception as e: - logger.error( - "Id: %s. %s. Exception: %s" % (self.identity, self.msg, e.__str__()) - ) + logger.error(f"Id: {self.identity}. {self.msg}. Exception: {e.__str__()}") if self.update_trail: try: data = { @@ -84,8 +82,7 @@ def _clean_exit_handler(self): self.update_replica_status(self.rt2_id, data) except Exception as e: logger.error( - "Id: %s. Exception occured while updating " - "replica status: %s" % (self.identity, e.__str__()) + f"Id: {self.identity}. Exception occurred while updating replica status: {e.__str__()}" ) self._sys_exit(3) @@ -98,7 +95,7 @@ def _sys_exit(self, code): def _init_greeting(self): self.send_req = self.ctx.socket(zmq.DEALER) self.send_req.setsockopt_string(zmq.IDENTITY, self.identity) - self.send_req.connect("tcp://%s:%d" % (self.receiver_ip, self.receiver_port)) + self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}") msg = { "pool": self.replica.dpool, "share": self.replica.share, @@ -107,8 +104,8 @@ def _init_greeting(self): "uuid": self.uuid, } msg_str = json.dumps(msg) - self.send_req.send_multipart([b"sender-ready", b"%s" % msg_str]) - logger.debug("Id: %s Initial greeting: %s" % (self.identity, msg)) + self.send_req.send_multipart([b"sender-ready", msg_str.encode("utf-8")]) + logger.debug(f"Id: {self.identity} Initial greeting: {msg}") self.poll.register(self.send_req, zmq.POLLIN) def _send_recv(self, command: bytes, msg: bytes = b""): @@ -132,7 +129,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""): ) return rcommand, rmsg - def _delete_old_snaps(self, share_path): + def _delete_old_snaps(self, share_path: str): oldest_snap = get_oldest_snap( share_path, self.max_snap_retain, regex="_replication_" ) @@ -248,7 +245,7 @@ def run(self): # create a snapshot only if it's not already from a previous # failed attempt. # TODO: If one does exist we fail which seems harsh as we may be - # TODO: able to pickup where we left of depending on the failure. + # able to pickup where we left of depending on the failure. self.msg = f"Failed to create snapshot: {self.snap_name}. Aborting.".encode( "utf-8" ) @@ -256,14 +253,20 @@ def run(self): retries_left = settings.REPLICATION.get("max_send_attempts") + self.msg = ( + "Place-holder message just after sender snapshot creation".encode( + "utf-8" + ) + ) + poll_interval = 6000 # 6 seconds while True: socks = dict(self.poll.poll(poll_interval)) if socks.get(self.send_req) == zmq.POLLIN: # not really necessary because we just want one reply for # now. - retries_left = settings.REPLICATION.get("max_send_attempts") command, reply = self.send_req.recv_multipart() + logger.debug(f"command = {command}, of type", type(command)) if command == b"receiver-ready": if self.rt is not None: self.rlatest_snap = reply @@ -273,7 +276,7 @@ def run(self): ) break else: - if command in "receiver-init-error": + if command == b"receiver-init-error": self.msg = f"{command} received for {self.identity}. extended reply: {reply}. Aborting.".encode( "utf-8" ) @@ -320,6 +323,7 @@ def run(self): self.snap_name, ) cmd = [BTRFS, "send", snap_path] + logger.debug(f"init btrfs 'send' cmd {cmd}") if self.rt is not None: prev_snap = "%s%s/.snapshots/%s/%s" % ( settings.MNT_PT, @@ -332,6 +336,7 @@ def run(self): "%s -- %s" % (self.identity, prev_snap, snap_path) ) cmd = [BTRFS, "send", "-p", prev_snap, snap_path] + logger.debug(f"differential btrfs 'send' cmd {cmd}") else: logger.info( "Id: %s. Sending full replica: %s" % (self.identity, snap_path) diff --git a/src/rockstor/smart_manager/replication/util.py b/src/rockstor/smart_manager/replication/util.py index e6e985503..16d10e6df 100644 --- a/src/rockstor/smart_manager/replication/util.py +++ b/src/rockstor/smart_manager/replication/util.py @@ -26,58 +26,61 @@ class ReplicationMixin(object): - def validate_src_share(self, sender_uuid, sname): + def validate_src_share(self, sender_uuid: str, sname: str): url = "https://" if self.raw is None: a = Appliance.objects.get(uuid=sender_uuid) - url = "%s%s:%s" % (url, a.ip, a.mgmt_port) + url = f"{url}{a.ip}:{a.mgmt_port}" self.raw = APIWrapper( client_id=a.client_id, client_secret=a.client_secret, url=url ) # TODO: update url to include senders shareId as sname is now invalid return self.raw.api_call(url="shares/%s" % sname) - def update_replica_status(self, rtid, data): + def update_replica_status(self, rtid: int, data): try: - url = "sm/replicas/trail/%d" % rtid + url = f"sm/replicas/trail/{rtid}" return self.law.api_call(url, data=data, calltype="put") except Exception as e: - msg = "Exception while updating replica(%s) status to %s: %s" % ( - url, - data["status"], - e.__str__(), - ) + msg = f"Exception while updating replica({url}) status to {data['status']}: {e.__str__()}" raise Exception(msg) - def disable_replica(self, rid): + def disable_replica(self, rid: int): try: - url = "sm/replicas/%d" % rid + url = f"sm/replicas/{rid}" headers = { "content-type": "application/json", } return self.law.api_call( url, - data={"enabled": False,}, + data={ + "enabled": False, + }, calltype="put", save_error=False, headers=headers, ) except Exception as e: - msg = "Exception while disabling replica(%s): %s" % (url, e.__str__()) + msg = f"Exception while disabling replica({url}): {e.__str__()}" raise Exception(msg) - def create_replica_trail(self, rid, snap_name): - url = "sm/replicas/trail/replica/%d" % rid + def create_replica_trail(self, rid: int, snap_name: str): + url = f"sm/replicas/trail/replica/{rid}" return self.law.api_call( - url, data={"snap_name": snap_name,}, calltype="post", save_error=False + url, + data={ + "snap_name": snap_name, + }, + calltype="post", + save_error=False, ) - def rshare_id(self, sname): - url = "sm/replicas/rshare/%s" % sname + def rshare_id(self, sname: str) -> int: + url = f"sm/replicas/rshare/{sname}" rshare = self.law.api_call(url, save_error=False) return rshare["id"] - def create_rshare(self, data): + def create_rshare(self, data) -> int: try: url = "sm/replicas/rshare" rshare = self.law.api_call( @@ -92,20 +95,20 @@ def create_rshare(self, data): return self.rshare_id(data["share"]) raise e - def create_receive_trail(self, rid, data): - url = "sm/replicas/rtrail/rshare/%d" % rid + def create_receive_trail(self, rid: int, data) -> int: + url = f"sm/replicas/rtrail/rshare/{rid}" rt = self.law.api_call(url, data=data, calltype="post", save_error=False) return rt["id"] - def update_receive_trail(self, rtid, data): - url = "sm/replicas/rtrail/%d" % rtid + def update_receive_trail(self, rtid: int, data): + url = f"sm/replicas/rtrail/{rtid}" try: return self.law.api_call(url, data=data, calltype="put", save_error=False) except Exception as e: - msg = "Exception while updating receive trail(%s): %s" % (url, e.__str__()) + msg = f"Exception while updating receive trail({url}): {e.__str__()}" raise Exception(msg) - def prune_trail(self, url, days=7): + def prune_trail(self, url: str, days: int = 7): try: data = { "days": days, @@ -114,33 +117,39 @@ def prune_trail(self, url, days=7): url, data=data, calltype="delete", save_error=False ) except Exception as e: - msg = "Exception while pruning trail for url(%s): %s" % (url, e.__str__()) + msg = f"Exception while pruning trail for url({url}): {e.__str__()}" raise Exception(msg) def prune_receive_trail(self, ro): - url = "sm/replicas/rtrail/rshare/%d" % ro.id + url = f"sm/replicas/rtrail/rshare/{ro.id}" return self.prune_trail(url) def prune_replica_trail(self, ro): - url = "sm/replicas/trail/replica/%d" % ro.id + url = f"sm/replicas/trail/replica/{ro.id}" return self.prune_trail(url) - def create_snapshot(self, sname, snap_name, snap_type="replication"): + def create_snapshot(self, sname: str, snap_name: str, snap_type="replication"): try: share = Share.objects.get(name=sname) - url = "shares/%s/snapshots/%s" % (share.id, snap_name) + url = f"shares/{share.id}/snapshots/{snap_name}" return self.law.api_call( - url, data={"snap_type": snap_type,}, calltype="post", save_error=False + url, + data={ + "snap_type": snap_type, + }, + calltype="post", + save_error=False, ) except RockStorAPIException as e: # Note snapshot.py _create() generates this exception message. - if e.detail == ( - "Snapshot ({}) already exists for the share ({})." - ).format(snap_name, sname): + if ( + e.detail + == f"Snapshot ({snap_name}) already exists for the share ({sname})." + ): return logger.debug(e.detail) raise e - def update_repclone(self, sname, snap_name): + def update_repclone(self, sname: str, snap_name: str): """ Call the dedicated create_repclone via it's url to supplant our share with the given snapshot. Intended for use in receive.py to turn @@ -152,32 +161,33 @@ def update_repclone(self, sname, snap_name): """ try: share = Share.objects.get(name=sname) - url = "shares/{}/snapshots/{}/repclone".format(share.id, snap_name) + url = f"shares/{share.id}/snapshots/{snap_name}/repclone" return self.law.api_call(url, calltype="post", save_error=False) except RockStorAPIException as e: # TODO: need to look further at the following as command repclone - # TODO: (snapshot.py post) catches Snapshot.DoesNotExist. - # TODO: and doesn't appear to call _delete_snapshot() + # (snapshot.py post) catches Snapshot.DoesNotExist. + # and doesn't appear to call _delete_snapshot() # Note snapshot.py _delete_snapshot() generates this exception msg. - if e.detail == "Snapshot name ({}) does not exist.".format(snap_name): + if e.detail == f"Snapshot name ({snap_name}) does not exist.": logger.debug(e.detail) return False raise e - def delete_snapshot(self, sname, snap_name): + def delete_snapshot(self, sname: str, snap_name: str): try: share = Share.objects.get(name=sname) - url = "shares/%s/snapshots/%s" % (share.id, snap_name) + url = f"shares/{share.id}/snapshots/{snap_name}" self.law.api_call(url, calltype="delete", save_error=False) return True except RockStorAPIException as e: # Note snapshot.py _delete_snapshot() generates this exception msg. - if e.detail == "Snapshot name ({}) does not exist.".format(snap_name): + if e.detail == f"Snapshot name ({snap_name}) does not exist.": logger.debug(e.detail) return False raise e - def create_share(self, sname, pool): + def create_share(self, sname: str, pool): + print("pool parameter is of type:", type(pool)) try: url = "shares" data = { @@ -193,10 +203,7 @@ def create_share(self, sname, pool): ) except RockStorAPIException as e: # Note share.py post() generates this exception message. - if ( - e.detail == "Share ({}) already exists. Choose a different " - "name.".format(sname) - ): # noqa E501 + if e.detail == f"Share ({sname}) already exists. Choose a different name.": return logger.debug(e.detail) raise e @@ -209,7 +216,7 @@ def refresh_snapshot_state(self): save_error=False, ) except Exception as e: - logger.error("Exception while refreshing Snapshot state: %s" % e.__str__()) + logger.error(f"Exception while refreshing Snapshot state: {e.__str__()}") def refresh_share_state(self): try: @@ -220,9 +227,18 @@ def refresh_share_state(self): save_error=False, ) except Exception as e: - logger.error("Exception while refreshing Share state: %s" % e.__str__()) + logger.error(f"Exception while refreshing Share state: {e.__str__()}") - def humanize_bytes(self, num, units=("Bytes", "KB", "MB", "GB",)): + def humanize_bytes( + self, + num: int, + units=( + "Bytes", + "KB", + "MB", + "GB", + ), + ): """ Recursive routine to establish and then return the most appropriate num expression given the contents of units. Ie 1023 Bytes or 4096 KB @@ -234,7 +250,7 @@ def humanize_bytes(self, num, units=("Bytes", "KB", "MB", "GB",)): return "%.2f %s" % (num, units[0]) return self.humanize_bytes(num / 1024, units[1:]) - def size_report(self, num, t0): + def size_report(self, num: int, t0): t1 = time.time() dsize = self.humanize_bytes(float(num)) drate = self.humanize_bytes(float(num / (t1 - t0)))