Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize unsupported values to 'N/A' in CmisApi::get_transceiver_info #545

Merged
merged 9 commits into from
Mar 1, 2025
34 changes: 27 additions & 7 deletions sonic_platform_base/sonic_xcvr/api/public/c_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
"""
from sonic_py_common import logger
from ...fields import consts
from .cmis import CmisApi, CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP
from .cmis import CmisApi, CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP, CMIS_XCVR_INFO_DEFAULT_DICT
import time
import copy
BYTELENGTH = 8
SYSLOG_IDENTIFIER = "CCmisApi"

Expand Down Expand Up @@ -45,6 +46,14 @@

helper_logger = logger.Logger(SYSLOG_IDENTIFIER)

C_CMIS_XCVR_INFO_DEFAULT_DICT = copy.deepcopy(CMIS_XCVR_INFO_DEFAULT_DICT)
C_CMIS_XCVR_INFO_DEFAULT_DICT.update({
"supported_max_tx_power": "N/A",
"supported_min_tx_power": "N/A",
"supported_max_laser_freq": "N/A",
"supported_min_laser_freq": "N/A"
})

class CCmisApi(CmisApi):
def __init__(self, xcvr_eeprom):
super(CCmisApi, self).__init__(xcvr_eeprom)
Expand Down Expand Up @@ -312,6 +321,9 @@ def get_pm_all(self):
PM_dict['rx_mer_max'] = self.xcvr_eeprom.read(consts.RX_MAX_MER_PM)
return PM_dict

def _get_xcvr_info_default_dict(self):
return C_CMIS_XCVR_INFO_DEFAULT_DICT

def get_transceiver_info(self):
"""
Retrieves transceiver info of this SFP
Expand Down Expand Up @@ -354,14 +366,22 @@ def get_transceiver_info(self):
supported_min_laser_freq = FLOAT ; support minimum laser frequency
================================================================================
"""
trans_info = super(CCmisApi,self).get_transceiver_info()
xcvr_info = super(CCmisApi, self).get_transceiver_info()

# Return None if CmisApi class returns None, this indicates to XCVRD that retry is
# needed.
if xcvr_info is None:
return None

min_power, max_power = self.get_supported_power_config()
trans_info['supported_max_tx_power'] = max_power
trans_info['supported_min_tx_power'] = min_power
_, _, _, low_freq_supported, high_freq_supported = self.get_supported_freq_config()
trans_info['supported_max_laser_freq'] = high_freq_supported
trans_info['supported_min_laser_freq'] = low_freq_supported
return trans_info
xcvr_info.update({
'supported_max_tx_power': max_power,
'supported_min_tx_power': min_power,
'supported_max_laser_freq': high_freq_supported,
'supported_min_laser_freq': low_freq_supported
})
return xcvr_info

def get_transceiver_bulk_status(self):
"""
Expand Down
78 changes: 55 additions & 23 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .cmisCDB import CmisCdbApi
from .cmisVDM import CmisVdmApi
import time
import copy
from collections import defaultdict

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -77,6 +78,37 @@ class VdmSubtypeIndex(Enum):
"Errored Frames Current Value Host Input" : "errored_frames_curr_host_input"
}

CMIS_XCVR_INFO_DEFAULT_DICT = {
"type": "N/A",
"type_abbrv_name": "N/A",
"hardware_rev": "N/A",
"serial": "N/A",
"manufacturer": "N/A",
"model": "N/A",
"connector": "N/A",
"encoding": "N/A",
"ext_identifier": "N/A",
"ext_rateselect_compliance": "N/A",
"cable_length": "N/A",
"nominal_bit_rate": "N/A",
"vendor_date": "N/A",
"vendor_oui": "N/A",
**{f"active_apsel_hostlane{i}": "N/A" for i in range(1, 9)},
"application_advertisement": "N/A",
"host_electrical_interface": "N/A",
"media_interface_code": "N/A",
"host_lane_count": "N/A",
"media_lane_count": "N/A",
"host_lane_assignment_option": "N/A",
"media_lane_assignment_option": "N/A",
"cable_type": "N/A",
"media_interface_technology": "N/A",
"vendor_rev": "N/A",
"cmis_rev": "N/A",
"specification_compliance": "N/A",
"vdm_supported": "N/A"
}

