diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index e21359a8d..bd3c1a5b4 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -239,7 +239,7 @@ def run(self): f"Active Receiver: {rs}. Messages processed: {count}" ) if command == b"sender-ready": - logger.debug(f"initial greeting from {address}") + logger.debug(f"initial greeting command '{command}' received from {address}") # Start a new receiver and send the appropriate response try: start_nr = True diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index d3988d730..ff4e086a4 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -129,13 +129,13 @@ def _delete_old_snaps(self, share_name, share_path, num_retain): def _send_recv(self, command: bytes, msg: bytes = b""): logger.debug(f"RECEIVER: _send_recv called with command: {command}, msg: {msg}.") - rcommand = rmsg = None + rcommand = rmsg = b"" self.dealer.send_multipart([command, msg]) # Retry logic doesn't make sense atm. So one long patient wait. socks = dict(self.poll.poll(60000)) # 60 seconds. if socks.get(self.dealer) == zmq.POLLIN: rcommand, rmsg = self.dealer.recv_multipart() - logger.debug(f"Id: {self.identity} command: {command} rcommand: {rcommand}") + logger.debug(f"Id: {self.identity} RECEIVER: _send_recv command: {command} rcommand: {rcommand}") logger.debug(f"remote message: {rmsg}") return rcommand, rmsg @@ -270,7 +270,7 @@ def run(self): self.msg = b"Failed to send receiver-ready" rcommand, rmsg = self._send_recv(b"receiver-ready", b"") - if rcommand is None: + if rcommand == b"": logger.error( f"Id: {self.identity}. No response from the broker for receiver-ready command. Aborting." ) diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index 06577f9a4..066b413e8 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -93,7 +93,7 @@ def _sys_exit(self, code): sys.exit(code) def _init_greeting(self): - logger.debug("_init_greeting CALLED") + logger.debug("_init_greeting() CALLED") self.send_req = self.ctx.socket(zmq.DEALER) self.send_req.setsockopt_string(zmq.IDENTITY, self.identity) self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}") @@ -105,6 +105,7 @@ def _init_greeting(self): "uuid": self.uuid, } msg_str = json.dumps(msg) + logger.debug("_init_greeting() sending 'sender-ready'") self.send_req.send_multipart([b"sender-ready", msg_str.encode("utf-8")]) logger.debug(f"Id: {self.identity} Initial greeting: {msg}") self.poll.register(self.send_req, zmq.POLLIN) @@ -112,7 +113,7 @@ def _init_greeting(self): def _send_recv(self, command: bytes, msg: bytes = b""): logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})") self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8") - rcommand = rmsg = None + rcommand = rmsg = b"" self.send_req.send_multipart([command, msg]) # There is no retry logic here because it's an overkill at the moment. # If the stream is interrupted, we can only start from the beginning @@ -121,10 +122,9 @@ def _send_recv(self, command: bytes, msg: bytes = b""): socks = dict(self.poll.poll(60000)) # 60 seconds. if socks.get(self.send_req) == zmq.POLLIN: rcommand, rmsg = self.send_req.recv_multipart() - if ( - len(command) > 0 or (rcommand is not None and rcommand != b"send-more") - ) or ( # noqa E501 - len(command) > 0 and rcommand is None + # len(b"") == 0 so change to test for command != b"" instead + if (len(command) > 0 or (rcommand != b"" and rcommand != b"send-more")) or ( + len(command) > 0 and rcommand == b"" ): logger.debug( f"Id: {self.identity} Server: {self.receiver_ip}:{self.receiver_port} scommand: {command} rcommand: {rcommand}" @@ -132,7 +132,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""): return rcommand, rmsg def _delete_old_snaps(self, share_path: str): - + logger.debug(f"Sender _delete_old_snaps(share_path={share_path})") oldest_snap = get_oldest_snap( share_path, self.max_snap_retain, regex="_replication_" ) @@ -373,12 +373,11 @@ def run(self): logger.debug( f"Id: {self.identity} Sender alive. Data transferred: {dsize}. Rate: {drate}/sec." ) - if command is None or command == b"receiver-error": - # command is None when the remote side vanishes. + if command == b"" or command == b"receiver-error": + # command is EMPTY when the remote side vanishes. self.msg = ( - f"Got null or error command({command}) message({message}) " - "from the Receiver while " - "transmitting fsdata. Aborting." + f"Got EMPTY or error command ({command}) message ({message}) " + "from the Receiver while transmitting fsdata. Aborting." ).encode("utf-8") raise Exception(message) diff --git a/src/rockstor/smart_manager/replication/util.py b/src/rockstor/smart_manager/replication/util.py index d4b4d221e..6fba0554a 100644 --- a/src/rockstor/smart_manager/replication/util.py +++ b/src/rockstor/smart_manager/replication/util.py @@ -38,6 +38,7 @@ def validate_src_share(self, sender_uuid: str, sname: str): return self.raw.api_call(url=f"shares/{sname}") def update_replica_status(self, rtid: int, data): + logger.debug(f"update_replica_status(rtid={rtid}, data={data})") try: url = f"sm/replicas/trail/{rtid}" return self.law.api_call(url, data=data, calltype="put") @@ -82,6 +83,7 @@ def rshare_id(self, sname: str) -> int: return rshare["id"] def create_rshare(self, data) -> int: + logger.debug(f"create_rshare(data={data})") try: url = "sm/replicas/rshare" rshare = self.law.api_call( @@ -95,8 +97,10 @@ def create_rshare(self, data) -> int: raise e def create_receive_trail(self, rid: int, data) -> int: + logger.debug(f"create_receive_trail(rid={rid}, data={data})") url = f"sm/replicas/rtrail/rshare/{rid}" rt = self.law.api_call(url, data=data, calltype="post", save_error=False) + logger.debug(f"create_receive_trail() -> {rt['id']}") return rt["id"] def update_receive_trail(self, rtid: int, data):