[device/celestica] Mitigation for command injection vulnerability (#11740)

Signed-off-by: maipbui <maibui@microsoft.com>
Dependency: [PR (#12065)](https://github.com/sonic-net/sonic-buildimage/pull/12065) needs to merge first.
#### Why I did it
1. `eval()` - not secure against maliciously constructed input, can be dangerous if used to evaluate dynamic content. This may be a code injection vulnerability.
2. `subprocess()` - when using with `shell=True` is dangerous. Using subprocess function without a static string can lead to command injection.
3. `os` - not secure against maliciously constructed input and dangerous if used to evaluate dynamic content.
4. `is` operator - string comparison should not be used with reference equality.
5. `globals()` - extremely dangerous because it may allow an attacker to execute arbitrary code on the system
#### How I did it
1. `eval()` - use `literal_eval()`
2. `subprocess()` - use `shell=False` instead. use an array string. Ref: [https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation](https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation)
3. `os` - use with `subprocess`
4. `is` - replace by `==` operator for value equality
5. `globals()` - avoid the use of globals()
This commit is contained in:
Mai Bui 2022-12-09 10:30:20 -05:00 committed by GitHub
parent d9eec94c18
commit 51a1eb112b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 158 additions and 302 deletions

View File

@ -24,7 +24,6 @@ NUM_COMPONENT = 3
RESET_REGISTER = "0x112"
HOST_REBOOT_CAUSE_PATH = "/host/reboot-cause/previous-reboot-cause.txt"
PMON_REBOOT_CAUSE_PATH = "/usr/share/sonic/platform/api_files/reboot-cause/previous-reboot-cause.txt"
HOST_CHK_CMD = "docker > /dev/null 2>&1"
STATUS_LED_PATH = "/sys/devices/platform/e1031.smc/master_led"

View File

@ -1,8 +1,8 @@
import os
import ast
import imp
import yaml
import subprocess
from sonic_py_common import device_info
@ -24,7 +24,7 @@ class Common:
SET_METHOD_IPMI = 'ipmitool'
NULL_VAL = 'N/A'
HOST_CHK_CMD = "docker > /dev/null 2>&1"
HOST_CHK_CMD = ["docker"]
REF_KEY = '$ref:'
def __init__(self, conf=None):
@ -46,8 +46,7 @@ class Common:
status = False
output = ""
try:
p = subprocess.Popen(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if p.returncode == 0:
status, output = True, raw_data.strip()
@ -67,7 +66,7 @@ class Common:
cleaned_input = input_translator.get(input)
elif type(input_translator) is str:
cleaned_input = eval(input_translator.format(input))
cleaned_input = ast.literal_eval(input_translator.format(input))
return cleaned_input
@ -77,19 +76,12 @@ class Common:
if type(output_translator) is dict:
output = output_translator.get(output)
elif type(output_translator) is str:
output = eval(output_translator.format(output))
output = ast.literal_eval(output_translator.format(output))
elif type(output_translator) is list:
output = eval(output_translator[index].format(output))
output = ast.literal_eval(output_translator[index].format(output))
return output
def _ipmi_get(self, index, config):
argument = config.get('argument')
cmd = config['command'].format(
config['argument'][index]) if argument else config['command']
status, output = self.run_command(cmd)
return output if status else None
def _sysfs_read(self, index, config):
sysfs_path = config.get('sysfs_path')
argument = config.get('argument', '')
@ -132,10 +124,6 @@ class Common:
return False, output
return True, output
def _ipmi_set(self, index, config, input):
arg = config['argument'][index].format(input)
return self.run_command(config['command'].format(arg))
def _hex_ver_decode(self, hver, num_of_bits, num_of_points):
ver_list = []
c_bit = 0
@ -159,14 +147,16 @@ class Common:
return class_
def get_reg(self, path, reg_addr):
cmd = "echo {1} > {0}; cat {0}".format(path, reg_addr)
status, output = self.run_command(cmd)
return output if status else None
with open(path, 'w') as file:
file.write(reg_addr + '\n')
with open(path, 'r') as file:
output = file.readline().strip()
return output
def set_reg(self, path, reg_addr, value):
cmd = "echo {0} {1} > {2}".format(reg_addr, value, path)
status, output = self.run_command(cmd)
return output if status else None
with open(path, 'w') as file:
file.write("{0} {1}\n".format(reg_addr, value))
return None
def read_txt_file(self, path):
try:
@ -195,7 +185,11 @@ class Common:
return True
def is_host(self):
return os.system(self.HOST_CHK_CMD) == 0
try:
subprocess.call(self.HOST_CHK_CMD, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except FileNotFoundError:
return False
return True
def load_json_file(self, path):
"""
@ -221,87 +215,6 @@ class Common:
"""
return os.path.join(self.DEVICE_PATH, self.platform, self.CONFIG_DIR, config_name) if self.is_host() else os.path.join(self.PMON_PLATFORM_PATH, self.CONFIG_DIR, config_name)
def get_output(self, index, config, default):
"""
Retrieves the output for each function base on config
Args:
index: An integer containing the index of device.
config: A dict object containing the configuration of specified function.
default: A string containing the default output of specified function.
Returns:
A string containing the output of specified function in config
"""
output_source = config.get('output_source')
if output_source == self.OUTPUT_SOURCE_IPMI:
output = self._ipmi_get(index, config)
elif output_source == self.OUTPUT_SOURCE_GIVEN_VALUE:
output = config["value"]
elif output_source == self.OUTPUT_SOURCE_GIVEN_CLASS:
output = self._get_class(config)
elif output_source == self.OUTPUT_SOURCE_GIVEN_LIST:
output = config["value_list"][index]
elif output_source == self.OUTPUT_SOURCE_SYSFS:
output = self._sysfs_read(index, config)
elif output_source == self.OUTPUT_SOURCE_FUNC:
func_conf = self._main_conf[config['function'][index]]
output = self.get_output(index, func_conf, default)
elif output_source == self.OUTPUT_SOURCE_GIVEN_TXT_FILE:
path = config.get('path')
output = self.read_txt_file(path)
elif output_source == self.OUTPUT_SOURCE_GIVEN_VER_HEX_FILE:
path = config.get('path')
hex_ver = self.read_txt_file(path)
output = self._hex_ver_decode(
hex_ver, config['num_of_bits'], config['num_of_points'])
elif output_source == self.OUTPUT_SOURCE_GIVEN_VER_HEX_ADDR:
path = config.get('path')
addr = config.get('reg_addr')
hex_ver = self.get_reg(path, addr)
output = self._hex_ver_decode(
hex_ver, config['num_of_bits'], config['num_of_points'])
else:
output = default
return self._clean_output(index, output, config) or default
def set_output(self, index, input, config):
"""
Sets the output of specified function on config
Args:
config: A dict object containing the configuration of specified function.
index: An integer containing the index of device.
input: A string containing the input of specified function.
Returns:
bool: True if set function is successfully, False if not
"""
cleaned_input = self._clean_input(input, config)
if not cleaned_input:
return False
set_method = config.get('set_method')
if set_method == self.SET_METHOD_IPMI:
output = self._ipmi_set(index, config, cleaned_input)[0]
elif set_method == self.OUTPUT_SOURCE_SYSFS:
output = self._sysfs_write(index, config, cleaned_input)[0]
else:
output = False
return output
def get_event(self, timeout, config, sfp_list):
"""
Returns a nested dictionary containing all devices which have

View File

@ -10,7 +10,6 @@
try:
import os.path
import shutil
import shlex
import subprocess
from sonic_platform_base.component_base import ComponentBase
except ImportError as e:
@ -39,8 +38,7 @@ class Component(ComponentBase):
def __run_command(self, command):
# Run bash command and print output to stdout
try:
process = subprocess.Popen(
shlex.split(command), universal_newlines=True, stdout=subprocess.PIPE)
process = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE)
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
@ -63,12 +61,10 @@ class Component(ComponentBase):
def get_register_value(self, register):
# Retrieves the cpld register value
cmd = "echo {1} > {0}; cat {0}".format(GETREG_PATH, register)
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err is not '':
return None
with open(GETREG_PATH, 'w') as file:
file.write(register + '\n')
with open(GETREG_PATH, 'r') as file:
raw_data = file.readline()
return raw_data.strip()
def __get_cpld_version(self):
@ -76,11 +72,11 @@ class Component(ComponentBase):
cpld_version = dict()
with open(SMC_CPLD_PATH, 'r') as fd:
smc_cpld_version = fd.read()
smc_cpld_version = 'None' if smc_cpld_version is 'None' else "{}.{}".format(
smc_cpld_version = 'None' if smc_cpld_version == 'None' else "{}.{}".format(
int(smc_cpld_version[2], 16), int(smc_cpld_version[3], 16))
mmc_cpld_version = self.get_register_value(MMC_CPLD_ADDR)
mmc_cpld_version = 'None' if mmc_cpld_version is 'None' else "{}.{}".format(
mmc_cpld_version = 'None' if mmc_cpld_version == 'None' else "{}.{}".format(
int(mmc_cpld_version[2], 16), int(mmc_cpld_version[3], 16))
cpld_version["SMC_CPLD"] = smc_cpld_version
@ -159,7 +155,7 @@ class Component(ComponentBase):
ext = ".vme" if ext == "" else ext
new_image_path = os.path.join("/tmp", (root.lower() + ext))
shutil.copy(image_path, new_image_path)
install_command = "ispvm %s" % new_image_path
install_command = ["ispvm", str(new_image_path)]
# elif self.name == "BIOS":
# install_command = "afulnx_64 %s /p /b /n /x /r" % image_path
return self.__run_command(install_command)

View File

@ -73,6 +73,5 @@ class SwitchPolicyAction(ThermalPolicyActionBase):
thermal_overload_position = Common().read_txt_file(
thermal_overload_position_path)
cmd = 'bash /usr/share/sonic/platform/thermal_overload_control.sh {}'.format(
thermal_overload_position)
cmd = ['bash', '/usr/share/sonic/platform/thermal_overload_control.sh', thermal_overload_position]
Common().run_command(cmd)

View File

@ -6,7 +6,7 @@ from .thermal_infos import *
class ThermalManager(ThermalManagerBase):
FSC_ALGORITHM_CMD = ' supervisorctl {} fancontrol'
FSC_ALGORITHM_CMD = ['supervisorctl', '', 'fancontrol']
@classmethod
def start_thermal_control_algorithm(cls):
@ -43,5 +43,5 @@ class ThermalManager(ThermalManagerBase):
Returns:
bool: True if set success, False if fail.
"""
cmd = 'start' if enable else 'stop'
return Common().run_command(cls.FSC_ALGORITHM_CMD.format(cmd))
cls.FSC_ALGORITHM_CMD[1] = 'start' if enable else 'stop'
return Common().run_command(cls.FSC_ALGORITHM_CMD)

View File

@ -6,8 +6,6 @@
#
#############################################################################
import os.path
import subprocess
try:
from sonic_psu.psu_base import PsuBase

View File

@ -26,7 +26,6 @@ HOST_REBOOT_CAUSE_PATH = "/host/reboot-cause/"
REBOOT_CAUSE_FILE = "reboot-cause.txt"
PREV_REBOOT_CAUSE_FILE = "previous-reboot-cause.txt"
GETREG_PATH = "/sys/devices/platform/dx010_cpld/getreg"
HOST_CHK_CMD = "docker > /dev/null 2>&1"
STATUS_LED_PATH = "/sys/devices/platform/leds_dx010/leds/dx010:green:stat/brightness"

View File

@ -8,7 +8,6 @@
import os.path
import shutil
import subprocess
try:
from sonic_platform_base.component_base import ComponentBase
@ -52,12 +51,10 @@ class Component(ComponentBase):
def get_register_value(self, register):
# Retrieves the cpld register value
cmd = "echo {1} > {0}; cat {0}".format(GETREG_PATH, register)
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err is not '':
return None
with open(GETREG_PATH, 'w') as file:
file.write(register + '\n')
with open(GETREG_PATH, 'r') as file:
raw_data = file.readline()
return raw_data.strip()
def __get_cpld_version(self):
@ -146,11 +143,11 @@ class Component(ComponentBase):
ext = ".vme" if ext == "" else ext
new_image_path = os.path.join("/tmp", (root.lower() + ext))
shutil.copy(image_path, new_image_path)
install_command = "ispvm %s" % new_image_path
install_command = ["ispvm", str(new_image_path)]
# elif self.name == "BIOS":
# install_command = "afulnx_64 %s /p /b /n /x /r" % image_path
return self.__run_command(install_command)
return self._api_helper.run_command(install_command)
def update_firmware(self, image_path):

View File

@ -5,7 +5,7 @@ from mmap import *
from sonic_py_common import device_info
HOST_CHK_CMD = "docker > /dev/null 2>&1"
HOST_CHK_CMD = ["docker"]
EMPTY_STRING = ""
@ -15,7 +15,11 @@ class APIHelper():
(self.platform, self.hwsku) = device_info.get_platform_and_hwsku()
def is_host(self):
return os.system(HOST_CHK_CMD) == 0
try:
subprocess.call(HOST_CHK_CMD, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except FileNotFoundError:
return False
return True
def pci_get_value(self, resource, offset):
status = True
@ -35,7 +39,7 @@ class APIHelper():
result = ""
try:
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
@ -43,13 +47,6 @@ class APIHelper():
status = False
return status, result
def run_interactive_command(self, cmd):
try:
os.system(cmd)
except:
return False
return True
def read_txt_file(self, file_path):
try:
with open(file_path, 'r') as fd:
@ -77,57 +74,9 @@ class APIHelper():
return True
def get_cpld_reg_value(self, getreg_path, register):
cmd = "echo {1} > {0}; cat {0}".format(getreg_path, register)
status, result = self.run_command(cmd)
return result if status else None
with open(getreg_path, 'w') as file:
file.write(register + '\n')
with open(getreg_path, 'r') as file:
result = file.readline()
return result
def ipmi_raw(self, netfn, cmd):
status = True
result = ""
try:
cmd = "ipmitool raw {} {}".format(str(netfn), str(cmd))
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
else:
status = False
except:
status = False
return status, result
def ipmi_fru_id(self, id, key=None):
status = True
result = ""
try:
cmd = "ipmitool fru print {}".format(str(
id)) if not key else "ipmitool fru print {0} | grep '{1}' ".format(str(id), str(key))
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
else:
status = False
except:
status = False
return status, result
def ipmi_set_ss_thres(self, id, threshold_key, value):
status = True
result = ""
try:
cmd = "ipmitool sensor thresh '{}' {} {}".format(
str(id), str(threshold_key), str(value))
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
else:
status = False
except:
status = False
return status, result

View File

@ -73,6 +73,5 @@ class SwitchPolicyAction(ThermalPolicyActionBase):
thermal_overload_position = APIHelper().read_one_line_file(
thermal_overload_position_path)
cmd = 'bash /usr/share/sonic/platform/thermal_overload_control.sh {}'.format(
thermal_overload_position)
cmd = ['bash', '/usr/share/sonic/platform/thermal_overload_control.sh', str(thermal_overload_position)]
APIHelper().run_command(cmd)

View File

@ -5,7 +5,7 @@ from .thermal_conditions import *
from .thermal_infos import *
class ThermalManager(ThermalManagerBase):
FSC_ALGORITHM_CMD = 'service fancontrol {}'
FSC_ALGORITHM_CMD = ['service', 'fancontrol', '']
@classmethod
def start_thermal_control_algorithm(cls):
@ -42,5 +42,5 @@ class ThermalManager(ThermalManagerBase):
Returns:
bool: True if set success, False if fail.
"""
cmd = 'start' if enable else 'stop'
return APIHelper().run_command(cls.FSC_ALGORITHM_CMD.format(cmd))
cls.FSC_ALGORITHM_CMD[2] = 'start' if enable else 'stop'
return APIHelper().run_command(cls.FSC_ALGORITHM_CMD)

View File

@ -1,10 +1,9 @@
import os.path
import subprocess
import sys
import re
try:
from sonic_psu.psu_base import PsuBase
from sonic_py_common.general import getstatusoutput_noshell_pipe
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
@ -13,16 +12,16 @@ class PsuUtil(PsuBase):
"""Platform-specific PSUutil class"""
def __init__(self):
self.ipmi_sensor = "ipmitool sensor"
self.ipmi_sensor = ["ipmitool", "sensor"]
PsuBase.__init__(self)
def run_command(self, command):
proc = subprocess.Popen(command, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
(out, err) = proc.communicate()
if proc.returncode != 0:
sys.exit(proc.returncode)
def run_command(self, cmd1, cmd2):
exitcode, out = getstatusoutput_noshell_pipe(cmd1, cmd2)
i = 0
while i < 2:
if exitcode[i] != 0:
sys.exit(exitcode[i])
i += 1
return out
def find_value(self, grep_string):
@ -50,7 +49,8 @@ class PsuUtil(PsuBase):
return False
grep_key = "PSUL_Status" if index == 1 else "PSUR_Status"
grep_string = self.run_command(self.ipmi_sensor + ' | grep ' + grep_key)
grep_cmd = ["grep", grep_key]
grep_string = self.run_command(self.ipmi_sensor, grep_cmd)
status_byte = self.find_value(grep_string)
if status_byte is None:
@ -74,7 +74,8 @@ class PsuUtil(PsuBase):
return False
grep_key = "PSUL_Status" if index == 1 else "PSUR_Status"
grep_string = self.run_command(self.ipmi_sensor + ' | grep ' + grep_key)
grep_cmd = ["grep", grep_key]
grep_string = self.run_command(self.ipmi_sensor, grep_cmd)
status_byte = self.find_value(grep_string)
if status_byte is None:

View File

@ -1,4 +1,3 @@
import os.path
import subprocess
import sys
import re
@ -13,13 +12,13 @@ class PsuUtil(PsuBase):
"""Platform-specific PSUutil class"""
def __init__(self):
self.ipmi_raw = "docker exec -ti pmon ipmitool raw 0x4 0x2d"
self.ipmi_raw = ["docker", "exec", "-ti", "pmon", "ipmitool", "raw", "0x4", "0x2d", ""]
self.psu1_id = "0x2f"
self.psu2_id = "0x39"
PsuBase.__init__(self)
def run_command(self, command):
proc = subprocess.Popen(command, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
proc = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE)
(out, err) = proc.communicate()
if proc.returncode != 0:
@ -52,7 +51,8 @@ class PsuUtil(PsuBase):
return False
psu_id = self.psu1_id if index == 1 else self.psu2_id
res_string = self.run_command(self.ipmi_raw + ' ' + psu_id)
self.ipmi_raw[8] = psu_id
res_string = self.run_command(self.ipmi_raw)
status_byte = self.find_value(res_string)
if status_byte is None:
@ -76,7 +76,8 @@ class PsuUtil(PsuBase):
return False
psu_id = self.psu1_id if index == 1 else self.psu2_id
res_string = self.run_command(self.ipmi_raw + ' ' + psu_id)
self.ipmi_raw[8] = psu_id
res_string = self.run_command(self.ipmi_raw)
status_byte = self.find_value(res_string)
if status_byte is None:

View File

@ -6,7 +6,6 @@
#
#############################################################################
import json
import os.path
try:
@ -24,14 +23,18 @@ COMPONENT_LIST = [
]
SW_CPLD_VER_PATH = "/sys/module/switch_cpld/version"
BASE_CPLD_VER_PATH = "/sys/module/baseboard_lpc/version"
CPLD_UPGRADE_OPT = 4
BIOS_VER_PATH = "/sys/class/dmi/id/bios_version"
BIOS__UPGRADE_OPT = 2
BMC_VER_CMD = "ipmitool mc info | grep 'Firmware Revision'"
BMC_UPGRADE_OPT = 1
CFUFLASH_FW_UPGRADE_CMD = "CFUFLASH -cd -d {} -mse 3 {}"
BMC_VER_CMD1 = ["ipmitool", "mc", "info"]
BMC_VER_CMD2 = ["grep", "Firmware Revision"]
CFUFLASH_FW_UPGRADE_CMD = ["CFUFLASH", "-cd", "-d", "", "-mse", "3", ""]
MEM_PCI_RESOURCE = "/sys/bus/pci/devices/0000:09:00.0/resource0"
FPGA_VER_MEM_OFFSET = 0
UPGRADE_OPT = {
'BMC': '1',
'BIOS': '2',
'SWITCH_CPLD': '4',
'BASE_CPLD': '4'
}
class Component(ComponentBase):
@ -47,7 +50,7 @@ class Component(ComponentBase):
def __get_bmc_ver(self):
bmc_ver = "Unknown"
status, raw_bmc_data = self._api_helper.run_command(BMC_VER_CMD)
status, raw_bmc_data = self._api_helper.run_command(BMC_VER_CMD1, BMC_VER_CMD2)
if status:
bmc_ver_data = raw_bmc_data.split(":")
bmc_ver = bmc_ver_data[-1].strip() if len(
@ -104,16 +107,12 @@ class Component(ComponentBase):
Returns:
A boolean, True if install successfully, False if not
"""
install_command = {
"BMC": CFUFLASH_FW_UPGRADE_CMD.format(BMC_UPGRADE_OPT, image_path),
"BIOS": CFUFLASH_FW_UPGRADE_CMD.format(BIOS__UPGRADE_OPT, image_path),
"SWITCH_CPLD": CFUFLASH_FW_UPGRADE_CMD.format(CPLD_UPGRADE_OPT, image_path),
"BASE_CPLD": CFUFLASH_FW_UPGRADE_CMD.format(CPLD_UPGRADE_OPT, image_path)
}.get(self.name, None)
CFUFLASH_FW_UPGRADE_CMD[3] = UPGRADE_OPT.get(self.name)
CFUFLASH_FW_UPGRADE_CMD[6] = image_path
if not os.path.isfile(image_path) or install_command is None:
if not os.path.isfile(image_path):
return False
# print(install_command)
status = self._api_helper.run_interactive_command(install_command)
status = self._api_helper.run_interactive_command(CFUFLASH_FW_UPGRADE_CMD)
return status

View File

@ -2,8 +2,9 @@ import os
import struct
import subprocess
from mmap import *
from sonic_py_common.general import check_output_pipe
HOST_CHK_CMD = "docker > /dev/null 2>&1"
HOST_CHK_CMD = ["docker"]
EMPTY_STRING = ""
@ -13,7 +14,11 @@ class APIHelper():
pass
def is_host(self):
return os.system(HOST_CHK_CMD) == 0
try:
subprocess.call(HOST_CHK_CMD, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except FileNotFoundError:
return False
return True
def pci_get_value(self, resource, offset):
status = True
@ -28,22 +33,18 @@ class APIHelper():
status = False
return status, result
def run_command(self, cmd):
def run_command(self, cmd1_args, cmd2_args):
status = True
result = ""
try:
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
except:
result = check_output_pipe(cmd1_args, cmd2_args)
except subprocess.CalledProcessError:
status = False
return status, result
def run_interactive_command(self, cmd):
try:
os.system(cmd)
subprocess.call(cmd)
except:
return False
return True
@ -61,9 +62,9 @@ class APIHelper():
status = True
result = ""
try:
cmd = "ipmitool raw {} {}".format(str(netfn), str(cmd))
cmd = ["ipmitool", "raw", str(netfn), str(cmd)]
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
@ -76,28 +77,30 @@ class APIHelper():
def ipmi_fru_id(self, id, key=None):
status = True
result = ""
try:
cmd = "ipmitool fru print {}".format(str(
id)) if not key else "ipmitool fru print {0} | grep '{1}' ".format(str(id), str(key))
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
else:
cmd1_args = ["ipmitool", "fru", "print", str(id)]
if not key:
try:
p = subprocess.Popen(
cmd1_args, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()
else:
status = False
except:
status = False
except:
status = False
else:
cmd2_args = ["grep", str(key)]
status, result = self.run_command(cmd1_args, cmd2_args)
return status, result
def ipmi_set_ss_thres(self, id, threshold_key, value):
status = True
result = ""
try:
cmd = "ipmitool sensor thresh '{}' {} {}".format(str(id), str(threshold_key), str(value))
cmd = ["ipmitool", "sensor", "thresh", str(id), str(threshold_key), str(value)]
p = subprocess.Popen(
cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
raw_data, err = p.communicate()
if err == '':
result = raw_data.strip()

View File

@ -33,20 +33,15 @@ PSU_LED_OFF_CMD = "0x00"
PSU_LED_GREEN_CMD = "0x01"
PSU_LED_AMBER_CMD = "0x02"
PSU1_VOUT_SS_ID = "0x36"
PSU1_COUT_SS_ID = "0x37"
PSU1_POUT_SS_ID = "0x38"
PSU1_STATUS_REG = "0x39"
PSU2_VOUT_SS_ID = "0x40"
PSU2_COUT_SS_ID = "0x41"
PSU2_POUT_SS_ID = "0x42"
PSU2_STATUS_REG = "0x2f"
PSU1_FRU_ID = 3
SS_READ_OFFSET = 0
PSU_VOUT_SS_ID = ["0x36", "0x40"]
PSU_COUT_SS_ID = ["0x37", "0x41"]
PSU_POUT_SS_ID = ["0x38", "0x42"]
PSU_STATUS_REG = ["0x39", "0x2f"]
class Psu(PsuBase):
"""Platform-specific Psu class"""
@ -71,7 +66,7 @@ class Psu(PsuBase):
e.g. 12.1
"""
psu_voltage = 0.0
psu_vout_key = globals()['PSU{}_VOUT_SS_ID'.format(self.index+1)]
psu_vout_key = PSU_VOUT_SS_ID[self.index]
status, raw_ss_read = self._api_helper.ipmi_raw(
IPMI_SENSOR_NETFN, IPMI_SS_READ_CMD.format(psu_vout_key))
ss_read = raw_ss_read.split()[SS_READ_OFFSET]
@ -87,7 +82,7 @@ class Psu(PsuBase):
A float number, the electric current in amperes, e.g 15.4
"""
psu_current = 0.0
psu_cout_key = globals()['PSU{}_COUT_SS_ID'.format(self.index+1)]
psu_cout_key = PSU_COUT_SS_ID[self.index]
status, raw_ss_read = self._api_helper.ipmi_raw(
IPMI_SENSOR_NETFN, IPMI_SS_READ_CMD.format(psu_cout_key))
ss_read = raw_ss_read.split()[SS_READ_OFFSET]
@ -103,7 +98,7 @@ class Psu(PsuBase):
A float number, the power in watts, e.g. 302.6
"""
psu_power = 0.0
psu_pout_key = globals()['PSU{}_POUT_SS_ID'.format(self.index+1)]
psu_pout_key = PSU_POUT_SS_ID[self.index]
status, raw_ss_read = self._api_helper.ipmi_raw(
IPMI_SENSOR_NETFN, IPMI_SS_READ_CMD.format(psu_pout_key))
ss_read = raw_ss_read.split()[SS_READ_OFFSET]
@ -176,7 +171,7 @@ class Psu(PsuBase):
bool: True if PSU is present, False if not
"""
psu_presence = False
psu_pstatus_key = globals()['PSU{}_STATUS_REG'.format(self.index+1)]
psu_pstatus_key = PSU_STATUS_REG[self.index]
status, raw_status_read = self._api_helper.ipmi_raw(
IPMI_SENSOR_NETFN, IPMI_SS_READ_CMD.format(psu_pstatus_key))
status_byte = self.find_value(raw_status_read)
@ -228,7 +223,7 @@ class Psu(PsuBase):
A boolean value, True if device is operating properly, False if not
"""
psu_status = False
psu_pstatus_key = globals()['PSU{}_STATUS_REG'.format(self.index+1)]
psu_pstatus_key = PSU_STATUS_REG[self.index]
status, raw_status_read = self._api_helper.ipmi_raw(
IPMI_SENSOR_NETFN, IPMI_SS_READ_CMD.format(psu_pstatus_key))
status_byte = self.find_value(raw_status_read)

View File

@ -6,7 +6,6 @@
#
#############################################################################
import os
import time
import subprocess
from ctypes import create_string_buffer
@ -168,7 +167,7 @@ class Sfp(SfpBase):
# Path to QSFP sysfs
PLATFORM_ROOT_PATH = "/usr/share/sonic/device"
PMON_HWSKU_PATH = "/usr/share/sonic/hwsku"
HOST_CHK_CMD = "docker > /dev/null 2>&1"
HOST_CHK_CMD = ["docker"]
PLATFORM = "x86_64-cel_silverstone-r0"
HWSKU = "Silverstone"
@ -270,7 +269,11 @@ class Sfp(SfpBase):
return 'N/A'
def __is_host(self):
return os.system(self.HOST_CHK_CMD) == 0
try:
subprocess.call(self.HOST_CHK_CMD, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except FileNotFoundError:
return False
return True
def __get_path_to_port_config_file(self):
platform_path = "/".join([self.PLATFORM_ROOT_PATH, self.PLATFORM])

View File

@ -8,19 +8,18 @@
#
#############################################################################
import os.path
import subprocess
import time
import os
try:
from sonic_platform_base.component_base import ComponentBase
from sonic_py_common.general import getstatusoutput_noshell_pipe
#from helper import APIHelper
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
SWCPLD_VERSION_PATH = "i2cget -y -f 2 0x32 0"
BIOS_VERSION_PATH = "dmidecode -t bios | grep Version"
SWCPLD_VERSION_PATH = ["i2cget", "-y", "-f", "2", "0x32", "0"]
BIOS_VERSION_PATH_CMD1 = ["dmidecode", "-t", "bios"]
BIOS_VERSION_PATH_CMD2 = ["grep", "Version"]
COMPONENT_NAME_LIST = ["SWCPLD", "Main_BIOS", "Backup_BIOS"]
COMPONENT_DES_LIST = ["Use for boot control and BIOS switch",
"Main basic Input/Output System",
@ -39,15 +38,15 @@ class Component(ComponentBase):
self.name = self.get_name()
def run_command(self,cmd):
responses = os.popen(cmd).read()
responses = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True).stdout.read()
return responses
def __get_bios_version(self):
# Retrieves the BIOS firmware version
result = self.run_command("i2cget -y -f 2 0x32 0x19")
result = self.run_command(["i2cget", "-y", "-f", "2", "0x32", "0x19"])
if result.strip() == "0x01":
if self.name == "Main_BIOS":
version = self.run_command(BIOS_VERSION_PATH)
_, version = getstatusoutput_noshell_pipe(BIOS_VERSION_PATH_CMD1, BIOS_VERSION_PATH_CMD2)
bios_version = version.strip().split(" ")[1]
return str(bios_version)
elif self.name == "Backup_BIOS":
@ -56,7 +55,7 @@ class Component(ComponentBase):
elif result.strip() == "0x03":
if self.name == "Backup_BIOS":
version = self.run_command(BIOS_VERSION_PATH)
_, version = getstatusoutput_noshell_pipe(BIOS_VERSION_PATH_CMD1, BIOS_VERSION_PATH_CMD2)
bios_version = version.strip().split(" ")[1]
return str(bios_version)
elif self.name == "Main_BIOS":

View File

@ -1,6 +1,6 @@
try:
from sonic_platform_pddf_base.pddf_fan import PddfFan
import os
import subprocess
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
# ------------------------------------------------------------------
@ -47,8 +47,9 @@ class Fan(PddfFan):
"""
if self.is_psu_fan:
cmd_num = "58" if self.fans_psu_index == 1 else "59"
cmd = "i2cget -y -f 4 0x%s 0x80" % cmd_num
res = os.popen(cmd).read()
cmd = ["i2cget", "-y", "-f", "4", "", "0x80"]
cmd[4] = "0x" + cmd_num
res = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True).stdout.read()
# F2B
if res.strip() == "0x01":
direction = "EXHAUST"

View File

@ -23,13 +23,13 @@
try:
import sys
import getopt
import subprocess
import logging
import logging.config
import time # this is only being used as part of the example
import signal
import math
from sonic_platform import platform
from sonic_py_common.general import getstatusoutput_noshell
except ImportError as e:
raise ImportError('%s - required module not found' % str(e))
@ -224,7 +224,7 @@ def handler(signum, frame):
else:
logging.debug('INFO: FAIL. set_fan_duty_cycle (%d)' % DUTY_MAX)
# Enable the CPLD Heartbeat back
status, output = subprocess.getstatusoutput('i2cset -f -y 75 0x40 0x22 0x00')
status, output = getstatusoutput_noshell(["i2cset", "-f", "-y", "75", "0x40", "0x22", "0x00"])
if status == 0:
logging.debug('INFO: CPLD Heartbeat check is enabled back')
sys.exit(0)
@ -258,7 +258,7 @@ def main(argv):
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
# Disaable the CPLD Heartbeat check to control Fan speed from CPU via ADT7470
subprocess.getstatusoutput('i2cset -f -y 2 0x32 0x30 0x01')
getstatusoutput_noshell(['i2cset', '-f', '-y', '2', '0x32', '0x30', '0x01'])
monitor = cel_belgite_monitor(log_file, log_level)

View File

@ -1,4 +1,5 @@
import os
import ast
import imp
import yaml
import subprocess
@ -24,7 +25,7 @@ class Common:
SET_METHOD_IPMI = 'ipmitool'
NULL_VAL = 'N/A'
HOST_CHK_CMD = "docker > /dev/null 2>&1"
HOST_CHK_CMD = ["docker"]
REF_KEY = '$ref:'
def __init__(self, conf=None):
@ -56,7 +57,7 @@ class Common:
cleaned_input = input_translator.get(input)
elif type(input_translator) is str:
cleaned_input = eval(input_translator.format(input))
cleaned_input = ast.literal_eval(input_translator.format(input))
return cleaned_input
@ -66,9 +67,9 @@ class Common:
if type(output_translator) is dict:
output = output_translator.get(output)
elif type(output_translator) is str:
output = eval(output_translator.format(output))
output = ast.literal_eval(output_translator.format(output))
elif type(output_translator) is list:
output = eval(output_translator[index].format(output))
output = ast.literal_eval(output_translator[index].format(output))
return output
@ -166,7 +167,11 @@ class Common:
return True
def is_host(self):
return os.system(self.HOST_CHK_CMD) == 0
try:
subprocess.call(self.HOST_CHK_CMD, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except FileNotFoundError:
return False
return True
def load_json_file(self, path):
"""

View File

@ -12,7 +12,7 @@ import sys
import logging
import subprocess
IPMI_SDR_CMD = "ipmitool sdr elist"
IPMI_SDR_CMD = ["ipmitool", "sdr", "elist"]
MAX_NUM_FANS = 7
MAX_NUM_PSUS = 2
@ -23,7 +23,7 @@ def ipmi_sensor_dump(cmd):
'''
sensor_dump = ''
try:
sensor_dump = subprocess.check_output(cmd, shell=True)
sensor_dump = subprocess.check_output(cmd)
except subprocess.CalledProcessError as e:
logging.error('Error! Failed to execute: {}'.format(cmd))
sys.exit(1)