[Dhcp_server] add config dhcp_server bind/unbind (#17811)

* add dhcp_server bind/unbind
This commit is contained in:
Xichen96 2024-01-25 11:38:29 +08:00 committed by GitHub
parent 24f8f8b966
commit caefe1d17b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 348 additions and 11 deletions

View File

@ -8,10 +8,18 @@
},
"VLAN_INTERFACE|Vlan200": {
},
"VLAN_INTERFACE|Vlan200|100.1.1.2/24": {
"VLAN_INTERFACE|Vlan200|100.1.2.2/24": {
},
"VLAN_INTERFACE|Vlan300": {
},
"VLAN_MEMBER|Vlan100|Ethernet2": {
},
"VLAN_MEMBER|Vlan100|Ethernet4": {
},
"VLAN_MEMBER|Vlan100|Ethernet7": {
},
"VLAN_MEMBER|Vlan100|Ethernet8": {
},
"DHCP_SERVER_IPV4|Vlan100": {
"gateway": "100.1.1.1",
"lease_time": "3600",
@ -42,17 +50,23 @@
"DHCP_SERVER_IPV4_RANGE|range3": {
"range": "100.1.1.10"
},
"DHCP_SERVER_IPV4_RANGE|range5": {
"range": "100.1.2.10"
},
"DHCP_SERVER_IPV4_RANGE|range6": {
"range": "100.1.2.11"
},
"DHCP_SERVER_IPV4_IP|eth0": {
"ip": "240.127.1.2"
},
"DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet4": {
"ips": "100.1.1.10,10.1.1.11"
"ips": "100.1.1.10,100.1.1.11"
},
"DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7": {
"ranges": "range1,range3"
},
"DHCP_SERVER_IPV4_PORT|Vlan200|Ethernet8": {
"ranges": "range1,range4"
"ranges": "range5,range6"
},
"DHCP_SERVER_IPV4_PORT|Ethernet9": {
"ranges": "range5,range6"

View File

@ -51,7 +51,7 @@ class TestConfigDHCPServer(object):
def test_config_dhcp_server_ipv4_add_dup_gw_nm(self, mock_db):
expected_value = {
"gateway": "100.1.1.2",
"gateway": "100.1.2.2",
"lease_time": "1000",
"mode": "PORT",
"netmask": "255.255.255.0",
@ -370,3 +370,239 @@ class TestConfigDHCPServer(object):
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|range1") == False
def test_config_dhcp_server_ipv4_bind_range_nonexisting(self, mock_db):
expected_value = "range2,range3"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet2", "--range", "range2,range3"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet2", "ranges")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_bind_ip_nonexisting(self, mock_db):
expected_value = "100.1.1.1,100.1.1.2"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet2", "100.1.1.1,100.1.1.2"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet2", "ips")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_bind_range_existing_no_duplicate(self, mock_db):
expected_value = "range1,range2,range3"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet7", "--range", "range2"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7", "ranges")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_bind_range_existing_duplicate(self, mock_db):
expected_value = "range1,range2,range3"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet7", "--range", "range2,range3"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7", "ranges")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_bind_ip_existing_no_duplicate(self, mock_db):
expected_value = "100.1.1.10,100.1.1.11,100.1.1.12,100.1.1.13"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet4", "100.1.1.12,100.1.1.13"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet4", "ips")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_bind_ip_existing_duplicate(self, mock_db):
expected_value = "100.1.1.10,100.1.1.11,100.1.1.12"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet4", "100.1.1.11,100.1.1.12"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet4", "ips")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_bind_nonexisting_range(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet7", "--range", "range4"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_bind_range_out_of_subnet(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet7", "--range", "range5"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_bind_ip_out_of_subnet(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet4", "100.1.2.10"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_bind_interface_not_in_vlan(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet5", "100.1.1.10"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_bind_range_and_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet2", "100.1.1.13,100.1.1.14", "--range", "range3"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_bind_range_to_existing_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet4", "--range", "range3"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_bind_ip_to_existing_range(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet7", "100.1.1.13,100.1.1.14"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_bind_nothing(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["bind"], \
["Vlan100", "Ethernet2"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_unbind_range_with_remain(self, mock_db):
expected_value = "range1"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet7", "--range", "range3"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7", "ranges")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_unbind_range_with_no_remain(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet7", "--range", "range1,range3"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7") == False
def test_config_dhcp_server_ipv4_unbind_range_with_all(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet7", "all"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7") == False
def test_config_dhcp_server_ipv4_unbind_ip_with_remain(self, mock_db):
expected_value = "100.1.1.10"
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet4", "100.1.1.11"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
result = mock_db.get("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet4", "ips")
assert result and set(result.split(",")) == set(expected_value.split(","))
def test_config_dhcp_server_ipv4_unbind_ip_with_no_remain(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet4", "100.1.1.10,100.1.1.11"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet4") == False
def test_config_dhcp_server_ipv4_unbind_ip_all(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet4", "all"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert mock_db.exists("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet4") == False
def test_config_dhcp_server_ipv4_unbind_range_and_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet2", "100.1.1.13,100.1.1.14", "--range", "range3"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_unbind_range_to_existing_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet4", "--range", "range3"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_unbind_ip_to_existing_range(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet7", "100.1.1.13,100.1.1.14"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_unbind_nothing(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet4"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_unbind_unbind_range(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet7", "--range", "range2"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
def test_config_dhcp_server_ipv4_unbind_unbind_ip(self, mock_db):
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(dhcp_server.dhcp_server.commands["ipv4"].commands["unbind"], \
["Vlan100", "Ethernet4", "100.1.1.13,100.1.1.14"], obj=db)
assert result.exit_code == 2, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)

View File

@ -76,6 +76,10 @@ class TestShowDHCPServer(object):
+---------+------------+------------+------------------------+
| range3 | 100.1.1.10 | 100.1.1.10 | 1 |
+---------+------------+------------+------------------------+
| range5 | 100.1.2.10 | 100.1.2.10 | 1 |
+---------+------------+------------+------------------------+
| range6 | 100.1.2.11 | 100.1.2.11 | 1 |
+---------+------------+------------+------------------------+
"""
runner = CliRunner()
db = clicommon.Db()
@ -212,13 +216,13 @@ class TestShowDHCPServer(object):
| Interface | Bind |
+===================+============+
| Vlan100|Ethernet4 | 100.1.1.10 |
| | 10.1.1.11 |
| | 100.1.1.11 |
+-------------------+------------+
| Vlan100|Ethernet7 | range1 |
| | range3 |
+-------------------+------------+
| Vlan200|Ethernet8 | range1 |
| | range4 |
| Vlan200|Ethernet8 | range5 |
| | range6 |
+-------------------+------------+
| Ethernet9 | range5 |
| | range6 |
@ -253,7 +257,7 @@ class TestShowDHCPServer(object):
| Interface | Bind |
+===================+============+
| Vlan100|Ethernet4 | 100.1.1.10 |
| | 10.1.1.11 |
| | 100.1.1.11 |
+-------------------+------------+
| Vlan100|Ethernet7 | range1 |
| | range3 |
@ -271,8 +275,8 @@ class TestShowDHCPServer(object):
+-------------------+--------+
| Interface | Bind |
+===================+========+
| Vlan200|Ethernet8 | range1 |
| | range4 |
| Vlan200|Ethernet8 | range5 |
| | range6 |
+-------------------+--------+
"""
runner = CliRunner()

View File

@ -252,12 +252,95 @@ def dhcp_sever_ipv4_range_del(db, range_name, force):
for port in dbconn.keys("CONFIG_DB", "DHCP_SERVER_IPV4_PORT*"):
ranges = dbconn.get("CONFIG_DB", port, "ranges")
if ranges and range_name in ranges.split(","):
ctx.fail("Range {} is referenced in {}, cannot delete, add --force to bypass".format(range_name, port))
ctx.fail("Range {} is referenced in {}, cannot delete, add --force to bypass or range unbind to unbind range first".format(range_name, port))
dbconn.delete("CONFIG_DB", key)
else:
ctx.fail("Range {} does not exist, cannot delete".format(range_name))
@dhcp_server_ipv4.command(name="bind")
@click.argument("dhcp_interface", required=True)
@click.argument("member_interface", required=True)
@click.option("--range", "range_", required=False)
@click.argument("ip_list", required=False)
@clicommon.pass_db
def dhcp_server_ipv4_ip_bind(db, dhcp_interface, member_interface, range_, ip_list):
ctx = click.get_current_context()
dbconn = db.db
if not dbconn.exists("CONFIG_DB", "VLAN_MEMBER|" + dhcp_interface + "|" + member_interface):
ctx.fail("Cannot confirm member interface {} is really in dhcp interface {}".format(member_interface, dhcp_interface))
vlan_prefix = "VLAN_INTERFACE|" + dhcp_interface + "|"
subnets = [ipaddress.ip_network(key[len(vlan_prefix):], strict=False) for key in dbconn.keys("CONFIG_DB", vlan_prefix + "*")]
if range_:
range_ = set(range_.split(","))
for r in range_:
if not dbconn.exists("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|" + r):
ctx.fail("Cannot bind nonexistent range {} to interface".format(r))
ip_range = dbconn.get("CONFIG_DB", "DHCP_SERVER_IPV4_RANGE|" + r, "range").split(",")
if len(ip_range) == 1:
ip_start = ip_range[0]
ip_end = ip_range[0]
if len(ip_range) == 2:
ip_start = ip_range[0]
ip_end = ip_range[1]
if not any([ipaddress.ip_address(ip_start) in subnet and ipaddress.ip_address(ip_end) in subnet for subnet in subnets]):
ctx.fail("Range {} is not in any subnet of vlan {}".format(r, dhcp_interface))
if ip_list:
ip_list = set(ip_list.split(","))
for ip in ip_list:
if not validate_str_type("ipv4-address", ip):
ctx.fail("Illegal IP address {}".format(ip))
if not any([ipaddress.ip_address(ip) in subnet for subnet in subnets]):
ctx.fail("IP {} is not in any subnet of vlan {}".format(ip, dhcp_interface))
if range_ and ip_list or not range_ and not ip_list:
ctx.fail("Only one of range and ip list need to be provided")
key = "DHCP_SERVER_IPV4_PORT|" + dhcp_interface + "|" + member_interface
key_exist = dbconn.exists("CONFIG_DB", key)
for bind_value_name, bind_value in [["ips", ip_list], ["ranges", range_]]:
if key_exist:
existing_value = dbconn.get("CONFIG_DB", key, bind_value_name)
if (not not existing_value) == (not bind_value):
ctx.fail("IP bind cannot have ip range and ip list configured at the same time")
if bind_value:
value_set = set(existing_value.split(",")) if existing_value else set()
new_value_set = value_set.union(bind_value)
dbconn.set("CONFIG_DB", key, bind_value_name, ",".join(new_value_set))
elif bind_value:
dbconn.hmset("CONFIG_DB", key, {bind_value_name: ",".join(bind_value)})
@dhcp_server_ipv4.command(name="unbind")
@click.argument("dhcp_interface", required=True)
@click.argument("member_interface", required=True)
@click.option("--range", "range_", required=False)
@click.argument("ip_list", required=False)
@clicommon.pass_db
def dhcp_server_ipv4_ip_unbind(db, dhcp_interface, member_interface, range_, ip_list):
ctx = click.get_current_context()
dbconn = db.db
key = "DHCP_SERVER_IPV4_PORT|" + dhcp_interface + "|" + member_interface
if ip_list == "all":
dbconn.delete("CONFIG_DB", key)
return
if range_ and ip_list or not range_ and not ip_list:
ctx.fail("Only one of range and ip list need to be provided")
if not dbconn.exists("CONFIG_DB", key):
ctx.fail("The specified dhcp_interface and member interface is not bind to ip or range")
for unbind_value_name, unbind_value in [["ips", ip_list], ["ranges", range_]]:
if unbind_value:
unbind_value = set(unbind_value.split(","))
existing_value = dbconn.get("CONFIG_DB", key, unbind_value_name)
value_set = set(existing_value.split(",")) if existing_value else set()
if value_set.issuperset(unbind_value):
new_value_set = value_set.difference(unbind_value)
if new_value_set:
dbconn.set("CONFIG_DB", key, unbind_value_name, ",".join(new_value_set))
else:
dbconn.delete("CONFIG_DB", key)
else:
ctx.fail("Attempting to unbind range or ip that is not binded")
def register(cli):
# cli.add_command(dhcp_server)
pass