Refactor aggregate #708
Merged
Changes from all commits (4 commits)
```diff
@@ -3,12 +3,12 @@
 from typing import Any, Dict, List, Union
 import uuid
 
-from osbenchmark.metrics import FileTestExecutionStore
+from osbenchmark.metrics import FileTestExecutionStore, TestExecution
 from osbenchmark import metrics, workload, config
 from osbenchmark.utils import io as rio
 
 class Aggregator:
-    def __init__(self, cfg, test_executions_dict, args):
+    def __init__(self, cfg, test_executions_dict, args) -> None:
         self.config = cfg
         self.args = args
         self.test_executions = test_executions_dict
```
```diff
@@ -21,69 +21,72 @@ def __init__(self, cfg, test_executions_dict, args):
         self.test_procedure_name = None
         self.loaded_workload = None
 
-    def count_iterations_for_each_op(self, test_execution) -> None:
-        matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None)
+    def count_iterations_for_each_op(self, test_execution: TestExecution) -> None:
+        """Count iterations for each operation in the test execution"""
         workload_params = test_execution.workload_params if test_execution.workload_params else {}
 
         test_execution_id = test_execution.test_execution_id
         self.accumulated_iterations[test_execution_id] = {}
 
-        if matching_test_procedure:
-            for task in matching_test_procedure.schedule:
-                task_name = task.name
-                task_name_iterations = f"{task_name}_iterations"
-                if task_name_iterations in workload_params:
-                    iterations = int(workload_params[task_name_iterations])
-                else:
-                    iterations = task.iterations or 1
-                self.accumulated_iterations[test_execution_id][task_name] = iterations
-        else:
-            raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.")
+        for task in self.loaded_workload.find_test_procedure_or_default(self.test_procedure_name).schedule:
+            task_name = task.name
+            task_name_iterations = f"{task_name}_iterations"
+            iterations = int(workload_params.get(task_name_iterations, task.iterations or 1))
+            self.accumulated_iterations[test_execution_id][task_name] = iterations
```
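The rewrite collapses the old if/else into a single `dict.get` with a fallback, and delegates procedure lookup to `find_test_procedure_or_default` instead of raising a `ValueError` for an unknown procedure name. A minimal standalone sketch of the fallback chain (the task names and parameter values here are hypothetical, not from the PR):

```python
from typing import Optional

# Hypothetical workload params: an explicit "<task>_iterations" entry
# overrides the task's own iteration count
workload_params = {"index-append_iterations": "5"}

def resolve_iterations(task_name: str, task_iterations: Optional[int]) -> int:
    # Fallback chain: explicit param -> task.iterations -> 1
    return int(workload_params.get(f"{task_name}_iterations", task_iterations or 1))

print(resolve_iterations("index-append", 2))  # 5: the explicit param wins
print(resolve_iterations("force-merge", 3))   # 3: falls back to task.iterations
print(resolve_iterations("refresh", None))    # 1: final default
```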
```diff
-    def accumulate_results(self, test_execution: Any) -> None:
-        for item in test_execution.results.get("op_metrics", []):
-            task = item.get("task", "")
+    def accumulate_results(self, test_execution: TestExecution) -> None:
+        """Accumulate results from a single test execution"""
+        for operation_metric in test_execution.results.get("op_metrics", []):
+            task = operation_metric.get("task", "")
             self.accumulated_results.setdefault(task, {})
             for metric in self.metrics:
                 self.accumulated_results[task].setdefault(metric, [])
-                self.accumulated_results[task][metric].append(item.get(metric))
+                self.accumulated_results[task][metric].append(operation_metric.get(metric))
```
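The accumulation pattern itself is unchanged here, only the names are clearer: per task and per metric, values from each run are appended to a list. A small self-contained sketch of the resulting shape (metric names and values are made up for illustration):

```python
# Two hypothetical runs' op_metrics, each a list of per-task metric dicts
runs = [
    [{"task": "index-append", "throughput": 1000, "latency": 12}],
    [{"task": "index-append", "throughput": 1100, "latency": 10}],
]
metrics = ["throughput", "latency"]

accumulated = {}
for op_metrics in runs:
    for operation_metric in op_metrics:
        task = operation_metric.get("task", "")
        accumulated.setdefault(task, {})  # first run creates the task bucket
        for metric in metrics:
            # each metric bucket collects one value per run
            accumulated[task].setdefault(metric, []).append(operation_metric.get(metric))

print(accumulated)
# {'index-append': {'throughput': [1000, 1100], 'latency': [12, 10]}}
```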
```diff
     def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
-        all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]
-
-        # retrieve nested value from a dictionary given a key path
-        def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
+        """
+        Aggregates JSON results across multiple test executions using a specified key path.
+        Handles nested dictionary structures and calculates averages for numeric values
+        """
+        all_json_results = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]
+
+        def get_nested_value(json_data: Dict[str, Any], path: List[str]) -> Any:
+            """
+            Retrieves a value from a nested dictionary structure using a path of keys.
+            """
             for key in path:
-                if isinstance(obj, dict):
-                    obj = obj.get(key, {})
-                elif isinstance(obj, list) and key.isdigit():
-                    obj = obj[int(key)] if int(key) < len(obj) else {}
+                if isinstance(json_data, dict):
+                    json_data = json_data.get(key, {})
+                elif isinstance(json_data, list) and key.isdigit():
+                    json_data = json_data[int(key)] if int(key) < len(json_data) else {}
                 else:
                     return None
-            return obj
+            return json_data
 
-        def aggregate_helper(objects: List[Any]) -> Any:
-            if not objects:
+        def aggregate_json_elements(json_elements: List[Any]) -> Any:
+            if not json_elements:
                 return None
-            if all(isinstance(obj, (int, float)) for obj in objects):
-                avg = sum(objects) / len(objects)
-                return avg
-            if all(isinstance(obj, dict) for obj in objects):
-                keys = set().union(*objects)
-                return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
-            if all(isinstance(obj, list) for obj in objects):
-                max_length = max(len(obj) for obj in objects)
-                return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)]
-            return next((obj for obj in objects if obj is not None), None)
+            # If all elements are numbers, calculate the average
+            if all(isinstance(obj, (int, float)) for obj in json_elements):
+                return sum(json_elements) / len(json_elements)
+            # If all elements are dictionaries, recursively aggregate their values
+            if all(isinstance(obj, dict) for obj in json_elements):
+                keys = set().union(*json_elements)
+                return {key: aggregate_json_elements([obj.get(key) for obj in json_elements]) for key in keys}
+            # If all elements are lists, recursively aggregate corresponding elements
+            if all(isinstance(obj, list) for obj in json_elements):
+                max_length = max(len(obj) for obj in json_elements)
+                return [aggregate_json_elements([obj[i] if i < len(obj) else None for obj in json_elements]) for i in range(max_length)]
+            # If elements are of mixed types, return the first non-None value
+            return next((obj for obj in json_elements if obj is not None), None)
 
         if isinstance(key_path, str):
            key_path = key_path.split('.')
 
-        values = [get_nested_value(json, key_path) for json in all_jsons]
-        return aggregate_helper(values)
+        nested_values = [get_nested_value(json_result, key_path) for json_result in all_json_results]
+        return aggregate_json_elements(nested_values)
```
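The new docstring and comments summarize the recursion well: numbers average, dicts recurse over the union of keys, lists recurse position by position, and mixed types fall back to the first non-None value. To see it end to end, here is the helper re-implemented outside the class, run on two hypothetical result payloads (the payload contents are invented for illustration):

```python
from typing import Any, List

def aggregate_json_elements(json_elements: List[Any]) -> Any:
    if not json_elements:
        return None
    # All numbers: average them
    if all(isinstance(obj, (int, float)) for obj in json_elements):
        return sum(json_elements) / len(json_elements)
    # All dicts: recurse over the union of keys
    if all(isinstance(obj, dict) for obj in json_elements):
        keys = set().union(*json_elements)
        return {key: aggregate_json_elements([obj.get(key) for obj in json_elements]) for key in keys}
    # All lists: recurse position by position, padding short lists with None
    if all(isinstance(obj, list) for obj in json_elements):
        max_length = max(len(obj) for obj in json_elements)
        return [aggregate_json_elements([obj[i] if i < len(obj) else None for obj in json_elements])
                for i in range(max_length)]
    # Mixed types: first non-None wins
    return next((obj for obj in json_elements if obj is not None), None)

# Two hypothetical test-execution result payloads
run_a = {"correctness_metrics": {"recall@k": 0.5, "notes": "run-a"}}
run_b = {"correctness_metrics": {"recall@k": 0.75}}

aggregated = aggregate_json_elements([run_a, run_b])
print(aggregated["correctness_metrics"]["recall@k"])  # 0.625: average of 0.5 and 0.75
print(aggregated["correctness_metrics"]["notes"])     # 'run-a': first non-None for a missing key
```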
```diff
-    def build_aggregated_results(self):
-        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
+    def build_aggregated_results_dict(self) -> Dict[str, Any]:
+        """Builds a dictionary of aggregated metrics from all test executions"""
         aggregated_results = {
             "op_metrics": [],
             "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
```
```diff
@@ -147,8 +150,30 @@ def build_aggregated_results(self):
 
             aggregated_results["op_metrics"].append(op_metric)
 
-        # extract the necessary data from the first test execution, since the configurations should be identical for all test executions
+        return aggregated_results
+
+    def update_config_object(self, test_execution: TestExecution) -> None:
+        """
+        Updates the configuration object with values from a test execution.
+        Uses the first test execution as reference since configurations should be identical
+        """
+        current_timestamp = self.config.opts("system", "time.start")
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.names", test_execution.provision_config_instance)
+        self.config.add(config.Scope.applicationOverride, "system",
+                        "env.name", test_execution.environment_name)
+        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
+        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline)
+        self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params)
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.params", test_execution.provision_config_instance_params)
+        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params)
+        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles)
+        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles)
+
+    def build_aggregated_results(self) -> TestExecution:
+        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
+        aggregated_results = self.build_aggregated_results_dict()
 
         if hasattr(self.args, 'results_file') and self.args.results_file != "":
             normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
```
```diff
@@ -165,19 +190,7 @@ def build_aggregated_results(self):
 
         print("Aggregate test execution ID: ", test_execution_id)
 
-        # add values to the configuration object
-        self.config.add(config.Scope.applicationOverride, "builder",
-                        "provision_config_instance.names", test_exe.provision_config_instance)
-        self.config.add(config.Scope.applicationOverride, "system",
-                        "env.name", test_exe.environment_name)
-        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
-        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
-        self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
-        self.config.add(config.Scope.applicationOverride, "builder",
-                        "provision_config_instance.params", test_exe.provision_config_instance_params)
-        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
-        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
-        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)
+        self.update_config_object(test_exe)
 
         loaded_workload = workload.load_workload(self.config)
         test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)
```

Review comment on `self.update_config_object(test_exe)`: "This is nice, makes …"
```diff
@@ -223,7 +236,7 @@ def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], task_name
 
         return weighted_metrics
 
-    def calculate_rsd(self, values: List[Union[int, float]], metric_name: str):
+    def calculate_rsd(self, values: List[Union[int, float]], metric_name: str) -> Union[float, str]:
         if not values:
             raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values")
         if len(values) == 1:
```
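For context, relative standard deviation (RSD) is conventionally the standard deviation expressed as a percentage of the mean. The hunk shows only the new return annotation and the guard clauses, so the following is a hedged sketch of a typical implementation under that convention, not necessarily the exact formula used in the repository; the "NA" returns are assumptions consistent with the `Union[float, str]` annotation:

```python
import statistics
from typing import List, Union

def calculate_rsd(values: List[Union[int, float]], metric_name: str) -> Union[float, str]:
    # Guard clauses mirroring the diff: empty input is an error
    if not values:
        raise ValueError(f"Cannot calculate RSD for metric '{metric_name}': empty list of values")
    if len(values) == 1:
        return "NA"  # assumption: a single run has no spread to measure
    mean = statistics.mean(values)
    if mean == 0:
        return "NA"  # assumption: avoid division by zero
    return (statistics.stdev(values) / mean) * 100  # RSD as a percentage of the mean

print(calculate_rsd([100.0, 110.0, 90.0], "latency"))  # 10.0 for this sample
```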
Review comment: "Much cleaner!"