[sonic-ctrmgrd] Replace os.system and remove subprocess with shell=True (#12534)

Signed-off-by: maipbui <maibui@microsoft.com>
#### Why I did it
`subprocess.Popen()` and `subprocess.run()` is used with `shell=True`, which is very dangerous for shell injection.
`os` - not secure against maliciously constructed input and dangerous if used to evaluate dynamic content
#### How I did it
Replace `os` by `subprocess`, remove `shell=True`
#### How to verify it
Passed UT
Tested in DUT
This commit is contained in:
Mai Bui 2022-10-31 08:12:03 -07:00 committed by GitHub
parent a0055abb5d
commit 0fcd219c3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 36 additions and 33 deletions

View File

@ -4,11 +4,11 @@ import argparse
import datetime import datetime
import inspect import inspect
import json import json
import subprocess
import syslog import syslog
import time import time
from swsscommon import swsscommon from swsscommon import swsscommon
from sonic_py_common.general import getstatusoutput_noshell_pipe
# DB field names # DB field names
SET_OWNER = "set_owner" SET_OWNER = "set_owner"
@ -114,9 +114,12 @@ def get_docker_id():
# Read the container-id # Read the container-id
# Note: This script runs inside the context of container # Note: This script runs inside the context of container
# #
cmd = 'cat /proc/self/cgroup | grep -e ":memory:" | rev | cut -f1 -d\'/\' | rev' cmd0 = ['cat', '/proc/self/cgroup']
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) cmd1 = ['grep', '-e', ":memory:"]
output = proc.communicate()[0].decode("utf-8") cmd2 = ['rev']
cmd3 = ['cut', '-f1', '-d', '/']
cmd4 = ['rev']
_, output = getstatusoutput_noshell_pipe(cmd0, cmd1, cmd2, cmd3, cmd4)
return output.strip()[:12] return output.strip()[:12]

View File

@ -105,8 +105,8 @@ def iptable_proxy_rule_upd(ip_str, port = SQUID_PORT):
while True: while True:
num += 1 num += 1
cmd = "sudo iptables -t nat -n -L OUTPUT {}".format(num) cmd = ["sudo", "iptables", "-t", "nat", "-n", "-L", "OUTPUT", str(num)]
proc = subprocess.run(cmd, shell=True, capture_output=True) proc = subprocess.run(cmd, shell=False, capture_output=True)
check_proc(proc) check_proc(proc)
if not proc.stdout: if not proc.stdout:
@ -119,16 +119,15 @@ def iptable_proxy_rule_upd(ip_str, port = SQUID_PORT):
found = True found = True
else: else:
# Duplicate or different IP - delete it # Duplicate or different IP - delete it
cmd = "sudo iptables -t nat -D OUTPUT {}".format(num) cmd = ["sudo", "iptables", "-t", "nat", "-D", "OUTPUT", str(num)]
proc = subprocess.run(cmd, shell=True, capture_output=True) proc = subprocess.run(cmd, shell=False, capture_output=True)
check_proc(proc) check_proc(proc)
# Decrement number to accommodate deleted rule # Decrement number to accommodate deleted rule
num -= 1 num -= 1
if destination and not found: if destination and not found:
cmd = "sudo iptables -t nat -A OUTPUT -p tcp -d {} --dport {} -j DNAT --to-destination {}".format( cmd = ["sudo", "iptables", "-t", "nat", "-A", "OUTPUT", "-p", "tcp", "-d", DST_IP, "--dport", DST_PORT, "-j", "DNAT", "--to-destination", destination]
DST_IP, DST_PORT, destination) proc = subprocess.run(cmd, shell=False, capture_output=True)
proc = subprocess.run(cmd, shell=True, capture_output=True)
check_proc(proc) check_proc(proc)

View File

