[inventec] Replace os.system and remove subprocess with shell=True (#12108)

Signed-off-by: maipbui <maibui@microsoft.com>
Dependency: [https://github.com/sonic-net/sonic-buildimage/pull/12065](https://github.com/sonic-net/sonic-buildimage/pull/12065)
#### Why I did it
1. `getstatusoutput` is used without a static string and it uses `shell=True`
2. `subprocess()` - when using with `shell=True` is dangerous. Using subprocess function without a static string can lead to command injection.
3. `os` - not secure against maliciously constructed input and dangerous if used to evaluate dynamic content.
#### How I did it
1. use `getstatusoutput` without shell=True
2. `subprocess()` - use `shell=False` instead. use an array string. Ref: [https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation](https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation)
3. `os` - use with `subprocess`
This commit is contained in:
Mai Bui 2022-10-14 10:21:44 -04:00 committed by GitHub
parent ea101a90d5
commit 92d25be08f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 32 additions and 23 deletions

View File

@ -17,13 +17,11 @@ options:
try:
import os
import commands
import sys, getopt
import logging
import re
import time
import syslog
from sonic_sfp.bcmshell import bcmshell
from sonic_py_common.general import getstatusoutput_noshell
except ImportError as e:
raise ImportError("%s - required module not found" % str(e))
@ -53,10 +51,10 @@ def initialLoop():
bcm_obj = BCMUtil()
bcm_obj.execute_command("echo")
initialNotOK = False
print bcm_obj
print(bcm_obj)
log_message( syslog.LOG_INFO, "BCMUtil Object initialed successfully" )
except Exception, e:
print "Exception. The warning is {0}".format(str(e))
except Exception as e:
print("Exception. The warning is {0}".format(str(e)))
time.sleep(10)
class BCMUtil(bcmshell):
@ -66,7 +64,7 @@ class BCMUtil(bcmshell):
def get_platform(self):
if self.platform is None:
self.platform = os.popen("uname -n").read().strip()
_, self.platform = getstatusoutput_noshell(["uname", "-n"]).strip()
return self.platform
def get_asic_temperature( self ):
@ -102,14 +100,18 @@ def main():
content = readPtr.read().strip()
if bcm_obj.get_platform() == INV_SEQUOIA_PLATFORM :
if content == "inv_bmc" and SWITCH_TEMP_FILE_NAME in file_list :
os.system("echo {0} > {1}/{2}/device/{3}".format( ( bcm_obj.get_asic_temperature() * 1000 ), HWMON_PATH, index, SWITCH_TEMP_FILE_NAME ))
file = "{0}/{1}/device/{2}".format(HWMON_PATH, index, SWITCH_TEMP_FILE_NAME)
with open(file, 'w') as f:
f.write(str(bcm_obj.get_asic_temperature() * 1000) + '\n')
break
else :
if content == "inv_psoc" and SWITCH_TEMP_FILE_NAME in file_list :
print "echo {0} > {1}/{2}/device/{3}".format( ( bcm_obj.get_asic_temperature() * 1000 ), HWMON_PATH, index, SWITCH_TEMP_FILE_NAME )
os.system("echo {0} > {1}/{2}/device/{3}".format( ( bcm_obj.get_asic_temperature() * 1000 ), HWMON_PATH, index, SWITCH_TEMP_FILE_NAME ))
print("echo {0} > {1}/{2}/device/{3}".format( ( bcm_obj.get_asic_temperature() * 1000 ), HWMON_PATH, index, SWITCH_TEMP_FILE_NAME))
file = "{0}/{1}/device/{2}".format(HWMON_PATH, index, SWITCH_TEMP_FILE_NAME)
with open(file, 'w') as f:
f.write(str(bcm_obj.get_asic_temperature() * 1000) + '\n')
break
except Exception, e:
except Exception as e:
log_message( syslog.LOG_WARNING, "Exception. The warning is {0}".format(str(e)) )
initialLoop()
time.sleep(5)

View File

@ -20,6 +20,7 @@ import time
import syslog
import re
from sonic_sfp.bcmshell import bcmshell
from sonic_py_common.general import getstatusoutput_noshell
# =====================================================================
@ -120,8 +121,8 @@ def _board_init():
global SYNC_S
global SYNC_P
cmd = "uname -n"
platform = os.popen(cmd).read()
cmd = ["uname", "-n"]
_, platform = getstatusoutput_noshell(cmd)
if platform.rstrip() == INV_MAGNOLIA:
BOARD_TPYE = "inventec_d6254qs"

View File

@ -2,6 +2,7 @@
import os
import socket
import subprocess
from collections import OrderedDict
# Purpose: Shutdown DUT upon receiving thermaltrip event from kernel (inv_pthread)
@ -59,6 +60,6 @@ if __name__ == '__main__':
# Receive thermaltrip event
if event['ACTION'] == 'remove' and event['DEVPATH'] == '/kernel/platform_status/fan':
os.system("shutdown -h now")
subprocess.call(["shutdown", "-h", "now"])

View File

@ -28,6 +28,7 @@ try:
import syslog
from sfputil import SfpUtil
from sonic_sfp.bcmshell import bcmshell
from sonic_py_common.general import getstatusoutput_noshell
except ImportError as e:
raise ImportError("%s - required module not found" % str(e))
@ -127,7 +128,7 @@ class BCMUtil(bcmshell):
def get_platform(self):
if self.platform is None:
self.platform = os.popen("uname -n").read().strip()
_, self.platform = getstatusoutput_noshell(["uname", "-n"])
return self.platform
def get_port_to_bcm_mapping(self):

View File

@ -8,6 +8,7 @@
try:
import os
import logging
import subprocess
from ctypes import create_string_buffer
from sonic_platform_base.sfp_base import SfpBase
from sonic_platform_base.sonic_sfp.sff8436 import sff8436Dom
@ -120,7 +121,7 @@ class QSfp(SfpBase):
return True
def __is_host(self):
return os.system("docker > /dev/null 2>&1") == 0
return subprocess.call(["docker"]) == 0
def __get_path_to_port_config_file(self):
host_platform_root_path = '/usr/share/sonic/device'

View File

@ -33,6 +33,7 @@ import sys, getopt
import logging
import syslog
import time
from sonic_py_common.general import getstatusoutput_noshell_pipe
DEBUG = False
args = []
@ -236,8 +237,9 @@ def system_install(boot_option):
return status
for addr_offset in range (0,FAN_NUM):
addr=FAN_VPD_ADDR_BASE+addr_offset
cmd = "i2cdetect -y "+str(FAN_VPD_CHANNEL)+" "+str(addr)+" "+str(addr)+" | grep "+str(hex(addr)).replace('0x','')
result=os.system(cmd)
cmd1 = ["i2cdetect", "-y", str(FAN_VPD_CHANNEL), str(addr), str(addr)]
cmd2 = ["grep", f'{addr:x}']
result, _ = getstatusoutput_noshell_pipe(cmd1, cmd2)
if( result==0 ):
cmd="echo inv_eeprom "+str(addr)+" > /sys/bus/i2c/devices/i2c-"+FAN_VPD_CHANNEL
status, output = exec_cmd(cmd,1)

View File

@ -9,6 +9,7 @@ try:
import os
import sys
import time
import subprocess
from sonic_platform_base.sfp_base import SfpBase
from sonic_platform_base.sonic_sfp.sff8436 import sff8436Dom
from sonic_platform_base.sonic_sfp.sff8436 import sff8436InterfaceId
@ -108,7 +109,7 @@ class QSfp(SfpBase):
return retval
def __is_host(self):
return os.system("docker > /dev/null 2>&1") == 0
return subprocess.call(["docker"]) == 0
def __get_path_to_port_config_file(self):
host_platform_root_path = '/usr/share/sonic/device'

View File

@ -7,7 +7,7 @@
try:
import os
import sys
import subprocess
from sonic_platform_base.sfp_base import SfpBase
from sonic_platform_base.sonic_sfp.sff8472 import sff8472Dom
from sonic_platform_base.sonic_sfp.sff8472 import sff8472InterfaceId
@ -95,7 +95,7 @@ class Sfp(SfpBase):
return retval
def __is_host(self):
return os.system("docker > /dev/null 2>&1") == 0
return subprocess.call(["docker"]) == 0
def __get_path_to_port_config_file(self):
host_platform_root_path = '/usr/share/sonic/device'

View File

@ -153,7 +153,7 @@ class Sfp(SfpBase):
# Path to QSFP sysfs
PLATFORM_ROOT_PATH = "/usr/share/sonic/device"
PMON_HWSKU_PATH = "/usr/share/sonic/hwsku"
HOST_CHK_CMD = "docker > /dev/null 2>&1"
HOST_CHK_CMD = ["docker"]
PLATFORM = "x86_64-inventec_d7054q28b-r0"
HWSKU = "INVENTEC-D7054Q28B-S48-Q6"
@ -285,7 +285,7 @@ class Sfp(SfpBase):
return ""
def __is_host(self):
return os.system(self.HOST_CHK_CMD) == 0
return subprocess.call(self.HOST_CHK_CMD) == 0
def __get_path_to_port_config_file(self):
platform_path = "/".join([self.PLATFORM_ROOT_PATH, self.PLATFORM])