From bdba7e8fb89cdabfc4b5d439d08c067964f6381d Mon Sep 17 00:00:00 2001 From: Philip Guyton Date: Wed, 20 Mar 2024 16:37:51 +0000 Subject: [PATCH] [t] Add Group with custom GID fails with type error #2807 Add type hints to run_command() to ensure callers are passing the expected argument types and add some type hints &/or typecasts to run_command callers in: - user.py - active_directory.py - qgourp_ids list to enforce str() members only in qgroup_maxout_limit.py. - acl.py to enforce str() members of run_command() args. --- src/rockstor/scripts/qgroup_maxout_limit.py | 2 +- .../smart_manager/views/active_directory.py | 8 +++---- src/rockstor/system/acl.py | 8 +++---- src/rockstor/system/directory_services.py | 6 ++--- src/rockstor/system/osi.py | 23 ++++++++++--------- src/rockstor/system/users.py | 10 ++++---- 6 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/rockstor/scripts/qgroup_maxout_limit.py b/src/rockstor/scripts/qgroup_maxout_limit.py index 013d7f8b7..baa7979ab 100644 --- a/src/rockstor/scripts/qgroup_maxout_limit.py +++ b/src/rockstor/scripts/qgroup_maxout_limit.py @@ -34,7 +34,7 @@ def main(): print("Quotas not enabled on pool(%s). Skipping it." % p.name) continue - qgroup_ids = [] + qgroup_ids: list[str] = [] for l in o: if ( re.match("qgroupid", l) is not None diff --git a/src/rockstor/smart_manager/views/active_directory.py b/src/rockstor/smart_manager/views/active_directory.py index 831bd12d7..bc42ea2fc 100644 --- a/src/rockstor/smart_manager/views/active_directory.py +++ b/src/rockstor/smart_manager/views/active_directory.py @@ -100,9 +100,9 @@ def post(self, request, command): self._resolve_check(config.get("domain"), request) # 2. realm discover check? - domain = config.get("domain") + domain = str(config.get("domain")) try: - cmd = [REALM, "discover", "--name-only", domain] + cmd: list[str] = [REALM, "discover", "--name-only", domain] o, e, rc = run_command(cmd) except Exception as e: e_msg = ( @@ -117,14 +117,14 @@ def post(self, request, command): elif command == "start": config = self._config(service, request) - domain = config.get("domain") + domain = str(config.get("domain")) # 1. make sure ntpd is running, or else, don't start. self._ntp_check(request) # 2. Name resolution check? self._resolve_check(config.get("domain"), request) if method == "winbind": - cmd = [ + cmd: list[str] = [ "/usr/sbin/authconfig", ] # nss diff --git a/src/rockstor/system/acl.py b/src/rockstor/system/acl.py index 9b571279c..11caa5eb4 100644 --- a/src/rockstor/system/acl.py +++ b/src/rockstor/system/acl.py @@ -22,8 +22,8 @@ CHMOD = "/usr/bin/chmod" -def chown(share, owner, group=None, recursive=False): - cmd = [ +def chown(share: str, owner: str, group: str | None = None, recursive: bool = False): + cmd: list[str] = [ CHOWN, ] if recursive is True: @@ -34,8 +34,8 @@ def chown(share, owner, group=None, recursive=False): return run_command(cmd) -def chmod(share, perm_bits, recursive=False): - cmd = [ +def chmod(share: str, perm_bits: str, recursive: bool = False): + cmd: list[str] = [ CHMOD, ] if recursive is True: diff --git a/src/rockstor/system/directory_services.py b/src/rockstor/system/directory_services.py index 9f6d17911..79c34602d 100644 --- a/src/rockstor/system/directory_services.py +++ b/src/rockstor/system/directory_services.py @@ -267,7 +267,7 @@ def join_domain(config, method="sssd"): cmd[-3:-3] = cmd_options if method == "winbind": cmd = [NET, "ads", "join", "-U", admin] - return run_command(cmd, input=("{}\n".format(config.get("password"))), log=True) + return run_command(cmd, pinput=("{}\n".format(config.get("password"))), log=True) def leave_domain(config, method="sssd"): @@ -282,10 +282,10 @@ def leave_domain(config, method="sssd"): if method == "winbind": cmd = [NET, "ads", "leave", "-U", config.get("username")] try: - return run_command(cmd, input=pstr) + return run_command(cmd, pinput=pstr) except: status_cmd = [NET, "ads", "status", "-U", config.get("username")] - o, e, rc = run_command(status_cmd, input=pstr, throw=False) + o, e, rc = run_command(status_cmd, pinput=pstr, throw=False) if rc != 0: return logger.debug( "Status shows not joined. out: %s err: %s rc: %d" % (o, e, rc) diff --git a/src/rockstor/system/osi.py b/src/rockstor/system/osi.py index bc2322922..543fbd525 100644 --- a/src/rockstor/system/osi.py +++ b/src/rockstor/system/osi.py @@ -31,6 +31,7 @@ from struct import pack from tempfile import mkstemp from distutils.util import strtobool +from typing import AnyStr, IO from django.conf import settings @@ -214,16 +215,16 @@ def replace_pattern_inline(source_file, target_file, pattern, replacement): def run_command( - cmd, - shell=False, ## Default - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - throw=True, - log=False, - input=None, - raw=False, -): + cmd: list[str], + shell: bool = False, + stdout: None | int | IO = subprocess.PIPE, + stderr: None | int | IO = subprocess.PIPE, + stdin: None | int | IO = subprocess.PIPE, + throw: bool = True, + log: bool = False, + pinput: AnyStr | None = None, + raw: bool = False, +) -> (list[str] | str, list[str], int): try: # We force run_command to always use en_US # to avoid issues on date and number formats @@ -243,7 +244,7 @@ def run_command( env=fake_env, universal_newlines=True, # 3.7 adds text parameter universal_newlines alias ) - out, err = p.communicate(input=input) + out, err = p.communicate(input=pinput) # raw=True allows parsing of a JSON output directly, for instance if not raw: out = out.split("\n") diff --git a/src/rockstor/system/users.py b/src/rockstor/system/users.py index b5f81dc89..69f97fb86 100644 --- a/src/rockstor/system/users.py +++ b/src/rockstor/system/users.py @@ -173,7 +173,7 @@ def usermod(username, passwd): # crypted_passwd = crypt.crypt(passwd.encode("utf8"), salt_header + salt) salt = crypt.mksalt() crypted_passwd = crypt.crypt(passwd, salt) - cmd = [USERMOD, "-p", crypted_passwd, username] + cmd: list[str] = [USERMOD, "-p", crypted_passwd, username] out, err, rc = run_command(cmd, log=True) # p = subprocess.Popen( # cmd, @@ -236,7 +236,7 @@ def useradd(username, shell, uid=None, gid=None): ) return [""], [""], 0 - cmd = [USERADD, "-s", shell, "-m", username] + cmd: list[str] = [USERADD, "-s", shell, "-m", username] if uid is not None: cmd.insert(-1, "-u") cmd.insert(-1, str(uid)) @@ -246,11 +246,11 @@ def useradd(username, shell, uid=None, gid=None): return run_command(cmd) -def groupadd(groupname, gid=None): - cmd = [GROUPADD, groupname] +def groupadd(groupname: str, gid: int | None = None): + cmd: list[str] = [GROUPADD, groupname] if gid is not None: cmd.insert(-1, "-g") - cmd.insert(-1, gid) + cmd.insert(-1, str(gid)) return run_command(cmd)