[dhcp_server] add config dhcp_server add (#17489)

* dhcp_server add
* add test dup gw nm
This commit is contained in:
Xichen96 2023-12-21 01:09:46 +08:00 committed by GitHub
parent 1e92ba24ec
commit 13a16cf87f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 235 additions and 0 deletions

View File

@ -48,8 +48,24 @@ def mock_db():
if table == "STATE_DB":
return mock_state_db.get(key, {}).get(entry, None)
def hmset(table, key, value):
assert table == "CONFIG_DB" or table == "STATE_DB"
if table == "CONFIG_DB":
mock_config_db[key] = value
if table == "STATE_DB":
mock_state_db[key] = value
def exists(table, key):
assert table == "CONFIG_DB" or table == "STATE_DB"
if table == "CONFIG_DB":
return key in mock_config_db
if table == "STATE_DB":
return key in mock_state_db
db.keys = mock.Mock(side_effect=keys)
db.get_all = mock.Mock(side_effect=get_all)
db.get = mock.Mock(side_effect=get)
db.hmset = mock.Mock(side_effect=hmset)
db.exists = mock.Mock(side_effect=exists)
yield db

View File

@ -2,6 +2,16 @@
"FEATURE|dhcp_server": {
"state": "enabled"
},
"VLAN_INTERFACE|Vlan100": {
},
"VLAN_INTERFACE|Vlan100|100.1.1.1/24": {
},
"VLAN_INTERFACE|Vlan200": {
},
"VLAN_INTERFACE|Vlan200|100.1.1.2/24": {
},
"VLAN_INTERFACE|Vlan300": {
},
"DHCP_SERVER_IPV4|Vlan100": {
"gateway": "100.1.1.1",
"lease_time": "3600",

View File

@ -0,0 +1,115 @@
import sys
from unittest import mock
import pytest
from click.testing import CliRunner
import utilities_common.cli as clicommon
sys.path.append('../cli/config/plugins/')
import dhcp_server
class TestConfigDHCPServer(object):
def test_plugin_registration(self):
cli = mock.MagicMock()
dhcp_server.register(cli)
str_type = [[12, "whatever", False],
["text", "whatever", False],
["string", "whatever", True],
["binary", "12abc", False],
["binary", "123abc45", True],
["boolean", "True", False],
["boolean", "true", True],
["ipv4-address", "10.10.1", False],
["ipv4-address", "10.10.1.0", True],
["uint8", "4500", False],
["uint8", "-45", False],
["uint8", "45", True],
]
@pytest.mark.parametrize("type, value, result", str_type)
def test_validate_str_type(self, type, value, result):
assert dhcp_server.validate_str_type(type, value) == result
def test_config_dhcp_server_ipv4_add(self, mock_db):
expected_value = {
"gateway": "10.10.10.10",
"lease_time": "1000",
"mode": "PORT",
"netmask": "255.255.254.0",
"state": "disabled"
}
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan200", "--mode=PORT", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan200") == expected_value
def test_config_dhcp_server_ipv4_add_dup_gw_nm(self, mock_db):
expected_value = {
"gateway": "100.1.1.2",
"lease_time": "1000",
"mode": "PORT",
"netmask": "255.255.255.0",
"state": "disabled"
}
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan200", "--mode=PORT", "--lease_time=1000", "--dup_gw_nm"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan200") == expected_value
def test_config_dhcp_server_ipv4_add_illegal_mode(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan200", "--mode=WHATEVER", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_add_illegal_lease_time(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan200", "--mode=PORT", "--lease_time=-1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_add_no_vlan(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan400", "--mode=PORT", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_add_no_vlan_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan300", "--mode=PORT", "--lease_time=1000", "--dup_gw_nm"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_add_illegal_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan200", "--mode=PORT", "--lease_time=1000", "--gateway=10000.10.10.10", "--netmask=255.255.254.0"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_add_already_exist(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["add"], \
["Vlan100", "--mode=PORT", "--lease_time=1000", "--gateway=10.10.10.10", "--netmask=255.255.254.0"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

View File

@ -1,13 +1,107 @@
import click
import utilities_common.cli as clicommon
import ipaddress
import string
SUPPORT_TYPE = ["binary", "boolean", "ipv4-address", "string", "uint8", "uint16", "uint32"]
def validate_str_type(type, value):
"""
To validate whether type is consistent with string value
Args:
type: string, value type
value: checked value
Returns:
True, type consistent with value
False, type not consistent with value
"""
if not isinstance(value, str):
return False
if type not in SUPPORT_TYPE:
return False
if type == "string":
return True
if type == "binary":
if len(value) == 0 or len(value) % 2 != 0:
return False
return all(c in set(string.hexdigits) for c in value)
if type == "boolean":
return value in ["true", "false"]
if type == "ipv4-address":
try:
if len(value.split(".")) != 4:
return False
return ipaddress.ip_address(value).version == 4
except ValueError:
return False
if type.startswith("uint"):
if not value.isdigit():
return False
length = int("".join([c for c in type if c.isdigit()]))
return 0 <= int(value) <= int(pow(2, length)) - 1
return False
@click.group(cls=clicommon.AbbreviationGroup, name="dhcp_server")
def dhcp_server():
"""config DHCP Server information"""
ctx = click.get_current_context()
dbconn = db.db
if dbconn.get("CONFIG_DB", "FEATURE|dhcp_server", "state") != "enabled":
ctx.fail("Feature dhcp_server is not enabled")
@dhcp_server.group(cls=clicommon.AliasedGroup, name="ipv4")
def dhcp_server_ipv4():
"""Show ipv4 related dhcp_server info"""
pass
@dhcp_server_ipv4.command(name="add")
@click.argument("dhcp_interface", required=True)
@click.option("--mode", required=True)
@click.option("--lease_time", required=False, default="900")
@click.option("--dup_gw_nm", required=False, default=False, is_flag=True)
@click.option("--gateway", required=False)
@click.option("--netmask", required=False)
@clicommon.pass_db
def dhcp_server_ipv4_add(db, mode, lease_time, dup_gw_nm, gateway, netmask, dhcp_interface):
ctx = click.get_current_context()
if mode != "PORT":
ctx.fail("Only mode PORT is supported")
if not validate_str_type("uint32", lease_time):
ctx.fail("lease_time is required and must be nonnegative integer")
dbconn = db.db
if not dbconn.exists("CONFIG_DB", "VLAN_INTERFACE|" + dhcp_interface):
ctx.fail("dhcp_interface {} does not exist".format(dhcp_interface))
if dup_gw_nm:
dup_success = False
for key in dbconn.keys("CONFIG_DB", "VLAN_INTERFACE|" + dhcp_interface + "|*"):
intf = ipaddress.ip_interface(key.split("|")[2])
if intf.version != 4:
continue
dup_success = True
gateway, netmask = str(intf.ip), str(intf.netmask)
if not dup_success:
ctx.fail("failed to found gateway and netmask for Vlan interface {}".format(dhcp_interface))
elif not validate_str_type("ipv4-address", gateway) or not validate_str_type("ipv4-address", netmask):
ctx.fail("gateway and netmask must be valid ipv4 string")
key = "DHCP_SERVER_IPV4|" + dhcp_interface
if dbconn.exists("CONFIG_DB", key):
ctx.fail("Dhcp_interface %s already exist".format(dhcp_interface))
else:
dbconn.hmset("CONFIG_DB", key, {
"mode": mode,
"lease_time": lease_time,
"gateway": gateway,
"netmask": netmask,
"state": "disabled",
})
def register(cli):
# cli.add_command(dhcp_server)
pass