Skip to content

Commit

Permalink
(t) replication spawn error - avoid btrfs-send log-spam rockstor#2766
Browse files Browse the repository at this point in the history
- During debug logging we only show the first 180 bytes of the
message, this avoids log-spamming MBs of btrfs-send stream data.
- Send btrfs-send byte stream in 10MB chunks.
  • Loading branch information
phillxnet committed Jan 9, 2024
1 parent 6a3ea30 commit 2a804f4
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 22 deletions.
50 changes: 37 additions & 13 deletions src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,15 @@ def run(self):
if frontend in events and events[frontend] == zmq.POLLIN:
# frontend.recv_multipart() returns all as type <class 'bytes'>
address, command, msg = frontend.recv_multipart()
logger.debug(f"frontend.recv_multipart() -> command={command}, msg={msg}")
# Limit debug msg to 180 chars to avoid MBs of btrfs-send-stream msg in log
if len(msg) > 180:
logger.debug(
f"frontend.recv_multipart() -> address= {address}, command={command}, msg={msg[0:180]} - log spam TRIMMED"
)
else:
logger.debug(
f"frontend.recv_multipart() -> address= {address}, command={command}, msg={msg}"
)
# Keep a numerical events tally of per remote sender's events:
if address not in self.remote_senders:
self.remote_senders[address] = 1
Expand All @@ -260,7 +268,9 @@ def run(self):
)

if command == b"sender-ready":
logger.debug(f"initial greeting command '{command}' received from {address}")
logger.debug(
f"initial greeting command '{command}' received from {address}"
)
# Start a new receiver and send the appropriate response
try:
start_nr = True
Expand All @@ -274,7 +284,9 @@ def run(self):
)
start_nr = True
else:
msg = f"Receiver({address}) already exists. Will not start a new one.".encode("utf-8")
msg = f"Receiver({address}) already exists. Will not start a new one.".encode(
"utf-8"
)
logger.error(msg)
# TODO: There may be a different way to handle
# this. For example, we can pass the message to
Expand All @@ -295,11 +307,11 @@ def run(self):
self.local_receivers[address] = nr
continue
except Exception as e:
msg = f"Exception while starting the new receiver for {address}: {e.__str__()}".encode("utf-8")
logger.error(msg)
frontend.send_multipart(
[address, b"receiver-init-error", msg]
msg = f"Exception while starting the new receiver for {address}: {e.__str__()}".encode(
"utf-8"
)
logger.error(msg)
frontend.send_multipart([address, b"receiver-init-error", msg])
else:
# do we hit hwm? is the dealer still connected?
backend.send_multipart([address, command, msg])
Expand Down Expand Up @@ -328,24 +340,36 @@ def run(self):
finally:
backend.send_multipart([address, rcommand, msg.encode("utf-8")])
elif address in self.remote_senders.keys():
logger.debug(f"Identity/address {address}, found in remote_senders.keys()")
logger.debug(
f"Identity/address {address}, found in remote_senders.keys()"
)
if (
command == b"receiver-ready"
or command == b"receiver-error"
or command == b"btrfs-recv-finished"
):
logger.debug(f"command: {command}, sending 'ACK' to backend.")
tracker = backend.send_multipart([address, b"ACK", b""], copy=False, track=True)
tracker = backend.send_multipart(
[address, b"ACK", b""], copy=False, track=True
)
if not tracker.done:
logger.debug(f"Waiting max 2 seconds for send of commmand ({command})")
logger.debug(
f"Waiting max 2 seconds for send of commmand ({command})"
)
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone
tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone
tracker.wait(
timeout=2
) # seconds as float: raises zmq.NotDone
# a new receiver has started. reply to the sender that
# must be waiting
logger.debug(f"command: {command}, sending to frontend.")
tracker = frontend.send_multipart([address, command, msg], copy=False, track=True)
tracker = frontend.send_multipart(
[address, command, msg], copy=False, track=True
)
if not tracker.done:
logger.debug(f"Waiting max 2 seconds for send of commmand ({command})")
logger.debug(
f"Waiting max 2 seconds for send of commmand ({command})"
)
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone
tracker.wait(timeout=2) # seconds as float: raises zmq.NotDone
else:
Expand Down
2 changes: 1 addition & 1 deletion src/rockstor/smart_manager/replication/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def run(self):
# milliseconds) for every message
num_tries = 10
command, message = self.dealer.recv_multipart()
logger.debug(f"command = {command}, of type: {type(command)}")
logger.debug(f"command = {command}")
if command == b"btrfs-send-stream-finished":
# this command concludes fsdata transfer. After this,
# btrfs-recev process should be
Expand Down
26 changes: 18 additions & 8 deletions src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,13 @@ def _init_greeting(self):
logger.debug(f"Id: {self.identity} Initial greeting Done")

def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False):
logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})")
# Limit debug msg to 180 chars to avoid MBs of btrfs-send-stream msg in log
if len(msg) > 180:
logger.debug(
f"_send_recv(command={command}, msg={msg[0:180]}) - log spam TRIMMED"
)
else:
logger.debug(f"_send_recv(command={command}, msg={msg})")
self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8")
rcommand = rmsg = b""
tracker = self.send_req.send_multipart([command, msg], copy=False, track=True)
Expand Down Expand Up @@ -282,13 +288,14 @@ def run(self):
# not really necessary because we just want one reply for
# now.
command, reply = self.send_req.recv_multipart()
logger.debug(f"command = {command}, of type {type(command)}")
logger.debug(f"command = {command}")
if command == b"receiver-ready":
if self.rt is not None:
self.rlatest_snap = reply.decode("utf-8")
self.rt = self._refresh_rt()
logger.debug(
f"Id: {self.identity}. command({command}) and message({reply}) received. Proceeding to send fsdata."
f"Id: {self.identity}. command({command}) & message({reply}) received. "
"Proceed to send btrfs_send_stream."
)
break
else:
Expand Down Expand Up @@ -381,8 +388,11 @@ def run(self):
alive = False
# Read all available data from stdout without blocking (requires bytes stream).
# https://docs.python.org/3/library/io.html#io.BufferedIOBase.read1
send_data = self.sp.stdout.read1()
if send_data is None:
# We limit/chunck this read1 to a set number of bytes per cycle.
# Btrfs uses 256 MB chunk on disk
# Arbitrarily chunking to 10MB
btrfs_send_stream = self.sp.stdout.read1(10000000)
if btrfs_send_stream is None:
logger.debug("sp.stdout empty")
continue
except IOError: # TODO: Non functional in Py3 (Py2.7 behaviour)
Expand All @@ -400,12 +410,12 @@ def run(self):
)
self._sys_exit(3)

self.msg = f"Failed to send 'send_data' to the receiver for {self.snap_id}. Aborting.".encode(
self.msg = f"Failed to send 'btrfs_send_stream' to the receiver for {self.snap_id}. Aborting.".encode(
"utf-8"
)
self.update_trail = True
command, message = self._send_recv(b"", send_data)
self.total_bytes_sent += len(send_data)
command, message = self._send_recv(b"", btrfs_send_stream)
self.total_bytes_sent += len(btrfs_send_stream)
num_msgs += 1
if num_msgs == 1000:
num_msgs = 0
Expand Down

0 comments on commit 2a804f4

Please sign in to comment.