Skip to content

Commit

Permalink
Fix worker halting
Browse files Browse the repository at this point in the history
- add retry for workers failing to send MeasurementsReport, so they don't hang indefinitely
- add safeguard around post group run
  • Loading branch information
emiglietta committed Aug 23, 2024
1 parent e96f9a8 commit 01c89e9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cellprofiler_core/analysis/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(
# should have jobserver() call load_measurements_from_buffer() rather
# than interface() doing so. Currently, passing measurements in this
# way seems like it might be buggy:
# http://code.google.com/p/h5py/issues/detail?id=244
# https://github.com/h5py/h5py/issues/244
self.received_measurements_queue = queue.Queue(maxsize=10)

self.shared_dicts = None
Expand Down
1 change: 1 addition & 0 deletions cellprofiler_core/utilities/zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
LockStatusRequest,
Request,
)
from ._event import PollTimeoutException

NOTIFY_SOCKET_ADDR = "inproc://BoundaryNotifications"
SD_KEY_DICT = "__keydict__"
Expand Down
4 changes: 4 additions & 0 deletions cellprofiler_core/utilities/zmq/_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class PollTimeoutException(Exception):
    """Exception issued by a timeout from polling.

    Raised when a poll finishes without reporting any events, so that the
    caller can decide whether to retry instead of waiting indefinitely.
    """
51 changes: 36 additions & 15 deletions cellprofiler_core/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
from ..constants.worker import the_zmq_context
from ..measurement import Measurements
from ..utilities.measurement import load_measurements_from_buffer
from ..utilities.zmq import PollTimeoutException
from ..pipeline import CancelledException
from ..preferences import get_awt_headless
from ..preferences import set_preferences_from_dict
from ..utilities.zmq.communicable.reply.upstream_exit import UpstreamExit
from ..workspace import Workspace

LOGGER = logging.getLogger(__name__)


class Worker:
"""An analysis worker processing work at a given address
Expand Down Expand Up @@ -124,6 +127,7 @@ def run(self):
)
t0 = time.time()
self.work_socket = the_zmq_context.socket(zmq.REQ)
self.work_socket.set_hwm(2000)
self.work_socket.connect(self.work_request_address)
# fetch a job
the_request = Work(self.current_analysis_id)
Expand Down Expand Up @@ -304,26 +308,40 @@ def do_job(self, job):
return

if worker_runs_post_group:
last_workspace.interaction_handler = self.interaction_handler
last_workspace.cancel_handler = self.cancel_handler
last_workspace.post_group_display_handler = (
self.post_group_display_handler
)
# There might be an exception in this call, but it will be
# handled elsewhere, and there's nothing we can do for it
# here.
current_pipeline.post_group(
last_workspace, current_measurements.get_grouping_keys()
)
del last_workspace
if not last_workspace is None:
last_workspace.interaction_handler = self.interaction_handler
last_workspace.cancel_handler = self.cancel_handler
last_workspace.post_group_display_handler = (
self.post_group_display_handler
)
# There might be an exception in this call, but it will be
# handled elsewhere, and there's nothing we can do for it
# here.
current_pipeline.post_group(
last_workspace, current_measurements.get_grouping_keys()
)
del last_workspace
else:
LOGGER.error("No workspace from last image set, cannot run post group")

# send measurements back to server
req = MeasurementsReport(
self.current_analysis_id,
buf=current_measurements.file_contents(),
image_set_numbers=image_set_numbers,
)
rep = self.send(req)

while True:
try:
rep = self.send(req, timeout=4000)
break
except PollTimeoutException:
LOGGER.info(f"Worker sending MeasurementsReport halted, retrying for job {str(job.image_set_numbers)}")
self.work_socket.close(linger=0)
self.work_socket = the_zmq_context.socket(zmq.REQ)
self.work_socket.set_hwm(2000)
self.work_socket.connect(self.work_request_address)
continue

except CancelledException:
# Main thread received shutdown signal
Expand Down Expand Up @@ -389,7 +407,7 @@ def omero_login_handler(self):
rep = self.send(req)
use_omero_credentials(rep.credentials)

def send(self, req, work_socket=None):
def send(self, req, work_socket=None, timeout=None):
"""Send a request and receive a reply
req - request to send
Expand All @@ -410,7 +428,10 @@ def send(self, req, work_socket=None):
req.send_only(work_socket)
response = None
while response is None:
for socket, state in poller.poll():
poll_res = poller.poll(timeout)
if len(poll_res) == 0:
raise PollTimeoutException
for socket, state in poll_res:
if socket == self.notify_socket and state == zmq.POLLIN:
notify_msg = self.notify_socket.recv()
if notify_msg == NOTIFY_STOP:
Expand Down

0 comments on commit 01c89e9

Please sign in to comment.