From 504f6a75841b540861324b94ea8aa9a9465275d0 Mon Sep 17 00:00:00 2001 From: Rakovskij Stanislav Date: Wed, 22 Jan 2025 16:27:21 +0300 Subject: [PATCH] Allow to override task timeout for consumer class (#265) --- karton/core/karton.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/karton/core/karton.py b/karton/core/karton.py index 5d819b9..6eceb22 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -106,11 +106,15 @@ class Consumer(KartonServiceBase): :param config: Karton config to use for service configuration :param identity: Karton service identity :param backend: Karton backend to use + :param task_timeout: The maximum time, in seconds, this consumer will wait for + a task to finish processing before being CRASHED on timeout. + Set 0 for unlimited, and None for using global value """ filters: List[Dict[str, Any]] = [] persistent: bool = True version: Optional[str] = None + task_timeout = None def __init__( self, @@ -130,7 +134,8 @@ def __init__( self.config.getboolean("karton", "persistent", self.persistent) and not self.debug ) - self.task_timeout = self.config.getint("karton", "task_timeout") + if self.task_timeout is None: + self.task_timeout = self.config.getint("karton", "task_timeout") self.current_task: Optional[Task] = None self._pre_hooks: List[Tuple[Optional[str], Callable[[Task], None]]] = [] self._post_hooks: List[ @@ -323,7 +328,7 @@ def loop(self) -> None: """ self.log.info("Service %s started", self.identity) - if self.task_timeout is not None: + if self.task_timeout: self.log.info(f"Task timeout is set to {self.task_timeout} seconds") # Get the old binds and set the new ones atomically