From 63949dbef7d4185e17c45895599f33cb4c6c9e5d Mon Sep 17 00:00:00 2001 From: Philip Guyton Date: Fri, 29 Mar 2024 16:53:50 +0000 Subject: [PATCH] Modernise scan_disks() - no functional change intended #2826 Includes: - Modernisation re type hints in scan_disks(). - Improve lsblk line parser via modern Python builtins. Replace by-hand lsblk output line parser with dictionary comprehension. - Programmatically establish `lsblk` field derived Disk namedtuple fields. - Refactoring for readability. - Black reformatting. - Employ defaults in Disk named tuple to ease creation using lsblk info, as these defaults pertain to our extra flag info. - Normalise scan_disks()'s working dict keys to lowercase. By normalising on lowercase we can ease the transition to using our Disk named tuple as the working copy: to help readability and highlight where we write (via Disk.replace(root=True)). This should also simplify the assembly of our final return value. We also have a large number of tests that use the existing lowercase Disk fields. By working directly with the Disk namedtuple during device collation, we gain readability and the advantages of an immutable type: i.e. we are then more explicit where writes/changes happen due to requirement to use Disk._replace(). We also require less type transitions overall. --- src/rockstor/system/osi.py | 521 +++++++++++++++++-------------------- 1 file changed, 234 insertions(+), 287 deletions(-) diff --git a/src/rockstor/system/osi.py b/src/rockstor/system/osi.py index 543fbd525..b69ff2acb 100644 --- a/src/rockstor/system/osi.py +++ b/src/rockstor/system/osi.py @@ -102,17 +102,33 @@ # 5 bay no-name USB - forum member Miyuki report. EXCLUDED_SERIAL_NUMS = [ + None, "", "000000000000", # Many reports of multiple Orico models. "152D00539000", # USB ID 152d:0567 a JMS567 based device. "0123456789ABCDEF", # No-name USB external multi-bay. ] +# Field_names correspond to all used lsblk properties.lower() bar 'TRANS' to 'transport' Disk = collections.namedtuple( "Disk", - "name model serial size transport vendor " - "hctl type fstype label uuid parted root " - "partitions", + [ + "name", + "model", + "serial", + "size", + "transport", + "vendor", + "hctl", + "type", + "fstype", + "label", + "uuid", + "parted", + "root", + "partitions", + ], + defaults=[False, False, {}], ) @@ -215,15 +231,15 @@ def replace_pattern_inline(source_file, target_file, pattern, replacement): def run_command( - cmd: list[str], - shell: bool = False, - stdout: None | int | IO = subprocess.PIPE, - stderr: None | int | IO = subprocess.PIPE, - stdin: None | int | IO = subprocess.PIPE, - throw: bool = True, - log: bool = False, - pinput: AnyStr | None = None, - raw: bool = False, + cmd: list[str], + shell: bool = False, + stdout: None | int | IO = subprocess.PIPE, + stderr: None | int | IO = subprocess.PIPE, + stdin: None | int | IO = subprocess.PIPE, + throw: bool = True, + log: bool = False, + pinput: AnyStr | None = None, + raw: bool = False, ) -> (list[str] | str, list[str], int): try: # We force run_command to always use en_US @@ -265,18 +281,16 @@ def run_command( return out, err, rc -def scan_disks(min_size, test_mode=False): +def scan_disks(min_size: int, test_mode: bool = False) -> list[Disk]: """ Using lsblk we scan all attached disks and categorize them according to - if they are partitioned, their file system, if the drive hosts our / mount + if they are partitioned, their file system/s, if the drive hosts our '/' mount point etc. The result of this scan is used by:- view/disk.py _update_disk_state - for further analysis / categorization. - N.B. if a device (partition or whole dev) hosts swap or is of no interest - then it is ignored. + for further analysis / categorization, and to update the DB: if required. :param min_size: Discount all devices below this size in KB :param test_mode: Used by unit tests for deterministic 'fake-serial-' mode. - :return: List containing drives of interest + :return: List containing Disk: namedtuple members of interest. """ base_root_disk = root_disk() # /dev/sda if /dev/sda3, or md126 if md126p2 cmd = [ @@ -287,372 +301,303 @@ def scan_disks(min_size, test_mode=False): "NAME,MODEL,SERIAL,SIZE,TRAN,VENDOR,HCTL,TYPE,FSTYPE,LABEL,UUID", ] o, e, rc = run_command(cmd) - dnames = {} # Working dictionary of devices. - disks = [] # List derived from the final working dictionary of devices. - serials_seen = [] # List tally of serials seen during this scan. - # Stash variables to pass base info on root_disk to root device proper. - root_serial = root_model = root_transport = root_vendor = root_hctl = None + # Working dictionary of Disk values: indexed by a copy of their Disk.name. + dnames: dict = {} + serials_seen: list[str] = [] # List tally of serials seen during this scan. + # Stash variables to pass base info on root_disk to root partition device proper. + root_serial: str | None = None + root_model: str | None = None + root_transport: str | None = None + root_vendor: str | None = None + root_hctl: str | None = None # flag to indicate bcache backing device found. - bdev_flag = False - # To use udevadm to retrieve serial number rather than lsblk, make this - # True N.B. when lsblk returns no serial for a device then udev is used - # anyway. - always_use_udev_serial = False - device_names_seen = [] # List tally of devices seen during this scan + bdev_flag: bool = False + # To always use udevadm to retrieve serial numbers, rather than lsblk, make this True. + # N.B. when lsblk returns no serial for a device, then udev is used anyway. + always_use_udev_serial: bool = False + device_names_seen: list[str] = [] # List tally of devices seen during this scan for line in o: - # skip processing of all lines that don't begin with "NAME" - if re.match("NAME", line) is None: + # skip processing of all empty lines or those that don't begin with "NAME" + if line == "" or re.match("NAME", line) is None: continue - # setup our line / dev name dependant variables - # easy read categorization flags, all False until found otherwise. - is_root_disk = False # base dev that / is mounted on ie system disk - is_partition = is_btrfs = False - dmap = {} # to hold line info from lsblk output eg NAME: sda - # line parser variables - cur_name = "" - cur_val = "" - name_iter = True - val_iter = False - sl = line.strip() - i = 0 - while i < len(sl): - # We iterate over the line to parse it's information char by char - # keeping track of name or value and adding the char accordingly - if name_iter and sl[i] == "=" and sl[i + 1] == '"': - name_iter = False - val_iter = True - i = i + 2 - elif val_iter and sl[i] == '"' and (i == (len(sl) - 1) or sl[i + 1] == " "): - val_iter = False - name_iter = True - i = i + 2 - dmap[cur_name.strip()] = cur_val.strip() - cur_name = "" - cur_val = "" - elif name_iter: - cur_name = cur_name + sl[i] - i = i + 1 - elif val_iter: - cur_val = cur_val + sl[i] - i = i + 1 - else: - raise Exception("Failed to parse lsblk output: {}".format(sl)) + # lsblk example line (incomplete): 'NAME="/dev/sdb" MODEL="TOSHIBA MK1652GS" VENDOR="ATA " LABEL="" UUID=""' + # line.strip().split('" ') = ['NAME="/dev/sdb', 'MODEL="TOSHIBA MK1652GS', 'VENDOR="ATA ', 'LABEL="', 'UUID=""'] # noqa E501 + # Device information built from each lsblk line in turn. + blk_dev_properties: dict = { + key.lower() + if key != "TRAN" + else "transport": value.replace('"', "").strip() + if value.replace('"', "").strip() != "" + else None + for key, value in ( + key_value.split("=") for key_value in line.strip().split('" ') + ) + } + logger.debug(f"Scan_disks() using: blk_dev_properties={blk_dev_properties}") + # Disk namedtuple from lsblk line dictionary. + dev: Disk = Disk(**blk_dev_properties) + # Set up our line / dev dependant variables. + # Easy read categorization flags. + is_root_disk: bool = False # base dev that "/" is mounted on ie system disk + is_partition: bool = False + is_btrfs: bool = False # md devices, such as mdadmin software raid and some hardware raid # block devices show up in lsblk's output multiple times with identical - # info. Given we only need one copy of this info we remove duplicate - # device name entries, also offers more sane output to views/disk.py - # where name will be used as the index - if dmap["NAME"] in device_names_seen: + # info. Given we only need one copy of this info we ignore duplicate + # device name entries, also required as we use Disk.name as index. + if dev.name in device_names_seen: continue - device_names_seen.append(dmap["NAME"]) - # We are not interested in CD / DVD rom devices so skip to next device - if dmap["TYPE"] == "rom": + device_names_seen.append(dev.name) + # We are not interested in CD / DVD rom devices. + if dev.type == "rom": continue - # We are not interested in swap partitions or devices so skip further - # processing and move to next device. - # N.B. this also facilitates a simpler mechanism of classification. - if dmap["FSTYPE"] == "swap": + # We are not interested in swap devices. + if dev.fstype == "swap": continue - # convert size into KB - size_str = dmap["SIZE"] + # Convert size into KB + size_str = dev.size if size_str[-1] == "G": - dmap["SIZE"] = int(float(size_str[:-1]) * 1024 * 1024) + dev = dev._replace(size=int(float(size_str[:-1]) * 1024 * 1024)) elif size_str[-1] == "T": - dmap["SIZE"] = int(float(size_str[:-1]) * 1024 * 1024 * 1024) + dev = dev._replace(size=int(float(size_str[:-1]) * 1024 * 1024 * 1024)) else: - # Move to next line if we don't understand the size as GB or TB - # Note that this may cause an entry to be ignored if formatting - # changes. - # Previous to the explicit ignore swap clause this often caught - # swap but if swap was in GB and above min_size then it could - # show up when not in a partition (the previous caveat clause). + # Skip line if we don't understand the size as GB or TB. + # May cause an entry to be ignored if formatting changes. continue - if dmap["SIZE"] < min_size: + if int(dev.size) < min_size: continue # ----- Now we are done with easy exclusions we begin classification. - # If md device populate unused MODEL with basic member/raid summary. - if re.match("/dev/md", dmap["NAME"]) is not None: + # If md device, populate otherwise unused MODEL with basic member/raid summary. + if re.match("/dev/md", dev.name) is not None: # cheap way to display our member drives - dmap["MODEL"] = get_md_members(dmap["NAME"]) + dev = dev._replace(model=get_md_members(dev.name)) # ------------ Start more complex classification ------------- - if dmap["NAME"] == base_root_disk: # as returned by root_disk() + if dev.name == base_root_disk: # as returned by root_disk() # We are looking at the system drive that hosts, either - # directly or as a partition, the / mount point. + # directly or as a partition, the "/" mount point. # Given lsblk doesn't return serial, model, transport, vendor, hctl # when displaying partitions we grab and stash them while we are - # looking at the root drive directly, rather than the / partition. + # looking at the root drive directly, rather than the "/" partition. # N.B. assumption is lsblk first displays devices then partitions, # this is the observed behaviour so far. - root_serial = dmap["SERIAL"] - root_model = dmap["MODEL"] - root_transport = dmap["TRAN"] - root_vendor = dmap["VENDOR"] - root_hctl = dmap["HCTL"] + root_serial = dev.serial + root_model = dev.model + root_transport = dev.transport + root_vendor = dev.vendor + root_hctl = dev.hctl # Set readability flag as base_dev identified. is_root_disk = True # root as returned by root_disk() # And until we find a partition on this root disk we will label it # as our root, this then allows for non partitioned root devices # such as mdraid installs where root is directly on eg /dev/md126. # N.B. this assumes base devs are listed before their partitions. - dmap["root"] = True - # Normal partitions are of type 'part', md partitions are of type 'md' - # normal disks are of type 'disk' md devices are of type eg 'raid1'. - # Disk members of eg intel bios raid md devices - # fstype='isw_raid_member' Note for future re-write; when using udevadm - # DEVTYPE, partition and disk works for both raid and non raid - # partitions and devices. + dev = dev._replace(root=True) + # Normal partitions are of type 'part', md partitions are of type 'md'. + # Normal disks are of type 'disk', md devices are of type e.g. 'raid1'. + # Disk members of e.g. intel bios raid md devices: fstype='isw_raid_member'. + # Note for future re-write; when using udevadm DEVTYPE, partition and disk + # works for both raid and non raid partitions and devices. # ----- Begin readability variables assignment: - # - is this a partition, regular or md type. - if dmap["TYPE"] == "part" or dmap["TYPE"] == "md": + if dev.type == "part" or dev.type == "md": is_partition = True - # - is filesystem of type btrfs - if dmap["FSTYPE"] == "btrfs": + if dev.fstype == "btrfs": is_btrfs = True # End readability variables assignment if is_partition: - dmap["parted"] = True - # We don't account for partitions within partitions, but making - # an empty dict here simplifies conditionals as always a dict then. - dmap["partitions"] = {} - # Search our working dictionary of already scanned devices by name - # We are assuming base devices are listed first and if of interest - # we have recorded it and can now back port it's partitioned - # status. + dev = dev._replace(parted=True) + # Search our working dictionary of already scanned devices by name. + # We are assuming base devices are listed first by lsblk so we can + # now back port to the parent it's partitioned status. for dname in dnames.keys(): - if re.match(dname, dmap["NAME"]) is not None: - # Our device name has a base device entry of interest - # saved: ie we have scanned and saved sdb but looking at - # sdb3 now. Given we have found a partition on an existing + if re.match(dname, dev.name) is not None: + # Our device name has a base/parent device entry of interest + # saved: ie we have scanned and saved sdb, but we are now looking + # at sdb3. Given we have found a partition on an existing # base dev we should update that base dev's entry in dnames - # to parted "True" as when recorded lsblk type on base - # device would have been disk or RAID1 or raid1 (for base - # md dev). Change the 12th entry (0 indexed) of this - # device to True The 12 entry is the parted flag so we - # label our existing base dev entry as parted ie - # partitioned. - dnames[dname][11] = True - # Also take this opportunity to back port software raid + # to have `parted=True` as when parsing lsblk type of the base + # device, it would have been disk or RAID1 or raid1 (for base + # md dev). + dnames[dname] = dnames[dname]._replace(parted=True) + # Also take this opportunity to back-port software raid # info from partitions to the base device if the base - # device doesn't already have an fstype identifying it's + # device doesn't already have an fstype identifying its # raid member status. For Example:- bios raid base dev - # gives lsblk FSTYPE="isw_raid_member"; we already catch + # gives lsblk fstype="isw_raid_member"; we already catch # this directly. Pure software mdraid base dev has lsblk - # FSTYPE="" but a partition on this pure software mdraid - # that is a member of eg md125 has - # FSTYPE="linux_raid_member" - # Add the same treatment for partitions hosting LUKS - # containers. - if dmap["FSTYPE"] == "linux_raid_member" and ( - dnames[dname][8] is None + # 'fstype=""' but a partition on this pure software mdraid + # that is a member of eg md125 has fstype="linux_raid_member". + # Add the same treatment for partitions hosting LUKS containers. + if dev.fstype == "linux_raid_member" and ( + dnames[dname].fstype is None ): - # N.B. 9th item (index 8) in dname = FSTYPE We are a - # partition that is an mdraid raid member so backport - # this info to our base device ie sda1 raid member so - # label sda's FSTYPE entry the same as it's partition's - # entry if the above condition is met, ie only if the - # base device doesn't already have an FSTYPE entry ie + # We are a partition that is a mdraid raid member, so backport + # this info to our base device, i.e. sda1 raid member, so + # label sda's fstype= entry the same as its partition's + # entry if the above condition is met, i.e. only if the + # base device doesn't already have an fstype= entry i.e. # None, this way we don't overwrite / loose info and we # only need to have one partition identified as an # mdraid member to classify the entire device (the base # device) as a raid member, at least in part. - dnames[dname][8] = dmap["FSTYPE"] - if dmap["FSTYPE"] == "crypto_LUKS" and (dnames[dname][8] is None): - # As per mdraid we backport to the base device LUKS - # containers that live in partitions as the base device - # will have an FSTYPE="" and as per mdraid we classify - # the entire device as a LUKS container member even if + dnames[dname] = dnames[dname]._replace(fstype=dev.fstype) + if dev.fstype == "crypto_LUKS" and (dnames[dname].fstype is None): + # As per mdraid, we backport to the base device LUKS + # containers that live in partitions, as the base device + # will have an fstype="", and as per mdraid we classify + # the entire device as a LUKS container member, even if # it is only in part (ie this partition). But we only # backport this information if there currently exists - # no FSTYPE on the base device, there by protecting + # no fstype= on the base device, there by protecting # against fstype information loss on the base device. - # Please see mdraid partition treatment for additional - # comments on index number used. - dnames[dname][8] = dmap["FSTYPE"] - # and uuid backport - dnames[dname][10] = dmap["UUID"] - # Akin to back porting a partitions FSTYPE to it's base - # device, as with 'linux_raid_member' above, we can do the - # same for btrfs if found in a partition. + dnames[dname] = dnames[dname]._replace( + fstype=dev.fstype, uuid=dev.uuid + ) + # Akin to back porting a partitions' fstype to its base device, + # as with 'linux_raid_member' above, we can do the + # same for btrfs-in-partition. # This is intended to facilitate the user redirection role - # so that the base disk can be labeled with it's partitions - # fstype, label (for pool updates) uuid, and size. + # so that the base disk can be labeled with its partitions fstype, + # label (for pool updates), uuid, and size. # N.B. The base device info will end up pertaining to the # highest partition numbers details. Design limitation. - if is_btrfs and dnames[dname][8] is None: + if is_btrfs and dnames[dname].fstype is None: # We are a btrfs partition where the base device has no - # fstype entry: backport: fstype, label, uuid & size. - # fstype backport - dnames[dname][8] = dmap["FSTYPE"] - # label backport is at index 9 - dnames[dname][9] = dmap["LABEL"] - # and uuid backport - dnames[dname][10] = dmap["UUID"] - # and size backport - dnames[dname][3] = dmap["SIZE"] + # fstype entry: backport: size, fstype, label, & uuid. + dnames[dname] = dnames[dname]._replace( + size=dev.size, + fstype=dev.fstype, + label=dev.label, + uuid=dev.uuid, + ) # Build a dictionary of the partitions we find. # Back port our current name as a partition entry in our - # base devices 'partitions' dictionary 14th item (index 13) - dnames[dname][13][dmap["NAME"]] = dmap["FSTYPE"] + # base devices 'partitions' dictionary: + dnames[dname].partitions[dev.name] = dev.fstype # This dict is intended for use later in roles such as # import / export devices or external backup drives so # that the role config mechanism can offer up the known # partitions found so that the eventual configured role # will know which partition on the role based device to - # work with and it current filesystem type. - # Has one role per device limit but helps to keep usability - # and underlying disk management simpler. + # work with and its current filesystem type. + # There is a 'one role per device' limit, this helps with + # usability, and reduces underlying disk management complexity. else: - # We are not a partition so record this. - dmap["parted"] = False - # As we are not a partition it is assumed that we might hold a - # partition so start an empty partition dictionary for this. + # TODO: likely this else clause is now redundant given our namedtuple's defaults. + # We are not a partition so record this via bool flag and empty partitions dict. # N.B. This assumes base devices are listed before their partitions - dmap["partitions"] = {} - # This dict will be populated when we find our partitions and back - # port their names and fstype (as values). + dev = dev._replace(parted=False, partitions={}) if (not is_root_disk and not is_partition) or is_btrfs: - # We have a non system disk that is not a partition - # or - # We have a device that is btrfs formatted - # Or we may just be a non system disk without partitions. - dmap["root"] = is_root_disk + # We have a non system disk that is not a partition. Or + # We have a device that is btrfs formatted. Or + # We may just be a non system disk without partitions. + dev = dev._replace(root=is_root_disk) if is_btrfs: - # a btrfs file system # Regex to identify a partition on the base_root_disk. # Root on 'sda3' gives base_root_disk 'sda'. - if re.match("/dev/sd|/dev/vd", dmap["NAME"]) is not None: + if re.match("/dev/sd|/dev/vd", dev.name) is not None: # eg 'sda' or 'vda' with >= one additional digit, part_regex = base_root_disk + "\d+" else: # md126 or nvme0n1 with 'p' + >= one additional digit eg: # md126p3 or nvme0n1p4; also mmcblk0p2 for base mmcblk0. part_regex = base_root_disk + "p\d+" - if re.match(part_regex, dmap["NAME"]) is not None: + if re.match(part_regex, dev.name) is not None: logger.debug("--- Inheriting base_root_disk info ---") # We are assuming that a partition with a btrfs fs on is - # our root if it's name begins with our base system disk - # name. Now add the properties we stashed when looking at - # the base root disk rather than the root partition we see - # here. - dmap["SERIAL"] = root_serial - dmap["MODEL"] = root_model - dmap["TRAN"] = root_transport - dmap["VENDOR"] = root_vendor - dmap["HCTL"] = root_hctl - # As we have found root to be on a partition we can now un - # flag the base device as having been root prior to finding - # this partition on that base_root_disk N.B. Assumes base - # dev is listed before it's partitions The 13th item in - # dnames entries is root so index = 12. Only update our + # our root, if its name begins with our base system disk name. + # Now add the properties we stashed when looking at + # the base root disk rather than the root partition we see here. + dev = dev._replace( + model=root_model, + serial=root_serial, + transport=root_transport, + vendor=root_vendor, + hctl=root_hctl, + ) + # As we have found root to be on a partition, we can now un-flag + # the base device as having been root prior to finding + # this partition on that base_root_disk. N.B. Assumes base + # dev is listed before it's partitions. Only update our # base_root_disk if it exists in our scanned disks as this # may be the first time we are seeing it. Search to see if - # we already have an entry for the the base_root_disk which - # may be us or our base dev if we are a partition + # we already have an entry for the base_root_disk which + # may be us, or our base dev, if we are a partition. for dname in dnames.keys(): if dname == base_root_disk: dnames[base_root_disk][12] = False - # And update this device as real root + # And update this device as real root. # Note we may be looking at the base_root_disk or one of - # it's partitions there after. - dmap["root"] = True + # its partitions thereafter. + dev = dev._replace(root=True) else: # We have a non system disk btrfs filesystem. - # Ie we are a whole disk or a partition with btrfs on but - # NOT on the system disk. + # I.e. we are a whole disk or a partition with btrfs on, + # but NOT on the system disk. # Most likely a current btrfs data drive or one we could # import. - # Ignore / skip this btrfs device if it's a partition + # Ignore / skip this btrfs device if it is a partition. if is_partition: logger.debug("-- Skipping non root btrfs partition -") continue - # No more continues so the device we have is to be passed to our db - # entry system views/disk.py ie _update_disk_state() - # Do final tidy of data in dmap and ready for entry in dnames dict. - # db needs unique serial so provide one where there is none found. + # No more continues so the device we have is to be passed to our DB + # entry system views/disk.py: _update_disk_state(). + # Do final tidy of data in dev and ready for entry in dnames dict. + # DB needs unique serial, so provide one where there is None found. # First try harder with udev if lsblk failed on serial retrieval. - if dmap["SERIAL"] == "" or always_use_udev_serial: + if dev.serial is None or dev.serial == "" or always_use_udev_serial: # lsblk fails to retrieve SERIAL from VirtIO drives and some - # sdcard devices and md devices so try specialized function. - dmap["SERIAL"] = get_disk_serial(dmap["NAME"], dmap["TYPE"]) + # SD Card devices, and MD devices, so try specialized function. + dev = dev._replace(serial=get_disk_serial(dev.name, dev.type)) # Now try specialized serial propagation methods: - # Bcache virtual block devices get their backing devices uuid - # We propagate the uuid for a bcache backing device to it's virtual + # Bcache virtual block devices get their backing devices uuid. + # We propagate the Disk.uuid for a bcache backing device to its virtual # counterpart device for use as a serial number. - # Note however that we are only interested in the 'backing device' - # type of bcache as it has the counterpart virtual block device. - if dmap["FSTYPE"] == "bcache": - bcache_dev_type = get_bcache_device_type(dmap["NAME"]) + # Note that we are only interested in the 'backing device' type of bcache, + # as it has the counterpart virtual block device. + if dev.fstype == "bcache": + bcache_dev_type = get_bcache_device_type(dev.name) if bcache_dev_type == "bdev": - bdev_uuid = dmap["UUID"] - # We set out bdev_flag to inform the next device - # interpretation. + bdev_uuid = dev.uuid bdev_flag = True elif bcache_dev_type == "cdev": # We have a bcache caching device, not a backing device. - # Change fstype as an indicator to _update_disk_state() - # role system. N.B. fstype bcachecdev is fictitious. - dmap["FSTYPE"] = "bcachecdev" + # Change fstype as an indicator to _update_disk_state()'s + # role system. N.B. fstype "bcachecdev" is fictitious. + dev = dev._replace(fstype="bcachecdev") else: - # we are a non bcache bdev but we might be the virtual device + # we are a non bcache bdev, but we might be the virtual device # if we are listed directly after a bcache bdev. if bdev_flag: # Assumption is there is only one virtual device for each # bdev and that it is listed directly after it's associated - # bdev. We are listed directly after a bcache bdev but + # bdev. We are listed directly after a bcache bdev, but # could still be any device. As no cheap distinguishing - # properties we for now rely on device name: - if re.match("bcache", dmap["NAME"]) is not None: + # properties we, for now, rely on the device name: + if re.match("bcache", dev.name) is not None: # We avoid overwriting any serial just in case, normal # bcache virtual devices have no serial reported by - # lsblk but future lsblk versions may change this. - if dmap["SERIAL"] == "": + # lsblk, but future lsblk versions may change this. + if dev.serial is None or dev.serial == "": # transfer our stashed bdev uuid as a serial. - dmap["SERIAL"] = "bcache-{}".format(bdev_uuid) - # reset the bdev_flag as we are only interested in devices + dev = dev._replace(serial=f"bcache-{bdev_uuid}") + # Reset the bdev_flag as we are only interested in devices # listed directly after a bdev anyway. bdev_flag = False - if (dmap["SERIAL"] in EXCLUDED_SERIAL_NUMS) or ( - dmap["SERIAL"] in serials_seen - ): - # No serial number still or its a repeat. Overwrite drive - # serial entry in dmap with fake-serial- + uuid4. See js/template/ - # disk/disks_table.jst for a use of this flag mechanism. - # Previously we did dmap['SERIAL'] = dmap['NAME'] which is less - # robust as it can itself produce duplicate serial numbers. + if (dev.serial in EXCLUDED_SERIAL_NUMS) or (dev.serial in serials_seen): + # Overwrite dev.serial with fake-serial- + uuid4. + # See js/template/disk/disks_table.jst for a use of this flag mechanism. if test_mode: # required for reproducible output for repeatable tests - dmap["SERIAL"] = "fake-serial-" + dev = dev._replace(serial="fake-serial-") else: # 12 chars (fake-serial-) + 36 chars (uuid4) = 48 chars - dmap["SERIAL"] = "fake-serial-" + str(uuid.uuid4()) - serials_seen.append(dmap["SERIAL"]) - # replace all dmap values of '' with None. - for key in dmap.keys(): - if dmap[key] == "": - dmap[key] = None - # transfer our device info as now parsed in dmap to the dnames dict - dnames[dmap["NAME"]] = [ - dmap["NAME"], - dmap["MODEL"], - dmap["SERIAL"], - dmap["SIZE"], - dmap["TRAN"], - dmap["VENDOR"], - dmap["HCTL"], - dmap["TYPE"], - dmap["FSTYPE"], - dmap["LABEL"], - dmap["UUID"], - dmap["parted"], - dmap["root"], - dmap["partitions"], - ] - # Transfer our collected disk / dev entries of interest to the disks list. - for d in dnames.keys(): - disks.append(Disk(*dnames[d])) - # logger.debug('disks item = {} '.format(Disk(*dnames[d]))) - return disks + dev = dev._replace(serial="fake-serial-" + str(uuid.uuid4())) + serials_seen.append(dev.serial) + # N.B. no dev.field should be = "": but None of NoneType instead. + # Transfer our now processed Disk into the dnames dict; indexed by dev.name + dnames[dev.name] = dev + return list(dnames.values()) def uptime(): @@ -1174,11 +1119,11 @@ def root_disk(): raise NonBTRFSRootException(msg) -def get_md_members(device_name, test=None): +def get_md_members(device_name, test=None) -> str | None: """ Returns the md members from a given device, if the given device is not an - md device or the udevadm info command returns a non 0 (error) then an - empty string is returned. + md device or the udevadm info command returns a non 0 (error) then None + is returned. Example lines to parse from udevadmin:- E: MD_DEVICE_sda_DEV=/dev/sda E: MD_DEVICE_sda_ROLE=0 @@ -1196,7 +1141,7 @@ def get_md_members(device_name, test=None): line_fields = [] # if non md device then return empty string if re.match("/dev/md", device_name) is None: - return "" + return None members_string = "" if test is None: out, err, rc = run_command( @@ -1207,7 +1152,7 @@ def get_md_members(device_name, test=None): out = test rc = 0 if rc != 0: # if return code is an error return empty string - return "" + return None # search output of udevadmin to find all current md members. for line in out: if line == "": @@ -1237,10 +1182,12 @@ def get_md_members(device_name, test=None): # > 1 char value that doesn't start with /dev, so raid # level members_string += line_fields[2] + if members_string == "": + members_string = None return members_string -def get_disk_serial(device_name, device_type=None, test=None): +def get_disk_serial(device_name, device_type=None, test=None) -> str | None: """Returns the serial number of device_name using udevadm to match that returned by lsblk. N.B. udevadm has been observed to return the following:- ID_SCSI_SERIAL rarely seen @@ -1261,9 +1208,9 @@ def get_disk_serial(device_name, device_type=None, test=None): None as an indication that the caller cannot provide this info. :param test: When not None this parameter's contents is substituted for the return of the udevadm info --name=device_name command output - :return: 12345678901234567890 or empty string if no serial was retrieved. + :return: 12345678901234567890 or None if no serial was retrieved. """ - serial_num = "" + serial_num = None uuid_search_string = "" line_fields = [] # udevadm requires the full path for Device Mapped (DM) disks so if our @@ -1328,10 +1275,10 @@ def get_disk_serial(device_name, device_type=None, test=None): if line_fields[1] == "ID_SERIAL": # SERIAL is sometimes our only option but only use it if we # have found nothing else. - if serial_num == "": + if serial_num == "" or serial_num is None: serial_num = line_fields[2] # should return one of the following in order of priority - # SCSI_SERIAL, SERIAL_SHORT, SERIAL + # SCSI_SERIAL, SERIAL_SHORT, SERIAL, None return serial_num