Skip to content

Commit

Permalink
(t) replication spawn error - continued 2 rockstor#2766
Browse files Browse the repository at this point in the history
- more str to byte for zmq use given our Py3 base now.
- more fstring conversions.
- further parameter/return type typecasting.
- removed unused variable.
- black format update
  • Loading branch information
phillxnet committed Dec 20, 2023
1 parent 58a7566 commit 6f2e7fd
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 79 deletions.
8 changes: 2 additions & 6 deletions src/rockstor/scripts/scheduled_tasks/send_replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,12 @@ def main():
num_tries = 3
while True:
req = ctx.socket(zmq.DEALER)
print(f"req = {req}")
poll.register(req, zmq.POLLIN)
print(f"poll.register = {poll.register}")
ipc_socket = settings.REPLICATION.get('ipc_socket')
ipc_socket = settings.REPLICATION.get("ipc_socket")
req.connect(f"ipc://{ipc_socket}")
print(f"req.connect = {req.connect}, ipc_socket = {ipc_socket}")
req.send_multipart([b"new-send", f"{rid}".encode('utf-8')])
req.send_multipart([b"new-send", f"{rid}".encode("utf-8")])

socks = dict(poll.poll(5000))
# print(f"socks.get(req) = {socks.get(req)}")
if socks.get(req) == zmq.POLLIN:
rcommand, reply = req.recv_multipart()
print(f"rcommand={rcommand}, reply={reply}")
Expand Down
24 changes: 14 additions & 10 deletions src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,10 @@ def _latest_snap(self, rso):
for snap in ReceiveTrail.objects.filter(
rshare=rso, status="succeeded"
).order_by("-id"):
if is_subvol("%s/%s" % (self.snap_dir, snap.snap_name)):
return str(snap.snap_name) # cannot be unicode for zmq message
if is_subvol(f"{self.snap_dir}/{snap.snap_name}"):
return str(snap.snap_name).encode(
"utf8"
) # cannot be unicode for zmq message
logger.error(
"Id: %s. There are no replication snapshots on the "
"system for "
Expand Down Expand Up @@ -265,7 +267,7 @@ def run(self):
"exists. Not starting a new receive process"
% (self.identity, snap_fp)
)
self._send_recv("snap-exists")
self._send_recv(b"snap-exists")
self._sys_exit(0)

cmd = [BTRFS, "receive", self.snap_dir]
Expand All @@ -288,11 +290,6 @@ def run(self):
)
self._sys_exit(3)

term_commands = (
b"btrfs-send-init-error",
b"btrfs-send-unexpected-termination-error",
b"btrfs-send-nonzero-termination-error",
)
num_tries = 10
poll_interval = 6000 # 6 seconds
num_msgs = 0
Expand All @@ -304,6 +301,7 @@ def run(self):
# milliseconds) for every message
num_tries = 10
command, message = self.dealer.recv_multipart()
logger.debug(f"command = {command}, of type:", type(command))
if command == b"btrfs-send-stream-finished":
# this command concludes fsdata transfer. After this,
# btrfs-recev process should be
Expand All @@ -327,7 +325,9 @@ def run(self):
"status": "succeeded",
"kb_received": self.total_bytes_received / 1024,
}
self.msg = f"Failed to update receive trail for rtid: {self.rtid}".encode("utf-8")
self.msg = f"Failed to update receive trail for rtid: {self.rtid}".encode(
"utf-8"
)
self.update_receive_trail(self.rtid, data)

self._send_recv(b"btrfs-recv-finished")
Expand All @@ -342,7 +342,11 @@ def run(self):
)
self._sys_exit(0)

