Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gather log & analyze log use threading #509

Merged
merged 19 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/ssh_client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def get_ip(self):
return self.client.get_ip()

def progress_bar(self, transferred, to_be_transferred, suffix=''):
if self.inner_config_manager.get("obdiag", {}).get("logger", {}).get("silent") or False:
if self.stdio.silent:
return
bar_len = 20
filled_len = int(round(bar_len * transferred / float(to_be_transferred)))
Expand Down
2 changes: 1 addition & 1 deletion common/ssh_client/remote_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def download(self, remote_path, local_path):
self._sftp_client.close()

def progress_bar(self, transferred, to_be_transferred, suffix=''):
if self.inner_config_manager.get("obdiag", {}).get("logger", {}).get("silent") or False:
if self.stdio.silent:
return
bar_len = 20
filled_len = int(round(bar_len * transferred / float(to_be_transferred)))
Expand Down
3 changes: 3 additions & 0 deletions conf/inner_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ obdiag:
silent: false
ssh_client:
remote_client_sudo: 0
analyze:
thread_nums: 3
check:
ignore_version: false
work_path: "~/.obdiag/check"
Expand All @@ -26,5 +28,6 @@ check:
gather:
scenes_base_path: "~/.obdiag/gather/tasks"
redact_processing_num: 3
thread_nums: 3
rca:
result_path: "./obdiag_rca/"
3 changes: 2 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
'remote_client_sudo': False,
},
},
'analyze': {"thread_nums": 3},
'check': {
'ignore_version': False,
'work_path': '~/.obdiag/check',
Expand All @@ -90,7 +91,7 @@
'package_file': '~/.obdiag/check/check_package.yaml',
'tasks_base_path': '~/.obdiag/check/tasks/',
},
'gather': {'scenes_base_path': '~/.obdiag/gather/tasks', 'redact_processing_num': 3},
'gather': {'scenes_base_path': '~/.obdiag/gather/tasks', 'redact_processing_num': 3, "thread_nums": 3},
'rca': {
'result_path': './obdiag_rca/',
},
Expand Down
71 changes: 48 additions & 23 deletions handler/analyzer/analyze_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
import datetime
import os
import re
import threading

import tabulate

from handler.base_shell_handler import BaseShellHandler
from common.obdiag_exception import OBDIAGFormatException
from common.constant import const
from common.command import LocalClient, SshClient
from common.command import SshClient
from common.ob_log_level import OBLogLevel
from handler.meta.ob_error import OB_RET_DICT
from common.command import download_file, get_logfile_name_list, mkdir, delete_file
from common.tool import Util
from common.tool import Util, NetUtils
from common.tool import DirectoryUtil
from common.tool import FileUtil
from common.tool import TimeUtils
Expand Down Expand Up @@ -136,39 +138,62 @@ def handle(self):
local_store_parent_dir = os.path.join(self.gather_pack_dir, "obdiag_analyze_pack_{0}".format(TimeUtils.timestamp_to_filename_time(TimeUtils.get_current_us_timestamp())))
self.stdio.verbose("Use {0} as pack dir.".format(local_store_parent_dir))
analyze_tuples = []
# analyze_thread default thread nums is 3
analyze_thread_nums = int(self.context.inner_config.get("analyze", {}).get("thread_nums") or 3)
pool_sema = threading.BoundedSemaphore(value=analyze_thread_nums)

def handle_from_node(node):
resp, node_results = self.__handle_from_node(node, local_store_parent_dir)
analyze_tuples.append((node.get("ip"), False, resp["error"], node_results))
with pool_sema:
resp, node_results = self.__handle_from_node(node, local_store_parent_dir)
analyze_tuples.append((node.get("ip"), False, resp["error"], node_results))

if self.is_ssh:
for node in self.nodes:
handle_from_node(node)
else:
local_ip = '127.0.0.1'
node = self.nodes[0]
node["ip"] = local_ip
handle_from_node(node)
nodes_threads = []
self.stdio.print("analyze nodes's log start. Please wait a moment...")
old_silent = self.stdio.silent
self.stdio.set_silent(True)
for node in self.nodes:
if not self.is_ssh:
local_ip = NetUtils.get_inner_ip()
node = self.nodes[0]
node["ip"] = local_ip
node_threads = threading.Thread(target=handle_from_node, args=(node,))
node_threads.start()
nodes_threads.append(node_threads)
for node_thread in nodes_threads:
node_thread.join()
self.stdio.set_silent(old_silent)

