3b89e5d467
As part of consolidating all common Python-based functionality into the new sonic-py-common package, this pull request: 1. Redirects all Python applications/scripts in sonic-buildimage repo which previously imported sonic_device_util or sonic_daemon_base to instead import sonic-py-common, which was added in https://github.com/Azure/sonic-buildimage/pull/5003 2. Replaces all calls to `sonic_device_util.get_platform_info()` to instead call `sonic_py_common.get_platform()` and removes any calls to `sonic_device_util.get_machine_info()` which are no longer necessary (i.e., those which were only used to pass the results to `sonic_device_util.get_platform_info()`. 3. Removes unused imports to the now-deprecated sonic-daemon-base package and sonic_device_util.py module This is the next step toward resolving https://github.com/Azure/sonic-buildimage/issues/4999 Also reverted my previous change in which device_info.get_platform() would first try obtaining the platform ID string from Config DB and fall back to gathering it from machine.conf upon failure because this function is called by sonic-cfggen before the data is in the DB, in which case, the db_connect() call will hang indefinitely, which was not the behavior I expected. As of now, the function will always reference machine.conf.
123 lines
3.4 KiB
Python
123 lines
3.4 KiB
Python
#!/usr/bin/env python
|
|
|
|
import os
|
|
import struct
|
|
import subprocess
|
|
from mmap import *
|
|
|
|
from sonic_py_common import device_info
|
|
|
|
HOST_CHK_CMD = "docker > /dev/null 2>&1"
|
|
EMPTY_STRING = ""
|
|
|
|
|
|
class APIHelper():
|
|
|
|
def __init__(self):
|
|
(self.platform, self.hwsku) = device_info.get_platform_and_hwsku()
|
|
|
|
def is_host(self):
|
|
return os.system(HOST_CHK_CMD) == 0
|
|
|
|
def pci_get_value(self, resource, offset):
|
|
status = True
|
|
result = ""
|
|
try:
|
|
fd = os.open(resource, os.O_RDWR)
|
|
mm = mmap(fd, 0)
|
|
mm.seek(int(offset))
|
|
read_data_stream = mm.read(4)
|
|
result = struct.unpack('I', read_data_stream)
|
|
except:
|
|
status = False
|
|
return status, result
|
|
|
|
def run_command(self, cmd):
|
|
status = True
|
|
result = ""
|
|
try:
|
|
p = subprocess.Popen(
|
|
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
raw_data, err = p.communicate()
|
|
if err == '':
|
|
result = raw_data.strip()
|
|
except:
|
|
status = False
|
|
return status, result
|
|
|
|
def run_interactive_command(self, cmd):
|
|
try:
|
|
os.system(cmd)
|
|
except:
|
|
return False
|
|
return True
|
|
|
|
def read_txt_file(self, file_path):
|
|
try:
|
|
with open(file_path, 'r') as fd:
|
|
data = fd.read()
|
|
return data.strip()
|
|
except IOError:
|
|
pass
|
|
return None
|
|
|
|
def read_one_line_file(self, file_path):
|
|
try:
|
|
with open(file_path, 'r') as fd:
|
|
data = fd.readline()
|
|
return data.strip()
|
|
except IOError:
|
|
pass
|
|
return None
|
|
|
|
def ipmi_raw(self, netfn, cmd):
|
|
status = True
|
|
result = ""
|
|
try:
|
|
cmd = "ipmitool raw {} {}".format(str(netfn), str(cmd))
|
|
p = subprocess.Popen(
|
|
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
raw_data, err = p.communicate()
|
|
if err == '':
|
|
result = raw_data.strip()
|
|
else:
|
|
status = False
|
|
except:
|
|
status = False
|
|
return status, result
|
|
|
|
def ipmi_fru_id(self, id, key=None):
|
|
status = True
|
|
result = ""
|
|
try:
|
|
cmd = "ipmitool fru print {}".format(str(
|
|
id)) if not key else "ipmitool fru print {0} | grep '{1}' ".format(str(id), str(key))
|
|
|
|
p = subprocess.Popen(
|
|
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
raw_data, err = p.communicate()
|
|
if err == '':
|
|
result = raw_data.strip()
|
|
else:
|
|
status = False
|
|
except:
|
|
status = False
|
|
return status, result
|
|
|
|
def ipmi_set_ss_thres(self, id, threshold_key, value):
|
|
status = True
|
|
result = ""
|
|
try:
|
|
cmd = "ipmitool sensor thresh '{}' {} {}".format(
|
|
str(id), str(threshold_key), str(value))
|
|
p = subprocess.Popen(
|
|
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
raw_data, err = p.communicate()
|
|
if err == '':
|
|
result = raw_data.strip()
|
|
else:
|
|
status = False
|
|
except:
|
|
status = False
|
|
return status, result
|