diff --git a/README.md b/README.md
index 9812ab87..51e26d35 100644
--- a/README.md
+++ b/README.md
@@ -184,6 +184,7 @@ WORKFLOWS_CACHE=/code/workflows.json
 WORKFLOWS_REFRESH_MINUTES=60
 TASKS_LOGS=/code/logs
 TASKS_TIMEOUT_MULTIPLIER=2
+TASKS_STEP_TIME_LIMIT_SECONDS=20
 LAUNCHER_SCRIPT_NAME=launch
 SQL_ENGINE=django.db.backends.postgresql
 SQL_HOST=postgres
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 0b8717a7..6d0aa3f2 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -37,6 +37,7 @@ services:
       - TASKS_LOGS=${TASKS_LOGS}
       - TASKS_REFRESH_SECONDS=${TASKS_REFRESH_SECONDS}
       - TASKS_CLEANUP_MINUTES=${TASKS_CLEANUP_MINUTES}
+      - TASKS_STEP_TIME_LIMIT_SECONDS=${TASKS_STEP_TIME_LIMIT_SECONDS}
       - SQL_ENGINE=${SQL_ENGINE}
       - SQL_HOST=${SQL_HOST}
       - SQL_PORT=${SQL_PORT}
@@ -134,6 +135,7 @@ services:
       - TASKS_LOGS=${TASKS_LOGS}
       - TASKS_REFRESH_SECONDS=${TASKS_REFRESH_SECONDS}
       - TASKS_CLEANUP_MINUTES=${TASKS_CLEANUP_MINUTES}
+      - TASKS_STEP_TIME_LIMIT_SECONDS=${TASKS_STEP_TIME_LIMIT_SECONDS}
       - LAUNCHER_SCRIPT_NAME=${LAUNCHER_SCRIPT_NAME}
       - SQL_ENGINE=${SQL_ENGINE}
       - SQL_HOST=${SQL_HOST}
diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml
index c320f676..071fe143 100644
--- a/docker-compose.prod.yml
+++ b/docker-compose.prod.yml
@@ -37,6 +37,7 @@ services:
       - TASKS_LOGS=${TASKS_LOGS}
       - TASKS_REFRESH_SECONDS=${TASKS_REFRESH_SECONDS}
       - TASKS_CLEANUP_MINUTES=${TASKS_CLEANUP_MINUTES}
+      - TASKS_STEP_TIME_LIMIT_SECONDS=${TASKS_STEP_TIME_LIMIT_SECONDS}
       - LAUNCHER_SCRIPT_NAME=${LAUNCHER_SCRIPT_NAME}
       - SQL_ENGINE=${SQL_ENGINE}
       - SQL_HOST=${SQL_HOST}
@@ -107,6 +108,7 @@ services:
       - TASKS_LOGS=${TASKS_LOGS}
       - TASKS_REFRESH_SECONDS=${TASKS_REFRESH_SECONDS}
       - TASKS_CLEANUP_MINUTES=${TASKS_CLEANUP_MINUTES}
+      - TASKS_STEP_TIME_LIMIT_SECONDS=${TASKS_STEP_TIME_LIMIT_SECONDS}
       - LAUNCHER_SCRIPT_NAME=${LAUNCHER_SCRIPT_NAME}
       - SQL_ENGINE=${SQL_ENGINE}
       - SQL_HOST=${SQL_HOST}
diff --git a/plantit/front_end/src/components/datasets/data-tree.vue b/plantit/front_end/src/components/datasets/data-tree.vue
index 4e06ce6f..d251d4e9 100644
--- a/plantit/front_end/src/components/datasets/data-tree.vue
+++ b/plantit/front_end/src/components/datasets/data-tree.vue
@@ -1185,7 +1185,7 @@
 >
@@ -1208,8 +1208,7 @@
 - - + --> --> - - +
diff --git a/plantit/front_end/src/components/workflows/workflow.vue b/plantit/front_end/src/components/workflows/workflow.vue
index 20e6c9bf..e8b6661d 100644
--- a/plantit/front_end/src/components/workflows/workflow.vue
+++ b/plantit/front_end/src/components/workflows/workflow.vue
@@ -4439,8 +4439,8 @@ export default {
                 patterns: [],
                 names: []
             };
-            config.output.include.patterns = this.outputSelectedPatterns;
-            config.output.include.names = this.outputSelectedNames;
+            config.output.include.patterns = Array.from(this.outputSelectedPatterns);
+            config.output.include.names = Array.from(this.outputSelectedNames);
 
             // config.output.patterns =
             //     this.outputSelectedPatterns.length > 0
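The workflow.vue hunk above swaps direct assignment for Array.from, so the submitted config gets its own copies of the selected patterns and names rather than references to the component's reactive arrays. The same aliasing hazard, sketched in Python purely for illustration (the names are hypothetical):

    # Sketch: why copying (Array.from / list(...)) beats assigning a shared reference.
    selected_patterns = ['.csv', '.png']          # stands in for this.outputSelectedPatterns

    config_shared = {'include': {'patterns': selected_patterns}}        # aliases the list
    config_copied = {'include': {'patterns': list(selected_patterns)}}  # independent copy

    selected_patterns.append('.zip')  # UI state mutates later

    print(config_shared['include']['patterns'])   # ['.csv', '.png', '.zip'] - mutated
    print(config_copied['include']['patterns'])   # ['.csv', '.png'] - stable snapshot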
diff --git a/plantit/plantit/celery_tasks.py b/plantit/plantit/celery_tasks.py
index 56cb74b0..f599b3df 100644
--- a/plantit/plantit/celery_tasks.py
+++ b/plantit/plantit/celery_tasks.py
@@ -277,103 +277,6 @@ def list_task_results(guid: str, auth: dict):
     # async_to_sync(push_task_event)(task)
 
     check_cyverse_transfer_completion.s(guid).apply_async(priority=1)
 
-    # log_task_orchestrator_status(task, [f"Creating file previews"])
-    # async_to_sync(push_task_event)(task)
-
-    # with ssh:
-    #     with ssh.client.open_sftp() as sftp:
-    #         sftp.chdir(workdir)
-    #         for result in expected:
-    #             name = result['name']
-    #             path = result['path']
-    #             exists = result['exists']
-
-    #             if not exists: continue
-    #             if name.endswith('txt') or \
-    #                     name.endswith('csv') or \
-    #                     name.endswith('yml') or \
-    #                     name.endswith('yaml') or \
-    #                     name.endswith('tsv') or \
-    #                     name.endswith('out') or \
-    #                     name.endswith('err') or \
-    #                     name.endswith('log'):
-    #                 logger.info(f"Creating preview for text file: {name}")
-    #                 with tempfile.NamedTemporaryFile() as temp_file:
-    #                     sftp.get(name, temp_file.name)
-
-    #                     try:
-    #                         preview = previews.get_jpeg_preview(temp_file.name, width=1024, height=1024)
-    #                     except UnsupportedMimeType:
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", 'EMPTY')
-    #                         logger.info(f"Saved empty file preview to cache: {name}")
-    #                         continue
-
-    #                     with open(preview, 'rb') as pf:
-    #                         content = pf.read()
-    #                         encoded = base64.b64encode(content)
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", encoded)
-    #                         logger.info(f"Saved file preview to cache: {name}")
-    #             elif path.endswith('png'):
-    #                 logger.info(f"Creating preview for PNG file: {name}")
-    #                 with tempfile.NamedTemporaryFile() as temp_file:
-    #                     sftp.get(result['name'], temp_file.name)
-
-    #                     try:
-    #                         preview = previews.get_jpeg_preview(temp_file.name, width=1024, height=1024)
-    #                     except UnsupportedMimeType:
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", 'EMPTY')
-    #                         logger.info(f"Saved empty preview for PNG file to cache: {name}")
-    #                         continue
-
-    #                     with open(preview, 'rb') as pf:
-    #                         content = pf.read()
-    #                         encoded = base64.b64encode(content)
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", encoded)
-    #                         logger.info(f"Saved file preview to cache: {name}")
-    #             elif path.endswith('jpg') or path.endswith('jpeg'):
-    #                 logger.info(f"Creating preview for JPG file: {name}")
-    #                 with tempfile.NamedTemporaryFile() as temp_file:
-    #                     sftp.get(result['name'], temp_file.name)
-
-    #                     try:
-    #                         preview = previews.get_jpeg_preview(temp_file.name, width=1024, height=1024)
-    #                     except UnsupportedMimeType:
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", 'EMPTY')
-    #                         logger.info(f"Saved empty preview for JPG file to cache: {name}")
-    #                         continue
-
-    #                     with open(preview, 'rb') as pf:
-    #                         content = pf.read()
-    #                         encoded = base64.b64encode(content)
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", encoded)
-    #                         logger.info(f"Saved JPG file preview to cache: {name}")
-    #             elif path.endswith('czi'):
-    #                 logger.info(f"Creating preview for CZI file: {name}")
-    #                 with tempfile.NamedTemporaryFile() as temp_file:
-    #                     sftp.get(result['name'], temp_file.name)
-
-    #                     image = czifile.imread(temp_file.name)
-    #                     image.shape = (image.shape[2], image.shape[3], image.shape[4])
-    #                     success, buffer = cv2.imencode(".jpg", image)
-    #                     buffer.tofile(temp_file.name)
-
-    #                     try:
-    #                         preview = previews.get_jpeg_preview(temp_file.name, width=1024, height=1024)
-    #                     except UnsupportedMimeType:
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", 'EMPTY')
-    #                         logger.info(f"Saved empty preview for CZI file to cache: {name}")
-    #                         continue
-
-    #                     with open(preview, 'rb') as pf:
-    #                         content = pf.read()
-    #                         encoded = base64.b64encode(content)
-    #                         redis.set(f"previews/{task.user.username}/{task.guid}/{name}", encoded)
-    #                         logger.info(f"Saved file preview to cache: {name}")
-    #             elif path.endswith('ply'):
-    #                 logger.info(f"Creating preview for PLY file: {name}")
-    #                 with tempfile.NamedTemporaryFile() as temp_file:
-    #                     sftp.get(result['name'], temp_file.name)
-
     cleanup_delay = int(environ.get('TASKS_CLEANUP_MINUTES')) * 60
     cleanup_task.s(guid, auth).apply_async(countdown=cleanup_delay, priority=2)
     task.cleanup_time = timezone.now() + timedelta(seconds=cleanup_delay)
@@ -402,13 +305,21 @@ def check_cyverse_transfer_completion(guid: str, iteration: int = 0):
     else:
         msg = f"Transfer to CyVerse directory {path} completed"
         logger.info(msg)
+        task.transferred = True
         task.results_transferred = len(expected)
+        task.transfer_path = path
         task.save()
 
         log_task_orchestrator_status(task, [msg])
         async_to_sync(push_task_event)(task)
 
 
+@app.task()
+def check_task_completion(guid: str, auth):
+    # TODO logic for local vs jobqueue tasks
+    pass
+
+
 # @app.task()
 # def clean_agent_singularity_cache(agent_name: str):
 #     try:
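After results are listed, list_task_results schedules cleanup_task with a countdown converted from TASKS_CLEANUP_MINUTES and stamps the matching cleanup_time on the task. A minimal sketch of that deferred-execution pattern (the broker URL and task body are assumptions, not plantit code):

    # Sketch: scheduling deferred cleanup with a Celery countdown.
    from datetime import datetime, timedelta
    from os import environ

    from celery import Celery

    app = Celery('sketch', broker='redis://localhost')  # assumed broker

    @app.task()
    def cleanup_task(guid: str):
        print(f"cleaning up task {guid}")  # placeholder for the real cleanup

    def schedule_cleanup(guid: str) -> datetime:
        cleanup_delay = int(environ.get('TASKS_CLEANUP_MINUTES', '60')) * 60  # minutes -> seconds
        cleanup_task.s(guid).apply_async(countdown=cleanup_delay, priority=2)
        # record when cleanup will fire, mirroring task.cleanup_time
        return datetime.now() + timedelta(seconds=cleanup_delay)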
diff --git a/plantit/plantit/settings.py b/plantit/plantit/settings.py
index 99689a48..d8a38ab1 100644
--- a/plantit/plantit/settings.py
+++ b/plantit/plantit/settings.py
@@ -21,6 +21,7 @@
 assert 'TASKS_TIMEOUT_MULTIPLIER' in os.environ, f"{missing_variable}: TASKS_TIMEOUT_MULTIPLIER"
 assert 'TASKS_REFRESH_SECONDS' in os.environ, f"{missing_variable}: TASKS_REFRESH_SECONDS"
 assert 'TASKS_CLEANUP_MINUTES' in os.environ, f"{missing_variable}: TASKS_CLEANUP_MINUTES"
+assert 'TASKS_STEP_TIME_LIMIT_SECONDS' in os.environ, f"{missing_variable}: TASKS_STEP_TIME_LIMIT_SECONDS"
 assert 'LAUNCHER_SCRIPT_NAME' in os.environ, f"{missing_variable}: LAUNCHER_SCRIPT_NAME"
 assert 'DJANGO_API_URL' in os.environ, f"{missing_variable}: DJANGO_API_URL"
 assert 'CYVERSE_REDIRECT_URL' in os.environ, f"{missing_variable}: CYVERSE_REDIRECT_URL"
@@ -38,7 +39,6 @@
 assert 'AGENTS_HEALTHCHECKS_MINUTES' in os.environ, f"{missing_variable} AGENTS_HEALTHCHECKS_MINUTES"
 assert 'AGENTS_HEALTHCHECKS_SAVED' in os.environ, f"{missing_variable} AGENTS_HEALTHCHECKS_SAVED"
 
-
 MAPBOX_TOKEN = os.environ.get('MAPBOX_TOKEN')
 MAPBOX_FEATURE_REFRESH_MINUTES = os.environ.get('MAPBOX_FEATURE_REFRESH_MINUTES')
 CYVERSE_TOKEN_REFRESH_MINUTES = os.environ.get('CYVERSE_TOKEN_REFRESH_MINUTES')
@@ -58,6 +58,7 @@
 TASKS_TIMEOUT_MULTIPLIER = os.environ.get('TASKS_TIMEOUT_MULTIPLIER')
 TASKS_REFRESH_SECONDS = os.environ.get('TASKS_REFRESH_SECONDS')
 TASKS_CLEANUP_MINUTES = os.environ.get('TASKS_CLEANUP_MINUTES')
+TASKS_STEP_TIME_LIMIT_SECONDS = os.environ.get('TASKS_STEP_TIME_LIMIT_SECONDS')
 NO_PREVIEW_THUMBNAIL = os.environ.get('NO_PREVIEW_THUMBNAIL')
 LAUNCHER_SCRIPT_NAME = os.environ.get('LAUNCHER_SCRIPT_NAME')
 AWS_FEEDBACK_ARN = os.environ.get("AWS_FEEDBACK_ARN")
diff --git a/plantit/plantit/tasks/models.py b/plantit/plantit/tasks/models.py
index 944759cd..6d258071 100644
--- a/plantit/plantit/tasks/models.py
+++ b/plantit/plantit/tasks/models.py
@@ -76,6 +76,7 @@ class Meta:
     completed = models.DateTimeField(null=True, blank=True)
     celery_task_id = models.CharField(max_length=50, null=True, blank=True)
     transferred = models.BooleanField(default=False)
+    transfer_path = models.CharField(max_length=250, null=True, blank=True)
     due_time = models.DateTimeField(null=True, blank=True)
     cleanup_time = models.DateTimeField(null=True, blank=True)
 
diff --git a/plantit/plantit/tasks/urls.py b/plantit/plantit/tasks/urls.py
index 7440b497..e84c3bc7 100644
--- a/plantit/plantit/tasks/urls.py
+++ b/plantit/plantit/tasks/urls.py
@@ -12,14 +12,13 @@
     path(r'<owner>/<name>/delete/', views.delete),
     path(r'<owner>/<name>/output/', views.get_output_file),
     # path(r'<owner>/<name>/file_text/', views.get_file_text),
-    path(r'<owner>/<name>/thumbnail/', views.get_thumbnail),
     # path(r'<owner>/<name>/3d_model/', views.get_3d_model),
-    path(r'<owner>/<name>/orchestrator_logs/', views.get_task_logs),
-    path(r'<owner>/<name>/orchestrator_logs_content/', views.get_task_logs_content),
-    path(r'<owner>/<name>/scheduler_logs/', views.get_scheduler_logs),
-    path(r'<owner>/<name>/scheduler_logs_content/', views.get_scheduler_logs_content),
     path(r'<owner>/<name>/agent_logs/', views.get_agent_logs),
     path(r'<owner>/<name>/agent_logs_content/', views.get_agent_logs_content),
+    path(r'<owner>/<name>/scheduler_logs/', views.get_scheduler_logs),
+    path(r'<owner>/<name>/scheduler_logs_content/', views.get_scheduler_logs_content),
+    path(r'<owner>/<name>/orchestrator_logs/', views.get_task_logs),
+    path(r'<owner>/<name>/orchestrator_logs_content/', views.get_task_logs_content),
     path(r'<owner>/<name>/transfer/', views.transfer_to_cyverse),
     path(r'search/<owner>/<workflow_name>/<page>/', views.search),
 ]
diff --git a/plantit/plantit/tasks/views.py b/plantit/plantit/tasks/views.py
index 2684863f..21d9d994 100644
--- a/plantit/plantit/tasks/views.py
+++ b/plantit/plantit/tasks/views.py
@@ -1,4 +1,3 @@
-import base64
 import json
 import logging
 import tempfile
@@ -15,16 +14,15 @@
 from django.views.decorators.csrf import csrf_exempt
 
 from plantit import settings
-from plantit.agents.models import Agent, AgentExecutor, AgentAuthentication
+from plantit.agents.models import Agent, AgentExecutor
 from plantit.celery_tasks import submit_task
-from plantit.redis import RedisClient
 from plantit.ssh import execute_command
 from plantit.tasks.models import Task, DelayedTask, RepeatingTask, TaskStatus
 from plantit.utils import task_to_dict, create_task, parse_task_auth_options, get_task_ssh_client, get_task_orchestrator_log_file_path, \
     log_task_orchestrator_status, \
     push_task_event, cancel_task, delayed_task_to_dict, repeating_task_to_dict, parse_time_limit_seconds, \
-    get_task_scheduler_log_file_path, get_task_scheduler_log_file_name, get_task_agent_log_file_name, get_task_agent_log_file_path, \
-    compose_task_push_command, parse_task_cli_options
+    get_task_scheduler_log_file_path, get_task_agent_log_file_path, \
+    get_included_by_name, get_included_by_pattern
 
 logger = logging.getLogger(__name__)
 
@@ -53,11 +51,16 @@ def get_all_or_create(request):
                            investigation=workflow['miappe']['project']['title'] if workflow['miappe']['project'] is not None else None,
                            study=workflow['miappe']['study']['title'] if workflow['miappe']['study'] is not None else None)
 
-        time_limit = parse_time_limit_seconds(workflow['config']['time'])
-
-        # submit the task
         auth = parse_task_auth_options(workflow['auth'])
-        submit_task.apply_async(args=[task.guid, auth], soft_time_limit=time_limit, priority=1)  # TODO soft time limits too?
+        step_time_limit = int(settings.TASKS_STEP_TIME_LIMIT_SECONDS)
+        task_time_limit = parse_time_limit_seconds(workflow['config']['time'])
+
+        # check_task_completion.apply_async(args=[task.guid, auth], countdown=task_time_limit, priority=1)
+        submit_task.apply_async(
+            args=[task.guid, auth],
+            soft_time_limit=task_time_limit if agent.executor == AgentExecutor.LOCAL else step_time_limit,
+            priority=1)
+
         tasks = list(Task.objects.filter(user=user))
         return JsonResponse({'tasks': [task_to_dict(t) for t in tasks]})
 
@@ -107,8 +110,10 @@ def get_by_owner(request, owner):
     # params = request.query_params
     # page = params.get('page') if 'page' in params else -1
 
-    try: user = User.objects.get(username=owner)
-    except: return HttpResponseNotFound()
+    try:
+        user = User.objects.get(username=owner)
+    except:
+        return HttpResponseNotFound()
 
     tasks = Task.objects.filter(user=user)
     paginator = Paginator(tasks, 20)
@@ -152,90 +157,35 @@ def get_by_owner_and_name(request, owner, name):
 def transfer_to_cyverse(request, owner, name):
     body = json.loads(request.body.decode('utf-8'))
     auth = parse_task_auth_options(body['auth'])
-    path = body['path']
+    transfer_path = body['path']
 
+    # find the task
     try:
         user = User.objects.get(username=owner)
         task = Task.objects.get(user=user, name=name)
     except:
         return HttpResponseNotFound()
 
     if not task.is_complete: return HttpResponseBadRequest('task incomplete')
-    ssh = get_task_ssh_client(task, auth)
-    options = parse_task_cli_options(task)
 
     # compose command
-    command = f"plantit terrain push {path} -p {join(task.agent.workdir, task.workdir)} "
-
-    # TODO factor this out into its own method
-    included_by_name = ((task.workflow['output']['include']['names'] if 'names' in task.workflow['output']['include'] else [])) if 'output' in task.workflow else []  # [f"{run.task_id}.zip"]
-    included_by_name.append(f"{task.guid}.zip")  # zip file
-    if not task.agent.launcher: included_by_name.append(f"{task.guid}.{task.agent.name.lower()}.log")
-    if task.agent.executor != AgentExecutor.LOCAL and task.job_id is not None and task.job_id != '':
-        included_by_name.append(f"plantit.{task.job_id}.out")
-        included_by_name.append(f"plantit.{task.job_id}.err")
-    included_by_pattern = (task.workflow['output']['include']['patterns'] if 'patterns' in task.workflow['output']['include'] else []) if 'output' in task.workflow else []
-    included_by_pattern.append('.out')
-    included_by_pattern.append('.err')
-    included_by_pattern.append('.zip')
-
-    command = command + ' ' + ' '.join(['--include_pattern ' + pattern for pattern in included_by_pattern])
-    command = command + ' ' + ' '.join(['--include_name ' + name for name in included_by_name])
+    command = f"plantit terrain push {transfer_path} -p {join(task.agent.workdir, task.workdir)} "
+    command = command + ' ' + ' '.join(['--include_name ' + name for name in get_included_by_name(task)])
+    command = command + ' ' + ' '.join(['--include_pattern ' + pattern for pattern in get_included_by_pattern(task)])
     command += f" --terrain_token '{task.user.profile.cyverse_access_token}'"
 
+    # run command
+    ssh = get_task_ssh_client(task, auth)
     with ssh:
         for line in execute_command(ssh=ssh, precommand=task.agent.pre_commands, command=command, directory=task.agent.workdir, allow_stderr=True):
             logger.info(f"[{task.agent.name}] {line}")
+
+    # update task
+    task.transfer_path = transfer_path
     task.transferred = True
     task.save()
 
     return JsonResponse(task_to_dict(task))
 
 
-@login_required
-def get_thumbnail(request, owner, name):
-    path = request.GET.get('path')
-    file = path.rpartition('/')[2]
-
-    try:
-        user = User.objects.get(username=owner)
-        task = Task.objects.get(user=user, name=name)
-    except:
-        return HttpResponseNotFound()
-
-    redis = RedisClient.get()
-    preview = redis.get(f"previews/{user.username}/{task.name}/{file}")
-
-    if preview is None or preview == b'EMPTY':
-        with open(settings.NO_PREVIEW_THUMBNAIL, 'rb') as thumbnail:
-            return HttpResponse(thumbnail, content_type="image/png")
-    elif file.endswith('txt') or \
-            file.endswith('csv') or \
-            file.endswith('yml') or \
-            file.endswith('yaml') or \
-            file.endswith('tsv') or \
-            file.endswith('out') or \
-            file.endswith('err') or \
-            file.endswith('log'):
-        decoded = base64.b64decode(preview)
-        print(f"Retrieved text file preview from cache: {file}")
-        return HttpResponse(decoded, content_type="image/jpg")
-    elif file.endswith('png'):
-        decoded = base64.b64decode(preview)
-        print(f"Retrieved PNG file preview from cache: {file}")
-        return HttpResponse(decoded, content_type="image/png")
-    elif file.endswith('jpg') or file.endswith('jpeg'):
-        decoded = base64.b64decode(preview)
-        print(f"Retrieved JPG file preview from cache: {file}")
-        return HttpResponse(decoded, content_type="image/jpg")
-    elif file.endswith('czi'):
-        decoded = base64.b64decode(preview)
-        print(f"Retrieved CZI file preview from cache: {file}")
-        return HttpResponse(decoded, content_type="image/jpg")
-    else:
-        with open(settings.NO_PREVIEW_THUMBNAIL, 'rb') as thumbnail:
-            return HttpResponse(thumbnail, content_type="image/png")
-
-
 # @login_required
 # def get_3d_model(request, guid):
 #     body = json.loads(request.body.decode('utf-8'))
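The get_all_or_create change picks the Celery soft time limit per executor: local tasks get the workflow's full time limit, while jobqueue tasks only need a short per-step budget since the cluster scheduler enforces the real wall time. A minimal sketch of how a soft limit surfaces inside a task (the broker, task body, and cleanup hook are hypothetical):

    # Sketch: Celery soft time limits raise inside the task, leaving room to clean up.
    from celery import Celery
    from celery.exceptions import SoftTimeLimitExceeded

    app = Celery('sketch', broker='redis://localhost')  # assumed broker

    @app.task()
    def submit(guid: str):
        try:
            do_long_running_work(guid)   # hypothetical stand-in for task submission
        except SoftTimeLimitExceeded:
            record_timeout(guid)         # hypothetical cleanup hook

    # the caller picks the budget per invocation, as the PR does by executor type:
    submit.apply_async(args=['abc123'], soft_time_limit=20, priority=1)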
diff --git a/plantit/plantit/terrain.py b/plantit/plantit/terrain.py
index f17c9502..7cb52850 100644
--- a/plantit/plantit/terrain.py
+++ b/plantit/plantit/terrain.py
@@ -35,7 +35,7 @@ def list_files(path,
         if include_names is not None else included_by_pattern
 
     # gather only included files
-    included = set(included_by_pattern + included_by_name)
+    included = list(set(included_by_pattern + included_by_name))
 
     # remove files matched excluded patterns
     excluded_by_pattern = [name for name in included if all(pattern.lower() not in name.lower() for pattern in
diff --git a/plantit/plantit/utils.py b/plantit/plantit/utils.py
index 9d97e422..5fd95c39 100644
--- a/plantit/plantit/utils.py
+++ b/plantit/plantit/utils.py
@@ -45,7 +45,7 @@
 from plantit.notifications.models import Notification
 from plantit.redis import RedisClient
 from plantit.ssh import SSH, execute_command
-from plantit.tasks.models import DelayedTask, RepeatingTask, TaskStatus, TaskCounter
+from plantit.tasks.models import DelayedTask, RepeatingTask, TaskStatus, TaskCounter, Task
 from plantit.tasks.models import Task
 from plantit.tasks.options import BindMount, EnvironmentVariable
 from plantit.tasks.options import PlantITCLIOptions, Parameter, Input, PasswordTaskAuth, KeyTaskAuth, InputKind
@@ -1044,14 +1044,14 @@ def compose_task_zip_command(task: Task, options: PlantITCLIOptions) -> str:
     if 'output' in config:
         if 'include' in config['output']:
             if 'patterns' in config['output']['include']:
-                output['include']['patterns'] = output['include']['patterns'] + task.workflow['config']['output']['include']['patterns']
+                output['include']['patterns'] = list(set(output['include']['patterns'] + task.workflow['config']['output']['include']['patterns']))
             if 'names' in config['output']['include']:
-                output['include']['names'] = output['include']['names'] + task.workflow['config']['output']['include']['names']
+                output['include']['names'] = list(set(output['include']['names'] + task.workflow['config']['output']['include']['names']))
         if 'exclude' in config['output']:
             if 'patterns' in config['output']['exclude']:
-                output['exclude']['patterns'] = set(output['exclude']['patterns'] + task.workflow['config']['output']['exclude']['patterns'])
+                output['exclude']['patterns'] = list(set(output['exclude']['patterns'] + task.workflow['config']['output']['exclude']['patterns']))
             if 'names' in config['output']['exclude']:
-                output['exclude']['names'] = set(output['exclude']['names'] + task.workflow['config']['output']['exclude']['names'])
+                output['exclude']['names'] = list(set(output['exclude']['names'] + task.workflow['config']['output']['exclude']['names']))
 
     command = f"plantit zip {output['from'] if 'from' in output and output['from'] != '' else '.'} -o . -n {task.guid}"
     logs = [f"{task.guid}.{task.agent.name.lower()}.log"]
@@ -1082,10 +1082,13 @@ def compose_task_push_command(task: Task, options: PlantITCLIOptions) -> str:
     if 'to' in output and output['to'] is not None:
         command = f"plantit terrain push {output['to']} -p {join(task.agent.workdir, task.workdir, output['from'])} "
 
+        # command = command + ' ' + ' '.join(['--include_name ' + name for name in get_included_by_name(task)])
+        # command = command + ' ' + ' '.join(['--include_pattern ' + pattern for pattern in get_included_by_pattern(task)])
+        # command += f" --terrain_token '{task.user.profile.cyverse_access_token}'"
+
         if 'include' in output:
             if 'patterns' in output['include']:
-                patterns = output['include']['patterns']
-                pprint.pprint(patterns)
+                patterns = list(output['include']['patterns'])
                 patterns.append('.out')
                 patterns.append('.err')
                 patterns.append('.zip')
@@ -1430,6 +1433,28 @@ def get_task_remote_logs(task: Task, ssh: SSH):
             get_remote_logs(agent_log_file_name, agent_log_file_path, task, ssh, sftp)
 
 
+def get_included_by_name(task: Task) -> List[str]:
+    included_by_name = (
+        (task.workflow['output']['include']['names'] if 'names' in task.workflow['output']['include'] else [])) if 'output' in task.workflow else []
+    included_by_name.append(f"{task.guid}.zip")  # zip file
+    if not task.agent.launcher: included_by_name.append(f"{task.guid}.{task.agent.name.lower()}.log")
+    if task.agent.executor != AgentExecutor.LOCAL and task.job_id is not None and task.job_id != '':
+        included_by_name.append(f"plantit.{task.job_id}.out")
+        included_by_name.append(f"plantit.{task.job_id}.err")
+
+    return included_by_name
+
+
+def get_included_by_pattern(task: Task) -> List[str]:
+    included_by_pattern = (task.workflow['output']['include']['patterns'] if 'patterns' in task.workflow['output'][
+        'include'] else []) if 'output' in task.workflow else []
+    included_by_pattern.append('.out')
+    included_by_pattern.append('.err')
+    included_by_pattern.append('.zip')
+
+    return included_by_pattern
+
+
 def check_logs_for_progress(task: Task):
     """
     Parse scheduler log files for CLI output and update progress counters
@@ -1696,6 +1721,7 @@ def task_to_dict(task: Task) -> dict:
         'results_transferred': task.results_transferred,
         'cleaned_up': task.cleaned_up,
         'transferred': task.transferred,
+        'transfer_path': task.transfer_path,
         'output_files': json.loads(results) if results is not None else [],
         'job_id': task.job_id,
         'job_status': task.job_status,
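Several hunks above replace bare set(...) merges with list(set(...)). Deduplication is the goal, but the merged values feed list-expecting consumers such as JSON serialization and CLI flag joining. A short standalone illustration (not plantit code):

    # Sketch: deduplicating include patterns while keeping a JSON-friendly list.
    import json

    defaults = ['.out', '.err', '.zip']
    from_workflow = ['.zip', 'results.csv']

    merged = list(set(defaults + from_workflow))  # deduplicated; order is arbitrary

    print(json.dumps(merged))    # works: lists serialize
    # json.dumps(set(defaults)) # would raise TypeError: set is not JSON serializable
    print(' '.join('--include_pattern ' + p for p in merged))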
diff --git a/scripts/configure-environment.sh b/scripts/configure-environment.sh
index a94d4bf5..432d821a 100755
--- a/scripts/configure-environment.sh
+++ b/scripts/configure-environment.sh
@@ -134,9 +134,10 @@ TASKS_TIMEOUT_MULTIPLER=2
 TASKS_LOGS=/code/logs
 TASKS_REFRESH_SECONDS=60
 TASKS_CLEANUP_MINUTES=60
-LAUNCHER_SCRIPT_NAME=launcher
+TASKS_STEP_TIME_LIMIT_SECONDS=20
 TASKS_TEMPLATE_SCRIPT_LOCAL=/code/scripts/template_local_run.sh
 TASKS_TEMPLATE_SCRIPT_SLURM=/code/scripts/template_slurm_run.sh
+LAUNCHER_SCRIPT_NAME=launcher
 SQL_ENGINE=django.db.backends.postgresql
 SQL_HOST=postgres
 SQL_PORT=5432
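The PR threads TASKS_STEP_TIME_LIMIT_SECONDS through every layer: the .env templates, both compose files, a fail-fast assertion in settings.py, and the submit call in views.py. A minimal sketch of that fail-fast pattern as the settings module applies it (the setdefault line stands in for the .env file so the snippet runs on its own):

    # Sketch: the fail-fast env-var pattern settings.py extends.
    import os

    os.environ.setdefault('TASKS_STEP_TIME_LIMIT_SECONDS', '20')  # simulates the .env file

    missing_variable = 'Required environment variable not set'

    # assert at import time so a misconfigured deployment dies immediately...
    assert 'TASKS_STEP_TIME_LIMIT_SECONDS' in os.environ, f"{missing_variable}: TASKS_STEP_TIME_LIMIT_SECONDS"

    # ...then expose it as a module-level setting (still a string; callers cast)
    TASKS_STEP_TIME_LIMIT_SECONDS = os.environ.get('TASKS_STEP_TIME_LIMIT_SECONDS')

    # views.py casts at the point of use:
    step_time_limit = int(TASKS_STEP_TIME_LIMIT_SECONDS)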