self.stdio.start_loading('analyze result start')
title, field_names, summary_list, summary_details_list = self.__get_overall_summary(analyze_tuples, self.directly_analyze_files)
analyze_info_nodes = []
for summary in summary_list:
analyze_info_node = {}
field_names_nu = 0
for m in field_names:
analyze_info_node[m] = summary[field_names_nu]
field_names_nu += 1
if field_names_nu == len(summary):
break
analyze_info_nodes.append(analyze_info_node)
table = tabulate.tabulate(summary_list, headers=field_names, tablefmt="grid", showindex=False)
self.stdio.stop_loading('analyze result sucess')
self.stdio.stop_loading('analyze result success')
self.stdio.print(title)
self.stdio.print(table)
FileUtil.write_append(os.path.join(local_store_parent_dir, "result_details.txt"), title + str(table) + "\n\nDetails:\n\n")

with open(os.path.join(local_store_parent_dir, "result_details.txt"), 'a', encoding='utf-8') as fileobj:
fileobj.write(u'{}'.format(title + str(table) + "\n\nDetails:\n\n"))
# build summary details
summary_details_list_data = []
for m in range(len(summary_details_list)):
summary_details_list_data_once = {}
for n in range(len(field_names)):
extend = "\n\n" if n == len(field_names) - 1 else "\n"
FileUtil.write_append(os.path.join(local_store_parent_dir, "result_details.txt"), field_names[n] + ": " + str(summary_details_list[m][n]) + extend)
with open(os.path.join(local_store_parent_dir, "result_details.txt"), 'a', encoding='utf-8') as fileobj:
fileobj.write(u'{}'.format(field_names[n] + ": " + str(summary_details_list[m][n]) + extend))
summary_details_list_data_once[field_names[n]] = str(summary_details_list[m][n])
summary_details_list_data.append(summary_details_list_data_once)
last_info = "For more details, please run cmd \033[32m' cat {0} '\033[0m\n".format(os.path.join(local_store_parent_dir, "result_details.txt"))
self.stdio.print(last_info)
# get info from local_store_parent_dir+/result_details.txt
analyze_info = ""
with open(os.path.join(local_store_parent_dir, "result_details.txt"), "r", encoding="utf-8") as f:
analyze_info = f.read()
return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": analyze_info})
return ObdiagResult(ObdiagResult.SUCCESS_CODE, data={"result": analyze_info_nodes, "summary_details_list": summary_details_list_data, "store_dir": local_store_parent_dir})

def __handle_from_node(self, node, local_store_parent_dir):
resp = {"skip": False, "error": ""}
Expand Down Expand Up @@ -442,8 +467,8 @@ def __get_overall_summary(node_summary_tuples, is_files=False):
]
)
if is_empty:
t.append([node, "\033[32mPASS\033[0m", None, None, None, None])
t_details.append([node, "\033[32mPASS\033[0m", None, None, None, None, None, None, None, None, None])
t.append([node, "PASS", None, None, None, None])
t_details.append([node, "PASS", None, None, None, None, None, None, None, None, None])
title = "\nAnalyze OceanBase Offline Log Summary:\n" if is_files else "\nAnalyze OceanBase Online Log Summary:\n"
t.sort(key=lambda x: (x[0], x[1], x[2], x[3]), reverse=False)
t_details.sort(key=lambda x: (x[0], x[1], x[2], x[3]), reverse=False)
Expand Down
43 changes: 27 additions & 16 deletions handler/gather/gather_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import time
import tabulate
import threading
from handler.base_shell_handler import BaseShellHandler
from common.obdiag_exception import OBDIAGFormatException
from common.constant import const
Expand Down Expand Up @@ -161,24 +162,34 @@ def handle(self):
self.stdio.verbose('Use {0} as pack dir.'.format(pack_dir_this_command))
gather_tuples = []

