Add ability to pass provenance through butler.put
timj committed Jan 24, 2025
1 parent 4b2b537 commit ff6ea4a
Showing 14 changed files with 153 additions and 21 deletions.
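For orientation, here is a minimal sketch of the call pattern this commit enables. The repository path, run name, dataset type, data ID, and the `exposure`/`input_ref` objects are illustrative assumptions, not part of the commit:

    from lsst.daf.butler import Butler, DatasetProvenance

    butler = Butler("/path/to/repo", run="example_run")  # hypothetical repo and run

    # Record the inputs (and, optionally, the executed quantum) that produced
    # the output about to be stored.
    provenance = DatasetProvenance()
    provenance.add_input(input_ref)  # input_ref: a resolved DatasetRef obtained earlier

    # The new keyword is threaded from Butler.put through the datastore to the
    # formatter, which may embed it in the serialized file or ignore it.
    butler.put(exposure, "calexp", instrument="HSC", visit=903334, detector=16, provenance=provenance)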
1 change: 1 addition & 0 deletions python/lsst/daf/butler/__init__.py
@@ -47,6 +47,7 @@
from ._config_support import LookupKey
from ._dataset_association import *
from ._dataset_existence import *
from ._dataset_provenance import *
from ._dataset_ref import *
from ._dataset_type import *
from ._deferredDatasetHandle import *
5 changes: 5 additions & 0 deletions python/lsst/daf/butler/_butler.py
@@ -59,6 +59,7 @@

if TYPE_CHECKING:
from ._dataset_existence import DatasetExistence
from ._dataset_provenance import DatasetProvenance
from ._dataset_ref import DatasetId, DatasetRef
from ._dataset_type import DatasetType
from ._deferredDatasetHandle import DeferredDatasetHandle
@@ -664,6 +665,7 @@ def put(
dataId: DataId | None = None,
*,
run: str | None = None,
provenance: DatasetProvenance | None = None,
**kwargs: Any,
) -> DatasetRef:
"""Store and register a dataset.
@@ -683,6 +685,9 @@
run : `str`, optional
The name of the run the dataset should be added to, overriding
``self.run``. Not used if a resolved `DatasetRef` is provided.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Not supported by all serialization mechanisms.
**kwargs
Additional keyword arguments used to augment or construct a
`DataCoordinate`. See `DataCoordinate.standardize`
67 changes: 67 additions & 0 deletions python/lsst/daf/butler/_dataset_provenance.py
@@ -0,0 +1,67 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("DatasetProvenance",)

import typing
import uuid

import pydantic

from ._dataset_ref import DatasetRef, SerializedDatasetRef


class DatasetProvenance(pydantic.BaseModel):
    """Provenance of a single `DatasetRef`."""

    inputs: list[SerializedDatasetRef] = pydantic.Field(default_factory=list)
    """The input datasets."""

    quantum_id: uuid.UUID | None = None
    """Identifier of the Quantum that was executed."""

    _uuids: set[uuid.UUID] = pydantic.PrivateAttr(default_factory=set)

    @pydantic.model_validator(mode="after")
    def populate_cache(self) -> typing.Self:
        for ref in self.inputs:
            self._uuids.add(ref.id)
        return self

    def add_input(self, ref: DatasetRef) -> None:
        """Add an input dataset to the provenance.

        Parameters
        ----------
        ref : `DatasetRef`
            A dataset to register as an input.
        """
        if ref.id in self._uuids:
            # Already registered.
            return
        self._uuids.add(ref.id)
        self.inputs.append(ref.to_simple())

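A short usage sketch of the class above, assuming `ref` is a resolved `DatasetRef`; the private UUID cache makes repeated registration of the same input a no-op:

    import uuid

    prov = DatasetProvenance(quantum_id=uuid.uuid4())
    prov.add_input(ref)
    prov.add_input(ref)  # duplicate: ref.id is already in the cache, so ignored
    assert len(prov.inputs) == 1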
7 changes: 7 additions & 0 deletions python/lsst/daf/butler/_formatter.py
@@ -60,6 +60,7 @@
log = logging.getLogger(__name__)

if TYPE_CHECKING:
from ._dataset_provenance import DatasetProvenance
from ._dataset_ref import DatasetRef
from ._dataset_type import DatasetType
from ._storage_class import StorageClass
@@ -98,6 +99,10 @@ class FormatterV2:
Parameters to control how the dataset is serialized.
write_recipes : `dict`, optional
Detailed write recipes indexed by recipe name.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Can be ignored by a formatter.
**kwargs
Additional arguments that will be ignored but allow for
`Formatter` V1 parameters to be given.
@@ -169,6 +174,7 @@ def __init__(
ref: DatasetRef,
write_parameters: Mapping[str, Any] | None = None,
write_recipes: Mapping[str, Any] | None = None,
provenance: DatasetProvenance | None = None,
# Compatibility parameters. Unused in v2.
**kwargs: Any,
):
@@ -198,6 +204,7 @@

self._write_parameters = write_parameters
self._write_recipes = self.validate_write_recipes(write_recipes)
self._provenance = provenance

def __str__(self) -> str:
return f"{self.name()}@{self.file_descriptor.location.uri}"
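For illustration, a minimal sketch of how a `FormatterV2` subclass might consult the stored provenance when serializing; the `to_bytes` override and the header keys are assumptions for this sketch, not part of the commit:

    class ProvenanceAwareFormatter(FormatterV2):
        """Sketch: embed provenance in the serialized header when available."""

        def to_bytes(self, in_memory_dataset: Any) -> bytes:
            header: dict[str, str | list[str]] = {}
            if self._provenance is not None:
                # A formatter may also simply ignore self._provenance.
                header["butler_inputs"] = [str(r.id) for r in self._provenance.inputs]
                if self._provenance.quantum_id is not None:
                    header["butler_quantum"] = str(self._provenance.quantum_id)
            ...  # serialize in_memory_dataset together with header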
6 changes: 5 additions & 1 deletion python/lsst/daf/butler/_limited_butler.py
@@ -36,6 +36,7 @@

from lsst.resources import ResourcePath

from ._dataset_provenance import DatasetProvenance
from ._dataset_ref import DatasetRef
from ._deferredDatasetHandle import DeferredDatasetHandle
from ._storage_class import StorageClass, StorageClassFactory
@@ -64,7 +65,7 @@ def isWriteable(self) -> bool:
raise NotImplementedError()

@abstractmethod
def put(self, obj: Any, ref: DatasetRef, /) -> DatasetRef:
def put(self, obj: Any, ref: DatasetRef, /, *, provenance: DatasetProvenance | None = None) -> DatasetRef:
"""Store a dataset that already has a UUID and ``RUN`` collection.
Parameters
@@ -73,6 +74,9 @@ def put(self, obj: Any, ref: DatasetRef, /) -> DatasetRef:
The dataset.
ref : `DatasetRef`
Resolved reference for a not-yet-stored dataset.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Not supported by all serialization mechanisms.
Returns
-------
5 changes: 3 additions & 2 deletions python/lsst/daf/butler/_quantum_backed.py
@@ -43,6 +43,7 @@

from ._butler_config import ButlerConfig
from ._config import Config
from ._dataset_provenance import DatasetProvenance
from ._dataset_ref import DatasetId, DatasetRef
from ._dataset_type import DatasetType
from ._deferredDatasetHandle import DeferredDatasetHandle
@@ -453,11 +454,11 @@ def dimensions(self) -> DimensionUniverse:
# Docstring inherited.
return self._dimensions

def put(self, obj: Any, ref: DatasetRef, /) -> DatasetRef:
def put(self, obj: Any, ref: DatasetRef, /, provenance: DatasetProvenance | None = None) -> DatasetRef:
# Docstring inherited.
if ref.id not in self._predicted_outputs:
raise RuntimeError("Cannot `put` dataset that was not predicted as an output.")
self._datastore.put(obj, ref)
self._datastore.put(obj, ref, provenance=provenance)
self._actual_output_refs.add(ref)
return ref

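Task execution code using the quantum-backed butler can forward provenance the same way (sketch; `qbb`, `quantum_id`, `input_ref`, `task_output`, and `output_ref` are assumed to exist):

    prov = DatasetProvenance(quantum_id=quantum_id)
    prov.add_input(input_ref)
    qbb.put(task_output, output_ref, provenance=prov)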
9 changes: 7 additions & 2 deletions python/lsst/daf/butler/_storage_class_delegate.py
@@ -40,7 +40,7 @@
from lsst.utils.introspection import get_full_type_name

if TYPE_CHECKING:
from lsst.daf.butler import DatasetRef
from lsst.daf.butler import DatasetProvenance, DatasetRef

from ._storage_class import StorageClass

@@ -335,7 +335,9 @@ def disassemble(

return components

def add_provenance(self, inMemoryDataset: Any, ref: DatasetRef) -> Any:
def add_provenance(
self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
) -> Any:
"""Add provenance to the composite dataset.
Parameters
@@ -344,6 +346,9 @@ def add_provenance(self, inMemoryDataset: Any, ref: DatasetRef) -> Any:
The composite dataset to serialize.
ref : `DatasetRef`
The dataset associated with this in-memory dataset.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Can be ignored by a delegate.
Returns
-------
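As a sketch of what a concrete delegate override might do with the new argument; the dict-like `.metadata` attribute and the header keys are assumptions, not part of the commit:

    def add_provenance(
        self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
    ) -> Any:
        # Hypothetical delegate for in-memory types exposing a mapping at
        # .metadata; stamps the dataset's own ID plus each input's ID.
        inMemoryDataset.metadata["LSST_BUTLER_ID"] = str(ref.id)
        if provenance is not None:
            for n, input_ref in enumerate(provenance.inputs):
                inMemoryDataset.metadata[f"LSST_BUTLER_INPUT_{n}"] = str(input_ref.id)
        return inMemoryDataset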
12 changes: 10 additions & 2 deletions python/lsst/daf/butler/datastore/_datastore.py
@@ -61,6 +61,7 @@

from .. import ddl
from .._config_support import LookupKey
from .._dataset_provenance import DatasetProvenance
from .._dataset_ref import DatasetRef
from .._dataset_type import DatasetType
from .._storage_class import StorageClass
@@ -626,7 +627,9 @@ def prepare_get_for_external_client(self, ref: DatasetRef) -> object | None:
raise NotImplementedError()

@abstractmethod
def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None:
def put(
self, inMemoryDataset: Any, datasetRef: DatasetRef, provenance: DatasetProvenance | None = None
) -> None:
"""Write a `InMemoryDataset` with a given `DatasetRef` to the store.
Parameters
@@ -635,6 +638,9 @@ def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None:
The Dataset to store.
datasetRef : `DatasetRef`
Reference to the associated Dataset.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Not supported by all serialization mechanisms.
"""
raise NotImplementedError("Must be implemented by subclass")

@@ -1449,7 +1455,9 @@ def get(
) -> Any:
raise FileNotFoundError("This is a no-op datastore that can not access a real datastore")

def put(self, inMemoryDataset: Any, datasetRef: DatasetRef) -> None:
def put(
self, inMemoryDataset: Any, datasetRef: DatasetRef, provenance: DatasetProvenance | None = None
) -> None:
raise NotImplementedError("This is a no-op datastore that can not access a real datastore")

def put_new(self, in_memory_dataset: Any, ref: DatasetRef) -> Mapping[str, DatasetRef]:
9 changes: 6 additions & 3 deletions python/lsst/daf/butler/datastores/chainedDatastore.py
@@ -53,7 +53,7 @@
from lsst.utils import doImportType

if TYPE_CHECKING:
from lsst.daf.butler import Config, DatasetType, LookupKey, StorageClass
from lsst.daf.butler import Config, DatasetProvenance, DatasetType, LookupKey, StorageClass
from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager
from lsst.resources import ResourcePathExpression

@@ -420,7 +420,7 @@ def _get_matching_datastore(self, ref: DatasetRef) -> Datastore | None:

return None

def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
"""Write a InMemoryDataset with a given `DatasetRef` to each
datastore.
@@ -435,6 +435,9 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
The dataset to store.
ref : `DatasetRef`
Reference to the associated Dataset.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Not supported by all serialization mechanisms.
Raises
------
@@ -468,7 +471,7 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
else:
npermanent += 1
try:
datastore.put(inMemoryDataset, ref)
datastore.put(inMemoryDataset, ref, provenance=provenance)
nsuccess += 1
if not datastore.isEphemeral:
isPermanent = True
26 changes: 20 additions & 6 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -108,7 +108,7 @@
from sqlalchemy import BigInteger, String

if TYPE_CHECKING:
from lsst.daf.butler import LookupKey
from lsst.daf.butler import DatasetProvenance, LookupKey
from lsst.daf.butler.registry.interfaces import DatasetIdRef, DatastoreRegistryBridgeManager

log = getLogger(__name__)
@@ -830,13 +830,17 @@ def _prepare_for_direct_get(
parameters=parameters,
)

def _determine_put_formatter_location(self, ref: DatasetRef) -> tuple[Location, Formatter | FormatterV2]:
def _determine_put_formatter_location(
self, ref: DatasetRef, provenance: DatasetProvenance | None = None
) -> tuple[Location, Formatter | FormatterV2]:
"""Calculate the formatter and output location to use for put.
Parameters
----------
ref : `DatasetRef`
Reference to the associated Dataset.
provenance : `DatasetProvenance`
Any provenance that should be attached to the serialized dataset.
Returns
-------
@@ -865,6 +869,7 @@
FileDescriptor(location, storageClass=storageClass, component=ref.datasetType.component()),
dataId=ref.dataId,
ref=ref,
provenance=provenance,
)
except KeyError as e:
raise DatasetTypeNotSupportedError(
@@ -1275,7 +1280,9 @@ def _calculate_ingested_datastore_name(

return location

def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) -> StoredFileInfo:
def _write_in_memory_to_artifact(
self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None
) -> StoredFileInfo:
"""Write out in memory dataset to datastore.
Parameters
@@ -1284,6 +1291,9 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
Dataset to write to datastore.
ref : `DatasetRef`
Registry information associated with this dataset.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Not supported by all formatters.
Returns
-------
@@ -1302,7 +1312,7 @@ def _write_in_memory_to_artifact(self, inMemoryDataset: Any, ref: DatasetRef) ->
f"Dataset {ref} has been rejected by this datastore via configuration."
)

location, formatter = self._determine_put_formatter_location(ref)
location, formatter = self._determine_put_formatter_location(ref, provenance)

# The external storage class can differ from the registry storage
# class AND the given in-memory dataset might not match any of the
@@ -2313,7 +2323,7 @@ def prepare_get_for_external_client(self, ref: DatasetRef) -> FileDatastoreGetPa
)

@transactional
def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
def put(self, inMemoryDataset: Any, ref: DatasetRef, provenance: DatasetProvenance | None = None) -> None:
"""Write a InMemoryDataset with a given `DatasetRef` to the store.
Parameters
@@ -2322,6 +2332,9 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
The dataset to store.
ref : `DatasetRef`
Reference to the associated Dataset.
provenance : `DatasetProvenance` or `None`, optional
Any provenance that should be attached to the serialized dataset.
Can be ignored by a formatter or delegate.
Raises
------
@@ -2358,11 +2371,12 @@ def put(self, inMemoryDataset: Any, ref: DatasetRef) -> None:
# DatasetType does not refer to the types of components
# So we construct one ourselves.
compRef = ref.makeComponentRef(component)
# Provenance has already been attached above.
storedInfo = self._write_in_memory_to_artifact(componentInfo.component, compRef)
artifacts.append((compRef, storedInfo))
else:
# Write the entire thing out
storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref)
storedInfo = self._write_in_memory_to_artifact(inMemoryDataset, ref, provenance=provenance)
artifacts.append((ref, storedInfo))

self._register_datasets(artifacts, insert_mode=DatabaseInsertMode.INSERT)
(Diffs for the remaining 4 changed files are not shown.)
