diff --git a/config.py b/config.py index 2c5acd6..199c9db 100644 --- a/config.py +++ b/config.py @@ -3,7 +3,48 @@ import powerSaver +# Debugging on or off +DEBUG = False +# DEBUG = True +# Shall we call commands using sudo? +USE_SUDO = False + +# What init system are we running +# Valid options: OpenRC +# Planned options: systemd +INIT_SYSTEM = "OpenRC" + +# How often to query process and service status, and update menu +DEFAULT_REFRESH_RATE = 5 + +# How often to query power data while discharging or charging +DEFAULT_POWER_SAMPLING_RATE = 5 + +# The maximum value the previous ones can be set during runtime +REFRESH_MAXIMUM = 15 + +# The path pointing to your battery information +SYS_CLASS_BATTERY_PATH = "/sys/class/power_supply/BAT0" + +# Thresholds for battery percentage color +# Red if below [0] +# Yellow if below [1] +# Green if below [2] +# Cyan if below [2] but below 100.0 +BATTERY_COLOR_LEVELS = [15.0, 60.0, 95.0] + +# Thresholds for battery charge / discharge colors +# Cyan up to [0] +# Green up to [1] +# Yellow up to [2] +# Red up to [3] +# Magenta above [3] +POWER_COLOR_LEVELS = [5.0, 8.0, 14.0, 20.0] + + +# A list of processes to monitor and send STOP and CONT signals to +# # noinspection SpellCheckingInspection processes: List[Dict[str, Union[str, List[str], powerSaver.ProcessStatus]]] = [ {'title': "Chrome", @@ -31,6 +72,8 @@ 'name': ["zenmonitor"]}, ] +# A list of services to monitor and switch on/off +# # noinspection SpellCheckingInspection services: List[Dict[str, Union[str, List[str], powerSaver.ServiceStatus]]] = [ {'title': "Bluetooth", @@ -52,6 +95,8 @@ 'needs-modules': ['cdc_ether', 'r8152']}, ] +# A list of modules to monitor and load/unload +# # noinspection SpellCheckingInspection modules: List[Dict[str, Union[str, List[str], powerSaver.ModuleStatus]]] = [ {'title': "Bluetooth", diff --git a/powerSaver.py b/powerSaver.py index dc9f066..6db52b1 100755 --- a/powerSaver.py +++ b/powerSaver.py @@ -14,56 +14,59 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import concurrent.futures import curses +import math import select import sys from datetime import datetime, timedelta -from typing import Tuple +from typing import Tuple, List, Dict, Union, Optional import powerSaver -from config import processes, services, modules -version = "1.1.0" + +application_name = "powerSaver" +version = "1.2.0" def process_color(status: powerSaver.ProcessStatus) -> Tuple[int, int]: if status == powerSaver.ProcessStatus.RUNNING: - return 3, curses.A_NORMAL + return 3, curses.A_NORMAL # Green if status == powerSaver.ProcessStatus.STOPPED: - return 5, curses.A_NORMAL + return 5, curses.A_NORMAL # Red if status == powerSaver.ProcessStatus.NO_PROC: - return 2, curses.A_DIM + return 2, curses.A_DIM # Gray if status == powerSaver.ProcessStatus.MANY: - return 4, curses.A_NORMAL - return 17, curses.A_NORMAL + return 4, curses.A_NORMAL # Yellow + return 17, curses.A_NORMAL # Black on Red def service_color(status: powerSaver.ServiceStatus) -> Tuple[int, int]: if status == powerSaver.ServiceStatus.RUNNING: - return 3, curses.A_NORMAL + return 3, curses.A_NORMAL # Green if status == powerSaver.ServiceStatus.STOPPED: - return 5, curses.A_NORMAL + return 5, curses.A_NORMAL # Red if status == powerSaver.ServiceStatus.TOGGLED: - return 4, curses.A_NORMAL + return 4, curses.A_NORMAL # Yellow if status == powerSaver.ServiceStatus.NOT_FOUND: - return 2, curses.A_DIM + return 2, curses.A_DIM # Gray if status == powerSaver.ServiceStatus.CRASHED: - return 8, curses.A_NORMAL + return 8, curses.A_NORMAL # Magenta if status == powerSaver.ServiceStatus.NO_MODULES: - return 7, curses.A_NORMAL - return 17, curses.A_NORMAL + return 7, curses.A_NORMAL # Blue + return 17, curses.A_NORMAL # Black on Red def module_color(status: powerSaver.ModuleStatus) -> Tuple[int, int]: if status == powerSaver.ModuleStatus.LOADED: - return 3, curses.A_NORMAL + return 3, curses.A_NORMAL # Green if status == powerSaver.ModuleStatus.USED: - return 6, curses.A_NORMAL + return 6, curses.A_NORMAL # Cyan if status == powerSaver.ModuleStatus.NOT_LOADED: - return 5, curses.A_NORMAL + return 5, curses.A_NORMAL # Red if status == powerSaver.ModuleStatus.PARTIAL: - return 4, curses.A_NORMAL - return 17, curses.A_NORMAL + return 4, curses.A_NORMAL # Yellow + return 17, curses.A_NORMAL # Black on Red def service_status_to_module_status(status: powerSaver.ServiceStatus) -> powerSaver.ModuleStatus: @@ -95,232 +98,322 @@ def color_offset(check: bool) -> int: return 0 +def get_processes_length(processes: List[Dict[str, Union[str, List[str], powerSaver.ProcessStatus]]]) -> int: + length = 0 + for p in processes: + if 'status' in p and p['status'] == powerSaver.ProcessStatus.NO_PROC: + continue + length += 1 + return length + + +def battery_percent_color(battery_percent: float) -> int: + from config import BATTERY_COLOR_LEVELS + low, mid, high = BATTERY_COLOR_LEVELS + if battery_percent < low: + return curses.color_pair(5) # Red + elif battery_percent < mid: + return curses.color_pair(4) # Yellow + elif battery_percent < high: + return curses.color_pair(3) # Green + elif battery_percent < 100.0: + return curses.color_pair(6) # Cyan + else: + return curses.color_pair(1) # White + + +def power_use_color(battery_watts: float) -> int: + from config import POWER_COLOR_LEVELS + very_low, low, mid, high = POWER_COLOR_LEVELS + if battery_watts > high: + return curses.color_pair(8) # Magenta + elif battery_watts > mid: + return curses.color_pair(5) # Red + elif battery_watts > low: + return curses.color_pair(4) # Yellow + elif battery_watts > very_low: + return curses.color_pair(3) # Green + else: + return curses.color_pair(6) # Cyan + + +def calculate_menu_thread(processes: List[Dict[str, Union[str, List[str], powerSaver.ProcessStatus]]], + services: List[Dict[str, Union[str, List[str], powerSaver.ServiceStatus]]], + modules: List[Dict[str, Union[str, List[str], powerSaver.ModuleStatus]]], + process_manager: powerSaver.ProcessManager, + service_manager: powerSaver.ServiceManager, + module_manager: powerSaver.ModuleManager, + refresh: int, + title: str, + last_update_display: datetime, + ) -> Tuple[List[Dict[str, Union[str, List[str], powerSaver.ProcessStatus]]], + List[Dict[str, Union[str, List[str], powerSaver.ProcessStatus]]], + List[Dict[str, Union[str, List[str], powerSaver.ServiceStatus]]], + List[Dict[str, Union[str, List[str], powerSaver.ModuleStatus]]], + int]: + now = datetime.now() + max_len = len(title) + for y, p in enumerate(processes): + max_len = max(len(p["title"]), max_len) + p_status = set() + for proc in p["name"]: + if "cmdline" in p: + proc_status = process_manager.get_process_status(proc, p["cmdline"]) + else: + proc_status = process_manager.get_process_status(proc) + for p_s in proc_status: + p_status.add(p_s) + if powerSaver.ProcessStatus.ERROR in p_status: + p["status"] = powerSaver.ProcessStatus.ERROR + elif len(p_status) == 0: + p["status"] = powerSaver.ProcessStatus.NO_PROC + elif len(p_status) > 1: + p["status"] = powerSaver.ProcessStatus.MANY + else: + p["status"] = p_status.pop() + + active_processes = [] + for p in processes: + if p["status"] != powerSaver.ProcessStatus.NO_PROC: + active_processes.append(p) + + for y, s in enumerate(services): + max_len = max(len(s["title"]), max_len) + if "status" not in s or last_update_display > now - timedelta(seconds=refresh): + s["status"] = service_manager.get_status(s["name"]) + if "needs-modules" in s: + for mod in s["needs-modules"]: + if module_manager.get_module_status(mod) not in [powerSaver.ModuleStatus.LOADED, + powerSaver.ModuleStatus.USED]: + s["status"] = powerSaver.ServiceStatus.NO_MODULES + + for y, m in enumerate(modules): + max_len = max(len(m["title"]), max_len) + if "status" not in m or last_update_display > now - timedelta(seconds=refresh): + status = [] + if "usage-modules" in m and "service" not in m: + for mod in m["usage-modules"]: + status.append(module_manager.get_module_status(mod)) + elif "service" in m: + status.append(service_status_to_module_status(service_manager.get_status(m['service']))) + if status[0] == powerSaver.ModuleStatus.NEEDS_CHECK: + status.clear() + for mod in m["usage-modules"]: + status.append(module_manager.get_module_status(mod)) + if powerSaver.ModuleStatus.LOADED in status or powerSaver.ModuleStatus.USED in status: + if powerSaver.ModuleStatus.NOT_LOADED in status: + m['status'] = powerSaver.ModuleStatus.PARTIAL + else: + if powerSaver.ModuleStatus.USED in status: + m['status'] = powerSaver.ModuleStatus.USED + else: + m['status'] = powerSaver.ModuleStatus.LOADED + else: + m['status'] = powerSaver.ModuleStatus.NOT_LOADED + return processes, active_processes, services, modules, max_len + + +def execute_service_thread(service_manager: powerSaver.ServiceManager, name: str, stop: bool) -> None: + if stop: + service_manager.stop_service(name) + else: + service_manager.start_service(name) + + +def cleanup_executive_futures(in_futures: List[concurrent.futures.Future]) -> List[concurrent.futures.Future]: + output = [] + for f in in_futures: + if not f.done(): + output.append(f) + return output + + def draw_menu(std_screen: curses.window): + from config import processes, services, modules, DEBUG, DEFAULT_POWER_SAMPLING_RATE, \ + DEFAULT_REFRESH_RATE, USE_SUDO, REFRESH_MAXIMUM, SYS_CLASS_BATTERY_PATH, INIT_SYSTEM + poll_object = select.poll() poll_object.register(sys.stdin, select.POLLIN) k = 0 - # cursor_x = 0 - cursor_y = 0 - refresh = 5 - last_update = datetime.now() - timedelta(seconds=refresh) + cursor_y = 0 + refresh = DEFAULT_REFRESH_RATE + power_sampling_rate = DEFAULT_POWER_SAMPLING_RATE + + effective_power_sampling_rate = power_sampling_rate + + last_update_display = datetime.now() - timedelta(seconds=refresh*2) + last_update_power = datetime.now() - timedelta(seconds=power_sampling_rate*2) std_screen.clear() std_screen.refresh() std_screen.nodelay(True) curses.curs_set(0) - # Colors - if not curses.has_colors(): - Exception("No colors") - curses.start_color() - if curses.can_change_color(): - max_col_num = max([curses.COLOR_WHITE, curses.COLOR_CYAN, curses.COLOR_BLACK, curses.COLOR_MAGENTA, - curses.COLOR_BLUE, curses.COLOR_RED, curses.COLOR_GREEN, curses.COLOR_YELLOW]) - color_gray = max_col_num + 1 - if color_gray < curses.COLORS: - curses.init_color(color_gray, 128, 128, 128) - curses.init_pair(2, color_gray, curses.COLOR_BLACK) - curses.init_pair(10, color_gray, curses.COLOR_WHITE) + with concurrent.futures.ProcessPoolExecutor(max_workers=3) as process_pool: + futures_type = Optional[concurrent.futures.Future] + update_menu_structure_future: futures_type = None + execute_futures: List[concurrent.futures.Future] = [] + + # Colors + if not curses.has_colors(): + Exception("No colors") + curses.start_color() + if curses.can_change_color(): + max_col_num = max([curses.COLOR_WHITE, curses.COLOR_CYAN, curses.COLOR_BLACK, curses.COLOR_MAGENTA, + curses.COLOR_BLUE, curses.COLOR_RED, curses.COLOR_GREEN, curses.COLOR_YELLOW]) + color_gray = max_col_num + 1 + if color_gray < curses.COLORS: + curses.init_color(color_gray, 128, 128, 128) + curses.init_pair(2, color_gray, curses.COLOR_BLACK) + curses.init_pair(10, color_gray, curses.COLOR_WHITE) + else: + curses.init_pair(2, curses.COLOR_WHITE, curses.COLOR_BLACK) + curses.init_pair(10, curses.COLOR_BLACK, curses.COLOR_WHITE) else: curses.init_pair(2, curses.COLOR_WHITE, curses.COLOR_BLACK) curses.init_pair(10, curses.COLOR_BLACK, curses.COLOR_WHITE) - else: - curses.init_pair(2, curses.COLOR_WHITE, curses.COLOR_BLACK) - curses.init_pair(10, curses.COLOR_BLACK, curses.COLOR_WHITE) - - curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK) - curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) - curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK) - curses.init_pair(5, curses.COLOR_RED, curses.COLOR_BLACK) - curses.init_pair(6, curses.COLOR_CYAN, curses.COLOR_BLACK) - curses.init_pair(7, curses.COLOR_BLUE, curses.COLOR_BLACK) - curses.init_pair(8, curses.COLOR_MAGENTA, curses.COLOR_BLACK) - - curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_WHITE) - curses.init_pair(11, curses.COLOR_GREEN, curses.COLOR_WHITE) - curses.init_pair(12, curses.COLOR_YELLOW, curses.COLOR_WHITE) - curses.init_pair(13, curses.COLOR_RED, curses.COLOR_WHITE) - curses.init_pair(14, curses.COLOR_CYAN, curses.COLOR_WHITE) - curses.init_pair(15, curses.COLOR_BLUE, curses.COLOR_WHITE) - curses.init_pair(16, curses.COLOR_MAGENTA, curses.COLOR_WHITE) - - curses.init_pair(17, curses.COLOR_BLACK, curses.COLOR_RED) - curses.init_pair(17+8, curses.COLOR_BLUE, curses.COLOR_RED) - - process_manager = powerSaver.ProcessManager(False) - service_manager = powerSaver.ServiceManager("sysvinit", False) - module_manager = powerSaver.ModuleManager(False) - power_stats = powerSaver.PowerStats(refresh) - - first_run = True - while k != ord('q'): - height, width = std_screen.getmaxyx() - now = datetime.now() - - toggle = False - skip = False - - if k == curses.KEY_DOWN: - cursor_y = cursor_y + 1 - elif k == curses.KEY_UP: - cursor_y = cursor_y - 1 - elif k == ord('+'): - refresh += 1 - elif k == ord('-'): - refresh -= 1 - elif k in [curses.KEY_ENTER, ord('\n'), ord(' '), ord('\r')]: - toggle = True - elif not first_run: - skip = True - - first_run = False - - # Update caches - if last_update + timedelta(seconds=refresh) < now: - last_update = now - process_manager.update_processes_information() - module_manager.update_modules_list() - power_stats.refresh_status() - - skip = False - # elif k == curses.KEY_RIGHT: - # cursor_x = cursor_x + 1 - # elif k == curses.KEY_LEFT: - # cursor_x = cursor_x - 1 - - # cursor_x = max(0, cursor_x) - # cursor_x = min(width - 1, cursor_x) - - if not skip: - cursor_y = max(0, cursor_y) - cursor_y = min(min(height - 1, len(processes) + len(services) + len(modules) - 1), cursor_y) - refresh = max(0, min(refresh, 15)) - - # Strings - title = f"powerSaver v{version}"[:width-1] - status_msg = f"refresh rate -({refresh}s)+ | cursor: {cursor_y}"[:width-1] - - # Power Stats - battery_status, battery_percent, battery_watts, battery_h, battery_m = power_stats.get_current_stats() - power_status_msg = f"Battery: {battery_percent:5.1f}% {battery_status.value}" - if battery_watts > 0.0 and battery_status in [powerSaver.BatteryStatus.CHARGING, - powerSaver.BatteryStatus.DISCHARGING]: - power_status_msg += f" | {battery_watts:5.2f}W ({battery_h:2d}:{battery_m:02d}) " - if battery_status == powerSaver.BatteryStatus.DISCHARGING: - power_load = power_stats.get_power_load() - if power_load is not None: - battery_w_1min, battery_w_5min, battery_w_15min = power_load - power_status_msg_len = len(power_status_msg) + 3 - power_status_msg += f" | {battery_w_1min:5.2f}W {battery_w_5min:5.2f}W {battery_w_15min:5.2f}W" - status_msg_space = " " * (power_status_msg_len - len(status_msg)) - time_est = power_stats.get_time_estimate_h_min() - if time_est is not None: - h_1min, m_1min, h_5min, m_5min, h_15min, m_15min = time_est - status_msg += f"{status_msg_space}" \ - f"{h_1min:02d}:{m_1min:02d} " \ - f"{h_5min:02d}:{m_5min:02d} " \ - f"{h_15min:02d}:{m_15min:02d}" + curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_BLACK) + curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) + curses.init_pair(4, curses.COLOR_YELLOW, curses.COLOR_BLACK) + curses.init_pair(5, curses.COLOR_RED, curses.COLOR_BLACK) + curses.init_pair(6, curses.COLOR_CYAN, curses.COLOR_BLACK) + curses.init_pair(7, curses.COLOR_BLUE, curses.COLOR_BLACK) + curses.init_pair(8, curses.COLOR_MAGENTA, curses.COLOR_BLACK) + + curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_WHITE) + curses.init_pair(11, curses.COLOR_GREEN, curses.COLOR_WHITE) + curses.init_pair(12, curses.COLOR_YELLOW, curses.COLOR_WHITE) + curses.init_pair(13, curses.COLOR_RED, curses.COLOR_WHITE) + curses.init_pair(14, curses.COLOR_CYAN, curses.COLOR_WHITE) + curses.init_pair(15, curses.COLOR_BLUE, curses.COLOR_WHITE) + curses.init_pair(16, curses.COLOR_MAGENTA, curses.COLOR_WHITE) + + curses.init_pair(17, curses.COLOR_BLACK, curses.COLOR_RED) + curses.init_pair(17+8, curses.COLOR_BLUE, curses.COLOR_RED) + + process_manager = powerSaver.ProcessManager(USE_SUDO) + service_manager = powerSaver.ServiceManager(INIT_SYSTEM, USE_SUDO, DEBUG) + module_manager = powerSaver.ModuleManager(USE_SUDO) + power_stats = powerSaver.PowerStats(refresh, SYS_CLASS_BATTERY_PATH) + + height, width = std_screen.getmaxyx() + title = f"{application_name} v{version}" + active_processes: List[Dict[str, Union[str, List[str], powerSaver.ProcessStatus]]] = [] max_len = len(title) - no_proc = 0 - for y, p in enumerate(processes): - p["display_title"] = p["title"][:width-1] - max_len = max(len(p["display_title"]), max_len) - p_status = set() - for proc in p["name"]: - if "cmdline" in p: - proc_status = process_manager.get_process_status(proc, p["cmdline"]) - else: - proc_status = process_manager.get_process_status(proc) - for p_s in proc_status: - p_status.add(p_s) - if len(p_status) > 0: - no_proc = 0 - if powerSaver.ProcessStatus.ERROR in p_status: - p["status"] = powerSaver.ProcessStatus.ERROR - elif len(p_status) == 0: - no_proc += 1 - p["status"] = powerSaver.ProcessStatus.NO_PROC - if cursor_y == y: - if k == curses.KEY_UP: - cursor_y -= no_proc - else: - cursor_y += 1 - elif len(p_status) > 1: - p["status"] = powerSaver.ProcessStatus.MANY - else: - p["status"] = p_status.pop() - not_found = 0 - for y, s in enumerate(services): - s["display_title"] = s["title"][:width-1] - max_len = max(len(s["display_title"]), max_len) - if "status" not in s or last_update > now - timedelta(seconds=refresh): - s["status"] = service_manager.get_status(s["name"]) - if "needs-modules" in s: - for mod in s["needs-modules"]: - if module_manager.get_module_status(mod) not in [powerSaver.ModuleStatus.LOADED, - powerSaver.ModuleStatus.USED]: - s["status"] = powerSaver.ServiceStatus.NO_MODULES - if s["status"] in [powerSaver.ServiceStatus.NOT_FOUND, powerSaver.ServiceStatus.NO_MODULES]: - not_found += 1 - if cursor_y - len(processes) == y: - if k == curses.KEY_UP: - cursor_y -= not_found - else: - cursor_y += 1 - else: + + first_loop = True + while k != ord('q'): + now = datetime.now() + + execute_futures = cleanup_executive_futures(execute_futures) + + if update_menu_structure_future is not None and update_menu_structure_future.done(): + processes, active_processes, services, modules, max_len = update_menu_structure_future.result() + + toggle = False + skip_render_menu = True + skip_calculate_menu = True + skip_render_power = True + + if k == curses.KEY_DOWN: + cursor_y = cursor_y + 1 + elif k == curses.KEY_UP: + cursor_y = cursor_y - 1 + elif k == ord('+'): + refresh += 1 + elif k == ord('-'): + refresh -= 1 + elif k == ord('.'): + power_sampling_rate += 1 + if effective_power_sampling_rate == power_sampling_rate: + effective_power_sampling_rate += 1 + elif k == ord(','): + power_sampling_rate -= 1 + if effective_power_sampling_rate == power_sampling_rate: + effective_power_sampling_rate -= 1 + elif k in [curses.KEY_ENTER, ord('\n'), ord(' '), ord('\r')]: + toggle = True + + # Update caches + if last_update_display + timedelta(seconds=refresh) < now: + last_update_display = now + process_manager.update_processes_information() + module_manager.update_modules_list() + skip_render_menu = False + skip_calculate_menu = False + elif k >= 0: + skip_render_menu = False + + if last_update_power + timedelta(seconds=effective_power_sampling_rate) < now: + last_update_power = now + power_stats.refresh_status() + skip_render_power = False + elif k != 0: + skip_render_power = False + + if first_loop: + skip_render_menu = False + skip_render_power = False + skip_calculate_menu = False + + if k > 0: + refresh = max(1, min(refresh, REFRESH_MAXIMUM)) + power_sampling_rate = max(1, min(power_sampling_rate, REFRESH_MAXIMUM)) + skip_render_menu = False + + if not skip_calculate_menu: + height, width = std_screen.getmaxyx() + + # Strings + update_menu_structure_future = process_pool.submit(calculate_menu_thread, + processes, services, modules, + process_manager, service_manager, module_manager, + refresh, title, last_update_display) + + if first_loop: + processes, active_processes, services, modules, max_len = update_menu_structure_future.result() not_found = 0 - for y, m in enumerate(modules): - m["display_title"] = m["title"][:width-1] - max_len = max(len(m["display_title"]), max_len) - if "status" not in m or last_update > now - timedelta(seconds=refresh): - status = [] - if "usage-modules" in m and "service" not in m: - for mod in m["usage-modules"]: - status.append(module_manager.get_module_status(mod)) - elif "service" in m: - status.append(service_status_to_module_status(service_manager.get_status(m['service']))) - if status[0] == powerSaver.ModuleStatus.NEEDS_CHECK: - status.clear() - for mod in m["usage-modules"]: - status.append(module_manager.get_module_status(mod)) - if powerSaver.ModuleStatus.LOADED in status or powerSaver.ModuleStatus.USED in status: - if powerSaver.ModuleStatus.NOT_LOADED in status: - m['status'] = powerSaver.ModuleStatus.PARTIAL + for y, s in enumerate(services): + if s["status"] in [powerSaver.ServiceStatus.NOT_FOUND, powerSaver.ServiceStatus.NO_MODULES]: + not_found += 1 + if cursor_y - len(active_processes) == y: + if k == curses.KEY_UP: + cursor_y -= not_found + else: + cursor_y += 1 else: - if powerSaver.ModuleStatus.USED in status: - m['status'] = powerSaver.ModuleStatus.USED - else: - m['status'] = powerSaver.ModuleStatus.LOADED - else: - m['status'] = powerSaver.ModuleStatus.NOT_LOADED + not_found = 0 + + if k > 0: + cursor_y = min(len(active_processes) + len(services) + len(modules) - 1, max(0, cursor_y)) - if not skip: # Execute action error_msg = "" if toggle: - if cursor_y < len(processes): - status = processes[cursor_y]["status"] - for name in processes[cursor_y]["name"]: + + if cursor_y < len(active_processes): # Processes + status = active_processes[cursor_y]["status"] + for name in active_processes[cursor_y]["name"]: cmdline_filter = None - if "cmdline" in processes[cursor_y]: - cmdline_filter = processes[cursor_y]["cmdline"] + if "cmdline" in active_processes[cursor_y]: + cmdline_filter = active_processes[cursor_y]["cmdline"] if status in [powerSaver.ProcessStatus.STOPPED, powerSaver.ProcessStatus.MANY]: process_manager.signal_processes(name, cmdline_filter, False) elif status == powerSaver.ProcessStatus.RUNNING: process_manager.signal_processes(name, cmdline_filter, True) - elif cursor_y - len(processes) < len(services): # Services - cursor = cursor_y - len(processes) + elif cursor_y - len(active_processes) < len(services): # Services + cursor = cursor_y - len(active_processes) status = services[cursor]["status"] if status in [powerSaver.ServiceStatus.STOPPED, powerSaver.ServiceStatus.CRASHED]: - service_manager.start_service(services[cursor]["name"]) - services[cursor]["status"] = powerSaver.ServiceStatus.TOGGLED + future = process_pool.submit(execute_service_thread, service_manager, services[cursor]["name"], False) + execute_futures.append(future) elif status == powerSaver.ServiceStatus.RUNNING: - service_manager.stop_service(services[cursor]["name"]) - services[cursor]["status"] = powerSaver.ServiceStatus.TOGGLED - elif cursor_y - len(processes) - len(services) < len(modules): # Modules - cursor = cursor_y - len(processes) - len(services) + future = process_pool.submit(execute_service_thread, service_manager, services[cursor]["name"], True) + execute_futures.append(future) + elif cursor_y - len(active_processes) - len(services) < len(modules): # Modules + cursor = cursor_y - len(active_processes) - len(services) status = modules[cursor]["status"] if status in [powerSaver.ModuleStatus.LOADED, powerSaver.ModuleStatus.PARTIAL]: for module in reversed(modules[cursor]["modules"]): @@ -333,61 +426,147 @@ def draw_menu(std_screen: curses.window): for module in modules[cursor]["modules"]: module_manager.load_module(module) - std_screen.clear() + # now = datetime.now() + # sleep_length_display = last_update_display + timedelta(seconds=refresh) - now + # sleep_length_power = last_update_power + timedelta(seconds=power_sampling_rate) - now + # sleep_length = min(sleep_length_power, sleep_length_display) + # sleep_length = math.floor(max(0.0, sleep_length.total_seconds()) * 1000) + # error_msg += f" D={sleep_length_display} P={sleep_length_power} Total={sleep_length/1000.0}" - # Draw Title - std_screen.attron(curses.A_BOLD) - std_screen.addstr(0, 0, title) + if (not skip_render_menu) or (not skip_render_power): + std_screen.clear() - # Divider - std_screen.addstr(1, 0, "-" * min(width - 1, max_len)) - std_screen.attroff(curses.A_BOLD) - - n = 2 - y_offset = 0 - - # Processes - if len(processes) > 0: - for y, p in enumerate(processes): - offset = color_offset(cursor_y == y) - menu_entry(std_screen, y + n, p["display_title"], process_color(p["status"]), offset) - n += len(processes) + # Draw Title + std_screen.attron(curses.A_BOLD) + std_screen.addstr(0, 0, title[:width - 1]) # Divider - std_screen.addstr(n, 0, "-" * min(width - 1, max_len)) - n += 1 - - # Services - if len(services) > 0: - y_offset = len(processes) - for y, s in enumerate(services): - offset = color_offset(cursor_y == y + y_offset) - menu_entry(std_screen, y + n, s["display_title"], service_color(s["status"]), offset) - n += len(services) - - # Divider - std_screen.addstr(n, 0, "-" * min(width - 1, max_len)) - n += 1 - - # Modules - if len(modules) > 0: - y_offset += len(services) - for y, m in enumerate(modules): - offset = color_offset(cursor_y == y + y_offset) - menu_entry(std_screen, y + n, m["display_title"], module_color(m["status"]), offset) - - # Status - if len(error_msg) > 0: - status_msg = f"{status_msg} | {error_msg}"[:width-1] - std_screen.addstr(height - 1, 0, status_msg) - std_screen.addstr(height - 2, 0, power_status_msg) - - # Refresh the screen - std_screen.refresh() - - # Wait for next input - poll_object.poll(refresh * 1000) - k = std_screen.getch() + std_screen.addstr(1, 0, "-" * min(width - 1, max_len)) + std_screen.attroff(curses.A_BOLD) + + n = 2 + y_offset = 0 + + # Processes + if len(active_processes) > 0: + for y, p in enumerate(active_processes): + offset = color_offset(cursor_y == y) + menu_entry(std_screen, y + n, p["title"][:width-1], process_color(p["status"]), offset) + n += len(active_processes) + + # Divider + std_screen.addstr(n, 0, "-" * min(width - 1, max_len)) + n += 1 + + # Services + if len(services) > 0: + y_offset = len(active_processes) + for y, s in enumerate(services): + offset = color_offset(cursor_y == y + y_offset) + menu_entry(std_screen, y + n, s["title"][:width-1], service_color(s["status"]), offset) + n += len(services) + + # Divider + std_screen.addstr(n, 0, "-" * min(width - 1, max_len)) + n += 1 + + # Modules + if len(modules) > 0: + y_offset += len(services) + for y, m in enumerate(modules): + offset = color_offset(cursor_y == y + y_offset) + menu_entry(std_screen, y + n, m["title"][:width-1], module_color(m["status"]), offset) + + # Status + battery_status, battery_percent, battery_watts, battery_h, battery_m = power_stats.get_current_stats() + + if battery_status == powerSaver.BatteryStatus.DISCHARGING: + effective_power_sampling_rate = power_sampling_rate + else: + effective_power_sampling_rate = refresh + + status_msg = powerSaver.FormattedMessage() + status_msg += [("Q", curses.A_BOLD), + ("uit | ", curses.A_NORMAL), + ("refresh rate: ", curses.A_NORMAL), + ("-", curses.A_BOLD), + (f"[{refresh}s]", curses.A_NORMAL), + ("+", curses.A_BOLD)] + if battery_status == powerSaver.BatteryStatus.DISCHARGING: + status_msg += [(" | power sampling rate: ", curses.A_NORMAL), + (",", curses.A_BOLD), + (f"[{power_sampling_rate}s]", curses.A_NORMAL), + (".", curses.A_BOLD)] + if DEBUG: + status_msg += [(" | ", curses.A_NORMAL), + (f"cursor: {cursor_y}/{len(active_processes) + len(services) + len(modules) - 1}", + curses.color_pair(4)), + (" | ", curses.A_NORMAL), + (f"k: {k}", curses.color_pair(6)), + ] + # len(active_processes) + len(services) + len(modules) - 1 + + # Power Stats + power_status_msg = powerSaver.FormattedMessage() + power_status_msg += [("Battery: ", curses.A_NORMAL), + (f"{battery_percent:5.1f}%", battery_percent_color(battery_percent)), + (f" {battery_status.value}", curses.A_BOLD)] + if battery_watts > 0.0 and battery_status in [powerSaver.BatteryStatus.CHARGING, + powerSaver.BatteryStatus.DISCHARGING]: + power_status_msg += [(" | ", curses.A_NORMAL), + (f"{battery_watts:5.2f}", power_use_color(battery_watts)), + (f"W ({battery_h:2d}:{battery_m:02d}) ", curses.A_NORMAL)] + + if battery_status == powerSaver.BatteryStatus.DISCHARGING: + power_load = power_stats.get_power_load() + if power_load is not None: + battery_w_1min, battery_w_5min, battery_w_15min = power_load + if len(status_msg) > len(power_status_msg): + status_msg_space = "" + power_status_msg_space = " " * (len(status_msg) - len(power_status_msg)) + else: + power_status_msg_space = "" + status_msg_space = " " * (len(power_status_msg) - len(status_msg)) + + power_status_msg += [(f"{power_status_msg_space} | ", curses.A_NORMAL), + (f"{battery_w_1min:5.2f}", power_use_color(battery_w_1min)), + ("W ", curses.A_NORMAL), + ("{battery_w_5min:5.2f}", power_use_color(battery_w_5min)), + ("W ", curses.A_NORMAL), + ("{battery_w_15min:5.2f}", power_use_color(battery_w_15min)), + ("W", curses.A_NORMAL)] + time_est = power_stats.get_time_estimate_h_min() + if time_est is not None: + h_1min, m_1min, h_5min, m_5min, h_15min, m_15min = time_est + status_msg += f"{status_msg_space} | " \ + f"{h_1min:02d}:{m_1min:02d} " \ + f"{h_5min:02d}:{m_5min:02d} " \ + f"{h_15min:02d}:{m_15min:02d}" + if len(error_msg) > 0: + status_msg += [(" | ", curses.A_NORMAL), + (f"{error_msg}", curses.color_pair(5))] + status_msg += " "*min(0, width - len(status_msg) - 1) + power_status_msg += " "*min(0, width - len(power_status_msg) - 1) + status_msg.display(std_screen, height - 1, 0, max_width=width-1) + power_status_msg.display(std_screen, height - 2, 0, max_width=width-1) + + # Refresh the screen + std_screen.refresh() + + # calculate sleep length + now = datetime.now() + sleep_length_display = last_update_display + timedelta(seconds=refresh) - now + sleep_length_power = last_update_power + timedelta(seconds=power_sampling_rate) - now + sleep_length = min(sleep_length_power, sleep_length_display) + sleep_length = math.floor(max(0.0, sleep_length.total_seconds() * 1000)) + + first_loop = False + + # Wait for next input + poll_object.poll(sleep_length) + k = std_screen.getch() + + process_pool.shutdown() if __name__ == '__main__': diff --git a/powerSaver/__init__.py b/powerSaver/__init__.py index 13319ef..637091d 100644 --- a/powerSaver/__init__.py +++ b/powerSaver/__init__.py @@ -19,6 +19,7 @@ import powerSaver.serviceManager import powerSaver.moduleManager import powerSaver.powerStats +import powerSaver.formattedMessage from .processManager import ProcessManager from .processManager import ProcessStatus @@ -28,3 +29,4 @@ from .moduleManager import ModuleStatus from .powerStats import PowerStats from .powerStats import BatteryStatus +from .formattedMessage import FormattedMessage diff --git a/powerSaver/formattedMessage.py b/powerSaver/formattedMessage.py new file mode 100644 index 0000000..9cadbb6 --- /dev/null +++ b/powerSaver/formattedMessage.py @@ -0,0 +1,80 @@ +import curses +from typing import List, Tuple, Union, Optional + + +class FormattedMessageAppendDatatypeError(Exception): + pass + + +class FormattedMessage: + message: List[Tuple[str, int]] + + def __init__(self): + self.message = [] + + def append_message(self, + msg: Union[List[Tuple[str, int]], Tuple[str, int], str, "FormattedMessage"], + attr: Optional[int] = None) -> None: + if isinstance(msg, list): + for m in msg: + self.message.append(m) + elif isinstance(msg, tuple): + self.message.append(msg) + elif isinstance(msg, str): + if attr is not None: + self.message.append((msg, attr)) + else: + self.message.append((msg, curses.A_NORMAL)) + elif isinstance(msg, FormattedMessage): + for m in msg.message: + self.message.append(m) + else: + FormattedMessageAppendDatatypeError("Please only append the correct datatypes to a FormattedMessage\n" + " These are a single: Union[List[Tuple[str, int]], Tuple[str, int], " + "FormattedMessage]\n" + " Or these two: str, int") + + def display(self, screen: curses.window, y: int, x_in: int, + max_width: Optional[int] = None, + restore_format: Optional[List[int]] = None): + if restore_format is None: + restore_format = [curses.A_NORMAL, curses.color_pair(1)] + x = x_in + for msg, attr in self.message: + msg_out = msg + if max_width is not None and x + len(msg) > max_width: + msg_out = msg_out[:max_width - x] + screen.addstr(y, x, msg_out, attr) + x += len(msg) + if msg != msg_out: + break + for attr in restore_format: + screen.attrset(attr) + + def __len__(self): + msg_length = 0 + for msg, attr in self.message: + msg_length += len(msg) + return msg_length + + def __add__(self, + other: Union[List[Tuple[str, int]], Tuple[str, int], str, "FormattedMessage"]) -> "FormattedMessage": + out = FormattedMessage() + out.append_message(self.message) + try: + out.append_message(other.message) + except FormattedMessageAppendDatatypeError: + raise FormattedMessageAppendDatatypeError("Please only append the correct datatypes to a FormattedMessage\n" + " These are: Union[List[Tuple[str, int]], Tuple[str, int], " + "FormattedMessage]") + return out + + def __iadd__(self, + other: Union[List[Tuple[str, int]], Tuple[str, int], str, "FormattedMessage"]) -> "FormattedMessage": + try: + self.append_message(other) + except FormattedMessageAppendDatatypeError: + raise FormattedMessageAppendDatatypeError("Please only append the correct datatypes to a FormattedMessage\n" + " These are: Union[List[Tuple[str, int]], Tuple[str, int], " + "FormattedMessage]") + return self diff --git a/powerSaver/powerStats.py b/powerSaver/powerStats.py index fe8e787..49ff4d9 100644 --- a/powerSaver/powerStats.py +++ b/powerSaver/powerStats.py @@ -43,9 +43,9 @@ class PowerStats(object): current_now: float = 0.0 working: bool - def __init__(self, refresh: int = 5): + def __init__(self, refresh: int = 5, battery_path: str = "/sys/class/power_supply/BAT0"): self.working = True - self.battery_path = Path("/sys/class/power_supply/BAT0") + self.battery_path = Path(battery_path) self.refresh = refresh self.power_usage = [] if self.battery_path.is_dir(): diff --git a/powerSaver/serviceManager.py b/powerSaver/serviceManager.py index 3297943..1a042d5 100644 --- a/powerSaver/serviceManager.py +++ b/powerSaver/serviceManager.py @@ -48,8 +48,9 @@ class ServiceStatus(Enum): class ServiceManager(object): functions: Dict[str, Callable] sudo: bool + debug: bool - def __init__(self, init_type: str, sudo: bool = True): + def __init__(self, init_type: str, sudo: bool = False, debug: bool = False): function_db = { "sysvinit": { "get_status": ServiceManager._get_status_init, @@ -64,17 +65,18 @@ def __init__(self, init_type: str, sudo: bool = True): "toggle_service": ServiceManager._unimplemented_function } } - corrected_type = init_type - if init_type in ['init', 'openrc']: + corrected_type = init_type.lower() + if init_type.lower() in ['init', 'openrc']: corrected_type = "sysvinit" if corrected_type in function_db: self.functions = function_db[corrected_type] else: - self.functions = function_db["systemd"] + self.functions = function_db["sysvinit"] self.sudo = sudo + self.debug = debug def get_status(self, name: str) -> ServiceStatus: - return self.functions["get_status"](name, self.sudo) + return self.functions["get_status"](name, self.sudo, self.debug) def start_service(self, name: str) -> bool: return self.functions["start_service"](name, self.sudo) @@ -97,11 +99,12 @@ def __create_service_command(name: str, sudo: bool) -> Optional[List[str]]: script = "/etc/init.d/" + name if not is_exe(script): return None - command.append(script) + command.append("/sbin/rc-service") + command.append(name) return command @staticmethod - def _get_status_init(name: str, sudo: bool) -> ServiceStatus: + def _get_status_init(name: str, sudo: bool, debug: bool) -> ServiceStatus: command = ServiceManager.__create_service_command(name, sudo) if command is None: return ServiceStatus.NOT_FOUND @@ -109,7 +112,9 @@ def _get_status_init(name: str, sudo: bool) -> ServiceStatus: status_result = subprocess.run(command, capture_output=True) status_output = status_result.stdout.decode() + status_output += status_result.stderr.decode() + status = None match = sysvinit_status_parser.match(status_output) if match: status = match.group(1) @@ -119,6 +124,13 @@ def _get_status_init(name: str, sudo: bool) -> ServiceStatus: return ServiceStatus.STOPPED if status == "crashed": return ServiceStatus.CRASHED + if status == "stopping": + return ServiceStatus.TOGGLED + + if debug: + with open('.error_status', 'ab') as outF: + err_str = f"{name}: [{status_result.returncode}] {status_output} -> {status}\n" + os.write(outF.fileno(), err_str.encode()) return ServiceStatus.UNKNOWN