[Mellanox] Revert LPM implementation to the old way (#17096)
- Why I did it The current low power mode setting implementation requests the user to set the port to admin down first before toggling LP mode, this is not backward compatible, now revert it to the old way so that the user can toggle the LP mode regardless of the port admin status. - How I did it Revert the recent changes related to LPM in PR #14130 and #16545 - How to verify it Run all sfputil and SFP platform API related tests on all the Mellanox platforms. Signed-off-by: Kebo Liu <kebol@nvidia.com>
This commit is contained in:
parent
8c1bd85830
commit
f96742fb98
@ -130,6 +130,10 @@ class Chassis(ChassisBase):
|
||||
if self.sfp_event:
|
||||
self.sfp_event.deinitialize()
|
||||
|
||||
if self._sfp_list:
|
||||
if self.sfp_module.SFP.shared_sdk_handle:
|
||||
self.sfp_module.deinitialize_sdk_handle(self.sfp_module.SFP.shared_sdk_handle)
|
||||
|
||||
@property
|
||||
def RJ45_port_list(self):
|
||||
if not self._RJ45_port_inited:
|
||||
|
@ -35,6 +35,16 @@ try:
|
||||
except ImportError as e:
|
||||
raise ImportError (str(e) + "- required module not found")
|
||||
|
||||
try:
|
||||
# python_sdk_api does not support python3 for now. Daemons like thermalctld or psud
|
||||
# also import this file without actually use the sdk lib. So we catch the ImportError
|
||||
# and ignore it here. Meanwhile, we have to trigger xcvrd using python2 now because it
|
||||
# uses the sdk lib.
|
||||
from python_sdk_api.sxd_api import *
|
||||
from python_sdk_api.sx_api import *
|
||||
except ImportError as e:
|
||||
pass
|
||||
|
||||
# Define the sdk constants
|
||||
SX_PORT_MODULE_STATUS_INITIALIZING = 0
|
||||
SX_PORT_MODULE_STATUS_PLUGGED = 1
|
||||
@ -42,6 +52,15 @@ SX_PORT_MODULE_STATUS_UNPLUGGED = 2
|
||||
SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR = 3
|
||||
SX_PORT_MODULE_STATUS_PLUGGED_DISABLED = 4
|
||||
|
||||
try:
|
||||
if os.environ["PLATFORM_API_UNIT_TESTING"] == "1":
|
||||
# Unable to import SDK constants under unit test
|
||||
# Define them here
|
||||
SX_PORT_ADMIN_STATUS_UP = True
|
||||
SX_PORT_ADMIN_STATUS_DOWN = False
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# identifier value of xSFP module which is in the first byte of the EEPROM
|
||||
# if the identifier value falls into SFP_TYPE_CODE_LIST the module is treated as a SFP module and parsed according to 8472
|
||||
# for QSFP_TYPE_CODE_LIST the module is treated as a QSFP module and parsed according to 8436/8636
|
||||
@ -160,12 +179,52 @@ limited_eeprom = {
|
||||
logger = Logger()
|
||||
|
||||
|
||||
# SDK initializing stuff, called from chassis
|
||||
def initialize_sdk_handle():
|
||||
rc, sdk_handle = sx_api_open(None)
|
||||
if (rc != SX_STATUS_SUCCESS):
|
||||
logger.log_warning("Failed to open api handle, please check whether SDK is running.")
|
||||
sdk_handle = None
|
||||
|
||||
return sdk_handle
|
||||
|
||||
|
||||
def deinitialize_sdk_handle(sdk_handle):
|
||||
if sdk_handle is not None:
|
||||
rc = sx_api_close(sdk_handle)
|
||||
if (rc != SX_STATUS_SUCCESS):
|
||||
logger.log_warning("Failed to close api handle.")
|
||||
|
||||
return rc == SXD_STATUS_SUCCESS
|
||||
else:
|
||||
logger.log_warning("Sdk handle is none")
|
||||
return False
|
||||
|
||||
class SdkHandleContext(object):
|
||||
def __init__(self):
|
||||
self.sdk_handle = None
|
||||
|
||||
def __enter__(self):
|
||||
self.sdk_handle = initialize_sdk_handle()
|
||||
return self.sdk_handle
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
deinitialize_sdk_handle(self.sdk_handle)
|
||||
|
||||
class NvidiaSFPCommon(SfpOptoeBase):
|
||||
def __init__(self, sfp_index):
|
||||
super(NvidiaSFPCommon, self).__init__()
|
||||
self.index = sfp_index + 1
|
||||
self.sdk_index = sfp_index
|
||||
|
||||
@property
|
||||
def sdk_handle(self):
|
||||
if not SFP.shared_sdk_handle:
|
||||
SFP.shared_sdk_handle = initialize_sdk_handle()
|
||||
if not SFP.shared_sdk_handle:
|
||||
logger.log_error('Failed to open SDK handle')
|
||||
return SFP.shared_sdk_handle
|
||||
|
||||
@classmethod
|
||||
def _get_module_info(self, sdk_index):
|
||||
"""
|
||||
@ -185,6 +244,7 @@ class NvidiaSFPCommon(SfpOptoeBase):
|
||||
|
||||
class SFP(NvidiaSFPCommon):
|
||||
"""Platform-specific SFP class"""
|
||||
shared_sdk_handle = None
|
||||
SFP_MLNX_ERROR_DESCRIPTION_LONGRANGE_NON_MLNX_CABLE = 'Long range for non-Mellanox cable or module'
|
||||
SFP_MLNX_ERROR_DESCRIPTION_ENFORCE_PART_NUMBER_LIST = 'Enforce part number list'
|
||||
SFP_MLNX_ERROR_DESCRIPTION_PMD_TYPE_NOT_ENABLED = 'PMD type not enabled'
|
||||
@ -311,6 +371,24 @@ class SFP(NvidiaSFPCommon):
|
||||
return False
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def mgmt_phy_mod_pwr_attr_get(cls, power_attr_type, sdk_handle, sdk_index, slot_id):
|
||||
sx_mgmt_phy_mod_pwr_attr_p = new_sx_mgmt_phy_mod_pwr_attr_t_p()
|
||||
sx_mgmt_phy_mod_pwr_attr = sx_mgmt_phy_mod_pwr_attr_t()
|
||||
sx_mgmt_phy_mod_pwr_attr.power_attr_type = power_attr_type
|
||||
sx_mgmt_phy_mod_pwr_attr_t_p_assign(sx_mgmt_phy_mod_pwr_attr_p, sx_mgmt_phy_mod_pwr_attr)
|
||||
module_id_info = sx_mgmt_module_id_info_t()
|
||||
module_id_info.slot_id = slot_id
|
||||
module_id_info.module_id = sdk_index
|
||||
try:
|
||||
rc = sx_mgmt_phy_module_pwr_attr_get(sdk_handle, module_id_info, sx_mgmt_phy_mod_pwr_attr_p)
|
||||
assert SX_STATUS_SUCCESS == rc, "sx_mgmt_phy_module_pwr_attr_get failed {}".format(rc)
|
||||
sx_mgmt_phy_mod_pwr_attr = sx_mgmt_phy_mod_pwr_attr_t_p_value(sx_mgmt_phy_mod_pwr_attr_p)
|
||||
pwr_mode_attr = sx_mgmt_phy_mod_pwr_attr.pwr_mode_attr
|
||||
return pwr_mode_attr.admin_pwr_mode_e, pwr_mode_attr.oper_pwr_mode_e
|
||||
finally:
|
||||
delete_sx_mgmt_phy_mod_pwr_attr_t_p(sx_mgmt_phy_mod_pwr_attr_p)
|
||||
|
||||
def get_lpmode(self):
|
||||
"""
|
||||
Retrieves the lpmode (low power mode) status of this SFP
|
||||
@ -318,9 +396,36 @@ class SFP(NvidiaSFPCommon):
|
||||
Returns:
|
||||
A Boolean, True if lpmode is enabled, False if disabled
|
||||
"""
|
||||
file_path = SFP_SDK_MODULE_SYSFS_ROOT_TEMPLATE.format(self.sdk_index) + SFP_SYSFS_POWER_MODE
|
||||
power_mode = utils.read_int_from_file(file_path)
|
||||
return power_mode == POWER_MODE_LOW
|
||||
if utils.is_host():
|
||||
# To avoid performance issue,
|
||||
# call class level method to avoid initialize the whole sonic platform API
|
||||
get_lpmode_code = 'from sonic_platform import sfp;\n' \
|
||||
'with sfp.SdkHandleContext() as sdk_handle:' \
|
||||
'print(sfp.SFP._get_lpmode(sdk_handle, {}, {}))'.format(self.sdk_index, self.slot_id)
|
||||
lpm_cmd = ["docker", "exec", "pmon", "python3", "-c", get_lpmode_code]
|
||||
try:
|
||||
output = subprocess.check_output(lpm_cmd, universal_newlines=True)
|
||||
return 'True' in output
|
||||
except subprocess.CalledProcessError as e:
|
||||
print("Error! Unable to get LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
|
||||
return False
|
||||
else:
|
||||
return self._get_lpmode(self.sdk_handle, self.sdk_index, self.slot_id)
|
||||
|
||||
@classmethod
|
||||
def _get_lpmode(cls, sdk_handle, sdk_index, slot_id):
|
||||
"""Class level method to get low power mode.
|
||||
|
||||
Args:
|
||||
sdk_handle: SDK handle
|
||||
sdk_index (integer): SDK port index
|
||||
slot_id (integer): Slot ID
|
||||
|
||||
Returns:
|
||||
[boolean]: True if low power mode is on else off
|
||||
"""
|
||||
_, oper_pwr_mode = cls.mgmt_phy_mod_pwr_attr_get(SX_MGMT_PHY_MOD_PWR_ATTR_PWR_MODE_E, sdk_handle, sdk_index, slot_id)
|
||||
return oper_pwr_mode == SX_MGMT_PHY_MOD_PWR_MODE_LOW_E
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
@ -334,6 +439,128 @@ class SFP(NvidiaSFPCommon):
|
||||
file_path = SFP_SDK_MODULE_SYSFS_ROOT_TEMPLATE.format(self.sdk_index) + SFP_SYSFS_RESET
|
||||
return utils.write_file(file_path, '1')
|
||||
|
||||
|
||||
@classmethod
|
||||
def is_nve(cls, port):
|
||||
return (port & NVE_MASK) != 0
|
||||
|
||||
|
||||
@classmethod
|
||||
def is_cpu(cls, port):
|
||||
return (port & CPU_MASK) != 0
|
||||
|
||||
|
||||
@classmethod
|
||||
def _fetch_port_status(cls, sdk_handle, log_port):
|
||||
oper_state_p = new_sx_port_oper_state_t_p()
|
||||
admin_state_p = new_sx_port_admin_state_t_p()
|
||||
module_state_p = new_sx_port_module_state_t_p()
|
||||
rc = sx_api_port_state_get(sdk_handle, log_port, oper_state_p, admin_state_p, module_state_p)
|
||||
assert rc == SXD_STATUS_SUCCESS, "sx_api_port_state_get failed, rc = %d" % rc
|
||||
|
||||
admin_state = sx_port_admin_state_t_p_value(admin_state_p)
|
||||
oper_state = sx_port_oper_state_t_p_value(oper_state_p)
|
||||
|
||||
delete_sx_port_oper_state_t_p(oper_state_p)
|
||||
delete_sx_port_admin_state_t_p(admin_state_p)
|
||||
delete_sx_port_module_state_t_p(module_state_p)
|
||||
|
||||
return oper_state, admin_state
|
||||
|
||||
|
||||
@classmethod
|
||||
def is_port_admin_status_up(cls, sdk_handle, log_port):
|
||||
_, admin_state = cls._fetch_port_status(sdk_handle, log_port);
|
||||
return admin_state == SX_PORT_ADMIN_STATUS_UP
|
||||
|
||||
|
||||
@classmethod
|
||||
def set_port_admin_status_by_log_port(cls, sdk_handle, log_port, admin_status):
|
||||
rc = sx_api_port_state_set(sdk_handle, log_port, admin_status)
|
||||
if SX_STATUS_SUCCESS != rc:
|
||||
logger.log_error("sx_api_port_state_set failed, rc = %d" % rc)
|
||||
|
||||
return SX_STATUS_SUCCESS == rc
|
||||
|
||||
|
||||
@classmethod
|
||||
def get_logical_ports(cls, sdk_handle, sdk_index, slot_id):
|
||||
# Get all the ports related to the sfp, if port admin status is up, put it to list
|
||||
port_cnt_p = new_uint32_t_p()
|
||||
uint32_t_p_assign(port_cnt_p, 0)
|
||||
rc = sx_api_port_device_get(sdk_handle, DEVICE_ID, SWITCH_ID, None, port_cnt_p)
|
||||
|
||||
assert rc == SX_STATUS_SUCCESS, "sx_api_port_device_get failed, rc = %d" % rc
|
||||
port_cnt = uint32_t_p_value(port_cnt_p)
|
||||
port_attributes_list = new_sx_port_attributes_t_arr(port_cnt)
|
||||
|
||||
rc = sx_api_port_device_get(sdk_handle, DEVICE_ID , SWITCH_ID, port_attributes_list, port_cnt_p)
|
||||
assert rc == SX_STATUS_SUCCESS, "sx_api_port_device_get failed, rc = %d" % rc
|
||||
|
||||
port_cnt = uint32_t_p_value(port_cnt_p)
|
||||
log_port_list = []
|
||||
for i in range(0, port_cnt):
|
||||
port_attributes = sx_port_attributes_t_arr_getitem(port_attributes_list, i)
|
||||
if not cls.is_nve(int(port_attributes.log_port)) \
|
||||
and not cls.is_cpu(int(port_attributes.log_port)) \
|
||||
and port_attributes.port_mapping.module_port == sdk_index \
|
||||
and port_attributes.port_mapping.slot == slot_id \
|
||||
and cls.is_port_admin_status_up(sdk_handle, port_attributes.log_port):
|
||||
log_port_list.append(port_attributes.log_port)
|
||||
|
||||
delete_sx_port_attributes_t_arr(port_attributes_list)
|
||||
delete_uint32_t_p(port_cnt_p)
|
||||
return log_port_list
|
||||
|
||||
|
||||
@classmethod
|
||||
def mgmt_phy_mod_pwr_attr_set(cls, sdk_handle, sdk_index, slot_id, power_attr_type, admin_pwr_mode):
|
||||
result = False
|
||||
sx_mgmt_phy_mod_pwr_attr = sx_mgmt_phy_mod_pwr_attr_t()
|
||||
sx_mgmt_phy_mod_pwr_mode_attr = sx_mgmt_phy_mod_pwr_mode_attr_t()
|
||||
sx_mgmt_phy_mod_pwr_attr.power_attr_type = power_attr_type
|
||||
sx_mgmt_phy_mod_pwr_mode_attr.admin_pwr_mode_e = admin_pwr_mode
|
||||
sx_mgmt_phy_mod_pwr_attr.pwr_mode_attr = sx_mgmt_phy_mod_pwr_mode_attr
|
||||
sx_mgmt_phy_mod_pwr_attr_p = new_sx_mgmt_phy_mod_pwr_attr_t_p()
|
||||
sx_mgmt_phy_mod_pwr_attr_t_p_assign(sx_mgmt_phy_mod_pwr_attr_p, sx_mgmt_phy_mod_pwr_attr)
|
||||
module_id_info = sx_mgmt_module_id_info_t()
|
||||
module_id_info.slot_id = slot_id
|
||||
module_id_info.module_id = sdk_index
|
||||
try:
|
||||
rc = sx_mgmt_phy_module_pwr_attr_set(sdk_handle, SX_ACCESS_CMD_SET, module_id_info, sx_mgmt_phy_mod_pwr_attr_p)
|
||||
if SX_STATUS_SUCCESS != rc:
|
||||
logger.log_error("Error occurred when setting power mode for SFP module {}, slot {}, error code {}".format(sdk_index, slot_id, rc))
|
||||
result = False
|
||||
else:
|
||||
result = True
|
||||
finally:
|
||||
delete_sx_mgmt_phy_mod_pwr_attr_t_p(sx_mgmt_phy_mod_pwr_attr_p)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@classmethod
|
||||
def _set_lpmode_raw(cls, sdk_handle, sdk_index, slot_id, ports, attr_type, power_mode):
|
||||
result = False
|
||||
# Check if the module already works in the same mode
|
||||
admin_pwr_mode, oper_pwr_mode = cls.mgmt_phy_mod_pwr_attr_get(attr_type, sdk_handle, sdk_index, slot_id)
|
||||
if (power_mode == SX_MGMT_PHY_MOD_PWR_MODE_LOW_E and oper_pwr_mode == SX_MGMT_PHY_MOD_PWR_MODE_LOW_E) \
|
||||
or (power_mode == SX_MGMT_PHY_MOD_PWR_MODE_AUTO_E and admin_pwr_mode == SX_MGMT_PHY_MOD_PWR_MODE_AUTO_E):
|
||||
return True
|
||||
try:
|
||||
# Bring the port down
|
||||
for port in ports:
|
||||
cls.set_port_admin_status_by_log_port(sdk_handle, port, SX_PORT_ADMIN_STATUS_DOWN)
|
||||
# Set the desired power mode
|
||||
result = cls.mgmt_phy_mod_pwr_attr_set(sdk_handle, sdk_index, slot_id, attr_type, power_mode)
|
||||
finally:
|
||||
# Bring the port up
|
||||
for port in ports:
|
||||
cls.set_port_admin_status_by_log_port(sdk_handle, port, SX_PORT_ADMIN_STATUS_UP)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def set_lpmode(self, lpmode):
|
||||
"""
|
||||
Sets the lpmode (low power mode) of SFP
|
||||
@ -345,14 +572,38 @@ class SFP(NvidiaSFPCommon):
|
||||
Returns:
|
||||
A boolean, True if lpmode is set successfully, False if not
|
||||
"""
|
||||
print('\nNotice: please set port admin status to down before setting power mode, ignore this message if already set')
|
||||
file_path = SFP_SDK_MODULE_SYSFS_ROOT_TEMPLATE.format(self.sdk_index) + SFP_SYSFS_POWER_MODE_POLICY
|
||||
target_admin_mode = POWER_MODE_POLICY_AUTO if lpmode else POWER_MODE_POLICY_HIGH
|
||||
current_admin_mode = utils.read_int_from_file(file_path)
|
||||
if current_admin_mode == target_admin_mode:
|
||||
return True
|
||||
if utils.is_host():
|
||||
# To avoid performance issue,
|
||||
# call class level method to avoid initialize the whole sonic platform API
|
||||
set_lpmode_code = 'from sonic_platform import sfp;\n' \
|
||||
'with sfp.SdkHandleContext() as sdk_handle:' \
|
||||
'print(sfp.SFP._set_lpmode({}, sdk_handle, {}, {}))' \
|
||||
.format('True' if lpmode else 'False', self.sdk_index, self.slot_id)
|
||||
lpm_cmd = ["docker", "exec", "pmon", "python3", "-c", set_lpmode_code]
|
||||
|
||||
return utils.write_file(file_path, str(target_admin_mode))
|
||||
# Set LPM
|
||||
try:
|
||||
output = subprocess.check_output(lpm_cmd, universal_newlines=True)
|
||||
return 'True' in output
|
||||
except subprocess.CalledProcessError as e:
|
||||
print("Error! Unable to set LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
|
||||
return False
|
||||
else:
|
||||
return self._set_lpmode(lpmode, self.sdk_handle, self.sdk_index, self.slot_id)
|
||||
|
||||
|
||||
@classmethod
|
||||
def _set_lpmode(cls, lpmode, sdk_handle, sdk_index, slot_id):
|
||||
log_port_list = cls.get_logical_ports(sdk_handle, sdk_index, slot_id)
|
||||
sdk_lpmode = SX_MGMT_PHY_MOD_PWR_MODE_LOW_E if lpmode else SX_MGMT_PHY_MOD_PWR_MODE_AUTO_E
|
||||
cls._set_lpmode_raw(sdk_handle,
|
||||
sdk_index,
|
||||
slot_id,
|
||||
log_port_list,
|
||||
SX_MGMT_PHY_MOD_PWR_ATTR_PWR_MODE_E,
|
||||
sdk_lpmode)
|
||||
logger.log_info("{} low power mode for module {}, slot {}".format("Enabled" if lpmode else "Disabled", sdk_index, slot_id))
|
||||
return True
|
||||
|
||||
def is_replaceable(self):
|
||||
"""
|
||||
@ -552,6 +803,7 @@ class RJ45Port(NvidiaSFPCommon):
|
||||
def get_presence(self):
|
||||
"""
|
||||
Retrieves the presence of the device
|
||||
For RJ45 ports, it always return True
|
||||
|
||||
Returns:
|
||||
bool: True if device is present, False if not
|
||||
|
@ -55,6 +55,7 @@ class TestSfp:
|
||||
assert sfp.index == 5
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP.read_eeprom', mock.MagicMock(return_value=None))
|
||||
@mock.patch('sonic_platform.sfp.SFP.shared_sdk_handle', mock.MagicMock(return_value=2))
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_module_info')
|
||||
@mock.patch('sonic_platform.chassis.Chassis.get_num_sfps', mock.MagicMock(return_value=2))
|
||||
@mock.patch('sonic_platform.chassis.extract_RJ45_ports_index', mock.MagicMock(return_value=[]))
|
||||
@ -142,6 +143,14 @@ class TestSfp:
|
||||
handle.read.side_effect = OSError('')
|
||||
assert sfp.read_eeprom(0, 1) is None
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP._fetch_port_status')
|
||||
def test_is_port_admin_status_up(self, mock_port_status):
|
||||
mock_port_status.return_value = (0, True)
|
||||
assert SFP.is_port_admin_status_up(None, None)
|
||||
|
||||
mock_port_status.return_value = (0, False)
|
||||
assert not SFP.is_port_admin_status_up(None, None)
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_eeprom_path', mock.MagicMock(return_value = None))
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_sfp_type_str')
|
||||
def test_is_write_protected(self, mock_get_type_str):
|
||||
@ -247,27 +256,6 @@ class TestSfp:
|
||||
assert sfp.reset()
|
||||
mock_write.assert_called_with('/sys/module/sx_core/asic0/module0/reset', '1')
|
||||
|
||||
@mock.patch('sonic_platform.utils.read_int_from_file')
|
||||
def test_get_lpmode(self, mock_read_int):
|
||||
sfp = SFP(0)
|
||||
mock_read_int.return_value = 1
|
||||
assert sfp.get_lpmode()
|
||||
mock_read_int.assert_called_with('/sys/module/sx_core/asic0/module0/power_mode')
|
||||
|
||||
mock_read_int.return_value = 2
|
||||
assert not sfp.get_lpmode()
|
||||
|
||||
@mock.patch('sonic_platform.utils.write_file')
|
||||
@mock.patch('sonic_platform.utils.read_int_from_file')
|
||||
def test_set_lpmode(self, mock_read_int, mock_write):
|
||||
sfp = SFP(0)
|
||||
mock_read_int.return_value = 1
|
||||
assert sfp.set_lpmode(False)
|
||||
assert mock_write.call_count == 0
|
||||
|
||||
assert sfp.set_lpmode(True)
|
||||
mock_write.assert_called_with('/sys/module/sx_core/asic0/module0/power_mode_policy', '2')
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP.read_eeprom')
|
||||
def test_get_xcvr_api(self, mock_read):
|
||||
sfp = SFP(0)
|
||||
@ -289,3 +277,25 @@ class TestSfp:
|
||||
assert sfp.get_transceiver_bulk_status()
|
||||
assert sfp.get_transceiver_threshold_info()
|
||||
sfp.reinit()
|
||||
|
||||
@mock.patch('sonic_platform.utils.is_host', mock.MagicMock(side_effect = [True, True, False, False]))
|
||||
@mock.patch('subprocess.check_output', mock.MagicMock(side_effect = ['True', 'False']))
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_lpmode', mock.MagicMock(side_effect = [True, False]))
|
||||
@mock.patch('sonic_platform.sfp.SFP.sdk_handle', mock.MagicMock(return_value = None))
|
||||
def test_get_lpmode(self):
|
||||
sfp = SFP(0)
|
||||
assert sfp.get_lpmode()
|
||||
assert not sfp.get_lpmode()
|
||||
assert sfp.get_lpmode()
|
||||
assert not sfp.get_lpmode()
|
||||
|
||||
@mock.patch('sonic_platform.utils.is_host', mock.MagicMock(side_effect = [True, True, False, False]))
|
||||
@mock.patch('subprocess.check_output', mock.MagicMock(side_effect = ['True', 'False']))
|
||||
@mock.patch('sonic_platform.sfp.SFP._set_lpmode', mock.MagicMock(side_effect = [True, False]))
|
||||
@mock.patch('sonic_platform.sfp.SFP.sdk_handle', mock.MagicMock(return_value = None))
|
||||
def test_set_lpmode(self):
|
||||
sfp = SFP(0)
|
||||
assert sfp.set_lpmode(True)
|
||||
assert not sfp.set_lpmode(True)
|
||||
assert sfp.set_lpmode(False)
|
||||
assert not sfp.set_lpmode(False)
|
||||
|
Loading…
Reference in New Issue
Block a user