From 029c4edfe06b123670b9e0343d03500698498af8 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 13 Mar 2023 15:19:18 +0000 Subject: [PATCH 1/6] WIP on multi-node DB setup --- pyproject.toml | 2 +- redisbench_admin/run/cluster.py | 97 +++++++++++++++-------- redisbench_admin/run_async/terraform.py | 4 +- redisbench_admin/run_remote/remote_db.py | 36 +++++---- redisbench_admin/run_remote/standalone.py | 52 ++++++------ redisbench_admin/utils/remote.py | 10 ++- tests/test_remote.py | 75 +++++++++++++++++- 7 files changed, 198 insertions(+), 78 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 319278d..eb7d557 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "redisbench-admin" -version = "0.10.4" +version = "0.10.5" description = "Redis benchmark run helper. A wrapper around Redis and Redis Modules benchmark tools ( ftsb_redisearch, memtier_benchmark, redis-benchmark, aibench, etc... )." authors = ["filipecosta90 ","Redis Performance Group "] readme = "README.md" diff --git a/redisbench_admin/run/cluster.py b/redisbench_admin/run/cluster.py index 270cd1a..5e0eda7 100644 --- a/redisbench_admin/run/cluster.py +++ b/redisbench_admin/run/cluster.py @@ -101,8 +101,8 @@ def generate_startup_nodes_array(redis_conns): # noinspection PyBroadException def spin_up_redis_cluster_remote_redis( - server_public_ip, - server_private_ip, + server_public_ips, + server_private_ips, username, private_key, remote_module_files, @@ -116,42 +116,75 @@ def spin_up_redis_cluster_remote_redis( redis_7=True, ): logging.info("Generating the remote redis-server command arguments") + redis_process_commands = [] logfiles = [] logname_prefix = logname[: len(logname) - 4] + "-" - for master_shard_id in range(1, shard_count + 1): - shard_port = master_shard_id + start_port - 1 - - command, logfile = generate_cluster_redis_server_args( - "redis-server", - dbdir_folder, - remote_module_files, - server_private_ip, - shard_port, - redis_configuration_parameters, - "yes", - modules_configuration_parameters_map, - logname_prefix, - "yes", - redis_7, - ) - logging.error( - "Remote primary shard {} command: {}".format( - master_shard_id, " ".join(command) + ( + primaries_per_node, + server_private_ips, + server_public_ips, + ) = split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count) + shard_start = 1 + for node_n, primaries_this_node in enumerate(primaries_per_node, start=0): + server_private_ip = server_private_ips[node_n] + server_public_ip = server_public_ips[node_n] + for master_shard_id in range( + shard_start, shard_start + primaries_this_node + 1 + ): + shard_port = master_shard_id + start_port - 1 + + command, logfile = generate_cluster_redis_server_args( + "redis-server", + dbdir_folder, + remote_module_files, + server_private_ip, + shard_port, + redis_configuration_parameters, + "yes", + modules_configuration_parameters_map, + logname_prefix, + "yes", + redis_7, ) - ) - logfiles.append(logfile) - redis_process_commands.append(" ".join(command)) - res = execute_remote_commands( - server_public_ip, username, private_key, redis_process_commands, ssh_port - ) - for pos, res_pos in enumerate(res): - [recv_exit_status, stdout, stderr] = res_pos - if recv_exit_status != 0: logging.error( - "Remote primary shard {} command returned exit code {}. stdout {}. 
stderr {}".format( - pos, recv_exit_status, stdout, stderr + "Remote primary shard {} command: {}".format( + master_shard_id, " ".join(command) ) ) + logfiles.append(logfile) + redis_process_commands.append(" ".join(command)) + res = execute_remote_commands( + server_public_ip, username, private_key, redis_process_commands, ssh_port + ) + for pos, res_pos in enumerate(res): + [recv_exit_status, stdout, stderr] = res_pos + if recv_exit_status != 0: + logging.error( + "Remote primary shard {} command returned exit code {}. stdout {}. stderr {}".format( + pos, recv_exit_status, stdout, stderr + ) + ) + shard_start = shard_start + primaries_this_node return logfiles + + +def split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count): + if type(server_public_ips) is str: + server_public_ips = [server_public_ips] + if type(server_private_ips) is str: + server_private_ips = [server_private_ips] + db_node_count = len(server_public_ips) + primaries_per_db_node = db_node_count // shard_count + remainder_first_node = db_node_count % shard_count + first_node_primaries = primaries_per_db_node + remainder_first_node + logging.info("DB node {} will have {} primaries".format(1, first_node_primaries)) + primaries_per_node = [first_node_primaries] + for node_n, node_id in enumerate(range(2, db_node_count + 1), start=2): + logging.info("Setting") + logging.info( + "DB node {} will have {} primaries".format(node_n, primaries_per_db_node) + ) + primaries_per_node.append(primaries_per_db_node) + return primaries_per_node, server_private_ips, server_public_ips diff --git a/redisbench_admin/run_async/terraform.py b/redisbench_admin/run_async/terraform.py index d7c9d06..3a9cbbb 100644 --- a/redisbench_admin/run_async/terraform.py +++ b/redisbench_admin/run_async/terraform.py @@ -172,6 +172,7 @@ def setup_remote_environment( ) _, _, _ = tf.refresh() tf_output = tf.output() + logging.error("TF OUTPUT setup_remote_environment: {}".format(tf_output)) server_private_ip = tf_output_or_none(tf_output, "runner_private_ip") server_public_ip = tf_output_or_none(tf_output, "runner_public_ip") if server_private_ip is not None or server_public_ip is not None: @@ -269,6 +270,7 @@ def terraform_spin_or_reuse_env( else: logging.info("Reusing remote setup {}".format(remote_id)) tf = remote_envs[remote_id] + tf_output = tf.output() ( tf_return_code, username, @@ -277,7 +279,7 @@ def terraform_spin_or_reuse_env( server_plaintext_port, client_private_ip, client_public_ip, - ) = retrieve_tf_connection_vars(None, tf) + ) = retrieve_tf_connection_vars(None, tf_output) return ( client_public_ip, deployment_type, diff --git a/redisbench_admin/run_remote/remote_db.py b/redisbench_admin/run_remote/remote_db.py index 4c1ef8d..2ee2494 100644 --- a/redisbench_admin/run_remote/remote_db.py +++ b/redisbench_admin/run_remote/remote_db.py @@ -40,23 +40,27 @@ def remote_tmpdir_prune( - server_public_ip, ssh_port, temporary_dir, username, private_key + server_public_ips, ssh_port, temporary_dir, username, private_key ): - execute_remote_commands( - server_public_ip, - username, - private_key, - [ - "mkdir -p {}".format(temporary_dir), - "rm -rf {}/*.log".format(temporary_dir), - "rm -rf {}/*.config".format(temporary_dir), - "rm -rf {}/*.rdb".format(temporary_dir), - "rm -rf {}/*.out".format(temporary_dir), - "rm -rf {}/*.data".format(temporary_dir), - "pkill -9 redis-server", - ], - ssh_port, - ) + if type(server_public_ips) is str: + server_public_ips = [server_public_ips] + + for server_public_ip in server_public_ips: + 
execute_remote_commands( + server_public_ip, + username, + private_key, + [ + "mkdir -p {}".format(temporary_dir), + "rm -rf {}/*.log".format(temporary_dir), + "rm -rf {}/*.config".format(temporary_dir), + "rm -rf {}/*.rdb".format(temporary_dir), + "rm -rf {}/*.out".format(temporary_dir), + "rm -rf {}/*.data".format(temporary_dir), + "pkill -9 redis-server", + ], + ssh_port, + ) def is_single_endpoint(setup_type): diff --git a/redisbench_admin/run_remote/standalone.py b/redisbench_admin/run_remote/standalone.py index 9b24e49..58cf577 100644 --- a/redisbench_admin/run_remote/standalone.py +++ b/redisbench_admin/run_remote/standalone.py @@ -71,33 +71,37 @@ def remote_module_files_cp( port, private_key, remote_module_file_dir, - server_public_ip, + server_public_ips, username, ): remote_module_files = [] - if local_module_files is not None: - for local_module_file in local_module_files: - remote_module_file = "{}/{}".format( - remote_module_file_dir, os.path.basename(local_module_file) - ) - # copy the module to the DB machine - copy_file_to_remote_setup( - server_public_ip, - username, - private_key, - local_module_file, - remote_module_file, - None, - port, - ) - execute_remote_commands( - server_public_ip, - username, - private_key, - ["chmod 755 {}".format(remote_module_file)], - port, - ) - remote_module_files.append(remote_module_file) + if type(server_public_ips) is str: + server_public_ips = [server_public_ips] + + for server_public_ip in server_public_ips: + if local_module_files is not None: + for local_module_file in local_module_files: + remote_module_file = "{}/{}".format( + remote_module_file_dir, os.path.basename(local_module_file) + ) + # copy the module to the DB machine + copy_file_to_remote_setup( + server_public_ip, + username, + private_key, + local_module_file, + remote_module_file, + None, + port, + ) + execute_remote_commands( + server_public_ip, + username, + private_key, + ["chmod 755 {}".format(remote_module_file)], + port, + ) + remote_module_files.append(remote_module_file) return remote_module_files diff --git a/redisbench_admin/utils/remote.py b/redisbench_admin/utils/remote.py index 431ede1..4b28e86 100644 --- a/redisbench_admin/utils/remote.py +++ b/redisbench_admin/utils/remote.py @@ -265,6 +265,7 @@ def setup_remote_environment( ) _, _, _ = tf.refresh() tf_output = tf.output() + logging.error("TF OUTPUT: {}".format(tf_output)) server_private_ip = tf_output_or_none(tf_output, "server_private_ip") server_public_ip = tf_output_or_none(tf_output, "server_public_ip") client_private_ip = tf_output_or_none(tf_output, "client_private_ip") @@ -291,17 +292,20 @@ def setup_remote_environment( "timeout_secs": tf_timeout_secs, }, ) - return retrieve_tf_connection_vars(return_code, tf) + tf_output = tf.output() + return retrieve_tf_connection_vars(return_code, tf_output) -def retrieve_tf_connection_vars(return_code, tf): - tf_output = tf.output() +def retrieve_tf_connection_vars(return_code, tf_output): + logging.error("TF OUTPUT setup_remote_environment: {}".format(tf_output)) server_private_ip = tf_output["server_private_ip"]["value"][0] server_public_ip = tf_output["server_public_ip"]["value"][0] server_plaintext_port = 6379 client_private_ip = tf_output["client_private_ip"]["value"][0] client_public_ip = tf_output["client_public_ip"]["value"][0] username = "ubuntu" + if "ssh_user" in tf_output: + username = tf_output["ssh_user"]["value"] return ( return_code, username, diff --git a/tests/test_remote.py b/tests/test_remote.py index bcac11d..9db75b0 100644 --- 
a/tests/test_remote.py +++ b/tests/test_remote.py @@ -3,7 +3,6 @@ import redis import yaml - from redisbench_admin.run.redistimeseries import ( prepare_timeseries_dict, timeseries_test_sucess_flow, @@ -22,6 +21,7 @@ exporter_create_ts, get_overall_dashboard_keynames, common_timeseries_extraction, + retrieve_tf_connection_vars, ) @@ -474,3 +474,76 @@ def test_exporter_create_ts(): except redis.exceptions.ConnectionError: pass + + +def test_retrieve_tf_connection_vars(): + tf_output = { + "client_private_ip": { + "sensitive": False, + "type": ["tuple", ["string"]], + "value": ["10.3.0.235"], + }, + "client_public_ip": { + "sensitive": False, + "type": ["tuple", ["string"]], + "value": ["3.135.206.198"], + }, + "server_private_ip": { + "sensitive": False, + "type": ["tuple", ["string"]], + "value": ["10.3.0.53"], + }, + "server_public_ip": { + "sensitive": False, + "type": ["tuple", ["string"]], + "value": ["18.219.10.142"], + }, + } + ( + tf_return_code, + username, + server_private_ip, + server_public_ip, + server_plaintext_port, + client_private_ip, + client_public_ip, + ) = retrieve_tf_connection_vars(None, tf_output) + assert server_private_ip == "10.3.0.53" + assert server_public_ip == "18.219.10.142" + assert username == "ubuntu" + + tf_output_new = { + "client_private_ip": { + "sensitive": False, + "type": ["tuple", [["tuple", ["string"]]]], + "value": [["10.3.0.175"]], + }, + "client_public_ip": { + "sensitive": False, + "type": ["tuple", [["tuple", ["string"]]]], + "value": [["3.136.234.93"]], + }, + "server_private_ip": { + "sensitive": False, + "type": ["tuple", [["tuple", ["string", "string", "string"]]]], + "value": [["10.3.0.236", "10.3.0.9", "10.3.0.211"]], + }, + "server_public_ip": { + "sensitive": False, + "type": ["tuple", [["tuple", ["string", "string", "string"]]]], + "value": [["3.143.24.7", "13.58.158.80", "3.139.82.224"]], + }, + "ssh_user": {"sensitive": False, "type": "string", "value": "ec2"}, + } + ( + tf_return_code, + username, + server_private_ip, + server_public_ip, + server_plaintext_port, + client_private_ip, + client_public_ip, + ) = retrieve_tf_connection_vars(None, tf_output_new) + assert server_private_ip == ["10.3.0.236", "10.3.0.9", "10.3.0.211"] + assert server_public_ip == ["3.143.24.7", "13.58.158.80", "3.139.82.224"] + assert username == "ec2" From d27d0a635afceca7156a246be60e446c77b67de7 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Wed, 15 Mar 2023 13:36:53 +0000 Subject: [PATCH 2/6] stash wip --- redisbench_admin/environments/oss_cluster.py | 33 +++++- redisbench_admin/run/cluster.py | 2 +- redisbench_admin/run_remote/remote_db.py | 110 ++++++++++++------- redisbench_admin/run_remote/standalone.py | 20 ++-- 4 files changed, 111 insertions(+), 54 deletions(-) diff --git a/redisbench_admin/environments/oss_cluster.py b/redisbench_admin/environments/oss_cluster.py index 526b6c3..0bdbf1d 100644 --- a/redisbench_admin/environments/oss_cluster.py +++ b/redisbench_admin/environments/oss_cluster.py @@ -9,6 +9,7 @@ import redis +from redisbench_admin.run.cluster import split_primaries_per_db_nodes from redisbench_admin.utils.utils import ( wait_for_conn, redis_server_config_module_part, @@ -68,8 +69,9 @@ def spin_up_local_redis_cluster( def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_port): + meet_cmds = [] logging.info("Setting up cluster. 
Total {} primaries.".format(len(redis_conns))) - meet_cmds = generate_meet_cmds(shard_count, shard_host, start_port) + meet_cmds = generate_meet_cmds(shard_count, shard_host, start_port, meet_cmds) status = setup_oss_cluster_from_conns(meet_cmds, redis_conns, shard_count) if status is True: for conn in redis_conns: @@ -77,12 +79,31 @@ def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_p return status -def generate_meet_cmds(shard_count, shard_host, start_port): - meet_cmds = [] +def generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port): + host_port_pairs = [] + ( + primaries_per_node, + db_private_ips, + _, + ) = split_primaries_per_db_nodes(server_private_ips, None, shard_count) + shard_start = 1 + for node_n, primaries_this_node in enumerate(primaries_per_node, start=0): + server_private_ip = db_private_ips[node_n] + for master_shard_id in range( + shard_start, shard_start + primaries_this_node + 1 + ): + shard_port = master_shard_id + cluster_start_port - 1 + host_port_pairs.append([server_private_ip, shard_port]) - for master_shard_id in range(1, shard_count + 1): - shard_port = master_shard_id + start_port - 1 - meet_cmds.append("CLUSTER MEET {} {}".format(shard_host, shard_port)) + return host_port_pairs + + +def generate_meet_cmds( + shard_count, server_private_ips, cluster_start_port, meet_cmds, shard_start=1 +): + generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port) + for pair in generate_host_port_pairs: + meet_cmds.append("CLUSTER MEET {} {}".format(pair[0], pair[1])) return meet_cmds diff --git a/redisbench_admin/run/cluster.py b/redisbench_admin/run/cluster.py index 5e0eda7..ac24772 100644 --- a/redisbench_admin/run/cluster.py +++ b/redisbench_admin/run/cluster.py @@ -175,7 +175,7 @@ def split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_co server_public_ips = [server_public_ips] if type(server_private_ips) is str: server_private_ips = [server_private_ips] - db_node_count = len(server_public_ips) + db_node_count = len(server_private_ips) primaries_per_db_node = db_node_count // shard_count remainder_first_node = db_node_count % shard_count first_node_primaries = primaries_per_db_node + remainder_first_node diff --git a/redisbench_admin/run_remote/remote_db.py b/redisbench_admin/run_remote/remote_db.py index 2ee2494..baed2da 100644 --- a/redisbench_admin/run_remote/remote_db.py +++ b/redisbench_admin/run_remote/remote_db.py @@ -13,6 +13,7 @@ spin_up_redis_cluster_remote_redis, debug_reload_rdb, cluster_init_steps, + split_primaries_per_db_nodes, ) from redisbench_admin.run.common import ( check_dbconfig_tool_requirement, @@ -82,8 +83,8 @@ def remote_db_spin( required_modules, return_code, server_plaintext_port, - server_private_ip, - server_public_ip, + server_private_ips, + server_public_ips, setup_name, setup_type, shard_count, @@ -123,7 +124,7 @@ def remote_db_spin( cp_local_dbdir_to_remote( dbdir_folder, private_key, - server_public_ip, + server_public_ips, temporary_dir, username, ) @@ -133,7 +134,7 @@ def remote_db_spin( db_ssh_port, private_key, remote_module_file_dir, - server_public_ip, + server_public_ips, username, ) # setup Redis @@ -143,8 +144,8 @@ def remote_db_spin( if setup_type == "oss-cluster": if skip_redis_setup is False: logfiles = spin_up_redis_cluster_remote_redis( - server_public_ip, - server_private_ip, + server_public_ips, + server_private_ips, username, private_key, remote_module_files, @@ -157,40 +158,69 @@ def remote_db_spin( logname, redis_7, ) - 
try: - for p in range(cluster_start_port, cluster_start_port + shard_count): - local_redis_conn, ssh_tunnel = ssh_tunnel_redisconn( - p, - server_private_ip, - server_public_ip, - username, - db_ssh_port, - private_key, - redis_password, - ) - local_redis_conn.ping() - redis_conns.append(local_redis_conn) - except redis.exceptions.ConnectionError as e: - logging.error("A error occurred while spinning DB: {}".format(e.__str__())) - logfile = logfiles[0] - - remote_file = "{}/{}".format(temporary_dir, logfile) - logging.error( - "Trying to fetch DB remote log {} into {}".format(remote_file, logfile) - ) - db_error_artifacts( - db_ssh_port, - dirname, - full_logfiles, - logname, - private_key, - s3_bucket_name, - s3_bucket_path, - server_public_ip, - temporary_dir, - True, - username, + ( + primaries_per_node, + db_private_ips, + db_public_ips, + ) = split_primaries_per_db_nodes( + server_private_ips, server_public_ips, shard_count ) + shard_start = 1 + for node_n, primaries_this_node in enumerate(primaries_per_node, start=0): + server_private_ip = db_private_ips[node_n] + server_public_ip = db_public_ips[node_n] + for master_shard_id in range( + shard_start, shard_start + primaries_this_node + 1 + ): + shard_port = master_shard_id + cluster_start_port - 1 + try: + local_redis_conn, ssh_tunnel = ssh_tunnel_redisconn( + shard_port, + server_private_ip, + server_public_ip, + username, + db_ssh_port, + private_key, + redis_password, + ) + local_redis_conn.ping() + redis_conns.append(local_redis_conn) + except redis.exceptions.ConnectionError as e: + logging.error( + "A error occurred while spinning DB: {}".format(e.__str__()) + ) + logfile = logfiles[0] + + remote_file = "{}/{}".format(temporary_dir, logfile) + logging.error( + "Trying to fetch DB remote log {} into {}".format( + remote_file, logfile + ) + ) + db_error_artifacts( + db_ssh_port, + dirname, + full_logfiles, + logname, + private_key, + s3_bucket_name, + s3_bucket_path, + server_public_ip, + temporary_dir, + True, + username, + ) + shard_start = shard_start + primaries_this_node + + # we only use the 1st node for single endpoint tests + # for client connections, even on cluster setups we always reload the + # cluster info via cluster slots, so we can indeed only use the 1st node ip + if type(server_private_ips) is str: + server_private_ip = server_private_ips + server_public_ip = server_public_ips + else: + server_private_ip = server_private_ips[0] + server_public_ip = server_public_ips[0] if is_single_endpoint(setup_type): try: @@ -238,7 +268,7 @@ def remote_db_spin( setup_redis_cluster_from_conns( redis_conns, shard_count, - server_private_ip, + server_private_ips, cluster_start_port, ) server_plaintext_port = cluster_start_port diff --git a/redisbench_admin/run_remote/standalone.py b/redisbench_admin/run_remote/standalone.py index 58cf577..5cc71bc 100644 --- a/redisbench_admin/run_remote/standalone.py +++ b/redisbench_admin/run_remote/standalone.py @@ -54,16 +54,22 @@ def spin_up_standalone_remote_redis( def cp_local_dbdir_to_remote( - dbdir_folder, private_key, server_public_ip, temporary_dir, username + dbdir_folder, private_key, server_public_ips, temporary_dir, username ): if dbdir_folder is not None: - logging.info( - "Copying entire content of {} into temporary path: {}".format( - dbdir_folder, temporary_dir + + if type(server_public_ips) is str: + server_public_ips = [server_public_ips] + for server_public_ip in server_public_ips: + logging.info( + "Copying entire content of {} into temporary path: {} of remote IP 
{}".format( + dbdir_folder, temporary_dir, server_public_ip + ) + ) + ssh = SSHSession( + server_public_ip, username, key_file=open(private_key, "r") ) - ) - ssh = SSHSession(server_public_ip, username, key_file=open(private_key, "r")) - ssh.put_all(dbdir_folder, temporary_dir) + ssh.put_all(dbdir_folder, temporary_dir) def remote_module_files_cp( From 1fd7de13cfd7fa94198a89b62d6b5fda9c304336 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Wed, 19 Apr 2023 10:44:41 +0100 Subject: [PATCH 3/6] Fixed multi-node cluster setup steps --- redisbench_admin/environments/oss_cluster.py | 27 +++++++++++++++++--- redisbench_admin/run/cluster.py | 25 +++--------------- redisbench_admin/run_local/local_db.py | 4 ++- redisbench_admin/run_remote/remote_db.py | 6 +++-- redisbench_admin/run_remote/standalone.py | 1 - 5 files changed, 35 insertions(+), 28 deletions(-) diff --git a/redisbench_admin/environments/oss_cluster.py b/redisbench_admin/environments/oss_cluster.py index 0bdbf1d..6316d38 100644 --- a/redisbench_admin/environments/oss_cluster.py +++ b/redisbench_admin/environments/oss_cluster.py @@ -9,7 +9,6 @@ import redis -from redisbench_admin.run.cluster import split_primaries_per_db_nodes from redisbench_admin.utils.utils import ( wait_for_conn, redis_server_config_module_part, @@ -101,8 +100,10 @@ def generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port def generate_meet_cmds( shard_count, server_private_ips, cluster_start_port, meet_cmds, shard_start=1 ): - generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port) - for pair in generate_host_port_pairs: + host_port_pairs = generate_host_port_pairs( + server_private_ips, shard_count, cluster_start_port + ) + for pair in host_port_pairs: meet_cmds.append("CLUSTER MEET {} {}".format(pair[0], pair[1])) return meet_cmds @@ -220,3 +221,23 @@ def generate_cluster_redis_server_args( def get_cluster_dbfilename(port): return "cluster-node-port-{}.rdb".format(port) + + +def split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count): + if type(server_public_ips) is str: + server_public_ips = [server_public_ips] + if type(server_private_ips) is str: + server_private_ips = [server_private_ips] + db_node_count = len(server_private_ips) + primaries_per_db_node = db_node_count // shard_count + remainder_first_node = db_node_count % shard_count + first_node_primaries = primaries_per_db_node + remainder_first_node + logging.info("DB node {} will have {} primaries".format(1, first_node_primaries)) + primaries_per_node = [first_node_primaries] + for node_n, node_id in enumerate(range(2, db_node_count + 1), start=2): + logging.info("Setting") + logging.info( + "DB node {} will have {} primaries".format(node_n, primaries_per_db_node) + ) + primaries_per_node.append(primaries_per_db_node) + return primaries_per_node, server_private_ips, server_public_ips diff --git a/redisbench_admin/run/cluster.py b/redisbench_admin/run/cluster.py index ac24772..54e6f1e 100644 --- a/redisbench_admin/run/cluster.py +++ b/redisbench_admin/run/cluster.py @@ -7,7 +7,10 @@ from redisbench_admin.utils.remote import execute_remote_commands -from redisbench_admin.environments.oss_cluster import generate_cluster_redis_server_args +from redisbench_admin.environments.oss_cluster import ( + generate_cluster_redis_server_args, + split_primaries_per_db_nodes, +) from redisbench_admin.utils.utils import wait_for_conn @@ -168,23 +171,3 @@ def spin_up_redis_cluster_remote_redis( shard_start = shard_start + primaries_this_node return 
logfiles - - -def split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count): - if type(server_public_ips) is str: - server_public_ips = [server_public_ips] - if type(server_private_ips) is str: - server_private_ips = [server_private_ips] - db_node_count = len(server_private_ips) - primaries_per_db_node = db_node_count // shard_count - remainder_first_node = db_node_count % shard_count - first_node_primaries = primaries_per_db_node + remainder_first_node - logging.info("DB node {} will have {} primaries".format(1, first_node_primaries)) - primaries_per_node = [first_node_primaries] - for node_n, node_id in enumerate(range(2, db_node_count + 1), start=2): - logging.info("Setting") - logging.info( - "DB node {} will have {} primaries".format(node_n, primaries_per_db_node) - ) - primaries_per_node.append(primaries_per_db_node) - return primaries_per_node, server_private_ips, server_public_ips diff --git a/redisbench_admin/run_local/local_db.py b/redisbench_admin/run_local/local_db.py index 7d3e78a..c9a8b05 100644 --- a/redisbench_admin/run_local/local_db.py +++ b/redisbench_admin/run_local/local_db.py @@ -133,7 +133,9 @@ def local_db_spin( ) ) if is_process_alive(redis_process) is False: - raise Exception("Redis process is not alive. Failing test.") + raise Exception( + "Redis shard #{} is not alive. Failing test.".format(shardn) + ) if setup_type == "oss-cluster": cluster_init_steps(clusterconfig, redis_conns, local_module_file) diff --git a/redisbench_admin/run_remote/remote_db.py b/redisbench_admin/run_remote/remote_db.py index baed2da..6739597 100644 --- a/redisbench_admin/run_remote/remote_db.py +++ b/redisbench_admin/run_remote/remote_db.py @@ -8,12 +8,14 @@ import redis -from redisbench_admin.environments.oss_cluster import setup_redis_cluster_from_conns +from redisbench_admin.environments.oss_cluster import ( + setup_redis_cluster_from_conns, + split_primaries_per_db_nodes, +) from redisbench_admin.run.cluster import ( spin_up_redis_cluster_remote_redis, debug_reload_rdb, cluster_init_steps, - split_primaries_per_db_nodes, ) from redisbench_admin.run.common import ( check_dbconfig_tool_requirement, diff --git a/redisbench_admin/run_remote/standalone.py b/redisbench_admin/run_remote/standalone.py index b7c7b3f..05d72d3 100644 --- a/redisbench_admin/run_remote/standalone.py +++ b/redisbench_admin/run_remote/standalone.py @@ -56,7 +56,6 @@ def cp_local_dbdir_to_remote( dbdir_folder, private_key, server_public_ips, temporary_dir, username ): if dbdir_folder is not None: - if type(server_public_ips) is str: server_public_ips = [server_public_ips] for server_public_ip in server_public_ips: From f1fc2bba504c48b7a01529301c08e3c9d9cdf7c2 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Wed, 19 Apr 2023 11:57:33 +0100 Subject: [PATCH 4/6] Include DB node info in timeseries info --- redisbench_admin/export/export.py | 3 +++ redisbench_admin/run/common.py | 15 +++++++++++---- redisbench_admin/run/redistimeseries.py | 7 +++++++ redisbench_admin/run_remote/remote_client.py | 4 ++-- redisbench_admin/run_remote/remote_env.py | 9 +++++++++ redisbench_admin/run_remote/run_remote.py | 6 ++++++ redisbench_admin/run_remote/standalone.py | 13 ++++++++++++- redisbench_admin/utils/remote.py | 18 ++++++++++++++++++ redisbench_admin/utils/utils.py | 3 +++ tests/test_redistimeseries.py | 2 ++ tests/test_remote_env.py | 4 ++++ 11 files changed, 77 insertions(+), 7 deletions(-) diff --git a/redisbench_admin/export/export.py b/redisbench_admin/export/export.py index d177255..46b350b 100644 
--- a/redisbench_admin/export/export.py +++ b/redisbench_admin/export/export.py @@ -154,6 +154,7 @@ def export_command_logic(args, project_name, project_version): github_org, github_repo, triggering_env, + 1, ) logging.info("Parsed a total of {} metrics".format(len(timeseries_dict.keys()))) logging.info( @@ -213,6 +214,7 @@ def export_json_to_timeseries_dict( tf_github_org, tf_github_repo, triggering_env, + n_db_nodes=1, ): results_dict = {} for test_name, d in benchmark_file.items(): @@ -237,6 +239,7 @@ def export_json_to_timeseries_dict( tf_github_repo, triggering_env, False, + n_db_nodes, ) results_dict[ts_name] = { "labels": timeserie_tags.copy(), diff --git a/redisbench_admin/run/common.py b/redisbench_admin/run/common.py index 28a6e72..aee25a2 100644 --- a/redisbench_admin/run/common.py +++ b/redisbench_admin/run/common.py @@ -68,7 +68,7 @@ def prepare_benchmark_parameters( benchmark_config, benchmark_tool, server_plaintext_port, - server_private_ip, + server_private_ips, remote_results_file, isremote=False, current_workdir=None, @@ -96,7 +96,7 @@ def prepare_benchmark_parameters( isremote, remote_results_file, server_plaintext_port, - server_private_ip, + server_private_ips, client_public_ip, username, private_key, @@ -116,7 +116,7 @@ def prepare_benchmark_parameters( isremote, remote_results_file, server_plaintext_port, - server_private_ip, + server_private_ips, client_public_ip, username, private_key, @@ -146,13 +146,17 @@ def prepare_benchmark_parameters_specif_tooling( isremote, remote_results_file, server_plaintext_port, - server_private_ip, + server_private_ips, client_public_ip, username, private_key, client_ssh_port, redis_password=None, ): + if type(server_private_ips) == list: + server_private_ip = server_private_ips[0] + else: + server_private_ip = server_private_ips if "redis-benchmark" in benchmark_tool: command_arr, command_str = prepare_redis_benchmark_command( benchmark_tool, @@ -296,6 +300,7 @@ def common_exporter_logic( build_variant_name=None, running_platform=None, datapoints_timestamp=None, + n_db_nodes=1, ): per_version_time_series_dict = {} per_branch_time_series_dict = {} @@ -341,6 +346,7 @@ def common_exporter_logic( build_variant_name, running_platform, testcase_metric_context_paths, + n_db_nodes, ) if tf_github_branch is not None and tf_github_branch != "": # extract per branch datapoints @@ -363,6 +369,7 @@ def common_exporter_logic( build_variant_name, running_platform, testcase_metric_context_paths, + n_db_nodes, ) else: logging.error( diff --git a/redisbench_admin/run/redistimeseries.py b/redisbench_admin/run/redistimeseries.py index d3cf651..da0d95c 100644 --- a/redisbench_admin/run/redistimeseries.py +++ b/redisbench_admin/run/redistimeseries.py @@ -39,6 +39,7 @@ def prepare_timeseries_dict( build_variant_name=None, running_platform=None, datapoints_timestamp=None, + n_db_nodes=1, ): time_series_dict = {} # check which metrics to extract @@ -67,6 +68,7 @@ def prepare_timeseries_dict( build_variant_name, running_platform, datapoints_timestamp, + n_db_nodes, ) time_series_dict.update(per_version_time_series_dict) time_series_dict.update(per_branch_time_series_dict) @@ -93,6 +95,7 @@ def add_standardized_metric_bybranch( metadata_tags={}, build_variant_name=None, running_platform=None, + n_db_nodes=1, ): if metric_value is not None: tsname_use_case_duration = get_ts_metric_name( @@ -109,6 +112,7 @@ def add_standardized_metric_bybranch( False, build_variant_name, running_platform, + n_db_nodes, ) labels = get_project_ts_tags( tf_github_org, @@ -119,6 
+123,7 @@ def add_standardized_metric_bybranch( metadata_tags, build_variant_name, running_platform, + n_db_nodes, ) labels["branch"] = tf_github_branch labels["deployment_name+branch"] = "{} {}".format( @@ -238,6 +243,7 @@ def timeseries_test_sucess_flow( build_variant_name=None, running_platform=None, timeseries_dict=None, + n_db_nodes=1, ): testcase_metric_context_paths = [] version_target_tables = None @@ -266,6 +272,7 @@ def timeseries_test_sucess_flow( build_variant_name, running_platform, start_time_ms, + n_db_nodes, ) if push_results_redistimeseries: logging.info( diff --git a/redisbench_admin/run_remote/remote_client.py b/redisbench_admin/run_remote/remote_client.py index 4f52388..f50130f 100644 --- a/redisbench_admin/run_remote/remote_client.py +++ b/redisbench_admin/run_remote/remote_client.py @@ -38,7 +38,7 @@ def run_remote_client_tool( remote_results_file, return_code, server_plaintext_port, - server_private_ip, + server_private_ips, start_time_ms, start_time_str, username, @@ -96,7 +96,7 @@ def run_remote_client_tool( benchmark_config, benchmark_tool, server_plaintext_port, - server_private_ip, + server_private_ips, remote_results_file, True, None, diff --git a/redisbench_admin/run_remote/remote_env.py b/redisbench_admin/run_remote/remote_env.py index d5efd90..1e8e269 100644 --- a/redisbench_admin/run_remote/remote_env.py +++ b/redisbench_admin/run_remote/remote_env.py @@ -72,7 +72,16 @@ def remote_env_setup( tf_override_name, tf_folder_path, ) + n_db_hosts = 1 + n_client_hosts = 1 + if type(server_public_ip) == list: + n_db_hosts = len(server_public_ip) + if type(client_public_ip) == list: + n_client_hosts = len(client_public_ip) + return ( + n_db_hosts, + n_client_hosts, client_public_ip, server_plaintext_port, server_private_ip, diff --git a/redisbench_admin/run_remote/run_remote.py b/redisbench_admin/run_remote/run_remote.py index 111b957..5f328c1 100644 --- a/redisbench_admin/run_remote/run_remote.py +++ b/redisbench_admin/run_remote/run_remote.py @@ -399,6 +399,8 @@ def run_remote_command_logic(args, project_name, project_version): client_artifacts_map = {} temporary_dir = get_tmp_folder_rnd() ( + n_db_hosts, + n_client_hosts, client_public_ip, server_plaintext_port, server_private_ip, @@ -889,6 +891,10 @@ def run_remote_command_logic(args, project_name, project_version): tf_github_repo, tf_triggering_env, metadata_tags, + None, + None, + None, + n_db_hosts, ) if branch_target_tables is not None: for ( diff --git a/redisbench_admin/run_remote/standalone.py b/redisbench_admin/run_remote/standalone.py index 05d72d3..bb0e263 100644 --- a/redisbench_admin/run_remote/standalone.py +++ b/redisbench_admin/run_remote/standalone.py @@ -17,7 +17,7 @@ def spin_up_standalone_remote_redis( temporary_dir, - server_public_ip, + server_public_ips, username, private_key, remote_module_files, @@ -36,6 +36,17 @@ def spin_up_standalone_remote_redis( redis_7, ) + if type(server_public_ips) is str: + server_public_ips = [server_public_ips] + server_public_ip = server_public_ips[0] + logging.info( + "Given we've received multiple IPs for DB server {} and this is a standalone we're using the first one: {}".format( + server_public_ips, server_public_ip + ) + ) + else: + server_public_ip = server_public_ips + # start redis-server commands = [initial_redis_cmd] res = execute_remote_commands( diff --git a/redisbench_admin/utils/remote.py b/redisbench_admin/utils/remote.py index 72e7cfa..6041d1d 100644 --- a/redisbench_admin/utils/remote.py +++ b/redisbench_admin/utils/remote.py @@ -106,6 +106,8 
@@ def copy_file_to_remote_setup( def fetch_file_from_remote_setup( server_public_ip, username, private_key, local_file, remote_file ): + if type(server_public_ip) == list: + server_public_ip = server_public_ip[0] logging.info( "Retrieving remote file {} from remote server {} ".format( remote_file, server_public_ip @@ -148,6 +150,8 @@ def execute_remote_commands( def connect_remote_ssh(port, private_key, server_public_ip, username): + if type(server_public_ip) == list: + server_public_ip = server_public_ip[0] k = paramiko.RSAKey.from_private_key_file(private_key) c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) @@ -683,6 +687,7 @@ def extract_perversion_timeseries_from_results( build_variant_name=None, running_platform=None, testcase_metric_context_paths=[], + n_db_nodes=1, ): break_by_key = "version" break_by_str = "by.{}".format(break_by_key) @@ -703,6 +708,7 @@ def extract_perversion_timeseries_from_results( build_variant_name, running_platform, testcase_metric_context_paths, + n_db_nodes, ) return True, branch_time_series_dict, target_tables @@ -724,6 +730,7 @@ def common_timeseries_extraction( build_variant_name=None, running_platform=None, testcase_metric_context_paths=[], + n_db_nodes=1, ): time_series_dict = {} target_tables = {} @@ -758,6 +765,7 @@ def common_timeseries_extraction( tf_triggering_env, time_series_dict, use_metric_context_path, + n_db_nodes, ) target_tables[target_table_keyname] = target_table_dict @@ -786,6 +794,7 @@ def from_metric_kv_to_timeserie( tf_triggering_env, time_series_dict, use_metric_context_path, + n_db_nodes=1, ): timeserie_tags, ts_name = get_ts_tags_and_name( break_by_key, @@ -805,6 +814,7 @@ def from_metric_kv_to_timeserie( tf_github_repo, tf_triggering_env, use_metric_context_path, + n_db_nodes, ) time_series_dict[ts_name] = { "labels": timeserie_tags.copy(), @@ -881,6 +891,7 @@ def get_ts_tags_and_name( tf_github_repo, tf_triggering_env, use_metric_context_path, + n_db_nodes=1, ): # prepare tags timeserie_tags = get_project_ts_tags( @@ -892,6 +903,7 @@ def get_ts_tags_and_name( metadata_tags, build_variant_name, running_platform, + n_db_nodes, ) timeserie_tags[break_by_key] = break_by_value timeserie_tags["{}+{}".format("deployment_name", break_by_key)] = "{} {}".format( @@ -907,6 +919,7 @@ def get_ts_tags_and_name( test_name, build_variant_name ) timeserie_tags["metric"] = str(metric_name) + timeserie_tags["db_nodes"] = str(n_db_nodes) timeserie_tags["metric_name"] = metric_name timeserie_tags["metric_context_path"] = metric_context_path if metric_context_path is not None: @@ -930,6 +943,7 @@ def get_ts_tags_and_name( use_metric_context_path, build_variant_name, running_platform, + n_db_nodes, ) return timeserie_tags, ts_name @@ -943,6 +957,7 @@ def get_project_ts_tags( metadata_tags={}, build_variant_name=None, running_platform=None, + n_db_nodes=1, ): tags = { "github_org": tf_github_org, @@ -951,6 +966,7 @@ def get_project_ts_tags( "deployment_type": deployment_type, "deployment_name": deployment_name, "triggering_env": tf_triggering_env, + "n_db_nodes": str(n_db_nodes), } if build_variant_name is not None: tags["build_variant"] = build_variant_name @@ -976,6 +992,7 @@ def extract_perbranch_timeseries_from_results( build_variant_name=None, running_platform=None, testcase_metric_context_paths=[], + n_db_nodes=1, ): break_by_key = "branch" break_by_str = "by.{}".format(break_by_key) @@ -996,6 +1013,7 @@ def extract_perbranch_timeseries_from_results( build_variant_name, running_platform, 
testcase_metric_context_paths, + n_db_nodes, ) return True, branch_time_series_dict, target_tables diff --git a/redisbench_admin/utils/utils.py b/redisbench_admin/utils/utils.py index 4cd44be..e22e1f3 100644 --- a/redisbench_admin/utils/utils.py +++ b/redisbench_admin/utils/utils.py @@ -326,6 +326,7 @@ def get_ts_metric_name( use_metric_context_path=False, build_variant_name=None, running_platform=None, + n_db_nodes=1, ): if use_metric_context_path: metric_name = "{}/{}".format(metric_name, metric_context_path) @@ -339,6 +340,8 @@ def get_ts_metric_name( deployment_name = "/{}".format(deployment_name) else: deployment_name = "" + if n_db_nodes > 1: + deployment_name = deployment_name + "{}-nodes".format(n_db_nodes) ts_name = ( "ci.benchmarks.redislabs/{by}/" "{triggering_env}/{github_org}/{github_repo}/" diff --git a/tests/test_redistimeseries.py b/tests/test_redistimeseries.py index a6bd759..7f4a330 100644 --- a/tests/test_redistimeseries.py +++ b/tests/test_redistimeseries.py @@ -97,6 +97,8 @@ def test_timeseries_test_sucess_flow(): }, "build1", "platform1", + None, + 1, ) assert rts.exists(testcases_and_metric_context_path_setname) diff --git a/tests/test_remote_env.py b/tests/test_remote_env.py index 39c1fb9..d617b6c 100644 --- a/tests/test_remote_env.py +++ b/tests/test_remote_env.py @@ -34,6 +34,8 @@ def test_remote_env_setup(): tf_setup_name_sufix = "suffix" benchmark_config = {} ( + n_db_hosts, + n_client_hosts, client_public_ip, server_plaintext_port, server_private_ip, @@ -59,6 +61,8 @@ def test_remote_env_setup(): assert client_public_ip == "2.2.2.2" assert server_private_ip == "10.0.0.1" assert server_public_ip == "1.1.1.1" + assert n_client_hosts == 1 + assert n_db_hosts == 1 # using inventory but missing one manadatory key args = parser.parse_args( From b8200f850dd1f6f79a98507616353f445659b370 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Wed, 19 Apr 2023 18:24:20 +0100 Subject: [PATCH 5/6] Extended redistimeseries test to cover multi-node scenarios --- redisbench_admin/run_remote/run_remote.py | 11 ++++++- redisbench_admin/utils/utils.py | 2 +- tests/test_redistimeseries.py | 37 +++++++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/redisbench_admin/run_remote/run_remote.py b/redisbench_admin/run_remote/run_remote.py index 5f328c1..90e965c 100644 --- a/redisbench_admin/run_remote/run_remote.py +++ b/redisbench_admin/run_remote/run_remote.py @@ -794,6 +794,7 @@ def run_remote_command_logic(args, project_name, project_version): tf_triggering_env, {"metric-type": "redis-metrics"}, expire_ms, + n_db_hosts, ) if collect_commandstats: ( @@ -817,6 +818,7 @@ def run_remote_command_logic(args, project_name, project_version): tf_triggering_env, {"metric-type": "commandstats"}, expire_ms, + n_db_hosts, ) ( end_time_ms, @@ -839,6 +841,7 @@ def run_remote_command_logic(args, project_name, project_version): tf_triggering_env, {"metric-type": "latencystats"}, expire_ms, + n_db_hosts, ) if setup_details["env"] is None: @@ -1202,6 +1205,7 @@ def export_redis_metrics( tf_triggering_env, metadata_dict=None, expire_ms=0, + n_db_nodes=1, ): datapoint_errors = 0 datapoint_inserts = 0 @@ -1236,11 +1240,16 @@ def export_redis_metrics( metric_name, metric_value, ) in overall_end_time_metrics.items(): + setup_name_and_nodes = setup_name + if n_db_nodes > 1: + setup_name_and_nodes = setup_name_and_nodes + "-{}-nodes".format( + n_db_nodes + ) tsname_metric = "{}/{}/{}/benchmark_end/{}/{}".format( sprefix, test_name, by_variant, - setup_name, + setup_name_and_nodes, 
metric_name, ) diff --git a/redisbench_admin/utils/utils.py b/redisbench_admin/utils/utils.py index e22e1f3..6272511 100644 --- a/redisbench_admin/utils/utils.py +++ b/redisbench_admin/utils/utils.py @@ -341,7 +341,7 @@ def get_ts_metric_name( else: deployment_name = "" if n_db_nodes > 1: - deployment_name = deployment_name + "{}-nodes".format(n_db_nodes) + deployment_name = deployment_name + "-{}-nodes".format(n_db_nodes) ts_name = ( "ci.benchmarks.redislabs/{by}/" "{triggering_env}/{github_org}/{github_repo}/" diff --git a/tests/test_redistimeseries.py b/tests/test_redistimeseries.py index 7f4a330..d71bb12 100644 --- a/tests/test_redistimeseries.py +++ b/tests/test_redistimeseries.py @@ -208,6 +208,8 @@ def test_timeseries_test_sucess_flow(): {"arch": "arm64", "os": "ubuntu:16.04", "compiler": "icc"}, "build", "platform2", + None, + 2, ) assert "arm64".encode() in rts.smembers(project_archs_setname) assert "ubuntu:16.04".encode() in rts.smembers(project_oss_setname) @@ -224,5 +226,40 @@ def test_timeseries_test_sucess_flow(): assert len(rts.smembers(project_branches_setname)) == 1 assert len(rts.smembers(project_versions_setname)) == 1 + # check multi-node timeseries + rts.flushall() + deployment_type = "oss-cluster" + deployment_name = "oss-cluster-02-primaries" + + timeseries_test_sucess_flow( + True, + project_version, + benchmark_config, + benchmark_duration_seconds, + dataset_load_duration_seconds, + metrics, + deployment_name, + deployment_type, + merged_exporter_timemetric_path, + results_dict, + rts, + start_time_ms, + test_name, + tf_github_branch, + tf_github_org, + tf_github_repo, + tf_triggering_env, + {"arch": "arm64", "os": "ubuntu:16.04", "compiler": "icc"}, + "build", + "platform2", + None, + 2, + ) + + keys = [x.decode() for x in rts.keys()] + for keyname in keys: + if "target_tables" not in keyname: + if rts.type(keyname) is "TSDB-TYPE": + assert "-2-nodes" in keyname except redis.exceptions.ConnectionError: pass From b9dfaf281ecd517ababf5f54651721cb16d8c18b Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 20 Apr 2023 01:20:45 +0100 Subject: [PATCH 6/6] Improved multi-vm deployment checks and logging --- redisbench_admin/environments/oss_cluster.py | 54 +++++++++++---- redisbench_admin/run/cluster.py | 5 +- redisbench_admin/run/common.py | 17 ++++- redisbench_admin/run_local/run_local.py | 2 + redisbench_admin/run_remote/remote_db.py | 50 ++++++++++++-- redisbench_admin/run_remote/run_remote.py | 23 ++++++ tests/test_common.py | 10 ++- tests/test_oss_cluster.py | 73 ++++++++++++++++++++ 8 files changed, 209 insertions(+), 25 deletions(-) create mode 100644 tests/test_oss_cluster.py diff --git a/redisbench_admin/environments/oss_cluster.py b/redisbench_admin/environments/oss_cluster.py index 6316d38..c512e3b 100644 --- a/redisbench_admin/environments/oss_cluster.py +++ b/redisbench_admin/environments/oss_cluster.py @@ -67,10 +67,14 @@ def spin_up_local_redis_cluster( return redis_processes, redis_conns -def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_port): +def setup_redis_cluster_from_conns( + redis_conns, shard_count, shard_host, start_port, db_nodes=1 +): meet_cmds = [] logging.info("Setting up cluster. 
Total {} primaries.".format(len(redis_conns)))
-    meet_cmds = generate_meet_cmds(shard_count, shard_host, start_port, meet_cmds)
+    meet_cmds = generate_meet_cmds(
+        shard_count, shard_host, start_port, meet_cmds, 1, db_nodes
+    )
     status = setup_oss_cluster_from_conns(meet_cmds, redis_conns, shard_count)
     if status is True:
         for conn in redis_conns:
@@ -78,13 +82,13 @@ def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_p
     return status
 
 
-def generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port):
+def generate_host_port_pairs(
+    server_private_ips, shard_count, cluster_start_port, required_node_count=1
+):
     host_port_pairs = []
-    (
-        primaries_per_node,
-        db_private_ips,
-        _,
-    ) = split_primaries_per_db_nodes(server_private_ips, None, shard_count)
+    (primaries_per_node, db_private_ips, _,) = split_primaries_per_db_nodes(
+        server_private_ips, None, shard_count, required_node_count
+    )
     shard_start = 1
     for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
         server_private_ip = db_private_ips[node_n]
@@ -98,10 +102,15 @@ def generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port
 
 
 def generate_meet_cmds(
-    shard_count, server_private_ips, cluster_start_port, meet_cmds, shard_start=1
+    shard_count,
+    server_private_ips,
+    cluster_start_port,
+    meet_cmds,
+    shard_start=1,
+    db_nodes=1,
 ):
     host_port_pairs = generate_host_port_pairs(
-        server_private_ips, shard_count, cluster_start_port
+        server_private_ips, shard_count, cluster_start_port, db_nodes
     )
     for pair in host_port_pairs:
         meet_cmds.append("CLUSTER MEET {} {}".format(pair[0], pair[1]))
@@ -223,21 +232,38 @@ def get_cluster_dbfilename(port):
     return "cluster-node-port-{}.rdb".format(port)
 
 
-def split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count):
+def split_primaries_per_db_nodes(
+    server_private_ips, server_public_ips, shard_count, required_node_count=None
+):
     if type(server_public_ips) is str:
         server_public_ips = [server_public_ips]
     if type(server_private_ips) is str:
         server_private_ips = [server_private_ips]
     db_node_count = len(server_private_ips)
-    primaries_per_db_node = db_node_count // shard_count
-    remainder_first_node = db_node_count % shard_count
+    if required_node_count is None:
+        required_node_count = db_node_count
+    if db_node_count < required_node_count:
+        error_msg = "There are fewer servers ({}) than the required ones for this setup {}. Failing test...".format(
+            db_node_count, required_node_count
+        )
+        logging.error(error_msg)
+        raise Exception(error_msg)
+    if server_public_ips is not None:
+        server_public_ips = server_public_ips[0:required_node_count]
+    if server_private_ips is not None:
+        server_private_ips = server_private_ips[0:required_node_count]
+    primaries_per_db_node = shard_count // required_node_count
+    remainder_first_node = shard_count % required_node_count
     first_node_primaries = primaries_per_db_node + remainder_first_node
     logging.info("DB node {} will have {} primaries".format(1, first_node_primaries))
     primaries_per_node = [first_node_primaries]
-    for node_n, node_id in enumerate(range(2, db_node_count + 1), start=2):
+    for node_n, node_id in enumerate(range(2, required_node_count + 1), start=2):
         logging.info("Setting")
         logging.info(
             "DB node {} will have {} primaries".format(node_n, primaries_per_db_node)
         )
         primaries_per_node.append(primaries_per_db_node)
+    logging.info("Final primaries per node {}".format(primaries_per_node))
+    logging.info("Final server_private_ips {}".format(server_private_ips))
+    logging.info("Final server_public_ips {}".format(server_public_ips))
     return primaries_per_node, server_private_ips, server_public_ips
diff --git a/redisbench_admin/run/cluster.py b/redisbench_admin/run/cluster.py
index 54e6f1e..98ec728 100644
--- a/redisbench_admin/run/cluster.py
+++ b/redisbench_admin/run/cluster.py
@@ -117,6 +117,7 @@ def spin_up_redis_cluster_remote_redis(
     modules_configuration_parameters_map,
     logname,
     redis_7=True,
+    required_db_nodes=1,
 ):
     logging.info("Generating the remote redis-server command arguments")
 
@@ -127,7 +128,9 @@ def spin_up_redis_cluster_remote_redis(
         primaries_per_node,
         server_private_ips,
         server_public_ips,
-    ) = split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count)
+    ) = split_primaries_per_db_nodes(
+        server_private_ips, server_public_ips, shard_count, required_db_nodes
+    )
     shard_start = 1
     for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
         server_private_ip = server_private_ips[node_n]
diff --git a/redisbench_admin/run/common.py b/redisbench_admin/run/common.py
index aee25a2..4255e41 100644
--- a/redisbench_admin/run/common.py
+++ b/redisbench_admin/run/common.py
@@ -500,7 +500,11 @@ def extract_test_feasible_setups(
             "name": "oss-standalone",
             "type": "oss-standalone",
             "redis_topology": {"primaries": 1, "replicas": 0},
-            "resources": {"requests": {"cpu": "1000m"}, "limits": {"cpu": "2000m"}},
+            "resources": {
+                "nodes": 1,
+                "requests": {"cpu": "1000m"},
+                "limits": {"cpu": "2000m"},
+            },
         }
         logging.info(
             "Using a backwards compatible 'oss-standalone' setup, with settings: {}".format(
@@ -515,7 +519,16 @@ def get_setup_type_and_primaries_count(setup_settings):
     setup_type = setup_settings["type"]
     setup_name = setup_settings["name"]
     shard_count = setup_settings["redis_topology"]["primaries"]
-    return setup_name, setup_type, shard_count
+
+    node_count = 1
+    shard_placement = "dense"
+    if "redis_topology" in setup_settings:
+        if "placement" in setup_settings["redis_topology"]:
+            shard_placement = setup_settings["redis_topology"]["placement"]
+    if "resources" in setup_settings:
+        if "nodes" in setup_settings["resources"]:
+            node_count = setup_settings["resources"]["nodes"]
+    return setup_name, setup_type, shard_count, shard_placement, node_count
diff --git a/redisbench_admin/run_local/run_local.py b/redisbench_admin/run_local/run_local.py
index d2c5d9f..2fd0c87 100644
--- a/redisbench_admin/run_local/run_local.py
+++ b/redisbench_admin/run_local/run_local.py
@@ -156,6 +156,8 @@ def run_local_command_logic(args, project_name, project_version):
                 setup_name,
                 setup_type,
                 shard_count,
+                _,
+                _,
             ) = get_setup_type_and_primaries_count(setup_settings)
             if args.allowed_setups != "":
                 allowed_setups = args.allowed_setups.split(",")
diff --git a/redisbench_admin/run_remote/remote_db.py b/redisbench_admin/run_remote/remote_db.py
index 6739597..fb095cc 100644
--- a/redisbench_admin/run_remote/remote_db.py
+++ b/redisbench_admin/run_remote/remote_db.py
@@ -109,6 +109,8 @@ def remote_db_spin(
     redis_password=None,
     flushall_on_every_test_start=False,
     ignore_keyspace_errors=False,
+    shard_placement="sparse",
+    required_node_count=1,
 ):
     (
         _,
@@ -145,6 +147,9 @@ def remote_db_spin(
     topology_setup_start_time = datetime.datetime.now()
     if setup_type == "oss-cluster":
         if skip_redis_setup is False:
+            logging.info(
+                "Setting up oss-cluster with {} nodes".format(required_node_count)
+            )
             logfiles = spin_up_redis_cluster_remote_redis(
                 server_public_ips,
                 server_private_ips,
@@ -159,14 +164,46 @@ def remote_db_spin(
                 modules_configuration_parameters_map,
                 logname,
                 redis_7,
+                required_node_count,
             )
-            (
-                primaries_per_node,
-                db_private_ips,
-                db_public_ips,
-            ) = split_primaries_per_db_nodes(
-                server_private_ips, server_public_ips, shard_count
+            if shard_placement == "sparse":
+                logging.info(
+                    "Setting up sparse placement between {} nodes".format(
+                        required_node_count
+                    )
+                )
+                (
+                    primaries_per_node,
+                    db_private_ips,
+                    db_public_ips,
+                ) = split_primaries_per_db_nodes(
+                    server_private_ips,
+                    server_public_ips,
+                    shard_count,
+                    required_node_count,
+                )
+            else:
+                logging.info(
+                    "Setting up dense placement between {} nodes".format(
+                        required_node_count
+                    )
+                )
+                (
+                    primaries_per_node,
+                    db_private_ips,
+                    db_public_ips,
+                ) = split_primaries_per_db_nodes(
+                    server_private_ips[0],
+                    server_public_ips[0],
+                    shard_count,
+                    required_node_count,
+                )
+            logging.info(
+                "Shard placement is {}. {} primaries per node. DB private IPs: {}; DB public IPs {}".format(
+                    shard_placement, primaries_per_node, db_private_ips, db_public_ips
+                )
             )
+
             shard_start = 1
             for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
                 server_private_ip = db_private_ips[node_n]
diff --git a/redisbench_admin/run_remote/run_remote.py b/redisbench_admin/run_remote/run_remote.py
index 90e965c..0994b81 100644
--- a/redisbench_admin/run_remote/run_remote.py
+++ b/redisbench_admin/run_remote/run_remote.py
@@ -366,6 +366,8 @@ def run_remote_command_logic(args, project_name, project_version):
                     setup_name,
                     setup_type,
                     shard_count,
+                    shard_placement,
+                    required_db_node_count,
                 ) = get_setup_type_and_primaries_count(setup_settings)
                 if args.allowed_setups != "":
                     allowed_setups = args.allowed_setups.split(",")
@@ -426,6 +428,25 @@ def run_remote_command_logic(args, project_name, project_version):
                         TF_OVERRIDE_REMOTE,
                     )
 
+                    if n_db_hosts < required_db_node_count:
+                        logging.warning(
+                            "SKIPPING test named {}, for setup named {} of topology type {} given node_count={} and the setup has {} nodes.".format(
+                                test_name,
+                                setup_name,
+                                setup_type,
+                                required_db_node_count,
+                                n_db_hosts,
+                            )
+                        )
+                        continue
+
+                    else:
+                        logging.info(
+                            "Setup requires {} db nodes. This remote setup has {}. All OK".format(
+                                required_db_node_count, n_db_hosts
+                            )
+                        )
+
                     # after we've created the env, even on error we should always teardown
                     # in case of some unexpected error we fail the test
                     try:
@@ -507,6 +528,8 @@ def run_remote_command_logic(args, project_name, project_version):
                             redis_password,
                             flushall_on_every_test_start,
                             ignore_keyspace_errors,
+                            shard_placement,
+                            required_db_node_count,
                         )
                         if benchmark_type == "read-only":
                             ro_benchmark_set(
diff --git a/tests/test_common.py b/tests/test_common.py
index 7cca968..ac21f56 100644
--- a/tests/test_common.py
+++ b/tests/test_common.py
@@ -396,7 +396,10 @@ def test_extract_test_feasible_setups():
     ]
     assert standalone_setup_type == "oss-standalone"
     assert standalone_shard_count == 1
-    n, t, c = get_setup_type_and_primaries_count(test_setups["oss-standalone"])
+    n, t, c, placement, node_count = get_setup_type_and_primaries_count(
+        test_setups["oss-standalone"]
+    )
+    assert node_count == 1
     assert standalone_setup_type == t
     assert standalone_shard_count == c
 
@@ -406,8 +409,11 @@ def test_extract_test_feasible_setups():
     osscluster_shard_count = test_setups["oss-cluster-3-primaries"]["redis_topology"][
         "primaries"
     ]
-    n, t, c = get_setup_type_and_primaries_count(test_setups["oss-cluster-3-primaries"])
+    n, t, c, placement, node_count = get_setup_type_and_primaries_count(
+        test_setups["oss-cluster-3-primaries"]
+    )
     assert osscluster_setup_type == t
+    assert node_count == 1
     assert osscluster_shard_count == c
 
     # wrong read
diff --git a/tests/test_oss_cluster.py b/tests/test_oss_cluster.py
new file mode 100644
index 0000000..19123ed
--- /dev/null
+++ b/tests/test_oss_cluster.py
@@ -0,0 +1,73 @@
+from redisbench_admin.environments.oss_cluster import split_primaries_per_db_nodes
+
+
+def test_split_primaries_per_db_nodes():
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        4,
+        1,
+    )
+    assert primaries_per_node == [4]
+    assert server_private_ips == ["10.3.0.169"]
+    assert server_public_ips == ["18.117.74.99"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        4,
+        2,
+    )
+    assert primaries_per_node == [2, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        5,
+        2,
+    )
+    assert primaries_per_node == [3, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        6,
+        3,
+    )
+    assert primaries_per_node == [2, 2, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55", "10.3.0.92"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160", "3.145.92.27"]
+
+    (
+        primaries_per_node,
+        server_private_ips,
+        server_public_ips,
+    ) = split_primaries_per_db_nodes(
+        ["10.3.0.169", "10.3.0.55", "10.3.0.92"],
+        ["18.117.74.99", "3.144.3.160", "3.145.92.27"],
+        6,
+        None,
+    )
+    assert primaries_per_node == [2, 2, 2]
+    assert server_private_ips == ["10.3.0.169", "10.3.0.55", "10.3.0.92"]
+    assert server_public_ips == ["18.117.74.99", "3.144.3.160", "3.145.92.27"]
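
Appendix (not part of the applied diffs): the distribution rule this series converges on is that shard_count primaries are split across the first required_node_count DB hosts, with the first host absorbing any remainder, and one "CLUSTER MEET <private-ip> <port>" is issued per primary; shard ids are global across hosts, so ports keep increasing from node to node. Below is a minimal self-contained sketch of that rule, using the IPs from tests/test_oss_cluster.py; the helper names and the 30001 start port are illustrative assumptions, not redisbench-admin APIs.

    # Standalone sketch of the split/MEET semantics asserted in
    # tests/test_oss_cluster.py. Helper names and the start_port value are
    # hypothetical; the real implementation lives in
    # redisbench_admin/environments/oss_cluster.py.
    def split_primaries(node_count, shard_count):
        # Even share per node; the first node takes the remainder.
        base = shard_count // node_count
        remainder = shard_count % node_count
        return [base + remainder] + [base] * (node_count - 1)

    def meet_commands(private_ips, shard_count, start_port=30001):
        # Shard ids are global across nodes, so shard N always listens on
        # start_port + N - 1 regardless of which host it lands on.
        cmds = []
        shard_id = 1
        per_node = split_primaries(len(private_ips), shard_count)
        for node_ip, primaries in zip(private_ips, per_node):
            for _ in range(primaries):
                cmds.append(
                    "CLUSTER MEET {} {}".format(node_ip, start_port + shard_id - 1)
                )
                shard_id += 1
        return cmds

    assert split_primaries(2, 5) == [3, 2]      # 5 shards over 2 nodes
    assert split_primaries(3, 6) == [2, 2, 2]   # 6 shards over 3 nodes
    print(meet_commands(["10.3.0.169", "10.3.0.55"], 4))
    # ['CLUSTER MEET 10.3.0.169 30001', 'CLUSTER MEET 10.3.0.169 30002',
    #  'CLUSTER MEET 10.3.0.55 30003', 'CLUSTER MEET 10.3.0.55 30004']

On the configuration side, the fields that select this behavior are read by get_setup_type_and_primaries_count(): redis_topology.placement ("dense" keeps all primaries on the first host, "sparse" spreads them across the required hosts) and resources.nodes (the required host count). A 4-primary cluster spread over 2 VMs would therefore declare primaries: 4, placement: sparse, nodes: 2 in its setup settings.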