From addcb0be66e3b423814a6911d725075eaaedef92 Mon Sep 17 00:00:00 2001
From: Bogdan Gheorghe <112427971+bogdan-galileo@users.noreply.github.com>
Date: Mon, 4 Dec 2023 17:30:18 -0500
Subject: [PATCH] fix(chore): add warnings in s2s for bad inputs (#812)

---
 dataquality/core/finish.py               |  5 +-
 dataquality/integrations/seq2seq/core.py | 42 ++++++-----
 dataquality/schemas/seq2seq.py           | 13 ++--
 dataquality/utils/vaex.py                | 31 +++++++-
 tests/integrations/seq2seq/test_core.py  | 56 ++++++++++----
 tests/loggers/test_seq2seq.py            | 95 +++++++++++++++++++++++-
 6 files changed, 197 insertions(+), 45 deletions(-)

diff --git a/dataquality/core/finish.py b/dataquality/core/finish.py
index 037d4d3db..9696f7eb4 100644
--- a/dataquality/core/finish.py
+++ b/dataquality/core/finish.py
@@ -40,8 +40,9 @@ def finish(
         via dq.metrics.get_data_embeddings(). Default True if a GPU is
         available, else default False.
     :param data_embs_col: Optional text col on which to compute data embeddings.
-        If not set, we default to 'text' which corresponds to the input text
-        Can also be set to `target` or `generated_output`
+        If not set, we default to 'text', which corresponds to the input text.
+        Can also be set to `target`, `generated_output`, or any other column that is
+        logged as metadata.
     """
     a.log_function("dq/finish")
     if create_data_embs is None:
diff --git a/dataquality/integrations/seq2seq/core.py b/dataquality/integrations/seq2seq/core.py
index 01f41ef3c..05ffaa736 100644
--- a/dataquality/integrations/seq2seq/core.py
+++ b/dataquality/integrations/seq2seq/core.py
@@ -121,8 +121,8 @@ def watch(
     """Seq2seq only. Log model generations for your run
 
     Iterates over a given dataset and logs the generations for each sample.
-    `model` must be an instance of transformers PreTrainedModel and have a `generate`
-    method.
+    To generate outputs, the model passed in must be an instance of transformers
+    PreTrainedModel and must have a `generate` method.
 
     Unlike other watch functions, in this one we are just registering the model
     and generation config and not attaching any hooks to the model. We call it 'watch'
@@ -135,25 +135,24 @@
         max_target_tokens=max_target_tokens,
     )
 
-    if model:
-        assert isinstance(
-            model, PreTrainedModel
-        ), "model must be an instance of transformers PreTrainedModel"
-        assert (
-            model.can_generate()
-        ), "model must contain a `generate` method for seq2seq"
-
-    if model_type == Seq2SeqModelType.decoder_only and not response_template:
-        raise GalileoException(
-            "You must specify a `response_template` when using Decoder-Only models."
-            " This is necessary to internally isolate the target response tokens."
-        )
-
-    if model_type == Seq2SeqModelType.encoder_decoder and response_template:
+    if model_type == Seq2SeqModelType.decoder_only:
+        if response_template is None:
+            raise GalileoException(
+                "You must specify a `response_template` when using Decoder-Only models."
+                " This is necessary to internally isolate the target response tokens."
+            )
+        elif not isinstance(response_template, list) or not all(
+            isinstance(token, int) for token in response_template
+        ):
+            raise GalileoException(
+                "The response template must already be tokenized and be a list of ints."
+            )
+    elif model_type == Seq2SeqModelType.encoder_decoder and response_template:
         warn(
             "The argument response_template is only used when working with "
             "DecoderOnly models. This value will be ignored."
         )
+    seq2seq_logger_config.response_template = response_template
 
     seq2seq_logger_config.model = model
     seq2seq_logger_config.generation_config = generation_config
@@ -170,4 +169,13 @@
             generation_splits_set.add(Split[split])
 
+    # A model of the correct type is required if we need to generate
+    if generation_splits:
+        assert isinstance(
+            model, PreTrainedModel
+        ), "model must be an instance of transformers PreTrainedModel"
+        assert (
+            model.can_generate()
+        ), "model must contain a `generate` method for seq2seq"
+
     seq2seq_logger_config.generation_splits = generation_splits_set
 
diff --git a/dataquality/schemas/seq2seq.py b/dataquality/schemas/seq2seq.py
index fa21cd95c..de7958bd7 100644
--- a/dataquality/schemas/seq2seq.py
+++ b/dataquality/schemas/seq2seq.py
@@ -72,22 +72,21 @@ class AlignedTokenData:
     token_label_offsets: List[List[Tuple[int, int]]]
     token_label_positions: List[List[Set[int]]]
 
-    def append(self, aligned_token_data: "AlignedTokenData") -> None:
+    def append(self, data: "AlignedTokenData") -> None:
         """Append offsets and positions for a *single* sample
 
-        Assumes that `sample_aligned_token_data` holds alignment info for
+        Assumes that `data` holds alignment info for
         a *single* data sample. As such, when appending to
         `token_label_offsets` and `token_label_positions` we remove the
         "batch" dimensions respectively.
         e.g.
-            >> sample_aligned_token_data.token_label_offsets[0]
+            >> data.token_label_offsets[0]
         """
         assert (
-            len(aligned_token_data.token_label_offsets) == 1
-            and len(aligned_token_data.token_label_positions) == 1
+            len(data.token_label_offsets) == 1 and len(data.token_label_positions) == 1
         )
-        self.token_label_offsets.append(aligned_token_data.token_label_offsets[0])
-        self.token_label_positions.append(aligned_token_data.token_label_positions[0])
+        self.token_label_offsets.append(data.token_label_offsets[0])
+        self.token_label_positions.append(data.token_label_positions[0])
 
 
 @dataclass
diff --git a/dataquality/utils/vaex.py b/dataquality/utils/vaex.py
index 97a485476..43a27e984 100644
--- a/dataquality/utils/vaex.py
+++ b/dataquality/utils/vaex.py
@@ -1,4 +1,5 @@
 import os
+import warnings
 from typing import Dict, List, Union
 
 import numpy as np
@@ -9,7 +10,7 @@
 
 from dataquality import config
 from dataquality.clients.objectstore import ObjectStore
-from dataquality.exceptions import GalileoException
+from dataquality.exceptions import GalileoException, GalileoWarning
 from dataquality.loggers.base_logger import BaseLoggerAttributes
 from dataquality.schemas.split import Split
 from dataquality.utils.cuda import (
@@ -185,12 +186,34 @@ def add_umap_pca_to_df(df: DataFrame, data_embs: bool = False) -> DataFrame:
 def create_data_embs_df(df: DataFrame, text_col: str, lazy: bool = True) -> DataFrame:
     """Runs sentence transformer on raw text to get off the shelf data embeddings
 
-    text_col can be passed in as "input" or "target" for Seq2Seq tasks
+    text_col can be passed in as "input" or "target" for Seq2Seq tasks.
+    text_col can also be any given metadata text column.
 
     :param df: The dataframe to get data embeddings for. Must have the text_col
     :param text_col: The column to use for calculating data embeddings
     :param lazy: If true, we lazily apply the model to encode the text
     """
+    # If the specified column doesn't exist, fall back on default columns
+    col_to_encode = text_col
+    if col_to_encode not in df.get_column_names():
+        col_to_encode = ""
+        for col in ["text", "input"]:
+            if col in df.get_column_names():
+                warnings.warn(
+                    f"Column `{text_col}` not found, `{col}` will be used for data "
+                    "embeddings",
+                    GalileoWarning,
+                )
+                col_to_encode = col
+                break
+
+    # If user specified col and default cols don't exist, raise error
+    if not col_to_encode:
+        raise GalileoException(
+            f"The specified column data_embs_col={text_col} in dq.finish does not "
+            "exist in the dataframe. Re-run dq.finish with an existing column name."
+        )
+
     # This import takes up to 25 seconds, so we don't want to eagerly import it
     import transformers
     from sentence_transformers import SentenceTransformer
@@ -208,7 +231,7 @@ def apply_sentence_transformer(text: pa.array) -> np.ndarray:
         )
 
     if lazy:
-        df_copy["emb"] = df_copy[text_col].apply_sentence_transformer()
+        df_copy["emb"] = df_copy[col_to_encode].apply_sentence_transformer()
         df_copy = df_copy[["id", "emb"]]
     else:
         import torch
@@ -216,7 +239,7 @@ def apply_sentence_transformer(text: pa.array) -> np.ndarray:
         # Downcasts to float16 where possible, speeds up processing by 10 it/sec
         with torch.autocast("cuda"):
             df_copy["emb"] = data_model.encode(
-                df_copy[text_col].tolist(), show_progress_bar=True
+                df_copy[col_to_encode].tolist(), show_progress_bar=True
             ).astype(np.float32)
 
     return df_copy
diff --git a/tests/integrations/seq2seq/test_core.py b/tests/integrations/seq2seq/test_core.py
index 454e78f11..ed9d35df3 100644
--- a/tests/integrations/seq2seq/test_core.py
+++ b/tests/integrations/seq2seq/test_core.py
@@ -5,6 +5,7 @@
 from tokenizers.models import BPE
 from transformers import PreTrainedTokenizerFast
 
+from dataquality.exceptions import GalileoException
 from dataquality.integrations.seq2seq.core import set_tokenizer, watch
 from dataquality.schemas.seq2seq import Seq2SeqModelType
 from tests.conftest import TestSessionVariables, tokenizer_T5, tokenizer_T5_not_auto
@@ -56,10 +57,9 @@ def test_set_tokenizer_other(
     )
     with pytest.raises(ValueError) as e:
         set_tokenizer(tokenizer_T5_not_auto, "encoder_decoder")
-        assert str(e.value) == (
-            "The tokenizer must be an instance of PreTrainedTokenizerFast "
-            "or Tokenizer"
-        )
+    assert str(e.value) == (
+        "The tokenizer must be an instance of PreTrainedTokenizerFast " "or Tokenizer"
+    )
 
 
 def test_watch_invalid_task_type(
     set_test_config: Callable,
     cleanup_after_use: Callable,
     test_session_vars: TestSessionVariables,
 ) -> None:
     """Test that we can't watch without a tokenizer"""
     set_test_config(task_type="text_classification")
     with pytest.raises(AssertionError) as e:
         watch(tokenizer_T5, "encoder_decoder")
-        assert str(e.value) == (
-            "This method is only supported for seq2seq tasks. "
-            "Make sure to set the task type with dq.init()"
-        )
+    assert str(e.value) == (
+        "This method is only supported for seq2seq tasks. "
+        "Make sure to set the task type with dq.init()"
+    )
 
 
 def test_watch_invalid_model_type(
     set_test_config: Callable,
     cleanup_after_use: Callable,
     test_session_vars: TestSessionVariables,
 ) -> None:
-    """Test that we can't watch without a tokenizer"""
+    """Test that we can't watch without an appropriate model_type"""
     set_test_config(task_type="seq2seq")
     with pytest.raises(ValueError) as e:
         watch(tokenizer_T5, "invalid_model_type")
-        assert str(e.value) == (
-            f"model_type must be one of {Seq2SeqModelType.members()}, "
-            "got invalid_model_type"
-        )
+    assert str(e.value) == (
+        f"model_type must be one of {Seq2SeqModelType.members()}, "
+        "got invalid_model_type"
+    )
+
+
+def test_watch_response_template_required(
+    set_test_config: Callable,
+    cleanup_after_use: Callable,
+    test_session_vars: TestSessionVariables,
+) -> None:
+    """Test that we need response template to be passed for decoder-only"""
+    set_test_config(task_type="seq2seq")
+    with pytest.raises(GalileoException) as e:
+        watch(tokenizer_T5, "decoder_only")
+    assert str(e.value) == (
+        "You must specify a `response_template` when using Decoder-Only models."
+        " This is necessary to internally isolate the target response tokens."
+    )
+
+
+def test_watch_response_template_not_tokenized(
+    set_test_config: Callable,
+    cleanup_after_use: Callable,
+    test_session_vars: TestSessionVariables,
+) -> None:
+    """Test that we need response template to be tokenized (for decoder-only)"""
+    set_test_config(task_type="seq2seq")
+    with pytest.raises(GalileoException) as e:
+        watch(tokenizer_T5, "decoder_only", response_template="###\nRESPONSE:\n")
+    assert (
+        str(e.value)
+        == "The response template must already be tokenized and be a list of ints."
+    )
diff --git a/tests/loggers/test_seq2seq.py b/tests/loggers/test_seq2seq.py
index 304121615..bee6a83b8 100644
--- a/tests/loggers/test_seq2seq.py
+++ b/tests/loggers/test_seq2seq.py
@@ -12,6 +12,7 @@
 from transformers import GenerationConfig, T5ForConditionalGeneration
 
 import dataquality as dq
+from dataquality.exceptions import GalileoWarning
 from dataquality.integrations.seq2seq.core import set_tokenizer, watch
 from dataquality.loggers.data_logger.base_data_logger import DataSet
 from dataquality.loggers.data_logger.seq2seq.seq2seq_base import Seq2SeqDataLogger
@@ -28,7 +29,7 @@
     generate_sample_output,
 )
 from dataquality.utils.thread_pool import ThreadPoolManager
