Skip to content

Commit

Permalink
(t) replication spawn error - more Py2.7 to 3 issues rockstor#2766
Browse files Browse the repository at this point in the history
- more str/byte issues.
- iostream behaviour differs, in-dev modifications.
  • Loading branch information
phillxnet committed Jan 5, 2024
1 parent 48cf26d commit fca03f5
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
9 changes: 5 additions & 4 deletions src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ def run(self):
logger.debug(
f"Active Receiver: {rs}. Messages processed: {count}"
)

if command == b"sender-ready":
logger.debug(f"initial greeting command '{command}' received from {address}")
# Start a new receiver and send the appropriate response
Expand Down Expand Up @@ -294,14 +295,14 @@ def run(self):
self.local_receivers[address] = nr
continue
except Exception as e:
msg = f"Exception while starting the new receiver for {address}: {e.__str__()}"
msg = f"Exception while starting the new receiver for {address}: {e.__str__()}".encode("utf-8")
logger.error(msg)
frontend.send_multipart(
[address, b"receiver-init-error", msg.encode("utf-8")]
[address, b"receiver-init-error", msg]
)
else:
# do we hit hwm? is the dealer still connected?
backend.send_multipart([address, command, msg.encode("utf-8")])
backend.send_multipart([address, command, msg])

elif backend in events and events[backend] == zmq.POLLIN:
# backend.recv_multipart() returns all as type <class 'bytes'>
Expand All @@ -327,7 +328,7 @@ def run(self):
finally:
backend.send_multipart([address, rcommand, msg.encode("utf-8")])
elif address in self.remote_senders.keys():
logger.debug(f"Identity/address {address}, found in remove_senders.keys()")
logger.debug(f"Identity/address {address}, found in remote_senders.keys()")
if (
command == b"receiver-ready"
or command == b"receiver-error"
Expand Down
48 changes: 34 additions & 14 deletions src/rockstor/smart_manager/replication/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sys
import zmq
import subprocess
import io
import fcntl
import json
import time
Expand Down Expand Up @@ -122,7 +123,6 @@ def _init_greeting(self):
logger.debug(f"_send_recv(command={command}, msg={msg}) -> {rcommand}, {rmsg}")
logger.debug(f"Id: {self.identity} Initial greeting Done")


def _send_recv(self, command: bytes, msg: bytes = b"", send_only: bool = False):
logger.debug(f"SENDER: _send_recv(command={command}, msg={msg})")
self.msg = f"Failed while send-recv-ing command({command})".encode("utf-8")
Expand Down Expand Up @@ -332,22 +332,34 @@ def run(self):

snap_path = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.snap_name}"
cmd = [BTRFS, "send", snap_path]
logger.debug(f"init btrfs 'send' cmd {cmd}")
logger.debug(f"Initial btrfs 'send' cmd {cmd}")
if self.rt is not None:
prev_snap = f"{settings.MNT_PT}{self.replica.pool}/.snapshots/{self.replica.share}/{self.rt.snap_name}"
logger.info(
f"Id: {self.identity}. Sending incremental replica between {prev_snap} -- {snap_path}"
)
cmd = [BTRFS, "send", "-p", prev_snap, snap_path]
logger.debug(f"differential btrfs 'send' cmd {cmd}")
logger.debug(f"Differential btrfs 'send' cmd {cmd}")
else:
logger.info(f"Id: {self.identity}. Sending full replica: {snap_path}")

try:
# We force en_US to avoid issues on date and number formats
# on non Anglo-Saxon systems (ex. it, es, fr, de, etc)
fake_env = dict(os.environ)
fake_env["LANG"] = "en_US.UTF-8"
# all subprocess in and out are bytes by default.
# https://docs.python.org/3.11/library/subprocess.html#using-the-subprocess-module
# subprocess.run is blocking until execution has finnished.
self.sp = subprocess.Popen(
cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
# Get current stdout flags:
# stdout_flags = fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_GETFL)
# add via File_SetFlag, O_NONBLOCK (non-blocking)
# fcntl.fcntl(self.sp.stdout.fileno(), fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK)
# Py3 variant of the same:
os.set_blocking(self.sp.stdout.fileno(), False)
except Exception as e:
self.msg = f"Failed to start the low level btrfs send command({cmd}). Aborting. Exception: {e.__str__()}".encode(
"utf-8"
Expand All @@ -361,31 +373,39 @@ def run(self):
t0 = time.time()
while alive:
try:
if self.sp.poll() is not None:
if self.sp.poll() is not None: # poll() returns None while process is running: rc otherwise.
logger.debug(
f"Id: {self.identity}. send process finished for {self.snap_id}. "
f"rc: {self.sp.returncode}. stderr: {self.sp.stderr.read()}"
)
alive = False
fs_data = self.sp.stdout.read()
except IOError:
# Read all available data from stdout without blocking (requires bytes stream).
# https://docs.python.org/3/library/io.html#io.BufferedIOBase.read1
send_data = self.sp.stdout.read1()
if send_data is None:
logger.debug("sp.stdout empty")
continue
except IOError: # TODO: Non functional in Py3 (Py2.7 behaviour)
continue
except Exception as e:
self.msg = f"Exception occurred while reading low level btrfs send data for {self.snap_id}. Aborting.".encode(
"utf-8"
)
self.msg = (
f"Exception occurred while reading low level btrfs send data for {self.snap_id}. "
f"Aborting. Exception: {e.__str__()}"
).encode("utf-8")
if alive:
self.sp.terminate()
self.update_trail = True
self._send_recv(b"btrfs-send-unexpected-termination-error")
self._send_recv(
b"btrfs-send-unexpected-termination-error", self.msg
)
self._sys_exit(3)

self.msg = f"Failed to send fsdata to the receiver for {self.snap_id}. Aborting.".encode(
self.msg = f"Failed to send 'send_data' to the receiver for {self.snap_id}. Aborting.".encode(
"utf-8"
)
self.update_trail = True
command, message = self._send_recv(b"", fs_data)
self.total_bytes_sent += len(fs_data)
command, message = self._send_recv(b"", send_data)
self.total_bytes_sent += len(send_data)
num_msgs += 1
if num_msgs == 1000:
num_msgs = 0
Expand Down

0 comments on commit fca03f5

Please sign in to comment.