-
Notifications
You must be signed in to change notification settings - Fork 340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to cache package payloads synchronously #1679
Changes from 13 commits
0d1e29c
d6ede1b
98bd61f
9463a3b
5a55f51
5ea2576
2251de8
b0309eb
2bb1ce6
b5950d6
461adb1
2253d76
1c6c94b
a5997b9
d8dbe4e
72747ca
a8d0439
71e0c4a
b12651e
355a891
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
from rez.config import config | ||
from rez.exceptions import PackageCacheError | ||
from rez.vendor.lockfile import LockFile, NotLocked | ||
from rez.vendor.progress.spinner import PixelSpinner | ||
from rez.utils.filesystem import safe_listdir, safe_makedirs, safe_remove, \ | ||
forceful_rmtree | ||
from rez.utils.colorize import ColorizedStreamHandler | ||
|
@@ -69,6 +70,18 @@ class PackageCache(object): | |
VARIANT_PENDING = 5 #: Variant is pending caching | ||
VARIANT_REMOVED = 6 #: Variant was deleted | ||
|
||
STATUS_DESCRIPTIONS = { | ||
VARIANT_NOT_FOUND: "was not found", | ||
VARIANT_FOUND: "was found", | ||
VARIANT_CREATED: "was created", | ||
VARIANT_COPYING: "payload is still being copied to this cache", | ||
VARIANT_COPY_STALLED: "payload copy has stalled.\nSee " | ||
"https://rez.readthedocs.io/en/stable/caching.html#cleaning-the-cache " | ||
"for more information.", | ||
VARIANT_PENDING: "is pending caching", | ||
VARIANT_REMOVED: "was deleted", | ||
} | ||
|
||
_FILELOCK_TIMEOUT = 10 | ||
_COPYING_TIME_INC = 0.2 | ||
_COPYING_TIME_MAX = 5.0 | ||
|
@@ -116,7 +129,7 @@ def get_cached_root(self, variant): | |
|
||
return rootpath | ||
|
||
def add_variant(self, variant, force=False): | ||
def add_variant(self, variant, force=False, wait_for_copying=False, logger=None): | ||
"""Copy a variant's payload into the cache. | ||
|
||
The following steps are taken to ensure muti-thread/proc safety, and to | ||
|
@@ -147,6 +160,9 @@ def add_variant(self, variant, force=False): | |
variant (Variant): The variant to copy into this cache | ||
force (bool): Copy the variant regardless. Use at your own risk (there | ||
is no guarantee the resulting variant payload will be functional). | ||
wait_for_copying (bool): Whether the caching step should block when one of the | ||
pending variants is marked as already copying. | ||
logger (None | Logger): If a logger is provided, log information to it. | ||
|
||
Returns: | ||
tuple: 2-tuple: | ||
|
@@ -214,17 +230,43 @@ def add_variant(self, variant, force=False): | |
% package.repository | ||
) | ||
|
||
no_op_statuses = ( | ||
no_op_statuses = { | ||
self.VARIANT_FOUND, | ||
self.VARIANT_COPYING, | ||
self.VARIANT_COPY_STALLED | ||
) | ||
self.VARIANT_COPY_STALLED, | ||
} | ||
if not wait_for_copying: | ||
# Copying variants are only no-ops if we want to ignore them. | ||
no_op_statuses.add(self.VARIANT_COPYING) | ||
|
||
# variant already exists, or is being copied to cache by another thread/proc | ||
status, rootpath = self._get_cached_root(variant) | ||
if status in no_op_statuses: | ||
if logger: | ||
logger.warning(f"Not caching {variant.qualified_name}. " | ||
f"Variant {self.STATUS_DESCRIPTIONS[status]}") | ||
return (rootpath, status) | ||
|
||
if wait_for_copying and status == self.VARIANT_COPYING: | ||
spinner = PixelSpinner(f"Waiting for {variant.qualified_name} to finish copying. ") | ||
JeanChristopheMorinPerso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
while status == self.VARIANT_COPYING: | ||
spinner.next() | ||
time.sleep(self._COPYING_TIME_INC) | ||
status, rootpath = self._get_cached_root(variant) | ||
else: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this else is not needed and is redundant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is till valid and should be addressed. https://docs.python.org/3/reference/compound_stmts.html#the-while-statement
|
||
# Status has changed, so report the change and return | ||
if logger: | ||
if status in no_op_statuses: | ||
logger.warning(f"{variant.qualified_name} " | ||
isohedronpipeline marked this conversation as resolved.
Show resolved
Hide resolved
|
||
f"{self.STATUS_DESCRIPTIONS[status]}") | ||
elif status == self.VARIANT_FOUND: | ||
# We have resolved into a satisfactory state | ||
logger.info(f"{variant.qualified_name} " | ||
f"{self.STATUS_DESCRIPTIONS[status]}") | ||
else: | ||
logger.warning(f"{variant.qualified_name} " | ||
f"{self.STATUS_DESCRIPTIONS[status]}") | ||
isohedronpipeline marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return (rootpath, status) | ||
|
||
# 1. | ||
path = self._get_hash_path(variant) | ||
safe_makedirs(path) | ||
|
@@ -371,28 +413,50 @@ def add_variants_async(self, variants): | |
|
||
This method is called when a context is created or sourced. Variants | ||
are then added to the cache in a separate process. | ||
|
||
.. deprecated:: 3.1.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO for maintainers: Update with appropriate version number before merging. |
||
Use :method:`add_variants` instead. | ||
""" | ||
return self.add_variants(variants, package_cache_async=True) | ||
|
||
# A prod install is necessary because add_variants_async works by | ||
def add_variants(self, variants, package_cache_async=True): | ||
isohedronpipeline marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Update the package cache by adding some or all of the given variants. | ||
|
||
This method is called when a context is created or sourced. Variants | ||
are then added to the cache in a separate process. | ||
""" | ||
isohedronpipeline marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# A prod install is necessary because add_variants works by | ||
# starting a rez-pkg-cache proc, and this can only be done reliably in | ||
# a prod install. On non-windows we could fork instead, but there would | ||
# remain no good solution on windows. | ||
# | ||
if not system.is_production_rez_install: | ||
raise PackageCacheError( | ||
"PackageCache.add_variants_async is only supported in a " | ||
"PackageCache.add_variants is only supported in a " | ||
"production rez installation." | ||
) | ||
|
||
variants_ = [] | ||
cachable_statuses = { | ||
self.VARIANT_NOT_FOUND, | ||
} | ||
if not package_cache_async: | ||
# We want to monitor copying variants if we're synchronous. | ||
# We also want to report that a status has been stalled, so we'll | ||
# hand that off to the caching function as well | ||
cachable_statuses.update({ | ||
self.VARIANT_COPYING, | ||
self.VARIANT_COPY_STALLED, | ||
}) | ||
|
||
# trim down to those variants that are cachable, and not already cached | ||
for variant in variants: | ||
if not variant.parent.is_cachable: | ||
continue | ||
|
||
status, _ = self._get_cached_root(variant) | ||
if status == self.VARIANT_NOT_FOUND: | ||
if status in cachable_statuses: | ||
variants_.append(variant) | ||
|
||
# if there are no variants to add, and no potential cleanup to do, then exit | ||
|
@@ -433,6 +497,20 @@ def add_variants_async(self, variants): | |
with open(filepath, 'w') as f: | ||
f.write(json.dumps(handle_dict)) | ||
|
||
if package_cache_async: | ||
self._subprocess_package_caching_daemon(self.path) | ||
else: | ||
# syncronous caching | ||
self.run_caching_operation(wait_for_copying=True) | ||
|
||
@staticmethod | ||
def _subprocess_package_caching_daemon(path): | ||
""" | ||
Run the package cache in a daemon process | ||
|
||
Returns: | ||
subprocess.Popen : The package caching daemon process | ||
""" | ||
# configure executable | ||
if platform.system() == "Windows": | ||
kwargs = { | ||
|
@@ -449,7 +527,7 @@ def add_variants_async(self, variants): | |
raise RuntimeError("Did not find rez-pkg-cache executable") | ||
|
||
# start caching subproc | ||
args = [exe, "--daemon", self.path] | ||
args = [exe, "--daemon", path] | ||
|
||
try: | ||
with open(os.devnull, 'w') as devnull: | ||
|
@@ -460,8 +538,8 @@ def add_variants_async(self, variants): | |
else: | ||
out_target = devnull | ||
|
||
subprocess.Popen( | ||
[exe, "--daemon", self.path], | ||
isohedronpipeline marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return subprocess.Popen( | ||
args, | ||
stdout=out_target, | ||
stderr=out_target, | ||
**kwargs | ||
|
@@ -564,6 +642,15 @@ def run_daemon(self): | |
if pid > 0: | ||
sys.exit(0) | ||
|
||
self.run_caching_operation(wait_for_copying=False) | ||
|
||
def run_caching_operation(self, wait_for_copying=False): | ||
JeanChristopheMorinPerso marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Copy pending variants. | ||
|
||
Args: | ||
wait_for_copying (bool): Whether the caching step should block when one of the | ||
pending variants is marked as already copying. | ||
""" | ||
logger = self._init_logging() | ||
|
||
# somewhere for the daemon to store stateful info | ||
|
@@ -574,7 +661,7 @@ def run_daemon(self): | |
# copy variants into cache | ||
try: | ||
while True: | ||
keep_running = self._run_daemon_step(state) | ||
keep_running = self._run_caching_step(state, wait_for_copying=wait_for_copying) | ||
if not keep_running: | ||
break | ||
except Exception: | ||
|
@@ -688,12 +775,13 @@ def _lock(self): | |
except NotLocked: | ||
pass | ||
|
||
def _run_daemon_step(self, state): | ||
def _run_caching_step(self, state, wait_for_copying=False): | ||
logger = state["logger"] | ||
|
||
# pick a random pending variant to copy | ||
pending_filenames = set(os.listdir(self._pending_dir)) | ||
pending_filenames -= set(state.get("copying", set())) | ||
if not wait_for_copying: | ||
pending_filenames -= set(state.get("copying", set())) | ||
if not pending_filenames: | ||
return False | ||
|
||
|
@@ -716,7 +804,11 @@ def _run_daemon_step(self, state): | |
t = time.time() | ||
|
||
try: | ||
rootpath, status = self.add_variant(variant) | ||
rootpath, status = self.add_variant( | ||
variant, | ||
wait_for_copying=wait_for_copying, | ||
logger=logger, | ||
) | ||
|
||
except PackageCacheError as e: | ||
# variant cannot be cached, so remove as a pending variant | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -278,6 +278,12 @@ | |
# Enable package caching during a package build. | ||
package_cache_during_build = False | ||
|
||
# Asynchronously cache packages. If this is false, resolves will block until | ||
# all packages are cached. | ||
# | ||
# .. versionadded:: 3.1.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO for maintainers: Update with appropriate version number before merging. |
||
package_cache_async = True | ||
|
||
# Allow caching of local packages. You would only want to set this True for | ||
# testing purposes. | ||
package_cache_local = False | ||
|
@@ -313,7 +319,7 @@ | |
# This is useful as Platform.os might show different | ||
# values depending on the availability of ``lsb-release`` on the system. | ||
# The map supports regular expression, e.g. to keep versions. | ||
# | ||
# | ||
# .. note:: | ||
# The following examples are not necessarily recommendations. | ||
# | ||
|
@@ -1119,7 +1125,7 @@ | |
|
||
# Enables/disables colorization globally. | ||
# | ||
# .. warning:: | ||
# .. warning:: | ||
# Turned off for Windows currently as there seems to be a problem with the colorama module. | ||
# | ||
# May also set to the string ``force``, which will make rez output color styling | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think
wait_for_copying
should be True by default.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would change the default behavior from async to sync, which I don't think we want to do, do we?