Skip to content

Commit

Permalink
Add keep flag to parallel_* to keep or forget callback's results
Browse files Browse the repository at this point in the history
  • Loading branch information
erick committed Nov 29, 2024
1 parent 0e9a8e8 commit 0af523e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
Empty file added __init__.py
Empty file.
14 changes: 11 additions & 3 deletions src/minibone/parallel_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,30 @@ async def aioget(self, uid: str, timeout=-1) -> object | None:

return None

def queue(self, callback, cmd: str = "UID", **kwargs) -> str:
def queue(self, callback, cmd: str = "UID", keep: bool = True, **kwargs) -> str:
"""Queue a task and return an UID to get the result later
Arguments
---------
callback: object A callable object
cmd: str Any command to prefix the UID to return
keep: bool Set to True to keep the callback's result in memory
Set to False to forget it. Usefull for callbacks that returns nothing
kwargs: dict a dick with key/value parameters
Notes
-----
If keep is set, do not forget to call get(uid) to free memory
"""
assert callable(callable)
assert isinstance(keep, bool)
assert not kwargs or isinstance(kwargs, dict)

if not kwargs:
kwargs = dict()

uid = self._uid(cmd)
item = {"uid": uid, "callback": callback, "kwargs": kwargs}
item = {"uid": uid, "callback": callback, "keep": keep, "kwargs": kwargs}
self._queue.put(item)

self._logger.debug("Task %s queued", uid)
Expand All @@ -182,7 +189,8 @@ def on_process(self):
resp = self._pool.apply_async(func=item["callback"], kwds=item["kwargs"])

self.lock.acquire()
self._processing[item["uid"]] = resp
if item["keep"]:
self._processing[item["uid"]] = resp
self.lock.release()

self._queue.task_done()
Expand Down
16 changes: 12 additions & 4 deletions src/minibone/parallel_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,30 @@ async def aioget(self, uid: str, timeout=-1) -> object | None:

return None

def queue(self, callback, cmd: str = "UID", **kwargs) -> str:
def queue(self, callback, cmd: str = "UID", keep: bool = True, **kwargs) -> str:
"""Queue a task and return an UID to get the result later
Arguments
---------
callback: object A callable object
cmd: str Any command to prefix the UID to return
keep: bool Set to True to keep the callback's result in memory
Set to False to forget it. Usefull for callbacks that returns nothing
kwargs: dict a dick with key/value parameters
Notes
-----
If keep is set, do not forget to call get(uid) to free memory
"""
assert callable(callable)
assert isinstance(keep, bool)
assert not kwargs or isinstance(kwargs, dict)

if not kwargs:
kwargs = dict()

uid = self._uid(cmd)
item = {"uid": uid, "callback": callback, "kwargs": kwargs}
item = {"uid": uid, "callback": callback, "keep": keep, "kwargs": kwargs}

self.lock.acquire()
if self.type_queue == TypeQueue.LIFO:
Expand All @@ -179,13 +186,14 @@ def queue(self, callback, cmd: str = "UID", **kwargs) -> str:
self._logger.debug("Task %s queued", uid)
return uid

def _process_task(self, uid, callback, kwargs):
def _process_task(self, uid, callback, keep, kwargs):
"""Create a thread to run a task"""
data = callback(**kwargs)

self.lock.acquire()
self._current_task -= 1
self._results[uid] = data
if keep:
self._results[uid] = data
self.lock.release()
self._logger.debug("Task %s done. Parallel tasks: %d", uid, self._current_task)

Expand Down
Empty file added tests/__init__.py
Empty file.

0 comments on commit 0af523e

Please sign in to comment.