Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize unsupported values to 'N/A' in CmisApi::get_transceiver_info #545

Merged
merged 9 commits into from
Mar 1, 2025
16 changes: 10 additions & 6 deletions sonic_platform_base/sonic_xcvr/api/public/c_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ...fields import consts
from .cmis import CmisApi, CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP
import time
import copy
BYTELENGTH = 8
SYSLOG_IDENTIFIER = "CCmisApi"

Expand Down Expand Up @@ -354,14 +355,17 @@ def get_transceiver_info(self):
supported_min_laser_freq = FLOAT ; support minimum laser frequency
================================================================================
"""
trans_info = super(CCmisApi,self).get_transceiver_info()
self.xcvr_info = copy.deepcopy(CmisApi.cmis_xcvr_info_dict)
self.xcvr_info = super(CCmisApi, self).get_transceiver_info()
min_power, max_power = self.get_supported_power_config()
trans_info['supported_max_tx_power'] = max_power
trans_info['supported_min_tx_power'] = min_power
_, _, _, low_freq_supported, high_freq_supported = self.get_supported_freq_config()
trans_info['supported_max_laser_freq'] = high_freq_supported
trans_info['supported_min_laser_freq'] = low_freq_supported
return trans_info
self.xcvr_info.update({
'supported_max_tx_power': max_power,
'supported_min_tx_power': min_power,
'supported_max_laser_freq': high_freq_supported,
'supported_min_laser_freq': low_freq_supported
})
return self.xcvr_info

def get_transceiver_bulk_status(self):
"""
Expand Down
88 changes: 63 additions & 25 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .cmisCDB import CmisCdbApi
from .cmisVDM import CmisVdmApi
import time
import copy
from collections import defaultdict

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,6 +83,47 @@ class CmisApi(XcvrApi):
LowPwrRequestSW = 4
LowPwrAllowRequestHW = 6

cmis_xcvr_info_dict = {
"type": "N/A",
"type_abbrv_name": "N/A",
"hardware_rev": "N/A",
"serial": "N/A",
"manufacturer": "N/A",
"model": "N/A",
"connector": "N/A",
"encoding": "N/A",
"ext_identifier": "N/A",
"ext_rateselect_compliance": "N/A",
"cable_length": "N/A",
"nominal_bit_rate": "N/A",
"vendor_date": "N/A",
"vendor_oui": "N/A",
"active_apsel_hostlane1": "N/A",
"active_apsel_hostlane2": "N/A",
"active_apsel_hostlane3": "N/A",
"active_apsel_hostlane4": "N/A",
"active_apsel_hostlane5": "N/A",
"active_apsel_hostlane6": "N/A",
"active_apsel_hostlane7": "N/A",
"active_apsel_hostlane8": "N/A",
"supported_max_tx_power": "N/A",
"supported_min_tx_power": "N/A",
"supported_max_laser_freq": "N/A",
"supported_min_laser_freq": "N/A",
"application_advertisement": "N/A",
"host_electrical_interface": "N/A",
"media_interface_code": "N/A",
"host_lane_count": "N/A",
"media_lane_count": "N/A",
"host_lane_assignment_option": "N/A",
"media_lane_assignment_option": "N/A",
"cable_type": "N/A",
"media_interface_technology": "N/A",
"vendor_rev": "N/A",
"cmis_rev": "N/A",
"specification_compliance": "N/A"
}

def __init__(self, xcvr_eeprom):
super(CmisApi, self).__init__(xcvr_eeprom)
self.vdm = CmisVdmApi(xcvr_eeprom) if not self.is_flat_memory() else None
Expand Down Expand Up @@ -267,50 +309,46 @@ def get_transceiver_info(self):
ext_id = admin_info[consts.EXT_ID_FIELD]
power_class = ext_id[consts.POWER_CLASS_FIELD]
max_power = ext_id[consts.MAX_POWER_FIELD]

