Skip to content

Commit

Permalink
(t) replication spawn error - continued 7 rockstor#2766
Browse files Browse the repository at this point in the history
- refactor poll -> poller socks -> events for readability.
- additional explanatory comments re sockets etc.
- formatting typo re fstrings.
- additional typing.
  • Loading branch information
phillxnet committed Jan 2, 2024
1 parent 1f76338 commit 28e1bf6
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 43 deletions.
39 changes: 27 additions & 12 deletions src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from multiprocessing import Process
from typing import Any
import zmq
import os
import json
Expand All @@ -33,9 +34,11 @@


class ReplicaScheduler(ReplicationMixin, Process):
local_receivers: dict[Any, Any]

def __init__(self):
self.law = None
self.local_receivers = None
self.local_receivers = {}
self.ppid = os.getpid()
self.senders = {} # Active Sender(outgoing) process map.
self.receivers = {} # Active Receiver process map.
Expand Down Expand Up @@ -200,33 +203,45 @@ def run(self):
return logger.error(msg)

ctx = zmq.Context()
frontend = ctx.socket(zmq.ROUTER)

# FRONTEND: IP
frontend = ctx.socket(zmq.ROUTER) # Sender socket.
# frontend.set_hwm(value=10)
frontend.bind(f"tcp://{self.listener_interface}:{ self.listener_port}")
# Bind to tcp://interface:port
frontend.bind(f"tcp://{self.listener_interface}:{self.listener_port}")

backend = ctx.socket(zmq.ROUTER)
ipc_socket = settings.REPLICATION.get("ipc_socket")
# BACKEND: IPC / UNIX SOCKET
backend = ctx.socket(zmq.ROUTER) # Sender socket
ipc_socket = settings.REPLICATION.get("ipc_socket") # /var/run/replication.sock
backend.bind(f"ipc://{ipc_socket}")

# POLLER
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#polling
poller = zmq.Poller()
# Register our poller, for both sockets, to monitor for POLLIN events.
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
self.local_receivers = {}

iterations = 3
poll_interval = 6000 # 6 seconds
msg_count = 0
while True:
# This loop may still continue even if replication service
# is terminated, as long as data is coming in.
socks = dict(poller.poll(timeout=poll_interval))
if frontend in socks and socks[frontend] == zmq.POLLIN:
# Get all events: returns imidiately if any exist, or waits for timeout.
# Event list of tuples of the form (socket, event_mask)):
events_list = poller.poll(timeout=2000)
logger.debug(f"LISTENER_BROKER: frontend & backend events_list poll = {events_list}")
# Dictionary mapping of socket : event_mask.
events = dict(events_list) # Wait period in milliseconds
if frontend in events and events[frontend] == zmq.POLLIN:
# frontend.recv_multipart() returns all as type <class 'bytes'>
#
address, command, msg = frontend.recv_multipart()
logger.debug("frontend.recv_multipart() returns")
logger.debug(f"address = {address}, type {type(address)}")
logger.debug(f"command = {command}, type {type(command)}")
logger.debug(f"msg = {msg}, type {type(msg)}")
# Keep a numerical events tally of per remote sender's events:
if address not in self.remote_senders:
self.remote_senders[address] = 1
else:
Expand All @@ -243,7 +258,7 @@ def run(self):
# Start a new receiver and send the appropriate response
try:
start_nr = True
if address in self.local_receivers:
if address in self.local_receivers.keys():
start_nr = False
ecode = self.local_receivers[address].exitcode
if ecode is not None:
Expand Down Expand Up @@ -283,7 +298,7 @@ def run(self):
# do we hit hwm? is the dealer still connected?
backend.send_multipart([address, command, msg.encode("utf-8")])

