From 64810f92c5071765b9355e480daba9660560fde4 Mon Sep 17 00:00:00 2001 From: Philip Guyton Date: Sat, 30 Dec 2023 18:04:13 +0000 Subject: [PATCH] (t) replication spawn error - continued 4 #2766 - More debug logging. - reduce retry iterations from 10 to 3. - more type hints. --- .../smart_manager/replication/listener_broker.py | 11 ++++++++--- src/rockstor/smart_manager/replication/receiver.py | 10 ++++++---- src/rockstor/smart_manager/replication/sender.py | 10 +++++++++- src/rockstor/smart_manager/replication/util.py | 7 ++++--- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index 3f4f191ad..e21359a8d 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -213,7 +213,7 @@ def run(self): poller.register(backend, zmq.POLLIN) self.local_receivers = {} - iterations = 10 + iterations = 3 poll_interval = 6000 # 6 seconds msg_count = 0 while True: @@ -221,7 +221,12 @@ def run(self): # is terminated, as long as data is coming in. socks = dict(poller.poll(timeout=poll_interval)) if frontend in socks and socks[frontend] == zmq.POLLIN: + # frontend.recv_multipart() returns all as type address, command, msg = frontend.recv_multipart() + logger.debug("frontend.recv_multipart() returns") + logger.debug(f"address = {address}, type {type(address)}") + logger.debug(f"command = {command}, type {type(command)}") + logger.debug(f"msg = {msg}, type {type(msg)}") if address not in self.remote_senders: self.remote_senders[address] = 1 else: @@ -248,7 +253,7 @@ def run(self): ) start_nr = True else: - msg = f"Receiver({address}) already exists. Will not start a new one." + msg = f"Receiver({address}) already exists. Will not start a new one.".encode("utf-8") logger.error(msg) # TODO: There may be a different way to handle # this. For example, we can pass the message to @@ -258,7 +263,7 @@ def run(self): [ address, b"receiver-init-error", - msg.encode("utf-8"), + msg, ] ) if start_nr: diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index a5864fe03..d3988d730 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -40,7 +40,9 @@ class Receiver(ReplicationMixin, Process): - def __init__(self, identity, meta): + sname: str + + def __init__(self, identity: bytes, meta): self.identity = identity self.meta = json.loads(meta) self.src_share = self.meta["share"] @@ -126,7 +128,7 @@ def _delete_old_snaps(self, share_name, share_path, num_retain): return self._delete_old_snaps(share_name, share_path, num_retain) def _send_recv(self, command: bytes, msg: bytes = b""): - logger.debug(f"_send_recv called with command: {command}, msg: {msg}.") + logger.debug(f"RECEIVER: _send_recv called with command: {command}, msg: {msg}.") rcommand = rmsg = None self.dealer.send_multipart([command, msg]) # Retry logic doesn't make sense atm. So one long patient wait. @@ -134,7 +136,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""): if socks.get(self.dealer) == zmq.POLLIN: rcommand, rmsg = self.dealer.recv_multipart() logger.debug(f"Id: {self.identity} command: {command} rcommand: {rcommand}") - logger.debug((f"remote message: {rmsg}")) + logger.debug(f"remote message: {rmsg}") return rcommand, rmsg def _latest_snap(self, rso): @@ -285,7 +287,7 @@ def run(self): # milliseconds) for every message num_tries = 10 command, message = self.dealer.recv_multipart() - logger.debug(f"command = {command}, of type:", type(command)) + logger.debug(f"command = {command}, of type: {type(command)}") if command == b"btrfs-send-stream-finished": # this command concludes fsdata transfer. After this, # btrfs-recev process should be diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index c6de9cf54..06577f9a4 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -93,6 +93,7 @@ def _sys_exit(self, code): sys.exit(code) def _init_greeting(self): + logger.debug("_init_greeting CALLED") self.send_req = self.ctx.socket(zmq.DEALER) self.send_req.setsockopt_string(zmq.IDENTITY, self.identity) self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}") @@ -109,6 +110,7 @@ def _init_greeting(self): self.poll.register(self.send_req, zmq.POLLIN) def _send_recv(self, command: bytes, msg: bytes = b""): + logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})") self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8") rcommand = rmsg = None self.send_req.send_multipart([command, msg]) @@ -130,6 +132,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""): return rcommand, rmsg def _delete_old_snaps(self, share_path: str): + oldest_snap = get_oldest_snap( share_path, self.max_snap_retain, regex="_replication_" ) @@ -251,11 +254,16 @@ def run(self): while True: socks = dict(self.poll.poll(poll_interval)) logger.debug(f"Sender socks dict = {socks}") + if socks != {}: + for key in socks: + logger.debug(f"socks index ({key}), has value {socks[key]}") + else: + logger.debug("SOCKS EMPTY") if socks.get(self.send_req) == zmq.POLLIN: # not really necessary because we just want one reply for # now. command, reply = self.send_req.recv_multipart() - logger.debug(f"command = {command}, of type", type(command)) + logger.debug(f"command = {command}, of type {type(command)}") if command == b"receiver-ready": if self.rt is not None: self.rlatest_snap = reply diff --git a/src/rockstor/smart_manager/replication/util.py b/src/rockstor/smart_manager/replication/util.py index f8fbe4b66..d4b4d221e 100644 --- a/src/rockstor/smart_manager/replication/util.py +++ b/src/rockstor/smart_manager/replication/util.py @@ -65,6 +65,7 @@ def disable_replica(self, rid: int): raise Exception(msg) def create_replica_trail(self, rid: int, snap_name: str): + logger.debug(f"Replication create_replica_trail(rid={rid}, snap_name={snap_name})") url = f"sm/replicas/trail/replica/{rid}" return self.law.api_call( url, @@ -153,7 +154,7 @@ def update_repclone(self, sname: str, snap_name: str): share with the given snapshot. Intended for use in receive.py to turn the oldest snapshot into an existing share via unmount, mv, mount cycle. - :param sname: Existing share name + :param sname: Existing share-name :param snap_name: Name of snapshot to supplant given share with. :return: False if there is a failure. """ @@ -184,8 +185,8 @@ def delete_snapshot(self, sname: str, snap_name: str): return False raise e - def create_share(self, sname: str, pool): - print("pool parameter is of type:", type(pool)) + def create_share(self, sname: str, pool: str): + print(f"Replication 'create_share' called with sname {sname}, pool {pool}") try: url = "shares" data = {