From 9f57681032cc72bc6f4948a4ea01df9c915f3c4f Mon Sep 17 00:00:00 2001 From: D-X-Y <280835372@qq.com> Date: Tue, 16 Mar 2021 08:11:05 +0000 Subject: [PATCH 1/7] Fix errors when SignalRecord is not called before SigAna/PortAna --- qlib/workflow/record_temp.py | 23 ++++++++++++++--------- tests/test_all_pipeline.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index be458a24d2..6416698981 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -110,7 +110,7 @@ class SignalRecord(RecordTemp): This is the Signal Record class that generates the signal prediction. This class inherits the ``RecordTemp`` class. """ - def __init__(self, model=None, dataset=None, recorder=None, **kwargs): + def __init__(self, model=None, dataset=None, recorder=None): super().__init__(recorder=recorder) self.model = model self.dataset = dataset @@ -163,14 +163,16 @@ class SigAnaRecord(SignalRecord): artifact_path = "sig_analysis" - def __init__(self, recorder, ana_long_short=False, ann_scaler=252, **kwargs): + def __init__(self, recorder, ana_long_short=False, ann_scaler=252): + super().__init__(recorder=recorder) self.ana_long_short = ana_long_short self.ann_scaler = ann_scaler - super().__init__(recorder=recorder, **kwargs) - # The name must be unique. Otherwise it will be overridden - def generate(self): - self.check(parent=True) + def generate(self, **kwargs): + try: + self.check(parent=True) + except: + super().generate() pred = self.load("pred.pkl") label = self.load("label.pkl") @@ -228,7 +230,7 @@ def __init__(self, recorder, config, **kwargs): config["backtest"] : dict define the backtest kwargs. """ - super().__init__(recorder=recorder) + super().__init__(recorder=recorder, **kwargs) self.strategy_config = config["strategy"] self.backtest_config = config["backtest"] @@ -236,10 +238,13 @@ def __init__(self, recorder, config, **kwargs): def generate(self, **kwargs): # check previously stored prediction results - self.check(parent=True) # "Make sure the parent process is completed and store the data properly." + try: + self.check(parent=True) # "Make sure the parent process is completed and store the data properly." + except: + super().generate() # custom strategy and get backtest - pred_score = super().load() + pred_score = super().load("pred.pkl") report_dict = normal_backtest(pred_score, strategy=self.strategy, **self.backtest_config) report_normal = report_dict.get("report_df") positions_normal = report_dict.get("positions") diff --git a/tests/test_all_pipeline.py b/tests/test_all_pipeline.py index fbf15d29ad..ac0cad199e 100644 --- a/tests/test_all_pipeline.py +++ b/tests/test_all_pipeline.py @@ -139,6 +139,33 @@ def train(): return pred_score, {"ic": ic, "ric": ric}, rid +def train_with_sigana(): + """train model followed by SigAnaRecord + + Returns + ------- + pred_score: pandas.DataFrame + predict scores + performance: dict + model performance + """ + model = init_instance_by_config(task["model"]) + dataset = init_instance_by_config(task["dataset"]) + + # start exp + with R.start(experiment_name="workflow"): + R.log_params(**flatten_dict(task)) + model.fit(dataset) + + # predict and calculate ic and ric + recorder = R.get_recorder() + sar = SigAnaRecord(recorder, model=model, dataset=dataset) + sar.generate() + ic = sar.load(sar.get_path("ic.pkl")) + ric = sar.load(sar.get_path("ric.pkl")) + return pred_score, {"ic": ic, "ric": ric}, rid + + def fake_experiment(): """A fake experiment workflow to test uri @@ -214,6 +241,11 @@ def test_2_expmanager(self): self.assertTrue(pass_current, msg="current uri is incorrect") shutil.rmtree(str(Path(uri_path.strip("file:")).resolve())) + def test_3_train_with_sigana(self): + TestAllFlow.PRED_SCORE, ic_ric, TestAllFlow.RID = train_with_sigana() + self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed") + self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed") + def suite(): _suite = unittest.TestSuite() From 6559d44c7dc10c0a2de2705ee500fe0c9598c8fe Mon Sep 17 00:00:00 2001 From: D-X-Y <280835372@qq.com> Date: Tue, 16 Mar 2021 08:17:13 +0000 Subject: [PATCH 2/7] Add tests for SigAnaRecord --- qlib/workflow/record_temp.py | 4 ++-- tests/test_all_pipeline.py | 22 ++++++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index 6416698981..b4e70bea4d 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -163,8 +163,8 @@ class SigAnaRecord(SignalRecord): artifact_path = "sig_analysis" - def __init__(self, recorder, ana_long_short=False, ann_scaler=252): - super().__init__(recorder=recorder) + def __init__(self, recorder, ana_long_short=False, ann_scaler=252, **kwargs): + super().__init__(recorder=recorder, **kwargs) self.ana_long_short = ana_long_short self.ann_scaler = ann_scaler diff --git a/tests/test_all_pipeline.py b/tests/test_all_pipeline.py index ac0cad199e..f4fdc50ff8 100644 --- a/tests/test_all_pipeline.py +++ b/tests/test_all_pipeline.py @@ -153,7 +153,7 @@ def train_with_sigana(): dataset = init_instance_by_config(task["dataset"]) # start exp - with R.start(experiment_name="workflow"): + with R.start(experiment_name="workflow_with_sigana"): R.log_params(**flatten_dict(task)) model.fit(dataset) @@ -163,7 +163,8 @@ def train_with_sigana(): sar.generate() ic = sar.load(sar.get_path("ic.pkl")) ric = sar.load(sar.get_path("ric.pkl")) - return pred_score, {"ic": ic, "ric": ric}, rid + uri_path = R.get_uri() + return pred_score, {"ic": ic, "ric": ric}, uri_path def fake_experiment(): @@ -222,12 +223,18 @@ class TestAllFlow(TestAutoData): def tearDownClass(cls) -> None: shutil.rmtree(str(Path(C["exp_manager"]["kwargs"]["uri"].strip("file:")).resolve())) - def test_0_train(self): + def test_0_train_with_sigana(self): + TestAllFlow.PRED_SCORE, ic_ric, uri_path = train_with_sigana() + self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed") + self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed") + shutil.rmtree(str(Path(uri_path.strip("file:")).resolve())) + + def test_1_train(self): TestAllFlow.PRED_SCORE, ic_ric, TestAllFlow.RID = train() self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed") self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed") - def test_1_backtest(self): + def test_2_backtest(self): analyze_df = backtest_analysis(TestAllFlow.PRED_SCORE, TestAllFlow.RID) self.assertGreaterEqual( analyze_df.loc(axis=0)["excess_return_with_cost", "annualized_return"].values[0], @@ -235,17 +242,12 @@ def test_1_backtest(self): "backtest failed", ) - def test_2_expmanager(self): + def test_3_expmanager(self): pass_default, pass_current, uri_path = fake_experiment() self.assertTrue(pass_default, msg="default uri is incorrect") self.assertTrue(pass_current, msg="current uri is incorrect") shutil.rmtree(str(Path(uri_path.strip("file:")).resolve())) - def test_3_train_with_sigana(self): - TestAllFlow.PRED_SCORE, ic_ric, TestAllFlow.RID = train_with_sigana() - self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed") - self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed") - def suite(): _suite = unittest.TestSuite() From b0fd0d2395ba332bb99831f330351e42dbf370f7 Mon Sep 17 00:00:00 2001 From: D-X-Y <280835372@qq.com> Date: Tue, 16 Mar 2021 08:30:46 +0000 Subject: [PATCH 3/7] Add tests for SigAnaRecord --- tests/test_all_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_all_pipeline.py b/tests/test_all_pipeline.py index f4fdc50ff8..21a82cd30c 100644 --- a/tests/test_all_pipeline.py +++ b/tests/test_all_pipeline.py @@ -163,6 +163,7 @@ def train_with_sigana(): sar.generate() ic = sar.load(sar.get_path("ic.pkl")) ric = sar.load(sar.get_path("ric.pkl")) + pred_score = sar.load("pred.pkl") uri_path = R.get_uri() return pred_score, {"ic": ic, "ric": ric}, uri_path From 4cb74d77d1bf86b1484dc15f1941df5218a11416 Mon Sep 17 00:00:00 2001 From: D-X-Y <280835372@qq.com> Date: Tue, 16 Mar 2021 09:01:10 +0000 Subject: [PATCH 4/7] add error type for record_temp --- qlib/workflow/record_temp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index b4e70bea4d..2c1b6fecce 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -171,7 +171,7 @@ def __init__(self, recorder, ana_long_short=False, ann_scaler=252, **kwargs): def generate(self, **kwargs): try: self.check(parent=True) - except: + except FileExistsError: super().generate() pred = self.load("pred.pkl") @@ -240,7 +240,7 @@ def generate(self, **kwargs): # check previously stored prediction results try: self.check(parent=True) # "Make sure the parent process is completed and store the data properly." - except: + except FileExistsError: super().generate() # custom strategy and get backtest From d4aa6816520d306503a1f80c1834b37a9df83c3d Mon Sep 17 00:00:00 2001 From: D-X-Y <280835372@qq.com> Date: Tue, 16 Mar 2021 12:54:12 +0000 Subject: [PATCH 5/7] Add MSERecord in contrib.workflow --- qlib/contrib/workflow/__init__.py | 0 qlib/contrib/workflow/record_temp.py | 47 ++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 qlib/contrib/workflow/__init__.py create mode 100644 qlib/contrib/workflow/record_temp.py diff --git a/qlib/contrib/workflow/__init__.py b/qlib/contrib/workflow/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/qlib/contrib/workflow/record_temp.py b/qlib/contrib/workflow/record_temp.py new file mode 100644 index 0000000000..2b99307433 --- /dev/null +++ b/qlib/contrib/workflow/record_temp.py @@ -0,0 +1,47 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import re +import pandas as pd +from sklearn.metrics import mean_squared_error +from pprint import pprint +import numpy as np + +from ...workflow.record_temp import SignalRecord +from ...log import get_module_logger + +logger = get_module_logger("workflow", "INFO") + + +class SignalMseRecord(SignalRecord): + """ + This is the Signal MSE Record class that computes the mean squared error (MSE). + This class inherits the ``SignalMseRecord`` class. + """ + + artifact_path = "sig_analysis" + + def __init__(self, recorder, **kwargs): + super().__init__(recorder=recorder, **kwargs) + + def generate(self, **kwargs): + try: + self.check(parent=True) + except FileExistsError: + super().generate() + + pred = self.load("pred.pkl") + label = self.load("label.pkl") + masks = ~np.isnan(label.values) + mse = mean_squared_error(pred.values[masks], label[masks]) + metrics = { + "MSE": mse, + } + objects = {"mse.pkl": mse} + self.recorder.log_metrics(**metrics) + self.recorder.save_objects(**objects, artifact_path=self.get_path()) + pprint(metrics) + + def list(self): + paths = [self.get_path("mse.pkl")] + return paths From 88b0871c12d0b139da489c53e02444606f6ca634 Mon Sep 17 00:00:00 2001 From: D-X-Y <280835372@qq.com> Date: Tue, 16 Mar 2021 22:55:28 +0800 Subject: [PATCH 6/7] Add RMSE for contrib.workflow.record_temp and unit tests --- qlib/contrib/workflow/record_temp.py | 5 +++-- tests/test_all_pipeline.py | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/qlib/contrib/workflow/record_temp.py b/qlib/contrib/workflow/record_temp.py index 2b99307433..7094d844e0 100644 --- a/qlib/contrib/workflow/record_temp.py +++ b/qlib/contrib/workflow/record_temp.py @@ -36,12 +36,13 @@ def generate(self, **kwargs): mse = mean_squared_error(pred.values[masks], label[masks]) metrics = { "MSE": mse, + "RMSE": np.sqrt(mse) } - objects = {"mse.pkl": mse} + objects = {"mse.pkl": mse, "rmse.pkl": np.sqrt(mse)} self.recorder.log_metrics(**metrics) self.recorder.save_objects(**objects, artifact_path=self.get_path()) pprint(metrics) def list(self): - paths = [self.get_path("mse.pkl")] + paths = [self.get_path("mse.pkl"), self.get_path("rmse.pkl")] return paths diff --git a/tests/test_all_pipeline.py b/tests/test_all_pipeline.py index 21a82cd30c..29d39179d1 100644 --- a/tests/test_all_pipeline.py +++ b/tests/test_all_pipeline.py @@ -19,6 +19,7 @@ backtest as normal_backtest, risk_analysis, ) +from qlib.contrib.workflow.record_temp import SignalMseRecord from qlib.utils import exists_qlib_data, init_instance_by_config, flatten_dict from qlib.workflow import R from qlib.workflow.record_temp import SignalRecord, SigAnaRecord, PortAnaRecord @@ -164,6 +165,9 @@ def train_with_sigana(): ic = sar.load(sar.get_path("ic.pkl")) ric = sar.load(sar.get_path("ric.pkl")) pred_score = sar.load("pred.pkl") + + smr = SignalMseRecord(recorder) + smr.generate() uri_path = R.get_uri() return pred_score, {"ic": ic, "ric": ric}, uri_path From 872ddc6f950cfed2fd282d31d38a0bbbcf85c46a Mon Sep 17 00:00:00 2001 From: D-X-Y <280835372@qq.com> Date: Tue, 16 Mar 2021 22:57:26 +0800 Subject: [PATCH 7/7] Fix black error --- qlib/contrib/workflow/record_temp.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/qlib/contrib/workflow/record_temp.py b/qlib/contrib/workflow/record_temp.py index 7094d844e0..3fdf0c2815 100644 --- a/qlib/contrib/workflow/record_temp.py +++ b/qlib/contrib/workflow/record_temp.py @@ -34,10 +34,7 @@ def generate(self, **kwargs): label = self.load("label.pkl") masks = ~np.isnan(label.values) mse = mean_squared_error(pred.values[masks], label[masks]) - metrics = { - "MSE": mse, - "RMSE": np.sqrt(mse) - } + metrics = {"MSE": mse, "RMSE": np.sqrt(mse)} objects = {"mse.pkl": mse, "rmse.pkl": np.sqrt(mse)} self.recorder.log_metrics(**metrics) self.recorder.save_objects(**objects, artifact_path=self.get_path())