Skip to content

Commit

Permalink
(t) replication spawn error - continued 9 rockstor#2766
Browse files Browse the repository at this point in the history
- Enable tracker on listender_broker and sender:
- add zmq_version and libzmq_version properties to sender,
receiver also now has these.
  • Loading branch information
phillxnet committed Jan 3, 2024
1 parent 2000b2d commit 4de3275
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
11 changes: 8 additions & 3 deletions src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ def run(self):
msg = f"Failed to get uuid of current appliance. Aborting. Exception: {e.__str__()}"
return logger.error(msg)

# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.send_multipart
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.copy_threshold
logger.debug("DISABLING COPY_THRESHOLD to enable message tracking.")
zmq.COPY_THRESHOLD = 0

ctx = zmq.Context()

# FRONTEND: IP
Expand All @@ -222,14 +227,14 @@ def run(self):
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

iterations = 3
iterations = 5
msg_count = 0
while True:
# This loop may still continue even if replication service
# is terminated, as long as data is coming in.
# Get all events: returns imidiately if any exist, or waits for timeout.
# Event list of tuples of the form (socket, event_mask)):
events_list = poller.poll(timeout=2000) # Wait period in milliseconds
events_list = poller.poll(timeout=2000) # Max wait period in milliseconds
logger.debug(f"EVENT_LIST poll = {events_list}")
# Dictionary mapping of socket : event_mask.
events = dict(events_list)
Expand Down Expand Up @@ -328,7 +333,7 @@ def run(self):
else:
iterations -= 1
if iterations == 0:
iterations = 10
iterations = 5
self._prune_senders()
self._delete_receivers()
cur_time = time.time()
Expand Down
9 changes: 4 additions & 5 deletions src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(self, identity: bytes, meta):
self.rtid = None
# We mirror senders max_snap_retain via settings.REPLICATION
self.num_retain_snaps = settings.REPLICATION.get("max_snap_retain")
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Context
self.ctx = zmq.Context()
self.zmq_version = zmq.__version__
self.libzmq_version = zmq.zmq_version()
Expand Down Expand Up @@ -140,7 +141,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""):
rcommand = rmsg = b""
tracker = self.dealer.send_multipart([command, msg], copy=False, track=True)
if not tracker.done:
logger.debug(f"Waiting 2 seconds for send of commmand ({command})")
logger.debug(f"Waiting max 2 seconds for send of commmand ({command})")
tracker.wait(timeout=2) # seconds as float
# Note: And exception here would inform the receiver within the WebUI record.
events = dict(self.poller.poll(timeout=5000))
Expand Down Expand Up @@ -170,16 +171,14 @@ def run(self):
logger.debug(
f"Id: {self.identity}. Starting a new Receiver for meta: {self.meta}"
)
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Socket.send_multipart
logger.debug("DISABLING COPY_THESHOLD to enable message tracking.")
zmq.COPY_THRESHOLD = 0

self.msg = b"Top level exception in receiver"
latest_snap = None
with self._clean_exit_handler():
self.law = APIWrapper()
self.poller = zmq.Poller()
self.dealer = self.ctx.socket(zmq.DEALER) # Setup OUTPUT socket type.
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket
self.dealer = self.ctx.socket(zmq.DEALER, copy_threshold=0) # Setup OUTPUT socket type.
self.dealer.setsockopt_string(zmq.IDENTITY, str(self.identity))
# self.dealer.set_hwm(10)
ipc_socket = settings.REPLICATION.get("ipc_socket")
Expand Down
32 changes: 22 additions & 10 deletions src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def __init__(self, uuid, receiver_ip, replica, rt=None):
# Latest snapshot per Receiver(comes along with receiver-ready)
self.rlatest_snap = None
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Context
self.ctx = zmq.Context().instance()
self.ctx = zmq.Context()
self.zmq_version = zmq.__version__
self.libzmq_version = zmq.zmq_version()
self.msg = b""
self.update_trail = False
self.total_bytes_sent = 0
Expand Down Expand Up @@ -96,7 +98,10 @@ def _sys_exit(self, code):
def _init_greeting(self):
logger.debug("_init_greeting() CALLED")
# Create our send (DEALER) socket using our context (ctx)
self.send_req = self.ctx.socket(zmq.DEALER)
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket
self.send_req = self.ctx.socket(zmq.DEALER, copy_threshold=0)
# Register our poller to monitor for POLLIN events.
self.poller.register(self.send_req, zmq.POLLIN)
self.send_req.setsockopt_string(zmq.IDENTITY, self.identity)

self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}")
Expand All @@ -108,22 +113,29 @@ def _init_greeting(self):
"uuid": self.uuid,
}
msg_str = json.dumps(msg)
logger.debug("_init_greeting() sending 'sender-ready'")
self.send_req.send_multipart([b"sender-ready", msg_str.encode("utf-8")])
logger.debug(f"Id: {self.identity} Initial greeting: {msg}")
# Register our poller to monitor for POLLIN events.
self.poller.register(self.send_req, zmq.POLLIN)
msg = msg_str.encode("utf-8")
command = b"sender-ready"
rcommand, rmsg = self._send_recv(command, msg, send_only=True)
logger.debug(f"_send_recv(command={command}, msg={msg}) -> {rcommand}, {rmsg}")
logger.debug(f"Id: {self.identity} Initial greeting Done")


def _send_recv(self, command: bytes, msg: bytes = b""):
def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False):
logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})")
self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8")
rcommand = rmsg = b""
self.send_req.send_multipart([command, msg])
tracker = self.send_req.send_multipart([command, msg], copy=False, track=True)
if not tracker.done:
logger.debug(f"Waiting max 2 seconds for send of commmand ({command})")
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone
tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone
# There is no retry logic here because it's an overkill at the moment.
# If the stream is interrupted, we can only start from the beginning
# again. So we wait patiently, but only once. Perhaps we can implement
# a buffering or temporary caching strategy to make this part robust.
events = dict(self.poller.poller(60000)) # 60 seconds.
if send_only:
return command, b"send_only-succeeded"
events = dict(self.poller.poll(60000)) # 60 seconds.
if events.get(self.send_req) == zmq.POLLIN:
rcommand, rmsg = self.send_req.recv_multipart()
# len(b"") == 0 so change to test for command != b"" instead
Expand Down

0 comments on commit 4de3275

Please sign in to comment.