[sonic-pit] Add PIT(Platform Integration Test) feature, second part, … (#12530)

* [sonic-pit] Add PIT(Platform Integration Test) feature, second part, add 6 test cases.

Signed-off-by: Li Hua <guizhao.lh@alibaba-inc.com>

* Add missing test case configuration and platform configuration.

Signed-off-by: Li Hua <guizhao.lh@alibaba-inc.com>

* Remove unsed comment, replace duplicated function with import from other moduls.

---------

Signed-off-by: Li Hua <guizhao.lh@alibaba-inc.com>
This commit is contained in:
Clark Lee 2023-06-27 22:09:23 +08:00 committed by GitHub
parent 2047e5c6ed
commit 8b21b612ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1022 additions and 0 deletions

View File

@ -0,0 +1,6 @@
{
"name": "cpu-test",
"description": "Check CPU information",
"type": "auto",
"tags": ["manufacture", "delivery", "pa", "power", "emc"]
}

View File

@ -0,0 +1,6 @@
{
"name": "memory-test",
"description": "Check memory and pattern test",
"type": "auto",
"tags": ["manufacture", "delivery", "pa", "power", "emc"]
}

View File

@ -0,0 +1,6 @@
{
"name": "oob-test",
"description": "l2 mgmt switch test",
"type": "auto",
"tags": ["manufacture", "delivery", "pa"]
}

View File

@ -0,0 +1,6 @@
{
"name": "rtc-test",
"description": "Check RTC function",
"type": "auto",
"tags": ["manufacture", "delivery", "pa", "emc"]
}

View File

@ -0,0 +1,6 @@
{
"name": "sensor-test",
"description": "Check sensors health",
"type": "auto",
"tags": ["manufacture", "delivery", "pa", "power", "emc"]
}

View File

@ -0,0 +1,6 @@
{
"name": "ssd-test",
"description": "Check SSD capacity",
"type": "auto",
"tags": ["manufacture", "delivery", "pa", "emc", "power"]
}

View File

@ -0,0 +1,52 @@
{
"fan_info":{
"position": "bmc",
"count": 6,
"direction": "in",
"ratio_target": [10, 80, 20],
"speed_tolerance": 1000,
"speed_max": 20000,
"speed_min": 0,
"motor_count": 2
},
"psu_info":{
"position": "bmc",
"count": 2,
"in_power_min": 0,
"in_power_max": 0,
"in_vol_min": 0,
"in_vol_max": 0,
"in_curr_min": 0,
"in_curr_max": 0,
"out_power_min": 0,
"out_power_max": 0,
"out_vol_min": 0,
"out_vol_max": 0,
"out_curr_min": 0,
"out_curr_max": 0
},
"cpu_info": {
"Model name": "Intel(R) Xeon(R) CPU D-1533N @ 2.10GHz",
"BogoMIPS": 4189.0,
"CPU(s)": 6,
"CPU MHz": 2100.0
},
"memory_free_size": 100,
"rtc_info":{
"delay_time": 5,
"max_time_diff": 1
},
"ssd_test_size": "100M",
"ssd_bom": [
{
"model": "AF2MA31DTDLT240A",
"size": "240 GB"
},
{
"model": "MTFDDAV240TDS",
"size": "240 GB"
}
],
"server_ip": "192.0.0.3",
"bmc_ip": "240.1.1.1"
}

View File

@ -0,0 +1,10 @@
{
"test_cases": [
"fan_tc",
"psu_tc",
"cpu_tc",
"memory_tc",
"rtc_tc",
"sensor_tc"
]
}

View File

@ -0,0 +1,107 @@
from function import run_command
from test_case import TestCaseCommon
from errcode import E
import traceback
# CPU test class
class CPUTC(TestCaseCommon):
def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
MODULE_NAME = "cpu_tc"
TestCaseCommon.__init__(self, index, MODULE_NAME, logger, platform_cfg_file, case_cfg_file)
self.cpu_info_dict = None
try:
if self.platform_cfg_json and 'cpu_info' in self.platform_cfg_json.keys():
self.cpu_info_dict = self.platform_cfg_json['cpu_info']
except Exception as e:
self.logger.log_err(str(e), True)
self.logger.log_err(traceback.format_exc())
def test_cpu_info(self, also_print_console=False):
ret = E.OK
self.logger.log_info("check_cpu_info start", also_print_console)
cmd = "lscpu | head -n25"
status, log = run_command(cmd)
if status != 0 or len(log) <= 0:
reason = "Failed, get cpu info failed, command {}, status {}, log {}".format( \
cmd, status, log)
self.log_reason(reason)
ret = E.ECPU3005
else:
lines = log.splitlines()
expected_cpu_model = self.cpu_info_dict.get('Model name')
expected_bogomips = self.cpu_info_dict.get('BogoMIPS')
expected_cpu_num = self.cpu_info_dict.get('CPU(s)')
expected_cpu_mhz = self.cpu_info_dict.get('CPU MHz')
self.logger.log_dbg("Expected value: {}, {}, {}, {}".format(expected_cpu_model, \
expected_bogomips, expected_cpu_num, expected_cpu_mhz))
for line in lines:
cols = line.strip().split(":")
if len(cols) < 2:
continue
if expected_cpu_model and cols[0] == "Model name":
if cols[1].strip() != expected_cpu_model:
reason = "Failed, CPU model name {}(expected {})".format( \
cols[1].strip(), expected_cpu_model)
self.log_reason(reason)
ret = E.ECPU3001
else:
msg = "Model name {} =======> OK".format(cols[1].strip())
self.logger.log_info(msg)
if expected_bogomips and cols[0] == 'BogoMIPS':
read_bogomips = float(cols[1].strip())
conf_bogomips = float(expected_bogomips)
if read_bogomips <= (conf_bogomips * 0.99) or \
read_bogomips >= conf_bogomips * 1.01:
reason = "Failed, BogoMIPS {}(expected {})".format( \
read_bogomips, expected_bogomips)
self.log_reason(reason)
ret = E.ECPU3001
else:
msg = "BogoMIPS {} ===== OK".format(read_bogomips)
self.logger.log_info(msg)
if expected_cpu_num and cols[0] == 'CPU(s)':
num_cpus = int(cols[1].strip())
if num_cpus != self.cpu_info_dict.get('CPU(s)'):
reason = "Failed, CPU number {}(expected {})".format( \
num_cpus, expected_cpu_num)
self.fail_reason.append(reason)
ret = E.ECPU3001
else:
msg = "Number of CPUs {} ===== OK".format(num_cpus)
self.logger.log_info(msg)
if expected_cpu_mhz and cols[0] == 'CPU MHz':
read_cpu_mhz = float(cols[1].strip())
conf_cpu_mhz = float(expected_cpu_mhz)
if read_cpu_mhz <= (conf_cpu_mhz * 0.99) or \
read_cpu_mhz >= (conf_cpu_mhz * 1.01):
reason = "Failed, CPU MHz {}(expected {})".format( \
read_cpu_mhz, expected_cpu_mhz)
self.log_reason(reason)
ret = E.ECPU3001
else:
msg = "CPU frequency {} ===== OK".format(read_cpu_mhz)
self.logger.log_info(msg)
if ret != E.OK:
self.logger.log_err("test cpu info done, FAILED.", also_print_console)
else:
self.logger.log_info("test cpu info done, PASS.", also_print_console)
return ret
def run_test(self, *argv):
try:
ret = self.test_cpu_info(True)
return ret
except Exception as e:
self.logger.log_err("test cpu info got exception: {}".format(str(e)))
self.logger.log_err(traceback.format_exc())
return ret
return E.OK

View File

@ -0,0 +1,188 @@
from function import run_command
from test_case import TestCaseCommon
from errcode import E
import traceback
X86_ARCH_LIST = ["x86", "x86_64", "amd", "amd64"]
ARM_ARCH_LIST = ["arm", "arm64"]
# memory test class
class MEMORYTC(TestCaseCommon):
def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
MODULE_NAME = "memory_tc"
TestCaseCommon.__init__(self, index, MODULE_NAME, logger, platform_cfg_file, case_cfg_file)
self.arch = "x86" # default arch
self.memory_bom_list = None # default conf
self.free_mem_size = 100 # free memory size in kB. if free mem is less than free_mem_size, fail.
try:
if self.platform_cfg_json and 'memory_bom' in self.platform_cfg_json.keys():
self.memory_bom_list = self.platform_cfg_json['memory_bom']
if self.platform_cfg_json and 'arch' in self.platform_cfg_json.keys():
self.arch = self.platform_cfg_json['arch']
if self.platform_cfg_json and 'memory_free_size' in self.platform_cfg_json.keys():
self.free_mem_size = int(self.platform_cfg_json['memory_free_size'])
except Exception as e:
self.logger.log_err(str(e), True)
def _memory_info_check_by_dmidecode(self):
ret = E.OK
pn_list = []
vendor_list = []
status, out = run_command("dmidecode -t 17")
if status != 0 or len(out) <= 0:
self.fail_reason.append("dmidecode exec failed.")
ret = E.EMEM4001
else:
for item in out.splitlines():
self.logger.log_info(item)
if ":" in item:
key = item.split(":")[0].strip()
value = item.split(":")[1].strip()
if key == 'Part Number' and value != 'NO DIMM':
pn_list.append(value)
if key == 'Manufacturer' and value != 'NO DIMM':
vendor_list.append(value)
# memory bom check
if self.memory_bom_list:
memory_matched = False
for memory_bom in self.memory_bom_list:
if memory_bom["manufacturer"] in vendor_list and memory_bom["pn"] in pn_list:
memory_matched = True
break
if not memory_matched:
ret = E.EMEM4001
self.fail_reason.append("memory not matched")
return ret
def _arm_memory_ecc_check(self):
return E.OK
def _x86_memory_ecc_check(self):
status, out = run_command("edac-util -v")
self.logger.log_info(out)
if status:
self.fail_reason.append("memort ecc occured")
return E.EMEM4003
else:
return E.OK
def test_memory_info(self):
self.logger.log_info("test memory info start")
if self.arch in X86_ARCH_LIST:
ret = self._memory_info_check_by_dmidecode()
elif self.arch in ARM_ARCH_LIST:
ret = E.OK
if ret != E.OK:
self.logger.log_err("test memory info done, FAILED.")
else:
self.logger.log_err("test memory info done, PASS.")
return ret
def test_memory_capacity(self):
self.logger.log_info("test memory capacity start")
ret = E.OK
status, out = run_command("free -t")
self.logger.log_info(out)
if status:
self.log_reason("exec failed, cmd: free -t")
ret = E.EIO
else:
for line in out.splitlines():
if line.find("Total") >= 0:
free_mem = line.split()[3]
if int(free_mem) < self.free_mem_size:
self.log_reason("free memory less than {}kB".format(self.free_mem_size))
ret = E.EMEM4004
if ret != E.OK:
self.logger.log_err("test memory capacity done, FAILED.")
else:
self.logger.log_err("test memory capacity done, PASS.")
return ret
def test_memory_stress(self):
self.logger.log_info("test memory stress start")
ret = E.OK
status, out = run_command("memtester 1M 1")
self.logger.log_info(out)
if status:
reason = "exec failed, cmd: memtester 1M 1"
self.log_reason(reason)
ret = E.EMEM4002
if ret != E.OK:
self.logger.log_err("test memory stress done, FAILED!")
else:
self.logger.log_err("test memory stress done, PASS!")
return ret
def test_memory_ecc(self):
self.logger.log_info("test memory ecc start")
if self.arch in X86_ARCH_LIST:
ret = self._x86_memory_ecc_check()
elif self.arch in ARM_ARCH_LIST:
ret = self._arm_memory_ecc_check()
if ret != E.OK:
self.logger.log_err("test memory stress done, FAILED.")
else:
self.logger.log_err("test memory stress done, PASS.")
return ret
def run_test(self, *argv):
final_ret = E.OK
try:
ret = self.test_memory_info()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "test memory info exception {}, FAILED".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
final_ret = E.EFAIL
try:
ret = self.test_memory_capacity()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "test memory capacity exception {}, FAILED".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
final_ret = E.EFAIL
try:
ret = self.test_memory_stress()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "test memory by memtester exception {}, FAILED".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
final_ret = E.EFAIL
try:
ret = self.test_memory_ecc()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "test memory ecc exception {}, FAILED".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
final_ret = E.EFAIL
return final_ret

View File

@ -0,0 +1,140 @@
import sys
from test_case import TestCaseCommon
from errcode import E
from function import run_command, load_platform_util_module
import traceback
class OOBTC(TestCaseCommon):
__PLATFORM_SPECIFIC_MODULE_NAME = "bmcutil"
__PLATFORM_SPECIFIC_CLASS_NAME = "BmcUtil"
def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
MODULE_NAME = "oob_tc"
TestCaseCommon.__init__(self, index, MODULE_NAME, logger, platform_cfg_file, case_cfg_file)
self.server_ip = None # external ip
self.bmc_ip = None # BMC internal ip
try:
if self.platform_cfg_json and 'server_ip' in self.platform_cfg_json.keys():
self.server_ip = self.platform_cfg_json['server_ip']
if self.platform_cfg_json and 'bmc_ip' in self.platform_cfg_json.keys():
self.bmc_ip = self.platform_cfg_json['bmc_ip']
except Exception as e:
self.logger.log_err(str(e))
self.logger.log_err(traceback.format_exc())
bmc_module = load_platform_util_module(self.__PLATFORM_SPECIFIC_MODULE_NAME)
try:
bmc_util_class = getattr(bmc_module, self.__PLATFORM_SPECIFIC_CLASS_NAME)
self.bmc_util = bmc_util_class()
except AttributeError as e:
self.logger.log_err(str(e), True)
sys.exit(1)
def cpu_ping_bmc_test(self, also_print_console=True):
self.logger.log_info("cpu ping bmc test start")
ret = E.OK
if not self.bmc_ip:
self.logger.log_dbg("BMC not exist, skip")
else:
count = 5
cmd = "ping %s -c %d -I eth0.4088 | grep received" % (self.bmc_ip, count)
status, output = run_command(cmd)
self.logger.log_info(output, also_print_console)
if output.find(" 0% packet loss") > 0:
ret = E.OK
else:
self.log_reason("cpu ping bmc lost packages")
ret = E.EMGMT11003
if ret != E.OK:
self.logger.log_err("cpu ping bmc test done, FAILED.", also_print_console)
else:
self.logger.log_err("cpu ping bmc test done, PASS.", also_print_console)
return ret
def cpu_ping_server_test(self, also_print_console=True):
self.logger.log_info("cpu ping server test start")
ret = E.OK
if not self.server_ip:
self.logger.log_dbg("External server ip not set, skip")
else:
count = 5
ping_cmd = "ping %s -c %d -I eth0 | grep received" % (self.server_ip, count)
status, output = run_command(ping_cmd)
self.logger.log_info(output, also_print_console)
if output.find(" 0% packet loss") > 0:
ret = E.OK
else:
self.log_reason("cpu ping server lost packages")
ret = E.EMGMT11002
if ret != E.OK:
self.logger.log_err("FAIL!", also_print_console)
else:
self.logger.log_info("PASS.", also_print_console)
return ret
def bmc_ping_server_test(self, also_print_console=True):
self.logger.log_info("bmc ping server test start")
count = 5
ping_cmd = "ping %s -c %d -I eth0 | grep received" % (self.server_ip, count)
try:
status, output = self.bmc_util.exec_raw_cmd(ping_cmd)
self.logger.log_info(output[0], also_print_console)
if status and output[0].find(" 0% packet loss") > 0:
ret = E.OK
else:
self.log_reason.append("bmc ping server lost packages")
ret = E.EMGMT11004
except Exception as e:
self.log_reason("bmc ping server exception: {}".format(str(e)))
self.logger.log_err(traceback.format_exc())
ret = E.EMGMT11004
if ret != E.OK:
self.logger.log_err("FAIL!", also_print_console)
else:
self.logger.log_info("PASS.", also_print_console)
return ret
def run_test(self, *argv):
final_ret = E.OK
try:
ret = self.cpu_ping_bmc_test()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "cpu ping bmc test exception: {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
try:
ret = self.cpu_ping_server_test()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "cpu ping server test exception: {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
try:
ret = self.bmc_ping_server_test()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "bmc ping server test exception: {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
return ret

View File

@ -0,0 +1,144 @@
# -*- coding:utf-8
import time
from test_case import TestCaseCommon
from function import run_command
from errcode import E
import traceback
class RTCTC(TestCaseCommon):
def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
MODULE_NAME = "rtc_tc"
TestCaseCommon.__init__(self, index, MODULE_NAME, logger,
platform_cfg_file, case_cfg_file)
self.rtc_info_dict = None
try:
if self.platform_cfg_json and \
"rtc_info" in self.platform_cfg_json.keys():
self.rtc_info_dict = self.platform_cfg_json["rtc_info"]
except Exception as e:
self.logger.log_err(str(e), True)
def read_rtc_test(self, also_print_console=False):
ret = E.OK
self.logger.log_info("read_rtc_test start")
cmd = "hwclock -r"
code, out = run_command(cmd)
if code:
reason = "Failed to exec 'hwclock -r'"
ret = E.ERTC12001
self.log_reason(reason)
self.logger.log_dbg(out, also_print_console)
if ret != E.OK:
self.logger.log_info("read_rtc_test done, FAILED.")
else:
self.logger.log_info("read_rtc_test done, PASS.")
return ret
def rtc_precision_test(self, wait_time=5, also_print_console=False):
ret = E.OK
rtc_since_epoch_file = "/sys/class/rtc/rtc0/since_epoch"
self.logger.log_info("rtc_precision_test start")
try:
with open(rtc_since_epoch_file, "r") as f:
start_sec = int(f.read())
timeArraystart = time.localtime(start_sec)
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArraystart)
log_msg = "rtc time: {}".format(otherStyleTime)
self.logger.log_info(log_msg, also_print_console)
log_msg = "system time: {}".format(
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
self.logger.log_info(log_msg, also_print_console)
self.logger.log_info("time sleep: " + str(wait_time), also_print_console)
time.sleep(wait_time)
with open(rtc_since_epoch_file, "r") as f:
end_sec = int(f.read())
timeArrayend = time.localtime(end_sec)
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArrayend)
log_msg = "rtc time: {}".format(otherStyleTime)
self.logger.log_info(log_msg, also_print_console)
log_msg = "system time: {}".format(
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
self.logger.log_info(log_msg, also_print_console)
timeCompare = end_sec - start_sec
self.logger.log_info("time difference: " + str(timeCompare),
also_print_console)
if timeCompare < (wait_time - 1) or timeCompare > (wait_time + 1):
self.log_reason("{} beyond {}".format(timeCompare, wait_time))
ret = E.ERTC12002
except IOError as e:
self.fail_reason.append(str(e))
ret = E.ERTC12001
if ret != E.OK:
self.logger.log_info("rtc_precision_test done, FAILED.")
else:
self.logger.log_info("rtc_precision_test done, PASS.")
return ret
def rtc_functional_test(self, also_print_console=False):
self.logger.log_info("rtc_functional_test start")
current_secs_before = int(time.time())
delay_interval = self.rtc_info_dict["delay_time"]
self.logger.log_info(
"please waiting {} sec".format(delay_interval), also_print_console)
time.sleep(delay_interval)
time_end = time.strftime("%Y-%m-%d %H:%M:%S")
self.logger.log_info("current time: %s" % time_end, also_print_console)
current_secs_after = int(time.time())
delta_interval = current_secs_after - current_secs_before
if abs(delta_interval - delay_interval) > self.rtc_info_dict["max_time_diff"]:
self.log_reason("time out of sync")
ret = E.ERTC12002
else:
ret = E.OK
if ret != E.OK:
self.logger.log_err("rtc_functional_test FAILED")
else:
self.logger.log_info("rtc_functional_test PASS.")
return ret
def run_test(self, *argv):
# RTC functional test
final_result = E.OK
try:
ret = self.rtc_functional_test()
if ret != E.OK:
final_result = ret
except Exception as e:
reason = "Failed, {} rtc_functional_test exception: {}".format(self.get_tc_name(), str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
final_result = E.EFAIL
# RTC read test
try:
ret = self.read_rtc_test()
if ret != E.OK:
final_result = ret
except Exception as e:
reason = "Failed, {} read_rtc_test exception: {}".format(self.get_tc_name(), str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
final_result = E.EFAIL
# RTC precision test
try:
ret = self.rtc_precision_test(5)
if ret != E.OK:
final_result = ret
except Exception as e:
reason = "Failed, {} rtc_precision_test exception: {}".format(self.get_tc_name(), str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
final_result = E.EFAIL
return final_result

View File

@ -0,0 +1,121 @@
import sys
from tabulate import tabulate
from test_case import TestCaseCommon
from function import load_platform_util_module
from errcode import E
import traceback
class SENSORTC(TestCaseCommon):
__PLATFORM_SPECIFIC_MODULE_NAME = "sensorutil"
__PLATFORM_SPECIFIC_CLASS_NAME = "SensorUtil"
def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
MODULE_NAME = "sensor_tc"
self.sensor_util = None
TestCaseCommon.__init__(self, index, MODULE_NAME, logger, platform_cfg_file, case_cfg_file)
sensor_module = load_platform_util_module(self.__PLATFORM_SPECIFIC_MODULE_NAME)
try:
platform_util_class = getattr(sensor_module, self.__PLATFORM_SPECIFIC_CLASS_NAME)
self.sensor_util = platform_util_class()
except AttributeError as e:
self.logger.log_err(str(e), True)
sys.exit(1)
def load_sensor_info(self):
sensor_dict = {}
if self.sensor_util:
sensor_dict = self.sensor_util.get_all()
return sensor_dict
def sensor_verify(self, sensor_dict):
self.logger.log_info("sensor verify start")
if not sensor_dict:
self.log_reason("get sensors failed!")
return E.EFAIL
ret = E.OK
header = ["Sensor", 'InputName', 'Status', 'Value', 'LowThd', 'HighThd']
status_table = []
try:
for sensor_name, sensor_obj in sensor_dict.items():
if sensor_name == 'Number':
continue
if not isinstance(sensor_obj, dict):
continue
si_names = [k for k in sensor_obj.keys()]
si_names.sort()
for si_name in si_names:
si = sensor_obj[si_name]
sval = si.get('Value')
slow = si.get('LowThd')
shigh = si.get("HighThd")
sunit = si.get('Unit')
sdesc = si.get('Description')
fault = False
if not sunit:
sunit = ""
stype = si.get('Type')
if stype:
type2unit = {"amp": "A", "voltage": "V", "power": "W",
"temperature": "C", "RPM": "RPM"}
if stype in type2unit:
sunit = type2unit[stype]
try:
sval = float(sval)
except:
sval = 0.0
fault = True
try:
slow = float(slow)
except:
slow = 0.0
fault = True
try:
shigh = float(shigh)
except:
shigh = 0.0
fault = True
status = 'NOT_OK'
if fault == False and sval >= slow and sval <= shigh:
status = 'OK'
else:
ret = E.ESSR7003
self.log_reason("{} out of threshold".format(si_name))
status_table.append([sensor_name, si_name, status, "{} {}".format(sval, sunit), \
"{} {}".format(slow, sunit), "{} {}".format(shigh, sunit)])
except Exception as e:
reason = "sensor verify got exception: {}".format(str(e))
self.fail_reason.append(str(e))
self.logger.log_err(traceback.format_exc())
ret = E.ESSR7002
if len(status_table) > 0:
status_table.sort()
self.logger.log_info(tabulate(status_table, header, tablefmt="simple"))
if ret != E.OK:
self.logger.log_err("sensor verify done, FAILED.")
else:
self.logger.log_info("sensor verify done, PASS.")
return ret
def run_test(self, *argv):
try:
sensor_dict = self.load_sensor_info()
ret = self.sensor_verify(sensor_dict)
except Exception as e:
reason = "load_sensor/sensor_verify got exception: {}".format(str(e))
self.log_reason(reason)
return ret

View File

@ -0,0 +1,224 @@
import os
import re
import subprocess
from test_case import TestCaseCommon
from errcode import *
from function import run_command
import traceback
class SSDTC(TestCaseCommon):
def __init__(self, index, logger, platform_cfg_file, case_cfg_file=None):
MODULE_NAME = "ssd_tc"
TestCaseCommon.__init__(self, index, MODULE_NAME, logger, platform_cfg_file, case_cfg_file)
self.test_size = 1 # unit: MBytes, default
self.ssd_bom_list = None # default
try:
if self.platform_cfg_json and 'ssd_test_size' in self.platform_cfg_json.keys():
size = self.platform_cfg_json['ssd_test_size']
if size.endswith("m") or size.endswith("M"):
self.test_size = int(size.strip("mM"))
else:
self.test_size = int(size)
if self.platform_cfg_json and 'ssd_bom' in self.platform_cfg_json.keys():
self.ssd_bom_list = self.platform_cfg_json['ssd_bom']
except Exception as e:
self.logger.log_err(str(e))
def search_dir_by_name(self, name, dir):
result = []
try:
files = os.listdir(dir)
for file in files:
if name in file:
result.append(os.path.join(dir, file))
except Exception as e:
pass
return result
def get_ssd_location(self):
ret = NO_ERR
dir = "/sys/block/"
spect = "sd"
ssdpath = []
result = self.search_dir_by_name(spect, dir)
if len(result) <= 0:
ret = ABSENT_ERR
else:
for item in result:
with open(os.path.join(item, "removable"), 'r') as fd:
value = fd.read()
if value.strip() == "0": # found ssd
ssd_disk = "/dev/" + os.path.basename(item)
ssdpath.append(ssd_disk)
if not ssdpath: # not found ssd
self.logger.log_err("no ssd found")
ret = ABSENT_ERR
if ret:
self.log_reason("ssd not found!")
return ret, ssdpath
def test_ssd_info(self, ssdpath):
ret = E.OK
ssd = {}
self.logger.log_info("test ssd info start")
for path in ssdpath:
status, out = run_command("smartctl -i {}".format(path))
self.logger.log_info(out)
if status:
err = "Read ssd {} info failed!".format(path)
self.log_reason(err)
ret = E.ESSD2001
else:
if self.ssd_bom_list:
matched = False
model_match = False
size_match = False
lines = out.splitlines()
for ssd_bom in self.ssd_bom_list:
expected_model = ssd_bom["model"]
expected_size = ssd_bom["size"]
for line in lines:
if line.startswith("Device Model:"):
if line.find(expected_model) != -1:
model_match = True
continue
elif line.startswith("User Capacity"):
if line.find(expected_size) != -1:
size_match = True
continue
if model_match and size_match:
matched = True
break
# Does not match any of expected BOM
if not matched:
ret = E.ESSD2001
self.fail_reason.append("SSD model/size not match")
if ret != E.OK:
self.logger.log_err("test ssd info done, FAILED.")
else:
self.logger.log_err("test ssd info done, PASS.")
return ret
def ssd_health_check(self, ssdpath):
"""
SSD SMART overall-health self-assessment test
"""
ret = E.OK
self.logger.log_info("ssd health check start")
for path in ssdpath:
status, out = run_command("smartctl -H {} | grep result".format(path))
self.logger.log_info(out)
if out.find("PASSED") == -1:
reason = "ssd {} health check failed!".format(path)
ret = E.ESSD2004
self.log_reason(err)
if ret != E.OK:
self.logger.log_err("ssd health check done, FAILED.")
else:
self.logger.log_err("ssd health check done, PASS.")
return ret
def ssd_read_test(self):
self.logger.log_info("ssd read test start")
bs_count = self.test_size * 64
cmd = "dd if=/dev/sda of=/dev/null bs=16k count=%d iflag=direct,nonblock" % bs_count
self.logger.log_dbg(cmd)
status, out = run_command(cmd)
if status:
err = "[{}] read test failed!".format(self.module_name)
self.log_reason.append(err)
else:
self.logger.log_info(out)
if status:
self.logger.log_err("ssd read test done, FAILED.")
ret = E.ESSD2002
else:
self.logger.log_info("ssd read test done, PASS.")
ret = E.OK
return ret
def ssd_write_test(self):
self.logger.log_info("ssd write test start")
bs_count = self.test_size * 64
cmd = "dd if=/dev/urandom of=/tmp/txtfile_ssd bs=16k count=%d oflag=direct,nonblock" % bs_count
self.logger.log_info(cmd)
status, out = run_command(cmd)
if status:
err = "[{}] write test failed!".format(self.module_name)
self.log_reason.append(err)
else:
self.logger.log_info(out)
os.remove("/tmp/txtfile_ssd")
if status:
self.logger.log_err("ssd write test done, FAILED.")
ret = E.ESSD2003
else:
self.logger.log_info("ssd write test done, PASS.")
ret = E.OK
return ret
def run_test(self, *argv):
final_ret = E.OK
try:
status, ssdpath = self.get_ssd_location()
if status:
final_ret = E.ESSD2001
except Exception as e:
reason = "get ssd location exception {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
try:
ret = self.test_ssd_info(ssdpath)
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "ssd info check exception {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
try:
ret = self.ssd_health_check(ssdpath)
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "ssd health check exception {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
try:
ret = self.ssd_read_test()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "ssd read test exception {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
try:
ret = self.ssd_write_test()
if ret != E.OK:
final_ret = ret
except Exception as e:
reason = "ssd write test exception {}".format(str(e))
self.log_reason(reason)
self.logger.log_err(traceback.format_exc())
return final_ret