diff --git a/cereja/__init__.py b/cereja/__init__.py index a5891fc..0a64551 100644 --- a/cereja/__init__.py +++ b/cereja/__init__.py @@ -49,7 +49,7 @@ from . import scraping from . import wcag -VERSION = "2.0.4.final.0" +VERSION = "2.0.5.final.0" __version__ = get_version_pep440_compliant(VERSION) diff --git a/cereja/utils/time.py b/cereja/utils/time.py index 342b8d5..0f85552 100644 --- a/cereja/utils/time.py +++ b/cereja/utils/time.py @@ -17,12 +17,13 @@ SOFTWARE. """ import math +import random import threading import time import time as _time from typing import Callable, Union -__all__ = ["Timer", "set_interval", "time_format"] +__all__ = ["Timer", "set_interval", "time_format", "RandomTimer", "TimeEstimate", "IntervalScheduler"] class Timer: @@ -127,6 +128,12 @@ def time_overflow(self): return max(self.elapsed - self.interval, 0) if self._interval > 0 else 0 + def wait(self): + """ + Wait until the timer interval has passed. + """ + _time.sleep(self.remaining) + def __str__(self): return time_format(self.elapsed) @@ -134,6 +141,55 @@ def __repr__(self): eta = "" if self._interval <= 0 else f", ETA={time_format(self.remaining)}" return f"Timer(elapsed={self.__str__()}{eta})" + def iter_over(self, iterable): + """ + Iterate over an iterable and wait until the timer interval has passed. + @param iterable: An iterable object. + @return: An iterator object. + """ + for item in iterable: + self.start() + yield item + self.wait() + + +class RandomTimer(Timer): + def __init__(self, min_interval: float, max_interval: float, start=True, auto_reset=False): + """ + Initialize the RandomTimer. + + Args: + min_interval (float): The minimum time interval in seconds. + max_interval (float): The maximum time interval in seconds. + start (bool): Whether to start the timer immediately. Default is True. + auto_reset (bool): Whether the timer should automatically reset after the interval has passed. + Default is False. + """ + assert isinstance(min_interval, (int, float)), TypeError(f"{min_interval} isn't valid. Send a number!") + assert isinstance(max_interval, (int, float)), TypeError(f"{max_interval} isn't valid. Send a number!") + assert min_interval >= 0, ValueError("min_interval must be greater than or equal to 0") + assert max_interval >= 0, ValueError("max_interval must be greater than or equal to 0") + assert min_interval < max_interval, ValueError("min_interval must be less than max_interval") + self._min_interval = min_interval + self._max_interval = max_interval + self._interval = self._random_interval() + super(RandomTimer, self).__init__(self._interval, start, auto_reset) + + def _random_interval(self): + return random.uniform(self._min_interval, self._max_interval) + + def reset(self): + self._interval = self._random_interval() + self.start() + + def start(self): + self._interval = self._random_interval() + super(RandomTimer, self).start() + + def __repr__(self): + eta = "" if self._interval <= 0 else f", ETA={time_format(self.remaining)}" + return f"RandomTimer(elapsed={self.__str__()}{eta})" + class TimeEstimate: def __init__(self, size: int = None): @@ -142,6 +198,14 @@ def __init__(self, size: int = None): self._size = size or 0 self._total_times = 0 + @property + def total_times(self): + return self._total_times + + @property + def size(self): + return self._size + def set_time_it(self) -> float: self._total_times += 1 return self._timer.elapsed @@ -169,6 +233,12 @@ def eta_formated(self): def per_sec(self): return math.ceil(self._total_times / max(self._timer.elapsed, 1)) + def __str__(self): + return f"TimeEstimate(duration={self.duration_formated}, eta={self.eta_formated}, per_sec={self.per_sec})" + + def __repr__(self): + return self.__str__() + class IntervalScheduler: """ @@ -269,3 +339,17 @@ def time_format(seconds: float, format_="%H:%M:%S") -> Union[str, float]: return f"-{time_}" return time_ return seconds # Return NaN or any invalid input as it is + + +def set_timeout(func: Callable, sec: float, use_thread=False, *args, **kwargs): + """ + Call a function after sec seconds + @param func: function + @param sec: seconds + @param use_thread: If True, the function will be called in a new thread + """ + if use_thread: + threading.Timer(sec, func, args=args, kwargs=kwargs).start() + else: + time.sleep(sec) + func(*args, **kwargs) \ No newline at end of file