[dhcp_server] add config dhcp server option (#18013)

* add dhcp server option cli
This commit is contained in:
Xichen96 2024-02-22 08:52:51 +08:00 committed by GitHub
parent 8506826348
commit 2244aa2d7b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 259 additions and 9 deletions

View File

@ -33,11 +33,15 @@
"lease_time": "3600",
"mode": "PORT",
"netmask": "255.255.255.0",
"customized_options": "option60",
"state": "disabled"
},
"DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|option60": {
"id": "60",
"id": "163",
"type": "string",
"value": "dummy_value"
},
"DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|option61": {
"id": "164",
"type": "string",
"value": "dummy_value"
},

View File

@ -606,3 +606,153 @@ class TestConfigDHCPServer(object):
["Vlan100", "Ethernet4", "100.1.1.13,100.1.1.14"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_add(self, mock_db):
expected_value = {
"option_id": "165",
"type": "string",
"value": "dummy_value"
}
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["add"], \
["option62", "165", "string", "dummy_value"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get_all("CONFIG_DB", "DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|option62") == expected_value
def test_config_dhcp_server_ipv4_option_add_existing(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["add"], \
["option60", "163", "string", "dummy_value"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_add_illegal_option_id(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["add"], \
["option62", "10", "string", "dummy_value"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_add_illegal_type(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["add"], \
["option62", "165", "xx", "xx"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_add_illegal_value(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["add"], \
["option62", "165", "uint8", "1000000"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_del(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["del"], \
["option61"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|option61") == False
def test_config_dhcp_server_ipv4_option_del_nonexisting(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["del"], \
["option62"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_del_referenced(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["del"], \
["option60"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_bind(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["bind"], \
["Vlan300", "option60"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan300", "customized_options") == "option60"
def test_config_dhcp_server_ipv4_option_bind_multiple_options(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["bind"], \
["Vlan300", "option60,option61"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan300", "customized_options")
assert result and set(result.split(",")) == set("option60,option61".split(","))
def test_config_dhcp_server_ipv4_option_bind_to_existing(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["bind"], \
["Vlan100", "option61"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan100", "customized_options")
assert result and set(result.split(",")) == set("option60,option61".split(","))
def test_config_dhcp_server_ipv4_option_bind_same_option_to_existing(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["bind"], \
["Vlan100", "option60"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan100", "customized_options") == "option60"
def test_config_dhcp_server_ipv4_option_bind_to_nonexisting_intf(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["bind"], \
["Vlan200", "option60"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_bind_nonexisting_option(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["bind"], \
["Vlan300", "option62"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_unbind(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["unbind"], \
["Vlan100", "option60"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4|Vlan100", "customized_options")
assert result == None or result == ""
def test_config_dhcp_server_ipv4_option_unbind_nonexisting_intf(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["unbind"], \
["Vlan200", "option60"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_option_unbind_nonexisting_option(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["option"].commands["unbind"], \
["Vlan100", "option61"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

View File

@ -185,7 +185,9 @@ class TestShowDHCPServer(object):
+---------------+-------------+-------------+--------+
| Option Name | Option ID | Value | Type |
+===============+=============+=============+========+
| option60 | 60 | dummy_value | string |
| option60 | 163 | dummy_value | string |
+---------------+-------------+-------------+--------+
| option61 | 164 | dummy_value | string |
+---------------+-------------+-------------+--------+
"""
runner = CliRunner()
@ -200,7 +202,7 @@ class TestShowDHCPServer(object):
+---------------+-------------+-------------+--------+
| Option Name | Option ID | Value | Type |
+===============+=============+=============+========+
| option60 | 60 | dummy_value | string |
| option60 | 163 | dummy_value | string |
+---------------+-------------+-------------+--------+
"""
runner = CliRunner()

View File

@ -5,7 +5,7 @@ import ipaddress
import string
SUPPORT_TYPE = ["binary", "boolean", "ipv4-address", "string", "uint8", "uint16", "uint32"]
SUPPORTED_TYPE = ["binary", "boolean", "ipv4-address", "string", "uint8", "uint16", "uint32"]
def validate_str_type(type_, value):
@ -20,7 +20,7 @@ def validate_str_type(type_, value):
"""
if not isinstance(value, str):
return False
if type_ not in SUPPORT_TYPE:
if type_ not in SUPPORTED_TYPE:
return False
if type_ == "string":
return True
@ -46,6 +46,7 @@ def validate_str_type(type_, value):
@click.group(cls=clicommon.AbbreviationGroup, name="dhcp_server")
@clicommon.pass_db
def dhcp_server():
"""config DHCP Server information"""
ctx = click.get_current_context()
@ -341,10 +342,103 @@ def dhcp_server_ipv4_ip_unbind(db, dhcp_interface, member_interface, range_, ip_
ctx.fail("Attempting to unbind range or ip that is not binded")
def register(cli):
# cli.add_command(dhcp_server)
@dhcp_server_ipv4.group(cls=clicommon.AliasedGroup, name="option")
def dhcp_server_ipv4_option():
pass
SUPPORTED_OPTION_ID = ["147", "148", "149", "163", "164", "165", "166", "167", "168", "169", "170", "171", "172", "173", "174", "178", "179", "180", "181", "182", "183", "184", "185", "186", "187", "188", "189", "190", "191", "192", "193", "194", "195", "196", "197", "198", "199", "200", "201", "202", "203", "204", "205", "206", "207", "214", "215", "216", "217", "218", "219", "222", "223"]
@dhcp_server_ipv4_option.command(name="add")
@click.argument("option_name", required=True)
@click.argument("option_id", required=True)
@click.argument("type_", required=True)
@click.argument("value", required=True)
@clicommon.pass_db
def dhcp_server_ipv4_option_add(db, option_name, option_id, type_, value):
ctx = click.get_current_context()
if option_id not in SUPPORTED_OPTION_ID:
ctx.fail("Option id {} is not supported".format(option_id))
if type_ not in SUPPORTED_TYPE:
ctx.fail("Input type is not supported")
if not validate_str_type(type_, value):
ctx.fail("Value {} is not of type {}".format(value, type_))
dbconn = db.db
key = "DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|" + option_name
if dbconn.exists("CONFIG_DB", key):
ctx.fail("Option {} already exist".format(option_name))
dbconn.hmset("CONFIG_DB", key, {
"option_id": option_id,
"type": type_,
"value": value,
})
@dhcp_server_ipv4_option.command(name="del")
@click.argument("option_name", required=True)
@clicommon.pass_db
def dhcp_server_ipv4_option_del(db, option_name):
ctx = click.get_current_context()
dbconn = db.db
option_key = "DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|" + option_name
if not dbconn.exists("CONFIG_DB", option_key):
ctx.fail("Option {} does not exist, cannot delete".format(option_name))
for key in dbconn.keys("CONFIG_DB", "DHCP_SERVER_IPV4|*"):
existing_options = dbconn.get("CONFIG_DB", key, "customized_options")
if existing_options and option_name in existing_options.split(","):
ctx.fail("Option {} is referenced in {}, cannot delete".format(option_name, key[len("DHCP_SERVER_IPV4|"):]))
dbconn.delete("CONFIG_DB", option_key)
@dhcp_server_ipv4_option.command(name="bind")
@click.argument("dhcp_interface", required=True)
@click.argument("option_list", required=True)
@clicommon.pass_db
def dhcp_server_ipv4_option_bind(db, dhcp_interface, option_list):
ctx = click.get_current_context()
dbconn = db.db
key = "DHCP_SERVER_IPV4|" + dhcp_interface
if not dbconn.exists("CONFIG_DB", key):
ctx.fail("Interface {} is not valid dhcp interface".format(dhcp_interface))
option_list = option_list.split(",")
for option_name in option_list:
option_key = "DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|" + option_name
if not dbconn.exists("CONFIG_DB", option_key):
ctx.fail("Option {} does not exist, cannot bind".format(option_name))
existing_value = dbconn.get("CONFIG_DB", key, "customized_options")
value_set = set(existing_value.split(",")) if existing_value else set()
new_value_set = value_set.union(option_list)
dbconn.set("CONFIG_DB", key, "customized_options", ",".join(new_value_set))
@dhcp_server_ipv4_option.command(name="unbind")
@click.argument("dhcp_interface", required=True)
@click.argument("option_list", required=False)
@click.option("--all", "all_", required=False, default=False, is_flag=True)
@clicommon.pass_db
def dhcp_server_ipv4_option_unbind(db, dhcp_interface, option_list, all_):
ctx = click.get_current_context()
dbconn = db.db
key = "DHCP_SERVER_IPV4|" + dhcp_interface
if not dbconn.exists("CONFIG_DB", key):
ctx.fail("Interface {} is not valid dhcp interface".format(dhcp_interface))
if all_:
dbconn.set("CONFIG_DB", key, "customized_options", "")
else:
unbind_value = set(option_list.split(","))
existing_value = dbconn.get("CONFIG_DB", key, "customized_options")
value_set = set(existing_value.split(",")) if existing_value else set()
if value_set.issuperset(unbind_value):
new_value_set = value_set.difference(unbind_value)
dbconn.set("CONFIG_DB", key, "customized_options", ",".join(new_value_set))
else:
ctx.fail("Attempting to unbind option that is not binded")
def register(cli):
cli.add_command(dhcp_server)
if __name__ == '__main__':
dhcp_server()

View File

@ -13,7 +13,7 @@ def ts_to_str(ts):
return datetime.fromtimestamp(int(ts)).strftime("%Y-%m-%d %H:%M:%S")
@click.group(cls=clicommon.AliasedGroup)
@click.group(cls=clicommon.AbbreviationGroup, name="dhcp_server")
@clicommon.pass_db
def dhcp_server(db):
"""Show dhcp_server related info"""