class CmisApi(XcvrApi):
NUM_CHANNELS = 8
LowPwrRequestSW = 4
Expand Down Expand Up @@ -259,6 +291,9 @@ def get_module_inactive_firmware(self):
inactive_fw = [str(num) for num in [inactive_fw_major, inactive_fw_minor]]
return '.'.join(inactive_fw)

def _get_xcvr_info_default_dict(self):
return CMIS_XCVR_INFO_DEFAULT_DICT

def get_transceiver_info(self):
admin_info = self.xcvr_eeprom.read(consts.ADMIN_INFO_FIELD)
if admin_info is None:
Expand All @@ -267,44 +302,41 @@ def get_transceiver_info(self):
ext_id = admin_info[consts.EXT_ID_FIELD]
power_class = ext_id[consts.POWER_CLASS_FIELD]
max_power = ext_id[consts.MAX_POWER_FIELD]

xcvr_info = {
xcvr_info = copy.deepcopy(self._get_xcvr_info_default_dict())
xcvr_info.update({
"type": admin_info[consts.ID_FIELD],
"type_abbrv_name": admin_info[consts.ID_ABBRV_FIELD],
"hardware_rev": self.get_module_hardware_revision(),
"serial": admin_info[consts.VENDOR_SERIAL_NO_FIELD],
"manufacturer": admin_info[consts.VENDOR_NAME_FIELD],
"model": admin_info[consts.VENDOR_PART_NO_FIELD],
"connector": admin_info[consts.CONNECTOR_FIELD],
"encoding": "N/A", # Not supported
"ext_identifier": "%s (%sW Max)" % (power_class, max_power),
"ext_rateselect_compliance": "N/A", # Not supported
"cable_length": float(admin_info[consts.LENGTH_ASSEMBLY_FIELD]),
"nominal_bit_rate": 0, # Not supported
"vendor_date": admin_info[consts.VENDOR_DATE_FIELD],
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD]
}
appl_advt = self.get_application_advertisement()
xcvr_info['application_advertisement'] = str(appl_advt) if len(appl_advt) > 0 else 'N/A'
xcvr_info['host_electrical_interface'] = self.get_host_electrical_interface()
xcvr_info['media_interface_code'] = self.get_module_media_interface()
xcvr_info['host_lane_count'] = self.get_host_lane_count()
xcvr_info['media_lane_count'] = self.get_media_lane_count()
xcvr_info['host_lane_assignment_option'] = self.get_host_lane_assignment_option()
xcvr_info['media_lane_assignment_option'] = self.get_media_lane_assignment_option()
xcvr_info['cable_type'] = self.get_cable_length_type()
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD],
"application_advertisement": str(self.get_application_advertisement()) if len(self.get_application_advertisement()) > 0 else 'N/A',
"host_electrical_interface": self.get_host_electrical_interface(),
"media_interface_code": self.get_module_media_interface(),
"host_lane_count": self.get_host_lane_count(),
"media_lane_count": self.get_media_lane_count(),
"host_lane_assignment_option": self.get_host_lane_assignment_option(),
"media_lane_assignment_option": self.get_media_lane_assignment_option(),
"cable_type": self.get_cable_length_type(),
"media_interface_technology": self.get_media_interface_technology(),
"vendor_rev": self.get_vendor_rev(),
"cmis_rev": self.get_cmis_rev(),
"specification_compliance": self.get_module_media_type(),
"vdm_supported": self.is_transceiver_vdm_supported()
})
apsel_dict = self.get_active_apsel_hostlane()
for lane in range(1, self.NUM_CHANNELS+1):
for lane in range(1, self.NUM_CHANNELS + 1):
xcvr_info["%s%d" % ("active_apsel_hostlane", lane)] = \
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]
xcvr_info['media_interface_technology'] = self.get_media_interface_technology()
xcvr_info['vendor_rev'] = self.get_vendor_rev()
xcvr_info['cmis_rev'] = self.get_cmis_rev()
xcvr_info['specification_compliance'] = self.get_module_media_type()
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]