if command in term_commands:
if (
command == b"btrfs-send-init-error"
or command == b"btrfs-send-unexpected-termination-error"
or command == b"btrfs-send-nonzero-termination-error"
):
self.msg = f"Terminal command({command}) received from the sender. Aborting.".encode(
"utf-8"
)
Expand Down
29 changes: 17 additions & 12 deletions src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ def _clean_exit_handler(self):
try:
yield
except Exception as e:
logger.error(
"Id: %s. %s. Exception: %s" % (self.identity, self.msg, e.__str__())
)
logger.error(f"Id: {self.identity}. {self.msg}. Exception: {e.__str__()}")
if self.update_trail:
try:
data = {
Expand All @@ -84,8 +82,7 @@ def _clean_exit_handler(self):
self.update_replica_status(self.rt2_id, data)
except Exception as e:
logger.error(
"Id: %s. Exception occured while updating "
"replica status: %s" % (self.identity, e.__str__())
f"Id: {self.identity}. Exception occurred while updating replica status: {e.__str__()}"
)
self._sys_exit(3)

Expand All @@ -98,7 +95,7 @@ def _sys_exit(self, code):
def _init_greeting(self):
self.send_req = self.ctx.socket(zmq.DEALER)
self.send_req.setsockopt_string(zmq.IDENTITY, self.identity)
self.send_req.connect("tcp://%s:%d" % (self.receiver_ip, self.receiver_port))
self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}")
msg = {
"pool": self.replica.dpool,
"share": self.replica.share,
Expand All @@ -107,8 +104,8 @@ def _init_greeting(self):
"uuid": self.uuid,
}
msg_str = json.dumps(msg)
self.send_req.send_multipart([b"sender-ready", b"%s" % msg_str])
logger.debug("Id: %s Initial greeting: %s" % (self.identity, msg))
self.send_req.send_multipart([b"sender-ready", msg_str.encode("utf-8")])
logger.debug(f"Id: {self.identity} Initial greeting: {msg}")
self.poll.register(self.send_req, zmq.POLLIN)

def _send_recv(self, command: bytes, msg: bytes = b""):
Expand All @@ -132,7 +129,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""):
)
return rcommand, rmsg

def _delete_old_snaps(self, share_path):
def _delete_old_snaps(self, share_path: str):
oldest_snap = get_oldest_snap(
share_path, self.max_snap_retain, regex="_replication_"
)
Expand Down Expand Up @@ -248,22 +245,28 @@ def run(self):
# create a snapshot only if it's not already from a previous
# failed attempt.
# TODO: If one does exist we fail which seems harsh as we may be
# TODO: able to pickup where we left of depending on the failure.
# able to pickup where we left of depending on the failure.
self.msg = f"Failed to create snapshot: {self.snap_name}. Aborting.".encode(
"utf-8"
)
self.create_snapshot(self.replica.share, self.snap_name)

retries_left = settings.REPLICATION.get("max_send_attempts")

self.msg = (
"Place-holder message just after sender snapshot creation".encode(
"utf-8"
)
)

poll_interval = 6000 # 6 seconds
while True:
socks = dict(self.poll.poll(poll_interval))
if socks.get(self.send_req) == zmq.POLLIN:
# not really necessary because we just want one reply for
# now.
retries_left = settings.REPLICATION.get("max_send_attempts")
command, reply = self.send_req.recv_multipart()
logger.debug(f"command = {command}, of type", type(command))
if command == b"receiver-ready":
if self.rt is not None:
self.rlatest_snap = reply
Expand All @@ -273,7 +276,7 @@ def run(self):
)
break
else:
if command in "receiver-init-error":
if command == b"receiver-init-error":
self.msg = f"{command} received for {self.identity}. extended reply: {reply}. Aborting.".encode(
"utf-8"
)
Expand Down Expand Up @@ -320,6 +323,7 @@ def run(self):
self.snap_name,
)
cmd = [BTRFS, "send", snap_path]
logger.debug(f"init btrfs 'send' cmd {cmd}")
if self.rt is not None:
prev_snap = "%s%s/.snapshots/%s/%s" % (
settings.MNT_PT,
Expand All @@ -332,6 +336,7 @@ def run(self):
"%s -- %s" % (self.identity, prev_snap, snap_path)
)
cmd = [BTRFS, "send", "-p", prev_snap, snap_path]
logger.debug(f"differential btrfs 'send' cmd {cmd}")
else:
logger.info(
"Id: %s. Sending full replica: %s" % (self.identity, snap_path)
Expand Down
Loading

0 comments on commit 6f2e7fd

Please sign in to comment.