diff --git a/src/rockstor/system/osi.py b/src/rockstor/system/osi.py index 543fbd525..b69ff2acb 100644 --- a/src/rockstor/system/osi.py +++ b/src/rockstor/system/osi.py @@ -102,17 +102,33 @@ # 5 bay no-name USB - forum member Miyuki report. EXCLUDED_SERIAL_NUMS = [ + None, "", "000000000000", # Many reports of multiple Orico models. "152D00539000", # USB ID 152d:0567 a JMS567 based device. "0123456789ABCDEF", # No-name USB external multi-bay. ] +# Field_names correspond to all used lsblk properties.lower() bar 'TRANS' to 'transport' Disk = collections.namedtuple( "Disk", - "name model serial size transport vendor " - "hctl type fstype label uuid parted root " - "partitions", + [ + "name", + "model", + "serial", + "size", + "transport", + "vendor", + "hctl", + "type", + "fstype", + "label", + "uuid", + "parted", + "root", + "partitions", + ], + defaults=[False, False, {}], ) @@ -215,15 +231,15 @@ def replace_pattern_inline(source_file, target_file, pattern, replacement): def run_command( - cmd: list[str], - shell: bool = False, - stdout: None | int | IO = subprocess.PIPE, - stderr: None | int | IO = subprocess.PIPE, - stdin: None | int | IO = subprocess.PIPE, - throw: bool = True, - log: bool = False, - pinput: AnyStr | None = None, - raw: bool = False, + cmd: list[str], + shell: bool = False, + stdout: None | int | IO = subprocess.PIPE, + stderr: None | int | IO = subprocess.PIPE, + stdin: None | int | IO = subprocess.PIPE, + throw: bool = True, + log: bool = False, + pinput: AnyStr | None = None, + raw: bool = False, ) -> (list[str] | str, list[str], int): try: # We force run_command to always use en_US @@ -265,18 +281,16 @@ def run_command( return out, err, rc -def scan_disks(min_size, test_mode=False): +def scan_disks(min_size: int, test_mode: bool = False) -> list[Disk]: """ Using lsblk we scan all attached disks and categorize them according to - if they are partitioned, their file system, if the drive hosts our / mount + if they are partitioned, their file system/s, if the drive hosts our '/' mount point etc. The result of this scan is used by:- view/disk.py _update_disk_state - for further analysis / categorization. - N.B. if a device (partition or whole dev) hosts swap or is of no interest - then it is ignored. + for further analysis / categorization, and to update the DB: if required. :param min_size: Discount all devices below this size in KB :param test_mode: Used by unit tests for deterministic 'fake-serial-' mode. - :return: List containing drives of interest + :return: List containing Disk: namedtuple members of interest. """ base_root_disk = root_disk() # /dev/sda if /dev/sda3, or md126 if md126p2 cmd = [ @@ -287,372 +301,303 @@ def scan_disks(min_size, test_mode=False): "NAME,MODEL,SERIAL,SIZE,TRAN,VENDOR,HCTL,TYPE,FSTYPE,LABEL,UUID", ] o, e, rc = run_command(cmd) - dnames = {} # Working dictionary of devices. - disks = [] # List derived from the final working dictionary of devices. - serials_seen = [] # List tally of serials seen during this scan. - # Stash variables to pass base info on root_disk to root device proper. - root_serial = root_model = root_transport = root_vendor = root_hctl = None + # Working dictionary of Disk values: indexed by a copy of their Disk.name. + dnames: dict = {} + serials_seen: list[str] = [] # List tally of serials seen during this scan. + # Stash variables to pass base info on root_disk to root partition device proper. + root_serial: str | None = None + root_model: str | None = None + root_transport: str | None = None + root_vendor: str | None = None + root_hctl: str | None = None # flag to indicate bcache backing device found. - bdev_flag = False - # To use udevadm to retrieve serial number rather than lsblk, make this - # True N.B. when lsblk returns no serial for a device then udev is used - # anyway. - always_use_udev_serial = False - device_names_seen = [] # List tally of devices seen during this scan + bdev_flag: bool = False + # To always use udevadm to retrieve serial numbers, rather than lsblk, make this True. + # N.B. when lsblk returns no serial for a device, then udev is used anyway. + always_use_udev_serial: bool = False + device_names_seen: list[str] = [] # List tally of devices seen during this scan for line in o: - # skip processing of all lines that don't begin with "NAME" - if re.match("NAME", line) is None: + # skip processing of all empty lines or those that don't begin with "NAME" + if line == "" or re.match("NAME", line) is None: continue - # setup our line / dev name dependant variables - # easy read categorization flags, all False until found otherwise. - is_root_disk = False # base dev that / is mounted on ie system disk - is_partition = is_btrfs = False - dmap = {} # to hold line info from lsblk output eg NAME: sda - # line parser variables - cur_name = "" - cur_val = "" - name_iter = True - val_iter = False - sl = line.strip() - i = 0 - while i < len(sl): - # We iterate over the line to parse it's information char by char - # keeping track of name or value and adding the char accordingly - if name_iter and sl[i] == "=" and sl[i + 1] == '"': - name_iter = False - val_iter = True - i = i + 2 - elif val_iter and sl[i] == '"' and (i == (len(sl) - 1) or sl[i + 1] == " "): - val_iter = False - name_iter = True - i = i + 2 - dmap[cur_name.strip()] = cur_val.strip() - cur_name = "" - cur_val = "" - elif name_iter: - cur_name = cur_name + sl[i] - i = i + 1 - elif val_iter: - cur_val = cur_val + sl[i] - i = i + 1 - else: - raise Exception("Failed to parse lsblk output: {}".format(sl)) + # lsblk example line (incomplete): 'NAME="/dev/sdb" MODEL="TOSHIBA MK1652GS" VENDOR="ATA " LABEL="" UUID=""' + # line.strip().split('" ') = ['NAME="/dev/sdb', 'MODEL="TOSHIBA MK1652GS', 'VENDOR="ATA ', 'LABEL="', 'UUID=""'] # noqa E501 + # Device information built from each lsblk line in turn. + blk_dev_properties: dict = { + key.lower() + if key != "TRAN" + else "transport": value.replace('"', "").strip() + if value.replace('"', "").strip() != "" + else None + for key, value in ( + key_value.split("=") for key_value in line.strip().split('" ') + ) + } + logger.debug(f"Scan_disks() using: blk_dev_properties={blk_dev_properties}") + # Disk namedtuple from lsblk line dictionary. + dev: Disk = Disk(**blk_dev_properties) + # Set up our line / dev dependant variables. + # Easy read categorization flags. + is_root_disk: bool = False # base dev that "/" is mounted on ie system disk + is_partition: bool = False + is_btrfs: bool = False # md devices, such as mdadmin software raid and some hardware raid # block devices show up in lsblk's output multiple times with identical - # info. Given we only need one copy of this info we remove duplicate - # device name entries, also offers more sane output to views/disk.py - # where name will be used as the index - if dmap["NAME"] in device_names_seen: + # info. Given we only need one copy of this info we ignore duplicate + # device name entries, also required as we use Disk.name as index. + if dev.name in device_names_seen: continue - device_names_seen.append(dmap["NAME"]) - # We are not interested in CD / DVD rom devices so skip to next device - if dmap["TYPE"] == "rom": + device_names_seen.append(dev.name) + # We are not interested in CD / DVD rom devices. + if dev.type == "rom": continue - # We are not interested in swap partitions or devices so skip further - # processing and move to next device. - # N.B. this also facilitates a simpler mechanism of classification. - if dmap["FSTYPE"] == "swap": + # We are not interested in swap devices. + if dev.fstype == "swap": continue - # convert size into KB - size_str = dmap["SIZE"] + # Convert size into KB + size_str = dev.size if size_str[-1] == "G": - dmap["SIZE"] = int(float(size_str[:-1]) * 1024 * 1024) + dev = dev._replace(size=int(float(size_str[:-1]) * 1024 * 1024)) elif size_str[-1] == "T": - dmap["SIZE"] = int(float(size_str[:-1]) * 1024 * 1024 * 1024) + dev = dev._replace(size=int(float(size_str[:-1]) * 1024 * 1024 * 1024)) else: - # Move to next line if we don't understand the size as GB or TB - # Note that this may cause an entry to be ignored if formatting - # changes. - # Previous to the explicit ignore swap clause this often caught - # swap but if swap was in GB and above min_size then it could - # show up when not in a partition (the previous caveat clause). + # Skip line if we don't understand the size as GB or TB. + # May cause an entry to be ignored if formatting changes. continue - if dmap["SIZE"] < min_size: + if int(dev.size) < min_size: continue # ----- Now we are done with easy exclusions we begin classification. - # If md device populate unused MODEL with basic member/raid summary. - if re.match("/dev/md", dmap["NAME"]) is not None: + # If md device, populate otherwise unused MODEL with basic member/raid summary. + if re.match("/dev/md", dev.name) is not None: # cheap way to display our member drives - dmap["MODEL"] = get_md_members(dmap["NAME"]) + dev = dev._replace(model=get_md_members(dev.name)) # ------------ Start more complex classification ------------- - if dmap["NAME"] == base_root_disk: # as returned by root_disk() + if dev.name == base_root_disk: # as returned by root_disk() # We are looking at the system drive that hosts, either - # directly or as a partition, the / mount point. + # directly or as a partition, the "/" mount point. # Given lsblk doesn't return serial, model, transport, vendor, hctl # when displaying partitions we grab and stash them while we are - # looking at the root drive directly, rather than the / partition. + # looking at the root drive directly, rather than the "/" partition. # N.B. assumption is lsblk first displays devices then partitions, # this is the observed behaviour so far. - root_serial = dmap["SERIAL"] - root_model = dmap["MODEL"] - root_transport = dmap["TRAN"] - root_vendor = dmap["VENDOR"] - root_hctl = dmap["HCTL"] + root_serial = dev.serial + root_model = dev.model + root_transport = dev.transport + root_vendor = dev.vendor + root_hctl = dev.hctl # Set readability flag as base_dev identified. is_root_disk = True # root as returned by root_disk() # And until we find a partition on this root disk we will label it # as our root, this then allows for non partitioned root devices # such as mdraid installs where root is directly on eg /dev/md126. # N.B. this assumes base devs are listed before their partitions. - dmap["root"] = True - # Normal partitions are of type 'part', md partitions are of type 'md' - # normal disks are of type 'disk' md devices are of type eg 'raid1'. - # Disk members of eg intel bios raid md devices - # fstype='isw_raid_member' Note for future re-write; when using udevadm - # DEVTYPE, partition and disk works for both raid and non raid - # partitions and devices. + dev = dev._replace(root=True) + # Normal partitions are of type 'part', md partitions are of type 'md'. + # Normal disks are of type 'disk', md devices are of type e.g. 'raid1'. + # Disk members of e.g. intel bios raid md devices: fstype='isw_raid_member'. + # Note for future re-write; when using udevadm DEVTYPE, partition and disk + # works for both raid and non raid partitions and devices. # ----- Begin readability variables assignment: - # - is this a partition, regular or md type. - if dmap["TYPE"] == "part" or dmap["TYPE"] == "md": + if dev.type == "part" or dev.type == "md": is_partition = True - # - is filesystem of type btrfs - if dmap["FSTYPE"] == "btrfs": + if dev.fstype == "btrfs": is_btrfs = True # End readability variables assignment if is_partition: - dmap["parted"] = True - # We don't account for partitions within partitions, but making - # an empty dict here simplifies conditionals as always a dict then. - dmap["partitions"] = {} - # Search our working dictionary of already scanned devices by name - # We are assuming base devices are listed first and if of interest - # we have recorded it and can now back port it's partitioned - # status. + dev = dev._replace(parted=True) + # Search our working dictionary of already scanned devices by name. + # We are assuming base devices are listed first by lsblk so we can + # now back port to the parent it's partitioned status. for dname in dnames.keys(): - if re.match(dname, dmap["NAME"]) is not None: - # Our device name has a base device entry of interest - # saved: ie we have scanned and saved sdb but looking at - # sdb3 now. Given we have found a partition on an existing + if re.match(dname, dev.name) is not None: + # Our device name has a base/parent device entry of interest + # saved: ie we have scanned and saved sdb, but we are now looking + # at sdb3. Given we have found a partition on an existing # base dev we should update that base dev's entry in dnames - # to parted "True" as when recorded lsblk type on base - # device would have been disk or RAID1 or raid1 (for base - # md dev). Change the 12th entry (0 indexed) of this - # device to True The 12 entry is the parted flag so we - # label our existing base dev entry as parted ie - # partitioned. - dnames[dname][11] = True - # Also take this opportunity to back port software raid + # to have `parted=True` as when parsing lsblk type of the base + # device, it would have been disk or RAID1 or raid1 (for base + # md dev). + dnames[dname] = dnames[dname]._replace(parted=True) + # Also take this opportunity to back-port software raid # info from partitions to the base device if the base - # device doesn't already have an fstype identifying it's + # device doesn't already have an fstype identifying its # raid member status. For Example:- bios raid base dev - # gives lsblk FSTYPE="isw_raid_member"; we already catch + # gives lsblk fstype="isw_raid_member"; we already catch # this directly. Pure software mdraid base dev has lsblk - # FSTYPE="" but a partition on this pure software mdraid - # that is a member of eg md125 has - # FSTYPE="linux_raid_member" - # Add the same treatment for partitions hosting LUKS - # containers. - if dmap["FSTYPE"] == "linux_raid_member" and ( - dnames[dname][8] is None + # 'fstype=""' but a partition on this pure software mdraid + # that is a member of eg md125 has fstype="linux_raid_member". + # Add the same treatment for partitions hosting LUKS containers. + if dev.fstype == "linux_raid_member" and ( + dnames[dname].fstype is None ): - # N.B. 9th item (index 8) in dname = FSTYPE We are a - # partition that is an mdraid raid member so backport - # this info to our base device ie sda1 raid member so - # label sda's FSTYPE entry the same as it's partition's - # entry if the above condition is met, ie only if the - # base device doesn't already have an FSTYPE entry ie + # We are a partition that is a mdraid raid member, so backport + # this info to our base device, i.e. sda1 raid member, so + # label sda's fstype= entry the same as its partition's + # entry if the above condition is met, i.e. only if the + # base device doesn't already have an fstype= entry i.e. # None, this way we don't overwrite / loose info and we # only need to have one partition identified as an # mdraid member to classify the entire device (the base # device) as a raid member, at least in part. - dnames[dname][8] = dmap["FSTYPE"] - if dmap["FSTYPE"] == "crypto_LUKS" and (dnames[dname][8] is None): - # As per mdraid we backport to the base device LUKS - # containers that live in partitions as the base device - # will have an FSTYPE="" and as per mdraid we classify - # the entire device as a LUKS container member even if + dnames[dname] = dnames[dname]._replace(fstype=dev.fstype) + if dev.fstype == "crypto_LUKS" and (dnames[dname].fstype is None): + # As per mdraid, we backport to the base device LUKS + # containers that live in partitions, as the base device + # will have an fstype="", and as per mdraid we classify + # the entire device as a LUKS container member, even if # it is only in part (ie this partition). But we only # backport this information if there currently exists - # no FSTYPE on the base device, there by protecting + # no fstype= on the base device, there by protecting # against fstype information loss on the base device. - # Please see mdraid partition treatment for additional - # comments on index number used. - dnames[dname][8] = dmap["FSTYPE"] - # and uuid backport - dnames[dname][10] = dmap["UUID"] - # Akin to back porting a partitions FSTYPE to it's base - # device, as with 'linux_raid_member' above, we can do the - # same for btrfs if found in a partition. + dnames[dname] = dnames[dname]._replace( + fstype=dev.fstype, uuid=dev.uuid + ) + # Akin to back porting a partitions' fstype to its base device, + # as with 'linux_raid_member' above, we can do the + # same for btrfs-in-partition. # This is intended to facilitate the user redirection role - # so that the base disk can be labeled with it's partitions - # fstype, label (for pool updates) uuid, and size. + # so that the base disk can be labeled with its partitions fstype, + # label (for pool updates), uuid, and size. # N.B. The base device info will end up pertaining to the # highest partition numbers details. Design limitation. - if is_btrfs and dnames[dname][8] is None: + if is_btrfs and dnames[dname].fstype is None: # We are a btrfs partition where the base device has no - # fstype entry: backport: fstype, label, uuid & size. - # fstype backport - dnames[dname][8] = dmap["FSTYPE"] - # label backport is at index 9 - dnames[dname][9] = dmap["LABEL"] - # and uuid backport - dnames[dname][10] = dmap["UUID"] - # and size backport - dnames[dname][3] = dmap["SIZE"] + # fstype entry: backport: size, fstype, label, & uuid. + dnames[dname] = dnames[dname]._replace( + size=dev.size, + fstype=dev.fstype, + label=dev.label, + uuid=dev.uuid, + ) # Build a dictionary of the partitions we find. # Back port our current name as a partition entry in our - # base devices 'partitions' dictionary 14th item (index 13) - dnames[dname][13][dmap["NAME"]] = dmap["FSTYPE"] + # base devices 'partitions' dictionary: + dnames[dname].partitions[dev.name] = dev.fstype # This dict is intended for use later in roles such as # import / export devices or external backup drives so # that the role config mechanism can offer up the known # partitions found so that the eventual configured role # will know which partition on the role based device to - # work with and it current filesystem type. - # Has one role per device limit but helps to keep usability - # and underlying disk management simpler. + # work with and its current filesystem type. + # There is a 'one role per device' limit, this helps with + # usability, and reduces underlying disk management complexity. else: - # We are not a partition so record this. - dmap["parted"] = False - # As we are not a partition it is assumed that we might hold a - # partition so start an empty partition dictionary for this. + # TODO: likely this else clause is now redundant given our namedtuple's defaults. + # We are not a partition so record this via bool flag and empty partitions dict. # N.B. This assumes base devices are listed before their partitions - dmap["partitions"] = {} - # This dict will be populated when we find our partitions and back - # port their names and fstype (as values). + dev = dev._replace(parted=False, partitions={}) if (not is_root_disk and not is_partition) or is_btrfs: - # We have a non system disk that is not a partition - # or - # We have a device that is btrfs formatted - # Or we may just be a non system disk without partitions. - dmap["root"] = is_root_disk + # We have a non system disk that is not a partition. Or + # We have a device that is btrfs formatted. Or + # We may just be a non system disk without partitions. + dev = dev._replace(root=is_root_disk) if is_btrfs: - # a btrfs file system # Regex to identify a partition on the base_root_disk. # Root on 'sda3' gives base_root_disk 'sda'. - if re.match("/dev/sd|/dev/vd", dmap["NAME"]) is not None: + if re.match("/dev/sd|/dev/vd", dev.name) is not None: # eg 'sda' or 'vda' with >= one additional digit, part_regex = base_root_disk + "\d+" else: # md126 or nvme0n1 with 'p' + >= one additional digit eg: # md126p3 or nvme0n1p4; also mmcblk0p2 for base mmcblk0. part_regex = base_root_disk + "p\d+" - if re.match(part_regex, dmap["NAME"]) is not None: + if re.match(part_regex, dev.name) is not None: logger.debug("--- Inheriting base_root_disk info ---") # We are assuming that a partition with a btrfs fs on is - # our root if it's name begins with our base system disk - # name. Now add the properties we stashed when looking at - # the base root disk rather than the root partition we see - # here. - dmap["SERIAL"] = root_serial - dmap["MODEL"] = root_model - dmap["TRAN"] = root_transport - dmap["VENDOR"] = root_vendor - dmap["HCTL"] = root_hctl - # As we have found root to be on a partition we can now un - # flag the base device as having been root prior to finding - # this partition on that base_root_disk N.B. Assumes base - # dev is listed before it's partitions The 13th item in - # dnames entries is root so index = 12. Only update our + # our root, if its name begins with our base system disk name. + # Now add the properties we stashed when looking at + # the base root disk rather than the root partition we see here. + dev = dev._replace( + model=root_model, + serial=root_serial, + transport=root_transport, + vendor=root_vendor, + hctl=root_hctl, + ) + # As we have found root to be on a partition, we can now un-flag + # the base device as having been root prior to finding + # this partition on that base_root_disk. N.B. Assumes base + # dev is listed before it's partitions. Only update our # base_root_disk if it exists in our scanned disks as this # may be the first time we are seeing it. Search to see if - # we already have an entry for the the base_root_disk which - # may be us or our base dev if we are a partition + # we already have an entry for the base_root_disk which + # may be us, or our base dev, if we are a partition. for dname in dnames.keys(): if dname == base_root_disk: dnames[base_root_disk][12] = False - # And update this device as real root + # And update this device as real root. # Note we may be looking at the base_root_disk or one of - # it's partitions there after. - dmap["root"] = True + # its partitions thereafter. + dev = dev._replace(root=True) else: # We have a non system disk btrfs filesystem. - # Ie we are a whole disk or a partition with btrfs on but - # NOT on the system disk. + # I.e. we are a whole disk or a partition with btrfs on, + # but NOT on the system disk. # Most likely a current btrfs data drive or one we could # import. - # Ignore / skip this btrfs device if it's a partition + # Ignore / skip this btrfs device if it is a partition. if is_partition: logger.debug("-- Skipping non root btrfs partition -") continue - # No more continues so the device we have is to be passed to our db - # entry system views/disk.py ie _update_disk_state() - # Do final tidy of data in dmap and ready for entry in dnames dict. - # db needs unique serial so provide one where there is none found. + # No more continues so the device we have is to be passed to our DB + # entry system views/disk.py: _update_disk_state(). + # Do final tidy of data in dev and ready for entry in dnames dict. + # DB needs unique serial, so provide one where there is None found. # First try harder with udev if lsblk failed on serial retrieval. - if dmap["SERIAL"] == "" or always_use_udev_serial: + if dev.serial is None or dev.serial == "" or always_use_udev_serial: # lsblk fails to retrieve SERIAL from VirtIO drives and some - # sdcard devices and md devices so try specialized function. - dmap["SERIAL"] = get_disk_serial(dmap["NAME"], dmap["TYPE"]) + # SD Card devices, and MD devices, so try specialized function. + dev = dev._replace(serial=get_disk_serial(dev.name, dev.type)) # Now try specialized serial propagation methods: - # Bcache virtual block devices get their backing devices uuid - # We propagate the uuid for a bcache backing device to it's virtual + # Bcache virtual block devices get their backing devices uuid. + # We propagate the Disk.uuid for a bcache backing device to its virtual # counterpart device for use as a serial number. - # Note however that we are only interested in the 'backing device' - # type of bcache as it has the counterpart virtual block device. - if dmap["FSTYPE"] == "bcache": - bcache_dev_type = get_bcache_device_type(dmap["NAME"]) + # Note that we are only interested in the 'backing device' type of bcache, + # as it has the counterpart virtual block device. + if dev.fstype == "bcache": + bcache_dev_type = get_bcache_device_type(dev.name) if bcache_dev_type == "bdev": - bdev_uuid = dmap["UUID"] - # We set out bdev_flag to inform the next device - # interpretation. + bdev_uuid = dev.uuid bdev_flag = True elif bcache_dev_type == "cdev": # We have a bcache caching device, not a backing device. - # Change fstype as an indicator to _update_disk_state() - # role system. N.B. fstype bcachecdev is fictitious. - dmap["FSTYPE"] = "bcachecdev" + # Change fstype as an indicator to _update_disk_state()'s + # role system. N.B. fstype "bcachecdev" is fictitious. + dev = dev._replace(fstype="bcachecdev") else: - # we are a non bcache bdev but we might be the virtual device + # we are a non bcache bdev, but we might be the virtual device # if we are listed directly after a bcache bdev. if bdev_flag: # Assumption is there is only one virtual device for each # bdev and that it is listed directly after it's associated - # bdev. We are listed directly after a bcache bdev but + # bdev. We are listed directly after a bcache bdev, but # could still be any device. As no cheap distinguishing - # properties we for now rely on device name: - if re.match("bcache", dmap["NAME"]) is not None: + # properties we, for now, rely on the device name: + if re.match("bcache", dev.name) is not None: # We avoid overwriting any serial just in case, normal # bcache virtual devices have no serial reported by - # lsblk but future lsblk versions may change this. - if dmap["SERIAL"] == "": + # lsblk, but future lsblk versions may change this. + if dev.serial is None or dev.serial == "": # transfer our stashed bdev uuid as a serial. - dmap["SERIAL"] = "bcache-{}".format(bdev_uuid) - # reset the bdev_flag as we are only interested in devices + dev = dev._replace(serial=f"bcache-{bdev_uuid}") + # Reset the bdev_flag as we are only interested in devices # listed directly after a bdev anyway. bdev_flag = False - if (dmap["SERIAL"] in EXCLUDED_SERIAL_NUMS) or ( - dmap["SERIAL"] in serials_seen - ): - # No serial number still or its a repeat. Overwrite drive - # serial entry in dmap with fake-serial- + uuid4. See js/template/ - # disk/disks_table.jst for a use of this flag mechanism. - # Previously we did dmap['SERIAL'] = dmap['NAME'] which is less - # robust as it can itself produce duplicate serial numbers. + if (dev.serial in EXCLUDED_SERIAL_NUMS) or (dev.serial in serials_seen): + # Overwrite dev.serial with fake-serial- + uuid4. + # See js/template/disk/disks_table.jst for a use of this flag mechanism. if test_mode: # required for reproducible output for repeatable tests - dmap["SERIAL"] = "fake-serial-" + dev = dev._replace(serial="fake-serial-") else: # 12 chars (fake-serial-) + 36 chars (uuid4) = 48 chars - dmap["SERIAL"] = "fake-serial-" + str(uuid.uuid4()) - serials_seen.append(dmap["SERIAL"]) - # replace all dmap values of '' with None. - for key in dmap.keys(): - if dmap[key] == "": - dmap[key] = None - # transfer our device info as now parsed in dmap to the dnames dict - dnames[dmap["NAME"]] = [ - dmap["NAME"], - dmap["MODEL"], - dmap["SERIAL"], - dmap["SIZE"], - dmap["TRAN"], - dmap["VENDOR"], - dmap["HCTL"], - dmap["TYPE"], - dmap["FSTYPE"], - dmap["LABEL"], - dmap["UUID"], - dmap["parted"], - dmap["root"], - dmap["partitions"], - ] - # Transfer our collected disk / dev entries of interest to the disks list. - for d in dnames.keys(): - disks.append(Disk(*dnames[d])) - # logger.debug('disks item = {} '.format(Disk(*dnames[d]))) - return disks + dev = dev._replace(serial="fake-serial-" + str(uuid.uuid4())) + serials_seen.append(dev.serial) + # N.B. no dev.field should be = "": but None of NoneType instead. + # Transfer our now processed Disk into the dnames dict; indexed by dev.name + dnames[dev.name] = dev + return list(dnames.values()) def uptime(): @@ -1174,11 +1119,11 @@ def root_disk(): raise NonBTRFSRootException(msg) -def get_md_members(device_name, test=None): +def get_md_members(device_name, test=None) -> str | None: """ Returns the md members from a given device, if the given device is not an - md device or the udevadm info command returns a non 0 (error) then an - empty string is returned. + md device or the udevadm info command returns a non 0 (error) then None + is returned. Example lines to parse from udevadmin:- E: MD_DEVICE_sda_DEV=/dev/sda E: MD_DEVICE_sda_ROLE=0 @@ -1196,7 +1141,7 @@ def get_md_members(device_name, test=None): line_fields = [] # if non md device then return empty string if re.match("/dev/md", device_name) is None: - return "" + return None members_string = "" if test is None: out, err, rc = run_command( @@ -1207,7 +1152,7 @@ def get_md_members(device_name, test=None): out = test rc = 0 if rc != 0: # if return code is an error return empty string - return "" + return None # search output of udevadmin to find all current md members. for line in out: if line == "": @@ -1237,10 +1182,12 @@ def get_md_members(device_name, test=None): # > 1 char value that doesn't start with /dev, so raid # level members_string += line_fields[2] + if members_string == "": + members_string = None return members_string -def get_disk_serial(device_name, device_type=None, test=None): +def get_disk_serial(device_name, device_type=None, test=None) -> str | None: """Returns the serial number of device_name using udevadm to match that returned by lsblk. N.B. udevadm has been observed to return the following:- ID_SCSI_SERIAL rarely seen @@ -1261,9 +1208,9 @@ def get_disk_serial(device_name, device_type=None, test=None): None as an indication that the caller cannot provide this info. :param test: When not None this parameter's contents is substituted for the return of the udevadm info --name=device_name command output - :return: 12345678901234567890 or empty string if no serial was retrieved. + :return: 12345678901234567890 or None if no serial was retrieved. """ - serial_num = "" + serial_num = None uuid_search_string = "" line_fields = [] # udevadm requires the full path for Device Mapped (DM) disks so if our @@ -1328,10 +1275,10 @@ def get_disk_serial(device_name, device_type=None, test=None): if line_fields[1] == "ID_SERIAL": # SERIAL is sometimes our only option but only use it if we # have found nothing else. - if serial_num == "": + if serial_num == "" or serial_num is None: serial_num = line_fields[2] # should return one of the following in order of priority - # SCSI_SERIAL, SERIAL_SHORT, SERIAL + # SCSI_SERIAL, SERIAL_SHORT, SERIAL, None return serial_num