Skip to content

Commit

Permalink
add automatic testing and aggregation to OSB (#655)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
  • Loading branch information
OVI3D0 authored Oct 4, 2024
1 parent a967bfd commit 83040a9
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 45 deletions.
3 changes: 3 additions & 0 deletions osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ def build_aggregated_results(self):
test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)

test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision)
test_execution.user_tags = {
"aggregation-of-runs": list(self.test_executions.keys())
}
test_execution.add_results(AggregatedResults(aggregated_results))
test_execution.distribution_version = test_exe.distribution_version
test_execution.revision = test_exe.revision
Expand Down
134 changes: 89 additions & 45 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,24 @@ def add_workload_source(subparser):
f"high values favor the most common queries. "
f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA}).",
default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA)
# Options controlling repeated execution of the same workload.
# The numeric options are parsed with type=int so that a value given on the
# command line has the same type as the default, and non-numeric input is
# rejected by argparse with a usage error.
test_execution_parser.add_argument(
    "--test-iterations",
    type=int,
    help="The number of times to run the workload (default: 1).",
    default=1)
test_execution_parser.add_argument(
    "--aggregate",
    # Only the listed words (case-insensitive) enable aggregation; any other value disables it.
    type=lambda x: (str(x).lower() in ['true', '1', 'yes', 'y']),
    help="Aggregate the results of multiple test executions (default: true).",
    default=True)
test_execution_parser.add_argument(
    "--sleep-timer",
    type=int,
    help="Sleep for the specified number of seconds before starting the next test execution (default: 5).",
    default=5)
test_execution_parser.add_argument(
    "--cancel-on-error",
    action="store_true",
    help="Stop executing tests if an error occurs in one of the test iterations (default: false).",
)

###############################################################################
#
Expand All @@ -634,8 +652,8 @@ def add_workload_source(subparser):
action="store_true",
default=False)

for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, download_parser, install_parser,
start_parser, stop_parser, info_parser, create_workload_parser]:
for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser,
download_parser, install_parser, start_parser, stop_parser, info_parser, create_workload_parser]:
# This option is needed to support a separate configuration for the integration tests on the same machine
p.add_argument(
"--configuration-name",
Expand Down Expand Up @@ -863,6 +881,49 @@ def prepare_test_executions_dict(args, cfg):
test_executions_dict[execution] = None
return test_executions_dict

def configure_test(arg_parser, args, cfg):
    """Copy the ``execute-test`` command line arguments into ``cfg``.

    Every value is stored with ``config.Scope.applicationOverride``. The test
    execution ID is announced on the console first and is stored both as the
    test execution ID and as the install ID.

    :param arg_parser: The argument parser, forwarded to the workload and connection helpers.
    :param args: The parsed command line arguments.
    :param cfg: The config object that receives the overrides.
    """
    # As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
    # in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
    # these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
    # to run the actual benchmark (i.e. generating load).

    def override(section, key, value):
        # All settings written here share the same scope.
        cfg.add(config.Scope.applicationOverride, section, key, value)

    print_test_execution_id(args)

    # system
    if args.effective_start_date:
        override("system", "time.start", args.effective_start_date)
    override("system", "test_execution.id", args.test_execution_id)
    # use the test_execution id implicitly also as the install id.
    override("system", "install.id", args.test_execution_id)

    # test_execution
    override("test_execution", "pipeline", args.pipeline)
    override("test_execution", "user.tag", args.user_tag)

    # worker_coordinator
    override("worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
    override("worker_coordinator", "assertions", args.enable_assertions)
    override("worker_coordinator", "on.error", args.on_error)
    coordinator_hosts = opts.csv_to_list(args.load_worker_coordinator_hosts)
    override("worker_coordinator", "load_worker_coordinator_hosts", coordinator_hosts)

    # workload
    override("workload", "test.mode.enabled", args.test_mode)
    override("workload", "latency.percentiles", args.latency_percentiles)
    override("workload", "throughput.percentiles", args.throughput_percentiles)
    override("workload", "randomization.enabled", args.randomization_enabled)
    override("workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
    override("workload", "randomization.n", args.randomization_n)
    override("workload", "randomization.alpha", args.randomization_alpha)

    configure_workload_params(arg_parser, args, cfg)
    configure_connection_params(arg_parser, args, cfg)
    configure_telemetry_params(args, cfg)
    configure_builder_params(args, cfg)

    # builder
    override("builder", "runtime.jdk", args.runtime_jdk)
    override("builder", "source.revision", args.revision)
    plugin_names = opts.csv_to_list(args.opensearch_plugins)
    override("builder", "provision_config_instance.plugins", plugin_names)
    override("builder", "plugin.params", opts.to_dict(args.plugin_params))
    override("builder", "preserve.install", convert.to_bool(args.preserve_install))
    override("builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

    configure_results_publishing_params(args, cfg)

def print_test_execution_id(args):
    """Write the test execution ID taken from ``args`` to the console as an info message."""
    execution_id = args.test_execution_id
    console.info(f"[Test Execution ID]: {execution_id}")

Expand Down Expand Up @@ -920,49 +981,32 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
builder.stop(cfg)
elif sub_command == "execute-test":
# As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
# to run the actual benchmark (i.e. generating load).
print_test_execution_id(args)
if args.effective_start_date:
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id)
# use the test_execution id implicitly also as the install id.
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id)
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
configure_builder_params(args, cfg)
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
cfg.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.plugins", opts.csv_to_list(
args.opensearch_plugins))
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

configure_results_publishing_params(args, cfg)

execute_test(cfg, args.kill_running_processes)
# Body of the "execute-test" branch: run the workload once or several times and
# optionally aggregate the results of the successful runs.
#
# Convert once up front: argparse may hand the value over as a string when the
# flag is given on the command line, so every comparison below uses the
# converted integer rather than the raw argument.
iterations = int(args.test_iterations)
if iterations > 1:
    # Parsed before the first run so an invalid value fails fast instead of
    # surfacing as a per-iteration error after a test has already executed.
    sleep_seconds = int(args.sleep_timer)
    test_exes = []
    for iteration in range(1, iterations + 1):
        try:
            configure_test(arg_parser, args, cfg)
            execute_test(cfg, args.kill_running_processes)
            # Record the run as soon as it has finished so a problem while
            # pausing cannot drop a completed execution from the aggregation.
            test_exes.append(args.test_execution_id)
            time.sleep(sleep_seconds)
        except Exception as e:
            console.error(f"Error occurred during test execution {iteration}: {str(e)}")
            if args.cancel_on_error:
                console.info("Cancelling remaining test executions.")
                break
        # Every further iteration gets a fresh ID, also after a failure, so a
        # later run never reuses the ID of the run that failed.
        args.test_execution_id = str(uuid.uuid4())

    if args.aggregate:
        if test_exes:
            args.test_executions = test_exes
            test_executions_dict = prepare_test_executions_dict(args, cfg)
            aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
            aggregator_instance.aggregate()
        else:
            # Nothing finished successfully, so there is nothing to aggregate.
            console.info("No test executions completed successfully; skipping aggregation.")
elif iterations == 1:
    configure_test(arg_parser, args, cfg)
    execute_test(cfg, args.kill_running_processes)
else:
    console.info("Please enter a valid number of test iterations")
elif sub_command == "create-workload":
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)
Expand Down

0 comments on commit 83040a9

Please sign in to comment.