Skip to content

Commit

Permalink
(t) replication spawn error - fix unit type issue rockstor#2766
Browse files Browse the repository at this point in the history
- additional type-hinting and fix for very low send byte count.
- harmonize on btrfs binary location to fs.btrfs for replication.
- readability refactoring.
- more byte/str fixes re Py2.7/Py3
  • Loading branch information
phillxnet committed Jan 8, 2024
1 parent fca03f5 commit 89236f6
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 40 deletions.
42 changes: 28 additions & 14 deletions src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@
from django import db
from contextlib import contextmanager
from smart_manager.replication.util import ReplicationMixin
from fs.btrfs import get_oldest_snap, remove_share, set_property, is_subvol, mount_share
from fs.btrfs import (
get_oldest_snap,
remove_share,
set_property,
is_subvol,
mount_share,
BTRFS,
)
from system.osi import run_command
from storageadmin.models import Pool, Share, Appliance
from smart_manager.models import ReplicaShare, ReceiveTrail
Expand All @@ -36,10 +43,9 @@

logger = logging.getLogger(__name__)

BTRFS = "/sbin/btrfs"


class Receiver(ReplicationMixin, Process):
total_bytes_received: int
sname: str

def __init__(self, identity: bytes, meta: bytes):
Expand Down Expand Up @@ -178,7 +184,9 @@ def run(self):
self.law = APIWrapper()
self.poller = zmq.Poller()
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket
self.dealer = self.ctx.socket(zmq.DEALER, copy_threshold=0) # Setup OUTPUT socket type.
self.dealer = self.ctx.socket(
zmq.DEALER, copy_threshold=0
) # Setup OUTPUT socket type.
# self.dealer.set_hwm(10)
ipc_socket = settings.REPLICATION.get("ipc_socket")
# Identity must be set before connection.
Expand Down Expand Up @@ -300,7 +308,7 @@ def run(self):

num_tries = 10
num_msgs = 0
t0 = time.time()
start_time = time.time()
while True:
events = dict(self.poller.poll(timeout=6000)) # 6 seconds
logger.debug(f"RECEIVER events dict = {events}")
Expand All @@ -319,11 +327,12 @@ def run(self):
# This command concludes the fsdata transfer. After this, the
# btrfs-recv process should have terminated; its remaining output
# is collected via .communicate().
# poll() returns None while process is running: rc otherwise.
if self.rp.poll() is None:
self.msg = b"Failed to terminate btrfs-recv command"
out, err = self.rp.communicate()
out = out.split("\n")
err = err.split("\n")
out = out.split(b"\n")
err = err.split(b"\n")
logger.debug(
f"Id: {self.identity}. Terminated btrfs-recv. cmd = {cmd} out = {out} err: {err} rc: {self.rp.returncode}"
)
Expand All @@ -332,9 +341,10 @@ def run(self):
"utf-8"
)
raise Exception(self.msg)
total_kb_received = int(self.total_bytes_received / 1024)
data = {
"status": "succeeded",
"kb_received": self.total_bytes_received / 1024,
"kb_received": total_kb_received,
}
self.msg = f"Failed to update receive trail for rtid: {self.rtid}".encode(
"utf-8"
Expand All @@ -345,7 +355,9 @@ def run(self):
self.refresh_share_state()
self.refresh_snapshot_state()

dsize, drate = self.size_report(self.total_bytes_received, t0)
dsize, drate = self.size_report(
self.total_bytes_received, start_time
)
logger.debug(
f"Id: {self.identity}. Receive complete. Total data transferred: {dsize}. Rate: {drate}/sec."
)
Expand All @@ -361,6 +373,7 @@ def run(self):
)
raise Exception(self.msg)

# poll() returns None while process is running: return code otherwise.
if self.rp.poll() is None:
self.rp.stdin.write(message)
self.rp.stdin.flush()
Expand All @@ -370,22 +383,23 @@ def run(self):
self.total_bytes_received += len(message)
if num_msgs == 1000:
num_msgs = 0
total_kb_received = int(self.total_bytes_received / 1024)
data = {
"status": "pending",
"kb_received": self.total_bytes_received / 1024,
"kb_received": total_kb_received,
}
self.update_receive_trail(self.rtid, data)

