diff --git a/plantit/plantit/celery_tasks.py b/plantit/plantit/celery_tasks.py index 72c1021d..a3066d66 100644 --- a/plantit/plantit/celery_tasks.py +++ b/plantit/plantit/celery_tasks.py @@ -13,23 +13,22 @@ import plantit.healthchecks import plantit.mapbox -import plantit.terrain as terrain import plantit.queries as q +import plantit.terrain as terrain import plantit.utils.agents from plantit import settings +from plantit.agents.models import Agent +from plantit.celery import app from plantit.healthchecks import is_healthy -from plantit.agents.models import AgentScheduler, Agent -from plantit.queries import get_workflow, refresh_user_workflow_cache, refresh_online_users_workflow_cache, refresh_online_user_orgs_workflow_cache, \ +from plantit.queries import refresh_user_workflow_cache, refresh_online_users_workflow_cache, refresh_online_user_orgs_workflow_cache, \ refresh_user_cyverse_tokens -from plantit.celery import app from plantit.redis import RedisClient from plantit.sns import SnsClient from plantit.ssh import execute_command from plantit.task_lifecycle import create_immediate_task, configure_task_environment, submit_task_to_scheduler, check_job_logs_for_progress, \ - get_job_status, get_job_walltime, list_result_files, cancel_task + get_job_status_and_walltime, list_result_files, cancel_task from plantit.task_resources import get_task_ssh_client, push_task_channel_event, log_task_orchestrator_status, get_task_remote_logs from plantit.tasks.models import Task, TaskStatus -from plantit.utils.tasks import parse_task_time_limit logger = get_task_logger(__name__) @@ -187,8 +186,7 @@ def poll_job_status(self, guid: str): try: # get the job status from the scheduler check_job_logs_for_progress(task) - job_status = get_job_status(task) - job_walltime = get_job_walltime(task) + job_status, job_walltime = get_job_status_and_walltime(task) # get_job_status() returns None if the job isn't found in the agent's scheduler. 
# there are 2 reasons this might happen: @@ -434,7 +432,7 @@ def check_task_cyverse_transfer(self, guid: str, attempts: int = 0): now = timezone.now() task.updated = now task.completed = now - task.status = TaskStatus.SUCCESS + task.status = TaskStatus.SUCCESS if task.status != TaskStatus.FAILURE else task.status task.transferred = True task.results_transferred = len(expected) task.transfer_path = path diff --git a/plantit/plantit/task_lifecycle.py b/plantit/plantit/task_lifecycle.py index 593aecee..cb0e77ad 100644 --- a/plantit/plantit/task_lifecycle.py +++ b/plantit/plantit/task_lifecycle.py @@ -20,6 +20,9 @@ from django.utils import timezone from django_celery_beat.models import IntervalSchedule, PeriodicTasks +from paramiko.ssh_exception import AuthenticationException, ChannelException, NoValidConnectionsError, SSHException +from tenacity import retry, wait_random_exponential, stop_after_attempt, retry_if_exception_type + from plantit import docker as docker from plantit.agents.models import Agent, AgentScheduler from plantit.miappe.models import Investigation, Study @@ -342,9 +345,18 @@ def check_job_logs_for_progress(task: Task): task.save() -def get_job_walltime(task: Task) -> (str, str): +@retry( + wait=wait_random_exponential(multiplier=5, max=120), + stop=stop_after_attempt(3), + retry=(retry_if_exception_type(AuthenticationException) | retry_if_exception_type(ChannelException) | retry_if_exception_type(NoValidConnectionsError) | retry_if_exception_type(SSHException)), + reraise=True) +def get_job_status_and_walltime(task: Task): ssh = get_task_ssh_client(task) + status = None + walltime = None + with ssh: + # first get the job's walltime lines = execute_command( ssh=ssh, precommand=":", @@ -355,15 +367,12 @@ try: job_line = next(l for l in lines if task.job_id in l) job_split = job_line.split() - job_walltime = job_split[-3] - return job_walltime + walltime 
= job_split[-3] except StopIteration: - return None - + # if we don't receive any lines of output, the job wasn't found + pass -def get_job_status(task: Task): - ssh = get_task_ssh_client(task) - with ssh: + # next get the job's status lines = execute_command( ssh=ssh, precommand=':', @@ -373,9 +382,10 @@ def get_job_status(task: Task): try: line = next(l for l in lines if task.job_id in l) - return line.split()[5].replace('+', '') + status = line.split()[5].replace('+', '') + return status, walltime except StopIteration: - # if we don't receive any lines of output from `sacct -j `, the job wasn't found + # if we don't receive any lines of output, the job wasn't found pass # check the scheduler log file in case `sacct` is no longer displaying info @@ -386,28 +396,31 @@ def get_job_status(task: Task): stdin, stdout, stderr = ssh.client.exec_command(f"test -e {log_file_path} && echo exists") # if log file doesn't exist, return None - if stdout.read().decode().strip() != 'exists': return None + if stdout.read().decode().strip() != 'exists': status = None # otherwise check the log file to see if job status was written there - with sftp.open(log_file_path, 'r') as log_file: - logger.info(f"Checking scheduler log file {log_file_path} for job {task.job_id} status") + else: + with sftp.open(log_file_path, 'r') as log_file: + logger.info(f"Checking scheduler log file {log_file_path} for job {task.job_id} status") - for line in log_file.readlines(): - # if we find success or failure, return immediately - if 'FAILED' in line or 'FAILURE' in line or 'NODE_FAIL' in line: - return 'FAILED' - if 'SUCCESS' in line or 'COMPLETED' in line: - return 'SUCCESS' + for line in log_file.readlines(): + # if we find success or failure, stop + if 'FAILED' in line or 'FAILURE' in line or 'NODE_FAIL' in line: + status = 'FAILED' + break + if 'SUCCESS' in line or 'COMPLETED' in line: + status = 'SUCCESS' + break - # otherwise use the most recent status (last line of the log file) - if 
'CANCELLED' in line or 'CANCELED' in line: - status = 'CANCELED' - continue - if 'TIMEOUT' in line: - status = 'TIMEOUT' - continue + # otherwise use the most recent status (last line of the log file) + if 'CANCELLED' in line or 'CANCELED' in line: + status = 'CANCELED' + continue + if 'TIMEOUT' in line: + status = 'TIMEOUT' + continue - return status + return status, walltime def list_result_files(task: Task) -> List[dict]: @@ -480,8 +493,7 @@ def parse_task_cli_options(task: Task) -> (List[str], TaskOptions): if 'include' not in config['output']: config['output']['include'] = dict() if 'patterns' not in config['output']['include']: config['output']['exclude']['patterns'] = [] - # include task configuration file and scheduler logs - config['output']['include']['names'].append(f"{task.guid}.yaml") + # include scheduler logs config['output']['include']['patterns'].append("out") config['output']['include']['patterns'].append("err") config['output']['include']['patterns'].append("log")