diff --git a/python/lsst/daf/butler/registry/_caching_context.py b/python/lsst/daf/butler/registry/_caching_context.py
index f217f21a98..58d8840aa2 100644
--- a/python/lsst/daf/butler/registry/_caching_context.py
+++ b/python/lsst/daf/butler/registry/_caching_context.py
@@ -29,8 +29,14 @@
 
 __all__ = ["CachingContext"]
 
+from typing import TYPE_CHECKING
+
 from ._collection_record_cache import CollectionRecordCache
 from ._collection_summary_cache import CollectionSummaryCache
+from ._dataset_type_cache import DatasetTypeCache
+
+if TYPE_CHECKING:
+    from .interfaces import DatasetRecordStorage
 
 
 class CachingContext:
@@ -45,6 +51,9 @@ class CachingContext:
     instances which will be `None` when caching is disabled. Instance of this
     class is passed to the relevant managers that can use it to query or
     populate caches when caching is enabled.
+
+    The dataset type cache is always enabled for now; this avoids the need
+    for explicitly enabling caching in pipetask executors.
     """
 
     collection_records: CollectionRecordCache | None = None
@@ -53,6 +62,9 @@ class is passed to the relevant managers that can use it to query or
     collection_summaries: CollectionSummaryCache | None = None
     """Cache for collection summary records (`CollectionSummaryCache`)."""
 
+    dataset_types: DatasetTypeCache[DatasetRecordStorage] | None = DatasetTypeCache()
+    """Cache for dataset types, never disabled (`DatasetTypeCache`)."""
+
     def enable(self) -> None:
         """Enable caches, initializes all caches."""
         self.collection_records = CollectionRecordCache()
diff --git a/python/lsst/daf/butler/registry/_dataset_type_cache.py b/python/lsst/daf/butler/registry/_dataset_type_cache.py
new file mode 100644
index 0000000000..3f1665dfa3
--- /dev/null
+++ b/python/lsst/daf/butler/registry/_dataset_type_cache.py
@@ -0,0 +1,162 @@
+# This file is part of daf_butler.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This software is dual licensed under the GNU General Public License and also
+# under a 3-clause BSD license. Recipients may choose which of these licenses
+# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
+# respectively. If you choose the GPL option then the following text applies
+# (but note that there is still no warranty even if you opt for BSD instead):
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from __future__ import annotations
+
+__all__ = ("DatasetTypeCache",)
+
+from collections.abc import Iterable, Iterator
+from typing import Generic, TypeVar
+
+from .._dataset_type import DatasetType
+
+_T = TypeVar("_T")
+
+
+class DatasetTypeCache(Generic[_T]):
+    """Cache for dataset types.
+
+    Notes
+    -----
+    This class caches the mapping of dataset type name to the corresponding
+    `DatasetType` instance. The registry manager also needs to cache the
+    corresponding "storage" instance, so this class allows storing an
+    additional opaque object along with the dataset type.
+
+    In some contexts (e.g. ``resolve_wildcard``) a full list of dataset types
+    is needed. To signify that the cache content can be used in such contexts,
+    the cache defines a special ``full`` flag that must be set by the client.
+    """
+
+    def __init__(self) -> None:
+        self._cache: dict[str, tuple[DatasetType, _T | None]] = {}
+        self._full = False
+
+    @property
+    def full(self) -> bool:
+        """`True` if the cache holds all known dataset types (`bool`)."""
+        return self._full
+
+    def add(self, dataset_type: DatasetType, extra: _T | None = None) -> None:
+        """Add one record to the cache.
+
+        Parameters
+        ----------
+        dataset_type : `DatasetType`
+            Dataset type; replaces any existing dataset type with the same
+            name.
+        extra : `Any`, optional
+            Additional opaque object stored with this dataset type.
+        """
+        self._cache[dataset_type.name] = (dataset_type, extra)
+
+    def set(self, data: Iterable[DatasetType | tuple[DatasetType, _T | None]], *, full: bool = False) -> None:
+        """Replace cache contents with the new set of dataset types.
+
+        Parameters
+        ----------
+        data : `~collections.abc.Iterable`
+            Sequence of `DatasetType` instances or tuples of `DatasetType` and
+            an extra opaque object.
+        full : `bool`, optional
+            If `True` then ``data`` contains all known dataset types.
+        """
+        self.clear()
+        for item in data:
+            if isinstance(item, DatasetType):
+                item = (item, None)
+            self._cache[item[0].name] = item
+        self._full = full
+
+    def clear(self) -> None:
+        """Remove everything from the cache."""
+        self._cache = {}
+        self._full = False
+
+    def discard(self, name: str) -> None:
+        """Remove named dataset type from the cache.
+
+        Parameters
+        ----------
+        name : `str`
+            Name of the dataset type to remove.
+        """
+        self._cache.pop(name, None)
+
+    def get(self, name: str) -> tuple[DatasetType | None, _T | None]:
+        """Return cached info given dataset type name.
+
+        Parameters
+        ----------
+        name : `str`
+            Dataset type name.
+
+        Returns
+        -------
+        dataset_type : `DatasetType` or `None`
+            Cached dataset type; `None` is returned if the name is not in the
+            cache.
+        extra : `Any` or `None`
+            Cached opaque data; `None` is returned if the name is not in the
+            cache or no extra info was stored for this dataset type.
+        """
+        item = self._cache.get(name)
+        if item is None:
+            return (None, None)
+        return item
+
+    def get_dataset_type(self, name: str) -> DatasetType | None:
+        """Return dataset type given its name.
+
+        Parameters
+        ----------
+        name : `str`
+            Dataset type name.
+
+        Returns
+        -------
+        dataset_type : `DatasetType` or `None`
+            Cached dataset type; `None` is returned if the name is not in the
+            cache.
+        """
+        item = self._cache.get(name)
+        if item is None:
+            return None
+        return item[0]
+
+    def items(self) -> Iterator[tuple[DatasetType, _T | None]]:
+        """Return an iterator over the items in the cache; can only be
+        used when `full` is `True`.
+
+        Raises
+        ------
+        RuntimeError
+            Raised if ``self.full`` is `False`.
+        """
+        if not self._full:
+            raise RuntimeError("cannot call items() if cache is not full")
+        return iter(self._cache.values())
diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py
index 1cd5eeac63..692d8585a3 100644
--- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py
+++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py
@@ -121,6 +121,8 @@ class ByDimensionsDatasetRecordStorageManagerBase(DatasetRecordStorageManager):
         tables used by this class.
     summaries : `CollectionSummaryManager`
         Structure containing tables that summarize the contents of collections.
+    caching_context : `CachingContext`
+        Object controlling caching of information returned by managers.
     """
 
     def __init__(
@@ -131,6 +133,7 @@ def __init__(
         dimensions: DimensionRecordStorageManager,
         static: StaticDatasetTablesTuple,
         summaries: CollectionSummaryManager,
+        caching_context: CachingContext,
         registry_schema_version: VersionTuple | None = None,
     ):
         super().__init__(registry_schema_version=registry_schema_version)
@@ -139,6 +142,7 @@ def __init__(
         self._dimensions = dimensions
         self._static = static
         self._summaries = summaries
+        self._caching_context = caching_context
 
     @classmethod
     def initialize(
@@ -170,6 +174,7 @@ def initialize(
             dimensions=dimensions,
             static=static,
             summaries=summaries,
+            caching_context=caching_context,
             registry_schema_version=registry_schema_version,
         )
 
@@ -237,7 +242,8 @@ def addDatasetForeignKey(
 
     def refresh(self) -> None:
         # Docstring inherited from DatasetRecordStorageManager.
-        pass
+        if self._caching_context.dataset_types is not None:
+            self._caching_context.dataset_types.clear()
 
     def _make_storage(self, record: _DatasetTypeRecord) -> ByDimensionsDatasetRecordStorage:
         """Create storage instance for a dataset type record."""
@@ -286,8 +292,28 @@ def remove(self, name: str) -> None:
 
     def find(self, name: str) -> DatasetRecordStorage | None:
         # Docstring inherited from DatasetRecordStorageManager.
+        if self._caching_context.dataset_types is not None:
+            _, storage = self._caching_context.dataset_types.get(name)
+            if storage is not None:
+                return storage
+            else:
+                # On the first cache miss, populate the cache with the
+                # complete list of dataset types (if that was not done yet).
+                if not self._caching_context.dataset_types.full:
+                    self._fetch_dataset_types()
+                    # Try again.
+                    _, storage = self._caching_context.dataset_types.get(name)
+                if self._caching_context.dataset_types.full:
+                    # If it is not in the cache then the dataset type is not defined.
+                    return storage
         record = self._fetch_dataset_type_record(name)
-        return self._make_storage(record) if record is not None else None
+        if record is not None:
+            storage = self._make_storage(record)
+            if self._caching_context.dataset_types is not None:
+                self._caching_context.dataset_types.add(storage.datasetType, storage)
+            return storage
+        else:
+            return None
 
     def register(self, datasetType: DatasetType) -> bool:
         # Docstring inherited from DatasetRecordStorageManager.
@@ -316,7 +342,7 @@ def register(self, datasetType: DatasetType) -> bool:
                     self.getIdColumnType(),
                 ),
             )
-            _, inserted = self._db.sync(
+            row, inserted = self._db.sync(
                 self._static.dataset_type,
                 keys={"name": datasetType.name},
                 compared={
@@ -331,6 +357,16 @@ def register(self, datasetType: DatasetType) -> bool:
                 },
                 returning=["id", "tag_association_table"],
             )
+            # Make sure that the cache is updated.
+            if self._caching_context.dataset_types is not None and row is not None:
+                record = _DatasetTypeRecord(
+                    dataset_type=datasetType,
+                    dataset_type_id=row["id"],
+                    tag_table_name=tagTableName,
+                    calib_table_name=calibTableName,
+                )
+                storage = self._make_storage(record)
+                self._caching_context.dataset_types.add(datasetType, storage)
         else:
             if datasetType != record.dataset_type:
                 raise ConflictingDefinitionError(
@@ -338,9 +374,7 @@ def register(self, datasetType: DatasetType) -> bool:
                     f"with database definition {record.dataset_type}."
                 )
             inserted = False
-        # TODO: We return storage instance from this method, but the only
-        # client that uses this method ignores it. Maybe we should drop it
-        # and avoid making storage instance above.
+
         return bool(inserted)
 
     def resolve_wildcard(
@@ -472,7 +506,15 @@ def getDatasetRef(self, id: DatasetId) -> DatasetRef | None:
             row = sql_result.mappings().fetchone()
         if row is None:
             return None
-        storage = self._make_storage(self._record_from_row(row))
+        record = self._record_from_row(row)
+        storage: DatasetRecordStorage | None = None
+        if self._caching_context.dataset_types is not None:
+            _, storage = self._caching_context.dataset_types.get(record.dataset_type.name)
+        if storage is None:
+            storage = self._make_storage(record)
+            if self._caching_context.dataset_types is not None:
+                self._caching_context.dataset_types.add(storage.datasetType, storage)
+        assert isinstance(storage, ByDimensionsDatasetRecordStorage), "Unexpected storage class"
         return DatasetRef(
             storage.datasetType,
             dataId=storage.getDataId(id=id),
@@ -516,9 +558,17 @@ def _dataset_type_from_row(self, row: Mapping) -> DatasetType:
 
     def _fetch_dataset_types(self) -> list[DatasetType]:
         """Fetch list of all defined dataset types."""
+        if self._caching_context.dataset_types is not None:
+            if self._caching_context.dataset_types.full:
+                return [dataset_type for dataset_type, _ in self._caching_context.dataset_types.items()]
        with self._db.query(self._static.dataset_type.select()) as sql_result:
             sql_rows = sql_result.mappings().fetchall()
-        return [self._record_from_row(row).dataset_type for row in sql_rows]
+        records = [self._record_from_row(row) for row in sql_rows]
+        # Cache everything and mark the cache as complete.
+        if self._caching_context.dataset_types is not None:
+            cache_data = [(record.dataset_type, self._make_storage(record)) for record in records]
+            self._caching_context.dataset_types.set(cache_data, full=True)
+        return [record.dataset_type for record in records]
 
     def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary:
         # Docstring inherited from DatasetRecordStorageManager.
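
A minimal usage sketch of the DatasetTypeCache contract introduced above, for review context. It assumes the patched daf_butler is importable; _StubDatasetType is a hypothetical stand-in, since constructing a real DatasetType requires a DimensionUniverse and the cache only relies on the .name attribute of what it is given.

from lsst.daf.butler.registry._dataset_type_cache import DatasetTypeCache

class _StubDatasetType:
    # Hypothetical stand-in for DatasetType; only .name is used as the key.
    def __init__(self, name: str) -> None:
        self.name = name

cache: DatasetTypeCache[str] = DatasetTypeCache()

# Single-item inserts leave the cache in the "not full" state, so
# items() would raise RuntimeError at this point.
cache.add(_StubDatasetType("calexp"), extra="storage-for-calexp")
assert not cache.full

# A miss returns (None, None) rather than raising.
assert cache.get("raw") == (None, None)

# set() replaces the contents wholesale; full=True signals that the
# cache now holds every known dataset type, which enables items().
cache.set([(_StubDatasetType("raw"), "storage-for-raw")], full=True)
for dataset_type, extra in cache.items():
    print(dataset_type.name, extra)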
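
The control flow of the new find() is easy to lose in diff form: consult the cache, populate it fully on the first miss, treat a miss against a full cache as authoritative, and fall back to a single-record fetch only when caching is disabled. The following self-contained sketch reproduces just that decision sequence; _DB, StubCache, and the helper names are hypothetical stand-ins for the manager internals, not code from this patch.

_DB = {"raw": "storage-for-raw", "calexp": "storage-for-calexp"}

class StubCache:
    # Stand-in for DatasetTypeCache, keyed on name with a full flag.
    def __init__(self) -> None:
        self._data: dict[str, str] = {}
        self.full = False

    def get(self, name: str) -> str | None:
        return self._data.get(name)

    def set_all(self, data: dict[str, str]) -> None:
        self._data = dict(data)
        self.full = True

def find(name: str, cache: StubCache | None) -> str | None:
    if cache is not None:
        storage = cache.get(name)
        if storage is not None:
            return storage
        if not cache.full:
            # First miss: fetch everything once, then retry the lookup.
            cache.set_all(_DB)
            storage = cache.get(name)
        if cache.full:
            # The cache is complete, so a miss means "not defined".
            return storage
    # Caching disabled: fetch the single record directly.
    return _DB.get(name)

cache = StubCache()
print(find("raw", cache))      # populates the cache, then hits
print(find("missing", cache))  # authoritative miss -> None
print(find("raw", None))       # uncached fallback path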