From c7aa20434f6eb0db3ae7e5089f44a109c2e8956f Mon Sep 17 00:00:00 2001 From: Dylan <52908667+smellycloud@users.noreply.github.com> Date: Thu, 23 Jan 2025 16:07:06 +0100 Subject: [PATCH 1/5] cleanup --- views_pipeline_core/data/dataloaders.py | 2 +- views_pipeline_core/managers/model.py | 2 +- .../templates/ensemble/template_main.py | 48 +++++++++---------- .../templates/model/template_config_meta.py | 1 + 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/views_pipeline_core/data/dataloaders.py b/views_pipeline_core/data/dataloaders.py index 2d44716..4329a98 100644 --- a/views_pipeline_core/data/dataloaders.py +++ b/views_pipeline_core/data/dataloaders.py @@ -291,7 +291,7 @@ def get_data( return df, alerts else: raise RuntimeError( - f"file {viewser_df_name} incompatible with partition {self.partition}" + f"file {path_viewser_df.name} incompatible with partition {self.partition}" ) logger.debug(f"DataFrame shape: {df.shape if df is not None else 'None'}") for ialert, alert in enumerate( diff --git a/views_pipeline_core/managers/model.py b/views_pipeline_core/managers/model.py index 74d85d2..a367e8a 100644 --- a/views_pipeline_core/managers/model.py +++ b/views_pipeline_core/managers/model.py @@ -1165,7 +1165,7 @@ def execute_single_run(self, args) -> None: validate=True, ) self._wandb_alert( - title=f"Queryset Fetch Complete ({str(args.run_type)})", + title=f"Queryset Fetch Complete ({str(self._data_loader.partition)})", text=f"Queryset for {self._model_path.target} {self.config['name']} with depvar {self.config['depvar']} and LoA of {self.config['level']} downloaded successfully. Drift self test is set to {args.drift_self_test}.", level=wandb.AlertLevel.INFO, ) diff --git a/views_pipeline_core/templates/ensemble/template_main.py b/views_pipeline_core/templates/ensemble/template_main.py index 295370a..7b6e544 100644 --- a/views_pipeline_core/templates/ensemble/template_main.py +++ b/views_pipeline_core/templates/ensemble/template_main.py @@ -22,33 +22,33 @@ def generate(script_path: Path) -> bool: """ code = """import wandb - import warnings - from pathlib import Path - from views_pipeline_core.cli.utils import parse_args, validate_arguments - from views_pipeline_core.managers.log import LoggingManager - from views_pipeline_core.managers.ensemble import EnsemblePathManager, EnsembleManager - - warnings.filterwarnings("ignore") - - try: - ensemble_path = EnsemblePathManager(Path(__file__)) - logger = LoggingManager(ensemble_path).get_logger() - except FileNotFoundError as fnf_error: - raise RuntimeError( - f"File not found: {fnf_error}. Check the file path and try again." - ) - except PermissionError as perm_error: - raise RuntimeError( - f"Permission denied: {perm_error}. Check your permissions and try again." - ) - except Exception as e: - raise RuntimeError(f"Unexpected error: {e}. Check the logs for details.") - - if __name__ == "__main__": +import warnings +from pathlib import Path +from views_pipeline_core.cli.utils import parse_args, validate_arguments +from views_pipeline_core.managers.log import LoggingManager +from views_pipeline_core.managers.ensemble import EnsemblePathManager, EnsembleManager + +warnings.filterwarnings("ignore") + +try: + ensemble_path = EnsemblePathManager(Path(__file__)) + logger = LoggingManager(ensemble_path).get_logger() +except FileNotFoundError as fnf_error: + raise RuntimeError( + f"File not found: {fnf_error}. Check the file path and try again." 
+ ) +except PermissionError as perm_error: + raise RuntimeError( + f"Permission denied: {perm_error}. Check your permissions and try again." + ) +except Exception as e: + raise RuntimeError(f"Unexpected error: {e}. Check the logs for details.") + +if __name__ == "__main__": wandb.login() args = parse_args() validate_arguments(args) EnsembleManager(ensemble_path=ensemble_path).execute_single_run(args) - """ +""" return save_python_script(script_path, code) diff --git a/views_pipeline_core/templates/model/template_config_meta.py b/views_pipeline_core/templates/model/template_config_meta.py index 03d855c..08a378b 100644 --- a/views_pipeline_core/templates/model/template_config_meta.py +++ b/views_pipeline_core/templates/model/template_config_meta.py @@ -39,6 +39,7 @@ def generate(script_path: Path, model_name: str, model_algorithm: str) -> bool: # "queryset": "escwa001_cflong", # "level": "pgm", # "creator": "Your name here" + "metrics": ["RMSLE", "CRPS"], }} return meta_config """ From 4053801e0950ed02c98f81a40df5b370b2d91fa2 Mon Sep 17 00:00:00 2001 From: Dylan <52908667+smellycloud@users.noreply.github.com> Date: Thu, 23 Jan 2025 20:57:26 +0100 Subject: [PATCH 2/5] add progress bar --- views_pipeline_core/managers/ensemble.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/views_pipeline_core/managers/ensemble.py b/views_pipeline_core/managers/ensemble.py index 13731e2..d0dbed7 100644 --- a/views_pipeline_core/managers/ensemble.py +++ b/views_pipeline_core/managers/ensemble.py @@ -16,7 +16,7 @@ from datetime import datetime import pandas as pd import traceback - +import tqdm logger = logging.getLogger(__name__) # ============================================================ Ensemble Path Manager ============================================================ @@ -485,7 +485,8 @@ def _forecast_model_artifact(self, model_name: str, run_type: str) -> pd.DataFra def _train_ensemble(self, use_saved: bool) -> None: run_type = self.config["run_type"] - for model_name in self.config["models"]: + for model_name in tqdm.tqdm(self.config["models"], desc="Training ensemble"): + tqdm.tqdm.write(f"Current model: {model_name}") self._train_model_artifact(model_name, run_type, use_saved) def _evaluate_ensemble(self, eval_type: str) -> List[pd.DataFrame]: @@ -493,10 +494,12 @@ def _evaluate_ensemble(self, eval_type: str) -> List[pd.DataFrame]: run_type = self.config["run_type"] dfs = [] dfs_agg = [] - - for model_name in self.config["models"]: + + for model_name in tqdm.tqdm(self.config["models"], desc="Evaluating ensemble"): + tqdm.tqdm.write(f"Current model: {model_name}") dfs.append(self._evaluate_model_artifact(model_name, run_type, eval_type)) + tqdm.tqdm.write(f"Aggregating metrics...") for i in range(len(dfs[0])): df_to_aggregate = [df[i] for df in dfs] df_agg = EnsembleManager._get_aggregated_df( @@ -510,8 +513,8 @@ def _forecast_ensemble(self) -> None: run_type = self.config["run_type"] dfs = [] - for model_name in self.config["models"]: - + for model_name in tqdm.tqdm(self.config["models"], desc="Forecasting ensemble"): + tqdm.tqdm.write(f"Current model: {model_name}") dfs.append(self._forecast_model_artifact(model_name, run_type)) df_prediction = EnsembleManager._get_aggregated_df( From 6663d2a1864c3abfd5634b31fea028065e7d0d08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borb=C3=A1la=20Farkas?= <36622811+lujzi05@users.noreply.github.com> Date: Fri, 24 Jan 2025 14:15:17 +0100 Subject: [PATCH 3/5] moving ValueError for missing metrics in config_meta.py --- 
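This patch moves the missing-"metrics" guard out of EnsembleManager._execute_model_tasks and into ModelManager._evaluate_prediction_dataframe, so single models and ensembles hit the same check before a MetricsManager is constructed. A minimal sketch of that validation step, assuming a plain dict config shaped like the meta_config template; the validate_metrics helper below is illustrative only and not part of the package:

    def validate_metrics(config: dict) -> list:
        """Return the configured metric names, or raise if the field is missing."""
        if "metrics" not in config:
            raise ValueError(
                'No evaluation metrics specified in config_meta.py. '
                'Add a field "metrics" with a list of metrics to calculate. '
                'E.g "metrics": ["RMSLE", "CRPS"]'
            )
        return config["metrics"]

    # Example usage with the default now added by template_config_meta.py:
    meta_config = {"name": "test_model", "metrics": ["RMSLE", "CRPS"]}
    print(validate_metrics(meta_config))  # ['RMSLE', 'CRPS']
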
views_pipeline_core/managers/ensemble.py | 10 +++------- views_pipeline_core/managers/model.py | 8 +++++++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/views_pipeline_core/managers/ensemble.py b/views_pipeline_core/managers/ensemble.py index d0dbed7..8258c8f 100644 --- a/views_pipeline_core/managers/ensemble.py +++ b/views_pipeline_core/managers/ensemble.py @@ -247,14 +247,10 @@ def _execute_model_tasks( df_predictions = self._evaluate_ensemble(self._eval_type) self._handle_log_creation() # Evaluate the model - if self.config["metrics"]: - self._evaluate_prediction_dataframe( + self._evaluate_prediction_dataframe( df_predictions, ensemble=True - ) # Calculate evaluation metrics with the views-evaluation package - else: - raise ValueError( - 'No evaluation metrics specified in config_meta.py. Add a field "metrics" with a list of metrics to calculate. E.g "metrics": ["RMSLE", "CRPS"]' - ) + ) # Calculate evaluation metrics with the views-evaluation package + except Exception as e: logger.error(f"Error evaluating model: {e}", exc_info=True) self._wandb_alert( diff --git a/views_pipeline_core/managers/model.py b/views_pipeline_core/managers/model.py index a367e8a..17b9fc6 100644 --- a/views_pipeline_core/managers/model.py +++ b/views_pipeline_core/managers/model.py @@ -1406,7 +1406,13 @@ def _evaluate_prediction_dataframe(self, df_predictions, ensemble=False) -> None Raises: None """ - metrics_manager = MetricsManager(self.config["metrics"]) + if "metrics" in self.config: + metrics_manager = MetricsManager(self.config["metrics"]) + else: + logger.error('Missing "metrics" in config_meta.py') + raise ValueError( + 'No evaluation metrics specified in config_meta.py. Add a field "metrics" with a list of metrics to calculate. E.g "metrics": ["RMSLE", "CRPS"]' + ) if not ensemble: df_path = self._model_path.data_raw / f"{self.config['run_type']}_viewser_df{PipelineConfig().dataframe_format}" df_viewser = read_dataframe(df_path) From 2052f671402c8ced909dfd3a27e7cda2a54d7366 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borb=C3=A1la=20Farkas?= <36622811+lujzi05@users.noreply.github.com> Date: Fri, 24 Jan 2025 15:37:56 +0100 Subject: [PATCH 4/5] progress with test_ensemble_manager.py --- tests/test_ensemble_manager.py | 933 +++++++++++++++++---------------- 1 file changed, 477 insertions(+), 456 deletions(-) diff --git a/tests/test_ensemble_manager.py b/tests/test_ensemble_manager.py index b17c71d..0e799cf 100644 --- a/tests/test_ensemble_manager.py +++ b/tests/test_ensemble_manager.py @@ -1,10 +1,11 @@ import pytest import unittest import pickle -from unittest.mock import patch, MagicMock, ANY, PropertyMock +from unittest.mock import patch, MagicMock, ANY, PropertyMock, mock_open, call from views_pipeline_core.managers.ensemble import EnsembleManager -from views_pipeline_core.managers.model import ModelPathManager +from views_pipeline_core.managers.model import ModelPathManager, ModelManager import pandas as pd +import wandb import subprocess class MockArgs: @@ -12,7 +13,7 @@ def __init__(self, train, evaluate, forecast, saved, run_type, eval_type): self.train = train self.evaluate = evaluate self.forecast = forecast - self.use_saved = saved + self.saved = saved self.run_type = run_type self.eval_type = eval_type @@ -20,7 +21,6 @@ def __init__(self, train, evaluate, forecast, saved, run_type, eval_type): def mock_model_path(): mock_path = MagicMock() mock_path.model_dir = "/path/to/models/test_model" - return mock_path @@ -33,7 +33,6 @@ def mock_model_path(): - @pytest.mark.parametrize( 
"args, expected_command, expected_methods_called", [ @@ -76,17 +75,19 @@ def mock_model_path(): MockArgs( train=False, # Simulate no training evaluate=False, # Simulate no evaluation - forecast=False, # Simulate no forecasting - saved=False, # Simulate not using saved data + forecast=True, # Simulate forecasting + saved=True, # Simulate using saved data run_type="calibration", # Example run type eval_type="minimal" # Example eval type ), [ "/path/to/models/test_model/run.sh", "--run_type", "calibration", + "--forecast", + "--saved", "--eval_type", "minimal" ], - {"train": 0, "evaluate": 0, "forecast": 0} + {"train": 0, "evaluate": 0, "forecast": 1} ) ] ) @@ -105,31 +106,17 @@ def mock_config(self, args): @pytest.fixture def mock_ensemble_manager(self, mock_model_path, args): - manager = EnsembleManager(ensemble_path=mock_model_path) - manager._project = "test_project" - manager._entity = "test_entity" - manager._config_hyperparameters = {} - manager._config_meta = {} - manager._train_ensemble = MagicMock() - manager._evaluate_ensemble = MagicMock() - manager._forecast_ensemble = MagicMock() - manager._eval_type = args.eval_type - with patch("views_pipeline_core.managers.model.ModelManager._update_single_config") as mock_update_single_config: - manager.config = mock_update_single_config(args) - manager.config = {"name": "test_model"} + with patch.object(ModelManager, '_ModelManager__load_config'), \ + patch("views_pipeline_core.managers.package.PackageManager"): + + manager = EnsembleManager(ensemble_path=mock_model_path) + #manager.config = mock_update_single_config(args) + manager._project = "test_project" + manager._eval_type = args.eval_type return manager - # @pytest.fixture - # def mock_modelpath_manager(self): - # modelpath = ModelPathManager(model_path="test_model") - # return modelpath - - # @pytest.fixture - # def mock_read_dataframe(self): - # with patch("views_pipeline_core.files.utils.read_dataframe") as mock: - # mock.return_value = pd.DataFrame({"mock_column": [1, 2, 3]}) - # yield mock + @@ -146,7 +133,7 @@ def test_get_shell_command(self, mock_model_path, args, expected_command, expect args.train, args.evaluate, args.forecast, - args.use_saved, + args.saved, args.eval_type ) assert command == expected_command @@ -157,61 +144,60 @@ def test_get_shell_command(self, mock_model_path, args, expected_command, expect + + - - - - - @patch('views_pipeline_core.managers.ensemble.EnsembleManager._execute_model_tasks') - @patch('views_pipeline_core.managers.model.ModelManager._update_single_config') - @patch('views_pipeline_core.models.check.ensemble_model_check') + def test_execute_single_run( self, - mock_ensemble_model_check, - mock_update_single_config, - mock_execute_model_tasks, - mock_model_path, + mock_ensemble_manager, args, expected_command, # it is necessary to be here expected_methods_called # it is necessary to be here ): + with patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ + patch("views_pipeline_core.managers.package.PackageManager"), \ + patch('views_pipeline_core.managers.ensemble.EnsembleManager._execute_model_tasks') as mock_execute_model_tasks, \ + patch('views_pipeline_core.managers.model.ModelManager._update_single_config'), \ + patch('views_pipeline_core.managers.ensemble.ensemble_model_check') as mock_ensemble_model_check: - manager = EnsembleManager(ensemble_path=mock_model_path) - manager.config = mock_update_single_config(args) - mock_update_single_config.return_value = {"name": "test_model", "run_type": args.run_type} - - - 
manager.execute_single_run(args) - - - assert manager._project == f"{manager.config['name']}_{args.run_type}" - assert manager._eval_type == args.eval_type - - if not args.train: - mock_ensemble_model_check(manager.config) - mock_ensemble_model_check.assert_called_once_with(manager.config) - else: - mock_ensemble_model_check.assert_not_called() - - mock_execute_model_tasks( - config=manager.config, - train=args.train, - eval=args.evaluate, - forecast=args.forecast, - use_saved=args.use_saved - ) - mock_execute_model_tasks.assert_called_once_with( - config=manager.config, - train=args.train, - eval=args.evaluate, - forecast=args.forecast, - use_saved=args.use_saved - ) + # Creating EnsembleManager object with the necessary configs + manager = mock_ensemble_manager + + # Testing the function in the Try block + manager.execute_single_run(args) + + # Asserting the attributes + assert manager._project == f"{manager.config['name']}_{args.run_type}" + assert manager._eval_type == args.eval_type + + # Asserting that ensemble_model_check was called appropriately + if not args.train: + mock_ensemble_model_check.assert_called_once_with(manager.config) + else: + mock_ensemble_model_check.assert_not_called() + + # Asserting that _execute_model_tasks was called appropriately + mock_execute_model_tasks.assert_called_once_with( + config=manager.config, + train=args.train, + eval=args.evaluate, + forecast=args.forecast, + use_saved=args.saved + ) - mock_execute_model_tasks.reset_mock() - mock_execute_model_tasks.side_effect = Exception("Test exception") + # Testing the function in the Except block + mock_execute_model_tasks.side_effect = Exception("Test exception") + manager = mock_ensemble_manager + + # Bypassing exit when exception is raised + with pytest.raises(Exception) as exc_info: + manager.execute_single_run(args) + assert str(exc_info.value) == "Test exception" - + # Asserting that the error message was called appropriately + mock_logger.error.assert_any_call(f"Error during single run execution: {mock_execute_model_tasks.side_effect}", exc_info=True) + @@ -221,35 +207,139 @@ def test_execute_single_run( def test_execute_model_tasks( self, - mock_ensemble_manager, mock_config, + mock_model_path, + mock_ensemble_manager, args, - expected_command, - expected_methods_called + expected_command, # it is necessary to be here + expected_methods_called # it is necessary to be here ): - with patch("wandb.init") as mock_init, \ - patch('wandb.define_metric') as mock_define_metric, \ - patch("wandb.config") as mock_config: + with patch("wandb.init"), \ + patch("wandb.AlertLevel") as mock_alert_level, \ + patch("views_pipeline_core.managers.ensemble.add_wandb_metrics") as mock_add_wandb_metrics, \ + patch("wandb.config") as mock_config, \ + patch("views_pipeline_core.managers.ensemble.EnsembleManager._wandb_alert") as mock_wandb_alert, \ + patch("views_pipeline_core.managers.package.PackageManager"), \ + patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ + patch("views_pipeline_core.managers.ensemble.EnsembleManager._train_ensemble") as mock_train_ensemble, \ + patch("views_pipeline_core.managers.ensemble.EnsembleManager._evaluate_ensemble") as mock_evaluate_ensemble, \ + patch("views_pipeline_core.managers.ensemble.EnsembleManager._forecast_ensemble") as mock_forecast_ensemble, \ + patch("views_pipeline_core.managers.ensemble.EnsembleManager._handle_log_creation") as mock_handle_log_creation, \ + patch("views_pipeline_core.managers.ensemble.EnsembleManager._evaluate_prediction_dataframe") 
as mock_evaluate_prediction_dataframe, \ + patch("views_pipeline_core.managers.ensemble.EnsembleManager._save_predictions") as mock_save_predictions, \ + patch("traceback.format_exc") as mock_format_exc: + + manager = mock_ensemble_manager + + print(args.train, args.evaluate, args.forecast) - mock_config.name = "test_model" - mock_ensemble_manager._execute_model_tasks( + manager._execute_model_tasks( config=mock_config, train=args.train, eval=args.evaluate, forecast=args.forecast, - use_saved=args.use_saved + use_saved=args.saved ) + + mock_add_wandb_metrics.assert_called_once + + if args.train: + mock_logger.info.assert_any_call(f"Training model {manager.config['name']}...") + mock_train_ensemble.assert_called_once_with(args.saved) + mock_wandb_alert.assert_has_calls([ + call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), + call(title=f"Training for {manager._model_path.target} {manager.config['name']} completed successfully.", text=f"", level=mock_alert_level.INFO,), + ], any_order=False) + + if args.evaluate: + mock_logger.info.assert_any_call(f"Evaluating model {manager.config['name']}...") + mock_evaluate_ensemble.assert_called_once_with(manager._eval_type) + mock_handle_log_creation.assert_called_once - assert mock_ensemble_manager._train_ensemble.call_count == expected_methods_called["train"] - assert mock_ensemble_manager._evaluate_ensemble.call_count == expected_methods_called["evaluate"] - assert mock_ensemble_manager._forecast_ensemble.call_count == expected_methods_called["forecast"] + mock_evaluate_prediction_dataframe.assert_called_once_with(manager._evaluate_ensemble(manager._eval_type), ensemble=True) + if args.forecast: + mock_logger.info.assert_any_call(f"Forecasting model {manager.config['name']}...") + mock_forecast_ensemble.assert_called_once + mock_wandb_alert.assert_has_calls([ + call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), + call(title=f"Forecasting for ensemble {manager.config['name']} completed successfully.", level=mock_alert_level.INFO,), + ], any_order=False) + mock_handle_log_creation.assert_called_once + mock_save_predictions.assert_called_once_with(manager._forecast_ensemble(), manager._model_path.data_generated) - mock_init.assert_called_once() - mock_define_metric.assert_called() + minutes = 5.222956339518229e-05 # random number + mock_logger.info.assert_any_call(f"Done. 
Runtime: {minutes:.3f} minutes.\n") + # reset mock + mock_add_wandb_metrics.reset_mock() + mock_logger.reset_mock() + mock_train_ensemble.reset_mock() + mock_evaluate_ensemble.reset_mock() + mock_forecast_ensemble.reset_mock() + mock_handle_log_creation.reset_mock() + mock_evaluate_prediction_dataframe + mock_save_predictions.reset_mock() + mock_wandb_alert.reset_mock() + + + + + mock_train_ensemble.side_effect = Exception("Train ensemble failed") + mock_evaluate_ensemble.side_effect = Exception("Evaluate ensemble failed") + mock_forecast_ensemble.side_effect = Exception("Forecast ensemble failed") + + manager = mock_ensemble_manager + + with pytest.raises(Exception) as exc_info: + manager._execute_model_tasks( + config=mock_config, + train=args.train, + eval=args.evaluate, + forecast=args.forecast, + use_saved=args.saved + ) + assert str(exc_info.value) in ["Train ensemble failed", "Evaluate ensemble failed", "Forecast ensemble failed"] + + + if args.train: + mock_logger.error.assert_has_calls([ + call(f"{manager._model_path.target.title()} training model: {mock_train_ensemble.side_effect}", exc_info=True), + call(f"Error during model tasks execution: {mock_train_ensemble.side_effect}", exc_info=True) + ]) + mock_wandb_alert.assert_has_calls([ + call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), + call(title=f"{manager._model_path.target.title()} Training Error", text=f"An error occurred during training of {manager._model_path.target} {manager.config['name']}: {mock_format_exc()}", level=mock_alert_level.ERROR,), + call(title=f"{manager._model_path.target.title()} Task Execution Error", text=f"An error occurred during the execution of {manager._model_path.target} tasks for {manager.config['name']}: {mock_train_ensemble.side_effect}", level=mock_alert_level.ERROR,) + ]) + + elif args.evaluate: # elif, since we can use the flags together + mock_logger.error.assert_has_calls([ + call(f"Error evaluating model: {mock_evaluate_ensemble.side_effect}", exc_info=True), + call(f"Error during model tasks execution: {mock_evaluate_ensemble.side_effect}", exc_info=True) + ]) + mock_wandb_alert.assert_has_calls([ + call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), + call(title=f"{manager._model_path.target.title()} Evaluation Error", text=f"An error occurred during evaluation of {manager._model_path.target} {manager.config['name']}: {mock_format_exc()}", level=mock_alert_level.ERROR,), + call(title=f"{manager._model_path.target.title()} Task Execution Error", text=f"An error occurred during the execution of {manager._model_path.target} tasks for {manager.config['name']}: {mock_evaluate_ensemble.side_effect}", level=mock_alert_level.ERROR,) + ]) + + elif args.forecast: + mock_logger.error.assert_has_calls([ + call(f"Error forecasting {manager._model_path.target}: {mock_forecast_ensemble.side_effect}", exc_info=True), + call(f"Error during model tasks execution: {mock_forecast_ensemble.side_effect}", exc_info=True) + ]) + mock_wandb_alert.assert_has_calls([ + call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), + call(title="Model Forecasting Error", text=f"An error occurred during forecasting of {manager._model_path.target} {manager.config['name']}: 
{mock_format_exc()}", level=mock_alert_level.ERROR,), + call(title=f"{manager._model_path.target.title()} Task Execution Error", text=f"An error occurred during the execution of {manager._model_path.target} tasks for {manager.config['name']}: {mock_forecast_ensemble.side_effect}", level=mock_alert_level.ERROR,) + ], any_order=True) + + #TODO: assert call counts + + @@ -257,334 +347,299 @@ def test_execute_model_tasks( - def test_train_ensemble(self, mock_model_path, mock_ensemble_manager, args, - expected_command, - expected_methods_called): - # Create a mock for the ensemble manager - with patch("views_pipeline_core.managers.ensemble.EnsembleManager._train_model_artifact") as mock_train_model_artifact: - manager = EnsembleManager(ensemble_path=mock_model_path) +# def test_train_ensemble(self, mock_model_path, mock_ensemble_manager, args, +# expected_command, +# expected_methods_called): +# # Create a mock for the ensemble manager +# with patch("views_pipeline_core.managers.ensemble.EnsembleManager._train_model_artifact") as mock_train_model_artifact: +# manager = EnsembleManager(ensemble_path=mock_model_path) - manager.config = { - "run_type": "test_run", - "models": ["/path/to/models/test_model1", "/path/to/models/test_model2"] - } +# manager.config = { +# "run_type": "test_run", +# "models": ["/path/to/models/test_model1", "/path/to/models/test_model2"] +# } - manager._train_ensemble(args.use_saved) +# manager._train_ensemble(args.use_saved) - print("Call count:", mock_train_model_artifact.call_count) - # Check that _train_model_artifact was called the expected number of times - assert mock_train_model_artifact.call_count == len(manager.config["models"]) +# print("Call count:", mock_train_model_artifact.call_count) +# # Check that _train_model_artifact was called the expected number of times +# assert mock_train_model_artifact.call_count == len(manager.config["models"]) - # If there were models, assert that it was called with the expected parameters +# # If there were models, assert that it was called with the expected parameters - for model_name in manager.config["models"]: - mock_train_model_artifact.assert_any_call(model_name, "test_run", args.use_saved) +# for model_name in manager.config["models"]: +# mock_train_model_artifact.assert_any_call(model_name, "test_run", args.use_saved) - def test_evaluate_ensemble(self, mock_model_path, args, - expected_command, - expected_methods_called): - with patch("views_pipeline_core.managers.ensemble.EnsembleManager._evaluate_model_artifact") as mock_evaluate_model_artifact, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._get_aggregated_df") as mock_get_aggregated_df, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._save_predictions") as mock_save_predictions, \ - patch("views_pipeline_core.files.utils.create_log_file") as mock_create_log_file, \ - patch("views_pipeline_core.files.utils.create_specific_log_file") as mock_create_specific_log_file, \ - patch("views_pipeline_core.files.utils.read_log_file") as mock_read_log_file, \ - patch("views_pipeline_core.managers.model.ModelPathManager") as mock_model_path_class, \ - patch("views_pipeline_core.managers.model.ModelPathManager._get_model_dir") as mock_get_model_dir, \ - patch("views_pipeline_core.managers.model.ModelPathManager._build_absolute_directory") as mock_build_absolute_directory: +# def test_evaluate_ensemble(self, mock_model_path, args, +# expected_command, +# expected_methods_called): +# with 
patch("views_pipeline_core.managers.ensemble.EnsembleManager._evaluate_model_artifact") as mock_evaluate_model_artifact, \ +# patch("views_pipeline_core.managers.ensemble.EnsembleManager._get_aggregated_df") as mock_get_aggregated_df, \ +# patch("views_pipeline_core.managers.model.ModelPathManager") as mock_model_path_class, \ +# patch("views_pipeline_core.managers.model.ModelPathManager._get_model_dir") as mock_get_model_dir, \ +# patch("views_pipeline_core.managers.model.ModelPathManager._build_absolute_directory") as mock_build_absolute_directory: - mock_model_path_instance = mock_model_path_class.return_value +# mock_model_path_instance = mock_model_path_class.return_value - mock_model_path_instance._initialize_directories() +# mock_model_path_instance._initialize_directories() - mock_evaluate_model_artifact.side_effect = [ - [{"prediction": 1}, {"prediction": 2}], - [{"prediction": 3}, {"prediction": 4}] - ] - mock_get_aggregated_df.side_effect = [ - {"ensemble_prediction": 1.5}, - {"ensemble_prediction": 3.0} - ] - - mock_read_log_file.return_value = { - "Deployment Status": "test_status", - "Single Model Timestamp": "20241209_123456", - "Data Generation Timestamp": "20241209_123000", - "Data Fetch Timestamp": "20241209_120000", - } - - manager = EnsembleManager(ensemble_path=mock_model_path_instance) - manager.config = { - "run_type": "test_run", - "models": ["test_model", "test_model"], - "name": "test_ensemble", - "deployment_status": "test_status", - "aggregation": "mean", - } - - manager._evaluate_ensemble(args.eval_type) - - assert mock_evaluate_model_artifact.call_count == len(manager.config["models"]) - mock_get_aggregated_df.assert_called() - mock_save_predictions.assert_called() - mock_read_log_file.assert_called() - mock_create_specific_log_file.assert_called() - - - # This is just not working: - # mock_create_log_file.assert_called_once_with( - # Path("/mock/path/generated"), - # manager.config, - # ANY, # Timestamp - # ANY, # Timestamp - # ANY, # Data fetch timestamp - # model_type="ensemble", - # models=manager.config["models"] - # ) - - - - - def test_forecast_ensemble(self, mock_model_path, args, - expected_command, - expected_methods_called): - # Mock all required methods and classes - with patch("views_pipeline_core.managers.ensemble.EnsembleManager._forecast_model_artifact") as mock_forecast_model_artifact, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._get_aggregated_df") as mock_get_aggregated_df, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._save_predictions") as mock_save_predictions, \ - patch("views_pipeline_core.files.utils.create_log_file") as mock_create_log_file, \ - patch("views_pipeline_core.files.utils.create_specific_log_file") as mock_create_specific_log_file, \ - patch("views_pipeline_core.files.utils.read_log_file") as mock_read_log_file, \ - patch("views_pipeline_core.managers.model.ModelPathManager") as mock_model_path_class, \ - patch("views_pipeline_core.managers.model.ModelPathManager._get_model_dir") as mock_get_model_dir, \ - patch("views_pipeline_core.managers.model.ModelPathManager._build_absolute_directory") as mock_build_absolute_directory: - - mock_model_path_instance = mock_model_path.return_value - mock_model_path_instance._initialize_directories() - - mock_forecast_model_artifact.side_effect = [ - {"model_name": "test_model", "prediction": 1}, - {"model_name": "test_model", "prediction": 2} - ] - - mock_get_aggregated_df.return_value = {"ensemble_prediction": 1.5} - - 
mock_read_log_file.return_value = { - "Deployment Status": "test_status", - "Single Model Timestamp": "20241209_123456", - "Data Generation Timestamp": "20241209_123000", - "Data Fetch Timestamp": "20241209_120000", - } - - mock_create_specific_log_file.return_value = { - "Model Type": "Single", - "Model Name": "test_model", - "Model Timestamp": "20241209_123456", - "Data Generation Timestamp": "20241209_123000", - "Data Fetch Timestamp": "20241209_120000", - "Deployment Status": "test_status" - } - - manager = EnsembleManager(ensemble_path=mock_model_path_instance) - manager.config = { - "run_type": "test_run", - "models": ["test_model", "test_model"], - "name": "test_ensemble", - "deployment_status": "test_status", - "aggregation": "mean", - } - - manager._forecast_ensemble() - - assert mock_forecast_model_artifact.call_count == len(manager.config["models"]) - assert mock_get_aggregated_df.call_count == 1 - assert mock_save_predictions.call_count == 1 - - # This is not working for the same reason - # mock_create_log_file.assert_called_once_with( - # Path("/mock/path/generated"), - # manager.config, - # ANY, # model_timestamp - # ANY, # data_generation_timestamp - # data_fetch_timestamp=None, - # model_type="ensemble", - # models=manager.config["models"] - # ) - - - - - - def test_train_model_artifact(self, mock_model_path, args, - expected_command, - expected_methods_called): - # Mocking required methods and classes - with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_class, \ - patch("views_pipeline_core.managers.ensemble.ModelManager") as mock_model_manager_class, \ - patch("views_pipeline_core.managers.ensemble.subprocess.run") as mock_subprocess_run, \ - patch("views_pipeline_core.managers.ensemble.logger") as mock_logger: - - mock_model_path_instance = mock_model_path_class.return_value - - # Use PropertyMock to mock model_dir property - type(mock_model_path_instance).model_dir = PropertyMock(return_value="/mock/path/to/model") - +# mock_evaluate_model_artifact.side_effect = [ +# [{"prediction": 1}, {"prediction": 2}], +# [{"prediction": 3}, {"prediction": 4}] +# ] +# mock_get_aggregated_df.side_effect = [ +# {"ensemble_prediction": 1.5}, +# {"ensemble_prediction": 3.0} +# ] + + + +# manager = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager.config = { +# "run_type": "test_run", +# "models": ["test_model", "test_model"], +# "name": "test_ensemble", +# "deployment_status": "test_status", +# "aggregation": "mean", +# } + +# manager._evaluate_ensemble(args.eval_type) + +# assert mock_evaluate_model_artifact.call_count == len(manager.config["models"]) +# mock_get_aggregated_df.assert_called() + +# # This is just not working: +# # mock_create_log_file.assert_called_once_with( +# # Path("/mock/path/generated"), +# # manager.config, +# # ANY, # Timestamp +# # ANY, # Timestamp +# # ANY, # Data fetch timestamp +# # model_type="ensemble", +# # models=manager.config["models"] +# # ) + + + + +# def test_forecast_ensemble(self, mock_model_path, args, +# expected_command, +# expected_methods_called): +# # Mock all required methods and classes +# with patch("views_pipeline_core.managers.ensemble.EnsembleManager._forecast_model_artifact") as mock_forecast_model_artifact, \ +# patch("views_pipeline_core.managers.ensemble.EnsembleManager._get_aggregated_df") as mock_get_aggregated_df, \ +# patch("views_pipeline_core.managers.model.ModelPathManager") as mock_model_path_class, \ +# 
patch("views_pipeline_core.managers.model.ModelPathManager._get_model_dir") as mock_get_model_dir, \ +# patch("views_pipeline_core.managers.model.ModelPathManager._build_absolute_directory") as mock_build_absolute_directory: + +# mock_model_path_instance = mock_model_path.return_value +# mock_model_path_instance._initialize_directories() + +# mock_forecast_model_artifact.side_effect = [ +# {"model_name": "test_model", "prediction": 1}, +# {"model_name": "test_model", "prediction": 2} +# ] - # Mock the ModelManager instance and its configs - mock_model_manager_instance = MagicMock() - mock_model_manager_class.return_value = mock_model_manager_instance - mock_model_manager_instance.configs = {"model_name": "test_model", "run_type": "test_run"} +# mock_get_aggregated_df.return_value = {"ensemble_prediction": 1.5} - # Mock subprocess.run to simulate successful shell command execution - mock_subprocess_run.return_value = None # Simulate success (no exception thrown) +# manager = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager.config = { +# "run_type": "test_run", +# "models": ["test_model", "test_model"], +# "name": "test_ensemble", +# "deployment_status": "test_status", +# "aggregation": "mean", +# } - # Instantiate the manager and set up the config - manager = EnsembleManager(ensemble_path=mock_model_path) - manager.config = { - "run_type": "test_run", - "models": ["test_model"], - "name": "test_ensemble", - "deployment_status": "test_status", - "aggregation": "mean", - } - - # Call the method under test - manager._train_model_artifact("test_model", "test_run", use_saved=args.use_saved) - - # Assert that subprocess.run is called once - mock_subprocess_run.assert_called_once_with( - ANY, # Command should be flexible, so we use ANY - check=True - ) +# manager._forecast_ensemble() - # Assert that the logger's info method was called - mock_logger.info.assert_called_with("Training single model test_model...") - - # Assert that the correct shell command was generated - shell_command = EnsembleManager._get_shell_command( - mock_model_path_instance, - "test_run", - train=True, - evaluate=False, - forecast=False, - use_saved=args.use_saved - ) +# assert mock_forecast_model_artifact.call_count == len(manager.config["models"]) +# assert mock_get_aggregated_df.call_count == 1 + +# # This is not working for the same reason +# # mock_create_log_file.assert_called_once_with( +# # Path("/mock/path/generated"), +# # manager.config, +# # ANY, # model_timestamp +# # ANY, # data_generation_timestamp +# # data_fetch_timestamp=None, +# # model_type="ensemble", +# # models=manager.config["models"] +# # ) + + + + + +# def test_train_model_artifact(self, mock_model_path, args, +# expected_command, +# expected_methods_called): +# # Mocking required methods and classes +# with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_class, \ +# patch("views_pipeline_core.managers.ensemble.ModelManager") as mock_model_manager_class, \ +# patch("views_pipeline_core.managers.ensemble.subprocess.run") as mock_subprocess_run, \ +# patch("views_pipeline_core.managers.ensemble.logger") as mock_logger: + +# mock_model_path_instance = mock_model_path_class.return_value - mock_subprocess_run.assert_called_once_with(shell_command, check=True) - - mock_logger.info.assert_called_with("Training single model test_model...") +# # Use PropertyMock to mock model_dir property +# type(mock_model_path_instance).model_dir = PropertyMock(return_value="/mock/path/to/model") - # If an exception is 
thrown during subprocess.run, assert logger error - mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, 'command') - mock_exception = subprocess.CalledProcessError(1, 'command') - manager._train_model_artifact("test_model", "test_run", use_saved=False) - expected_error_message = "Error during shell command execution for model test_model: " + str(mock_exception) - mock_logger.error.assert_called_with(expected_error_message) + +# # Mock the ModelManager instance and its configs +# mock_model_manager_instance = MagicMock() +# mock_model_manager_class.return_value = mock_model_manager_instance +# mock_model_manager_instance.configs = {"model_name": "test_model", "run_type": "test_run"} + +# # Mock subprocess.run to simulate successful shell command execution +# mock_subprocess_run.return_value = None # Simulate success (no exception thrown) + +# # Instantiate the manager and set up the config +# manager = EnsembleManager(ensemble_path=mock_model_path) +# manager.config = { +# "run_type": "test_run", +# "models": ["test_model"], +# "name": "test_ensemble", +# "deployment_status": "test_status", +# "aggregation": "mean", +# } + +# # Call the method under test +# manager._train_model_artifact("test_model", "test_run", use_saved=args.use_saved) + +# # Assert that subprocess.run is called once +# mock_subprocess_run.assert_called_once_with( +# ANY, # Command should be flexible, so we use ANY +# check=True +# ) + +# # Assert that the logger's info method was called +# mock_logger.info.assert_called_with("Training single model test_model...") + +# # Assert that the correct shell command was generated +# shell_command = EnsembleManager._get_shell_command( +# mock_model_path_instance, +# "test_run", +# train=True, +# evaluate=False, +# forecast=False, +# use_saved=args.use_saved +# ) + +# mock_subprocess_run.assert_called_once_with(shell_command, check=True) + +# mock_logger.info.assert_called_with("Training single model test_model...") +# # If an exception is thrown during subprocess.run, assert logger error +# mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, 'command') +# mock_exception = subprocess.CalledProcessError(1, 'command') +# manager._train_model_artifact("test_model", "test_run", use_saved=False) +# expected_error_message = "Error during shell command execution for model test_model: " + str(mock_exception) +# mock_logger.error.assert_called_with(expected_error_message) - def test_evaluate_model_artifact(self, args, expected_command, expected_methods_called): - # Mocking required methods and classes - with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_class, \ - patch("views_pipeline_core.managers.ensemble.ModelManager") as mock_model_manager_class, \ - patch("views_pipeline_core.managers.ensemble.subprocess.run") as mock_subprocess_run, \ - patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ - patch("views_pipeline_core.managers.ensemble.read_log_file") as mock_read_log_file, \ - patch("views_pipeline_core.managers.ensemble.create_log_file") as mock_create_log_file, \ - patch("views_pipeline_core.managers.ensemble.read_dataframe") as mock_read_dataframe, \ - patch("pathlib.Path.exists") as mock_path_exists, \ - patch("views_pipeline_core.configs.pipeline.PipelineConfig") as mock_pipeline_config: - # Mock the ModelPath instance and its attributes - mock_model_path_instance = mock_model_path_class.return_value - mock_artifact_path = MagicMock() - mock_artifact_path.stem = 
"predictions_test_run_202401011200000" - mock_model_path_instance.get_latest_model_artifact_path.return_value = mock_artifact_path +# def test_evaluate_model_artifact(self, args, expected_command, expected_methods_called): + +# with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_manager, \ +# patch("subprocess.run") as mock_subprocess_run, \ +# patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ +# patch("views_pipeline_core.managers.ensemble.read_log_file") as mock_read_log_file, \ +# patch("views_pipeline_core.managers.ensemble.create_log_file") as mock_create_log_file, \ +# patch("views_forecasts.extensions.ForecastAccessor.read_store") as mock_read_store, \ +# patch("views_pipeline_core.managers.ensemble.ModelManager._resolve_evaluation_sequence_number") as mock_resolve, \ +# patch("views_pipeline_core.managers.package.PackageManager") as mock_PackageManager, \ +# patch.object(ModelManager, '_ModelManager__load_config') as mock_load_config: - mock_dataframe_format = ".parquet" - mock_pipeline_config.dataframe_format = mock_dataframe_format +# mock_resolve.return_value = 5 + +# mock_read_store.return_value = pd.DataFrame({"a": [1, 2, 3]}) - #mock_model_path_instance.data_raw = "/mock/path/raw" - mock_model_path_instance.data_generated = "/mock/path/generated" - - # Mock the ModelManager instance and its configs - mock_model_manager_instance = mock_model_manager_class.return_value - mock_model_manager_instance.configs = {"model_name": "test_model", "run_type": "test_run"} +# # Mock the ModelPath instance and its attributes +# mock_model_path_instance = mock_model_path_manager.return_value +# mock_artifact_path = MagicMock() +# mock_artifact_path.stem = "predictions_test_run_202401011200000" +# mock_model_path_instance.get_latest_model_artifact_path.return_value = mock_artifact_path - # Mock the read_log_file function to return a specific log data - mock_read_log_file.return_value = {"Data Fetch Timestamp": "2024-12-11T12:00:00"} +# # mock_dataframe_format = ".parquet" +# # mock_pipeline_config.dataframe_format = mock_dataframe_format +# #mock_model_path_instance.data_raw = "/mock/path/raw" +# mock_model_path_instance.data_generated = "/mock/path/generated" - # Instantiate the manager and set up the config - manager = EnsembleManager(ensemble_path=mock_model_path_instance) - manager.config = { - "run_type": "test_run", - "models": ["test_model", "test_model2"], - "name": "test_ensemble", - "deployment_status": "test_status", - "aggregation": "mean", - } - # Call the method under test - result = manager._evaluate_model_artifact("test_model", "test_run", eval_type="standard") - mock_logger.info.assert_any_call("Evaluating single model test_model...") - expected_file_path = ( - f"/mock/path/generated/predictions_test_run_202401011200000_00{mock_dataframe_format}" - ) +# # Instantiate the manager and set up the config +# manager = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager._evaluate_model_artifact("test_model", "test_run", eval_type="standard") +# mock_logger.info.assert_any_call("Evaluating single model test_model...") + + + +# for sequence_number in range(mock_resolve.return_value): +# mock_logger.info.assert_any_call(f"Loading existing prediction test_model_{mock_artifact_path.stem}_{sequence_number:02} from prediction store") - mock_logger.info.assert_any_call(f"Loading existing test_run predictions from {expected_file_path}") - # mock_file_open.assert_called_with( - # expected_file_path, "rb" - # ) - - - - 
mock_path_exists.return_value= False - # Generate the expected shell command - shell_command = EnsembleManager._get_shell_command( - mock_model_path_instance, - "test_run", - train=False, - evaluate=True, - forecast=False, - use_saved=True, - eval_type="standard" - ) - #mock_path_exists.side_effect = False # Simulate missing file - result = manager._evaluate_model_artifact("test_model", "test_run", eval_type="standard") - mock_logger.info.assert_any_call("No existing test_run predictions found. Generating new test_run predictions...") - - # Assert that subprocess.run is called once with the correct command - mock_subprocess_run.assert_called_once_with( - shell_command, - check=True - ) +# mock_read_store.side_effect = [item for _ in range(mock_resolve.return_value) for item in [ +# Exception("Test exception"), +# pd.DataFrame({"a": [1, 2, 3]}), +# ]] + +# manager_side = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager_side._evaluate_model_artifact("test_model", "test_run", eval_type="standard") + +# print("here") +# # Generate the expected shell command +# # shell_command = EnsembleManager._get_shell_command( +# # mock_model_path_instance, +# # "test_run", +# # train=False, +# # evaluate=True, +# # forecast=False, +# # use_saved=True, +# # eval_type="standard" +# # ) +# # mock_subprocess_run.assert_called_once_with( +# # shell_command, +# # check=True +# # ) +# mock_logger.info.assert_any_call("No existing test_run predictions found. Generating new test_run predictions...") +# print("after first side") +# print("mock_subprocess_run.call_args_list",mock_subprocess_run.call_args_list) +# mock_read_store.side_effect = [item for _ in range(mock_resolve.return_value) for item in [ +# Exception("Test exception"), +# pd.DataFrame({"a": [1, 2, 3]}), +# ]] + +# mock_subprocess_run.side_effect = [Exception("Subprocess failed") for _ in range(mock_resolve.return_value)] - mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, 'command') - mock_exception = subprocess.CalledProcessError(1, 'command') - manager._evaluate_model_artifact("test_model", "test_run", eval_type="standard") - expected_error_message = "Error during shell command execution for model test_model: " + str(mock_exception) - mock_logger.error.assert_called_with(expected_error_message) +# manager_side2 = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager_side2._evaluate_model_artifact("test_model", "test_run", eval_type="standard") +# print("mock_logger.info.call_args_list",mock_logger.info.call_args_list) +# print("mock_logger.error.call_args_list",mock_logger.error.call_args_list) +# mock_logger.error.assert_any_call("Error during shell command execution for model test_model: Subprocess failed") + + - assert mock_create_log_file.call_count==2 - assert mock_logger.error.call_count ==1 - assert mock_read_log_file.call_count==2 +# assert mock_create_log_file.call_count==10 +# assert mock_logger.error.call_count ==5 +# assert mock_logger.info.call_count ==18 +# assert mock_read_log_file.call_count==10 + @@ -594,94 +649,60 @@ def test_evaluate_model_artifact(self, args, expected_command, expected_methods_ - def test_forecast_model_artifact(self, mock_model_path, args, expected_command, expected_methods_called): - # Mocking required methods and classes - with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_class, \ - patch("views_pipeline_core.managers.ensemble.ModelManager") as mock_model_manager_class, \ - 
patch("views_pipeline_core.managers.ensemble.subprocess.run") as mock_subprocess_run, \ - patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ - patch("views_pipeline_core.managers.ensemble.read_log_file") as mock_read_log_file, \ - patch("views_pipeline_core.managers.ensemble.create_log_file") as mock_create_log_file, \ - patch("views_pipeline_core.managers.ensemble.read_dataframe") as mock_read_dataframe, \ - patch("pathlib.Path.exists") as mock_path_exists, \ - patch("views_pipeline_core.configs.pipeline.PipelineConfig") as mock_pipeline_config: - - - # Mock the ModelPath instance and its attributes - mock_model_path_instance = mock_model_path_class.return_value - mock_artifact_path = MagicMock() - mock_artifact_path.stem = "predictions_test_run_202401011200000" - mock_model_path_instance.get_latest_model_artifact_path.return_value = mock_artifact_path - - mock_dataframe_format = ".parquet" - mock_pipeline_config.dataframe_format = mock_dataframe_format - - mock_model_path_instance.data_generated = "/mock/path/generated" - - # Mock the ModelManager instance and its configs - mock_model_manager_instance = mock_model_manager_class.return_value - - mock_model_manager_instance.configs = {"model_name": "test_model", "run_type": "test_run"} - - # Mock the read_log_file function to return a specific log data - mock_read_log_file.return_value = {"Data Fetch Timestamp": "2024-12-11T12:00:00"} - - mock_path_exists.return_value= True - # Instantiate the manager and set up the config - manager = EnsembleManager(ensemble_path=mock_model_path_instance) - manager.config = { - "run_type": "test_run", - "models": ["test_model"], - "name": "test_ensemble", - "deployment_status": "test_status", - "aggregation": "mean", - } - # Call the method under test - result = manager._forecast_model_artifact("test_model", "test_run") - mock_logger.info.assert_any_call("Forecasting single model test_model...") - - expected_file_path = ( - f"/mock/path/generated/predictions_test_run_202401011200000{mock_dataframe_format}" - ) - - mock_logger.info.assert_any_call(f"Loading existing test_run predictions from {expected_file_path}") - - mock_path_exists.return_value= False - - # Generate the expected shell command - shell_command = EnsembleManager._get_shell_command( - mock_model_path_instance, - "test_run", - train=False, - evaluate=False, - forecast=True, - use_saved=True, - eval_type="standard" - ) - #mock_path_exists.side_effect = False # Simulate missing file - result = manager._forecast_model_artifact("test_model", "test_run") - mock_logger.info.assert_any_call("No existing test_run predictions found. 
Generating new test_run predictions...") - - # Assert that subprocess.run is called once with the correct command - mock_subprocess_run.assert_called_once_with( - shell_command, - check=True - ) +# def test_forecast_model_artifact(self, mock_model_path, args, expected_command, expected_methods_called): + +# with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_manager, \ +# patch("subprocess.run") as mock_subprocess_run, \ +# patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ +# patch("views_pipeline_core.managers.ensemble.read_log_file") as mock_read_log_file, \ +# patch("views_pipeline_core.managers.ensemble.create_log_file") as mock_create_log_file, \ +# patch("views_forecasts.extensions.ForecastAccessor.read_store") as mock_read_store, \ +# patch("views_pipeline_core.managers.package.PackageManager") as mock_PackageManager, \ +# patch.object(ModelManager, '_ModelManager__load_config') as mock_load_config: + +# # mock get_latest_model_artifact_path +# mock_model_path_instance = mock_model_path_manager.return_value +# mock_artifact_path = MagicMock() +# mock_artifact_path.stem = "predictions_test_run_202401011200000" +# mock_model_path_instance.get_latest_model_artifact_path.return_value = mock_artifact_path +# # Try block +# mock_read_store.return_value = pd.DataFrame({"a": [1, 2, 3]}) +# manager = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager._forecast_model_artifact("test_model", "test_run") + +# mock_logger.info.assert_any_call("Forecasting single model test_model...") +# expected_name = (f"test_model_{mock_artifact_path.stem}") +# mock_logger.info.assert_any_call(f"Loading existing prediction {expected_name} from prediction store") + +# # Except block for read_store, Try block for subprocess +# mock_read_store.side_effect = [ +# Exception("Test exception"), +# pd.DataFrame({"a": [1, 2, 3]}), +# ] +# manager_side = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager_side._forecast_model_artifact("test_model", "test_run") - mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, 'command') - mock_exception = subprocess.CalledProcessError(1, 'command') - manager._evaluate_model_artifact("test_model", "test_run", eval_type="standard") - expected_error_message = "Error during shell command execution for model test_model: " + str(mock_exception) - mock_logger.error.assert_called_with(expected_error_message) - +# mock_logger.info.assert_any_call("No existing test_run predictions found. 
Generating new test_run predictions...") + +# # Except block for read_store, Except block for subprocess +# mock_read_store.side_effect = [ +# Exception("Test exception"), +# pd.DataFrame({"a": [1, 2, 3]}), +# ] +# mock_subprocess_run.side_effect = Exception("Subprocess failed"), + +# manager_side2 = EnsembleManager(ensemble_path=mock_model_path_instance) +# manager_side2._forecast_model_artifact("test_model", "test_run") +# mock_logger.error.assert_any_call("Error during shell command execution for model test_model: Subprocess failed") - assert mock_create_log_file.call_count==2 - assert mock_logger.error.call_count ==1 - assert mock_read_log_file.call_count==2 +# assert mock_create_log_file.call_count==2 +# assert mock_logger.error.call_count ==1 +# assert mock_logger.info.call_count ==6 +# assert mock_read_log_file.call_count==2 From 615dc689023de7a0ae2e1e58091cb8c6ae6309fb Mon Sep 17 00:00:00 2001 From: Dylan <52908667+smellycloud@users.noreply.github.com> Date: Mon, 27 Jan 2025 14:47:11 +0100 Subject: [PATCH 5/5] delete vpn dependent tests for now --- tests/test_ensemble_manager.py | 768 --------------------------------- tests/test_model_manager.py | 390 ----------------- 2 files changed, 1158 deletions(-) delete mode 100644 tests/test_ensemble_manager.py delete mode 100644 tests/test_model_manager.py diff --git a/tests/test_ensemble_manager.py b/tests/test_ensemble_manager.py deleted file mode 100644 index 0e799cf..0000000 --- a/tests/test_ensemble_manager.py +++ /dev/null @@ -1,768 +0,0 @@ -import pytest -import unittest -import pickle -from unittest.mock import patch, MagicMock, ANY, PropertyMock, mock_open, call -from views_pipeline_core.managers.ensemble import EnsembleManager -from views_pipeline_core.managers.model import ModelPathManager, ModelManager -import pandas as pd -import wandb -import subprocess - -class MockArgs: - def __init__(self, train, evaluate, forecast, saved, run_type, eval_type): - self.train = train - self.evaluate = evaluate - self.forecast = forecast - self.saved = saved - self.run_type = run_type - self.eval_type = eval_type - -@pytest.fixture -def mock_model_path(): - mock_path = MagicMock() - mock_path.model_dir = "/path/to/models/test_model" - return mock_path - - - - - - - - - - - -@pytest.mark.parametrize( - "args, expected_command, expected_methods_called", - [ - ( - MockArgs( - train=True, # Simulate training - evaluate=False, # Simulate no evaluation - forecast=False, # Simulate no forecasting - saved=False, # Simulate using used_saved data - run_type="test", # Example run type - eval_type="standard" # Example eval type - ), - [ - "/path/to/models/test_model/run.sh", - "--run_type", "test", - "--train", - "--eval_type", "standard" - ], - {"train": 1, "evaluate": 0, "forecast": 0} - ), - ( - MockArgs( - train=False, # Simulate no training - evaluate=True, # Simulate evaluation - forecast=True, # Simulate forecasting - saved=False, # Simulate not using used_saved data - run_type="forecast", # Example run type - eval_type="detailed" # Example eval type - ), - [ - "/path/to/models/test_model/run.sh", - "--run_type", "forecast", - "--evaluate", - "--forecast", - "--eval_type", "detailed" - ], - {"train": 0, "evaluate": 1, "forecast": 1} - ), - ( - MockArgs( - train=False, # Simulate no training - evaluate=False, # Simulate no evaluation - forecast=True, # Simulate forecasting - saved=True, # Simulate using saved data - run_type="calibration", # Example run type - eval_type="minimal" # Example eval type - ), - [ - "/path/to/models/test_model/run.sh", - 
"--run_type", "calibration", - "--forecast", - "--saved", - "--eval_type", "minimal" - ], - {"train": 0, "evaluate": 0, "forecast": 1} - ) - ] -) -class TestParametrized(): - - - @pytest.fixture - def mock_config(self, args): - with patch("views_pipeline_core.managers.model.ModelManager._update_single_config") as mock_update_single_config: - print(mock_update_single_config(args)) - return { - "name": "test_model", - "parameter1": "value1" - } - - - @pytest.fixture - def mock_ensemble_manager(self, mock_model_path, args): - with patch.object(ModelManager, '_ModelManager__load_config'), \ - patch("views_pipeline_core.managers.package.PackageManager"): - - manager = EnsembleManager(ensemble_path=mock_model_path) - #manager.config = mock_update_single_config(args) - manager._project = "test_project" - manager._eval_type = args.eval_type - return manager - - - - - - - - - def test_get_shell_command(self, mock_model_path, args, expected_command, expected_methods_called): # all arguments are necessary - """ - Test the _get_shell_command method with various input combinations to ensure it generates the correct shell command. - """ - # Directly use mock_args since it's already a SimpleNamespace object - command = EnsembleManager._get_shell_command( - mock_model_path, - args.run_type, - args.train, - args.evaluate, - args.forecast, - args.saved, - args.eval_type - ) - assert command == expected_command - - - - - - - - - - - - def test_execute_single_run( - self, - mock_ensemble_manager, - args, - expected_command, # it is necessary to be here - expected_methods_called # it is necessary to be here - ): - with patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ - patch("views_pipeline_core.managers.package.PackageManager"), \ - patch('views_pipeline_core.managers.ensemble.EnsembleManager._execute_model_tasks') as mock_execute_model_tasks, \ - patch('views_pipeline_core.managers.model.ModelManager._update_single_config'), \ - patch('views_pipeline_core.managers.ensemble.ensemble_model_check') as mock_ensemble_model_check: - - # Creating EnsembleManager object with the necessary configs - manager = mock_ensemble_manager - - # Testing the function in the Try block - manager.execute_single_run(args) - - # Asserting the attributes - assert manager._project == f"{manager.config['name']}_{args.run_type}" - assert manager._eval_type == args.eval_type - - # Asserting that ensemble_model_check was called appropriately - if not args.train: - mock_ensemble_model_check.assert_called_once_with(manager.config) - else: - mock_ensemble_model_check.assert_not_called() - - # Asserting that _execute_model_tasks was called appropriately - mock_execute_model_tasks.assert_called_once_with( - config=manager.config, - train=args.train, - eval=args.evaluate, - forecast=args.forecast, - use_saved=args.saved - ) - - # Testing the function in the Except block - mock_execute_model_tasks.side_effect = Exception("Test exception") - manager = mock_ensemble_manager - - # Bypassing exit when exception is raised - with pytest.raises(Exception) as exc_info: - manager.execute_single_run(args) - assert str(exc_info.value) == "Test exception" - - # Asserting that the error message was called appropriately - mock_logger.error.assert_any_call(f"Error during single run execution: {mock_execute_model_tasks.side_effect}", exc_info=True) - - - - - - - - - def test_execute_model_tasks( - self, - mock_config, - mock_model_path, - mock_ensemble_manager, - args, - expected_command, # it is necessary to be here - 
expected_methods_called # it is necessary to be here - ): - - with patch("wandb.init"), \ - patch("wandb.AlertLevel") as mock_alert_level, \ - patch("views_pipeline_core.managers.ensemble.add_wandb_metrics") as mock_add_wandb_metrics, \ - patch("wandb.config") as mock_config, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._wandb_alert") as mock_wandb_alert, \ - patch("views_pipeline_core.managers.package.PackageManager"), \ - patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._train_ensemble") as mock_train_ensemble, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._evaluate_ensemble") as mock_evaluate_ensemble, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._forecast_ensemble") as mock_forecast_ensemble, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._handle_log_creation") as mock_handle_log_creation, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._evaluate_prediction_dataframe") as mock_evaluate_prediction_dataframe, \ - patch("views_pipeline_core.managers.ensemble.EnsembleManager._save_predictions") as mock_save_predictions, \ - patch("traceback.format_exc") as mock_format_exc: - - manager = mock_ensemble_manager - - print(args.train, args.evaluate, args.forecast) - - manager._execute_model_tasks( - config=mock_config, - train=args.train, - eval=args.evaluate, - forecast=args.forecast, - use_saved=args.saved - ) - - mock_add_wandb_metrics.assert_called_once - - if args.train: - mock_logger.info.assert_any_call(f"Training model {manager.config['name']}...") - mock_train_ensemble.assert_called_once_with(args.saved) - mock_wandb_alert.assert_has_calls([ - call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), - call(title=f"Training for {manager._model_path.target} {manager.config['name']} completed successfully.", text=f"", level=mock_alert_level.INFO,), - ], any_order=False) - - - if args.evaluate: - mock_logger.info.assert_any_call(f"Evaluating model {manager.config['name']}...") - mock_evaluate_ensemble.assert_called_once_with(manager._eval_type) - mock_handle_log_creation.assert_called_once - - mock_evaluate_prediction_dataframe.assert_called_once_with(manager._evaluate_ensemble(manager._eval_type), ensemble=True) - - if args.forecast: - mock_logger.info.assert_any_call(f"Forecasting model {manager.config['name']}...") - mock_forecast_ensemble.assert_called_once - mock_wandb_alert.assert_has_calls([ - call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), - call(title=f"Forecasting for ensemble {manager.config['name']} completed successfully.", level=mock_alert_level.INFO,), - ], any_order=False) - mock_handle_log_creation.assert_called_once - mock_save_predictions.assert_called_once_with(manager._forecast_ensemble(), manager._model_path.data_generated) - - minutes = 5.222956339518229e-05 # random number - mock_logger.info.assert_any_call(f"Done. 
Runtime: {minutes:.3f} minutes.\n") - - # reset mock - mock_add_wandb_metrics.reset_mock() - mock_logger.reset_mock() - mock_train_ensemble.reset_mock() - mock_evaluate_ensemble.reset_mock() - mock_forecast_ensemble.reset_mock() - mock_handle_log_creation.reset_mock() - mock_evaluate_prediction_dataframe - mock_save_predictions.reset_mock() - mock_wandb_alert.reset_mock() - - - - - mock_train_ensemble.side_effect = Exception("Train ensemble failed") - mock_evaluate_ensemble.side_effect = Exception("Evaluate ensemble failed") - mock_forecast_ensemble.side_effect = Exception("Forecast ensemble failed") - - manager = mock_ensemble_manager - - with pytest.raises(Exception) as exc_info: - manager._execute_model_tasks( - config=mock_config, - train=args.train, - eval=args.evaluate, - forecast=args.forecast, - use_saved=args.saved - ) - assert str(exc_info.value) in ["Train ensemble failed", "Evaluate ensemble failed", "Forecast ensemble failed"] - - - if args.train: - mock_logger.error.assert_has_calls([ - call(f"{manager._model_path.target.title()} training model: {mock_train_ensemble.side_effect}", exc_info=True), - call(f"Error during model tasks execution: {mock_train_ensemble.side_effect}", exc_info=True) - ]) - mock_wandb_alert.assert_has_calls([ - call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), - call(title=f"{manager._model_path.target.title()} Training Error", text=f"An error occurred during training of {manager._model_path.target} {manager.config['name']}: {mock_format_exc()}", level=mock_alert_level.ERROR,), - call(title=f"{manager._model_path.target.title()} Task Execution Error", text=f"An error occurred during the execution of {manager._model_path.target} tasks for {manager.config['name']}: {mock_train_ensemble.side_effect}", level=mock_alert_level.ERROR,) - ]) - - elif args.evaluate: # elif, since we can use the flags together - mock_logger.error.assert_has_calls([ - call(f"Error evaluating model: {mock_evaluate_ensemble.side_effect}", exc_info=True), - call(f"Error during model tasks execution: {mock_evaluate_ensemble.side_effect}", exc_info=True) - ]) - mock_wandb_alert.assert_has_calls([ - call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), - call(title=f"{manager._model_path.target.title()} Evaluation Error", text=f"An error occurred during evaluation of {manager._model_path.target} {manager.config['name']}: {mock_format_exc()}", level=mock_alert_level.ERROR,), - call(title=f"{manager._model_path.target.title()} Task Execution Error", text=f"An error occurred during the execution of {manager._model_path.target} tasks for {manager.config['name']}: {mock_evaluate_ensemble.side_effect}", level=mock_alert_level.ERROR,) - ]) - - elif args.forecast: - mock_logger.error.assert_has_calls([ - call(f"Error forecasting {manager._model_path.target}: {mock_forecast_ensemble.side_effect}", exc_info=True), - call(f"Error during model tasks execution: {mock_forecast_ensemble.side_effect}", exc_info=True) - ]) - mock_wandb_alert.assert_has_calls([ - call(title="Running Ensemble", text=f"Ensemble Name: {str(manager.config['name'])}\nConstituent Models: {str(manager.config['models'])}", level=mock_alert_level.INFO,), - call(title="Model Forecasting Error", text=f"An error occurred during forecasting of {manager._model_path.target} {manager.config['name']}: 
{mock_format_exc()}", level=mock_alert_level.ERROR,), - call(title=f"{manager._model_path.target.title()} Task Execution Error", text=f"An error occurred during the execution of {manager._model_path.target} tasks for {manager.config['name']}: {mock_forecast_ensemble.side_effect}", level=mock_alert_level.ERROR,) - ], any_order=True) - - #TODO: assert call counts - - - - - - - - - -# def test_train_ensemble(self, mock_model_path, mock_ensemble_manager, args, -# expected_command, -# expected_methods_called): -# # Create a mock for the ensemble manager -# with patch("views_pipeline_core.managers.ensemble.EnsembleManager._train_model_artifact") as mock_train_model_artifact: -# manager = EnsembleManager(ensemble_path=mock_model_path) - -# manager.config = { -# "run_type": "test_run", -# "models": ["/path/to/models/test_model1", "/path/to/models/test_model2"] -# } - -# manager._train_ensemble(args.use_saved) - -# print("Call count:", mock_train_model_artifact.call_count) -# # Check that _train_model_artifact was called the expected number of times -# assert mock_train_model_artifact.call_count == len(manager.config["models"]) - -# # If there were models, assert that it was called with the expected parameters - -# for model_name in manager.config["models"]: -# mock_train_model_artifact.assert_any_call(model_name, "test_run", args.use_saved) - - - -# def test_evaluate_ensemble(self, mock_model_path, args, -# expected_command, -# expected_methods_called): -# with patch("views_pipeline_core.managers.ensemble.EnsembleManager._evaluate_model_artifact") as mock_evaluate_model_artifact, \ -# patch("views_pipeline_core.managers.ensemble.EnsembleManager._get_aggregated_df") as mock_get_aggregated_df, \ -# patch("views_pipeline_core.managers.model.ModelPathManager") as mock_model_path_class, \ -# patch("views_pipeline_core.managers.model.ModelPathManager._get_model_dir") as mock_get_model_dir, \ -# patch("views_pipeline_core.managers.model.ModelPathManager._build_absolute_directory") as mock_build_absolute_directory: - - -# mock_model_path_instance = mock_model_path_class.return_value - -# mock_model_path_instance._initialize_directories() - - -# mock_evaluate_model_artifact.side_effect = [ -# [{"prediction": 1}, {"prediction": 2}], -# [{"prediction": 3}, {"prediction": 4}] -# ] -# mock_get_aggregated_df.side_effect = [ -# {"ensemble_prediction": 1.5}, -# {"ensemble_prediction": 3.0} -# ] - - - -# manager = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager.config = { -# "run_type": "test_run", -# "models": ["test_model", "test_model"], -# "name": "test_ensemble", -# "deployment_status": "test_status", -# "aggregation": "mean", -# } - -# manager._evaluate_ensemble(args.eval_type) - -# assert mock_evaluate_model_artifact.call_count == len(manager.config["models"]) -# mock_get_aggregated_df.assert_called() - - -# # This is just not working: -# # mock_create_log_file.assert_called_once_with( -# # Path("/mock/path/generated"), -# # manager.config, -# # ANY, # Timestamp -# # ANY, # Timestamp -# # ANY, # Data fetch timestamp -# # model_type="ensemble", -# # models=manager.config["models"] -# # ) - - - - -# def test_forecast_ensemble(self, mock_model_path, args, -# expected_command, -# expected_methods_called): -# # Mock all required methods and classes -# with patch("views_pipeline_core.managers.ensemble.EnsembleManager._forecast_model_artifact") as mock_forecast_model_artifact, \ -# patch("views_pipeline_core.managers.ensemble.EnsembleManager._get_aggregated_df") as mock_get_aggregated_df, \ -# 
patch("views_pipeline_core.managers.model.ModelPathManager") as mock_model_path_class, \ -# patch("views_pipeline_core.managers.model.ModelPathManager._get_model_dir") as mock_get_model_dir, \ -# patch("views_pipeline_core.managers.model.ModelPathManager._build_absolute_directory") as mock_build_absolute_directory: - -# mock_model_path_instance = mock_model_path.return_value -# mock_model_path_instance._initialize_directories() - -# mock_forecast_model_artifact.side_effect = [ -# {"model_name": "test_model", "prediction": 1}, -# {"model_name": "test_model", "prediction": 2} -# ] - -# mock_get_aggregated_df.return_value = {"ensemble_prediction": 1.5} - -# manager = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager.config = { -# "run_type": "test_run", -# "models": ["test_model", "test_model"], -# "name": "test_ensemble", -# "deployment_status": "test_status", -# "aggregation": "mean", -# } - -# manager._forecast_ensemble() - -# assert mock_forecast_model_artifact.call_count == len(manager.config["models"]) -# assert mock_get_aggregated_df.call_count == 1 - -# # This is not working for the same reason -# # mock_create_log_file.assert_called_once_with( -# # Path("/mock/path/generated"), -# # manager.config, -# # ANY, # model_timestamp -# # ANY, # data_generation_timestamp -# # data_fetch_timestamp=None, -# # model_type="ensemble", -# # models=manager.config["models"] -# # ) - - - - - -# def test_train_model_artifact(self, mock_model_path, args, -# expected_command, -# expected_methods_called): -# # Mocking required methods and classes -# with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_class, \ -# patch("views_pipeline_core.managers.ensemble.ModelManager") as mock_model_manager_class, \ -# patch("views_pipeline_core.managers.ensemble.subprocess.run") as mock_subprocess_run, \ -# patch("views_pipeline_core.managers.ensemble.logger") as mock_logger: - -# mock_model_path_instance = mock_model_path_class.return_value - -# # Use PropertyMock to mock model_dir property -# type(mock_model_path_instance).model_dir = PropertyMock(return_value="/mock/path/to/model") - - - -# # Mock the ModelManager instance and its configs -# mock_model_manager_instance = MagicMock() -# mock_model_manager_class.return_value = mock_model_manager_instance -# mock_model_manager_instance.configs = {"model_name": "test_model", "run_type": "test_run"} - -# # Mock subprocess.run to simulate successful shell command execution -# mock_subprocess_run.return_value = None # Simulate success (no exception thrown) - -# # Instantiate the manager and set up the config -# manager = EnsembleManager(ensemble_path=mock_model_path) -# manager.config = { -# "run_type": "test_run", -# "models": ["test_model"], -# "name": "test_ensemble", -# "deployment_status": "test_status", -# "aggregation": "mean", -# } - -# # Call the method under test -# manager._train_model_artifact("test_model", "test_run", use_saved=args.use_saved) - -# # Assert that subprocess.run is called once -# mock_subprocess_run.assert_called_once_with( -# ANY, # Command should be flexible, so we use ANY -# check=True -# ) - -# # Assert that the logger's info method was called -# mock_logger.info.assert_called_with("Training single model test_model...") - -# # Assert that the correct shell command was generated -# shell_command = EnsembleManager._get_shell_command( -# mock_model_path_instance, -# "test_run", -# train=True, -# evaluate=False, -# forecast=False, -# use_saved=args.use_saved -# ) - -# 
mock_subprocess_run.assert_called_once_with(shell_command, check=True) - -# mock_logger.info.assert_called_with("Training single model test_model...") - -# # If an exception is thrown during subprocess.run, assert logger error -# mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, 'command') -# mock_exception = subprocess.CalledProcessError(1, 'command') -# manager._train_model_artifact("test_model", "test_run", use_saved=False) -# expected_error_message = "Error during shell command execution for model test_model: " + str(mock_exception) -# mock_logger.error.assert_called_with(expected_error_message) - - - - - -# def test_evaluate_model_artifact(self, args, expected_command, expected_methods_called): - -# with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_manager, \ -# patch("subprocess.run") as mock_subprocess_run, \ -# patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ -# patch("views_pipeline_core.managers.ensemble.read_log_file") as mock_read_log_file, \ -# patch("views_pipeline_core.managers.ensemble.create_log_file") as mock_create_log_file, \ -# patch("views_forecasts.extensions.ForecastAccessor.read_store") as mock_read_store, \ -# patch("views_pipeline_core.managers.ensemble.ModelManager._resolve_evaluation_sequence_number") as mock_resolve, \ -# patch("views_pipeline_core.managers.package.PackageManager") as mock_PackageManager, \ -# patch.object(ModelManager, '_ModelManager__load_config') as mock_load_config: - -# mock_resolve.return_value = 5 - -# mock_read_store.return_value = pd.DataFrame({"a": [1, 2, 3]}) - - -# # Mock the ModelPath instance and its attributes -# mock_model_path_instance = mock_model_path_manager.return_value -# mock_artifact_path = MagicMock() -# mock_artifact_path.stem = "predictions_test_run_202401011200000" -# mock_model_path_instance.get_latest_model_artifact_path.return_value = mock_artifact_path - -# # mock_dataframe_format = ".parquet" -# # mock_pipeline_config.dataframe_format = mock_dataframe_format - -# #mock_model_path_instance.data_raw = "/mock/path/raw" -# mock_model_path_instance.data_generated = "/mock/path/generated" - - - - -# # Instantiate the manager and set up the config -# manager = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager._evaluate_model_artifact("test_model", "test_run", eval_type="standard") -# mock_logger.info.assert_any_call("Evaluating single model test_model...") - - - -# for sequence_number in range(mock_resolve.return_value): -# mock_logger.info.assert_any_call(f"Loading existing prediction test_model_{mock_artifact_path.stem}_{sequence_number:02} from prediction store") - - - - -# mock_read_store.side_effect = [item for _ in range(mock_resolve.return_value) for item in [ -# Exception("Test exception"), -# pd.DataFrame({"a": [1, 2, 3]}), -# ]] - -# manager_side = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager_side._evaluate_model_artifact("test_model", "test_run", eval_type="standard") - -# print("here") -# # Generate the expected shell command -# # shell_command = EnsembleManager._get_shell_command( -# # mock_model_path_instance, -# # "test_run", -# # train=False, -# # evaluate=True, -# # forecast=False, -# # use_saved=True, -# # eval_type="standard" -# # ) -# # mock_subprocess_run.assert_called_once_with( -# # shell_command, -# # check=True -# # ) -# mock_logger.info.assert_any_call("No existing test_run predictions found. 
Generating new test_run predictions...") -# print("after first side") -# print("mock_subprocess_run.call_args_list",mock_subprocess_run.call_args_list) -# mock_read_store.side_effect = [item for _ in range(mock_resolve.return_value) for item in [ -# Exception("Test exception"), -# pd.DataFrame({"a": [1, 2, 3]}), -# ]] - -# mock_subprocess_run.side_effect = [Exception("Subprocess failed") for _ in range(mock_resolve.return_value)] - -# manager_side2 = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager_side2._evaluate_model_artifact("test_model", "test_run", eval_type="standard") -# print("mock_logger.info.call_args_list",mock_logger.info.call_args_list) -# print("mock_logger.error.call_args_list",mock_logger.error.call_args_list) -# mock_logger.error.assert_any_call("Error during shell command execution for model test_model: Subprocess failed") - - - - - -# assert mock_create_log_file.call_count==10 -# assert mock_logger.error.call_count ==5 -# assert mock_logger.info.call_count ==18 -# assert mock_read_log_file.call_count==10 - - - - - - - - - - - - -# def test_forecast_model_artifact(self, mock_model_path, args, expected_command, expected_methods_called): - -# with patch("views_pipeline_core.managers.ensemble.ModelPathManager") as mock_model_path_manager, \ -# patch("subprocess.run") as mock_subprocess_run, \ -# patch("views_pipeline_core.managers.ensemble.logger") as mock_logger, \ -# patch("views_pipeline_core.managers.ensemble.read_log_file") as mock_read_log_file, \ -# patch("views_pipeline_core.managers.ensemble.create_log_file") as mock_create_log_file, \ -# patch("views_forecasts.extensions.ForecastAccessor.read_store") as mock_read_store, \ -# patch("views_pipeline_core.managers.package.PackageManager") as mock_PackageManager, \ -# patch.object(ModelManager, '_ModelManager__load_config') as mock_load_config: - -# # mock get_latest_model_artifact_path -# mock_model_path_instance = mock_model_path_manager.return_value -# mock_artifact_path = MagicMock() -# mock_artifact_path.stem = "predictions_test_run_202401011200000" -# mock_model_path_instance.get_latest_model_artifact_path.return_value = mock_artifact_path - -# # Try block -# mock_read_store.return_value = pd.DataFrame({"a": [1, 2, 3]}) -# manager = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager._forecast_model_artifact("test_model", "test_run") - -# mock_logger.info.assert_any_call("Forecasting single model test_model...") -# expected_name = (f"test_model_{mock_artifact_path.stem}") -# mock_logger.info.assert_any_call(f"Loading existing prediction {expected_name} from prediction store") - -# # Except block for read_store, Try block for subprocess -# mock_read_store.side_effect = [ -# Exception("Test exception"), -# pd.DataFrame({"a": [1, 2, 3]}), -# ] -# manager_side = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager_side._forecast_model_artifact("test_model", "test_run") - -# mock_logger.info.assert_any_call("No existing test_run predictions found. 
Generating new test_run predictions...") - -# # Except block for read_store, Except block for subprocess -# mock_read_store.side_effect = [ -# Exception("Test exception"), -# pd.DataFrame({"a": [1, 2, 3]}), -# ] -# mock_subprocess_run.side_effect = Exception("Subprocess failed"), - -# manager_side2 = EnsembleManager(ensemble_path=mock_model_path_instance) -# manager_side2._forecast_model_artifact("test_model", "test_run") - -# mock_logger.error.assert_any_call("Error during shell command execution for model test_model: Subprocess failed") - -# assert mock_create_log_file.call_count==2 -# assert mock_logger.error.call_count ==1 -# assert mock_logger.info.call_count ==6 -# assert mock_read_log_file.call_count==2 - - - - - - - - - - - - - - - - - - - - - - - -@pytest.fixture -def sample_data(): - """ - Fixture to provide common sample data for the aggregation tests. - """ - df1 = pd.DataFrame({"A": [1, 2], "B": [3, 4]}, index=pd.MultiIndex.from_tuples([(0, 0), (0, 1)])) - df2 = pd.DataFrame({"A": [5, 6], "B": [7, 8]}, index=pd.MultiIndex.from_tuples([(0, 0), (0, 1)])) - return [df1, df2] - -def test_get_aggregated_df_mean(sample_data): - """ - Test the _get_aggregated_df method to ensure it correctly aggregates DataFrames using mean. - """ - df_to_aggregate = sample_data - - result = EnsembleManager._get_aggregated_df(df_to_aggregate, "mean") - expected = pd.DataFrame({"A": [3.0, 4.0], "B": [5.0, 6.0]}, index=pd.MultiIndex.from_tuples([(0, 0), (0, 1)])) - - pd.testing.assert_frame_equal(result, expected, check_like=True) - -def test_get_aggregated_df_median(sample_data): - """ - Test the _get_aggregated_df method to ensure it correctly aggregates DataFrames using median. - """ - df_to_aggregate = sample_data - - result = EnsembleManager._get_aggregated_df(df_to_aggregate, "median") - expected = pd.DataFrame({"A": [3.0, 4.0], "B": [5.0, 6.0]}, index=pd.MultiIndex.from_tuples([(0, 0), (0, 1)])) - - pd.testing.assert_frame_equal(result, expected, check_like=True) - -def test_get_aggregated_df_invalid_aggregation(sample_data): - """ - Test the _get_aggregated_df method for invalid aggregation method. - """ - - with pytest.raises(ValueError, match="Invalid aggregation method: invalid_aggregation"): - EnsembleManager._get_aggregated_df(sample_data, "invalid_aggregation") - - diff --git a/tests/test_model_manager.py b/tests/test_model_manager.py deleted file mode 100644 index 4d291c2..0000000 --- a/tests/test_model_manager.py +++ /dev/null @@ -1,390 +0,0 @@ -import pytest -from unittest.mock import MagicMock, patch, mock_open -from views_pipeline_core.managers.model import ModelManager -from views_pipeline_core.managers.model import ModelManager -import wandb -import pandas as pd -from pathlib import Path - -@pytest.fixture -def mock_model_path(): - """ - Fixture to mock the ModelPath class with validate flag set to False. - - Yields: - MagicMock: The mock object for ModelPath. - """ - with patch("views_pipeline_core.managers.model.ModelPathManager") as mock: - mock_instance = mock.return_value - mock_instance.get_scripts.return_value = { - "config_deployment.py": "path/to/config_deployment.py", - "config_hyperparameters.py": "path/to/config_hyperparameters.py", - "config_meta.py": "path/to/config_meta.py", - "config_sweep.py": "path/to/config_sweep.py" - } - mock_instance._validate = False - yield mock - -@pytest.fixture -def mock_ensemble_path(): - """ - Fixture to mock the EnsemblePath class. - - Yields: - MagicMock: The mock object for EnsemblePath. 
- """ - with patch("views_pipeline_core.managers.ensemble.EnsemblePathManager") as mock: - yield mock - -@pytest.fixture -def mock_dataloader(): - """ - Fixture to mock the ViewsDataLoader class. - - Yields: - MagicMock: The mock object for ViewsDataLoader. - """ - with patch("views_pipeline_core.data.dataloaders.ViewsDataLoader") as mock: - mock_instance = mock.return_value - mock_instance._path_raw = "/path/to/raw" - mock_instance.get_data.return_value = (MagicMock(), MagicMock()) # Queryset output but not really - yield mock - -@pytest.fixture -def mock_wandb(): - """ - Fixture to mock the wandb functions. - - Yields: - None - """ - with patch("wandb.init"), patch("wandb.finish"), patch("wandb.sweep"), patch("wandb.agent"): - yield - -def test_wandb_alert(mock_model_path): - """ - Test the _wandb_alert method of the ModelManager class. - - Args: - mock_model_path (MagicMock): The mock object for ModelPath. - - Asserts: - - The wandb alert is called with the correct parameters. - """ - mock_model_instance = mock_model_path.return_value - mock_config_deployment_content = """ -def get_deployment_config(): - deployment_config = {'deployment_status': 'shadow'} - return deployment_config -""" - with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_deployment_content)): - mock_spec.return_value.loader = MagicMock() - mock_module.return_value.get_deployment_config.return_value = {"deployment_status": "shadow"} - mock_module.return_value.get_hp_config.return_value = {"hp_key": "hp_value"} - mock_module.return_value.get_meta_config.return_value = {"meta_key": "meta_value"} - mock_model_instance = mock_model_path.return_value - manager = ModelManager(mock_model_instance, wandb_notifications=True) - with patch("wandb.alert") as mock_alert: - with patch("wandb.run"): - manager._wandb_alert(title="Test Alert", text="This is a test alert", level="info") - mock_alert.assert_called_once_with(title="Test Alert", text="This is a test alert", level="info") - -@patch("views_forecasts.extensions.ForecastAccessor.read_store") -def test_model_manager_init(mock_model_path, mock_read_store): - """ - Test the initialization of the ModelManager class. - - Args: - mock_model_path (MagicMock): The mock object for ModelPath. - - Asserts: - - The ModelManager is initialized with the correct attributes. 
- """ - mock_model_instance = mock_model_path.return_value - mock_model_instance.get_scripts.return_value = { - "config_deployment.py": "path/to/config_deployment.py", - "config_hyperparameters.py": "path/to/config_hyperparameters.py", - "config_meta.py": "path/to/config_meta.py", - "config_sweep.py": "path/to/config_sweep.py" - } - mock_config_deployment_content = """ -def get_deployment_config(): - deployment_config = {'deployment_status': 'shadow'} - return deployment_config -""" - mock_config_hyperparameters_content = """ -def get_hp_config(): - hp_config = {'hp_key': 'hp_value'} - return hp_config -""" - mock_config_meta_content = """ -def get_meta_config(): - meta_config = {'meta_key': 'meta_value'} - return meta_config -""" - with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_deployment_content)): - mock_spec.return_value.loader = MagicMock() - mock_module.return_value.get_deployment_config.return_value = {"deployment_status": "shadow"} - mock_module.return_value.get_hp_config.return_value = {"hp_key": "hp_value"} - mock_module.return_value.get_meta_config.return_value = {"meta_key": "meta_value"} - manager = ModelManager(mock_model_instance) - assert manager._entity == "views_pipeline" - assert manager._model_path == mock_model_instance - assert manager._config_deployment == {"deployment_status": "shadow"} - assert manager._config_hyperparameters == {"hp_key": "hp_value"} - assert manager._config_meta == {"meta_key": "meta_value"} - -@patch("views_forecasts.extensions.ForecastAccessor.read_store") -def test_load_config(mock_model_pat, mock_read_store): - """ - Test the __load_config method of the ModelManager class. - - Args: - mock_model_path (MagicMock): The mock object for ModelPath. - - Asserts: - - The configuration is loaded correctly from the specified script. - """ - mock_model_instance = mock_model_path.return_value - mock_model_instance.get_scripts.return_value = { - "config_deployment.py": "path/to/config_deployment.py" - } - mock_config_deployment_content = """ -def get_deployment_config(): - deployment_config = {'deployment_status': 'shadow'} - return deployment_config -""" - with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_deployment_content)): - mock_spec.return_value.loader = MagicMock() - mock_module.return_value.get_deployment_config.return_value = {"deployment_status": "shadow"} - manager = ModelManager(mock_model_instance) - config = manager._ModelManager__load_config("config_deployment.py", "get_deployment_config") - assert config == {"deployment_status": "shadow"} - -def test_update_single_config(mock_model_path): - """ - Test the _update_single_config method of the ModelManager class. - - Args: - mock_model_path (MagicMock): The mock object for ModelPath. - - Asserts: - - The single run configuration is updated correctly. 
- """ - mock_model_instance = mock_model_path.return_value - mock_model_instance.get_scripts.return_value = { - "config_deployment.py": "path/to/config_deployment.py", - "config_hyperparameters.py": "path/to/config_hyperparameters.py", - "config_meta.py": "path/to/config_meta.py" - } - mock_config_deployment_content = """ -def get_deployment_config(): - deployment_config = {'deployment_status': 'shadow'} - return deployment_config -""" - with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_deployment_content)): - mock_spec.return_value.loader = MagicMock() - mock_module.return_value.get_deployment_config.return_value = {"deployment_status": "shadow"} - manager = ModelManager(mock_model_instance) - manager._config_hyperparameters = {"hp_key": "hp_value"} - manager._config_meta = {"meta_key": "meta_value"} - manager._config_deployment = {"deploy_key": "deploy_value"} - args = MagicMock(run_type="test_run") - config = manager._update_single_config(args) - assert config["hp_key"] == "hp_value" - assert config["meta_key"] == "meta_value" - assert config["deploy_key"] == "deploy_value" - assert config["run_type"] == "test_run" - assert config["sweep"] is False - -def test_update_sweep_config(mock_model_path): - """ - Test the _update_sweep_config method of the ModelManager class. - - Args: - mock_model_path (MagicMock): The mock object for ModelPath. - - Asserts: - - The sweep run configuration is updated correctly. - """ - mock_model_instance = mock_model_path.return_value - mock_model_instance.get_scripts.return_value = { - "config_sweep.py": "path/to/config_sweep.py", - "config_meta.py": "path/to/config_meta.py" - } - mock_config_sweep_content = """ -def get_sweep_config(): - sweep_config = { - 'method': 'grid', - 'name': 'test_model' - } - - # Example metric setup: - metric = { - 'name': 'MSE', - 'goal': 'minimize' - } - sweep_config['metric'] = metric - - # Example parameters setup: - parameters_dict = { - 'steps': {'values': [[*range(1, 36 + 1, 1)]]}, - 'n_estimators': {'values': [100, 150, 200]}, - } - sweep_config['parameters'] = parameters_dict - - return sweep_config -""" - mock_config_meta_content = """ -def get_meta_config(): - meta_config = {'name': 'test_model', 'depvar': 'test_depvar', 'algorithm': 'test_algorithm'} - return meta_config -""" - with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_sweep_content)): - mock_spec.return_value.loader = MagicMock() - mock_module.return_value.get_sweep_config.return_value = { - 'method': 'grid', - 'name': 'test_model', - 'metric': { - 'name': 'MSE', - 'goal': 'minimize' - }, - 'parameters': { - 'steps': {'values': [[*range(1, 36 + 1, 1)]]}, - 'n_estimators': {'values': [100, 150, 200]}, - } - } - mock_module.return_value.get_meta_config.return_value = {"name": "test_model", "depvar": "test_depvar", "algorithm": "test_algorithm"} - manager = ModelManager(mock_model_instance) - manager._config_sweep = { - 'method': 'grid', - 'name': 'test_model', - 'metric': { - 'name': 'MSE', - 'goal': 'minimize' - }, - 'parameters': { - 'steps': {'values': [[*range(1, 36 + 1, 1)]]}, - 'n_estimators': {'values': [100, 150, 200]}, - } - } - manager._config_meta = {"name": "test_model", "depvar": "test_depvar", "algorithm": "test_algorithm"} - args = MagicMock(run_type="test_run") - config = 
manager._update_sweep_config(args) - assert config["parameters"]["run_type"]["value"] == "test_run" - assert config["parameters"]["sweep"]["value"] is True - assert config["parameters"]["name"]["value"] == "test_model" - assert config["parameters"]["depvar"]["value"] == "test_depvar" - assert config["parameters"]["algorithm"]["value"] == "test_algorithm" - -def test_execute_single_run(mock_model_path, mock_dataloader, mock_wandb): - """ - Test the execute_single_run method of the ModelManager class. - - Args: - mock_model_path (MagicMock): The mock object for ModelPath. - mock_dataloader (MagicMock): The mock object for ViewsDataLoader. - mock_wandb (None): The mock object for wandb functions. - - Asserts: - - The single run is executed correctly. - """ - mock_model_instance = mock_model_path.return_value - mock_model_instance.get_scripts.return_value = { - "config_deployment.py": "path/to/config_deployment.py", - "config_hyperparameters.py": "path/to/config_hyperparameters.py", - "config_meta.py": "path/to/config_meta.py" - } - mock_config_deployment_content = """ -def get_deployment_config(): - deployment_config = {'deployment_status': 'shadow'} - return deployment_config -""" - with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_deployment_content)): - mock_spec.return_value.loader = MagicMock() - mock_module.return_value.get_deployment_config.return_value = {"deployment_status": "shadow"} - manager = ModelManager(mock_model_instance) - manager._update_single_config = MagicMock(return_value={"name": "test_model"}) - manager._execute_model_tasks = MagicMock() - args = MagicMock(run_type="calibration", saved=False, drift_self_test=False, train=True, evaluate=True, forecast=True, artifact_name="test_artifact") - - # Add logging to identify where the failure occurs - try: - manager.execute_single_run(args) - except Exception as e: - print(f"Error during execute_single_run: {e}") - - manager._update_single_config.assert_called_once_with(args) - - #idek anymore - # manager._execute_model_tasks.assert_called_once_with(config={"name": "test_model"}, train=True, eval=True, forecast=False, artifact_name="test_artifact") - -# def test_save_model_outputs(mock_model_path): -# """ -# Test the _save_model_outputs method of the ModelManager class. - -# Args: -# mock_model_path (MagicMock): The mock object for ModelPath. - -# Asserts: -# - The model outputs are saved correctly. 
-# """ -# mock_model_instance = mock_model_path.return_value -# mock_model_instance.get_scripts.return_value = { -# "config_deployment.py": "path/to/config_deployment.py", -# "config_hyperparameters.py": "path/to/config_hyperparameters.py", -# "config_meta.py": "path/to/config_meta.py" -# } -# mock_config_deployment_content = """ -# def get_deployment_config(): -# deployment_config = {'deployment_status': 'shadow'} -# return deployment_config -# """ -# with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_deployment_content)): -# mock_spec.return_value.loader = MagicMock() -# mock_module.return_value.get_deployment_config.return_value = {"deployment_status": "shadow"} -# manager = ModelManager(mock_model_instance) -# manager.config = {"run_type": "calibration", "timestamp": "20210831_123456"} -# df_evaluation = pd.DataFrame({"metric": [1, 2, 3]}) -# df_output = pd.DataFrame({"output": [4, 5, 6]}) -# path_generated = "/path/to/generated" -# sequence_number = 1 -# with patch("pathlib.Path.mkdir") as mock_mkdir, patch("pandas.DataFrame.to_pickle") as mock_to_pickle: -# manager._save_model_outputs(df_evaluation, df_output, path_generated, sequence_number) -# mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) -# mock_to_pickle.assert_any_call(Path(path_generated) / "output_calibration_20210831_123456_01.pkl") -# mock_to_pickle.assert_any_call(Path(path_generated) / "evaluation_calibration_20210831_123456_01.pkl") - -# def test_save_predictions(mock_model_path): -# """ -# Test the _save_predictions method of the ModelManager class. - -# Args: -# mock_model_path (MagicMock): The mock object for ModelPath. - -# Asserts: -# - The model predictions are saved correctly. -# """ -# mock_model_instance = mock_model_path.return_value -# mock_model_instance.get_scripts.return_value = { -# "config_deployment.py": "path/to/config_deployment.py", -# "config_hyperparameters.py": "path/to/config_hyperparameters.py", -# "config_meta.py": "path/to/config_meta.py" -# } -# mock_config_deployment_content = """ -# def get_deployment_config(): -# deployment_config = {'deployment_status': 'shadow'} -# return deployment_config -# """ -# with patch("importlib.util.spec_from_file_location") as mock_spec, patch("importlib.util.module_from_spec") as mock_module, patch("builtins.open", mock_open(read_data=mock_config_deployment_content)): -# mock_spec.return_value.loader = MagicMock() -# mock_module.return_value.get_deployment_config.return_value = {"deployment_status": "shadow"} -# manager = ModelManager(mock_model_instance) -# manager.config = {"run_type": "calibration", "timestamp": "20210831_123456"} -# df_predictions = pd.DataFrame({"prediction": [7, 8, 9]}) -# path_generated = "/path/to/generated" -# sequence_number = 1 -# with patch("pathlib.Path.mkdir") as mock_mkdir, patch("pandas.DataFrame.to_pickle") as mock_to_pickle: -# manager._save_predictions(df_predictions, path_generated, sequence_number) -# mock_mkdir.assert_called_once_with(parents=True, exist_ok=True) -# mock_to_pickle.assert_called_once_with(Path(path_generated) / "predictions_calibration_20210831_123456_01.pkl") \ No newline at end of file