From 4de32759413c41397231804a74f523170829c78c Mon Sep 17 00:00:00 2001 From: Philip Guyton Date: Wed, 3 Jan 2024 18:11:53 +0000 Subject: [PATCH] (t) replication spawn error - continued 9 #2766 - Enable tracker on listener_broker and sender: - add zmq_version and libzmq_version properties to sender, receiver also now has these. --- .../replication/listener_broker.py | 11 +++++-- .../smart_manager/replication/receiver.py | 9 +++--- .../smart_manager/replication/sender.py | 32 +++++++++++++------ 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/rockstor/smart_manager/replication/listener_broker.py b/src/rockstor/smart_manager/replication/listener_broker.py index c7d46001d..745ad0185 100644 --- a/src/rockstor/smart_manager/replication/listener_broker.py +++ b/src/rockstor/smart_manager/replication/listener_broker.py @@ -202,6 +202,11 @@ def run(self): msg = f"Failed to get uuid of current appliance. Aborting. Exception: {e.__str__()}" return logger.error(msg) + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.send_multipart + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.copy_threshold + logger.debug("DISABLING COPY_THRESHOLD to enable message tracking.") + zmq.COPY_THRESHOLD = 0 + ctx = zmq.Context() # FRONTEND: IP @@ -222,14 +227,14 @@ def run(self): poller.register(frontend, zmq.POLLIN) poller.register(backend, zmq.POLLIN) - iterations = 3 + iterations = 5 msg_count = 0 while True: # This loop may still continue even if replication service # is terminated, as long as data is coming in. # Get all events: returns imidiately if any exist, or waits for timeout. # Event list of tuples of the form (socket, event_mask)): - events_list = poller.poll(timeout=2000) # Wait period in milliseconds + events_list = poller.poll(timeout=2000) # Max wait period in milliseconds logger.debug(f"EVENT_LIST poll = {events_list}") # Dictionary mapping of socket : event_mask. 
events = dict(events_list) @@ -328,7 +333,7 @@ def run(self): else: iterations -= 1 if iterations == 0: - iterations = 10 + iterations = 5 self._prune_senders() self._delete_receivers() cur_time = time.time() diff --git a/src/rockstor/smart_manager/replication/receiver.py b/src/rockstor/smart_manager/replication/receiver.py index 90dbe54df..7325662b9 100644 --- a/src/rockstor/smart_manager/replication/receiver.py +++ b/src/rockstor/smart_manager/replication/receiver.py @@ -62,6 +62,7 @@ def __init__(self, identity: bytes, meta): self.rtid = None # We mirror senders max_snap_retain via settings.REPLICATION self.num_retain_snaps = settings.REPLICATION.get("max_snap_retain") + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Context self.ctx = zmq.Context() self.zmq_version = zmq.__version__ self.libzmq_version = zmq.zmq_version() @@ -140,7 +141,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""): rcommand = rmsg = b"" tracker = self.dealer.send_multipart([command, msg], copy=False, track=True) if not tracker.done: - logger.debug(f"Waiting 2 seconds for send of commmand ({command})") + logger.debug(f"Waiting max 2 seconds for send of commmand ({command})") tracker.wait(timeout=2) # seconds as float # Note: And exception here would inform the receiver within the WebUI record. events = dict(self.poller.poll(timeout=5000)) @@ -170,16 +171,14 @@ def run(self): logger.debug( f"Id: {self.identity}. Starting a new Receiver for meta: {self.meta}" ) - # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.send_multipart - logger.debug("DISABLING COPY_THESHOLD to enable message tracking.") - zmq.COPY_THRESHOLD = 0 self.msg = b"Top level exception in receiver" latest_snap = None with self._clean_exit_handler(): self.law = APIWrapper() self.poller = zmq.Poller() - self.dealer = self.ctx.socket(zmq.DEALER) # Setup OUTPUT socket type. 
+ # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket + self.dealer = self.ctx.socket(zmq.DEALER, copy_threshold=0) # Setup OUTPUT socket type. self.dealer.setsockopt_string(zmq.IDENTITY, str(self.identity)) # self.dealer.set_hwm(10) ipc_socket = settings.REPLICATION.get("ipc_socket") diff --git a/src/rockstor/smart_manager/replication/sender.py b/src/rockstor/smart_manager/replication/sender.py index 799634f44..7f1954f71 100644 --- a/src/rockstor/smart_manager/replication/sender.py +++ b/src/rockstor/smart_manager/replication/sender.py @@ -59,7 +59,9 @@ def __init__(self, uuid, receiver_ip, replica, rt=None): # Latest snapshot per Receiver(comes along with receiver-ready) self.rlatest_snap = None # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Context - self.ctx = zmq.Context().instance() + self.ctx = zmq.Context() + self.zmq_version = zmq.__version__ + self.libzmq_version = zmq.zmq_version() self.msg = b"" self.update_trail = False self.total_bytes_sent = 0 @@ -96,7 +98,10 @@ def _sys_exit(self, code): def _init_greeting(self): logger.debug("_init_greeting() CALLED") # Create our send (DEALER) socket using our context (ctx) - self.send_req = self.ctx.socket(zmq.DEALER) + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket + self.send_req = self.ctx.socket(zmq.DEALER, copy_threshold=0) + # Register our poller to monitor for POLLIN events. + self.poller.register(self.send_req, zmq.POLLIN) self.send_req.setsockopt_string(zmq.IDENTITY, self.identity) self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}") @@ -108,22 +113,29 @@ def _init_greeting(self): "uuid": self.uuid, } msg_str = json.dumps(msg) - logger.debug("_init_greeting() sending 'sender-ready'") - self.send_req.send_multipart([b"sender-ready", msg_str.encode("utf-8")]) - logger.debug(f"Id: {self.identity} Initial greeting: {msg}") - # Register our poller to monitor for POLLIN events. 
- self.poller.register(self.send_req, zmq.POLLIN) + msg = msg_str.encode("utf-8") + command = b"sender-ready" + rcommand, rmsg = self._send_recv(command, msg, send_only=True) + logger.debug(f"_send_recv(command={command}, msg={msg}) -> {rcommand}, {rmsg}") + logger.debug(f"Id: {self.identity} Initial greeting Done") + - def _send_recv(self, command: bytes, msg: bytes = b""): + def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False): logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})") self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8") rcommand = rmsg = b"" - self.send_req.send_multipart([command, msg]) + tracker = self.send_req.send_multipart([command, msg], copy=False, track=True) + if not tracker.done: + logger.debug(f"Waiting max 2 seconds for send of commmand ({command})") + # https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone + tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone # There is no retry logic here because it's an overkill at the moment. # If the stream is interrupted, we can only start from the beginning # again. So we wait patiently, but only once. Perhaps we can implement # a buffering or temporary caching strategy to make this part robust. - events = dict(self.poller.poller(60000)) # 60 seconds. + if send_only: + return command, b"send_only-succeeded" + events = dict(self.poller.poll(60000)) # 60 seconds. if events.get(self.send_req) == zmq.POLLIN: rcommand, rmsg = self.send_req.recv_multipart() # len(b"") == 0 so change to test for command != b"" instead