@ -6,7 +6,7 @@ import json
import os import os
import sys import sys
import syslog import syslog
import subprocess
from collections import defaultdict from collections import defaultdict
from ctrmgr.ctrmgr_iptables import iptable_proxy_rule_upd from ctrmgr.ctrmgr_iptables import iptable_proxy_rule_upd
@ -119,7 +119,7 @@ def ts_now():
def is_systemd_active(feat): def is_systemd_active(feat):
if not UNIT_TESTING: if not UNIT_TESTING:
status = os.system('systemctl is-active --quiet {}'.format(feat)) status = subprocess.call(['systemctl', 'is-active', '--quiet', str(feat)])
else: else:
status = UNIT_TESTING_ACTIVE status = UNIT_TESTING_ACTIVE
log_debug("system status for {}: {}".format(feat, str(status))) log_debug("system status for {}: {}".format(feat, str(status)))
@ -129,7 +129,7 @@ def is_systemd_active(feat):
def restart_systemd_service(server, feat, owner): def restart_systemd_service(server, feat, owner):
log_debug("Restart service {} to owner:{}".format(feat, owner)) log_debug("Restart service {} to owner:{}".format(feat, owner))
if not UNIT_TESTING: if not UNIT_TESTING:
status = os.system("systemctl restart {}".format(feat)) status = subprocess.call(["systemctl", "restart", str(feat)])
else: else:
server.mod_db_entry(STATE_DB_NAME, server.mod_db_entry(STATE_DB_NAME,
FEATURE_TABLE, feat, {"restart": "true"}) FEATURE_TABLE, feat, {"restart": "true"})

View File

@ -103,30 +103,31 @@ class proc:
def mock_subproc_run(cmd, shell, capture_output): def mock_subproc_run(cmd, shell, capture_output):
cmd_prefix = "sudo iptables -t nat " list_cmd = ["sudo", "iptables", "-t", "nat", "-n", "-L", "OUTPUT"]
list_cmd = "{}-n -L OUTPUT ".format(cmd_prefix) del_cmd = ["sudo", "iptables", "-t", "nat", "-D", "OUTPUT"]
del_cmd = "{}-D OUTPUT ".format(cmd_prefix) ins_cmd = ["sudo", "iptables", "-t", "nat", "-A", "OUTPUT", "-p", "tcp", "-d"]
ins_cmd = "{}-A OUTPUT -p tcp -d ".format(cmd_prefix)
assert shell assert shell == False
print("cmd={}".format(cmd)) print("cmd={}".format(cmd))
if cmd.startswith(list_cmd): if list_cmd == cmd[:len(list_cmd)]:
num = int(cmd[len(list_cmd):]) if len(cmd) == len(list_cmd) + 1:
out = current_rules[num] if len(current_rules) > num else "" num = int(cmd[len(list_cmd)])
return proc(0, out, "") out = current_rules[num] if len(current_rules) > num else ""
return proc(0, out, "")
if cmd.startswith(del_cmd): if del_cmd == cmd[:len(del_cmd)]:
num = int(cmd[len(del_cmd):]) if len(cmd) == len(del_cmd) + 1:
if num >= len(current_rules): num = int(cmd[len(del_cmd)])
print("delete num={} is greater than len={}".format(num, len(current_rules))) if num >= len(current_rules):
print("current_rules = {}".format(current_rules)) print("delete num={} is greater than len={}".format(num, len(current_rules)))
assert False print("current_rules = {}".format(current_rules))
del current_rules[num] assert False
return proc(0, "", "") del current_rules[num]
return proc(0, "", "")
if cmd.startswith(ins_cmd): if ins_cmd == cmd[:len(ins_cmd)]:
l = cmd.split() l = cmd
assert len(l) == 16 assert len(l) == 16
rule = "DNAT tcp -- 0.0.0.0/0 {} tcp dpt:{} to:{}".format(l[9], l[11], l[-1]) rule = "DNAT tcp -- 0.0.0.0/0 {} tcp dpt:{} to:{}".format(l[9], l[11], l[-1])
current_rules.append(rule) current_rules.append(rule)