Skip to content

Commit

Permalink
Use select in xcvrd tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Junchao-Mellanox committed Sep 26, 2021
1 parent e0402a5 commit da0761c
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 253 deletions.
157 changes: 85 additions & 72 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from xcvrd.xcvrd import *
from xcvrd.xcvrd_utilities.y_cable_helper import *
from xcvrd.xcvrd_utilities.sfp_status_helper import *
from xcvrd.xcvrd_utilities.port_mapping import *
#from xcvrd.xcvrd_utilities.port_mapping import PortMapping, subscribe_port_config_change, handle_port_config_change

class TestXcvrdScript(object):

Expand Down Expand Up @@ -409,62 +411,61 @@ def test_is_error_sfp_status(self):
@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table):
mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('PortConfigDone', None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.wait_for_port_config_done('')
assert swsscommon.Select.select.call_count == 2

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_DaemonXcvrd_handle_port_config_change(self, mock_select, mock_sub_table):
def test_handle_port_config_change(self, mock_select, mock_sub_table):
mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True])
mock_observer = MagicMock()
mock_observer.notify_port_change_event = MagicMock()
xcvrd.subscribe_port_change_event(mock_observer)

xcvrd.handle_port_config_change()
assert mock_observer.notify_port_change_event.call_count == 1
assert xcvrd.port_mapping.logical_port_list.count('Ethernet0')
assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0
assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0']
assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1]

xcvrd.stop_event.is_set = MagicMock(side_effect=[False, True])

sel, asic_context = subscribe_port_config_change()
port_mapping = PortMapping()
stop_event = threading.Event()
stop_event.is_set = MagicMock(return_value=False)
logger = MagicMock()
handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_mapping.handle_port_change_event)

assert port_mapping.logical_port_list.count('Ethernet0')
assert port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0
assert port_mapping.get_physical_to_logical(1) == ['Ethernet0']
assert port_mapping.get_logical_to_physical('Ethernet0') == [1]

mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.DEL_COMMAND, (('index', '1'), )), (None, None, None)])
xcvrd.handle_port_config_change()
assert mock_observer.notify_port_change_event.call_count == 2
assert not xcvrd.port_mapping.logical_port_list
assert not xcvrd.port_mapping.logical_to_physical
assert not xcvrd.port_mapping.physical_to_logical
assert not xcvrd.port_mapping.logical_to_asic
handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_mapping.handle_port_change_event)
assert not port_mapping.logical_port_list
assert not port_mapping.logical_to_physical
assert not port_mapping.physical_to_logical
assert not port_mapping.logical_to_asic


@patch('swsscommon.swsscommon.Table')
def test_DaemonXcvrd_init_port_mapping(self, mock_swsscommon_table):
def test_get_port_mapping(self, mock_swsscommon_table):
mock_table = MagicMock()
mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4'])
mock_table.get = MagicMock(side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))])
mock_swsscommon_table.return_value = mock_table
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.init_port_mapping()
assert xcvrd.port_mapping.logical_port_list.count('Ethernet0')
assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0
assert xcvrd.port_mapping.get_physical_to_logical(1) == ['Ethernet0']
assert xcvrd.port_mapping.get_logical_to_physical('Ethernet0') == [1]
port_mapping = get_port_mapping()
assert port_mapping.logical_port_list.count('Ethernet0')
assert port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0
assert port_mapping.get_physical_to_logical(1) == ['Ethernet0']
assert port_mapping.get_logical_to_physical('Ethernet0') == [1]

assert xcvrd.port_mapping.logical_port_list.count('Ethernet4')
assert xcvrd.port_mapping.get_asic_id_for_logical_port('Ethernet4') == 0
assert xcvrd.port_mapping.get_physical_to_logical(2) == ['Ethernet4']
assert xcvrd.port_mapping.get_logical_to_physical('Ethernet4') == [2]
assert port_mapping.logical_port_list.count('Ethernet4')
assert port_mapping.get_asic_id_for_logical_port('Ethernet4') == 0
assert port_mapping.get_physical_to_logical(2) == ['Ethernet4']
assert port_mapping.get_logical_to_physical('Ethernet4') == [2]


