diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index d359352e1..624310d0f 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -245,7 +245,15 @@ def run(self): if frontend in events and events[frontend] == zmq.POLLIN: # frontend.recv_multipart() returns all as type address, command, msg = frontend.recv_multipart() - logger.debug(f"frontend.recv_multipart() -> command={command}, msg={msg}") + # Limit debug msg to 180 chars to avoid MBs of btrfs-send-stream msg in log + if len(msg) > 180: + logger.debug( + f"frontend.recv_multipart() -> address= {address}, command={command}, msg={msg[0:180]} - log spam TRIMMED" + ) + else: + logger.debug( + f"frontend.recv_multipart() -> address= {address}, command={command}, msg={msg}" + ) # Keep a numerical events tally of per remote sender's events: if address not in self.remote_senders: self.remote_senders[address] = 1 @@ -260,7 +268,9 @@ def run(self): ) if command == b"sender-ready": - logger.debug(f"initial greeting command '{command}' received from {address}") + logger.debug( + f"initial greeting command '{command}' received from {address}" + ) # Start a new receiver and send the appropriate response try: start_nr = True @@ -274,7 +284,9 @@ def run(self): ) start_nr = True else: - msg = f"Receiver({address}) already exists. Will not start a new one.".encode("utf-8") + msg = f"Receiver({address}) already exists. Will not start a new one.".encode( + "utf-8" + ) logger.error(msg) # TODO: There may be a different way to handle # this. For example, we can pass the message to @@ -295,11 +307,11 @@ def run(self): self.local_receivers[address] = nr continue except Exception as e: - msg = f"Exception while starting the new receiver for {address}: {e.__str__()}".encode("utf-8") - logger.error(msg) - frontend.send_multipart( - [address, b"receiver-init-error", msg] + msg = f"Exception while starting the new receiver for {address}: {e.__str__()}".encode( + "utf-8" ) + logger.error(msg) + frontend.send_multipart([address, b"receiver-init-error", msg]) else: # do we hit hwm? is the dealer still connected? backend.send_multipart([address, command, msg]) @@ -328,24 +340,36 @@ def run(self): finally: backend.send_multipart([address, rcommand, msg.encode("utf-8")]) elif address in self.remote_senders.keys(): - logger.debug(f"Identity/address {address}, found in remote_senders.keys()") + logger.debug( + f"Identity/address {address}, found in remote_senders.keys()" + ) if ( command == b"receiver-ready" or command == b"receiver-error" or command == b"btrfs-recv-finished" ): logger.debug(f"command: {command}, sending 'ACK' to backend.") - tracker = backend.send_multipart([address, b"ACK", b""], copy=False, track=True) + tracker = backend.send_multipart( + [address, b"ACK", b""], copy=False, track=True + ) if not tracker.done: - logger.debug(f"Waiting max 2 seconds for send of commmand ({command})") + logger.debug( + f"Waiting max 2 seconds for send of commmand ({command})" + ) # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone - tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone + tracker.wait( + timeout=2 + ) # seconds as float: raises zmq.NotDone # a new receiver has started. reply to the sender that # must be waiting logger.debug(f"command: {command}, sending to frontend.") - tracker = frontend.send_multipart([address, command, msg], copy=False, track=True) + tracker = frontend.send_multipart( + [address, command, msg], copy=False, track=True + ) if not tracker.done: - logger.debug(f"Waiting max 2 seconds for send of commmand ({command})") + logger.debug( + f"Waiting max 2 seconds for send of commmand ({command})" + ) # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone else: diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index e4bbbe696..61db60b9f 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -321,7 +321,7 @@ def run(self): # milliseconds) for every message num_tries = 10 command, message = self.dealer.recv_multipart() - logger.debug(f"command = {command}, of type: {type(command)}") + logger.debug(f"command = {command}") if command == b"btrfs-send-stream-finished": # this command concludes fsdata transfer. After this, # btrfs-recev process should be diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index 44a225292..0bc60e59f 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -123,7 +123,13 @@ def _init_greeting(self): logger.debug(f"Id: {self.identity} Initial greeting Done") def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False): - logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})") + # Limit debug msg to 180 chars to avoid MBs of btrfs-send-stream msg in log + if len(msg) > 180: + logger.debug( + f"_send_recv(command={command}, msg={msg[0:180]}) - log spam TRIMMED" + ) + else: + logger.debug(f"_send_recv(command={command}, msg={msg})") self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8") rcommand = rmsg = b"" tracker = self.send_req.send_multipart([command, msg], copy=False, track=True) @@ -282,13 +288,14 @@ def run(self): # not really necessary because we just want one reply for # now. command, reply = self.send_req.recv_multipart() - logger.debug(f"command = {command}, of type {type(command)}") + logger.debug(f"command = {command}") if command == b"receiver-ready": if self.rt is not None: self.rlatest_snap = reply.decode("utf-8") self.rt = self._refresh_rt() logger.debug( - f"Id: {self.identity}. command({command}) and message({reply}) received. Proceeding to send fsdata." + f"Id: {self.identity}. command({command}) & message({reply}) received. " + "Proceed to send btrfs_send_stream." ) break else: @@ -381,8 +388,11 @@ def run(self): alive = False # Read all available data from stdout without blocking (requires bytes stream). # https://docs.python.org/3/library/io.html#io.BufferedIOBase.read1 - send_data = self.sp.stdout.read1() - if send_data is None: + # We limit/chunck this read1 to a set number of bytes per cycle. + # Btrfs uses 256 MB chunk on disk + # Arbitrarily chunking to 10MB + btrfs_send_stream = self.sp.stdout.read1(10000000) + if btrfs_send_stream is None: logger.debug("sp.stdout empty") continue except IOError: # TODO: Non functional in Py3 (Py2.7 behaviour) @@ -400,12 +410,12 @@ def run(self): ) self._sys_exit(3) - self.msg = f"Failed to send 'send_data' to the receiver for {self.snap_id}. Aborting.".encode( + self.msg = f"Failed to send 'btrfs_send_stream' to the receiver for {self.snap_id}. Aborting.".encode( "utf-8" ) self.update_trail = True - command, message = self._send_recv(b"", send_data) - self.total_bytes_sent += len(send_data) + command, message = self._send_recv(b"", btrfs_send_stream) + self.total_bytes_sent += len(btrfs_send_stream) num_msgs += 1 if num_msgs == 1000: num_msgs = 0