diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index bd3c1a5b4..d530c30c0 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -201,7 +201,7 @@ def run(self): ctx = zmq.Context() frontend = ctx.socket(zmq.ROUTER) - frontend.set_hwm(value=10) + # frontend.set_hwm(value=10) frontend.bind(f"tcp://{self.listener_interface}:{ self.listener_port}") backend = ctx.socket(zmq.ROUTER) diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index ff4e086a4..6062648f4 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -164,7 +164,7 @@ def run(self): self.poll = zmq.Poller() self.dealer = self.ctx.socket(zmq.DEALER) self.dealer.setsockopt_string(zmq.IDENTITY, str(self.identity)) - self.dealer.set_hwm(10) + # self.dealer.set_hwm(10) ipc_socket = settings.REPLICATION.get("ipc_socket") self.dealer.connect(f"ipc://{ipc_socket}") self.poll.register(self.dealer, zmq.POLLIN) @@ -269,7 +269,11 @@ def run(self): ) self.msg = b"Failed to send receiver-ready" - rcommand, rmsg = self._send_recv(b"receiver-ready", b"") + # Previously our second parameter was (latest_snap or b"") + snap = latest_snap + if snap is None: + snap = b"place-holder" + rcommand, rmsg = self._send_recv(b"receiver-ready", snap) if rcommand == b"": logger.error( f"Id: {self.identity}. No response from the broker for receiver-ready command. Aborting." @@ -282,6 +286,12 @@ def run(self): t0 = time.time() while True: socks = dict(self.poll.poll(poll_interval)) + logger.debug(f"RECEIVER socks dict = {socks}") + if socks != {}: + for key in socks: + logger.debug(f"socks index ({key}), has value {socks[key]}") + else: + logger.debug("SOCKS EMPTY") if socks.get(self.dealer) == zmq.POLLIN: # reset to wait upto 60(poll_interval x num_tries # milliseconds) for every message diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index 066b413e8..c7d1961c1 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -250,10 +250,9 @@ def run(self): ) ) - poll_interval = 6000 # 6 seconds while True: - socks = dict(self.poll.poll(poll_interval)) - logger.debug(f"Sender socks dict = {socks}") + socks = dict(self.poll.poll(6000)) + logger.debug(f"SENDER socks dict = {socks}") if socks != {}: for key in socks: logger.debug(f"socks index ({key}), has value {socks[key]}")