Skip to content

Commit

Permalink
Merge pull request #340 from Derek-Wds/main
Browse files Browse the repository at this point in the history
Support resuming recorder
  • Loading branch information
you-n-g authored Mar 17, 2021
2 parents 4de628c + d78e42e commit 689774c
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 53 deletions.
21 changes: 17 additions & 4 deletions qlib/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,27 @@ def __repr__(self):

@contextmanager
def start(
self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None
self,
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool = False,
):
"""
Method to start an experiment. This method can only be called within a Python's `with` statement. Here is the example code:
.. code-block:: Python
# start new experiment and recorder
with R.start('test', 'recorder_1'):
model.fit(dataset)
R.log...
... # further operations
# resume previous experiment and recorder
with R.start('test', 'recorder_1', resume=True): # if users want to resume recorder, they have to specify the exact same name for experiment and recorder.
... # further operations
Parameters
----------
experiment_name : str
Expand All @@ -45,16 +54,18 @@ def start(
The default uri is set in the qlib.config. Note that this uri argument will not change the one defined in the config file.
Therefore, the next time when users call this function in the same experiment,
they have to also specify this argument with the same value. Otherwise, inconsistent uri may occur.
resume : bool
whether to resume the specific recorder with given name under the given experiment.
"""
run = self.start_exp(experiment_name, recorder_name, uri)
run = self.start_exp(experiment_name, recorder_name, uri, resume)
try:
yield run
except Exception as e:
self.end_exp(Recorder.STATUS_FA) # end the experiment if something went wrong
raise e
self.end_exp(Recorder.STATUS_FI)

def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
def start_exp(self, experiment_name=None, recorder_name=None, uri=None, resume=False):
"""
Lower level method for starting an experiment. When use this method, one should end the experiment manually
and the status of the recorder may not be handled properly. Here is the example code:
Expand All @@ -75,12 +86,14 @@ def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
uri : str
the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored.
The default uri are set in the qlib.config.
resume : bool
whether to resume the specific recorder with given name under the given experiment.
Returns
-------
An experiment instance being started.
"""
return self.exp_manager.start_exp(experiment_name, recorder_name, uri)
return self.exp_manager.start_exp(experiment_name, recorder_name, uri, resume)

def end_exp(self, recorder_status=Recorder.STATUS_FI):
"""
Expand Down
105 changes: 68 additions & 37 deletions qlib/workflow/exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ def info(self):
output["recorders"] = list(recorders.keys())
return output

def start(self, recorder_name=None):
def start(self, recorder_name=None, resume=False):
"""
Start the experiment and set it to be active. This method will also start a new recorder.
Parameters
----------
recorder_name : str
the name of the recorder to be created.
resume : bool
whether to resume the first recorder
Returns
-------
Expand Down Expand Up @@ -149,7 +151,57 @@ def get_recorder(self, recorder_id=None, recorder_name=None, create: bool = True
-------
A recorder object.
"""
raise NotImplementedError(f"Please implement the `get_recorder` method.")
# special case of getting the recorder
if recorder_id is None and recorder_name is None:
if self.active_recorder is not None:
return self.active_recorder
recorder_name = self._default_rec_name
if create:
recorder, is_new = self._get_or_create_rec(recorder_id=recorder_id, recorder_name=recorder_name)
else:
recorder, is_new = self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
if is_new:
self.active_recorder = recorder
# start the recorder
self.active_recorder.start_run()
return recorder

def _get_or_create_rec(self, recorder_id=None, recorder_name=None) -> (object, bool):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
automatically create a new recorder based on the given id and name.
"""
try:
if recorder_id is None and recorder_name is None:
recorder_name = self._default_rec_name
return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
except ValueError:
if recorder_name is None:
recorder_name = self._default_rec_name
logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
return self.create_recorder(recorder_name), True

def _get_recorder(self, recorder_id=None, recorder_name=None):
"""
Get specific recorder by name or id. If it does not exist, raise ValueError
Parameters
----------
recorder_id :
The id of recorder
recorder_name :
The name of recorder
Returns
-------
Recorder:
The searched recorder
Raises
------
ValueError
"""
raise NotImplementedError(f"Please implement the `_get_recorder` method")

def list_recorders(self):
"""
Expand Down Expand Up @@ -178,12 +230,20 @@ def __init__(self, id, name, uri):
def __repr__(self):
return "{name}(id={id}, info={info})".format(name=self.__class__.__name__, id=self.id, info=self.info)

def start(self, recorder_name=None):
def start(self, recorder_name=None, resume=False):
logger.info(f"Experiment {self.id} starts running ...")
# set up recorder
recorder = self.create_recorder(recorder_name)
# Get or create recorder
if recorder_name is None:
recorder_name = self._default_rec_name
# resume the recorder
if resume:
recorder, _ = self._get_or_create_rec(recorder_name=recorder_name)
# create a new recorder
else:
recorder = self.create_recorder(recorder_name)
# Set up active recorder
self.active_recorder = recorder
# start the recorder
# Start the recorder
self.active_recorder.start_run()

return self.active_recorder
Expand All @@ -200,35 +260,6 @@ def create_recorder(self, recorder_name=None):

return recorder

def get_recorder(self, recorder_id=None, recorder_name=None, create=True):
# special case of getting the recorder
if recorder_id is None and recorder_name is None:
if self.active_recorder is not None:
return self.active_recorder
recorder_name = self._default_rec_name
if create:
recorder, is_new = self._get_or_create_rec(recorder_id=recorder_id, recorder_name=recorder_name)
else:
recorder, is_new = self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
if is_new:
self.active_recorder = recorder
# start the recorder
self.active_recorder.start_run()
return recorder

def _get_or_create_rec(self, recorder_id=None, recorder_name=None) -> (object, bool):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
automatically create a new recorder based on the given id and name.
"""
try:
return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
except ValueError:
if recorder_name is None:
recorder_name = self._default_rec_name
logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
return self.create_recorder(recorder_name), True

def _get_recorder(self, recorder_id=None, recorder_name=None):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
Expand All @@ -246,7 +277,7 @@ def _get_recorder(self, recorder_id=None, recorder_name=None):
raise ValueError("No valid recorder has been found, please make sure the input recorder id is correct.")
elif recorder_name is not None:
logger.warning(
f"Please make sure the recorder name {recorder_name} is unique, we will only return the first recorder if there exist several matched the given name."
f"Please make sure the recorder name {recorder_name} is unique, we will only return the latest recorder if there exist several matched the given name."
)
recorders = self.list_recorders()
for rid in recorders:
Expand Down Expand Up @@ -280,7 +311,7 @@ def delete_recorder(self, recorder_id=None, recorder_name=None):
UNLIMITED = 50000 # FIXME: Mlflow can only list 50000 records at most!!!!!!!

def list_recorders(self, max_results=UNLIMITED):
runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)[::-1]
runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)
recorders = dict()
for i in range(len(runs)):
recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i])
Expand Down
29 changes: 17 additions & 12 deletions qlib/workflow/expm.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ExpManager:

