Replace os.system and remove subprocess with shell=True (#12177)

Signed-off-by: maipbui <maibui@microsoft.com>
#### Why I did it
`subprocess` is used with `shell=True`, which is very dangerous for shell injection.
`os` - not secure against maliciously constructed input and dangerous if used to evaluate dynamic content
#### How I did it
remove `shell=True`, use `shell=False`
Replace `os` by `subprocess`
This commit is contained in:
Mai Bui 2022-11-04 10:48:51 -04:00 committed by GitHub
parent b522b7762f
commit 61a085e55e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 27 additions and 28 deletions

View File

@ -80,10 +80,10 @@ class LldpManager(daemon_base.DaemonBase):
self.port_init_done = False
def update_hostname(self, hostname):
cmd = "lldpcli configure system hostname {0}".format(hostname)
cmd = ["lldpcli", "configure", "system", "hostname", hostname]
self.log_debug("Running command: '{}'".format(cmd))
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc = subprocess.Popen(cmd,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
@ -93,15 +93,15 @@ class LldpManager(daemon_base.DaemonBase):
def update_mgmt_addr(self, ip):
if ip == "None":
cmd = "lldpcli unconfigure system ip management pattern"
cmd = ["lldpcli", "unconfigure", "system", "ip", "management", "pattern"]
self.log_info("Mgmt IP {0} deleted".format(self.mgmt_ip))
else:
cmd = "lldpcli configure system ip management pattern {0}".format(ip)
cmd = ["lldpcli", "configure", "system", "ip", "management", "pattern", ip]
self.log_info("Mgmt IP changed old ip {0}, new ip {1}".format(self.mgmt_ip, ip))
self.log_debug("Running command: '{}'".format(cmd))
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
if proc.returncode != 0:
@ -150,11 +150,11 @@ class LldpManager(daemon_base.DaemonBase):
# Get the port description. If None or empty string, we'll skip this configuration
port_desc = port_table_dict.get("description")
lldpcli_cmd = "lldpcli configure ports {0} lldp portidsubtype local {1}".format(port_name, port_alias)
lldpcli_cmd = ["lldpcli", "configure", "ports", port_name, "lldp", "portidsubtype", "local", port_alias]
# if there is a description available, also configure that
if port_desc:
lldpcli_cmd += " description '{}'".format(port_desc)
lldpcli_cmd += ["description", port_desc]
else:
self.log_info("Unable to retrieve description for port '{}'. Not adding port description".format(port_name))
@ -330,7 +330,7 @@ class LldpManager(daemon_base.DaemonBase):
self.port_init_done = self.port_config_done = True
if self.port_init_done and self.port_config_done:
self.port_init_done = self.port_config_done = False
rc, stderr = run_cmd(self, "lldpcli resume")
rc, stderr = run_cmd(self, ["lldpcli", "resume"])
if rc != 0:
self.log_error("Failed to resume lldpd with command: 'lldpcli resume': {}".format(stderr))
sys.exit(1)
@ -350,7 +350,7 @@ def main():
def run_cmd(self, cmd):
proc = subprocess.Popen(cmd, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc = subprocess.Popen(cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = proc.communicate()
return proc.returncode, stderr

View File

@ -34,13 +34,13 @@ logger.set_min_log_priority_info()
def add_nat_conntrack_entry_in_kernel(ipproto, srcip, dstip, srcport, dstport, natsrcip, natdstip, natsrcport, natdstport):
# pyroute2 doesn't have support for adding conntrack entries via netlink yet. So, invoking the conntrack utility to add the entries.
state = ''
state = []
if (ipproto == IP_PROTO_TCP):
state = ' --state ESTABLISHED '
ctcmd = 'conntrack -I -n ' + natdstip + ':' + natdstport + ' -g ' + natsrcip + ':' + natsrcport + \
' --protonum ' + ipproto + state + ' --timeout 432000 --src ' + srcip + ' --sport ' + srcport + \
' --dst ' + dstip + ' --dport ' + dstport + ' -u ASSURED'
subprocess.call(ctcmd, shell=True)
state = ['--state', 'ESTABLISHED']
ctcmd = ['conntrack', '-I', '-n', natdstip + ':' + natdstport, '-g', natsrcip + ':' + natsrcport, \
'--protonum', ipproto] + state + ['--timeout', '432000', '--src', srcip, '--sport', srcport, \
'--dst', dstip, '--dport', dstport, '-u', 'ASSURED']
subprocess.call(ctcmd)
logger.log_info("Restored NAT entry: {}".format(ctcmd))

View File

@ -5,7 +5,7 @@ import os
import socket
import tarfile
import time
import subprocess
import yaml
from azure.storage.file import FileService
from sonic_py_common.logger import Logger
@ -42,8 +42,8 @@ logger.set_min_log_priority_info()
def make_new_dir(p):
os.system("rm -rf " + p)
os.system("mkdir -p " + p)
subprocess.call(["rm", "-rf", p])
subprocess.call(["mkdir", "-p", p])
def parse_a_json(data, prefix, val):

View File

@ -41,7 +41,7 @@ def get_command_result(command):
try:
proc_instance = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, universal_newlines=True)
universal_newlines=True)
command_stdout, command_stderr = proc_instance.communicate()
if proc_instance.returncode != 0:
syslog.syslog(syslog.LOG_ERR, "[memory_checker] Failed to execute the command '{}'. Return code: '{}'"
@ -66,7 +66,7 @@ def check_memory_usage(container_name, threshold_value):
Returns:
None.
"""
command = "docker stats --no-stream --format \{{\{{.MemUsage\}}\}} {}".format(container_name)
command = ["docker", "stats", "--no-stream", "--format", "{{.MemUsage}}", container_name]
command_stdout = get_command_result(command)
mem_usage = command_stdout.split("/")[0].strip()
match_obj = re.match(r"\d+\.?\d*", mem_usage)
@ -105,7 +105,7 @@ def is_service_active(service_name):
Returns:
True if service is running, False otherwise
"""
status = subprocess.run("systemctl is-active --quiet {}".format(service_name), shell=True, check=False)
status = subprocess.run(["systemctl", "is-active", "--quiet", service_name])
return status.returncode == 0

View File

@ -39,7 +39,7 @@ def get_command_result(command):
try:
proc_instance = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, universal_newlines=True)
universal_newlines=True)
command_stdout, command_stderr = proc_instance.communicate()
if proc_instance.returncode != 0:
return 1, command_stdout.strip(), command_stderr.strip()
@ -58,7 +58,7 @@ def reset_failed_flag(service_name):
Returns:
None
"""
reset_failed_command = "sudo systemctl reset-failed {}.service".format(service_name)
reset_failed_command = ["sudo", "systemctl", "reset-failed", "{}.service".format(service_name)]
syslog.syslog(syslog.LOG_INFO, "Resetting failed status of service '{}' ..."
.format(service_name))
@ -81,7 +81,7 @@ def restart_service(service_name):
Returns:
None.
"""
restart_command = "sudo systemctl restart {}.service".format(service_name)
restart_command = ["sudo", "systemctl", "restart", "{}.service".format(service_name)]
reset_failed_flag(service_name)

View File

@ -73,18 +73,17 @@ class MarkDhcpPacket(object):
return intf_mark
def run_command(self, cmd):
subprocess.call(cmd, shell=True)
subprocess.call(cmd)
log.log_info("run command: {}".format(cmd))
def clear_dhcp_packet_marks(self):
'''
Flush the INPUT chain in ebtables upon restart
'''
self.run_command("sudo ebtables -F INPUT")
self.run_command(["sudo", "ebtables", "-F", "INPUT"])
def apply_mark_in_ebtables(self, intf, mark):
self.run_command("sudo ebtables -A INPUT -i {} -j mark --mark-set {}"
.format(intf, mark))
self.run_command(["sudo", "ebtables", "-A", "INPUT", "-i", intf, "-j", "mark", "--mark-set", mark])
def update_mark_in_state_db(self, intf, mark):
self.state_db.set(