Skip to content

Commit

Permalink
(t) replication spawn error rockstor#2766
Browse files Browse the repository at this point in the history
Update replication code re Py3.*
- Modernise previously missed replication imports re Py3.*
- Force bytes format for replication messages and commands.
Required as zmq needs bytes format for these.
- Minor modification re Pythnon 3 behaviour re dict.keys(),
we replied on an implicit Python 2 behaviour.
- Move to Fstrings.
- Parameter/return type hinting.
- Removed an unused local variable.
- black format update
  • Loading branch information
phillxnet committed Dec 21, 2023
1 parent b870fa2 commit bd9ddfd
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 256 deletions.
16 changes: 9 additions & 7 deletions src/rockstor/scripts/scheduled_tasks/send_replica.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Copyright (c) 2012-2020 RockStor, Inc. <http://rockstor.com>
Copyright (c) 2012-2023 RockStor, Inc. <https://rockstor.com>
This file is part of RockStor.
RockStor is free software; you can redistribute it and/or modify
Expand All @@ -13,7 +13,7 @@
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

import sys
Expand All @@ -28,25 +28,27 @@ def main():
rid = int(sys.argv[1])
ctx = zmq.Context()
poll = zmq.Poller()
num_tries = 12
num_tries = 3
while True:
req = ctx.socket(zmq.DEALER)
poll.register(req, zmq.POLLIN)
req.connect("ipc://%s" % settings.REPLICATION.get("ipc_socket"))
req.send_multipart(["new-send", b"%d" % rid])
ipc_socket = settings.REPLICATION.get("ipc_socket")
req.connect(f"ipc://{ipc_socket}")
req.send_multipart([b"new-send", f"{rid}".encode("utf-8")])

socks = dict(poll.poll(5000))
if socks.get(req) == zmq.POLLIN:
rcommand, reply = req.recv_multipart()
if rcommand == "SUCCESS":
print(f"rcommand={rcommand}, reply={reply}")
if rcommand == b"SUCCESS":
print(reply)
break
ctx.destroy(linger=0)
sys.exit(reply)
num_tries -= 1
print(
"No response from Replication service. Number of retry "
"attempts left: %d" % num_tries
f"attempts left: {num_tries}"
)
if num_tries == 0:
ctx.destroy(linger=0)
Expand Down
54 changes: 28 additions & 26 deletions src/rockstor/smart_manager/replication/listener_broker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Copyright (c) 2012-2020 RockStor, Inc. <http://rockstor.com>
Copyright (c) 2012-2023 RockStor, Inc. <https://rockstor.com>
This file is part of RockStor.
RockStor is free software; you can redistribute it and/or modify
Expand All @@ -13,9 +13,8 @@
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

from multiprocessing import Process
import zmq
import os
Expand All @@ -24,9 +23,9 @@
from storageadmin.models import NetworkConnection, Appliance
from smart_manager.models import ReplicaTrail, ReplicaShare, Replica, Service
from django.conf import settings
from sender import Sender
from receiver import Receiver
from util import ReplicationMixin
from smart_manager.replication.sender import Sender
from smart_manager.replication.receiver import Receiver
from smart_manager.replication.util import ReplicationMixin
from cli import APIWrapper
import logging

Expand All @@ -35,6 +34,8 @@

class ReplicaScheduler(ReplicationMixin, Process):
def __init__(self):
self.law = None
self.local_receivers = None
self.ppid = os.getpid()
self.senders = {} # Active Sender(outgoing) process map.
self.receivers = {} # Active Receiver process map.
Expand All @@ -46,24 +47,25 @@ def __init__(self):

def _prune_workers(self, workers):
for wd in workers:
for w in wd.keys():
for w in list(wd.keys()):
if wd[w].exitcode is not None:
del wd[w]
logger.debug("deleted worker: %s" % w)
logger.debug(f"deleted worker: {w}")
return workers

def _prune_senders(self):
for s in self.senders.keys():
for s in list(self.senders.keys()):
ecode = self.senders[s].exitcode
if ecode is not None:
del self.senders[s]
logger.debug("Sender(%s) exited. exitcode: %s" % (s, ecode))
logger.debug(f"Sender({s}) exited. exitcode: {ecode}")
if len(self.senders) > 0:
logger.debug("Active Senders: %s" % self.senders.keys())
logger.debug(f"Active Senders: {self.senders.keys()}")

def _delete_receivers(self):
active_msgs = []
for r in self.local_receivers.keys():
# We modify during iteration, and so require explicit list.
for r in list(self.local_receivers.keys()):
msg_count = self.remote_senders.get(r, 0)
ecode = self.local_receivers[r].exitcode
if ecode is not None:
Expand Down Expand Up @@ -216,7 +218,7 @@ def run(self):

ctx = zmq.Context()
frontend = ctx.socket(zmq.ROUTER)
frontend.set_hwm(10)
frontend.set_hwm(value=10)
frontend.bind("tcp://%s:%d" % (self.listener_interface, self.listener_port))

backend = ctx.socket(zmq.ROUTER)
Expand Down Expand Up @@ -248,7 +250,7 @@ def run(self):
"Active Receiver: %s. Messages processed:"
"%d" % (rs, count)
)
if command == "sender-ready":
if command == b"sender-ready":
logger.debug("initial greeting from %s" % address)
# Start a new receiver and send the appropriate response
try:
Expand All @@ -275,7 +277,7 @@ def run(self):
# the active receiver and factor into it's
# retry/robust logic. But that is for later.
frontend.send_multipart(
[address, "receiver-init-error", msg]
[address, b"receiver-init-error", msg.encode("utf-8")]
)
if start_nr:
nr = Receiver(address, msg)
Expand All @@ -290,17 +292,17 @@ def run(self):
"new receiver for %s: %s" % (address, e.__str__())
)
logger.error(msg)
frontend.send_multipart([address, "receiver-init-error", msg])
frontend.send_multipart([address, b"receiver-init-error", msg.encode("utf-8")])
else:
# do we hit hwm? is the dealer still connected?
backend.send_multipart([address, command, msg])
backend.send_multipart([address, command, msg.encode("utf-8")])

elif backend in socks and socks[backend] == zmq.POLLIN:
address, command, msg = backend.recv_multipart()
if command == "new-send":
if command == b"new-send":
rid = int(msg)
logger.debug("new-send request received for %d" % rid)
rcommand = "ERROR"
rcommand = b"ERROR"
try:
replica = Replica.objects.get(id=rid)
if replica.enabled:
Expand All @@ -309,7 +311,7 @@ def run(self):
"A new Sender started successfully for "
"Replication Task(%d)." % rid
)
rcommand = "SUCCESS"
rcommand = b"SUCCESS"
else:
msg = (
"Failed to start a new Sender for "
Expand All @@ -323,18 +325,18 @@ def run(self):
)
logger.error(msg)
finally:
backend.send_multipart([address, rcommand, str(msg)])
backend.send_multipart([address, rcommand, msg.encode("utf-8")])
elif address in self.remote_senders:
if command in (
"receiver-ready",
"receiver-error",
"btrfs-recv-finished",
b"receiver-ready",
b"receiver-error",
b"btrfs-recv-finished",
): # noqa E501
logger.debug("Identitiy: %s command: %s" % (address, command))
backend.send_multipart([address, b"ACK", ""])
backend.send_multipart([address, b"ACK", b""])
# a new receiver has started. reply to the sender that
# must be waiting
frontend.send_multipart([address, command, msg])
frontend.send_multipart([address, command, msg.encode("utf-8")])

else:
iterations -= 1
Expand Down
Loading

0 comments on commit bd9ddfd

Please sign in to comment.