Skip to content

Commit

Permalink
Making sure the MultiprocessingParallelManager cleans up when done (w…
Browse files Browse the repository at this point in the history
…atertap-org/watertap#1211)

* Try adding explicit timeout where possible/easy

* Try one more timeout

* Add quick n dirty way to shut down multiprocessing worker processes

* Try using EAFP instead of checking Queue.empty()

* Make timeout configurable with default value

* Refactor to use logging

* Reorder imports

* remove timeout from concurrent.futures.wait

* trying Pool implementation

* Revert "trying Pool implementation"

This reverts commit 5354b1794ae1195b1f9222f723e141908d7cfc76.

* try using kill

* pylint

---------

Co-authored-by: Ludovico Bianchi <lbianchi@lbl.gov>
  • Loading branch information
bknueven and lbianchi-lbl authored Nov 21, 2023
1 parent 00c232b commit 3555a85
Showing 1 changed file with 32 additions and 11 deletions.
43 changes: 32 additions & 11 deletions src/parameter_sweep/parallel/multiprocessing_parallel_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,28 @@
# "https://github.com/watertap-org/watertap/"
#################################################################################

import numpy
import logging
import multiprocessing
from queue import Empty as EmptyQueue

import numpy

from watertap.tools.parallel.results import LocalResults
from watertap.tools.parallel.parallel_manager import (
parallelActor,
ParallelManager,
)
import multiprocessing


_logger = logging.getLogger(__name__)


class MultiprocessingParallelManager(ParallelManager):
def __init__(self, number_of_subprocesses=1, **kwargs):
def __init__(
self,
number_of_subprocesses=1,
**kwargs,
):
self.max_number_of_subprocesses = number_of_subprocesses

# this will be updated when child processes are kicked off
Expand Down Expand Up @@ -112,14 +121,25 @@ def gather(self):
results = []
# collect result from the actors
while len(results) < self.expected_samples:
if self.return_queue.empty() == False:
try:
i, values, result = self.return_queue.get()

results.append(LocalResults(i, values, result))
except EmptyQueue:
break
self._shut_down()
# sort the results by the process number to keep a deterministic ordering
results.sort(key=lambda result: result.process_number)
return results

def _shut_down(self):
    """Forcibly stop every worker process and empty the actor list.

    Uses ``Process.kill()`` (SIGKILL on POSIX) rather than ``terminate()``
    so a stuck worker cannot delay shutdown. Safe to call when there are
    no actors; it simply logs that zero processes were shut down.
    """
    total = len(self.actors)
    for worker in self.actors:
        _logger.debug("Attempting to shut down %s", worker)
        worker.kill()
    self.actors.clear()
    _logger.debug("Shut down %d processes", total)

def results_from_local_tree(self, results):
    """Identity pass-through: return *results* unchanged.

    For this manager the parent process already holds every result after
    gather(), so no additional filtering or merging is required here.
    """
    return results

Expand All @@ -135,9 +155,10 @@ def multiProcessingActor(
):
actor = parallelActor(do_build, do_build_kwargs, do_execute, local_parameters)
while True:
if queue.empty():
break
else:
i, local_parameters = queue.get()
result = actor.execute(local_parameters)
return_queue.put([i, local_parameters, result])
try:
msg = queue.get()
except EmptyQueue:
return
i, local_parameters = msg
result = actor.execute(local_parameters)
return_queue.put([i, local_parameters, result])

0 comments on commit 3555a85

Please sign in to comment.