Update results_publisher to publisher (opensearch-project#316)
Signed-off-by: Ian Hoang <hoangia@amazon.com>
IanHoang authored Jun 12, 2023
1 parent 93d8a8a commit 40ed461
Showing 19 changed files with 99 additions and 99 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -110,7 +110,7 @@ After the test execution, a summary report is written to the command line:

 Creating Your Own Workloads
 ---------------------------
-For more information on how users can create their own workloads, see [the Create Workload Guide](./CREATE_WORKLOAD_GUIDE.md)
+For more information on how users can create their own workloads, see [the Create Workload Guide](https://github.com/opensearch-project/opensearch-benchmark/blob/main/CREATE_WORKLOAD_GUIDE.md)

 Getting help
 ------------
@@ -142,4 +142,4 @@ Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 License for the specific language governing permissions and limitations under
-the License.
\ No newline at end of file
+the License.
2 changes: 1 addition & 1 deletion it/resources/benchmark-in-memory-it.ini
@@ -15,7 +15,7 @@ opensearch.src.subdir = opensearch

 [benchmarks]
 local.dataset.cache = ${CONFIG_DIR}/benchmarks/data

-[results_publishing]
+[reporting]
 datastore.type = in-memory

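Note that the section rename means an existing `benchmark.ini` that still says `[results_publishing]` will no longer match the new lookup key. A minimal migration sketch (a hypothetical helper, not part of this commit) using Python's `configparser`, with interpolation disabled so placeholder values such as `${CONFIG_DIR}` pass through verbatim:

```python
import configparser

def rename_reporting_section(path):
    # Interpolation is disabled so values like ${CONFIG_DIR}/benchmarks/data
    # are read and written back untouched.
    config = configparser.ConfigParser(interpolation=None)
    config.read(path)
    if config.has_section("results_publishing") and not config.has_section("reporting"):
        config.add_section("reporting")
        for key, value in config.items("results_publishing"):
            config.set("reporting", key, value)
        config.remove_section("results_publishing")
        with open(path, "w") as f:
            config.write(f)
```
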
2 changes: 1 addition & 1 deletion it/resources/benchmark-os-it.ini
@@ -15,7 +15,7 @@ opensearch.src.subdir = opensearch

 [benchmarks]
 local.dataset.cache = ${CONFIG_DIR}/benchmarks/data

-[results_publishing]
+[reporting]
 datastore.type = opensearch
 datastore.host = localhost
 datastore.port = 10200
18 changes: 9 additions & 9 deletions osbenchmark/benchmark.py
@@ -35,7 +35,7 @@

 from osbenchmark import PROGRAM_NAME, BANNER, FORUM_LINK, SKULL, check_python_version, doc_link, telemetry
 from osbenchmark import version, actor, config, paths, \
-    test_execution_orchestrator, results_publisher, \
+    test_execution_orchestrator, publisher, \
     metrics, workload, chart_generator, exceptions, log
 from osbenchmark.builder import provision_config, builder
 from osbenchmark.workload_generator import workload_generator
@@ -815,11 +815,11 @@ def configure_connection_params(arg_parser, args, cfg):
         arg_parser.error("--target-hosts and --client-options must define the same keys for multi cluster setups.")


-def configure_results_publishing_params(args, cfg):
-    cfg.add(config.Scope.applicationOverride, "results_publishing", "format", args.results_format)
-    cfg.add(config.Scope.applicationOverride, "results_publishing", "values", args.show_in_results)
-    cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
-    cfg.add(config.Scope.applicationOverride, "results_publishing", "numbers.align", args.results_numbers_align)
+def configure_reporting_params(args, cfg):
+    cfg.add(config.Scope.applicationOverride, "reporting", "format", args.results_format)
+    cfg.add(config.Scope.applicationOverride, "reporting", "values", args.show_in_results)
+    cfg.add(config.Scope.applicationOverride, "reporting", "output.path", args.results_file)
+    cfg.add(config.Scope.applicationOverride, "reporting", "numbers.align", args.results_numbers_align)


 def dispatch_sub_command(arg_parser, args, cfg):
@@ -830,8 +830,8 @@ def dispatch_sub_command(arg_parser, args, cfg):

     try:
         if sub_command == "compare":
-            configure_results_publishing_params(args, cfg)
-            results_publisher.compare(cfg, args.baseline, args.contender)
+            configure_reporting_params(args, cfg)
+            publisher.compare(cfg, args.baseline, args.contender)
         elif sub_command == "list":
             cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
             cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit)
@@ -903,7 +903,7 @@ def dispatch_sub_command(arg_parser, args, cfg):
             cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
             cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

-            configure_results_publishing_params(args, cfg)
+            configure_reporting_params(args, cfg)

             execute_test(cfg, args.kill_running_processes)
         elif sub_command == "generate":
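`configure_reporting_params` only copies parsed CLI arguments into the `reporting` section of the configuration; the publishers read them back later via `cfg.opts(...)`. A rough sketch of that round trip with a stand-in config object (the real `osbenchmark.config.Config` also tracks scopes such as `applicationOverride`, which this sketch ignores):

```python
class MiniConfig:
    """Stand-in for osbenchmark.config.Config: a flat (section, key) -> value map."""

    def __init__(self):
        self._opts = {}

    def add(self, scope, section, key, value):
        # the real implementation records the scope too; ignored here
        self._opts[(section, key)] = value

    def opts(self, section, key, default_value=None, mandatory=True):
        return self._opts.get((section, key), default_value)


cfg = MiniConfig()
# what configure_reporting_params would do for `--results-format=csv`
cfg.add(None, "reporting", "format", "csv")
assert cfg.opts("reporting", "format") == "csv"
```
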
2 changes: 1 addition & 1 deletion osbenchmark/config.py
@@ -114,7 +114,7 @@ def auto_load_local_config(base_config, additional_sections=None, config_file_cl
     cfg.load_config(auto_upgrade=True)
     # we override our some configuration with the one from the coordinator because it may contain more entries and we should be
     # consistent across all nodes here.
-    cfg.add_all(base_config, "results_publishing")
+    cfg.add_all(base_config, "reporting")
     cfg.add_all(base_config, "workloads")
     cfg.add_all(base_config, "provision_configs")
     cfg.add_all(base_config, "distributions")
26 changes: 13 additions & 13 deletions osbenchmark/metrics.py
@@ -179,22 +179,22 @@ class OsClientFactory:

     def __init__(self, cfg):
         self._config = cfg
-        host = self._config.opts("results_publishing", "datastore.host")
-        port = self._config.opts("results_publishing", "datastore.port")
-        secure = convert.to_bool(self._config.opts("results_publishing", "datastore.secure"))
-        user = self._config.opts("results_publishing", "datastore.user")
+        host = self._config.opts("reporting", "datastore.host")
+        port = self._config.opts("reporting", "datastore.port")
+        secure = convert.to_bool(self._config.opts("reporting", "datastore.secure"))
+        user = self._config.opts("reporting", "datastore.user")
         try:
             password = os.environ["OSB_DATASTORE_PASSWORD"]
         except KeyError:
             try:
-                password = self._config.opts("results_publishing", "datastore.password")
+                password = self._config.opts("reporting", "datastore.password")
             except exceptions.ConfigError:
                 raise exceptions.ConfigError(
-                    "No password configured through [results_publishing] configuration or OSB_DATASTORE_PASSWORD environment variable."
+                    "No password configured through [reporting] configuration or OSB_DATASTORE_PASSWORD environment variable."
                 ) from None
-        verify = self._config.opts("results_publishing", "datastore.ssl.verification_mode", default_value="full", mandatory=False) != "none"
-        ca_path = self._config.opts("results_publishing", "datastore.ssl.certificate_authorities", default_value=None, mandatory=False)
-        self.probe_version = self._config.opts("results_publishing", "datastore.probe.cluster_version", default_value=True, mandatory=False)
+        verify = self._config.opts("reporting", "datastore.ssl.verification_mode", default_value="full", mandatory=False) != "none"
+        ca_path = self._config.opts("reporting", "datastore.ssl.certificate_authorities", default_value=None, mandatory=False)
+        self.probe_version = self._config.opts("reporting", "datastore.probe.cluster_version", default_value=True, mandatory=False)

         # Instead of duplicating code, we're just adapting the metrics store specific properties to match the regular client options.
         client_options = {
@@ -290,7 +290,7 @@ def metrics_store(cfg, read_only=True, workload=None, test_procedure=None, provi


 def metrics_store_class(cfg):
-    if cfg.opts("results_publishing", "datastore.type") == "opensearch":
+    if cfg.opts("reporting", "datastore.type") == "opensearch":
         return OsMetricsStore
     else:
         return InMemoryMetricsStore
@@ -1140,7 +1140,7 @@ def test_execution_store(cfg):
     :return: A test_execution store implementation.
     """
     logger = logging.getLogger(__name__)
-    if cfg.opts("results_publishing", "datastore.type") == "opensearch":
+    if cfg.opts("reporting", "datastore.type") == "opensearch":
         logger.info("Creating OS test execution store")
         return CompositeTestExecutionStore(EsTestExecutionStore(cfg), FileTestExecutionStore(cfg))
     else:
@@ -1155,7 +1155,7 @@ def results_store(cfg):
     :return: A test_execution store implementation.
     """
     logger = logging.getLogger(__name__)
-    if cfg.opts("results_publishing", "datastore.type") == "opensearch":
+    if cfg.opts("reporting", "datastore.type") == "opensearch":
         logger.info("Creating OS results store")
         return OsResultsStore(cfg)
     else:
@@ -1623,7 +1623,7 @@ def __call__(self):
             op_type = task.operation.type
             error_rate = self.error_rate(t, op_type)
             duration = self.duration(t)
-            if task.operation.include_in_results_publishing or error_rate > 0:
+            if task.operation.include_in_reporting or error_rate > 0:
                 self.logger.debug("Gathering request metrics for [%s].", t)
                 result.add_op_metrics(
                     t,
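The credential lookup in `OsClientFactory` gives the `OSB_DATASTORE_PASSWORD` environment variable precedence over the `[reporting]` section. Condensed into a standalone sketch (using `ValueError` in place of `exceptions.ConfigError`, and assuming the same `cfg.opts` interface as above):

```python
import os

def datastore_password(cfg):
    # Environment variable wins; the config file is only a fallback.
    password = os.environ.get("OSB_DATASTORE_PASSWORD")
    if password is not None:
        return password
    password = cfg.opts("reporting", "datastore.password", default_value=None, mandatory=False)
    if password is None:
        raise ValueError(
            "No password configured through [reporting] configuration "
            "or OSB_DATASTORE_PASSWORD environment variable."
        )
    return password
```
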
22 changes: 11 additions & 11 deletions osbenchmark/results_publisher.py → osbenchmark/publisher.py
@@ -99,14 +99,14 @@ def format_as_csv(headers, data):

 class SummaryResultsPublisher:
     def __init__(self, results, config):
         self.results = results
-        self.results_file = config.opts("results_publishing", "output.path")
-        self.results_format = config.opts("results_publishing", "format")
-        self.numbers_align = config.opts("results_publishing", "numbers.align",
+        self.results_file = config.opts("reporting", "output.path")
+        self.results_format = config.opts("reporting", "format")
+        self.numbers_align = config.opts("reporting", "numbers.align",
                                          mandatory=False, default_value="right")
-        results_publishing_values = config.opts("results_publishing", "values")
-        self.publish_all_values = results_publishing_values == "all"
-        self.publish_all_percentile_values = results_publishing_values == "all-percentiles"
-        self.show_processing_time = convert.to_bool(config.opts("results_publishing", "output.processingtime",
+        reporting_values = config.opts("reporting", "values")
+        self.publish_all_values = reporting_values == "all"
+        self.publish_all_percentile_values = reporting_values == "all-percentiles"
+        self.show_processing_time = convert.to_bool(config.opts("reporting", "output.processingtime",
                                                     mandatory=False, default_value=False))
         self.cwd = config.opts("node", "benchmark.cwd")
@@ -317,12 +317,12 @@ def _line(self, k, task, v, unit, converter=lambda x: x, force=False):

 class ComparisonResultsPublisher:
     def __init__(self, config):
-        self.results_file = config.opts("results_publishing", "output.path")
-        self.results_format = config.opts("results_publishing", "format")
-        self.numbers_align = config.opts("results_publishing", "numbers.align",
+        self.results_file = config.opts("reporting", "output.path")
+        self.results_format = config.opts("reporting", "format")
+        self.numbers_align = config.opts("reporting", "numbers.align",
                                          mandatory=False, default_value="right")
         self.cwd = config.opts("node", "benchmark.cwd")
-        self.show_processing_time = convert.to_bool(config.opts("results_publishing", "output.processingtime",
+        self.show_processing_time = convert.to_bool(config.opts("reporting", "output.processingtime",
                                                     mandatory=False, default_value=False))
         self.plain = False

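As the diff above shows, `reporting → values` is effectively a three-way mode switch for the summary publisher. Pulled out as a sketch:

```python
def summary_mode_flags(reporting_values):
    # "all" publishes every recorded value, "all-percentiles" expands only the
    # percentile list; anything else keeps the default summary.
    return {
        "publish_all_values": reporting_values == "all",
        "publish_all_percentile_values": reporting_values == "all-percentiles",
    }

assert summary_mode_flags("all") == {
    "publish_all_values": True,
    "publish_all_percentile_values": False,
}
```
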
2 changes: 1 addition & 1 deletion osbenchmark/resources/benchmark.ini
@@ -15,7 +15,7 @@ opensearch.src.subdir = opensearch

 [benchmarks]
 local.dataset.cache = ${CONFIG_DIR}/benchmarks/data

-[results_publishing]
+[reporting]
 datastore.type = in-memory
 datastore.host =
 datastore.port =
4 changes: 2 additions & 2 deletions osbenchmark/test_execution_orchestrator.py
@@ -32,7 +32,7 @@

 from osbenchmark import actor, config, doc_link, \
     worker_coordinator, exceptions, builder, metrics, \
-    results_publisher, workload, version, PROGRAM_NAME
+    publisher, workload, version, PROGRAM_NAME
 from osbenchmark.utils import console, opts, versions


@@ -250,7 +250,7 @@ def on_benchmark_complete(self, new_metrics):
             self.test_execution.add_results(final_results)
             self.test_execution_store.store_test_execution(self.test_execution)
             metrics.results_store(self.cfg).store_results(self.test_execution)
-            results_publisher.summarize(final_results, self.cfg)
+            publisher.summarize(final_results, self.cfg)
         else:
             self.logger.info("Suppressing output of summary results. Cancelled = [%r], Error = [%r].", self.cancelled, self.error)
         self.metrics_store.close()
12 changes: 6 additions & 6 deletions osbenchmark/worker_coordinator/worker_coordinator.py
@@ -533,7 +533,7 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor
         # which client ids are assigned to which workers?
         self.clients_per_worker = {}

-        self.progress_results_publisher = console.progress()
+        self.progress_publisher = console.progress()
         self.progress_counter = 0
         self.quiet = False
         self.allocations = None
@@ -607,7 +607,7 @@ def prepare_benchmark(self, t):
         self.test_procedure = select_test_procedure(self.config, self.workload)
         self.quiet = self.config.opts("system", "quiet.mode", mandatory=False, default_value=False)
         downsample_factor = int(self.config.opts(
-            "results_publishing", "metrics.request.downsample.factor",
+            "reporting", "metrics.request.downsample.factor",
             mandatory=False, default_value=1))
         self.metrics_store = metrics.metrics_store(cfg=self.config,
                                                    workload=self.workload.name,
@@ -791,7 +791,7 @@ def finished(self):
         return self.current_step == self.number_of_steps

     def close(self):
-        self.progress_results_publisher.finish()
+        self.progress_publisher.finish()
         if self.metrics_store and self.metrics_store.opened:
             self.metrics_store.close()

@@ -817,9 +817,9 @@ def update_progress_message(self, task_finished=False):

         num_clients = max(len(progress_per_client), 1)
         total_progress = sum(progress_per_client) / num_clients
-        self.progress_results_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100)))
+        self.progress_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100)))
         if task_finished:
-            self.progress_results_publisher.finish()
+            self.progress_publisher.finish()

     def post_process_samples(self):
         # we do *not* do this here to avoid concurrent updates (actors are single-threaded) but rather to make it clear that we use
@@ -1031,7 +1031,7 @@ def receiveMsg_StartWorker(self, msg, sender):
         self.worker_id = msg.worker_id
         self.config = load_local_config(msg.config)
         self.on_error = self.config.opts("worker_coordinator", "on.error")
-        self.sample_queue_size = int(self.config.opts("results_publishing", "sample.queue.size", mandatory=False, default_value=1 << 20))
+        self.sample_queue_size = int(self.config.opts("reporting", "sample.queue.size", mandatory=False, default_value=1 << 20))
         self.workload = msg.workload
         workload.set_absolute_data_path(self.config, self.workload)
         self.client_allocations = msg.client_allocations
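`reporting → metrics.request.downsample.factor` (default 1) thins out request-level samples before they reach the metrics store; a factor of n keeps roughly every n-th sample. A toy illustration of the effect, not the coordinator's actual sampling code:

```python
def downsample(samples, factor=1):
    # factor=1 keeps every sample; factor=n keeps every n-th one
    return samples[::factor]

assert downsample(list(range(10)), factor=2) == [0, 2, 4, 6, 8]
```
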
6 changes: 3 additions & 3 deletions osbenchmark/workload/loader.py
@@ -1365,7 +1365,7 @@ def _create_test_procedures(self, workload_spec):
                 task = self.parse_task(op, ops, name)
                 schedule.append(task)

-        # verify we don't have any duplicate task names (which can be confusing / misleading in results_publishing).
+        # verify we don't have any duplicate task names (which can be confusing / misleading in reporting).
         known_task_names = set()
         for task in schedule:
             for sub_task in task:
@@ -1523,8 +1523,8 @@ def parse_operation(self, op_spec, error_ctx="operations"):

         try:
             op = workload.OperationType.from_hyphenated_string(op_type_name)
-            if "include-in-results_publishing" not in params:
-                params["include-in-results_publishing"] = not op.admin_op
+            if "include-in-reporting" not in params:
+                params["include-in-reporting"] = not op.admin_op
             self.logger.debug("Using built-in operation type [%s] for operation [%s].", op_type_name, op_name)
         except KeyError:
             self.logger.info("Using user-provided operation type [%s] for operation [%s].", op_type_name, op_name)
8 changes: 4 additions & 4 deletions osbenchmark/workload/workload.py
@@ -202,11 +202,11 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas
         :param includes_action_and_meta_data: True, if the source file already includes the action and meta-data line. False, if it only
                contains documents.
         :param number_of_documents: The number of documents
-               in the benchmark document. Needed for proper progress results_publishing. Only needed if
+               in the benchmark document. Needed for proper progress reporting. Only needed if
               a document_archive is given.
         :param compressed_size_in_bytes: The compressed size in bytes of
                the benchmark document. Needed for verification of the download and
-               user results_publishing. Only useful if a document_archive is given (optional but recommended to be set).
+               user reporting. Only useful if a document_archive is given (optional but recommended to be set).
         :param uncompressed_size_in_bytes: The size in bytes of the benchmark document after decompressing it.
                Only useful if a document_archive is given (optional but recommended to be set).
         :param target_index: The index to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data`` is ``False``.
@@ -976,8 +976,8 @@ def __init__(self, name, operation_type, meta_data=None, params=None, param_sour
         self.param_source = param_source

     @property
-    def include_in_results_publishing(self):
-        return self.params.get("include-in-results_publishing", True)
+    def include_in_reporting(self):
+        return self.params.get("include-in-reporting", True)

     def __hash__(self):
         return hash(self.name)
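With the property renamed, operations opt out of the summary report through an `include-in-reporting` parameter: the loader change above defaults it to `not op.admin_op` for built-in operation types, and `Operation.include_in_reporting` falls back to `True` when the key is absent. A small illustration of that lookup:

```python
# mirrors Operation.include_in_reporting: a missing key defaults to True
params = {"include-in-reporting": False}
assert params.get("include-in-reporting", True) is False
assert {}.get("include-in-reporting", True) is True
```
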
2 changes: 1 addition & 1 deletion samples/ccr/start.sh
@@ -122,7 +122,7 @@ opensearch.src.subdir = opensearch

 [benchmarks]
 local.dataset.cache = ${HOME}/.benchmark/benchmarks/data
-[results_publishing]
+[reporting]
 datastore.type = opensearch
 datastore.host = 127.0.0.1
 datastore.port = 9209