sonic-buildimage/platform/broadcom/sonic-platform-modules-ruijie/common/script/ruijieutil.py

1828 lines
58 KiB
Python
Raw Normal View History

# -*- coding: UTF-8 -*-
#-------------------------------------------------------------------------
# Name: ruijieutil
# Purpose: common configuration and api
#
# Author: rd
#
# Created: 02/07/2018
# Copyright: (c) rd 2018
#-------------------------------------------------------------------------
import sys
import os
import re
import syslog
import time
import binascii
import tty
import termios
import threading
import click
import mmap
from sonic_py_common.general import getstatusoutput_noshell, getstatusoutput_noshell_pipe
from ruijieconfig import rg_eeprom, FRULISTS, MAC_DEFAULT_PARAM, MAC_AVS_PARAM, FANS_DEF, \
FAN_PROTECT, E2_LOC, E2_PROTECT, RUIJIE_SERVICE_TAG, RUIJIE_DIAG_VERSION, \
STARTMODULE, RUIJIE_CARDID, RUIJIE_PRODUCTNAME, RUIJIE_PART_NUMBER, \
RUIJIE_LABEL_REVISION, RUIJIE_MAC_SIZE, RUIJIE_MANUF_NAME, RUIJIE_MANUF_COUNTRY, \
RUIJIE_VENDOR_NAME, MAILBOX_DIR
try:
from eepromutil.fru import ipmifru
except Exception or SystemExit:
pass
import logging.handlers
import shutil
import gzip
import glob
__all__ = ["MENUID", "MENUPARENT", "MENUVALUE", "CHILDID", "MENUITEMNAME", "MENUITEMDEAL",
"GOBACK", "GOQUIT", "file_name", "CRITICAL", "FATAL", "ERROR", "WARNING", "WARN",
"INFO", "DEBUG", "NOTSET", "levelNames", "TLV_INFO_ID_STRING", "TLV_INFO_VERSION",
"TLV_INFO_LENGTH", "TLV_INFO_LENGTH_VALUE", "TLV_CODE_PRODUCT_NAME",
"TLV_CODE_PART_NUMBER", "TLV_CODE_SERIAL_NUMBER", "TLV_CODE_MAC_BASE",
"TLV_CODE_MANUF_DATE", "TLV_CODE_DEVICE_VERSION", "TLV_CODE_LABEL_REVISION",
"TLV_CODE_PLATFORM_NAME", "TLV_CODE_ONIE_VERSION", "TLV_CODE_MAC_SIZE",
"TLV_CODE_MANUF_NAME", "TLV_CODE_MANUF_COUNTRY", "TLV_CODE_VENDOR_NAME",
"TLV_CODE_DIAG_VERSION", "TLV_CODE_SERVICE_TAG", "TLV_CODE_VENDOR_EXT",
"TLV_CODE_CRC_32", "_TLV_DISPLAY_VENDOR_EXT", "TLV_CODE_RJ_CARID",
"_TLV_INFO_HDR_LEN", "SYSLOG_IDENTIFIER", "log_info", "log_debug", "log_warning",
"log_error", "CompressedRotatingFileHandler", "SETMACException", "checkinput",
"checkinputproduct", "getInputSetmac", "fan_tlv", "AVSUTIL", "I2CUTIL", "BMC",
"getSdkReg", "getfilevalue", "get_sysfs_value", "write_sysfs_value",
"RJPRINTERR", "strtoint", "inttostr", "str_to_hex", "hex_to_str", "str_to_bin",
"bin_to_str", "getMacTemp", "getMacTemp_sysfs", "restartDockerService",
"waitForDhcp", "waitForSdk", "waitForDocker", "getTLV_BODY", "_crc32",
"printvalue", "generate_value", "getsyseeprombyId", "fac_init_cardidcheck",
"isValidMac", "util_setmac", "getInputCheck", "getrawch", "upper_input",
"changeTypeValue", "astrcmp", "generate_ext", "rji2cget", "rji2cset", "rjpcird",
"rjpciwr", "rjsysset", "rji2cgetWord", "rji2csetWord", "fan_setmac", "checkfansninput",
"checkfanhwinput", "util_show_fanse2", "get_fane2_sysfs", "util_show_fane2", "getPid",
"fac_fans_setmac_tlv", "fac_fan_setmac_fru", "fac_fans_setmac", "fac_fan_setmac",
"writeToEEprom", "get_local_eth0_mac", "getonieversion", "createbmcMac",
"fac_board_setmac", "ipmi_set_mac", "getInputValue", "bmc_setmac", "closeProtocol",
"checkSdkMem", "getch", "get_raw_input", "getsysvalue", "get_pmc_register", "decoder",
"decode_eeprom", "get_sys_eeprom", "getCardId", "getsysmeminfo", "getsysmeminfo_detail",
"getDmiSysByType", "gethwsys", "getsysbios", "searchDirByName", "getUsbLocation",
"getusbinfo", "get_cpu_info", "get_version_config_info", "io_rd", "io_wr"]
MENUID = "menuid"
MENUPARENT = "parentid"
MENUVALUE = "value"
CHILDID = "childid"
MENUITEMNAME = "name"
MENUITEMDEAL = "deal"
GOBACK = "goBack"
GOQUIT = "quit"
file_name = "/etc/init.d/opennsl-modules-3.16.0-5-amd64"
##########################################################################
# ERROR LOG LEVEL
##########################################################################
CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
NOTSET = 0
levelNames = {
CRITICAL: 'CRITICAL',
ERROR: 'ERROR',
WARNING: 'WARNING',
INFO: 'INFO',
DEBUG: 'DEBUG',
NOTSET: 'NOTSET',
'CRITICAL': CRITICAL,
'ERROR': ERROR,
'WARN': WARNING,
'WARNING': WARNING,
'INFO': INFO,
'DEBUG': DEBUG,
'NOTSET': NOTSET,
}
TLV_INFO_ID_STRING = "TlvInfo\x00"
TLV_INFO_VERSION = 0x01
TLV_INFO_LENGTH = 0x00
TLV_INFO_LENGTH_VALUE = 0xba
##########################################################################
# eeprom info
##########################################################################
TLV_CODE_PRODUCT_NAME = 0x21
TLV_CODE_PART_NUMBER = 0x22
TLV_CODE_SERIAL_NUMBER = 0x23
TLV_CODE_MAC_BASE = 0x24
TLV_CODE_MANUF_DATE = 0x25
TLV_CODE_DEVICE_VERSION = 0x26
TLV_CODE_LABEL_REVISION = 0x27
TLV_CODE_PLATFORM_NAME = 0x28
TLV_CODE_ONIE_VERSION = 0x29
TLV_CODE_MAC_SIZE = 0x2A
TLV_CODE_MANUF_NAME = 0x2B
TLV_CODE_MANUF_COUNTRY = 0x2C
TLV_CODE_VENDOR_NAME = 0x2D
TLV_CODE_DIAG_VERSION = 0x2E
TLV_CODE_SERVICE_TAG = 0x2F
TLV_CODE_VENDOR_EXT = 0xFD
TLV_CODE_CRC_32 = 0xFE
_TLV_DISPLAY_VENDOR_EXT = 1
TLV_CODE_RJ_CARID = 0x01
_TLV_INFO_HDR_LEN = 11
SYSLOG_IDENTIFIER = "UTILTOOL"
# ========================== Syslog wrappers ==========================
def log_info(msg, also_print_to_console=False):
syslog.openlog(SYSLOG_IDENTIFIER)
syslog.syslog(syslog.LOG_INFO, msg)
syslog.closelog()
if also_print_to_console:
click.echo(msg)
def log_debug(msg, also_print_to_console=False):
try:
syslog.openlog(SYSLOG_IDENTIFIER)
syslog.syslog(syslog.LOG_DEBUG, msg)
syslog.closelog()
if also_print_to_console:
click.echo(msg)
except Exception as e:
pass
def log_warning(msg, also_print_to_console=False):
syslog.openlog(SYSLOG_IDENTIFIER)
syslog.syslog(syslog.LOG_WARNING, msg)
syslog.closelog()
if also_print_to_console:
click.echo(msg)
def log_error(msg, also_print_to_console=False):
syslog.openlog(SYSLOG_IDENTIFIER)
syslog.syslog(syslog.LOG_ERR, msg)
syslog.closelog()
if also_print_to_console:
click.echo(msg)
class CompressedRotatingFileHandler(logging.handlers.RotatingFileHandler):
def doRollover(self):
"""
Do a rollover, as described in __init__().
"""
if self.stream:
self.stream.close()
self.stream = None
if self.backupCount > 0:
for i in range(self.backupCount - 1, 0, -1):
sfn = "%s.%d.gz" % (self.baseFilename, i)
dfn = "%s.%d.gz" % (self.baseFilename, i + 1)
if os.path.exists(sfn):
if os.path.exists(dfn):
os.remove(dfn)
os.rename(sfn, dfn)
dfn = self.baseFilename + ".1.gz"
if os.path.exists(dfn):
os.remove(dfn)
# These two lines below are the only new lines. I commented out the os.rename(self.baseFilename, dfn) and
# replaced it with these two lines.
with open(self.baseFilename, 'rb') as f_in, gzip.open(dfn, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
self.mode = 'w'
self.stream = self._open()
class SETMACException(Exception):
def __init__(self, param='ERROR', errno="-1"):
err = "Setmac fail[%s]: %s" % (errno, param)
Exception.__init__(self, err)
self.param = param
self.errno = errno
def checkinput(b):
if b.isdigit() == False:
raise Exception("Ivalid Number")
if int(b) > 0xff or int(b) < 0:
raise Exception("Out of area")
def checkinputproduct(b):
if b.isalnum() ==False:
raise Exception("Invalid string")
def getInputSetmac(val):
bia = val.boardInfoArea
pia = val.productInfoArea
if bia != None:
a = raw_input("[Board Card]Product Serial Number:")
if len(a) != 13:
raise Exception("Invalid Serial Number length")
checkinputproduct(a)
bia.boardSerialNumber = a
b = raw_input("[Board Card]Product Version:(from 1-255)")
checkinput(b)
b = "%0x" % int(b)
bia.boardextra1 = b.upper()
if pia != None:
a = raw_input("[Product Area]Product Serial Number:")
if len(a) != 13:
raise Exception("Invalid Serial Number")
checkinputproduct(a)
pia.productSerialNumber = a
b = raw_input("[Product Area]Product Version:(from 1-255)")
checkinput(b)
b = "%0x" % int(b)
pia.productVersion = b.upper()
return val
class fan_tlv(object):
VERSION = 0x01 # E2PROM Versionstart from 0x01
FLAG = 0x7E # New E2PROM version flag is 0x7E
HW_VER = 0X01 # compose by master version and fixed version
TYPE = 0xf1 # hw type defination
TLV_LEN = 00 # data length (16bit)
_FAN_TLV_HDR_LEN = 6
_FAN_TLV_CRC_LEN = 2
_FAN_TLV_TYPE_NAME = 0x02
_FAN_TLV_TYPE_SN = 0x03
_FAN_TLV_TYPE_HW_INFO = 0x05
_FAN_TLV_TYPE_DEV_TYPE = 0x06
_fandecodetime = 0
@property
def dstatus(self):
return self._dstatus
@property
def typename(self):
return self._typename
@property
def typesn(self):
return self._typesn
@property
def typehwinfo(self):
return self._typehwinfo
@property
def typedevtype(self):
return self._typedevtype
@property
def fanbus(self):
return self._fanbus
@property
def fanloc(self):
return self._fanloc
@property
def fandecodetime(self):
return self._fandecodetime
def __init__(self):
self._typename = ""
self._typesn = ""
self._typehwinfo = ""
self._typedevtype = ""
self._dstatus = 0
def strtoarr(self, str):
s = []
if str is not None:
for index in range(len(str)):
s.append(str[index])
return s
def generate_fan_value(self):
bin_buffer = [chr(0xff)] * 256
bin_buffer[0] = chr(self.VERSION)
bin_buffer[1] = chr(self.FLAG)
bin_buffer[2] = chr(self.HW_VER)
bin_buffer[3] = chr(self.TYPE)
temp_t = "%08x" % self.typedevtype # handle devtype first
typedevtype_t = hex_to_str(temp_t)
total_len = len(self.typename) + len(self.typesn) + \
len(self.typehwinfo) + len(typedevtype_t) + 8
bin_buffer[4] = chr(total_len >> 8)
bin_buffer[5] = chr(total_len & 0x00FF)
index_start = 6
bin_buffer[index_start] = chr(self._FAN_TLV_TYPE_NAME)
bin_buffer[index_start + 1] = chr(len(self.typename))
bin_buffer[index_start + 2: index_start + 2 +
len(self.typename)] = self.strtoarr(self.typename)
index_start = index_start + 2 + len(self.typename)
bin_buffer[index_start] = chr(self._FAN_TLV_TYPE_SN)
bin_buffer[index_start + 1] = chr(len(self.typesn))
bin_buffer[index_start + 2:index_start + 2 +
len(self.typesn)] = self.strtoarr(self.typesn)
index_start = index_start + 2 + len(self.typesn)
bin_buffer[index_start] = chr(self._FAN_TLV_TYPE_HW_INFO)
bin_buffer[index_start + 1] = chr(len(self.typehwinfo))
bin_buffer[index_start + 2:index_start + 2 +
len(self.typehwinfo)] = self.strtoarr(self.typehwinfo)
index_start = index_start + 2 + len(self.typehwinfo)
bin_buffer[index_start] = chr(self._FAN_TLV_TYPE_DEV_TYPE)
bin_buffer[index_start + 1] = chr(len(typedevtype_t))
bin_buffer[index_start + 2:index_start + 2 +
len(typedevtype_t)] = self.strtoarr(typedevtype_t)
index_start = index_start + 2 + len(typedevtype_t)
crcs = fan_tlv.fancrc(''.join(bin_buffer[0:index_start])) # check 2bytes
bin_buffer[index_start] = chr(crcs >> 8)
bin_buffer[index_start + 1] = chr(crcs & 0x00ff)
return bin_buffer
def decode(self, e2):
ret = []
self.VERSION = ord(e2[0])
self.FLAG = ord(e2[1])
self.HW_VER = ord(e2[2])
self.TYPE = ord(e2[3])
self.TLV_LEN = (ord(e2[4]) << 8) | ord(e2[5])
tlv_index = self._FAN_TLV_HDR_LEN
tlv_end = self._FAN_TLV_HDR_LEN + self.TLV_LEN
# check checksum
if len(e2) < self._FAN_TLV_HDR_LEN + self.TLV_LEN + 2:
self._dstatus = -2
return ret
sumcrc = fan_tlv.fancrc(e2[0:self._FAN_TLV_HDR_LEN + self.TLV_LEN])
readcrc = ord(e2[self._FAN_TLV_HDR_LEN + self.TLV_LEN]
) << 8 | ord(e2[self._FAN_TLV_HDR_LEN + self.TLV_LEN + 1])
if sumcrc != readcrc:
self._dstatus = -1
return ret
else:
self._dstatus = 0
while (tlv_index + 2) < len(e2) and tlv_index < tlv_end:
s = self.decoder(
e2[tlv_index:tlv_index + 2 + ord(e2[tlv_index + 1])])
tlv_index += ord(e2[tlv_index + 1]) + 2
ret.append(s)
return ret
@staticmethod
def fancrc(t):
sum = 0
for index in range(len(t)):
sum += ord(t[index])
return sum
def decoder(self, t):
try:
name = ""
value = ""
if ord(t[0]) == self._FAN_TLV_TYPE_NAME:
name = "Product Name"
value = str(t[2:2 + ord(t[1])])
self._typename = value
elif ord(t[0]) == self._FAN_TLV_TYPE_SN:
name = "serial Number"
value = str(t[2:2 + ord(t[1])])
self._typesn = value
elif ord(t[0]) == self._FAN_TLV_TYPE_HW_INFO:
name = "hardware info"
value = str(t[2:2 + ord(t[1])])
self._typehwinfo = value
elif ord(t[0]) == self._FAN_TLV_TYPE_DEV_TYPE:
name = "dev type"
value = str(t[2:2 + ord(t[1])])
value = str_to_hex(value)
self._typedevtype = value
value = "0x08%x" % value
except Exception as e:
print(e)
return {"name": name, "code": ord(t[0]), "value": value}
def __str__(self):
formatstr = "VERSION : 0x%02x \n" \
" FLAG : 0x%02x \n" \
" HW_VER : 0x%02x \n" \
" TYPE : 0x%02x \n" \
"typename : %s \n" \
"typesn : %s \n" \
"typehwinfo : %s \n"
return formatstr % (self.VERSION, self.FLAG, self.HW_VER, self.TYPE, self.typename, self.typesn, self.typehwinfo)
class AVSUTIL():
@staticmethod
def mac_avs_chip(bus, devno, loc, open, close, loop, protectaddr, level, loopaddr):
# disable protection
rji2cset(bus, devno, protectaddr, open)
rji2cset(bus, devno, loopaddr, loop)
rji2csetWord(bus, devno, loc, level)
ret, value = rji2cgetWord(bus, devno, loc)
if strtoint(value) == level:
ret = 0
# enable protection
rji2cset(bus, devno, protectaddr, close)
if ret == 0:
return True
return False
@staticmethod
def macPressure_adj(macavs, avs_param, mac_def_param):
# check whether it within range
max_adj = max(avs_param.keys())
min_adj = min(avs_param.keys())
type = mac_def_param["type"]
level = 0
if type == 0:
if macavs not in range(min_adj, max_adj + 1):
return False
else:
level = macavs
else:
if macavs not in range(min_adj, max_adj + 1):
level = mac_def_param["default"]
else:
level = macavs
ret = AVSUTIL.mac_avs_chip(mac_def_param["bus"], mac_def_param["devno"], mac_def_param["addr"], mac_def_param["open"],
mac_def_param["close"], mac_def_param["loop"], mac_def_param["protectaddr"], avs_param[level], mac_def_param["loopaddr"])
return ret
@staticmethod
def mac_adj():
macavs = 0
name = MAC_DEFAULT_PARAM["sdkreg"]
ret, status = getSdkReg(name)
if ret == False:
return False
status = strtoint(status)
# shift operation
if MAC_DEFAULT_PARAM["sdktype"] != 0:
status = (
status >> MAC_DEFAULT_PARAM["macregloc"]) & MAC_DEFAULT_PARAM["mask"]
macavs = status
ret = AVSUTIL.macPressure_adj(macavs, MAC_AVS_PARAM, MAC_DEFAULT_PARAM)
return ret
class I2CUTIL():
@staticmethod
def getvaluefromdevice(name):
ret = []
for item in DEVICE:
if item["name"] == name:
ret.append(item)
return ret
@staticmethod
def openFanE2Protect():
rji2cset(FAN_PROTECT["bus"], FAN_PROTECT["devno"],
FAN_PROTECT["addr"], FAN_PROTECT["open"])
@staticmethod
def closeFanE2Protect():
rji2cset(FAN_PROTECT["bus"], FAN_PROTECT["devno"],
FAN_PROTECT["addr"], FAN_PROTECT["close"])
@staticmethod
def writeToFanE2(bus, loc, rst_arr):
index = 0
for item in rst_arr:
rji2cset(bus, loc, index, ord(item))
index += 1
@staticmethod
def writeToE2(bus, loc, rst_arr):
index = 0
for item in rst_arr:
rji2cset(bus, loc, index, ord(item))
index += 1
@staticmethod
def getE2File(bus, loc):
return "/sys/bus/i2c/devices/%d-00%02x/eeprom" % (bus, loc)
class BMC():
_instance_lock = threading.Lock()
def __init__(self):
pass
def __new__(cls, *args, **kwargs):
if not hasattr(Singleton, "_instance"):
with Singleton._instance_lock:
if not hasattr(Singleton, "_instance"):
Singleton._instance = object.__new__(cls)
return Singleton._instance
# Internal interface
def getSdkReg(reg):
try:
cmd = ["bcmcmd", "-t", "1", "getr"+str(reg)]
ret, result = getstatusoutput_noshell(cmd)
result_t = result.strip().replace("\r", "").replace("\n", "")
if ret != 0 or "Error:" in result_t:
return False, result
patt = r"%s.(.*):(.*)>drivshell" % reg
rt = re.findall(patt, result_t, re.S)
test = re.findall("=(.*)", rt[0][0])[0]
except Exception as e:
return False, 'getsdk register error'
return True, test
def getfilevalue(location):
try:
with open(location, 'r') as fd:
value = fd.read()
return True, value.strip()
except Exception as e:
return False, "error"
def get_sysfs_value(location):
pos_t = str(location)
name = get_pmc_register(pos_t)
return name
def write_sysfs_value(reg_name, value):
fileLoc = MAILBOX_DIR + reg_name
try:
if not os.path.isfile(fileLoc):
print(fileLoc, 'not found !')
return False
with open(fileLoc, 'w') as fd:
fd.write(value)
except Exception as error:
log_error("Unable to open " + fileLoc + "file !")
return False
return True
def RJPRINTERR(str):
print("\033[0;31m%s\033[0m" % str)
def strtoint(str): # convert Hex string to int such as "4040"/"0x4040"/"0X4040" = 16448
value = 0
rest_v = str.replace("0X", "").replace("0x", "")
for index in range(len(rest_v)):
print(rest_v[index])
value |= int(rest_v[index], 16) << ((len(rest_v) - index - 1) * 4)
return value
def inttostr(vl,len): # convert int to string such as 0x3030 = 00
if type(vl) != int:
raise Exception(" type error")
index = 0
ret_t = ""
while index < len:
ret = 0xff & (vl >> index * 8)
ret_t += chr(ret)
index += 1;
return ret_t
def str_to_hex(rest_v):
value = 0
for index in range(len(rest_v)):
value |= ord(rest_v[index]) << ((len(rest_v) - index - 1) * 8)
return value
def hex_to_str(s):
len_t = len(s)
if len_t % 2 != 0:
return 0
ret = ""
for t in range(0, int(len_t / 2)):
ret += chr(int(s[2 * t:2 * t + 2], 16))
return ret
def str_to_bin(s):
return ' '.join([bin(ord(c)).replace('0b', '') for c in s])
def bin_to_str(s):
return ''.join([chr(i) for i in [int(b, 2) for b in s.split(' ')]])
def getMacTemp():
result = {}
#waitForDocker()
# exec twice, get the second result
getstatusoutput_noshell(["bcmcmd", "-t", "1", "show temp"])
ret, log = getstatusoutput_noshell(["bcmcmd", "-t", "1", "show temp"])
if ret:
return False, result
else:
# decode obtained info
logs = log.splitlines()
for line in logs:
if "average" in line:
b = re.findall(r'\d+.\d+', line)
result["average"] = b[0]
elif "maximum" in line:
b = re.findall(r'\d+.\d+', line)
result["maximum"] = b[0]
return True, result
def getMacTemp_sysfs(mactempconf):
try:
temp = -1000000
temp_list = []
mac_temp_loc = mactempconf.get("loc", [])
mac_temp_flag = mactempconf.get("flag",None)
if mac_temp_flag is not None: # check mac temperature vaild flag
gettype = mac_temp_flag.get('gettype')
okbit = mac_temp_flag.get('okbit')
okval = mac_temp_flag.get('okval')
if gettype == "io":
io_addr = mac_temp_flag.get('io_addr')
val = io_rd(io_addr)
if val is None:
raise Exception("get mac_flag by io failed.")
else:
bus = mac_temp_flag.get('bus')
loc = mac_temp_flag.get('loc')
offset = mac_temp_flag.get('offset')
ind, val = rji2cget(bus, loc, offset)
if ind is not True:
raise Exception("get mac_flag by i2c failed.")
val_t = (int(val,16) & (1<< okbit)) >> okbit
if val_t != okval:
raise Exception("mac_flag invalid, val_t:%d." % val_t)
for loc in mac_temp_loc:
temp_s = get_sysfs_value(loc)
if isinstance(temp_s, str) and temp_s.startswith("ERR"):
raise Exception("get mac temp error. loc:%s" % loc)
temp_t = int(temp_s)
if temp_t == -1000000:
raise Exception("mac temp invalid.loc:%s" % loc)
temp_list.append(temp_t)
temp_list.sort(reverse=True)
temp = temp_list[0]
except Exception as e:
return False, temp
return True, temp
def restartDockerService(force=False):
container_name = ["database","snmp","syncd","swss","dhcp_relay","radv","teamd","pmon"]
ret, status = getstatusoutput_noshell(["docker", "ps"])
if ret == 0 :
for tmpname in container_name:
if (tmpname not in status):
if (force == True):
getstatusoutput_noshell(["docker", "restart", tmpname])
else:
getstatusoutput_noshell(["systemctl", "restart", tmpname])
def waitForDhcp(timeout):
time_cnt = 0
while True:
try:
ret, status = getstatusoutput_noshell(["systemctl", "status", "dhcp_relay.service"])
if (ret == 0 and "running" in status) or "SUCCESS" in status:
break
else:
sys.stdout.write(".")
sys.stdout.flush()
time_cnt = time_cnt + 1
if time_cnt > timeout:
raise Exception("waitForDhcp timeout")
time.sleep(1)
except Exception as e:
return False
return True
def waitForSdk(sdk_fpath ,timeout):
time_cnt = 0
while True:
try:
if os.path.exists(sdk_fpath):
break
else:
sys.stdout.write(".")
sys.stdout.flush()
time_cnt = time_cnt + 1
if time_cnt > timeout:
raise Exception("waitForSdk timeout")
time.sleep(1)
except Exception as e:
return False
return True
def waitForDocker(need_restart=False,timeout=180):
sdkcheck_params = STARTMODULE.get("sdkcheck",{})
if sdkcheck_params.get("checktype") == "file": # pass file check
sdk_fpath = sdkcheck_params.get("sdk_fpath")
return waitForSdk(sdk_fpath,timeout)
return waitForDhcp(timeout)
def getTLV_BODY(type, productname):
x = []
temp_t = ""
if type == TLV_CODE_MAC_BASE:
arr = productname.split(':')
for tt in arr:
temp_t += chr(int(tt, 16))
elif type == TLV_CODE_DEVICE_VERSION:
temp_t = chr(productname)
elif type == TLV_CODE_MAC_SIZE:
temp_t = chr(productname >> 8) + chr(productname & 0x00ff)
else:
temp_t = productname
x.append(chr(type))
x.append(chr(len(temp_t)))
for i in temp_t:
x.append(i)
return x
def _crc32(v):
return '0x%08x' % (binascii.crc32(v) & 0xffffffff) # get 8 bytes of crc32 %x return hex
def printvalue(b):
index = 0
for i in range(0, len(b)):
if index % 16 == 0:
print(" ")
print("%02x " % ord(b[i]))
index += 1
print("\n")
def generate_value(_t):
ret = []
for i in TLV_INFO_ID_STRING:
ret.append(i)
ret.append(chr(TLV_INFO_VERSION))
ret.append(chr(TLV_INFO_LENGTH))
ret.append(chr(TLV_INFO_LENGTH_VALUE))
total_len = 0
for key in _t:
x = getTLV_BODY(key, _t[key])
ret += x
total_len += len(x)
ret[10] = chr(total_len + 6)
ret.append(chr(0xFE))
ret.append(chr(0x04))
s = _crc32(''.join(ret))
for t in range(0, 4):
ret.append(chr(int(s[2 * t + 2:2 * t + 4], 16)))
totallen = len(ret)
if (totallen < 256):
for left_t in range(0, 256 - totallen):
ret.append(chr(0x00))
return (ret, True)
def getsyseeprombyId(id):
ret = get_sys_eeprom()
for item in ret:
if item["code"] == id:
return item
return None
def fac_init_cardidcheck():
rest = getsyseeprombyId(TLV_CODE_RJ_CARID) # check cardId same or not
if rest is None:
print("need to program write bin file")
return False
else:
rest_v = rest['value']
value = strtoint(rest_v)
if value == RUIJIE_CARDID:
log_debug("check card ID pass")
else:
log_debug("check card ID error")
return False
return True
def isValidMac(mac):
if re.match(r"^\s*([0-9a-fA-F]{2,2}:){5,5}[0-9a-fA-F]{2,2}\s*$", mac):
return True
return False
# Internet cardsetmac
def util_setmac(eth, mac):
rulefile = "/etc/udev/rules.d/70-persistent-net.rules"
if isValidMac(mac) == False:
return False, "MAC invaild"
cmd1 = ["ethtool", "-e", eth]
cmd2 = ["grep", "0x0010"]
cmd3 = ["awk", '{print \"0x\"$13$12$15$14}']
ret, log = getstatusoutput_noshell_pipe(cmd1, cmd2, cmd3)
log_debug(log)
magic = ""
if ret == 0 and len(log):
magic = log
macs = mac.upper().split(":")
# chage ETH0 to value after setmac
ifconfigcmd = ["ifconfig", "eth0", "hw", "ether", mac]
log_debug(' '.join(ifconfigcmd))
ret, status = getstatusoutput_noshell(ifconfigcmd)
if ret:
raise SETMACException("software set Internet cardMAC error")
index = 0
for item in macs:
cmd = ["ethtool", "-E", eht, "magic", magic, "offset", str(index), "value", "0x"+item]
log_debug(cmd)
index += 1
ret, log = getstatusoutput_noshell(cmd)
if ret != 0:
raise SETMACException(" set hardware Internet card MAC error")
# get value after setting
cmd_t = ["ethtool", "-e", "eth0", "offset", "0", "length", "6"]
ret, log = getstatusoutput_noshell(cmd_t)
m = re.split(':', log)[-1].strip().upper()
mac_result = m.upper().split(" ")
for ind, s in enumerate(macs):
if s != mac_result[ind]:
RJPRINTERR("MAC comparison error")
if os.path.exists(rulefile):
os.remove(rulefile)
print("MGMT MAC[%s]" % mac)
return True
def getInputCheck(tips):
str = raw_input(tips)
if astrcmp(str, "y") or astrcmp(str, "ye") or astrcmp(str, "yes") or astrcmp(str, ""):
return True
else:
return False
def getrawch():
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def upper_input(tips):
sys.stdout.write(tips)
sys.stdout.flush()
passwd = []
while True:
ch = getrawch().upper()
if ch == "\r" or ch == "\n":
return "".join(passwd)
elif ch == '\b' or ord(ch) == 127:
if passwd:
del passwd[-1]
sys.stdout.write('\b \b')
else:
sys.stdout.write(ch)
passwd.append(ch)
def changeTypeValue(_value, type1, tips, example):
if type1 == TLV_CODE_PRODUCT_NAME:
while True:
print("please check (1)air from forward to backward/(2)air from backward to forward:")
option = raw_input()
if option == "1":
_value[type1] = example + "-F-RJ"
print("check Product is air from forward to backward device,Product Name:%s"%_value[type1])
break
elif option == "2":
_value[type1] = example + "-R-RJ"
print("check Product is air from backward to forward device,Product Name:%s"%_value[type1])
break
else:
print("input incorrect, check please")
return True
print("Please input[%s]such as(%s):" % (tips, example))
name = upper_input("")
if type1 == TLV_CODE_MAC_BASE:
if len(name) != 12:
raise SETMACException("MAC address length incorrect, check please")
release_mac = ""
for i in range(int(len(name) / 2)):
if i == 0:
release_mac += name[i * 2:i * 2 + 2]
else:
release_mac += ":" + name[i * 2:i * 2 + 2]
if isValidMac(release_mac) == True:
_value[type1] = release_mac
else:
raise SETMACException("MAC address invaild, check please")
elif type1 == TLV_CODE_DEVICE_VERSION:
if name.isdigit():
_value[type1] = int(name)
else:
raise SETMACException("Version is not number, check please")
elif type1 == TLV_CODE_MAC_SIZE:
if name.isdigit():
_value[type1] = int(name, 16)
else:
raise SETMACException("Version is not number, check please")
elif type1 == TLV_CODE_SERIAL_NUMBER:
if name.isalnum() == False:
raise SETMACException("Serial Number invaild string, check please")
elif len(name) != 13:
raise SETMACException("Serial Number length incorrect, check please")
else:
_value[type1] = name
elif type1 == TLV_CODE_VENDOR_EXT:
_value[type1] = name
else:
_value[type1] = name
return True
def astrcmp(str1, str2):
return str1.lower() == str2.lower()
def generate_ext(cardid):
s = "%08x" % cardid
ret = ""
for t in range(0, 4):
ret += chr(int(s[2 * t:2 * t + 2], 16))
ret = chr(0x01) + chr(len(ret)) + ret
return ret
def rji2cget(bus, devno, address):
command_line = ["i2cget", "-f", "-y", str(bus), "0x%02x"%str(devno), "0x%02x"%str(address)]
retrytime = 6
ret_t = ""
for i in range(retrytime):
ret, ret_t = getstatusoutput_noshell(command_line)
if ret == 0:
return True, ret_t
time.sleep(0.1)
return False, ret_t
def rji2cset(bus, devno, address, byte):
command_line = ["i2cset", "-f", "-y", str(bus), "0x%02x"%str(devno), "0x%02x"%str(address), "0x%02x"%str(byte)]
retrytime = 6
ret_t = ""
for i in range(retrytime):
ret, ret_t = getstatusoutput_noshell(command_line)
if ret == 0:
return True, ret_t
return False, ret_t
def rjpcird(pcibus , slot , fn, bar, offset):
'''read pci register'''
if offset % 4 != 0:
return
filename = "/sys/bus/pci/devices/0000:%02x:%02x.%x/resource%d" % (int(pcibus), int(slot), int(fn), int(bar))
file = open(filename, "r+")
size = os.path.getsize(filename)
data = mmap.mmap(file.fileno(), size)
result = data[offset: offset + 4]
s = result[::-1]
val = 0
for i in range(0, len(s)):
val = val << 8 | ord(s[i])
return "0x%08x" % val
def rjpciwr(pcibus , slot ,fn, bar, offset, data):
'''write pci register'''
ret = inttostr(data, 4)
filename = "/sys/bus/pci/devices/0000:%02x:%02x.%x/resource%d" % (int(pcibus), int(slot), int(fn), int(bar))
file = open(filename, "r+")
size = os.path.getsize(filename)
data = mmap.mmap(file.fileno(), size)
data[offset: offset + 4] = ret
result = data[offset: offset + 4]
s = result[::-1]
val = 0
for i in range(0, len(s)):
val = val << 8 | ord(s[i])
data.close()
def rjsysset(location, value):
retrytime = 6
for i in range(retrytime):
try:
with open(location, 'w') as f:
f.write('0x%02x\n'%value)
except (IOError, FileNotFoundError) as e:
return False, str(e)
return True, ''
def rji2cgetWord(bus, devno, address):
command_line = ["i2cget", "-f", "-y", str(bus), "0x%02x"%str(devno), "0x%02x"%str(address), "w"]
retrytime = 3
ret_t = ""
for i in range(retrytime):
ret, ret_t = getstatusoutput_noshell(command_line)
if ret == 0:
return True, ret_t
return False, ret_t
def rji2csetWord(bus, devno, address, byte):
command_line = ["i2cset", "-f", "-y", str(bus), "0x%02x"%str(devno), "0x%02x"%str(address), "0x%x"%str(byte), "w"]
getstatusoutput_noshell(command_line)
def fan_setmac():
rji2cset(FAN_PROTECT["bus"], FAN_PROTECT["devno"],
FAN_PROTECT["addr"], FAN_PROTECT["open"])
rji2cset(FAN_PROTECT["bus"], FAN_PROTECT["devno"],
FAN_PROTECT["addr"], FAN_PROTECT["close"])
def checkfansninput(fan_sn, fansntemp):
if fan_sn in fansntemp:
RJPRINTERR("exist same Serial Numberplease input again")
return False
if(len(fan_sn) != 13):
RJPRINTERR("Serial Number length incorrectplease input again")
return False
return True
# check 输入的hardware version
def checkfanhwinput(hw):
if len(hw) != 4:
RJPRINTERR("hardware version length incorrect, please input again")
return False
if hw.find(".") != 1:
RJPRINTERR("hardware version incorrect, please input again")
return False
return True
def util_show_fanse2(fans):
formatstr = "%-8s %-20s %-20s %-20s %-20s"
print(formatstr % ("id", "Name", "hardware version", "Serial Number", "Time"))
print(formatstr % ("------", "---------------", "---------------", "---------------", "----"))
for fan in fans:
# print fan.dstatus
if fan.dstatus < 0:
print("%-8s" % ("FAN%d" % (fans.index(fan) + 1)))
RJPRINTERR(" decode e2 error")
else:
print(formatstr % ("FAN%d" % (fans.index(fan) + 1), fan.typename.replace(chr(0x00), ""),
fan.typehwinfo.replace(chr(0x00), ""), fan.typesn.replace(chr(0x00), ""), fan.fandecodetime))
def get_fane2_sysfs(bus, loc):
rg_fan_e2 = "%d-%04x/fan" % (bus, loc)
eeprom = get_sysfs_value(rg_fan_e2)
return eeprom
def util_show_fane2():
ret = sorted(I2CUTIL.getvaluefromdevice("rg_fan"))
if len(ret) <=0:
return None
fans = []
for index in range(len(ret)):
t1 = (int(round(time.time() * 1000)))
eeprom = get_fane2_sysfs(ret[index]["bus"], ret[index]["loc"])
t2 = (int(round(time.time() * 1000)))
fane2 = fan_tlv()
fane2.fandecodetime = t2 - t1
fane2.decode(eeprom)
fans.append(fane2)
util_show_fanse2(fans)
def getPid(name):
ret = []
for dirname in os.listdir('/proc'):
if dirname == 'curproc':
continue
try:
with open('/proc/{}/cmdline'.format(dirname), mode='rb') as fd:
content = fd.read()
except Exception:
continue
if name in content:
ret.append(dirname)
return ret
def fac_fans_setmac_tlv(ret):
if len(ret) <=0:
return None
fans = []
fansntemp = []
for index in range(len(ret)):
item = ret[index]
log_debug(item)
eeprom = get_fane2_sysfs(item["bus"], item["loc"])
fane2 = fan_tlv()
fane2.decode(eeprom)
fane2.fanbus = item["bus"]
fane2.fanloc = item["loc"]
log_debug("decode eeprom success")
print("Fan[%d]-[%s]setmac" % ((index + 1), FANS_DEF[fane2.typedevtype]))
while True:
print("Please input[%s]:" % "Serial Number")
fan_sn = raw_input()
if checkfansninput(fan_sn, fansntemp) == False:
continue
fansntemp.append(fan_sn)
fan_sn = fan_sn + chr(0x00)
fane2.typesn = fan_sn + chr(0x00)
break
while True:
print("Please input[%s]:" % "hardware version")
hwinfo = raw_input()
if checkfanhwinput(hwinfo) == False:
continue
fan_hwinfo = hwinfo + chr(0x00)
fane2.typehwinfo = fan_hwinfo + chr(0x00)
break
log_debug(fane2.typedevtype)
fane2.typename = FANS_DEF[fane2.typedevtype] + chr(0x00)
fans.append(fane2)
print("\n")
print("\n*******************************\n")
util_show_fanse2(fans)
if getInputCheck("check input correctly or notYes/No):") == True:
for fan in fans:
log_debug("ouput fan")
fac_fan_setmac(fan)
else:
print("setmac quit")
return False
def fac_fan_setmac_fru(ret):
fans = FRULISTS.get('fans')
fanfrus = {}
newfrus = {}
#getmsg
try:
for fan in fans:
print("===============%s ================getmessage" % fan.get('name'))
eeprom = getsysvalue(I2CUTIL.getE2File(fan.get('bus'), fan.get('loc')))
fru = ipmifru()
fru.decodeBin(eeprom)
fanfrus[fan.get('name')] = fru
except Exception as e:
print(str(e))
return False
#setmsg
for fan in fans:
print("===============%s ================setmac" % fan.get('name'))
fruold = fanfrus.get(fan.get('name'))
newfru = getInputSetmac(fruold)
newfru.recalcute()
newfrus[fan.get('name')] = newfru
#writemsg
for fan in fans:
print("===============%s ================writeToE2" % fan.get('name'))
ret_t = newfrus.get(fan.get('name'))
I2CUTIL.openFanE2Protect()
I2CUTIL.writeToFanE2(fan.get('bus'), fan.get('loc'), ret_t.bindata)
I2CUTIL.closeFanE2Protect()
#check
try:
for fan in fans:
print("===============%s ================getmessage" % fan.get('name'))
eeprom = getsysvalue(I2CUTIL.getE2File(fan.get('bus'), fan.get('loc')))
fru = ipmifru()
fru.decodeBin(eeprom)
except Exception as e:
print(str(e))
return False
return True
def fac_fans_setmac():
ret = I2CUTIL.getvaluefromdevice("rg_fan")
if ret is not None and len(ret) > 0:
return fac_fans_setmac_tlv(ret)
fans = FRULISTS.get('fans', None)
if fans is not None and len(fans)>0:
return fac_fan_setmac_fru(ret)
return False
def fac_fan_setmac(item):
I2CUTIL.openFanE2Protect()
I2CUTIL.writeToFanE2(item.fanbus, item.fanloc, item.generate_fan_value())
I2CUTIL.closeFanE2Protect()
def writeToEEprom(rst_arr):
dealtype = E2_PROTECT.get('gettype',None)
if dealtype is None:
rji2cset(E2_PROTECT["bus"], E2_PROTECT["devno"],
E2_PROTECT["addr"], E2_PROTECT["open"])
elif dealtype == "io":
io_wr(E2_PROTECT["io_addr"], E2_PROTECT["open"])
index = 0
for item in rst_arr:
rji2cset(E2_LOC["bus"], E2_LOC["devno"], index, ord(item))
index += 1
if dealtype is None:
rji2cset(E2_PROTECT["bus"], E2_PROTECT["devno"],
E2_PROTECT["addr"], E2_PROTECT["close"])
elif dealtype == "io":
io_wr(E2_PROTECT["io_addr"], E2_PROTECT["close"])
# deal last drivers
getstatusoutput_noshell(["rmmod", "at24"])
getstatusoutput_noshell(["modprobe", "at24"])
getstatusoutput_noshell(["rm", "-f", "/var/cache/sonic/decode-syseeprom/syseeprom_cache"])
def get_local_eth0_mac():
cmd1 = ["ifconfig", "eth0"]
cmd2 = ["grep", "HWaddr"]
print(getstatusoutput_noshell_pipe(cmd1, cmd2))
def getonieversion():
if not os.path.isfile('/host/machine.conf'):
return ""
machine_vars = {}
with open('/host/machine.conf') as machine_file:
for line in machine_file:
tokens = line.split('=')
if len(tokens) < 2:
continue
machine_vars[tokens[0]] = tokens[1].strip()
return machine_vars.get("onie_version")
def createbmcMac(cpumac , num = 2):
bcmvalue = strtoint(cpumac[cpumac.rindex(":")+ 1:len(cpumac)]) + num
# bmcmac =
t = cpumac.split(":")
t[5] = "%02x" % bcmvalue
bmcmac = ":".join(t)
return bmcmac.upper()
def fac_board_setmac():
_value = {}
# default value
_value[TLV_CODE_VENDOR_EXT] = generate_ext(RUIJIE_CARDID) # generate id
_value[TLV_CODE_PRODUCT_NAME] = RUIJIE_PRODUCTNAME
_value[TLV_CODE_PART_NUMBER] = RUIJIE_PART_NUMBER
_value[TLV_CODE_LABEL_REVISION] = RUIJIE_LABEL_REVISION
_value[TLV_CODE_PLATFORM_NAME] = platform
_value[TLV_CODE_ONIE_VERSION] = getonieversion()
_value[TLV_CODE_MAC_SIZE] = RUIJIE_MAC_SIZE
_value[TLV_CODE_MANUF_NAME] = RUIJIE_MANUF_NAME
_value[TLV_CODE_MANUF_COUNTRY] = RUIJIE_MANUF_COUNTRY
_value[TLV_CODE_VENDOR_NAME] = RUIJIE_VENDOR_NAME
_value[TLV_CODE_DIAG_VERSION] = RUIJIE_DIAG_VERSION
_value[TLV_CODE_SERVICE_TAG] = RUIJIE_SERVICE_TAG
try:
if 0x00004052 == RUIJIE_CARDID:
_value[TLV_CODE_PRODUCT_NAME] = RUIJIE_PRODUCTNAME + "-RJ"
elif 0x00004051 == RUIJIE_CARDID or 0x00004050 == RUIJIE_CARDID:
changeTypeValue(_value, TLV_CODE_PRODUCT_NAME,
"Product name",RUIJIE_PRODUCTNAME)
changeTypeValue(_value, TLV_CODE_SERIAL_NUMBER,
"SN", "0000000000000") # add serial number
changeTypeValue(_value, TLV_CODE_DEVICE_VERSION,
"hardware version", "101") # hardware version
changeTypeValue(_value, TLV_CODE_MAC_BASE,
"MAC address", "58696cfb2108") # MAC address
_value[TLV_CODE_MANUF_DATE] = time.strftime(
'%m/%d/%Y %H:%M:%S', time.localtime()) # add setmac time
rst, ret = generate_value(_value)
if util_setmac("eth0", _value[TLV_CODE_MAC_BASE]) == True: # set Internet cardIP
writeToEEprom(rst) # write to e2
# set BMC MAC
if "bmcsetmac" in FACTESTMODULE and FACTESTMODULE['bmcsetmac'] == 1:
bmcmac = createbmcMac(_value[TLV_CODE_MAC_BASE])
if ipmi_set_mac(bmcmac) == True:
print("BMC MAC[%s]"%bmcmac)
else:
print("SET BMC MAC FAILED")
return False
else:
return False
except SETMACException as e:
#print(e)
RJPRINTERR("\n\n%s\n\n" % e)
return False
except ValueError as e:
return False
return True
def ipmi_set_mac(mac):
macs = mac.split(":")
cmdinit = ["ipmitool", "raw", "0x0c", "0x01", "0x01", "0xc2", "0x00"]
cmdset = ["ipmitool", "raw", "0x0c", "0x01", "0x01", "0x05"]
for ind in range(len(macs)):
cmdset.append("0x%02x" % int(macs[ind], 16))
getstatusoutput_noshell(cmdinit)
ret, status = getstatusoutput_noshell(cmdset)
if ret:
RJPRINTERR("\n\n%s\n\n" % status)
return False
return True
def getInputValue(title, tips):
print("Please input[%s]such as(%s):" % (title, tips))
name = raw_input()
return name
def bmc_setmac():
tips = "BMC MAC"
print("Please input value you want to change[%s]:" % tips)
name = raw_input()
if len(name) != 12:
RJPRINTERR("\nMAC address invaild, try again\n")
return False
release_mac = ""
for i in range(int(len(name) / 2)):
if i == 0:
release_mac += name[i * 2:i * 2 + 2]
else:
release_mac += ":" + name[i * 2:i * 2 + 2]
if isValidMac(release_mac) == True:
if ipmi_set_mac(release_mac) == True:
return True
else:
RJPRINTERR("\nMAC address invaild, try again\n")
return False
def closeProtocol():
# disable LLDP
log_info("disable LLDP")
sys.stdout.write(".")
sys.stdout.flush()
getstatusoutput_noshell(["systemctl", "stop", "lldp.service"])
log_info("disable lldp service")
sys.stdout.write(".")
sys.stdout.flush()
getstatusoutput_noshell(["systemctl", "stop", "bgp.service"])
log_info("disable bgp service")
sys.stdout.write(".")
sys.stdout.flush()
#ret, status = rj_os_system('bcmcmd "port ce,xe stp=disable"')
# check SDK memory must be 256M
def checkSdkMem():
ind = 0
file_data = ""
with open(file_name, "r") as f:
for line in f:
if "dmasize=16M" in line:
line = line.replace("dmasize=16M", "dmasize=256M")
ind = -1
file_data += line
if ind == 0:
return
with open(file_name, "w") as f:
f.write(file_data)
print("change SDK memory to 256, reboot required")
getstatusoutput_noshell(["sync"])
getstatusoutput_noshell(["reboot"])
##########################################################################
# receives a character setting
##########################################################################
def getch(msg):
ret = ""
fd = sys.stdin.fileno()
old_ttyinfo = termios.tcgetattr(fd)
new_ttyinfo = old_ttyinfo[:]
new_ttyinfo[3] &= ~termios.ICANON
new_ttyinfo[3] &= ~termios.ECHO
sys.stdout.write(msg)
sys.stdout.flush()
try:
termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
ret = os.read(fd, 1)
finally:
# print "try to setting"
termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)
return ret
def get_raw_input():
ret=""
fd=sys.stdin.fileno()
old_ttyinfo=termios.tcgetattr(fd)
new_ttyinfo=old_ttyinfo[:]
new_ttyinfo[3] &= ~termios.ICANON
new_ttyinfo[3] &= ~termios.ECHO
try:
termios.tcsetattr(fd,termios.TCSANOW,new_ttyinfo)
ret=raw_input("")
except Exception as e:
print(e)
finally:
termios.tcsetattr(fd,termios.TCSANOW,old_ttyinfo)
return ret
def getsysvalue(location):
retval = None
mb_reg_file = location
if (not os.path.isfile(mb_reg_file)):
print(mb_reg_file, 'not found !')
return retval
try:
if (not os.path.isfile(mb_reg_file)):
print(mb_reg_file, 'not found !')
return retval
with open(mb_reg_file, 'r') as fd:
retval = fd.read()
except Exception as error:
log_error("Unable to open " + mb_reg_file + "file !")
retval = retval.rstrip('\r\n')
retval = retval.lstrip(" ")
#log_debug(retval)
return retval
# get file value
def get_pmc_register(reg_name):
retval = 'ERR'
mb_reg_file = MAILBOX_DIR + reg_name
filepath = glob.glob(mb_reg_file)
if(len(filepath) == 0):
return "%s %s notfound"% (retval , mb_reg_file)
mb_reg_file = filepath[0]
if (not os.path.isfile(mb_reg_file)):
return "%s %s notfound"% (retval , mb_reg_file)
try:
with open(mb_reg_file, 'r') as fd:
retval = fd.read()
except Exception as error:
pass
retval = retval.rstrip('\r\n')
retval = retval.lstrip(" ")
return retval
# decode EEPROM
def decoder(s, t):
if ord(t[0]) == TLV_CODE_PRODUCT_NAME:
name = "Product Name"
value = str(t[2:2 + ord(t[1])])
elif ord(t[0]) == TLV_CODE_PART_NUMBER:
name = "Part Number"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_SERIAL_NUMBER:
name = "Serial Number"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_MAC_BASE:
name = "Base MAC Address"
value = ":".join([binascii.b2a_hex(T) for T in t[2:8]]).upper()
elif ord(t[0]) == TLV_CODE_MANUF_DATE:
name = "Manufacture Date"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_DEVICE_VERSION:
name = "Device Version"
value = str(ord(t[2]))
elif ord(t[0]) == TLV_CODE_LABEL_REVISION:
name = "Label Revision"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_PLATFORM_NAME:
name = "Platform Name"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_ONIE_VERSION:
name = "ONIE Version"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_MAC_SIZE:
name = "MAC Addresses"
value = str((ord(t[2]) << 8) | ord(t[3]))
elif ord(t[0]) == TLV_CODE_MANUF_NAME:
name = "Manufacturer"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_MANUF_COUNTRY:
name = "Manufacture Country"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_VENDOR_NAME:
name = "Vendor Name"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_DIAG_VERSION:
name = "Diag Version"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_SERVICE_TAG:
name = "Service Tag"
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_VENDOR_EXT:
name = "Vendor Extension"
value = ""
if _TLV_DISPLAY_VENDOR_EXT:
value = t[2:2 + ord(t[1])]
elif ord(t[0]) == TLV_CODE_CRC_32 and len(t) == 6:
name = "CRC-32"
value = "0x%08X" % (((ord(t[2]) << 24) | (
ord(t[3]) << 16) | (ord(t[4]) << 8) | ord(t[5])),)
elif ord(t[0]) == TLV_CODE_RJ_CARID:
name = "rj_cardid"
value = ""
for c in t[2:2 + ord(t[1])]:
value += "%02X" % (ord(c),)
else:
name = "Unknown"
value = ""
for c in t[2:2 + ord(t[1])]:
value += "0x%02X " % (ord(c),)
return {"name": name, "code": ord(t[0]), "value": value}
def decode_eeprom(e):
total_len = (ord(e[9]) << 8) | ord(e[10])
tlv_index = _TLV_INFO_HDR_LEN
tlv_end = _TLV_INFO_HDR_LEN + total_len
ret = []
while (tlv_index + 2) < len(e) and tlv_index < tlv_end:
rt = decoder(None, e[tlv_index:tlv_index + 2 + ord(e[tlv_index + 1])])
ret.append(rt)
if ord(e[tlv_index]) == TLV_CODE_CRC_32:
break
tlv_index += ord(e[tlv_index + 1]) + 2
for item in ret:
if item['code'] == TLV_CODE_VENDOR_EXT:
rt = decoder(None, item["value"]
[0: 0 + 2 + ord(item["value"][0 + 1])])
ret.append(rt)
return ret
def get_sys_eeprom():
eeprom = get_sysfs_value(rg_eeprom)
return decode_eeprom(eeprom)
# get card ID
def getCardId():
ret = get_sys_eeprom()
for item in ret:
if item['code'] == TLV_CODE_RJ_CARID:
return item.get('value',None)
return None
###########################################
# get memory slot and number via DMI command
###########################################
def getsysmeminfo():
ret, log = getstatusoutput_noshell(["which", "dmidecode"])
if ret != 0 or len(log) <= 0:
error = "cmd find dmidecode"
return False, error
cmd1 = [log[0].rstrip('\n')]
cmd2 = ["grep", "-P", "-A5", "Memory\s+Device"]
cmd3 = ["grep", "Size"]
cmd4 = ["grep", "-v", "Range"]
# get total number first
result = []
ret1, log1 = getstatusoutput_noshell_pipe(cmd1, cmd2, cmd3, cmd4)
if ret1 == 0 and len(log1):
log1 = log1.lstrip()
arr = log1.split("\n")
#total = len(arr) # total slot number
for i in range(len(arr)):
val = re.sub("\D", "", arr[i])
if val == "":
val = arr[i].lstrip()
val = re.sub('Size:', '', val).lstrip()
# print val
result.append({"slot": i + 1, "size": val})
return True, result
return False, "error"
###########################################
# get memory slot and number via DMI command
# return various arrays
###########################################
def getsysmeminfo_detail():
ret, log = getstatusoutput_noshell(["which", "dmidecode"])
if ret != 0 or len(log) <= 0:
error = "cmd find dmidecode"
return False, error
cmd1 = [log[0].rstrip('\n')] + ["-t", "17"]
cmd2 = ["grep", "-A21", "Memory Device"] # 17
# get total number
ret1, log1 = getstatusoutput_noshell_pipe(cmd1, cmd2)
if ret1 != 0 or len(log1) <= 0:
return False, "command execution error[%s][%s]" % (cmd1, cmd2)
result_t = log1.split("--")
mem_rets = []
for item in result_t:
its = item.replace("\t", "").strip().split("\n")
ret = {}
for it in its:
if ":" in it:
key = it.split(":")[0].lstrip()
value = it.split(":")[1].lstrip()
ret[key] = value
mem_rets.append(ret)
return True, mem_rets
###########################################
# get BIOS info via DMI command
###########################################
def getDmiSysByType(type_t):
ret, log = getstatusoutput_noshell(["which", "dmidecode"])
if ret != 0 or len(log) <= 0:
error = "cmd find dmidecode"
return False, error
cmd = [log[0].rstrip('\n')] + ["-t", str(type_t)]
# get total number
ret1, log1 = getstatusoutput_noshell(cmd)
if ret1 != 0 or len(log1) <= 0:
return False, "command execution error[%s]" % cmd
its = log1.replace("\t", "").strip().split("\n")
ret = {}
for it in its:
if ":" in it:
key = it.split(":")[0].lstrip()
value = it.split(":")[1].lstrip()
ret[key] = value
return True, ret
def gethwsys():
return getDmiSysByType(1)
###########################################
# get BIOS info via DMI command
def getsysbios():
return getDmiSysByType(0)
def searchDirByName(name, dir):
result = []
try:
files = os.listdir(dir)
for file in files:
if name in file:
result.append(os.path.join(dir, file))
except Exception as e:
pass
return result
def getUsbLocation():
dir = "/sys/block/"
spect = "sd"
usbpath = ""
result = searchDirByName(spect, dir)
if len(result) <= 0:
return False
for item in result:
with open(os.path.join(item, "removable"), 'r') as fd:
value = fd.read()
if value.strip() == "1": # U-Disk found
usbpath = item
break
if usbpath == "": # no U-Disk found
log_debug("no usb found")
return False, usbpath
return True, usbpath
# judge USB file
def getusbinfo():
ret, path = getUsbLocation()
if ret == False:
return False, "not usb exists"
str = os.path.join(path, "size")
ret, value = getfilevalue(str)
if ret == True:
return True, {"id": os.path.basename(path), "size": float(value) * 512 / 1024 / 1024 / 1024}
else:
return False, "Err"
def get_cpu_info():
cmd1 = ["cat", "/proc/cpuinfo"]
cmd2 = ["grep", "processor", "-A18"] # 17
ret, log1 = getstatusoutput_noshell_pipe(cmd1, cmd2)
if ret != 0 or len(log1) <= 0:
return False, "command execution error[%s][%s]" % (cmd1, cmd2)
result_t = log1.split("--")
mem_rets = []
for item in result_t:
its = item.replace("\t", "").strip().split("\n")
ret = {}
for it in its:
if ":" in it:
key = it.split(":")[0].lstrip()
value = it.split(":")[1].lstrip()
ret[key] = value
mem_rets.append(ret)
return True, mem_rets
# read file
def get_version_config_info(attr_key, file_name=None):
if file_name is None:
version_conf_filename = "/root/version.json"
else:
version_conf_filename = file_name
if not os.path.isfile(version_conf_filename):
return None
with open(version_conf_filename) as rjconf_file:
for line in rjconf_file:
tokens = line.split('=')
if len(tokens) < 2:
continue
if tokens[0] == attr_key:
return tokens[1].strip()
return None
def io_rd(reg_addr, len =1):
u'''io read'''
try:
regaddr = 0
if type(reg_addr) == int:
regaddr = reg_addr
else:
regaddr = int(reg_addr, 16)
devfile = "/dev/port"
fd = os.open(devfile, os.O_RDWR|os.O_CREAT)
os.lseek(fd, regaddr, os.SEEK_SET)
str = os.read(fd, len)
return "".join(["%02x"% ord(item) for item in str])
except ValueError:
return None
except Exception as e:
print(e)
return None
finally:
os.close(fd)
def io_wr(reg_addr, reg_data):
u'''io write'''
try:
regdata = 0
regaddr = 0
if type(reg_addr) == int:
regaddr = reg_addr
else:
regaddr = int(reg_addr, 16)
if type(reg_data) == int:
regdata = reg_data
else:
regdata = int(reg_data, 16)
devfile = "/dev/port"
fd = os.open(devfile, os.O_RDWR|os.O_CREAT)
os.lseek(fd, regaddr, os.SEEK_SET)
os.write(fd, chr(regdata))
return True
except ValueError as e:
print(e)
return False
except Exception as e:
print(e)
return False
finally:
os.close(fd)