[dhcp_server] add show dhcp server port (#17491)

* add show dhcp_server port
This commit is contained in:
Xichen96 2024-01-10 13:00:01 +08:00 committed by GitHub
parent 5987dcf59e
commit 6ceec9a78b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 0 deletions

View File

@ -44,5 +44,17 @@
},
"DHCP_SERVER_IPV4_IP|eth0": {
"ip": "240.127.1.2"
},
"DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet4": {
"ips": "100.1.1.10,10.1.1.11"
},
"DHCP_SERVER_IPV4_PORT|Vlan100|Ethernet7": {
"ranges": "range1,range2"
},
"DHCP_SERVER_IPV4_PORT|Vlan200|Ethernet8": {
"ranges": "range3,range4"
},
"DHCP_SERVER_IPV4_PORT|Ethernet9": {
"ranges": "range5,range6"
}
}

View File

@ -175,3 +175,81 @@ option60 60 dummy_value string
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["option"], ["option60"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert result.stdout == expected_stdout
def test_show_dhcp_server_ipv4_port_without_intf(self, mock_db):
expected_stdout = """\
Interface Bind
----------------- ----------
Vlan100|Ethernet4 100.1.1.10
10.1.1.11
Vlan100|Ethernet7 range1
range2
Vlan200|Ethernet8 range3
range4
Ethernet9 range5
range6
"""
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["port"], [], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert result.stdout == expected_stdout
def test_show_dhcp_server_ipv4_port_with_port(self, mock_db):
expected_stdout = """\
Interface Bind
----------------- ------
Vlan100|Ethernet7 range1
range2
"""
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["port"], ["Ethernet7"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert result.stdout == expected_stdout
def test_show_dhcp_server_ipv4_port_with_vlan(self, mock_db):
expected_stdout = """\
Interface Bind
----------------- ----------
Vlan100|Ethernet4 100.1.1.10
10.1.1.11
Vlan100|Ethernet7 range1
range2
"""
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["port"], ["Vlan100"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert result.stdout == expected_stdout
def test_show_dhcp_server_ipv4_port_with_port_and_vlan(self, mock_db):
expected_stdout = """\
Interface Bind
----------------- ------
Vlan200|Ethernet8 range3
range4
"""
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["port"], ["Vlan200|Ethernet8"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert result.stdout == expected_stdout
def test_show_dhcp_server_ipv4_port_with_single_port(self, mock_db):
expected_stdout = """\
Interface Bind
----------- ------
Ethernet9 range5
range6
"""
runner = CliRunner()
db = clicommon.Db()
db.db = mock_db
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["port"], ["Ethernet9"], obj=db)
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
assert result.stdout == expected_stdout

View File

@ -5,6 +5,8 @@ import utilities_common.cli as clicommon
import ipaddress
from datetime import datetime
import fnmatch
import re
def ts_to_str(ts):
@ -79,6 +81,17 @@ def range(db, range_name):
click.echo(tabulate(table, headers=headers))
def dhcp_interface_is_match(input_, key):
regex = fnmatch.translate(input_)
regex = re.compile(regex)
if regex.match(key):
return True
for item in key.split("|"):
if regex.match(item):
return True
return False
@ipv4.command()
@click.argument('dhcp_interface', required=False)
@click.option('--with_customized_options', default=False, is_flag=True)
@ -116,5 +129,25 @@ def option(db, option_name):
click.echo(tabulate(table, headers=headers))
@ipv4.command()
@click.argument('interface', required=False)
@clicommon.pass_db
def port(db, interface):
if not interface:
interface = "*"
headers = ["Interface", "Bind"]
table = []
dbconn = db.db
for key in dbconn.keys("CONFIG_DB", "DHCP_SERVER_IPV4_PORT|*"):
intf = key[len("DHCP_SERVER_IPV4_PORT|"):]
if dhcp_interface_is_match(interface, intf):
entry = dbconn.get_all("CONFIG_DB", key)
if "ranges" in entry:
table.append([intf, entry["ranges"].replace(",", "\n")])
if "ips" in entry:
table.append([intf, entry["ips"].replace(",", "\n")])
click.echo(tabulate(table, headers=headers))
def register(cli):
cli.add_command(dhcp_server)