elif backend in socks and socks[backend] == zmq.POLLIN:
elif backend in events and events[backend] == zmq.POLLIN:
address, command, msg = backend.recv_multipart()
if command == b"new-send":
rid = int(msg)
Expand All @@ -302,7 +317,7 @@ def run(self):
logger.error(msg)
finally:
backend.send_multipart([address, rcommand, msg.encode("utf-8")])
elif address in self.remote_senders:
elif address in self.remote_senders.keys():
if (
command == b"receiver-ready"
or command == b"receiver-error"
Expand Down
39 changes: 22 additions & 17 deletions src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def _clean_exit_handler(self):
]
)
# Retry logic here is overkill atm.
socks = dict(self.poll.poll(60000)) # 60 seconds
if socks.get(self.dealer) == zmq.POLLIN:
events = dict(self.poller.poll(60000)) # 60 seconds
if events.get(self.dealer) == zmq.POLLIN:
msg = self.dealer.recv()
logger.debug(
f"Id: {self.identity}. Response from the broker: {msg}"
Expand All @@ -128,14 +128,18 @@ def _delete_old_snaps(self, share_name, share_path, num_retain):
return self._delete_old_snaps(share_name, share_path, num_retain)

def _send_recv(self, command: bytes, msg: bytes = b""):
logger.debug(f"RECEIVER: _send_recv called with command: {command}, msg: {msg}.")
logger.debug(
f"RECEIVER: _send_recv called with command: {command}, msg: {msg}."
)
rcommand = rmsg = b""
self.dealer.send_multipart([command, msg])
# Retry logic doesn't make sense atm. So one long patient wait.
socks = dict(self.poll.poll(60000)) # 60 seconds.
if socks.get(self.dealer) == zmq.POLLIN:
events = dict(self.poller.poll(60000)) # 60 seconds.
if events.get(self.dealer) == zmq.POLLIN:
rcommand, rmsg = self.dealer.recv_multipart()
logger.debug(f"Id: {self.identity} RECEIVER: _send_recv command: {command} rcommand: {rcommand}")
logger.debug(
f"Id: {self.identity} RECEIVER: _send_recv command: {command} rcommand: {rcommand}"
)
logger.debug(f"remote message: {rmsg}")
return rcommand, rmsg

Expand All @@ -161,13 +165,15 @@ def run(self):
latest_snap = None
with self._clean_exit_handler():
self.law = APIWrapper()
self.poll = zmq.Poller()
self.dealer = self.ctx.socket(zmq.DEALER)
self.poller = zmq.Poller()
self.dealer = self.ctx.socket(zmq.DEALER) # Setup OUTPUT socket type.
self.dealer.setsockopt_string(zmq.IDENTITY, str(self.identity))
# self.dealer.set_hwm(10)
ipc_socket = settings.REPLICATION.get("ipc_socket")
self.dealer.connect(f"ipc://{ipc_socket}")
self.poll.register(self.dealer, zmq.POLLIN)
# Setup poller
# Register our poller, for OUTPUT socket, to monitor for POLLIN events.
self.poller.register(self.dealer, zmq.POLLIN)

self.ack = True
self.msg = (
Expand Down Expand Up @@ -281,18 +287,17 @@ def run(self):
self._sys_exit(3)

num_tries = 10
poll_interval = 6000 # 6 seconds
num_msgs = 0
t0 = time.time()
while True:
socks = dict(self.poll.poll(poll_interval))
logger.debug(f"RECEIVER socks dict = {socks}")
if socks != {}:
for key in socks:
logger.debug(f"socks index ({key}), has value {socks[key]}")
events = dict(self.poller.poll(timeout=6000)) # 6 seconds
logger.debug(f"RECEIVER events dict = {events}")
if events != {}:
for key in events:
logger.debug(f"events index ({key}), has value {events[key]}")
else:
logger.debug("SOCKS EMPTY")
if socks.get(self.dealer) == zmq.POLLIN:
logger.debug("EVENTS EMPTY")
if events.get(self.dealer) == zmq.POLLIN:
# reset to wait upto 60(poll_interval x num_tries
# milliseconds) for every message
num_tries = 10
Expand Down
32 changes: 18 additions & 14 deletions src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
class Sender(ReplicationMixin, Process):
def __init__(self, uuid, receiver_ip, replica, rt=None):
self.law = None
self.poll = None
self.poller = None
self.uuid = uuid
self.receiver_ip = receiver_ip
self.receiver_port = replica.data_port
Expand All @@ -58,7 +58,8 @@ def __init__(self, uuid, receiver_ip, replica, rt=None):
self.sp = None
# Latest snapshot per Receiver(comes along with receiver-ready)
self.rlatest_snap = None
self.ctx = zmq.Context()
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#zmq.Context
self.ctx = zmq.Context().instance()
self.msg = b""
self.update_trail = False
self.total_bytes_sent = 0
Expand Down Expand Up @@ -94,8 +95,10 @@ def _sys_exit(self, code):

def _init_greeting(self):
logger.debug("_init_greeting() CALLED")
# Create our send (DEALER) socket using our context (ctx)
self.send_req = self.ctx.socket(zmq.DEALER)
self.send_req.setsockopt_string(zmq.IDENTITY, self.identity)

self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}")
msg = {
"pool": self.replica.dpool,
Expand All @@ -108,7 +111,8 @@ def _init_greeting(self):
logger.debug("_init_greeting() sending 'sender-ready'")
self.send_req.send_multipart([b"sender-ready", msg_str.encode("utf-8")])
logger.debug(f"Id: {self.identity} Initial greeting: {msg}")
self.poll.register(self.send_req, zmq.POLLIN)
# Register our poller to monitor for POLLIN events.
self.poller.register(self.send_req, zmq.POLLIN)

def _send_recv(self, command: bytes, msg: bytes = b""):
logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})")
Expand All @@ -119,8 +123,8 @@ def _send_recv(self, command: bytes, msg: bytes = b""):
# If the stream is interrupted, we can only start from the beginning
# again. So we wait patiently, but only once. Perhaps we can implement
# a buffering or temporary caching strategy to make this part robust.
socks = dict(self.poll.poll(60000)) # 60 seconds.
if socks.get(self.send_req) == zmq.POLLIN:
events = dict(self.poller.poller(60000)) # 60 seconds.
if events.get(self.send_req) == zmq.POLLIN:
rcommand, rmsg = self.send_req.recv_multipart()
# len(b"") == 0 so change to test for command != b"" instead
if (len(command) > 0 or (rcommand != b"" and rcommand != b"send-more")) or (
Expand Down Expand Up @@ -210,7 +214,7 @@ def run(self):
self.msg = f"Top level exception in sender: {self.identity}".encode("utf-8")
with self._clean_exit_handler():
self.law = APIWrapper()
self.poll = zmq.Poller()
self.poller = zmq.Poller()
self._init_greeting()

# Create a new replica trail if it's the very first time,
Expand Down Expand Up @@ -251,14 +255,14 @@ def run(self):
)

while True:
socks = dict(self.poll.poll(6000))
logger.debug(f"SENDER socks dict = {socks}")
if socks != {}:
for key in socks:
logger.debug(f"socks index ({key}), has value {socks[key]}")
events = dict(self.poller.poll(6000))
logger.debug(f"SENDER events dict = {events}")
if events != {}:
for key in events:
logger.debug(f"events index ({key}), has value {events[key]}")
else:
logger.debug("SOCKS EMPTY")
if socks.get(self.send_req) == zmq.POLLIN:
logger.debug("EVENTS EMPTY")
if events.get(self.send_req) == zmq.POLLIN:
# not really necessary because we just want one reply for
# now.
command, reply = self.send_req.recv_multipart()
Expand Down Expand Up @@ -306,7 +310,7 @@ def run(self):
raise Exception(self.msg)
self.send_req.setsockopt(zmq.LINGER, 0)
self.send_req.close()
self.poll.unregister(self.send_req)
self.poller.unregister(self.send_req)
self._init_greeting()

snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.snap_name}"
Expand Down

0 comments on commit 28e1bf6

Please sign in to comment.