# In normal case will get a valid value for each of the fields. If get a 'None' value
# means there was a failure while reading the EEPROM, either because the EEPROM was
# not ready yet or experincing some other issues. It shouldn't return a dict with a
# not ready yet or experiencing some other issues. It shouldn't return a dict with a
# wrong field value, instead should return a 'None' to indicate to XCVRD that retry is
# needed.
if None in xcvr_info.values():
Expand Down
80 changes: 70 additions & 10 deletions tests/sonic_xcvr/test_ccmis.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from mock import MagicMock
from mock import patch
import pytest
from sonic_platform_base.sonic_xcvr.api.public.c_cmis import CCmisApi
from sonic_platform_base.sonic_xcvr.api.public.c_cmis import CCmisApi, C_CMIS_XCVR_INFO_DEFAULT_DICT
from sonic_platform_base.sonic_xcvr.mem_maps.public.c_cmis import CCmisMemMap
from sonic_platform_base.sonic_xcvr.xcvr_eeprom import XcvrEeprom
from sonic_platform_base.sonic_xcvr.codes.public.cmis import CmisCodes
Expand Down Expand Up @@ -160,32 +160,92 @@ def test_get_pm_all(self, mock_response, expected):

@pytest.mark.parametrize("mock_response, expected",[
(
[ {
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver'
(
{ # EEPROM DATA
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'type_abbrv_name': 'QSFP-DD',
'model': 'ABCD',
'encoding': 'N/A',
'ext_identifier': 'Power Class 8 (20.0W Max)',
'ext_rateselect_compliance': 'N/A',
'cable_type': 'Length Cable Assembly(m)',
'cable_length': 0.0,
'nominal_bit_rate': 'N/A',
'specification_compliance': 'sm_media_interface',
'application_advertisement': 'N/A',
'media_lane_count': 1,
'vendor_rev': '0.0',
'host_electrical_interface': '400GAUI-8 C2M (Annex 120E)',
'vendor_oui': 'xx-xx-xx',
'manufacturer': 'VENDOR_NAME',
'media_interface_technology': '1550 nm DFB',
'media_interface_code': '400ZR, DWDM, amplified',
'serial': '00000000',
'host_lane_count': 8,
**{f'active_apsel_hostlane{i}': 1 for i in range(1, 9)},
'hardware_rev': '0.0',
'cmis_rev': '5.0',
'media_lane_assignment_option': 1,
'connector': 'LC',
'host_lane_assignment_option': 1,
'vendor_date': '21010100',
'vdm_supported': True,
},
(-20, 0),
(0xff, -72, 120, 191300, 196100)
],
{
),
{ # Expected Result
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'type_abbrv_name': 'QSFP-DD',
'model': 'ABCD',
'encoding': 'N/A',
'ext_identifier': 'Power Class 8 (20.0W Max)',
'ext_rateselect_compliance': 'N/A',
'cable_type': 'Length Cable Assembly(m)',
'cable_length': 0.0,
'nominal_bit_rate': 'N/A',
'specification_compliance': 'sm_media_interface',
'application_advertisement': 'N/A',
'media_lane_count': 1,
'vendor_rev': '0.0',
'host_electrical_interface': '400GAUI-8 C2M (Annex 120E)',
'vendor_oui': 'xx-xx-xx',
'manufacturer': 'VENDOR_NAME',
'media_interface_technology': '1550 nm DFB',
'media_interface_code': '400ZR, DWDM, amplified',
'serial': '00000000',
'host_lane_count': 8,
**{f'active_apsel_hostlane{i}': 1 for i in range(1, 9)},
'hardware_rev': '0.0',
'cmis_rev': '5.0',
'media_lane_assignment_option': 1,
'connector': 'LC',
'host_lane_assignment_option': 1,
'vendor_date': '21010100',
'vdm_supported': True,
'supported_min_laser_freq': 191300,
'supported_max_laser_freq': 196100,
'supported_max_tx_power': 0,
'supported_min_tx_power': -20,

}
)
])
@patch("sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info")
def test_get_transceiver_info(self, get_transceiver_info_func, mock_response, expected):
# Mock the base class method to return initial transceiver data
get_transceiver_info_func.return_value = mock_response[0]
self.api.get_supported_power_config = MagicMock()
self.api.get_supported_power_config.return_value = mock_response[1]
self.api.get_supported_freq_config = MagicMock()
self.api.get_supported_freq_config.return_value = mock_response[2]

# Mock the power and frequency configurations
self.api.get_supported_power_config = MagicMock(return_value = mock_response[1])
self.api.get_supported_freq_config = MagicMock(return_value = mock_response[2])

# Call function under test
result = self.api.get_transceiver_info()
assert result == expected

# Test result is same as default dictionary length
assert len(C_CMIS_XCVR_INFO_DEFAULT_DICT) == len(result)


@pytest.mark.parametrize("mock_response, expected",[
(
Expand Down
Loading
Loading