diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index 0c7a02a8a..1175810c4 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -27,7 +27,14 @@ from django import db from contextlib import contextmanager from smart_manager.replication.util import ReplicationMixin -from fs.btrfs import get_oldest_snap, remove_share, set_property, is_subvol, mount_share +from fs.btrfs import ( + get_oldest_snap, + remove_share, + set_property, + is_subvol, + mount_share, + BTRFS, +) from system.osi import run_command from storageadmin.models import Pool, Share, Appliance from smart_manager.models import ReplicaShare, ReceiveTrail @@ -36,10 +43,9 @@ logger = logging.getLogger(__name__) -BTRFS = "/sbin/btrfs" - class Receiver(ReplicationMixin, Process): + total_bytes_received: int sname: str def __init__(self, identity: bytes, meta: bytes): @@ -178,7 +184,9 @@ def run(self): self.law = APIWrapper() self.poller = zmq.Poller() # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - self.dealer = self.ctx.socket(zmq.DEALER, copy_threshold=0) # Setup OUTPUT socket type. + self.dealer = self.ctx.socket( + zmq.DEALER, copy_threshold=0 + ) # Setup OUTPUT socket type. # self.dealer.set_hwm(10) ipc_socket = settings.REPLICATION.get("ipc_socket") # Identity must be set before connection. @@ -300,7 +308,7 @@ def run(self): num_tries = 10 num_msgs = 0 - t0 = time.time() + start_time = time.time() while True: events = dict(self.poller.poll(timeout=6000)) # 6 seconds logger.debug(f"RECEIVER events dict = {events}") @@ -319,11 +327,12 @@ def run(self): # this command concludes fsdata transfer. After this, # btrfs-recev process should be # terminated(.communicate). + # poll() returns None while process is running: rc otherwise. if self.rp.poll() is None: self.msg = b"Failed to terminate btrfs-recv command" out, err = self.rp.communicate() - out = out.split("\n") - err = err.split("\n") + out = out.split(b"\n") + err = err.split(b"\n") logger.debug( f"Id: {self.identity}. Terminated btrfs-recv. cmd = {cmd} out = {out} err: {err} rc: {self.rp.returncode}" ) @@ -332,9 +341,10 @@ def run(self): "utf-8" ) raise Exception(self.msg) + total_kb_received = int(self.total_bytes_received / 1024) data = { "status": "succeeded", - "kb_received": self.total_bytes_received / 1024, + "kb_received": total_kb_received, } self.msg = f"Failed to update receive trail for rtid: {self.rtid}".encode( "utf-8" @@ -345,7 +355,9 @@ def run(self): self.refresh_share_state() self.refresh_snapshot_state() - dsize, drate = self.size_report(self.total_bytes_received, t0) + dsize, drate = self.size_report( + self.total_bytes_received, start_time + ) logger.debug( f"Id: {self.identity}. Receive complete. Total data transferred: {dsize}. Rate: {drate}/sec." ) @@ -361,6 +373,7 @@ def run(self): ) raise Exception(self.msg) + # poll() returns None while process is running: return code otherwise. if self.rp.poll() is None: self.rp.stdin.write(message) self.rp.stdin.flush() @@ -370,22 +383,23 @@ def run(self): self.total_bytes_received += len(message) if num_msgs == 1000: num_msgs = 0 + total_kb_received = int(self.total_bytes_received / 1024) data = { "status": "pending", - "kb_received": self.total_bytes_received / 1024, + "kb_received": total_kb_received, } self.update_receive_trail(self.rtid, data) dsize, drate = self.size_report( - self.total_bytes_received, t0 + self.total_bytes_received, start_time ) logger.debug( f"Id: {self.identity}. Receiver alive. Data transferred: {dsize}. Rate: {drate}/sec." ) - else: + else: # receive process has stopped: out, err = self.rp.communicate() - out = out.split("\n") - err = err.split("\n") + out = out.split(b"\n") + err = err.split(b"\n") logger.error( f"Id: {self.identity}. btrfs-recv died unexpectedly. " f"cmd: {cmd} out: {out}. err: {err}" diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index d3352b76a..73a835878 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -21,14 +21,12 @@ import sys import zmq import subprocess -import io -import fcntl import json import time from django.conf import settings from contextlib import contextmanager from smart_manager.replication.util import ReplicationMixin -from fs.btrfs import get_oldest_snap, is_subvol +from fs.btrfs import get_oldest_snap, is_subvol, BTRFS from smart_manager.models import ReplicaTrail from cli import APIWrapper from django import db @@ -36,10 +34,9 @@ logger = logging.getLogger(__name__) -BTRFS = "/sbin/btrfs" - class Sender(ReplicationMixin, Process): + total_bytes_sent: int identity: str def __init__(self, uuid: str, receiver_ip, replica, rt: int | None = None): @@ -370,10 +367,11 @@ def run(self): alive = True num_msgs = 0 - t0 = time.time() + start_time = time.time() while alive: try: - if self.sp.poll() is not None: # poll() returns None while process is running: rc otherwise. + # poll() returns None while process is running: rc otherwise. + if self.sp.poll() is not None: logger.debug( f"Id: {self.identity}. send process finished for {self.snap_id}. " f"rc: {self.sp.returncode}. stderr: {self.sp.stderr.read()}" @@ -409,7 +407,7 @@ def run(self): num_msgs += 1 if num_msgs == 1000: num_msgs = 0 - dsize, drate = self.size_report(self.total_bytes_sent, t0) + dsize, drate = self.size_report(self.total_bytes_sent, start_time) logger.debug( f"Id: {self.identity} Sender alive. Data transferred: {dsize}. Rate: {drate}/sec." ) @@ -437,16 +435,16 @@ def run(self): f"Id: {self.identity}. Scheduler exited. Sender for {self.snap_id} cannot go on. Aborting." ) self._sys_exit(3) - + total_kb_sent = int(self.total_bytes_sent / 1024) data = { "status": "succeeded", - "kb_sent": self.total_bytes_sent / 1024, + "kb_sent": total_kb_sent, } self.msg = f"Failed to update final replica status for {self.snap_id}. Aborting.".encode( "utf-8" ) self.update_replica_status(self.rt2_id, data) - dsize, drate = self.size_report(self.total_bytes_sent, t0) + dsize, drate = self.size_report(self.total_bytes_sent, start_time) logger.debug( f"Id: {self.identity}. Send complete. Total data transferred: {dsize}. Rate: {drate}/sec." ) diff --git a/src/rockstor/smart_manager/replication/util.py b/src/rockstor/smart_manager/replication/util.py index 6fba0554a..7f4f11303 100644 --- a/src/rockstor/smart_manager/replication/util.py +++ b/src/rockstor/smart_manager/replication/util.py @@ -17,6 +17,8 @@ """ import time +from typing import Any + from storageadmin.exceptions import RockStorAPIException from storageadmin.models import Appliance, Share from cli import APIWrapper @@ -232,29 +234,23 @@ def refresh_share_state(self): except Exception as e: logger.error(f"Exception while refreshing Share state: {e.__str__()}") - def humanize_bytes( - self, - num: int, - units=( - "Bytes", - "KB", - "MB", - "GB", - ), - ): + def humanize_bytes(self, num: float, units=("Bytes", "KB", "MB", "GB")) -> str: """ Recursive routine to establish and then return the most appropriate num expression given the contents of units. Ie 1023 Bytes or 4096 KB :param num: Assumed to be in Byte units. :param units: list of units to recurse through - :return: "1023 Bytes" or "4.28 KB" etc given num=1023 or num=4384 ) + :return: "1023 Bytes" or "4.28 KB" etc. given num=1023 or num=4384 """ if num < 1024 or len(units) == 1: return f"{num:.2f} {units[0]}" return self.humanize_bytes(num / 1024, units[1:]) - def size_report(self, num: int, t0): - t1 = time.time() - dsize = self.humanize_bytes(float(num)) - drate = self.humanize_bytes(float(num / (t1 - t0))) + def size_report(self, num: int, time_started: float): + """ + Takes num of bytes, and a start time, and returns humanized output. + """ + time_now = time.time() + dsize: str = self.humanize_bytes(float(num)) + drate: str = self.humanize_bytes(float(num / (time_now - time_started))) return dsize, drate