diff --git a/dataquality/__init__.py b/dataquality/__init__.py index dd64c6363..e882ca2e2 100644 --- a/dataquality/__init__.py +++ b/dataquality/__init__.py @@ -31,7 +31,7 @@ """ -__version__ = "0.9.8" +__version__ = "0.10.0" import sys from typing import Any, List, Optional diff --git a/dataquality/core/log.py b/dataquality/core/log.py index 670fed955..be1fd6e56 100644 --- a/dataquality/core/log.py +++ b/dataquality/core/log.py @@ -123,30 +123,28 @@ def log_data_sample(*, text: str, id: int, **kwargs: Any) -> None: def log_image_dataset( dataset: DataSet, *, - imgs_colname: Optional[str] = None, - imgs_location_colname: Optional[str] = None, - imgs_remote_location: Optional[str] = None, + imgs_local_colname: Optional[str] = None, + imgs_remote: Optional[str] = None, batch_size: int = ITER_CHUNK_SIZE, id: str = "id", - label: Union[str, int] = "label", + label: str = "label", split: Optional[Split] = None, inference_name: Optional[str] = None, - meta: Union[List[str], List[int], None] = None, + meta: Optional[List[str]] = None, parallel: bool = False, **kwargs: Any, ) -> None: """ Log an image dataset of input samples for image classification - :param dataset: The dataset to log. This can be a Pandas/Vaex dataframe or an + :param dataset: The dataset to log. This can be a Pandas/HF dataframe or an ImageFolder (from Torchvision). - :param imgs_colname: If the images are passed as bytes in the dataframe, this - indicates the name of the column containing the images - :param imgs_location_colname: If the images are passed via their path in the - dataframe, this indicates the name of the column containing the paths. - These paths could be remote (skip upload) or local (upload) - :param imgs_remote_location: If the dataset is of type ImageFolder and the - images are stored remotely, pass the folder name here to avoid upload + :param imgs_local_colname: The name of the column containing the local images + (typically paths but could also be bytes for HF dataframes). 
Ignored for + ImageFolder where local paths are directly retrieved from the dataset. + :param imgs_remote: The name of the column containing paths to the remote images (in + the case of a df) or remote directory containing the images (in the case of + ImageFolder). Specifying this argument is required to skip uploading the images. :param batch_size: Number of samples to log in a batch. Default 10,000 :param id: The name of the column containing the ids (in the dataframe) :param label: The name of the column containing the labels (in the dataframe) @@ -165,11 +163,13 @@ def log_image_dataset( "This method is only supported for image tasks. " "Please use dq.log_samples for text tasks." ) + + # TODO: raise warning if imgs_local is None (and we provide no smart features) + data_logger.log_image_dataset( dataset=dataset, - imgs_colname=imgs_colname, - imgs_location_colname=imgs_location_colname, - imgs_remote_location=imgs_remote_location, + imgs_local_colname=imgs_local_colname, + imgs_remote=imgs_remote, batch_size=batch_size, id=id, label=label, diff --git a/dataquality/integrations/fastai.py b/dataquality/integrations/fastai.py index 27b7cd37e..e5729f995 100644 --- a/dataquality/integrations/fastai.py +++ b/dataquality/integrations/fastai.py @@ -15,6 +15,7 @@ from dataquality.analytics import Analytics from dataquality.clients.api import ApiClient from dataquality.exceptions import GalileoException +from dataquality.loggers.data_logger.image_classification import GAL_LOCAL_IMAGES_PATHS from dataquality.loggers.logger_config.base_logger_config import BaseLoggerConfig from dataquality.schemas.split import Split from dataquality.utils.helpers import galileo_disabled @@ -396,7 +397,7 @@ def convert_img_dl_to_df(dl: DataLoader, x_col: str = "image") -> pd.DataFrame: a.log_function("fastai/convert_img_dl_to_df") additional_data = {} if x_col == "image": - additional_data["text"] = dl.items + additional_data[GAL_LOCAL_IMAGES_PATHS] = dl.items x, y = [], [] for x_item, 
y_item in dl.dataset: x.append(x_item) diff --git a/dataquality/loggers/data_logger/base_data_logger.py b/dataquality/loggers/data_logger/base_data_logger.py index c6800b0f5..4e847655b 100644 --- a/dataquality/loggers/data_logger/base_data_logger.py +++ b/dataquality/loggers/data_logger/base_data_logger.py @@ -10,6 +10,7 @@ import numpy as np import pandas as pd import vaex +from datasets.arrow_dataset import Dataset as HFDataset from huggingface_hub.utils import HfHubHTTPError from vaex.dataframe import DataFrame @@ -40,7 +41,7 @@ ) DATA_FOLDERS = ["emb", "prob", "data"] -DataSet = TypeVar("DataSet", bound=Union[Iterable, pd.DataFrame, DataFrame]) +DataSet = TypeVar("DataSet", bound=Union[Iterable, pd.DataFrame, HFDataset, DataFrame]) MetasType = TypeVar("MetasType", bound=Dict[str, List[Union[str, float, int]]]) MetaType = TypeVar("MetaType", bound=Dict[str, Union[str, float, int]]) ITER_CHUNK_SIZE = 100_000 @@ -219,6 +220,26 @@ def export_df(self, df: vaex.DataFrame) -> None: def support_data_embs(self) -> bool: return True + def apply_column_map(self, dataset: DataSet, column_map: Dict[str, str]) -> DataSet: + """Rename columns in the dataset according to the column_map + + This function works for both pandas and HF datasets + """ + # Remove any columns that are mapped to themselves + column_map = {k: v for k, v in column_map.items() if k != v} + + if isinstance(dataset, pd.DataFrame): + dataset = dataset.rename(columns=column_map) + elif self.is_hf_dataset(dataset): + import datasets + + assert isinstance(dataset, datasets.Dataset) + for old_col, new_col in column_map.items(): + if old_col in dataset.column_names: # HF breaks if col doesn't exist + dataset = dataset.rename_column(old_col, new_col) + + return dataset + def upload( self, last_epoch: Optional[int] = None, create_data_embs: bool = False ) -> None: diff --git a/dataquality/loggers/data_logger/image_classification.py b/dataquality/loggers/data_logger/image_classification.py index 
86703f932..785d36f53 100644 --- a/dataquality/loggers/data_logger/image_classification.py +++ b/dataquality/loggers/data_logger/image_classification.py @@ -3,7 +3,7 @@ import glob import os import tempfile -from typing import Any, Dict, List, Optional, Set, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union import numpy as np import pandas as pd @@ -29,6 +29,7 @@ from dataquality.utils.vaex import validate_unique_ids ITER_CHUNK_SIZE_IMAGES = 10000 +GAL_LOCAL_IMAGES_PATHS = "gal_local_images_paths" class ImageClassificationDataLogger(TextClassificationDataLogger): @@ -64,53 +65,57 @@ def log_image_dataset( self, dataset: Union[DataSet, "ImageFolder"], # type: ignore # noqa: F821 *, - imgs_colname: Optional[str] = None, - imgs_location_colname: Optional[str] = None, - imgs_remote_location: Optional[str] = None, + imgs_local_colname: Optional[str] = None, + imgs_remote: Optional[str] = None, batch_size: int = ITER_CHUNK_SIZE_IMAGES, id: str = "id", - label: Union[str, int] = "label", + label: str = "label", split: Optional[Split] = None, inference_name: Optional[str] = None, - meta: Union[List[str], List[int], None] = None, + meta: Optional[List[str]] = None, column_map: Optional[Dict[str, str]] = None, parallel: bool = False, ) -> Any: - """ - For docstring see top level method located in core/log.py - """ + """For main docstring see top level method located in core/log.py.""" if type(dataset).__name__ == "ImageFolder": + # For ImageFolder we ignore imgs_local_colname since not dataframe was + # passed in and we infer it from the ImageFolder dataset = self._prepare_df_from_ImageFolder( - dataset, imgs_remote_location, split + dataset=dataset, imgs_remote_location=imgs_remote, split=split ) - imgs_location_colname = "text" + # In _prepare_df_from_ImageFolder, we set a column GAL_LOCAL_IMAGES_PATHS + # that maps to the local image path + imgs_local_colname = GAL_LOCAL_IMAGES_PATHS + else: + if imgs_local_colname is None and imgs_remote is None: + 
raise GalileoException( + "Must provide imgs_local_colname or imgs_remote when using a df" + ) - if imgs_colname is None and imgs_location_colname is None: - raise GalileoException( - "Must provide one of imgs_colname or imgs_location_colname." - ) + # Get the column mapping and rename imgs_local and imgs_remote if required column_map = column_map or {id: "id"} + if imgs_local_colname is not None: + column_map[imgs_local_colname] = GAL_LOCAL_IMAGES_PATHS + # Rename the col with remote path to "text" (it would be renamed to "text" later + # anyways since IC inherits the logging methods from TC which uses "text") + if imgs_remote is not None: + column_map[imgs_remote] = "text" + + dataset = self.apply_column_map(dataset, column_map) + # If no remote paths are found, upload to the local images to the objectstore if isinstance(dataset, pd.DataFrame): - dataset = dataset.rename(columns=column_map) - if self._dataset_requires_upload_prep( - dataset=dataset, imgs_location_colname=imgs_location_colname - ): - dataset = self._prepare_content( - dataset=dataset, - imgs_location_colname=imgs_location_colname, - parallel=parallel, - ) + dataset, has_local_paths = self._prepare_content(dataset, parallel) elif self.is_hf_dataset(dataset): - dataset = self._prepare_hf( - dataset, - imgs_colname=imgs_colname, - imgs_location_colname=imgs_location_colname, - id_=id, - ) + dataset, has_local_paths = self._prepare_hf(dataset, id) else: raise GalileoException( f"Dataset must be one of pandas or HF, but got {type(dataset)}" ) + + meta = meta or [] + if has_local_paths and GAL_LOCAL_IMAGES_PATHS not in meta: + meta.append(GAL_LOCAL_IMAGES_PATHS) + self.log_dataset( dataset=dataset, batch_size=batch_size, @@ -122,22 +127,34 @@ def log_image_dataset( meta=meta, ) - def _dataset_requires_upload_prep( - self, dataset: pd.DataFrame, imgs_location_colname: Optional[str] = None - ) -> bool: - if imgs_location_colname is None: - raise GalileoException( - "Must provide imgs_location_colname in 
order to upload content." - ) - return os.path.isfile(dataset[imgs_location_colname].iloc[0]) + def _has_remote_images(self, dataset: DataSet) -> bool: + """Check if the dataset contains a column containing remote images""" + if isinstance(dataset, pd.DataFrame): + columns = dataset.columns + elif self.is_hf_dataset(dataset): + columns = dataset.column_names # type: ignore # noqa: F821 + + return "text" in columns def _prepare_content( self, dataset: pd.DataFrame, - imgs_location_colname: Optional[str], parallel: bool = False, - ) -> pd.DataFrame: - file_list = dataset[imgs_location_colname].tolist() + ) -> Tuple[pd.DataFrame, bool]: + """ + Uploads local images to ObjectStore and adds remote paths to the df in a column + called "text". + + NOTE: If the dataset already contains remote paths, this function does nothing + """ + has_local_paths = GAL_LOCAL_IMAGES_PATHS in dataset.columns + + # No need to upload data if we already have access to remote images + if self._has_remote_images(dataset): + return dataset, has_local_paths + + # If it doesn't have remote images, it necessarily has local images + file_list = dataset[GAL_LOCAL_IMAGES_PATHS].tolist() project_id = config.current_project_id with tempfile.TemporaryDirectory() as temp_dir: @@ -155,14 +172,18 @@ def _prepare_content( ) df = vaex.open(f"{temp_dir}/*.arrow") df = df.to_pandas_df() + # df has columns "file_path", "object_path" we merge with original dataset - # on imgs_location_colname and rename "object_path" to "text" - dataset = dataset.merge(df, left_on=imgs_location_colname, right_on="file_path") - dataset["text"] = dataset["object_path"] + # on GAL_LOCAL_IMAGES_PATHS and rename "object_path" to imgs_remote_colname + dataset = dataset.merge( + df, left_on=GAL_LOCAL_IMAGES_PATHS, right_on="file_path" + ) + dataset.rename(columns={"object_path": "text"}, inplace=True) + dataset.drop(columns=["file_path"], inplace=True) for f in glob.glob(f"{temp_dir}/*.arrow"): os.remove(f) - return dataset + return 
dataset, has_local_paths def convert_large_string(self, df: DataFrame) -> DataFrame: """We override to avoid doing the computation to check if the text is over 2GB @@ -177,12 +198,16 @@ def convert_large_string(self, df: DataFrame) -> DataFrame: def _prepare_hf( self, dataset: DataSet, - imgs_colname: Optional[str], - imgs_location_colname: Optional[str], id_: str, # returns HF dataset, hard to express in mypy without # importing the datasets package - ) -> Any: + ) -> Tuple[DataSet, bool]: + """ + If remote paths already exist in the df, do nothing. + + If not, upload the images to the objectstore and add their paths in the df in + the column imgs_remote_colname := GAL_REMOTE_IMAGES_PATHS + """ import datasets assert isinstance(dataset, datasets.Dataset) @@ -191,24 +216,37 @@ def _prepare_hf( if id_ not in dataset.column_names: dataset = dataset.add_column(name=id_, column=list(range(len(dataset)))) - if imgs_colname is not None: - # HF datasets Image feature - from dataquality.utils.hf_images import process_hf_image_feature_for_logging + # Check if the data in the local_col are string (paths) and not bytes + has_local_paths = (GAL_LOCAL_IMAGES_PATHS in dataset.column_names) and ( + dataset.features[GAL_LOCAL_IMAGES_PATHS].dtype == "string" + ) + + # No need to upload data if we already have access to remote images + if self._has_remote_images(dataset): + return dataset, has_local_paths - prepared = process_hf_image_feature_for_logging(dataset, imgs_colname) - elif imgs_location_colname is not None: - # file paths + if dataset.features[GAL_LOCAL_IMAGES_PATHS].dtype == "string": + # Case where the column contains paths to the images from dataquality.utils.hf_images import process_hf_image_paths_for_logging prepared = process_hf_image_paths_for_logging( - dataset, imgs_location_colname + dataset, GAL_LOCAL_IMAGES_PATHS + ) + elif isinstance(dataset.features[GAL_LOCAL_IMAGES_PATHS], datasets.Image): + # Case where the column contains Image feature + # We will not 
have local paths in this case + from dataquality.utils.hf_images import process_hf_image_feature_for_logging + + prepared = process_hf_image_feature_for_logging( + dataset, GAL_LOCAL_IMAGES_PATHS ) else: raise GalileoException( - "Must provide one of imgs_colname or imgs_location_colname." + f"The argument imgs_local={GAL_LOCAL_IMAGES_PATHS} doesn't point" + "to a column containing local paths or images. Pass a valid column name" ) - return prepared + return prepared, has_local_paths def _prepare_df_from_ImageFolder( self, @@ -220,10 +258,15 @@ def _prepare_df_from_ImageFolder( Create a dataframe containing the ids, labels and paths of the images coming from an ImageFolder dataset. """ + # Extract the local paths from the ImageFolder dataset and add them to the df if split == Split.inference: - df = pd.DataFrame(columns=["text"], data=[img[0] for img in dataset.imgs]) + df = pd.DataFrame( + columns=[GAL_LOCAL_IMAGES_PATHS], data=[img[0] for img in dataset.imgs] + ) else: - df = pd.DataFrame(columns=["text", "label"], data=dataset.imgs) + df = pd.DataFrame( + columns=[GAL_LOCAL_IMAGES_PATHS, "label"], data=dataset.imgs + ) label_idx_to_label = { label_idx: label for label, label_idx in dataset.class_to_idx.items() } @@ -231,9 +274,11 @@ def _prepare_df_from_ImageFolder( df = df.reset_index().rename(columns={"index": "id"}) - # Replace the paths with the remote one, if a remote location is specified + # Also add remote paths, if a remote location is specified if imgs_remote_location is not None: - df["text"] = df["text"].str.replace(dataset.root, imgs_remote_location) + df["text"] = df[GAL_LOCAL_IMAGES_PATHS].str.replace( + dataset.root, imgs_remote_location + ) return df @@ -307,6 +352,9 @@ def process_in_out_frames( else: emb_df = out_frame[["id", "emb"]] remove_cols = emb_df.get_column_names() + prob_df.get_column_names() + # We also don't want to upload the local image paths to ObjectStore + # for processing + remove_cols.append(GAL_LOCAL_IMAGES_PATHS) # The data 
df needs pred, which is in the prob_df, so we join just on that # col diff --git a/dataquality/loggers/data_logger/text_classification.py b/dataquality/loggers/data_logger/text_classification.py index c867a4af7..e70f7eea0 100644 --- a/dataquality/loggers/data_logger/text_classification.py +++ b/dataquality/loggers/data_logger/text_classification.py @@ -239,11 +239,25 @@ def log_dataset( self._log_df(chunk_df, meta) elif self.is_hf_dataset(dataset): self._log_hf_dataset( - dataset, batch_size, text, id, meta, label, split, inference_name + dataset, + batch_size, + text, + id, + meta, + label, + split, + inference_name, ) elif isinstance(dataset, Iterable): self._log_iterator( - dataset, batch_size, text, id, meta, label, split, inference_name + dataset, + batch_size, + text, + id, + meta, + label, + split, + inference_name, ) else: raise GalileoException( diff --git a/dataquality/utils/hf_images.py b/dataquality/utils/hf_images.py index fd651bdb4..83024ef93 100644 --- a/dataquality/utils/hf_images.py +++ b/dataquality/utils/hf_images.py @@ -1,6 +1,5 @@ import datasets -from dataquality.exceptions import GalileoException from dataquality.utils.cv import _bytes_to_img, _write_image_bytes_to_objectstore @@ -25,17 +24,11 @@ def _hf_map_image_feature(example: dict, imgs_colname: str) -> dict: def process_hf_image_feature_for_logging( dataset: datasets.Dataset, imgs_colname: str ) -> datasets.Dataset: - if not isinstance(dataset.features[imgs_colname], datasets.Image): - raise GalileoException( - f"Got imgs_colname={repr(imgs_colname)}, but that " - "dataset feature does not contain images. If your dataset has " - "image paths, pass imgs_location_colname instead." 
- ) - dataset = dataset.cast_column(imgs_colname, datasets.Image(decode=False)) dataset = dataset.map( - _hf_map_image_feature, fn_kwargs=dict(imgs_colname=imgs_colname) + _hf_map_image_feature, + fn_kwargs=dict(imgs_colname=imgs_colname), ) return dataset @@ -51,13 +44,6 @@ def _hf_map_image_file_path(example: dict, imgs_location_colname: str) -> dict: def process_hf_image_paths_for_logging( dataset: datasets.Dataset, imgs_location_colname: str ) -> datasets.Dataset: - if dataset.features[imgs_location_colname].dtype != "string": - raise GalileoException( - f"Got imgs_location_colname={repr(imgs_location_colname)}, but that " - "dataset feature does not contain strings. If your dataset uses " - "datasets.Image features, pass imgs_colname instead." - ) - dataset = dataset.map( _hf_map_image_file_path, fn_kwargs=dict(imgs_location_colname=imgs_location_colname), diff --git a/tests/integrations/fastai/test_cv_fai.py b/tests/integrations/fastai/test_cv_fai.py index 6b5822511..e1f08a726 100644 --- a/tests/integrations/fastai/test_cv_fai.py +++ b/tests/integrations/fastai/test_cv_fai.py @@ -87,15 +87,15 @@ def test_fast_ai_integration_e2e( dq.set_labels_for_run(["nocat", "cat"]) for data, split in zip(dls, ["training", "validation"]): df = convert_img_dl_to_df(data) - df["text"] = "s3://..." - dq.log_image_dataset(df, split=split, imgs_location_colname="text") + df["remote_images_paths"] = "s3://..." 
+ dq.log_image_dataset(df, split=split, imgs_remote="remote_images_paths") ThreadPoolManager.wait_for_threads() learn = vision_learner(dls, "resnet10t", metrics=error_rate) dqc = FastAiDQCallback(finish=False) learn.add_cb(dqc) learn.fine_tune(2, freeze_epochs=0) - dq.log_image_dataset(df, imgs_location_colname="text", split="test") + dq.log_image_dataset(df, imgs_remote="remote_images_paths", split="test") dl_test = learn.dls.test_dl(pd.Series(image_files[:-3])) dqc.prepare_split("test") learn.get_preds(dl=dl_test) diff --git a/tests/loggers/test_image_classification.py b/tests/loggers/test_image_classification.py index 47fc17bbf..dfca2b6cc 100644 --- a/tests/loggers/test_image_classification.py +++ b/tests/loggers/test_image_classification.py @@ -258,7 +258,7 @@ def _test_hf_image_dataset(name, test_session_vars: TestSessionVariables) -> Non dq.log_image_dataset( dataset=dataset_info["dataset"], label="label", - imgs_colname=dataset_info["imgs_colname"], + imgs_local_colname=dataset_info["imgs_colname"], split="training", ) @@ -332,7 +332,7 @@ def save_and_record_path(example, index): dq.log_image_dataset( dataset=dataset_with_paths, label="label", - imgs_location_colname="path", + imgs_local_colname="path", split="training", ) @@ -357,11 +357,11 @@ def test_prepare_df_from_ImageFolder() -> None: df = image_logger._prepare_df_from_ImageFolder(dataset=train_dataset) # Assert that the dataframe is how we'd expect it to be by looking at the folder - assert set(df.columns) == {"id", "text", "label"} + assert set(df.columns) == {"id", "label", "gal_local_images_paths"} assert len(df) == 4 assert set(df.label.unique()) == {"labelA", "labelB"} assert set(df.id.unique()) == set(range(4)) - assert df.loc[0, "text"].endswith(".png") + assert df.loc[0, "gal_local_images_paths"].endswith(".png") def test_prepare_df_from_ImageFolder_with_remote_imgs() -> None: @@ -381,7 +381,12 @@ def test_prepare_df_from_ImageFolder_with_remote_imgs() -> None: ) # Assert that the dataframe 
is how we'd expect it to be by looking at the folder - assert set(df.columns) == {"id", "text", "label"} + assert set(df.columns) == { + "id", + "text", + "label", + "gal_local_images_paths", + } assert len(df) == 4 assert set(df.label.unique()) == {"labelA", "labelB"} assert set(df.id.unique()) == set(range(4)) @@ -406,7 +411,7 @@ def test_prepare_df_from_ImageFolder_inference() -> None: # Assert that the dataframe is how we'd expect it to be by looking at the folder # with no labels - assert set(df.columns) == {"id", "text"} + assert set(df.columns) == {"id", "gal_local_images_paths"} assert len(df) == 4 assert set(df.id.unique()) == set(range(4)) - assert df.loc[0, "text"].endswith(".png") + assert df.loc[0, "gal_local_images_paths"].endswith(".png") diff --git a/tests/loggers/test_log_image_dataset.py b/tests/loggers/test_log_image_dataset.py new file mode 100644 index 000000000..69bd0a75c --- /dev/null +++ b/tests/loggers/test_log_image_dataset.py @@ -0,0 +1,337 @@ +import os.path +from tempfile import TemporaryDirectory +from typing import Callable, Generator +from unittest.mock import MagicMock, patch + +import vaex +from datasets import load_dataset +from torchvision.datasets import ImageFolder + +import dataquality as dq +from dataquality.loggers.data_logger.image_classification import GAL_LOCAL_IMAGES_PATHS +from dataquality.utils.thread_pool import ThreadPoolManager +from tests.assets.constants import TEST_IMAGES_FOLDER_ROOT +from tests.conftest import TestSessionVariables +from tests.test_utils.mock_data import MockDatasetCV + +mnist_dataset = load_dataset("mnist", split="train").select(range(20)) + +TESTING_DATASETS = { + "mnist": dict( + dataset=mnist_dataset, + labels=mnist_dataset.features["label"].names, + imgs_colname="image", + ) +} + + +@patch.object(dq.core.finish, "upload_dq_log_file") +@patch.object(dq.clients.api.ApiClient, "make_request") +@patch.object(dq.clients.objectstore.ObjectStore, "create_object") +def test_with_pd_local_only( + 
mock_create_object: MagicMock, + mock_make_request: MagicMock, + mock_upload_log_file: MagicMock, + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with a pd when only local data is provided + """ + set_test_config(task_type="image_classification") + + cvdata = MockDatasetCV() + imgs_local_colname = "image_path" + + dq.set_labels_for_run(cvdata.labels) + dq.log_image_dataset( + dataset=cvdata.dataframe, + imgs_local_colname=imgs_local_colname, + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the df contains the remote images under "text" (minio rel path) + minio_path = df["text"].tolist()[0] + assert minio_path.count("/") == 1 and minio_path.split(".")[-1] == "png" + # assert that the saved df also contains the local images in the specified column + assert ( + df[GAL_LOCAL_IMAGES_PATHS].tolist()[0] + == cvdata.dataframe.loc[0, imgs_local_colname] + ) + assert os.path.isfile(df[GAL_LOCAL_IMAGES_PATHS].tolist()[0]) + + +def test_with_pd_remote_only( + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with a pd when only remote data is provided + """ + set_test_config(task_type="image_classification") + + cvdata = MockDatasetCV() + imgs_remote_colname = "remote_image_path" + + dq.set_labels_for_run(cvdata.labels) + dq.log_image_dataset( + dataset=cvdata.dataframe, + imgs_remote=imgs_remote_colname, + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the saved df contains the remote images in the specified column + assert df["text"].tolist()[0] == cvdata.dataframe.loc[0, imgs_remote_colname] + + +def 
test_with_pd_local_and_remote( + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with a pd when local and remote data are provided + """ + set_test_config(task_type="image_classification") + + cvdata = MockDatasetCV() + imgs_local_colname = "image_path" + imgs_remote_colname = "remote_image_path" + + dq.set_labels_for_run(cvdata.labels) + dq.log_image_dataset( + dataset=cvdata.dataframe, + imgs_local_colname=imgs_local_colname, + imgs_remote=imgs_remote_colname, + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the saved df contains the local images in the specified column + assert ( + df[GAL_LOCAL_IMAGES_PATHS].tolist()[0] + == cvdata.dataframe.loc[0, imgs_local_colname] + ) + assert os.path.isfile(df[GAL_LOCAL_IMAGES_PATHS].tolist()[0]) + # assert that the saved df contains the remote images under "text" + assert df["text"].tolist()[0] == cvdata.dataframe.loc[0, imgs_remote_colname] + + +@patch.object(dq.clients.objectstore.ObjectStore, "create_object") +def test_with_hf_local_only_images( + mock_create_object: MagicMock, + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with HF when only local data is provided (as images). + We don't have local paths in this case. 
+ """ + set_test_config(task_type="image_classification") + imgs_local_colname = "image" + + with TemporaryDirectory(): + dataset_info = TESTING_DATASETS["mnist"] + dataset = dataset_info["dataset"] + + dq.set_labels_for_run(dataset_info["labels"]) + dq.log_image_dataset( + dataset=dataset, + label="label", + imgs_local_colname=imgs_local_colname, + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the df contains the remote images under "text" (minio rel path) + minio_path = df["text"].tolist()[0] + assert minio_path.count("/") == 1 and minio_path.split(".")[-1] == "png" + + +@patch.object(dq.clients.objectstore.ObjectStore, "create_object") +def test_with_hf_local_only_paths( + mock_create_object: MagicMock, + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with HF when only local data is provided (as paths) + """ + set_test_config(task_type="image_classification") + imgs_local_colname = "image_path" + + with TemporaryDirectory() as imgs_dir: + + def save_and_record_path(example, index): + path = os.path.join(imgs_dir, f"{index:04d}.jpg") + example["image"].save(path) + return {imgs_local_colname: path, **example} + + dataset_info = TESTING_DATASETS["mnist"] + dataset = dataset_info["dataset"] + dataset_with_paths = dataset.map(save_and_record_path, with_indices=True) + + dq.set_labels_for_run(dataset_info["labels"]) + dq.log_image_dataset( + dataset=dataset_with_paths, + imgs_local_colname=imgs_local_colname, + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the df contains the remote images under "text" (minio rel path) + minio_path = df["text"].tolist()[0] + assert minio_path.count("/") == 1 and 
minio_path.split(".")[-1] == "jpg" + # assert that the saved df contains the local images in the specified column + assert ( + df[GAL_LOCAL_IMAGES_PATHS].tolist()[0] + == dataset_with_paths[imgs_local_colname][0] + ) + assert os.path.isfile(df[GAL_LOCAL_IMAGES_PATHS].tolist()[0]) + + +def test_with_hf_local_and_remote( + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with HF when local data is provided (as paths) + remote paths + """ + set_test_config(task_type="image_classification") + imgs_local_colname = "image_path" + imgs_remote_colname = "remote_image_path" + fake_remote_base = "s3://some_bucket/some_dir" + + with TemporaryDirectory() as imgs_dir: + + def save_and_record_path(example, index): + image_name = f"{index:04d}.jpg" + path = os.path.join(imgs_dir, image_name) + remote_path = f"{fake_remote_base}/{image_name}" + example["image"].save(path) + return { + imgs_local_colname: path, + imgs_remote_colname: remote_path, + **example, + } + + dataset_info = TESTING_DATASETS["mnist"] + dataset = dataset_info["dataset"] + dataset_with_paths = dataset.map(save_and_record_path, with_indices=True) + + dq.set_labels_for_run(dataset_info["labels"]) + dq.log_image_dataset( + dataset=dataset_with_paths, + imgs_local_colname=imgs_local_colname, + imgs_remote=imgs_remote_colname, + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the saved df contains the remote images under "text" + assert df["text"].tolist()[0] == dataset_with_paths[0].get(imgs_remote_colname) + # assert that the saved df contains the local images in the specified column + assert df[GAL_LOCAL_IMAGES_PATHS].tolist()[0] == dataset_with_paths[0].get( + imgs_local_colname + ) + assert os.path.isfile(df[GAL_LOCAL_IMAGES_PATHS].tolist()[0]) + + +@patch.object(dq.core.finish, 
"upload_dq_log_file") +@patch.object(dq.clients.api.ApiClient, "make_request") +@patch.object(dq.clients.objectstore.ObjectStore, "create_object") +def test_with_ImageFolder_local_only( + mock_create_object: MagicMock, + mock_make_request: MagicMock, + mock_upload_log_file: MagicMock, + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with Imagefolder when only local data is provided + """ + set_test_config(task_type="image_classification") + + train_dataset = ImageFolder(root=TEST_IMAGES_FOLDER_ROOT) + + dq.set_labels_for_run(train_dataset.classes) + dq.log_image_dataset( + dataset=train_dataset, + imgs_local_colname="randommmm", + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the df contains the remote images under "text" (minio rel path) + minio_path = df["text"].tolist()[0] + assert minio_path.count("/") == 1 and minio_path.split(".")[-1] == "png" + # assert that the saved df also contains the local images in the specified column + assert df["gal_local_images_paths"].tolist()[0] == train_dataset.samples[0][0] + assert os.path.isfile(df["gal_local_images_paths"].tolist()[0]) + + +@patch.object(dq.core.finish, "upload_dq_log_file") +@patch.object(dq.clients.api.ApiClient, "make_request") +@patch.object(dq.clients.objectstore.ObjectStore, "create_object") +def test_with_ImageFolder_local_and_remote( + mock_create_object: MagicMock, + mock_make_request: MagicMock, + mock_upload_log_file: MagicMock, + set_test_config: Callable, + cleanup_after_use: Generator, + test_session_vars: TestSessionVariables, +) -> None: + """ + Test logging the data with Imagefolder when local and remote data are provided + """ + set_test_config(task_type="image_classification") + + train_dataset = ImageFolder(root=TEST_IMAGES_FOLDER_ROOT) + imgs_remote = 
"s3://some_bucket/some_dir" + + dq.set_labels_for_run(train_dataset.classes) + dq.log_image_dataset( + dataset=train_dataset, + imgs_remote=imgs_remote, + split="training", + ) + + # read logged data + ThreadPoolManager.wait_for_threads() + df = vaex.open(f"{test_session_vars.LOCATION}/input_data/training/*.arrow") + + # assert that the saved df contains the local images in the specified column + assert df["gal_local_images_paths"].tolist()[0] == train_dataset.samples[0][0] + assert os.path.isfile(df["gal_local_images_paths"].tolist()[0]) + # assert that the saved df contains the remote images under "text" + assert df["text"].tolist()[0].startswith(imgs_remote) + assert df["text"].tolist()[0].endswith(train_dataset.samples[0][0].split("/")[-1]) diff --git a/tests/test_utils/mock_data.py b/tests/test_utils/mock_data.py index 900d2d3ad..94afb5fda 100644 --- a/tests/test_utils/mock_data.py +++ b/tests/test_utils/mock_data.py @@ -1,6 +1,11 @@ +import os +from pathlib import Path + import pandas as pd from torch.utils.data import Dataset +from tests.assets.constants import TEST_IMAGES_FOLDER_ROOT + mock_dict = { "text": [ "i didnt feel humiliated", @@ -85,3 +90,43 @@ def __getitem__(self, index: int) -> tuple: def __len__(self) -> int: return len(self.dataframe) + + +# Define CV dataset class +class MockDatasetCV(Dataset): + def __init__(self, root_folder: str = TEST_IMAGES_FOLDER_ROOT) -> None: + self.local_root_folder = root_folder + self.remote_root_folder = ( + "s3://galileo-s3os-images/ImageClassification/dq_tests/" + ) + self.labels = os.listdir(self.local_root_folder) + + self.dataframe = pd.DataFrame(columns=["image_name", "label"]) + for label in self.labels: + df_label = pd.DataFrame( + data=os.listdir(Path(self.local_root_folder) / label), + columns=["image_name"], + ) + df_label["label"] = label + self.dataframe = pd.concat([self.dataframe, df_label]) + self.dataframe = self.dataframe.reset_index(drop=True) + self.dataframe = 
self.dataframe.reset_index().rename(columns={"index": "id"}) + self.dataframe["image_path"] = self.dataframe[["image_name", "label"]].apply( + lambda r: str(Path(self.local_root_folder) / r["label"] / r["image_name"]), + axis=1, + ) + self.dataframe["remote_image_path"] = self.dataframe[ + ["image_name", "label"] + ].apply( + lambda r: str( + f'{self.remote_root_folder}{r["label"]}/{r["image_name"]}' + ), + axis=1, + ) + + def __getitem__(self, index: int) -> tuple: + label, image_path = self.dataframe[["label", "image_path"]].iloc[index] + return label, image_path + + def __len__(self) -> int: + return len(self.dataframe)