Optimize the implementation of uri & Fix async log bug #1364
Changes from 6 commits
4c7d3d7
ce7934a
8cde41b
09d9203
ecea697
508cdd7
5d8fbb1
d57bde8
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,23 +15,32 @@ | |
from ..log import get_module_logger | ||
from ..utils.exceptions import ExpAlreadyExistError | ||
|
||
|
||
logger = get_module_logger("workflow") | ||
|
||
|
||
class ExpManager: | ||
""" | ||
This is the `ExpManager` class for managing experiments. The API is designed similar to mlflow. | ||
(The link: https://mlflow.org/docs/latest/python_api/mlflow.html) | ||
This is the `ExpManager` class for managing experiments. The API is designed similar to mlflow. | ||
@ChiahungTai You can start from these docs.
"we can have multiple
From my understanding of MLflow, the uri is like a storage backend (SQL, file, ...). The `ExpManager` is a single entry point to look up and manipulate the experiments under that uri, so the user can switch to a different uri to retrieve experiments on a different topic.
Yes.
Your new comment is clearer than before. |
||
(The link: https://mlflow.org/docs/latest/python_api/mlflow.html) | ||
|
||
The `ExpManager` is expected to be a singleton (btw, we can have multiple `Experiment`s with different uris; the user can get experiments from different uris and then compare their records). The global config (e.g. `C`) is also a singleton. | ||
So we try to align them. They share the same variable, called the **default uri**. Please refer to `ExpManager.default_uri` for details of the variable sharing. | ||
|
||
When the user starts an experiment, the user may want to set the uri to a specific uri (it will override the **default uri** during this period), and then unset the **specific uri** and fall back to the **default uri**. `ExpManager._active_exp_uri` is that **specific uri**. | ||
""" | ||
|
||
active_experiment: Optional[Experiment] | ||
|
||
def __init__(self, uri: Text, default_exp_name: Optional[Text]): | ||
self._current_uri = uri | ||
self.default_uri = uri | ||
self._active_exp_uri = None # No active experiments. So it is set to None | ||
self._default_exp_name = default_exp_name | ||
self.active_experiment = None # only one experiment can be active each time | ||
logger.info(f"experiment manager uri is at {self._current_uri}") | ||
logger.info(f"experiment manager uri is at {self.uri}") | ||
|
||
def __repr__(self): | ||
return "{name}(current_uri={curi})".format(name=self.__class__.__name__, curi=self._current_uri) | ||
return "{name}(uri={uri})".format(name=self.__class__.__name__, uri=self.uri) | ||
|
||
def start_exp( | ||
self, | ||
|
@@ -43,11 +52,13 @@ def start_exp( | |
uri: Optional[Text] = None, | ||
resume: bool = False, | ||
**kwargs, | ||
): | ||
) -> Experiment: | ||
""" | ||
Start an experiment. This method includes first get_or_create an experiment, and then | ||
set it to be active. | ||
|
||
Maintaining `_active_exp_uri` is included in start_exp; the remaining implementation should be included in _start_exp in the subclass | ||
|
||
Parameters | ||
---------- | ||
experiment_id : str | ||
|
@@ -67,19 +78,41 @@ def start_exp( | |
------- | ||
An active experiment. | ||
""" | ||
self._active_exp_uri = uri | ||
# The subclass may set the underlying uri back. | ||
# So setting `_active_exp_uri` comes before `_start_exp` | ||
return self._start_exp( | ||
experiment_id=experiment_id, | ||
experiment_name=experiment_name, | ||
recorder_id=recorder_id, | ||
recorder_name=recorder_name, | ||
resume=resume, | ||
**kwargs, | ||
) | ||
|
||
def _start_exp(self, *args, **kwargs) -> Experiment: | ||
"""Please refer to the doc of `start_exp`""" | ||
raise NotImplementedError("Please implement the `_start_exp` method.") | ||
|
||
def end_exp(self, recorder_status: Text = Recorder.STATUS_S, **kwargs): | ||
""" | ||
End an active experiment. | ||
|
||
Maintaining `_active_exp_uri` is included in end_exp, remaining implementation should be included in _end_exp in subclass | ||
|
||
Parameters | ||
---------- | ||
experiment_name : str | ||
name of the active experiment. | ||
recorder_status : str | ||
the status of the active recorder of the experiment. | ||
""" | ||
self._active_exp_uri = None | ||
# The subclass may set the underlying uri back. | ||
# So setting `_active_exp_uri` comes before `_end_exp` | ||
self._end_exp(recorder_status=recorder_status, **kwargs) | ||
|
||
def _end_exp(self, recorder_status: Text = Recorder.STATUS_S, **kwargs): | ||
raise NotImplementedError("Please implement the `_end_exp` method.") | ||
|
||
def create_exp(self, experiment_name: Optional[Text] = None): | ||
|
@@ -254,6 +287,10 @@ def default_uri(self): | |
raise ValueError("The default URI is not set in qlib.config.C") | ||
return C.exp_manager["kwargs"]["uri"] | ||
|
||
@default_uri.setter | ||
def default_uri(self, value): | ||
C.exp_manager.setdefault("kwargs", {})["uri"] = value | ||
|
||
@property | ||
def uri(self): | ||
""" | ||
|
@@ -263,33 +300,7 @@ def uri(self): | |
------- | ||
The tracking URI string. | ||
""" | ||
return self._current_uri or self.default_uri | ||
|
||
def set_uri(self, uri: Optional[Text] = None): | ||
""" | ||
Set the current tracking URI and the corresponding variables. | ||
|
||
Parameters | ||
---------- | ||
uri : str | ||
|
||
""" | ||
if uri is None: | ||
if self._current_uri is None: | ||
logger.debug("No tracking URI is provided. Use the default tracking URI.") | ||
self._current_uri = self.default_uri | ||
else: | ||
# Temporarily re-set the current uri as the uri argument. | ||
self._current_uri = uri | ||
# Customized features for subclasses. | ||
self._set_uri() | ||
|
||
def _set_uri(self): | ||
""" | ||
Customized features for subclasses' set_uri function. | ||
This method is designed for the underlying experiment backend storage. | ||
""" | ||
raise NotImplementedError(f"Please implement the `_set_uri` method.") | ||
return self._active_exp_uri or self.default_uri | ||
|
||
def list_experiments(self): | ||
""" | ||
|
@@ -309,31 +320,22 @@ class MLflowExpManager(ExpManager): | |
|
||
def __init__(self, uri: Text, default_exp_name: Optional[Text]): | ||
super(MLflowExpManager, self).__init__(uri, default_exp_name) | ||
self._client = None | ||
|
||
def _set_uri(self): | ||
self._client = mlflow.tracking.MlflowClient(tracking_uri=self.uri) | ||
logger.info("{:}".format(self._client)) | ||
|
||
@property | ||
def client(self): | ||
# Delay the creation of mlflow client in case of creating `mlruns` folder when importing qlib | ||
if self._client is None: | ||
self._client = mlflow.tracking.MlflowClient(tracking_uri=self.uri) | ||
return self._client | ||
# Please refer to `tests/dependency_tests/test_mlflow.py::MLflowTest::test_creating_client` | ||
# The test ensures that creating a new client is fast | ||
return mlflow.tracking.MlflowClient(tracking_uri=self.uri) | ||
|
||
def start_exp( | ||
def _start_exp( | ||
self, | ||
*, | ||
experiment_id: Optional[Text] = None, | ||
experiment_name: Optional[Text] = None, | ||
recorder_id: Optional[Text] = None, | ||
recorder_name: Optional[Text] = None, | ||
uri: Optional[Text] = None, | ||
resume: bool = False, | ||
): | ||
# Set the tracking uri | ||
self.set_uri(uri) | ||
# Create experiment | ||
if experiment_name is None: | ||
experiment_name = self._default_exp_name | ||
|
@@ -345,12 +347,10 @@ def start_exp( | |
|
||
return self.active_experiment | ||
|
||
def end_exp(self, recorder_status: Text = Recorder.STATUS_S): | ||
def _end_exp(self, recorder_status: Text = Recorder.STATUS_S): | ||
if self.active_experiment is not None: | ||
self.active_experiment.end(recorder_status) | ||
self.active_experiment = None | ||
# When an experiment end, we will release the current uri. | ||
self._current_uri = None | ||
|
||
def create_exp(self, experiment_name: Optional[Text] = None): | ||
assert experiment_name is not None | ||
|
@@ -362,9 +362,7 @@ def create_exp(self, experiment_name: Optional[Text] = None): | |
raise ExpAlreadyExistError() from e | ||
raise e | ||
|
||
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) | ||
experiment._default_name = self._default_exp_name | ||
return experiment | ||
return MLflowExperiment(experiment_id, experiment_name, self.uri) | ||
|
||
def _get_exp(self, experiment_id=None, experiment_name=None): | ||
""" | ||
|
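The URI handling in this diff can be read as a template-method pattern: the public `start_exp` / `end_exp` maintain `_active_exp_uri`, and the subclass hooks only see the already-resolved `uri`. Below is a minimal sketch of that idea, not the Qlib implementation. `UriManager` and `EchoManager` are hypothetical names, and the default URI is stored on the instance here for brevity, whereas the PR keeps it in the global config `C`.

```python
from typing import Optional


class UriManager:
    """Hypothetical stand-in for `ExpManager`, reduced to the URI logic."""

    def __init__(self, default_uri: str):
        # Assumption: kept on the instance; the PR stores it in the config `C`.
        self.default_uri = default_uri
        self._active_exp_uri: Optional[str] = None  # no active experiment yet

    @property
    def uri(self) -> str:
        # The experiment-specific URI wins while an experiment is active.
        return self._active_exp_uri or self.default_uri

    def start_exp(self, uri: Optional[str] = None) -> str:
        # Set the override before delegating, so the hook already sees it.
        self._active_exp_uri = uri
        return self._start_exp()

    def end_exp(self) -> None:
        # Drop the override before delegating, mirroring `start_exp`.
        self._active_exp_uri = None
        self._end_exp()

    def _start_exp(self) -> str:
        raise NotImplementedError("Please implement the `_start_exp` method.")

    def _end_exp(self) -> None:
        raise NotImplementedError("Please implement the `_end_exp` method.")


class EchoManager(UriManager):
    """Toy subclass: the hooks only report which URI they observe."""

    def _start_exp(self) -> str:
        return self.uri

    def _end_exp(self) -> None:
        pass


manager = EchoManager(default_uri="file:./mlruns")
seen_during_start = manager.start_exp(uri="file:./other_runs")
manager.end_exp()
uri_after_end = manager.uri
```

With this split, a subclass never has to remember to reset the URI itself; forgetting to do so in the old `end_exp` override was one way the state could go stale.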
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -329,7 +329,7 @@ def get_local_dir(self): | |
def start_run(self): | ||
# set the tracking uri | ||
mlflow.set_tracking_uri(self.uri) | ||
# start the run | ||
# start the RuntimeError | ||
What does "start the RuntimeError" mean?
Fixed now |
||
run = mlflow.start_run(self.id, self.experiment_id, self.name) | ||
# save the run id and artifact_uri | ||
self.id = run.info.run_id | ||
|
@@ -378,14 +378,15 @@ def end_run(self, status: str = Recorder.STATUS_S): | |
Recorder.STATUS_FI, | ||
Recorder.STATUS_FA, | ||
], f"The status type {status} is not supported." | ||
mlflow.end_run(status) | ||
self.end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | ||
if self.status != Recorder.STATUS_S: | ||
self.status = status | ||
if self.async_log is not None: | ||
# Waiting Queue should go before mlflow.end_run. Otherwise mlflow will raise error | ||
with TimeInspector.logt("waiting `async_log`"): | ||
self.async_log.wait() | ||
self.async_log = None | ||
mlflow.end_run(status) | ||
|
||
def save_objects(self, local_path=None, artifact_path=None, **kwargs): | ||
assert self.uri is not None, "Please start the experiment and recorder first before using recorder directly." | ||
|
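The reordering in `end_run` (wait for `async_log`, then call `mlflow.end_run`) follows a general rule: drain pending background writes before closing the resource they write to. The sketch below illustrates that ordering with the standard library only. `AsyncLogQueue` and `ToyRun` are hypothetical stand-ins, not Qlib's or MLflow's classes.

```python
import queue
import threading


class AsyncLogQueue:
    """Hypothetical background logger: calls are queued and run on a worker thread."""

    def __init__(self):
        self.errors = []
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            func, args = self._queue.get()
            try:
                func(*args)
            except Exception as exc:  # keep the worker alive and record the failure
                self.errors.append(exc)
            finally:
                self._queue.task_done()

    def submit(self, func, *args):
        self._queue.put((func, args))

    def wait(self):
        # Block until every queued call has finished.
        self._queue.join()


class ToyRun:
    """Stand-in for a tracking run that rejects writes once it is closed."""

    def __init__(self):
        self.closed = False
        self.metrics = []

    def log_metric(self, name, value):
        if self.closed:
            raise RuntimeError("run already ended")
        self.metrics.append((name, value))

    def end(self):
        self.closed = True


run = ToyRun()
async_log = AsyncLogQueue()
for step in range(100):
    async_log.submit(run.log_metric, "loss", 1.0 / (step + 1))

async_log.wait()  # drain pending writes first
run.end()         # only then close the run
```

If `run.end()` came first, any call still sitting in the queue would hit the closed run and fail, which is the kind of error the PR's comment on `mlflow.end_run` describes.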
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
Some implementations in Qlib depend on assumptions about its dependencies. | ||
|
||
So some tests are required to ensure that these assumptions hold. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
import unittest | ||
import mlflow | ||
import time | ||
from pathlib import Path | ||
import shutil | ||
|
||
|
||
class MLflowTest(unittest.TestCase): | ||
TMP_PATH = Path("./.mlruns_tmp/") | ||
|
||
def tearDown(self) -> None: | ||
if self.TMP_PATH.exists(): | ||
shutil.rmtree(self.TMP_PATH) | ||
|
||
def test_creating_client(self): | ||
""" | ||
Please refer to qlib/workflow/expm.py:MLflowExpManager._client | ||
we don't cache _client (this helps reduce maintenance work when MLflowExpManager's uri is changed) | ||
|
||
This implementation is based on the assumption that creating a client is fast | ||
""" | ||
start = time.time() | ||
for i in range(10): | ||
_ = mlflow.tracking.MlflowClient(tracking_uri=str(self.TMP_PATH)) | ||
end = time.time() | ||
elapsed = end - start | ||
self.assertGreater(1e-2, elapsed) # it can be done in less than 10ms | ||
assertGreater or assertLesser
I have changed the direction. And it is more readable than before |
||
print(elapsed) | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
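On the assertion direction discussed in the review: a possible shape for such a dependency test is sketched below, using `time.perf_counter` and `assertLess` so that the call reads in the same order as "elapsed < bound". `CheapClient` and `time_construction` are hypothetical names introduced for illustration; the real test constructs `mlflow.tracking.MlflowClient`.

```python
import time
import unittest


class CheapClient:
    """Hypothetical stand-in for an object that should be cheap to construct."""

    def __init__(self, tracking_uri: str):
        self.tracking_uri = tracking_uri


def time_construction(factory, repeats: int = 10) -> float:
    """Return the seconds spent calling `factory` `repeats` times."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    for _ in range(repeats):
        factory()
    return time.perf_counter() - start


class ConstructionSpeedTest(unittest.TestCase):
    def test_construction_is_fast(self):
        elapsed = time_construction(lambda: CheapClient("./.mlruns_tmp/"))
        # `assertLess(actual, bound)` reads like "elapsed < 10 ms".
        self.assertLess(elapsed, 1e-2)
```

Wall-clock thresholds like this can be flaky on a loaded machine, so the bound is a judgment call rather than a guarantee.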
I think setting default_uri is not enough. self.exp_manager does not change the client uri if you only change the default uri.
It is fixed now.
The _client will not be cached. No further maintenance is required for it.
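To make the concern above concrete, here is a small sketch contrasting a cached client with one built on every access. `FakeClient`, `CachedManager`, and `UncachedManager` are hypothetical names and only mimic the shape of the `client` property in the diff.

```python
class FakeClient:
    """Hypothetical client that just remembers the URI it was created with."""

    def __init__(self, tracking_uri: str):
        self.tracking_uri = tracking_uri


class CachedManager:
    """Old behaviour: the first client is kept, even if `uri` changes later."""

    def __init__(self, uri: str):
        self.uri = uri
        self._client = None

    @property
    def client(self) -> FakeClient:
        if self._client is None:
            self._client = FakeClient(self.uri)
        return self._client


class UncachedManager:
    """New behaviour: a fresh client is built from the current `uri` each time."""

    def __init__(self, uri: str):
        self.uri = uri

    @property
    def client(self) -> FakeClient:
        return FakeClient(self.uri)


cached = CachedManager("file:./a")
uncached = UncachedManager("file:./a")
_ = cached.client  # the first access creates and stores the client

cached.uri = "file:./b"
uncached.uri = "file:./b"

stale_uri = cached.client.tracking_uri    # still points at the old URI
fresh_uri = uncached.client.tracking_uri  # follows the change
```

The uncached variant trades a small construction cost on every access for not having to invalidate anything when the URI changes, which is why the PR adds a test that client creation stays fast.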