diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py index 4312d8038055..72309a81c04b 100644 --- a/cloudinit/distros/__init__.py +++ b/cloudinit/distros/__init__.py @@ -133,6 +133,7 @@ class Distro(persistence.CloudInitPickleMixin, metaclass=abc.ABCMeta): doas_fn = "/etc/doas.conf" ci_sudoers_fn = "/etc/sudoers.d/90-cloud-init-users" hostname_conf_fn = "/etc/hostname" + shadow_fn = "/etc/shadow" tz_zone_dir = "/usr/share/zoneinfo" default_owner = "root:root" init_cmd = ["service"] # systemctl, service etc @@ -649,19 +650,21 @@ def preferred_ntp_clients(self): def get_default_user(self): return self.get_option("default_user") - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: """ Add a user to the system using standard GNU tools This should be overridden on distros where useradd is not desirable or not available. + + Returns False if user already exists, otherwise True. """ # XXX need to make add_user idempotent somehow as we # still want to add groups or modify SSH keys on pre-existing # users in the image. if util.is_user(name): LOG.info("User %s already exists, skipping.", name) - return + return False if "create_groups" in kwargs: create_groups = kwargs.pop("create_groups") @@ -765,6 +768,9 @@ def add_user(self, name, **kwargs): util.logexc(LOG, "Failed to create user %s", name) raise e + # Indicate that a new user was created + return True + def add_snap_user(self, name, **kwargs): """ Add a snappy user to the system using snappy tools @@ -792,6 +798,62 @@ def add_snap_user(self, name, **kwargs): return username + def _check_if_password_field_matches( + self, username, pattern1, pattern2, pattern3=None, check_file=None + ) -> bool: + """ + Check whether ``username`` user has a hashed password matching + either pattern. + + FreeBSD, NetBSD, and OpenBSD use 3 patterns, others only use + 2 patterns. + + Returns either 'True' to indicate a match, otherwise 'False'. + """ + + if not check_file: + check_file = self.shadow_fn + + cmd = [ + "grep", + "-q", + "-e", + "^%s%s" % (username, pattern1), + "-e", + "^%s%s" % (username, pattern2), + ] + if pattern3 is not None: + cmd.extend(["-e", "^%s%s" % (username, pattern3)]) + cmd.append(check_file) + try: + subp.subp(cmd) + except subp.ProcessExecutionError as e: + if e.exit_code == 1: + # Exit code 1 means 'grep' didn't find empty password + return True + else: + util.logexc( + LOG, + "Failed to check the status of password for user %s", + username, + ) + raise e + return False + + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, "::", ":!:", check_file=shadow_file + ) + return status + def create_user(self, name, **kwargs): """ Creates or partially updates the ``name`` user in the system. @@ -818,20 +880,103 @@ def create_user(self, name, **kwargs): return self.add_snap_user(name, **kwargs) # Add the user - self.add_user(name, **kwargs) - - # Set password if plain-text password provided and non-empty - if "plain_text_passwd" in kwargs and kwargs["plain_text_passwd"]: - self.set_passwd(name, kwargs["plain_text_passwd"]) - - # Set password if hashed password is provided and non-empty - if "hashed_passwd" in kwargs and kwargs["hashed_passwd"]: - self.set_passwd(name, kwargs["hashed_passwd"], hashed=True) + pre_existing_user = not self.add_user(name, **kwargs) + + has_existing_password = False + ud_blank_password_specified = False + ud_password_specified = False + password_key = None + + if "plain_text_passwd" in kwargs: + ud_password_specified = True + password_key = "plain_text_passwd" + if kwargs["plain_text_passwd"]: + # Set password if plain-text password provided and non-empty + self.set_passwd(name, kwargs["plain_text_passwd"]) + else: + ud_blank_password_specified = True + + if "hashed_passwd" in kwargs: + ud_password_specified = True + password_key = "hashed_passwd" + if kwargs["hashed_passwd"]: + # Set password if hashed password is provided and non-empty + self.set_passwd(name, kwargs["hashed_passwd"], hashed=True) + else: + ud_blank_password_specified = True + + if pre_existing_user: + if not ud_password_specified: + if "passwd" in kwargs: + password_key = "passwd" + # Only "plain_text_passwd" and "hashed_passwd" + # are valid for an existing user. + LOG.warning( + "'passwd' in user-data is ignored for existing " + "user %s", + name, + ) - # Default locking down the account. 'lock_passwd' defaults to True. - # lock account unless lock_password is False. + # As no password specified for the existing user in user-data + # then check if the existing user's hashed password value is + # blank (whether locked or not). + if util.system_is_snappy(): + has_existing_password = self._check_if_existing_password( + name, "/var/lib/extrausers/shadow" + ) + if not has_existing_password: + # Check /etc/shadow also + has_existing_password = ( + self._check_if_existing_password(name) + ) + else: + has_existing_password = self._check_if_existing_password( + name + ) + else: + if "passwd" in kwargs: + ud_password_specified = True + password_key = "passwd" + if not kwargs["passwd"]: + ud_blank_password_specified = True + + # Default locking down the account. 'lock_passwd' defaults to True. + # Lock account unless lock_password is False in which case unlock + # account as long as a password (blank or otherwise) was specified. if kwargs.get("lock_passwd", True): self.lock_passwd(name) + elif has_existing_password or ud_password_specified: + # 'lock_passwd: False' and either existing account already with + # non-blank password or else existing/new account with password + # explicitly set in user-data. + if ud_blank_password_specified: + LOG.debug( + "Allowing unlocking empty password for %s based on empty" + " '%s' in user-data", + name, + password_key, + ) + + # Unlock the existing/new account + self.unlock_passwd(name) + elif pre_existing_user: + # Pre-existing user with no existing password and none + # explicitly set in user-data. + LOG.warning( + "Not unlocking blank password for existing user %s." + " 'lock_passwd: false' present in user-data but no existing" + " password set and no 'plain_text_passwd'/'hashed_passwd'" + " provided in user-data", + name, + ) + else: + # No password (whether blank or otherwise) explicitly set + LOG.warning( + "Not unlocking password for user %s. 'lock_passwd: false'" + " present in user-data but no 'passwd'/'plain_text_passwd'/" + "'hashed_passwd' provided in user-data", + name, + ) # Configure doas access if "doas" in kwargs: @@ -908,6 +1053,50 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to disable password for user %s", name) raise e + def unlock_passwd(self, name: str): + """ + Unlock the password of a user, i.e., enable password logins + """ + # passwd must use short '-u' due to SLES11 lacking long form '--unlock' + unlock_tools = (["passwd", "-u", name], ["usermod", "--unlock", name]) + try: + cmd = next(tool for tool in unlock_tools if subp.which(tool[0])) + except StopIteration as e: + raise RuntimeError( + "Unable to unlock user account '%s'. No tools available. " + " Tried: %s." % (name, [c[0] for c in unlock_tools]) + ) from e + try: + _, err = subp.subp(cmd, rcs=[0, 3]) + except Exception as e: + util.logexc(LOG, "Failed to enable password for user %s", name) + raise e + if err: + # if "passwd" or "usermod" are unable to unlock an account with + # an empty password then they display a message on stdout. In + # that case then instead set a blank password. + passwd_set_tools = ( + ["passwd", "-d", name], + ["usermod", "--password", "''", name], + ) + try: + cmd = next( + tool for tool in passwd_set_tools if subp.which(tool[0]) + ) + except StopIteration as e: + raise RuntimeError( + "Unable to set blank password for user account '%s'. " + "No tools available. " + " Tried: %s." % (name, [c[0] for c in unlock_tools]) + ) from e + try: + subp.subp(cmd) + except Exception as e: + util.logexc( + LOG, "Failed to set blank password for user %s", name + ) + raise e + def expire_passwd(self, user): try: subp.subp(["passwd", "--expire", user]) @@ -942,6 +1131,9 @@ def chpasswd(self, plist_in: list, hashed: bool): ) + "\n" ) + # Need to use the short option name '-e' instead of '--encrypted' + # (which would be more descriptive) since Busybox and SLES 11 + # chpasswd don't know about long names. cmd = ["chpasswd"] + (["-e"] if hashed else []) subp.subp(cmd, data=payload) diff --git a/cloudinit/distros/alpine.py b/cloudinit/distros/alpine.py index 867963f2c78f..cd1323b404da 100644 --- a/cloudinit/distros/alpine.py +++ b/cloudinit/distros/alpine.py @@ -205,16 +205,18 @@ def preferred_ntp_clients(self): return self._preferred_ntp_clients - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: """ Add a user to the system using standard tools On Alpine this may use either 'useradd' or 'adduser' depending on whether the 'shadow' package is installed. + + Returns False if user already exists, otherwise True. """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) - return + return False if "selinux_user" in kwargs: LOG.warning("Ignoring selinux_user parameter for Alpine Linux") @@ -418,6 +420,9 @@ def add_user(self, name, **kwargs): LOG, "Failed to update %s for user %s", shadow_file, name ) + # Indicate that a new user was created + return True + def lock_passwd(self, name): """ Lock the password of a user, i.e., disable password logins @@ -446,6 +451,36 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to disable password for user %s", name) raise e + def unlock_passwd(self, name: str): + """ + Unlock the password of a user, i.e., enable password logins + """ + + # Check whether Shadow's or Busybox's version of 'passwd'. + # If Shadow's 'passwd' is available then use the generic + # lock_passwd function from __init__.py instead. + if not os.path.islink( + "/usr/bin/passwd" + ) or "bbsuid" not in os.readlink("/usr/bin/passwd"): + return super().unlock_passwd(name) + + cmd = ["passwd", "-u", name] + # Busybox's 'passwd', unlike Shadow's 'passwd', errors + # if password is already unlocked: + # + # "passwd: password for user2 is already unlocked" + # + # with exit code 1 + # + # and also does *not* error if no password is set. + try: + _, err = subp.subp(cmd, rcs=[0, 1]) + if re.search(r"is already unlocked", err): + return True + except subp.ProcessExecutionError as e: + util.logexc(LOG, "Failed to unlock password for user %s", name) + raise e + def expire_passwd(self, user): # Check whether Shadow's or Busybox's version of 'passwd'. # If Shadow's 'passwd' is available then use the generic diff --git a/cloudinit/distros/bsd.py b/cloudinit/distros/bsd.py index 5bef9203c3d1..4e9fa1f7f2bd 100644 --- a/cloudinit/distros/bsd.py +++ b/cloudinit/distros/bsd.py @@ -15,6 +15,7 @@ class BSD(distros.Distro): networking_cls = BSDNetworking hostname_conf_fn = "/etc/rc.conf" rc_conf_fn = "/etc/rc.conf" + shadow_fn = "/etc/master.passwd" default_owner = "root:wheel" # This differs from the parent Distro class, which has -P for diff --git a/cloudinit/distros/freebsd.py b/cloudinit/distros/freebsd.py index 2d8fa02fea6d..ccd961159278 100644 --- a/cloudinit/distros/freebsd.py +++ b/cloudinit/distros/freebsd.py @@ -86,7 +86,12 @@ def manage_service( def _get_add_member_to_group_cmd(self, member_name, group_name): return ["pw", "usermod", "-n", member_name, "-G", group_name] - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: + """ + Add a user to the system using standard tools + + Returns False if user already exists, otherwise True. + """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) return False @@ -140,6 +145,28 @@ def add_user(self, name, **kwargs): if passwd_val is not None: self.set_passwd(name, passwd_val, hashed=True) + # Indicate that a new user was created + return True + + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + For FreeBSD (from https://man.freebsd.org/cgi/man.cgi?passwd(5)) a + password field of "" indicates no password, and a password + field value of either "*" or "*LOCKED*" indicate differing forms of + "locked" but with no password defined. + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, "::", ":*:", ":*LOCKED*:", check_file=shadow_file + ) + return status + def expire_passwd(self, user): try: subp.subp(["pw", "usermod", user, "-p", "01-Jan-1970"]) @@ -170,6 +197,13 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to lock password login for user %s", name) raise + def unlock_passwd(self, name): + LOG.debug( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user %s", + name, + ) + def apply_locale(self, locale, out_fn=None): # Adjust the locales value to the new value newconf = StringIO() diff --git a/cloudinit/distros/netbsd.py b/cloudinit/distros/netbsd.py index 972528c6df00..b9e038e895d6 100644 --- a/cloudinit/distros/netbsd.py +++ b/cloudinit/distros/netbsd.py @@ -63,7 +63,12 @@ def __init__(self, name, cfg, paths): def _get_add_member_to_group_cmd(self, member_name, group_name): return ["usermod", "-G", group_name, member_name] - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: + """ + Add a user to the system using standard tools + + Returns False if user already exists, otherwise True. + """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) return False @@ -112,6 +117,33 @@ def add_user(self, name, **kwargs): if passwd_val is not None: self.set_passwd(name, passwd_val, hashed=True) + # Indicate that a new user was created + return True + + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + For NetBSD (from https://man.netbsd.org/passwd.5) a password field + value of either "" or "*************" (13 "*") indicates no password, + a password field prefixed with "*LOCKED*" indicates a locked + password, and a password field of "*LOCKED*" followed by 13 "*" + indicates a locked and blank password. + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, + "::", + ":*************:", + ":*LOCKED**************:", + check_file=shadow_file, + ) + return status + def set_passwd(self, user, passwd, hashed=False): if hashed: hashed_pw = passwd diff --git a/cloudinit/distros/openbsd.py b/cloudinit/distros/openbsd.py index a701580deb15..418e99bc495a 100644 --- a/cloudinit/distros/openbsd.py +++ b/cloudinit/distros/openbsd.py @@ -45,6 +45,25 @@ def manage_service(cls, action: str, service: str, *extra_args, rcs=None): cmd = list(init_cmd) + list(cmds[action]) return subp.subp(cmd, capture=True, rcs=rcs) + def _check_if_existing_password(self, username, shadow_file=None) -> bool: + """ + Check whether ``username`` user has an existing password (regardless + of whether locked or not). + + For OpenBSD (from https://man.openbsd.org/passwd.5) a password field + of "" indicates no password, and password field values of either + "*" or "*************" (13 "*") indicate differing forms of "locked" + but with no password defined. + + Returns either 'True' to indicate a password present, or 'False' + for no password set. + """ + + status = not self._check_if_password_field_matches( + username, "::", ":*:", ":*************:", check_file=shadow_file + ) + return status + def lock_passwd(self, name): try: subp.subp(["usermod", "-p", "*", name]) @@ -53,7 +72,11 @@ def lock_passwd(self, name): raise def unlock_passwd(self, name): - pass + LOG.debug( + "OpenBSD password lock is not reversible, " + "ignoring unlock for user %s", + name, + ) def _get_pkg_cmd_environ(self): """Return env vars used in OpenBSD package_command operations""" diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py index 039723aaad2b..df52c2c3d2e8 100644 --- a/tests/unittests/distros/test_create_users.py +++ b/tests/unittests/distros/test_create_users.py @@ -16,6 +16,21 @@ def common_mocks(mocker): mocker.patch("cloudinit.distros.util.system_is_snappy", return_value=False) +def _existing_shadow_grep(name: str): + """Return a mock of grep of /etc/shadow call based on username.""" + return mock.call( + ["grep", "-q", "-e", f"^{name}::", "-e", f"^{name}:!:", "/etc/shadow"] + ) + + +def _chpasswdmock(name: str, password: str, hashed: bool = False): + """Return a mock of chpasswd call based on args""" + cmd = ["chpasswd", "-e"] if hashed else ["chpasswd"] + return mock.call( + cmd, data=f"{name}:{password}", logstring=f"chpasswd for {name}" + ) + + def _useradd2call(args: List[str]): # return a mock call for the useradd command in args # with expected 'logstring'. @@ -76,17 +91,98 @@ def dist(self): id="unlocked", ), pytest.param( - {"passwd": "passfoo"}, + {"passwd": "$6$rounds=..."}, + [ + _useradd2call([USER, "--password", "$6$rounds=...", "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="set_implicit_encrypted_password", + ), + pytest.param( + {"passwd": ""}, + [ + _useradd2call([USER, "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="set_empty_passwd_new_user", + ), + pytest.param( + {"plain_text_passwd": "clearfoo"}, + [ + _useradd2call([USER, "-m"]), + _chpasswdmock(USER, "clearfoo"), + mock.call(["passwd", "-l", USER]), + ], + id="set_plain_text_password", + ), + pytest.param( + {"hashed_passwd": "$6$rounds=..."}, + [ + _useradd2call([USER, "-m"]), + _chpasswdmock(USER, "$6$rounds=...", hashed=True), + mock.call(["passwd", "-l", USER]), + ], + id="set_explicitly_hashed_password", + ), + ], + ) + @mock.patch("cloudinit.distros.util.is_user", return_value=False) + def test_create_options( + self, _is_user, m_subp, dist, create_kwargs, expected + ): + dist.create_user(name=USER, **create_kwargs) + assert m_subp.call_args_list == expected + + @pytest.mark.parametrize( + "create_kwargs,expected,expected_logs", + [ + pytest.param( + {"passwd": "$6$rounds=..."}, [ - _useradd2call([USER, "--password", "passfoo", "-m"]), + _existing_shadow_grep(USER), mock.call(["passwd", "-l", USER]), ], - id="set_password", + [ + "'passwd' in user-data is ignored for existing user " + "foo_user" + ], + id="skip_passwd_set_on_existing_user", + ), + pytest.param( + {"plain_text_passwd": "clearfoo"}, + [ + _chpasswdmock(USER, "clearfoo"), + mock.call(["passwd", "-l", USER]), + ], + [], + id="set_plain_text_password_on_existing_user", + ), + pytest.param( + {"hashed_passwd": "$6$rounds=..."}, + [ + _chpasswdmock(USER, "$6$rounds=...", hashed=True), + mock.call(["passwd", "-l", USER]), + ], + [], + id="set_explicitly_hashed_password", ), ], ) - def test_create_options(self, m_subp, dist, create_kwargs, expected): + @mock.patch("cloudinit.distros.util.is_user", return_value=True) + def test_create_passwd_existing_user( + self, + m_is_user, + m_subp, + dist, + create_kwargs, + expected, + expected_logs, + caplog, + ): + """When user exists, don't unlock on empty or locked passwords.""" dist.create_user(name=USER, **create_kwargs) + for log in expected_logs: + assert log in caplog.text assert m_subp.call_args_list == expected @mock.patch("cloudinit.distros.util.is_group") diff --git a/tests/unittests/distros/test_dragonflybsd.py b/tests/unittests/distros/test_dragonflybsd.py index 8a240ea5fa91..8fe8a11aa0df 100644 --- a/tests/unittests/distros/test_dragonflybsd.py +++ b/tests/unittests/distros/test_dragonflybsd.py @@ -1,8 +1,66 @@ # This file is part of cloud-init. See LICENSE file for license information. import cloudinit.util +from cloudinit.distros.dragonflybsd import Distro +from cloudinit.distros.freebsd import FreeBSDNetworking +from tests.unittests.distros import _get_distro from tests.unittests.helpers import mock +M_PATH = "cloudinit.distros." + + +class TestDragonFlyBSD: + @mock.patch(M_PATH + "subp.subp") + def test_add_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("dragonflybsd") + distro.add_user("me2", uid=1234, default=False) + assert [ + mock.call( + [ + "pw", + "useradd", + "-n", + "me2", + "-u", + "1234", + "-d/home/me2", + "-m", + ], + logstring=["pw", "useradd", "-n", "me2", "-d/home/me2", "-m"], + ) + ] == m_subp.call_args_list + + @mock.patch(M_PATH + "subp.subp") + def test_check_existing_password_for_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("dragonflybsd") + distro._check_if_existing_password("me2") + assert [ + mock.call( + [ + "grep", + "-q", + "-e", + "^me2::", + "-e", + "^me2:*:", + "-e", + "^me2:*LOCKED*:", + "/etc/master.passwd", + ] + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, mocker, caplog): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("dragonflybsd") + distro.unlock_passwd("me2") + assert ( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + ) + def test_find_dragonflybsd_part(): assert cloudinit.util.find_freebsd_part("/dev/vbd0s3") == "vbd0s3" diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py index c4c067ead713..6cf962dab54c 100644 --- a/tests/unittests/distros/test_freebsd.py +++ b/tests/unittests/distros/test_freebsd.py @@ -39,6 +39,36 @@ def test_add_user(self, m_subp, mocker): ) ] == m_subp.call_args_list + @mock.patch(M_PATH + "subp.subp") + def test_check_existing_password_for_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("freebsd") + distro._check_if_existing_password("me2") + assert [ + mock.call( + [ + "grep", + "-q", + "-e", + "^me2::", + "-e", + "^me2:*:", + "-e", + "^me2:*LOCKED*:", + "/etc/master.passwd", + ] + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, mocker, caplog): + mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + distro = _get_distro("freebsd") + distro.unlock_passwd("me2") + assert ( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + ) + class TestDeviceLookUp(CiTestCase): @mock.patch("cloudinit.subp.subp") diff --git a/tests/unittests/distros/test_openbsd.py b/tests/unittests/distros/test_openbsd.py new file mode 100644 index 000000000000..0e28ed5dc00e --- /dev/null +++ b/tests/unittests/distros/test_openbsd.py @@ -0,0 +1,51 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from cloudinit.distros.bsd import BSDNetworking +from cloudinit.distros.openbsd import Distro +from tests.unittests.distros import _get_distro +from tests.unittests.helpers import mock + +M_PATH = "cloudinit.distros.openbsd." + + +class TestOpenBSD: + @mock.patch(M_PATH + "subp.subp") + def test_add_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=BSDNetworking) + distro = _get_distro("openbsd") + distro.add_user("me2", uid=1234, default=False) + assert [ + mock.call( + ["useradd", "-m", "me2"], logstring=["useradd", "-m", "me2"] + ) + ] == m_subp.call_args_list + + @mock.patch(M_PATH + "subp.subp") + def test_check_existing_password_for_user(self, m_subp, mocker): + mocker.patch.object(Distro, "networking_cls", spec=BSDNetworking) + distro = _get_distro("openbsd") + distro._check_if_existing_password("me2") + assert [ + mock.call( + [ + "grep", + "-q", + "-e", + "^me2::", + "-e", + "^me2:*:", + "-e", + "^me2:*************:", + "/etc/master.passwd", + ] + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, mocker, caplog): + mocker.patch.object(Distro, "networking_cls", spec=BSDNetworking) + distro = _get_distro("openbsd") + distro.unlock_passwd("me2") + assert ( + "OpenBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + )