[system-health] Remove subprocess with shell=True (#12572)

Signed-off-by: maipbui <maibui@microsoft.com>
#### Why I did it
`subprocess` is used with `shell=True`, which is very dangerous for shell injection.
#### How I did it
remove `shell=True`, use `shell=False`
#### How to verify it
Pass UT
Manual test
This commit is contained in:
Mai Bui 2022-11-02 10:16:48 -04:00 committed by GitHub
parent e1440f0044
commit b3a8167968
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 9 deletions

View File

@ -24,13 +24,13 @@ class ServiceChecker(HealthChecker):
CRITICAL_PROCESSES_PATH = 'etc/supervisor/critical_processes'
# Command to get merged directory of a container
GET_CONTAINER_FOLDER_CMD = 'docker inspect {} --format "{{{{.GraphDriver.Data.MergedDir}}}}"'
GET_CONTAINER_FOLDER_CMD = ['docker', 'inspect', '', '--format', "{{.GraphDriver.Data.MergedDir}}"]
# Command to query the status of monit service.
CHECK_MONIT_SERVICE_CMD = 'systemctl is-active monit.service'
CHECK_MONIT_SERVICE_CMD = ['systemctl', 'is-active', 'monit.service']
# Command to get summary of critical system service.
CHECK_CMD = 'monit summary -B'
CHECK_CMD = ['monit', 'summary', '-B']
MIN_CHECK_CMD_LINES = 3
# Expect status for different system service category.
@ -168,7 +168,8 @@ class ServiceChecker(HealthChecker):
self.need_save_cache = True
def _get_container_folder(self, container):
container_folder = utils.run_command(ServiceChecker.GET_CONTAINER_FOLDER_CMD.format(container))
ServiceChecker.GET_CONTAINER_FOLDER_CMD[2] = str(container)
container_folder = utils.run_command(ServiceChecker.GET_CONTAINER_FOLDER_CMD)
if container_folder is None:
return container_folder
@ -327,7 +328,7 @@ class ServiceChecker(HealthChecker):
# We are using supervisorctl status to check the critical process status. We cannot leverage psutil here because
# it not always possible to get process cmdline in supervisor.conf. E.g, cmdline of orchagent is "/usr/bin/orchagent",
# however, in supervisor.conf it is "/usr/bin/orchagent.sh"
cmd = 'docker exec {} bash -c "supervisorctl status"'.format(container_name)
cmd = ['docker', 'exec', str(container_name), 'bash', '-c', "supervisorctl status"]
process_status = utils.run_command(cmd)
if process_status is None:
for process_name in critical_process_list:

View File

@ -235,7 +235,7 @@ class Sysmonitor(ProcessTaskBase):
#Gets the service properties
def run_systemctl_show(self, service):
command = ('systemctl show {} --property=Id,LoadState,UnitFileState,Type,ActiveState,SubState,Result'.format(service))
command = ['systemctl', 'show', str(service), '--property=Id,LoadState,UnitFileState,Type,ActiveState,SubState,Result']
output = utils.run_command(command)
srv_properties = output.split('\n')
prop_dict = {}

View File

@ -8,7 +8,7 @@ def run_command(command):
:return: Output of the shell command.
"""
try:
process = subprocess.Popen(command, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process.communicate()[0]
except Exception:
return None

View File

@ -504,10 +504,10 @@ def test_manager(mock_hw_info, mock_service_info, mock_udc_info):
manager._set_system_led(chassis, manager.config, 'normal')
def test_utils():
output = utils.run_command('some invalid command')
output = utils.run_command(['some', 'invalid', 'command'])
assert not output
output = utils.run_command('ls')
output = utils.run_command(['ls'])
assert output