[sonic-py-common] Add getstatusoutput_noshell() functions to general module (#12065)

Signed-off-by: maipbui <maibui@microsoft.com>
#### Why I did it
`getstatusoutput()` function from `subprocess` module has shell injection issue because it includes `shell=True` in the implementation
Eliminate duplicate code
#### How I did it
Reimplement `getstatusoutput_noshell()` and `getstatusoutput_noshell_pipe()` functions with `shell=False`
Add `check_output_pipe()` function
#### How to verify it
Pass UT
This commit is contained in:
Mai Bui 2022-09-22 06:40:42 -07:00 committed by GitHub
parent 8af369a7c9
commit 283efeeacc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 99 additions and 0 deletions

View File

@ -1,4 +1,5 @@
import sys
from subprocess import Popen, STDOUT, PIPE, CalledProcessError, check_output
def load_module_from_source(module_name, file_path):
@ -23,3 +24,70 @@ def load_module_from_source(module_name, file_path):
sys.modules[module_name] = module
return module
def getstatusoutput_noshell(cmd):
"""
This function implements getstatusoutput API from subprocess module
but using shell=False to prevent shell injection.
Ref: https://github.com/python/cpython/blob/3.10/Lib/subprocess.py#L602
"""
try:
output = check_output(cmd, universal_newlines=True, stderr=STDOUT)
exitcode = 0
except CalledProcessError as ex:
output = ex.output
exitcode = ex.returncode
if output[-1:] == '\n':
output = output[:-1]
return exitcode, output
def getstatusoutput_noshell_pipe(cmd0, *args):
"""
This function implements getstatusoutput API from subprocess module
but using shell=False to prevent shell injection. Input command
includes two or more commands connected by shell pipe(s).
"""
popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)]
i = 0
while i < len(args):
popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True))
popens[i].stdout.close()
i += 1
output = popens[-1].communicate()[0]
if output[-1:] == '\n':
output = output[:-1]
exitcodes = [0] * len(popens)
while popens:
last = popens.pop(-1)
exitcodes[len(popens)] = last.wait()
return (exitcodes, output)
def check_output_pipe(cmd0, *args):
"""
This function implements check_output API from subprocess module.
Input command includes two or more commands connected by shell pipe(s)
"""
popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)]
i = 0
while i < len(args):
popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True))
popens[i].stdout.close()
i += 1
output = popens[-1].communicate()[0]
i = 0
args_list = [cmd0] + list(args)
while popens:
current = popens.pop(0)
exitcode = current.wait()
if exitcode != 0:
raise CalledProcessError(returncode=exitcode, cmd=args_list[i], output=current.stdout)
i += 1
return output

View File

@ -0,0 +1,31 @@
import sys
import pytest
import subprocess
from sonic_py_common.general import getstatusoutput_noshell, getstatusoutput_noshell_pipe, check_output_pipe
def test_getstatusoutput_noshell(tmp_path):
exitcode, output = getstatusoutput_noshell(['echo', 'sonic'])
assert (exitcode, output) == (0, 'sonic')
exitcode, output = getstatusoutput_noshell([sys.executable, "-c", "import sys; sys.exit(6)"])
assert exitcode != 0
def test_getstatusoutput_noshell_pipe():
exitcode, output = getstatusoutput_noshell_pipe(['echo', 'sonic'], ['awk', '{print $1}'])
assert (exitcode, output) == ([0, 0], 'sonic')
exitcode, output = getstatusoutput_noshell_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(8)"])
assert exitcode == [6, 8]
def test_check_output_pipe():
output = check_output_pipe(['echo', 'sonic'], ['awk', '{print $1}'])
assert output == 'sonic\n'
with pytest.raises(subprocess.CalledProcessError) as e:
check_output_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(0)"])
assert e.returncode == [6, 0]
with pytest.raises(subprocess.CalledProcessError) as e:
check_output_pipe([sys.executable, "-c", "import sys; sys.exit(0)"], [sys.executable, "-c", "import sys; sys.exit(6)"])
assert e.returncode == [0, 6]