
Revert publisher back to results_publisher #353

Merged: 2 commits, Aug 10, 2023
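This PR reverts the earlier terminology change: the `[reporting]` configuration section becomes `[results_publishing]` again, `osbenchmark/publisher.py` moves back to `osbenchmark/results_publisher.py`, `configure_reporting_params` becomes `configure_results_publishing_params`, and the workload operation parameter `include-in-reporting` becomes `include-in-results_publishing`.

For orientation, here is a minimal sketch of writing and reading the renamed section through the `Config` API used throughout the diff. The values are hypothetical and the bare `Config()` setup is an assumption; only the section and key names come from the hunks below.

```python
from osbenchmark import config

cfg = config.Config()

# Publishing knobs now live under "results_publishing", not "reporting":
cfg.add(config.Scope.applicationOverride, "results_publishing", "format", "markdown")

results_format = cfg.opts("results_publishing", "format")
print(results_format)  # -> "markdown"
```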
4 changes: 2 additions & 2 deletions README.md
@@ -110,7 +110,7 @@ After the test execution, a summary report is written to the command line:

Creating Your Own Workloads
---------------------------
-For more information on how users can create their own workloads, see [the Create Workload Guide](https://github.com/opensearch-project/opensearch-benchmark/blob/main/CREATE_WORKLOAD_GUIDE.md)
+For more information on how users can create their own workloads, see [the Create Workload Guide](./CREATE_WORKLOAD_GUIDE.md)

Getting help
------------
@@ -142,4 +142,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
-the License.
\ No newline at end of file
+the License.
2 changes: 1 addition & 1 deletion it/resources/benchmark-in-memory-it.ini
@@ -15,7 +15,7 @@ opensearch.src.subdir = opensearch
[benchmarks]
local.dataset.cache = ${CONFIG_DIR}/benchmarks/data

-[reporting]
+[results_publishing]
datastore.type = in-memory


2 changes: 1 addition & 1 deletion it/resources/benchmark-os-it.ini
@@ -15,7 +15,7 @@ opensearch.src.subdir = opensearch
[benchmarks]
local.dataset.cache = ${CONFIG_DIR}/benchmarks/data

-[reporting]
+[results_publishing]
datastore.type = opensearch
datastore.host = localhost
datastore.port = 10200
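The integration-test configs above show the shape of the renamed section. As a self-contained sanity check, the fragment below (values copied from the `benchmark-os-it.ini` hunk; the fragment itself is illustrative) parses cleanly with the standard library:

```python
import configparser

# Illustrative INI fragment mirroring the renamed section above.
BENCHMARK_INI = """
[results_publishing]
datastore.type = opensearch
datastore.host = localhost
datastore.port = 10200
"""

parser = configparser.ConfigParser()
parser.read_string(BENCHMARK_INI)

section = parser["results_publishing"]
print(section["datastore.type"], section["datastore.host"], section["datastore.port"])
```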
18 changes: 9 additions & 9 deletions osbenchmark/benchmark.py
@@ -35,7 +35,7 @@

from osbenchmark import PROGRAM_NAME, BANNER, FORUM_LINK, SKULL, check_python_version, doc_link, telemetry
from osbenchmark import version, actor, config, paths, \
-    test_execution_orchestrator, publisher, \
+    test_execution_orchestrator, results_publisher, \
    metrics, workload, chart_generator, exceptions, log
from osbenchmark.builder import provision_config, builder
from osbenchmark.workload_generator import workload_generator
@@ -815,11 +815,11 @@ def configure_connection_params(arg_parser, args, cfg):
        arg_parser.error("--target-hosts and --client-options must define the same keys for multi cluster setups.")


-def configure_reporting_params(args, cfg):
-    cfg.add(config.Scope.applicationOverride, "reporting", "format", args.results_format)
-    cfg.add(config.Scope.applicationOverride, "reporting", "values", args.show_in_results)
-    cfg.add(config.Scope.applicationOverride, "reporting", "output.path", args.results_file)
-    cfg.add(config.Scope.applicationOverride, "reporting", "numbers.align", args.results_numbers_align)
+def configure_results_publishing_params(args, cfg):
+    cfg.add(config.Scope.applicationOverride, "results_publishing", "format", args.results_format)
+    cfg.add(config.Scope.applicationOverride, "results_publishing", "values", args.show_in_results)
+    cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
+    cfg.add(config.Scope.applicationOverride, "results_publishing", "numbers.align", args.results_numbers_align)


def dispatch_sub_command(arg_parser, args, cfg):
@@ -830,8 +830,8 @@ def dispatch_sub_command(arg_parser, args, cfg):

    try:
        if sub_command == "compare":
-            configure_reporting_params(args, cfg)
-            publisher.compare(cfg, args.baseline, args.contender)
+            configure_results_publishing_params(args, cfg)
+            results_publisher.compare(cfg, args.baseline, args.contender)
        elif sub_command == "list":
            cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
            cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit)
@@ -903,7 +903,7 @@ def dispatch_sub_command(arg_parser, args, cfg):
            cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
            cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

-            configure_reporting_params(args, cfg)
+            configure_results_publishing_params(args, cfg)

            execute_test(cfg, args.kill_running_processes)
        elif sub_command == "generate":
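To see the renamed pieces end to end, here is a condensed, hypothetical driver for the `compare` path. The argparse setup is a stand-in for Benchmark's real parser, but `results_publisher.compare` and the `results_publishing` keys are used exactly as in the hunks above:

```python
import argparse

from osbenchmark import config, results_publisher


def main():
    # Stand-in parser; the real CLI defines many more options.
    parser = argparse.ArgumentParser(prog="opensearch-benchmark")
    parser.add_argument("--baseline", required=True)
    parser.add_argument("--contender", required=True)
    parser.add_argument("--results-format", default="markdown")
    args = parser.parse_args()

    cfg = config.Config()
    cfg.load_config()
    # Essence of configure_results_publishing_params(): publishing knobs
    # go into the "results_publishing" section.
    cfg.add(config.Scope.applicationOverride, "results_publishing", "format", args.results_format)
    cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", None)  # no results file (assumed: console output)
    results_publisher.compare(cfg, args.baseline, args.contender)


if __name__ == "__main__":
    main()
```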
2 changes: 1 addition & 1 deletion osbenchmark/config.py
@@ -114,7 +114,7 @@ def auto_load_local_config(base_config, additional_sections=None, config_file_cl
    cfg.load_config(auto_upgrade=True)
    # we override some of our configuration with the one from the coordinator because it may contain more entries and we should be
    # consistent across all nodes here.
-    cfg.add_all(base_config, "reporting")
+    cfg.add_all(base_config, "results_publishing")
    cfg.add_all(base_config, "workloads")
    cfg.add_all(base_config, "provision_configs")
    cfg.add_all(base_config, "distributions")
41 changes: 21 additions & 20 deletions osbenchmark/metrics.py
@@ -179,26 +179,26 @@ class OsClientFactory:

    def __init__(self, cfg):
        self._config = cfg
-        host = self._config.opts("reporting", "datastore.host")
-        port = self._config.opts("reporting", "datastore.port")
-        secure = convert.to_bool(self._config.opts("reporting", "datastore.secure"))
-        user = self._config.opts("reporting", "datastore.user")
+        host = self._config.opts("results_publishing", "datastore.host")
+        port = self._config.opts("results_publishing", "datastore.port")
+        secure = convert.to_bool(self._config.opts("results_publishing", "datastore.secure"))
+        user = self._config.opts("results_publishing", "datastore.user")

-        metrics_amazon_aws_log_in = self._config.opts("reporting", "datastore.amazon_aws_log_in",
+        metrics_amazon_aws_log_in = self._config.opts("results_publishing", "datastore.amazon_aws_log_in",
                                                       default_value=None, mandatory=False)
        metrics_aws_access_key_id = None
        metrics_aws_secret_access_key = None
        metrics_aws_region = None
        metrics_aws_service = None

        if metrics_amazon_aws_log_in == 'config':
-            metrics_aws_access_key_id = self._config.opts("reporting", "datastore.aws_access_key_id",
+            metrics_aws_access_key_id = self._config.opts("results_publishing", "datastore.aws_access_key_id",
                                                           default_value=None, mandatory=False)
-            metrics_aws_secret_access_key = self._config.opts("reporting", "datastore.aws_secret_access_key",
+            metrics_aws_secret_access_key = self._config.opts("results_publishing", "datastore.aws_secret_access_key",
                                                               default_value=None, mandatory=False)
-            metrics_aws_region = self._config.opts("reporting", "datastore.region",
+            metrics_aws_region = self._config.opts("results_publishing", "datastore.region",
                                                    default_value=None, mandatory=False)
-            metrics_aws_service = self._config.opts("reporting", "datastore.service",
+            metrics_aws_service = self._config.opts("results_publishing", "datastore.service",
                                                     default_value=None, mandatory=False)
        elif metrics_amazon_aws_log_in == 'environment':
            metrics_aws_access_key_id = os.getenv("OSB_DATASTORE_AWS_ACCESS_KEY_ID", default=None)
@@ -233,14 +233,14 @@ def __init__(self, cfg):
                password = os.environ["OSB_DATASTORE_PASSWORD"]
            except KeyError:
                try:
-                    password = self._config.opts("reporting", "datastore.password")
+                    password = self._config.opts("results_publishing", "datastore.password")
                except exceptions.ConfigError:
                    raise exceptions.ConfigError(
-                        "No password configured through [reporting] configuration or OSB_DATASTORE_PASSWORD environment variable."
+                        "No password configured through [results_publishing] configuration or OSB_DATASTORE_PASSWORD environment variable."
                    ) from None
-        verify = self._config.opts("reporting", "datastore.ssl.verification_mode", default_value="full", mandatory=False) != "none"
-        ca_path = self._config.opts("reporting", "datastore.ssl.certificate_authorities", default_value=None, mandatory=False)
-        self.probe_version = self._config.opts("reporting", "datastore.probe.cluster_version", default_value=True, mandatory=False)
+        verify = self._config.opts("results_publishing", "datastore.ssl.verification_mode", default_value="full", mandatory=False) != "none"
+        ca_path = self._config.opts("results_publishing", "datastore.ssl.certificate_authorities", default_value=None, mandatory=False)
+        self.probe_version = self._config.opts("results_publishing", "datastore.probe.cluster_version", default_value=True, mandatory=False)

        # Instead of duplicating code, we're just adapting the metrics store specific properties to match the regular client options.
        client_options = {
@@ -280,8 +280,9 @@ class IndexTemplateProvider:
    def __init__(self, cfg):
        self._config = cfg
        self.script_dir = self._config.opts("node", "benchmark.root")
-        self._number_of_shards = self._config.opts("reporting", "datastore.number_of_shards", default_value=None, mandatory=False)
-        self._number_of_replicas = self._config.opts("reporting", "datastore.number_of_replicas", default_value=None, mandatory=False)
+        self._number_of_shards = self._config.opts("results_publishing", "datastore.number_of_shards", default_value=None, mandatory=False)
+        self._number_of_replicas = self._config.opts("results_publishing", "datastore.number_of_replicas",
+                                                     default_value=None, mandatory=False)

    def metrics_template(self):
        return self._read("metrics-template")
@@ -357,7 +358,7 @@ def metrics_store(cfg, read_only=True, workload=None, test_procedure=None, provi


def metrics_store_class(cfg):
-    if cfg.opts("reporting", "datastore.type") == "opensearch":
+    if cfg.opts("results_publishing", "datastore.type") == "opensearch":
        return OsMetricsStore
    else:
        return InMemoryMetricsStore
@@ -1207,7 +1208,7 @@ def test_execution_store(cfg):
    :return: A test_execution store implementation.
    """
    logger = logging.getLogger(__name__)
-    if cfg.opts("reporting", "datastore.type") == "opensearch":
+    if cfg.opts("results_publishing", "datastore.type") == "opensearch":
        logger.info("Creating OS test execution store")
        return CompositeTestExecutionStore(EsTestExecutionStore(cfg), FileTestExecutionStore(cfg))
    else:
@@ -1222,7 +1223,7 @@ def results_store(cfg):
    :return: A test_execution store implementation.
    """
    logger = logging.getLogger(__name__)
-    if cfg.opts("reporting", "datastore.type") == "opensearch":
+    if cfg.opts("results_publishing", "datastore.type") == "opensearch":
        logger.info("Creating OS results store")
        return OsResultsStore(cfg)
    else:
@@ -1690,7 +1691,7 @@ def __call__(self):
            op_type = task.operation.type
            error_rate = self.error_rate(t, op_type)
            duration = self.duration(t)
-            if task.operation.include_in_reporting or error_rate > 0:
+            if task.operation.include_in_results_publishing or error_rate > 0:
                self.logger.debug("Gathering request metrics for [%s].", t)
                result.add_op_metrics(
                    t,
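One behavioral detail worth noting from the `metrics.py` hunks: the datastore password is resolved from the environment first and only then from the renamed config section. A minimal sketch of that lookup order, following the hunk above (not the full `OsClientFactory`):

```python
import os

from osbenchmark import exceptions


def resolve_datastore_password(cfg):
    """Environment variable wins; the [results_publishing] section is the fallback."""
    try:
        return os.environ["OSB_DATASTORE_PASSWORD"]
    except KeyError:
        try:
            return cfg.opts("results_publishing", "datastore.password")
        except exceptions.ConfigError:
            raise exceptions.ConfigError(
                "No password configured through [results_publishing] configuration "
                "or OSB_DATASTORE_PASSWORD environment variable."
            ) from None
```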
2 changes: 1 addition & 1 deletion osbenchmark/resources/benchmark.ini
@@ -15,7 +15,7 @@ opensearch.src.subdir = opensearch
[benchmarks]
local.dataset.cache = ${CONFIG_DIR}/benchmarks/data

-[reporting]
+[results_publishing]
datastore.type = in-memory
datastore.host =
datastore.port =
22 changes: 11 additions & 11 deletions osbenchmark/publisher.py → osbenchmark/results_publisher.py
@@ -99,14 +99,14 @@ def format_as_csv(headers, data):
class SummaryResultsPublisher:
    def __init__(self, results, config):
        self.results = results
-        self.results_file = config.opts("reporting", "output.path")
-        self.results_format = config.opts("reporting", "format")
-        self.numbers_align = config.opts("reporting", "numbers.align",
+        self.results_file = config.opts("results_publishing", "output.path")
+        self.results_format = config.opts("results_publishing", "format")
+        self.numbers_align = config.opts("results_publishing", "numbers.align",
                                          mandatory=False, default_value="right")
-        reporting_values = config.opts("reporting", "values")
-        self.publish_all_values = reporting_values == "all"
-        self.publish_all_percentile_values = reporting_values == "all-percentiles"
-        self.show_processing_time = convert.to_bool(config.opts("reporting", "output.processingtime",
+        results_publishing_values = config.opts("results_publishing", "values")
+        self.publish_all_values = results_publishing_values == "all"
+        self.publish_all_percentile_values = results_publishing_values == "all-percentiles"
+        self.show_processing_time = convert.to_bool(config.opts("results_publishing", "output.processingtime",
                                                    mandatory=False, default_value=False))
        self.cwd = config.opts("node", "benchmark.cwd")

@@ -317,12 +317,12 @@ def _line(self, k, task, v, unit, converter=lambda x: x, force=False):

class ComparisonResultsPublisher:
    def __init__(self, config):
-        self.results_file = config.opts("reporting", "output.path")
-        self.results_format = config.opts("reporting", "format")
-        self.numbers_align = config.opts("reporting", "numbers.align",
+        self.results_file = config.opts("results_publishing", "output.path")
+        self.results_format = config.opts("results_publishing", "format")
+        self.numbers_align = config.opts("results_publishing", "numbers.align",
                                          mandatory=False, default_value="right")
        self.cwd = config.opts("node", "benchmark.cwd")
-        self.show_processing_time = convert.to_bool(config.opts("reporting", "output.processingtime",
+        self.show_processing_time = convert.to_bool(config.opts("results_publishing", "output.processingtime",
                                                     mandatory=False, default_value=False))
        self.plain = False

4 changes: 2 additions & 2 deletions osbenchmark/test_execution_orchestrator.py
@@ -32,7 +32,7 @@

from osbenchmark import actor, config, doc_link, \
    worker_coordinator, exceptions, builder, metrics, \
-    publisher, workload, version, PROGRAM_NAME
+    results_publisher, workload, version, PROGRAM_NAME
from osbenchmark.utils import console, opts, versions


@@ -250,7 +250,7 @@ def on_benchmark_complete(self, new_metrics):
            self.test_execution.add_results(final_results)
            self.test_execution_store.store_test_execution(self.test_execution)
            metrics.results_store(self.cfg).store_results(self.test_execution)
-            publisher.summarize(final_results, self.cfg)
+            results_publisher.summarize(final_results, self.cfg)
        else:
            self.logger.info("Suppressing output of summary results. Cancelled = [%r], Error = [%r].", self.cancelled, self.error)
        self.metrics_store.close()
12 changes: 6 additions & 6 deletions osbenchmark/worker_coordinator/worker_coordinator.py
@@ -533,7 +533,7 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor
        # which client ids are assigned to which workers?
        self.clients_per_worker = {}

-        self.progress_publisher = console.progress()
+        self.progress_results_publisher = console.progress()
        self.progress_counter = 0
        self.quiet = False
        self.allocations = None
@@ -608,7 +608,7 @@ def prepare_benchmark(self, t):
        self.test_procedure = select_test_procedure(self.config, self.workload)
        self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
        downsample_factor = int(self.config.opts(
-            "reporting", "metrics.request.downsample.factor",
+            "results_publishing", "metrics.request.downsample.factor",
            mandatory=False, default_value=1))
        self.metrics_store = metrics.metrics_store(cfg=self.config,
                                                   workload=self.workload.name,
@@ -792,7 +792,7 @@ def finished(self):
        return self.current_step == self.number_of_steps

    def close(self):
-        self.progress_publisher.finish()
+        self.progress_results_publisher.finish()
        if self.metrics_store and self.metrics_store.opened:
            self.metrics_store.close()

@@ -818,9 +818,9 @@ def update_progress_message(self, task_finished=False):

        num_clients = max(len(progress_per_client), 1)
        total_progress = sum(progress_per_client) / num_clients
-        self.progress_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100)))
+        self.progress_results_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100)))
        if task_finished:
-            self.progress_publisher.finish()
+            self.progress_results_publisher.finish()

    def post_process_samples(self):
        # we do *not* do this here to avoid concurrent updates (actors are single-threaded) but rather to make it clear that we use
@@ -1032,7 +1032,7 @@ def receiveMsg_StartWorker(self, msg, sender):
        self.worker_id = msg.worker_id
        self.config = load_local_config(msg.config)
        self.on_error = self.config.opts("worker_coordinator", "on.error")
-        self.sample_queue_size = int(self.config.opts("reporting", "sample.queue.size", mandatory=False, default_value=1 << 20))
+        self.sample_queue_size = int(self.config.opts("results_publishing", "sample.queue.size", mandatory=False, default_value=1 << 20))
        self.workload = msg.workload
        workload.set_absolute_data_path(self.config, self.workload)
        self.client_allocations = msg.client_allocations
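Two easy-to-miss keys also move in this file: the request-metrics downsample factor and the per-worker sample queue size. A sketch of overriding both; the key names and defaults come from the hunks above, while the `Config()` setup and the idea of overriding them this way are assumptions:

```python
from osbenchmark import config

cfg = config.Config()
# Both keys now live under "results_publishing"; values below are the defaults shown in the diff.
cfg.add(config.Scope.applicationOverride, "results_publishing",
        "metrics.request.downsample.factor", 1)  # diff shows default_value=1
cfg.add(config.Scope.applicationOverride, "results_publishing",
        "sample.queue.size", 1 << 20)            # diff shows default_value=1 << 20
```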
6 changes: 3 additions & 3 deletions osbenchmark/workload/loader.py
@@ -1365,7 +1365,7 @@ def _create_test_procedures(self, workload_spec):
                task = self.parse_task(op, ops, name)
                schedule.append(task)

-            # verify we don't have any duplicate task names (which can be confusing / misleading in reporting).
+            # verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing).
            known_task_names = set()
            for task in schedule:
                for sub_task in task:
@@ -1523,8 +1523,8 @@ def parse_operation(self, op_spec, error_ctx="operations"):

        try:
            op = workload.OperationType.from_hyphenated_string(op_type_name)
-            if "include-in-reporting" not in params:
-                params["include-in-reporting"] = not op.admin_op
+            if "include-in-results_publishing" not in params:
+                params["include-in-results_publishing"] = not op.admin_op
            self.logger.debug("Using built-in operation type [%s] for operation [%s].", op_type_name, op_name)
        except KeyError:
            self.logger.info("Using user-provided operation type [%s] for operation [%s].", op_type_name, op_name)
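Finally, the hyphenated workload parameter changes spelling too. Per the hunk above, the default is `not op.admin_op`, so admin operations are excluded from published results unless overridden. A hypothetical operation spec that sets the renamed key explicitly:

```python
# Hypothetical operation spec as parse_operation() would receive it;
# only the renamed key is the point here.
op_spec = {
    "name": "check-cluster-health",
    "operation-type": "cluster-health",
    "include-in-results_publishing": True,  # override the admin-op default
}
```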