diff --git a/common/ssh_client/base.py b/common/ssh_client/base.py
index 3d235863..5b6f9a5b 100644
--- a/common/ssh_client/base.py
+++ b/common/ssh_client/base.py
@@ -54,7 +54,7 @@ def get_ip(self):
         return self.client.get_ip()
 
     def progress_bar(self, transferred, to_be_transferred, suffix=''):
-        if self.inner_config_manager.get("obdiag", {}).get("logger", {}).get("silent") or False:
+        if self.stdio.silent:
             return
         bar_len = 20
         filled_len = int(round(bar_len * transferred / float(to_be_transferred)))
diff --git a/common/ssh_client/remote_client.py b/common/ssh_client/remote_client.py
index 581544e3..537eebdc 100644
--- a/common/ssh_client/remote_client.py
+++ b/common/ssh_client/remote_client.py
@@ -119,7 +119,7 @@ def download(self, remote_path, local_path):
         self._sftp_client.close()
 
     def progress_bar(self, transferred, to_be_transferred, suffix=''):
-        if self.inner_config_manager.get("obdiag", {}).get("logger", {}).get("silent") or False:
+        if self.stdio.silent:
             return
         bar_len = 20
         filled_len = int(round(bar_len * transferred / float(to_be_transferred)))
diff --git a/conf/inner_config.yml b/conf/inner_config.yml
index 7d84c727..7ee54841 100644
--- a/conf/inner_config.yml
+++ b/conf/inner_config.yml
@@ -16,6 +16,8 @@ obdiag:
     silent: false
   ssh_client:
     remote_client_sudo: 0
+analyze:
+  thread_nums: 3
 check:
   ignore_version: false
   work_path: "~/.obdiag/check"
@@ -26,5 +28,6 @@ check:
 gather:
   scenes_base_path: "~/.obdiag/gather/tasks"
   redact_processing_num: 3
+  thread_nums: 3
 rca:
   result_path: "./obdiag_rca/"
diff --git a/config.py b/config.py
index b1809f28..4d08ed10 100644
--- a/config.py
+++ b/config.py
@@ -80,6 +80,7 @@
             'remote_client_sudo': False,
         },
     },
+    'analyze': {"thread_nums": 3},
     'check': {
         'ignore_version': False,
         'work_path': '~/.obdiag/check',
@@ -90,7 +91,7 @@
         'package_file': '~/.obdiag/check/check_package.yaml',
         'tasks_base_path': '~/.obdiag/check/tasks/',
     },
-    'gather': {'scenes_base_path': '~/.obdiag/gather/tasks', 'redact_processing_num': 3},
+    'gather': {'scenes_base_path': '~/.obdiag/gather/tasks', 'redact_processing_num': 3, "thread_nums": 3},
     'rca': {
         'result_path': './obdiag_rca/',
     },
diff --git a/handler/analyzer/analyze_log.py b/handler/analyzer/analyze_log.py
index f85eabc3..b667e381 100644
--- a/handler/analyzer/analyze_log.py
+++ b/handler/analyzer/analyze_log.py
@@ -18,16 +18,18 @@
 import datetime
 import os
 import re
+import threading
+
 import tabulate
 
 from handler.base_shell_handler import BaseShellHandler
 from common.obdiag_exception import OBDIAGFormatException
 from common.constant import const
-from common.command import LocalClient, SshClient
+from common.command import SshClient
 from common.ob_log_level import OBLogLevel
 from handler.meta.ob_error import OB_RET_DICT
 from common.command import download_file, get_logfile_name_list, mkdir, delete_file
-from common.tool import Util
+from common.tool import Util, NetUtils
 from common.tool import DirectoryUtil
 from common.tool import FileUtil
 from common.tool import TimeUtils
@@ -136,39 +138,59 @@ def handle(self):
         local_store_parent_dir = os.path.join(self.gather_pack_dir, "obdiag_analyze_pack_{0}".format(TimeUtils.timestamp_to_filename_time(TimeUtils.get_current_us_timestamp())))
         self.stdio.verbose("Use {0} as pack dir.".format(local_store_parent_dir))
         analyze_tuples = []
+        # analyze_thread default thread nums is 3
+        analyze_thread_nums = int(self.context.inner_config.get("analyze", {}).get("thread_nums") or 3)
+        pool_sema = threading.BoundedSemaphore(value=analyze_thread_nums)
 
         def handle_from_node(node):
-            resp, node_results = self.__handle_from_node(node, local_store_parent_dir)
-            analyze_tuples.append((node.get("ip"), False, resp["error"], node_results))
+            """Analyze one node; the semaphore caps how many nodes run concurrently."""
+            with pool_sema:
+                resp, node_results = self.__handle_from_node(node, local_store_parent_dir)
+                analyze_tuples.append((node.get("ip"), False, resp["error"], node_results))
 
