648ca075c7
Signed-off-by: maipbui <maibui@microsoft.com> Dependency: [PR (#12065)](https://github.com/sonic-net/sonic-buildimage/pull/12065) needs to merge first. #### Why I did it `subprocess.Popen()` and `subprocess.check_output()` is used with `shell=True`, which is very dangerous for shell injection. #### How I did it Disable `shell=True`, enable `shell=False` #### How to verify it Tested on DUT, compare and verify the output between the original behavior and the new changes' behavior. [testresults.zip](https://github.com/sonic-net/sonic-buildimage/files/9550867/testresults.zip)
128 lines
3.6 KiB
Python
128 lines
3.6 KiB
Python
#
|
|
# Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES.
|
|
# Apache-2.0
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import os
|
|
import pytest
|
|
import sys
|
|
if sys.version_info.major == 3:
|
|
from unittest import mock
|
|
else:
|
|
import mock
|
|
|
|
test_path = os.path.dirname(os.path.abspath(__file__))
|
|
modules_path = os.path.dirname(test_path)
|
|
sys.path.insert(0, modules_path)
|
|
|
|
from sonic_platform import utils
|
|
|
|
|
|
class TestUtils:
|
|
def test_read_file(self):
|
|
ret = utils.read_str_from_file('not exist', 'default return')
|
|
assert ret == 'default return'
|
|
|
|
with pytest.raises(IOError):
|
|
ret = utils.read_str_from_file('not exist', 'default return', raise_exception=True)
|
|
assert ret == 'default return'
|
|
|
|
ret = utils.read_int_from_file('not exist', 100)
|
|
assert ret == 100
|
|
|
|
with pytest.raises(IOError):
|
|
ret = utils.read_int_from_file('not exist', 200, raise_exception=True)
|
|
assert ret == 200
|
|
|
|
ret = utils.read_float_from_file('not exist', 3.14)
|
|
assert ret == 3.14
|
|
|
|
with pytest.raises(IOError):
|
|
ret = utils.read_float_from_file('not exist', 2.25, raise_exception=True)
|
|
assert ret == 2.25
|
|
|
|
def test_write_file(self):
|
|
file_path = '/tmp/test.txt'
|
|
utils.write_file(file_path, ' hello ')
|
|
assert utils.read_str_from_file(file_path) == 'hello'
|
|
|
|
utils.write_file(file_path, '123 ')
|
|
assert utils.read_int_from_file(file_path) == 123
|
|
|
|
utils.write_file(file_path, '3.14 ')
|
|
assert utils.read_float_from_file(file_path) == 3.14
|
|
|
|
with pytest.raises(IOError):
|
|
utils.write_file('/not/exist/file', '123', raise_exception=True)
|
|
|
|
def test_pre_initialize(self):
|
|
mock_call = mock.MagicMock()
|
|
|
|
class A:
|
|
@utils.pre_initialize(mock_call)
|
|
def func(self):
|
|
pass
|
|
|
|
A().func()
|
|
assert mock_call.call_count == 1
|
|
|
|
def test_pre_initialize_one(self):
|
|
mock_call = mock.MagicMock()
|
|
|
|
class A:
|
|
@utils.pre_initialize_one(mock_call)
|
|
def func(self, index):
|
|
pass
|
|
|
|
a = A()
|
|
a.func(34)
|
|
mock_call.assert_called_once_with(a, 34)
|
|
|
|
def test_read_only_cache(self):
|
|
value = 100
|
|
|
|
def func():
|
|
return value
|
|
|
|
assert func() == 100
|
|
value = 1000
|
|
assert func() == 1000
|
|
|
|
@utils.read_only_cache()
|
|
def func():
|
|
return value
|
|
|
|
assert func() == 1000
|
|
value = 10000
|
|
assert func() == 1000
|
|
|
|
@mock.patch('sonic_py_common.logger.Logger.log_debug')
|
|
def test_default_return(self, mock_log):
|
|
@utils.default_return(100, log_func=mock_log)
|
|
def func():
|
|
raise RuntimeError('')
|
|
|
|
assert func() == 100
|
|
assert mock_log.call_count == 1
|
|
|
|
def test_run_command(self):
|
|
output = utils.run_command(['ls'])
|
|
assert output
|
|
|
|
@mock.patch('sonic_py_common.device_info.get_path_to_hwsku_dir', mock.MagicMock(return_value='/tmp'))
|
|
def test_extract_RJ45_ports_index(self):
|
|
rj45_list = utils.extract_RJ45_ports_index()
|
|
assert rj45_list is None
|