xcvr_info = {
self.xcvr_info = copy.deepcopy(CmisApi.cmis_xcvr_info_dict)
self.xcvr_info.update({
"type": admin_info[consts.ID_FIELD],
"type_abbrv_name": admin_info[consts.ID_ABBRV_FIELD],
"hardware_rev": self.get_module_hardware_revision(),
"serial": admin_info[consts.VENDOR_SERIAL_NO_FIELD],
"manufacturer": admin_info[consts.VENDOR_NAME_FIELD],
"model": admin_info[consts.VENDOR_PART_NO_FIELD],
"connector": admin_info[consts.CONNECTOR_FIELD],
"encoding": "N/A", # Not supported
"ext_identifier": "%s (%sW Max)" % (power_class, max_power),
"ext_rateselect_compliance": "N/A", # Not supported
"cable_length": float(admin_info[consts.LENGTH_ASSEMBLY_FIELD]),
"nominal_bit_rate": 0, # Not supported
"vendor_date": admin_info[consts.VENDOR_DATE_FIELD],
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD]
}
appl_advt = self.get_application_advertisement()
xcvr_info['application_advertisement'] = str(appl_advt) if len(appl_advt) > 0 else 'N/A'
xcvr_info['host_electrical_interface'] = self.get_host_electrical_interface()
xcvr_info['media_interface_code'] = self.get_module_media_interface()
xcvr_info['host_lane_count'] = self.get_host_lane_count()
xcvr_info['media_lane_count'] = self.get_media_lane_count()
xcvr_info['host_lane_assignment_option'] = self.get_host_lane_assignment_option()
xcvr_info['media_lane_assignment_option'] = self.get_media_lane_assignment_option()
xcvr_info['cable_type'] = self.get_cable_length_type()
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD],
"application_advertisement": str(self.get_application_advertisement()) if len(self.get_application_advertisement()) > 0 else 'N/A',
"host_electrical_interface": self.get_host_electrical_interface(),
"media_interface_code": self.get_module_media_interface(),
"host_lane_count": self.get_host_lane_count(),
"media_lane_count": self.get_media_lane_count(),
"host_lane_assignment_option": self.get_host_lane_assignment_option(),
"media_lane_assignment_option": self.get_media_lane_assignment_option(),
"cable_type": self.get_cable_length_type(),
"media_interface_technology": self.get_media_interface_technology(),
"vendor_rev": self.get_vendor_rev(),
"cmis_rev": self.get_cmis_rev(),
"specification_compliance": self.get_module_media_type()
})
apsel_dict = self.get_active_apsel_hostlane()
for lane in range(1, self.NUM_CHANNELS+1):
xcvr_info["%s%d" % ("active_apsel_hostlane", lane)] = \
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]
xcvr_info['media_interface_technology'] = self.get_media_interface_technology()
xcvr_info['vendor_rev'] = self.get_vendor_rev()
xcvr_info['cmis_rev'] = self.get_cmis_rev()
xcvr_info['specification_compliance'] = self.get_module_media_type()
for lane in range(1, self.NUM_CHANNELS + 1):
self.xcvr_info["%s%d" % ("active_apsel_hostlane", lane)] = \
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]

# In normal case will get a valid value for each of the fields. If get a 'None' value
# means there was a failure while reading the EEPROM, either because the EEPROM was
# not ready yet or experincing some other issues. It shouldn't return a dict with a
# wrong field value, instead should return a 'None' to indicate to XCVRD that retry is
# needed.
if None in xcvr_info.values():
if None in self.xcvr_info.values():
return None
else:
return xcvr_info
return self.xcvr_info

def get_transceiver_info_firmware_versions(self):
return_dict = {"active_firmware" : "N/A", "inactive_firmware" : "N/A"}
Expand Down
17 changes: 10 additions & 7 deletions tests/sonic_xcvr/test_ccmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,29 +160,32 @@ def test_get_pm_all(self, mock_response, expected):

@pytest.mark.parametrize("mock_response, expected",[
(
[ {
(
{
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver'
},
(-20, 0),
(0xff, -72, 120, 191300, 196100)
],
),
{
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'supported_min_laser_freq': 191300,
'supported_max_laser_freq': 196100,
'supported_max_tx_power': 0,
'supported_min_tx_power': -20,

}
)
])
@patch("sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info")
def test_get_transceiver_info(self, get_transceiver_info_func, mock_response, expected):
# Mock the base class method to return initial transceiver data
get_transceiver_info_func.return_value = mock_response[0]
self.api.get_supported_power_config = MagicMock()
self.api.get_supported_power_config.return_value = mock_response[1]
self.api.get_supported_freq_config = MagicMock()
self.api.get_supported_freq_config.return_value = mock_response[2]

# Mock the power and frequency configurations
self.api.get_supported_power_config = MagicMock(return_value = mock_response[1])
self.api.get_supported_freq_config = MagicMock(return_value = mock_response[2])

# Call function under test
result = self.api.get_transceiver_info()
assert result == expected

Expand Down
12 changes: 8 additions & 4 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,7 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
'ext_rateselect_compliance': 'N/A',
'cable_type': 'Length Cable Assembly(m)',
'cable_length': 0.0,
'nominal_bit_rate': 0,
'nominal_bit_rate': 'N/A',
'specification_compliance': 'sm_media_interface',
'application_advertisement': 'N/A',
'media_lane_count': 1,
Expand All @@ -1498,7 +1498,11 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
'media_lane_assignment_option': 1,
'connector': 'LC',
'host_lane_assignment_option': 1,
'vendor_date': '21010100'
'vendor_date': '21010100',
'supported_max_laser_freq': 'N/A',
'supported_max_tx_power': 'N/A',
'supported_min_laser_freq': 'N/A',
'supported_min_tx_power': 'N/A'
}
)
])
Expand Down Expand Up @@ -1526,11 +1530,11 @@ def test_get_transceiver_info(self, mock_response, expected):
self.api.get_cmis_rev = MagicMock()
self.api.get_cmis_rev.return_value = mock_response[10]
self.api.get_module_fw_info = MagicMock()
self.api.get_module_fw_info.return_value = mock_response[14]
self.api.get_module_media_type = MagicMock()
self.api.get_module_media_type.return_value = mock_response[13]
self.api.get_module_hardware_revision = MagicMock()
self.api.get_module_hardware_revision.return_value = '0.0'
self.api.get_module_fw_info.return_value = mock_response[14]
self.api.is_flat_memory = MagicMock()
self.api.is_flat_memory.return_value = False
result = self.api.get_transceiver_info()
Expand Down Expand Up @@ -3082,7 +3086,7 @@ def test_get_error_description(self):
}
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.return_value = 0x10

result = self.api.get_error_description()
assert result is 'OK'

Expand Down
Loading