-        if self.is_ssh:
-            for node in self.nodes:
-                handle_from_node(node)
-        else:
-            local_ip = '127.0.0.1'
-            node = self.nodes[0]
-            node["ip"] = local_ip
-            handle_from_node(node)
+        nodes = self.nodes
+        if not self.is_ssh and self.nodes:
+            # local mode: analyze only the first node, once, with its ip resolved before any thread starts
+            local_node = self.nodes[0]
+            local_node["ip"] = NetUtils.get_inner_ip()
+            nodes = [local_node]
+        nodes_threads = []
+        self.stdio.print("analyze nodes's log start. Please wait a moment...")
+        old_silent = self.stdio.silent
+        self.stdio.set_silent(True)
+        try:
+            for node in nodes:
+                node_threads = threading.Thread(target=handle_from_node, args=(node,))
+                node_threads.start()
+                nodes_threads.append(node_threads)
+            for node_thread in nodes_threads:
+                node_thread.join()
+        finally:
+            # always restore the previous silent setting, even if starting or joining a thread raises
+            self.stdio.set_silent(old_silent)
 
         self.stdio.start_loading('analyze result start')
         title, field_names, summary_list, summary_details_list = self.__get_overall_summary(analyze_tuples, self.directly_analyze_files)
+        analyze_info_nodes = [dict(zip(field_names, summary)) for summary in summary_list]
         table = tabulate.tabulate(summary_list, headers=field_names, tablefmt="grid", showindex=False)
-        self.stdio.stop_loading('analyze result sucess')
+        self.stdio.stop_loading('analyze result success')
         self.stdio.print(title)
         self.stdio.print(table)
-        FileUtil.write_append(os.path.join(local_store_parent_dir, "result_details.txt"), title + str(table) + "\n\nDetails:\n\n")
-
+        with open(os.path.join(local_store_parent_dir, "result_details.txt"), 'a', encoding='utf-8') as fileobj:
+            fileobj.write(u'{}'.format(title + str(table) + "\n\nDetails:\n\n"))
+        # build summary details
+        summary_details_list_data = []
         for m in range(len(summary_details_list)):
+            summary_details_list_data_once = {}
             for n in range(len(field_names)):
                 extend = "\n\n" if n == len(field_names) - 1 else "\n"
-                FileUtil.write_append(os.path.join(local_store_parent_dir, "result_details.txt"), field_names[n] + ": " + str(summary_details_list[m][n]) + extend)
+                with open(os.path.join(local_store_parent_dir, "result_details.txt"), 'a', encoding='utf-8') as fileobj:
+                    fileobj.write(u'{}'.format(field_names[n] + ": " + str(summary_details_list[m][n]) + extend))
+                summary_details_list_data_once[field_names[n]] = str(summary_details_list[m][n])
+            summary_details_list_data.append(summary_details_list_data_once)
         last_info = "For more details, please run cmd \033[32m' cat {0} '\033[0m\n".format(os.path.join(local_store_parent_dir, "result_details.txt"))
         self.stdio.print(last_info)
-        # get info from local_store_parent_dir+/result_details.txt
-        analyze_info = ""
-        with open(os.path.join(local_store_parent_dir, "result_details.txt"), "r", encoding="utf-8") as f:
-            analyze_info = f.read()
-        return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": analyze_info})
+        return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": analyze_info_nodes, "summary_details_list": summary_details_list_data, "store_dir": local_store_parent_dir})
 
     def __handle_from_node(self, node, local_store_parent_dir):
         resp = {"skip": False, "error": ""}
@@ -442,8 +464,8 @@ def __get_overall_summary(node_summary_tuples, is_files=False):
                                 ]
                             )
             if is_empty:
-                t.append([node, "\033[32mPASS\033[0m", None, None, None, None])
-                t_details.append([node, "\033[32mPASS\033[0m", None, None, None, None, None, None, None, None, None])
+                t.append([node, "PASS", None, None, None, None])
+                t_details.append([node, "PASS", None, None, None, None, None, None, None, None, None])
         title = "\nAnalyze OceanBase Offline Log Summary:\n" if is_files else "\nAnalyze OceanBase Online Log Summary:\n"
         t.sort(key=lambda x: (x[0], x[1], x[2], x[3]), reverse=False)
         t_details.sort(key=lambda x: (x[0], x[1], x[2], x[3]), reverse=False)
diff --git a/handler/gather/gather_log.py b/handler/gather/gather_log.py
index e48c752c..87b0c33c 100644
--- a/handler/gather/gather_log.py
+++ b/handler/gather/gather_log.py
@@ -19,6 +19,7 @@
 import os
 import time
 import tabulate
+import threading
 from handler.base_shell_handler import BaseShellHandler
 from common.obdiag_exception import OBDIAGFormatException
 from common.constant import const
@@ -161,24 +162,37 @@ def handle(self):
         self.stdio.verbose('Use {0} as pack dir.'.format(pack_dir_this_command))
         gather_tuples = []
 
