diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py index 0807cb2b3aa..4adadb9d0ed 100644 --- a/src/python/pants/engine/internals/scheduler.py +++ b/src/python/pants/engine/internals/scheduler.py @@ -68,6 +68,7 @@ LOCAL_STORE_LEASE_TIME_SECS, ExecutionOptions, LocalStoreOptions, + normalize_remote_address, ) from pants.util.contextutil import temporary_file_path from pants.util.logging import LogLevel @@ -192,8 +193,8 @@ def __init__( execution_headers=execution_options.remote_execution_headers, execution_overall_deadline_secs=execution_options.remote_execution_overall_deadline_secs, execution_rpc_concurrency=execution_options.remote_execution_rpc_concurrency, - store_address=execution_options.remote_store_address, - execution_address=execution_options.remote_execution_address, + store_address=normalize_remote_address(execution_options.remote_store_address), + execution_address=normalize_remote_address(execution_options.remote_execution_address), execution_process_cache_namespace=execution_options.process_execution_cache_namespace, instance_name=execution_options.remote_instance_name, root_ca_certs_path=execution_options.remote_ca_certs_path, diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 0d13ad95a35..60ef83faa5b 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -75,6 +75,213 @@ class DynamicUIRenderer(Enum): _G = TypeVar("_G", bound="_GlobMatchErrorBehaviorOptionBase") +_EXPERIMENTAL_SCHEME = "experimental:" + + +def normalize_remote_address(addr: str | None) -> str | None: + if addr is None: + return None + return addr.removeprefix(_EXPERIMENTAL_SCHEME) + + +@dataclass(frozen=True) +class _RemoteAddressScheme: + schemes: tuple[str, ...] 
+ supports_execution: bool + experimental: bool + description: str + + def rendered_schemes(self) -> tuple[str, ...]: + """Convert the schemes into what the user needs to write. + + For example: `experimental:some-scheme://` if experimental, or `some-scheme://` if not. + + This includes the :// because that's clearer in docs etc, even if it's not 'technically' + part of the scheme. + """ + # `experimental:` is used as a prefix-scheme, riffing on `view-source:https://...` in some + # web browsers. This ensures the experimental status is communicated right where a user is + # opting-in to using it. + experimental_prefix = _EXPERIMENTAL_SCHEME if self.experimental else "" + return tuple(f"{experimental_prefix}{scheme}://" for scheme in self.schemes) + + @staticmethod + def _validate_address( + schemes: tuple[_RemoteAddressScheme, ...], + addr: str, + require_execution: bool, + context_for_diagnostics: str, + ) -> None: + addr_is_experimental = addr.startswith(_EXPERIMENTAL_SCHEME) + experimentalless_addr = addr.removeprefix(_EXPERIMENTAL_SCHEME) + + matching_scheme = next( + ( + (scheme_str, scheme) + for scheme in schemes + for scheme_str in scheme.schemes + if experimentalless_addr.startswith(f"{scheme_str}://") + ), + None, + ) + + if matching_scheme is None: + # This an address that doesn't seem to have a scheme we understand. + supported_schemes = ", ".join( + f"`{rendered}`" for scheme in schemes for rendered in scheme.rendered_schemes() + ) + raise OptionsError( + softwrap( + f""" + {context_for_diagnostics} has invalid value `{addr}`: it does not have a + supported scheme. + + The value must start with one of: {supported_schemes} + """ + ) + ) + + scheme_str, scheme = matching_scheme + + if scheme.experimental and not addr_is_experimental: + # This is a URL like `some-scheme://` for a scheme that IS experimental, so let's tell + # the user they need to specify it as `experimental:some-scheme://`. 
+ raise OptionsError( + softwrap( + f""" + {context_for_diagnostics} has invalid value `{addr}`: the scheme `{scheme_str}` + is experimental and thus must include the `{_EXPERIMENTAL_SCHEME}` prefix to + opt-in to this less-stable Pants feature. + + Specify the value as `{_EXPERIMENTAL_SCHEME}{addr}`, with the + `{_EXPERIMENTAL_SCHEME}` prefix. + """ + ) + ) + + if not scheme.experimental and addr_is_experimental: + # This is a URL like `experimental:some-scheme://...` for a scheme that's NOT experimental, + # so let's tell the user to fix it up as `some-scheme://...`. It's low importance (we + # can unambigiously tell what they mean), so a warning is fine. + logger.warning( + softwrap( + f""" + {context_for_diagnostics} has value `{addr}` including `{_EXPERIMENTAL_SCHEME}` + prefix, but the scheme `{scheme_str}` is not experimental. + + Specify the value as `{experimentalless_addr}`, without the `{_EXPERIMENTAL_SCHEME}` + prefix. + """ + ) + ) + + if require_execution and not scheme.supports_execution: + # The address is being used for remote execution, but the scheme doesn't support it. + supported_execution_schemes = ", ".join( + f"`{rendered}`" + for scheme in schemes + if scheme.supports_execution + for rendered in scheme.rendered_schemes() + ) + raise OptionsError( + softwrap( + f""" + {context_for_diagnostics} has invalid value `{addr}`: the scheme `{scheme_str}` + does not support remote execution. + + Either remove the value (and disable remote execution), or use an address for a + server does support remote execution, starting with one of: + {supported_execution_schemes} """ + ) + ) + + # Validated, all good! 
+ + @staticmethod + def validate_address(addr: str, require_execution: bool, context_for_diagnostics: str) -> None: + _RemoteAddressScheme._validate_address( + _REMOTE_ADDRESS_SCHEMES, + addr=addr, + require_execution=require_execution, + context_for_diagnostics=context_for_diagnostics, + ) + + @staticmethod + def address_help(context: str, extra: str, requires_execution: bool) -> Callable[[object], str]: + def render_list_item(scheme_strs: tuple[str, ...], description: str) -> str: + schemes = ", ".join(f"`{s}`" for s in scheme_strs) + return f"- {schemes}: {description}" + + def renderer(_: object) -> str: + supported_schemes = [ + (scheme.rendered_schemes(), scheme.description) + for scheme in _REMOTE_ADDRESS_SCHEMES + if not requires_execution or (requires_execution and scheme.supports_execution) + ] + if requires_execution: + # If this is the help for remote execution, still include the schemes that don't + # support it, but mark them as such. + supported_schemes.append( + ( + tuple( + scheme_str + for scheme in _REMOTE_ADDRESS_SCHEMES + if not scheme.supports_execution + for scheme_str in scheme.rendered_schemes() + ), + "Remote execution is not supported.", + ) + ) + + schemes = "\n\n".join( + render_list_item(scheme_strs, description) + for scheme_strs, description in supported_schemes + ) + extra_inline = f"\n\n{extra}" if extra else "" + return softwrap( + f""" + The URI of a server/entity used as a {context}.{extra_inline} + + Supported schemes: + + {schemes} + """ + ) + + return renderer + + +# This duplicates logic/semantics around choosing a byte store/action cache (and, even, technically, +# remote execution) provider: it'd be nice to have it in one place, but huonw thinks we do the +# validation before starting the engine, and, in any case, we can refactor our way there (the remote +# providers aren't configured in one place yet) +_REMOTE_ADDRESS_SCHEMES = ( + _RemoteAddressScheme( + schemes=("grpc", "grpcs"), + supports_execution=True, + 
experimental=False, + description=softwrap( + """ + Use a [Remote Execution API](https://github.com/bazelbuild/remote-apis) remote + caching/execution server. `grpcs` uses TLS while `grpc` does not. Format: + `grpc[s]://$host:$port`. + """ + ), + ), + _RemoteAddressScheme( + schemes=("file",), + supports_execution=False, + experimental=True, + description=softwrap( + """ + Use a local directory as a 'remote' store, for testing, debugging, or potentially an NFS + mount. Format: `file://$path`. For example: `file:///tmp/remote-cache-example/` will + store within the `/tmp/remote-cache-example/` directory, creating it if necessary. + """ + ), + ), +) + @dataclass(frozen=True) class _GlobMatchErrorBehaviorOptionBase: @@ -149,10 +356,10 @@ class AuthPluginResult: the merge strategy if your plugin sets conflicting headers. Usually, you will want to preserve the `initial_store_headers` and `initial_execution_headers` passed to the plugin. - If set, the returned `instance_name` will override `[GLOBAL].remote_instance_name`, `store_address` - will override `[GLOBAL].remote_store_address`, and `execution_address` will override - ``[GLOBAL].remote_execution_address``. The store address and execution address must be prefixed with - `grpc://` or `grpcs://`. + If set, the returned `instance_name` will override `[GLOBAL].remote_instance_name`, + `store_address` will override `[GLOBAL].remote_store_address`, and `execution_address` will + override ``[GLOBAL].remote_execution_address``. The addresses are interpreted and validated in + the same manner as the corresponding option. 
""" state: AuthPluginState @@ -165,23 +372,21 @@ class AuthPluginResult: plugin_name: str | None = None def __post_init__(self) -> None: - def assert_valid_address(addr: str | None, field_name: str) -> None: - valid_schemes = [f"{scheme}://" for scheme in ("grpc", "grpcs")] - if addr and not any(addr.startswith(scheme) for scheme in valid_schemes): - name = self.plugin_name or "" - raise ValueError( - softwrap( - f""" - Invalid `{field_name}` in `AuthPluginResult` returned from - `[GLOBAL].remote_auth_plugin` {name}. - - Must start with `grpc://` or `grpcs://`, but was {addr}. - """ - ) - ) + name = self.plugin_name or "" + plugin_context = f"in `AuthPluginResult` returned from `[GLOBAL].remote_auth_plugin` {name}" - assert_valid_address(self.store_address, "store_address") - assert_valid_address(self.execution_address, "execution_address") + if self.store_address: + _RemoteAddressScheme.validate_address( + self.store_address, + require_execution=False, + context_for_diagnostics=f"`store_address` {plugin_context}", + ) + if self.execution_address: + _RemoteAddressScheme.validate_address( + self.execution_address, + require_execution=True, + context_for_diagnostics=f"`execution_address` {plugin_context}", + ) @property def is_available(self) -> bool: @@ -468,6 +673,7 @@ def _normalize_address(cls, address: str | None) -> str | None: # NB: Tonic expects the schemes `http` and `https`, even though they are gRPC requests. # We validate that users set `grpc` and `grpcs` in the options system / plugin for clarity, # but then normalize to `http`/`https`. + # TODO: move this logic into the actual remote providers return re.sub(r"^grpc", "http", address) if address else None @@ -1367,6 +1573,7 @@ class BootstrapOptions: """ ), ) + # TODO: update all these remote_... 
option helps for the new support for non-REAPI schemes remote_instance_name = StrOption( default=None, advanced=True, @@ -1413,13 +1620,10 @@ class BootstrapOptions: remote_store_address = StrOption( advanced=True, default=cast(str, DEFAULT_EXECUTION_OPTIONS.remote_store_address), - help=softwrap( - """ - The URI of a server used for the remote file store. - - Format: `scheme://host:port`. The supported schemes are `grpc` and `grpcs`, i.e. gRPC - with TLS enabled. If `grpc` is used, TLS will be disabled. - """ + help=_RemoteAddressScheme.address_help( + "remote file store", + extra="", + requires_execution=False, ), ) remote_store_headers = DictOption( @@ -1488,15 +1692,10 @@ class BootstrapOptions: remote_execution_address = StrOption( advanced=True, default=cast(str, DEFAULT_EXECUTION_OPTIONS.remote_execution_address), - help=softwrap( - """ - The URI of a server used as a remote execution scheduler. - - Format: `scheme://host:port`. The supported schemes are `grpc` and `grpcs`, i.e. gRPC - with TLS enabled. If `grpc` is used, TLS will be disabled. - - You must also set `[GLOBAL].remote_store_address`, which will often be the same value. - """ + help=_RemoteAddressScheme.address_help( + "remote execution scheduler", + extra="You must also set `[GLOBAL].remote_store_address`, which will often be the same value.", + requires_execution=True, ), ) remote_execution_headers = DictOption( @@ -1782,21 +1981,18 @@ def validate_instance(cls, opts): ) ) - def validate_remote_address(opt_name: str) -> None: - valid_schemes = [f"{scheme}://" for scheme in ("grpc", "grpcs")] - address = getattr(opts, opt_name) - if address and not any(address.startswith(scheme) for scheme in valid_schemes): - raise OptionsError( - softwrap( - f""" - The `{opt_name}` option must begin with one of {valid_schemes}, but - was {address}. 
- """ - ) - ) - - validate_remote_address("remote_execution_address") - validate_remote_address("remote_store_address") + if opts.remote_execution_address: + _RemoteAddressScheme.validate_address( + opts.remote_execution_address, + require_execution=True, + context_for_diagnostics="The `[GLOBAL].remote_execution_address` option", + ) + if opts.remote_store_address: + _RemoteAddressScheme.validate_address( + opts.remote_store_address, + require_execution=False, + context_for_diagnostics="The `[GLOBAL].remote_store_address` option", + ) # Ensure that remote headers are ASCII. def validate_remote_headers(opt_name: str) -> None: diff --git a/src/python/pants/option/global_options_test.py b/src/python/pants/option/global_options_test.py index 044d8b9a37a..e8d05fd0b68 100644 --- a/src/python/pants/option/global_options_test.py +++ b/src/python/pants/option/global_options_test.py @@ -4,6 +4,7 @@ from __future__ import annotations import sys +from collections import Counter from pathlib import Path from textwrap import dedent @@ -14,7 +15,13 @@ from pants.engine.internals.scheduler import ExecutionError from pants.engine.unions import UnionMembership from pants.init.options_initializer import OptionsInitializer -from pants.option.global_options import DynamicRemoteOptions, GlobalOptions +from pants.option.errors import OptionsError +from pants.option.global_options import ( + _REMOTE_ADDRESS_SCHEMES, + DynamicRemoteOptions, + GlobalOptions, + _RemoteAddressScheme, +) from pants.option.options_bootstrapper import OptionsBootstrapper from pants.testutil import rule_runner from pants.testutil.option_util import create_options_bootstrapper @@ -154,3 +161,116 @@ def test_invalidation_globs() -> None: ) for glob in globs: assert suffix not in glob + + +def _scheme( + schemes: tuple[str, ...] 
= ("foo",), + supports_execution: bool = False, + experimental: bool = False, +) -> _RemoteAddressScheme: + return _RemoteAddressScheme( + schemes=schemes, + supports_execution=supports_execution, + experimental=experimental, + description="DESCRIPTION", + ) + + +@pytest.mark.parametrize( + "address", + [ + "experimental:foo://", + "experimental:foo://host:123", + "experimental:foos://path/here", + "bar://", + "bar://user@host:123/path?query#fragment", + ], +) +@pytest.mark.parametrize("execution", [False, True]) +def test_remote_schemes_validate_address_should_pass_for_various_good_addresses_without_execution( + address: str, execution: bool +) -> None: + _RemoteAddressScheme._validate_address( + ( + _scheme(schemes=("foo", "foos"), experimental=True, supports_execution=execution), + # (smoke test require_execution=False supports_execution=True) + _scheme(schemes=("bar",), supports_execution=True), + ), + address, + require_execution=execution, + context_for_diagnostics="CONTEXT", + ) + + +@pytest.mark.parametrize( + "address", + ["", "foo", "foo:", "foo:/", "FOO://", "foo:bar://", "fooextra://", "baz://", "bars://"], +) +def test_remote_schemes_validate_address_should_error_when_bad_address(address: str) -> None: + with pytest.raises( + OptionsError, + match=f"(?s)CONTEXT has invalid value `{address}`: it does not have a supported scheme.*start with one of: `foo://`, `foos://`, `bar://`", + ): + _RemoteAddressScheme._validate_address( + ( + _scheme(schemes=("foo", "foos")), + _scheme(schemes=("bar",)), + ), + address, + require_execution=False, + context_for_diagnostics="CONTEXT", + ) + + +def test_remote_schemes_validate_address_should_error_when_missing_experimental() -> None: + with pytest.raises( + OptionsError, + match="(?s)CONTEXT has invalid value `foo://bar`: the scheme `foo` is experimental.*Specify the value as `experimental:foo://bar`", + ): + _RemoteAddressScheme._validate_address( + (_scheme(experimental=True),), + "foo://bar", + 
require_execution=False, + context_for_diagnostics="CONTEXT", + ) + + +def test_remote_schemes_validate_address_should_warn_when_unnecessary_experimental(caplog) -> None: + with caplog.at_level("WARNING"): + _RemoteAddressScheme._validate_address( + (_scheme(experimental=False),), + "experimental:foo://bar", + require_execution=False, + context_for_diagnostics="CONTEXT", + ) + + assert "CONTEXT has value `experimental:foo://bar`" in caplog.text + assert "the scheme `foo` is not experimental" in caplog.text + assert "Specify the value as `foo://bar`" in caplog.text + + +def test_remote_schemes_validate_address_should_error_when_execution_required_but_not_supported() -> ( + None +): + with pytest.raises( + OptionsError, + match="(?s)CONTEXT has invalid value `foo://bar`: the scheme `foo` does not support remote execution.*starting with one of: `bar://`", + ): + _RemoteAddressScheme._validate_address( + ( + _scheme(supports_execution=False), + _scheme(schemes=("bar",), supports_execution=True), + ), + "foo://bar", + require_execution=True, + context_for_diagnostics="CONTEXT", + ) + + +def test_remote_schemes_should_have_unique_schemes(): + # the raw schemes supported for remoting (not with experimental: prefix, etc.) 
should be unique, + # so there's no accidental ambiguity about, for instance, `http://` configured more than once + counts = Counter( + scheme_str for scheme in _REMOTE_ADDRESS_SCHEMES for scheme_str in scheme.schemes + ) + assert [scheme for scheme, count in counts.items() if count > 1] == [] diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 7e3d5c9c785..10bcb08d913 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -50,6 +50,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -92,6 +98,19 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "async-compat" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b48b4ff0c2026db683dea961cd8ea874737f56cffca86fa84415eaddc51c00d" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-lock" version = "2.5.0" @@ -244,6 +263,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backon" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +dependencies = [ + "fastrand", + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -394,9 +425,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.2.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "cache" @@ -467,13 +498,13 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.24" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b" +checksum = "ec837a71355b28f6556dbd569b37b3f363091c0bd4b2e735674521b4c5fd9bc5" dependencies = [ + "android-tzdata", "iana-time-zone", "js-sys", - "num-integer", "num-traits", "serde", "time 0.1.45", @@ -1083,9 +1114,9 @@ checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" [[package]] name = "fastrand" -version = "1.6.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "779d043b6a0b90cc4c0ed7ee380a6504394cee7efd7db050e3774eee387324b2" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" dependencies = [ "instant", ] @@ -1108,6 +1139,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flagset" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499" + [[package]] name = "flate2" version = "1.0.19" @@ -1251,9 +1288,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.25" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" @@ -1955,6 +1992,15 @@ version = "0.7.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" +[[package]] +name = "md-5" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +dependencies = [ + "digest", +] + [[package]] name = "memchr" version = "2.4.1" @@ -2128,16 +2174,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "num-integer" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" -dependencies = [ - "autocfg", - "num-traits", -] - [[package]] name = "num-traits" version = "0.2.15" @@ -2211,6 +2247,37 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "opendal" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad95e460e5976ab1b74f398ab856c59f8417b3dd32202329e3491dcbe3a6b84" +dependencies = [ + "anyhow", + "async-compat", + "async-trait", + "backon", + "base64 0.21.0", + "bytes", + "chrono", + "flagset", + "futures", + "http", + "hyper", + "log", + "md-5", + "once_cell", + "parking_lot 0.12.1", + "percent-encoding", + "pin-project", + "quick-xml", + "reqwest", + "serde", + "serde_json", + "tokio", + "uuid", +] + [[package]] name = "openssl-probe" version = "0.1.2" @@ -2742,6 +2809,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "quick-xml" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81b9228215d82c7b61490fec1de287136b5de6f5700f6e58ea9ad61a7964ca51" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.32" @@ -2898,6 +2975,7 @@ dependencies = [ "maplit", "mock", "once_cell", 
+ "opendal", "parking_lot 0.12.1", "process_execution", "prost", @@ -3368,6 +3446,7 @@ dependencies = [ "madvise", "mock", "num_cpus", + "opendal", "parking_lot 0.12.1", "prost", "prost-types", @@ -4020,6 +4099,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" dependencies = [ "getrandom 0.2.8", + "serde", ] [[package]] diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 223117594ae..8554952c9f6 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -254,6 +254,7 @@ notify = { git = "https://github.com/pantsbuild/notify", rev = "276af0f3c5f300bf num_cpus = "1" num_enum = "0.5" once_cell = "1.18" +opendal = { version = "0.39.0", default-features = false } os_pipe = "1.1" parking_lot = "0.12" peg = "0.8" diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index a0ae34a9358..4396bf8809c 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -41,6 +41,10 @@ tower-service = { workspace = true } tryfuture = { path = "../../tryfuture" } uuid = { workspace = true, features = ["v4"] } workunit_store = { path = "../../workunit_store" } +opendal = { workspace = true, default-features = false, features = [ + "services-memory", + "services-fs", +] } [dev-dependencies] criterion = { workspace = true } diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 51a3f5cadb2..54d8a9b0237 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -83,7 +83,7 @@ mod local; #[cfg(test)] pub mod local_tests; -mod remote; +pub mod remote; pub use remote::RemoteOptions; #[cfg(test)] mod remote_tests; diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index ce0dc5882e3..4e9c4be7da9 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ 
b/src/rust/engine/fs/store/src/remote.rs @@ -21,6 +21,10 @@ mod reapi; #[cfg(test)] mod reapi_tests; +pub mod base_opendal; +#[cfg(test)] +mod base_opendal_tests; + #[async_trait] pub trait ByteStoreProvider: Sync + Send + 'static { /// Store the bytes readable from `file` into the remote store @@ -48,6 +52,8 @@ pub trait ByteStoreProvider: Sync + Send + 'static { // TODO: Consider providing `impl Default`, similar to `super::LocalOptions`. #[derive(Clone)] pub struct RemoteOptions { + // TODO: this is currently framed for the REAPI provider, with some options used by others, would + // be good to generalise pub cas_address: String, pub instance_name: Option, pub headers: BTreeMap, @@ -60,6 +66,28 @@ pub struct RemoteOptions { pub batch_api_size_limit: usize, } +// TODO: this is probably better positioned somewhere else +pub const REAPI_ADDRESS_SCHEMAS: [&str; 3] = ["grpc://", "grpcs://", "http://"]; + +async fn choose_provider(options: RemoteOptions) -> Result, String> { + let address = options.cas_address.clone(); + if REAPI_ADDRESS_SCHEMAS.iter().any(|s| address.starts_with(s)) { + Ok(Arc::new(reapi::Provider::new(options).await?)) + } else if let Some(path) = address.strip_prefix("file://") { + // It's a bit weird to support local "file://" for a 'remote' store... but this is handy for + // testing. 
+ Ok(Arc::new(base_opendal::Provider::fs( + path, + "byte-store".to_owned(), + options, + )?)) + } else { + Err(format!( + "Cannot initialise remote byte store provider with address {address}, as the scheme is not supported", + )) + } +} + #[derive(Clone)] pub struct ByteStore { instance_name: Option, @@ -108,7 +136,7 @@ impl ByteStore { pub async fn from_options(options: RemoteOptions) -> Result { let instance_name = options.instance_name.clone(); - let provider = Arc::new(reapi::Provider::new(options).await?); + let provider = choose_provider(options).await?; Ok(ByteStore::new(instance_name, provider)) } diff --git a/src/rust/engine/fs/store/src/remote/base_opendal.rs b/src/rust/engine/fs/store/src/remote/base_opendal.rs new file mode 100755 index 00000000000..0582466f02c --- /dev/null +++ b/src/rust/engine/fs/store/src/remote/base_opendal.rs @@ -0,0 +1,231 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). 
+#![allow(dead_code)] + +use std::collections::HashSet; +use std::time::Instant; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::future; +use hashing::{async_verified_copy, Digest, Fingerprint, EMPTY_DIGEST}; +use opendal::layers::{ConcurrentLimitLayer, RetryLayer, TimeoutLayer}; +use opendal::{Builder, Operator}; +use tokio::fs::File; +use workunit_store::ObservationMetric; + +use super::{ByteStoreProvider, LoadDestination, RemoteOptions}; + +#[derive(Debug, Clone, Copy)] +pub enum LoadMode { + Validate, + NoValidate, +} + +pub struct Provider { + /// This is public for easier testing of the action cache provider + // TODO: move all the providers into a single crate so that the pub isn't necessary + pub operator: Operator, + base_path: String, +} + +impl Provider { + pub fn new( + builder: B, + scope: String, + options: RemoteOptions, + ) -> Result { + let operator = Operator::new(builder) + .map_err(|e| { + format!( + "failed to initialise {} remote store provider: {e}", + B::SCHEME + ) + })? + .layer(ConcurrentLimitLayer::new(options.rpc_concurrency_limit)) + .layer( + // TODO: record Metric::RemoteStoreRequestTimeouts for timeouts + TimeoutLayer::new() + .with_timeout(options.rpc_timeout) + // TimeoutLayer requires specifying a non-zero minimum transfer speed too. 
+ .with_speed(1), + ) + // TODO: RetryLayer doesn't seem to retry stores, but we should + .layer(RetryLayer::new().with_max_times(options.rpc_retries + 1)) + .finish(); + + let base_path = match options.instance_name { + Some(instance_name) => format!("{instance_name}/{scope}"), + None => scope, + }; + + Ok(Provider { + operator, + base_path, + }) + } + + pub fn fs(path: &str, scope: String, options: RemoteOptions) -> Result { + let mut builder = opendal::services::Fs::default(); + builder.root(path).enable_path_check(); + Provider::new(builder, scope, options) + } + + fn path(&self, fingerprint: Fingerprint) -> String { + // We include the first two bytes as parent directories to make listings less wide. + format!( + "{}/{:02x}/{:02x}/{}", + self.base_path, fingerprint.0[0], fingerprint.0[1], fingerprint + ) + } + + async fn load_raw( + &self, + digest: Digest, + destination: &mut dyn LoadDestination, + mode: LoadMode, + ) -> Result { + // Some providers (e.g. GitHub Actions Cache) don't like storing an empty file, so we just magic + // it up here, and ignore it when storing. + if digest == EMPTY_DIGEST { + // `destination` starts off empty, so is already in the right state. + return Ok(true); + } + + let path = self.path(digest.hash); + let start = Instant::now(); + let mut reader = match self.operator.reader(&path).await { + Ok(reader) => reader, + Err(e) if e.kind() == opendal::ErrorKind::NotFound => return Ok(false), + Err(e) => return Err(format!("failed to read {}: {}", path, e)), + }; + + if let Some(workunit_store_handle) = workunit_store::get_workunit_store_handle() { + // TODO: this pretends that the time-to-first-byte can be approximated by "time to create + // reader", which is often not really true. 
+ let timing: Result = Instant::now().duration_since(start).as_micros().try_into(); + if let Ok(obs) = timing { + workunit_store_handle + .store + .record_observation(ObservationMetric::RemoteStoreTimeToFirstByteMicros, obs); + } + } + + match mode { + LoadMode::Validate => { + let correct_digest = async_verified_copy(digest, false, &mut reader, destination) + .await + .map_err(|e| format!("failed to read {}: {}", path, e))?; + + if !correct_digest { + // TODO: include the actual digest here + return Err(format!("Remote CAS gave wrong digest: expected {digest:?}")); + } + } + LoadMode::NoValidate => { + tokio::io::copy(&mut reader, destination) + .await + .map_err(|e| format!("failed to read {}: {}", path, e))?; + } + } + Ok(true) + } + + /// Load `digest` trusting the contents from the remote, without validating that the digest + /// matches the downloaded bytes. + /// + /// This can/should be used for cases where the digest isn't the digest of the contents + /// (e.g. action cache). + pub async fn load_without_validation( + &self, + digest: Digest, + destination: &mut dyn LoadDestination, + ) -> Result { + self + .load_raw(digest, destination, LoadMode::NoValidate) + .await + } +} + +#[async_trait] +impl ByteStoreProvider for Provider { + async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String> { + // Some providers (e.g. GitHub Actions Cache) don't like storing an empty file, so we don't + // store it here, and magic it up when loading. + if digest == EMPTY_DIGEST { + return Ok(()); + } + + let path = self.path(digest.hash); + + self + .operator + .write(&path, bytes) + .await + .map_err(|e| format!("failed to write bytes to {path}: {e}")) + } + + async fn store_file(&self, digest: Digest, mut file: File) -> Result<(), String> { + // Some providers (e.g. GitHub Actions Cache) don't like storing an empty file, so we don't + // store it here, and magic it up when loading. 
+ if digest == EMPTY_DIGEST { + return Ok(()); + } + + let path = self.path(digest.hash); + + let mut writer = self + .operator + .writer_with(&path) + .content_length(digest.size_bytes as u64) + .await + .map_err(|e| format!("failed to start write to {path}: {e}"))?; + + // TODO: it would be good to pass through options.chunk_size_bytes here + match tokio::io::copy(&mut file, &mut writer).await { + Ok(_) => writer.close().await.map_err(|e| { + format!("Uploading file with digest {digest:?} to {path}: failed to commit: {e}") + }), + Err(e) => { + let abort_err = writer.abort().await.err().map_or("".to_owned(), |e| { + format!(" (additional error while aborting = {e})") + }); + Err(format!( + "Uploading file with digest {digest:?} to {path}: failed to copy: {e}{abort_err}" + )) + } + } + } + + async fn load( + &self, + digest: Digest, + destination: &mut dyn LoadDestination, + ) -> Result { + self.load_raw(digest, destination, LoadMode::Validate).await + } + + async fn list_missing_digests( + &self, + digests: &mut (dyn Iterator + Send), + ) -> Result, String> { + // NB. this is doing individual requests and thus may be expensive. + let existences = future::try_join_all(digests.map(|digest| async move { + // Some providers (e.g. GitHub Actions Cache) don't like storing an empty file, so we don't + // store it, but can still magic it up when loading, i.e. it is never missing. 
+ if digest == EMPTY_DIGEST { + return Ok(None); + } + + let path = self.path(digest.hash); + match self.operator.is_exist(&path).await { + Ok(true) => Ok(None), + Ok(false) => Ok(Some(digest)), + Err(e) => Err(format!("failed to query {}: {}", path, e)), + } + })) + .await?; + + Ok(existences.into_iter().flatten().collect()) + } +} diff --git a/src/rust/engine/fs/store/src/remote/base_opendal_tests.rs b/src/rust/engine/fs/store/src/remote/base_opendal_tests.rs new file mode 100644 index 00000000000..d24e810fd4e --- /dev/null +++ b/src/rust/engine/fs/store/src/remote/base_opendal_tests.rs @@ -0,0 +1,296 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). +use std::collections::{BTreeMap, HashSet}; +use std::time::Duration; + +use bytes::Bytes; +use grpc_util::tls; +use hashing::{Digest, Fingerprint}; +use opendal::services::Memory; +use testutil::data::TestData; + +use crate::tests::{big_file_bytes, big_file_fingerprint, mk_tempfile}; + +use super::base_opendal::Provider; +use super::{ByteStoreProvider, RemoteOptions}; + +const BASE: &str = "opendal-testing-base"; + +fn test_path_fingerprint(fingerprint: Fingerprint) -> String { + let fingerprint = fingerprint.to_string(); + format!( + "{}/{}/{}/{}", + BASE, + &fingerprint[0..2], + &fingerprint[2..4], + fingerprint + ) +} +fn test_path(data: &TestData) -> String { + test_path_fingerprint(data.fingerprint()) +} +fn remote_options() -> RemoteOptions { + RemoteOptions { + cas_address: "".to_owned(), + instance_name: None, + tls_config: tls::Config::default(), + headers: BTreeMap::new(), + chunk_size_bytes: 10000, + rpc_timeout: Duration::from_secs(5), + rpc_retries: 1, + rpc_concurrency_limit: 256, + capabilities_cell_opt: None, + batch_api_size_limit: 10000, + } +} + +fn new_provider() -> Provider { + Provider::new(Memory::default(), BASE.to_owned(), remote_options()).unwrap() +} + +async fn write_test_data(provider: &Provider, 
data: &TestData) {
+  provider
+    .operator
+    .write(&test_path(&data), data.bytes())
+    .await
+    .unwrap();
+}
+
+#[tokio::test]
+async fn load_existing() {
+  let testdata = TestData::roland();
+  let provider = new_provider();
+  write_test_data(&provider, &testdata).await;
+
+  let mut destination = Vec::new();
+  let found = provider
+    .load(testdata.digest(), &mut destination)
+    .await
+    .unwrap();
+  assert!(found);
+  assert_eq!(destination, testdata.bytes())
+}
+
+#[tokio::test]
+async fn load_missing() {
+  let testdata = TestData::roland();
+  let provider = new_provider();
+
+  let mut destination = Vec::new();
+  let found = provider
+    .load(testdata.digest(), &mut destination)
+    .await
+    .unwrap();
+  assert!(!found);
+  assert!(destination.is_empty())
+}
+
+#[tokio::test]
+async fn load_empty() {
+  // The empty file can be loaded even when it's not "physically" in the remote provider.
+  let testdata = TestData::empty();
+  let provider = new_provider();
+
+  let mut destination = Vec::new();
+  let found = provider
+    .load(testdata.digest(), &mut destination)
+    .await
+    .unwrap();
+  assert!(found);
+  assert_eq!(destination, testdata.bytes());
+}
+
+#[tokio::test]
+async fn load_existing_wrong_digest_error() {
+  let testdata = TestData::roland();
+  let provider = new_provider();
+  provider
+    .operator
+    .write(&test_path(&testdata), Bytes::from_static(b"not roland"))
+    .await
+    .unwrap();
+
+  let mut destination = Vec::new();
+  let error = provider
+    .load(testdata.digest(), &mut destination)
+    .await
+    .expect_err("Want error");
+
+  assert!(
+    error.contains("Remote CAS gave wrong digest"),
+    "Bad error message, got: {error}"
+  )
+}
+
+#[tokio::test]
+async fn load_without_validation_existing() {
+  let testdata = TestData::roland();
+  let bytes = Bytes::from_static(b"not roland");
+  let provider = new_provider();
+  provider
+    .operator
+    .write(&test_path(&testdata), bytes.clone())
+    .await
+    .unwrap();
+
+  let mut destination = Vec::new();
+  let found = provider
+    
.load_without_validation(testdata.digest(), &mut destination) + .await + .unwrap(); + assert!(found); + assert_eq!(destination, bytes) +} + +#[tokio::test] +async fn load_without_validation_missing() { + let testdata = TestData::roland(); + let provider = new_provider(); + + let mut destination = Vec::new(); + let found = provider + .load_without_validation(testdata.digest(), &mut destination) + .await + .unwrap(); + assert!(!found); + assert!(destination.is_empty()) +} + +async fn assert_store(provider: &Provider, fingerprint: Fingerprint, bytes: Bytes) { + let result = provider + .operator + .read(&test_path_fingerprint(fingerprint)) + .await + .unwrap(); + assert_eq!(result, bytes); +} + +#[tokio::test] +async fn store_bytes_data() { + let testdata = TestData::roland(); + let provider = new_provider(); + + provider + .store_bytes(testdata.digest(), testdata.bytes()) + .await + .unwrap(); + + assert_store(&provider, testdata.fingerprint(), testdata.bytes()).await; +} + +#[tokio::test] +async fn store_bytes_empty() { + let testdata = TestData::empty(); + let provider = new_provider(); + + provider + .store_bytes(testdata.digest(), testdata.bytes()) + .await + .unwrap(); + + // We don't actually store an empty file. + assert!(!provider + .operator + .is_exist(&test_path(&testdata)) + .await + .unwrap()); +} + +#[tokio::test] +async fn store_file_one_chunk() { + let testdata = TestData::roland(); + let provider = new_provider(); + + provider + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) + .await + .unwrap(); + assert_store(&provider, testdata.fingerprint(), testdata.bytes()).await; +} + +#[tokio::test] +async fn store_file_multiple_chunks() { + let provider = new_provider(); + + let all_the_henries = big_file_bytes(); + // Our current chunk size is the tokio::io::copy default (8KiB at + // the time of writing). 
+ assert!(all_the_henries.len() > 8 * 1024); + let fingerprint = big_file_fingerprint(); + let digest = Digest::new(fingerprint, all_the_henries.len()); + + provider + .store_file(digest, mk_tempfile(Some(&all_the_henries)).await) + .await + .unwrap(); + assert_store(&provider, fingerprint, all_the_henries).await; +} + +#[tokio::test] +async fn store_file_empty_file() { + let testdata = TestData::empty(); + let provider = new_provider(); + + provider + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) + .await + .unwrap(); + + // We don't actually store an empty file. + assert!(!provider + .operator + .is_exist(&test_path(&testdata)) + .await + .unwrap()); +} + +#[tokio::test] +async fn list_missing_digests_none_missing() { + let testdata = TestData::roland(); + let provider = new_provider(); + write_test_data(&provider, &testdata).await; + + assert_eq!( + provider + .list_missing_digests(&mut vec![testdata.digest()].into_iter()) + .await, + Ok(HashSet::new()) + ) +} + +#[tokio::test] +async fn list_missing_digests_some_missing() { + let testdata = TestData::roland(); + let digest = testdata.digest(); + + let provider = new_provider(); + + let mut digest_set = HashSet::new(); + digest_set.insert(digest); + + assert_eq!( + provider + .list_missing_digests(&mut vec![digest].into_iter()) + .await, + Ok(digest_set) + ) +} + +#[tokio::test] +async fn list_missing_digests_empty_never_missing() { + let testdata = TestData::empty(); + let provider = new_provider(); + + assert_eq!( + provider + .list_missing_digests(&mut vec![testdata.digest()].into_iter()) + .await, + Ok(HashSet::new()) + ) +} diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 34d6d07371e..73532f8ae81 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use grpc_util::tls; use hashing::{Digest, Fingerprint}; use 
parking_lot::Mutex; +use tempfile::TempDir; use testutil::data::TestData; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; @@ -19,9 +20,8 @@ use crate::MEGABYTES; #[tokio::test] async fn smoke_test_from_options_reapi_provider() { - // run through the various methods using a 'real' provider (REAPI - // talking to a stubbed CAS), as a double-check that the test - // provider is plausible + // This runs through the various methods using the 'real' REAPI provider (talking to a stubbed + // CAS), as a double-check that the test provider is plausible and test provider selection works. let roland = TestData::roland(); let empty = TestData::empty(); @@ -45,7 +45,7 @@ async fn smoke_test_from_options_reapi_provider() { let mut missing_set = HashSet::new(); missing_set.insert(empty.digest()); - // only roland is in the CAS: + // Only roland is in the CAS: assert_eq!( store.load_bytes(roland.digest()).await, Ok(Some(roland.bytes())) @@ -58,7 +58,7 @@ async fn smoke_test_from_options_reapi_provider() { Ok(missing_set) ); - // insert empty: + // Insert empty: assert_eq!(store.store_bytes(empty.bytes()).await, Ok(())); assert_eq!( store.load_bytes(empty.digest()).await, @@ -66,6 +66,57 @@ async fn smoke_test_from_options_reapi_provider() { ); } +#[tokio::test] +async fn smoke_test_from_options_file_provider() { + // This runs through the various methods using the file:// provider, as a double-check that the + // test provider is plausible and test provider selection works. 
+ let roland = TestData::roland(); + let catnip = TestData::catnip(); + + let _ = WorkunitStore::setup_for_tests(); + let dir = TempDir::new().unwrap(); + + let store = ByteStore::from_options(RemoteOptions { + cas_address: format!("file://{}", dir.path().display()), + instance_name: None, + tls_config: tls::Config::default(), + headers: BTreeMap::new(), + chunk_size_bytes: 10 * MEGABYTES, + rpc_timeout: Duration::from_secs(5), + rpc_retries: 1, + rpc_concurrency_limit: 256, + capabilities_cell_opt: None, + batch_api_size_limit: crate::tests::STORE_BATCH_API_SIZE_LIMIT, + }) + .await + .unwrap(); + + let mut missing_set = HashSet::new(); + missing_set.insert(catnip.digest()); + + // Insert roland: + assert_eq!(store.store_bytes(roland.bytes()).await, Ok(())); + assert_eq!( + store.load_bytes(roland.digest()).await, + Ok(Some(roland.bytes())) + ); + // Only roland is stored: + assert_eq!(store.load_bytes(catnip.digest()).await, Ok(None)); + assert_eq!( + store + .list_missing_digests(vec![roland.digest(), catnip.digest()]) + .await, + Ok(missing_set) + ); + + // Insert catnip: + assert_eq!(store.store_bytes(catnip.bytes()).await, Ok(())); + assert_eq!( + store.load_bytes(catnip.digest()).await, + Ok(Some(catnip.bytes())) + ); +} + #[tokio::test] async fn load_bytes_existing() { let _ = WorkunitStore::setup_for_tests(); diff --git a/src/rust/engine/process_execution/remote/Cargo.toml b/src/rust/engine/process_execution/remote/Cargo.toml index 5abeb0e738d..ae1a87e95e7 100644 --- a/src/rust/engine/process_execution/remote/Cargo.toml +++ b/src/rust/engine/process_execution/remote/Cargo.toml @@ -36,6 +36,7 @@ process_execution = { path = ".." 
} strum = { workspace = true } strum_macros = { workspace = true } parking_lot = { workspace = true } +opendal = { workspace = true } [dev-dependencies] env_logger = { workspace = true } diff --git a/src/rust/engine/process_execution/remote/src/remote_cache.rs b/src/rust/engine/process_execution/remote/src/remote_cache.rs index 5b29e5b6fe7..a5c0f9429fe 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache.rs +++ b/src/rust/engine/process_execution/remote/src/remote_cache.rs @@ -9,12 +9,14 @@ use async_trait::async_trait; use fs::{directory, DigestTrie, RelativePath, SymlinkBehavior}; use futures::future::{BoxFuture, TryFutureExt}; use futures::FutureExt; +use grpc_util::tls; use hashing::Digest; use parking_lot::Mutex; use protos::gen::build::bazel::remote::execution::v2 as remexec; use protos::require_digest; use remexec::{ActionResult, Command, Tree}; -use store::{Store, StoreError}; +use store::remote::REAPI_ADDRESS_SCHEMAS; +use store::{RemoteOptions, Store, StoreError}; use workunit_store::{ in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata, }; @@ -26,6 +28,9 @@ use process_execution::{ }; use process_execution::{make_execute_request, EntireExecuteRequest}; +mod base_opendal; +#[cfg(test)] +mod base_opendal_tests; mod reapi; #[cfg(test)] mod reapi_tests; @@ -57,6 +62,10 @@ pub trait ActionCacheProvider: Sync + Send + 'static { #[derive(Clone)] pub struct RemoteCacheProviderOptions { + // TODO: this is currently framed for the REAPI provider, with some options used by others, would + // be good to generalise + // TODO: this is structurally very similar to `RemoteOptions`: maybe they should be the same? 
(see + // comment in `choose_provider` too) pub instance_name: Option, pub action_cache_address: String, pub root_ca_certs: Option>, @@ -65,6 +74,44 @@ pub struct RemoteCacheProviderOptions { pub rpc_timeout: Duration, } +async fn choose_provider( + options: RemoteCacheProviderOptions, +) -> Result, String> { + let address = options.action_cache_address.clone(); + + // TODO: we shouldn't need to gin up a whole copy of this struct; it'd be better to have the two + // set of remoting options managed together. + let remote_options = RemoteOptions { + cas_address: address.clone(), + instance_name: options.instance_name.clone(), + headers: options.headers.clone(), + tls_config: tls::Config::new_without_mtls(options.root_ca_certs.clone()), + rpc_timeout: options.rpc_timeout, + rpc_concurrency_limit: options.concurrency_limit, + // TODO: these should either be passed through or not synthesized here + chunk_size_bytes: 0, + rpc_retries: 0, + capabilities_cell_opt: None, + batch_api_size_limit: 0, + }; + + if REAPI_ADDRESS_SCHEMAS.iter().any(|s| address.starts_with(s)) { + Ok(Arc::new(reapi::Provider::new(options).await?)) + } else if let Some(path) = address.strip_prefix("file://") { + // It's a bit weird to support local "file://" for a 'remote' store... but this is handy for + // testing. 
+ Ok(Arc::new(base_opendal::Provider::fs( + path, + "action-cache".to_owned(), + remote_options, + )?)) + } else { + Err(format!( + "Cannot initialise remote action cache provider with address {address}, as the scheme is not supported", + )) + } +} + pub struct RemoteCacheRunnerOptions { pub inner: Arc, pub instance_name: Option, @@ -140,8 +187,7 @@ impl CommandRunner { runner_options: RemoteCacheRunnerOptions, provider_options: RemoteCacheProviderOptions, ) -> Result { - let provider = Arc::new(reapi::Provider::new(provider_options).await?); - + let provider = choose_provider(provider_options).await?; Ok(Self::new(runner_options, provider)) } diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal.rs b/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal.rs new file mode 100644 index 00000000000..52c9660b150 --- /dev/null +++ b/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal.rs @@ -0,0 +1,48 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). 
+#![allow(dead_code)] + +use async_trait::async_trait; +use bytes::Bytes; +use grpc_util::prost::MessageExt; +use hashing::Digest; +use prost::Message; +use protos::gen::build::bazel::remote::execution::v2 as remexec; +use remexec::ActionResult; + +use super::ActionCacheProvider; +use process_execution::Context; + +pub use store::remote::{base_opendal::Provider, ByteStoreProvider}; + +#[async_trait] +impl ActionCacheProvider for Provider { + async fn update_action_result( + &self, + action_digest: Digest, + action_result: ActionResult, + ) -> Result<(), String> { + let bytes = action_result.to_bytes(); + self.store_bytes(action_digest, bytes).await + } + async fn get_action_result( + &self, + action_digest: Digest, + _context: &Context, + ) -> Result, String> { + let mut destination = Vec::new(); + + match self + .load_without_validation(action_digest, &mut destination) + .await? + { + false => Ok(None), + true => { + let bytes = Bytes::from(destination); + Ok(Some(ActionResult::decode(bytes).map_err(|e| { + format!("failed to decode action result for digest {action_digest:?}: {e}") + })?)) + } + } + } +} diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal_tests.rs b/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal_tests.rs new file mode 100644 index 00000000000..38edeed3ac8 --- /dev/null +++ b/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal_tests.rs @@ -0,0 +1,117 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). 
+ +use std::collections::BTreeMap; +use std::time::Duration; + +use bytes::Bytes; +use grpc_util::prost::MessageExt; +use grpc_util::tls; +use hashing::Digest; +use opendal::services::Memory; +use process_execution::Context; +use prost::Message; +use protos::gen::build::bazel::remote::execution::v2 as remexec; +use store::remote::RemoteOptions; + +use super::base_opendal::Provider; +use super::ActionCacheProvider; + +const BASE: &str = "opendal-testing-base"; + +fn test_path(digest: Digest) -> String { + let fingerprint = digest.hash.to_string(); + format!( + "{}/{}/{}/{}", + BASE, + &fingerprint[0..2], + &fingerprint[2..4], + fingerprint + ) +} + +fn remote_options() -> RemoteOptions { + RemoteOptions { + cas_address: "".to_owned(), + instance_name: None, + tls_config: tls::Config::default(), + headers: BTreeMap::new(), + chunk_size_bytes: 10000, + rpc_timeout: Duration::from_secs(5), + rpc_retries: 1, + rpc_concurrency_limit: 256, + capabilities_cell_opt: None, + batch_api_size_limit: 10000, + } +} + +fn new_provider() -> Provider { + Provider::new(Memory::default(), BASE.to_owned(), remote_options()).unwrap() +} + +async fn write_test_data(provider: &Provider, digest: Digest, data: remexec::ActionResult) { + provider + .operator + .write(&test_path(digest), data.to_bytes()) + .await + .unwrap() +} + +#[tokio::test] +async fn get_action_result_existing() { + let provider = new_provider(); + + let action_digest = Digest::of_bytes(b"get_action_cache test"); + let action_result = remexec::ActionResult { + exit_code: 123, + ..Default::default() + }; + write_test_data(&provider, action_digest, action_result.clone()).await; + + assert_eq!( + provider + .get_action_result(action_digest, &Context::default()) + .await, + Ok(Some(action_result)) + ); +} + +#[tokio::test] +async fn get_action_result_missing() { + let provider = new_provider(); + + let action_digest = Digest::of_bytes(b"update_action_cache test"); + + assert_eq!( + provider + 
.get_action_result(action_digest, &Context::default()) + .await, + Ok(None) + ); +} + +#[tokio::test] +async fn update_action_cache() { + let provider = new_provider(); + + let action_digest = Digest::of_bytes(b"update_action_cache test"); + let action_result = remexec::ActionResult { + exit_code: 123, + ..Default::default() + }; + + provider + .update_action_result(action_digest, action_result.clone()) + .await + .unwrap(); + + let stored = provider + .operator + .read(&test_path(action_digest)) + .await + .unwrap(); + assert_eq!( + remexec::ActionResult::decode(Bytes::from(stored)).unwrap(), + action_result + ); +}