diff --git a/sonic_platform_base/sonic_xcvr/api/public/cmisTargetFWUpgrade.py b/sonic_platform_base/sonic_xcvr/api/public/cmisTargetFWUpgrade.py index cc3bac81d..c527dedf2 100644 --- a/sonic_platform_base/sonic_xcvr/api/public/cmisTargetFWUpgrade.py +++ b/sonic_platform_base/sonic_xcvr/api/public/cmisTargetFWUpgrade.py @@ -5,9 +5,132 @@ upgrade of remote target from the local target itself. """ +import struct +import sys +import traceback from ...fields import consts from .cmis import CmisApi +import logging + +logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) + +TARGET_E0_VALUE = 0 +TARGET_E1_VALUE = 1 +TARGET_E2_VALUE = 2 + +SERVER_FW_VERSION_SIZE = 16 +SERVER_FW_VERSION_NUMBER_SIZE = 4 + +TARGET_LIST = [TARGET_E0_VALUE, TARGET_E1_VALUE, TARGET_E2_VALUE] + +CABLE_E1_FIRMWARE_INFO_MAP = { + 'active_firmware': 'e1_active_firmware', + 'inactive_firmware': 'e1_inactive_firmware', + 'server_firmware': 'e1_server_firmware' +} + +CABLE_E2_FIRMWARE_INFO_MAP = { + 'active_firmware': 'e2_active_firmware', + 'inactive_firmware': 'e2_inactive_firmware', + 'server_firmware': 'e2_server_firmware' +} + +REMOTE_TARGET_FIRMWARE_INFO_MAP = { + TARGET_E1_VALUE: CABLE_E1_FIRMWARE_INFO_MAP, + TARGET_E2_VALUE: CABLE_E2_FIRMWARE_INFO_MAP, +} + class CmisTargetFWUpgradeAPI(CmisApi): def set_firmware_download_target_end(self, target): return self.xcvr_eeprom.write(consts.TARGET_MODE, target) + + """ + Reads the active, inactive and server firmware version from all targets + and returns a dictionary of the firmware versions. + Returns: + A dictionary of the firmware versions for all targets. + """ + def get_transceiver_info_firmware_versions(self): + return_dict = { + 'active_firmware': 'N/A', + 'inactive_firmware': 'N/A', + 'e1_active_firmware': 'N/A', + 'e1_inactive_firmware': 'N/A', + 'e2_active_firmware': 'N/A', + 'e2_inactive_firmware': 'N/A', + 'e1_server_firmware': 'N/A', + 'e2_server_firmware': 'N/A' + } + + for target in TARGET_LIST: + try: + if not self.set_firmware_download_target_end(target): + logging.error("Target mode change failed. Target: {}".format(target)) + continue + + # Any register apart from the TARGET_MODE register will have the value 0xff + # if the remote target is not accessible from the local target. + module_type = self.get_module_type() + if 'Unknown' in module_type: + logging.info("Remote target {} not accessible. Skipping.".format(target)) + continue + + firmware_versions = super().get_transceiver_info_firmware_versions() + if target in REMOTE_TARGET_FIRMWARE_INFO_MAP: + # Add server firmware version to the firmware_versions dictionary + firmware_versions.update(self._get_server_firmware_version()) + return_dict.update(self._convert_firmware_info_to_target_firmware_info( + firmware_versions, REMOTE_TARGET_FIRMWARE_INFO_MAP[target])) + else: + return_dict.update(firmware_versions) + except Exception as e: + logging.error("Exception occurred while handling target {} firmware version: {}".format(target, repr(e))) + exc_type, exc_value, exc_traceback = sys.exc_info() + msg = traceback.format_exception(exc_type, exc_value, exc_traceback) + for tb_line in msg: + for tb_line_split in tb_line.splitlines(): + logging.error(tb_line_split) + continue + + self.set_firmware_download_target_end(TARGET_E0_VALUE) + return return_dict + + def _convert_firmware_info_to_target_firmware_info(self, firmware_info, firmware_info_map): + return_dict = {} + for key, value in firmware_info_map.items(): + if key in firmware_info: + return_dict[value] = firmware_info[key] + return return_dict + + """ + Reads the server firmware version and return a dictionary of the server firmware version. + The server firmware version is of the format "A.B.C.D" where A, B, C and D are 4 bytes each representing a number. + Following are the steps to read the server firmware version: + 1. Read the magic byte at page 3h, offset 128. If this has the value 0x0, then the server + firmware version is not available and hence, return without proceeding to step 2. + 2. Calculate the checksum of the server firmware version. If the checksum is not valid, then the server + firmware version is not available. If the checksum is valid, then proceed to step 3. + 3. Read the server firmware version from page 3h, offset 130-145. + Returns: + A dictionary of the server firmware version. + """ + def _get_server_firmware_version(self): + return_dict = { + 'server_firmware': 'N/A' + } + + magic_byte = self.xcvr_eeprom.read(consts.SERVER_FW_MAGIC_BYTE) + if magic_byte != 0: + checksum = self.xcvr_eeprom.read(consts.SERVER_FW_CHECKSUM) + server_fw_version_byte_array, server_fw_version_str = self.xcvr_eeprom.read(consts.SERVER_FW_VERSION) + + calculated_checksum = 0 + for byte in server_fw_version_byte_array: + calculated_checksum += byte + + if calculated_checksum & 0xFF == checksum: + return_dict['server_firmware'] = server_fw_version_str + + return return_dict diff --git a/sonic_platform_base/sonic_xcvr/api/xcvr_api.py b/sonic_platform_base/sonic_xcvr/api/xcvr_api.py index cf85500ae..d1915a952 100644 --- a/sonic_platform_base/sonic_xcvr/api/xcvr_api.py +++ b/sonic_platform_base/sonic_xcvr/api/xcvr_api.py @@ -75,8 +75,8 @@ def get_transceiver_info_firmware_versions(self): Retrieves active and inactive firmware versions of the xcvr Returns: - A list with active and inactive firmware versions of the xcvr - [active_firmware, inactive_firmware] + A dictionary containing the active and inactive firmware versions of the transceiver + {'active_firmware' : 'A.B.C', 'inactive_firmware' : 'X.Y.Z'} """ raise NotImplementedError diff --git a/sonic_platform_base/sonic_xcvr/fields/consts.py b/sonic_platform_base/sonic_xcvr/fields/consts.py index bf781b29c..44b44073e 100644 --- a/sonic_platform_base/sonic_xcvr/fields/consts.py +++ b/sonic_platform_base/sonic_xcvr/fields/consts.py @@ -477,6 +477,12 @@ CDB_CMD = "CdbCommand" CDB_WRITE_MSG = "CdbWriteMessage" +#CMISTargetFWUpgrade +CMIS_TARGET_SERVER_INFO = "CmisTargetServerInfo" +SERVER_FW_MAGIC_BYTE = "ServerFirmwareMagicByte" +SERVER_FW_CHECKSUM = "ServerFirmwareChecksum" +SERVER_FW_VERSION = "ServerFirmwareVersion" + #VENDOR SPECIFIC VENDOR_CUSTOM = "VendorCustom" TARGET_MODE = "TargetMode" diff --git a/sonic_platform_base/sonic_xcvr/fields/xcvr_field.py b/sonic_platform_base/sonic_xcvr/fields/xcvr_field.py index 93f51613f..c8856911b 100644 --- a/sonic_platform_base/sonic_xcvr/fields/xcvr_field.py +++ b/sonic_platform_base/sonic_xcvr/fields/xcvr_field.py @@ -248,6 +248,26 @@ def __init__(self, name, offset, *fields, **kwargs): def decode(self, raw_data, **decoded_deps): return '-'.join([ "%02x" % byte for byte in raw_data]) +class ServerFWVersionRegField(RegField): + """ + Returns the raw byte(s) + """ + def __init__(self, name, offset, *fields, **kwargs): + super(ServerFWVersionRegField, self).__init__(name, offset, *fields, **kwargs) + + def decode(self, raw_data, **decoded_deps): + server_fw_version_str = '' + server_fw_version_size = 16 + server_fw_version_number_size = 4 + + # Use a list comprehension to convert each 4-byte number to a string + server_fw_version_str = '.'.join( + str(struct.unpack('>I', raw_data[i:i+server_fw_version_number_size])[0]) + for i in range(0, server_fw_version_size, server_fw_version_number_size) + ) + + return raw_data, server_fw_version_str + class RegGroupField(XcvrField): """ Field denoting one or more bytes, logically interpreted as one or more RegFields diff --git a/sonic_platform_base/sonic_xcvr/mem_maps/public/cmisTargetFWUpgrade.py b/sonic_platform_base/sonic_xcvr/mem_maps/public/cmisTargetFWUpgrade.py index 3cb2bdd7a..46128088e 100644 --- a/sonic_platform_base/sonic_xcvr/mem_maps/public/cmisTargetFWUpgrade.py +++ b/sonic_platform_base/sonic_xcvr/mem_maps/public/cmisTargetFWUpgrade.py @@ -6,7 +6,19 @@ """ from .cmis import CmisMemMap +from ...fields.xcvr_field import ( + NumberRegField, + RegGroupField, + ServerFWVersionRegField +) +from ...fields import consts class CmisTargetFWUpgradeMemMap(CmisMemMap): # Vendor agnostic implementation to be added here - pass + def __init__(self, codes): + super().__init__(codes) + + self.CMIS_TARGET_SERVER_INFO = RegGroupField(consts.CMIS_TARGET_SERVER_INFO, + NumberRegField(consts.SERVER_FW_MAGIC_BYTE, self.getaddr(0x3, 128), format="B", size=1), + NumberRegField(consts.SERVER_FW_CHECKSUM, self.getaddr(0x3, 129), format="B", size=1), + ServerFWVersionRegField(consts.SERVER_FW_VERSION, self.getaddr(0x3, 130), size=16)) diff --git a/tests/sonic_xcvr/test_cmisTargetFWUpgrade.py b/tests/sonic_xcvr/test_cmisTargetFWUpgrade.py new file mode 100644 index 000000000..37464aa9d --- /dev/null +++ b/tests/sonic_xcvr/test_cmisTargetFWUpgrade.py @@ -0,0 +1,79 @@ +from unittest.mock import patch +from mock import MagicMock +import pytest +from sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade import TARGET_E0_VALUE, TARGET_LIST, CmisTargetFWUpgradeAPI +from sonic_platform_base.sonic_xcvr.codes.public.cmisTargetFWUpgrade import CmisTargetFWUpgradeCodes +from sonic_platform_base.sonic_xcvr.mem_maps.public.cmisTargetFWUpgrade import CmisTargetFWUpgradeMemMap +from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom + +class TestCmis(object): + codes = CmisTargetFWUpgradeCodes + mem_map = CmisTargetFWUpgradeMemMap(codes) + reader = MagicMock(return_value=None) + writer = MagicMock() + eeprom = XcvrEeprom(reader, writer, mem_map) + api = CmisTargetFWUpgradeAPI(eeprom) + + @pytest.mark.parametrize("set_firmware_result, module_type, exception_raised", [ + (False, 'QSFP+ or later with CMIS', False), + (True, 'Unknown', False), + (True, 'QSFP+ or later with CMIS', True) + ]) + @patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info_firmware_versions', MagicMock(side_effect=({}, Exception('error'), {}))) + @patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._get_server_firmware_version', MagicMock()) + @patch('traceback.format_exception') + def test_get_transceiver_info_firmware_versions_failure(self, mock_format_exception, set_firmware_result, module_type, exception_raised): + expected_output = {'active_firmware': 'N/A', 'inactive_firmware': 'N/A', 'e1_active_firmware': 'N/A',\ + 'e1_inactive_firmware': 'N/A', 'e2_active_firmware': 'N/A', 'e2_inactive_firmware': 'N/A',\ + 'e1_server_firmware': 'N/A', 'e2_server_firmware': 'N/A'} + self.api.set_firmware_download_target_end = MagicMock(return_value=set_firmware_result) + self.api.get_module_type = MagicMock(return_value=module_type) + + result = self.api.get_transceiver_info_firmware_versions() + assert result == expected_output + + assert self.api.set_firmware_download_target_end.call_count == len(TARGET_LIST) + 1 + # Ensure that FW version is read for all targets + for index, call in enumerate(self.api.set_firmware_download_target_end.call_args_list): + args, _ = call + # Ensure target is restore to E0 after reading FW version from all targets + if index == len(TARGET_LIST): + assert args[0] == TARGET_E0_VALUE + else: + assert args[0] == TARGET_LIST[index] + + if exception_raised: + assert mock_format_exception.call_count == 1 + assert self.api._get_server_firmware_version.call_count == 1 + else: + self.api._get_server_firmware_version.assert_not_called() + + @pytest.mark.parametrize("fw_info_dict, server_fw_info_dict, expected_output", [ + (({'active_firmware': '1.1.1', 'inactive_firmware': '1.0.0'}, {'active_firmware': '1.1.1', 'inactive_firmware': '1.0.0'}, {'active_firmware': '1.1.1', 'inactive_firmware': '1.0.0'}), ({'server_firmware': '1.5.0.1421'}, {'server_firmware': '1.5.0.1421'}),\ + {'active_firmware': '1.1.1', 'inactive_firmware': '1.0.0', 'e1_active_firmware': '1.1.1', 'e1_inactive_firmware': '1.0.0', 'e2_active_firmware': '1.1.1', 'e2_inactive_firmware': '1.0.0', 'e1_server_firmware': '1.5.0.1421', 'e2_server_firmware': '1.5.0.1421'}), + (({'active_firmware': '1.1.1', 'inactive_firmware': '1.0.0'}, {'active_firmware': '2.1.1', 'inactive_firmware': '1.0.0'}, {'active_firmware': '1.1.1', 'inactive_firmware': '2.0.1'}), ({'server_firmware': '1223.6.0.739'}, {'server_firmware': '93.5.0.3431'}),\ + {'active_firmware': '1.1.1', 'inactive_firmware': '1.0.0', 'e1_active_firmware': '2.1.1', 'e1_inactive_firmware': '1.0.0', 'e2_active_firmware': '1.1.1', 'e2_inactive_firmware': '2.0.1', 'e1_server_firmware': '1223.6.0.739', 'e2_server_firmware': '93.5.0.3431'}) + ]) + def test_get_transceiver_info_firmware_versions_success(self, fw_info_dict, server_fw_info_dict, expected_output): + with patch('sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info_firmware_versions', side_effect=fw_info_dict): + with patch('sonic_platform_base.sonic_xcvr.api.public.cmisTargetFWUpgrade.CmisTargetFWUpgradeAPI._get_server_firmware_version', side_effect=server_fw_info_dict): + self.api.set_firmware_download_target_end = MagicMock(return_value=True) + self.api.get_module_type = MagicMock(return_value='QSFP+ or later with CMIS') + + result = self.api.get_transceiver_info_firmware_versions() + assert result == expected_output + assert self.api.set_firmware_download_target_end.call_count == len(TARGET_LIST) + 1 + + @pytest.mark.parametrize("magic_byte, checksum, server_fw_version_byte_array, expected", [ + (0, 0, (), {'server_firmware': 'N/A'}), + (0, 0x98, [0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 5, 0x8d], {'server_firmware': 'N/A'}), # Magic byte is 0 but other values are valid + (0xAC, 0x98, ([0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 5, 0x8d], "1.5.0.1421"), {'server_firmware': '1.5.0.1421'}), + (0xff, 0xff, ([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff], "N/A"), {'server_firmware': 'N/A'}), + (0xAC, 0x98, ([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff], "N/A"), {'server_firmware': 'N/A'}) + ]) + def test_get_server_firmware_version(self, magic_byte, checksum, server_fw_version_byte_array, expected): + self.api.xcvr_eeprom.read = MagicMock() + self.api.xcvr_eeprom.read.side_effect = [magic_byte, checksum, server_fw_version_byte_array] + + result = self.api._get_server_firmware_version() + assert result == expected diff --git a/tests/sonic_xcvr/test_xcvr_field.py b/tests/sonic_xcvr/test_xcvr_field.py index a955cc8e8..3b87e1bc7 100644 --- a/tests/sonic_xcvr/test_xcvr_field.py +++ b/tests/sonic_xcvr/test_xcvr_field.py @@ -3,6 +3,7 @@ DateField, FixedNumberRegField, HexRegField, + ServerFWVersionRegField, NumberRegField, RegBitField, RegBitsField, @@ -64,6 +65,7 @@ def __init__(self, codes): ) self.STRING_REG = StringRegField("StringReg", 12, size=15) self.HEX_REG = HexRegField("HexReg", 30, size=3) + self.BYTES_REG = ServerFWVersionRegField("BytesReg", 10, size=4) self.REG_GROUP = RegGroupField("RegGroup", NumberRegField("Field0", 6, ro=False), NumberRegField("Field1", 7, @@ -298,6 +300,12 @@ def test_decode(self): data = bytearray([0xAA, 0xBB, 0xCC]) assert field.decode(data) == "aa-bb-cc" +class TestServerFWVersionRegField(object): + def test_decode(self): + field = mem_map.get_field("BytesReg") + data = bytearray([0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 5, 0x8d]) + assert field.decode(data) == (bytearray([0, 0, 0, 1, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 5, 0x8d]), "1.5.0.1421") + class TestRegGroupField(object): def test_offset(self): field = mem_map.get_field("RegGroup")