def handle_from_node(node):
st = time.time()
resp = self.__handle_from_node(pack_dir_this_command, node)
file_size = ""
if len(resp["error"]) == 0:
file_size = os.path.getsize(resp["gather_pack_path"])
gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))
# gather_thread default thread nums is 3
gather_thread_nums = int(self.context.inner_config.get("gather", {}).get("thread_nums") or 3)
pool_sema = threading.BoundedSemaphore(value=gather_thread_nums)

if self.is_ssh:
for node in self.nodes:
handle_from_node(node)
else:
local_ip = NetUtils.get_inner_ip()
node = self.nodes[0]
node["ip"] = local_ip
for node in self.nodes:
handle_from_node(node)
def handle_from_node(node):
with pool_sema:
st = time.time()
resp = self.__handle_from_node(pack_dir_this_command, node)
file_size = ""
if len(resp["error"]) == 0:
file_size = os.path.getsize(resp["gather_pack_path"])
gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))

nodes_threads = []
self.stdio.print("gather nodes's log start. Please wait a moment...")
old_silent = self.stdio.silent
self.stdio.set_silent(True)
for node in self.nodes:
if not self.is_ssh:
local_ip = NetUtils.get_inner_ip()
node = self.nodes[0]
node["ip"] = local_ip
node_threads = threading.Thread(target=handle_from_node, args=(node,))
node_threads.start()
nodes_threads.append(node_threads)
for node_thread in nodes_threads:
node_thread.join()
self.stdio.set_silent(old_silent)
summary_tuples = self.__get_overall_summary(gather_tuples, self.zip_encrypt)
self.stdio.print(summary_tuples)
self.pack_dir_this_command = pack_dir_this_command
Expand Down
44 changes: 27 additions & 17 deletions handler/gather/gather_obproxy_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
"""
import datetime
import os
import threading
import time

import tabulate

from handler.base_shell_handler import BaseShellHandler
from common.obdiag_exception import OBDIAGFormatException
from common.command import LocalClient, SshClient
from common.command import SshClient
from common.constant import const
from common.command import get_file_size, download_file, is_empty_dir, get_logfile_name_list, mkdir, delete_empty_file, rm_rf_file, zip_encrypt_dir, zip_dir
from common.tool import Util
Expand Down Expand Up @@ -153,24 +154,33 @@ def handle(self):
self.stdio.verbose("Use {0} as pack dir.".format(pack_dir_this_command))
gather_tuples = []

def handle_from_node(node):
st = time.time()
resp = self.__handle_from_node(node, pack_dir_this_command)
file_size = ""
if len(resp["error"]) == 0:
file_size = os.path.getsize(resp["gather_pack_path"])
gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))
# gather_thread default thread nums is 3
gather_thread_nums = int(self.context.inner_config.get("obdiag", {}).get("gather", {}).get("thread_nums") or 3)
pool_sema = threading.BoundedSemaphore(value=gather_thread_nums)

if self.is_ssh:
for node in self.nodes:
handle_from_node(node)
else:
local_ip = NetUtils.get_inner_ip()
node = self.nodes[0]
node["ip"] = local_ip
for node in self.nodes:
handle_from_node(node)
def handle_from_node(node):
with pool_sema:
st = time.time()
resp = self.__handle_from_node(node, pack_dir_this_command)
file_size = ""
if len(resp["error"]) == 0:
file_size = os.path.getsize(resp["gather_pack_path"])
gather_tuples.append((node.get("ip"), False, resp["error"], file_size, resp["zip_password"], int(time.time() - st), resp["gather_pack_path"]))

nodes_threads = []
self.stdio.print("gather nodes's log start. Please wait a moment...")
self.stdio.set_silent(True)
for node in self.nodes:
if not self.is_ssh:
local_ip = NetUtils.get_inner_ip()
node = self.nodes[0]
node["ip"] = local_ip
node_threads = threading.Thread(target=handle_from_node, args=(node,))
node_threads.start()
nodes_threads.append(node_threads)
for node_thread in nodes_threads:
node_thread.join()
self.stdio.set_silent(False)
summary_tuples = self.__get_overall_summary(gather_tuples, self.zip_encrypt)
self.stdio.print(summary_tuples)
self.pack_dir_this_command = pack_dir_this_command
Expand Down
Loading