sonic-buildimage/platform/mellanox/mlnx-platform-api/tests/test_sfp.py
2022-11-03 11:17:44 -07:00

241 lines
9.6 KiB
Python

#
# Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES.
# Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import ctypes
import os
import pytest
import shutil
import sys
if sys.version_info.major == 3:
from unittest import mock
else:
import mock
test_path = os.path.dirname(os.path.abspath(__file__))
modules_path = os.path.dirname(test_path)
sys.path.insert(0, modules_path)
from sonic_platform.sfp import SFP, SX_PORT_MODULE_STATUS_INITIALIZING, SX_PORT_MODULE_STATUS_PLUGGED, SX_PORT_MODULE_STATUS_UNPLUGGED, SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR, SX_PORT_MODULE_STATUS_PLUGGED_DISABLED
from sonic_platform.chassis import Chassis
class TestSfp:
@mock.patch('sonic_platform.device_data.DeviceDataManager.get_linecard_count', mock.MagicMock(return_value=8))
@mock.patch('sonic_platform.device_data.DeviceDataManager.get_linecard_max_port_count')
def test_sfp_index(self, mock_max_port):
sfp = SFP(0)
assert sfp.sdk_index == 0
assert sfp.index == 1
mock_max_port.return_value = 16
sfp = SFP(sfp_index=0, slot_id=1, linecard_port_count=16, lc_name='LINE-CARD1')
assert sfp.sdk_index == 0
assert sfp.index == 1
sfp = SFP(sfp_index=5, slot_id=3, linecard_port_count=16, lc_name='LINE-CARD1')
assert sfp.sdk_index == 5
assert sfp.index == 38
sfp = SFP(sfp_index=1, slot_id=1, linecard_port_count=4, lc_name='LINE-CARD1')
assert sfp.sdk_index == 1
assert sfp.index == 5
@mock.patch('sonic_platform.sfp.SFP.read_eeprom', mock.MagicMock(return_value=None))
@mock.patch('sonic_platform.sfp.SFP.shared_sdk_handle', mock.MagicMock(return_value=2))
@mock.patch('sonic_platform.sfp.SFP._get_module_info')
@mock.patch('sonic_platform.chassis.Chassis.get_num_sfps', mock.MagicMock(return_value=2))
@mock.patch('sonic_platform.chassis.extract_RJ45_ports_index', mock.MagicMock(return_value=[]))
def test_sfp_get_error_status(self, mock_get_error_code):
chassis = Chassis()
# Fetch an SFP module to test
sfp = chassis.get_sfp(1)
description_dict = sfp._get_error_description_dict()
for error in description_dict.keys():
mock_get_error_code.return_value = (SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR, error)
description = sfp.get_error_description()
assert description == description_dict[error]
mock_get_error_code.return_value = (SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR, -1)
description = sfp.get_error_description()
assert description == "Unknown error (-1)"
expected_description_list = [
(SX_PORT_MODULE_STATUS_INITIALIZING, "Initializing"),
(SX_PORT_MODULE_STATUS_PLUGGED, "OK"),
(SX_PORT_MODULE_STATUS_UNPLUGGED, "Unplugged"),
(SX_PORT_MODULE_STATUS_PLUGGED_DISABLED, "Disabled")
]
for oper_code, expected_description in expected_description_list:
mock_get_error_code.return_value = (oper_code, -1)
description = sfp.get_error_description()
assert description == expected_description
@mock.patch('sonic_platform.sfp.SFP._get_page_and_page_offset')
@mock.patch('sonic_platform.sfp.SFP._is_write_protected')
def test_sfp_write_eeprom(self, mock_limited_eeprom, mock_get_page):
sfp = SFP(0)
assert not sfp.write_eeprom(0, 1, bytearray())
mock_get_page.return_value = (None, None, None)
assert not sfp.write_eeprom(0, 1, bytearray([1]))
mock_get_page.return_value = (0, '/tmp/mock_page', 0)
mock_limited_eeprom.return_value = True
assert not sfp.write_eeprom(0, 1, bytearray([1]))
mock_limited_eeprom.return_value = False
mo = mock.mock_open()
print('after mock open')
with mock.patch('sonic_platform.sfp.open', mo):
handle = mo()
handle.write.return_value = 1
assert sfp.write_eeprom(0, 1, bytearray([1]))
handle.seek.assert_called_once_with(0)
handle.write.assert_called_once_with(bytearray([1]))
handle.write.return_value = -1
assert not sfp.write_eeprom(0, 1, bytearray([1]))
handle.write.return_value = 1
ctypes.set_errno(1)
assert not sfp.write_eeprom(0, 1, bytearray([1]))
ctypes.set_errno(0)
handle.write.side_effect = OSError('')
assert not sfp.write_eeprom(0, 1, bytearray([1]))
@mock.patch('sonic_platform.sfp.SFP.get_mst_pci_device', mock.MagicMock(return_value = None))
@mock.patch('sonic_platform.sfp.SFP._get_page_and_page_offset')
def test_sfp_read_eeprom(self, mock_get_page):
sfp = SFP(0)
mock_get_page.return_value = (None, None, None)
assert sfp.read_eeprom(0, 1) is None
mock_get_page.return_value = (0, '/tmp/mock_page', 0)
mo = mock.mock_open()
with mock.patch('sonic_platform.sfp.open', mo):
handle = mo()
handle.read.return_value = b'\x00'
assert sfp.read_eeprom(0, 1) == bytearray([0])
handle.seek.assert_called_once_with(0)
ctypes.set_errno(1)
assert sfp.read_eeprom(0, 1) is None
ctypes.set_errno(0)
handle.read.side_effect = OSError('')
assert sfp.read_eeprom(0, 1) is None
@mock.patch('sonic_platform.sfp.SFP._fetch_port_status')
def test_is_port_admin_status_up(self, mock_port_status):
mock_port_status.return_value = (0, True)
assert SFP.is_port_admin_status_up(None, None)
mock_port_status.return_value = (0, False)
assert not SFP.is_port_admin_status_up(None, None)
@mock.patch('sonic_platform.sfp.SFP._get_eeprom_path', mock.MagicMock(return_value = None))
@mock.patch('sonic_platform.sfp.SFP._get_sfp_type_str')
def test_is_write_protected(self, mock_get_type_str):
sfp = SFP(0)
mock_get_type_str.return_value = 'cmis'
assert sfp._is_write_protected(page=0, page_offset=26, num_bytes=1)
assert not sfp._is_write_protected(page=0, page_offset=27, num_bytes=1)
# not exist page
assert not sfp._is_write_protected(page=3, page_offset=0, num_bytes=1)
# invalid sfp type str
mock_get_type_str.return_value = 'invalid'
assert not sfp._is_write_protected(page=0, page_offset=0, num_bytes=1)
def test_get_sfp_type_str(self):
sfp = SFP(0)
expect_sfp_types = ['cmis', 'sff8636', 'sff8472']
mock_eeprom_path = '/tmp/mock_eeprom'
mock_dir = '/tmp/mock_eeprom/0/i2c-0x50'
os.makedirs(os.path.join(mock_dir), exist_ok=True)
for expect_sfp_type in expect_sfp_types:
source_eeprom_file = os.path.join(test_path, 'input_platform', expect_sfp_type + '_page0')
shutil.copy(source_eeprom_file, os.path.join(mock_dir, 'data'))
assert sfp._get_sfp_type_str(mock_eeprom_path) == expect_sfp_type
sfp._sfp_type_str = None
os.system('rm -rf {}'.format(mock_eeprom_path))
assert sfp._get_sfp_type_str('invalid') is None
@mock.patch('os.path.exists')
@mock.patch('sonic_platform.sfp.SFP._get_eeprom_path')
@mock.patch('sonic_platform.sfp.SFP._get_sfp_type_str')
def test_get_page_and_page_offset(self, mock_get_type_str, mock_eeprom_path, mock_path_exists):
sfp = SFP(0)
mock_path_exists.return_value = False
page_num, page, page_offset = sfp._get_page_and_page_offset(0)
assert page_num is None
assert page is None
assert page_offset is None
mock_path_exists.return_value = True
mock_eeprom_path.return_value = '/tmp'
page_num, page, page_offset = sfp._get_page_and_page_offset(255)
assert page_num == 0
assert page == '/tmp/0/i2c-0x50/data'
assert page_offset is 255
mock_get_type_str.return_value = 'cmis'
page_num, page, page_offset = sfp._get_page_and_page_offset(256)
assert page_num == 1
assert page == '/tmp/1/data'
assert page_offset is 0
mock_get_type_str.return_value = 'sff8472'
page_num, page, page_offset = sfp._get_page_and_page_offset(511)
assert page_num == -1
assert page == '/tmp/0/i2c-0x51/data'
assert page_offset is 255
page_num, page, page_offset = sfp._get_page_and_page_offset(512)
assert page_num == 1
assert page == '/tmp/1/data'
assert page_offset is 0
@mock.patch('sonic_platform.sfp.SFP._read_eeprom')
def test_get_presence(self, mock_read_eeprom):
sfp = SFP(0)
mock_read_eeprom.return_value = None
assert not sfp.get_presence()
mock_read_eeprom.return_value = bytearray([1])
assert sfp.get_presence()
@mock.patch('sonic_platform.sfp.SFP.get_xcvr_api')
def test_dummy_apis(self, mock_get_xcvr_api):
mock_api = mock.MagicMock()
mock_api.NUM_CHANNELS = 4
mock_get_xcvr_api.return_value = mock_api
sfp = SFP(0)
assert sfp.get_rx_los() == [False] * 4
assert sfp.get_tx_fault() == [False] * 4
mock_get_xcvr_api.return_value = None
assert sfp.get_rx_los() is None
assert sfp.get_tx_fault() is None