@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_DaemonXcvrd_wait_for_port_config_done(self, mock_select, mock_sub_table):
mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('PortConfigDone', None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.wait_for_port_config_done('')
assert swsscommon.Select.select.call_count == 2

@patch('xcvrd.xcvrd.DaemonXcvrd.load_platform_util', MagicMock())
@patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', MagicMock(return_value=('/tmp', None)))
Expand All @@ -479,20 +480,20 @@ def test_DaemonXcvrd_init_deinit(self):

@patch('xcvrd.xcvrd.DaemonXcvrd.init')
@patch('xcvrd.xcvrd.DaemonXcvrd.deinit')
@patch('xcvrd.xcvrd.DaemonXcvrd.handle_port_config_change')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_run')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_run')
@patch('xcvrd.xcvrd.DomInfoUpdateTask.task_stop')
@patch('xcvrd.xcvrd.SfpStateUpdateTask.task_stop')
def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_handle_port_config_change, mock_deinit, mock_init):
def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init):
mock_init.return_value = (PortMapping(), set())
xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER)
xcvrd.stop_event.wait = MagicMock()
xcvrd.run()
# TODO: more check
assert mock_task_stop1.call_count == 1
assert mock_task_stop2.call_count == 1
assert mock_task_run1.call_count == 1
assert mock_task_run2.call_count == 1
assert mock_handle_port_config_change.call_count == 1
assert mock_deinit.call_count == 1
assert mock_init.call_count == 1

Expand All @@ -501,21 +502,21 @@ def test_DomInfoUpdateTask_handle_port_change_event(self):
port_mapping = PortMapping()
task = DomInfoUpdateTask(port_mapping)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.notify_port_change_event(port_change_event)
task.handle_port_change_event()
task.on_port_config_change(port_change_event)
assert task.port_mapping.logical_port_list.count('Ethernet0')
assert task.port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0
assert task.port_mapping.get_physical_to_logical(1) == ['Ethernet0']
assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1]

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE)
task.notify_port_change_event(port_change_event)
task.handle_port_change_event()
task.on_port_config_change(port_change_event)
assert not task.port_mapping.logical_port_list
assert not task.port_mapping.logical_to_physical
assert not task.port_mapping.physical_to_logical
assert not task.port_mapping.logical_to_asic

@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
def test_DomInfoUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
task = DomInfoUpdateTask(port_mapping)
Expand All @@ -527,12 +528,18 @@ def test_DomInfoUpdateTask_task_run_stop(self):
@patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status')
@patch('xcvrd.xcvrd.post_port_dom_info_to_db')
@patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db')
def test_DomInfoUpdateTask_task_worker(self, mock_post_dom_th, mock_post_dom_info, mock_detect_error):
@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_DomInfoUpdateTask_task_worker(self, mock_select, mock_sub_table, mock_post_dom_th, mock_post_dom_info, mock_detect_error):
mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable

port_mapping = PortMapping()
task = DomInfoUpdateTask(port_mapping)
task.task_stopping_event.wait = MagicMock(side_effect=[False, True])
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.notify_port_change_event(port_change_event)
mock_detect_error.return_value = True
task.task_worker([False])
assert task.port_mapping.logical_port_list.count('Ethernet0')
Expand All @@ -557,12 +564,12 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper):
mock_table_helper.get_dom_tbl = MagicMock(return_value=mock_table)
stopping_event = multiprocessing.Event()
port_mapping = PortMapping()
task = SfpStateUpdateTask(port_mapping)
retry_eeprom_set = set()
task = SfpStateUpdateTask(port_mapping, retry_eeprom_set)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.notify_port_change_event(port_change_event)
wait_time = 5
while wait_time > 0:
task.handle_port_change_event(task.event_queue, stopping_event, [False])
task.on_port_config_change(stopping_event, [False], port_change_event)
if task.port_mapping.logical_port_list:
break
wait_time -= 1
Expand All @@ -573,10 +580,9 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper):
assert task.port_mapping.get_logical_to_physical('Ethernet0') == [1]

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE)
task.notify_port_change_event(port_change_event)
wait_time = 5
while wait_time > 0:
task.handle_port_change_event(task.event_queue, stopping_event, [False])
task.on_port_config_change(stopping_event, [False], port_change_event)
if not task.port_mapping.logical_port_list:
break
wait_time -= 1
Expand All @@ -588,7 +594,8 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper):

def test_SfpStateUpdateTask_task_run_stop(self):
port_mapping = PortMapping()
task = SfpStateUpdateTask(port_mapping)
retry_eeprom_set = set()
task = SfpStateUpdateTask(port_mapping, retry_eeprom_set)
sfp_error_event = multiprocessing.Event()
task.task_run(sfp_error_event, [False])
assert wait_until(5, 1, task.task_process.is_alive)
Expand All @@ -599,7 +606,8 @@ def test_SfpStateUpdateTask_task_run_stop(self):
@patch('xcvrd.xcvrd.post_port_sfp_info_to_db')
def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info):
port_mapping = PortMapping()
task = SfpStateUpdateTask(port_mapping)
retry_eeprom_set = set()
task = SfpStateUpdateTask(port_mapping, retry_eeprom_set)
task.retry_eeprom_reading()
assert mock_post_sfp_info.call_count == 0

