diff --git a/src/rez/cli/env.py b/src/rez/cli/env.py index 986c8e86b..9d8eb8aef 100644 --- a/src/rez/cli/env.py +++ b/src/rez/cli/env.py @@ -119,6 +119,11 @@ def setup_parser(parser, completions=False): parser.add_argument( "--no-pkg-cache", action="store_true", help="Disable package caching") + parser.add_argument( + "--pkg-cache-mode", choices=["sync", "async"], + help="If provided, override the rezconfig's package_cache_async key. " + "If 'sync', the process will block until packages are cached. " + "If 'async', the process will not block while packages are cached.") parser.add_argument( "--pre-command", type=str, help=SUPPRESS) PKG_action = parser.add_argument( @@ -201,6 +206,13 @@ def command(opts, parser, extra_arg_groups=None): rule = Rule.parse_rule(rule_str) package_filter.add_inclusion(rule) + if opts.pkg_cache_mode == "async": + package_cache_mode = True + elif opts.pkg_cache_mode == "sync": + package_cache_mode = False + else: + package_cache_mode = None + # perform the resolve context = ResolvedContext( package_requests=request, @@ -215,7 +227,8 @@ def command(opts, parser, extra_arg_groups=None): caching=(not opts.no_cache), suppress_passive=opts.no_passive, print_stats=opts.stats, - package_caching=(not opts.no_pkg_cache) + package_caching=(not opts.no_pkg_cache), + package_cache_async=package_cache_mode, ) success = (context.status == ResolverStatus.solved) diff --git a/src/rez/config.py b/src/rez/config.py index a6a8515f2..56b4ca3f2 100644 --- a/src/rez/config.py +++ b/src/rez/config.py @@ -454,6 +454,7 @@ def _parse_env_var(self, value): "package_cache_during_build": Bool, "package_cache_local": Bool, "package_cache_same_device": Bool, + "package_cache_async": Bool, "color_enabled": ForceOrBool, "resolve_caching": Bool, "cache_package_files": Bool, diff --git a/src/rez/package_cache.py b/src/rez/package_cache.py index 7ab833bdf..664a439b9 100644 --- a/src/rez/package_cache.py +++ b/src/rez/package_cache.py @@ -22,6 +22,7 @@ from rez.config import 
config from rez.exceptions import PackageCacheError from rez.vendor.lockfile import LockFile, NotLocked +from rez.vendor.progress.spinner import PixelSpinner from rez.utils.filesystem import safe_listdir, safe_makedirs, safe_remove, \ forceful_rmtree from rez.utils.colorize import ColorizedStreamHandler @@ -70,6 +71,18 @@ class PackageCache(object): VARIANT_PENDING = 5 #: Variant is pending caching VARIANT_REMOVED = 6 #: Variant was deleted + STATUS_DESCRIPTIONS = { + VARIANT_NOT_FOUND: "was not found", + VARIANT_FOUND: "was found", + VARIANT_CREATED: "was created", + VARIANT_COPYING: "payload is still being copied to this cache", + VARIANT_COPY_STALLED: "payload copy has stalled.\nSee " + "https://rez.readthedocs.io/en/stable/caching.html#cleaning-the-cache " + "for more information.", + VARIANT_PENDING: "is pending caching", + VARIANT_REMOVED: "was deleted", + } + _FILELOCK_TIMEOUT = 10 _COPYING_TIME_INC = 0.2 _COPYING_TIME_MAX = 5.0 @@ -117,7 +130,7 @@ def get_cached_root(self, variant): return rootpath - def add_variant(self, variant, force=False): + def add_variant(self, variant, force=False, wait_for_copying=False, logger=None): """Copy a variant's payload into the cache. The following steps are taken to ensure muti-thread/proc safety, and to @@ -148,6 +161,9 @@ def add_variant(self, variant, force=False): variant (Variant): The variant to copy into this cache force (bool): Copy the variant regardless. Use at your own risk (there is no guarantee the resulting variant payload will be functional). + wait_for_copying (bool): Whether the caching step should block when one of the + pending variants is marked as already copying. + logger (None | Logger): If a logger is provided, log information to it. 
Returns: tuple: 2-tuple: @@ -208,15 +224,40 @@ def add_variant(self, variant, force=False): % package.repository ) - no_op_statuses = ( + no_op_statuses = { self.VARIANT_FOUND, - self.VARIANT_COPYING, - self.VARIANT_COPY_STALLED - ) + self.VARIANT_COPY_STALLED, + } + if not wait_for_copying: + # Copying variants are only no-ops if we want to ignore them. + no_op_statuses.add(self.VARIANT_COPYING) # variant already exists, or is being copied to cache by another thread/proc status, rootpath = self._get_cached_root(variant) if status in no_op_statuses: + if logger: + logger.warning(f"Not caching {variant.qualified_name}. " + f"Variant {self.STATUS_DESCRIPTIONS[status]}") + return (rootpath, status) + + if wait_for_copying and status == self.VARIANT_COPYING: + spinner = PixelSpinner(f"Waiting for {variant.qualified_name} to finish copying. ") + while status == self.VARIANT_COPYING: + spinner.next() + time.sleep(self._COPYING_TIME_INC) + status, rootpath = self._get_cached_root(variant) + + # Status has changed, so report the change and return + if logger: + if status == self.VARIANT_FOUND: + # We have resolved into a satisfactory state + logger.info( + f"{variant.qualified_name} {self.STATUS_DESCRIPTIONS[status]}" + ) + else: + logger.warning( + f"{variant.qualified_name} {self.STATUS_DESCRIPTIONS[status]}" + ) return (rootpath, status) # 1. @@ -365,20 +406,39 @@ def add_variants_async(self, variants): This method is called when a context is created or sourced. Variants are then added to the cache in a separate process. + + .. deprecated:: 3.2.0 + Use :meth:`add_variants` instead. """ + return self.add_variants(variants, package_cache_async=True) - # A prod install is necessary because add_variants_async works by + def add_variants(self, variants, package_cache_async=True): + """Add the given variants to the package payload cache. 
+ """ + + # A prod install is necessary because add_variants works by # starting a rez-pkg-cache proc, and this can only be done reliably in # a prod install. On non-windows we could fork instead, but there would # remain no good solution on windows. # if not system.is_production_rez_install: raise PackageCacheError( - "PackageCache.add_variants_async is only supported in a " + "PackageCache.add_variants is only supported in a " "production rez installation." ) variants_ = [] + cachable_statuses = { + self.VARIANT_NOT_FOUND, + } + if not package_cache_async: + # We want to monitor copying variants if we're synchronous. + # We also want to report that a status has been stalled, so we'll + # hand that off to the caching function as well + cachable_statuses.update({ + self.VARIANT_COPYING, + self.VARIANT_COPY_STALLED, + }) # trim down to those variants that are cachable, and not already cached for variant in variants: @@ -386,7 +446,7 @@ def add_variants_async(self, variants): continue status, _ = self._get_cached_root(variant) - if status == self.VARIANT_NOT_FOUND: + if status in cachable_statuses: variants_.append(variant) # if there are no variants to add, and no potential cleanup to do, then exit @@ -427,6 +487,20 @@ def add_variants_async(self, variants): with open(filepath, 'w') as f: f.write(json.dumps(handle_dict)) + if package_cache_async: + self._subprocess_package_caching_daemon(self.path) + else: + # synchronous caching + self._run_caching_operation(wait_for_copying=True) + + @staticmethod + def _subprocess_package_caching_daemon(path): + """ + Run the package cache in a daemon process + + Returns: + subprocess.Popen : The package caching daemon process + """ # configure executable if platform.system() == "Windows": kwargs = { @@ -443,7 +517,7 @@ def add_variants_async(self, variants): raise RuntimeError("Did not find rez-pkg-cache executable") # start caching subproc - args = [exe, "--daemon", self.path] + args = [exe, "--daemon", path] try: with 
open(os.devnull, 'w') as devnull: @@ -454,8 +528,8 @@ def add_variants_async(self, variants): else: out_target = devnull - subprocess.Popen( - [exe, "--daemon", self.path], + return subprocess.Popen( + args, stdout=out_target, stderr=out_target, **kwargs @@ -558,6 +632,15 @@ def run_daemon(self): if pid > 0: sys.exit(0) + self._run_caching_operation(wait_for_copying=False) + + def _run_caching_operation(self, wait_for_copying=True): + """Copy pending variants. + + Args: + wait_for_copying (bool): Whether the caching step should block when one of the + pending variants is marked as already copying. + """ logger = self._init_logging() # somewhere for the daemon to store stateful info @@ -568,7 +651,7 @@ def run_daemon(self): # copy variants into cache try: while True: - keep_running = self._run_daemon_step(state) + keep_running = self._run_caching_step(state, wait_for_copying=wait_for_copying) if not keep_running: break except Exception: @@ -682,12 +765,13 @@ def _lock(self): except NotLocked: pass - def _run_daemon_step(self, state): + def _run_caching_step(self, state, wait_for_copying=False): logger = state["logger"] # pick a random pending variant to copy pending_filenames = set(os.listdir(self._pending_dir)) - pending_filenames -= set(state.get("copying", set())) + if not wait_for_copying: + pending_filenames -= set(state.get("copying", set())) if not pending_filenames: return False @@ -710,7 +794,11 @@ def _run_daemon_step(self, state): t = time.time() try: - rootpath, status = self.add_variant(variant) + rootpath, status = self.add_variant( + variant, + wait_for_copying=wait_for_copying, + logger=logger, + ) except PackageCacheError as e: # variant cannot be cached, so remove as a pending variant diff --git a/src/rez/resolved_context.py b/src/rez/resolved_context.py index 45dc6e3e6..7e1586b41 100644 --- a/src/rez/resolved_context.py +++ b/src/rez/resolved_context.py @@ -167,7 +167,7 @@ def __init__(self, package_requests, verbosity=0, timestamp=None, 
package_filter=None, package_orderers=None, max_fails=-1, add_implicit_packages=True, time_limit=-1, callback=None, package_load_callback=None, buf=None, suppress_passive=False, - print_stats=False, package_caching=None): + print_stats=False, package_caching=None, package_cache_async=None): """Perform a package resolve, and store the result. Args: @@ -205,6 +205,8 @@ def __init__(self, package_requests, verbosity=0, timestamp=None, package_caching (bool|None): If True, apply package caching settings as per the config. If None, enable as determined by config setting :data:`package_cache_during_build`. + package_cache_async (bool|None): If True, cache packages asynchronously. + If None, use the config setting :data:`package_cache_async` """ self.load_path = None @@ -246,9 +248,12 @@ def __init__(self, package_requests, verbosity=0, timestamp=None, package_caching = config.package_cache_during_build else: package_caching = True - self.package_caching = package_caching + if package_cache_async is None: + package_cache_async = config.package_cache_async + self.package_cache_async = package_cache_async + # patch settings self.default_patch_lock = PatchLock.no_lock self.patch_locks = {} @@ -1839,13 +1844,16 @@ def _update_package_cache(self): not self.success: return - # see PackageCache.add_variants_async + # see PackageCache.add_variants if not system.is_production_rez_install: return pkgcache = self._get_package_cache() if pkgcache: - pkgcache.add_variants_async(self.resolved_packages) + pkgcache.add_variants( + self.resolved_packages, + self.package_cache_async, + ) @classmethod def _init_context_tracking_payload_base(cls): diff --git a/src/rez/rezconfig.py b/src/rez/rezconfig.py index e9306d1bd..1abd59f71 100644 --- a/src/rez/rezconfig.py +++ b/src/rez/rezconfig.py @@ -278,6 +278,12 @@ # Enable package caching during a package build. package_cache_during_build = False +# Asynchronously cache packages. 
If this is false, resolves will block until +# all packages are cached. +# +# .. versionadded:: 3.2.0 +package_cache_async = True + # Allow caching of local packages. You would only want to set this True for # testing purposes. package_cache_local = False @@ -313,7 +319,7 @@ # This is useful as Platform.os might show different # values depending on the availability of ``lsb-release`` on the system. # The map supports regular expression, e.g. to keep versions. -# +# # .. note:: # The following examples are not necessarily recommendations. # @@ -1137,7 +1143,7 @@ # Enables/disables colorization globally. # -# .. warning:: +# .. warning:: # Turned off for Windows currently as there seems to be a problem with the colorama module. # # May also set to the string ``force``, which will make rez output color styling diff --git a/src/rez/tests/test_package_cache.py b/src/rez/tests/test_package_cache.py index c340dbfe7..01f91ff7d 100644 --- a/src/rez/tests/test_package_cache.py +++ b/src/rez/tests/test_package_cache.py @@ -134,24 +134,76 @@ def test_caching_on_resolve(self): # Creating the context will asynchronously add variants to the cache # in a separate proc. - # + # NOTE: pyfoo will not cache, because its repo is set to non-caching (see above) c = ResolvedContext([ "timestamped-1.2.0", - "pyfoo-3.1.0" # won't cache, see earlier test + "pyfoo-3.1.0" ]) + # Prove that the resolved context used async mode. + self.assertTrue(c.package_cache_async) + variant = c.get_resolved_package("timestamped") # Retry 50 times with 0.1 sec interval, 5 secs is more than enough for # the very small variant to be copied to cache. 
- # cached_root = None + resolve_not_always_cached = False for _ in range(50): - time.sleep(0.1) cached_root = pkgcache.get_cached_root(variant) if cached_root: break + resolve_not_always_cached = True + time.sleep(0.1) + + self.assertNotEqual(cached_root, None, + msg="Packages were expected to be cached, but were not.") + + # Test that the package is not immediately cached, since it is asynchronous + # WARNING: This is dangerous since it does open the test to a race condition and + # will fail if the cache happens faster than the resolve. + self.assertNotEqual(resolve_not_always_cached, False) + + expected_payload_file = os.path.join(cached_root, "stuff.txt") + self.assertTrue(os.path.exists(expected_payload_file)) + + # check that refs to root point to cache location in rex code + for ref in ("resolve.timestamped.root", "'{resolve.timestamped.root}'"): + proc = c.execute_rex_code( + code="info(%s)" % ref, + stdout=subprocess.PIPE, + universal_newlines=True + ) + + out, _ = proc.communicate() + root = out.strip() + + self.assertEqual( + root, cached_root, + "Reference %r should resolve to %s, but resolves to %s" + % (ref, cached_root, root) + ) + + @install_dependent() + def test_caching_on_resolve_synchronous(self): + """Test that cache is updated as expected on + resolved env using synchronous package caching.""" + pkgcache = self._pkgcache() + + with restore_os_environ(): + # set config settings into env so rez-pkg-cache proc sees them + os.environ.update(self.get_settings_env()) + + # Creating the context will synchronously add variants to the cache + c = ResolvedContext( + ["timestamped-1.2.0", "pyfoo-3.1.0"], + package_cache_async=False, + ) + + variant = c.get_resolved_package("timestamped") + # The first time we try to access it will be cached, because the cache is blocking + cached_root = pkgcache.get_cached_root(variant) self.assertNotEqual(cached_root, None) expected_payload_file = os.path.join(cached_root, "stuff.txt")