Bug/save pca model gpumap #648

Merged · 3 commits · Jun 6, 2023
dataquality/__init__.py (1 addition, 1 deletion)
@@ -30,7 +30,7 @@
dataquality.get_insights()
"""

-__version__ = "v0.8.48"
+__version__ = "v0.8.49"

import sys
from typing import Any, List, Optional
dataquality/utils/cuda.py (12 additions, 2 deletions)
@@ -1,5 +1,11 @@
+from typing import Tuple

import numpy as np

+from dataquality.clients.objectstore import ObjectStore

+object_store = ObjectStore()

PCA_CHUNK_SIZE = 100_000
PCA_N_COMPONENTS = 100

@@ -13,16 +19,20 @@ def cuml_available() -> bool:
        return False


-def get_pca_embeddings(embs: np.ndarray) -> np.ndarray:
+def get_pca_embeddings(embs: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Uses Cuda and GPUs to create the PCA embeddings before uploading

    Should only be called if cuda ML available (`cuda_available()`)

+    Returns the PCA embeddings, the components_ of the pca model, and the mean_ of
+    the pca model
    """
    import cuml

    n_components = min(PCA_N_COMPONENTS, *embs.shape)
    pca = cuml.IncrementalPCA(n_components=n_components, batch_size=PCA_CHUNK_SIZE)
-    return pca.fit_transform(embs)
+    emb_pca = pca.fit_transform(embs)
+    return emb_pca, pca.components_, pca.mean_


def get_umap_embeddings(embs: np.ndarray) -> np.ndarray:
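A note on the new return values: for a fitted, non-whitened PCA model, `components_` and `mean_` fully determine the projection, so saving them lets the same reduction be replayed later without refitting. A minimal sketch of that identity, shown with sklearn's `IncrementalPCA` (the CPU-side import already used in `dataquality/utils/vaex.py`; cuml's `IncrementalPCA` mirrors this interface):

```python
# Sketch only: demonstrates why components_ and mean_ are enough to replay
# a fitted (non-whitened) PCA projection on new data.
import numpy as np
from sklearn.decomposition import IncrementalPCA

embs = np.random.RandomState(0).rand(500, 64).astype(np.float32)
pca = IncrementalPCA(n_components=16, batch_size=128)
emb_pca = pca.fit_transform(embs)

# transform(X) == (X - mean_) @ components_.T for whiten=False (the default)
replayed = (embs - pca.mean_) @ pca.components_.T
assert np.allclose(replayed, emb_pca, atol=1e-4)
```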
dataquality/utils/emb.py (4 additions, 1 deletion)
@@ -19,6 +19,7 @@
    get_output_df,
)

+object_store = ObjectStore()
DATA_EMB_PATH = "data_emb/data_emb.hdf5"


@@ -70,6 +71,9 @@ def apply_umap_to_embs(run_dir: str, last_epoch: Optional[int]) -> None:
"""
# Get the correct epoch to process for each split
split_epoch = get_largest_epoch_for_splits(run_dir, last_epoch)
# In the case of inference only
if not split_epoch:
return
concat_df = get_concat_emb_df(run_dir, split_epoch)
df_emb = add_umap_pca_to_df(concat_df)
save_processed_emb_dfs(df_emb, split_epoch, run_dir)
@@ -93,7 +97,6 @@ def upload_umap_data_embs(
    inference, we store it in the inference name. So when the split is inference,
    we further filter the dataframe by inference name and upload the df.
    """
-    object_store = ObjectStore()
    df = vaex.open(f"{input_data_dir}/**/data*.arrow")
    try:
        df_emb = create_data_embs_df(df, lazy=False)
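Two behaviors here are worth spelling out. The early return covers inference-only runs, where there is presumably no training epoch to process, so `get_largest_epoch_for_splits` comes back falsy and the UMAP step is skipped rather than failing downstream. And the docstring's per-inference-name filtering can be sketched roughly as below (the column and run names are illustrative assumptions, not the library's guaranteed schema):

```python
# Illustrative sketch of filter-by-inference-name-then-upload; the column
# name "inference_name" and the values are assumptions for this demo.
import numpy as np
import vaex

df = vaex.from_arrays(
    id=np.arange(4),
    inference_name=np.array(["batch-a", "batch-a", "batch-b", "batch-b"]),
)
for name in df["inference_name"].unique():
    df_inf = df[df["inference_name"] == name]
    print(name, len(df_inf))  # each per-name subset would be uploaded separately
```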
dataquality/utils/vaex.py (36 additions, 1 deletion)
@@ -7,6 +7,8 @@
from sklearn.decomposition import IncrementalPCA
from vaex.dataframe import DataFrame

+from dataquality import config
+from dataquality.clients.objectstore import ObjectStore
from dataquality.exceptions import GalileoException
from dataquality.loggers.base_logger import BaseLoggerAttributes
from dataquality.schemas.split import Split
@@ -20,11 +22,20 @@
from dataquality.utils.hdf5_store import HDF5_STORE, concat_hdf5_files
from dataquality.utils.helpers import galileo_verbose_logging

+object_store = ObjectStore()

# To decide between "all-MiniLM-L6-v2" or "all-mpnet-base-v2"
# https://www.sbert.net/docs/pretrained_models.html#model-overview
GALILEO_DATA_EMBS_ENCODER = "GALILEO_DATA_EMBS_ENCODER"
DEFAULT_DATA_EMBS_MODEL = "all-MiniLM-L6-v2"

+COMPONENTS = "components"
+MEAN = "mean"
+PCA_COMPONENTS_NAME = "components.hdf5"
+PCA_COMPONENTS_OBJECT_PATH = f"pca/{PCA_COMPONENTS_NAME}"
+PCA_MEAN_NAME = "mean.hdf5"
+PCA_MEAN_OBJECT_PATH = f"pca/{PCA_MEAN_NAME}"


def _join_in_out_frames(
    in_df: DataFrame, out_df: DataFrame, allow_missing_in_df_ids: bool = False
@@ -83,6 +94,26 @@ def validate_unique_ids(df: DataFrame, epoch_or_inf_name: str) -> None:
        raise GalileoException(exc)


+def _upload_pca_data(components: np.ndarray, mean: np.ndarray) -> None:
+    project_id, run_id = config.current_project_id, config.current_run_id
+    # Save the components as an hdf5 file
+    components_file = f"/tmp/{PCA_COMPONENTS_NAME}"
+    components_object_path = f"{project_id}/{run_id}/{PCA_COMPONENTS_OBJECT_PATH}"
+    vaex.from_dict({COMPONENTS: components}).export(components_file)
+    bucket = config.results_bucket_name
+    object_store.create_object(
+        components_object_path, components_file, progress=False, bucket_name=bucket
+    )
+
+    # Save the mean as an hdf5 file
+    mean_file_path = f"/tmp/{PCA_MEAN_NAME}"
+    mean_object_path = f"{project_id}/{run_id}/{PCA_MEAN_OBJECT_PATH}"
+    vaex.from_dict({MEAN: mean}).export(mean_file_path)
+    object_store.create_object(
+        mean_object_path, mean_file_path, progress=False, bucket_name=bucket
+    )


def get_dup_ids(df: DataFrame) -> List:
"""Gets the list of duplicate IDs in a dataframe, if any"""
df_copy = df.copy()
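The read side of `_upload_pca_data` is not part of this diff. A hedged sketch of what consuming the two exported hdf5 files could look like, once they have been fetched from the bucket by whatever download method the client provides (the function name here is hypothetical):

```python
# Hypothetical consumer: load the exported components/mean and replay the
# PCA projection on new embeddings (e.g. for an inference run).
import numpy as np
import vaex

COMPONENTS, MEAN = "components", "mean"  # same keys as the constants above

def apply_saved_pca(embs: np.ndarray, components_file: str, mean_file: str) -> np.ndarray:
    components = vaex.open(components_file)[COMPONENTS].to_numpy()  # shape (k, d)
    mean = vaex.open(mean_file)[MEAN].to_numpy()  # shape (d,)
    # equivalent to pca.transform(embs) for the saved, non-whitened model
    return (embs - mean) @ components.T
```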
@@ -136,11 +167,15 @@ def add_umap_pca_to_df(df: DataFrame, data_embs: bool = False) -> DataFrame:
    note = "[data embs]" if data_embs else "[embs]"
    print(f"{note} Found cuda ML libraries")
    print(f"{note} Applying dimensionality reduction step 1/2")
-    emb_pca = get_pca_embeddings(dfc["emb"].to_numpy())
+    emb_pca, components, mean = get_pca_embeddings(dfc["emb"].to_numpy())
    print(f"{note} Applying dimensionality reduction step 2/2")
    emb_xy = get_umap_embeddings(emb_pca)
    x, y = ("data_x", "data_y") if data_embs else ("x", "y")
    dfc["emb_pca"] = emb_pca
+    # We save the components and mean of the PCA model to minio
+    # see utils/emb.py::apply_umap_to_embs
+    if not data_embs:
+        _upload_pca_data(components, mean)
    dfc[x] = emb_xy[:, 0]
    dfc[y] = emb_xy[:, 1]
    return dfc
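Taken together, the new flow in `add_umap_pca_to_df` looks roughly like this. A CPU-friendly stand-in sketch (sklearn PCA plus a trivial 2-D slice standing in for cuml's PCA and UMAP, which do the real step 1/2 and 2/2 on the GPU):

```python
# Stand-in for the GPU path: sklearn replaces cuml.IncrementalPCA and a
# column slice replaces UMAP so the sketch runs anywhere.
import numpy as np
import vaex
from sklearn.decomposition import IncrementalPCA

df = vaex.from_arrays(emb=np.random.rand(256, 32).astype(np.float32))
pca = IncrementalPCA(n_components=8)
emb_pca = pca.fit_transform(df["emb"].to_numpy())  # step 1/2 (PCA)
emb_xy = emb_pca[:, :2]                            # step 2/2 would be UMAP
df["emb_pca"] = emb_pca
df["x"] = emb_xy[:, 0]
df["y"] = emb_xy[:, 1]
# with this PR, pca.components_ and pca.mean_ are also uploaded (via
# _upload_pca_data) so a later run can reuse the exact same projection
```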