diff --git a/src/rockstor/scripts/scheduled_tasks/send_replica.py b/src/rockstor/scripts/scheduled_tasks/send_replica.py index 45e9acc53..3e7bad21f 100644 --- a/src/rockstor/scripts/scheduled_tasks/send_replica.py +++ b/src/rockstor/scripts/scheduled_tasks/send_replica.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2020 RockStor, Inc. +Copyright (c) 2012-2023 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -13,7 +13,7 @@ General Public License for more details. You should have received a copy of the GNU General Public License -along with this program. If not, see . +along with this program. If not, see . """ import sys @@ -28,17 +28,19 @@ def main(): rid = int(sys.argv[1]) ctx = zmq.Context() poll = zmq.Poller() - num_tries = 12 + num_tries = 3 while True: req = ctx.socket(zmq.DEALER) poll.register(req, zmq.POLLIN) - req.connect("ipc://%s" % settings.REPLICATION.get("ipc_socket")) - req.send_multipart(["new-send", b"%d" % rid]) + ipc_socket = settings.REPLICATION.get("ipc_socket") + req.connect(f"ipc://{ipc_socket}") + req.send_multipart([b"new-send", f"{rid}".encode("utf-8")]) socks = dict(poll.poll(5000)) if socks.get(req) == zmq.POLLIN: rcommand, reply = req.recv_multipart() - if rcommand == "SUCCESS": + print(f"rcommand={rcommand}, reply={reply}") + if rcommand == b"SUCCESS": print(reply) break ctx.destroy(linger=0) @@ -46,7 +48,7 @@ def main(): num_tries -= 1 print( "No response from Replication service. Number of retry " - "attempts left: %d" % num_tries + f"attempts left: {num_tries}" ) if num_tries == 0: ctx.destroy(linger=0) diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index 4e2043927..799a20f86 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2020 RockStor, Inc. +Copyright (c) 2012-2023 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -13,10 +13,10 @@ General Public License for more details. You should have received a copy of the GNU General Public License -along with this program. If not, see . +along with this program. If not, see . """ - from multiprocessing import Process +from typing import Any import zmq import os import json @@ -24,9 +24,9 @@ from storageadmin.models import NetworkConnection, Appliance from smart_manager.models import ReplicaTrail, ReplicaShare, Replica, Service from django.conf import settings -from sender import Sender -from receiver import Receiver -from util import ReplicationMixin +from smart_manager.replication.sender import Sender +from smart_manager.replication.receiver import Receiver +from smart_manager.replication.util import ReplicationMixin from cli import APIWrapper import logging @@ -34,36 +34,44 @@ class ReplicaScheduler(ReplicationMixin, Process): + uuid: str | None + local_receivers: dict[Any, Any] + def __init__(self): + self.law = None + self.local_receivers = {} self.ppid = os.getpid() self.senders = {} # Active Sender(outgoing) process map. self.receivers = {} # Active Receiver process map. self.remote_senders = {} # Active incoming/remote Sender/client map. 
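The send_replica.py loop above is the standard pyzmq DEALER request pattern: connect, send a bytes-only multipart frame, poll with a timeout, retry a bounded number of times. A minimal standalone sketch of the same pattern follows; the ipc path and replica id passed in are illustrative placeholders rather than values read from Rockstor settings.

import zmq

def request_new_send(ipc_socket: str, rid: int, tries: int = 3) -> bytes | None:
    """Ask the replication broker for a new send; return its reply, or None on timeout."""
    ctx = zmq.Context()
    poller = zmq.Poller()
    while tries:
        req = ctx.socket(zmq.DEALER)
        poller.register(req, zmq.POLLIN)
        req.connect(f"ipc://{ipc_socket}")
        # Under Python 3 every zmq frame must be bytes, hence the explicit encode.
        req.send_multipart([b"new-send", str(rid).encode("utf-8")])
        if dict(poller.poll(5000)).get(req) == zmq.POLLIN:  # wait up to 5 s
            rcommand, reply = req.recv_multipart()
            if rcommand == b"SUCCESS":
                ctx.destroy(linger=0)
                return reply
        poller.unregister(req)
        req.close(linger=0)
        tries -= 1
    ctx.destroy(linger=0)
    return None

# Example call (placeholder socket path and replica id):
# request_new_send("/var/run/replication.sock", 1)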
self.MAX_ATTEMPTS = settings.REPLICATION.get("max_send_attempts") - self.uuid = self.listener_interface = self.listener_port = None + self.uuid = None + self.listener_interface = None + self.listener_port = None self.trail_prune_time = None super(ReplicaScheduler, self).__init__() def _prune_workers(self, workers): for wd in workers: - for w in wd.keys(): + for w in list(wd.keys()): if wd[w].exitcode is not None: del wd[w] - logger.debug("deleted worker: %s" % w) + logger.debug(f"deleted worker: {w}") return workers def _prune_senders(self): - for s in self.senders.keys(): + for s in list(self.senders.keys()): ecode = self.senders[s].exitcode if ecode is not None: del self.senders[s] - logger.debug("Sender(%s) exited. exitcode: %s" % (s, ecode)) + logger.debug(f"Sender({s}) exited. exitcode: {ecode}") if len(self.senders) > 0: - logger.debug("Active Senders: %s" % self.senders.keys()) + logger.debug(f"Active Senders: {self.senders.keys()}") def _delete_receivers(self): active_msgs = [] - for r in self.local_receivers.keys(): + # We modify during iteration, and so require explicit list. + for r in list(self.local_receivers.keys()): msg_count = self.remote_senders.get(r, 0) ecode = self.local_receivers[r].exitcode if ecode is not None: @@ -71,14 +79,12 @@ def _delete_receivers(self): if r in self.remote_senders: del self.remote_senders[r] logger.debug( - "Receiver(%s) exited. exitcode: %s. Total " - "messages processed: %d. Removing from the list." - % (r, ecode, msg_count) + f"Receiver({r}) exited. exitcode: {ecode}. Total messages processed: {msg_count}. " + "Removing from the list." ) else: active_msgs.append( - "Active Receiver: %s. Total messages " - "processed: %d" % (r, msg_count) + f"Active Receiver: {r}. Total messages processed: {msg_count}" ) for m in active_msgs: logger.debug(m) @@ -90,44 +96,38 @@ def _get_receiver_ip(self, replica): appliance = Appliance.objects.get(uuid=replica.appliance) return appliance.ip except Exception as e: - msg = ( - "Failed to get receiver ip. Is the receiver " - "appliance added?. Exception: %s" % e.__str__() - ) + msg = f"Failed to get receiver ip. Is the receiver appliance added?. Exception: {e.__str__()}" logger.error(msg) raise Exception(msg) def _process_send(self, replica): - sender_key = "%s_%s" % (self.uuid, replica.id) + sender_key = f"{self.uuid}_{replica.id}" if sender_key in self.senders: - # If the sender exited but hasn't been removed from the dict, - # remove and proceed. + # If the sender exited but hasn't been removed from the dict, remove and proceed. ecode = self.senders[sender_key].exitcode if ecode is not None: del self.senders[sender_key] logger.debug( - "Sender(%s) exited. exitcode: %s. Forcing " - "removal." % (sender_key, ecode) + f"Sender({sender_key}) exited. Exitcode: {ecode}. Forcing removal." ) else: raise Exception( - "There is live sender for(%s). Will not start " - "a new one." % sender_key + f"There is live sender for({sender_key}). Will not start a new one." ) receiver_ip = self._get_receiver_ip(replica) rt_qs = ReplicaTrail.objects.filter(replica=replica).order_by("-id") last_rt = rt_qs[0] if (len(rt_qs) > 0) else None if last_rt is None: - logger.debug("Starting a new Sender(%s)." 
% sender_key) + logger.debug(f"Starting a new Sender({sender_key}).") self.senders[sender_key] = Sender(self.uuid, receiver_ip, replica) elif last_rt.status == "succeeded": - logger.debug("Starting a new Sender(%s)" % sender_key) + logger.debug(f"Starting a new Sender({sender_key}).") self.senders[sender_key] = Sender(self.uuid, receiver_ip, replica, last_rt) elif last_rt.status == "pending": msg = ( - "Replica trail shows a pending Sender(%s), but it is not " - "alive. Marking it as failed. Will not start a new one." % sender_key + f"Replica trail shows a pending Sender({sender_key}), but it is not alive. " + "Marking it as failed. Will not start a new one." ) logger.error(msg) data = { @@ -149,19 +149,16 @@ def _process_send(self, replica): num_tries = num_tries + 1 if num_tries >= self.MAX_ATTEMPTS: msg = ( - "Maximum attempts(%d) reached for Sender(%s). " - "A new one " - "will not be started and the Replica task will be " - "disabled." % (self.MAX_ATTEMPTS, sender_key) + f"Maximum attempts({self.MAX_ATTEMPTS}) reached for Sender({sender_key}). " + "A new one will not be started and the Replica task will be disabled." ) logger.error(msg) self.disable_replica(replica.id) raise Exception(msg) logger.debug( - "previous backup failed for Sender(%s). " - "Starting a new one. Attempt %d/%d." - % (sender_key, num_tries, self.MAX_ATTEMPTS) + f"previous backup failed for Sender({sender_key}). " + f"Starting a new one. Attempt {num_tries}/{self.MAX_ATTEMPTS}." ) try: last_success_rt = ReplicaTrail.objects.filter( @@ -169,8 +166,8 @@ def _process_send(self, replica): ).latest("id") except ReplicaTrail.DoesNotExist: logger.debug( - "No record of last successful ReplicaTrail for " - "Sender(%s). Will start a new Full Sender." % sender_key + f"No record of last successful ReplicaTrail for Sender({sender_key}). " + f"Will start a new Full Sender." ) last_success_rt = None self.senders[sender_key] = Sender( @@ -178,8 +175,8 @@ def _process_send(self, replica): ) else: msg = ( - "Unexpected ReplicaTrail status(%s) for Sender(%s). " - "Will not start a new one." % (last_rt.status, sender_key) + f"Unexpected ReplicaTrail status({last_rt.status}) for Sender({sender_key}). " + f"Will not start a new one." ) raise Exception(msg) @@ -199,43 +196,63 @@ def run(self): except NetworkConnection.DoesNotExist: self.listener_interface = "0.0.0.0" except Exception as e: - msg = ( - "Failed to fetch network interface for Listner/Broker. " - "Exception: %s" % e.__str__() - ) + msg = f"Failed to fetch network interface for Listener/Broker. Exception: {e.__str__()}" return logger.error(msg) try: + # DB query returns type self.uuid = Appliance.objects.get(current_appliance=True).uuid except Exception as e: - msg = ( - "Failed to get uuid of current appliance. Aborting. " - "Exception: %s" % e.__str__() - ) + msg = f"Failed to get uuid of current appliance. Aborting. Exception: {e.__str__()}" return logger.error(msg) + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.send_multipart + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.copy_threshold + logger.debug("DISABLING COPY_THRESHOLD to enable message tracking.") + zmq.COPY_THRESHOLD = 0 + ctx = zmq.Context() - frontend = ctx.socket(zmq.ROUTER) - frontend.set_hwm(10) - frontend.bind("tcp://%s:%d" % (self.listener_interface, self.listener_port)) - backend = ctx.socket(zmq.ROUTER) - backend.bind("ipc://%s" % settings.REPLICATION.get("ipc_socket")) + # FRONTEND: IP + frontend = ctx.socket(zmq.ROUTER) # Sender socket. 
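For orientation, the frontend/backend pair created here is the classic ZeroMQ broker shape: two ROUTER sockets and one Poller, with the first frame of every multipart message (the peer's DEALER identity) used to route the reply back. A stripped-down sketch of that shape, using placeholder endpoints, the patch's three-frame convention, and none of the Receiver bookkeeping:

import zmq

ctx = zmq.Context()
frontend = ctx.socket(zmq.ROUTER)            # remote Senders connect here over TCP
frontend.bind("tcp://0.0.0.0:10002")         # placeholder listener port
backend = ctx.socket(zmq.ROUTER)             # local Receivers/scripts connect here over IPC
backend.bind("ipc:///tmp/replication.sock")  # placeholder socket path

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

while True:
    events = dict(poller.poll(timeout=2000))  # milliseconds
    if events.get(frontend) == zmq.POLLIN:
        address, command, msg = frontend.recv_multipart()
        # Relays once a DEALER with IDENTITY == address has connected to backend.
        backend.send_multipart([address, command, msg])
    if events.get(backend) == zmq.POLLIN:
        address, command, msg = backend.recv_multipart()
        frontend.send_multipart([address, command, msg])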
+ # frontend.set_hwm(value=10) + # Bind to tcp://interface:port + frontend.bind(f"tcp://{self.listener_interface}:{self.listener_port}") + + # BACKEND: IPC / UNIX SOCKET + backend = ctx.socket(zmq.ROUTER) # Sender socket + ipc_socket = settings.REPLICATION.get("ipc_socket") # /var/run/replication.sock + backend.bind(f"ipc://{ipc_socket}") + # POLLER + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#polling poller = zmq.Poller() + # Register our poller, for both sockets, to monitor for POLLIN events. poller.register(frontend, zmq.POLLIN) poller.register(backend, zmq.POLLIN) - self.local_receivers = {} - iterations = 10 - poll_interval = 6000 # 6 seconds + iterations = 5 msg_count = 0 while True: # This loop may still continue even if replication service # is terminated, as long as data is coming in. - socks = dict(poller.poll(timeout=poll_interval)) - if frontend in socks and socks[frontend] == zmq.POLLIN: + # Get all events: returns imidiately if any exist, or waits for timeout. + # Event list of tuples of the form (socket, event_mask)): + events_list = poller.poll(timeout=2000) # Max wait period in milliseconds + logger.debug(f"EVENT_LIST poll = {events_list}") + # Dictionary mapping of socket : event_mask. + events = dict(events_list) + if frontend in events and events[frontend] == zmq.POLLIN: + # frontend.recv_multipart() returns all as type address, command, msg = frontend.recv_multipart() + # Avoid debug logging the btrfs-send-stream contents. + if command == b"" and msg != b"": + logger.debug("frontend.recv_multipart() -> command=b'', msg assumed BTRFS SEND BYTE STREAM") + else: + logger.debug( + f"frontend.recv_multipart() -> address={address}, command={command}, msg={msg}" + ) + # Keep a numerical events tally of per remote sender's events: if address not in self.remote_senders: self.remote_senders[address] = 1 else: @@ -245,101 +262,115 @@ def run(self): msg_count = 0 for rs, count in self.remote_senders.items(): logger.debug( - "Active Receiver: %s. Messages processed:" - "%d" % (rs, count) + f"Active Receiver: {rs}. Messages processed: {count}" ) - if command == "sender-ready": - logger.debug("initial greeting from %s" % address) + + if command == b"sender-ready": + logger.debug( + f"initial greeting command '{command}' received from {address}" + ) # Start a new receiver and send the appropriate response try: start_nr = True - if address in self.local_receivers: + if address in self.local_receivers.keys(): start_nr = False ecode = self.local_receivers[address].exitcode if ecode is not None: del self.local_receivers[address] logger.debug( - "Receiver(%s) exited. exitcode: " - "%s. Forcing removal from broker " - "list." % (address, ecode) + f"Receiver({address}) exited. exitcode: {ecode}. Forcing removal from broker list." ) start_nr = True else: - msg = ( - "Receiver(%s) already exists. " - "Will not start a new one." % address + msg = f"Receiver({address}) already exists. Will not start a new one.".encode( + "utf-8" ) logger.error(msg) - # @todo: There may be a different way to handle - # this. For example, we can pass the message to - # the active receiver and factor into it's - # retry/robust logic. But that is for later. + # TODO: There may be a different way to handle + # this. For example, we can pass the message to + # the active receiver and factor into its + # retry/robust logic. But that is for later. 
frontend.send_multipart( - [address, "receiver-init-error", msg] + [ + address, + b"receiver-init-error", + msg, + ] ) if start_nr: nr = Receiver(address, msg) nr.daemon = True nr.start() - logger.debug("New Receiver(%s) started." % address) + logger.debug(f"New Receiver({address}) started.") self.local_receivers[address] = nr continue except Exception as e: - msg = ( - "Exception while starting the " - "new receiver for %s: %s" % (address, e.__str__()) + msg = f"Exception while starting the new receiver for {address}: {e.__str__()}".encode( + "utf-8" ) logger.error(msg) - frontend.send_multipart([address, "receiver-init-error", msg]) + frontend.send_multipart([address, b"receiver-init-error", msg]) else: # do we hit hwm? is the dealer still connected? backend.send_multipart([address, command, msg]) - elif backend in socks and socks[backend] == zmq.POLLIN: + elif backend in events and events[backend] == zmq.POLLIN: + # backend.recv_multipart() returns all as type address, command, msg = backend.recv_multipart() - if command == "new-send": + # In the following conditional: + # if redefines msg as str + # elif leaves msg as bytes + if command == b"new-send": rid = int(msg) - logger.debug("new-send request received for %d" % rid) - rcommand = "ERROR" + logger.debug(f"new-send request received for {rid}") + rcommand = b"ERROR" try: replica = Replica.objects.get(id=rid) if replica.enabled: self._process_send(replica) - msg = ( - "A new Sender started successfully for " - "Replication Task(%d)." % rid - ) - rcommand = "SUCCESS" + msg = f"A new Sender started successfully for Replication Task({rid})." + rcommand = b"SUCCESS" else: - msg = ( - "Failed to start a new Sender for " - "Replication " - "Task(%d) because it is disabled." % rid - ) + msg = f"Failed to start a new Sender for Replication Task({rid}) because it is disabled." except Exception as e: - msg = ( - "Failed to start a new Sender for Replication " - "Task(%d). Exception: %s" % (rid, e.__str__()) - ) + msg = f"Failed to start a new Sender for Replication Task({rid}). Exception: {e.__str__()}" logger.error(msg) finally: - backend.send_multipart([address, rcommand, str(msg)]) - elif address in self.remote_senders: - if command in ( - "receiver-ready", - "receiver-error", - "btrfs-recv-finished", - ): # noqa E501 - logger.debug("Identitiy: %s command: %s" % (address, command)) - backend.send_multipart([address, b"ACK", ""]) + backend.send_multipart([address, rcommand, msg.encode("utf-8")]) + elif address in self.remote_senders.keys(): + logger.debug( + f"Identity/address {address}, found in remote_senders.keys()" + ) + if ( + command == b"receiver-ready" + or command == b"receiver-error" + or command == b"btrfs-recv-finished" + ): + logger.debug(f"command: {command}, sending 'ACK' to backend.") + tracker = backend.send_multipart( + [address, b"ACK", b""], copy=False, track=True + ) + if not tracker.done: + logger.debug( + f"Waiting max 2 seconds for send of commmand ({command})" + ) + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone + tracker.wait( + timeout=2 + ) # seconds as float: raises zmq.NotDone # a new receiver has started. 
reply to the sender that # must be waiting - frontend.send_multipart([address, command, msg]) - + logger.debug(f"command: {command}, sending to frontend.") + tracker = frontend.send_multipart( + [address, command, msg], copy=False, track=True + ) + if not tracker.done: + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone + tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone else: iterations -= 1 if iterations == 0: - iterations = 10 + iterations = 5 self._prune_senders() self._delete_receivers() cur_time = time.time() diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index fb684933b..0f1d14c79 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2020 RockStor, Inc. +Copyright (c) 2012-2023 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -13,7 +13,7 @@ General Public License for more details. You should have received a copy of the GNU General Public License -along with this program. If not, see . +along with this program. If not, see . """ from multiprocessing import Process @@ -26,43 +26,52 @@ from django.conf import settings from django import db from contextlib import contextmanager -from util import ReplicationMixin -from fs.btrfs import get_oldest_snap, remove_share, set_property, is_subvol, mount_share +from smart_manager.replication.util import ReplicationMixin +from fs.btrfs import ( + get_oldest_snap, + remove_share, + set_property, + is_subvol, + mount_share, + BTRFS, +) from system.osi import run_command from storageadmin.models import Pool, Share, Appliance from smart_manager.models import ReplicaShare, ReceiveTrail -import shutil from cli import APIWrapper import logging logger = logging.getLogger(__name__) -BTRFS = "/sbin/btrfs" - class Receiver(ReplicationMixin, Process): - def __init__(self, identity, meta): - self.identity = identity + total_bytes_received: int + sname: str + + def __init__(self, identity: bytes, meta: bytes): + self.sender_ip = None + self.poller = None + self.dealer = None + self.law = None + self.identity = identity # Otherwise knows as address. self.meta = json.loads(meta) self.src_share = self.meta["share"] self.dest_pool = self.meta["pool"] self.incremental = self.meta["incremental"] self.snap_name = self.meta["snap"] self.sender_id = self.meta["uuid"] - self.sname = "%s_%s" % (self.sender_id, self.src_share) - self.snap_dir = "%s%s/.snapshots/%s" % ( - settings.MNT_PT, - self.dest_pool, - self.sname, - ) - + self.sname = f"{self.sender_id}_{self.src_share}" + self.snap_dir = f"{settings.MNT_PT}{self.dest_pool}/.snapshots/{self.sname}" self.ppid = os.getpid() self.kb_received = 0 self.rid = None self.rtid = None # We mirror senders max_snap_retain via settings.REPLICATION self.num_retain_snaps = settings.REPLICATION.get("max_snap_retain") + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Context self.ctx = zmq.Context() + self.zmq_version = zmq.__version__ + self.libzmq_version = zmq.zmq_version() self.rp = None self.raw = None self.ack = False @@ -77,14 +86,11 @@ def _sys_exit(self, code): self.rp.terminate() except Exception as e: logger.error( - "Id: %s. Exception while terminating " - "the btrfs-recv process: %s" % (self.identity, e.__str__()) + f"Id: {self.identity}. 
Exception while terminating the btrfs-recv process: {e.__str__()}" ) self.ctx.destroy(linger=0) if code == 0: - logger.debug( - "Id: %s. meta: %s Receive successful" % (self.identity, self.meta) - ) + logger.debug(f"Id: {self.identity}. meta: {self.meta} Receive successful") sys.exit(code) @contextmanager @@ -92,7 +98,7 @@ def _clean_exit_handler(self): try: yield except Exception as e: - logger.error("%s. Exception: %s" % (self.msg, e.__str__())) + logger.error(f"{self.msg}. Exception: {e.__str__()}") if self.rtid is not None: try: data = { @@ -101,40 +107,31 @@ def _clean_exit_handler(self): } self.update_receive_trail(self.rtid, data) except Exception as e: - msg = ( - "Id: %s. Exception while updating receive " - "trail for rtid(%d)." % (self.identity, self.rtid) - ) - logger.error("%s. Exception: %s" % (msg, e.__str__())) - + msg = f"Id: {self.identity}. Exception while updating receive trail for rtid({self.rtid})." + logger.error(f"{msg}. Exception: {e.__str__()}") if self.ack is True: try: - command = "receiver-error" + command = b"receiver-error" self.dealer.send_multipart( [ - "receiver-error", - b"%s. Exception: %s" % (str(self.msg), str(e.__str__())), + b"receiver-error", + f"{self.msg}. Exception: {e.__str__()}".encode("utf-8"), ] ) # Retry logic here is overkill atm. - socks = dict(self.poll.poll(60000)) # 60 seconds - if socks.get(self.dealer) == zmq.POLLIN: + events = dict(self.poller.poll(60000)) # 60 seconds + if events.get(self.dealer) == zmq.POLLIN: msg = self.dealer.recv() logger.debug( - "Id: %s. Response from the broker: %s" - % (self.identity, msg) + f"Id: {self.identity}. Response from the broker: {msg}" ) else: logger.debug( - "Id: %s. No response received from " - "the broker" % self.identity + f"Id: {self.identity}. No response received from the broker" ) except Exception as e: - msg = ( - "Id: %s. Exception while sending %s back " - "to the broker. Aborting" % (self.identity, command) - ) - logger.error("%s. Exception: %s" % (msg, e.__str__())) + msg = f"Id: {self.identity}. Exception while sending {command} back to the broker. Aborting" + logger.error(f"{msg}. Exception: {e.__str__()}") self._sys_exit(3) def _delete_old_snaps(self, share_name, share_path, num_retain): @@ -143,58 +140,74 @@ def _delete_old_snaps(self, share_name, share_path, num_retain): if self.delete_snapshot(share_name, oldest_snap): return self._delete_old_snaps(share_name, share_path, num_retain) - def _send_recv(self, command, msg=""): - rcommand = rmsg = None - self.dealer.send_multipart([command, msg]) - # Retry logic doesn't make sense atm. So one long patient wait. - socks = dict(self.poll.poll(60000)) # 60 seconds. - if socks.get(self.dealer) == zmq.POLLIN: + def _send_recv(self, command: bytes, msg: bytes = b""): + logger.debug( + f"_send_recv called with command: {command}, msg: {msg}." + ) + rcommand = rmsg = b"" + tracker = self.dealer.send_multipart([command, msg], copy=False, track=True) + if not tracker.done: + logger.debug(f"Waiting max 2 seconds for send of commmand ({command})") + tracker.wait(timeout=2) # seconds as float + # Note: And exception here would inform the receiver within the WebUI record. 
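The copy=False / track=True calls used throughout this patch rely on pyzmq's MessageTracker: when copying is disabled (zmq.COPY_THRESHOLD of 0, or the per-socket copy_threshold=0 used for the DEALER sockets here), send_multipart() returns a tracker whose done flag and wait() report when the outgoing frames are no longer held by pyzmq/libzmq. A small sketch, assuming a placeholder ipc endpoint:

import zmq

zmq.COPY_THRESHOLD = 0                         # track every send, not only large frames
ctx = zmq.Context()
dealer = ctx.socket(zmq.DEALER)
dealer.connect("ipc:///tmp/replication.sock")  # placeholder endpoint

tracker = dealer.send_multipart([b"receiver-ready", b""], copy=False, track=True)
if not tracker.done:
    try:
        tracker.wait(timeout=2)                # seconds (float); raises zmq.NotDone on timeout
    except zmq.NotDone:
        pass                                   # frames still queued: peer absent or slow

Note that this only confirms the frames left the local queue; it is not an end-to-end delivery acknowledgement, which is why the broker and receiver still exchange explicit ACK/receiver-ready commands.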
+ events = dict(self.poller.poll(timeout=5000)) + if events.get(self.dealer) == zmq.POLLIN: rcommand, rmsg = self.dealer.recv_multipart() logger.debug( - "Id: %s command: %s rcommand: %s" % (self.identity, command, rcommand) + f"Id: {self.identity} _send_recv command: {command} rcommand: {rcommand}" ) + logger.debug(f"remote message: {rmsg}") return rcommand, rmsg - def _latest_snap(self, rso): + def _latest_snap_name(self, rso) -> str | None: for snap in ReceiveTrail.objects.filter( rshare=rso, status="succeeded" ).order_by("-id"): - if is_subvol("%s/%s" % (self.snap_dir, snap.snap_name)): - return str(snap.snap_name) # cannot be unicode for zmq message + if is_subvol(f"{self.snap_dir}/{snap.snap_name}"): + return str(snap.snap_name) logger.error( - "Id: %s. There are no replication snapshots on the " - "system for " - "Share(%s)." % (self.identity, rso.share) + f"Id: {self.identity}. There are no replication snapshots on the system for Share({rso.share})." ) # This would mean, a full backup transfer is required. return None def run(self): logger.debug( - "Id: %s. Starting a new Receiver for meta: %s" % (self.identity, self.meta) + f"Id: {self.identity}. Starting a new Receiver for meta: {self.meta}" ) - self.msg = "Top level exception in receiver" + + self.msg = b"Top level exception in receiver" latest_snap = None with self._clean_exit_handler(): self.law = APIWrapper() - self.poll = zmq.Poller() - self.dealer = self.ctx.socket(zmq.DEALER) - self.dealer.setsockopt_string(zmq.IDENTITY, u"%s" % self.identity) - self.dealer.set_hwm(10) - self.dealer.connect("ipc://%s" % settings.REPLICATION.get("ipc_socket")) - self.poll.register(self.dealer, zmq.POLLIN) + self.poller = zmq.Poller() + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket + self.dealer = self.ctx.socket( + zmq.DEALER, copy_threshold=0 + ) # Setup OUTPUT socket type. + # self.dealer.set_hwm(10) + ipc_socket = settings.REPLICATION.get("ipc_socket") + # Identity must be set before connection. + self.dealer.setsockopt(zmq.IDENTITY, self.identity) + self.dealer.connect(f"ipc://{ipc_socket}") + # Register our poller, for OUTPUT socket, to monitor for POLLIN events. + self.poller.register(self.dealer, zmq.POLLIN) self.ack = True - self.msg = "Failed to get the sender ip for appliance: %s" % self.sender_id + self.msg = ( + f"Failed to get the sender ip for appliance: {self.sender_id}. " + "Ensure receiver has sender in System -> Appliances.".encode("utf-8") + ) self.sender_ip = Appliance.objects.get(uuid=self.sender_id).ip if not self.incremental: - self.msg = "Failed to verify/create share: %s." % self.sname + self.msg = f"Failed to verify/create share: {self.sname}.".encode( + "utf-8" + ) self.create_share(self.sname, self.dest_pool) - self.msg = ( - "Failed to create the replica metadata object " - "for share: %s." % self.sname + self.msg = f"Failed to create the replica metadata object for share: {self.sname}.".encode( + "utf-8" ) data = { "share": self.sname, @@ -203,27 +216,28 @@ def run(self): } self.rid = self.create_rshare(data) else: - self.msg = ( - "Failed to retreive the replica metadata " - "object for share: %s." % self.sname + self.msg = f"Failed to retreive the replica metadata object for share: {self.sname}.".encode( + "utf-8" ) rso = ReplicaShare.objects.get(share=self.sname) self.rid = rso.id # Find and send the current snapshot to the sender. This will # be used as the start by btrfs-send diff. self.msg = ( - "Failed to verify latest replication snapshot on the system." 
+ b"Failed to verify latest replication snapshot on the system." ) - latest_snap = self._latest_snap(rso) + latest_snap = self._latest_snap_name(rso) - self.msg = "Failed to create receive trail for rid: %d" % self.rid + self.msg = f"Failed to create receive trail for rid: {self.rid}".encode( + "utf-8" + ) data = { "snap_name": self.snap_name, } self.rtid = self.create_receive_trail(self.rid, data) # delete the share, move the oldest snap to share - self.msg = "Failed to promote the oldest Snapshot to Share." + self.msg = b"Failed to promote the oldest Snapshot to Share." oldest_snap = get_oldest_snap( self.snap_dir, self.num_retain_snaps, regex="_replication_" ) @@ -232,42 +246,43 @@ def run(self): self.refresh_share_state() self.refresh_snapshot_state() - self.msg = "Failed to prune old Snapshots" + self.msg = b"Failed to prune old Snapshots" self._delete_old_snaps(self.sname, self.snap_dir, self.num_retain_snaps + 1) # TODO: The following should be re-instantiated once we have a - # TODO: working method for doing so. see validate_src_share. - # self.msg = ('Failed to validate the source share(%s) on ' - # 'sender(uuid: %s ' - # ') Did the ip of the sender change?' % - # (self.src_share, self.sender_id)) + # working method for doing so. see validate_src_share. + # self.msg = ( + # f"Failed to validate the source share ({self.src_share}) on sender uuid: ({self.sender_id}). " + # f"Did the ip of the sender change?" + # ).encode("utf-8") # self.validate_src_share(self.sender_id, self.src_share) - sub_vol = "%s%s/%s" % (settings.MNT_PT, self.dest_pool, self.sname) + sub_vol = f"{settings.MNT_PT}{self.dest_pool}/{self.sname}" if not is_subvol(sub_vol): - self.msg = "Failed to create parent subvolume %s" % sub_vol + self.msg = f"Failed to create parent subvolume {sub_vol}".encode( + "utf-8" + ) run_command([BTRFS, "subvolume", "create", sub_vol]) - self.msg = "Failed to create snapshot directory: %s" % self.snap_dir + self.msg = f"Failed to create snapshot directory: {self.snap_dir}".encode( + "utf-8" + ) run_command(["/usr/bin/mkdir", "-p", self.snap_dir]) - snap_fp = "%s/%s" % (self.snap_dir, self.snap_name) + snap_fp = f"{self.snap_dir}/{self.snap_name}" # If the snapshot already exists, presumably from the previous # attempt and the sender tries to send the same, reply back with # snap_exists and do not start the btrfs-receive if is_subvol(snap_fp): logger.debug( - "Id: %s. Snapshot to be sent(%s) already " - "exists. Not starting a new receive process" - % (self.identity, snap_fp) + f"Id: {self.identity}. Snapshot to be sent({snap_fp}) already exists. Not starting a new receive process" ) - self._send_recv("snap-exists") + self._send_recv(b"snap-exists") self._sys_exit(0) cmd = [BTRFS, "receive", self.snap_dir] - self.msg = ( - "Failed to start the low level btrfs receive " - "command(%s). Aborting." % cmd + self.msg = f"Failed to start the low level btrfs receive command({cmd}). Aborting.".encode( + "utf-8" ) self.rp = subprocess.Popen( cmd, @@ -277,133 +292,128 @@ def run(self): stderr=subprocess.PIPE, ) - self.msg = "Failed to send receiver-ready" - rcommand, rmsg = self._send_recv("receiver-ready", latest_snap or "") - if rcommand is None: + self.msg = b"Failed to send receiver-ready" + # Previously our second parameter was (latest_snap or b"") + if latest_snap is None: + snap_name = b"" + else: + snap_name = latest_snap.encode("utf8") + rcommand, rmsg = self._send_recv(b"receiver-ready", snap_name) + if rcommand == b"": logger.error( - "Id: %s. 
No response from the broker for " - "receiver-ready command. Aborting." % self.identity + f"Id: {self.identity}. No response from the broker for receiver-ready command. Aborting." ) self._sys_exit(3) - term_commands = ( - "btrfs-send-init-error", - "btrfs-send-unexpected-termination-error", - "btrfs-send-nonzero-termination-error", - ) num_tries = 10 - poll_interval = 6000 # 6 seconds num_msgs = 0 - t0 = time.time() + start_time = time.time() while True: - socks = dict(self.poll.poll(poll_interval)) - if socks.get(self.dealer) == zmq.POLLIN: - # reset to wait upto 60(poll_interval x num_tries - # milliseconds) for every message + events = dict(self.poller.poll(timeout=6000)) # 6 seconds + logger.debug(f"Events dict = {events}") + if events.get(self.dealer) == zmq.POLLIN: num_tries = 10 command, message = self.dealer.recv_multipart() - if command == "btrfs-send-stream-finished": + logger.debug(f"command = {command}") + if command == b"btrfs-send-stream-finished": # this command concludes fsdata transfer. After this, # btrfs-recev process should be # terminated(.communicate). + # poll() returns None while process is running: rc otherwise. if self.rp.poll() is None: - self.msg = "Failed to terminate btrfs-recv command" + self.msg = b"Failed to terminate btrfs-recv command" out, err = self.rp.communicate() - out = out.split("\n") - err = err.split("\n") + out = out.split(b"\n") + err = err.split(b"\n") logger.debug( - "Id: %s. Terminated btrfs-recv. " - "cmd = %s out = %s err: %s rc: %s" - % (self.identity, cmd, out, err, self.rp.returncode) + f"Id: {self.identity}. Terminated btrfs-recv. cmd = {cmd} out = {out} err: {err} rc: {self.rp.returncode}" ) if self.rp.returncode != 0: - self.msg = ( - "btrfs-recv exited with unexpected " - "exitcode(%s). " % self.rp.returncode + self.msg = f"btrfs-recv exited with unexpected exitcode({self.rp.returncode}).".encode( + "utf-8" ) raise Exception(self.msg) + total_kb_received = int(self.total_bytes_received / 1024) data = { "status": "succeeded", - "kb_received": self.total_bytes_received / 1024, + "kb_received": total_kb_received, } - self.msg = ( - "Failed to update receive trail for rtid: %d" % self.rtid + self.msg = f"Failed to update receive trail for rtid: {self.rtid}".encode( + "utf-8" ) self.update_receive_trail(self.rtid, data) - self._send_recv("btrfs-recv-finished") + self._send_recv(b"btrfs-recv-finished") self.refresh_share_state() self.refresh_snapshot_state() - dsize, drate = self.size_report(self.total_bytes_received, t0) + dsize, drate = self.size_report( + self.total_bytes_received, start_time + ) logger.debug( - "Id: %s. Receive complete. Total data " - "transferred: %s. Rate: %s/sec." - % (self.identity, dsize, drate) + f"Id: {self.identity}. Receive complete. Total data transferred: {dsize}. Rate: {drate}/sec." ) self._sys_exit(0) - if command in term_commands: - self.msg = ( - "Terminal command(%s) received from the " - "sender. Aborting." % command + if ( + command == b"btrfs-send-init-error" + or command == b"btrfs-send-unexpected-termination-error" + or command == b"btrfs-send-nonzero-termination-error" + ): + self.msg = f"Terminal command({command}) received from the sender. Aborting.".encode( + "utf-8" ) raise Exception(self.msg) + # poll() returns None while process is running: return code otherwise. if self.rp.poll() is None: self.rp.stdin.write(message) self.rp.stdin.flush() # @todo: implement advanced credit request system. 
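The b"send-more" reply just below acts as a one-chunk credit: the sender emits the next slice of the btrfs stream only after the receiver has flushed the previous one into the btrfs receive process, which is the simple flow control the todo above proposes to refine. A sketch of that lock-step receive loop, with a placeholder endpoint, a placeholder identity, and a plain cat pipeline standing in for [BTRFS, "receive", snap_dir]:

import subprocess
import zmq

ctx = zmq.Context()
dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.IDENTITY, b"sender-addr")  # identity the broker routes on (placeholder)
dealer.connect("ipc:///tmp/replication.sock")    # placeholder endpoint
rp = subprocess.Popen(["sh", "-c", "cat > /tmp/stream.out"], stdin=subprocess.PIPE)

while True:
    command, message = dealer.recv_multipart()
    if command == b"btrfs-send-stream-finished":
        rp.communicate()                         # close stdin and wait for exit
        break
    rp.stdin.write(message)                      # push this chunk downstream
    rp.stdin.flush()
    dealer.send_multipart([b"send-more", b""])   # grant exactly one more chunk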
- self.dealer.send_multipart([b"send-more", ""]) + self.dealer.send_multipart([b"send-more", b""]) num_msgs += 1 self.total_bytes_received += len(message) if num_msgs == 1000: num_msgs = 0 + total_kb_received = int(self.total_bytes_received / 1024) data = { "status": "pending", - "kb_received": self.total_bytes_received / 1024, + "kb_received": total_kb_received, } self.update_receive_trail(self.rtid, data) dsize, drate = self.size_report( - self.total_bytes_received, t0 + self.total_bytes_received, start_time ) logger.debug( - "Id: %s. Receiver alive. Data " - "transferred: %s. Rate: %s/sec." - % (self.identity, dsize, drate) + f"Id: {self.identity}. Receiver alive. Data transferred: {dsize}. Rate: {drate}/sec." ) - else: + else: # receive process has stopped: out, err = self.rp.communicate() - out = out.split("\n") - err = err.split("\n") + out = out.split(b"\n") + err = err.split(b"\n") logger.error( - "Id: %s. btrfs-recv died unexpectedly. " - "cmd: %s out: %s. err: %s" % (self.identity, cmd, out, err) + f"Id: {self.identity}. btrfs-recv died unexpectedly. " + f"cmd: {cmd} out: {out}. err: {err}" ) msg = ( - "Low level system error from btrfs receive " - "command. cmd: %s out: %s err: %s for rtid: %s" - % (cmd, out, err, self.rtid) - ) + f"Low level system error from btrfs receive command. " + f"cmd: {cmd} out: {out} err: {err} for rtid: {self.rtid}" + ).encode("utf-8") data = { "status": "failed", "error": msg, } self.msg = ( - "Failed to update receive trail for " - "rtid: %d." % self.rtid - ) + f"Failed to update receive trail for rtid: {self.rtid}." + ).encode("utf-8") self.update_receive_trail(self.rtid, data) self.msg = msg raise Exception(self.msg) else: num_tries -= 1 - msg = ( - "No response received from the broker. " - "remaining tries: %d" % num_tries - ) - logger.error("Id: %s. %s" % (self.identity, msg)) + msg = f"No response received from the broker. remaining tries: {num_tries}" + logger.error(f"Id: {self.identity}. {msg}") if num_tries == 0: - self.msg = "%s. Terminating the receiver." % msg + self.msg = f"{msg}. Terminating the receiver.".encode("utf-8") raise Exception(self.msg) diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index 41c8b0fa9..ad593f811 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -1,5 +1,5 @@ """ -Copyright (c) 2012-2020 RockStor, Inc. +Copyright (c) 2012-2023 RockStor, Inc. This file is part of RockStor. RockStor is free software; you can redistribute it and/or modify @@ -13,7 +13,7 @@ General Public License for more details. You should have received a copy of the GNU General Public License -along with this program. If not, see . +along with this program. If not, see . 
""" from multiprocessing import Process @@ -21,13 +21,12 @@ import sys import zmq import subprocess -import fcntl import json import time from django.conf import settings from contextlib import contextmanager -from util import ReplicationMixin -from fs.btrfs import get_oldest_snap, is_subvol +from smart_manager.replication.util import ReplicationMixin +from fs.btrfs import get_oldest_snap, is_subvol, BTRFS from smart_manager.models import ReplicaTrail from cli import APIWrapper from django import db @@ -35,29 +34,37 @@ logger = logging.getLogger(__name__) -BTRFS = "/sbin/btrfs" - class Sender(ReplicationMixin, Process): - def __init__(self, uuid, receiver_ip, replica, rt=None): + uuid: str + total_bytes_sent: int + identity: str + rlatest_snap: str | None + + def __init__(self, uuid: str, receiver_ip, replica, rt: int | None = None): + self.law = None + self.poller = None self.uuid = uuid self.receiver_ip = receiver_ip self.receiver_port = replica.data_port self.replica = replica # TODO: may need to send local shareId so it can be verifed remotely - self.snap_name = "%s_%d_replication" % (replica.share, replica.id) - self.snap_name += "_1" if (rt is None) else "_%d" % (rt.id + 1) - self.snap_id = "%s_%s" % (self.uuid, self.snap_name) + self.snap_name = f"{replica.share}_{replica.id}_replication" + self.snap_name += "_1" if (rt is None) else f"_{rt.id + 1}" + self.snap_id = f"{self.uuid}_{self.snap_name}" self.rt = rt self.rt2 = None self.rt2_id = None self.rid = replica.id - self.identity = u"%s-%s" % (self.uuid, self.rid) + self.identity = f"{self.uuid}-{self.rid}" self.sp = None # Latest snapshot per Receiver(comes along with receiver-ready) self.rlatest_snap = None + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Context self.ctx = zmq.Context() - self.msg = "" + self.zmq_version = zmq.__version__ + self.libzmq_version = zmq.zmq_version() + self.msg = b"" self.update_trail = False self.total_bytes_sent = 0 self.ppid = os.getpid() @@ -70,20 +77,17 @@ def _clean_exit_handler(self): try: yield except Exception as e: - logger.error( - "Id: %s. %s. Exception: %s" % (self.identity, self.msg, e.__str__()) - ) + logger.error(f"Id: {self.identity}. {self.msg}. Exception: {e.__str__()}") if self.update_trail: try: data = { "status": "failed", - "error": "%s. Exception: %s" % (self.msg, e.__str__()), + "error": f"{self.msg}. Exception: {e.__str__()}", } # noqa E501 self.update_replica_status(self.rt2_id, data) except Exception as e: logger.error( - "Id: %s. Exception occured while updating " - "replica status: %s" % (self.identity, e.__str__()) + f"Id: {self.identity}. Exception occurred while updating replica status: {e.__str__()}" ) self._sys_exit(3) @@ -94,9 +98,16 @@ def _sys_exit(self, code): sys.exit(code) def _init_greeting(self): - self.send_req = self.ctx.socket(zmq.DEALER) + logger.debug("_init_greeting() CALLED") + # Create our send (DEALER) socket using our context (ctx) + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket + self.send_req = self.ctx.socket(zmq.DEALER, copy_threshold=0) + # Identity must be set before connection. self.send_req.setsockopt_string(zmq.IDENTITY, self.identity) - self.send_req.connect("tcp://%s:%d" % (self.receiver_ip, self.receiver_port)) + self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}") + # Register our poller to monitor for POLLIN events. 
+ self.poller.register(self.send_req, zmq.POLLIN) + msg = { "pool": self.replica.dpool, "share": self.replica.share, @@ -105,47 +116,52 @@ def _init_greeting(self): "uuid": self.uuid, } msg_str = json.dumps(msg) - self.send_req.send_multipart(["sender-ready", b"%s" % msg_str]) - logger.debug("Id: %s Initial greeting: %s" % (self.identity, msg)) - self.poll.register(self.send_req, zmq.POLLIN) - - def _send_recv(self, command, msg=""): - self.msg = "Failed while send-recv-ing command(%s)" % command - rcommand = rmsg = None - self.send_req.send_multipart([command, b"%s" % msg]) + msg = msg_str.encode("utf-8") + command = b"sender-ready" + rcommand, rmsg = self._send_recv(command, msg, send_only=True) + logger.debug(f"_send_recv(command={command}, msg={msg}) -> {rcommand}, {rmsg}") + logger.debug(f"Id: {self.identity} Initial greeting Done") + + def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False): + # Avoid debug logging the btrfs-send-stream contents. + if command == b"" and msg != b"": + logger.debug("_send_recv(command=b'', msg assumed BTRFS SEND BYTE STREAM)") + else: + logger.debug(f"_send_recv(command={command}, msg={msg}), send_only={send_only}") + self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8") + rcommand = rmsg = b"" + tracker = self.send_req.send_multipart([command, msg], copy=False, track=True) + if not tracker.done: + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone + tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone # There is no retry logic here because it's an overkill at the moment. # If the stream is interrupted, we can only start from the beginning # again. So we wait patiently, but only once. Perhaps we can implement # a buffering or temporary caching strategy to make this part robust. - socks = dict(self.poll.poll(60000)) # 60 seconds. - if socks.get(self.send_req) == zmq.POLLIN: + if send_only: + return command, b"send_only-succeeded" + events = dict(self.poller.poll(60000)) # 60 seconds. + if events.get(self.send_req) == zmq.POLLIN: rcommand, rmsg = self.send_req.recv_multipart() - if ( - len(command) > 0 or (rcommand is not None and rcommand != "send-more") - ) or ( # noqa E501 - len(command) > 0 and rcommand is None + # len(b"") == 0 so change to test for command != b"" instead + if (len(command) > 0 or (rcommand != b"" and rcommand != b"send-more")) or ( + len(command) > 0 and rcommand == b"" ): logger.debug( - "Id: %s Server: %s:%d scommand: %s rcommand: %s" - % ( - self.identity, - self.receiver_ip, - self.receiver_port, - command, - rcommand, - ) + f"Id: {self.identity} Server: {self.receiver_ip}:{self.receiver_port} scommand: {command} rcommand: {rcommand}" ) return rcommand, rmsg - def _delete_old_snaps(self, share_path): + def _delete_old_snaps(self, share_path: str): + logger.debug(f"Sender _delete_old_snaps(share_path={share_path})") oldest_snap = get_oldest_snap( share_path, self.max_snap_retain, regex="_replication_" ) if oldest_snap is not None: - logger.debug( - "Id: %s. Deleting old snapshot: %s" % (self.identity, oldest_snap) + logger.debug(f"Id: {self.identity}. Deleting old snapshot: {oldest_snap}") + self.msg = f"Failed to delete snapshot: {oldest_snap}. Aborting.".encode( + "utf-8" ) - self.msg = "Failed to delete snapshot: %s. Aborting." % oldest_snap if self.delete_snapshot(self.replica.share, oldest_snap): return self._delete_old_snaps(share_path) @@ -155,19 +171,14 @@ def _refresh_rt(self): # it may not be the one refered by self.rt(latest) but a previous one. 
# We need to make sure to *only* send the incremental send that # receiver expects. - self.msg = "Failed to validate/refresh ReplicaTrail." + self.msg = "Failed to validate/refresh ReplicaTrail.".encode("utf-8") if self.rlatest_snap is None: # Validate/update self.rt to the one that has the expected Snapshot # on the system. for rt in ReplicaTrail.objects.filter( replica=self.replica, status="succeeded" ).order_by("-id"): - snap_path = "%s%s/.snapshots/%s/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, - self.rt.snap_name, - ) + snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rt.snap_name}" if is_subvol(snap_path): return rt # Snapshots from previous succeeded ReplicaTrails don't actually @@ -184,76 +195,58 @@ def _refresh_rt(self): if self.rt.snap_name != self.rlatest_snap: self.msg = ( "Mismatch on starting snapshot for " - "btrfs-send. Sender picked %s but Receiver wants " - "%s, which takes precedence." % (self.rt.snap_name, self.rlatest_snap) - ) + f"btrfs-send. Sender picked {self.rt.snap_name} but Receiver wants " + f"{self.rlatest_snap}, which takes precedence." + ).encode("utf-8") for rt in ReplicaTrail.objects.filter( replica=self.replica, status="succeeded" ).order_by("-id"): if rt.snap_name == self.rlatest_snap: - self.msg = "%s. successful trail found for %s" % ( - self.msg, - self.rlatest_snap, - ) - snap_path = "%s%s/.snapshots/%s/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, - self.rlatest_snap, + self.msg = f"{self.msg}. successful trail found for {self.rlatest_snap}".encode( + "utf-8" ) + snap_path = f"{settings.MNT_PT}{self.replica.pool}.snapshots/{self.replica.share}/{self.rlatest_snap}" if is_subvol(snap_path): - self.msg = ( - "Snapshot(%s) exists in the system and " - "will be used as the parent" % snap_path + self.msg = f"Snapshot({snap_path}) exists in the system and will be used as the parent".encode( + "utf-8" ) - logger.debug("Id: %s. %s" % (self.identity, self.msg)) + logger.debug(f"Id: {self.identity}. {self.msg}") return rt - self.msg = ( - "Snapshot(%s) does not exist on the system. " - "So cannot use it." % snap_path + self.msg = f"Snapshot({snap_path}) does not exist on the system. So cannot use it.".encode( + "utf-8" ) raise Exception(self.msg) raise Exception( - "%s. No succeeded trail found for %s." % (self.msg, self.rlatest_snap) + f"{self.msg}. No succeeded trail found for {self.rlatest_snap}." ) - snap_path = "%s%s/.snapshots/%s/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, - self.rlatest_snap, - ) + snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rlatest_snap}" if is_subvol(snap_path): return self.rt raise Exception( - "Parent Snapshot(%s) to use in btrfs-send does not " - "exist in the system." % snap_path + f"Parent Snapshot({snap_path}) to use in btrfs-send does not exist in the system." ) def run(self): - - self.msg = "Top level exception in sender: %s" % self.identity + self.msg = f"Top level exception in sender: {self.identity}".encode("utf-8") with self._clean_exit_handler(): self.law = APIWrapper() - self.poll = zmq.Poller() + self.poller = zmq.Poller() self._init_greeting() - # create a new replica trail if it's the very first time + # Create a new replica trail if it's the very first time, # or if the last one succeeded - self.msg = ( - "Failed to create local replica trail for snap_name:" - " %s. Aborting." 
% self.snap_name + self.msg = f"Failed to create local replica trail for snap_name: {self.snap_name}. Aborting.".encode( + "utf-8" ) self.rt2 = self.create_replica_trail(self.replica.id, self.snap_name) self.rt2_id = self.rt2["id"] # prune old snapshots. self.update_trail = True - self.msg = "Failed to prune old snapshots" - share_path = "%s%s/.snapshots/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, + self.msg = "Failed to prune old snapshots".encode("utf-8") + share_path = ( + f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}" ) self._delete_old_snaps(share_path) @@ -264,198 +257,199 @@ def run(self): # create a snapshot only if it's not already from a previous # failed attempt. # TODO: If one does exist we fail which seems harsh as we may be - # TODO: able to pickup where we left of depending on the failure. - self.msg = "Failed to create snapshot: %s. Aborting." % self.snap_name + # able to pickup where we left of depending on the failure. + self.msg = f"Failed to create snapshot: {self.snap_name}. Aborting.".encode( + "utf-8" + ) self.create_snapshot(self.replica.share, self.snap_name) retries_left = settings.REPLICATION.get("max_send_attempts") - poll_interval = 6000 # 6 seconds + self.msg = ( + "Place-holder message just after sender snapshot creation".encode( + "utf-8" + ) + ) + while True: - socks = dict(self.poll.poll(poll_interval)) - if socks.get(self.send_req) == zmq.POLLIN: + events_list = self.poller.poll(6000) + logger.debug(f"EVENT_LIST poll = {events_list}") + events = dict(events_list) + logger.debug(f"Events dict = {events}") + if events.get(self.send_req) == zmq.POLLIN: # not really necessary because we just want one reply for # now. - retries_left = settings.REPLICATION.get("max_send_attempts") command, reply = self.send_req.recv_multipart() - if command == "receiver-ready": + logger.debug(f"command = {command}") + if command == b"receiver-ready": if self.rt is not None: - self.rlatest_snap = reply + self.rlatest_snap = reply.decode("utf-8") self.rt = self._refresh_rt() logger.debug( - "Id: %s. command(%s) and message(%s) " - "received. Proceeding to send fsdata." - % (self.identity, command, reply) + f"Id: {self.identity}. command({command}) & message({reply}) received. " + "Proceed to send btrfs_send_stream." ) break else: - if command in "receiver-init-error": - self.msg = ( - "%s received for %s. extended reply: " - "%s. Aborting." % (command, self.identity, reply) + if command == b"receiver-init-error": + self.msg = f"{command} received for {self.identity}. extended reply: {reply}. Aborting.".encode( + "utf-8" ) - elif command == "snap-exists": + elif command == b"snap-exists": logger.debug( - "Id: %s. %s received. Not sending " - "fsdata" % (self.identity, command) + f"Id: {self.identity}. {command} received. Not sending fsdata" ) data = { "status": "succeeded", "error": "snapshot already exists on the receiver", } # noqa E501 - self.msg = ( - "Failed to update replica status for " - "%s" % self.snap_id + self.msg = f"Failed to update replica status for {self.snap_id}".encode( + "utf-8" ) self.update_replica_status(self.rt2_id, data) self._sys_exit(0) else: - self.msg = ( - "unexpected reply(%s) for %s. " - "extended reply: %s. Aborting" - % (command, self.identity, reply) + self.msg = f"unexpected reply({command}) for {self.identity}. extended reply: {reply}. Aborting".encode( + "utf-8" ) raise Exception(self.msg) else: retries_left -= 1 logger.debug( - "Id: %s. No response from receiver. 
Number " - "of retry attempts left: %d" % (self.identity, retries_left) + f"Id: {self.identity}. No response from receiver. Number of retry attempts left: {retries_left}" ) if retries_left == 0: - self.msg = "Receiver(%s:%d) is unreachable. Aborting." % ( - self.receiver_ip, - self.receiver_port, + self.msg = f"Receiver({self.receiver_ip}:{self.receiver_port}) is unreachable. Aborting.".encode( + "utf-8" ) raise Exception(self.msg) self.send_req.setsockopt(zmq.LINGER, 0) self.send_req.close() - self.poll.unregister(self.send_req) + self.poller.unregister(self.send_req) self._init_greeting() - snap_path = "%s%s/.snapshots/%s/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, - self.snap_name, - ) + snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.snap_name}" cmd = [BTRFS, "send", snap_path] + logger.debug(f"Initial btrfs 'send' cmd {cmd}") if self.rt is not None: - prev_snap = "%s%s/.snapshots/%s/%s" % ( - settings.MNT_PT, - self.replica.pool, - self.replica.share, - self.rt.snap_name, - ) + prev_snap = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rt.snap_name}" logger.info( - "Id: %s. Sending incremental replica between " - "%s -- %s" % (self.identity, prev_snap, snap_path) + f"Id: {self.identity}. Sending incremental replica between {prev_snap} -- {snap_path}" ) cmd = [BTRFS, "send", "-p", prev_snap, snap_path] + logger.debug(f"Differential btrfs 'send' cmd {cmd}") else: - logger.info( - "Id: %s. Sending full replica: %s" % (self.identity, snap_path) - ) + logger.info(f"Id: {self.identity}. Sending full replica: {snap_path}") try: + # We force en_US to avoid issues on date and number formats + # on non Anglo-Saxon systems (ex. it, es, fr, de, etc) + fake_env = dict(os.environ) + fake_env["LANG"] = "en_US.UTF-8" + # all subprocess in and out are bytes by default. + # https://docs.python.org/3.11/library/subprocess.html#using-the-subprocess-module + # subprocess.run is blocking until execution has finnished. self.sp = subprocess.Popen( cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + # Get current stdout flags: + # stdout_flags = fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_GETFL) + # add via File_SetFlag, O_NONBLOCK (non-blocking) + # fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK) + # Py3 variant of the same: + os.set_blocking(self.sp.stdout.fileno(), False) except Exception as e: - self.msg = ( - "Failed to start the low level btrfs send " - "command(%s). Aborting. Exception: " % (cmd, e.__str__()) + self.msg = f"Failed to start the low level btrfs send command({cmd}). Aborting. Exception: {e.__str__()}".encode( + "utf-8" ) - logger.error("Id: %s. %s" % (self.identity, self.msg)) - self._send_recv("btrfs-send-init-error") + logger.error(f"Id: {self.identity}. {self.msg}") + self._send_recv(b"btrfs-send-init-error") self._sys_exit(3) alive = True num_msgs = 0 - t0 = time.time() + start_time = time.time() while alive: try: + # poll() returns None while process is running: rc otherwise. if self.sp.poll() is not None: logger.debug( - "Id: %s. send process finished " - "for %s. rc: %d. stderr: %s" - % ( - self.identity, - self.snap_id, - self.sp.returncode, - self.sp.stderr.read(), - ) + f"Id: {self.identity}. send process finished for {self.snap_id}. " + f"rc: {self.sp.returncode}. 
stderr: {self.sp.stderr.read()}" ) alive = False - fs_data = self.sp.stdout.read() - except IOError: + # Read all available data from stdout without blocking (requires bytes stream). + # https://docs.python.org/3/library/io.html#io.BufferedIOBase.read1 + # We limit/chunck this read1 to a set number of bytes per cycle. + # Btrfs uses 256 MB chunk on disk + # Arbitrarily chunk send process stdout via read1() bytes argument + btrfs_send_stream = self.sp.stdout.read1(100000000) + if btrfs_send_stream is None: + logger.debug("sp.stdout empty") + continue + except IOError: # TODO: Non functional in Py3 (Py2.7 behaviour) continue except Exception as e: self.msg = ( - "Exception occurred while reading low " - "level btrfs " - "send data for %s. Aborting." % self.snap_id - ) + f"Exception occurred while reading low level btrfs send data for {self.snap_id}. " + f"Aborting. Exception: {e.__str__()}" + ).encode("utf-8") if alive: self.sp.terminate() self.update_trail = True - self._send_recv("btrfs-send-unexpected-termination-error") + self._send_recv( + b"btrfs-send-unexpected-termination-error", self.msg + ) self._sys_exit(3) - self.msg = ( - "Failed to send fsdata to the receiver for %s. " - "Aborting." % (self.snap_id) + self.msg = f"Failed to send 'btrfs_send_stream' to the receiver for {self.snap_id}. Aborting.".encode( + "utf-8" ) self.update_trail = True - command, message = self._send_recv("", fs_data) - self.total_bytes_sent += len(fs_data) + command, message = self._send_recv(b"", btrfs_send_stream) + self.total_bytes_sent += len(btrfs_send_stream) num_msgs += 1 if num_msgs == 1000: num_msgs = 0 - dsize, drate = self.size_report(self.total_bytes_sent, t0) + dsize, drate = self.size_report(self.total_bytes_sent, start_time) logger.debug( - "Id: %s Sender alive. Data transferred: " - "%s. Rate: %s/sec." % (self.identity, dsize, drate) + f"Id: {self.identity} Sender alive. Data transferred: {dsize}. Rate: {drate}/sec." ) - if command is None or command == "receiver-error": - # command is None when the remote side vanishes. + if command == b"" or command == b"receiver-error": + # command is EMPTY when the remote side vanishes. self.msg = ( - "Got null or error command(%s) message(%s) " - "from the Receiver while" - " transmitting fsdata. Aborting." % (command, message) - ) + f"Got EMPTY or error command ({command}) message ({message}) " + "from the Receiver while transmitting fsdata. Aborting." + ).encode("utf-8") raise Exception(message) if not alive: if self.sp.returncode != 0: # do we mark failed? command, message = self._send_recv( - "btrfs-send-nonzero-termination-error" + b"btrfs-send-nonzero-termination-error" ) else: - command, message = self._send_recv("btrfs-send-stream-finished") + command, message = self._send_recv( + b"btrfs-send-stream-finished" + ) if os.getppid() != self.ppid: logger.error( - "Id: %s. Scheduler exited. Sender for %s " - "cannot go on. " - "Aborting." % (self.identity, self.snap_id) + f"Id: {self.identity}. Scheduler exited. Sender for {self.snap_id} cannot go on. Aborting." ) self._sys_exit(3) - + total_kb_sent = int(self.total_bytes_sent / 1024) data = { "status": "succeeded", - "kb_sent": self.total_bytes_sent / 1024, + "kb_sent": total_kb_sent, } - self.msg = ( - "Failed to update final replica status for %s" - ". Aborting." % self.snap_id + self.msg = f"Failed to update final replica status for {self.snap_id}. 
Aborting.".encode( + "utf-8" ) self.update_replica_status(self.rt2_id, data) - dsize, drate = self.size_report(self.total_bytes_sent, t0) + dsize, drate = self.size_report(self.total_bytes_sent, start_time) logger.debug( - "Id: %s. Send complete. Total data transferred: %s." - " Rate: %s/sec." % (self.identity, dsize, drate) + f"Id: {self.identity}. Send complete. Total data transferred: {dsize}. Rate: {drate}/sec." ) self._sys_exit(0) diff --git a/src/rockstor/smart_manager/replication/util.py b/src/rockstor/smart_manager/replication/util.py index e6e985503..7f4f11303 100644 --- a/src/rockstor/smart_manager/replication/util.py +++ b/src/rockstor/smart_manager/replication/util.py @@ -17,6 +17,8 @@ """ import time +from typing import Any + from storageadmin.exceptions import RockStorAPIException from storageadmin.models import Appliance, Share from cli import APIWrapper @@ -26,58 +28,64 @@ class ReplicationMixin(object): - def validate_src_share(self, sender_uuid, sname): + def validate_src_share(self, sender_uuid: str, sname: str): url = "https://" if self.raw is None: a = Appliance.objects.get(uuid=sender_uuid) - url = "%s%s:%s" % (url, a.ip, a.mgmt_port) + url = f"{url}{a.ip}:{a.mgmt_port}" self.raw = APIWrapper( client_id=a.client_id, client_secret=a.client_secret, url=url ) # TODO: update url to include senders shareId as sname is now invalid - return self.raw.api_call(url="shares/%s" % sname) + return self.raw.api_call(url=f"shares/{sname}") - def update_replica_status(self, rtid, data): + def update_replica_status(self, rtid: int, data): + logger.debug(f"update_replica_status(rtid={rtid}, data={data})") try: - url = "sm/replicas/trail/%d" % rtid + url = f"sm/replicas/trail/{rtid}" return self.law.api_call(url, data=data, calltype="put") except Exception as e: - msg = "Exception while updating replica(%s) status to %s: %s" % ( - url, - data["status"], - e.__str__(), - ) + msg = f"Exception while updating replica({url}) status to {data['status']}: {e.__str__()}" raise Exception(msg) - def disable_replica(self, rid): + def disable_replica(self, rid: int): try: - url = "sm/replicas/%d" % rid + url = f"sm/replicas/{rid}" headers = { "content-type": "application/json", } return self.law.api_call( url, - data={"enabled": False,}, + data={ + "enabled": False, + }, calltype="put", save_error=False, headers=headers, ) except Exception as e: - msg = "Exception while disabling replica(%s): %s" % (url, e.__str__()) + msg = f"Exception while disabling replica({url}): {e.__str__()}" raise Exception(msg) - def create_replica_trail(self, rid, snap_name): - url = "sm/replicas/trail/replica/%d" % rid + def create_replica_trail(self, rid: int, snap_name: str): + logger.debug(f"Replication create_replica_trail(rid={rid}, snap_name={snap_name})") + url = f"sm/replicas/trail/replica/{rid}" return self.law.api_call( - url, data={"snap_name": snap_name,}, calltype="post", save_error=False + url, + data={ + "snap_name": snap_name, + }, + calltype="post", + save_error=False, ) - def rshare_id(self, sname): - url = "sm/replicas/rshare/%s" % sname + def rshare_id(self, sname: str) -> int: + url = f"sm/replicas/rshare/{sname}" rshare = self.law.api_call(url, save_error=False) return rshare["id"] - def create_rshare(self, data): + def create_rshare(self, data) -> int: + logger.debug(f"create_rshare(data={data})") try: url = "sm/replicas/rshare" rshare = self.law.api_call( @@ -86,26 +94,26 @@ def create_rshare(self, data): return rshare["id"] except RockStorAPIException as e: # Note replica_share.py post() 
@@ -86,26 +94,26 @@ def create_rshare(self, data):
             return rshare["id"]
         except RockStorAPIException as e:
             # Note replica_share.py post() generates this exception message.
-            if (
-                e.detail == "Replicashare(%s) already exists." % data["share"]
-            ):  # noqa E501
+            if e.detail == f"Replicashare({data['share']}) already exists.":  # noqa E501
                 return self.rshare_id(data["share"])
             raise e
 
-    def create_receive_trail(self, rid, data):
-        url = "sm/replicas/rtrail/rshare/%d" % rid
+    def create_receive_trail(self, rid: int, data) -> int:
+        logger.debug(f"create_receive_trail(rid={rid}, data={data})")
+        url = f"sm/replicas/rtrail/rshare/{rid}"
         rt = self.law.api_call(url, data=data, calltype="post", save_error=False)
+        logger.debug(f"create_receive_trail() -> {rt['id']}")
         return rt["id"]
 
-    def update_receive_trail(self, rtid, data):
-        url = "sm/replicas/rtrail/%d" % rtid
+    def update_receive_trail(self, rtid: int, data):
+        url = f"sm/replicas/rtrail/{rtid}"
         try:
             return self.law.api_call(url, data=data, calltype="put", save_error=False)
         except Exception as e:
-            msg = "Exception while updating receive trail(%s): %s" % (url, e.__str__())
+            msg = f"Exception while updating receive trail({url}): {e.__str__()}"
             raise Exception(msg)
 
-    def prune_trail(self, url, days=7):
+    def prune_trail(self, url: str, days: int = 7):
         try:
             data = {
                 "days": days,
@@ -114,70 +122,77 @@ def prune_trail(self, url, days=7):
                 url, data=data, calltype="delete", save_error=False
             )
         except Exception as e:
-            msg = "Exception while pruning trail for url(%s): %s" % (url, e.__str__())
+            msg = f"Exception while pruning trail for url({url}): {e.__str__()}"
             raise Exception(msg)
 
     def prune_receive_trail(self, ro):
-        url = "sm/replicas/rtrail/rshare/%d" % ro.id
+        url = f"sm/replicas/rtrail/rshare/{ro.id}"
         return self.prune_trail(url)
 
     def prune_replica_trail(self, ro):
-        url = "sm/replicas/trail/replica/%d" % ro.id
+        url = f"sm/replicas/trail/replica/{ro.id}"
         return self.prune_trail(url)
 
-    def create_snapshot(self, sname, snap_name, snap_type="replication"):
+    def create_snapshot(self, sname: str, snap_name: str, snap_type="replication"):
         try:
             share = Share.objects.get(name=sname)
-            url = "shares/%s/snapshots/%s" % (share.id, snap_name)
+            url = f"shares/{share.id}/snapshots/{snap_name}"
             return self.law.api_call(
-                url, data={"snap_type": snap_type,}, calltype="post", save_error=False
+                url,
+                data={
+                    "snap_type": snap_type,
+                },
+                calltype="post",
+                save_error=False,
             )
         except RockStorAPIException as e:
             # Note snapshot.py _create() generates this exception message.
-            if e.detail == (
-                "Snapshot ({}) already exists for the share ({})."
-            ).format(snap_name, sname):
+            if (
+                e.detail
+                == f"Snapshot ({snap_name}) already exists for the share ({sname})."
+            ):
                 return
             logger.debug(e.detail)
             raise e
 
-    def update_repclone(self, sname, snap_name):
+    def update_repclone(self, sname: str, snap_name: str):
         """
         Call the dedicated create_repclone via it's url to supplant our
         share with the given snapshot. Intended for use in receive.py to turn
         the oldest snapshot into an existing share via unmount, mv, mount cycle.
-        :param sname: Existing share name
+        :param sname: Existing share-name
        :param snap_name: Name of snapshot to supplant given share with.
         :return: False if there is a failure.
         """
         try:
             share = Share.objects.get(name=sname)
-            url = "shares/{}/snapshots/{}/repclone".format(share.id, snap_name)
+            url = f"shares/{share.id}/snapshots/{snap_name}/repclone"
             return self.law.api_call(url, calltype="post", save_error=False)
         except RockStorAPIException as e:
             # TODO: need to look further at the following as command repclone
-            # TODO: (snapshot.py post) catches Snapshot.DoesNotExist.
-            # TODO: and doesn't appear to call _delete_snapshot()
+            #  (snapshot.py post) catches Snapshot.DoesNotExist.
+            #  and doesn't appear to call _delete_snapshot()
             # Note snapshot.py _delete_snapshot() generates this exception msg.
-            if e.detail == "Snapshot name ({}) does not exist.".format(snap_name):
+            if e.detail == f"Snapshot name ({snap_name}) does not exist.":
                 logger.debug(e.detail)
                 return False
             raise e
 
-    def delete_snapshot(self, sname, snap_name):
+    def delete_snapshot(self, sname: str, snap_name: str):
         try:
             share = Share.objects.get(name=sname)
-            url = "shares/%s/snapshots/%s" % (share.id, snap_name)
+            url = f"shares/{share.id}/snapshots/{snap_name}"
             self.law.api_call(url, calltype="delete", save_error=False)
             return True
         except RockStorAPIException as e:
             # Note snapshot.py _delete_snapshot() generates this exception msg.
-            if e.detail == "Snapshot name ({}) does not exist.".format(snap_name):
+            if e.detail == f"Snapshot name ({snap_name}) does not exist.":
                 logger.debug(e.detail)
                 return False
             raise e
 
-    def create_share(self, sname, pool):
+    def create_share(self, sname: str, pool: str):
+        print(f"Replication 'create_share' called with sname {sname}, pool {pool}")
         try:
             url = "shares"
             data = {
@@ -193,10 +208,7 @@ def create_share(self, sname, pool):
             )
         except RockStorAPIException as e:
             # Note share.py post() generates this exception message.
-            if (
-                e.detail == "Share ({}) already exists. Choose a different "
-                "name.".format(sname)
-            ):  # noqa E501
+            if e.detail == f"Share ({sname}) already exists. Choose a different name.":
                 return
             logger.debug(e.detail)
             raise e
@@ -209,7 +221,7 @@ def refresh_snapshot_state(self):
                 save_error=False,
             )
         except Exception as e:
-            logger.error("Exception while refreshing Snapshot state: %s" % e.__str__())
+            logger.error(f"Exception while refreshing Snapshot state: {e.__str__()}")
 
     def refresh_share_state(self):
         try:
@@ -220,22 +232,25 @@ def refresh_share_state(self):
                 save_error=False,
             )
         except Exception as e:
-            logger.error("Exception while refreshing Share state: %s" % e.__str__())
+            logger.error(f"Exception while refreshing Share state: {e.__str__()}")
 
-    def humanize_bytes(self, num, units=("Bytes", "KB", "MB", "GB",)):
+    def humanize_bytes(self, num: float, units=("Bytes", "KB", "MB", "GB")) -> str:
         """
         Recursive routine to establish and then return the most appropriate
         num expression given the contents of units. Ie 1023 Bytes or 4096 KB
         :param num: Assumed to be in Byte units.
         :param units: list of units to recurse through
-        :return: "1023 Bytes" or "4.28 KB" etc given num=1023 or num=4384 )
+        :return: "1023 Bytes" or "4.28 KB" etc. given num=1023 or num=4384
         """
         if num < 1024 or len(units) == 1:
-            return "%.2f %s" % (num, units[0])
+            return f"{num:.2f} {units[0]}"
         return self.humanize_bytes(num / 1024, units[1:])
 
-    def size_report(self, num, t0):
-        t1 = time.time()
-        dsize = self.humanize_bytes(float(num))
-        drate = self.humanize_bytes(float(num / (t1 - t0)))
+    def size_report(self, num: int, time_started: float):
+        """
+        Takes num of bytes, and a start time, and returns humanized output.
+        """
+        time_now = time.time()
+        dsize: str = self.humanize_bytes(float(num))
+        drate: str = self.humanize_bytes(float(num / (time_now - time_started)))
         return dsize, drate
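
For reference, humanize_bytes() and size_report() can be exercised stand-alone: the recursion divides by 1024 and walks the unit tuple until the value fits or the units run out. A small usage sketch (module-level copies of the mixin methods; the sample values mirror the docstring, the rate figures are only approximate):

    import time


    def humanize_bytes(num: float, units=("Bytes", "KB", "MB", "GB")) -> str:
        if num < 1024 or len(units) == 1:
            return f"{num:.2f} {units[0]}"
        return humanize_bytes(num / 1024, units[1:])


    def size_report(num: int, time_started: float) -> tuple[str, str]:
        elapsed = time.time() - time_started
        return humanize_bytes(float(num)), humanize_bytes(float(num / elapsed))


    print(humanize_bytes(1023))           # 1023.00 Bytes
    print(humanize_bytes(4384))           # 4.28 KB
    print(humanize_bytes(5 * 1024**4))    # 5120.00 GB (capped at the last unit)
    print(size_report(10**9, time.time() - 2.0))  # roughly ('953.67 MB', '476.84 MB')
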