Expand All @@ -620,7 +628,8 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info):

def test_SfpStateUpdateTask_mapping_event_from_change_event(self):
port_mapping = PortMapping()
task = SfpStateUpdateTask(port_mapping)
retry_eeprom_set = set()
task = SfpStateUpdateTask(port_mapping, retry_eeprom_set)
port_dict = {}
assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL
assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL
Expand All @@ -638,6 +647,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self):
@patch('time.sleep', MagicMock())
@patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock())
@patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock())
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
@patch('os.kill')
@patch('xcvrd.xcvrd.SfpStateUpdateTask._mapping_event_from_change_event')
@patch('xcvrd.xcvrd._wrapper_get_transceiver_change_event')
Expand All @@ -649,30 +660,31 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self):
@patch('xcvrd.xcvrd.update_port_transceiver_status_table')
def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_info, mock_post_dom_info, mock_post_dom_th, mock_update_media_setting, mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill):
port_mapping = PortMapping()
task = SfpStateUpdateTask(port_mapping)
retry_eeprom_set = set()
task = SfpStateUpdateTask(port_mapping, retry_eeprom_set)
stop_event = multiprocessing.Event()
sfp_error_event = multiprocessing.Event()
mock_change_event.return_value = (True, {0:0}, {})
mock_mapping_event.return_value = SYSTEM_NOT_READY

# Test state machine: STATE_INIT + SYSTEM_NOT_READY event => STATE_INIT + SYSTEM_NOT_READY event ... => STATE_EXIT
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_os_kill.call_count == 1
assert sfp_error_event.is_set()

mock_mapping_event.return_value = SYSTEM_FAIL
mock_os_kill.reset_mock()
sfp_error_event.clear()
# Test state machine: STATE_INIT + SYSTEM_FAIL event => STATE_INIT + SYSTEM_FAIL event ... => STATE_EXIT
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_os_kill.call_count == 1
assert sfp_error_event.is_set()

mock_mapping_event.side_effect = [SYSTEM_BECOME_READY, SYSTEM_NOT_READY]
mock_os_kill.reset_mock()
sfp_error_event.clear()
# Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_NOT_READY event ... => STATE_EXIT
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_os_kill.call_count == 1
assert not sfp_error_event.is_set()

Expand All @@ -681,7 +693,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_
sfp_error_event.clear()
# Test state machine: STATE_INIT + SYSTEM_BECOME_READY event => STATE_NORMAL + SYSTEM_FAIL event ... => STATE_INIT
# + SYSTEM_FAIL event ... => STATE_EXIT
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_os_kill.call_count == 1
assert sfp_error_event.is_set()

Expand All @@ -692,7 +704,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_
mock_post_sfp_info.return_value = SFP_EEPROM_NOT_READY
stop_event.is_set = MagicMock(side_effect=[False, True])
# Test state machine: handle SFP insert event, but EEPROM read failure
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_updata_status.call_count == 1
assert mock_post_sfp_info.call_count == 2 # first call and retry call
assert mock_post_dom_info.call_count == 0
Expand All @@ -706,7 +718,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_
mock_updata_status.reset_mock()
mock_post_sfp_info.reset_mock()
# Test state machine: handle SFP insert event, and EEPROM read success
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_updata_status.call_count == 1
assert mock_post_sfp_info.call_count == 1
assert mock_post_dom_info.call_count == 1
Expand All @@ -717,7 +729,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_
mock_change_event.return_value = (True, {1:SFP_STATUS_REMOVED}, {})
mock_updata_status.reset_mock()
# Test state machine: handle SFP remove event
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_updata_status.call_count == 1
assert mock_del_dom.call_count == 1

Expand All @@ -727,7 +739,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_
mock_updata_status.reset_mock()
mock_del_dom.reset_mock()
# Test state machine: handle SFP error event
task.task_worker(stop_event, sfp_error_event, [False], task.event_queue)
task.task_worker(stop_event, sfp_error_event, [False])
assert mock_updata_status.call_count == 1
assert mock_del_dom.call_count == 1

Expand Down Expand Up @@ -756,7 +768,8 @@ class MockTable:
mock_table_helper.get_dom_tbl = MagicMock(return_value=dom_tbl)

port_mapping = PortMapping()
task = SfpStateUpdateTask(port_mapping)
retry_eeprom_set = set()
task = SfpStateUpdateTask(port_mapping, retry_eeprom_set)
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.port_mapping.handle_port_change_event(port_change_event)
# SFP information is in the DB, copy the SFP information for the newly added logical port
Expand Down
Loading

0 comments on commit da0761c

Please sign in to comment.