diff --git a/karton/core/karton.py b/karton/core/karton.py index 5d819b9..6eceb22 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -106,11 +106,15 @@ class Consumer(KartonServiceBase): :param config: Karton config to use for service configuration :param identity: Karton service identity :param backend: Karton backend to use + :param task_timeout: The maximum time, in seconds, this consumer will wait for + a task to finish processing before being CRASHED on timeout. + Set 0 for unlimited, and None for using global value """ filters: List[Dict[str, Any]] = [] persistent: bool = True version: Optional[str] = None + task_timeout = None def __init__( self, @@ -130,7 +134,8 @@ def __init__( self.config.getboolean("karton", "persistent", self.persistent) and not self.debug ) - self.task_timeout = self.config.getint("karton", "task_timeout") + if self.task_timeout is None: + self.task_timeout = self.config.getint("karton", "task_timeout") self.current_task: Optional[Task] = None self._pre_hooks: List[Tuple[Optional[str], Callable[[Task], None]]] = [] self._post_hooks: List[ @@ -323,7 +328,7 @@ def loop(self) -> None: """ self.log.info("Service %s started", self.identity) - if self.task_timeout is not None: + if self.task_timeout: self.log.info(f"Task timeout is set to {self.task_timeout} seconds") # Get the old binds and set the new ones atomically