diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index 3a1d74d26..d359352e1 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -258,6 +258,7 @@ def run(self): logger.debug( f"Active Receiver: {rs}. Messages processed: {count}" ) + if command == b"sender-ready": logger.debug(f"initial greeting command '{command}' received from {address}") # Start a new receiver and send the appropriate response @@ -294,14 +295,14 @@ def run(self): self.local_receivers[address] = nr continue except Exception as e: - msg = f"Exception while starting the new receiver for {address}: {e.__str__()}" + msg = f"Exception while starting the new receiver for {address}: {e.__str__()}".encode("utf-8") logger.error(msg) frontend.send_multipart( - [address, b"receiver-init-error", msg.encode("utf-8")] + [address, b"receiver-init-error", msg] ) else: # do we hit hwm? is the dealer still connected? - backend.send_multipart([address, command, msg.encode("utf-8")]) + backend.send_multipart([address, command, msg]) elif backend in events and events[backend] == zmq.POLLIN: # backend.recv_multipart() returns all as type @@ -327,7 +328,7 @@ def run(self): finally: backend.send_multipart([address, rcommand, msg.encode("utf-8")]) elif address in self.remote_senders.keys(): - logger.debug(f"Identity/address {address}, found in remove_senders.keys()") + logger.debug(f"Identity/address {address}, found in remote_senders.keys()") if ( command == b"receiver-ready" or command == b"receiver-error" diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index c3da4f313..d3352b76a 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -21,6 +21,7 @@ import sys import zmq import subprocess +import io import fcntl import json import time @@ -122,7 +123,6 @@ def _init_greeting(self): logger.debug(f"_send_recv(command={command}, msg={msg}) -> {rcommand}, {rmsg}") logger.debug(f"Id: {self.identity} Initial greeting Done") - def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False): logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})") self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8") @@ -332,22 +332,34 @@ def run(self): snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.snap_name}" cmd = [BTRFS, "send", snap_path] - logger.debug(f"init btrfs 'send' cmd {cmd}") + logger.debug(f"Initial btrfs 'send' cmd {cmd}") if self.rt is not None: prev_snap = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rt.snap_name}" logger.info( f"Id: {self.identity}. Sending incremental replica between {prev_snap} -- {snap_path}" ) cmd = [BTRFS, "send", "-p", prev_snap, snap_path] - logger.debug(f"differential btrfs 'send' cmd {cmd}") + logger.debug(f"Differential btrfs 'send' cmd {cmd}") else: logger.info(f"Id: {self.identity}. Sending full replica: {snap_path}") try: + # We force en_US to avoid issues on date and number formats + # on non Anglo-Saxon systems (ex. it, es, fr, de, etc) + fake_env = dict(os.environ) + fake_env["LANG"] = "en_US.UTF-8" + # all subprocess in and out are bytes by default. + # https://docs.python.org/3.11/library/subprocess.html#using-the-subprocess-module + # subprocess.run is blocking until execution has finnished. self.sp = subprocess.Popen( cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) + # Get current stdout flags: + # stdout_flags = fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_GETFL) + # add via File_SetFlag, O_NONBLOCK (non-blocking) + # fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK) + # Py3 variant of the same: + os.set_blocking(self.sp.stdout.fileno(), False) except Exception as e: self.msg = f"Failed to start the low level btrfs send command({cmd}). Aborting. Exception: {e.__str__()}".encode( "utf-8" @@ -361,31 +373,39 @@ def run(self): t0 = time.time() while alive: try: - if self.sp.poll() is not None: + if self.sp.poll() is not None: # poll() returns None while process is running: rc otherwise. logger.debug( f"Id: {self.identity}. send process finished for {self.snap_id}. " f"rc: {self.sp.returncode}. stderr: {self.sp.stderr.read()}" ) alive = False - fs_data = self.sp.stdout.read() - except IOError: + # Read all available data from stdout without blocking (requires bytes stream). + # https://docs.python.org/3/library/io.html#io.BufferedIOBase.read1 + send_data = self.sp.stdout.read1() + if send_data is None: + logger.debug("sp.stdout empty") + continue + except IOError: # TODO: Non functional in Py3 (Py2.7 behaviour) continue except Exception as e: - self.msg = f"Exception occurred while reading low level btrfs send data for {self.snap_id}. Aborting.".encode( - "utf-8" - ) + self.msg = ( + f"Exception occurred while reading low level btrfs send data for {self.snap_id}. " + f"Aborting. Exception: {e.__str__()}" + ).encode("utf-8") if alive: self.sp.terminate() self.update_trail = True - self._send_recv(b"btrfs-send-unexpected-termination-error") + self._send_recv( + b"btrfs-send-unexpected-termination-error", self.msg + ) self._sys_exit(3) - self.msg = f"Failed to send fsdata to the receiver for {self.snap_id}. Aborting.".encode( + self.msg = f"Failed to send 'send_data' to the receiver for {self.snap_id}. Aborting.".encode( "utf-8" ) self.update_trail = True - command, message = self._send_recv(b"", fs_data) - self.total_bytes_sent += len(fs_data) + command, message = self._send_recv(b"", send_data) + self.total_bytes_sent += len(send_data) num_msgs += 1 if num_msgs == 1000: num_msgs = 0