[device/mellanox] Mitigation for security vulnerability (#11877)

Signed-off-by: maipbui <maibui@microsoft.com>
Dependency: [PR (#12065)](https://github.com/sonic-net/sonic-buildimage/pull/12065) needs to merge first.
#### Why I did it
`subprocess.Popen()` and `subprocess.check_output()` is used with `shell=True`, which is very dangerous for shell injection.
#### How I did it
Disable `shell=True`, enable `shell=False`
#### How to verify it
Tested on DUT, compare and verify the output between the original behavior and the new changes' behavior.
[testresults.zip](https://github.com/sonic-net/sonic-buildimage/files/9550867/testresults.zip)
This commit is contained in:
Mai Bui 2022-10-06 14:51:31 -07:00 committed by GitHub
parent 1ad1e19733
commit 648ca075c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 110 additions and 127 deletions

View File

@ -44,7 +44,7 @@ class FanUtil(FanBase):
PWM_MAX = 255
MAX_FAN_PER_DRAWER = 2
GET_HWSKU_CMD = "sonic-cfggen -d -v DEVICE_METADATA.localhost.hwsku"
GET_HWSKU_CMD = ["sonic-cfggen", "-d", "-v", "DEVICE_METADATA.localhost.hwsku"]
sku_without_fan_direction = ['ACS-MSN2010', 'ACS-MSN2100', 'ACS-MSN2410',
'ACS-MSN2700', 'Mellanox-SN2700', 'Mellanox-SN2700-D48C8', 'LS-SN2700', 'ACS-MSN2740']
sku_with_unpluggable_fan = ['ACS-MSN2010', 'ACS-MSN2100']
@ -72,7 +72,7 @@ class FanUtil(FanBase):
self.num_of_fan, self.num_of_drawer = self._extract_num_of_fans_and_fan_drawers()
def _get_sku_name(self):
p = subprocess.Popen(self.GET_HWSKU_CMD, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
p = subprocess.Popen(self.GET_HWSKU_CMD, universal_newlines=True, stdout=subprocess.PIPE)
out, err = p.communicate()
return out.rstrip('\n')

View File

@ -42,7 +42,7 @@ class PsuUtil(PsuBase):
MAX_PSU_FAN = 1
MAX_NUM_PSU = 2
GET_HWSKU_CMD = "sonic-cfggen -d -v DEVICE_METADATA.localhost.hwsku"
GET_HWSKU_CMD = ["sonic-cfggen", "-d", "-v", "DEVICE_METADATA.localhost.hwsku"]
# for spectrum1 switches with plugable PSUs, the output voltage file is psuX_volt
# for spectrum2 switches the output voltage file is psuX_volt_out2
sku_spectrum1_with_plugable_psu = ['ACS-MSN2410', 'ACS-MSN2700',
@ -65,7 +65,7 @@ class PsuUtil(PsuBase):
self.fan_speed = "thermal/psu{}_fan1_speed_get"
def _get_sku_name(self):
p = subprocess.Popen(self.GET_HWSKU_CMD, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
p = subprocess.Popen(self.GET_HWSKU_CMD, universal_newlines=True, stdout=subprocess.PIPE)
out, err = p.communicate()
return out.rstrip('\n')

View File

@ -48,7 +48,7 @@ SYSTEM_NOT_READY = 'system_not_ready'
SYSTEM_READY = 'system_become_ready'
SYSTEM_FAIL = 'system_fail'
GET_PLATFORM_CMD = "sonic-cfggen -d -v DEVICE_METADATA.localhost.platform"
GET_PLATFORM_CMD = ["sonic-cfggen", "-d", "-v", "DEVICE_METADATA.localhost.platform"]
# Ethernet<n> <=> sfp<n+SFP_PORT_NAME_OFFSET>
SFP_PORT_NAME_OFFSET = 0
@ -110,7 +110,7 @@ class SfpUtil(SfpUtilBase):
raise Exception()
def get_port_position_tuple_by_platform_name(self):
p = subprocess.Popen(GET_PLATFORM_CMD, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
p = subprocess.Popen(GET_PLATFORM_CMD, universal_newlines=True, stdout=subprocess.PIPE)
out, err = p.communicate()
position_tuple = port_position_tuple_list[platform_dict[out.rstrip('\n')]]
return position_tuple
@ -136,9 +136,9 @@ class SfpUtil(SfpUtilBase):
port_num += SFP_PORT_NAME_OFFSET
sfpname = SFP_PORT_NAME_CONVENTION.format(port_num)
ethtool_cmd = "ethtool -m {} 2>/dev/null".format(sfpname)
ethtool_cmd = ["ethtool", "-m", sfpname]
try:
proc = subprocess.Popen(ethtool_cmd, stdout=subprocess.PIPE, shell=True, universal_newlines=True, stderr=subprocess.STDOUT)
proc = subprocess.Popen(ethtool_cmd, stdout=subprocess.PIPE, universal_newlines=True, stderr=subprocess.DEVNULL)
stdout = proc.communicate()[0]
proc.wait()
result = stdout.rstrip('\n')
@ -155,10 +155,10 @@ class SfpUtil(SfpUtilBase):
if port_num < self.port_start or port_num > self.port_end:
return False
lpm_cmd = "docker exec syncd python /usr/share/sonic/platform/plugins/sfplpmget.py {}".format(port_num)
lpm_cmd = ["docker", "exec", "syncd", "python", "/usr/share/sonic/platform/plugins/sfplpmget.py", str(port_num)]
try:
output = subprocess.check_output(lpm_cmd, shell=True, universal_newlines=True)
output = subprocess.check_output(lpm_cmd, universal_newlines=True)
if 'LPM ON' in output:
return True
except subprocess.CalledProcessError as e:
@ -178,11 +178,11 @@ class SfpUtil(SfpUtilBase):
# Compose LPM command
lpm = 'on' if lpmode else 'off'
lpm_cmd = "docker exec syncd python /usr/share/sonic/platform/plugins/sfplpmset.py {} {}".format(port_num, lpm)
lpm_cmd = ["docker", "exec", "syncd", "python", "/usr/share/sonic/platform/plugins/sfplpmset.py", str(port_num), lpm]
# Set LPM
try:
subprocess.check_output(lpm_cmd, shell=True, universal_newlines=True)
subprocess.check_output(lpm_cmd, universal_newlines=True)
except subprocess.CalledProcessError as e:
print("Error! Unable to set LPM for {}, rc = {}, err msg: {}".format(port_num, e.returncode, e.output))
return False
@ -194,10 +194,10 @@ class SfpUtil(SfpUtilBase):
if port_num < self.port_start or port_num > self.port_end:
return False
lpm_cmd = "docker exec syncd python /usr/share/sonic/platform/plugins/sfpreset.py {}".format(port_num)
lpm_cmd = ["docker", "exec", "syncd", "python", "/usr/share/sonic/platform/plugins/sfpreset.py", str(port_num)]
try:
subprocess.check_output(lpm_cmd, shell=True, universal_newlines=True)
subprocess.check_output(lpm_cmd, universal_newlines=True)
return True
except subprocess.CalledProcessError as e:
print("Error! Unable to set LPM for {}, rc = {}, err msg: {}".format(port_num, e.returncode, e.output))
@ -267,9 +267,9 @@ class SfpUtil(SfpUtilBase):
sfpname = SFP_PORT_NAME_CONVENTION.format(port_num)
eeprom_raw = []
ethtool_cmd = "ethtool -m {} hex on offset {} length {}".format(sfpname, offset, num_bytes)
ethtool_cmd = ["ethtool", "-m", sfpname, "hex", "on", "offset", str(offset), "length", str(num_bytes)]
try:
output = subprocess.check_output(ethtool_cmd, shell=True, universal_newlines=True)
output = subprocess.check_output(ethtool_cmd, universal_newlines=True)
output_lines = output.splitlines()
first_line_raw = output_lines[0]
if "Offset" in first_line_raw:

View File

@ -375,12 +375,12 @@ class ThermalUtil(ThermalBase):
MAX_PSU_FAN = 1
MAX_NUM_PSU = 2
GET_HWSKU_CMD = "sonic-cfggen -d -v DEVICE_METADATA.localhost.hwsku"
GET_HWSKU_CMD = ["sonic-cfggen", "-d", "-v", "DEVICE_METADATA.localhost.hwsku"]
number_of_thermals = 0
thermal_list = []
def _get_sku_name(self):
p = subprocess.Popen(self.GET_HWSKU_CMD, shell=True, universal_newlines=True, stdout=subprocess.PIPE)
p = subprocess.Popen(self.GET_HWSKU_CMD, universal_newlines=True, stdout=subprocess.PIPE)
out, err = p.communicate()
return out.rstrip('\n')

View File

@ -30,6 +30,7 @@ try:
import tempfile
import subprocess
from sonic_py_common import device_info
from sonic_py_common.general import check_output_pipe
if sys.version_info[0] > 2:
import configparser
else:
@ -52,8 +53,8 @@ except ImportError as e:
class MPFAManager(object):
MPFA_EXTENSION = '.mpfa'
MPFA_EXTRACT_COMMAND = 'tar xzf {} -C {}'
MPFA_CLEANUP_COMMAND = 'rm -rf {}'
MPFA_EXTRACT_COMMAND = ['tar', 'xzf', '', '-C', '']
MPFA_CLEANUP_COMMAND = ['rm', '-rf', '']
def __init__(self, mpfa_path):
self.__mpfa_path = mpfa_path
@ -78,8 +79,9 @@ class MPFAManager(object):
def __extract_contents(self, mpfa_path):
contents_path = tempfile.mkdtemp(prefix='mpfa-')
cmd = self.MPFA_EXTRACT_COMMAND.format(mpfa_path, contents_path)
subprocess.check_call(cmd.split(), universal_newlines=True)
self.MPFA_EXTRACT_COMMAND[2] = mpfa_path
self.MPFA_EXTRACT_COMMAND[4] = contents_path
subprocess.check_call(self.MPFA_EXTRACT_COMMAND, universal_newlines=True)
self.__contents_path = contents_path
@ -105,8 +107,8 @@ class MPFAManager(object):
def cleanup(self):
if os.path.exists(self.__contents_path):
cmd = self.MPFA_CLEANUP_COMMAND.format(self.__contents_path)
subprocess.check_call(cmd.split(), universal_newlines=True)
self.MPFA_CLEANUP_COMMAND[2] = self.__contents_path
subprocess.check_call(self.MPFA_CLEANUP_COMMAND, universal_newlines=True)
self.__contents_path = None
self.__metadata = None
@ -122,11 +124,11 @@ class MPFAManager(object):
class ONIEUpdater(object):
ONIE_FW_UPDATE_CMD_ADD = '/usr/bin/mlnx-onie-fw-update.sh add {}'
ONIE_FW_UPDATE_CMD_REMOVE = '/usr/bin/mlnx-onie-fw-update.sh remove {}'
ONIE_FW_UPDATE_CMD_UPDATE = '/usr/bin/mlnx-onie-fw-update.sh update'
ONIE_FW_UPDATE_CMD_INSTALL = '/usr/bin/mlnx-onie-fw-update.sh update --no-reboot'
ONIE_FW_UPDATE_CMD_SHOW_PENDING = '/usr/bin/mlnx-onie-fw-update.sh show-pending'
ONIE_FW_UPDATE_CMD_ADD = ['/usr/bin/mlnx-onie-fw-update.sh', 'add', '']
ONIE_FW_UPDATE_CMD_REMOVE = ['/usr/bin/mlnx-onie-fw-update.sh', 'remove', '']
ONIE_FW_UPDATE_CMD_UPDATE = ['/usr/bin/mlnx-onie-fw-update.sh', 'update']
ONIE_FW_UPDATE_CMD_INSTALL = ['/usr/bin/mlnx-onie-fw-update.sh', 'update', '--no-reboot']
ONIE_FW_UPDATE_CMD_SHOW_PENDING = ['/usr/bin/mlnx-onie-fw-update.sh', 'show-pending']
ONIE_VERSION_PARSE_PATTERN = '([0-9]{4})\.([0-9]{2})-([0-9]+)\.([0-9]+)\.([0-9]+)-([0-9]+)'
ONIE_VERSION_BASE_PARSE_PATTERN = '([0-9]+)\.([0-9]+)\.([0-9]+)'
@ -135,7 +137,7 @@ class ONIEUpdater(object):
ONIE_VERSION_ATTR = 'onie_version'
ONIE_NO_PENDING_UPDATES_ATTR = 'No pending firmware updates present'
ONIE_IMAGE_INFO_COMMAND = '/bin/bash {} -q -i'
ONIE_IMAGE_INFO_COMMAND = ['/bin/bash', '', '-q', '-i']
# Upgrading fireware from ONIE is not supported from the beginning on some platforms, like SN2700.
# There is a logic to check the ONIE version in order to know whether it is supported.
@ -167,14 +169,14 @@ class ONIEUpdater(object):
self.__umount_onie_fs()
cmd = "fdisk -l | grep 'ONIE boot' | awk '{print $1}'"
fs_path = subprocess.check_output(cmd,
stderr=subprocess.STDOUT,
shell=True,
universal_newlines=True).rstrip('\n')
cmd1 = ['fdisk', '-l']
cmd2 = ['grep', 'ONIE boot']
cmd3 = ['awk', '{print $1}']
fs_path = check_output_pipe(cmd1, cmd2, cmd3)
os.mkdir(fs_mountpoint)
cmd = "mount -n -r -t ext4 {} {}".format(fs_path, fs_mountpoint)
subprocess.check_call(cmd, shell=True, universal_newlines=True)
cmd = ["mount", "-n", "-r", "-t", "ext4", fs_path, fs_mountpoint]
subprocess.check_call(cmd, universal_newlines=True)
fs_onie_path = os.path.join(fs_mountpoint, 'onie/tools/lib/onie')
os.symlink(fs_onie_path, onie_path)
@ -189,8 +191,8 @@ class ONIEUpdater(object):
os.unlink(onie_path)
if os.path.ismount(fs_mountpoint):
cmd = "umount -rf {}".format(fs_mountpoint)
subprocess.check_call(cmd, shell=True, universal_newlines=True)
cmd = ["umount", "-rf", fs_mountpoint]
subprocess.check_call(cmd, universal_newlines=True)
if os.path.exists(fs_mountpoint):
os.rmdir(fs_mountpoint)
@ -198,20 +200,20 @@ class ONIEUpdater(object):
def __stage_update(self, image_path):
rename_path = self.__add_prefix(image_path)
cmd = self.ONIE_FW_UPDATE_CMD_ADD.format(rename_path)
self.ONIE_FW_UPDATE_CMD_ADD[2] = rename_path
try:
subprocess.check_call(cmd.split(), universal_newlines=True)
subprocess.check_call(self.ONIE_FW_UPDATE_CMD_ADD, universal_newlines=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to stage firmware update: {}".format(str(e)))
def __unstage_update(self, image_path):
rename_path = self.__add_prefix(image_path)
cmd = self.ONIE_FW_UPDATE_CMD_REMOVE.format(os.path.basename(rename_path))
self.ONIE_FW_UPDATE_CMD_REMOVE[2] = os.path.basename(rename_path)
try:
subprocess.check_call(cmd.split(), universal_newlines=True)
subprocess.check_call(self.ONIE_FW_UPDATE_CMD_REMOVE, universal_newlines=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to unstage firmware update: {}".format(str(e)))
@ -222,7 +224,7 @@ class ONIEUpdater(object):
cmd = self.ONIE_FW_UPDATE_CMD_INSTALL
try:
subprocess.check_call(cmd.split(), universal_newlines=True)
subprocess.check_call(cmd, universal_newlines=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to trigger firmware update: {}".format(str(e)))
@ -230,7 +232,7 @@ class ONIEUpdater(object):
cmd = self.ONIE_FW_UPDATE_CMD_SHOW_PENDING
try:
output = subprocess.check_output(cmd.split(),
output = subprocess.check_output(cmd,
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
@ -315,10 +317,10 @@ class ONIEUpdater(object):
try:
self.__mount_onie_fs()
cmd = self.ONIE_IMAGE_INFO_COMMAND.format(image_path)
self.ONIE_IMAGE_INFO_COMMAND[1] = image_path
try:
output = subprocess.check_output(cmd.split(),
output = subprocess.check_output(self.ONIE_IMAGE_INFO_COMMAND,
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
@ -413,25 +415,6 @@ class Component(ComponentBase):
return result
@staticmethod
def _get_command_result(cmdline):
try:
proc = subprocess.Popen(cmdline,
stdout=subprocess.PIPE,
shell=True,
stderr=subprocess.STDOUT,
universal_newlines=True)
stdout = proc.communicate()[0]
rc = proc.wait()
result = stdout.rstrip('\n')
if rc != 0:
raise RuntimeError("Failed to execute command {}, return code {}, message {}".format(cmdline, rc, stdout))
except OSError as e:
raise RuntimeError("Failed to execute command {} due to {}".format(cmdline, repr(e)))
return result
def _check_file_validity(self, image_path):
if not os.path.isfile(image_path):
print("ERROR: File {} doesn't exist or is not a file".format(image_path))
@ -502,10 +485,10 @@ class ComponentSSD(Component):
POWER_CYCLE_REQUIRED_ATTR = 'Power Cycle Required'
UPGRADE_REQUIRED_ATTR = 'Upgrade Required'
SSD_INFO_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -q"
SSD_FIRMWARE_INFO_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -q -i {}"
SSD_FIRMWARE_INSTALL_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh --no-power-cycle -y -u -i {}"
SSD_FIRMWARE_UPDATE_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -y -u -i {}"
SSD_INFO_COMMAND = ["/usr/bin/mlnx-ssd-fw-update.sh", "-q"]
SSD_FIRMWARE_INFO_COMMAND = ["/usr/bin/mlnx-ssd-fw-update.sh", "-q", "-i", ""]
SSD_FIRMWARE_INSTALL_COMMAND = ["/usr/bin/mlnx-ssd-fw-update.sh", "--no-power-cycle", "-y", "-u", "-i", ""]
SSD_FIRMWARE_UPDATE_COMMAND = ["/usr/bin/mlnx-ssd-fw-update.sh", "-y", "-u", "-i", ""]
def __init__(self):
super(ComponentSSD, self).__init__()
@ -519,13 +502,15 @@ class ComponentSSD(Component):
return False
if allow_reboot:
cmd = self.SSD_FIRMWARE_UPDATE_COMMAND.format(image_path)
self.SSD_FIRMWARE_UPDATE_COMMAND[4] = image_path
cmd = self.SSD_FIRMWARE_UPDATE_COMMAND
else:
cmd = self.SSD_FIRMWARE_INSTALL_COMMAND.format(image_path)
self.SSD_FIRMWARE_INSTALL_COMMAND[5] = image_path
cmd = self.SSD_FIRMWARE_INSTALL_COMMAND
try:
print("INFO: Installing {} firmware update".format(self.name))
subprocess.check_call(cmd.split(), universal_newlines=True)
subprocess.check_call(cmd, universal_newlines=True)
except subprocess.CalledProcessError as e:
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
return False
@ -563,10 +548,8 @@ class ComponentSSD(Component):
return FW_AUTO_SCHEDULED
def get_firmware_version(self):
cmd = self.SSD_INFO_COMMAND
try:
output = subprocess.check_output(cmd.split(),
output = subprocess.check_output(self.SSD_INFO_COMMAND,
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
@ -579,10 +562,10 @@ class ComponentSSD(Component):
raise RuntimeError("Failed to parse {} version".format(self.name))
def get_available_firmware_version(self, image_path):
cmd = self.SSD_FIRMWARE_INFO_COMMAND.format(image_path)
self.SSD_FIRMWARE_INFO_COMMAND[3] = image_path
try:
output = subprocess.check_output(cmd.split(),
output = subprocess.check_output(self.SSD_FIRMWARE_INFO_COMMAND,
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
@ -614,10 +597,10 @@ class ComponentSSD(Component):
return available_firmware_version
def get_firmware_update_notification(self, image_path):
cmd = self.SSD_FIRMWARE_INFO_COMMAND.format(image_path)
self.SSD_FIRMWARE_INFO_COMMAND[3] = image_path
try:
output = subprocess.check_output(cmd.split(),
output = subprocess.check_output(self.SSD_FIRMWARE_INFO_COMMAND,
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
@ -660,7 +643,7 @@ class ComponentBIOS(Component):
COMPONENT_DESCRIPTION = 'BIOS - Basic Input/Output System'
COMPONENT_FIRMWARE_EXTENSION = '.rom'
BIOS_VERSION_COMMAND = 'dmidecode --oem-string 1'
BIOS_VERSION_COMMAND = ['dmidecode', '--oem-string', '1']
def __init__(self):
super(ComponentBIOS, self).__init__()
@ -688,10 +671,8 @@ class ComponentBIOS(Component):
return True
def get_firmware_version(self):
cmd = self.BIOS_VERSION_COMMAND
try:
version = subprocess.check_output(cmd.split(),
version = subprocess.check_output(self.BIOS_VERSION_COMMAND,
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
@ -716,7 +697,7 @@ class ComponentBIOSSN2201(Component):
COMPONENT_NAME = 'BIOS'
COMPONENT_DESCRIPTION = 'BIOS - Basic Input/Output System'
BIOS_VERSION_COMMAND = 'dmidecode -t0'
BIOS_VERSION_COMMAND = ['dmidecode', '-t0']
def __init__(self):
super(ComponentBIOSSN2201, self).__init__()
@ -725,10 +706,8 @@ class ComponentBIOSSN2201(Component):
self.description = self.COMPONENT_DESCRIPTION
def get_firmware_version(self):
cmd = self.BIOS_VERSION_COMMAND
try:
output = subprocess.check_output(cmd.split(),
output = subprocess.check_output(self.BIOS_VERSION_COMMAND,
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
@ -764,7 +743,7 @@ class ComponentCPLD(Component):
CPLD_PART_NUMBER_DEFAULT = '0'
CPLD_VERSION_MINOR_DEFAULT = '0'
CPLD_FIRMWARE_UPDATE_COMMAND = 'cpldupdate --dev {} --print-progress {}'
CPLD_FIRMWARE_UPDATE_COMMAND = ['cpldupdate', '--dev', '', '--print-progress', '']
def __init__(self, idx):
super(ComponentCPLD, self).__init__()
@ -796,12 +775,13 @@ class ComponentCPLD(Component):
mst_dev = self.__get_mst_device()
if mst_dev is None:
return False
cmd = self.CPLD_FIRMWARE_UPDATE_COMMAND.format(mst_dev, image_path)
self.CPLD_FIRMWARE_UPDATE_COMMAND[2] = mst_dev
self.CPLD_FIRMWARE_UPDATE_COMMAND[4] = image_path
cmd = self.CPLD_FIRMWARE_UPDATE_COMMAND
try:
print("INFO: Installing {} firmware update: path={}".format(self.name, image_path))
subprocess.check_call(cmd.split(), universal_newlines=True)
subprocess.check_call(cmd, universal_newlines=True)
except subprocess.CalledProcessError as e:
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
return False
@ -910,14 +890,14 @@ class ComponentCPLD(Component):
class ComponentCPLDSN2201(ComponentCPLD):
CPLD_FIRMWARE_UPDATE_COMMAND = 'cpldupdate --gpio {} --uncustomized --print-progress'
CPLD_FIRMWARE_UPDATE_COMMAND = ['cpldupdate', '--gpio', '', '--uncustomized', '--print-progress']
def _install_firmware(self, image_path):
cmd = self.CPLD_FIRMWARE_UPDATE_COMMAND.format(image_path)
self.CPLD_FIRMWARE_UPDATE_COMMAND[2] = image_path
try:
print("INFO: Installing {} firmware update: path={}".format(self.name, image_path))
subprocess.check_call(cmd.split(), universal_newlines=True)
subprocess.check_call(self.CPLD_FIRMWARE_UPDATE_COMMAND, universal_newlines=True)
except subprocess.CalledProcessError as e:
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
return False

View File

@ -218,8 +218,8 @@ class PsuFan(MlnxFan):
addr = utils.read_str_from_file(self.psu_i2c_addr_path, raise_exception=True)
command = utils.read_str_from_file(self.psu_i2c_command_path, raise_exception=True)
speed = self.PSU_FAN_SPEED[int(speed // 10)]
command = "i2cset -f -y {0} {1} {2} {3} wp".format(bus, addr, command, speed)
subprocess.check_call(command, shell = True, universal_newlines=True)
command = ["i2cset", "-f", "-y", bus, addr, command, speed, "wp"]
subprocess.check_call(command, universal_newlines=True)
return True
except subprocess.CalledProcessError as ce:
logger.log_error('Failed to call command {}, return code={}, command output={}'.format(ce.cmd, ce.returncode, ce.output))

View File

@ -551,7 +551,7 @@ class InvalidPsuVolWA:
return threshold_value
# Run a sensors -s command to triger hardware to get the real threashold value
utils.run_command('sensors -s')
utils.run_command(['sensors', '-s'])
# Wait for the threshold value change
return cls.wait_set_done(threshold_file)

View File

@ -25,8 +25,8 @@
try:
import subprocess
import os
from sonic_platform_base.sonic_eeprom import eeprom_dts
from sonic_py_common.logger import Logger
from sonic_py_common.general import check_output_pipe
from . import utils
from .device_data import DeviceDataManager
from sonic_platform_base.sonic_xcvr.sfp_optoe_base import SfpOptoeBase
@ -186,10 +186,11 @@ class MlxregManager:
return False
try:
cmd = "mlxreg -d /dev/mst/{} --reg_name MCIA --indexes \
slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0 \
--set {} -y".format(self.mst_pci_device, self.slot_id, self.sdk_index, device_address, page, num_bytes, dword)
subprocess.check_call(cmd, shell=True, universal_newlines=True, stdout=subprocess.DEVNULL)
cmd = ["mlxreg", "-d", "", "--reg_name", "MCIA", "--indexes", "", "--set", "", "-y"]
cmd[2] = "/dev/mst/" + self.mst_pci_device
cmd[6] = "slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0".format(self.slot_id, self.sdk_index, device_address, page, num_bytes)
cmd[8] = dword
subprocess.check_call(cmd, universal_newlines=True, stdout=subprocess.DEVNULL)
except subprocess.CalledProcessError as e:
logger.log_error("Error! Unable to write data dword={} for {} port, page {} offset {}, rc = {}, err msg: {}".format(dword, self.sdk_index, page, device_address, e.returncode, e.output))
return False
@ -197,10 +198,11 @@ class MlxregManager:
def read_mlxred_eeprom(self, offset, page, num_bytes):
try:
cmd = "mlxreg -d /dev/mst/{} --reg_name MCIA --indexes \
slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0 \
--get".format(self.mst_pci_device, self.slot_id, self.sdk_index, offset, page, num_bytes)
result = subprocess.check_output(cmd, universal_newlines=True, shell=True)
cmd = ["mlxreg", "-d", "", "--reg_name", "MCIA", "--indexes", "", "--get"]
cmd[2] = "/dev/mst/" + self.mst_pci_device
cmd[6] = "slot_index={},module={},device_address={},page_number={},i2c_device_address=0x50,size={},bank_number=0".format(self.slot_id, self.sdk_index, offset, page, num_bytes)
result = subprocess.check_output(cmd, universal_newlines=True)
except subprocess.CalledProcessError as e:
logger.log_error("Error! Unable to read data for {} port, page {} offset {}, rc = {}, err msg: {}".format(self.sdk_index, page, offset, e.returncode, e.output))
return None
@ -315,7 +317,7 @@ class SFP(NvidiaSFPCommon):
def get_mst_pci_device(self):
device_name = None
try:
device_name = subprocess.check_output("ls /dev/mst/ | grep pciconf", universal_newlines=True, shell=True).strip()
device_name = check_output_pipe(["ls", "/dev/mst/"], ["grep", "pciconf"]).strip()
except subprocess.CalledProcessError as e:
logger.log_error("Failed to find mst PCI device rc={} err.msg={}".format(e.returncode, e.output))
return device_name
@ -355,10 +357,12 @@ class SFP(NvidiaSFPCommon):
return None
eeprom_raw = []
ethtool_cmd = "ethtool -m sfp{} hex on offset {} length {}".format(self.index, offset, num_bytes)
ethtool_cmd = ["ethtool", "-m", "", "hex", "on", "offset", "", "length", ""]
ethtool_cmd[2] = "sfp" + str(self.index)
ethtool_cmd[6] = str(offset)
ethtool_cmd[8] = str(num_bytes)
try:
output = subprocess.check_output(ethtool_cmd,
shell=True,
universal_newlines=True)
output_lines = output.splitlines()
first_line_raw = output_lines[0]
@ -478,9 +482,9 @@ class SFP(NvidiaSFPCommon):
get_lpmode_code = 'from sonic_platform import sfp;\n' \
'with sfp.SdkHandleContext() as sdk_handle:' \
'print(sfp.SFP._get_lpmode(sdk_handle, {}, {}))'.format(self.sdk_index, self.slot_id)
lpm_cmd = "docker exec pmon python3 -c \"{}\"".format(get_lpmode_code)
lpm_cmd = ["docker", "exec", "pmon", "python3", "-c", get_lpmode_code]
try:
output = subprocess.check_output(lpm_cmd, shell=True, universal_newlines=True)
output = subprocess.check_output(lpm_cmd, universal_newlines=True)
return 'True' in output
except subprocess.CalledProcessError as e:
print("Error! Unable to get LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
@ -519,10 +523,10 @@ class SFP(NvidiaSFPCommon):
'with sfp.SdkHandleContext() as sdk_handle:' \
'print(sfp.SFP._reset(sdk_handle, {}, {}))' \
.format(self.sdk_index, self.slot_id)
reset_cmd = "docker exec pmon python3 -c \"{}\"".format(reset_code)
reset_cmd = ["docker", "exec", "pmon", "python3", "-c", reset_code]
try:
output = subprocess.check_output(reset_cmd, shell=True, universal_newlines=True)
output = subprocess.check_output(reset_cmd, universal_newlines=True)
return 'True' in output
except subprocess.CalledProcessError as e:
print("Error! Unable to set LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
@ -677,11 +681,11 @@ class SFP(NvidiaSFPCommon):
'with sfp.SdkHandleContext() as sdk_handle:' \
'print(sfp.SFP._set_lpmode({}, sdk_handle, {}, {}))' \
.format('True' if lpmode else 'False', self.sdk_index, self.slot_id)
lpm_cmd = "docker exec pmon python3 -c \"{}\"".format(set_lpmode_code)
lpm_cmd = ["docker", "exec", "pmon", "python3", "-c", set_lpmode_code]
# Set LPM
try:
output = subprocess.check_output(lpm_cmd, shell=True, universal_newlines=True)
output = subprocess.check_output(lpm_cmd, universal_newlines=True)
return 'True' in output
except subprocess.CalledProcessError as e:
print("Error! Unable to set LPM for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))
@ -824,9 +828,9 @@ class RJ45Port(NvidiaSFPCommon):
get_presence_code = 'from sonic_platform import sfp;\n' \
'with sfp.SdkHandleContext() as sdk_handle:' \
'print(sfp.RJ45Port._get_presence(sdk_handle, {}))'.format(self.sdk_index)
presence_cmd = "docker exec pmon python3 -c \"{}\"".format(get_presence_code)
presence_cmd = ["docker", "exec", "pmon", "python3", "-c", get_presence_code]
try:
output = subprocess.check_output(presence_cmd, shell=True, universal_newlines=True)
output = subprocess.check_output(presence_cmd, universal_newlines=True)
return 'True' in output
except subprocess.CalledProcessError as e:
print("Error! Unable to get presence for {}, rc = {}, err msg: {}".format(self.sdk_index, e.returncode, e.output))

View File

@ -187,9 +187,8 @@ def is_host():
return True for host and False for docker
"""
try:
proc = subprocess.Popen("docker --version 2>/dev/null",
proc = subprocess.Popen(["docker", "--version"],
stdout=subprocess.PIPE,
shell=True,
stderr=subprocess.STDOUT,
universal_newlines=True)
stdout = proc.communicate()[0]
@ -221,7 +220,7 @@ def run_command(command):
:return: Output of the shell command.
"""
try:
process = subprocess.Popen(command, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process.communicate()[0].strip()
except Exception:
return None

View File

@ -17,6 +17,7 @@
import os
import sys
import subprocess
from mock import MagicMock
if sys.version_info.major == 3:
@ -172,7 +173,6 @@ class TestChassis:
@mock.patch('sonic_platform.device_data.DeviceDataManager.get_sfp_count', MagicMock(return_value=3))
def test_change_event(self):
from sonic_platform.sfp_event import sfp_event
from sonic_platform.sfp import SFP
return_port_dict = {1: '1'}
def mock_check_sfp_status(self, port_dict, error_dict, timeout):
@ -276,12 +276,12 @@ class TestChassis:
#Override the dmi file
sonic_platform.chassis.DMI_FILE = "/tmp/dmi_file"
new_dmi_file = sonic_platform.chassis.DMI_FILE
os.system("touch " + new_dmi_file)
os.system("chmod -r " + new_dmi_file)
subprocess.call(["touch", new_dmi_file])
subprocess.call(["chmod", "-r", new_dmi_file])
chassis = Chassis()
rev = chassis.get_revision()
sonic_platform.chassis.DMI_FILE = old_dmi_file
os.system("rm -f " + new_dmi_file)
subprocess.call(["rm", "-f", new_dmi_file])
assert rev == "N/A"
def test_get_port_or_cage_type(self):

View File

@ -143,7 +143,7 @@ class TestFan:
assert subprocess.check_call.call_count == 0
fan.get_presence = MagicMock(return_value=True)
assert fan.set_speed(60)
subprocess.check_call.assert_called_with("i2cset -f -y {0} {1} {2} {3} wp".format('bus', 'addr', 'command', hex(60)), shell=True, universal_newlines=True)
subprocess.check_call.assert_called_with(["i2cset", "-f", "-y", "bus", "addr", "command", hex(60), "wp"], universal_newlines=True)
subprocess.check_call = MagicMock(side_effect=subprocess.CalledProcessError('', ''))
assert not fan.set_speed(60)
subprocess.check_call = MagicMock()

View File

@ -160,4 +160,4 @@ class TestPsu:
# Normal
vpd_info[InvalidPsuVolWA.CAPACITY_FIELD] = InvalidPsuVolWA.EXPECT_CAPACITY
assert InvalidPsuVolWA.run(psu, InvalidPsuVolWA.INVALID_VOLTAGE_VALUE, '') == 9999
mock_run_command.assert_called_with('sensors -s')
mock_run_command.assert_called_with(['sensors', '-s'])

View File

@ -118,7 +118,7 @@ class TestUtils:
assert mock_log.call_count == 1
def test_run_command(self):
output = utils.run_command('ls')
output = utils.run_command(['ls'])
assert output
@mock.patch('sonic_py_common.device_info.get_path_to_hwsku_dir', mock.MagicMock(return_value='/tmp'))