[sonic-yang-mgmt]: sonic-yang-mgmt package for configuration validation. (#3861)

**- What I did**

#### wheel package Makefiles

- wheel package Makefiles for sonic-yang-mgmt package.

#### libyang Python APIs:
- python APIs based on libyang
- functions to load/merge yang models and Yang data files
- function to validate data trees based on Yang models
- functions to merge yang data files/trees
- add/set/delete node in schema and data trees
- find data/schema nodes from xpath from the Yang data/schema tree in memory
- find dependencies
- dump the data tree in json/xml

#### Extension of libyang Python APIs:
-- Cropping input config based on Yang Model.
-- Translate input config based on Yang Model.
-- rev Translate input config based on Yang Model.
-- Find xpath of port, portleaf and a yang list.
-- Find if node is key of a list while deletion if yes, then delete the parent.

Signed-off-by: Praveen Chaudhary pchaudhary@linkedin.com
Signed-off-by: Ping Mao pmao@linkedin.com
This commit is contained in:
Praveen Chaudhary 2020-05-21 16:27:57 -07:00 committed by GitHub
parent 2398992d52
commit 0ccdd70671
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 3907 additions and 8 deletions

View File

@ -132,6 +132,11 @@ SONIC_YANG_MODEL_PY3_WHEEL_NAME=$(basename {{sonic_yang_models_py3_wheel_path}})
sudo cp {{sonic_yang_models_py3_wheel_path}} $FILESYSTEM_ROOT/$SONIC_YANG_MODEL_PY3_WHEEL_NAME
sudo https_proxy=$https_proxy LANG=C chroot $FILESYSTEM_ROOT pip3 install $SONIC_YANG_MODEL_PY3_WHEEL_NAME
sudo rm -rf $FILESYSTEM_ROOT/$SONIC_YANG_MODEL_PY3_WHEEL_NAME
# Install sonic-yang-mgmt Python package
SONIC_YANG_MGMT_PY_WHEEL_NAME=$(basename {{sonic_yang_mgmt_py_wheel_path}})
sudo cp {{sonic_yang_mgmt_py_wheel_path}} $FILESYSTEM_ROOT/$SONIC_YANG_MGMT_PY_WHEEL_NAME
sudo https_proxy=$https_proxy LANG=C chroot $FILESYSTEM_ROOT pip install $SONIC_YANG_MGMT_PY_WHEEL_NAME
sudo rm -rf $FILESYSTEM_ROOT/$SONIC_YANG_MGMT_PY_WHEEL_NAME
# Install sonic-platform-common Python 2 package
PLATFORM_COMMON_PY2_WHEEL_NAME=$(basename {{platform_common_py2_wheel_path}})

View File

@ -0,0 +1,10 @@
SPATH := $($(SONIC_YANG_MGMT_PY)_SRC_PATH)
DEP_FILES := $(SONIC_COMMON_FILES_LIST) rules/sonic-yang-mgmt-py2.mk rules/sonic-yang-mgmt-py2.dep
DEP_FILES += $(SONIC_COMMON_BASE_FILES_LIST)
DEP_FILES += $(shell git ls-files $(SPATH))
$(SONIC_YANG_MGMT_PY)_CACHE_MODE := GIT_CONTENT_SHA
$(SONIC_YANG_MGMT_PY)_DEP_FLAGS := $(SONIC_COMMON_FLAGS_LIST)
$(SONIC_YANG_MGMT_PY)_DEP_FILES := $(DEP_FILES)

View File

@ -0,0 +1,9 @@
# sonic-yang-mgmt python2 wheel
SONIC_YANG_MGMT_PY = sonic_yang_mgmt-1.0-py2-none-any.whl
$(SONIC_YANG_MGMT_PY)_SRC_PATH = $(SRC_PATH)/sonic-yang-mgmt
$(SONIC_YANG_MGMT_PY)_PYTHON_VERSION = 2
$(SONIC_YANG_MGMT_PY)_DEBS_DEPENDS = $(LIBYANG)
$(SONIC_YANG_MGMT_PY)_DEPENDS = $(SONIC_YANG_MODELS_PY3)
SONIC_PYTHON_WHEELS += $(SONIC_YANG_MGMT_PY)

View File

@ -542,7 +542,6 @@ SONIC_TARGET_LIST += $(addprefix $(PYTHON_DEBS_PATH)/, $(SONIC_PYTHON_STDEB_DEBS
$(addprefix $(PYTHON_WHEELS_PATH)/, $(SONIC_PYTHON_WHEELS)) : $(PYTHON_WHEELS_PATH)/% : .platform $$(addsuffix -install,$$(addprefix $(PYTHON_WHEELS_PATH)/,$$($$*_DEPENDS))) \
$(call dpkg_depend,$(PYTHON_WHEELS_PATH)/%.dep) \
$$(addsuffix -install,$$(addprefix $(DEBS_PATH)/,$$($$*_DEBS_DEPENDS)))
$(HEADER)
# Load the target deb from DPKG cache
@ -786,7 +785,8 @@ $(addprefix $(TARGET_PATH)/, $(SONIC_INSTALLERS)) : $(TARGET_PATH)/% : \
$(addprefix $(PYTHON_WHEELS_PATH)/,$(SONIC_PLATFORM_COMMON_PY2)) \
$(addprefix $(PYTHON_WHEELS_PATH)/,$(REDIS_DUMP_LOAD_PY2)) \
$(addprefix $(PYTHON_WHEELS_PATH)/,$(SONIC_PLATFORM_API_PY2)) \
$(addprefix $(PYTHON_WHEELS_PATH)/,$(SONIC_YANG_MODELS_PY3))
$(addprefix $(PYTHON_WHEELS_PATH)/,$(SONIC_YANG_MODELS_PY3)) \
$(addprefix $(PYTHON_WHEELS_PATH)/,$(SONIC_YANG_MGMT_PY))
$(HEADER)
# Pass initramfs and linux kernel explicitly. They are used for all platforms
export debs_path="$(IMAGE_DISTRO_DEBS_PATH)"
@ -818,6 +818,7 @@ $(addprefix $(TARGET_PATH)/, $(SONIC_INSTALLERS)) : $(TARGET_PATH)/% : \
export redis_dump_load_py2_wheel_path="$(addprefix $(PYTHON_WHEELS_PATH)/,$(REDIS_DUMP_LOAD_PY2))"
export install_debug_image="$(INSTALL_DEBUG_TOOLS)"
export sonic_yang_models_py3_wheel_path="$(addprefix $(PYTHON_WHEELS_PATH)/,$(SONIC_YANG_MODELS_PY3))"
export sonic_yang_mgmt_py_wheel_path="$(addprefix $(PYTHON_WHEELS_PATH)/,$(SONIC_YANG_MGMT_PY))"
export multi_instance="false"
$(foreach docker, $($*_DOCKERS),\

View File

@ -0,0 +1,15 @@
=======
Credits
=======
Development Lead
----------------
LNOS-CODERS <lnos-coders@linkedin.com>
MSFT-LINUX-DEV <linuxnetdev@microsoft.com>
Contributors
------------
Praveen Chaudhary <pchaudhary@linkedin.com>
Ping Mao <pmao@linkedin.com>

View File

@ -0,0 +1,13 @@
Copyright 2019 Microsoft, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,6 @@
"
This package includes python yang libraries which will be used with sonic utilities
pacakge to validate the config. This python libraries are written on top of libyang
and also provides functionality to translate the config from SONiC ConfigDB to SONiC
YANG and vice-versa.
"

View File

@ -0,0 +1,98 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""The setup script."""
from setuptools import setup, find_packages
from setuptools.command.build_py import build_py
from os import system, environ
from sys import exit
import pytest
# find path of pkgs from os environment vars
prefix = '../../'; debs = environ["IMAGE_DISTRO_DEBS_PATH"]
wheels = environ["PYTHON_WHEELS_PATH"]
wheels_path = '{}/{}'.format(prefix, wheels)
deps_path = '{}/{}'.format(prefix, debs)
# dependencies
libyang = '{}/{}'.format(deps_path, environ["LIBYANG"])
libyangCpp = '{}/{}'.format(deps_path, environ["LIBYANG_CPP"])
libyangPy2 = '{}/{}'.format(deps_path, environ["LIBYANG_PY2"])
libyangPy3 = '{}/{}'.format(deps_path, environ["LIBYANG_PY3"])
sonicYangModels = '{}/{}'.format(wheels_path, environ["SONIC_YANG_MODELS_PY3"])
# important reuirements parameters
build_requirements = [libyang, libyangCpp, libyangPy2, libyangPy3, sonicYangModels,]
setup_requirements = ['pytest-runner']
test_requirements = ['pytest>=3']
# read me
with open('README.rst') as readme_file:
readme = readme_file.read()
# class for prerequisites to build this package
class pkgBuild(build_py):
"""Custom Build PLY"""
def run (self):
# install libyang and sonic_yang_models
for req in build_requirements:
if '.deb' in req:
pkg_install_cmd = "sudo dpkg -i {}".format(req)
if (system(pkg_install_cmd)):
print("{} installation failed".format(req))
exit(1)
else:
print("{} installed".format(req))
elif '.whl' in req:
pkg_install_cmd = "pip3 install {}".format(req)
if (system(pkg_install_cmd)):
print("{} installation failed".format(req))
exit(1)
else:
print("{} installed".format(req))
# run pytest for libyang python APIs
self.pytest_args = []
errno = pytest.main(self.pytest_args)
if (errno):
exit(errno)
# Continue usual build steps
build_py.run(self)
setup(
cmdclass={
'build_py': pkgBuild,
},
author="lnos-coders",
author_email='lnos-coders@linkedin.com',
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*',
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU General Public License v3 (GPLv3)',
'Natural Language :: English',
"Programming Language :: Python :: 2",
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
],
description="Package contains Python Library for YANG for sonic.",
tests_require = test_requirements,
license="GNU General Public License v3",
long_description=readme + '\n\n',
include_package_data=True,
keywords='sonic_yang_mgmt',
name='sonic_yang_mgmt',
py_modules=['sonic_yang', 'sonic_yang_ext'],
packages=find_packages(),
setup_requires=setup_requirements,
version='1.0',
zip_safe=False,
)

View File

@ -0,0 +1,659 @@
import yang as ly
import syslog
from json import dump
from glob import glob
from sonic_yang_ext import SonicYangExtMixin, SonicYangException
"""
Yang schema and data tree python APIs based on libyang python
Here, sonic_yang_ext_mixin extends funtionality of sonic_yang,
i.e. it is mixin not parent class.
"""
class SonicYang(SonicYangExtMixin):
def __init__(self, yang_dir, debug=False):
self.yang_dir = yang_dir
self.ctx = None
self.module = None
self.root = None
# logging vars
self.SYSLOG_IDENTIFIER = "sonic_yang"
self.DEBUG = debug
# yang model files, need this map it to module
self.yangFiles = list()
# map from TABLE in config DB to container and module
self.confDbYangMap = dict()
# JSON format of yang model [similar to pyang conversion]
self.yJson = list()
# config DB json input, will be cropped as yang models
self.jIn = dict()
# YANG JSON, this is traslated from config DB json
self.xlateJson = dict()
# reverse translation from yang JSON, == config db json
self.revXlateJson = dict()
# below dict store the input config tables which have no YANG models
self.tablesWithOutYang = dict()
try:
self.ctx = ly.Context(yang_dir)
except Exception as e:
self.fail(e)
return
def __del__(self):
pass
def sysLog(self, debug=syslog.LOG_INFO, msg=None):
# log debug only if enabled
if self.DEBUG == False and debug == syslog.LOG_DEBUG:
return
syslog.openlog(self.SYSLOG_IDENTIFIER)
syslog.syslog(debug, msg)
syslog.closelog()
return
def fail(self, e):
print(e)
raise e
"""
load_schema_module(): load a Yang model file
input: yang_file - full path of a Yang model file
returns: Exception if error
"""
def _load_schema_module(self, yang_file):
try:
return self.ctx.parse_module_path(yang_file, ly.LYS_IN_YANG)
except Exception as e:
print("Failed to load yang module file: " + yang_file)
self.fail(e)
"""
load_schema_module_list(): load all Yang model files in the list
input: yang_files - a list of Yang model file full path
returns: Exception if error
"""
def _load_schema_module_list(self, yang_files):
for file in yang_files:
try:
self._load_schema_module(file)
except Exception as e:
self.fail(e)
"""
load_schema_modules(): load all Yang model files in the directory
input: yang_dir - the directory of the yang model files to be loaded
returns: Exception if error
"""
def _load_schema_modules(self, yang_dir):
py = glob(yang_dir+"/*.yang")
for file in py:
try:
self._load_schema_module(file)
except Exception as e:
self.fail(e)
"""
load_schema_modules_ctx(): load all Yang model files in the directory to context: ctx
input: yang_dir, context
returns: Exception if error, returrns context object if no error
"""
def _load_schema_modules_ctx(self, yang_dir=None):
if not yang_dir:
yang_dir = self.yang_dir
ctx = ly.Context(yang_dir)
py = glob(yang_dir+"/*.yang")
for file in py:
try:
ctx.parse_module_path(str(file), ly.LYS_IN_YANG)
except Exception as e:
print("Failed to parse yang module file: " + file)
self.fail(e)
return ctx
"""
load_data_file(): load a Yang data json file
input: data_file - the full path of the yang json data file to be loaded
returns: Exception if error
"""
def _load_data_file(self, data_file):
try:
data_node = self.ctx.parse_data_path(data_file, ly.LYD_JSON, ly.LYD_OPT_CONFIG | ly.LYD_OPT_STRICT)
except Exception as e:
print("Failed to load data file: " + str(data_file))
self.fail(e)
else:
self.root = data_node
"""
get module name from xpath
input: path
returns: module name
"""
def _get_module_name(self, schema_xpath):
module_name = schema_xpath.split(':')[0].strip('/')
return module_name
"""
get_module(): get module object from Yang module name
input: yang module name
returns: Schema_Node object
"""
def _get_module(self, module_name):
mod = self.ctx.get_module(module_name)
return mod
"""
load_data_model(): load both Yang module fileis and data json files
input: yang directory, list of yang files and list of data files (full path)
returns: returns (context, root) if no error, or Exception if failed
"""
def _load_data_model(self, yang_dir, yang_files, data_files, output=None):
if (self.ctx is None):
self.ctx = ly.Context(yang_dir)
try:
self._load_schema_module_list(yang_files)
if len(data_files) == 0:
return (self.ctx, self.root)
self._load_data_file(data_files[0])
for i in range(1, len(data_files)):
self._merge_data(data_files[i])
except Exception as e:
print("Failed to load data files")
self.fail(e)
return
if output is not None:
self._print_data_mem(output)
return (self.ctx, self.root)
"""
print_data_mem(): print the data tree
input: option: "JSON" or "XML"
"""
def _print_data_mem(self, option):
if (option == "JSON"):
mem = self.root.print_mem(ly.LYD_JSON, ly.LYP_WITHSIBLINGS | ly.LYP_FORMAT)
else:
mem = self.root.print_mem(ly.LYD_XML, ly.LYP_WITHSIBLINGS | ly.LYP_FORMAT)
return mem
"""
save_data_file_json(): save the data tree in memory into json file
input: outfile - full path of the file to save the data tree to
"""
def _save_data_file_json(self, outfile):
mem = self.root.print_mem(ly.LYD_JSON, ly.LYP_FORMAT)
with open(outfile, 'w') as out:
dump(mem, out, indent=4)
"""
get_module_tree(): get yang module tree in JSON or XMAL format
input: module name
returns: JSON or XML format of the input yang module schema tree
"""
def _get_module_tree(self, module_name, format):
result = None
try:
module = self.ctx.get_module(str(module_name))
except Exception as e:
print("Cound not get module: " + str(module_name))
self.fail(e)
else:
if (module is not None):
if (format == "XML"):
#libyang bug with format
result = module.print_mem(ly.LYD_JSON, ly.LYP_FORMAT)
else:
result = module.print_mem(ly.LYD_XML, ly.LYP_FORMAT)
return result
"""
validate_data(): validate data tree
input:
node: root of the data tree
ctx: context
returns: Exception if failed
"""
def _validate_data(self, node=None, ctx=None):
if not node:
node = self.root
if not ctx:
ctx = self.ctx
try:
node.validate(ly.LYD_OPT_CONFIG, ctx)
except Exception as e:
self.fail(e)
"""
validate_data_tree(): validate the data tree. (Public)
returns: Exception if failed
"""
def validate_data_tree(self):
try:
self._validate_data(self.root, self.ctx)
except Exception as e:
print("Failed to validate data tree")
raise SonicYangException("Failed to validate data tree\n{}".\
format(str(e)))
"""
find_parent_data_node(): find the parent node object
input: data_xpath - xpath of the data node
returns: parent node
"""
def _find_parent_data_node(self, data_xpath):
if (self.root is None):
print("data not loaded")
return None
try:
data_node = self._find_data_node(data_xpath)
except Exception as e:
print("Failed to find data node from xpath: " + str(data_xpath))
self.fail(e)
else:
if data_node is not None:
return data_node.parent()
return None
"""
get_parent_data_xpath(): find the parent data node's xpath
input: data_xpath - xpathof the data node
returns: - xpath of parent data node
- Exception if error
"""
def _get_parent_data_xpath(self, data_xpath):
path=""
try:
data_node = self._find_parent_data_node(data_xpath)
except Exception as e:
print("Failed to find parent node from xpath: " + str(data_xpath))
self.fail(e)
else:
if data_node is not None:
path = data_node.path()
return path
"""
new_data_node(): create a new data node in the data tree
input:
xpath: xpath of the new node
value: value of the new node
returns: new Data_Node object if success, Exception if falied
"""
def _new_data_node(self, xpath, value):
val = str(value)
try:
data_node = self.root.new_path(self.ctx, xpath, val, 0, 0)
except Exception as e:
print("Failed to add data node for path: " + str(xpath))
self.fail(e)
else:
return data_node
"""
find_data_node(): find the data node from xpath
input: data_xpath: xpath of the data node
returns - Data_Node object if found
- None if not exist
- Exception if there is error
"""
def _find_data_node(self, data_xpath):
try:
set = self.root.find_path(data_xpath)
except Exception as e:
print("Failed to find data node from xpath: " + str(data_xpath))
self.fail(e)
else:
if set is not None:
for data_node in set.data():
if (data_xpath == data_node.path()):
return data_node
return None
"""
find_schema_node(): find the schema node from schema xpath
example schema xpath:
"/sonic-port:sonic-port/sonic-port:PORT/sonic-port:PORT_LIST/sonic-port:port_name"
input: xpath of the node
returns: Schema_Node oject or None if not found
"""
def _find_schema_node(self, schema_xpath):
try:
schema_set = self.ctx.find_path(schema_xpath)
for schema_node in schema_set.schema():
if (schema_xpath == schema_node.path()):
return schema_node
except Exception as e:
self.fail(e)
return None
else:
for schema_node in schema_set.schema():
if schema_xapth == schema_node.path():
return schema_node
return None
"""
find_data_node_schema_xpath(): find the xpath of the schema node from data xpath
data xpath example:
"/sonic-port:sonic-port/PORT/PORT_LIST[port_name='Ethernet0']/port_name"
input: data_xpath - xpath of the data node
returns: - xpath of the schema node if success
- Exception if error
"""
def _find_data_node_schema_xpath(self, data_xpath):
path = ""
try:
set = self.root.find_path(data_xpath)
except Exception as e:
self.fail(e)
else:
for data_node in set.data():
if data_xpath == data_node.path():
return data_node.schema().path()
return path
"""
add_node(): add a node to Yang schema or data tree
input: xpath and value of the node to be added
returns: Exception if failed
"""
def _add_data_node(self, data_xpath, value):
try:
self._new_data_node(data_xpath, value)
#check if the node added to the data tree
self._find_data_node(data_xpath)
except Exception as e:
print("add_node(): Failed to add data node for xpath: " + str(data_xpath))
self.fail(e)
"""
merge_data(): merge a data file to the existing data tree
input: yang model directory and full path of the data json file to be merged
returns: Exception if failed
"""
def _merge_data(self, data_file, yang_dir=None):
#load all yang models to ctx
if not yang_dir:
yang_dir = self.yang_dir
try:
ctx = self._load_schema_modules_ctx(yang_dir)
#source data node
source_node = ctx.parse_data_path(str(data_file), ly.LYD_JSON, ly.LYD_OPT_CONFIG | ly.LYD_OPT_STRICT)
#merge
self.root.merge(source_node, 0)
except Exception as e:
self.fail(e)
"""
_deleteNode(): delete a node from the schema/data tree, internal function
input: xpath of the schema/data node
returns: True - success False - failed
"""
def _deleteNode(self, xpath=None, node=None):
if node is None:
node = self._find_data_node(xpath)
if (node):
node.unlink()
dnode = self._find_data_node(xpath)
if (dnode is None):
#deleted node not found
return True
else:
print('Could not delete Node')
return False
else:
print("failed to find node, xpath: " + xpath)
return False
"""
find_data_node_value(): find the value of a node from the data tree
input: data_xpath of the data node
returns: value string of the node
"""
def _find_data_node_value(self, data_xpath):
output = ""
try:
data_node = self._find_data_node(data_xpath)
except Exception as e:
print("find_data_node_value(): Failed to find data node from xpath: {}".format(data_xpath))
self.fail(e)
else:
if (data_node is not None):
subtype = data_node.subtype()
if (subtype is not None):
value = subtype.value_str()
return value
return output
"""
set the value of a node in the data tree
input: xpath of the data node
returns: Exception if failed
"""
def _set_data_node_value(self, data_xpath, value):
try:
self.root.new_path(self.ctx, data_xpath, str(value), ly.LYD_ANYDATA_STRING, ly.LYD_PATH_OPT_UPDATE)
except Exception as e:
print("set data node value failed for xpath: " + str(data_xpath))
self.fail(e)
"""
find_data_nodes(): find the set of nodes for the xpath
input: xpath of the data node
returns: list of xpath of the dataset
"""
def _find_data_nodes(self, data_xpath):
list = []
node = self.root.child()
try:
node_set = node.find_path(data_xpath);
except Exception as e:
self.fail(e)
else:
if node_set is None:
raise Exception('data node not found')
for data_set in node_set.data():
data_set.schema()
list.append(data_set.path())
return list
"""
find_schema_dependencies(): find the schema dependencies from schema xpath
input: schema_xpath of the schema node
returns: - list of xpath of the dependencies
- Exception if schema node not found
"""
def _find_schema_dependencies(self, schema_xpath):
ref_list = []
try:
schema_node = self._find_schema_node(schema_xpath)
except Exception as e:
print("Cound not find the schema node from xpath: " + str(schema_xpath))
self.fail(e)
return ref_list
schema_node = ly.Schema_Node_Leaf(schema_node)
backlinks = schema_node.backlinks()
if backlinks.number() > 0:
for link in backlinks.schema():
print("backlink schema: {}".format(link.path()))
ref_list.append(link.path())
return ref_list
"""
find_data_dependencies(): find the data dependencies from data xpath
input: data_xpath - xpath of data node. (Public)
returns: - list of xpath
- Exception if error
"""
def find_data_dependencies(self, data_xpath):
ref_list = []
node = self.root
try:
data_node = self._find_data_node(data_xpath)
except Exception as e:
print("find_data_dependencies(): Failed to find data node from xpath: {}".format(data_xapth))
return ref_list
try:
value = str(self._find_data_node_value(data_xpath))
schema_node = ly.Schema_Node_Leaf(data_node.schema())
backlinks = schema_node.backlinks()
if backlinks.number() > 0:
for link in backlinks.schema():
node_set = node.find_path(link.path())
for data_set in node_set.data():
data_set.schema()
casted = data_set.subtype()
if value == casted.value_str():
ref_list.append(data_set.path())
except Exception as e:
print('Failed to find node or dependencies for {}'.format(data_xpath))
raise SonicYangException("Failed to find node or dependencies for \
{}\n{}".format(data_xpath, str(e)))
return ref_list
"""
get_module_prefix: get the prefix of a Yang module
input: name of the Yang module
output: prefix of the Yang module
"""
def _get_module_prefix(self, module_name):
prefix = ""
try:
module = self._get_module(module_name)
except Exception as e:
self.fail(e)
return prefix
else:
return module.prefix()
"""
str_to_type: map string to type of node
input: string
output: type
"""
def _str_to_type(self, type_str):
mapped_type = {
"LY_TYPE_DER":ly.LY_TYPE_DER,
"LY_TYPE_BINARY":ly.LY_TYPE_BINARY,
"LY_TYPE_BITS":ly.LY_TYPE_BITS,
"LY_TYPE_BOOL":ly.LY_TYPE_BOOL,
"LY_TYPE_DEC64":ly.LY_TYPE_DEC64,
"LY_TYPE_EMPTY":ly.LY_TYPE_EMPTY,
"LY_TYPE_ENUM":ly.LY_TYPE_ENUM,
"LY_TYPE_IDENT":ly.LY_TYPE_IDENT,
"LY_TYPE_INST":ly.LY_TYPE_INST,
"LY_TYPE_LEAFREF":ly.LY_TYPE_LEAFREF,
"LY_TYPE_STRING":ly.LY_TYPE_STRING,
"LY_TYPE_UNION":ly.LY_TYPE_UNION,
"LY_TYPE_INT8":ly.LY_TYPE_INT8,
"LY_TYPE_UINT8":ly.LY_TYPE_UINT8,
"LY_TYPE_INT16":ly.LY_TYPE_INT16,
"LY_TYPE_UINT16":ly.LY_TYPE_UINT16,
"LY_TYPE_INT32":ly.LY_TYPE_INT32,
"LY_TYPE_UINT32":ly.LY_TYPE_UINT32,
"LY_TYPE_INT64":ly.LY_TYPE_INT64,
"LY_TYPE_UINT64":ly.LY_TYPE_UINT64,
"LY_TYPE_UNKNOWN":ly.LY_TYPE_UNKNOWN
}
if type_str not in mapped_type:
return ly.LY_TYPE_UNKNOWN
return mapped_type[type_str]
def _get_data_type(self, schema_xpath):
try:
schema_node = self._find_schema_node(schema_xpath)
except Exception as e:
print("get_data_type(): Failed to find schema node from xpath: {}".format(schema_xpath))
self.fail(e)
return None
if (schema_node is not None):
return schema_node.subtype().type().base()
return ly.LY_TYPE_UNKNOWN
"""
get_leafref_type: find the type of node that leafref references to
input: data_xpath - xpath of a data node
output: type of the node this leafref references to
"""
def _get_leafref_type(self, data_xpath):
data_node = self._find_data_node(data_xpath)
if (data_node is not None):
subtype = data_node.subtype()
if (subtype is not None):
if data_node.schema().subtype().type().base() != ly.LY_TYPE_LEAFREF:
print("get_leafref_type() node type for data xpath: {} is not LEAFREF".format(data_xpath))
return ly.LY_TYPE_UNKNOWN
else:
return subtype.value_type()
return ly.LY_TYPE_UNKNOWN
"""
get_leafref_path(): find the leafref path
input: schema_xpath - xpath of a schema node
output: path value of the leafref node
"""
def _get_leafref_path(self, schema_xpath):
schema_node = self._find_schema_node(schema_xpath)
if (schema_node is not None):
subtype = schema_node.subtype()
if (subtype is not None):
if subtype.type().base() != ly.LY_TYPE_LEAFREF:
return None
else:
return subtype.type().info().lref().path()
return None
"""
get_leafref_type_schema: find the type of node that leafref references to
input: schema_xpath - xpath of a schema node
output: type of the node this leafref references to
"""
def _get_leafref_type_schema(self, schema_xpath):
schema_node = self._find_schema_node(schema_xpath)
if (schema_node is not None):
subtype = schema_node.subtype()
if (subtype is not None):
if subtype.type().base() != ly.LY_TYPE_LEAFREF:
return None
else:
subtype.type().info().lref().path()
target = subtype.type().info().lref().target()
target_path = target.path()
target_type = self._get_data_type(target_path)
return target_type
return None

View File

@ -0,0 +1,756 @@
# This script is used as extension of sonic_yang class. It has methods of
# class sonic_yang. A separate file is used to avoid a single large file.
from __future__ import print_function
import yang as ly
import re
import syslog
from json import dump, dumps, loads
from xmltodict import parse
from glob import glob
"""
This is the Exception thrown out of all public function of this class.
"""
class SonicYangException(Exception):
pass
# class sonic_yang methods, use mixin to extend sonic_yang
class SonicYangExtMixin:
"""
load all YANG models, create JSON of yang models. (Public function)
"""
def loadYangModel(self):
try:
# get all files
self.yangFiles = glob(self.yang_dir +"/*.yang")
# load yang modules
for file in self.yangFiles:
m = self._load_schema_module(file)
if m is not None:
self.sysLog(msg="module: {} is loaded successfully".format(m.name()))
else:
raise(Exception("Could not load module {}".format(file)))
# keep only modules name in self.yangFiles
self.yangFiles = [f.split('/')[-1] for f in self.yangFiles]
self.yangFiles = [f.split('.')[0] for f in self.yangFiles]
print('Loaded below Yang Models')
print(self.yangFiles)
# load json for each yang model
self._loadJsonYangModel()
# create a map from config DB table to yang container
self._createDBTableToModuleMap()
except Exception as e:
print("Yang Models Load failed")
raise SonicYangException("Yang Models Load failed\n{}".format(str(e)))
return True
"""
load JSON schema format from yang models
"""
def _loadJsonYangModel(self):
try:
for f in self.yangFiles:
m = self.ctx.get_module(f)
if m is not None:
xml = m.print_mem(ly.LYD_JSON, ly.LYP_FORMAT)
self.yJson.append(parse(xml))
self.sysLog(msg="Parsed Json for {}".format(m.name()))
except Exception as e:
raise e
return
"""
Create a map from config DB tables to container in yang model
This module name and topLevelContainer are fetched considering YANG models are
written using below Guidelines:
https://github.com/Azure/SONiC/blob/master/doc/mgmt/SONiC_YANG_Model_Guidelines.md.
"""
def _createDBTableToModuleMap(self):
for j in self.yJson:
# get module name
moduleName = j['module']['@name']
# get top level container
topLevelContainer = j['module'].get('container')
# if top level container is none, this is common yang files, which may
# have definitions. Store module.
if topLevelContainer is None:
self.confDbYangMap[moduleName] = j['module']
continue
# top level container must exist for rest of the yang files and it should
# have same name as module name.
assert topLevelContainer['@name'] == moduleName
# Each container inside topLevelContainer maps to a sonic config table.
container = topLevelContainer['container']
# container is a list
if isinstance(container, list):
for c in container:
self.confDbYangMap[c['@name']] = {
"module" : moduleName,
"topLevelContainer": topLevelContainer['@name'],
"container": c
}
# container is a dict
else:
self.confDbYangMap[container['@name']] = {
"module" : moduleName,
"topLevelContainer": topLevelContainer['@name'],
"container": container
}
return
"""
Get module, topLevelContainer(TLC) and json container for a config DB table
"""
def _getModuleTLCcontainer(self, table):
cmap = self.confDbYangMap
m = cmap[table]['module']
t = cmap[table]['topLevelContainer']
c = cmap[table]['container']
return m, t, c
"""
Crop config as per yang models,
This Function crops from config only those TABLEs, for which yang models is
provided. The Tables without YANG models are stored in
self.tablesWithOutYangModels.
"""
def _cropConfigDB(self, croppedFile=None):
for table in self.jIn.keys():
if table not in self.confDbYangMap:
# store in tablesWithOutYang
self.tablesWithOutYang[table] = self.jIn[table]
del self.jIn[table]
if len(self.tablesWithOutYang):
print("Note: Below table(s) have no YANG models:")
for table in self.tablesWithOutYang.keys():
print(unicode(table), end=", ")
print()
if croppedFile:
with open(croppedFile, 'w') as f:
dump(self.jIn, f, indent=4)
return
"""
Extract keys from table entry in Config DB and return in a dict
Input:
tableKey: Config DB Primary Key, Example tableKey = "Vlan111|2a04:5555:45:6709::1/64"
keys: key string from YANG list, i.e. 'vlan_name ip-prefix'.
regex: A regex to extract keys from tableKeys, good to have it as accurate as possible.
Return:
KeyDict = {"vlan_name": "Vlan111", "ip-prefix": "2a04:5555:45:6709::1/64"}
"""
def _extractKey(self, tableKey, keys, regex):
keyList = keys.split()
# get the value groups
value = re.match(regex, tableKey)
# create the keyDict
i = 1
keyDict = dict()
for k in keyList:
if value.group(i):
keyDict[k] = value.group(i)
else:
raise Exception("Value not found for {} in {}".format(k, tableKey))
i = i + 1
return keyDict
"""
Fill the dict based on leaf as a list or dict @model yang model object
"""
def _fillLeafDict(self, leafs, leafDict, isleafList=False):
if leafs is None:
return
# fill default values
def _fillSteps(leaf):
leaf['__isleafList'] = isleafList
leafDict[leaf['@name']] = leaf
return
if isinstance(leafs, list):
for leaf in leafs:
#print("{}:{}".format(leaf['@name'], leaf))
_fillSteps(leaf)
else:
#print("{}:{}".format(leaf['@name'], leaf))
_fillSteps(leafs)
return
"""
create a dict to map each key under primary key with a dict yang model.
This is done to improve performance of mapping from values of TABLEs in
config DB to leaf in YANG LIST.
"""
def _createLeafDict(self, model):
leafDict = dict()
#Iterate over leaf, choices and leaf-list.
self._fillLeafDict(model.get('leaf'), leafDict)
#choices, this is tricky, since leafs are under cases in tree.
choices = model.get('choice')
if choices:
for choice in choices:
cases = choice['case']
for case in cases:
self._fillLeafDict(case.get('leaf'), leafDict)
# leaf-lists
self._fillLeafDict(model.get('leaf-list'), leafDict, True)
return leafDict
"""
Convert a string from Config DB value to Yang Value based on type of the
key in Yang model.
@model : A List of Leafs in Yang model list
"""
def _findYangTypedValue(self, key, value, leafDict):
# convert config DB string to yang Type
def _yangConvert(val):
# Convert everything to string
val = str(val)
# find type of this key from yang leaf
type = leafDict[key]['type']['@name']
if 'uint' in type:
vValue = int(val, 10)
# TODO: find type of leafref from schema node
elif 'leafref' in type:
vValue = val
#TODO: find type in sonic-head, as of now, all are enumeration
elif 'head:' in type:
vValue = val
else:
vValue = val
return vValue
# if it is a leaf-list do it for each element
if leafDict[key]['__isleafList']:
vValue = list()
for v in value:
vValue.append(_yangConvert(v))
else:
vValue = _yangConvert(value)
return vValue
"""
Xlate a list
This function will xlate from a dict in config DB to a Yang JSON list
using yang model. Output will be go in self.xlateJson
"""
def _xlateList(self, model, yang, config, table):
#create a dict to map each key under primary key with a dict yang model.
#This is done to improve performance of mapping from values of TABLEs in
#config DB to leaf in YANG LIST.
leafDict = self._createLeafDict(model)
# fetch regex from YANG models.
keyRegEx = model['ext:key-regex-configdb-to-yang']['@value']
# seperator `|` has special meaning in regex, so change it appropriately.
keyRegEx = re.sub('\|', '\\|', keyRegEx)
# get keys from YANG model list itself
listKeys = model['key']['@value']
self.sysLog(msg="xlateList regex:{} keyList:{}".\
format(keyRegEx, listKeys))
for pkey in config.keys():
try:
vKey = None
self.sysLog(syslog.LOG_DEBUG, "xlateList Extract pkey:{}".\
format(pkey))
# Find and extracts key from each dict in config
keyDict = self._extractKey(pkey, listKeys, keyRegEx)
# fill rest of the values in keyDict
for vKey in config[pkey]:
self.sysLog(syslog.LOG_DEBUG, "xlateList vkey {}".format(vKey))
keyDict[vKey] = self._findYangTypedValue(vKey, \
config[pkey][vKey], leafDict)
yang.append(keyDict)
# delete pkey from config, done to match one key with one list
del config[pkey]
except Exception as e:
# log debug, because this exception may occur with multilists
self.sysLog(syslog.LOG_DEBUG, "xlateList Exception {}".format(e))
# with multilist, we continue matching other keys.
continue
return
"""
Process list inside a Container.
This function will call xlateList based on list(s) present in Container.
"""
def _xlateListInContainer(self, model, yang, configC, table):
clist = model
#print(clist['@name'])
yang[clist['@name']] = list()
self.sysLog(msg="xlateProcessListOfContainer: {}".format(clist['@name']))
self._xlateList(clist, yang[clist['@name']], configC, table)
# clean empty lists
if len(yang[clist['@name']]) == 0:
del yang[clist['@name']]
return
"""
Process container inside a Container.
This function will call xlateContainer based on Container(s) present
in outer Container.
"""
def _xlateContainerInContainer(self, model, yang, configC, table):
ccontainer = model
#print(ccontainer['@name'])
yang[ccontainer['@name']] = dict()
if not configC.get(ccontainer['@name']):
return
self.sysLog(msg="xlateProcessListOfContainer: {}".format(ccontainer['@name']))
self._xlateContainer(ccontainer, yang[ccontainer['@name']], \
configC[ccontainer['@name']], table)
# clean empty container
if len(yang[ccontainer['@name']]) == 0:
del yang[ccontainer['@name']]
# remove copy after processing
del configC[ccontainer['@name']]
return
"""
Xlate a container
This function will xlate from a dict in config DB to a Yang JSON container
using yang model. Output will be stored in self.xlateJson
"""
def _xlateContainer(self, model, yang, config, table):
# To Handle multiple Lists, Make a copy of config, because we delete keys
# from config after each match. This is done to match one pkey with one list.
configC = config.copy()
clist = model.get('list')
# If single list exists in container,
if clist and isinstance(clist, dict) and \
clist['@name'] == model['@name']+"_LIST" and bool(configC):
self._xlateListInContainer(clist, yang, configC, table)
# If multi-list exists in container,
elif clist and isinstance(clist, list) and bool(configC):
for modelList in clist:
self._xlateListInContainer(modelList, yang, configC, table)
# Handle container(s) in container
ccontainer = model.get('container')
# If single list exists in container,
if ccontainer and isinstance(ccontainer, dict) and bool(configC):
self._xlateContainerInContainer(ccontainer, yang, configC, table)
# If multi-list exists in container,
elif ccontainer and isinstance(ccontainer, list) and bool(configC):
for modelContainer in ccontainer:
self._xlateContainerInContainer(modelContainer, yang, configC, table)
## Handle other leaves in container,
leafDict = self._createLeafDict(model)
for vKey in configC.keys():
#vkey must be a leaf\leaf-list\choice in container
if leafDict.get(vKey):
self.sysLog(syslog.LOG_DEBUG, "xlateContainer vkey {}".format(vKey))
yang[vKey] = self._findYangTypedValue(vKey, configC[vKey], leafDict)
# delete entry from copy of config
del configC[vKey]
# All entries in copy of config must have been parsed.
if len(configC):
self.sysLog(syslog.LOG_ERR, "Alert: Remaining keys in Config")
raise(Exception("All Keys are not parsed in {}\n{}".format(table, \
configC.keys())))
return
"""
xlate ConfigDB json to Yang json
"""
def _xlateConfigDBtoYang(self, jIn, yangJ):
# find top level container for each table, and run the xlate_container.
for table in jIn.keys():
cmap = self.confDbYangMap[table]
# create top level containers
key = cmap['module']+":"+cmap['topLevelContainer']
subkey = cmap['topLevelContainer']+":"+cmap['container']['@name']
# Add new top level container for first table in this container
yangJ[key] = dict() if yangJ.get(key) is None else yangJ[key]
yangJ[key][subkey] = dict()
self.sysLog(msg="xlateConfigDBtoYang {}:{}".format(key, subkey))
self._xlateContainer(cmap['container'], yangJ[key][subkey], \
jIn[table], table)
return
"""
Read config file and crop it as per yang models
"""
def _xlateConfigDB(self, xlateFile=None):
jIn= self.jIn
yangJ = self.xlateJson
# xlation is written in self.xlateJson
self._xlateConfigDBtoYang(jIn, yangJ)
if xlateFile:
with open(xlateFile, 'w') as f:
dump(self.xlateJson, f, indent=4)
return
"""
create config DB table key from entry in yang JSON
"""
def _createKey(self, entry, regex):
keyDict = dict()
keyV = regex
# get the keys from regex of key extractor
keyList = re.findall(r'<(.*?)>', regex)
for key in keyList:
val = entry.get(key)
if val:
#print("pair: {} {}".format(key, val))
keyDict[key] = sval = str(val)
keyV = re.sub(r'<'+key+'>', sval, keyV)
#print("VAL: {} {}".format(regex, keyV))
else:
raise Exception("key {} not found in entry".format(key))
#print("kDict {}".format(keyDict))
return keyV, keyDict
"""
Convert a string from Config DB value to Yang Value based on type of the
key in Yang model.
@model : A List of Leafs in Yang model list
"""
def _revFindYangTypedValue(self, key, value, leafDict):
# convert yang Type to config DB string
def _revYangConvert(val):
# config DB has only strings, thank god for that :), wait not yet!!!
return str(val)
# if it is a leaf-list do it for each element
if leafDict[key]['__isleafList']:
vValue = list()
for v in value:
vValue.append(_revYangConvert(v))
else:
vValue = _revYangConvert(value)
return vValue
"""
Rev xlate from <TABLE>_LIST to table in config DB
"""
def _revXlateList(self, model, yang, config, table):
# fetch regex from YANG models
keyRegEx = model['ext:key-regex-yang-to-configdb']['@value']
self.sysLog(msg="revXlateList regex:{}".format(keyRegEx))
# create a dict to map each key under primary key with a dict yang model.
# This is done to improve performance of mapping from values of TABLEs in
# config DB to leaf in YANG LIST.
leafDict = self._createLeafDict(model)
# list with name <NAME>_LIST should be removed,
if "_LIST" in model['@name']:
for entry in yang:
# create key of config DB table
pkey, pkeydict = self._createKey(entry, keyRegEx)
self.sysLog(syslog.LOG_DEBUG, "revXlateList pkey:{}".format(pkey))
config[pkey]= dict()
# fill rest of the entries
for key in entry:
if key not in pkeydict:
config[pkey][key] = self._revFindYangTypedValue(key, \
entry[key], leafDict)
return
"""
Rev xlate a list inside a yang container
"""
def _revXlateListInContainer(self, model, yang, config, table):
modelList = model
# Pass matching list from Yang Json if exist
if yang.get(modelList['@name']):
self.sysLog(msg="revXlateListInContainer {}".format(modelList['@name']))
self._revXlateList(modelList, yang[modelList['@name']], config, table)
return
"""
Rev xlate a container inside a yang container
"""
def _revXlateContainerInContainer(self, model, yang, config, table):
modelContainer = model
# Pass matching list from Yang Json if exist
if yang.get(modelContainer['@name']):
config[modelContainer['@name']] = dict()
self.sysLog(msg="revXlateContainerInContainer {}".format(modelContainer['@name']))
self._revXlateContainer(modelContainer, yang[modelContainer['@name']], \
config[modelContainer['@name']], table)
return
"""
Rev xlate from yang container to table in config DB
"""
def _revXlateContainer(self, model, yang, config, table):
# IF container has only one list
clist = model.get('list')
if isinstance(clist, dict):
self._revXlateListInContainer(clist, yang, config, table)
# IF container has lists
elif isinstance(clist, list):
for modelList in clist:
self._revXlateListInContainer(modelList, yang, config, table)
ccontainer = model.get('container')
# IF container has only one inner container
if isinstance(ccontainer, dict):
self._revXlateContainerInContainer(ccontainer, yang, config, table)
# IF container has only many inner container
elif isinstance(ccontainer, list):
for modelContainer in ccontainer:
self._revXlateContainerInContainer(modelContainer, yang, config, table)
## Handle other leaves in container,
leafDict = self._createLeafDict(model)
for vKey in yang:
#vkey must be a leaf\leaf-list\choice in container
if leafDict.get(vKey):
self.sysLog(syslog.LOG_DEBUG, "revXlateContainer vkey {}".format(vKey))
config[vKey] = self._revFindYangTypedValue(vKey, yang[vKey], leafDict)
return
"""
rev xlate ConfigDB json to Yang json
"""
def _revXlateYangtoConfigDB(self, yangJ, cDbJson):
yangJ = self.xlateJson
cDbJson = self.revXlateJson
# find table in config DB, use name as a KEY
for module_top in yangJ.keys():
# module _top will be of from module:top
for container in yangJ[module_top].keys():
#table = container.split(':')[1]
table = container
#print("revXlate " + table)
cmap = self.confDbYangMap[table]
cDbJson[table] = dict()
#print(key + "--" + subkey)
self.sysLog(msg="revXlateYangtoConfigDB {}".format(table))
self._revXlateContainer(cmap['container'], yangJ[module_top][container], \
cDbJson[table], table)
return
"""
Reverse Translate tp config DB
"""
def _revXlateConfigDB(self, revXlateFile=None):
yangJ = self.xlateJson
cDbJson = self.revXlateJson
# xlation is written in self.xlateJson
self._revXlateYangtoConfigDB(yangJ, cDbJson)
if revXlateFile:
with open(revXlateFile, 'w') as f:
dump(self.revXlateJson, f, indent=4)
return
"""
Find a list in YANG Container
c = container
l = list name
return: list if found else None
"""
def _findYangList(self, container, listName):
if isinstance(container['list'], dict):
clist = container['list']
if clist['@name'] == listName:
return clist
elif isinstance(container['list'], list):
clist = [l for l in container['list'] if l['@name'] == listName]
return clist[0]
return None
"""
Find xpath of the PORT Leaf in PORT container/list. Xpath of Leaf is needed,
because only leaf can have leafrefs depend on them. (Public)
"""
def findXpathPortLeaf(self, portName):
try:
table = "PORT"
xpath = self.findXpathPort(portName)
module, topc, container = self._getModuleTLCcontainer(table)
list = self._findYangList(container, table+"_LIST")
xpath = xpath + "/" + list['key']['@value'].split()[0]
except Exception as e:
print("find xpath of port Leaf failed")
raise SonicYangException("find xpath of port Leaf failed\n{}".format(str(e)))
return xpath
"""
Find xpath of PORT. (Public)
"""
def findXpathPort(self, portName):
try:
table = "PORT"
module, topc, container = self._getModuleTLCcontainer(table)
xpath = "/" + module + ":" + topc + "/" + table
list = self._findYangList(container, table+"_LIST")
xpath = self._findXpathList(xpath, list, [portName])
except Exception as e:
print("find xpath of port failed")
raise SonicYangException("find xpath of port failed\n{}".format(str(e)))
return xpath
"""
Find xpath of a YANG LIST from keys,
xpath: xpath till list
list: YANG List
keys: list of keys in YANG LIST
"""
def _findXpathList(self, xpath, list, keys):
try:
# add list name in xpath
xpath = xpath + "/" + list['@name']
listKeys = list['key']['@value'].split()
i = 0;
for listKey in listKeys:
xpath = xpath + '['+listKey+'=\''+keys[i]+'\']'
i = i + 1
except Exception as e:
raise e
return xpath
"""
load_data: load Config DB, crop, xlate and create data tree from it. (Public)
input: data
returns: True - success False - failed
"""
def loadData(self, configdbJson):
try:
self.jIn = configdbJson
# reset xlate and tablesWithOutYang
self.xlateJson = dict()
self.tablesWithOutYang = dict()
# self.jIn will be cropped
self._cropConfigDB()
# xlated result will be in self.xlateJson
self._xlateConfigDB()
#print(self.xlateJson)
self.sysLog(msg="Try to load Data in the tree")
self.root = self.ctx.parse_data_mem(dumps(self.xlateJson), \
ly.LYD_JSON, ly.LYD_OPT_CONFIG|ly.LYD_OPT_STRICT)
except Exception as e:
self.root = None
print("Data Loading Failed")
raise SonicYangException("Data Loading Failed\n{}".format(str(e)))
return True
"""
Get data from Data tree, data tree will be assigned in self.xlateJson. (Public)
"""
def getData(self):
try:
self.xlateJson = loads(self._print_data_mem('JSON'))
# reset reverse xlate
self.revXlateJson = dict()
# result will be stored self.revXlateJson
self._revXlateConfigDB()
except Exception as e:
print("Get Data Tree Failed")
raise SonicYangException("Get Data Tree Failed\n{}".format(str(e)))
return self.revXlateJson
"""
Delete a node from data tree, if this is LEAF and KEY Delete the Parent.
(Public)
"""
def deleteNode(self, xpath):
# These MACROS used only here, can we get it from Libyang Header ?
try:
LYS_LEAF = 4
node = self._find_data_node(xpath)
if node is None:
raise('Node {} not found'.format(xpath))
snode = node.schema()
# check for a leaf if it is a key. If yes delete the parent
if (snode.nodetype() == LYS_LEAF):
leaf = ly.Schema_Node_Leaf(snode)
if leaf.is_key():
# try to delete parent
nodeP = self._find_parent_data_node(xpath)
xpathP = nodeP.path()
if self._deleteNode(xpath=xpathP, node=nodeP) == False:
raise Exception('_deleteNode failed')
else:
return True
# delete non key element
if self._deleteNode(xpath=xpath, node=node) == False:
raise Exception('_deleteNode failed')
except Exception as e:
raise SonicYangException("Failed to delete node {}\n{}".\
format( xpath, str(e)))
return True
# End of class sonic_yang

View File

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
"""Unit test package for sonic_yang_mgmt."""

View File

@ -0,0 +1,260 @@
{
"test-vlan:vlan": {
"test-vlan:VLAN_INTERFACE": {
"VLAN_INTERFACE_LIST": [{
"vlanid": 111,
"ip-prefix": "2000:f500:45:6709::1/64",
"scope": "global",
"family": "IPv6"
},
{
"vlanid": 111,
"ip-prefix": "10.1.1.65/26",
"scope": "global",
"family": "IPv4"
},
{
"vlanid": 111,
"ip-prefix": "fe80::1/10",
"scope": "local",
"family": "IPv6"
},
{
"vlanid": 555,
"ip-prefix": "2000:f500:41:4e9::1/64",
"scope": "global",
"family": "IPv6"
},
{
"vlanid": 555,
"ip-prefix": "10.1.5.65/26",
"scope": "global",
"family": "IPv4"
},
{
"vlanid": 555,
"ip-prefix": "fe80::1/10",
"scope": "local",
"family": "IPv6"
}
]
},
"test-vlan:VLAN": {
"VLAN_LIST": [{
"vlanid": 111,
"description": "server_vlan",
"dhcp_servers": [
"10.1.7.116"
],
"mtu": "9216",
"admin_status": "up"
},
{
"vlanid": 555,
"description": "ipmi_vlan",
"dhcp_servers": [
"10.1.7.116"
],
"mtu": "9216",
"admin_status": "up"
}
]
},
"test-vlan:VLAN_MEMBER": {
"VLAN_MEMBER_LIST": [{
"vlanid": 111,
"port": "Ethernet0",
"tagging_mode": "tagged"
},
{
"vlanid": 111,
"port": "Ethernet1",
"tagging_mode": "tagged"
},
{
"vlanid": 111,
"port": "Ethernet2",
"tagging_mode": "tagged"
},
{
"vlanid": 111,
"port": "Ethernet3",
"tagging_mode": "tagged"
},
{
"vlanid": 111,
"port": "Ethernet4",
"tagging_mode": "tagged"
},
{
"vlanid": 111,
"port": "Ethernet5",
"tagging_mode": "tagged"
},
{
"vlanid": 111,
"port": "Ethernet6",
"tagging_mode": "tagged"
}
]
}
},
"test-port:port": {
"test-port:PORT": {
"PORT_LIST": [{
"port_name": "Ethernet0",
"alias": "eth0",
"description": "Ethernet0",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet1",
"alias": "eth1",
"description": "Ethernet1",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet2",
"alias": "eth2",
"description": "Ethernet2",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet3",
"alias": "eth2",
"description": "Ethernet3",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet4",
"alias": "eth4",
"description": "Ethernet4",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet5",
"alias": "eth5",
"description": "Ethernet5",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet6",
"alias": "eth6",
"description": "Ethernet6",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet7",
"alias": "eth7",
"description": "Ethernet7",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet8",
"alias": "eth8",
"description": "Ethernet8",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet9",
"alias": "eth9",
"description": "Ethernet9",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
}
]
}
},
"test-acl:acl": {
"test-acl:ACL_RULE": {
"ACL_RULE_LIST": [{
"ACL_TABLE_NAME": "PACL-V4",
"RULE_NAME": "Rule_20",
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.1.72.0/26",
"SRC_IP": "10.1.0.0/15",
"PRIORITY": "999980",
"IP_TYPE": "IPV4ANY"
},
{
"ACL_TABLE_NAME": "PACL-V4",
"RULE_NAME": "Rule_40",
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.1.72.64/26",
"SRC_IP": "10.1.0.0/15",
"PRIORITY": "999960",
"IP_TYPE": "IPV4ANY",
"INNER_ETHER_TYPE": "0x88CC"
},
{
"ACL_TABLE_NAME": "PACL-V6",
"RULE_NAME": "Rule_20",
"PACKET_ACTION": "FORWARD",
"IP_TYPE": "IP",
"SRC_IPV6": "2000:f500:41::/48",
"PRIORITY": "999980",
"DST_IPV6": "2000:f500:43:320::/64",
"L4_SRC_PORT_RANGE": "653-1053"
}
]
},
"test-acl:ACL_TABLE": {
"ACL_TABLE_LIST": [{
"ACL_TABLE_NAME": "PACL-V6",
"policy_desc": "Filter IPv6",
"type": "L3V6",
"stage": "EGRESS",
"ports": ["Ethernet7", "Ethernet9", "Ethernet8"]
},
{
"ACL_TABLE_NAME": "PACL-V4",
"policy_desc": "Filter IPv6",
"type": "L3",
"stage": "INGRESS",
"ports": ["Ethernet2", "Ethernet0", "Ethernet1"]
}
]
}
},
"test-interface:interface": {
"test-interface:INTERFACE": {
"INTERFACE_LIST": [{
"interface": "Ethernet8",
"ip-prefix": "10.1.1.65/26",
"scope": "global",
"family": "IPv4"
},
{
"interface": "Ethernet8",
"ip-prefix": "2000:f500:40:a749::2/126",
"scope": "global",
"family": "IPv6"
}
]
}
}
}

View File

@ -0,0 +1,154 @@
{
"test-vlan:vlan": {
"test-vlan:VLAN_INTERFACE": {
"VLAN_INTERFACE_LIST": [{
"vlanid": 111,
"ip-prefix": "2000:f500:45:6709::1/64",
"scope": "global",
"family": "IPv6"
},
{
"vlanid": 111,
"ip-prefix": "10.1.1.64/26",
"scope": "global",
"family": "IPv4"
},
{
"vlanid": 200,
"ip-prefix": "2000:f500:45:6708::1/64",
"scope": "global",
"family": "IPv6"
},
{
"vlanid": 200,
"ip-prefix": "2000:f500:45:6709::1/64",
"scope": "global",
"family": "IPv6"
}
]
},
"test-vlan:VLAN": {
"VLAN_LIST": [{
"vlanid": 200,
"description": "server_vlan",
"dhcp_servers": [
"10.1.72.116"
],
"mtu": "9216",
"admin_status": "up"
},
{
"vlanid": 111,
"description": "server_vlan",
"dhcp_servers": [
"10.1.72.116"
],
"mtu": "9216",
"admin_status": "up"
}
]
},
"test-vlan:VLAN_MEMBER": {
"VLAN_MEMBER_LIST": [{
"vlanid": 200,
"port": "Ethernet0",
"tagging_mode": "tagged"
}]
}
},
"test-port:port": {
"test-port:PORT": {
"PORT_LIST": [{
"port_name": "Ethernet0",
"alias": "eth0",
"description": "Ethernet0",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet1",
"alias": "eth1",
"description": "Ethernet1",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet2",
"alias": "eth2",
"description": "Ethernet2",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet3",
"alias": "eth2",
"description": "Ethernet3",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet4",
"alias": "eth4",
"description": "Ethernet4",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet5",
"alias": "eth5",
"description": "Ethernet5",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet6",
"alias": "eth6",
"description": "Ethernet6",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet7",
"alias": "eth7",
"description": "Ethernet7",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet8",
"alias": "eth8",
"description": "Ethernet8",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet9",
"alias": "eth9",
"description": "Ethernet9",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
},
{
"port_name": "Ethernet10",
"alias": "eth10",
"description": "Ethernet10",
"speed": 25000,
"mtu": 9000,
"admin_status": "up"
}
]
}
}
}

View File

@ -0,0 +1,268 @@
module test-acl {
yang-version 1.1;
namespace "http://github.com/Azure/acl";
prefix acl;
import ietf-yang-types {
prefix yang;
}
import ietf-inet-types {
prefix inet;
}
import test-head {
prefix head;
revision-date 2019-07-01;
}
import test-port {
prefix port;
revision-date 2019-07-01;
}
import test-portchannel {
prefix lag;
revision-date 2019-07-01;
}
revision 2019-07-01 {
description "First Revision";
}
container acl {
container ACL_RULE {
description "ACL_RULE part of config_db.json";
list ACL_RULE_LIST {
key "ACL_TABLE_NAME RULE_NAME";
leaf ACL_TABLE_NAME {
type leafref {
path "/acl:acl/acl:ACL_TABLE/acl:ACL_TABLE_LIST/acl:ACL_TABLE_NAME";
}
}
leaf RULE_NAME {
type string {
length 1..255;
}
}
leaf PACKET_ACTION {
type head:packet_action;
}
leaf IP_TYPE {
type head:ip_type;
}
leaf PRIORITY {
type uint32 {
range 0..999999;
}
}
choice ip_prefix {
case ip4_prefix {
when "boolean(IP_TYPE[.='ANY' or .='IP' or .='IPV4' or .='IPV4ANY' or .='ARP'])";
leaf SRC_IP {
type inet:ipv4-prefix;
}
leaf DST_IP {
type inet:ipv4-prefix;
}
}
case ip6_prefix {
when "boolean(IP_TYPE[.='ANY' or .='IP' or .='IPV6' or .='IPV6ANY'])";
leaf SRC_IPV6 {
type inet:ipv6-prefix;
}
leaf DST_IPV6 {
type inet:ipv6-prefix;
}
}
}
leaf-list IN_PORTS {
/* Values in leaf list are UNIQUE */
type uint16;
}
leaf-list OUT_PORTS {
/* Values in leaf list are UNIQUE */
type uint16;
}
choice src_port {
case l4_src_port {
leaf L4_SRC_PORT {
type uint16;
}
}
case l4_src_port_range {
leaf L4_SRC_PORT_RANGE {
type string {
pattern '([0-9]{1,4}|[0-5][0-9]{4}|[6][0-4][0-9]{3}|[6][5][0-2][0-9]{2}|[6][5][3][0-5]{2}|[6][5][3][6][0-5])-([0-9]{1,4}|[0-5][0-9]{4}|[6][0-4][0-9]{3}|[6][5][0-2][0-9]{2}|[6][5][3][0-5]{2}|[6][5][3][6][0-5])';
}
}
}
}
choice dst_port {
case l4_dst_port {
leaf L4_DST_PORT {
type uint16;
}
}
case l4_dst_port_range {
leaf L4_DST_PORT_RANGE {
type string {
pattern '([0-9]{1,4}|[0-5][0-9]{4}|[6][0-4][0-9]{3}|[6][5][0-2][0-9]{2}|[6][5][3][0-5]{2}|[6][5][3][6][0-5])-([0-9]{1,4}|[0-5][0-9]{4}|[6][0-4][0-9]{3}|[6][5][0-2][0-9]{2}|[6][5][3][0-5]{2}|[6][5][3][6][0-5])';
}
}
}
}
leaf ETHER_TYPE {
type string {
pattern "(0x88CC|0x8100|0x8915|0x0806|0x0800|0x86DD|0x8847)";
}
}
leaf IP_PROTOCOL {
type uint8 {
range 1..143;
}
}
leaf TCP_FLAGS {
type string {
pattern '0[x][0-9a-fA-F]{1,2}|0[X][0-9a-fA-F]{1,2}';
}
}
leaf DSCP {
type uint8;
}
leaf TC {
type uint8;
}
choice icmp {
case icmp4 {
when "boolean(IP_TYPE[.='ANY' or .='IP' or .='IPV4' or .='IPV4ANY' or .='ARP'])";
leaf ICMP_TYPE {
type uint8 {
range 1..44;
}
}
leaf ICMP_CODE {
type uint8 {
range 1..16;
}
}
}
case icmp6 {
when "boolean(IP_TYPE[.='ANY' or .='IP' or .='IPV6' or .='IPV6ANY'])";
leaf ICMPV6_TYPE {
type uint8 {
range 1..44;
}
}
leaf ICMPV6_CODE {
type uint8 {
range 1..16;
}
}
}
}
leaf INNER_ETHER_TYPE {
type string {
pattern "(0x88CC|0x8100|0x8915|0x0806|0x0800|0x86DD|0x8847)";
}
}
leaf INNER_IP_PROTOCOL {
type uint8 {
range 1..143;
}
}
leaf INNER_L4_SRC_PORT {
type uint16;
}
leaf INNER_L4_DST_PORT {
type uint16;
}
}
/* end of ACL_RULE_LIST */
}
/* end of container ACL_RULE */
container ACL_TABLE {
description "ACL_TABLE part of config_db.json";
list ACL_TABLE_LIST {
key "ACL_TABLE_NAME";
leaf ACL_TABLE_NAME {
type string;
}
leaf policy_desc {
type string {
length 1..255;
}
}
leaf type {
type head:acl_table_type;
}
leaf stage {
type enumeration {
enum INGRESS;
enum EGRESS;
}
}
leaf-list ports {
/* union of leafref is allowed in YANG 1.1 */
type union {
type leafref {
path /port:port/port:PORT/port:PORT_LIST/port:port_name;
}
type leafref {
path /lag:portchannel/lag:PORTCHANNEL/lag:PORTCHANNEL_LIST/lag:portchannel_name;
}
}
}
}
/* end of ACL_TABLE_LIST */
}
/* end of container ACL_TABLE */
}
/* end of container acl */
}
/* end of module acl */

View File

@ -0,0 +1,66 @@
module test-head {
namespace "http://head";
prefix head;
revision 2019-07-01 {
description "First Revision";
}
typedef ip-family {
type enumeration {
enum IPv4;
enum IPv6;
}
}
typedef admin_status {
type enumeration {
enum up;
enum down;
}
}
typedef packet_action{
type enumeration {
enum DROP;
enum FORWARD;
enum REDIRECT;
}
}
typedef ip_type {
type enumeration {
enum ANY;
enum IP;
enum NON_IP;
enum IPV4;
enum IPV6;
enum IPV4ANY;
enum NON_IPv4;
enum IPV6ANY;
enum NON_IPv6;
enum ARP;
}
}
typedef acl_table_type {
type enumeration {
enum L2;
enum L3;
enum L3V6;
enum MIRROR;
enum MIRRORV6;
enum MIRROR_DSCP;
enum CTRLPLANE;
}
}
typedef vlan_tagging_mode {
type enumeration {
enum tagged;
enum untagged;
enum priority_tagged;
}
}
}

View File

@ -0,0 +1,72 @@
module test-interface {
namespace "http://github.com/Azure/interface";
prefix intf;
import ietf-yang-types {
prefix yang;
}
import ietf-inet-types {
prefix inet;
}
import test-head {
prefix head;
revision-date 2019-07-01;
}
import test-port {
prefix port;
revision-date 2019-07-01;
}
revision 2019-07-01 {
description "First Revision";
}
container interface {
container INTERFACE {
description "INTERFACE part of config_db.json";
list INTERFACE_LIST {
key "interface ip-prefix";
leaf interface {
type leafref {
path /port:port/port:PORT/port:PORT_LIST/port:port_name;
}
}
leaf ip-prefix {
type inet:ip-prefix;
}
leaf scope {
type enumeration {
enum global;
enum local;
}
}
leaf family {
/* family leaf needed for backward compatibility
Both ip4 and ip6 address are string in IETF RFC 6021,
so must statement can check based on : or ., family
should be IPv4 or IPv6 according.
*/
must "(contains(../ip-prefix, ':') and current()='IPv6') or
(contains(../ip-prefix, '.') and current()='IPv4')";
type head:ip-family;
}
}
/* end of INTERFACE_LIST */
}
/* end of INTERFACE container */
}
}

View File

@ -0,0 +1,78 @@
module test-port{
namespace "http://github.com/Azure/port";
prefix port;
import ietf-yang-types {
prefix yang;
}
import ietf-inet-types {
prefix inet;
}
import test-head {
prefix head;
revision-date 2019-07-01;
}
revision 2019-07-01 {
description "First Revision";
}
container port{
container PORT {
description "PORT part of config_db.json";
list PORT_LIST {
key "port_name";
leaf port_name {
type string {
length 1..128;
}
}
leaf alias {
type string {
length 1..128;
}
}
leaf lanes {
type string {
length 1..128;
}
}
leaf description {
type string {
length 1..255;
}
}
leaf speed {
type uint32 {
range 1..100000;
}
}
leaf mtu {
type uint16 {
range 1..9216;
}
}
leaf admin_status {
mandatory true;
type head:admin_status;
}
} /* end of list PORT_LIST */
} /* end of container PORT */
} /* end of container port */
} /* end of module port */

View File

@ -0,0 +1,79 @@
module test-portchannel {
namespace "http://github.com/Azure/portchannel";
prefix lag;
import ietf-yang-types {
prefix yang;
}
import ietf-inet-types {
prefix inet;
}
import test-head {
prefix head;
revision-date 2019-07-01;
}
import test-port {
prefix port;
revision-date 2019-07-01;
}
revision 2019-07-01 {
description "First Revision";
}
container portchannel {
container PORTCHANNEL {
description "PORTCHANNEL part of config_db.json";
list PORTCHANNEL_LIST {
key "portchannel_name";
leaf portchannel_name {
type string {
length 1..128;
pattern 'PortChannel[0-9]{1,4}';
}
}
leaf-list members {
/* leaf-list members are unique by default */
type leafref {
path /port:port/port:PORT/port:PORT_LIST/port:port_name;
}
}
leaf min_links {
type uint8 {
range 1..128;
}
}
leaf description {
type string {
length 1..255;
}
}
leaf mtu {
type uint16 {
range 1..9216;
}
}
leaf admin_status {
mandatory true;
type head:admin_status;
}
} /* end of list PORTCHANNEL_LIST */
} /* end of container PORTCHANNEL */
} /* end of container portchannel */
} /* end of module port */

View File

@ -0,0 +1,144 @@
module test-vlan {
namespace "http://github.com/Azure/vlan";
prefix vlan;
import ietf-yang-types {
prefix yang;
}
import ietf-inet-types {
prefix inet;
}
import test-head {
prefix head;
revision-date 2019-07-01;
}
import test-port {
prefix port;
revision-date 2019-07-01;
}
revision 2019-07-01 {
description "First Revision";
}
container vlan {
container VLAN_INTERFACE {
description "VLAN_INTERFACE part of config_db.json";
list VLAN_INTERFACE_LIST {
key "vlanid ip-prefix";
leaf vlanid {
type leafref {
path ../../../VLAN/VLAN_LIST/vlanid;
}
}
leaf ip-prefix {
mandatory true;
type inet:ip-prefix;
}
leaf scope {
type enumeration {
enum global;
enum local;
}
}
leaf family {
/* family leaf needed for backward compatibility
Both ip4 and ip6 address are string in IETF RFC 6021,
so must statement can check based on : or ., family
should be IPv4 or IPv6 according.
*/
must "(contains(../ip-prefix, ':') and current()='IPv6') or
(contains(../ip-prefix, '.') and current()='IPv4')";
type head:ip-family;
}
}
/* end of VLAN_INTERFACE_LIST */
}
/* end of VLAN_INTERFACE container */
container VLAN {
description "VLAN part of config_db.json";
list VLAN_LIST {
key "vlanid";
leaf vlanid {
type uint16 {
range 1..4094;
}
}
leaf description {
type string {
length 1..255;
}
}
leaf-list dhcp_servers {
type inet:ip-address;
}
leaf mtu {
type uint16 {
range 1..9216;
}
}
leaf admin_status {
mandatory true;
type head:admin_status;
}
}
/* end of VLAN_LIST */
}
/* end of container VLAN */
container VLAN_MEMBER {
description "VLAN_MEMBER part of config_db.json";
list VLAN_MEMBER_LIST {
key "vlanid port";
leaf vlanid {
type leafref {
path ../../../VLAN/VLAN_LIST/vlanid;
}
}
leaf port {
/* key elements are mandatory by default */
mandatory true;
type leafref {
path /port:port/port:PORT/port:PORT_LIST/port:port_name;
}
}
leaf tagging_mode {
mandatory true;
type head:vlan_tagging_mode;
}
}
/* end of list VLAN_MEMBER_LIST */
}
/* end of container VLAN_MEMBER */
}
/* end of container vlan */
}
/* end of module vlan */

View File

@ -0,0 +1,648 @@
{
"VLAN_INTERFACE": {
"Vlan111|2a04:5555:45:6709::1/64": {
"scope": "global",
"family": "IPv6"
},
"Vlan111|10.222.10.65/26": {
"scope": "global",
"family": "IPv4"
},
"Vlan111|fe80::1/10": {
"scope": "local",
"family": "IPv6"
},
"Vlan777|2a04:5555:41:4e9::1/64": {
"scope": "global",
"family": "IPv6"
},
"Vlan777|10.111.58.65/26": {
"scope": "global",
"family": "IPv4"
},
"Vlan777|fe80::1/10": {
"scope": "local",
"family": "IPv6"
}
},
"ACL_RULE": {
"V4-ACL-TABLE|DEFAULT_DENY": {
"PACKET_ACTION": "DROP",
"IP_TYPE": "IPv4ANY",
"PRIORITY": "0"
},
"V4-ACL-TABLE|Rule_20": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.222.72.0/26",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777780",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_40": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.222.72.64/26",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777760",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_60": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.222.80.0/26",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777740",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_80": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.222.80.64/26",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777720",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_111": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.152.17.52/32",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777700",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_120": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.252.208.41/32",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777880",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_140": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.148.128.245/32",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777860",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_160": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.222.1.245/32",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777840",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_180": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "10.252.222.21/32",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "777820",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_9000": {
"PACKET_ACTION": "DROP",
"DST_IP": "0.0.0.0/0",
"SRC_IP": "10.222.0.0/15",
"PRIORITY": "991110",
"IP_TYPE": "IPv4ANY"
},
"V4-ACL-TABLE|Rule_11100": {
"PACKET_ACTION": "FORWARD",
"DST_IP": "0.0.0.0/0",
"SRC_IP": "0.0.0.0/0",
"PRIORITY": "990000",
"IP_TYPE": "IPv4ANY"
},
"V6-ACL-TBLE|DEFAULT_DENY": {
"PACKET_ACTION": "DROP",
"IP_TYPE": "IPv6ANY",
"PRIORITY": "0"
},
"V6-ACL-TBLE|Rule_20": {
"PACKET_ACTION": "FORWARD",
"IP_TYPE": "IPv6ANY",
"SRC_IPV6": "2a04:5555:41::/48",
"PRIORITY": "777780",
"DST_IPV6": "2a04:5555:43:320::/64"
},
"V6-ACL-TBLE|Rule_40": {
"PACKET_ACTION": "FORWARD",
"IP_TYPE": "IPv6ANY",
"SRC_IPV6": "2a04:5555:41::/48",
"PRIORITY": "777760",
"DST_IPV6": "2a04:5555:43:321::/64"
},
"V6-ACL-TBLE|Rule_60": {
"PACKET_ACTION": "FORWARD",
"IP_TYPE": "IPv6ANY",
"SRC_IPV6": "2a04:5555:41::/48",
"PRIORITY": "777740",
"DST_IPV6": "2a04:5555:43:340::/64"
},
"V6-ACL-TBLE|Rule_80": {
"PACKET_ACTION": "FORWARD",
"IP_TYPE": "IPv6ANY",
"SRC_IPV6": "2a04:5555:41::/48",
"PRIORITY": "777720",
"DST_IPV6": "2a04:5555:43:341::/64"
},
"V6-ACL-TBLE|Rule_111": {
"PACKET_ACTION": "FORWARD",
"IP_TYPE": "IPv6ANY",
"SRC_IPV6": "2a04:5555:41::/48",
"PRIORITY": "777700",
"DST_IPV6": "2a04:5555:32:12::/64"
},
"V6-ACL-TBLE|Rule_9000": {
"PACKET_ACTION": "DROP",
"IP_TYPE": "IPv6ANY",
"SRC_IPV6": "2a04:5555:41::/48",
"PRIORITY": "991110",
"DST_IPV6": "::/0"
},
"V6-ACL-TBLE|Rule_11100": {
"PACKET_ACTION": "FORWARD",
"IP_TYPE": "IPv6ANY",
"SRC_IPV6": "::/0",
"PRIORITY": "990000",
"DST_IPV6": "::/0"
}
},
"DEVICE_METADATA": {
"localhost": {
"mg_type": "ToR",
"mac": "00:11:22:33:dd:5a",
"hostname": "asw.dc",
"bgp_asn": "64850",
"hwsku": "Stone"
}
},
"VLAN": {
"Vlan111": {
"description": "svlan",
"dhcp_servers": [
"10.222.72.116"
],
"vlanid": "111",
"mtu": "9216",
"admin_status": "up",
"members": [
"Ethernet8",
"Ethernet3",
"Ethernet0",
"Ethernet1",
"Ethernet6",
"Ethernet4",
"Ethernet5",
"Ethernet9",
"Ethernet2",
"Ethernet7",
"Ethernet32",
"Ethernet30",
"Ethernet31",
"Ethernet36",
"Ethernet34",
"Ethernet33",
"Ethernet35",
"Ethernet29",
"Ethernet21",
"Ethernet20",
"Ethernet23",
"Ethernet22",
"Ethernet27",
"Ethernet26",
"Ethernet18",
"Ethernet19",
"Ethernet14",
"Ethernet15",
"Ethernet16",
"Ethernet17",
"Ethernet10",
"Ethernet11",
"Ethernet12",
"Ethernet13",
"Ethernet28"
]
},
"Vlan777": {
"description": "pvlan",
"dhcp_servers": [
"10.222.72.116"
],
"vlanid": "777",
"mtu": "9216",
"admin_status": "up",
"members": [
"Ethernet9",
"Ethernet2",
"Ethernet8",
"Ethernet27",
"Ethernet14",
"Ethernet35"
]
}
},
"DEVICE_NEIGHBOR": {
"Ethernet112": {
"name": "dccsw01.nw",
"port": "Eth18"
},
"Ethernet114": {
"name": "dccsw02.nw",
"port": "Eth18"
},
"Ethernet116": {
"name": "dccsw03.nw",
"port": "Eth18"
},
"Ethernet118": {
"name": "dccsw04.nw",
"port": "Eth18"
}
},
"PORT": {
"Ethernet0": {
"alias": "Eth1/1",
"lanes": "65",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet1": {
"alias": "Eth1/2",
"lanes": "66",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet2": {
"alias": "Eth1/3",
"lanes": "67",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet3": {
"alias": "Eth1/4",
"lanes": "68",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet4": {
"alias": "Eth2/1",
"lanes": "69",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet5": {
"alias": "Eth2/2",
"lanes": "70",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet6": {
"alias": "Eth2/3",
"lanes": "71",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet7": {
"alias": "Eth2/4",
"lanes": "72",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet8": {
"alias": "Eth3/1",
"lanes": "73",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet9": {
"alias": "Eth3/2",
"lanes": "74",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet10": {
"alias": "Eth3/3",
"lanes": "75",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet11": {
"alias": "Eth3/4",
"lanes": "76",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet12": {
"alias": "Eth4/1",
"lanes": "77",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet13": {
"alias": "Eth4/2",
"lanes": "78",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet14": {
"alias": "Eth4/3",
"lanes": "79",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet15": {
"alias": "Eth4/4",
"lanes": "80",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet16": {
"alias": "Eth5/1",
"lanes": "33",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet17": {
"alias": "Eth5/2",
"lanes": "34",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet18": {
"alias": "Eth5/3",
"lanes": "35",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet19": {
"alias": "Eth5/4",
"lanes": "36",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet20": {
"alias": "Eth6/1",
"lanes": "37",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet21": {
"alias": "Eth6/2",
"lanes": "38",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet22": {
"alias": "Eth6/3",
"lanes": "39",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet23": {
"alias": "Eth6/4",
"lanes": "40",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet24": {
"alias": "Eth7/1",
"lanes": "41",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet25": {
"alias": "Eth7/2",
"lanes": "42",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet26": {
"alias": "Eth7/3",
"lanes": "43",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet27": {
"alias": "Eth7/4",
"lanes": "44",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet28": {
"alias": "Eth8/1",
"lanes": "45",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet29": {
"alias": "Eth8/2",
"lanes": "46",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet30": {
"alias": "Eth8/3",
"lanes": "47",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet31": {
"alias": "Eth8/4",
"lanes": "48",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet32": {
"alias": "Eth9/1",
"lanes": "49",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet33": {
"alias": "Eth9/2",
"lanes": "50",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet34": {
"alias": "Eth9/3",
"lanes": "51",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet35": {
"alias": "Eth9/4",
"lanes": "52",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet36": {
"alias": "Eth10/1",
"lanes": "53",
"description": "",
"speed": "11100",
"admin_status": "up"
},
"Ethernet112": {
"alias": "Eth29/1",
"lanes": "113,114",
"description": "50G|dccsw01.nw|Eth18",
"fec": "fc",
"admin_status": "up"
}
},
"ACL_TABLE": {
"V4-ACL-TABLE": {
"type": "L3",
"policy_desc": "V4-ACL-TABLE",
"ports": [
"Ethernet26",
"Ethernet27",
"Ethernet24"
]
},
"V6-ACL-TBLE": {
"type": "L3V6",
"policy_desc": "V6-ACL-TBLE",
"ports": [
"Ethernet14",
"Ethernet15",
"Ethernet23",
"Ethernet30",
"Ethernet31",
"Ethernet18",
"Ethernet19",
"Ethernet25",
"Ethernet24"
]
}
},
"INTERFACE": {
"Ethernet112|2a04:5555:40:a709::2/126": {
"scope": "global",
"family": "IPv6"
},
"Ethernet112|10.184.228.211/31": {
"scope": "global",
"family": "IPv4"
},
"Ethernet14|2a04:5555:40:a749::2/126": {
"scope": "global",
"family": "IPv6"
},
"Ethernet14|10.184.229.211/31": {
"scope": "global",
"family": "IPv4"
},
"Ethernet16|2a04:5555:40:a789::2/126": {
"scope": "global",
"family": "IPv6"
},
"Ethernet16|10.184.230.211/31": {
"scope": "global",
"family": "IPv4"
},
"Ethernet18|2a04:5555:40:a7c9::2/126": {
"scope": "global",
"family": "IPv6"
},
"Ethernet18|10.184.231.211/31": {
"scope": "global",
"family": "IPv4"
}
},
"VLAN_MEMBER": {
"Vlan111|Ethernet0": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet1": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet2": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet3": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet4": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet5": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet6": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet29": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet30": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet31": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet32": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet33": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet34": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet35": {
"tagging_mode": "untagged"
},
"Vlan111|Ethernet36": {
"tagging_mode": "untagged"
}
},
"LOOPBACK_INTERFACE": {
"Loopback0|2a04:5555:40:4::4e9/128": {
"scope": "global",
"family": "IPv6"
},
"Loopback0|10.184.8.233/32": {
"scope": "global",
"family": "IPv4"
}
},
"CRM": {
"Config": {
"polling_interval": "0"
}
}
}

View File

@ -0,0 +1,144 @@
{
"yang_dir":"./tests/libyang-python-tests/sample-yang-models/",
"data_file":"./tests/libyang-python-tests/config_data.json",
"data_merge_file":"./tests/libyang-python-tests/config_data_merge.json",
"modules":[
{"file":"test-head.yang", "module":"test-head"},
{"file":"test-port.yang", "module":"test-port"},
{"file":"test-acl.yang", "module":"test-acl"},
{"file":"test-interface.yang", "module":"test-interface"},
{"file":"test-portchannel.yang", "module":"test-portchannel"},
{"file":"test-vlan.yang", "module":"test-vlan"}
],
"merged_nodes":[
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet10']/speed", "value":"25000"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='200'][ip-prefix='2000:f500:45:6708::/64']/family",
"value":"IPv6"}
],
"new_nodes":[
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet12']/alias", "value":"Ethernet10_alias"},
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet12']/speed", "value":"5000"},
{"xpath":"/test-acl:acl/ACL_RULE/ACL_RULE_LIST[ACL_TABLE_NAME='PACL-test'][RULE_NAME='rule_20']/RULE_NAME",
"value":"rule_20"}
],
"data_nodes":[
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']/alias", "valid":"True"},
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet20']/alias", "valid":"False"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE", "valid":"True"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST", "valid":"False"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='2000:f500:45:6709::/64']", "valid":"True"}
],
"set_nodes":[
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet10']/speed", "value":"10000"},
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']/mtu", "value":"1500"},
{"xpath":"/test-vlan:vlan/VLAN/VLAN_LIST[vlanid='111']/description", "value":"server_vlan111"}
],
"node_values":[
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']/speed", "value":"25000"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='2000:f500:45:6709::/64']/family",
"value":"IPv6"}
],
"schema_nodes":[
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']/family",
"value":"/test-vlan:vlan/test-vlan:VLAN_INTERFACE/test-vlan:VLAN_INTERFACE_LIST/test-vlan:family"},
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']/speed",
"value":"/test-port:port/test-port:PORT/test-port:PORT_LIST/test-port:speed"}
],
"delete_nodes":[
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet10']/speed", "valid":"False"},
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']/mtu", "valid":"True"},
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet20']/mtu", "valid":"False"}
],
"dependencies":[
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet8']/port_name",
"dependencies":
["/test-acl:acl/ACL_TABLE/ACL_TABLE_LIST[ACL_TABLE_NAME='PACL-V6']/ports[.='Ethernet8']",
"/test-interface:interface/INTERFACE/INTERFACE_LIST[interface='Ethernet8'][ip-prefix='10.1.1.64/26']/interface",
"/test-interface:interface/INTERFACE/INTERFACE_LIST[interface='Ethernet8'][ip-prefix='2000:f500:40:a749::/126']/interface"]}
],
"schema_dependencies":[
{"xpath":"/test-port:port/test-port:PORT/test-port:PORT_LIST/test-port:port_name",
"schema_dependencies":
["/test-acl:acl/test-acl:ACL_TABLE/test-acl:ACL_TABLE_LIST/test-acl:ports",
"/test-portchannel:portchannel/test-portchannel:PORTCHANNEL/test-portchannel:PORTCHANNEL_LIST/test-portchannel:members",
"/test-interface:interface/test-interface:INTERFACE/test-interface:INTERFACE_LIST/test-interface:interface",
"/test-vlan:vlan/test-vlan:VLAN_MEMBER/test-vlan:VLAN_MEMBER_LIST/test-vlan:port"]}
],
"members":[
{"xpath":"/test-port:port/PORT/PORT_LIST",
"members":
["/test-port:port/PORT/PORT_LIST[port_name='Ethernet0']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet1']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet2']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet3']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet4']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet5']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet6']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet7']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet8']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet10']",
"/test-port:port/PORT/PORT_LIST[port_name='Ethernet12']"]}
],
"parents":[
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='2000:f500:45:6709::/64']/family",
"parent":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='2000:f500:45:6709::/64']"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']/scope",
"parent":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']/vlanid",
"parent":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']/ip-prefix",
"parent":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']"},
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']/family",
"parent":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='10.1.1.64/26']"},
{"xpath":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']/speed",
"parent":"/test-port:port/PORT/PORT_LIST[port_name='Ethernet9']"}
],
"prefix":[
{"module_name":"test-head", "module_prefix":"head"},
{"module_name":"test-port", "module_prefix":"port"},
{"module_name":"test-acl", "module_prefix":"acl"},
{"module_name":"test-interface", "module_prefix":"intf"},
{"module_name":"test-portchannel", "module_prefix":"lag"},
{"module_name":"test-vlan", "module_prefix":"vlan"}
],
"data_type":[
{"xpath":"/test-port:port/test-port:PORT/test-port:PORT_LIST/test-port:port_name", "data_type":"LY_TYPE_STRING"},
{"xpath":"/test-vlan:vlan/test-vlan:VLAN_INTERFACE/test-vlan:VLAN_INTERFACE_LIST/test-vlan:vlanid", "data_type":"LY_TYPE_LEAFREF"}
],
"leafref_type":[
{"xpath":"/test-vlan:vlan/VLAN_INTERFACE/VLAN_INTERFACE_LIST[vlanid='111'][ip-prefix='2000:f500:45:6709::/64']/vlanid", "data_type":"LY_TYPE_UINT16"},
{"xpath":"/test-interface:interface/INTERFACE/INTERFACE_LIST[interface='Ethernet8'][ip-prefix='2000:f500:40:a749::/126']/interface", "data_type":"LY_TYPE_STRING"},
{"xpath":"/test-vlan:vlan/VLAN_MEMBER/VLAN_MEMBER_LIST[vlanid='111'][port='Ethernet0']/port", "data_type":"LY_TYPE_STRING"},
{"xpath":"/test-vlan:vlan/VLAN_MEMBER/VLAN_MEMBER_LIST[vlanid='111'][port='Ethernet0']/vlanid", "data_type":"LY_TYPE_UINT16"}
],
"leafref_type_schema":[
{"xpath":"/test-vlan:vlan/test-vlan:VLAN_INTERFACE/test-vlan:VLAN_INTERFACE_LIST/test-vlan:vlanid",
"data_type":"LY_TYPE_UINT16"},
{"xpath":"/test-interface:interface/test-interface:INTERFACE/test-interface:INTERFACE_LIST/test-interface:interface",
"data_type":"LY_TYPE_STRING"},
{"xpath":"/test-vlan:vlan/test-vlan:VLAN_MEMBER/test-vlan:VLAN_MEMBER_LIST/test-vlan:port",
"data_type":"LY_TYPE_STRING"},
{"xpath":"/test-vlan:vlan/test-vlan:VLAN_MEMBER/test-vlan:VLAN_MEMBER_LIST/test-vlan:vlanid",
"data_type":"LY_TYPE_UINT16"}
],
"leafref_path":[
{"xpath":"/test-vlan:vlan/test-vlan:VLAN_INTERFACE/test-vlan:VLAN_INTERFACE_LIST/test-vlan:vlanid",
"leafref_path":"../../../VLAN/VLAN_LIST/vlanid"},
{"xpath":"/test-interface:interface/test-interface:INTERFACE/test-interface:INTERFACE_LIST/test-interface:interface",
"leafref_path":"/test-port:port/test-port:PORT/test-port:PORT_LIST/test-port:port_name"},
{"xpath":"/test-vlan:vlan/test-vlan:VLAN_MEMBER/test-vlan:VLAN_MEMBER_LIST/test-vlan:port",
"leafref_path":"/test-port:port/test-port:PORT/test-port:PORT_LIST/test-port:port_name"},
{"xpath":"/test-vlan:vlan/test-vlan:VLAN_MEMBER/test-vlan:VLAN_MEMBER_LIST/test-vlan:vlanid",
"leafref_path":"../../../VLAN/VLAN_LIST/vlanid"}
]
}

View File

@ -0,0 +1,324 @@
import sys
import os
import pytest
import sonic_yang as sy
import json
import glob
import logging
from ijson import items as ijson_itmes
test_path = os.path.dirname(os.path.abspath(__file__))
modules_path = os.path.dirname(test_path)
sys.path.insert(0, modules_path)
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("YANG-TEST")
log.setLevel(logging.INFO)
log.addHandler(logging.NullHandler())
class Test_SonicYang(object):
# class vars
@pytest.fixture(autouse=True, scope='class')
def data(self):
test_file = "./tests/libyang-python-tests/test_SonicYang.json"
data = self.jsonTestParser(test_file)
return data
@pytest.fixture(autouse=True, scope='class')
def yang_s(self, data):
yang_dir = str(data['yang_dir'])
yang_s = sy.SonicYang(yang_dir)
return yang_s
def jsonTestParser(self, file):
"""
Open the json test file
"""
with open(file) as data_file:
data = json.load(data_file)
return data
"""
Get the JSON input based on func name
and return jsonInput
"""
def readIjsonInput(self, yang_test_file, test):
try:
# load test specific Dictionary, using Key = func
# this is to avoid loading very large JSON in memory
print(" Read JSON Section: " + test)
jInput = ""
with open(yang_test_file, 'rb') as f:
jInst = ijson_itmes(f, test)
for it in jInst:
jInput = jInput + json.dumps(it)
except Exception as e:
print("Reading Ijson failed")
raise(e)
return jInput
def setup_class(self):
pass
def load_yang_model_file(self, yang_s, yang_dir, yang_file, module_name):
yfile = yang_dir + yang_file
try:
yang_s._load_schema_module(str(yfile))
except Exception as e:
print(e)
raise
#test load and get yang module
def test_load_yang_model_files(self, data, yang_s):
yang_dir = data['yang_dir']
for module in data['modules']:
file = str(module['file'])
module = str(module['module'])
self.load_yang_model_file(yang_s, yang_dir, file, module)
assert yang_s._get_module(module) is not None
#test load non-exist yang module file
def test_load_invalid_model_files(self, data, yang_s):
yang_dir = data['yang_dir']
file = "invalid.yang"
module = "invalid"
with pytest.raises(Exception):
assert self.load_yang_model_file(yang_s, yang_dir, file, module)
#test load yang modules in directory
def test_load_yang_model_dir(self, data, yang_s):
yang_dir = data['yang_dir']
yang_s._load_schema_modules(str(yang_dir))
for module_name in data['modules']:
assert yang_s._get_module(str(module_name['module'])) is not None
#test load yang modules and data files
def test_load_yang_model_data(self, data, yang_s):
yang_dir = str(data['yang_dir'])
yang_files = glob.glob(yang_dir+"/*.yang")
data_file = str(data['data_file'])
data_merge_file = str(data['data_merge_file'])
data_files = []
data_files.append(data_file)
data_files.append(data_merge_file)
print(yang_files)
yang_s._load_data_model(yang_dir, yang_files, data_files)
#validate the data tree from data_merge_file is loaded
for node in data['merged_nodes']:
xpath = str(node['xpath'])
value = str(node['value'])
val = yang_s._find_data_node_value(xpath)
assert str(val) == str(value)
#test load data file
def test_load_data_file(self, data, yang_s):
data_file = str(data['data_file'])
yang_s._load_data_file(data_file)
#test_validate_data_tree():
def test_validate_data_tree(self, data, yang_s):
yang_s.validate_data_tree()
#test find node
def test_find_node(self, data, yang_s):
for node in data['data_nodes']:
expected = node['valid']
xpath = str(node['xpath'])
dnode = yang_s._find_data_node(xpath)
if(expected == "True"):
assert dnode is not None
assert dnode.path() == xpath
else:
assert dnode is None
#test add node
def test_add_node(self, data, yang_s):
for node in data['new_nodes']:
xpath = str(node['xpath'])
value = node['value']
yang_s._add_data_node(xpath, str(value))
data_node = yang_s._find_data_node(xpath)
assert data_node is not None
#test find node value
def test_find_data_node_value(self, data, yang_s):
for node in data['node_values']:
xpath = str(node['xpath'])
value = str(node['value'])
print(xpath)
print(value)
val = yang_s._find_data_node_value(xpath)
assert str(val) == str(value)
#test delete data node
def test_delete_node(self, data, yang_s):
for node in data['delete_nodes']:
xpath = str(node['xpath'])
yang_s._deleteNode(xpath)
#test set node's value
def test_set_datanode_value(self, data, yang_s):
for node in data['set_nodes']:
xpath = str(node['xpath'])
value = node['value']
yang_s._set_data_node_value(xpath, value)
val = yang_s._find_data_node_value(xpath)
assert str(val) == str(value)
#test list of members
def test_find_members(self, yang_s, data):
for node in data['members']:
members = node['members']
xpath = str(node['xpath'])
list = yang_s._find_data_nodes(xpath)
assert list.sort() == members.sort()
#get parent xpath
def test_get_parent_data_xpath(self, yang_s, data):
for node in data['parents']:
xpath = str(node['xpath'])
expected_xpath = str(node['parent'])
path = yang_s._get_parent_data_xpath(xpath)
assert path == expected_xpath
#test find_data_node_schema_xpath
def test_find_data_node_schema_xpath(self, yang_s, data):
for node in data['schema_nodes']:
xpath = str(node['xpath'])
schema_xpath = str(node['value'])
path = yang_s._find_data_node_schema_xpath(xpath)
assert path == schema_xpath
#test data dependencies
def test_find_data_dependencies(self, yang_s, data):
for node in data['dependencies']:
xpath = str(node['xpath'])
list = node['dependencies']
depend = yang_s.find_data_dependencies(xpath)
assert set(depend) == set(list)
#test data dependencies
def test_find_schema_dependencies(self, yang_s, data):
for node in data['schema_dependencies']:
xpath = str(node['xpath'])
list = node['schema_dependencies']
depend = yang_s._find_schema_dependencies(xpath)
assert set(depend) == set(list)
#test merge data tree
def test_merge_data_tree(self, data, yang_s):
data_merge_file = data['data_merge_file']
yang_dir = str(data['yang_dir'])
yang_s._merge_data(data_merge_file, yang_dir)
#yang_s.root.print_mem(ly.LYD_JSON, ly.LYP_FORMAT)
#test get module prefix
def test_get_module_prefix(self, yang_s, data):
for node in data['prefix']:
xpath = str(node['module_name'])
expected = node['module_prefix']
prefix = yang_s._get_module_prefix(xpath)
assert expected == prefix
#test get data type
def test_get_data_type(self, yang_s, data):
for node in data['data_type']:
xpath = str(node['xpath'])
expected = node['data_type']
expected_type = yang_s._str_to_type(expected)
data_type = yang_s._get_data_type(xpath)
assert expected_type == data_type
def test_get_leafref_type(self, yang_s, data):
for node in data['leafref_type']:
xpath = str(node['xpath'])
expected = node['data_type']
expected_type = yang_s._str_to_type(expected)
data_type = yang_s._get_leafref_type(xpath)
assert expected_type == data_type
def test_get_leafref_path(self, yang_s, data):
for node in data['leafref_path']:
xpath = str(node['xpath'])
expected_path = node['leafref_path']
path = yang_s._get_leafref_path(xpath)
assert expected_path == path
def test_get_leafref_type_schema(self, yang_s, data):
for node in data['leafref_type_schema']:
xpath = str(node['xpath'])
expected = node['data_type']
expected_type = yang_s._str_to_type(expected)
data_type = yang_s._get_leafref_type_schema(xpath)
assert expected_type == data_type
"""
This is helper function to load YANG models for tests cases, which works
on Real SONiC Yang models. Mainly tests for translation and reverse
translation.
"""
@pytest.fixture(autouse=True, scope='class')
def sonic_yang_data(self):
sonic_yang_dir = "../sonic-yang-models/yang-models/"
sonic_yang_test_file = "../sonic-yang-models/tests/yang_model_tests/yangTest.json"
syc = sy.SonicYang(sonic_yang_dir)
syc.loadYangModel()
sonic_yang_data = dict()
sonic_yang_data['yang_dir'] = sonic_yang_dir
sonic_yang_data['test_file'] = sonic_yang_test_file
sonic_yang_data['syc'] = syc
return sonic_yang_data
def test_xlate_rev_xlate(self, sonic_yang_data):
# In this test, xlation and revXlation is tested with latest Sonic
# YANG model.
test_file = sonic_yang_data['test_file']
syc = sonic_yang_data['syc']
jIn = self.readIjsonInput(test_file, 'SAMPLE_CONFIG_DB_JSON')
syc.loadData(json.loads(jIn))
# TODO: Make sure no extra table is loaded
syc.getData()
if syc.jIn and syc.jIn == syc.revXlateJson:
print("Xlate and Rev Xlate Passed")
else:
print("Xlate and Rev Xlate failed")
# make it fail
assert False == True
return
def test_table_with_no_yang(self, sonic_yang_data):
# in this test, tables with no YANG models must be stored seperately
# by this library.
test_file = sonic_yang_data['test_file']
syc = sonic_yang_data['syc']
jIn = self.readIjsonInput(test_file, 'SAMPLE_CONFIG_DB_JSON_1')
syc.loadData(json.loads(jIn))
ty = syc.tablesWithOutYang
assert (len(ty) and "UNKNOWN_TABLE" in ty)
return
def teardown_class(self):
pass

View File

@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tests for `sonic_yang_mgmt` package."""
import pytest
@pytest.fixture
def response():
"""Sample pytest fixture.
See more at: http://doc.pytest.org/en/latest/fixture.html
"""
# import requests
# return requests.get('https://github.com/audreyr/cookiecutter-pypackage')
def test_content(response):
"""Sample pytest test function with the pytest fixture as an argument."""
# from bs4 import BeautifulSoup
# assert 'GitHub' in BeautifulSoup(response.content).title.string

View File

@ -1275,10 +1275,76 @@
"family": "IPv4"
}
},
"CRM": {
"Config": {
"polling_interval": "0"
}
}
}
"BREAKOUT_CFG": {
"Ethernet0": {
"brkout_mode": "1x100G[40G]"
},
"Ethernet4": {
"brkout_mode": "4x25G"
},
"Ethernet8": {
"brkout_mode": "1x100G[40G]"
}
},
"VERSIONS": {
"DATABASE": {
"VERSION": "version_1_0_3"
}
},
"FLEX_COUNTER_TABLE": {
"PFCWD": {
"FLEX_COUNTER_STATUS": "enable"
},
"PG_WATERMARK": {
"FLEX_COUNTER_STATUS": "enable"
},
"PORT": {
"FLEX_COUNTER_STATUS": "enable"
},
"QUEUE": {
"FLEX_COUNTER_STATUS": "enable"
},
"QUEUE_WATERMARK": {
"FLEX_COUNTER_STATUS": "enable"
}
},
"CRM": {
"Config": {
"acl_counter_high_threshold": "85",
"acl_counter_low_threshold": "70",
"acl_counter_threshold_type": "percentage",
"polling_interval": "0"
}
}
},
"SAMPLE_CONFIG_DB_JSON_1": {
"FLEX_COUNTER_TABLE": {
"PFCWD": {
"FLEX_COUNTER_STATUS": "enable"
},
"PG_WATERMARK": {
"FLEX_COUNTER_STATUS": "enable"
},
"PORT": {
"FLEX_COUNTER_STATUS": "enable"
},
"QUEUE": {
"FLEX_COUNTER_STATUS": "enable"
},
"QUEUE_WATERMARK": {
"FLEX_COUNTER_STATUS": "enable"
}
},
"CRM": {
"Config": {
"acl_counter_high_threshold": "85",
"acl_counter_low_threshold": "70",
"acl_counter_threshold_type": "percentage",
"polling_interval": "0"
}
},
"UNKNOWN_TABLE": {
"Error": "This Table is for testing, This Table does not have YANG models."
}
}
}