diff --git a/docs/source/config_storage.rst b/docs/source/config_storage.rst
index 21a59b1b4..04d2661ae 100644
--- a/docs/source/config_storage.rst
+++ b/docs/source/config_storage.rst
@@ -9,13 +9,13 @@ what is done behind the scenes.
Downloading XENONnT files from the database
-------------------------------------------
-Most generically one downloads files using the :py:class:`straxen.MongoDownloader`
+Most generically, one downloads files using the :py:class:`utilix.mongo_storage.MongoDownloader`
class. For example, one can download a file:
.. code-block:: python
- import straxen
- downloader = straxen.MongoDownloader()
+ import utilix
+ downloader = utilix.mongo_storage.MongoDownloader()
# The downloader allows one to download files from the mongo database by
# looking for the requested name in the files database. The downloader
# returns the path of the downloaded file.
@@ -55,7 +55,7 @@ Therefore, this manner of loading data is intended only for testing purposes.
How does the downloading work?
--------------------------------------
-In :py:mod:`straxen/mongo_storage.py` there are two classes that take care of the
+In :py:mod:`utilix/mongo_storage.py` there are two classes that take care of the
downloading and the uploading of files to the `files` database. In this
database we store configuration files under a :py:obj:`config_identifier` i.e. the
:py:obj:`'file_name'`. This is the label that is used to find the document one is
@@ -69,8 +69,8 @@ an admin user (with the credentials to upload files to the database) uploads a
file to the `files`- database (not shown) such that it can be downloaded later
by any user. The admin user can upload a file using the command
:py:obj:`MongoUploader.upload_from_dict({'file_name': '/path/to/file'})`.
-This command will use the :py:class:`straxen.MongoUploader` class to put the file
-:py:obj:`'file_name'` in the `files` database. The :py:class:`straxen.MongoUploader` will
+This command will use the :py:class:`utilix.mongo_storage.MongoUploader` class to put the file
+:py:obj:`'file_name'` in the `files` database. The :py:class:`utilix.mongo_storage.MongoUploader` will
communicate with the database via `GridFs
<https://www.mongodb.com/docs/manual/core/gridfs/>`_.
The GridFs interface communicates with two mongo-collections; :py:obj:`'fs.files'` and
@@ -81,15 +81,15 @@ storing pieces of data (not to be confused with :py:class:`strax.Chunks`).
Uploading
^^^^^^^^^
When the admin user issues the command to upload the :py:obj:`'file_name'`-file, the
-:py:class:`straxen.MongoUploader` will check that the file is not already stored in the
-database. To this end, the :py:class:`straxen.MongoUploader` computes the :py:obj:`md5-hash` of
+:py:class:`utilix.mongo_storage.MongoUploader` will check that the file is not already stored in the
+database. To this end, the :py:class:`utilix.mongo_storage.MongoUploader` computes the :py:obj:`md5-hash` of
the file stored under the :py:obj:`'/path/to/file'`. If this is the first time a file
-with this :py:obj:`md5-hash` is uploaded, :py:class:`straxen.MongoUploader` will upload it to
+with this :py:obj:`md5-hash` is uploaded, :py:class:`utilix.mongo_storage.MongoUploader` will upload it to
:py:obj:`GridFs`. If a file with the same :py:obj:`md5-hash` already exists, there is no
need to upload it again. This does mean, however, that if a file :py:obj:`'file_name'`
is already stored and you then modify the :py:obj:`'file_name'`-file, it will be uploaded again! This is
a feature, not a bug. When a user requests the :py:obj:`'file_name'`-file, the
-:py:class:`straxen.MongoDownloader` will fetch the :py:obj:`'file_name'`-file that was uploaded
+:py:class:`utilix.mongo_storage.MongoDownloader` will fetch the :py:obj:`'file_name'`-file that was uploaded
last.
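The upload-deduplication described above hinges on comparing MD5 hashes of file contents. A minimal, standalone sketch of that check (simplified for illustration; this is not the actual utilix implementation, and `needs_upload` is a hypothetical helper):

```python
import hashlib
import os
import tempfile


def compute_md5(abs_path: str) -> str:
    """Chunked MD5 so large files need not fit in memory at once."""
    if not os.path.exists(abs_path):
        # A missing file hashes to nothing, so it never counts as "already stored".
        return ""
    md5 = hashlib.md5()
    with open(abs_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            md5.update(chunk)
    return md5.hexdigest()


def needs_upload(abs_path: str, stored_hashes: set) -> bool:
    """Upload only when the file's current content is not stored yet."""
    return compute_md5(abs_path) not in stored_hashes


# A modified file gets a new hash and is therefore uploaded again.
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
    f.write("v1 of the config")
    path = f.name
stored = {compute_md5(path)}
print(needs_upload(path, stored))   # False: identical content, skip upload
with open(path, "w") as f:
    f.write("v2 of the config")
print(needs_upload(path, stored))   # True: content changed, upload again
os.remove(path)
```

This mirrors why modifying a stored file triggers a second upload: only the content hash matters, not the name.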
@@ -98,7 +98,7 @@ Downloading
Assuming that an admin user uploaded the :py:obj:`'file_name'`-file, any user (no
admin rights required) can now download the :py:obj:`'file_name'`-file (see above for the
example). When the user executes :py:obj:`MongoDownloader.download_single('file_name')`,
-the :py:class:`straxen.MongoDownloader` will check if the file is downloaded already. If
+the :py:class:`utilix.mongo_storage.MongoDownloader` will check if the file is downloaded already. If
this is the case it will simply return the path of the file. Otherwise, it will
start downloading the file. It is important to notice that the files are saved
under their :py:obj:`md5-hash`-name. This means that wherever the files are stored,
@@ -112,8 +112,8 @@ already stored but it would be if the file has been changed as explained above.
Utilix Mongo config loader classes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Both the :py:class:`straxen.MongoUploader` and :py:class:`straxen.MongoDownloader` share a common
+Both the :py:class:`utilix.mongo_storage.MongoUploader` and :py:class:`utilix.mongo_storage.MongoDownloader` share a common
parent class, the :py:class:`utilix.mongo_storage.GridFsInterface`, which provides the appropriate
shared functionality and connection to the database. The important difference
is the :py:obj:`readonly` argument that naturally has to be :py:obj:`False` for the
-:py:class:`straxen.MongoUploader` but :py:obj:`True` for the :py:class:`straxen.MongoDownloader`.
+:py:class:`utilix.mongo_storage.MongoUploader` but :py:obj:`True` for the :py:class:`utilix.mongo_storage.MongoDownloader`.
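Since downloads are cached under their md5-hash name, the downloader can resolve a request by scanning candidate cache folders before touching the database. A simplified stand-in for that lookup (`find_cached` and `writable_folder` are illustrative helpers, not the utilix API):

```python
import os
import tempfile


def find_cached(file_hash, cache_folders):
    """Return the cached path for a hash-named file, or None if absent everywhere."""
    for folder in cache_folders:
        candidate = os.path.join(folder, file_hash)
        if os.path.exists(candidate):
            return candidate
    return None


def writable_folder(cache_folders):
    """First folder we can create and write to; mirrors the fallback order."""
    for folder in cache_folders:
        try:
            os.makedirs(folder, exist_ok=True)
        except OSError:
            continue  # e.g. no permission for this location; try the next one
        if os.access(folder, os.W_OK):
            return folder
    raise PermissionError("no writable cache folder")


# Demo with throwaway folders standing in for the cache-directory list.
root = tempfile.mkdtemp()
folders = [os.path.join(root, "a"), os.path.join(root, "b")]
target = writable_folder(folders)
with open(os.path.join(target, "abc123"), "w") as f:
    f.write("cached payload")
print(find_cached("abc123", folders))  # path inside the first writable folder
```

Because the cache key is the content hash, a stale human-readable name can never shadow a newer upload.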
diff --git a/docs/source/reference/straxen.storage.rst b/docs/source/reference/straxen.storage.rst
index 97758ea4c..c6bd6213a 100644
--- a/docs/source/reference/straxen.storage.rst
+++ b/docs/source/reference/straxen.storage.rst
@@ -4,14 +4,6 @@ straxen.storage package
Submodules
----------
-straxen.storage.mongo\_storage module
--------------------------------------
-
-.. automodule:: straxen.storage.mongo_storage
- :members:
- :undoc-members:
- :show-inheritance:
-
straxen.storage.online\_monitor\_frontend module
------------------------------------------------
diff --git a/docs/source/url_configs.rst b/docs/source/url_configs.rst
index 94fff1e94..32e3368a2 100644
--- a/docs/source/url_configs.rst
+++ b/docs/source/url_configs.rst
@@ -53,10 +53,10 @@ A concrete plugin example
print(f"Path is local. Loading {self.algorithm} TF model locally "
f"from disk.")
else:
- downloader = straxen.MongoDownloader()
+ downloader = utilix.mongo_storage.MongoDownloader()
try:
self.model_file = downloader.download_single(self.model_file)
- except straxen.mongo_storage.CouldNotLoadError as e:
+ except utilix.mongo_storage.CouldNotLoadError as e:
raise RuntimeError(f'Model files {self.model_file} is not found') from e
with tempfile.TemporaryDirectory() as tmpdirname:
tar = tarfile.open(self.model_file, mode="r:gz")
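The try/except in the plugin above translates a storage-layer failure into a plugin-level ``RuntimeError`` while preserving the cause. The pattern in isolation (with a stand-in exception and a hypothetical ``download_single``, since utilix is not imported here):

```python
class CouldNotLoadError(Exception):
    """Stand-in for utilix.mongo_storage.CouldNotLoadError."""


def download_single(name):
    # Pretend the file is not in the database.
    raise CouldNotLoadError(f"{name} cannot be downloaded from GridFs")


def load_model(model_file):
    try:
        return download_single(model_file)
    except CouldNotLoadError as e:
        # 'raise ... from e' keeps the original error reachable as __cause__.
        raise RuntimeError(f"Model file {model_file} is not found") from e


try:
    load_model("mlp_model.tar.gz")
except RuntimeError as err:
    print(err)                           # Model file mlp_model.tar.gz is not found
    print(type(err.__cause__).__name__)  # CouldNotLoadError
```

Chaining with ``from e`` keeps the storage-layer traceback attached, so debugging a missing model still shows where the download failed.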
diff --git a/pyproject.toml b/pyproject.toml
index b5461a357..354482b86 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -47,9 +47,8 @@ docutils = "==0.18.1"
mistune = "==0.8.4"
pymongo = "*"
requests = "*"
-utilix = ">=0.5.3"
+utilix = ">=0.11.0"
xedocs = "*"
-sharedarray = { url = "https://xenon.isi.edu/python/SharedArray-3.2.3.tar.gz", optional = true }
base_environment = { git = "https://github.com/XENONnT/base_environment.git", optional = true }
commonmark = { version = "0.9.1", optional = true }
nbsphinx = { version = "0.8.9", optional = true }
diff --git a/pytest.ini b/pytest.ini
index 64297b653..db6f981fd 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,4 +1,4 @@
[pytest]
filterwarnings =
ignore::numba.NumbaExperimentalFeatureWarning
- ignore::straxen.storage.mongo_storage.DownloadWarning
+ ignore::utilix.mongo_storage.DownloadWarning
diff --git a/straxen/analyses/daq_waveforms.py b/straxen/analyses/daq_waveforms.py
index d3d6ef4e7..92b7f2c4f 100644
--- a/straxen/analyses/daq_waveforms.py
+++ b/straxen/analyses/daq_waveforms.py
@@ -82,7 +82,7 @@ def _board_to_host_link(daq_config: dict, board: int, add_crate=True) -> str:
def _get_cable_map(name: str = "xenonnt_cable_map.csv") -> pandas.DataFrame:
"""Download the cable map and return as a pandas dataframe."""
- down = straxen.MongoDownloader()
+ down = utilix.mongo_storage.MongoDownloader()
cable_map = down.download_single(name)
cable_map = pandas.read_csv(cable_map)
return cable_map
diff --git a/straxen/common.py b/straxen/common.py
index d7eedac04..830d7ea8e 100644
--- a/straxen/common.py
+++ b/straxen/common.py
@@ -17,6 +17,7 @@
import numba
import strax
import straxen
+import utilix
export, __all__ = strax.exporter()
__all__.extend(
@@ -222,7 +223,7 @@ def get_resource(x: str, fmt="text"):
return open_resource(x, fmt=fmt)
# 3. load from database
elif straxen.uconfig is not None:
- downloader = straxen.MongoDownloader()
+ downloader = utilix.mongo_storage.MongoDownloader()
if x in downloader.list_files():
path = downloader.download_single(x)
return open_resource(path, fmt=fmt)
diff --git a/straxen/config/protocols.py b/straxen/config/protocols.py
index 70fbc19a9..316f69988 100644
--- a/straxen/config/protocols.py
+++ b/straxen/config/protocols.py
@@ -16,6 +16,7 @@
from immutabledict import immutabledict
from utilix import xent_collection
+import utilix
from scipy.interpolate import interp1d
@@ -42,7 +43,7 @@ def get_resource(name: str, fmt: str = "text", **kwargs):
"""Fetch a straxen resource. Allow a direct download using fmt="abs_path"; otherwise kwargs
are passed directly to straxen.get_resource."""
if fmt == "abs_path":
- downloader = straxen.MongoDownloader()
+ downloader = utilix.mongo_storage.MongoDownloader()
return downloader.download_single(name)
return straxen.get_resource(name, fmt=fmt)
diff --git a/straxen/corrections_services.py b/straxen/corrections_services.py
index 1039ac6ea..6aec14e82 100644
--- a/straxen/corrections_services.py
+++ b/straxen/corrections_services.py
@@ -288,7 +288,8 @@ def get_pmt_gains(
return to_pe
def get_config_from_cmt(self, run_id, model_type, version="ONLINE"):
- """Smart logic to return NN weights file name to be downloader by straxen.MongoDownloader()
+ """Smart logic to return the NN weights file name to be downloaded by
+ utilix.mongo_storage.MongoDownloader()
:param run_id: run id from runDB
:param model_type: model type and neural network type; model_mlp, or model_gcn or model_cnn
diff --git a/straxen/storage/__init__.py b/straxen/storage/__init__.py
index 494e75d4e..f28f8071a 100644
--- a/straxen/storage/__init__.py
+++ b/straxen/storage/__init__.py
@@ -10,5 +10,5 @@
from . import rundb
from .rundb import *
-from . import mongo_storage
-from .mongo_storage import *
+from utilix import mongo_storage
+from utilix.mongo_storage import *
diff --git a/straxen/storage/mongo_storage.py b/straxen/storage/mongo_storage.py
deleted file mode 100644
index 1712352da..000000000
--- a/straxen/storage/mongo_storage.py
+++ /dev/null
@@ -1,595 +0,0 @@
-import os
-import tempfile
-from datetime import datetime
-from warnings import warn
-import pytz
-from typing import Tuple, Dict, Any, List, Optional, Union
-from strax import exporter, to_str_tuple
-import gridfs
-from tqdm import tqdm
-from shutil import move
-import hashlib
-from pymongo.collection import Collection
-from utilix.rundb import DB, xent_collection
-from utilix import uconfig, logger
-
-
-export, __all__ = exporter()
-
-
-@export
-class GridFsBase:
- """Base class for GridFS operations."""
-
- def __init__(self, config_identifier: str = "config_name", **kwargs: Any) -> None:
- self.config_identifier = config_identifier
-
- def get_query_config(self, config: str) -> Dict[str, str]:
- """Generate query identifier for a config."""
- return {self.config_identifier: config}
-
- def document_format(self, config):
- """Format of the document to upload.
-
- :param config: str, name of the file of interest
- :return: dict, that will be used to add the document
-
- """
- doc = self.get_query_config(config)
- doc.update(
- {
- "added": datetime.now(tz=pytz.utc),
- }
- )
- return doc
-
- def config_exists(self, config: str) -> bool:
- """Check if a config exists."""
- raise NotImplementedError
-
- def md5_stored(self, abs_path: str) -> bool:
- """Check if file with given MD5 is stored."""
- raise NotImplementedError
-
- def test_find(self) -> None:
- """Test the find operation."""
- raise NotImplementedError
-
- def list_files(self) -> List[str]:
- """List all files in the database."""
- raise NotImplementedError
-
- @staticmethod
- def compute_md5(abs_path: str) -> str:
- """Compute MD5 hash of a file.
-
- RAM intensive operation.
-
- """
- if not os.path.exists(abs_path):
- return ""
- # bandit: disable=B303
- hash_md5 = hashlib.md5()
- with open(abs_path, "rb") as f:
- for chunk in iter(lambda: f.read(4096), b""):
- hash_md5.update(chunk)
- return hash_md5.hexdigest()
-
-
-@export
-class GridFsInterfaceMongo(GridFsBase):
- """
- Class to upload/download the files to a database using GridFS
- for PyMongo:
- https://pymongo.readthedocs.io/en/stable/api/gridfs/index.html#module-gridfs
-
- This class does the basic shared initiation of the downloader and
- uploader classes.
-
- """
-
- def __init__(
- self,
- readonly=True,
- file_database="files",
- config_identifier="config_name",
- collection=None,
- _test_on_init=False,
- ):
- """GridFsInterface.
-
- :param readonly: bool, can one read or also write to the
- database.
- :param file_database: str, name of the database. Default should
- not be changed.
- :param config_identifier: str, header of the files that are
- saved in Gridfs
- :param collection: pymongo.collection.Collection, (Optional)
- PyMongo DataName Collection to bypass normal initiation
- using utilix. Should be an object of the form:
- pymongo.MongoClient(..).DATABASE_NAME.COLLECTION_NAME
- :param _test_on_init: Test if the collection is empty on init
- (only deactivate if you are using a brand new database)!
-
- """
-
- if collection is None:
- if not readonly:
- # We want admin access to start writing data!
- mongo_url = uconfig.get("rundb_admin", "mongo_rdb_url")
- mongo_user = uconfig.get("rundb_admin", "mongo_rdb_username")
- mongo_password = uconfig.get("rundb_admin", "mongo_rdb_password")
- else:
- # We can safely use the Utilix defaults
- mongo_url = mongo_user = mongo_password = None
-
- # If no collection arg is passed, it defaults to the 'files'
- # collection, see for more details:
- # https://github.com/XENONnT/utilix/blob/master/utilix/rundb.py
- mongo_kwargs = {
- "url": mongo_url,
- "user": mongo_user,
- "password": mongo_password,
- "database": file_database,
- }
- # We can safely hard-code the collection as that is always
- # the same with GridFS.
- collection = xent_collection(**mongo_kwargs, collection="fs.files")
- else:
- # Check the user input is fine for what we want to do.
- if not isinstance(collection, Collection):
- raise ValueError("Provide PyMongo collection (see docstring)!")
- if file_database is not None:
- raise ValueError("Already provided a collection!")
-
- # Set collection and make sure it can at least do a 'find' operation
- self.collection = collection
- if _test_on_init:
- self.test_find()
-
- # This is the identifier under which we store the files.
- self.config_identifier = config_identifier
-
- # The GridFS used in this database
- self.grid_fs = gridfs.GridFS(collection.database)
-
- def get_query_config(self, config):
- """Generate identifier to query against. This is just the configs name.
-
- :param config: str, name of the file of interest
- :return: dict, that can be used in queries
-
- """
- return {self.config_identifier: config}
-
- def config_exists(self, config):
- """Quick check if this config is already saved in the collection.
-
- :param config: str, name of the file of interest
- :return: bool, is this config name stored in the database
-
- """
- query = self.get_query_config(config)
- return self.collection.count_documents(query) > 0
-
- def md5_stored(self, abs_path):
- """
- NB: RAM intensive operation!
- Carefully compare if the MD5 identifier is the same as the file
- as stored under abs_path.
-
- :param abs_path: str, absolute path to the file name
- :return: bool, returns if the exact same file is already stored
- in the database
-
- """
- if not os.path.exists(abs_path):
- # A file that does not exist does not have the same MD5
- return False
- query = {"md5": self.compute_md5(abs_path)}
- return self.collection.count_documents(query) > 0
-
- def test_find(self):
- """Test the connection to the self.collection to see if we can perform a collection.find
- operation."""
- if self.collection.find_one(projection="_id") is None:
- raise ConnectionError("Could not find any data in this collection")
-
- def list_files(self):
- """Get a complete list of files that are stored in the database.
-
- :return: list, list of the names of the items stored in this database
-
- """
- return [
- doc[self.config_identifier]
- for doc in self.collection.find(projection={self.config_identifier: 1})
- if self.config_identifier in doc
- ]
-
- @staticmethod
- def compute_md5(abs_path):
- """
- NB: RAM intensive operation!
- Get the md5 hash of a file stored under abs_path
-
- :param abs_path: str, absolute path to a file
- :return: str, the md5-hash of the requested file
- """
- # This function is copied from:
- # stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file
-
- if not os.path.exists(abs_path):
- # if there is no file, there is nothing to compute
- return ""
- # Also, disable all the use of insecure MD2, MD4, MD5, or SHA1
- # hash function violations in this function.
- # disable bandit
- hash_md5 = hashlib.md5()
- with open(abs_path, "rb") as f:
- for chunk in iter(lambda: f.read(4096), b""):
- hash_md5.update(chunk)
- return hash_md5.hexdigest()
-
-
-@export
-class MongoUploader(GridFsInterfaceMongo):
- """Class to upload files to GridFs."""
-
- def __init__(self, readonly=False, *args, **kwargs):
- # Same as parent. Just check the readonly_argument
- if readonly:
- raise PermissionError("How can you upload if you want to operate in readonly?")
- super().__init__(*args, readonly=readonly, **kwargs)
-
- def upload_from_dict(self, file_path_dict):
- """Upload all files in the dictionary to the database.
-
- :param file_path_dict: dict, dictionary of paths to upload. The
- dict should be of the format:
- file_path_dict = {'config_name': '/the_config_path', ...}
-
- :return: None
-
- """
- if not isinstance(file_path_dict, dict):
- raise ValueError(
- "file_path_dict must be dict of form "
- '"dict(NAME=ABSOLUTE_PATH,...)". Got '
- f"{type(file_path_dict)} instead"
- )
-
- for config, abs_path in tqdm(file_path_dict.items()):
- # We need to do this expensive check here. It is not enough
- # to just check that the file is stored under the
- # 'config_identifier'. What if the file changed? Then we
- # want to upload a new file! Otherwise we could have done
- # the self.config_exists-query. If it turns out we have the
- # exact same file, forget about uploading it.
- if self.config_exists(config) and self.md5_stored(abs_path):
- continue
- else:
- # This means we are going to upload the file because its
- # not stored yet.
- try:
- self.upload_single(config, abs_path)
- except (CouldNotLoadError, ConfigTooLargeError):
- # Perhaps we should fail then?
- warn(f"Cannot upload {config}")
-
- def upload_single(self, config, abs_path):
- """Upload a single file to gridfs.
-
- :param config: str, the name under which this file should be stored
- :param abs_path: str, the absolute path of the file
-
- """
- doc = self.document_format(config)
- doc["md5"] = self.compute_md5(abs_path)
- if not os.path.exists(abs_path):
- raise CouldNotLoadError(f"{abs_path} does not exits")
-
- print(f"uploading {config}")
- with open(abs_path, "rb") as file:
- self.grid_fs.put(file, **doc)
-
-
-@export
-class MongoDownloader(GridFsInterfaceMongo):
- """Class to download files from GridFs."""
-
- _instances: Dict[Tuple, "MongoDownloader"] = {}
- _initialized: Dict[Tuple, bool] = {}
-
- def __new__(cls, *args, **kwargs):
- key = (args, frozenset(kwargs.items()))
- if key not in cls._instances:
- cls._instances[key] = super(MongoDownloader, cls).__new__(cls)
- cls._initialized[key] = False
- return cls._instances[key]
-
- def __init__(self, *args, **kwargs):
- key = (args, frozenset(kwargs.items()))
- if not self._initialized[key]:
- self._instances[key].initialize(*args, **kwargs)
- self._initialized[key] = True
- return
-
- def initialize(self, store_files_at=None, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # We are going to set a place where to store the files. It's
- # either specified by the user or we use these defaults:
- if store_files_at is None:
- store_files_at = (
- "./resource_cache",
- "/tmp/straxen_resource_cache",
- )
- elif not isinstance(store_files_at, (tuple, str, list)):
- raise ValueError(f"{store_files_at} should be tuple of paths!")
- elif isinstance(store_files_at, str):
- store_files_at = to_str_tuple(store_files_at)
-
- self.storage_options = store_files_at
-
- def download_single(self, config_name: str, human_readable_file_name=False):
- """Download the config_name if it exists.
-
- :param config_name: str, the name under which the file is stored
- :param human_readable_file_name: bool, store the file also under it's human readable name.
- It is better not to use this as the user might not know if the version of the file is
- the latest.
- :return: str, the absolute path of the file requested
-
- """
- if self.config_exists(config_name):
- # Query by name
- query = self.get_query_config(config_name)
- try:
- # This could return multiple since we upload files if
- # they have changed again! Therefore just take the last.
- fs_object = self.grid_fs.get_last_version(**query)
- except gridfs.NoFile as e:
- raise CouldNotLoadError(f"{config_name} cannot be downloaded from GridFs") from e
-
- # Ok, so we can open it. We will store the file under it's
- # md5-hash as that allows to easily compare if we already
- # have the correct file.
- if human_readable_file_name:
- target_file_name = config_name
- else:
- target_file_name = fs_object.md5
-
- for cache_folder in self.storage_options:
- possible_path = os.path.join(cache_folder, target_file_name)
- if os.path.exists(possible_path):
- # Great! This already exists. Let's just return
- # where it is stored.
- return possible_path
-
- # Apparently the file does not exist, let's find a place to
- # store the file and download it.
- store_files_at = self._check_store_files_at(self.storage_options)
- destination_path = os.path.join(store_files_at, target_file_name)
-
- # Let's open a temporary directory, download the file, and
- # try moving it to the destination_path. This prevents
- # simultaneous writes of the same file.
- with tempfile.TemporaryDirectory() as temp_directory_name:
- temp_path = os.path.join(temp_directory_name, target_file_name)
-
- with open(temp_path, "wb") as stored_file:
- # This is were we do the actual downloading!
- warn(f"Downloading {config_name} to {destination_path}", DownloadWarning)
- stored_file.write(fs_object.read())
-
- if not os.path.exists(destination_path):
- # Move the file to the place we want to store it.
- move(temp_path, destination_path)
- return destination_path
-
- else:
- raise ValueError(f"Config {config_name} cannot be downloaded since it is not stored")
-
- def get_abs_path(self, config_name):
- return self.download_single(config_name)
-
- def download_all(self):
- """Download all the files that are stored in the mongo collection."""
- for config in self.list_files():
- print(config, self.download_single(config))
-
- @staticmethod
- def _check_store_files_at(cache_folder_alternatives):
- """Iterate over the options in cache_options until we find a folder where we can store data.
- Order does matter as we iterate until we find one folder that is willing.
-
- :param cache_folder_alternatives: tuple, this tuple must be a list of paths one can try to
- store the downloaded data
- :return: str, the folder that we can write to.
-
- """
- if not isinstance(cache_folder_alternatives, (tuple, list)):
- raise ValueError("cache_folder_alternatives must be tuple")
- for folder in cache_folder_alternatives:
- if not os.path.exists(folder):
- try:
- os.makedirs(folder)
- except (PermissionError, OSError):
- continue
- if os.access(folder, os.W_OK):
- return folder
- raise PermissionError(
- f"Cannot write to any of the cache_folder_alternatives: {cache_folder_alternatives}"
- )
-
-
-@export
-class GridFsInterfaceAPI(GridFsBase):
- """Interface to gridfs using the runDB API."""
-
- def __init__(self, config_identifier: str = "config_name") -> None:
- super().__init__(config_identifier=config_identifier)
- self.db = DB()
-
- def config_exists(self, config: str) -> bool:
- """Check if config is saved in the collection."""
- query = self.get_query_config(config)
- return self.db.count_files(query) > 0
-
- def md5_stored(self, abs_path: str) -> bool:
- """Check if file with same MD5 is stored.
-
- RAM intensive.
-
- """
- if not os.path.exists(abs_path):
- return False
- query = {"md5": self.compute_md5(abs_path)}
- return self.db.count_files(query) > 0
-
- def test_find(self) -> None:
- """Test the connection to the collection."""
- if self.db.get_files({}, projection={"_id": 1}) is None:
- raise ConnectionError("Could not find any data in this collection")
-
- def list_files(self) -> List[str]:
- """Get list of files stored in the database."""
- return [
- doc[self.config_identifier]
- for doc in self.db.get_files({}, projection={self.config_identifier: 1})
- if self.config_identifier in doc
- ]
-
-
-@export
-class APIUploader(GridFsInterfaceAPI):
- """Upload files to gridfs using the runDB API."""
-
- def __init__(self, config_identifier: str = "config_name") -> None:
- super().__init__(config_identifier=config_identifier)
-
- def upload_single(self, config: str, abs_path: str) -> None:
- """Upload a single file to gridfs.
-
- :param config: str, the name under which this file should be stored
- :param abs_path: str, the absolute path of the file
-
- """
- if not os.path.exists(abs_path):
- raise CouldNotLoadError(f"{abs_path} does not exist")
-
- logger.info(f"uploading file {config} from {abs_path}")
- self.db.upload_file(abs_path, config)
-
-
-@export
-class APIDownloader(GridFsInterfaceAPI):
- """Download files from gridfs using the runDB API."""
-
- _instances: Dict[Tuple, "APIDownloader"] = {}
- _initialized: Dict[Tuple, bool] = {}
-
- def __new__(cls, *args: Any, **kwargs: Any) -> "APIDownloader":
- key = (args, frozenset(kwargs.items()))
- if key not in cls._instances:
- cls._instances[key] = super(APIDownloader, cls).__new__(cls)
- cls._initialized[key] = False
- return cls._instances[key]
-
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- key = (args, frozenset(kwargs.items()))
- if not self._initialized[key]:
- self._instances[key].initialize(*args, **kwargs)
- self._initialized[key] = True
-
- def initialize(
- self,
- config_identifier: str = "config_name",
- store_files_at: Optional[Union[str, Tuple[str, ...], List[str]]] = None,
- *args: Any,
- **kwargs: Any,
- ) -> None:
- super().__init__(config_identifier=config_identifier)
-
- if store_files_at is None:
- store_files_at = (
- "./resource_cache",
- "/tmp/straxen_resource_cache",
- )
- elif isinstance(store_files_at, str):
- store_files_at = (store_files_at,)
- elif isinstance(store_files_at, list):
- store_files_at = tuple(store_files_at)
- elif not isinstance(store_files_at, tuple):
- raise ValueError(f"{store_files_at} should be a string, list, or tuple of paths!")
-
- self.storage_options: Tuple[str, ...] = store_files_at
-
- def download_single(
- self,
- config_name: str,
- write_to: Optional[str] = None,
- human_readable_file_name: bool = False,
- ) -> str:
- """Download the config_name if it exists."""
- target_file_name = (
- config_name if human_readable_file_name else self.db.get_file_md5(config_name)
- )
-
- # check if self.storage_options is None or empty
- if not self.storage_options:
- raise ValueError("No storage options available")
-
- if write_to is None:
- for cache_folder in self.storage_options:
- possible_path = os.path.join(cache_folder, target_file_name)
- if os.path.exists(possible_path):
- return possible_path
-
- store_files_at = self._check_store_files_at(self.storage_options)
- else:
- store_files_at = write_to
-
- # make sure store_files_at is a string
- if not isinstance(store_files_at, str):
- raise TypeError(f"Expected string for store_files_at, got {type(store_files_at)}")
-
- destination_path = os.path.join(store_files_at, target_file_name)
-
- with tempfile.TemporaryDirectory() as temp_directory_name:
- temp_path = self.db.download_file(config_name, save_dir=temp_directory_name)
- if not os.path.exists(destination_path):
- move(temp_path, destination_path)
- else:
- warn(f"File {destination_path} already exists. Not overwriting.")
- return destination_path
-
- def _check_store_files_at(self, options: Union[str, Tuple[str, ...]]) -> str:
- """Check and return a valid storage location."""
- if isinstance(options, str):
- return options
- for option in options:
- if os.path.isdir(option):
- return option
- raise ValueError("No valid storage location found")
-
-
-class DownloadWarning(UserWarning):
- pass
-
-
-class CouldNotLoadError(Exception):
- """Raise if we cannot load this kind of data."""
-
- # Disable the inspection of 'Unnecessary pass statement'
- # pylint: disable=unnecessary-pass
- pass
-
-
-class ConfigTooLargeError(Exception):
- """Raise if the data is to large to be uploaded into mongo."""
-
- # Disable the inspection of 'Unnecessary pass statement'
- # pylint: disable=unnecessary-pass
- pass
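The deleted ``MongoDownloader`` (and, presumably, its utilix successor) memoizes instances per constructor-argument set, so repeated ``MongoDownloader()`` calls reuse one database connection. The pattern on its own, stripped of the database logic (``KeyedSingleton`` and ``initialize`` are illustrative names):

```python
class KeyedSingleton:
    """One instance per distinct set of constructor arguments."""

    _instances = {}
    _initialized = {}

    def __new__(cls, *args, **kwargs):
        key = (cls, args, frozenset(kwargs.items()))
        if key not in cls._instances:
            cls._instances[key] = super().__new__(cls)
            cls._initialized[key] = False
        return cls._instances[key]

    def __init__(self, *args, **kwargs):
        key = (type(self), args, frozenset(kwargs.items()))
        if not self._initialized[key]:
            self.initialize(*args, **kwargs)  # expensive setup runs once per key
            self._initialized[key] = True

    def initialize(self, *args, **kwargs):
        # Stands in for opening the connection; counts how often it runs.
        self.setup_count = getattr(self, "setup_count", 0) + 1


a = KeyedSingleton(cache="/tmp/x")
b = KeyedSingleton(cache="/tmp/x")  # same kwargs -> same object, no re-init
c = KeyedSingleton(cache="/tmp/y")  # different kwargs -> separate instance
print(a is b, a is c)  # True False
print(a.setup_count)   # 1
```

Keying on ``(args, frozenset(kwargs.items()))`` requires hashable argument values, which holds for the path strings and flags these classes take.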
diff --git a/tests/storage/test_mongo_downloader.py b/tests/storage/test_mongo_downloader.py
index 7a314ebc5..a7c2cca17 100644
--- a/tests/storage/test_mongo_downloader.py
+++ b/tests/storage/test_mongo_downloader.py
@@ -1,4 +1,6 @@
import unittest
+
+import utilix
import straxen
import os
import pymongo
@@ -32,13 +34,13 @@ def setUp(self):
client = pymongo.MongoClient(uri)
database = client[db_name]
collection = database[collection_name]
- self.downloader = straxen.MongoDownloader(
+ self.downloader = utilix.mongo_storage.MongoDownloader(
collection=collection,
readonly=True,
file_database=None,
_test_on_init=False,
)
- self.uploader = straxen.MongoUploader(
+ self.uploader = utilix.mongo_storage.MongoUploader(
collection=collection,
readonly=False,
file_database=None,
@@ -76,7 +78,7 @@ def test_up_and_download(self):
self.downloader.test_find()
self.downloader.download_all()
# Now the test on init should work, let's double try
- straxen.MongoDownloader(
+ utilix.mongo_storage.MongoDownloader(
collection=self.collection,
file_database=None,
_test_on_init=True,
@@ -85,28 +87,28 @@ def test_up_and_download(self):
def test_invalid_methods(self):
"""The following examples should NOT work, let's make sure the right errors are raised."""
with self.assertRaises(ValueError):
- straxen.MongoDownloader(
+ utilix.mongo_storage.MongoDownloader(
collection=self.collection,
file_database="NOT NONE",
)
with self.assertRaises(ValueError):
- straxen.MongoDownloader(
+ utilix.mongo_storage.MongoDownloader(
collection="invalid type",
)
with self.assertRaises(PermissionError):
- straxen.MongoUploader(readonly=True)
+ utilix.mongo_storage.MongoUploader(readonly=True)
with self.assertRaises(ValueError):
self.uploader.upload_from_dict("A string is not a dict")
- with self.assertRaises(straxen.mongo_storage.CouldNotLoadError):
+ with self.assertRaises(utilix.mongo_storage.CouldNotLoadError):
self.uploader.upload_single("no_such_file", "no_such_file")
with self.assertWarns(UserWarning):
self.uploader.upload_from_dict({"something": "no_such_file"})
with self.assertRaises(ValueError):
- straxen.MongoDownloader(
+ utilix.mongo_storage.MongoDownloader(
collection=self.collection,
file_database=None,
_test_on_init=False,
diff --git a/tests/storage/test_mongo_interactions.py b/tests/storage/test_mongo_interactions.py
index abd170017..84f9a484f 100644
--- a/tests/storage/test_mongo_interactions.py
+++ b/tests/storage/test_mongo_interactions.py
@@ -8,6 +8,7 @@
import straxen
import os
import unittest
+import utilix
@unittest.skipIf(not straxen.utilix_is_configured(), "No db access, cannot test!")
@@ -38,6 +39,6 @@ def test_select_runs(self, check_n_runs=2):
class TestDownloader(unittest.TestCase):
def test_downloader(self):
"""Test if we can download a small file from the downloader."""
- downloader = straxen.MongoDownloader()
+ downloader = utilix.mongo_storage.MongoDownloader()
path = downloader.download_single("to_pe_nt.npy")
self.assertTrue(os.path.exists(path))