
Commit

(t) replication spawn error - set stdout read1 chunking rockstor#2766
- Avoid logging btrfs data stream contents entirely.
- Set read1() bytes read to 100MB max.
phillxnet committed Jan 10, 2024
1 parent 2a804f4 commit 8392bee
Showing 3 changed files with 15 additions and 35 deletions.
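For orientation before the per-file diffs: the read1() change below bounds how much of the btrfs send stream is pulled from the sender subprocess's stdout per loop cycle, and the logging changes stop that stream from ever being written to the debug log. A minimal sketch of the chunked-read pattern follows, assuming a stand-in command and a hypothetical stream_chunks() helper; the 100 MB cap mirrors the value set in sender.py, but nothing else here is the project's actual code.

    import subprocess

    # Illustrative cap only: the commit sets read1() to read at most 100 MB per call.
    MAX_CHUNK_BYTES = 100_000_000

    def stream_chunks(cmd):
        """Yield stdout of `cmd` in chunks of at most MAX_CHUNK_BYTES (hypothetical helper)."""
        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
            while True:
                # BufferedReader.read1() performs at most one underlying read and
                # returns no more than the requested number of bytes.
                chunk = proc.stdout.read1(MAX_CHUNK_BYTES)
                if not chunk:  # b"" once the child process closes stdout
                    break
                # Log sizes only, never the raw stream (the other half of this commit).
                print(f"read {len(chunk)} bytes")
                yield chunk

    if __name__ == "__main__":
        # Stand-in command; Rockstor's sender runs a btrfs send of the replicated snapshot.
        for _ in stream_chunks(["cat", "/etc/os-release"]):
            pass

Capping read1() keeps per-cycle memory bounded while still moving data in large blocks; the earlier 10 MB figure in sender.py is raised to 100 MB in this commit.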
13 changes: 4 additions & 9 deletions src/rockstor/smart_manager/replication/listener_broker.py
@@ -245,14 +245,12 @@ def run(self):
if frontend in events and events[frontend] == zmq.POLLIN:
# frontend.recv_multipart() returns all as type <class 'bytes'>
address, command, msg = frontend.recv_multipart()
- # Limit debug msg to 180 chars to avoid MBs of btrfs-send-stream msg in log
- if len(msg) > 180:
- logger.debug(
- f"frontend.recv_multipart() -> address= {address}, command={command}, msg={msg[0:180]} - log spam TRIMMED"
- )
+ # Avoid debug logging the btrfs-send-stream contents.
+ if command == b"" and msg != b"":
+ logger.debug("frontend.recv_multipart() -> command=b'', msg assumed BTRFS SEND BYTE STREAM")
else:
logger.debug(
- f"frontend.recv_multipart() -> address= {address}, command={command}, msg={msg}"
+ f"frontend.recv_multipart() -> address={address}, command={command}, msg={msg}"
)
# Keep a numerical events tally of per remote sender's events:
if address not in self.remote_senders:
@@ -367,9 +365,6 @@ def run(self):
[address, command, msg], copy=False, track=True
)
if not tracker.done:
- logger.debug(
- f"Waiting max 2 seconds for send of commmand ({command})"
- )
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone
tracker.wait(timeout=2)  # seconds as float: raises zmq.NotDone
else:
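The tracker seen in this hunk comes from pyzmq's zero-copy send: send_multipart(..., copy=False, track=True) returns a MessageTracker, and wait(timeout=...) raises zmq.NotDone if libzmq has not finished with the buffer in time. A brief sketch, with an assumed ROUTER socket and endpoint; only the tracker handling mirrors the code above.

    import zmq

    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://127.0.0.1:5555")  # hypothetical endpoint

    address, command, msg = b"sender-id", b"", b"...btrfs send bytes..."
    tracker = frontend.send_multipart([address, command, msg], copy=False, track=True)
    if not tracker.done:
        try:
            # Wait up to 2 seconds for libzmq to finish with the message buffer.
            tracker.wait(timeout=2)  # seconds as float
        except zmq.NotDone:
            print("send of command not confirmed within 2 seconds")

The commit drops the pre-wait debug line but keeps the bounded wait itself, so a stalled send still surfaces as zmq.NotDone rather than blocking silently.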
13 changes: 3 additions & 10 deletions src/rockstor/smart_manager/replication/receiver.py
@@ -142,7 +142,7 @@ def _delete_old_snaps(self, share_name, share_path, num_retain):

def _send_recv(self, command: bytes, msg: bytes = b""):
logger.debug(
- f"RECEIVER: _send_recv called with command: {command}, msg: {msg}."
+ f"_send_recv called with command: {command}, msg: {msg}."
)
rcommand = rmsg = b""
tracker = self.dealer.send_multipart([command, msg], copy=False, track=True)
@@ -154,7 +154,7 @@ def _send_recv(self, command: bytes, msg: bytes = b""):
if events.get(self.dealer) == zmq.POLLIN:
rcommand, rmsg = self.dealer.recv_multipart()
logger.debug(
- f"Id: {self.identity} RECEIVER: _send_recv command: {command} rcommand: {rcommand}"
+ f"Id: {self.identity} _send_recv command: {command} rcommand: {rcommand}"
)
logger.debug(f"remote message: {rmsg}")
return rcommand, rmsg
@@ -310,15 +310,8 @@ def run(self):
start_time = time.time()
while True:
events = dict(self.poller.poll(timeout=6000))  # 6 seconds
- logger.debug(f"RECEIVER events dict = {events}")
- if events != {}:
- for key in events:
- logger.debug(f"events index ({key}), has value {events[key]}")
- else:
- logger.debug("EVENTS EMPTY")
+ logger.debug(f"Events dict = {events}")
if events.get(self.dealer) == zmq.POLLIN:
- # reset to wait upto 60(poll_interval x num_tries
- # milliseconds) for every message
num_tries = 10
command, message = self.dealer.recv_multipart()
logger.debug(f"command = {command}")
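The loop above polls the receiver's DEALER socket with a 6-second timeout and now logs the events dict as a whole instead of iterating over it. For reference, a self-contained sketch of that poll-and-count shape, with an assumed endpoint and identity and a simplified retry budget (the real receiver does considerably more per message):

    import zmq

    ctx = zmq.Context.instance()
    dealer = ctx.socket(zmq.DEALER)
    dealer.setsockopt(zmq.IDENTITY, b"example-receiver")
    dealer.connect("tcp://127.0.0.1:5555")  # hypothetical endpoint

    poller = zmq.Poller()
    poller.register(dealer, zmq.POLLIN)

    num_tries = 10
    while num_tries > 0:
        # poll() takes milliseconds; 6000 ms matches the loop in the diff above.
        events = dict(poller.poll(timeout=6000))
        if events.get(dealer) == zmq.POLLIN:
            command, message = dealer.recv_multipart()
            print(f"command = {command!r}, payload = {len(message)} bytes")
            num_tries = 10  # any traffic resets the retry budget
        else:
            num_tries -= 1  # give up after 10 consecutive empty polls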
24 changes: 8 additions & 16 deletions src/rockstor/smart_manager/replication/sender.py
@@ -123,18 +123,15 @@ def _init_greeting(self):
logger.debug(f"Id: {self.identity} Initial greeting Done")

def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False):
- # Limit debug msg to 180 chars to avoid MBs of btrfs-send-stream msg in log
- if len(msg) > 180:
- logger.debug(
- f"_send_recv(command={command}, msg={msg[0:180]}) - log spam TRIMMED"
- )
+ # Avoid debug logging the btrfs-send-stream contents.
+ if command == b"" and msg != b"":
+ logger.debug("_send_recv(command=b'', msg assumed BTRFS SEND BYTE STREAM)")
else:
- logger.debug(f"_send_recv(command={command}, msg={msg})")
+ logger.debug(f"_send_recv(command={command}, msg={msg}), send_only={send_only}")
self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8")
rcommand = rmsg = b""
tracker = self.send_req.send_multipart([command, msg], copy=False, track=True)
if not tracker.done:
- logger.debug(f"Waiting max 2 seconds for send of commmand ({command})")
# https://pyzmq.readthedocs.io/en/latest/api/zmq.html#notdone
tracker.wait(timeout=2)  # seconds as float: raises zmq.NotDone
# There is no retry logic here because it's an overkill at the moment.
@@ -276,14 +273,9 @@ def run(self):

while True:
events_list = self.poller.poll(6000)
- logger.debug(f"Sender: EVENT_LIST poll = {events_list}")
+ logger.debug(f"EVENT_LIST poll = {events_list}")
events = dict(events_list)
- logger.debug(f"SENDER events dict = {events}")
- if events != {}:
- for key in events:
- logger.debug(f"events index ({key}), has value {events[key]}")
- else:
- logger.debug("EVENTS EMPTY")
+ logger.debug(f"Events dict = {events}")
if events.get(self.send_req) == zmq.POLLIN:
# not really necessary because we just want one reply for
# now.
@@ -390,8 +382,8 @@ def run(self):
# https://docs.python.org/3/library/io.html#io.BufferedIOBase.read1
# We limit/chunck this read1 to a set number of bytes per cycle.
# Btrfs uses 256 MB chunk on disk
- # Arbitrarily chunking to 10MB
- btrfs_send_stream = self.sp.stdout.read1(10000000)
+ # Arbitrarily chunk send process stdout via read1() bytes argument
+ btrfs_send_stream = self.sp.stdout.read1(100000000)
if btrfs_send_stream is None:
logger.debug("sp.stdout empty")
continue
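Taken together, the two changes in this commit pair a bounded read1() with frames whose command part is empty: the guards added in sender.py and listener_broker.py key on command == b"" to recognise raw stream data and avoid logging it. A rough sketch of that pairing, assuming a stand-in subprocess, a hypothetical broker endpoint, and a made-up end-of-stream command name:

    import subprocess
    import zmq

    READ1_CAP = 100_000_000  # 100 MB per cycle, matching the value set above

    ctx = zmq.Context.instance()
    send_req = ctx.socket(zmq.DEALER)
    send_req.connect("tcp://127.0.0.1:5555")  # hypothetical broker endpoint

    # Stand-in for the btrfs send subprocess started elsewhere in Sender.run().
    sp = subprocess.Popen(["cat", "/etc/os-release"], stdout=subprocess.PIPE)
    while True:
        chunk = sp.stdout.read1(READ1_CAP)
        if not chunk:
            # Hypothetical end-of-stream command name, not the project's actual protocol.
            send_req.send_multipart([b"stream-finished", b""])
            break
        # An empty command frame marks raw stream data, which the new logging
        # guards summarise with a fixed note instead of dumping to the log.
        send_req.send_multipart([b"", chunk])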
