Skip to content

Commit

Permalink
improves: fix multiprocess and add func args and kwargs
Browse files Browse the repository at this point in the history
  • Loading branch information
jlsneto committed Nov 9, 2023
1 parent 414dc85 commit eda764e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
13 changes: 9 additions & 4 deletions cereja/concurrently/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, max_threads: int):
self._terminate = False
self._results = []

def execute(self, function, values) -> list:
def execute(self, function, values, args, kwargs) -> list:
"""
Execute the given function using multiple threads on the provided values.
Expand All @@ -47,10 +47,11 @@ def execute(self, function, values) -> list:
print("Terminating due to an exception in one of the threads. Returning processed data...")
break
thread = threading.Thread(target=self._execute_function_thread, name=f'Thread-{indx}',
args=(function, value, indx))
args=(function, value, indx, args[indx] if args else args, kwargs[indx] if kwargs else kwargs))
with self._lock:
self._active_threads += 1
thread.start()
self._terminate = False
return self._get_results()

def _get_results(self):
Expand All @@ -69,7 +70,7 @@ def wait_for_available_thread(self):
while self._active_threads >= self.max_threads:
self._thread_available.wait()

def _execute_function_thread(self, function, value, indx):
def _execute_function_thread(self, function, value, indx, args, kwargs):
"""
Internal method to execute the function on the given value. Handles exceptions and manages the active thread count.
Expand All @@ -79,7 +80,11 @@ def _execute_function_thread(self, function, value, indx):
"""
try:
if not self._terminate:
self._results.append((indx, function(value)))
if args or kwargs:
self._results.append((indx, function(value, *args, **kwargs)))
else:
self._results.append((indx, function(value)))

except Exception as e:
print(f"Error encountered in thread: {e}")
self._terminate = True
Expand Down
4 changes: 2 additions & 2 deletions cereja/experimental/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from cereja.utils import invert_dict, obj_repr

__all__ = ["CJOrderedDict", "CJDict", "CJMeta", "Multiprocess"]
__all__ = ["CJOrderedDict", "CJDict", "CJMeta", "ParallelProcess"]


class CJMeta(type):
Expand Down Expand Up @@ -71,7 +71,7 @@ def __setitem__(self, key, value):
import cereja as cj


class Multiprocess:
class ParallelProcess:
_template = """
from multiprocessing import Pool
import cereja as cj
Expand Down
12 changes: 8 additions & 4 deletions cereja/utils/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import inspect
from pprint import pprint

from .decorators import time_exec
# Needed init configs
from ..config.cj_types import ClassType, FunctionType, Number

Expand Down Expand Up @@ -710,20 +711,23 @@ def value_from_memory(memory_id):
return ctypes.cast(memory_id, ctypes.py_object).value


def combinations(iterable, size):
def combinations(iterable, size, is_sorted=False):
pool = tuple(set(map(id, iterable)))
n = len(pool)
combinations_result = []
for indices in itertools.permutations(range(n), size):
if sorted(indices) == list(indices):
combinations_result.append(tuple(value_from_memory(pool[i]) for i in indices))
if is_sorted:
combinations_result.append(tuple(sorted(value_from_memory(pool[i]) for i in indices)))
else:
combinations_result.append(tuple(value_from_memory(pool[i]) for i in indices))
return combinations_result


def combinations_sizes(iterable, min_size, max_size):
def combinations_sizes(iterable, min_size, max_size, is_sorted=False):
res = []
for n in range(min_size, max_size + 1):
for i in combinations(iterable, size=n):
for i in combinations(iterable, size=n, is_sorted=is_sorted):
res.append(i)
return res

Expand Down

0 comments on commit eda764e

Please sign in to comment.