From 2f469dd38fca5372d8e959a35fdc6672486c6811 Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Tue, 6 Jun 2023 12:17:01 -0400 Subject: [PATCH 1/3] save the pca model durig the gpu umap flow --- dataquality/utils/cuda.py | 14 ++++++++++++-- dataquality/utils/emb.py | 33 ++++++++++++++++++++++++++++++++- dataquality/utils/vaex.py | 13 ++++++++++++- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/dataquality/utils/cuda.py b/dataquality/utils/cuda.py index 4cafa4630..b8a2b5721 100644 --- a/dataquality/utils/cuda.py +++ b/dataquality/utils/cuda.py @@ -1,5 +1,11 @@ +from typing import Tuple + import numpy as np +from dataquality.clients.objectstore import ObjectStore + +object_store = ObjectStore() + PCA_CHUNK_SIZE = 100_000 PCA_N_COMPONENTS = 100 @@ -13,16 +19,20 @@ def cuml_available() -> bool: return False -def get_pca_embeddings(embs: np.ndarray) -> np.ndarray: +def get_pca_embeddings(embs: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Uses Cuda and GPUs to create the PCA embeddings before uploading Should only be called if cuda ML available (`cuda_available()`) + + Returns the PCA embeddings, the components_ of the pca model, and the mean_ of + the pca model """ import cuml n_components = min(PCA_N_COMPONENTS, *embs.shape) pca = cuml.IncrementalPCA(n_components=n_components, batch_size=PCA_CHUNK_SIZE) - return pca.fit_transform(embs) + emb_pca = pca.fit_transform(embs) + return emb_pca, pca.components_, pca.mean_ def get_umap_embeddings(embs: np.ndarray) -> np.ndarray: diff --git a/dataquality/utils/emb.py b/dataquality/utils/emb.py index a403fa853..0f08891cd 100644 --- a/dataquality/utils/emb.py +++ b/dataquality/utils/emb.py @@ -7,6 +7,7 @@ from pydantic import UUID4 from vaex.dataframe import DataFrame +from dataquality import config from dataquality.clients.objectstore import ObjectStore from dataquality.utils.file import ( get_largest_epoch_for_splits, @@ -14,13 +15,21 @@ ) from dataquality.utils.hdf5_store import HDF5_STORE from dataquality.utils.vaex import ( + COMPONENTS, + MEAN, add_umap_pca_to_df, create_data_embs_df, get_output_df, ) +object_store = ObjectStore() DATA_EMB_PATH = "data_emb/data_emb.hdf5" +PCA_COMPONENTS_NAME = "components.hdf5" +PCA_COMPONENTS_OBJECT_PATH = f"pca/{PCA_COMPONENTS_NAME}" +PCA_MEAN_NAME = "mean.hdf5" +PCA_MEAN_OBJECT_PATH = f"pca/{PCA_MEAN_NAME}" + def get_concat_emb_df(run_dir: str, split_epoch: Dict[str, int]) -> DataFrame: """Returns the concatenated embedding df for all available (non inf) splits""" @@ -59,6 +68,26 @@ def save_processed_emb_dfs( os.rename(tmp_loc, split_loc) +def _upload_pca_data(df: DataFrame) -> None: + project_id, run_id = config.current_project_id, config.current_run_id + # Save the components as an hdf5 file + components_file = f"/tmp/{PCA_COMPONENTS_NAME}" + components_object_path = f"{project_id}/{run_id}/{PCA_COMPONENTS_OBJECT_PATH}" + df[[COMPONENTS]].export(components_file) + bucket = config.results_bucket_name + object_store.create_object( + components_object_path, components_file, progress=False, bucket_name=bucket + ) + + # Save the mean as an hdf5 file + mean_file_path = f"/tmp/{PCA_MEAN_NAME}" + mean_object_path = f"{project_id}/{run_id}/{PCA_MEAN_OBJECT_PATH}" + df[[MEAN]].export(mean_file_path) + object_store.create_object( + mean_object_path, mean_file_path, progress=False, bucket_name=bucket + ) + + def apply_umap_to_embs(run_dir: str, last_epoch: Optional[int]) -> None: """In the event that the user has Nvidia cuml installed, we apply UMAP locally @@ -70,6 +99,9 @@ def apply_umap_to_embs(run_dir: str, last_epoch: Optional[int]) -> None: """ # Get the correct epoch to process for each split split_epoch = get_largest_epoch_for_splits(run_dir, last_epoch) + # In the case of inference only + if not split_epoch: + return concat_df = get_concat_emb_df(run_dir, split_epoch) df_emb = add_umap_pca_to_df(concat_df) save_processed_emb_dfs(df_emb, split_epoch, run_dir) @@ -93,7 +125,6 @@ def upload_umap_data_embs( inference, we store it in the inference name. So when the split is inference, we further filter the dataframe by inference name and upload the df. """ - object_store = ObjectStore() df = vaex.open(f"{input_data_dir}/**/data*.arrow") try: df_emb = create_data_embs_df(df, lazy=False) diff --git a/dataquality/utils/vaex.py b/dataquality/utils/vaex.py index 593f1e6dc..696860941 100644 --- a/dataquality/utils/vaex.py +++ b/dataquality/utils/vaex.py @@ -7,6 +7,7 @@ from sklearn.decomposition import IncrementalPCA from vaex.dataframe import DataFrame +from dataquality.clients.objectstore import ObjectStore from dataquality.exceptions import GalileoException from dataquality.loggers.base_logger import BaseLoggerAttributes from dataquality.schemas.split import Split @@ -20,11 +21,16 @@ from dataquality.utils.hdf5_store import HDF5_STORE, concat_hdf5_files from dataquality.utils.helpers import galileo_verbose_logging +object_store = ObjectStore() + # To decide between "all-MiniLM-L6-v2" or "all-mpnet-base-v2" # https://www.sbert.net/docs/pretrained_models.html#model-overview GALILEO_DATA_EMBS_ENCODER = "GALILEO_DATA_EMBS_ENCODER" DEFAULT_DATA_EMBS_MODEL = "all-MiniLM-L6-v2" +COMPONENTS = "components" +MEAN = "mean" + def _join_in_out_frames( in_df: DataFrame, out_df: DataFrame, allow_missing_in_df_ids: bool = False @@ -136,11 +142,16 @@ def add_umap_pca_to_df(df: DataFrame, data_embs: bool = False) -> DataFrame: note = "[data embs]" if data_embs else "[embs]" print(f"{note} Found cuda ML libraries") print(f"{note} Applying dimensionality reduction step 1/2") - emb_pca = get_pca_embeddings(dfc["emb"].to_numpy()) + emb_pca, components, mean = get_pca_embeddings(dfc["emb"].to_numpy()) print(f"{note} Applying dimensionality reduction step 2/2") emb_xy = get_umap_embeddings(emb_pca) x, y = ("data_x", "data_y") if data_embs else ("x", "y") dfc["emb_pca"] = emb_pca + # We save the components and mean of the PCA model to minio + # see utils/emb.py::apply_umap_to_embs + if not data_embs: + df[COMPONENTS] = components + df[MEAN] = mean dfc[x] = emb_xy[:, 0] dfc[y] = emb_xy[:, 1] return dfc From 158d6fa22f51409d3a2ccd19e684951bb3c1ba98 Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Tue, 6 Jun 2023 12:38:38 -0400 Subject: [PATCH 2/3] diff logic --- dataquality/utils/emb.py | 28 ---------------------------- dataquality/utils/vaex.py | 28 ++++++++++++++++++++++++++-- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/dataquality/utils/emb.py b/dataquality/utils/emb.py index 0f08891cd..ead44f896 100644 --- a/dataquality/utils/emb.py +++ b/dataquality/utils/emb.py @@ -7,7 +7,6 @@ from pydantic import UUID4 from vaex.dataframe import DataFrame -from dataquality import config from dataquality.clients.objectstore import ObjectStore from dataquality.utils.file import ( get_largest_epoch_for_splits, @@ -15,8 +14,6 @@ ) from dataquality.utils.hdf5_store import HDF5_STORE from dataquality.utils.vaex import ( - COMPONENTS, - MEAN, add_umap_pca_to_df, create_data_embs_df, get_output_df, @@ -25,11 +22,6 @@ object_store = ObjectStore() DATA_EMB_PATH = "data_emb/data_emb.hdf5" -PCA_COMPONENTS_NAME = "components.hdf5" -PCA_COMPONENTS_OBJECT_PATH = f"pca/{PCA_COMPONENTS_NAME}" -PCA_MEAN_NAME = "mean.hdf5" -PCA_MEAN_OBJECT_PATH = f"pca/{PCA_MEAN_NAME}" - def get_concat_emb_df(run_dir: str, split_epoch: Dict[str, int]) -> DataFrame: """Returns the concatenated embedding df for all available (non inf) splits""" @@ -68,26 +60,6 @@ def save_processed_emb_dfs( os.rename(tmp_loc, split_loc) -def _upload_pca_data(df: DataFrame) -> None: - project_id, run_id = config.current_project_id, config.current_run_id - # Save the components as an hdf5 file - components_file = f"/tmp/{PCA_COMPONENTS_NAME}" - components_object_path = f"{project_id}/{run_id}/{PCA_COMPONENTS_OBJECT_PATH}" - df[[COMPONENTS]].export(components_file) - bucket = config.results_bucket_name - object_store.create_object( - components_object_path, components_file, progress=False, bucket_name=bucket - ) - - # Save the mean as an hdf5 file - mean_file_path = f"/tmp/{PCA_MEAN_NAME}" - mean_object_path = f"{project_id}/{run_id}/{PCA_MEAN_OBJECT_PATH}" - df[[MEAN]].export(mean_file_path) - object_store.create_object( - mean_object_path, mean_file_path, progress=False, bucket_name=bucket - ) - - def apply_umap_to_embs(run_dir: str, last_epoch: Optional[int]) -> None: """In the event that the user has Nvidia cuml installed, we apply UMAP locally diff --git a/dataquality/utils/vaex.py b/dataquality/utils/vaex.py index 696860941..779985394 100644 --- a/dataquality/utils/vaex.py +++ b/dataquality/utils/vaex.py @@ -7,6 +7,7 @@ from sklearn.decomposition import IncrementalPCA from vaex.dataframe import DataFrame +from dataquality import config from dataquality.clients.objectstore import ObjectStore from dataquality.exceptions import GalileoException from dataquality.loggers.base_logger import BaseLoggerAttributes @@ -30,6 +31,10 @@ COMPONENTS = "components" MEAN = "mean" +PCA_COMPONENTS_NAME = "components.hdf5" +PCA_COMPONENTS_OBJECT_PATH = f"pca/{PCA_COMPONENTS_NAME}" +PCA_MEAN_NAME = "mean.hdf5" +PCA_MEAN_OBJECT_PATH = f"pca/{PCA_MEAN_NAME}" def _join_in_out_frames( @@ -89,6 +94,26 @@ def validate_unique_ids(df: DataFrame, epoch_or_inf_name: str) -> None: raise GalileoException(exc) +def _upload_pca_data(components: np.ndarray, mean: np.ndarray) -> None: + project_id, run_id = config.current_project_id, config.current_run_id + # Save the components as an hdf5 file + components_file = f"/tmp/{PCA_COMPONENTS_NAME}" + components_object_path = f"{project_id}/{run_id}/{PCA_COMPONENTS_OBJECT_PATH}" + vaex.from_dict({COMPONENTS: components}).export(components_file) + bucket = config.results_bucket_name + object_store.create_object( + components_object_path, components_file, progress=False, bucket_name=bucket + ) + + # Save the mean as an hdf5 file + mean_file_path = f"/tmp/{PCA_MEAN_NAME}" + mean_object_path = f"{project_id}/{run_id}/{PCA_MEAN_OBJECT_PATH}" + vaex.from_dict({MEAN: mean}).export(mean_file_path) + object_store.create_object( + mean_object_path, mean_file_path, progress=False, bucket_name=bucket + ) + + def get_dup_ids(df: DataFrame) -> List: """Gets the list of duplicate IDs in a dataframe, if any""" df_copy = df.copy() @@ -150,8 +175,7 @@ def add_umap_pca_to_df(df: DataFrame, data_embs: bool = False) -> DataFrame: # We save the components and mean of the PCA model to minio # see utils/emb.py::apply_umap_to_embs if not data_embs: - df[COMPONENTS] = components - df[MEAN] = mean + _upload_pca_data(components, mean) dfc[x] = emb_xy[:, 0] dfc[y] = emb_xy[:, 1] return dfc From 01e8e8d161e4a496cfe8b3f5257159ab21de9973 Mon Sep 17 00:00:00 2001 From: Ben Epstein Date: Tue, 6 Jun 2023 12:50:42 -0400 Subject: [PATCH 3/3] bump for release --- dataquality/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataquality/__init__.py b/dataquality/__init__.py index 9f3116e75..bc0a0330a 100644 --- a/dataquality/__init__.py +++ b/dataquality/__init__.py @@ -30,7 +30,7 @@ dataquality.get_insights() """ -__version__ = "v0.8.48" +__version__ = "v0.8.49" import sys from typing import Any, List, Optional