-        def handle_from_node(node):
-            st = time.time()
-            resp = self.__handle_from_node(pack_dir_this_command, node)
-            file_size = ""
-            if len(resp["error"]) == 0:
-                file_size = os.path.getsize(resp["gather_pack_path"])
-            gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))
+        # gather_thread default thread nums is 3
+        gather_thread_nums = int(self.context.inner_config.get("gather", {}).get("thread_nums") or 3)
+        pool_sema = threading.BoundedSemaphore(value=gather_thread_nums)
 
-        if self.is_ssh:
-            for node in self.nodes:
-                handle_from_node(node)
-        else:
-            local_ip = NetUtils.get_inner_ip()
-            node = self.nodes[0]
-            node["ip"] = local_ip
-            for node in self.nodes:
-                handle_from_node(node)
+        def handle_from_node(node):
+            """Gather one node; the semaphore caps how many nodes run concurrently."""
+            with pool_sema:
+                st = time.time()
+                resp = self.__handle_from_node(pack_dir_this_command, node)
+                file_size = ""
+                if len(resp["error"]) == 0:
+                    file_size = os.path.getsize(resp["gather_pack_path"])
+                gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))
 
+        if not self.is_ssh and self.nodes:
+            # local mode: resolve the first node's ip once, before any worker thread starts
+            self.nodes[0]["ip"] = NetUtils.get_inner_ip()
+        nodes_threads = []
+        self.stdio.print("gather nodes's log start. Please wait a moment...")
+        old_silent = self.stdio.silent
+        self.stdio.set_silent(True)
+        try:
+            for node in self.nodes:
+                node_threads = threading.Thread(target=handle_from_node, args=(node,))
+                node_threads.start()
+                nodes_threads.append(node_threads)
+            for node_thread in nodes_threads:
+                node_thread.join()
+        finally:
+            # always restore the previous silent setting, even if starting or joining a thread raises
+            self.stdio.set_silent(old_silent)
         summary_tuples = self.__get_overall_summary(gather_tuples, self.zip_encrypt)
         self.stdio.print(summary_tuples)
         self.pack_dir_this_command = pack_dir_this_command
diff --git a/handler/gather/gather_obproxy_log.py b/handler/gather/gather_obproxy_log.py
index e512518f..24b5b082 100644
--- a/handler/gather/gather_obproxy_log.py
+++ b/handler/gather/gather_obproxy_log.py
@@ -17,13 +18,14 @@
 """
 import datetime
 import os
+import threading
 import time
 
 import tabulate
 
 from handler.base_shell_handler import BaseShellHandler
 from common.obdiag_exception import OBDIAGFormatException
-from common.command import LocalClient, SshClient
+from common.command import SshClient
 from common.constant import const
 from common.command import get_file_size, download_file, is_empty_dir, get_logfile_name_list, mkdir, delete_empty_file, rm_rf_file, zip_encrypt_dir, zip_dir
 from common.tool import Util
@@ -153,24 +154,37 @@ def handle(self):
         self.stdio.verbose("Use {0} as pack dir.".format(pack_dir_this_command))
         gather_tuples = []
 
-        def handle_from_node(node):
-            st = time.time()
-            resp = self.__handle_from_node(node, pack_dir_this_command)
-            file_size = ""
-            if len(resp["error"]) == 0:
-                file_size = os.path.getsize(resp["gather_pack_path"])
-            gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))
+        # gather_thread default thread nums is 3
+        gather_thread_nums = int(self.context.inner_config.get("gather", {}).get("thread_nums") or 3)
+        pool_sema = threading.BoundedSemaphore(value=gather_thread_nums)
 
-        if self.is_ssh:
-            for node in self.nodes:
-                handle_from_node(node)
-        else:
-            local_ip = NetUtils.get_inner_ip()
-            node = self.nodes[0]
-            node["ip"] = local_ip
-            for node in self.nodes:
-                handle_from_node(node)
+        def handle_from_node(node):
+            """Gather one node; the semaphore caps how many nodes run concurrently."""
+            with pool_sema:
+                st = time.time()
+                resp = self.__handle_from_node(node, pack_dir_this_command)
+                file_size = ""
+                if len(resp["error"]) == 0:
+                    file_size = os.path.getsize(resp["gather_pack_path"])
+                gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))
 
+        if not self.is_ssh and self.nodes:
+            # local mode: resolve the first node's ip once, before any worker thread starts
+            self.nodes[0]["ip"] = NetUtils.get_inner_ip()
+        nodes_threads = []
+        self.stdio.print("gather nodes's log start. Please wait a moment...")
+        old_silent = self.stdio.silent
+        self.stdio.set_silent(True)
+        try:
+            for node in self.nodes:
+                node_threads = threading.Thread(target=handle_from_node, args=(node,))
+                node_threads.start()
+                nodes_threads.append(node_threads)
+            for node_thread in nodes_threads:
+                node_thread.join()
+        finally:
+            # always restore the previous silent setting, even if starting or joining a thread raises
+            self.stdio.set_silent(old_silent)
         summary_tuples = self.__get_overall_summary(gather_tuples, self.zip_encrypt)
         self.stdio.print(summary_tuples)
         self.pack_dir_this_command = pack_dir_this_command