diff --git a/cereja/__init__.py b/cereja/__init__.py index 47888fe..832b366 100644 --- a/cereja/__init__.py +++ b/cereja/__init__.py @@ -47,7 +47,7 @@ from . import experimental from ._requests import request -VERSION = "1.9.8.final.0" +VERSION = "1.9.9.final.0" __version__ = get_version_pep440_compliant(VERSION) diff --git a/cereja/concurrently/process.py b/cereja/concurrently/process.py index d964112..c1e2cc0 100644 --- a/cereja/concurrently/process.py +++ b/cereja/concurrently/process.py @@ -1,10 +1,27 @@ +import abc +import os +import queue import threading -from .. import Progress +import time + +from ..utils import decorators + +from .. import Progress, console __all__ = ["MultiProcess"] -class MultiProcess: +class _IMultiProcess(abc.ABC): + @abc.abstractmethod + def _store_response(self, response): + return NotImplementedError + + @abc.abstractmethod + def _get_response(self): + return NotImplementedError + + +class MultiProcess(_IMultiProcess): """ MultiProcess is a utility class designed to execute a given function in parallel using multiple threads. @@ -16,7 +33,7 @@ class MultiProcess: _terminate (bool): A flag indicating if the thread execution should be terminated prematurely. """ - def __init__(self, max_threads: int): + def __init__(self, max_threads: int, on_result=None): """ Initializes a new instance of the MultiProcess class. @@ -29,8 +46,43 @@ def __init__(self, max_threads: int): self._thread_available = threading.Condition(self._lock) self._terminate = False self._results = [] + self._exception_err = None + self._on_result = on_result + + def _create_task(self, function, value, indx, *args, **kwargs): + self.wait_for_available_thread() + if self._terminate: + self._terminate = False + raise ChildProcessError(f"Error on task item {indx - 1}: {self._exception_err}") + thread = threading.Thread(target=self._execute_function_thread, name=f'Thread-{indx}', + args=(function, value, indx, args, kwargs)) + with self._lock: + self._active_threads += 1 + thread.start() + + def _get_response(self): + with self._thread_available: + while self._active_threads > 0: + self._thread_available.wait() + results = [val[-1] for val in sorted(self._results, key=lambda val: val[0])] + self._results = [] + self._terminate = False + return results + + def _store_response(self, response): + self._results.append(response) - def execute(self, function, values, *args, **kwargs) -> list: + def _process_response(self, response): + if self._on_result is not None: + self._on_result(response) + else: + self._store_response(response) + + @decorators.depreciation(alternative='map') + def execute(self, function, values, *args, verbose=True, **kwargs) -> list: + return self.map(function, values, *args, verbose=verbose, **kwargs) + + def map(self, function, values, *args, verbose=True, **kwargs) -> list: """ Execute the given function using multiple threads on the provided values. @@ -40,27 +92,17 @@ def execute(self, function, values, *args, **kwargs) -> list: @return: ordered each given function returns """ + self._terminate = False for indx, value in enumerate( - Progress.prog(values, custom_state_func=lambda: f'Threads Running: {self._active_threads}')): - self.wait_for_available_thread() - if self._terminate: + Progress.prog(values, + custom_state_func=lambda: f'Threads Running: {self._active_threads}') if verbose else values): + try: + self._create_task(function, value, indx, *args, **kwargs) + except ChildProcessError: print("Terminating due to an exception in one of the threads. Returning processed data...") break - thread = threading.Thread(target=self._execute_function_thread, name=f'Thread-{indx}', - args=(function, value, indx, args, kwargs)) - with self._lock: - self._active_threads += 1 - thread.start() - self._terminate = False - return self._get_results() - - def _get_results(self): - with self._thread_available: - while self._active_threads > 0: - self._thread_available.wait() - results = [val[-1] for val in sorted(self._results, key=lambda val: val[0])] - self._results = [] - return results + if self._on_result is None: + return self._get_response() def wait_for_available_thread(self): """ @@ -80,11 +122,100 @@ def _execute_function_thread(self, function, value, indx, args, kwargs): """ try: if not self._terminate: - self._results.append((indx, function(value, *args, **kwargs))) + self._process_response((indx, function(value, *args, **kwargs))) except Exception as e: print(f"Error encountered in thread: {e}") self._terminate = True + self._exception_err = e finally: with self._thread_available: self._active_threads -= 1 self._thread_available.notify() + + +class WorkerQueue(MultiProcess): + def __init__(self, func_task, max_threads: int = 1, max_size=-1, on_result=None, **task_kwargs): + super().__init__(max_threads, on_result=on_result) + self._q = queue.Queue(maxsize=max_size) + self._results = queue.PriorityQueue() + self._func_task = func_task + self._task_kwargs = task_kwargs + self._th_service = threading.Thread(target=self._worker, daemon=True).start() + self._indx = -1 + + def put(self, item, block=True, timeout=None, **task_kwargs): + """Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + """ + self._indx += 1 + self._q.put((self._indx, item, task_kwargs if len(task_kwargs) else {}), block=block, timeout=timeout) + + def _store_response(self, response): + self._results.put_nowait(item=response) + + def get_available_reponse(self, take_indx=False, timeout=30): + return self._get(take_indx, timeout=timeout) + + def _get(self, take_indx=False, timeout=30): + assert self._on_result is None, "task results are being sent to the callback defined 'on_result'" + while (self._results.qsize() > 0 or self._active_threads > 0) or not self.is_empty: + indx, item = self._results.get(timeout=timeout) + self._results.task_done() + return indx, item if take_indx else item + + def _get_response(self, take_indx=False): + assert self._on_result is None, "task results are being sent to the callback defined 'on_result'" + self._q.join() + self._indx = -1 + result = [] + + while self._results.qsize() > 0 or self._active_threads > 0: + result.append(self._results.get(timeout=1)) + self._results.task_done() + self._results.join() + return [val if take_indx else val[-1] for val in sorted(result, key=lambda val: val[0])] + + def get_all_tasks_response(self, take_indx=False): + return self._get_response(take_indx) + + def put_nowait(self, item, **task_kwargs): + self.put(item, block=False, **task_kwargs) + + def _worker(self): + while True: + indx, item, kwargs = self._q.get() + self._create_task(self._func_task, item, indx, **kwargs) + self._q.task_done() + + @property + def size(self): + return self._q.qsize() + + @property + def is_empty(self): + return self._q.empty() + + @property + def is_full(self): + return self._q.full() + + def __enter__(self, *args, **kwargs) -> 'WorkerQueue': + self._with_context = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if isinstance(exc_val, Exception) and not isinstance( + exc_val, DeprecationWarning + ): + console.error( + f"{os.path.basename(exc_tb.tb_frame.f_code.co_filename)}:{exc_tb.tb_lineno}: {exc_val}" + ) + self._q.join() + self._with_context = False diff --git a/cereja/system/_win32.py b/cereja/system/_win32.py index 0c8023d..846ba08 100644 --- a/cereja/system/_win32.py +++ b/cereja/system/_win32.py @@ -55,6 +55,7 @@ IsIconic = user32.IsIconic IsZoomed = user32.IsZoomed GetWindowDC = user32.GetWindowDC +GetDC = user32.GetDC GetForegroundWindow = user32.GetForegroundWindow ReleaseDC = user32.ReleaseDC PrintWindow = user32.PrintWindow @@ -185,7 +186,6 @@ def __init__(self, hwnd=None, is_async=False): self._max_time_simule_key_press = self.MAX_TIME_SIMULE_KEY_PRESS self._key_press_callbacks = None - @property def is_async(self): return self._is_async @@ -614,11 +614,12 @@ def state(self): return "Normal" def capture_image_bmp(self, filepath=None, only_window_content=True): + if not user32.IsZoomed(self.hwnd): + ctypes.windll.user32.ShowWindow(self.hwnd, 4) # Obtenha o DC da janela e crie um DC compatível - window_dc = GetWindowDC(self.hwnd) + window_dc = GetWindowDC(self.hwnd) if only_window_content else GetDC(self.hwnd) mem_dc = CreateCompatibleDC(window_dc) - if only_window_content: width, height = self.size_window_content else: @@ -644,7 +645,8 @@ def capture_image_bmp(self, filepath=None, only_window_content=True): bitmap_info.bmiHeader.biCompression = BI_RGB # Cria buffer para os dados da imagem - bitmap_data = ctypes.create_string_buffer(abs(bitmap_info.bmiHeader.biWidth * bitmap_info.bmiHeader.biHeight * 4)) + bitmap_data = ctypes.create_string_buffer( + abs(bitmap_info.bmiHeader.biWidth * bitmap_info.bmiHeader.biHeight * 4)) # Obtém os dados da imagem GetDIBits(mem_dc, screenshot, 0, height, bitmap_data, ctypes.byref(bitmap_info), DIB_RGB_COLORS) @@ -669,7 +671,6 @@ def capture_image_bmp(self, filepath=None, only_window_content=True): return bitmap_data.raw - # SHOW implements def hide(self): """ @@ -719,3 +720,11 @@ def restore(self): def show_default(self): """Define o estado de exibição com base no valor SW_ especificado no STARTUPINFO.""" ShowWindow(self.hwnd, 10) + + def set_foreground(self): + """Tenta trazer a janela para o primeiro plano.""" + ctypes.windll.user32.SetForegroundWindow(self.hwnd) + + def bring_to_top(self): + """Move a janela para o topo do Z-Order sem necessariamente ativá-la.""" + ctypes.windll.user32.BringWindowToTop(self.hwnd) diff --git a/cereja/utils/_utils.py b/cereja/utils/_utils.py index 49558d1..e942f21 100644 --- a/cereja/utils/_utils.py +++ b/cereja/utils/_utils.py @@ -20,7 +20,6 @@ import ctypes import gc import math -import threading import time from collections import OrderedDict, defaultdict from importlib import import_module @@ -35,7 +34,7 @@ import inspect from pprint import PrettyPrinter -from .decorators import time_exec, depreciation +from .decorators import depreciation # Needed init configs from ..config.cj_types import ClassType, FunctionType, Number from itertools import combinations as itertools_combinations @@ -85,7 +84,6 @@ "dict_filter_value", "get_zero_mask", "get_batch_strides", - "Thread", "prune_values", "split_sequence", "has_length", @@ -114,28 +112,6 @@ def _format(self, object, *args): super()._format(object, *args) -# TODO: Remove Thread class, check cereja.concurrently.process.MultiProcess implementation -class Thread(threading.Thread): - def __init__( - self, target, args=None, kwargs=None, name=None, daemon=None, callback=None - ): - # while threading.active_count() > os.cpu_count() * 2: - # time.sleep(0.1) - super().__init__(daemon=daemon, name=name) - if args is None: - args = () - if kwargs is None: - kwargs = {} - self._func = target - self._args = args - self._kwargs = kwargs - self._callback = callback - - def run(self): - res = self._func(*self._args, **self._kwargs) - if self._callback: - self._callback(res) - def is_indexable(v): return hasattr(v, "__getitem__")