diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py index 1afef63de95..e65cbfb5d89 100644 --- a/cloudinit/distros/__init__.py +++ b/cloudinit/distros/__init__.py @@ -136,6 +136,10 @@ class Distro(persistence.CloudInitPickleMixin, metaclass=abc.ABCMeta): doas_fn = "/etc/doas.conf" ci_sudoers_fn = "/etc/sudoers.d/90-cloud-init-users" hostname_conf_fn = "/etc/hostname" + shadow_fn = "/etc/shadow" + shadow_extrausers_fn = "/var/lib/extrausers/shadow" + # /etc/shadow match patterns indicating empty passwords + shadow_empty_locked_passwd_patterns = ["^{username}::", "^{username}:!:"] tz_zone_dir = "/usr/share/zoneinfo" default_owner = "root:root" init_cmd = ["service"] # systemctl, service etc @@ -655,19 +659,21 @@ def preferred_ntp_clients(self): def get_default_user(self): return self.get_option("default_user") - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: """ Add a user to the system using standard GNU tools This should be overridden on distros where useradd is not desirable or not available. + + Returns False if user already exists, otherwise True. """ # XXX need to make add_user idempotent somehow as we # still want to add groups or modify SSH keys on pre-existing # users in the image. if util.is_user(name): LOG.info("User %s already exists, skipping.", name) - return + return False if "create_groups" in kwargs: create_groups = kwargs.pop("create_groups") @@ -771,6 +777,9 @@ def add_user(self, name, **kwargs): util.logexc(LOG, "Failed to create user %s", name) raise e + # Indicate that a new user was created + return True + def add_snap_user(self, name, **kwargs): """ Add a snappy user to the system using snappy tools @@ -798,6 +807,40 @@ def add_snap_user(self, name, **kwargs): return username + def _shadow_file_has_empty_user_password(self, username) -> bool: + """ + Check whether username exists in shadow files with empty password. + + Support reading /var/lib/extrausers/shadow on snappy systems. + """ + if util.system_is_snappy(): + shadow_files = [self.shadow_extrausers_fn, self.shadow_fn] + else: + shadow_files = [self.shadow_fn] + shadow_empty_passwd_re = "|".join( + [ + pattern.format(username=username) + for pattern in self.shadow_empty_locked_passwd_patterns + ] + ) + for shadow_file in shadow_files: + if not os.path.exists(shadow_file): + continue + shadow_content = util.load_text_file(shadow_file) + if not re.findall(rf"^{username}:", shadow_content, re.MULTILINE): + LOG.debug("User %s not found in %s", username, shadow_file) + continue + LOG.debug( + "User %s found in %s. Checking for empty password", + username, + shadow_file, + ) + if re.findall( + shadow_empty_passwd_re, shadow_content, re.MULTILINE + ): + return True + return False + def create_user(self, name, **kwargs): """ Creates or partially updates the ``name`` user in the system. @@ -824,20 +867,93 @@ def create_user(self, name, **kwargs): return self.add_snap_user(name, **kwargs) # Add the user - self.add_user(name, **kwargs) - - # Set password if plain-text password provided and non-empty - if "plain_text_passwd" in kwargs and kwargs["plain_text_passwd"]: - self.set_passwd(name, kwargs["plain_text_passwd"]) - - # Set password if hashed password is provided and non-empty - if "hashed_passwd" in kwargs and kwargs["hashed_passwd"]: - self.set_passwd(name, kwargs["hashed_passwd"], hashed=True) + pre_existing_user = not self.add_user(name, **kwargs) + + has_existing_password = False + ud_blank_password_specified = False + ud_password_specified = False + password_key = None + + if "plain_text_passwd" in kwargs: + ud_password_specified = True + password_key = "plain_text_passwd" + if kwargs["plain_text_passwd"]: + # Set password if plain-text password provided and non-empty + self.set_passwd(name, kwargs["plain_text_passwd"]) + else: + ud_blank_password_specified = True + + if "hashed_passwd" in kwargs: + ud_password_specified = True + password_key = "hashed_passwd" + if kwargs["hashed_passwd"]: + # Set password if hashed password is provided and non-empty + self.set_passwd(name, kwargs["hashed_passwd"], hashed=True) + else: + ud_blank_password_specified = True + + if pre_existing_user: + if not ud_password_specified: + if "passwd" in kwargs: + password_key = "passwd" + # Only "plain_text_passwd" and "hashed_passwd" + # are valid for an existing user. + LOG.warning( + "'passwd' in user-data is ignored for existing " + "user %s", + name, + ) - # Default locking down the account. 'lock_passwd' defaults to True. - # lock account unless lock_password is False. + # As no password specified for the existing user in user-data + # then check if the existing user's hashed password value is + # empty (whether locked or not). + has_existing_password = not ( + self._shadow_file_has_empty_user_password(name) + ) + else: + if "passwd" in kwargs: + ud_password_specified = True + password_key = "passwd" + if not kwargs["passwd"]: + ud_blank_password_specified = True + + # Default locking down the account. 'lock_passwd' defaults to True. + # Lock account unless lock_password is False in which case unlock + # account as long as a password (blank or otherwise) was specified. if kwargs.get("lock_passwd", True): self.lock_passwd(name) + elif has_existing_password or ud_password_specified: + # 'lock_passwd: False' and either existing account already with + # non-blank password or else existing/new account with password + # explicitly set in user-data. + if ud_blank_password_specified: + LOG.debug( + "Allowing unlocking empty password for %s based on empty" + " '%s' in user-data", + name, + password_key, + ) + + # Unlock the existing/new account + self.unlock_passwd(name) + elif pre_existing_user: + # Pre-existing user with no existing password and none + # explicitly set in user-data. + LOG.warning( + "Not unlocking blank password for existing user %s." + " 'lock_passwd: false' present in user-data but no existing" + " password set and no 'plain_text_passwd'/'hashed_passwd'" + " provided in user-data", + name, + ) + else: + # No password (whether blank or otherwise) explicitly set + LOG.warning( + "Not unlocking password for user %s. 'lock_passwd: false'" + " present in user-data but no 'passwd'/'plain_text_passwd'/" + "'hashed_passwd' provided in user-data", + name, + ) # Configure doas access if "doas" in kwargs: @@ -914,6 +1030,50 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to disable password for user %s", name) raise e + def unlock_passwd(self, name: str): + """ + Unlock the password of a user, i.e., enable password logins + """ + # passwd must use short '-u' due to SLES11 lacking long form '--unlock' + unlock_tools = (["passwd", "-u", name], ["usermod", "--unlock", name]) + try: + cmd = next(tool for tool in unlock_tools if subp.which(tool[0])) + except StopIteration as e: + raise RuntimeError( + "Unable to unlock user account '%s'. No tools available. " + " Tried: %s." % (name, [c[0] for c in unlock_tools]) + ) from e + try: + _, err = subp.subp(cmd, rcs=[0, 3]) + except Exception as e: + util.logexc(LOG, "Failed to enable password for user %s", name) + raise e + if err: + # if "passwd" or "usermod" are unable to unlock an account with + # an empty password then they display a message on stdout. In + # that case then instead set a blank password. + passwd_set_tools = ( + ["passwd", "-d", name], + ["usermod", "--password", "''", name], + ) + try: + cmd = next( + tool for tool in passwd_set_tools if subp.which(tool[0]) + ) + except StopIteration as e: + raise RuntimeError( + "Unable to set blank password for user account '%s'. " + "No tools available. " + " Tried: %s." % (name, [c[0] for c in unlock_tools]) + ) from e + try: + subp.subp(cmd) + except Exception as e: + util.logexc( + LOG, "Failed to set blank password for user %s", name + ) + raise e + def expire_passwd(self, user): try: subp.subp(["passwd", "--expire", user]) @@ -948,6 +1108,9 @@ def chpasswd(self, plist_in: list, hashed: bool): ) + "\n" ) + # Need to use the short option name '-e' instead of '--encrypted' + # (which would be more descriptive) since Busybox and SLES 11 + # chpasswd don't know about long names. cmd = ["chpasswd"] + (["-e"] if hashed else []) subp.subp(cmd, data=payload) diff --git a/cloudinit/distros/alpine.py b/cloudinit/distros/alpine.py index dae4b61564e..19912d3724f 100644 --- a/cloudinit/distros/alpine.py +++ b/cloudinit/distros/alpine.py @@ -205,16 +205,18 @@ def preferred_ntp_clients(self): return self._preferred_ntp_clients - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: """ Add a user to the system using standard tools On Alpine this may use either 'useradd' or 'adduser' depending on whether the 'shadow' package is installed. + + Returns False if user already exists, otherwise True. """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) - return + return False if "selinux_user" in kwargs: LOG.warning("Ignoring selinux_user parameter for Alpine Linux") @@ -418,6 +420,9 @@ def add_user(self, name, **kwargs): LOG, "Failed to update %s for user %s", shadow_file, name ) + # Indicate that a new user was created + return True + def lock_passwd(self, name): """ Lock the password of a user, i.e., disable password logins @@ -446,6 +451,36 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to disable password for user %s", name) raise e + def unlock_passwd(self, name: str): + """ + Unlock the password of a user, i.e., enable password logins + """ + + # Check whether Shadow's or Busybox's version of 'passwd'. + # If Shadow's 'passwd' is available then use the generic + # lock_passwd function from __init__.py instead. + if not os.path.islink( + "/usr/bin/passwd" + ) or "bbsuid" not in os.readlink("/usr/bin/passwd"): + return super().unlock_passwd(name) + + cmd = ["passwd", "-u", name] + # Busybox's 'passwd', unlike Shadow's 'passwd', errors + # if password is already unlocked: + # + # "passwd: password for user2 is already unlocked" + # + # with exit code 1 + # + # and also does *not* error if no password is set. + try: + _, err = subp.subp(cmd, rcs=[0, 1]) + if re.search(r"is already unlocked", err): + return True + except subp.ProcessExecutionError as e: + util.logexc(LOG, "Failed to unlock password for user %s", name) + raise e + def expire_passwd(self, user): # Check whether Shadow's or Busybox's version of 'passwd'. # If Shadow's 'passwd' is available then use the generic diff --git a/cloudinit/distros/bsd.py b/cloudinit/distros/bsd.py index 15be9c36714..8433aac2aa8 100644 --- a/cloudinit/distros/bsd.py +++ b/cloudinit/distros/bsd.py @@ -15,6 +15,7 @@ class BSD(distros.Distro): networking_cls = BSDNetworking hostname_conf_fn = "/etc/rc.conf" rc_conf_fn = "/etc/rc.conf" + shadow_fn = "/etc/master.passwd" default_owner = "root:wheel" # This differs from the parent Distro class, which has -P for diff --git a/cloudinit/distros/freebsd.py b/cloudinit/distros/freebsd.py index ba35b2e611f..fc1c38a424a 100644 --- a/cloudinit/distros/freebsd.py +++ b/cloudinit/distros/freebsd.py @@ -41,6 +41,17 @@ class Distro(cloudinit.distros.bsd.BSD): dhclient_lease_directory = "/var/db" dhclient_lease_file_regex = r"dhclient.leases.\w+" + # /etc/shadow match patterns indicating empty passwords + # For FreeBSD (from https://man.freebsd.org/cgi/man.cgi?passwd(5)) a + # password field of "" indicates no password, and a password + # field value of either "*" or "*LOCKED*" indicate differing forms of + # "locked" but with no password defined. + shadow_empty_locked_passwd_patterns = [ + r"^{username}::", + r"^{username}:\*:", + r"^{username}:\*LOCKED\*:", + ] + @classmethod def reload_init(cls, rcs=None): """ @@ -86,7 +97,12 @@ def manage_service( def _get_add_member_to_group_cmd(self, member_name, group_name): return ["pw", "usermod", "-n", member_name, "-G", group_name] - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: + """ + Add a user to the system using standard tools + + Returns False if user already exists, otherwise True. + """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) return False @@ -140,6 +156,9 @@ def add_user(self, name, **kwargs): if passwd_val is not None: self.set_passwd(name, passwd_val, hashed=True) + # Indicate that a new user was created + return True + def expire_passwd(self, user): try: subp.subp(["pw", "usermod", user, "-p", "01-Jan-1970"]) @@ -170,6 +189,13 @@ def lock_passwd(self, name): util.logexc(LOG, "Failed to lock password login for user %s", name) raise + def unlock_passwd(self, name): + LOG.debug( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user %s", + name, + ) + def apply_locale(self, locale, out_fn=None): # Adjust the locales value to the new value newconf = StringIO() diff --git a/cloudinit/distros/netbsd.py b/cloudinit/distros/netbsd.py index da8c1904028..157aba06924 100644 --- a/cloudinit/distros/netbsd.py +++ b/cloudinit/distros/netbsd.py @@ -49,6 +49,17 @@ class NetBSD(cloudinit.distros.bsd.BSD): ci_sudoers_fn = "/usr/pkg/etc/sudoers.d/90-cloud-init-users" group_add_cmd_prefix = ["groupadd"] + # For NetBSD (from https://man.netbsd.org/passwd.5) a password field + # value of either "" or "*************" (13 "*") indicates no password, + # a password field prefixed with "*LOCKED*" indicates a locked + # password, and a password field of "*LOCKED*" followed by 13 "*" + # indicates a locked and blank password. + shadow_empty_locked_passwd_patterns = [ + r"^{username}::", + r"^{username}:\*\*\*\*\*\*\*\*\*\*\*\*\*:", + r"^{username}:\*LOCKED\*\*\*\*\*\*\*\*\*\*\*\*\*\*:", + ] + def __init__(self, name, cfg, paths): super().__init__(name, cfg, paths) if os.path.exists("/usr/pkg/bin/pkgin"): @@ -63,7 +74,12 @@ def __init__(self, name, cfg, paths): def _get_add_member_to_group_cmd(self, member_name, group_name): return ["usermod", "-G", group_name, member_name] - def add_user(self, name, **kwargs): + def add_user(self, name, **kwargs) -> bool: + """ + Add a user to the system using standard tools + + Returns False if user already exists, otherwise True. + """ if util.is_user(name): LOG.info("User %s already exists, skipping.", name) return False @@ -112,6 +128,9 @@ def add_user(self, name, **kwargs): if passwd_val is not None: self.set_passwd(name, passwd_val, hashed=True) + # Indicate that a new user was created + return True + def set_passwd(self, user, passwd, hashed=False): if hashed: hashed_pw = passwd diff --git a/cloudinit/distros/networking.py b/cloudinit/distros/networking.py index af9584bdfca..67f10f4fbcf 100644 --- a/cloudinit/distros/networking.py +++ b/cloudinit/distros/networking.py @@ -179,16 +179,21 @@ class BSDNetworking(Networking): def __init__(self): self.ifc = ifconfig.Ifconfig() - self.ifs = {} - self._update_ifs() + self._ifs = {} super().__init__() + @property + def ifs(self) -> dict: + if not self._ifs: + self._update_ifs() + return self._ifs + def _update_ifs(self): ifconf = subp.subp(["ifconfig", "-a"]) # ``ifconfig -a`` always returns at least ``lo0``. # So this ``if`` is really just to make testing/mocking easier if ifconf[0]: - self.ifs = self.ifc.parse(ifconf[0]) + self._ifs = self.ifc.parse(ifconf[0]) def apply_network_config_names(self, netcfg: NetworkConfig) -> None: LOG.debug("Cannot rename network interface.") diff --git a/cloudinit/distros/openbsd.py b/cloudinit/distros/openbsd.py index a701580deb1..14cf3be2b8e 100644 --- a/cloudinit/distros/openbsd.py +++ b/cloudinit/distros/openbsd.py @@ -14,6 +14,16 @@ class Distro(cloudinit.distros.netbsd.NetBSD): hostname_conf_fn = "/etc/myname" init_cmd = ["rcctl"] + # For OpenBSD (from https://man.openbsd.org/passwd.5) a password field + # of "" indicates no password, and password field values of either + # "*" or "*************" (13 "*") indicate differing forms of "locked" + # but with no password defined. + shadow_empty_locked_passwd_patterns = [ + r"^{username}::", + r"^{username}:\*:", + r"^{username}:\*\*\*\*\*\*\*\*\*\*\*\*\*:", + ] + def _read_hostname(self, filename, default=None): return util.load_text_file(self.hostname_conf_fn) @@ -53,7 +63,11 @@ def lock_passwd(self, name): raise def unlock_passwd(self, name): - pass + LOG.debug( + "OpenBSD password lock is not reversible, " + "ignoring unlock for user %s", + name, + ) def _get_pkg_cmd_environ(self): """Return env vars used in OpenBSD package_command operations""" diff --git a/tests/integration_tests/modules/test_users_groups.py b/tests/integration_tests/modules/test_users_groups.py index a904cd9f6f2..809d988f8cb 100644 --- a/tests/integration_tests/modules/test_users_groups.py +++ b/tests/integration_tests/modules/test_users_groups.py @@ -11,7 +11,7 @@ from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.releases import CURRENT_RELEASE, IS_UBUNTU, JAMMY -from tests.integration_tests.util import verify_clean_log +from tests.integration_tests.util import verify_clean_boot USER_DATA = """\ #cloud-config @@ -36,6 +36,9 @@ sudo: ALL=(ALL) NOPASSWD:ALL groups: [cloud-users, secret] lock_passwd: true + - name: nopassworduser + gecos: I do not like passwords + lock_passwd: false - name: cloudy gecos: Magic Cloud App Daemon User inactive: '0' @@ -47,6 +50,10 @@ uid: 1743 """ +NEW_USER_EMPTY_PASSWD_WARNING = "Not unlocking password for user {username}. 'lock_passwd: false' present in user-data but no 'passwd'/'plain_text_passwd'/'hashed_passwd' provided in user-data" # noqa: E501 + +EXISTING_USER_EMPTY_PASSWD_WARNING = "Not unlocking blank password for existing user {username}. 'lock_passwd: false' present in user-data but no existing password set and no 'plain_text_passwd'/'hashed_passwd' provided in user-data" # noqa E501 + @pytest.mark.ci @pytest.mark.user_data(USER_DATA) @@ -86,6 +93,11 @@ class TestUsersGroups: (["passwd", "eric"], r"eric:x:1742:"), # Test int uid (["passwd", "archivist"], r"archivist:x:1743:"), + # Test int uid + ( + ["passwd", "nopassworduser"], + r"nopassworduser:x:[0-9]{4}:[0-9]{4}:I do not like passwords", + ), ], ) def test_users_groups(self, regex, getent_args, class_client): @@ -100,13 +112,43 @@ def test_users_groups(self, regex, getent_args, class_client): def test_user_root_in_secret(self, class_client): """Test root user is in 'secret' group.""" - log = class_client.read_from_file("/var/log/cloud-init.log") - verify_clean_log(log) + verify_clean_boot( + class_client, + require_warnings=[ + NEW_USER_EMPTY_PASSWD_WARNING.format(username="nopassworduser") + ], + ) output = class_client.execute("groups root").stdout _, groups_str = output.split(":", maxsplit=1) groups = groups_str.split() assert "secret" in groups + def test_nopassword_unlock_warnings(self, class_client): + """Verify warnings for empty passwords for new and existing users.""" + verify_clean_boot( + class_client, + require_warnings=[ + NEW_USER_EMPTY_PASSWD_WARNING.format(username="nopassworduser") + ], + ) + + # Fake admin clearing and unlocking and empty unlocked password foobar + # This will generate additional warnings about not unlocking passwords + # for pre-existing users which have an existing empty password + class_client.execute("passwd -d foobar") + class_client.instance.clean() + class_client.restart() + verify_clean_boot( + class_client, + ignore_warnings=True, # ignore warnings about existing groups + require_warnings=[ + EXISTING_USER_EMPTY_PASSWD_WARNING.format( + username="nopassworduser" + ), + EXISTING_USER_EMPTY_PASSWD_WARNING.format(username="foobar"), + ], + ) + @pytest.mark.user_data(USER_DATA) @pytest.mark.skipif( diff --git a/tests/unittests/distros/test_create_users.py b/tests/unittests/distros/test_create_users.py index ebbbb418e8a..819e2b9b006 100644 --- a/tests/unittests/distros/test_create_users.py +++ b/tests/unittests/distros/test_create_users.py @@ -1,10 +1,12 @@ # This file is part of cloud-init. See LICENSE file for license information. +from pathlib import Path from typing import List import pytest from cloudinit import distros, features, lifecycle, ssh_util +from tests.unittests.distros import _get_distro from tests.unittests.helpers import mock from tests.unittests.util import abstract_to_concrete @@ -16,6 +18,14 @@ def common_mocks(mocker): mocker.patch("cloudinit.distros.util.system_is_snappy", return_value=False) +def _chpasswdmock(name: str, password: str, hashed: bool = False): + """Return a mock of chpasswd call based on args""" + cmd = ["chpasswd", "-e"] if hashed else ["chpasswd"] + return mock.call( + cmd, data=f"{name}:{password}", logstring=f"chpasswd for {name}" + ) + + def _useradd2call(args: List[str]): # return a mock call for the useradd command in args # with expected 'logstring'. @@ -30,67 +40,420 @@ def _useradd2call(args: List[str]): @mock.patch("cloudinit.distros.subp.subp") class TestCreateUser: @pytest.fixture() - def dist(self): - return abstract_to_concrete(distros.Distro)( + def dist(self, tmpdir): + d = abstract_to_concrete(distros.Distro)( name="test", cfg=None, paths=None ) + # Monkey patch /etc/shadow files to tmpdir + d.shadow_fn = tmpdir.join(d.shadow_fn).strpath + d.shadow_extrausers_fn = tmpdir.join(d.shadow_extrausers_fn).strpath + return d @pytest.mark.parametrize( - "create_kwargs,expected", + "create_kwargs,is_snappy,expected", [ pytest.param( {}, + False, [ _useradd2call([USER, "-m"]), mock.call(["passwd", "-l", USER]), ], id="basic", ), + pytest.param( + {}, + True, + [ + _useradd2call([USER, "--extrausers", "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="basic_snappy", + ), pytest.param( {"no_create_home": True}, + False, [ _useradd2call([USER, "-M"]), mock.call(["passwd", "-l", USER]), ], id="no_home", ), + pytest.param( + {"no_create_home": True}, + True, + [ + _useradd2call([USER, "--extrausers", "-M"]), + mock.call(["passwd", "-l", USER]), + ], + id="no_home_snappy", + ), pytest.param( {"system": True}, + False, [ _useradd2call([USER, "--system", "-M"]), mock.call(["passwd", "-l", USER]), ], id="system_user", ), + pytest.param( + {"system": True}, + True, + [ + _useradd2call([USER, "--extrausers", "--system", "-M"]), + mock.call(["passwd", "-l", USER]), + ], + id="system_user_snappy", + ), pytest.param( {"create_no_home": False}, + False, [ _useradd2call([USER, "-m"]), mock.call(["passwd", "-l", USER]), ], id="explicit_no_home_false", ), + pytest.param( + {"create_no_home": False}, + True, + [ + _useradd2call([USER, "--extrausers", "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="explicit_no_home_false_snappy", + ), pytest.param( {"lock_passwd": False}, + False, [_useradd2call([USER, "-m"])], id="unlocked", ), pytest.param( - {"passwd": "passfoo"}, + {"lock_passwd": False}, + True, + [_useradd2call([USER, "--extrausers", "-m"])], + id="unlocked_snappy", + ), + pytest.param( + {"passwd": "$6$rounds=..."}, + False, + [ + _useradd2call([USER, "--password", "$6$rounds=...", "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="set_implicit_encrypted_password", + ), + pytest.param( + {"passwd": "$6$rounds=..."}, + True, [ - _useradd2call([USER, "--password", "passfoo", "-m"]), + _useradd2call( + [ + USER, + "--extrausers", + "--password", + "$6$rounds=...", + "-m", + ] + ), mock.call(["passwd", "-l", USER]), ], - id="set_password", + id="set_implicit_encrypted_password_snappy", + ), + pytest.param( + {"passwd": ""}, + False, + [ + _useradd2call([USER, "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="set_empty_passwd_new_user", + ), + pytest.param( + {"passwd": ""}, + True, + [ + _useradd2call([USER, "--extrausers", "-m"]), + mock.call(["passwd", "-l", USER]), + ], + id="set_empty_passwd_new_user_snappy", + ), + pytest.param( + {"plain_text_passwd": "clearfoo"}, + False, + [ + _useradd2call([USER, "-m"]), + _chpasswdmock(USER, "clearfoo"), + mock.call(["passwd", "-l", USER]), + ], + id="set_plain_text_password", + ), + pytest.param( + {"plain_text_passwd": "clearfoo"}, + True, + [ + _useradd2call([USER, "--extrausers", "-m"]), + _chpasswdmock(USER, "clearfoo"), + mock.call(["passwd", "-l", USER]), + ], + id="set_plain_text_password_snappy", + ), + pytest.param( + {"hashed_passwd": "$6$rounds=..."}, + False, + [ + _useradd2call([USER, "-m"]), + _chpasswdmock(USER, "$6$rounds=...", hashed=True), + mock.call(["passwd", "-l", USER]), + ], + id="set_explicitly_hashed_password", + ), + pytest.param( + {"hashed_passwd": "$6$rounds=..."}, + True, + [ + _useradd2call([USER, "--extrausers", "-m"]), + _chpasswdmock(USER, "$6$rounds=...", hashed=True), + mock.call(["passwd", "-l", USER]), + ], + id="set_explicitly_hashed_password_snappy", ), ], ) - def test_create_options(self, m_subp, dist, create_kwargs, expected): + @mock.patch("cloudinit.distros.util.is_user", return_value=False) + def test_create_options( + self, + m_is_user, + m_subp, + dist, + create_kwargs, + is_snappy, + expected, + mocker, + ): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=is_snappy + ) dist.create_user(name=USER, **create_kwargs) assert m_subp.call_args_list == expected + @pytest.mark.parametrize( + "shadow_content,distro_name,is_snappy,expected_logs", + ( + pytest.param( + {"/etc/shadow": f"dnsmasq:!:\n{USER}:!:"}, + "ubuntu", + False, + [ + "Not unlocking blank password for existing user " + "foo_user. 'lock_passwd: false' present in user-data " + "but no existing password set and no " + "'plain_text_passwd'/'hashed_passwd' provided in " + "user-data" + ], + id="no_unlock_on_locked_empty_user_passwd", + ), + pytest.param( + {"/var/lib/extrausers/shadow": f"dnsmasq::\n{USER}:!:"}, + "ubuntu", + True, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_in_snappy_on_locked_empty_user_passwd_in_extrausers", + ), + pytest.param( + {"/etc/shadow": f"dnsmasq::\n{USER}::"}, + "alpine", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_empty_user_passwd_alpine", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}::"}, + "dragonflybsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_empty_user_passwd_dragonflybsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}:*:"}, + "dragonflybsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_locked_format1_empty_user_passwd_dragonflybsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}:*LOCKED*:"}, + "dragonflybsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_locked_format2_empty_user_passwd_dragonflybsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}::"}, + "freebsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_empty_user_passwd_freebsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}:*:"}, + "freebsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_locked_format1_empty_user_passwd_freebsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}:*LOCKED*:"}, + "freebsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_locked_format2_empty_user_passwd_freebsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}::"}, + "netbsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_empty_format1_user_passwd_netbsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}:*************:"}, + "netbsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_empty_format2_user_passwd_netbsd", + ), + pytest.param( + { + "/etc/master.passwd": f"dnsmasq::\n{USER}:*LOCKED**************:" # noqa: E501 + }, + "netbsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_locked_empty_user_passwd_netbsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}::"}, + "openbsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_empty_user_passwd_openbsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}:*:"}, + "openbsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_locked_format1_empty_user_passwd_openbsd", + ), + pytest.param( + {"/etc/master.passwd": f"dnsmasq::\n{USER}:*************:"}, + "openbsd", + False, + ["Not unlocking blank password for existing user foo_user."], + id="no_unlock_on_locked_format2_empty_user_passwd_openbsd", + ), + ), + ) + def test_avoid_unlock_preexisting_user_empty_password( + self, + m_subp, + shadow_content, + distro_name, + is_snappy, + expected_logs, + caplog, + mocker, + tmpdir, + ): + dist = _get_distro(distro_name) + dist.shadow_fn = tmpdir.join(dist.shadow_fn).strpath + dist.shadow_extrausers_fn = tmpdir.join( + dist.shadow_extrausers_fn + ).strpath + + mocker.patch("cloudinit.distros.util.is_user", return_value=True) + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=is_snappy + ) + for filename, content in shadow_content.items(): + if dist.shadow_fn == tmpdir.join(filename).strpath: + shadow_file = Path(dist.shadow_fn) + shadow_file.parent.mkdir(parents=True, exist_ok=True) + elif dist.shadow_extrausers_fn == tmpdir.join(filename).strpath: + shadow_file = Path(dist.shadow_extrausers_fn) + shadow_file.parent.mkdir(parents=True, exist_ok=True) + else: + raise AssertionError( + f"Shadow file path {filename} not defined for distro" + f" {dist.name}" + ) + shadow_file.write_text(content) + unlock_passwd = mocker.patch.object(dist, "unlock_passwd") + dist.create_user(name=USER, lock_passwd=False) + for log in expected_logs: + assert log in caplog.text + unlock_passwd.assert_not_called() + assert m_subp.call_args_list == [] + + @pytest.mark.parametrize( + "create_kwargs,expected,expected_logs", + [ + pytest.param( + {"passwd": "$6$rounds=..."}, + [mock.call(["passwd", "-l", USER])], + [ + "'passwd' in user-data is ignored for existing user " + "foo_user" + ], + id="skip_passwd_set_on_existing_user", + ), + pytest.param( + {"plain_text_passwd": "clearfoo"}, + [ + _chpasswdmock(USER, "clearfoo"), + mock.call(["passwd", "-l", USER]), + ], + [], + id="set_plain_text_password_on_existing_user", + ), + pytest.param( + {"hashed_passwd": "$6$rounds=..."}, + [ + _chpasswdmock(USER, "$6$rounds=...", hashed=True), + mock.call(["passwd", "-l", USER]), + ], + [], + id="set_explicitly_hashed_password", + ), + ], + ) + @mock.patch("cloudinit.distros.util.is_user", return_value=True) + def test_create_passwd_existing_user( + self, + m_is_user, + m_subp, + create_kwargs, + expected, + expected_logs, + dist, + caplog, + tmpdir, + mocker, + ): + """When user exists, don't unlock on empty or locked passwords.""" + dist.create_user(name=USER, **create_kwargs) + for log in expected_logs: + assert log in caplog.text + assert m_subp.call_args_list == expected + @mock.patch("cloudinit.distros.util.is_group") - def test_group_added(self, m_is_group, m_subp, dist): + def test_group_added(self, m_is_group, m_subp, dist, mocker): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) m_is_group.return_value = False dist.create_user(USER, groups=["group1"]) expected = [ @@ -101,7 +464,24 @@ def test_group_added(self, m_is_group, m_subp, dist): assert m_subp.call_args_list == expected @mock.patch("cloudinit.distros.util.is_group") - def test_only_new_group_added(self, m_is_group, m_subp, dist): + def test_snappy_group_added(self, m_is_group, m_subp, dist, mocker): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=True + ) + m_is_group.return_value = False + dist.create_user(USER, groups=["group1"]) + expected = [ + mock.call(["groupadd", "group1", "--extrausers"]), + _useradd2call([USER, "--extrausers", "--groups", "group1", "-m"]), + mock.call(["passwd", "-l", USER]), + ] + assert m_subp.call_args_list == expected + + @mock.patch("cloudinit.distros.util.is_group") + def test_only_new_group_added(self, m_is_group, m_subp, dist, mocker): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) ex_groups = ["existing_group"] groups = ["group1", ex_groups[0]] m_is_group.side_effect = lambda m: m in ex_groups @@ -113,11 +493,34 @@ def test_only_new_group_added(self, m_is_group, m_subp, dist): ] assert m_subp.call_args_list == expected + @mock.patch("cloudinit.distros.util.is_group") + def test_snappy_only_new_group_added( + self, m_is_group, m_subp, dist, mocker + ): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=True + ) + ex_groups = ["existing_group"] + groups = ["group1", ex_groups[0]] + m_is_group.side_effect = lambda m: m in ex_groups + dist.create_user(USER, groups=groups) + expected = [ + mock.call(["groupadd", "group1", "--extrausers"]), + _useradd2call( + [USER, "--extrausers", "--groups", ",".join(groups), "-m"] + ), + mock.call(["passwd", "-l", USER]), + ] + assert m_subp.call_args_list == expected + @mock.patch("cloudinit.distros.util.is_group") def test_create_groups_with_whitespace_string( - self, m_is_group, m_subp, dist + self, m_is_group, m_subp, dist, mocker ): # groups supported as a comma delimeted string even with white space + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) m_is_group.return_value = False dist.create_user(USER, groups="group1, group2") expected = [ @@ -128,11 +531,34 @@ def test_create_groups_with_whitespace_string( ] assert m_subp.call_args_list == expected + @mock.patch("cloudinit.distros.util.is_group") + def test_snappy_create_groups_with_whitespace_string( + self, m_is_group, m_subp, dist, mocker + ): + # groups supported as a comma delimeted string even with white space + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=True + ) + m_is_group.return_value = False + dist.create_user(USER, groups="group1, group2") + expected = [ + mock.call(["groupadd", "group1", "--extrausers"]), + mock.call(["groupadd", "group2", "--extrausers"]), + _useradd2call( + [USER, "--extrausers", "--groups", "group1,group2", "-m"] + ), + mock.call(["passwd", "-l", USER]), + ] + assert m_subp.call_args_list == expected + @mock.patch("cloudinit.distros.util.is_group", return_value=False) def test_create_groups_with_dict_deprecated( - self, m_is_group, m_subp, dist, caplog + self, m_is_group, m_subp, dist, caplog, mocker ): """users.groups supports a dict value, but emit deprecation log.""" + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) dist.create_user(USER, groups={"group1": None, "group2": None}) expected = [ mock.call(["groupadd", "group1"]), @@ -157,8 +583,13 @@ def test_create_groups_with_dict_deprecated( assert "Use a comma-delimited" in caplog.records[0].message @mock.patch("cloudinit.distros.util.is_group", return_value=False) - def test_create_groups_with_list(self, m_is_group, m_subp, dist, caplog): + def test_create_groups_with_list( + self, m_is_group, m_subp, dist, caplog, mocker + ): """users.groups supports a list value.""" + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) dist.create_user(USER, groups=["group1", "group2"]) expected = [ mock.call(["groupadd", "group1"]), @@ -170,7 +601,31 @@ def test_create_groups_with_list(self, m_is_group, m_subp, dist, caplog): assert "WARNING" not in caplog.text assert "DEPRECATED" not in caplog.text - def test_explicit_sudo_false(self, m_subp, dist, caplog): + @mock.patch("cloudinit.distros.util.is_group", return_value=False) + def test_snappy_create_groups_with_list( + self, m_is_group, m_subp, dist, caplog, mocker + ): + """users.groups supports a list value.""" + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=True + ) + dist.create_user(USER, groups=["group1", "group2"]) + expected = [ + mock.call(["groupadd", "group1", "--extrausers"]), + mock.call(["groupadd", "group2", "--extrausers"]), + _useradd2call( + [USER, "--extrausers", "--groups", "group1,group2", "-m"] + ), + mock.call(["passwd", "-l", USER]), + ] + assert m_subp.call_args_list == expected + assert "WARNING" not in caplog.text + assert "DEPRECATED" not in caplog.text + + def test_explicit_sudo_false(self, m_subp, dist, caplog, mocker): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) dist.create_user(USER, sudo=False) assert m_subp.call_args_list == [ _useradd2call([USER, "-m"]), @@ -191,7 +646,10 @@ def test_explicit_sudo_false(self, m_subp, dist, caplog): " in 27.2. Use 'null' instead." ) in caplog.text - def test_explicit_sudo_none(self, m_subp, dist, caplog): + def test_explicit_sudo_none(self, m_subp, dist, caplog, mocker): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) dist.create_user(USER, sudo=None) assert m_subp.call_args_list == [ _useradd2call([USER, "-m"]), @@ -200,11 +658,26 @@ def test_explicit_sudo_none(self, m_subp, dist, caplog): assert "WARNING" not in caplog.text assert "DEPRECATED" not in caplog.text + def test_snappy_explicit_sudo_none(self, m_subp, dist, caplog, mocker): + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=True + ) + dist.create_user(USER, sudo=None) + assert m_subp.call_args_list == [ + _useradd2call([USER, "--extrausers", "-m"]), + mock.call(["passwd", "-l", USER]), + ] + assert "WARNING" not in caplog.text + assert "DEPRECATED" not in caplog.text + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_string( - self, m_setup_user_keys, m_subp, dist + self, m_setup_user_keys, m_subp, dist, mocker ): """ssh_authorized_keys allows string and calls setup_user_keys.""" + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) dist.create_user(USER, ssh_authorized_keys="mykey") assert m_subp.call_args_list == [ _useradd2call([USER, "-m"]), @@ -212,11 +685,29 @@ def test_setup_ssh_authorized_keys_with_string( ] m_setup_user_keys.assert_called_once_with({"mykey"}, USER) + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_snappy_setup_ssh_authorized_keys_with_string( + self, m_setup_user_keys, m_subp, dist, mocker + ): + """ssh_authorized_keys allows string and calls setup_user_keys.""" + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=True + ) + dist.create_user(USER, ssh_authorized_keys="mykey") + assert m_subp.call_args_list == [ + _useradd2call([USER, "--extrausers", "-m"]), + mock.call(["passwd", "-l", USER]), + ] + m_setup_user_keys.assert_called_once_with({"mykey"}, USER) + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_list( - self, m_setup_user_keys, m_subp, dist + self, m_setup_user_keys, m_subp, dist, mocker ): """ssh_authorized_keys allows lists and calls setup_user_keys.""" + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=False + ) dist.create_user(USER, ssh_authorized_keys=["key1", "key2"]) assert m_subp.call_args_list == [ _useradd2call([USER, "-m"]), @@ -224,6 +715,21 @@ def test_setup_ssh_authorized_keys_with_list( ] m_setup_user_keys.assert_called_once_with({"key1", "key2"}, USER) + @mock.patch("cloudinit.ssh_util.setup_user_keys") + def test_snappy_setup_ssh_authorized_keys_with_list( + self, m_setup_user_keys, m_subp, dist, mocker + ): + """ssh_authorized_keys allows lists and calls setup_user_keys.""" + mocker.patch( + "cloudinit.distros.util.system_is_snappy", return_value=True + ) + dist.create_user(USER, ssh_authorized_keys=["key1", "key2"]) + assert m_subp.call_args_list == [ + _useradd2call([USER, "--extrausers", "-m"]), + mock.call(["passwd", "-l", USER]), + ] + m_setup_user_keys.assert_called_once_with({"key1", "key2"}, USER) + @mock.patch("cloudinit.ssh_util.setup_user_keys") def test_setup_ssh_authorized_keys_with_integer( self, m_setup_user_keys, m_subp, dist, caplog diff --git a/tests/unittests/distros/test_dragonflybsd.py b/tests/unittests/distros/test_dragonflybsd.py index 8a240ea5fa9..5419eeeafd4 100644 --- a/tests/unittests/distros/test_dragonflybsd.py +++ b/tests/unittests/distros/test_dragonflybsd.py @@ -1,8 +1,41 @@ # This file is part of cloud-init. See LICENSE file for license information. import cloudinit.util +from tests.unittests.distros import _get_distro from tests.unittests.helpers import mock +M_PATH = "cloudinit.distros." + + +class TestDragonFlyBSD: + @mock.patch(M_PATH + "subp.subp") + def test_add_user(self, m_subp): + distro = _get_distro("dragonflybsd") + assert True is distro.add_user("me2", uid=1234, default=False) + assert [ + mock.call( + [ + "pw", + "useradd", + "-n", + "me2", + "-u", + "1234", + "-d/home/me2", + "-m", + ], + logstring=["pw", "useradd", "-n", "me2", "-d/home/me2", "-m"], + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, caplog): + distro = _get_distro("dragonflybsd") + distro.unlock_passwd("me2") + assert ( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + ) + def test_find_dragonflybsd_part(): assert cloudinit.util.find_freebsd_part("/dev/vbd0s3") == "vbd0s3" diff --git a/tests/unittests/distros/test_freebsd.py b/tests/unittests/distros/test_freebsd.py index c4c067ead71..50fb8e9ffc0 100644 --- a/tests/unittests/distros/test_freebsd.py +++ b/tests/unittests/distros/test_freebsd.py @@ -2,7 +2,6 @@ import os -from cloudinit.distros.freebsd import Distro, FreeBSDNetworking from cloudinit.util import find_freebsd_part, get_path_dev_freebsd from tests.unittests.distros import _get_distro from tests.unittests.helpers import CiTestCase, mock @@ -12,10 +11,9 @@ class TestFreeBSD: @mock.patch(M_PATH + "subp.subp") - def test_add_user(self, m_subp, mocker): - mocker.patch.object(Distro, "networking_cls", spec=FreeBSDNetworking) + def test_add_user(self, m_subp): distro = _get_distro("freebsd") - distro.add_user("me2", uid=1234, default=False) + assert True is distro.add_user("me2", uid=1234, default=False) assert [ mock.call( [ @@ -39,6 +37,14 @@ def test_add_user(self, m_subp, mocker): ) ] == m_subp.call_args_list + def test_unlock_passwd(self, caplog): + distro = _get_distro("freebsd") + distro.unlock_passwd("me2") + assert ( + "Dragonfly BSD/FreeBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + ) + class TestDeviceLookUp(CiTestCase): @mock.patch("cloudinit.subp.subp") diff --git a/tests/unittests/distros/test_netbsd.py b/tests/unittests/distros/test_netbsd.py index 2abe5ef1441..c4cb9a55122 100644 --- a/tests/unittests/distros/test_netbsd.py +++ b/tests/unittests/distros/test_netbsd.py @@ -2,6 +2,8 @@ import pytest +from tests.unittests.distros import _get_distro + try: # Blowfish not available in < 3.7, so this has never worked. Ignore failure # to import with AttributeError. We need this module imported prior to @@ -10,6 +12,28 @@ except AttributeError: pass +M_PATH = "cloudinit.distros.netbsd." + + +class TestNetBSD: + @mock.patch(M_PATH + "subp.subp") + def test_add_user(self, m_subp): + distro = _get_distro("netbsd") + assert True is distro.add_user("me2", uid=1234, default=False) + assert [ + mock.call( + ["useradd", "-m", "me2"], logstring=["useradd", "-m", "me2"] + ) + ] == m_subp.call_args_list + + @mock.patch(M_PATH + "subp.subp") + def test_unlock_passwd(self, m_subp, caplog): + distro = _get_distro("netbsd") + distro.unlock_passwd("me2") + assert [ + mock.call(["usermod", "-C", "no", "me2"]) + ] == m_subp.call_args_list + @pytest.mark.parametrize("with_pkgin", (True, False)) @mock.patch("cloudinit.distros.netbsd.os") diff --git a/tests/unittests/distros/test_openbsd.py b/tests/unittests/distros/test_openbsd.py new file mode 100644 index 00000000000..2bab0d3bd14 --- /dev/null +++ b/tests/unittests/distros/test_openbsd.py @@ -0,0 +1,26 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +from tests.unittests.distros import _get_distro +from tests.unittests.helpers import mock + +M_PATH = "cloudinit.distros.openbsd." + + +class TestOpenBSD: + @mock.patch(M_PATH + "subp.subp") + def test_add_user(self, m_subp): + distro = _get_distro("openbsd") + assert True is distro.add_user("me2", uid=1234, default=False) + assert [ + mock.call( + ["useradd", "-m", "me2"], logstring=["useradd", "-m", "me2"] + ) + ] == m_subp.call_args_list + + def test_unlock_passwd(self, caplog): + distro = _get_distro("openbsd") + distro.unlock_passwd("me2") + assert ( + "OpenBSD password lock is not reversible, " + "ignoring unlock for user me2" in caplog.text + )