diff --git a/WDL/_grammar.py b/WDL/_grammar.py index a62ba8e9..2b61be0f 100644 --- a/WDL/_grammar.py +++ b/WDL/_grammar.py @@ -323,7 +323,7 @@ // WDL task commands: with {} and <<< >>> command and ${} and ~{} placeholder styles placeholder: expr -?command: command1 | command2 +?command: "command" (command1 | command2) // meta/parameter_meta sections (effectively JSON) meta_object: "{" [meta_kv (","? meta_kv)*] "}" @@ -452,11 +452,11 @@ COMMAND1_CHAR: /[^~$}]/ | /\$[^{$~]/ | /~[^{$~]/ COMMAND1_FRAGMENT: COMMAND1_CHAR+ -command1: "command" "{" (COMMAND1_FRAGMENT? /\$/* /\~/* _EITHER_DELIM placeholder "}")* COMMAND1_FRAGMENT? /\$/* /\~/* "}" -> command +command1: "{" (COMMAND1_FRAGMENT? /\$/* /\~/* _EITHER_DELIM placeholder "}")* COMMAND1_FRAGMENT? /\$/* /\~/* "}" -> command COMMAND2_CHAR: /[^~>]/ | /~[^{~]/ | />[^>]/ | />>[^>]/ COMMAND2_FRAGMENT: COMMAND2_CHAR+ -command2: "command" "<<<" (COMMAND2_FRAGMENT? /\~/? "~{" placeholder "}")* COMMAND2_FRAGMENT? /\~/* ">>>" -> command +command2: "<<<" (COMMAND2_FRAGMENT? /\~/? "~{" placeholder "}")* COMMAND2_FRAGMENT? /\~/* ">>>" -> command CNAME: /[a-zA-Z][a-zA-Z0-9_]*/ diff --git a/WDL/runtime/task.py b/WDL/runtime/task.py index 723c17ee..5187fc57 100644 --- a/WDL/runtime/task.py +++ b/WDL/runtime/task.py @@ -14,6 +14,7 @@ import re from typing import Tuple, List, Dict, Optional, Callable, Iterable, Set, Any, Union from contextlib import ExitStack +from docker.errors import BuildError as DockerBuildError from .. import Error, Type, Env, Value, StdLib, Tree, Expr, _util from .._util import ( @@ -425,7 +426,14 @@ def _eval_task_runtime( logger.debug(_("runtime values", **dict((key, str(v)) for key, v in runtime_values.items()))) ans = {} - if "docker" in runtime_values: + if "inlineDockerfile" in runtime_values: + # join Array[String] + dockerfile = runtime_values["inlineDockerfile"] + if not isinstance(dockerfile, Value.Array): + dockerfile = Value.Array(dockerfile.type, [dockerfile]) + dockerfile = "\n".join(elt.coerce(Type.String()).value for elt in dockerfile.value) + ans["inlineDockerfile"] = dockerfile + elif "docker" in runtime_values: ans["docker"] = runtime_values["docker"].coerce(Type.String()).value host_limits = container.__class__.detect_resource_limits(cfg, logger) @@ -535,7 +543,7 @@ def _try_task( ) ) interruptions += 1 - elif not isinstance(exn, Terminated) and retries < max_retries: + elif not isinstance(exn, (Terminated, DockerBuildError)) and retries < max_retries: logger.error( _( "failed task will be retried", diff --git a/WDL/runtime/task_container.py b/WDL/runtime/task_container.py index f410f127..a71324be 100644 --- a/WDL/runtime/task_container.py +++ b/WDL/runtime/task_container.py @@ -16,6 +16,7 @@ import stat from typing import Callable, Iterable, List, Set, Tuple, Type, Any, Dict, Optional from abc import ABC, abstractmethod +from io import BytesIO import docker from .. import Error from .._util import ( @@ -505,11 +506,14 @@ def _run(self, logger: logging.Logger, terminating: Callable[[], bool], command: # prepare docker configuration client = docker.from_env(version="auto", timeout=900) - image_tag = self.resolve_tag( - logger, client, self.runtime_values.get("docker", "ubuntu:20.04") - ) + if "inlineDockerfile" in self.runtime_values: + image_tag = self.build_inline_dockerfile(logger.getChild("inlineDockerfile"), client) + else: + image_tag = self.resolve_tag( + logger, client, self.runtime_values.get("docker", "ubuntu:20.04") + ) mounts = self.prepare_mounts(logger) - resources, user, groups = self.misc_config(logger, client) + resources, user, groups = self.misc_config(logger) # run container as a transient docker swarm service, letting docker handle the resource # scheduling (e.g. waiting until requested # of CPUs are available). @@ -717,7 +721,7 @@ def escape(s): return mounts def misc_config( - self, logger: logging.Logger, client: docker.DockerClient + self, logger: logging.Logger ) -> Tuple[Optional[Dict[str, str]], Optional[str], List[str]]: resources = {} cpu = self.runtime_values.get("cpu", 0) @@ -894,3 +898,73 @@ def unique_service_name(self, run_id: str) -> str: junk = base64.b32encode(junk).decode().lower() assert len(junk) == 24 return f"wdl-{run_id[:34]}-{junk}" # 4 + 34 + 1 + 24 = 63 + + _build_inline_dockerfile_lock: threading.Lock = threading.Lock() + + def build_inline_dockerfile( + self, + logger: logging.Logger, + client: docker.DockerClient, + tries: Optional[int] = None, + ) -> str: + # formulate image tag using digest of dockerfile text + dockerfile_utf8 = self.runtime_values["inlineDockerfile"].encode("utf8") + dockerfile_digest = hashlib.sha256(dockerfile_utf8).digest() + dockerfile_digest = base64.b32encode(dockerfile_digest[:15]).decode().lower() + tag_part1 = "miniwdl_auto_" + tag_part3 = ":" + dockerfile_digest + tag_part2 = self.run_id.lower() + if "-" in tag_part2: + tag_part2 = tag_part2.split("-")[1] + maxtag2 = 64 - len(tag_part1) - len(tag_part3) + assert maxtag2 > 0 + tag = tag_part1 + tag_part2 + tag_part3 + + # short-circuit if digest-tagged image already exists + try: + existing = client.images.get(tag) + logger.notice(_("docker build cached", tag=tag, id=existing.id)) # pyre-ignore + return tag + except docker.errors.ImageNotFound: + pass + + # prepare to tee docker build log to logger.verbose and a file + build_logfile = os.path.join(self.host_dir, "inlineDockerfile.log.txt") + + def write_log(stream: Iterable[Dict[str, str]]): + # tee the log messages to logger.verbose and build_logfile + with open(build_logfile, "w") as outfile: + for d in stream: + if "stream" in d: + msg = d["stream"].rstrip() + if msg: + logger.verbose(msg) + print(msg, file=outfile) + + # run docker build + try: + with SwarmContainer._build_inline_dockerfile_lock: # one build at a time + logger.info(_("starting docker build", tag=tag)) + logger.debug(_("Dockerfile", txt=self.runtime_values["inlineDockerfile"])) + image, build_log = client.images.build(fileobj=BytesIO(dockerfile_utf8), tag=tag) + except docker.errors.BuildError as exn: + # potentially retry, if task has runtime.maxRetries + if isinstance(tries, int): + tries -= 1 + else: + tries = self.runtime_values.get("maxRetries", 0) + if tries > 0: + logger.error( + _("failed docker build will be retried", tries_remaining=tries, msg=exn.msg) + ) + return self.build_inline_dockerfile(logger, client, tries=tries) + else: + write_log(exn.build_log) + logger.error(_("docker build failed", msg=exn.msg, log=build_logfile)) + raise exn + + write_log(build_log) + logger.notice( # pyre-ignore + _("docker build", tag=image.tags[0], id=image.id, log=build_logfile) + ) + return tag diff --git a/stubs/docker/__init__.py b/stubs/docker/__init__.py index eb7b4ced..5015cd74 100644 --- a/stubs/docker/__init__.py +++ b/stubs/docker/__init__.py @@ -1,4 +1,4 @@ -from typing import Dict, Any, List, Iterable +from typing import Dict, Any, List, Iterable, Tuple class Container: @property @@ -22,6 +22,14 @@ class Containers: def run(self, image_tag: str, **kwargs) -> Container: ... +class Images: + def build(self, **kwargs) -> Tuple[Image, Iterable[Dict[str,str]]]: + ... + +class Image: + id: str + tags: List[str] + class Node: attrs: Dict[str,Any] @@ -93,6 +101,10 @@ def __init__(self, *args, **kwargs): ... class errors: + class BuildError(Exception): + msg : str + build_log : Iterable[Dict[str,str]] + class ImageNotFound(Exception): pass @@ -101,6 +113,10 @@ class DockerClient: def containers(self) -> Containers: ... + @property + def images(self) -> Images: + ... + def close(self) -> None: ... diff --git a/tests/test_7runner.py b/tests/test_7runner.py index a5303ef1..2ee1607d 100644 --- a/tests/test_7runner.py +++ b/tests/test_7runner.py @@ -5,6 +5,7 @@ import os import shutil import json +import time import docker from testfixtures import log_capture from .context import WDL @@ -768,6 +769,47 @@ def test_weird_filenames(self): assert os.stat(fn).st_uid == euid +class TestInlineDockerfile(RunnerTestCase): + @log_capture() + def test1(self, capture): + wdl = """ + version development + workflow w { + call t + } + task t { + input { + Array[String]+ apt_pkgs + Float timestamp + } + command <<< + set -euxo pipefail + apt list --installed | tr '/' $'\t' | sort > installed.txt + sort "~{write_lines(apt_pkgs)}" > expected.txt + join -j 1 -v 2 installed.txt expected.txt > missing.txt + if [ -s missing.txt ]; then + >&2 cat missing.txt + exit 1 + fi + >>> + runtime { + inlineDockerfile: [ + "FROM ubuntu:20.04", + "RUN apt-get -qq update && apt-get install -y ${sep(' ', apt_pkgs)}", + "RUN touch ${timestamp}" + ] + maxRetries: 1 + } + } + """ + t = time.time() # to ensure the image is built anew on every test run + self._run(wdl, {"t.apt_pkgs": ["samtools", "tabix"], "t.timestamp": t}) + self._run(wdl, {"t.apt_pkgs": ["samtools", "tabix"], "t.timestamp": t}) + logs = [str(record.msg) for record in capture.records if str(record.msg).startswith("docker build cached")] + self.assertEqual(len(logs), 1) + self._run(wdl, {"t.apt_pkgs": ["bogusfake123"], "t.timestamp": t}, expected_exception=docker.errors.BuildError) + + class TestAbbreviatedCallInput(RunnerTestCase): def test_docker(self):