Skip to content

Commit

Permalink
V1.9.9 (#208)
Browse files Browse the repository at this point in the history
* improves: win_32 improve

* feature: add worker queue
  • Loading branch information
jlsneto authored Apr 24, 2024
1 parent b7982cf commit 9705d90
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 54 deletions.
2 changes: 1 addition & 1 deletion cereja/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from . import experimental
from ._requests import request

VERSION = "1.9.8.final.0"
VERSION = "1.9.9.final.0"
__version__ = get_version_pep440_compliant(VERSION)


Expand Down
177 changes: 154 additions & 23 deletions cereja/concurrently/process.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
import abc
import os
import queue
import threading
from .. import Progress
import time

from ..utils import decorators

from .. import Progress, console

__all__ = ["MultiProcess"]


class MultiProcess:
class _IMultiProcess(abc.ABC):
@abc.abstractmethod
def _store_response(self, response):
return NotImplementedError

@abc.abstractmethod
def _get_response(self):
return NotImplementedError


class MultiProcess(_IMultiProcess):
"""
MultiProcess is a utility class designed to execute a given function in parallel using multiple threads.
Expand All @@ -16,7 +33,7 @@ class MultiProcess:
_terminate (bool): A flag indicating if the thread execution should be terminated prematurely.
"""

def __init__(self, max_threads: int):
def __init__(self, max_threads: int, on_result=None):
"""
Initializes a new instance of the MultiProcess class.
Expand All @@ -29,8 +46,43 @@ def __init__(self, max_threads: int):
self._thread_available = threading.Condition(self._lock)
self._terminate = False
self._results = []
self._exception_err = None
self._on_result = on_result

def _create_task(self, function, value, indx, *args, **kwargs):
self.wait_for_available_thread()
if self._terminate:
self._terminate = False
raise ChildProcessError(f"Error on task item {indx - 1}: {self._exception_err}")
thread = threading.Thread(target=self._execute_function_thread, name=f'Thread-{indx}',
args=(function, value, indx, args, kwargs))
with self._lock:
self._active_threads += 1
thread.start()

def _get_response(self):
with self._thread_available:
while self._active_threads > 0:
self._thread_available.wait()
results = [val[-1] for val in sorted(self._results, key=lambda val: val[0])]
self._results = []
self._terminate = False
return results

def _store_response(self, response):
self._results.append(response)

def execute(self, function, values, *args, **kwargs) -> list:
def _process_response(self, response):
if self._on_result is not None:
self._on_result(response)
else:
self._store_response(response)

@decorators.depreciation(alternative='map')
def execute(self, function, values, *args, verbose=True, **kwargs) -> list:
return self.map(function, values, *args, verbose=verbose, **kwargs)

def map(self, function, values, *args, verbose=True, **kwargs) -> list:
"""
Execute the given function using multiple threads on the provided values.
Expand All @@ -40,27 +92,17 @@ def execute(self, function, values, *args, **kwargs) -> list:
@return: ordered each given function returns
"""
self._terminate = False
for indx, value in enumerate(
Progress.prog(values, custom_state_func=lambda: f'Threads Running: {self._active_threads}')):
self.wait_for_available_thread()
if self._terminate:
Progress.prog(values,
custom_state_func=lambda: f'Threads Running: {self._active_threads}') if verbose else values):
try:
self._create_task(function, value, indx, *args, **kwargs)
except ChildProcessError:
print("Terminating due to an exception in one of the threads. Returning processed data...")
break
thread = threading.Thread(target=self._execute_function_thread, name=f'Thread-{indx}',
args=(function, value, indx, args, kwargs))
with self._lock:
self._active_threads += 1
thread.start()
self._terminate = False
return self._get_results()

def _get_results(self):
with self._thread_available:
while self._active_threads > 0:
self._thread_available.wait()
results = [val[-1] for val in sorted(self._results, key=lambda val: val[0])]
self._results = []
return results
if self._on_result is None:
return self._get_response()

def wait_for_available_thread(self):
"""
Expand All @@ -80,11 +122,100 @@ def _execute_function_thread(self, function, value, indx, args, kwargs):
"""
try:
if not self._terminate:
self._results.append((indx, function(value, *args, **kwargs)))
self._process_response((indx, function(value, *args, **kwargs)))
except Exception as e:
print(f"Error encountered in thread: {e}")
self._terminate = True
self._exception_err = e
finally:
with self._thread_available:
self._active_threads -= 1
self._thread_available.notify()


class WorkerQueue(MultiProcess):
def __init__(self, func_task, max_threads: int = 1, max_size=-1, on_result=None, **task_kwargs):
super().__init__(max_threads, on_result=on_result)
self._q = queue.Queue(maxsize=max_size)
self._results = queue.PriorityQueue()
self._func_task = func_task
self._task_kwargs = task_kwargs
self._th_service = threading.Thread(target=self._worker, daemon=True).start()
self._indx = -1

def put(self, item, block=True, timeout=None, **task_kwargs):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
self._indx += 1
self._q.put((self._indx, item, task_kwargs if len(task_kwargs) else {}), block=block, timeout=timeout)

def _store_response(self, response):
self._results.put_nowait(item=response)

def get_available_reponse(self, take_indx=False, timeout=30):
return self._get(take_indx, timeout=timeout)

def _get(self, take_indx=False, timeout=30):
assert self._on_result is None, "task results are being sent to the callback defined 'on_result'"
while (self._results.qsize() > 0 or self._active_threads > 0) or not self.is_empty:
indx, item = self._results.get(timeout=timeout)
self._results.task_done()
return indx, item if take_indx else item

def _get_response(self, take_indx=False):
assert self._on_result is None, "task results are being sent to the callback defined 'on_result'"
self._q.join()
self._indx = -1
result = []

while self._results.qsize() > 0 or self._active_threads > 0:
result.append(self._results.get(timeout=1))
self._results.task_done()
self._results.join()
return [val if take_indx else val[-1] for val in sorted(result, key=lambda val: val[0])]

def get_all_tasks_response(self, take_indx=False):
return self._get_response(take_indx)

def put_nowait(self, item, **task_kwargs):
self.put(item, block=False, **task_kwargs)

def _worker(self):
while True:
indx, item, kwargs = self._q.get()
self._create_task(self._func_task, item, indx, **kwargs)
self._q.task_done()

@property
def size(self):
return self._q.qsize()

@property
def is_empty(self):
return self._q.empty()

@property
def is_full(self):
return self._q.full()

def __enter__(self, *args, **kwargs) -> 'WorkerQueue':
self._with_context = True
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_val, Exception) and not isinstance(
exc_val, DeprecationWarning
):
console.error(
f"{os.path.basename(exc_tb.tb_frame.f_code.co_filename)}:{exc_tb.tb_lineno}: {exc_val}"
)
self._q.join()
self._with_context = False
19 changes: 14 additions & 5 deletions cereja/system/_win32.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
IsIconic = user32.IsIconic
IsZoomed = user32.IsZoomed
GetWindowDC = user32.GetWindowDC
GetDC = user32.GetDC
GetForegroundWindow = user32.GetForegroundWindow
ReleaseDC = user32.ReleaseDC
PrintWindow = user32.PrintWindow
Expand Down Expand Up @@ -185,7 +186,6 @@ def __init__(self, hwnd=None, is_async=False):
self._max_time_simule_key_press = self.MAX_TIME_SIMULE_KEY_PRESS
self._key_press_callbacks = None


@property
def is_async(self):
return self._is_async
Expand Down Expand Up @@ -614,11 +614,12 @@ def state(self):
return "Normal"

def capture_image_bmp(self, filepath=None, only_window_content=True):
if not user32.IsZoomed(self.hwnd):
ctypes.windll.user32.ShowWindow(self.hwnd, 4)
# Obtenha o DC da janela e crie um DC compatível
window_dc = GetWindowDC(self.hwnd)
window_dc = GetWindowDC(self.hwnd) if only_window_content else GetDC(self.hwnd)
mem_dc = CreateCompatibleDC(window_dc)


if only_window_content:
width, height = self.size_window_content
else:
Expand All @@ -644,7 +645,8 @@ def capture_image_bmp(self, filepath=None, only_window_content=True):
bitmap_info.bmiHeader.biCompression = BI_RGB

# Cria buffer para os dados da imagem
bitmap_data = ctypes.create_string_buffer(abs(bitmap_info.bmiHeader.biWidth * bitmap_info.bmiHeader.biHeight * 4))
bitmap_data = ctypes.create_string_buffer(
abs(bitmap_info.bmiHeader.biWidth * bitmap_info.bmiHeader.biHeight * 4))
# Obtém os dados da imagem
GetDIBits(mem_dc, screenshot, 0, height, bitmap_data, ctypes.byref(bitmap_info), DIB_RGB_COLORS)

Expand All @@ -669,7 +671,6 @@ def capture_image_bmp(self, filepath=None, only_window_content=True):

return bitmap_data.raw


# SHOW implements
def hide(self):
"""
Expand Down Expand Up @@ -719,3 +720,11 @@ def restore(self):
def show_default(self):
"""Define o estado de exibição com base no valor SW_ especificado no STARTUPINFO."""
ShowWindow(self.hwnd, 10)

def set_foreground(self):
"""Tenta trazer a janela para o primeiro plano."""
ctypes.windll.user32.SetForegroundWindow(self.hwnd)

def bring_to_top(self):
"""Move a janela para o topo do Z-Order sem necessariamente ativá-la."""
ctypes.windll.user32.BringWindowToTop(self.hwnd)
26 changes: 1 addition & 25 deletions cereja/utils/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import ctypes
import gc
import math
import threading
import time
from collections import OrderedDict, defaultdict
from importlib import import_module
Expand All @@ -35,7 +34,7 @@
import inspect
from pprint import PrettyPrinter

from .decorators import time_exec, depreciation
from .decorators import depreciation
# Needed init configs
from ..config.cj_types import ClassType, FunctionType, Number
from itertools import combinations as itertools_combinations
Expand Down Expand Up @@ -85,7 +84,6 @@
"dict_filter_value",
"get_zero_mask",
"get_batch_strides",
"Thread",
"prune_values",
"split_sequence",
"has_length",
Expand Down Expand Up @@ -114,28 +112,6 @@ def _format(self, object, *args):
super()._format(object, *args)


# TODO: Remove Thread class, check cereja.concurrently.process.MultiProcess implementation
class Thread(threading.Thread):
def __init__(
self, target, args=None, kwargs=None, name=None, daemon=None, callback=None
):
# while threading.active_count() > os.cpu_count() * 2:
# time.sleep(0.1)
super().__init__(daemon=daemon, name=name)
if args is None:
args = ()
if kwargs is None:
kwargs = {}
self._func = target
self._args = args
self._kwargs = kwargs
self._callback = callback

def run(self):
res = self._func(*self._args, **self._kwargs)
if self._callback:
self._callback(res)


def is_indexable(v):
return hasattr(v, "__getitem__")
Expand Down

0 comments on commit 9705d90

Please sign in to comment.