Skip to content

Commit

Permalink
Optional feature: crash started task on timeout instead of deleting (#…
Browse files Browse the repository at this point in the history
…264)

Co-authored-by: Stanislav Rakovsky <iam@disasm.me>
Co-authored-by: psrok1 <pawel.srokosz@cert.pl>
  • Loading branch information
3 people authored Jan 22, 2025
1 parent 8140d48 commit 4cbb5ce
Showing 1 changed file with 67 additions and 9 deletions.
76 changes: 67 additions & 9 deletions karton/system/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class SystemService(KartonServiceBase):
version = __version__
with_service_info = True

CRASH_STARTED_TASKS_ON_TIMEOUT = False
GC_INTERVAL = 3 * 60
TASK_DISPATCHED_TIMEOUT = 24 * 3600
TASK_STARTED_TIMEOUT = 24 * 3600
Expand All @@ -45,9 +46,31 @@ def __init__(self, config: Optional[Config]) -> None:
)
self.enable_gc = self.config.getboolean("system", "enable_gc", True)
self.enable_router = self.config.getboolean("system", "enable_router", True)
self.crash_started_tasks_on_timeout = self.config.getboolean(
"system", "crash_started_tasks_on_timeout", False
)

self.last_gc_trigger = time.time()

def _log_config(self):
self.log.info(
"Effective config:\n"
" gc_interval:\t%s\n"
" task_dispatched_timeout:\t%s\n"
" task_started_timeout:\t%s\n"
" task_crashed_timeout:\t%s\n"
" enable_gc:\t%s\n"
" enable_router:\t%s\n"
" crash_started_tasks_on_timeout:\t%s",
self.gc_interval,
self.task_dispatched_timeout,
self.task_started_timeout,
self.task_crashed_timeout,
self.enable_gc,
self.enable_router,
self.crash_started_tasks_on_timeout,
)

def gc_collect_resources(self) -> None:
# Collects unreferenced resources left in object storage
karton_bucket = self.backend.default_bucket_name
Expand All @@ -68,13 +91,15 @@ def gc_collect_resources(self) -> None:
self.backend.remove_objects(karton_bucket, resources_to_remove)

def gc_collect_tasks(self) -> None:
self.log.debug("GC: gc_collect_tasks started")
# Collects finished tasks
root_tasks = set()
running_root_tasks = set()
unrouted_task_uids = self.backend.get_task_ids_from_queue(KARTON_TASKS_QUEUE)

current_time = time.time()
to_delete = []
to_crash = []

queues_to_clear = set()
online_consumers = self.backend.get_online_consumers()
Expand Down Expand Up @@ -116,14 +141,24 @@ def gc_collect_tasks(self) -> None:
and task.last_update is not None
and current_time > task.last_update + self.task_started_timeout
):
to_delete.append(task)
self.log.error(
"Task %s is in Started state more than %d seconds. "
"Killed. (receiver: %s)",
task.uid,
self.task_started_timeout,
task.headers.get("receiver", "<unknown>"),
)
if self.crash_started_tasks_on_timeout:
to_crash.append(task)
self.log.error(
"Task %s is in Started state more than %d seconds. "
"Crashed. (receiver: %s)",
task.uid,
self.task_started_timeout,
task.headers.get("receiver", "<unknown>"),
)
else:
to_delete.append(task)
self.log.error(
"Task %s is in Started state more than %d seconds. "
"Killed. (receiver: %s)",
task.uid,
self.task_started_timeout,
task.headers.get("receiver", "<unknown>"),
)
elif task.status == TaskState.FINISHED:
to_delete.append(task)
self.log.debug("GC: Finished task %s", task.uid)
Expand Down Expand Up @@ -151,11 +186,26 @@ def gc_collect_tasks(self) -> None:
self.backend.increment_metrics_list(
KartonMetrics.TASK_GARBAGE_COLLECTED, to_increment
)
if to_crash:
to_increment = [
task.headers.get("receiver", "unknown") for task in to_crash
]
for task in to_crash:
task.error = [
"This task was STARTED too long (TASK_STARTED_TIMEOUT), "
"so status was changes to CRASHED."
]
self.backend.set_task_status(task, TaskState.CRASHED)
self.backend.increment_metrics_list(
KartonMetrics.TASK_CRASHED, to_increment
)

for finished_root_task in root_tasks.difference(running_root_tasks):
# TODO: Notification needed
self.log.debug("GC: Finished root task %s", finished_root_task)

self.log.debug("GC: gc_collect_tasks ended")

def gc_collect(self) -> None:
if time.time() > (self.last_gc_trigger + self.gc_interval):
try:
Expand Down Expand Up @@ -251,6 +301,7 @@ def process_routing(self) -> None:
self.handle_operations(bodies)

def loop(self) -> None:
self._log_config()
self.log.info("Manager %s started", self.identity)

with self.graceful_killer():
Expand Down Expand Up @@ -288,7 +339,6 @@ def args_parser(cls) -> argparse.ArgumentParser:
parser.add_argument(
"--gc-interval",
type=int,
default=cls.GC_INTERVAL,
help="Garbage collection interval",
)
parser.add_argument(
Expand All @@ -304,16 +354,24 @@ def args_parser(cls) -> argparse.ArgumentParser:
parser.add_argument(
"--task-crashed-timeout", help="Timeout for tasks in Crashed state"
)
parser.add_argument(
"--crash-started-task-on-timeout",
action="store_const",
dest="crash_started_tasks",
help="Crash Started tasks on timeout instead of deleting",
)
return parser

@classmethod
def config_from_args(cls, config: Config, args: argparse.Namespace):
super().config_from_args(config, args)

config.load_from_dict(
{
"system": {
"enable_gc": args.enable_gc,
"enable_router": args.enable_router,
"crash_started_tasks_on_timeout": args.crash_started_tasks,
"gc_interval": args.gc_interval,
"task_dispatched_timeout": args.task_dispatched_timeout,
"task_started_timeout": args.task_started_timeout,
Expand Down

0 comments on commit 4cbb5ce

Please sign in to comment.