diff --git a/janis/translations/__init__.py b/janis/translations/__init__.py index afddf38b..7f06866d 100644 --- a/janis/translations/__init__.py +++ b/janis/translations/__init__.py @@ -37,3 +37,15 @@ def translate_tool(tool, translation: SupportedTranslation, with_docker, with_re else: raise NotImplementedError(f"The requested translation ('{translation}') has not been implemented yet, " f"why not contribute one at '{GITHUB_URL}'.") + + +def build_resources_input(workflow, translation: SupportedTranslation, hints): + if translation == SupportedTranslations.CWL: + raise NotImplementedError("CWL implementation is coming soon!") + + elif translation == SupportedTranslations.WDL: + return wdl.build_wdl_resource_inputs(workflow, hints) + + else: + raise NotImplementedError(f"The requested translation ('{translation}') has not been implemented yet, " + f"why not contribute one at '{GITHUB_URL}'.") diff --git a/janis/translations/wdl.py b/janis/translations/wdl.py index 4955c0f3..00ef6d49 100644 --- a/janis/translations/wdl.py +++ b/janis/translations/wdl.py @@ -1,13 +1,14 @@ import os -from typing import List, Dict, Optional +from typing import List, Dict, Optional, Any import itertools import wdlgen as wdl +import janis.workflow.workflow as JW from janis.workflow.step import StepNode from janis.graph.stepinput import Edge, StepInput -from janis.types import InputSelector, WildcardSelector +from janis.types import InputSelector, WildcardSelector, CpuSelector, MemorySelector from janis.types.common_data_types import Stdout, Array, Boolean, Filename, File @@ -350,6 +351,17 @@ def translate_tool(tool, with_docker): if with_docker: r.add_docker(tool.docker()) + # generate resource inputs, for memory, cpu and disk at the moment + ins.extend([ + wdl.Input(wdl.WdlType.parse_type("Int?"), "runtime_cpu"), + wdl.Input(wdl.WdlType.parse_type("String?"), "runtime_memory"), + wdl.Input(wdl.WdlType.parse_type("String?"), "runtime_disks"), + ]) + + r.add_cpus("runtime_cpu") + r.add_memory("${runtime_memory}") + r.kwargs["disks"] = "runtime_disks" + return wdl.Task(tool.id(), ins, outs, commands, r, version="development") @@ -566,9 +578,10 @@ def translate_step_node(node, step_identifier: str, step_alias: str): f = edge.finish.inputs()[edge.ftag] secs = f.input_type.subtype().secondary_files() if isinstance(f.input_type, Array) \ else f.input_type.secondary_files() - for sec in secs: - inputs_map[get_secondary_tag_from_original_tag(k, sec)] = \ - "[" + ", ".join(get_secondary_tag_from_original_tag(kk, sec) for kk in edge.dotted_source()) + "]" + if secs: + for sec in secs: + inputs_map[get_secondary_tag_from_original_tag(k, sec)] = \ + "[" + ", ".join(get_secondary_tag_from_original_tag(kk, sec) for kk in edge.dotted_source()) + "]" continue # source = source[0] # raise Exception("Conversion to WDL does not currently support multiple sources") @@ -644,4 +657,25 @@ def translate_step_node(node, step_identifier: str, step_alias: str): return call -# def map_inputs_for_non_secondary_scatterable_field(): + +def build_wdl_resource_inputs(wf, hints, prefix=None) -> Dict[str, Any]: + # returns a list of key, value pairs + steps = {} + if not prefix: + prefix = "" # wf.id() + "." + + for s in wf._steps: + tool: Tool = s.step.tool() + + if isinstance(tool, CommandTool): + tool_pre = prefix + "_" + tool.id() + "_" + steps.update([ + (tool_pre + "runtime_memory", tool.memory(hints)), + (tool_pre + "runtime_cpu", tool.cpus(hints)), + (tool_pre + "runtime_disks", None) + ]) + elif isinstance(tool, JW.Workflow): + tool_pre = prefix + "." + s.id() + steps.update(build_wdl_resource_inputs(tool, hints, tool_pre)) + + return steps diff --git a/janis/workflow/workflow.py b/janis/workflow/workflow.py index ead376ae..734386b7 100644 --- a/janis/workflow/workflow.py +++ b/janis/workflow/workflow.py @@ -744,3 +744,7 @@ def dump_translation(self, translation: SupportedTranslation, to_console=True, t should_zip=should_zip, write_inputs_file=write_inputs_file, should_validate=should_validate) + + def generate_resources_file(self, translation: SupportedTranslation, hints: Dict[str, Any]): + return translations.build_resources_input(self, translation, hints) +