sonic-buildimage/dockers/docker-pde/stress-sfp-i2c.py

155 lines
4.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python
try:
import sys
import time
import imp
from natsort import natsorted
except ImportError as e:
raise ImportError("%s - required module not found" % str(e))
HWSKU_PATH = "/usr/share/sonic/hwsku"
PLATFORM_PATH = "/usr/share/sonic/platform"
PLATFORM_SPECIFIC_MODULE_NAME = 'sfputil'
PLATFORM_SPECIFIC_CLASS_NAME = 'SfpUtil'
XCVR_EEPROM_TYPE_UNKNOWN = 0
XCVR_EEPROM_TYPE_SFP = 1
XCVR_EEPROM_TYPE_QSFP = 2
XCVR_EEPROM_TYPE_QSFPDD = 3
XCVR_EEPROM_TYPE_OSFP = XCVR_EEPROM_TYPE_QSFPDD
OSFP_TYPE_ID = "18"
# Global platform-specific sfputil class instance
platform_sfputil = None
# Global port dictionaries
port_dict = None
# Loads platform specific sfputil module from source
def load_platform_sfputil():
global platform_sfputil
if platform_sfputil is not None:
return
try:
module_file = "/".join([PLATFORM_PATH, "plugins", PLATFORM_SPECIFIC_MODULE_NAME + ".py"])
module = imp.load_source(PLATFORM_SPECIFIC_MODULE_NAME, module_file)
except IOError, e:
print("Failed to load platform module '%s': %s" % (PLATFORM_SPECIFIC_MODULE_NAME, str(e)), True)
assert module is not None
try:
platform_sfputil_class = getattr(module, PLATFORM_SPECIFIC_CLASS_NAME)
platform_sfputil = platform_sfputil_class()
except AttributeError, e:
print("Failed to instantiate '%s' class: %s" % (PLATFORM_SPECIFIC_CLASS_NAME, str(e)), True)
assert platform_sfputil is not None
return
# Loads platform port dictionaries
def load_platform_portdict():
global port_dict
if port_dict is not None:
return
port_dict = {}
idx = 0
file = open(HWSKU_PATH + "/port_config.ini", "r")
line = file.readline()
while line is not None and len(line) > 0:
line = line.strip()
if line.startswith("#"):
line = file.readline()
continue
list = line.split()
if len(list) >= 4:
idx = int(list[3])
port_dict[list[0]] = { "index": str(idx) }
idx += 1
line = file.readline()
return port_dict
def get_sfp_eeprom_type(port, data):
type = XCVR_EEPROM_TYPE_UNKNOWN
if (port in platform_sfputil.osfp_ports) or (port in platform_sfputil.qsfp_ports):
if data is None:
return type
if data[0] in OSFP_TYPE_ID:
code = 0
for i in range(128, 222):
code += int(data[i], 16)
if (code & 0xff) == int(data[222], 16):
type = XCVR_EEPROM_TYPE_OSFP
else:
type = XCVR_EEPROM_TYPE_QSFP
else:
type = XCVR_EEPROM_TYPE_QSFP
else:
type = XCVR_EEPROM_TYPE_SFP
return type
# Test for SFP EEPROM
def stress_sfp_i2c_one():
load_platform_sfputil()
load_platform_portdict()
num_sfp = 0
for intf in natsorted(port_dict.keys()):
port = int(port_dict[intf]['index'])
if not platform_sfputil._is_valid_port(port):
continue
if platform_sfputil.get_presence(port):
num_sfp += 1
assert num_sfp >= 2, "2 or more SFP modules should be attached for this test"
for intf in natsorted(port_dict.keys()):
port = int(port_dict[intf]['index'])
if not platform_sfputil._is_valid_port(port):
continue
if not platform_sfputil.get_presence(port):
continue
data = platform_sfputil.get_eeprom_raw(port, 256)
assert data is not None, "SFP{}: unable to read EEPROM".format(port)
code = 0
type = get_sfp_eeprom_type(port, data)
if type == XCVR_EEPROM_TYPE_QSFPDD:
for i in range(128, 222):
code += int(data[i], 16)
assert (code & 0xff) == int(data[222], 16), "{}: check code error".format(intf)
elif type == XCVR_EEPROM_TYPE_QSFP:
for i in range(128, 191):
code += int(data[i], 16)
assert (code & 0xff) == int(data[191], 16), "{}: check code error".format(intf)
else:
for i in range(0, 63):
code += int(data[i], 16)
assert (code & 0xff) == int(data[63], 16), "{}: check code error".format(intf)
def stress_sfp_i2c(sec=180):
print("Initiating {} seconds SFP I2C stress test...".format(sec))
timeout = time.time() + sec
while timeout >= time.time():
stress_sfp_i2c_one()
sys.stdout.write("#")
sys.stdout.flush()
print("\nPASS")
if __name__ == '__main__':
sec = 180
if len(sys.argv) >= 2:
sec = int(sys.argv[1])
stress_sfp_i2c(sec)