Skip to content

Commit

Permalink
feature: add worker queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jlsneto committed Apr 24, 2024
1 parent 130df35 commit 7c4dcb0
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 48 deletions.
177 changes: 154 additions & 23 deletions cereja/concurrently/process.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
import abc
import os
import queue
import threading
from .. import Progress
import time

from ..utils import decorators

from .. import Progress, console

__all__ = ["MultiProcess"]


class MultiProcess:
class _IMultiProcess(abc.ABC):
@abc.abstractmethod
def _store_response(self, response):
return NotImplementedError

@abc.abstractmethod
def _get_response(self):
return NotImplementedError


class MultiProcess(_IMultiProcess):
"""
MultiProcess is a utility class designed to execute a given function in parallel using multiple threads.
Expand All @@ -16,7 +33,7 @@ class MultiProcess:
_terminate (bool): A flag indicating if the thread execution should be terminated prematurely.
"""

def __init__(self, max_threads: int):
def __init__(self, max_threads: int, on_result=None):
"""
Initializes a new instance of the MultiProcess class.
Expand All @@ -29,8 +46,43 @@ def __init__(self, max_threads: int):
self._thread_available = threading.Condition(self._lock)
self._terminate = False
self._results = []
self._exception_err = None
self._on_result = on_result

def _create_task(self, function, value, indx, *args, **kwargs):
self.wait_for_available_thread()
if self._terminate:
self._terminate = False
raise ChildProcessError(f"Error on task item {indx - 1}: {self._exception_err}")
thread = threading.Thread(target=self._execute_function_thread, name=f'Thread-{indx}',
args=(function, value, indx, args, kwargs))
with self._lock:
self._active_threads += 1
thread.start()

def _get_response(self):
with self._thread_available:
while self._active_threads > 0:
self._thread_available.wait()
results = [val[-1] for val in sorted(self._results, key=lambda val: val[0])]
self._results = []
self._terminate = False
return results

def _store_response(self, response):
self._results.append(response)

def execute(self, function, values, *args, **kwargs) -> list:
def _process_response(self, response):
if self._on_result is not None:
self._on_result(response)
else:
self._store_response(response)

@decorators.depreciation(alternative='map')
def execute(self, function, values, *args, verbose=True, **kwargs) -> list:
return self.map(function, values, *args, verbose=verbose, **kwargs)

def map(self, function, values, *args, verbose=True, **kwargs) -> list:
"""
Execute the given function using multiple threads on the provided values.
Expand All @@ -40,27 +92,17 @@ def execute(self, function, values, *args, **kwargs) -> list:
@return: ordered each given function returns
"""
self._terminate = False
for indx, value in enumerate(
Progress.prog(values, custom_state_func=lambda: f'Threads Running: {self._active_threads}')):
self.wait_for_available_thread()
if self._terminate:
Progress.prog(values,
custom_state_func=lambda: f'Threads Running: {self._active_threads}') if verbose else values):
try:
self._create_task(function, value, indx, *args, **kwargs)
except ChildProcessError:
print("Terminating due to an exception in one of the threads. Returning processed data...")
break
thread = threading.Thread(target=self._execute_function_thread, name=f'Thread-{indx}',
args=(function, value, indx, args, kwargs))
with self._lock:
self._active_threads += 1
thread.start()
self._terminate = False
return self._get_results()

def _get_results(self):
with self._thread_available:
while self._active_threads > 0:
self._thread_available.wait()
results = [val[-1] for val in sorted(self._results, key=lambda val: val[0])]
self._results = []
return results
if self._on_result is None:
return self._get_response()

def wait_for_available_thread(self):
"""
Expand All @@ -80,11 +122,100 @@ def _execute_function_thread(self, function, value, indx, args, kwargs):
"""
try:
if not self._terminate:
self._results.append((indx, function(value, *args, **kwargs)))
self._process_response((indx, function(value, *args, **kwargs)))
except Exception as e:
print(f"Error encountered in thread: {e}")
self._terminate = True
self._exception_err = e
finally:
with self._thread_available:
self._active_threads -= 1
self._thread_available.notify()


class WorkerQueue(MultiProcess):
def __init__(self, func_task, max_threads: int = 1, max_size=-1, on_result=None, **task_kwargs):
super().__init__(max_threads, on_result=on_result)
self._q = queue.Queue(maxsize=max_size)
self._results = queue.PriorityQueue()
self._func_task = func_task
self._task_kwargs = task_kwargs
self._th_service = threading.Thread(target=self._worker, daemon=True).start()
self._indx = -1

def put(self, item, block=True, timeout=None, **task_kwargs):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
self._indx += 1
self._q.put((self._indx, item, task_kwargs if len(task_kwargs) else {}), block=block, timeout=timeout)

def _store_response(self, response):
self._results.put_nowait(item=response)

def get_available_reponse(self, take_indx=False, timeout=30):
return self._get(take_indx, timeout=timeout)

def _get(self, take_indx=False, timeout=30):
assert self._on_result is None, "task results are being sent to the callback defined 'on_result'"
while (self._results.qsize() > 0 or self._active_threads > 0) or not self.is_empty:
indx, item = self._results.get(timeout=timeout)
self._results.task_done()
return indx, item if take_indx else item

def _get_response(self, take_indx=False):
assert self._on_result is None, "task results are being sent to the callback defined 'on_result'"
self._q.join()
self._indx = -1
result = []

while self._results.qsize() > 0 or self._active_threads > 0:
result.append(self._results.get(timeout=1))
self._results.task_done()
self._results.join()
return [val if take_indx else val[-1] for val in sorted(result, key=lambda val: val[0])]

def get_all_tasks_response(self, take_indx=False):
return self._get_response(take_indx)

def put_nowait(self, item, **task_kwargs):
self.put(item, block=False, **task_kwargs)

def _worker(self):
while True:
indx, item, kwargs = self._q.get()
self._create_task(self._func_task, item, indx, **kwargs)
self._q.task_done()

@property
def size(self):
return self._q.qsize()

@property
def is_empty(self):
return self._q.empty()

@property
def is_full(self):
return self._q.full()

def __enter__(self, *args, **kwargs) -> 'WorkerQueue':
self._with_context = True
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_val, Exception) and not isinstance(
exc_val, DeprecationWarning
):
console.error(
f"{os.path.basename(exc_tb.tb_frame.f_code.co_filename)}:{exc_tb.tb_lineno}: {exc_val}"
)
self._q.join()
self._with_context = False
2 changes: 2 additions & 0 deletions cereja/system/_win32.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,8 @@ def state(self):
return "Normal"

def capture_image_bmp(self, filepath=None, only_window_content=True):
if not user32.IsZoomed(self.hwnd):
ctypes.windll.user32.ShowWindow(self.hwnd, 4)
# Obtenha o DC da janela e crie um DC compatível
window_dc = GetWindowDC(self.hwnd) if only_window_content else GetDC(self.hwnd)
mem_dc = CreateCompatibleDC(window_dc)
Expand Down
26 changes: 1 addition & 25 deletions cereja/utils/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import ctypes
import gc
import math
import threading
import time
from collections import OrderedDict, defaultdict
from importlib import import_module
Expand All @@ -35,7 +34,7 @@
import inspect
from pprint import PrettyPrinter

from .decorators import time_exec, depreciation
from .decorators import depreciation
# Needed init configs
from ..config.cj_types import ClassType, FunctionType, Number
from itertools import combinations as itertools_combinations
Expand Down Expand Up @@ -85,7 +84,6 @@
"dict_filter_value",
"get_zero_mask",
"get_batch_strides",
"Thread",
"prune_values",
"split_sequence",
"has_length",
Expand Down Expand Up @@ -114,28 +112,6 @@ def _format(self, object, *args):
super()._format(object, *args)


# TODO: Remove Thread class, check cereja.concurrently.process.MultiProcess implementation
class Thread(threading.Thread):
def __init__(
self, target, args=None, kwargs=None, name=None, daemon=None, callback=None
):
# while threading.active_count() > os.cpu_count() * 2:
# time.sleep(0.1)
super().__init__(daemon=daemon, name=name)
if args is None:
args = ()
if kwargs is None:
kwargs = {}
self._func = target
self._args = args
self._kwargs = kwargs
self._callback = callback

def run(self):
res = self._func(*self._args, **self._kwargs)
if self._callback:
self._callback(res)


def is_indexable(v):
return hasattr(v, "__getitem__")
Expand Down

0 comments on commit 7c4dcb0

Please sign in to comment.