Skip to content

Commit

Permalink
(t) replication spawn error - continued 4 rockstor#2766
Browse files Browse the repository at this point in the history
- More debug logging.
- Reduce retry iterations from 10 to 3.
- More type hints.
  • Loading branch information
phillxnet committed Dec 30, 2023
1 parent 3d89af3 commit 64810f9
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 11 deletions.
11 changes: 8 additions & 3 deletions src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,20 @@ def run(self):
poller.register(backend, zmq.POLLIN)
self.local_receivers = {}

iterations = 10
iterations = 3
poll_interval = 6000 # 6 seconds
msg_count = 0
while True:
# This loop may still continue even if replication service
# is terminated, as long as data is coming in.
socks = dict(poller.poll(timeout=poll_interval))
if frontend in socks and socks[frontend] == zmq.POLLIN:
# frontend.recv_multipart() returns all as type <class 'bytes'>
address, command, msg = frontend.recv_multipart()
logger.debug("frontend.recv_multipart() returns")
logger.debug(f"address = {address}, type {type(address)}")
logger.debug(f"command = {command}, type {type(command)}")
logger.debug(f"msg = {msg}, type {type(msg)}")
if address not in self.remote_senders:
self.remote_senders[address] = 1
else:
Expand All @@ -248,7 +253,7 @@ def run(self):
)
start_nr = True
else:
msg = f"Receiver({address}) already exists. Will not start a new one."
msg = f"Receiver({address}) already exists. Will not start a new one.".encode("utf-8")
logger.error(msg)
# TODO: There may be a different way to handle
# this. For example, we can pass the message to
Expand All @@ -258,7 +263,7 @@ def run(self):
[
address,
b"receiver-init-error",
msg.encode("utf-8"),
msg,
]
)
if start_nr:
Expand Down
10 changes: 6 additions & 4 deletions src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@


class Receiver(ReplicationMixin, Process):
def __init__(self, identity, meta):
sname: str

def __init__(self, identity: bytes, meta):
self.identity = identity
self.meta = json.loads(meta)
self.src_share = self.meta["share"]
Expand Down Expand Up @@ -126,15 +128,15 @@ def _delete_old_snaps(self, share_name, share_path, num_retain):
return self._delete_old_snaps(share_name, share_path, num_retain)

def _send_recv(self, command: bytes, msg: bytes = b""):
logger.debug(f"_send_recv called with command: {command}, msg: {msg}.")
logger.debug(f"RECEIVER: _send_recv called with command: {command}, msg: {msg}.")
rcommand = rmsg = None
self.dealer.send_multipart([command, msg])
# Retry logic doesn't make sense atm. So one long patient wait.
socks = dict(self.poll.poll(60000)) # 60 seconds.
if socks.get(self.dealer) == zmq.POLLIN:
rcommand, rmsg = self.dealer.recv_multipart()
logger.debug(f"Id: {self.identity} command: {command} rcommand: {rcommand}")
logger.debug((f"remote message: {rmsg}"))
logger.debug(f"remote message: {rmsg}")
return rcommand, rmsg

def _latest_snap(self, rso):
Expand Down Expand Up @@ -285,7 +287,7 @@ def run(self):
# milliseconds) for every message
num_tries = 10
command, message = self.dealer.recv_multipart()
logger.debug(f"command = {command}, of type:", type(command))
logger.debug(f"command = {command}, of type: {type(command)}")
if command == b"btrfs-send-stream-finished":
# this command concludes fsdata transfer. After this,
# btrfs-receive process should be
Expand Down
10 changes: 9 additions & 1 deletion src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def _sys_exit(self, code):
sys.exit(code)

def _init_greeting(self):
logger.debug("_init_greeting CALLED")
self.send_req = self.ctx.socket(zmq.DEALER)
self.send_req.setsockopt_string(zmq.IDENTITY, self.identity)
self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}")
Expand All @@ -109,6 +110,7 @@ def _init_greeting(self):
self.poll.register(self.send_req, zmq.POLLIN)

def _send_recv(self, command: bytes, msg: bytes = b""):
logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})")
self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8")
rcommand = rmsg = None
self.send_req.send_multipart([command, msg])
Expand All @@ -130,6 +132,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""):
return rcommand, rmsg

def _delete_old_snaps(self, share_path: str):

oldest_snap = get_oldest_snap(
share_path, self.max_snap_retain, regex="_replication_"
)
Expand Down Expand Up @@ -251,11 +254,16 @@ def run(self):
while True:
socks = dict(self.poll.poll(poll_interval))
logger.debug(f"Sender socks dict = {socks}")
if socks != {}:
for key in socks:
logger.debug(f"socks index ({key}), has value {socks[key]}")
else:
logger.debug("SOCKS EMPTY")
if socks.get(self.send_req) == zmq.POLLIN:
# not really necessary because we just want one reply for
# now.
command, reply = self.send_req.recv_multipart()
logger.debug(f"command = {command}, of type", type(command))
logger.debug(f"command = {command}, of type {type(command)}")
if command == b"receiver-ready":
if self.rt is not None:
self.rlatest_snap = reply
Expand Down
7 changes: 4 additions & 3 deletions src/rockstor/smart_manager/replication/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def disable_replica(self, rid: int):
raise Exception(msg)

def create_replica_trail(self, rid: int, snap_name: str):
logger.debug(f"Replication create_replica_trail(rid={rid}, snap_name={snap_name})")
url = f"sm/replicas/trail/replica/{rid}"
return self.law.api_call(
url,
Expand Down Expand Up @@ -153,7 +154,7 @@ def update_repclone(self, sname: str, snap_name: str):
share with the given snapshot. Intended for use in receive.py to turn
the oldest snapshot into an existing share via unmount, mv, mount
cycle.
:param sname: Existing share name
:param sname: Existing share-name
:param snap_name: Name of snapshot to supplant given share with.
:return: False if there is a failure.
"""
Expand Down Expand Up @@ -184,8 +185,8 @@ def delete_snapshot(self, sname: str, snap_name: str):
return False
raise e

def create_share(self, sname: str, pool):
print("pool parameter is of type:", type(pool))
def create_share(self, sname: str, pool: str):
print(f"Replication 'create_share' called with sname {sname}, pool {pool}")
try:
url = "shares"
data = {
Expand Down

0 comments on commit 64810f9

Please sign in to comment.