From 1a4cce7b3ba6f4dac8f18512c8caa3bafdf744d4 Mon Sep 17 00:00:00 2001
From: Tim Mensinger
Date: Thu, 23 Jan 2025 21:59:08 +0100
Subject: [PATCH] Save changes (not working and non-verified)

---
 src/optimagic/optimization/history.py        | 155 +++++++++++------
 tests/optimagic/optimization/test_history.py | 170 ++++++++++---------
 2 files changed, 194 insertions(+), 131 deletions(-)

diff --git a/src/optimagic/optimization/history.py b/src/optimagic/optimization/history.py
index 2eebb80fb..763d294eb 100644
--- a/src/optimagic/optimization/history.py
+++ b/src/optimagic/optimization/history.py
@@ -99,7 +99,7 @@ def _get_next_batch_id(self) -> int:
     # Function data, function value, and monotone function value
     # ----------------------------------------------------------------------------------
 
-    def fun_data(self, cost_model: CostModel, monotone: bool) -> pd.DataFrame:
+    def fun_data(
+        self, cost_model: CostModel, monotone: bool, dropna: bool = False
+    ) -> pd.DataFrame:
         """Return the function value data.
 
         Args:
@@ -116,13 +116,37 @@ def fun_data(self, cost_model: CostModel, monotone: bool) -> pd.DataFrame:
             fun: list[float | None] | NDArray[np.float64] = self.monotone_fun
         else:
             fun = self.fun
-        task = _task_as_categorical(self.task)
-        time = self._get_time(cost_model)
-        return pd.DataFrame({"fun": fun, "task": task, "time": time})
+        timings = self._get_total_timings(cost_model)
+
+        if not self.is_serial:
+            timings = _apply_reduction_to_batches(
+                data=timings,
+                batch_ids=self.batches,
+                reduction_function=cost_model.aggregate_batch_time,
+            )
+
+            min_or_max = (
+                np.nanmin if self.direction == Direction.MINIMIZE else np.nanmax
+            )
+            fun = _apply_reduction_to_batches(
+                data=fun,
+                batch_ids=self.batches,
+                reduction_function=min_or_max,
+            )
+
+        time = np.cumsum(timings)
+        data = pd.DataFrame({"fun": fun, "time": time})
+
+        if self.is_serial:
+            data["task"] = _task_to_categorical(self.task)
+
+        if dropna:
+            data = data.dropna()
+
+        return data.rename_axis("counter")
 
     @property
-    def fun(self) -> list[float | None]:
-        return self._fun
+    def fun(self) -> NDArray[np.float64]:
+        return np.array(self._fun, dtype=np.float64)
 
     @property
     def monotone_fun(self) -> NDArray[np.float64]:
@@ -155,24 +179,52 @@ def is_accepted(self) -> NDArray[np.bool_]:
     # Parameter data, params, flat params, and flat params names
     # ----------------------------------------------------------------------------------
 
-    def params_data(self, cost_model: CostModel) -> pd.DataFrame:
+    def params_data(self, dropna: bool = False) -> pd.DataFrame:
         """Return the parameter data.
 
         Args:
-            cost_model: The cost model that is used to calculate the time measure.
+            dropna: Whether to drop rows with missing values. These correspond to
+                parameters that were used to calculate a pure jacobian.
 
         Returns:
             pd.DataFrame: The parameter data. The columns are: 'name' (the parameter
-                names), 'value' (the parameter values), 'task', and 'time'.
+                names), 'value' (the parameter values), 'task' (the task for which the
+                parameter was used), and 'counter' (a counter that is unique for each
+                row).
 
         """
         wide = pd.DataFrame(self.flat_params, columns=self.flat_param_names)
-        wide["task"] = _task_as_categorical(self.task)
-        wide["time"] = self._get_time(cost_model)
-        data = pd.melt(
-            wide, var_name="name", value_name="value", id_vars=["task", "time"]
+        wide["task"] = _task_to_categorical(self.task)
+        wide["batches"] = self.batches
+        wide["fun"] = self.fun
+
+        # 1. For parallel histories, keep only the row with the best function value
+        #    in each batch and the corresponding parameters.
+        # 2. Reshape the data to long format.
+        if not self.is_serial:
+            if self.direction == Direction.MINIMIZE:
+                loc = wide.groupby("batches")["fun"].idxmin()
+            elif self.direction == Direction.MAXIMIZE:
+                loc = wide.groupby("batches")["fun"].idxmax()
+
+            wide = wide.loc[loc]
+
+        wide = wide.drop(columns=["batches", "fun"])
+
+        long = pd.melt(
+            wide, var_name="name", value_name="value", id_vars=["task"]
         )
-        return data.reindex(columns=["name", "value", "task", "time"])
+        data = long.reindex(columns=["name", "value", "task"])
+
+        if dropna:
+            data = data.dropna()
+
+        return data.rename_axis("counter")
 
     @property
     def params(self) -> list[PyTree]:
@@ -189,17 +241,17 @@ def flat_param_names(self) -> list[str]:
     # Time
     # ----------------------------------------------------------------------------------
 
-    def _get_time(
+    def _get_total_timings(
         self, cost_model: CostModel | Literal["wall_time"]
     ) -> NDArray[np.float64]:
-        """Return the cumulative time measure.
+        """Return the total timings across all tasks.
 
         Args:
             cost_model: The cost model that is used to calculate the time measure. If
                 "wall_time", the wall time is returned.
 
         Returns:
-            np.ndarray: The time measure.
+            np.ndarray: The sum of the timings across all tasks.
 
         """
         if not isinstance(cost_model, CostModel) and cost_model != "wall_time":
@@ -208,25 +260,20 @@ def _get_time(
         if cost_model == "wall_time":
             return np.array(self.stop_time, dtype=np.float64) - self.start_time[0]
 
-        fun_time = self._get_time_per_task(
+        fun_time = self._get_timings_per_task(
             task=EvalTask.FUN, cost_factor=cost_model.fun
         )
-        jac_time = self._get_time_per_task(
+        jac_time = self._get_timings_per_task(
             task=EvalTask.JAC, cost_factor=cost_model.jac
         )
-        fun_and_jac_time = self._get_time_per_task(
+        fun_and_jac_time = self._get_timings_per_task(
             task=EvalTask.FUN_AND_JAC, cost_factor=cost_model.fun_and_jac
         )
 
-        time = fun_time + jac_time + fun_and_jac_time
-        batch_aware_time = _apply_to_batch(
-            data=time,
-            batch_ids=self.batches,
-            func=cost_model.aggregate_batch_time,
-        )
-        return np.cumsum(batch_aware_time)
+        return fun_time + jac_time + fun_and_jac_time
 
-    def _get_time_per_task(
+    def _get_timings_per_task(
         self, task: EvalTask, cost_factor: float | None
     ) -> NDArray[np.float64]:
         """Return the time measure per task.
@@ -261,12 +308,16 @@ def start_time(self) -> list[float]:
     def stop_time(self) -> list[float]:
         return self._stop_time
 
-    # Batches
+    # Batches and is_serial
     # ----------------------------------------------------------------------------------
 
     @property
     def batches(self) -> list[int]:
         return self._batches
+
+    @property
+    def is_serial(self) -> bool:
+        return all(self.batches == np.arange(len(self.batches)))
 
     # Tasks
     # ----------------------------------------------------------------------------------
@@ -380,16 +431,16 @@ def _validate_args_are_all_none_or_lists_of_same_length(
         raise ValueError("All arguments must be lists of the same length or None.")
 
 
-def _task_as_categorical(task: list[EvalTask]) -> pd.Categorical:
+def _task_to_categorical(task: list[EvalTask]) -> pd.Categorical:
     return pd.Categorical(
         [t.value for t in task], categories=[t.value for t in EvalTask]
     )
 
 
-def _apply_to_batch(
+def _apply_reduction_to_batches(
     data: NDArray[np.float64],
     batch_ids: list[int],
-    func: Callable[[Iterable[float]], float],
+    reduction_function: Callable[[Iterable[float]], float],
 ) -> NDArray[np.float64]:
     """Apply a reduction operator on batches of data.
 
@@ -399,50 +450,48 @@
        data: 1d array with data.
batch_ids: A list with batch ids whose length is equal to the size of data. Values need to be sorted and can be repeated. - func: A reduction function that takes an iterable of floats as input (e.g., a - numpy.ndarray or list) and returns a scalar. + reduction_function: A reduction function that takes an iterable of floats as + input (e.g., a numpy.ndarray or list) and returns a scalar. Returns: - The transformed data. Has the same length as data. For each batch, the result of - the reduction operation is stored at the first index of that batch, and all - other values of that batch are set to zero. + The transformed data. Has one entry per unique batch id, equal to the result of + applying the reduction function to the data of that batch. """ - batch_starts = _get_batch_start(batch_ids) - batch_stops = [*batch_starts[1:], len(data)] + batch_starts, batch_stops = _get_batch_starts_and_stops(batch_ids) batch_results: list[float] = [] + for start, stop in zip(batch_starts, batch_stops, strict=True): batch_data = data[start:stop] batch_id = batch_ids[start] try: - reduced = func(batch_data) + reduced = reduction_function(batch_data) except Exception as e: msg = ( - f"Calling function {func.__name__} on batch {batch_id} of the History " - f"raised an Exception. Please verify that {func.__name__} is " - "well-defined, takes a list of floats as input, and returns a scalar." + f"Calling function {reduction_function.__name__} on batch {batch_id} " + "of the History raised an Exception. Please verify that " + f"{reduction_function.__name__} is well-defined, takes a list of " + "floats as input, and returns a scalar." ) raise ValueError(msg) from e if not np.isscalar(reduced): msg = ( - f"Function {func.__name__} did not return a scalar for batch " - f"{batch_id}. Please verify that {func.__name__} returns a " - "scalar when called on a list of floats." + f"Function {reduction_function.__name__} did not return a scalar for " + f"batch {batch_id}. Please verify that {reduction_function.__name__} " + "returns a scalar when called on a list of floats." ) raise ValueError(msg) batch_results.append(reduced) # type: ignore[arg-type] - out = np.zeros_like(data) - out[batch_starts] = batch_results - return out + return np.array(batch_results, dtype=np.float64) -def _get_batch_start(batch_ids: list[int]) -> list[int]: - """Get start indices of batches. +def _get_batch_starts_and_stops(batch_ids: list[int]) -> tuple[list[int], list[int]]: + """Get start and stop indices of batches. This function assumes that batch_ids non-empty and sorted. 
@@ -450,4 +499,6 @@ def _get_batch_start(batch_ids: list[int]) -> list[int]: ids_arr = np.array(batch_ids, dtype=np.int64) indices = np.where(ids_arr[:-1] != ids_arr[1:])[0] + 1 list_indices: list[int] = indices.tolist() # type: ignore[assignment] - return [0, *list_indices] + starts = [0, *list_indices] + stops = [*starts[1:], len(batch_ids)] + return starts, stops diff --git a/tests/optimagic/optimization/test_history.py b/tests/optimagic/optimization/test_history.py index 9a2fe6109..165185b3b 100644 --- a/tests/optimagic/optimization/test_history.py +++ b/tests/optimagic/optimization/test_history.py @@ -10,13 +10,13 @@ from optimagic.optimization.history import ( History, HistoryEntry, - _apply_to_batch, + _apply_reduction_to_batches, _calculate_monotone_sequence, - _get_batch_start, + _get_batch_starts_and_stops, _get_flat_param_names, _get_flat_params, _is_1d_array, - _task_as_categorical, + _task_to_categorical, _validate_args_are_all_none_or_lists_of_same_length, ) from optimagic.typing import Direction, EvalTask @@ -65,9 +65,9 @@ def test_history_add_entry(history_entries): {"a": 4, "b": [5, 6]}, {"a": 7, "b": [8, 9]}, ] - assert history.fun == [1, 3, 2] assert history.task == [EvalTask.FUN, EvalTask.FUN, EvalTask.FUN] assert history.batches == [0, 1, 2] + aaae(history.fun, [1, 3, 2]) aaae(history.start_time, [0.1, 0.2, 0.3]) aaae(history.stop_time, [0.2, 0.3, 0.4]) @@ -88,9 +88,9 @@ def test_history_add_batch(history_entries): {"a": 4, "b": [5, 6]}, {"a": 7, "b": [8, 9]}, ] - assert history.fun == [1, 3, 2] assert history.task == [EvalTask.FUN, EvalTask.FUN, EvalTask.FUN] assert history.batches == [0, 0, 0] + aaae(history.fun, [1, 3, 2]) aaae(history.start_time, [0.1, 0.2, 0.3]) aaae(history.stop_time, [0.2, 0.3, 0.4]) @@ -118,9 +118,9 @@ def test_history_from_data(): assert history.direction == Direction.MAXIMIZE assert history.params == data["params"] - assert history.fun == data["fun"] assert history.task == data["task"] assert history.batches == data["batches"] + aaae(history.fun, data["fun"]) aaae(history.start_time, data["start_time"]) aaae(history.stop_time, data["stop_time"]) @@ -179,7 +179,7 @@ def history_parallel(history_data): # -------------------------------------------------------------------------------------- -def test_history_fun_data_with_fun_evaluations_cost_model(history): +def test_history_fun_data_with_fun_evaluations_cost_model(history: History): got = history.fun_data( cost_model=om.timing.fun_evaluations, monotone=False, @@ -187,6 +187,7 @@ def test_history_fun_data_with_fun_evaluations_cost_model(history): exp = pd.DataFrame( { "fun": [10, np.nan, 9, np.nan, 2, 5], + "time": [1, 1, 2, 2, 3, 4], "task": [ "fun", "jac", @@ -195,13 +196,12 @@ def test_history_fun_data_with_fun_evaluations_cost_model(history): "fun", "fun_and_jac", ], - "time": [1, 1, 2, 2, 3, 4], } - ) + ).rename_axis("counter") assert_frame_equal(got, exp, check_dtype=False, check_categorical=False) -def test_history_fun_data_with_fun_evaluations_cost_model_and_monotone(history): +def test_history_fun_data_with_fun_evaluations_cost_model_and_monotone(history: History): got = history.fun_data( cost_model=om.timing.fun_evaluations, monotone=True, @@ -209,6 +209,7 @@ def test_history_fun_data_with_fun_evaluations_cost_model_and_monotone(history): exp = pd.DataFrame( { "fun": [10, np.nan, 9, np.nan, 2, 2], + "time": [1, 1, 2, 2, 3, 4], "task": [ "fun", "jac", @@ -217,35 +218,26 @@ def test_history_fun_data_with_fun_evaluations_cost_model_and_monotone(history): "fun", "fun_and_jac", ], - 
"time": [1, 1, 2, 2, 3, 4], } - ) + ).rename_axis("counter") assert_frame_equal(got, exp, check_dtype=False, check_categorical=False) -def test_history_fun_data_with_fun_batches_cost_model(history_parallel): +def test_history_fun_data_with_fun_batches_cost_model(history_parallel: History): got = history_parallel.fun_data( cost_model=om.timing.fun_batches, monotone=False, ) exp = pd.DataFrame( { - "fun": [10, np.nan, 9, np.nan, 2, 5], - "task": [ - "fun", - "jac", - "fun", - "jac", - "fun", - "fun_and_jac", - ], - "time": [1, 1, 2, 2, 3, 3], + "fun": [10, 9, 2], + "time": [1, 2, 3], } - ) + ).rename_axis("counter") assert_frame_equal(got, exp, check_dtype=False, check_categorical=False) -def test_history_fun_data_with_evaluation_time_cost_model(history): +def test_history_fun_data_with_evaluation_time_cost_model(history: History): got = history.fun_data( cost_model=om.timing.evaluation_time, monotone=False, @@ -253,6 +245,7 @@ def test_history_fun_data_with_evaluation_time_cost_model(history): exp = pd.DataFrame( { "fun": [10, np.nan, 9, np.nan, 2, 5], + "time": [1, 3, 4, 6, 7, 9], "task": [ "fun", "jac", @@ -261,17 +254,16 @@ def test_history_fun_data_with_evaluation_time_cost_model(history): "fun", "fun_and_jac", ], - "time": [1, 3, 4, 6, 7, 9], } - ) + ).rename_axis("counter") assert_frame_equal(got, exp, check_dtype=False, check_categorical=False) -def test_fun_property(history): - assert_array_equal(history.fun, [10, None, 9, None, 2, 5]) +def test_fun_property(history: History): + assert_array_equal(history.fun, np.array([10, np.nan, 9, np.nan, 2, 5])) -def test_monotone_fun_property(history): +def test_monotone_fun_property(history: History): assert_array_equal(history.monotone_fun, np.array([10, np.nan, 9, np.nan, 2, 2])) @@ -279,7 +271,7 @@ def test_monotone_fun_property(history): # -------------------------------------------------------------------------------------- -def test_is_accepted_property(history): +def test_is_accepted_property(history: History): got = history.is_accepted exp = np.array([True, False, True, False, True, False]) assert_array_equal(got, exp) @@ -289,8 +281,8 @@ def test_is_accepted_property(history): # -------------------------------------------------------------------------------------- -def test_params_data_fun_evaluations_cost_model(history): - got = history.params_data(cost_model=om.timing.fun_evaluations) +def test_params_data_fun_evaluations_cost_model(history: History): + got = history.params_data() exp = pd.DataFrame( { "name": np.repeat( @@ -314,9 +306,28 @@ def test_params_data_fun_evaluations_cost_model(history): ], 4, ), + } + ).rename_axis("counter") + assert_frame_equal(got, exp, check_categorical=False, check_dtype=False) + + +def test_params_data_fun_evaluations_cost_model(history_parallel: History): + got = history_parallel.params_data() + exp = pd.DataFrame( + { + "name": np.repeat( + [ + "a", + "b_c", + "b_d_0", + "b_d_1", + ], + 3, + ), + "value": np.tile(list(range(3)), 4), "time": np.tile([1, 1, 2, 2, 3, 4], 4), } - ) + ).rename_axis("counter") assert_frame_equal(got, exp, check_categorical=False, check_dtype=False) @@ -324,12 +335,12 @@ def test_params_property(history, params): assert history.params == params -def test_flat_params_property(history): +def test_flat_params_property(history: History): got = history.flat_params assert_array_equal(got, [[k for _ in range(4)] for k in range(6)]) -def test_flat_param_names(history): +def test_flat_param_names(history: History): assert history.flat_param_names == ["a", "b_c", "b_d_0", 
"b_d_1"] @@ -337,84 +348,84 @@ def test_flat_param_names(history): # -------------------------------------------------------------------------------------- -def test_get_time_per_task_fun(history): - got = history._get_time_per_task(EvalTask.FUN, cost_factor=1) +def test_get_total_timings_per_task_fun(history: History): + got = history._get_timings_per_task(EvalTask.FUN, cost_factor=1) exp = np.array([1, 0, 1, 0, 1, 0]) assert_array_equal(got, exp) -def test_get_time_per_task_jac_cost_factor_none(history): - got = history._get_time_per_task(EvalTask.JAC, cost_factor=None) +def test_get_total_timings_per_task_jac_cost_factor_none(history: History): + got = history._get_timings_per_task(EvalTask.JAC, cost_factor=None) exp = np.array([0, 2, 0, 2, 0, 0]) assert_array_equal(got, exp) -def test_get_time_per_task_fun_and_jac(history): - got = history._get_time_per_task(EvalTask.FUN_AND_JAC, cost_factor=-0.5) +def test_get_total_timings_per_task_fun_and_jac(history: History): + got = history._get_timings_per_task(EvalTask.FUN_AND_JAC, cost_factor=-0.5) exp = np.array([0, 0, 0, 0, 0, -0.5]) assert_array_equal(got, exp) -def test_get_time_custom_cost_model(history): +def test_get_total_timings_custom_cost_model(history: History): cost_model = om.timing.CostModel( fun=0.5, jac=1, fun_and_jac=2, label="test", aggregate_batch_time=sum ) - got = history._get_time(cost_model) + got = history._get_total_timings(cost_model) exp = np.array( [ 0.5, - 0.5 + 1, - 1 + 1, - 1 + 2, - 1.5 + 2, - 1.5 + 2 + 2, + 1, + 0.5, + 1, + 0.5, + 2, ] ) assert_array_equal(got, exp) -def test_get_time_fun_evaluations(history): - got = history._get_time(cost_model=om.timing.fun_evaluations) - exp = np.array([1, 1, 2, 2, 3, 4]) +def test_get_total_timings_fun_evaluations(history: History): + got = history._get_total_timings(cost_model=om.timing.fun_evaluations) + exp = np.array([1, 0, 1, 0, 1, 1]) assert_array_equal(got, exp) -def test_get_time_fun_batches(history): - got = history._get_time(cost_model=om.timing.fun_batches) - exp = np.array([1, 1, 2, 2, 3, 4]) +def test_get_total_timings_fun_batches(history: History): + got = history._get_total_timings(cost_model=om.timing.fun_batches) + exp = np.array([1, 0, 1, 0, 1, 1]) assert_array_equal(got, exp) -def test_get_time_fun_batches_parallel(history_parallel): - got = history_parallel._get_time(cost_model=om.timing.fun_batches) - exp = np.array([1, 1, 2, 2, 3, 3]) +def test_get_total_timings_fun_batches_parallel(history_parallel: History): + got = history_parallel._get_total_timings(cost_model=om.timing.fun_batches) + exp = np.array([1, 0, 1, 0, 1, 1]) assert_array_equal(got, exp) -def test_get_time_evaluation_time(history): - got = history._get_time(cost_model=om.timing.evaluation_time) - exp = np.array([1, 3, 4, 6, 7, 9]) +def test_get_total_timings_evaluation_time(history: History): + got = history._get_total_timings(cost_model=om.timing.evaluation_time) + exp = np.array([1, 2, 1, 2, 1, 2]) assert_array_equal(got, exp) -def test_get_time_wall_time(history): - got = history._get_time(cost_model="wall_time") +def test_get_total_timings_wall_time(history: History): + got = history._get_total_timings(cost_model="wall_time") exp = np.array([1, 4, 6, 9, 11, 14]) assert_array_equal(got, exp) -def test_get_time_invalid_cost_model(history): +def test_get_total_timings_invalid_cost_model(history: History): with pytest.raises( TypeError, match="cost_model must be a CostModel or 'wall_time'." 
 ):
-        history._get_time(cost_model="invalid")
+        history._get_total_timings(cost_model="invalid")
 
 
-def test_start_time_property(history):
+def test_start_time_property(history: History):
     assert history.start_time == [0, 2, 5, 7, 10, 12]
 
 
-def test_stop_time_property(history):
+def test_stop_time_property(history: History):
     assert history.stop_time == [1, 4, 6, 9, 11, 14]
 
 
@@ -422,7 +433,7 @@ def test_stop_time_property(history):
 # --------------------------------------------------------------------------------------
 
 
-def test_batches_property(history):
+def test_batches_property(history: History):
     assert history.batches == [0, 1, 2, 3, 4, 5]
 
 
@@ -430,7 +441,7 @@ def test_batches_property(history):
 # --------------------------------------------------------------------------------------
 
 
-def test_task_property(history):
+def test_task_property(history: History):
     assert history.task == [
         EvalTask.FUN,
         EvalTask.JAC,
@@ -511,30 +522,31 @@ def test_validate_args_are_all_none_or_lists_of_same_length():
 
 def test_task_as_categorical():
     task = [EvalTask.FUN, EvalTask.JAC, EvalTask.FUN_AND_JAC]
-    got = _task_as_categorical(task)
+    got = _task_to_categorical(task)
     assert got.tolist() == ["fun", "jac", "fun_and_jac"]
     assert isinstance(got.dtype, pd.CategoricalDtype)
 
 
-def test_get_batch_start():
+def test_get_batch_starts_and_stops():
     batches = [0, 0, 1, 1, 1, 2, 2, 3]
-    got = _get_batch_start(batches)
-    assert got == [0, 2, 5, 7]
+    got_starts, got_stops = _get_batch_starts_and_stops(batches)
+    assert got_starts == [0, 2, 5, 7]
+    assert got_stops == [2, 5, 7, 8]
 
 
 def test_apply_to_batch_sum():
     data = np.array([0, 1, 2, 3, 4])
     batch_ids = [0, 0, 1, 1, 2]
-    exp = np.array([1, 0, 5, 0, 4])
-    got = _apply_to_batch(data, batch_ids, sum)
+    exp = np.array([1, 5, 4])
+    got = _apply_reduction_to_batches(data, batch_ids, sum)
     assert_array_equal(exp, got)
 
 
 def test_apply_to_batch_max():
     data = np.array([0, 1, 2, 3, 4])
     batch_ids = [0, 0, 1, 1, 2]
-    exp = np.array([1, 0, 3, 0, 4])
-    got = _apply_to_batch(data, batch_ids, max)
+    exp = np.array([1, 3, 4])
+    got = _apply_reduction_to_batches(data, batch_ids, max)
     assert_array_equal(exp, got)
 
 
@@ -542,11 +554,11 @@ def test_apply_to_batch_broken_func():
     data = np.array([0, 1, 2, 3, 4])
     batch_ids = [0, 0, 1, 1, 2]
     with pytest.raises(ValueError, match="Calling function on batch [0, 0]"):
-        _apply_to_batch(data, batch_ids, func=lambda _: 1 / 0)
+        _apply_reduction_to_batches(
+            data, batch_ids, reduction_function=lambda _: 1 / 0
+        )
 
 
 def test_apply_to_batch_func_with_non_scalar_return():
     data = np.array([0, 1, 2, 3, 4])
     batch_ids = [0, 0, 1, 1, 2]
     with pytest.raises(ValueError, match="Function did not return a scalar"):
-        _apply_to_batch(data, batch_ids, func=lambda _list: _list)
+        _apply_reduction_to_batches(
+            data, batch_ids, reduction_function=lambda _list: _list
+        )
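
Note for reviewers (illustration only, not part of the patch): the sketch below mirrors the intended batch-aggregation semantics of the patch with plain numpy/pandas, so the behaviour can be eyeballed without importing optimagic. The helper name reduce_per_batch is ad hoc and chosen only for this example; in the patch the corresponding role is played by _apply_reduction_to_batches, which now returns one aggregated value per batch instead of an array of the original length.

import numpy as np
import pandas as pd


def reduce_per_batch(data, batch_ids, reduction_function):
    # One aggregated value per batch; assumes batch_ids are sorted, as in the patch.
    starts = [0] + [
        i for i in range(1, len(batch_ids)) if batch_ids[i] != batch_ids[i - 1]
    ]
    stops = starts[1:] + [len(batch_ids)]
    return np.array(
        [reduction_function(data[a:b]) for a, b in zip(starts, stops)],
        dtype=np.float64,
    )


# Values mirror the history_parallel fixture: three batches of two evaluations.
fun = np.array([10.0, np.nan, 9.0, np.nan, 2.0, 5.0])
timings = np.array([1.0, 0.0, 1.0, 0.0, 1.0, 1.0])
batch_ids = [0, 0, 1, 1, 2, 2]

batch_fun = reduce_per_batch(fun, batch_ids, np.nanmin)  # array([10., 9., 2.])
batch_time = reduce_per_batch(timings, batch_ids, max)  # array([1., 1., 1.])

# One row per batch with cumulative time, as fun_data is meant to return for a
# parallel history (matching test_history_fun_data_with_fun_batches_cost_model).
df = pd.DataFrame(
    {"fun": batch_fun, "time": np.cumsum(batch_time)}
).rename_axis("counter")
print(df)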