-from dataquality.utils.vaex import GALILEO_DATA_EMBS_ENCODER
+from dataquality.utils.vaex import GALILEO_DATA_EMBS_ENCODER, create_data_embs_df
 from tests.conftest import (
     LOCAL_MODEL_PATH,
     TestSessionVariables,
@@ -340,7 +341,6 @@ def test_tokenize_input_provide_maxlength(
     set_test_config: Callable,
     cleanup_after_use: Generator,
 ) -> None:
-    # TODO comment!
     """
     Test that as we generate output and the user provided the max_input_tokens
     argument, the input is tokenized correctly to the length set by max_input_tokens.
@@ -558,3 +558,94 @@ def test_create_and_upload_data_embs(
     assert data_embs.emb.values.ndim == 2
     # mini BERT model spits out 32 dims
     assert data_embs.emb.values.shape == (10, 32)
+
+
+def test_create_data_embs_df_bad_text_col_name(
+    cleanup_after_use: Callable,
+    set_test_config: Callable,
+    test_session_vars: TestSessionVariables,
+) -> None:
+    """Test data embeddings flow works with bad col name
+
+    When the wrong column name is passed, test that the flow still works
+    using the default value of "input".
+    """
+    set_test_config(task_type=TaskType.seq2seq)
+    # Use the local mini bert model
+    os.environ[GALILEO_DATA_EMBS_ENCODER] = LOCAL_MODEL_PATH
+
+    watch(tokenizer_T5, "encoder_decoder", generation_splits=[])
+
+    input_1, input_2 = "dog dog dog done - tricked you", "bird"
+    target_1, target_2 = "cat cat cat cat cat done", "cat"
+    ds = Dataset.from_dict(
+        {
+            "id": [0, 1],
+            "inpinp": [input_1, input_2],
+            "tongtong": [target_1, target_2],
+        }
+    )
+    data_logger = Seq2SeqDataLogger()
+    data_logger.log_dataset(ds, text="inpinp", label="tongtong", split="training")
+
+    df = vaex.open(f"{data_logger.input_data_path}/**/data*.arrow")
+
+    # Check that no exception is thrown and that data embs are created
+    assert "text" not in df.get_column_names()
+    with pytest.warns(GalileoWarning) as gw:
+        data_embs = create_data_embs_df(df, text_col="text")
+
+    warning_msg = "Column `text` not found, `input` will be used for data embeddings"
+    assert str(list(gw)[0].message) == warning_msg
+
+    assert len(data_embs) == 2
+    assert data_embs.get_column_names() == ["id", "emb"]
+    assert isinstance(data_embs.emb.values, np.ndarray)
+    assert data_embs.emb.values.ndim == 2
+    # mini BERT model spits out 32 dims
+    assert data_embs.emb.values.shape == (2, 32)
+
+
+def test_create_data_embs_df_custom_column(
+    cleanup_after_use: Callable,
+    set_test_config: Callable,
+    test_session_vars: TestSessionVariables,
+) -> None:
+    """Test that data embeddings work with a column specified by the user"""
+    set_test_config(task_type=TaskType.seq2seq)
+    # Use the local mini bert model
+    os.environ[GALILEO_DATA_EMBS_ENCODER] = LOCAL_MODEL_PATH
+
+    watch(tokenizer_T5, "encoder_decoder", generation_splits=[])
+
+    input_1, input_2 = "dog dog dog done - tricked you", "bird"
+    target_1, target_2 = "cat cat cat cat cat done", "cat"
+    other = "just some text"
+    ds = Dataset.from_dict(
+        {
+            "id": [0, 1],
+            "inpinp": [input_1, input_2],
+            "tartar": [target_1, target_2],
+            "other": [other, other],
+        }
+    )
+    data_logger = Seq2SeqDataLogger()
+    data_logger.log_dataset(
+        ds, text="inpinp", label="tartar", split="training", meta=["other"]
+    )
+
+    df = vaex.open(f"{data_logger.input_data_path}/**/data*.arrow")
+
+    # Check that no exception is thrown and that data embs are created
+    assert "text" not in df.get_column_names()
+    with pytest.warns(None):
+        data_embs = create_data_embs_df(df, text_col="other")
+
+    assert len(data_embs) == 2
+    assert data_embs.get_column_names() == ["id", "emb"]
+    np_emb = data_embs.emb.values
+    assert isinstance(np_emb, np.ndarray)
+    # mini BERT model spits out 32 dims
+    assert np_emb.shape == (2, 32)
+    # Check that the two embeddings are the same, i.e., we used the column "other"
+    assert np.isclose(np_emb[0], np_emb[1]).all()