Conflicts: platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py
This commit is contained in:
parent
bb68cbcd3d
commit
b57b17318d
@ -23,6 +23,7 @@
|
||||
#############################################################################
|
||||
|
||||
try:
|
||||
import ctypes
|
||||
import subprocess
|
||||
import os
|
||||
from sonic_platform_base.sonic_eeprom import eeprom_dts
|
||||
@ -126,11 +127,42 @@ CPU_MASK = PORT_TYPE_MASK & (PORT_TYPE_CPU << PORT_TYPE_OFFSET)
|
||||
SFP_STATUS_INSERTED = '1'
|
||||
|
||||
# SFP constants
|
||||
SFP_PAGE_SIZE = 256
|
||||
SFP_UPPER_PAGE_OFFSET = 128
|
||||
SFP_VENDOR_PAGE_START = 640
|
||||
SFP_PAGE_SIZE = 256 # page size of page0h
|
||||
SFP_UPPER_PAGE_OFFSET = 128 # page size of other pages
|
||||
|
||||
BYTES_IN_DWORD = 4
|
||||
# SFP sysfs path constants
|
||||
SFP_PAGE0_PATH = '0/i2c-0x50/data'
|
||||
SFP_A2H_PAGE0_PATH = '0/i2c-0x51/data'
|
||||
SFP_EEPROM_ROOT_TEMPLATE = '/sys/module/sx_core/asic0/module{}/eeprom/pages'
|
||||
|
||||
# SFP type constants
|
||||
SFP_TYPE_CMIS = 'cmis'
|
||||
SFP_TYPE_SFF8472 = 'sff8472'
|
||||
SFP_TYPE_SFF8636 = 'sff8636'
|
||||
|
||||
# SFP stderr
|
||||
SFP_EEPROM_NOT_AVAILABLE = 'Input/output error'
|
||||
|
||||
# SFP EEPROM limited bytes
|
||||
limited_eeprom = {
|
||||
SFP_TYPE_CMIS: {
|
||||
'write': {
|
||||
0: [26, (31, 36), (126, 127)],
|
||||
16: [(0, 128)]
|
||||
}
|
||||
},
|
||||
SFP_TYPE_SFF8472: {
|
||||
'write': {
|
||||
0: [110, (114, 115), 118, 127]
|
||||
}
|
||||
},
|
||||
SFP_TYPE_SFF8636: {
|
||||
'write': {
|
||||
0: [(86, 88), 93, (98, 99), (100, 106), 127],
|
||||
3: [(230, 241), (242, 251)]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Global logger class instance
|
||||
logger = Logger()
|
||||
@ -157,75 +189,6 @@ def deinitialize_sdk_handle(sdk_handle):
|
||||
logger.log_warning("Sdk handle is none")
|
||||
return False
|
||||
|
||||
class MlxregManager:
|
||||
def __init__(self, mst_pci_device, slot_id, sdk_index):
|
||||
self.mst_pci_device = mst_pci_device
|
||||
self.slot_id = slot_id
|
||||
self.sdk_index = sdk_index
|
||||
|
||||
def construct_dword(self, write_buffer):
|
||||
if len(write_buffer) == 0:
|
||||
return None
|
||||
|
||||
used_bytes_in_dword = len(write_buffer) % BYTES_IN_DWORD
|
||||
|
||||
res = "dword[0]=0x"
|
||||
for idx, x in enumerate(write_buffer):
|
||||
word = hex(x)[2:]
|
||||
|
||||
if (idx > 0) and (idx % BYTES_IN_DWORD) == 0:
|
||||
res += ",dword[{}]=0x".format(str((idx + 1)//BYTES_IN_DWORD))
|
||||
res += word.zfill(2)
|
||||
|
||||
if used_bytes_in_dword > 0:
|
||||
res += (BYTES_IN_DWORD - used_bytes_in_dword) * "00"
|
||||
return res
|
||||
|
||||
def write_mlxreg_eeprom(self, num_bytes, dword, device_address, page):
|
||||
if not dword:
|
||||
return False
|
||||
|
||||
try:
|
||||
cmd = "mlxreg -d /dev/mst/{} --reg_name MCIA --indexes \
|
||||
slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0 \
|
||||
--set {} -y".format(self.mst_pci_device, self.slot_id, self.sdk_index, device_address, page, num_bytes, dword)
|
||||
subprocess.check_call(cmd, shell=True, universal_newlines=True, stdout=subprocess.DEVNULL)
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.log_error("Error! Unable to write data dword={} for {} port, page {} offset {}, rc = {}, err msg: {}".format(dword, self.sdk_index, page, device_address, e.returncode, e.output))
|
||||
return False
|
||||
return True
|
||||
|
||||
def read_mlxred_eeprom(self, offset, page, num_bytes):
|
||||
try:
|
||||
cmd = "mlxreg -d /dev/mst/{} --reg_name MCIA --indexes \
|
||||
slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0 \
|
||||
--get".format(self.mst_pci_device, self.slot_id, self.sdk_index, offset, page, num_bytes)
|
||||
result = subprocess.check_output(cmd, universal_newlines=True, shell=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.log_error("Error! Unable to read data for {} port, page {} offset {}, rc = {}, err msg: {}".format(self.sdk_index, page, offset, e.returncode, e.output))
|
||||
return None
|
||||
return result
|
||||
|
||||
def parse_mlxreg_read_output(self, read_output, num_bytes):
|
||||
if not read_output:
|
||||
return None
|
||||
|
||||
res = ""
|
||||
dword_num = num_bytes // BYTES_IN_DWORD
|
||||
used_bytes_in_dword = num_bytes % BYTES_IN_DWORD
|
||||
arr = [value for value in read_output.split('\n') if value[0:5] == "dword"]
|
||||
for i in range(dword_num):
|
||||
dword = arr[i].split()[2]
|
||||
res += dword[2:]
|
||||
|
||||
if used_bytes_in_dword > 0:
|
||||
# Cut needed info and insert into final hex string
|
||||
# Example: 3 bytes : 0x12345600
|
||||
# ^ ^
|
||||
dword = arr[dword_num].split()[2]
|
||||
res += dword[2 : 2 + used_bytes_in_dword * 2]
|
||||
|
||||
return bytearray.fromhex(res) if res else None
|
||||
|
||||
class SdkHandleContext(object):
|
||||
def __init__(self):
|
||||
@ -310,6 +273,7 @@ class SFP(NvidiaSFPCommon):
|
||||
|
||||
self.slot_id = slot_id
|
||||
self.mst_pci_device = self.get_mst_pci_device()
|
||||
self._sfp_type_str = None
|
||||
|
||||
# get MST PCI device name
|
||||
def get_mst_pci_device(self):
|
||||
@ -335,6 +299,7 @@ class SFP(NvidiaSFPCommon):
|
||||
Re-initialize this SFP object when a new SFP inserted
|
||||
:return:
|
||||
"""
|
||||
self._sfp_type_str = None
|
||||
self.refresh_xcvr_api()
|
||||
|
||||
def get_presence(self):
|
||||
@ -344,34 +309,9 @@ class SFP(NvidiaSFPCommon):
|
||||
Returns:
|
||||
bool: True if device is present, False if not
|
||||
"""
|
||||
eeprom_raw = self.read_eeprom(0, 1)
|
||||
|
||||
eeprom_raw = self._read_eeprom(0, 1, log_on_error=False)
|
||||
return eeprom_raw is not None
|
||||
|
||||
# Read out any bytes from any offset
|
||||
def _read_eeprom_specific_bytes(self, offset, num_bytes):
|
||||
if offset + num_bytes > SFP_VENDOR_PAGE_START:
|
||||
logger.log_error("Error mismatch between page size and bytes to read (offset: {} num_bytes: {}) ".format(offset, num_bytes))
|
||||
return None
|
||||
|
||||
eeprom_raw = []
|
||||
ethtool_cmd = "ethtool -m sfp{} hex on offset {} length {}".format(self.index, offset, num_bytes)
|
||||
try:
|
||||
output = subprocess.check_output(ethtool_cmd,
|
||||
shell=True,
|
||||
universal_newlines=True)
|
||||
output_lines = output.splitlines()
|
||||
first_line_raw = output_lines[0]
|
||||
if "Offset" in first_line_raw:
|
||||
for line in output_lines[2:]:
|
||||
line_split = line.split()
|
||||
eeprom_raw = eeprom_raw + line_split[1:]
|
||||
except subprocess.CalledProcessError as e:
|
||||
return None
|
||||
|
||||
eeprom_raw = list(map(lambda h: int(h, base=16), eeprom_raw))
|
||||
return bytearray(eeprom_raw)
|
||||
|
||||
# read eeprom specfic bytes beginning from offset with size as num_bytes
|
||||
def read_eeprom(self, offset, num_bytes):
|
||||
"""
|
||||
@ -379,43 +319,37 @@ class SFP(NvidiaSFPCommon):
|
||||
Returns:
|
||||
bytearray, if raw sequence of bytes are read correctly from the offset of size num_bytes
|
||||
None, if the read_eeprom fails
|
||||
Example:
|
||||
mlxreg -d /dev/mst/mt52100_pciconf0 --reg_name MCIA --indexes slot_index=0,module=1,device_address=148,page_number=0,i2c_device_address=0x50,size=16,bank_number=0 -g
|
||||
Sending access register...
|
||||
Field Name | Data
|
||||
===================================
|
||||
status | 0x00000000
|
||||
slot_index | 0x00000000
|
||||
module | 0x00000001
|
||||
l | 0x00000000
|
||||
device_address | 0x00000094
|
||||
page_number | 0x00000000
|
||||
i2c_device_address | 0x00000050
|
||||
size | 0x00000010
|
||||
bank_number | 0x00000000
|
||||
dword[0] | 0x43726564
|
||||
dword[1] | 0x6f202020
|
||||
dword[2] | 0x20202020
|
||||
dword[3] | 0x20202020
|
||||
dword[4] | 0x00000000
|
||||
dword[5] | 0x00000000
|
||||
....
|
||||
16 bytes to read from dword -> 0x437265646f2020202020202020202020 -> Credo
|
||||
"""
|
||||
# recalculate offset and page. Use 'ethtool' if there is no need to read vendor pages
|
||||
if offset < SFP_VENDOR_PAGE_START:
|
||||
return self._read_eeprom_specific_bytes(offset, num_bytes)
|
||||
else:
|
||||
page = (offset - SFP_PAGE_SIZE) // SFP_UPPER_PAGE_OFFSET + 1
|
||||
# calculate offset per page
|
||||
device_address = (offset - SFP_PAGE_SIZE) % SFP_UPPER_PAGE_OFFSET + SFP_UPPER_PAGE_OFFSET
|
||||
return self._read_eeprom(offset, num_bytes)
|
||||
|
||||
if not self.mst_pci_device:
|
||||
def _read_eeprom(self, offset, num_bytes, log_on_error=True):
|
||||
"""Read eeprom specfic bytes beginning from a random offset with size as num_bytes
|
||||
|
||||
Args:
|
||||
offset (int): read offset
|
||||
num_bytes (int): read size
|
||||
log_on_error (bool, optional): whether log error when exception occurs. Defaults to True.
|
||||
|
||||
Returns:
|
||||
bytearray: the content of EEPROM
|
||||
"""
|
||||
_, page, page_offset = self._get_page_and_page_offset(offset)
|
||||
if not page:
|
||||
return None
|
||||
|
||||
mlxreg_mngr = MlxregManager(self.mst_pci_device, self.slot_id, self.sdk_index)
|
||||
read_output = mlxreg_mngr.read_mlxred_eeprom(device_address, page, num_bytes)
|
||||
return mlxreg_mngr.parse_mlxreg_read_output(read_output, num_bytes)
|
||||
try:
|
||||
with open(page, mode='rb', buffering=0) as f:
|
||||
f.seek(page_offset)
|
||||
content = f.read(num_bytes)
|
||||
if ctypes.get_errno() != 0:
|
||||
raise IOError(f'errno = {os.strerror(ctypes.get_errno())}')
|
||||
except (OSError, IOError) as e:
|
||||
if log_on_error:
|
||||
logger.log_error(f'Failed to read sfp={self.sdk_index} EEPROM page={page}, page_offset={page_offset}, \
|
||||
size={num_bytes}, offset={offset}, error = {e}')
|
||||
return None
|
||||
|
||||
return bytearray(content)
|
||||
|
||||
# write eeprom specfic bytes beginning from offset with size as num_bytes
|
||||
def write_eeprom(self, offset, num_bytes, write_buffer):
|
||||
@ -431,21 +365,28 @@ class SFP(NvidiaSFPCommon):
|
||||
logger.log_error("Error mismatch between buffer length and number of bytes to be written")
|
||||
return False
|
||||
|
||||
# recalculate offset and page
|
||||
if offset < SFP_PAGE_SIZE:
|
||||
page = 0
|
||||
device_address = offset
|
||||
else:
|
||||
page = (offset - SFP_PAGE_SIZE) // SFP_UPPER_PAGE_OFFSET + 1
|
||||
# calculate offset per page
|
||||
device_address = (offset - SFP_PAGE_SIZE) % SFP_UPPER_PAGE_OFFSET + SFP_UPPER_PAGE_OFFSET
|
||||
|
||||
if not self.mst_pci_device:
|
||||
page_num, page, page_offset = self._get_page_and_page_offset(offset)
|
||||
if not page:
|
||||
return False
|
||||
|
||||
mlxreg_mngr = MlxregManager(self.mst_pci_device, self.slot_id, self.sdk_index)
|
||||
dword = mlxreg_mngr.construct_dword(write_buffer)
|
||||
return mlxreg_mngr.write_mlxreg_eeprom(num_bytes, dword, device_address, page)
|
||||
try:
|
||||
if self._is_write_protected(page_num, page_offset, num_bytes):
|
||||
# write limited eeprom is not supported
|
||||
raise IOError('write limited bytes')
|
||||
|
||||
with open(page, mode='r+b', buffering=0) as f:
|
||||
f.seek(page_offset)
|
||||
ret = f.write(write_buffer[0:num_bytes])
|
||||
if ret != num_bytes:
|
||||
raise IOError(f'write return code = {ret}')
|
||||
if ctypes.get_errno() != 0:
|
||||
raise IOError(f'errno = {os.strerror(ctypes.get_errno())}')
|
||||
except (OSError, IOError) as e:
|
||||
data = ''.join('{:02x}'.format(x) for x in write_buffer)
|
||||
logger.log_error(f'Failed to write EEPROM data sfp={self.sdk_index} EEPROM page={page}, page_offset={page_offset}, size={num_bytes}, \
|
||||
offset={offset}, data = {data}, error = {e}')
|
||||
return False
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def mgmt_phy_mod_pwr_attr_get(cls, power_attr_type, sdk_handle, sdk_index, slot_id):
|
||||
@ -755,6 +696,109 @@ class SFP(NvidiaSFPCommon):
|
||||
error_description = "Unknow SFP module status ({})".format(oper_status)
|
||||
return error_description
|
||||
|
||||
def _get_eeprom_path(self):
|
||||
return SFP_EEPROM_ROOT_TEMPLATE.format(self.sdk_index)
|
||||
|
||||
def _get_page_and_page_offset(self, overall_offset):
|
||||
"""Get EEPROM page and page offset according to overall offset
|
||||
|
||||
Args:
|
||||
overall_offset (int): Overall read offset
|
||||
|
||||
Returns:
|
||||
tuple: (<page_num>, <page_path>, <page_offset>)
|
||||
"""
|
||||
eeprom_path = self._get_eeprom_path()
|
||||
if not os.path.exists(eeprom_path):
|
||||
logger.log_error(f'EEPROM file path for sfp {self.sdk_index} does not exist')
|
||||
return None, None, None
|
||||
|
||||
if overall_offset < SFP_PAGE_SIZE:
|
||||
return 0, os.path.join(eeprom_path, SFP_PAGE0_PATH), overall_offset
|
||||
|
||||
if self._get_sfp_type_str(eeprom_path) == SFP_TYPE_SFF8472:
|
||||
page1h_start = SFP_PAGE_SIZE * 2
|
||||
if overall_offset < page1h_start:
|
||||
return -1, os.path.join(eeprom_path, SFP_A2H_PAGE0_PATH), overall_offset - SFP_PAGE_SIZE
|
||||
else:
|
||||
page1h_start = SFP_PAGE_SIZE
|
||||
|
||||
page_num = (overall_offset - page1h_start) // SFP_UPPER_PAGE_OFFSET + 1
|
||||
page = f'{page_num}/data'
|
||||
offset = (overall_offset - page1h_start) % SFP_UPPER_PAGE_OFFSET
|
||||
return page_num, os.path.join(eeprom_path, page), offset
|
||||
|
||||
def _get_sfp_type_str(self, eeprom_path):
|
||||
"""Get SFP type by reading first byte of EEPROM
|
||||
|
||||
Args:
|
||||
eeprom_path (str): EEPROM path
|
||||
|
||||
Returns:
|
||||
str: SFP type in string
|
||||
"""
|
||||
if self._sfp_type_str is None:
|
||||
page = os.path.join(eeprom_path, SFP_PAGE0_PATH)
|
||||
try:
|
||||
with open(page, mode='rb', buffering=0) as f:
|
||||
id_byte_raw = bytearray(f.read(1))
|
||||
id = id_byte_raw[0]
|
||||
if id == 0x18 or id == 0x19 or id == 0x1e:
|
||||
self._sfp_type_str = SFP_TYPE_CMIS
|
||||
elif id == 0x11 or id == 0x0D:
|
||||
# in sonic-platform-common, 0x0D is treated as sff8436,
|
||||
# but it shared the same implementation on Nvidia platforms,
|
||||
# so, we treat it as sff8636 here.
|
||||
self._sfp_type_str = SFP_TYPE_SFF8636
|
||||
elif id == 0x03:
|
||||
self._sfp_type_str = SFP_TYPE_SFF8472
|
||||
else:
|
||||
logger.log_error(f'Unsupported sfp type {id}')
|
||||
except (OSError, IOError) as e:
|
||||
# SFP_EEPROM_NOT_AVAILABLE usually indicates SFP is not present, no need
|
||||
# print such error information to log
|
||||
if SFP_EEPROM_NOT_AVAILABLE not in str(e):
|
||||
logger.log_error(f'Failed to get SFP type, index={self.sdk_index}, error={e}')
|
||||
return None
|
||||
return self._sfp_type_str
|
||||
|
||||
def _is_write_protected(self, page, page_offset, num_bytes):
|
||||
"""Check if the EEPROM read/write operation hit limitation bytes
|
||||
|
||||
Args:
|
||||
page (str): EEPROM page path
|
||||
page_offset (int): EEPROM page offset
|
||||
num_bytes (int): read/write size
|
||||
|
||||
Returns:
|
||||
bool: True if the limited bytes is hit
|
||||
"""
|
||||
eeprom_path = self._get_eeprom_path()
|
||||
limited_data = limited_eeprom.get(self._get_sfp_type_str(eeprom_path))
|
||||
if not limited_data:
|
||||
return False
|
||||
|
||||
access_type = 'write'
|
||||
limited_data = limited_data.get(access_type)
|
||||
if not limited_data:
|
||||
return False
|
||||
|
||||
limited_ranges = limited_data.get(page)
|
||||
if not limited_ranges:
|
||||
return False
|
||||
|
||||
access_begin = page_offset
|
||||
access_end = page_offset + num_bytes - 1
|
||||
for limited_range in limited_ranges:
|
||||
if isinstance(limited_range, int):
|
||||
if access_begin <= limited_range <= access_end:
|
||||
return True
|
||||
else: # tuple
|
||||
if not (access_end < limited_range[0] or access_begin > limited_range[1]):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_rx_los(self):
|
||||
"""Accessing rx los is not supproted, return all False
|
||||
|
||||
@ -782,7 +826,7 @@ class SFP(NvidiaSFPCommon):
|
||||
"""
|
||||
if self._xcvr_api is None:
|
||||
self.refresh_xcvr_api()
|
||||
if self._xcvr_api is not None:
|
||||
if self._xcvr_api is not None:
|
||||
self._xcvr_api.get_rx_los = self.get_rx_los
|
||||
self._xcvr_api.get_tx_fault = self.get_tx_fault
|
||||
return self._xcvr_api
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -14,7 +14,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
import ctypes
|
||||
import os
|
||||
import pytest
|
||||
import shutil
|
||||
import sys
|
||||
if sys.version_info.major == 3:
|
||||
from unittest import mock
|
||||
@ -27,8 +30,6 @@ sys.path.insert(0, modules_path)
|
||||
|
||||
from sonic_platform.sfp import SFP, SX_PORT_MODULE_STATUS_INITIALIZING, SX_PORT_MODULE_STATUS_PLUGGED, SX_PORT_MODULE_STATUS_UNPLUGGED, SX_PORT_MODULE_STATUS_PLUGGED_WITH_ERROR, SX_PORT_MODULE_STATUS_PLUGGED_DISABLED
|
||||
from sonic_platform.chassis import Chassis
|
||||
from sonic_platform.sfp import MlxregManager
|
||||
from tests.input_platform import output_sfp
|
||||
|
||||
|
||||
class TestSfp:
|
||||
@ -86,31 +87,61 @@ class TestSfp:
|
||||
|
||||
assert description == expected_description
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP.get_mst_pci_device', mock.MagicMock(return_value="pciconf"))
|
||||
@mock.patch('sonic_platform.sfp.MlxregManager.write_mlxreg_eeprom', mock.MagicMock(return_value=True))
|
||||
def test_sfp_write_eeprom(self):
|
||||
mlxreg_mngr = MlxregManager("", 0, 0)
|
||||
write_buffer = bytearray([1,2,3,4])
|
||||
offset = 793
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_page_and_page_offset')
|
||||
@mock.patch('sonic_platform.sfp.SFP._is_write_protected')
|
||||
def test_sfp_write_eeprom(self, mock_limited_eeprom, mock_get_page):
|
||||
sfp = SFP(0)
|
||||
sfp.write_eeprom(offset, 4, write_buffer)
|
||||
MlxregManager.write_mlxreg_eeprom.assert_called_with(4, output_sfp.write_eeprom_dword1, 153, 5)
|
||||
assert not sfp.write_eeprom(0, 1, bytearray())
|
||||
|
||||
offset = 641
|
||||
write_buffer = bytearray([1,2,3,4,5,6])
|
||||
sfp.write_eeprom(offset, 6, write_buffer)
|
||||
MlxregManager.write_mlxreg_eeprom.assert_called_with(6, output_sfp.write_eeprom_dword2, 129, 4)
|
||||
mock_get_page.return_value = (None, None, None)
|
||||
assert not sfp.write_eeprom(0, 1, bytearray([1]))
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP.get_mst_pci_device', mock.MagicMock(return_value="pciconf"))
|
||||
@mock.patch('sonic_platform.sfp.MlxregManager.read_mlxred_eeprom', mock.MagicMock(return_value=output_sfp.read_eeprom_output))
|
||||
def test_sfp_read_eeprom(self):
|
||||
mlxreg_mngr = MlxregManager("", 0, 0)
|
||||
offset = 644
|
||||
mock_get_page.return_value = (0, '/tmp/mock_page', 0)
|
||||
mock_limited_eeprom.return_value = True
|
||||
assert not sfp.write_eeprom(0, 1, bytearray([1]))
|
||||
|
||||
mock_limited_eeprom.return_value = False
|
||||
mo = mock.mock_open()
|
||||
print('after mock open')
|
||||
with mock.patch('sonic_platform.sfp.open', mo):
|
||||
handle = mo()
|
||||
handle.write.return_value = 1
|
||||
assert sfp.write_eeprom(0, 1, bytearray([1]))
|
||||
|
||||
handle.seek.assert_called_once_with(0)
|
||||
handle.write.assert_called_once_with(bytearray([1]))
|
||||
handle.write.return_value = -1
|
||||
assert not sfp.write_eeprom(0, 1, bytearray([1]))
|
||||
|
||||
handle.write.return_value = 1
|
||||
ctypes.set_errno(1)
|
||||
assert not sfp.write_eeprom(0, 1, bytearray([1]))
|
||||
ctypes.set_errno(0)
|
||||
|
||||
handle.write.side_effect = OSError('')
|
||||
assert not sfp.write_eeprom(0, 1, bytearray([1]))
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP.get_mst_pci_device', mock.MagicMock(return_value = None))
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_page_and_page_offset')
|
||||
def test_sfp_read_eeprom(self, mock_get_page):
|
||||
sfp = SFP(0)
|
||||
assert output_sfp.y_cable_part_number == sfp.read_eeprom(offset, 16).decode()
|
||||
MlxregManager.read_mlxred_eeprom.assert_called_with(132, 4, 16)
|
||||
mock_get_page.return_value = (None, None, None)
|
||||
assert sfp.read_eeprom(0, 1) is None
|
||||
|
||||
mock_get_page.return_value = (0, '/tmp/mock_page', 0)
|
||||
mo = mock.mock_open()
|
||||
with mock.patch('sonic_platform.sfp.open', mo):
|
||||
handle = mo()
|
||||
handle.read.return_value = b'\x00'
|
||||
assert sfp.read_eeprom(0, 1) == bytearray([0])
|
||||
handle.seek.assert_called_once_with(0)
|
||||
|
||||
ctypes.set_errno(1)
|
||||
assert sfp.read_eeprom(0, 1) is None
|
||||
ctypes.set_errno(0)
|
||||
|
||||
handle.read.side_effect = OSError('')
|
||||
assert sfp.read_eeprom(0, 1) is None
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP._fetch_port_status')
|
||||
def test_is_port_admin_status_up(self, mock_port_status):
|
||||
@ -120,6 +151,80 @@ class TestSfp:
|
||||
mock_port_status.return_value = (0, False)
|
||||
assert not SFP.is_port_admin_status_up(None, None)
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_eeprom_path', mock.MagicMock(return_value = None))
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_sfp_type_str')
|
||||
def test_is_write_protected(self, mock_get_type_str):
|
||||
sfp = SFP(0)
|
||||
mock_get_type_str.return_value = 'cmis'
|
||||
assert sfp._is_write_protected(page=0, page_offset=26, num_bytes=1)
|
||||
assert not sfp._is_write_protected(page=0, page_offset=27, num_bytes=1)
|
||||
|
||||
# not exist page
|
||||
assert not sfp._is_write_protected(page=3, page_offset=0, num_bytes=1)
|
||||
|
||||
# invalid sfp type str
|
||||
mock_get_type_str.return_value = 'invalid'
|
||||
assert not sfp._is_write_protected(page=0, page_offset=0, num_bytes=1)
|
||||
|
||||
def test_get_sfp_type_str(self):
|
||||
sfp = SFP(0)
|
||||
expect_sfp_types = ['cmis', 'sff8636', 'sff8472']
|
||||
mock_eeprom_path = '/tmp/mock_eeprom'
|
||||
mock_dir = '/tmp/mock_eeprom/0/i2c-0x50'
|
||||
os.makedirs(os.path.join(mock_dir), exist_ok=True)
|
||||
for expect_sfp_type in expect_sfp_types:
|
||||
source_eeprom_file = os.path.join(test_path, 'input_platform', expect_sfp_type + '_page0')
|
||||
shutil.copy(source_eeprom_file, os.path.join(mock_dir, 'data'))
|
||||
assert sfp._get_sfp_type_str(mock_eeprom_path) == expect_sfp_type
|
||||
sfp._sfp_type_str = None
|
||||
|
||||
os.system('rm -rf {}'.format(mock_eeprom_path))
|
||||
assert sfp._get_sfp_type_str('invalid') is None
|
||||
|
||||
@mock.patch('os.path.exists')
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_eeprom_path')
|
||||
@mock.patch('sonic_platform.sfp.SFP._get_sfp_type_str')
|
||||
def test_get_page_and_page_offset(self, mock_get_type_str, mock_eeprom_path, mock_path_exists):
|
||||
sfp = SFP(0)
|
||||
mock_path_exists.return_value = False
|
||||
page_num, page, page_offset = sfp._get_page_and_page_offset(0)
|
||||
assert page_num is None
|
||||
assert page is None
|
||||
assert page_offset is None
|
||||
|
||||
mock_path_exists.return_value = True
|
||||
mock_eeprom_path.return_value = '/tmp'
|
||||
page_num, page, page_offset = sfp._get_page_and_page_offset(255)
|
||||
assert page_num == 0
|
||||
assert page == '/tmp/0/i2c-0x50/data'
|
||||
assert page_offset is 255
|
||||
|
||||
mock_get_type_str.return_value = 'cmis'
|
||||
page_num, page, page_offset = sfp._get_page_and_page_offset(256)
|
||||
assert page_num == 1
|
||||
assert page == '/tmp/1/data'
|
||||
assert page_offset is 0
|
||||
|
||||
mock_get_type_str.return_value = 'sff8472'
|
||||
page_num, page, page_offset = sfp._get_page_and_page_offset(511)
|
||||
assert page_num == -1
|
||||
assert page == '/tmp/0/i2c-0x51/data'
|
||||
assert page_offset is 255
|
||||
|
||||
page_num, page, page_offset = sfp._get_page_and_page_offset(512)
|
||||
assert page_num == 1
|
||||
assert page == '/tmp/1/data'
|
||||
assert page_offset is 0
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP._read_eeprom')
|
||||
def test_get_presence(self, mock_read_eeprom):
|
||||
sfp = SFP(0)
|
||||
mock_read_eeprom.return_value = None
|
||||
assert not sfp.get_presence()
|
||||
|
||||
mock_read_eeprom.return_value = bytearray([1])
|
||||
assert sfp.get_presence()
|
||||
|
||||
@mock.patch('sonic_platform.sfp.SFP.get_xcvr_api')
|
||||
def test_dummy_apis(self, mock_get_xcvr_api):
|
||||
mock_api = mock.MagicMock()
|
||||
|
Loading…
Reference in New Issue
Block a user