Skip to content

Commit

Permalink
(t) replication spawn error - continued 5 rockstor#2766
Browse files Browse the repository at this point in the history
- Yet more debug logging.
- Remove use of None from within zmq command/message passing:
to help with stricter type hinting.
  • Loading branch information
phillxnet committed Dec 31, 2023
1 parent 64810f9 commit 5700577
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def run(self):
f"Active Receiver: {rs}. Messages processed: {count}"
)
if command == b"sender-ready":
logger.debug(f"initial greeting from {address}")
logger.debug(f"initial greeting command '{command}' received from {address}")
# Start a new receiver and send the appropriate response
try:
start_nr = True
Expand Down
6 changes: 3 additions & 3 deletions src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ def _delete_old_snaps(self, share_name, share_path, num_retain):

def _send_recv(self, command: bytes, msg: bytes = b""):
logger.debug(f"RECEIVER: _send_recv called with command: {command}, msg: {msg}.")
rcommand = rmsg = None
rcommand = rmsg = b""
self.dealer.send_multipart([command, msg])
# Retry logic doesn't make sense atm. So one long patient wait.
socks = dict(self.poll.poll(60000)) # 60 seconds.
if socks.get(self.dealer) == zmq.POLLIN:
rcommand, rmsg = self.dealer.recv_multipart()
logger.debug(f"Id: {self.identity} command: {command} rcommand: {rcommand}")
logger.debug(f"Id: {self.identity} RECEIVER: _send_recv command: {command} rcommand: {rcommand}")
logger.debug(f"remote message: {rmsg}")
return rcommand, rmsg

Expand Down Expand Up @@ -270,7 +270,7 @@ def run(self):

self.msg = b"Failed to send receiver-ready"
rcommand, rmsg = self._send_recv(b"receiver-ready", b"")
if rcommand is None:
if rcommand == b"":
logger.error(
f"Id: {self.identity}. No response from the broker for receiver-ready command. Aborting."
)
Expand Down
23 changes: 11 additions & 12 deletions src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def _sys_exit(self, code):
sys.exit(code)

def _init_greeting(self):
logger.debug("_init_greeting CALLED")
logger.debug("_init_greeting() CALLED")
self.send_req = self.ctx.socket(zmq.DEALER)
self.send_req.setsockopt_string(zmq.IDENTITY, self.identity)
self.send_req.connect(f"tcp://{self.receiver_ip}:{self.receiver_port}")
Expand All @@ -105,14 +105,15 @@ def _init_greeting(self):
"uuid": self.uuid,
}
msg_str = json.dumps(msg)
logger.debug("_init_greeting() sending 'sender-ready'")
self.send_req.send_multipart([b"sender-ready", msg_str.encode("utf-8")])
logger.debug(f"Id: {self.identity} Initial greeting: {msg}")
self.poll.register(self.send_req, zmq.POLLIN)

def _send_recv(self, command: bytes, msg: bytes = b""):
logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})")
self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8")
rcommand = rmsg = None
rcommand = rmsg = b""
self.send_req.send_multipart([command, msg])
# There is no retry logic here because it's an overkill at the moment.
# If the stream is interrupted, we can only start from the beginning
Expand All @@ -121,18 +122,17 @@ def _send_recv(self, command: bytes, msg: bytes = b""):
socks = dict(self.poll.poll(60000)) # 60 seconds.
if socks.get(self.send_req) == zmq.POLLIN:
rcommand, rmsg = self.send_req.recv_multipart()
if (
len(command) > 0 or (rcommand is not None and rcommand != b"send-more")
) or ( # noqa E501
len(command) > 0 and rcommand is None
# len(b"") == 0 so change to test for command != b"" instead
if (len(command) > 0 or (rcommand != b"" and rcommand != b"send-more")) or (
len(command) > 0 and rcommand == b""
):
logger.debug(
f"Id: {self.identity} Server: {self.receiver_ip}:{self.receiver_port} scommand: {command} rcommand: {rcommand}"
)
return rcommand, rmsg

def _delete_old_snaps(self, share_path: str):

logger.debug(f"Sender _delete_old_snaps(share_path={share_path})")
oldest_snap = get_oldest_snap(
share_path, self.max_snap_retain, regex="_replication_"
)
Expand Down Expand Up @@ -373,12 +373,11 @@ def run(self):
logger.debug(
f"Id: {self.identity} Sender alive. Data transferred: {dsize}. Rate: {drate}/sec."
)
if command is None or command == b"receiver-error":
# command is None when the remote side vanishes.
if command == b"" or command == b"receiver-error":
# command is EMPTY when the remote side vanishes.
self.msg = (
f"Got null or error command({command}) message({message}) "
"from the Receiver while "
"transmitting fsdata. Aborting."
f"Got EMPTY or error command ({command}) message ({message}) "
"from the Receiver while transmitting fsdata. Aborting."
).encode("utf-8")
raise Exception(message)

Expand Down
4 changes: 4 additions & 0 deletions src/rockstor/smart_manager/replication/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def validate_src_share(self, sender_uuid: str, sname: str):
return self.raw.api_call(url=f"shares/{sname}")

def update_replica_status(self, rtid: int, data):
logger.debug(f"update_replica_status(rtid={rtid}, data={data})")
try:
url = f"sm/replicas/trail/{rtid}"
return self.law.api_call(url, data=data, calltype="put")
Expand Down Expand Up @@ -82,6 +83,7 @@ def rshare_id(self, sname: str) -> int:
return rshare["id"]

def create_rshare(self, data) -> int:
logger.debug(f"create_rshare(data={data})")
try:
url = "sm/replicas/rshare"
rshare = self.law.api_call(
Expand All @@ -95,8 +97,10 @@ def create_rshare(self, data) -> int:
raise e

def create_receive_trail(self, rid: int, data) -> int:
logger.debug(f"create_receive_trail(rid={rid}, data={data})")
url = f"sm/replicas/rtrail/rshare/{rid}"
rt = self.law.api_call(url, data=data, calltype="post", save_error=False)
logger.debug(f"create_receive_trail() -> {rt['id']}")
return rt["id"]

def update_receive_trail(self, rtid: int, data):
Expand Down

0 comments on commit 5700577

Please sign in to comment.