def __init__(self, uri: Text, default_exp_name: Optional[Text]):
self._current_uri = uri
self.default_exp_name = default_exp_name
self._default_exp_name = default_exp_name
self.active_experiment = None # only one experiment can active each time

def __repr__(self):
Expand All @@ -36,6 +36,7 @@ def start_exp(
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool = False,
**kwargs,
):
"""
Expand All @@ -50,6 +51,8 @@ def start_exp(
name of the recorder to be started.
uri : str
the current tracking URI.
resume : boolean
whether to resume the experiment and recorder.
Returns
-------
Expand Down Expand Up @@ -151,9 +154,7 @@ def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True)
if self.active_experiment is not None:
return self.active_experiment
# User don't want get active code now.
# Don't assume underlying code could handle the case of two None
if experiment_id is None and experiment_name is None:
experiment_name = self.default_exp_name
experiment_name = self._default_exp_name

if create:
exp, is_new = self._get_or_create_exp(experiment_id=experiment_id, experiment_name=experiment_name)
Expand All @@ -171,25 +172,23 @@ def _get_or_create_exp(self, experiment_id=None, experiment_name=None) -> (objec
automatically create a new experiment based on the given id and name.
"""
try:
if experiment_id is None and experiment_name is None:
experiment_name = self.default_exp_name
return self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False
except ValueError:
if experiment_name is None:
experiment_name = self.default_exp_name
experiment_name = self._default_exp_name
logger.info(f"No valid experiment found. Create a new experiment with name {experiment_name}.")
return self.create_exp(experiment_name), True

def _get_exp(self, experiment_id=None, experiment_name=None) -> Experiment:
"""
get specific experiment by name or id. If it does not exist, raise ValueError
Get specific experiment by name or id. If it does not exist, raise ValueError.
Parameters
----------
experiment_id :
The id of experiment
experiment_name :
The id name experiment
The name of experiment
Returns
-------
Expand Down Expand Up @@ -291,16 +290,22 @@ def client(self):
return self._client

def start_exp(
self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None
self,
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool = False,
):
# Set the tracking uri
self.set_uri(uri)
# Create experiment
if experiment_name is None:
experiment_name = self._default_exp_name
experiment, _ = self._get_or_create_exp(experiment_name=experiment_name)
# Set up active experiment
self.active_experiment = experiment
# Start the experiment
self.active_experiment.start(recorder_name)
self.active_experiment.start(recorder_name, resume)

return self.active_experiment

Expand All @@ -316,7 +321,7 @@ def create_exp(self, experiment_name: Optional[Text] = None):
# init experiment
experiment_id = self.client.create_experiment(experiment_name)
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
experiment._default_name = self.default_exp_name
experiment._default_name = self._default_exp_name

return experiment

Expand Down

0 comments on commit 689774c

Please sign in to comment.