diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py index aec5f9a2dc..c9e690e319 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py @@ -97,7 +97,7 @@ def on_data(control_message: ControlMessage): run_validation = True logger.debug("Training AE model for user: '%s'...", user_id) - model.fit(train_df, epochs=epochs, val_data=validation_df, run_validation=run_validation) + model.fit(train_df, epochs=epochs, validation_data=validation_df, run_validation=run_validation) logger.debug("Training AE model for user: '%s'... Complete.", user_id) dfp_mm = DFPMessageMeta(cudf.from_pandas(final_df), user_id=user_id) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_training.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_training.py index 78486fc29d..e9617b3cd6 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_training.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_training.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. """Training stage for the DFP pipeline.""" - +import base64 import logging +import pickle import typing import mrc @@ -21,11 +22,13 @@ from sklearn.model_selection import train_test_split from morpheus.config import Config +from morpheus.messages import ControlMessage from morpheus.messages.multi_ae_message import MultiAEMessage from morpheus.models.dfencoder import AutoEncoder from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair +from ..messages.multi_dfp_message import DFPMessageMeta from ..messages.multi_dfp_message import MultiDFPMessage logger = logging.getLogger(f"morpheus.{__name__}") @@ -87,10 +90,43 @@ def supports_cpp_node(self): def accepted_types(self) -> typing.Tuple: """Indicate which input message types this stage accepts.""" - return (MultiDFPMessage, ) + return ( + ControlMessage, + MultiDFPMessage, + ) + + def _dfp_multimessage_from_control_message(self, + control_message: ControlMessage) -> typing.Union[MultiDFPMessage, None]: + """Create a MultiDFPMessage from a ControlMessage.""" + ctrl_msg_user_id = control_message.get_metadata("user_id") + message_meta = control_message.payload() + + if (ctrl_msg_user_id is None or message_meta is None): + return None + + with message_meta.mutable_dataframe() as dfm: + msg_meta_df = dfm.to_pandas() + + msg_meta = DFPMessageMeta(msg_meta_df, user_id=str(ctrl_msg_user_id)) + message = MultiDFPMessage(meta=msg_meta, mess_offset=0, mess_count=len(msg_meta_df)) + + return message + + @typing.overload + def on_data(self, message: ControlMessage) -> ControlMessage: + ... + @typing.overload def on_data(self, message: MultiDFPMessage) -> MultiAEMessage: + ... 
+ + def on_data(self, message): """Train the model and attach it to the output message.""" + received_control_message = False + if (isinstance(message, ControlMessage)): + message = self._dfp_multimessage_from_control_message(message) + received_control_message = True + if (message is None or message.mess_count == 0): return None @@ -111,13 +147,21 @@ def on_data(self, message: MultiDFPMessage) -> MultiAEMessage: run_validation = True logger.debug("Training AE model for user: '%s'...", user_id) - model.fit(train_df, epochs=self._epochs, val_data=validation_df, run_validation=run_validation) + model.fit(train_df, epochs=self._epochs, validation_data=validation_df, run_validation=run_validation) logger.debug("Training AE model for user: '%s'... Complete.", user_id) - output_message = MultiAEMessage(meta=message.meta, - mess_offset=message.mess_offset, - mess_count=message.mess_count, - model=model) + if (received_control_message): + output_message = ControlMessage(message.meta) + output_message.set_metadata("user_id", user_id) + + pickled_model_bytes = pickle.dumps(model) + pickled_model_base64_str = base64.b64encode(pickled_model_bytes).decode('utf-8') + output_message.set_metadata("model", pickled_model_base64_str) + else: + output_message = MultiAEMessage(meta=message.meta, + mess_offset=message.mess_offset, + mess_count=message.mess_count, + model=model) return output_message @@ -125,4 +169,8 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea stream = builder.make_node(self.unique_name, ops.map(self.on_data), ops.filter(lambda x: x is not None)) builder.make_edge(input_stream[0], stream) - return stream, MultiAEMessage + return_type = input_stream[1] + if (return_type == MultiDFPMessage): + return_type = MultiAEMessage + + return stream, return_type diff --git a/models/training-tuning-scripts/dfp-models/hammah-20211017-script.py b/models/training-tuning-scripts/dfp-models/hammah-20211017-script.py index 0ea08b1b7c..0d7e2cb0eb 100644 --- a/models/training-tuning-scripts/dfp-models/hammah-20211017-script.py +++ b/models/training-tuning-scripts/dfp-models/hammah-20211017-script.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+# pylint: disable=invalid-name """ Example Usage: python hammah-20211017-script.py \ @@ -24,13 +25,14 @@ import dill import pandas as pd import torch + from morpheus.models.dfencoder import AutoEncoder from morpheus.utils.seed import manual_seed def main(): - X_train = pd.read_csv(args.trainingdata) - X_val = pd.read_csv(args.valdata) + x_train = pd.read_csv(args.trainingdata) + x_val = pd.read_csv(args.valdata) features = [ 'eventSource', @@ -67,31 +69,31 @@ def main(): 'responseElementsreservationId', 'requestParametersgroupName' ] # NO userIdentitysessionContextsessionIssuerarn,userIdentityuserName - for i in list(X_train): + for i in list(x_train): if i not in features: - X_train = X_train.drop(i, axis=1) - for i in list(X_val): + x_train = x_train.drop(i, axis=1) + for i in list(x_val): if i not in features: - X_val = X_val.drop(i, axis=1) + x_val = x_val.drop(i, axis=1) - X_train = X_train.dropna(axis=1, how='all') - X_val = X_val.dropna(axis=1, how='all') + x_train = x_train.dropna(axis=1, how='all') + x_val = x_val.dropna(axis=1, how='all') - for i in list(X_val): - if i not in list(X_train): - X_val = X_val.drop([i], axis=1) + for i in list(x_val): + if i not in list(x_train): + x_val = x_val.drop([i], axis=1) - for i in list(X_train): - if i not in list(X_val): - X_train = X_train.drop([i], axis=1) + for i in list(x_train): + if i not in list(x_val): + x_train = x_train.drop([i], axis=1) manual_seed(42) model = AutoEncoder( encoder_layers=[512, 500], # layers of the encoding part decoder_layers=[512], # layers of the decoding part activation='relu', # activation function - swap_p=0.2, # noise parameter - lr=0.01, # learning rate - lr_decay=.99, # learning decay + swap_probability=0.2, # noise parameter + learning_rate=0.01, # learning rate + learning_rate_decay=.99, # learning decay batch_size=512, logger='ipynb', verbose=False, @@ -100,7 +102,7 @@ def main(): min_cats=1 # cut off for minority categories ) - model.fit(X_train, epochs=25, val=X_val) + model.fit(x_train, epochs=25, validation_data=x_val) torch.save(model.state_dict(), args.trainingdata[:-4] + ".pkl") with open(args.trainingdata[:-4] + 'dill' + '.pkl', 'wb') as f: diff --git a/morpheus/controllers/mlflow_model_writer_controller.py b/morpheus/controllers/mlflow_model_writer_controller.py index dca198ddcb..d373a01dd1 100644 --- a/morpheus/controllers/mlflow_model_writer_controller.py +++ b/morpheus/controllers/mlflow_model_writer_controller.py @@ -219,8 +219,8 @@ def on_data(self, message: MultiAEMessage): # Log all params in one dict to avoid round trips mlflow.log_params({ "Algorithm": "Denosing Autoencoder", - "Epochs": model.lr_decay.state_dict().get("last_epoch", "unknown"), - "Learning rate": model.lr, + "Epochs": model.learning_rate_decay.state_dict().get("last_epoch", "unknown"), + "Learning rate": model.learning_rate, "Batch size": model.batch_size, "Start Epoch": message.get_meta(self._timestamp_column_name).min(), "End Epoch": message.get_meta(self._timestamp_column_name).max(), diff --git a/morpheus/models/dfencoder/__init__.py b/morpheus/models/dfencoder/__init__.py index 4b0768a71a..c076c34b6d 100644 --- a/morpheus/models/dfencoder/__init__.py +++ b/morpheus/models/dfencoder/__init__.py @@ -57,9 +57,9 @@ from .ae_module import CompleteLayer from .autoencoder import AutoEncoder from .dataframe import EncoderDataFrame -from .dataloader import DatasetFromDataframe -from .dataloader import DatasetFromPath +from .dataloader import DataframeDataset from .dataloader import DFEncoderDataLoader +from .dataloader 
import FileSystemDataset from .distributed_ae import DistributedAutoEncoder from .logging import BasicLogger from .logging import IpynbLogger @@ -74,8 +74,8 @@ "CompleteLayer", "AutoEncoder", "EncoderDataFrame", - "DatasetFromDataframe", - "DatasetFromPath", + "DataframeDataset", + "FileSystemDataset", "DFEncoderDataLoader", "DistributedAutoEncoder", "BasicLogger", diff --git a/morpheus/models/dfencoder/autoencoder.py b/morpheus/models/dfencoder/autoencoder.py index f697b204ff..820362cf8d 100644 --- a/morpheus/models/dfencoder/autoencoder.py +++ b/morpheus/models/dfencoder/autoencoder.py @@ -50,6 +50,7 @@ import gc import logging +import typing from collections import OrderedDict from collections import defaultdict @@ -57,10 +58,13 @@ import pandas as pd import torch import tqdm +from torch.utils.data import DataLoader from .ae_module import AEModule from .dataframe import EncoderDataFrame -from .dataloader import DatasetFromDataframe +from .dataloader import DataframeDataset +from .dataloader import DFEncoderDataLoader +from .dataloader import FileSystemDataset from .distributed_ae import DistributedAutoEncoder from .logging import BasicLogger from .logging import IpynbLogger @@ -115,8 +119,8 @@ def __init__( decoder_activations=None, activation='relu', min_cats=10, - swap_p=.15, - lr=0.01, + swap_probability=.15, + learning_rate=0.01, batch_size=256, eval_batch_size=1024, optimizer='adam', @@ -125,7 +129,7 @@ def __init__( betas=(0.9, 0.999), dampening=0, weight_decay=0, - lr_decay=None, + learning_rate_decay=None, nesterov=False, verbose=False, device=None, @@ -135,7 +139,6 @@ def __init__( project_embeddings=True, run=None, progress_bar=True, - n_megabatches=1, scaler='standard', patience=5, preset_cats=None, @@ -172,14 +175,14 @@ def __init__( ) self.optimizer = optimizer self.optim = None - self.lr = lr - self.lr_decay = lr_decay + self.learning_rate = learning_rate + self.learning_rate_decay = learning_rate_decay self.min_cats = min_cats self.preset_cats = preset_cats self.preset_numerical_scaler_params = preset_numerical_scaler_params - self.swap_p = swap_p + self.swap_probability = swap_probability self.batch_size = batch_size self.eval_batch_size = eval_batch_size @@ -220,8 +223,6 @@ def __init__( self.loss_scaler_str = loss_scaler self.loss_scaler = self.get_scaler(loss_scaler) - self.n_megabatches = n_megabatches - def get_scaler(self, name): scalers = { 'standard': StandardScaler, @@ -487,19 +488,19 @@ def _build_model(self, df=None, rank=None): self.model = DistributedAutoEncoder(self.model, device_ids=[rank], output_device=rank) self._build_optimizer() - if self.lr_decay is not None: - self.lr_decay = torch.optim.lr_scheduler.ExponentialLR(self.optim, self.lr_decay) + if self.learning_rate_decay is not None: + self.learning_rate_decay = torch.optim.lr_scheduler.ExponentialLR(self.optim, self.learning_rate_decay) self._build_logger() LOG.debug('done!') def _build_optimizer(self): - lr = self.lr + lr = self.learning_rate params = self.model.parameters() if self.optimizer == 'adam': optim = torch.optim.Adam(params, - lr=self.lr, + lr=self.learning_rate, amsgrad=self.amsgrad, weight_decay=self.weight_decay, betas=self.betas) @@ -553,7 +554,7 @@ def build_input_tensor(self, df): x = torch.cat(num + bin + embeddings, dim=1) return x - def preprocess_train_data(self, df, shuffle_rows_in_batch=True): + def preprocess_training_data(self, df, shuffle_rows_in_batch=True): """ Wrapper function round `self.preprocess_data` feeding in the args suitable for a training set.""" return 
self.preprocess_data( df, @@ -598,11 +599,10 @@ def preprocess_data( Dict[str, Union[int, torch.Tensor]] A dict containing the preprocessed input data and targets by feature type. """ - df = self.prepare_df(df) if shuffle_rows_in_batch: df = df.sample(frac=1.0) - df = EncoderDataFrame(df) - swapped_df = df.swap(likelihood=self.swap_p) + df = self.prepare_df(df) + swapped_df = df.swap(likelihood=self.swap_probability) swapped_input_tensor = self.build_input_tensor(swapped_df) num_target, bin_target, codes = self.compute_targets(df) @@ -727,7 +727,8 @@ def compute_baseline_performance(self, in_, out_): dim = len(feature['cats']) + 1 pred = _ohe(cd, dim, device=self.device) * 5 codes_pred.append(pred) - mse_loss, bce_loss, cce_loss, net_loss = self.compute_loss(num_pred, bin_pred, codes_pred, out_, should_log=False) + mse_loss, bce_loss, cce_loss, net_loss = self.compute_loss(num_pred, bin_pred, codes_pred, out_, + should_log=False) if isinstance(self.logger, BasicLogger): self.logger.baseline_loss = net_loss return net_loss @@ -737,257 +738,147 @@ def _create_stat_dict(self, a): scaler.fit(a) return {'scaler': scaler} - def fit( - self, - train_data, - epochs=1, - val_data=None, - run_validation=False, - use_val_for_loss_stats=False, - rank=None, - world_size=None, - ): - """ Does training in the specified mode (indicated by self.distrivuted_training). + def _transform_dataset_for_training(self, dataset): + dataset.batch_size = self.batch_size + dataset.preprocess_fn = self.preprocess_training_data + dataset.shuffle_batch_indices = True + dataset.shuffle_rows_in_batch = True - Parameters - ---------- - train_data : pandas.DataFrame (centralized) or torch.utils.data.DataLoader (distributed) - Data for training. - epochs : int, optional - Number of epochs to run training, by default 1. - val_data : pandas.DataFrame (centralized) or torch.utils.data.DataLoader (distributed), optional - Data for validation and computing loss stats, by default None. - run_validation : bool, optional - Whether to collect validation loss for each epoch during training, by default False. - use_val_for_loss_stats : bool, optional - whether to use the validation set for loss statistics collection (for z score calculation), by default False. - rank : int, optional - The rank of the current process, by default None. Required for distributed training. - world_size : int, optional - The total number of processes, by default None. Required for distributed training. + return dataset - Raises - ------ - TypeError - If train_data is not a pandas dataframe in centralized training mode. - ValueError - If rank and world_size not provided in distributed training mode. - TypeError - If train_data is not a pandas dataframe or a torch.utils.data.DataLoader or a torch.utils.data.Dataset in distributed training mode. - """ - if not self.distributed_training: - if not isinstance(train_data, pd.DataFrame): - raise TypeError("`train_data` needs to be a pandas dataframe in centralized training mode." 
- f" `train_data` is currently of type: {type(train_data)}") - - self._fit_centralized( - df=train_data, - epochs=epochs, - val=val_data, - run_validation=run_validation, - use_val_for_loss_stats=use_val_for_loss_stats, - ) - else: - # distributed training requires rank and world_size - if rank is None or world_size is None: - raise ValueError('`rank` and `world_size` must be provided for distributed training.') - - if not isinstance(train_data, (pd.DataFrame, torch.utils.data.DataLoader, torch.utils.data.Dataset)): - raise TypeError( - "`train_data` needs to be a pandas DataFrame, a DataLoader, or a Dataset in distributed training mode." - f" `train_data` is currently of type: {type(train_data)}") - - self._fit_distributed( - train_data=train_data, - epochs=epochs, - val_data=val_data, - run_validation=run_validation, - use_val_for_loss_stats=use_val_for_loss_stats, - rank=rank, - world_size=world_size, - ) + def _transform_dataset_for_validation(self, dataset): + dataset.batch_size = self.eval_batch_size + dataset.preprocess_fn = self.preprocess_validation_data + dataset.shuffle_batch_indices = False + dataset.shuffle_rows_in_batch = False - def _fit_centralized(self, df, epochs=1, val=None, run_validation=False, use_val_for_loss_stats=False): - """Does training in a single process on a single GPU. + return dataset - Parameters - ---------- - df : pandas.DataFrame - Data used for training. - epochs : int, optional - Number of epochs to run training, by default 1. - val : pandas.DataFrame, optional - Optional pandas dataframe for validation or loss stats, by default None. - run_validation : bool, optional - Whether to collect validation loss for each epoch during training, by default False. - use_val_for_loss_stats : bool, optional - Whether to use the validation set for loss statistics collection (for z score calculation), by default False. + def _data_to_dataset( + self, + data: typing.Union[pd.DataFrame, DataframeDataset, FileSystemDataset, DFEncoderDataLoader], + train: bool = True, + ): + if (data is None): + return None - Raises - ------ - ValueError - If run_validation or use_val_for_loss_stats is True but val is not provided. - """ - if (run_validation or use_val_for_loss_stats) and val is None: - raise ValueError("Validation set is required if either run_validation or \ - use_val_for_loss_stats is set to True.") + is_loader = isinstance(data, torch.utils.data.DataLoader) - if use_val_for_loss_stats: - df_for_loss_stats = val.copy() + if (isinstance(data, pd.DataFrame)): + dset = DataframeDataset(df=data) + elif (isinstance(data, DFEncoderDataLoader)): + dset = data.dataset else: - # use train loss - df_for_loss_stats = df.copy() - - if run_validation and val is not None: - val = val.copy() + dset = data - if self.optim is None: - self._build_model(df) - - if self.n_megabatches == 1: - df = self.prepare_df(df) + # We ensure that we transform our dataset for training or validation, irrespective of how it + # is initially provided, to ensure consistency in the fit process. + if (train): + dset = self._transform_dataset_for_training(dset) + else: + dset = self._transform_dataset_for_validation(dset) - if run_validation and val is not None: - val_df = self.prepare_df(val) - val_in = val_df.swap(likelihood=self.swap_p) - msg = "Validating during training.\n" - msg += "Computing baseline performance..." 
- baseline = self.compute_baseline_performance(val_in, val_df) - LOG.debug(msg) + return data if is_loader else dset - n_updates = len(df) // self.batch_size - if len(df) % self.batch_size > 0: - n_updates += 1 - last_loss = 5000 + def _build_or_configure_datasets(self, training_data, validation_data, use_val_for_loss_stats): + # Construct validation and trianing datasets or modify the DataLoader's dataset and return it + training_data = self._data_to_dataset(training_data) + validation_data = self._data_to_dataset(validation_data, train=False) - count_es = 0 - for i in range(epochs): - self.train() + if (use_val_for_loss_stats): + loss_dset = validation_data + else: + loss_dset = training_data + + if isinstance(loss_dset, torch.utils.data.DataLoader): + loss_dset = training_data.dataset + + return training_data, validation_data, loss_dset + + def _train_for_epochs(self, + training_data, + validation_data, + epochs, + rank, + world_size, + run_validation, + is_main_process): + # Batched Training loop with early stopping + early_stopping_count = 0 + last_val_loss = float('inf') + should_early_stop = False + for epoch in range(epochs): + LOG.debug(f'[Rank{rank}] training epoch {epoch + 1}...') - LOG.debug(f'training epoch {i + 1}...') - df = df.sample(frac=1.0) - df = EncoderDataFrame(df) - if self.n_megabatches > 1: - self.train_megabatch_epoch(n_updates, df) - else: - input_df = df.swap(likelihood=self.swap_p) - self.train_epoch(n_updates, input_df, df) + # if we are using DistributedSampler, we have to tell it which epoch this is + if (self.distributed_training + and isinstance(training_data.sampler, torch.utils.data.distributed.DistributedSampler)): + training_data.sampler.set_epoch(epoch) - if self.lr_decay is not None: - self.lr_decay.step() + train_loss_sum = 0 + train_loss_count = 0 + for data_batch in training_data: + loss = self._fit_batch(**data_batch['data']) - if run_validation and val is not None: - self.eval() - with torch.no_grad(): - mean_id_loss, mean_swapped_loss = self._validate_dataframe(orig_df=val_df, swapped_df=val_in) + train_loss_count += 1 + train_loss_sum += loss - # Early stopping - current_net_loss = mean_id_loss - LOG.debug('The Current Net Loss: %s', current_net_loss) + if (self.learning_rate_decay is not None): + self.learning_rate_decay.step() - if current_net_loss > last_loss: - count_es += 1 - LOG.debug('Early stop count: %s', count_es) + if (is_main_process and run_validation): + # run validation + curr_val_loss = self._validate_dataset(validation_data, rank) + LOG.debug(f'Rank{rank} Loss: {round(last_val_loss, 4)}->{round(curr_val_loss, 4)}') - if count_es >= self.patience: - LOG.debug('Early stopping: early stop count(%s) >= patience(%s)', count_es, self.patience) - break + if (self.patience): # early stopping + if (curr_val_loss > last_val_loss): + early_stopping_count += 1 + LOG.debug(f'Rank{rank} Loss went up. 
Early stop count: {early_stopping_count}') + if (early_stopping_count >= self.patience): + LOG.debug( + f'Early stopping: early stop count({early_stopping_count}) >= patience({self.patience})' + ) + should_early_stop = True else: - LOG.debug('Set count for earlystop: 0') - count_es = 0 - - last_loss = current_net_loss - - self.logger.end_epoch() - - if self.verbose: - msg = '\n' - msg += 'net validation loss, swapped input: \n' - msg += f"{round(mean_swapped_loss, 4)} \n\n" - msg += 'baseline validation loss: ' - msg += f"{round(baseline, 4)} \n\n" - msg += 'net validation loss, unaltered input: \n' - msg += f"{round(mean_id_loss, 4)} \n\n\n" - LOG.debug(msg) - - #Getting training loss statistics - # mse_loss, bce_loss, cce_loss, _ = self.get_anomaly_score(pdf) if pdf_val is None else self.get_anomaly_score(pd.concat([pdf, pdf_val])) - mse_loss, bce_loss, cce_loss, _ = self.get_anomaly_score_with_losses(df_for_loss_stats) - for i, ft in enumerate(self.numeric_fts): - i_loss = mse_loss[:, i] - self.feature_loss_stats[ft] = self._create_stat_dict(i_loss) - for i, ft in enumerate(self.binary_fts): - i_loss = bce_loss[:, i] - self.feature_loss_stats[ft] = self._create_stat_dict(i_loss) - for i, ft in enumerate(self.categorical_fts): - i_loss = cce_loss[:, i] - self.feature_loss_stats[ft] = self._create_stat_dict(i_loss) - - def _validate_dataframe(self, orig_df, swapped_df): - """Runs a validation loop on the given validation pandas DataFrame, computing and returning the average loss of - both the original input and the input with swapped values. + LOG.debug(f'Rank{rank} Loss went down. Reset count for early stopping to 0') + early_stopping_count = 0 - Parameters - ---------- - orig_df : pandas.DataFrame, the original validation data - swapped_df: pandas.DataFrame, the swapped validation data - - Returns - ------- - Tuple[float] - A tuple containing two floats: - - mean_id_loss: the average net loss when passing the original df through the model - - mean_swapped_loss: the average net loss when passing the swapped df through the model - """ - val_batches = len(orig_df) // self.eval_batch_size - if len(orig_df) % self.eval_batch_size != 0: - val_batches += 1 - - swapped_loss = [] - id_loss = [] - for i in range(val_batches): - start = i * self.eval_batch_size - stop = (i + 1) * self.eval_batch_size - - # calculate the loss of the swapped tensor compared to the target (original) tensor - slc_in = swapped_df.iloc[start:stop] - slc_in_tensor = self.build_input_tensor(slc_in) - - slc_out = orig_df.iloc[start:stop] - slc_out_tensor = self.build_input_tensor(slc_out) - - num, bin, cat = self.model(slc_in_tensor) - _, _, _, net_loss = self.compute_loss(num, bin, cat, slc_out) - swapped_loss.append(net_loss) + last_val_loss = curr_val_loss - # calculate the loss of the original tensor - num, bin, cat = self.model(slc_out_tensor) - _, _, _, net_loss = self.compute_loss(num, bin, cat, slc_out, _id=True) - id_loss.append(net_loss) + # Sync early stopping info across processes if distributed training is enabled + if (self.distributed_training): + # we have to create enough room to store the collected objects + early_stopping_state = [None for _ in range(world_size)] + torch.distributed.all_gather_object(early_stopping_state, should_early_stop) + should_early_stop = early_stopping_state[0] # take the state of the main process - mean_swapped_loss = np.array(swapped_loss).mean() - mean_id_loss = np.array(id_loss).mean() + if should_early_stop is True: + LOG.debug(f'Rank{rank} Early stopped.') + break - return 
mean_id_loss, mean_swapped_loss + self.logger.end_epoch() - def _fit_distributed( + def fit( self, - train_data, - rank, - world_size, + training_data: typing.Union[pd.DataFrame, DataframeDataset, FileSystemDataset], + rank=0, + world_size=1, epochs=1, - val_data=None, + validation_data=None, run_validation=False, use_val_for_loss_stats=False, ): - """Fit the model in the distributed fashion with early stopping based on validation loss. - If run_validation is True, the val_dataset will be used for validation during training and early stopping + """ + Fit the model in a distributed or centralized fashion, depending on self.distributed_training with early + stopping based on validation loss. + If run_validation is True, the validation_dataset will be used for validation during training and early stopping will be applied based on patience argument. Parameters ---------- - train_data : pandas.DataFrame or torch.utils.data.Dataset or torch.utils.data.DataLoader + training_data : pandas.DataFrame or torch.utils.data.Dataset or torch.utils.data.DataLoader data object of training data rank : int the rank of the current process @@ -995,7 +886,7 @@ def _fit_distributed( the total number of processes epochs : int, optional the number of epochs to train for, by default 1 - val_data : torch.utils.data.Dataset or torch.utils.data.DataLoader, optional + validation_data : torch.utils.data.Dataset or torch.utils.data.DataLoader, optional the validation data object (with __iter__() that yields a batch at a time), by default None run_validation : bool, optional whether to perform validation during training, by default False @@ -1010,112 +901,53 @@ def _fit_distributed( ValueError If run_validation or use_val_for_loss_stats is True but val is not provided. """ - if run_validation and val_data is None: - raise ValueError("`run_validation` is set to True but the validation set (val_dataset) is not provided.") - if use_val_for_loss_stats and val_data is None: + if not isinstance(training_data, (pd.DataFrame, DataframeDataset, FileSystemDataset, DFEncoderDataLoader)): + raise TypeError( + "`train_data` needs to be a pandas DataFrame, a DataLoader, or a Dataset in distributed training mode." + f" `train_data` is currently of type: {type(training_data)}") + + if ((run_validation or use_val_for_loss_stats) and validation_data is None): raise ValueError("Validation set is required if either run_validation or \ use_val_for_loss_stats is set to True.") - # If train_data is in the format of a pandas df, wrap it by a dataset - train_df = None - if isinstance(train_data, pd.DataFrame): - train_df = train_data # keep the dataframe for model-building - train_data = DatasetFromDataframe( - df=train_data, - batch_size=self.batch_size, - preprocess_fn=self.preprocess_train_data, - shuffle_rows_in_batch=True, - ) + if (self.optim is None): + self._build_model(df=training_data, rank=rank) - if self.optim is None: - self._build_model(df=train_df, rank=rank) + training_data, validation_data, loss_dset = self._build_or_configure_datasets(training_data, validation_data, + use_val_for_loss_stats) - is_main_process = rank == 0 - should_run_validation = (run_validation and val_data is not None) - if self.patience and not should_run_validation: + is_main_process = (rank == 0) + + # Early stopping warning + if (self.patience and not run_validation): LOG.warning( f"Not going to perform early-stopping. self.patience(={self.patience}) is provided for early-stopping" - " but validation is not enabled. 
Please set `run_validation` to True and provide a `val_dataset` to" - " enable early-stopping.") + " but validation is not enabled. Please set `run_validation` to True and provide a `validation_dataset`" + " to enable early-stopping.") - if is_main_process and should_run_validation: + # Check if we're running validation, and compute baseline performance if we are + if (is_main_process and run_validation): LOG.debug('Validating during training. Computing baseline performance...') - baseline = self._compute_baseline_performance_from_dataset(val_data) + baseline = self._compute_baseline_performance_from_dataset(validation_data) if isinstance(self.logger, BasicLogger): self.logger.baseline_loss = baseline LOG.debug(f'Baseline loss: {round(baseline, 4)}') - # early stopping - count_es = 0 - last_val_loss = float('inf') - should_early_stop = False - for epoch in range(epochs): - LOG.debug(f'Rank{rank} training epoch {epoch + 1}...') - - # if we are using DistributedSampler, we have to tell it which epoch this is - train_data.sampler.set_epoch(epoch) - - train_loss_sum = 0 - train_loss_count = 0 - for data_d in train_data: - loss = self._fit_batch(**data_d['data']) - - train_loss_count += 1 - train_loss_sum += loss - - if self.lr_decay is not None: - self.lr_decay.step() - - if is_main_process and should_run_validation: - # run validation - curr_val_loss = self._validate_dataset(val_data, rank) - LOG.debug(f'Rank{rank} Loss: {round(last_val_loss, 4)}->{round(curr_val_loss, 4)}') - - if self.patience: # early stopping - if curr_val_loss > last_val_loss: - count_es += 1 - LOG.debug(f'Rank{rank} Loss went up. Early stop count: {count_es}') - - if count_es >= self.patience: - LOG.debug(f'Early stopping: early stop count({count_es}) >= patience({self.patience})') - should_early_stop = True - else: - LOG.debug(f'Rank{rank} Loss went down. 
Reset count for earlystop to 0') - count_es = 0 - - last_val_loss = curr_val_loss - - self.logger.end_epoch() - - # sync early stopping info so the early stopping decision can be passed from the main process to other processes - early_stpping_state = [None for _ in range(world_size) - ] # we have to create enough room to store the collected objects - torch.distributed.all_gather_object(early_stpping_state, should_early_stop) - should_early_stop_synced = early_stpping_state[0] # take the state of the main process - if should_early_stop_synced is True: - LOG.debug(f'Rank{rank} Early stopped.') - break - - if is_main_process: - # Run loss collection only on the main process (currently do not support distributed loss collection) - if use_val_for_loss_stats: - dataset_for_loss_stats = val_data + # Training loop + self._train_for_epochs(training_data, + validation_data, + epochs, + rank, + world_size, + run_validation, + is_main_process) - else: - # use training set for loss stats collection - if isinstance(train_data, torch.utils.data.DataLoader): - # grab only the dataset to avoid distriburted sampling - dataset_for_loss_stats = train_data.dataset - else: - # train_data is a Dataset - dataset_for_loss_stats = train_data - - # converts to validation mode to get the extra target tensors from the preprocessing function - dataset_for_loss_stats.convert_to_validation(self) - self._populate_loss_stats_from_dataset(dataset_for_loss_stats) + if (is_main_process): + self._transform_dataset_for_validation(loss_dset) + self._populate_loss_stats_from_dataset(loss_dset) def _fit_batch(self, input_swapped, num_target, bin_target, cat_target, **kwargs): """Forward pass on the input_swapped, then computes the losses from the predicted outputs and actual targets, performs @@ -1152,14 +984,15 @@ def _fit_batch(self, input_swapped, num_target, bin_target, cat_target, **kwargs self.do_backward(mse, bce, cce) self.optim.step() self.optim.zero_grad() + return net_loss - def _compute_baseline_performance_from_dataset(self, val_dataset): + def _compute_baseline_performance_from_dataset(self, validation_dataset): self.eval() loss_sum = 0 sample_count = 0 with torch.no_grad(): - for data_d in val_dataset: + for data_d in validation_dataset: curr_batch_size = data_d['data']['size'] loss = self._compute_batch_baseline_performance(**data_d['data']) loss_sum += loss @@ -1197,13 +1030,13 @@ def _compute_batch_baseline_performance( ) return net_loss - def _validate_dataset(self, val_dataset, rank=None): + def _validate_dataset(self, validation_dataset, rank=None): """Runs a validation loop on the given validation dataset, computing and returning the average loss of both the original input and the input with swapped values. 
Parameters ---------- - val_dataset : torch.utils.data.Dataset + validation_dataset : torch.utils.data.Dataset validation dataset to be used for validation rank : int, optional optional rank of the process being used for distributed training, used only for logging, by default None @@ -1217,7 +1050,7 @@ def _validate_dataset(self, val_dataset, rank=None): with torch.no_grad(): swapped_loss = [] id_loss = [] - for data_d in val_dataset: + for data_d in validation_dataset: orig_net_loss, net_loss = self._validate_batch(**data_d['data']) id_loss.append(orig_net_loss) swapped_loss.append(net_loss) @@ -1415,67 +1248,6 @@ def get_results_from_dataset(self, dataset, preloaded_df, return_abs=False): return result - def train_epoch(self, n_updates, input_df, df, pbar=None): - """Run regular epoch.""" - - if pbar is None and self.progress_bar: - close = True - pbar = tqdm.tqdm(total=n_updates) - else: - close = False - - for j in range(n_updates): - start = j * self.batch_size - stop = (j + 1) * self.batch_size - in_sample = input_df.iloc[start:stop] - in_sample_tensor = self.build_input_tensor(in_sample) - target_sample = df.iloc[start:stop] - num, bin, cat = self.model(in_sample_tensor) # forward - mse, bce, cce, net_loss = self.compute_loss(num, bin, cat, target_sample, should_log=True) - self.do_backward(mse, bce, cce) - self.optim.step() - self.optim.zero_grad() - - if self.progress_bar: - pbar.update(1) - if close: - pbar.close() - - def train_megabatch_epoch(self, n_updates, df): - """ - Run epoch doing 'megabatch' updates, preprocessing data in large - chunks. - """ - if self.progress_bar: - pbar = tqdm.tqdm(total=n_updates) - else: - pbar = None - - n_rows = len(df) - n_megabatches = self.n_megabatches - batch_size = self.batch_size - res = n_rows / n_megabatches - batches_per_megabatch = (res // batch_size) + 1 - megabatch_size = batches_per_megabatch * batch_size - final_batch_size = n_rows - (n_megabatches - 1) * megabatch_size - - for i in range(n_megabatches): - megabatch_start = int(i * megabatch_size) - megabatch_stop = int((i + 1) * megabatch_size) - megabatch = df.iloc[megabatch_start:megabatch_stop] - megabatch = self.prepare_df(megabatch) - input_df = megabatch.swap(self.swap_p) - if i == (n_megabatches - 1): - n_updates = int(final_batch_size // batch_size) - if final_batch_size % batch_size > 0: - n_updates += 1 - else: - n_updates = int(batches_per_megabatch) - self.train_epoch(n_updates, input_df, megabatch, pbar=pbar) - del megabatch - del input_df - gc.collect() - def get_representation(self, df, layer=0): """ Computes latent feature vector from hidden layer given input dataframe. 
@@ -1667,7 +1439,7 @@ def get_anomaly_score_losses(self, df): # merge the tensors into one (n_records * n_features) tensor cce_loss_slice = torch.cat(cce_loss_slice_of_each_feat, dim=1) else: - cce_loss_slice = torch.empty((len(df_slice), 0)) + cce_loss_slice = torch.empty((len(df_slice), 0), device=self.device) mse_loss_slices.append(mse_loss_slice) bce_loss_slices.append(bce_loss_slice) diff --git a/morpheus/models/dfencoder/dataloader.py b/morpheus/models/dfencoder/dataloader.py index 5eb841a254..1b2e7ecec1 100644 --- a/morpheus/models/dfencoder/dataloader.py +++ b/morpheus/models/dfencoder/dataloader.py @@ -41,10 +41,7 @@ def __iter__(self): if self.batch_size == 1: # unbatch to get rid of the first dimention of 1 intorduced by DataLoaders batching # (if batch size is set to 1) - data_d["data"] = { - k: v[0] if type(v) != list else [_v[0] for _v in v] - for k, v in data_d["data"].items() - } + data_d["data"] = {k: v[0] if type(v) != list else [_v[0] for _v in v] for k, v in data_d["data"].items()} yield data_d @staticmethod @@ -113,10 +110,10 @@ def get_distributed_training_dataloader_from_path(model, DFEncoderDataLoader The training DataLoader with DistributedSampler for distributed training. """ - dataset = DatasetFromPath( + dataset = FileSystemDataset( data_folder, model.batch_size, - model.preprocess_train_data, + model.preprocess_training_data, load_data_fn=load_data_fn, ) dataloader = DFEncoderDataLoader.get_distributed_training_dataloader_from_dataset( @@ -152,7 +149,7 @@ def get_distributed_training_dataloader_from_df(model, df, rank, world_size, pin DFEncoderDataLoader The training DataLoader with DistributedSampler for distributed training. """ - dataset = DatasetFromDataframe.get_train_dataset(model, df) + dataset = DataframeDataset.get_train_dataset(model, df) dataloader = DFEncoderDataLoader.get_distributed_training_dataloader_from_dataset( dataset=dataset, rank=rank, @@ -163,7 +160,7 @@ def get_distributed_training_dataloader_from_df(model, df, rank, world_size, pin return dataloader -class DatasetFromPath(Dataset): +class FileSystemDataset(Dataset): """ A dataset class that reads data in batches from a folder and applies preprocessing to each batch. * This class assumes that the data is saved in small csv files in one folder. """ @@ -171,8 +168,8 @@ class DatasetFromPath(Dataset): def __init__( self, data_folder, - batch_size, - preprocess_fn, + batch_size=128, + preprocess_fn=lambda x: x, load_data_fn=pd.read_csv, shuffle_rows_in_batch=True, shuffle_batch_indices=False, @@ -352,89 +349,46 @@ def get_preloaded_data(self): self._preloaded_data = {fn: self._load_data_fn(f"{self._data_folder}/{fn}") for fn in self._filenames} return pd.concat(pdf for pdf in self._preloaded_data.values()) - @staticmethod - def get_train_dataset(model, data_folder, load_data_fn=pd.read_csv, preload_data_into_memory=False): - """A helper function to get a train dataset with the provided parameters. + @property + def preprocess_fn(self): + return self._preprocess_fn - Parameters - ---------- - model : AutoEncoder - The autoencoder model used to get relevant params and the preprocessing func. - data_folder : str - The path to the folder containing the data. - load_data_fn : function, optional - A function for loading data from a provided file path into a pandas.DataFrame, by default pd.read_csv. - preload_data_into_memory : bool, optional - Whether to preload all the data into memory, by default False. 
+ @preprocess_fn.setter + def preprocess_fn(self, value): + self._preprocess_fn = value - Returns - ------- - DatasetFromPath - Validation Dataset set up to load from the path. - """ - dataset = DatasetFromPath( - data_folder, - model.batch_size, - model.preprocess_train_data, - load_data_fn=load_data_fn, - shuffle_rows_in_batch=True, - shuffle_batch_indices=True, - preload_data_into_memory=preload_data_into_memory, - ) - return dataset + @property + def batch_size(self): + return self._batch_size - @staticmethod - def get_validation_dataset(model, data_folder, load_data_fn=pd.read_csv, preload_data_into_memory=True): - """A helper function to get a validation dataset with the provided parameters. + @batch_size.setter + def batch_size(self, value): + self._batch_size = value - Parameters - ---------- - model : AutoEncoder - The autoencoder model used to get relevant params and the preprocessing func. - data_folder : str - The path to the folder containing the data. - load_data_fn : function, optional - A function for loading data from a provided file path into a pandas.DataFrame, by default pd.read_csv. - preload_data_into_memory : bool, optional - Whether to preload all the data into memory, by default True. - (can speed up data loading if the data can fit into memory) + @property + def shuffle_rows_in_batch(self): + return self._shuffle_rows_in_batch - Returns - ------- - DatasetFromPath - Validation Dataset set up to load from the path. - """ - dataset = DatasetFromPath( - data_folder, - model.eval_batch_size, - model.preprocess_validation_data, - load_data_fn=load_data_fn, - shuffle_rows_in_batch=False, - preload_data_into_memory=preload_data_into_memory, - ) - return dataset + @shuffle_rows_in_batch.setter + def shuffle_rows_in_batch(self, value): + self._shuffle_rows_in_batch = value - def convert_to_validation(self, model): - """Converts the dataset to validation mode by resetting instance variables. + @property + def shuffle_batch_indices(self): + return self._shuffle_batch_indices - Parameters - ---------- - model : AutoEncoder - The autoencoder model used to get relevant params and the preprocessing func. - """ - self._preprocess_fn = model.preprocess_validation_data - self._batch_size = model.eval_batch_size - self._shuffle_rows_in_batch = False - self._shuffle_batch_indices = False + @shuffle_batch_indices.setter + def shuffle_batch_indices(self, value): + self._shuffle_batch_indices = value -class DatasetFromDataframe(Dataset): +class DataframeDataset(Dataset): def __init__( self, df, - batch_size, - preprocess_fn, + batch_size=128, + preprocess_fn=lambda x: x, shuffle_rows_in_batch=True, shuffle_batch_indices=False, ): @@ -545,65 +499,34 @@ def _preprocess(self, df, batch_index): ) return {"batch_index": batch_index, "data": data} - @staticmethod - def get_train_dataset(model, df): - """A helper function to get a train dataset with the provided parameters. + @property + def preprocess_fn(self): + return self._preprocess_fn - Parameters - ---------- - model : AutoEncoder - The autoencoder model used to get relevant params and the preprocessing func. - df : pandas.DataFrame - Input dataframe used for the dataset. + @preprocess_fn.setter + def preprocess_fn(self, value): + self._preprocess_fn = value - Returns - ------- - DatasetFromDataframe - Training Dataset set up to load from the dataframe. 
- """ - dataset = DatasetFromDataframe( - df=df, - batch_size=model.batch_size, - preprocess_fn=model.preprocess_train_data, - shuffle_rows_in_batch=True, - shuffle_batch_indices=True, - ) - return dataset + @property + def batch_size(self): + return self._batch_size - @staticmethod - def get_validation_dataset(model, df): - """A helper function to get a validation dataset with the provided parameters. + @batch_size.setter + def batch_size(self, value): + self._batch_size = value - Parameters - ---------- - model : AutoEncoder - The autoencoder model used to get relevant params and the preprocessing func. - df : pandas.DataFrame - Input dataframe used for the dataset. + @property + def shuffle_rows_in_batch(self): + return self._shuffle_rows_in_batch - Returns - ------- - DatasetFromDataframe - Validation Dataset set up to load from the dataframe. - """ - dataset = DatasetFromDataframe( - df=df, - batch_size=model.eval_batch_size, - preprocess_fn=model.preprocess_validation_data, - shuffle_rows_in_batch=False, - shuffle_batch_indices=False, - ) - return dataset + @shuffle_rows_in_batch.setter + def shuffle_rows_in_batch(self, value): + self._shuffle_rows_in_batch = value - def convert_to_validation(self, model): - """Converts the dataset to validation mode by resetting instance variables. + @property + def shuffle_batch_indices(self): + return self._shuffle_batch_indices - Parameters - ---------- - model : AutoEncoder - The autoencoder model used to get relevant params and the preprocessing func. - """ - self._preprocess_fn = model.preprocess_validation_data - self._batch_size = model.eval_batch_size - self._shuffle_rows_in_batch = False - self._shuffle_batch_indices = False + @shuffle_batch_indices.setter + def shuffle_batch_indices(self, value): + self._shuffle_batch_indices = value diff --git a/morpheus/stages/preprocess/train_ae_stage.py b/morpheus/stages/preprocess/train_ae_stage.py index ce8ed408d0..24797a13c2 100644 --- a/morpheus/stages/preprocess/train_ae_stage.py +++ b/morpheus/stages/preprocess/train_ae_stage.py @@ -92,9 +92,9 @@ def train(self, df: pd.DataFrame) -> AutoEncoder: encoder_layers=[512, 500], # layers of the encoding part decoder_layers=[512], # layers of the decoding part activation='relu', # activation function - swap_p=0.2, # noise parameter - lr=0.01, # learning rate - lr_decay=.99, # learning decay + swap_probability=0.2, # noise parameter + learning_rate=0.01, # learning rate + learning_rate_decay=.99, # learning decay batch_size=512, # logger='ipynb', verbose=False, diff --git a/tests/dfencoder/test_autoencoder.py b/tests/dfencoder/test_autoencoder.py index 668fffa94f..1cf16eff49 100755 --- a/tests/dfencoder/test_autoencoder.py +++ b/tests/dfencoder/test_autoencoder.py @@ -21,6 +21,7 @@ import pandas as pd import pytest import torch +from torch.utils.data import Dataset as TorchDataset from _utils import TEST_DIRS from _utils.dataset_manager import DatasetManager @@ -29,6 +30,9 @@ from morpheus.models.dfencoder import autoencoder from morpheus.models.dfencoder import scalers from morpheus.models.dfencoder.dataframe import EncoderDataFrame +from morpheus.models.dfencoder.dataloader import DataframeDataset +from morpheus.models.dfencoder.dataloader import DFEncoderDataLoader +from morpheus.models.dfencoder.dataloader import FileSystemDataset # Only pandas and Python is supported pytestmark = [pytest.mark.use_pandas, pytest.mark.use_python] @@ -64,9 +68,9 @@ def train_ae_fixture(): encoder_layers=[512, 500], decoder_layers=[512], activation='relu', - swap_p=0.2, 
- lr=0.01, - lr_decay=.99, + swap_probability=0.2, + learning_rate=0.01, + learning_rate_decay=.99, batch_size=512, verbose=False, optimizer='sgd', @@ -172,17 +176,16 @@ def test_auto_encoder_constructor_default_vals(): assert ae.model.encoder_layers is None assert ae.model.decoder_layers is None assert ae.min_cats == 10 - assert ae.swap_p == 0.15 + assert ae.swap_probability == 0.15 assert ae.batch_size == 256 assert ae.eval_batch_size == 1024 assert ae.model.activation == 'relu' assert ae.optimizer == 'adam' - assert ae.lr == 0.01 - assert ae.lr_decay is None + assert ae.learning_rate == 0.01 + assert ae.learning_rate_decay is None assert ae.device.type == 'cuda' assert ae.scaler == 'standard' assert ae.loss_scaler is scalers.StandardScaler - assert ae.n_megabatches == 1 def test_auto_encoder_constructor(train_ae: autoencoder.AutoEncoder): @@ -193,19 +196,18 @@ def test_auto_encoder_constructor(train_ae: autoencoder.AutoEncoder): assert train_ae.model.encoder_layers == [512, 500] assert train_ae.model.decoder_layers == [512] assert train_ae.min_cats == 1 - assert train_ae.swap_p == 0.2 + assert train_ae.swap_probability == 0.2 assert train_ae.batch_size == 512 assert train_ae.eval_batch_size == 1024 assert train_ae.model.activation == 'relu' assert train_ae.optimizer == 'sgd' - assert train_ae.lr == 0.01 - assert train_ae.lr_decay == 0.99 + assert train_ae.learning_rate == 0.01 + assert train_ae.learning_rate_decay == 0.99 assert not train_ae.progress_bar assert not train_ae.verbose assert train_ae.device.type == 'cuda' assert train_ae.scaler == 'standard' assert train_ae.loss_scaler is scalers.StandardScaler - assert train_ae.n_megabatches == 1 def test_auto_encoder_get_scaler(): @@ -246,8 +248,23 @@ def test_auto_encoder_init_numeric(filter_probs_df): compare_numeric_features(ae.numeric_fts, expected_features) -def test_auto_encoder_fit(train_ae: autoencoder.AutoEncoder, train_df: pd.DataFrame): - train_ae.fit(train_df, epochs=1) +@pytest.mark.parametrize("input_type", [pd.DataFrame, FileSystemDataset, DFEncoderDataLoader, TorchDataset]) +def test_auto_encoder_fit(train_ae: autoencoder.AutoEncoder, train_df: pd.DataFrame, input_type): + _train_df = train_df + if (isinstance(input_type, FileSystemDataset)): + _train_df = DataframeDataset(_train_df) + elif (isinstance(input_type, DFEncoderDataLoader)): + _train_df = DFEncoderDataLoader.get_distributed_training_dataloader_from_df(train_ae, + _train_df, + 1, + 1, + num_workers=1) + elif (isinstance(input_type, TorchDataset)): + with pytest.raises(TypeError): + train_ae.fit(_train_df, epochs=1) + return + + train_ae.fit(_train_df, epochs=1) expected_numeric_features = { 'eventID': { @@ -267,7 +284,7 @@ def test_auto_encoder_fit(train_ae: autoencoder.AutoEncoder, train_df: pd.DataFr assert sorted(train_ae.categorical_fts.keys()) == CAT_COLS for cat in CAT_COLS: - assert sorted(train_ae.categorical_fts[cat]['cats']) == sorted(train_df[cat].dropna().unique()) + assert sorted(train_ae.categorical_fts[cat]['cats']) == sorted(_train_df[cat].dropna().unique()) assert len(train_ae.cyclical_fts) == 0 @@ -278,9 +295,9 @@ def test_auto_encoder_fit(train_ae: autoencoder.AutoEncoder, train_df: pd.DataFr assert isinstance(feature['scaler'], scalers.StandardScaler) assert isinstance(train_ae.optim, torch.optim.SGD) - assert isinstance(train_ae.lr_decay, torch.optim.lr_scheduler.ExponentialLR) - assert train_ae.lr_decay.gamma == 0.99 - assert train_ae.optim is train_ae.lr_decay.optimizer + assert isinstance(train_ae.learning_rate_decay, 
torch.optim.lr_scheduler.ExponentialLR) + assert train_ae.learning_rate_decay.gamma == 0.99 + assert train_ae.optim is train_ae.learning_rate_decay.optimizer def test_auto_encoder_fit_early_stopping(train_df: pd.DataFrame): @@ -291,31 +308,27 @@ def test_auto_encoder_fit_early_stopping(train_df: pd.DataFrame): # Test normal training loop with no early stopping ae = autoencoder.AutoEncoder(patience=5) - ae.fit(train_data, val_data=validation_data, run_validation=True, use_val_for_loss_stats=True, epochs=epochs) + ae.fit(train_data, validation_data=validation_data, run_validation=True, use_val_for_loss_stats=True, epochs=epochs) # assert that training runs through all epoches assert ae.logger.n_epochs == epochs class MockHelper: """A helper class for mocking the `_validate_dataframe` method in the `AutoEncoder` class.""" - def __init__(self, orig_losses, swapped_loss=1.0): + def __init__(self, orig_losses): """ Initialization. Parameters: ----------- - orig_losses : list - A list of original validation losses to be returned by the mocked `_validate_dataframe` method. - swapped_loss : float, optional (default=1.0) - A fixed loss value to be returned by the mocked `_validate_dataframe` method as the `swapped_loss`. - Fixed as it's unrelated to the early-stopping functionality being tested here. + mean_loss: list + A list of mean validation losses to be returned by the mocked `_validate_dataframe` method. """ - self.swapped_loss = swapped_loss self.orig_losses = orig_losses # counter to keep track of the number of times the mocked `_validate_dataframe` method has been called self.count = 0 - def mocked_validate_dataframe(self, *args, **kwargs): # pylint: disable=unused-argument + def mocked_validate_dataset(self, *args, **kwargs): # pylint: disable=unused-argument """ A mocked version of the `_validate_dataframe` method in the `AutoEncoder` class for testing early stopping. @@ -324,9 +337,9 @@ def mocked_validate_dataframe(self, *args, **kwargs): # pylint: disable=unused- tuple of (float, float) A tuple of original validation loss and swapped loss values for each epoch. 
""" - orig_loss = self.orig_losses[self.count] + mean_loss = self.orig_losses[self.count] self.count += 1 - return orig_loss, self.swapped_loss + return mean_loss # Test early stopping orig_losses = [0.1, 0.2, 0.3, 0.4, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6] @@ -334,16 +347,24 @@ def mocked_validate_dataframe(self, *args, **kwargs): # pylint: disable=unused- ae = autoencoder.AutoEncoder( patience=3) # should stop at epoch 3 as the first 3 losses are monotonically increasing mock_helper = MockHelper(orig_losses=orig_losses) # validation loss is strictly increasing - with patch.object(ae, '_validate_dataframe', side_effect=mock_helper.mocked_validate_dataframe): - ae.fit(train_data, val_data=validation_data, run_validation=True, use_val_for_loss_stats=True, epochs=epochs) + with patch.object(ae, '_validate_dataset', side_effect=mock_helper.mocked_validate_dataset): + ae.fit(train_data, + validation_data=validation_data, + run_validation=True, + use_val_for_loss_stats=True, + epochs=epochs) # assert that training early-stops at epoch 3 assert ae.logger.n_epochs == 3 ae = autoencoder.AutoEncoder( patience=5) # should stop at epoch 9 as losses from epoch 5-9 are monotonically increasing mock_helper = MockHelper(orig_losses=orig_losses) # validation loss is strictly increasing - with patch.object(ae, '_validate_dataframe', side_effect=mock_helper.mocked_validate_dataframe): - ae.fit(train_data, val_data=validation_data, run_validation=True, use_val_for_loss_stats=True, epochs=epochs) + with patch.object(ae, '_validate_dataset', side_effect=mock_helper.mocked_validate_dataset): + ae.fit(train_data, + validation_data=validation_data, + run_validation=True, + use_val_for_loss_stats=True, + epochs=epochs) # assert that training early-stops at epoch 3 assert ae.logger.n_epochs == 9 @@ -459,6 +480,6 @@ def test_auto_encoder_get_results(train_ae: autoencoder.AutoEncoder, train_df: p assert round(results.loc[0, 'max_abs_z'], 2) == 2.5 - # Not sure why but numpy.float32(0.33) != 0.33 - assert round(float(results.loc[0, 'mean_abs_z']), 2) == 0.33 + # Numpy float has different precision checks than python float, so we wrap it. 
+ assert round(float(results.loc[0, 'mean_abs_z']), 3) == 0.335 assert results.loc[0, 'z_loss_scaler_type'] == 'z' diff --git a/tests/dfencoder/test_dfencoder_distributed_e2e.py b/tests/dfencoder/test_dfencoder_distributed_e2e.py index 3f88f6d2bc..bd9d855173 100644 --- a/tests/dfencoder/test_dfencoder_distributed_e2e.py +++ b/tests/dfencoder/test_dfencoder_distributed_e2e.py @@ -27,8 +27,8 @@ from _utils import TEST_DIRS from morpheus.models.dfencoder.autoencoder import AutoEncoder -from morpheus.models.dfencoder.dataloader import DatasetFromPath from morpheus.models.dfencoder.dataloader import DFEncoderDataLoader +from morpheus.models.dfencoder.dataloader import FileSystemDataset from morpheus.models.dfencoder.multiprocessing import start_processes # import torch @@ -107,7 +107,6 @@ def cleanup_dist(): @pytest.mark.slow def test_dfencoder_distributed_e2e(): - world_size = 1 start_processes(_run_test, args=(world_size, ), nprocs=world_size, join=True) @@ -135,9 +134,9 @@ def _run_test(rank, world_size): encoder_layers=[512, 500], decoder_layers=[512], activation='relu', - swap_p=0.2, - lr=0.01, - lr_decay=0.99, + swap_probability=0.2, + learning_rate=0.01, + learning_rate_decay=0.99, batch_size=4096, logger='basic', verbose=True, @@ -160,14 +159,21 @@ def _run_test(rank, world_size): rank=rank, world_size=world_size) # Load validation set - val_dataset = DatasetFromPath.get_validation_dataset(model, VALIDATION_FOLDER) + val_dataset = FileSystemDataset( + data_folder=VALIDATION_FOLDER, + batch_size=model.eval_batch_size, + preprocess_fn=model.preprocess_validation_data, + shuffle_rows_in_batch=False, + shuffle_batch_indices=False, + preload_data_into_memory=True, # very small validation set + ) # Train - model.fit(train_data=dataloader, + model.fit(training_data=dataloader, rank=rank, world_size=world_size, epochs=10, - val_data=val_dataset, + validation_data=val_dataset, run_validation=True, use_val_for_loss_stats=True) @@ -180,7 +186,7 @@ def _run_test(rank, world_size): assert min(losses) < LOSS_TARGETS[loss_type][feature] * LOSS_TOLERANCE_RATIO # Inference - inf_dataset = DatasetFromPath( + inf_dataset = FileSystemDataset( data_folder=INFERENCE_FOLDER, batch_size=1024, preprocess_fn=model.preprocess_validation_data, diff --git a/tests/dfencoder/test_dfencoder_e2e.py b/tests/dfencoder/test_dfencoder_e2e.py index d151923195..3bebffa058 100644 --- a/tests/dfencoder/test_dfencoder_e2e.py +++ b/tests/dfencoder/test_dfencoder_e2e.py @@ -96,9 +96,9 @@ def test_dfencoder_e2e(): encoder_layers=[512, 500], # layers of the encoding part decoder_layers=[512], # layers of the decoding part activation="relu", # activation function - swap_p=0.2, # noise parameter - lr=0.01, # learning rate - lr_decay=0.99, # learning decay + swap_probability=0.2, # noise parameter + learning_rate=0.01, # learning rate + learning_rate_decay=0.99, # learning decay batch_size=1024, logger="basic", verbose=False, @@ -112,7 +112,7 @@ def test_dfencoder_e2e(): model.fit( train_df, epochs=10, - val_data=validation_df, + validation_data=validation_df, run_validation=True, use_val_for_loss_stats=True, ) diff --git a/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py b/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py index 54b438d4a3..1d04e39293 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py +++ b/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py @@ -258,8 +258,8 @@ def test_on_data( max_time = time_col.max() mock_model = 
mock.MagicMock() - mock_model.lr_decay.state_dict.return_value = {'last_epoch': 42} - mock_model.lr = 0.1 + mock_model.learning_rate_decay.state_dict.return_value = {'last_epoch': 42} + mock_model.learning_rate = 0.1 mock_model.batch_size = 100 mock_embedding = mock.MagicMock() diff --git a/tests/examples/digital_fingerprinting/test_dfp_training.py b/tests/examples/digital_fingerprinting/test_dfp_training.py index 74e4571b98..34dee902a3 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_training.py +++ b/tests/examples/digital_fingerprinting/test_dfp_training.py @@ -95,7 +95,7 @@ def test_on_data(mock_train_test_split: mock.MagicMock, assert len(mock_ae.fit.call_args.args) == 1 dataset_pandas.assert_compare_df(mock_ae.fit.call_args.args[0], train_df) assert mock_ae.fit.call_args.kwargs == { - 'epochs': stage._epochs, 'val_data': expected_val_data, 'run_validation': expected_run_validation + 'epochs': stage._epochs, 'validation_data': expected_val_data, 'run_validation': expected_run_validation } # The stage shouldn't be modifying the dataframe
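
Reviewer note: the renames in this patch (`swap_p` → `swap_probability`, `lr` → `learning_rate`, `lr_decay` → `learning_rate_decay`, `val_data`/`val` → `validation_data`, plus the single unified `fit()` entry point replacing `_fit_centralized`/`_fit_distributed`) change the public dfencoder API. Below is a minimal, hedged usage sketch of the post-rename signatures; the toy dataframe, column names, and epoch counts are illustrative only and are not taken from this change.

```python
# Sketch of the renamed dfencoder API; data and column names are hypothetical.
import pandas as pd

from morpheus.models.dfencoder import AutoEncoder

# Tiny illustrative dataset: one categorical and one numeric feature.
df = pd.DataFrame({
    "eventName": ["Login", "Logout", "Login", "Upload"] * 64,
    "bytesSent": [10.0, 0.5, 12.0, 300.0] * 64,
})
train_df, validation_df = df.iloc[:192], df.iloc[192:]

model = AutoEncoder(
    encoder_layers=[512, 500],
    decoder_layers=[512],
    activation='relu',
    swap_probability=0.2,      # was: swap_p
    learning_rate=0.01,        # was: lr
    learning_rate_decay=0.99,  # was: lr_decay
    batch_size=64,
    verbose=False,
    optimizer='sgd',
    scaler='standard',
    min_cats=1,
)

# fit() now accepts a pandas DataFrame, DataframeDataset, FileSystemDataset,
# or DFEncoderDataLoader as `training_data`; `val_data` is now `validation_data`.
model.fit(
    train_df,
    epochs=2,
    validation_data=validation_df,
    run_validation=True,
    use_val_for_loss_stats=True,
)
```

In distributed mode the same `fit()` call is used, with `rank` and `world_size` passed explicitly and a `DFEncoderDataLoader` or `FileSystemDataset` as the training input, as exercised in `tests/dfencoder/test_dfencoder_distributed_e2e.py`.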
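
The updated `DFPTraining` stage serializes the trained model into `ControlMessage` metadata as a base64-encoded pickle (`set_metadata("model", ...)` in the hunk above). A downstream consumer has to invert that encoding before it can use the model; this patch does not include such a consumer, so the helper below is a hypothetical sketch of the decode side using only the standard library. The string it receives would come from `control_message.get_metadata("model")`.

```python
import base64
import pickle
import typing


def model_from_control_message_metadata(encoded_model: str) -> typing.Any:
    """Recover a model stored as a base64-encoded pickle string.

    Inverse of what DFPTraining.on_data does for ControlMessage inputs:
    pickle.dumps(model) -> base64.b64encode(...) -> .decode('utf-8').
    Only unpickle metadata produced by a trusted pipeline stage.
    """
    pickled_model_bytes = base64.b64decode(encoded_model.encode('utf-8'))
    return pickle.loads(pickled_model_bytes)


# Round-trip demonstration with a stand-in object (a real pipeline would carry
# the AutoEncoder instance produced by the training stage).
_stub_model = {"weights": [0.1, 0.2, 0.3]}
_encoded = base64.b64encode(pickle.dumps(_stub_model)).decode('utf-8')
assert model_from_control_message_metadata(_encoded) == _stub_model
```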