diff --git a/pastastore/base.py b/pastastore/base.py index f2b34058..07cfda39 100644 --- a/pastastore/base.py +++ b/pastastore/base.py @@ -1,225 +1,697 @@ +import functools import json import warnings from abc import ABC, abstractmethod, abstractproperty from collections.abc import Iterable -from typing import Optional, Union +from typing import Dict, List, Optional, Tuple, Union import pandas as pd import pastas as ps -from pastas import Model from pastas.io.pas import PastasEncoder from tqdm import tqdm -from .util import _custom_warning +from .util import ItemInLibraryException, _custom_warning, validate_names FrameorSeriesUnion = Union[pd.DataFrame, pd.Series] warnings.showwarning = _custom_warning -class BaseConnector(ABC): # pragma: no cover - """Metaclass for connecting to data management sources. +class BaseConnector(ABC): + """Base Connector class. - For example, MongoDB through Arctic, Pystore, or other databases. - Create your own connection to a data source by writing a a class - that inherits from this BaseConnector. Your class has to override - each method and property. + Class holds base logic for dealing with timeseries and Pastas + Models. Create your own Connector to a data source by writing a a + class that inherits from this BaseConnector. Your class has to + override each abstractmethod and abstractproperty. """ + _default_library_names = ["oseries", "stresses", "models"] + def __repr__(self): + """Representation string of the object.""" + return (f"<{type(self).__name__} object> '{self.name}': " + f"{self.n_oseries} oseries, " + f"{self.n_stresses} stresses, " + f"{self.n_models} models") + @abstractmethod - def get_library(self, libname: str): + def _get_library(self, libname: str): """Get library handle. + Must be overriden by subclass. 
+ Parameters ---------- - libname : str, + libname : str name of the library + + Returns + ------- + lib : Any + handle to the library + """ + pass + + @abstractmethod + def _add_item(self, libname: str, + item: Union[FrameorSeriesUnion, Dict], + name: str, + metadata: Optional[Dict] = None, + overwrite: bool = False) -> None: + """Internal method to add item for both timeseries and pastas.Models. + + Must be overridden by subclass. + + Parameters + ---------- + libname : str + name of library to add item to + item : FrameorSeriesUnion or dict + item to add + name : str + name of the item + metadata : dict, optional + dictionary containing metadata, by default None """ pass @abstractmethod - def add_oseries(self, series: FrameorSeriesUnion, name: str, - metadata: Union[dict, None] = None, - overwrite: bool = False, **kwargs) -> None: - """Add oseries. + def _get_item(self, libname: str, name: str) \ + -> Union[FrameorSeriesUnion, Dict]: + """Internal method to get item (series or pastas.Models). + + Must be overridden by subclass. Parameters ---------- + libname : str + name of library + name : str + name of item + + Returns + ------- + item : FrameorSeriesUnion or dict + item (timeseries or pastas.Model) + """ + pass + + @abstractmethod + def _del_item(self, libname: str, name: str) -> None: + """Internal method to delete items (series or models). + + Must be overridden by subclass. + + Parameters + ---------- + libname : str + name of library to delete item from + name : str + name of item to delete + """ + pass + + @abstractmethod + def _get_metadata(self, libname: str, name: str) -> Dict: + """Internal method to get metadata. + + Must be overridden by subclass. + + Parameters + ---------- + libname : str + name of the library + name : str + name of the item + + Returns + ------- + metadata : dict + dictionary containing metadata + """ + pass + + @abstractproperty + def oseries_names(self): + """List of oseries names. + + Property must be overridden by subclass.
+ """ + pass + + @abstractproperty + def stresses_names(self): + """List of stresses names. + + Property must be overriden by subclass. + """ + pass + + @abstractproperty + def model_names(self): + """List of model names. + + Property must be overriden by subclass. + """ + pass + + def _add_series(self, libname: str, + series: FrameorSeriesUnion, + name: str, + metadata: Optional[dict] = None, + overwrite: bool = False) -> None: + """Internal method to add series to database. + + Parameters + ---------- + libname : str + name of the library to add the series to + series : pandas.Series or pandas.DataFrame + data to add + name : str + name of the timeseries + metadata : dict, optional + dictionary containing metadata, by default None + overwrite : bool, optional + overwrite existing dataset with the same name, + by default False + + Raises + ------ + ItemInLibraryException + if overwrite is False and name is already in the database + """ + self._validate_input_series(series) + series = self._set_series_name(series, name) + in_store = getattr(self, f"{libname}_names") + if name not in in_store or overwrite: + self._add_item(libname, series, name, metadata=metadata, + overwrite=overwrite) + self._clear_cache(libname) + else: + raise ItemInLibraryException(f"Item with name '{name}' already" + f" in '{libname}' library!") + + def _update_series(self, libname: str, + series: FrameorSeriesUnion, + name: str, + metadata: Optional[dict] = None) -> None: + """Internal method to update timeseries. 
+ + Parameters + ---------- + libname : str + name of library series : FrameorSeriesUnion - pandas.Series or pandas.DataFrame to add + timeseries containing update values name : str - name of the series + name of the timeseries to update metadata : Optional[dict], optional - dictionary containing metadata, by default None - overwrite: bool, optional + optionally provide metadata dictionary which will also update + the current stored metadata dictionary, by default None + """ + if libname not in ["oseries", "stresses"]: + raise ValueError("Library must be 'oseries' or 'stresses'!") + self._validate_input_series(series) + series = self._set_series_name(series, name) + stored = self._get_series(libname, name, progressbar=False) + # get union of index + idx_union = stored.index.union(series.index) + # update series with new values + update = stored.reindex(idx_union) + update.update(series) + # metadata + update_meta = self._get_metadata(libname, name) + update_meta.update(metadata) + self._add_series(libname, update, name, metadata=update_meta, + overwrite=True) + + def update_metadata(self, libname: str, name: str, metadata: dict) -> None: + """Update metadata. + + Note: also retrieves and stores timeseries as updating only metadata + is not really supported. 
+ + Parameters + ---------- + libname : str + name of library + name : str + name of the item for which to update metadata + metadata : dict + metadata dictionary that will be used to update the stored + metadata + """ + + if libname not in ["oseries", "stresses"]: + raise ValueError("Library must be 'oseries' or 'stresses'!") + update_meta = self._get_metadata(libname, name) + update_meta.update(metadata) + # get series, since just updating metadata is not really defined + # in all cases + s = self._get_series(libname, name, progressbar=False) + self._add_series(libname, s, name, metadata=update_meta, + overwrite=True) + + def add_oseries(self, series: Union[FrameorSeriesUnion, ps.TimeSeries], + name: str, + metadata: Optional[dict] = None, + overwrite: bool = False) -> None: + """Add oseries to the database. + + Parameters + ---------- + series : pandas.Series, pandas.DataFrame or pastas.TimeSeries + data to add + name : str + name of the timeseries + metadata : dict, optional + dictionary containing metadata, by default None. If + pastas.TimeSeries is passed, metadata is kwarg is ignored and + metadata is taken from pastas.TimeSeries object + overwrite : bool, optional overwrite existing dataset with the same name, by default False """ - pass + series, metadata = self._parse_series_input(series, metadata) + self._add_series("oseries", series, name=name, metadata=metadata, + overwrite=overwrite) - @abstractmethod - def add_stress(self, series: FrameorSeriesUnion, name: str, kind: str, + def add_stress(self, series: Union[FrameorSeriesUnion, ps.TimeSeries], + name: str, kind: str, metadata: Optional[dict] = None, - overwrite: bool = False, **kwargs) -> None: - """Add stress. + overwrite: bool = False) -> None: + """Add stress to the database. 
Parameters ---------- - series : FrameorSeriesUnion - pandas.Series or pandas.DataFrame to add + series : pandas.Series, pandas.DataFrame or pastas.TimeSeries + data to add, if pastas.TimeSeries is passed, series_original + and metadata are stored in database name : str - name of the series + name of the timeseries kind : str - label specifying type of stress (i.e. 'prec' or 'evap') - metadata : Optional[dict], optional - dictionary containing metadata, by default None - overwrite: bool, optional + category to identify type of stress, this label is added to the + metadata dictionary. + metadata : dict, optional + dictionary containing metadata, by default None. If + pastas.TimeSeries is passed, the metadata kwarg is ignored and + metadata is taken from pastas.TimeSeries object + overwrite : bool, optional overwrite existing dataset with the same name, by default False """ - pass + series, metadata = self._parse_series_input(series, metadata) + if metadata is None: + metadata = {} + metadata["kind"] = kind + self._add_series("stresses", series, name=name, + metadata=metadata, overwrite=overwrite) + + def add_model(self, ml: Union[ps.Model, dict], + overwrite: bool = False, + validate_metadata: bool = False) -> None: + """Add model to the database. - @abstractmethod - def add_model(self, ml: Model, overwrite: bool = False, **kwargs) -> None: - """Add model.
+ Parameters + ---------- + ml : pastas.Model or dict + pastas Model or dictionary to add to the database + overwrite : bool, optional + if True, overwrite existing model, by default False + validate_metadata, bool optional + remove unsupported characters from metadata dictionary keys + + Raises + ------ + TypeError + if model is not pastas.Model or dict + ItemInLibraryException + if overwrite is False and model is already in the database + """ + if isinstance(ml, ps.Model): + mldict = ml.to_dict(series=False) + name = ml.name + if validate_metadata: + metadata = validate_names(d=ml.oseries.metadata) + else: + metadata = ml.oseries.metadata + elif isinstance(ml, dict): + mldict = ml + name = ml["name"] + metadata = None + else: + raise TypeError("Expected pastas.Model or dict!") + + if name not in self.model_names or overwrite: + # check if oseries and stresses exist in store + self._check_model_series_names_for_store(ml) + self._check_oseries_in_store(ml) + self._check_stresses_in_store(ml) + # write model to store + self._add_item("models", mldict, name, metadata=metadata, + overwrite=overwrite) + else: + raise ItemInLibraryException(f"Model with name '{name}' " + "already in 'models' library!") + self._clear_cache("models") + + @staticmethod + def _parse_series_input(series: Union[FrameorSeriesUnion, ps.TimeSeries], + metadata: Optional[Dict] = None) \ + -> Tuple[FrameorSeriesUnion, Optional[Dict]]: + """Internal method to parse series input. + + Parameters + ---------- + series : Union[FrameorSeriesUnion, ps.TimeSeries], + series object to parse + metadata : dict, optional + metadata dictionary or None, by default None + + Returns + ------- + series, metadata : FrameorSeriesUnion, Optional[Dict] + timeseries as pandas.Series or DataFrame and optionally + metadata dictionary + """ + if isinstance(series, ps.TimeSeries): + if metadata is not None: + print("Warning! Metadata kwarg ignored. 
Metadata taken from " + "pastas.TimeSeries object!") + s = series.series_original + m = series.metadata + else: + s = series + m = metadata + return s, m + + def update_oseries(self, series: Union[FrameorSeriesUnion, ps.TimeSeries], + name: str, metadata: Optional[dict] = None) -> None: + """Update oseries values. Parameters ---------- - ml : Model - pastas.Model + series : Union[FrameorSeriesUnion, ps.TimeSeries] + timeseries to update stored oseries with + name : str + name of the oseries to update + metadata : Optional[dict], optional + optionally provide metadata, which will update + the stored metadata dictionary, by default None """ - pass + series, metadata = self._parse_series_input(series, metadata) + self._update_series("oseries", series, name, metadata=metadata) + + def update_stress(self, series: Union[FrameorSeriesUnion, ps.TimeSeries], + name: str, metadata: Optional[dict] = None) -> None: + """Update stresses values. + + Note: the 'kind' attribute of a stress cannot be updated! To update + the 'kind' delete and add the stress again. + + Parameters + ---------- + series : Union[FrameorSeriesUnion, ps.TimeSeries] + timeseries to update stored stress with + name : str + name of the stress to update + metadata : Optional[dict], optional + optionally provide metadata, which will update + the stored metadata dictionary, by default None + """ + series, metadata = self._parse_series_input(series, metadata) + self._update_series("stresses", series, name, metadata=metadata) - @abstractmethod def del_models(self, names: Union[list, str]) -> None: - """Delete model. + """Delete model(s) from the database. 
Parameters ---------- - names : Union[list, str] - str or list of str of model names to delete + names : str or list of str + name(s) of the model to delete """ - pass + for n in self._parse_names(names, libname="models"): + self._del_item("models", n) + self._clear_cache("models") - @abstractmethod - def del_oseries(self, names: Union[list, str]) -> None: - """Delete oseries. + def del_oseries(self, names: Union[list, str]): + """Delete oseries from the database. Parameters ---------- - names : Union[list, str] - str or list of str of oseries names to delete + names : str or list of str + name(s) of the oseries to delete """ - pass + for n in self._parse_names(names, libname="oseries"): + self._del_item("oseries", n) + self._clear_cache("oseries") - @abstractmethod - def del_stress(self, names: Union[list, str]) -> None: - """Delete stresses. + def del_stress(self, names: Union[list, str]): + """Delete stress from the database. Parameters ---------- - names : Union[list, str] - str or list of str of stresses to delete + names : str or list of str + name(s) of the stress to delete """ - pass + for n in self._parse_names(names, libname="stresses"): + self._del_item("stresses", n) + self._clear_cache("stresses") - @abstractmethod - def get_metadata(self, libname: str, names: Union[list, str], - progressbar: bool = False, as_frame: bool = True) \ - -> Union[pd.DataFrame, dict]: - """Get metadata for oseries or stress. + def _get_series(self, libname: str, names: Union[list, str], + progressbar: bool = True, squeeze: bool = True) \ + -> FrameorSeriesUnion: + """Internal method to get timeseries. 
Parameters ---------- libname : str - name of library - names : Union[list, str] - str or list of str of series to get metadata for + name of the library + names : str or list of str + names of the timeseries to load progressbar : bool, optional - show progressbar, by default False - as_frame : bool, optional - return as dataframe, by default True + show progressbar, by default True + squeeze : bool, optional + if True return DataFrame or Series instead of dictionary + for single entry Returns ------- - Union[pd.DataFrame, dict] - dictionary or pandas.DataFrame depending on value of `as_frame`. + pandas.DataFrame or dict of pandas.DataFrames + either returns timeseries as pandas.DataFrame or + dictionary containing the timeseries. """ - pass + ts = {} + names = self._parse_names(names, libname=libname) + desc = f"Get {libname}" + for n in (tqdm(names, desc=desc) if progressbar else names): + ts[n] = self._get_item(libname, n) + # return frame if len == 1 + if len(ts) == 1 and squeeze: + return ts[n] + else: + return ts + + def get_metadata(self, libname: str, names: Union[list, str], + progressbar: bool = False, as_frame: bool = True, + squeeze: bool = True) -> Union[dict, pd.DataFrame]: + """Read metadata from database. 
+ + Parameters + ---------- + libname : str + name of the library containing the dataset + names : str or list of str + names of the datasets for which to read the metadata + squeeze : bool, optional + if True return dict instead of list of dict + for single entry + + Returns + ------- + dict or pandas.DataFrame + returns metadata dictionary or DataFrame of metadata + """ + metalist = [] + names = self._parse_names(names, libname=libname) + desc = f"Get metadata {libname}" + for n in (tqdm(names, desc=desc) if progressbar else names): + imeta = self._get_metadata(libname, n) + if imeta is None: + imeta = {} + if "name" not in imeta.keys(): + imeta["name"] = n + metalist.append(imeta) + if as_frame: + meta = self._meta_list_to_frame(metalist, names=names) + return meta + else: + if len(metalist) == 1 and squeeze: + return metalist[0] + else: + return metalist - @abstractmethod def get_oseries(self, names: Union[list, str], - progressbar: bool = False) -> FrameorSeriesUnion: - """Get oseries. + return_metadata: bool = False, + progressbar: bool = False, + squeeze: bool = True) \ + -> Union[Union[FrameorSeriesUnion, Dict], + Optional[Union[Dict, List]]]: + """Get oseries from database. 
Parameters ---------- - names : Union[list, str] - str or list of str of names of oseries to retrieve + names : str or list of str + names of the oseries to load + return_metadata : bool, optional + return metadata as dictionary or list of dictionaries, + default is False progressbar : bool, optional show progressbar, by default False + squeeze : bool, optional + if True return DataFrame or Series instead of dictionary + for single entry Returns ------- - dict, pandas.DataFrame - return dictionary containing data if multiple names are passed, - else return pandas.DataFrame or pandas.Series + oseries : pandas.DataFrame or dict of DataFrames + returns timeseries as DataFrame or dictionary of DataFrames if + multiple names were passed + metadata : dict or list of dict + metadata for each oseries, only returned if return_metadata=True """ - pass + oseries = self._get_series("oseries", names, progressbar=progressbar, + squeeze=squeeze) + if return_metadata: + metadata = self.get_metadata("oseries", + names, + progressbar=progressbar, + as_frame=False, + squeeze=squeeze) + return oseries, metadata + else: + return oseries - @abstractmethod def get_stresses(self, names: Union[list, str], - progressbar: bool = False) -> FrameorSeriesUnion: - """Get stresses. + return_metadata: bool = False, + progressbar: bool = False, + squeeze: bool = True) \ + -> Union[Union[FrameorSeriesUnion, Dict], + Optional[Union[Dict, List]]]: + """Get stresses from database. 
Parameters ---------- - names : Union[list, str] - str or list of str of names of stresses to retrieve + names : str or list of str + names of the stresses to load + return_metadata : bool, optional + return metadata as dictionary or list of dictionaries, + default is False progressbar : bool, optional show progressbar, by default False + squeeze : bool, optional + if True return DataFrame or Series instead of dictionary + for single entry Returns ------- - dict, pandas.DataFrame - return dictionary containing data if multiple names are passed, - else return pandas.DataFrame or pandas.Series + stresses : pandas.DataFrame or dict of DataFrames + returns timeseries as DataFrame or dictionary of DataFrames if + multiple names were passed + metadata : dict or list of dict + metadata for each stress, only returned if return_metadata=True """ - pass + stresses = self._get_series("stresses", names, progressbar=progressbar, + squeeze=squeeze) + if return_metadata: + metadata = self.get_metadata("stresses", + names, + progressbar=progressbar, + as_frame=False, + squeeze=squeeze) + return stresses, metadata + else: + return stresses - @abstractmethod - def get_models(self, names: Union[list, str], progressbar: bool = False, - **kwargs) -> Union[Model, dict]: - """Get models. + def get_models(self, names: Union[list, str], return_dict: bool = False, + progressbar: bool = False, squeeze: bool = True, + update_ts_settings: bool = False) \ + -> Union[ps.Model, list]: + """Load models from database. 
Parameters ---------- - names : Union[list, str] - str or list of str of models to retrieve + names : str or list of str + names of the models to load + return_dict : bool, optional + return model dictionary instead of pastas.Model (much + faster for obtaining parameters, for example) progressbar : bool, optional show progressbar, by default False + squeeze : bool, optional + if True return Model instead of list of Models + for single entry + update_ts_settings : bool, optional + update timeseries settings based on timeseries in store. + overwrites stored tmin/tmax in model. Returns ------- - Union[Model, dict] - return pastas.Model if only one name is passed, else return dict + pastas.Model or list of pastas.Model + return pastas model, or list of models if multiple names were + passed """ - pass + models = [] + names = self._parse_names(names, libname="models") + desc = "Get models" + for n in (tqdm(names, desc=desc) if progressbar else names): + data = self._get_item("models", n) + if return_dict: + ml = data + else: + ml = self._parse_model_dict( + data, update_ts_settings=update_ts_settings) + models.append(ml) + if len(models) == 1 and squeeze: + return models[0] + else: + return models - @abstractproperty + @staticmethod + def _clear_cache(libname: str) -> None: + """Clear cached property.""" + getattr(BaseConnector, libname).fget.cache_clear() + + @property # type: ignore + @functools.lru_cache() def oseries(self): - """Dataframe containing oseries overview.""" - pass + """Dataframe with overview of oseries.""" + return self.get_metadata("oseries", self.oseries_names) - @abstractproperty + @property # type: ignore + @functools.lru_cache() def stresses(self): - """Dataframe containing stresses overview.""" - pass + """Dataframe with overview of stresses.""" + return self.get_metadata("stresses", self.stresses_names) - @abstractproperty + @property # type: ignore + @functools.lru_cache() def models(self): """List of model names.""" - pass + return 
self.model_names + + @property + def n_oseries(self): + return len(self.oseries_names) + + @property + def n_stresses(self): + return len(self.stresses_names) + + @property + def n_models(self): + return len(self.model_names) class ConnectorUtil: @@ -248,11 +720,11 @@ def _parse_names(self, names: Optional[Union[list, str]] = None, return [names] elif names is None or names == "all": if libname == "oseries": - return getattr(self, "oseries").index.to_list() + return getattr(self, "oseries_names") elif libname == "stresses": - return getattr(self, "stresses").index.to_list() + return getattr(self, "stresses_names") elif libname == "models": - return getattr(self, "models") + return getattr(self, "model_names") else: raise ValueError(f"No library '{libname}'!") else: diff --git a/pastastore/connectors.py b/pastastore/connectors.py index 240f0916..2776bffb 100644 --- a/pastastore/connectors.py +++ b/pastastore/connectors.py @@ -1,49 +1,25 @@ -import functools import json import os import warnings from copy import deepcopy from importlib import import_module -from typing import Optional, Union, List, Dict +from typing import Dict, Optional, Union import pandas as pd -import pastas as ps -from pastas import Model from pastas.io.pas import PastasEncoder, pastas_hook -from tqdm import tqdm from .base import BaseConnector, ConnectorUtil -from .util import _custom_warning, validate_names +from .util import _custom_warning FrameorSeriesUnion = Union[pd.DataFrame, pd.Series] warnings.showwarning = _custom_warning class ArcticConnector(BaseConnector, ConnectorUtil): - """Object to serve as the interface between MongoDB and Python using the - Arctic module. Provides all the methods to read, write, or delete data from - the database. - - Create an ArcticConnector object that connects to a - running MongoDB database via Arctic. 
- - Parameters - ---------- - connstr : str - connection string - projectname : str - name of the project - library_map: dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. - """ conn_type = "arctic" - def __init__(self, name: str, connstr: str, - library_map: Optional[dict] = None): + def __init__(self, name: str, connstr: str): """Create an ArcticConnector object that connects to a running MongoDB database via Arctic. @@ -53,11 +29,6 @@ def __init__(self, name: str, connstr: str, connection string (e.g. 'mongodb://localhost:27017/') name : str name of the project - library_map: dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. """ try: import arctic @@ -73,40 +44,25 @@ def __init__(self, name: str, connstr: str, self.libs: dict = {} self.arc = arctic.Arctic(connstr) - self._initialize(library_map) - - def __repr__(self): - """Representation string of the object.""" - noseries = len(self.get_library("oseries").list_symbols()) - nstresses = len(self.get_library("stresses").list_symbols()) - nmodels = len(self.get_library("models").list_symbols()) - return (" '{0}': {1} oseries, " - "{2} stresses, {3} models".format( - self.name, noseries, nstresses, nmodels)) - - def _initialize(self, library_map: Optional[dict]) -> None: - """Internal method to initalize the libraries.""" - if library_map is None: - libmap = {i: i for i in self._default_library_names} - else: - libmap = library_map + self._initialize() - self.library_map = libmap + def _initialize(self) -> None: + """Internal method to initialize the libraries.""" - for libname in libmap.values(): + for libname in self._default_library_names: if self._library_name(libname) not in
self.arc.list_libraries(): self.arc.initialize_library(self._library_name(libname)) else: print(f"ArcticConnector: library " f"'{self._library_name(libname)}'" " already exists. Linking to existing library.") - self.libs[libname] = self.get_library(libname) + self.libs[libname] = self._get_library(libname) def _library_name(self, libname: str) -> str: """Internal method to get full library name according to Arctic.""" return ".".join([self.name, libname]) - def get_library(self, libname: str): + def _get_library(self, libname: str): """Get Arctic library handle. Parameters @@ -116,160 +72,52 @@ def get_library(self, libname: str): Returns ------- - Arctic.library handle + lib : arctic.Library handle handle to the library """ - # get custom library name if necessary - real_libname = self.library_map[libname] - # get library handle - lib = self.arc.get_library(self._library_name(real_libname)) + lib = self.arc.get_library(self._library_name(libname)) return lib - def _add_series(self, libname: str, series: FrameorSeriesUnion, name: str, - metadata: Optional[dict] = None, - overwrite: bool = False) -> None: - """Internal method to add series to database. + def _add_item(self, libname: str, + item: Union[FrameorSeriesUnion, Dict], + name: str, + metadata: Optional[Dict] = None, + **_) -> None: + """Internal method to add item to library (timeseries or model). 
Parameters ---------- libname : str - name of the library to add the series to - series : pandas.Series or pandas.DataFrame - data to add + name of the library + item : Union[FrameorSeriesUnion, Dict] + item to add, either timeseries or pastas.Model as dictionary name : str - name of the timeseries - metadata : dict, optional + name of the item + metadata : Optional[Dict], optional dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing dataset with the same name, - by default False - - Raises - ------ - Exception - if overwrite is False and name is already in the database - raises an Exception. """ - self._validate_input_series(series) - series = self._set_series_name(series, name) - lib = self.get_library(libname) - if name not in lib.list_symbols() or overwrite: - lib.write(name, series, metadata=metadata) - self._clear_cache(libname) - else: - raise Exception("Item with name '{0}' already" - " in '{1}' library!".format(name, libname)) + lib = self._get_library(libname) + lib.write(name, item, metadata=metadata) - def add_oseries(self, series: FrameorSeriesUnion, name: str, - metadata: Optional[dict] = None, - overwrite: bool = False) -> None: - """Add oseries to the database. + def _get_item(self, libname: str, name: str) \ + -> Union[FrameorSeriesUnion, Dict]: + """Internal method to retrieve item from library. 
Parameters ---------- - series : pandas.Series or pandas.DataFrame - data to add - name : str - name of the timeseries - metadata : dict, optional - dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing dataset with the same name, - by default False - """ - if isinstance(series, pd.DataFrame) and len(series.columns) > 1: - if metadata is None: - print("Data contains multiple columns, " - "assuming values in column 0!") - metadata = {"value_col": 0} - elif "value_col" not in metadata.keys(): - print("Data contains multiple columns, " - "assuming values in column 0!") - metadata["value_col"] = 0 - - self._add_series("oseries", series, name=name, - metadata=metadata, overwrite=overwrite) - - def add_stress(self, series: FrameorSeriesUnion, name: str, kind: str, - metadata: Optional[dict] = None, - overwrite: bool = False) -> None: - """Add stress to the database. - - Parameters - ---------- - series : pandas.Series or pandas.DataFrame - data to add + libname : str + name of the library name : str - name of the timeseries - kind : str - category to identify type of stress, this label is added to the - metadata dictionary. - metadata : dict, optional - dictionary containing metadata, by default None. Also used to - point to column containing timeseries if DataFrame is passed - using the "value_col" key. - overwrite : bool, optional - overwrite existing dataset with the same name, - by default False - """ - if metadata is None: - metadata = {} - - if isinstance(series, pd.DataFrame) and len(series.columns) > 1: - print("Data contains multiple columns, " - "assuming values in column 0!") - metadata["value_col"] = 0 + name of the item - metadata["kind"] = kind - self._add_series("stresses", series, name=name, - metadata=metadata, overwrite=overwrite) - - def add_model(self, ml: Union[ps.Model, dict], - overwrite: bool = False, - validate_metadata: bool = False) -> None: - """Add model to the database. 
- - Parameters - ---------- - ml : pastas.Model or dict - pastas Model or dictionary to add to the database - overwrite : bool, optional - if True, overwrite existing model, by default False - validate_metadata, bool optional - remove unsupported characters from metadata dictionary keys - - Raises - ------ - Exception - if overwrite is False and model is already in the database - raises an Exception. + Returns + ------- + item : Union[FrameorSeriesUnion, Dict] + timeseries or model dictionary """ - lib = self.get_library("models") - if ml.name not in lib.list_symbols() or overwrite: - if isinstance(ml, ps.Model): - mldict = ml.to_dict(series=False) - name = ml.name - if validate_metadata: - metadata = validate_names(d=ml.oseries.metadata) - else: - metadata = ml.oseries.metadata - elif isinstance(ml, dict): - mldict = ml - name = ml["name"] - metadata = None - else: - raise TypeError("Expected pastas.Model or dict!") - # check if oseries and stresses exist in store - self._check_model_series_names_for_store(ml) - self._check_oseries_in_store(ml) - self._check_stresses_in_store(ml) - # write model to store - lib.write(name, mldict, metadata=metadata) - else: - raise Exception("Model with name '{}' already in store!".format( - ml.name)) - self._clear_cache("models") + lib = self._get_library(libname) + return lib.read(name).data def _del_item(self, libname: str, name: str) -> None: """Internal method to delete items (series or models). @@ -281,304 +129,66 @@ def _del_item(self, libname: str, name: str) -> None: name : str name of item to delete """ - lib = self.get_library(libname) + lib = self._get_library(libname) lib.delete(name) - def del_models(self, names: Union[list, str]) -> None: - """Delete model(s) from the database. 
- - Parameters - ---------- - names : str or list of str - name(s) of the model to delete - """ - for n in self._parse_names(names, libname="models"): - self._del_item("models", n) - self._clear_cache("models") - - def del_oseries(self, names: Union[list, str]): - """Delete oseries from the database. - - Parameters - ---------- - names : str or list of str - name(s) of the oseries to delete - """ - for n in self._parse_names(names, libname="oseries"): - self._del_item("oseries", n) - self._clear_cache("oseries") - - def del_stress(self, names: Union[list, str]): - """Delete stress from the database. - - Parameters - ---------- - names : str or list of str - name(s) of the stress to delete - """ - for n in self._parse_names(names, libname="stresses"): - self._del_item("stresses", n) - self._clear_cache("stresses") - - def _get_series(self, libname: str, names: Union[list, str], - progressbar: bool = True, squeeze: bool = True) \ - -> FrameorSeriesUnion: - """Internal method to get timeseries. + def _get_metadata(self, libname: str, name: str) -> dict: + """Internal method to retrieve metadata for an item. Parameters ---------- libname : str name of the library - names : str or list of str - names of the timeseries to load - progressbar : bool, optional - show progressbar, by default True - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry + name : str + name of the item Returns ------- - pandas.DataFrame or dict of pandas.DataFrames - either returns timeseries as pandas.DataFrame or - dictionary containing the timeseries. 
+ dict + dictionary containing metadata """ - lib = self.get_library(libname) - - ts = {} - names = self._parse_names(names, libname=libname) - desc = f"Get {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - ts[n] = lib.read(n).data - # return frame if len == 1 - if len(ts) == 1 and squeeze: - return ts[n] - else: - return ts + lib = self._get_library(libname) + return lib.read_metadata(name).metadata - def get_metadata(self, libname: str, names: Union[list, str], - progressbar: bool = False, as_frame: bool = True, - squeeze: bool = True) -> Union[dict, pd.DataFrame]: - """Read metadata from database. - - Parameters - ---------- - libname : str - name of the library containing the dataset - names : str or list of str - names of the datasets for which to read the metadata - squeeze : bool, optional - if True return dict instead of list of dict - for single entry + @property + def oseries_names(self): + """List of oseries names. Returns ------- - dict or pandas.DataFrame - returns metadata dictionary or DataFrame of metadata + list + list of oseries in library """ - lib = self.get_library(libname) - - metalist = [] - names = self._parse_names(names, libname=libname) - desc = f"Get metadata {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - imeta = lib.read_metadata(n).metadata - if imeta is None: - imeta = {} - if "name" not in imeta.keys(): - imeta["name"] = n - metalist.append(imeta) - if as_frame: - meta = self._meta_list_to_frame(metalist, names=names) - return meta - else: - if len(metalist) == 1 and squeeze: - return metalist[0] - else: - return metalist + return self._get_library("oseries").list_symbols() - def get_oseries(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Get oseries from database. 
- - Parameters - ---------- - names : str or list of str - names of the oseries to load - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry + @property + def stresses_names(self): + """List of stresses names. Returns ------- - oseries : pandas.DataFrame or dict of DataFrames - returns timeseries as DataFrame or dictionary of DataFrames if - multiple names were passed - metadata : dict or list of dict - metadata for each oseries, only returned if return_metadata=True + list + list of stresses in library """ - oseries = self._get_series("oseries", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("oseries", - names, - progressbar=progressbar, - as_frame=False, - squeeze=squeeze) - return oseries, metadata - else: - return oseries - - def get_stresses(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Get stresses from database. + return self._get_library("stresses").list_symbols() - Parameters - ---------- - names : str or list of str - names of the stresses to load - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry + @property + def model_names(self): + """List of model names. 
Returns ------- - stresses : pandas.DataFrame or dict of DataFrames - returns timeseries as DataFrame or dictionary of DataFrames if - multiple names were passed - metadata : dict or list of dict - metadata for each stress, only returned if return_metadata=True + list + list of models in library """ - stresses = self._get_series("stresses", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("stresses", - names, - progressbar=progressbar, - as_frame=False, - squeeze=squeeze) - return stresses, metadata - else: - return stresses - - def get_models(self, names: Union[list, str], return_dict: bool = False, - progressbar: bool = False, squeeze: bool = True, - update_ts_settings: bool = False) \ - -> Union[ps.Model, list]: - """Load models from database. - - Parameters - ---------- - names : str or list of str - names of the models to load - return_dict : bool, optional - return model dictionary instead of pastas.Model (much - faster for obtaining parameters, for example) - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return Model instead of list of Models - for single entry - update_ts_settings : bool, optional - update timeseries settings based on timeseries in store. - overwrites stored tmin/tmax in model. 
- - Returns - ------- - pastas.Model or list of pastas.Model - return pastas model, or list of models if multiple names were - passed - """ - lib = self.get_library("models") - models = [] - names = self._parse_names(names, libname="models") - - desc = "Get models" - for n in (tqdm(names, desc=desc) if progressbar else names): - item = lib.read(n) - data = item.data - if return_dict: - ml = item.data - else: - ml = self._parse_model_dict( - data, update_ts_settings=update_ts_settings) - models.append(ml) - if len(models) == 1 and squeeze: - return models[0] - else: - return models - - @staticmethod - def _clear_cache(libname: str) -> None: - """Clear cached property.""" - getattr(ArcticConnector, libname).fget.cache_clear() - - @property # type: ignore - @functools.lru_cache() - def oseries(self): - """Dataframe with overview of oseries.""" - lib = self.get_library("oseries") - df = self.get_metadata("oseries", lib.list_symbols()) - return df - - @property # type: ignore - @functools.lru_cache() - def stresses(self): - """Dataframe with overview of stresses.""" - lib = self.get_library("stresses") - return self.get_metadata("stresses", - lib.list_symbols()) - - @property # type: ignore - @functools.lru_cache() - def models(self): - """List of model names.""" - lib = self.get_library("models") - return lib.list_symbols() + return self._get_library("models").list_symbols() class PystoreConnector(BaseConnector, ConnectorUtil): - """Object to serve as the interface between storage and Python using the - Pystore module. Provides all the methods to read, write, or delete data - from the pystore. - - Create as PystoreConnector object that connects to a folder on disk - containing a Pystore. 
- - Parameters - ---------- - name : str - name of the store - path : str - path to the pystore directory - library_map: dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. - """ + conn_type = "pystore" - def __init__(self, name: str, path: str, - library_map: Optional[dict] = None): + def __init__(self, name: str, path: str): """Create a PystoreConnector object that points to a Pystore. Parameters @@ -587,11 +197,6 @@ def __init__(self, name: str, path: str, name of the store path : str path to the pystore directory - library_map : dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. """ try: import pystore @@ -604,33 +209,18 @@ def __init__(self, name: str, path: str, pystore.set_path(self.path) self.store = pystore.store(self.name) self.libs: dict = {} - self._initialize(library_map) - - def __repr__(self): - """Representation string of the object.""" - storename = self.name - noseries = len(self.get_library("oseries").list_items()) - nstresses = len(self.get_library("stresses").list_items()) - nmodels = len(self.get_library("models").list_items()) - return (f" '{storename}': {noseries} oseries," - f" {nstresses} stresses, {nmodels} models") - - def _initialize(self, library_map: Optional[dict]): - """Internal method to initalize the libraries (stores).""" - if library_map is None: - self.library_map = {i: i for i in self._default_library_names} - else: - self.library_map = library_map + self._initialize() - for libname in self.library_map.values(): + def _initialize(self) -> None: + """Internal method to initalize the libraries (stores).""" + for libname in self._default_library_names: if libname in self.store.list_collections(): 
print(f"PystoreConnector: library '{self.path}/{libname}' " "already exists. Linking to existing library.") lib = self.store.collection(libname) - self.libs[libname] = lib - def get_library(self, libname: str): + def _get_library(self, libname: str): """Get Pystore library handle. Parameters @@ -643,133 +233,86 @@ def get_library(self, libname: str): Pystore.Collection handle handle to the library """ - # get custom library name if necessary - real_libname = self.library_map[libname] - # get library handle - lib = self.store.collection(real_libname) + lib = self.store.collection(libname) return lib - def _add_series(self, libname: str, series: FrameorSeriesUnion, name: str, - metadata: Optional[dict] = None, - overwrite: bool = False): - """Internal method to add series to a library/store. + def _add_item(self, libname: str, + item: Union[FrameorSeriesUnion, Dict], + name: str, + metadata: Optional[Dict] = None, + overwrite: bool = False) -> None: + """Internal method to add item to library (timeseries or model). Parameters ---------- libname : str name of the library - series : pandas.DataFrame or pandas.Series - data to write to the pystore + item : Union[FrameorSeriesUnion, Dict] + item to add, either timeseries or pastas.Model as dictionary name : str - name of the series - metadata : dict, optional + name of the item + metadata : Optional[Dict], optional dictionary containing metadata, by default None overwrite : bool, optional - overwrite existing dataset with the same name, - by default False + overwrite item if it already exists, by default False. 
""" - self._validate_input_series(series) - series = self._set_series_name(series, name) # convert to DataFrame because pystore doesn't accept pandas.Series # (maybe has an easy fix, but converting w to_frame for now) - if isinstance(series, pd.Series): - s = series.to_frame(name=name) + if isinstance(item, pd.Series): + s = item.to_frame(name=name) is_series = True + elif isinstance(item, dict): + s = pd.DataFrame() # empty DataFrame as placeholder + jsondict = json.loads(json.dumps( + item, cls=PastasEncoder, indent=4)) + metadata = jsondict # model dict is stored in metadata + is_series = False else: - s = series + s = item is_series = False # store info about input series to ensure same type is returned if metadata is None: metadata = {"_is_series": is_series} else: metadata["_is_series"] = is_series - lib = self.get_library(libname) - if name not in lib.items or overwrite: - lib.write(name, s, metadata=metadata, overwrite=overwrite) - else: - raise Exception("Item with name '{0}' already" - " in '{1}' library!".format(name, libname)) - self._clear_cache(libname) - - def add_oseries(self, series: FrameorSeriesUnion, name: str, - metadata: Optional[dict] = None, - overwrite=True): - """Add oseries to the pystore. - Parameters - ---------- - series : pandas.DataFrame of pandas.Series - oseries data to write to the store - name : str - name of series - metadata : dict, optional - dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing dataset with the same name, - by default True - """ - self._add_series("oseries", series, name, - metadata=metadata, overwrite=overwrite) + lib = self._get_library(libname) + lib.write(name, s, metadata=metadata, overwrite=overwrite) - def add_stress(self, series: FrameorSeriesUnion, name: str, kind: str, - metadata: Optional[dict] = None, overwrite=True): - """Add stresses to the pystore. 
+ def _get_item(self, libname: str, name: str) \ + -> Union[FrameorSeriesUnion, Dict]: + """Internal method to retrieve item from pystore library. Parameters ---------- - series : pandas.DataFrame of pandas.Series - stress data to write to the store - kind : str - category to identify type of stress, this label is added to the - metadata dictionary. - metadata : dict, optional - dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing dataset with the same name, - by default True - """ - if metadata is None: - metadata = {} - if kind not in metadata.keys(): - metadata["kind"] = kind - self._add_series("stresses", series, name, - metadata=metadata, overwrite=overwrite) - - def add_model(self, ml: Union[ps.Model, dict], overwrite: bool = True): - """Add model to the pystore. + libname : str + name of the library + name : str + name of the item - Parameters - ---------- - ml : pastas.Model or dict - model to write to the store - overwrite : bool, optional - overwrite existing store model if it already exists, - by default True + Returns + ------- + item : Union[FrameorSeriesUnion, Dict] + timeseries or model dictionary """ - if isinstance(ml, ps.Model): - mldict = ml.to_dict(series=False) - name = ml.name - elif isinstance(ml, dict): - mldict = ml - name = ml["name"] - else: - raise TypeError("Expected ps.Model or dict!") - jsondict = json.loads(json.dumps(mldict, cls=PastasEncoder, indent=4)) - lib = self.get_library("models") - # check if oseries and stresses exist in store, if not add them - if name not in lib.items or overwrite: - self._check_model_series_names_for_store(ml) - self._check_oseries_in_store(ml) - self._check_stresses_in_store(ml) - lib.write(name, pd.DataFrame(), metadata=jsondict, - overwrite=overwrite) - else: - raise Exception("Model with name '{}' already in store!".format( - name)) - self._clear_cache("models") + load_mod = import_module("pastas.io.pas") # type: ignore + lib = self._get_library(libname) 
+ # hack for storing models, stored as metadata + if libname == "models": + jsonpath = lib._item_path(name).joinpath("metadata.json") + s = load_mod.load(jsonpath) # type: ignore + else: + # read series and convert to pandas + item = lib.item(name) + s = item.to_pandas() + # remove _is_series key and return pd.Series if user passed in Series + is_series = item.metadata.pop("_is_series") + if is_series: + s = s.squeeze() + return s - def _del_series(self, libname: str, name): + def _del_item(self, libname: str, name: str) -> None: """Internal method to delete data from the store. Parameters @@ -777,1220 +320,373 @@ def _del_series(self, libname: str, name): libname : str name of the library name : str - name of the series to delete + name of the item to delete """ - lib = self.get_library(libname) + lib = self._get_library(libname) lib.delete_item(name) self._clear_cache(libname) - def del_oseries(self, names: Union[list, str]): - """Delete oseries from pystore. - - Parameters - ---------- - name : str - name of the collection containing the data - names : str or list of str, optional - name(s) of oseries to delete - """ - for n in self._parse_names(names, libname="oseries"): - self._del_series("oseries", n) - - def del_stress(self, names: Union[list, str]): - """Delete stresses from pystore. - - Parameters - ---------- - names : str or list of str - name(s) of the series to delete - """ - for n in self._parse_names(names, libname="stresses"): - self._del_series("stresses", n) - - def del_models(self, names: Union[list, str]): - """Delete model(s) from pystore. - - Parameters - ---------- - names : str - name(s) of the model(s) to delete - """ - for n in self._parse_names(names, libname="models"): - self._del_series("models", n) - - def _get_series(self, libname: str, names: Union[list, str], - progressbar: bool = True, squeeze: bool = True): - """Internal method to load timeseries data. 
+ def _get_metadata(self, libname: str, name: str) -> dict: + """Internal method to read metadata from pystore. Parameters ---------- libname : str - name of the store to load data from - names : str or list of str - name(s) of the timeseries to load - progressbar : bool, optional - show progressbar, by default True - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry - - Returns - ------- - pandas.DataFrame or dict of pandas.DataFrames - returns data DataFrame or dictionary of DataFrames - if multiple names are provided - """ - lib = self.get_library(libname) - - ts = {} - names = self._parse_names(names, libname=libname) - desc = f"Get {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - item = lib.item(n) - s = item.to_pandas() - # return pd.Series if user passed in Series - is_series = item.metadata.pop("_is_series") - if is_series: - s = s.squeeze() - ts[n] = s - # return frame if only 1 name - if len(ts) == 1 and squeeze: - return ts[n] - else: - return ts - - def get_metadata(self, libname: str, names: Union[list, str], - progressbar: bool = False, as_frame=True, - squeeze: bool = True) \ - -> Union[dict, pd.DataFrame]: - """Read metadata from pystore. 
- - Parameters - ---------- - libname : str - name of the library the series are in, - usually ("oseries" or "stresses") - names : str or list of str - name(s) of series to load metadata for - progressbar : bool, optional - show progressbar, by default True - as_frame : bool, optional - return metadata as dataframe, default is - True, otherwise return as dict or list of - dict - squeeze : bool, optional - if True return dict instead of list of dict for single entry + name of the library the series are in ("oseries" or "stresses") + name : str + name of item to load metadata for Returns ------- - list or pandas.DataFrame - list or pandas.DataFrame containing metadata + imeta : dict + dictionary containing metadata """ from pystore.utils import read_metadata - lib = self.get_library(libname) - - metalist = [] - names = self._parse_names(names, libname=libname) - desc = f"Get metadata {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - imeta = read_metadata(lib._item_path(n)) - if "name" not in imeta.keys(): - imeta["name"] = n - if "_is_series" in imeta.keys(): - imeta.pop("_is_series") - metalist.append(imeta) - if as_frame: - meta = self._meta_list_to_frame(metalist, names=names) - return meta - else: - if len(metalist) == 1 and squeeze: - return metalist[0] - else: - return metalist - - def get_oseries(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Retrieve oseries from pystore. 
- - Parameters - ---------- - names : str or list of str - name(s) of collections to load oseries data from - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default True - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry + lib = self._get_library(libname) + imeta = read_metadata(lib._item_path(name)) + if "name" not in imeta.keys(): + imeta["name"] = name + if "_is_series" in imeta.keys(): + imeta.pop("_is_series") + return imeta + + @property + def oseries_names(self): + """List of oseries names. Returns ------- - oseries : pandas.DataFrame or dict of pandas.DataFrames - returns data as a DataFrame or a dictionary of DataFrames - if multiple names are passed - metadata : dict or list of dict - metadata for each oseries, only returned if return_metadata=True + list + list of oseries in library """ - oseries = self._get_series("oseries", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("oseries", names, - progressbar=progressbar, - as_frame=False, squeeze=squeeze) - return oseries, metadata - else: - return oseries + return self._get_library("oseries").list_items() - def get_stresses(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Retrieve stresses from pystore. 
- - Parameters - ---------- - names : str or list of str - name(s) of collections to load stresses data from - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default True - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry + @property + def stresses_names(self): + """List of stresses names. Returns ------- - stresses : pandas.DataFrame or dict of pandas.DataFrames - returns data as a DataFrame or a dictionary of DataFrames - if multiple names are passed - metadata : dict or list of dict - metadata for each stress, only returned if return_metadata=True + list + list of stresses in library """ - stresses = self._get_series("stresses", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("stresses", names, - progressbar=progressbar, - as_frame=False, squeeze=squeeze) - return stresses, metadata - else: - return stresses + return self._get_library("stresses").list_items() - def get_models(self, names: Union[list, str], return_dict: bool = False, - progressbar: bool = False, squeeze: bool = True, - update_ts_settings: bool = False) \ - -> Union[ps.Model, list]: - """Load models from pystore. - - Parameters - ---------- - names : str or list of str - name(s) of the models to load - return_dict : bool, optional - return model dictionary instead of pastas.Model object - (much faster for obtaining parameters, for example) - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return Model instead of list of Models - for single entry - update_ts_settings : bool, optional - update timeseries settings based on timeseries in store. - overwrites stored tmin/tmax in model. + @property + def model_names(self): + """List of model names. 
Returns ------- - list or pastas.Model - model or list of models + list + list of models in library """ - lib = self.get_library("models") - - models = [] - load_mod = import_module("pastas.io.pas") # type: ignore - names = self._parse_names(names, libname="models") - - desc = "Get models" - for n in (tqdm(names, desc=desc) if progressbar else names): - jsonpath = lib._item_path(n).joinpath("metadata.json") - data = load_mod.load(jsonpath) # type: ignore - if return_dict: - ml = data - else: - ml = self._parse_model_dict( - data, update_ts_settings=update_ts_settings) - models.append(ml) - if len(models) == 1 and squeeze: - return models[0] - else: - return models - - @staticmethod - def _clear_cache(libname: str) -> None: - """Clear cached property.""" - getattr(PystoreConnector, libname).fget.cache_clear() - - @property # type: ignore - @functools.lru_cache() - def oseries(self): - """Dataframe with overview of oseries.""" - lib = self.get_library("oseries") - df = self.get_metadata("oseries", lib.list_items()) - return df - - @property # type: ignore - @functools.lru_cache() - def stresses(self): - """Dataframe with overview of stresses.""" - lib = self.get_library("stresses") - df = self.get_metadata("stresses", lib.list_items()) - return df - - @property # type: ignore - @functools.lru_cache() - def models(self): - """List of model names.""" - lib = self.get_library("models") - if lib is not None: - mls = lib.list_items() - else: - mls = [] - return mls + return self._get_library("models").list_items() class DictConnector(BaseConnector, ConnectorUtil): - """Object to store timeseries and pastas models in-memory. Provides methods - to read, write, or delete data from the object. Data is stored in - dictionaries. 
- - Parameters - ---------- - name : str - user-specified name of the connector - library_map : dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. - """ + conn_type = "dict" - def __init__(self, name: str, library_map: Optional[dict] = None): + def __init__(self, name: str): """Create DictConnector object that stores data in dictionaries. Parameters ---------- name : str user-specified name of the connector - library_map : dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. """ self.name = name - # allow custom library names - if library_map is None: - libmap = {i: i for i in self._default_library_names} - else: - libmap = library_map - - self.library_map = libmap - - # set empty dictionaries for series - for val in self.library_map.values(): + # create empty dictionaries for series and models + for val in self._default_library_names: setattr(self, "lib_" + val, {}) - def __repr__(self): - """Representation string of the object.""" - noseries = len(self.get_library("oseries").keys()) - nstresses = len(self.get_library("stresses").keys()) - nmodels = len(self.get_library("models").keys()) - return (" '{0}': {1} oseries, {2} stresses, " - "{3} models".format(self.name, noseries, nstresses, nmodels)) - - def get_library(self, libname: str): + def _get_library(self, libname: str): """Get reference to dictionary holding data. 
Parameters ---------- libname : str name of the library + + Returns + ------- + lib : dict + library handle """ - # get custom library name - real_libname = "lib_" + self.library_map[libname] - return getattr(self, real_libname) + return getattr(self, f"lib_{libname}") - def _add_series(self, libname: str, series: FrameorSeriesUnion, - name: str, metadata: Union[dict, None] = None, - overwrite: bool = False) -> None: - """Internal method to obtain series. + def _add_item(self, libname: str, + item: Union[FrameorSeriesUnion, Dict], + name: str, + metadata: Optional[Dict] = None, + **_) -> None: + """Internal method to add item (timeseries or models). Parameters ---------- libname : str name of library - series : FrameorSeriesUnion + item : FrameorSeriesUnion pandas.Series or pandas.DataFrame containing data name : str - name of the series + name of the item metadata : dict, optional dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing dataset with the same name, - by default False """ - self._validate_input_series(series) - series = self._set_series_name(series, name) - lib = self.get_library(libname) - if name not in lib or overwrite: - lib[name] = (metadata, series) + lib = self._get_library(libname) + if libname == "models": + lib[name] = item else: - raise Exception("Item with name '{0}' already" - " in '{1}' library!".format(name, libname)) - self._clear_cache(libname) + lib[name] = (metadata, item) - def add_oseries(self, series: FrameorSeriesUnion, name: str, - metadata: Union[dict, None] = None, - overwrite: bool = False) -> None: - """Add oseries to object. 
- - Parameters - ---------- - series : FrameorSeriesUnion - pandas.Series or pandas.DataFrame containing data - name : str - name of the oseries - metadata : dict, optional - dictionary with metadata, by default None - overwrite : bool, optional - overwrite existing timeseries, default is False - """ - self._add_series("oseries", series, name, metadata=metadata, - overwrite=overwrite) - - def add_stress(self, series: FrameorSeriesUnion, name: str, kind: str, - metadata: Union[dict, None] = None, - overwrite: bool = False) -> None: - """Add stress to object. - - Parameters - ---------- - series : FrameorSeriesUnion - pandas.Series or pandas.DataFrame containing data - name : str - name of the stress - kind : str - type of stress (i.e. 'prec', 'evap', 'well', etc.) - metadata : dict, optional - dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing timeseries, default is False - """ - if metadata is None: - metadata = {} - if kind not in metadata.keys(): - metadata["kind"] = kind - self._add_series("stresses", series, name, metadata=metadata, - overwrite=overwrite) - - def add_model(self, ml: Union[ps.Model, dict], - overwrite: bool = False) -> None: - """Add model to object. 
- - Parameters - ---------- - ml : pastas.Model or dict - pastas.Model or dictionary to add - overwrite : bool, optional - overwrite model in store, default is False - """ - lib = self.get_library("models") - if isinstance(ml, ps.Model): - mldict = ml.to_dict(series=False) - name = ml.name - elif isinstance(ml, dict): - mldict = ml - name = ml["name"] - else: - raise TypeError("Expected pastas.Model or dict!") - # check if oseries and stresses exist in store, if not add them - if name not in lib or overwrite: - self._check_model_series_names_for_store(ml) - self._check_oseries_in_store(ml) - self._check_stresses_in_store(ml) - lib[name] = mldict - else: - raise Exception("Model with name '{}' already in store!".format( - name)) - self._clear_cache("models") - - def del_models(self, names: Union[list, str]) -> None: - """Delete models from object. - - Parameters - ---------- - names : Union[list, str] - str or list of str of model names to remove - """ - lib = self.get_library("models") - for n in self._parse_names(names, libname="models"): - _ = lib.pop(n) - self._clear_cache("models") - - def del_oseries(self, names: Union[list, str]) -> None: - """Delete oseries from object. - - Parameters - ---------- - names : Union[list, str] - str or list of str of oseries to remove - """ - lib = self.get_library("oseries") - for n in self._parse_names(names, libname="oseries"): - _ = lib.pop(n) - self._clear_cache("oseries") - - def del_stress(self, names: Union[list, str]) -> None: - """Delete stresses from object. - - Parameters - ---------- - names : Union[list, str] - str or list of str of stresses to remove - """ - lib = self.get_library("stresses") - for n in self._parse_names(names, libname="stresses"): - _ = lib.pop(n) - self._clear_cache("stresses") - - def _get_series(self, libname: str, names: Union[list, str], - progressbar: bool = True, squeeze: bool = True) \ - -> FrameorSeriesUnion: - """Internal method to get oseries or stresses. 
+ def _get_item(self, libname: str, name: str) \ + -> Union[FrameorSeriesUnion, Dict]: + """Internal method to retrieve item from library. Parameters ---------- libname : str - name of library - names : Union[list, str] - str or list of string - progressbar : bool, optional - show progressbar, by default True - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry + name of the library + name : str + name of the item Returns ------- - dict, FrameorSeriesUnion - returns DataFrame or Series if only one name is passed, else - returns dict with all the data + item : Union[FrameorSeriesUnion, Dict] + timeseries or model dictionary """ - lib = self.get_library(libname) - ts = {} - names = self._parse_names(names, libname=libname) - desc = f"Get {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - ts[n] = deepcopy(lib[n][1]) - # return frame if len == 1 - if len(ts) == 1 and squeeze: - return ts[n] + lib = self._get_library(libname) + if libname == "models": + item = deepcopy(lib[name]) else: - return ts + item = deepcopy(lib[name][1]) + return item - def get_metadata(self, libname: str, names: Union[list, str], - progressbar: bool = False, as_frame: bool = True, - squeeze: bool = True) -> Union[pd.DataFrame, list]: - """Get metadata from object. + def _del_item(self, libname: str, name: str) -> None: + """Internal method to delete items (series or models). 
Parameters ---------- libname : str - name of library - names : Union[list, str] - str or list of str of names to get metadata for - progressbar : bool, optional - show progressbar, by default False - as_frame : bool, optional - return as DataFrame, by default True - squeeze : bool, optional - if True return dict instead of list of dict - for single entry - - Returns - ------- - Union[pd.DataFrame, list] - returns list of metadata or pandas.DataFrame depending on value - of `as_frame` - """ - lib = self.get_library(libname) - metalist = [] - names = self._parse_names(names, libname=libname) - desc = f"Get metadata {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - imeta = deepcopy(lib[n][0]) - if imeta is None: - imeta = {} - if "name" not in imeta.keys(): - imeta["name"] = n - metalist.append(imeta) - - if as_frame: - meta = self._meta_list_to_frame(metalist, names=names) - return meta - else: - if len(metalist) == 1 and squeeze: - return metalist[0] - else: - return metalist - - def get_oseries(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Retrieve oseries from object. 
- - Parameters - ---------- - names : Union[list, str] - name or list of names to retrieve - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry - - Returns - ------- - oseries : dict, FrameorSeriesUnion - returns dictionary or DataFrame/Series depending on number of - names passed - metadata : dict or list of dict - metadata for each stress, only returned if return_metadata=True + name of library to delete item from + name : str + name of item to delete """ - oseries = self._get_series("oseries", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("oseries", names, - progressbar=progressbar, - as_frame=False, squeeze=squeeze) - return oseries, metadata - else: - return oseries + lib = self._get_library(libname) + _ = lib.pop(name) - def get_stresses(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Retrieve stresses from object. + def _get_metadata(self, libname: str, name: str) -> dict: + """Internal method to read metadata. 
Parameters ---------- - names : Union[list, str] - name or list of names of stresses to retrieve - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry + libname : str + name of the library the series are in ("oseries" or "stresses") + name : str + name of item to load metadata for Returns ------- - stresses : dict, FrameorSeriesUnion - returns dictionary or DataFrame/Series depending on number of - names passed - metadata : dict or list of dict - metadata for each stress, only returned if return_metadata=True + imeta : dict + dictionary containing metadata """ - stresses = self._get_series("stresses", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("stresses", names, - progressbar=progressbar, - as_frame=False, squeeze=squeeze) - return stresses, metadata - else: - return stresses + lib = self._get_library(libname) + imeta = deepcopy(lib[name][0]) + return imeta - def get_models(self, names: Union[list, str], return_dict: bool = False, - progressbar: bool = False, squeeze: bool = True, - update_ts_settings: bool = False) \ - -> Union[Model, list]: - """Load models from object. + @property + def oseries_names(self): + """List of oseries names.""" + lib = self._get_library("oseries") + return list(lib.keys()) - Parameters - ---------- - names : str or list of str - names of the models to load - return_dict : bool, optional - return model dictionary instead of pastas.Model object - (much faster for obtaining parameters, for example) - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return Model instead of list of Models - for single entry - update_ts_settings : bool, optional - update timeseries settings based on timeseries in store. 
- overwrites stored tmin/tmax in model. + @property + def stresses_names(self): + """List of stresses names.""" + lib = self._get_library("stresses") + return list(lib.keys()) - Returns - ------- - pastas.Model or list of pastas.Model - return pastas model, or list of models if multiple names were - passed - """ - lib = self.get_library("models") - models = [] - names = self._parse_names(names, libname="models") - - desc = "Get models" - for n in (tqdm(names, desc=desc) if progressbar else names): - data = deepcopy(lib[n]) - if return_dict: - ml = data - else: - ml = self._parse_model_dict( - data, update_ts_settings=update_ts_settings) - models.append(ml) - if len(models) == 1 and squeeze: - return models[0] - else: - return models - - @staticmethod - def _clear_cache(libname: str) -> None: - """Clear cached property.""" - getattr(DictConnector, libname).fget.cache_clear() - - @property # type: ignore - @functools.lru_cache() - def oseries(self): - """Dataframe showing overview of oseries.""" - lib = self.get_library("oseries") - return self.get_metadata("oseries", names=list(lib.keys())) - - @property # type: ignore - @functools.lru_cache() - def stresses(self): - """Dataframe showing overview of stresses.""" - lib = self.get_library("stresses") - return self.get_metadata("stresses", names=list(lib.keys())) - - @property # type: ignore - @functools.lru_cache() - def models(self): + @property + def model_names(self): """List of model names.""" - lib = self.get_library("models") + lib = self._get_library("models") return list(lib.keys()) class PasConnector(BaseConnector, ConnectorUtil): - """Object to store timeseries and pastas models on disk. Provides methods - to read, write, or delete data from the object. Data is stored in JSON - files on disk. 
- - Parameters - ---------- - name : str - user-specified name of the connector - path : str - path to directory for reading/writing data - library_map : dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. - """ + conn_type = "pas" - def __init__(self, name: str, path: str, - library_map: Optional[dict] = None): + def __init__(self, name: str, path: str): """Create PasConnector object that stores data as JSON files on disk. + Uses Pastas export format (pas-files) to store files. + Parameters ---------- name : str user-specified name of the connector path : str path to directory for reading/writing data - library_map : dict, optional - dictionary containing the default library names as - keys ('oseries', 'stresses', 'models') and the user - specified library names as corresponding values. - Allows user defined library names. """ self.name = name self.path = os.path.abspath(path) + self._initialize() - # allow custom library names - if library_map is None: - libmap = {i: i for i in self._default_library_names} - else: - libmap = library_map - - self.library_map = libmap - + def _initialize(self) -> None: + """Internal method to initialize the libraries.""" # set empty dictionaries for series - for val in self.library_map.values(): - libdir = os.path.join(path, val) + for val in self._default_library_names: + libdir = os.path.join(self.path, val) if not os.path.exists(libdir): print(f"PasConnector: library {val} created in {libdir}") os.makedirs(libdir) else: print(f"PasConnector: library {val} already exists. 
" f"Linking to existing directory: {libdir}") - setattr(self, f"lib_{val}", os.path.join(path, val)) + setattr(self, f"lib_{val}", os.path.join(self.path, val)) - def __repr__(self): - """Representation string of the object.""" - noseries = len(self.oseries) - nstresses = len(self.stresses) - nmodels = len(self.models) - return (" '{0}': {1} oseries, {2} stresses, " - "{3} models".format(self.name, noseries, nstresses, nmodels)) - - def get_library(self, libname: str): + def _get_library(self, libname: str): """Get path to directory holding data. Parameters ---------- libname : str name of the library + + Returns + ------- + lib : str + path to library """ - # get custom library name - real_libname = self.library_map[libname] - return getattr(self, "lib_" + real_libname) + return getattr(self, "lib_" + libname) - def _add_series(self, libname: str, series: FrameorSeriesUnion, - name: str, metadata: Union[dict, None] = None, - overwrite: bool = False) -> None: - """Internal method to add series. + def _add_item(self, libname: str, + item: Union[FrameorSeriesUnion, Dict], + name: str, + metadata: Optional[Dict] = None, + **_) -> None: + """Internal method to add item (timeseries or models). 
Parameters ---------- libname : str name of library - series : FrameorSeriesUnion + item : FrameorSeriesUnion pandas.Series or pandas.DataFrame containing data name : str - name of the series + name of the item metadata : dict, optional dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing dataset with the same name, - by default False """ - self._validate_input_series(series) - series = self._set_series_name(series, name) - - lib = self.get_library(libname) - df = getattr(self, libname) - if name not in df.index or overwrite: - if isinstance(series, pd.Series): - series = series.to_frame() - sjson = series.to_json(orient="columns") + lib = self._get_library(libname) + + # timeseries + if isinstance(item, pd.Series): + item = item.to_frame() + if isinstance(item, pd.DataFrame): + sjson = item.to_json(orient="columns") fname = os.path.join(lib, f"{name}.pas") with open(fname, "w") as f: f.write(sjson) - if metadata: + if metadata is not None: mjson = json.dumps(metadata, cls=PastasEncoder, indent=4) - fname_meta = fname = os.path.join(lib, f"{name}_meta.pas") + fname_meta = os.path.join(lib, f"{name}_meta.pas") with open(fname_meta, "w") as m: m.write(mjson) - else: - raise Exception("Item with name '{0}' already" - " in '{1}' library!".format(name, libname)) - self._clear_cache(libname) - - def add_oseries(self, series: FrameorSeriesUnion, name: str, - metadata: Union[dict, None] = None, - overwrite: bool = False) -> None: - """Add oseries to object. 
- - Parameters - ---------- - series : FrameorSeriesUnion - pandas.Series or pandas.DataFrame containing data - name : str - name of the oseries - metadata : dict, optional - dictionary with metadata, by default None - overwrite : bool, optional - overwrite existing timeseries, default is False - """ - self._add_series("oseries", series, name, metadata=metadata, - overwrite=overwrite) - - def add_stress(self, series: FrameorSeriesUnion, name: str, kind: str, - metadata: Union[dict, None] = None, - overwrite: bool = False) -> None: - """Add stress to object. - - Parameters - ---------- - series : FrameorSeriesUnion - pandas.Series or pandas.DataFrame containing data - name : str - name of the stress - kind : str - type of stress (i.e. 'prec', 'evap', 'well', etc.) - metadata : dict, optional - dictionary containing metadata, by default None - overwrite : bool, optional - overwrite existing timeseries, default is False - """ - if metadata is None: - metadata = {} - if kind not in metadata.keys(): - metadata["kind"] = kind - self._add_series("stresses", series, name, metadata=metadata, - overwrite=overwrite) - - def add_model(self, ml: Union[ps.Model, dict], - overwrite: bool = False) -> None: - """Add model to object. 
- - Parameters - ---------- - ml : pastas.Model or dict - pastas.Model or dictionary to add - overwrite : bool, optional - overwrite model in store, default is False - """ - lib = self.get_library("models") - if isinstance(ml, ps.Model): - mldict = ml.to_dict(series=False) - name = ml.name - elif isinstance(ml, dict): - mldict = ml - name = ml["name"] - else: - raise TypeError("Expected pastas.Model or dict!") - # check if oseries and stresses exist in store, if not add them - if name not in self.models or overwrite: - self._check_model_series_names_for_store(ml) - self._check_oseries_in_store(ml) - self._check_stresses_in_store(ml) - - jsondict = json.dumps(mldict, cls=PastasEncoder, indent=4) + # pastas model dict + elif isinstance(item, dict): + jsondict = json.dumps(item, cls=PastasEncoder, indent=4) fmodel = os.path.join(lib, f"{name}.pas") with open(fmodel, "w") as fm: fm.write(jsondict) - else: - raise Exception("Model with name '{}' already in store!".format( - name)) - self._clear_cache("models") - def del_models(self, names: Union[list, str]) -> None: - """Delete models from object. + def _get_item(self, libname: str, name: str) \ + -> Union[FrameorSeriesUnion, Dict]: + """Internal method to retrieve item. Parameters ---------- - names : Union[list, str] - str or list of str of model names to remove - """ - lib = self.get_library("models") - for n in self._parse_names(names, libname="models"): - os.remove(os.path.join(lib, f"{n}.pas")) - self._clear_cache("models") - - def del_oseries(self, names: Union[list, str]) -> None: - """Delete oseries from object. 
+ libname : str + name of the library + name : str + name of the item - Parameters - ---------- - names : Union[list, str] - str or list of str of oseries to remove - """ - lib = self.get_library("oseries") - for n in self._parse_names(names, libname="oseries"): - os.remove(os.path.join(lib, f"{n}.pas")) - try: - os.remove(os.path.join(lib, f"{n}_meta.pas")) - except FileNotFoundError: - # Nothing to delete - pass - self._clear_cache("oseries") + Returns + ------- + item : Union[FrameorSeriesUnion, Dict] + timeseries or model dictionary + """ + lib = self._get_library(libname) + fjson = os.path.join(lib, f"{name}.pas") + if not os.path.exists(fjson): + msg = f"Item '{name}' not in '{libname}' library." + raise FileNotFoundError(msg) + # model + if libname == "models": + with open(fjson, "r") as ml_json: + item = json.load(ml_json, object_hook=pastas_hook) + # timeseries + else: + item = self._series_from_json(fjson) + return item - def del_stress(self, names: Union[list, str]) -> None: - """Delete stresses from object. + def _del_item(self, libname: str, name: str) -> None: + """Internal method to delete items (series or models). Parameters ---------- - names : Union[list, str] - str or list of str of stresses to remove + libname : str + name of library to delete item from + name : str + name of item to delete """ - lib = self.get_library("stresses") - for n in self._parse_names(names, libname="stresses"): - os.remove(os.path.join(lib, f"{n}.pas")) + lib = self._get_library(libname) + os.remove(os.path.join(lib, f"{name}.pas")) + # remove metadata for timeseries + if libname != "models": try: - os.remove(os.path.join(lib, f"{n}_meta.pas")) + os.remove(os.path.join(lib, f"{name}_meta.pas")) except FileNotFoundError: # Nothing to delete pass - self._clear_cache("stresses") - - def _get_series(self, libname: str, names: Union[list, str], - progressbar: bool = True, squeeze: bool = True) \ - -> FrameorSeriesUnion: - """Internal method to get oseries or stresses. 
- - Parameters - ---------- - libname : str - name of library - names : Union[list, str] - str or list of string - progressbar : bool, optional - show progressbar, by default True - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry - - Returns - ------- - dict, FrameorSeriesUnion - returns DataFrame or Series if only one name is passed, else - returns dict with all the data - """ - lib = self.get_library(libname) - ts = {} - names = self._parse_names(names, libname=libname) - desc = f"Get {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - fjson = os.path.join(lib, f"{n}.pas") - if not os.path.exists(fjson): - msg = f"Stress '{n}' not in store." - raise FileNotFoundError(msg) - ts[n] = self._series_from_json(fjson) - # return frame if len == 1 - if len(ts) == 1 and squeeze: - return ts[n] - else: - return ts - def get_metadata(self, libname: str, names: Union[list, str], - progressbar: bool = False, as_frame: bool = True, - squeeze: bool = True) -> Union[pd.DataFrame, list]: - """Get metadata from object. + def _get_metadata(self, libname: str, name: str) -> dict: + """Internal method to read metadata. 
Parameters ---------- libname : str - name of library - names : Union[list, str] - str or list of str of names to get metadata for - progressbar : bool, optional - show progressbar, by default False - as_frame : bool, optional - return as DataFrame, by default True - squeeze : bool, optional - if True return dict instead of list of dict - for single entry - - Returns - ------- - Union[pd.DataFrame, list] - returns list of metadata or pandas.DataFrame depending on value - of `as_frame` - """ - lib = self.get_library(libname) - metalist = [] - names = self._parse_names(names, libname=libname) - desc = f"Get metadata {libname}" - for n in (tqdm(names, desc=desc) if progressbar else names): - mjson = os.path.join(lib, f"{n}_meta.pas") - imeta = self._metadata_from_json(mjson) - if imeta is None: - imeta = {} - if "name" not in imeta.keys(): - imeta["name"] = n - metalist.append(imeta) - - if as_frame: - meta = self._meta_list_to_frame(metalist, names=names) - return meta - else: - if len(metalist) == 1 and squeeze: - return metalist[0] - else: - return metalist - - def get_oseries(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Retrieve oseries from object. 
- - Parameters - ---------- - names : Union[list, str] - name or list of names to retrieve - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry - - Returns - ------- - oseries : dict, FrameorSeriesUnion - returns dictionary or DataFrame/Series depending on number of - names passed - metadata : dict or list of dict - metadata for each oseries, only returned if return_metadata=True - """ - oseries = self._get_series("oseries", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("oseries", names, - progressbar=progressbar, - as_frame=False, squeeze=squeeze) - return oseries, metadata - else: - return oseries - - def get_stresses(self, names: Union[list, str], - return_metadata: bool = False, - progressbar: bool = False, - squeeze: bool = True) \ - -> Union[Union[FrameorSeriesUnion, Dict], - Optional[Union[Dict, List]]]: - """Retrieve stresses from object. 
- - Parameters - ---------- - names : Union[list, str] - name or list of names of stresses to retrieve - return_metadata : bool, optional - return metadata as dictionary or list of dictionaries, - default is False - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return DataFrame or Series instead of dictionary - for single entry - - Returns - ------- - stresses : dict, FrameorSeriesUnion - returns dictionary or DataFrame/Series depending on number of - names passed - metadata : dict or list of dict - metadata for each stress, only returned if return_metadata=True - """ - stresses = self._get_series("stresses", names, progressbar=progressbar, - squeeze=squeeze) - if return_metadata: - metadata = self.get_metadata("stresses", names, - progressbar=progressbar, - as_frame=False, squeeze=squeeze) - return stresses, metadata - else: - return stresses - - def get_models(self, names: Union[list, str], return_dict: bool = False, - progressbar: bool = False, squeeze: bool = True, - update_ts_settings: bool = False) \ - -> Union[Model, list]: - """Load models from object. - - Parameters - ---------- - names : str or list of str - names of the models to load - return_dict : bool, optional - return model dictionary instead of pastas.Model object - (much faster for obtaining parameters, for example) - progressbar : bool, optional - show progressbar, by default False - squeeze : bool, optional - if True return Model instead of list of Models - for single entry - update_ts_settings : bool, optional - update timeseries settings based on timeseries in store. - overwrites stored tmin/tmax in model. 
+ name of the library the series are in ("oseries" or "stresses") + name : str + name of item to load metadata for Returns ------- - pastas.Model or list of pastas.Model - return pastas model, or list of models if multiple names were - passed - """ - lib = self.get_library("models") - models = [] - names = self._parse_names(names, libname="models") - - desc = "Get models" - for n in (tqdm(names, desc=desc) if progressbar else names): - with open(os.path.join(lib, f"{n}.pas"), "r") as ml_json: - data = json.load(ml_json, object_hook=pastas_hook) - if return_dict: - ml = data - else: - ml = self._parse_model_dict( - data, update_ts_settings=update_ts_settings) - models.append(ml) - if len(models) == 1 and squeeze: - return models[0] - else: - return models - - @ staticmethod - def _clear_cache(libname: str) -> None: - """Clear cached property.""" - getattr(PasConnector, libname).fget.cache_clear() - - @ property # type: ignore - @ functools.lru_cache() - def oseries(self): - """Dataframe showing overview of oseries.""" - lib = self.get_library("oseries") - names = [i[:-4] for i in os.listdir(lib) - if not i.endswith("_meta.pas")] - return self.get_metadata("oseries", names, as_frame=True, - progressbar=False) - - @ property # type: ignore - @ functools.lru_cache() - def stresses(self): - """Dataframe showing overview of stresses.""" - lib = self.get_library("stresses") - names = [i[:-4] - for i in os.listdir(lib) if not i.endswith("_meta.pas")] - return self.get_metadata("stresses", names, as_frame=True, - progressbar=False) - - @ property # type: ignore - @ functools.lru_cache() - def models(self): + imeta : dict + dictionary containing metadata + """ + lib = self._get_library(libname) + mjson = os.path.join(lib, f"{name}_meta.pas") + imeta = self._metadata_from_json(mjson) + return imeta + + @property + def oseries_names(self): + """List of oseries names.""" + lib = self._get_library("oseries") + return [i[:-4] for i in os.listdir(lib) if not 
i.endswith("_meta.pas")] + + @property + def stresses_names(self): + """List of stresses names.""" + lib = self._get_library("stresses") + return [i[:-4] for i in os.listdir(lib) if not i.endswith("_meta.pas")] + + @property + def model_names(self): """List of model names.""" - lib = self.get_library("models") + lib = self._get_library("models") return [i[:-4] for i in os.listdir(lib)] diff --git a/pastastore/store.py b/pastastore/store.py index 6794387a..07e194f2 100644 --- a/pastastore/store.py +++ b/pastastore/store.py @@ -1,7 +1,7 @@ import json import os import warnings -from typing import Optional, Tuple, Union +from typing import List, Optional, Tuple, Union import numpy as np import pandas as pd @@ -129,7 +129,8 @@ def get_nearest_oseries(self, names: Optional[Union[list, str]] = None, def get_distances(self, oseries: Optional[Union[list, str]] = None, stresses: Optional[Union[list, str]] = None, - kind: Optional[str] = None) -> FrameorSeriesUnion: + kind: Optional[Union[str, List[str]]] = None) \ + -> FrameorSeriesUnion: """Method to obtain the distances in meters between the oseries and stresses. 
diff --git a/pastastore/util.py b/pastastore/util.py index 63a93d7b..cee63c2a 100644 --- a/pastastore/util.py +++ b/pastastore/util.py @@ -1,5 +1,5 @@ import os -from typing import List, Optional +from typing import Dict, List, Optional, Union import numpy as np import pandas as pd @@ -12,6 +12,10 @@ def _custom_warning(message, category=UserWarning, filename='', lineno=-1, print(f"{filename}:{lineno}: {category.__name__}: {message}") +class ItemInLibraryException(Exception): + pass + + def delete_pystore_connector(path: Optional[str] = None, name: Optional[str] = None, conn=None, @@ -83,7 +87,7 @@ def delete_arctic_connector(connstr: Optional[str] = None, name = conn.name connstr = conn.connstr elif name is None or connstr is None: - raise ValueError("Please provide 'name' and 'connstr' OR 'conn'!") + raise ValueError("Provide 'name' and 'connstr' OR 'conn'!") arc = arctic.Arctic(connstr) @@ -94,8 +98,10 @@ def delete_arctic_connector(connstr: Optional[str] = None, for ilib in arc.list_libraries(): if ilib.split(".")[0] == name: libs.append(ilib) - else: + elif name is not None: libs = [name + "." + ilib for ilib in libraries] + else: + raise ValueError("Provide 'name' and 'connstr' OR 'conn'!") for lib in libs: arc.delete_library(lib) @@ -203,7 +209,8 @@ def empty_library(pstore, libname: str, def validate_names(s: Optional[str] = None, d: Optional[dict] = None, replace_space: Optional[str] = "_", - deletechars: Optional[str] = None, **kwargs) -> str: + deletechars: Optional[str] = None, **kwargs) \ + -> Union[str, Dict]: """Remove invalid characters from string or dictionary keys. 
Parameters @@ -237,7 +244,7 @@ def validate_names(s: Optional[str] = None, d: Optional[dict] = None, new_dict[validator(k)[0]] = v return new_dict else: - raise ValueError("Provide one of 's' or 'd'!") + raise ValueError("Provide one of 's' (string) or 'd' (dict)!") def compare_models(ml1, ml2, stats=None, detailed_comparison=False): @@ -353,3 +360,50 @@ def compare_models(ml1, ml2, stats=None, detailed_comparison=False): return df else: return df["comparison"].all() + + +def copy_database(conn1, conn2, libraries: Optional[List[str]] = None, + overwrite: bool = False, progressbar: bool = False) -> None: + """Copy libraries from one database to another. + + Parameters + ---------- + conn1 : pastastore.*Connector + source Connector containing link to current database containing data + conn2 : pastastore.*Connector + destination Connector with link to database to which you want to copy + libraries : Optional[List[str]], optional + list of str containing names of libraries to copy, by default None, + which copies all libraries: ['oseries', 'stresses', 'models'] + overwrite : bool, optional + overwrite data in destination database, by default False + progressbar : bool, optional + show progressbars, by default False + + Raises + ------ + ValueError + if library name is not understood + """ + if libraries is None: + libraries = ["oseries", "stresses", "models"] + + for lib in libraries: + if lib == "oseries": + for name in (tqdm(conn1.oseries_names, desc="copying oseries") if + progressbar else conn1.oseries_names): + o, meta = conn1.get_oseries(name, return_metadata=True) + conn2.add_oseries(o, name, metadata=meta, overwrite=overwrite) + elif lib == "stresses": + for name in (tqdm(conn1.stresses_names, desc="copying stresses") if + progressbar else conn1.stresses_names): + s, meta = conn1.get_stresses(name, return_metadata=True) + conn2.add_stress(s, name, kind=meta["kind"], metadata=meta, + overwrite=overwrite) + elif lib == "models": + for name in 
(tqdm(conn1.model_names, desc="copying models") if + progressbar else conn1.model_names): + mldict = conn1.get_models(name, return_dict=True) + conn2.add_model(mldict, overwrite=overwrite) + else: + raise ValueError(f"Library name '{lib}' not recognized!") diff --git a/pastastore/version.py b/pastastore/version.py index 3d187266..906d362f 100644 --- a/pastastore/version.py +++ b/pastastore/version.py @@ -1 +1 @@ -__version__ = "0.5.0" +__version__ = "0.6.0" diff --git a/tests/conftest.py b/tests/conftest.py index b101c12c..6bf4d98c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,114 +1,102 @@ -import pytest -import pystore -import arctic import pandas as pd - import pastastore as pst - +import pystore +import pytest params = ["arctic", "pystore", "dict", "pas"] -# params = ["pas"] +# params = ["dict"] def initialize_project(conn): - prj = pst.PastaStore("test_project", conn) + pstore = pst.PastaStore("test_project", conn) # oseries 1 o = pd.read_csv("./tests/data/obs.csv", index_col=0, parse_dates=True) - prj.add_oseries(o, "oseries1", metadata={"x": 100000, - "y": 400000}) + pstore.add_oseries(o, "oseries1", metadata={"x": 100000, + "y": 400000}) # oseries 2 o = pd.read_csv("./tests/data/head_nb1.csv", index_col=0, parse_dates=True) - prj.add_oseries(o, "oseries2", metadata={"x": 100300, - "y": 400400}) + pstore.add_oseries(o, "oseries2", metadata={"x": 100300, + "y": 400400}) # oseries 3 o = pd.read_csv("./tests/data/gw_obs.csv", index_col=0, parse_dates=True) - prj.add_oseries(o, "oseries3", metadata={"x": 165554, - "y": 422685}) + pstore.add_oseries(o, "oseries3", metadata={"x": 165554, + "y": 422685}) # prec 1 s = pd.read_csv("./tests/data/rain.csv", index_col=0, parse_dates=True) - prj.add_stress(s, "prec1", kind="prec", metadata={"x": 100000, - "y": 400000}) + pstore.add_stress(s, "prec1", kind="prec", metadata={"x": 100000, + "y": 400000}) # prec 2 s = pd.read_csv("./tests/data/rain_nb1.csv", index_col=0, parse_dates=True) - prj.add_stress(s, 
"prec2", kind="prec", metadata={"x": 100300, - "y": 400400}) + pstore.add_stress(s, "prec2", kind="prec", metadata={"x": 100300, + "y": 400400}) # evap 1 s = pd.read_csv("./tests/data/evap.csv", index_col=0, parse_dates=True) - prj.add_stress(s, "evap1", kind="evap", metadata={"x": 100000, - "y": 400000}) + pstore.add_stress(s, "evap1", kind="evap", metadata={"x": 100000, + "y": 400000}) # evap 2 s = pd.read_csv("./tests/data/evap_nb1.csv", index_col=0, parse_dates=True) - prj.add_stress(s, "evap2", kind="evap", metadata={"x": 100300, - "y": 400400}) + pstore.add_stress(s, "evap2", kind="evap", metadata={"x": 100300, + "y": 400400}) # well 1 s = pd.read_csv("./tests/data/well.csv", index_col=0, parse_dates=True) - prj.add_stress(s, "well1", kind="well", metadata={"x": 164691, - "y": 423579}) + pstore.add_stress(s, "well1", kind="well", metadata={"x": 164691, + "y": 423579}) - return prj + return pstore @pytest.fixture(scope="module", params=params) -def pr(request): +def conn(request): """Fixture that yields connection object. 
""" name = f"test_{request.param}" # connect to dbase if request.param == "arctic": connstr = "mongodb://localhost:27017/" - pr = pst.ArcticConnector(name, connstr) + conn = pst.ArcticConnector(name, connstr) elif request.param == "pystore": path = "./tests/data/pystore" - pr = pst.PystoreConnector(name, path) + conn = pst.PystoreConnector(name, path) elif request.param == "dict": - pr = pst.DictConnector(name) + conn = pst.DictConnector(name) elif request.param == "pas": - pr = pst.PasConnector(name, "./tests/data/pas") + conn = pst.PasConnector(name, "./tests/data/pas") else: raise ValueError("Unrecognized parameter!") - pr.type = request.param # added here for defining test dependencies - yield pr + conn.type = request.param # added here for defining test dependencies + yield conn @pytest.fixture(scope="module", params=params) -def prj(request): +def pstore(request): if request.param == "arctic": connstr = "mongodb://localhost:27017/" name = "test_project" - arc = arctic.Arctic(connstr) - if name in [lib.split(".")[0] for lib in arc.list_libraries()]: - connector = pst.ArcticConnector(name, connstr) - prj = pst.PastaStore(name, connector) - else: - connector = pst.ArcticConnector(name, connstr) - prj = initialize_project(connector) + connector = pst.ArcticConnector(name, connstr) + pstore = initialize_project(connector) elif request.param == "pystore": name = "test_project" path = "./tests/data/pystore" pystore.set_path(path) - if name in pystore.list_stores(): - connector = pst.PystoreConnector(name, path) - prj = pst.PastaStore(name, connector) - else: - connector = pst.PystoreConnector(name, path) - prj = initialize_project(connector) + connector = pst.PystoreConnector(name, path) + pstore = initialize_project(connector) elif request.param == "dict": name = "test_project" connector = pst.DictConnector(name) - prj = initialize_project(connector) + pstore = initialize_project(connector) elif request.param == "pas": name = "test_project" connector = 
pst.PasConnector(name, "./tests/data/pas") - prj = initialize_project(connector) + pstore = initialize_project(connector) else: raise ValueError("Unrecognized parameter!") - prj.type = request.param # added here for defining test dependencies - yield prj + pstore.type = request.param # added here for defining test dependencies + yield pstore diff --git a/tests/test_002_connectors.py b/tests/test_002_connectors.py index 49356864..836fbbe4 100644 --- a/tests/test_002_connectors.py +++ b/tests/test_002_connectors.py @@ -1,11 +1,10 @@ import warnings import pandas as pd +import pastas as ps import pytest from pytest_dependency import depends -import pastas as ps - with warnings.catch_warnings(): warnings.simplefilter(action="ignore", category=FutureWarning) import pastastore as pst @@ -13,128 +12,176 @@ ps.set_log_level("ERROR") -def test_get_library(pr): - olib = pr.get_library("oseries") +def test_get_library(conn): + olib = conn._get_library("oseries") return olib -def test_add_get_series(request, pr): +def test_add_get_series(request, conn): o1 = pd.Series(index=pd.date_range("2000", periods=10, freq="D"), data=1.0) o1.name = "test_series" - pr.add_oseries(o1, "test_series", metadata=None) - o2 = pr.get_oseries("test_series") + conn.add_oseries(o1, "test_series", metadata=None) + o2 = conn.get_oseries("test_series") # PasConnector has no logic for preserving Series - if pr.conn_type == "pas": + if conn.conn_type == "pas": o2 = o2.squeeze() try: assert isinstance(o2, pd.Series) assert (o1 == o2).all() finally: - pr.del_oseries("test_series") + conn.del_oseries("test_series") return -def test_add_get_dataframe(request, pr): +def test_add_get_dataframe(request, conn): o1 = pd.DataFrame(data=1.0, columns=["test_df"], index=pd.date_range("2000", periods=10, freq="D")) o1.index.name = "test_idx" - pr.add_oseries(o1, "test_df", metadata=None) - o2 = pr.get_oseries("test_df") + conn.add_oseries(o1, "test_df", metadata=None) + o2 = conn.get_oseries("test_df") try: assert 
isinstance(o2, pd.DataFrame) assert (o1 == o2).all().all() finally: - pr.del_oseries("test_df") + conn.del_oseries("test_df") + return + + +def test_add_pastas_timeseries(request, conn): + o1 = pd.DataFrame(data=1.0, columns=["test_df"], + index=pd.date_range("2000", periods=10, freq="D")) + o1.index.name = "test_idx" + ts = ps.TimeSeries(o1, metadata={"x": 100000., "y": 400000.}) + conn.add_oseries(ts, "test_pastas_ts", metadata=None) + conn.add_stress(ts, "test_pastas_ts", kind="test", + metadata={"x": 200000., "y": 500000.}) + conn.del_oseries("test_pastas_ts") + conn.del_stress("test_pastas_ts") + return + + +def test_update_series(request, conn): + o1 = pd.DataFrame(data=1.0, columns=["test_df"], + index=pd.date_range("2000", periods=10, freq="D")) + o1.index.name = "test_idx" + conn.add_oseries(o1, "test_df", metadata={"x": 100000.}) + o2 = pd.DataFrame(data=2.0, columns=["test_df"], + index=pd.date_range("2000-01-10", periods=2, freq="D")) + o2.index.name = "test_idx" + conn.update_oseries(o2, "test_df", metadata={"x": 200000., "y": 400000}) + o3 = conn.get_oseries("test_df") + try: + assert (o3.iloc[-2:] == 2.0).all().all() + assert o3.index.size == 11 + finally: + conn.del_oseries("test_df") + return + + +def test_update_metadata(request, conn): + o1 = pd.DataFrame(data=1.0, columns=["test_df"], + index=pd.date_range("2000", periods=10, freq="D")) + o1.index.name = "test_idx" + conn.add_oseries(o1, "test_df", metadata={"x": 100000.}) + conn.update_metadata("oseries", "test_df", {"x": 200000., "y": 400000.}) + m = conn._get_metadata("oseries", "test_df") + try: + assert isinstance(m, dict) + assert m["x"] == 200000. + assert m["y"] == 400000. 
+ finally: + conn.del_oseries("test_df") return @pytest.mark.dependency() -def test_add_oseries(pr): +def test_add_oseries(conn): o = pd.read_csv("./tests/data/obs.csv", index_col=0, parse_dates=True) - pr.add_oseries(o, "oseries1", metadata={"name": "oseries1", - "x": 100000, - "y": 400000}, - overwrite=True) + conn.add_oseries(o, "oseries1", + metadata={"name": "oseries1", + "x": 100000, + "y": 400000}, + overwrite=True) return @pytest.mark.dependency() -def test_add_stress(pr): +def test_add_stress(conn): s = pd.read_csv("./tests/data/rain.csv", index_col=0, parse_dates=True) - pr.add_stress(s, "prec", kind="prec", metadata={"kind": "prec", - "x": 100001, - "y": 400001}) + conn.add_stress(s, "prec", kind="prec", metadata={"kind": "prec", + "x": 100001, + "y": 400001}) return @pytest.mark.dependency() -def test_get_oseries(request, pr): - depends(request, [f"test_add_oseries[{pr.type}]"]) - o = pr.get_oseries("oseries1") +def test_get_oseries(request, conn): + depends(request, [f"test_add_oseries[{conn.type}]"]) + o = conn.get_oseries("oseries1") return o @pytest.mark.dependency() -def test_get_oseries_and_metadata(request, pr): - depends(request, [f"test_add_oseries[{pr.type}]"]) - o, m = pr.get_oseries("oseries1", return_metadata=True) +def test_get_oseries_and_metadata(request, conn): + depends(request, [f"test_add_oseries[{conn.type}]"]) + o, m = conn.get_oseries("oseries1", return_metadata=True) return o, m @pytest.mark.dependency() -def test_get_stress(request, pr): - depends(request, [f"test_add_stress[{pr.type}]"]) - s = pr.get_stresses('prec') +def test_get_stress(request, conn): + depends(request, [f"test_add_stress[{conn.type}]"]) + s = conn.get_stresses('prec') s.name = 'prec' return s @pytest.mark.dependency() -def test_get_stress_and_metadata(request, pr): - depends(request, [f"test_add_stress[{pr.type}]"]) - s, m = pr.get_stresses('prec', return_metadata=True) +def test_get_stress_and_metadata(request, conn): + depends(request, 
[f"test_add_stress[{conn.type}]"]) + s, m = conn.get_stresses('prec', return_metadata=True) s.name = 'prec' return s, m @pytest.mark.dependency() -def test_oseries_prop(request, pr): - depends(request, [f"test_add_oseries[{pr.type}]"]) - return pr.oseries +def test_oseries_prop(request, conn): + depends(request, [f"test_add_oseries[{conn.type}]"]) + return conn.oseries @pytest.mark.dependency() -def test_stresses_prop(request, pr): - depends(request, [f"test_add_stress[{pr.type}]"]) - return pr.stresses +def test_stresses_prop(request, conn): + depends(request, [f"test_add_stress[{conn.type}]"]) + return conn.stresses -def test_repr(pr): - return pr.__repr__() +def test_repr(conn): + return conn.__repr__() @pytest.mark.dependency() -def test_del_oseries(request, pr): - depends(request, [f"test_add_oseries[{pr.type}]"]) - pr.del_oseries("oseries1") +def test_del_oseries(request, conn): + depends(request, [f"test_add_oseries[{conn.type}]"]) + conn.del_oseries("oseries1") return @pytest.mark.dependency() -def test_del_stress(request, pr): - depends(request, [f"test_add_stress[{pr.type}]"]) - pr.del_stress("prec") +def test_del_stress(request, conn): + depends(request, [f"test_add_stress[{conn.type}]"]) + conn.del_stress("prec") return @pytest.mark.dependency() -def test_delete(request, pr): - if pr.conn_type == "arctic": +def test_delete(request, conn): + if conn.conn_type == "arctic": pst.util.delete_arctic_connector( - pr.connstr, pr.name, libraries=["oseries"]) - pst.util.delete_arctic_connector(pr.connstr, pr.name) - elif pr.conn_type == "pystore": + conn.connstr, conn.name, libraries=["oseries"]) + pst.util.delete_arctic_connector(conn.connstr, conn.name) + elif conn.conn_type == "pystore": pst.util.delete_pystore_connector( - pr.path, pr.name, libraries=["oseries"]) - pst.util.delete_pystore_connector(pr.path, pr.name) + conn.path, conn.name, libraries=["oseries"]) + pst.util.delete_pystore_connector(conn.path, conn.name) return diff --git 
a/tests/test_003_pastastore.py b/tests/test_003_pastastore.py index 510b50b1..1b5acf34 100644 --- a/tests/test_003_pastastore.py +++ b/tests/test_003_pastastore.py @@ -12,127 +12,134 @@ @pytest.mark.dependency() -def test_create_model(prj): - ml = prj.create_model("oseries1") +def test_create_model(pstore): + ml = pstore.create_model("oseries1") return ml @pytest.mark.dependency() -def test_properties(prj): - _ = prj.oseries - _ = prj.stresses - _ = prj.models +def test_properties(pstore): + _ = pstore.oseries + _ = pstore.stresses + _ = pstore.models return @pytest.mark.dependency() -def test_store_model(request, prj): - depends(request, [f"test_create_model[{prj.type}]"]) - ml = test_create_model(prj) - prj.conn.add_model(ml) +def test_store_model(request, pstore): + depends(request, [f"test_create_model[{pstore.type}]"]) + ml = test_create_model(pstore) + pstore.conn.add_model(ml) return @pytest.mark.dependency() -def test_store_model_missing_series(request, prj): - depends(request, [f"test_create_model[{prj.type}]", - f"test_store_model[{prj.type}]"]) - ml = test_create_model(prj) - o = prj.get_oseries("oseries1") - meta = prj.get_metadata("oseries", "oseries1", as_frame=False) - prj.del_models("oseries1") - prj.del_oseries("oseries1") +def test_store_model_missing_series(request, pstore): + depends(request, [f"test_create_model[{pstore.type}]", + f"test_store_model[{pstore.type}]"]) + ml = test_create_model(pstore) + o = pstore.get_oseries("oseries1") + meta = pstore.get_metadata("oseries", "oseries1", as_frame=False) + pstore.del_models("oseries1") + pstore.del_oseries("oseries1") try: - prj.add_model(ml) + pstore.add_model(ml) except LookupError: - prj.add_oseries(o, "oseries1", metadata=meta) - prj.add_model(ml) + pstore.add_oseries(o, "oseries1", metadata=meta) + pstore.add_model(ml) return @pytest.mark.dependency() -def test_get_model(request, prj): - depends(request, [f"test_create_model[{prj.type}]", - f"test_store_model[{prj.type}]", - 
f"test_store_model_missing_series[{prj.type}]"]) - ml = prj.conn.get_models("oseries1") +def test_get_model(request, pstore): + depends(request, [f"test_create_model[{pstore.type}]", + f"test_store_model[{pstore.type}]", + f"test_store_model_missing_series[{pstore.type}]"]) + ml = pstore.conn.get_models("oseries1") return ml @pytest.mark.dependency() -def test_del_model(request, prj): - depends(request, [f"test_create_model[{prj.type}]", - f"test_store_model[{prj.type}]", - f"test_store_model_missing_series[{prj.type}]", - f"test_get_model[{prj.type}]"]) - prj.conn.del_models("oseries1") +def test_del_model(request, pstore): + depends(request, [f"test_create_model[{pstore.type}]", + f"test_store_model[{pstore.type}]", + f"test_store_model_missing_series[{pstore.type}]", + f"test_get_model[{pstore.type}]"]) + pstore.conn.del_models("oseries1") return @pytest.mark.dependency() -def test_create_models(prj): - mls = prj.create_models_bulk(["oseries1", "oseries2"], store=True, - progressbar=False) - _ = prj.conn.models +def test_create_models(pstore): + mls = pstore.create_models_bulk(["oseries1", "oseries2"], store=True, + progressbar=False) + _ = pstore.conn.models return mls @pytest.mark.dependency() -def test_get_parameters(request, prj): - depends(request, [f"test_create_models[{prj.type}]"]) - p = prj.get_parameters(progressbar=False, param_value="initial") +def test_get_parameters(request, pstore): + depends(request, [f"test_create_models[{pstore.type}]"]) + p = pstore.get_parameters(progressbar=False, param_value="initial") assert p.index.size == 2 assert p.isna().sum().sum() == 0 return p @pytest.mark.dependency() -def test_solve_models_and_get_stats(request, prj): - depends(request, [f"test_create_models[{prj.type}]"]) - mls = prj.solve_models(["oseries1", "oseries2"], - ignore_solve_errors=False, - progressbar=False, - store_result=True) - stats = prj.get_statistics(["evp", "aic"], progressbar=False) +def test_solve_models_and_get_stats(request, pstore): + 
depends(request, [f"test_create_models[{pstore.type}]"]) + mls = pstore.solve_models(["oseries1", "oseries2"], + ignore_solve_errors=False, + progressbar=False, + store_result=True) + stats = pstore.get_statistics(["evp", "aic"], progressbar=False) assert stats.index.size == 2 return mls, stats @pytest.mark.dependency() -def test_save_and_load_model(request, prj): - ml = prj.create_model("oseries3") - sm = ps.StressModel(prj.get_stresses('well1'), ps.Hantush, +def test_save_and_load_model(request, pstore): + ml = pstore.create_model("oseries3") + sm = ps.StressModel(pstore.get_stresses('well1'), ps.Hantush, name='well1', settings="well") ml.add_stressmodel(sm) ml.solve(tmin='1993-1-1') evp_ml = ml.stats.evp() - prj.add_model(ml, overwrite=True) - ml2 = prj.get_models(ml.name) + pstore.add_model(ml, overwrite=True) + ml2 = pstore.get_models(ml.name) evp_ml2 = ml2.stats.evp() assert allclose(evp_ml, evp_ml2) assert pst.util.compare_models(ml, ml2) return ml, ml2 # @pytest.mark.dependency() -# def test_model_results(request, prj): -# depends(request, [f"test_create_models[{prj.type}]", -# f"test_solve_models[{prj.type}]"]) -# prj.model_results(["oseries1", "oseries2"], progressbar=False) +# def test_model_results(request, pstore): +# depends(request, [f"test_create_models[{pstore.type}]", +# f"test_solve_models[{pstore.type}]"]) +# pstore.model_results(["oseries1", "oseries2"], progressbar=False) # return -def test_oseries_distances(prj): - _ = prj.get_nearest_oseries() +def test_oseries_distances(pstore): + _ = pstore.get_nearest_oseries() return -def test_repr(prj): - return prj.__repr__() +def test_repr(pstore): + return pstore.__repr__() -def test_to_from_zip(prj): - zipname = f"test_{prj.type}.zip" - prj.to_zip(zipname, progressbar=False) +def test_copy_dbase(pstore): + conn2 = pst.DictConnector("destination") + pst.util.copy_database(pstore.conn, conn2, overwrite=False, + progressbar=True) + return + + +def test_to_from_zip(pstore): + zipname = 
f"test_{pstore.type}.zip" + pstore.to_zip(zipname, progressbar=False) conn = pst.DictConnector("test") try: store = pst.PastaStore.from_zip(zipname, conn) @@ -142,6 +149,6 @@ def test_to_from_zip(prj): return store -def test_delete_db(prj): - pst.util.delete_pastastore(prj) +def test_delete_db(pstore): + pst.util.delete_pastastore(pstore) return diff --git a/tests/test_004_benchmark.py b/tests/test_004_benchmark.py index 54269bcf..f6a72214 100644 --- a/tests/test_004_benchmark.py +++ b/tests/test_004_benchmark.py @@ -1,8 +1,7 @@ +import numpy as np import pandas as pd -import pytest import pastastore as pst -import numpy as np - +import pytest # %% write