From ef649929bb47c2febb9c8313dc2a0b0bda7ae6f2 Mon Sep 17 00:00:00 2001
From: sararb
Date: Tue, 29 Nov 2022 17:58:17 -0500
Subject: [PATCH 1/8] add the inference fix to ReplaceMaskedEmbeddings

---
 merlin/models/tf/transforms/sequence.py   | 19 +++++++++++++++++++
 tests/unit/tf/transformers/test_block.py  |  4 ++--
 tests/unit/tf/transforms/test_sequence.py |  8 ++++----
 3 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/merlin/models/tf/transforms/sequence.py b/merlin/models/tf/transforms/sequence.py
index 8350a948d7..d1cf1e3422 100644
--- a/merlin/models/tf/transforms/sequence.py
+++ b/merlin/models/tf/transforms/sequence.py
@@ -651,6 +651,8 @@ def call(
         self,
         inputs: Union[tf.Tensor, tf.RaggedTensor],
         targets: Optional[Union[tf.Tensor, tf.RaggedTensor, TabularData]] = None,
+        training: bool = False,
+        testing: bool = False,
     ) -> Union[tf.Tensor, tf.RaggedTensor]:
         """If the sequence of input embeddings or the corresponding
         sequential targets is masked (with `tensor._keras_mask` defined),
@@ -664,11 +666,28 @@
         targets : Union[tf.Tensor, tf.RaggedTensor, TabularData], optional
             The target values, from which the mask can be extracted
             if `targets._keras_mask` is defined.
+        training : bool, optional
+            Flag that indicates whether in training mode, by default False
+        testing : bool, optional
+            Flag that indicates whether in evaluation mode, by default False
 
         Returns
         -------
         Union[tf.Tensor, tf.RaggedTensor]
             If training, returns a tensor with the masked inputs replaced by the dummy embedding
         """
+        if not testing and not training:
+            # Infers the mask from the inputs or targets
+            mask = self._infer_mask_from_inputs_or_targets(inputs, targets)
+            if mask is None:
+                target_positions = tf.tile(
+                    tf.expand_dims(self.masked_embedding, 0), [tf.shape(inputs)[0], 1]
+                )
+                outputs = tf.concat([inputs, tf.expand_dims(target_positions, 1)], axis=1)
+                return outputs
+            else:
+                # TODO: mask should be defined if padded dense tensors are provided
+                pass
+
         outputs = inputs
         # Infers the mask from the inputs or targets
         mask = self._infer_mask_from_inputs_or_targets(inputs, targets)
diff --git a/tests/unit/tf/transformers/test_block.py b/tests/unit/tf/transformers/test_block.py
index 910615c20a..d3421e82b4 100644
--- a/tests/unit/tf/transformers/test_block.py
+++ b/tests/unit/tf/transformers/test_block.py
@@ -307,7 +307,7 @@ def test_transformer_with_masked_language_modeling(sequence_testing_data: Datase
     predictions = model.predict(loader, batch_size=8, steps=1)
     # TODO: Decide what should be the output of predictions for MLM (currently it predicts for all
     # positions of the sequence, but typically you want a single next-item prediction)
-    assert predictions.shape == (8, 4, 51997)
+    assert predictions.shape == (8, 5, 51997)
 
 
 @pytest.mark.parametrize("run_eagerly", [True, False])
@@ -339,7 +339,7 @@ def test_transformer_with_masked_language_modeling_check_eval_masked(
     inputs = itertools.islice(iter(loader), 1)
     outputs = model.predict(inputs, pre=seq_mask_random)
-    assert list(outputs.shape) == [8, 4, 51997]
+    assert list(outputs.shape) == [8, 5, 51997]
 
     testing_utils.model_test(
         model,
diff --git a/tests/unit/tf/transforms/test_sequence.py b/tests/unit/tf/transforms/test_sequence.py
index 89afa11be4..8b453245f8 100644
--- a/tests/unit/tf/transforms/test_sequence.py
+++ b/tests/unit/tf/transforms/test_sequence.py
@@ -274,7 +274,7 @@ def test_replace_masked_input_embeddings_no_target():
     targets = None
 
     masked_embeddings = mm.ReplaceMaskedEmbeddings()
-    output = masked_embeddings(item_id_emb_seq, targets=targets)
+    output = masked_embeddings(item_id_emb_seq, targets=targets, training=True)
 
     # Checks that no input embedding was replaced, as there was no masking defined
     tf.Assert(tf.logical_not(tf.reduce_all(output == item_id_emb_seq)), [])
 
@@ -284,7 +284,7 @@ def test_not_replace_unmasked_sequence_embeddings():
     targets = tf.random.uniform((8, 10), dtype=tf.float32)
 
     masked_embeddings = mm.ReplaceMaskedEmbeddings()
-    output = masked_embeddings(item_id_emb_seq, targets=targets)
+    output = masked_embeddings(item_id_emb_seq, targets=targets, training=True)
     # Checks that no input embedding was replaced, as there was no masking defined
     tf.Assert(tf.reduce_all(output == item_id_emb_seq), [])
 
@@ -297,7 +297,7 @@ def test_replace_masked_input_2d_embeddings_incompatible_2d_mask():
 
     masked_embeddings = mm.ReplaceMaskedEmbeddings()
     with pytest.raises(Exception) as exc_info:
-        _ = masked_embeddings(item_id_emb_seq)
+        _ = masked_embeddings(item_id_emb_seq, training=True)
     assert "The inputs and mask need to be compatible" in str(exc_info.value)
 
@@ -309,7 +309,7 @@ def test_replace_masked_input_2d_embeddings_incompatible_ragged_2d_mask():
 
     masked_embeddings = mm.ReplaceMaskedEmbeddings()
     with pytest.raises(Exception) as exc_info:
-        _ = masked_embeddings(item_id_emb_seq)
+        _ = masked_embeddings(item_id_emb_seq, training=True)
     assert "The inputs and mask need to be compatible" in str(exc_info.value)
 

From 6a5e5820e135c20afea50853526ace50d385f28e Mon Sep 17 00:00:00 2001
From: sararb
Date: Thu, 1 Dec 2022 10:58:22 -0500
Subject: [PATCH 2/8] first solution for inference support

---
 merlin/models/tf/transformers/block.py      |  2 +-
 merlin/models/tf/transformers/transforms.py | 56 +++++++++++++++++++++
 merlin/models/tf/transforms/sequence.py     | 54 +++++++++++++++-----
 tests/unit/tf/transformers/test_block.py    | 16 ++++--
 4 files changed, 109 insertions(+), 19 deletions(-)

diff --git a/merlin/models/tf/transformers/block.py b/merlin/models/tf/transformers/block.py
index e2259ede9c..ed55f9aac0 100644
--- a/merlin/models/tf/transformers/block.py
+++ b/merlin/models/tf/transformers/block.py
@@ -91,7 +91,7 @@ def __init__(
             self.transformer = get_tf_main_layer(transformer)
         else:
             self.transformer = transformer
-
+        self.transformer.supports_masking = True
         if "transformer" in inspect.signature(transformer_pre.__init__).parameters:
             transformer_pre = transformer_pre(transformer=self.transformer)
         self.transformer_pre = transformer_pre
diff --git a/merlin/models/tf/transformers/transforms.py b/merlin/models/tf/transformers/transforms.py
index d0739dec9f..19e20df76b 100644
--- a/merlin/models/tf/transformers/transforms.py
+++ b/merlin/models/tf/transformers/transforms.py
@@ -37,10 +37,62 @@ class LastHiddenState(Layer):
         The output class returned by the HuggingFace transformer layer
     """
 
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.supports_masking = True
+
     def call(self, inputs: TFBaseModelOutputWithPoolingAndCrossAttentions):
         return inputs.last_hidden_state
 
 
+@Block.registry.register("inference_hidden_state")
+@tf.keras.utils.register_keras_serializable(package="merlin.models")
+class InferenceHiddenState(Layer):
+    """A post-processing layer to select the hidden state
+    of the next-item position, during inference.
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.supports_masking = True
+
+    def call(
+        self,
+        inputs: tf.Tensor,
+        training: bool = False,
+        testing: bool = False,
+        mask: tf.Tensor = None,
+    ):
+        """Select the hidden state of the target position, during inference.
+        During training or testing, the inputs are returned
+        without any processing.
+
+        Parameters
+        ----------
+        inputs: tf.Tensor
+            The 3-D output tensor returned by the transformer block
+        training : bool, optional
+            Flag that indicates whether in training mode, by default False
+        testing : bool, optional
+            Flag that indicates whether in evaluation mode, by default False
+        mask: tf.Tensor
+            Boolean tensor that indicates the target ("next-item") position at
+            inference.
+
+        Returns
+        -------
+        tf.Tensor
+            If inference, returns a 2-D tensor with the hidden states of
+            the target position
+        """
+        if not training and not testing:
+            if mask is not None:
+                if isinstance(mask, tf.RaggedTensor):
+                    mask = mask.to_tensor()
+                inputs = tf.reshape(tf.boolean_mask(inputs, mask), (-1, inputs.shape[-1]))
+        return inputs
+
+
 @Block.registry.register("pooler_output")
 @tf.keras.utils.register_keras_serializable(package="merlin.models")
 class PoolerOutput(Layer):
@@ -113,6 +165,10 @@ def call(self, inputs: TFBaseModelOutputWithPoolingAndCrossAttentions):
 class PrepareTransformerInputs(tf.keras.layers.Layer):
     """Prepare the dictionary of inputs expected by the transformer layer"""
 
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.supports_masking = True
+
     def call(self, inputs: tf.Tensor) -> Dict[str, tf.Tensor]:
         if isinstance(inputs, tf.RaggedTensor):
             # convert to a dense tensor as HF transformers do not support ragged tensors
diff --git a/merlin/models/tf/transforms/sequence.py b/merlin/models/tf/transforms/sequence.py
index d1cf1e3422..146eb590e9 100644
--- a/merlin/models/tf/transforms/sequence.py
+++ b/merlin/models/tf/transforms/sequence.py
@@ -621,6 +621,41 @@ def from_config(cls, config):
         return cls(schema, target, **config)
 
 
+@tf.keras.utils.register_keras_serializable(package="merlin.models")
+class SequenceMaskLastInference(Block):
+    def call(self, inputs, training=False, testing=False):
+        self.inference_mode = not training and not testing
+        if self.inference_mode:
+            # Extending sequences in one position by copying the last embedding
+            repeat = inputs[:, -1:, :]
+            # repeat = tf.expand_dims(repeat, 1)
+            inputs = tf.concat([inputs, repeat], axis=1)
+        return inputs
+
+    def compute_mask(self, inputs, mask=None):
+        """Selects (masks) the next position after the
+        last valid (non-padded) position of the sequential targets
+        to be predicted.
+        This method is called by Keras after call()
+        and returns the mask that is going to be assigned
+        to the input tensors, being accessible
+        by tensor._keras_mask
+        """
+
+        targets_mask = None
+        if self.inference_mode:
+            if isinstance(inputs, tf.RaggedTensor):
+                row_lengths = inputs.row_lengths(1) + 1
+                max_seq_length = tf.cast(tf.reduce_max(row_lengths), tf.int32)
+
+                padding_mask = tf.sequence_mask(row_lengths)
+                targets_mask = tf.ragged.boolean_mask(
+                    tf.cast(tf.one_hot(row_lengths - 1, max_seq_length), tf.bool), padding_mask
+                )
+
+        return targets_mask
+
+
 @tf.keras.utils.register_keras_serializable(package="merlin.models")
 class ReplaceMaskedEmbeddings(Block):
     """Takes a 3D input tensor (batch size x seq. length x embedding dim) and replaces
@@ -637,6 +672,10 @@ class ReplaceMaskedEmbeddings(Block):
         Masked Language Modeling (BERT-like)
     """
 
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.supports_masking = True
+
     def build(self, input_shape):
         self.hidden_size = input_shape[-1]
         if self.hidden_size is None:
@@ -675,19 +714,8 @@ def call(
         Union[tf.Tensor, tf.RaggedTensor]
             If training, returns a tensor with the masked inputs replaced by the dummy embedding
         """
-        if not testing and not training:
-            # Infers the mask from the inputs or targets
-            mask = self._infer_mask_from_inputs_or_targets(inputs, targets)
-            if mask is None:
-                target_positions = tf.tile(
-                    tf.expand_dims(self.masked_embedding, 0), [tf.shape(inputs)[0], 1]
-                )
-                outputs = tf.concat([inputs, tf.expand_dims(target_positions, 1)], axis=1)
-                return outputs
-            else:
-                # TODO: mask should be defined if padded dense tensors are provided
-                pass
-
+        if not training and not testing:
+            return inputs
         outputs = inputs
         # Infers the mask from the inputs or targets
         mask = self._infer_mask_from_inputs_or_targets(inputs, targets)
diff --git a/tests/unit/tf/transformers/test_block.py b/tests/unit/tf/transformers/test_block.py
index d3421e82b4..40c322283e 100644
--- a/tests/unit/tf/transformers/test_block.py
+++ b/tests/unit/tf/transformers/test_block.py
@@ -15,6 +15,7 @@
     RobertaBlock,
     XLNetBlock,
 )
+from merlin.models.tf.transforms.sequence import SequenceMaskLastInference
 from merlin.models.tf.utils import testing_utils
 from merlin.schema import Tags
 
@@ -281,7 +282,13 @@ def test_transformer_with_masked_language_modeling(sequence_testing_data: Datase
             seq_schema.select_by_tag(Tags.CATEGORICAL), sequence_combiner=None
         ),
     ),
-    BertBlock(d_model=48, n_head=8, n_layer=2, pre=mm.ReplaceMaskedEmbeddings()),
+    BertBlock(
+        d_model=48,
+        n_head=8,
+        n_layer=2,
+        pre=mm.SequentialBlock([SequenceMaskLastInference(), mm.ReplaceMaskedEmbeddings()]),
+        post="inference_hidden_state",
+    ),
     mm.CategoricalOutput(
         seq_schema.select_by_name(target),
         default_loss="categorical_crossentropy",
@@ -304,10 +311,9 @@
     metrics = model.evaluate(loader, batch_size=8, steps=1, return_dict=True, pre=seq_mask_last)
     assert len(metrics) > 0
 
+    # Get predictions for next-item position
     predictions = model.predict(loader, batch_size=8, steps=1)
-    # TODO: Decide what should be the output of predictions for MLM (currently it predicts for all
-    # positions of the sequence, but typically you want a single next-item prediction)
-    assert predictions.shape == (8, 5, 51997)
+    assert predictions.shape == (8, 51997)
 
 
 @pytest.mark.parametrize("run_eagerly", [True, False])
@@ -339,7 +345,7 @@ def test_transformer_with_masked_language_modeling_check_eval_masked(
     inputs = itertools.islice(iter(loader), 1)
     outputs = model.predict(inputs, pre=seq_mask_random)
-    assert list(outputs.shape) == [8, 5, 51997]
+    assert list(outputs.shape) == [8, 4, 51997]
 
     testing_utils.model_test(
         model,

From a135fe2afb8713f7f254bed90841106c2406d0b7 Mon Sep 17 00:00:00 2001
From: sararb
Date: Thu, 1 Dec 2022 19:25:36 -0500
Subject: [PATCH 3/8] updates based on PR comments

---
 merlin/models/tf/__init__.py                |  1 +
 merlin/models/tf/transformers/transforms.py | 19 ++++++-----
 merlin/models/tf/transforms/sequence.py     | 36 ++++++++++-----------
 tests/unit/tf/transformers/test_block.py    |  3 +-
 4 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/merlin/models/tf/__init__.py b/merlin/models/tf/__init__.py
index 51f968f4ae..71033c6edf 100644
--- a/merlin/models/tf/__init__.py
+++ b/merlin/models/tf/__init__.py
@@ -159,6 +159,7 @@ from merlin.models.tf.transforms.sequence import (
     ReplaceMaskedEmbeddings,
     SequenceMaskLast,
+    SequenceMaskLastInference,
     SequenceMaskRandom,
     SequencePredictLast,
     SequencePredictNext,
diff --git a/merlin/models/tf/transformers/transforms.py b/merlin/models/tf/transformers/transforms.py
index 19e20df76b..7f4643cc53 100644
--- a/merlin/models/tf/transformers/transforms.py
+++ b/merlin/models/tf/transformers/transforms.py
@@ -61,7 +61,6 @@ def call(
         inputs: tf.Tensor,
         training: bool = False,
         testing: bool = False,
-        mask: tf.Tensor = None,
     ):
         """Select the hidden state of the target position, during inference.
@@ -75,9 +74,6 @@
             Flag that indicates whether in training mode, by default False
         testing : bool, optional
             Flag that indicates whether in evaluation mode, by default False
-        mask: tf.Tensor
-            Boolean tensor that indicates the target ("next-item") position at
-            inference.
 
         Returns
         -------
@@ -86,10 +82,10 @@
         """
         if not training and not testing:
-            if mask is not None:
-                if isinstance(mask, tf.RaggedTensor):
-                    mask = mask.to_tensor()
-                inputs = tf.reshape(tf.boolean_mask(inputs, mask), (-1, inputs.shape[-1]))
+            if getattr(inputs, "_keras_mask", None) is not None:
+                inputs = tf.reshape(
+                    tf.boolean_mask(inputs, inputs._keras_mask), (-1, inputs.shape[-1])
+                )
         return inputs
 
 
@@ -170,9 +166,16 @@ def __init__(self, **kwargs):
         self.supports_masking = True
 
     def call(self, inputs: tf.Tensor) -> Dict[str, tf.Tensor]:
+        mask = None
+        if getattr(inputs, "_keras_mask", None) is not None and isinstance(
+            inputs._keras_mask, tf.RaggedTensor
+        ):
+            mask = inputs._keras_mask.to_tensor()
         if isinstance(inputs, tf.RaggedTensor):
             # convert to a dense tensor as HF transformers do not support ragged tensors
             inputs = inputs.to_tensor()
+        if mask is not None:
+            inputs._keras_mask = mask
         return {"inputs_embeds": inputs}
 
 
diff --git a/merlin/models/tf/transforms/sequence.py b/merlin/models/tf/transforms/sequence.py
index 146eb590e9..aad51da98d 100644
--- a/merlin/models/tf/transforms/sequence.py
+++ b/merlin/models/tf/transforms/sequence.py
@@ -659,17 +659,23 @@
 @tf.keras.utils.register_keras_serializable(package="merlin.models")
 class ReplaceMaskedEmbeddings(Block):
     """Takes a 3D input tensor (batch size x seq. length x embedding dim) and replaces
-    by a dummy trainable single embedding at the positions to be masked.
-    This block looks for the Keras mask (`._keras_mask`) in the following order:
-      1. Checks if the input tensor has a mask
-      2. Checks if there is a single target and if it has a mask
-      3. If there are multiple targets (dict) returns the mask of the target
-         that matches the first 2 dims of the input
-    This is useful to be used when PredictMasked() transformation is used in
-    the Loader, which randomly selects some targets to be predicted and uses
-    Keras Masking to cascade the `_keras_mask`. By replacing input embeddings
-    at masked positions we avoid target leakage when training models with
-    Masked Language Modeling (BERT-like)
+    by a dummy trainable single embedding at the positions to be masked.
+    This block looks for the Keras mask (`._keras_mask`) in the following order:
+      1. Checks if the input tensor has a mask
+      2. Checks if there is a single target and if it has a mask
+      3. If there are multiple targets (dict) returns the mask of the target
+         that matches the first 2 dims of the input
+    This is useful to be used when PredictMasked() transformation is used in
+    the Loader, which randomly selects some targets to be predicted and uses
+    Keras Masking to cascade the `_keras_mask`. By replacing input embeddings
+    at masked positions we avoid target leakage when training models with
+    Masked Language Modeling (BERT-like)
+
+    **Note:** To support inference, the input sequence and its corresponding mask should be
+    extended by one position at the end to account for the next-item (`target`) position.
+    To do this, you should set `SequenceMaskLastInference` as a pre-layer of
+    `ReplaceMaskedEmbeddings()` using a `SequentialBlock`:
+    ```mm.SequentialBlock([mm.SequenceMaskLastInference(), mm.ReplaceMaskedEmbeddings()])```
     """
 
     def __init__(self, **kwargs):
@@ -690,8 +696,6 @@ def call(
         self,
         inputs: Union[tf.Tensor, tf.RaggedTensor],
         targets: Optional[Union[tf.Tensor, tf.RaggedTensor, TabularData]] = None,
-        training: bool = False,
-        testing: bool = False,
     ) -> Union[tf.Tensor, tf.RaggedTensor]:
         """If the sequence of input embeddings or the corresponding
         sequential targets is masked (with `tensor._keras_mask` defined),
@@ -705,17 +709,11 @@
         targets : Union[tf.Tensor, tf.RaggedTensor, TabularData], optional
             The target values, from which the mask can be extracted
             if `targets._keras_mask` is defined.
-        training : bool, optional
-            Flag that indicates whether in training mode, by default False
-        testing : bool, optional
-            Flag that indicates whether in evaluation mode, by default False
 
         Returns
         -------
         Union[tf.Tensor, tf.RaggedTensor]
             If training, returns a tensor with the masked inputs replaced by the dummy embedding
         """
-        if not training and not testing:
-            return inputs
         outputs = inputs
         # Infers the mask from the inputs or targets
         mask = self._infer_mask_from_inputs_or_targets(inputs, targets)
diff --git a/tests/unit/tf/transformers/test_block.py b/tests/unit/tf/transformers/test_block.py
index 40c322283e..f95a6b88fa 100644
--- a/tests/unit/tf/transformers/test_block.py
+++ b/tests/unit/tf/transformers/test_block.py
@@ -15,7 +15,6 @@
     RobertaBlock,
     XLNetBlock,
 )
-from merlin.models.tf.transforms.sequence import SequenceMaskLastInference
 from merlin.models.tf.utils import testing_utils
 from merlin.schema import Tags
 
@@ -286,7 +285,7 @@
         d_model=48,
         n_head=8,
         n_layer=2,
-        pre=mm.SequentialBlock([SequenceMaskLastInference(), mm.ReplaceMaskedEmbeddings()]),
+        pre=mm.SequentialBlock([mm.SequenceMaskLastInference(), mm.ReplaceMaskedEmbeddings()]),
         post="inference_hidden_state",
     ),
     mm.CategoricalOutput(

From 2cd8fd2107e6cd88c59a7327f6a58fb124621b78 Mon Sep 17 00:00:00 2001
From: Sara Rabhi
Date: Tue, 13 Dec 2022 11:14:40 -0500
Subject: [PATCH 4/8] Apply suggestions from code review

Co-authored-by: Gabriel Moreira

---
 merlin/models/tf/transformers/transforms.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/merlin/models/tf/transformers/transforms.py b/merlin/models/tf/transformers/transforms.py
index 7f4643cc53..51260e1477 100644
--- a/merlin/models/tf/transformers/transforms.py
+++ b/merlin/models/tf/transformers/transforms.py
@@ -47,7 +47,7 @@
 
 @Block.registry.register("inference_hidden_state")
 @tf.keras.utils.register_keras_serializable(package="merlin.models")
-class
InferenceHiddenState(Layer): +class TransformerInferenceHiddenState(Layer): """A post-processing layer to select the hidden state of the next-item position, during inference. """ @@ -62,7 +62,7 @@ def call( training: bool = False, testing: bool = False, ): - """Select the hidden state of the target position, during inference. + """Select the hidden state of the target (last) position, during inference. During training or testing, the inputs are returned without any processing. From 4e50acf31f31422f398e5909f16f5821f17cd4e0 Mon Sep 17 00:00:00 2001 From: edknv <109497216+edknv@users.noreply.github.com> Date: Fri, 9 Dec 2022 01:17:03 -0800 Subject: [PATCH 5/8] Use merlin-dataloader package (#845) * Use merlin-dataloader package * remove torch.dataset in favor of merlin.loader.torch * update dressipi notebook * minor clean up * Completely removes models DataLoader * Installs merlin-dataloader in github actions * Adds back the stop method * dataloader can produce sparse tensors using value counts * remove data files * fix torch tests * add missing target to dlrm test * use loader.peek() * add some comments to help understand horovod tests * make sparse tensors optional * cleanup * fix spelling * fix merge * replace while loop with for loop in horovod test * use loader context mananger * Update according to dataloader changes #80 * restore tox.ini * restore gh workflow * revert generator changes --- .github/workflows/tensorflow.yml | 2 + examples/05-Retrieval-Model.ipynb | 2 +- ...sed-next-item-prediction-for-fashion.ipynb | 4 +- merlin/datasets/synthetic.py | 17 +- merlin/models/loader/backend.py | 657 +----------------- merlin/models/loader/dataframe_iter.py | 45 -- merlin/models/loader/tf_utils.py | 116 ---- merlin/models/loader/utils.py | 69 -- merlin/models/tf/__init__.py | 3 +- merlin/models/tf/inputs/base.py | 2 +- merlin/models/tf/loader.py | 333 ++------- merlin/models/tf/models/base.py | 2 +- merlin/models/tf/transforms/features.py | 3 +- merlin/models/tf/transforms/tensor.py | 23 +- merlin/models/tf/utils/testing_utils.py | 18 +- merlin/models/torch/dataset.py | 174 ----- merlin/models/torch/utils/torch_utils.py | 2 +- requirements/base.txt | 2 +- tests/common/tf/retrieval/retrieval_utils.py | 6 +- tests/unit/datasets/test_synthetic.py | 4 +- tests/unit/tf/blocks/test_interactions.py | 3 +- tests/unit/tf/core/test_encoder.py | 16 +- tests/unit/tf/core/test_index.py | 2 + tests/unit/tf/core/test_prediction.py | 2 +- tests/unit/tf/horovod/test_horovod.py | 7 +- tests/unit/tf/inputs/test_base.py | 3 +- tests/unit/tf/inputs/test_tabular.py | 82 ++- tests/unit/tf/models/test_base.py | 10 +- tests/unit/tf/models/test_retrieval.py | 20 +- .../tf/prediction_tasks/test_multi_task.py | 4 +- tests/unit/tf/test_loader.py | 5 +- tests/unit/tf/transformers/test_block.py | 40 +- tests/unit/tf/transforms/test_features.py | 8 +- .../tf/transforms/test_negative_sampling.py | 4 +- tests/unit/tf/transforms/test_sequence.py | 12 +- tests/unit/torch/test_dataset.py | 124 ---- tox.ini | 2 + 37 files changed, 271 insertions(+), 1557 deletions(-) delete mode 100644 merlin/models/loader/dataframe_iter.py delete mode 100644 merlin/models/loader/tf_utils.py delete mode 100644 merlin/models/loader/utils.py delete mode 100644 merlin/models/torch/dataset.py delete mode 100644 tests/unit/torch/test_dataset.py diff --git a/.github/workflows/tensorflow.yml b/.github/workflows/tensorflow.yml index 78147e7d5c..95beaad6dc 100644 --- a/.github/workflows/tensorflow.yml +++ b/.github/workflows/tensorflow.yml @@ -43,6 +43,7 
@@ jobs: fi pip install "pandas>=1.2.0,<1.4.0dev0" pip install "NVTabular@git+https://github.com/NVIDIA-Merlin/NVTabular.git@$branch" + pip install "merlin-dataloader@git+https://github.com/NVIDIA-Merlin/dataloader.git@$branch" pip install "merlin-core@git+https://github.com/NVIDIA-Merlin/core.git@$branch" - name: Install dependencies run: | @@ -108,6 +109,7 @@ jobs: fi pip install "pandas>=1.2.0,<1.4.0dev0" pip install "NVTabular@git+https://github.com/NVIDIA-Merlin/NVTabular.git@$branch" + pip install "merlin-dataloader@git+https://github.com/NVIDIA-Merlin/dataloader.git@$branch" pip install "merlin-core@git+https://github.com/NVIDIA-Merlin/core.git@$branch" - name: Install dependencies run: | diff --git a/examples/05-Retrieval-Model.ipynb b/examples/05-Retrieval-Model.ipynb index c8f4b7c75d..4c3c1abb03 100644 --- a/examples/05-Retrieval-Model.ipynb +++ b/examples/05-Retrieval-Model.ipynb @@ -1147,7 +1147,7 @@ } ], "source": [ - "eval_loader = mm.Loader(valid, batch_size=1024, transform=mm.ToTarget(schema, \"item_id\"))\n", + "eval_loader = mm.Loader(valid, batch_size=1024).map(mm.ToTarget(schema, \"item_id\"))\n", "\n", "metrics = topk_model.evaluate(eval_loader, return_dict=True)\n", "metrics" diff --git a/examples/usecases/ecommerce-session-based-next-item-prediction-for-fashion.ipynb b/examples/usecases/ecommerce-session-based-next-item-prediction-for-fashion.ipynb index 0dd5d1aed2..b93907c239 100644 --- a/examples/usecases/ecommerce-session-based-next-item-prediction-for-fashion.ipynb +++ b/examples/usecases/ecommerce-session-based-next-item-prediction-for-fashion.ipynb @@ -968,8 +968,8 @@ "metadata": {}, "outputs": [], "source": [ - "loader = mm.Loader(train, batch_size=BATCH_SIZE, transform=mm.ToTarget(train.schema, \"purchase_id_first\", one_hot=True), shuffle = False)\n", - "val_loader = mm.Loader(valid, batch_size=BATCH_SIZE, transform=mm.ToTarget(train.schema, \"purchase_id_first\", one_hot=True), shuffle=False)" + "loader = mm.Loader(train, batch_size=BATCH_SIZE, shuffle=False).map(mm.ToTarget(train.schema, \"purchase_id_first\", one_hot=True))\n", + "val_loader = mm.Loader(valid, batch_size=BATCH_SIZE, shuffle=False).map(mm.ToTarget(train.schema, \"purchase_id_first\", one_hot=True))" ] }, { diff --git a/merlin/datasets/synthetic.py b/merlin/datasets/synthetic.py index fb99dbe9c9..e7e61d2392 100644 --- a/merlin/datasets/synthetic.py +++ b/merlin/datasets/synthetic.py @@ -25,7 +25,7 @@ import merlin.io from merlin.models.utils import schema_utils -from merlin.schema import Schema, Tags +from merlin.schema import ColumnSchema, Schema, Tags from merlin.schema.io.tensorflow_metadata import TensorflowMetadata LOG = logging.getLogger("merlin-models") @@ -116,6 +116,21 @@ def generate_data( else: raise ValueError(f"Unknown input type: {type(input)}") + for col in schema.column_names: + if not schema[col].is_list: + continue + new_properties = schema[col].properties + new_properties["value_count"] = {"min": min_session_length} + if max_session_length: + new_properties["value_count"]["max"] = max_session_length + schema[col] = ColumnSchema( + name=schema[col].name, + tags=schema[col].tags, + properties=new_properties, + dtype=schema[col].dtype, + is_list=True, + ) + df = generate_user_item_interactions( schema, num_rows, min_session_length, max_session_length, device=device ) diff --git a/merlin/models/loader/backend.py b/merlin/models/loader/backend.py index 860fe6a505..a27bca7f85 100644 --- a/merlin/models/loader/backend.py +++ b/merlin/models/loader/backend.py @@ -13,628 +13,39 @@ 
# See the License for the specific language governing permissions and # limitations under the License. # -import copy -import math -import queue -import threading -import warnings -from collections import OrderedDict - -import numpy as np - -try: - import cupy as cp -except ImportError: - cp = np - -from merlin.core.dispatch import ( - HAS_GPU, - annotate, - concat, - generate_local_seed, - is_list_dtype, - make_df, - pull_apart_list, -) -from merlin.io.shuffle import shuffle_df -from merlin.models.loader.dataframe_iter import DataFrameIter -from merlin.schema import Tags - - -def _num_steps(num_samples, step_size): - return math.ceil(num_samples / step_size) - - -class ChunkQueue: - """This class takes partitions (parts) from an NVTabular dataset - and concatenates them into a cudf dataframe "chunk". This chunk - is subsequently transformed into its tensor representation using - the iterator's transform. - - Parameters - ----------- - qsize: int - Max number of elements to hold in the buffer at once - num_parts : int - number of partitions from the iterator, an NVTabular Dataset to concatenate into a "chunk" - shuffle : bool - enable/disable chunk-level shuffling - put_wait: float - amount of timeout to wait for a full queue to open up - before checking for errors and trying again - """ - - def __init__(self, dataloader, qsize, num_parts=1, shuffle=False, put_wait=1e-6, epochs=1): - self.num_parts = num_parts - self.shuffle = shuffle - self.put_wait = put_wait - self.q_out = queue.Queue(qsize) - self._stop_event = threading.Event() - self.itr = dataloader._data_iter(epochs) - self.dataloader = dataloader - - def __len__(self): - return len(self.itr) - - @property - def stopped(self): - return self._stop_event.is_set() - - @property - def empty(self): - return self.q_out.empty() - - def get(self): - return self.q_out.get() - - def put(self, packet): - while True: - if self.stopped: - return True - - try: - self.q_out.put(packet, timeout=self.put_wait) - return False - except queue.Full: - continue - - @annotate("batch", color="darkgreen", domain="nvt_python") - def batch(self, itr): - """ - iterates through gpu_mem_frac size chunks of dataset - and concatenates every `num_parts` of them. - """ - current = [] - while True: - try: - value = next(itr) - except StopIteration: - if len(current) > 0: - yield current - break - - current.append(value) - if len(current) == self.num_parts: - yield current - current = [] - - @annotate("chunk_logic", color="darkgreen", domain="nvt_python") - def chunk_logic(self, itr): - spill = None - for chunks in self.batch(itr): - if self.stopped: - return - - if spill is not None and not spill.empty: - chunks.insert(0, spill) - - chunks = concat(chunks) - chunks.reset_index(drop=True, inplace=True) - chunks, spill = self.get_batch_div_chunk(chunks, self.dataloader.batch_size) - if self.shuffle: - chunks = shuffle_df(chunks) - - if len(chunks) > 0: - chunks = self.dataloader.make_tensors(chunks, self.dataloader._use_nnz) - # put returns True if buffer is stopped before - # packet can be put in queue. 
Keeps us from - # freezing on a put on a full queue - if self.put(chunks): - return - chunks = None - # takes care final batch, which is less than batch size - if not self.dataloader.drop_last and spill is not None and not spill.empty: - spill = self.dataloader.make_tensors(spill, self.dataloader._use_nnz) - self.put(spill) - - @annotate("load_chunks", color="darkgreen", domain="nvt_python") - def load_chunks(self, dev): - try: - itr = iter(self.itr) - if self.dataloader.device != "cpu": - with self.dataloader._get_device_ctx(dev): - self.chunk_logic(itr) - else: - self.chunk_logic(itr) - except Exception as e: # pylint: disable=broad-except - self.put(e) - - # For when an iterator is stopped before iteration is complete. - def stop(self): - self._stop_event.set() - # TODO: should we be clearing? I can imagine a world where - # you want the thread to stop but still want to grab - # data out of the buffer - self.q_out.queue.clear() - - def start(self): - self._stop_event.clear() - - def get_batch_div_chunk(self, chunks, batch_size): - # TODO: is there a way to do this using cupy? - spill_idx = int(chunks.shape[0] / batch_size) * batch_size - spill = make_df(chunks.iloc[spill_idx:]) - chunks = make_df(chunks.iloc[:spill_idx]) - if not chunks.empty: - chunks.reset_index(drop=True, inplace=True) - if not spill.empty: - spill.reset_index(drop=True, inplace=True) - return chunks, spill - - -def _get_dataset_schema(dataset): - return dataset.schema if hasattr(dataset, "schema") else None - - -# TODO: implement as metaclass and assign methods to children -# to avoid having to do Dataset. calls? -class DataLoader: - _use_nnz = False - - def __init__( - self, - dataset, - batch_size, - shuffle, - cat_names=None, - cont_names=None, - label_names=None, - seed_fn=None, - parts_per_chunk=1, - device=None, - global_size=None, - global_rank=None, - drop_last=False, - sparse_names=None, - sparse_max=None, - sparse_as_dense=False, - ): - self.data = dataset - self.schema = _get_dataset_schema(dataset) - # self.data is ddf format - self.indices = cp.arange(self.data.npartitions) - self.drop_last = drop_last - self.device = (device or 0) if HAS_GPU else "cpu" - self.sparse_names = sparse_names or [] - self.sparse_max = sparse_max or {} - self.sparse_as_dense = sparse_as_dense - self.global_size = global_size or 1 - self.global_rank = global_rank or 0 - self._epochs = 1 - - self.cat_names = cat_names or ( - self.schema.select_by_tag(Tags.CATEGORICAL).excluding_by_tag(Tags.TARGET).column_names - if self.schema - else [] - ) - self.cont_names = cont_names or ( - self.schema.select_by_tag(Tags.CONTINUOUS).excluding_by_tag(Tags.TARGET).column_names - if self.schema - else [] - ) - self.label_names = label_names or ( - self.schema.select_by_tag(Tags.TARGET).column_names if self.schema else [] +from merlin.schema import ColumnSchema, Tags + + +def _augment_schema( + schema, + cats=None, + conts=None, + labels=None, + sparse_names=None, + sparse_max=None, + sparse_as_dense=False, +): + labels = [labels] if isinstance(labels, str) else labels + for label in labels or []: + schema[label] = schema[label].with_tags(Tags.TARGET) + for label in cats or []: + schema[label] = schema[label].with_tags(Tags.CATEGORICAL) + for label in conts or []: + schema[label] = schema[label].with_tags(Tags.CONTINUOUS) + + # Set the appropriate properties for the sparse_names/sparse_max/sparse_as_dense + for col in sparse_names or []: + cs = schema[col] + properties = cs.properties + if sparse_max and col in sparse_max: + 
properties["value_count"] = {"max": sparse_max[col]} + schema[col] = ColumnSchema( + name=cs.name, + tags=cs.tags, + dtype=cs.dtype, + is_list=True, + is_ragged=not sparse_as_dense, + properties=properties, ) - if not self.cat_names and not self.cont_names: - raise ValueError( - "Neither Categorical or Continuous columns were found by the dataloader. " - "You must either specify the cat_names, cont_names and " - "label_names properties or supply a schema.pbtxt file in dataset directory." - ) - - self.batch_size = batch_size - self.shuffle = shuffle - self.seed_fn = seed_fn - - self.num_rows_processed = 0 - - self.parts_per_chunk = parts_per_chunk - self.shuffle = shuffle - self.__buff = None - self.__buff_len = None - self._batch_itr = None - self._workers = None - - @property - def _buff(self): - if self.__buff is None: - # we set size of chunk queue to 1 we only want one chunk in queue at a time. - self.__buff = ChunkQueue( - self, 1, num_parts=self.parts_per_chunk, shuffle=self.shuffle, epochs=self._epochs - ) - return self.__buff - - @property - def _buff_len(self): - if self.__buff_len is None: - # run once instead of every time len called - self.__buff_len = len(self._buff) - return self.__buff_len - - def epochs(self, epochs=1): - if epochs == self._epochs: - return self - new_dataloader = copy.copy(self) - new_dataloader._set_epochs(epochs) - return new_dataloader - - def _set_epochs(self, epochs): - self.stop() - self.__buff = None - self.__buff_len = None - self._epochs = epochs - - def __len__(self): - batches = _num_steps(self._buff_len, self.batch_size) - if self.drop_last and self._buff_len % self.batch_size > 0: - batches = batches - 1 - return batches - - @property - def _working(self): - if self._workers is not None: - return any(t.is_alive() for t in self._workers) - return False - - def stop(self): - # TODO: raise warning or even error if condition - # isn't met? 
- if self._workers is not None: - if not self._buff.stopped: - self._buff.stop() - for t in self._workers: - t.join() - # remove joined threads from list - self._workers = None - self._buff.q_out.queue.clear() - self._batch_itr = None - - def _gather_indices_for_dev(self, dev): - # this should be self.indices divided by total processes, global set - if len(self.indices) < self.global_size: - warnings.warn( - f"""You have more processes({self.global_size}) than dataset - partitions({len(self.indices)}), reduce the number of processes.""" - ) - raise IndexError - per_worker = _num_steps(len(self.indices), self.global_size) - # identify process rank out of all processes (not local rank) - start = self.global_rank * per_worker - return self.indices[start : start + per_worker].tolist() - - @annotate("_shuffle_indices", color="darkgreen", domain="nvt_python") - def _shuffle_indices(self): - generate_local_seed(self.global_rank, self.global_size) - if self.seed_fn: - new_seed = self.seed_fn() - cp.random.seed(new_seed) - cp.random.shuffle(self.indices) - generate_local_seed(self.global_rank, self.global_size) - - def __iter__(self): - self.stop() - self.num_rows_processed = 0 - if self._buff.stopped: - self._buff.start() - - # shuffle partition indices to bring disparate - # parts of the dataset "close" to one another - if self.shuffle: - self._shuffle_indices() - - # build and start new threads for loading and - # concatenating data - self._workers = [] - t = threading.Thread(target=self._buff.load_chunks, args=(self.device,)) - t.daemon = True - t.start() - self._workers.append(t) - return self - - def __next__(self): - return self._get_next_batch() - - def _data_iter(self, epochs): - indices = self._gather_indices_for_dev(0) - if hasattr(self.data, "to_iter"): - return self.data.to_iter(indices=indices, epochs=epochs) - return DataFrameIter(self.data, epochs=epochs) - - def _fetch_chunk(self): - chunks = self._buff.get() - if isinstance(chunks, Exception): - self.stop() - raise chunks - self._batch_itr = iter(chunks) - - def _get_next_batch(self): - """ - adding this cheap shim so that we can call this - step without it getting overridden by the - framework-specific parent class's `__next__` method. - TODO: can this be better solved with a metaclass - implementation? 
My gut is that we don't actually - necessarily *want*, in general, to be overriding - __next__ and __iter__ methods - """ - # we've never initialized, do that now - # need this because tf.keras.Model.fit will - # call next() cold - if self._workers is None: - DataLoader.__iter__(self) - - # get the first chunks - if self._batch_itr is None: - self._fetch_chunk() - - # try to iterate through existing batches - try: - batch = next(self._batch_itr) - except StopIteration: - # anticipate any more chunks getting created - # if not, raise the StopIteration - if not self._working and self._buff.empty: - self._workers = None - self._batch_itr = None - raise - - # otherwise get the next chunks and return - # the first batch - self._fetch_chunk() - batch = next(self._batch_itr) - # if batch[0] is empty but other exist - for sub in batch: - if sub is not None and len(sub) > 0: - self.num_rows_processed += len(sub) - break - return batch - - @annotate("make_tensors", color="darkgreen", domain="nvt_python") - def make_tensors(self, gdf, use_nnz=False): - split_idx = self._get_segment_lengths(len(gdf)) - - # map from big chunk to framework-specific tensors - chunks = self._create_tensors(gdf) - - # if we have any offsets, calculate nnzs up front - if len(chunks) == 4: - offsets = chunks[-1] - if use_nnz: - nnzs = offsets[1:] - offsets[:-1] - chunks = chunks[:-1] - - # split them into batches and map to the framework-specific output format - batches = [[] for _ in range(len(split_idx))] - offset_idx = 0 - for chunk in chunks: - lists = None - if isinstance(chunk, tuple): - chunk, lists = chunk - - if len(split_idx) > 1 and chunk is not None: - chunk = self._split_fn(chunk, split_idx) - else: - chunk = [chunk for _ in split_idx] - - if lists is not None: - num_list_columns = len(lists) - - # grab the set of offsets and nnzs corresponding to - # the list columns from this chunk - chunk_offsets = offsets[:, offset_idx : offset_idx + num_list_columns] - if use_nnz: - chunk_nnzs = nnzs[:, offset_idx : offset_idx + num_list_columns] - offset_idx += num_list_columns - - # split them into batches, including an extra 1 on the offsets - # so we know how long the very last element is - batch_offsets = self._split_fn(chunk_offsets, split_idx + [1]) - if use_nnz and len(split_idx) > 1: - batch_nnzs = self._split_fn(chunk_nnzs, split_idx) - elif use_nnz: - batch_nnzs = [chunk_nnzs] - else: - batch_nnzs = [None] * (len(batch_offsets) - 1) - - # group all these indices together and iterate through - # them in batches to grab the proper elements from each - # values tensor - chunk = zip(chunk, batch_offsets[:-1], batch_offsets[1:], batch_nnzs) - - for n, c in enumerate(chunk): - if isinstance(c, tuple): - c, off0s, off1s, _nnzs = c - offsets_split_idx = [1 for _ in range(num_list_columns)] - off0s = self._split_fn(off0s, offsets_split_idx, axis=1) - off1s = self._split_fn(off1s, offsets_split_idx, axis=1) - if use_nnz: - _nnzs = self._split_fn(_nnzs, offsets_split_idx, axis=1) - - # TODO: does this need to be ordereddict? 
- batch_lists = {} - for k, (column_name, values) in enumerate(lists.items()): - off0, off1 = off0s[k], off1s[k] - if use_nnz: - nnz = _nnzs[k] - - # need to grab scalars for TF case - if len(off0.shape) == 1: - start, stop = off0[0], off1[0] - elif len(off0.shape) == 2: - start, stop = off0[0, 0], off1[0, 0] - else: - print(off0, off1) - raise ValueError - value = values[int(start) : int(stop)] - index = off0 - start if not use_nnz else nnz - batch_lists[column_name] = (value, index) - c = (c, batch_lists) - - batches[n].append(c) - return (self._handle_tensors(*batch) for batch in batches) - - def _get_segment_lengths(self, num_samples): - """ - Helper function to build indices to pass - to .split functions for breaking - up into batches - """ - num_full_batches = _num_steps(num_samples, self.batch_size) - 1 - idx = [self.batch_size for _ in range(num_full_batches)] - idx.append(num_samples - num_full_batches * self.batch_size) - return idx - - def _to_sparse_tensor(self, values_offset, column_name): - """ - Create a sparse representation of the input tensor. - values_offset is either a tensor or a tuple of tensor, offset. - """ - seq_limit = self.sparse_max[column_name] - values, offsets, diff_offsets, num_rows = self._pull_values_offsets(values_offset) - max_seq_len = self._get_max_seq_len(diff_offsets) - if max_seq_len > seq_limit: - raise ValueError( - "The default sequence length has been configured " - + f"to {seq_limit} but the " - + f"largest sequence in this batch have {max_seq_len} length" - ) - return self._build_sparse_tensor(values, offsets, diff_offsets, num_rows, seq_limit) - - def _to_tensor(self, gdf, dtype=None): - """ - One of the mandatory functions a child class needs - to implement. Maps from a cudf DataFrame to a - tensor in the appropriate library, with an optional - dtype kwarg to do explicit casting if need be - """ - raise NotImplementedError - - def _get_device_ctx(self, dev): - """ - One of the mandatory functions a child class needs - to implement. Maps from a GPU index to a framework - context object for placing tensors on specific GPUs - """ - raise NotImplementedError - - def _split_fn(self, tensor, idx, axis=0): - raise NotImplementedError - - @property - def _LONG_DTYPE(self): - raise NotImplementedError - - @property - def _FLOAT32_DTYPE(self): - raise NotImplementedError - - def _separate_list_columns(self, gdf): - lists, scalars = [], [] - for col in gdf.columns: - if is_list_dtype(gdf[col]): - lists.append(col) - else: - scalars.append(col) - return scalars, lists - - @annotate("_create_tensors", color="darkgreen", domain="nvt_python") - def _create_tensors(self, gdf): - """ - Breaks a dataframe down into the relevant - categorical, continuous, and label tensors. 
- Can be overrideen - """ - workflow_nodes = (self.cat_names, self.cont_names, self.label_names) - dtypes = (self._LONG_DTYPE, self._FLOAT32_DTYPE, self._FLOAT32_DTYPE) - tensors = [] - offsets = make_df(device=self.device) - for column_names, dtype in zip(workflow_nodes, dtypes): - if len(column_names) == 0: - tensors.append(None) - continue - if hasattr(column_names, "column_names"): - column_names = column_names.column_names - - gdf_i = gdf[column_names] - gdf.drop(columns=column_names, inplace=True) - - scalars, lists = self._separate_list_columns(gdf_i) - - x = None - if scalars: - # should always return dict column_name: values, offsets (optional) - x = self._to_tensor(gdf_i[scalars], dtype) - if lists: - list_tensors = OrderedDict() - for column_name in lists: - column = gdf_i.pop(column_name) - leaves, col_offsets = pull_apart_list(column) - if isinstance(leaves[0], list): - - leaves, nest_offsets = pull_apart_list(leaves) - col_offsets = nest_offsets.iloc[col_offsets[:]] - offsets[column_name] = col_offsets.reset_index(drop=True) - list_tensors[column_name] = self._to_tensor(leaves, dtype) - x = x, list_tensors - tensors.append(x) - - if not offsets.empty: - offsets_tensor = self._to_tensor(offsets, self._LONG_DTYPE) - if len(offsets_tensor.shape) == 1: - offsets_tensor = offsets_tensor[:, None] - tensors.append(offsets_tensor) - del gdf, offsets - - return tensors - - @annotate("_handle_tensors", color="darkgreen", domain="nvt_python") - def _handle_tensors(self, cats, conts, labels): - X = {} - for tensor, names in zip([cats, conts], [self.cat_names, self.cont_names]): - lists = {} - if isinstance(tensor, tuple): - tensor, lists = tensor - names = [i for i in names if i not in lists] - - # now add in any scalar tensors - if len(names) > 1: - tensors = self._tensor_split(tensor, len(names), axis=1) - lists.update(zip(names, tensors)) - elif len(names) == 1: - lists[names[0]] = tensor - X.update(lists) - - for column_name in X: - if column_name in self.sparse_names: - if column_name not in self.sparse_max: - raise ValueError( - f"Did not convert {column_name} to sparse due to missing sparse_max entry" - ) - X[column_name] = self._to_sparse_tensor(X[column_name], column_name) - - # TODO: use dict for labels as well? - # would require output layers to match naming - if len(self.label_names) > 1: - labels = self._tensor_split(labels, len(self.label_names), axis=1) - return X, labels + return schema diff --git a/merlin/models/loader/dataframe_iter.py b/merlin/models/loader/dataframe_iter.py deleted file mode 100644 index 817883fe76..0000000000 --- a/merlin/models/loader/dataframe_iter.py +++ /dev/null @@ -1,45 +0,0 @@ -# -# Copyright (c) 2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - - -class DataFrameIter: - def __init__(self, ddf, columns=None, indices=None, partition_lens=None, epochs=1): - self.indices = indices if isinstance(indices, list) else range(ddf.npartitions) - self._ddf = ddf - self.columns = columns - self.partition_lens = partition_lens - self.epochs = epochs - - def __len__(self): - if self.partition_lens: - # Use metadata-based partition-size information - # if/when it is available. Note that this metadata - # will not be correct if rows where added or dropped - # after IO (within Ops). - return sum(self.partition_lens[i] for i in self.indices) * self.epochs - if len(self.indices) < self._ddf.npartitions: - return len(self._ddf.partitions[self.indices]) * self.epochs - return len(self._ddf) * self.epochs - - def __iter__(self): - for epoch in range(self.epochs): - for i in self.indices: - part = self._ddf.get_partition(i) - if self.columns: - yield part[self.columns].compute(scheduler="synchronous") - else: - yield part.compute(scheduler="synchronous") - part = None diff --git a/merlin/models/loader/tf_utils.py b/merlin/models/loader/tf_utils.py deleted file mode 100644 index 01225370bc..0000000000 --- a/merlin/models/loader/tf_utils.py +++ /dev/null @@ -1,116 +0,0 @@ -# -# Copyright (c) 2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -import warnings - -import tensorflow as tf -from packaging import version -from tensorflow.python.feature_column import feature_column_v2 as fc - -from merlin.core.dispatch import HAS_GPU -from merlin.models.loader.utils import device_mem_size - - -def configure_tensorflow(memory_allocation=None, device=None): - total_gpu_mem_mb = device_mem_size(kind="total", cpu=(not HAS_GPU)) / (1024**2) - - if memory_allocation is None: - memory_allocation = os.environ.get("TF_MEMORY_ALLOCATION", 0.5) - - if float(memory_allocation) < 1: - memory_allocation = total_gpu_mem_mb * float(memory_allocation) - memory_allocation = int(memory_allocation) - assert memory_allocation < total_gpu_mem_mb - - # TODO: what will this look like in any sort - # of distributed set up? 
- if device is None: - device = int(os.environ.get("TF_VISIBLE_DEVICE", 0)) - tf_devices = tf.config.list_physical_devices("GPU") - if HAS_GPU and len(tf_devices) == 0: - raise ImportError("TensorFlow is not configured for GPU") - if HAS_GPU: - try: - tf.config.set_logical_device_configuration( - tf_devices[device], - [tf.config.LogicalDeviceConfiguration(memory_limit=memory_allocation)], - ) - except RuntimeError: - warnings.warn( - "TensorFlow runtime already initialized, may not be enough memory for cudf" - ) - try: - tf.config.experimental.set_virtual_device_configuration( - tf_devices[device], - [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=memory_allocation)], - ) - except RuntimeError as e: - # Virtual devices must be set before GPUs have been initialized - warnings.warn(e) - - # versions using TF earlier than 2.3.0 need to use extension - # library for dlpack support to avoid memory leak issue - __TF_DLPACK_STABLE_VERSION = "2.3.0" - if version.parse(tf.__version__) < version.parse(__TF_DLPACK_STABLE_VERSION): - try: - from tfdlpack import from_dlpack - except ModuleNotFoundError as e: - message = "If using TensorFlow < 2.3.0, you must install tfdlpack-gpu extension library" - raise ModuleNotFoundError(message) from e - - else: - from tensorflow.experimental.dlpack import from_dlpack - - return from_dlpack - - -def _get_parents(column): - """ - recursive function for finding the feature columns - that supply inputs for a given `column`. If there are - none, returns the column. Uses sets so is not - deterministic. - """ - if isinstance(column.parents[0], str): - return set([column]) - parents = set() - for parent in column.parents: - parents |= _get_parents(parent) - return parents - - -def get_dataset_schema_from_feature_columns(feature_columns): - """ - maps from a list of TensorFlow `feature_column`s to - lists giving the categorical and continuous feature - names for a dataset. Useful for constructing NVTabular - Workflows from feature columns - """ - base_columns = set() - for column in feature_columns: - base_columns |= _get_parents(column) - - cat_names, cont_names = [], [] - for column in base_columns: - if isinstance(column, fc.CategoricalColumn): - cat_names.append(column.name) - else: - cont_names.append(column.name) - - cat_names = sorted(cat_names) - cont_names = sorted(cont_names) - return cat_names, cont_names diff --git a/merlin/models/loader/utils.py b/merlin/models/loader/utils.py deleted file mode 100644 index a04a4a8cef..0000000000 --- a/merlin/models/loader/utils.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# Copyright (c) 2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -import warnings - -try: - from numba import cuda -except ImportError: - cuda = None - -try: - import psutil -except ImportError: - psutil = None - - -def _pynvml_mem_size(kind="total", index=0): - import pynvml - - pynvml.nvmlInit() - size = None - if kind == "free": - size = int(pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(index)).free) - elif kind == "total": - size = int(pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(index)).total) - else: - raise ValueError("{0} not a supported option for device_mem_size.".format(kind)) - pynvml.nvmlShutdown() - return size - - -def device_mem_size(kind="total", cpu=False): - - # Use psutil (if available) for cpu mode - if cpu and psutil: - if kind == "total": - return psutil.virtual_memory().total - elif kind == "free": - return psutil.virtual_memory().free - elif cpu: - warnings.warn("Please install psutil for full cpu=True support.") - # Assume 1GB of memory - return int(1e9) - - if kind not in ["free", "total"]: - raise ValueError("{0} not a supported option for device_mem_size.".format(kind)) - try: - if kind == "free": - return int(cuda.current_context().get_memory_info()[0]) - else: - return int(cuda.current_context().get_memory_info()[1]) - except NotImplementedError: - if kind == "free": - # Not using NVML "free" memory, because it will not include RMM-managed memory - warnings.warn("get_memory_info is not supported. Using total device memory from NVML.") - size = _pynvml_mem_size(kind="total", index=0) - return size diff --git a/merlin/models/tf/__init__.py b/merlin/models/tf/__init__.py index 71033c6edf..cf8ecbd1df 100644 --- a/merlin/models/tf/__init__.py +++ b/merlin/models/tf/__init__.py @@ -17,7 +17,7 @@ # flake8: noqa # Must happen before any importing of tensorflow to curtail mem usage -from merlin.models.loader.tf_utils import configure_tensorflow +from merlin.dataloader.tf_utils import configure_tensorflow from merlin.models.tf.core.index import IndexBlock, TopKIndexBlock from merlin.models.tf.core.tabular import AsTabular, Filter, TabularBlock @@ -29,7 +29,6 @@ from tensorflow.keras.optimizers import Optimizer from tensorflow.python.training.tracking.data_structures import ListWrapper, _DictWrapper -from merlin.models.loader.tf_utils import configure_tensorflow from merlin.models.tf.blocks.cross import CrossBlock from merlin.models.tf.blocks.dlrm import DLRMBlock from merlin.models.tf.blocks.experts import CGCBlock, MMOEBlock, MMOEGate diff --git a/merlin/models/tf/inputs/base.py b/merlin/models/tf/inputs/base.py index 35f07c796a..ee841718f6 100644 --- a/merlin/models/tf/inputs/base.py +++ b/merlin/models/tf/inputs/base.py @@ -94,7 +94,7 @@ def InputBlock( Tags to filter the continuous features Defaults to (Tags.CONTINUOUS,) continuous_projection: Optional[Block] - If set, concatenate all numerical features and projet using the + If set, concatenate all numerical features and project using the specified Block. Defaults to None add_embedding_branch: bool diff --git a/merlin/models/tf/loader.py b/merlin/models/tf/loader.py index 771cd339ba..190cc3bcd2 100644 --- a/merlin/models/tf/loader.py +++ b/merlin/models/tf/loader.py @@ -13,36 +13,25 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -import contextlib import logging import os -from typing import Protocol +from typing import Optional, Protocol, Union import dask.dataframe as dd import numpy as np import tensorflow as tf -from packaging import version +import merlin.dataloader.tensorflow from merlin.core.dispatch import HAS_GPU +from merlin.dataloader.tf_utils import get_dataset_schema_from_feature_columns from merlin.io import Dataset -from merlin.models.loader.backend import DataLoader -from merlin.models.loader.tf_utils import get_dataset_schema_from_feature_columns +from merlin.models.loader.backend import _augment_schema from merlin.models.tf.distributed.backend import hvd, hvd_installed from merlin.models.utils.schema_utils import select_targets from merlin.schema import Schema, Tags LOG = logging.getLogger("merlin.models") -if version.parse(tf.__version__) < version.parse("2.3.0"): - try: - from tfdlpack import from_dlpack - except ModuleNotFoundError as e: - message = "If using TensorFlow < 2.3.0, you must install tfdlpack-gpu extension library" - raise ModuleNotFoundError(message) from e - -else: - from tensorflow.experimental.dlpack import from_dlpack - # pylint has issues with TF array ops, so disable checks until fixed: # https://github.com/PyCQA/pylint/issues/3613 # pylint: disable=no-value-for-parameter,unexpected-keyword-arg,redundant-keyword-arg @@ -143,7 +132,7 @@ def compute_output_schema(self, schema: Schema): ... -class Loader(tf.keras.utils.Sequence, DataLoader): +class Loader(merlin.dataloader.tensorflow.Loader): """ Override class to customize data loading for backward compatibility with older NVTabular releases. @@ -257,8 +246,6 @@ class Loader(tf.keras.utils.Sequence, DataLoader): bool value to activate transforming sparse tensors to dense """ - _use_nnz = True - def __init__( self, paths_or_dataset, @@ -289,293 +276,89 @@ def __init__( ) if schema: dataset.schema = schema + + cat_names = cat_names or ( + dataset.schema.select_by_tag(Tags.CATEGORICAL) + .excluding_by_tag(Tags.TARGET) + .column_names + if _get_schema(dataset) + else [] + ) + cont_names = cont_names or ( + dataset.schema.select_by_tag(Tags.CONTINUOUS).excluding_by_tag(Tags.TARGET).column_names + if _get_schema(dataset) + else [] + ) + label_names = label_names or ( + dataset.schema.select_by_tag(Tags.TARGET).column_names if _get_schema(dataset) else [] + ) + cat_names, cont_names, label_names = _validate_schema( feature_columns, cat_names, cont_names, label_names, schema=dataset.schema ) + dataset.schema = _augment_schema( + dataset.schema, + cat_names, + cont_names, + label_names, + sparse_names, + sparse_max, + sparse_as_dense, + ) + device = "cpu" if not HAS_GPU else device if hvd_installed and hvd.size() > 1: device = hvd.local_rank() global_size = global_size or hvd.size() global_rank = global_rank or hvd.rank() seed_fn = seed_fn or get_default_hvd_seed_fn() - DataLoader.__init__( - self, + + super().__init__( dataset, batch_size, - shuffle, - cat_names=cat_names, - cont_names=cont_names, - label_names=label_names, + shuffle=shuffle, seed_fn=seed_fn, parts_per_chunk=parts_per_chunk, device=device, global_size=global_size, global_rank=global_rank, drop_last=drop_last, - sparse_names=sparse_names, - sparse_max=sparse_max, - sparse_as_dense=sparse_as_dense, ) - self._transforms = [("all", transform)] if transform else [] - self.multi_label_as_dict = multi_label_as_dict - - def __len__(self): - """ - recreating since otherwise Keras yells at you - """ - # TODO: what's a better way to do this inheritance - # of the appropriate 
methods? A Metaclass? - DataLoader.stop(self) - return DataLoader.__len__(self) - - def on_epoch_end(self): - """Method to call at the end of every epoch.""" - DataLoader.stop(self) - - def __getitem__(self, idx): - """ - implemented exclusively for consistency - with Keras model.fit. Does not leverage - passed idx in any way - """ - return DataLoader.__next__(self) - - def map(self, fn) -> "Loader": - """ - Applying a function to each batch. - - This can for instance be used to add `sample_weight` to the model. - """ - self._transforms.append(("all", fn)) - - return self - - def map_features(self, fn) -> "Loader": - def wrapped_fn(*inputs): - features = fn(inputs[0]) - - return features, *inputs[1:] - - self._transforms.append(("features", wrapped_fn)) - - return self - - def map_targets(self, fn) -> "Loader": - def wrapped_fn(*inputs): - targets = fn(inputs[1]) - - if len(inputs) > 2: - return inputs[0], targets, *inputs[2:] - - return inputs[0], targets - - self._transforms.append(("targets", wrapped_fn)) - - return self - - @contextlib.contextmanager - def _get_device_ctx(self, dev): - # with tf.device("/device:GPU:{}".format(dev)) as tf_device: - # # tf.device changes the cupy cuda device, which breaks us on multigpu - # # force cupy to still use the device we expect - # cupy.cuda.Device(dev).use() - # yield tf_device - # commenting out since device statements cause - # RuntimeErrors when exiting if two dataloaders - # are running at once (e.g. train and validation) - if dev != "cpu": - yield tf.device("/GPU:" + str(dev)) - else: - # https://www.tensorflow.org/guide/gpu#manual_device_placement - yield tf.device("/device:CPU:0") - - def _split_fn(self, tensor, idx, axis=0): - return tf.split(tensor, idx, axis=axis) - def _tensor_split(self, tensor, idx, axis=0): - """ - Same function as above but need this method - for api match. 
- """ - return tf.split(tensor, idx, axis=axis) - - @property - def _LONG_DTYPE(self): - return tf.int64 - - @property - def _FLOAT32_DTYPE(self): - return tf.float32 - - def _pack(self, gdf): - if isinstance(gdf, np.ndarray): - return gdf - elif hasattr(gdf, "to_dlpack") and callable(getattr(gdf, "to_dlpack")): - return gdf.to_dlpack() - elif hasattr(gdf, "to_numpy") and callable(getattr(gdf, "to_numpy")): - gdf = gdf.to_numpy() - if isinstance(gdf[0], list): - gdf = np.stack(gdf) - return gdf - return gdf.toDlpack() - - def _unpack(self, gdf): - if hasattr(gdf, "shape"): - return tf.convert_to_tensor(gdf) - return from_dlpack(gdf) - - def _to_tensor(self, gdf, dtype=None): - if gdf.empty: - return - - # checks necessary because of this bug - # https://github.com/tensorflow/tensorflow/issues/42660 - if len(gdf.shape) == 1 or gdf.shape[1] == 1: - dlpack = self._pack(gdf) - elif gdf.shape[0] == 1: - dlpack = self._pack(gdf.values[0]) - else: - dlpack = self._pack(gdf.values.T) - # catch error caused by tf eager context - # not being initialized - - try: - x = self._unpack(dlpack) - except AssertionError: - tf.random.uniform((1,)) - x = self._unpack(dlpack) - # if rank is already two it is already in list format - if gdf.shape[0] == 1 and not tf.rank(x) == 2: - # batch size 1 so got squashed to a vector - x = tf.expand_dims(x, 0) - elif len(gdf.shape) == 1 or len(x.shape) == 1: - # sort of a generic check for any other - # len(shape)==1 case, could probably - # be more specific - x = tf.expand_dims(x, -1) - elif gdf.shape[1] > 1: - # matrix which means we had to transpose - # for the bug above, so untranspose - x = tf.transpose(x) - return x - - def _pull_values_offsets(self, values_offset): - """ - values_offset is either a tuple (values, offsets) or just values. - Values is a tensor. 
- This method is used to turn a tensor into its sparse representation - """ - # pull_values_offsets, return values offsets diff_offsets - diff_offsets = None - if isinstance(values_offset, tuple): - values = tf.reshape(values_offset[0], [-1]) - diff_offsets = tf.cast(tf.reshape(values_offset[1], [-1]), dtype=tf.int64) - offsets = tf.math.cumsum(diff_offsets) - else: - values = tf.reshape(values_offset, [-1]) - offsets = tf.arange(tf.shape(values)[0], dtype=tf.int64) - diff_offsets = offsets[1:] - offsets[:-1] - num_rows = len(offsets) - return values, offsets, diff_offsets, num_rows - - def _get_max_seq_len(self, diff_offsets): - # get_max_seq_len, return int - return int(tf.math.reduce_max(diff_offsets)) - - def _get_indices(self, offsets, diff_offsets): - # Building the indices to reconstruct the sparse tensors - row_ids = tf.range(len(offsets), dtype=tf.int64) - - row_ids_repeated = tf.repeat(row_ids, diff_offsets) - row_offset_repeated = tf.repeat(offsets, diff_offsets) - col_ids = tf.range(len(row_offset_repeated), dtype=tf.int64) - row_offset_repeated - indices = tf.concat( - values=[tf.expand_dims(row_ids_repeated, -1), tf.expand_dims(col_ids, -1)], axis=1 - ) - return indices - - def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): - sparse_tensor = tf.sparse.SparseTensor( - indices=indices, values=values, dense_shape=[num_rows, seq_limit] - ) - return sparse_tensor - - def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit): - ragged = tf.RaggedTensor.from_row_lengths(values=values, row_lengths=diff_offsets) - tensor = tf.RaggedTensor.from_tensor(ragged.to_tensor(shape=[None, seq_limit])).to_sparse() - if self.sparse_as_dense: - tensor = tf.sparse.to_dense(tensor) - return tensor - - def _handle_tensors(self, cats, conts, labels): - to_return = super()._handle_tensors(cats, conts, labels) - - if len(self.label_names) > 1 and self.multi_label_as_dict: - X, y = to_return - to_return = X, dict(zip(self.label_names, y)) - - for _, transform in self._transforms: - to_return = transform(*to_return) - - return to_return + # Override these parameters after initializing the parent dataloader + # class since the new dataloader will use sparse tensors for list + # columns by default, but sparse tensors were disabled by default + # and were optional in the old version of merlin.loader. 
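+        # Illustrative sketch (values below are hypothetical; the argument
+        # names are the ones accepted by this __init__): a caller that
+        # previously relied on the old opt-in behavior, e.g.
+        #     Loader(
+        #         paths_or_dataset,
+        #         batch_size=1024,
+        #         sparse_names=["item_ids"],
+        #         sparse_max={"item_ids": 20},
+        #         sparse_as_dense=True,
+        #     )
+        # should still receive padded dense tensors for "item_ids", because
+        # the attributes set below reapply those settings on top of the new
+        # dataloader's defaults.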
+ self.sparse_names = sparse_names or [] + self.sparse_max = sparse_max or {} + self.sparse_as_dense = sparse_as_dense @property def input_schema(self) -> Schema: - return self.data.schema + return self.dataset.schema @property def output_schema(self) -> Schema: schema = self.input_schema - for to, transform in self._transforms: - if hasattr(transform, "compute_output_schema"): - _schema = transform.compute_output_schema(schema) + for map_fn in self._map_fns: + if hasattr(map_fn, "compute_output_schema"): + schema = map_fn.compute_output_schema(schema) else: - raise ValueError(f"Couldn't infer schema from transform {transform}") - - if to == "all": - schema = _schema - elif to == "features": - targets_schema = schema.select_by_tag(Tags.Target) - schema = _schema + targets_schema - elif to == "targets": - features_schema = schema.remove_by_tag(Tags.Target) - schema = features_schema + _schema - else: - raise ValueError(f"Unknown schema target {to}") + raise ValueError(f"Couldn't infer schema from transform {map_fn}") return schema -class KerasSequenceValidator(tf.keras.callbacks.Callback): - # TODO: document - _supports_tf_logs = True - - def __init__(self, dataloader): - super().__init__() - self.dataloader = dataloader - - def on_epoch_end(self, epoch, logs=None): - logs = logs if logs is not None else {} - for X, y_true in self.dataloader: - y_pred = self.model(X) - - # TODO: how do we want to handle the multi-output case? - for metric in self.model.metrics: - metric.update_state(y_true, y_pred) - - set_logs = {} - for metric in self.model.metrics: - set_logs[f"val_{metric.name}"] = metric.result().numpy() - logs.update(set_logs) - print(set_logs) - return logs +KerasSequenceValidater = ( + KerasSequenceValidator +) = merlin.dataloader.tensorflow.KerasSequenceValidater def sample_batch( - data: Dataset, - batch_size: int, + dataset_or_loader: Union[Dataset, Loader], + batch_size: Optional[int] = None, shuffle: bool = False, include_targets: bool = True, to_ragged: bool = False, @@ -610,10 +393,14 @@ def sample_batch( from merlin.models.tf.transforms.tensor import ListToDense, ListToRagged, ProcessList - if not isinstance(data, Loader): - data = Loader(data, batch_size=batch_size, shuffle=shuffle) + if isinstance(dataset_or_loader, Dataset): + if not batch_size: + raise ValueError("Either use 'Loader' or specify 'batch_size'") + loader = Loader(dataset_or_loader, batch_size=batch_size, shuffle=shuffle) + else: + loader = dataset_or_loader - batch = next(iter(data)) + batch = loader.peek() # batch could be of type Prediction, so we can't unpack directly inputs, targets = batch[0], batch[1] @@ -622,7 +409,7 @@ def sample_batch( elif to_dense: inputs = ListToDense()(inputs) if process_lists: - inputs = ProcessList(data.schema)(inputs) + inputs = ProcessList(loader.schema)(inputs) if not include_targets: return inputs return inputs, targets diff --git a/merlin/models/tf/models/base.py b/merlin/models/tf/models/base.py index a0f011a360..0c16e93ee7 100644 --- a/merlin/models/tf/models/base.py +++ b/merlin/models/tf/models/base.py @@ -685,7 +685,7 @@ def adjust_predictions_and_targets( - Converts ragged targets (and their masks) to dense, so that they are compatible with most losses and metrics - Copies the targets mask to predictions mask, if defined - - One-hot encode targets if their tf.rank(targest) == tf.rank(predictions)-1 + - One-hot encode targets if their tf.rank(targets) == tf.rank(predictions)-1 - Ensures targets has the same shape and dtype as predicitnos Parameters diff --git 
a/merlin/models/tf/transforms/features.py b/merlin/models/tf/transforms/features.py
index 2084792f74..48c63e9cad 100644
--- a/merlin/models/tf/transforms/features.py
+++ b/merlin/models/tf/transforms/features.py
@@ -52,7 +52,7 @@ def __init__(self, schema: Optional[Schema] = None, **kwargs):
             self.column_names = schema.column_names
 
     def call(self, inputs: TabularData, **kwargs) -> TabularData:
-        raise NotImplementedError("The call method need to be implemented by child clases")
+        raise NotImplementedError("The call method needs to be implemented by child classes")
 
     def compute_output_shape(self, input_shapes):
         output_shapes = {}
@@ -796,7 +796,6 @@ def reshape_categorical_input_tensor_for_encoding(
             )
 
         else:
-            # if feat_name in features_2d_last_dim or len(input.get_shape()) == 2:
             if feat_name in features_2d_last_dim or (
                 input.get_shape()[-1] is not None and len(input.get_shape()) == 2
             ):
diff --git a/merlin/models/tf/transforms/tensor.py b/merlin/models/tf/transforms/tensor.py
index 401b8e39c5..f141de3cee 100644
--- a/merlin/models/tf/transforms/tensor.py
+++ b/merlin/models/tf/transforms/tensor.py
@@ -43,6 +43,8 @@ def call(self, inputs: TabularData, **kwargs) -> TabularData:
         for name, val in inputs.items():
             if isinstance(val, tuple):
                 outputs[name] = list_col_to_ragged(val)
+            elif isinstance(val, tf.SparseTensor):
+                outputs[name] = tf.RaggedTensor.from_sparse(val)
             else:
                 outputs[name] = val
 
@@ -89,16 +91,19 @@ def call(self, inputs: TabularData, **kwargs) -> TabularData:
 
             if isinstance(val, tuple):
                 ragged = list_col_to_ragged(val)
+            elif isinstance(val, tf.SparseTensor):
+                ragged = tf.RaggedTensor.from_sparse(val)
+            else:
+                outputs[name] = val
+                continue
 
-            if is_ragged:
-                if len(ragged.shape) == 2:
-                    ragged = tf.expand_dims(ragged, axis=-1)
+            if is_ragged:
+                if len(ragged.shape) == 2:
+                    ragged = tf.expand_dims(ragged, axis=-1)
 
-                outputs[name] = ragged
-            else:
-                outputs[name] = _ragged_to_dense(ragged)
+                outputs[name] = ragged
             else:
-                outputs[name] = val
+                outputs[name] = _ragged_to_dense(ragged)
 
         return outputs
 
@@ -185,7 +190,7 @@ def __init__(self, max_seq_length: Optional[int] = None, **kwargs):
         self.max_seq_length = max_seq_length
 
     def call(self, inputs: Union[tuple, tf.RaggedTensor, TabularData], **kwargs) -> TabularData:
-        if isinstance(inputs, (tf.RaggedTensor, tuple)):
+        if isinstance(inputs, (tf.SparseTensor, tf.RaggedTensor, tuple)):
             return self._convert_tensor_to_dense(inputs)
 
         outputs = {}
@@ -216,6 +221,8 @@ def _get_output_tensor_shape(self, val_shape):
 
     def _convert_tensor_to_dense(self, val):
         if isinstance(val, tuple):
             val = list_col_to_ragged(val)
+        if isinstance(val, tf.SparseTensor):
+            val = tf.RaggedTensor.from_sparse(val)
         if isinstance(val, tf.RaggedTensor):
             return _ragged_to_dense(val, self.max_seq_length)
         return tf.squeeze(val)
diff --git a/merlin/models/tf/utils/testing_utils.py b/merlin/models/tf/utils/testing_utils.py
index f96cef2dca..00930b5b08 100644
--- a/merlin/models/tf/utils/testing_utils.py
+++ b/merlin/models/tf/utils/testing_utils.py
@@ -76,7 +76,7 @@ def assert_model_is_retrainable(
 
 def model_test(
     model: Model,
-    dataset: Union[merlin.io.Dataset, Loader],
+    dataset_or_loader: Union[merlin.io.Dataset, Loader],
     run_eagerly: bool = True,
     optimizer="adam",
     epochs: int = 1,
@@ -87,10 +87,16 @@
     """Generic model test. 
It will compile & fit the model and make sure it can be re-trained.""" model.compile(run_eagerly=run_eagerly, optimizer=optimizer, **kwargs) - fit_kwargs = fit_kwargs or {} - losses = model.fit(dataset, batch_size=50, epochs=epochs, steps_per_epoch=1, **fit_kwargs) - batch = sample_batch(dataset, batch_size=50, to_ragged=reload_model) + if isinstance(dataset_or_loader, merlin.io.Dataset): + dataloader = Loader(dataset_or_loader, batch_size=50) + else: + dataloader = dataset_or_loader + + batch = sample_batch(dataloader, to_ragged=reload_model) + + fit_kwargs = fit_kwargs or {} + losses = model.fit(dataloader, epochs=epochs, steps_per_epoch=1, **fit_kwargs) if reload_model: with tempfile.TemporaryDirectory() as tmpdir: @@ -109,6 +115,8 @@ def model_test( return loaded_model, losses + dataloader.stop() + assert isinstance(model.from_config(model.get_config()), type(model)) return model, losses @@ -159,7 +167,7 @@ def numeric_test(actual, expected): # This function is copied from keras/testing_infra/test_utils.py # We need it here because this was not publicly exposed prior to 2.9.0 -# and our CI tests muliple versions of tensorflow/keras +# and our CI tests multiple versions of tensorflow/keras @disable_cudnn_autotune def layer_test( layer_cls, diff --git a/merlin/models/torch/dataset.py b/merlin/models/torch/dataset.py deleted file mode 100644 index a110c375bf..0000000000 --- a/merlin/models/torch/dataset.py +++ /dev/null @@ -1,174 +0,0 @@ -# -# Copyright (c) 2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import numpy as np -import pandas as pd -import torch -from torch.utils.dlpack import from_dlpack - -from merlin.core.dispatch import HAS_GPU -from merlin.models.loader.backend import DataLoader - - -class Dataset(torch.utils.data.IterableDataset, DataLoader): - """This class creates batches of tensor. Each batch size is specified by the user. - The data input requires an NVTabular dataset. Handles spillover to ensure all - batches are the specified size until the final batch. 
- - Parameters - ----------- - dataset : NVTabular dataset - cats : [str] - the list of categorical columns in the dataset - conts : [str] - the list of continuous columns in the dataset - labels : [str] - the list of label columns in the dataset - batch_size : int - the size of each batch to supply to the model - shuffle : bool - enable/disable shuffling of dataset - parts_per_chunk : int - number of partitions from the iterator, an NVTabular Dataset, to concatenate into a "chunk" - device : int - device id of selected GPU - sparse_list : [str] - list with column names of columns that should be represented as sparse tensors - sparse_max : {str: int} - dictionary of key: column_name + value: integer representing max sequence length for column - sparse_as_dense : bool - bool value to activate transforming sparse tensors to dense - """ - - def __init__( - self, - dataset, - cats=None, - conts=None, - labels=None, - batch_size=1, - shuffle=False, - seed_fn=None, - parts_per_chunk=1, - device=None, - global_size=None, - global_rank=None, - drop_last=False, - sparse_names=None, - sparse_max=None, - sparse_as_dense=False, - ): - DataLoader.__init__( - self, - dataset, - batch_size, - shuffle, - cat_names=cats, - cont_names=conts, - label_names=labels, - seed_fn=seed_fn, - parts_per_chunk=parts_per_chunk, - device=device, - global_size=global_size, - global_rank=global_rank, - drop_last=drop_last, - sparse_names=sparse_names, - sparse_max=sparse_max, - sparse_as_dense=sparse_as_dense, - ) - - def __iter__(self): - return DataLoader.__iter__(self) - - def _get_device_ctx(self, dev): - if dev == "cpu": - return torch.device("cpu") - return torch.cuda.device("cuda:{}".format(dev)) - - def _pack(self, gdf): - if self.device == "cpu" or isinstance(gdf, pd.DataFrame) or not HAS_GPU: - return gdf - return gdf.to_dlpack() - - def _unpack(self, dlpack): - if self.device == "cpu" or isinstance(dlpack, pd.DataFrame) or not HAS_GPU: - if ( - len(dlpack.values.shape) == 2 - and dlpack.values.shape[1] == 1 - and isinstance(dlpack.values[0], np.ndarray) - ): - return torch.squeeze(torch.Tensor(dlpack.values)) - return torch.Tensor(dlpack.values) - return from_dlpack(dlpack) - - def _to_tensor(self, gdf, dtype=None): - dl_pack = self._pack(gdf) - tensor = self._unpack(dl_pack) - return tensor.type(dtype) - - def _split_fn(self, tensor, idx, axis=0): - return torch.split(tensor, idx, dim=axis) - - def _tensor_split(self, tensor, idx, axis=0): - return torch.tensor_split(tensor, idx, axis=axis) - - @property - def _LONG_DTYPE(self): - return torch.long - - @property - def _FLOAT32_DTYPE(self): - return torch.float32 - - def _pull_values_offsets(self, values_offset): - # pull_values_offsets, return values offsets diff_offsets - if isinstance(values_offset, tuple): - values = values_offset[0].flatten() - offsets = values_offset[1].flatten() - else: - values = values_offset.flatten() - offsets = torch.arange(values.size()[0], device=self.device) - num_rows = len(offsets) - if HAS_GPU: - offsets = torch.cat([offsets, torch.cuda.LongTensor([len(values)])]) - else: - offsets = torch.cat([offsets, torch.LongTensor([len(values)])]) - diff_offsets = offsets[1:] - offsets[:-1] - return values, offsets, diff_offsets, num_rows - - def _get_max_seq_len(self, diff_offsets): - return int(diff_offsets.max()) - - # Building the indices to reconstruct the sparse tensors - - def _get_indices(self, offsets, diff_offsets): - row_ids = torch.arange(len(offsets) - 1, device=self.device) - row_ids_repeated = 
torch.repeat_interleave(row_ids, diff_offsets) - row_offset_repeated = torch.repeat_interleave(offsets[:-1], diff_offsets) - col_ids = torch.arange(len(row_offset_repeated), device=self.device) - row_offset_repeated - indices = torch.cat([row_ids_repeated.unsqueeze(-1), col_ids.unsqueeze(-1)], axis=1) - return indices - - def _get_sparse_tensor(self, values, indices, num_rows, seq_limit): - sparse_tensor = torch.sparse_coo_tensor( - indices.T, values, torch.Size([num_rows, seq_limit]), device=self.device - ) - if self.sparse_as_dense: - sparse_tensor = sparse_tensor.to_dense() - return sparse_tensor - - def _build_sparse_tensor(self, values, offsets, diff_offsets, num_rows, seq_limit): - indices = self._get_indices(offsets, diff_offsets) - return self._get_sparse_tensor(values, indices, num_rows, seq_limit) diff --git a/merlin/models/torch/utils/torch_utils.py b/merlin/models/torch/utils/torch_utils.py index db321de1fc..8453dbd617 100644 --- a/merlin/models/torch/utils/torch_utils.py +++ b/merlin/models/torch/utils/torch_utils.py @@ -142,7 +142,7 @@ def get_output_sizes_from_schema(schema: Schema, batch_size=-1, max_sequence_len for feature in schema: name = feature.name # Sequential or multi-hot feature - if feature.is_list: + if feature.is_list and feature.value_count.max: sizes[name] = torch.Size( [ batch_size, diff --git a/requirements/base.txt b/requirements/base.txt index 5a667ff19f..97a785662d 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,2 +1,2 @@ merlin-core>=0.2.0 - +merlin-dataloader>=0.0.2 diff --git a/tests/common/tf/retrieval/retrieval_utils.py b/tests/common/tf/retrieval/retrieval_utils.py index 3968880684..2bb649b563 100644 --- a/tests/common/tf/retrieval/retrieval_utils.py +++ b/tests/common/tf/retrieval/retrieval_utils.py @@ -784,9 +784,8 @@ def run(self, hparams): train_loader = mm.Loader( self.train_ds, batch_size=self.eval_batch_size, - transform=mm.ToTarget(self.train_ds.schema, item_id_name), shuffle=False, - ) + ).map(mm.ToTarget(self.train_ds.schema, item_id_name)) train_metrics = recommender.evaluate( train_loader, @@ -800,9 +799,8 @@ def run(self, hparams): eval_loader = mm.Loader( self.eval_ds, batch_size=self.eval_batch_size, - transform=mm.ToTarget(self.eval_ds.schema, item_id_name), shuffle=False, - ) + ).map(mm.ToTarget(self.eval_ds.schema, item_id_name)) eval_metrics = recommender.evaluate( eval_loader, batch_size=self.eval_batch_size, diff --git a/tests/unit/datasets/test_synthetic.py b/tests/unit/datasets/test_synthetic.py index a3eaf6bd56..9a4ec3a6bf 100644 --- a/tests/unit/datasets/test_synthetic.py +++ b/tests/unit/datasets/test_synthetic.py @@ -70,8 +70,8 @@ def test_sequence_data_length(generate_data_kwargs, expected_sequence_length): tensors, y = sample_batch(data, batch_size=1, process_lists=False) - assert all(tensors["item_id_seq"][1] == expected_sequence_length) - assert all(tensors["categories"][1] == expected_sequence_length) + for col in ["item_id_seq", "categories"]: + assert all(tensors[col][1] == expected_sequence_length) def test_generate_user_item_interactions_dtypes(): diff --git a/tests/unit/tf/blocks/test_interactions.py b/tests/unit/tf/blocks/test_interactions.py index 0ae4f4cc2c..82d3eaf9dd 100644 --- a/tests/unit/tf/blocks/test_interactions.py +++ b/tests/unit/tf/blocks/test_interactions.py @@ -86,7 +86,8 @@ def test_fm_block_with_multi_hot_categ_features(testing_data: Dataset): wide_input_block=wide_input_block, factors_dim=32, ) + batch, _ = mm.sample_batch(testing_data, batch_size=16, process_lists=False) 
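+    # With the reworked sample_batch in this patch, passing a Dataset requires
+    # an explicit batch_size (as above); a sketch of the equivalent call with a
+    # prebuilt Loader, where batch_size can be omitted, would be:
+    #     batch, _ = mm.sample_batch(
+    #         mm.Loader(testing_data, batch_size=16), process_lists=False
+    #     )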
output = fm_block(batch) - output.shape.as_list() == [16, 1] + assert output.shape.as_list() == [16, 1] diff --git a/tests/unit/tf/core/test_encoder.py b/tests/unit/tf/core/test_encoder.py index f3ded9a0ef..300e525531 100644 --- a/tests/unit/tf/core/test_encoder.py +++ b/tests/unit/tf/core/test_encoder.py @@ -6,7 +6,7 @@ import merlin.models.tf as mm from merlin.io import Dataset from merlin.models.tf.utils import testing_utils -from merlin.models.utils.dataset import unique_rows_by_features +from merlin.models.utils.dataset import unique_by_tag from merlin.schema import Tags @@ -29,11 +29,12 @@ def test_encoder_block(music_streaming_data: Dataset): assert model.blocks[0]["query"] == user_encoder assert model.blocks[0]["candidate"] == item_encoder - testing_utils.model_test(model, music_streaming_data, reload_model=True) + loader = mm.Loader(music_streaming_data, batch_size=50) + testing_utils.model_test(model, loader, reload_model=True) with pytest.raises(Exception) as excinfo: user_encoder.compile("adam") - user_encoder.fit(music_streaming_data) + user_encoder.fit(loader) assert "This block is not meant to be trained by itself" in str(excinfo.value) @@ -67,16 +68,16 @@ def test_topk_encoder(music_streaming_data: Dataset): testing_utils.model_test(retrieval_model, music_streaming_data, reload_model=True) # 2. Get candidates embeddings for the top-k encoder - candidate_features = unique_rows_by_features(music_streaming_data, Tags.ITEM, Tags.ITEM_ID) + candidate_features = unique_by_tag(music_streaming_data, Tags.ITEM, Tags.ITEM_ID) candidates = retrieval_model.candidate_embeddings( candidate_features, batch_size=BATCH_SIZE, index=Tags.ITEM_ID ) # 3. Set data-loader for top-k recommendation - loader = mm.Loader( - music_streaming_data, batch_size=BATCH_SIZE, transform=mm.ToTarget(schema, "item_id") + loader = mm.Loader(music_streaming_data, batch_size=BATCH_SIZE).map( + mm.ToTarget(schema, "item_id") ) - batch = next(iter(loader)) + batch = loader.peek() # 4. Define the top-k encoder topk_encoder = mm.TopKEncoder( @@ -85,6 +86,7 @@ def test_topk_encoder(music_streaming_data: Dataset): # 5. 
Get top-k predictions
     batch_output = topk_encoder(batch[0])
     predict_output = topk_encoder.predict(loader)
+
     assert list(batch_output.scores.shape) == [BATCH_SIZE, TOP_K]
     assert list(predict_output.scores.shape) == [100, TOP_K]
diff --git a/tests/unit/tf/core/test_index.py b/tests/unit/tf/core/test_index.py
index 04bd5edd3a..75f67ed535 100644
--- a/tests/unit/tf/core/test_index.py
+++ b/tests/unit/tf/core/test_index.py
@@ -83,6 +83,8 @@ def test_topk_recommender_outputs(ecommerce_data: Dataset, batch_size=100):
     def numpy_recall(labels, top_item_ids, k):
         return np.equal(np.expand_dims(labels, -1), top_item_ids[:, :k]).max(axis=-1).mean()
 
+    ecommerce_data.schema = ecommerce_data.schema.select_by_name(["user_categories", "item_id"])
+
     model = mm.TwoTowerModel(
         ecommerce_data.schema,
         query_tower=mm.MLPBlock([64]),
diff --git a/tests/unit/tf/core/test_prediction.py b/tests/unit/tf/core/test_prediction.py
index 8bc40e2591..118b8a0bdc 100644
--- a/tests/unit/tf/core/test_prediction.py
+++ b/tests/unit/tf/core/test_prediction.py
@@ -42,7 +42,7 @@ def compute_output_shape(self, input_shape):
 def test_model_pre_transforming_targets(ecommerce_data: Dataset, run_eagerly):
     class FlipTargets(tf.keras.layers.Layer):
         def call(self, inputs: Dict[str, tf.Tensor], targets=None):
-            if targets:
+            if targets is not None:
                 if isinstance(targets, dict):
                     flipped = {}
                     for key in targets:
diff --git a/tests/unit/tf/horovod/test_horovod.py b/tests/unit/tf/horovod/test_horovod.py
index 52a355ad9a..36513177af 100644
--- a/tests/unit/tf/horovod/test_horovod.py
+++ b/tests/unit/tf/horovod/test_horovod.py
@@ -95,6 +95,11 @@ def test_horovod_multigpu_dlrm(
 
     assert all(measure >= 0 for metric in losses.history for measure in losses.history[metric])
 
+    # Check the steps in each worker to verify that the dataset is distributed
+    # across workers. If this works correctly in a multi-GPU setting, the steps
+    # should decrease with more workers, e.g., steps = 9 in each worker with
+    # 1 GPU, steps = 4 in each worker with 2 GPUs, steps = 3 in each worker
+    # with 3 GPUs, and so on. 
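+    # A worked example of that integer division, using the 9 total steps
+    # asserted below (worker counts here are hypothetical):
+    #     9 // 1 == 9    # one worker consumes every batch
+    #     9 // 2 == 4    # two workers each consume about half
+    #     9 // 3 == 3    # three workers each consume a third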
if hvd_installed: assert losses.params["steps"] == 9 // hvd.size() else: @@ -149,7 +154,7 @@ def test_horovod_multigpu_two_tower( assert all(measure >= 0 for metric in losses.history for measure in losses.history[metric]) if hvd_installed: - assert losses.params["steps"] == 9 // hvd.size() + assert losses.params["steps"] == 9 // hvd.size() # 9 steps per epoch; 2 epochs else: assert losses.params["steps"] == 9 diff --git a/tests/unit/tf/inputs/test_base.py b/tests/unit/tf/inputs/test_base.py index c8299c3dbf..39974f6dd6 100644 --- a/tests/unit/tf/inputs/test_base.py +++ b/tests/unit/tf/inputs/test_base.py @@ -19,7 +19,8 @@ def test_concat_sequence(sequence_testing_data): - seq_schema = sequence_testing_data.schema.select_by_tag(Tags.SEQUENCE) + sequence_testing_data.schema = sequence_testing_data.schema.select_by_tag(Tags.SEQUENCE) + seq_schema = sequence_testing_data.schema seq_inputs = mm.InputBlockV2( seq_schema, diff --git a/tests/unit/tf/inputs/test_tabular.py b/tests/unit/tf/inputs/test_tabular.py index f9d28aea54..65a39b7910 100644 --- a/tests/unit/tf/inputs/test_tabular.py +++ b/tests/unit/tf/inputs/test_tabular.py @@ -18,16 +18,16 @@ import pytest import tensorflow as tf -import merlin.models.tf as ml +import merlin.models.tf as mm from merlin.io import Dataset from merlin.models.tf.utils import testing_utils from merlin.schema import ColumnSchema, Schema, Tags def test_tabular_features(testing_data: Dataset): - tab_module = ml.InputBlock(testing_data.schema) + tab_module = mm.InputBlock(testing_data.schema) - outputs = tab_module(ml.sample_batch(testing_data, batch_size=100, include_targets=False)) + outputs = tab_module(mm.sample_batch(testing_data, batch_size=100, include_targets=False)) con = testing_data.schema.select_by_tag(Tags.CONTINUOUS).column_names cat = testing_data.schema.select_by_tag(Tags.CATEGORICAL).column_names @@ -36,7 +36,7 @@ def test_tabular_features(testing_data: Dataset): def test_serialization_tabular_features(testing_data: Dataset): - inputs = ml.InputBlock(testing_data.schema) + inputs = mm.InputBlock(testing_data.schema) copy_layer = testing_utils.assert_serialization(inputs) @@ -44,9 +44,9 @@ def test_serialization_tabular_features(testing_data: Dataset): def test_tabular_features_with_projection(testing_data: Dataset): - tab_module = ml.InputBlock(testing_data.schema, continuous_projection=ml.MLPBlock([64])) + tab_module = mm.InputBlock(testing_data.schema, continuous_projection=mm.MLPBlock([64])) - outputs = tab_module(ml.sample_batch(testing_data, batch_size=100, include_targets=False)) + outputs = tab_module(mm.sample_batch(testing_data, batch_size=100, include_targets=False)) continuous_feature_names = testing_data.schema.select_by_tag(Tags.CONTINUOUS).column_names assert len(set(continuous_feature_names).intersection(set(outputs.keys()))) == 0 @@ -60,15 +60,15 @@ def test_tabular_features_yoochoose_model( music_streaming_data: Dataset, run_eagerly, continuous_projection ): if continuous_projection: - continuous_projection = ml.MLPBlock([continuous_projection]) - inputs = ml.InputBlock( + continuous_projection = mm.MLPBlock([continuous_projection]) + inputs = mm.InputBlock( music_streaming_data.schema, continuous_projection=continuous_projection, aggregation="concat", ) - body = ml.SequentialBlock([inputs, ml.MLPBlock([64])]) - model = ml.Model(body, ml.BinaryClassificationTask("click")) + body = mm.SequentialBlock([inputs, mm.MLPBlock([64])]) + model = mm.Model(body, mm.BinaryClassificationTask("click")) testing_utils.model_test(model, 
music_streaming_data, run_eagerly=run_eagerly) @@ -80,31 +80,30 @@ def test_tabular_features_yoochoose_model_inputblockv2( ): kwargs = {} if continuous_projection: - kwargs["continuous"] = ml.ContinuousProjection( + kwargs["continuous"] = mm.ContinuousProjection( music_streaming_data.schema.select_by_tag(Tags.CONTINUOUS), - ml.MLPBlock([continuous_projection]), + mm.MLPBlock([continuous_projection]), ) - inputs = ml.InputBlockV2(music_streaming_data.schema, aggregation="concat", **kwargs) + inputs = mm.InputBlockV2(music_streaming_data.schema, aggregation="concat", **kwargs) - body = ml.SequentialBlock([inputs, ml.MLPBlock([64])]) - model = ml.Model(body, ml.BinaryClassificationTask("click")) + body = mm.SequentialBlock([inputs, mm.MLPBlock([64])]) + model = mm.Model(body, mm.BinaryClassificationTask("click")) testing_utils.model_test(model, music_streaming_data, run_eagerly=run_eagerly) def test_tabular_seq_features_ragged_embeddings(sequence_testing_data: Dataset): - tab_module = ml.InputBlockV2( + tab_module = mm.InputBlockV2( sequence_testing_data.schema, - categorical=ml.Embeddings( + categorical=mm.Embeddings( sequence_testing_data.schema.select_by_tag(Tags.CATEGORICAL), sequence_combiner=None ), aggregation=None, ) - batch = ml.sample_batch( - sequence_testing_data, batch_size=100, include_targets=False, to_ragged=True - ) + loader = mm.Loader(sequence_testing_data, batch_size=100) + batch = mm.sample_batch(loader, include_targets=False, to_ragged=True) outputs = tab_module(batch) @@ -122,19 +121,18 @@ def test_tabular_seq_features_ragged_embeddings(sequence_testing_data: Dataset): ) def test_tabular_seq_features_ragged_emb_combiner(sequence_testing_data: Dataset, seq_combiner): con2d = sequence_testing_data.schema.select_by_tag(Tags.CONTINUOUS).remove_by_tag(Tags.SEQUENCE) - input_block = ml.InputBlockV2( + input_block = mm.InputBlockV2( sequence_testing_data.schema, - categorical=ml.Embeddings( + categorical=mm.Embeddings( sequence_testing_data.schema.select_by_tag(Tags.CATEGORICAL), sequence_combiner=seq_combiner, ), - continuous=ml.Continuous(con2d), + continuous=mm.Continuous(con2d), aggregation=None, ) - batch = ml.sample_batch( - sequence_testing_data, batch_size=100, include_targets=False, to_ragged=True - ) + loader = mm.Loader(sequence_testing_data, batch_size=100) + batch = mm.sample_batch(loader, include_targets=False, to_ragged=True) outputs = input_block(batch) @@ -150,19 +148,18 @@ def test_tabular_seq_features_ragged_custom_emb_combiner(sequence_testing_data: schema = schema + Schema([ColumnSchema("item_id_seq_weights")]) assert "item_id_seq_weights" in schema.column_names - batch = ml.sample_batch( - sequence_testing_data, batch_size=100, include_targets=False, to_ragged=True - ) + loader = mm.Loader(sequence_testing_data, batch_size=100) + batch = mm.sample_batch(loader, include_targets=False, to_ragged=True) batch["item_id_seq_weights"] = tf.ragged.constant( [[1.0, 2.0, 3.0, 4.0] for _ in range(batch["item_id_seq"].shape[0])], row_splits_dtype=batch["item_id_seq"].row_splits.dtype, ) - input_block_weighed_avg = ml.InputBlockV2( + input_block_weighed_avg = mm.InputBlockV2( schema, - categorical=ml.Embeddings( + categorical=mm.Embeddings( schema.select_by_tag(Tags.CATEGORICAL), - sequence_combiner=ml.AverageEmbeddingsByWeightFeature.from_schema_convention( + sequence_combiner=mm.AverageEmbeddingsByWeightFeature.from_schema_convention( schema, "_weights" ), ), @@ -171,9 +168,9 @@ def test_tabular_seq_features_ragged_custom_emb_combiner(sequence_testing_data: 
outputs_weighted_avg = input_block_weighed_avg(batch, features=batch) - input_block_simple_avg = ml.InputBlockV2( + input_block_simple_avg = mm.InputBlockV2( schema, - categorical=ml.Embeddings( + categorical=mm.Embeddings( schema.select_by_tag(Tags.CATEGORICAL), sequence_combiner=tf.keras.layers.Lambda(lambda x: tf.reduce_mean(x, axis=1)), ), @@ -197,16 +194,16 @@ def test_tabular_seq_features_ragged_custom_emb_combiner(sequence_testing_data: def test_tabular_seq_features_avg_embeddings_with_mapvalues(sequence_testing_data: Dataset): cat_schema = sequence_testing_data.schema.select_by_tag(Tags.CATEGORICAL) - batch = ml.sample_batch( + batch = mm.sample_batch( sequence_testing_data, batch_size=100, include_targets=False, to_ragged=True ) - input_block = ml.InputBlockV2( + input_block = mm.InputBlockV2( cat_schema, - categorical=ml.Embeddings( + categorical=mm.Embeddings( cat_schema, ), - post=ml.MapValues( + post=mm.MapValues( tf.keras.layers.Lambda( lambda x: tf.math.reduce_mean(x, axis=1) if isinstance(x, tf.RaggedTensor) else x ) @@ -226,16 +223,15 @@ def test_tabular_seq_features_avg_embeddings_with_mapvalues(sequence_testing_dat @pytest.mark.parametrize("aggregation", [None, "concat"]) def test_embedding_tables_from_schema_infer_dims(sequence_testing_data: Dataset, aggregation: str): cat_schema = sequence_testing_data.schema.select_by_tag(Tags.CATEGORICAL) - embeddings_block = ml.Embeddings( + embeddings_block = mm.Embeddings( cat_schema.select_by_tag(Tags.CATEGORICAL), dim={"item_id_seq": 15, "test_user_id": 21}, embeddings_initializer="truncated_normal", ) - input_block = ml.InputBlockV2(cat_schema, categorical=embeddings_block, aggregation=aggregation) + input_block = mm.InputBlockV2(cat_schema, categorical=embeddings_block, aggregation=aggregation) - batch = ml.sample_batch( - sequence_testing_data, batch_size=100, include_targets=False, to_ragged=True - ) + loader = mm.Loader(sequence_testing_data, batch_size=100) + batch = mm.sample_batch(loader, include_targets=False, to_ragged=True) outputs = input_block(batch) diff --git a/tests/unit/tf/models/test_base.py b/tests/unit/tf/models/test_base.py index 7d9c53f96a..028cc43db0 100644 --- a/tests/unit/tf/models/test_base.py +++ b/tests/unit/tf/models/test_base.py @@ -48,7 +48,7 @@ def test_fit_compile_twice(): dataset.schema = Schema( [ ColumnSchema("feature", dtype=np.int32, tags=[Tags.CONTINUOUS]), - ColumnSchema("target", dtype=np.int32, tags=[Tags.BINARY_CLASSIFICATION]), + ColumnSchema("target", dtype=np.int32, tags=[Tags.TARGET, Tags.BINARY_CLASSIFICATION]), ] ) loader = mm.Loader(dataset, batch_size=2, shuffle=False) @@ -721,8 +721,8 @@ def test_retrieval_model_query(ecommerce_data: Dataset, run_eagerly=True): query = ecommerce_data.schema.select_by_tag(Tags.USER_ID) candidate = ecommerce_data.schema.select_by_tag(Tags.ITEM_ID) - loader = mm.Loader( - ecommerce_data, batch_size=50, transform=mm.ToTarget(ecommerce_data.schema, Tags.ITEM_ID) + loader = mm.Loader(ecommerce_data, batch_size=50).map( + mm.ToTarget(ecommerce_data.schema, Tags.ITEM_ID) ) model = mm.RetrievalModelV2( @@ -898,8 +898,8 @@ def test_categorical_prediction_with_temperature(sequence_testing_data: Dataset) ), ) - loader = mm.Loader( - train, batch_size=1024, transform=mm.ToTarget(train.schema, "user_country", one_hot=True) + loader = mm.Loader(train, batch_size=1024).map( + mm.ToTarget(train.schema, "user_country", one_hot=True) ) model.compile(run_eagerly=False, optimizer="adam") diff --git a/tests/unit/tf/models/test_retrieval.py 
b/tests/unit/tf/models/test_retrieval.py index 10b405d294..478ffb5488 100644 --- a/tests/unit/tf/models/test_retrieval.py +++ b/tests/unit/tf/models/test_retrieval.py @@ -133,10 +133,8 @@ def test_matrix_factorization_topk_evaluation(music_streaming_data: Dataset, run topk_model = model.to_top_k_encoder(candidate_features, k=20, batch_size=16) topk_model.compile(run_eagerly=run_eagerly) - loader = mm.Loader( - music_streaming_data, - batch_size=32, - transform=mm.ToTarget(music_streaming_data.schema, "item_id"), + loader = mm.Loader(music_streaming_data, batch_size=32).map( + mm.ToTarget(music_streaming_data.schema, "item_id") ) metrics = topk_model.evaluate(loader, return_dict=True) @@ -422,7 +420,7 @@ def test_two_tower_model_topk_evaluation(ecommerce_data: Dataset, run_eagerly): topk_model = model.to_top_k_encoder(candidate_features, k=20, batch_size=16) topk_model.compile(run_eagerly=run_eagerly) - loader = mm.Loader(ecommerce_data, batch_size=32, transform=mm.ToTarget(schema, "item_id")) + loader = mm.Loader(ecommerce_data, batch_size=32).map(mm.ToTarget(schema, "item_id")) metrics = topk_model.evaluate(loader, return_dict=True) assert all([metric >= 0 for metric in metrics.values()]) @@ -744,6 +742,7 @@ def test_two_tower_retrieval_model_v2_with_topk_metrics_aggregator( def test_two_tower_advanced_options(ecommerce_data): + ecommerce_data.schema = ecommerce_data.schema.select_by_name(["user_id", "item_id"]) train_ds, eval_ds = ecommerce_data, ecommerce_data metrics = retrieval_tests_common.train_eval_two_tower_for_lastfm( train_ds, @@ -763,6 +762,7 @@ def test_two_tower_advanced_options(ecommerce_data): def test_mf_advanced_options(ecommerce_data): + ecommerce_data.schema = ecommerce_data.schema.select_by_name(["user_id", "item_id"]) train_ds, eval_ds = ecommerce_data, ecommerce_data metrics = retrieval_tests_common.train_eval_mf_for_lastfm( train_ds, @@ -829,9 +829,7 @@ def last_interaction_as_target(inputs, targets): return inputs, targets - dataloader = mm.Loader( - sequence_testing_data, batch_size=50, transform=last_interaction_as_target - ) + dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(last_interaction_as_target) losses = model.fit(dataloader, epochs=1) @@ -858,7 +856,7 @@ def test_youtube_dnn_retrieval_v2(sequence_testing_data: Dataset, run_eagerly, t schema=sequence_testing_data.schema, top_block=mm.MLPBlock([32]), num_sampled=1000 ) - dataloader = mm.Loader(sequence_testing_data, batch_size=50, transform=target_augmentation) + dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(target_augmentation) _, losses = testing_utils.model_test( model, dataloader, reload_model=True, run_eagerly=run_eagerly @@ -938,7 +936,7 @@ def test_youtube_dnn_v2_export_embeddings(sequence_testing_data: Dataset): schema=sequence_testing_data.schema, top_block=mm.MLPBlock([32]), num_sampled=1000 ) - dataloader = mm.Loader(sequence_testing_data, batch_size=50, transform=predict_next) + dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(predict_next) model, _ = testing_utils.model_test(model, dataloader, reload_model=False) candidates = model.candidate_embeddings().compute() @@ -971,7 +969,7 @@ def test_youtube_dnn_topk_evaluation(sequence_testing_data: Dataset, run_eagerly schema=sequence_testing_data.schema, top_block=mm.MLPBlock([32]), num_sampled=1000 ) - dataloader = mm.Loader(sequence_testing_data, batch_size=50, transform=predict_next) + dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(predict_next) model, _ = 
testing_utils.model_test(model, dataloader, reload_model=False) diff --git a/tests/unit/tf/prediction_tasks/test_multi_task.py b/tests/unit/tf/prediction_tasks/test_multi_task.py index d2c145eb17..4f91d2cb10 100644 --- a/tests/unit/tf/prediction_tasks/test_multi_task.py +++ b/tests/unit/tf/prediction_tasks/test_multi_task.py @@ -100,13 +100,13 @@ def call_outputs( self, outputs: PredictionOutput, features: Dict[str, tf.Tensor] = None, - targets: Dict[str, tf.Tensor] = None, + targets: tf.Tensor = None, training=True, testing=False, **kwargs, ) -> PredictionOutput: # Computes loss for the like loss only for clicked items - outputs = outputs.copy_with_updates(sample_weight=targets["click"]) + outputs = outputs.copy_with_updates(sample_weight=targets) return outputs inputs = ml.InputBlock(music_streaming_data.schema) diff --git a/tests/unit/tf/test_loader.py b/tests/unit/tf/test_loader.py index 6dab379916..cc5bfc392c 100644 --- a/tests/unit/tf/test_loader.py +++ b/tests/unit/tf/test_loader.py @@ -44,7 +44,7 @@ def identity(x, y): loader = mm.Loader(dataset, batch_size=10, transform=identity) - elapsed_time_seconds = timeit.timeit(lambda: next(loader), number=1) + elapsed_time_seconds = timeit.timeit(lambda: next(iter(loader)), number=1) assert elapsed_time_seconds < 1 @@ -226,8 +226,7 @@ def add_sample_weight(features, labels, sample_weight_col_name="sample_weight"): batch_size=10, label_names=label_name, shuffle=False, - transform=add_sample_weight, - ) + ).map(add_sample_weight) for X, y, sample_weight in loader: assert list(X["cat1"].numpy()) == [1] * 10 diff --git a/tests/unit/tf/transformers/test_block.py b/tests/unit/tf/transformers/test_block.py index f95a6b88fa..45a1660f12 100644 --- a/tests/unit/tf/transformers/test_block.py +++ b/tests/unit/tf/transformers/test_block.py @@ -28,9 +28,10 @@ def test_import(): @pytest.mark.parametrize("run_eagerly", [True]) def test_retrieval_transformer(sequence_testing_data: Dataset, run_eagerly): - seq_schema = sequence_testing_data.schema.select_by_tag(Tags.SEQUENCE).select_by_tag( - Tags.CATEGORICAL - ) + sequence_testing_data.schema = sequence_testing_data.schema.select_by_tag( + Tags.SEQUENCE + ).select_by_tag(Tags.CATEGORICAL) + seq_schema = sequence_testing_data.schema target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0] predict_last = mm.SequencePredictLast(schema=seq_schema, target=target) @@ -160,7 +161,7 @@ def test_hf_tranformers_blocks(encoder): @pytest.mark.parametrize("run_eagerly", [True, False]) -def test_transformer_as_classfication_model(sequence_testing_data: Dataset, run_eagerly): +def test_transformer_as_classification_model(sequence_testing_data: Dataset, run_eagerly): EMBED_DIM = 48 loader, schema = classification_loader(sequence_testing_data) @@ -180,7 +181,8 @@ def test_transformer_as_classfication_model(sequence_testing_data: Dataset, run_ ), ) - batch = next(iter(loader))[0] + batch = loader.peek()[0] + outputs = model(batch) assert list(outputs.shape) == [50, 63] testing_utils.model_test(model, loader, run_eagerly=run_eagerly) @@ -224,8 +226,7 @@ def classification_loader(sequence_testing_data: Dataset): dataloader = mm.Loader( sequence_testing_data, batch_size=50, - transform=mm.ToTarget(schema, "user_country", one_hot=True), - ) + ).map(mm.ToTarget(schema, "user_country", one_hot=True)) return dataloader, schema @@ -238,7 +239,11 @@ def test_transformer_with_causal_language_modeling(sequence_testing_data: Datase target = 
sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0] predict_next = mm.SequencePredictNext(schema=seq_schema, target=target) - loader = Loader(sequence_testing_data, batch_size=8, shuffle=False, transform=predict_next) + loader = Loader( + sequence_testing_data, + batch_size=8, + shuffle=False, + ).map(predict_next) model = mm.Model( mm.InputBlockV2( @@ -295,7 +300,8 @@ def test_transformer_with_masked_language_modeling(sequence_testing_data: Datase ) seq_mask_random = mm.SequenceMaskRandom(schema=seq_schema, target=target, masking_prob=0.3) - inputs, targets = next(iter(loader)) + inputs, targets = loader.peek() + outputs = model(inputs, targets=targets, training=True) assert list(outputs.shape) == [8, 4, 51997] testing_utils.model_test( @@ -357,12 +363,16 @@ def test_transformer_with_masked_language_modeling_check_eval_masked( # This transform only extracts targets, but without applying mask seq_target_as_input_no_mask = mm.SequenceTargetAsInput(schema=seq_schema, target=target) - metrics_all_positions1 = model.evaluate( - loader, batch_size=8, steps=1, return_dict=True, pre=seq_target_as_input_no_mask - ) - metrics_all_positions2 = model.evaluate( - loader, batch_size=8, steps=1, return_dict=True, pre=seq_target_as_input_no_mask - ) + + with Loader(sequence_testing_data, batch_size=8, shuffle=False) as loader: + metrics_all_positions1 = model.evaluate( + loader, batch_size=8, steps=1, return_dict=True, pre=seq_target_as_input_no_mask + ) + + with Loader(sequence_testing_data, batch_size=8, shuffle=False) as loader: + metrics_all_positions2 = model.evaluate( + loader, batch_size=8, steps=1, return_dict=True, pre=seq_target_as_input_no_mask + ) def _metrics_almost_equal(metrics1, metrics2): return np.all( diff --git a/tests/unit/tf/transforms/test_features.py b/tests/unit/tf/transforms/test_features.py index a01ef014c3..dd9a7e25eb 100644 --- a/tests/unit/tf/transforms/test_features.py +++ b/tests/unit/tf/transforms/test_features.py @@ -967,7 +967,7 @@ def test_to_target_loader(): dataset = Dataset(input_df, schema=schema) # target is passed as a string. 
- loader0 = mm.Loader(dataset, batch_size=10, transform=mm.ToTarget(schema, "c")) + loader0 = mm.Loader(dataset, batch_size=10).map(mm.ToTarget(schema, "c")) inputs0, targets0 = next(iter(loader0)) assert sorted(inputs0.keys()) == ["a", "b"] assert targets0.numpy().tolist() == [[3], [6]] @@ -976,7 +976,7 @@ def test_to_target_loader(): # target is passed as a ColumnSchema target_column_schema = ColumnSchema("c", tags=[Tags.CATEGORICAL]) assert Tags.TARGET not in target_column_schema.tags - loader1 = mm.Loader(dataset, batch_size=10, transform=mm.ToTarget(schema, target_column_schema)) + loader1 = mm.Loader(dataset, batch_size=10).map(mm.ToTarget(schema, target_column_schema)) inputs1, targets1 = next(iter(loader1)) assert sorted(inputs1.keys()) == ["a", "b"] assert targets1.numpy().tolist() == [[3], [6]] @@ -985,14 +985,14 @@ def test_to_target_loader(): # target is passed as a Schema target_schema = schema.select_by_name("c") assert not target_schema.select_by_tag(Tags.TARGET) - loader2 = mm.Loader(dataset, batch_size=10, transform=mm.ToTarget(schema, target_schema)) + loader2 = mm.Loader(dataset, batch_size=10).map(mm.ToTarget(schema, target_schema)) inputs2, targets2 = next(iter(loader2)) assert sorted(inputs2.keys()) == ["a", "b"] assert targets2.numpy().tolist() == [[3], [6]] assert loader2.output_schema.select_by_tag(Tags.TARGET).column_names == ["c"] # target is passed as a Tag - loader3 = mm.Loader(dataset, batch_size=10, transform=mm.ToTarget(schema, Tags.ITEM)) + loader3 = mm.Loader(dataset, batch_size=10).map(mm.ToTarget(schema, Tags.ITEM)) inputs3, targets3 = next(iter(loader3)) assert sorted(inputs3.keys()) == ["a", "b"] assert targets3.numpy().tolist() == [[3], [6]] diff --git a/tests/unit/tf/transforms/test_negative_sampling.py b/tests/unit/tf/transforms/test_negative_sampling.py index 2c0d362a44..2cc9eae6f0 100644 --- a/tests/unit/tf/transforms/test_negative_sampling.py +++ b/tests/unit/tf/transforms/test_negative_sampling.py @@ -67,7 +67,7 @@ def test_dataloader(self): ) input_df = input_df[sorted(input_df.columns)] dataset = Dataset(input_df, schema=schema) - loader = mm.Loader(dataset, batch_size=10, transform=sampler) + loader = mm.Loader(dataset, batch_size=10).map(sampler) outputs, targets = next(iter(loader)) output_dict = { @@ -203,7 +203,7 @@ def test_model_with_dataloader(self, music_streaming_data: Dataset, tf_random_se add_negatives = InBatchNegatives(schema, 5, seed=tf_random_seed) batch_size, n_per_positive = 10, 5 - loader = mm.Loader(dataset, batch_size=batch_size, transform=add_negatives) + loader = mm.Loader(dataset, batch_size=batch_size).map(add_negatives) features, targets = next(iter(loader)) diff --git a/tests/unit/tf/transforms/test_sequence.py b/tests/unit/tf/transforms/test_sequence.py index 8b453245f8..3e0867ba17 100644 --- a/tests/unit/tf/transforms/test_sequence.py +++ b/tests/unit/tf/transforms/test_sequence.py @@ -32,8 +32,8 @@ def test_seq_predict_next(sequence_testing_data: Dataset, use_loader: bool): batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False) if use_loader: - dataset_transformed = Loader( - sequence_testing_data, batch_size=8, shuffle=False, transform=predict_next + dataset_transformed = Loader(sequence_testing_data, batch_size=8, shuffle=False).map( + predict_next ) output = next(iter(dataset_transformed)) else: @@ -66,8 +66,8 @@ def test_seq_predict_last(sequence_testing_data: Dataset, use_loader: bool): batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False) if 
use_loader: - dataset_transformed = Loader( - sequence_testing_data, batch_size=8, shuffle=False, transform=predict_last + dataset_transformed = Loader(sequence_testing_data, batch_size=8, shuffle=False).map( + predict_last ) output = next(iter(dataset_transformed)) else: @@ -101,8 +101,8 @@ def test_seq_predict_random(sequence_testing_data: Dataset, use_loader: bool): batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False) if use_loader: - dataset_transformed = Loader( - sequence_testing_data, batch_size=8, shuffle=False, transform=predict_random + dataset_transformed = Loader(sequence_testing_data, batch_size=8, shuffle=False).map( + predict_random ) output = next(iter(dataset_transformed)) else: diff --git a/tests/unit/torch/test_dataset.py b/tests/unit/torch/test_dataset.py deleted file mode 100644 index 4086b53811..0000000000 --- a/tests/unit/torch/test_dataset.py +++ /dev/null @@ -1,124 +0,0 @@ -# -# Copyright (c) 2021, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import numpy as np -import pandas as pd -import pytest -import torch - -from merlin.core.dispatch import HAS_GPU, make_df -from merlin.io.dataset import Dataset - -import merlin.models.torch.dataset as torch_dataloader # noqa isort:skip - - -def test_shuffling(): - num_rows = 10000 - batch_size = 10000 - - df = pd.DataFrame({"a": np.asarray(range(num_rows)), "b": np.asarray([0] * num_rows)}) - - train_dataset = torch_dataloader.Dataset( - Dataset(df), conts=["a"], labels=["b"], batch_size=batch_size, shuffle=True - ) - - batch = next(iter(train_dataset)) - first_batch = batch[0]["a"].cpu() - in_order = torch.arange(0, batch_size) - - assert (first_batch != in_order).any() - assert (torch.sort(first_batch).values == in_order).all() - - -@pytest.mark.parametrize("batch_size", [10, 9, 8]) -@pytest.mark.parametrize("drop_last", [True, False]) -@pytest.mark.parametrize("num_rows", [100]) -def test_torch_drp_reset(tmpdir, batch_size, drop_last, num_rows): - df = make_df( - { - "cat1": [1] * num_rows, - "cat2": [2] * num_rows, - "cat3": [3] * num_rows, - "label": [0] * num_rows, - "cont3": [3.0] * num_rows, - "cont2": [2.0] * num_rows, - "cont1": [1.0] * num_rows, - } - ) - cat_names = ["cat3", "cat2", "cat1"] - cont_names = ["cont3", "cont2", "cont1"] - label_name = ["label"] - - data_itr = torch_dataloader.Dataset( - Dataset(df), - cats=cat_names, - conts=cont_names, - labels=label_name, - batch_size=batch_size, - drop_last=drop_last, - device="cpu", - ) - - all_len = len(data_itr) if drop_last else len(data_itr) - 1 - all_rows = 0 - df_cols = df.columns.to_list() - for idx, chunk in enumerate(data_itr): - all_rows += len(chunk[0]["cat1"]) - if idx < all_len: - for col in df_cols: - if col in chunk[0].keys(): - if HAS_GPU: - assert (list(chunk[0][col].cpu().numpy()) == df[col].values_host).all() - else: - assert (list(chunk[0][col].cpu().numpy()) == df[col].values).all() - - if drop_last and num_rows % batch_size > 0: - assert num_rows > all_rows - else: - assert num_rows == 
all_rows - - -@pytest.mark.parametrize("sparse_dense", [False, True]) -def test_sparse_tensors(sparse_dense): - # create small Dataset, add values to sparse_list - df = make_df( - { - "spar1": [[1, 2, 3, 4], [4, 2, 4, 4], [1, 3, 4, 3], [1, 1, 3, 3]], - "spar2": [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14], [15, 16]], - } - ) - spa_lst = ["spar1", "spar2"] - spa_mx = {"spar1": 5, "spar2": 6} - batch_size = 2 - data_itr = torch_dataloader.Dataset( - Dataset(df), - cats=spa_lst, - conts=[], - labels=[], - batch_size=batch_size, - sparse_names=spa_lst, - sparse_max=spa_mx, - sparse_as_dense=sparse_dense, - ) - for batch in data_itr: - feats, labs = batch - for col in spa_lst: - feature_tensor = feats[col] - if not sparse_dense: - assert list(feature_tensor.shape) == [batch_size, spa_mx[col]] - assert feature_tensor.is_sparse - else: - assert feature_tensor.shape[1] == spa_mx[col] - assert not feature_tensor.is_sparse diff --git a/tox.ini b/tox.ini index 47f45274be..ed8603a424 100644 --- a/tox.ini +++ b/tox.ini @@ -27,6 +27,7 @@ setenv = TF_GPU_ALLOCATOR=cuda_malloc_async commands = python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git + python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git python -m pytest --cov-report term --cov merlin -rxs {posargs:tests/unit} @@ -49,5 +50,6 @@ allowlist_externals = horovodrun commands = python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/core.git + python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh python -m pytest -m horovod -rxs tests/unit From d2286f6c5ac36fa2cc865f5e6dbcfa12b9db8574 Mon Sep 17 00:00:00 2001 From: mikemckiernan Date: Fri, 9 Dec 2022 15:52:04 -0500 Subject: [PATCH 6/8] Restore documentation build (#916) - Change Python 3.9.7 to 3.8. - Update the versions of the GH actions. - Update pre-commit config file to get flake8 from GitHub instead of GitLab. 
--- .github/workflows/datasets.yml | 6 +-- .github/workflows/docs-build.yaml | 47 +++++++++++++++++++++++ .github/workflows/docs-preview-pr.yaml | 2 +- .github/workflows/docs-sched-rebuild.yaml | 17 +++----- .github/workflows/implicit.yml | 6 +-- .github/workflows/lightfm.yml | 6 +-- .github/workflows/pre-commit.yml | 7 ++-- .github/workflows/pytorch.yml | 6 +-- .github/workflows/tensorflow.yml | 30 +++++---------- .github/workflows/xgboost.yml | 6 +-- .pre-commit-config.yaml | 2 +- tox.ini | 20 ++++++++++ 12 files changed, 104 insertions(+), 51 deletions(-) create mode 100644 .github/workflows/docs-build.yaml diff --git a/.github/workflows/datasets.yml b/.github/workflows/datasets.yml index 96716a4855..ae2c240fa7 100644 --- a/.github/workflows/datasets.yml +++ b/.github/workflows/datasets.yml @@ -11,13 +11,13 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: [3.9.7] + python-version: [3.8] os: [ubuntu-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' diff --git a/.github/workflows/docs-build.yaml b/.github/workflows/docs-build.yaml new file mode 100644 index 0000000000..911e74abe0 --- /dev/null +++ b/.github/workflows/docs-build.yaml @@ -0,0 +1,47 @@ +name: docs-build + +on: + pull_request: + branches: [ main ] + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.8 + uses: actions/setup-python@v4 + with: + python-version: '3.8' + - name: Install dependencies + run: | + python -m pip install --upgrade pip tox + - name: Build docs + run: | + tox -e docs + - name: Delete unnecessary files + run: | + find docs/build -name .doctrees -prune -exec rm -rf {} \; + find docs/build -name .buildinfo -exec rm -rf {} \; + - name: Upload HTML + uses: actions/upload-artifact@v3 + with: + name: html-build-artifact + path: docs/build/html + if-no-files-found: error + retention-days: 1 + - name: Store PR information + run: | + mkdir ./pr + echo ${{ github.event.number }} > ./pr/pr.txt + echo ${{ github.event.pull_request.merged }} > ./pr/merged.txt + echo ${{ github.event.action }} > ./pr/action.txt + - name: Upload PR information + uses: actions/upload-artifact@v3 + with: + name: pr + path: pr/ \ No newline at end of file diff --git a/.github/workflows/docs-preview-pr.yaml b/.github/workflows/docs-preview-pr.yaml index e30d4517b2..bf0d272e9d 100644 --- a/.github/workflows/docs-preview-pr.yaml +++ b/.github/workflows/docs-preview-pr.yaml @@ -2,7 +2,7 @@ name: docs-preview-pr on: workflow_run: - workflows: [tensorflow] + workflows: [docs-build] types: [completed] env: diff --git a/.github/workflows/docs-sched-rebuild.yaml b/.github/workflows/docs-sched-rebuild.yaml index c9589a8c82..d6973a1c82 100644 --- a/.github/workflows/docs-sched-rebuild.yaml +++ b/.github/workflows/docs-sched-rebuild.yaml @@ -14,27 +14,22 @@ jobs: - uses: actions/checkout@v3 with: fetch-depth: 0 - - name: Set up Python 3.9.7 + - name: Set up Python 3.8 uses: actions/setup-python@v4 with: - python-version: 3.9.7 + python-version: 3.8 - name: Install Ubuntu packages run: | sudo apt-get update -y sudo apt-get install -y protobuf-compiler - name: Install dependencies run: | - python -m pip install --upgrade pip - python -m pip install 
.[all] - - name: Build - run: | - python setup.py develop - - name: Report the versions to build - run: | - sphinx-multiversion --dump-metadata docs/source docs/build/html | jq "keys" + python -m pip install --upgrade pip tox - name: Building docs (multiversion) run: | - sphinx-multiversion docs/source docs/build/html + tox -vv -e docs-multi + - name: Delete unnecessary files + run: | find docs/build/ -name .doctrees -prune -exec rm -rf {} \; find docs/build/ -name .buildinfo -exec rm {} \; - name: Upload HTML diff --git a/.github/workflows/implicit.yml b/.github/workflows/implicit.yml index fc492b8e80..42f751fcd9 100644 --- a/.github/workflows/implicit.yml +++ b/.github/workflows/implicit.yml @@ -11,13 +11,13 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: [3.9.7] + python-version: [3.8] os: [ubuntu-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' diff --git a/.github/workflows/lightfm.yml b/.github/workflows/lightfm.yml index 64986ea872..715e64aaaa 100644 --- a/.github/workflows/lightfm.yml +++ b/.github/workflows/lightfm.yml @@ -11,13 +11,13 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: [3.9.7] + python-version: [3.8] os: [ubuntu-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 8fe055f78f..935900141c 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -9,12 +9,13 @@ jobs: pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 with: + python-version: '3.8' cache: 'pip' cache-dependency-path: '**/**.txt' - name: Install black Jupyter run : | pip install black[jupyter]==22.3.0 - - uses: pre-commit/action@v2.0.3 + - uses: pre-commit/action@v3.0.0 diff --git a/.github/workflows/pytorch.yml b/.github/workflows/pytorch.yml index f3f51866a5..0b341f2e9b 100644 --- a/.github/workflows/pytorch.yml +++ b/.github/workflows/pytorch.yml @@ -11,13 +11,13 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: [3.9.7] + python-version: [3.8] os: [ubuntu-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' diff --git a/.github/workflows/tensorflow.yml b/.github/workflows/tensorflow.yml index 95beaad6dc..40fa7f5b29 100644 --- a/.github/workflows/tensorflow.yml +++ b/.github/workflows/tensorflow.yml @@ -14,14 +14,14 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: [3.9.7] + python-version: [3.8] os: [ubuntu-latest] tensorflow-version: ["~=2.8.0", "~=2.9.0", "~=2.10.0"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -31,7 +31,7 @@ jobs: - name: Install Ubuntu packages run: | sudo apt-get 
update -y - sudo apt-get install -y protobuf-compiler pandoc + sudo apt-get install -y protobuf-compiler - name: Install Merlin dependencies run: | ref_type=${{ github.ref_type }} @@ -63,30 +63,19 @@ jobs: with: name: dist path: dist - - name: Store PR information - run: | - mkdir ./pr - echo ${{ github.event.number }} > ./pr/pr.txt - echo ${{ github.event.pull_request.merged }} > ./pr/merged.txt - echo ${{ github.event.action }} > ./pr/action.txt - - name: Upload PR information - uses: actions/upload-artifact@v2 - with: - name: pr - path: pr/ examples: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: [ 3.9.7 ] + python-version: [ 3.8 ] os: [ ubuntu-latest ] tensorflow-version: ["~=2.8.0", "~=2.9.0"] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' @@ -121,13 +110,14 @@ jobs: - name: Run unittests run: | make tests-tf-examples + release: name: Release runs-on: ubuntu-latest if: "startsWith(github.ref, 'refs/tags/')" needs: [tests] steps: - - uses: actions/download-artifact@v2 + - uses: actions/download-artifact@v3 with: name: dist - name: Create GitHub Release @@ -135,7 +125,7 @@ jobs: env: GHR_PATH: . GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - uses: actions/setup-python@v2 + - uses: actions/setup-python@v4 with: python-version: 3.9 - name: Push to PyPi diff --git a/.github/workflows/xgboost.yml b/.github/workflows/xgboost.yml index 30711eb944..8a0b2b2003 100644 --- a/.github/workflows/xgboost.yml +++ b/.github/workflows/xgboost.yml @@ -11,13 +11,13 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - python-version: [3.9.7] + python-version: [3.8] os: [ubuntu-latest] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6b1083650d..6cf971d019 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: rev: 22.3.0 hooks: - id: black - - repo: https://gitlab.com/pycqa/flake8 + - repo: https://github.com/pycqa/flake8 rev: 3.8.4 hooks: - id: flake8 diff --git a/tox.ini b/tox.ini index ed8603a424..5948645987 100644 --- a/tox.ini +++ b/tox.ini @@ -53,3 +53,23 @@ commands = python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/dataloader.git python -m pip install --upgrade git+https://github.com/NVIDIA-Merlin/nvtabular.git horovodrun -np 2 sh examples/usecases/multi-gpu/hvd_wrapper.sh python -m pytest -m horovod -rxs tests/unit + +[testenv:docs] +; Runs in: Github Actions +; Generates documentation with sphinx. There are other steps in the Github Actions workflow +; to publish the documentation on release. +changedir = {toxinidir} +deps = -rrequirements/docs.txt + {[testenv:py38-gpu]deps} +commands = + python -m sphinx.cmd.build -E -P -b html docs/source docs/build/html + +[testenv:docs-multi] +; Run the multi-version build that is shown on GitHub Pages. 
+changedir = {toxinidir}
+deps = -rrequirements/docs.txt
+    {[testenv:py38-gpu]deps}
+commands =
+    sphinx-multiversion --dump-metadata docs/source docs/build/html | jq "keys"
+    sphinx-multiversion docs/source docs/build/html
+

From 9b38f04a5e76d3be759cb2f662a3c58c8e212452 Mon Sep 17 00:00:00 2001
From: Oliver Holworthy
Date: Tue, 13 Dec 2022 14:38:09 +0000
Subject: [PATCH 7/8] Support `tuple` return type from model `pre` and update
 test to use this (#890)

* Support `tuple` return type from `pre` arg to `evaluate`, `predict`

* Update CLM transformer test to use `pre` instead of Loader `transform`

* Update youtube dnn tests to use transform as model fit pre

* Add `pre` to ModelBlock fit/evaluate

* Revert "Add `pre` to ModelBlock fit/evaluate"

This reverts commit 1eef7b89192b9b78d7307f36be8d6c7b5677df52.

* Raise exception if ragged/sparse tensors are passed at training time.

* Update model_test helper to avoid passing ragged tensors to `fit`

* Handle x and y in model_test

* Change process_lists param to False by default

* Convert to tuple in test loader

* Move order of ragged tensor assertion to before train_pre call

* Expand dims in test_classification

* Pass transform as `pre` in the in-batch negatives test

* Update continuous and retrieval tests

* Remove test of sequence predict functions with loader

* Update error message about ragged tensors for clarity

* Add explanation about why the input types are restricted

* Rename dataset to dataloader in model_test

Co-authored-by: rnyak
---
 merlin/models/tf/loader.py                    |  2 +-
 merlin/models/tf/models/base.py               | 26 ++++++++++++
 merlin/models/tf/utils/testing_utils.py       |  9 ++--
 tests/unit/tf/inputs/test_continuous.py       |  2 +-
 tests/unit/tf/models/test_retrieval.py        | 41 +++++++++++--------
 tests/unit/tf/outputs/test_classification.py  |  6 +++
 tests/unit/tf/transformers/test_block.py      | 16 ++++----
 .../tf/transforms/test_negative_sampling.py   |  7 ++--
 tests/unit/tf/transforms/test_sequence.py     | 34 +++------------
 9 files changed, 82 insertions(+), 61 deletions(-)

diff --git a/merlin/models/tf/loader.py b/merlin/models/tf/loader.py
index 190cc3bcd2..87c1daed87 100644
--- a/merlin/models/tf/loader.py
+++ b/merlin/models/tf/loader.py
@@ -363,7 +363,7 @@ def sample_batch(
     include_targets: bool = True,
     to_ragged: bool = False,
     to_dense: bool = False,
-    process_lists=True,
+    process_lists=False,
 ):
     """Util function to generate a batch of input tensors from a merlin.io.Dataset instance

diff --git a/merlin/models/tf/models/base.py b/merlin/models/tf/models/base.py
index 0c16e93ee7..fab9648829 100644
--- a/merlin/models/tf/models/base.py
+++ b/merlin/models/tf/models/base.py
@@ -736,6 +736,22 @@ def train_step(self, data):
         with tf.GradientTape() as tape:
             x, y, sample_weight = unpack_x_y_sample_weight(data)

+            # Ensure that we don't have any ragged or sparse tensors passed at training time.
+            if isinstance(x, dict):
+                for k in x:
+                    if isinstance(x[k], (tf.RaggedTensor, tf.SparseTensor)):
+                        raise ValueError(
+                            "Training with RaggedTensor or SparseTensor input features is "
+                            "not supported. Please update your dataloader to pass a tuple "
+                            "of dense tensors instead (corresponding to the values and "
+                            "row lengths of the ragged input feature). This will ensure that "
+                            "the model can be saved with the correct input signature "
+                            "and served correctly. "
+                            "This is because when ragged or sparse tensors are fed as inputs, "
+                            "the input feature names are currently lost in the saved model "
+                            "input signature."
+ ) + if getattr(self, "train_pre", None): out = call_layer(self.train_pre, x, targets=y, features=x, training=True) if isinstance(out, Prediction): @@ -775,6 +791,11 @@ def test_step(self, data): out = call_layer(self.test_pre, x, targets=y, features=x, testing=True) if isinstance(out, Prediction): x, y = out.outputs, out.targets + elif isinstance(out, tuple): + assert ( + len(out) == 2 + ), "output of `pre` must be a 2-tuple of x, y or `Prediction` tuple" + x, y = out else: x = out @@ -804,6 +825,11 @@ def predict_step(self, data): out = call_layer(self.predict_pre, x, features=x, training=False) if isinstance(out, Prediction): x = out.outputs + elif isinstance(out, tuple): + assert ( + len(out) == 2 + ), "output of `pre` must be a 2-tuple of x, y or `Prediction` tuple" + x, y = out else: x = out diff --git a/merlin/models/tf/utils/testing_utils.py b/merlin/models/tf/utils/testing_utils.py index 00930b5b08..cabc7f8acc 100644 --- a/merlin/models/tf/utils/testing_utils.py +++ b/merlin/models/tf/utils/testing_utils.py @@ -105,13 +105,16 @@ def model_test( assert isinstance(loaded_model, type(model)) + x, y = sample_batch(dataloader, batch_size=50, to_ragged=False, process_lists=False) + batch = [(x, y)] + np.testing.assert_array_almost_equal( - model.predict(batch[0]), - loaded_model.predict(batch[0]), + model.predict(iter(batch)), + loaded_model.predict(iter(batch)), ) loaded_model.compile(run_eagerly=run_eagerly, optimizer=optimizer, **kwargs) - loaded_model.train_step(batch) + loaded_model.fit(iter(batch)) return loaded_model, losses diff --git a/tests/unit/tf/inputs/test_continuous.py b/tests/unit/tf/inputs/test_continuous.py index 63204d2d75..4e0a4f39f5 100644 --- a/tests/unit/tf/inputs/test_continuous.py +++ b/tests/unit/tf/inputs/test_continuous.py @@ -80,7 +80,7 @@ def test_continuous_features_ragged(sequence_testing_data: Dataset): inputs = ml.ContinuousFeatures.from_schema( schema, post=ml.BroadcastToSequence(context_schema, seq_schema), aggregation="concat" ) - features, _ = ml.sample_batch(sequence_testing_data, batch_size=100) + features, _ = ml.sample_batch(sequence_testing_data, batch_size=100, process_lists=True) outputs = inputs(features) assert outputs.to_tensor().shape == (100, 4, 6) diff --git a/tests/unit/tf/models/test_retrieval.py b/tests/unit/tf/models/test_retrieval.py index 478ffb5488..564e5de2b7 100644 --- a/tests/unit/tf/models/test_retrieval.py +++ b/tests/unit/tf/models/test_retrieval.py @@ -819,19 +819,20 @@ def test_youtube_dnn_retrieval(sequence_testing_data: Dataset): as_ragged = mm.ListToRagged() - def last_interaction_as_target(inputs, targets): - inputs = as_ragged(inputs) - items = inputs["item_id_seq"] - _items = items[:, :-1] - targets = items[:, -1:].flat_values + class LastInteractionAsTarget(tf.keras.layers.Layer): + def call(self, inputs, **kwargs): + inputs = as_ragged(inputs) + items = inputs["item_id_seq"] + _items = items[:, :-1] + targets = items[:, -1:].flat_values - inputs["item_id_seq"] = _items + inputs["item_id_seq"] = _items - return inputs, targets + return inputs, targets - dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(last_interaction_as_target) + dataloader = mm.Loader(sequence_testing_data, batch_size=50) - losses = model.fit(dataloader, epochs=1) + losses = model.fit(dataloader, epochs=1, pre=LastInteractionAsTarget()) assert losses is not None @@ -856,10 +857,14 @@ def test_youtube_dnn_retrieval_v2(sequence_testing_data: Dataset, run_eagerly, t schema=sequence_testing_data.schema, top_block=mm.MLPBlock([32]), 
num_sampled=1000 ) - dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(target_augmentation) + dataloader = mm.Loader(sequence_testing_data, batch_size=50) _, losses = testing_utils.model_test( - model, dataloader, reload_model=True, run_eagerly=run_eagerly + model, + dataloader, + reload_model=True, + run_eagerly=run_eagerly, + fit_kwargs=dict(pre=target_augmentation), ) assert losses is not None @@ -936,8 +941,10 @@ def test_youtube_dnn_v2_export_embeddings(sequence_testing_data: Dataset): schema=sequence_testing_data.schema, top_block=mm.MLPBlock([32]), num_sampled=1000 ) - dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(predict_next) - model, _ = testing_utils.model_test(model, dataloader, reload_model=False) + dataloader = mm.Loader(sequence_testing_data, batch_size=50) + model, _ = testing_utils.model_test( + model, dataloader, reload_model=False, fit_kwargs=dict(pre=predict_next) + ) candidates = model.candidate_embeddings().compute() assert list(candidates.columns) == [str(i) for i in range(32)] @@ -969,13 +976,15 @@ def test_youtube_dnn_topk_evaluation(sequence_testing_data: Dataset, run_eagerly schema=sequence_testing_data.schema, top_block=mm.MLPBlock([32]), num_sampled=1000 ) - dataloader = mm.Loader(sequence_testing_data, batch_size=50).map(predict_next) + dataloader = mm.Loader(sequence_testing_data, batch_size=50) - model, _ = testing_utils.model_test(model, dataloader, reload_model=False) + model, _ = testing_utils.model_test( + model, dataloader, reload_model=False, fit_kwargs=dict(pre=predict_next) + ) # Top-K evaluation topk_model = model.to_top_k_encoder(k=20) topk_model.compile(run_eagerly=run_eagerly) - metrics = topk_model.evaluate(dataloader, return_dict=True) + metrics = topk_model.evaluate(dataloader, return_dict=True, pre=predict_next) assert all([metric >= 0 for metric in metrics.values()]) diff --git a/tests/unit/tf/outputs/test_classification.py b/tests/unit/tf/outputs/test_classification.py index 60a8a6fe18..fbb87be1ce 100644 --- a/tests/unit/tf/outputs/test_classification.py +++ b/tests/unit/tf/outputs/test_classification.py @@ -99,6 +99,12 @@ def _last_interaction_as_target(inputs, targets): _items = items[:, :-1] targets = tf.one_hot(items[:, -1:].flat_values, 51997) inputs["item_id_seq"] = _items + for k in inputs: + if isinstance(inputs[k], tf.RaggedTensor): + inputs[k] = ( + tf.expand_dims(inputs[k].values, 1), + tf.expand_dims(inputs[k].row_lengths(), 1), + ) return inputs, targets schema = sequence_testing_data.schema.select_by_tag(Tags.CATEGORICAL) diff --git a/tests/unit/tf/transformers/test_block.py b/tests/unit/tf/transformers/test_block.py index 45a1660f12..67ee4dfff9 100644 --- a/tests/unit/tf/transformers/test_block.py +++ b/tests/unit/tf/transformers/test_block.py @@ -239,11 +239,7 @@ def test_transformer_with_causal_language_modeling(sequence_testing_data: Datase target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0] predict_next = mm.SequencePredictNext(schema=seq_schema, target=target) - loader = Loader( - sequence_testing_data, - batch_size=8, - shuffle=False, - ).map(predict_next) + loader = Loader(sequence_testing_data, batch_size=8, shuffle=False) model = mm.Model( mm.InputBlockV2( @@ -260,14 +256,16 @@ def test_transformer_with_causal_language_modeling(sequence_testing_data: Datase batch = next(iter(loader))[0] outputs = model(batch) - assert list(outputs.shape) == [8, 3, 51997] - testing_utils.model_test(model, loader, run_eagerly=run_eagerly, reload_model=True) + assert 
list(outputs.shape) == [8, 4, 51997] + testing_utils.model_test( + model, loader, run_eagerly=run_eagerly, reload_model=True, fit_kwargs={"pre": predict_next} + ) - metrics = model.evaluate(loader, batch_size=8, steps=1, return_dict=True) + metrics = model.evaluate(loader, batch_size=8, steps=1, return_dict=True, pre=predict_next) assert len(metrics) > 0 predictions = model.predict(loader, batch_size=8, steps=1) - assert predictions.shape == (8, 3, 51997) + assert predictions.shape == (8, 4, 51997) @pytest.mark.parametrize("run_eagerly", [True, False]) diff --git a/tests/unit/tf/transforms/test_negative_sampling.py b/tests/unit/tf/transforms/test_negative_sampling.py index 2cc9eae6f0..7cfc0e3a74 100644 --- a/tests/unit/tf/transforms/test_negative_sampling.py +++ b/tests/unit/tf/transforms/test_negative_sampling.py @@ -203,9 +203,10 @@ def test_model_with_dataloader(self, music_streaming_data: Dataset, tf_random_se add_negatives = InBatchNegatives(schema, 5, seed=tf_random_seed) batch_size, n_per_positive = 10, 5 - loader = mm.Loader(dataset, batch_size=batch_size).map(add_negatives) + loader = mm.Loader(dataset, batch_size=batch_size) - features, targets = next(iter(loader)) + features, targets = next(loader) + features, targets = add_negatives(features, targets) expected_batch_size = batch_size + batch_size * n_per_positive @@ -226,4 +227,4 @@ def test_model_with_dataloader(self, music_streaming_data: Dataset, tf_random_se assert model(features).shape[0] > batch_size assert model(features).shape[0] <= expected_batch_size - testing_utils.model_test(model, loader) + testing_utils.model_test(model, loader, fit_kwargs=dict(pre=add_negatives)) diff --git a/tests/unit/tf/transforms/test_sequence.py b/tests/unit/tf/transforms/test_sequence.py index 3e0867ba17..05f8011751 100644 --- a/tests/unit/tf/transforms/test_sequence.py +++ b/tests/unit/tf/transforms/test_sequence.py @@ -19,25 +19,17 @@ import merlin.models.tf as mm from merlin.io import Dataset -from merlin.models.tf.loader import Loader from merlin.models.tf.utils.testing_utils import assert_output_shape from merlin.schema import Tags -@pytest.mark.parametrize("use_loader", [False, True]) -def test_seq_predict_next(sequence_testing_data: Dataset, use_loader: bool): +def test_seq_predict_next(sequence_testing_data: Dataset): seq_schema = sequence_testing_data.schema.select_by_tag(Tags.SEQUENCE) target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0] predict_next = mm.SequencePredictNext(schema=seq_schema, target=target, pre=mm.ListToRagged()) batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False) - if use_loader: - dataset_transformed = Loader(sequence_testing_data, batch_size=8, shuffle=False).map( - predict_next - ) - output = next(iter(dataset_transformed)) - else: - output = predict_next(batch) + output = predict_next(batch) output_x, output_y = output output_y = output_y[target] @@ -58,20 +50,13 @@ def test_seq_predict_next(sequence_testing_data: Dataset, use_loader: bool): ) -@pytest.mark.parametrize("use_loader", [False, True]) -def test_seq_predict_last(sequence_testing_data: Dataset, use_loader: bool): +def test_seq_predict_last(sequence_testing_data: Dataset): seq_schema = sequence_testing_data.schema.select_by_tag(Tags.SEQUENCE) target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0] predict_last = mm.SequencePredictLast(schema=seq_schema, target=target) batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False) - if 
use_loader: - dataset_transformed = Loader(sequence_testing_data, batch_size=8, shuffle=False).map( - predict_last - ) - output = next(iter(dataset_transformed)) - else: - output = predict_last(batch) + output = predict_last(batch) output_x, output_y = output output_y = output_y[target] @@ -93,20 +78,13 @@ def test_seq_predict_last(sequence_testing_data: Dataset, use_loader: bool): ) -@pytest.mark.parametrize("use_loader", [False, True]) -def test_seq_predict_random(sequence_testing_data: Dataset, use_loader: bool): +def test_seq_predict_random(sequence_testing_data: Dataset): seq_schema = sequence_testing_data.schema.select_by_tag(Tags.SEQUENCE) target = sequence_testing_data.schema.select_by_tag(Tags.ITEM_ID).column_names[0] predict_random = mm.SequencePredictRandom(schema=seq_schema, target=target) batch, _ = mm.sample_batch(sequence_testing_data, batch_size=8, process_lists=False) - if use_loader: - dataset_transformed = Loader(sequence_testing_data, batch_size=8, shuffle=False).map( - predict_random - ) - output = next(iter(dataset_transformed)) - else: - output = predict_random(batch) + output = predict_random(batch) output_x, output_y = output output_y = output_y[target] From 2983433f20302ceadc23becdd562a20c30b12d6d Mon Sep 17 00:00:00 2001 From: sararb Date: Tue, 13 Dec 2022 17:27:17 +0000 Subject: [PATCH 8/8] add assertion check to TransformerInferenceHiddenState --- merlin/models/tf/transformers/transforms.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/merlin/models/tf/transformers/transforms.py b/merlin/models/tf/transformers/transforms.py index 51260e1477..ace3fce40b 100644 --- a/merlin/models/tf/transformers/transforms.py +++ b/merlin/models/tf/transformers/transforms.py @@ -81,11 +81,19 @@ def call( If inference, returns a 2-D tensor with the hidden states of the target position """ + batch_size = tf.shape(inputs)[0] if not training and not testing: if getattr(inputs, "_keras_mask", None) is not None: inputs = tf.reshape( tf.boolean_mask(inputs, inputs._keras_mask), (-1, inputs.shape[-1]) ) + tf.debugging.assert_equal( + tf.shape(inputs)[0], + batch_size, + f"The resulting tensor has {tf.shape(inputs)[0]} rows, which does not match" + f" the inputs batch-size {batch_size}. During inference only one position " + "candidate (the last one) should be masked per example", + ) return inputs
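To spell out the invariant behind the new `assert_equal` above: at inference time `ReplaceMaskedEmbeddings` appends exactly one target position per example, so boolean-masking the transformer's hidden states by `_keras_mask` must select exactly one row per example, `batch_size` rows in total. A minimal standalone sketch of that selection in plain TensorFlow (shapes and names here are illustrative, not the Merlin API):

    import tensorflow as tf

    batch_size, seq_len, d_model = 4, 6, 8
    hidden = tf.random.uniform((batch_size, seq_len, d_model))

    # Inference-time mask: exactly one True per example, at the last (target) position.
    mask = tf.concat(
        [tf.zeros((batch_size, seq_len - 1), tf.bool), tf.ones((batch_size, 1), tf.bool)],
        axis=1,
    )

    # What the block does: keep only the hidden states at masked positions.
    selected = tf.reshape(tf.boolean_mask(hidden, mask), (-1, hidden.shape[-1]))

    # The invariant the patch asserts: one selected row per example.
    tf.debugging.assert_equal(tf.shape(selected)[0], batch_size)
    print(selected.shape)  # (4, 8)

If more than one position per example were left masked (for example, random masking carried over from training), `selected` would have more than `batch_size` rows and the assertion would fire, which is exactly the misconfiguration its message describes.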