dsize, drate = self.size_report(
self.total_bytes_received, t0
self.total_bytes_received, start_time
)
logger.debug(
f"Id: {self.identity}. Receiver alive. Data transferred: {dsize}. Rate: {drate}/sec."
)
else:
else: # receive process has stopped:
out, err = self.rp.communicate()
out = out.split("\n")
err = err.split("\n")
out = out.split(b"\n")
err = err.split(b"\n")
logger.error(
f"Id: {self.identity}. btrfs-recv died unexpectedly. "
f"cmd: {cmd} out: {out}. err: {err}"
Expand Down
20 changes: 9 additions & 11 deletions src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,22 @@
import sys
import zmq
import subprocess
import io
import fcntl
import json
import time
from django.conf import settings
from contextlib import contextmanager
from smart_manager.replication.util import ReplicationMixin
from fs.btrfs import get_oldest_snap, is_subvol
from fs.btrfs import get_oldest_snap, is_subvol, BTRFS
from smart_manager.models import ReplicaTrail
from cli import APIWrapper
from django import db
import logging

logger = logging.getLogger(__name__)

BTRFS = "/sbin/btrfs"


class Sender(ReplicationMixin, Process):
total_bytes_sent: int
identity: str

def __init__(self, uuid: str, receiver_ip, replica, rt: int | None = None):
Expand Down Expand Up @@ -370,10 +367,11 @@ def run(self):

alive = True
num_msgs = 0
t0 = time.time()
start_time = time.time()
while alive:
try:
if self.sp.poll() is not None: # poll() returns None while process is running: rc otherwise.
# poll() returns None while process is running: rc otherwise.
if self.sp.poll() is not None:
logger.debug(
f"Id: {self.identity}. send process finished for {self.snap_id}. "
f"rc: {self.sp.returncode}. stderr: {self.sp.stderr.read()}"
Expand Down Expand Up @@ -409,7 +407,7 @@ def run(self):
num_msgs += 1
if num_msgs == 1000:
num_msgs = 0
dsize, drate = self.size_report(self.total_bytes_sent, t0)
dsize, drate = self.size_report(self.total_bytes_sent, start_time)
logger.debug(
f"Id: {self.identity} Sender alive. Data transferred: {dsize}. Rate: {drate}/sec."
)
Expand Down Expand Up @@ -437,16 +435,16 @@ def run(self):
f"Id: {self.identity}. Scheduler exited. Sender for {self.snap_id} cannot go on. Aborting."
)
self._sys_exit(3)

total_kb_sent = int(self.total_bytes_sent / 1024)
data = {
"status": "succeeded",
"kb_sent": self.total_bytes_sent / 1024,
"kb_sent": total_kb_sent,
}
self.msg = f"Failed to update final replica status for {self.snap_id}. Aborting.".encode(
"utf-8"
)
self.update_replica_status(self.rt2_id, data)
dsize, drate = self.size_report(self.total_bytes_sent, t0)
dsize, drate = self.size_report(self.total_bytes_sent, start_time)
logger.debug(
f"Id: {self.identity}. Send complete. Total data transferred: {dsize}. Rate: {drate}/sec."
)
Expand Down
26 changes: 11 additions & 15 deletions src/rockstor/smart_manager/replication/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"""

import time
from typing import Any

from storageadmin.exceptions import RockStorAPIException
from storageadmin.models import Appliance, Share
from cli import APIWrapper
Expand Down Expand Up @@ -232,29 +234,23 @@ def refresh_share_state(self):
except Exception as e:
logger.error(f"Exception while refreshing Share state: {e.__str__()}")

def humanize_bytes(
self,
num: int,
units=(
"Bytes",
"KB",
"MB",
"GB",
),
):
def humanize_bytes(self, num: float, units=("Bytes", "KB", "MB", "GB")) -> str:
    """
    Express num using the largest entry of units that keeps the displayed
    value below 1024, formatted to two decimal places.

    The unit decision is made on the value as it will be displayed (rounded
    to two decimals), so 1023.999 is reported as "1.00 KB" rather than the
    misleading "1024.00 Bytes". Once the last unit is reached the value is
    reported in that unit however large it is.

    :param num: Quantity expressed in the first unit of units (Bytes by default).
    :param units: Ordered sequence of unit labels, each 1024 times the previous.
    :return: "1023.00 Bytes" or "4.28 KB" etc. given num=1023 or num=4384
    """
    value = num
    unit_index = 0
    last_index = len(units) - 1
    # Step up one unit at a time; stop when the value fits or units run out.
    while unit_index < last_index and round(value, 2) >= 1024:
        value /= 1024
        unit_index += 1
    return f"{value:.2f} {units[unit_index]}"

def size_report(self, num: int, t0):
t1 = time.time()
dsize = self.humanize_bytes(float(num))
drate = self.humanize_bytes(float(num / (t1 - t0)))
def size_report(self, num: int, time_started: float) -> tuple[str, str]:
    """
    Takes num of bytes, and a start time, and returns humanized output.

    :param num: Number of bytes transferred so far.
    :param time_started: time.time() value taken when the transfer began.
    :return: (dsize, drate) - humanized total size, and humanized average
        rate per second since time_started.
    """
    elapsed = time.time() - time_started
    dsize: str = self.humanize_bytes(float(num))
    # time.time() can return the same value twice in quick succession, or
    # step backwards if the system clock is adjusted. Report a zero rate in
    # those cases instead of raising ZeroDivisionError or returning a
    # negative rate.
    rate = float(num / elapsed) if elapsed > 0 else 0.0
    drate: str = self.humanize_bytes(rate)
    return dsize, drate

0 comments on commit 89236f6

Please sign in to comment.