Skip to content

Commit

Permalink
#269 fixes and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
wpbonelli committed Feb 7, 2022
1 parent 60aab47 commit 4f7b502
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 38 deletions.
16 changes: 7 additions & 9 deletions plantit/plantit/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,22 @@

import plantit.healthchecks
import plantit.mapbox
import plantit.terrain as terrain
import plantit.queries as q
import plantit.terrain as terrain
import plantit.utils.agents
from plantit import settings
from plantit.agents.models import Agent
from plantit.celery import app
from plantit.healthchecks import is_healthy
from plantit.agents.models import AgentScheduler, Agent
from plantit.queries import get_workflow, refresh_user_workflow_cache, refresh_online_users_workflow_cache, refresh_online_user_orgs_workflow_cache, \
from plantit.queries import refresh_user_workflow_cache, refresh_online_users_workflow_cache, refresh_online_user_orgs_workflow_cache, \
refresh_user_cyverse_tokens
from plantit.celery import app
from plantit.redis import RedisClient
from plantit.sns import SnsClient
from plantit.ssh import execute_command
from plantit.task_lifecycle import create_immediate_task, configure_task_environment, submit_task_to_scheduler, check_job_logs_for_progress, \
get_job_status, get_job_walltime, list_result_files, cancel_task
get_job_status_and_walltime, list_result_files, cancel_task
from plantit.task_resources import get_task_ssh_client, push_task_channel_event, log_task_orchestrator_status, get_task_remote_logs
from plantit.tasks.models import Task, TaskStatus
from plantit.utils.tasks import parse_task_time_limit

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -187,8 +186,7 @@ def poll_job_status(self, guid: str):
try:
# get the job status from the scheduler
check_job_logs_for_progress(task)
job_status = get_job_status(task)
job_walltime = get_job_walltime(task)
job_status, job_walltime = get_job_status_and_walltime(task)

# get_job_status_and_walltime() returns a None status if the job isn't found in the agent's scheduler.
# there are 2 reasons this might happen:
Expand Down Expand Up @@ -434,7 +432,7 @@ def check_task_cyverse_transfer(self, guid: str, attempts: int = 0):
now = timezone.now()
task.updated = now
task.completed = now
task.status = TaskStatus.SUCCESS
task.status = TaskStatus.SUCCESS if task.status != TaskStatus.FAILURE else task.status
task.transferred = True
task.results_transferred = len(expected)
task.transfer_path = path
Expand Down
70 changes: 41 additions & 29 deletions plantit/plantit/task_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
from django.utils import timezone
from django_celery_beat.models import IntervalSchedule, PeriodicTasks

from paramiko.ssh_exception import AuthenticationException, ChannelException, NoValidConnectionsError, SSHException
from tenacity import retry, wait_random_exponential, stop_after_attempt, retry_if_exception_type

from plantit import docker as docker
from plantit.agents.models import Agent, AgentScheduler
from plantit.miappe.models import Investigation, Study
Expand Down Expand Up @@ -342,9 +345,18 @@ def check_job_logs_for_progress(task: Task):
task.save()


def get_job_walltime(task: Task) -> (str, str):
@retry(
wait=wait_random_exponential(multiplier=5, max=120),
stop=stop_after_attempt(3),
retry=(retry_if_exception_type(AuthenticationException) | retry_if_exception_type(ChannelException) | retry_if_exception_type(NoValidConnectionsError) | retry_if_exception_type(SSHException)),
reraise=True)
def get_job_status_and_walltime(task: Task):
ssh = get_task_ssh_client(task)
status = None
walltime = None

with ssh:
# first get the job's walltime
lines = execute_command(
ssh=ssh,
precommand=":",
Expand All @@ -355,15 +367,12 @@ def get_job_walltime(task: Task) -> (str, str):
try:
job_line = next(l for l in lines if task.job_id in l)
job_split = job_line.split()
job_walltime = job_split[-3]
return job_walltime
walltime = job_split[-3]
except StopIteration:
return None

# if we don't receive any lines of output, the job wasn't found
pass

def get_job_status(task: Task):
ssh = get_task_ssh_client(task)
with ssh:
# next get the job's status
lines = execute_command(
ssh=ssh,
precommand=':',
Expand All @@ -373,9 +382,10 @@ def get_job_status(task: Task):

try:
line = next(l for l in lines if task.job_id in l)
return line.split()[5].replace('+', '')
status = line.split()[5].replace('+', '')
return status, walltime
except StopIteration:
# if we don't receive any lines of output from `sacct -j <job ID>`, the job wasn't found
# if we don't receive any lines of output, the job wasn't found
pass

# check the scheduler log file in case `sacct` is no longer displaying info
Expand All @@ -386,28 +396,31 @@ def get_job_status(task: Task):
stdin, stdout, stderr = ssh.client.exec_command(f"test -e {log_file_path} && echo exists")

# if log file doesn't exist, return None
if stdout.read().decode().strip() != 'exists': return None
if stdout.read().decode().strip() != 'exists': status = None

# otherwise check the log file to see if job status was written there
with sftp.open(log_file_path, 'r') as log_file:
logger.info(f"Checking scheduler log file {log_file_path} for job {task.job_id} status")
else:
with sftp.open(log_file_path, 'r') as log_file:
logger.info(f"Checking scheduler log file {log_file_path} for job {task.job_id} status")

for line in log_file.readlines():
# if we find success or failure, return immediately
if 'FAILED' in line or 'FAILURE' in line or 'NODE_FAIL' in line:
return 'FAILED'
if 'SUCCESS' in line or 'COMPLETED' in line:
return 'SUCCESS'
for line in log_file.readlines():
# if we find success or failure, stop
if 'FAILED' in line or 'FAILURE' in line or 'NODE_FAIL' in line:
status = 'FAILED'
break
if 'SUCCESS' in line or 'COMPLETED' in line:
status = 'SUCCESS'
break

# otherwise use the most recent status (last line of the log file)
if 'CANCELLED' in line or 'CANCELED' in line:
status = 'CANCELED'
continue
if 'TIMEOUT' in line:
status = 'TIMEOUT'
continue
# otherwise use the most recent status (last line of the log file)
if 'CANCELLED' in line or 'CANCELED' in line:
status = 'CANCELED'
continue
if 'TIMEOUT' in line:
status = 'TIMEOUT'
continue

return status
return status, walltime


def list_result_files(task: Task) -> List[dict]:
Expand Down Expand Up @@ -480,8 +493,7 @@ def parse_task_cli_options(task: Task) -> (List[str], TaskOptions):
if 'include' not in config['output']: config['output']['include'] = dict()
if 'patterns' not in config['output']['include']: config['output']['include']['patterns'] = []

# include task configuration file and scheduler logs
config['output']['include']['names'].append(f"{task.guid}.yaml")
# include scheduler logs
config['output']['include']['patterns'].append("out")
config['output']['include']['patterns'].append("err")
config['output']['include']['patterns'].append("log")
Expand Down

0 comments on commit 4f7b502

Please sign in to comment.