[dhcp_server] Add show dhcp_server ipv4 lease (#17125)
* Add show dhcp_server ipv4 lease * add ut for show dhcp_server ipv4 lease
This commit is contained in:
parent
1bf2012de4
commit
ee38e2447d
55
dockers/docker-dhcp-server/cli-plugin-tests/conftest.py
Normal file
55
dockers/docker-dhcp-server/cli-plugin-tests/conftest.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import mock_tables
|
||||||
|
|
||||||
|
TEST_DATA_PATH = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def mock_db():
|
||||||
|
db = mock.Mock()
|
||||||
|
|
||||||
|
with open(os.path.join(TEST_DATA_PATH, "mock_config_db.json")) as f:
|
||||||
|
s = f.read()
|
||||||
|
mock_config_db = json.loads(s)
|
||||||
|
with open(os.path.join(TEST_DATA_PATH, "mock_state_db.json")) as f:
|
||||||
|
s = f.read()
|
||||||
|
mock_state_db = json.loads(s)
|
||||||
|
|
||||||
|
def keys(table, pattern="*"):
|
||||||
|
assert table == "CONFIG_DB" or table == "STATE_DB"
|
||||||
|
|
||||||
|
import fnmatch
|
||||||
|
import re
|
||||||
|
|
||||||
|
regex = fnmatch.translate(pattern)
|
||||||
|
regex = re.compile(regex)
|
||||||
|
|
||||||
|
if table == "CONFIG_DB":
|
||||||
|
return [key for key in mock_config_db if regex.match(key)]
|
||||||
|
if table == "STATE_DB":
|
||||||
|
return [key for key in mock_state_db if regex.match(key)]
|
||||||
|
|
||||||
|
def get_all(table, key):
|
||||||
|
assert table == "CONFIG_DB" or table == "STATE_DB"
|
||||||
|
if table == "CONFIG_DB":
|
||||||
|
return mock_config_db.get(key, {})
|
||||||
|
if table == "STATE_DB":
|
||||||
|
return mock_state_db.get(key, {})
|
||||||
|
|
||||||
|
def get(table, key, entry):
|
||||||
|
assert table == "CONFIG_DB" or table == "STATE_DB"
|
||||||
|
if table == "CONFIG_DB":
|
||||||
|
return mock_config_db.get(key, {}).get(entry, None)
|
||||||
|
if table == "STATE_DB":
|
||||||
|
return mock_state_db.get(key, {}).get(entry, None)
|
||||||
|
|
||||||
|
db.keys = mock.Mock(side_effect=keys)
|
||||||
|
db.get_all = mock.Mock(side_effect=get_all)
|
||||||
|
db.get = mock.Mock(side_effect=get)
|
||||||
|
|
||||||
|
yield db
|
@ -0,0 +1,32 @@
|
|||||||
|
{
|
||||||
|
"DHCP_SERVER_IPV4|Vlan100": {
|
||||||
|
"gateway": "100.1.1.1",
|
||||||
|
"lease_time": "3600",
|
||||||
|
"mode": "PORT",
|
||||||
|
"netmask": "255.255.255.0",
|
||||||
|
"customized_options": [
|
||||||
|
"option60"
|
||||||
|
],
|
||||||
|
"state": "enabled"
|
||||||
|
},
|
||||||
|
"DHCP_SERVER_IPV4_CUSTOMIZED_OPTIONS|option60": {
|
||||||
|
"id": "60",
|
||||||
|
"type": "string",
|
||||||
|
"value": "dummy_value"
|
||||||
|
},
|
||||||
|
"DHCP_SERVER_IPV4_RANGE|range1": {
|
||||||
|
"ranges": [
|
||||||
|
"100.1.1.3",
|
||||||
|
"100.1.1.5"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"DHCP_SERVER_IPV4_RANGE|range2": {
|
||||||
|
"ips": [
|
||||||
|
"100.1.1.7",
|
||||||
|
"100.1.1.8"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"DHCP_SERVER_IPV4_IP|eth0": {
|
||||||
|
"ip": "240.127.1.2"
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,26 @@
|
|||||||
|
{
|
||||||
|
"DHCP_SERVER_IPV4_LEASE|Vlan1000|10:70:fd:b6:13:00": {
|
||||||
|
"lease_start": "1677640581",
|
||||||
|
"lease_end": "1677641481",
|
||||||
|
"ip": "192.168.0.1"
|
||||||
|
},
|
||||||
|
"DHCP_SERVER_IPV4_LEASE|Vlan1000|10:70:fd:b6:13:01": {
|
||||||
|
"lease_start": "1677640581",
|
||||||
|
"lease_end": "1677641481",
|
||||||
|
"ip": "192.168.0.2"
|
||||||
|
},
|
||||||
|
"DHCP_SERVER_IPV4_LEASE|Vlan1001|10:70:fd:b6:13:02": {
|
||||||
|
"lease_start": "1677640581",
|
||||||
|
"lease_end": "1677641481",
|
||||||
|
"ip": "192.168.0.3"
|
||||||
|
},
|
||||||
|
"DHCP_SERVER_IPV4_SERVER_IP|eth0": {
|
||||||
|
"ip": "240.127.1.2"
|
||||||
|
},
|
||||||
|
"FDB_TABLE|Vlan1000:10:70:fd:b6:13:00": {
|
||||||
|
"port": "Ethernet10"
|
||||||
|
},
|
||||||
|
"FDB_TABLE|Vlan1000:10:70:fd:b6:13:01": {
|
||||||
|
"port": "Ethernet11"
|
||||||
|
}
|
||||||
|
}
|
154
dockers/docker-dhcp-server/cli-plugin-tests/mock_tables.py
Normal file
154
dockers/docker-dhcp-server/cli-plugin-tests/mock_tables.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
# MONKEY PATCH!!!
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
import mockredis
|
||||||
|
import redis
|
||||||
|
import swsssdk
|
||||||
|
from sonic_py_common import multi_asic
|
||||||
|
from swsssdk import SonicDBConfig, SonicV2Connector, ConfigDBConnector, ConfigDBPipeConnector
|
||||||
|
from swsscommon import swsscommon
|
||||||
|
|
||||||
|
|
||||||
|
topo = None
|
||||||
|
dedicated_dbs = {}
|
||||||
|
|
||||||
|
def clean_up_config():
|
||||||
|
# Set SonicDBConfig variables to initial state
|
||||||
|
# so that it can be loaded with single or multiple
|
||||||
|
# namespaces before the test begins.
|
||||||
|
SonicDBConfig._sonic_db_config = {}
|
||||||
|
SonicDBConfig._sonic_db_global_config_init = False
|
||||||
|
SonicDBConfig._sonic_db_config_init = False
|
||||||
|
|
||||||
|
def load_namespace_config():
|
||||||
|
# To support multi asic testing
|
||||||
|
# SonicDBConfig load_sonic_global_db_config
|
||||||
|
# is invoked to load multiple namespaces
|
||||||
|
clean_up_config()
|
||||||
|
SonicDBConfig.load_sonic_global_db_config(
|
||||||
|
global_db_file_path=os.path.join(
|
||||||
|
os.path.dirname(os.path.abspath(__file__)), 'database_global.json'))
|
||||||
|
|
||||||
|
def load_database_config():
|
||||||
|
# Load local database_config.json for single namespace test scenario
|
||||||
|
clean_up_config()
|
||||||
|
SonicDBConfig.load_sonic_db_config(
|
||||||
|
sonic_db_file_path=os.path.join(
|
||||||
|
os.path.dirname(os.path.abspath(__file__)), 'database_config.json'))
|
||||||
|
|
||||||
|
|
||||||
|
_old_connect_SonicV2Connector = SonicV2Connector.connect
|
||||||
|
|
||||||
|
def connect_SonicV2Connector(self, db_name, retry_on=True):
|
||||||
|
# add topo to kwargs for testing different topology
|
||||||
|
self.dbintf.redis_kwargs['topo'] = topo
|
||||||
|
# add the namespace to kwargs for testing multi asic
|
||||||
|
self.dbintf.redis_kwargs['namespace'] = self.namespace
|
||||||
|
# Mock DB filename for unit-test
|
||||||
|
global dedicated_dbs
|
||||||
|
if dedicated_dbs and dedicated_dbs.get(db_name):
|
||||||
|
self.dbintf.redis_kwargs['db_name'] = dedicated_dbs[db_name]
|
||||||
|
else:
|
||||||
|
self.dbintf.redis_kwargs['db_name'] = db_name
|
||||||
|
self.dbintf.redis_kwargs['decode_responses'] = True
|
||||||
|
_old_connect_SonicV2Connector(self, db_name, retry_on)
|
||||||
|
|
||||||
|
def _subscribe_keyspace_notification(self, db_name, client):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def config_set(self, *args):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class MockPubSub:
|
||||||
|
def get_message(self):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def psubscribe(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __call__(self, *args, **kwargs):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def listen(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
def punsubscribe(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
INPUT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
|
||||||
|
|
||||||
|
class SwssSyncClient(mockredis.MockRedis):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(SwssSyncClient, self).__init__(strict=True, *args, **kwargs)
|
||||||
|
# Namespace is added in kwargs specifically for unit-test
|
||||||
|
# to identify the file path to load the db json files.
|
||||||
|
topo = kwargs.pop('topo')
|
||||||
|
namespace = kwargs.pop('namespace')
|
||||||
|
db_name = kwargs.pop('db_name')
|
||||||
|
self.decode_responses = kwargs.pop('decode_responses', False) == True
|
||||||
|
fname = db_name.lower() + ".json"
|
||||||
|
self.pubsub = MockPubSub()
|
||||||
|
|
||||||
|
if namespace is not None and namespace is not multi_asic.DEFAULT_NAMESPACE:
|
||||||
|
fname = os.path.join(INPUT_DIR, namespace, fname)
|
||||||
|
elif topo is not None:
|
||||||
|
fname = os.path.join(INPUT_DIR, topo, fname)
|
||||||
|
else:
|
||||||
|
fname = os.path.join(INPUT_DIR, fname)
|
||||||
|
|
||||||
|
if os.path.exists(fname):
|
||||||
|
with open(fname) as f:
|
||||||
|
js = json.load(f)
|
||||||
|
for k, v in js.items():
|
||||||
|
if 'expireat' in v and 'ttl' in v and 'type' in v and 'value' in v:
|
||||||
|
# database is in redis-dump format
|
||||||
|
if v['type'] == 'hash':
|
||||||
|
# ignore other types for now since sonic has hset keys only in the db
|
||||||
|
for attr, value in v['value'].items():
|
||||||
|
self.hset(k, attr, value)
|
||||||
|
else:
|
||||||
|
for attr, value in v.items():
|
||||||
|
self.hset(k, attr, value)
|
||||||
|
|
||||||
|
# Patch mockredis/mockredis/client.py
|
||||||
|
# The offical implementation assume decode_responses=False
|
||||||
|
# Here we detect the option and decode after doing encode
|
||||||
|
def _encode(self, value):
|
||||||
|
"Return a bytestring representation of the value. Taken from redis-py connection.py"
|
||||||
|
|
||||||
|
value = super(SwssSyncClient, self)._encode(value)
|
||||||
|
|
||||||
|
if self.decode_responses:
|
||||||
|
return value.decode('utf-8')
|
||||||
|
|
||||||
|
# Patch mockredis/mockredis/client.py
|
||||||
|
# The official implementation will filter out keys with a slash '/'
|
||||||
|
# ref: https://github.com/locationlabs/mockredis/blob/master/mockredis/client.py
|
||||||
|
def keys(self, pattern='*'):
|
||||||
|
"""Emulate keys."""
|
||||||
|
import fnmatch
|
||||||
|
import re
|
||||||
|
|
||||||
|
# Make regex out of glob styled pattern.
|
||||||
|
regex = fnmatch.translate(pattern)
|
||||||
|
regex = re.compile(regex)
|
||||||
|
|
||||||
|
# Find every key that matches the pattern
|
||||||
|
return [key for key in self.redis if regex.match(key)]
|
||||||
|
|
||||||
|
|
||||||
|
swsssdk.interface.DBInterface._subscribe_keyspace_notification = _subscribe_keyspace_notification
|
||||||
|
mockredis.MockRedis.config_set = config_set
|
||||||
|
redis.StrictRedis = SwssSyncClient
|
||||||
|
SonicV2Connector.connect = connect_SonicV2Connector
|
||||||
|
swsscommon.SonicV2Connector = SonicV2Connector
|
||||||
|
swsscommon.ConfigDBConnector = ConfigDBConnector
|
||||||
|
swsscommon.ConfigDBPipeConnector = ConfigDBPipeConnector
|
3
dockers/docker-dhcp-server/cli-plugin-tests/pytest.ini
Normal file
3
dockers/docker-dhcp-server/cli-plugin-tests/pytest.ini
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
[pytest]
|
||||||
|
addopts = --cov-config=.coveragerc --cov --cov-report html --cov-report term --cov-report xml --junitxml=test-results.xml -vv
|
||||||
|
|
@ -0,0 +1,57 @@
|
|||||||
|
import sys
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from click.testing import CliRunner
|
||||||
|
|
||||||
|
import utilities_common.cli as clicommon
|
||||||
|
|
||||||
|
sys.path.append('../cli/show/plugins/')
|
||||||
|
import show_dhcp_server
|
||||||
|
|
||||||
|
|
||||||
|
class TestShowDHCPServer(object):
|
||||||
|
def test_plugin_registration(self):
|
||||||
|
cli = mock.MagicMock()
|
||||||
|
show_dhcp_server.register(cli)
|
||||||
|
|
||||||
|
def test_show_dhcp_server_ipv4_lease_without_dhcpintf(self, mock_db):
|
||||||
|
expected_stdout = """\
|
||||||
|
Interface MAC Address IP Lease Start Lease End
|
||||||
|
------------------- ----------------- ----------- ------------------- -------------------
|
||||||
|
Vlan1000|Ethernet10 10:70:fd:b6:13:00 192.168.0.1 2023-03-01 03:16:21 2023-03-01 03:31:21
|
||||||
|
Vlan1000|Ethernet11 10:70:fd:b6:13:01 192.168.0.2 2023-03-01 03:16:21 2023-03-01 03:31:21
|
||||||
|
Vlan1001|<Unknown> 10:70:fd:b6:13:02 192.168.0.3 2023-03-01 03:16:21 2023-03-01 03:31:21
|
||||||
|
"""
|
||||||
|
runner = CliRunner()
|
||||||
|
db = clicommon.Db()
|
||||||
|
db.db = mock_db
|
||||||
|
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["lease"], [], obj=db)
|
||||||
|
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
|
||||||
|
assert result.stdout == expected_stdout
|
||||||
|
|
||||||
|
def test_show_dhcp_server_ipv4_lease_with_dhcpintf(self, mock_db):
|
||||||
|
expected_stdout = """\
|
||||||
|
Interface MAC Address IP Lease Start Lease End
|
||||||
|
------------------- ----------------- ----------- ------------------- -------------------
|
||||||
|
Vlan1000|Ethernet10 10:70:fd:b6:13:00 192.168.0.1 2023-03-01 03:16:21 2023-03-01 03:31:21
|
||||||
|
Vlan1000|Ethernet11 10:70:fd:b6:13:01 192.168.0.2 2023-03-01 03:16:21 2023-03-01 03:31:21
|
||||||
|
"""
|
||||||
|
runner = CliRunner()
|
||||||
|
db = clicommon.Db()
|
||||||
|
db.db = mock_db
|
||||||
|
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["lease"], ["Vlan1000"], obj=db)
|
||||||
|
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
|
||||||
|
assert result.stdout == expected_stdout
|
||||||
|
|
||||||
|
def test_show_dhcp_server_ipv4_lease_client_not_in_fdb(self, mock_db):
|
||||||
|
expected_stdout = """\
|
||||||
|
Interface MAC Address IP Lease Start Lease End
|
||||||
|
------------------ ----------------- ----------- ------------------- -------------------
|
||||||
|
Vlan1001|<Unknown> 10:70:fd:b6:13:02 192.168.0.3 2023-03-01 03:16:21 2023-03-01 03:31:21
|
||||||
|
"""
|
||||||
|
runner = CliRunner()
|
||||||
|
db = clicommon.Db()
|
||||||
|
db.db = mock_db
|
||||||
|
result = runner.invoke(show_dhcp_server.dhcp_server.commands["ipv4"].commands["lease"], ["Vlan1001"], obj=db)
|
||||||
|
assert result.exit_code == 0, "exit code: {}, Exception: {}, Traceback: {}".format(result.exit_code, result.exception, result.exc_info)
|
||||||
|
assert result.stdout == expected_stdout
|
@ -1,13 +1,46 @@
|
|||||||
import click
|
import click
|
||||||
|
from tabulate import tabulate
|
||||||
import utilities_common.cli as clicommon
|
import utilities_common.cli as clicommon
|
||||||
|
|
||||||
|
|
||||||
@click.group(cls=clicommon.AliasedGroup, name="dhcp_server")
|
import ipaddress
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
def ts_to_str(ts):
|
||||||
|
return datetime.fromtimestamp(int(ts)).strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
|
||||||
|
@click.group(cls=clicommon.AliasedGroup)
|
||||||
def dhcp_server():
|
def dhcp_server():
|
||||||
"""show DHCP Server information"""
|
"""Show dhcp_server related info"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@dhcp_server.group(cls=clicommon.AliasedGroup)
|
||||||
|
def ipv4():
|
||||||
|
"""Show ipv4 related dhcp_server info"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@ipv4.command()
|
||||||
|
@click.argument('dhcp_interface', required=False)
|
||||||
|
@clicommon.pass_db
|
||||||
|
def lease(db, dhcp_interface):
|
||||||
|
if not dhcp_interface:
|
||||||
|
dhcp_interface = "*"
|
||||||
|
headers = ["Interface", "MAC Address", "IP", "Lease Start", "Lease End"]
|
||||||
|
table = []
|
||||||
|
dbconn = db.db
|
||||||
|
for key in dbconn.keys("STATE_DB", "DHCP_SERVER_IPV4_LEASE|" + dhcp_interface + "|*"):
|
||||||
|
entry = dbconn.get_all("STATE_DB", key)
|
||||||
|
interface, mac = key.split("|")[1:]
|
||||||
|
port = dbconn.get("STATE_DB", "FDB_TABLE|" + interface + ":" + mac, "port")
|
||||||
|
if not port:
|
||||||
|
port = "<Unknown>"
|
||||||
|
table.append([interface + "|" + port, mac, entry["ip"], ts_to_str(entry["lease_start"]), ts_to_str(entry["lease_end"])])
|
||||||
|
click.echo(tabulate(table, headers=headers))
|
||||||
|
|
||||||
|
|
||||||
def register(cli):
|
def register(cli):
|
||||||
# cli.add_command(dhcp_server)
|
cli.add_command(dhcp_server)
|
||||||
pass
|
|
||||||
|
Reference in New Issue
Block a user